diff --git a/src/main/java/io/github/simplex/api/Context.java b/src/main/java/io/github/simplex/api/Context.java new file mode 100644 index 0000000..8b1a023 --- /dev/null +++ b/src/main/java/io/github/simplex/api/Context.java @@ -0,0 +1,31 @@ +package io.github.simplex.api; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collection; + +public interface Context { + /** + * @return A Mono object which contains a single element represented by the definer of this Context class. + */ + Mono getContext(); + + /** + * @param context A separate (or identical) object identified by the definer of this Context class. + * @return A Mono object which can be used to set the element of this Context class in a non-blocking manner. + */ + Mono setContext(S context); + + /** + * @return A collection of objects related to the definer of this Context class. + */ + Collection contextCollection(); + + /** + * @return A Flux object which contains the values of the {@link Context#contextCollection()}, for non-blocking interpretation. + */ + default Flux fluxFromCollection() { + return Flux.fromIterable(contextCollection()); + } +} diff --git a/src/main/java/io/github/simplex/api/ExecutableService.java b/src/main/java/io/github/simplex/api/ExecutableService.java index 5f39376..b3b3c8c 100644 --- a/src/main/java/io/github/simplex/api/ExecutableService.java +++ b/src/main/java/io/github/simplex/api/ExecutableService.java @@ -2,22 +2,19 @@ package io.github.simplex.api; import io.github.simplex.simplexss.ServicePool; import org.bukkit.NamespacedKey; -import org.bukkit.plugin.Plugin; +import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; import java.util.Objects; -import java.util.concurrent.Delayed; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public abstract class ExecutableService implements IService { private final NamespacedKey service_name; - private final Plugin plugin; private final long delay; private final long period; private final boolean repeating; + private final boolean mayInterruptWhenRunning; private boolean cancelled = false; @@ -29,42 +26,31 @@ public abstract class ExecutableService implements IService { * then the period will automatically be set to 20 minutes. * Each service is registered with a {@link NamespacedKey}, * to allow for easy identification within the associated {@link ServicePool}. - * Each service also has a plugin parameter to allow for easy dependency injection. * - * @param plugin Your plugin - * @param service_name A namespaced key which can be used to identify the service. - * @param delay A specified amount of time (in ticks) to wait before the service runs. - * @param period How long the service should wait between service executions (in ticks). - * @param repeating If the service should be scheduled for repeated executions or not. + * @param service_name A namespaced key which can be used to identify the service. + * @param delay A specified amount of time (in ticks) to wait before the service runs. + * @param period How long the service should wait between service executions (in ticks). + * @param repeating If the service should be scheduled for repeated executions or not. + * @param mayInterruptWhenRunning If the service can be cancelled during execution. */ - public ExecutableService(@NotNull Plugin plugin, @NotNull NamespacedKey service_name, @Nullable Long delay, @Nullable Long period, @NotNull Boolean repeating) { - this.plugin = plugin; + public ExecutableService( + @NotNull NamespacedKey service_name, + @Nullable Long delay, + @Nullable Long period, + @NotNull Boolean repeating, + @NotNull Boolean mayInterruptWhenRunning) { this.service_name = service_name; this.repeating = repeating; this.delay = Objects.requireNonNullElse(delay, 0L); this.period = Objects.requireNonNullElse(period, (20L * 60L) * 20L); + this.mayInterruptWhenRunning = mayInterruptWhenRunning; } - /** - * @return The NamespacedKey of this service. - */ @Override public NamespacedKey getNamespacedKey() { return service_name; } - /** - * @return The plugin which was defined in the constructor. - * This should be an instance of your main plugin class. - */ - @Override - public Plugin getProvidingPlugin() { - return plugin; - } - - /** - * @return - */ @Override public long getDelay() { return delay; @@ -80,18 +66,37 @@ public abstract class ExecutableService implements IService { return repeating; } - @Override - public void setCancelled(boolean mayInterruptIfRunning) { - if (!mayInterruptIfRunning) { - cancelled = false; - } - - stop(); - cancelled = true; - } - - @Override + /** + * Cancels the execution of this service. + * + * @return true if the service was cancelled, false if not. + */ public boolean isCancelled() { return this.cancelled; } + + /** + * Cancels the execution of this service. + * + * @param cancel Whether the service should be cancelled or not. + */ + public Mono setCancelled(boolean cancel) { + if (!mayInterruptWhenRunning) { + return Mono.empty(); + } + + cancelled = cancel; + return cancel(); + } + + /** + * Actual stop call, to ensure that the service actually #isCancelled(). + */ + @Contract(pure = true) + Mono cancel() { + if (isCancelled()) { + return stop().then(); + } + return Mono.empty(); + } } diff --git a/src/main/java/io/github/simplex/api/ISchedule.java b/src/main/java/io/github/simplex/api/ISchedule.java index 7c24efd..355824d 100644 --- a/src/main/java/io/github/simplex/api/ISchedule.java +++ b/src/main/java/io/github/simplex/api/ISchedule.java @@ -5,13 +5,34 @@ import io.github.simplex.simplexss.ServicePool; import reactor.core.publisher.Mono; public interface ISchedule { + + /** + * @return The service manager which exerts control over the service pools and their associated services. + */ Mono getServiceManager(); + /** + * @param service The service to use to locate the associated service pool and queue the service for execution. + * @return A Mono that can be used to prepare the service for execution within it's associated service pool. + * If the service has no associated pool, a new pool will be created. + */ Mono queue(IService service); + /** + * @param service The service to run once. + * @return A Mono object which can be used to run the service one time using {@link Mono#subscribe()}. + */ Mono runOnce(IService service); + /** + * @param service The service to forcefully stop. + * @return A Mono object which can be used to forcefully stop the service with {@link Mono#subscribe()}. + */ Mono forceStop(IService service); + /** + * @param service The service to forcefully start. + * @return A Mono object which can be used to forcefully start the service with {@link Mono#subscribe()}. + */ Mono forceStart(IService service); } diff --git a/src/main/java/io/github/simplex/api/IService.java b/src/main/java/io/github/simplex/api/IService.java index 960c03b..aad787b 100644 --- a/src/main/java/io/github/simplex/api/IService.java +++ b/src/main/java/io/github/simplex/api/IService.java @@ -1,34 +1,71 @@ package io.github.simplex.api; +import io.github.simplex.simplexss.ServicePool; import org.bukkit.NamespacedKey; -import org.bukkit.event.Cancellable; -import org.bukkit.plugin.Plugin; +import org.bukkit.plugin.java.JavaPlugin; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; -public interface IService extends Runnable, Cancellable { +public interface IService extends Runnable { @Contract(" -> new") static @NotNull NamespacedKey getDefaultNamespacedKey() { return new NamespacedKey("simplex_ss", "default_service_name"); } + /** + * @return The NamespacedKey of this service. + */ NamespacedKey getNamespacedKey(); + /** + * @return If the service should be scheduled for repeated executions or not. + */ boolean isPeriodic(); + /** + * @return How long the service should wait between subsequent executions. + */ long getPeriod(); + /** + * @return How long the service should wait before executing the first time. + */ long getDelay(); + /** + * The actual start method for the service. This should be overridden by subclasses, + * and should include all the required code necessary to execute when the service is queued. + * + * @return An encapsulated Mono object representing the start method for the service. + */ Mono start(); + /** + * The actual end method for the service. This should be overridden by subclasses, + * and should include all the required code necessary to execute when the service is stopped. + * + * @return An encapsulated Mono object representing the end method for the service. + */ Mono stop(); - Plugin getProvidingPlugin(); + /** + * @return The plugin which was defined in the constructor. + * This should be an instance of your main plugin class. + */ + JavaPlugin getPlugin(); + + /** + * @return The {@link ServicePool} which this service is executing on. + */ + Mono getParentPool(); @Override default void run() { - start().subscribe(); + start().subscribeOn(getParentPool() + .map(ServicePool::getScheduler) + .blockOptional() + .orElseThrow(InvalidServicePoolException.supplyException())) + .subscribe(); } } diff --git a/src/main/java/io/github/simplex/api/InvalidServiceException.java b/src/main/java/io/github/simplex/api/InvalidServiceException.java new file mode 100644 index 0000000..363c126 --- /dev/null +++ b/src/main/java/io/github/simplex/api/InvalidServiceException.java @@ -0,0 +1,17 @@ +package io.github.simplex.api; + +import java.util.function.Supplier; + +public class InvalidServiceException extends RuntimeException { + public InvalidServiceException(IService service) { + super("The service " + service.getNamespacedKey().getKey() + " has encountered an exception."); + } + + public InvalidServiceException(Throwable th) { + super(th); + } + + public static Supplier supplyException(IService service) { + return () -> new InvalidServiceException(service); + } +} diff --git a/src/main/java/io/github/simplex/api/InvalidServicePoolException.java b/src/main/java/io/github/simplex/api/InvalidServicePoolException.java new file mode 100644 index 0000000..a73616c --- /dev/null +++ b/src/main/java/io/github/simplex/api/InvalidServicePoolException.java @@ -0,0 +1,21 @@ +package io.github.simplex.api; + +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; + +import java.util.function.Supplier; + +public class InvalidServicePoolException extends RuntimeException { + public InvalidServicePoolException() { + super("There is no service pool associated with this service. The service will be automatically recycled."); + } + + public InvalidServicePoolException(Throwable ex) { + super(ex); + } + + @Contract(pure = true) + public static @NotNull Supplier supplyException() { + return InvalidServicePoolException::new; + } +} diff --git a/src/main/java/io/github/simplex/impl/Main.java b/src/main/java/io/github/simplex/impl/Main.java index 8be20ad..402e391 100644 --- a/src/main/java/io/github/simplex/impl/Main.java +++ b/src/main/java/io/github/simplex/impl/Main.java @@ -3,18 +3,36 @@ package io.github.simplex.impl; import io.github.simplex.simplexss.SchedulingSystem; import io.github.simplex.simplexss.ServiceManager; import org.bukkit.plugin.java.JavaPlugin; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +import java.util.Objects; public class Main extends JavaPlugin { - private SchedulingSystem scheduler; - private ServiceManager serviceManager; + private SchedulingSystem
scheduler; + private Flux disposables; @Override public void onEnable() { - this.serviceManager = new ServiceManager(this); - this.scheduler = new SchedulingSystem(serviceManager, this); + ServiceManager serviceManager = new ServiceManager(); + this.scheduler = new SchedulingSystem<>(serviceManager, this); + scheduler.getServiceManager().subscribe(manager -> manager.getServicePools() + .doOnEach(signal -> disposables = Objects.requireNonNull(signal.get()).startServices()) + .subscribeOn(scheduler.getMainSchedulerThread(), false) + .subscribe()); } @Override public void onDisable() { + scheduler.getServiceManager().subscribe(manager -> { + manager.getServicePools().doOnEach(signal -> Objects.requireNonNull(signal.get()) + .stopServices(disposables) + .subscribeOn(scheduler.getMainSchedulerThread()) + .subscribe()); + }); + } + + public SchedulingSystem
getScheduler() { + return this.scheduler; } } diff --git a/src/main/java/io/github/simplex/impl/ServiceImpl.java b/src/main/java/io/github/simplex/impl/ServiceImpl.java index e585141..0f60181 100644 --- a/src/main/java/io/github/simplex/impl/ServiceImpl.java +++ b/src/main/java/io/github/simplex/impl/ServiceImpl.java @@ -2,29 +2,44 @@ package io.github.simplex.impl; import io.github.simplex.api.ExecutableService; import io.github.simplex.api.IService; -import org.bukkit.NamespacedKey; -import org.jetbrains.annotations.NotNull; +import io.github.simplex.simplexss.ServicePool; +import org.bukkit.plugin.java.JavaPlugin; import reactor.core.publisher.Mono; -import java.util.concurrent.TimeUnit; - public class ServiceImpl extends ExecutableService { + private final Main plugin; + public ServiceImpl(Main plugin) { - super(plugin, IService.getDefaultNamespacedKey(), 20L, 20L * 60L * 10L, true); + super(IService.getDefaultNamespacedKey(), 20L, 20L * 60L * 10L, true, true); + this.plugin = plugin; } @Override public Mono start() { - return null; + return Mono.just(plugin) + .map(JavaPlugin::getLogger) + .doOnNext(l -> l.info("The service has executed successfully!")) + .then(); } @Override public Mono stop() { - return null; + return Mono.just(plugin) + .map(JavaPlugin::getLogger) + .doOnNext(l -> l.info("The service has stopped")) + .then(); } @Override - public void run() { - super.run(); + public Main getPlugin() { + return plugin; + } + + @Override + public Mono getParentPool() { + return getPlugin() + .getScheduler() + .getServiceManager() + .flatMap(manager -> manager.getAssociatedServicePool(this)); } } diff --git a/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java b/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java index 16e6f39..a9daeef 100644 --- a/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java +++ b/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java @@ -2,40 +2,40 @@ package io.github.simplex.simplexss; import io.github.simplex.api.ISchedule; import io.github.simplex.api.IService; -import org.bukkit.plugin.Plugin; +import org.bukkit.plugin.java.JavaPlugin; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; -import java.util.Arrays; import java.util.HashSet; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -public final class SchedulingSystem implements ISchedule { +public final class SchedulingSystem implements ISchedule { private final ServiceManager serviceManager; - private final Plugin plugin; + private final T plugin; private final Set repeatingPools; - private final Set delayedPools; + private final Scheduler mainScheduler; - public SchedulingSystem(@NotNull ServiceManager serviceManager, Plugin plugin) { + public SchedulingSystem(@NotNull ServiceManager serviceManager, T plugin) { this.serviceManager = serviceManager; this.plugin = plugin; this.repeatingPools = new HashSet<>(); - this.delayedPools = new HashSet<>(); + this.mainScheduler = Schedulers.boundedElastic(); } + /** + * @return A set of {@link ServicePool}s which contain repeating services. + */ + @Contract(pure = true) public Set getRepeatingPools() { return repeatingPools; } - public Set getDelayedPools() { - return delayedPools; - } - @Override - public Mono getServiceManager() { + public @NotNull Mono getServiceManager() { return Mono.just(serviceManager); } @@ -51,10 +51,10 @@ public final class SchedulingSystem implements ISchedule { } @Override - public Mono runOnce(IService service) { + public @NotNull Mono runOnce(IService service) { return Mono.just(service).doOnNext(s -> { - s.start(); - s.stop(); + s.start().subscribe(); + s.stop().subscribe(); }).then(); } @@ -68,7 +68,18 @@ public final class SchedulingSystem implements ISchedule { return service.start(); } - public Plugin getProvidingPlugin() { - return plugin; + /** + * @return A Mono object containing your plugin, for non-blocking communication. + */ + public @NotNull Mono getProvidingPlugin() { + return Mono.just(plugin); + } + + /** + * @return The main thread which the scheduling system operates on. + */ + @Contract(pure = true) + public Scheduler getMainSchedulerThread() { + return mainScheduler; } } diff --git a/src/main/java/io/github/simplex/simplexss/ServiceManager.java b/src/main/java/io/github/simplex/simplexss/ServiceManager.java index 98c4b30..89e466d 100644 --- a/src/main/java/io/github/simplex/simplexss/ServiceManager.java +++ b/src/main/java/io/github/simplex/simplexss/ServiceManager.java @@ -2,7 +2,6 @@ package io.github.simplex.simplexss; import io.github.simplex.api.IService; import org.bukkit.NamespacedKey; -import org.bukkit.plugin.Plugin; import org.jetbrains.annotations.Contract; import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Flux; @@ -14,13 +13,17 @@ import java.util.Set; public final class ServiceManager { private final Set servicePools; - private final Plugin plugin; - public ServiceManager(Plugin plugin) { - this.plugin = plugin; + public ServiceManager() { servicePools = new HashSet<>(); } + /** + * @param poolName The name of the service pool. + * @param services The services to register within the service pool. + * @return A {@link Mono} object which contains a {@link ServicePool} element. + * This service pool will execute each service consecutively within a singular non-blocking thread. + */ @Contract(pure = true, value = "_, _ -> new") public @NotNull Mono createServicePool(NamespacedKey poolName, IService... services) { ServicePool pool = new ServicePool(poolName, false); @@ -28,6 +31,12 @@ public final class ServiceManager { return Mono.just(pool); } + /** + * @param name The name of the service pool. + * @param services The services to register within the service pool. + * @return A {@link Mono} object which contains a {@link ServicePool} element. + * This service pool will execute each service across a set of non-blocking threads. + */ @Contract(pure = true, value = "_, _ -> new") public @NotNull Mono multithreadedServicePool(NamespacedKey name, IService... services) { ServicePool pool = new ServicePool(name, true); @@ -35,12 +44,25 @@ public final class ServiceManager { return Mono.just(pool); } + /** + * @param poolName The name of the service pool. + * @param multithreaded Whether the service pool should be multithreaded, or operate upon a single thread. + * @return A {@link Mono} object which contains a {@link ServicePool} element. + * This pool is empty, meaning it contains no services. Any attempt to run services on this pool while it remains empty will either fail or error. + * You can add services to this pool by using {@link ServiceManager#addToExistingPool(ServicePool, IService...)}, + * or by using {@link ServicePool#addService(IService)}. + */ @Contract(pure = true, value = "_, _ -> new") public @NotNull Mono emptyServicePool(NamespacedKey poolName, boolean multithreaded) { ServicePool pool = new ServicePool(poolName, multithreaded); return Mono.just(pool); } + /** + * @param pool The service pool to add to. + * @param services The services to register within the service pool. + * @return A {@link Mono} object which contains the {@link ServicePool} element that now contains the registered services. + */ @Contract("_, _ -> new") public @NotNull Mono addToExistingPool(@NotNull ServicePool pool, IService... services) { Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> { @@ -49,6 +71,11 @@ public final class ServiceManager { return Mono.just(pool); } + /** + * @param pool The service pool to take from. + * @param services The services to remove from the pool. + * @return A {@link Mono} object which contains the {@link ServicePool} that no longer contains the removed services. + */ @Contract("_, _ -> new") public @NotNull Mono takeFromExistingPool(@NotNull ServicePool pool, IService... services) { Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> { @@ -57,16 +84,28 @@ public final class ServiceManager { return Mono.just(pool); } + /** + * @return A {@link Flux} object which contains all the service pools currently available. + */ @Contract(" -> new") public @NotNull Flux getServicePools() { return Flux.fromIterable(servicePools); } + /** + * @param service The service to locate. + * @return True if the service is somewhere within a service pool, false otherwise. + */ @Contract(pure = true) public boolean locateServiceWithinPools(IService service) { return servicePools.stream().map(p -> p.isValidService(service)).findFirst().orElseGet(() -> false); } + /** + * @param service The service pool to call from. + * @return A {@link Mono} object which contains a {@link ServicePool} element which contains the specified service. + * If no service pool can be found, an empty Mono is returned. + */ @Contract("_ -> new") public @NotNull Mono getAssociatedServicePool(IService service) { if (!locateServiceWithinPools(service)) return Mono.empty(); @@ -74,9 +113,4 @@ public final class ServiceManager { .filter(p -> p.getAssociatedServices().contains(service)) .next(); } - - @Contract("-> _") - public Plugin getProvidingPlugin() { - return plugin; - } } diff --git a/src/main/java/io/github/simplex/simplexss/ServicePool.java b/src/main/java/io/github/simplex/simplexss/ServicePool.java index 7033f82..916aad2 100644 --- a/src/main/java/io/github/simplex/simplexss/ServicePool.java +++ b/src/main/java/io/github/simplex/simplexss/ServicePool.java @@ -1,6 +1,7 @@ package io.github.simplex.simplexss; import io.github.simplex.api.IService; +import io.github.simplex.api.InvalidServiceException; import org.bukkit.NamespacedKey; import org.jetbrains.annotations.NotNull; import reactor.core.Disposable; @@ -84,6 +85,7 @@ public final class ServicePool { } public Mono stopServices(Flux disposableThread) { + getAssociatedServices().forEach(service -> service.stop().subscribe()); return disposableThread.doOnNext(Disposable::dispose).then(); } @@ -105,4 +107,8 @@ public final class ServicePool { this.getAssociatedServices().clear(); return Mono.just(this); } + + public Scheduler getScheduler() { + return scheduler; + } }