package io.jenkins.plugins.remotingkafka.commandtransport;

import hudson.remoting.AbstractSynchronousByteArrayCommandTransport;
import hudson.remoting.Capability;
import hudson.remoting.Channel;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:WEB-INF/lib/kafka-client-lib-2.0.1.jar:io/jenkins/plugins/remotingkafka/commandtransport/KafkaChunkedCommandTransport.class */
/**
 * Command transport that exchanges raw byte blocks with the remote side over Kafka.
 *
 * <p>Outgoing blocks are published to {@code producerTopic} under {@code producerKey};
 * incoming blocks are consumed from {@code consumerTopics} and accepted only when the
 * record key equals {@code consumerKey}.
 *
 * <p>NOTE(review): the receive buffer is a plain {@link ArrayDeque}; this assumes
 * {@link #readBlock(Channel)} is only ever invoked from a single reader thread — confirm
 * against the remoting reader-thread contract.
 */
public class KafkaChunkedCommandTransport extends AbstractSynchronousByteArrayCommandTransport {
    private static final Logger LOGGER = Logger.getLogger(KafkaChunkedCommandTransport.class.getName());
    private final Capability remoteCapability;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final String producerTopic;
    private final String producerKey;
    private final List<String> consumerTopics;
    private final String consumerKey;
    private final long pollTimeout;

    /**
     * Blocks already fetched from Kafka but not yet handed to the caller. A single poll
     * may return several matching records; they are kept here in arrival order so that
     * none of them is dropped.
     */
    private final Queue<byte[]> pendingBlocks = new ArrayDeque<>();

    /**
     * Creates a transport bound to the given Kafka producer and consumer.
     *
     * @param remoteCapability capability reported by {@link #getRemoteCapability()}
     * @param producerTopic    topic that outgoing blocks are sent to
     * @param producerKey      record key attached to every outgoing block
     * @param consumerTopics   topics subscribed to for incoming blocks
     * @param consumerKey      record key an incoming record must carry to be accepted
     * @param pollTimeout      timeout passed to each {@code consumer.poll} call
     * @param producer         producer used by {@link #writeBlock(Channel, byte[])}
     * @param consumer         consumer used by {@link #readBlock(Channel)}; closed by {@link #closeRead()}
     */
    public KafkaChunkedCommandTransport(Capability remoteCapability, String producerTopic, String producerKey, List<String> consumerTopics, String consumerKey, long pollTimeout, Producer<String, byte[]> producer, Consumer<String, byte[]> consumer) {
        this.remoteCapability = remoteCapability;
        this.producerKey = producerKey;
        this.producerTopic = producerTopic;
        this.consumerKey = consumerKey;
        this.consumerTopics = consumerTopics;
        this.producer = producer;
        this.consumer = consumer;
        this.pollTimeout = pollTimeout;
    }

    /**
     * Returns the next incoming block, polling Kafka until one addressed to
     * {@code consumerKey} is available.
     *
     * <p>Blocks are delivered one per call in the order the consumer returned them.
     * Records with a different (or missing) key, or without a payload, are ignored.
     *
     * @param channel the channel this transport serves (not used here)
     * @return the payload of the next matching record, never {@code null}
     */
    public byte[] readBlock(Channel channel) throws IOException, ClassNotFoundException {
        if (this.pendingBlocks.isEmpty()) {
            this.consumer.subscribe(this.consumerTopics);
        }
        while (this.pendingBlocks.isEmpty()) {
            for (ConsumerRecord<String, byte[]> received : this.consumer.poll(this.pollTimeout)) {
                String key = received.key();
                byte[] payload = received.value();
                // Null-check the key first: keyless records must be skipped, not crash the reader.
                if (key != null && key.equals(this.consumerKey) && payload != null) {
                    this.pendingBlocks.add(payload);
                }
            }
        }
        return this.pendingBlocks.remove();
    }

    /**
     * Publishes one block to {@code producerTopic} under {@code producerKey}.
     *
     * @param channel the channel this transport serves (not used here)
     * @param bArr    the payload to send
     */
    public void writeBlock(Channel channel, byte[] bArr) throws IOException {
        this.producer.send(new ProducerRecord<>(this.producerTopic, this.producerKey, bArr));
    }

    /**
     * @return the capability object supplied at construction time
     */
    public Capability getRemoteCapability() throws IOException {
        return this.remoteCapability;
    }

    /**
     * Intentionally a no-op: the producer is not closed here.
     * NOTE(review): presumably the producer's lifecycle is managed by whoever created it — confirm.
     */
    public void closeWrite() throws IOException {
    }

    /**
     * Commits the consumer's offsets and closes the consumer.
     * The consumer is closed even when the commit fails, so it is never leaked.
     */
    public void closeRead() throws IOException {
        try {
            this.consumer.commitSync();
        } finally {
            this.consumer.close();
        }
    }
}
