package io.fluxcapacitor.axonclient.commandhandling.result;

import io.fluxcapacitor.axonclient.common.serialization.AxonMessageSerializer;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.javaclient.tracking.ConsumerService;
import io.fluxcapacitor.javaclient.tracking.Processor;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/axonclient/commandhandling/result/ResultProcessor.class */
public class ResultProcessor implements ResultService {
    private static final Logger log = LoggerFactory.getLogger(ResultProcessor.class);
    private final AxonMessageSerializer serializer;
    private final ConsumerService consumerService;
    private final String name;
    private final int threads;
    private final Map<String, CompletableFuture<Object>> outstandingRequests;
    private volatile Registration registration;

    public ResultProcessor(AxonMessageSerializer axonMessageSerializer, ConsumerService consumerService, String str) {
        this(axonMessageSerializer, consumerService, str, 1);
    }

    public ResultProcessor(AxonMessageSerializer axonMessageSerializer, ConsumerService consumerService, String str, int i) {
        this.outstandingRequests = new ConcurrentHashMap();
        this.serializer = axonMessageSerializer;
        this.consumerService = consumerService;
        this.name = str;
        this.threads = i;
    }

    @Override // io.fluxcapacitor.axonclient.commandhandling.result.ResultService
    public CompletableFuture<Object> awaitResult(String str) {
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();
        this.outstandingRequests.put(str, completableFuture);
        return completableFuture;
    }

    protected void handle(List<Message> list) {
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            org.axonframework.messaging.Message<?> deserializeMessage = this.serializer.deserializeMessage(it.next());
            String str = (String) deserializeMessage.getMetaData().get("correlationId");
            Optional ofNullable = Optional.ofNullable(this.outstandingRequests.remove(str));
            if (ofNullable.isPresent()) {
                CompletableFuture completableFuture = (CompletableFuture) ofNullable.get();
                Object payload = deserializeMessage.getPayload();
                if (payload instanceof Throwable) {
                    completableFuture.completeExceptionally((Throwable) payload);
                } else {
                    completableFuture.complete(payload);
                }
                ObjectUtils.ifTrue(log.isDebugEnabled(), () -> {
                    log.debug("Remaining outstanding requests: {}", Integer.valueOf(this.outstandingRequests.size()));
                });
            } else {
                log.warn("Received result for an unknown request {}", str);
            }
        }
    }

    public void start() {
        if (this.registration == null) {
            this.registration = Processor.startMultiple(this.name, this.threads, this.consumerService, this::handle);
        }
    }

    public void shutDown() {
        Optional.ofNullable(this.registration).ifPresent((v0) -> {
            v0.cancel();
        });
        this.registration = null;
    }
}
