package org.apache.flink.cdc.debezium.internal;

import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/cdc/debezium/internal/DebeziumChangeConsumer.class */
public class DebeziumChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    public static final String LAST_COMPLETELY_PROCESSED_LSN_KEY = "lsn_proc";
    public static final String LAST_COMMIT_LSN_KEY = "lsn_commit";
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeConsumer.class);
    private final Handover handover;
    private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> currentCommitter;

    public DebeziumChangeConsumer(Handover handover) {
        this.handover = handover;
    }

    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
        try {
            this.currentCommitter = recordCommitter;
            this.handover.produce(list);
        } catch (Throwable th) {
            this.handover.reportError(th);
        }
    }

    public void commitOffset(DebeziumOffset debeziumOffset) throws InterruptedException {
        if (this.currentCommitter == null) {
            LOG.info("commitOffset() called on Debezium change consumer which doesn't receive records yet.");
            return;
        }
        SourceRecord sourceRecord = new SourceRecord(debeziumOffset.sourcePartition, adjustSourceOffset(debeziumOffset.sourceOffset), "DUMMY", Schema.BOOLEAN_SCHEMA, true);
        this.currentCommitter.markProcessed(new EmbeddedEngineChangeEvent(null, sourceRecord, sourceRecord));
        this.currentCommitter.markBatchFinished();
    }

    private Map<String, Object> adjustSourceOffset(Map<String, Object> map) {
        if (map.containsKey(LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
            map.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.valueOf(Long.parseLong(map.get(LAST_COMPLETELY_PROCESSED_LSN_KEY).toString())));
        }
        if (map.containsKey(LAST_COMMIT_LSN_KEY)) {
            map.put(LAST_COMMIT_LSN_KEY, Long.valueOf(Long.parseLong(map.get(LAST_COMMIT_LSN_KEY).toString())));
        }
        return map;
    }
}
