package io.debezium.connector.cassandra;

import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.Record;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogRealTimeParserTest.class */
/**
 * Test of {@link CommitLogIdxProcessor} running with the
 * {@code COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED} option switched on.
 *
 * <p>The test inserts a first batch of rows, submits the most recent commit log index file
 * found in the CDC directory to the processor, and then checks that both the first batch and a
 * second batch inserted afterwards show up as INSERT records in the first change event queue.
 */
public class CommitLogRealTimeParserTest extends AbstractCommitLogProcessorTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogRealTimeParserTest.class);

    /** Processor under test; created in {@link #setUp()}. */
    private CommitLogIdxProcessor commitLogProcessor;

    /**
     * Runs the parent set-up, creates the processor against the CDC log directory and submits
     * the newest index file to it.
     *
     * @throws Exception if the parent set-up fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        File cdcDirectory = new File(DatabaseDescriptor.getCDCLogLocation());
        this.commitLogProcessor = new CommitLogIdxProcessor(
                this.context,
                this.metrics,
                new Cassandra4CommitLogSegmentReader(this.context, this.metrics),
                cdcDirectory);
        readLogs();
    }

    /**
     * Builds a task context from the default test configuration with real-time commit log
     * processing enabled and the marked-complete poll interval set to 1000 ms.
     *
     * @return the connector context built from that configuration
     * @throws Exception if the context cannot be created
     */
    public CassandraConnectorContext generateTaskContext() throws Exception {
        Properties config = TestUtils.generateDefaultConfigMap();
        config.put(CassandraConnectorConfig.COMMIT_LOG_REAL_TIME_PROCESSING_ENABLED.name(), "true");
        config.put(CassandraConnectorConfig.COMMIT_LOG_MARKED_COMPLETE_POLL_INTERVAL_IN_MS.name(), "1000");
        return generateTaskContext(Configuration.from(config));
    }

    /**
     * Creates the CDC-enabled test table and inserts the first batch of three rows
     * (keys 10, 11 and 12).
     *
     * @throws Exception if the table cannot be created
     */
    @Override
    public void initialiseData() throws Exception {
        createTable("CREATE TABLE IF NOT EXISTS %s.%s (a int, b int, PRIMARY KEY(a)) WITH cdc = true;");
        insertRows(3, 10);
    }

    /**
     * Inserts {@code count} rows whose primary key {@code a} is {@code index + keyOffset} and
     * whose column {@code b} is {@code index}, for {@code index} in {@code [0, count)}.
     *
     * @param count     number of rows to insert
     * @param keyOffset value added to the row index to form the primary key
     */
    private void insertRows(int count, int keyOffset) {
        for (int index = 0; index < count; index++) {
            TestUtils.runCql(QueryBuilder.insertInto("test_keyspace", TestUtils.TEST_TABLE_NAME)
                    .value("a", QueryBuilder.literal(index + keyOffset))
                    .value("b", QueryBuilder.literal(index))
                    .build());
        }
        LOGGER.info("Inserted rows: {}", count);
    }

    /**
     * Verifies the initial batch, then inserts a second batch while the processor is already
     * running and verifies that it is picked up as well.
     */
    @Override
    public void verifyEvents() {
        verify(3, 10);
        insertRows(2, 20);
        verify(2, 20);
    }

    /**
     * Polls the first change event queue until the expected number of events has been collected
     * (for at most five seconds) and asserts that they are INSERT records with the expected keys.
     *
     * @param expectedCount number of events expected from the queue
     * @param keyOffset     offset that was used when the rows were inserted
     */
    private void verify(int expectedCount, int keyOffset) {
        ArrayList<Object> events = new ArrayList<>();
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            events.addAll(((ChangeEventQueue<?>) this.context.getQueues().get(0)).poll());
            // Stop as soon as enough events have arrived; an overshoot is then reported by the
            // assertion below with a clear message instead of a generic Awaitility timeout.
            return events.size() >= expectedCount;
        });
        LOGGER.info("Total events received: {}", events.size());
        Assert.assertEquals("Total number of events received must be " + expectedCount, expectedCount, events.size());

        for (int index = 0; index < expectedCount; index++) {
            Record record = (Record) events.get(index);
            Record.Operation operation = record.getOp();
            Assert.assertEquals("Operation type must be insert but it was " + operation, Record.Operation.INSERT, operation);

            int expectedKey = index + keyOffset;
            Object actualKey = ((CassandraSchemaFactory.CellData) record.getRowData().getPrimary().get(0)).value;
            // JUnit argument order is (message, expected, actual).
            Assert.assertEquals("Inserted key should be " + expectedKey, Integer.valueOf(expectedKey), actualKey);
        }
    }

    /**
     * Checks that the first queue is empty, waits up to three seconds for at least one commit log
     * index file to appear in the CDC directory, and submits the last file in
     * {@link CommitLogUtil#compareCommitLogsIndexes} order to the processor.
     */
    private void readLogs() {
        ChangeEventQueue<?> queue = (ChangeEventQueue<?>) this.context.getQueues().get(0);
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());

        String cdcLocation = DatabaseDescriptor.getCDCLogLocation();
        LOGGER.info("CDC Location: {}", cdcLocation);
        File cdcDirectory = new File(cdcLocation);
        Awaitility.await().timeout(Duration.ofSeconds(3L))
                .until(() -> CommitLogUtil.getIndexes(cdcDirectory).length >= 1);

        // Re-read after the wait so that the submitted file reflects the current directory content.
        File[] indexes = CommitLogUtil.getIndexes(cdcDirectory);
        Arrays.sort(indexes, CommitLogUtil::compareCommitLogsIndexes);
        Assert.assertTrue("At least one idx file must be generated", indexes.length >= 1);
        this.commitLogProcessor.submit(indexes[indexes.length - 1].toPath());
    }
}
