package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.SourceRecordAssert;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.time.Timestamp;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.BooleanAssert;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/SnapshotIT.class */
public class SnapshotIT extends AbstractConnectorTest {
    private static final int INITIAL_RECORDS_PER_TABLE = 500;
    private static final int STREAMING_RECORDS_PER_TABLE = 500;
    private SqlServerConnection connection;

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"CREATE TABLE table1 (id int, name varchar(30), price decimal(8,2), ts datetime2(0), primary key(id))"});
        for (int i = 0; i < 500; i++) {
            this.connection.execute(new String[]{String.format("INSERT INTO table1 VALUES(%s, '%s', %s, '%s')", Integer.valueOf(i), "name" + i, new BigDecimal(i + ".23"), "2018-07-18 13:28:56")});
        }
        TestHelper.enableTableCdc(this.connection, "table1");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void takeSnapshotInExclusiveMode() throws Exception {
        takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode.EXCLUSIVE);
    }

    @Test
    public void takeSnapshotInSnapshotMode() throws Exception {
        Testing.Print.enable();
        takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode.SNAPSHOT);
    }

    @Test
    public void takeSnapshotInRepeatableReadMode() throws Exception {
        takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode.REPEATABLE_READ);
    }

    @Test
    public void takeSnapshotInReadCommittedMode() throws Exception {
        takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode.READ_COMMITTED);
    }

    @Test
    public void takeSnapshotInReadUncommittedMode() throws Exception {
        takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode.READ_UNCOMMITTED);
    }

    private void takeSnapshot(SqlServerConnectorConfig.SnapshotIsolationMode snapshotIsolationMode) throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_ISOLATION_MODE.name(), snapshotIsolationMode.getValue()).build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1");
        Assertions.assertThat(recordsForTopic).hasSize(500);
        int i = 0;
        while (i < 500) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i)));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i)), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "name" + i), new SchemaAndValueField("price", Decimal.builder(2).parameter("connect.decimal.precision", "8").optional().build(), new BigDecimal(i + ".23")), new SchemaAndValueField("ts", Timestamp.builder().optional().schema(), 1531920536000L));
            Struct struct = (Struct) sourceRecord.key();
            Struct struct2 = (Struct) sourceRecord.value();
            assertRecord(struct, asList);
            assertRecord((Struct) struct2.get("after"), asList2);
            MapAssert assertThat = Assertions.assertThat(sourceRecord.sourceOffset());
            MapAssert.Entry[] entryArr = new MapAssert.Entry[2];
            entryArr[0] = MapAssert.entry("snapshot", true);
            entryArr[1] = MapAssert.entry("snapshot_completed", Boolean.valueOf(i == 499));
            assertThat.includes(entryArr);
            Assert.assertNull(struct2.get("before"));
            i++;
        }
    }

    @Test
    public void takeSnapshotAndStartStreaming() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1");
        recordsForTopic.subList(0, 499).forEach(sourceRecord -> {
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        });
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(499)).value()).getStruct("source").getString("snapshot")).isEqualTo("last");
        testStreaming();
    }

    @Test
    @FixFor({"DBZ-1280"})
    public void testDeadlockDetection() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, 1000).build());
        assertConnectorIsRunning();
        this.connection.setAutoCommit(false).executeWithoutCommitting(new String[]{"SELECT TOP(0) * FROM dbo.table1 WITH (TABLOCKX)"});
        consumeRecordsByTopic(500);
        assertConnectorNotRunning();
        ((BooleanAssert) Assertions.assertThat(logInterceptor.containsStacktraceElement("Lock request time out period exceeded.")).as("Log contains error related to lock timeout")).isTrue();
        this.connection.rollback();
    }

    @Test
    public void takeSnapshotWithOldStructAndStartStreaming() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, CommonConnectorConfig.Version.V1).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1").forEach(sourceRecord -> {
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getBoolean("snapshot")).isTrue();
        });
        testStreaming();
    }

    private void testStreaming() throws SQLException, InterruptedException {
        for (int i = 0; i < 500; i++) {
            int i2 = i + 500;
            this.connection.execute(new String[]{String.format("INSERT INTO table1 VALUES(%s, '%s', %s, '%s')", Integer.valueOf(i2), "name" + i2, new BigDecimal(i2 + ".23"), "2018-07-18 13:28:56")});
        }
        List recordsForTopic = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1");
        Assertions.assertThat(recordsForTopic).hasSize(500);
        for (int i3 = 0; i3 < 500; i3++) {
            int i4 = i3 + 500;
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i4)));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i4)), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "name" + i4), new SchemaAndValueField("price", Decimal.builder(2).parameter("connect.decimal.precision", "8").optional().build(), new BigDecimal(i4 + ".23")), new SchemaAndValueField("ts", Timestamp.builder().optional().schema(), 1531920536000L));
            Struct struct = (Struct) sourceRecord.key();
            Struct struct2 = (Struct) sourceRecord.value();
            assertRecord(struct, asList);
            assertRecord((Struct) struct2.get("after"), asList2);
            Assertions.assertThat(sourceRecord.sourceOffset()).hasSize(4);
            Assert.assertTrue(sourceRecord.sourceOffset().containsKey("change_lsn"));
            Assert.assertTrue(sourceRecord.sourceOffset().containsKey("commit_lsn"));
            Assert.assertTrue(sourceRecord.sourceOffset().containsKey("event_serial_no"));
            Assert.assertNull(struct2.get("before"));
        }
    }

    @Test
    public void takeSchemaOnlySnapshotAndStartStreaming() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        testStreaming();
    }

    @Test
    @FixFor({"DBZ-1031"})
    public void takeSnapshotFromTableWithReservedName() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE [User] (id int, name varchar(30), primary key(id))"});
        for (int i = 0; i < 500; i++) {
            this.connection.execute(new String[]{String.format("INSERT INTO [User] VALUES(%s, '%s')", Integer.valueOf(i), "name" + i)});
        }
        TestHelper.enableTableCdc(this.connection, "User");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
        start(SqlServerConnector.class, TestHelper.defaultConfig().with("table.whitelist", "dbo.User").build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.User");
        Assertions.assertThat(recordsForTopic).hasSize(500);
        int i2 = 0;
        while (i2 < 500) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i2);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i2)));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i2)), new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "name" + i2));
            Struct struct = (Struct) sourceRecord.key();
            Struct struct2 = (Struct) sourceRecord.value();
            assertRecord(struct, asList);
            assertRecord((Struct) struct2.get("after"), asList2);
            MapAssert assertThat = Assertions.assertThat(sourceRecord.sourceOffset());
            MapAssert.Entry[] entryArr = new MapAssert.Entry[2];
            entryArr[0] = MapAssert.entry("snapshot", true);
            entryArr[1] = MapAssert.entry("snapshot_completed", Boolean.valueOf(i2 == 499));
            assertThat.includes(entryArr);
            Assert.assertNull(struct2.get("before"));
            i2++;
        }
    }

    @Test
    public void takeSchemaOnlySnapshotAndSendHeartbeat() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(Heartbeat.HEARTBEAT_INTERVAL, 300000).build());
        assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        SourceRecord consumeRecord = consumeRecord();
        Assertions.assertThat(consumeRecord).isNotNull();
        Assertions.assertThat(consumeRecord.topic()).startsWith("__debezium-heartbeat");
    }

    @Test
    @FixFor({"DBZ-1067"})
    public void blacklistColumn() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE blacklist_column_table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE blacklist_column_table_b (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO blacklist_column_table_b VALUES(11, 'some_name', 447)"});
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_a");
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.COLUMN_BLACKLIST, "dbo.blacklist_column_table_a.amount").with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.blacklist_column_table_a,dbo.blacklist_column_table_b").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.blacklist_column_table_b");
        Schema build = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_a.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 10).put("name", "some_name");
        Schema build2 = SchemaBuilder.struct().optional().name("server1.dbo.blacklist_column_table_b.Value").field("id", Schema.INT32_SCHEMA).field("name", Schema.OPTIONAL_STRING_SCHEMA).field("amount", Schema.OPTIONAL_INT32_SCHEMA).build();
        Struct put2 = new Struct(build2).put("id", 11).put("name", "some_name").put("amount", 447);
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic.get(0)).valueAfterFieldIsEqualTo(put).valueAfterFieldSchemaIsEqualTo(build);
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecordAssert.assertThat((SourceRecord) recordsForTopic2.get(0)).valueAfterFieldIsEqualTo(put2).valueAfterFieldSchemaIsEqualTo(build2);
        stopConnector();
    }

    @Test
    public void reoderCapturedTables() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE table_b (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO table_b VALUES(11, 'some_name', 447)"});
        TestHelper.enableTableCdc(this.connection, "table_a");
        TestHelper.enableTableCdc(this.connection, "table_b");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.table_b,dbo.table_a").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_a");
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("server1.dbo.table_b")).hasSize(1);
        Assertions.assertThat(recordsForTopic).isNull();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_a")).hasSize(1);
        stopConnector();
    }

    @Test
    public void reoderCapturedTablesWithOverlappingTableWhitelist() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE table_ac (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE table_ab (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO table_ab VALUES(11, 'some_name', 447)"});
        this.connection.execute(new String[]{"INSERT INTO table_ac VALUES(12, 'some_name', 885)"});
        TestHelper.enableTableCdc(this.connection, "table_a");
        TestHelper.enableTableCdc(this.connection, "table_ab");
        TestHelper.enableTableCdc(this.connection, "table_ac");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.table_ab,dbo.table_(.*)").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_ab");
        List recordsForTopic3 = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_ac");
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        Assertions.assertThat(recordsForTopic).isNull();
        Assertions.assertThat(recordsForTopic3).isNull();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_a")).hasSize(1);
        Assertions.assertThat(recordsForTopic3).isNull();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_ac")).hasSize(1);
        stopConnector();
    }

    @Test
    public void reoderCapturedTablesWithoutTableWhitelist() throws Exception {
        this.connection.execute(new String[]{"CREATE TABLE table_ac (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))", "CREATE TABLE table_ab (id int, name varchar(30), amount integer primary key(id))"});
        this.connection.execute(new String[]{"INSERT INTO table_ac VALUES(12, 'some_name', 885)"});
        this.connection.execute(new String[]{"INSERT INTO table_a VALUES(10, 'some_name', 120)"});
        this.connection.execute(new String[]{"INSERT INTO table_ab VALUES(11, 'some_name', 447)"});
        TestHelper.enableTableCdc(this.connection, "table_a");
        TestHelper.enableTableCdc(this.connection, "table_ab");
        TestHelper.enableTableCdc(this.connection, "table_ac");
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.TABLE_BLACKLIST, "dbo.table1").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_a");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_ab");
        List recordsForTopic3 = consumeRecordsByTopic.recordsForTopic("server1.dbo.table_ac");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        Assertions.assertThat(recordsForTopic3).isNull();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_ab")).hasSize(1);
        Assertions.assertThat(recordsForTopic3).isNull();
        Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_ac")).hasSize(1);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        List<SourceRecord> recordsForTopic = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1");
        Assertions.assertThat(recordsForTopic).hasSize(500);
        for (SourceRecord sourceRecord : recordsForTopic) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "sqlserver", "server1", false);
        }
        for (int i = 0; i < 500; i++) {
            int i2 = i + 500;
            this.connection.execute(new String[]{String.format("INSERT INTO table1 VALUES(%s, '%s', %s, '%s')", Integer.valueOf(i2), "name" + i2, new BigDecimal(i2 + ".23"), "2018-07-18 13:28:56")});
        }
        List<SourceRecord> recordsForTopic2 = consumeRecordsByTopic(500).recordsForTopic("server1.dbo.table1");
        Assertions.assertThat(recordsForTopic2).hasSize(500);
        for (SourceRecord sourceRecord2 : recordsForTopic2) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord2, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord2, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord2, "sqlserver", "server1", false);
        }
    }

    private void assertRecord(Struct struct, List<SchemaAndValueField> list) {
        list.forEach(schemaAndValueField -> {
            schemaAndValueField.assertFor(struct);
        });
    }
}
