package com.hazelcast.jet.kafka;

import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.dataconnection.DataConnectionResource;
import com.hazelcast.jet.kafka.impl.KafkaTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/kafka/KafkaDataConnectionTest.class */
public class KafkaDataConnectionTest {
    private static final KafkaTestSupport kafkaTestSupport = KafkaTestSupport.create();
    private KafkaDataConnection kafkaDataConnection;

    @BeforeClass
    public static void beforeClass() throws Exception {
        kafkaTestSupport.createKafkaCluster();
    }

    @AfterClass
    public static void afterClass() {
        kafkaTestSupport.shutdownKafkaCluster();
    }

    @After
    public void tearDown() {
        if (this.kafkaDataConnection != null) {
            this.kafkaDataConnection.destroy();
        }
    }

    @Test
    public void should_create_new_consumer_for_each_call() {
        this.kafkaDataConnection = createNonSharedKafkaDataConnection();
        Consumer newConsumer = this.kafkaDataConnection.newConsumer();
        try {
            Consumer newConsumer2 = this.kafkaDataConnection.newConsumer();
            try {
                Assertions.assertThat(newConsumer).isNotSameAs(newConsumer2);
                if (newConsumer2 != null) {
                    newConsumer2.close();
                }
                if (newConsumer != null) {
                    newConsumer.close();
                }
            } catch (Throwable th) {
                if (newConsumer2 != null) {
                    try {
                        newConsumer2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (newConsumer != null) {
                try {
                    newConsumer.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void newConsumer_should_fail_with_shared_data_connection() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        Assertions.assertThatThrownBy(() -> {
            this.kafkaDataConnection.newConsumer();
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("KafkaConsumer is not thread-safe and can't be used with shared DataConnection 'kafka-data-connection'");
        Assertions.assertThatThrownBy(() -> {
            this.kafkaDataConnection.newConsumer(new Properties());
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("KafkaConsumer is not thread-safe and can't be used with shared DataConnection 'kafka-data-connection'");
    }

    @Test
    public void shared_data_connection_should_return_same_producer() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null);
        try {
            KafkaProducer producer2 = this.kafkaDataConnection.getProducer((String) null);
            try {
                Assertions.assertThat(producer).isSameAs(producer2);
                if (producer2 != null) {
                    producer2.close();
                }
                if (producer != null) {
                    producer.close();
                }
            } catch (Throwable th) {
                if (producer2 != null) {
                    try {
                        producer2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void non_shared_data_connection_should_return_new_producer() {
        this.kafkaDataConnection = createNonSharedKafkaDataConnection();
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null);
        try {
            KafkaProducer producer2 = this.kafkaDataConnection.getProducer((String) null);
            try {
                Assertions.assertThat(producer).isNotSameAs(producer2);
                if (producer2 != null) {
                    producer2.close();
                }
                if (producer != null) {
                    producer.close();
                }
            } catch (Throwable th) {
                if (producer2 != null) {
                    try {
                        producer2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void list_resources_should_return_empty_list_for_no_topics() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        Assertions.assertThat((List) this.kafkaDataConnection.listResources().stream().filter(dataConnectionResource -> {
            return !Arrays.toString(dataConnectionResource.name()).contains("__confluent");
        }).collect(Collectors.toList())).isEmpty();
    }

    @Test
    public void list_resources_should_return_topics() throws IOException {
        KafkaTestSupport create = KafkaTestSupport.create();
        create.createKafkaCluster();
        this.kafkaDataConnection = createKafkaDataConnection(create);
        try {
            create.createTopic("my-topic", 2);
            Assertions.assertThat((List) this.kafkaDataConnection.listResources().stream().filter(dataConnectionResource -> {
                return !Arrays.toString(dataConnectionResource.name()).contains("__confluent");
            }).collect(Collectors.toList())).containsExactly(new DataConnectionResource[]{new DataConnectionResource("topic", "my-topic")});
            this.kafkaDataConnection.destroy();
            create.shutdownKafkaCluster();
        } catch (Throwable th) {
            this.kafkaDataConnection.destroy();
            create.shutdownKafkaCluster();
            throw th;
        }
    }

    @Test
    public void releasing_data_connection_does_not_close_shared_producer() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null);
        this.kafkaDataConnection.release();
        try {
            producer.partitionsFor("my-topic");
        } catch (Exception e) {
            Fail.fail("Should not throw exception", e);
        }
    }

    @Test
    public void shared_producer_should_not_be_closed_after_one_close() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null);
        this.kafkaDataConnection.getProducer((String) null);
        this.kafkaDataConnection.release();
        producer.close();
        try {
            producer.partitionsFor("my-topic");
        } catch (Exception e) {
            Fail.fail("Should not throw exception", e);
        }
    }

    @Test
    public void shared_producer_should_be_closed_after_all_close() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null);
        KafkaProducer producer2 = this.kafkaDataConnection.getProducer((String) null);
        this.kafkaDataConnection.release();
        producer.close();
        producer2.close();
        Assertions.assertThatThrownBy(() -> {
            producer.partitionsFor("my-topic");
        }).isInstanceOf(KafkaException.class).hasMessage("Requested metadata update after close");
    }

    @Test
    public void should_list_resource_types() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        Assertions.assertThat(this.kafkaDataConnection.resourceTypes()).map(str -> {
            return str.toLowerCase(Locale.ROOT);
        }).containsExactlyInAnyOrder(new String[]{"topic"});
    }

    @Test
    public void shared_producer_should_not_be_created_with_additional_props() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        Properties properties = new Properties();
        properties.setProperty("A", "B");
        Assertions.assertThatThrownBy(() -> {
            this.kafkaDataConnection.getProducer((String) null, properties);
        }).isInstanceOf(HazelcastException.class).hasMessageContaining("For shared Kafka producer, please provide all serialization options");
        this.kafkaDataConnection.release();
    }

    @Test
    public void shared_producer_is_allowed_to_be_created_with_empty_props() {
        this.kafkaDataConnection = createKafkaDataConnection(kafkaTestSupport);
        KafkaProducer producer = this.kafkaDataConnection.getProducer((String) null, new Properties());
        Assertions.assertThat(producer).isNotNull();
        producer.close();
        this.kafkaDataConnection.release();
    }

    private KafkaDataConnection createKafkaDataConnection(KafkaTestSupport kafkaTestSupport2) {
        return new KafkaDataConnection(new DataConnectionConfig("kafka-data-connection").setType("Kafka").setShared(true).setProperties(properties(kafkaTestSupport2)));
    }

    private KafkaDataConnection createNonSharedKafkaDataConnection() {
        return new KafkaDataConnection(new DataConnectionConfig("kafka-data-connection").setType("Kafka").setShared(false).setProperties(properties(kafkaTestSupport)));
    }

    public Properties properties(KafkaTestSupport kafkaTestSupport2) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", kafkaTestSupport2.getBrokerConnectionString());
        properties.setProperty("key.deserializer", IntegerDeserializer.class.getCanonicalName());
        properties.setProperty("key.serializer", IntegerSerializer.class.getCanonicalName());
        properties.setProperty("value.serializer", StringSerializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }
}
