package io.vertx.tp.jet.uca.micro;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.tp.jet.JetThanatos;
import io.vertx.tp.jet.atom.JtUri;
import io.vertx.tp.jet.monitor.JtMonitor;
import io.vertx.tp.jet.refine.Jt;
import io.vertx.tp.optic.jet.JtConsumer;
import io.vertx.up.commune.Envelop;
import io.vertx.up.fn.Fn;
import io.vertx.up.util.Ut;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:io/vertx/tp/jet/uca/micro/JtMinos.class */
public class JtMinos extends AbstractVerticle {
    private final transient JetThanatos ensurer = JetThanatos.create(getClass());
    private final transient JtMonitor monitor = JtMonitor.create(getClass());

    public void start() {
        EventBus eventBus = this.vertx.eventBus();
        ConcurrentMap<String, JtUri> answer = Jt.answer(config());
        consumeAddr(answer).forEach((str, jtConsumer) -> {
            eventBus.consumer(str, message -> {
                Envelop envelop = (Envelop) message.body();
                JtUri jtUri = (JtUri) answer.get(envelop.key());
                if (Objects.isNull(jtUri)) {
                    message.reply(this.ensurer.to500DefinitionError(envelop.key()));
                } else {
                    this.monitor.receiveData(envelop.key(), jtUri);
                    jtConsumer.async(envelop, jtUri).setHandler(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            message.reply(asyncResult.result());
                        } else {
                            message.reply(Envelop.failure(asyncResult.cause()));
                        }
                    });
                }
            });
        });
    }

    private ConcurrentMap<String, JtConsumer> consumeAddr(ConcurrentMap<String, JtUri> concurrentMap) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        concurrentMap.values().stream().map((v0) -> {
            return v0.worker();
        }).forEach(jtWorker -> {
            String workerAddress = jtWorker.getWorkerAddress();
            Class<?> workerConsumer = jtWorker.getWorkerConsumer();
            JtConsumer jtConsumer = (JtConsumer) Fn.poolThread(Pool.CONSUMER_CLS, () -> {
                return (JtConsumer) Ut.instance(workerConsumer, new Object[0]);
            });
            if (Ut.notNil(workerAddress) && Objects.nonNull(jtConsumer)) {
                concurrentHashMap.put(workerAddress, jtConsumer);
            }
        });
        return concurrentHashMap;
    }
}
