package io.zeebe.gateway.impl.broker.cluster;

import io.zeebe.gateway.impl.broker.request.BrokerTopologyRequest;
import io.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.zeebe.protocol.impl.data.cluster.TopologyResponseDto;
import io.zeebe.transport.ClientOutput;
import io.zeebe.transport.ClientResponse;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/* loaded from: input_file:io/zeebe/gateway/impl/broker/cluster/BrokerTopologyManagerImpl.class */
public class BrokerTopologyManagerImpl extends Actor implements BrokerTopologyManager {
    public static final Duration MAX_REFRESH_INTERVAL_MILLIS = Duration.ofSeconds(10);
    public static final Duration MIN_REFRESH_INTERVAL_MILLIS = Duration.ofMillis(300);
    public static final Duration TOPOLOGY_TIMEOUT = Duration.ofSeconds(5);
    protected final ClientOutput output;
    protected final BiConsumer<Integer, SocketAddress> registerEndpoint;
    protected final List<CompletableActorFuture<BrokerClusterState>> nextTopologyFutures = new ArrayList();
    protected final BrokerTopologyRequest topologyRequest = new BrokerTopologyRequest();
    protected int refreshAttempt = 0;
    protected long lastRefreshTime = -1;
    protected final AtomicReference<BrokerClusterStateImpl> topology = new AtomicReference<>(null);

    public BrokerTopologyManagerImpl(ClientOutput clientOutput, BiConsumer<Integer, SocketAddress> biConsumer) {
        this.output = clientOutput;
        this.registerEndpoint = biConsumer;
    }

    protected void onActorStarted() {
        this.actor.run(this::refreshTopology);
    }

    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    @Override // io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager
    public BrokerClusterState getTopology() {
        return this.topology.get();
    }

    @Override // io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager
    public ActorFuture<BrokerClusterState> requestTopology() {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.run(() -> {
            boolean isEmpty = this.nextTopologyFutures.isEmpty();
            this.nextTopologyFutures.add(completableActorFuture);
            if (isEmpty) {
                scheduleNextRefresh();
            }
        });
        return completableActorFuture;
    }

    @Override // io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager
    public void withTopology(Consumer<BrokerClusterState> consumer) {
        BrokerClusterStateImpl brokerClusterStateImpl = this.topology.get();
        if (brokerClusterStateImpl != null) {
            consumer.accept(brokerClusterStateImpl);
        } else {
            this.actor.run(() -> {
                this.actor.runOnCompletion(requestTopology(), (brokerClusterState, th) -> {
                    if (th == null) {
                        consumer.accept(brokerClusterState);
                    } else {
                        withTopology(consumer);
                    }
                });
            });
        }
    }

    private void scheduleNextRefresh() {
        long currentTimeMillis = ActorClock.currentTimeMillis() - this.lastRefreshTime;
        if (currentTimeMillis >= MIN_REFRESH_INTERVAL_MILLIS.toMillis()) {
            refreshTopology();
        } else {
            this.actor.runDelayed(Duration.ofMillis(MIN_REFRESH_INTERVAL_MILLIS.toMillis() - currentTimeMillis), this::refreshTopology);
        }
    }

    @Override // io.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager
    public void provideTopology(TopologyResponseDto topologyResponseDto) {
        this.actor.call(() -> {
            onNewTopology(topologyResponseDto);
        });
    }

    private void refreshTopology() {
        BrokerClusterStateImpl brokerClusterStateImpl = this.topology.get();
        ActorFuture sendRequest = this.output.sendRequest(Integer.valueOf(brokerClusterStateImpl != null ? brokerClusterStateImpl.getRandomBroker() : -1), this.topologyRequest, TOPOLOGY_TIMEOUT);
        this.refreshAttempt++;
        this.lastRefreshTime = ActorClock.currentTimeMillis();
        this.actor.runOnCompletion(sendRequest, this::handleResponse);
        this.actor.runDelayed(MAX_REFRESH_INTERVAL_MILLIS, scheduleIdleRefresh());
    }

    private Runnable scheduleIdleRefresh() {
        int i = this.refreshAttempt;
        return () -> {
            if (i == this.refreshAttempt) {
                this.actor.run(this::refreshTopology);
            }
        };
    }

    private void handleResponse(ClientResponse clientResponse, Throwable th) {
        if (th != null) {
            failRefreshFutures(th);
            return;
        }
        BrokerResponse<TopologyResponseDto> response = this.topologyRequest.getResponse(clientResponse);
        if (response.isResponse()) {
            onNewTopology(response.getResponse());
        } else {
            failRefreshFutures(new RuntimeException("Failed to refresh topology: " + response));
        }
    }

    private void onNewTopology(TopologyResponseDto topologyResponseDto) {
        BrokerClusterStateImpl brokerClusterStateImpl = new BrokerClusterStateImpl(topologyResponseDto, this.registerEndpoint);
        this.topology.set(brokerClusterStateImpl);
        completeRefreshFutures(brokerClusterStateImpl);
    }

    private void completeRefreshFutures(BrokerClusterStateImpl brokerClusterStateImpl) {
        this.nextTopologyFutures.forEach(completableActorFuture -> {
            completableActorFuture.complete(brokerClusterStateImpl);
        });
        this.nextTopologyFutures.clear();
    }

    private void failRefreshFutures(Throwable th) {
        this.nextTopologyFutures.forEach(completableActorFuture -> {
            completableActorFuture.completeExceptionally("Could not refresh topology", th);
        });
        this.nextTopologyFutures.clear();
    }
}
