package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Event;
import java.io.File;
import java.util.List;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SimpleBuilders;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogProcessorTest.class */
public class CommitLogProcessorTest extends EmbeddedCassandraConnectorTestBase {
    private CassandraConnectorContext context;
    private CommitLogProcessor commitLogProcessor;

    @Before
    public void setUp() throws Exception {
        this.context = generateTaskContext();
        this.commitLogProcessor = new CommitLogProcessor(this.context);
        this.commitLogProcessor.initialize();
    }

    @After
    public void tearDown() throws Exception {
        deleteTestOffsets(this.context);
        this.commitLogProcessor.destroy();
        this.context.cleanUp();
    }

    @Test
    public void testProcessCommitLogs() throws Exception {
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + keyspaceTable("cdc_table") + " (a int, b int, PRIMARY KEY(a)) WITH cdc = true;");
        this.context.getSchemaHolder().refreshSchemas();
        CFMetaData cFMetaData = Schema.instance.getCFMetaData(EmbeddedCassandraConnectorTestBase.TEST_KEYSPACE, "cdc_table");
        for (int i = 0; i < 10; i++) {
            SimpleBuilders.PartitionUpdateBuilder partitionUpdateBuilder = new SimpleBuilders.PartitionUpdateBuilder(cFMetaData, new Object[]{Integer.valueOf(i)});
            CommitLog.instance.add(new Mutation(PartitionUpdate.singleRowUpdate(cFMetaData, partitionUpdateBuilder.build().partitionKey(), partitionUpdateBuilder.row(new Object[0]).add("b", Integer.valueOf(i)).build())));
        }
        CommitLog.instance.sync(true);
        ChangeEventQueue queue = this.context.getQueue();
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
        for (File file : CommitLogUtil.getCommitLogs(new File(DatabaseDescriptor.getCommitLogLocation()))) {
            this.commitLogProcessor.processCommitLog(file);
        }
        List poll = queue.poll();
        Assert.assertEquals(10 + r0.length, poll.size());
        for (int i2 = 0; i2 < poll.size(); i2++) {
            EOFEvent eOFEvent = (Event) poll.get(i2);
            if (eOFEvent instanceof Record) {
                Record record = (Record) poll.get(i2);
                Assert.assertEquals(record.getEventType(), Event.EventType.CHANGE_EVENT);
                Assert.assertEquals(record.getSource().cluster, DatabaseDescriptor.getClusterName());
                Assert.assertFalse(record.getSource().snapshot);
                Assert.assertEquals(record.getSource().keyspaceTable.name(), keyspaceTable("cdc_table"));
                Assert.assertTrue(record.getSource().offsetPosition.fileName.contains(String.valueOf(CommitLog.instance.getCurrentPosition().segmentId)));
            } else {
                if (!(eOFEvent instanceof EOFEvent)) {
                    throw new Exception("unexpected event type");
                }
                Assert.assertTrue(eOFEvent.success);
            }
        }
        deleteTestKeyspaceTables();
    }
}
