parent
52509c477c
commit
699ae54af4
@ -0,0 +1,80 @@ |
||||
package com.yablochkov.ocppstub; |
||||
|
||||
import eu.chargetime.ocpp.model.Request; |
||||
import eu.chargetime.ocpp.model.core.HeartbeatRequest; |
||||
import eu.chargetime.ocpp.model.core.StatusNotificationRequest; |
||||
import java.util.HashMap; |
||||
import java.util.LinkedList; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Optional; |
||||
import java.util.UUID; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.CompletionStage; |
||||
import java.util.function.Function; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
@Slf4j |
||||
@Service |
||||
public class Communicator { |
||||
private final Map<UUID, List<Container>> map = new HashMap<>(); |
||||
|
||||
public void handleHeartbeat(UUID session, HeartbeatRequest request) { |
||||
process( |
||||
session, |
||||
request, |
||||
(container) -> { |
||||
container.future.complete(null); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
public void handleStatusNotification(UUID session, Request request) { |
||||
log.info("Status notification for session {}: {}", session, request); |
||||
process( |
||||
session, |
||||
request, |
||||
(container) -> { |
||||
container.future.complete(request); |
||||
return null; |
||||
}); |
||||
} |
||||
|
||||
public CompletableFuture<Void> waitForHeartbeat(UUID session) { |
||||
var future = new CompletableFuture<Void>(); |
||||
var containers = map.computeIfAbsent(session, (s) -> new LinkedList<>()); |
||||
containers.add(new Container(HeartbeatRequest.class, future)); |
||||
return future; |
||||
} |
||||
|
||||
public CompletionStage<StatusNotificationRequest> waitForStatusNotification(UUID session) { |
||||
var future = new CompletableFuture<StatusNotificationRequest>(); |
||||
var containers = map.computeIfAbsent(session, (s) -> new LinkedList<>()); |
||||
containers.add(new Container(StatusNotificationRequest.class, future)); |
||||
return future; |
||||
} |
||||
|
||||
private void process(UUID sessionIndex, Request request, Function<Container,Void> fn) { |
||||
Optional.ofNullable(map.get(sessionIndex)) |
||||
.ifPresent(containers -> { |
||||
var processed = containers.stream() |
||||
.filter(container -> container.isClassEqual(request.getClass())) |
||||
.peek(fn::apply) |
||||
.toList(); |
||||
|
||||
containers.removeAll(processed); |
||||
}); |
||||
} |
||||
|
||||
@AllArgsConstructor |
||||
private static class Container { |
||||
Class<?> cls; |
||||
CompletableFuture future; |
||||
|
||||
boolean isClassEqual(Class<?> c) { |
||||
return cls == c; |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,78 @@ |
||||
package com.yablochkov.ocppstub; |
||||
|
||||
import eu.chargetime.ocpp.model.Confirmation; |
||||
import eu.chargetime.ocpp.model.core.StatusNotificationRequest; |
||||
import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageConfirmation; |
||||
import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageRequest; |
||||
import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageRequestType; |
||||
import eu.chargetime.ocpp.model.remotetrigger.TriggerMessageStatus; |
||||
import java.util.UUID; |
||||
import java.util.concurrent.CompletableFuture; |
||||
import java.util.concurrent.CompletionStage; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.Executor; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.TimeUnit; |
||||
import lombok.SneakyThrows; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.context.annotation.Lazy; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
@Slf4j |
||||
@Service |
||||
public class TriggerService { |
||||
@Lazy @Autowired private OcppStub ocppStub; |
||||
@Autowired private Communicator communicator; |
||||
|
||||
private final ExecutorService exec = Executors.newCachedThreadPool(); |
||||
|
||||
|
||||
public void bootCompleted(UUID session) { |
||||
log.debug("Before future"); |
||||
var future = CompletableFuture |
||||
.runAsync(TriggerService::timeout, exec) |
||||
.thenCompose((param) -> sendRequest(session, TriggerMessageRequestType.Heartbeat)) |
||||
.thenAccept(TriggerService::isOk) |
||||
.thenCompose((param) -> communicator.waitForHeartbeat(session)) |
||||
.thenCompose((param) -> sendRequest(session, TriggerMessageRequestType.StatusNotification)) |
||||
.thenAccept(TriggerService::isOk) |
||||
.thenCompose((param) -> communicator.waitForStatusNotification(session)); |
||||
|
||||
log.debug("After future"); |
||||
|
||||
exec.submit(() -> { |
||||
try { |
||||
log.debug("Submit trigger task for execution"); |
||||
future.get(); |
||||
log.debug("Trigger test task completed"); |
||||
} catch (Exception e) { |
||||
log.error("Triggered msg testing failed", e); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
@SneakyThrows |
||||
private static void timeout() { |
||||
log.debug("Waiting for timeout"); |
||||
TimeUnit.SECONDS.sleep(2); |
||||
} |
||||
|
||||
private static void isOk(Confirmation confirmation) { |
||||
if (confirmation instanceof TriggerMessageConfirmation conf) { |
||||
if (conf.getStatus() == TriggerMessageStatus.Accepted) { |
||||
|
||||
return; |
||||
} |
||||
} |
||||
log.debug("Message rejected, raise error"); |
||||
throw new RuntimeException(); |
||||
} |
||||
|
||||
@SneakyThrows |
||||
private CompletionStage<Confirmation> sendRequest(UUID session, TriggerMessageRequestType type) { |
||||
log.debug("Send {} to session {}", type, session); |
||||
return ocppStub.send(session, new TriggerMessageRequest(type)); |
||||
} |
||||
} |
Loading…
Reference in new issue