From 84872cf9a26cc2b90119239fd1617b644936aa05 Mon Sep 17 00:00:00 2001 From: Jordan Date: Tue, 6 Jun 2023 18:22:25 +0100 Subject: [PATCH] chore: improve queue documentation and submit history to better queue (#2266) --- .../bukkit/adapter/BukkitQueueHandler.java | 6 +- .../history/changeset/AbstractChangeSet.java | 2 +- .../queue/implementation/QueueHandler.java | 219 +++++++++++++++++- .../core/util/TaskManager.java | 4 +- .../core/util/task/AsyncNotifyQueue.java | 13 +- .../task/FaweForkJoinWorkerThreadFactory.java | 21 ++ 6 files changed, 245 insertions(+), 20 deletions(-) create mode 100644 worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweForkJoinWorkerThreadFactory.java diff --git a/worldedit-bukkit/src/main/java/com/fastasyncworldedit/bukkit/adapter/BukkitQueueHandler.java b/worldedit-bukkit/src/main/java/com/fastasyncworldedit/bukkit/adapter/BukkitQueueHandler.java index 6e1bb1f9e..eec38e7ac 100644 --- a/worldedit-bukkit/src/main/java/com/fastasyncworldedit/bukkit/adapter/BukkitQueueHandler.java +++ b/worldedit-bukkit/src/main/java/com/fastasyncworldedit/bukkit/adapter/BukkitQueueHandler.java @@ -3,8 +3,6 @@ package com.fastasyncworldedit.bukkit.adapter; import co.aikar.timings.Timings; import com.fastasyncworldedit.bukkit.listener.ChunkListener; import com.fastasyncworldedit.core.queue.implementation.QueueHandler; -import com.sk89q.worldedit.internal.util.LogManagerCompat; -import org.apache.logging.log4j.Logger; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -31,7 +29,7 @@ public class BukkitQueueHandler extends QueueHandler { } @Override - public void startSet(boolean parallel) { + public void startUnsafe(boolean parallel) { ChunkListener.physicsFreeze = true; if (parallel) { try { @@ -51,7 +49,7 @@ public class BukkitQueueHandler extends QueueHandler { } @Override - public void endSet(boolean parallel) { + public void endUnsafe(boolean parallel) { ChunkListener.physicsFreeze = false; if (parallel) { try { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java index bad1f96c6..b3493c896 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java @@ -389,7 +389,7 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { return; // fast path to avoid additional tasks: a worker is already draining the queue } // create a new worker to drain the current queue - Fawe.instance().getQueueHandler().submit(() -> drainQueue(false)); + Fawe.instance().getQueueHandler().async(() -> drainQueue(false)); } private void drainQueue(boolean ignoreRunningState) { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java index 2091a0bb9..ae3ba4043 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java @@ -14,6 +14,7 @@ import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkCache; import com.fastasyncworldedit.core.util.MemUtil; import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.collection.CleanableThreadLocal; +import com.fastasyncworldedit.core.util.task.FaweForkJoinWorkerThreadFactory; import com.fastasyncworldedit.core.wrappers.WorldWrapper; import com.google.common.util.concurrent.Futures; import com.sk89q.worldedit.world.World; @@ -39,10 +40,41 @@ import java.util.function.Supplier; @SuppressWarnings({"unchecked", "rawtypes"}) public abstract class QueueHandler implements Trimable, Runnable { - private final ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool(); - private final ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool(); + private static final int PROCESSORS = Runtime.getRuntime().availableProcessors(); + + /** + * Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their + * time utilising CPU. + */ + private final ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool( + PROCESSORS, + new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Primary - %s"), + null, + false + ); + /** + * Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the + * primary queue. They may be IO-bound tasks. + */ + private final ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool( + PROCESSORS, + new FaweForkJoinWorkerThreadFactory("FAWE Fork Join Pool Secondary - %s"), + null, + false + ); + /** + * Main "work-horse" queue for FAWE. Handles chunk submission (and chunk submission alone). Blocking in order to forcibly + * prevent overworking/over-submission of chunk process tasks. + */ private final ThreadPoolExecutor blockingExecutor = FaweCache.INSTANCE.newBlockingExecutor(); + /** + * Queue for tasks to be completed on the main thread. These take priority of tasks submitted to syncWhenFree queue + */ private final ConcurrentLinkedQueue syncTasks = new ConcurrentLinkedQueue<>(); + /** + * Queue for tasks to be completed on the main thread. These are completed only if and when there is time left in a tick + * after completing all tasks in the syncTasks queue + */ private final ConcurrentLinkedQueue syncWhenFree = new ConcurrentLinkedQueue<>(); private final Map>> chunkGetCache = new HashMap<>(); @@ -54,7 +86,7 @@ public abstract class QueueHandler implements Trimable, Runnable { private long last; private long allocate = 50; - public QueueHandler() { + protected QueueHandler() { TaskManager.taskManager().repeat(this, 1); } @@ -80,6 +112,12 @@ public abstract class QueueHandler implements Trimable, Runnable { } } + /** + * Get if the {@code blockingExecutor} is saturated with tasks or not. Under-utilisation implies the queue has space for + * more submissions. + * + * @return true if {@code blockingExecutor} is not saturated with tasks + */ public boolean isUnderutilized() { return blockingExecutor.getActiveCount() < blockingExecutor.getMaximumPoolSize(); } @@ -125,6 +163,10 @@ public abstract class QueueHandler implements Trimable, Runnable { } while (System.currentTimeMillis() - start < currentAllocate); } + /** + * @deprecated For removal without replacement. + */ + @Deprecated(forRemoval = true, since = "TODO") public > void complete(Future task) { try { while (task != null) { @@ -135,49 +177,140 @@ public abstract class QueueHandler implements Trimable, Runnable { } } + /** + * Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are + * likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks. + * + * @param run Runnable to run + * @param value Value to return when done + * @param Value type + * @return Future for submitted task + */ public Future async(Runnable run, T value) { return forkJoinPoolSecondary.submit(run, value); } + /** + * Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are + * likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks. + * + * @param run Runnable to run + * @return Future for submitted task + */ public Future async(Runnable run) { return forkJoinPoolSecondary.submit(run); } + /** + * Complete a task in the {@code forkJoinPoolSecondary} queue. Secondary queue should be used for "cleanup" tasks that are + * likely to be shorter in life than those submitted to the primary queue. They may be IO-bound tasks. + * + * @param call Callable to run + * @param Return value type + * @return Future for submitted task + */ public Future async(Callable call) { return forkJoinPoolSecondary.submit(call); } - public ForkJoinTask submit(Runnable call) { - return forkJoinPoolPrimary.submit(call); + /** + * Complete a task in the {@code forkJoinPoolPrimary} queue. Primary queue should be used for tasks that are unlikely to + * wait on other tasks, IO, etc. (i.e. spend most of their time utilising CPU. + * + * @param run Task to run + * @return {@link ForkJoinTask} representing task being run + */ + public ForkJoinTask submit(Runnable run) { + return forkJoinPoolPrimary.submit(run); } + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. + * + * @param run Task to run + * @param Value type + * @return Future representing task + */ public Future sync(Runnable run) { return sync(run, syncTasks); } + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. + * + * @param call Task to run + * @param Value type + * @return Future representing task + */ public Future sync(Callable call) throws Exception { return sync(call, syncTasks); } - public Future sync(Supplier call) { - return sync(call, syncTasks); + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. + * + * @param supplier Task to run + * @param Value type + * @return Future representing task + */ + public Future sync(Supplier supplier) { + return sync(supplier, syncTasks); } - // Lower priority sync task (runs only when there are no other tasks) + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed + * only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods. + * + * @param run Task to run + * @param value Value to return when done + * @param Value type + * @return Future representing task + */ public Future syncWhenFree(Runnable run, T value) { return sync(run, value, syncWhenFree); } + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed + * only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods. + * + * @param run Task to run + * @param Value type + * @return Future representing task + */ public Future syncWhenFree(Runnable run) { return sync(run, syncWhenFree); } + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed + * only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods. + * + * @param call Task to run + * @param Value type + * @return Future representing task + */ public Future syncWhenFree(Callable call) throws Exception { return sync(call, syncWhenFree); } - public Future syncWhenFree(Supplier call) { - return sync(call, syncWhenFree); + /** + * Submit a task to be run on the main thread. Does not guarantee to be run on the next tick as FAWE will only operate to + * maintain approx. 18 tps. Takes lower priority than tasks submitted via any {@code QueueHandler#sync} method. Completed + * only if and when there is time left in a tick after completing all sync tasks submitted using the aforementioned methods. + * + * @param supplier Task to run + * @param Value type + * @return Future representing task + */ + public Future syncWhenFree(Supplier supplier) { + return sync(supplier, syncWhenFree); } private Future sync(Runnable run, T value, Queue queue) { @@ -228,6 +361,15 @@ public abstract class QueueHandler implements Trimable, Runnable { } } + /** + * Internal use only. Specifically for submitting {@link IQueueChunk} for "processing" an edit. Submits to the blocking + * executor, the main "work-horse" queue for FAWE. Handles chunk submission (and chunk submission alone). Blocking in order + * to forcibly prevent overworking/over-submission of chunk process tasks. + * + * @param chunk chunk + * @param + * @return Future representing task + */ public > T submit(IQueueChunk chunk) { // if (MemUtil.isMemoryFree()) { TODO NOT IMPLEMENTED - optimize this // return (T) forkJoinPoolSecondary.submit(chunk); @@ -259,6 +401,9 @@ public abstract class QueueHandler implements Trimable, Runnable { return new SingleThreadQueueExtent(); } + /** + * Sets the current thread's {@link IQueueExtent} instance in the queue pool to null. + */ public void unCache() { queuePool.set(null); } @@ -271,14 +416,58 @@ public abstract class QueueHandler implements Trimable, Runnable { return queue; } - public abstract void startSet(boolean parallel); + /** + * Indicate a "set" task is being started. + * + * @param parallel if the "set" being started is parallel/async + * @deprecated To be replaced by better-named {@link QueueHandler#startUnsafe(boolean)} )} + */ + @Deprecated(forRemoval = true, since = "TODO") + public void startSet(boolean parallel) { + startUnsafe(parallel); + } - public abstract void endSet(boolean parallel); + /** + * Indicate a "set" task is ending. + * + * @param parallel if the "set" being started is parallel/async + * @deprecated To be replaced by better-named {@link QueueHandler#endUnsafe(boolean)} )} + */ + @Deprecated(forRemoval = true, since = "TODO") + public void endSet(boolean parallel) { + startUnsafe(parallel); + } + + /** + * Indicate an unsafe task is starting. Physics are frozen, async catchers disabled, etc. for the duration of the task + * + * @param parallel If the task is being run async and/or in parallel + */ + public abstract void startUnsafe(boolean parallel); + + /** + * Indicate a/the unsafe task submitted after a {@link QueueHandler#startUnsafe(boolean)} call has ended. + * + * @param parallel If the task was being run async and/or in parallel + */ + public abstract void endUnsafe(boolean parallel); + + /** + * Create a new queue for a given world. + */ public IQueueExtent getQueue(World world) { return getQueue(world, null, null); } + /** + * Create a new queue for a given world. + * + * @param world World to create queue for + * @param processor existing processor to set to queue or null + * @param postProcessor existing post-processor to set to queue or null + * @return New queue for given world + */ public IQueueExtent getQueue(World world, IBatchProcessor processor, IBatchProcessor postProcessor) { final IQueueExtent queue = pool(); IChunkCache cacheGet = getOrCreateWorldCache(world); @@ -293,6 +482,12 @@ public abstract class QueueHandler implements Trimable, Runnable { return queue; } + /** + * Trims each chunk GET cache + * + * @param aggressive if each chunk GET cache should be trimmed aggressively + * @return true if all chunk GET caches could be trimmed + */ @Override public boolean trim(boolean aggressive) { boolean result = true; diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/TaskManager.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/TaskManager.java index ad69c87d6..f3abf0ac3 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/TaskManager.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/TaskManager.java @@ -157,13 +157,13 @@ public abstract class TaskManager { */ public void runUnsafe(Runnable run) { QueueHandler queue = Fawe.instance().getQueueHandler(); - queue.startSet(true); + queue.startUnsafe(Fawe.isMainThread()); try { run.run(); } catch (Throwable e) { e.printStackTrace(); } - queue.endSet(true); + queue.endUnsafe(Fawe.isMainThread()); } /** diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java index 10f910928..7cd91f87a 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/AsyncNotifyQueue.java @@ -1,9 +1,13 @@ package com.fastasyncworldedit.core.util.task; import com.fastasyncworldedit.core.Fawe; +import com.fastasyncworldedit.core.configuration.Settings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.Closeable; import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -11,6 +15,13 @@ import java.util.function.Supplier; public class AsyncNotifyQueue implements Closeable { + private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool( + Settings.settings().QUEUE.PARALLEL_THREADS, + new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"), + null, + false + ); + private final Lock lock = new ReentrantLock(true); private final Thread.UncaughtExceptionHandler handler; private boolean closed; @@ -59,7 +70,7 @@ public class AsyncNotifyQueue implements Closeable { } return null; }; - self[0] = Fawe.instance().getQueueHandler().async(wrapped); + self[0] = QUEUE_SUBMISSIONS.submit(wrapped); return self[0]; } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweForkJoinWorkerThreadFactory.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweForkJoinWorkerThreadFactory.java new file mode 100644 index 000000000..589b5b863 --- /dev/null +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/util/task/FaweForkJoinWorkerThreadFactory.java @@ -0,0 +1,21 @@ +package com.fastasyncworldedit.core.util.task; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; + +public class FaweForkJoinWorkerThreadFactory implements ForkJoinPool.ForkJoinWorkerThreadFactory { + + private final String nameFormat; + + public FaweForkJoinWorkerThreadFactory(String nameFormat) { + this.nameFormat = nameFormat; + } + + @Override + public ForkJoinWorkerThread newThread(ForkJoinPool pool) { + final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); + worker.setName(String.format(nameFormat, worker.getPoolIndex())); + return worker; + } + +}