package io.kiw.speedy.subscriber;

import io.kiw.speedy.SpeedyConnection;
import io.kiw.speedy.SpeedyMessagingImpl;
import io.kiw.speedy.channel.NackSchedulerJob;
import io.kiw.speedy.helper.ImmutableIntMap;
import io.kiw.speedy.marshaller.MessageUnMarshaller;
import io.kiw.speedy.publisher.PublishPromise;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.wiring.SpeedyWiring;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/kiw/speedy/subscriber/SpeedyMessagingSubscriber.class */
/**
 * Receive side of the messaging layer: pulls packets from the {@link SpeedyWiring},
 * checks each packet's sequence number against the per-connection, per-channel
 * {@link SubscriberChannelState}, hands in-sequence payloads to the
 * {@link MessageUnMarshaller}, and reports sequence gaps to the {@link NackSchedulerJob}.
 *
 * <p>Every packet starts with a fixed header: an 8-byte sequence number, a 4-byte key into
 * the remote-connection map and a 4-byte channel key.
 *
 * <p>A single receive buffer is reused for every packet, so {@code handleFragment} must not
 * be executed by two threads at the same time.
 */
public class SpeedyMessagingSubscriber implements Runnable {
    /** Size of the packet header: sequence number (long) + connection key (int) + channel key (int). */
    private static final int HEADER_LENGTH = Long.BYTES + Integer.BYTES + Integer.BYTES;

    /** First argument handed to the scheduled nack job: 100 microseconds expressed in nanoseconds. */
    private static final long NACK_PULSE_NANOS = TimeUnit.MICROSECONDS.toNanos(100L);

    private final MessageUnMarshaller messageUnmarshaller;
    private final ImmutableIntMap<SpeedyConnection> remoteConnections;
    private final PublishPromise publishPromise;
    private final ByteBuffer buffer = ByteBuffer.allocateDirect(SpeedyMessagingImpl.DATAGRAM_LENGTH);
    private final OnMessageErrorHandler onMessageErrorHandler;
    private final SpeedyWiring wiring;
    private final SchedulerThread schedulerThread;
    private final ImmutableIntMap<GenericHandler> subscriptions;
    private final NackSchedulerJob nackSchedulerJob;

    /**
     * Creates the subscriber and registers its fragment handler with the wiring.
     *
     * @param messageUnMarshaller   decodes message payloads and invokes the matching handler
     * @param immutableIntMap       remote connections, looked up by the connection key in the packet header
     * @param publishPromise        passed through to the unmarshaller for every message
     * @param onMessageErrorHandler passed through to the unmarshaller for every message
     * @param speedyWiring          transport used to receive packets
     * @param nackSchedulerJob      notified when a sequence gap is detected
     * @param schedulerThread       scheduler that {@link #addNackScheduledJob()} adds the nack job to
     * @param immutableIntMap2      subscriptions passed through to the unmarshaller
     */
    public SpeedyMessagingSubscriber(MessageUnMarshaller messageUnMarshaller, ImmutableIntMap<SpeedyConnection> immutableIntMap, PublishPromise publishPromise, OnMessageErrorHandler onMessageErrorHandler, SpeedyWiring speedyWiring, NackSchedulerJob nackSchedulerJob, SchedulerThread schedulerThread, ImmutableIntMap<GenericHandler> immutableIntMap2) {
        this.messageUnmarshaller = messageUnMarshaller;
        this.remoteConnections = immutableIntMap;
        this.publishPromise = publishPromise;
        this.onMessageErrorHandler = onMessageErrorHandler;
        this.wiring = speedyWiring;
        this.schedulerThread = schedulerThread;
        this.subscriptions = immutableIntMap2;
        this.nackSchedulerJob = nackSchedulerJob;
        // Register last: the handler captures `this`, so every field must already be
        // assigned in case the wiring invokes it before the constructor returns.
        this.wiring.registerFragmentHandler(this::handleFragment);
    }

    /** Handles packets one after another until the current thread is interrupted. */
    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.interrupted()) {
            handleFragment();
        }
    }

    /**
     * Receives one packet into the shared buffer and processes it. When the packet was
     * consumed in sequence, any packets parked for recovery on that channel are drained too.
     */
    private void handleFragment() {
        this.buffer.clear();
        this.wiring.receive(this.buffer);
        this.buffer.flip();
        SubscriberChannelState channelState = handlePacket(this.buffer);
        if (channelState != null) {
            handleRecoveryMessages(channelState);
        }
    }

    /**
     * Drains the recovery messages the channel state hands out, stopping at the first one
     * that has no remaining bytes. Recovery messages carry the same header as live packets.
     *
     * @param subscriberChannelState channel whose recovery messages should be replayed
     */
    private void handleRecoveryMessages(SubscriberChannelState subscriberChannelState) {
        ByteBuffer recovered = subscriberChannelState.getNextRecoverMessage();
        while (recovered.hasRemaining()) {
            long sequence = recovered.getLong();
            int connectionId = recovered.getInt();
            recovered.getInt(); // channel key: skipped, the channel state is already known
            dispatchMessages(recovered, subscriberChannelState, connectionId);
            subscriberChannelState.increment(sequence);
            recovered = subscriberChannelState.getNextRecoverMessage();
        }
    }

    /**
     * Processes a single packet positioned at the start of its header.
     *
     * <ul>
     *   <li>Sequence equals the expected one: all messages are dispatched and the channel
     *       state is incremented.</li>
     *   <li>Sequence is ahead: the whole packet (header included) is copied to the channel's
     *       recovery store and the gap is reported to the nack job; if the channel's sequence
     *       number is still 0 the sequence number is reset instead.</li>
     *   <li>Sequence is behind: the packet is ignored.</li>
     * </ul>
     *
     * @param byteBuffer packet to process
     * @return the channel state if the packet was consumed in sequence, otherwise {@code null}
     */
    private SubscriberChannelState handlePacket(ByteBuffer byteBuffer) {
        // Covers the empty read as well as truncated packets, which would otherwise
        // raise BufferUnderflowException while the header is being read.
        if (byteBuffer.remaining() < HEADER_LENGTH) {
            return null;
        }
        long sequence = byteBuffer.getLong();
        int connectionId = byteBuffer.getInt();
        int channelId = byteBuffer.getInt();

        SpeedyConnection connection = this.remoteConnections.get(connectionId);
        if (connection == null) {
            // NOTE(review): assumes ImmutableIntMap.get returns null for an unknown key;
            // such a packet is dropped instead of failing with a NullPointerException.
            return null;
        }
        SubscriberChannelState channelState = connection.getChannelSequenceState(channelId);
        long expected = channelState.getSequenceNumber();

        if (sequence < expected) {
            return null;
        }
        if (sequence > expected) {
            // Back to the start of the packet so the header is seen again by whoever reads it next.
            byteBuffer.rewind();
            if (expected == 0) {
                // NOTE(review): if setSequenceNumber is a plain setter, the re-processed packet
                // is now behind the expected number and gets ignored; confirm this is intended.
                channelState.setSequenceNumber(sequence + 1);
                return handlePacket(byteBuffer);
            }
            channelState.copyToRecovery(byteBuffer, sequence);
            this.nackSchedulerJob.onNack(channelId, connectionId, expected, sequence - 1);
            return null;
        }

        dispatchMessages(byteBuffer, channelState, connectionId);
        channelState.increment(sequence);
        byteBuffer.clear();
        return channelState;
    }

    /** Hands the payload to the unmarshaller repeatedly until the buffer has no bytes left. */
    private void dispatchMessages(ByteBuffer payload, SubscriberChannelState channelState, int connectionId) {
        while (payload.hasRemaining()) {
            this.messageUnmarshaller.unmarshallAndPotentiallyHandle(payload, this.subscriptions, this.onMessageErrorHandler, channelState, connectionId, this.publishPromise);
        }
    }

    /**
     * Closes the subscriber side of the wiring.
     *
     * @throws IOException if the wiring fails to close
     */
    public void close() throws IOException {
        this.wiring.closeSubscriber();
    }

    /**
     * Adds a job to the scheduler thread that calls {@code onPulse} on the nack scheduler job.
     * The job is created with 100 microseconds (in nanoseconds) and {@code 0L}
     * (presumably the interval and the initial delay; confirm against
     * {@code SchedulerThread.ScheduledJob}).
     */
    public void addNackScheduledJob() {
        // A method reference null-checks its receiver when evaluated, so no explicit check is needed.
        this.schedulerThread.addJob(new SchedulerThread.ScheduledJob(NACK_PULSE_NANOS, 0L, this.nackSchedulerJob::onPulse));
    }
}
