From d6a8e9738abeff8757cf94013df10cefab517b1f Mon Sep 17 00:00:00 2001 From: Jesse Boyd Date: Fri, 12 Jul 2019 01:41:54 +1000 Subject: [PATCH] delete SetQueue --- .../beta/implementation/QueueHandler.java | 60 +++ .../java/com/boydti/fawe/util/SetQueue.java | 448 ------------------ 2 files changed, 60 insertions(+), 448 deletions(-) delete mode 100644 worldedit-core/src/main/java/com/boydti/fawe/util/SetQueue.java diff --git a/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java index 9a369fef5..c323463f0 100644 --- a/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java +++ b/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java @@ -10,6 +10,7 @@ import com.boydti.fawe.beta.Trimable; import com.boydti.fawe.config.Settings; import com.boydti.fawe.object.collection.IterableThreadLocal; import com.boydti.fawe.util.MemUtil; +import com.boydti.fawe.util.SetQueue; import com.boydti.fawe.util.TaskManager; import com.boydti.fawe.wrappers.WorldWrapper; import com.sk89q.worldedit.math.BlockVector2; @@ -51,11 +52,61 @@ public abstract class QueueHandler implements Trimable, Runnable { TaskManager.IMP.repeat(this, 1); } + /** + * Used to calculate elapsed time in milliseconds and ensure block placement doesn't lag the server + */ + private long last; + private long allocate = 50; + private double targetTPS = 18; + @Override public void run() { if (!Fawe.isMainThread()) { throw new IllegalStateException("Not main thread"); } + if (!syncTasks.isEmpty()) { + long now = System.currentTimeMillis(); + targetTPS = 18 - Math.max(Settings.IMP.QUEUE.EXTRA_TIME_MS * 0.05, 0); + long diff = (50 + this.last) - (this.last = now); + long absDiff = Math.abs(diff); + if (diff == 0) { + allocate = Math.min(50, allocate + 1); + } else if (diff < 0) { + allocate = Math.max(5, allocate + diff); + } else if (!Fawe.get().getTimer().isAbove(targetTPS)) { + allocate = Math.max(5, allocate - 1); + } + long currentAllocate = allocate - absDiff; + + if (!MemUtil.isMemoryFree()) { + // TODO reduce mem usage + } + + long taskAllocate = currentAllocate; + boolean wait = false; + do { + Runnable task = syncTasks.poll(); + if (task == null) { + if (wait) { + synchronized (syncTasks) { + try { + syncTasks.wait(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + task = syncTasks.poll(); + wait = false; + } else { + break; + } + } + if (task != null) { + task.run(); + wait = true; + } + } while (System.currentTimeMillis() - now < taskAllocate); + } while (!syncTasks.isEmpty()) { final FutureTask task = syncTasks.poll(); if (task != null) task.run(); @@ -77,21 +128,30 @@ public abstract class QueueHandler implements Trimable, Runnable { public Future sync(final Runnable run, final T value) { final FutureTask result = new FutureTask<>(run, value); syncTasks.add(result); + notifySync(); return result; } public Future sync(final Runnable run) { final FutureTask result = new FutureTask<>(run, null); syncTasks.add(result); + notifySync(); return result; } public Future sync(final Callable call) { final FutureTask result = new FutureTask<>(call); syncTasks.add(result); + notifySync(); return result; } + private void notifySync() { + synchronized (syncTasks) { + syncTasks.notifyAll(); + } + } + public > T submit(final IChunk chunk) { if (MemUtil.isMemoryFree()) { // return (T) forkJoinPoolSecondary.submit(chunk); diff --git a/worldedit-core/src/main/java/com/boydti/fawe/util/SetQueue.java b/worldedit-core/src/main/java/com/boydti/fawe/util/SetQueue.java deleted file mode 100644 index df7e10df5..000000000 --- a/worldedit-core/src/main/java/com/boydti/fawe/util/SetQueue.java +++ /dev/null @@ -1,448 +0,0 @@ -package com.boydti.fawe.util; - -import com.boydti.fawe.Fawe; -import com.boydti.fawe.config.Settings; -import com.boydti.fawe.object.FaweQueue; -import com.boydti.fawe.wrappers.WorldWrapper; -import com.sk89q.worldedit.world.World; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.ConcurrentModificationException; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; - -public class SetQueue { - - /** - * The implementation specific queue - */ - public static final SetQueue IMP = new SetQueue(); - private double targetTPS = 18; - - public enum QueueStage { - INACTIVE, ACTIVE, NONE; - } - - private final ConcurrentLinkedDeque activeQueues; - private final ConcurrentLinkedDeque inactiveQueues; - private final ConcurrentLinkedDeque tasks; - - /** - * Used to calculate elapsed time in milliseconds and ensure block placement doesn't lag the server - */ - private long last; - private long allocate = 50; - private long lastSuccess; - - /** - * A queue of tasks that will run when the queue is empty - */ - private final ConcurrentLinkedDeque emptyTasks = new ConcurrentLinkedDeque<>(); - - private ForkJoinPool pool = new ForkJoinPool(); - private ExecutorCompletionService completer = new ExecutorCompletionService(pool); - - /** - * @return ForkJoinPool - * @see TaskManager#getPublicForkJoinPool() - */ - @Deprecated - public ExecutorCompletionService getCompleterService() { - return completer; - } - - @Deprecated - public ForkJoinPool getForkJoinPool() { - return pool; - } - - public void runMiscTasks() { - while (Fawe.get().getTimer().isAbove(targetTPS)) { - Runnable task = tasks.poll(); - if (task != null) { - task.run(); - } else { - break; - } - } - } - - public SetQueue() { - tasks = new ConcurrentLinkedDeque<>(); - activeQueues = new ConcurrentLinkedDeque<>(); - inactiveQueues = new ConcurrentLinkedDeque<>(); - if (TaskManager.IMP == null) return; - TaskManager.IMP.repeat(() -> { - try { - long now = System.currentTimeMillis(); - boolean empty = (inactiveQueues.isEmpty() && activeQueues.isEmpty()); - boolean emptyTasks = tasks.isEmpty(); - if (emptyTasks && empty) { - last = now; - runEmptyTasks(); - return; - } - - targetTPS = 18 - Math.max(Settings.IMP.QUEUE.EXTRA_TIME_MS * 0.05, 0); - - long diff = (50 + SetQueue.this.last) - (SetQueue.this.last = now); - long absDiff = Math.abs(diff); - if (diff == 0) { - allocate = Math.min(50, allocate + 1); - } else if (diff < 0) { - allocate = Math.max(5, allocate + diff); - } else if (!Fawe.get().getTimer().isAbove(targetTPS)) { - allocate = Math.max(5, allocate - 1); - } - - long currentAllocate = allocate - absDiff; - - if (!emptyTasks) { - long taskAllocate = activeQueues.isEmpty() ? currentAllocate : 1 + (currentAllocate >> 1); - long used = 0; - boolean wait = false; - do { - Runnable task = tasks.poll(); - if (task == null) { - if (wait) { - synchronized (tasks) { - tasks.wait(1); - } - task = tasks.poll(); - wait = false; - } else { - break; - } - } - if (task != null) { - task.run(); - wait = true; - } - } while ((used = System.currentTimeMillis() - now) < taskAllocate); - currentAllocate -= used; - } - - if (empty) { - runEmptyTasks(); - return; - } - - if (!MemUtil.isMemoryFree()) { - final int mem = MemUtil.calculateMemory(); - if (mem != Integer.MAX_VALUE) { - allocate = Math.max(5, allocate - 1); - if ((mem <= 1) && Settings.IMP.PREVENT_CRASHES) { - for (FaweQueue queue : getAllQueues()) { - queue.saveMemory(); - } - return; - } - if (SetQueue.this.forceChunkSet()) { - System.gc(); - } else { - SetQueue.this.runEmptyTasks(); - } - return; - } - } - - FaweQueue queue = getNextQueue(); - if (queue == null) { - return; - } - - long time = (long) Settings.IMP.QUEUE.EXTRA_TIME_MS + currentAllocate - System.currentTimeMillis() + now; - // Disable the async catcher as it can't discern async vs parallel - boolean parallel = Settings.IMP.QUEUE.PARALLEL_THREADS > 1; - queue.startSet(parallel); - try { - if (!queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, time) && queue.getStage() == QueueStage.ACTIVE) { - queue.setStage(QueueStage.NONE); - queue.runTasks(); - } - } catch (Throwable e) { - pool.awaitQuiescence(Settings.IMP.QUEUE.DISCARD_AFTER_MS, TimeUnit.MILLISECONDS); - completer = new ExecutorCompletionService(pool); - e.printStackTrace(); - } - if (pool.getQueuedSubmissionCount() != 0 || pool.getRunningThreadCount() != 0 || pool.getQueuedTaskCount() != 0) { -// if (Fawe.get().isJava8()) - { - pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - } -// else { -// pool.shutdown(); -// pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); -// pool = new ForkJoinPool(); -// completer = new ExecutorCompletionService(pool); -// } - } - queue.endSet(parallel); - } catch (Throwable e) { - e.printStackTrace(); - } - }, 1); - } - - public QueueStage getStage(FaweQueue queue) { - return queue.getStage(); - } - - public boolean isStage(FaweQueue queue, QueueStage stage) { - switch (stage) { - case ACTIVE: - return activeQueues.contains(queue); - case INACTIVE: - return inactiveQueues.contains(queue); - case NONE: - return !activeQueues.contains(queue) && !inactiveQueues.contains(queue); - } - return false; - } - - public boolean enqueue(FaweQueue queue) { - queue.setStage(QueueStage.ACTIVE); - inactiveQueues.remove(queue); - if (queue.size() > 0) { - if (!activeQueues.contains(queue)) { - queue.optimize(); - activeQueues.add(queue); - } - return true; - } - return false; - } - - public void dequeue(FaweQueue queue) { - queue.setStage(QueueStage.NONE); - inactiveQueues.remove(queue); - activeQueues.remove(queue); - queue.runTasks(); - } - - public Collection getAllQueues() { - ArrayList list = new ArrayList<>(activeQueues.size() + inactiveQueues.size()); - list.addAll(inactiveQueues); - list.addAll(activeQueues); - return list; - } - - public Collection getActiveQueues() { - return Collections.unmodifiableCollection(activeQueues); - } - - public Collection getInactiveQueues() { - return Collections.unmodifiableCollection(inactiveQueues); - } - - public FaweQueue getNewQueue(World world, boolean fast, boolean autoqueue) { - world = WorldWrapper.unwrap(world); - if (world instanceof FaweQueue) return (FaweQueue) world; - FaweQueue queue = Fawe.imp().getNewQueue(world, fast); - if (autoqueue) { - queue.setStage(QueueStage.INACTIVE); - inactiveQueues.add(queue); - } - return queue; - } - - public FaweQueue getNewQueue(String world, boolean fast, boolean autoqueue) { - FaweQueue queue = Fawe.imp().getNewQueue(world, fast); - if (autoqueue) { - queue.setStage(QueueStage.INACTIVE); - inactiveQueues.add(queue); - } - return queue; - } - - public void flush(FaweQueue queue) { - int parallelThreads; - if (Fawe.isMainThread()) { - parallelThreads = Settings.IMP.QUEUE.PARALLEL_THREADS; - Settings.IMP.QUEUE.PARALLEL_THREADS = 1; - } else { - parallelThreads = 0; - } - try { - queue.startSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1); - queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, Long.MAX_VALUE); - } catch (Throwable e) { - pool.awaitQuiescence(Settings.IMP.QUEUE.DISCARD_AFTER_MS, TimeUnit.MILLISECONDS); - completer = new ExecutorCompletionService(pool); - e.printStackTrace(); - } finally { - queue.endSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1); - queue.setStage(QueueStage.NONE); - queue.runTasks(); - if (parallelThreads != 0) { - Settings.IMP.QUEUE.PARALLEL_THREADS = parallelThreads; - } - } - } - - public FaweQueue getNextQueue() { - long now = System.currentTimeMillis(); - while (!activeQueues.isEmpty()) { - FaweQueue queue = activeQueues.peek(); - if (queue != null && queue.size() > 0) { - queue.setModified(now); - return queue; - } else { - queue.setStage(QueueStage.NONE); - queue.runTasks(); - activeQueues.poll(); - } - } - int size = inactiveQueues.size(); - if (size > 0) { - Iterator iter = inactiveQueues.iterator(); - try { - int total = 0; - FaweQueue firstNonEmpty = null; - while (iter.hasNext()) { - FaweQueue queue = iter.next(); - long age = now - queue.getModified(); - total += queue.size(); - if (queue.size() == 0) { - if (age > Settings.IMP.QUEUE.DISCARD_AFTER_MS) { - queue.setStage(QueueStage.NONE); - queue.runTasks(); - iter.remove(); - } - continue; - } - if (firstNonEmpty == null) { - firstNonEmpty = queue; - } - if (total > Settings.IMP.QUEUE.TARGET_SIZE) { - firstNonEmpty.setModified(now); - return firstNonEmpty; - } - if (age > Settings.IMP.QUEUE.MAX_WAIT_MS) { - queue.setModified(now); - return queue; - } - } - } catch (ConcurrentModificationException e) { - e.printStackTrace(); - } - } - return null; - } - - public boolean next() { - while (activeQueues.size() > 0) { - FaweQueue queue = activeQueues.poll(); - if (queue != null) { - final boolean set = queue.next(); - if (set) { - activeQueues.add(queue); - return set; - } else { - queue.setStage(QueueStage.NONE); - queue.runTasks(); - } - } - } - if (inactiveQueues.size() > 0) { - ArrayList tmp = new ArrayList<>(inactiveQueues); - if (Settings.IMP.QUEUE.MAX_WAIT_MS != -1) { - long now = System.currentTimeMillis(); - if (lastSuccess == 0) { - lastSuccess = now; - } - long diff = now - lastSuccess; - if (diff > Settings.IMP.QUEUE.MAX_WAIT_MS) { - for (FaweQueue queue : tmp) { - boolean result = queue.next(); - if (result) { - return result; - } - } - if (diff > Settings.IMP.QUEUE.DISCARD_AFTER_MS) { - // These edits never finished - for (FaweQueue queue : tmp) { - queue.setStage(QueueStage.NONE); - queue.runTasks(); - } - inactiveQueues.clear(); - } - return false; - } - } - if (Settings.IMP.QUEUE.TARGET_SIZE != -1) { - int total = 0; - for (FaweQueue queue : tmp) { - total += queue.size(); - } - if (total > Settings.IMP.QUEUE.TARGET_SIZE) { - for (FaweQueue queue : tmp) { - boolean result = queue.next(); - if (result) { - return result; - } - } - } - } - } - return false; - } - - public boolean forceChunkSet() { - return next(); - } - - /** - * Is the this empty - * - * @return - */ - public boolean isEmpty() { - return activeQueues.size() == 0 && inactiveQueues.size() == 0; - } - - public void addTask(Runnable whenFree) { - tasks.add(whenFree); - synchronized (tasks) { - tasks.notifyAll(); - } - } - - /** - * Add a task to run when it is empty - * - * @param whenDone - * @return - */ - public boolean addEmptyTask(final Runnable whenDone) { - if (this.isEmpty()) { - // Run - this.runEmptyTasks(); - if (whenDone != null) { - whenDone.run(); - } - return true; - } - if (whenDone != null) { - this.emptyTasks.add(whenDone); - } - return false; - } - - private synchronized boolean runEmptyTasks() { - if (this.emptyTasks.isEmpty()) { - return false; - } - final ConcurrentLinkedDeque tmp = new ConcurrentLinkedDeque<>(this.emptyTasks); - this.emptyTasks.clear(); - for (final Runnable runnable : tmp) { - runnable.run(); - } - return true; - } -}