package org.eclipse.ditto.services.connectivity.messaging.rabbitmq;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.japi.Pair;
import akka.japi.pf.FSMStateFunctionBuilder;
import com.newmotion.akka.rabbitmq.ChannelActor;
import com.newmotion.akka.rabbitmq.ChannelCreated;
import com.newmotion.akka.rabbitmq.ConnectionActor;
import com.newmotion.akka.rabbitmq.CreateChannel;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientActor;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientData;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientState;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.ImmutableConnectionFailure;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor.class */
public final class RabbitMQClientActor extends BaseClientActor {
    private static final String RMQ_CONNECTION_ACTOR_NAME = "rmq-connection";
    private static final String CONSUMER_CHANNEL = "consumer-channel";
    private static final String PUBLISHER_CHANNEL = "publisher-channel";
    private static final String CONSUMER_ACTOR_PREFIX = "consumer-";
    private final RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory;

    @Nullable
    private ActorRef rmqConnectionActor;

    @Nullable
    private ActorRef consumerChannelActor;

    @Nullable
    private ActorRef rmqPublisherActor;

    @Nullable
    private ActorRef createConnectionSender;
    private final Map<String, String> consumedTagsToAddresses;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor$RabbitMQExceptionHandler.class */
    public class RabbitMQExceptionHandler extends DefaultExceptionHandler {
        private final Consumer<Throwable> exceptionHandler;

        private RabbitMQExceptionHandler(Consumer<Throwable> consumer) {
            this.exceptionHandler = consumer;
        }

        public void handleUnexpectedConnectionDriverException(Connection connection, Throwable th) {
            this.exceptionHandler.accept(th);
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/rabbitmq/RabbitMQClientActor$RabbitMQMessageConsumer.class */
    private class RabbitMQMessageConsumer extends DefaultConsumer {
        private final ActorRef consumerActor;

        private RabbitMQMessageConsumer(ActorRef actorRef, Channel channel) {
            super(channel);
            this.consumerActor = actorRef;
            actorRef.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.OPEN, "Consumer initialized at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
            LogUtil.enhanceLogWithCustomField(RabbitMQClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, RabbitMQClientActor.this.connectionId());
            try {
                try {
                    this.consumerActor.tell(new Delivery(envelope, basicProperties, bArr), RabbitMQClientActor.this.getSelf());
                    RabbitMQClientActor.this.incrementConsumedMessageCounter();
                    try {
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                    } catch (IOException e) {
                        RabbitMQClientActor.this.log.info("Failed to ack delivery <{}>: {}", Long.valueOf(envelope.getDeliveryTag()), e.getMessage());
                    }
                } catch (Throwable th) {
                    RabbitMQClientActor.this.incrementConsumedMessageCounter();
                    try {
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                    } catch (IOException e2) {
                        RabbitMQClientActor.this.log.info("Failed to ack delivery <{}>: {}", Long.valueOf(envelope.getDeliveryTag()), e2.getMessage());
                    }
                    throw th;
                }
            } catch (Exception e3) {
                RabbitMQClientActor.this.log.info("Failed to process delivery <{}>: {}", Long.valueOf(envelope.getDeliveryTag()), e3.getMessage());
                RabbitMQClientActor.this.incrementConsumedMessageCounter();
                try {
                    getChannel().basicAck(envelope.getDeliveryTag(), false);
                } catch (IOException e4) {
                    RabbitMQClientActor.this.log.info("Failed to ack delivery <{}>: {}", Long.valueOf(envelope.getDeliveryTag()), e4.getMessage());
                }
            }
        }

        public void handleConsumeOk(String str) {
            super.handleConsumeOk(str);
            LogUtil.enhanceLogWithCustomField(RabbitMQClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, RabbitMQClientActor.this.connectionId());
            RabbitMQClientActor.this.consumingQueueByTag(str).ifPresent(str2 -> {
                RabbitMQClientActor.this.log.info("consume OK for consumer queue <{}> on connection <{}>", str2, RabbitMQClientActor.this.connectionId());
            });
            this.consumerActor.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.OPEN, "Consumer started at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
        }

        public void handleCancel(String str) throws IOException {
            super.handleCancel(str);
            LogUtil.enhanceLogWithCustomField(RabbitMQClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, RabbitMQClientActor.this.connectionId());
            RabbitMQClientActor.this.consumingQueueByTag(str).ifPresent(str2 -> {
                RabbitMQClientActor.this.log.warning("Consumer with queue <{}> was cancelled on connection <{}> - this can happen for example when the queue was deleted", str2, RabbitMQClientActor.this.connectionId());
            });
            this.consumerActor.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "Consumer for queue cancelled at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
        }

        public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
            super.handleShutdownSignal(str, shutdownSignalException);
            LogUtil.enhanceLogWithCustomField(RabbitMQClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, RabbitMQClientActor.this.connectionId());
            RabbitMQClientActor.this.consumingQueueByTag(str).ifPresent(str2 -> {
                RabbitMQClientActor.this.log.warning("Consumer with queue <{}> shutdown as the channel or the underlying connection has been shut down on connection <{}>", str2, RabbitMQClientActor.this.connectionId());
            });
            this.consumerActor.tell(ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "Channel or the underlying connection has been shut down at " + Instant.now(), 0L, (Instant) null), (ActorRef) null);
        }

        public void handleRecoverOk(String str) {
            super.handleRecoverOk(str);
            LogUtil.enhanceLogWithCustomField(RabbitMQClientActor.this.log, BaseClientData.MDC_CONNECTION_ID, RabbitMQClientActor.this.connectionId());
            RabbitMQClientActor.this.log.info("recovered OK for consumer with tag <{}> on connection <{}>", str, RabbitMQClientActor.this.connectionId());
            RabbitMQClientActor.this.getSelf().tell(Optional::empty, RabbitMQClientActor.this.getSelf());
        }
    }

    private RabbitMQClientActor(org.eclipse.ditto.model.connectivity.Connection connection, ConnectionStatus connectionStatus, RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory, ActorRef actorRef) {
        super(connection, connectionStatus, actorRef);
        this.rabbitConnectionFactoryFactory = rabbitConnectionFactoryFactory;
        this.consumedTagsToAddresses = new HashMap();
    }

    private RabbitMQClientActor(org.eclipse.ditto.model.connectivity.Connection connection, ConnectionStatus connectionStatus, ActorRef actorRef) {
        this(connection, connectionStatus, ConnectionBasedRabbitConnectionFactoryFactory.getInstance(), actorRef);
    }

    public static Props props(org.eclipse.ditto.model.connectivity.Connection connection, ActorRef actorRef) {
        return Props.create(RabbitMQClientActor.class, new Object[]{validateConnection(connection), connection.getConnectionStatus(), actorRef});
    }

    public static Props propsForTests(org.eclipse.ditto.model.connectivity.Connection connection, ConnectionStatus connectionStatus, ActorRef actorRef, RabbitConnectionFactoryFactory rabbitConnectionFactoryFactory) {
        return Props.create(RabbitMQClientActor.class, new Object[]{validateConnection(connection), connectionStatus, rabbitConnectionFactoryFactory, actorRef});
    }

    private static org.eclipse.ditto.model.connectivity.Connection validateConnection(org.eclipse.ditto.model.connectivity.Connection connection) {
        connection.getTargets().stream().map((v0) -> {
            return v0.getAddress();
        }).forEach(RabbitMQTarget::fromTargetAddress);
        return connection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    public FSMStateFunctionBuilder<BaseClientState, BaseClientData> unhandledHandler(String str) {
        return super.unhandledHandler(str).event(ChannelCreated.class, BaseClientData.class, (channelCreated, baseClientData) -> {
            handleChannelCreated(channelCreated);
            return stay();
        });
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected CompletionStage<Status.Status> doTestConnection(org.eclipse.ditto.model.connectivity.Connection connection) {
        this.createConnectionSender = getSender();
        return connect(connection, getSender());
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected void doConnectClient(org.eclipse.ditto.model.connectivity.Connection connection, @Nullable ActorRef actorRef) {
        this.createConnectionSender = actorRef;
        connect(connection, actorRef).thenAccept(status -> {
            this.log.info("Status of connecting in doConnectClient: {}", status);
        });
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected void doReconnectClient(org.eclipse.ditto.model.connectivity.Connection connection, @Nullable ActorRef actorRef) {
        stopCommandConsumers();
        stopCommandPublisher();
        onClientDisconnected(Optional::empty, (BaseClientData) stateData());
        this.createConnectionSender = actorRef;
        getContext().getSystem().scheduler().scheduleOnce(FiniteDuration.apply(500L, TimeUnit.MILLISECONDS), () -> {
            connect(connection, actorRef).thenAccept(status -> {
                this.log.info("Reconnected successfully");
            });
        }, getContext().getSystem().dispatcher());
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected void doDisconnectClient(org.eclipse.ditto.model.connectivity.Connection connection, @Nullable ActorRef actorRef) {
        stopCommandConsumers();
        stopCommandPublisher();
        getSelf().tell(() -> {
            return Optional.ofNullable(actorRef);
        }, actorRef);
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected void onClientConnected(ClientConnected clientConnected, BaseClientData baseClientData) {
        this.log.info("Received ClientConnected");
        if (this.rmqConnectionActor != null) {
            if (this.consumerChannelActor != null) {
                this.log.info("Consumer is already created, didn't create it again..");
            } else if (isConsuming()) {
                this.rmqConnectionActor.tell(CreateChannel.apply(ChannelActor.props((channel, actorRef) -> {
                    this.log.info("Did set up consumer channel: {}", channel);
                    startCommandConsumers(channel);
                    this.consumerChannelActor = actorRef;
                    return null;
                }), Option.apply(CONSUMER_CHANNEL)), getSelf());
            } else {
                this.log.info("Not starting channels, no sources were configured");
            }
            this.log.debug("Connection '{}' opened.", connectionId());
        }
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected void onClientDisconnected(ClientDisconnected clientDisconnected, BaseClientData baseClientData) {
        this.log.info("Received ClientDisconnected");
        if (this.consumerChannelActor != null) {
            stopChildActor(this.consumerChannelActor);
            this.consumerChannelActor = null;
        }
        if (this.rmqConnectionActor != null) {
            stopChildActor(this.rmqConnectionActor);
            this.rmqConnectionActor = null;
        }
        if (this.rmqPublisherActor != null) {
            stopChildActor(this.rmqPublisherActor);
            this.rmqPublisherActor = null;
        }
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected Optional<ActorRef> getPublisherActor() {
        return Optional.ofNullable(this.rmqPublisherActor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    public void onConnectionFailure(ConnectionFailure connectionFailure, BaseClientData baseClientData) {
        super.onConnectionFailure(connectionFailure, baseClientData);
        Throwable cause = connectionFailure.getFailure().cause();
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
        this.log.warning("Got unexpected ConnectionDriver exception on connection <{}> {}: {}", connectionId(), cause.getClass().getSimpleName(), cause.getMessage());
        if (this.createConnectionSender != null) {
            this.createConnectionSender.tell(ConnectionFailedException.newBuilder(connectionId()).description("The requested Connection could not be connected due to '" + cause.getClass().getSimpleName() + ": " + cause.getMessage() + "'").cause(cause).build(), (ActorRef) null);
            this.createConnectionSender = null;
        }
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected Map<String, AddressMetric> getSourceConnectionStatus(Source source) {
        try {
            return (Map) collectAsList((List) source.getAddresses().stream().flatMap(str -> {
                return IntStream.range(0, source.getConsumerCount()).mapToObj(i -> {
                    String str = str + "-" + i;
                    return retrieveAddressMetric(str, escapeActorName(CONSUMER_ACTOR_PREFIX + str));
                });
            }).collect(Collectors.toList())).thenApply(list -> {
                return (Map) list.stream().collect(Collectors.toMap((v0) -> {
                    return v0.first();
                }, (v0) -> {
                    return v0.second();
                }));
            }).get(2L, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.log.error(e, "Error while aggregating sources ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }

    @Override // org.eclipse.ditto.services.connectivity.messaging.BaseClientActor
    protected Map<String, AddressMetric> getTargetConnectionStatus(Target target) {
        HashMap hashMap = new HashMap();
        try {
            Pair<String, AddressMetric> pair = retrieveAddressMetric(target.getAddress(), "rmqPublisherActor").get(2L, TimeUnit.SECONDS);
            hashMap.put(pair.first(), pair.second());
            return hashMap;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            this.log.error(e, "Error while aggregating target ConnectionStatus: {}", e.getMessage());
            return Collections.emptyMap();
        }
    }

    private void handleChannelCreated(ChannelCreated channelCreated) {
        this.consumerChannelActor = channelCreated.channel();
    }

    private CompletionStage<Status.Status> connect(org.eclipse.ditto.model.connectivity.Connection connection, @Nullable ActorRef actorRef) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (this.rmqConnectionActor == null) {
            ActorRef self = getSelf();
            try {
                this.rmqConnectionActor = startChildActor(RMQ_CONNECTION_ACTOR_NAME, ConnectionActor.props(this.rabbitConnectionFactoryFactory.createConnectionFactory(connection, new RabbitMQExceptionHandler(th -> {
                    self.tell(new ImmutableConnectionFailure(actorRef, th, null), self);
                    completableFuture.complete(new Status.Failure(th));
                })), FiniteDuration.apply(10L, TimeUnit.SECONDS), (connection2, actorRef2) -> {
                    this.log.info("Established RMQ connection: {}", connection2);
                    self.tell(() -> {
                        return Optional.ofNullable(this.createConnectionSender);
                    }, actorRef);
                    return null;
                }));
                this.rmqPublisherActor = startRmqPublisherActor().orElse(null);
                this.rmqConnectionActor.tell(CreateChannel.apply(ChannelActor.props((channel, actorRef3) -> {
                    this.log.info("Did set up publisher channel: {}", channel);
                    completableFuture.complete(new Status.Success("channel created"));
                    return null;
                }), Option.apply(PUBLISHER_CHANNEL)), this.rmqPublisherActor);
            } catch (Exception e) {
                self.tell(new ImmutableConnectionFailure(actorRef, e, null), self);
                completableFuture.complete(new Status.Failure(e));
            }
        } else {
            this.log.debug("Connection '{}' is already open.", connectionId());
            completableFuture.complete(new Status.Success("already connected"));
        }
        return completableFuture;
    }

    private Optional<ActorRef> startRmqPublisherActor() {
        return isPublishing() ? Optional.of(getContext().findChild("rmqPublisherActor").orElseGet(() -> {
            return startChildActor("rmqPublisherActor", RabbitMQPublisherActor.props(getTargetsOrEmptySet()));
        })) : Optional.empty();
    }

    private void stopCommandPublisher() {
        stopChildActor("rmqPublisherActor");
    }

    private void stopCommandConsumers() {
        getContext().getChildren().forEach(actorRef -> {
            if (actorRef.path().name().startsWith(CONSUMER_ACTOR_PREFIX)) {
                stopChildActor(actorRef);
            }
        });
        getContext().getChildren().forEach(actorRef2 -> {
            String name = actorRef2.path().name();
            if (name.startsWith(CONSUMER_ACTOR_PREFIX)) {
                int i = 5;
                while (getContext().findChild(name).isPresent()) {
                    try {
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    i--;
                    if (i == 0) {
                        return;
                    }
                }
            }
        });
    }

    private void startCommandConsumers(Channel channel) {
        this.log.info("Starting to consume queues...");
        try {
            ensureQueuesExist(channel);
            stopCommandConsumers();
            startConsumers(channel);
        } catch (DittoRuntimeException e) {
            if (this.createConnectionSender != null) {
                this.createConnectionSender.tell(new Status.Failure(e), getSelf());
                this.createConnectionSender = null;
            }
            if (this.consumerChannelActor != null) {
                stopChildActor(this.consumerChannelActor);
                this.consumerChannelActor = null;
            }
        }
        if (this.createConnectionSender != null) {
            this.createConnectionSender.tell(new Status.Success(BaseClientState.CONNECTED), getSelf());
            this.createConnectionSender = null;
        }
    }

    private void startConsumers(Channel channel) {
        Optional<ActorRef> messageMappingProcessorActor = getMessageMappingProcessorActor();
        if (messageMappingProcessorActor.isPresent()) {
            getSourcesOrEmptySet().forEach(source -> {
                source.getAddresses().forEach(str -> {
                    for (int i = 0; i < source.getConsumerCount(); i++) {
                        String str = str + "-" + i;
                        try {
                            String basicConsume = channel.basicConsume(str, false, new RabbitMQMessageConsumer(startChildActor(CONSUMER_ACTOR_PREFIX + str, RabbitMQConsumerActor.props(str, (ActorRef) messageMappingProcessorActor.get())), channel));
                            this.log.debug("Consuming queue <{}>, consumer tag is <{}>", str, basicConsume);
                            this.consumedTagsToAddresses.put(basicConsume, str);
                        } catch (IOException e) {
                            this.log.warning("Failed to consume queue <{}>: <{}>", str, e.getMessage());
                        }
                    }
                });
            });
        } else {
            this.log.warning("The MessageMappingProcessor was not available and therefore no consumers were started!");
        }
    }

    private void ensureQueuesExist(Channel channel) {
        ArrayList arrayList = new ArrayList();
        getSourcesOrEmptySet().forEach(source -> {
            source.getAddresses().forEach(str -> {
                try {
                    channel.queueDeclarePassive(str);
                } catch (IOException e) {
                    arrayList.add(str);
                    this.log.warning("The queue <{}> does not exist.", str);
                }
            });
        });
        if (arrayList.isEmpty()) {
            return;
        }
        this.log.warning("Stopping RMQ client actor for connection <{}> as queues to connect to are missing: <{}>", connectionId(), arrayList);
        throw ConnectionFailedException.newBuilder(connectionId()).description("The queues " + arrayList + " to connect to are missing.").build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<String> consumingQueueByTag(String str) {
        return Optional.ofNullable(this.consumedTagsToAddresses.get(str));
    }
}
