package org.eclipse.ditto.services.amqpbridge.messaging;

import akka.ConfigurationException;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Session;
import javax.naming.NamingException;
import org.eclipse.ditto.model.amqpbridge.AmqpConnection;
import org.eclipse.ditto.model.amqpbridge.ConnectionStatus;
import org.eclipse.ditto.services.amqpbridge.messaging.ConnectionSupervisorActor;
import org.eclipse.ditto.services.amqpbridge.messaging.persistence.ConnectionData;
import org.eclipse.ditto.services.amqpbridge.messaging.persistence.MongoConnectionSnapshotAdapter;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.persistence.SnapshotAdapter;
import org.eclipse.ditto.signals.commands.amqpbridge.AmqpBridgeCommand;
import org.eclipse.ditto.signals.commands.amqpbridge.exceptions.ConnectionFailedException;
import org.eclipse.ditto.signals.commands.amqpbridge.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.CloseConnection;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.CloseConnectionResponse;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.CreateConnectionResponse;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.DeleteConnection;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.DeleteConnectionResponse;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.OpenConnection;
import org.eclipse.ditto.signals.commands.amqpbridge.modify.OpenConnectionResponse;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnection;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnectionResponse;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnectionStatus;
import org.eclipse.ditto.signals.commands.amqpbridge.query.RetrieveConnectionStatusResponse;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionClosed;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionCreated;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionDeleted;
import org.eclipse.ditto.signals.events.amqpbridge.ConnectionOpened;
import org.eclipse.ditto.signals.events.base.Event;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/amqpbridge/messaging/ConnectionActor.class */
public final class ConnectionActor extends AbstractPersistentActor implements ExceptionListener {
    private static final String PERSISTENCE_ID_PREFIX = "connection:";
    private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-journal";
    private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-snapshots";
    private static final int SHUTDOWN_DELAY_SECONDS = 10;
    private static final FiniteDuration SHUTDOWN_DELAY = Duration.apply(10, TimeUnit.SECONDS);
    private final DiagnosticLoggingAdapter log;
    private final String connectionId;
    private final ActorRef pubSubMediator;
    private final String pubSubTargetActorPath;
    private final JmsConnectionFactory jmsConnectionFactory;
    private final long snapshotThreshold;
    private final SnapshotAdapter<ConnectionData> snapshotAdapter;
    private final AbstractActor.Receive connectionCreatedBehaviour;
    private ActorRef commandProcessor;
    private AmqpConnection amqpConnection;
    private ConnectionStatus connectionStatus;
    private Cancellable shutdownCancellable;
    private Connection jmsConnection;
    private Session jmsSession;
    private long lastSnapshotSequenceNr;
    private boolean snapshotInProgress;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/amqpbridge/messaging/ConnectionActor$Shutdown.class */
    public static class Shutdown {
        private Shutdown() {
        }

        static Shutdown getInstance() {
            return new Shutdown();
        }
    }

    private ConnectionActor(String str, ActorRef actorRef, String str2, JmsConnectionFactory jmsConnectionFactory) {
        this.log = LogUtil.obtain(this);
        this.lastSnapshotSequenceNr = -1L;
        this.snapshotInProgress = false;
        this.connectionId = str;
        this.pubSubMediator = actorRef;
        this.pubSubTargetActorPath = str2;
        this.jmsConnectionFactory = jmsConnectionFactory;
        this.snapshotThreshold = getContext().system().settings().config().getLong("ditto.amqp-bridge.connection.snapshot.threshold");
        if (this.snapshotThreshold < 0) {
            throw new ConfigurationException(String.format("Config setting '%s' must be positive, but is: %d.", "ditto.amqp-bridge.connection.snapshot.threshold", Long.valueOf(this.snapshotThreshold)));
        }
        this.snapshotAdapter = new MongoConnectionSnapshotAdapter(getContext().system());
        this.connectionCreatedBehaviour = createConnectionCreatedBehaviour();
        this.connectionStatus = ConnectionStatus.CLOSED;
    }

    public static Props props(final String str, final ActorRef actorRef, final String str2, final JmsConnectionFactory jmsConnectionFactory) {
        return Props.create(ConnectionActor.class, new Creator<ConnectionActor>() { // from class: org.eclipse.ditto.services.amqpbridge.messaging.ConnectionActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public ConnectionActor m4create() {
                return new ConnectionActor(str, actorRef, str2, jmsConnectionFactory);
            }
        });
    }

    public String persistenceId() {
        return PERSISTENCE_ID_PREFIX + this.connectionId;
    }

    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    public void onException(JMSException jMSException) {
        this.log.error("{} occurred: {}", jMSException.getClass().getName(), jMSException.getMessage());
    }

    public void postStop() {
        super.postStop();
        if (this.shutdownCancellable != null) {
            this.shutdownCancellable.cancel();
        }
    }

    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create().match(SnapshotOffer.class, snapshotOffer -> {
            ConnectionData connectionData = (ConnectionData) this.snapshotAdapter.fromSnapshotStore(snapshotOffer);
            this.log.info("Received SnapshotOffer containing connectionStatus: <{}>", connectionData);
            if (connectionData != null) {
                this.amqpConnection = connectionData.getAmqpConnection();
                this.connectionStatus = connectionData.getConnectionStatus();
            }
            this.lastSnapshotSequenceNr = snapshotOffer.metadata().sequenceNr();
        }).match(ConnectionCreated.class, connectionCreated -> {
            this.amqpConnection = connectionCreated.getAmqpConnection();
            this.connectionStatus = ConnectionStatus.OPEN;
        }).match(ConnectionOpened.class, connectionOpened -> {
            this.connectionStatus = ConnectionStatus.OPEN;
        }).match(ConnectionClosed.class, connectionClosed -> {
            this.connectionStatus = ConnectionStatus.CLOSED;
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            this.amqpConnection = null;
            this.connectionStatus = ConnectionStatus.CLOSED;
        }).match(RecoveryCompleted.class, recoveryCompleted -> {
            if (this.amqpConnection != null) {
                this.log.info("Connection '{}' was recovered.", this.amqpConnection.getId());
                if (ConnectionStatus.OPEN.equals(this.connectionStatus)) {
                    this.jmsConnection = this.jmsConnectionFactory.createConnection(this.amqpConnection, this);
                    startCommandConsumers();
                }
                getContext().become(this.connectionCreatedBehaviour);
            }
            scheduleShutdown();
            getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown recover message: {}", obj);
        }).build();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(CreateConnection.class, this::createConnection).match(AmqpBridgeCommand.class, this::handleCommandDuringInitialization).match(Shutdown.class, shutdown -> {
            stopSelf();
        }).match(Status.Failure.class, failure -> {
            this.log.error(failure.cause(), "Got failure: {}", failure);
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private AbstractActor.Receive createConnectionCreatedBehaviour() {
        return ReceiveBuilder.create().match(OpenConnection.class, this::openConnection).match(CloseConnection.class, this::closeConnection).match(DeleteConnection.class, this::deleteConnection).match(RetrieveConnection.class, this::retrieveConnection).match(RetrieveConnectionStatus.class, this::retrieveConnectionStatus).match(Shutdown.class, shutdown -> {
            this.log.debug("Dropping Shutdown in created behaviour state.");
        }).match(Status.Failure.class, failure -> {
            this.log.error(failure.cause(), "Got failure: {}", failure);
        }).matchAny(obj -> {
            this.log.debug("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void createConnection(CreateConnection createConnection) {
        this.amqpConnection = createConnection.getAmqpConnection();
        try {
            this.jmsConnection = this.jmsConnectionFactory.createConnection(this.amqpConnection, this);
            this.log.info("Connection '{}' created.", this.amqpConnection.getId());
            persistEvent(ConnectionCreated.of(this.amqpConnection, createConnection.getDittoHeaders()), connectionCreated -> {
                if (startCommandConsumersWithErrorHandling("create")) {
                    getSender().tell(CreateConnectionResponse.of(this.amqpConnection, createConnection.getDittoHeaders()), getSelf());
                    getContext().become(this.connectionCreatedBehaviour);
                    getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
                }
            });
        } catch (JMSRuntimeException | JMSException | NamingException e) {
            getSender().tell(ConnectionFailedException.newBuilder(this.connectionId).description(e.getMessage()).build(), getSelf());
            this.log.error(e, "Failed to create Connection '{}' with Error: '{}'.", this.amqpConnection.getId(), e.getMessage());
        }
    }

    private void openConnection(OpenConnection openConnection) {
        persistEvent(ConnectionOpened.of(this.amqpConnection.getId(), openConnection.getDittoHeaders()), connectionOpened -> {
            if (startCommandConsumersWithErrorHandling("open")) {
                getSender().tell(OpenConnectionResponse.of(this.amqpConnection.getId(), openConnection.getDittoHeaders()), getSender());
            }
        });
    }

    private void closeConnection(CloseConnection closeConnection) {
        persistEvent(ConnectionClosed.of(this.amqpConnection.getId(), closeConnection.getDittoHeaders()), connectionClosed -> {
            if (stopCommandConsumersWithErrorHandling("close")) {
                getSender().tell(CloseConnectionResponse.of(this.amqpConnection.getId(), closeConnection.getDittoHeaders()), getSelf());
            }
        });
    }

    private void deleteConnection(DeleteConnection deleteConnection) {
        persistEvent(ConnectionDeleted.of(this.amqpConnection.getId(), deleteConnection.getDittoHeaders()), connectionDeleted -> {
            if (stopCommandConsumersWithErrorHandling("delete")) {
                getSender().tell(DeleteConnectionResponse.of(this.amqpConnection.getId(), deleteConnection.getDittoHeaders()), getSelf());
                stopSelf();
            }
        });
    }

    private void retrieveConnection(RetrieveConnection retrieveConnection) {
        getSender().tell(RetrieveConnectionResponse.of(this.amqpConnection, retrieveConnection.getDittoHeaders()), getSelf());
    }

    private void retrieveConnectionStatus(RetrieveConnectionStatus retrieveConnectionStatus) {
        getSender().tell(RetrieveConnectionStatusResponse.of(this.amqpConnection.getId(), this.connectionStatus, retrieveConnectionStatus.getDittoHeaders()), getSelf());
    }

    private boolean startCommandConsumersWithErrorHandling(String str) {
        try {
            startCommandConsumers();
            return true;
        } catch (JMSRuntimeException | JMSException e) {
            getSender().tell(ConnectionFailedException.newBuilder(this.connectionId).description(e.getMessage()).build(), getSelf());
            this.log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", str, this.amqpConnection.getId(), e.getMessage());
            return false;
        }
    }

    private boolean stopCommandConsumersWithErrorHandling(String str) {
        try {
            stopCommandConsumers();
            return true;
        } catch (JMSRuntimeException | JMSException e) {
            getSender().tell(ConnectionFailedException.newBuilder(this.connectionId).description(e.getMessage()).build(), getSelf());
            this.log.error(e, "Failed to <{}> Connection <{}> with Error: <{}>.", str, this.amqpConnection.getId(), e.getMessage());
            return false;
        }
    }

    private void handleCommandDuringInitialization(AmqpBridgeCommand amqpBridgeCommand) {
        this.log.debug("Unexpected command during initialization of actor received: {} - Terminating this actor and sending 'ConnectionNotAccessibleException' to requester..", amqpBridgeCommand.getType());
        getSender().tell(ConnectionNotAccessibleException.newBuilder(amqpBridgeCommand.getId()).dittoHeaders(amqpBridgeCommand.getDittoHeaders()).build(), getSelf());
    }

    private <E extends Event> void persistEvent(E e, Consumer<E> consumer) {
        persist(e, event -> {
            this.log.debug("Successfully persisted Event '{}'", event.getType());
            consumer.accept(event);
            this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(e.getType(), e, true), getSelf());
            if (lastSequenceNr() - this.lastSnapshotSequenceNr > this.snapshotThreshold) {
                doSaveSnapshot();
            }
        });
    }

    private void startCommandConsumers() throws JMSException {
        startConnection();
        this.commandProcessor = startChildActor("amqpCommandProcessor-" + this.amqpConnection.getId(), CommandProcessorActor.props(this.pubSubMediator, this.pubSubTargetActorPath, this.amqpConnection.getAuthorizationSubject()));
        Iterator it = this.amqpConnection.getSources().iterator();
        while (it.hasNext()) {
            startCommandConsumer((String) it.next());
        }
        this.log.info("Subscribed Connection '{}' to sources: {}", this.amqpConnection.getId(), this.amqpConnection.getSources());
        this.connectionStatus = ConnectionStatus.OPEN;
    }

    private void stopCommandConsumers() throws JMSException {
        if (this.amqpConnection != null) {
            Iterator it = this.amqpConnection.getSources().iterator();
            while (it.hasNext()) {
                stopChildActor((String) it.next());
            }
            stopChildActor("amqpCommandProcessor-" + this.amqpConnection.getId());
            this.log.info("Unsubscribed Connection '{}' from sources: {}", this.amqpConnection.getId(), this.amqpConnection.getSources());
            this.connectionStatus = ConnectionStatus.CLOSED;
            stopConnection();
        }
    }

    private void startCommandConsumer(String str) {
        startChildActor("amqpConsumerActor-" + str, CommandConsumerActor.props(this.jmsSession, str, this.commandProcessor));
    }

    private void startConnection() throws JMSException {
        if (this.shutdownCancellable != null) {
            this.shutdownCancellable.cancel();
        }
        if (this.jmsConnection != null) {
            this.jmsConnection.start();
            this.jmsSession = this.jmsConnection.createSession(false, 1);
            this.log.info("Connection '{}' opened.", this.amqpConnection.getId());
        }
    }

    private void stopConnection() throws JMSException {
        if (this.jmsSession != null) {
            try {
                this.jmsSession.close();
                this.jmsSession = null;
            } catch (JMSException e) {
                this.log.debug("Session of connection '{}' already closed: {}", this.amqpConnection.getId(), e.getMessage());
            }
        }
        if (this.jmsConnection != null) {
            try {
                this.jmsConnection.stop();
                this.jmsConnection.close();
                this.jmsConnection = null;
                this.log.info("Connection '{}' closed.", this.amqpConnection.getId());
            } catch (JMSException e2) {
                this.log.debug("Connection '{}' already closed: {}", this.amqpConnection.getId(), e2.getMessage());
            }
        }
    }

    private void doSaveSnapshot() {
        if (this.snapshotInProgress) {
            this.log.debug("Already requested taking a Snapshot - not doing it again");
            return;
        }
        this.snapshotInProgress = true;
        ConnectionData connectionData = new ConnectionData(this.amqpConnection, this.connectionStatus);
        this.log.info("Attempting to save Snapshot for '{}' ..", connectionData);
        saveSnapshot(this.snapshotAdapter.toSnapshotStore(connectionData));
    }

    private ActorRef startChildActor(String str, Props props) {
        this.log.debug("Starting child actor '{}'", str);
        return getContext().actorOf(props, str.replace('/', '_'));
    }

    private void stopChildActor(String str) {
        this.log.debug("Stopping child actor '{}'", str);
        Optional findChild = getContext().findChild(str.replace('/', '_'));
        AbstractActor.ActorContext context = getContext();
        context.getClass();
        findChild.ifPresent(context::stop);
    }

    private void stopSelf() {
        this.log.debug("Shutting down");
        getContext().getParent().tell(PoisonPill.getInstance(), getSelf());
    }

    private void scheduleShutdown() {
        this.shutdownCancellable = getContext().getSystem().scheduler().scheduleOnce(SHUTDOWN_DELAY, getSelf(), Shutdown.getInstance(), getContext().dispatcher(), ActorRef.noSender());
    }
}
