package org.eclipse.ditto.services.connectivity.messaging.amqp;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsQueue;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.services.connectivity.messaging.amqp.AmqpClientActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/JMSConnectionHandlingActor.class */
/**
 * Actor that performs the JMS connection handling (connect, reconnect, disconnect) on behalf of the actor
 * that sent the request, and reports the outcome back to that sender.
 *
 * <p>Handled messages:
 * <ul>
 * <li>{@code AmqpClientActor.JmsConnect} - opens connection, session and consumers, then stops this actor;</li>
 * <li>{@code AmqpClientActor.JmsReconnect} - closes the given connection and connects again after a delay;</li>
 * <li>{@code AmqpClientActor.JmsDisconnect} - closes the given connection (if any), then stops this actor.</li>
 * </ul>
 */
public class JMSConnectionHandlingActor extends AbstractActor {
    static final String ACTOR_NAME_PREFIX = "jmsConnectionHandling-";

    /** Pause between closing the old connection and opening the new one on reconnect. */
    private static final FiniteDuration RECONNECT_DELAY = FiniteDuration.apply(500L, TimeUnit.MILLISECONDS);

    private final DiagnosticLoggingAdapter log;
    private final Connection connection;
    private final ExceptionListener exceptionListener;
    private final JmsConnectionFactory jmsConnectionFactory;

    private JMSConnectionHandlingActor(Connection connection, ExceptionListener exceptionListener, JmsConnectionFactory jmsConnectionFactory) {
        this.log = LogUtil.obtain(this);
        this.connection = (Connection) ConditionChecker.checkNotNull(connection, "connection");
        this.exceptionListener = exceptionListener;
        this.jmsConnectionFactory = jmsConnectionFactory;
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param connection the Ditto connection model describing what to connect to; must not be {@code null}.
     * @param exceptionListener listener handed to the connection factory when a JMS connection is created.
     * @param jmsConnectionFactory factory used to create the JMS connection.
     * @return the Props to create a {@code JMSConnectionHandlingActor} with.
     */
    public static Props props(final Connection connection, final ExceptionListener exceptionListener, final JmsConnectionFactory jmsConnectionFactory) {
        return Props.create(JMSConnectionHandlingActor.class, new Creator<JMSConnectionHandlingActor>() {
            private static final long serialVersionUID = 1;

            @Override
            public JMSConnectionHandlingActor create() {
                return new JMSConnectionHandlingActor(connection, exceptionListener, jmsConnectionFactory);
            }
        });
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return receiveBuilder()
                .match(AmqpClientActor.JmsConnect.class, this::handleConnect)
                .match(AmqpClientActor.JmsReconnect.class, this::handleReconnect)
                .match(AmqpClientActor.JmsDisconnect.class, this::handleDisconnect)
                .build();
    }

    /** Connects, answers the sender and stops this actor. */
    private void handleConnect(AmqpClientActor.JmsConnect jmsConnect) {
        doConnect(getSender(), jmsConnect.getOrigin().orElse(null));
        getContext().stop(getSelf());
    }

    /**
     * Closes the connection carried by the message and schedules a fresh connect after {@link #RECONNECT_DELAY}.
     * This actor is not stopped here.
     */
    private void handleReconnect(AmqpClientActor.JmsReconnect jmsReconnect) {
        final javax.jms.Connection oldConnection = jmsReconnect.getConnection();
        this.log.info("Reconnecting");
        doDisconnect(oldConnection, null);
        // Capture sender and origin now: getSender() is only meaningful while this message is being processed,
        // and the scheduled task below runs later on the system dispatcher.
        final ActorRef replyTo = getSender();
        final ActorRef origin = jmsReconnect.getOrigin().orElse(null);
        // NOTE(review): doConnect runs outside of the actor's message processing here; it only touches final
        // fields and getSelf(), but sending a message to self would be the more conventional pattern - confirm.
        getContext().getSystem().scheduler().scheduleOnce(RECONNECT_DELAY, () -> {
            doConnect(replyTo, origin);
        }, getContext().getSystem().dispatcher());
    }

    /** Closes the connection if the message carries one, answers with {@code JmsDisconnected} and stops this actor. */
    private void handleDisconnect(AmqpClientActor.JmsDisconnect jmsDisconnect) {
        final ActorRef origin = jmsDisconnect.getOrigin().orElse(null);
        final Optional<javax.jms.Connection> toClose = jmsDisconnect.getConnection();
        if (toClose.isPresent()) {
            doDisconnect(toClose.get(), origin);
        } else {
            // Nothing to close: confirm the disconnect right away.
            getSender().tell(new AmqpClientActor.JmsDisconnected(origin), origin);
        }
        this.log.debug("Stopping myself {}", getSelf());
        getContext().stop(getSelf());
    }

    /**
     * Creates and starts a JMS connection, opens a client-acknowledge session and one consumer per
     * source address and consumer index. On success a {@code JmsConnected} message is sent to
     * {@code actorRef}; on any failure the connection opened so far is closed again and an
     * {@code ImmutableConnectionFailure} is sent instead.
     *
     * @param actorRef the actor to report the result to.
     * @param actorRef2 the origin passed through to the result message; may be {@code null}.
     */
    private void doConnect(ActorRef actorRef, @Nullable ActorRef actorRef2) {
        JmsConnection jmsConnection = null;
        // Once the connection was sent to the requester it is owned by the requester and must not be closed here.
        boolean handedOver = false;
        try {
            jmsConnection = this.jmsConnectionFactory.createConnection(this.connection, this.exceptionListener);
            this.log.debug("Starting connection.");
            jmsConnection.start();
            this.log.debug("Connection started successfully, creating session.");
            final Session session = jmsConnection.createSession(Session.CLIENT_ACKNOWLEDGE);
            this.log.debug("Session created.");

            final Map<String, MessageConsumer> consumers = new HashMap<>();
            final Map<String, Exception> failedSources = new HashMap<>();
            createConsumers(session, consumers, failedSources);

            if (failedSources.isEmpty()) {
                actorRef.tell(new AmqpClientActor.JmsConnected(actorRef2, jmsConnection, session, consumers), actorRef2);
                handedOver = true;
                this.log.debug("Connection <{}> established successfully, stopping myself.", this.connection.getId());
            } else {
                this.log.warning("Failed to consume sources: {}.", failedSources);
                // The failure message does not carry the connection, so nobody else could close it.
                closeQuietly(jmsConnection);
                actorRef.tell(new ImmutableConnectionFailure(actorRef2,
                        ConnectionFailedException.newBuilder(this.connection.getId())
                                .message("Failed to consume sources: " + failedSources.keySet())
                                .description(() -> failedSources.entrySet().stream()
                                        .map(entry -> entry.getKey() + ": " + entry.getValue().getMessage())
                                        .collect(Collectors.joining(", ")))
                                .build(),
                        null), getSelf());
            }
        } catch (Exception e) {
            if (!handedOver) {
                // Avoid leaking a connection that was created (and possibly started) before the failure.
                closeQuietly(jmsConnection);
            }
            actorRef.tell(new ImmutableConnectionFailure(actorRef2, e, null), getSelf());
        }
    }

    /**
     * Creates the configured number of consumers for every address of every source of the connection.
     * Successfully created consumers and failures are both keyed by {@code <address>-<index>}.
     */
    private void createConsumers(Session session, Map<String, MessageConsumer> consumers, Map<String, Exception> failedSources) {
        this.connection.getSources().forEach(source -> {
            source.getAddresses().forEach(address -> {
                for (int i = 0; i < source.getConsumerCount(); i++) {
                    final String addressWithIndex = address + "-" + i;
                    this.log.debug("Creating AMQP Consumer for <{}>", addressWithIndex);
                    try {
                        // NOTE(review): all consumers of an address attach to the plain address; the index
                        // is assumed to only disambiguate the map key - confirm against the original sources.
                        consumers.put(addressWithIndex, session.createConsumer(new JmsQueue(address)));
                    } catch (JMSException e) {
                        failedSources.put(addressWithIndex, e);
                    }
                }
            });
        });
    }

    /** Best-effort close of a connection after a failed connect; never throws so the failure can still be reported. */
    private void closeQuietly(@Nullable javax.jms.Connection jmsConnection) {
        if (jmsConnection == null) {
            return;
        }
        try {
            jmsConnection.close();
        } catch (JMSException | RuntimeException e) {
            this.log.debug("Failed to close connection <{}> after unsuccessful connect: {}", this.connection.getId(), e.getMessage());
        }
    }

    /**
     * Stops and closes the given JMS connection and confirms with {@code JmsDisconnected} to the current sender.
     * A {@link JMSException} while closing is only logged; the confirmation is sent in any case.
     *
     * @param connection the JMS connection to close.
     * @param actorRef the origin passed through to the confirmation; may be {@code null}.
     */
    private void doDisconnect(javax.jms.Connection connection, @Nullable ActorRef actorRef) {
        try {
            this.log.debug("Closing JMS connection {}", this.connection.getId());
            connection.stop();
            connection.close();
            this.log.info("Connection <{}> closed.", this.connection.getId());
        } catch (JMSException e) {
            this.log.debug("Connection <{}> already closed: {}", this.connection.getId(), e.getMessage());
        }
        getSender().tell(new AmqpClientActor.JmsDisconnected(actorRef), actorRef);
    }
}
