package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.CancellationTokenSource;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverCloseReason;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseRenewer;
import com.azure.cosmos.implementation.changefeed.PartitionProcessor;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisor;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.ObserverException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionSplitException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:WEB-INF/lib/azure-cosmos-4.32.1.jar:com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.class */
class PartitionSupervisorImpl implements PartitionSupervisor {
    private final Lease lease;
    private final ChangeFeedObserver observer;
    private final PartitionProcessor processor;
    private final LeaseRenewer renewer;
    private CancellationTokenSource childShutdownCts = new CancellationTokenSource();
    private volatile RuntimeException resultException;
    private Scheduler scheduler;

    public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver changeFeedObserver, PartitionProcessor partitionProcessor, LeaseRenewer leaseRenewer, Scheduler scheduler) {
        this.lease = lease;
        this.observer = changeFeedObserver;
        this.processor = partitionProcessor;
        this.renewer = leaseRenewer;
        this.scheduler = scheduler;
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionSupervisor
    public Mono<Void> run(CancellationToken cancellationToken) {
        this.resultException = null;
        ChangeFeedObserverContextImpl changeFeedObserverContextImpl = new ChangeFeedObserverContextImpl(this.lease.getLeaseToken());
        this.observer.open(changeFeedObserverContextImpl);
        this.scheduler.schedule(() -> {
            this.processor.run(this.childShutdownCts.getToken()).subscribe();
        });
        this.scheduler.schedule(() -> {
            this.renewer.run(this.childShutdownCts.getToken()).subscribe();
        });
        return Mono.just(this).delayElement(Duration.ofMillis(100L), CosmosSchedulers.COSMOS_PARALLEL).repeat(() -> {
            return !cancellationToken.isCancellationRequested() && this.processor.getResultException() == null && this.renewer.getResultException() == null;
        }).last().flatMap(partitionSupervisorImpl -> {
            return afterRun(changeFeedObserverContextImpl, cancellationToken);
        });
    }

    private Mono<Void> afterRun(ChangeFeedObserverContext changeFeedObserverContext, CancellationToken cancellationToken) {
        RuntimeException resultException;
        ChangeFeedObserverCloseReason changeFeedObserverCloseReason = ChangeFeedObserverCloseReason.UNKNOWN;
        try {
            try {
                try {
                    this.childShutdownCts.cancel();
                    changeFeedObserverCloseReason = cancellationToken.isCancellationRequested() ? ChangeFeedObserverCloseReason.SHUTDOWN : ChangeFeedObserverCloseReason.UNKNOWN;
                    resultException = this.processor.getResultException();
                    if ((resultException == null || (resultException instanceof TaskCancelledException)) && this.renewer.getResultException() != null) {
                        resultException = this.renewer.getResultException();
                    }
                } catch (ObserverException e) {
                    ChangeFeedObserverCloseReason changeFeedObserverCloseReason2 = ChangeFeedObserverCloseReason.OBSERVER_ERROR;
                    this.resultException = e;
                    this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason2);
                } catch (Exception e2) {
                    this.observer.close(changeFeedObserverContext, ChangeFeedObserverCloseReason.UNKNOWN);
                }
            } catch (LeaseLostException e3) {
                ChangeFeedObserverCloseReason changeFeedObserverCloseReason3 = ChangeFeedObserverCloseReason.LEASE_LOST;
                this.resultException = e3;
                this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason3);
            } catch (PartitionSplitException e4) {
                ChangeFeedObserverCloseReason changeFeedObserverCloseReason4 = ChangeFeedObserverCloseReason.LEASE_GONE;
                this.resultException = e4;
                this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason4);
            } catch (TaskCancelledException e5) {
                ChangeFeedObserverCloseReason changeFeedObserverCloseReason5 = ChangeFeedObserverCloseReason.SHUTDOWN;
                this.resultException = null;
                this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason5);
            }
            if (resultException != null) {
                throw resultException;
            }
            this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason);
            return this.resultException != null ? Mono.error(this.resultException) : Mono.empty();
        } catch (Throwable th) {
            this.observer.close(changeFeedObserverContext, changeFeedObserverCloseReason);
            throw th;
        }
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionSupervisor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionSupervisor
    public void shutdown() {
        if (this.childShutdownCts != null) {
            this.childShutdownCts.cancel();
        }
    }
}
