package io.kiw.speedy;

import io.kiw.speedy.helper.ImmutableIntMap;
import io.kiw.speedy.management.HostRegistrationMessage;
import io.kiw.speedy.marshaller.MessageUnMarshaller;
import io.kiw.speedy.marshaller.PacketHandler;
import io.kiw.speedy.publisher.SchedulerThread;
import io.kiw.speedy.publisher.SpeedyMessagingPublisher;
import io.kiw.speedy.subscriber.GenericHandler;
import io.kiw.speedy.subscriber.OnMessageErrorHandler;
import io.kiw.speedy.subscriber.SpeedyMessageHandler;
import io.kiw.speedy.subscriber.SpeedyMessageReplyHandler;
import io.kiw.speedy.subscriber.SpeedyMessagingSubscriber;
import io.kiw.speedy.subscriber.SpeedyMessagingSubscriberFactory;
import io.kiw.speedy.subscriber.SubscriberChannelState;
import io.kiw.speedy.wiring.SpeedyWiring;
import io.kiw.speedy.wiring.thread.ThreadHandler;
import java.io.IOException;
import java.util.Map;

/* loaded from: input_file:io/kiw/speedy/SpeedyMessagingImpl.class */
public class SpeedyMessagingImpl implements SpeedyMessaging {
    public static final int DATAGRAM_LENGTH = 5920;
    private SpeedyMessagingSubscriber speedyMessagingSubscriber;
    private final SpeedyMessagingPublisher speedyMessagingPublisher;
    private final SpeedyHost localhost;
    private final ImmutableIntMap<PublisherBucket> publisherBuckets;
    private final ImmutableIntMap<SpeedyConnection> remoteConnections;
    private final Map<Integer, SubscriberChannelState> channelStates;
    private final SchedulerThread schedulerThread;
    private final SpeedyWiring wiring;
    private SpeedyMessagingSubscriberFactory speedyMessagingSubscriberFactory;

    public SpeedyMessagingImpl(SpeedyHost speedyHost, OnMessageErrorHandler onMessageErrorHandler, SpeedyWiring speedyWiring, ImmutableIntMap<PublisherBucket> immutableIntMap, ImmutableIntMap<SpeedyConnection> immutableIntMap2, Map<Integer, SubscriberChannelState> map, PacketHandler packetHandler, SchedulerThread schedulerThread) {
        this.wiring = speedyWiring;
        this.localhost = speedyHost;
        this.publisherBuckets = immutableIntMap;
        this.remoteConnections = immutableIntMap2;
        this.channelStates = map;
        this.schedulerThread = schedulerThread;
        this.speedyMessagingPublisher = new SpeedyMessagingPublisher(packetHandler, speedyWiring, schedulerThread);
        this.speedyMessagingSubscriberFactory = new SpeedyMessagingSubscriberFactory(speedyHost, new MessageUnMarshaller(), immutableIntMap2, this.speedyMessagingPublisher, onMessageErrorHandler, speedyWiring, immutableIntMap, schedulerThread, map);
    }

    public void start() {
        subscribeToAllResponsesHandlers();
        this.speedyMessagingSubscriber = this.speedyMessagingSubscriberFactory.build();
        assertAllKeysHaveBeenSubscribedTo();
        this.wiring.start(this.speedyMessagingPublisher, this.speedyMessagingSubscriber, this.schedulerThread, this.remoteConnections);
        connectToRemoteHosts();
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void subscribe(String str, SpeedyMessageHandler speedyMessageHandler) {
        this.speedyMessagingSubscriberFactory.addSubscriptionHandler(str, new GenericHandler(speedyMessageHandler));
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void subscribe(String str, SpeedyMessageReplyHandler speedyMessageReplyHandler) {
        this.speedyMessagingSubscriberFactory.addSubscriptionHandler(str, new GenericHandler(speedyMessageReplyHandler));
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void publish(String str, byte[] bArr) {
        this.speedyMessagingPublisher.publish(str, bArr);
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void publish(String str, byte[] bArr, int i) {
        this.speedyMessagingPublisher.publish(str, bArr, i);
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void request(String str, byte[] bArr, SpeedyMessageHandler speedyMessageHandler) {
        this.speedyMessagingPublisher.request(str, bArr, speedyMessageHandler);
    }

    @Override // io.kiw.speedy.SpeedyMessaging
    public void close() throws IOException {
        this.speedyMessagingPublisher.flush();
        if (this.speedyMessagingSubscriber != null) {
            this.speedyMessagingSubscriber.close();
        }
        this.speedyMessagingPublisher.close();
    }

    private void connectToRemoteHosts() {
        ThreadHandler threadHandler = this.wiring.getThreadHandler();
        for (SpeedyConnection speedyConnection : (SpeedyConnection[]) this.remoteConnections.values().toArray(new SpeedyConnection[this.remoteConnections.size()])) {
            this.speedyMessagingPublisher.hostRegistration(new HostRegistrationMessage(this.localhost, speedyConnection.getHost()), speedyConnection);
        }
        threadHandler.join();
        this.wiring.connectIfSingleConnection(this.remoteConnections);
    }

    private void assertAllKeysHaveBeenSubscribedTo() {
        this.speedyMessagingSubscriberFactory.assertAllKeysHaveBeenSubscribedTo();
    }

    private void subscribeToAllResponsesHandlers() {
        for (SubscriberChannelState subscriberChannelState : this.channelStates.values()) {
            if (subscriberChannelState.getResponseKey() != null) {
                String responseKey = subscriberChannelState.getResponseKey();
                subscriberChannelState.getClass();
                subscribe(responseKey, subscriberChannelState::handleResponse);
            }
        }
    }
}
