package io.simplesource.kafka.testutils;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.api.CommandId;
import io.simplesource.data.FutureResult;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.KafkaCommandAPI;
import io.simplesource.kafka.internal.client.RequestAPIContext;
import io.simplesource.kafka.internal.streams.topology.EventSourcedTopology;
import io.simplesource.kafka.internal.streams.topology.TopologyContext;
import io.simplesource.kafka.internal.util.NamedThreadFactory;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.spec.AggregateSpec;
import io.simplesource.kafka.spec.CommandSpec;
import io.simplesource.kafka.testutils.TestTopologyReceiver;
import io.simplesource.kafka.util.SpecUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;

/* loaded from: input_file:io/simplesource/kafka/testutils/AggregateTestDriver.class */
/**
 * Test harness that runs the event-sourced topology of a single aggregate inside a
 * {@link TopologyTestDriver} and exposes a {@link KafkaCommandAPI} wired to that driver.
 *
 * <p>Commands are published into the driver through {@code TestPublisher} instances, and
 * responses are read back by {@link TestTopologyReceiver} instances that are polled
 * explicitly (see {@link #pollForAPIResponse()}) rather than by a background consumer.
 *
 * <p>Instances own a {@link TopologyTestDriver} and a scheduler; call {@link #close()}
 * (or use try-with-resources) when done so both are released.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class AggregateTestDriver<K, C, E, A> implements AutoCloseable {
    private final TopologyTestDriver driver;
    private final AggregateSpec<K, C, E, A> aggregateSpec;
    private final AggregateSerdes<K, C, E, A> aggregateSerdes;
    private final KafkaCommandAPI<K, C> commandAPI;
    // One entry per receiver created by the command API; each entry polls that receiver once.
    private final ArrayList<Runnable> statePollers = new ArrayList<>();
    // Kept as a field (not a constructor local) so close() can shut it down.
    private final ScheduledExecutorService scheduler;

    /**
     * Builds the aggregate topology, starts a test driver for it and creates a command API
     * that publishes to and receives from that driver.
     *
     * @param aggregateSpec specification of the aggregate under test; must not be null
     * @param kafkaConfig   configuration whose streams config is handed to the test driver and
     *                      which is also passed to the command API; must not be null
     * @throws NullPointerException if either argument is null
     */
    public AggregateTestDriver(AggregateSpec<K, C, E, A> aggregateSpec, KafkaConfig kafkaConfig) {
        // Fail fast, before any resource is created.
        Objects.requireNonNull(aggregateSpec, "aggregateSpec");
        Objects.requireNonNull(kafkaConfig, "kafkaConfig");

        this.aggregateSpec = aggregateSpec;
        this.aggregateSerdes = aggregateSpec.serialization().serdes();

        StreamsBuilder streamsBuilder = new StreamsBuilder();
        TopologyContext topologyContext = new TopologyContext(aggregateSpec);
        EventSourcedTopology.addTopology(topologyContext, streamsBuilder);

        Properties streamsProperties = new Properties();
        streamsProperties.putAll(kafkaConfig.streamsConfig());
        // Third argument is the driver's initial wall-clock time in milliseconds.
        this.driver = new TopologyTestDriver(streamsBuilder.build(), streamsProperties, 0L);

        TestPublisher commandRequestPublisher = new TestPublisher(
                this.driver,
                this.aggregateSerdes.aggregateKey(),
                this.aggregateSerdes.commandRequest(),
                topicName(AggregateResources.TopicEntity.command_request));
        TestPublisher responseTopicMapPublisher = new TestPublisher(
                this.driver,
                this.aggregateSerdes.commandId(),
                Serdes.String(),
                topicName(AggregateResources.TopicEntity.command_response_topic_map));

        // NOTE(review): "localhost" is passed through to SpecUtils unchanged; presumably a
        // client/host identifier - confirm against SpecUtils.getCommandSpec.
        CommandSpec commandSpec = SpecUtils.getCommandSpec(aggregateSpec, "localhost");

        // Created only now, immediately before its first use, so that a failure in the
        // set-up above cannot leave an executor behind.
        this.scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("QueryAPI-scheduler"));

        RequestAPIContext requestAPIContext = KafkaCommandAPI.getRequestAPIContext(commandSpec, kafkaConfig, this.scheduler);
        // NOTE(review): the meaning of 400 and 4 is defined by TestTopologyReceiver.ReceiverSpec,
        // which is not visible here - confirm before changing.
        // The id extractor parses the last 36 characters of the string (the length of a
        // canonical UUID string) as the command id.
        TestTopologyReceiver.ReceiverSpec receiverSpec = new TestTopologyReceiver.ReceiverSpec(requestAPIContext.privateResponseTopic(), 400, 4, requestAPIContext.responseValueSerde(), str -> CommandId.of(UUID.fromString(str.substring(str.length() - 36))));

        this.commandAPI = new KafkaCommandAPI<>(commandSpec, kafkaConfig, this.scheduler, commandRequestPublisher, responseTopicMapPublisher, responseHandler -> {
            TestTopologyReceiver receiver = new TestTopologyReceiver(responseHandler, this.driver, receiverSpec);
            // Remember how to poll this receiver so pollForAPIResponse() can drive it.
            this.statePollers.add(receiver::pollForState);
            return receiver;
        });
    }

    /**
     * Publishes a command by delegating to the underlying command API.
     *
     * @param request the command request to publish
     * @return the result of {@link KafkaCommandAPI#publishCommand}
     */
    public FutureResult<CommandError, CommandId> publishCommand(CommandAPI.Request<K, C> request) {
        return this.commandAPI.publishCommand(request);
    }

    /**
     * Polls all registered receivers once and then queries the command API for the result of
     * the given command.
     *
     * @param commandId id of the command whose result is requested
     * @param duration  passed through to {@link KafkaCommandAPI#queryCommandResult}
     * @return the result of {@link KafkaCommandAPI#queryCommandResult}
     */
    public FutureResult<CommandError, Sequence> queryCommandResult(CommandId commandId, Duration duration) {
        pollForAPIResponse();
        return this.commandAPI.queryCommandResult(commandId, duration);
    }

    /**
     * Reads the next record, if any, from the aggregate topic of the test driver.
     *
     * <p>NOTE(review): the decompiler reports this method was package-private in the original source.
     *
     * @return the record's key and value, or empty if the driver returned no record
     */
    public Optional<KeyValue<K, AggregateUpdate<A>>> readAggregateTopic() {
        return Optional.ofNullable(this.driver.readOutput(
                        topicName(AggregateResources.TopicEntity.aggregate),
                        this.aggregateSerdes.aggregateKey().deserializer(),
                        this.aggregateSerdes.aggregateUpdate().deserializer()))
                .map(producerRecord -> KeyValue.pair(producerRecord.key(), (AggregateUpdate) producerRecord.value()));
    }

    /**
     * Reads the next record, if any, from the command response topic of the test driver.
     *
     * <p>NOTE(review): the decompiler reports this method was package-private in the original source.
     *
     * @return the record's key and value, or empty if the driver returned no record
     */
    public Optional<KeyValue<K, CommandResponse<K>>> readCommandResponseTopic() {
        return Optional.ofNullable(this.driver.readOutput(
                        topicName(AggregateResources.TopicEntity.command_response),
                        this.aggregateSerdes.aggregateKey().deserializer(),
                        this.aggregateSerdes.commandResponse().deserializer()))
                .map(producerRecord -> KeyValue.pair(producerRecord.key(), (CommandResponse) producerRecord.value()));
    }

    /**
     * Reads the next record, if any, from the event topic of the test driver.
     *
     * <p>NOTE(review): the decompiler reports this method was package-private in the original source.
     *
     * @return the record's key and value, or empty if the driver returned no record
     */
    public Optional<KeyValue<K, ValueWithSequence<E>>> readEventTopic() {
        return Optional.ofNullable(this.driver.readOutput(
                        topicName(AggregateResources.TopicEntity.event),
                        this.aggregateSerdes.aggregateKey().deserializer(),
                        this.aggregateSerdes.valueWithSequence().deserializer()))
                .map(producerRecord -> KeyValue.pair(producerRecord.key(), (ValueWithSequence) producerRecord.value()));
    }

    /**
     * Runs every registered receiver poller once, in registration order.
     */
    public void pollForAPIResponse() {
        this.statePollers.forEach(Runnable::run);
    }

    /**
     * Closes the test driver and shuts down the scheduler created by this instance.
     *
     * <p>The scheduler is shut down even if closing the driver throws. {@code shutdown()} is
     * used rather than {@code shutdownNow()} so that tasks already handed to the scheduler
     * are not interrupted.
     */
    @Override
    public void close() {
        try {
            this.driver.close();
        } finally {
            this.scheduler.shutdown();
        }
    }

    /**
     * Resolves the topic name for the given entity using the aggregate's resource naming strategy.
     */
    private String topicName(AggregateResources.TopicEntity topicEntity) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().topicName(this.aggregateSpec.aggregateName(), topicEntity.name());
    }
}
