package com.hazelcast.jet.kafka.impl;

import com.hazelcast.collection.IList;
import com.hazelcast.config.DataConnectionConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.test.TestInbox;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.kafka.TopicsConfig;
import com.hazelcast.jet.pipeline.DataConnectionRef;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.test.annotation.QuickTest;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({QuickTest.class})
/* loaded from: input_file:com/hazelcast/jet/kafka/impl/StreamKafkaPTest.class */
public class StreamKafkaPTest extends SimpleTestInClusterSupport {
    /** Number of partitions each per-test topic is created with in {@link #before()}. */
    private static final int INITIAL_PARTITION_COUNT = 4;
    /**
     * Lag handed to {@code WatermarkPolicy.limitingLag(...)} when building processors;
     * several tests also reuse the same value as a wait duration in seconds.
     */
    private static final long LAG = 3;
    /** Shared Kafka test harness; assigned in {@link #beforeClass()} and cleared in {@link #afterClass()}. */
    private static KafkaTestSupport kafkaTestSupport;
    /** Name of the first per-test topic, regenerated in {@link #before()}. */
    private String topic1Name;
    /** Name of the second per-test topic, regenerated in {@link #before()}. */
    private String topic2Name;
    // NOTE(review): this looks like the javac-synthesized flag that backs `assert` statements,
    // left in by the decompiler. It is declared here without an initializer — confirm whether
    // hand-maintained source should carry it at all.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * One-time fixture: creates the Kafka test harness, starts its cluster and
     * then calls {@code initialize(2, null)} for the Hazelcast side.
     *
     * @throws IOException declared for the start-up calls made here
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        KafkaTestSupport support = KafkaTestSupport.create();
        kafkaTestSupport = support;
        support.createKafkaCluster();
        initialize(2, null);
    }

    /**
     * Per-test fixture: picks two fresh random topic names and creates both
     * topics with {@link #INITIAL_PARTITION_COUNT} partitions.
     */
    @Before
    public void before() {
        topic1Name = randomString();
        topic2Name = randomString();
        for (String topic : new String[]{topic1Name, topic2Name}) {
            kafkaTestSupport.createTopic(topic, INITIAL_PARTITION_COUNT);
        }
    }

    /** One-time teardown: shuts the Kafka test cluster down and drops the reference to the harness. */
    @AfterClass
    public static void afterClass() {
        KafkaTestSupport support = kafkaTestSupport;
        support.shutdownKafkaCluster();
        kafkaTestSupport = null;
    }

    /**
     * Submits a job reading from a topic named {@code "nonExistentTopic"} and checks
     * that the job reaches {@code RUNNING} and stays there for {@link #LAG} seconds.
     */
    @Test
    public void test_nonExistentTopic() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(), "nonExistentTopic"))
                .withoutTimestamps()
                .writeTo(Sinks.list("test_nonExistentTopic"));

        Job job = instance().getJet().newJob(pipeline);
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);
        assertTrueAllTheTime(() -> Assert.assertEquals(JobStatus.RUNNING, job.getStatus()), LAG);
    }

    /**
     * Runs a job whose Kafka source projects each record to its value suffixed with
     * {@code "-x"} and verifies that exactly those projected strings end up in the sink list.
     */
    @Test
    public void when_projectionFunctionProvided_then_appliedToReadRecords() {
        final int messageCount = 20;
        String sinkListName = randomName();

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(), rec -> ((String) rec.value()) + "-x", topic1Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));
        instance().getJet().newJob(pipeline);

        // Wait before producing so the records are written after the job was submitted.
        sleepAtLeastSeconds(LAG);
        for (int key = 0; key < messageCount; key++) {
            kafkaTestSupport.produce(topic1Name, Integer.valueOf(key), Integer.toString(key));
        }

        IList<Object> sinkList = instance().getList(sinkListName);
        assertTrueEventually(() -> {
            Assert.assertEquals(messageCount, sinkList.size());
            for (int key = 0; key < messageCount; key++) {
                String expected = key + "-x";
                Assert.assertTrue("missing entry: " + expected, sinkList.contains(expected));
            }
        });
    }

    /** Runs the partition-initial-offsets scenario with the at-least-once guarantee. */
    @Test
    public void when_processingGuaranteeAtLeastOnce_then_readFromPartitionsInitialOffsets() throws Exception {
        ProcessingGuarantee guarantee = ProcessingGuarantee.AT_LEAST_ONCE;
        testWithPartitionsInitialOffsets(guarantee);
    }

    /** Runs the partition-initial-offsets scenario with the exactly-once guarantee. */
    @Test
    public void when_processingGuaranteeExactlyOnce_then_readFromPartitionsInitialOffsets() throws Exception {
        ProcessingGuarantee guarantee = ProcessingGuarantee.EXACTLY_ONCE;
        testWithPartitionsInitialOffsets(guarantee);
    }

    /**
     * Produces 100 records to each of the two test topics, then starts a job whose source
     * is configured with an initial offset of 5 for all four partitions of topic 1 and for
     * partitions 0 and 2 of topic 2. Expects 80 items from topic 1 and 90 from topic 2
     * (170 in total) to arrive in the sink list.
     *
     * @param processingGuarantee guarantee the job is submitted with
     * @throws Exception if waiting for a produce future fails
     */
    private void testWithPartitionsInitialOffsets(ProcessingGuarantee processingGuarantee) throws Exception {
        String sinkListName = randomName();

        List<Future<RecordMetadata>> pendingSends = new ArrayList<>();
        for (int key = 0; key < 100; key++) {
            pendingSends.add(kafkaTestSupport.produce(topic1Name, Integer.valueOf(key), String.valueOf(key)));
            pendingSends.add(kafkaTestSupport.produce(topic2Name, Integer.valueOf(key), String.valueOf(key)));
        }
        // Block until every record has been acknowledged.
        for (Future<RecordMetadata> send : pendingSends) {
            send.get();
        }
        sleepAtLeastSeconds(LAG);

        TopicsConfig.TopicConfig firstTopic = new TopicsConfig.TopicConfig(topic1Name)
                .addPartitionInitialOffset(0, 5L)
                .addPartitionInitialOffset(1, 5L)
                .addPartitionInitialOffset(2, 5L)
                .addPartitionInitialOffset(3, 5L);
        TopicsConfig.TopicConfig secondTopic = new TopicsConfig.TopicConfig(topic2Name)
                .addPartitionInitialOffset(0, 5L)
                .addPartitionInitialOffset(2, 5L);
        TopicsConfig topicsConfig = new TopicsConfig()
                .addTopicConfig(firstTopic)
                .addTopicConfig(secondTopic);

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(),
                        rec -> Tuple2.tuple2((String) rec.value(), rec.topic()), topicsConfig))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));
        instance().getJet().newJob(pipeline, new JobConfig().setProcessingGuarantee(processingGuarantee));
        sleepAtLeastSeconds(LAG);

        IList<Tuple2<String, String>> sinkList = instance().getList(sinkListName);
        final int expectedTotal = 80 + 90;
        assertTrueEventually(() -> Assert.assertEquals(expectedTotal, sinkList.size()));

        // Group the received values by the topic they came from.
        Map<String, List<String>> valuesByTopic = sinkList.stream()
                .collect(Collectors.groupingBy(Tuple2::requiredF1,
                        Collectors.mapping(Tuple2::f0, Collectors.toList())));
        Assertions.assertThat(valuesByTopic.get(topic1Name).size()).isEqualTo(80);
        Assertions.assertThat(valuesByTopic.get(topic2Name).size()).isEqualTo(90);
    }

    /** Suspend/resume scenario with 10 records per batch under the at-least-once guarantee. */
    @Test
    public void when_processingGuaranteeAtLeastOnceAndJobResumedAfterSuspension_then_readFromPartitionsInitialOffsets() {
        final int batchSize = 10;
        testSuspendResumeWithPartitionInitialOffsets(batchSize, ProcessingGuarantee.AT_LEAST_ONCE);
    }

    /** Suspend/resume scenario with 20 records per batch under the exactly-once guarantee. */
    @Test
    public void when_processingExactlyOnceAndJobResumedAfterSuspension_then_readFromPartitionsInitialOffsets() {
        final int batchSize = 20;
        testSuspendResumeWithPartitionInitialOffsets(batchSize, ProcessingGuarantee.EXACTLY_ONCE);
    }

    /**
     * Writes {@code i} records to partition 0 of topic 1, starts a job whose source has
     * initial offset {@code i} for that partition, and checks that the sink list stays empty.
     * The job is then suspended and resumed, another {@code i} records are produced, and the
     * sink list is expected to contain exactly {@code i} items.
     *
     * @param i                   number of records in each of the two batches
     * @param processingGuarantee guarantee the job is submitted with
     */
    private void testSuspendResumeWithPartitionInitialOffsets(int i, ProcessingGuarantee processingGuarantee) {
        String sinkListName = randomName();

        // First batch: pinned to partition 0, stamped with the current wall-clock time.
        for (int key = 0; key < i; key++) {
            Long timestamp = Long.valueOf(System.currentTimeMillis());
            kafkaTestSupport.produce(topic1Name, 0, timestamp, Integer.valueOf(key), String.valueOf(key));
        }

        TopicsConfig.TopicConfig topicConfig = new TopicsConfig.TopicConfig(topic1Name).addPartitionInitialOffset(0, i);
        TopicsConfig topicsConfig = new TopicsConfig().addTopicConfig(topicConfig);

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(),
                        rec -> Tuple2.tuple2((String) rec.value(), rec.topic()), topicsConfig))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));
        JobConfig jobConfig = new JobConfig().setProcessingGuarantee(processingGuarantee);
        Job job = instance().getJet().newJob(pipeline, jobConfig);

        sleepAtLeastSeconds(LAG);
        assertTrueEventually(() -> Assert.assertEquals(0L, instance().getList(sinkListName).size()), 5L);

        job.suspend();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.SUSPENDED);
        job.resume();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);

        // Second batch: no explicit partition.
        for (int key = i; key < 2 * i; key++) {
            kafkaTestSupport.produce(topic1Name, Integer.valueOf(key), String.valueOf(key));
        }

        sleepAtLeastSeconds(LAG);
        assertTrueEventually(() -> Assert.assertEquals(i, instance().getList(sinkListName).size()), 5L);
    }

    /**
     * Job-restart scenario with no initial offsets: 100 records per batch,
     * 100 items expected before the restart and 200 after it.
     */
    @Test
    public void when_atLeastOnce_then_continueFromLastReadMessageAfterJobRestart() {
        TopicsConfig.TopicConfig topicConfig = new TopicsConfig.TopicConfig(topic1Name);
        TopicsConfig topicsConfig = new TopicsConfig().addTopicConfig(topicConfig);
        testWithJobRestart(100, topicsConfig, ProcessingGuarantee.AT_LEAST_ONCE, 100, 200);
    }

    /**
     * Job-restart scenario with an initial offset of 5 on partitions 0-3 of topic 1:
     * 100 records per batch, 80 items expected before the restart and 180 after it.
     */
    @Test
    public void when_atLeastOnceWithInitialOffsets_then_continueFromLastReadMessageAfterJobRestart() {
        TopicsConfig.TopicConfig topicConfig = new TopicsConfig.TopicConfig(topic1Name)
                .addPartitionInitialOffset(0, 5L)
                .addPartitionInitialOffset(1, 5L)
                .addPartitionInitialOffset(2, 5L)
                .addPartitionInitialOffset(3, 5L);
        TopicsConfig topicsConfig = new TopicsConfig().addTopicConfig(topicConfig);
        testWithJobRestart(100, topicsConfig, ProcessingGuarantee.AT_LEAST_ONCE, 80, 180);
    }

    /**
     * Convenience overload of the job-restart scenario that uses the consumer
     * properties returned by {@code properties()}.
     *
     * @param i                   number of records produced in each batch
     * @param topicsConfig        topics configuration for the Kafka source
     * @param processingGuarantee guarantee the job is submitted with
     * @param i2                  expected sink size before the restart
     * @param i3                  expected sink size after the restart
     */
    private void testWithJobRestart(int i, TopicsConfig topicsConfig, ProcessingGuarantee processingGuarantee, int i2, int i3) {
        Properties consumerProperties = properties();
        testWithJobRestart(i, topicsConfig, processingGuarantee, i2, i3, consumerProperties);
    }

    /**
     * Job-restart scenario: synchronously produces {@code i} records to topic 1, starts a job
     * that writes record values to a list, waits for {@code i2} items, restarts the job,
     * produces another {@code i} records and finally waits for {@code i3} items.
     *
     * @param i                   number of records produced in each batch
     * @param topicsConfig        topics configuration for the Kafka source
     * @param processingGuarantee guarantee the job is submitted with
     * @param i2                  expected sink size before the restart
     * @param i3                  expected sink size after the restart
     * @param properties          Kafka consumer properties for the source
     */
    private void testWithJobRestart(int i, TopicsConfig topicsConfig, ProcessingGuarantee processingGuarantee, int i2, int i3, Properties properties) {
        String sinkListName = randomName();
        for (int key = 0; key < i; key++) {
            kafkaTestSupport.produceSync(topic1Name, Integer.valueOf(key), String.valueOf(key));
        }

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties, rec -> rec.value(), topicsConfig))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));
        JobConfig jobConfig = new JobConfig().setProcessingGuarantee(processingGuarantee);
        Job job = instance().getJet().newJob(pipeline, jobConfig);

        // Keep the value returned for the first run; it is passed back in after the restart.
        long firstRunResult = JobAssertions.assertThat(job).eventuallyJobRunning(instance(), (Long) null);
        assertTrueEventually(() -> Assert.assertEquals(i2, instance().getList(sinkListName).size()));

        job.restart();
        for (int key = i; key < i * 2; key++) {
            kafkaTestSupport.produceSync(topic1Name, Integer.valueOf(key), String.valueOf(key));
        }

        JobAssertions.assertThat(job).eventuallyJobRunning(instance(), Long.valueOf(firstRunResult));
        assertTrueEventually(() -> Assert.assertEquals(i3, instance().getList(sinkListName).size()));
    }

    /** Runs the integration scenario with processing guarantee {@code NONE}. */
    @Test
    public void integrationTest_noSnapshotting() throws Exception {
        ProcessingGuarantee guarantee = ProcessingGuarantee.NONE;
        integrationTest(guarantee);
    }

    /** Runs the integration scenario with processing guarantee {@code EXACTLY_ONCE}. */
    @Test
    public void integrationTest_withSnapshotting() throws Exception {
        ProcessingGuarantee guarantee = ProcessingGuarantee.EXACTLY_ONCE;
        integrationTest(guarantee);
    }

    /**
     * End-to-end scenario on two dedicated Hazelcast instances: a job reads both test topics
     * into a list, 20 records are produced to each topic and all 40 entries are expected in
     * the list.
     *
     * <p>When a processing guarantee other than {@code NONE} is used, the test additionally
     * waits for a snapshot newer than the one seen at that point, terminates the second
     * instance, produces a second batch of 20 records per topic and expects every entry of
     * both batches to be present (at least 80 items, so duplicates are tolerated).
     *
     * <p>Finally the job must still be running; it is then cancelled and must complete.
     *
     * @param processingGuarantee guarantee the job is submitted with
     * @throws Exception if the sleep between termination and the second batch is interrupted
     */
    private void integrationTest(ProcessingGuarantee processingGuarantee) throws Exception {
        final int messageCount = 20;
        HazelcastInstance[] members = new HazelcastInstance[2];
        Arrays.setAll(members, index -> createHazelcastInstance());

        String sinkListName = randomName();
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(), topic1Name, topic2Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkListName));

        JobConfig jobConfig = new JobConfig();
        jobConfig.setProcessingGuarantee(processingGuarantee);
        jobConfig.setSnapshotIntervalMillis(500L);
        Job job = members[0].getJet().newJob(pipeline, jobConfig);
        sleepSeconds(3);

        // First batch: keys 0..19 go to topic 1, keys -20..-1 go to topic 2.
        for (int key = 0; key < messageCount; key++) {
            kafkaTestSupport.produceSync(topic1Name, Integer.valueOf(key), Integer.toString(key));
            kafkaTestSupport.produceSync(topic2Name, Integer.valueOf(key - messageCount),
                    Integer.toString(key - messageCount));
        }

        IList<Object> sinkList = members[0].getList(sinkListName);
        assertTrueEventually(() -> {
            Assert.assertEquals(messageCount * 2, sinkList.size());
            for (int key = 0; key < messageCount; key++) {
                Map.Entry<Integer, String> fromTopic1 = createEntry(key);
                Map.Entry<Integer, String> fromTopic2 = createEntry(key - messageCount);
                Assert.assertTrue("missing entry: " + fromTopic1, sinkList.contains(fromTopic1));
                Assert.assertTrue("missing entry: " + fromTopic2, sinkList.contains(fromTopic2));
            }
        });

        if (processingGuarantee != ProcessingGuarantee.NONE) {
            // Wait until a snapshot newer than the current one has been recorded.
            JobRepository jobRepository = new JobRepository(members[0]);
            long previousSnapshotId = jobRepository.getJobExecutionRecord(job.getId()).snapshotId();
            assertTrueEventually(() -> {
                JobExecutionRecord executionRecord = jobRepository.getJobExecutionRecord(job.getId());
                Assert.assertNotNull("jobExecutionRecord == null", executionRecord);
                long latestSnapshotId = executionRecord.snapshotId();
                Assert.assertTrue("no snapshot produced", latestSnapshotId > previousSnapshotId);
                // Log the previous snapshot id (the decompiled code printed the PrintStream object instead).
                System.out.println("snapshot " + latestSnapshotId + " found, previous was " + previousSnapshotId);
            });

            // Take one member down, then produce the second batch.
            members[1].getLifecycleService().terminate();
            Thread.sleep(500L);
            for (int key = messageCount; key < 2 * messageCount; key++) {
                kafkaTestSupport.produceSync(topic1Name, Integer.valueOf(key), Integer.toString(key));
                kafkaTestSupport.produceSync(topic2Name, Integer.valueOf(key - messageCount),
                        Integer.toString(key - messageCount));
            }

            assertTrueEventually(() -> {
                // 2 topics x 2 batches of messageCount records; ">=" tolerates duplicates.
                Assert.assertTrue("Not all messages were received", sinkList.size() >= messageCount * 4);
                for (int key = 0; key < 2 * messageCount; key++) {
                    Map.Entry<Integer, String> fromTopic1 = createEntry(key);
                    Map.Entry<Integer, String> fromTopic2 = createEntry(key - messageCount);
                    Assert.assertTrue("missing entry: " + fromTopic1, sinkList.contains(fromTopic1));
                    Assert.assertTrue("missing entry: " + fromTopic2, sinkList.contains(fromTopic2));
                }
            });
        }

        Assert.assertFalse(job.getFuture().isDone());
        job.cancel();
        assertTrueEventually(() -> Assert.assertTrue(job.getFuture().isDone()));
    }

    /**
     * Produces one event (keys 100..103) to each of the four partitions of topic 1 and
     * consumes them through a single-topic processor. Each event must be emitted; just
     * before the last one the processor is expected to emit {@code Watermark(97)}.
     */
    @Test
    public void when_eventsInAllPartitions_then_watermarkOutputImmediately() throws Exception {
        StreamKafkaP processor = createProcessor(properties(), 1,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext());

        final int lastPartition = INITIAL_PARTITION_COUNT - 1;
        for (int partition = 0; partition < INITIAL_PARTITION_COUNT; partition++) {
            Map.Entry<Integer, String> event = Util.entry(Integer.valueOf(partition + 100), Integer.toString(partition));
            System.out.println("produced event " + event);
            kafkaTestSupport.produce(topic1Name, partition, null, event.getKey(), event.getValue()).get();
            if (partition == lastPartition) {
                // With an event in every partition a watermark is expected ahead of the last event.
                Assert.assertEquals(new Watermark(97L), consumeEventually(processor, outbox));
            }
            Assert.assertEquals(event, consumeEventually(processor, outbox));
        }
    }

    /**
     * Initializes a processor as index 4 of 5 so that it has no partition assigned,
     * expects the idle message, then grows topic 1 to 5 partitions and expects the event
     * produced to the new partition to be the first non-watermark item emitted.
     */
    @Test
    public void when_noAssignedPartitionAndAddedLater_then_resumesFromIdle() throws Exception {
        StreamKafkaP processor = createProcessor(properties(), 1,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        TestProcessorContext context = new TestProcessorContext()
                .setTotalParallelism(5)
                .setGlobalProcessorIndex(INITIAL_PARTITION_COUNT);
        processor.init(outbox, context);

        Assert.assertTrue(processor.currentAssignment.isEmpty());
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE, consumeEventually(processor, outbox));

        kafkaTestSupport.setPartitionCount(topic1Name, 5);
        Map.Entry<Integer, String> expectedEvent = produceEventToNewPartition();

        // Skip over any watermarks emitted ahead of the event.
        Object emitted = consumeEventually(processor, outbox);
        while (emitted instanceof Watermark) {
            emitted = consumeEventually(processor, outbox);
        }
        Assert.assertEquals(expectedEvent, emitted);
    }

    /**
     * Produces a single event with key 10 and expects the processor to emit it, followed by
     * {@code Watermark(7)}. The time until that watermark arrives must lie between 3 and
     * 30 seconds.
     */
    @Test
    public void when_eventsInSinglePartition_then_watermarkAfterIdleTime() throws Exception {
        StreamKafkaP processor = createProcessor(properties(), 2,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext());

        kafkaTestSupport.produceSync(topic1Name, 10, "foo");
        Assert.assertEquals(Util.entry(10, "foo"), consumeEventually(processor, outbox));

        long startNanos = System.nanoTime();
        Assert.assertEquals(new Watermark(7L), consumeEventually(processor, outbox));
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        assertBetween("elapsed time", elapsedMillis, 3000L, 30000L);
    }

    /**
     * Consumes one event, saves a snapshot, consumes a second event, then restores the
     * snapshot into a fresh processor. The fresh processor must produce an equal snapshot,
     * re-emit only the second event and emit nothing after it.
     */
    @Test
    public void when_snapshotSaved_then_offsetsRestored() throws Exception {
        // First processor: consume event 0, snapshot, then consume event 1.
        StreamKafkaP original = createProcessor(properties(), 2,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox originalOutbox = new TestOutbox(new int[]{10}, 10);
        original.init(originalOutbox,
                new TestProcessorContext().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE));

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        Assert.assertEquals(Util.entry(0, "0"), consumeEventually(original, originalOutbox));

        TestInbox snapshot = saveSnapshot(original, originalOutbox);
        Set<Map.Entry<Object, Object>> snapshotEntries = unwrapBroadcastKey(snapshot.queue());

        kafkaTestSupport.produceSync(topic1Name, 1, "1");
        Assert.assertEquals(Util.entry(1, "1"), consumeEventually(original, originalOutbox));

        // Second processor: restore from the snapshot taken after event 0.
        StreamKafkaP restored = createProcessor(properties(), 2,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox restoredOutbox = new TestOutbox(new int[]{10}, 10);
        restored.init(restoredOutbox,
                new TestProcessorContext().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE));
        restored.restoreFromSnapshot(snapshot);
        Assert.assertTrue("snapshot not fully processed", snapshot.isEmpty());

        TestInbox snapshotAfterRestore = saveSnapshot(restored, restoredOutbox);
        Set<Map.Entry<Object, Object>> entriesAfterRestore = unwrapBroadcastKey(snapshotAfterRestore.queue());
        Assert.assertEquals("new snapshot not equal after restore", snapshotEntries, entriesAfterRestore);

        Assert.assertEquals(Util.entry(1, "1"), consumeEventually(restored, restoredOutbox));
        assertNoMoreItems(restored, restoredOutbox);
    }

    /**
     * Subscribes a job to the same topic twice and verifies that each of the 1000 produced
     * records reaches the sink list exactly once: the list must reach size 1000 and keep
     * that size for one more second. The job is always cancelled afterwards.
     *
     * @throws Exception if waiting for a produce future fails
     */
    @Test
    public void when_duplicateTopicsProvide_then_uniqueTopicsSubscribed() throws Exception {
        HazelcastInstance[] instances = instances();
        assertClusterSizeEventually(2, instances);

        String sinkListName = randomName();
        String topic = randomString();
        kafkaTestSupport.createTopic(topic, 2);

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(properties(), topic, topic))
                .withoutTimestamps()
                .setLocalParallelism(1)
                .writeTo(Sinks.list(sinkListName));
        Job job = instances[0].getJet().newJob(pipeline, new JobConfig());
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);

        final int messageCount = 1000;
        List<Future<RecordMetadata>> pendingSends = new ArrayList<>(messageCount);
        for (int key = 0; key < messageCount; key++) {
            pendingSends.add(kafkaTestSupport.produce(topic, Integer.valueOf(key), Integer.toString(key)));
        }
        // Block until every record has been acknowledged.
        for (Future<RecordMetadata> send : pendingSends) {
            send.get();
        }

        IList<Object> sinkList = instances[0].getList(sinkListName);
        try {
            assertTrueEventually(() -> Assertions.assertThat(sinkList).hasSize(messageCount));
            // The size must not grow afterwards, which would indicate duplicate consumption.
            assertTrueAllTheTime(() -> Assertions.assertThat(sinkList).hasSize(messageCount), 1L);
        } finally {
            // Single cleanup path: runs once whether the assertions pass or fail.
            job.cancel();
        }
    }

    /**
     * Creates a processor subscribed to topic 1 only ({@code i == 1}) or to both test
     * topics ({@code i == 2}).
     *
     * @param properties Kafka consumer properties
     * @param i          number of topics to subscribe to; must be 1 or 2 (checked with {@code assert})
     * @param functionEx projection applied to each consumed record
     * @param j          passed through as the last argument of the {@code TopicsConfig}
     *                   overload, which hands it to the event time policy
     * @param <T>        type of the projected items
     * @return the new processor
     */
    private <T> StreamKafkaP<Integer, String, T> createProcessor(Properties properties, int i, @Nonnull FunctionEx<ConsumerRecord<Integer, String>, T> functionEx, long j) {
        // A plain assert replaces the decompiled check of the synthetic $assertionsDisabled flag.
        assert i == 1 || i == 2 : "unsupported topic count: " + i;
        List<String> topics = i == 1
                ? Collections.singletonList(topic1Name)
                : Arrays.asList(topic1Name, topic2Name);
        return createProcessor(properties, new TopicsConfig().addTopics(topics), functionEx, j);
    }

    /**
     * Creates a processor for the given topics configuration. The consumer is built from
     * {@code properties}; the event timestamp is the integer key for {@code Map.Entry}
     * items and the current wall-clock time for anything else. Watermarks use
     * {@code limitingLag(LAG)}.
     *
     * @param properties   Kafka consumer properties
     * @param topicsConfig topics (and optional initial offsets) to read
     * @param functionEx   projection applied to each consumed record
     * @param j            last argument of {@code EventTimePolicy.eventTimePolicy(...)}
     * @param <T>          type of the projected items
     * @return the new processor
     */
    private <T> StreamKafkaP<Integer, String, T> createProcessor(Properties properties, TopicsConfig topicsConfig, @Nonnull FunctionEx<ConsumerRecord<Integer, String>, T> functionEx, long j) {
        return new StreamKafkaP<>(
                context -> new KafkaConsumer(properties),
                topicsConfig,
                functionEx,
                EventTimePolicy.eventTimePolicy(
                        item -> {
                            if (item instanceof Map.Entry) {
                                return ((Integer) ((Map.Entry) item).getKey()).intValue();
                            }
                            return System.currentTimeMillis();
                        },
                        WatermarkPolicy.limitingLag(LAG), 1L, 0L, j));
    }

    /**
     * Consumes one event, grows topic 1 from 4 to 6 partitions, produces entries 1..10 and
     * expects the processor to emit exactly those ten entries (watermarks are skipped).
     */
    @Test
    public void when_partitionAdded_then_consumedFromBeginning() throws Exception {
        Properties consumerProperties = properties();
        consumerProperties.setProperty("metadata.max.age.ms", "100");
        StreamKafkaP processor = createProcessor(consumerProperties, 2,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 10000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext());

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        Assert.assertEquals(Util.entry(0, "0"), consumeEventually(processor, outbox));

        kafkaTestSupport.setPartitionCount(topic1Name, 6);

        boolean producedToPartition1 = false;
        for (int key = 1; key < 11; key++) {
            RecordMetadata metadata = kafkaTestSupport.produce(topic1Name, Integer.valueOf(key), Integer.toString(key)).get();
            System.out.println("Entry " + key + " produced to partition " + metadata.partition());
            if (metadata.partition() == 1) {
                producedToPartition1 = true;
            }
        }
        Assert.assertTrue("nothing was produced to partition-1", producedToPartition1);

        // Collect ten non-watermark items.
        Set<Object> received = new HashSet<>();
        int remaining = 10;
        while (remaining > 0) {
            try {
                Object item = consumeEventually(processor, outbox);
                if (item instanceof Watermark) {
                    continue;
                }
                received.add(item);
                remaining--;
            } catch (AssertionError e) {
                throw new AssertionError("Unable to receive 10 items, events so far: " + received, e);
            }
        }

        Set<Map.Entry<Integer, String>> expected = IntStream.range(1, 11)
                .mapToObj(key -> Util.entry(Integer.valueOf(key), Integer.toString(key)))
                .collect(Collectors.toSet());
        Assertions.assertThat(received).containsExactlyInAnyOrderElementsOf(expected);
    }

    /**
     * Same as the partition-added scenario, but topic 1 is configured with initial offset 1
     * for partitions 0..5 (including the two partitions that do not exist yet). After the
     * topic grows to 6 partitions, entries 2..11 are produced and exactly those ten entries
     * are expected (watermarks are skipped).
     */
    @Test
    public void when_partitionAddedWhilePartitionsInitialOffsetsProvided_then_consumedFromBeginning() throws Exception {
        Properties consumerProperties = properties();
        consumerProperties.setProperty("metadata.max.age.ms", "100");

        TopicsConfig.TopicConfig topic1Config = new TopicsConfig.TopicConfig(topic1Name);
        for (int partition = 0; partition < 6; partition++) {
            topic1Config = topic1Config.addPartitionInitialOffset(partition, 1L);
        }
        TopicsConfig topicsConfig = new TopicsConfig().addTopic(topic2Name).addTopicConfig(topic1Config);

        StreamKafkaP processor = createProcessor(consumerProperties, topicsConfig,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()), 60000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        TestProcessorContext context = new TestProcessorContext();
        context.setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);
        processor.init(outbox, context);

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        kafkaTestSupport.produceSync(topic1Name, 1, "1");
        Assert.assertEquals(Util.entry(1, "1"), consumeEventually(processor, outbox));

        kafkaTestSupport.setPartitionCount(topic1Name, 6);

        boolean producedToPartition1 = false;
        for (int key = 2; key < 12; key++) {
            RecordMetadata metadata = kafkaTestSupport.produce(topic1Name, Integer.valueOf(key), Integer.toString(key)).get();
            System.out.println("## Entry " + key + " produced to partition " + metadata.partition());
            if (metadata.partition() == 1) {
                producedToPartition1 = true;
            }
        }
        Assert.assertTrue("nothing was produced to partition-1", producedToPartition1);

        // Collect ten non-watermark items, keeping arrival order for the failure message.
        Set<Object> received = new LinkedHashSet<>();
        int remaining = 10;
        while (remaining > 0) {
            try {
                Object item = consumeEventually(processor, outbox);
                if (item instanceof Watermark) {
                    continue;
                }
                received.add(item);
                remaining--;
            } catch (AssertionError e) {
                throw new AssertionError("Unable to receive 10 items, events so far: " + received, e);
            }
        }

        Set<Map.Entry<Integer, String>> expected = IntStream.range(2, 12)
                .mapToObj(key -> Util.entry(Integer.valueOf(key), Integer.toString(key)))
                .collect(Collectors.toSet());
        Assertions.assertThat(received).containsExactlyInAnyOrderElementsOf(expected);
    }

    /**
     * Starts an exactly-once job with {@code auto.offset.reset=latest}, waits until it has
     * consumed a first item, suspends it, grows topic 1 to 6 partitions and produces an
     * event to a new partition. After resuming, that event must show up in the sink list.
     */
    @Test
    public void when_partitionAddedWhileJobDown_then_consumedFromBeginning() throws Exception {
        IList<Object> sinkList = instance().getList(randomName());
        Properties consumerProperties = properties();
        consumerProperties.setProperty("auto.offset.reset", "latest");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(consumerProperties, topic1Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));
        JobConfig jobConfig = new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE);
        Job job = instance().getJet().newJob(pipeline, jobConfig);

        // Produce repeatedly: with "latest", records sent before the consumer is ready are not read.
        assertTrueEventually(() -> {
            kafkaTestSupport.produce(topic1Name, 0, "0");
            Assert.assertFalse(sinkList.isEmpty());
            Assert.assertEquals(Util.entry(0, "0"), sinkList.get(0));
        });

        job.suspend();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.SUSPENDED);

        kafkaTestSupport.setPartitionCount(topic1Name, 6);
        Map.Entry<Integer, String> eventInNewPartition = produceEventToNewPartition();

        job.resume();
        assertTrueEventually(() -> Assertions.assertThat(sinkList).contains(new Map.Entry[]{eventInNewPartition}));
    }

    /**
     * With {@code auto.offset.reset=latest}, a record produced before the job is submitted
     * must not be read: the sink list has to stay empty for two seconds.
     */
    @Test
    public void when_autoOffsetResetLatest_then_doesNotReadOldMessages() {
        IList<Object> sinkList = instance().getList(randomName());
        Properties consumerProperties = properties();
        consumerProperties.setProperty("auto.offset.reset", "latest");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(consumerProperties, topic1Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        instance().getJet().newJob(pipeline);
        assertTrueAllTheTime(() -> Assert.assertTrue(sinkList.isEmpty()), 2L);
    }

    /**
     * With {@code auto.offset.reset=earliest} and no group id set here, the job reads the
     * first record, is suspended, a second record is produced and the sink is cleared.
     * After resuming, both records are expected in the sink again, in order.
     */
    @Test
    public void when_autoOffsetResetEarliest_then_startsFromEarliestAfterRestart() throws Exception {
        IList<Object> sinkList = instance().getList(randomName());
        Properties consumerProperties = properties();
        consumerProperties.setProperty("auto.offset.reset", "earliest");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(consumerProperties, topic1Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        Job job = instance().getJet().newJob(pipeline);
        assertTrueEventually(() ->
                Assert.assertEquals(Collections.singletonList(Util.entry(0, "0")), sinkList));

        job.suspend();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.SUSPENDED);
        kafkaTestSupport.produce(topic1Name, 0, "1").get();
        sinkList.clear();
        job.resume();

        assertTrueEventually(() ->
                Assert.assertEquals(Arrays.asList(Util.entry(0, "0"), Util.entry(0, "1")),
                        new ArrayList<>(sinkList)));
    }

    /**
     * With {@code auto.offset.reset=earliest} and a random {@code group.id}, the job reads
     * the first record, is suspended, a second record is produced and the sink is cleared.
     * After resuming, only the second record is expected in the sink.
     */
    @Test
    public void when_autoOffsetResetEarliestAndGroupIdSet_then_startsFromCommittedOffsetAfterRestart() throws Exception {
        IList<Object> sinkList = instance().getList(randomName());
        Properties consumerProperties = properties();
        consumerProperties.setProperty("auto.offset.reset", "earliest");
        consumerProperties.setProperty("group.id", randomString());

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.kafka(consumerProperties, topic1Name))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        Job job = instance().getJet().newJob(pipeline);
        assertTrueEventually(() ->
                Assert.assertEquals(Collections.singletonList(Util.entry(0, "0")), sinkList));

        job.suspend();
        JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.SUSPENDED);
        kafkaTestSupport.produceSync(topic1Name, 0, "1");
        sinkList.clear();
        job.resume();

        assertTrueEventually(() ->
                Assert.assertEquals(Collections.singletonList(Util.entry(0, "1")),
                        new ArrayList<>(sinkList)));
    }

    /**
     * Initializes a processor as global index 8 out of a total parallelism of 9 and checks
     * that a single {@code complete()} call leaves {@link WatermarkCoalescer#IDLE_MESSAGE}
     * as the first item in the outbox.
     */
    @Test
    public void when_noAssignedPartitions_then_emitIdleMsgImmediately() throws Exception {
        StreamKafkaP processor = createProcessor(
                properties(),
                2,
                rec -> Util.entry((Integer) rec.key(), (String) rec.value()),
                100000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext().setTotalParallelism(9).setGlobalProcessorIndex(8));

        processor.complete();

        Object firstItem = outbox.queue(0).poll();
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE, firstItem);
    }

    /**
     * Checks that the projection function handed to the processor is applied: a record
     * with key {@code 0} and value {@code "0"} must come out as the string {@code "0=0"}.
     */
    @Test
    public void when_customProjection_then_used() throws Exception {
        StreamKafkaP processor = createProcessor(
                properties(),
                2,
                rec -> rec.key() + "=" + ((String) rec.value()),
                10000L);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext());

        kafkaTestSupport.produceSync(topic1Name, 0, "0");

        Object projected = consumeEventually(processor, outbox);
        Assert.assertEquals("0=0", projected);
    }

    /**
     * Checks that records for which the projection returns {@code null} never reach the
     * outbox: of the produced values {@code "0"} and {@code "1"}, only {@code "1"} is
     * expected, followed by nothing else.
     */
    @Test
    public void when_customProjectionToNull_then_filteredOut() throws Exception {
        EventTimePolicy<String> timePolicy =
                EventTimePolicy.eventTimePolicy(Long::parseLong, WatermarkPolicy.limitingLag(0L), 1L, 0L, 0L);
        StreamKafkaP processor = new StreamKafkaP(
                context -> new KafkaConsumer(properties()),
                Collections.singletonList(topic1Name),
                // The value "0" is projected to null; every other value passes through unchanged.
                rec -> "0".equals(rec.value()) ? null : (String) rec.value(),
                timePolicy);
        TestOutbox outbox = new TestOutbox(new int[]{10}, 10);
        processor.init(outbox, new TestProcessorContext());

        kafkaTestSupport.produceSync(topic1Name, 0, "0");
        kafkaTestSupport.produceSync(topic1Name, 0, "1");

        // Keep driving the processor until something shows up in the outbox.
        assertTrueEventually(() -> {
            Assert.assertFalse(processor.complete());
            Assert.assertFalse("no item in outbox", outbox.queue(0).isEmpty());
        });

        Object firstItem = outbox.queue(0).poll();
        Assert.assertEquals("1", firstItem);
        Object secondItem = outbox.queue(0).poll();
        Assert.assertNull(secondItem);
    }

    /**
     * Asks a consumer for the partitions of a topic named {@code non-existing-topic}
     * (with a two second timeout) and asserts that at least one partition is reported.
     *
     * <p>The consumer is managed by try-with-resources, so it is closed on both the
     * success and the failure path, and a failure while closing is attached to the
     * primary exception as a suppressed one.
     */
    @Test
    public void when_topicDoesNotExist_then_partitionCountGreaterThanZero() {
        String topic = "non-existing-topic";
        try (KafkaConsumer<Integer, String> consumer = kafkaTestSupport.createConsumer(topic)) {
            int partitionCount = consumer.partitionsFor(topic, Duration.ofSeconds(2L)).size();
            assertGreaterOrEquals("partition count", partitionCount, 1L);
        }
    }

    /**
     * Creates a consumer pointed at {@code 127.0.0.1:33333} and asserts that
     * {@code partitionsFor} with a 100 ms timeout fails with a {@link TimeoutException}.
     *
     * <p>The consumer is managed by try-with-resources, so it is closed whether or not
     * the assertion passes, and a failure while closing is recorded as suppressed.
     */
    @Test
    public void when_consumerCannotConnect_then_partitionForTimeout() {
        Map<String, Object> consumerConfig = new HashMap<>();
        // NOTE(review): presumably nothing listens on this port in the test environment — confirm.
        consumerConfig.put("bootstrap.servers", "127.0.0.1:33333");
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            Assertions.assertThatThrownBy(() -> consumer.partitionsFor("t", Duration.ofMillis(100L)))
                    .isInstanceOf(TimeoutException.class);
        }
    }

    /**
     * Registers a non-shared data connection named {@code kafka-config} of type
     * {@code Kafka} built from {@link #properties()}, reads {@code topic1Name} through a
     * {@link DataConnectionRef} to it, and checks that a record produced before the job
     * is submitted eventually shows up in the sink list.
     */
    @Test
    public void when_dataConnectionRef_then_readMessages() throws Exception {
        instance().getConfig().addDataConnectionConfig(
                new DataConnectionConfig("kafka-config")
                        .setType("Kafka")
                        .setShared(false)
                        .setProperties(properties()));
        IList sinkList = instance().getList(randomName());
        Pipeline pipeline = Pipeline.create();
        // The source gets its consumer settings from the data connection registered above.
        // A former call here set "auto.offset.reset" to "latest" on a throw-away Properties
        // object returned by properties(); it had no effect on the job and was removed.
        pipeline.readFrom(KafkaSources.kafka(new DataConnectionRef("kafka-config"), new String[]{this.topic1Name}))
                .withoutTimestamps()
                .writeTo(Sinks.list(sinkList));
        // The record is produced (and acknowledged) before the job is submitted.
        kafkaTestSupport.produce(this.topic1Name, 0, "0").get();
        instance().getJet().newJob(pipeline);
        assertTrueEventually(() -> {
            Assertions.assertThat(sinkList).contains(new Map.Entry[]{Util.entry(0, "0")});
        });
    }

    /**
     * Repeatedly calls {@code processor.complete()} until outbox queue 0 is non-empty,
     * then removes and returns the head of that queue.
     *
     * @param processor  the processor to drive; each {@code complete()} call must return {@code false}
     * @param testOutbox the outbox the processor emits into
     * @param <T>        the type the caller expects; the cast to it is unchecked
     * @return the first item polled from outbox queue 0
     */
    private <T> T consumeEventually(Processor processor, TestOutbox testOutbox) {
        assertTrueEventually(() -> {
            boolean done = processor.complete();
            Assert.assertFalse(done);
            boolean outboxEmpty = testOutbox.queue(0).isEmpty();
            Assert.assertFalse("no item in outbox", outboxEmpty);
        });
        Object head = testOutbox.queue(0).poll();
        return (T) head;
    }

    /**
     * Waits one second, calls {@code complete()} once (which must return {@code false})
     * and asserts that outbox queue 0 is empty.
     *
     * @param streamKafkaP the processor to poll
     * @param testOutbox   the outbox that is expected to stay empty
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void assertNoMoreItems(StreamKafkaP<?, ?, ?> streamKafkaP, TestOutbox testOutbox) throws InterruptedException {
        TimeUnit.SECONDS.sleep(1L);
        boolean done = streamKafkaP.complete();
        Assert.assertFalse(done);
        String failureMessage = "unexpected items in outbox: " + testOutbox.queue(0);
        boolean outboxEmpty = testOutbox.queue(0).isEmpty();
        Assert.assertTrue(failureMessage, outboxEmpty);
    }

    /**
     * Converts a collection of entries keyed by {@link BroadcastKey} into a set of plain
     * entries: each key is replaced by the value of {@code BroadcastKey.key()}, and each
     * {@code long[]} value is replaced by its {@link Arrays#toString(long[])} form. Other
     * values are kept as they are.
     *
     * @param collection the entries to unwrap; every element is cast to {@link Map.Entry}
     * @return a new set holding the unwrapped entries
     */
    private Set<Map.Entry<Object, Object>> unwrapBroadcastKey(Collection<?> collection) {
        Set<Map.Entry<Object, Object>> unwrapped = new HashSet<>();
        for (Object element : collection) {
            Map.Entry<?, ?> wrapped = (Map.Entry<?, ?>) element;
            Object plainKey = ((BroadcastKey) wrapped.getKey()).key();
            Object plainValue;
            if (wrapped.getValue() instanceof long[]) {
                // Arrays have identity-based equals, so the textual form is stored instead.
                plainValue = Arrays.toString((long[]) wrapped.getValue());
            } else {
                plainValue = wrapped.getValue();
            }
            unwrapped.add(Util.entry(plainKey, plainValue));
        }
        return unwrapped;
    }

    /**
     * Has the processor save its snapshot (asserting that {@code saveToSnapshot()} returns
     * {@code true}) and moves the outbox's snapshot queue into a fresh {@link TestInbox}.
     *
     * @param streamKafkaP the processor whose snapshot is taken
     * @param testOutbox   the outbox whose snapshot queue is drained and reset
     * @return an inbox holding the drained snapshot items
     */
    private TestInbox saveSnapshot(StreamKafkaP streamKafkaP, TestOutbox testOutbox) {
        boolean saved = streamKafkaP.saveToSnapshot();
        Assert.assertTrue(saved);

        TestInbox snapshotInbox = new TestInbox();
        testOutbox.drainSnapshotQueueAndReset(snapshotInbox.queue(), false);
        return snapshotInbox;
    }

    /**
     * Builds a new consumer configuration on every call: the broker address comes from
     * {@code kafkaTestSupport}, keys are deserialized as integers, values as strings, and
     * {@code auto.offset.reset} is {@code earliest}.
     *
     * @return a fresh, independent {@link Properties} instance
     */
    public static Properties properties() {
        Properties consumerProps = new Properties();
        String brokers = kafkaTestSupport.getBrokerConnectionString();
        consumerProps.setProperty("bootstrap.servers", brokers);
        String keyDeserializer = IntegerDeserializer.class.getCanonicalName();
        consumerProps.setProperty("key.deserializer", keyDeserializer);
        String valueDeserializer = StringDeserializer.class.getCanonicalName();
        consumerProps.setProperty("value.deserializer", valueDeserializer);
        consumerProps.setProperty("auto.offset.reset", "earliest");
        return consumerProps;
    }

    /**
     * Creates an immutable entry whose key is {@code i} and whose value is the decimal
     * string form of {@code i}.
     *
     * @param i the number used for both key and value
     * @return an immutable {@code (i, "i")} entry
     */
    private static Map.Entry<Integer, String> createEntry(int i) {
        String text = Integer.toString(i);
        return new AbstractMap.SimpleImmutableEntry<>(i, text);
    }

    /**
     * Keeps producing records with key {@code 0} and a fresh UUID string value, addressed
     * to partition {@code INITIAL_PARTITION_COUNT} of {@code topic1Name}, until the
     * returned record metadata reports that partition. Sleeps 250 ms between attempts.
     *
     * @return the entry {@code (0, value)} for the record that landed in that partition
     * @throws Exception if waiting for a produce result fails
     */
    private Map.Entry<Integer, String> produceEventToNewPartition() throws Exception {
        for (;;) {
            String value = UuidUtil.newUnsecureUuidString();
            int landedPartition = kafkaTestSupport
                    .produce(topic1Name, INITIAL_PARTITION_COUNT, null, 0, value)
                    .get()
                    .partition();
            if (landedPartition == INITIAL_PARTITION_COUNT) {
                return Util.entry(0, value);
            }
            sleepMillis(250);
        }
    }

    /**
     * Recreates a serialized lambda of this class from its {@link SerializedLambda} form.
     *
     * <p>The method works in two steps. The first switch maps the implementation method
     * name (via its hash code, confirmed with {@code equals}) to a selector {@code z}.
     * The second switch, for the selected case, compares the implementation method kind,
     * functional interface class/method/signature, implementing class and implementation
     * signature against the expected values and, when all match, returns the
     * corresponding lambda or method reference. One case also reads a captured
     * {@code Properties} argument.
     *
     * <p>NOTE(review): the method is marked synthetic and {@code z} is declared
     * {@code boolean} while being assigned integers, which looks like decompiler output
     * of compiler-generated code rather than hand-written source — confirm before
     * editing or keeping it.
     *
     * @param serializedLambda the serialized form to resolve
     * @return the matching lambda or method reference
     * @throws IllegalArgumentException if no case matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: pick a selector from the implementation method name.
        switch (implMethodName.hashCode()) {
            case -1885904347:
                if (implMethodName.equals("lambda$when_eventsInAllPartitions_then_watermarkOutputImmediately$987e3703$1")) {
                    z = 10;
                    break;
                }
                break;
            case -1833319473:
                if (implMethodName.equals("parseLong")) {
                    z = 12;
                    break;
                }
                break;
            case -1737350792:
                if (implMethodName.equals("lambda$when_partitionAddedWhilePartitionsInitialOffsetsProvided_then_consumedFromBeginning$987e3703$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1577677747:
                if (implMethodName.equals("lambda$when_snapshotSaved_then_offsetsRestored$987e3703$1")) {
                    z = 15;
                    break;
                }
                break;
            case -1491437312:
                if (implMethodName.equals("lambda$when_projectionFunctionProvided_then_appliedToReadRecords$a441ef18$1")) {
                    z = 13;
                    break;
                }
                break;
            case -1048669587:
                if (implMethodName.equals("lambda$when_eventsInSinglePartition_then_watermarkAfterIdleTime$987e3703$1")) {
                    z = 11;
                    break;
                }
                break;
            case -1021604172:
                if (implMethodName.equals("lambda$when_noAssignedPartitionAndAddedLater_then_resumesFromIdle$987e3703$1")) {
                    z = 7;
                    break;
                }
                break;
            case -910379466:
                if (implMethodName.equals("lambda$createProcessor$4fe79c1a$1")) {
                    z = 5;
                    break;
                }
                break;
            case -572517327:
                if (implMethodName.equals("lambda$when_snapshotSaved_then_offsetsRestored$a441ef18$1")) {
                    z = 8;
                    break;
                }
                break;
            case -486687235:
                if (implMethodName.equals("lambda$when_customProjection_then_used$987e3703$1")) {
                    z = false;
                    break;
                }
                break;
            case 111972721:
                if (implMethodName.equals("value")) {
                    z = 16;
                    break;
                }
                break;
            case 369195067:
                if (implMethodName.equals("lambda$when_customProjectionToNull_then_filteredOut$987e3703$1")) {
                    z = 14;
                    break;
                }
                break;
            case 369195068:
                if (implMethodName.equals("lambda$when_customProjectionToNull_then_filteredOut$987e3703$2")) {
                    z = 17;
                    break;
                }
                break;
            case 657423870:
                if (implMethodName.equals("lambda$when_noAssignedPartitions_then_emitIdleMsgImmediately$987e3703$1")) {
                    z = 9;
                    break;
                }
                break;
            case 865164887:
                if (implMethodName.equals("lambda$testSuspendResumeWithPartitionInitialOffsets$bb7f2384$1")) {
                    z = INITIAL_PARTITION_COUNT;
                    break;
                }
                break;
            case 866512356:
                if (implMethodName.equals("lambda$createProcessor$a1f6624d$1")) {
                    z = true;
                    break;
                }
                break;
            case 886698427:
                if (implMethodName.equals("lambda$when_partitionAdded_then_consumedFromBeginning$987e3703$1")) {
                    z = 2;
                    break;
                }
                break;
            case 931841391:
                if (implMethodName.equals("lambda$testWithPartitionsInitialOffsets$b6a2dd1d$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        // Step 2: verify the full serialized signature for the selected case and rebuild the lambda.
        // NOTE(review): most labels below read "case true", so the selector values from
        // step 1 cannot be told apart here — presumably lost in decompilation; confirm.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord -> {
                        return consumerRecord.key() + "=" + ((String) consumerRecord.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)J")) {
                    return obj -> {
                        return obj instanceof Map.Entry ? ((Integer) ((Map.Entry) obj).getKey()).intValue() : System.currentTimeMillis();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord2 -> {
                        return Util.entry((Integer) consumerRecord2.key(), (String) consumerRecord2.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lcom/hazelcast/jet/datamodel/Tuple2;")) {
                    return consumerRecord3 -> {
                        return Tuple2.tuple2((String) consumerRecord3.value(), consumerRecord3.topic());
                    };
                }
                break;
            case INITIAL_PARTITION_COUNT /* 4 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lcom/hazelcast/jet/datamodel/Tuple2;")) {
                    return consumerRecord4 -> {
                        return Tuple2.tuple2((String) consumerRecord4.value(), consumerRecord4.topic());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Properties;Lcom/hazelcast/jet/core/Processor$Context;)Lorg/apache/kafka/clients/consumer/Consumer;")) {
                    // This lambda captures one argument: the consumer Properties.
                    Properties properties = (Properties) serializedLambda.getCapturedArg(0);
                    return context -> {
                        return new KafkaConsumer(properties);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord5 -> {
                        return Util.entry((Integer) consumerRecord5.key(), (String) consumerRecord5.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord6 -> {
                        return Util.entry((Integer) consumerRecord6.key(), (String) consumerRecord6.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord22 -> {
                        return Util.entry((Integer) consumerRecord22.key(), (String) consumerRecord22.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord7 -> {
                        return Util.entry((Integer) consumerRecord7.key(), (String) consumerRecord7.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord8 -> {
                        return Util.entry((Integer) consumerRecord8.key(), (String) consumerRecord8.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord9 -> {
                        return Util.entry((Integer) consumerRecord9.key(), (String) consumerRecord9.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)J")) {
                    return Long::parseLong;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord10 -> {
                        return ((String) consumerRecord10.value()) + "-x";
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;)Lorg/apache/kafka/clients/consumer/Consumer;")) {
                    return context2 -> {
                        return new KafkaConsumer(properties());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/util/Map$Entry;")) {
                    return consumerRecord11 -> {
                        return Util.entry((Integer) consumerRecord11.key(), (String) consumerRecord11.value());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/kafka/clients/consumer/ConsumerRecord") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.value();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/kafka/impl/StreamKafkaPTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/String;")) {
                    return consumerRecord12 -> {
                        if ("0".equals(consumerRecord12.value())) {
                            return null;
                        }
                        return (String) consumerRecord12.value();
                    };
                }
                break;
        }
        // No case matched, or the matched case failed its signature check.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    // Stores the negation of this class's desired assertion status, i.e. the flag is
    // true when assertions are disabled for StreamKafkaPTest.
    // NOTE(review): no declaration of $assertionsDisabled is visible in this part of the
    // file; this looks like compiler-generated code surfaced by a decompiler — confirm.
    static {
        $assertionsDisabled = !StreamKafkaPTest.class.desiredAssertionStatus();
    }
}
