package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.PartitionController;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancer;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:WEB-INF/lib/azure-cosmos-4.32.1.jar:com/azure/cosmos/implementation/changefeed/implementation/PartitionLoadBalancerImpl.class */
/**
 * {@link PartitionLoadBalancer} implementation that runs a repeating lease-acquisition loop.
 *
 * <p>Each cycle reads all leases from the {@link LeaseContainer}, asks the
 * {@link PartitionLoadBalancingStrategy} which of them to take, passes every selected lease to
 * {@link PartitionController#addOrUpdateLease}, and then waits for the configured lease acquire
 * interval before the next cycle. The loop ends once the cancellation token created by
 * {@link #start()} is cancelled by {@link #stop()}.
 */
class PartitionLoadBalancerImpl implements PartitionLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionLoadBalancerImpl.class);

    /** Granularity at which the wait between two cycles re-checks cancellation and the deadline. */
    private static final Duration WAIT_POLL_INTERVAL = Duration.ofMillis(100L);

    private final PartitionController partitionController;
    private final LeaseContainer leaseContainer;
    private final PartitionLoadBalancingStrategy partitionLoadBalancingStrategy;
    private final Duration leaseAcquireInterval;
    private final Scheduler scheduler;
    /** Created by {@link #start()}; only read or written while holding {@link #lock}. */
    private CancellationTokenSource cancellationTokenSource;
    private volatile boolean started;
    private final Object lock;

    /**
     * Creates a load balancer that is initially not running.
     *
     * @param partitionController receives the leases selected in each cycle and is shut down by {@link #stop()}.
     * @param leaseContainer source of the full lease list read at the start of each cycle.
     * @param partitionLoadBalancingStrategy decides which of the known leases to take.
     * @param leaseAcquireInterval time to wait between two acquisition cycles.
     * @param scheduler scheduler on which the acquisition loop is subscribed.
     * @throws IllegalArgumentException if any argument is {@code null}.
     */
    public PartitionLoadBalancerImpl(PartitionController partitionController, LeaseContainer leaseContainer, PartitionLoadBalancingStrategy partitionLoadBalancingStrategy, Duration leaseAcquireInterval, Scheduler scheduler) {
        if (partitionController == null) {
            throw new IllegalArgumentException("partitionController");
        }
        if (leaseContainer == null) {
            throw new IllegalArgumentException("leaseContainer");
        }
        if (partitionLoadBalancingStrategy == null) {
            throw new IllegalArgumentException("partitionLoadBalancingStrategy");
        }
        // A null interval would make every cycle fail while computing the wait deadline; the
        // catch-all handler in run() would swallow that and the loop would spin without any delay.
        if (leaseAcquireInterval == null) {
            throw new IllegalArgumentException("leaseAcquireInterval");
        }
        if (scheduler == null) {
            throw new IllegalArgumentException("scheduler");
        }
        this.partitionController = partitionController;
        this.leaseContainer = leaseContainer;
        this.partitionLoadBalancingStrategy = partitionLoadBalancingStrategy;
        this.leaseAcquireInterval = leaseAcquireInterval;
        this.scheduler = scheduler;
        this.started = false;
        this.lock = new Object();
    }

    /**
     * Marks the load balancer as started and returns a {@link Mono} that, when subscribed,
     * schedules the acquisition loop on the configured scheduler.
     *
     * <p>The started flag and the cancellation token source are set up when this method is
     * called, not when the returned {@link Mono} is subscribed.
     *
     * @return a {@link Mono} that completes once the loop has been handed to the scheduler.
     * @throws IllegalStateException if the load balancer is already started.
     */
    @Override // com.azure.cosmos.implementation.changefeed.PartitionLoadBalancer
    public Mono<Void> start() {
        final CancellationToken cancellationToken;
        synchronized (this.lock) {
            if (this.started) {
                throw new IllegalStateException("Partition load balancer already started");
            }
            this.cancellationTokenSource = new CancellationTokenSource();
            // Capture the token while holding the lock so the loop is tied to the source created
            // by this call, even if stop()/start() run again before the Mono is subscribed.
            cancellationToken = this.cancellationTokenSource.getToken();
            this.started = true;
        }
        return Mono.fromRunnable(() -> this.scheduler.schedule(() -> run(cancellationToken).subscribe()));
    }

    /**
     * Marks the load balancer as stopped, cancels the current token source (if {@link #start()}
     * ever created one) and shuts down the partition controller.
     *
     * @return the {@link Mono} returned by {@link PartitionController#shutdown()}.
     */
    @Override // com.azure.cosmos.implementation.changefeed.PartitionLoadBalancer
    public Mono<Void> stop() {
        synchronized (this.lock) {
            this.started = false;
            // The source is only created by start(); stop() may legitimately be called first.
            if (this.cancellationTokenSource != null) {
                this.cancellationTokenSource.cancel();
            }
        }
        return this.partitionController.shutdown();
    }

    /**
     * @return {@code true} between a successful {@link #start()} call and the next {@link #stop()} call.
     */
    @Override // com.azure.cosmos.implementation.changefeed.PartitionLoadBalancer
    public boolean isRunning() {
        return this.started;
    }

    /**
     * Builds the repeating acquisition loop.
     *
     * @param cancellationToken token checked between the steps of a cycle and before repeating.
     * @return a {@link Mono} that completes when the loop ends.
     */
    private Mono<Void> run(CancellationToken cancellationToken) {
        return Flux.just(this)
            // Wrapped in flatMap so that every repeat() re-subscription asks for the leases again.
            .flatMap(self -> this.leaseContainer.getAllLeases())
            .collectList()
            .flatMap(allLeases -> {
                if (cancellationToken.isCancellationRequested()) {
                    return Mono.empty();
                }
                List<Lease> leasesToTake = this.partitionLoadBalancingStrategy.selectLeasesToTake(allLeases);
                if (!leasesToTake.isEmpty()) {
                    LOGGER.info("Found {} total leases, taking ownership of {}", allLeases.size(), leasesToTake.size());
                }
                if (cancellationToken.isCancellationRequested()) {
                    return Mono.empty();
                }
                return Flux.fromIterable(leasesToTake)
                    .limitRate(1)
                    .flatMap(lease -> {
                        if (cancellationToken.isCancellationRequested()) {
                            return Mono.empty();
                        }
                        return this.partitionController.addOrUpdateLease(lease);
                    })
                    // then(...) subscribes to the wait only after all selected leases were processed.
                    .then(Mono.just(this).flatMap(self -> {
                        if (cancellationToken.isCancellationRequested()) {
                            return Mono.empty();
                        }
                        Instant nextCycleAt = Instant.now().plus(this.leaseAcquireInterval);
                        // Wait in short slices so that cancellation is noticed without having to
                        // sit out the whole lease acquire interval.
                        return Mono.just(self)
                            .delayElement(WAIT_POLL_INTERVAL, CosmosSchedulers.COSMOS_PARALLEL)
                            .repeat(() -> !cancellationToken.isCancellationRequested() && Instant.now().isBefore(nextCycleAt))
                            .last();
                    }));
            })
            .onErrorResume(throwable -> {
                // Catch-all: a failed cycle is logged and dropped so that repeat() starts the next one.
                LOGGER.warn("Unexpected exception thrown while trying to acquire available leases", throwable);
                return Mono.empty();
            })
            .repeat(() -> !cancellationToken.isCancellationRequested())
            .then()
            .onErrorResume(throwable -> {
                // Only reached if an error escapes the catch-all above (for example from a repeat
                // predicate); keep the cause in the log instead of discarding it.
                LOGGER.info("Partition load balancer task stopped.", throwable);
                return stop();
            });
    }
}
