/*
 * Decompiled with CFR 0.152.
 */
package io.jenkins.plugins.remotingkafka.commandtransport;

import hudson.remoting.Capability;
import hudson.remoting.Channel;
import hudson.remoting.Command;
import hudson.remoting.SynchronousCommandTransport;
import io.jenkins.plugins.remotingkafka.KafkaUtils;
import io.jenkins.plugins.remotingkafka.builder.KafkaTransportBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class KafkaClassicCommandTransport
extends SynchronousCommandTransport {
    private static final Logger LOGGER = Logger.getLogger(KafkaClassicCommandTransport.class.getName());
    private final Capability remoteCapability;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final String producerTopic;
    private final String producerKey;
    private final String consumerTopic;
    private final String consumerKey;
    private final long pollTimeout;
    private final int producerPartition;
    private final int consumerPartition;
    private boolean isReadClosed;
    private Queue<Command> commandQueue;

    public KafkaClassicCommandTransport(KafkaTransportBuilder settings) {
        this.remoteCapability = settings.getRemoteCapability();
        this.producerKey = settings.getProducerKey();
        this.producerTopic = settings.getProducerTopic();
        this.consumerKey = settings.getConsumerKey();
        this.consumerTopic = settings.getConsumerTopic();
        this.producer = settings.getProducer();
        this.consumer = settings.getConsumer();
        this.pollTimeout = settings.getPollTimeout();
        this.producerPartition = settings.getProducerPartition();
        this.consumerPartition = settings.getConsumerPartition();
        this.commandQueue = new ConcurrentLinkedQueue<Command>();
        this.isReadClosed = false;
    }

    public final Capability getRemoteCapability() throws IOException {
        return this.remoteCapability;
    }

    public final void write(Command cmd, boolean last) throws IOException {
        byte[] bytes = SerializationUtils.serialize((Serializable)cmd);
        this.producer.send(new ProducerRecord(this.producerTopic, Integer.valueOf(this.producerPartition), (Object)this.producerKey, (Object)bytes));
        LOGGER.log(Level.FINE, "Sent a command=" + cmd.toString() + ", in topic=" + this.producerTopic + ", with key=" + this.producerKey);
    }

    public final void closeWrite() throws IOException {
        this.producer.close();
    }

    public final void closeRead() throws IOException {
        if (!this.isReadClosed) {
            this.consumer.commitSync();
            KafkaUtils.unassignConsumer(this.consumer);
            this.isReadClosed = true;
        }
    }

    public final Command read() throws IOException, ClassNotFoundException, InterruptedException {
        Command cmd;
        if (!this.commandQueue.isEmpty()) {
            Command cmd2 = this.commandQueue.poll();
            LOGGER.log(Level.FINE, "Received a command: " + cmd2.toString());
            return cmd2;
        }
        TopicPartition partition = new TopicPartition(this.consumerTopic, this.consumerPartition);
        this.consumer.assign(Arrays.asList(partition));
        do {
            ConsumerRecords records = this.consumer.poll(this.pollTimeout);
            cmd = null;
            for (ConsumerRecord record : records) {
                if (!((String)record.key()).equals(this.consumerKey)) continue;
                Command read = Command.readFrom((Channel)this.channel, (byte[])((byte[])record.value()));
                if (cmd == null) {
                    cmd = read;
                    continue;
                }
                this.commandQueue.add(read);
            }
        } while (cmd == null);
        this.consumer.commitSync();
        LOGGER.log(Level.FINE, "Received a command: " + cmd.toString());
        return cmd;
    }
}

