package io.rsocket.aeron;

import io.aeron.Publication;
import io.aeron.Subscription;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.Frame;
import io.rsocket.aeron.reactor.AeronPublicationSubscriber;
import io.rsocket.aeron.reactor.AeronSubscriptionFlux;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;
import reactor.core.publisher.WorkQueueProcessor;

/* loaded from: input_file:io/rsocket/aeron/AeronDuplexConnection.class */
public class AeronDuplexConnection implements DuplexConnection {
    private final String name;
    private final Function<? super Publisher<ByteBuf>, ? extends Publisher<ByteBuf>> lift;
    private final AeronSubscriptionFlux receiveFlux;
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    public AeronDuplexConnection(String str, WorkQueueProcessor<Runnable> workQueueProcessor, Publication publication, Subscription subscription, ByteBufAllocator byteBufAllocator) {
        this.name = str;
        this.lift = Operators.lift((scannable, coreSubscriber) -> {
            return AeronPublicationSubscriber.create(str + "AeronPublicationSubscriber", workQueueProcessor, coreSubscriber, publication);
        });
        this.receiveFlux = AeronSubscriptionFlux.create(str + "AeronSubscriptionFlux", workQueueProcessor, subscription, byteBufAllocator);
        this.onClose.doFinally(signalType -> {
            publication.close();
            subscription.close();
        }).subscribe();
    }

    public Mono<Void> send(Publisher<Frame> publisher) {
        return Flux.from(publisher).map((v0) -> {
            return v0.content();
        }).transform(this.lift).then();
    }

    public Flux<Frame> receive() {
        return this.receiveFlux.map(Frame::from).doOnNext(frame -> {
            System.out.println(this.name + " -^^- received frame " + frame.toString());
        });
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }

    public void dispose() {
        this.onClose.onComplete();
    }
}
