package org.apache.flink.cdc.connectors.oracle.source.reader.fetch;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.EmbeddedInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.infinispan.RemoteInfinispanLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import java.sql.SQLException;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for the LogMiner event processors used by the Oracle CDC source.
 *
 * <p>Every processor created here extends one of Debezium's buffer-specific processors and puts a
 * check in front of each row: once a bounded {@link StreamSplit} reaches its ending offset, an END
 * watermark is dispatched, the change event source is asked to stop, and the row is skipped.
 */
@Internal
public class EventProcessorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(EventProcessorFactory.class);

    /**
     * {@link EmbeddedInfinispanLogMinerEventProcessor} that stops handing rows to its parent once
     * the ending offset of the redo log split has been reached.
     */
    public static class CDCEmbeddedInfinispanLogMinerEventProcessor
            extends EmbeddedInfinispanLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCEmbeddedInfinispanLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
                OracleConnectorConfig oracleConnectorConfig,
                OracleConnection oracleConnection,
                JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher,
                OraclePartition oraclePartition,
                OracleOffsetContext oracleOffsetContext,
                OracleDatabaseSchema oracleDatabaseSchema,
                OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics,
                ErrorHandler errorHandler,
                StreamSplit streamSplit) {
            super(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics);
            this.redoLogSplit = streamSplit;
            this.errorHandler = errorHandler;
            this.context = changeEventSourceContext;
            this.dispatcher = jdbcSourceEventDispatcher;
        }

        /**
         * Forwards the row to the parent processor unless the split's ending offset has been
         * reached, in which case the row is skipped.
         */
        @Override
        protected void processRow(OraclePartition oraclePartition, LogMinerEventRow logMinerEventRow)
                throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(
                    oraclePartition,
                    logMinerEventRow,
                    this.redoLogSplit,
                    this.errorHandler,
                    this.dispatcher,
                    this.context)) {
                return;
            }
            super.processRow(oraclePartition, logMinerEventRow);
        }
    }

    /**
     * {@link MemoryLogMinerEventProcessor} that stops handing rows to its parent once the ending
     * offset of the redo log split has been reached.
     */
    public static class CDCMemoryLogMinerEventProcessor extends MemoryLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCMemoryLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
                OracleConnectorConfig oracleConnectorConfig,
                OracleConnection oracleConnection,
                JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher,
                OraclePartition oraclePartition,
                OracleOffsetContext oracleOffsetContext,
                OracleDatabaseSchema oracleDatabaseSchema,
                OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics,
                ErrorHandler errorHandler,
                StreamSplit streamSplit) {
            super(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics);
            this.redoLogSplit = streamSplit;
            this.errorHandler = errorHandler;
            this.context = changeEventSourceContext;
            this.dispatcher = jdbcSourceEventDispatcher;
        }

        /**
         * Forwards the row to the parent processor unless the split's ending offset has been
         * reached, in which case the row is skipped.
         */
        @Override
        protected void processRow(OraclePartition oraclePartition, LogMinerEventRow logMinerEventRow)
                throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(
                    oraclePartition,
                    logMinerEventRow,
                    this.redoLogSplit,
                    this.errorHandler,
                    this.dispatcher,
                    this.context)) {
                return;
            }
            super.processRow(oraclePartition, logMinerEventRow);
        }
    }

    /**
     * {@link RemoteInfinispanLogMinerEventProcessor} that stops handing rows to its parent once
     * the ending offset of the redo log split has been reached.
     */
    public static class CDCRemoteInfinispanLogMinerEventProcessor
            extends RemoteInfinispanLogMinerEventProcessor {
        private final StreamSplit redoLogSplit;
        private final ErrorHandler errorHandler;
        private final ChangeEventSource.ChangeEventSourceContext context;
        private final JdbcSourceEventDispatcher<OraclePartition> dispatcher;

        public CDCRemoteInfinispanLogMinerEventProcessor(
                ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
                OracleConnectorConfig oracleConnectorConfig,
                OracleConnection oracleConnection,
                JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher,
                OraclePartition oraclePartition,
                OracleOffsetContext oracleOffsetContext,
                OracleDatabaseSchema oracleDatabaseSchema,
                OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics,
                ErrorHandler errorHandler,
                StreamSplit streamSplit) {
            super(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics);
            this.redoLogSplit = streamSplit;
            this.errorHandler = errorHandler;
            this.context = changeEventSourceContext;
            this.dispatcher = jdbcSourceEventDispatcher;
        }

        /**
         * Forwards the row to the parent processor unless the split's ending offset has been
         * reached, in which case the row is skipped.
         */
        @Override
        protected void processRow(OraclePartition oraclePartition, LogMinerEventRow logMinerEventRow)
                throws SQLException, InterruptedException {
            if (EventProcessorFactory.reachEndingOffset(
                    oraclePartition,
                    logMinerEventRow,
                    this.redoLogSplit,
                    this.errorHandler,
                    this.dispatcher,
                    this.context)) {
                return;
            }
            super.processRow(oraclePartition, logMinerEventRow);
        }
    }

    /** Static factory only; not meant to be instantiated. */
    private EventProcessorFactory() {
    }

    /**
     * Creates the processor matching the log mining buffer type configured in
     * {@code oracleConnectorConfig}.
     *
     * @return a memory, embedded-Infinispan or remote-Infinispan backed processor, each extended
     *     with the ending-offset check
     * @throws IllegalArgumentException if the configured buffer type is none of the three
     *     supported ones
     */
    public static LogMinerEventProcessor createProcessor(
            ChangeEventSource.ChangeEventSourceContext changeEventSourceContext,
            OracleConnectorConfig oracleConnectorConfig,
            OracleConnection oracleConnection,
            JdbcSourceEventDispatcher<OraclePartition> jdbcSourceEventDispatcher,
            OraclePartition oraclePartition,
            OracleOffsetContext oracleOffsetContext,
            OracleDatabaseSchema oracleDatabaseSchema,
            OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics,
            ErrorHandler errorHandler,
            StreamSplit streamSplit) {
        OracleConnectorConfig.LogMiningBufferType logMiningBufferType =
                oracleConnectorConfig.getLogMiningBufferType();
        if (logMiningBufferType.equals(OracleConnectorConfig.LogMiningBufferType.MEMORY)) {
            return new CDCMemoryLogMinerEventProcessor(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics,
                    errorHandler,
                    streamSplit);
        }
        if (logMiningBufferType.equals(
                OracleConnectorConfig.LogMiningBufferType.INFINISPAN_EMBEDDED)) {
            return new CDCEmbeddedInfinispanLogMinerEventProcessor(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics,
                    errorHandler,
                    streamSplit);
        }
        if (logMiningBufferType.equals(
                OracleConnectorConfig.LogMiningBufferType.INFINISPAN_REMOTE)) {
            return new CDCRemoteInfinispanLogMinerEventProcessor(
                    changeEventSourceContext,
                    oracleConnectorConfig,
                    oracleConnection,
                    jdbcSourceEventDispatcher,
                    oraclePartition,
                    oracleOffsetContext,
                    oracleDatabaseSchema,
                    oracleStreamingChangeEventSourceMetrics,
                    errorHandler,
                    streamSplit);
        }
        throw new IllegalArgumentException(
                "not support this type of bufferType: " + logMiningBufferType);
    }

    /**
     * Checks whether {@code logMinerEventRow} is at or after the ending offset of a bounded split
     * and, if so, finishes the read.
     *
     * <p>When the ending offset is reached, a {@link WatermarkKind#END} watermark carrying the
     * row's SCN is dispatched and the change event source is told to stop. If dispatching is
     * interrupted, the failure is logged and handed to {@code errorHandler}, the thread's
     * interrupt status is restored, and the source is still stopped.
     *
     * @param changeEventSourceContext cast unconditionally to
     *     {@link StoppableChangeEventSourceContext} once the ending offset is reached
     * @return {@code true} if the ending offset was reached and the row must not be processed;
     *     {@code false} for an unbounded split or a row before the ending offset
     */
    public static boolean reachEndingOffset(
            OraclePartition oraclePartition,
            LogMinerEventRow logMinerEventRow,
            StreamSplit streamSplit,
            ErrorHandler errorHandler,
            JdbcSourceEventDispatcher jdbcSourceEventDispatcher,
            ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) {
        if (!isBoundedRead(streamSplit)) {
            return false;
        }
        RedoLogOffset redoLogOffset =
                new RedoLogOffset(Long.valueOf(logMinerEventRow.getScn().longValue()));
        if (!redoLogOffset.isAtOrAfter(streamSplit.getEndingOffset())) {
            return false;
        }
        try {
            jdbcSourceEventDispatcher.dispatchWatermarkEvent(
                    oraclePartition.getSourcePartition(),
                    streamSplit,
                    redoLogOffset,
                    WatermarkKind.END);
        } catch (InterruptedException e) {
            LOG.error("Send signal event error.", e);
            errorHandler.setProducerThrowable(
                    new DebeziumException("Error processing redo log signal event", e));
            // Catching InterruptedException clears the interrupt flag; set it again (after the
            // failure has been reported) so callers up the stack can still see the interruption.
            Thread.currentThread().interrupt();
        }
        ((StoppableChangeEventSourceContext) changeEventSourceContext).stopChangeEventSource();
        return true;
    }

    /**
     * Returns whether the split has a real ending offset, i.e. one that differs from
     * {@link RedoLogOffset#NO_STOPPING_OFFSET}.
     */
    private static boolean isBoundedRead(StreamSplit streamSplit) {
        return !RedoLogOffset.NO_STOPPING_OFFSET.equals(streamSplit.getEndingOffset());
    }
}
