/*
 * Decompiled with CFR 0.152.
 */
package io.jenkins.plugins.remotingkafka.commandtransport;

import hudson.remoting.AbstractByteArrayCommandTransport;
import hudson.remoting.Capability;
import hudson.remoting.Channel;
import hudson.remoting.ChunkHeader;
import java.io.IOException;
import java.util.Arrays;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaByteArrayCommandTransport
extends AbstractByteArrayCommandTransport {
    public static final Logger LOGGER = Logger.getLogger(KafkaByteArrayCommandTransport.class.getName());
    private static final int DEFAULT_TRANSPORT_FRAME_SIZE = 8192;
    private final Capability remoteCapability;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final String topic;
    private final String key;
    private AbstractByteArrayCommandTransport.ByteArrayReceiver receiver;
    private int transportFrameSize;

    public KafkaByteArrayCommandTransport(Capability remoteCapability, String topic, String key, Producer<String, byte[]> producer, Consumer<String, byte[]> consumer) {
        this.remoteCapability = remoteCapability;
        this.topic = topic;
        this.key = key;
        this.producer = producer;
        this.consumer = consumer;
        this.transportFrameSize = 8192;
    }

    public void writeBlock(Channel channel, byte[] bytes) throws IOException {
        boolean hasMore;
        int pos = 0;
        do {
            int frame;
            hasMore = (frame = Math.min(this.transportFrameSize, bytes.length - pos)) + pos < bytes.length;
            byte[] chunkHeader = ChunkHeader.pack((int)frame, (boolean)hasMore);
            this.producer.send(new ProducerRecord(this.topic, (Object)this.key, (Object)chunkHeader));
            byte[] payload = Arrays.copyOfRange(bytes, pos, pos + frame);
            this.producer.send(new ProducerRecord(this.topic, (Object)this.key, (Object)payload));
            pos += frame;
        } while (hasMore);
    }

    public void setup(@Nonnull AbstractByteArrayCommandTransport.ByteArrayReceiver byteArrayReceiver) {
        this.receiver = byteArrayReceiver;
    }

    public Capability getRemoteCapability() throws IOException {
        return this.remoteCapability;
    }

    public void closeWrite() throws IOException {
    }

    public void closeRead() throws IOException {
        this.consumer.commitSync();
        this.consumer.close();
    }
}

