package com.hazelcast.jet.kafka.impl;

import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.impl.connector.SinkStressTestUtil;
import com.hazelcast.jet.kafka.KafkaSinks;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.map.IMap;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(HazelcastParametrizedRunner.class)
@Category({NightlyTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/kafka/impl/KafkaDataConnectionStressTest.class */
public class KafkaDataConnectionStressTest extends SimpleTestInClusterSupport {
    private static final int PARTITION_COUNT = 20;
    private static KafkaTestSupport kafkaTestSupport;

    @Parameterized.Parameter
    public boolean shared;

    @Parameterized.Parameter(1)
    public boolean exactlyOnce;

    @Parameterized.Parameter(2)
    public boolean graceful;
    private final String sourceIMapName = randomMapName();
    private Properties properties;
    private String topic;
    private String dataConnectionName;
    private IMap<Integer, String> sourceIMap;

    @BeforeClass
    public static void beforeClass() throws IOException {
        kafkaTestSupport = KafkaTestSupport.create();
        kafkaTestSupport.createKafkaCluster();
        initialize(2, null);
    }

    @Parameterized.Parameters(name = "shared:{0}, exactlyOnce:{1}, graceful:{2}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList(new Object[]{false, false, false}, new Object[]{false, false, true}, new Object[]{false, true, false}, new Object[]{false, true, true}, new Object[]{true, false, false}, new Object[]{true, false, true});
    }

    @Before
    public void before() {
        this.properties = new Properties();
        this.properties.setProperty("bootstrap.servers", kafkaTestSupport.getBrokerConnectionString());
        this.properties.setProperty("key.serializer", IntegerSerializer.class.getName());
        this.properties.setProperty("value.serializer", StringSerializer.class.getName());
        this.properties.setProperty("max.block.ms", String.valueOf(KafkaTestSupport.KAFKA_MAX_BLOCK_MS));
        this.topic = randomName();
        kafkaTestSupport.createTopic(this.topic, PARTITION_COUNT);
        this.sourceIMap = instance().getMap(this.sourceIMapName);
        for (int i = 0; i < PARTITION_COUNT; i++) {
            this.sourceIMap.put(Integer.valueOf(i), String.valueOf(i));
        }
        this.dataConnectionName = "kafka-data-connection-" + randomName();
        instance().getConfig().addDataConnectionConfig(new DataConnectionConfig(this.dataConnectionName).setShared(this.shared).setType("Kafka").setProperties(this.properties));
    }

    @AfterClass
    public static void afterClass() {
        if (kafkaTestSupport != null) {
            kafkaTestSupport.shutdownKafkaCluster();
            kafkaTestSupport = null;
        }
    }

    @Test
    public void stressTest() {
        String str = this.topic;
        Sink build = KafkaSinks.kafka(DataConnectionRef.dataConnectionRef(this.dataConnectionName)).toRecordFn(num -> {
            return new ProducerRecord(str, 0, (Object) null, num.toString());
        }).exactlyOnce(this.exactlyOnce).build();
        KafkaConsumer<Integer, String> createConsumer = kafkaTestSupport.createConsumer(this.topic);
        try {
            ArrayList arrayList = new ArrayList();
            SinkStressTestUtil.test_withRestarts(instance(), this.logger, build, this.graceful, this.exactlyOnce, () -> {
                while (true) {
                    ConsumerRecords poll = createConsumer.poll(Duration.ofMillis(10L));
                    if (poll.isEmpty()) {
                        return arrayList;
                    }
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        arrayList.add(Integer.valueOf(Integer.parseInt((String) ((ConsumerRecord) it.next()).value())));
                    }
                }
            });
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -773753003:
                if (implMethodName.equals("lambda$stressTest$9137008c$1")) {
                    z = false;
                    break;
                }
                break;
            case 1622285497:
                if (implMethodName.equals("lambda$stressTest$8b15dbba$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/KafkaDataConnectionStressTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Integer;)Lorg/apache/kafka/clients/producer/ProducerRecord;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return num -> {
                        return new ProducerRecord(str, 0, (Object) null, num.toString());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/KafkaDataConnectionStressTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/KafkaConsumer;Ljava/util/List;)Ljava/util/List;")) {
                    KafkaConsumer kafkaConsumer = (KafkaConsumer) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return () -> {
                        while (true) {
                            ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(10L));
                            if (poll.isEmpty()) {
                                return list;
                            }
                            Iterator it = poll.iterator();
                            while (it.hasNext()) {
                                list.add(Integer.valueOf(Integer.parseInt((String) ((ConsumerRecord) it.next()).value())));
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
