package org.eclipse.ditto.internal.utils.search;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.pf.ReceiveBuilder;
import java.time.Duration;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.SubscriptionProtocolErrorException;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.SubscriptionTimeoutException;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CancelSubscription;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.RequestFromSubscription;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionComplete;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionCreated;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionFailed;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionHasNextPage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/search/SubscriptionActor.class */
public final class SubscriptionActor extends AbstractActorWithStashWithTimers {
    private static final Duration ZOMBIE_LIFETIME = Duration.ofSeconds(10);
    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private Subscription subscription;
    private ActorRef sender;
    private DittoHeaders dittoHeaders;

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/search/SubscriptionActor$SubscriberOps.class */
    private static final class SubscriberOps implements Subscriber<JsonArray> {
        private final ActorRef subscriptionActor;
        private final String subscriptionId;

        private SubscriberOps(ActorRef actorRef) {
            this.subscriptionActor = actorRef;
            this.subscriptionId = actorRef.path().name();
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriptionActor.tell(subscription, ActorRef.noSender());
        }

        public void onNext(JsonArray jsonArray) {
            this.subscriptionActor.tell(SubscriptionHasNextPage.of(this.subscriptionId, jsonArray, DittoHeaders.empty()), ActorRef.noSender());
        }

        public void onError(Throwable th) {
            this.subscriptionActor.tell(SubscriptionFailed.of(this.subscriptionId, DittoRuntimeException.asDittoRuntimeException(th, th2 -> {
                return th2 instanceof IllegalArgumentException ? SubscriptionProtocolErrorException.of(th2, DittoHeaders.empty()) : DittoInternalErrorException.newBuilder().cause(th2).build();
            }), DittoHeaders.empty()), ActorRef.noSender());
        }

        public void onComplete() {
            this.subscriptionActor.tell(SubscriptionComplete.of(this.subscriptionId, DittoHeaders.empty()), ActorRef.noSender());
        }
    }

    SubscriptionActor(Duration duration, ActorRef actorRef, DittoHeaders dittoHeaders) {
        this.sender = actorRef;
        this.dittoHeaders = dittoHeaders;
        getContext().setReceiveTimeout(duration);
    }

    public static Props props(Duration duration, ActorRef actorRef, DittoHeaders dittoHeaders) {
        return Props.create(SubscriptionActor.class, new Object[]{duration, actorRef, dittoHeaders});
    }

    public static Subscriber<JsonArray> asSubscriber(ActorRef actorRef) {
        return new SubscriberOps(actorRef);
    }

    public void postStop() {
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RequestFromSubscription.class, this::requestSubscription).match(CancelSubscription.class, this::cancelSubscription).match(SubscriptionHasNextPage.class, this::subscriptionHasNext).match(SubscriptionComplete.class, this::subscriptionComplete).match(SubscriptionFailed.class, this::subscriptionFailed).match(Subscription.class, this::onSubscribe).matchEquals(ReceiveTimeout.getInstance(), (v1) -> {
            idleTimeout(v1);
        }).build();
    }

    private AbstractActor.Receive createZombieBehavior() {
        return ReceiveBuilder.create().match(RequestFromSubscription.class, requestFromSubscription -> {
            this.log.withCorrelationId(requestFromSubscription).info("Rejecting RequestSubscription[demand={}] as zombie", Long.valueOf(requestFromSubscription.getDemand()));
            getSender().tell(SubscriptionFailed.of(getSubscriptionId(), SubscriptionProtocolErrorException.newBuilder().message("This subscription is considered cancelled. No more messages are processed.").build(), requestFromSubscription.getDittoHeaders()), ActorRef.noSender());
        }).matchAny(obj -> {
            this.log.debug("Ignoring as zombie: <{}>", obj);
        }).build();
    }

    private void idleTimeout(ReceiveTimeout receiveTimeout) {
        this.log.info("Stopping due to idle timeout");
        getContext().cancelReceiveTimeout();
        String subscriptionId = getSubscriptionId();
        SubscriptionFailed of = SubscriptionFailed.of(subscriptionId, SubscriptionTimeoutException.of(subscriptionId, this.dittoHeaders), this.dittoHeaders);
        if (this.subscription == null) {
            this.sender.tell(getSubscriptionCreated(), ActorRef.noSender());
        }
        this.sender.tell(of, ActorRef.noSender());
        becomeZombie();
    }

    private void onSubscribe(Subscription subscription) {
        if (this.subscription != null) {
            subscription.cancel();
            return;
        }
        this.subscription = subscription;
        this.sender.tell(getSubscriptionCreated(), ActorRef.noSender());
        unstashAll();
    }

    private SubscriptionCreated getSubscriptionCreated() {
        return SubscriptionCreated.of(getSubscriptionId(), this.dittoHeaders);
    }

    private void setSenderAndDittoHeaders(ThingSearchCommand<?> thingSearchCommand) {
        this.sender = getSender();
        this.dittoHeaders = thingSearchCommand.getDittoHeaders();
    }

    private void requestSubscription(RequestFromSubscription requestFromSubscription) {
        if (this.subscription == null) {
            this.log.withCorrelationId(requestFromSubscription).debug("Stashing <{}>", requestFromSubscription);
            stash();
        } else {
            this.log.withCorrelationId(requestFromSubscription).debug("Processing <{}>", requestFromSubscription);
            setSenderAndDittoHeaders(requestFromSubscription);
            this.subscription.request(requestFromSubscription.getDemand());
        }
    }

    private void cancelSubscription(CancelSubscription cancelSubscription) {
        if (this.subscription == null) {
            this.log.withCorrelationId(cancelSubscription).info("Stashing <{}>", cancelSubscription);
            stash();
        } else {
            this.log.withCorrelationId(cancelSubscription).info("Processing <{}>", cancelSubscription);
            setSenderAndDittoHeaders(cancelSubscription);
            this.subscription.cancel();
            becomeZombie();
        }
    }

    private void subscriptionHasNext(SubscriptionHasNextPage subscriptionHasNextPage) {
        this.log.debug("Forwarding {}", subscriptionHasNextPage);
        this.sender.tell(subscriptionHasNextPage.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
    }

    private void subscriptionComplete(SubscriptionComplete subscriptionComplete) {
        if (this.subscription == null) {
            this.log.withCorrelationId(subscriptionComplete).debug("Stashing <{}>", subscriptionComplete);
            stash();
        } else {
            this.log.info("{}", subscriptionComplete);
            this.sender.tell(subscriptionComplete.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
            becomeZombie();
        }
    }

    private void subscriptionFailed(SubscriptionFailed subscriptionFailed) {
        if (this.subscription == null) {
            this.log.withCorrelationId(subscriptionFailed).debug("Stashing <{}>", subscriptionFailed);
            stash();
        } else {
            this.log.withCorrelationId(subscriptionFailed).info("{}", subscriptionFailed);
            this.sender.tell(subscriptionFailed.setDittoHeaders(this.dittoHeaders), ActorRef.noSender());
            becomeZombie();
        }
    }

    private void becomeZombie() {
        getTimers().startSingleTimer(PoisonPill.getInstance(), PoisonPill.getInstance(), ZOMBIE_LIFETIME);
        getContext().become(createZombieBehavior());
    }

    private String getSubscriptionId() {
        return getSelf().path().name();
    }
}
