package org.eclipse.ditto.services.models.acks;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.japi.pf.PFBuilder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.ditto.model.base.acks.AbstractCommandAckRequestSetter;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoHeaderInvalidException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.modify.ThingModifyCommand;
import scala.PartialFunction;

/* loaded from: input_file:org/eclipse/ditto/services/models/acks/AcknowledgementAggregatorActorStarter.class */
public final class AcknowledgementAggregatorActorStarter {
    protected final ActorContext actorContext;
    protected final AcknowledgementConfig acknowledgementConfig;
    protected final HeaderTranslator headerTranslator;
    protected final PartialFunction<Signal<?>, Signal<?>> ackRequestSetter;
    private int childCounter = 0;

    private AcknowledgementAggregatorActorStarter(ActorContext actorContext, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, PartialFunction<Signal<?>, Signal<?>> partialFunction) {
        this.actorContext = (ActorContext) ConditionChecker.checkNotNull(actorContext, "context");
        this.ackRequestSetter = partialFunction;
        this.acknowledgementConfig = (AcknowledgementConfig) ConditionChecker.checkNotNull(acknowledgementConfig, "acknowledgementConfig");
        this.headerTranslator = (HeaderTranslator) ConditionChecker.checkNotNull(headerTranslator, "headerTranslator");
    }

    public static AcknowledgementAggregatorActorStarter of(ActorContext actorContext, AcknowledgementConfig acknowledgementConfig, HeaderTranslator headerTranslator, AbstractCommandAckRequestSetter<?>... abstractCommandAckRequestSetterArr) {
        return new AcknowledgementAggregatorActorStarter(actorContext, acknowledgementConfig, headerTranslator, buildAckRequestSetter(abstractCommandAckRequestSetterArr));
    }

    public <T> T start(Signal<?> signal, Function<Object, T> function, BiFunction<Signal<?>, ActorRef, T> biFunction, Function<Signal<?>, T> function2) {
        return (T) preprocess(signal, (signal2, bool) -> {
            if (!bool.booleanValue()) {
                return function2.apply(signal2);
            }
            Objects.requireNonNull(function);
            return doStart(signal2, function::apply, actorRef -> {
                return biFunction.apply(signal2, actorRef);
            });
        }, function);
    }

    public <T> T preprocess(Signal<?> signal, BiFunction<Signal<?>, Boolean, T> biFunction, Function<? super DittoHeaderInvalidException, T> function) {
        Signal signal2 = (Signal) this.ackRequestSetter.apply(signal);
        return (T) getDittoHeaderInvalidException(signal2).map(function).orElseGet(() -> {
            return biFunction.apply(signal2, Boolean.valueOf(shouldStartForIncoming(signal2)));
        });
    }

    public <T> T doStart(Signal<?> signal, Consumer<Object> consumer, Function<ActorRef, T> function) {
        return function.apply(startAckAggregatorActor(signal, consumer));
    }

    private ActorRef startAckAggregatorActor(Signal<?> signal, Consumer<Object> consumer) {
        return this.actorContext.actorOf(AcknowledgementAggregatorActor.props(signal, this.acknowledgementConfig, this.headerTranslator, consumer), getNextActorName(signal));
    }

    private String getNextActorName(Signal<?> signal) {
        String str = (String) signal.getDittoHeaders().getCorrelationId().map(str2 -> {
            return URLEncoder.encode(str2, StandardCharsets.UTF_8);
        }).orElse("_");
        int i = this.childCounter;
        this.childCounter = i + 1;
        return String.format("ackr%x-%s", Integer.valueOf(i), str);
    }

    private static PartialFunction<Signal<?>, Signal<?>> buildAckRequestSetter(AbstractCommandAckRequestSetter<?>... abstractCommandAckRequestSetterArr) {
        PFBuilder pFBuilder = new PFBuilder();
        for (AbstractCommandAckRequestSetter<?> abstractCommandAckRequestSetter : abstractCommandAckRequestSetterArr) {
            Class matchedClass = abstractCommandAckRequestSetter.getMatchedClass();
            Objects.requireNonNull(abstractCommandAckRequestSetter);
            pFBuilder = pFBuilder.match(matchedClass, abstractCommandAckRequestSetter::isApplicable, withDittoHeaders -> {
                return abstractCommandAckRequestSetter.apply(withDittoHeaders);
            });
        }
        return pFBuilder.matchAny(signal -> {
            return signal;
        }).build();
    }

    private static Optional<DittoHeaderInvalidException> getDittoHeaderInvalidException(Signal<?> signal) {
        DittoHeaders dittoHeaders = signal.getDittoHeaders();
        return ((Boolean) dittoHeaders.getTimeout().map((v0) -> {
            return v0.isZero();
        }).orElse(false)).booleanValue() && (dittoHeaders.isResponseRequired() || !dittoHeaders.getAcknowledgementRequests().isEmpty()) ? Optional.of(DittoHeaderInvalidException.newCustomMessageBuilder(String.format("The value of the header '%s' must not be zero if response or acknowledgements are requested.", DittoHeaderDefinition.TIMEOUT.getKey())).description("Please provide a positive timeout.").dittoHeaders(dittoHeaders).build()) : Optional.empty();
    }

    static boolean shouldStartForIncoming(Signal<?> signal) {
        boolean isLiveSignal = AcknowledgementForwarderActorStarter.isLiveSignal(signal);
        Set acknowledgementRequests = signal.getDittoHeaders().getAcknowledgementRequests();
        if ((signal instanceof ThingModifyCommand) && !isLiveSignal) {
            return acknowledgementRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotLiveResponse);
        }
        if ((signal instanceof MessageCommand) || (isLiveSignal && (signal instanceof ThingCommand))) {
            return acknowledgementRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted);
        }
        return false;
    }
}
