package io.rsocket.test;

import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

/* loaded from: input_file:io/rsocket/test/TransportTest.class */
public interface TransportTest {

    /* loaded from: input_file:io/rsocket/test/TransportTest$TransportPair.class */
    public static final class TransportPair<T, S extends Closeable> implements Disposable {
        private static final String data = "hello world";
        private static final String metadata = "metadata";
        private final RSocket client;
        private final S server;

        public TransportPair(Supplier<T> supplier, BiFunction<T, S, ClientTransport> biFunction, Function<T, ServerTransport<S>> function) {
            T t = supplier.get();
            this.server = (S) RSocketFactory.receive().acceptor((connectionSetupPayload, rSocket) -> {
                return Mono.just(new TestRSocket(data, metadata));
            }).transport(function.apply(t)).start().block();
            this.client = (RSocket) RSocketFactory.connect().transport(biFunction.apply(t, this.server)).start().doOnError((v0) -> {
                v0.printStackTrace();
            }).block();
        }

        public void dispose() {
            this.server.dispose();
        }

        RSocket getClient() {
            return this.client;
        }

        public String expectedPayloadData() {
            return data;
        }

        public String expectedPayloadMetadata() {
            return metadata;
        }
    }

    /**
     * Disposes the transport pair after each test.
     */
    @AfterEach
    default void close() {
        TransportPair pair = getTransportPair();
        pair.dispose();
    }

    /**
     * Creates a payload whose data is {@code "test-data"} and whose metadata
     * depends on {@code i % 5}: {@code null} for remainder 0, the empty string
     * for remainder 1, and {@code "metadata"} for every other remainder.
     *
     * @param i selector for the metadata variant
     * @return the created payload
     */
    default Payload createTestPayload(int i) {
        int remainder = i % 5;

        CharSequence payloadMetadata;
        if (remainder == 0) {
            payloadMetadata = null;
        } else if (remainder == 1) {
            payloadMetadata = "";
        } else {
            payloadMetadata = "metadata";
        }

        return DefaultPayload.create("test-data", payloadMetadata);
    }

    /**
     * Sends ten fire-and-forget requests through the client and verifies that
     * the merged result completes without emitting anything, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 10 fireAndForget requests")
    @Test
    default void fireAndForget10() {
        Flux.range(1, 10)
                .flatMap(index -> getClient().fireAndForget(createTestPayload(index)))
                .as(StepVerifier::create)
                .expectNextCount(0L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * @return the client held by the current transport pair
     */
    default RSocket getClient() {
        TransportPair pair = getTransportPair();
        return pair.getClient();
    }

    /**
     * Returns the duration that the tests in this interface pass to
     * {@code StepVerifier.verify(Duration)}.
     *
     * @return the verification timeout
     */
    Duration getTimeout();

    /**
     * Returns the transport pair whose client the tests use and which
     * {@link #close()} disposes after each test.
     *
     * <p>NOTE(review): {@link #close()}, {@link #getClient()} and
     * {@link #assertPayload(Payload)} each call this method, so implementations
     * presumably return the same instance for the duration of a test; confirm.
     *
     * @return the transport pair under test
     */
    TransportPair getTransportPair();

    /**
     * Sends ten metadata-push requests, each with empty data and
     * {@code "test-metadata"} as metadata, and verifies that the merged result
     * completes without emitting anything, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 10 metadataPush requests")
    @Test
    default void metadataPush10() {
        Flux<Void> pushes = Flux.range(1, 10)
                .flatMap(index -> getClient().metadataPush(DefaultPayload.create("", "test-metadata")));

        StepVerifier.create(pushes)
                .expectNextCount(0L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with an empty request stream and verifies that the
     * response stream completes without emitting anything, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 0 payloads")
    @Test
    default void requestChannel0() {
        Flux<Payload> noPayloads = Flux.empty();

        StepVerifier.create(getClient().requestChannel(noPayloads))
                .expectNextCount(0L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with a single request payload and verifies that exactly
     * one response arrives before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 1 payloads")
    @Test
    default void requestChannel1() {
        Mono<Payload> singlePayload = Mono.just(createTestPayload(0));

        StepVerifier.create(getClient().requestChannel(singlePayload))
                .expectNextCount(1L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with 200,000 request payloads and verifies that 200,000
     * responses arrive before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 200,000 payloads")
    @Test
    default void requestChannel200_000() {
        Flux<Payload> payloads = Flux.range(0, 200_000).map(this::createTestPayload);

        getClient()
                .requestChannel(payloads)
                .as(StepVerifier::create)
                .expectNextCount(200_000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with 20,000 request payloads, all built with
     * {@code createTestPayload(7)}, checks every response with
     * {@link #assertChannelPayload(Payload)}, and verifies that 20,000 responses
     * arrive before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 20,000 payloads")
    @Test
    default void requestChannel20_000() {
        // Selector 7 is used for every element regardless of the range value.
        Flux<Payload> payloads = Flux.range(0, 20_000).map(ignored -> createTestPayload(7));

        Flux<Payload> responses = getClient()
                .requestChannel(payloads)
                .doOnNext(this::assertChannelPayload);

        StepVerifier.create(responses)
                .expectNextCount(20_000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with 2,000,000 request payloads and verifies that
     * 2,000,000 responses arrive before completion, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 2,000,000 payloads")
    @SlowTest
    default void requestChannel2_000_000() {
        Flux<Payload> payloads = Flux.range(0, 2_000_000).map(this::createTestPayload);

        getClient()
                .requestChannel(payloads)
                .as(StepVerifier::create)
                .expectNextCount(2_000_000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Opens a channel with three request payloads and verifies that three
     * responses arrive before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestChannel request with 3 payloads")
    @Test
    default void requestChannel3() {
        Flux<Payload> payloads = Flux.range(0, 3).map(this::createTestPayload);

        StepVerifier.create(getClient().requestChannel(payloads))
                .expectNextCount(3L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Runs {@link #check(Flux)} 1024 times against the same 512-payload request
     * stream, scheduling each run on {@code Schedulers.elastic()} with a
     * {@code flatMap} concurrency of 12, and blocks until the last run finishes.
     */
    @DisplayName("makes 1 requestChannel request with 512 payloads")
    @Test
    default void requestChannel512() {
        Flux<Payload> payloads = Flux.range(0, 512).map(this::createTestPayload);

        Flux.range(0, 1024)
                .flatMap(
                        run -> Mono.fromRunnable(() -> check(payloads)).subscribeOn(Schedulers.elastic()),
                        12)
                .blockLast();
    }

    /**
     * Opens a channel with the given request stream and verifies that exactly
     * 512 responses arrive before completion, bounded by {@link #getTimeout()}.
     *
     * @param flux the request payloads to send over the channel
     */
    default void check(Flux<Payload> flux) {
        Flux<Payload> responses = getClient().requestChannel(flux);

        StepVerifier.create(responses)
                .expectNextCount(512L)
                .as("expected 512 items")
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes a single request-response call, checks the response with
     * {@link #assertPayload(Payload)}, and verifies that exactly one response
     * arrives before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestResponse request")
    @Test
    default void requestResponse1() {
        Mono<Payload> response = getClient()
                .requestResponse(createTestPayload(1))
                .doOnNext(this::assertPayload);

        StepVerifier.create(response)
                .expectNextCount(1L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes ten request-response calls, checks each response with
     * {@link #assertPayload(Payload)}, and verifies that ten responses arrive
     * before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 10 requestResponse requests")
    @Test
    default void requestResponse10() {
        Flux.range(1, 10)
                .flatMap(index ->
                        getClient()
                                .requestResponse(createTestPayload(index))
                                .doOnNext(this::assertPayload))
                .as(StepVerifier::create)
                .expectNextCount(10L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes 100 request-response calls, maps each response to its UTF-8 data,
     * and verifies that 100 items arrive before completion, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 100 requestResponse requests")
    @Test
    default void requestResponse100() {
        Flux<String> responseData = Flux.range(1, 100)
                .flatMap(index ->
                        getClient()
                                .requestResponse(createTestPayload(index))
                                .map(Payload::getDataUtf8));

        StepVerifier.create(responseData)
                .expectNextCount(100L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes 10,000 request-response calls, maps each response to its UTF-8
     * data, and verifies that 10,000 items arrive before completion, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 10,000 requestResponse requests")
    @Test
    default void requestResponse10_000() {
        Flux<String> responseData = Flux.range(1, 10_000)
                .flatMap(index ->
                        getClient()
                                .requestResponse(createTestPayload(index))
                                .map(Payload::getDataUtf8));

        StepVerifier.create(responseData)
                .expectNextCount(10_000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes a single request-stream call, checks every response with
     * {@link #assertPayload(Payload)}, and verifies that 10,000 responses arrive
     * before completion, bounded by {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestStream request and receives 10,000 responses")
    @Test
    default void requestStream10_000() {
        Flux<Payload> responses = getClient()
                .requestStream(createTestPayload(3))
                .doOnNext(this::assertPayload);

        StepVerifier.create(responses)
                .expectNextCount(10_000L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes a single request-stream call, checks every response with
     * {@link #assertPayload(Payload)}, truncates the stream to five items, and
     * verifies that five responses arrive before completion, bounded by
     * {@link #getTimeout()}.
     */
    @DisplayName("makes 1 requestStream request and receives 5 responses")
    @Test
    default void requestStream5() {
        Flux<Payload> firstFive = getClient()
                .requestStream(createTestPayload(3))
                .doOnNext(this::assertPayload)
                .take(5L);

        StepVerifier.create(firstFive)
                .expectNextCount(5L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Makes a single request-stream call truncated to ten items and consumes it
     * in two batches of five, verifying completion after the second batch,
     * bounded by {@link #getTimeout()}.
     *
     * <p>The verifier is created with an initial request of zero so that demand
     * is only signalled by the two explicit {@code thenRequest(5)} steps. With
     * the single-argument {@code StepVerifier.create} the subscription starts
     * with unbounded demand, which makes those steps no-ops and means nothing is
     * consumed incrementally.
     */
    @DisplayName("makes 1 requestStream request and consumes result incrementally")
    @Test
    default void requestStreamDelayedRequestN() {
        Flux<Payload> firstTen = getClient()
                .requestStream(createTestPayload(3))
                .take(10L);

        // Initial demand of 0: items may only flow once thenRequest(...) asks for them.
        StepVerifier.create(firstTen, 0L)
                .thenRequest(5L)
                .expectNextCount(5L)
                .thenRequest(5L)
                .expectNextCount(5L)
                .expectComplete()
                .verify(getTimeout());
    }

    /**
     * Checks that the payload's UTF-8 data and metadata equal the values the
     * current transport pair reports as expected.
     *
     * @param payload the payload to check
     * @throws IllegalStateException if the data or the metadata differs
     */
    default void assertPayload(Payload payload) {
        TransportPair pair = getTransportPair();

        // Metadata is only compared when the data already matched.
        boolean matches = pair.expectedPayloadData().equals(payload.getDataUtf8())
                && pair.expectedPayloadMetadata().equals(payload.getMetadataUtf8());
        if (matches) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }

    /**
     * Checks that the payload's UTF-8 data is {@code "test-data"} and its UTF-8
     * metadata is {@code "metadata"}.
     *
     * @param payload the payload to check
     * @throws IllegalStateException if the data or the metadata differs
     */
    default void assertChannelPayload(Payload payload) {
        // Metadata is only compared when the data already matched.
        boolean matches = "test-data".equals(payload.getDataUtf8())
                && "metadata".equals(payload.getMetadataUtf8());
        if (matches) {
            return;
        }
        throw new IllegalStateException("Unexpected payload");
    }
}
