Plex-FAWE/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java

247 lines
8.1 KiB
Java
Raw Normal View History

2019-04-28 15:44:59 +00:00
package com.boydti.fawe.beta.implementation;
2019-05-02 14:45:03 +00:00
import com.boydti.fawe.Fawe;
2019-05-01 15:45:18 +00:00
import com.boydti.fawe.FaweCache;
2019-04-30 16:19:10 +00:00
import com.boydti.fawe.beta.IChunk;
2019-04-28 15:44:59 +00:00
import com.boydti.fawe.beta.IQueueExtent;
import com.boydti.fawe.beta.Trimable;
2019-04-30 16:19:10 +00:00
import com.boydti.fawe.config.Settings;
2019-04-28 15:44:59 +00:00
import com.boydti.fawe.object.collection.IterableThreadLocal;
2019-05-01 15:45:18 +00:00
import com.boydti.fawe.util.MemUtil;
2019-05-02 14:45:03 +00:00
import com.boydti.fawe.util.TaskManager;
2019-04-28 17:36:23 +00:00
import com.boydti.fawe.wrappers.WorldWrapper;
2019-07-22 06:02:51 +00:00
import com.google.common.util.concurrent.Futures;
2019-04-28 15:44:59 +00:00
import com.sk89q.worldedit.world.World;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
2019-05-02 08:27:33 +00:00
import java.util.concurrent.Callable;
2019-05-01 15:45:18 +00:00
import java.util.concurrent.ConcurrentLinkedQueue;
2019-07-18 16:07:31 +00:00
import java.util.concurrent.ExecutionException;
2019-04-30 16:19:10 +00:00
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
2019-05-02 08:27:33 +00:00
import java.util.concurrent.FutureTask;
2019-05-01 15:45:18 +00:00
import java.util.concurrent.ThreadPoolExecutor;
2019-07-22 06:02:51 +00:00
import java.util.function.Supplier;
2019-04-28 15:44:59 +00:00
2019-04-28 17:36:23 +00:00
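/**
 * Coordinates the {@link IQueueExtent} queues: pools queue instances, caches a
 * {@code WorldChunkCache} per world, and runs submitted tasks either on its worker pools or,
 * for sync tasks, from {@link #run()} on the main thread.
 */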
2019-05-02 14:45:03 +00:00
public abstract class QueueHandler implements Trimable, Runnable {
2019-05-01 15:45:18 +00:00
private ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool();
private ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool();
private ThreadPoolExecutor blockingExecutor = FaweCache.newBlockingExecutor();
2019-05-02 08:27:33 +00:00
private ConcurrentLinkedQueue<FutureTask> syncTasks = new ConcurrentLinkedQueue();
2019-04-28 17:36:23 +00:00
2019-05-01 15:45:18 +00:00
private Map<World, WeakReference<WorldChunkCache>> chunkCache = new HashMap<>();
private IterableThreadLocal<IQueueExtent> queuePool = new IterableThreadLocal<IQueueExtent>() {
2019-04-28 15:44:59 +00:00
@Override
public IQueueExtent init() {
return create();
}
};
2019-05-02 14:45:03 +00:00
public QueueHandler() {
TaskManager.IMP.repeat(this, 1);
}
2019-07-11 15:41:54 +00:00
/**
 * Timestamp ({@code System.currentTimeMillis()}) stored by the most recent {@link #run()} call
 * that found pending sync tasks. Used to measure the elapsed milliseconds between such calls so
 * that block placement doesn't lag the server.
 */
private long last;
// Time budget in milliseconds for running sync tasks; run() keeps it within [5, 50] when adjusting it.
private long allocate = 50;
// TPS threshold passed to the Fawe timer's isAbove check; run() recomputes it as
// 18 minus 5% of Settings.IMP.QUEUE.EXTRA_TIME_MS (never above 18).
private double targetTPS = 18;
2019-05-02 14:45:03 +00:00
@Override
public void run() {
if (!Fawe.isMainThread()) {
throw new IllegalStateException("Not main thread");
}
2019-07-11 15:41:54 +00:00
if (!syncTasks.isEmpty()) {
long now = System.currentTimeMillis();
targetTPS = 18 - Math.max(Settings.IMP.QUEUE.EXTRA_TIME_MS * 0.05, 0);
long diff = (50 + this.last) - (this.last = now);
long absDiff = Math.abs(diff);
if (diff == 0) {
allocate = Math.min(50, allocate + 1);
} else if (diff < 0) {
allocate = Math.max(5, allocate + diff);
} else if (!Fawe.get().getTimer().isAbove(targetTPS)) {
allocate = Math.max(5, allocate - 1);
}
long currentAllocate = allocate - absDiff;
if (!MemUtil.isMemoryFree()) {
// TODO reduce mem usage
}
long taskAllocate = currentAllocate;
boolean wait = false;
do {
Runnable task = syncTasks.poll();
if (task == null) {
if (wait) {
synchronized (syncTasks) {
try {
syncTasks.wait(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
task = syncTasks.poll();
wait = false;
} else {
break;
}
}
if (task != null) {
task.run();
wait = true;
}
} while (System.currentTimeMillis() - now < taskAllocate);
}
2019-05-02 14:45:03 +00:00
while (!syncTasks.isEmpty()) {
final FutureTask task = syncTasks.poll();
if (task != null) task.run();
}
}
2019-07-18 16:07:31 +00:00
/**
 * Blocks until a chain of futures has finished.
 * <p>
 * Waits for {@code task}, then for the future it yields, and so on until a future yields
 * {@code null}. Failures are printed and end the wait; they are not rethrown.
 *
 * @param task the first future of the chain; {@code null} returns immediately
 * @param <T>  a future type whose result is the next future of the same type
 */
public <T extends Future<T>> void complete(Future<T> task) {
    try {
        while (task != null) {
            task = task.get();
        }
    } catch (InterruptedException e) {
        // Restore the flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
2019-05-02 14:45:03 +00:00
public <T> Future<T> async(final Runnable run, final T value) {
2019-05-02 08:27:33 +00:00
return forkJoinPoolSecondary.submit(run, value);
}
2019-07-18 16:07:31 +00:00
/**
 * Submits a runnable to the secondary fork-join pool.
 *
 * @param run the work to execute
 * @return a future for the submitted work
 */
public Future<?> async(final Runnable run) {
    final ForkJoinTask<?> submitted = forkJoinPoolSecondary.submit(run);
    return submitted;
}
2019-05-02 14:45:03 +00:00
public <T> Future<T> async(final Callable<T> call) {
2019-05-02 08:27:33 +00:00
return forkJoinPoolSecondary.submit(call);
}
2019-07-11 15:32:14 +00:00
/**
 * Submits a runnable to the primary fork-join pool.
 *
 * @param call the work to execute
 * @return the fork-join task representing the submitted work
 */
public ForkJoinTask submit(final Runnable call) {
    final ForkJoinTask<?> submitted = forkJoinPoolPrimary.submit(call);
    return submitted;
}
2019-05-02 14:45:03 +00:00
public <T> Future<T> sync(final Runnable run, final T value) {
2019-07-22 06:02:51 +00:00
if (Fawe.isMainThread()) {
run.run();
return Futures.immediateFuture(value);
}
2019-05-02 14:45:03 +00:00
final FutureTask<T> result = new FutureTask<>(run, value);
2019-05-02 08:27:33 +00:00
syncTasks.add(result);
2019-07-11 15:41:54 +00:00
notifySync();
2019-05-02 08:27:33 +00:00
return result;
}
2019-05-02 14:45:03 +00:00
public <T> Future<T> sync(final Runnable run) {
2019-07-22 06:02:51 +00:00
if (Fawe.isMainThread()) {
run.run();
return Futures.immediateCancelledFuture();
}
2019-05-02 14:45:03 +00:00
final FutureTask<T> result = new FutureTask<>(run, null);
2019-05-02 08:27:33 +00:00
syncTasks.add(result);
2019-07-11 15:41:54 +00:00
notifySync();
2019-05-02 08:27:33 +00:00
return result;
}
2019-07-22 06:02:51 +00:00
public <T> Future<T> sync(final Callable<T> call) throws Exception {
if (Fawe.isMainThread()) {
return Futures.immediateFuture(call.call());
}
2019-05-02 14:45:03 +00:00
final FutureTask<T> result = new FutureTask<>(call);
2019-05-02 08:27:33 +00:00
syncTasks.add(result);
2019-07-11 15:41:54 +00:00
notifySync();
2019-05-02 08:27:33 +00:00
return result;
}
2019-07-22 06:02:51 +00:00
/**
 * Evaluates a supplier on the main thread.
 * <p>
 * When already on the main thread the supplier is invoked immediately; otherwise it is queued
 * for {@link #run()}.
 *
 * @param call the computation to execute
 * @param <T>  result type of {@code call}
 * @return a future yielding the supplied value
 */
public <T> Future<T> sync(final Supplier<T> call) {
    if (!Fawe.isMainThread()) {
        final Callable<T> adapter = call::get;
        final FutureTask<T> queued = new FutureTask<>(adapter);
        syncTasks.add(queued);
        notifySync();
        return queued;
    }
    final T value = call.get();
    return Futures.immediateFuture(value);
}
2019-07-11 15:41:54 +00:00
/**
 * Wakes every thread waiting on the sync-task queue's monitor, such as {@link #run()} during
 * its brief wait for a follow-up task.
 */
private void notifySync() {
    synchronized (this.syncTasks) {
        this.syncTasks.notifyAll();
    }
}
2019-05-02 14:45:03 +00:00
public <T extends Future<T>> T submit(final IChunk<T> chunk) {
2019-07-22 06:02:51 +00:00
// if (MemUtil.isMemoryFree()) { TODO NOT IMPLEMENTED - optimize this
2019-05-01 15:45:18 +00:00
// return (T) forkJoinPoolSecondary.submit(chunk);
2019-07-22 06:02:51 +00:00
// }
2019-05-01 15:45:18 +00:00
return (T) blockingExecutor.submit(chunk);
2019-04-30 16:19:10 +00:00
}
2019-04-28 17:36:23 +00:00
/**
* Get or create the WorldChunkCache for a world
* @param world
* @return
*/
public WorldChunkCache getOrCreate(World world) {
world = WorldWrapper.unwrap(world);
2019-04-28 15:44:59 +00:00
synchronized (chunkCache) {
final WeakReference<WorldChunkCache> ref = chunkCache.get(world);
if (ref != null) {
final WorldChunkCache cached = ref.get();
if (cached != null) {
return cached;
}
}
final WorldChunkCache created = new WorldChunkCache(world);
chunkCache.put(world, new WeakReference<>(created));
return created;
}
}
/**
 * Creates a new {@link IQueueExtent}. Called by the queue pool's {@code init()} whenever it
 * needs a fresh instance.
 *
 * @return a new queue
 */
public abstract IQueueExtent create();
2019-07-22 09:05:14 +00:00
/**
 * Counterpart of {@link #endSet(boolean)}.
 * NOTE(review): not called anywhere in this file; presumably marks the start of a batch of
 * set operations - confirm against the implementations.
 *
 * @param parallel presumably whether the operations run in parallel - TODO confirm
 */
public abstract void startSet(boolean parallel);
2019-07-22 06:02:51 +00:00
2019-07-22 09:05:14 +00:00
/**
 * Counterpart of {@link #startSet(boolean)}.
 * NOTE(review): not called anywhere in this file; presumably marks the end of a batch of
 * set operations - confirm against the implementations.
 *
 * @param parallel presumably whether the operations ran in parallel - TODO confirm
 */
public abstract void endSet(boolean parallel);
2019-07-22 06:02:51 +00:00
2019-05-02 14:45:03 +00:00
public IQueueExtent getQueue(final World world) {
final IQueueExtent queue = queuePool.get();
2019-04-30 16:19:10 +00:00
queue.init(getOrCreate(world));
return queue;
}
2019-04-28 17:36:23 +00:00
@Override
2019-04-28 15:44:59 +00:00
public boolean trim(final boolean aggressive) {
boolean result = true;
synchronized (chunkCache) {
final Iterator<Map.Entry<World, WeakReference<WorldChunkCache>>> iter = chunkCache.entrySet().iterator();
while (iter.hasNext()) {
final Map.Entry<World, WeakReference<WorldChunkCache>> entry = iter.next();
final WeakReference<WorldChunkCache> value = entry.getValue();
final WorldChunkCache cache = value.get();
if (cache == null || cache.size() == 0 || cache.trim(aggressive)) {
iter.remove();
continue;
}
result = false;
}
}
return result;
}
}