package com.github.charithe.kafka;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:com/github/charithe/kafka/KafkaHelper.class */
public class KafkaHelper {
    private final EphemeralKafkaBroker broker;

    /* loaded from: input_file:com/github/charithe/kafka/KafkaHelper$RecordConsumer.class */
    public static class RecordConsumer<K, V> implements Callable<List<ConsumerRecord<K, V>>> {
        private final int numRecordsToPoll;
        private final KafkaConsumer<K, V> consumer;

        RecordConsumer(int i, KafkaConsumer<K, V> kafkaConsumer) {
            this.numRecordsToPoll = i;
            this.consumer = kafkaConsumer;
        }

        @Override // java.util.concurrent.Callable
        public List<ConsumerRecord<K, V>> call() throws Exception {
            try {
                HashMap newHashMap = Maps.newHashMap();
                ArrayList arrayList = new ArrayList(this.numRecordsToPoll);
                while (arrayList.size() < this.numRecordsToPoll && !Thread.currentThread().isInterrupted()) {
                    Iterator it = this.consumer.poll(0L).iterator();
                    while (true) {
                        if (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            arrayList.add(consumerRecord);
                            newHashMap.put(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
                            if (arrayList.size() == this.numRecordsToPoll) {
                                this.consumer.commitSync(newHashMap);
                                break;
                            }
                        }
                    }
                }
                return arrayList;
            } finally {
                this.consumer.close();
            }
        }
    }

    public static KafkaHelper createFor(EphemeralKafkaBroker ephemeralKafkaBroker) {
        return new KafkaHelper(ephemeralKafkaBroker);
    }

    KafkaHelper(EphemeralKafkaBroker ephemeralKafkaBroker) {
        this.broker = ephemeralKafkaBroker;
    }

    public Properties producerConfig() {
        return this.broker.producerConfig();
    }

    public Properties consumerConfig() {
        return this.broker.consumerConfig();
    }

    public Properties consumerConfig(boolean z) {
        return this.broker.consumerConfig(z);
    }

    public String zookeeperConnectionString() {
        return this.broker.getZookeeperConnectString().orElseThrow(() -> {
            return new IllegalStateException("KafkaBroker is not running");
        });
    }

    public int zookeeperPort() {
        return this.broker.getZookeeperPort().orElseThrow(() -> {
            return new IllegalStateException("KafkaBroker is not running");
        }).intValue();
    }

    public int kafkaPort() {
        return this.broker.getKafkaPort().orElseThrow(() -> {
            return new IllegalStateException("KafkaBroker is not running");
        }).intValue();
    }

    public <K, V> KafkaProducer<K, V> createProducer(Serializer<K> serializer, Serializer<V> serializer2, Properties properties) {
        return this.broker.createProducer(serializer, serializer2, properties);
    }

    public KafkaProducer<String, String> createStringProducer() {
        return createProducer(new StringSerializer(), new StringSerializer(), null);
    }

    public KafkaProducer<String, String> createStringProducer(Properties properties) {
        return createProducer(new StringSerializer(), new StringSerializer(), properties);
    }

    public KafkaProducer<byte[], byte[]> createByteProducer() {
        return createProducer(new ByteArraySerializer(), new ByteArraySerializer(), null);
    }

    public KafkaProducer<byte[], byte[]> createByteProducer(Properties properties) {
        return createProducer(new ByteArraySerializer(), new ByteArraySerializer(), properties);
    }

    public <K, V> void produce(String str, KafkaProducer<K, V> kafkaProducer, Map<K, V> map) {
        map.forEach((obj, obj2) -> {
            kafkaProducer.send(new ProducerRecord(str, obj, obj2));
        });
        kafkaProducer.flush();
    }

    public void produceStrings(String str, String... strArr) {
        KafkaProducer<String, String> createStringProducer = createStringProducer();
        try {
            produce(str, createStringProducer, (Map) Arrays.stream(strArr).collect(Collectors.toMap(str2 -> {
                return String.valueOf(str2.hashCode());
            }, Function.identity())));
            if (createStringProducer != null) {
                createStringProducer.close();
            }
        } catch (Throwable th) {
            if (createStringProducer != null) {
                try {
                    createStringProducer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Deserializer<K> deserializer, Deserializer<V> deserializer2, Properties properties) {
        return this.broker.createConsumer(deserializer, deserializer2, properties);
    }

    public KafkaConsumer<String, String> createStringConsumer() {
        return createConsumer(new StringDeserializer(), new StringDeserializer(), null);
    }

    public KafkaConsumer<String, String> createStringConsumer(Properties properties) {
        return createConsumer(new StringDeserializer(), new StringDeserializer(), properties);
    }

    public KafkaConsumer<byte[], byte[]> createByteConsumer() {
        return createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), null);
    }

    public KafkaConsumer<byte[], byte[]> createByteConsumer(Properties properties) {
        return createConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), properties);
    }

    public <K, V> ListenableFuture<List<ConsumerRecord<K, V>>> consume(String str, KafkaConsumer<K, V> kafkaConsumer, int i) {
        kafkaConsumer.subscribe(Lists.newArrayList(new String[]{str}));
        return MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()).submit(new RecordConsumer(i, kafkaConsumer));
    }

    public ListenableFuture<List<String>> consumeStrings(String str, int i) {
        return Futures.transform(consume(str, createStringConsumer(), i), this::extractValues, MoreExecutors.directExecutor());
    }

    private List<String> extractValues(List<ConsumerRecord<String, String>> list) {
        return list == null ? Collections.emptyList() : (List) list.stream().map((v0) -> {
            return v0.value();
        }).collect(Collectors.toList());
    }
}
