package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerChangeTableSetIT.class */
public class SqlServerChangeTableSetIT extends AbstractConnectorTest {
    // Connection obtained from TestHelper.testConnection() in before() and closed in after().
    private SqlServerConnection connection;

    /**
     * Creates the test database and the tables {@code tablea}, {@code tableb}
     * and {@code tablec}, enables CDC on the first two, then initializes the
     * connector test framework and deletes the schema history file.
     *
     * @throws SQLException if a database operation fails
     */
    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        connection = TestHelper.testConnection();
        connection.execute(
                "CREATE TABLE tablea (id int primary key, cola varchar(30))",
                "CREATE TABLE tableb (id int primary key, colb varchar(30))",
                "CREATE TABLE tablec (id int primary key, colc varchar(30))");
        // tablec starts without CDC; addTable() enables it while the connector runs.
        for (String table : new String[]{ "tablea", "tableb" }) {
            TestHelper.enableTableCdc(connection, table);
        }
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Closes the database connection if one was opened.
     *
     * @throws SQLException if closing the connection fails
     */
    @After
    public void after() throws SQLException {
        if (connection == null) {
            return;
        }
        connection.close();
    }

    /**
     * Starts the connector with {@code tablea} and {@code tableb} under CDC,
     * then enables CDC on {@code tablec} and on a newly created {@code tabled}
     * while the connector is running, and checks that rows inserted into the
     * two added tables are emitted with the expected value schema.
     */
    @Test
    public void addTable() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        // Rows for the two tables captured from the start.
        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords initial = consumeRecordsByTopic(10);
        Assertions.assertThat(initial.recordsForTopic("server1.testDB1.dbo.tablea")).hasSize(5);
        Assertions.assertThat(initial.recordsForTopic("server1.testDB1.dbo.tableb")).hasSize(5);

        // Bring two more tables under CDC while the connector is streaming.
        TestHelper.enableTableCdc(connection, "tablec");
        connection.execute("CREATE TABLE tabled (id int primary key, cold varchar(30))");
        TestHelper.enableTableCdc(connection, "tabled");

        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablec VALUES(" + id + ", 'c')");
            connection.execute("INSERT INTO tabled VALUES(" + id + ", 'd')");
        }
        final SourceRecords added = consumeRecordsByTopic(10);
        Assertions.assertThat(added.recordsForTopic("server1.testDB1.dbo.tablec")).hasSize(5);
        Assertions.assertThat(added.recordsForTopic("server1.testDB1.dbo.tabled")).hasSize(5);

        final Schema tablecSchema = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tablec.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colc", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        for (SourceRecord rec : added.recordsForTopic("server1.testDB1.dbo.tablec")) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), tablecSchema);
        }

        final Schema tabledSchema = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tabled.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("cold", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        for (SourceRecord rec : added.recordsForTopic("server1.testDB1.dbo.tabled")) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), tabledSchema);
        }
    }

    /**
     * Disables CDC on {@code tableb} while the connector is running and checks
     * that subsequent inserts produce records for {@code tablea} only.
     */
    @Test
    public void removeTable() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords beforeRemoval = consumeRecordsByTopic(10);
        Assertions.assertThat(beforeRemoval.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(beforeRemoval.recordsForTopic(topicB)).hasSize(5);

        TestHelper.disableTableCdc(connection, "tableb");

        // Both tables still receive rows, but only tablea is expected to be captured.
        for (int id = 100; id < 105; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a2')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b2')");
        }
        final SourceRecords afterRemoval = consumeRecordsByTopic(5);
        Assertions.assertThat(afterRemoval.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(afterRemoval.recordsForTopic(topicB)).isNullOrEmpty();
    }

    /**
     * Runs the add-column scenario with the default configuration (NO_DATA
     * snapshot mode), pausing after the second capture instance is enabled.
     */
    @Test
    public void addColumnToTableEndOfBatchWithoutLsnLimit() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        addColumnToTable(config, true);
    }

    /**
     * Runs the add-column scenario with {@code MAX_TRANSACTIONS_PER_ITERATION}
     * set to 1, pausing after the second capture instance is enabled.
     */
    @Test
    @FixFor({"DBZ-3992"})
    public void addColumnToTableEndOfBatchWithLsnLimit() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1)
                .build();
        addColumnToTable(config, true);
    }

    /**
     * Runs the add-column scenario with the default configuration (NO_DATA
     * snapshot mode) and no pause after the second capture instance is enabled.
     */
    @Test
    public void addColumnToTableMiddleOfBatchWithoutLsnLimit() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        addColumnToTable(config, false);
    }

    /**
     * Runs the add-column scenario with {@code MAX_TRANSACTIONS_PER_ITERATION}
     * set to 1 and no pause after the second capture instance is enabled, so
     * the follow-up inserts are issued immediately after the switch.
     */
    @Test
    @FixFor({"DBZ-3992"})
    public void addColumnToTableMiddleOfBatchWithLsnLimit() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .with(SqlServerConnectorConfig.MAX_TRANSACTIONS_PER_ITERATION, 1)
                .build();
        // Must be false: passing true made this test an exact duplicate of
        // addColumnToTableEndOfBatchWithLsnLimit and left the no-pause path
        // with an LSN limit untested.
        addColumnToTable(config, false);
    }

    /**
     * Shared scenario for the add-column tests. Adds a {@code NOT NULL} column
     * with a default to {@code tableb} while the connector is running and
     * checks that the old value schema is reported until a second capture
     * instance ({@code after_change}) is enabled, and the new schema afterwards.
     *
     * @param configuration connector configuration to start with
     * @param z when {@code true}, sleep five seconds after enabling the second
     *          capture instance before inserting further rows
     */
    private void addColumnToTable(Configuration configuration, boolean z) throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Schema schemaWithoutNewCol = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Schema schemaWithNewCol = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .field("newcol", SchemaBuilder.int32().defaultValue(0).build())
                .build();

        start(SqlServerConnector.class, configuration);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        // Phase 1: original table layout.
        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords phase1 = consumeRecordsByTopic(10);
        Assertions.assertThat(phase1.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(phase1.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : phase1.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaWithoutNewCol);
        }

        // Phase 2: column added to the table, capture instance unchanged.
        connection.execute("ALTER TABLE dbo.tableb ADD newcol INT NOT NULL DEFAULT 0");
        for (int id = 100; id < 105; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a2')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b2', 2)");
        }
        final SourceRecords phase2 = consumeRecordsByTopic(10);
        Assertions.assertThat(phase2.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(phase2.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : phase2.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaWithoutNewCol);
        }

        // Phase 3: second capture instance enabled; the new column is expected from here on.
        TestHelper.enableTableCdc(connection, "tableb", "after_change");
        if (z) {
            Thread.sleep(5000L);
        }
        for (int id = 1000; id < 1005; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a3')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b3', 3)");
        }
        final SourceRecords phase3 = consumeRecordsByTopic(10);
        Assertions.assertThat(phase3.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(phase3.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : phase3.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaWithNewCol);
        }

        // Phase 4: a further batch still carries the new schema.
        for (int id = 10000; id < 10005; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a4')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b4', 4)");
        }
        final SourceRecords phase4 = consumeRecordsByTopic(10);
        Assertions.assertThat(phase4.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(phase4.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : phase4.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaWithNewCol);
        }
    }

    /**
     * Drops {@code colb} from {@code tableb} while the connector is running,
     * enables a second capture instance ({@code after_change}), and checks
     * that later records for {@code tableb} carry only the {@code id} field.
     */
    @Test
    public void removeColumnFromTable() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Schema schemaWithColb = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Schema schemaIdOnly = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .build();
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords beforeDrop = consumeRecordsByTopic(10);
        Assertions.assertThat(beforeDrop.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(beforeDrop.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : beforeDrop.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaWithColb);
        }

        // Drop the column and switch to a capture instance that reflects the new layout.
        connection.execute("ALTER TABLE dbo.tableb DROP COLUMN colb");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");

        for (int id = 100; id < 105; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a2')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ")");
        }
        final SourceRecords afterDrop = consumeRecordsByTopic(10);
        Assertions.assertThat(afterDrop.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(afterDrop.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : afterDrop.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaIdOnly);
        }

        for (int id = 1000; id < 1005; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a3')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ")");
        }
        final SourceRecords followUp = consumeRecordsByTopic(10);
        Assertions.assertThat(followUp.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(followUp.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : followUp.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaIdOnly);
        }
    }

    /**
     * Creates {@code tableb2}, enables CDC on it, drops its first column
     * without creating a new capture instance, and then starts the connector
     * restricted to that table with a column include list of {@code .*id}.
     * Inserted rows are expected to be emitted with an {@code id}-only schema.
     */
    @Test
    @FixFor({"DBZ-2716"})
    public void removeColumnFromTableWithoutChangingCapture() throws Exception {
        final String topic = "server1.testDB1.dbo.tableb2";

        connection.execute("CREATE TABLE tableb2 (colb1 varchar(30), id int primary key, colb2 varchar(30))");
        TestHelper.enableTableCdc(connection, "tableb2");
        connection.execute("ALTER TABLE dbo.tableb2 DROP COLUMN colb1");

        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, "dbo.tableb2")
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .with(SqlServerConnectorConfig.COLUMN_INCLUDE_LIST, ".*id")
                .build();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tableb2 VALUES(" + id + ", 'b2')");
        }
        final SourceRecords consumed = consumeRecordsByTopic(5);
        Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(5);

        final Schema idOnly = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb2.Value")
                .field("id", Schema.INT32_SCHEMA)
                .build();
        for (SourceRecord rec : consumed.recordsForTopic(topic)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), idOnly);
        }
    }

    /**
     * Adds a column to {@code tableb} and enables a second capture instance
     * while a background thread keeps inserting rows into both tables.
     * Checks that the records consumed during the switch contain both the old
     * and the new schema, and that no old-schema record follows a new-schema
     * one. A final batch must carry the new schema throughout.
     */
    @Test
    public void addColumnToTableWithParallelWrites() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        for (int id = 10; id < 30; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords initial = consumeRecordsByTopic(40);
        Assertions.assertThat(initial.recordsForTopic(topicA)).hasSize(20);
        Assertions.assertThat(initial.recordsForTopic(topicB)).hasSize(20);
        final Schema schemaBeforeChange = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        for (SourceRecord rec : initial.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaBeforeChange);
        }

        // Background writer: one row per table per second, on its own connection.
        final java.util.concurrent.ExecutorService writer = Executors.newSingleThreadExecutor();
        writer.submit(() -> {
            // try-with-resources closes the writer connection even when an insert fails.
            try (SqlServerConnection writerConnection = TestHelper.testConnection()) {
                for (int rowId = 100; rowId < 120; rowId++) {
                    writerConnection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a2')");
                    writerConnection.execute("INSERT INTO tableb(id,colb) VALUES(" + rowId + ",'b')");
                    Thread.sleep(1000L);
                }
            }
            catch (Exception e) {
                // The returned Future is not inspected, so print the failure to keep it visible.
                e.printStackTrace();
                throw new IllegalArgumentException(e);
            }
        });
        // No further tasks are submitted: shutdown() lets the running writer
        // finish and then releases the worker thread instead of leaking it.
        writer.shutdown();

        connection.execute("ALTER TABLE dbo.tableb ADD colb2 VARCHAR(32)");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");

        final SourceRecords duringChange = consumeRecordsByTopic(40);
        Assertions.assertThat(duringChange.recordsForTopic(topicA)).hasSize(20);
        Assertions.assertThat(duringChange.recordsForTopic(topicB)).hasSize(20);

        int oldSchemaCount = 0;
        int newSchemaCount = 0;
        for (SourceRecord rec : duringChange.recordsForTopic(topicB)) {
            final boolean hasNewColumn = ((Struct) rec.value()).getStruct("after").schema().field("colb2") != null;
            if (hasNewColumn) {
                newSchemaCount++;
            }
            else {
                oldSchemaCount++;
                // Once the new schema has been seen, the old one must not reappear.
                Assertions.assertThat(newSchemaCount).isZero();
            }
        }
        Assertions.assertThat(oldSchemaCount).isPositive();
        Assertions.assertThat(newSchemaCount).isPositive();

        for (int id = 1000; id < 1020; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a3')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b1', 'b2')");
        }
        final SourceRecords afterChange = consumeRecordsByTopic(40);
        Assertions.assertThat(afterChange.recordsForTopic(topicA)).hasSize(20);
        Assertions.assertThat(afterChange.recordsForTopic(topicB)).hasSize(20);
        final Schema schemaAfterChange = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .field("colb2", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        for (SourceRecord rec : afterChange.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaAfterChange);
        }
    }

    /**
     * Drops a column from {@code tableb}, switches to a second capture
     * instance, restarts the connector with the same configuration, and checks
     * that records emitted after the restart use the {@code id}-only schema.
     * Finally reads the schema history file and verifies it holds four
     * entries: three with a single {@code CREATE} change followed by one
     * {@code ALTER} for {@code tableb}.
     */
    @Test
    public void readHistoryAfterRestart() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForStreamingStarted();

        connection.execute("INSERT INTO tablea VALUES(" + 10 + ", 'a')");
        connection.execute("INSERT INTO tableb VALUES(" + 10 + ", 'b')");
        final SourceRecords beforeDrop = consumeRecordsByTopic(2);
        Assertions.assertThat(beforeDrop.recordsForTopic(topicA)).hasSize(1);
        Assertions.assertThat(beforeDrop.recordsForTopic(topicB)).hasSize(1);

        connection.execute("ALTER TABLE dbo.tableb DROP COLUMN colb");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");

        connection.execute("INSERT INTO tablea VALUES(" + 100 + ", 'a2')");
        connection.execute("INSERT INTO tableb VALUES(" + 100 + ")");
        final SourceRecords afterDrop = consumeRecordsByTopic(2);
        Assertions.assertThat(afterDrop.recordsForTopic(topicA)).hasSize(1);
        Assertions.assertThat(afterDrop.recordsForTopic(topicB)).hasSize(1);

        // Restart with the identical configuration.
        stopConnector();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        connection.execute("INSERT INTO tablea VALUES(" + 1000 + ", 'a3')");
        connection.execute("INSERT INTO tableb VALUES(" + 1000 + ")");
        final SourceRecords afterRestart = consumeRecordsByTopic(2);
        Assertions.assertThat(afterRestart.recordsForTopic(topicA)).hasSize(1);
        Assertions.assertThat(afterRestart.recordsForTopic(topicB)).hasSize(1);
        final Schema idOnly = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .build();
        for (SourceRecord rec : afterRestart.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), idOnly);
        }

        // Parse each line of the schema history file into a document.
        // The list is typed so the elements can be used without casts.
        final DocumentReader reader = DocumentReader.defaultReader();
        final List<Document> changes = new ArrayList<>();
        IoUtil.readLines(TestHelper.SCHEMA_HISTORY_PATH, line -> {
            try {
                changes.add(reader.read(line));
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        });
        Assertions.assertThat(changes).hasSize(4);

        // The first three entries each record exactly one CREATE.
        for (Document change : changes.subList(0, 3)) {
            final Array tableChanges = change.getArray("tableChanges");
            Assertions.assertThat(tableChanges.size()).isEqualTo(1);
            Assertions.assertThat(tableChanges.get(0).asDocument().getString("type")).isEqualTo("CREATE");
        }

        // The fourth entry is the ALTER of tableb.
        final Array lastTableChanges = changes.get(3).getArray("tableChanges");
        Assertions.assertThat(lastTableChanges.size()).isEqualTo(1);
        final String changeType = lastTableChanges.get(0).asDocument().getString("type");
        final String tableId = lastTableChanges.get(0).asDocument().getString("id");
        Assertions.assertThat(changeType).isEqualTo("ALTER");
        Assertions.assertThat(tableId).isEqualTo("\"testDB1\".\"dbo\".\"tableb\"");
    }

    /**
     * Renames {@code tableb.colb} to {@code newcolb} while the connector is
     * running (CDC is disabled on the table before the rename and a capture
     * instance named {@code after_change} is enabled afterwards) and checks
     * that later records expose the column under its new name.
     */
    @Test
    public void renameColumn() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Schema schemaBeforeRename = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Schema schemaAfterRename = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("newcolb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        final SourceRecords beforeRename = consumeRecordsByTopic(10);
        Assertions.assertThat(beforeRename.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(beforeRename.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : beforeRename.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaBeforeRename);
        }

        // Disable CDC, rename the column, then capture again under a new instance.
        TestHelper.disableTableCdc(connection, "tableb");
        connection.execute("exec sp_rename 'tableb.colb', 'newcolb';");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");

        for (int id = 100; id < 105; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a2')");
            connection.execute("INSERT INTO tableb(id,newcolb) VALUES(" + id + ", 'b2')");
        }
        final SourceRecords afterRename = consumeRecordsByTopic(10);
        Assertions.assertThat(afterRename.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(afterRename.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : afterRename.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaAfterRename);
        }

        for (int id = 1000; id < 1005; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a3')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b3')");
        }
        final SourceRecords followUp = consumeRecordsByTopic(10);
        Assertions.assertThat(followUp.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(followUp.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : followUp.recordsForTopic(topicB)) {
            assertSchemaMatchesStruct((Struct) ((Struct) rec.value()).get("after"), schemaAfterRename);
        }
    }

    /**
     * Changes the type of {@code tableb.colb} from varchar to INT while the
     * connector is running, enables a second capture instance, and checks
     * that later records report {@code colb} as an optional int32 whose value
     * equals the row id.
     */
    @Test
    public void changeColumn() throws Exception {
        final String topicA = "server1.testDB1.dbo.tablea";
        final String topicB = "server1.testDB1.dbo.tableb";
        final Schema stringColumnSchema = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        final Schema intColumnSchema = SchemaBuilder.struct().optional()
                .name("server1.testDB1.dbo.tableb.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("colb", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        // colb initially stores the id as text.
        for (int id = 10; id < 15; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", '" + id + "')");
        }
        final SourceRecords beforeChange = consumeRecordsByTopic(10);
        Assertions.assertThat(beforeChange.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(beforeChange.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : beforeChange.recordsForTopic(topicB)) {
            final Struct value = (Struct) rec.value();
            assertSchemaMatchesStruct((Struct) value.get("after"), stringColumnSchema);
            final Struct after = value.getStruct("after");
            final int id = after.getInt32("id").intValue();
            Assertions.assertThat(Integer.toString(id)).isEqualTo(after.getString("colb"));
        }

        connection.execute("ALTER TABLE dbo.tableb ALTER COLUMN colb INT");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");

        for (int id = 100; id < 105; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a2')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", '" + id + " ')");
        }
        final SourceRecords afterChange = consumeRecordsByTopic(10);
        Assertions.assertThat(afterChange.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(afterChange.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : afterChange.recordsForTopic(topicB)) {
            final Struct value = (Struct) rec.value();
            assertSchemaMatchesStruct((Struct) value.get("after"), intColumnSchema);
            final Struct after = value.getStruct("after");
            final int id = after.getInt32("id").intValue();
            Assertions.assertThat(id).isEqualTo(after.getInt32("colb").intValue());
        }

        for (int id = 1000; id < 1005; id++) {
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a3')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", '" + id + " ')");
        }
        final SourceRecords followUp = consumeRecordsByTopic(10);
        Assertions.assertThat(followUp.recordsForTopic(topicA)).hasSize(5);
        Assertions.assertThat(followUp.recordsForTopic(topicB)).hasSize(5);
        for (SourceRecord rec : followUp.recordsForTopic(topicB)) {
            final Struct value = (Struct) rec.value();
            assertSchemaMatchesStruct((Struct) value.get("after"), intColumnSchema);
            final Struct after = value.getStruct("after");
            final int id = after.getInt32("id").intValue();
            Assertions.assertThat(id).isEqualTo(after.getInt32("colb").intValue());
        }
    }

    /**
     * Checks that a default value added to {@code tableb.colb} while the connector is
     * streaming is reflected in the value schema of the next change record.
     *
     * <p>The connector is started with snapshot mode {@code NO_DATA}. Once streaming is up,
     * a {@code DEFAULT} constraint is added to {@code colb}, CDC is enabled for
     * {@code tableb} again with the extra {@code "after_change"} argument (presumably the
     * name of a new capture instance; confirm against {@code TestHelper}), and one row is
     * inserted. The single record emitted for the table must carry
     * {@code "default_value"} as the default of the {@code colb} field in its
     * {@code after} schema.
     *
     * @throws Exception if any database or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-1491"})
    public void addDefaultValue() throws Exception {
        start(
                SqlServerConnector.class,
                TestHelper.defaultConfig()
                        .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                        .build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        TestHelper.waitForStreamingStarted();
        TestHelper.waitForMaxLsnAvailable(connection);

        // Change the table definition and re-enable CDC before producing the row under test.
        connection.execute("ALTER TABLE dbo.tableb ADD DEFAULT ('default_value') FOR colb");
        TestHelper.enableTableCdc(connection, "tableb", "after_change");
        connection.execute("INSERT INTO tableb VALUES('1', 'some_value')");
        TestHelper.waitForCdcRecord(connection, "tableb", "after_change", rs -> rs.getInt("id") == 1);

        List<SourceRecord> tableBRecords = consumeRecordsByTopic(1).recordsForTopic("server1.testDB1.dbo.tableb");
        Assertions.assertThat(tableBRecords).hasSize(1);
        Testing.debug("Records: " + tableBRecords);

        // Walk the schema one level at a time, logging each level before descending further.
        SourceRecord onlyRecord = tableBRecords.get(0);
        Schema valueSchema = onlyRecord.valueSchema();
        Testing.debug("Value Schema: " + valueSchema);
        Testing.debug("Fields: " + valueSchema.fields());

        Schema afterSchema = valueSchema.field("after").schema();
        Testing.debug("After Schema: " + afterSchema);
        Testing.debug("After Columns: " + afterSchema.fields());

        Schema colbSchema = afterSchema.field("colb").schema();
        Testing.debug("ColumnB Schema: " + colbSchema);
        Testing.debug("ColumnB Schema Default Value: " + colbSchema.defaultValue());

        Assertions.assertThat(colbSchema.defaultValue()).isNotNull();
        Assertions.assertThat(colbSchema.defaultValue()).isEqualTo("default_value");
    }

    /**
     * Checks that replacing the default value of {@code table_dv.colb} while the connector
     * is running is reflected in the value schema of the next change record.
     *
     * <p>The table is created with a named default constraint ({@code DV_colb}) and has CDC
     * enabled before the connector starts in snapshot mode {@code NO_DATA}. After a first
     * row has been inserted and its record consumed, the constraint is dropped, a new
     * default is added, CDC is enabled again with the extra {@code "after_change"}
     * argument (presumably the name of a new capture instance; confirm against
     * {@code TestHelper}), and a second row is inserted. The record for that row must
     * carry {@code "new_default_value"} as the default of the {@code colb} field in its
     * {@code after} schema.
     *
     * @throws Exception if any database or connector interaction fails
     */
    @Test
    @FixFor({"DBZ-1491"})
    public void alterDefaultValue() throws Exception {
        // Table with an initial, explicitly named default constraint.
        connection.execute("CREATE TABLE table_dv (id int primary key, colb varchar(30))");
        connection.execute("ALTER TABLE dbo.table_dv ADD CONSTRAINT DV_colb DEFAULT ('default_value') FOR colb");
        TestHelper.enableTableCdc(connection, "table_dv");

        start(
                SqlServerConnector.class,
                TestHelper.defaultConfig()
                        .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA)
                        .build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        // Consume the first record so that only the post-change record is left to inspect.
        connection.execute("INSERT INTO table_dv VALUES('1', 'some_value')");
        consumeRecordsByTopic(1);

        // Swap the default: drop the named constraint, then add one with the new value.
        connection.execute("ALTER TABLE dbo.table_dv DROP CONSTRAINT DV_colb");
        connection.execute("ALTER TABLE dbo.table_dv ADD DEFAULT ('new_default_value') FOR colb");
        TestHelper.enableTableCdc(connection, "table_dv", "after_change");

        connection.execute("INSERT INTO table_dv VALUES('2', 'some_value2')");
        List<SourceRecord> tableDvRecords = consumeRecordsByTopic(1).recordsForTopic("server1.testDB1.dbo.table_dv");
        Assertions.assertThat(tableDvRecords).hasSize(1);

        SourceRecord onlyRecord = tableDvRecords.get(0);
        Schema afterSchema = onlyRecord.valueSchema().field("after").schema();
        Schema colbSchema = afterSchema.field("colb").schema();
        Assertions.assertThat(colbSchema.defaultValue()).isNotNull();
        Assertions.assertThat(colbSchema.defaultValue()).isEqualTo("new_default_value");
    }
}
