package org.eclipse.ditto.services.connectivity.messaging.amqp;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Pair;
import akka.pattern.PatternsCS;
import akka.util.Timeout;
import java.net.URI;
import java.text.MessageFormat;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.provider.ProviderFactory;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientActor;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientData;
import org.eclipse.ditto.services.connectivity.messaging.ReconnectActor;
import org.eclipse.ditto.services.connectivity.messaging.internal.AbstractWithOrigin;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.DisconnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.ReconnectClient;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor.class */
public final class AmqpClientActor extends BaseClientActor implements ExceptionListener {
    private static final int TEST_CONNECTION_TIMEOUT = 5;
    private final JmsConnectionFactory jmsConnectionFactory;
    private final ConnectionListener connectionListener;
    private final Map<String, MessageConsumer> consumerMap;

    @Nullable
    private JmsConnection jmsConnection;

    @Nullable
    private Session jmsSession;

    @Nullable
    private ActorRef amqpPublisherActor;

    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$ConnectionListener.class */
    private class ConnectionListener implements JmsConnectionListener {
        private ConnectionListener() {
        }

        public void onConnectionEstablished(URI uri) {
            AmqpClientActor.this.log.info("Connection established: {}", uri);
        }

        public void onConnectionFailure(Throwable th) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.warning("Connection Failure: {}", th.getMessage());
            AmqpClientActor.this.getSelf().tell(new ImmutableConnectionFailure(ActorRef.noSender(), th, null), ActorRef.noSender());
        }

        public void onConnectionInterrupted(URI uri) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.warning("Connection interrupted: {}", uri);
            AmqpClientActor.this.getSelf().tell(new ImmutableConnectionFailure(ActorRef.noSender(), null, "JMS Interrupted"), ActorRef.noSender());
        }

        public void onConnectionRestored(URI uri) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.info("Connection restored: {}", uri);
            AmqpClientActor.this.getSelf().tell(Optional::empty, ActorRef.noSender());
        }

        public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.debug("Inbound message: {}", jmsInboundMessageDispatch);
            AmqpClientActor.this.incrementConsumedMessageCounter();
        }

        public void onSessionClosed(Session session, Throwable th) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.warning("Session closed: {} - {}", session, th.getMessage());
            AmqpClientActor.this.getSelf().tell(new ImmutableConnectionFailure(ActorRef.noSender(), th, "JMS Session closed"), ActorRef.noSender());
        }

        public void onConsumerClosed(MessageConsumer messageConsumer, Throwable th) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.consumerMap.entrySet().stream().filter(entry -> {
                return ((MessageConsumer) entry.getValue()).equals(messageConsumer);
            }).findFirst().ifPresent(entry2 -> {
                AmqpClientActor.this.log.warning("Consumer <{}> closed due to {}: {}", entry2.getKey(), th.getClass().getSimpleName(), th.getMessage());
                AmqpClientActor.this.getContext().findChild(AmqpClientActor.escapeActorName("amqpConsumerActor-" + ((String) entry2.getKey()))).ifPresent(actorRef -> {
                    actorRef.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "Consumer closed at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
                });
            });
        }

        public void onProducerClosed(MessageProducer messageProducer, Throwable th) {
            LogUtil.enhanceLogWithCustomField(AmqpClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, AmqpClientActor.this.connectionId());
            AmqpClientActor.this.log.warning("Producer <{}> closed due to {}: {}", messageProducer, th.getClass().getSimpleName(), th.getMessage());
            AmqpClientActor.this.getContext().findChild(AmqpClientActor.escapeActorName("amqpPublisherActor")).ifPresent(actorRef -> {
                actorRef.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "Producer closed at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
            });
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$JmsConnect.class */
    /**
     * Message sent to the connection handling actor when a connection shall be opened or tested.
     */
    static class JmsConnect extends AbstractWithOrigin implements ConnectClient {

        /**
         * @param actorRef the origin handed to {@link AbstractWithOrigin}, may be {@code null}.
         */
        JmsConnect(@Nullable final ActorRef actorRef) {
            super(actorRef);
        }

    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$JmsConnected.class */
    /**
     * Message carrying the JMS connection, the session and the consumers (keyed by String) that the enclosing
     * actor takes over in {@code onClientConnected}.
     */
    public static class JmsConnected extends AbstractWithOrigin implements ClientConnected {

        private final JmsConnection connection;
        private final Session session;
        private final Map<String, MessageConsumer> consumers;

        /**
         * @param actorRef the origin handed to {@link AbstractWithOrigin}, may be {@code null}.
         * @param jmsConnection the JMS connection.
         * @param session the JMS session.
         * @param map the message consumers; the map is stored as is, without copying.
         */
        public JmsConnected(@Nullable final ActorRef actorRef, final JmsConnection jmsConnection,
                final Session session, final Map<String, MessageConsumer> map) {

            super(actorRef);
            consumers = map;
            connection = jmsConnection;
            this.session = session;
        }

    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$JmsDisconnect.class */
    /**
     * Message sent to the connection handling actor when the client shall disconnect. It optionally carries the
     * connection the enclosing actor currently holds.
     */
    public static class JmsDisconnect extends AbstractWithOrigin implements DisconnectClient {

        @Nullable
        private final Connection connection;

        /**
         * @param actorRef the origin handed to {@link AbstractWithOrigin}, may be {@code null}.
         * @param connection the JMS connection, may be {@code null}.
         */
        JmsDisconnect(@Nullable final ActorRef actorRef, @Nullable final Connection connection) {
            super(actorRef);
            this.connection = connection;
        }

        /**
         * @return the connection given at construction, or an empty Optional if it was {@code null}.
         */
        public Optional<Connection> getConnection() {
            final Connection nullableConnection = connection;
            return Optional.ofNullable(nullableConnection);
        }

    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$JmsDisconnected.class */
    /**
     * Message of type {@link ClientDisconnected} that carries nothing but the origin.
     */
    public static class JmsDisconnected extends AbstractWithOrigin implements ClientDisconnected {

        /**
         * @param actorRef the origin handed to {@link AbstractWithOrigin}, may be {@code null}.
         */
        public JmsDisconnected(@Nullable final ActorRef actorRef) {
            super(actorRef);
        }

    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActor$JmsReconnect.class */
    /**
     * Message sent to the connection handling actor when the client shall reconnect. It carries the connection
     * the enclosing actor currently holds.
     */
    public static class JmsReconnect extends AbstractWithOrigin implements ReconnectClient {

        private final Connection connection;

        /**
         * @param actorRef the origin handed to {@link AbstractWithOrigin}, may be {@code null}.
         * @param connection the JMS connection. Although annotated as nullable it is passed through
         * {@code ConditionChecker.checkNotNull}, which presumably rejects {@code null} with an exception.
         */
        JmsReconnect(@Nullable final ActorRef actorRef, @Nullable final Connection connection) {
            super(actorRef);
            final Connection checkedConnection = (Connection) ConditionChecker.checkNotNull(connection, "connection");
            this.connection = checkedConnection;
        }

        /**
         * @return the connection given at construction.
         */
        public Connection getConnection() {
            return connection;
        }

    }

    /**
     * Creates the actor with an explicitly given JMS connection factory. {@link #propsForTests} passes
     * arguments in exactly this order.
     *
     * @param connection the connection configuration, passed on to the super class.
     * @param connectionStatus the connection status, passed on to the super class.
     * @param jmsConnectionFactory the factory later handed to the connection handling actors.
     * @param actorRef an actor reference passed on to the super class.
     */
    private AmqpClientActor(org.eclipse.ditto.model.connectivity.Connection connection, ConnectionStatus connectionStatus, JmsConnectionFactory jmsConnectionFactory, ActorRef actorRef) {
        super(connection, connectionStatus, actorRef);
        this.jmsConnectionFactory = jmsConnectionFactory;
        this.connectionListener = new ConnectionListener();
        // Diamond instead of a raw HashMap so that the assignment is type-checked.
        this.consumerMap = new HashMap<>();
    }

    /**
     * Creates the actor with the JMS connection factory obtained from
     * {@code ConnectionBasedJmsConnectionFactory.getInstance()}. {@link #props} passes arguments in exactly
     * this order.
     *
     * @param connection the connection configuration.
     * @param connectionStatus the connection status.
     * @param actorRef an actor reference passed on to the four-argument constructor.
     */
    private AmqpClientActor(final org.eclipse.ditto.model.connectivity.Connection connection,
            final ConnectionStatus connectionStatus, final ActorRef actorRef) {

        this(connection, connectionStatus, ConnectionBasedJmsConnectionFactory.getInstance(), actorRef);
    }

    /**
     * Creates Akka configuration object for this actor after validating the given connection.
     *
     * @param connection the connection configuration; its own connection status is used as constructor argument.
     * @param actorRef an actor reference passed to the constructor as last argument.
     * @return the Props.
     */
    public static Props props(final org.eclipse.ditto.model.connectivity.Connection connection,
            final ActorRef actorRef) {

        final org.eclipse.ditto.model.connectivity.Connection validated = validateConnection(connection);
        final ConnectionStatus desiredStatus = connection.getConnectionStatus();
        return Props.create(AmqpClientActor.class, validated, desiredStatus, actorRef);
    }

    /**
     * Creates Akka configuration object for this actor with an explicitly given connection status and JMS
     * connection factory. The given connection is validated first.
     *
     * @param connection the connection configuration.
     * @param connectionStatus the connection status passed to the constructor.
     * @param actorRef an actor reference passed to the constructor as last argument.
     * @param jmsConnectionFactory the JMS connection factory passed to the constructor.
     * @return the Props.
     */
    public static Props propsForTests(final org.eclipse.ditto.model.connectivity.Connection connection,
            final ConnectionStatus connectionStatus, final ActorRef actorRef,
            final JmsConnectionFactory jmsConnectionFactory) {

        final org.eclipse.ditto.model.connectivity.Connection validated = validateConnection(connection);
        return Props.create(AmqpClientActor.class, validated, connectionStatus, jmsConnectionFactory, actorRef);
    }

    /**
     * Checks that an AMQP provider can be instantiated from the URI built for the given connection.
     *
     * @param connection the connection configuration to check.
     * @return the very same connection if the provider could be created.
     * @throws ConnectionConfigurationInvalidException if building the URI or creating the provider failed for
     * whatever reason; the original exception is set as cause.
     */
    private static org.eclipse.ditto.model.connectivity.Connection validateConnection(
            final org.eclipse.ditto.model.connectivity.Connection connection) {

        try {
            final String amqpUri = ConnectionBasedJmsConnectionFactory.buildAmqpConnectionUriFromConnection(connection);
            ProviderFactory.create(URI.create(amqpUri));
        } catch (final Exception e) {
            final String reason = e.getMessage();
            final String errorMessage = MessageFormat.format(
                    "Failed to instantiate an amqp provider from the given configuration: {0}", reason);
            throw ConnectionConfigurationInvalidException.newBuilder(errorMessage)
                    .description(reason)
                    .cause(e)
                    .build();
        }
        return connection;
    }

    /**
     * Asks the connection handling actor named "test" to connect and maps its answer to a status.
     * <ul>
     * <li>A {@link ConnectionFailure} answer yields the failure it carries.</li>
     * <li>A failed ask, a {@link Status.Failure} answer or a {@link Throwable} answer yields a
     * {@link Status.Failure} wrapping a {@link ConnectionFailedException}.</li>
     * <li>Any other answer is wrapped in a {@link Status.Success}.</li>
     * </ul>
     *
     * @param connection the connection to test.
     * @return the stage completing with the test result.
     */
    @Override
    protected CompletionStage<Status.Status> doTestConnection(org.eclipse.ditto.model.connectivity.Connection connection) {
        final ActorRef connectionHandler = startConnectionHandlingActor("test", connection);
        // Use the dedicated constant instead of repeating its value as a magic number.
        final Timeout askTimeout = Timeout.apply(TEST_CONNECTION_TIMEOUT, TimeUnit.SECONDS);

        return PatternsCS.ask(connectionHandler, new JmsConnect(getSender()), askTimeout)
                .handle((response, error) -> {
                    final boolean failed = error != null
                            || response instanceof Status.Failure
                            || response instanceof Throwable;
                    if (!failed) {
                        return response instanceof ConnectionFailure
                                ? ((ConnectionFailure) response).getFailure()
                                : new Status.Success(response);
                    }

                    // Prefer the cause delivered as answer; fall back to the error of the ask itself.
                    final Throwable cause;
                    if (response instanceof Status.Failure) {
                        cause = ((Status.Failure) response).cause();
                    } else if (response instanceof Throwable) {
                        cause = (Throwable) response;
                    } else {
                        cause = error;
                    }
                    return new Status.Failure(ConnectionFailedException.newBuilder(connectionId())
                            .description("The requested Connection could not be connected due to '"
                                    + cause.getClass().getSimpleName() + ": " + cause.getMessage() + "'")
                            .cause(cause)
                            .build());
                });
    }

    /**
     * Sends a {@link JmsConnect} to the connection handling actor named "connect", with this actor as sender.
     *
     * @param connection the connection handed to the connection handling actor.
     * @param actorRef the origin put into the message, may be {@code null}.
     */
    @Override
    protected void doConnectClient(final org.eclipse.ditto.model.connectivity.Connection connection,
            @Nullable final ActorRef actorRef) {

        final ActorRef connectionHandler = startConnectionHandlingActor("connect", connection);
        final JmsConnect connectCommand = new JmsConnect(actorRef);
        connectionHandler.tell(connectCommand, getSelf());
    }

    /**
     * Sends a {@link JmsReconnect} carrying the currently held JMS connection to the connection handling
     * actor named after {@code ReconnectActor.ACTOR_NAME}, with this actor as sender.
     *
     * @param connection the connection handed to the connection handling actor.
     * @param actorRef the origin put into the message, may be {@code null}.
     */
    @Override
    protected void doReconnectClient(final org.eclipse.ditto.model.connectivity.Connection connection,
            @Nullable final ActorRef actorRef) {

        final ActorRef connectionHandler = startConnectionHandlingActor(ReconnectActor.ACTOR_NAME, connection);
        // NOTE(review): the JmsReconnect constructor null-checks the connection while the field is nullable.
        final JmsReconnect reconnectCommand = new JmsReconnect(actorRef, jmsConnection);
        connectionHandler.tell(reconnectCommand, getSelf());
    }

    /**
     * Sends a {@link JmsDisconnect} carrying the currently held JMS connection (possibly {@code null}) to the
     * connection handling actor named "disconnect", with this actor as sender.
     *
     * @param connection the connection handed to the connection handling actor.
     * @param actorRef the origin put into the message, may be {@code null}.
     */
    @Override
    protected void doDisconnectClient(final org.eclipse.ditto.model.connectivity.Connection connection,
            @Nullable final ActorRef actorRef) {

        final ActorRef connectionHandler = startConnectionHandlingActor("disconnect", connection);
        final JmsDisconnect disconnectCommand = new JmsDisconnect(actorRef, jmsConnection);
        connectionHandler.tell(disconnectCommand, getSelf());
    }

    /**
     * Retrieves one address metric per source address and consumer index and collects them into a map.
     * For every address of the source and every index in {@code [0, consumerCount)} the metric is requested
     * for {@code <address>-<index>} from the child named after the escaped form of
     * {@code amqpConsumerActor-<address>-<index>}. The call blocks for at most two seconds.
     *
     * @param source the source whose addresses and consumer count are used.
     * @return the collected metrics, or an empty map if retrieval failed, timed out or was interrupted.
     */
    @Override
    protected Map<String, AddressMetric> getSourceConnectionStatus(Source source) {
        try {
            return collectAsList(source.getAddresses().stream()
                    .flatMap(address -> IntStream.range(0, source.getConsumerCount())
                            .mapToObj(index -> {
                                // Distinct name: a lambda-local variable must not redeclare the lambda parameter.
                                final String addressWithIndex = address + "-" + index;
                                final String consumerActorName =
                                        escapeActorName("amqpConsumerActor-" + addressWithIndex);
                                return retrieveAddressMetric(addressWithIndex, consumerActorName);
                            }))
                    .collect(Collectors.toList()))
                    .thenApply(metrics -> metrics.stream().collect(Collectors.toMap(Pair::first, Pair::second)))
                    .get(2L, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that the interruption is not lost for code further up the stack.
            Thread.currentThread().interrupt();
            this.log.error(e, "Error while aggregating sources ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        } catch (final ExecutionException | TimeoutException e) {
            this.log.error(e, "Error while aggregating sources ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }

    /**
     * Retrieves the address metric of the given target from the child named "amqpPublisherActor" and returns
     * it as single-entry map. The call blocks for at most two seconds.
     *
     * @param target the target whose address is used.
     * @return a mutable map with the retrieved metric, or an empty map if retrieval failed, timed out or was
     * interrupted.
     */
    @Override
    protected Map<String, AddressMetric> getTargetConnectionStatus(Target target) {
        try {
            final Pair<String, AddressMetric> targetMetric =
                    retrieveAddressMetric(target.getAddress(), "amqpPublisherActor").get(2L, TimeUnit.SECONDS);
            // Parameterized instead of raw so that the put is type-checked.
            final Map<String, AddressMetric> targetStatus = new HashMap<>();
            targetStatus.put(targetMetric.first(), targetMetric.second());
            return targetStatus;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that the interruption is not lost for code further up the stack.
            Thread.currentThread().interrupt();
            this.log.error(e, "Error while aggregating target ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        } catch (final ExecutionException | TimeoutException e) {
            this.log.error(e, "Error while aggregating target ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }

    /**
     * Takes over connection, session and consumers from a {@link JmsConnected}, registers the connection
     * listener, starts the publisher actor (if any) and then the consumer actors. Any other kind of
     * {@link ClientConnected} is only logged.
     *
     * @param clientConnected the received message.
     * @param baseClientData not used by this implementation.
     */
    @Override
    protected void onClientConnected(final ClientConnected clientConnected, final BaseClientData baseClientData) {
        if (clientConnected instanceof JmsConnected) {
            final JmsConnected connected = (JmsConnected) clientConnected;
            log.info("Received JmsConnected");

            jmsConnection = connected.connection;
            jmsConnection.addConnectionListener(connectionListener);
            jmsSession = connected.session;

            // Replace whatever consumers were known before by the ones just received.
            consumerMap.clear();
            consumerMap.putAll(connected.consumers);

            amqpPublisherActor = startAmqpPublisherActor().orElse(null);
            startCommandConsumers(consumerMap);
        } else {
            log.info("ClientConnected was not JmsConnected as expected, ignoring as this probably was a reconnection");
        }
    }

    /**
     * Stops the consumer and publisher child actors and resets all JMS related state of this actor: session,
     * connection (after removing the connection listener from it), publisher reference and consumers.
     *
     * @param clientDisconnected not used by this implementation.
     * @param baseClientData not used by this implementation.
     */
    @Override
    protected void onClientDisconnected(final ClientDisconnected clientDisconnected,
            final BaseClientData baseClientData) {

        log.info("Received ClientDisconnected");
        stopCommandConsumers();
        stopCommandProducer();

        jmsSession = null;

        final JmsConnection formerConnection = jmsConnection;
        if (null != formerConnection) {
            formerConnection.removeConnectionListener(connectionListener);
        }
        jmsConnection = null;

        final ActorRef formerPublisher = amqpPublisherActor;
        if (null != formerPublisher) {
            stopChildActor(formerPublisher);
            amqpPublisherActor = null;
        }

        consumerMap.clear();
    }

    /**
     * @return the publisher actor reference currently held by this actor, or an empty Optional if there is none.
     */
    @Override
    protected Optional<ActorRef> getPublisherActor() {
        final ActorRef currentPublisher = amqpPublisherActor;
        return Optional.ofNullable(currentPublisher);
    }

    /**
     * Stops all running consumer actors and starts one consumer actor per entry of the given map. Nothing is
     * stopped or started if the message mapping processor actor is not available or if this client is not
     * consuming.
     *
     * @param map the message consumers; each key is used to derive the name of the consumer actor.
     */
    private void startCommandConsumers(final Map<String, MessageConsumer> map) {
        final Optional<ActorRef> mappingProcessor = getMessageMappingProcessorActor();
        if (!mappingProcessor.isPresent()) {
            log.warning("The MessageMappingProcessor was not available and therefore no consumers were started!");
            return;
        }
        if (!isConsuming()) {
            log.debug("Not starting consumers, no sources were configured");
            return;
        }

        stopCommandConsumers();
        for (final Map.Entry<String, MessageConsumer> consumerEntry : map.entrySet()) {
            startCommandConsumer(consumerEntry.getKey(), consumerEntry.getValue(), mappingProcessor.get());
        }
        log.info("Subscribed Connection <{}> to sources: {}", connectionId(), map.keySet());
    }

    /**
     * Starts the consumer child actor for the given address unless such a child already exists.
     *
     * @param str the address; the child is named {@code amqpConsumerActor-<address>}.
     * @param messageConsumer the JMS consumer handed to the consumer actor.
     * @param actorRef the actor reference handed to the consumer actor (the caller passes the message mapping
     * processor actor).
     */
    private void startCommandConsumer(String str, MessageConsumer messageConsumer, ActorRef actorRef) {
        final String consumerActorName = "amqpConsumerActor-" + str;
        // Look the child up by its escaped name, exactly like the other lookups of consumer actors in this
        // class do. The unescaped name would miss an existing child whenever escaping alters the address.
        // NOTE(review): presumes startChildActor escapes the name itself, hence the unescaped name is passed on.
        final boolean alreadyStarted = getContext().findChild(escapeActorName(consumerActorName)).isPresent();
        if (alreadyStarted) {
            this.log.debug("Child actor {} already exists.", consumerActorName);
            return;
        }
        startChildActor(consumerActorName, AmqpConsumerActor.props(str, messageConsumer, actorRef));
    }

    /**
     * Returns the child named "amqpPublisherActor", starting it with the current JMS session if it does not
     * exist yet. Nothing is started if this client is not publishing.
     *
     * @return the publisher actor, or an empty Optional if this client is not publishing.
     * @throws IllegalStateException if the child has to be started but there is no JMS session.
     */
    private Optional<ActorRef> startAmqpPublisherActor() {
        if (!isPublishing()) {
            log.info("This client is not configured for publishing, not starting AmqpPublisherActor");
            return Optional.empty();
        }

        final ActorRef publisher = getContext().findChild("amqpPublisherActor").orElseGet(() -> {
            if (jmsSession == null) {
                throw new IllegalStateException(
                        "Could not start AmqpPublisherActor due to missing jmsSession or connection");
            }
            return startChildActor("amqpPublisherActor", AmqpPublisherActor.props(jmsSession, getTargetsOrEmptySet()));
        });
        return Optional.of(publisher);
    }

    /**
     * Stops the child registered under the escaped name "amqpPublisherActor", if there is one.
     */
    private void stopCommandProducer() {
        final String publisherName = escapeActorName("amqpPublisherActor");
        getContext().findChild(publisherName).ifPresent(publisher -> stopChildActor(publisher));
    }

    /**
     * Stops every child of this actor whose name starts with "amqpConsumerActor-".
     */
    private void stopCommandConsumers() {
        for (final ActorRef child : getContext().getChildren()) {
            final String childName = child.path().name();
            if (childName.startsWith("amqpConsumerActor-")) {
                stopChildActor(child);
            }
        }
    }

    /**
     * Returns the connection handling child actor for the given suffix, creating it if it does not exist yet.
     * The child is named {@code jmsConnectionHandling-} followed by the escaped form of
     * {@code <connectionId>-<suffix>}.
     *
     * @param str the suffix distinguishing the purpose of the child, e.g. "connect" or "test".
     * @param connection the connection handed to a newly created child.
     * @return the already existing or the newly created child.
     */
    private ActorRef startConnectionHandlingActor(final String str,
            final org.eclipse.ditto.model.connectivity.Connection connection) {

        final String handlerName = "jmsConnectionHandling-" + escapeActorName(connectionId() + "-" + str);
        final Optional<ActorRef> existingHandler = getContext().findChild(handlerName);
        if (!existingHandler.isPresent()) {
            // This actor itself is passed on, too.
            return getContext().actorOf(JMSConnectionHandlingActor.props(connection, this, jmsConnectionFactory),
                    handlerName);
        }
        log.info("JMSConnectionHandlingActor <{}> is still existing and busy executing a command, queuing new command..", handlerName);
        return existingHandler.get();
    }

    /**
     * Logs the class name and the message of the given exception on warning level; nothing else happens.
     *
     * @param jMSException the reported exception.
     */
    @Override
    public void onException(final JMSException jMSException) {
        final String exceptionType = jMSException.getClass().getName();
        final String exceptionMessage = jMSException.getMessage();
        log.warning("{} occurred: {}", exceptionType, exceptionMessage);
    }
}
