package io.debezium.connector.sqlserver;

import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.Notification;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerStreamingChangeEventSource.class */
public class SqlServerStreamingChangeEventSource implements StreamingChangeEventSource<SqlServerPartition, SqlServerOffsetContext> {
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name '(.*)\\.cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerStreamingChangeEventSource.class);
    private static final Duration DEFAULT_INTERVAL_BETWEEN_COMMITS = Duration.ofMinutes(1);
    private static final int INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR = 3;
    private final SqlServerConnection dataConnection;
    private final SqlServerConnection metadataConnection;
    private final EventDispatcher<SqlServerPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final SqlServerDatabaseSchema schema;
    private final Duration pollInterval;
    private final SnapshotterService snapshotterService;
    private final SqlServerConnectorConfig connectorConfig;
    private final ElapsedTimeStrategy pauseBetweenCommits;
    private final Map<SqlServerPartition, SqlServerStreamingExecutionContext> streamingExecutionContexts;
    private final Map<SqlServerPartition, Set<SqlServerChangeTable>> changeTablesWithKnownStopLsn = new HashMap();
    private boolean checkAgent;
    private SqlServerOffsetContext effectiveOffset;
    private final NotificationService<SqlServerPartition, SqlServerOffsetContext> notificationService;

    /**
     * Creates the streaming source and derives the minimum pause between connection commits.
     *
     * <p>The pause is the larger of {@link #DEFAULT_INTERVAL_BETWEEN_COMMITS} and the configured poll
     * interval multiplied by {@link #INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR}.
     *
     * @param sqlServerConnectorConfig connector configuration; also supplies the poll interval
     * @param sqlServerConnection connection stored as the data connection
     * @param sqlServerConnection2 connection stored as the metadata connection
     * @param eventDispatcher dispatcher used for data, schema and heartbeat events
     * @param errorHandler receiver of failures raised while streaming
     * @param clock clock used for the commit pause and for notification timestamps
     * @param sqlServerDatabaseSchema schema registry consulted for table definitions
     * @param notificationService service used to announce completed capture instances
     * @param snapshotterService service consulted to decide whether streaming is enabled
     */
    public SqlServerStreamingChangeEventSource(SqlServerConnectorConfig sqlServerConnectorConfig, SqlServerConnection sqlServerConnection, SqlServerConnection sqlServerConnection2, EventDispatcher<SqlServerPartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, SqlServerDatabaseSchema sqlServerDatabaseSchema, NotificationService<SqlServerPartition, SqlServerOffsetContext> notificationService, SnapshotterService snapshotterService) {
        this.connectorConfig = sqlServerConnectorConfig;
        this.dataConnection = sqlServerConnection;
        this.metadataConnection = sqlServerConnection2;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = sqlServerDatabaseSchema;
        this.notificationService = notificationService;
        this.pollInterval = sqlServerConnectorConfig.getPollInterval();
        this.snapshotterService = snapshotterService;

        // Use the declared factor instead of a repeated literal so the two cannot drift apart.
        final Duration pollBasedInterval = this.pollInterval.multipliedBy(INTERVAL_BETWEEN_COMMITS_BASED_ON_POLL_FACTOR);
        final Duration commitInterval = DEFAULT_INTERVAL_BETWEEN_COMMITS.compareTo(pollBasedInterval) > 0
                ? DEFAULT_INTERVAL_BETWEEN_COMMITS
                : pollBasedInterval;
        this.pauseBetweenCommits = ElapsedTimeStrategy.constant(clock, commitInterval.toMillis());

        this.streamingExecutionContexts = new HashMap<>();
        this.checkAgent = true;
    }

    /**
     * Seeds the effective offset used before the first streaming iteration.
     *
     * @param sqlServerOffsetContext previously stored offset, or {@code null} when none exists; in the
     *        latter case a fresh context built from {@code TxLogPosition.NULL} is used
     * @throws InterruptedException declared by the signature; not thrown by this implementation
     */
    public void init(SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException {
        if (sqlServerOffsetContext != null) {
            this.effectiveOffset = sqlServerOffsetContext;
            return;
        }
        this.effectiveOffset = new SqlServerOffsetContext(this.connectorConfig, TxLogPosition.NULL, false, false);
    }

    /**
     * Not supported by this source; streaming is driven through
     * {@code executeIteration} instead.
     *
     * @throws UnsupportedOperationException always
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException {
        final String reason = "Currently unsupported by the SQL Server connector";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Runs one streaming pass for the given partition.
     *
     * <p>The pass determines the upper LSN bound, applies pending schema migrations, refreshes the set
     * of change tables when new ones appear in the interval, and then merges the per-table result sets
     * in position order, dispatching one data change event per change (an update-before row is paired
     * with the update-after row that must follow it).
     *
     * <p>Any exception raised during the pass is handed to the error handler rather than propagated.
     *
     * @param changeEventSourceContext context queried for the running state
     * @param sqlServerPartition partition (database) to stream from
     * @param sqlServerOffsetContext offset context updated as events are dispatched
     * @return {@code false} when streaming is disabled by the snapshotter, when no upper LSN is
     *         available, or when no change was found; {@code true} otherwise, including when the
     *         context is not running or when a failure was reported to the error handler
     */
    public boolean executeIteration(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext) {
        if (!this.snapshotterService.getSnapshotter().shouldStream()) {
            LOGGER.info("Streaming is disabled for snapshot mode {}", this.snapshotterService.getSnapshotter().name());
            return false;
        }

        final String databaseName = sqlServerPartition.getDatabaseName();
        this.effectiveOffset = sqlServerOffsetContext;

        try {
            // Look the context up first so a throwaway instance is not allocated on every pass.
            SqlServerStreamingExecutionContext executionContext = this.streamingExecutionContexts.get(sqlServerPartition);
            if (executionContext == null) {
                executionContext = new SqlServerStreamingExecutionContext(
                        new PriorityQueue<SqlServerChangeTable>(Comparator.comparing(SqlServerChangeTable::getStopLsn)),
                        new AtomicReference<SqlServerChangeTable[]>(),
                        sqlServerOffsetContext.getChangePosition(),
                        new AtomicBoolean(false),
                        sqlServerOffsetContext.isSnapshotCompleted());
                this.streamingExecutionContexts.put(sqlServerPartition, executionContext);
                LOGGER.info("Last position recorded in offsets is {}[{}]", sqlServerOffsetContext.getChangePosition(), sqlServerOffsetContext.getEventSerialNo());
            }

            final Queue<SqlServerChangeTable> schemaChangeCheckpoints = executionContext.getSchemaChangeCheckpoints();
            final AtomicReference<SqlServerChangeTable[]> tablesSlot = executionContext.getTablesSlot();
            // Position and serial number as recorded in the offsets when this pass started.
            final TxLogPosition changePosition = sqlServerOffsetContext.getChangePosition();
            final long eventSerialNo = sqlServerOffsetContext.getEventSerialNo();
            final AtomicBoolean changesStoppedBeingMonotonic = executionContext.getChangesStoppedBeingMonotonic();
            final int maxTransactionsPerIteration = this.connectorConfig.getMaxTransactionsPerIteration();
            final TxLogPosition lastProcessedPosition = executionContext.getLastProcessedPosition();

            if (!changeEventSourceContext.isRunning()) {
                return true;
            }

            commitTransaction();
            final Lsn toLsn = getToLsn(this.dataConnection, databaseName, lastProcessedPosition, maxTransactionsPerIteration);

            if (!toLsn.isAvailable()) {
                // Probe the agent only once per run of "no LSN" passes; the flag is re-armed below.
                if (this.checkAgent) {
                    try {
                        if (!this.dataConnection.isAgentRunning(databaseName)) {
                            LOGGER.error("No maximum LSN recorded in the database; SQL Server Agent is not running");
                        }
                    } catch (SQLException agentQueryFailure) {
                        LOGGER.warn("No maximum LSN recorded in the database; this may happen if there are no changes recorded in the change table yet or low activity database where the cdc clean up job periodically clears entries from the cdc tables. Otherwise, this may be an indication that the SQL Server Agent is not running. You should follow the documentation on how to configure SQL Server Agent running status query.");
                        LOGGER.warn("Cannot query the status of the SQL Server Agent", agentQueryFailure);
                    }
                    this.checkAgent = false;
                }
                return false;
            }
            this.checkAgent = true;

            if (toLsn.compareTo(lastProcessedPosition.getCommitLsn()) <= 0 && executionContext.getShouldIncreaseFromLsn()) {
                LOGGER.debug("No change in the database");
                this.dispatcher.dispatchHeartbeatEvent(sqlServerPartition, sqlServerOffsetContext);
                return false;
            }

            // Start just after the last processed commit unless the context asks to re-read from it.
            final Lsn fromLsn = (lastProcessedPosition.getCommitLsn().isAvailable() && executionContext.getShouldIncreaseFromLsn())
                    ? this.dataConnection.incrementLsn(databaseName, lastProcessedPosition.getCommitLsn())
                    : lastProcessedPosition.getCommitLsn();
            executionContext.setShouldIncreaseFromLsn(true);

            // Drain schema migrations left over from the previous pass.
            while (!schemaChangeCheckpoints.isEmpty()) {
                migrateTable(sqlServerPartition, schemaChangeCheckpoints, sqlServerOffsetContext);
            }

            if (!this.dataConnection.getNewChangeTables(databaseName, fromLsn, toLsn).isEmpty()) {
                final SqlServerChangeTable[] refreshedTables = getChangeTablesToQuery(sqlServerPartition, sqlServerOffsetContext, toLsn);
                tablesSlot.set(refreshedTables);
                for (SqlServerChangeTable refreshedTable : refreshedTables) {
                    if (refreshedTable.getStartLsn().isBetween(fromLsn, toLsn)) {
                        LOGGER.info("Schema will be changed for {}", refreshedTable);
                        schemaChangeCheckpoints.add(refreshedTable);
                    }
                }
                collectChangeTablesWithKnownStopLsn(sqlServerPartition, refreshedTables);
            }
            if (tablesSlot.get() == null) {
                tablesSlot.set(getChangeTablesToQuery(sqlServerPartition, sqlServerOffsetContext, toLsn));
                collectChangeTablesWithKnownStopLsn(sqlServerPartition, tablesSlot.get());
            }

            try {
                this.dataConnection.getChangesForTables(databaseName, tablesSlot.get(), fromLsn, toLsn, resultSets -> {
                    // 1-based ordinal of the change within the transaction recorded in the offsets.
                    long eventSerialNoInInitialTx = 1;
                    final int tableCount = resultSets.length;
                    final SqlServerChangeTablePointer[] pointers = new SqlServerChangeTablePointer[tableCount];
                    final SqlServerChangeTable[] queriedTables = tablesSlot.get();
                    for (int i = 0; i < tableCount; i++) {
                        pointers[i] = new SqlServerChangeTablePointer(queriedTables[i], resultSets[i]);
                        pointers[i].next();
                    }

                    while (true) {
                        // Pick the not-yet-exhausted pointer that compares lowest.
                        SqlServerChangeTablePointer smallest = null;
                        for (SqlServerChangeTablePointer candidate : pointers) {
                            if (!candidate.isCompleted() && (smallest == null || candidate.compareTo(smallest) < 0)) {
                                smallest = candidate;
                            }
                        }
                        if (smallest == null) {
                            // Every result set has been consumed.
                            return;
                        }

                        final TxLogPosition position = smallest.getChangePosition();
                        if (!position.isAvailable() || !position.getInTxLsn().isAvailable()) {
                            LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", smallest);
                            smallest.next();
                            continue;
                        }

                        if (smallest.isNewTransaction() && changesStoppedBeingMonotonic.get()) {
                            LOGGER.info("Resetting changesStoppedBeingMonotonic as transaction changes");
                            changesStoppedBeingMonotonic.set(false);
                        }
                        if (smallest.isCurrentPositionSmallerThanPreviousPosition()) {
                            LOGGER.info("Disabling skipping changes due to not monotonic order of changes");
                            changesStoppedBeingMonotonic.set(true);
                        }

                        final boolean skippingAllowed = !changesStoppedBeingMonotonic.get();
                        if (skippingAllowed && position.compareTo(changePosition) < 0) {
                            LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", smallest, changePosition);
                            smallest.next();
                            continue;
                        }
                        if (skippingAllowed && position.compareTo(changePosition) == 0 && eventSerialNoInInitialTx <= eventSerialNo) {
                            LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", smallest, eventSerialNoInInitialTx, changePosition, eventSerialNo);
                            eventSerialNoInInitialTx++;
                            smallest.next();
                            continue;
                        }

                        final SqlServerChangeTable changeTable = smallest.getChangeTable();
                        if (changeTable.getStopLsn().isAvailable() && changeTable.getStopLsn().compareTo(position.getCommitLsn()) <= 0) {
                            LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", smallest, position);
                            smallest.next();
                            continue;
                        }

                        LOGGER.trace("Processing change {}", smallest);
                        LOGGER.trace("Schema change checkpoints {}", schemaChangeCheckpoints);
                        if (!schemaChangeCheckpoints.isEmpty() && position.getCommitLsn().compareTo(schemaChangeCheckpoints.peek().getStartLsn()) >= 0) {
                            migrateTable(sqlServerPartition, schemaChangeCheckpoints, sqlServerOffsetContext);
                        }

                        final TableId tableId = changeTable.getSourceTableId();
                        final int operation = smallest.getOperation();
                        final Object[] data = smallest.getData();

                        // Operation 3 is treated as "update before"; it must be followed by operation 4
                        // ("update after"), and the pair counts as two events.
                        int eventCount = 1;
                        if (operation == 3) {
                            if (!smallest.next() || smallest.getOperation() != 4) {
                                throw new IllegalStateException("The update before event at " + smallest.getChangePosition() + " for table " + tableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                            }
                            eventCount = 2;
                        }
                        final Object[] dataNext = operation == 3 ? smallest.getData() : null;

                        final ResultSet resultSet = smallest.getResultSet();
                        // Re-read the position: the pointer may have advanced to the update-after row.
                        sqlServerOffsetContext.setChangePosition(smallest.getChangePosition(), eventCount);
                        // The event timestamp is taken from the last column of the current row.
                        sqlServerOffsetContext.event(tableId, resultSet.getTimestamp(resultSet.getMetaData().getColumnCount()).toInstant());
                        this.dispatcher.dispatchDataChangeEvent(sqlServerPartition, tableId, new SqlServerChangeRecordEmitter(sqlServerPartition, sqlServerOffsetContext, operation, data, dataNext, this.clock, this.connectorConfig));
                        smallest.next();
                    }
                });
                executionContext.setLastProcessedPosition(TxLogPosition.valueOf(toLsn));
                // Roll back the data connection once the batch has been fully processed.
                this.dataConnection.rollback();
            } catch (SQLException changeQueryFailure) {
                // A vanished capture instance is tolerated by dropping its table; anything else is rethrown.
                tablesSlot.set(processErrorFromChangeTableQuery(databaseName, changeQueryFailure, tablesSlot.get()));
            }
        } catch (Exception e) {
            this.errorHandler.setProducerThrowable(e);
        }
        return true;
    }

    /* renamed from: getOffsetContext, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the offset context most recently assigned by {@code init} or {@code executeIteration}.
     *
     * @return the current effective offset; may be {@code null} if neither method has run yet
     */
    public SqlServerOffsetContext m39getOffsetContext() {
        final SqlServerOffsetContext current = this.effectiveOffset;
        return current;
    }

    /**
     * Records, per partition, every given change table whose stop LSN is available.
     *
     * <p>Updates are made while holding the monitor of {@code changeTablesWithKnownStopLsn}, the same
     * monitor {@code commitOffset} takes when it reads and prunes the map.
     *
     * @param sqlServerPartition partition the tables belong to
     * @param sqlServerChangeTableArr change tables to inspect
     */
    private void collectChangeTablesWithKnownStopLsn(SqlServerPartition sqlServerPartition, SqlServerChangeTable[] sqlServerChangeTableArr) {
        for (SqlServerChangeTable candidate : sqlServerChangeTableArr) {
            if (!candidate.getStopLsn().isAvailable()) {
                continue;
            }
            synchronized (this.changeTablesWithKnownStopLsn) {
                LOGGER.info("The stop lsn of {} change table became known", candidate);
                Set<SqlServerChangeTable> knownForPartition = this.changeTablesWithKnownStopLsn.get(sqlServerPartition);
                if (knownForPartition == null) {
                    knownForPartition = new HashSet<>();
                    this.changeTablesWithKnownStopLsn.put(sqlServerPartition, knownForPartition);
                }
                knownForPartition.add(candidate);
            }
        }
    }

    /**
     * Commits both connections when the connector uses a read-only database connection or when the
     * pause between commits has elapsed.
     *
     * @throws SQLException if either commit fails
     */
    private void commitTransaction() throws SQLException {
        // Keep the short-circuit order: the elapsed check is only consulted for non-read-only connections.
        final boolean commitDue = this.connectorConfig.isReadOnlyDatabaseConnection() || this.pauseBetweenCommits.hasElapsed();
        if (!commitDue) {
            return;
        }
        this.dataConnection.commit();
        this.metadataConnection.commit();
    }

    /**
     * Takes the head of the checkpoint queue and switches the schema to that change table.
     *
     * <p>The table definition read through the metadata connection is compared with the one currently
     * registered in the schema; an ALTER schema change event is dispatched only when they differ.
     *
     * @param sqlServerPartition partition the table belongs to
     * @param queue pending schema change checkpoints; its head is removed
     * @param sqlServerOffsetContext offset context passed on to the schema change event
     * @throws InterruptedException if dispatching the schema change event is interrupted
     * @throws SQLException if reading the table definition fails
     */
    private void migrateTable(SqlServerPartition sqlServerPartition, Queue<SqlServerChangeTable> queue, SqlServerOffsetContext sqlServerOffsetContext) throws InterruptedException, SQLException {
        final SqlServerChangeTable target = queue.poll();
        LOGGER.info("Migrating schema to {}", target);

        final Table registered = this.schema.tableFor(target.getSourceTableId());
        final Table captured = this.metadataConnection.getTableSchemaFromTable(sqlServerPartition.getDatabaseName(), target);

        if (registered.equals(captured)) {
            LOGGER.info("Migration skipped, no table schema changes detected.");
            return;
        }

        final TableId sourceTableId = target.getSourceTableId();
        this.dispatcher.dispatchSchemaChangeEvent(
                sqlServerPartition,
                sqlServerOffsetContext,
                sourceTableId,
                new SqlServerSchemaChangeEventEmitter(sqlServerPartition, sqlServerOffsetContext, target, captured, this.schema, SchemaChangeEvent.SchemaChangeEventType.ALTER));
        target.setSourceTable(captured);
    }

    /**
     * Interprets a failure of the change table query.
     *
     * <p>If the error message reports a missing {@code cdc.fn_cdc_get_all_changes_<capture instance>}
     * function in the given database, the corresponding capture instance is dropped from the returned
     * array. Any other failure is rethrown unchanged.
     *
     * @param str name of the database that was queried
     * @param sQLException failure raised by the query
     * @param sqlServerChangeTableArr change tables that were queried
     * @return the change tables without the capture instance named in the error
     * @throws Exception the original {@code sQLException} when it has no message, does not match the
     *         expected pattern, or refers to a different database
     */
    private SqlServerChangeTable[] processErrorFromChangeTableQuery(String str, SQLException sQLException, SqlServerChangeTable[] sqlServerChangeTableArr) throws Exception {
        final String message = sQLException.getMessage();
        if (message == null) {
            // A null message cannot be matched; rethrow rather than mask the failure with an NPE.
            throw sQLException;
        }
        final Matcher matcher = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(message);
        if (!matcher.matches() || !matcher.group(1).equals(str)) {
            throw sQLException;
        }
        final String captureInstance = matcher.group(2);
        LOGGER.info("Table is no longer captured with capture instance {}", captureInstance);
        return Arrays.stream(sqlServerChangeTableArr)
                .filter(table -> !table.getCaptureInstance().equals(captureInstance))
                .toArray(SqlServerChangeTable[]::new);
    }

    /**
     * Builds the list of change tables to read for a partition.
     *
     * <p>Change tables reported by the data connection are filtered by the connector's data collection
     * filter and grouped by source table. When a source table has more than one capture instance, the
     * one with the lower start LSN is treated as current and its stop LSN is set to the start LSN of the
     * other; both are returned, the later one first. A CREATE schema change event is dispatched for
     * source tables the schema does not know yet.
     *
     * @param sqlServerPartition partition whose database is inspected
     * @param sqlServerOffsetContext offset context updated before a CREATE event is dispatched
     * @param lsn LSN passed to the change table lookup
     * @return change tables to query, possibly empty
     * @throws SQLException if a database lookup fails
     * @throws InterruptedException if dispatching a schema change event is interrupted
     */
    private SqlServerChangeTable[] getChangeTablesToQuery(SqlServerPartition sqlServerPartition, SqlServerOffsetContext sqlServerOffsetContext, Lsn lsn) throws SQLException, InterruptedException {
        final String databaseName = sqlServerPartition.getDatabaseName();
        final List<SqlServerChangeTable> cdcEnabledTables = this.dataConnection.getChangeTables(databaseName, lsn);
        if (cdcEnabledTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }

        final Map<TableId, List<SqlServerChangeTable>> capturesBySourceTable = cdcEnabledTables.stream()
                .filter(changeTable -> {
                    final boolean included = this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(changeTable.getSourceTableId());
                    if (!included) {
                        LOGGER.info("CDC is enabled for table {} but the table is not on connector's table include list", changeTable);
                    }
                    return included;
                })
                .collect(Collectors.groupingBy(SqlServerChangeTable::getSourceTableId));
        if (capturesBySourceTable.isEmpty()) {
            LOGGER.warn("No table on connector's include list has enabled CDC, tables on include list do not contain any table with CDC enabled or no table match the include/exclude filter(s)");
        }

        final List<SqlServerChangeTable> result = new ArrayList<>();
        for (List<SqlServerChangeTable> captures : capturesBySourceTable.values()) {
            SqlServerChangeTable currentTable = captures.get(0);
            if (captures.size() > 1) {
                final SqlServerChangeTable first = captures.get(0);
                final SqlServerChangeTable second = captures.get(1);
                final SqlServerChangeTable futureTable;
                if (first.getStartLsn().compareTo(second.getStartLsn()) < 0) {
                    futureTable = second;
                } else {
                    currentTable = second;
                    futureTable = first;
                }
                // The current capture instance ends where the future one begins.
                currentTable.setStopLsn(futureTable.getStartLsn());
                futureTable.setSourceTable(this.dataConnection.getTableSchemaFromTable(databaseName, futureTable));
                result.add(futureTable);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", currentTable, futureTable);
            }
            if (this.schema.tableFor(currentTable.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", currentTable.getSourceTableId(), currentTable.getCaptureInstance());
                sqlServerOffsetContext.event(currentTable.getSourceTableId(), Instant.now());
                this.dispatcher.dispatchSchemaChangeEvent(sqlServerPartition, sqlServerOffsetContext, currentTable.getSourceTableId(), new SqlServerSchemaChangeEventEmitter(sqlServerPartition, sqlServerOffsetContext, currentTable, this.dataConnection.getTableSchemaFromTable(databaseName, currentTable), this.schema, SchemaChangeEvent.SchemaChangeEventType.CREATE));
            }
            currentTable.setSourceTable(this.schema.tableFor(currentTable.getSourceTableId()));
            result.add(currentTable);
        }
        return result.toArray(new SqlServerChangeTable[result.size()]);
    }

    /**
     * Determines the upper LSN bound for one streaming pass.
     *
     * @param sqlServerConnection connection used for the lookup
     * @param str database name
     * @param txLogPosition last processed position; its commit LSN anchors the bounded lookup
     * @param i transaction limit per pass; {@code 0} selects the maximum transaction LSN
     * @return the LSN up to which changes should be read
     * @throws SQLException if the lookup fails
     */
    private Lsn getToLsn(SqlServerConnection sqlServerConnection, String str, TxLogPosition txLogPosition, int i) throws SQLException {
        if (i == 0) {
            return sqlServerConnection.getMaxTransactionLsn(str);
        }
        final Lsn lastCommitLsn = txLogPosition.getCommitLsn();
        if (lastCommitLsn.isAvailable()) {
            return sqlServerConnection.getNthTransactionLsnFromLast(str, lastCommitLsn, i);
        }
        // Nothing processed yet: count transactions from the beginning instead.
        return sqlServerConnection.getNthTransactionLsnFromBeginning(str, i);
    }

    /**
     * Announces capture instances that have been fully read once an offset is committed.
     *
     * <p>For the partition whose source partition equals {@code map}, every tracked change table whose
     * stop LSN is lower than the committed LSN triggers a "Capture Instance" / "COMPLETED" notification
     * and is then removed from tracking. Nothing happens when no tracked partition matches.
     *
     * @param map source partition of the committed offset
     * @param map2 committed offset; the commit LSN is read from {@code SourceInfo.COMMIT_LSN_KEY}
     */
    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
        final Lsn committedLsn = Lsn.valueOf((String) map2.get(SourceInfo.COMMIT_LSN_KEY));
        synchronized (this.changeTablesWithKnownStopLsn) {
            SqlServerPartition matchedPartition = null;
            for (SqlServerPartition candidate : this.changeTablesWithKnownStopLsn.keySet()) {
                if (candidate.getSourcePartition().equals(map)) {
                    matchedPartition = candidate;
                    break;
                }
            }
            if (matchedPartition == null) {
                return;
            }

            final Set<SqlServerChangeTable> trackedTables = this.changeTablesWithKnownStopLsn.get(matchedPartition);
            // Snapshot the finished tables first so the tracked set can be modified while notifying.
            final List<SqlServerChangeTable> finishedTables = new ArrayList<>();
            for (SqlServerChangeTable tracked : trackedTables) {
                if (tracked.getStopLsn().compareTo(committedLsn) < 0) {
                    finishedTables.add(tracked);
                }
            }

            for (SqlServerChangeTable finished : finishedTables) {
                final Map<String, String> additionalData = new HashMap<>();
                additionalData.put("connector_name", this.connectorConfig.getLogicalName());
                additionalData.put("capture_instance", finished.getCaptureInstance());
                additionalData.put("start_lsn", finished.getStartLsn().toString());
                additionalData.put("stop_lsn", finished.getStopLsn().toString());
                additionalData.put(SourceInfo.COMMIT_LSN_KEY, committedLsn.toString());
                additionalData.putAll(matchedPartition.getSourcePartition());

                this.notificationService.notify(Notification.Builder.builder()
                        .withId(UUID.randomUUID().toString())
                        .withAggregateType("Capture Instance")
                        .withType("COMPLETED")
                        .withAdditionalData(additionalData)
                        .withTimestamp(Long.valueOf(this.clock.currentTimeInMillis()))
                        .build());

                trackedTables.remove(finished);
                LOGGER.info("Complete reading from change table {} as the committed change lsn ({}) is greater than the table's stop lsn ({})", finished, map2, finished.getStopLsn().toString());
            }
        }
    }
}
