package io.datakernel.csp.net;

import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufQueue;
import io.datakernel.csp.ChannelConsumer;
import io.datakernel.csp.ChannelSupplier;
import io.datakernel.csp.ChannelSuppliers;
import io.datakernel.csp.binary.BinaryChannelSupplier;
import io.datakernel.csp.binary.ByteBufSerializer;
import io.datakernel.net.AsyncTcpSocket;
import io.datakernel.promise.Promise;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/datakernel/csp/net/MessagingWithBinaryStreaming.class */
/**
 * {@link Messaging} implementation on top of an {@link AsyncTcpSocket}.
 *
 * <p>Messages are written with {@link ByteBufSerializer#serialize} and read by parsing
 * the bytes accumulated in an internal {@link ByteBufQueue}. Either direction can
 * alternatively be exposed as a raw {@link ByteBuf} channel via
 * {@link #sendBinaryStream()} / {@link #receiveBinaryStream()}.
 *
 * <p>The instance tracks completion of the read side and of the write side separately
 * and closes itself once both are done.
 *
 * @param <I> type of the messages returned by {@link #receive()}
 * @param <O> type of the messages accepted by {@link #send(Object)}
 */
public final class MessagingWithBinaryStreaming<I, O> implements Messaging<I, O> {
    private final AsyncTcpSocket socket;
    private final ByteBufSerializer<I, O> serializer;

    /** Bytes read from the socket that have not yet been consumed by the parser. */
    private final ByteBufQueue bufs = new ByteBufQueue();

    /** Parser-facing view of {@link #bufs} that refills the queue from the socket on demand. */
    private final BinaryChannelSupplier bufsSupplier;

    /** Non-null once {@link #close(Throwable)} has run; doubles as the "closed" flag. */
    private Throwable closedException;
    private boolean readDone;
    private boolean writeDone;

    private MessagingWithBinaryStreaming(AsyncTcpSocket asyncTcpSocket, ByteBufSerializer<I, O> byteBufSerializer) {
        this.socket = asyncTcpSocket;
        this.serializer = byteBufSerializer;
        // Built here rather than in a field initializer: the refill lambda reads the final
        // field 'socket', which must be definitely assigned before the lambda is created.
        this.bufsSupplier = BinaryChannelSupplier.ofProvidedQueue(
                this.bufs,
                () -> this.socket.read()
                        .then(byteBuf -> {
                            if (byteBuf == null) {
                                // A null buffer is treated as end of stream, which is an
                                // error while a message is still being parsed.
                                return Promise.ofException(BinaryChannelSupplier.UNEXPECTED_END_OF_STREAM_EXCEPTION);
                            }
                            this.bufs.add(byteBuf);
                            return Promise.complete();
                        })
                        .whenException(this::close),
                Promise::complete,
                this);
    }

    /**
     * Creates a messaging instance over the given socket and immediately issues a first
     * socket read (see {@code prefetch()}).
     *
     * @param asyncTcpSocket    socket used for all reads and writes
     * @param byteBufSerializer serializer used to encode outgoing and parse incoming messages
     * @param <I>               incoming message type
     * @param <O>               outgoing message type
     * @return the new messaging instance
     */
    public static <I, O> MessagingWithBinaryStreaming<I, O> create(AsyncTcpSocket asyncTcpSocket, ByteBufSerializer<I, O> byteBufSerializer) {
        MessagingWithBinaryStreaming<I, O> messaging = new MessagingWithBinaryStreaming<>(asyncTcpSocket, byteBufSerializer);
        messaging.prefetch();
        return messaging;
    }

    /**
     * Issues a socket read when the queue is empty. A non-null buffer is appended to the
     * queue; a null buffer marks the read side as done. A failed read closes this instance.
     */
    private void prefetch() {
        if (!this.bufs.isEmpty()) {
            return;
        }
        this.socket.read()
                .whenResult(byteBuf -> {
                    if (byteBuf == null) {
                        this.readDone = true;
                        closeIfDone();
                    } else {
                        this.bufs.add(byteBuf);
                    }
                })
                .whenException(this::close);
    }

    /**
     * Parses the next incoming message from the buffered bytes using the serializer.
     * After a successful parse another prefetch is attempted; a failure closes this instance.
     *
     * @return promise of the parsed message
     */
    @Override // io.datakernel.csp.net.Messaging
    public Promise<I> receive() {
        return this.bufsSupplier.parse(this.serializer)
                .whenResult(message -> prefetch())
                .whenException(this::close);
    }

    /**
     * Serializes the message and writes it to the socket.
     *
     * @param o message to send
     * @return promise returned by the socket write
     */
    @Override // io.datakernel.csp.net.Messaging
    public Promise<Void> send(O o) {
        return this.socket.write(this.serializer.serialize(o));
    }

    /**
     * Writes a null buffer to the socket to signal end of stream. On success the write side
     * is marked done; on failure this instance is closed.
     *
     * @return promise returned by the socket write
     */
    @Override // io.datakernel.csp.net.Messaging
    public Promise<Void> sendEndOfStream() {
        return this.socket.write((ByteBuf) null)
                .whenResult(ignored -> {
                    this.writeDone = true;
                    closeIfDone();
                })
                .whenException(this::close);
    }

    /**
     * Exposes the socket as a raw {@link ByteBuf} consumer. When the consumer's
     * acknowledgement completes successfully, the write side is marked done.
     *
     * @return consumer writing directly to the socket
     */
    @Override // io.datakernel.csp.net.Messaging
    public ChannelConsumer<ByteBuf> sendBinaryStream() {
        return ChannelConsumer.ofSocket(this.socket)
                .withAcknowledgement(ack -> ack.whenResult(ignored -> {
                    this.writeDone = true;
                    closeIfDone();
                }));
    }

    /**
     * Exposes the incoming bytes as a raw {@link ByteBuf} supplier: first whatever is already
     * buffered in the queue, then the socket itself. When the end of stream completes
     * successfully, the read side is marked done.
     *
     * @return supplier of the buffered bytes followed by the socket's bytes
     */
    @Override // io.datakernel.csp.net.Messaging
    public ChannelSupplier<ByteBuf> receiveBinaryStream() {
        ChannelSupplier<ByteBuf> buffered = ChannelSupplier.ofIterator(this.bufs.asIterator());
        ChannelSupplier<ByteBuf> fromSocket = ChannelSupplier.ofSocket(this.socket);
        return ChannelSuppliers.concat(buffered, fromSocket)
                .withEndOfStream(endOfStream -> endOfStream.whenResult(ignored -> {
                    this.readDone = true;
                    closeIfDone();
                }));
    }

    /**
     * Closes this instance with the given cause: records the cause, closes the socket with it
     * and recycles the buffered bytes. Calls after the first one are no-ops.
     *
     * @param th cause of the close
     */
    public void close(@NotNull Throwable th) {
        if (isClosed()) {
            return;
        }
        this.closedException = th;
        this.socket.close(th);
        this.bufs.recycle();
    }

    /** Closes this instance once both the read side and the write side are done. */
    private void closeIfDone() {
        if (this.readDone && this.writeDone) {
            // No-arg close() is not declared in this class; presumably inherited via Messaging.
            close();
        }
    }

    /**
     * @return {@code true} once {@link #close(Throwable)} has recorded a cause
     */
    public boolean isClosed() {
        return this.closedException != null;
    }

    @Override
    public String toString() {
        return "MessagingWithBinaryStreaming{socket=" + this.socket + "}";
    }
}
