package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.SourceRecordAssert;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.TableChanges;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.fest.assertions.ObjectAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerConnectorIT.class */
public class SqlServerConnectorIT extends AbstractConnectorTest {
    private SqlServerConnection connection;

    /* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerConnectorIT$PurgableFileDatabaseHistory.class */
    public static class PurgableFileDatabaseHistory implements DatabaseHistory {
        final DatabaseHistory delegate = new FileDatabaseHistory();

        public boolean exists() {
            try {
                if (storageExists()) {
                    if (Files.size(TestHelper.DB_HISTORY_PATH) > 0) {
                        return true;
                    }
                }
                return false;
            } catch (IOException e) {
                throw new DatabaseHistoryException("File should exist");
            }
        }

        public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, DatabaseHistoryListener databaseHistoryListener, boolean z) {
            this.delegate.configure(configuration, historyRecordComparator, databaseHistoryListener, z);
        }

        public void start() {
            this.delegate.start();
        }

        public void record(Map<String, ?> map, Map<String, ?> map2, String str, String str2) throws DatabaseHistoryException {
            this.delegate.record(map, map2, str, str2);
        }

        public void record(Map<String, ?> map, Map<String, ?> map2, String str, String str2, String str3, TableChanges tableChanges) throws DatabaseHistoryException {
            this.delegate.record(map, map2, str, str2, str3, tableChanges);
        }

        public void recover(Map<String, ?> map, Map<String, ?> map2, Tables tables, DdlParser ddlParser) {
            this.delegate.recover(map, map2, tables, ddlParser);
        }

        public void stop() {
            this.delegate.stop();
        }

        public boolean storageExists() {
            return this.delegate.storageExists();
        }

        public void initializeStorage() {
            this.delegate.initializeStorage();
        }

        public boolean storeOnlyCapturedTables() {
            return false;
        }

        public boolean skipUnparseableDdlStatements() {
            return false;
        }
    }

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"CREATE TABLE tablea (id int primary key, cola varchar(30))", "CREATE TABLE tableb (id int primary key, colb varchar(30))", "INSERT INTO tablea VALUES(1, 'a')"});
        TestHelper.enableTableCdc(this.connection, "tablea");
        TestHelper.enableTableCdc(this.connection, "tableb");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void createAndDelete() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
        }
        this.connection.execute(new String[]{"DELETE FROM tableB"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(10);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic3).isNullOrEmpty();
        Assertions.assertThat(recordsForTopic4).hasSize(10);
        for (int i4 = 0; i4 < 5; i4++) {
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic4.get(i4 * 2);
            SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic4.get((i4 * 2) + 1);
            List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i4 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct3 = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct3.get("before"), asList3);
            Assert.assertNull(struct3.get("after"));
            Assert.assertNull((Struct) sourceRecord4.value());
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1642"})
    public void readOnlyApplicationIntent() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        String str = "readOnlyApplicationIntent-" + UUID.randomUUID();
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with("database.applicationIntent", "ReadOnly").with("database.applicationName", str).build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        consumeRecordsByTopic(1);
        TestHelper.waitForStreamingStarted();
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10, 24);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
        }
        Assertions.assertThat(logInterceptor.containsMessage("Schema locking was disabled in connector configuration")).isTrue();
        SqlServerConnection adminConnection = TestHelper.adminConnection();
        try {
            HashSet hashSet = new HashSet();
            Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).pollInterval(100L, TimeUnit.MILLISECONDS).until(() -> {
                adminConnection.query("SELECT (SELECT transaction_id FROM sys.dm_tran_session_transactions AS t WHERE s.session_id=t.session_id) FROM sys.dm_exec_sessions AS s WHERE program_name='" + str + "'", resultSet -> {
                    while (resultSet.next()) {
                        long j = resultSet.getLong(1);
                        if (j != 0) {
                            hashSet.add(Long.valueOf(j));
                        }
                    }
                });
                return Boolean.valueOf(hashSet.size() > 2);
            });
            if (adminConnection != null) {
                adminConnection.close();
            }
            stopConnector();
        } catch (Throwable th) {
            if (adminConnection != null) {
                try {
                    adminConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-1643"})
    public void timestampAndTimezone() throws Exception {
        TimeZone timeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("Australia/Canberra"));
            start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            Instant now = Instant.now();
            Instant minusSeconds = now.minusSeconds(300L);
            Instant plusSeconds = now.plusSeconds(300L);
            for (int i = 0; i < 5; i++) {
                int i2 = 10 + i;
                this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
                this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
            }
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
            List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
            List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
            Assertions.assertThat(recordsForTopic).hasSize(5);
            Assertions.assertThat(recordsForTopic2).hasSize(5);
            for (int i3 = 0; i3 < 5; i3++) {
                Instant ofEpochMilli = Instant.ofEpochMilli(((Struct) ((SourceRecord) recordsForTopic.get(i3)).value()).getStruct("source").getInt64("ts_ms").longValue());
                Assertions.assertThat(ofEpochMilli.isAfter(minusSeconds) && ofEpochMilli.isBefore(plusSeconds)).isTrue();
            }
            stopConnector();
            TimeZone.setDefault(timeZone);
        } catch (Throwable th) {
            TimeZone.setDefault(timeZone);
            throw th;
        }
    }

    @Test
    public void deleteWithoutTombstone() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        consumeRecordsByTopic(10);
        this.connection.execute(new String[]{"DELETE FROM tableB"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).isNullOrEmpty();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("before"), asList);
            Assert.assertNull(struct.get("after"));
        }
        stopConnector();
    }

    @Test
    public void update() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.setAutoCommit(false);
        String[] strArr = new String[5];
        for (int i = 0; i < 5; i++) {
            strArr[i] = "INSERT INTO tableb VALUES(" + (10 + i) + ", 'b')";
        }
        this.connection.execute(strArr);
        this.connection.setAutoCommit(true);
        this.connection.execute(new String[]{"UPDATE tableb SET colb='z'"});
        List recordsForTopic = consumeRecordsByTopic(10).recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(10);
        for (int i2 = 0; i2 < 5; i2++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i2);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i2 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
        }
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i3 + 5);
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "z"));
            Struct struct2 = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct2.get("before"), asList2);
            assertRecord((Struct) struct2.get("after"), asList3);
        }
        stopConnector();
    }

    @Test
    public void updatePrimaryKey() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        consumeRecordsByTopic(1);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(6);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(recordsForTopic2).hasSize(3);
        List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        assertRecord(struct2.getStruct("before"), asList);
        assertRecord(struct, asList2);
        Assert.assertNull(struct2.get("after"));
        Struct struct3 = (Struct) sourceRecord2.key();
        Struct struct4 = (Struct) sourceRecord2.value();
        assertRecord(struct3, asList2);
        Assert.assertNull(struct4);
        Struct struct5 = (Struct) sourceRecord3.key();
        Struct struct6 = (Struct) sourceRecord3.value();
        assertRecord(struct6.getStruct("after"), asList3);
        assertRecord(struct5, asList4);
        Assert.assertNull(struct6.get("before"));
        List<SchemaAndValueField> asList5 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList6 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList7 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList8 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic2.get(0);
        SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic2.get(1);
        SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic2.get(2);
        Struct struct7 = (Struct) sourceRecord4.key();
        Struct struct8 = (Struct) sourceRecord4.value();
        assertRecord(struct8.getStruct("before"), asList5);
        assertRecord(struct7, asList6);
        Assert.assertNull(struct8.get("after"));
        Assertions.assertThat(struct8.getStruct("source").getInt64("event_serial_no")).isEqualTo(1L);
        Struct struct9 = (Struct) sourceRecord5.key();
        Struct struct10 = (Struct) sourceRecord5.value();
        assertRecord(struct9, asList6);
        Assert.assertNull(struct10);
        Struct struct11 = (Struct) sourceRecord6.key();
        Struct struct12 = (Struct) sourceRecord6.value();
        assertRecord(struct12.getStruct("after"), asList7);
        assertRecord(struct11, asList8);
        Assert.assertNull(struct12.get("before"));
        Assertions.assertThat(struct12.getStruct("source").getInt64("event_serial_no")).isEqualTo(2L);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1152"})
    public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build();
        start(SqlServerConnector.class, build, sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            return struct != null && "c".equals(struct.get("op")) && struct.getStruct("after").getInt32("id").intValue() == 100;
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        consumeRecordsByTopic(1);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        stopConnector();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        recordsForTopic.addAll(consumeRecordsByTopic2.recordsForTopic("server1.dbo.tablea"));
        List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(recordsForTopic2).hasSize(3);
        List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(0);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(1);
        SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic.get(2);
        Struct struct = (Struct) sourceRecord2.key();
        Struct struct2 = (Struct) sourceRecord2.value();
        assertRecord(struct2.getStruct("before"), asList);
        assertRecord(struct, asList2);
        Assert.assertNull(struct2.get("after"));
        Struct struct3 = (Struct) sourceRecord3.key();
        Struct struct4 = (Struct) sourceRecord3.value();
        assertRecord(struct3, asList2);
        Assert.assertNull(struct4);
        Struct struct5 = (Struct) sourceRecord4.key();
        Struct struct6 = (Struct) sourceRecord4.value();
        assertRecord(struct6.getStruct("after"), asList3);
        assertRecord(struct5, asList4);
        Assert.assertNull(struct6.get("before"));
        List<SchemaAndValueField> asList5 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList6 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList7 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList8 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic2.get(0);
        SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic2.get(1);
        SourceRecord sourceRecord7 = (SourceRecord) recordsForTopic2.get(2);
        Struct struct7 = (Struct) sourceRecord5.key();
        Struct struct8 = (Struct) sourceRecord5.value();
        assertRecord(struct8.getStruct("before"), asList5);
        assertRecord(struct7, asList6);
        Assert.assertNull(struct8.get("after"));
        Struct struct9 = (Struct) sourceRecord6.key();
        Struct struct10 = (Struct) sourceRecord6.value();
        assertRecord(struct9, asList6);
        Assert.assertNull(struct10);
        Struct struct11 = (Struct) sourceRecord7.key();
        Struct struct12 = (Struct) sourceRecord7.value();
        assertRecord(struct12.getStruct("after"), asList7);
        assertRecord(struct11, asList8);
        Assert.assertNull(struct12.get("before"));
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2329"})
    public void updatePrimaryKeyTwiceWithRestartInMiddleOfTx() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.MAX_QUEUE_SIZE, 2).with(SqlServerConnectorConfig.MAX_BATCH_SIZE, 1).with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false).build();
        start(SqlServerConnector.class, build, sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            return struct != null && "d".equals(struct.get("op")) && struct.getStruct("before").getInt32("id").intValue() == 305;
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"INSERT INTO tableb (id, colb) values (1,'1')"});
        this.connection.execute(new String[]{"INSERT INTO tableb (id, colb) values (2,'2')"});
        this.connection.execute(new String[]{"INSERT INTO tableb (id, colb) values (3,'3')"});
        this.connection.execute(new String[]{"INSERT INTO tableb (id, colb) values (4,'4')"});
        this.connection.execute(new String[]{"INSERT INTO tableb (id, colb) values (5,'5')"});
        consumeRecordsByTopic(5);
        this.connection.execute(new String[]{"UPDATE tableb set id = colb + 300"});
        this.connection.execute(new String[]{"UPDATE tableb set id = colb + 300"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(14);
        stopConnector();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(6);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        recordsForTopic.addAll(consumeRecordsByTopic2.recordsForTopic("server1.dbo.tableb"));
        Assertions.assertThat(recordsForTopic).hasSize(20);
        stopConnector();
    }

    @Test
    public void streamChangesWhileStopped() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        consumeRecordsByTopic(10);
        stopConnector();
        for (int i3 = 0; i3 < 5; i3++) {
            int i4 = 100 + i3;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i4 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i4 + ", 'b')"});
        }
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i5 = 0; i5 < 5; i5++) {
            int i6 = i5 + 100;
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i5);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i5);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
        }
    }

    @Test
    @FixFor({"DBZ-1069"})
    public void verifyOffsets() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
            arrayList.add(Integer.valueOf(i2));
        }
        String nameOfChangeTable = this.connection.getNameOfChangeTable("tablea");
        String nameOfChangeTable2 = this.connection.getNameOfChangeTable("tableb");
        TestHelper.waitForCdcRecord(this.connection, "tableb", resultSet -> {
            return resultSet.getInt("id") == ((Integer) arrayList.get(arrayList.size() - 1)).intValue();
        });
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
            if (!this.connection.getMaxLsn(TestHelper.TEST_DATABASE).isAvailable()) {
                return false;
            }
            HashMap hashMap = new HashMap();
            this.connection.listOfChangeTables(TestHelper.TEST_DATABASE).forEach(sqlServerChangeTable -> {
                String table = sqlServerChangeTable.getChangeTableId().table();
                if (table.endsWith("dbo_" + nameOfChangeTable) || table.endsWith("dbo_" + nameOfChangeTable2)) {
                    try {
                        Lsn minLsn = this.connection.getMinLsn(TestHelper.TEST_DATABASE, table);
                        Lsn maxLsn = this.connection.getMaxLsn(TestHelper.TEST_DATABASE);
                        SqlServerChangeTable[] sqlServerChangeTableArr = (SqlServerChangeTable[]) Collections.singletonList(sqlServerChangeTable).toArray(new SqlServerChangeTable[0]);
                        ArrayList arrayList2 = new ArrayList();
                        this.connection.getChangesForTables(TestHelper.TEST_DATABASE, sqlServerChangeTableArr, minLsn, maxLsn, resultSetArr -> {
                            ResultSet resultSet2 = resultSetArr[0];
                            while (resultSet2.next()) {
                                arrayList2.add(Integer.valueOf(resultSet2.getInt("id")));
                            }
                        });
                        if (arrayList2.equals(arrayList)) {
                            hashMap.put(table, true);
                        } else {
                            hashMap.put(table, false);
                        }
                    } catch (Exception e) {
                        Assert.fail("Failed to fetch changes for table " + table + ": " + e.getMessage());
                    }
                }
            });
            return Boolean.valueOf(hashMap.values().stream().filter(bool -> {
                return !bool.booleanValue();
            }).count() == 0);
        });
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        List allRecordsInOrder = consumeRecordsByTopic(11).allRecordsInOrder();
        Iterator it = allRecordsInOrder.subList(1, allRecordsInOrder.size()).iterator();
        while (it.hasNext()) {
            SourceRecord sourceRecord = (SourceRecord) it.next();
            ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).as("Snapshot phase")).isEqualTo(true);
            if (it.hasNext()) {
                ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot in progress")).isEqualTo(false);
            } else {
                ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot completed")).isEqualTo(true);
            }
        }
        stopConnector();
        for (int i3 = 0; i3 < 5; i3++) {
            int i4 = 100 + i3;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i4 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i4 + ", 'b')"});
        }
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i5 = 0; i5 < 5; i5++) {
            int i6 = i5 + 100;
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i5);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic2.get(i5);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
            ((ObjectAssert) Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot")).as("Streaming phase")).isNull();
            ((ObjectAssert) Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot_completed")).as("Streaming phase")).isNull();
            ((ObjectAssert) Assertions.assertThat(sourceRecord2.sourceOffset().get("change_lsn")).as("LSN present")).isNotNull();
            ((ObjectAssert) Assertions.assertThat(sourceRecord3.sourceOffset().get("snapshot")).as("Streaming phase")).isNull();
            ((ObjectAssert) Assertions.assertThat(sourceRecord3.sourceOffset().get("snapshot_completed")).as("Streaming phase")).isNull();
            ((ObjectAssert) Assertions.assertThat(sourceRecord3.sourceOffset().get("change_lsn")).as("LSN present")).isNotNull();
        }
    }

    @Test
    public void testWhitelistTable() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tableb").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    public void testTableIncludeList() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "^dbo.tableb$").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.tableb")).isNotEmpty();
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    public void testBlacklistTable() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_BLACKLIST, "dbo.tablea").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    public void testTableExcludeList() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_EXCLUDE_LIST, "dbo.tablea").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1617"})
    public void blacklistColumnWhenCdcColumnsDoNotMatchWithOriginalSnapshot() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "table_a");
        this.connection.execute(new String[]{"ALTER TABLE table_a ADD blacklisted_column varchar(30)"});
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.table_a.blacklisted_column").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO table_a VALUES(10, 'some_name', 120, 'some_string')"});
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("amount", Schema.OPTIONAL_INT32_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name").put("amount", 120);
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldIsEqualTo(put).valueAfterFieldSchemaIsEqualTo(build);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1067"})
    public void testBlacklistColumn() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE blacklist_column_table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE blacklist_column_table_b (id int, name varchar(30), amount integer primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_a");
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_BLACKLIST, "dbo.blacklist_column_table_a.amount").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_b VALUES(11, 'some_name', 447)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_b");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Schema build2 = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_b.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("amount", Schema.OPTIONAL_INT32_SCHEMA).build();
        Struct put2 = new Struct(build2).put("id", 11).put("name", "some_name").put("amount", 447);
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldIsEqualTo(put).valueAfterFieldSchemaIsEqualTo(build);
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic2.get(0)).valueAfterFieldIsEqualTo(put2).valueAfterFieldSchemaIsEqualTo(build2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1067"})
    public void testColumnExcludeList() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE blacklist_column_table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE blacklist_column_table_b (id int, name varchar(30), amount integer primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_a");
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.blacklist_column_table_a.amount").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_b VALUES(11, 'some_name', 447)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_b");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Schema build2 = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_b.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("amount", Schema.OPTIONAL_INT32_SCHEMA).build();
        Struct put2 = new Struct(build2).put("id", 11).put("name", "some_name").put("amount", 447);
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldIsEqualTo(put).valueAfterFieldSchemaIsEqualTo(build);
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic2.get(0)).valueAfterFieldIsEqualTo(put2).valueAfterFieldSchemaIsEqualTo(build2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void testColumnIncludeList() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE include_list_column_table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE include_list_column_table_b (id int, name varchar(30), amount integer primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "include_list_column_table_a");
        TestHelper.enableTableCdc(this.connection, "include_list_column_table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".*id,.*name,dbo.include_list_column_table_b.amount").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO include_list_column_table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO include_list_column_table_b VALUES(11, 'some_name', 447)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.include_list_column_table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.include_list_column_table_b");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.include_list_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Schema build2 = SchemaBuilder.struct().optional().name("server1.dbo.include_list_column_table_b.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("amount", Schema.OPTIONAL_INT32_SCHEMA).build();
        Struct put2 = new Struct(build2).put("id", 11).put("name", "some_name").put("amount", 447);
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldIsEqualTo(put).valueAfterFieldSchemaIsEqualTo(build);
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic2.get(0)).valueAfterFieldIsEqualTo(put2).valueAfterFieldSchemaIsEqualTo(build2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-3505"})
    public void shouldHandleInvalidColumnFilter() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".^").build();
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("All columns in table testDB.dbo.tablea were excluded due to include/exclude lists, defaulting to selecting all columns")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1692"})
    public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE masked_hashed_column_table_a (id int, name varchar(255) primary key(id))", "CREATE TABLE masked_hashed_column_table_b (id int, name varchar(20), primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table_a");
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "testDB.dbo.masked_hashed_column_table_a.name, testDB.dbo.masked_hashed_column_table_b.name").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO masked_hashed_column_table_a VALUES(10, 'some_name')"});
        this.connection.execute(new String[]{"INSERT INTO masked_hashed_column_table_b VALUES(11, 'some_name')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.masked_hashed_column_table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.masked_hashed_column_table_b");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidInsert(sourceRecord, "id", 10);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Assertions.assertThat(struct.getStruct("after").getString("name")).isEqualTo("3b225d0696535d66f2c0fb2e36b012c520d396af3dd8f18330b9c9cd23ca714e");
        }
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "id", 11);
        Struct struct2 = (Struct) sourceRecord2.value();
        if (struct2.getStruct("after") != null) {
            Assertions.assertThat(struct2.getStruct("after").getString("name")).isEqualTo("3b225d0696535d66f2c0");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1972"})
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE masked_hashed_column_table (id int, name varchar(255) primary key(id))", "CREATE TABLE truncated_column_table (id int, name varchar(20), primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table");
        TestHelper.enableTableCdc(this.connection, "truncated_column_table");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with("column.mask.with.12.chars", "testDB.dbo.masked_hashed_column_table.name").with("column.truncate.to.4.chars", "testDB.dbo.truncated_column_table.name").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO masked_hashed_column_table VALUES(10, 'some_name')"});
        this.connection.execute(new String[]{"INSERT INTO truncated_column_table VALUES(11, 'some_name')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.masked_hashed_column_table");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.truncated_column_table");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidInsert(sourceRecord, "id", 10);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Assertions.assertThat(struct.getStruct("after").getString("name")).isEqualTo("************");
        }
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "id", 11);
        Struct struct2 = (Struct) sourceRecord2.value();
        if (struct2.getStruct("after") != null) {
            Assertions.assertThat(struct2.getStruct("after").getString("name")).isEqualTo("some");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void whenCaptureInstanceExcludesColumnsExpectSnapshotAndStreamingToExcludeColumns() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", Arrays.asList("id", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120)"});
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic("server1.dbo.excluded_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.excluded_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "a name");
        Struct put2 = new Struct(build).put("id", 11).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(2);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(1)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void whenMultipleCaptureInstancesExcludesColumnsExpectLatestCDCTableUtilized() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", Arrays.asList("id", "name"));
        this.connection.execute(new String[]{"ALTER TABLE excluded_column_table_a ADD note varchar(30)"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a_2", Arrays.asList("id", "name", "note"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120, 'a note')"});
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic("server1.dbo.excluded_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.excluded_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("note", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "a name").put("note", (Object) null);
        Struct put2 = new Struct(build).put("id", 11).put("name", "some_name").put("note", "a note");
        Assertions.assertThat(recordsForTopic).hasSize(2);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(1)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    @Ignore
    public void whenCaptureInstanceExcludesColumnsAndColumnsRenamedExpectNoErrors() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(10, 'a name', 100)"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", Arrays.asList("id", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, ".*excluded_column_table_a").build());
        assertConnectorIsRunning();
        waitForStreamingRunning("sql_server", "server1");
        TestHelper.disableTableCdc(this.connection, "excluded_column_table_a");
        this.connection.execute(new String[]{"EXEC sp_RENAME 'excluded_column_table_a.name', 'first_name', 'COLUMN'"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", Arrays.asList("id", "first_name"));
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(11, 'some_name', 120)"});
        TestHelper.waitForCdcRecord(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", resultSet -> {
            return resultSet.getInt("id") == 11;
        });
        List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic("server1.dbo.excluded_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.excluded_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "a name");
        Schema build2 = SchemaBuilder.struct().optional().name("server1.dbo.excluded_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("first_name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put2 = new Struct(build2).put("id", 11).put("first_name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(2);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(1)).valueAfterFieldSchemaIsEqualTo(build2).valueAfterFieldIsEqualTo(put2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1068"})
    public void excludeColumnWhenCaptureInstanceExcludesColumns() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE excluded_column_table_a (id int, name varchar(30), amount integer primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "excluded_column_table_a", "dbo_excluded_column_table_a", Arrays.asList("id", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).build());
        assertConnectorIsRunning();
        waitForStreamingRunning("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO excluded_column_table_a VALUES(10, 'some_name', 120)"});
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.excluded_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.excluded_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void excludeColumnWhenCaptureInstanceExcludesColumnInMiddleOfTable() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE exclude_list_column_table_a (id int, amount integer, name varchar(30), primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO exclude_list_column_table_a VALUES(10, 100, 'a name')"});
        TestHelper.enableTableCdc(this.connection, "exclude_list_column_table_a", "dbo_exclude_list_column_table_a", Arrays.asList("id", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, ".*exclude_list_column_table_a").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO exclude_list_column_table_a VALUES(11, 120, 'some_name')"});
        TestHelper.waitForCdcRecord(this.connection, "exclude_list_column_table_a", resultSet -> {
            return resultSet.getInt("id") == 11;
        });
        List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic("server1.dbo.exclude_list_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.exclude_list_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "a name");
        Struct put2 = new Struct(build).put("id", 11).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(2);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(1)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put2);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void includeColumnsWhenCaptureInstanceExcludesColumnInMiddleOfTable() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE include_list_column_table_a (id int, amount integer, name varchar(30), primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "include_list_column_table_a", "dbo_include_list_column_table_a", Arrays.asList("id", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, "dbo.include_list_column_table_a.id,dbo.include_list_column_table_a.name").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO include_list_column_table_a VALUES(10, 120, 'some_name')"});
        TestHelper.waitForCdcRecord(this.connection, "include_list_column_table_a", resultSet -> {
            return resultSet.getInt("id") == 10;
        });
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.include_list_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.include_list_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void excludeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE exclude_list_column_table_a (id int, amount integer, note varchar(30), name varchar(30), primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "exclude_list_column_table_a", "dbo_exclude_list_column_table_a", Arrays.asList("id", "note", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_EXCLUDE_LIST, "dbo.exclude_list_column_table_a.amount,dbo.exclude_list_column_table_a.note").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO exclude_list_column_table_a VALUES(10, 120, 'a note', 'some_name')"});
        TestHelper.waitForCdcRecord(this.connection, "exclude_list_column_table_a", resultSet -> {
            return resultSet.getInt("id") == 10;
        });
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.exclude_list_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.exclude_list_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2522"})
    public void includeMultipleColumnsWhenCaptureInstanceExcludesSingleColumn() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE include_list_column_table_a (id int, amount integer, note varchar(30), name varchar(30), primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "include_list_column_table_a", "dbo_include_list_column_table_a", Arrays.asList("id", "note", "name"));
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, "dbo.include_list_column_table_a.id,dbo.include_list_column_table_a.name").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO include_list_column_table_a VALUES(10, 120, 'a note', 'some_name')"});
        TestHelper.waitForCdcRecord(this.connection, "include_list_column_table_a", resultSet -> {
            return resultSet.getInt("id") == 10;
        });
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.include_list_column_table_a");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.include_list_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldSchemaIsEqualTo(build).valueAfterFieldIsEqualTo(put);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-964"})
    public void shouldPropagateDatabaseDriverProperties() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with("database.applicationName", "Debezium App DBZ-964").build());
        assertConnectorIsRunning();
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES(964, 'a')"});
        consumeRecordsByTopic(1);
        this.connection.query("select count(1) from sys.dm_exec_sessions where program_name = 'Debezium App DBZ-964'", resultSet -> {
            resultSet.next();
            Assertions.assertThat(resultSet.getInt(1)).isGreaterThanOrEqualTo(1);
        });
    }

    private void restartInTheMiddleOfTx(boolean z, boolean z2) throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build();
        if (z) {
            start(SqlServerConnector.class, build);
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            stopConnector();
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-1, '-a')"});
        }
        start(SqlServerConnector.class, build, sourceRecord -> {
            if (!"server1.dbo.tablea.Envelope".equals(sourceRecord.valueSchema().name())) {
                return false;
            }
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Integer int32 = struct.getInt32("id");
            return int32 != null && int32.intValue() == 25 && "a".equals(struct.getString("cola"));
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        if (z2) {
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-2, '-a')"});
            assertRecord(((Struct) ((SourceRecord) consumeRecordsByTopic(1).allRecordsInOrder().get(0)).value()).getStruct("after"), Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a")));
        }
        this.connection.setAutoCommit(false);
        for (int i = 0; i < 30; i++) {
            int i2 = 10 + i;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        this.connection.connection().commit();
        TestHelper.waitForCdcRecord(this.connection, "tablea", resultSet -> {
            return resultSet.getInt("id") == 39;
        });
        TestHelper.waitForCdcRecord(this.connection, "tableb", resultSet2 -> {
            return resultSet2.getInt("id") == 39;
        });
        List allRecordsInOrder = consumeRecordsByTopic(30).allRecordsInOrder();
        Assertions.assertThat(allRecordsInOrder).hasSize(30);
        assertRecord((Struct) ((Struct) ((SourceRecord) allRecordsInOrder.get(29)).value()).get("after"), Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 24), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")));
        stopConnector();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(30);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(30);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        for (int i3 = 0; i3 < 15; i3++) {
            int i4 = 25 + i3;
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i4)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i4)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
        }
        for (int i5 = 0; i5 < 30; i5++) {
            int i6 = 1000 + i5;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i6 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i6 + ", 'b')"});
            this.connection.connection().commit();
        }
        TestHelper.waitForCdcRecord(this.connection, "tablea", resultSet3 -> {
            return resultSet3.getInt("id") == 1029;
        });
        TestHelper.waitForCdcRecord(this.connection, "tableb", resultSet4 -> {
            return resultSet4.getInt("id") == 1029;
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(60);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic3).hasSize(30);
        Assertions.assertThat(recordsForTopic4).hasSize(30);
        for (int i7 = 0; i7 < 30; i7++) {
            int i8 = i7 + 1000;
            SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic3.get(i7);
            SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic4.get(i7);
            List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i8)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i8)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct3 = (Struct) sourceRecord4.value();
            assertRecord((Struct) struct3.get("after"), asList3);
            Assert.assertNull(struct3.get("before"));
            Struct struct4 = (Struct) sourceRecord5.value();
            assertRecord((Struct) struct4.get("after"), asList4);
            Assert.assertNull(struct4.get("before"));
        }
    }

    @Test
    @FixFor({"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        restartInTheMiddleOfTx(true, false);
    }

    @Test
    @FixFor({"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        restartInTheMiddleOfTx(false, true);
    }

    @Test
    @FixFor({"DBZ-1128"})
    public void restartInTheMiddleOfTx() throws Exception {
        restartInTheMiddleOfTx(false, false);
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "my_products").build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaWarningAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse();
        });
    }

    @Test
    @FixFor({"DBZ-916"})
    public void keylessTable() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE keyless (id int, name varchar(30))", "INSERT INTO keyless VALUES(1, 'k')"});
        TestHelper.enableTableCdc(this.connection, "keyless");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.keyless").build());
        assertConnectorIsRunning();
        Arrays.asList(new SchemaAndValueField("id", Schema.OPTIONAL_INT32_SCHEMA, 1), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "k"));
        List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.OPTIONAL_INT32_SCHEMA, 2), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "k"));
        List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.OPTIONAL_INT32_SCHEMA, 3), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "k"));
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic.recordsForTopic("server1.dbo.keyless").get(0)).key()).isNull();
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic.recordsForTopic("server1.dbo.keyless").get(0)).keySchema()).isNull();
        this.connection.execute(new String[]{"INSERT INTO keyless VALUES(2, 'k')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic2.recordsForTopic("server1.dbo.keyless").get(0)).key()).isNull();
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic2.recordsForTopic("server1.dbo.keyless").get(0)).key()).isNull();
        this.connection.execute(new String[]{"UPDATE keyless SET id=3 WHERE ID=2"});
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic(3).recordsForTopic("server1.dbo.keyless").get(0);
        Assertions.assertThat(sourceRecord.key()).isNull();
        Assertions.assertThat(sourceRecord.keySchema()).isNull();
        assertRecord(((Struct) sourceRecord.value()).getStruct("before"), asList);
        assertRecord(((Struct) sourceRecord.value()).getStruct("after"), asList2);
        this.connection.execute(new String[]{"DELETE FROM keyless WHERE id=3"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2, false);
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic3.recordsForTopic("server1.dbo.keyless").get(0)).key()).isNull();
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic3.recordsForTopic("server1.dbo.keyless").get(0)).keySchema()).isNull();
        Assert.assertNull(((SourceRecord) consumeRecordsByTopic3.recordsForTopic("server1.dbo.keyless").get(1)).value());
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException, SQLException {
        this.connection.execute(new String[]{"CREATE TABLE keyless (id int, name varchar(30))", "INSERT INTO keyless VALUES(1, 'k')"});
        TestHelper.enableTableCdc(this.connection, "keyless");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.keyless").with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "(.*).keyless:id").build());
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.keyless");
        Assertions.assertThat(((SourceRecord) recordsForTopic.get(0)).key()).isNotNull();
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(0)).key()).get("id")).isNotNull();
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1491"})
    public void shouldCaptureTableSchema() throws SQLException, InterruptedException {
        this.connection.execute(new String[]{"CREATE TABLE table_schema_test (key_cola int not null,key_colb varchar(10) not null,cola int not null,colb datetimeoffset not null default ('2019-01-01 12:34:56.1234567+04:00'),colc varchar(20) default ('default_value'),cold float,primary key(key_cola, key_colb))"});
        TestHelper.enableTableCdc(this.connection, "table_schema_test");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        this.connection.execute(new String[]{"INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)"});
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_schema_test");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).keySchemaIsEqualTo(SchemaBuilder.struct().name("server1.dbo.table_schema_test.Key").field("key_cola", Schema.INT32_SCHEMA).field("key_colb", Schema.STRING_SCHEMA).build()).valueAfterFieldSchemaIsEqualTo(SchemaBuilder.struct().optional().name("server1.dbo.table_schema_test.Value").field("key_cola", Schema.INT32_SCHEMA).field("key_colb", Schema.STRING_SCHEMA).field("cola", Schema.INT32_SCHEMA).field("colb", SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").required().defaultValue("2019-01-01T12:34:56.1234567+04:00").version(1).build()).field("colc", SchemaBuilder.string().optional().defaultValue("default_value").build()).field("cold", Schema.OPTIONAL_FLOAT64_SCHEMA).build());
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1923"})
    public void shouldDetectPurgedHistory() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.DATABASE_HISTORY, PurgableFileDatabaseHistory.class).build();
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).pollInterval(100L, TimeUnit.MILLISECONDS).until(() -> {
            Testing.debug("Waiting for initial changes to be propagated to CDC structures");
            return Boolean.valueOf(this.connection.getMaxLsn(TestHelper.TEST_DATABASE).isAvailable());
        });
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        List allRecordsInOrder = consumeRecordsByTopic(11).allRecordsInOrder();
        Iterator it = allRecordsInOrder.subList(1, allRecordsInOrder.size()).iterator();
        while (it.hasNext()) {
            SourceRecord sourceRecord = (SourceRecord) it.next();
            ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).as("Snapshot phase")).isEqualTo(true);
            if (it.hasNext()) {
                ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot in progress")).isEqualTo(false);
            } else {
                ((ObjectAssert) Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot completed")).isEqualTo(true);
            }
        }
        stopConnector();
        for (int i3 = 0; i3 < 5; i3++) {
            int i4 = 100 + i3;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i4 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i4 + ", 'b')"});
        }
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, build);
        assertConnectorNotRunning();
        Assertions.assertThat(logInterceptor.containsStacktraceElement("The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.")).isTrue();
    }

    @Test
    @FixFor({"DBZ-1988"})
    public void shouldHonorSourceTimestampMode() throws InterruptedException, SQLException {
        this.connection.execute(new String[]{"CREATE TABLE source_timestamp_mode (id int, name varchar(30) primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "source_timestamp_mode");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.source_timestamp_mode").with(SqlServerConnectorConfig.SOURCE_TIMESTAMP_MODE, "processing").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO source_timestamp_mode VALUES(1, 'abc')"});
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic(1).recordsForTopic("server1.dbo.source_timestamp_mode").get(0);
        Assertions.assertThat(((Long) ((Struct) sourceRecord.value()).get("ts_ms")).longValue() - ((Long) ((Struct) ((Struct) sourceRecord.value()).get("source")).get("ts_ms")).longValue()).isLessThan(100L);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1312"})
    public void useShortTableNamesForColumnMapper() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with("column.mask.with.4.chars", "dbo.tablea.cola").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("cola")).isEqualTo("****");
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1312"})
    public void useLongTableNamesForColumnMapper() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with("column.mask.with.4.chars", "testDB.dbo.tablea.cola").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("cola")).isEqualTo("****");
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1312"})
    public void useLongTableNamesForKeyMapper() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "testDB.dbo.tablea:cola").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Assertions.assertThat(((Struct) sourceRecord.key()).getString("cola")).isEqualTo("a");
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1312"})
    public void useShortTableNamesForKeyMapper() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "dbo.tablea:cola").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Assertions.assertThat(((Struct) sourceRecord.key()).getString("cola")).isEqualTo("a");
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE dt_table (id int, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4) primary key(id))"});
        TestHelper.enableTableCdc(this.connection, "dt_table");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.dt_table").with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.REAL,.+\\.DECIMAL").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("sql_server", "server1");
        this.connection.execute(new String[]{"INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1, 123, 456, 789.01, 'test', 1.228, 234.56)"});
        Field field = ((SourceRecord) consumeRecordsByTopic(1).recordsForTopic("server1.dbo.dt_table").get(0)).valueSchema().field("before");
        Assertions.assertThat(field.schema().field("id").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("c1").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("c2").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("c3a").schema().parameters()).includes(new MapAssert.Entry[]{MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMERIC"), MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "5"), MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "2")});
        Assertions.assertThat(field.schema().field("c3b").schema().parameters()).includes(new MapAssert.Entry[]{MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR"), MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "128")});
        Assertions.assertThat(field.schema().field("f2").schema().parameters()).includes(new MapAssert.Entry[]{MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "DECIMAL"), MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8"), MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "4")});
        Assertions.assertThat(field.schema().field("f1").schema().parameters()).includes(new MapAssert.Entry[]{MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "REAL"), MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "24")});
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2379"})
    public void shouldNotStreamWhenUsingSnapshotModeInitialOnly() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY).build();
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        assertNoRecordsToConsume();
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("Streaming is not enabled in current configuration")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-2582"})
    public void testMaxLsnSelectStatementWithoutLimit() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2582"})
    public void testMaxLsnSelectStatementWithLimit() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-2699"})
    public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.SKIPPED_OPERATIONS, "u,d").build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        consumeRecordsByTopic(1);
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES(201, 'insert201')"});
        this.connection.execute(new String[]{"UPDATE tablea SET cola='insert201-update' WHERE id=201"});
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES(202, 'insert202')"});
        this.connection.execute(new String[]{"DELETE FROM tablea WHERE id=202"});
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES(203, 'insert203')"});
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic("server1.dbo.tablea");
        Assertions.assertThat(recordsForTopic).hasSize(3);
        recordsForTopic.forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            Assertions.assertThat(struct.get("op")).isEqualTo(Envelope.Operation.CREATE.code());
            Assertions.assertThat(struct.get("op")).isNotEqualTo(Envelope.Operation.UPDATE.code());
            Assertions.assertThat(struct.get("op")).isNotEqualTo(Envelope.Operation.DELETE.code());
        });
        assertInsert((SourceRecord) recordsForTopic.get(0), "id", 201);
        assertInsert((SourceRecord) recordsForTopic.get(1), "id", 202);
        assertInsert((SourceRecord) recordsForTopic.get(2), "id", 203);
    }

    private void assertRecord(Struct struct, List<SchemaAndValueField> list) {
        list.forEach(schemaAndValueField -> {
            schemaAndValueField.assertFor(struct);
        });
    }
}
