/*
 * Decompiled with CFR 0.152.
 */
package io.jenkins.plugins.remotingkafka.commandtransport;

import hudson.remoting.AbstractSynchronousByteArrayCommandTransport;
import hudson.remoting.Capability;
import hudson.remoting.Channel;
import io.jenkins.plugins.remotingkafka.commandtransport.KafkaClassicCommandTransport;
import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaChunkedCommandTransport
extends AbstractSynchronousByteArrayCommandTransport {
    private static final Logger LOGGER = Logger.getLogger(KafkaClassicCommandTransport.class.getName());
    private final Capability remoteCapability;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final String producerTopic;
    private final String producerKey;
    private final List<String> consumerTopics;
    private final String consumerKey;
    private final long pollTimeout;

    public KafkaChunkedCommandTransport(Capability remoteCapability, String producerTopic, String producerKey, List<String> consumerTopics, String consumerKey, long pollTimeout, Producer<String, byte[]> producer, Consumer<String, byte[]> consumer) {
        this.remoteCapability = remoteCapability;
        this.producerKey = producerKey;
        this.producerTopic = producerTopic;
        this.consumerKey = consumerKey;
        this.consumerTopics = consumerTopics;
        this.producer = producer;
        this.consumer = consumer;
        this.pollTimeout = pollTimeout;
    }

    public byte[] readBlock(Channel channel) throws IOException, ClassNotFoundException {
        this.consumer.subscribe(this.consumerTopics);
        byte[] data = null;
        this.consumer.subscribe(this.consumerTopics);
        do {
            ConsumerRecords records = this.consumer.poll(this.pollTimeout);
            for (ConsumerRecord record : records) {
                if (!((String)record.key()).equals(this.consumerKey)) continue;
                data = (byte[])record.value();
            }
        } while (data == null);
        return data;
    }

    public void writeBlock(Channel channel, byte[] bytes) throws IOException {
        this.producer.send(new ProducerRecord(this.producerTopic, (Object)this.producerKey, (Object)bytes));
    }

    public Capability getRemoteCapability() throws IOException {
        return this.remoteCapability;
    }

    public void closeWrite() throws IOException {
    }

    public void closeRead() throws IOException {
        this.consumer.commitSync();
        this.consumer.close();
    }
}

