package io.vertx.camel;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.Duration;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.io.File;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.MapEntry;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests the outbound side of the {@link CamelBridge}: messages sent on the Vert.x event bus must
 * reach the Camel endpoint or route configured through an {@link OutboundMapping}.
 *
 * <p>Every test builds its own bridge on top of the {@link Vertx} instance and
 * {@link DefaultCamelContext} created in {@link #setUp()}; all of them are released in
 * {@link #tearDown(TestContext)}.
 */
@RunWith(VertxUnitRunner.class)
public class OutboundEndpointTest {
    private static final Duration DEFAULT_TIMEOUT = Duration.TEN_SECONDS;

    /** Port used by the HTTP server that some routes call. */
    private static final int HTTP_PORT = 8081;

    /** Camel URI targeting the HTTP server listening on {@link #HTTP_PORT}. */
    private static final String HTTP_ENDPOINT_URI = "http://localhost:" + HTTP_PORT;

    private Vertx vertx;
    private DefaultCamelContext camel;
    private CamelBridge bridge;

    /** Creates the Vert.x instance and the (not yet started) Camel context used by the test. */
    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.camel = new DefaultCamelContext();
    }

    /**
     * Stops the bridge, the Camel context and Vert.x.
     *
     * <p>The steps are nested in {@code finally} blocks so that a failure while stopping one
     * component does not leave the following ones running. The bridge is skipped when the test
     * failed before creating it.
     *
     * @param testContext used to wait for (and assert) the asynchronous Vert.x shutdown
     * @throws Exception if the bridge or the Camel context cannot be stopped
     */
    @After
    public void tearDown(TestContext testContext) throws Exception {
        try {
            if (this.bridge != null) {
                BridgeHelper.stopBlocking(this.bridge);
            }
        } finally {
            try {
                this.camel.stop();
            } finally {
                this.vertx.close(testContext.asyncAssertSuccess());
            }
        }
    }

    /** A single String message reaches the mock endpoint with its body and without headers. */
    @Test
    public void testWithMockWithASingleMessage() throws Exception {
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", "hello");

        awaitFirstExchange(endpoint);
        assertReceivedBodies(endpoint, "hello");
        Exchange exchange = endpoint.getExchanges().get(0);
        Assertions.assertThat(exchange.getIn().getHeaders()).hasSize(0);
    }

    /** A byte array body is delivered unchanged. */
    @Test
    public void testWithMockWithASingleMessageUsingByteArray() throws Exception {
        byte[] payload = getRandomBytes();
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", payload);

        awaitFirstExchange(endpoint);
        Exchange exchange = endpoint.getExchanges().get(0);
        Assertions.assertThat((byte[]) exchange.getIn().getBody()).isEqualTo(payload);
    }

    /** A {@link Buffer} body is delivered as an equal {@link Buffer}. */
    @Test
    public void testWithMockWithASingleMessageUsingBuffer() throws Exception {
        byte[] payload = getRandomBytes();
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", Buffer.buffer(payload));

        awaitFirstExchange(endpoint);
        Exchange exchange = endpoint.getExchanges().get(0);
        Assertions.assertThat(exchange.getIn().getBody()).isEqualTo(Buffer.buffer(payload));
    }

    /** Returns 1024 freshly generated random bytes. */
    private byte[] getRandomBytes() {
        byte[] bytes = new byte[1024];
        new Random().nextBytes(bytes);
        return bytes;
    }

    /** With {@code withoutHeadersCopy()} the event bus headers are not present on the exchange. */
    @Test
    public void testWithMockWithASingleMessageHeadersNotCopied() throws Exception {
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output").withoutHeadersCopy()));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", "hello", new DeliveryOptions().addHeader("key", "value"));

        awaitFirstExchange(endpoint);
        assertReceivedBodies(endpoint, "hello");
        Exchange exchange = endpoint.getExchanges().get(0);
        Assertions.assertThat(exchange.getIn().getHeaders()).doesNotContainKey("key");
    }

    /** With the default mapping the event bus headers are present on the exchange. */
    @Test
    public void testWithMockWithASingleMessageHeadersCopied() throws Exception {
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", "hello", new DeliveryOptions().addHeader("key", "value"));

        awaitFirstExchange(endpoint);
        assertReceivedBodies(endpoint, "hello");
        Exchange exchange = endpoint.getExchanges().get(0);
        Assertions.assertThat(exchange.getIn().getHeaders())
            .contains(MapEntry.entry("key", "value"))
            .hasSize(1);
    }

    /** Two messages sent in a row are both received, in sending order. */
    @Test
    public void testWithMockWithMultipleMessages() throws Exception {
        MockEndpoint endpoint = registerMockOutput("mock:foo");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", "hello");
        this.vertx.eventBus().send("test", "hello2");

        awaitExchangeCount(endpoint, 2);
        assertReceivedBodies(endpoint, "hello", "hello2");
    }

    /** An endpoint created from a URI carrying options ({@code retainLast=2}) still receives messages. */
    @Test
    public void testWithMockUsingOptions() throws Exception {
        MockEndpoint endpoint = registerMockOutput("mock:foo?retainLast=2");
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        this.vertx.eventBus().send("test", "hello");
        this.vertx.eventBus().send("test", "hello2");

        awaitExchangeCount(endpoint, 2);
    }

    /** Published messages reach every endpoint mapped to the same event bus address. */
    @Test
    public void testWithSeveralEndpoints() throws Exception {
        MockEndpoint first = registerMockOutput("mock:foo");
        MockEndpoint second = (MockEndpoint) this.camel.getEndpoint("mock:foo2");
        this.camel.addEndpoint("output2", second);
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output"))
            .addOutboundMapping(OutboundMapping.fromVertx("test").setEndpoint(second)));
        startCamelAndBridge();

        this.vertx.eventBus().publish("test", "hello");
        this.vertx.eventBus().publish("test", "hello2");

        awaitExchangeCount(first, 2);
        awaitExchangeCount(second, 2);
        assertReceivedBodies(first, "hello", "hello2");
        assertReceivedBodies(second, "hello", "hello2");
    }

    /** Messages mapped to a {@code stream:file} endpoint are appended to the target file. */
    @Test
    public void testWithStreams() throws Exception {
        File directory = new File("target/junk");
        File output = new File(directory, "foo.txt");
        if (output.exists()) {
            // A leftover file would satisfy the first wait below before anything is written.
            Assertions.assertThat(output.delete()).as("stale file deleted: " + output).isTrue();
        }
        directory.mkdirs();

        this.camel.addEndpoint("output", this.camel.getEndpoint("stream:file?fileName=target/junk/foo.txt"));
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("output")));
        startCamelAndBridge();

        long firstValue = System.currentTimeMillis();
        this.vertx.eventBus().send("test", Long.valueOf(firstValue));
        Awaitility.await().atMost(DEFAULT_TIMEOUT)
            .until(() -> output.isFile() && FileUtils.readFileToString(output).length() > 0);
        String firstContent = FileUtils.readFileToString(output);
        Assertions.assertThat(firstContent).contains(Long.toString(firstValue));

        long secondValue = System.currentTimeMillis();
        this.vertx.eventBus().send("test", Long.valueOf(secondValue));
        Awaitility.await().atMost(DEFAULT_TIMEOUT)
            .until(() -> FileUtils.readFileToString(output).length() > firstContent.length());
        Assertions.assertThat(FileUtils.readFileToString(output))
            .containsSequence(Long.toString(firstValue), Long.toString(secondValue));
    }

    /** The result of the Camel route is sent back as the reply of the event bus message. */
    @Test
    public void testReply(TestContext testContext) throws Exception {
        this.camel.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").transform(constant("OK"));
            }
        });
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("test").toCamel("direct:start")));
        startCamelAndBridge();

        Async async = testContext.async();
        this.vertx.eventBus().send("test", "hello", reply -> {
            if (reply.failed()) {
                // Report the real cause instead of failing later with an NPE or a timeout.
                testContext.fail(reply.cause());
                return;
            }
            testContext.assertEquals("OK", reply.result().body());
            async.complete();
        });
    }

    /** A message mapped to a multi-step route ends up calling the HTTP server. */
    @Test
    public void testWithRoute() throws Exception {
        AtomicBoolean requestReceived = startHttpServer();
        this.camel.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:my-route").to("seda:next").to(HTTP_ENDPOINT_URI);
            }
        });
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));
        startCamelAndBridge();

        this.vertx.eventBus().send("camel-route", "hello");

        Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAtomic(requestReceived, CoreMatchers.is(true));
    }

    /** A route that sleeps before calling the HTTP server works with a blocking mapping. */
    @Test
    public void testWithBlocking() throws Exception {
        AtomicBoolean requestReceived = startHttpServer();
        this.camel.addRoutes(slowHttpRoute());
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route").setBlocking(true)));
        startCamelAndBridge();

        this.vertx.eventBus().send("camel-route", "hello");

        Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAtomic(requestReceived, CoreMatchers.is(true));
    }

    /** Same as {@link #testWithBlocking()} but with an explicitly configured worker executor. */
    @Test
    public void testWithBlockingWithWorker() throws Exception {
        AtomicBoolean requestReceived = startHttpServer();
        this.camel.addRoutes(slowHttpRoute());
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")
                .setBlocking(true)
                .setWorkerExecutor(this.vertx.createSharedWorkerExecutor("some-fancy-name"))));
        startCamelAndBridge();

        this.vertx.eventBus().send("camel-route", "hello");

        Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAtomic(requestReceived, CoreMatchers.is(true));
    }

    /**
     * No HTTP server is started here, so the route is expected to fail and the failure message to
     * be reported in the reply of the event bus message.
     */
    @Test
    public void testWithRouteWithFailure() throws Exception {
        AtomicReference<String> failureMessage = new AtomicReference<>();
        this.camel.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:my-route").to(HTTP_ENDPOINT_URI);
            }
        });
        this.bridge = CamelBridge.create(this.vertx, new CamelBridgeOptions(this.camel)
            .addOutboundMapping(OutboundMapping.fromVertx("camel-route").toCamel("direct:my-route")));
        startCamelAndBridge();

        this.vertx.eventBus().send("camel-route", "hello", reply -> {
            // cause() is only meaningful for a failed reply.
            if (reply.failed()) {
                failureMessage.set(reply.cause().getMessage());
            }
        });

        Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAtomic(failureMessage, CoreMatchers.is("Connection refused"));
    }

    /**
     * Creates a mock endpoint from the given URI and registers it in the Camel context under the
     * name {@code "output"}.
     *
     * @param uri the mock endpoint URI, for example {@code "mock:foo"}
     * @return the registered endpoint
     * @throws Exception if the endpoint cannot be created or registered
     */
    private MockEndpoint registerMockOutput(String uri) throws Exception {
        MockEndpoint endpoint = (MockEndpoint) this.camel.getComponent("mock").createEndpoint(uri);
        this.camel.addEndpoint("output", endpoint);
        return endpoint;
    }

    /**
     * Starts the Camel context, then the bridge previously stored in {@link #bridge}.
     *
     * @throws Exception if the Camel context cannot be started
     */
    private void startCamelAndBridge() throws Exception {
        this.camel.start();
        BridgeHelper.startBlocking(this.bridge);
    }

    /** Waits, up to {@link #DEFAULT_TIMEOUT}, until the endpoint has received at least one exchange. */
    private static void awaitFirstExchange(MockEndpoint endpoint) {
        Awaitility.await().atMost(DEFAULT_TIMEOUT).until(() -> !endpoint.getExchanges().isEmpty());
    }

    /** Waits, up to {@link #DEFAULT_TIMEOUT}, until the endpoint holds exactly {@code count} exchanges. */
    private static void awaitExchangeCount(MockEndpoint endpoint, int count) {
        Awaitility.await().atMost(DEFAULT_TIMEOUT).until(() -> endpoint.getExchanges().size() == count);
    }

    /**
     * Asserts that the endpoint received exactly the given bodies, in that order.
     *
     * <p>The exchanges are inspected directly: expectations recorded on the mock after the
     * messages have arrived, without a later {@code assertIsSatisfied()}, verify nothing.
     *
     * @param endpoint the endpoint whose received exchanges are checked
     * @param expectedBodies the expected message bodies, in reception order
     */
    private static void assertReceivedBodies(MockEndpoint endpoint, Object... expectedBodies) {
        Assertions.assertThat(endpoint.getExchanges()).hasSize(expectedBodies.length);
        for (int i = 0; i < expectedBodies.length; i++) {
            Object actualBody = endpoint.getExchanges().get(i).getIn().getBody();
            Assertions.assertThat(actualBody).as("Body of message: " + i).isEqualTo(expectedBodies[i]);
        }
    }

    /**
     * Starts an HTTP server on {@link #HTTP_PORT} answering {@code "Alright"} to every request and
     * waits until it is listening.
     *
     * @return a flag that becomes {@code true} once the server has handled a request
     */
    private AtomicBoolean startHttpServer() {
        AtomicBoolean requestReceived = new AtomicBoolean();
        AtomicBoolean listening = new AtomicBoolean();
        this.vertx.createHttpServer()
            .requestHandler(request -> {
                requestReceived.set(true);
                request.response().end("Alright");
            })
            .listen(HTTP_PORT, result -> listening.set(result.succeeded()));
        Awaitility.await().atMost(DEFAULT_TIMEOUT).untilAtomic(listening, CoreMatchers.is(true));
        return requestReceived;
    }

    /** Builds a route from {@code direct:my-route} that sleeps three seconds, then calls the HTTP server. */
    private static RouteBuilder slowHttpRoute() {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:my-route")
                    .process(exchange -> Thread.sleep(3000L))
                    .to(HTTP_ENDPOINT_URI);
            }
        };
    }
}
