fix: do not clash ChunkHolder recycling with processors that extend outside the chunk (#2353)

This commit is contained in:
Jordan 2023-07-20 16:57:29 +01:00 committed by GitHub
parent fe1859e9d2
commit 0a8a479214
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 143 deletions

View File

@ -1,119 +0,0 @@
package com.fastasyncworldedit.core.concurrent;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;
/**
* Allows for reentrant behaviour of a wrapped {@link StampedLock}. Will not count the number of times it is re-entered.
*
* @since 2.3.0
*/
public class ReentrantWrappedStampedLock implements Lock {
private final StampedLock parent = new StampedLock();
private volatile Thread owner;
private volatile long stamp = 0;
@Override
public void lock() {
if (Thread.currentThread() == owner) {
return;
}
stamp = parent.writeLock();
owner = Thread.currentThread();
}
@Override
public void lockInterruptibly() throws InterruptedException {
if (Thread.currentThread() == owner) {
return;
}
stamp = parent.writeLockInterruptibly();
owner = Thread.currentThread();
}
@Override
public boolean tryLock() {
if (Thread.currentThread() == owner) {
return true;
}
if (parent.isWriteLocked()) {
return false;
}
stamp = parent.writeLock();
owner = Thread.currentThread();
return true;
}
@Override
public boolean tryLock(final long time, @NotNull final TimeUnit unit) throws InterruptedException {
if (Thread.currentThread() == owner) {
return true;
}
if (!parent.isWriteLocked()) {
stamp = parent.writeLock();
owner = Thread.currentThread();
return true;
}
stamp = parent.tryWriteLock(time, unit);
owner = Thread.currentThread();
return false;
}
@Override
public void unlock() {
if (owner != Thread.currentThread()) {
throw new IllegalCallerException("The lock should only be unlocked by the owning thread when a stamp is not supplied");
}
unlock(stamp);
}
@NotNull
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("Conditions are not supported by StampedLock");
}
/**
* Retrieves the stamp associated with the current lock. 0 if the wrapped {@link StampedLock} is not write-locked. This method is
* thread-checking.
*
* @return lock stam[ or 0 if not locked.
* @throws IllegalCallerException if the {@link StampedLock} is write-locked and the calling thread is not the lock owner
* @since 2.3.0
*/
public long getStampChecked() {
if (stamp != 0 && owner != Thread.currentThread()) {
throw new IllegalCallerException("The stamp should be be acquired by a thread that does not own the lock");
}
return stamp;
}
/**
* Unlock the wrapped {@link StampedLock} using the given stamp. This can be called by any thread.
*
* @param stamp Stamp to unlock with
* @throws IllegalMonitorStateException if the given stamp does not match the lock's stamp
* @since 2.3.0
*/
public void unlock(final long stamp) {
parent.unlockWrite(stamp);
this.stamp = 0;
owner = null;
}
/**
* Returns true if the lock is currently held.
*
* @return true if the lock is currently held.
* @since 2.3.0
*/
public boolean isLocked() {
return owner == null && this.stamp == 0 && parent.isWriteLocked(); // Be verbose
}
}

View File

@ -2,9 +2,7 @@ package com.fastasyncworldedit.core.queue;
import com.sk89q.jnbt.CompoundTag; import com.sk89q.jnbt.CompoundTag;
import com.sk89q.jnbt.DoubleTag; import com.sk89q.jnbt.DoubleTag;
import com.sk89q.jnbt.IntArrayTag;
import com.sk89q.jnbt.ListTag; import com.sk89q.jnbt.ListTag;
import com.sk89q.jnbt.LongTag;
import com.sk89q.jnbt.NBTUtils; import com.sk89q.jnbt.NBTUtils;
import com.sk89q.jnbt.StringTag; import com.sk89q.jnbt.StringTag;
import com.sk89q.jnbt.Tag; import com.sk89q.jnbt.Tag;

View File

@ -83,17 +83,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
this.maxY = maxY; this.maxY = maxY;
} }
/**
* Safety check to ensure that the thread being used matches the one being initialized on. - Can
* be removed later
*/
private void checkThread() {
if (Thread.currentThread() != currentThread && currentThread != null) {
throw new UnsupportedOperationException(
"This class must be used from a single thread. Use multiple queues for concurrent operations");
}
}
@Override @Override
public void enableQueue() { public void enableQueue() {
enabledQueue = true; enabledQueue = true;
@ -154,10 +143,10 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
return; return;
} }
if (!this.chunks.isEmpty()) { if (!this.chunks.isEmpty()) {
getChunkLock.lock();
for (IChunk chunk : this.chunks.values()) { for (IChunk chunk : this.chunks.values()) {
chunk.recycle(); chunk.recycle();
} }
getChunkLock.lock();
this.chunks.clear(); this.chunks.clear();
getChunkLock.unlock(); getChunkLock.unlock();
} }
@ -233,9 +222,21 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
*/ */
private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) { private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
if (chunk.isEmpty()) { if (chunk.isEmpty()) {
chunk.recycle(); if (chunk instanceof ChunkHolder<?> holder) {
Future result = Futures.immediateFuture(null); long age = holder.initAge();
return (V) result; // Ensure we've given time for the chunk to be used - it was likely used for a reason!
if (age < 5) {
try {
Thread.sleep(5 - age);
} catch (InterruptedException ignored) {
}
}
}
if (chunk.isEmpty()) {
chunk.recycle();
Future result = Futures.immediateFuture(null);
return (V) result;
}
} }
if (Fawe.isMainThread()) { if (Fawe.isMainThread()) {
@ -451,6 +452,7 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
@Override @Override
public synchronized void flush() { public synchronized void flush() {
if (!chunks.isEmpty()) { if (!chunks.isEmpty()) {
getChunkLock.lock();
if (MemUtil.isMemoryLimited()) { if (MemUtil.isMemoryLimited()) {
for (IQueueChunk chunk : chunks.values()) { for (IQueueChunk chunk : chunks.values()) {
final Future future = submitUnchecked(chunk); final Future future = submitUnchecked(chunk);
@ -467,7 +469,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
} }
} }
} }
getChunkLock.lock();
chunks.clear(); chunks.clear();
getChunkLock.unlock(); getChunkLock.unlock();
} }

View File

@ -1,7 +1,6 @@
package com.fastasyncworldedit.core.queue.implementation.chunk; package com.fastasyncworldedit.core.queue.implementation.chunk;
import com.fastasyncworldedit.core.FaweCache; import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.concurrent.ReentrantWrappedStampedLock;
import com.fastasyncworldedit.core.configuration.Settings; import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock; import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor; import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor;
@ -26,6 +25,8 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* An abstract {@link IChunk} class that implements basic get/set blocks. * An abstract {@link IChunk} class that implements basic get/set blocks.
@ -43,7 +44,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
return POOL.poll(); return POOL.poll();
} }
private final ReentrantWrappedStampedLock calledLock = new ReentrantWrappedStampedLock(); private final Lock calledLock = new ReentrantLock();
private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes) private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
private volatile IChunkSet chunkSet; // The blocks to be set to the chunkExisting private volatile IChunkSet chunkSet; // The blocks to be set to the chunkExisting
@ -55,6 +56,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
private int bitMask = -1; // Allow forceful setting of bitmask (for lighting) private int bitMask = -1; // Allow forceful setting of bitmask (for lighting)
private boolean isInit = false; // Lighting handles queue differently. It relies on the chunk cache and not doing init. private boolean isInit = false; // Lighting handles queue differently. It relies on the chunk cache and not doing init.
private boolean createCopy = false; private boolean createCopy = false;
private long initTime = -1L;
private ChunkHolder() { private ChunkHolder() {
this.delegate = NULL; this.delegate = NULL;
@ -66,6 +68,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override @Override
public synchronized void recycle() { public synchronized void recycle() {
calledLock.lock();
delegate = NULL; delegate = NULL;
if (chunkSet != null) { if (chunkSet != null) {
chunkSet.recycle(); chunkSet.recycle();
@ -74,6 +77,11 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
chunkExisting = null; chunkExisting = null;
extent = null; extent = null;
POOL.offer(this); POOL.offer(this);
calledLock.unlock();
}
public long initAge() {
return System.currentTimeMillis() - initTime;
} }
public synchronized IBlockDelegate getDelegate() { public synchronized IBlockDelegate getDelegate() {
@ -84,10 +92,10 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
* If the chunk is currently being "called", this method will block until completed. * If the chunk is currently being "called", this method will block until completed.
*/ */
private void checkAndWaitOnCalledLock() { private void checkAndWaitOnCalledLock() {
if (calledLock.isLocked()) { if (!calledLock.tryLock()) {
calledLock.lock(); calledLock.lock();
calledLock.unlock();
} }
calledLock.unlock();
} }
@Override @Override
@ -1024,6 +1032,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override @Override
public synchronized <V extends IChunk> void init(IQueueExtent<V> extent, int chunkX, int chunkZ) { public synchronized <V extends IChunk> void init(IQueueExtent<V> extent, int chunkX, int chunkZ) {
this.initTime = System.currentTimeMillis();
this.extent = extent; this.extent = extent;
this.chunkX = chunkX; this.chunkX = chunkX;
this.chunkZ = chunkZ; this.chunkZ = chunkZ;
@ -1040,14 +1049,15 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override @Override
public synchronized T call() { public synchronized T call() {
calledLock.lock(); calledLock.lock();
final long stamp = calledLock.getStampChecked();
if (chunkSet != null && !chunkSet.isEmpty()) { if (chunkSet != null && !chunkSet.isEmpty()) {
this.delegate = GET; this.delegate = GET;
chunkSet.setBitMask(bitMask); chunkSet.setBitMask(bitMask);
try { try {
IChunkSet copy = chunkSet.createCopy(); IChunkSet copy = chunkSet.createCopy();
chunkSet = null; chunkSet = null;
return this.call(copy, () -> calledLock.unlock(stamp)); return this.call(copy, () -> {
// Do nothing
});
} catch (Throwable t) { } catch (Throwable t) {
calledLock.unlock(); calledLock.unlock();
throw t; throw t;
@ -1072,6 +1082,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
} else { } else {
finalizer = finalize; finalizer = finalize;
} }
calledLock.unlock();
return get.call(set, finalizer); return get.call(set, finalizer);
} }
return null; return null;