Implement async notify queue that submits to a KeyQueuedExecutorService (#2334)

This commit is contained in:
Jordan 2023-07-20 16:58:01 +01:00 committed by GitHub
parent 18df87a4e8
commit 9543adc776
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 114 additions and 22 deletions

View File

@ -27,6 +27,7 @@ import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
@ -52,6 +53,7 @@ public abstract class QueueHandler implements Trimable, Runnable {
null,
false
);
/**
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
* primary queue. They may be IO-bound tasks.
@ -508,4 +510,28 @@ public abstract class QueueHandler implements Trimable, Runnable {
return result;
}
/**
* Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their
* time utilising CPU.
* <p>
* Internal API usage only.
*
* @since TODO
*/
public ExecutorService getForkJoinPoolPrimary() {
return forkJoinPoolPrimary;
}
/**
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
* primary queue. They may be IO-bound tasks.
* <p>
* Internal API usage only.
*
* @since TODO
*/
public ExecutorService getForkJoinPoolSecondary() {
return forkJoinPoolSecondary;
}
}

View File

@ -0,0 +1,81 @@
package com.fastasyncworldedit.core.util.task;
import com.fastasyncworldedit.core.configuration.Settings;
import java.io.Closeable;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Supplier;
/**
* async queue that accepts a {@link Thread.UncaughtExceptionHandler} for exception handling per instance, delegating to a
* parent {@link KeyQueuedExecutorService}.
*
* @since TODO
*/
public class AsyncNotifyKeyedQueue implements Closeable {
private static final KeyQueuedExecutorService<UUID> QUEUE_SUBMISSIONS = new KeyQueuedExecutorService<>(new ForkJoinPool(
Settings.settings().QUEUE.PARALLEL_THREADS,
new FaweForkJoinWorkerThreadFactory("AsyncNotifyKeyedQueue - %s"),
null,
false
));
private final Thread.UncaughtExceptionHandler handler;
private final Supplier<UUID> key;
private volatile boolean closed;
/**
* New instance
*
* @param handler exception handler
* @param key supplier of UUID key
*/
public AsyncNotifyKeyedQueue(Thread.UncaughtExceptionHandler handler, Supplier<UUID> key) {
this.handler = handler;
this.key = key;
}
public Thread.UncaughtExceptionHandler getHandler() {
return handler;
}
public <T> Future<T> run(Runnable task) {
return call(() -> {
task.run();
return null;
});
}
public <T> Future<T> call(Callable<T> task) {
Future[] self = new Future[1];
Callable<T> wrapped = () -> {
if (!closed) {
try {
return task.call();
} catch (Throwable e) {
handler.uncaughtException(Thread.currentThread(), e);
}
}
if (self[0] != null) {
self[0].cancel(true);
}
return null;
};
self[0] = QUEUE_SUBMISSIONS.submit(key.get(), wrapped);
return self[0];
}
@Override
public void close() {
closed = true;
}
public boolean isClosed() {
return closed;
}
}

View File

@ -1,13 +1,9 @@
package com.fastasyncworldedit.core.util.task;
import com.fastasyncworldedit.core.Fawe;
import com.fastasyncworldedit.core.configuration.Settings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -15,13 +11,6 @@ import java.util.function.Supplier;
public class AsyncNotifyQueue implements Closeable {
private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool(
Settings.settings().QUEUE.PARALLEL_THREADS,
new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"),
null,
false
);
private final Lock lock = new ReentrantLock(true);
private final Thread.UncaughtExceptionHandler handler;
private boolean closed;
@ -56,9 +45,6 @@ public class AsyncNotifyQueue implements Closeable {
return task.call();
} catch (Throwable e) {
handler.uncaughtException(Thread.currentThread(), e);
if (self[0] != null) {
self[0].cancel(true);
}
}
}
} finally {
@ -70,7 +56,7 @@ public class AsyncNotifyQueue implements Closeable {
}
return null;
};
self[0] = QUEUE_SUBMISSIONS.submit(wrapped);
self[0] = Fawe.instance().getQueueHandler().async(wrapped);
return self[0];
}

View File

@ -19,10 +19,9 @@
package com.sk89q.worldedit.extension.platform;
import com.fastasyncworldedit.core.configuration.Caption;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.WorldEditException;
import com.sk89q.worldedit.internal.cui.CUIEvent;
import com.sk89q.worldedit.util.formatting.text.TextComponent;
@ -68,7 +67,7 @@ public abstract class AbstractNonPlayerActor implements Actor {
// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue((thread, throwable) -> {
private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue((thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
@ -82,7 +81,7 @@ public abstract class AbstractNonPlayerActor implements Actor {
throwable.printStackTrace();
}
}
});
}, this::getUniqueId);
/**
* Run a task either async, or on the current thread.

View File

@ -25,7 +25,7 @@ import com.fastasyncworldedit.core.math.MutableBlockVector3;
import com.fastasyncworldedit.core.regions.FaweMaskManager;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.WEManager;
import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.EditSession;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.WorldEdit;
@ -81,7 +81,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {
// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue(
private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue(
(thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
@ -96,7 +96,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {
throwable.printStackTrace();
}
}
});
}, this::getUniqueId);
public AbstractPlayerActor(Map<String, Object> meta) {
this.meta = meta;