From 699ae54af48fd96492b6fb536d43120de0179299 Mon Sep 17 00:00:00 2001 From: Terekhin Alexandr Date: Tue, 31 Oct 2023 14:43:31 +0300 Subject: [PATCH] feat: Trigger profile impl --- .../com/yablochkov/ocppstub/BootService.java | 2 + .../com/yablochkov/ocppstub/Communicator.java | 80 +++++++++++++++++++ .../com/yablochkov/ocppstub/EventHandler.java | 6 +- .../yablochkov/ocppstub/TriggerService.java | 78 ++++++++++++++++++ .../ocppstub/configuration/AppConfig.java | 10 ++- .../eu/chargetime/ocpp/YablJSONServer.java | 15 ++-- 6 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/yablochkov/ocppstub/Communicator.java create mode 100644 src/main/java/com/yablochkov/ocppstub/TriggerService.java diff --git a/src/main/java/com/yablochkov/ocppstub/BootService.java b/src/main/java/com/yablochkov/ocppstub/BootService.java index 078d027..c3cd415 100644 --- a/src/main/java/com/yablochkov/ocppstub/BootService.java +++ b/src/main/java/com/yablochkov/ocppstub/BootService.java @@ -18,6 +18,7 @@ public class BootService { private final static int INTERVAL_SEC = 10; private final SessionService sessionService; + private final TriggerService triggerService; public BootNotificationConfirmation handle(UUID sessionIndex, BootNotificationRequest request) { String auth = sessionService.getAuthBySessionId(sessionIndex); @@ -25,6 +26,7 @@ public class BootService { if (passwd != null) { log.info("Accept boot request {}", sessionIndex); + triggerService.bootCompleted(sessionIndex); return createResponse(RegistrationStatus.Accepted); } diff --git a/src/main/java/com/yablochkov/ocppstub/Communicator.java b/src/main/java/com/yablochkov/ocppstub/Communicator.java new file mode 100644 index 0000000..2ef3762 --- /dev/null +++ b/src/main/java/com/yablochkov/ocppstub/Communicator.java @@ -0,0 +1,80 @@ +package com.yablochkov.ocppstub; + +import eu.chargetime.ocpp.model.Request; +import eu.chargetime.ocpp.model.core.HeartbeatRequest; +import eu.chargetime.ocpp.model.core.StatusNotificationRequest; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class Communicator { + private final Map> map = new HashMap<>(); + + public void handleHeartbeat(UUID session, HeartbeatRequest request) { + process( + session, + request, + (container) -> { + container.future.complete(null); + return null; + }); + } + + public void handleStatusNotification(UUID session, Request request) { + log.info("Status notification for session {}: {}", session, request); + process( + session, + request, + (container) -> { + container.future.complete(request); + return null; + }); + } + + public CompletableFuture waitForHeartbeat(UUID session) { + var future = new CompletableFuture(); + var containers = map.computeIfAbsent(session, (s) -> new LinkedList<>()); + containers.add(new Container(HeartbeatRequest.class, future)); + return future; + } + + public CompletionStage waitForStatusNotification(UUID session) { + var future = new CompletableFuture(); + var containers = map.computeIfAbsent(session, (s) -> new LinkedList<>()); + containers.add(new Container(StatusNotificationRequest.class, future)); + return future; + } + + private void process(UUID sessionIndex, Request request, Function fn) { + Optional.ofNullable(map.get(sessionIndex)) + .ifPresent(containers -> { + var processed = containers.stream() + .filter(container -> container.isClassEqual(request.getClass())) + .peek(fn::apply) + .toList(); + + containers.removeAll(processed); + }); + } + + @AllArgsConstructor + private static class Container { + Class cls; + CompletableFuture future; + + boolean isClassEqual(Class c) { + return cls == c; + } + } +} diff --git a/src/main/java/com/yablochkov/ocppstub/EventHandler.java b/src/main/java/com/yablochkov/ocppstub/EventHandler.java index 84fd9d3..11b06f8 100644 --- a/src/main/java/com/yablochkov/ocppstub/EventHandler.java +++ b/src/main/java/com/yablochkov/ocppstub/EventHandler.java @@ -19,6 +19,7 @@ public class EventHandler implements ServerCoreEventHandler, ServerEvents { private final HeartbeatService heartbeatService; private final ConfigService configService; private final ConnectorStatusService connectorService; + private final Communicator communicator; @Override public AuthorizeConfirmation handleAuthorizeRequest(UUID sessionIndex, AuthorizeRequest request) { @@ -36,11 +37,13 @@ public class EventHandler implements ServerCoreEventHandler, ServerEvents { @Override public DataTransferConfirmation handleDataTransferRequest(UUID sessionIndex, DataTransferRequest request) { - return null; + return new DataTransferConfirmation(DataTransferStatus.Rejected); } @Override public HeartbeatConfirmation handleHeartbeatRequest(UUID sessionIndex, HeartbeatRequest request) { + log.info("Heartbeat for session {}", sessionIndex); + communicator.handleHeartbeat(sessionIndex, request); return heartbeatService.handle(sessionIndex, request); } @@ -56,6 +59,7 @@ public class EventHandler implements ServerCoreEventHandler, ServerEvents { @Override public StatusNotificationConfirmation handleStatusNotificationRequest(UUID sessionIndex, StatusNotificationRequest request) { + communicator.handleStatusNotification(sessionIndex, request); return connectorService.handle(sessionIndex, request); } diff --git a/src/main/java/com/yablochkov/ocppstub/TriggerService.java b/src/main/java/com/yablochkov/ocppstub/TriggerService.java new file mode 100644 index 0000000..16996b2 --- /dev/null +++ b/src/main/java/com/yablochkov/ocppstub/TriggerService.java @@ -0,0 +1,78 @@ +package com.yablochkov.ocppstub; + +import eu.chargetime.ocpp.model.Confirmation; +import eu.chargetime.ocpp.model.core.StatusNotificationRequest; +import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageConfirmation; +import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageRequest; +import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageRequestType; +import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageStatus; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +public class TriggerService { + @Lazy @Autowired private OcppStub ocppStub; + @Autowired private Communicator communicator; + + private final ExecutorService exec = Executors.newCachedThreadPool(); + + + public void bootCompleted(UUID session) { + log.debug("Before future"); + var future = CompletableFuture + .runAsync(TriggerService::timeout, exec) + .thenCompose((param) -> sendRequest(session, TriggerMessageRequestType.Heartbeat)) + .thenAccept(TriggerService::isOk) + .thenCompose((param) -> communicator.waitForHeartbeat(session)) + .thenCompose((param) -> sendRequest(session, TriggerMessageRequestType.StatusNotification)) + .thenAccept(TriggerService::isOk) + .thenCompose((param) -> communicator.waitForStatusNotification(session)); + + log.debug("After future"); + + exec.submit(() -> { + try { + log.debug("Submit trigger task for execution"); + future.get(); + log.debug("Trigger test task completed"); + } catch (Exception e) { + log.error("Triggered msg testing failed", e); + } + }); + } + + @SneakyThrows + private static void timeout() { + log.debug("Waiting for timeout"); + TimeUnit.SECONDS.sleep(2); + } + + private static void isOk(Confirmation confirmation) { + if (confirmation instanceof TriggerMessageConfirmation conf) { + if (conf.getStatus() == TriggerMessageStatus.Accepted) { + + return; + } + } + log.debug("Message rejected, raise error"); + throw new RuntimeException(); + } + + @SneakyThrows + private CompletionStage sendRequest(UUID session, TriggerMessageRequestType type) { + log.debug("Send {} to session {}", type, session); + return ocppStub.send(session, new TriggerMessageRequest(type)); + } +} diff --git a/src/main/java/com/yablochkov/ocppstub/configuration/AppConfig.java b/src/main/java/com/yablochkov/ocppstub/configuration/AppConfig.java index 2fe4f3e..9ec8b4f 100644 --- a/src/main/java/com/yablochkov/ocppstub/configuration/AppConfig.java +++ b/src/main/java/com/yablochkov/ocppstub/configuration/AppConfig.java @@ -3,6 +3,7 @@ package com.yablochkov.ocppstub.configuration; import com.yablochkov.ocppstub.EventHandler; import eu.chargetime.ocpp.YablJSONServer; import eu.chargetime.ocpp.feature.profile.ServerCoreProfile; +import eu.chargetime.ocpp.feature.profile.ServerRemoteTriggerProfile; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,7 +15,12 @@ public class AppConfig { } @Bean - public YablJSONServer getJSONServer(ServerCoreProfile profile) { - return new YablJSONServer(profile); + public ServerRemoteTriggerProfile getServerRemoteTriggerProfile() { + return new ServerRemoteTriggerProfile(); + } + + @Bean + public YablJSONServer getJSONServer(ServerCoreProfile coreProfile, ServerRemoteTriggerProfile triggerProfile) { + return new YablJSONServer(coreProfile, triggerProfile); } } diff --git a/src/main/java/eu/chargetime/ocpp/YablJSONServer.java b/src/main/java/eu/chargetime/ocpp/YablJSONServer.java index 74a10e0..4a855d5 100644 --- a/src/main/java/eu/chargetime/ocpp/YablJSONServer.java +++ b/src/main/java/eu/chargetime/ocpp/YablJSONServer.java @@ -2,6 +2,7 @@ package eu.chargetime.ocpp; import eu.chargetime.ocpp.feature.profile.Profile; import eu.chargetime.ocpp.feature.profile.ServerCoreProfile; +import eu.chargetime.ocpp.feature.profile.ServerRemoteTriggerProfile; import eu.chargetime.ocpp.model.Confirmation; import eu.chargetime.ocpp.model.Request; import eu.chargetime.ocpp.wss.BaseWssFactoryBuilder; @@ -34,7 +35,7 @@ public class YablJSONServer implements IServerAPI { * @param coreProfile implementation of the core feature profile. * @param configuration network configuration for a json server. */ - public YablJSONServer(ServerCoreProfile coreProfile, JSONConfiguration configuration) { + public YablJSONServer(ServerCoreProfile coreProfile, ServerRemoteTriggerProfile triggerProfile, JSONConfiguration configuration) { featureRepository = new FeatureRepository(); SessionFactory sessionFactory = new SessionFactory(featureRepository); @@ -46,6 +47,7 @@ public class YablJSONServer implements IServerAPI { this.listener = new YablWebSocketListener(sessionFactory, configuration, draftOcppOnly); server = new Server(this.listener, featureRepository, new PromiseRepository()); featureRepository.addFeatureProfile(coreProfile); + featureRepository.addFeatureProfile(triggerProfile); } /** @@ -53,8 +55,8 @@ public class YablJSONServer implements IServerAPI { * * @param coreProfile implementation of the core feature profile. */ - public YablJSONServer(ServerCoreProfile coreProfile) { - this(coreProfile, JSONConfiguration.get()); + public YablJSONServer(ServerCoreProfile coreProfile, ServerRemoteTriggerProfile triggerProfile) { + this(coreProfile, triggerProfile, JSONConfiguration.get()); } /** @@ -67,9 +69,10 @@ public class YablJSONServer implements IServerAPI { */ public YablJSONServer( ServerCoreProfile coreProfile, + ServerRemoteTriggerProfile triggerProfile, WssFactoryBuilder wssFactoryBuilder, JSONConfiguration configuration) { - this(coreProfile, configuration); + this(coreProfile, triggerProfile, configuration); enableWSS(wssFactoryBuilder); } @@ -80,8 +83,8 @@ public class YablJSONServer implements IServerAPI { * @param wssFactoryBuilder to build {@link org.java_websocket.WebSocketServerFactory} to support * wss://. */ - public YablJSONServer(ServerCoreProfile coreProfile, WssFactoryBuilder wssFactoryBuilder) { - this(coreProfile, wssFactoryBuilder, JSONConfiguration.get()); + public YablJSONServer(ServerCoreProfile coreProfile, ServerRemoteTriggerProfile triggerProfile, WssFactoryBuilder wssFactoryBuilder) { + this(coreProfile, triggerProfile, wssFactoryBuilder, JSONConfiguration.get()); } // To ensure the exposed API is backward compatible