package org.netcrusher.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.netcrusher.core.meter.RateMeter;
import org.netcrusher.core.meter.RateMeterImpl;
import org.netcrusher.core.nio.NioUtils;
import org.netcrusher.core.nio.SelectionKeyControl;
import org.netcrusher.core.reactor.NioReactor;
import org.netcrusher.core.state.BitState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/netcrusher/tcp/TcpChannel.class */
/**
 * One side of a proxied TCP connection pair.
 *
 * <p>The channel reads bytes from its socket into {@code outgoingQueue} and writes bytes taken
 * from {@code incomingQueue} to its socket. The opposite side of the pair is linked through
 * {@link #setOther(TcpChannel)} and must be set before any I/O event is handled, because the
 * event handlers dereference it unconditionally.
 *
 * <p>NOTE(review): the code contains no synchronization; it presumably relies on every method
 * running on the reactor's selector thread — confirm against NioReactor.
 */
public class TcpChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(TcpChannel.class);

    /** Delay, in nanoseconds (500 ms), before the deferred close scheduled after EOF on both sides. */
    private static final long LINGER_PERIOD_NS = TimeUnit.MILLISECONDS.toNanos(500);

    private final String name;
    private final NioReactor reactor;
    /** Invoked by {@link #closeAll()} right after this channel has been asked to close. */
    private final Runnable ownerClose;
    private final SocketChannel channel;
    private final SelectionKeyControl selectionKeyControl;
    /** Source of the bytes that are written to this channel's socket. */
    private final TcpQueue incomingQueue;
    /** Destination of the bytes that are read from this channel's socket. */
    private final TcpQueue outgoingQueue;
    /** The opposite channel of the pair; assigned through {@link #setOther(TcpChannel)}. */
    private TcpChannel other;
    /** Actions postponed until {@link #processPostOperations()} finds its conditions satisfied. */
    private final Queue<Runnable> postOperations = new LinkedList<>();
    private final Meters meters = new Meters();
    private final State state = new State(State.FROZEN);

    /** Byte counters for both directions of this channel. */
    public static final class Meters {
        private final RateMeterImpl readBytes;
        private final RateMeterImpl sentBytes;

        private Meters() {
            this.readBytes = new RateMeterImpl();
            this.sentBytes = new RateMeterImpl();
        }
    }

    /**
     * Lifecycle state (OPEN / FROZEN / CLOSED) extended with two independent flags:
     * "EOF has been read" and "sending is throttled".
     */
    public static final class State extends BitState {
        private static final int OPEN = bit(0);
        private static final int FROZEN = bit(1);
        private static final int CLOSED = bit(2);

        private boolean readEof;
        private boolean sendThrottled;

        private State(int i) {
            super(i);
            this.readEof = false;
            this.sendThrottled = false;
        }

        /** Records whether EOF has been observed on the read side. */
        public void setReadEof(boolean z) {
            this.readEof = z;
        }

        /** @return {@code true} when the channel is CLOSED or EOF has been recorded */
        public boolean isReadEof() {
            return is(CLOSED) || this.readEof;
        }

        /** @return {@code true} while sending is paused by throttling */
        public boolean isSendThrottled() {
            return this.sendThrottled;
        }

        /** Sets or clears the "sending is throttled" flag. */
        public void setSendThrottled(boolean z) {
            this.sendThrottled = z;
        }

        /** @return {@code true} when the channel is OPEN and sending is not throttled */
        public boolean isWritable() {
            return is(OPEN) && !this.sendThrottled;
        }

        /** @return {@code true} when the channel is OPEN and EOF has not been recorded */
        public boolean isReadable() {
            return is(OPEN) && !this.readEof;
        }
    }

    /**
     * Creates the channel in the FROZEN state and registers the socket with the reactor's
     * selector, using {@code 0} as the registration argument and {@link #callback} as handler.
     *
     * @param str           channel name used in log messages
     * @param nioReactor    reactor whose selector drives this channel
     * @param runnable      action run by {@link #closeAll()} after this channel is closed
     * @param socketChannel socket served by this channel
     * @param tcpQueue      queue whose content is written to the socket
     * @param tcpQueue2     queue that receives the bytes read from the socket
     * @throws IOException if the selector registration fails
     */
    public TcpChannel(String str, NioReactor nioReactor, Runnable runnable, SocketChannel socketChannel, TcpQueue tcpQueue, TcpQueue tcpQueue2) throws IOException {
        this.name = str;
        this.reactor = nioReactor;
        this.ownerClose = runnable;
        this.channel = socketChannel;
        this.incomingQueue = tcpQueue;
        this.outgoingQueue = tcpQueue2;
        // Registration stays last so that every field is assigned before the handler is published.
        this.selectionKeyControl = new SelectionKeyControl(nioReactor.getSelector().register(socketChannel, 0, this::callback));
    }

    /**
     * Closes the channel through the reactor's selector. Does nothing when the channel is
     * already CLOSED; an OPEN channel is frozen first.
     */
    public void close() {
        this.reactor.getSelector().execute(() -> {
            if (!this.state.not(State.CLOSED)) {
                return false;
            }
            if (this.state.is(State.OPEN)) {
                freeze();
            }
            // Two different close helpers are used depending on whether anything was ever sent.
            // NOTE(review): closeNoLinger presumably disables SO_LINGER before closing — confirm in NioUtils.
            if (this.meters.sentBytes.getTotalCount() > 0) {
                NioUtils.close((AbstractSelectableChannel) this.channel);
            } else {
                NioUtils.closeNoLinger(this.channel);
            }
            this.state.set(State.CLOSED);
            if (LOGGER.isDebugEnabled()) {
                long pendingBytes = this.incomingQueue.calculateReadableBytes();
                if (pendingBytes > 0) {
                    LOGGER.debug("Channel {} has {} incoming bytes when closing", this.name, pendingBytes);
                }
            }
            return true;
        });
    }

    /** Closes this channel and then runs the owner's close action. */
    private void closeAll() {
        close();
        this.ownerClose.run();
    }

    /** Schedules {@link #closeAll()} on the selector with a delay of {@link #LINGER_PERIOD_NS}. */
    private void closeAllDeferred() {
        this.reactor.getSelector().schedule(this::closeAll, LINGER_PERIOD_NS);
    }

    /**
     * Handles EOF on the read side: marks EOF, queues a write-shutdown on the opposite channel
     * and, once both sides have reached EOF and both queues are drained, schedules a deferred
     * close of everything.
     */
    private void closeEOF() {
        this.state.setReadEof(true);
        // "other" is resolved when the action runs, not when it is queued.
        this.other.postOperations.add(() -> {
            this.other.shutdownWrite();
        });
        this.other.processPostOperations();

        boolean bothSidesDone = this.other.state.isReadEof()
            && !this.incomingQueue.hasReadable()
            && !this.outgoingQueue.hasReadable();
        if (bothSidesDone) {
            closeAllDeferred();
        }
    }

    /** Closes this channel now and queues a full close on the opposite channel. */
    private void closeLocal() {
        close();
        this.other.postOperations.add(() -> {
            this.other.closeAll();
        });
        this.other.processPostOperations();
    }

    /**
     * Selector callback: serves the writable event first, then the readable one. Failures are
     * translated into the matching close routine instead of being propagated.
     *
     * @param selectionKey key that became ready
     * @throws IOException declared by the handler signature; the visible code paths catch
     *                     every {@link Exception} raised while handling the events
     */
    private void callback(SelectionKey selectionKey) throws IOException {
        try {
            if (selectionKey.isWritable()) {
                handleWritableEvent(false);
            }
        } catch (ClosedChannelException e) {
            LOGGER.debug("Channel closed on write {}", this.name);
            closeLocal();
        } catch (Exception e) {
            LOGGER.error("Exception on {}", this.name, e);
            closeAll();
        }

        try {
            // The write branch above may already have closed the channel. A cancelled key makes
            // isReadable() throw CancelledKeyException, which would be logged as an error and
            // trigger a second closeAll(); an invalid key is therefore skipped.
            if (selectionKey.isValid() && selectionKey.isReadable()) {
                handleReadableEvent();
            }
        } catch (EOFException e) {
            LOGGER.debug("EOF on transfer on {}", this.name);
            closeEOF();
        } catch (ClosedChannelException e) {
            LOGGER.debug("Channel closed on {}", this.name);
            closeLocal();
        } catch (Exception e) {
            LOGGER.error("Exception on {}", this.name, e);
            closeAll();
        }
    }

    /**
     * Writes as much of {@code incomingQueue} to the socket as the socket accepts, then lets the
     * opposite channel resume reading and runs the pending post-operations.
     *
     * @param z not used by this method
     * @throws IOException if writing to the socket fails
     */
    private void handleWritableEvent(boolean z) throws IOException {
        final TcpQueue queue = this.incomingQueue;

        while (this.state.isWritable()) {
            final TcpQueueBuffers buffers = queue.requestReadableBuffers();

            if (buffers.isEmpty()) {
                if (buffers.getDelayNs() > 0) {
                    throttleSend(buffers.getDelayNs());
                } else {
                    this.selectionKeyControl.disableWrites();
                }
                // Nothing can be written right now. Without leaving the loop an empty,
                // non-delayed queue would be polled forever.
                break;
            }

            final long written;
            try {
                written = this.channel.write(buffers.getArray(), buffers.getOffset(), buffers.getCount());
            } finally {
                // Released exactly once per request, whether or not the write succeeded.
                queue.releaseReadableBuffers();
            }

            if (written == 0) {
                break;
            }

            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Written {} bytes to {}", written, this.name);
            }
            this.meters.sentBytes.update(written);
        }

        this.other.suggestDeferredRead();
        processPostOperations();
    }

    /**
     * Reads from the socket into {@code outgoingQueue} until the socket has nothing more, the
     * queue has no writable space, or the channel stops being readable. After each successful
     * read the opposite channel is asked to send immediately.
     *
     * @throws EOFException           if the socket reports end of stream while still open
     * @throws ClosedChannelException if the socket reports end of stream and is no longer open
     * @throws IOException            if reading fails, or if the immediate send on the opposite
     *                                channel fails
     */
    private void handleReadableEvent() throws IOException {
        final TcpQueue queue = this.outgoingQueue;

        while (this.state.isReadable()) {
            final TcpQueueBuffers buffers = queue.requestWritableBuffers();

            if (buffers.isEmpty()) {
                this.selectionKeyControl.disableReads();
                break;
            }

            final long read;
            try {
                read = this.channel.read(buffers.getArray(), buffers.getOffset(), buffers.getCount());
            } finally {
                // Released exactly once per request; the exceptions thrown below must not
                // cause a second release.
                queue.releaseWritableBuffers();
            }

            if (read < 0) {
                if (!this.channel.isOpen()) {
                    throw new ClosedChannelException();
                }
                this.selectionKeyControl.disableReads();
                throw new EOFException();
            }

            if (read == 0) {
                break;
            }

            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Read {} bytes from {}", read, this.name);
            }
            this.meters.readBytes.update(read);
            this.other.suggestImmediateSent();
        }

        this.other.suggestDeferredSent();
    }

    /**
     * Runs the queued post-operations in insertion order, but only once {@code incomingQueue}
     * has nothing left to send and the opposite channel has reached EOF (or is closed).
     */
    private void processPostOperations() {
        if (this.incomingQueue.hasReadable() || !this.other.state.isReadEof()) {
            return;
        }
        while (!this.postOperations.isEmpty()) {
            this.postOperations.poll().run();
        }
    }

    /** Shuts down the output side of the socket if it is still open; a failure is only logged. */
    private void shutdownWrite() {
        if (!this.channel.isOpen()) {
            return;
        }
        try {
            this.channel.shutdownOutput();
        } catch (IOException e) {
            LOGGER.error("Fail to shutdown output", e);
        }
    }

    /** Re-enables reads when {@code outgoingQueue} has writable space and the channel is readable. */
    private void suggestDeferredRead() {
        if (this.outgoingQueue.hasWritable() && this.state.isReadable()) {
            this.selectionKeyControl.enableReads();
        }
    }

    /** Enables writes when {@code incomingQueue} has data and the channel is writable. */
    private void suggestDeferredSent() {
        if (this.incomingQueue.hasReadable() && this.state.isWritable()) {
            this.selectionKeyControl.enableWrites();
        }
    }

    /**
     * Writes immediately, without waiting for a selector event, when {@code incomingQueue} has
     * data and the channel is writable.
     *
     * @throws IOException if writing to the socket fails
     */
    private void suggestImmediateSent() throws IOException {
        if (this.incomingQueue.hasReadable() && this.state.isWritable()) {
            handleWritableEvent(true);
        }
    }

    /**
     * Moves an OPEN channel to FROZEN and clears its selection-key interest while the key is
     * still valid. Logs a warning and does nothing when the channel is not OPEN.
     */
    public void freeze() {
        if (!this.state.is(State.OPEN)) {
            LOGGER.warn("Freezing while not open");
            return;
        }
        if (this.selectionKeyControl.isValid()) {
            this.selectionKeyControl.setNone();
        }
        this.state.set(State.FROZEN);
    }

    /**
     * Moves a FROZEN channel to OPEN. The {@code setAll()} interest is requested when
     * {@code incomingQueue} already has data to send, {@code setReadsOnly()} otherwise.
     * Logs a warning and does nothing when the channel is not FROZEN.
     */
    public void unfreeze() {
        if (!this.state.is(State.FROZEN)) {
            LOGGER.warn("Unfreezing while not frozen");
            return;
        }
        if (this.incomingQueue.hasReadable()) {
            this.selectionKeyControl.setAll();
        } else {
            this.selectionKeyControl.setReadsOnly();
        }
        this.state.set(State.OPEN);
    }

    /** @return {@code true} when the channel is FROZEN or CLOSED */
    public boolean isFrozen() {
        return this.state.isAnyOf(State.CLOSED | State.FROZEN);
    }

    /**
     * Pauses sending: sets the throttled flag, disables writes while the key is valid and
     * schedules {@link #unthrottleSend()}. Ignored when the channel is not OPEN or is already
     * throttled.
     *
     * @param j delay in nanoseconds before sending is resumed
     */
    private void throttleSend(long j) {
        if (!this.state.is(State.OPEN) || this.state.isSendThrottled()) {
            return;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Channel {} is throttled on {}ns", this.name, j);
        }
        this.state.setSendThrottled(true);
        if (this.selectionKeyControl.isValid()) {
            this.selectionKeyControl.disableWrites();
        }
        this.reactor.getSelector().schedule(this::unthrottleSend, j);
    }

    /** Resumes sending after a throttle period, provided the channel is still OPEN and throttled. */
    private void unthrottleSend() {
        if (!this.state.is(State.OPEN) || !this.state.isSendThrottled()) {
            return;
        }
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Channel {} is unthrottled", this.name);
        }
        this.state.setSendThrottled(false);
        if (this.selectionKeyControl.isValid()) {
            this.selectionKeyControl.enableWrites();
        }
    }

    /**
     * Links the opposite channel of the pair.
     *
     * @param tcpChannel the opposite channel
     */
    public void setOther(TcpChannel tcpChannel) {
        this.other = tcpChannel;
    }

    /** @return meter counting the bytes read from the socket */
    RateMeter getReadBytesMeter() {
        return this.meters.readBytes;
    }

    /** @return meter counting the bytes written to the socket */
    public RateMeter getSentBytesMeter() {
        return this.meters.sentBytes;
    }
}
