package com.hazelcast.jet.kafka.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.kafka.KafkaProcessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/kafka/impl/StreamKafkaP_StandaloneKafkaTest.class */
public class StreamKafkaP_StandaloneKafkaTest extends JetTestSupport {
    @Test
    public void when_cancelledAfterBrokerDown_then_cancelsPromptly() throws IOException {
        KafkaTestSupport create = KafkaTestSupport.create();
        create.createKafkaCluster();
        create.createTopic("topic", 1);
        DAG dag = new DAG();
        dag.newVertex("src", KafkaProcessors.streamKafkaP(getProperties(create.getBrokerConnectionString()), FunctionEx.identity(), EventTimePolicy.noEventTime(), new String[]{"topic"})).localParallelism(1);
        Job newJob = createHazelcastInstance().getJet().newJob(dag);
        JobAssertions.assertThat(newJob).eventuallyHasStatus(JobStatus.RUNNING);
        sleepSeconds(1);
        create.shutdownKafkaCluster();
        sleepSeconds(3);
        long nanoTime = System.nanoTime();
        newJob.cancel();
        try {
            newJob.join();
        } catch (CancellationException e) {
        }
        long seconds = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nanoTime);
        Assert.assertTrue("durationSeconds=" + seconds, seconds < 10);
        this.logger.info("Job cancelled in " + seconds + " seconds");
    }

    public static Properties getProperties(String str) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", str);
        properties.setProperty("key.deserializer", IntegerDeserializer.class.getCanonicalName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }
}
