package org.eclipse.ditto.services.connectivity.messaging;

import akka.ConfigurationException;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Status;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.routing.ClusterRouterPool;
import akka.cluster.routing.ClusterRouterPoolSettings;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.routing.Broadcast;
import akka.routing.RoundRobinPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.services.connectivity.messaging.ConnectionSupervisorActor;
import org.eclipse.ditto.services.connectivity.messaging.persistence.ConnectionMongoSnapshotAdapter;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.connectivity.AggregatedConnectivityCommandResponse;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityErrorResponse;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionConflictException;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.DeleteConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.DeleteConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.OpenConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.OpenConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnection;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionMetrics;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionStatus;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionStatusResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.connectivity.ConnectionClosed;
import org.eclipse.ditto.signals.events.connectivity.ConnectionCreated;
import org.eclipse.ditto.signals.events.connectivity.ConnectionDeleted;
import org.eclipse.ditto.signals.events.connectivity.ConnectionModified;
import org.eclipse.ditto.signals.events.connectivity.ConnectionOpened;
import scala.concurrent.duration.Duration;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/ConnectionActor.class */
public final class ConnectionActor extends AbstractPersistentActor {
    private static final String PERSISTENCE_ID_PREFIX = "connection:";
    private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-journal";
    private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-connection-snapshots";
    private static final long DEFAULT_TIMEOUT_MS = 5000;
    private static final String PUB_SUB_GROUP_PREFIX = "connection:";
    private final DiagnosticLoggingAdapter log;
    private final String connectionId;
    private final ActorRef pubSubMediator;
    private final ActorRef conciergeForwarder;
    private final long snapshotThreshold;
    private final SnapshotAdapter<Connection> snapshotAdapter;
    private final ConnectionActorPropsFactory propsFactory;
    private final AbstractActor.Receive connectionCreatedBehaviour;

    @Nullable
    private ActorRef clientActor;

    @Nullable
    private Connection connection;
    private long lastSnapshotSequenceNr;
    private boolean snapshotInProgress;
    private Set<String> uniqueTopicPaths;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/ConnectionActor$AggregateActor.class */
    public static class AggregateActor extends AbstractActor {
        private final String connectionId;
        private final ActorRef clientActor;
        private final int expectedResponses;
        private final long timeout;

        @Nullable
        private ActorRef origin;

        @Nullable
        private DittoHeaders originHeaders;
        private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
        private int responseCount = 0;
        private final List<CommandResponse<?>> aggregatedResults = new ArrayList();
        private final Map<String, Status.Status> aggregatedStatus = new HashMap();

        static Props props(String str, ActorRef actorRef, int i, long j) {
            return Props.create(AggregateActor.class, new Object[]{str, actorRef, Integer.valueOf(i), Long.valueOf(j)});
        }

        private AggregateActor(String str, ActorRef actorRef, int i, long j) {
            this.connectionId = str;
            this.clientActor = actorRef;
            this.expectedResponses = i;
            this.timeout = j;
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(Command.class, command -> {
                this.clientActor.tell(new Broadcast(command), getSelf());
                this.originHeaders = command.getDittoHeaders();
                this.origin = getSender();
                getContext().setReceiveTimeout(Duration.create(this.timeout / 2.0d, TimeUnit.MILLISECONDS));
            }).match(ReceiveTimeout.class, receiveTimeout -> {
                sendBackAggregatedResults();
            }).matchAny(obj -> {
                if (obj instanceof CommandResponse) {
                    this.aggregatedResults.add((CommandResponse) obj);
                } else if (obj instanceof Status.Status) {
                    this.aggregatedStatus.put(getSender().path().address().hostPort(), (Status.Status) obj);
                } else if (obj instanceof DittoRuntimeException) {
                    this.aggregatedResults.add(ConnectivityErrorResponse.of((DittoRuntimeException) obj));
                } else {
                    this.log.error("Could not handle non-Jsonifiable non-Status response: {}", obj);
                }
                this.responseCount++;
                if (this.expectedResponses == this.responseCount) {
                    sendBackAggregatedResults();
                }
            }).build();
        }

        private void sendBackAggregatedResults() {
            if (this.origin != null && this.originHeaders != null && !this.aggregatedResults.isEmpty()) {
                AggregatedConnectivityCommandResponse of = AggregatedConnectivityCommandResponse.of(this.connectionId, this.aggregatedResults, this.aggregatedResults.get(0).getType(), HttpStatusCode.OK, this.originHeaders);
                this.log.debug("Aggregated response: {}", of);
                this.origin.tell(of, getSelf());
            } else if (this.origin == null || this.originHeaders == null || this.aggregatedStatus.isEmpty()) {
                this.log.warning("No origin was present or results were empty in order to send back aggregated results to");
            } else {
                this.log.debug("Aggregated stati: {}", this.aggregatedStatus);
                Optional findFirst = this.aggregatedStatus.entrySet().stream().filter(entry -> {
                    return entry.getValue() instanceof Status.Failure;
                }).map((v0) -> {
                    return v0.getValue();
                }).findFirst();
                if (findFirst.isPresent()) {
                    this.origin.tell(findFirst.get(), getSelf());
                } else {
                    this.origin.tell(new Status.Success((String) this.aggregatedStatus.entrySet().stream().map((v0) -> {
                        return v0.toString();
                    }).collect(Collectors.joining(","))), getSelf());
                }
            }
            getContext().stop(getSelf());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/ConnectionActor$Shutdown.class */
    public static class Shutdown {
        private Shutdown() {
        }

        private static Shutdown getInstance() {
            return new Shutdown();
        }
    }

    private ConnectionActor(String str, ActorRef actorRef, ActorRef actorRef2, ConnectionActorPropsFactory connectionActorPropsFactory) {
        this.log = LogUtil.obtain(this);
        this.lastSnapshotSequenceNr = -1L;
        this.snapshotInProgress = false;
        this.uniqueTopicPaths = Collections.emptySet();
        this.connectionId = str;
        this.pubSubMediator = actorRef;
        this.conciergeForwarder = actorRef2;
        this.propsFactory = connectionActorPropsFactory;
        this.snapshotThreshold = getContext().system().settings().config().getLong("ditto.connectivity.connection.snapshot.threshold");
        if (this.snapshotThreshold < 0) {
            throw new ConfigurationException(String.format("Config setting '%s' must be positive, but is: %d.", "ditto.connectivity.connection.snapshot.threshold", Long.valueOf(this.snapshotThreshold)));
        }
        this.snapshotAdapter = new ConnectionMongoSnapshotAdapter();
        this.connectionCreatedBehaviour = createConnectionCreatedBehaviour();
    }

    public static Props props(final String str, final ActorRef actorRef, final ActorRef actorRef2, final ConnectionActorPropsFactory connectionActorPropsFactory) {
        return Props.create(ConnectionActor.class, new Creator<ConnectionActor>() { // from class: org.eclipse.ditto.services.connectivity.messaging.ConnectionActor.1
            private static final long serialVersionUID = 1;

            /* renamed from: create, reason: merged with bridge method [inline-methods] */
            public ConnectionActor m1create() {
                return new ConnectionActor(str, actorRef, actorRef2, connectionActorPropsFactory);
            }
        });
    }

    public String persistenceId() {
        return "connection:" + this.connectionId;
    }

    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    public void postStop() {
        super.postStop();
        this.log.info("stopped connection <{}>", this.connectionId);
    }

    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create().match(SnapshotOffer.class, snapshotOffer -> {
            Connection connection = (Connection) this.snapshotAdapter.fromSnapshotStore(snapshotOffer);
            this.log.info("Received SnapshotOffer containing connection: <{}>", connection);
            if (connection != null) {
                this.connection = connection;
            }
            this.lastSnapshotSequenceNr = snapshotOffer.metadata().sequenceNr();
        }).match(ConnectionCreated.class, connectionCreated -> {
            this.connection = connectionCreated.getConnection();
        }).match(ConnectionOpened.class, connectionOpened -> {
            this.connection = this.connection != null ? this.connection.toBuilder().connectionStatus(ConnectionStatus.OPEN).build() : null;
        }).match(ConnectionClosed.class, connectionClosed -> {
            this.connection = this.connection != null ? this.connection.toBuilder().connectionStatus(ConnectionStatus.CLOSED).build() : null;
        }).match(ConnectionDeleted.class, connectionDeleted -> {
            this.connection = null;
        }).match(RecoveryCompleted.class, recoveryCompleted -> {
            this.log.info("Connection '{}' was recovered: {}", this.connectionId, this.connection);
            if (this.connection != null) {
                if (ConnectionStatus.OPEN.equals(this.connection.getConnectionStatus())) {
                    this.log.debug("Opening connection {} after recovery.", this.connectionId);
                    CreateConnection of = CreateConnection.of(this.connection, DittoHeaders.empty());
                    ActorRef sender = getSender();
                    askClientActor(of, obj -> {
                        this.log.info("CreateConnection result: {}", obj);
                    }, th -> {
                        handleException("recovery-connect", sender, th);
                    });
                    subscribeForEvents();
                }
                getContext().become(this.connectionCreatedBehaviour);
            }
            getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
        }).matchAny(obj -> {
            this.log.warning("Unknown recover message: {}", obj);
        }).build();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(TestConnection.class, this::testConnection).match(CreateConnection.class, this::createConnection).match(ConnectivityCommand.class, this::handleCommandDuringInitialization).match(Shutdown.class, shutdown -> {
            stopSelf();
        }).match(Status.Failure.class, failure -> {
            this.log.warning("Got failure in initial behaviour with cause {}: {}", failure.cause().getClass().getSimpleName(), failure.cause().getMessage());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private AbstractActor.Receive createConnectionCreatedBehaviour() {
        return ReceiveBuilder.create().match(TestConnection.class, testConnection -> {
            getSender().tell(TestConnectionResponse.alreadyCreated(testConnection.getConnectionId(), testConnection.getDittoHeaders()), getSelf());
        }).match(CreateConnection.class, createConnection -> {
            enhanceLogUtil(createConnection);
            this.log.info("Connection <{}> already exists, responding with conflict", createConnection.getId());
            getSender().tell(ConnectionConflictException.newBuilder(createConnection.getId()).dittoHeaders(createConnection.getDittoHeaders()).build(), getSelf());
        }).match(ModifyConnection.class, this::modifyConnection).match(OpenConnection.class, this::openConnection).match(CloseConnection.class, this::closeConnection).match(DeleteConnection.class, this::deleteConnection).match(RetrieveConnection.class, this::retrieveConnection).match(RetrieveConnectionStatus.class, this::retrieveConnectionStatus).match(RetrieveConnectionMetrics.class, this::retrieveConnectionMetrics).match(Signal.class, this::handleSignal).match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck).match(DistributedPubSubMediator.UnsubscribeAck.class, this::handleUnsubscribeAck).match(SaveSnapshotSuccess.class, this::handleSnapshotSuccess).match(Shutdown.class, shutdown -> {
            this.log.debug("Dropping Shutdown in created behaviour state.");
        }).match(Status.Failure.class, failure -> {
            this.log.warning("Got failure in connectionCreated behaviour with cause {}: {}", failure.cause().getClass().getSimpleName(), failure.cause().getMessage());
        }).matchAny(obj -> {
            this.log.warning("Unknown message: {}", obj);
            unhandled(obj);
        }).build();
    }

    private void enhanceLogUtil(WithDittoHeaders<?> withDittoHeaders) {
        LogUtil.enhanceLogWithCorrelationId(this.log, withDittoHeaders);
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, this.connectionId);
    }

    private void handleSignal(Signal<?> signal) {
        enhanceLogUtil(signal);
        if (this.clientActor == null) {
            this.log.debug("Cannot forward thing event, client actor not ready.");
            return;
        }
        if (this.connection == null) {
            this.log.debug("No Connection configuration available.");
            return;
        }
        if (this.uniqueTopicPaths.isEmpty()) {
            this.log.debug("Not forwarding anything.");
            return;
        }
        if (this.connectionId.equals(signal.getDittoHeaders().getOrigin().orElse(null))) {
            this.log.debug("Dropping signal, was sent by myself.");
            return;
        }
        String mapSignalToTopicPath = TopicPathMapper.mapSignalToTopicPath(signal);
        if (!this.uniqueTopicPaths.contains(mapSignalToTopicPath)) {
            this.log.debug("Dropping signal, topic '{}' is not subscribed.", mapSignalToTopicPath);
        } else if (isAuthorized(signal, this.connection.getAuthorizationContext())) {
            this.log.debug("Forwarding signal <{}> to client actor.", signal.getType());
            this.clientActor.tell(signal, getSelf());
        }
    }

    private boolean isAuthorized(Signal<?> signal, AuthorizationContext authorizationContext) {
        return !Collections.disjoint(signal.getDittoHeaders().getReadSubjects(), authorizationContext.getAuthorizationSubjectIds());
    }

    private void testConnection(TestConnection testConnection) {
        ActorRef sender = getSender();
        if (isConnectionConfigurationValid(testConnection.getConnection(), sender)) {
            this.connection = testConnection.getConnection();
            askClientActor(testConnection, obj -> {
                sender.tell(TestConnectionResponse.success(testConnection.getConnectionId(), obj.toString(), testConnection.getDittoHeaders()), getSelf());
                stopSelf();
            }, th -> {
                handleException("test", sender, th);
                stopSelf();
            });
        }
    }

    private void createConnection(CreateConnection createConnection) {
        ActorRef sender = getSender();
        if (isConnectionConfigurationValid(createConnection.getConnection(), sender)) {
            persistEvent(ConnectionCreated.of(createConnection.getConnection(), createConnection.getDittoHeaders()), connectionCreated -> {
                this.connection = connectionCreated.getConnection();
                askClientActor(createConnection, obj -> {
                    getContext().become(this.connectionCreatedBehaviour);
                    subscribeForEvents();
                    sender.tell(CreateConnectionResponse.of(this.connection, createConnection.getDittoHeaders()), getSelf());
                    getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
                }, th -> {
                    getContext().become(this.connectionCreatedBehaviour);
                    handleException("connect", sender, th);
                    getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
                });
            });
        }
    }

    private boolean isConnectionConfigurationValid(Connection connection, ActorRef actorRef) {
        try {
            this.propsFactory.getActorPropsForType(connection, this.conciergeForwarder);
            return true;
        } catch (Exception e) {
            handleException("connect", actorRef, e);
            stopSelf();
            return false;
        }
    }

    private void modifyConnection(ModifyConnection modifyConnection) {
        ActorRef sender = getSender();
        if (isConnectionConfigurationValid(modifyConnection.getConnection(), sender)) {
            if (this.connection == null || this.connection.getConnectionType().equals(modifyConnection.getConnection().getConnectionType())) {
                persistEvent(ConnectionModified.of(modifyConnection.getConnection(), modifyConnection.getDittoHeaders()), connectionModified -> {
                    this.connection = connectionModified.getConnection();
                    askClientActor(modifyConnection, obj -> {
                        getContext().become(this.connectionCreatedBehaviour);
                        subscribeForEvents();
                        sender.tell(ModifyConnectionResponse.modified(this.connectionId, modifyConnection.getDittoHeaders()), getSelf());
                        getContext().getParent().tell(ConnectionSupervisorActor.ManualReset.getInstance(), getSelf());
                    }, th -> {
                        handleException("connect-after-modify", sender, th);
                    });
                });
            } else {
                handleException("modify", sender, ConnectionConfigurationInvalidException.newBuilder("ConnectionType '" + this.connection.getConnectionType().getName() + "' of existing connection '" + this.connectionId + "' cannot be changed").dittoHeaders(modifyConnection.getDittoHeaders()).build());
            }
        }
    }

    private void openConnection(OpenConnection openConnection) {
        ConditionChecker.checkNotNull(this.connection, "Connection");
        ConnectionOpened of = ConnectionOpened.of(openConnection.getConnectionId(), openConnection.getDittoHeaders());
        ActorRef sender = getSender();
        persistEvent(of, connectionOpened -> {
            this.connection.toBuilder().connectionStatus(ConnectionStatus.OPEN).build();
            askClientActor(openConnection, obj -> {
                subscribeForEvents();
                sender.tell(OpenConnectionResponse.of(this.connectionId, openConnection.getDittoHeaders()), getSelf());
            }, th -> {
                handleException("open-connection", sender, th);
            });
        });
    }

    private void closeConnection(CloseConnection closeConnection) {
        ConnectionClosed of = ConnectionClosed.of(closeConnection.getConnectionId(), closeConnection.getDittoHeaders());
        ActorRef sender = getSender();
        persistEvent(of, connectionClosed -> {
            if (this.connection != null) {
                this.connection = this.connection.toBuilder().connectionStatus(ConnectionStatus.CLOSED).build();
            }
            askClientActor(closeConnection, obj -> {
                sender.tell(CloseConnectionResponse.of(this.connectionId, closeConnection.getDittoHeaders()), getSelf());
                unsubscribeFromEvents();
            }, th -> {
                handleException("disconnect", sender, th);
            });
        });
    }

    private void deleteConnection(DeleteConnection deleteConnection) {
        ConnectionDeleted of = ConnectionDeleted.of(deleteConnection.getConnectionId(), deleteConnection.getDittoHeaders());
        ActorRef sender = getSender();
        persistEvent(of, connectionDeleted -> {
            askClientActor(deleteConnection, obj -> {
                unsubscribeFromEvents();
                stopClientActor();
                sender.tell(DeleteConnectionResponse.of(this.connectionId, deleteConnection.getDittoHeaders()), getSelf());
                stopSelf();
            }, th -> {
                unsubscribeFromEvents();
                stopClientActor();
                sender.tell(DeleteConnectionResponse.of(this.connectionId, deleteConnection.getDittoHeaders()), getSelf());
                stopSelf();
            });
        });
    }

    private void askClientActor(Command<?> command, Consumer<Object> consumer, Consumer<Throwable> consumer2) {
        startClientActorIfRequired();
        long longValue = ((Long) Optional.ofNullable(command.getDittoHeaders().get("timeout")).map(Long::parseLong).orElse(Long.valueOf(DEFAULT_TIMEOUT_MS))).longValue();
        if (this.clientActor == null || this.connection == null) {
            return;
        }
        PatternsCS.ask(getContext().actorOf(AggregateActor.props(this.connectionId, this.clientActor, this.connection.getClientCount(), longValue)), command, longValue).whenComplete((obj, th) -> {
            this.log.debug("Got response to {}: {}", command.getType(), th == null ? obj : th);
            if (th != null) {
                consumer2.accept(th);
                return;
            }
            if (obj instanceof Status.Failure) {
                consumer2.accept(((Status.Failure) obj).cause());
            } else if (obj instanceof DittoRuntimeException) {
                consumer2.accept((DittoRuntimeException) obj);
            } else {
                consumer.accept(obj);
            }
        });
    }

    private void handleException(String str, ActorRef actorRef, Throwable th) {
        DittoRuntimeException build = th instanceof DittoRuntimeException ? (DittoRuntimeException) th : ConnectionFailedException.newBuilder(this.connectionId).description(th.getMessage()).cause(th).build();
        actorRef.tell(build, getSelf());
        this.log.warning("Operation <{}> on connection <{}> failed due to {}: {}.", str, this.connectionId, build.getClass().getSimpleName(), build.getMessage());
    }

    private void retrieveConnection(RetrieveConnection retrieveConnection) {
        ConditionChecker.checkNotNull(this.connection, "Connection");
        getSender().tell(RetrieveConnectionResponse.of(this.connection, retrieveConnection.getDittoHeaders()), getSelf());
    }

    private void retrieveConnectionStatus(RetrieveConnectionStatus retrieveConnectionStatus) {
        ConditionChecker.checkNotNull(this.connection, "Connection");
        getSender().tell(RetrieveConnectionStatusResponse.of(this.connectionId, this.connection.getConnectionStatus(), retrieveConnectionStatus.getDittoHeaders()), getSelf());
    }

    private void retrieveConnectionMetrics(RetrieveConnectionMetrics retrieveConnectionMetrics) {
        ConditionChecker.checkNotNull(this.connection, "Connection");
        ActorRef sender = getSender();
        askClientActor(retrieveConnectionMetrics, obj -> {
            sender.tell(obj, getSelf());
        }, th -> {
            handleException("retrieve-metrics", sender, th);
        });
    }

    private void subscribeForEvents() {
        ConditionChecker.checkNotNull(this.connection, "Connection");
        this.uniqueTopicPaths = (Set) this.connection.getTargets().stream().flatMap(target -> {
            return target.getTopics().stream();
        }).collect(Collectors.toSet());
        forEachPubSubTopicDo(str -> {
            DistributedPubSubMediator.Subscribe subscribe = new DistributedPubSubMediator.Subscribe(str, "connection:" + this.connectionId, getSelf());
            this.log.debug("Subscribing to pubsub topic '{}' for connection '{}'.", str, this.connectionId);
            this.pubSubMediator.tell(subscribe, getSelf());
        });
    }

    private void unsubscribeFromEvents() {
        forEachPubSubTopicDo(str -> {
            this.log.debug("Unsubscribing from pubsub topic '{}' for connection '{}'.", str, this.connectionId);
            this.pubSubMediator.tell(new DistributedPubSubMediator.Unsubscribe(str, "connection:" + this.connectionId, getSelf()), getSelf());
        });
    }

    private void forEachPubSubTopicDo(Consumer<String> consumer) {
        this.uniqueTopicPaths.stream().map(TopicPathMapper::mapToPubSubTopic).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).forEach(consumer);
    }

    private void handleCommandDuringInitialization(ConnectivityCommand connectivityCommand) {
        this.log.debug("Unexpected command during initialization of actor received: {} - Terminating this actor and sending 'ConnectionNotAccessibleException' to requester..", connectivityCommand.getType());
        getSender().tell(ConnectionNotAccessibleException.newBuilder(connectivityCommand.getId()).dittoHeaders(connectivityCommand.getDittoHeaders()).build(), getSelf());
    }

    private <E extends Event> void persistEvent(E e, Consumer<E> consumer) {
        persist(e, event -> {
            this.log.debug("Successfully persisted Event '{}'", event.getType());
            consumer.accept(event);
            this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(e.getType(), e, true), getSelf());
            if (lastSequenceNr() - this.lastSnapshotSequenceNr > this.snapshotThreshold) {
                doSaveSnapshot();
            }
        });
    }

    private void doSaveSnapshot() {
        if (this.snapshotInProgress) {
            this.log.debug("Already requested taking a Snapshot - not doing it again");
        } else {
            if (this.connection == null) {
                this.log.warning("Connection and MappingContext must not be null when taking snapshot.");
                return;
            }
            this.snapshotInProgress = true;
            this.log.info("Attempting to save Snapshot for Connection: <{}> ..", this.connection);
            saveSnapshot(this.snapshotAdapter.toSnapshotStore(this.connection));
        }
    }

    private void startClientActorIfRequired() {
        ConditionChecker.checkNotNull(this.connectionId, "connectionId");
        ConditionChecker.checkNotNull(this.connection, "connection");
        if (this.clientActor != null) {
            this.log.debug("ClientActor already started.");
            return;
        }
        int clientCount = this.connection.getClientCount();
        this.log.info("Starting ClientActor for connection <{}> with <{}> clients.", this.connectionId, Integer.valueOf(clientCount));
        this.clientActor = getContext().actorOf(new ClusterRouterPool(new RoundRobinPool(clientCount), new ClusterRouterPoolSettings(clientCount, 1, true, Collections.singleton("connectivity"))).props(this.propsFactory.getActorPropsForType(this.connection, this.conciergeForwarder)), "client-router");
    }

    private void stopClientActor() {
        if (this.clientActor != null) {
            this.log.debug("Stopping the client actor.");
            stopChildActor(this.clientActor);
            this.clientActor = null;
        }
    }

    private void stopChildActor(ActorRef actorRef) {
        this.log.debug("Stopping child actor '{}'", actorRef.path());
        getContext().stop(actorRef);
    }

    private void stopSelf() {
        this.log.debug("Shutting down");
        getContext().getParent().tell(PoisonPill.getInstance(), getSelf());
    }

    private void handleSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        this.log.debug("Successfully subscribed to distributed pub/sub on topic '{}'", subscribeAck.subscribe().topic());
    }

    private void handleUnsubscribeAck(DistributedPubSubMediator.UnsubscribeAck unsubscribeAck) {
        this.log.debug("Successfully unsubscribed from distributed pub/sub on topic '{}'", unsubscribeAck.unsubscribe().topic());
    }

    private void handleSnapshotSuccess(SaveSnapshotSuccess saveSnapshotSuccess) {
        this.log.debug("Snapshot was saved successfully: {}", saveSnapshotSuccess);
    }
}
