package io.simplesource.kafka.testutils;

import io.simplesource.api.CommandAPI;
import io.simplesource.api.CommandError;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.kafka.model.ValueWithSequence;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;
import org.junit.jupiter.api.Assertions;

/* loaded from: input_file:io/simplesource/kafka/testutils/AggregateTestHelper.class */
/**
 * Fluent helper for publishing commands through an {@link AggregateTestDriver} and asserting on
 * the command response, the events and the aggregate update that the driver exposes afterwards.
 *
 * <p>Start with {@link #publishCommand(Object, Sequence, Object)}, then state the expected
 * outcome on the returned {@link PublishBuilder}. A successful expectation yields a
 * {@link PublishResponse} from which a follow-up command can be published for the same key.
 *
 * @param <K> aggregate key type
 * @param <C> command type
 * @param <E> event type
 * @param <A> aggregate type
 */
public final class AggregateTestHelper<K, C, E, A> {
    /** How long to wait when querying the driver for the result of a published command. */
    private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(30L);

    private final AggregateTestDriver<K, C, E, A> testAPI;

    /**
     * A command that has been described but not yet published. Publishing happens when one of
     * the {@code expecting...} methods is called.
     */
    public final class PublishBuilder {
        private final K key;
        private final Sequence readSequence;
        private final C command;

        PublishBuilder(K k, Sequence sequence, C c) {
            this.key = k;
            this.readSequence = sequence;
            this.command = c;
        }

        /**
         * Publishes the command and asserts that it succeeds, producing exactly the given events
         * and leaving the aggregate in the given state.
         *
         * @param nonEmptyList the events expected on the event topic, in order
         * @param a the aggregate value expected in the aggregate update
         * @return a response handle carrying the observed aggregate update
         */
        public PublishResponse expecting(NonEmptyList<E> nonEmptyList, A a) {
            return publishExpectingSuccess(this.key, this.readSequence, this.command, nonEmptyList, a);
        }

        /**
         * Publishes the command and asserts that it fails, handing the reported errors to the
         * supplied verifier.
         *
         * @param consumer receives the errors; it is invoked once with the errors read from the
         *     command response topic and once with the errors returned by the result query
         */
        public void expectingFailure(Consumer<NonEmptyList<CommandError>> consumer) {
            publishExpectingError(this.key, this.readSequence, this.command, consumer);
        }

        /**
         * Publishes the command and asserts that it fails with exactly the given reasons.
         *
         * @param nonEmptyList the expected error reasons, in order
         */
        public void expectingFailure(NonEmptyList<CommandError.Reason> nonEmptyList) {
            expectingFailure(errors ->
                    Assertions.assertEquals(nonEmptyList, errors.map(CommandError::getReason)));
        }
    }

    /**
     * The observed outcome of a successfully published command; the starting point for
     * publishing a follow-up command against the same key.
     */
    public final class PublishResponse {
        private final K key;
        private final AggregateUpdate<A> aggregateUpdate;

        PublishResponse(K k, AggregateUpdate<A> aggregateUpdate) {
            this.key = k;
            this.aggregateUpdate = aggregateUpdate;
        }

        /**
         * Prepares a follow-up command that uses the sequence of the previous aggregate update
         * as its read sequence.
         *
         * @param c the command to publish next
         * @return a builder on which the expected outcome is declared
         */
        public PublishBuilder thenPublish(C c) {
            return new PublishBuilder(this.key, this.aggregateUpdate.sequence(), c);
        }

        /**
         * Prepares a follow-up command whose value and read sequence are both derived from the
         * previous aggregate update.
         *
         * @param function maps the previous aggregate update to the next command and the
         *     sequence it should be published with
         * @return a builder on which the expected outcome is declared
         */
        public PublishBuilder thenPublish(Function<AggregateUpdate<A>, ValueWithSequence<C>> function) {
            ValueWithSequence<C> next = function.apply(this.aggregateUpdate);
            return new PublishBuilder(this.key, next.sequence(), next.value());
        }
    }

    /**
     * Creates a helper that publishes to, and reads from, the given test driver.
     *
     * @param aggregateTestDriver the driver under test
     */
    public AggregateTestHelper(AggregateTestDriver<K, C, E, A> aggregateTestDriver) {
        this.testAPI = aggregateTestDriver;
    }

    /**
     * Describes a command to publish. Nothing is sent until an expectation is declared on the
     * returned builder.
     *
     * @param k the aggregate key
     * @param sequence the read sequence to publish the command with
     * @param c the command
     * @return a builder on which the expected outcome is declared
     */
    public PublishBuilder publishCommand(K k, Sequence sequence, C c) {
        return new PublishBuilder(k, sequence, c);
    }

    /**
     * Publishes the command under a freshly generated command id and fails the test if the
     * driver rejects the publish or echoes back a different id.
     */
    private UUID publish(K key, Sequence readSequence, C command) {
        UUID commandId = UUID.randomUUID();
        return this.testAPI.publishCommand(new CommandAPI.Request<>(key, readSequence, commandId, command))
                .unsafePerform(AggregateTestHelper::commandError)
                .fold(
                        errors -> Assertions.fail("Publishing command " + command + " failed with " + errors),
                        publishedId -> {
                            // The id we generated is the expected value; the echoed id is the actual one.
                            Assertions.assertEquals(commandId, publishedId);
                            return commandId;
                        });
    }

    /**
     * Publishes the command and asserts the full success path: the expected events, a successful
     * command response, the expected aggregate update and a matching command result query.
     *
     * <p>Decompiler note: this method was originally private; it is kept public here so that the
     * visible signature is unchanged.
     *
     * @param k the aggregate key
     * @param sequence the read sequence the command is published with
     * @param c the command
     * @param nonEmptyList the events expected on the event topic, in order
     * @param a the aggregate value expected in the aggregate update
     * @return a response handle carrying the observed aggregate update
     */
    public PublishResponse publishExpectingSuccess(K k, Sequence sequence, C c, NonEmptyList<E> nonEmptyList, A a) {
        UUID commandId = publish(k, sequence, c);

        // Events are checked first; the last event sequence is what every later check compares against.
        NonEmptyList<Sequence> eventSequences = validateEvents(k, sequence, nonEmptyList);
        Sequence lastSequence = eventSequences.last();

        KeyValue<K, CommandResponse<K>> response = this.testAPI.readCommandResponseTopic()
                .orElseGet(() -> Assertions.fail("Didn't find command response"));
        // Same id check as the failure path, so a response for a different command is not accepted.
        Assertions.assertEquals(commandId, response.value.commandId());
        Assertions.assertEquals(sequence, response.value.readSequence());
        Assertions.assertTrue(
                response.value.sequenceResult().isSuccess(),
                "Expected command " + c + " to succeed but got " + response.value.sequenceResult());
        response.value.sequenceResult()
                .ifSuccessful(written -> Assertions.assertEquals(lastSequence, written));

        KeyValue<K, AggregateUpdate<A>> update = this.testAPI.readAggregateTopic()
                .orElseGet(() -> Assertions.fail("Missing update on aggregate_update topic"));
        Assertions.assertEquals(k, update.key);
        Assertions.assertEquals(lastSequence, update.value.sequence());
        Assertions.assertEquals(a, update.value.aggregate());

        this.testAPI.queryCommandResult(commandId, QUERY_TIMEOUT)
                .unsafePerform(AggregateTestHelper::commandError)
                .fold(
                        errors -> Assertions.fail("Failed to fetch result with commandId " + errors),
                        queried -> {
                            Assertions.assertEquals(lastSequence, queried);
                            return null;
                        });

        return new PublishResponse(k, update.value);
    }

    /**
     * Publishes the command and asserts the full failure path: a failed command response, no
     * events, no aggregate update and a failed command result query.
     *
     * <p>Decompiler note: this method was originally private; it is kept public here so that the
     * visible signature is unchanged.
     *
     * @param k the aggregate key
     * @param sequence the read sequence the command is published with
     * @param c the command
     * @param consumer verifies the reported errors; invoked for the command response and again
     *     for the result query
     */
    public void publishExpectingError(K k, Sequence sequence, C c, Consumer<NonEmptyList<CommandError>> consumer) {
        UUID commandId = publish(k, sequence, c);

        KeyValue<K, CommandResponse<K>> response = this.testAPI.readCommandResponseTopic()
                .orElseGet(() -> Assertions.fail("Didn't find command response"));
        Assertions.assertEquals(commandId, response.value.commandId());
        Assertions.assertEquals(sequence, response.value.readSequence());
        response.value.sequenceResult().fold(
                errors -> {
                    consumer.accept(errors);
                    return null;
                },
                written -> Assertions.fail("Expected update failure for command " + c + " but got update " + written));

        // A rejected command must leave both downstream topics untouched.
        Assertions.assertEquals(Optional.empty(), this.testAPI.readEventTopic());
        Assertions.assertEquals(Optional.empty(), this.testAPI.readAggregateTopic());

        this.testAPI.queryCommandResult(commandId, QUERY_TIMEOUT)
                .unsafePerform(AggregateTestHelper::commandError)
                .fold(
                        errors -> {
                            consumer.accept(errors);
                            return null;
                        },
                        queried -> Assertions.fail("Expected update failure for command " + c + " but got update " + queried));
    }

    /**
     * Reads one event per expected event from the event topic and checks each against the
     * expected value and a sequence that starts at {@code readSequence.next()} and advances by
     * one per event.
     *
     * @return the sequences of the validated events, in order
     */
    private NonEmptyList<Sequence> validateEvents(K key, Sequence readSequence, NonEmptyList<E> expectedEvents) {
        Sequence headSequence = validEvent(key, new ValueWithSequence<>(expectedEvents.head(), readSequence.next()));
        ArrayList<Sequence> tailSequences = new ArrayList<>();
        Sequence expectedSequence = headSequence.next();
        for (E event : expectedEvents.tail()) {
            tailSequences.add(validEvent(key, new ValueWithSequence<>(event, expectedSequence)));
            expectedSequence = expectedSequence.next();
        }
        return new NonEmptyList<>(headSequence, tailSequences);
    }

    /**
     * Reads the next record from the event topic and asserts its key, sequence and value.
     *
     * @return the sequence of the expected event
     */
    private Sequence validEvent(K key, ValueWithSequence<E> expected) {
        KeyValue<K, ValueWithSequence<E>> record = this.testAPI.readEventTopic()
                .orElseGet(() -> Assertions.fail("Missing update on event topic. Expected " + expected));
        Assertions.assertEquals(key, record.key);
        Assertions.assertEquals(expected.sequence(), record.value.sequence());
        Assertions.assertEquals(expected.value(), record.value.value());
        return expected.sequence();
    }

    /** Wraps an exception raised while performing a driver call as an internal command error. */
    private static CommandError commandError(Exception exception) {
        return CommandError.of(CommandError.Reason.InternalError, exception);
    }
}
