package io.debezium.connector.sqlserver;

import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.class */
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource {
    private static final int COL_COMMIT_LSN = 1;
    private static final int COL_ROW_LSN = 2;
    private static final int COL_OPERATION = 3;
    private static final int COL_DATA = 5;
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    private final SqlServerConnection connection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final SqlServerOffsetContext offsetContext;
    private final Duration pollInterval = Duration.ofSeconds(1);

    /* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource$ChangeTable.class */
    private static class ChangeTable {
        private final TableId tableId;
        private final ResultSet resultSet;
        private boolean completed = false;
        private Lsn currentChangeLsn;

        public ChangeTable(TableId tableId, ResultSet resultSet) {
            this.tableId = tableId;
            this.resultSet = resultSet;
        }

        public TableId getTableId() {
            return this.tableId;
        }

        public Lsn getCommitLsn() throws SQLException {
            return Lsn.valueOf(this.resultSet.getBytes(1));
        }

        public Lsn getRowLsn() throws SQLException {
            return this.currentChangeLsn;
        }

        public int getOperation() throws SQLException {
            return this.resultSet.getInt(3);
        }

        public Object[] getData() throws SQLException {
            int columnCount = this.resultSet.getMetaData().getColumnCount() - 4;
            Object[] objArr = new Object[columnCount];
            for (int i = 0; i < columnCount; i++) {
                objArr[i] = this.resultSet.getObject(SqlServerStreamingChangeEventSource.COL_DATA + i);
            }
            return objArr;
        }

        public boolean next() throws SQLException {
            this.completed = !this.resultSet.next();
            this.currentChangeLsn = this.completed ? Lsn.NULL : Lsn.valueOf(this.resultSet.getBytes(2));
            return !this.completed;
        }

        public boolean isCompleted() {
            return this.completed;
        }

        public int compareTo(ChangeTable changeTable) throws SQLException {
            return getRowLsn().compareTo(changeTable.getRowLsn());
        }

        public String toString() {
            return "ChangeTable [tableId=" + this.tableId + ", resultSet=" + this.resultSet + ", completed=" + this.completed + "]";
        }
    }

    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig sqlServerConnectorConfig, SqlServerOffsetContext sqlServerOffsetContext, SqlServerConnection sqlServerConnection, EventDispatcher<TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema sqlServerDatabaseSchema) {
        this.connection = sqlServerConnection;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = sqlServerDatabaseSchema;
        this.offsetContext = sqlServerOffsetContext;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        Metronome sleeper = Metronome.sleeper(this.pollInterval, this.clock);
        try {
            TableId[] tableIdArr = (TableId[]) this.schema.getCapturedTables().toArray(new TableId[this.schema.getCapturedTables().size()]);
            Lsn changeLsn = this.offsetContext.getChangeLsn();
            while (changeEventSourceContext.isRunning()) {
                Lsn maxLsn = this.connection.getMaxLsn();
                if (!maxLsn.isAvailable()) {
                    LOGGER.debug("No maximum LSN recorded in the database");
                    sleeper.pause();
                } else if (maxLsn.equals(changeLsn)) {
                    LOGGER.debug("No change in the database");
                    sleeper.pause();
                } else {
                    this.connection.getChangesForTables(tableIdArr, changeLsn.isAvailable() ? this.connection.incrementLsn(changeLsn) : changeLsn, maxLsn, resultSetArr -> {
                        TableId tableId;
                        Lsn rowLsn;
                        int length = resultSetArr.length;
                        ChangeTable[] changeTableArr = new ChangeTable[length];
                        for (int i = 0; i < length; i++) {
                            changeTableArr[i] = new ChangeTable(tableIdArr[i], resultSetArr[i]);
                            changeTableArr[i].next();
                        }
                        while (true) {
                            ChangeTable changeTable = null;
                            for (int i2 = 0; i2 < length; i2++) {
                                ChangeTable changeTable2 = changeTableArr[i2];
                                if (!changeTable2.isCompleted() && (changeTable == null || changeTable2.compareTo(changeTable) < 0)) {
                                    changeTable = changeTable2;
                                }
                            }
                            if (changeTable == null) {
                                return;
                            }
                            LOGGER.trace("Processing change {}", changeTable);
                            tableId = changeTable.getTableId();
                            Lsn commitLsn = changeTable.getCommitLsn();
                            rowLsn = changeTable.getRowLsn();
                            int operation = changeTable.getOperation();
                            Object[] data = changeTable.getData();
                            if (operation != 3 || (changeTable.next() && changeTable.getOperation() == 4)) {
                                Object[] data2 = operation == 3 ? changeTable.getData() : null;
                                this.offsetContext.setChangeLsn(rowLsn);
                                this.offsetContext.setCommitLsn(commitLsn);
                                this.offsetContext.setSourceTime(this.connection.timestampOfLsn(commitLsn));
                                this.dispatcher.dispatchDataChangeEvent(tableId, new SqlServerChangeRecordEmitter(this.offsetContext, operation, data, data2, this.schema.tableFor(tableId), this.clock));
                                changeTable.next();
                            }
                        }
                        throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                    });
                    changeLsn = maxLsn;
                }
            }
        } catch (Exception e) {
            throw new ConnectException(e);
        }
    }

    public void commitOffset(Map<String, ?> map) {
    }
}
