Added Bukkit communication with BukkitScheduler

This commit is contained in:
Paldiu
2022-12-14 11:15:42 -06:00
parent 329726ed95
commit 262115a52e
78 changed files with 5535 additions and 711 deletions

View File

@ -0,0 +1,24 @@
package io.github.simplexdevelopment.scheduler;
import org.bukkit.scheduler.BukkitTask;
import reactor.core.Disposable;
public record BukkitDisposable(BukkitTask task) implements Disposable {
/**
* Disposes of the task upstream on the Bukkit scheduler.
*/
@Override
public void dispose() {
task.cancel();
}
/**
* Checks if the task is cancelled.
*
* @return true if the task is cancelled, false otherwise.
*/
@Override
public boolean isDisposed() {
return task.isCancelled();
}
}

View File

@ -0,0 +1,77 @@
package io.github.simplexdevelopment.scheduler;
import org.bukkit.plugin.java.JavaPlugin;
import org.bukkit.scheduler.BukkitScheduler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import java.util.concurrent.TimeUnit;
public record ReactorBukkitScheduler(JavaPlugin plugin, BukkitScheduler scheduler)
implements Scheduler, Scheduler.Worker {
/**
* Delegates to the {@link BukkitScheduler}.
*
* @param task The task to delegate.
* @return A disposable that can be used to cancel the task.
*/
@Override
public @NotNull Disposable schedule(@NotNull Runnable task) {
return new BukkitDisposable(scheduler.runTask(plugin, task));
}
/**
* Delegates to the {@link BukkitScheduler} with a delay.
*
* @param task The task to delegate
* @param delay The amount of time to wait before running the task
* @param unit Unused parameter in this implementation.
* Regardless of what value you use, this parameter will never be called.
* @return A disposable that can be used to cancel the task.
*/
@Override
public @NotNull Disposable schedule(@NotNull Runnable task, long delay, @Nullable TimeUnit unit) {
return new BukkitDisposable(scheduler.runTaskLater(plugin, task, delay));
}
/**
* Delegates to the {@link BukkitScheduler} with a delay and a period.
* The initial delay may be 0L, but the period must be greater than 0L.
*
* @param task The task to delegate.
* @param initialDelay The amount of time to wait before running the task.
* @param period The amount of time to wait between each execution of the task.
* @param unit Unused parameter in this implementation.
* Regardless of what value you use, this parameter will never be called.
* @return A disposable that can be used to cancel the task.
*/
@Override
public @NotNull Disposable schedulePeriodically(@NotNull Runnable task, long initialDelay, long period, @Nullable TimeUnit unit) {
if (period <= 0L) {
throw new IllegalArgumentException("Period must be greater than 0L");
}
return new BukkitDisposable(scheduler.runTaskTimer(plugin, task, initialDelay, period));
}
/**
* A new {@link Scheduler.Worker}.
*
* @return This class instance, as it implements {@link Scheduler.Worker}.
*/
@Override
public @NotNull Worker createWorker() {
return this;
}
/**
* This method does nothing and is unused.
*/
@Override
public void dispose() {
}
}

View File

@ -0,0 +1,90 @@
package io.github.simplexdevelopment.scheduler;
import io.github.simplexdevelopment.api.ISchedule;
import io.github.simplexdevelopment.api.IService;
import org.bukkit.plugin.java.JavaPlugin;
import org.bukkit.scheduler.BukkitScheduler;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public final class SchedulingSystem<T extends JavaPlugin> implements ISchedule {
private final ServiceManager serviceManager;
private final T plugin;
private final Set<ServicePool> repeatingPools;
private final ReactorBukkitScheduler mainScheduler;
/**
* Creates a new instance of the scheduling system. This is used to manage the scheduling of services.
*
* @param plugin The plugin to use for this scheduling system. This should be an instance of your plugin.
*/
public SchedulingSystem(T plugin) {
this.serviceManager = new ServiceManager();
this.plugin = plugin;
this.repeatingPools = new HashSet<>();
this.mainScheduler = new ReactorBukkitScheduler(plugin, plugin.getServer().getScheduler());
}
/**
* @return A set of {@link ServicePool}s which contain repeating services.
*/
@Contract(pure = true)
public Set<ServicePool> getRepeatingPools() {
return repeatingPools;
}
@Override
public @NotNull Mono<ServiceManager> getServiceManager() {
return Mono.just(serviceManager);
}
@Override
@NotNull
public Mono<ServicePool> queue(@NotNull IService service) {
return getServiceManager().flatMap(serviceManager -> {
Mono<ServicePool> pool = serviceManager.getAssociatedServicePool(service);
return pool.defaultIfEmpty(Objects.requireNonNull(serviceManager
.createServicePool(ServicePool.getDefaultNamespacedKey(), service)
.block()));
});
}
@Override
public @NotNull Mono<Void> runOnce(IService service) {
return Mono.just(service).doOnNext(s -> {
s.start().then(s.stop()).subscribe();
}).then();
}
@Override
public Mono<Void> forceStop(IService service) {
return service.stop();
}
@Override
public Mono<Void> forceStart(IService service) {
return service.start();
}
/**
* @return A Mono object containing your plugin, for non-blocking communication.
*/
public @NotNull Mono<T> getProvidingPlugin() {
return Mono.just(plugin);
}
/**
* @return The main thread which the scheduling system operates on.
*/
@Contract(pure = true)
public ReactorBukkitScheduler getMainScheduler() {
return mainScheduler;
}
}

View File

@ -0,0 +1,123 @@
package io.github.simplexdevelopment.scheduler;
import io.github.simplexdevelopment.api.IService;
import org.bukkit.NamespacedKey;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
public final class ServiceManager {
private final Set<ServicePool> servicePools;
/**
* Creates a new instance of the Service Manager class.
* This class acts as a Service Pool factory, and can be used to create
* both single and multithreaded Service Pools, empty service pools, as well as
* retrieve existing Service Pools. It also provides methods for you to add and remove
* {@link IService}s from the {ServicePool} parameter.
*/
public ServiceManager() {
servicePools = new HashSet<>();
}
/**
* @param poolName The name of the service pool.
* @param services The services to register within the service pool.
* @return A {@link Mono} object which contains a {@link ServicePool} element.
* This service pool will execute each service consecutively within a singular non-blocking thread.
*/
@Contract(pure = true, value = "_, _ -> new")
public @NotNull Mono<ServicePool> createServicePool(NamespacedKey poolName, IService... services) {
ServicePool pool = new ServicePool(poolName, false);
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> pool.addService(s.get()));
return Mono.just(pool);
}
/**
* @param name The name of the service pool.
* @param services The services to register within the service pool.
* @return A {@link Mono} object which contains a {@link ServicePool} element.
* This service pool will execute each service across a set of non-blocking threads.
*/
@Contract(pure = true, value = "_, _ -> new")
public @NotNull Mono<ServicePool> multithreadedServicePool(NamespacedKey name, IService... services) {
ServicePool pool = new ServicePool(name, true);
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> pool.addService(s.get()));
return Mono.just(pool);
}
/**
* @param poolName The name of the service pool.
* @param multithreaded Whether the service pool should be multithreaded, or operate upon a single thread.
* @return A {@link Mono} object which contains a {@link ServicePool} element.
* This pool is empty, meaning it contains no services. Any attempt to run services on this pool while it remains empty will either fail or error.
* You can add services to this pool by using {@link ServiceManager#addToExistingPool(ServicePool, IService...)},
* or by using {@link ServicePool#addService(IService)}.
*/
@Contract(pure = true, value = "_, _ -> new")
public @NotNull Mono<ServicePool> emptyServicePool(NamespacedKey poolName, boolean multithreaded) {
ServicePool pool = new ServicePool(poolName, multithreaded);
return Mono.just(pool);
}
/**
* @param pool The service pool to add to.
* @param services The services to register within the service pool.
* @return A {@link Mono} object which contains the {@link ServicePool} element that now contains the registered services.
*/
@Contract("_, _ -> new")
public @NotNull Mono<ServicePool> addToExistingPool(@NotNull ServicePool pool, IService... services) {
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> {
pool.addService(s.get());
});
return Mono.just(pool);
}
/**
* @param pool The service pool to take from.
* @param services The services to remove from the pool.
* @return A {@link Mono} object which contains the {@link ServicePool} that no longer contains the removed services.
*/
@Contract("_, _ -> new")
public @NotNull Mono<ServicePool> takeFromExistingPool(@NotNull ServicePool pool, IService... services) {
Flux.fromIterable(Arrays.asList(services)).doOnEach(s -> {
pool.removeService(s.get());
});
return Mono.just(pool);
}
/**
* @return A {@link Flux} object which contains all the service pools currently available.
*/
@Contract(" -> new")
public @NotNull Flux<ServicePool> getServicePools() {
return Flux.fromIterable(servicePools);
}
/**
* @param service The service to locate.
* @return True if the service is somewhere within a service pool, false otherwise.
*/
@Contract(pure = true)
public boolean locateServiceWithinPools(IService service) {
return servicePools.stream().map(p -> p.isValidService(service)).findFirst().orElseGet(() -> false);
}
/**
* @param service The service pool to call from.
* @return A {@link Mono} object which contains a {@link ServicePool} element which contains the specified service.
* If no service pool can be found, an empty Mono is returned.
*/
@Contract("_ -> new")
public @NotNull Mono<ServicePool> getAssociatedServicePool(IService service) {
if (!locateServiceWithinPools(service)) return Mono.empty();
return getServicePools()
.filter(p -> p.getAssociatedServices().contains(service))
.next();
}
}

View File

@ -0,0 +1,205 @@
package io.github.simplexdevelopment.scheduler;
import io.github.simplexdevelopment.api.IService;
import io.github.simplexdevelopment.api.InvalidServicePoolException;
import org.bukkit.NamespacedKey;
import org.bukkit.plugin.java.JavaPlugin;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public final class ServicePool {
/**
* The default {@link NamespacedKey} used to identify unmarked services. This will cause errors if left unchecked.
*/
private static final NamespacedKey DEFAULT = new NamespacedKey("simplex_ss", "default_service_pool");
/**
* A collection of services related to this service pool.
*/
private final Set<IService> associatedServices;
/**
* The scheduler used to run the services in this pool.
*/
private final Scheduler scheduler;
/**
* The key used to identify this service pool.
*/
private final NamespacedKey name;
private final ReactorBukkitScheduler rbScheduler;
/**
* This will create a new instance of a Service Pool with a {@link Scheduler} as its main scheduler.
* This should be used if you'd like to execute tasks without communicating on the main server thread.
*
* @param name The name of this service pool.
* @param multithreaded Whether this service pool should be multithreaded, or operate upon a single thread.
*/
public ServicePool(NamespacedKey name, boolean multithreaded) {
this.name = name;
this.associatedServices = new HashSet<>();
if (multithreaded) {
this.scheduler = Schedulers.boundedElastic();
} else {
this.scheduler = Schedulers.single();
}
this.rbScheduler = null;
}
/**
* This will create a new instance of a Service Pool with the {@link ReactorBukkitScheduler} as its main scheduler.
* This should be used if you'd like to execute tasks while communicating on the main server thread.
*
* @param name The name of this service pool.
*/
public ServicePool(NamespacedKey name, JavaPlugin plugin) {
this.name = name;
this.associatedServices = new HashSet<>();
this.scheduler = null;
this.rbScheduler = new ReactorBukkitScheduler(plugin, plugin.getServer().getScheduler());
}
/**
* @return The default namespaced key to use if one is not assigned.
*/
static NamespacedKey getDefaultNamespacedKey() {
return DEFAULT;
}
/**
* @param service Add a service to the pool's service collection.
*/
void addService(IService service) {
getAssociatedServices().add(service);
}
/**
* @param service The service to check against this pool.
* @return Whether the service is registered with this pool.
*/
boolean isValidService(IService service) {
return getAssociatedServices().contains(service);
}
/**
* @return A {@link Set} of {@link IService} objects which are registered with this pool.
*/
@Contract(pure = true)
@NotNull
public Set<IService> getAssociatedServices() {
return associatedServices;
}
/**
* @param service The name of the service to queue. This should be a service that is located within this service pool.
* If you name a service that is stored within another service pool,
* this method will throw an error.
* @return A {@link Mono} object which contains a {@link Disposable} element which can be used to destroy the registered service.
*/
public @NotNull Mono<Disposable> queueService(IService service) {
return Mono.just(service).map(s -> {
if (s.isPeriodic()) {
if (scheduler != null) {
return scheduler.schedulePeriodically(s,
s.getDelay() * 50,
s.getPeriod() * 50,
TimeUnit.MILLISECONDS);
} else if (rbScheduler != null) {
return rbScheduler.schedulePeriodically(s,
s.getDelay() * 50,
s.getPeriod() * 50,
TimeUnit.MILLISECONDS);
} else {
throw new InvalidServicePoolException("Service pool is not initialized properly.");
}
} else {
if (scheduler != null) {
return scheduler.schedule(s,
s.getDelay() * 50,
TimeUnit.MILLISECONDS);
} else if (rbScheduler != null) {
return rbScheduler.schedule(s,
s.getDelay() * 50,
TimeUnit.MILLISECONDS);
} else {
throw new InvalidServicePoolException("Service pool is not initialized properly.");
}
}
});
}
/**
* @return A {@link Flux} object which contains a collection of {@link Disposable} elements,
* which can be used to destroy the registered services using {@link ServicePool#stopServices(Flux)}.
*/
public @NotNull Flux<Disposable> startServices() {
Set<Disposable> disposables = new HashSet<>();
return Flux.fromIterable(getAssociatedServices())
.doOnEach(service -> disposables.add(queueService(service.get()).block()))
.flatMap(service -> Flux.fromIterable(disposables));
}
/**
* @param disposableThread A {@link Flux<Disposable>} which contains all the services that should be disposed..
* @return A {@link Mono<Void>} object which can be used to stop the services.
*/
public @NotNull Mono<Void> stopServices(@NotNull Flux<Disposable> disposableThread) {
getAssociatedServices().forEach(service -> service.stop().subscribe());
return disposableThread.doOnNext(Disposable::dispose).then();
}
/**
* @param service_name The name of the service to stop.
* @param disposable A {@link Disposable} object which contains the service that should be disposed.
* @return A {@link Mono<Void>} object which can be used to stop the service.
*/
public @NotNull Mono<Void> stopService(@NotNull NamespacedKey service_name, @Nullable Mono<Disposable> disposable) {
getService(service_name).doOnNext(IService::stop).subscribe();
if (disposable != null) {
disposable.doOnNext(Disposable::dispose).subscribe();
}
return Mono.empty();
}
/**
* @param service_name The name of the service to get.
* @return A {@link Mono} object which contains the service.
*/
public @NotNull Mono<IService> getService(NamespacedKey service_name) {
return Flux.fromIterable(getAssociatedServices())
.filter(service -> service.getNamespacedKey().equals(service_name))
.next();
}
/**
* @param service The service to remove from the pool's service collection.
*/
void removeService(IService service) {
getAssociatedServices().remove(service);
}
/**
* @return This service pool after being cleared of all services.
* You will need to register services with this pool again before using it.
*/
public @NotNull Mono<ServicePool> recycle() {
this.getAssociatedServices().clear();
return Mono.just(this);
}
/**
* @return The {@link Scheduler} which hosts the threads for the service pool.
*/
@Contract(pure = true)
public Scheduler getScheduler() {
return scheduler;
}
}