package com.hazelcast.jet.cdc;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.core.JobAssertions;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.postgresql.PostgresConnector;
import java.lang.invoke.SerializedLambda;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/cdc/DebeziumCdcIntegrationTest.class */
public class DebeziumCdcIntegrationTest extends AbstractCdcIntegrationTest {
    // Docker images used by the tests in this class. Each one is declared as a compatible
    // substitute for the image name given to asCompatibleSubstituteFor(...).

    // Debezium example MySQL image, used by the MySQL-based tests.
    private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse("debezium/example-mysql:2.3.0.Final").asCompatibleSubstituteFor("mysql");
    // Debezium example PostgreSQL image, used by the PostgreSQL-based tests.
    private static final DockerImageName POSTGRES_IMAGE = DockerImageName.parse("debezium/example-postgres:2.3.0.Final").asCompatibleSubstituteFor("postgres");
    // MongoDB image, used by noFailWhenBeforeIsNotPresent().
    private static final DockerImageName MONGODB_IMAGE = DockerImageName.parse("mongo:6.0.3").asCompatibleSubstituteFor("mongodb");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/cdc/DebeziumCdcIntegrationTest$Customer.class */
    /**
     * Row of the {@code customers} table as the tests deserialize it from a change record.
     * The JSON property names map the snake_case columns onto the camelCase fields.
     * Equality, hashing and the string form cover all four fields; the string form is what
     * the {@code mysql()} and {@code postgres()} tests compare against.
     */
    public static class Customer {

        @JsonProperty("id")
        public int id;

        @JsonProperty("first_name")
        public String firstName;

        @JsonProperty("last_name")
        public String lastName;

        @JsonProperty("email")
        public String email;

        Customer() {
        }

        @Override
        public int hashCode() {
            return Objects.hash(email, firstName, id, lastName);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (obj.getClass() != getClass()) {
                return false;
            }
            Customer other = (Customer) obj;
            if (id != other.id) {
                return false;
            }
            return Objects.equals(firstName, other.firstName)
                    && Objects.equals(lastName, other.lastName)
                    && Objects.equals(email, other.email);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Customer {id=");
            text.append(id);
            text.append(", firstName=").append(firstName);
            text.append(", lastName=").append(lastName);
            text.append(", email=").append(email);
            return text.append("}").toString();
        }
    }

    /**
     * Streams changes of the {@code inventory.customers} table from a MySQL container into the
     * {@code results} list and checks that the four snapshot records are followed, in order, by
     * the UPDATE, INSERT and DELETE issued by this test.
     *
     * <p>The container, JDBC connection and statement are closed on every path, and the job is
     * cancelled (then awaited to reach {@link JobStatus#FAILED}) whether or not the final
     * assertion passes.
     *
     * @throws Exception if the JDBC interaction with the database fails
     */
    @Test
    public void mysql() throws Exception {
        try (MySQLContainer<?> container = mySqlContainer()) {
            container.start();

            // Expected sink content: snapshot of the four existing rows, then the changes made below.
            List<String> expectedRecords = Arrays.asList(
                    "SYNC:Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}",
                    "SYNC:Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}",
                    "SYNC:Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}",
                    "SYNC:Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}",
                    "UPDATE:Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org}",
                    "INSERT:Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}",
                    "DELETE:Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}");

            StreamSource<ChangeRecord> source = mySqlSource(container);

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(source)
                    .withNativeTimestamps(0L)
                    // The output type of customTransform cannot be inferred from its arguments,
                    // so it is stated explicitly to keep the following lambdas typed.
                    .<ChangeRecord>customTransform("filter_timestamps", filterTimestampsProcessorSupplier())
                    .filter(change -> change.operation() != Operation.UNSPECIFIED)
                    .map(change -> change.operation() + ":" + change.value().toObject(Customer.class))
                    .writeTo(Sinks.list("results"));
            pipeline.setPreserveOrder(true);

            HazelcastInstance hz = createHazelcastInstances(2)[0];
            Job job = hz.getJet().newJob(pipeline);

            // Wait for the four snapshot records before modifying the table.
            assertEqualsEventually(() -> hz.getList("results").size(), 4);

            try (Connection connection = MySQLTestUtils.getMySqlConnection(
                         container.withDatabaseName("inventory").getJdbcUrl(),
                         container.getUsername(),
                         container.getPassword());
                 Statement statement = connection.createStatement()) {
                statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
                statement.addBatch("INSERT INTO customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org')");
                statement.addBatch("DELETE FROM customers WHERE id=1005");
                statement.executeBatch();
            }

            try {
                assertEqualsEventually(
                        () -> String.join("\n", hz.<String>getList("results")),
                        String.join("\n", expectedRecords));
            } finally {
                job.cancel();
                JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
            }
        }
    }

    /**
     * Builds a Debezium MySQL CDC source named {@code "mysql"} that connects to the given
     * container as user {@code debezium} and is restricted to {@code inventory.customers},
     * with {@code include.schema.changes} enabled.
     *
     * <p>NOTE(review): the {@code *.whitelist} property names were renamed to
     * {@code *.include.list} in newer Debezium releases; confirm they match the connector
     * version on the classpath.
     *
     * @param mySQLContainer container supplying the host and mapped MySQL port
     * @return the configured change-record source
     */
    @Nonnull
    private StreamSource<ChangeRecord> mySqlSource(MySQLContainer<?> mySQLContainer) {
        return DebeziumCdcSources.debezium("mysql", MySqlConnector.class)
                .setProperty("include.schema.changes", "true")
                .setProperty("database.hostname", mySQLContainer.getHost())
                .setProperty("database.port",
                        Integer.toString(mySQLContainer.getMappedPort(MySQLContainer.MYSQL_PORT)))
                .setProperty("database.user", "debezium")
                .setProperty("database.password", "dbz")
                .setProperty("database.server.id", "184054")
                .setProperty("database.server.name", "dbserver1")
                .setProperty("database.whitelist", "inventory")
                .setProperty("table.whitelist", "inventory.customers")
                .build();
    }

    /**
     * Streams the snapshot of {@code inventory.customers} from a MySQL container as JSON
     * key/value entries into the {@code results} map and checks that the sorted entries match
     * the expected regular expressions.
     *
     * <p>The container is closed on every path, and the job is cancelled (then awaited to reach
     * {@link JobStatus#FAILED}) whether or not the assertion passes.
     */
    @Test
    public void mysql_simpleJson() {
        try (MySQLContainer<?> container = mySqlContainer()) {
            container.start();

            // One regex per snapshot record; version and timestamps vary between runs.
            List<String> expectedRecords = Arrays.asList(
                    "\\{\"id\":1001}:\\{\"before\":null,\"after\":\\{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"inventory\",\"sequence\":null,\"table\":\"customers\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\",\"pos\":157,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1002}:\\{\"before\":null,\"after\":\\{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"inventory\",\"sequence\":null,\"table\":\"customers\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\",\"pos\":157,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1003}:\\{\"before\":null,\"after\":\\{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"inventory\",\"sequence\":null,\"table\":\"customers\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\",\"pos\":157,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1004}:\\{\"before\":null,\"after\":\\{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"mysql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"last\",\"db\":\"inventory\",\"sequence\":null,\"table\":\"customers\",\"server_id\":0,\"gtid\":null,\"file\":\"mysql-bin.000003\",\"pos\":157,\"row\":0,\"thread\":null,\"query\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}");

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(DebeziumCdcSources.debeziumJson("mysql", MySqlConnector.class)
                            .setProperty("include.schema.changes", "false")
                            .setProperty("database.hostname", container.getHost())
                            .setProperty("database.port",
                                    Integer.toString(container.getMappedPort(MySQLContainer.MYSQL_PORT)))
                            .setProperty("database.user", "debezium")
                            .setProperty("database.password", "dbz")
                            .setProperty("database.server.id", "184054")
                            .setProperty("database.server.name", "dbserver1")
                            .setProperty("database.whitelist", "inventory")
                            .setProperty("table.whitelist", "inventory.customers")
                            .build())
                    .withNativeTimestamps(0L)
                    .writeTo(Sinks.map("results"));

            HazelcastInstance hz = createHazelcastInstances(2)[0];
            Job job = hz.getJet().newJob(pipeline);

            try {
                assertTrueEventually(() ->
                        assertMatch(expectedRecords, mapResultsToSortedList(hz.getMap("results"))));
            } finally {
                job.cancel();
                JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
            }
        }
    }

    /**
     * Creates a MySQL container from {@code MYSQL_IMAGE} with the {@code mysqluser} /
     * {@code mysqlpw} credentials and passes it through {@code namedTestContainer}.
     * The container is returned without being started.
     */
    private MySQLContainer<?> mySqlContainer() {
        MySQLContainer configured = new MySQLContainer(MYSQL_IMAGE)
                .withUsername("mysqluser")
                .withPassword("mysqlpw");
        return (MySQLContainer) namedTestContainer(configured);
    }

    /**
     * Streams changes of the {@code inventory.customers} table from a PostgreSQL container into
     * the {@code results} list and checks that the four snapshot records are followed, in
     * order, by the UPDATE, INSERT and DELETE issued by this test.
     *
     * <p>The container, JDBC connection and statement are closed on every path, and the job is
     * cancelled (then awaited to reach {@link JobStatus#FAILED}) whether or not the final
     * assertion passes.
     *
     * @throws Exception if the JDBC interaction with the database fails
     */
    @Test
    public void postgres() throws Exception {
        try (PostgreSQLContainer<?> container = postgresContainer()) {
            container.start();

            // Expected sink content: snapshot of the four existing rows, then the changes made below.
            List<String> expectedRecords = Arrays.asList(
                    "SYNC:Customer {id=1001, firstName=Sally, lastName=Thomas, email=sally.thomas@acme.com}",
                    "SYNC:Customer {id=1002, firstName=George, lastName=Bailey, email=gbailey@foobar.com}",
                    "SYNC:Customer {id=1003, firstName=Edward, lastName=Walker, email=ed@walker.com}",
                    "SYNC:Customer {id=1004, firstName=Anne, lastName=Kretchmar, email=annek@noanswer.org}",
                    "UPDATE:Customer {id=1004, firstName=Anne Marie, lastName=Kretchmar, email=annek@noanswer.org}",
                    "INSERT:Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}",
                    "DELETE:Customer {id=1005, firstName=Jason, lastName=Bourne, email=jason@bourne.org}");

            StreamSource<ChangeRecord> source = DebeziumCdcSources
                    .debezium("postgres", "io.debezium.connector.postgresql.PostgresConnector")
                    .setProperty("database.server.name", "dbserver1")
                    .setProperty("database.hostname", container.getHost())
                    .setProperty("database.port",
                            Integer.toString(container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)))
                    .setProperty("database.user", "postgres")
                    .setProperty("database.password", "postgres")
                    .setProperty("database.dbname", "postgres")
                    .setProperty("table.whitelist", "inventory.customers")
                    .setProperty("heartbeat.interval.ms", "1000")
                    .build();

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(source)
                    .withNativeTimestamps(0L)
                    .filter(change -> change.operation() != Operation.UNSPECIFIED)
                    // The output type of customTransform cannot be inferred from its arguments,
                    // so it is stated explicitly to keep the following lambda typed.
                    .<ChangeRecord>customTransform("filter_timestamps", filterTimestampsProcessorSupplier())
                    .map(change -> change.operation() + ":" + change.value().toObject(Customer.class))
                    .setLocalParallelism(1)
                    .writeTo(Sinks.list("results"));
            pipeline.setPreserveOrder(true);

            HazelcastInstance hz = createHazelcastInstances(2)[0];
            Job job = hz.getJet().newJob(pipeline);

            // Wait for the four snapshot records before modifying the table.
            assertEqualsEventually(() -> hz.getList("results").size(), 4);

            try (Connection connection = PostgresTestUtils.getPostgreSqlConnection(
                    container.getJdbcUrl(), container.getUsername(), container.getPassword())) {
                connection.setSchema("inventory");
                try (Statement statement = connection.createStatement()) {
                    statement.addBatch("UPDATE customers SET first_name='Anne Marie' WHERE id=1004");
                    statement.addBatch("INSERT INTO customers VALUES (1005, 'Jason', 'Bourne', 'jason@bourne.org')");
                    statement.addBatch("DELETE FROM customers WHERE id=1005");
                    statement.executeBatch();
                }
            }

            try {
                assertEqualsEventually(
                        () -> String.join("\n", hz.<String>getList("results")),
                        String.join("\n", expectedRecords));
            } finally {
                job.cancel();
                JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
            }
        }
    }

    /**
     * Streams the snapshot of {@code inventory.customers} from a PostgreSQL container as JSON
     * key/value entries into the {@code results} map and checks that the sorted entries match
     * the expected regular expressions.
     *
     * <p>The container is closed on every path, and the job is cancelled (then awaited to reach
     * {@link JobStatus#FAILED}) exactly once, whether or not the assertion passes.
     */
    @Test
    public void postgres_simpleJson() {
        try (PostgreSQLContainer<?> container = postgresContainer()) {
            container.start();

            // One regex per snapshot record; version, timestamps, sequence, txId and lsn vary between runs.
            List<String> expectedRecords = Arrays.asList(
                    "\\{\"id\":1001}:\\{\"before\":null,\"after\":\\{\"id\":1001,\"first_name\":\"Sally\",\"last_name\":\"Thomas\",\"email\":\"sally.thomas@acme.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"postgresql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"postgres\",\"sequence\":\"\\[.*]\",\"schema\":\"inventory\",\"table\":\"customers\",\"txId\":[0-9]*,\"lsn\":[0-9]*,\"xmin\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1002}:\\{\"before\":null,\"after\":\\{\"id\":1002,\"first_name\":\"George\",\"last_name\":\"Bailey\",\"email\":\"gbailey@foobar.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"postgresql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"postgres\",\"sequence\":\"\\[.*]\",\"schema\":\"inventory\",\"table\":\"customers\",\"txId\":[0-9]*,\"lsn\":[0-9]*,\"xmin\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1003}:\\{\"before\":null,\"after\":\\{\"id\":1003,\"first_name\":\"Edward\",\"last_name\":\"Walker\",\"email\":\"ed@walker.com\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"postgresql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"true\",\"db\":\"postgres\",\"sequence\":\"\\[.*]\",\"schema\":\"inventory\",\"table\":\"customers\",\"txId\":[0-9]*,\"lsn\":[0-9]*,\"xmin\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}",
                    "\\{\"id\":1004}:\\{\"before\":null,\"after\":\\{\"id\":1004,\"first_name\":\"Anne\",\"last_name\":\"Kretchmar\",\"email\":\"annek@noanswer.org\"},\"source\":\\{\"version\":\"[\\w\\d\\.]*\",\"connector\":\"postgresql\",\"name\":\"dbserver1\",\"ts_ms\":[0-9]*,\"snapshot\":\"last\",\"db\":\"postgres\",\"sequence\":\"\\[.*]\",\"schema\":\"inventory\",\"table\":\"customers\",\"txId\":[0-9]*,\"lsn\":[0-9]*,\"xmin\":null},\"op\":\"r\",\"ts_ms\":[0-9]*,\"transaction\":null}");

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(DebeziumCdcSources
                            .debeziumJson("postgres", "io.debezium.connector.postgresql.PostgresConnector")
                            .setProperty("database.server.name", "dbserver1")
                            .setProperty("database.hostname", container.getHost())
                            .setProperty("database.port",
                                    Integer.toString(container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)))
                            .setProperty("database.user", "postgres")
                            .setProperty("database.password", "postgres")
                            .setProperty("database.dbname", "postgres")
                            .setProperty("table.whitelist", "inventory.customers")
                            .build())
                    .withNativeTimestamps(0L)
                    .writeTo(Sinks.map("results"));

            HazelcastInstance hz = createHazelcastInstances(2)[0];
            Job job = hz.getJet().newJob(pipeline);

            try {
                assertTrueEventually(() ->
                        assertMatch(expectedRecords, mapResultsToSortedList(hz.getMap("results"))));
            } finally {
                job.cancel();
                JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.FAILED);
            }
        }
    }

    /**
     * Creates a PostgreSQL container from {@code POSTGRES_IMAGE} with database, user and
     * password all set to {@code postgres}, and passes it through {@code namedTestContainer}.
     * The container is returned without being started.
     */
    private PostgreSQLContainer<?> postgresContainer() {
        PostgreSQLContainer configured = new PostgreSQLContainer(POSTGRES_IMAGE)
                .withDatabaseName("postgres")
                .withUsername("postgres")
                .withPassword("postgres");
        return (PostgreSQLContainer) namedTestContainer(configured);
    }

    /**
     * Submits a job whose Debezium source names a connector class that does not exist and
     * expects {@code join()} to fail with a {@link JetException} root cause whose stack trace
     * mentions the missing class.
     */
    @Test
    public void invalidConnectorClass() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(DebeziumCdcSources
                        .debeziumJson("connector", "io.debezium.connector.xxx.BlaBlaBla")
                        .build())
                .withNativeTimestamps(0L)
                .writeTo(Sinks.noop());

        HazelcastInstance[] members = createHazelcastInstances(2);
        Job job = members[0].getJet().newJob(pipeline);

        Assertions.assertThatThrownBy(job::join)
                .hasRootCauseInstanceOf(JetException.class)
                .hasStackTraceContaining("connector class io.debezium.connector.xxx.BlaBlaBla not found");
    }

    /**
     * Runs a pipeline that groups MySQL change records by the record itself and keeps a
     * per-key running count, then waits until at least one count reaches the
     * {@code notFailWhenOldValueNotPresent} list. The container is closed on every path.
     */
    @Test
    public void notFailWhenOldValueNotPresent() {
        try (MySQLContainer<?> container = mySqlContainer()) {
            container.start();

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(mySqlSource(container))
                    .withNativeTimestamps(1L)
                    .setLocalParallelism(1)
                    // The change record itself is the grouping key.
                    .groupingKey(change -> change)
                    .mapStateful(LongAccumulator::new, (counter, key, change) -> {
                        counter.add(1L);
                        return counter.get();
                    })
                    .peek()
                    .writeTo(Sinks.list("notFailWhenOldValueNotPresent"));

            HazelcastInstance hz = createHazelcastInstances(1)[0];
            hz.getJet().newJob(pipeline);

            assertTrueEventually(() ->
                    Assertions.assertThat(hz.getList("notFailWhenOldValueNotPresent")).isNotEmpty());
        }
    }

    /**
     * Creates a table without a primary key in the {@code inventory} schema of a PostgreSQL
     * container, inserts one row, then streams that table and waits until at least one record
     * reaches the {@code no_pk} list.
     *
     * <p>The container, JDBC connection and statement are closed on every path.
     *
     * @throws Exception if the JDBC interaction with the database fails
     */
    @Test
    public void noFailWhenNoPrimaryKey() throws Exception {
        try (PostgreSQLContainer<?> container = postgresContainer()) {
            container.start();

            try (Connection connection = PostgresTestUtils.getPostgreSqlConnection(
                    container.getJdbcUrl(), container.getUsername(), container.getPassword())) {
                connection.setSchema("inventory");
                try (Statement statement = connection.createStatement()) {
                    statement.addBatch("CREATE TABLE NO_PK (SOME_INT INT);");
                    statement.addBatch("INSERT INTO NO_PK VALUES (1)");
                    statement.executeBatch();
                }
            }

            StreamSource<ChangeRecord> source = DebeziumCdcSources
                    .debezium("postgres", PostgresConnector.class)
                    .setProperty("database.server.name", "dbserver1")
                    .setProperty("database.hostname", container.getHost())
                    .setProperty("database.port",
                            Integer.toString(container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)))
                    .setProperty("database.user", "postgres")
                    .setProperty("database.password", "postgres")
                    .setProperty("database.dbname", "postgres")
                    .setProperty("table.whitelist", "inventory.no_pk")
                    .build();

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(source)
                    .withNativeTimestamps(1L)
                    .writeTo(Sinks.list("no_pk"));

            HazelcastInstance hz = createHazelcastInstances(1)[0];
            hz.getJet().newJob(pipeline);

            assertTrueEventually(() -> Assertions.assertThat(hz.getList("no_pk")).isNotEmpty());
        }
    }

    /**
     * Streams MySQL change records into the {@code nullIsNotValidOperationId} list, waits for
     * the job to be running, and then waits until the list holds at least one record, logging
     * the list size and the first record. The container is closed on every path.
     */
    @Test
    public void nullIsNotValidOperationId() {
        try (MySQLContainer<?> container = mySqlContainer()) {
            container.start();

            HazelcastInstance hz = createHazelcastInstance();
            IList<ChangeRecord> sinkList = hz.getList("nullIsNotValidOperationId");
            StreamSource<ChangeRecord> source = mySqlSource(container);

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(source)
                    .withIngestionTimestamps()
                    .setLocalParallelism(1)
                    .writeTo(Sinks.list(sinkList));

            Job job = hz.getJet().newJob(pipeline);
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);

            assertTrueEventually(() -> {
                logger.info(String.format("List size: %s", sinkList.size()));
                Assertions.assertThat(sinkList).as("nullIsNotValidOperationId").isNotEmpty();
                logger.info(sinkList.get(0).toString());
            });
        }
    }

    /**
     * Streams MySQL change records into the {@code schemaChangesArePassed} list, creates a new
     * table once the job is running, and waits until some record's JSON value contains both
     * {@code CREATE TABLE} and the new table's name.
     *
     * <p>The table is dropped exactly once in a {@code finally} block, and the container is
     * closed on every path.
     */
    @Test
    public void schemaChangesArePassed() {
        try (MySQLContainer<?> container = mySqlContainer()) {
            container.start();

            HazelcastInstance hz = createHazelcastInstance();
            IList<ChangeRecord> sinkList = hz.getList("schemaChangesArePassed");
            StreamSource<ChangeRecord> source = mySqlSource(container);

            Pipeline pipeline = Pipeline.create();
            pipeline.readFrom(source)
                    .withIngestionTimestamps()
                    .setLocalParallelism(1)
                    .writeTo(Sinks.list(sinkList));

            Job job = hz.getJet().newJob(pipeline);
            JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);

            MySQLTestUtils.runQuery(container, "CREATE TABLE `inventory`.`tableForSchemaChangesArePassed` (id INT)");
            try {
                assertTrueEventually(() -> {
                    logger.info(String.format("List size: %s", sinkList.size()));
                    Assertions.assertThat(sinkList).as("schemaChangesArePassed").isNotEmpty();
                    Assertions.assertThat(sinkList).anyMatch(change -> {
                        String json = change.value().toJson();
                        return json.contains("CREATE TABLE") && json.contains("tableForSchemaChangesArePassed");
                    });
                });
            } finally {
                MySQLTestUtils.runQuery(container, "DROP TABLE IF EXISTS inventory.tableForSchemaChangesArePassed");
            }
        }
    }

    /**
     * Inserts a document into a MongoDB container before and after starting a Debezium MongoDB
     * CDC job, and waits until at least one change record reaches the
     * {@code noFailWhenBeforeIsNotPresent} list.
     *
     * <p>The container and the Mongo client are closed on every path.
     */
    @Test
    public void noFailWhenBeforeIsNotPresent() {
        try (MongoDBContainer container = new MongoDBContainer(MONGODB_IMAGE)
                .withExposedPorts(27017)
                .withNetworkAliases("mongo")) {
            container.start();

            try (MongoClient mongoClient = MongoClients.create(container.getConnectionString())) {
                HazelcastInstance hz = createHazelcastInstance();
                IList<ChangeRecord> sinkList = hz.getList("noFailWhenBeforeIsNotPresent");

                // First document is written before the source is built.
                mongoClient.getDatabase("test").getCollection("test").insertOne(new Document("test", "test"));

                StreamSource<ChangeRecord> source = DebeziumCdcSources.debezium("mongo", MongoDbConnector.class)
                        .setProperty("mongodb.hosts", container.getHost() + ":" + container.getMappedPort(27017))
                        .setProperty("mongodb.members.auto.discover", "false")
                        .setProperty("mongodb.name", "test")
                        .setProperty("topic.prefix", "customer")
                        .setProperty("snapshot.mode", "initial")
                        .setProperty("connect.keep.alive", "true")
                        .setProperty("connect.keep.alive.interval.ms", "1000")
                        .setProperty("capture.mode", "change_streams_update_full")
                        .build();

                Pipeline pipeline = Pipeline.create();
                pipeline.readFrom(source)
                        .withIngestionTimestamps()
                        .setLocalParallelism(1)
                        .writeTo(Sinks.list(sinkList));

                Job job = hz.getJet().newJob(pipeline);
                JobAssertions.assertThat(job).eventuallyHasStatus(JobStatus.RUNNING);

                // Second document is written while the job is running.
                mongoClient.getDatabase("test").getCollection("test").insertOne(new Document("test", "test"));

                assertTrueEventually(() ->
                        Assertions.assertThat(sinkList).as("Should receive record without exception").isNotEmpty());
            }
        }
    }

    /**
     * Recreates one of this class's serializable lambdas from its {@link SerializedLambda} descriptor.
     *
     * <p>The lambda is selected by its implementation method name; it is only returned when the
     * remaining descriptor fields (implementation method kind, functional interface class, method
     * name and signature, implementing class and implementation signature) also match.
     *
     * <p>The decompiled original dispatched through a {@code boolean} selector that was assigned
     * integer values and switched over duplicate {@code case true:} labels, which is not valid Java.
     * Switching on the method name directly performs the same dispatch (hash code plus
     * {@code equals}) without that intermediate selector.
     *
     * <p>NOTE(review): the returned lambda expressions carry no explicit functional-interface target
     * type, exactly as emitted by the decompiler; confirm the intended casts before relying on
     * this method compiling from source.
     *
     * @param serializedLambda descriptor of the lambda being deserialized
     * @return the recreated lambda or constructor reference
     * @throws IllegalArgumentException if the descriptor does not match any known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        final String testClass = "com/hazelcast/jet/cdc/DebeziumCdcIntegrationTest";
        final String functionEx = "com/hazelcast/function/FunctionEx";
        final String predicateEx = "com/hazelcast/function/PredicateEx";
        final String objectToObject = "(Ljava/lang/Object;)Ljava/lang/Object;";
        final String objectToBoolean = "(Ljava/lang/Object;)Z";
        final String recordToString = "(Lcom/hazelcast/jet/cdc/ChangeRecord;)Ljava/lang/String;";
        final String recordToBoolean = "(Lcom/hazelcast/jet/cdc/ChangeRecord;)Z";

        // Method kinds 6 and 8 are the java.lang.invoke.MethodHandleInfo reference kinds
        // REF_invokeStatic and REF_newInvokeSpecial respectively.
        switch (serializedLambda.getImplMethodName()) {
            case "lambda$notFailWhenOldValueNotPresent$a441ef18$1":
                if (lambdaMatches(serializedLambda, 6, functionEx, "applyEx", objectToObject, testClass,
                        "(Lcom/hazelcast/jet/cdc/ChangeRecord;)Lcom/hazelcast/jet/cdc/ChangeRecord;")) {
                    return changeRecord -> changeRecord;
                }
                break;
            case "lambda$postgres$a45a00b3$1":
                if (lambdaMatches(serializedLambda, 6, predicateEx, "testEx", objectToBoolean, testClass,
                        recordToBoolean)) {
                    return changeRecord -> changeRecord.operation() != Operation.UNSPECIFIED;
                }
                break;
            case "lambda$postgres$a441ef18$1":
                if (lambdaMatches(serializedLambda, 6, functionEx, "applyEx", objectToObject, testClass,
                        recordToString)) {
                    return changeRecord -> changeRecord.operation() + ":"
                            + ((Customer) changeRecord.value().toObject(Customer.class));
                }
                break;
            case "lambda$notFailWhenOldValueNotPresent$a7a92ce5$1":
                if (lambdaMatches(serializedLambda, 6, "com/hazelcast/jet/function/TriFunction", "applyEx",
                        "(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;", testClass,
                        "(Lcom/hazelcast/jet/accumulator/LongAccumulator;Lcom/hazelcast/jet/cdc/ChangeRecord;Lcom/hazelcast/jet/cdc/ChangeRecord;)Ljava/lang/Long;")) {
                    return (longAccumulator, firstRecord, secondRecord) -> {
                        longAccumulator.add(1L);
                        return Long.valueOf(longAccumulator.get());
                    };
                }
                break;
            case "lambda$mysql$a441ef18$1":
                if (lambdaMatches(serializedLambda, 6, functionEx, "applyEx", objectToObject, testClass,
                        recordToString)) {
                    return changeRecord -> changeRecord.operation() + ":"
                            + ((Customer) changeRecord.value().toObject(Customer.class));
                }
                break;
            case "lambda$mysql$a45a00b3$1":
                if (lambdaMatches(serializedLambda, 6, predicateEx, "testEx", objectToBoolean, testClass,
                        recordToBoolean)) {
                    return changeRecord -> changeRecord.operation() != Operation.UNSPECIFIED;
                }
                break;
            case "<init>":
                if (lambdaMatches(serializedLambda, 8, "com/hazelcast/function/SupplierEx", "getEx",
                        "()Ljava/lang/Object;", "com/hazelcast/jet/accumulator/LongAccumulator", "()V")) {
                    return LongAccumulator::new;
                }
                break;
            default:
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    /** Returns whether every descriptor field of {@code lambda} equals the corresponding expected value. */
    private static boolean lambdaMatches(SerializedLambda lambda, int implMethodKind,
            String functionalInterfaceClass, String functionalInterfaceMethodName,
            String functionalInterfaceMethodSignature, String implClass, String implMethodSignature) {
        return lambda.getImplMethodKind() == implMethodKind
                && lambda.getFunctionalInterfaceClass().equals(functionalInterfaceClass)
                && lambda.getFunctionalInterfaceMethodName().equals(functionalInterfaceMethodName)
                && lambda.getFunctionalInterfaceMethodSignature().equals(functionalInterfaceMethodSignature)
                && lambda.getImplClass().equals(implClass)
                && lambda.getImplMethodSignature().equals(implMethodSignature);
    }
}
