package io.debezium.connector.cassandra;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Record;
import io.debezium.time.Conversions;
import java.util.Properties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/cassandra/QueueProcessorTest.class */
/**
 * Tests for {@link QueueProcessor}: each test enqueues events on the task
 * context's queue, runs a single {@code process()} pass, and then checks how
 * many times the mocked {@link KafkaRecordEmitter} was asked to emit and that
 * the queue is back to full remaining capacity.
 */
public class QueueProcessorTest extends EmbeddedCassandraConnectorTestBase {
    /** Number of records enqueued by the record-based tests. */
    private static final int RECORD_COUNT = 5;

    private CassandraConnectorContext context;
    private QueueProcessor queueProcessor;
    private KafkaRecordEmitter emitter;

    /**
     * Creates a fresh task context, a mocked emitter, and the processor under test.
     *
     * @throws Exception if the task context cannot be generated
     */
    @Before
    public void setUp() throws Exception {
        this.context = generateTaskContext();
        this.emitter = Mockito.mock(KafkaRecordEmitter.class);
        this.queueProcessor = new QueueProcessor(this.context, this.emitter);
    }

    /** Releases whatever the task context created in {@link #setUp()} holds. */
    @After
    public void tearDown() {
        this.context.cleanUp();
    }

    /**
     * Enqueues {@value #RECORD_COUNT} change records and checks that one
     * {@code process()} call emits each of them and empties the queue.
     */
    @Test
    public void testProcessChangeRecords() throws Exception {
        Mockito.doNothing().when(this.emitter).emit(ArgumentMatchers.<Record>any());
        ChangeEventQueue queue = this.context.getQueue();
        for (int i = 0; i < RECORD_COUNT; i++) {
            queue.enqueue(new ChangeRecord(newSourceInfo(i), new RowData(), Schema.INT32_SCHEMA, Schema.INT32_SCHEMA, Record.Operation.INSERT, false));
        }
        // Used capacity (total - remaining) is the number of events waiting in the queue.
        Assert.assertEquals(RECORD_COUNT, queue.totalCapacity() - queue.remainingCapacity());

        this.queueProcessor.process();

        Mockito.verify(this.emitter, Mockito.times(RECORD_COUNT)).emit(ArgumentMatchers.<Record>any());
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
    }

    /**
     * Enqueues {@value #RECORD_COUNT} tombstone records and checks that one
     * {@code process()} call emits each of them and empties the queue.
     */
    @Test
    public void testProcessTombstoneRecords() throws Exception {
        Mockito.doNothing().when(this.emitter).emit(ArgumentMatchers.<Record>any());
        ChangeEventQueue queue = this.context.getQueue();
        for (int i = 0; i < RECORD_COUNT; i++) {
            queue.enqueue(new TombstoneRecord(newSourceInfo(i), new RowData(), Schema.INT32_SCHEMA));
        }
        Assert.assertEquals(RECORD_COUNT, queue.totalCapacity() - queue.remainingCapacity());

        this.queueProcessor.process();

        Mockito.verify(this.emitter, Mockito.times(RECORD_COUNT)).emit(ArgumentMatchers.<Record>any());
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
    }

    /**
     * Enqueues a single EOF event and checks that one {@code process()} call
     * removes it from the queue without emitting anything.
     */
    @Test
    public void testProcessEofEvent() throws Exception {
        Mockito.doNothing().when(this.emitter).emit(ArgumentMatchers.<Record>any());
        // Keep a reference to the queue so the assertions below inspect the same
        // instance the event was enqueued on.
        ChangeEventQueue queue = this.context.getQueue();
        queue.enqueue(new EOFEvent(generateCommitLogFile(), true));
        Assert.assertEquals(1, queue.totalCapacity() - queue.remainingCapacity());

        this.queueProcessor.process();

        Mockito.verify(this.emitter, Mockito.times(0)).emit(ArgumentMatchers.<Record>any());
        Assert.assertEquals(queue.totalCapacity(), queue.remainingCapacity());
    }

    /**
     * Builds the {@link SourceInfo} shared by the record tests: a default
     * connector config, the current cluster name, a fixed commit log file name
     * with the given position, the test keyspace's {@code cdc_table}, a
     * {@code false} flag, and the current wall-clock time.
     *
     * @param commitLogPosition position passed to the {@link OffsetPosition}
     * @return a new source info for the given position
     */
    private static SourceInfo newSourceInfo(int commitLogPosition) {
        CassandraConnectorConfig config = new CassandraConnectorConfig(Configuration.from(new Properties()));
        OffsetPosition offset = new OffsetPosition("CommitLog-6-123.log", commitLogPosition);
        KeyspaceTable table = new KeyspaceTable(EmbeddedCassandraConnectorTestBase.TEST_KEYSPACE, "cdc_table");
        // currentTimeMillis() * 1000 converts milliseconds to the microseconds the conversion expects.
        return new SourceInfo(config, DatabaseDescriptor.getClusterName(), offset, table, false,
                Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
    }
}
