package reactor.ipc.aeron.server;

import io.aeron.Subscription;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.ipc.aeron.AeronInbound;
import reactor.ipc.aeron.AeronOptions;
import reactor.ipc.aeron.AeronOutbound;
import reactor.ipc.aeron.AeronUtils;
import reactor.ipc.aeron.AeronWrapper;
import reactor.ipc.aeron.ControlMessageSubscriber;
import reactor.ipc.aeron.DefaultAeronOutbound;
import reactor.ipc.aeron.HeartbeatSender;
import reactor.ipc.aeron.HeartbeatWatchdog;
import reactor.ipc.aeron.MessageType;
import reactor.ipc.aeron.Pooler;
import reactor.util.Logger;
import reactor.util.Loggers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/ipc/aeron/server/ServerHandler.class */
public final class ServerHandler implements ControlMessageSubscriber, Disposable {
    private final String category;
    private final AeronWrapper wrapper;
    private final BiFunction<? super AeronInbound, ? super AeronOutbound, ? extends Publisher<Void>> ioHandler;
    private final AeronOptions options;
    private final Pooler pooler;
    private final HeartbeatWatchdog heartbeatWatchdog;
    private final HeartbeatSender heartbeatSender;
    private final Subscription controlSubscription;
    private static final Logger logger = Loggers.getLogger(ServerHandler.class);
    private static final AtomicInteger streamIdCounter = new AtomicInteger(1000);
    private final AtomicLong nextSessionId = new AtomicLong(0);
    private final Map<Long, SessionHandler> sessionHandlerById = new ConcurrentHashMap();

    /* loaded from: input_file:reactor/ipc/aeron/server/ServerHandler$SessionHandler.class */
    class SessionHandler implements Disposable {
        private final Logger logger = Loggers.getLogger(SessionHandler.class);
        private final DefaultAeronOutbound outbound;
        private final AeronServerInbound inbound;
        private final int clientSessionStreamId;
        private final UUID connectRequestId;
        private final long sessionId;
        private final ServerConnector connector;

        SessionHandler(String str, int i, int i2, UUID uuid, long j, int i3) {
            this.clientSessionStreamId = i;
            this.outbound = new DefaultAeronOutbound(ServerHandler.this.category, ServerHandler.this.wrapper, str, ServerHandler.this.options);
            this.connectRequestId = uuid;
            this.sessionId = j;
            this.inbound = new AeronServerInbound(ServerHandler.this.category, ServerHandler.this.wrapper, ServerHandler.this.options, ServerHandler.this.pooler, i3, j, this::dispose);
            this.connector = new ServerConnector(ServerHandler.this.category, ServerHandler.this.wrapper, str, i2, j, i3, uuid, ServerHandler.this.options, ServerHandler.this.heartbeatSender);
        }

        Mono<Void> initialise() {
            return this.connector.connect().then(this.outbound.initialise(this.sessionId, this.clientSessionStreamId)).doOnSuccess(r9 -> {
                this.inbound.initialise();
                HeartbeatWatchdog heartbeatWatchdog = ServerHandler.this.heartbeatWatchdog;
                long j = this.sessionId;
                Runnable runnable = () -> {
                    ServerHandler.this.heartbeatWatchdog.remove(this.sessionId);
                    dispose();
                };
                AeronServerInbound aeronServerInbound = this.inbound;
                aeronServerInbound.getClass();
                heartbeatWatchdog.add(j, runnable, aeronServerInbound::lastSignalTimeNs);
                ServerHandler.this.sessionHandlerById.put(Long.valueOf(this.sessionId), this);
                this.logger.debug("[{}] Client with connectRequestId: {} successfully connected, sessionId: {}", new Object[]{ServerHandler.this.category, this.connectRequestId, Long.valueOf(this.sessionId)});
                Mono.from((Publisher) ServerHandler.this.ioHandler.apply(this.inbound, this.outbound)).doOnTerminate(this::dispose).subscribe();
            }).doOnError(th -> {
                this.logger.debug("[{}] Failed to connect to the client for sessionId: {}", new Object[]{ServerHandler.this.category, Long.valueOf(this.sessionId), th});
                dispose();
            });
        }

        public void dispose() {
            ServerHandler.this.sessionHandlerById.remove(this);
            ServerHandler.this.heartbeatWatchdog.remove(this.sessionId);
            this.connector.dispose();
            this.outbound.dispose();
            this.inbound.dispose();
            this.logger.debug("[{}] Closed session with sessionId: {}", new Object[]{ServerHandler.this.category, Long.valueOf(this.sessionId)});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerHandler(String str, BiFunction<? super AeronInbound, ? super AeronOutbound, ? extends Publisher<Void>> biFunction, AeronOptions aeronOptions) {
        this.wrapper = new AeronWrapper(str, aeronOptions);
        this.controlSubscription = this.wrapper.addSubscription(aeronOptions.serverChannel(), aeronOptions.serverStreamId(), "to receive control requests on", 0L);
        this.category = str;
        this.ioHandler = biFunction;
        this.options = aeronOptions;
        this.pooler = new Pooler(str);
        this.heartbeatWatchdog = new HeartbeatWatchdog(aeronOptions.heartbeatTimeoutMillis(), str);
        this.heartbeatSender = new HeartbeatSender(aeronOptions.heartbeatTimeoutMillis(), str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initialise() {
        this.pooler.addControlSubscription(this.controlSubscription, this);
        this.pooler.initialise();
    }

    public void dispose() {
        this.pooler.shutdown().doOnTerminate(() -> {
            this.sessionHandlerById.values().forEach((v0) -> {
                v0.dispose();
            });
            this.controlSubscription.close();
            this.wrapper.dispose();
        }).subscribe();
    }

    @Override // reactor.ipc.aeron.PoolerSubscriber
    public void onSubscribe(org.reactivestreams.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override // reactor.ipc.aeron.ControlMessageSubscriber
    public void onConnect(UUID uuid, String str, int i, int i2) {
        logger.debug("[{}] Received {} for connectRequestId: {}, channel={}, clientControlStreamId={}, clientSessionStreamId={}", new Object[]{this.category, MessageType.CONNECT, uuid, AeronUtils.minifyChannel(str), Integer.valueOf(i), Integer.valueOf(i2)});
        new SessionHandler(str, i2, i, uuid, this.nextSessionId.incrementAndGet(), streamIdCounter.incrementAndGet()).initialise().subscribeOn(Schedulers.single()).subscribe();
    }

    @Override // reactor.ipc.aeron.ControlMessageSubscriber
    public void onConnectAck(UUID uuid, long j, int i) {
        logger.error("[{}] Received unsupported server request {}, connectRequestId: {}", new Object[]{this.category, MessageType.CONNECT_ACK, uuid});
    }

    @Override // reactor.ipc.aeron.ControlMessageSubscriber
    public void onHeartbeat(long j) {
        this.heartbeatWatchdog.heartbeatReceived(j);
    }
}
