package org.eclipse.ditto.internal.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.cluster.Cluster;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.utils.pubsub.DistributedAcks;
import org.eclipse.ditto.internal.utils.pubsub.PubSubFactory;
import org.eclipse.ditto.internal.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.internal.utils.pubsub.extractors.PubSubTopicExtractor;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/Subscriber.class */
public final class Subscriber<T extends Signal<?>> extends AbstractSubscriber<T> {
    public static final String ACTOR_NAME_PREFIX = "subscriber";
    private final List<ActorRef> subSubscribers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/Subscriber$Control.class */
    public enum Control {
        RECEIVE_LOCAL_DECLARED_ACKS
    }

    private Subscriber(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor, AckExtractor<T> ackExtractor, DistributedAcks distributedAcks) {
        super(cls, pubSubTopicExtractor, ackExtractor, distributedAcks);
        this.subSubscribers = new ArrayList(distributedAcks.getConfig().getNumberOfShards());
        distributedAcks.receiveLocalDeclaredAcks(getSelf());
    }

    public static <T> Props props(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor, AckExtractor<T> ackExtractor, DistributedAcks distributedAcks) {
        return Props.create(Subscriber.class, new Object[]{cls, pubSubTopicExtractor, ackExtractor, distributedAcks});
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ActorSelection chooseSubscriber(ActorRef actorRef, PublishSignal publishSignal, int i) {
        int hashForPubSub;
        return (i <= 1 || (hashForPubSub = PubSubFactory.hashForPubSub(publishSignal.getGroupIndexKey()) % i) <= 0) ? ActorSelection.apply(actorRef, "") : ActorSelection.apply(actorRef, String.valueOf(hashForPubSub));
    }

    @Override // org.eclipse.ditto.internal.utils.pubsub.actors.AbstractSubscriber
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PublishSignal.class, this::broadcastToLocalSubscribers).match(SubscriptionsReader.class, this::updateLocalSubscriptions).match(LocalAcksChanged.class, this::updateLocalAcks).match(Terminated.class, this::terminated).matchEquals(ActorEvent.ACK_UPDATER_NOT_AVAILABLE, this::scheduleReceiveLocalDeclaredAcks).matchEquals(Control.RECEIVE_LOCAL_DECLARED_ACKS, this::receiveLocalDeclaredAcks).build();
    }

    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(DeciderBuilder.matchAny(th -> {
            this.logger.error("Unknown error:'{}'! Escalating!", th);
            return SupervisorStrategy.escalate();
        }).build());
    }

    public void preStart() throws Exception {
        super.preStart();
        int subscriberPoolSize = this.distributedAcks.getConfig().getSubscriberPoolSize();
        if (subscriberPoolSize > 1) {
            Props props = SubSubscriber.props(this.messageClass, this.topicExtractor, this.ackExtractor, this.distributedAcks);
            for (int i = 1; i < subscriberPoolSize; i++) {
                this.subSubscribers.add(getContext().actorOf(props, String.valueOf(i)));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.eclipse.ditto.internal.utils.pubsub.actors.AbstractSubscriber
    public void updateLocalSubscriptions(SubscriptionsReader subscriptionsReader) {
        super.updateLocalSubscriptions(subscriptionsReader);
        Iterator<ActorRef> it = this.subSubscribers.iterator();
        while (it.hasNext()) {
            it.next().forward(subscriptionsReader, getContext());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.eclipse.ditto.internal.utils.pubsub.actors.AbstractSubscriber
    public void updateLocalAcks(LocalAcksChanged localAcksChanged) {
        super.updateLocalAcks(localAcksChanged);
        getContext().watch(this.ackUpdater);
        Iterator<ActorRef> it = this.subSubscribers.iterator();
        while (it.hasNext()) {
            it.next().forward(localAcksChanged, getContext());
        }
    }

    private void scheduleReceiveLocalDeclaredAcks(ActorEvent actorEvent) {
        if (timers().isTimerActive(Control.RECEIVE_LOCAL_DECLARED_ACKS)) {
            return;
        }
        timers().startSingleTimer(Control.RECEIVE_LOCAL_DECLARED_ACKS, Control.RECEIVE_LOCAL_DECLARED_ACKS, getRestartDelayWithBuffer());
    }

    private void receiveLocalDeclaredAcks(Control control) {
        this.distributedAcks.receiveLocalDeclaredAcks(getSelf());
    }

    private void terminated(Terminated terminated) {
        if (Cluster.get(getContext().getSystem()).isTerminated()) {
            this.logger.info("This cluster instance was terminated - no action required ..");
        } else if (terminated.getActor().equals(this.ackUpdater)) {
            this.logger.error("Notifying SubUpdater <{}> of AckUpdater termination: <{}>", this.subUpdater, terminated);
            if (this.subUpdater != null) {
                this.subUpdater.tell(ActorEvent.PUBSUB_TERMINATED, getSelf());
            }
            scheduleReceiveLocalDeclaredAcks(ActorEvent.ACK_UPDATER_NOT_AVAILABLE);
        }
    }

    private Duration getRestartDelayWithBuffer() {
        Duration restartDelay = PubSubConfig.of(getContext().getSystem()).getRestartDelay();
        return restartDelay.plus(restartDelay.dividedBy(4L));
    }
}
