package io.debezium.server.nats.streaming;

import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(NatsStreamingTestResourceLifecycleManager.class)})
/* loaded from: input_file:io/debezium/server/nats/streaming/NatsStreamingIT.class */
public class NatsStreamingIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String SUBJECT_NAME = "testc.inventory.customers";
    private static final String CLUSTER_ID = "debezium";
    private static final String CLIENT_ID = "debezium-test";
    protected static StreamingConnection sc;
    protected static Subscription subscription;
    private static final List<Message> messages;

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) {
        Testing.Print.enable();
        try {
            sc = NatsStreaming.connect(CLUSTER_ID, CLIENT_ID, new Options.Builder().natsUrl(NatsStreamingTestResourceLifecycleManager.getNatsStreamingContainerUrl()).build());
        } catch (Exception e) {
            Testing.print("Could not connect to NATS Streaming");
        }
        try {
            subscription = sc.subscribe(SUBJECT_NAME, new MessageHandler() { // from class: io.debezium.server.nats.streaming.NatsStreamingIT.1
                public void onMessage(Message message) {
                    NatsStreamingIT.messages.add(message);
                }
            }, new SubscriptionOptions.Builder().deliverAllAvailable().build());
        } catch (Exception e2) {
            Testing.print("Could not register message handler");
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw ((Exception) connectorCompletedEvent.getError().get());
        }
    }

    @AfterAll
    static void stop() throws Exception {
        if (subscription != null) {
            subscription.unsubscribe();
        }
        sc.close();
    }

    @Test
    public void testNatsStreaming() {
        Awaitility.await().atMost(Duration.ofSeconds(NatsStreamingTestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
    }

    static {
        Testing.Files.delete(NatsStreamingTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(NatsStreamingTestConfigSource.OFFSET_STORE_PATH);
        messages = Collections.synchronizedList(new ArrayList());
    }
}
