feat: implement a player-specific queue for clipboard IO tasks (#2267)

* feat: implement a player-specific queue for clipboard IO tasks
 - Addresses #2222 (hopefully fixes)

* Address comments

* Add since

---------

Co-authored-by: Alexander Brandes <mc.cache@web.de>
This commit is contained in:
Jordan 2023-06-06 23:35:37 +01:00 committed by GitHub
parent 84872cf9a2
commit 1356cd5caa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 4 deletions

View File

@ -12,7 +12,9 @@ import com.fastasyncworldedit.core.util.RandomTextureUtil;
import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.TextureUtil; import com.fastasyncworldedit.core.util.TextureUtil;
import com.fastasyncworldedit.core.util.WEManager; import com.fastasyncworldedit.core.util.WEManager;
import com.fastasyncworldedit.core.util.task.KeyQueuedExecutorService;
import com.github.luben.zstd.Zstd; import com.github.luben.zstd.Zstd;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.sk89q.worldedit.WorldEdit; import com.sk89q.worldedit.WorldEdit;
import com.sk89q.worldedit.internal.util.LogManagerCompat; import com.sk89q.worldedit.internal.util.LogManagerCompat;
import net.jpountz.lz4.LZ4Factory; import net.jpountz.lz4.LZ4Factory;
@ -33,6 +35,9 @@ import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage; import java.lang.management.MemoryUsage;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -86,6 +91,7 @@ public class Fawe {
* The platform specific implementation. * The platform specific implementation.
*/ */
private final IFawe implementation; private final IFawe implementation;
private final KeyQueuedExecutorService<UUID> clipboardExecutor;
private FaweVersion version; private FaweVersion version;
private TextureUtil textures; private TextureUtil textures;
private QueueHandler queueHandler; private QueueHandler queueHandler;
@ -131,6 +137,15 @@ public class Fawe {
}, 0); }, 0);
TaskManager.taskManager().repeat(timer, 1); TaskManager.taskManager().repeat(timer, 1);
clipboardExecutor = new KeyQueuedExecutorService<>(new ThreadPoolExecutor(
1,
Settings.settings().QUEUE.PARALLEL_THREADS,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat("fawe-clipboard-%d").build()
));
} }
/** /**
@ -428,4 +443,15 @@ public class Fawe {
return this.thread = Thread.currentThread(); return this.thread = Thread.currentThread();
} }
/**
* Gets the executor used for clipboard IO if clipboard on disk is enabled or null
*
* @return Executor used for clipboard IO if clipboard on disk is enabled or null
* @since TODO
*/
@Nullable
public KeyQueuedExecutorService<UUID> getClipboardExecutor() {
return this.clipboardExecutor;
}
} }

View File

@ -156,7 +156,9 @@ public class DiskOptimizedClipboard extends LinearClipboard {
/** /**
* Load an existing file as a DiskOptimizedClipboard. The file MUST exist and MUST be created as a DiskOptimizedClipboard * Load an existing file as a DiskOptimizedClipboard. The file MUST exist and MUST be created as a DiskOptimizedClipboard
* with data written to it. * with data written to it.
* @deprecated Will be made private, use {@link DiskOptimizedClipboard#loadFromFile(File)}
*/ */
@Deprecated(forRemoval = true, since = "TODO")
public DiskOptimizedClipboard(File file) { public DiskOptimizedClipboard(File file) {
this(file, VERSION); this(file, VERSION);
} }
@ -167,7 +169,9 @@ public class DiskOptimizedClipboard extends LinearClipboard {
* *
* @param file File to read from * @param file File to read from
* @param versionOverride An override version to allow loading of older clipboards if required * @param versionOverride An override version to allow loading of older clipboards if required
* @deprecated Will be made private, use {@link DiskOptimizedClipboard#loadFromFile(File)}
*/ */
@Deprecated(forRemoval = true, since = "TODO")
public DiskOptimizedClipboard(File file, int versionOverride) { public DiskOptimizedClipboard(File file, int versionOverride) {
super(readSize(file, versionOverride), BlockVector3.ZERO); super(readSize(file, versionOverride), BlockVector3.ZERO);
headerSize = getHeaderSizeOverrideFromVersion(versionOverride); headerSize = getHeaderSizeOverrideFromVersion(versionOverride);

View File

@ -0,0 +1,172 @@
package com.fastasyncworldedit.core.util.task;
import javax.annotation.Nonnull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Executor service that queues tasks based on keys, executing tasks on a configurable {@link ThreadPoolExecutor}
*
* @param <K> Key type
* @since TODO
*/
public class KeyQueuedExecutorService<K> {
private final ExecutorService parent;
private final Map<K, KeyRunner> keyQueue = new HashMap<>();
/**
* Create a new {@link KeyQueuedExecutorService} instance
*
* @param parent Parent {@link ExecutorService} to use for actual task completion
*/
public KeyQueuedExecutorService(ExecutorService parent) {
this.parent = parent;
}
/**
* Delegates to {@link ThreadPoolExecutor#shutdown()}
*/
public void shutdown() {
parent.shutdown();
}
/**
* Delegates to {@link ThreadPoolExecutor#shutdownNow()}
*/
@Nonnull
public List<Runnable> shutdownNow() {
return parent.shutdownNow();
}
/**
* Delegates to {@link ThreadPoolExecutor#isShutdown()}
*/
public boolean isShutdown() {
return parent.isShutdown();
}
/**
* Delegates to {@link ThreadPoolExecutor#isTerminated()}
*/
public boolean isTerminated() {
return parent.isTerminated();
}
/**
* Delegates to {@link ThreadPoolExecutor#awaitTermination(long, TimeUnit)}
*/
public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
return parent.awaitTermination(timeout, unit);
}
protected <T> FutureTask<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<>(runnable, value);
}
protected <T> FutureTask<T> newTaskFor(Callable<T> callable) {
return new FutureTask<>(callable);
}
@Nonnull
public <T> Future<T> submit(@Nonnull K key, @Nonnull Callable<T> task) {
FutureTask<T> ftask = newTaskFor(task);
execute(key, ftask);
return ftask;
}
@Nonnull
public <T> Future<T> submit(@Nonnull K key, @Nonnull Runnable task, T result) {
FutureTask<T> ftask = newTaskFor(task, result);
execute(key, ftask);
return ftask;
}
@Nonnull
public Future<?> submit(@Nonnull K key, @Nonnull Runnable task) {
FutureTask<Void> ftask = newTaskFor(task, null);
execute(key, ftask);
return ftask;
}
public void execute(@Nonnull K key, @Nonnull FutureTask<?> command) {
synchronized (keyQueue) {
boolean triggerRun = false;
KeyRunner runner = keyQueue.get(key);
if (runner == null) {
runner = new KeyRunner(key);
keyQueue.put(key, runner);
triggerRun = true;
}
runner.add(command);
if (triggerRun) {
runner.triggerRun();
}
}
}
private final class KeyRunner {
private final Queue<FutureTask<?>> tasks = new ConcurrentLinkedQueue<>();
private final K key;
private KeyRunner(K key) {
this.key = key;
}
void add(FutureTask<?> task) {
if (!tasks.add(task)) {
throw new RejectedExecutionException(rejection());
}
}
void triggerRun() {
Runnable task = tasks.poll();
if (task == null) {
throw new RejectedExecutionException(rejection());
}
try {
run(task);
} catch (RejectedExecutionException e) {
synchronized (keyQueue) {
keyQueue.remove(key);
}
throw new RejectedExecutionException(rejection(), e);
}
}
private void run(Runnable task) {
parent.execute(() -> {
task.run();
Runnable next = tasks.poll();
if (next == null) {
synchronized (keyQueue) {
next = tasks.poll();
if (next == null) {
keyQueue.remove(key);
}
}
}
if (next != null) {
run(next);
}
});
}
private String rejection() {
return "Task for the key '" + key + "' rejected";
}
}
}

View File

@ -437,14 +437,16 @@ public interface Player extends Entity, Actor {
} else { } else {
continue; continue;
} }
WorldEdit.getInstance().getExecutorService().submit(() -> { Fawe.instance().getClipboardExecutor().submit(getUniqueId(), () -> {
doc.close(); // Ensure closed before deletion doc.close(); // Ensure closed before deletion
doc.getFile().delete(); doc.getFile().delete();
}); });
} }
} }
} else if (Settings.settings().CLIPBOARD.DELETE_ON_LOGOUT || Settings.settings().CLIPBOARD.USE_DISK) { } else if (Settings.settings().CLIPBOARD.USE_DISK) {
WorldEdit.getInstance().getExecutorService().submit(() -> session.setClipboard(null)); Fawe.instance().getClipboardExecutor().submit(getUniqueId(), () -> session.setClipboard(null));
} else if (Settings.settings().CLIPBOARD.DELETE_ON_LOGOUT) {
session.setClipboard(null);
} }
if (Settings.settings().HISTORY.DELETE_ON_LOGOUT) { if (Settings.settings().HISTORY.DELETE_ON_LOGOUT) {
session.clearHistory(); session.clearHistory();
@ -470,7 +472,10 @@ public interface Player extends Entity, Actor {
} }
} catch (EmptyClipboardException ignored) { } catch (EmptyClipboardException ignored) {
} }
DiskOptimizedClipboard doc = DiskOptimizedClipboard.loadFromFile(file); DiskOptimizedClipboard doc = Fawe.instance().getClipboardExecutor().submit(
getUniqueId(),
() -> DiskOptimizedClipboard.loadFromFile(file)
).get();
Clipboard clip = doc.toClipboard(); Clipboard clip = doc.toClipboard();
ClipboardHolder holder = new ClipboardHolder(clip); ClipboardHolder holder = new ClipboardHolder(clip);
session.setClipboard(holder); session.setClipboard(holder);