package org.eclipse.ditto.services.concierge.enforcement;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/EnforcementScheduler.class */
final class EnforcementScheduler extends AbstractActor {
    static final String ACTOR_NAME = "scheduler";
    private final Map<EntityId, Futures> futuresMap = new HashMap();
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final Counter scheduledEnforcementTasks = DittoMetrics.counter("scheduled_enforcement_tasks");
    private final Counter completedEnforcementTasks = DittoMetrics.counter("completed_enforcement_tasks");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/EnforcementScheduler$FutureComplete.class */
    public static final class FutureComplete {
        private final EntityId entityId;

        @Nullable
        private final Throwable error;

        private FutureComplete(EntityId entityId, @Nullable Throwable th) {
            this.entityId = entityId;
            this.error = th;
        }

        private static FutureComplete of(EntityId entityId, @Nullable Throwable th) {
            return new FutureComplete(entityId, th);
        }

        private Optional<Throwable> getError() {
            return Optional.ofNullable(this.error);
        }

        public String toString() {
            return getClass().getSimpleName() + "[entityId=" + this.entityId + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/EnforcementScheduler$Futures.class */
    public static final class Futures {
        private static final Futures INITIAL_FUTURES = new Futures(CompletableFuture.completedStage(null), CompletableFuture.completedStage(null), 0);
        private final CompletionStage<?> beforeStartFuture;
        private final CompletionStage<?> beforeCompleteFuture;
        private final int referenceCount;

        private Futures(CompletionStage<?> completionStage, CompletionStage<?> completionStage2, int i) {
            this.beforeStartFuture = completionStage;
            this.beforeCompleteFuture = completionStage2;
            this.referenceCount = i;
        }

        private static Futures initial() {
            return INITIAL_FUTURES;
        }

        private Futures appendBeforeStartFuture(CompletionStage<?> completionStage) {
            return new Futures(completionStage, completionStage, this.referenceCount + 1);
        }

        private Futures appendBeforeCompleteFuture(CompletionStage<?> completionStage) {
            return new Futures(this.beforeStartFuture, completionStage, this.referenceCount + 1);
        }

        @Nullable
        private Futures onComplete() {
            int i = this.referenceCount - 1;
            if (i <= 0) {
                return null;
            }
            return new Futures(this.beforeStartFuture, this.beforeCompleteFuture, i);
        }

        public String toString() {
            return String.format("%d Futures", Integer.valueOf(this.referenceCount));
        }
    }

    private EnforcementScheduler() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Props props() {
        return Props.create(EnforcementScheduler.class, new Object[0]);
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(EnforcementTask.class, this::scheduleEnforcement).match(FutureComplete.class, this::futureComplete).matchAny(obj -> {
            this.log.warning("UnknownMessage <{}>", obj);
        }).build();
    }

    private void scheduleEnforcement(EnforcementTask enforcementTask) {
        this.futuresMap.compute(enforcementTask.getEntityId(), (entityId, futures) -> {
            if (entityId.isDummy()) {
                this.log.error("EnforcementTaskWithoutEntityId <{}>", enforcementTask);
                return null;
            }
            this.log.debug("Scheduling <{}> at <{}>", enforcementTask, futures);
            return scheduleTaskAfter(futures != null ? futures : Futures.initial(), enforcementTask);
        });
        this.scheduledEnforcementTasks.increment();
    }

    private void futureComplete(FutureComplete futureComplete) {
        this.log.debug("Got <{}>", futureComplete);
        futureComplete.getError().ifPresent(th -> {
            this.log.error(th, "FutureFailed <{}>", futureComplete);
        });
        this.futuresMap.computeIfPresent(futureComplete.entityId, (entityId, futures) -> {
            this.log.debug("Reducing reference count <{}>", futures);
            return futures.onComplete();
        });
        this.completedEnforcementTasks.increment();
    }

    private Void dispatchEnforcedMessage(Contextual<?> contextual) {
        Optional<?> messageOptional = contextual.getMessageOptional();
        if (!messageOptional.isPresent()) {
            contextual.getLog().debug("Not dispatching due to lack of message: {}", contextual);
            return null;
        }
        WithDittoHeaders withDittoHeaders = (WithDittoHeaders) messageOptional.get();
        ThreadSafeDittoLoggingAdapter withCorrelationId = contextual.getLog().withCorrelationId(withDittoHeaders);
        Optional<ActorRef> receiver = contextual.getReceiver();
        Optional<Supplier<CompletionStage<Object>>> askFuture = contextual.getAskFuture();
        if (askFuture.isPresent() && receiver.isPresent()) {
            ActorRef actorRef = receiver.get();
            withCorrelationId.debug("About to pipe contextual message <{}> after ask-step to receiver: <{}>", withDittoHeaders, actorRef);
            Patterns.pipe(askFuture.get().get(), getContext().dispatcher()).to(actorRef);
        } else if (receiver.isPresent()) {
            ActorRef actorRef2 = receiver.get();
            Object apply = contextual.getReceiverWrapperFunction().apply(withDittoHeaders);
            withCorrelationId.debug("About to send contextual message <{}> to receiver: <{}>", apply, actorRef2);
            actorRef2.tell(apply, contextual.getSender());
        } else {
            withCorrelationId.debug("No receiver found in Contextual - as a result just ignoring it: <{}>", contextual);
        }
        withCorrelationId.discardCorrelationId();
        return null;
    }

    private Futures scheduleTaskAfter(Futures futures, EnforcementTask enforcementTask) {
        CompletionStage<?> handle = futures.beforeStartFuture.thenCompose(obj -> {
            return futures.beforeCompleteFuture.thenCombine(enforcementTask.start(), (obj, contextual) -> {
                return dispatchEnforcedMessage(contextual);
            });
        }).handle((r6, th) -> {
            return sendFutureComplete(enforcementTask, th);
        });
        return enforcementTask.changesAuthorization() ? futures.appendBeforeStartFuture(handle) : futures.appendBeforeCompleteFuture(handle);
    }

    private Void sendFutureComplete(EnforcementTask enforcementTask, @Nullable Throwable th) {
        getSelf().tell(FutureComplete.of(enforcementTask.getEntityId(), th), ActorRef.noSender());
        return null;
    }
}
