package org.eclipse.ditto.services.amqpbridge.messaging;

import akka.ConfigurationException;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.eclipse.ditto.model.amqpbridge.ConnectionStatus;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.services.amqpbridge.messaging.persistence.MongoReconnectSnapshotAdapter;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.persistence.SnapshotAdapter;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnectionStatus;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnectionStatusResponse;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionClosed;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionCreated;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionDeleted;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionOpened;
import org.eclipse.ditto.signals.events.base.Event;

/* loaded from: input_file:org/eclipse/ditto/services/amqpbridge/messaging/ReconnectActor.class */
public final class ReconnectActor extends AbstractPersistentActor {
    public static final String ACTOR_NAME = "reconnect";
    private final DiagnosticLoggingAdapter log;
    private final ActorRef connectionShardRegion;
    private final long snapshotThreshold;
    private final SnapshotAdapter<Set<String>> snapshotAdapter;
    private final Set<String> connectionIds;
    private boolean snapshotInProgress;
    private long lastSnapshotSequenceNr;

    private ReconnectActor(ActorRef actorRef, ActorRef actorRef2) {
        this.log = LogUtil.obtain(this);
        this.snapshotInProgress = false;
        this.lastSnapshotSequenceNr = -1L;
        this.connectionShardRegion = actorRef;
        this.snapshotThreshold = getContext().system().settings().config().getLong("ditto.amqp-bridge.reconnect.snapshot.threshold");
        if (this.snapshotThreshold < 0) {
            throw new ConfigurationException(String.format("Config setting '%s' must be positive, but is: %d.", "ditto.amqp-bridge.reconnect.snapshot.threshold", Long.valueOf(this.snapshotThreshold)));
        }
        this.snapshotAdapter = new MongoReconnectSnapshotAdapter(getContext().system());
        this.connectionIds = new HashSet();
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("amqp.bridge.events:connectionCreated", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("amqp.bridge.events:connectionOpened", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("amqp.bridge.events:connectionClosed", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("amqp.bridge.events:connectionDeleted", ACTOR_NAME, getSelf()), getSelf());
    }

    public static Props props(final ActorRef actorRef, final ActorRef actorRef2) {
        return Props.create(ReconnectActor.class, new Creator<ReconnectActor>() { // from class: org.eclipse.ditto.services.amqpbridge.messaging.ReconnectActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public ReconnectActor m6create() {
                return new ReconnectActor(actorRef, actorRef2);
            }
        });
    }

    public String persistenceId() {
        return ACTOR_NAME;
    }

    public String journalPluginId() {
        return "akka-contrib-mongodb-persistence-reconnect-journal";
    }

    public String snapshotPluginId() {
        return "akka-contrib-mongodb-persistence-reconnect-snapshots";
    }

    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create().match(SnapshotOffer.class, snapshotOffer -> {
            Set set = (Set) this.snapshotAdapter.fromSnapshotStore(snapshotOffer);
            this.log.info("Received SnapshotOffer containing connectionIds: <{}>", set);
            if (set != null) {
                this.connectionIds.clear();
                this.connectionIds.addAll(set);
            }
            this.lastSnapshotSequenceNr = snapshotOffer.metadata().sequenceNr();
        }).match(ConnectionCreated.class, connectionCreated -> {
            this.connectionIds.add(connectionCreated.getConnectionId());
        }).match(ConnectionOpened.class, connectionOpened -> {
            this.connectionIds.add(connectionOpened.getConnectionId());
        }).match(ConnectionClosed.class, connectionClosed -> {
            this.connectionIds.remove(connectionClosed.getConnectionId());
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            this.connectionIds.remove(connectionDeleted.getConnectionId());
        }).match(RecoveryCompleted.class, recoveryCompleted -> {
            this.connectionIds.forEach(this::reconnect);
        }).matchAny(obj -> {
            this.log.warning("Unknown recover message: {}", obj);
        }).build();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RetrieveConnectionStatusResponse.class, retrieveConnectionStatusResponse -> {
            if (ConnectionStatus.OPEN.equals(retrieveConnectionStatusResponse.getConnectionStatus())) {
                return;
            }
            this.connectionIds.remove(retrieveConnectionStatusResponse.getConnectionId());
        }).match(ConnectionCreated.class, connectionCreated -> {
            persistEvent(connectionCreated, connectionCreated -> {
                this.connectionIds.add(connectionCreated.getConnectionId());
            });
        }).match(ConnectionOpened.class, connectionOpened -> {
            persistEvent(connectionOpened, connectionOpened -> {
                this.connectionIds.add(connectionOpened.getConnectionId());
            });
        }).match(ConnectionClosed.class, connectionClosed -> {
            persistEvent(connectionClosed, connectionClosed -> {
                this.connectionIds.remove(connectionClosed.getConnectionId());
            });
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            persistEvent(connectionDeleted, connectionDeleted -> {
                this.connectionIds.remove(connectionDeleted.getConnectionId());
            });
        }).match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
            this.log.debug("Successfully subscribed to distributed pub/sub on topic '{}'", subscribeAck.subscribe().topic());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private <E extends Event> void persistEvent(E e, Consumer<E> consumer) {
        persist(e, event -> {
            this.log.info("Successfully persisted Event '{}'", event.getType());
            consumer.accept(event);
            if (lastSequenceNr() - this.lastSnapshotSequenceNr > this.snapshotThreshold) {
                doSaveSnapshot();
            }
        });
    }

    private void reconnect(String str) {
        this.connectionShardRegion.tell(RetrieveConnectionStatus.of(str, DittoHeaders.empty()), getSelf());
    }

    private void doSaveSnapshot() {
        if (this.snapshotInProgress) {
            this.log.debug("Already requested taking a Snapshot - not doing it again");
            return;
        }
        this.snapshotInProgress = true;
        this.log.info("Attempting to save Snapshot for '{}' ..", this.connectionIds);
        saveSnapshot(this.snapshotAdapter.toSnapshotStore(this.connectionIds));
    }
}
