mirror of
https://github.com/SimplexDevelopment/Traverse.git
synced 2025-01-22 09:10:05 +00:00
Added scheduling
This commit is contained in:
parent
b730f329fe
commit
d2e4f6669a
@ -1,5 +1,6 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
id 'com.github.johnrengelman.shadow' version '7.1.2'
|
||||
}
|
||||
|
||||
group = 'mc.unraveled.reforged'
|
||||
@ -20,9 +21,14 @@ repositories {
|
||||
dependencies {
|
||||
implementation 'org.projectlombok:lombok:1.18.20'
|
||||
implementation 'org.postgresql:postgresql:42.2.20'
|
||||
shadow 'io.projectreactor:reactor-core:3.4.10'
|
||||
compileOnly 'io.papermc.paper:paper-api:1.19.3-R0.1-SNAPSHOT'
|
||||
}
|
||||
|
||||
shadowJar {
|
||||
relocate("reactor", "mc.unraveled.reforged.rs")
|
||||
}
|
||||
|
||||
def targetJavaVersion = 17
|
||||
java {
|
||||
def javaVersion = JavaVersion.toVersion(targetJavaVersion)
|
||||
|
29
src/main/java/mc/unraveled/reforged/api/IService.java
Normal file
29
src/main/java/mc/unraveled/reforged/api/IService.java
Normal file
@ -0,0 +1,29 @@
|
||||
package mc.unraveled.reforged.api;
|
||||
|
||||
import mc.unraveled.reforged.service.base.ServicePool;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public interface IService extends Runnable {
|
||||
Mono<ServicePool> getParentPool();
|
||||
|
||||
String getName();
|
||||
|
||||
int getServiceId();
|
||||
|
||||
Mono<Void> start();
|
||||
|
||||
Mono<Void> stop();
|
||||
|
||||
boolean isPeriodic();
|
||||
|
||||
long getInitialDelay();
|
||||
|
||||
long getPeriod();
|
||||
|
||||
@Override
|
||||
default void run() {
|
||||
start().subscribe();
|
||||
}
|
||||
|
||||
Mono<Void> setParentPool(ServicePool servicePool);
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package mc.unraveled.reforged.command.base;
|
||||
|
||||
import mc.unraveled.reforged.api.ICommandBase;
|
||||
import mc.unraveled.reforged.permission.TPermission;
|
||||
import mc.unraveled.reforged.util.BasicColors;
|
||||
import net.kyori.adventure.text.Component;
|
||||
import net.kyori.adventure.text.TextComponent;
|
||||
|
@ -1,16 +1,17 @@
|
||||
package mc.unraveled.reforged.permission;
|
||||
|
||||
import mc.unraveled.reforged.util.BasicColors;
|
||||
import net.kyori.adventure.text.format.TextColor;
|
||||
|
||||
public enum Rank {
|
||||
EXECUTIVE("executive", "Exec", TextColor.color(254, 0, 0), 7),
|
||||
DEV("developer", "Dev", TextColor.color(165, 0, 218), 6),
|
||||
ADMIN("admin", "Admin", TextColor.color(214, 108, 32), 5),
|
||||
MOD("mod", "Mod", TextColor.color(0, 198, 98), 4),
|
||||
BUILDER("builder", "Bldr", TextColor.color(0, 168, 238), 3),
|
||||
VIP("vip", "VIP", TextColor.color(238, 98, 150), 2),
|
||||
OP("op", "OP", TextColor.color(198, 64, 64), 1),
|
||||
NON_OP("guest", "", TextColor.color(178, 178, 178), 0);
|
||||
EXECUTIVE("executive", "Exec", BasicColors.DARK_RED.getColor(), 7),
|
||||
DEV("developer", "Dev", BasicColors.PURPLE.getColor(), 6),
|
||||
ADMIN("admin", "Admin", BasicColors.GOLD.getColor(), 5),
|
||||
MOD("mod", "Mod", BasicColors.GREEN.getColor(), 4),
|
||||
BUILDER("builder", "Bldr", BasicColors.AQUA.getColor(), 3),
|
||||
VIP("vip", "VIP", BasicColors.PINK.getColor(), 2),
|
||||
OP("op", "OP", BasicColors.RED.getColor(), 1),
|
||||
NON_OP("guest", "", BasicColors.WHITE.getColor(), 0);
|
||||
|
||||
final RankAttachment attachment;
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
package mc.unraveled.reforged.command.base;
|
||||
package mc.unraveled.reforged.permission;
|
||||
|
||||
import org.bukkit.command.CommandSender;
|
||||
|
@ -0,0 +1,28 @@
|
||||
package mc.unraveled.reforged.service;
|
||||
|
||||
import mc.unraveled.reforged.service.base.AbstractService;
|
||||
import mc.unraveled.reforged.service.base.ServicePool;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public class SimpleService extends AbstractService {
|
||||
public SimpleService(@Nullable ServicePool parentPool, @NotNull String service_name) {
|
||||
super(parentPool, service_name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getServiceId() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> start() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> stop() {
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,235 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
|
||||
import mc.unraveled.reforged.api.IService;
|
||||
import mc.unraveled.reforged.plugin.Traverse;
|
||||
import org.bukkit.plugin.java.JavaPlugin;
|
||||
import org.jetbrains.annotations.Contract;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class AbstractService implements IService {
|
||||
/**
|
||||
* The name of the service.
|
||||
*/
|
||||
private final String service_name;
|
||||
/**
|
||||
* How long the service should wait before executing the first time.
|
||||
*/
|
||||
private final long delay;
|
||||
/**
|
||||
* How long the service should wait between executions.
|
||||
*/
|
||||
private final long period;
|
||||
/**
|
||||
* If the service should be executed once or continuously.
|
||||
*/
|
||||
private final boolean repeating;
|
||||
/**
|
||||
* If the service should be allowed to stop while executing.
|
||||
*/
|
||||
private final boolean mayInterruptWhenRunning;
|
||||
/**
|
||||
* The service's execution thread.
|
||||
*/
|
||||
private ServicePool parentPool;
|
||||
/**
|
||||
* Whether the service has been cancelled or not.
|
||||
*/
|
||||
private boolean cancelled = false;
|
||||
|
||||
/**
|
||||
* Creates a new instance of an executable service.
|
||||
* Each service is registered with a {@link String},
|
||||
* to allow for easy identification within the associated {@link ServicePool}.
|
||||
*
|
||||
* @param service_name A namespaced key which can be used to identify the service.
|
||||
*/
|
||||
public AbstractService(@NotNull String service_name) {
|
||||
this((new ServicePool("defaultPool" + Scheduling.denom, JavaPlugin.getPlugin(Traverse.class))),
|
||||
service_name,
|
||||
0L,
|
||||
0L,
|
||||
false,
|
||||
false);
|
||||
|
||||
Scheduling.denom++;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of an executable service.
|
||||
* Each service is registered with a {@link String},
|
||||
* to allow for easy identification within the associated {@link ServicePool}.
|
||||
*
|
||||
* @param parentPool The {@link ServicePool} which this service is executing on.
|
||||
* @param service_name A namespaced key which can be used to identify the service.
|
||||
*/
|
||||
public AbstractService(@Nullable ServicePool parentPool, @NotNull String service_name) {
|
||||
this(parentPool,
|
||||
service_name,
|
||||
0L,
|
||||
0L,
|
||||
false,
|
||||
false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of an executable service.
|
||||
* The timings are measured in ticks (20 ticks per second).
|
||||
* You do not need to explicitly define a delay.
|
||||
* Each service is registered with a {@link String},
|
||||
* to allow for easy identification within the associated {@link ServicePool}.
|
||||
*
|
||||
* @param parentPool The {@link ServicePool} which this service is executing on.
|
||||
* @param service_name A namespaced key which can be used to identify the service.
|
||||
* @param delay A specified amount of time (in ticks) to wait before the service runs.
|
||||
*/
|
||||
public AbstractService(
|
||||
@Nullable ServicePool parentPool,
|
||||
@NotNull String service_name,
|
||||
@Nullable Long delay) {
|
||||
this(parentPool,
|
||||
service_name,
|
||||
delay,
|
||||
0L,
|
||||
false,
|
||||
false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of an executable service.
|
||||
* The timings are measured in ticks (20 ticks per second).
|
||||
* You do not need to explicitly define a delay or a period,
|
||||
* however if you have flagged {@link #repeating} as true, and the period is null,
|
||||
* then the period will automatically be set to 20 minutes.
|
||||
* Each service is registered with a {@link String},
|
||||
* to allow for easy identification within the associated {@link ServicePool}.
|
||||
*
|
||||
* @param parentPool The {@link ServicePool} which this service is executing on.
|
||||
* @param service_name A namespaced key which can be used to identify the service.
|
||||
* @param delay A specified amount of time (in ticks) to wait before the service runs.
|
||||
* @param period How long the service should wait between service executions (in ticks).
|
||||
* @param repeating If the service should be scheduled for repeated executions or not.
|
||||
*/
|
||||
public AbstractService(
|
||||
@Nullable ServicePool parentPool,
|
||||
@NotNull String service_name,
|
||||
@NotNull Long delay,
|
||||
@NotNull Long period,
|
||||
@NotNull Boolean repeating) {
|
||||
this(parentPool,
|
||||
service_name,
|
||||
delay, period,
|
||||
repeating,
|
||||
false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance of an executable service.
|
||||
* The timings are measured in ticks (20 ticks per second).
|
||||
* You do not need to explicitly define a delay or a period,
|
||||
* however if you have flagged {@link #repeating} as true, and the period is null,
|
||||
* then the period will automatically be set to 20 minutes.
|
||||
* Each service is registered with a {@link String},
|
||||
* to allow for easy identification within the associated {@link ServicePool}.
|
||||
*
|
||||
* @param parentPool The {@link ServicePool} which this service is executing on.
|
||||
* @param service_name A namespaced key which can be used to identify the service.
|
||||
* @param delay A specified amount of time (in ticks) to wait before the service runs.
|
||||
* @param period How long the service should wait between service executions (in ticks).
|
||||
* @param repeating If the service should be scheduled for repeated executions or not.
|
||||
* @param mayInterruptWhenRunning If the service can be cancelled during execution.
|
||||
*/
|
||||
public AbstractService(
|
||||
@Nullable ServicePool parentPool,
|
||||
@NotNull String service_name,
|
||||
@Nullable Long delay,
|
||||
@Nullable Long period,
|
||||
@NotNull Boolean repeating,
|
||||
@NotNull Boolean mayInterruptWhenRunning) {
|
||||
this.service_name = service_name;
|
||||
this.repeating = repeating;
|
||||
this.delay = Objects.requireNonNullElse(delay, 0L);
|
||||
this.period = Objects.requireNonNullElse(period, (20L * 60L) * 20L);
|
||||
this.mayInterruptWhenRunning = mayInterruptWhenRunning;
|
||||
|
||||
if (parentPool == null) {
|
||||
this.parentPool = new ServicePool("defaultPool" + Scheduling.denom, JavaPlugin.getPlugin(Traverse.class));
|
||||
Scheduling.denom++;
|
||||
} else {
|
||||
this.parentPool = parentPool;
|
||||
}
|
||||
|
||||
this.parentPool.getServices().add(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getInitialDelay() {
|
||||
return delay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPeriod() {
|
||||
return period;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPeriodic() {
|
||||
return repeating;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the execution of this service.
|
||||
*
|
||||
* @return true if the service was cancelled, false if not.
|
||||
*/
|
||||
public boolean isCancelled() {
|
||||
return this.cancelled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels the execution of this service.
|
||||
*
|
||||
* @param cancel Whether the service should be cancelled or not.
|
||||
*/
|
||||
public Mono<Void> setCancelled(boolean cancel) {
|
||||
if (!mayInterruptWhenRunning) {
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
cancelled = cancel;
|
||||
return cancel();
|
||||
}
|
||||
|
||||
/**
|
||||
* Actual stop call, to ensure that the service actually #isCancelled().
|
||||
*/
|
||||
@Contract(pure = true)
|
||||
Mono<Void> cancel() {
|
||||
if (isCancelled()) {
|
||||
return stop().then();
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ServicePool> getParentPool() {
|
||||
return Mono.just(parentPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return service_name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> setParentPool(ServicePool servicePool) {
|
||||
return Mono.create(sink -> {
|
||||
this.parentPool = servicePool;
|
||||
sink.success();
|
||||
});
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
import org.bukkit.scheduler.BukkitTask;
|
||||
import reactor.core.Disposable;
|
||||
|
||||
public record BukkitDisposable(BukkitTask task) implements Disposable {
|
||||
/**
|
||||
* Disposes of the task upstream on the Bukkit scheduler.
|
||||
*/
|
||||
@Override
|
||||
public void dispose() {
|
||||
task.cancel();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the task is cancelled.
|
||||
*
|
||||
* @return true if the task is cancelled, false otherwise.
|
||||
*/
|
||||
@Override
|
||||
public boolean isDisposed() {
|
||||
return task.isCancelled();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,90 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
import org.bukkit.plugin.java.JavaPlugin;
|
||||
import org.bukkit.scheduler.BukkitScheduler;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class ReactorBukkitScheduler
|
||||
implements Scheduler, Scheduler.Worker {
|
||||
/**
|
||||
* The plugin instance.
|
||||
*/
|
||||
private final JavaPlugin plugin;
|
||||
/**
|
||||
* The bukkit scheduler.
|
||||
*/
|
||||
private final BukkitScheduler scheduler;
|
||||
|
||||
public ReactorBukkitScheduler(JavaPlugin plugin) {
|
||||
this.plugin = plugin;
|
||||
this.scheduler = plugin.getServer().getScheduler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates to the {@link BukkitScheduler}.
|
||||
*
|
||||
* @param task The task to delegate.
|
||||
* @return A disposable that can be used to cancel the task.
|
||||
*/
|
||||
@Override
|
||||
public @NotNull Disposable schedule(@NotNull Runnable task) {
|
||||
return new BukkitDisposable(scheduler.runTask(plugin, task));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates to the {@link BukkitScheduler} with a delay.
|
||||
*
|
||||
* @param task The task to delegate
|
||||
* @param delay The amount of time to wait before running the task
|
||||
* @param unit Unused parameter in this implementation.
|
||||
* Regardless of what value you use, this parameter will never be called.
|
||||
* @return A disposable that can be used to cancel the task.
|
||||
*/
|
||||
@Override
|
||||
public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @Deprecated @Nullable TimeUnit unit) {
|
||||
return new BukkitDisposable(scheduler.runTaskLater(plugin, task, delay));
|
||||
}
|
||||
|
||||
/**
|
||||
* Delegates to the {@link BukkitScheduler} with a delay and a period.
|
||||
* The initial delay may be 0L, but the period must be greater than 0L.
|
||||
*
|
||||
* @param task The task to delegate.
|
||||
* @param initialDelay The amount of time to wait before running the task.
|
||||
* @param period The amount of time to wait between each execution of the task.
|
||||
* @param unit Unused parameter in this implementation.
|
||||
* Regardless of what value you use, this parameter will never be called.
|
||||
* @return A disposable that can be used to cancel the task.
|
||||
*/
|
||||
@Override
|
||||
public @NotNull Disposable schedulePeriodically(@NotNull Runnable task, long initialDelay, long period, @Deprecated @Nullable TimeUnit unit) {
|
||||
if (period <= 0L) {
|
||||
throw new IllegalArgumentException("Period must be greater than 0L");
|
||||
}
|
||||
|
||||
return new BukkitDisposable(scheduler.runTaskTimer(plugin, task, initialDelay, period));
|
||||
}
|
||||
|
||||
/**
|
||||
* A new {@link Worker}.
|
||||
*
|
||||
* @return This class instance, as it implements {@link Worker}.
|
||||
*/
|
||||
@Override
|
||||
public @NotNull Scheduler.Worker createWorker() {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method does nothing and is unused.
|
||||
*/
|
||||
@Override
|
||||
@Deprecated
|
||||
public void dispose() {
|
||||
}
|
||||
}
|
@ -0,0 +1,66 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
import mc.unraveled.reforged.api.IService;
|
||||
import mc.unraveled.reforged.plugin.Traverse;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
public final class Scheduling {
|
||||
/**
|
||||
* A denominator to use when registering default service pool names.
|
||||
*/
|
||||
static int denom = 0;
|
||||
/**
|
||||
* The service manager to use for controlling service pools.
|
||||
*/
|
||||
private final ServiceManager serviceManager;
|
||||
/**
|
||||
* The plugin to use for registering tasks. This should be an instance of your plugin.
|
||||
*/
|
||||
private final Traverse plugin;
|
||||
|
||||
/**
|
||||
* Creates a new instance of the scheduling system. This is used to manage the scheduling of services.
|
||||
*
|
||||
* @param plugin The plugin to use for this scheduling system. This should be an instance of your plugin.
|
||||
*/
|
||||
public Scheduling(Traverse plugin) {
|
||||
this.serviceManager = new ServiceManager();
|
||||
this.plugin = plugin;
|
||||
}
|
||||
|
||||
public @NotNull Mono<ServiceManager> getServiceManager() {
|
||||
return Mono.just(serviceManager);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public Mono<Disposable> queue(@NotNull IService service) {
|
||||
return getServiceManager()
|
||||
.flatMap(manager -> manager.getAssociatedServicePool(service))
|
||||
.flatMap(pool -> pool.queue(service));
|
||||
}
|
||||
|
||||
public @NotNull Flux<Disposable> queueAll() {
|
||||
return getServiceManager()
|
||||
.flatMapMany(ServiceManager::getServicePools)
|
||||
.flatMap(ServicePool::queueServices);
|
||||
}
|
||||
|
||||
public @NotNull Mono<Void> runOnce(IService service) {
|
||||
return Mono.create(sink -> service.start().then(service.stop()).subscribe(sink::success));
|
||||
}
|
||||
|
||||
public Mono<Void> forceStop(@NotNull IService service) {
|
||||
return service.stop();
|
||||
}
|
||||
|
||||
public Mono<Void> forceStart(@NotNull IService service) {
|
||||
return service.start();
|
||||
}
|
||||
|
||||
public @NotNull Traverse getPlugin() {
|
||||
return plugin;
|
||||
}
|
||||
}
|
@ -0,0 +1,135 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
import mc.unraveled.reforged.api.IService;
|
||||
import mc.unraveled.reforged.plugin.Traverse;
|
||||
import org.jetbrains.annotations.Contract;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
public final class ServiceManager {
|
||||
/**
|
||||
* A set of {@link ServicePool}s which are currently active.
|
||||
*/
|
||||
private final Set<ServicePool> servicePools;
|
||||
|
||||
/**
|
||||
* Creates a new instance of the Service Manager class.
|
||||
* This class acts as a Service Pool factory, and can be used to create
|
||||
* both single and multithreaded Service Pools, empty service pools, as well as
|
||||
* retrieve existing Service Pools. It also provides methods for you to add and remove
|
||||
* {@link IService}s from the {ServicePool} parameter.
|
||||
*/
|
||||
public ServiceManager() {
|
||||
servicePools = new HashSet<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param poolName The name of the service pool.
|
||||
* @param plugin The plugin which will be used to register the service pool.
|
||||
* @return A {@link Mono} object which contains a {@link ServicePool} element.
|
||||
* This Service Pool will execute each service within the main server thread.
|
||||
*/
|
||||
@Contract(pure = true, value = "_, _ -> new")
|
||||
public @NotNull Mono<ServicePool> emptyBukkitServicePool(String poolName, Traverse plugin) {
|
||||
ServicePool pool = new ServicePool(poolName, plugin);
|
||||
servicePools.add(pool);
|
||||
return Mono.just(pool);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param poolName The name of the service pool.
|
||||
* @param plugin The plugin which will be used to register the service pool.
|
||||
* @param services The services to register within the service pool.
|
||||
* @return A {@link Mono} object which contains a {@link ServicePool} element.
|
||||
* This Service Pool will execute each service within the main server thread.
|
||||
*/
|
||||
@Contract(pure = true, value = "_, _, _ -> new")
|
||||
public @NotNull Mono<ServicePool> bukkitServicePool(String poolName, Traverse plugin, IService... services) {
|
||||
ServicePool pool = new ServicePool(poolName, plugin);
|
||||
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> pool.addService(s.get()));
|
||||
servicePools.add(pool);
|
||||
return Mono.just(pool);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a service to an existing service pool.
|
||||
*
|
||||
* @param poolName The service pool to add to.
|
||||
* @param services The services to register within the service pool.
|
||||
* @return A {@link Mono} object which contains the {@link ServicePool} element that now contains the registered services.
|
||||
*/
|
||||
@Contract("_, _ -> new")
|
||||
public @NotNull Mono<ServicePool> addToExistingPool(@NotNull String poolName, IService... services) {
|
||||
return Mono.create(sink -> {
|
||||
final ServicePool[] servicePool = new ServicePool[1];
|
||||
findPool(poolName).subscribe(pool -> {
|
||||
if (pool == null) throw new RuntimeException("There is no pool currently registered with that name.");
|
||||
servicePool[0] = pool;
|
||||
});
|
||||
List<IService> serviceList = Arrays.asList(services);
|
||||
Flux.fromIterable(serviceList).doOnEach(s -> servicePool[0].addService(s.get()));
|
||||
sink.success(servicePool[0]);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds a {@link ServicePool} within the ServiceManager's pool list.
|
||||
*
|
||||
* @param poolName The name of the pool.
|
||||
* @return A Mono object which holds the requested ServicePool, or an empty Mono if the pool does not exist.
|
||||
*/
|
||||
@Contract()
|
||||
public @NotNull Mono<ServicePool> findPool(String poolName) {
|
||||
return getServicePools().filter(pool -> pool.getName().equalsIgnoreCase(poolName)).next();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param pool The service pool to take from.
|
||||
* @param services The services to remove from the pool.
|
||||
* @return A {@link Mono} object which contains the {@link ServicePool} that no longer contains the removed services.
|
||||
*/
|
||||
@Contract("_, _ -> new")
|
||||
public @NotNull Mono<ServicePool> takeFromExistingPool(@NotNull ServicePool pool, IService... services) {
|
||||
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> {
|
||||
pool.removeService(s.get());
|
||||
});
|
||||
return Mono.just(pool);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A {@link Flux} object which contains all the service pools currently available.
|
||||
*/
|
||||
@Contract(" -> new")
|
||||
public @NotNull Flux<ServicePool> getServicePools() {
|
||||
return Flux.fromIterable(servicePools);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param service The service to locate.
|
||||
* @return True if the service is somewhere within a service pool, false otherwise.
|
||||
*/
|
||||
@Contract(pure = true)
|
||||
public boolean locateServiceWithinPools(IService service) {
|
||||
return servicePools.stream().map(p -> p.isValidService(service)).findFirst().orElseGet(() -> false);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param service The service pool to call from.
|
||||
* @return A {@link Mono} object which contains a {@link ServicePool} element which contains the specified service.
|
||||
* If no service pool can be found, an empty Mono is returned.
|
||||
*/
|
||||
@Contract("_ -> new")
|
||||
public @NotNull Mono<ServicePool> getAssociatedServicePool(IService service) {
|
||||
if (!locateServiceWithinPools(service)) return Mono.empty();
|
||||
return getServicePools()
|
||||
.filter(p -> p.getServices().contains(service))
|
||||
.next();
|
||||
}
|
||||
}
|
@ -0,0 +1,102 @@
|
||||
package mc.unraveled.reforged.service.base;
|
||||
|
||||
import mc.unraveled.reforged.api.IService;
|
||||
import mc.unraveled.reforged.plugin.Traverse;
|
||||
import org.jetbrains.annotations.Contract;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public final class ServicePool {
|
||||
private final Set<IService> services;
|
||||
|
||||
private final Scheduler scheduler;
|
||||
|
||||
private final String name;
|
||||
|
||||
public ServicePool(String name, Traverse plugin) {
|
||||
this.services = new HashSet<>();
|
||||
this.scheduler = new ReactorBukkitScheduler(plugin);
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
void addService(IService service) {
|
||||
this.services.add(service);
|
||||
}
|
||||
|
||||
boolean isValidService(IService service) {
|
||||
return this.services.contains(service);
|
||||
}
|
||||
|
||||
public Set<IService> getServices() {
|
||||
return this.services;
|
||||
}
|
||||
|
||||
public @NotNull Mono<Disposable> queue(@NotNull IService service) {
|
||||
return Mono.just(service).map(s -> {
|
||||
if (s.isPeriodic()) {
|
||||
return scheduler.schedulePeriodically(s,
|
||||
s.getInitialDelay() * 50,
|
||||
s.getPeriod() * 50,
|
||||
TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
return scheduler.schedule(s, s.getInitialDelay() * 50, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public @NotNull Flux<Disposable> queueServices() {
|
||||
Set<Disposable> disposables = new HashSet<>();
|
||||
return Flux.fromIterable(getServices())
|
||||
.filter(Objects::nonNull)
|
||||
.doOnEach(service -> disposables.add(queue(service.get()).block()))
|
||||
.flatMap(service -> Flux.fromIterable(disposables));
|
||||
}
|
||||
|
||||
public @NotNull Mono<Void> stopServices(@NotNull Flux<Disposable> disposableThread) {
|
||||
getServices().forEach(service -> service.stop().subscribe());
|
||||
return disposableThread.doOnNext(Disposable::dispose).then();
|
||||
}
|
||||
|
||||
public @NotNull Mono<Void> stopService(@NotNull String service_name, @Nullable Mono<Disposable> disposable) {
|
||||
return Mono.create(sink -> {
|
||||
getService(service_name).doOnNext(IService::stop).subscribe();
|
||||
if (disposable != null) {
|
||||
disposable.doOnNext(Disposable::dispose).subscribe();
|
||||
}
|
||||
sink.success();
|
||||
});
|
||||
}
|
||||
|
||||
public @NotNull Mono<IService> getService(String service_name) {
|
||||
return Flux.fromIterable(getServices())
|
||||
.filter(service -> service.getName().equals(service_name))
|
||||
.next();
|
||||
}
|
||||
|
||||
void removeService(IService service) {
|
||||
getServices().remove(service);
|
||||
}
|
||||
|
||||
public @NotNull Mono<ServicePool> recycle() {
|
||||
this.getServices().clear();
|
||||
return Mono.create(sink -> sink.success(this));
|
||||
}
|
||||
|
||||
@Contract(pure = true)
|
||||
public Scheduler getScheduler() {
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user