diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/configuration/Settings.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/configuration/Settings.java index 3b80dcdfa..4db469187 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/configuration/Settings.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/configuration/Settings.java @@ -622,6 +622,13 @@ public class Settings extends Config { }) public static class EXPERIMENTAL { + @Comment({ + "Undo operation batch size", + " - The size defines the number of changes read at once.", + " - Larger numbers might reduce overhead but increase latency for edits with only few changes.", + " - 0 means undo operations are not batched."}) + public int UNDO_BATCH_SIZE = 128; + @Comment({ "[UNSAFE] Directly modify the region files. (OBSOLETE - USE ANVIL COMMANDS)", " - IMPROPER USE CAN CAUSE WORLD CORRUPTION!", diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/change/ChangePopulator.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/change/ChangePopulator.java new file mode 100644 index 000000000..6831a443c --- /dev/null +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/change/ChangePopulator.java @@ -0,0 +1,65 @@ +package com.fastasyncworldedit.core.history.change; + +import com.sk89q.worldedit.history.change.Change; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * @since TODO + */ +@ApiStatus.Internal +public interface ChangePopulator { + + static ChangePopulator empty() { + class Empty implements ChangePopulator { + private static final Empty EMPTY = new Empty(); + + @Override + public @NotNull C create() { + throw new UnsupportedOperationException("empty"); + } + + @Override + public @Nullable C populate(@NotNull final C change) { + return null; + } + + @Override + public @Nullable C updateOrCreate(@Nullable final Change change) { + return null; + } + + @Override + public boolean accepts(final Change change) { + return false; + } + } + return Empty.EMPTY; + } + + @SuppressWarnings("unchecked") + default @NotNull C update(@Nullable Change before) { + if (accepts(before)) { + return (C) before; + } + return create(); + } + + @NotNull + C create(); + + @Nullable + default C updateOrCreate(@Nullable Change change) { + C u = update(change); + return populate(u); + } + + @Nullable + C populate(@NotNull C change); + + @Contract("null->false") + boolean accepts(Change change); + +} diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java index 346543a0b..84c1193eb 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractChangeSet.java @@ -32,6 +32,7 @@ import com.sk89q.worldedit.world.block.BaseBlock; import com.sk89q.worldedit.world.block.BlockState; import com.sk89q.worldedit.world.block.BlockTypesCache; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.ApiStatus; import java.io.IOException; import java.util.Iterator; @@ -250,6 +251,13 @@ public abstract class AbstractChangeSet implements ChangeSet, IBatchProcessor { return getIterator(redo); } + /** + * {@return a coordinator to exchange sets of changes between a producer and a consumer} + * @since TODO + */ + @ApiStatus.Internal + public abstract ChangeExchangeCoordinator getCoordinatedChanges(BlockBag blockBag, int mode, boolean dir); + public abstract Iterator getIterator(boolean redo); public EditSession toEditSession(Actor actor) { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java index a44e73013..f3f0cdae4 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/AbstractDelegateChangeSet.java @@ -76,6 +76,11 @@ public class AbstractDelegateChangeSet extends AbstractChangeSet { return parent.getIterator(blockBag, mode, redo); } + @Override + public ChangeExchangeCoordinator getCoordinatedChanges(final BlockBag blockBag, final int mode, final boolean dir) { + return parent.getCoordinatedChanges(blockBag, mode, dir); + } + @Override public Iterator getIterator(boolean redo) { return parent.getIterator(redo); diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/ChangeExchangeCoordinator.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/ChangeExchangeCoordinator.java new file mode 100644 index 000000000..56d47500f --- /dev/null +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/ChangeExchangeCoordinator.java @@ -0,0 +1,50 @@ +package com.fastasyncworldedit.core.history.changeset; + +import com.sk89q.worldedit.history.change.Change; +import org.jetbrains.annotations.ApiStatus; + +import java.util.concurrent.Exchanger; +import java.util.function.BiConsumer; + +/** + * @since TODO + */ +@ApiStatus.Internal +public class ChangeExchangeCoordinator implements AutoCloseable { + + private static final Thread.Builder.OfVirtual UNDO_VIRTUAL_THREAD_BUILDER = Thread.ofVirtual() + .name("FAWE undo", 0); + private final Exchanger exchanger; + private final BiConsumer, Change[]> runnerTask; + private boolean started = false; + private Thread runner; + + public ChangeExchangeCoordinator(BiConsumer, Change[]> runner) { + this.runnerTask = runner; + this.exchanger = new Exchanger<>(); + } + + public Change[] take(Change[] consumed) { + if (!this.started) { + this.started = true; + final int length = consumed.length; + this.runner = UNDO_VIRTUAL_THREAD_BUILDER + .start(() -> this.runnerTask.accept(this.exchanger, new Change[length])); + } + try { + return exchanger.exchange(consumed); + } catch (InterruptedException e) { + this.runner.interrupt(); + Thread.currentThread().interrupt(); + return null; + } + } + + @Override + public void close() { + if (this.runner != null) { + this.runner.interrupt(); + } + } + +} diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java index e7132bf5a..211a71303 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/FaweStreamChangeSet.java @@ -1,6 +1,7 @@ package com.fastasyncworldedit.core.history.changeset; import com.fastasyncworldedit.core.configuration.Settings; +import com.fastasyncworldedit.core.history.change.ChangePopulator; import com.fastasyncworldedit.core.history.change.MutableBiomeChange; import com.fastasyncworldedit.core.history.change.MutableBlockChange; import com.fastasyncworldedit.core.history.change.MutableEntityChange; @@ -20,15 +21,22 @@ import com.sk89q.worldedit.regions.Region; import com.sk89q.worldedit.world.World; import com.sk89q.worldedit.world.biome.BiomeType; import com.sk89q.worldedit.world.block.BlockTypes; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.Exchanger; +import java.util.function.BiConsumer; /** * FAWE stream ChangeSet offering support for extended-height worlds @@ -711,6 +719,284 @@ public abstract class FaweStreamChangeSet extends AbstractChangeSet { } } + @Override + public ChangeExchangeCoordinator getCoordinatedChanges(BlockBag blockBag, int mode, boolean dir) { + try { + return coordinatedChanges(blockBag, mode, dir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ChangeExchangeCoordinator coordinatedChanges(final BlockBag blockBag, final int mode, boolean dir) throws IOException { + close(); + var tileCreate = tileChangePopulator(getTileCreateIS(), true); + var tileRemove = tileChangePopulator(getTileRemoveIS(), false); + + var entityCreate = entityChangePopulator(getEntityCreateIS(), true); + var entityRemove = entityChangePopulator(getEntityRemoveIS(), false); + + var blockChange = blockBag != null && mode > 0 ? fullBlockChangePopulator(blockBag, mode, dir) : blockChangePopulator(dir); + + var biomeChange = biomeChangePopulator(dir); + + Queue> populators = new ArrayDeque<>(List.of( + tileCreate, + tileRemove, + entityCreate, + entityRemove, + blockChange, + biomeChange + )); + BiConsumer, Change[]> task = (exchanger, array) -> { + while (fillArray(array, populators)) { + try { + array = exchanger.exchange(array); + } catch (InterruptedException e) { + return; + } + } + }; + return new ChangeExchangeCoordinator(task); + } + + private boolean fillArray(Change[] changes, Queue> populators) { + ChangePopulator populator = populators.peek(); + if (populator == null) { + return false; + } + for (int i = 0; i < changes.length; i++) { + Change change = changes[i]; + do { + change = populator.updateOrCreate(change); + if (change == null) { + populators.remove(); + populator = populators.peek(); + if (populator == null) { + changes[i] = null; // mark end + return true; // still needs to consume the elements of the current round + } + } else { + break; + } + } while (true); + changes[i] = change; + } + return true; + } + + private static abstract class CompoundTagPopulator implements ChangePopulator { + private final NBTInputStream inputStream; + + private CompoundTagPopulator(final NBTInputStream stream) { + inputStream = stream; + } + + @Override + public @Nullable C populate(final @NotNull C change) { + try { + write(change, (CompoundTag) inputStream.readTag()); + return change; + } catch (Exception ignored) { + } + try { + inputStream.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + protected abstract void write(C change, CompoundTag tag); + } + + private ChangePopulator tileChangePopulator(NBTInputStream is, boolean create) { + if (is == null) { + return ChangePopulator.empty(); + } + class Populator extends CompoundTagPopulator { + + private Populator() { + super(is); + } + + @Override + public @NotNull MutableTileChange create() { + return new MutableTileChange(null, create); + } + + @Override + protected void write(final MutableTileChange change, final CompoundTag tag) { + change.tag = tag; + } + + @Override + public boolean accepts(final Change change) { + return change instanceof MutableTileChange; + } + + } + return new Populator(); + } + private ChangePopulator entityChangePopulator(NBTInputStream is, boolean create) { + if (is == null) { + return ChangePopulator.empty(); + } + class Populator extends CompoundTagPopulator { + + private Populator() { + super(is); + } + + @Override + public @NotNull MutableEntityChange create() { + return new MutableEntityChange(null, create); + } + + @Override + protected void write(final MutableEntityChange change, final CompoundTag tag) { + change.tag = tag; + } + + @Override + public boolean accepts(final Change change) { + return change instanceof MutableTileChange; + } + + } + return new Populator(); + } + + private ChangePopulator fullBlockChangePopulator(BlockBag blockBag, int mode, boolean dir) throws + IOException { + final FaweInputStream is = getBlockIS(); + if (is == null) { + return ChangePopulator.empty(); + } + class Populator implements ChangePopulator { + + @Override + public @NotNull MutableFullBlockChange create() { + return new MutableFullBlockChange(blockBag, mode, dir); + } + + @Override + public @Nullable MutableFullBlockChange populate(@NotNull final MutableFullBlockChange change) { + try { + change.x = posDel.readX(is) + originX; + change.y = posDel.readY(is); + change.z = posDel.readZ(is) + originZ; + idDel.readCombined(is, change); + return change; + } catch (EOFException ignored) { + } catch (Exception e) { + e.printStackTrace(); + } + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public boolean accepts(final Change change) { + return change instanceof MutableFullBlockChange; + } + + } + return new Populator(); + + } + + private ChangePopulator blockChangePopulator(boolean dir) throws IOException { + final FaweInputStream is = getBlockIS(); + if (is == null) { + return ChangePopulator.empty(); + } + class Populator implements ChangePopulator { + + @Override + public @NotNull MutableBlockChange create() { + return new MutableBlockChange(0, 0, 0, BlockTypes.AIR.getInternalId()); + } + + @Override + public @Nullable MutableBlockChange populate(@NotNull final MutableBlockChange change) { + try { + change.x = posDel.readX(is) + originX; + change.y = posDel.readY(is); + change.z = posDel.readZ(is) + originZ; + idDel.readCombined(is, change, dir); + return change; + } catch (EOFException ignored) { + } catch (Exception e) { + e.printStackTrace(); + } + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public boolean accepts(final Change change) { + return change instanceof MutableBlockChange; + } + + } + return new Populator(); + } + + private ChangePopulator biomeChangePopulator(boolean dir) throws IOException { + final FaweInputStream is = getBiomeIS(); + if (is == null) { + return ChangePopulator.empty(); + } + class Populator implements ChangePopulator { + + @Override + public @NotNull MutableBiomeChange create() { + return new MutableBiomeChange(); + } + + @Override + public @Nullable MutableBiomeChange populate(@NotNull final MutableBiomeChange change) { + try { + int int1 = is.read(); + if (int1 != -1) { + int x = ((int1 << 24) + (is.read() << 16) + (is.read() << 8) + is.read()) << 2; + int z = ((is.read() << 24) + (is.read() << 16) + (is.read() << 8) + is.read()) << 2; + int y = (is.read() - 128) << 2; + int from = is.readVarInt(); + int to = is.readVarInt(); + change.setBiome(x, y, z, from, to); + return change; + } + } catch (EOFException ignored) { + } catch (Exception e) { + e.printStackTrace(); + } + try { + is.close(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + @Override + public boolean accepts(final Change change) { + return change instanceof MutableBiomeChange; + } + + } + return new Populator(); + } + @Override public Iterator getIterator(final boolean dir) { try { diff --git a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/NullChangeSet.java b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/NullChangeSet.java index dd4fddf1a..2a60b3913 100644 --- a/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/NullChangeSet.java +++ b/worldedit-core/src/main/java/com/fastasyncworldedit/core/history/changeset/NullChangeSet.java @@ -54,6 +54,17 @@ public class NullChangeSet extends AbstractChangeSet { return getIterator(redo); } + @Override + public ChangeExchangeCoordinator getCoordinatedChanges(final BlockBag blockBag, final int mode, final boolean dir) { + return new ChangeExchangeCoordinator(((exchanger, changes) -> { + try { + exchanger.exchange(null); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + })); + } + @Override public final Iterator getIterator(boolean undo) { return Collections.emptyIterator(); diff --git a/worldedit-core/src/main/java/com/sk89q/worldedit/function/operation/ChangeSetExecutor.java b/worldedit-core/src/main/java/com/sk89q/worldedit/function/operation/ChangeSetExecutor.java index 12359fdee..ea52bf335 100644 --- a/worldedit-core/src/main/java/com/sk89q/worldedit/function/operation/ChangeSetExecutor.java +++ b/worldedit-core/src/main/java/com/sk89q/worldedit/function/operation/ChangeSetExecutor.java @@ -19,7 +19,9 @@ package com.sk89q.worldedit.function.operation; +import com.fastasyncworldedit.core.configuration.Settings; import com.fastasyncworldedit.core.history.changeset.AbstractChangeSet; +import com.fastasyncworldedit.core.history.changeset.ChangeExchangeCoordinator; import com.sk89q.worldedit.WorldEditException; import com.sk89q.worldedit.extent.inventory.BlockBag; import com.sk89q.worldedit.history.UndoContext; @@ -56,6 +58,7 @@ public class ChangeSetExecutor implements Operation { //FAWE end private final Iterator iterator; + private final ChangeExchangeCoordinator changeExchangeCoordinator; private final Type type; private final UndoContext context; @@ -74,18 +77,42 @@ public class ChangeSetExecutor implements Operation { this.type = type; this.context = context; - if (changeSet instanceof AbstractChangeSet) { - iterator = ((AbstractChangeSet) changeSet).getIterator(blockBag, inventory, type == Type.REDO); + if (changeSet instanceof AbstractChangeSet abstractChangeSet) { + if (Settings.settings().EXPERIMENTAL.UNDO_BATCH_SIZE > 0) { + this.changeExchangeCoordinator = abstractChangeSet.getCoordinatedChanges(blockBag, inventory, type == Type.REDO); + this.iterator = null; + } else { + this.iterator = abstractChangeSet.getIterator(blockBag, inventory, type == Type.REDO); + this.changeExchangeCoordinator = null; + } } else if (type == Type.UNDO) { iterator = changeSet.backwardIterator(); + this.changeExchangeCoordinator = null; } else { iterator = changeSet.forwardIterator(); + this.changeExchangeCoordinator = null; } } //FAWE end @Override public Operation resume(RunContext run) throws WorldEditException { + // FAWE start - ChangeExchangeCoordinator + if (this.changeExchangeCoordinator != null) { + try (this.changeExchangeCoordinator) { + Change[] changes = new Change[Settings.settings().EXPERIMENTAL.UNDO_BATCH_SIZE]; + while ((changes = this.changeExchangeCoordinator.take(changes)) != null) { + for (final Change change : changes) { + if (change == null) { + return null; // end + } + type.perform(change, context); + } + } + return null; + } + } + // FAWE end while (iterator.hasNext()) { Change change = iterator.next(); //FAWE start - types > individual history step