package org.eclipse.ditto.services.connectivity.messaging;

import akka.ConfigurationException;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.services.connectivity.messaging.persistence.MongoReconnectSnapshotAdapter;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandResponse;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionStatus;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionStatusResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.connectivity.ConnectionClosed;
import org.eclipse.ditto.signals.events.connectivity.ConnectionCreated;
import org.eclipse.ditto.signals.events.connectivity.ConnectionDeleted;
import org.eclipse.ditto.signals.events.connectivity.ConnectionModified;
import org.eclipse.ditto.signals.events.connectivity.ConnectionOpened;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/ReconnectActor.class */
public final class ReconnectActor extends AbstractPersistentActor {
    public static final String ACTOR_NAME = "reconnect";
    private final ActorRef connectionShardRegion;
    private final SnapshotAdapter<Set<String>> snapshotAdapter;
    private final Set<String> connectionIds;
    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private boolean snapshotInProgress = false;
    private long lastSnapshotSequenceNr = -1;
    private final long snapshotThreshold = getContext().system().settings().config().getLong("ditto.connectivity.reconnect.snapshot.threshold");

    /* renamed from: org.eclipse.ditto.services.connectivity.messaging.ReconnectActor$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/ReconnectActor$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$ditto$model$connectivity$ConnectionStatus = new int[ConnectionStatus.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$ditto$model$connectivity$ConnectionStatus[ConnectionStatus.OPEN.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$model$connectivity$ConnectionStatus[ConnectionStatus.CLOSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    private ReconnectActor(ActorRef actorRef, ActorRef actorRef2) {
        this.connectionShardRegion = actorRef;
        if (this.snapshotThreshold < 0) {
            throw new ConfigurationException(String.format("Config setting '%s' must be positive, but is: %d.", "ditto.connectivity.reconnect.snapshot.threshold", Long.valueOf(this.snapshotThreshold)));
        }
        this.snapshotAdapter = new MongoReconnectSnapshotAdapter(getContext().system());
        this.connectionIds = new HashSet();
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("connectivity.events:connectionCreated", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("connectivity.events:connectionModified", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("connectivity.events:connectionOpened", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("connectivity.events:connectionClosed", ACTOR_NAME, getSelf()), getSelf());
        actorRef2.tell(new DistributedPubSubMediator.Subscribe("connectivity.events:connectionDeleted", ACTOR_NAME, getSelf()), getSelf());
    }

    public static Props props(ActorRef actorRef, ActorRef actorRef2) {
        return Props.create(ReconnectActor.class, new Object[]{actorRef, actorRef2});
    }

    public String persistenceId() {
        return ACTOR_NAME;
    }

    public String journalPluginId() {
        return "akka-contrib-mongodb-persistence-reconnect-journal";
    }

    public String snapshotPluginId() {
        return "akka-contrib-mongodb-persistence-reconnect-snapshots";
    }

    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create().match(SnapshotOffer.class, snapshotOffer -> {
            Set set = (Set) this.snapshotAdapter.fromSnapshotStore(snapshotOffer);
            this.log.info("Received SnapshotOffer containing connectionIds: <{}>", set);
            if (set != null) {
                this.connectionIds.clear();
                this.connectionIds.addAll(set);
            }
            this.lastSnapshotSequenceNr = snapshotOffer.metadata().sequenceNr();
        }).match(ConnectionCreated.class, connectionCreated -> {
            this.connectionIds.add(connectionCreated.getConnectionId());
        }).match(ConnectionModified.class, this::handleConnectionModified).match(ConnectionOpened.class, connectionOpened -> {
            this.connectionIds.add(connectionOpened.getConnectionId());
        }).match(ConnectionClosed.class, connectionClosed -> {
            this.connectionIds.remove(connectionClosed.getConnectionId());
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            this.connectionIds.remove(connectionDeleted.getConnectionId());
        }).match(RecoveryCompleted.class, recoveryCompleted -> {
            this.connectionIds.forEach(this::reconnect);
        }).matchAny(obj -> {
            this.log.warning("Unknown recover message: {}", obj);
        }).build();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RetrieveConnectionStatusResponse.class, retrieveConnectionStatusResponse -> {
            if (ConnectionStatus.OPEN.equals(retrieveConnectionStatusResponse.getConnectionStatus())) {
                return;
            }
            this.connectionIds.remove(retrieveConnectionStatusResponse.getConnectionId());
        }).match(ConnectionCreated.class, connectionCreated -> {
            persistEvent(connectionCreated, connectionCreated -> {
                this.connectionIds.add(connectionCreated.getConnectionId());
            });
        }).match(ConnectionModified.class, this::handleConnectionModified).match(ConnectionOpened.class, connectionOpened -> {
            persistEvent(connectionOpened, connectionOpened -> {
                this.connectionIds.add(connectionOpened.getConnectionId());
            });
        }).match(ConnectionClosed.class, connectionClosed -> {
            persistEvent(connectionClosed, connectionClosed -> {
                this.connectionIds.remove(connectionClosed.getConnectionId());
            });
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            persistEvent(connectionDeleted, connectionDeleted -> {
                this.connectionIds.remove(connectionDeleted.getConnectionId());
            });
        }).match(ConnectivityCommandResponse.class, connectivityCommandResponse -> {
            this.log.info("Received CommandResponse: <{}>", connectivityCommandResponse);
        }).match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
            this.log.debug("Successfully subscribed to distributed pub/sub on topic '{}'", subscribeAck.subscribe().topic());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void handleConnectionModified(ConnectionModified connectionModified) {
        switch (AnonymousClass1.$SwitchMap$org$eclipse$ditto$model$connectivity$ConnectionStatus[connectionModified.getConnection().getConnectionStatus().ordinal()]) {
            case 1:
                this.connectionIds.add(connectionModified.getConnectionId());
                return;
            case 2:
                this.connectionIds.remove(connectionModified.getConnectionId());
                return;
            default:
                return;
        }
    }

    private <E extends Event> void persistEvent(E e, Consumer<E> consumer) {
        persist(e, event -> {
            this.log.info("Successfully persisted Event '{}'", event.getType());
            consumer.accept(event);
            if (lastSequenceNr() - this.lastSnapshotSequenceNr > this.snapshotThreshold) {
                doSaveSnapshot();
            }
        });
    }

    private void reconnect(String str) {
        this.connectionShardRegion.tell(RetrieveConnectionStatus.of(str, DittoHeaders.newBuilder().correlationId("reconnect-actor-triggered").build()), getSelf());
    }

    private void doSaveSnapshot() {
        if (this.snapshotInProgress) {
            this.log.debug("Already requested taking a Snapshot - not doing it again");
            return;
        }
        this.snapshotInProgress = true;
        this.log.info("Attempting to save Snapshot for '{}' ..", this.connectionIds);
        saveSnapshot(this.snapshotAdapter.toSnapshotStore(this.connectionIds));
    }
}
