package io.trino.plugin.exchange.filesystem;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.spi.QueryId;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/AbstractTestExchangeManager.class */
public abstract class AbstractTestExchangeManager {
    private ExchangeManager exchangeManager;

    @BeforeClass
    public void init() throws Exception {
        this.exchangeManager = createExchangeManager();
    }

    @AfterClass(alwaysRun = true)
    public void destroy() throws Exception {
        if (this.exchangeManager != null) {
            this.exchangeManager = null;
        }
    }

    protected abstract ExchangeManager createExchangeManager();

    @Test
    public void testHappyPath() throws Exception {
        ExchangeId createRandomExchangeId = ExchangeId.createRandomExchangeId();
        Exchange createExchange = this.exchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId), 2, false);
        ExchangeSinkHandle addSink = createExchange.addSink(0);
        ExchangeSinkHandle addSink2 = createExchange.addSink(1);
        ExchangeSinkHandle addSink3 = createExchange.addSink(2);
        createExchange.noMoreSinks();
        writeData(createExchange.instantiateSink(addSink, 0), ImmutableListMultimap.of(0, "0-0-0", 1, "0-1-0", 0, "0-0-1", 1, "0-1-1"), true);
        createExchange.sinkFinished(addSink, 0);
        writeData(createExchange.instantiateSink(addSink, 1), ImmutableListMultimap.of(0, "0-0-0", 1, "0-1-0", 0, "0-0-1", 1, "0-1-1"), true);
        createExchange.sinkFinished(addSink, 1);
        writeData(createExchange.instantiateSink(addSink, 2), ImmutableListMultimap.of(0, "failed", 1, "another failed"), false);
        createExchange.sinkFinished(addSink, 2);
        writeData(createExchange.instantiateSink(addSink2, 0), ImmutableListMultimap.of(0, "1-0-0", 1, "1-1-0", 0, "1-0-1", 1, "1-1-1"), true);
        createExchange.sinkFinished(addSink2, 0);
        writeData(createExchange.instantiateSink(addSink2, 1), ImmutableListMultimap.of(0, "1-0-0", 1, "1-1-0", 0, "1-0-1", 1, "1-1-1"), true);
        createExchange.sinkFinished(addSink2, 1);
        writeData(createExchange.instantiateSink(addSink2, 2), ImmutableListMultimap.of(0, "more failed", 1, "another failed"), false);
        createExchange.sinkFinished(addSink2, 2);
        writeData(createExchange.instantiateSink(addSink3, 2), ImmutableListMultimap.of(0, "2-0-0", 1, "2-1-0"), true);
        createExchange.sinkFinished(addSink3, 2);
        createExchange.allRequiredSinksFinished();
        ExchangeSourceHandleSource.ExchangeSourceHandleBatch exchangeSourceHandleBatch = (ExchangeSourceHandleSource.ExchangeSourceHandleBatch) createExchange.getSourceHandles().getNextBatch().get();
        Assert.assertTrue(exchangeSourceHandleBatch.lastBatch());
        List handles = exchangeSourceHandleBatch.handles();
        Assertions.assertThat(handles).hasSize(2);
        Map map = (Map) handles.stream().collect(ImmutableMap.toImmutableMap((v0) -> {
            return v0.getPartitionId();
        }, Function.identity()));
        ExchangeSourceOutputSelector build = ExchangeSourceOutputSelector.builder(ImmutableSet.of(createRandomExchangeId)).include(createRandomExchangeId, 0, 0).include(createRandomExchangeId, 1, 0).include(createRandomExchangeId, 2, 2).setPartitionCount(createRandomExchangeId, 3).setFinal().build();
        Assertions.assertThat(readData((ExchangeSourceHandle) map.get(0), build)).containsExactlyInAnyOrder(new String[]{"0-0-0", "0-0-1", "1-0-0", "1-0-1", "2-0-0"});
        Assertions.assertThat(readData((ExchangeSourceHandle) map.get(1), build)).containsExactlyInAnyOrder(new String[]{"0-1-0", "0-1-1", "1-1-0", "1-1-1", "2-1-0"});
        createExchange.close();
    }

    @Test
    public void testLargePages() throws Exception {
        String repeat = "a".repeat(Math.toIntExact(DataSize.of(123L, DataSize.Unit.BYTE).toBytes()));
        String repeat2 = "b".repeat(Math.toIntExact(DataSize.of(66L, DataSize.Unit.KILOBYTE).toBytes()));
        String repeat3 = "c".repeat(Math.toIntExact(DataSize.of(5L, DataSize.Unit.MEGABYTE).toBytes()) - 4);
        String repeat4 = "d".repeat(Math.toIntExact(DataSize.of(16L, DataSize.Unit.MEGABYTE).toBytes()) - 4);
        ExchangeId createRandomExchangeId = ExchangeId.createRandomExchangeId();
        Exchange createExchange = this.exchangeManager.createExchange(new ExchangeContext(new QueryId("query"), createRandomExchangeId), 3, false);
        ExchangeSinkHandle addSink = createExchange.addSink(0);
        ExchangeSinkHandle addSink2 = createExchange.addSink(1);
        ExchangeSinkHandle addSink3 = createExchange.addSink(2);
        createExchange.noMoreSinks();
        writeData(createExchange.instantiateSink(addSink, 0), new ImmutableListMultimap.Builder().putAll(0, ImmutableList.of(repeat)).putAll(1, ImmutableList.of(repeat4, repeat2)).putAll(2, ImmutableList.of()).build(), true);
        createExchange.sinkFinished(addSink, 0);
        writeData(createExchange.instantiateSink(addSink2, 0), new ImmutableListMultimap.Builder().putAll(0, ImmutableList.of(repeat2)).putAll(1, ImmutableList.of(repeat3)).putAll(2, ImmutableList.of(repeat)).build(), true);
        createExchange.sinkFinished(addSink2, 0);
        writeData(createExchange.instantiateSink(addSink3, 0), new ImmutableListMultimap.Builder().putAll(0, ImmutableList.of(repeat3, repeat4)).putAll(1, ImmutableList.of(repeat)).putAll(2, ImmutableList.of(repeat4, repeat3, repeat2)).build(), true);
        createExchange.sinkFinished(addSink3, 0);
        createExchange.allRequiredSinksFinished();
        ExchangeSourceHandleSource.ExchangeSourceHandleBatch exchangeSourceHandleBatch = (ExchangeSourceHandleSource.ExchangeSourceHandleBatch) createExchange.getSourceHandles().getNextBatch().get();
        Assert.assertTrue(exchangeSourceHandleBatch.lastBatch());
        List handles = exchangeSourceHandleBatch.handles();
        Assertions.assertThat(handles).hasSize(10);
        ListMultimap listMultimap = (ListMultimap) handles.stream().collect(ImmutableListMultimap.toImmutableListMultimap((v0) -> {
            return v0.getPartitionId();
        }, Function.identity()));
        ExchangeSourceOutputSelector build = ExchangeSourceOutputSelector.builder(ImmutableSet.of(createRandomExchangeId)).include(createRandomExchangeId, 0, 0).include(createRandomExchangeId, 1, 0).include(createRandomExchangeId, 2, 0).setPartitionCount(createRandomExchangeId, 3).setFinal().build();
        Assertions.assertThat(readData(listMultimap.get(0), build)).containsExactlyInAnyOrder(new String[]{repeat, repeat2, repeat3, repeat4});
        Assertions.assertThat(readData(listMultimap.get(1), build)).containsExactlyInAnyOrder(new String[]{repeat, repeat2, repeat3, repeat4});
        Assertions.assertThat(readData(listMultimap.get(2), build)).containsExactlyInAnyOrder(new String[]{repeat, repeat2, repeat3, repeat4});
        createExchange.close();
    }

    @Test
    public void testMaxOutputPartitionCountCheck() {
        Assertions.assertThatThrownBy(() -> {
            this.exchangeManager.createExchange(new ExchangeContext(new QueryId("query"), ExchangeId.createRandomExchangeId()), 51, false);
        }).hasMessageContaining("Max number of output partitions exceeded for exchange").hasFieldOrPropertyWithValue("errorCode", FileSystemExchangeErrorCode.MAX_OUTPUT_PARTITION_COUNT_EXCEEDED.toErrorCode());
    }

    private void writeData(ExchangeSinkInstanceHandle exchangeSinkInstanceHandle, Multimap<Integer, String> multimap, boolean z) {
        ExchangeSink createSink = this.exchangeManager.createSink(exchangeSinkInstanceHandle);
        multimap.forEach((num, str) -> {
            createSink.add(num.intValue(), Slices.utf8Slice(str));
        });
        if (z) {
            MoreFutures.getFutureValue(createSink.finish());
        } else {
            MoreFutures.getFutureValue(createSink.abort());
        }
    }

    private List<String> readData(ExchangeSourceHandle exchangeSourceHandle, ExchangeSourceOutputSelector exchangeSourceOutputSelector) {
        return readData((List<ExchangeSourceHandle>) ImmutableList.of(exchangeSourceHandle), exchangeSourceOutputSelector);
    }

    private List<String> readData(List<ExchangeSourceHandle> list, ExchangeSourceOutputSelector exchangeSourceOutputSelector) {
        ImmutableList.Builder builder = ImmutableList.builder();
        ExchangeSource createSource = this.exchangeManager.createSource();
        try {
            createSource.setOutputSelector(exchangeSourceOutputSelector);
            ArrayDeque arrayDeque = new ArrayDeque(list);
            while (!createSource.isFinished()) {
                Slice read = createSource.read();
                if (read != null) {
                    builder.add(read.toStringUtf8());
                }
                ExchangeSourceHandle exchangeSourceHandle = (ExchangeSourceHandle) arrayDeque.poll();
                if (exchangeSourceHandle != null) {
                    createSource.addSourceHandles(ImmutableList.of(exchangeSourceHandle));
                } else {
                    createSource.noMoreSourceHandles();
                }
            }
            if (createSource != null) {
                createSource.close();
            }
            return builder.build();
        } catch (Throwable th) {
            if (createSource != null) {
                try {
                    createSource.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
