package io.debezium.connector.cassandra;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Iterator;

/* loaded from: input_file:io/debezium/connector/cassandra/QueueProcessor.class */
public class QueueProcessor extends AbstractProcessor {
    private static final String NAME = "Change Event Queue Processor";
    private final ChangeEventQueue<Event> queue;
    private final KafkaRecordEmitter kafkaRecordEmitter;
    private final String commitLogRelocationDir;
    public static final String ARCHIVE_FOLDER = "archive";
    public static final String ERROR_FOLDER = "error";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.debezium.connector.cassandra.QueueProcessor$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/cassandra/QueueProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$connector$cassandra$Event$EventType = new int[Event.EventType.values().length];

        static {
            try {
                $SwitchMap$io$debezium$connector$cassandra$Event$EventType[Event.EventType.CHANGE_EVENT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$connector$cassandra$Event$EventType[Event.EventType.TOMBSTONE_EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$connector$cassandra$Event$EventType[Event.EventType.EOF_EVENT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public QueueProcessor(CassandraConnectorContext cassandraConnectorContext) {
        this(cassandraConnectorContext, new KafkaRecordEmitter(cassandraConnectorContext.getCassandraConnectorConfig().kafkaTopicPrefix(), cassandraConnectorContext.getCassandraConnectorConfig().getHeartbeatTopicsPrefix(), cassandraConnectorContext.getCassandraConnectorConfig().getKafkaConfigs(), cassandraConnectorContext.getOffsetWriter(), cassandraConnectorContext.getCassandraConnectorConfig().offsetFlushIntervalMs(), cassandraConnectorContext.getCassandraConnectorConfig().maxOffsetFlushSize(), cassandraConnectorContext.getCassandraConnectorConfig().getKeyConverter(), cassandraConnectorContext.getCassandraConnectorConfig().getValueConverter()));
    }

    @VisibleForTesting
    QueueProcessor(CassandraConnectorContext cassandraConnectorContext, KafkaRecordEmitter kafkaRecordEmitter) {
        super(NAME, Duration.ZERO);
        this.queue = cassandraConnectorContext.getQueue();
        this.kafkaRecordEmitter = kafkaRecordEmitter;
        this.commitLogRelocationDir = cassandraConnectorContext.getCassandraConnectorConfig().commitLogRelocationDir();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() throws InterruptedException {
        Iterator it = this.queue.poll().iterator();
        while (it.hasNext()) {
            processEvent((Event) it.next());
        }
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() throws Exception {
        File file = new File(this.commitLogRelocationDir);
        if (!file.exists() && !file.mkdir()) {
            throw new IOException("Failed to create " + this.commitLogRelocationDir);
        }
        File file2 = new File(file, ARCHIVE_FOLDER);
        if (!file2.exists() && !file2.mkdir()) {
            throw new IOException("Failed to create " + file2);
        }
        File file3 = new File(file, ERROR_FOLDER);
        if (!file3.exists() && !file3.mkdir()) {
            throw new IOException("Failed to create " + file3);
        }
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() {
        this.kafkaRecordEmitter.close();
    }

    private void processEvent(Event event) {
        if (event == null) {
            return;
        }
        switch (AnonymousClass1.$SwitchMap$io$debezium$connector$cassandra$Event$EventType[event.getEventType().ordinal()]) {
            case CassandraConnectorConfig.DEFAULT_COMMIT_LOG_POST_PROCESSING_ENABLED /* 1 */:
                this.kafkaRecordEmitter.emit((ChangeRecord) event);
                return;
            case 2:
                this.kafkaRecordEmitter.emit((TombstoneRecord) event);
                return;
            case 3:
                EOFEvent eOFEvent = (EOFEvent) event;
                CommitLogUtil.moveCommitLog(eOFEvent.file, Paths.get(this.commitLogRelocationDir, eOFEvent.success ? ARCHIVE_FOLDER : ERROR_FOLDER));
                return;
            default:
                throw new CassandraConnectorTaskException("Encountered unexpected record with type: " + event.getEventType());
        }
    }
}
