package com.hazelcast.jet.kafka.impl;

import com.hazelcast.core.HazelcastJsonValue;
import com.hazelcast.jet.kafka.HazelcastKafkaAvroSerializer;
import com.hazelcast.test.DockerTestUtil;
import com.hazelcast.test.HazelcastTestSupport;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.eclipse.jetty.server.Server;
import org.junit.Assert;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/KafkaTestSupport.class */
public abstract class KafkaTestSupport {
    static final long KAFKA_MAX_BLOCK_MS = TimeUnit.MINUTES.toMillis(2);
    private final Map<String, KafkaProducer<Object, Object>> producers = new HashMap();
    private final Map<String, Map<String, String>> producerProperties = new HashMap();
    private String brokerConnectionString;
    private Admin admin;
    private Server schemaRegistryServer;
    private KafkaSchemaRegistry schemaRegistry;

    public static KafkaTestSupport create() {
        if (DockerTestUtil.dockerEnabled()) {
            return System.getProperties().containsKey("test.kafka.use.redpanda") ? new DockerizedRedPandaTestSupport() : new DockerizedKafkaTestSupport();
        }
        assertPropertyNotSet("test.kafka.version");
        assertPropertyNotSet("test.redpanda.version");
        assertPropertyNotSet("test.kafka.use.redpanda");
        return new EmbeddedKafkaTestSupport();
    }

    private static void assertPropertyNotSet(String str) {
        if (System.getProperties().containsKey(str)) {
            throw new IllegalArgumentException("'" + str + "' system property requires docker enabled");
        }
    }

    public void createKafkaCluster() throws IOException {
        this.brokerConnectionString = createKafkaCluster0();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.brokerConnectionString);
        this.admin = Admin.create(properties);
    }

    protected abstract String createKafkaCluster0() throws IOException;

    public void shutdownKafkaCluster() {
        shutdownKafkaCluster0();
        this.brokerConnectionString = null;
        if (this.admin != null) {
            this.admin.close();
            this.admin = null;
        }
        this.producers.values().forEach((v0) -> {
            v0.close();
        });
        this.producers.clear();
    }

    protected abstract void shutdownKafkaCluster0();

    public String getBrokerConnectionString() {
        return this.brokerConnectionString;
    }

    public void createTopic(String str, int i) {
        try {
            this.admin.createTopics(Collections.singletonList(new NewTopic(str, i, (short) 1))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteTopic(String str) {
        try {
            this.admin.deleteTopics(List.of(str)).all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void setPartitionCount(String str, int i) {
        HashMap hashMap = new HashMap();
        hashMap.put(str, NewPartitions.increaseTo(i));
        this.admin.createPartitions(hashMap);
        this.producers.remove(str);
    }

    public void setProducerProperties(String str, Map<String, String> map) {
        this.producerProperties.put(str, map);
        this.producers.remove(str);
    }

    public Future<RecordMetadata> produce(String str, Object obj, Object obj2) {
        return this.producers.computeIfAbsent(str, str2 -> {
            return createProducer(str, obj, obj2);
        }).send(new ProducerRecord(str, obj, obj2));
    }

    public void produceSync(String str, Object obj, Object obj2) {
        try {
            produce(str, obj, obj2).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public Future<RecordMetadata> produce(String str, int i, Long l, Object obj, Object obj2) {
        return this.producers.computeIfAbsent(str, str2 -> {
            return createProducer(str, obj, obj2);
        }).send(new ProducerRecord(str, Integer.valueOf(i), l, obj, obj2));
    }

    private KafkaProducer<Object, Object> createProducer(String str, Object obj, Object obj2) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.brokerConnectionString);
        properties.setProperty("key.serializer", resolveSerializer(str, obj));
        properties.setProperty("value.serializer", resolveSerializer(str, obj2));
        properties.setProperty("max.block.ms", String.valueOf(KAFKA_MAX_BLOCK_MS));
        Optional ofNullable = Optional.ofNullable(this.producerProperties.get(str));
        Objects.requireNonNull(properties);
        ofNullable.ifPresent(properties::putAll);
        return new KafkaProducer<>(properties);
    }

    private String resolveSerializer(String str, Object obj) {
        if (obj instanceof String) {
            return "org.apache.kafka.common.serialization.StringSerializer";
        }
        if (obj instanceof Short) {
            return "org.apache.kafka.common.serialization.ShortSerializer";
        }
        if (obj instanceof Integer) {
            return "org.apache.kafka.common.serialization.IntegerSerializer";
        }
        if (obj instanceof Long) {
            return "org.apache.kafka.common.serialization.LongSerializer";
        }
        if (obj instanceof Float) {
            return "org.apache.kafka.common.serialization.FloatSerializer";
        }
        if (obj instanceof Double) {
            return "org.apache.kafka.common.serialization.DoubleSerializer";
        }
        if (obj instanceof byte[]) {
            return "org.apache.kafka.common.serialization.ByteArraySerializer";
        }
        if (obj instanceof ByteBuffer) {
            return "org.apache.kafka.common.serialization.ByteBufferSerializer";
        }
        if (obj instanceof Bytes) {
            return "org.apache.kafka.common.serialization.BytesSerializer";
        }
        if (obj instanceof UUID) {
            return "org.apache.kafka.common.serialization.UUIDSerializer";
        }
        if (obj instanceof GenericRecord) {
            Map<String, String> map = this.producerProperties.get(str);
            return (map == null || !map.containsKey("schema.registry.url")) ? HazelcastKafkaAvroSerializer.class.getCanonicalName() : "io.confluent.kafka.serializers.KafkaAvroSerializer";
        }
        if (obj instanceof HazelcastJsonValue) {
            return HazelcastJsonValueSerializer.class.getCanonicalName();
        }
        throw new IllegalArgumentException("Unknown class: " + obj.getClass().getCanonicalName() + ". Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID, GenericRecord, HazelcastJsonValue");
    }

    public KafkaConsumer<Integer, String> createConsumer(String... strArr) {
        return createConsumer(IntegerDeserializer.class, StringDeserializer.class, Collections.emptyMap(), strArr);
    }

    public <K, V> KafkaConsumer<K, V> createConsumer(Class<? extends Deserializer<? super K>> cls, Class<? extends Deserializer<? super V>> cls2, Map<String, String> map, String... strArr) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.brokerConnectionString);
        properties.setProperty("group.id", HazelcastTestSupport.randomString());
        properties.setProperty("client.id", "consumer0");
        properties.setProperty("key.deserializer", cls.getCanonicalName());
        properties.setProperty("value.deserializer", cls2.getCanonicalName());
        properties.setProperty("isolation.level", "read_committed");
        properties.setProperty("auto.offset.reset", "earliest");
        properties.putAll(map);
        KafkaConsumer<K, V> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(List.of((Object[]) strArr));
        return kafkaConsumer;
    }

    public void createSchemaRegistry(SchemaRegistryConfig schemaRegistryConfig) throws Exception {
        SchemaRegistryRestApplication schemaRegistryRestApplication = new SchemaRegistryRestApplication(schemaRegistryConfig);
        this.schemaRegistryServer = schemaRegistryRestApplication.createServer();
        this.schemaRegistryServer.start();
        this.schemaRegistry = schemaRegistryRestApplication.schemaRegistry();
    }

    public void shutdownSchemaRegistry() throws Exception {
        if (this.schemaRegistryServer != null) {
            this.schemaRegistryServer.stop();
        }
    }

    public URI getSchemaRegistryURI() {
        return this.schemaRegistryServer.getURI();
    }

    public Schema registerSchema(String str, org.apache.avro.Schema schema) throws SchemaRegistryException {
        return this.schemaRegistry.register(str, new Schema(str, -1, -1, "AVRO", Collections.emptyList(), schema.toString()));
    }

    public int getLatestSchemaVersion(String str) throws SchemaRegistryException {
        return ((Integer) Optional.ofNullable(this.schemaRegistry.getLatestVersion(str)).map(schema -> {
            return schema.getVersion();
        }).orElseThrow(() -> {
            return new SchemaRegistryException("No schema found in subject '" + str + "'");
        })).intValue();
    }

    public void assertTopicContentsEventually(String str, Map<Integer, String> map, boolean z) {
        KafkaConsumer<Integer, String> createConsumer = createConsumer(str);
        try {
            long nanoTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
            int i = 0;
            while (i < map.size() && System.nanoTime() < nanoTime) {
                Iterator it = createConsumer.poll(Duration.ofMillis(100L)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    Assert.assertEquals("key=" + consumerRecord.key(), map.get(consumerRecord.key()), consumerRecord.value());
                    if (z) {
                        Assert.assertEquals(((Integer) consumerRecord.key()).intValue(), consumerRecord.partition());
                    }
                    i++;
                }
            }
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public <K, V> void assertTopicContentsEventually(String str, Map<K, V> map, Class<? extends Deserializer<? super K>> cls, Class<? extends Deserializer<? super V>> cls2) {
        assertTopicContentsEventually(str, map, cls, cls2, Collections.emptyMap());
    }

    public <K, V> void assertTopicContentsEventually(String str, Map<K, V> map, Class<? extends Deserializer<? super K>> cls, Class<? extends Deserializer<? super V>> cls2, Map<String, String> map2) {
        KafkaConsumer<K, V> createConsumer = createConsumer(cls, cls2, map2, str);
        try {
            long nanoTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(KAFKA_MAX_BLOCK_MS);
            HashSet hashSet = new HashSet();
            int i = 0;
            while (i < map.size() && System.nanoTime() < nanoTime) {
                Iterator it = createConsumer.poll(Duration.ofMillis(100L)).iterator();
                while (it.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                    Assert.assertTrue("key=" + consumerRecord.key() + " already seen", hashSet.add(consumerRecord.key()));
                    V v = map.get(consumerRecord.key());
                    Assert.assertNotNull("key=" + consumerRecord.key() + " received, but not expected", v);
                    Assert.assertEquals("key=" + consumerRecord.key(), v, consumerRecord.value());
                    i++;
                }
            }
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
