package io.debezium.connector.cassandra;

import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraConnectorConfig;
import io.debezium.connector.cassandra.CellData;
import io.debezium.connector.cassandra.SchemaHolder;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.connector.cassandra.transforms.CassandraTypeDeserializer;
import io.debezium.time.Conversions;
import io.debezium.util.Collect;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.kafka.connect.data.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/SnapshotProcessor.class */
public class SnapshotProcessor extends AbstractProcessor {
    private static final String NAME = "Snapshot Processor";
    private static final String CASSANDRA_NOW_UNIXTIMESTAMP = "UNIXTIMESTAMPOF(NOW())";
    private static final String EXECUTION_TIME_ALIAS = "execution_time";
    private final CassandraClient cassandraClient;
    private final ChangeEventQueue<Event> queue;
    private final OffsetWriter offsetWriter;
    private final SchemaHolder schemaHolder;
    private final RecordMaker recordMaker;
    private final CassandraConnectorConfig.SnapshotMode snapshotMode;
    private final ConsistencyLevel consistencyLevel;
    private final Set<String> startedTableNames;
    private final SnapshotProcessorMetrics metrics;
    private boolean initial;
    private static final Logger LOGGER = LoggerFactory.getLogger(SnapshotProcessor.class);
    private static final Set<DataType.Name> collectionTypes = Collect.unmodifiableSet(new DataType.Name[]{DataType.Name.LIST, DataType.Name.SET, DataType.Name.MAP});

    public SnapshotProcessor(CassandraConnectorContext cassandraConnectorContext) {
        super(NAME, cassandraConnectorContext.getCassandraConnectorConfig().snapshotPollIntervalMs().toMillis());
        this.startedTableNames = new HashSet();
        this.metrics = new SnapshotProcessorMetrics();
        this.initial = true;
        this.cassandraClient = cassandraConnectorContext.getCassandraClient();
        this.queue = cassandraConnectorContext.getQueue();
        this.offsetWriter = cassandraConnectorContext.getOffsetWriter();
        this.schemaHolder = cassandraConnectorContext.getSchemaHolder();
        this.recordMaker = new RecordMaker(cassandraConnectorContext.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(cassandraConnectorContext.getCassandraConnectorConfig().fieldExcludeList()), cassandraConnectorContext.getCassandraConnectorConfig());
        this.snapshotMode = cassandraConnectorContext.getCassandraConnectorConfig().snapshotMode();
        this.consistencyLevel = cassandraConnectorContext.getCassandraConnectorConfig().snapshotConsistencyLevel();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() {
        this.metrics.registerMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() {
        if (this.snapshotMode == CassandraConnectorConfig.SnapshotMode.ALWAYS) {
            snapshot();
        } else if (this.snapshotMode != CassandraConnectorConfig.SnapshotMode.INITIAL || !this.initial) {
            LOGGER.debug("Skipping snapshot [mode: {}]", this.snapshotMode);
        } else {
            snapshot();
            this.initial = false;
        }
    }

    synchronized void snapshot() {
        try {
            Set<TableMetadata> tablesToSnapshot = getTablesToSnapshot();
            if (tablesToSnapshot.isEmpty()) {
                LOGGER.info("No tables to snapshot");
            } else {
                String[] strArr = (String[]) tablesToSnapshot.stream().map(SnapshotProcessor::tableName).toArray(i -> {
                    return new String[i];
                });
                LOGGER.info("Found {} tables to snapshot: {}", Integer.valueOf(tablesToSnapshot.size()), strArr);
                long currentTimeMillis = System.currentTimeMillis();
                this.metrics.setTableCount(tablesToSnapshot.size());
                this.metrics.startSnapshot();
                for (TableMetadata tableMetadata : tablesToSnapshot) {
                    if (isRunning()) {
                        String tableName = tableName(tableMetadata);
                        LOGGER.info("Snapshotting table {}", tableName);
                        this.startedTableNames.add(tableName);
                        takeTableSnapshot(tableMetadata);
                        this.metrics.completeTable();
                    }
                }
                this.metrics.stopSnapshot();
                LOGGER.info("Snapshot completely queued in {} seconds for tables: {}", Long.valueOf(Duration.ofMillis(System.currentTimeMillis() - currentTimeMillis).getSeconds()), strArr);
            }
        } catch (IOException e) {
            throw new CassandraConnectorTaskException(e);
        }
    }

    private Set<TableMetadata> getTablesToSnapshot() {
        return (Set) this.schemaHolder.getCdcEnabledTableMetadataSet().stream().filter(tableMetadata -> {
            return !this.offsetWriter.isOffsetProcessed(tableName(tableMetadata), OffsetPosition.defaultOffsetPosition().serialize(), true);
        }).filter(tableMetadata2 -> {
            return !this.startedTableNames.contains(tableName(tableMetadata2));
        }).collect(Collectors.toSet());
    }

    private void takeTableSnapshot(TableMetadata tableMetadata) throws IOException {
        try {
            Statement generateSnapshotStatement = generateSnapshotStatement(tableMetadata);
            generateSnapshotStatement.setConsistencyLevel(this.consistencyLevel);
            LOGGER.info("Executing snapshot query '{}' with consistency level {}", generateSnapshotStatement.getQueryString(), generateSnapshotStatement.getConsistencyLevel());
            processResultSet(tableMetadata, this.cassandraClient.execute(generateSnapshotStatement));
            LOGGER.debug("The snapshot of table '{}' has been taken", tableName(tableMetadata));
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            throw new DebeziumException(String.format("Failed to snapshot table %s in keyspace %s", tableMetadata.getName(), tableMetadata.getKeyspace().getName()), e2);
        }
    }

    private static BuiltStatement generateSnapshotStatement(TableMetadata tableMetadata) {
        List<String> list = (List) tableMetadata.getColumns().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList());
        Set set = (Set) tableMetadata.getPrimaryKey().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
        List list2 = (List) tableMetadata.getColumns().stream().filter(columnMetadata -> {
            return collectionTypes.contains(columnMetadata.getType().getName());
        }).map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList());
        Select.Selection as = QueryBuilder.select().raw(CASSANDRA_NOW_UNIXTIMESTAMP).as(EXECUTION_TIME_ALIAS);
        for (String str : list) {
            as.column(withQuotes(str));
            if (!set.contains(str) && !list2.contains(str)) {
                as.ttl(withQuotes(str)).as(ttlAlias(str));
            }
        }
        return as.from(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
    }

    private void processResultSet(TableMetadata tableMetadata, ResultSet resultSet) throws IOException {
        String tableName = tableName(tableMetadata);
        KeyspaceTable keyspaceTable = new KeyspaceTable(tableMetadata);
        SchemaHolder.KeyValueSchema orUpdateKeyValueSchema = this.schemaHolder.getOrUpdateKeyValueSchema(keyspaceTable);
        Schema keySchema = orUpdateKeyValueSchema.keySchema();
        Schema valueSchema = orUpdateKeyValueSchema.valueSchema();
        Set set = (Set) tableMetadata.getPartitionKey().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
        Set set2 = (Set) tableMetadata.getClusteringColumns().stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toSet());
        Iterator it = resultSet.iterator();
        long j = 0;
        if (!it.hasNext()) {
            this.offsetWriter.markOffset(tableName, OffsetPosition.defaultOffsetPosition().serialize(), true);
            this.offsetWriter.flush();
        }
        while (it.hasNext()) {
            if (!isRunning()) {
                LOGGER.warn("Terminated snapshot processing while table {} is in progress", tableName);
                this.metrics.setRowsScanned(tableName, Long.valueOf(j));
                return;
            }
            Row row = (Row) it.next();
            Object readExecutionTime = readExecutionTime(row);
            RowData extractRowData = extractRowData(row, tableMetadata.getColumns(), set, set2, readExecutionTime);
            boolean z = !it.hasNext();
            RecordMaker recordMaker = this.recordMaker;
            String clusterName = DatabaseDescriptor.getClusterName();
            OffsetPosition defaultOffsetPosition = OffsetPosition.defaultOffsetPosition();
            Instant instantFromMicros = Conversions.toInstantFromMicros(TimeUnit.MICROSECONDS.convert(((Long) readExecutionTime).longValue(), TimeUnit.MILLISECONDS));
            ChangeEventQueue<Event> changeEventQueue = this.queue;
            changeEventQueue.getClass();
            recordMaker.insert(clusterName, defaultOffsetPosition, keyspaceTable, true, instantFromMicros, extractRowData, keySchema, valueSchema, z, (v1) -> {
                r10.enqueue(v1);
            });
            j++;
            if (j % 10000 == 0) {
                LOGGER.info("Queued {} snapshot records from table {}", Long.valueOf(j), tableName);
                this.metrics.setRowsScanned(tableName, Long.valueOf(j));
            }
        }
        this.metrics.setRowsScanned(tableName, Long.valueOf(j));
    }

    private static RowData extractRowData(Row row, List<ColumnMetadata> list, Set<String> set, Set<String> set2, Object obj) {
        Object readColTtl;
        RowData rowData = new RowData();
        for (ColumnMetadata columnMetadata : list) {
            String name = columnMetadata.getName();
            Object readCol = readCol(row, name, columnMetadata);
            Long l = null;
            CellData.ColumnType type = getType(name, set, set2);
            if (type == CellData.ColumnType.REGULAR && readCol != null && !collectionTypes.contains(columnMetadata.getType().getName()) && (readColTtl = readColTtl(row, name)) != null && obj != null) {
                l = Long.valueOf(calculateDeletionTs(obj, readColTtl));
            }
            rowData.addCell(new CellData(name, readCol, l, type));
        }
        return rowData;
    }

    private static CellData.ColumnType getType(String str, Set<String> set, Set<String> set2) {
        return set.contains(str) ? CellData.ColumnType.PARTITION : set2.contains(str) ? CellData.ColumnType.CLUSTERING : CellData.ColumnType.REGULAR;
    }

    private static Object readExecutionTime(Row row) {
        return CassandraTypeDeserializer.deserialize(DataType.bigint(), row.getBytesUnsafe(EXECUTION_TIME_ALIAS));
    }

    private static Object readCol(Row row, String str, ColumnMetadata columnMetadata) {
        return CassandraTypeDeserializer.deserialize(columnMetadata.getType(), row.getBytesUnsafe(str));
    }

    private static Object readColTtl(Row row, String str) {
        return CassandraTypeDeserializer.deserialize(DataType.cint(), row.getBytesUnsafe(ttlAlias(str)));
    }

    private static long calculateDeletionTs(Object obj, Object obj2) {
        return TimeUnit.MICROSECONDS.convert(((Long) obj).longValue(), TimeUnit.MILLISECONDS) + TimeUnit.MICROSECONDS.convert(((Integer) obj2).intValue(), TimeUnit.SECONDS);
    }

    private static String ttlAlias(String str) {
        return str + "_ttl";
    }

    private static String withQuotes(String str) {
        return "\"" + str + "\"";
    }

    private static String tableName(TableMetadata tableMetadata) {
        return tableMetadata.getKeyspace().getName() + "." + tableMetadata.getName();
    }
}
