package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogProcessor.class */
/**
 * Processor that reads Cassandra commit log files and feeds them to a
 * {@link CommitLogReadHandlerImpl}.
 *
 * <p>On its first {@link #process()} pass it reads every commit log already present in
 * the CDC directory (in the order defined by {@code CommitLogUtil::compareCommitLogs});
 * afterwards it polls a directory watcher that is registered for
 * {@link StandardWatchEventKinds#ENTRY_CREATE} events on that directory. When the
 * "latest commit log only" option is set, it instead reads the single most recently
 * modified file from the commit log directory.
 */
public class CommitLogProcessor extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogProcessor.class);
    private static final String NAME = "Commit Log Processor";
    private final CommitLogReader commitLogReader;
    private final CommitLogReadHandlerImpl commitLogReadHandler;
    private final File cdcDir;
    private final AbstractDirectoryWatcher watcher;
    private final ChangeEventQueue<Event> queue;
    private final boolean latestOnly;
    private final CommitLogProcessorMetrics metrics;
    /** {@code true} until the existing commit logs in the CDC directory have been read once. */
    private boolean initial;
    private final boolean errorCommitLogReprocessEnabled;
    private final CommitLogTransfer commitLogTransfer;

    /**
     * Creates the processor and wires up the commit log reader, read handler and directory watcher.
     *
     * @param cassandraConnectorContext source of the queue, schema holder, offset writer and connector config
     * @throws IOException declared because constructing the directory watcher may fail
     */
    public CommitLogProcessor(CassandraConnectorContext cassandraConnectorContext) throws IOException {
        super(NAME, Duration.ZERO);
        final CassandraConnectorConfig config = cassandraConnectorContext.getCassandraConnectorConfig();
        this.metrics = new CommitLogProcessorMetrics();
        this.initial = true;
        this.commitLogReader = new CommitLogReader();
        this.queue = cassandraConnectorContext.getQueue();
        final RecordMaker recordMaker = new RecordMaker(
                config.tombstonesOnDelete(),
                new Filters(config.fieldExcludeList()),
                config);
        this.commitLogReadHandler = new CommitLogReadHandlerImpl(
                cassandraConnectorContext.getSchemaHolder(),
                cassandraConnectorContext.getQueue(),
                cassandraConnectorContext.getOffsetWriter(),
                recordMaker,
                this.metrics);
        this.cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
        this.watcher = new AbstractDirectoryWatcher(
                this.cdcDir.toPath(),
                config.cdcDirPollInterval(),
                Collections.singleton(StandardWatchEventKinds.ENTRY_CREATE)) {
            @Override
            void handleEvent(WatchEvent<?> watchEvent, Path path) throws IOException {
                // Newly created files are only processed while the processor is still running.
                if (CommitLogProcessor.this.isRunning()) {
                    CommitLogProcessor.this.processCommitLog(path.toFile());
                }
            }
        };
        this.latestOnly = config.latestCommitLogOnly();
        this.errorCommitLogReprocessEnabled = config.errorCommitLogReprocessEnabled();
        this.commitLogTransfer = config.getCommitLogTransfer();
    }

    /** Registers this processor's metrics. */
    @Override
    public void initialize() {
        this.metrics.registerMetrics();
    }

    /** Unregisters this processor's metrics. */
    @Override
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    /**
     * Runs one processing pass.
     *
     * <ul>
     *   <li>In latest-only mode: processes the last modified commit log and then throws
     *       {@link InterruptedException}.</li>
     *   <li>On the first pass otherwise: optionally asks the commit log transfer for error
     *       commit log files, then processes all existing commit logs in sorted order.</li>
     *   <li>On every non-latest-only pass: polls the directory watcher.</li>
     * </ul>
     *
     * @throws IOException if a commit log cannot be processed or polling fails
     * @throws InterruptedException always thrown after the single read in latest-only mode
     */
    @Override
    public void process() throws IOException, InterruptedException {
        LOGGER.debug("Processing commitLogFiles while initial is {}", this.initial);
        if (this.latestOnly) {
            processLastModifiedCommitLog();
            // NOTE(review): presumably this is how the one-shot mode stops the processor loop;
            // confirm against AbstractProcessor.
            throw new InterruptedException();
        }
        if (this.initial) {
            if (this.errorCommitLogReprocessEnabled) {
                LOGGER.info("CommitLog Error Processing is enabled. Attempting to get all error commitLog files.");
                this.commitLogTransfer.getErrorCommitLogFiles();
            }
            LOGGER.info("Reading existing commit logs in {}", this.cdcDir);
            File[] commitLogs = CommitLogUtil.getCommitLogs(this.cdcDir);
            Arrays.sort(commitLogs, CommitLogUtil::compareCommitLogs);
            for (File commitLog : commitLogs) {
                if (isRunning()) {
                    processCommitLog(commitLog);
                }
            }
            this.initial = false;
        }
        this.watcher.poll();
    }

    /**
     * Reads one commit log segment through the read handler and, unless in latest-only
     * mode, enqueues an {@link EOFEvent} flagged with whether the read succeeded.
     *
     * <p>If reading fails, the error is logged. It is rethrown only when the configured
     * commit log transfer is the default implementation; with any other implementation
     * the failure is logged and processing continues.
     *
     * @param file the commit log file to read
     * @throws IOException if {@code file} is {@code null}, does not exist, or reading fails
     *         with an {@code IOException} while the default commit log transfer is configured
     * @throws CassandraConnectorTaskException if an {@link InterruptedException} is raised
     *         while processing (e.g. while enqueuing the EOF event)
     */
    void processCommitLog(File file) throws IOException {
        if (file == null) {
            throw new IOException("Commit log is null");
        }
        if (!file.exists()) {
            throw new IOException("Commit log " + file.getName() + " does not exist");
        }
        try {
            try {
                LOGGER.info("Processing commit log {}", file.getName());
                this.metrics.setCommitLogFilename(file.getName());
                this.commitLogReader.readCommitLogSegment(this.commitLogReadHandler, file, false);
                if (!this.latestOnly) {
                    this.queue.enqueue(new EOFEvent(file, true));
                }
                LOGGER.info("Successfully processed commit log {}", file.getName());
            } catch (Exception e) {
                if (!this.latestOnly) {
                    this.queue.enqueue(new EOFEvent(file, false));
                }
                LOGGER.error("Error occurred while processing commit log {}", file.getName(), e);
                boolean defaultTransfer = this.commitLogTransfer.getClass().getName()
                        .equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS);
                if (defaultTransfer) {
                    throw e;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe
            // the interruption after it has been wrapped in an unrelated exception type.
            Thread.currentThread().interrupt();
            LOGGER.error("Interruption while enqueuing EOF Event for file {}", file.getName(), e);
            throw new CassandraConnectorTaskException("Enqueuing has been interrupted: ", e);
        }
    }

    /**
     * Processes the commit log with the greatest last-modified timestamp found in the
     * commit log directory (not the CDC directory). Logs an informational message when
     * no commit log is found.
     *
     * @throws IOException if processing the selected commit log fails
     */
    void processLastModifiedCommitLog() throws IOException {
        LOGGER.warn("CommitLogProcessor will read the last modified commit log from the COMMIT LOG DIRECTORY based on modified timestamp, NOT FROM THE CDC_RAW DIRECTORY. This method should not be used in PRODUCTION!");
        File latest = null;
        for (File candidate : CommitLogUtil.getCommitLogs(new File(DatabaseDescriptor.getCommitLogLocation()))) {
            // Strict comparison: on equal timestamps the earlier-listed file is kept.
            if (latest == null || latest.lastModified() < candidate.lastModified()) {
                latest = candidate;
            }
        }
        if (latest == null) {
            LOGGER.info("No commit logs found in {}", DatabaseDescriptor.getCommitLogLocation());
            return;
        }
        processCommitLog(latest);
    }
}
