package io.debezium.connector.mongodb.transforms;

import io.debezium.connector.mongodb.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.Json;
import io.debezium.data.SchemaUtil;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mongodb/transforms/ExtractNewDocumentStateTestIT.class */
public class ExtractNewDocumentStateTestIT extends AbstractExtractNewDocumentStateTestIT {
    private static final String CONFIG_DROP_TOMBSTONES = "drop.tombstones";
    private static final String HANDLE_DELETES = "delete.handling.mode";
    private static final String FLATTEN_STRUCT = "flatten.struct";
    private static final String DELIMITER = "flatten.struct.delimiter";
    private static final String OPERATION_HEADER = "operation.header";
    private static final String DROP_TOMBSTONE = "drop.tombstones";
    private static final String ADD_SOURCE_FIELDS = "add.source.fields";
    private static final String ADD_HEADERS = "add.headers";
    private static final String ADD_FIELDS = "add.fields";
    private static final String ADD_FIELDS_PREFIX = "add.fields.prefix";
    private static final String ADD_HEADERS_PREFIX = "add.headers.prefix";
    private static final String ARRAY_ENCODING = "array.encoding";

    @Override // io.debezium.connector.mongodb.transforms.AbstractExtractNewDocumentStateTestIT
    protected String getCollectionName() {
        return "functional";
    }

    @Test
    @FixFor({"DBZ-563"})
    public void shouldDropTombstoneByDefault() throws InterruptedException {
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}"));
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        });
        Assertions.assertThat(this.transformation.apply(getRecordByOperation(Envelope.Operation.DELETE))).isNull();
        SourceRecord nextRecord = getNextRecord();
        Assertions.assertThat(nextRecord).isNotNull();
        Assertions.assertThat(this.transformation.apply(nextRecord)).isNull();
    }

    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put("drop.tombstones", "false");
        hashMap.put(HANDLE_DELETES, "none");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{  '_id': 1,   'dataStr': 'hello',   'dataInt': 123,   'dataLong': 80000000000,   'dataDate': ISODate(\"2020-01-27T10:47:12.311Z\"),   'dataTimestamp': Timestamp(" + ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")).toEpochSecond() + ", 1)}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName()).get(0));
        Struct struct = (Struct) apply.value();
        Assertions.assertThat(apply.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(1);
        Assertions.assertThat(struct.get("dataStr")).isEqualTo("hello");
        Assertions.assertThat(struct.get("dataInt")).isEqualTo(123);
        Assertions.assertThat(struct.get("dataLong")).isEqualTo(80000000000L);
        Assertions.assertThat(struct.get("dataDate")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 27, 10, 47, 12, 311000000, ZoneId.of("UTC")))));
        Assertions.assertThat(struct.get("dataTimestamp")).isEqualTo(Date.from(Instant.from(ZonedDateTime.of(2020, 1, 28, 10, 0, 33, 0, ZoneId.of("UTC")))));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        if (((Struct) ((SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName()).get(0)).value()).get("op").equals("c")) {
            consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        }
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply2 = this.transformation.apply((SourceRecord) consumeRecordsByTopic2.recordsForTopic(topicName()).get(0));
        Struct struct2 = (Struct) apply2.value();
        Assertions.assertThat(apply2.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply2.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.get("id")).isEqualTo(1);
        Assertions.assertThat(struct2.get("dataStr")).isEqualTo("bye");
        primary().execute("update", mongoClient3 -> {
            mongoClient3.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$set': {'newStr': 'hello', 'dataInt': 456}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply3 = this.transformation.apply((SourceRecord) consumeRecordsByTopic3.recordsForTopic(topicName()).get(0));
        Struct struct3 = (Struct) apply3.value();
        Assertions.assertThat(apply3.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply3.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(apply3.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct3.get("id")).isEqualTo(1);
        Assertions.assertThat(struct3.get("newStr")).isEqualTo("hello");
        Assertions.assertThat(struct3.get("dataInt")).isEqualTo(456);
        primary().execute("update", mongoClient4 -> {
            mongoClient4.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$unset': {'newStr': ''}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic4.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply4 = this.transformation.apply((SourceRecord) consumeRecordsByTopic4.recordsForTopic(topicName()).get(0));
        Struct struct4 = (Struct) apply4.value();
        Assertions.assertThat(apply4.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply4.valueSchema().field("newStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct4.get("id")).isEqualTo(1);
        Assertions.assertThat(struct4.get("newStr")).isEqualTo((Object) null);
        primary().execute("update", mongoClient5 -> {
            mongoClient5.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'dataStr': 'Hi again'}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic5 = consumeRecordsByTopic(1);
        if (((Struct) ((SourceRecord) consumeRecordsByTopic5.recordsForTopic(topicName()).get(0)).value()).get("op").equals("c")) {
            consumeRecordsByTopic5 = consumeRecordsByTopic(1);
        }
        Assertions.assertThat(consumeRecordsByTopic5.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply5 = this.transformation.apply((SourceRecord) consumeRecordsByTopic5.recordsForTopic(topicName()).get(0));
        Struct struct5 = (Struct) apply5.value();
        Assertions.assertThat(apply5.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply5.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct5.get("id")).isEqualTo(1);
        Assertions.assertThat(struct5.get("dataStr")).isEqualTo("Hi again");
        primary().execute("delete", mongoClient6 -> {
            mongoClient6.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic6 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic6.recordsForTopic(topicName()).size()).isEqualTo(2);
        SourceRecord apply6 = this.transformation.apply((SourceRecord) consumeRecordsByTopic6.recordsForTopic(topicName()).get(0));
        Assertions.assertThat((Struct) apply6.value()).isNull();
        SourceRecord apply7 = this.transformation.apply((SourceRecord) consumeRecordsByTopic6.recordsForTopic(topicName()).get(1));
        Assertions.assertThat(apply7.value()).isNull();
        Assertions.assertThat(SchemaUtil.asString(apply6.keySchema())).isEqualTo(SchemaUtil.asString(apply7.keySchema()));
        Assertions.assertThat(apply6.key().toString()).isEqualTo(apply7.key().toString());
    }

    @Test
    @FixFor({"DBZ-1767"})
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "array");
        hashMap.put(OPERATION_HEADER, "true");
        hashMap.put("sanitize.field.names", "true");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("data");
        Assertions.assertThat(struct.getString("_ref")).isEqualTo("a2");
        Assertions.assertThat(struct.getInt32("_id")).isEqualTo(4);
        Assertions.assertThat(struct.getString("_db")).isEqualTo("b2");
    }

    @Test
    @FixFor({"DBZ-2680"})
    public void shouldSupportSubSanitizeFieldName() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "array");
        hashMap.put(OPERATION_HEADER, "true");
        hashMap.put("sanitize.field.names", "true");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{  \"_id\": \"222\",  \"metrics\": {    \"metric::fct\": {      \"min\": 0,      \"max\": 1,    },  }}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("metrics").getStruct("metric__fct");
        Assertions.assertThat(struct.getInt32("min")).isEqualTo(0);
        Assertions.assertThat(struct.getInt32("max")).isEqualTo(1);
    }

    @Test
    @FixFor({"DBZ-1442"})
    public void shouldAddSourceFields() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord , db,rs");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{ '_id' : 3, 'name' : 'Tim' }"));
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{'_id' : 3}"), RawBsonDocument.parse("{'$set': {'name': 'Sally'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        SourceRecord sourceRecord2 = (SourceRecord) this.transformation.apply(sourceRecord);
        validate(sourceRecord2);
        Struct struct2 = (Struct) sourceRecord2.value();
        Assertions.assertThat(struct2.get("__h")).isEqualTo(struct.getInt64("h"));
        Assertions.assertThat(struct2.get("__ts_ms")).isEqualTo(struct.getInt64("ts_ms"));
        Assertions.assertThat(struct2.get("__ord")).isEqualTo(struct.getInt32("ord"));
        Assertions.assertThat(struct2.get("__db")).isEqualTo(struct.getString("db"));
        Assertions.assertThat(struct2.get("__rs")).isEqualTo(struct.getString("rs"));
        Assertions.assertThat(struct2.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(struct2.get("__rs")).isEqualTo("rs0");
    }

    @Test
    @FixFor({"DBZ-1442"})
    public void shouldAddSourceFieldsForRewriteDeleteEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_SOURCE_FIELDS, "h,ts_ms,ord,db,rs");
        hashMap.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{ '_id' : 4, 'name' : 'Sally' }"));
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : 4 }"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        SourceRecord sourceRecord2 = (SourceRecord) this.transformation.apply(sourceRecord);
        validate(sourceRecord2);
        Struct struct2 = (Struct) sourceRecord2.value();
        Assertions.assertThat(struct2.get("__h")).isEqualTo(struct.getInt64("h"));
        Assertions.assertThat(struct2.get("__ts_ms")).isEqualTo(struct.getInt64("ts_ms"));
        Assertions.assertThat(struct2.get("__ord")).isEqualTo(struct.getInt32("ord"));
        Assertions.assertThat(struct2.get("__db")).isEqualTo(struct.getString("db"));
        Assertions.assertThat(struct2.get("__rs")).isEqualTo(struct.getString("rs"));
        Assertions.assertThat(struct2.get("__db")).isEqualTo("transform_operations");
        Assertions.assertThat(struct2.get("__rs")).isEqualTo("rs0");
    }

    @Test
    public void shouldTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(OPERATION_HEADER, "true");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("phone", 123L).append("active", true).append("scores", Arrays.asList(Double.valueOf(1.2d), Double.valueOf(3.4d), Double.valueOf(5.6d)));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Iterator allWithName = sourceRecord.headers().allWithName("__op");
        Assertions.assertThat(allWithName.hasNext()).isTrue();
        Assertions.assertThat(((Header) allWithName.next()).value().toString()).isEqualTo(Envelope.Operation.CREATE.code());
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema().name()).isEqualTo("mongo.transform_operations." + getCollectionName());
        Assertions.assertThat(struct2.schema()).isSameAs(sourceRecord.valueSchema());
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("phone")).isEqualTo(123L);
        Assertions.assertThat(struct2.get("active")).isEqualTo(true);
        Assertions.assertThat(struct2.get("scores")).isEqualTo(Arrays.asList(Double.valueOf(1.2d), Double.valueOf(3.4d), Double.valueOf(5.6d)));
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(struct2.schema().field("active").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(struct2.schema().field("scores").schema()).isEqualTo(SchemaBuilder.array(SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA).optional().build());
        Assertions.assertThat(struct2.schema().fields()).hasSize(5);
    }

    @Test
    public void shouldTransformRecordForInsertEventWithComplexIdType() throws InterruptedException {
        waitForStreamingRunning();
        Document append = new Document().append("_id", new Document().append("company", 32).append("dept", "home improvement")).append("name", "Sally");
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(((Struct) struct.get("id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) struct.get("id")).get("dept")).isEqualTo("home improvement");
        Assertions.assertThat(struct2.schema()).isSameAs(sourceRecord.valueSchema());
        Assertions.assertThat(((Struct) struct2.get("id")).get("company")).isEqualTo(32);
        Assertions.assertThat(((Struct) struct2.get("id")).get("dept")).isEqualTo("home improvement");
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.schema().field("id").schema().field("company").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct2.schema().field("id").schema().field("dept").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
    }

    @Test
    public void shouldGenerateRecordForUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(OPERATION_HEADER, "true");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Tim");
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document("name", "Sally"));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Iterator allWithName = sourceRecord.headers().allWithName("__op");
        Assertions.assertThat(allWithName.hasNext()).isTrue();
        Assertions.assertThat(((Header) allWithName.next()).value().toString()).isEqualTo(Envelope.Operation.UPDATE.code());
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(sourceRecord.valueSchema());
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().fields()).hasSize(2);
    }

    @Test
    @FixFor({"DBZ-612"})
    public void shouldGenerateRecordForUpdateEventWithUnset() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Tim").append("phone", 123L).append("active", false);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document("name", "Sally")).append("$unset", new Document().append("phone", true).append("active", false));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.valueSchema());
        Assertions.assertThat(struct.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct.get("phone")).isEqualTo((Object) null);
        Assertions.assertThat(struct.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.schema().fields()).hasSize(4);
    }

    @Test
    @FixFor({"DBZ-612"})
    public void shouldGenerateRecordForUnsetOnlyUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("phone", 123L).append("active", false);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$unset", new Document().append("phone", true).append("active", false));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.valueSchema());
        Assertions.assertThat(struct.get("phone")).isEqualTo((Object) null);
        Assertions.assertThat(struct.schema().field("phone").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.schema().fields()).hasSize(3);
    }

    @Test
    @FixFor({"DBZ-582"})
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "none");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        validate(sourceRecord);
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2).isNull();
    }

    @Test
    @FixFor({"DBZ-1032"})
    public void shouldGenerateRecordHeaderForTombstone() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(OPERATION_HEADER, "true");
        hashMap.put("drop.tombstones", "false");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(1));
        validate(sourceRecord);
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.schema()).isSameAs(sourceRecord.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Iterator allWithName = sourceRecord.headers().allWithName("__op");
        Assertions.assertThat(allWithName.hasNext()).isTrue();
        Assertions.assertThat(((Header) allWithName.next()).value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(struct2).isNull();
    }

    @Test
    @FixFor({"DBZ-583"})
    public void shouldDropDeleteMessagesByDefault() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Assertions.assertThat(this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0))).isNull();
    }

    @Test
    @FixFor({"DBZ-583"})
    public void shouldRewriteDeleteMessage() throws InterruptedException {
        restartConnectorWithoutEmittingTombstones();
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(struct2.get("__deleted")).isEqualTo(true);
    }

    @Test
    @FixFor({"DBZ-583"})
    public void shouldRewriteMessagesWhichAreNotDeletes() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Tim");
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document("name", "Sally"));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Struct struct = (Struct) this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0)).value();
        Assertions.assertThat(struct.schema().field("__deleted").schema()).isEqualTo(SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA);
        Assertions.assertThat(struct.get("__deleted")).isEqualTo(false);
    }

    @Test
    public void shouldGenerateRecordForDeleteEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(OPERATION_HEADER, "true");
        hashMap.put(HANDLE_DELETES, "none");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Iterator allWithName = apply.headers().allWithName("__op");
        Assertions.assertThat(allWithName.hasNext()).isTrue();
        Assertions.assertThat(((Header) allWithName.next()).value().toString()).isEqualTo(Envelope.Operation.DELETE.code());
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2).isNull();
    }

    @Test
    @FixFor({"DBZ-971"})
    public void shouldPropagatePreviousRecordHeaders() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Tim");
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document("name", "Sally"));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0);
        sourceRecord.headers().addString("application/debezium-test-header", "shouldPropagatePreviousRecordHeaders");
        SourceRecord apply = this.transformation.apply(sourceRecord);
        Assertions.assertThat(apply.headers()).hasSize(1);
        Iterator allWithName = apply.headers().allWithName("application/debezium-test-header");
        Assertions.assertThat(allWithName.hasNext()).isTrue();
        Assertions.assertThat(((Header) allWithName.next()).value().toString()).isEqualTo("shouldPropagatePreviousRecordHeaders");
    }

    @Test
    public void shouldNotFlattenTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("address", new Document().append("street", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(apply.valueSchema());
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("address")).isEqualTo(new Struct(struct2.schema().field("address").schema()).put("street", "Morris Park Ave").put("zipcode", "10462"));
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address").schema()).isEqualTo(SchemaBuilder.struct().name("mongo.transform_operations." + getCollectionName() + ".address").optional().field("street", Schema.OPTIONAL_STRING_SCHEMA).field("zipcode", Schema.OPTIONAL_STRING_SCHEMA).build());
        Assertions.assertThat(struct2.schema().fields()).hasSize(3);
    }

    @Test
    public void shouldFlattenTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(FLATTEN_STRUCT, "true");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("address", new Document().append("street", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(apply.valueSchema());
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("address_street")).isEqualTo("Morris Park Ave");
        Assertions.assertThat(struct2.get("address_zipcode")).isEqualTo("10462");
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address_street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address_zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().fields()).hasSize(4);
    }

    @Test
    public void shouldFlattenWithDelimiterTransformRecordForInsertEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(FLATTEN_STRUCT, "true");
        hashMap.put(DELIMITER, "-");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("address", new Document().append("street", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(apply.valueSchema());
        Assertions.assertThat(struct2.get("name")).isEqualTo("Sally");
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("address-street")).isEqualTo("Morris Park Ave");
        Assertions.assertThat(struct2.get("address-zipcode")).isEqualTo("10462");
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address-street").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address-zipcode").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().fields()).hasSize(4);
    }

    @Test
    public void shouldFlattenWithDelimiterTransformRecordForUpdateEvent() throws InterruptedException {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(FLATTEN_STRUCT, "true");
        hashMap.put(DELIMITER, "-");
        this.transformation.configure(hashMap);
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("address", new Document().append("street", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document(Collect.hashMapOf("address.city", "Canberra", "address.name", "James", "address.city2.part", 3)));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(apply.valueSchema());
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("address-city")).isEqualTo("Canberra");
        Assertions.assertThat(struct2.get("address-name")).isEqualTo("James");
        Assertions.assertThat(struct2.get("address-city2-part")).isEqualTo(3);
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address-city").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address-name").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("address-city2-part").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct2.schema().fields()).hasSize(4);
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddHeader() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_HEADERS, "op");
        this.transformation.configure(hashMap);
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply(createCreateRecord());
        Assertions.assertThat(sourceRecord.headers()).hasSize(1);
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "__op")).isEqualTo(Envelope.Operation.CREATE.code());
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddHeadersForMissingOrInvalidFields() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_HEADERS, "op,id");
        this.transformation.configure(hashMap);
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply(createCreateRecord());
        Assertions.assertThat(sourceRecord.headers()).hasSize(2);
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "__op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "__id")).isNull();
    }

    @Test
    @FixFor({"DBZ-1791", "DBZ-2504"})
    public void testAddHeadersSpecifyingStruct() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_HEADERS, "op,source.rs,source.collection");
        hashMap.put(ADD_HEADERS_PREFIX, "prefix.");
        this.transformation.configure(hashMap);
        SourceRecord sourceRecord = (SourceRecord) this.transformation.apply(createCreateRecord());
        Assertions.assertThat(sourceRecord.headers()).hasSize(3);
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "prefix.source_rs")).isEqualTo("rs0");
        Assertions.assertThat(getSourceRecordHeaderByKey(sourceRecord, "prefix.source_collection")).isEqualTo(getCollectionName());
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddField() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_FIELDS, "op");
        this.transformation.configure(hashMap);
        Assertions.assertThat(((Struct) this.transformation.apply(createCreateRecord()).value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
    }

    @Test
    @FixFor({"DBZ-1791", "DBZ-2504"})
    public void testAddFields() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_FIELDS, "op , ts_ms");
        hashMap.put(ADD_FIELDS_PREFIX, "prefix.");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply(createCreateRecord());
        Assertions.assertThat(((Struct) apply.value()).get("prefix.op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(((Struct) apply.value()).get("prefix.ts_ms")).isNotNull();
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddFieldsForMissingOptionalField() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_FIELDS, "op,id");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply(createCreateRecord());
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(((Struct) apply.value()).get("__id")).isNull();
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddFieldsSpecifyStruct() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_FIELDS, "op,source.rs,source.collection");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply(createCreateRecord());
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.CREATE.code());
        Assertions.assertThat(((Struct) apply.value()).get("__source_rs")).isEqualTo("rs0");
        Assertions.assertThat(((Struct) apply.value()).get("__source_collection")).isEqualTo(getCollectionName());
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddFieldHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        hashMap.put(ADD_FIELDS, "op");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply((SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0));
        Assertions.assertThat(((Struct) apply.value()).get("__deleted")).isEqualTo(true);
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void tesAddFieldsHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        hashMap.put(ADD_FIELDS, "op,ts_ms");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply((SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0));
        Assertions.assertThat(((Struct) apply.value()).get("__deleted")).isEqualTo(true);
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(((Struct) apply.value()).get("__ts_ms")).isNotNull();
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddFieldsSpecifyStructHandleDeleteRewrite() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        hashMap.put(ADD_FIELDS, "op,source.rs,source.collection");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply((SourceRecord) createDeleteRecordWithTombstone().allRecordsInOrder().get(0));
        Assertions.assertThat(((Struct) apply.value()).get("__deleted")).isEqualTo(true);
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(((Struct) apply.value()).get("__source_rs")).isEqualTo("rs0");
        Assertions.assertThat(((Struct) apply.value()).get("__source_collection")).isEqualTo(getCollectionName());
    }

    @Test
    @FixFor({"DBZ-1791"})
    public void testAddFieldsHandleDeleteRewriteAndTombstone() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(HANDLE_DELETES, "rewrite");
        hashMap.put(ADD_FIELDS, "op,ts_ms");
        hashMap.put("drop.tombstones", "false");
        this.transformation.configure(hashMap);
        AbstractConnectorTest.SourceRecords createDeleteRecordWithTombstone = createDeleteRecordWithTombstone();
        SourceRecord apply = this.transformation.apply((SourceRecord) createDeleteRecordWithTombstone.allRecordsInOrder().get(0));
        Assertions.assertThat(((Struct) apply.value()).get("__deleted")).isEqualTo(true);
        Assertions.assertThat(((Struct) apply.value()).get("__op")).isEqualTo(Envelope.Operation.DELETE.code());
        Assertions.assertThat(((Struct) apply.value()).get("__ts_ms")).isNotNull();
        Assertions.assertThat(this.transformation.apply((SourceRecord) createDeleteRecordWithTombstone.allRecordsInOrder().get(1)).value()).isNull();
    }

    @Test
    @FixFor({"DBZ-2585"})
    public void testEmptyArray() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "array");
        hashMap.put("sanitize.field.names", "true");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{'empty_array': [] }"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName()).get(0));
        Assertions.assertThat(apply.valueSchema().field("empty_array")).isNull();
        VerifyRecord.isValid(apply);
    }

    @Test
    @FixFor({"DBZ-2455"})
    public void testAddPatchFieldAfterUpdate() throws Exception {
        waitForStreamingRunning();
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("a", 1).append("b", 2).append("c", 3);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        Document append2 = new Document().append("$set", new Document(Collect.hashMapOf("a", 22)));
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).updateOne(RawBsonDocument.parse("{ '_id' : { '$oid' : '" + objectId + "'}}"), append2);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        HashMap hashMap = new HashMap();
        hashMap.put(ADD_FIELDS, "patch");
        this.transformation.configure(hashMap);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        Struct struct = (Struct) apply.key();
        Struct struct2 = (Struct) apply.value();
        Assertions.assertThat(struct.schema()).isSameAs(apply.keySchema());
        Assertions.assertThat(struct.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.schema()).isSameAs(apply.valueSchema());
        Assertions.assertThat(struct2.get("id")).isEqualTo(objectId.toString());
        Assertions.assertThat(struct2.get("a")).isEqualTo(22);
        Assertions.assertThat(TestHelper.getDocumentWithoutLanguageVersion(struct2.getString("__patch")).toJson()).isEqualTo("{\"$set\": {\"a\": 22}}");
        Assertions.assertThat(struct2.schema().field("id").schema()).isEqualTo(SchemaBuilder.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.schema().field("a").schema()).isEqualTo(SchemaBuilder.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct2.schema().field("__patch").schema()).isEqualTo(Json.builder().optional().build());
        Assertions.assertThat(struct2.schema().fields()).hasSize(3);
    }

    @Test(expected = DataException.class)
    @FixFor({"DBZ-2316"})
    public void testShouldThrowExceptionWithElementsDifferingStructures() throws Exception {
        waitForStreamingRunning();
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "array");
        hashMap.put(ADD_FIELDS, "op,source.ts_ms");
        this.transformation.configure(hashMap);
        Iterator it = createCreateRecordFromJson("dbz-2316.json").allRecordsInOrder().iterator();
        while (it.hasNext()) {
            this.transformation.apply((SourceRecord) it.next());
        }
    }

    @Test
    @FixFor({"DBZ-2569"})
    public void testMatrixType() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "array");
        hashMap.put("drop.tombstones", "false");
        hashMap.put(HANDLE_DELETES, "none");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{  'matrix': [    [1,2,3],    [4,5,6],    [7,8,9],  ]  ,'array_complex': [    {'k1' : 'v1','k2' : 1},    {'k1' : 'v2','k2' : 2},  ]  ,'matrix_complex': [    [      {'k3' : 'v111',       'k4' : [1,2,3]},      {'k3' : 'v211',       'k4' : [4,5,6]}    ],    [      {'k3' : 'v112',       'k4' : [7,8]},      {'k3' : 'v212',       'k4' : [8]}    ],  ]}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName()).get(0));
        Struct struct = (Struct) apply.value();
        Schema schema = apply.valueSchema().field("matrix").schema();
        Assertions.assertThat(schema.type()).isEqualTo(Schema.Type.ARRAY);
        Schema schema2 = schema.valueSchema().schema();
        Assertions.assertThat(schema2.type()).isEqualTo(Schema.Type.ARRAY);
        Assertions.assertThat(schema2.valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(struct.get("matrix")).isEqualTo(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9)));
        Schema schema3 = apply.valueSchema().field("array_complex").schema();
        Assertions.assertThat(schema3.type()).isEqualTo(Schema.Type.ARRAY);
        Schema schema4 = schema3.valueSchema().schema();
        Assertions.assertThat(schema4.type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(schema4.field("k1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(schema4.field("k2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Field field = schema4.field("k1");
        Field field2 = schema4.field("k2");
        Struct struct2 = new Struct(schema4);
        struct2.put(field, "v1");
        struct2.put(field2, 1);
        Struct struct3 = new Struct(schema4);
        struct3.put(field, "v2");
        struct3.put(field2, 2);
        Assertions.assertThat(struct.get("array_complex")).isEqualTo(Arrays.asList(struct2, struct3));
        Schema schema5 = apply.valueSchema().field("matrix_complex").schema();
        Assertions.assertThat(schema5.type()).isEqualTo(Schema.Type.ARRAY);
        Schema schema6 = schema5.valueSchema().schema();
        Assertions.assertThat(schema6.type()).isEqualTo(Schema.Type.ARRAY);
        Schema valueSchema = schema6.valueSchema();
        Assertions.assertThat(valueSchema.schema().type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(valueSchema.field("k3").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(valueSchema.field("k4").schema().type()).isEqualTo(Schema.Type.ARRAY);
        Assertions.assertThat(valueSchema.field("k4").schema().valueSchema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Field field3 = valueSchema.field("k3");
        Field field4 = valueSchema.field("k4");
        Struct struct4 = new Struct(valueSchema.schema());
        struct4.put(field3, "v111");
        struct4.put(field4, Arrays.asList(1, 2, 3));
        Struct struct5 = new Struct(valueSchema.schema());
        struct5.put(field3, "v112");
        struct5.put(field4, Arrays.asList(7, 8));
        Struct struct6 = new Struct(valueSchema.schema());
        struct6.put(field3, "v211");
        struct6.put(field4, Arrays.asList(4, 5, 6));
        Struct struct7 = new Struct(valueSchema.schema());
        struct7.put(field3, "v212");
        struct7.put(field4, Arrays.asList(8));
        Assertions.assertThat(struct.get("matrix_complex")).isEqualTo(Arrays.asList(Arrays.asList(struct4, struct6), Arrays.asList(struct5, struct7)));
    }

    @Test
    @FixFor({"DBZ-2569"})
    public void testMatrixArrayAsDocumentType() throws InterruptedException, IOException {
        HashMap hashMap = new HashMap();
        hashMap.put(ARRAY_ENCODING, "document");
        hashMap.put("drop.tombstones", "false");
        hashMap.put(HANDLE_DELETES, "none");
        this.transformation.configure(hashMap);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(Document.parse("{  'matrix': [    [1,'aa',3],    [4,5,'6'],    [7.0,8],  ]}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicName()).get(0));
        Schema schema = apply.valueSchema().field("matrix").schema();
        Assertions.assertThat(schema.type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(schema.fields().size()).isEqualTo(3);
        Schema schema2 = schema.field("_0").schema();
        Assertions.assertThat(schema2.type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(schema2.fields().size()).isEqualTo(3);
        Assertions.assertThat(schema2.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(schema2.field("_1").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(schema2.field("_2").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Schema schema3 = schema.field("_1").schema();
        Assertions.assertThat(schema3.type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(schema3.fields().size()).isEqualTo(3);
        Assertions.assertThat(schema3.field("_0").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(schema3.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(schema3.field("_2").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Schema schema4 = schema.field("_2").schema();
        Assertions.assertThat(schema4.type()).isEqualTo(Schema.Type.STRUCT);
        Assertions.assertThat(schema4.fields().size()).isEqualTo(2);
        Assertions.assertThat(schema4.field("_0").schema()).isEqualTo(Schema.OPTIONAL_FLOAT64_SCHEMA);
        Assertions.assertThat(schema4.field("_1").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Struct struct = (Struct) apply.value();
        Struct struct2 = new Struct(schema2);
        struct2.put(schema2.field("_0"), 1);
        struct2.put(schema2.field("_1"), "aa");
        struct2.put(schema2.field("_2"), 3);
        Struct struct3 = new Struct(schema3);
        struct3.put(schema3.field("_0"), 4);
        struct3.put(schema3.field("_1"), 5);
        struct3.put(schema3.field("_2"), "6");
        Struct struct4 = new Struct(schema4);
        struct4.put(schema4.field("_0"), Double.valueOf(7.0d));
        struct4.put(schema4.field("_1"), 8);
        Struct struct5 = new Struct(schema);
        struct5.put(schema.field("_0"), struct2);
        struct5.put(schema.field("_1"), struct3);
        struct5.put(schema.field("_2"), struct4);
        Assertions.assertThat(struct.get("matrix")).isEqualTo(struct5);
    }

    private AbstractConnectorTest.SourceRecords createCreateRecordFromJson(String str) throws Exception {
        List<Document> loadTestDocuments = loadTestDocuments(str);
        primary().execute("Load JSON", mongoClient -> {
            Iterator it = loadTestDocuments.iterator();
            while (it.hasNext()) {
                mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne((Document) it.next());
            }
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(loadTestDocuments.size());
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(loadTestDocuments.size());
        assertNoRecordsToConsume();
        return consumeRecordsByTopic;
    }

    private List<Document> loadTestDocuments(String str) {
        ArrayList arrayList = new ArrayList();
        try {
            InputStream readResourceAsStream = Testing.Files.readResourceAsStream(str);
            try {
                Assertions.assertThat(readResourceAsStream).isNotNull();
                IoUtil.readLines(readResourceAsStream, str2 -> {
                    Document parse = Document.parse(str2);
                    Assertions.assertThat(parse.size()).isGreaterThan(0);
                    arrayList.add(parse);
                });
                if (readResourceAsStream != null) {
                    readResourceAsStream.close();
                }
            } finally {
            }
        } catch (IOException e) {
            Assert.fail("Unable to find or read file '" + str + "': " + e.getMessage());
        }
        return arrayList;
    }

    private SourceRecord createCreateRecord() throws Exception {
        Document append = new Document().append("_id", new ObjectId()).append("name", "Sally").append("address", new Document().append("struct", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        return (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0);
    }

    private AbstractConnectorTest.SourceRecords createDeleteRecordWithTombstone() throws Exception {
        ObjectId objectId = new ObjectId();
        Document append = new Document().append("_id", objectId).append("name", "Sally").append("address", new Document().append("struct", "Morris Park Ave").append("zipcode", "10462"));
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase("transform_operations").getCollection(getCollectionName()).insertOne(append);
        });
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic(topicName()).size()).isEqualTo(1);
        assertNoRecordsToConsume();
        primary().execute("delete", mongoClient2 -> {
            mongoClient2.getDatabase("transform_operations").getCollection(getCollectionName()).deleteOne(Document.parse("{\"_id\": {\"$oid\": \"" + objectId + "\"}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(topicName()).size()).isEqualTo(2);
        assertNoRecordsToConsume();
        return consumeRecordsByTopic;
    }

    private static void waitForStreamingRunning() throws InterruptedException {
        waitForStreamingRunning("mongodb", "mongo");
    }

    private String getSourceRecordHeaderByKey(SourceRecord sourceRecord, String str) {
        Object value;
        Iterator allWithName = sourceRecord.headers().allWithName(str);
        if (allWithName.hasNext() && (value = ((Header) allWithName.next()).value()) != null) {
            return value.toString();
        }
        return null;
    }
}
