package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.metrics.spi.ChangeEventSourceMetricsFactory;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerChangeEventSourceCoordinator.class */
/**
 * Change event source coordinator for the SQL Server connector.
 *
 * <p>For every partition present in the supplied offsets it first runs the snapshot phase, and
 * then repeatedly executes one streaming iteration per partition until the change event source
 * context reports that it is no longer running.
 */
public class SqlServerChangeEventSourceCoordinator extends ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlServerChangeEventSourceCoordinator.class);

    /** Clock handed to the metronome that paces the streaming loop. */
    private final Clock clock;

    /** Pause applied between streaming passes in which no partition produced events. */
    private final Duration pollInterval;

    /**
     * Becomes {@code true} once a pass over all streaming partitions has finished while the
     * error handler had no producer throwable recorded; never reset to {@code false}.
     */
    private final AtomicBoolean firstStreamingIterationCompletedSuccessfully = new AtomicBoolean(false);

    /**
     * Creates the coordinator. All arguments except {@code clock} are forwarded unchanged to the
     * superclass constructor.
     *
     * @param offsets                        partitions and their stored offsets
     * @param errorHandler                   error handler passed to the superclass
     * @param cls                            connector class passed to the superclass
     * @param commonConnectorConfig          connector configuration; also supplies the poll interval
     * @param changeEventSourceFactory       factory passed to the superclass
     * @param changeEventSourceMetricsFactory metrics factory passed to the superclass
     * @param eventDispatcher                event dispatcher passed to the superclass
     * @param databaseSchema                 database schema passed to the superclass
     * @param clock                          clock used to pace the streaming loop
     * @param signalProcessor                signal processor passed to the superclass
     * @param notificationService            notification service passed to the superclass
     * @param snapshotterService             snapshotter service passed to the superclass
     */
    public SqlServerChangeEventSourceCoordinator(Offsets<SqlServerPartition, SqlServerOffsetContext> offsets, ErrorHandler errorHandler, Class<? extends SourceConnector> cls, CommonConnectorConfig commonConnectorConfig, ChangeEventSourceFactory<SqlServerPartition, SqlServerOffsetContext> changeEventSourceFactory, ChangeEventSourceMetricsFactory<SqlServerPartition> changeEventSourceMetricsFactory, EventDispatcher<SqlServerPartition, ?> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SignalProcessor<SqlServerPartition, SqlServerOffsetContext> signalProcessor, NotificationService<SqlServerPartition, SqlServerOffsetContext> notificationService, SnapshotterService snapshotterService) {
        super(offsets, errorHandler, cls, commonConnectorConfig, changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, databaseSchema, signalProcessor, notificationService, snapshotterService);
        this.clock = clock;
        this.pollInterval = commonConnectorConfig.getPollInterval();
    }

    /**
     * Reports whether at least one full streaming pass has completed without a producer
     * throwable being recorded on the error handler.
     *
     * @return {@code true} once such a pass has completed, {@code false} before that
     */
    public boolean firstStreamingIterationCompletedSuccessfully() {
        return this.firstStreamingIterationCompletedSuccessfully.get();
    }

    /**
     * Runs the snapshot phase for every partition and then streams all partitions whose snapshot
     * was completed or skipped, one iteration per partition per pass.
     *
     * @param cdcSourceTaskContext      task context used to configure the logging context
     * @param snapshotChangeEventSource snapshot source handed to {@code doSnapshot}
     * @param offsets                   partitions to process together with their previous offsets
     * @param atomicReference           holder that receives the logging context returned each time
     *                                  the logging context is reconfigured
     * @param changeEventSourceContext  context queried for the running / paused state
     * @throws InterruptedException if the thread is interrupted while snapshotting, streaming or pausing
     */
    protected void executeChangeEventSources(CdcSourceTaskContext cdcSourceTaskContext, SnapshotChangeEventSource<SqlServerPartition, SqlServerOffsetContext> snapshotChangeEventSource, Offsets<SqlServerPartition, SqlServerOffsetContext> offsets, AtomicReference<LoggingContext.PreviousContext> atomicReference, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) throws InterruptedException {
        // Only partitions whose snapshot completed (or was skipped) take part in streaming.
        Offsets<SqlServerPartition, SqlServerOffsetContext> streamingOffsets = Offsets.of(new HashMap<>());

        for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : offsets) {
            SqlServerPartition partition = entry.getKey();
            SqlServerOffsetContext previousOffset = entry.getValue();

            atomicReference.set(cdcSourceTaskContext.configureLoggingContext("snapshot", partition));
            SnapshotResult<SqlServerOffsetContext> snapshotResult = doSnapshot(snapshotChangeEventSource, changeEventSourceContext, partition, previousOffset);

            if (snapshotResult.isCompletedOrSkipped()) {
                streamingOffsets.getOffsets().put(partition, snapshotResult.getOffset());
                // The signal processor context is only set when exactly one partition is configured.
                if (offsets.getOffsets().size() == 1) {
                    this.signalProcessor.setContext(snapshotResult.getOffset());
                }
            }
        }

        atomicReference.set(cdcSourceTaskContext.configureLoggingContext("streaming"));

        for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : streamingOffsets) {
            initStreamEvents(entry.getKey(), entry.getValue());
        }

        getSignalProcessor(offsets).ifPresent(processor -> {
            registerSignalActionsAndStartProcessor(processor, this.eventDispatcher, this, this.connectorConfig);
        });

        Metronome pauseBetweenPasses = Metronome.sleeper(this.pollInterval, this.clock);
        LOGGER.info("Starting streaming");

        while (changeEventSourceContext.isRunning()) {
            // True as soon as ANY partition reports streamed events in this pass. The flag must
            // not be overwritten by later partitions, otherwise an idle last partition would
            // force a pause even though earlier partitions still have work to do.
            boolean streamedEvents = false;

            for (Map.Entry<SqlServerPartition, SqlServerOffsetContext> entry : streamingOffsets) {
                SqlServerPartition partition = entry.getKey();
                SqlServerOffsetContext offsetContext = entry.getValue();

                atomicReference.set(cdcSourceTaskContext.configureLoggingContext("streaming", partition));

                // Re-check the running state per partition so a stop request takes effect mid-pass.
                if (changeEventSourceContext.isRunning()
                        && this.streamingSource.executeIteration(changeEventSourceContext, partition, offsetContext)) {
                    streamedEvents = true;
                }
            }

            if (!streamedEvents) {
                pauseBetweenPasses.pause();
            }

            if (this.errorHandler.getProducerThrowable() == null) {
                this.firstStreamingIterationCompletedSuccessfully.set(true);
            }

            if (changeEventSourceContext.isPaused()) {
                LOGGER.info("Streaming will now pause");
                changeEventSourceContext.streamingPaused();
                changeEventSourceContext.waitSnapshotCompletion();
                LOGGER.info("Streaming resumed");
            }
        }

        LOGGER.info("Finished streaming");
    }
}
