package io.debezium.connector.sqlserver;

import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.class */
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource {
    // Column positions used when reading rows from a change-table result set.
    // ChangeTablePointer reads the commit LSN from column 1, the row LSN from column 2
    // and the operation code from column 3; data columns start at COL_DATA.
    private static final int COL_COMMIT_LSN = 1;
    private static final int COL_ROW_LSN = 2;
    private static final int COL_OPERATION = 3;
    private static final int COL_DATA = 5;
    // Matches the error message raised for a missing cdc.fn_cdc_get_all_changes_* object; group 1 is the
    // suffix after the prefix, which processErrorFromChangeTableQuery compares against capture instance names.
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    // Collaborators supplied through the constructor.
    private final SqlServerConnection connection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final SqlServerOffsetContext offsetContext;
    // Pause length between polls; read from the connector configuration in the constructor.
    private final Duration pollInterval;
    private final SqlServerConnectorConfig connectorConfig;

    /* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource$ChangeTablePointer.class */
    /**
     * Cursor over the result set of a single change table. It remembers the LSN of the row it is
     * currently positioned on and whether the result set has been exhausted.
     */
    private static class ChangeTablePointer {
        private final ChangeTable changeTable;
        private final ResultSet resultSet;
        // True once the result set has reported that no more rows are available.
        private boolean completed;
        // LSN of the current row; Lsn.NULL after the result set is exhausted.
        private Lsn currentChangeLsn;

        public ChangeTablePointer(ChangeTable changeTable, ResultSet resultSet) {
            this.changeTable = changeTable;
            this.resultSet = resultSet;
        }

        /** @return the change table this pointer reads from */
        public ChangeTable getChangeTable() {
            return changeTable;
        }

        /** @return the LSN built from column 1 of the current row */
        public Lsn getCommitLsn() throws SQLException {
            final byte[] rawCommitLsn = resultSet.getBytes(1);
            return Lsn.valueOf(rawCommitLsn);
        }

        /** @return the row LSN cached by the last call to {@link #next()} */
        public Lsn getRowLsn() throws SQLException {
            return currentChangeLsn;
        }

        /** @return the integer operation code stored in column 3 of the current row */
        public int getOperation() throws SQLException {
            return resultSet.getInt(3);
        }

        /**
         * Reads every data column of the current row.
         *
         * @return the values of all columns from COL_DATA onwards, in column order
         */
        public Object[] getData() throws SQLException {
            // Four leading columns precede the data columns.
            final int dataColumnCount = resultSet.getMetaData().getColumnCount() - 4;
            final Object[] values = new Object[dataColumnCount];
            final int firstDataColumn = SqlServerStreamingChangeEventSource.COL_DATA;
            for (int column = firstDataColumn; column < firstDataColumn + dataColumnCount; column++) {
                values[column - firstDataColumn] = resultSet.getObject(column);
            }
            return values;
        }

        /**
         * Advances to the next row. When no row is left the pointer is marked completed, the row
         * LSN is reset to {@code Lsn.NULL} and the result set is closed.
         *
         * @return {@code true} if the pointer is now positioned on a row
         */
        public boolean next() throws SQLException {
            final boolean hasRow = resultSet.next();
            completed = !hasRow;
            if (hasRow) {
                currentChangeLsn = Lsn.valueOf(resultSet.getBytes(2));
                return true;
            }
            currentChangeLsn = Lsn.NULL;
            SqlServerStreamingChangeEventSource.LOGGER.trace("Closing result set of change tables for table {}", changeTable);
            resultSet.close();
            return false;
        }

        /** @return {@code true} once the result set has been read to the end */
        public boolean isCompleted() {
            return completed;
        }

        /** Orders pointers by the LSN of the row they are currently positioned on. */
        public int compareTo(ChangeTablePointer changeTablePointer) throws SQLException {
            final Lsn mine = getRowLsn();
            final Lsn theirs = changeTablePointer.getRowLsn();
            return mine.compareTo(theirs);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("ChangeTablePointer [changeTable=");
            text.append(changeTable);
            text.append(", resultSet=").append(resultSet);
            text.append(", completed=").append(completed);
            text.append(", currentChangeLsn=").append(currentChangeLsn);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Creates the streaming source and stores the supplied collaborators.
     *
     * @param sqlServerConnectorConfig connector configuration; also supplies the poll interval
     * @param sqlServerOffsetContext offset context read and updated while streaming
     * @param sqlServerConnection connection used to query the database
     * @param eventDispatcher dispatcher that receives data and schema change events
     * @param errorHandler receives any exception that ends the streaming loop
     * @param clock clock passed to the poll pacing and to emitted records
     * @param sqlServerDatabaseSchema schema used to look up tables by id
     */
    public SqlServerStreamingChangeEventSource(
            SqlServerConnectorConfig sqlServerConnectorConfig,
            SqlServerOffsetContext sqlServerOffsetContext,
            SqlServerConnection sqlServerConnection,
            EventDispatcher<TableId> eventDispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            SqlServerDatabaseSchema sqlServerDatabaseSchema) {
        // Configuration and values derived from it.
        this.connectorConfig = sqlServerConnectorConfig;
        this.pollInterval = sqlServerConnectorConfig.getPollInterval();

        // Database access and schema lookup.
        this.connection = sqlServerConnection;
        this.schema = sqlServerDatabaseSchema;
        this.offsetContext = sqlServerOffsetContext;

        // Event pipeline.
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
    }

    /**
     * Polls the database for new changes until the supplied context stops running and dispatches
     * every change row as a data change event, merging the rows of all queried change tables in
     * row-LSN order.
     *
     * <p>Any exception raised while streaming (other than a recognised "capture instance is gone"
     * {@link SQLException}, which only shrinks the list of queried tables) is handed to the
     * {@code errorHandler} and ends the loop.
     *
     * @param changeEventSourceContext context whose {@code isRunning()} flag controls the loop
     * @throws InterruptedException declared by the signature; exceptions raised inside the loop
     *         are routed to the error handler by the outer {@code catch}
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        final Metronome metronome = Metronome.sleeper(this.pollInterval, this.clock);
        // Tables waiting for a schema migration, ordered by the stop LSN of their capture instance.
        final Queue<ChangeTable> schemaChangeCheckpoints = new PriorityQueue<>(
                (left, right) -> left.getStopLsn().compareTo(right.getStopLsn()));
        try {
            final AtomicReference<ChangeTable[]> tablesSlot = new AtomicReference<>(getCdcTablesToQuery());
            final Lsn lastProcessedLsnOnStart = this.offsetContext.getChangeLsn();
            LOGGER.info("Last LSN recorded in offsets is {}", lastProcessedLsnOnStart);
            Lsn lastProcessedLsn = this.offsetContext.getChangeLsn();

            while (changeEventSourceContext.isRunning()) {
                final Lsn currentMaxLsn = this.connection.getMaxLsn();
                if (!currentMaxLsn.isAvailable()) {
                    LOGGER.debug("No maximum LSN recorded in the database");
                    metronome.pause();
                    continue;
                }
                if (currentMaxLsn.equals(lastProcessedLsn)) {
                    LOGGER.debug("No change in the database");
                    metronome.pause();
                    continue;
                }

                // Start right after the last processed LSN.
                // NOTE(review): presumably needed because the queried LSN interval is inclusive - confirm.
                final Lsn fromLsn = lastProcessedLsn.isAvailable() ? this.connection.incrementLsn(lastProcessedLsn) : lastProcessedLsn;

                // Flush migrations that were still pending from the previous iteration.
                while (!schemaChangeCheckpoints.isEmpty()) {
                    migrateTable(schemaChangeCheckpoints);
                }
                if (!this.connection.listOfNewChangeTables(fromLsn, currentMaxLsn).isEmpty()) {
                    final ChangeTable[] tables = getCdcTablesToQuery();
                    tablesSlot.set(tables);
                    for (ChangeTable table : tables) {
                        if (table.getStartLsn().isBetween(fromLsn, currentMaxLsn)) {
                            LOGGER.info("Schema will be changed for {}", table);
                            schemaChangeCheckpoints.add(table);
                        }
                    }
                }

                try {
                    this.connection.getChangesForTables(tablesSlot.get(), fromLsn, currentMaxLsn, resultSets -> {
                        final int tableCount = resultSets.length;
                        final ChangeTablePointer[] pointers = new ChangeTablePointer[tableCount];
                        final ChangeTable[] tables = tablesSlot.get();
                        for (int i = 0; i < tableCount; i++) {
                            pointers[i] = new ChangeTablePointer(tables[i], resultSets[i]);
                            pointers[i].next();
                        }

                        for (;;) {
                            // Pick the not-yet-exhausted pointer whose current row has the smallest LSN.
                            ChangeTablePointer tableWithSmallestLsn = null;
                            for (ChangeTablePointer candidate : pointers) {
                                if (candidate.isCompleted()) {
                                    continue;
                                }
                                if (tableWithSmallestLsn == null || candidate.compareTo(tableWithSmallestLsn) < 0) {
                                    tableWithSmallestLsn = candidate;
                                }
                            }
                            if (tableWithSmallestLsn == null) {
                                // Every result set has been read to the end.
                                break;
                            }

                            if (!tableWithSmallestLsn.getRowLsn().isAvailable()) {
                                LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", tableWithSmallestLsn);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            // Rows at or before the LSN stored in the offsets when streaming started are skipped.
                            if (tableWithSmallestLsn.getRowLsn().compareTo(lastProcessedLsnOnStart) <= 0) {
                                LOGGER.info("Skipping change {} as its LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, lastProcessedLsnOnStart);
                                tableWithSmallestLsn.next();
                                continue;
                            }
                            // Rows at or after the stop LSN of their capture instance are skipped.
                            if (tableWithSmallestLsn.getChangeTable().getStopLsn().isAvailable()
                                    && tableWithSmallestLsn.getChangeTable().getStopLsn().compareTo(tableWithSmallestLsn.getRowLsn()) <= 0) {
                                LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", tableWithSmallestLsn, tableWithSmallestLsn.getRowLsn());
                                tableWithSmallestLsn.next();
                                continue;
                            }

                            LOGGER.trace("Processing change {}", tableWithSmallestLsn);
                            // Migrate the schema once the stream reaches the stop LSN of the earliest pending table.
                            if (!schemaChangeCheckpoints.isEmpty()
                                    && tableWithSmallestLsn.getRowLsn().compareTo(schemaChangeCheckpoints.peek().getStopLsn()) >= 0) {
                                migrateTable(schemaChangeCheckpoints);
                            }

                            final TableId tableId = tableWithSmallestLsn.getChangeTable().getSourceTableId();
                            final Lsn commitLsn = tableWithSmallestLsn.getCommitLsn();
                            final Lsn rowLsn = tableWithSmallestLsn.getRowLsn();
                            final int operation = tableWithSmallestLsn.getOperation();
                            final Object[] data = tableWithSmallestLsn.getData();

                            // Operation 3 is the "before" half of an update and must be directly followed
                            // by its "after" half (operation 4); anything else is reported as an error
                            // instead of being silently dropped.
                            final boolean updateBefore = operation == 3;
                            if (updateBefore && (!tableWithSmallestLsn.next() || tableWithSmallestLsn.getOperation() != 4)) {
                                throw new IllegalStateException("The update before event at " + rowLsn + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                            }
                            final Object[] dataNext = updateBefore ? tableWithSmallestLsn.getData() : null;

                            this.offsetContext.setChangeLsn(rowLsn);
                            this.offsetContext.setCommitLsn(commitLsn);
                            this.offsetContext.setSourceTime(this.connection.timestampOfLsn(commitLsn));
                            this.dispatcher.dispatchDataChangeEvent(tableId, new SqlServerChangeRecordEmitter(this.offsetContext, operation, data, dataNext, this.schema.tableFor(tableId), this.clock));
                            tableWithSmallestLsn.next();
                        }
                    });
                    lastProcessedLsn = currentMaxLsn;
                    // NOTE(review): presumably ends the read transaction opened by the query - confirm.
                    this.connection.rollback();
                } catch (SQLException e) {
                    // A capture instance may have been dropped; remove it from the queried tables or rethrow.
                    tablesSlot.set(processErrorFromChangeTableQuery(e, tablesSlot.get()));
                }
            }
        } catch (Exception e2) {
            this.errorHandler.setProducerThrowable(e2);
        }
    }

    /**
     * Removes the head of the given queue and dispatches a schema change event for that table,
     * using the table schema read through the connection.
     *
     * @param queue pending tables; the head element is removed
     */
    private void migrateTable(Queue<ChangeTable> queue) throws InterruptedException, SQLException {
        final ChangeTable nextTable = queue.poll();
        LOGGER.info("Migrating schema to {}", nextTable);

        final TableId sourceTableId = nextTable.getSourceTableId();
        final SqlServerSchemaChangeEventEmitter emitter = new SqlServerSchemaChangeEventEmitter(
                this.offsetContext,
                nextTable,
                this.connection.getTableSchemaFromTable(nextTable));
        this.dispatcher.dispatchSchemaChangeEvent(sourceTableId, emitter);
    }

    /**
     * Handles an {@link SQLException} raised while querying the change tables.
     *
     * <p>If the message matches {@code MISSING_CDC_FUNCTION_CHANGES_ERROR}, the change table whose
     * capture instance equals the captured group is dropped from the list. Any other exception,
     * including one without a message, is rethrown unchanged.
     *
     * @param sQLException the failure reported by the change-table query
     * @param changeTableArr the change tables that were being queried
     * @return the change tables that remain after removing the missing capture instance
     * @throws SQLException the original exception when it is not a recognised missing-function error
     */
    private ChangeTable[] processErrorFromChangeTableQuery(SQLException sQLException, ChangeTable[] changeTableArr) throws SQLException {
        final String message = sQLException.getMessage();
        // Pattern.matcher(null) would throw a NullPointerException and hide the real failure.
        if (message == null) {
            throw sQLException;
        }
        final Matcher matcher = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(message);
        if (!matcher.matches()) {
            throw sQLException;
        }
        final String captureInstance = matcher.group(1);
        LOGGER.info("Table is no longer captured with capture instance {}", captureInstance);
        return Arrays.stream(changeTableArr)
                .filter(changeTable -> !changeTable.getCaptureInstance().equals(captureInstance))
                .toArray(ChangeTable[]::new);
    }

    /**
     * Builds the list of change tables to query.
     *
     * <p>Change tables whose source table is rejected by the connector's table filter are logged
     * and dropped. The remaining ones are grouped by source table. When a table has more than one
     * capture instance, the first two are compared by start LSN: the earlier one becomes the
     * current table and gets the later one's start LSN as its stop LSN, and both are returned.
     * Only the first two capture instances of a table are considered. A schema change event is
     * dispatched for every current table the schema does not know yet.
     *
     * @return the change tables to query, never {@code null}
     */
    private ChangeTable[] getCdcTablesToQuery() throws SQLException, InterruptedException {
        final Map<TableId, List<ChangeTable>> capturesByTable = this.connection.listOfChangeTables().stream()
                .filter(changeTable -> {
                    if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getSourceTableId())) {
                        return true;
                    }
                    // The placeholder previously had no argument, so the table was never logged.
                    LOGGER.info("CDC is enabled for table {} but the table is not whitelisted by connector", changeTable);
                    return false;
                })
                .collect(Collectors.groupingBy(ChangeTable::getSourceTableId));

        final List<ChangeTable> tables = new ArrayList<>();
        for (List<ChangeTable> captures : capturesByTable.values()) {
            ChangeTable currentTable = captures.get(0);
            if (captures.size() > 1) {
                final ChangeTable futureTable;
                if (captures.get(0).getStartLsn().compareTo(captures.get(1).getStartLsn()) < 0) {
                    futureTable = captures.get(1);
                } else {
                    currentTable = captures.get(1);
                    futureTable = captures.get(0);
                }
                // The current capture instance stops where the newer one starts.
                currentTable.setStopLsn(futureTable.getStartLsn());
                tables.add(futureTable);
                // Log both instances; previously the second capture was logged twice when it was the current one.
                LOGGER.info("Multiple capture instances {} and {} present for the same table", currentTable, futureTable);
            }
            if (this.schema.tableFor(currentTable.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", currentTable.getSourceTableId(), currentTable.getCaptureInstance());
                this.dispatcher.dispatchSchemaChangeEvent(currentTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(this.offsetContext, currentTable, this.connection.getTableSchemaFromTable(currentTable)));
            }
            tables.add(currentTable);
        }
        return tables.toArray(new ChangeTable[tables.size()]);
    }

    /**
     * Offset commit callback; this implementation performs no action.
     *
     * @param map offset data supplied by the caller (ignored here)
     */
    public void commitOffset(Map<String, ?> map) {
        // No-op: nothing is done with the supplied offset.
    }
}
