Some fixes and improvements to internal queuing (#1715)

* the set array given in NMSAdapter should have get data written to it
 - Fixes #1664
 - May fix #1700

* Having target size >= 2* parallel threads allows for adjacent chunks to be loaded with issues

* "empty" chunk section doesn't need to be localised to the chunk and may be static

* Switch to slightly more performant stream method for testing for non-empty sections

* Implement lock into ChunkHolder preventing any modification to occur whilst the edit is being applied to the world
 (when ChunkHolder is called)

* Add config note about target-size

* set ordinal equal to air if both set and get are `__reserved__`

* Add note to checkAndWaitOnCalledLock method of its use

* Don't print exception and throw

* Switch to a wrapped StampedLock allowing reentrant behaviour
 - StampedLock is not reentrent
 - Allow unlock from a different thread only if it provides the correct stamp
 - This stamp can only be retrieved by the thread owning the lock

* Avoid some "doubling-up" of using checkAndWaitOnCalledLock

* Unbloat `checkAndWaitOnCalledLock`

* Add since tags
This commit is contained in:
Jordan 2022-06-05 19:52:28 +01:00 committed by GitHub
parent 8228b798e5
commit 198c6b7800
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 238 additions and 28 deletions

View File

@ -4,11 +4,15 @@ import com.fastasyncworldedit.core.FAWEPlatformAdapterImpl;
import com.fastasyncworldedit.core.queue.IChunkGet;
import com.fastasyncworldedit.core.util.MathMan;
import com.sk89q.worldedit.world.block.BlockTypesCache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
public class NMSAdapter implements FAWEPlatformAdapterImpl {
private static final Logger LOGGER = LogManager.getLogger();
public static int createPalette(
int[] blockToPalette,
int[] paletteToBlock,
@ -79,14 +83,20 @@ public class NMSAdapter implements FAWEPlatformAdapterImpl {
if (getArr == null) {
getArr = get.apply(layer);
}
ordinal = getArr[i];
switch (ordinal) {
// write to set array as this should be a copied array, and will be important when the changes are written
// to the GET chunk cached by FAWE
set[i] = switch (ordinal = getArr[i]) {
case BlockTypesCache.ReservedIDs.__RESERVED__ -> {
ordinal = BlockTypesCache.ReservedIDs.AIR;
nonAir--;
yield (ordinal = BlockTypesCache.ReservedIDs.AIR);
}
case BlockTypesCache.ReservedIDs.AIR, BlockTypesCache.ReservedIDs.CAVE_AIR, BlockTypesCache.ReservedIDs.VOID_AIR -> nonAir--;
}
case BlockTypesCache.ReservedIDs.AIR, BlockTypesCache.ReservedIDs.CAVE_AIR,
BlockTypesCache.ReservedIDs.VOID_AIR -> {
nonAir--;
yield ordinal;
}
default -> ordinal;
};
}
case BlockTypesCache.ReservedIDs.AIR, BlockTypesCache.ReservedIDs.CAVE_AIR, BlockTypesCache.ReservedIDs.VOID_AIR -> nonAir--;
}
@ -109,12 +119,8 @@ public class NMSAdapter implements FAWEPlatformAdapterImpl {
for (int i = 0; i < 4096; i++) {
char ordinal = set[i];
if (ordinal == BlockTypesCache.ReservedIDs.__RESERVED__) {
if (getArr == null) {
getArr = get.apply(layer);
}
if ((ordinal = getArr[i]) == BlockTypesCache.ReservedIDs.__RESERVED__) {
ordinal = BlockTypesCache.ReservedIDs.AIR;
}
LOGGER.error("Empty (__RESERVED__) ordinal given where not expected, default to air.");
ordinal = BlockTypesCache.ReservedIDs.AIR;
}
int palette = blockToPalette[ordinal];
blocksCopy[i] = palette;

View File

@ -335,7 +335,17 @@ public class Fawe {
} catch (Throwable e) {
LOGGER.error("Failed to load config.", e);
}
Settings.settings().QUEUE.TARGET_SIZE = Math.max(Settings.settings().QUEUE.TARGET_SIZE, Settings.settings().QUEUE.PARALLEL_THREADS);
Settings.settings().QUEUE.TARGET_SIZE = Math.max(
Settings.settings().QUEUE.TARGET_SIZE,
Settings.settings().QUEUE.PARALLEL_THREADS
);
if (Settings.settings().QUEUE.TARGET_SIZE < 2 * Settings.settings().QUEUE.PARALLEL_THREADS) {
LOGGER.error(
"queue.target_size is {}, and queue.parallel_threads is {}. It is HIGHLY recommended that queue" + ".target_size be at least twice queue.parallel_threads or higher.",
Settings.settings().QUEUE.TARGET_SIZE,
Settings.settings().QUEUE.PARALLEL_THREADS
);
}
try {
byte[] in = new byte[0];
byte[] compressed = LZ4Factory.fastestJavaInstance().fastCompressor().compress(in);

View File

@ -0,0 +1,119 @@
package com.fastasyncworldedit.core.concurrent;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;
/**
* Allows for reentrant behaviour of a wrapped {@link StampedLock}. Will not count the number of times it is re-entered.
*
* @since TODO
*/
public class ReentrantWrappedStampedLock implements Lock {
private final StampedLock parent = new StampedLock();
private volatile Thread owner;
private volatile long stamp = 0;
@Override
public void lock() {
if (Thread.currentThread() == owner) {
return;
}
stamp = parent.writeLock();
owner = Thread.currentThread();
}
@Override
public void lockInterruptibly() throws InterruptedException {
if (Thread.currentThread() == owner) {
return;
}
stamp = parent.writeLockInterruptibly();
owner = Thread.currentThread();
}
@Override
public boolean tryLock() {
if (Thread.currentThread() == owner) {
return true;
}
if (parent.isWriteLocked()) {
return false;
}
stamp = parent.writeLock();
owner = Thread.currentThread();
return true;
}
@Override
public boolean tryLock(final long time, @NotNull final TimeUnit unit) throws InterruptedException {
if (Thread.currentThread() == owner) {
return true;
}
if (!parent.isWriteLocked()) {
stamp = parent.writeLock();
owner = Thread.currentThread();
return true;
}
stamp = parent.tryWriteLock(time, unit);
owner = Thread.currentThread();
return false;
}
@Override
public void unlock() {
if (owner != Thread.currentThread()) {
throw new IllegalCallerException("The lock should only be unlocked by the owning thread when a stamp is not supplied");
}
unlock(stamp);
}
@NotNull
@Override
public Condition newCondition() {
throw new UnsupportedOperationException("Conditions are not supported by StampedLock");
}
/**
* Retrieves the stamp associated with the current lock. 0 if the wrapped {@link StampedLock} is not write-locked. This method is
* thread-checking.
*
* @return lock stam[ or 0 if not locked.
* @throws IllegalCallerException if the {@link StampedLock} is write-locked and the calling thread is not the lock owner
* @since TODO
*/
public long getStampChecked() {
if (stamp != 0 && owner != Thread.currentThread()) {
throw new IllegalCallerException("The stamp should be be acquired by a thread that does not own the lock");
}
return stamp;
}
/**
* Unlock the wrapped {@link StampedLock} using the given stamp. This can be called by any thread.
*
* @param stamp Stamp to unlock with
* @throws IllegalMonitorStateException if the given stamp does not match the lock's stamp
* @since TODO
*/
public void unlock(final long stamp) {
parent.unlockWrite(stamp);
this.stamp = 0;
owner = null;
}
/**
* Returns true if the lock is currently held.
*
* @return true if the lock is currently held.
* @since TODO
*/
public boolean isLocked() {
return owner == null && this.stamp == 0 && parent.isWriteLocked(); // Be verbose
}
}

View File

@ -519,7 +519,8 @@ public class Settings extends Config {
" - A larger value will use slightly less CPU time",
" - A smaller value will reduce memory usage",
" - A value too small may break some operations (deform?)",
" - Values smaller than the configurated parallel threads are not accepted"
" - Values smaller than the configurated parallel-threads are not accepted",
" - It is recommended this option be at least 2x greater than parallel-threads"
})
public int TARGET_SIZE = 64;

View File

@ -31,7 +31,7 @@ public abstract class CharBlocks implements IBlocks {
return true;
}
};
protected final Section empty = new Section() {
protected static final Section EMPTY = new Section() {
@Override
public char[] get(CharBlocks blocks, int layer) {
// Defaults to aggressive as it should only be avoided where we know we've reset a chunk during an edit
@ -41,6 +41,9 @@ public abstract class CharBlocks implements IBlocks {
@Override
public char[] get(CharBlocks blocks, int layer, boolean aggressive) {
synchronized (blocks.sectionLocks[layer]) {
if (blocks.sections[layer] == FULL) {
return FULL.get(blocks, layer);
}
char[] arr = blocks.blocks[layer];
if (arr == null) {
arr = blocks.blocks[layer] = blocks.update(layer, null, aggressive);
@ -83,7 +86,7 @@ public abstract class CharBlocks implements IBlocks {
sections = new Section[sectionCount];
sectionLocks = new Object[sectionCount];
for (int i = 0; i < sectionCount; i++) {
sections[i] = empty;
sections[i] = EMPTY;
sectionLocks[i] = new Object();
}
}
@ -117,7 +120,7 @@ public abstract class CharBlocks implements IBlocks {
@Override
public synchronized IChunkSet reset() {
for (int i = 0; i < sectionCount; i++) {
sections[i] = empty;
sections[i] = EMPTY;
}
return null;
}
@ -125,7 +128,7 @@ public abstract class CharBlocks implements IBlocks {
public void reset(int layer) {
layer -= minSectionPosition;
synchronized (sectionLocks[layer]) {
sections[layer] = empty;
sections[layer] = EMPTY;
}
}
@ -235,8 +238,10 @@ public abstract class CharBlocks implements IBlocks {
int normalized = layer - blocks.minSectionPosition;
char[] section = get(blocks, normalized);
if (section == null) {
blocks.reset(layer);
section = blocks.empty.get(blocks, normalized, false);
synchronized (blocks.sectionLocks[normalized]) {
blocks.reset(layer);
section = EMPTY.get(blocks, normalized, false);
}
}
return section[index];
}

View File

@ -26,7 +26,7 @@ public abstract class CharGetBlocks extends CharBlocks implements IChunkGet {
@Override
public synchronized boolean trim(boolean aggressive) {
for (int i = 0; i < sectionCount; i++) {
sections[i] = empty;
sections[i] = EMPTY;
blocks[i] = null;
}
return true;
@ -49,7 +49,7 @@ public abstract class CharGetBlocks extends CharBlocks implements IChunkGet {
@Override
public synchronized boolean trim(boolean aggressive, int layer) {
layer -= minSectionPosition;
sections[layer] = empty;
sections[layer] = EMPTY;
blocks[layer] = null;
return true;
}

View File

@ -121,8 +121,8 @@ public class CharSetBlocks extends CharBlocks implements IChunkSet {
public void setBlocks(int layer, char[] data) {
updateSectionIndexRange(layer);
layer -= minSectionPosition;
this.sections[layer] = data == null ? EMPTY : FULL;
this.blocks[layer] = data;
this.sections[layer] = data == null ? empty : FULL;
}
@Override
@ -299,7 +299,8 @@ public class CharSetBlocks extends CharBlocks implements IChunkSet {
if (biomes != null || light != null || skyLight != null) {
return false;
}
return IntStream.range(minSectionPosition, maxSectionPosition + 1).noneMatch(this::hasSection);
//noinspection SimplifyStreamApiCallChains - this is faster than using #noneMatch
return !IntStream.range(minSectionPosition, maxSectionPosition + 1).anyMatch(this::hasSection);
}
@Override
@ -347,7 +348,7 @@ public class CharSetBlocks extends CharBlocks implements IChunkSet {
System.arraycopy(sections, 0, tmpSections, diff, sections.length);
System.arraycopy(sectionLocks, 0, tmpSectionLocks, diff, sections.length);
for (int i = 0; i < diff; i++) {
tmpSections[i] = empty;
tmpSections[i] = EMPTY;
tmpSectionLocks[i] = new Object();
}
blocks = tmpBlocks;
@ -379,7 +380,7 @@ public class CharSetBlocks extends CharBlocks implements IChunkSet {
System.arraycopy(sections, 0, tmpSections, 0, sections.length);
System.arraycopy(sectionLocks, 0, tmpSectionLocks, 0, sections.length);
for (int i = sectionCount - diff; i < sectionCount; i++) {
tmpSections[i] = empty;
tmpSections[i] = EMPTY;
tmpSectionLocks[i] = new Object();
}
blocks = tmpBlocks;

View File

@ -1,6 +1,7 @@
package com.fastasyncworldedit.core.queue.implementation.chunk;
import com.fastasyncworldedit.core.FaweCache;
import com.fastasyncworldedit.core.concurrent.ReentrantWrappedStampedLock;
import com.fastasyncworldedit.core.configuration.Settings;
import com.fastasyncworldedit.core.extent.filter.block.ChunkFilterBlock;
import com.fastasyncworldedit.core.extent.processor.EmptyBatchProcessor;
@ -42,6 +43,8 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
return POOL.poll();
}
private final ReentrantWrappedStampedLock calledLock = new ReentrantWrappedStampedLock();
private IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
private IChunkSet chunkSet; // The blocks to be set to the chunkExisting
private IBlockDelegate delegate; // delegate handles the abstraction of the chunk layers
@ -64,55 +67,75 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public synchronized void recycle() {
delegate = NULL;
chunkSet = null;
}
public synchronized IBlockDelegate getDelegate() {
return delegate;
}
/**
* If the chunk is currently being "called", this method will block until completed.
*/
private void checkAndWaitOnCalledLock() {
if (calledLock.isLocked()) {
calledLock.lock();
calledLock.unlock();
}
}
@Override
public boolean setTile(int x, int y, int z, CompoundTag tag) {
checkAndWaitOnCalledLock();
return delegate.set(this).setTile(x, y, z, tag);
}
@Override
public CompoundTag getTile(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.set(this).getTile(x, y, z);
}
@Override
public void setEntity(CompoundTag tag) {
checkAndWaitOnCalledLock();
delegate.set(this).setEntity(tag);
}
@Override
public void removeEntity(UUID uuid) {
checkAndWaitOnCalledLock();
delegate.set(this).removeEntity(uuid);
}
@Override
public Set<UUID> getEntityRemoves() {
checkAndWaitOnCalledLock();
return delegate.set(this).getEntityRemoves();
}
@Override
public BiomeType[][] getBiomes() {
checkAndWaitOnCalledLock();
// Uses set as this method is only used to retrieve biomes that have been set to the extent/chunk.
return delegate.set(this).getBiomes();
}
@Override
public char[][] getLight() {
checkAndWaitOnCalledLock();
return delegate.set(this).getLight();
}
@Override
public char[][] getSkyLight() {
checkAndWaitOnCalledLock();
return delegate.set(this).getSkyLight();
}
@Override
public void setBlocks(int layer, char[] data) {
checkAndWaitOnCalledLock();
delegate.set(this).setBlocks(layer, data);
}
@ -124,7 +147,10 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Nullable
@Override
public char[] loadIfPresent(final int layer) {
return getOrCreateGet().loadIfPresent(layer);
if (chunkExisting == null) {
return null;
}
return chunkExisting.loadIfPresent(layer);
}
@Override
@ -134,10 +160,12 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public void setFastMode(boolean fastmode) {
checkAndWaitOnCalledLock();
this.fastmode = fastmode;
}
public void setBitMask(int bitMask) {
checkAndWaitOnCalledLock();
this.bitMask = bitMask;
}
@ -147,6 +175,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public boolean hasBiomes(final int layer) {
checkAndWaitOnCalledLock();
// No need to go through delegate. hasBiomes is SET only.
return chunkSet != null && chunkSet.hasBiomes(layer);
}
@ -157,11 +186,13 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public CompoundTag getEntity(UUID uuid) {
checkAndWaitOnCalledLock();
return delegate.get(this).getEntity(uuid);
}
@Override
public void setCreateCopy(boolean createCopy) {
checkAndWaitOnCalledLock();
this.createCopy = createCopy;
}
@ -172,16 +203,19 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public void setLightingToGet(char[][] lighting, int minSectionPosition, int maxSectionPosition) {
checkAndWaitOnCalledLock();
delegate.setLightingToGet(this, lighting);
}
@Override
public void setSkyLightingToGet(char[][] lighting, int minSectionPosition, int maxSectionPosition) {
checkAndWaitOnCalledLock();
delegate.setSkyLightingToGet(this, lighting);
}
@Override
public void setHeightmapToGet(HeightMapType type, int[] data) {
checkAndWaitOnCalledLock();
delegate.setHeightmapToGet(this, type, data);
}
@ -206,6 +240,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
}
public void flushLightToGet() {
checkAndWaitOnCalledLock();
delegate.flushLightToGet(this);
}
@ -872,16 +907,19 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public Map<BlockVector3, CompoundTag> getTiles() {
checkAndWaitOnCalledLock();
return delegate.get(this).getTiles();
}
@Override
public Set<CompoundTag> getEntities() {
checkAndWaitOnCalledLock();
return delegate.get(this).getEntities();
}
@Override
public boolean hasSection(int layer) {
checkAndWaitOnCalledLock();
return chunkExisting != null && chunkExisting.hasSection(layer);
}
@ -933,6 +971,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public boolean isEmpty() {
checkAndWaitOnCalledLock();
return chunkSet == null || chunkSet.isEmpty();
}
@ -940,6 +979,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
* Get or create the existing part of this chunk.
*/
public final IChunkGet getOrCreateGet() {
checkAndWaitOnCalledLock();
if (chunkExisting == null) {
chunkExisting = newWrappedGet();
chunkExisting.trim(false);
@ -951,6 +991,7 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
* Get or create the settable part of this chunk.
*/
public final IChunkSet getOrCreateSet() {
checkAndWaitOnCalledLock();
if (chunkSet == null) {
chunkSet = newWrappedSet();
}
@ -992,9 +1033,19 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public synchronized T call() {
if (chunkSet != null) {
calledLock.lock();
final long stamp = calledLock.getStampChecked();
if (chunkSet != null && !chunkSet.isEmpty()) {
chunkSet.setBitMask(bitMask);
return this.call(chunkSet, this::recycle);
try {
return this.call(chunkSet, () -> {
recycle();
calledLock.unlock(stamp);
});
} catch (Throwable t) {
calledLock.unlock();
throw t;
}
}
return null;
}
@ -1036,86 +1087,103 @@ public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {
@Override
public boolean setBiome(int x, int y, int z, BiomeType biome) {
checkAndWaitOnCalledLock();
return delegate.setBiome(this, x, y, z, biome);
}
@Override
public <B extends BlockStateHolder<B>> boolean setBlock(int x, int y, int z, B block) {
checkAndWaitOnCalledLock();
return delegate.setBlock(this, x, y, z, block);
}
@Override
public BiomeType getBiomeType(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getBiome(this, x, y, z);
}
@Override
public BlockState getBlock(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getBlock(this, x, y, z);
}
@Override
public BaseBlock getFullBlock(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getFullBlock(this, x, y, z);
}
@Override
public void setSkyLight(int x, int y, int z, int value) {
checkAndWaitOnCalledLock();
delegate.setSkyLight(this, x, y, z, value);
}
@Override
public void setHeightMap(HeightMapType type, int[] heightMap) {
checkAndWaitOnCalledLock();
delegate.setHeightMap(this, type, heightMap);
}
@Override
public void removeSectionLighting(int layer, boolean sky) {
checkAndWaitOnCalledLock();
delegate.removeSectionLighting(this, layer, sky);
}
@Override
public void setFullBright(int layer) {
checkAndWaitOnCalledLock();
delegate.setFullBright(this, layer);
}
@Override
public void setBlockLight(int x, int y, int z, int value) {
checkAndWaitOnCalledLock();
delegate.setBlockLight(this, x, y, z, value);
}
@Override
public void setLightLayer(int layer, char[] toSet) {
checkAndWaitOnCalledLock();
delegate.setLightLayer(this, layer, toSet);
}
@Override
public void setSkyLightLayer(int layer, char[] toSet) {
checkAndWaitOnCalledLock();
delegate.setSkyLightLayer(this, layer, toSet);
}
@Override
public int getSkyLight(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getSkyLight(this, x, y, z);
}
@Override
public int getEmittedLight(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getEmittedLight(this, x, y, z);
}
@Override
public int getBrightness(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getBrightness(this, x, y, z);
}
@Override
public int getOpacity(int x, int y, int z) {
checkAndWaitOnCalledLock();
return delegate.getOpacity(this, x, y, z);
}
@Override
public int[] getHeightMap(HeightMapType type) {
checkAndWaitOnCalledLock();
return delegate.getHeightMap(this, type);
}