package io.rsocket.aeron.reactor;

import io.aeron.FragmentAssembler;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.agrona.DirectBuffer;
import org.agrona.io.DirectBufferInputStream;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.core.publisher.WorkQueueProcessor;

/* loaded from: input_file:io/rsocket/aeron/reactor/AeronSubscriptionFlux.class */
public class AeronSubscriptionFlux extends Flux<ByteBuf> implements Subscription, FragmentHandler {
    private static final int EFFORT = 8;
    private final WorkQueueProcessor<Runnable> workQueueProcessor;
    private final io.aeron.Subscription subscription;
    private final ByteBufAllocator allocator;
    private final FragmentAssembler assembler = new FragmentAssembler(this::onFragment);
    private final String name;
    volatile int missed;
    volatile int wip;
    volatile long requested;
    volatile int once;
    volatile CoreSubscriber<? super ByteBuf> actual;
    volatile boolean cancelled;
    static final AtomicIntegerFieldUpdater<AeronSubscriptionFlux> ONCE = AtomicIntegerFieldUpdater.newUpdater(AeronSubscriptionFlux.class, "once");
    static final AtomicIntegerFieldUpdater<AeronSubscriptionFlux> WIP = AtomicIntegerFieldUpdater.newUpdater(AeronSubscriptionFlux.class, "wip");
    static final AtomicIntegerFieldUpdater<AeronSubscriptionFlux> MISSED = AtomicIntegerFieldUpdater.newUpdater(AeronSubscriptionFlux.class, "missed");
    static final AtomicLongFieldUpdater<AeronSubscriptionFlux> REQUESTED = AtomicLongFieldUpdater.newUpdater(AeronSubscriptionFlux.class, "requested");
    private static final Logger logger = LoggerFactory.getLogger(AeronSubscriptionFlux.class);
    private static final ThreadLocal<DirectBufferInputStream> DIRECT_BUFFER_INPUT_STREAM = ThreadLocal.withInitial(DirectBufferInputStream::new);

    AeronSubscriptionFlux(String str, WorkQueueProcessor<Runnable> workQueueProcessor, io.aeron.Subscription subscription, ByteBufAllocator byteBufAllocator) {
        this.name = str;
        this.workQueueProcessor = workQueueProcessor;
        this.subscription = subscription;
        this.allocator = byteBufAllocator;
    }

    public static AeronSubscriptionFlux create(String str, WorkQueueProcessor<Runnable> workQueueProcessor, io.aeron.Subscription subscription, ByteBufAllocator byteBufAllocator) {
        return new AeronSubscriptionFlux(str, workQueueProcessor, subscription, byteBufAllocator);
    }

    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "subscribe");
        if (this.once != 0 || !ONCE.compareAndSet(this, 0, 1)) {
            Operators.error(coreSubscriber, new IllegalStateException("AeronSubscriptionFlux allows only a single Subscriber"));
            return;
        }
        coreSubscriber.onSubscribe(this);
        this.actual = coreSubscriber;
        if (this.cancelled) {
            this.actual = null;
        }
    }

    public void request(long j) {
        System.out.println("n - " + j);
        if (Operators.validate(j)) {
            Operators.addCap(REQUESTED, this, j);
            tryEmit();
        }
    }

    void tryEmit() {
        MISSED.incrementAndGet(this);
        if (WIP.compareAndSet(this, 0, 1)) {
            emit();
        }
    }

    void emit() {
        do {
            MISSED.set(this, 0);
            long j = 8;
            do {
                long j2 = j;
                j = j2 - 1;
                if (j2 <= 0 || this.requested <= 0) {
                    break;
                }
            } while (this.subscription.poll(this.assembler, 4096) >= 1);
            if (this.cancelled) {
                break;
            }
        } while (MISSED.get(this) != 0);
        if (this.cancelled || this.requested <= 0) {
            WIP.set(this, 0);
        } else {
            this.workQueueProcessor.onNext(this::emit);
        }
    }

    public void cancel() {
        System.out.printf(this.name + " - here", new Object[0]);
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        this.actual = null;
    }

    public void onFragment(DirectBuffer directBuffer, int i, int i2, Header header) {
        try {
            ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(directBuffer.byteBuffer());
            ByteBuf buffer = this.allocator.buffer(i2);
            buffer.writeBytes(wrappedBuffer, i, i2);
            if (logger.isDebugEnabled()) {
                logger.debug(this.name + " receiving:\n{}\n", ByteBufUtil.prettyHexDump(buffer));
            }
            this.actual.onNext(buffer);
            Operators.addCap(REQUESTED, this, -1L);
        } catch (Throwable th) {
            logger.error("error receiving bytes", th);
            this.actual.onError(th);
        }
    }
}
