package io.zeebe.gateway.impl.broker;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.gateway.Loggers;
import io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManagerImpl;
import io.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.zeebe.gateway.impl.configuration.GatewayCfg;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.ClientTransportBuilder;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.SocketAddress;
import io.zeebe.transport.Transports;
import io.zeebe.transport.impl.memory.NonBlockingMemoryPool;
import io.zeebe.transport.impl.memory.UnboundedMemoryPool;
import io.zeebe.util.ByteValue;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/gateway/impl/broker/BrokerClientImpl.class */
/**
 * {@link BrokerClient} implementation that owns its own {@link ActorScheduler}, two client
 * transports (one handed to the request manager, one handed to the topology manager) and the
 * dispatcher used as the message receive buffer of the request transport.
 *
 * <p>All of these resources are created and started in the constructor and released again in
 * {@link #close()}.
 */
public class BrokerClientImpl implements BrokerClient {
    public static final Logger LOG = Loggers.GATEWAY_LOGGER;

    /** Maximum frame / message length (1 MiB) configured on the dispatcher and both transports. */
    private static final int MAX_MESSAGE_LENGTH = 1024 * 1024;

    /** Node id under which the configured contact point is registered on both transports. */
    private static final int CONTACT_POINT_NODE_ID = -1;

    /** Upper bound, in seconds, that {@link #close()} waits for the actor scheduler to stop. */
    private static final long SCHEDULER_STOP_TIMEOUT_SECONDS = 15L;

    protected final ActorScheduler actorScheduler;
    private final Dispatcher dataFrameReceiveBuffer;
    protected final ClientTransport transport;
    private final ClientTransport internalTransport;
    private final BrokerRequestManager requestManager;
    protected final BrokerTopologyManagerImpl topologyManager;
    protected boolean isClosed;

    /**
     * Creates a client from the given configuration, passing {@code null} as the actor clock.
     *
     * @param gatewayCfg gateway configuration to read thread and cluster settings from
     */
    public BrokerClientImpl(GatewayCfg gatewayCfg) {
        this(gatewayCfg, null);
    }

    /**
     * Creates and starts the scheduler, the receive buffer, both transports, the topology manager
     * and the request manager, then registers the configured contact point as the initial endpoint.
     *
     * @param gatewayCfg gateway configuration to read thread and cluster settings from
     * @param actorClock clock handed to the actor scheduler builder; may be {@code null}
     *     (NOTE(review): presumably the scheduler then falls back to a default clock - confirm)
     */
    public BrokerClientImpl(GatewayCfg gatewayCfg, ActorClock actorClock) {
        this.actorScheduler =
                ActorScheduler.newActorScheduler()
                        .setCpuBoundActorThreadCount(gatewayCfg.getThreads().getManagementThreads())
                        .setIoBoundActorThreadCount(0)
                        .setActorClock(actorClock)
                        .setSchedulerName("gateway")
                        .build();
        this.actorScheduler.start();

        ByteValue transportBuffer = gatewayCfg.getCluster().getTransportBuffer();
        this.dataFrameReceiveBuffer =
                Dispatchers.create("gateway-receive-buffer")
                        .bufferSize(transportBuffer)
                        .modePubSub()
                        .frameMaxLength(MAX_MESSAGE_LENGTH)
                        .actorScheduler(this.actorScheduler)
                        .build();

        // Both builders are fully configured before either transport is built, as in the original.
        ClientTransportBuilder transportBuilder =
                Transports.newClientTransport("broker-client")
                        .messageMaxLength(MAX_MESSAGE_LENGTH)
                        .messageReceiveBuffer(this.dataFrameReceiveBuffer)
                        .messageMemoryPool(new UnboundedMemoryPool())
                        .requestMemoryPool(new NonBlockingMemoryPool(transportBuffer))
                        .scheduler(this.actorScheduler);
        ClientTransportBuilder internalTransportBuilder =
                Transports.newClientTransport("broker-client-internal")
                        .messageMaxLength(MAX_MESSAGE_LENGTH)
                        .messageMemoryPool(new UnboundedMemoryPool())
                        .requestMemoryPool(new UnboundedMemoryPool())
                        .scheduler(this.actorScheduler);
        this.transport = transportBuilder.build();
        this.internalTransport = internalTransportBuilder.build();

        // The topology manager gets a callback so it can register endpoints on both transports.
        this.topologyManager =
                new BrokerTopologyManagerImpl(
                        this.internalTransport.getOutput(), this::registerEndpoint);
        this.actorScheduler.submitActor(this.topologyManager);

        this.requestManager =
                new BrokerRequestManager(
                        this.transport.getOutput(),
                        this.topologyManager,
                        new RoundRobinDispatchStrategy(this.topologyManager),
                        gatewayCfg.getCluster().getRequestTimeout());
        this.actorScheduler.submitActor(this.requestManager);

        registerEndpoint(
                CONTACT_POINT_NODE_ID,
                SocketAddress.from(gatewayCfg.getCluster().getContactPoint()));
    }

    /**
     * Registers the address for the node id on both the request transport and the internal
     * transport.
     */
    private void registerEndpoint(int nodeId, SocketAddress socketAddress) {
        registerEndpoint(this.transport, nodeId, socketAddress);
        registerEndpoint(this.internalTransport, nodeId, socketAddress);
    }

    /**
     * Registers the address on the given transport unless that exact address is already registered
     * for the node id.
     */
    private void registerEndpoint(
            ClientTransport clientTransport, int nodeId, SocketAddress socketAddress) {
        RemoteAddress current = clientTransport.getEndpoint(nodeId);
        boolean alreadyRegistered =
                current != null && socketAddress.equals(current.getAddress());
        if (!alreadyRegistered) {
            clientTransport.registerEndpoint(nodeId, socketAddress);
        }
    }

    /**
     * Releases all resources owned by this client. Calling it again after the first call is a
     * no-op.
     *
     * <p>The topology manager, both transports and the receive buffer are closed on a best-effort
     * basis: exceptions are logged and ignored so the remaining resources are still released.
     * Finally the actor scheduler is stopped, waiting up to
     * {@value #SCHEDULER_STOP_TIMEOUT_SECONDS} seconds.
     *
     * @throws RuntimeException if stopping the actor scheduler fails, times out, or the calling
     *     thread is interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override
    public void close() {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;

        LOG.debug("Closing client ...");

        // Previously an empty lambda ran here, so the topology manager was reported as closed
        // without ever being closed. Close the actor and wait for it before tearing down the
        // transport it sends on.
        doAndLogException(() -> this.topologyManager.close().join());
        LOG.debug("topology manager closed");

        doAndLogException(this.transport::close);
        LOG.debug("transport closed");

        doAndLogException(this.internalTransport::close);
        LOG.debug("internal transport closed");

        doAndLogException(this.dataFrameReceiveBuffer::close);
        LOG.debug("data frame receive buffer closed");

        try {
            this.actorScheduler.stop().get(SCHEDULER_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            LOG.debug("Client closed.");
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to gracefully shutdown client", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException("Failed to gracefully shutdown client", e);
        }
    }

    /**
     * Runs the given action and logs, rather than propagates, any {@link Exception} it throws.
     *
     * @param runnable the close action to run
     */
    protected void doAndLogException(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            LOG.error("Exception when closing client. Ignoring", e);
        }
    }

    /**
     * Hands the request to the request manager.
     *
     * @param brokerRequest the request to send
     * @return the future returned by the request manager for this request
     */
    @Override
    public <T> ActorFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest) {
        return this.requestManager.sendRequest(brokerRequest);
    }

    /**
     * Hands the request and both callbacks to the request manager.
     *
     * @param brokerRequest the request to send
     * @param brokerResponseConsumer callback passed through for the response
     * @param consumer callback passed through for a {@link Throwable}
     */
    @Override
    public <T> void sendRequest(
            BrokerRequest<T> brokerRequest,
            BrokerResponseConsumer<T> brokerResponseConsumer,
            Consumer<Throwable> consumer) {
        this.requestManager.sendRequest(brokerRequest, brokerResponseConsumer, consumer);
    }

    /** Returns the topology manager created by this client. */
    @Override
    public BrokerTopologyManager getTopologyManager() {
        return this.topologyManager;
    }

    /** Returns the transport whose output is used by the request manager. */
    public ClientTransport getTransport() {
        return this.transport;
    }

    /** Returns the actor scheduler created and started by this client. */
    public ActorScheduler getScheduler() {
        return this.actorScheduler;
    }
}
