From 164271374b46957775f52aedf7a39bebd3ff3df3 Mon Sep 17 00:00:00 2001 From: Hannes Greule Date: Mon, 4 Mar 2024 07:31:56 +0100 Subject: [PATCH] Decrease lock contention in SingleThreadQueueExtent (#2594) * thread local extent * avoid race conditions due to ChunkHolder pooling * clean up JFR events, javadoc * remove ThreadLocalPassthroughExtent --- .../implementation/ParallelQueueExtent.java | 33 +++++++++++++- .../queue/implementation/QueueHandler.java | 2 +- .../SingleThreadQueueExtent.java | 16 +++---- .../implementation/chunk/ChunkHolder.java | 43 +++++++++++-------- .../com/sk89q/worldedit/LocalSession.java | 5 +++ 5 files changed, 67 insertions(+), 32 deletions(-) diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java index 2d2b45aae..52e6cf2c6 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/ParallelQueueExtent.java @@ -18,6 +18,7 @@ import com.fastasyncworldedit.core.queue.Filter; import com.fastasyncworldedit.core.queue.IQueueChunk; import com.fastasyncworldedit.core.queue.IQueueExtent; import com.sk89q.worldedit.MaxChangedBlocksException; +import com.sk89q.worldedit.extent.Extent; import com.sk89q.worldedit.extent.clipboard.Clipboard; import com.sk89q.worldedit.function.mask.BlockMask; import com.sk89q.worldedit.function.mask.ExistingBlockMask; @@ -45,6 +46,7 @@ import java.util.stream.IntStream; public class ParallelQueueExtent extends PassthroughExtent { private static final Logger LOGGER = LogManagerCompat.getLogger(); + private static final ThreadLocal extents = new ThreadLocal<>(); private final World world; private final QueueHandler handler; @@ -73,10 +75,36 @@ public class ParallelQueueExtent extends PassthroughExtent { this.fastmode = fastmode; } + /** + * Removes the extent currently associated with the calling thread. + */ + public static void clearCurrentExtent() { + extents.remove(); + } + + /** + * Sets the extent associated with the calling thread. + */ + public static void setCurrentExtent(Extent extent) { + extents.set(extent); + } + + private void enter(Extent extent) { + setCurrentExtent(extent); + } + + private void exit() { + clearCurrentExtent(); + } + @Override @SuppressWarnings({"unchecked", "rawtypes"}) public IQueueExtent getExtent() { - return (IQueueExtent) super.getExtent(); + Extent extent = extents.get(); + if (extent == null) { + extent = super.getExtent(); + } + return (IQueueExtent) extent; } @Override @@ -114,6 +142,7 @@ public class ParallelQueueExtent extends PassthroughExtent { final SingleThreadQueueExtent queue = (SingleThreadQueueExtent) getNewQueue(); queue.setFastMode(fastmode); queue.setFaweExceptionArray(faweExceptionReasonsUsed); + enter(queue); synchronized (queue) { try { ChunkFilterBlock block = null; @@ -154,6 +183,8 @@ public class ParallelQueueExtent extends PassthroughExtent { exceptionCount++; LOGGER.warn(message); } + } finally { + exit(); } })).toArray(ForkJoinTask[]::new); // Join filters diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java index 956c33fb2..7bdbf4645 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/QueueHandler.java @@ -408,7 +408,7 @@ public abstract class QueueHandler implements Trimable, Runnable { * Sets the current thread's {@link IQueueExtent} instance in the queue pool to null. */ public void unCache() { - queuePool.set(null); + queuePool.remove(); } private IQueueExtent pool() { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java index 132229d1d..6e06ce348 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/SingleThreadQueueExtent.java @@ -9,7 +9,6 @@ import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor; import com.fastasyncworldedit.core.extent.processor.ExtentBatchProcessorHolder; import com.fastasyncworldedit.core.extent.processor.ProcessorScope; import com.fastasyncworldedit.core.internal.exception.FaweException; -import com.fastasyncworldedit.core.queue.IChunk; import com.fastasyncworldedit.core.queue.IChunkCache; import com.fastasyncworldedit.core.queue.IChunkGet; import com.fastasyncworldedit.core.queue.IChunkSet; @@ -48,11 +47,9 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen private static final Logger LOGGER = LogManagerCompat.getLogger(); - // Pool discarded chunks for reuse (can safely be cleared by another thread) - // private static final ConcurrentLinkedQueue CHUNK_POOL = new ConcurrentLinkedQueue<>(); // Chunks currently being queued / worked on - private final Long2ObjectLinkedOpenHashMap chunks = new Long2ObjectLinkedOpenHashMap<>(); - private final ConcurrentLinkedQueue submissions = new ConcurrentLinkedQueue<>(); + private final Long2ObjectLinkedOpenHashMap> chunks = new Long2ObjectLinkedOpenHashMap<>(); + private final ConcurrentLinkedQueue> submissions = new ConcurrentLinkedQueue<>(); private final ReentrantLock getChunkLock = new ReentrantLock(); private World world = null; private int minY = 0; @@ -142,12 +139,10 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen if (!this.initialized) { return; } - if (!this.chunks.isEmpty()) { - getChunkLock.lock(); - for (IChunk chunk : this.chunks.values()) { - chunk.recycle(); - } + getChunkLock.lock(); + try { this.chunks.clear(); + } finally { getChunkLock.unlock(); } this.enabledQueue = true; @@ -234,7 +229,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen } } if (chunk.isEmpty()) { - chunk.recycle(); Future result = Futures.immediateFuture(null); return (V) result; } diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java index 6103a1649..a7417eddf 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/queue/implementation/chunk/ChunkHolder.java @@ -1,7 +1,5 @@ package com.fastasyncworldedit.core.queue.implementation.chunk; -import com.fastasyncworldedit.core.FaweCache; -import com.fastasyncworldedit.core.configuration.Settings; import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock; import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor; import com.fastasyncworldedit.core.extent.processor.heightmap.HeightMapType; @@ -11,36 +9,34 @@ import com.fastasyncworldedit.core.queue.IChunkGet; import com.fastasyncworldedit.core.queue.IChunkSet; import com.fastasyncworldedit.core.queue.IQueueChunk; import com.fastasyncworldedit.core.queue.IQueueExtent; -import com.fastasyncworldedit.core.queue.Pool; +import com.fastasyncworldedit.core.queue.implementation.ParallelQueueExtent; import com.fastasyncworldedit.core.util.MemUtil; import com.sk89q.jnbt.CompoundTag; +import com.sk89q.worldedit.internal.util.LogManagerCompat; import com.sk89q.worldedit.math.BlockVector3; import com.sk89q.worldedit.regions.Region; import com.sk89q.worldedit.world.biome.BiomeType; import com.sk89q.worldedit.world.block.BaseBlock; import com.sk89q.worldedit.world.block.BlockState; import com.sk89q.worldedit.world.block.BlockStateHolder; +import org.apache.logging.log4j.Logger; import javax.annotation.Nullable; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; /** * An abstract {@link IChunk} class that implements basic get/set blocks. */ @SuppressWarnings("rawtypes") public class ChunkHolder> implements IQueueChunk { - - private static final Pool POOL = FaweCache.INSTANCE.registerPool( - ChunkHolder.class, - ChunkHolder::new, - Settings.settings().QUEUE.POOL - ); + private static final Logger LOGGER = LogManagerCompat.getLogger(); public static ChunkHolder newInstance() { - return POOL.poll(); + return new ChunkHolder(); } private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes) @@ -63,16 +59,12 @@ public class ChunkHolder> implements IQueueChunk { this.delegate = delegate; } + private static final AtomicBoolean recycleWarning = new AtomicBoolean(false); @Override - public synchronized void recycle() { - delegate = NULL; - if (chunkSet != null) { - chunkSet.recycle(); - chunkSet = null; + public void recycle() { + if (!recycleWarning.getAndSet(true)) { + LOGGER.warn("ChunkHolder should not be recycled.", new Exception()); } - chunkExisting = null; - extent = null; - POOL.offer(this); } public long initAge() { @@ -1018,7 +1010,6 @@ public class ChunkHolder> implements IQueueChunk { // Do nothing }); } - recycle(); return null; } @@ -1031,6 +1022,7 @@ public class ChunkHolder> implements IQueueChunk { IChunkGet get = getOrCreateGet(); try { get.lockCall(); + trackExtent(); boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor); final int copyKey = get.setCreateCopy(postProcess); final IChunkSet iChunkSet = getExtent().processSet(this, get, set); @@ -1046,11 +1038,24 @@ public class ChunkHolder> implements IQueueChunk { return get.call(set, finalizer); } finally { get.unlockCall(); + untrackExtent(); } } return null; } + // "call" can be called by QueueHandler#blockingExecutor. In such case, we still want the other thread + // to use this SingleThreadQueueExtent. Otherwise, many threads might end up locking on **one** STQE. + // This way, locking is spread across multiple STQEs, allowing for better performance + + private void trackExtent() { + ParallelQueueExtent.setCurrentExtent(extent); + } + + private void untrackExtent() { + ParallelQueueExtent.clearCurrentExtent(); + } + /** * Get the extent this chunk is in. */ diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java b/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java index 67bbff7af..a00b26702 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/LocalSession.java @@ -32,6 +32,7 @@ import com.fastasyncworldedit.core.internal.io.FaweOutputStream; import com.fastasyncworldedit.core.limit.FaweLimit; import com.fastasyncworldedit.core.util.BrushCache; import com.fastasyncworldedit.core.util.MainUtil; +import com.fastasyncworldedit.core.util.MaskTraverser; import com.fastasyncworldedit.core.util.StringMan; import com.fastasyncworldedit.core.util.TaskManager; import com.fastasyncworldedit.core.util.TextureHolder; @@ -53,6 +54,7 @@ import com.sk89q.worldedit.command.tool.Tool; import com.sk89q.worldedit.entity.Player; import com.sk89q.worldedit.extension.platform.Actor; import com.sk89q.worldedit.extension.platform.Locatable; +import com.sk89q.worldedit.extent.NullExtent; import com.sk89q.worldedit.extent.clipboard.BlockArrayClipboard; import com.sk89q.worldedit.extent.clipboard.Clipboard; import com.sk89q.worldedit.extent.inventory.BlockBag; @@ -594,6 +596,9 @@ public class LocalSession implements TextureHolder { long size = MainUtil.getSize(item); historySize -= size; } + // free the mask from any remaining references to e.g. extents + // if used again + new MaskTraverser(mask).reset(NullExtent.INSTANCE); } finally { historyWriteLock.unlock(); }