package io.debezium.connector.mongodb;

import com.mongodb.DBRef;
import com.mongodb.client.ClientSession;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneOptions;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbConnectorIT.class */
public class MongoDbConnectorIT extends AbstractConnectorTest {
    // Connector configuration assembled by the test that is currently running.
    private Configuration config;
    // Task context created by tests that talk to MongoDB; afterEach() shuts down
    // its connection context whenever this is non-null.
    private MongoDbTaskContext context;

    /**
     * Prepares every test: silences the test-side print/debug output, stops any
     * connector that is still running and re-initializes the connector test framework.
     */
    @Before
    public void beforeEach() {
        // Keep test output quiet unless a test explicitly re-enables it.
        Testing.Print.disable();
        Testing.Debug.disable();

        // Start from a clean slate: no running connector, fresh framework state.
        stopConnector();
        initializeConnectorTestFramework();
    }

    /**
     * Cleans up after every test: stops the connector and, even when stopping fails,
     * shuts down the connection context of the task context created by the test (if any).
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        } finally {
            final MongoDbTaskContext taskContext = this.context;
            if (taskContext != null) {
                taskContext.getConnectionContext().shutdown();
            }
        }
    }

    /**
     * Starting the connector with a configuration that only sets the auto-discover flag
     * must fail: the completion callback reports failure with an error, and the connector
     * is not running afterwards.
     */
    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        final Configuration invalid = Configuration.create()
                .with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, "true")
                .build();
        this.config = invalid;

        // Prepare whoever reads the log for the noise that follows.
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");

        start(MongoDbConnector.class, invalid, (success, message, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(error).isNotNull();
        });
        assertConnectorNotRunning();
    }

    /**
     * Validating an empty configuration must report exactly one error for each of the
     * hosts and logical-name fields, and no errors for the remaining checked fields.
     */
    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        final Map<String, String> emptyProperties = Configuration.create().build().asMap();
        final Config validate = new MongoDbConnector().validate(emptyProperties);

        // The two fields that are expected to be flagged when nothing is configured.
        assertConfigurationErrors(validate, MongoDbConnectorConfig.HOSTS, 1);
        assertConfigurationErrors(validate, MongoDbConnectorConfig.LOGICAL_NAME, 1);

        // Every other checked field must come back without error messages.
        final Field[] fieldsWithoutErrors = {
                MongoDbConnectorConfig.USER,
                MongoDbConnectorConfig.PASSWORD,
                MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS,
                MongoDbConnectorConfig.DATABASE_WHITELIST,
                MongoDbConnectorConfig.DATABASE_INCLUDE_LIST,
                MongoDbConnectorConfig.DATABASE_BLACKLIST,
                MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST,
                MongoDbConnectorConfig.COLLECTION_WHITELIST,
                MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST,
                MongoDbConnectorConfig.COLLECTION_BLACKLIST,
                MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST,
                MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS,
                MongoDbConnectorConfig.MAX_QUEUE_SIZE,
                MongoDbConnectorConfig.MAX_BATCH_SIZE,
                MongoDbConnectorConfig.POLL_INTERVAL_MS,
                MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS,
                MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS,
                MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS,
                MongoDbConnectorConfig.SSL_ENABLED,
                MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES,
                CommonConnectorConfig.TOMBSTONES_ON_DELETE
        };
        for (Field checked : fieldsWithoutErrors) {
            assertNoConfigurationErrors(validate, new Field[]{checked});
        }
    }

    /** A field exclude list holding only a database name must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldExcludeListDatabasePartIsOnlyProvided() {
        final String databaseOnly = "inventory";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, databaseOnly, expectedErrors);
    }

    /** A field exclude list with database and collection but no field part must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldExcludeListDatabaseAndCollectionPartIsOnlyProvided() {
        final String databaseAndCollectionOnly = "inventory.collectionA";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, databaseAndCollectionOnly, expectedErrors);
    }

    /** A field exclude list entry that starts with the field part (".name") must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldExcludeListDatabaseAndCollectionPartsAreMissing() {
        final String fieldOnly = ".name";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, fieldOnly, expectedErrors);
    }

    /** A field exclude list entry ending in a dangling dot (no field name) must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldExcludeListFieldPartIsMissing() {
        final String missingFieldName = "db1.collectionA.";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, missingFieldName, expectedErrors);
    }

    /** A field exclude list entry with a leading space must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldExcludeListHasLeadingWhiteSpaces() {
        final String leadingSpace = " *.collectionA.name";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, leadingSpace, expectedErrors);
    }

    /** A two-entry field exclude list with trailing spaces around the entries must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldExcludeListHasWhiteSpaces() {
        final String entriesWithSpaces = "db1.collectionA.name ,db2.collectionB.house ";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, entriesWithSpaces, expectedErrors);
    }

    /** A fully qualified database.collection.field entry must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldExcludeListIsValid() {
        final String fullyQualified = "db1.collectionA.name1";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_EXCLUDE_LIST, fullyQualified, expectedErrors);
    }

    /** A field rename entry that starts with the field part (".name=new_name") must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldRenamesDatabaseAndCollectionPartsAreMissing() {
        final String fieldOnlyRename = ".name=new_name";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, fieldOnlyRename, expectedErrors);
    }

    /** A field rename entry ending in a dangling dot, with no mapping at all, must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldRenamesReplacementPartIsMissing() {
        final String noReplacement = "db1.collectionA.";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, noReplacement, expectedErrors);
    }

    /** A field rename entry whose old and new names are run together without a separator must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldRenamesReplacementPartSeparatorIsMissing() {
        final String noSeparator = "db1.collectionA.namenew_name";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, noSeparator, expectedErrors);
    }

    /** A field rename entry with an empty field name before the new name must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldRenamesRenameMappingKeyIsMissing() {
        final String missingKey = "db1.collectionA.=new_name";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, missingKey, expectedErrors);
    }

    /** A field rename entry with nothing after the field name's "=" must produce one validation error. */
    @Test
    public void shouldThrowExceptionWhenFieldRenamesRenameMappingValueIsMissing() {
        final String missingValue = "db1.collectionA.name=";
        final int expectedErrors = 1;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, missingValue, expectedErrors);
    }

    /** A field rename entry with a leading space must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldRenamesHasLeadingWhiteSpaces() {
        final String leadingSpace = " db1.collectionA.name:newname";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, leadingSpace, expectedErrors);
    }

    /** A two-entry field rename list with spaces around the entries must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldRenamesHasWhiteSpaces() {
        final String entriesWithSpaces = "*.collectionA.name:new_name, db2.collectionB.house:new_house ";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, entriesWithSpaces, expectedErrors);
    }

    /** A fully qualified "database.collection.field:newName" rename entry must validate without errors. */
    @Test
    public void shouldNotThrowExceptionWhenFieldRenamesIsValid() {
        final String fullyQualifiedRename = "db1.collectionA.name1:new_name1";
        final int expectedErrors = 0;
        shouldValidateFilterFieldConfiguration(MongoDbConnectorConfig.FIELD_RENAMES, fullyQualifiedRename, expectedErrors);
    }

    /**
     * Shared helper for the filter-field tests: applies {@code str} as the value of
     * {@code field} on top of the base test configuration, validates it and checks
     * the number of errors reported for that field.
     *
     * @param field the configuration field under test
     * @param str   the value assigned to the field
     * @param i     the expected number of errors; {@code 0} means the field must have none
     */
    public void shouldValidateFilterFieldConfiguration(Field field, String str, int i) {
        final Configuration candidate = TestHelper.getConfiguration().edit().with(field, str).build();
        this.config = candidate;
        final Config validate = new MongoDbConnector().validate(candidate.asMap());

        if (i != 0) {
            assertConfigurationErrors(validate, field, i);
            return;
        }
        assertNoConfigurationErrors(validate, new Field[]{field});
    }

    /**
     * Validating the base test configuration (with a couple of collections populated)
     * must not report errors for any of the checked fields.
     */
    @Test
    public void shouldValidateAcceptableConfiguration() {
        this.config = TestHelper.getConfiguration();
        this.context = new MongoDbTaskContext(this.config);

        // Populate two databases before running the validation.
        storeDocuments("dbval", "validationColl1", "simple_objects.json");
        storeDocuments("dbval2", "validationColl2", "restaurants1.json");

        final Config validate = new MongoDbConnector().validate(this.config.asMap());

        final Field[] fieldsWithoutErrors = {
                MongoDbConnectorConfig.HOSTS,
                MongoDbConnectorConfig.LOGICAL_NAME,
                MongoDbConnectorConfig.USER,
                MongoDbConnectorConfig.PASSWORD,
                MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS,
                MongoDbConnectorConfig.DATABASE_WHITELIST,
                MongoDbConnectorConfig.DATABASE_INCLUDE_LIST,
                MongoDbConnectorConfig.DATABASE_BLACKLIST,
                MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST,
                MongoDbConnectorConfig.COLLECTION_WHITELIST,
                MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST,
                MongoDbConnectorConfig.COLLECTION_BLACKLIST,
                MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST,
                MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS,
                MongoDbConnectorConfig.MAX_QUEUE_SIZE,
                MongoDbConnectorConfig.MAX_BATCH_SIZE,
                MongoDbConnectorConfig.POLL_INTERVAL_MS,
                MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS,
                MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS,
                MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS,
                MongoDbConnectorConfig.SSL_ENABLED,
                MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES,
                CommonConnectorConfig.TOMBSTONES_ON_DELETE
        };
        for (Field checked : fieldsWithoutErrors) {
            assertNoConfigurationErrors(validate, new Field[]{checked});
        }
    }

    /**
     * End-to-end scenario against the "dbit" database: consumes the initial snapshot,
     * then creates seen while streaming, creates stored while the connector was stopped,
     * and finally an insert, update and delete on an "arbitrary" collection, checking
     * record counts, topics, operations and document ids along the way.
     *
     * @throws InterruptedException if interrupted while waiting for records
     * @throws IOException          declared for the document-loading helpers
     */
    @Test
    public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");

        // ---------------------------------------------------------------------
        // Phase 1: documents stored before start are read by the initial sync
        // ---------------------------------------------------------------------
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants", "restaurants1.json");
        start(MongoDbConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(12);
        snapshotRecords.topics().forEach(System.out::println);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(2);
        final AtomicBoolean foundLast = new AtomicBoolean(false);
        snapshotRecords.forEach(event -> {
            validate(event);
            verifyFromInitialSync(event, foundLast);
            verifyReadOperation(event);
        });
        Assertions.assertThat(foundLast.get()).isTrue();

        // ---------------------------------------------------------------------
        // Phase 2: documents stored while the connector is running
        // ---------------------------------------------------------------------
        storeDocuments("dbit", "restaurants", "restaurants2.json");
        final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(4);
        Assertions.assertThat(streamedRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat(streamedRecords.topics().size()).isEqualTo(1);
        streamedRecords.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
            verifyCreateOperation(event);
            verifyNotFromTransaction(event);
        });

        // ---------------------------------------------------------------------
        // Phase 3: documents stored while the connector is stopped
        // ---------------------------------------------------------------------
        stopConnector();
        storeDocuments("dbit", "restaurants", "restaurants3.json");
        start(MongoDbConnector.class, this.config);
        final AbstractConnectorTest.SourceRecords afterRestartRecords = consumeRecordsByTopic(5);
        Assertions.assertThat(afterRestartRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5);
        Assertions.assertThat(afterRestartRecords.topics().size()).isEqualTo(1);
        afterRestartRecords.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
            verifyCreateOperation(event);
        });

        // ---------------------------------------------------------------------
        // Phase 4: more documents after the restart
        // ---------------------------------------------------------------------
        storeDocuments("dbit", "restaurants", "restaurants4.json");
        final AbstractConnectorTest.SourceRecords moreRecords = consumeRecordsByTopic(8);
        Assertions.assertThat(moreRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(8);
        Assertions.assertThat(moreRecords.topics().size()).isEqualTo(1);
        moreRecords.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
            verifyCreateOperation(event);
        });

        // ---------------------------------------------------------------------
        // Phase 5: insert + update of a single document in a new collection
        // ---------------------------------------------------------------------
        final AtomicReference<String> documentId = new AtomicReference<>();
        primary().execute("create", mongoClient -> {
            MongoCollection<Document> collection = mongoClient.getDatabase("dbit").getCollection("arbitrary");
            collection.drop();
            collection.insertOne(Document.parse("{\"a\": 1, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
            Document stored = collection.find().first();
            Testing.debug("Document: " + stored);
            // Remember the generated id so the emitted record keys can be compared to it.
            documentId.set(stored.getObjectId("_id").toString());
            Testing.debug("Document ID: " + documentId.get());
        });
        primary().execute("update", mongoClient -> {
            MongoCollection<Document> collection = mongoClient.getDatabase("dbit").getCollection("arbitrary");
            Testing.debug("Document: " + collection.find().first());
            collection.updateOne(Document.parse("{\"a\": 1}"), Document.parse("{ \"$set\": { \"b\": 10 } }"));
            Testing.debug("Document: " + collection.find().first());
        });

        final AbstractConnectorTest.SourceRecords insertAndUpdate = consumeRecordsByTopic(2);
        Assertions.assertThat(insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2);
        Assertions.assertThat(insertAndUpdate.topics().size()).isEqualTo(1);
        // Validate the batch that was just consumed. The previous version iterated the
        // phase-4 batch again here, leaving the insert and update records unchecked.
        insertAndUpdate.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
        });
        final SourceRecord insertEvent = insertAndUpdate.allRecordsInOrder().get(0);
        final SourceRecord updateEvent = insertAndUpdate.allRecordsInOrder().get(1);
        Testing.debug("Insert event: " + insertEvent);
        Testing.debug("Update event: " + updateEvent);
        // Only the first record stems from the insert, so only it is a create.
        verifyCreateOperation(insertEvent);
        final Struct insertKey = (Struct) insertEvent.key();
        final Struct updateKey = (Struct) updateEvent.key();
        final String insertId = toObjectId(insertKey.getString("id")).toString();
        final String updateId = toObjectId(updateKey.getString("id")).toString();
        Assertions.assertThat(insertId).isEqualTo(documentId.get());
        Assertions.assertThat(updateId).isEqualTo(documentId.get());

        // ---------------------------------------------------------------------
        // Phase 6: delete the document; expect a delete event plus a tombstone
        // ---------------------------------------------------------------------
        primary().execute("delete", mongoClient -> {
            mongoClient.getDatabase("dbit").getCollection("arbitrary").deleteOne(Document.parse("{\"a\": 1}"));
        });
        final AbstractConnectorTest.SourceRecords deleteAndTombstone = consumeRecordsByTopic(2);
        Assertions.assertThat(deleteAndTombstone.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2);
        Assertions.assertThat(deleteAndTombstone.topics().size()).isEqualTo(1);
        final SourceRecord deleteEvent = deleteAndTombstone.allRecordsInOrder().get(0);
        validate(deleteEvent);
        verifyNotFromInitialSync(deleteEvent);
        verifyDeleteOperation(deleteEvent);
        final SourceRecord tombstoneEvent = deleteAndTombstone.allRecordsInOrder().get(1);
        validate(tombstoneEvent);
        Testing.debug("Delete event: " + deleteEvent);
        Testing.debug("Tombstone event: " + tombstoneEvent);
        final Struct deleteKey = (Struct) deleteEvent.key();
        Assertions.assertThat(toObjectId(deleteKey.getString("id")).toString()).isEqualTo(documentId.get());
    }

    /**
     * With {@code SKIPPED_OPERATIONS} set to "u", an insert followed by an update and a
     * delete must yield the insert record and then, as the next record, the delete.
     *
     * @throws InterruptedException if interrupted while waiting for records
     * @throws IOException          declared for the helpers used by the test
     */
    @Test
    @FixFor({"DBZ-1831"})
    public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.SKIPPED_OPERATIONS, "u")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Insert one document and remember its id.
        final AtomicReference<String> documentId = new AtomicReference<>();
        primary().execute("create", mongoClient -> {
            MongoCollection<Document> arbitrary = mongoClient.getDatabase("dbit").getCollection("arbitrary");
            arbitrary.drop();
            arbitrary.insertOne(Document.parse("{\"a\": 1, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
            Document stored = arbitrary.find().first();
            Testing.debug("Document: " + stored);
            documentId.set(stored.getObjectId("_id").toString());
            Testing.debug("Document ID: " + documentId.get());
        });
        final AbstractConnectorTest.SourceRecords insertOnly = consumeRecordsByTopic(1);
        Assertions.assertThat(insertOnly.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);

        // Update, then delete, the same document.
        primary().execute("update", mongoClient -> {
            MongoCollection<Document> arbitrary = mongoClient.getDatabase("dbit").getCollection("arbitrary");
            Testing.debug("Document: " + arbitrary.find().first());
            arbitrary.updateOne(Document.parse("{\"a\": 1}"), Document.parse("{ \"$set\": { \"b\": 10 } }"));
            Testing.debug("Document: " + arbitrary.find().first());
        });
        primary().execute("delete", mongoClient -> {
            MongoCollection<Document> arbitrary = mongoClient.getDatabase("dbit").getCollection("arbitrary");
            Testing.debug("Document: " + arbitrary.find().first());
            arbitrary.deleteOne(Document.parse("{\"a\": 1}"));
            Testing.debug("Document: " + arbitrary.find().first());
        });

        // The next record consumed must be the delete, not the update.
        final AbstractConnectorTest.SourceRecords nextRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(nextRecords.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);
        final SourceRecord deleteEvent = nextRecords.allRecordsInOrder().get(0);
        validate(deleteEvent);
        verifyDeleteOperation(deleteEvent);
    }

    /**
     * Creates a user in a dedicated "authdb" database, then runs the connector with that
     * user and {@code AUTH_SOURCE} pointing at "authdb", checking that the initial snapshot
     * and subsequently stored documents are consumed.
     *
     * @throws InterruptedException if interrupted while waiting for records
     * @throws IOException          declared for the document-loading helpers
     */
    @Test
    @FixFor({"DBZ-1168"})
    public void shouldConsumeAllEventsFromDatabaseWithCustomAuthSource() throws InterruptedException, IOException {
        // First configuration/context: used for the setup work below.
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");

        primary().execute("Create auth database", mongoClient -> {
            MongoDatabase authDatabase = mongoClient.getDatabase("authdb");
            try {
                // Dropping may fail (for instance when the user does not exist yet); that is tolerated.
                authDatabase.runCommand(BsonDocument.parse("{dropUser: \"dbz\"}"));
            } catch (Exception e) {
                this.logger.info("Expected error while dropping user", e);
            }
            authDatabase.runCommand(BsonDocument.parse("{createUser: \"dbz\", pwd: \"pass\", roles: [{role: \"readAnyDatabase\", db: \"admin\"}]}"));
        });
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants", "restaurants1.json");

        // Second configuration: authenticate as the freshly created user against "authdb".
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.USER, "dbz")
                .with(MongoDbConnectorConfig.PASSWORD, "pass")
                .with(MongoDbConnectorConfig.AUTH_SOURCE, "authdb")
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        // afterEach() only shuts down the context held in the field at that time, so the
        // setup context must be released here before it is replaced; otherwise it leaks.
        this.context.getConnectionContext().shutdown();
        this.context = new MongoDbTaskContext(this.config);
        start(MongoDbConnector.class, this.config);

        // Initial snapshot: six documents from each of the two collections.
        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(12);
        snapshotRecords.topics().forEach(System.out::println);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(2);
        final AtomicBoolean foundLast = new AtomicBoolean(false);
        snapshotRecords.forEach(event -> {
            validate(event);
            verifyFromInitialSync(event, foundLast);
            verifyReadOperation(event);
        });
        Assertions.assertThat(foundLast.get()).isTrue();

        // Documents stored while the connector is running arrive as creates.
        storeDocuments("dbit", "restaurants", "restaurants2.json");
        final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(4);
        Assertions.assertThat(streamedRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat(streamedRecords.topics().size()).isEqualTo(1);
        streamedRecords.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
            verifyCreateOperation(event);
            verifyNotFromTransaction(event);
        });
        stopConnector();
    }

    /**
     * Documents containing a DBRef-style sub-document ({@code $ref}/{@code $id}/{@code $db})
     * must be consumed both from the initial snapshot and while streaming.
     *
     * @throws InterruptedException if interrupted while waiting for records
     * @throws IOException          declared for the document-loading helpers
     */
    @Test
    @FixFor({"DBZ-1767"})
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        // One context is enough: the configuration does not change during the test, and
        // replacing the context with a second instance would leave the first one without
        // a shutdown (afterEach() only shuts down the instance held in the field).
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "spec", "spec_objects.json");
        start(MongoDbConnector.class, this.config);

        // Snapshot: the single pre-existing document.
        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.spec").size()).isEqualTo(1);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(1);
        final AtomicBoolean foundLast = new AtomicBoolean(false);
        snapshotRecords.forEach(event -> {
            validate(event);
            verifyFromInitialSync(event, foundLast);
            verifyReadOperation(event);
        });
        Assertions.assertThat(foundLast.get()).isTrue();

        // Streaming: insert another document carrying a DBRef-style value.
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("dbit").getCollection("spec").insertOne(Document.parse("{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }"));
        });
        final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(streamedRecords.recordsForTopic("mongo.dbit.spec").size()).isEqualTo(1);
        Assertions.assertThat(streamedRecords.topics().size()).isEqualTo(1);
        streamedRecords.forEach(event -> {
            validate(event);
            verifyNotFromInitialSync(event);
            verifyCreateOperation(event);
            verifyNotFromTransaction(event);
        });
        stopConnector();
    }

    /**
     * A collection named "dbz865_my@collection" must be captured on the topic
     * "mongo.dbit.dbz865_my_collection", and the "no changes will be captured" warning
     * must not be logged because the include list matches that collection.
     *
     * @throws InterruptedException if interrupted while waiting for records
     * @throws IOException          declared for the helpers used by the test
     */
    @Test
    @FixFor({"DBZ-865", "DBZ-1242"})
    public void shouldConsumeEventsFromCollectionWithReplacedTopicName() throws InterruptedException, IOException {
        // Install the interceptor first so that log messages emitted from here on are captured.
        final LogInterceptor logInterceptor = new LogInterceptor();
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");

        primary().execute("create", mongoClient -> {
            MongoCollection<Document> collection = mongoClient.getDatabase("dbit").getCollection("dbz865_my@collection");
            collection.drop();
            collection.insertOne(Document.parse("{\"a\": 1, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
        });
        start(MongoDbConnector.class, this.config);

        // Ask for more records than exist; only the single snapshot record may show up.
        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(12);
        snapshotRecords.topics().forEach(System.out::println);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.dbz865_my_collection")).hasSize(1);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(1);
        final AtomicBoolean foundLast = new AtomicBoolean(false);
        snapshotRecords.forEach(event -> {
            validate(event);
            verifyFromInitialSync(event, foundLast);
            verifyReadOperation(event);
        });
        Assertions.assertThat(foundLast.get()).isTrue();

        stopConnector(stopped -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse();
        });
    }

    /**
     * When the collection include list ("dbit.dbz865.my_products") does not match the only
     * collection that exists, the "no changes will be captured" warning must be logged.
     *
     * @throws Exception if the test setup or record consumption fails
     */
    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingCollectionFilters() throws Exception {
        // Install the interceptor first so that log messages emitted from here on are captured.
        final LogInterceptor logInterceptor = new LogInterceptor();
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.my_products")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");

        primary().execute("create", mongoClient -> {
            MongoCollection<Document> unmatched = mongoClient.getDatabase("dbit").getCollection("dbz865_my@collection");
            unmatched.drop();
            unmatched.insertOne(Document.parse("{\"a\": 1, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
        });

        start(MongoDbConnector.class, this.config);
        // The consumed records themselves are not inspected; only the log is checked below.
        consumeRecordsByTopic(12);

        stopConnector(stopped -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isTrue();
        });
    }

    /**
     * Verifies that a record stems from the initial sync.
     * <p>
     * A record whose source offset carries the "initsync" key must have its source
     * "snapshot" field set to "true". A record without that key is accepted only once
     * (tracked through {@code atomicBoolean}) and must have "snapshot" set to "last".
     * In both cases the record must also pass {@code verifyNotFromTransaction}.
     *
     * @param sourceRecord  the record to check
     * @param atomicBoolean flag that is flipped to {@code true} when the record without the
     *                      "initsync" key is seen; seeing a second such record fails the assertion
     */
    protected void verifyFromInitialSync(SourceRecord sourceRecord, AtomicBoolean atomicBoolean) {
        final Struct source = ((Struct) sourceRecord.value()).getStruct("source");
        if (sourceRecord.sourceOffset().containsKey("initsync")) {
            // The branch condition already establishes the offset marker, so only the
            // snapshot field needs asserting here.
            Assertions.assertThat(source.getString("snapshot")).isEqualTo("true");
        } else {
            // getAndSet returns the previous value: it must still be false the first
            // (and only) time a record without the marker is encountered.
            Assertions.assertThat(atomicBoolean.getAndSet(true)).isFalse();
            Assertions.assertThat(source.getString("snapshot")).isEqualTo("last");
        }
        verifyNotFromTransaction(sourceRecord);
    }

    /**
     * Snapshots two collections and then consumes documents that were stored inside a
     * transaction, first while the connector is running and then after it has been stopped
     * and started again.
     * <p>
     * The test logs a message and returns early when {@code TestHelper.transactionsSupported}
     * reports that the server does not support transactions.
     */
    @Test
    @FixFor({"DBZ-1215"})
    public void shouldConsumeTransaction() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        if (!TestHelper.transactionsSupported(primary(), "dbit")) {
            this.logger.info("Test not executed, transactions not supported in the server");
            return;
        }

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants", "restaurants1.json");

        // Phase 1: the 12 records read from the two pre-populated collections.
        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(12);
        snapshotRecords.topics().forEach(System.out::println);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(2);
        AtomicBoolean lastSnapshotRecordSeen = new AtomicBoolean(false);
        snapshotRecords.forEach(snapshotRecord -> {
            validate(snapshotRecord);
            verifyFromInitialSync(snapshotRecord, lastSnapshotRecordSeen);
            verifyReadOperation(snapshotRecord);
        });
        Assertions.assertThat(lastSnapshotRecordSeen.get()).isTrue();

        // Phase 2: documents stored in a transaction while the connector is running.
        storeDocumentsInTx("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords firstTxRecords = consumeRecordsByTopic(4);
        Assertions.assertThat(firstTxRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat(firstTxRecords.topics().size()).isEqualTo(1);
        // Counter producing the expected "tord" value (1, 2, 3, ...) of each record in turn.
        AtomicLong expectedTxOrder = new AtomicLong(0L);
        firstTxRecords.forEach(txRecord -> {
            validate(txRecord);
            verifyNotFromInitialSync(txRecord);
            verifyCreateOperation(txRecord);
            verifyFromTransaction(txRecord, expectedTxOrder.incrementAndGet());
        });

        // Phase 3: documents stored in a transaction while the connector is stopped.
        stopConnector();
        storeDocumentsInTx("dbit", "restaurants", "restaurants3.json");
        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords secondTxRecords = consumeRecordsByTopic(5);
        Assertions.assertThat(secondTxRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5);
        Assertions.assertThat(secondTxRecords.topics().size()).isEqualTo(1);
        expectedTxOrder.set(0L);
        secondTxRecords.forEach(txRecord -> {
            validate(txRecord);
            verifyNotFromInitialSync(txRecord);
            verifyCreateOperation(txRecord);
            verifyFromTransaction(txRecord, expectedTxOrder.incrementAndGet());
        });
    }

    /**
     * Stops the connector part-way through a transaction and checks what is consumed after
     * it is started again.
     * <p>
     * The first connector run is started with a stop predicate that matches the record whose
     * source {@code tord} field equals 3. Only two transaction records are consumed from that
     * run; after the restart the test expects seven records whose {@code tord} values are
     * 3, 4 followed by 1 through 5.
     * <p>
     * The test logs a message and returns early when {@code TestHelper.transactionsSupported}
     * reports that the server does not support transactions.
     */
    @Test
    @FixFor({"DBZ-1215"})
    public void shouldResumeTransactionInMiddle() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        if (!TestHelper.transactionsSupported(primary(), "dbit")) {
            this.logger.info("Test not executed, transactions not supported in the server");
            return;
        }

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants", "restaurants1.json");

        // The predicate matches the record carrying transaction order 3.
        start(MongoDbConnector.class, this.config, candidate -> {
            Long txOrder = ((Struct) candidate.value()).getStruct("source").getInt64("tord");
            return Long.valueOf(3L).equals(txOrder);
        });

        // Phase 1: the 12 records read from the two pre-populated collections.
        AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(12);
        snapshotRecords.topics().forEach(System.out::println);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(2);
        AtomicBoolean lastSnapshotRecordSeen = new AtomicBoolean(false);
        snapshotRecords.forEach(snapshotRecord -> {
            validate(snapshotRecord);
            verifyFromInitialSync(snapshotRecord, lastSnapshotRecordSeen);
            verifyReadOperation(snapshotRecord);
        });
        Assertions.assertThat(lastSnapshotRecordSeen.get()).isTrue();

        // Phase 2: only the first two records of the transaction are consumed.
        storeDocumentsInTx("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords firstTxRecords = consumeRecordsByTopic(2);
        Assertions.assertThat(firstTxRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(2);
        Assertions.assertThat(firstTxRecords.topics().size()).isEqualTo(1);
        AtomicLong expectedTxOrder = new AtomicLong(0L);
        firstTxRecords.forEach(txRecord -> {
            validate(txRecord);
            verifyNotFromInitialSync(txRecord);
            verifyCreateOperation(txRecord);
            verifyFromTransaction(txRecord, expectedTxOrder.incrementAndGet());
        });

        // Phase 3: restart and expect the remaining records plus a second transaction.
        stopConnector();
        storeDocumentsInTx("dbit", "restaurants", "restaurants3.json");
        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords resumedRecords = consumeRecordsByTopic(7);
        Assertions.assertThat(resumedRecords.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(7);
        Assertions.assertThat(resumedRecords.topics().size()).isEqualTo(1);
        // Expected "tord" values in consumption order; the head is removed for every record.
        List<Long> remainingTxOrders = Collect.arrayListOf(3L, 4L, 1L, 2L, 3L, 4L, 5L);
        resumedRecords.forEach(txRecord -> {
            validate(txRecord);
            verifyNotFromInitialSync(txRecord);
            verifyCreateOperation(txRecord);
            verifyFromTransaction(txRecord, remainingTxOrders.remove(0).longValue());
        });
    }

    /**
     * Snapshots a collection whose documents contain a field named {@code op} and checks
     * that the envelope's own {@code op} field is {@code "r"} while the document's
     * {@code op} value ({@code "foo"} / {@code "bar"}) is preserved in {@code after}.
     */
    @Test
    @FixFor({"DBZ-2116"})
    public void shouldSnapshotDocumentContainingFieldNamedOp() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "fieldnamedop", "fieldnamedop.json");

        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(2);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.fieldnamedop").size()).isEqualTo(2);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(1);

        AtomicBoolean lastSnapshotRecordSeen = new AtomicBoolean(false);
        snapshotRecords.forEach(snapshotRecord -> {
            validate(snapshotRecord);
            verifyFromInitialSync(snapshotRecord, lastSnapshotRecordSeen);
            verifyReadOperation(snapshotRecord);
        });
        Assertions.assertThat(lastSnapshotRecordSeen.get()).isTrue();

        // First document: envelope op is "r", the document's own "op" field is "foo".
        Struct firstValue = (Struct) snapshotRecords.recordsForTopic("mongo.dbit.fieldnamedop").get(0).value();
        Assertions.assertThat(firstValue.get("op")).isEqualTo("r");
        Assertions.assertThat(Document.parse((String) firstValue.get("after")).get("op")).isEqualTo("foo");

        // Second document: envelope op is "r", the document's own "op" field is "bar".
        Struct secondValue = (Struct) snapshotRecords.recordsForTopic("mongo.dbit.fieldnamedop").get(1).value();
        Assertions.assertThat(secondValue.get("op")).isEqualTo("r");
        Assertions.assertThat(Document.parse((String) secondValue.get("after")).get("op")).isEqualTo("bar");
    }

    /**
     * Configures per-collection snapshot filter queries for three of four collections and
     * checks the number of records consumed from each topic.
     * <p>
     * {@code dbit.restaurants2} has no filter configured; the other three collections each
     * get a filter query through a {@code SNAPSHOT_FILTER_QUERY_BY_COLLECTION}-prefixed key.
     */
    @Test
    @FixFor({"DBZ-2496"})
    public void shouldFilterItemsInCollectionWhileTakingSnapshot() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION, "dbit.simpletons,dbit.restaurants1,dbit.restaurants4")
                .with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.simpletons", "{ \"_id\": { \"$gt\": 4 } }")
                .with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.restaurants1", "{ $or: [ { cuisine: \"American \"}, { \"grades.grade\": \"Z\" } ] }")
                .with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.restaurants4", "{ cuisine: \"American \" , borough: \"Manhattan\"  }")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants1", "restaurants1.json");
        storeDocuments("dbit", "restaurants2", "restaurants2.json");
        storeDocuments("dbit", "restaurants4", "restaurants4.json");

        start(MongoDbConnector.class, this.config);

        // 4 + 3 + 4 + 4 records are expected across the four topics.
        AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(15);
        Assertions.assertThat(snapshotRecords.topics().size()).isEqualTo(4);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(4);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants1").size()).isEqualTo(3);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants2").size()).isEqualTo(4);
        Assertions.assertThat(snapshotRecords.recordsForTopic("mongo.dbit.restaurants4").size()).isEqualTo(4);

        assertNoRecordsToConsume();
        stopConnector();
    }

    /**
     * Restricts the snapshot to {@code dbit.restaurants1} through
     * {@code CommonConnectorConfig.SNAPSHOT_MODE_TABLES} and checks that only that
     * collection produces snapshot records, while a later insert into
     * {@code dbit.restaurants2} is still delivered as a single record.
     */
    @Test
    @FixFor({"DBZ-2456"})
    public void shouldSelectivelySnapshot() throws InterruptedException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*dbit.restaurants1")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "restaurants1", "restaurants1.json");
        storeDocuments("dbit", "restaurants2", "restaurants2.json");

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Only the collection selected for snapshotting yields records; the other topic is absent.
        AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(6);
        List<SourceRecord> restaurants1Records = snapshotRecords.recordsForTopic("mongo.dbit.restaurants1");
        List<SourceRecord> restaurants2Records = snapshotRecords.recordsForTopic("mongo.dbit.restaurants2");
        Assertions.assertThat(restaurants1Records.size()).isEqualTo(6);
        Assertions.assertThat(restaurants2Records).isNull();

        // A document inserted after the snapshot into the non-snapshotted collection is still captured.
        insertDocuments("dbit", "restaurants2", Document.parse("{\"name\": \"Brunos On The Boulevard\", \"restaurant_id\": \"40356151\"}"));
        Assertions.assertThat(consumeRecordsByTopic(1).allRecordsInOrder().size()).isEqualTo(1);
        assertNoRecordsToConsume();
        stopConnector();
    }

    /**
     * Asserts that the record carries no initial-sync markers: its source offset has no
     * {@code "initsync"} key and the {@code snapshot} field of its source struct is null.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyNotFromInitialSync(SourceRecord sourceRecord) {
        boolean flaggedAsInitialSync = sourceRecord.sourceOffset().containsKey("initsync");
        Assertions.assertThat(flaggedAsInitialSync).isFalse();

        Struct source = ((Struct) sourceRecord.value()).getStruct("source");
        Assertions.assertThat(source.getString("snapshot")).isNull();
    }

    /**
     * Asserts that the record carries a transaction marker: its source offset contains the
     * {@code "tord"} key and the {@code tord} field of its source struct equals {@code j}.
     *
     * @param sourceRecord the record to check
     * @param j the expected value of the {@code tord} field
     */
    protected void verifyFromTransaction(SourceRecord sourceRecord, long j) {
        boolean offsetHasTxOrder = sourceRecord.sourceOffset().containsKey("tord");
        Assertions.assertThat(offsetHasTxOrder).isTrue();

        Struct source = ((Struct) sourceRecord.value()).getStruct("source");
        Assertions.assertThat(source.getInt64("tord")).isEqualTo(j);
    }

    /**
     * Asserts that the record's source offset does not contain the {@code "tord"} key.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyNotFromTransaction(SourceRecord sourceRecord) {
        boolean offsetHasTxOrder = sourceRecord.sourceOffset().containsKey("tord");
        Assertions.assertThat(offsetHasTxOrder).isFalse();
    }

    /**
     * Asserts that the record's {@code op} field is the code of {@link Envelope.Operation#CREATE}.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyCreateOperation(SourceRecord sourceRecord) {
        Envelope.Operation expected = Envelope.Operation.CREATE;
        verifyOperation(sourceRecord, expected);
    }

    /**
     * Asserts that the record's {@code op} field is the code of {@link Envelope.Operation#READ}.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyReadOperation(SourceRecord sourceRecord) {
        Envelope.Operation expected = Envelope.Operation.READ;
        verifyOperation(sourceRecord, expected);
    }

    /**
     * Asserts that the record's {@code op} field is the code of {@link Envelope.Operation#UPDATE}.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyUpdateOperation(SourceRecord sourceRecord) {
        Envelope.Operation expected = Envelope.Operation.UPDATE;
        verifyOperation(sourceRecord, expected);
    }

    /**
     * Asserts that the record's {@code op} field is the code of {@link Envelope.Operation#DELETE}.
     *
     * @param sourceRecord the record to check
     */
    protected void verifyDeleteOperation(SourceRecord sourceRecord) {
        Envelope.Operation expected = Envelope.Operation.DELETE;
        verifyOperation(sourceRecord, expected);
    }

    /**
     * Asserts that the {@code op} field of the record's value equals the code of the given operation.
     *
     * @param sourceRecord the record whose value struct is inspected
     * @param operation the operation whose code is expected
     */
    protected void verifyOperation(SourceRecord sourceRecord, Envelope.Operation operation) {
        Struct envelope = (Struct) sourceRecord.value();
        String actualCode = envelope.getString("op");
        Assertions.assertThat(actualCode).isEqualTo(operation.code());
    }

    /**
     * Obtains the primary for the replica set parsed from the hosts of the current connection
     * context, using the context's filters and an error handler created by
     * {@code connectionErrorHandler(3)}.
     *
     * @return the primary returned by the connection context
     */
    protected ConnectionContext.MongoPrimary primary() {
        ConnectionContext connectionContext = this.context.getConnectionContext();
        ReplicaSet replicaSet = ReplicaSet.parse(this.context.getConnectionContext().hosts());
        return connectionContext.primaryFor(replicaSet, this.context.filters(), connectionErrorHandler(3));
    }

    /**
     * Drops the given collection and fills it with the documents loaded from a resource.
     *
     * @param str the database name
     * @param str2 the collection name
     * @param str3 the name of the resource the documents are loaded from
     */
    protected void storeDocuments(String str, String str2, String str3) {
        primary().execute("storing documents", client -> {
            String message = "Storing in '" + str + "." + str2 + "' documents loaded from from '" + str3 + "'";
            Testing.debug(message);
            MongoCollection<Document> target = client.getDatabase(str).getCollection(str2);
            // Start from an empty collection before inserting.
            target.drop();
            storeDocuments(target, str3);
        });
    }

    /**
     * Inserts every document loaded from the given resource into the collection, one at a
     * time, with document validation bypassed.
     *
     * @param mongoCollection the collection to insert into
     * @param str the name of the resource the documents are loaded from
     */
    protected void storeDocuments(MongoCollection<Document> mongoCollection, String str) {
        InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
        for (Document loaded : loadTestDocuments(str)) {
            Assertions.assertThat(loaded).isNotNull();
            Assertions.assertThat(loaded.size()).isGreaterThan(0);
            mongoCollection.insertOne(loaded, insertOptions);
        }
    }

    /**
     * Drops and re-creates the given collection, then inserts the documents loaded from a
     * resource inside a single transaction.
     * <p>
     * Before the transaction starts, the {@code maxTransactionLockRequestTimeoutMillis}
     * server parameter is set from the {@code mongo.transaction.lock.request.timeout.ms}
     * system property (default {@code 1000}).
     *
     * @param str the database name
     * @param str2 the collection name
     * @param str3 the name of the resource the documents are loaded from
     */
    protected void storeDocumentsInTx(String str, String str2, String str3) {
        primary().execute("storing documents", mongoClient -> {
            Testing.debug("Storing in '" + str + "." + str2 + "' documents loaded from from '" + str3 + "'");
            MongoDatabase database = mongoClient.getDatabase(str);
            MongoCollection<Document> collection = database.getCollection(str2);
            collection.drop();
            database.createCollection(str2);
            // try-with-resources closes the session even when an insert or the commit fails;
            // previously the session was never closed at all.
            try (ClientSession session = mongoClient.startSession()) {
                MongoDatabase admin = mongoClient.getDatabase("admin");
                if (admin != null) {
                    int lockTimeoutMs = Integer.parseInt(System.getProperty("mongo.transaction.lock.request.timeout.ms", "1000"));
                    Testing.debug("Setting MongoDB transaction lock request timeout as '" + lockTimeoutMs + "ms'");
                    admin.runCommand(session, new Document().append("setParameter", 1).append("maxTransactionLockRequestTimeoutMillis", Integer.valueOf(lockTimeoutMs)));
                }
                session.startTransaction();
                storeDocuments(session, collection, str3);
                session.commitTransaction();
            }
        });
    }

    /**
     * Inserts every document loaded from the given resource into the collection, one at a
     * time, with document validation bypassed. When a session is supplied each insert is
     * issued through that session; otherwise the session-less insert is used.
     *
     * @param clientSession the session to insert with, or {@code null} to insert without one
     * @param mongoCollection the collection to insert into
     * @param str the name of the resource the documents are loaded from
     */
    protected void storeDocuments(ClientSession clientSession, MongoCollection<Document> mongoCollection, String str) {
        InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(true);
        for (Document loaded : loadTestDocuments(str)) {
            Assertions.assertThat(loaded).isNotNull();
            Assertions.assertThat(loaded.size()).isGreaterThan(0);
            if (clientSession != null) {
                mongoCollection.insertOne(clientSession, loaded, insertOptions);
            } else {
                mongoCollection.insertOne(loaded, insertOptions);
            }
        }
    }

    /**
     * Loads the documents contained in the named resource.
     * <p>
     * Each string delivered by {@code IoUtil.readLines} is parsed with {@code Document.parse}
     * and must yield a non-empty document. An {@link IOException} while opening or reading
     * the resource fails the test through {@code Assert.fail}.
     *
     * @param str the name of the resource to read
     * @return the parsed documents, in the order they were read
     */
    protected List<Document> loadTestDocuments(String str) {
        List<Document> documents = new ArrayList<>();
        // try-with-resources closes the stream on every path, including a failed assertion
        // or a parse error, and tolerates a null resource.
        try (InputStream resourceStream = Testing.Files.readResourceAsStream(str)) {
            Assertions.assertThat(resourceStream).isNotNull();
            IoUtil.readLines(resourceStream, line -> {
                Document parsed = Document.parse(line);
                Assertions.assertThat(parsed.size()).isGreaterThan(0);
                documents.add(parsed);
            });
        } catch (IOException e) {
            Assert.fail("Unable to find or read file '" + str + "': " + e.getMessage());
        }
        return documents;
    }

    /**
     * Creates an error handler that counts the errors it receives. Once more than {@code i}
     * errors have been reported it fails the test through {@code Assert.fail}; every error
     * that does not fail the test is logged at error level.
     *
     * @param i the number of errors tolerated before the test is failed
     * @return a handler accepting the attempted action's description and the error
     */
    protected BiConsumer<String, Throwable> connectionErrorHandler(int i) {
        AtomicInteger errorCount = new AtomicInteger();
        return (description, error) -> {
            int seenSoFar = errorCount.incrementAndGet();
            if (seenSoFar > i) {
                Assert.fail("Unable to connect to primary after " + i + " errors trying to " + description + ": " + error);
            }
            this.logger.error("Error while attempting to {}: {}", description, error.getMessage(), error);
        };
    }

    /**
     * Enables SSL with zero allowed failed connections and a 2000 ms server selection
     * timeout, then expects a {@link ConnectException} when listing collection names
     * through the primary.
     */
    @Test(expected = ConnectException.class)
    public void shouldUseSSL() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS, 0)
                .with(MongoDbConnectorConfig.SSL_ENABLED, true)
                .with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS, 2000)
                .build();
        this.context = new MongoDbTaskContext(this.config);

        ConnectionContext.MongoPrimary sslPrimary = primary();
        sslPrimary.executeBlocking("Try SSL connection", client -> {
            // The primary is stopped before the query is issued.
            sslPrimary.stop();
            client.getDatabase("dbit").listCollectionNames().first();
        });
    }

    /**
     * Checks that heartbeat records are emitted alongside changes to the monitored
     * collection {@code dbit.mhb} and also after a change to the non-monitored collection
     * {@code dbit.nmhb}.
     * <p>
     * The heartbeat consumed together with the monitored insert must carry the same
     * {@code sec}/{@code ord} offset as that insert. Heartbeats consumed after the
     * non-monitored insert must carry a strictly later offset.
     */
    @Test
    @FixFor({"DBZ-1198"})
    public void shouldEmitHeartbeatMessages() throws InterruptedException, IOException {
        Testing.Print.enable();
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.mhb")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(Heartbeat.HEARTBEAT_INTERVAL, "1")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        primary().execute("create", mongoClient -> {
            MongoDatabase database = mongoClient.getDatabase("dbit");
            MongoCollection<Document> monitored = database.getCollection("mhb");
            monitored.drop();
            monitored.insertOne(Document.parse("{\"a\": 1, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
            database.getCollection("nmhb").drop();
        });

        start(MongoDbConnector.class, this.config);

        // The single pre-existing document of the monitored collection.
        AbstractConnectorTest.SourceRecords initialRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(initialRecords.allRecordsInOrder()).hasSize(1);
        Assertions.assertThat(initialRecords.recordsForTopic("mongo.dbit.mhb")).hasSize(1);

        primary().execute("insert-monitored", mongoClient -> {
            mongoClient.getDatabase("dbit").getCollection("mhb").insertOne(Document.parse("{\"a\": 2, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
        });

        // One data record plus one heartbeat, both carrying the same offset.
        AbstractConnectorTest.SourceRecords monitoredRecords = consumeRecordsByTopic(2);
        Assertions.assertThat(monitoredRecords.recordsForTopic("mongo.dbit.mhb")).hasSize(1);
        Map<String, ?> monitoredOffset = monitoredRecords.recordsForTopic("mongo.dbit.mhb").get(0).sourceOffset();
        Integer monitoredSec = (Integer) monitoredOffset.get("sec");
        Integer monitoredOrd = (Integer) monitoredOffset.get("ord");
        Assertions.assertThat(monitoredRecords.recordsForTopic("__debezium-heartbeat.mongo")).hasSize(1);
        Map<String, ?> heartbeatOffset = monitoredRecords.recordsForTopic("__debezium-heartbeat.mongo").get(0).sourceOffset();
        Assertions.assertThat(monitoredSec).isEqualTo((Integer) heartbeatOffset.get("sec"));
        Assertions.assertThat(monitoredOrd).isEqualTo((Integer) heartbeatOffset.get("ord"));

        primary().execute("insert-nonmonitored", mongoClient -> {
            mongoClient.getDatabase("dbit").getCollection("nmhb").insertOne(Document.parse("{\"a\": 3, \"b\": 2}"), new InsertOneOptions().bypassDocumentValidation(true));
        });

        List<SourceRecord> laterHeartbeats = consumeRecordsByTopic(2).recordsForTopic("__debezium-heartbeat.mongo");
        Assertions.assertThat(laterHeartbeats.size()).isGreaterThanOrEqualTo(1);
        laterHeartbeats.forEach(heartbeat -> {
            Map<String, ?> offset = heartbeat.sourceOffset();
            Integer sec = (Integer) offset.get("sec");
            // Compare boxed values with equals(), not ==, and terminate the assertion with
            // isTrue(); without a terminal call the check never ran.
            boolean laterThanMonitored = sec.intValue() > monitoredSec.intValue()
                    || (sec.equals(monitoredSec) && ((Integer) offset.get("ord")).intValue() > monitoredOrd.intValue());
            Assertions.assertThat(laterThanMonitored).isTrue();
        });
        stopConnector();
    }

    /**
     * Passes the records of the {@code mongo.dbit.restaurants} topic, first from a batch of
     * 12 consumed records and then from a batch of 4 consumed after storing a second file,
     * to the three {@code CloudEventsConverterTest} conversion checks.
     */
    @Test
    @FixFor({"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        storeDocuments("dbit", "restaurants", "restaurants1.json");
        start(MongoDbConnector.class, this.config);

        // First batch: records consumed right after the connector starts.
        List<SourceRecord> firstBatch = consumeRecordsByTopic(12).recordsForTopic("mongo.dbit.restaurants");
        for (SourceRecord restaurant : firstBatch) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(restaurant, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(restaurant, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(restaurant, "mongodb", "mongo", false);
        }

        // Second batch: records produced by storing another file while the connector runs.
        storeDocuments("dbit", "restaurants", "restaurants2.json");
        List<SourceRecord> secondBatch = consumeRecordsByTopic(4).recordsForTopic("mongo.dbit.restaurants");
        for (SourceRecord restaurant : secondBatch) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(restaurant, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(restaurant, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(restaurant, "mongodb", "mongo", false);
        }

        stopConnector();
    }

    /**
     * Inserts a single document while streaming and checks the key, the {@code after}
     * content, the {@code op} code and the {@code ts_ms} timestamp of the one resulting record.
     */
    @Test
    public void shouldGenerateRecordForInsertEvent() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Captured before the insert so the event timestamp can be bounded from below.
        Instant beforeInsert = Instant.now();
        ObjectId insertedId = new ObjectId();
        Document inserted = new Document("_id", insertedId);
        insertDocuments("dbit", "c1", inserted);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.allRecordsInOrder().size()).isEqualTo(1);
        assertNoRecordsToConsume();

        SourceRecord insertRecord = consumed.allRecordsInOrder().get(0);
        Struct key = (Struct) insertRecord.key();
        Struct value = (Struct) insertRecord.value();

        Assertions.assertThat(key.schema()).isSameAs(insertRecord.keySchema());
        Assertions.assertThat(key.get("id")).isEqualTo(formatObjectId(insertedId));

        Assertions.assertThat(value.schema()).isSameAs(insertRecord.valueSchema());
        Assertions.assertThat(value.getString("after")).isEqualTo(inserted.toJson(JsonSerialization.COMPACT_JSON_SETTINGS));
        Assertions.assertThat(value.getString("op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(value.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeInsert.toEpochMilli());
    }

    /**
     * Inserts a document, applies a {@code $set} update to it and checks the key, the
     * {@code patch} content, the {@code op} code and the {@code ts_ms} timestamp of the
     * single update record.
     */
    @Test
    public void shouldGenerateRecordForUpdateEvent() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Insert the document to update and drain its insert record.
        ObjectId targetId = new ObjectId();
        insertDocuments("dbit", "c1", new Document("_id", targetId));
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();

        Document updateSpec = new Document().append("$set", new Document().append("name", "Sally"));
        Instant beforeUpdate = Instant.now();
        Document filter = Document.parse("{\"_id\": {\"$oid\": \"" + targetId + "\"}}");
        updateDocuments("dbit", "c1", filter, updateSpec);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.allRecordsInOrder().size()).isEqualTo(1);
        assertNoRecordsToConsume();

        SourceRecord updateRecord = consumed.allRecordsInOrder().get(0);
        Struct key = (Struct) updateRecord.key();
        Struct value = (Struct) updateRecord.value();

        Assertions.assertThat(key.schema()).isSameAs(updateRecord.keySchema());
        Assertions.assertThat(key.get("id")).isEqualTo(formatObjectId(targetId));

        // The "$v" entry is dropped from the patch before it is compared with the update spec.
        Document patch = Document.parse(value.getString("patch"));
        patch.remove("$v");

        Assertions.assertThat(value.schema()).isSameAs(updateRecord.valueSchema());
        Assertions.assertThat(value.getString("after")).isNull();
        Assertions.assertThat(patch.toJson(JsonSerialization.COMPACT_JSON_SETTINGS)).isEqualTo(updateSpec.toJson(JsonSerialization.COMPACT_JSON_SETTINGS));
        Assertions.assertThat(value.getString("op")).isEqualTo(Envelope.Operation.UPDATE.code());
        Assertions.assertThat(value.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeUpdate.toEpochMilli());
    }

    /**
     * Inserts and then deletes a document and checks the two resulting records: a delete
     * event with null {@code after} and {@code patch}, followed by a record with the same
     * key and a null value and value schema.
     */
    @Test
    public void shouldGeneratorRecordForDeleteEvent() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Insert the document to delete and drain its insert record.
        ObjectId targetId = new ObjectId();
        insertDocuments("dbit", "c1", new Document("_id", targetId));
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();

        Instant beforeDelete = Instant.now();
        deleteDocument("dbit", "c1", targetId);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(2);
        Assertions.assertThat(consumed.allRecordsInOrder().size()).isEqualTo(2);
        assertNoRecordsToConsume();

        // First record: the delete event itself.
        SourceRecord deleteRecord = consumed.allRecordsInOrder().get(0);
        Struct deleteKey = (Struct) deleteRecord.key();
        Struct deleteValue = (Struct) deleteRecord.value();
        Assertions.assertThat(deleteKey.schema()).isSameAs(deleteRecord.keySchema());
        Assertions.assertThat(deleteKey.get("id")).isEqualTo(formatObjectId(targetId));
        Assertions.assertThat(deleteValue.schema()).isSameAs(deleteRecord.valueSchema());
        Assertions.assertThat(deleteValue.getString("after")).isNull();
        Assertions.assertThat(deleteValue.getString("patch")).isNull();
        Assertions.assertThat(deleteValue.getString("op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(deleteValue.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeDelete.toEpochMilli());

        // Second record: same key, no value and no value schema.
        SourceRecord tombstone = consumed.allRecordsInOrder().get(1);
        Struct tombstoneKey = (Struct) tombstone.key();
        Assertions.assertThat(tombstoneKey.schema()).isSameAs(tombstone.keySchema());
        Assertions.assertThat(tombstoneKey.get("id")).isEqualTo(formatObjectId(targetId));
        Assertions.assertThat(tombstone.value()).isNull();
        Assertions.assertThat(tombstone.valueSchema()).isNull();
    }

    /**
     * With {@code TOMBSTONES_ON_DELETE} set to {@code false}, inserts and then deletes a
     * document and checks that exactly one record, the delete event, is produced.
     */
    @Test
    @FixFor({"DBZ-582"})
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Insert the document to delete and drain its insert record.
        ObjectId targetId = new ObjectId();
        insertDocuments("dbit", "c1", new Document("_id", targetId));
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();

        Instant beforeDelete = Instant.now();
        deleteDocument("dbit", "c1", targetId);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.allRecordsInOrder().size()).isEqualTo(1);
        assertNoRecordsToConsume();

        SourceRecord deleteRecord = consumed.allRecordsInOrder().get(0);
        Struct key = (Struct) deleteRecord.key();
        Struct value = (Struct) deleteRecord.value();

        Assertions.assertThat(key.schema()).isSameAs(deleteRecord.keySchema());
        Assertions.assertThat(key.get("id")).isEqualTo(formatObjectId(targetId));

        Assertions.assertThat(value.schema()).isSameAs(deleteRecord.valueSchema());
        Assertions.assertThat(value.getString("after")).isNull();
        Assertions.assertThat(value.getString("patch")).isNull();
        Assertions.assertThat(value.getString("op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(value.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeDelete.toEpochMilli());
    }

    /**
     * Inserts documents whose {@code _id} is a long, a string, a nested document, a date
     * and, when {@code TestHelper.decimal128Supported} reports support, a Decimal128, and
     * checks the {@code id} key field of each resulting record against its expected string form.
     */
    @Test
    public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);

        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // A long id just above the int range (2147483657).
        long longId = Integer.MAX_VALUE + 10L;
        insertDocuments("dbit", "c1", new Document().append("_id", longId).append("name", "Sally"));

        insertDocuments("dbit", "c1", new Document().append("_id", "123").append("name", "Sally"));

        Document compositeId = new Document().append("company", 32).append("dept", "home improvement");
        insertDocuments("dbit", "c1", new Document().append("_id", compositeId).append("name", "Sally"));

        Calendar dateId = Calendar.getInstance();
        dateId.set(2017, 9, 19);
        insertDocuments("dbit", "c1", new Document().append("_id", dateId.getTime()).append("name", "Sally"));

        boolean decimalIdSupported = TestHelper.decimal128Supported(primary(), "mongo");
        if (decimalIdSupported) {
            insertDocuments("dbit", "c1", new Document().append("_id", new Decimal128(new BigDecimal("123.45678"))).append("name", "Sally"));
        }

        int expectedCount = decimalIdSupported ? 5 : 4;
        List<SourceRecord> inserted = consumeRecordsByTopic(expectedCount).allRecordsInOrder();

        assertSourceRecordKeyFieldIsEqualTo(inserted.get(0), "id", "2147483657");
        assertSourceRecordKeyFieldIsEqualTo(inserted.get(1), "id", "\"123\"");
        assertSourceRecordKeyFieldIsEqualTo(inserted.get(2), "id", "{\"company\": 32,\"dept\": \"home improvement\"}");

        // The date id is expected as an ISO offset date-time rendered in zone "Z".
        String isoDate = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dateId.getTimeInMillis()), ZoneId.of("Z")).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        assertSourceRecordKeyFieldIsEqualTo(inserted.get(3), "id", "{\"$date\": \"" + isoDate + "\"}");

        if (decimalIdSupported) {
            assertSourceRecordKeyFieldIsEqualTo(inserted.get(4), "id", "{\"$numberDecimal\": \"123.45678\"}");
        }
    }

    /**
     * Asserts that a field of the record's key holds the expected value.
     *
     * @param sourceRecord record whose key is inspected; the key is cast to {@link Struct}
     * @param str name of the key field to read
     * @param str2 value the field is expected to equal
     */
    private static void assertSourceRecordKeyFieldIsEqualTo(SourceRecord sourceRecord, String str, String str2) {
        Struct key = (Struct) sourceRecord.key();
        Object actual = key.get(str);
        Assertions.assertThat(actual).isEqualTo(str2);
    }

    /**
     * Inserts a document carrying a {@link DBRef} field and checks that the single create event
     * emitted for it serializes the reference as {@code $ref}/{@code $id} in the {@code after} field.
     */
    @Test
    public void shouldSupportDbRef2() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        ObjectId id = new ObjectId();
        DBRef reference = new DBRef("othercollection", 15);
        Document sally = new Document()
                .append("_id", id)
                .append("name", "Sally")
                .append("ref", reference);

        // Captured before the insert so the event timestamp can be bounded from below
        Instant beforeInsert = Instant.now();
        insertDocuments("dbit", "c1", sally);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        Assertions.assertThat(consumed.topics().size()).isEqualTo(1);
        List<SourceRecord> records = consumed.allRecordsInOrder();
        Assertions.assertThat(records.size()).isEqualTo(1);

        SourceRecord record = records.get(0);
        Struct key = (Struct) record.key();
        Struct value = (Struct) record.value();

        Assertions.assertThat(key.schema()).isSameAs(record.keySchema());
        Assertions.assertThat(key.get("id")).isEqualTo(formatObjectId(id));

        Assertions.assertThat(value.schema()).isSameAs(record.valueSchema());
        String expectedAfter = "{\"_id\": {\"$oid\": \"" + id + "\"},\"name\": \"Sally\",\"ref\": {\"$ref\": \"othercollection\",\"$id\": 15}}";
        Assertions.assertThat(value.getString("after")).isEqualTo(expectedAfter);
        Assertions.assertThat(value.getString("op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(value.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeInsert.toEpochMilli());
    }

    /**
     * Walks the connector through a full life cycle against the {@code dbA.contacts} collection:
     * <ol>
     * <li>one document exists before start, a second is inserted while streaming; both must be
     * emitted as either READ or CREATE events;</li>
     * <li>after a restart no further records are emitted;</li>
     * <li>deleting the first document yields two records, and every record with a non-null value
     * must be a DELETE event;</li>
     * <li>after the connector test framework is re-initialized, a new start emits only the
     * remaining document, as a READ event.</li>
     * </ol>
     */
    @Test
    public void shouldReplicateContent() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.INITIAL)
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbA");

        // Seed the collection with a single document before the connector is started
        primary().execute("shouldCreateContactsDatabase", mongoClient -> {
            MongoCollection<Document> contacts = mongoClient.getDatabase("dbA").getCollection("contacts");
            contacts.insertOne(new Document().append("name", "Jon Snow"), new InsertOneOptions().bypassDocumentValidation(true));
            Assertions.assertThat(contacts.countDocuments()).isEqualTo(1L);

            // try-with-resources closes the cursor exactly once, on success and on failure
            try (MongoCursor<Document> cursor = contacts.find(Filters.eq("name", "Jon Snow")).iterator()) {
                Assertions.assertThat(cursor.tryNext().getString("name")).isEqualTo("Jon Snow");
                Assertions.assertThat((Map<?, ?>) cursor.tryNext()).isNull();
            }
        });

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Add a second document while the connector is streaming
        Object[] expectedNames = {"Jon Snow", "Sally Hamm"};
        primary().execute("shouldAddMoreRecordsToContacts", mongoClient -> {
            MongoCollection<Document> contacts = mongoClient.getDatabase("dbA").getCollection("contacts");
            contacts.insertOne(new Document().append("name", "Sally Hamm"), new InsertOneOptions().bypassDocumentValidation(true));
            Assertions.assertThat(contacts.countDocuments()).isEqualTo(2L);

            Set<Object> storedNames = new HashSet<>();
            // hasNext() is inside the guarded region so the cursor is released even if it throws
            try (MongoCursor<Document> cursor = contacts.find().iterator()) {
                while (cursor.hasNext()) {
                    storedNames.add(cursor.next().getString("name"));
                }
            }
            Assertions.assertThat(storedNames).containsOnly(expectedNames);
        });

        // Both documents must arrive, each as either a snapshot read or a streamed create
        List<SourceRecord> firstBatch = consumeRecordsByTopic(2).allRecordsInOrder();
        Set<Object> replicatedNames = new HashSet<>();
        for (SourceRecord record : firstBatch) {
            VerifyRecord.isValid(record);
            Struct value = (Struct) record.value();
            replicatedNames.add(Document.parse(value.getString("after")).getString("name"));
            Envelope.Operation operation = Envelope.Operation.forCode(value.getString("op"));
            Assertions.assertThat(operation == Envelope.Operation.READ || operation == Envelope.Operation.CREATE).isTrue();
        }
        assertNoRecordsToConsume();
        Assertions.assertThat(replicatedNames).containsOnly(expectedNames);

        // A restart must not emit anything new
        stopConnector();
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");
        assertNoRecordsToConsume();

        // Look up the first document, remember its id, then delete it
        AtomicReference<ObjectId> jonSnowId = new AtomicReference<>();
        primary().execute("removeJohnSnow", mongoClient -> {
            MongoCollection<Document> contacts = mongoClient.getDatabase("dbA").getCollection("contacts");
            Bson jonSnowFilter = Filters.eq("name", "Jon Snow");
            try (MongoCursor<Document> cursor = contacts.find(jonSnowFilter).iterator()) {
                Document jonSnow = cursor.tryNext();
                Assertions.assertThat(jonSnow.getString("name")).isEqualTo("Jon Snow");
                Assertions.assertThat((Map<?, ?>) cursor.tryNext()).isNull();
                jonSnowId.set(jonSnow.getObjectId("_id"));
                Assertions.assertThat(jonSnowId.get()).isNotNull();
            }
            // The delete runs after the cursor is closed, so a failure here cannot close it twice
            contacts.deleteOne(jonSnowFilter);
        });

        // Two records are expected for the delete; one of them may carry a null value
        List<SourceRecord> deleteBatch = consumeRecordsByTopic(2).allRecordsInOrder();
        // NOTE(review): the parsed ids are collected but never compared with jonSnowId; parsing
        // still fails the test if a key does not have the expected {"$oid": "..."} shape.
        Set<ObjectId> deletedIds = new HashSet<>();
        for (SourceRecord record : deleteBatch) {
            VerifyRecord.isValid(record);
            deletedIds.add(toObjectId(((Struct) record.key()).getString("id")));
            if (record.value() != null) {
                Envelope.Operation operation = Envelope.Operation.forCode(((Struct) record.value()).getString("op"));
                Assertions.assertThat(operation).isEqualTo(Envelope.Operation.DELETE);
            }
        }

        // Re-initialize the test framework and start again: only the surviving document is read
        stopConnector();
        initializeConnectorTestFramework();
        start(MongoDbConnector.class, this.config);
        waitForSnapshotToBeCompleted("mongodb", "mongo");

        List<SourceRecord> resnapshotBatch = consumeRecordsByTopic(1).allRecordsInOrder();
        Set<Object> resnapshotNames = new HashSet<>();
        for (SourceRecord record : resnapshotBatch) {
            VerifyRecord.isValid(record);
            Struct value = (Struct) record.value();
            resnapshotNames.add(Document.parse(value.getString("after")).getString("name"));
            Assertions.assertThat(Envelope.Operation.forCode(value.getString("op"))).isEqualTo(Envelope.Operation.READ);
        }
        Assertions.assertThat(resnapshotNames).containsOnly(new Object[]{"Sally Hamm"});

        waitForStreamingRunning("mongodb", "mongo");
        assertNoRecordsToConsume();
    }

    /**
     * With snapshot mode {@code NEVER}, a document that exists before the connector starts must
     * not be emitted; only the document inserted while streaming is expected, as a CREATE event.
     */
    @Test
    public void shouldNotReplicateSnapshot() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .with(MongoDbConnectorConfig.SNAPSHOT_MODE, MongoDbConnectorConfig.SnapshotMode.NEVER)
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbA");

        // This document is stored before the connector starts
        primary().execute("shouldCreateContactsDatabase", mongoClient -> {
            MongoCollection<Document> contacts = mongoClient.getDatabase("dbA").getCollection("contacts");
            contacts.insertOne(new Document().append("name", "Jon Snow"), new InsertOneOptions().bypassDocumentValidation(true));
            Assertions.assertThat(contacts.countDocuments()).isEqualTo(1L);
        });

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // This document is stored while the connector is streaming
        primary().execute("shouldAddMoreRecordsToContacts", mongoClient -> {
            MongoCollection<Document> contacts = mongoClient.getDatabase("dbA").getCollection("contacts");
            contacts.insertOne(new Document().append("name", "Ygritte"), new InsertOneOptions().bypassDocumentValidation(true));
            Assertions.assertThat(contacts.countDocuments()).isEqualTo(2L);
        });

        // A typed list keeps the loop variable a SourceRecord instead of a bare Object
        List<SourceRecord> records = consumeRecordsByTopic(1).allRecordsInOrder();
        Set<Object> streamedNames = new HashSet<>();
        for (SourceRecord record : records) {
            VerifyRecord.isValid(record);
            Struct value = (Struct) record.value();
            streamedNames.add(Document.parse(value.getString("after")).getString("name"));
            Assertions.assertThat(Envelope.Operation.forCode(value.getString("op"))).isEqualTo(Envelope.Operation.CREATE);
        }
        assertNoRecordsToConsume();
        Assertions.assertThat(streamedNames).containsOnly(new Object[]{"Ygritte"});
    }

    /**
     * Runs the connector with the source struct maker version set to {@code v1}, updates a
     * previously inserted document with a {@code $set}, and verifies the resulting update event:
     * no {@code after} state, and a {@code patch} equal to the update once {@code $v} is removed.
     */
    @Test
    @FixFor({"DBZ-1880"})
    public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws Exception {
        this.config = TestHelper.getConfiguration().edit()
                .with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")
                .with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, "v1")
                .with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")
                .build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), "dbit");
        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", "mongo");

        // Insert the document and drain its create event so only the update remains
        ObjectId id = new ObjectId();
        insertDocuments("dbit", "c1", new Document("_id", id).append("name", "John"));
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();

        Document setName = new Document().append("$set", new Document().append("name", "Sally"));
        Document byId = Document.parse("{\"_id\": {\"$oid\": \"" + id + "\"}}");
        // Captured before the update so the event timestamp can be bounded from below
        Instant beforeUpdate = Instant.now();
        updateDocuments("dbit", "c1", byId, setName);

        AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        List<SourceRecord> records = consumed.allRecordsInOrder();
        Assertions.assertThat(records.size()).isEqualTo(1);
        assertNoRecordsToConsume();

        SourceRecord record = records.get(0);
        Struct key = (Struct) record.key();
        Struct value = (Struct) record.value();

        Assertions.assertThat(key.schema()).isSameAs(record.keySchema());
        Assertions.assertThat(key.get("id")).isEqualTo(formatObjectId(id));

        // The $v entry is dropped from the patch before it is compared with the update document
        Document patch = Document.parse(value.getString("patch"));
        patch.remove("$v");

        Assertions.assertThat(value.schema()).isSameAs(record.valueSchema());
        Assertions.assertThat(value.getString("after")).isNull();
        String actualPatchJson = patch.toJson(JsonSerialization.COMPACT_JSON_SETTINGS);
        String expectedPatchJson = setName.toJson(JsonSerialization.COMPACT_JSON_SETTINGS);
        Assertions.assertThat(actualPatchJson).isEqualTo(expectedPatchJson);
        Assertions.assertThat(value.getString("op")).isEqualTo(Envelope.Operation.UPDATE.code());
        Assertions.assertThat(value.getInt64("ts_ms")).isGreaterThanOrEqualTo(beforeUpdate.toEpochMilli());
    }

    /**
     * Builds the string {@code {"$oid": "<id>"}} for the given object id, the form the tests in
     * this class compare the {@code id} key field against.
     *
     * @param objectId id to embed; its string form is inserted between the quotes
     * @return the formatted string
     */
    private String formatObjectId(ObjectId objectId) {
        StringBuilder formatted = new StringBuilder();
        formatted.append("{\"$oid\": \"");
        formatted.append(objectId);
        formatted.append("\"}");
        return formatted.toString();
    }

    /**
     * Inserts each given document, one at a time, into the named collection on the primary,
     * bypassing document validation. Every document is asserted to be non-null and non-empty
     * before it is inserted.
     *
     * @param str database name
     * @param str2 collection name
     * @param documentArr documents to insert, in order
     */
    private void insertDocuments(String str, String str2, Document... documentArr) {
        primary().execute("store documents", client -> {
            Testing.debug("Storing in '" + str + "." + str2 + "' document");
            MongoDatabase database = client.getDatabase(str);
            MongoCollection<Document> target = database.getCollection(str2);
            for (int i = 0; i < documentArr.length; i++) {
                Document toStore = documentArr[i];
                Assertions.assertThat(toStore).isNotNull();
                Assertions.assertThat(toStore.size()).isGreaterThan(0);
                target.insertOne(toStore, new InsertOneOptions().bypassDocumentValidation(true));
            }
        });
    }

    /**
     * Applies an update to the first document matching the filter in the named collection on
     * the primary.
     *
     * @param str database name
     * @param str2 collection name
     * @param document filter passed to {@code updateOne}
     * @param document2 update passed to {@code updateOne}
     */
    private void updateDocuments(String str, String str2, Document document, Document document2) {
        primary().execute("update", client -> {
            MongoDatabase database = client.getDatabase(str);
            MongoCollection<Document> target = database.getCollection(str2);
            target.updateOne(document, document2);
        });
    }

    /**
     * Deletes the document with the given {@code _id} from the named collection on the primary.
     *
     * @param str database name
     * @param str2 collection name
     * @param objectId id of the document to delete
     */
    private void deleteDocument(String str, String str2, ObjectId objectId) {
        primary().execute("delete", client -> {
            MongoCollection<Document> target = client.getDatabase(str).getCollection(str2);
            // The filter is built from extended JSON naming the id
            Document byId = Document.parse("{\"_id\": {\"$oid\": \"" + objectId + "\"}}");
            target.deleteOne(byId);
        });
    }

    /**
     * Extracts the object id from a string shaped like {@code {"$oid": "<hex>"}}.
     *
     * @param str string to parse; the first 10 and the last 2 characters are discarded
     * @return an {@link ObjectId} built from the remaining characters
     */
    private ObjectId toObjectId(String str) {
        // {"$oid": " is 10 characters long and the closing "} is 2
        int hexStart = 10;
        int hexEnd = str.length() - 2;
        String hex = str.substring(hexStart, hexEnd);
        return new ObjectId(hex);
    }
}
