diff --git a/src/main/java/io/github/simplex/api/ExecutableService.java b/src/main/java/io/github/simplex/api/ExecutableService.java index 8b6bbad..f4f9c91 100644 --- a/src/main/java/io/github/simplex/api/ExecutableService.java +++ b/src/main/java/io/github/simplex/api/ExecutableService.java @@ -6,7 +6,7 @@ public abstract class ExecutableService implements Service { private final int serviceID; private final Plugin plugin; - public ExecutableService(Plugin plugin, int serviceID) { + public ExecutableService(Plugin plugin, int serviceID, long delay, long period, boolean delayed, boolean repeating) { this.serviceID = serviceID; this.plugin = plugin; } diff --git a/src/main/java/io/github/simplex/api/Scheduler.java b/src/main/java/io/github/simplex/api/Scheduler.java index 72fc06a..259a366 100644 --- a/src/main/java/io/github/simplex/api/Scheduler.java +++ b/src/main/java/io/github/simplex/api/Scheduler.java @@ -1,12 +1,13 @@ package io.github.simplex.api; import io.github.simplex.simplexss.ServiceManager; +import io.github.simplex.simplexss.ServicePool; import reactor.core.publisher.Mono; public interface Scheduler { Mono getServiceManager(); - Mono queue(Service service); + Mono queue(Service service); Mono runOnce(Service service); diff --git a/src/main/java/io/github/simplex/api/Service.java b/src/main/java/io/github/simplex/api/Service.java index a8a88b6..346fddc 100644 --- a/src/main/java/io/github/simplex/api/Service.java +++ b/src/main/java/io/github/simplex/api/Service.java @@ -1,22 +1,30 @@ package io.github.simplex.api; -import io.github.simplex.simplexss.ServiceManager; import io.github.simplex.simplexss.ServicePool; import org.bukkit.plugin.Plugin; import reactor.core.publisher.Mono; -public interface Service { +import java.util.concurrent.RunnableScheduledFuture; + +public interface Service extends RunnableScheduledFuture { int getServiceID(); boolean isDelayed(); boolean isRepeating(); + long getPeriod(); + + long getDelay(); + Mono start(); Mono stop(); Plugin getProvidingPlugin(); - ServicePool getServicePool(); + @Override + default void run() { + start().subscribe(); + } } diff --git a/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java b/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java index 5e7b114..bb5a01e 100644 --- a/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java +++ b/src/main/java/io/github/simplex/simplexss/SchedulingSystem.java @@ -35,24 +35,11 @@ public final class SchedulingSystem implements Scheduler { } @Override - public Mono queue(Service service) { - getServiceManager().doOnNext(m -> { - Mono pool = Mono.justOrEmpty(m.getAssociatedServicePool(service)); - pool.defaultIfEmpty(m.createServicePool(service)) - .map(p -> p.getService(service.getServiceID())) - .doOnNext(s -> { - if (s.getServicePool().isPoolDelayed()) { - getDelayedPools().add(s.getServicePool()); - } - if (s.getServicePool().isPoolRepeating()) { - getRepeatingPools().add(s.getServicePool()); - } - else { - runOnce(s).block(); - } - }); + public Mono queue(Service service) { + return getServiceManager().flatMap(serviceManager -> { + Mono pool = Mono.justOrEmpty(serviceManager.getAssociatedServicePool(service)); + return pool.defaultIfEmpty(serviceManager.createServicePool(service)); }); - return Mono.empty(); } @Override diff --git a/src/main/java/io/github/simplex/simplexss/ServiceManager.java b/src/main/java/io/github/simplex/simplexss/ServiceManager.java index 0000965..44eb889 100644 --- a/src/main/java/io/github/simplex/simplexss/ServiceManager.java +++ b/src/main/java/io/github/simplex/simplexss/ServiceManager.java @@ -49,7 +49,7 @@ public final class ServiceManager { public @Nullable ServicePool getAssociatedServicePool(Service service) { if (!locateServiceWithinPools(service)) return null; - return servicePools + return getServicePools() .stream() .filter(p -> p.getAssociatedServices().contains(service)) .findFirst() diff --git a/src/main/java/io/github/simplex/simplexss/ServicePool.java b/src/main/java/io/github/simplex/simplexss/ServicePool.java index b9989b8..2f1e418 100644 --- a/src/main/java/io/github/simplex/simplexss/ServicePool.java +++ b/src/main/java/io/github/simplex/simplexss/ServicePool.java @@ -1,47 +1,63 @@ package io.github.simplex.simplexss; import io.github.simplex.api.Service; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public final class ServicePool { private final Set associatedServices; + private final Scheduler scheduler; + private final ExecutorService executor; private boolean delayed = false; private boolean repeating = false; public ServicePool() { this.associatedServices = new HashSet<>(); + this.executor = Executors.newSingleThreadExecutor(); + this.scheduler = Schedulers.fromExecutorService(executor); } - public void setDelayed(boolean delayed) { - this.delayed = delayed; - } - - public void setRepeating(boolean repeating) { - this.repeating = repeating; - } - - public boolean isPoolDelayed() { - return delayed; - } - - public boolean isPoolRepeating() { - return repeating; - } - - public void addService(Service service) { + void addService(Service service) { getAssociatedServices().add(service); } - public boolean isValidService(Service service) { + boolean isValidService(Service service) { return getAssociatedServices().contains(service); } + @NotNull public Set getAssociatedServices() { return associatedServices; } + public Mono startServices() { + return Mono.just(getAssociatedServices()).doOnNext(services -> { + for (Service service : services) { + if (service.isRepeating()) { + scheduler.schedulePeriodically(service, service.getDelay() * 5, service.getPeriod() * 5, TimeUnit.MILLISECONDS); + } else if (service.isDelayed()) { + scheduler.schedule(service, service.getDelay() * 5, TimeUnit.MILLISECONDS); + } + } + }).then(); + } + + public Mono stopServices() { + return Mono.just(getAssociatedServices()).doOnNext(services -> { + for (Service service : services) { + service.stop(); + } + }).then(); + } + public Service getService(int serviceID) { return getAssociatedServices() .stream() @@ -53,4 +69,11 @@ public final class ServicePool { public void removeService(Service service) { getAssociatedServices().remove(service); } + + public ServicePool recycle() { + this.getAssociatedServices().clear(); + this.repeating = false; + this.delayed = false; + return this; + } }