package io.debezium.connector.cassandra;

import com.datastax.driver.core.KeyspaceMetadata;
import com.datastax.driver.core.SchemaChangeListener;
import com.datastax.driver.core.TableMetadata;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.cassandra.SchemaHolder;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.schema.KeyspaceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/SchemaProcessor.class */
public class SchemaProcessor extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaProcessor.class);
    private static final String NAME = "Schema Processor";
    private final SchemaHolder schemaHolder;
    private final CassandraClient cassandraClient;
    private final String kafkaTopicPrefix;
    private final SourceInfoStructMaker sourceInfoStructMaker;
    private final SchemaChangeListener schemaChangeListener;

    public SchemaProcessor(CassandraConnectorContext cassandraConnectorContext) {
        super(NAME, cassandraConnectorContext.getCassandraConnectorConfig().schemaPollInterval());
        this.schemaHolder = cassandraConnectorContext.getSchemaHolder();
        this.cassandraClient = cassandraConnectorContext.getCassandraClient();
        this.kafkaTopicPrefix = this.schemaHolder.kafkaTopicPrefix;
        this.sourceInfoStructMaker = this.schemaHolder.sourceInfoStructMaker;
        this.schemaChangeListener = new NoOpSchemaChangeListener() { // from class: io.debezium.connector.cassandra.SchemaProcessor.1
            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onKeyspaceAdded(KeyspaceMetadata keyspaceMetadata) {
                try {
                    Schema.instance.setKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(keyspaceMetadata.getName(), KeyspaceParams.create(keyspaceMetadata.isDurableWrites(), keyspaceMetadata.getReplication())));
                    Keyspace.openWithoutSSTables(keyspaceMetadata.getName());
                    SchemaProcessor.LOGGER.info("Added keyspace {}", keyspaceMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error("Error happened while adding the keyspace {}", keyspaceMetadata.getName(), th);
                }
            }

            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onKeyspaceChanged(KeyspaceMetadata keyspaceMetadata, KeyspaceMetadata keyspaceMetadata2) {
                try {
                    Schema.instance.updateKeyspace(keyspaceMetadata.getName(), KeyspaceParams.create(keyspaceMetadata.isDurableWrites(), keyspaceMetadata.getReplication()));
                    SchemaProcessor.LOGGER.info("Updated keyspace {}", keyspaceMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error("Error happened while updating the keyspace {}", keyspaceMetadata.getName(), th);
                }
            }

            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onKeyspaceRemoved(KeyspaceMetadata keyspaceMetadata) {
                try {
                    SchemaProcessor.this.schemaHolder.removeSchemasOfAllTablesInKeyspace(keyspaceMetadata.getName());
                    Schema.instance.clearKeyspaceMetadata(org.apache.cassandra.schema.KeyspaceMetadata.create(keyspaceMetadata.getName(), KeyspaceParams.create(keyspaceMetadata.isDurableWrites(), keyspaceMetadata.getReplication())));
                    SchemaProcessor.LOGGER.info("Removed keyspace {}", keyspaceMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error("Error happened while removing the keyspace {}", keyspaceMetadata.getName(), th);
                }
            }

            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onTableAdded(TableMetadata tableMetadata) {
                try {
                    SchemaProcessor.LOGGER.debug("Table {}.{} detected to be added!", tableMetadata.getKeyspace().getName(), tableMetadata.getName());
                    if (tableMetadata.getOptions().isCDC()) {
                        SchemaProcessor.this.schemaHolder.addOrUpdateTableSchema(new KeyspaceTable(tableMetadata), new SchemaHolder.KeyValueSchema(SchemaProcessor.this.kafkaTopicPrefix, tableMetadata, SchemaProcessor.this.sourceInfoStructMaker));
                    }
                    CFMetaData copy = CFMetaData.compile(tableMetadata.asCQLQuery(), tableMetadata.getKeyspace().getName()).copy(tableMetadata.getId());
                    Keyspace.open(copy.ksName).initCf(copy, false);
                    org.apache.cassandra.schema.KeyspaceMetadata kSMetaData = Schema.instance.getKSMetaData(copy.ksName);
                    if (kSMetaData == null) {
                        SchemaProcessor.LOGGER.warn("Keyspace {} doesn't exist", copy.ksName);
                        return;
                    }
                    if (kSMetaData.tables.get(tableMetadata.getName()).isPresent()) {
                        SchemaProcessor.LOGGER.debug("Table {}.{} is already added!", tableMetadata.getKeyspace(), tableMetadata.getName());
                        return;
                    }
                    Function function = keyspaceMetadata -> {
                        return keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(copy));
                    };
                    Schema.instance.setKeyspaceMetadata((org.apache.cassandra.schema.KeyspaceMetadata) function.apply(kSMetaData));
                    Schema.instance.load(copy);
                    SchemaProcessor.LOGGER.info("Added schema for table {}", tableMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error(String.format("Error happend while adding table %s.%s", tableMetadata.getKeyspace(), tableMetadata.getName()), th);
                }
            }

            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onTableRemoved(TableMetadata tableMetadata) {
                try {
                    SchemaProcessor.LOGGER.info(String.format("Table %s.%s detected to be removed!", tableMetadata.getKeyspace().getName(), tableMetadata.getName()));
                    if (tableMetadata.getOptions().isCDC()) {
                        SchemaProcessor.this.schemaHolder.removeTableSchema(new KeyspaceTable(tableMetadata));
                    }
                    String name = tableMetadata.getKeyspace().getName();
                    String name2 = tableMetadata.getName();
                    org.apache.cassandra.schema.KeyspaceMetadata kSMetaData = Schema.instance.getKSMetaData(tableMetadata.getKeyspace().getName());
                    if (kSMetaData == null) {
                        SchemaProcessor.LOGGER.warn("KeyspaceMetadata for keyspace {} is not found!", tableMetadata.getKeyspace().getName());
                        return;
                    }
                    ColumnFamilyStore columnFamilyStore = Keyspace.openWithoutSSTables(name).getColumnFamilyStore(name2);
                    if (columnFamilyStore == null) {
                        SchemaProcessor.LOGGER.warn("ColumnFamilyStore for {}.{} is not found!", tableMetadata.getKeyspace(), tableMetadata.getName());
                        return;
                    }
                    columnFamilyStore.indexManager.markAllIndexesRemoved();
                    CFMetaData cFMetaData = (CFMetaData) kSMetaData.tables.get(name2).get();
                    org.apache.cassandra.schema.KeyspaceMetadata withSwapped = kSMetaData.withSwapped(kSMetaData.tables.without(name2));
                    Schema.instance.unload(cFMetaData);
                    Schema.instance.setKeyspaceMetadata(withSwapped);
                    SchemaProcessor.LOGGER.info("Removed schema for table {}", tableMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error(String.format("Error happened while removing table %s.%s", tableMetadata.getKeyspace(), tableMetadata.getName()), th);
                }
            }

            @Override // io.debezium.connector.cassandra.NoOpSchemaChangeListener
            public void onTableChanged(TableMetadata tableMetadata, TableMetadata tableMetadata2) {
                try {
                    SchemaProcessor.LOGGER.debug("Detected alternation in schema of {}.{} (previous cdc = {}, current cdc = {})", new Object[]{tableMetadata.getKeyspace().getName(), tableMetadata.getName(), Boolean.valueOf(tableMetadata2.getOptions().isCDC()), Boolean.valueOf(tableMetadata.getOptions().isCDC())});
                    if (tableMetadata.getOptions().isCDC()) {
                        SchemaProcessor.this.schemaHolder.addOrUpdateTableSchema(new KeyspaceTable(tableMetadata), new SchemaHolder.KeyValueSchema(SchemaProcessor.this.kafkaTopicPrefix, tableMetadata, SchemaProcessor.this.sourceInfoStructMaker));
                    } else if (tableMetadata2.getOptions().isCDC()) {
                        SchemaProcessor.this.schemaHolder.removeTableSchema(new KeyspaceTable(tableMetadata));
                    }
                    CFMetaData compile = CFMetaData.compile(tableMetadata.asCQLQuery(), tableMetadata.getKeyspace().getName());
                    CFMetaData cFMetaData = Schema.instance.getCFMetaData(tableMetadata2.getKeyspace().getName(), tableMetadata2.getName());
                    cFMetaData.apply(compile.copy(cFMetaData.cfId));
                    SchemaProcessor.LOGGER.info("Updated schema for table {}", tableMetadata.asCQLQuery());
                } catch (Throwable th) {
                    SchemaProcessor.LOGGER.error(String.format("Error happened while reacting on changed table %s.%s", tableMetadata.getKeyspace(), tableMetadata.getName()), th);
                }
            }
        };
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() {
        Map map = (Map) this.schemaHolder.getCdcEnabledTableMetadataSet().stream().collect(Collectors.toMap(KeyspaceTable::new, tableMetadata -> {
            return new SchemaHolder.KeyValueSchema(this.kafkaTopicPrefix, tableMetadata, this.sourceInfoStructMaker);
        }));
        SchemaHolder schemaHolder = this.schemaHolder;
        Objects.requireNonNull(schemaHolder);
        map.forEach(schemaHolder::addOrUpdateTableSchema);
        LOGGER.info("Registering schema change listener ...");
        this.cassandraClient.getCluster().register(this.schemaChangeListener);
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() {
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() {
        LOGGER.info("Unregistering schema change listener ...");
        this.cassandraClient.getCluster().unregister(this.schemaChangeListener);
        LOGGER.info("Clearing cdc keyspace / table map ... ");
        this.schemaHolder.tableToKVSchemaMap.clear();
    }
}
