Plex-FAWE/worldedit-core/src/main/java/com/boydti/fawe/beta/implementation/QueueHandler.java

169 lines
6.3 KiB
Java
Raw Normal View History

2019-04-28 15:44:59 +00:00
package com.boydti.fawe.beta.implementation;
2019-05-01 15:45:18 +00:00
import com.boydti.fawe.FaweCache;
2019-04-28 15:44:59 +00:00
import com.boydti.fawe.beta.Filter;
2019-04-30 16:19:10 +00:00
import com.boydti.fawe.beta.FilterBlock;
import com.boydti.fawe.beta.IChunk;
2019-04-28 15:44:59 +00:00
import com.boydti.fawe.beta.IQueueExtent;
import com.boydti.fawe.beta.Trimable;
2019-04-30 16:19:10 +00:00
import com.boydti.fawe.config.Settings;
2019-04-28 15:44:59 +00:00
import com.boydti.fawe.object.collection.IterableThreadLocal;
2019-05-01 15:45:18 +00:00
import com.boydti.fawe.util.MemUtil;
2019-04-28 17:36:23 +00:00
import com.boydti.fawe.wrappers.WorldWrapper;
2019-05-01 15:45:18 +00:00
import com.google.common.util.concurrent.Futures;
2019-04-30 16:19:10 +00:00
import com.sk89q.worldedit.math.BlockVector2;
2019-04-28 15:44:59 +00:00
import com.sk89q.worldedit.regions.Region;
import com.sk89q.worldedit.world.World;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
2019-04-30 16:19:10 +00:00
import java.util.Set;
2019-05-01 15:45:18 +00:00
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
2019-04-30 16:19:10 +00:00
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
2019-05-01 15:45:18 +00:00
import java.util.concurrent.ThreadPoolExecutor;
2019-04-28 15:44:59 +00:00
2019-04-28 17:36:23 +00:00
/**
* Class which handles all the queues {@link IQueueExtent}
*/
2019-04-28 15:44:59 +00:00
public abstract class QueueHandler implements Trimable {
2019-05-01 15:45:18 +00:00
private ForkJoinPool forkJoinPoolPrimary = new ForkJoinPool();
private ForkJoinPool forkJoinPoolSecondary = new ForkJoinPool();
private ThreadPoolExecutor blockingExecutor = FaweCache.newBlockingExecutor();
private ConcurrentLinkedQueue<Runnable> syncTasks = new ConcurrentLinkedQueue();
2019-04-28 17:36:23 +00:00
2019-05-01 15:45:18 +00:00
private Map<World, WeakReference<WorldChunkCache>> chunkCache = new HashMap<>();
private IterableThreadLocal<IQueueExtent> queuePool = new IterableThreadLocal<IQueueExtent>() {
2019-04-28 15:44:59 +00:00
@Override
public IQueueExtent init() {
return create();
}
};
2019-05-01 15:45:18 +00:00
public <T extends Future<T>> T submit(IChunk<T> chunk) {
if (MemUtil.isMemoryFree()) {
// return (T) forkJoinPoolSecondary.submit(chunk);
2019-04-30 16:19:10 +00:00
}
2019-05-01 15:45:18 +00:00
return (T) blockingExecutor.submit(chunk);
2019-04-30 16:19:10 +00:00
}
2019-04-28 17:36:23 +00:00
/**
* Get or create the WorldChunkCache for a world
* @param world
* @return
*/
public WorldChunkCache getOrCreate(World world) {
world = WorldWrapper.unwrap(world);
2019-04-28 15:44:59 +00:00
synchronized (chunkCache) {
final WeakReference<WorldChunkCache> ref = chunkCache.get(world);
if (ref != null) {
final WorldChunkCache cached = ref.get();
if (cached != null) {
return cached;
}
}
final WorldChunkCache created = new WorldChunkCache(world);
chunkCache.put(world, new WeakReference<>(created));
return created;
}
}
public abstract IQueueExtent create();
2019-04-30 16:19:10 +00:00
public IQueueExtent getQueue(World world) {
2019-05-01 15:45:18 +00:00
IQueueExtent queue = queuePool.get();
2019-04-30 16:19:10 +00:00
queue.init(getOrCreate(world));
return queue;
}
2019-04-28 17:36:23 +00:00
@Override
2019-04-28 15:44:59 +00:00
public boolean trim(final boolean aggressive) {
boolean result = true;
synchronized (chunkCache) {
final Iterator<Map.Entry<World, WeakReference<WorldChunkCache>>> iter = chunkCache.entrySet().iterator();
while (iter.hasNext()) {
final Map.Entry<World, WeakReference<WorldChunkCache>> entry = iter.next();
final WeakReference<WorldChunkCache> value = entry.getValue();
final WorldChunkCache cache = value.get();
if (cache == null || cache.size() == 0 || cache.trim(aggressive)) {
iter.remove();
continue;
}
result = false;
}
}
return result;
}
2019-04-30 16:19:10 +00:00
public void apply(final World world, final Region region, final Filter filter) {
// The chunks positions to iterate over
final Set<BlockVector2> chunks = region.getChunks();
final Iterator<BlockVector2> chunksIter = chunks.iterator();
// Get a pool, to operate on the chunks in parallel
final int size = Math.min(chunks.size(), Settings.IMP.QUEUE.PARALLEL_THREADS);
2019-05-01 15:45:18 +00:00
ForkJoinTask[] tasks = new ForkJoinTask[size];
2019-04-30 16:19:10 +00:00
for (int i = 0; i < size; i++) {
2019-05-01 15:45:18 +00:00
tasks[i] = forkJoinPoolPrimary.submit(new Runnable() {
2019-04-30 16:19:10 +00:00
@Override
public void run() {
Filter newFilter = filter.fork();
// Create a chunk that we will reuse/reset for each operation
IQueueExtent queue = getQueue(world);
2019-05-01 15:45:18 +00:00
synchronized (queue) {
FilterBlock block = null;
while (true) {
// Get the next chunk pos
final BlockVector2 pos;
synchronized (chunksIter) {
if (!chunksIter.hasNext()) break;
pos = chunksIter.next();
2019-04-30 16:19:10 +00:00
}
2019-05-01 15:45:18 +00:00
final int X = pos.getX();
final int Z = pos.getZ();
IChunk chunk = queue.getCachedChunk(X, Z);
// Initialize
chunk.init(queue, X, Z);
try {
if (!newFilter.appliesChunk(X, Z)) {
continue;
}
chunk = newFilter.applyChunk(chunk);
2019-04-30 16:19:10 +00:00
2019-05-01 15:45:18 +00:00
if (chunk == null) continue;
2019-04-30 16:19:10 +00:00
2019-05-01 15:45:18 +00:00
if (block == null) block = queue.initFilterBlock();
chunk.filter(newFilter, block);
2019-04-30 16:19:10 +00:00
2019-05-01 15:45:18 +00:00
newFilter.finishChunk(chunk);
2019-04-30 16:19:10 +00:00
2019-05-01 15:45:18 +00:00
queue.submit(chunk);
} finally {
if (filter != newFilter) {
synchronized (filter) {
newFilter.join(filter);
}
2019-04-30 16:19:10 +00:00
}
}
}
2019-05-01 15:45:18 +00:00
queue.flush();
2019-04-30 16:19:10 +00:00
}
}
});
}
2019-05-01 15:45:18 +00:00
// Join filters
for (int i = 0; i < tasks.length; i++) {
ForkJoinTask task = tasks[i];
if (task != null) {
task.quietlyJoin();
}
2019-04-30 16:19:10 +00:00
}
2019-04-28 15:44:59 +00:00
}
}