package io.zeebe.gateway.impl.job;

import io.grpc.stub.StreamObserver;
import io.zeebe.gateway.Loggers;
import io.zeebe.gateway.impl.broker.BrokerClient;
import io.zeebe.gateway.impl.broker.cluster.BrokerClusterState;
import io.zeebe.gateway.metrics.LongPollingMetrics;
import io.zeebe.gateway.protocol.GatewayOuterClass;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/gateway/impl/job/LongPollingActivateJobsHandler.class */
public final class LongPollingActivateJobsHandler extends Actor {
    private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
    private static final Logger LOG = Loggers.GATEWAY_LOGGER;
    private final ActivateJobsHandler activateJobsHandler;
    private final BrokerClient brokerClient;
    private final Duration longPollingTimeout;
    private final long probeTimeoutMillis;
    private final int emptyResponseThreshold;
    private final Map<String, JobTypeAvailabilityState> jobTypeState = new HashMap();
    private final LongPollingMetrics metrics = new LongPollingMetrics();

    /* loaded from: input_file:io/zeebe/gateway/impl/job/LongPollingActivateJobsHandler$Builder.class */
    public static class Builder {
        private static final long DEFAULT_LONG_POLLING_TIMEOUT = 10000;
        private static final long DEFAULT_PROBE_TIMEOUT = 10000;
        private static final int EMPTY_RESPONSE_THRESHOLD = 3;
        private BrokerClient brokerClient;
        private long longPollingTimeout = 10000;
        private long probeTimeoutMillis = 10000;
        private int minEmptyResponses = EMPTY_RESPONSE_THRESHOLD;

        public Builder setBrokerClient(BrokerClient brokerClient) {
            this.brokerClient = brokerClient;
            return this;
        }

        public Builder setLongPollingTimeout(long j) {
            this.longPollingTimeout = j;
            return this;
        }

        public Builder setProbeTimeoutMillis(long j) {
            this.probeTimeoutMillis = j;
            return this;
        }

        public Builder setMinEmptyResponses(int i) {
            this.minEmptyResponses = i;
            return this;
        }

        public LongPollingActivateJobsHandler build() {
            Objects.requireNonNull(this.brokerClient, "brokerClient");
            return new LongPollingActivateJobsHandler(this.brokerClient, this.longPollingTimeout, this.probeTimeoutMillis, this.minEmptyResponses);
        }
    }

    private LongPollingActivateJobsHandler(BrokerClient brokerClient, long j, long j2, int i) {
        this.brokerClient = brokerClient;
        this.activateJobsHandler = new ActivateJobsHandler(brokerClient);
        this.longPollingTimeout = Duration.ofMillis(j);
        this.probeTimeoutMillis = j2;
        this.emptyResponseThreshold = i;
    }

    public void activateJobs(GatewayOuterClass.ActivateJobsRequest activateJobsRequest, StreamObserver<GatewayOuterClass.ActivateJobsResponse> streamObserver) {
        activateJobs(new LongPollingActivateJobsRequest(activateJobsRequest, streamObserver));
    }

    public void activateJobs(LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        this.actor.run(() -> {
            JobTypeAvailabilityState jobTypeAvailabilityState = this.jobTypeState.get(longPollingActivateJobsRequest.getType());
            if (jobTypeAvailabilityState == null || jobTypeAvailabilityState.getEmptyResponses() < this.emptyResponseThreshold) {
                activateJobsUnchecked(longPollingActivateJobsRequest);
            } else {
                block(jobTypeAvailabilityState, longPollingActivateJobsRequest);
            }
        });
    }

    private void activateJobsUnchecked(LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        BrokerClusterState topology = this.brokerClient.getTopologyManager().getTopology();
        if (topology != null) {
            this.activateJobsHandler.activateJobs(topology.getPartitionsCount(), longPollingActivateJobsRequest.getRequest(), longPollingActivateJobsRequest.getMaxJobsToActivate(), longPollingActivateJobsRequest.getType(), activateJobsResponse -> {
                onResponse(longPollingActivateJobsRequest, activateJobsResponse);
            }, num -> {
                onCompleted(longPollingActivateJobsRequest, num);
            });
        }
    }

    protected void onActorStarted() {
        this.brokerClient.subscribeJobAvailableNotification(JOBS_AVAILABLE_TOPIC, this::onNotification);
        this.actor.runAtFixedRate(Duration.ofMillis(this.probeTimeoutMillis), this::probe);
    }

    private void onNotification(String str) {
        LOG.trace("Received jobs available notification for type {}.", str);
        this.actor.call(() -> {
            jobsAvailable(str);
        });
    }

    private void onCompleted(LongPollingActivateJobsRequest longPollingActivateJobsRequest, Integer num) {
        if (num.intValue() == longPollingActivateJobsRequest.getMaxJobsToActivate()) {
            this.actor.submit(() -> {
                jobsNotAvailable(longPollingActivateJobsRequest);
            });
            return;
        }
        ActorControl actorControl = this.actor;
        Objects.requireNonNull(longPollingActivateJobsRequest);
        actorControl.submit(longPollingActivateJobsRequest::complete);
    }

    private void onResponse(LongPollingActivateJobsRequest longPollingActivateJobsRequest, GatewayOuterClass.ActivateJobsResponse activateJobsResponse) {
        this.actor.submit(() -> {
            longPollingActivateJobsRequest.onResponse(activateJobsResponse);
            jobsAvailable(longPollingActivateJobsRequest.getType());
        });
    }

    private void jobsNotAvailable(LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        JobTypeAvailabilityState computeIfAbsent = this.jobTypeState.computeIfAbsent(longPollingActivateJobsRequest.getType(), str -> {
            return new JobTypeAvailabilityState(str, this.metrics);
        });
        computeIfAbsent.incrementEmptyResponses(ActorClock.currentTimeMillis());
        block(computeIfAbsent, longPollingActivateJobsRequest);
    }

    private void jobsAvailable(String str) {
        JobTypeAvailabilityState remove = this.jobTypeState.remove(str);
        if (remove != null) {
            unblockRequests(remove);
        }
    }

    private void unblockRequests(JobTypeAvailabilityState jobTypeAvailabilityState) {
        Queue<LongPollingActivateJobsRequest> blockedRequests = jobTypeAvailabilityState.getBlockedRequests();
        if (blockedRequests == null) {
            return;
        }
        blockedRequests.stream().filter(longPollingActivateJobsRequest -> {
            return !longPollingActivateJobsRequest.isCanceled();
        }).forEach(longPollingActivateJobsRequest2 -> {
            LOG.trace("Unblocking ActivateJobsRequest {}", longPollingActivateJobsRequest2.getRequest());
            activateJobs(longPollingActivateJobsRequest2);
        });
        jobTypeAvailabilityState.clearBlockedRequests();
    }

    private void block(JobTypeAvailabilityState jobTypeAvailabilityState, LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        if (longPollingActivateJobsRequest.isLongPollingDisabled()) {
            longPollingActivateJobsRequest.complete();
            return;
        }
        if (longPollingActivateJobsRequest.isTimedOut()) {
            return;
        }
        LOG.debug("Jobs of type {} not available. Blocking request {}", longPollingActivateJobsRequest.getType(), longPollingActivateJobsRequest.getRequest());
        jobTypeAvailabilityState.blockRequest(longPollingActivateJobsRequest);
        if (longPollingActivateJobsRequest.hasScheduledTimer()) {
            return;
        }
        addTimeOut(jobTypeAvailabilityState, longPollingActivateJobsRequest);
    }

    private void addTimeOut(JobTypeAvailabilityState jobTypeAvailabilityState, LongPollingActivateJobsRequest longPollingActivateJobsRequest) {
        ActorClock.currentTimeMillis();
        Duration longPollingTimeout = longPollingActivateJobsRequest.getLongPollingTimeout(this.longPollingTimeout);
        longPollingActivateJobsRequest.setScheduledTimer(this.actor.runDelayed(longPollingTimeout, () -> {
            LOG.debug("Remove blocking request {} for job type {} after timeout of {}", new Object[]{longPollingActivateJobsRequest.getRequest(), longPollingActivateJobsRequest.getType(), longPollingTimeout});
            jobTypeAvailabilityState.removeBlockedRequest(longPollingActivateJobsRequest);
            longPollingActivateJobsRequest.timeout();
        }));
    }

    private void probe() {
        long currentTimeMillis = ActorClock.currentTimeMillis();
        this.jobTypeState.forEach((str, jobTypeAvailabilityState) -> {
            if (jobTypeAvailabilityState.getLastUpdatedTime() < currentTimeMillis - this.probeTimeoutMillis) {
                jobTypeAvailabilityState.removeCanceledRequests();
                LongPollingActivateJobsRequest pollBlockedRequests = jobTypeAvailabilityState.pollBlockedRequests();
                if (pollBlockedRequests != null) {
                    activateJobsUnchecked(pollBlockedRequests);
                } else if (jobTypeAvailabilityState.getEmptyResponses() >= this.emptyResponseThreshold) {
                    jobTypeAvailabilityState.resetEmptyResponses(this.emptyResponseThreshold - 1);
                }
            }
        });
    }

    public static Builder newBuilder() {
        return new Builder();
    }
}
