diff --git a/build.gradle b/build.gradle index 5a048cb..768521e 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ plugins { id 'java' + id 'com.github.johnrengelman.shadow' version '7.1.2' } group = 'mc.unraveled.reforged' @@ -20,9 +21,14 @@ repositories { dependencies { implementation 'org.projectlombok:lombok:1.18.20' implementation 'org.postgresql:postgresql:42.2.20' + shadow 'io.projectreactor:reactor-core:3.4.10' compileOnly 'io.papermc.paper:paper-api:1.19.3-R0.1-SNAPSHOT' } +shadowJar { + relocate("reactor", "mc.unraveled.reforged.rs") +} + def targetJavaVersion = 17 java { def javaVersion = JavaVersion.toVersion(targetJavaVersion) diff --git a/src/main/java/mc/unraveled/reforged/api/IService.java b/src/main/java/mc/unraveled/reforged/api/IService.java new file mode 100644 index 0000000..3261836 --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/api/IService.java @@ -0,0 +1,29 @@ +package mc.unraveled.reforged.api; + +import mc.unraveled.reforged.service.base.ServicePool; +import reactor.core.publisher.Mono; + +public interface IService extends Runnable { + Mono getParentPool(); + + String getName(); + + int getServiceId(); + + Mono start(); + + Mono stop(); + + boolean isPeriodic(); + + long getInitialDelay(); + + long getPeriod(); + + @Override + default void run() { + start().subscribe(); + } + + Mono setParentPool(ServicePool servicePool); +} diff --git a/src/main/java/mc/unraveled/reforged/command/base/AbstractCommandBase.java b/src/main/java/mc/unraveled/reforged/command/base/AbstractCommandBase.java index d80cfe9..f1271e5 100644 --- a/src/main/java/mc/unraveled/reforged/command/base/AbstractCommandBase.java +++ b/src/main/java/mc/unraveled/reforged/command/base/AbstractCommandBase.java @@ -1,6 +1,7 @@ package mc.unraveled.reforged.command.base; import mc.unraveled.reforged.api.ICommandBase; +import mc.unraveled.reforged.permission.TPermission; import mc.unraveled.reforged.util.BasicColors; import net.kyori.adventure.text.Component; import net.kyori.adventure.text.TextComponent; diff --git a/src/main/java/mc/unraveled/reforged/permission/Rank.java b/src/main/java/mc/unraveled/reforged/permission/Rank.java index 9dca79d..3dc40e6 100644 --- a/src/main/java/mc/unraveled/reforged/permission/Rank.java +++ b/src/main/java/mc/unraveled/reforged/permission/Rank.java @@ -1,16 +1,17 @@ package mc.unraveled.reforged.permission; +import mc.unraveled.reforged.util.BasicColors; import net.kyori.adventure.text.format.TextColor; public enum Rank { - EXECUTIVE("executive", "Exec", TextColor.color(254, 0, 0), 7), - DEV("developer", "Dev", TextColor.color(165, 0, 218), 6), - ADMIN("admin", "Admin", TextColor.color(214, 108, 32), 5), - MOD("mod", "Mod", TextColor.color(0, 198, 98), 4), - BUILDER("builder", "Bldr", TextColor.color(0, 168, 238), 3), - VIP("vip", "VIP", TextColor.color(238, 98, 150), 2), - OP("op", "OP", TextColor.color(198, 64, 64), 1), - NON_OP("guest", "", TextColor.color(178, 178, 178), 0); + EXECUTIVE("executive", "Exec", BasicColors.DARK_RED.getColor(), 7), + DEV("developer", "Dev", BasicColors.PURPLE.getColor(), 6), + ADMIN("admin", "Admin", BasicColors.GOLD.getColor(), 5), + MOD("mod", "Mod", BasicColors.GREEN.getColor(), 4), + BUILDER("builder", "Bldr", BasicColors.AQUA.getColor(), 3), + VIP("vip", "VIP", BasicColors.PINK.getColor(), 2), + OP("op", "OP", BasicColors.RED.getColor(), 1), + NON_OP("guest", "", BasicColors.WHITE.getColor(), 0); final RankAttachment attachment; diff --git a/src/main/java/mc/unraveled/reforged/command/base/TPermission.java b/src/main/java/mc/unraveled/reforged/permission/TPermission.java similarity index 97% rename from src/main/java/mc/unraveled/reforged/command/base/TPermission.java rename to src/main/java/mc/unraveled/reforged/permission/TPermission.java index 65fa35e..f4ef2d9 100644 --- a/src/main/java/mc/unraveled/reforged/command/base/TPermission.java +++ b/src/main/java/mc/unraveled/reforged/permission/TPermission.java @@ -1,4 +1,4 @@ -package mc.unraveled.reforged.command.base; +package mc.unraveled.reforged.permission; import org.bukkit.command.CommandSender; diff --git a/src/main/java/mc/unraveled/reforged/service/SimpleService.java b/src/main/java/mc/unraveled/reforged/service/SimpleService.java new file mode 100644 index 0000000..fd153cc --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/SimpleService.java @@ -0,0 +1,28 @@ +package mc.unraveled.reforged.service; + +import mc.unraveled.reforged.service.base.AbstractService; +import mc.unraveled.reforged.service.base.ServicePool; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +public class SimpleService extends AbstractService { + public SimpleService(@Nullable ServicePool parentPool, @NotNull String service_name) { + super(parentPool, service_name); + } + + @Override + public int getServiceId() { + return 0; + } + + @Override + public Mono start() { + return null; + } + + @Override + public Mono stop() { + return null; + } +} diff --git a/src/main/java/mc/unraveled/reforged/service/base/AbstractService.java b/src/main/java/mc/unraveled/reforged/service/base/AbstractService.java new file mode 100644 index 0000000..a027978 --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/AbstractService.java @@ -0,0 +1,235 @@ +package mc.unraveled.reforged.service.base; + + +import mc.unraveled.reforged.api.IService; +import mc.unraveled.reforged.plugin.Traverse; +import org.bukkit.plugin.java.JavaPlugin; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Mono; + +import java.util.Objects; + +public abstract class AbstractService implements IService { + /** + * The name of the service. + */ + private final String service_name; + /** + * How long the service should wait before executing the first time. + */ + private final long delay; + /** + * How long the service should wait between executions. + */ + private final long period; + /** + * If the service should be executed once or continuously. + */ + private final boolean repeating; + /** + * If the service should be allowed to stop while executing. + */ + private final boolean mayInterruptWhenRunning; + /** + * The service's execution thread. + */ + private ServicePool parentPool; + /** + * Whether the service has been cancelled or not. + */ + private boolean cancelled = false; + + /** + * Creates a new instance of an executable service. + * Each service is registered with a {@link String}, + * to allow for easy identification within the associated {@link ServicePool}. + * + * @param service_name A namespaced key which can be used to identify the service. + */ + public AbstractService(@NotNull String service_name) { + this((new ServicePool("defaultPool" + Scheduling.denom, JavaPlugin.getPlugin(Traverse.class))), + service_name, + 0L, + 0L, + false, + false); + + Scheduling.denom++; + } + + /** + * Creates a new instance of an executable service. + * Each service is registered with a {@link String}, + * to allow for easy identification within the associated {@link ServicePool}. + * + * @param parentPool The {@link ServicePool} which this service is executing on. + * @param service_name A namespaced key which can be used to identify the service. + */ + public AbstractService(@Nullable ServicePool parentPool, @NotNull String service_name) { + this(parentPool, + service_name, + 0L, + 0L, + false, + false); + } + + /** + * Creates a new instance of an executable service. + * The timings are measured in ticks (20 ticks per second). + * You do not need to explicitly define a delay. + * Each service is registered with a {@link String}, + * to allow for easy identification within the associated {@link ServicePool}. + * + * @param parentPool The {@link ServicePool} which this service is executing on. + * @param service_name A namespaced key which can be used to identify the service. + * @param delay A specified amount of time (in ticks) to wait before the service runs. + */ + public AbstractService( + @Nullable ServicePool parentPool, + @NotNull String service_name, + @Nullable Long delay) { + this(parentPool, + service_name, + delay, + 0L, + false, + false); + } + + /** + * Creates a new instance of an executable service. + * The timings are measured in ticks (20 ticks per second). + * You do not need to explicitly define a delay or a period, + * however if you have flagged {@link #repeating} as true, and the period is null, + * then the period will automatically be set to 20 minutes. + * Each service is registered with a {@link String}, + * to allow for easy identification within the associated {@link ServicePool}. + * + * @param parentPool The {@link ServicePool} which this service is executing on. + * @param service_name A namespaced key which can be used to identify the service. + * @param delay A specified amount of time (in ticks) to wait before the service runs. + * @param period How long the service should wait between service executions (in ticks). + * @param repeating If the service should be scheduled for repeated executions or not. + */ + public AbstractService( + @Nullable ServicePool parentPool, + @NotNull String service_name, + @NotNull Long delay, + @NotNull Long period, + @NotNull Boolean repeating) { + this(parentPool, + service_name, + delay, period, + repeating, + false); + } + + /** + * Creates a new instance of an executable service. + * The timings are measured in ticks (20 ticks per second). + * You do not need to explicitly define a delay or a period, + * however if you have flagged {@link #repeating} as true, and the period is null, + * then the period will automatically be set to 20 minutes. + * Each service is registered with a {@link String}, + * to allow for easy identification within the associated {@link ServicePool}. + * + * @param parentPool The {@link ServicePool} which this service is executing on. + * @param service_name A namespaced key which can be used to identify the service. + * @param delay A specified amount of time (in ticks) to wait before the service runs. + * @param period How long the service should wait between service executions (in ticks). + * @param repeating If the service should be scheduled for repeated executions or not. + * @param mayInterruptWhenRunning If the service can be cancelled during execution. + */ + public AbstractService( + @Nullable ServicePool parentPool, + @NotNull String service_name, + @Nullable Long delay, + @Nullable Long period, + @NotNull Boolean repeating, + @NotNull Boolean mayInterruptWhenRunning) { + this.service_name = service_name; + this.repeating = repeating; + this.delay = Objects.requireNonNullElse(delay, 0L); + this.period = Objects.requireNonNullElse(period, (20L * 60L) * 20L); + this.mayInterruptWhenRunning = mayInterruptWhenRunning; + + if (parentPool == null) { + this.parentPool = new ServicePool("defaultPool" + Scheduling.denom, JavaPlugin.getPlugin(Traverse.class)); + Scheduling.denom++; + } else { + this.parentPool = parentPool; + } + + this.parentPool.getServices().add(this); + } + + @Override + public long getInitialDelay() { + return delay; + } + + @Override + public long getPeriod() { + return period; + } + + @Override + public boolean isPeriodic() { + return repeating; + } + + /** + * Cancels the execution of this service. + * + * @return true if the service was cancelled, false if not. + */ + public boolean isCancelled() { + return this.cancelled; + } + + /** + * Cancels the execution of this service. + * + * @param cancel Whether the service should be cancelled or not. + */ + public Mono setCancelled(boolean cancel) { + if (!mayInterruptWhenRunning) { + return Mono.empty(); + } + + cancelled = cancel; + return cancel(); + } + + /** + * Actual stop call, to ensure that the service actually #isCancelled(). + */ + @Contract(pure = true) + Mono cancel() { + if (isCancelled()) { + return stop().then(); + } + return Mono.empty(); + } + + @Override + public Mono getParentPool() { + return Mono.just(parentPool); + } + + @Override + public String getName() { + return service_name; + } + + @Override + public Mono setParentPool(ServicePool servicePool) { + return Mono.create(sink -> { + this.parentPool = servicePool; + sink.success(); + }); + } +} diff --git a/src/main/java/mc/unraveled/reforged/service/base/BukkitDisposable.java b/src/main/java/mc/unraveled/reforged/service/base/BukkitDisposable.java new file mode 100644 index 0000000..ed067d7 --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/BukkitDisposable.java @@ -0,0 +1,25 @@ +package mc.unraveled.reforged.service.base; + +import org.bukkit.scheduler.BukkitTask; +import reactor.core.Disposable; + +public record BukkitDisposable(BukkitTask task) implements Disposable { + /** + * Disposes of the task upstream on the Bukkit scheduler. + */ + @Override + public void dispose() { + task.cancel(); + } + + /** + * Checks if the task is cancelled. + * + * @return true if the task is cancelled, false otherwise. + */ + @Override + public boolean isDisposed() { + return task.isCancelled(); + } +} + diff --git a/src/main/java/mc/unraveled/reforged/service/base/ReactorBukkitScheduler.java b/src/main/java/mc/unraveled/reforged/service/base/ReactorBukkitScheduler.java new file mode 100644 index 0000000..8c572ff --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/ReactorBukkitScheduler.java @@ -0,0 +1,90 @@ +package mc.unraveled.reforged.service.base; + +import org.bukkit.plugin.java.JavaPlugin; +import org.bukkit.scheduler.BukkitScheduler; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; + +import java.util.concurrent.TimeUnit; + +public final class ReactorBukkitScheduler + implements Scheduler, Scheduler.Worker { + /** + * The plugin instance. + */ + private final JavaPlugin plugin; + /** + * The bukkit scheduler. + */ + private final BukkitScheduler scheduler; + + public ReactorBukkitScheduler(JavaPlugin plugin) { + this.plugin = plugin; + this.scheduler = plugin.getServer().getScheduler(); + } + + /** + * Delegates to the {@link BukkitScheduler}. + * + * @param task The task to delegate. + * @return A disposable that can be used to cancel the task. + */ + @Override + public @NotNull Disposable schedule(@NotNull Runnable task) { + return new BukkitDisposable(scheduler.runTask(plugin, task)); + } + + /** + * Delegates to the {@link BukkitScheduler} with a delay. + * + * @param task The task to delegate + * @param delay The amount of time to wait before running the task + * @param unit Unused parameter in this implementation. + * Regardless of what value you use, this parameter will never be called. + * @return A disposable that can be used to cancel the task. + */ + @Override + public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @Deprecated @Nullable TimeUnit unit) { + return new BukkitDisposable(scheduler.runTaskLater(plugin, task, delay)); + } + + /** + * Delegates to the {@link BukkitScheduler} with a delay and a period. + * The initial delay may be 0L, but the period must be greater than 0L. + * + * @param task The task to delegate. + * @param initialDelay The amount of time to wait before running the task. + * @param period The amount of time to wait between each execution of the task. + * @param unit Unused parameter in this implementation. + * Regardless of what value you use, this parameter will never be called. + * @return A disposable that can be used to cancel the task. + */ + @Override + public @NotNull Disposable schedulePeriodically(@NotNull Runnable task, long initialDelay, long period, @Deprecated @Nullable TimeUnit unit) { + if (period <= 0L) { + throw new IllegalArgumentException("Period must be greater than 0L"); + } + + return new BukkitDisposable(scheduler.runTaskTimer(plugin, task, initialDelay, period)); + } + + /** + * A new {@link Worker}. + * + * @return This class instance, as it implements {@link Worker}. + */ + @Override + public @NotNull Scheduler.Worker createWorker() { + return this; + } + + /** + * This method does nothing and is unused. + */ + @Override + @Deprecated + public void dispose() { + } +} diff --git a/src/main/java/mc/unraveled/reforged/service/base/Scheduling.java b/src/main/java/mc/unraveled/reforged/service/base/Scheduling.java new file mode 100644 index 0000000..c4812d3 --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/Scheduling.java @@ -0,0 +1,66 @@ +package mc.unraveled.reforged.service.base; + +import mc.unraveled.reforged.api.IService; +import mc.unraveled.reforged.plugin.Traverse; +import org.jetbrains.annotations.NotNull; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public final class Scheduling { + /** + * A denominator to use when registering default service pool names. + */ + static int denom = 0; + /** + * The service manager to use for controlling service pools. + */ + private final ServiceManager serviceManager; + /** + * The plugin to use for registering tasks. This should be an instance of your plugin. + */ + private final Traverse plugin; + + /** + * Creates a new instance of the scheduling system. This is used to manage the scheduling of services. + * + * @param plugin The plugin to use for this scheduling system. This should be an instance of your plugin. + */ + public Scheduling(Traverse plugin) { + this.serviceManager = new ServiceManager(); + this.plugin = plugin; + } + + public @NotNull Mono getServiceManager() { + return Mono.just(serviceManager); + } + + @NotNull + public Mono queue(@NotNull IService service) { + return getServiceManager() + .flatMap(manager -> manager.getAssociatedServicePool(service)) + .flatMap(pool -> pool.queue(service)); + } + + public @NotNull Flux queueAll() { + return getServiceManager() + .flatMapMany(ServiceManager::getServicePools) + .flatMap(ServicePool::queueServices); + } + + public @NotNull Mono runOnce(IService service) { + return Mono.create(sink -> service.start().then(service.stop()).subscribe(sink::success)); + } + + public Mono forceStop(@NotNull IService service) { + return service.stop(); + } + + public Mono forceStart(@NotNull IService service) { + return service.start(); + } + + public @NotNull Traverse getPlugin() { + return plugin; + } +} diff --git a/src/main/java/mc/unraveled/reforged/service/base/ServiceManager.java b/src/main/java/mc/unraveled/reforged/service/base/ServiceManager.java new file mode 100644 index 0000000..bf815b6 --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/ServiceManager.java @@ -0,0 +1,135 @@ +package mc.unraveled.reforged.service.base; + +import mc.unraveled.reforged.api.IService; +import mc.unraveled.reforged.plugin.Traverse; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public final class ServiceManager { + /** + * A set of {@link ServicePool}s which are currently active. + */ + private final Set servicePools; + + /** + * Creates a new instance of the Service Manager class. + * This class acts as a Service Pool factory, and can be used to create + * both single and multithreaded Service Pools, empty service pools, as well as + * retrieve existing Service Pools. It also provides methods for you to add and remove + * {@link IService}s from the {ServicePool} parameter. + */ + public ServiceManager() { + servicePools = new HashSet<>(); + } + + /** + * @param poolName The name of the service pool. + * @param plugin The plugin which will be used to register the service pool. + * @return A {@link Mono} object which contains a {@link ServicePool} element. + * This Service Pool will execute each service within the main server thread. + */ + @Contract(pure = true, value = "_, _ -> new") + public @NotNull Mono emptyBukkitServicePool(String poolName, Traverse plugin) { + ServicePool pool = new ServicePool(poolName, plugin); + servicePools.add(pool); + return Mono.just(pool); + + } + + /** + * @param poolName The name of the service pool. + * @param plugin The plugin which will be used to register the service pool. + * @param services The services to register within the service pool. + * @return A {@link Mono} object which contains a {@link ServicePool} element. + * This Service Pool will execute each service within the main server thread. + */ + @Contract(pure = true, value = "_, _, _ -> new") + public @NotNull Mono bukkitServicePool(String poolName, Traverse plugin, IService... services) { + ServicePool pool = new ServicePool(poolName, plugin); + Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> pool.addService(s.get())); + servicePools.add(pool); + return Mono.just(pool); + } + + /** + * Adds a service to an existing service pool. + * + * @param poolName The service pool to add to. + * @param services The services to register within the service pool. + * @return A {@link Mono} object which contains the {@link ServicePool} element that now contains the registered services. + */ + @Contract("_, _ -> new") + public @NotNull Mono addToExistingPool(@NotNull String poolName, IService... services) { + return Mono.create(sink -> { + final ServicePool[] servicePool = new ServicePool[1]; + findPool(poolName).subscribe(pool -> { + if (pool == null) throw new RuntimeException("There is no pool currently registered with that name."); + servicePool[0] = pool; + }); + List serviceList = Arrays.asList(services); + Flux.fromIterable(serviceList).doOnEach(s -> servicePool[0].addService(s.get())); + sink.success(servicePool[0]); + }); + } + + /** + * Finds a {@link ServicePool} within the ServiceManager's pool list. + * + * @param poolName The name of the pool. + * @return A Mono object which holds the requested ServicePool, or an empty Mono if the pool does not exist. + */ + @Contract() + public @NotNull Mono findPool(String poolName) { + return getServicePools().filter(pool -> pool.getName().equalsIgnoreCase(poolName)).next(); + } + + /** + * @param pool The service pool to take from. + * @param services The services to remove from the pool. + * @return A {@link Mono} object which contains the {@link ServicePool} that no longer contains the removed services. + */ + @Contract("_, _ -> new") + public @NotNull Mono takeFromExistingPool(@NotNull ServicePool pool, IService... services) { + Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> { + pool.removeService(s.get()); + }); + return Mono.just(pool); + } + + /** + * @return A {@link Flux} object which contains all the service pools currently available. + */ + @Contract(" -> new") + public @NotNull Flux getServicePools() { + return Flux.fromIterable(servicePools); + } + + /** + * @param service The service to locate. + * @return True if the service is somewhere within a service pool, false otherwise. + */ + @Contract(pure = true) + public boolean locateServiceWithinPools(IService service) { + return servicePools.stream().map(p -> p.isValidService(service)).findFirst().orElseGet(() -> false); + } + + /** + * @param service The service pool to call from. + * @return A {@link Mono} object which contains a {@link ServicePool} element which contains the specified service. + * If no service pool can be found, an empty Mono is returned. + */ + @Contract("_ -> new") + public @NotNull Mono getAssociatedServicePool(IService service) { + if (!locateServiceWithinPools(service)) return Mono.empty(); + return getServicePools() + .filter(p -> p.getServices().contains(service)) + .next(); + } +} diff --git a/src/main/java/mc/unraveled/reforged/service/base/ServicePool.java b/src/main/java/mc/unraveled/reforged/service/base/ServicePool.java new file mode 100644 index 0000000..83636ed --- /dev/null +++ b/src/main/java/mc/unraveled/reforged/service/base/ServicePool.java @@ -0,0 +1,102 @@ +package mc.unraveled.reforged.service.base; + +import mc.unraveled.reforged.api.IService; +import mc.unraveled.reforged.plugin.Traverse; +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +public final class ServicePool { + private final Set services; + + private final Scheduler scheduler; + + private final String name; + + public ServicePool(String name, Traverse plugin) { + this.services = new HashSet<>(); + this.scheduler = new ReactorBukkitScheduler(plugin); + this.name = name; + } + + void addService(IService service) { + this.services.add(service); + } + + boolean isValidService(IService service) { + return this.services.contains(service); + } + + public Set getServices() { + return this.services; + } + + public @NotNull Mono queue(@NotNull IService service) { + return Mono.just(service).map(s -> { + if (s.isPeriodic()) { + return scheduler.schedulePeriodically(s, + s.getInitialDelay() * 50, + s.getPeriod() * 50, + TimeUnit.MILLISECONDS); + } else { + return scheduler.schedule(s, s.getInitialDelay() * 50, TimeUnit.MILLISECONDS); + } + }); + } + + public @NotNull Flux queueServices() { + Set disposables = new HashSet<>(); + return Flux.fromIterable(getServices()) + .filter(Objects::nonNull) + .doOnEach(service -> disposables.add(queue(service.get()).block())) + .flatMap(service -> Flux.fromIterable(disposables)); + } + + public @NotNull Mono stopServices(@NotNull Flux disposableThread) { + getServices().forEach(service -> service.stop().subscribe()); + return disposableThread.doOnNext(Disposable::dispose).then(); + } + + public @NotNull Mono stopService(@NotNull String service_name, @Nullable Mono disposable) { + return Mono.create(sink -> { + getService(service_name).doOnNext(IService::stop).subscribe(); + if (disposable != null) { + disposable.doOnNext(Disposable::dispose).subscribe(); + } + sink.success(); + }); + } + + public @NotNull Mono getService(String service_name) { + return Flux.fromIterable(getServices()) + .filter(service -> service.getName().equals(service_name)) + .next(); + } + + void removeService(IService service) { + getServices().remove(service); + } + + public @NotNull Mono recycle() { + this.getServices().clear(); + return Mono.create(sink -> sink.success(this)); + } + + @Contract(pure = true) + public Scheduler getScheduler() { + return scheduler; + } + + public String getName() { + return name; + } +}