package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Record;
import io.debezium.util.Testing;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/cassandra/AbstractCommitLogProcessorTest.class */
/**
 * Base class for commit log processor tests.
 *
 * <p>Subclasses supply the data to write ({@link #initialiseData()}) and the checks to run
 * against the resulting events ({@link #verifyEvents()}). This class builds the task context
 * and the commit log read handler before each test and cleans up after it.
 */
public abstract class AbstractCommitLogProcessorTest extends EmbeddedCassandra4ConnectorTestBase {
    /** Task context created in {@link #setUp()}; its first queue is where events are read from. */
    public CassandraConnectorContext context;
    /** Metrics passed to the read handler; registered in {@link #setUp()}, unregistered in {@link #tearDown()}. */
    protected CommitLogProcessorMetrics metrics = new CommitLogProcessorMetrics();
    /** Handler given to the {@link CommitLogReader} when commit log segments are read. */
    private CommitLogReadHandler commitLogReadHandler;

    /**
     * Runs the subclass' data initialisation, creates the task context, waits up to 60 seconds
     * for the schema holder to return a key/value schema for the default test table, and then
     * creates the read handler and registers the metrics.
     *
     * @throws Exception if any step of the preparation fails
     */
    @Before
    public void setUp() throws Exception {
        initialiseData();
        this.context = generateTaskContext();
        // The read handler is only created once the schema for the test table is available.
        Awaitility.await()
                .atMost(Duration.ofSeconds(60L))
                .until(() -> this.context.getSchemaHolder()
                        .getKeyValueSchema(new KeyspaceTable("test_keyspace", TestUtils.TEST_TABLE_NAME)) != null);
        this.commitLogReadHandler = new Cassandra4CommitLogReadHandlerImpl(this.context, this.metrics);
        this.metrics.registerMetrics();
    }

    /**
     * Deletes test offsets, unregisters the metrics, drops the test keyspace tables, cleans up
     * the context and deletes the CDC log location.
     *
     * @throws Exception if any clean-up step fails
     */
    @After
    public void tearDown() throws Exception {
        TestUtils.deleteTestOffsets(this.context);
        this.metrics.unregisterMetrics();
        TestUtils.deleteTestKeyspaceTables();
        this.context.cleanUp();
        Testing.Files.delete(DatabaseDescriptor.getCDCLogLocation());
    }

    /**
     * Single test entry point; delegates to {@link #verifyEvents()}.
     *
     * @throws Exception if verification fails
     */
    @Test
    public void test() throws Exception {
        verifyEvents();
    }

    /**
     * Prepares the data for the test; called first in {@link #setUp()}.
     *
     * @throws Exception if the data cannot be prepared
     */
    public abstract void initialiseData() throws Exception;

    /**
     * Performs the test's assertions; called from {@link #test()}.
     *
     * @throws Exception if verification fails
     */
    public abstract void verifyEvents() throws Exception;

    /**
     * Runs the given CQL template against {@code test_keyspace} and {@link TestUtils#TEST_TABLE_NAME}.
     *
     * @param query CQL format string with two placeholders: keyspace, then table name
     * @throws Exception if the statement cannot be executed
     */
    public void createTable(String query) throws Exception {
        createTable(query, "test_keyspace", TestUtils.TEST_TABLE_NAME);
    }

    /**
     * Runs the given CQL template against {@code test_keyspace} and {@link TestUtils#TEST_TABLE_NAME_2}.
     *
     * @param query CQL format string with two placeholders: keyspace, then table name
     * @throws Exception if the statement cannot be executed
     */
    public void createTable2(String query) throws Exception {
        createTable(query, "test_keyspace", TestUtils.TEST_TABLE_NAME_2);
    }

    /**
     * Formats the CQL template with the keyspace and table name and runs it.
     *
     * @param query     CQL format string, passed to {@link String#format(String, Object...)}
     * @param keyspace  first format argument
     * @param tableName second format argument
     * @throws Exception if the statement cannot be executed
     */
    public void createTable(String query, String keyspace, String tableName) throws Exception {
        TestUtils.runCql(String.format(query, keyspace, tableName));
    }

    /**
     * Re-reads the commit logs and polls the first queue of the context until a single poll
     * returns exactly {@code expectedSize} events, waiting at most 60 seconds.
     *
     * @param expectedSize number of events one poll has to return
     * @return the events returned by the last poll
     * @throws Exception if reading the logs or polling the queue fails
     */
    public List<Event> getEvents(int expectedSize) throws Exception {
        ChangeEventQueue<Event> queue = this.context.getQueues().get(0);
        List<Event> events = new ArrayList<>();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            readLogs(queue);
            // Each attempt starts from scratch so that only the latest poll is counted.
            events.clear();
            events.addAll(queue.poll());
            return events.size() == expectedSize;
        });
        Assert.assertEquals(expectedSize, events.size());
        return events;
    }

    /**
     * Reads every commit log found in the commit log location through the read handler.
     * Before reading, asserts that the queue's remaining capacity equals its total capacity.
     *
     * @param queue queue whose capacity is checked before reading
     * @throws Exception if a commit log segment cannot be read
     */
    private void readLogs(ChangeEventQueue<Event> queue) throws Exception {
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
        File[] commitLogs = CommitLogUtil.getCommitLogs(new File(DatabaseDescriptor.getCommitLogLocation()));
        CommitLogReader reader = new CommitLogReader();
        for (File commitLog : commitLogs) {
            reader.readCommitLogSegment(this.commitLogReadHandler, commitLog, true);
        }
    }

    /**
     * Asserts that there are as many events as operations, that every event has the given
     * event type, and that the i-th event carries the i-th operation.
     *
     * @param events     events to check; each element is cast to {@link Record}
     * @param eventType  event type expected for every event
     * @param operations expected operations, in event order
     */
    public void assertEventTypes(List<Event> events, Event.EventType eventType, Record.Operation... operations) {
        Assert.assertEquals(operations.length, events.size());
        for (int i = 0; i < events.size(); i++) {
            // The list is typed as Event, so an explicit downcast is needed to reach getOp().
            Record record = (Record) events.get(i);
            Assert.assertEquals(eventType, record.getEventType());
            Assert.assertEquals(operations[i], record.getOp());
        }
    }
}
