package reactor.ipc.aeron.server;

import io.aeron.Subscription;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.publisher.TopicProcessor;
import reactor.ipc.aeron.AeronInbound;
import reactor.ipc.aeron.AeronOptions;
import reactor.ipc.aeron.AeronWrapper;
import reactor.ipc.aeron.ByteBufferFlux;
import reactor.ipc.aeron.DataMessageSubscriber;
import reactor.ipc.aeron.MessageType;
import reactor.ipc.aeron.Pooler;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/ipc/aeron/server/AeronServerInbound.class */
final class AeronServerInbound implements AeronInbound, Disposable {
    private final ByteBufferFlux flux;
    private final TopicProcessor<ByteBuffer> processor;
    private final Pooler pooler;
    private final Subscription serverDataSubscription;
    private final ServerDataMessageProcessor messageProcessor;

    /* loaded from: input_file:reactor/ipc/aeron/server/AeronServerInbound$ServerDataMessageProcessor.class */
    static class ServerDataMessageProcessor implements DataMessageSubscriber, Publisher<ByteBuffer> {
        private static final Logger logger = Loggers.getLogger(ServerDataMessageProcessor.class);
        private final String category;
        private volatile org.reactivestreams.Subscription subscription;
        private volatile long lastSignalTimeNs;
        private volatile Subscriber<? super ByteBuffer> subscriber;
        private final long sessionId;
        private final Runnable onCompleteHandler;

        ServerDataMessageProcessor(String str, long j, Runnable runnable) {
            this.category = str;
            this.sessionId = j;
            this.onCompleteHandler = runnable;
        }

        @Override // reactor.ipc.aeron.PoolerSubscriber
        public void onSubscribe(org.reactivestreams.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override // reactor.ipc.aeron.DataMessageSubscriber
        public void onNext(long j, ByteBuffer byteBuffer) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}, buffer: {}", new Object[]{this.category, MessageType.NEXT, Long.valueOf(j), byteBuffer});
            }
            this.lastSignalTimeNs = System.nanoTime();
            if (this.sessionId == j) {
                this.subscriber.onNext(byteBuffer);
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.NEXT, Long.valueOf(j)});
            }
        }

        @Override // reactor.ipc.aeron.DataMessageSubscriber
        public void onComplete(long j) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
            if (this.sessionId == j) {
                this.onCompleteHandler.run();
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this.subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerInbound(String str, AeronWrapper aeronWrapper, AeronOptions aeronOptions, Pooler pooler, int i, long j, Runnable runnable) {
        this.processor = TopicProcessor.builder().name(str).build();
        this.pooler = pooler;
        this.flux = new ByteBufferFlux(this.processor);
        this.serverDataSubscription = aeronWrapper.addSubscription(aeronOptions.serverChannel(), i, "to receive client data on", j);
        this.messageProcessor = new ServerDataMessageProcessor(str, j, runnable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialise() {
        this.pooler.addDataSubscription(this.serverDataSubscription, this.messageProcessor);
        this.messageProcessor.subscribe(this.processor);
    }

    @Override // reactor.ipc.aeron.AeronInbound
    public ByteBufferFlux receive() {
        return this.flux;
    }

    public void dispose() {
        this.processor.onComplete();
        this.pooler.removeSubscription(this.serverDataSubscription);
        this.serverDataSubscription.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long lastSignalTimeNs() {
        return this.messageProcessor.lastSignalTimeNs;
    }
}
