package io.prestosql.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.log.Logging;
import io.airlift.testing.Closeables;
import io.airlift.tpch.TpchTable;
import io.airlift.units.Duration;
import io.prestosql.Session;
import io.prestosql.metadata.Metadata;
import io.prestosql.metadata.QualifiedObjectName;
import io.prestosql.plugin.kafka.util.CodecSupplier;
import io.prestosql.plugin.kafka.util.EmbeddedKafka;
import io.prestosql.plugin.kafka.util.TestUtils;
import io.prestosql.plugin.tpch.TpchPlugin;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.testing.TestingSession;
import io.prestosql.tests.DistributedQueryRunner;
import io.prestosql.tests.TestingPrestoClient;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/prestosql/plugin/kafka/KafkaQueryRunner.class */
/**
 * Static factory for a {@link DistributedQueryRunner} wired to an {@link EmbeddedKafka}
 * instance, with one Kafka topic created and loaded per requested TPCH table.
 *
 * <p>Utility class; not instantiable.
 */
public final class KafkaQueryRunner {
    private static final Logger log = Logger.get("TestQueries");
    private static final String TPCH_SCHEMA = "tpch";

    private KafkaQueryRunner() {
    }

    /**
     * Varargs convenience overload; copies the tables into an immutable list and delegates to
     * {@link #createKafkaQueryRunner(EmbeddedKafka, Iterable)}.
     *
     * @param embeddedKafka the embedded Kafka instance to start and load
     * @param tpchTableArr the TPCH tables to expose as Kafka topics
     * @return the configured query runner
     * @throws Exception if any setup step fails
     */
    public static DistributedQueryRunner createKafkaQueryRunner(EmbeddedKafka embeddedKafka, TpchTable<?>... tpchTableArr) throws Exception {
        return createKafkaQueryRunner(embeddedKafka, ImmutableList.copyOf(tpchTableArr));
    }

    /**
     * Builds a query runner, installs the TPCH plugin and catalog, starts the embedded Kafka,
     * creates one topic per table, installs the Kafka plugin with the matching topic
     * descriptions, and finally loads every table into its topic.
     *
     * <p>On any failure both the (possibly still {@code null}) query runner and the embedded
     * Kafka are handed to {@code Closeables.closeAllSuppress} before the original throwable is
     * rethrown.
     *
     * @param embeddedKafka the embedded Kafka instance to start and load
     * @param iterable the TPCH tables to expose as Kafka topics; iterated more than once
     * @return the configured query runner
     * @throws Exception if any setup step fails
     */
    public static DistributedQueryRunner createKafkaQueryRunner(EmbeddedKafka embeddedKafka, Iterable<TpchTable<?>> iterable) throws Exception {
        DistributedQueryRunner queryRunner = null;
        try {
            // Second constructor argument is presumably the node count -- confirm against DistributedQueryRunner.
            queryRunner = new DistributedQueryRunner(createSession(), 2);
            queryRunner.installPlugin(new TpchPlugin());
            queryRunner.createCatalog(TPCH_SCHEMA, TPCH_SCHEMA);

            embeddedKafka.start();
            for (TpchTable<?> table : iterable) {
                embeddedKafka.createTopics(kafkaTopicName(table));
            }

            Map<SchemaTableName, KafkaTopicDescription> topicDescriptions =
                    createTpchTopicDescriptions(queryRunner.getCoordinator().getMetadata(), iterable);
            TestUtils.installKafkaPlugin(embeddedKafka, queryRunner, topicDescriptions);

            TestingPrestoClient client = queryRunner.getClient();
            log.info("Loading data...");
            long startNanos = System.nanoTime();
            for (TpchTable<?> table : iterable) {
                loadTpchTopic(embeddedKafka, client, table);
            }
            log.info("Loading complete in %s", Duration.nanosSince(startNanos).toString(TimeUnit.SECONDS));
            return queryRunner;
        } catch (Throwable th) {
            // queryRunner may still be null here if its constructor threw.
            Closeables.closeAllSuppress(th, queryRunner, embeddedKafka);
            throw th;
        }
    }

    /**
     * Loads a single TPCH table from the {@code tpch.tiny} schema into its Kafka topic and logs
     * how long the import took.
     */
    private static void loadTpchTopic(EmbeddedKafka embeddedKafka, TestingPrestoClient testingPrestoClient, TpchTable<?> tpchTable) {
        long startNanos = System.nanoTime();
        String tableName = tpchTable.getTableName();
        log.info("Running import for %s", tableName);
        TestUtils.loadTpchTopic(
                embeddedKafka,
                testingPrestoClient,
                kafkaTopicName(tpchTable),
                new QualifiedObjectName(TPCH_SCHEMA, "tiny", tableName.toLowerCase(Locale.ENGLISH)));
        // The format has exactly two placeholders: table name, then elapsed time.
        log.info("Imported %s in %s", tableName, Duration.nanosSince(startNanos).convertToMostSuccinctTimeUnit());
    }

    /** Returns the Kafka topic name for a table: {@code "tpch."} plus the lower-cased table name. */
    private static String kafkaTopicName(TpchTable<?> tpchTable) {
        return TPCH_SCHEMA + "." + tpchTable.getTableName().toLowerCase(Locale.ENGLISH);
    }

    /**
     * Builds the topic description for each table, keyed by its {@link SchemaTableName} in the
     * {@code tpch} schema.
     *
     * @param metadata metadata handed to the {@link CodecSupplier} that produces the JSON codec
     * @param iterable the TPCH tables to describe
     * @return an immutable map from schema-qualified table name to topic description
     * @throws Exception if a topic description cannot be loaded
     */
    private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(Metadata metadata, Iterable<TpchTable<?>> iterable) throws Exception {
        JsonCodec<KafkaTopicDescription> codec = new CodecSupplier<>(KafkaTopicDescription.class, metadata).get();
        ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> descriptions = ImmutableMap.builder();
        for (TpchTable<?> table : iterable) {
            SchemaTableName schemaTableName = new SchemaTableName(TPCH_SCHEMA, table.getTableName());
            descriptions.put(TestUtils.loadTpchTopicDescription(codec, schemaTableName.toString(), schemaTableName));
        }
        return descriptions.build();
    }

    /** Creates a testing session whose catalog is {@code kafka} and whose schema is {@code tpch}. */
    public static Session createSession() {
        return TestingSession.testSessionBuilder()
                .setCatalog("kafka")
                .setSchema(TPCH_SCHEMA)
                .build();
    }

    /**
     * Starts a query runner backed by a fresh embedded Kafka loaded with every TPCH table and
     * logs the coordinator's base URL. The runner is deliberately not closed here.
     *
     * @param strArr ignored
     * @throws Exception if startup fails
     */
    public static void main(String[] strArr) throws Exception {
        Logging.initialize();
        DistributedQueryRunner queryRunner = createKafkaQueryRunner(EmbeddedKafka.createEmbeddedKafka(), TpchTable.getTables());
        Thread.sleep(10L);
        Logger logger = Logger.get(KafkaQueryRunner.class);
        logger.info("======== SERVER STARTED ========");
        logger.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
    }
}
