package reactor.ipc.aeron;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/ipc/aeron/WriteSequencer.class */
public abstract class WriteSequencer<T> {
    /** Work-in-progress counter for {@link #drain()}: only the caller that moves it away from 0 runs the drain loop. */
    private volatile int wip;
    /**
     * Set to true by {@link #drain()} right before a dequeued publisher is handed to the inner
     * subscriber; cleared by the inner subscriber on completion, error, or cancellation.
     */
    private volatile boolean innerActive;
    /** Receives the publisher of every pending write dropped by {@link #discard()}. */
    private final Consumer<Object> discardedHandler;
    /** Scheduler on which {@link #scheduleDrain()} submits {@link #drain()}. */
    private final Scheduler scheduler;
    private static final Logger logger = Loggers.getLogger(WriteSequencer.class);
    /** Atomic accessor for {@link #wip}. */
    static final AtomicIntegerFieldUpdater<WriteSequencer> WIP = AtomicIntegerFieldUpdater.newUpdater(WriteSequencer.class, "wip");
    /**
     * Pending writes. Entries are consumed in pairs: first the {@link MonoSink} to notify, then the
     * publisher to subscribe to (see {@link #drain()} and {@link #discard()}).
     */
    private final Queue<?> pendingWrites = (Queue) Queues.unbounded().get();
    /**
     * The same queue instance as {@link #pendingWrites}, viewed as a {@link BiPredicate}; used by
     * {@link #add(Publisher)} to enqueue a sink together with its publisher.
     * NOTE(review): this cast assumes the queue returned by {@code Queues.unbounded()} also
     * implements {@code BiPredicate} - confirm against the reactor-core version in use.
     */
    private final BiPredicate<MonoSink<?>, Object> pendingWriteOffer = (BiPredicate) this.pendingWrites;

    /* loaded from: input_file:reactor/ipc/aeron/WriteSequencer$InnerSubscriber.class */
    static abstract class InnerSubscriber<T> implements CoreSubscriber<T>, Subscription {
        /** Owning sequencer; its {@code innerActive} flag is cleared here and its drain is rescheduled through it. */
        final WriteSequencer<T> parent;
        /** Subscription stored by {@code onSubscribe} when the {@code wip} guard was already held; consumed by {@code drainLoop}. */
        volatile Subscription missedSubscription;
        /** Demand accumulated by {@code request} when the {@code wip} guard was already held; consumed by {@code drainLoop}. */
        volatile long missedRequested;
        /** Produced count accumulated by {@code produced} when the {@code wip} guard was already held; consumed by {@code drainLoop}. */
        volatile long missedProduced;
        /** Work-in-progress counter serializing access to the non-volatile state below. */
        volatile int wip;
        /** Set by {@code cancel()}; reset to false by {@code WriteSequencer.drain()} after pending writes were discarded. */
        volatile boolean isCancelled;
        /** Outstanding demand; {@code Long.MAX_VALUE} means unbounded. */
        long requested;
        /** Becomes true once {@code requested} has reached {@code Long.MAX_VALUE}; later request/produced calls are then ignored. */
        boolean unbounded;
        /** Current upstream subscription, or null if none is set or it was cancelled. */
        Subscription actual;
        /** Number of {@code onNext} signals counted since the last flush in {@code onComplete}/{@code onError}. */
        long produced;
        /** Sink assigned through {@code setResultSink}; not read by the code in this class. */
        MonoSink<?> promise;
        /** Sum of the batches requested from the current upstream; reset to 0 whenever the upstream changes. */
        long upstreamRequested;
        /** Amount passed to each upstream {@code request} call. */
        final int batchSize;
        /** Atomic accessor for {@link #missedSubscription}. */
        static final AtomicReferenceFieldUpdater<InnerSubscriber, Subscription> MISSED_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(InnerSubscriber.class, Subscription.class, "missedSubscription");
        /** Atomic accessor for {@link #missedRequested}. */
        static final AtomicLongFieldUpdater<InnerSubscriber> MISSED_REQUESTED = AtomicLongFieldUpdater.newUpdater(InnerSubscriber.class, "missedRequested");
        /** Atomic accessor for {@link #missedProduced}. */
        static final AtomicLongFieldUpdater<InnerSubscriber> MISSED_PRODUCED = AtomicLongFieldUpdater.newUpdater(InnerSubscriber.class, "missedProduced");
        /** Atomic accessor for {@link #wip}. */
        static final AtomicIntegerFieldUpdater<InnerSubscriber> WIP = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "wip");

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates an inner subscriber bound to its owning sequencer.
         *
         * @param writeSequencer the sequencer whose active flag and drain this subscriber drives
         * @param i the number of items requested from upstream per batch
         */
        public InnerSubscriber(WriteSequencer<T> writeSequencer, int i) {
            batchSize = i;
            parent = writeSequencer;
        }

        /**
         * Stores the sink associated with the publisher that is about to be subscribed.
         *
         * @param monoSink the sink to remember; replaces any previously stored sink
         */
        public void setResultSink(MonoSink<?> monoSink) {
            promise = monoSink;
        }

        /**
         * Marks this subscriber as cancelled and triggers a drain so the current upstream gets
         * cancelled. Calls made while the cancelled flag is already set do nothing.
         */
        @Override
        public final void cancel() {
            if (!isCancelled) {
                isCancelled = true;
                drain();
            }
        }

        /**
         * Handles upstream completion: clears the parent's active flag, flushes the locally counted
         * items into the demand accounting, then calls {@link #doOnComplete()}.
         */
        @Override
        public void onComplete() {
            final long emitted = produced;
            // Allow the parent's drain loop to pick up the next pending write.
            parent.innerActive = false;
            if (emitted != 0) {
                produced = 0L;
                produced(emitted);
            }
            doOnComplete();
        }

        /**
         * Hook invoked at the end of {@link #onComplete()}, after the parent's active flag has been
         * cleared and any counted items have been passed to {@code produced(long)}.
         */
        abstract void doOnComplete();

        /**
         * Handles an upstream error: clears the parent's active flag, flushes the locally counted
         * items into the demand accounting, then calls {@link #doOnError(Throwable)}.
         *
         * @param th the error signalled by upstream
         */
        @Override
        public void onError(Throwable th) {
            final long emitted = produced;
            // Allow the parent's drain loop to pick up the next pending write.
            parent.innerActive = false;
            if (emitted != 0) {
                produced = 0L;
                produced(emitted);
            }
            doOnError(th);
        }

        /**
         * Hook invoked at the end of {@link #onError(Throwable)}, after the parent's active flag has
         * been cleared and any counted items have been passed to {@code produced(long)}.
         *
         * @param th the error signalled by upstream
         */
        abstract void doOnError(Throwable th);

        /**
         * Counts the item, passes it to {@link #doOnNext(Object)}, and requests another batch from
         * upstream once the whole previously requested amount has been received while demand remains.
         *
         * @param t the item emitted by upstream
         */
        @Override
        public void onNext(T t) {
            produced++;
            doOnNext(t);
            boolean batchFullyReceived = upstreamRequested - produced == 0;
            if (batchFullyReceived && requested - produced > 0) {
                requestFromUpstream(actual);
            }
        }

        /**
         * Hook invoked from {@link #onNext(Object)} for every item, after the produced counter has
         * been incremented and before any further upstream request is issued.
         *
         * @param t the item emitted by upstream
         */
        abstract void doOnNext(T t);

        /**
         * Installs a new upstream subscription.
         *
         * <p>If this subscriber is already cancelled the subscription is cancelled immediately. If the
         * {@code wip} guard is held by another caller the subscription is parked in
         * {@code missedSubscription} and picked up by the drain loop. Otherwise it becomes the current
         * upstream, {@link #doOnSubscribe()} is called, and a first batch is requested when demand is
         * already outstanding.
         *
         * @param subscription the new upstream subscription; must not be null
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            Objects.requireNonNull(subscription);
            if (isCancelled) {
                subscription.cancel();
                return;
            }

            boolean guardAcquired = wip == 0 && WIP.compareAndSet(this, 0, 1);
            if (!guardAcquired) {
                MISSED_SUBSCRIPTION.set(this, subscription);
                drain();
                return;
            }

            actual = subscription;
            upstreamRequested = 0L;
            doOnSubscribe();

            // Snapshot the demand before releasing the guard.
            long demand = requested;
            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
            if (demand != 0) {
                requestFromUpstream(subscription);
            }
        }

        /**
         * Hook invoked from {@link #onSubscribe(Subscription)} on its uncontended path, after the new
         * subscription has been stored and the upstream-requested counter reset. It is not invoked
         * when the subscription is installed later by {@code drainLoop()}.
         */
        abstract void doOnSubscribe();

        /**
         * Adds demand.
         *
         * <p>Invalid amounts (as judged by {@code Operators.validate}) and calls made after the demand
         * became unbounded are ignored. If the {@code wip} guard is held by another caller the amount
         * is added to {@code missedRequested} for the drain loop; otherwise the demand is updated
         * directly and a batch is requested from the current upstream, if there is one.
         *
         * @param j the additional number of items requested
         */
        @Override
        public final void request(long j) {
            boolean accepted = Operators.validate(j) && !unbounded;
            if (!accepted) {
                return;
            }

            boolean guardAcquired = wip == 0 && WIP.compareAndSet(this, 0, 1);
            if (!guardAcquired) {
                Operators.addCap(MISSED_REQUESTED, this, j);
                drain();
                return;
            }

            long current = requested;
            if (current != Long.MAX_VALUE) {
                long updated = Operators.addCap(current, j);
                requested = updated;
                if (updated == Long.MAX_VALUE) {
                    unbounded = true;
                }
            }

            // Snapshot the upstream before releasing the guard.
            Subscription upstream = actual;
            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
            if (upstream != null) {
                requestFromUpstream(upstream);
            }
        }

        /**
         * Requests one batch of {@code batchSize} items from the given subscription, but only while
         * the amount already requested upstream is below the outstanding demand.
         *
         * @param subscription the upstream subscription to request from
         */
        final void requestFromUpstream(Subscription subscription) {
            if (upstreamRequested >= requested) {
                return;
            }
            upstreamRequested += batchSize;
            subscription.request(batchSize);
        }

        /**
         * Registers a unit of pending work and runs {@link #drainLoop()} if no other caller currently
         * holds the {@code wip} guard.
         */
        final void drain() {
            if (WIP.getAndIncrement(this) == 0) {
                drainLoop();
            }
        }

        /**
         * Applies the values parked in {@code missedSubscription}, {@code missedRequested} and
         * {@code missedProduced} to the non-volatile state. Callers enter with the {@code wip} guard
         * held; the loop repeats until {@code wip} drops back to 0, then issues at most one upstream
         * batch request.
         */
        final void drainLoop() {
            // Number of work units accounted for by the current pass.
            int i = 1;
            // Demand and subscription to request from once the guard has been released.
            long j = 0;
            Subscription subscription = null;
            do {
                // Take each missed value atomically, but only pay for the swap when one is present.
                Subscription subscription2 = this.missedSubscription;
                if (subscription2 != null) {
                    subscription2 = MISSED_SUBSCRIPTION.getAndSet(this, null);
                }
                long j2 = this.missedRequested;
                if (j2 != 0) {
                    j2 = MISSED_REQUESTED.getAndSet(this, 0L);
                }
                long j3 = this.missedProduced;
                if (j3 != 0) {
                    j3 = MISSED_PRODUCED.getAndSet(this, 0L);
                }
                Subscription subscription3 = this.actual;
                if (this.isCancelled) {
                    // Cancelled: drop the current upstream and any subscription that arrived meanwhile.
                    if (subscription3 != null) {
                        subscription3.cancel();
                        this.actual = null;
                        this.upstreamRequested = 0L;
                    }
                    if (subscription2 != null) {
                        subscription2.cancel();
                    }
                    // Let the parent's drain loop move on.
                    ((WriteSequencer) this.parent).innerActive = false;
                } else {
                    // requested = cap(requested + missedRequested) - missedProduced, floored at 0;
                    // an unbounded value (Long.MAX_VALUE) is left untouched.
                    long j4 = this.requested;
                    if (j4 != Long.MAX_VALUE) {
                        long addCap = Operators.addCap(j4, j2);
                        if (addCap != Long.MAX_VALUE) {
                            long j5 = addCap - j3;
                            if (j5 < 0) {
                                Operators.reportMoreProduced();
                                j5 = 0;
                            }
                            j4 = j5;
                        } else {
                            j4 = addCap;
                        }
                        this.requested = j4;
                    }
                    if (subscription2 != null) {
                        // A new upstream arrived: install it and remember the full remaining demand.
                        this.actual = subscription2;
                        this.upstreamRequested = 0L;
                        if (j4 != 0) {
                            j = Operators.addCap(j, j4);
                            subscription = subscription2;
                        }
                    } else if (j2 != 0 && subscription3 != null) {
                        // Same upstream, extra demand only.
                        j = Operators.addCap(j, j2);
                        subscription = subscription3;
                    }
                }
                // Subtract the units handled in this pass; a non-zero result means more work arrived.
                i = WIP.addAndGet(this, -i);
            } while (i != 0);
            // j is only non-zero when a subscription was recorded alongside it above.
            if (j != 0) {
                requestFromUpstream(subscription);
            }
        }

        /**
         * Subtracts delivered items from the outstanding demand.
         *
         * <p>Does nothing once the demand is unbounded. If the {@code wip} guard is held by another
         * caller the amount is added to {@code missedProduced} for the drain loop. A result below zero
         * is reported through {@code Operators.reportMoreProduced()} and clamped to 0.
         *
         * @param j the number of items that were delivered
         */
        final void produced(long j) {
            if (unbounded) {
                return;
            }

            boolean guardAcquired = wip == 0 && WIP.compareAndSet(this, 0, 1);
            if (!guardAcquired) {
                Operators.addCap(MISSED_PRODUCED, this, j);
                drain();
                return;
            }

            long outstanding = requested;
            if (outstanding == Long.MAX_VALUE) {
                unbounded = true;
            } else {
                long remaining = outstanding - j;
                if (remaining < 0) {
                    Operators.reportMoreProduced();
                    remaining = 0;
                }
                requested = remaining;
            }

            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /** Asks the owning sequencer to schedule another run of its drain loop. */
        public void scheduleNextPublisherDrain() {
            parent.scheduleDrain();
        }
    }

    /**
     * Creates a sequencer.
     *
     * @param scheduler the scheduler on which drain runs are submitted
     * @param consumer receives the publisher of each pending write that gets discarded
     */
    public WriteSequencer(Scheduler scheduler, Consumer<Object> consumer) {
        this.scheduler = scheduler;
        this.discardedHandler = consumer;
    }

    /**
     * Returns the inner subscriber that {@link #drain()} subscribes dequeued publishers to and whose
     * cancelled flag {@link #isReady()} inspects.
     * NOTE(review): callers here appear to expect the same instance on every call - confirm in
     * the implementations.
     */
    abstract InnerSubscriber<T> getInner();

    /** Returns the consumer that receives throwables caught inside {@link #drain()} and {@link #discard()}. */
    abstract Consumer<Throwable> getErrorHandler();

    /**
     * Queues a publisher for sequential writing.
     *
     * <p>Nothing is enqueued until the returned {@code Mono} is subscribed. On subscription the
     * sink and the publisher are offered to the pending-writes queue together; if the offer is
     * rejected the sink is failed. A drain is scheduled in either case.
     *
     * @param publisher the publisher to write
     * @return a {@code Mono} backed by the sink that was enqueued with the publisher
     */
    public Mono<Void> add(Publisher<?> publisher) {
        return Mono.create(sink -> {
            boolean enqueued = pendingWriteOffer.test(sink, publisher);
            if (!enqueued) {
                sink.error(new Exception("Failed to enqueue publisher"));
            }
            scheduleDrain();
        });
    }

    /**
     * Tells whether any writes are still waiting in the queue.
     *
     * @return true when the pending-writes queue holds no entries
     */
    public boolean isEmpty() {
        return pendingWrites.isEmpty();
    }

    /**
     * Tells whether the inner subscriber is usable.
     *
     * @return true unless the inner subscriber's cancelled flag is set
     */
    boolean isReady() {
        InnerSubscriber<T> inner = getInner();
        return !inner.isCancelled;
    }

    /**
     * Processes pending writes one publisher at a time.
     *
     * <p>Only the caller that moves {@code wip} away from 0 runs the loop; other callers just
     * register more work and return. Each pass does one of the following:
     * <ul>
     *   <li>inner subscriber cancelled: discard all pending writes and reset the cancelled flag;</li>
     *   <li>a publisher is still active: nothing to do for this unit of work;</li>
     *   <li>otherwise: poll the next sink/publisher pair and start it. A publisher that is also a
     *       {@link Callable} is resolved by calling it: a null result completes the sink, a
     *       non-null result is delivered through a scalar subscription, and a throwable raised on
     *       this path fails the sink. Any other publisher is subscribed with the inner
     *       subscriber.</li>
     * </ul>
     *
     * <p>Throwables escaping the polling/subscription step are passed to {@link #getErrorHandler()}.
     */
    public void drain() {
        InnerSubscriber<T> inner = getInner();
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }
        while (true) {
            if (inner.isCancelled) {
                discard();
                inner.isCancelled = false;
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
                continue;
            }
            if (this.innerActive) {
                // The active publisher reschedules a drain when it finishes; just consume this unit.
                if (WIP.decrementAndGet(this) == 0) {
                    return;
                }
                continue;
            }
            try {
                MonoSink<?> sink = (MonoSink<?>) this.pendingWrites.poll();
                if (sink == null) {
                    if (WIP.decrementAndGet(this) == 0) {
                        return;
                    }
                    continue;
                }
                // The queue stores pairs: the publisher always follows its sink.
                @SuppressWarnings("unchecked")
                Publisher<T> publisher = (Publisher<T>) this.pendingWrites.poll();
                if (publisher instanceof Callable) {
                    try {
                        // Erased cast: the scalar value is handed to the inner subscriber as its element type.
                        @SuppressWarnings("unchecked")
                        T value = (T) ((Callable<?>) publisher).call();
                        if (value == null) {
                            sink.success();
                        } else {
                            this.innerActive = true;
                            inner.setResultSink(sink);
                            inner.onSubscribe(Operators.scalarSubscription(inner, value));
                        }
                    } catch (Throwable callFailure) {
                        sink.error(callFailure);
                    }
                } else {
                    this.innerActive = true;
                    inner.setResultSink(sink);
                    publisher.subscribe(inner);
                }
            } catch (Throwable failure) {
                getErrorHandler().accept(failure);
                // NOTE(review): returning here leaves wip non-zero, so later drain() calls return
                // immediately. Kept as in the original - confirm this terminal state is intended.
                return;
            }
        }
    }

    /**
     * Drops every pending write. For each sink/publisher pair still queued, the publisher is
     * logged at debug level and passed to the discarded handler, then the sink is failed with an
     * {@code AbortedException}. If anything throws along the way the throwable goes to
     * {@link #getErrorHandler()} and discarding stops.
     */
    void discard() {
        while (!pendingWrites.isEmpty()) {
            try {
                MonoSink<?> sink = (MonoSink<?>) pendingWrites.poll();
                Object dropped = pendingWrites.poll();
                if (logger.isDebugEnabled()) {
                    logger.debug("Terminated. Dropping: {}", dropped);
                }
                discardedHandler.accept(dropped);
                sink.error(new AbortedException());
            } catch (Throwable failure) {
                getErrorHandler().accept(failure);
                return;
            }
        }
    }

    /** Submits a run of {@link #drain()} to the scheduler. */
    void scheduleDrain() {
        scheduler.schedule(() -> drain());
    }
}
