package io.jenkins.plugins.remotingkafka.commandtransport;

import hudson.remoting.Capability;
import hudson.remoting.Command;
import hudson.remoting.SynchronousCommandTransport;
import io.jenkins.plugins.remotingkafka.KafkaUtils;
import io.jenkins.plugins.remotingkafka.builder.KafkaTransportBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:WEB-INF/lib/kafka-client-lib-1.1.3.jar:io/jenkins/plugins/remotingkafka/commandtransport/KafkaClassicCommandTransport.class */
/**
 * A {@link SynchronousCommandTransport} that exchanges remoting {@link Command}s over Kafka.
 *
 * <p>Outgoing commands are serialized and sent with the configured producer to
 * {@code producerTopic}/{@code producerPartition} under {@code producerKey}. Incoming commands are
 * polled with the configured consumer from {@code consumerTopic}/{@code consumerPartition}; only
 * records whose key equals {@code consumerKey} are decoded, all other records are ignored.
 *
 * <p>All collaborators and settings are taken from the {@link KafkaTransportBuilder} passed to the
 * constructor.
 */
public class KafkaClassicCommandTransport extends SynchronousCommandTransport {
    private static final Logger LOGGER = Logger.getLogger(KafkaClassicCommandTransport.class.getName());
    private final Capability remoteCapability;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final String producerTopic;
    private final String producerKey;
    private final String consumerTopic;
    private final String consumerKey;
    private final long pollTimeout;
    private final int producerPartition;
    private final int consumerPartition;
    /** Commands decoded by a previous poll that have not yet been handed out by {@link #read()}. */
    private final Queue<Command> commandQueue = new ConcurrentLinkedQueue<>();
    /** Set once {@link #closeRead()} has completed, so that later calls are no-ops. */
    private boolean isReadClosed = false;

    /**
     * Creates a transport from the values held by the given builder.
     *
     * @param kafkaTransportBuilder source of the producer, consumer, topics, keys, partitions,
     *     poll timeout and remote capability used by this transport
     */
    public KafkaClassicCommandTransport(KafkaTransportBuilder kafkaTransportBuilder) {
        this.remoteCapability = kafkaTransportBuilder.getRemoteCapability();
        this.producerKey = kafkaTransportBuilder.getProducerKey();
        this.producerTopic = kafkaTransportBuilder.getProducerTopic();
        this.consumerKey = kafkaTransportBuilder.getConsumerKey();
        this.consumerTopic = kafkaTransportBuilder.getConsumerTopic();
        this.producer = kafkaTransportBuilder.getProducer();
        this.consumer = kafkaTransportBuilder.getConsumer();
        this.pollTimeout = kafkaTransportBuilder.getPollTimeout();
        this.producerPartition = kafkaTransportBuilder.getProducerPartition();
        this.consumerPartition = kafkaTransportBuilder.getConsumerPartition();
    }

    /**
     * Returns the remote capability supplied by the builder at construction time.
     *
     * @return the configured remote capability
     * @throws IOException declared by the transport contract; not thrown by this implementation
     */
    public final Capability getRemoteCapability() throws IOException {
        return this.remoteCapability;
    }

    /**
     * Serializes the command and hands it to the producer for the configured topic, partition and
     * key. The value returned by {@code send} is not inspected here.
     *
     * @param command the command to send
     * @param z not used by this implementation
     * @throws IOException declared by the transport contract; not thrown directly by this method
     */
    public final void write(Command command, boolean z) throws IOException {
        this.producer.send(new ProducerRecord<>(this.producerTopic, Integer.valueOf(this.producerPartition), this.producerKey, SerializationUtils.serialize(command)));
        // Guarded so the message (and the command's string form) is only built when FINE is enabled.
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.log(Level.FINE, "Sent a command=" + command + ", in topic=" + this.producerTopic + ", with key=" + this.producerKey);
        }
    }

    /**
     * Closes the producer.
     *
     * @throws IOException declared by the transport contract; not thrown directly by this method
     */
    public final void closeWrite() throws IOException {
        this.producer.close();
    }

    /**
     * Commits the consumer's offsets synchronously and unassigns the consumer via
     * {@link KafkaUtils#unassignConsumer}. Calls made after the first successful one do nothing.
     *
     * @throws IOException declared by the transport contract; not thrown directly by this method
     */
    public final void closeRead() throws IOException {
        if (this.isReadClosed) {
            return;
        }
        this.consumer.commitSync();
        KafkaUtils.unassignConsumer(this.consumer);
        this.isReadClosed = true;
    }

    /**
     * Returns the next incoming command.
     *
     * <p>A command left over from an earlier poll is returned first. Otherwise the consumer is
     * assigned to the configured topic partition and polled repeatedly until at least one record
     * with a matching key has been decoded. The first decoded command of that poll is returned and
     * any further ones are queued for subsequent calls. Offsets are committed synchronously before
     * returning a freshly polled command.
     *
     * @return the next command addressed to this transport
     * @throws IOException propagated from decoding a command
     * @throws ClassNotFoundException propagated from decoding a command
     * @throws InterruptedException declared by the transport contract
     */
    public final Command read() throws IOException, ClassNotFoundException, InterruptedException {
        // A single poll() replaces the isEmpty()/poll() pair; null means nothing is buffered.
        Command buffered = this.commandQueue.poll();
        if (buffered != null) {
            logReceived(buffered);
            return buffered;
        }
        this.consumer.assign(Arrays.asList(new TopicPartition(this.consumerTopic, this.consumerPartition)));
        Command command = null;
        while (command == null) {
            for (ConsumerRecord<String, byte[]> polled : this.consumer.poll(this.pollTimeout)) {
                String recordKey = polled.key();
                // Records without a key cannot be addressed to this transport; skip them instead
                // of failing with a NullPointerException.
                if (recordKey == null || !recordKey.equals(this.consumerKey)) {
                    continue;
                }
                Command decoded = Command.readFrom(this.channel, polled.value());
                if (command == null) {
                    command = decoded;
                } else {
                    // Extra commands from the same poll are kept for later read() calls.
                    this.commandQueue.add(decoded);
                }
            }
        }
        this.consumer.commitSync();
        logReceived(command);
        return command;
    }

    /** Logs a received command at FINE level, building the message only when it will be emitted. */
    private static void logReceived(Command command) {
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.log(Level.FINE, "Received a command: " + command);
        }
    }
}
