package org.eclipse.ditto.services.connectivity.messaging;

import akka.actor.AbstractFSM;
import akka.actor.ActorRef;
import akka.actor.FSM;
import akka.actor.Props;
import akka.actor.Status;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.Pair;
import akka.japi.pf.FSMStateFunctionBuilder;
import akka.japi.pf.FSMTransitionHandlerBuilder;
import akka.pattern.PatternsCS;
import akka.routing.DefaultResizer;
import akka.routing.RoundRobinPool;
import akka.util.Timeout;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.connectivity.AddressMetric;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionStatus;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ExternalMessage;
import org.eclipse.ditto.model.connectivity.MappingContext;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.SourceMetrics;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.TargetMetrics;
import org.eclipse.ditto.services.connectivity.messaging.DittoHeadersFilter;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientConnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ClientDisconnected;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.services.connectivity.messaging.internal.DisconnectClient;
import org.eclipse.ditto.services.connectivity.messaging.internal.RetrieveAddressMetric;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.config.ConfigUtil;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionFailedException;
import org.eclipse.ditto.signals.commands.connectivity.modify.CloseConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ConnectivityModifyCommand;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.DeleteConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.ModifyConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.OpenConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.TestConnection;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionMetrics;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionMetricsResponse;

/* loaded from: input_file:org/eclipse/ditto/services/connectivity/messaging/BaseClientActor.class */
public abstract class BaseClientActor extends AbstractFSM<BaseClientState, BaseClientData> {
    private static final int CONNECTING_TIMEOUT = 10;
    protected static final int RETRIEVE_METRICS_TIMEOUT = 2;
    private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
    private final List<String> headerBlacklist;
    private final ActorRef conciergeForwarder;

    @Nullable
    private ActorRef messageMappingProcessorActor;
    protected final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private long consumedMessageCounter = 0;
    private long publishedMessageCounter = 0;

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseClientActor(Connection connection, ConnectionStatus connectionStatus, ActorRef actorRef) {
        ConditionChecker.checkNotNull(connection, "connection");
        Config config = getContext().getSystem().settings().config();
        Duration duration = config.getDuration("ditto.connectivity.client.init-timeout");
        this.headerBlacklist = config.getStringList("ditto.connectivity.message.header-blacklist");
        this.conciergeForwarder = actorRef;
        startWith(BaseClientState.DISCONNECTED, new BaseClientData(connection.getId(), connection, ConnectionStatus.UNKNOWN, connectionStatus, "initialized", Instant.now(), null));
        when(BaseClientState.DISCONNECTED, scala.concurrent.duration.Duration.fromNanos(duration.toNanos()), inDisconnectedState(duration));
        when(BaseClientState.CONNECTING, scala.concurrent.duration.Duration.create(10L, TimeUnit.SECONDS), inConnectingState());
        when(BaseClientState.CONNECTED, inConnectedState());
        when(BaseClientState.DISCONNECTING, scala.concurrent.duration.Duration.create(10L, TimeUnit.SECONDS), inDisconnectingState());
        when(BaseClientState.FAILED, inFailedState());
        onTransition(handleTransitions());
        whenUnhandled(unhandledHandler(connection.getId()).anyEvent((obj, baseClientData) -> {
            this.log.warning("received unknown/unsupported message {} in state {} - status: {}", obj, stateName(), baseClientData.getConnectionStatus() + ": " + baseClientData.getConnectionStatusDetails().orElse(""));
            return stay();
        }));
        initialize();
    }

    private FSMTransitionHandlerBuilder<BaseClientState> handleTransitions() {
        return matchState(BaseClientState.DISCONNECTED, BaseClientState.CONNECTING, this::onTransition).state(BaseClientState.DISCONNECTED, BaseClientState.CONNECTED, this::onTransition).state(BaseClientState.DISCONNECTED, BaseClientState.DISCONNECTING, this::onTransition).state(BaseClientState.DISCONNECTED, BaseClientState.FAILED, this::onTransition).state(BaseClientState.CONNECTING, BaseClientState.CONNECTED, this::onTransition).state(BaseClientState.CONNECTING, BaseClientState.DISCONNECTING, this::onTransition).state(BaseClientState.CONNECTING, BaseClientState.DISCONNECTED, this::onTransition).state(BaseClientState.CONNECTING, BaseClientState.CONNECTING, this::onTransition).state(BaseClientState.CONNECTING, BaseClientState.FAILED, this::onTransition).state(BaseClientState.CONNECTED, BaseClientState.CONNECTING, this::onTransition).state(BaseClientState.CONNECTED, BaseClientState.DISCONNECTING, this::onTransition).state(BaseClientState.CONNECTED, BaseClientState.DISCONNECTED, this::onTransition).state(BaseClientState.CONNECTED, BaseClientState.FAILED, this::onTransition).state(BaseClientState.DISCONNECTING, BaseClientState.CONNECTING, this::onTransition).state(BaseClientState.DISCONNECTING, BaseClientState.CONNECTED, this::onTransition).state(BaseClientState.DISCONNECTING, BaseClientState.DISCONNECTED, this::onTransition).state(BaseClientState.DISCONNECTING, BaseClientState.FAILED, this::onTransition).state(BaseClientState.FAILED, BaseClientState.CONNECTING, this::onTransition).state(BaseClientState.FAILED, BaseClientState.CONNECTED, this::onTransition).state(BaseClientState.FAILED, BaseClientState.DISCONNECTING, this::onTransition).state(BaseClientState.FAILED, BaseClientState.DISCONNECTED, this::onTransition);
    }

    private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectedState(Duration duration) {
        return matchEvent(Arrays.asList(CloseConnection.class, DeleteConnection.class), BaseClientData.class, (obj, baseClientData) -> {
            return stay().using(baseClientData.setOrigin(getSender()).setDesiredConnectionStatus(ConnectionStatus.CLOSED)).replying(new Status.Success(BaseClientState.DISCONNECTED));
        }).eventEquals(StateTimeout(), BaseClientData.class, (fSM$StateTimeout$, baseClientData2) -> {
            if (baseClientData2.getDesiredConnectionStatus() != ConnectionStatus.OPEN) {
                return stay();
            }
            this.log.info("Did not receive connect command within {}, trying to go to CONNECTING", duration);
            return goTo(BaseClientState.CONNECTING);
        }).event(TestConnection.class, BaseClientData.class, (testConnection, baseClientData3) -> {
            Connection connection = testConnection.getConnection();
            if (!canConnectViaSocket(connection, testConnection.getDittoHeaders())) {
                return stop();
            }
            try {
                CompletionStage<Status.Status> doTestConnection = doTestConnection(connection);
                CompletionStage<Status.Status> testMessageMappingProcessor = testMessageMappingProcessor((MappingContext) connection.getMappingContext().orElse(null));
                ActorRef sender = getSender();
                doTestConnection.toCompletableFuture().thenCombine((CompletionStage) testMessageMappingProcessor, (status, status2) -> {
                    return ((status instanceof Status.Success) && (status2 instanceof Status.Success)) ? new Status.Success("successfully connected + initialized mapper") : status instanceof Status.Failure ? status : status2;
                }).thenAccept((Consumer<? super V>) status3 -> {
                    sender.tell(status3, getSelf());
                });
            } catch (DittoRuntimeException e) {
                getSender().tell(new Status.Failure(e), getSelf());
            }
            return stop();
        }).event(CreateConnection.class, BaseClientData.class, (createConnection, baseClientData4) -> {
            Connection connection = createConnection.getConnection();
            canConnectViaSocket(connection, createConnection.getDittoHeaders());
            return goTo(baseClientStateFromConnectionStatus(connection.getConnectionStatus())).using(baseClientData4.setConnection(connection).setDesiredConnectionStatus(connection.getConnectionStatus()).setConnectionStatusDetails("creating connection at " + Instant.now()).setOrigin(getSender()));
        }).event(OpenConnection.class, BaseClientData.class, (openConnection, baseClientData5) -> {
            return goTo(BaseClientState.CONNECTING).using(baseClientData5.setOrigin(getSender()));
        });
    }

    private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingState() {
        return matchEvent(Arrays.asList(CreateConnection.class, OpenConnection.class), BaseClientData.class, (obj, baseClientData) -> {
            return stay().using(baseClientData.setOrigin(getSender()));
        }).event(Arrays.asList(CloseConnection.class, DeleteConnection.class), BaseClientData.class, (obj2, baseClientData2) -> {
            return goTo(BaseClientState.DISCONNECTING).using(baseClientData2.setDesiredConnectionStatus(ConnectionStatus.CLOSED).setConnectionStatusDetails("closing or deleting connection at " + Instant.now()).setOrigin(getSender()));
        }).eventEquals(StateTimeout(), BaseClientData.class, (fSM$StateTimeout$, baseClientData3) -> {
            return baseClientData3.getConnectionStatus() == ConnectionStatus.FAILED ? goTo(BaseClientState.CONNECTING) : goTo(BaseClientState.CONNECTING).using(baseClientData3.setConnectionStatus(ConnectionStatus.FAILED).setConnectionStatusDetails("Connecting timed out at " + Instant.now()));
        });
    }

    private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedState() {
        return matchEvent(Arrays.asList(CloseConnection.class, DeleteConnection.class), BaseClientData.class, (obj, baseClientData) -> {
            return goTo(BaseClientState.DISCONNECTING).using(baseClientData.setOrigin(getSender()).setDesiredConnectionStatus(ConnectionStatus.CLOSED));
        }).event(OpenConnection.class, BaseClientData.class, (openConnection, baseClientData2) -> {
            return goTo(BaseClientState.CONNECTING).using(baseClientData2.setOrigin(getSender()));
        });
    }

    private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inDisconnectingState() {
        return matchEvent(Arrays.asList(CloseConnection.class, DeleteConnection.class), BaseClientData.class, (obj, baseClientData) -> {
            return stay();
        }).eventEquals(StateTimeout(), BaseClientData.class, (fSM$StateTimeout$, baseClientData2) -> {
            return goTo(BaseClientState.CONNECTED).using(baseClientData2.setConnectionStatus(ConnectionStatus.OPEN).setConnectionStatusDetails("Disconnecting timed out, still connected at " + Instant.now()).setOrigin(getSender()));
        });
    }

    private FSMStateFunctionBuilder<BaseClientState, BaseClientData> inFailedState() {
        return matchEvent(OpenConnection.class, BaseClientData.class, (openConnection, baseClientData) -> {
            return baseClientData.getDesiredConnectionStatus() == ConnectionStatus.OPEN ? goTo(BaseClientState.CONNECTING).using(baseClientData.setOrigin(getSender())) : stay();
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FSMStateFunctionBuilder<BaseClientState, BaseClientData> unhandledHandler(String str) {
        return matchEvent(RetrieveConnectionMetrics.class, BaseClientData.class, (retrieveConnectionMetrics, baseClientData) -> {
            return stay().using(baseClientData.setOrigin(getSender())).replying(RetrieveConnectionMetricsResponse.of(str, ConnectivityModelFactory.newConnectionMetrics(getCurrentConnectionStatus(), getCurrentConnectionStatusDetails().orElse(null), getInConnectionStatusSince(), ((BaseClientState) stateName()).name(), getCurrentSourcesMetrics(), getCurrentTargetsMetrics()), retrieveConnectionMetrics.getDittoHeaders().toBuilder().source(ConfigUtil.calculateInstanceUniqueSuffix()).build()));
        }).event(ModifyConnection.class, BaseClientData.class, (modifyConnection, baseClientData2) -> {
            Connection connection = modifyConnection.getConnection();
            return goTo(baseClientStateFromConnectionStatus(connection.getConnectionStatus())).using(baseClientData2.setConnection(connection).setDesiredConnectionStatus(connection.getConnectionStatus()).setConnectionStatusDetails("modifying connection at " + Instant.now()).setOrigin(getSender()));
        }).event(ConnectClient.class, BaseClientData.class, (connectClient, baseClientData3) -> {
            return shouldBeConnecting(baseClientData3) ? goTo(BaseClientState.CONNECTING).using(baseClientData3.setOrigin(getSender())) : goTo(BaseClientState.DISCONNECTING).using(baseClientData3.setOrigin(getSender()));
        }).event(DisconnectClient.class, BaseClientData.class, (disconnectClient, baseClientData4) -> {
            return goTo(BaseClientState.DISCONNECTING).using(baseClientData4.setOrigin(getSender()));
        }).event(ClientConnected.class, BaseClientData.class, this::handleClientConnected).event(ClientDisconnected.class, BaseClientData.class, this::handleClientDisconnected).event(ConnectionFailure.class, BaseClientData.class, this::handleConnectionFailure).event(ConnectivityModifyCommand.class, BaseClientData.class, (connectivityModifyCommand, baseClientData5) -> {
            cannotHandle(connectivityModifyCommand, baseClientData5.getConnection());
            return stay().using(baseClientData5.setOrigin(getSender()));
        }).event(Signal.class, BaseClientData.class, (signal, baseClientData6) -> {
            handleSignal(signal);
            return stay();
        }).event(ExternalMessage.class, BaseClientData.class, (externalMessage, baseClientData7) -> {
            handleExternalMessage(externalMessage);
            return stay();
        }).event(Status.Success.class, BaseClientData.class, (success, baseClientData8) -> {
            this.log.info("Got Status.Success: {}", success);
            return stay();
        });
    }

    private static BaseClientState baseClientStateFromConnectionStatus(ConnectionStatus connectionStatus) {
        return connectionStatus == ConnectionStatus.OPEN ? BaseClientState.CONNECTING : BaseClientState.DISCONNECTING;
    }

    private boolean canConnectViaSocket(Connection connection, DittoHeaders dittoHeaders) {
        if (checkHostAndPortForAvailability(connection.getHostname(), connection.getPort())) {
            return true;
        }
        getSender().tell(new Status.Failure(ConnectionFailedException.newBuilder(connection.getId()).dittoHeaders(dittoHeaders).description("Could not establish a connection on '" + connection.getHostname() + ":" + connection.getPort() + "'. Make sure the endpoint is reachable and that no firewall prevents the connection.").build()), getSelf());
        return false;
    }

    private boolean checkHostAndPortForAvailability(String str, int i) {
        try {
            Socket socket = new Socket();
            Throwable th = null;
            try {
                try {
                    socket.connect(new InetSocketAddress(str, i), SOCKET_CHECK_TIMEOUT_MS);
                    if (socket != null) {
                        if (0 != 0) {
                            try {
                                socket.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            socket.close();
                        }
                    }
                    return true;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
            this.log.warning("Socket could not be opened for <{}:{}>", str, Integer.valueOf(i));
            return false;
        }
    }

    private FSM.State<BaseClientState, BaseClientData> handleClientConnected(ClientConnected clientConnected, BaseClientData baseClientData) {
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
        startMessageMappingProcessor((MappingContext) baseClientData.getConnection().getMappingContext().orElse(null));
        onClientConnected(clientConnected, baseClientData);
        return goTo(BaseClientState.CONNECTED).using(baseClientData.setConnectionStatus(ConnectionStatus.OPEN).setConnectionStatusDetails("Connected at " + Instant.now()).setOrigin(getSender())).replying(new Status.Success(BaseClientState.CONNECTED));
    }

    private FSM.State<BaseClientState, BaseClientData> handleClientDisconnected(ClientDisconnected clientDisconnected, BaseClientData baseClientData) {
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
        if (isConsuming()) {
            stopMessageMappingProcessorActor();
        }
        onClientDisconnected(clientDisconnected, baseClientData);
        return goTo(BaseClientState.DISCONNECTED).using(baseClientData.setConnectionStatus(ConnectionStatus.CLOSED).setConnectionStatusDetails("Disconnected at " + Instant.now()).setOrigin(getSender())).replying(new Status.Success(BaseClientState.DISCONNECTED));
    }

    private FSM.State<BaseClientState, BaseClientData> handleConnectionFailure(ConnectionFailure connectionFailure, BaseClientData baseClientData) {
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
        onConnectionFailure(connectionFailure, baseClientData);
        return stay().using(baseClientData.setConnectionStatus(ConnectionStatus.FAILED).setConnectionStatusDetails(connectionFailure.getFailureDescription()).setOrigin(getSender()));
    }

    protected void onTransition(BaseClientState baseClientState, BaseClientState baseClientState2) {
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
        this.log.info("Transition: {} -> {}", baseClientState, baseClientState2);
        Connection connection = ((BaseClientData) nextStateData()).getConnection();
        ActorRef orElse = ((BaseClientData) nextStateData()).getOrigin().orElse(null);
        if (baseClientState2 != BaseClientState.CONNECTING) {
            if (baseClientState2 == BaseClientState.DISCONNECTING) {
                doDisconnectClient(connection, orElse);
            }
        } else if (baseClientState != BaseClientState.CONNECTED) {
            doConnectClient(connection, orElse);
        } else {
            this.log.info("Triggering reconnection");
            doReconnectClient(connection, orElse);
        }
    }

    protected abstract CompletionStage<Status.Status> doTestConnection(Connection connection);

    protected abstract void onClientConnected(ClientConnected clientConnected, BaseClientData baseClientData);

    protected abstract void onClientDisconnected(ClientDisconnected clientDisconnected, BaseClientData baseClientData);

    protected abstract Optional<ActorRef> getPublisherActor();

    /* JADX INFO: Access modifiers changed from: protected */
    public void onConnectionFailure(ConnectionFailure connectionFailure, BaseClientData baseClientData) {
        connectionFailure.getOrigin().ifPresent(actorRef -> {
            actorRef.tell(connectionFailure.getFailure(), getSelf());
        });
    }

    protected abstract void doConnectClient(Connection connection, @Nullable ActorRef actorRef);

    protected abstract void doReconnectClient(Connection connection, @Nullable ActorRef actorRef);

    protected abstract void doDisconnectClient(Connection connection, @Nullable ActorRef actorRef);

    protected abstract Map<String, AddressMetric> getSourceConnectionStatus(Source source);

    protected abstract Map<String, AddressMetric> getTargetConnectionStatus(Target target);

    /* JADX INFO: Access modifiers changed from: protected */
    public final CompletableFuture<Pair<String, AddressMetric>> retrieveAddressMetric(String str, String str2) {
        Optional findChild = getContext().findChild(str2);
        if (findChild.isPresent()) {
            return PatternsCS.ask((ActorRef) findChild.get(), RetrieveAddressMetric.getInstance(), Timeout.apply(2L, TimeUnit.SECONDS)).handle((obj, th) -> {
                return obj != null ? Pair.create(str, (AddressMetric) obj) : Pair.create(str, ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, th.getClass().getSimpleName() + ": " + th.getMessage(), -1L, Instant.now()));
            }).toCompletableFuture();
        }
        this.log.warning("Consumer actor child <{}> was not found", str2);
        return CompletableFuture.completedFuture(Pair.create(str, ConnectivityModelFactory.newAddressMetric(ConnectionStatus.FAILED, "child <" + str2 + "> not found", -1L, Instant.now())));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void incrementConsumedMessageCounter() {
        this.consumedMessageCounter++;
    }

    protected final void incrementPublishedMessageCounter() {
        this.publishedMessageCounter++;
    }

    private static boolean shouldBeConnecting(BaseClientData baseClientData) {
        return baseClientData.getConnectionStatus() == ConnectionStatus.OPEN;
    }

    private void handleSignal(Signal<?> signal) {
        enhanceLogUtil(signal);
        if (this.messageMappingProcessorActor != null) {
            this.messageMappingProcessorActor.tell(signal, getSelf());
        } else {
            this.log.info("Cannot handle <{}> signal, no MessageMappingProcessor available.", signal.getType());
        }
    }

    private void enhanceLogUtil(WithDittoHeaders<?> withDittoHeaders) {
        LogUtil.enhanceLogWithCorrelationId(this.log, withDittoHeaders);
        LogUtil.enhanceLogWithCustomField(this.log, BaseClientData.MDC_CONNECTION_ID, connectionId());
    }

    private void handleExternalMessage(ExternalMessage externalMessage) {
        getPublisherActor().ifPresent(actorRef -> {
            incrementPublishedMessageCounter();
            actorRef.forward(externalMessage, getContext());
        });
    }

    private ConnectionStatus getCurrentConnectionStatus() {
        return ((BaseClientData) stateData()).getConnectionStatus();
    }

    private Instant getInConnectionStatusSince() {
        return ((BaseClientData) stateData()).getInConnectionStatusSince();
    }

    private Optional<String> getCurrentConnectionStatusDetails() {
        return ((BaseClientData) stateData()).getConnectionStatusDetails();
    }

    private List<SourceMetrics> getCurrentSourcesMetrics() {
        return (List) getSourcesOrEmptySet().stream().map(source -> {
            return ConnectivityModelFactory.newSourceMetrics(getSourceConnectionStatus(source), this.consumedMessageCounter);
        }).collect(Collectors.toList());
    }

    private List<TargetMetrics> getCurrentTargetsMetrics() {
        return (List) getTargetsOrEmptySet().stream().map(target -> {
            return ConnectivityModelFactory.newTargetMetrics(getTargetConnectionStatus(target), this.publishedMessageCounter);
        }).collect(Collectors.toList());
    }

    private CompletionStage<Status.Status> testMessageMappingProcessor(@Nullable MappingContext mappingContext) {
        try {
            MessageMappingProcessor.of(connectionId(), mappingContext, getContext().getSystem(), this.log);
            return CompletableFuture.completedFuture(new Status.Success("mapping"));
        } catch (DittoRuntimeException e) {
            this.log.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}", e.getClass().getSimpleName(), e.getMessage(), e.getDescription().orElse(""));
            getSender().tell(e, getSelf());
            return CompletableFuture.completedFuture(new Status.Failure(e));
        }
    }

    private void startMessageMappingProcessor(@Nullable MappingContext mappingContext) {
        if (getMessageMappingProcessorActor().isPresent()) {
            this.log.info("MessageMappingProcessor already instantiated, don't initialize again..");
            return;
        }
        Connection connection = connection();
        try {
            MessageMappingProcessor of = MessageMappingProcessor.of(connectionId(), mappingContext, getContext().getSystem(), this.log);
            this.log.info("Configured for processing messages with the following MessageMapperRegistry: <{}>", of.getRegistry());
            this.log.debug("Starting MessageMappingProcessorActor with pool size of <{}>.", Integer.valueOf(connection.getProcessorPoolSize()));
            this.messageMappingProcessorActor = getContext().actorOf(new RoundRobinPool(1).withDispatcher("message-mapping-processor-dispatcher").withResizer(new DefaultResizer(1, connection.getProcessorPoolSize())).props(MessageMappingProcessorActor.props(getSelf(), this.conciergeForwarder, connection.getAuthorizationContext(), new DittoHeadersFilter(DittoHeadersFilter.Mode.EXCLUDE, this.headerBlacklist), of, connectionId())), MessageMappingProcessorActor.ACTOR_NAME);
        } catch (DittoRuntimeException e) {
            this.log.info("Got DittoRuntimeException during initialization of MessageMappingProcessor: {} {} - desc: {}", e.getClass().getSimpleName(), e.getMessage(), e.getDescription().orElse(""));
            getSender().tell(e, getSelf());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Optional<ActorRef> getMessageMappingProcessorActor() {
        return Optional.ofNullable(this.messageMappingProcessorActor);
    }

    private void stopMessageMappingProcessorActor() {
        if (this.messageMappingProcessorActor != null) {
            this.log.debug("Stopping MessageMappingProcessorActor.");
            getContext().stop(this.messageMappingProcessorActor);
            this.messageMappingProcessorActor = null;
        }
    }

    private void cannotHandle(Command<?> command, @Nullable Connection connection) {
        enhanceLogUtil(command);
        this.log.info("Command <{}> cannot be handled in current state <{}>.", command.getType(), stateName());
        getSender().tell(new Status.Failure(ConnectionFailedException.newBuilder(connection != null ? connection.getId() : "?").message(MessageFormat.format("Cannot execute command <{0}> in current state <{1}>.", command.getType(), stateName())).build()), getSelf());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String escapeActorName(String str) {
        return str.replace('/', '_');
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static <T> CompletableFuture<List<T>> collectAsList(List<CompletableFuture<T>> list) {
        return collect(list, Collectors.toList());
    }

    private static <T, A, R> CompletableFuture<R> collect(List<CompletableFuture<T>> list, Collector<T, A, R> collector) {
        return (CompletableFuture<R>) CompletableFuture.allOf((CompletableFuture[]) list.toArray(new CompletableFuture[list.size()])).thenApply(r5 -> {
            return list.stream().map((v0) -> {
                return v0.join();
            }).collect(collector);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final ActorRef startChildActor(String str, Props props) {
        this.log.debug("Starting child actor '{}'", str);
        return getContext().actorOf(props, escapeActorName(str));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void stopChildActor(String str) {
        String escapeActorName = escapeActorName(str);
        Optional findChild = getContext().findChild(escapeActorName);
        if (!findChild.isPresent()) {
            this.log.debug("Cannot stop child actor '{}' because it does not exist.", escapeActorName);
        } else {
            this.log.debug("Stopping child actor '{}'", escapeActorName);
            getContext().stop((ActorRef) findChild.get());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void stopChildActor(ActorRef actorRef) {
        this.log.debug("Stopping child actor '{}'", actorRef.path());
        getContext().stop(actorRef);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isConsuming() {
        return !connection().getSources().isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isPublishing() {
        return !connection().getTargets().isEmpty();
    }

    protected final Connection connection() {
        return ((BaseClientData) stateData()).getConnection();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String connectionId() {
        return ((BaseClientData) stateData()).getConnectionId();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Set<Source> getSourcesOrEmptySet() {
        return connection().getSources();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Set<Target> getTargetsOrEmptySet() {
        return connection().getTargets();
    }
}
