package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
/* loaded from: input_file:io/debezium/connector/sqlserver/TransactionMetadataIT.class */
public class TransactionMetadataIT extends AbstractConnectorTest {
    private SqlServerConnection connection;

    @Rule
    public SkipTestRule skipRule = new SkipTestRule();

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"CREATE TABLE tablea (id int primary key, cola varchar(30))", "CREATE TABLE tableb (id int primary key, colb varchar(30))", "INSERT INTO tablea VALUES(1, 'a')"});
        TestHelper.enableTableCdc(this.connection, "tablea");
        TestHelper.enableTableCdc(this.connection, "tableb");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void transactionMetadata() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        this.connection.setAutoCommit(false);
        String[] strArr = new String[10];
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            strArr[2 * i] = "INSERT INTO tablea VALUES(" + i2 + ", 'a')";
            strArr[(2 * i) + 1] = "INSERT INTO tableb VALUES(" + i2 + ", 'b')";
        }
        this.connection.execute(strArr);
        this.connection.setAutoCommit(true);
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1000, 'b')"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(14);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("server1.dbo.tableb");
        List recordsForTopic3 = consumeRecordsByTopic.recordsForTopic("server1.transaction");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(6);
        Assertions.assertThat(recordsForTopic3).hasSize(3);
        List allRecordsInOrder = consumeRecordsByTopic.allRecordsInOrder();
        String assertBeginTransaction = assertBeginTransaction((SourceRecord) allRecordsInOrder.get(0));
        long j = 1;
        for (int i3 = 1; i3 <= 10; i3++) {
            assertRecordTransactionMetadata((SourceRecord) allRecordsInOrder.get(i3), assertBeginTransaction, j, (j + 1) / 2);
            j++;
        }
        assertEndTransaction((SourceRecord) allRecordsInOrder.get(11), assertBeginTransaction, 10L, Collect.hashMapOf("testDB.dbo.tablea", 5, "testDB.dbo.tableb", 5));
        stopConnector();
    }

    private void restartInTheMiddleOfTx(boolean z, boolean z2) throws Exception {
        Configuration build = TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).build();
        if (z) {
            start(SqlServerConnector.class, build);
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            stopConnector();
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-1, '-a')"});
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
                if (!this.connection.getMaxLsn().isAvailable()) {
                    return false;
                }
                for (SqlServerChangeTable sqlServerChangeTable : this.connection.listOfChangeTables()) {
                    String table = sqlServerChangeTable.getChangeTableId().table();
                    if (table.endsWith("dbo_" + this.connection.getNameOfChangeTable("tablea"))) {
                        try {
                            Lsn minLsn = this.connection.getMinLsn(table);
                            Lsn maxLsn = this.connection.getMaxLsn();
                            AtomicReference atomicReference = new AtomicReference(false);
                            this.connection.getChangesForTables((SqlServerChangeTable[]) Collections.singletonList(sqlServerChangeTable).toArray(new SqlServerChangeTable[0]), minLsn, maxLsn, resultSetArr -> {
                                ResultSet resultSet = resultSetArr[0];
                                while (resultSet.next()) {
                                    if (resultSet.getInt("id") == -1) {
                                        atomicReference.set(true);
                                        return;
                                    }
                                }
                            });
                            return (Boolean) atomicReference.get();
                        } catch (Exception e) {
                            Assert.fail("Failed to fetch changes for tablea: " + e.getMessage());
                        }
                    }
                }
                return false;
            });
        }
        start(SqlServerConnector.class, build, sourceRecord -> {
            if (!"server1.dbo.tablea.Envelope".equals(sourceRecord.valueSchema().name())) {
                return false;
            }
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Integer int32 = struct.getInt32("id");
            return int32 != null && int32.intValue() == 25 && "a".equals(struct.getString("cola"));
        });
        assertConnectorIsRunning();
        String assertBeginTransaction = z ? assertBeginTransaction((SourceRecord) consumeRecordsByTopic(1).allRecordsInOrder().get(0)) : null;
        consumeRecordsByTopic(1);
        if (z2) {
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-2, '-a')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
            assertRecord(((Struct) ((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(1)).value()).getStruct("after"), Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a")));
            assertBeginTransaction = assertBeginTransaction((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0));
        }
        this.connection.setAutoCommit(false);
        for (int i = 0; i < 30; i++) {
            int i2 = 10 + i;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        this.connection.connection().commit();
        int i3 = assertBeginTransaction != null ? 1 : 0;
        int i4 = i3 + 1 + 30;
        List allRecordsInOrder = consumeRecordsByTopic(i4).allRecordsInOrder();
        Assertions.assertThat(allRecordsInOrder).hasSize(i4);
        if (assertBeginTransaction != null) {
            assertEndTransaction((SourceRecord) allRecordsInOrder.get(0), assertBeginTransaction, 1L, Collect.hashMapOf("testDB.dbo.tablea", 1));
        }
        String assertBeginTransaction2 = assertBeginTransaction((SourceRecord) allRecordsInOrder.get(i3));
        SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(30 + i3);
        assertRecord((Struct) ((Struct) sourceRecord2.value()).get("after"), Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, 24), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b")));
        assertRecordTransactionMetadata(sourceRecord2, assertBeginTransaction2, 30L, 15L);
        stopConnector();
        start(SqlServerConnector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(30);
        Assertions.assertThat(consumeRecordsByTopic2.allRecordsInOrder()).hasSize(30);
        List recordsForTopic = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic("server1.dbo.tableb");
        for (int i5 = 0; i5 < 15; i5++) {
            int i6 = 25 + i5;
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(i5);
            SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic2.get(i5);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i6)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord4.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
            assertRecordTransactionMetadata(sourceRecord3, assertBeginTransaction2, 30 + (2 * i5) + 1, 15 + i5 + 1);
            assertRecordTransactionMetadata(sourceRecord4, assertBeginTransaction2, 30 + (2 * i5) + 2, 15 + i5 + 1);
        }
        for (int i7 = 0; i7 < 30; i7++) {
            int i8 = 1000 + i7;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i8 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i8 + ", 'b')"});
            this.connection.connection().commit();
        }
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(120);
        List recordsForTopic3 = consumeRecordsByTopic3.recordsForTopic("server1.dbo.tablea");
        List recordsForTopic4 = consumeRecordsByTopic3.recordsForTopic("server1.dbo.tableb");
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic("server1.transaction");
        Assertions.assertThat(recordsForTopic3).hasSize(30);
        Assertions.assertThat(recordsForTopic4).hasSize(30);
        Assertions.assertThat(recordsForTopic5).hasSize(60);
        assertEndTransaction((SourceRecord) recordsForTopic5.get(0), assertBeginTransaction2, 60L, Collect.hashMapOf("testDB.dbo.tablea", 30, "testDB.dbo.tableb", 30));
        for (int i9 = 0; i9 < 30; i9++) {
            int i10 = i9 + 1000;
            SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic3.get(i9);
            SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic4.get(i9);
            List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i10)), new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("id", Schema.INT32_SCHEMA, Integer.valueOf(i10)), new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct3 = (Struct) sourceRecord5.value();
            assertRecord((Struct) struct3.get("after"), asList3);
            Assert.assertNull(struct3.get("before"));
            Struct struct4 = (Struct) sourceRecord6.value();
            assertRecord((Struct) struct4.get("after"), asList4);
            Assert.assertNull(struct4.get("before"));
            String assertBeginTransaction3 = assertBeginTransaction((SourceRecord) recordsForTopic5.get((2 * i9) + 1));
            assertRecordTransactionMetadata(sourceRecord5, assertBeginTransaction3, 1L, 1L);
            assertRecordTransactionMetadata(sourceRecord6, assertBeginTransaction3, 2L, 1L);
            if (i9 < 29) {
                assertEndTransaction((SourceRecord) recordsForTopic5.get((2 * i9) + 2), assertBeginTransaction3, 2L, Collect.hashMapOf("testDB.dbo.tablea", 1, "testDB.dbo.tableb", 1));
            }
        }
    }

    @Test
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        restartInTheMiddleOfTx(true, false);
    }

    @Test
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        restartInTheMiddleOfTx(false, true);
    }

    @Test
    public void restartInTheMiddleOfTx() throws Exception {
        restartInTheMiddleOfTx(false, false);
    }

    private void assertRecord(Struct struct, List<SchemaAndValueField> list) {
        list.forEach(schemaAndValueField -> {
            schemaAndValueField.assertFor(struct);
        });
    }
}
