package org.eclipse.mosaic.fed.cell.module.streammodules;

import java.util.Iterator;
import org.eclipse.mosaic.fed.cell.chain.ChainManager;
import org.eclipse.mosaic.fed.cell.config.model.CNetworkProperties;
import org.eclipse.mosaic.fed.cell.message.CellModuleMessage;
import org.eclipse.mosaic.fed.cell.message.GeocasterResult;
import org.eclipse.mosaic.fed.cell.message.StreamResult;
import org.eclipse.mosaic.fed.cell.module.CellModuleNames;
import org.eclipse.mosaic.fed.cell.module.streammodules.StreamProcessor;
import org.eclipse.mosaic.fed.cell.utility.RegionUtility;
import org.eclipse.mosaic.fed.cell.viz.StreamListener;
import org.eclipse.mosaic.interactions.communication.V2xFullMessageReception;
import org.eclipse.mosaic.interactions.communication.V2xMessageAcknowledgement;
import org.eclipse.mosaic.interactions.communication.V2xMessageReception;
import org.eclipse.mosaic.lib.enums.ProtocolType;
import org.eclipse.mosaic.lib.objects.v2x.MessageStreamRouting;
import org.eclipse.mosaic.lib.objects.v2x.V2xMessage;
import org.eclipse.mosaic.lib.objects.v2x.V2xReceiverInformation;
import org.eclipse.mosaic.lib.util.scheduling.Event;
import org.eclipse.mosaic.rti.TIME;
import org.eclipse.mosaic.rti.api.InternalFederateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/mosaic/fed/cell/module/streammodules/DownstreamModule.class */
/**
 * Stream module registered under {@link CellModuleNames#DOWNSTREAM_MODULE}.
 *
 * <p>It accepts two kinds of {@link CellModuleMessage} payloads:
 * <ul>
 *   <li>a {@link GeocasterResult}, which is transmitted towards its receivers either in
 *       {@code DownlinkUnicast} or in {@code DownlinkMulticast} mode, and</li>
 *   <li>a {@link StreamResult} that was emitted by this module itself, which is handed to
 *       {@code freeBandwidth(...)} of the super class.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code doStreamProcessing}, {@code processResult}, {@code freeBandwidth},
 * {@code chainManager} and {@code moduleName} are inherited from {@link AbstractStreamModule};
 * their exact semantics are not visible in this file.
 */
public final class DownstreamModule extends AbstractStreamModule {
    private static final Logger log = LoggerFactory.getLogger(DownstreamModule.class);

    /**
     * Creates the downstream module.
     *
     * @param chainManager the chain manager passed on to the super class; it is also used by
     *                     this class to send interactions to the RTI and to notify stream listeners
     */
    public DownstreamModule(ChainManager chainManager) {
        super(CellModuleNames.DOWNSTREAM_MODULE, chainManager, log);
    }

    /**
     * Dispatches an incoming event according to the payload of its {@link CellModuleMessage}.
     *
     * @param event the event whose resource must be a {@link CellModuleMessage}
     * @throws RuntimeException if the event has no resource, the resource is not a
     *                          {@link CellModuleMessage}, or the message payload is neither a
     *                          {@link GeocasterResult} nor a {@link StreamResult} emitted by
     *                          this module
     * @throws Exception        declared by the signature; this method itself only propagates
     *                          what the called helpers throw
     */
    public void processEvent(Event event) throws Exception {
        Object resource = event.getResource();
        if (resource == null) {
            throw new RuntimeException("No input message (event resource) for " + this.moduleName);
        }
        if (!(resource instanceof CellModuleMessage)) {
            throw new RuntimeException("The resource of the event is not a CellModuleResultMessage");
        }

        CellModuleMessage message = (CellModuleMessage) resource;
        Object payload = message.getResource();

        if (payload instanceof GeocasterResult) {
            // A new message to deliver in downlink direction.
            processMessage((GeocasterResult) payload, event.getTime());
            return;
        }
        if (payload instanceof StreamResult
                && message.getEmittingModule().equals(CellModuleNames.DOWNSTREAM_MODULE)) {
            // A stream result this module emitted earlier comes back: release its bandwidth.
            freeBandwidth(message);
            return;
        }
        throw new RuntimeException("Unsupported input message (resource of the event resource) for " + this.moduleName);
    }

    /**
     * Selects the transmission routine matching the downstream mode of the geocaster result.
     *
     * @param geocasterResult the result to transmit
     * @param time            the time of the triggering event
     * @throws InternalFederateException if one of the transmission routines fails
     * @throws RuntimeException          if the downstream mode is neither unicast nor multicast
     */
    private void processMessage(GeocasterResult geocasterResult, long time) throws InternalFederateException {
        log.debug("t={}: Entering processMessage() of module {}", TIME.format(time), getModuleName());
        switch (geocasterResult.getDownstreamMode()) {
            case DownlinkUnicast:
                doUnicast(geocasterResult, time);
                return;
            case DownlinkMulticast:
                doMulticast(geocasterResult, time);
                return;
            default:
                throw new RuntimeException("Unsupported transmission mode " + geocasterResult.getDownstreamMode());
        }
    }

    /**
     * Runs one stream processing per receiver. For every acknowledged transmission a reception
     * interaction is sent, stream listeners are notified and, for TCP, an acknowledgement is sent.
     *
     * @param geocasterResult the result holding the message and its receivers, grouped by
     *                        {@link CNetworkProperties} (presumably the receiver's region)
     * @param time            the start time of the transmission
     * @throws InternalFederateException if notifying the stream listeners fails
     */
    private void doUnicast(GeocasterResult geocasterResult, long time) throws InternalFederateException {
        for (CNetworkProperties region : geocasterResult.getReceivers().keySet()) {
            for (String receiverName : geocasterResult.getReceivers().get(region)) {
                StreamProcessor.Input input = new StreamProcessor.Input()
                        .module(CellModuleNames.DOWNSTREAM_MODULE, null)
                        .message(time, geocasterResult.getV2xMessage(), geocasterResult.getDownstreamMode())
                        .node(receiverName, region);
                StreamProcessor.Result result = doStreamProcessing(input);
                // processResult is invoked for every transmission, acknowledged or not.
                CellModuleMessage resultMessage = processResult(input, result);

                if (!result.isAcknowledged()) {
                    continue;
                }
                sendReceptionInteraction(
                        geocasterResult.isFullMessage(),
                        result.getMessageEndTime(),
                        receiverName,
                        input.getV2xMessage(),
                        extractReceiverInformation(result, time)
                );
                notifyStreamListeners(input, result, resultMessage);
                sendAck(input, result);
            }
        }
    }

    /**
     * Runs one stream processing per {@link CNetworkProperties} key (with no individual node set)
     * and, if it is acknowledged, sends a reception interaction to every receiver listed for
     * that key. No acknowledgement is sent in this mode.
     *
     * @param geocasterResult the result holding the message and its receivers
     * @param time            the start time of the transmission
     * @throws InternalFederateException if notifying the stream listeners fails
     */
    private void doMulticast(GeocasterResult geocasterResult, long time) throws InternalFederateException {
        for (CNetworkProperties region : geocasterResult.getReceivers().keySet()) {
            StreamProcessor.Input input = new StreamProcessor.Input()
                    .module(CellModuleNames.DOWNSTREAM_MODULE, null)
                    .message(time, geocasterResult.getV2xMessage(), geocasterResult.getDownstreamMode())
                    .node(null, region);
            StreamProcessor.Result result = doStreamProcessing(input);
            CellModuleMessage resultMessage = processResult(input, result);

            if (!result.isAcknowledged()) {
                continue;
            }
            // The same receiver information instance is shared by all receivers of this key.
            V2xReceiverInformation receiverInformation = extractReceiverInformation(result, time);
            for (String receiverName : geocasterResult.getReceivers().get(region)) {
                sendReceptionInteraction(
                        geocasterResult.isFullMessage(),
                        result.getMessageEndTime(),
                        receiverName,
                        input.getV2xMessage(),
                        receiverInformation
                );
            }
            notifyStreamListeners(input, result, resultMessage);
        }
    }

    /**
     * Builds the receiver information from a stream processing result.
     *
     * @param result   the result providing the message end time and the required bandwidth
     * @param sendTime the time at which the message was sent
     * @return receiver information created with the message end time, carrying the send time
     *         and the needed bandwidth
     */
    private V2xReceiverInformation extractReceiverInformation(StreamProcessor.Result result, long sendTime) {
        return new V2xReceiverInformation(result.getMessageEndTime())
                .sendTime(sendTime)
                .neededBandwidth(result.getRequiredBandwidthInBps());
    }

    /**
     * Sends a reception interaction to the RTI.
     *
     * @param fullMessage         if {@code true} a {@link V2xFullMessageReception} carrying the
     *                            whole message is sent, otherwise a {@link V2xMessageReception}
     *                            carrying only the message id
     * @param receptionTime       the time passed to the interaction
     * @param receiverName        the name of the receiving node
     * @param v2xMessage          the received message
     * @param receiverInformation additional information attached to the interaction
     */
    private void sendReceptionInteraction(boolean fullMessage, long receptionTime, String receiverName,
                                          V2xMessage v2xMessage, V2xReceiverInformation receiverInformation) {
        if (fullMessage) {
            this.chainManager.sendInteractionToRti(
                    new V2xFullMessageReception(receptionTime, receiverName, v2xMessage, receiverInformation));
        } else {
            this.chainManager.sendInteractionToRti(
                    new V2xMessageReception(receptionTime, receiverName, v2xMessage.getId(), receiverInformation));
        }
    }

    /**
     * Notifies the stream listeners of the chain manager about a transmission from the region
     * of the message source to the region of the input.
     *
     * @param input         the stream processing input (message, start time, region)
     * @param result        the stream processing result providing the message end time
     * @param resultMessage the message returned by {@code processResult}; its resource must be
     *                      a {@link StreamResult}
     * @throws InternalFederateException if {@code resultMessage} does not carry a {@link StreamResult}
     */
    private void notifyStreamListeners(StreamProcessor.Input input, StreamProcessor.Result result,
                                       CellModuleMessage resultMessage) throws InternalFederateException {
        // Resolved before the payload check, exactly as in the original statement order.
        String senderRegionId = RegionUtility.getRegionForNode(
                input.getV2xMessage().getRouting().getSource().getSourceName()).id;

        if (!(resultMessage.getResource() instanceof StreamResult)) {
            throw new InternalFederateException("The results from the DownstreamModule did not contain a StreamResult object");
        }
        StreamResult streamResult = (StreamResult) resultMessage.getResource();

        long messageEndTime = result.getMessageEndTime();
        if (streamResult.getV2xMessage().getRouting() instanceof MessageStreamRouting) {
            // The streaming duration is only available on the stream routing subtype, so the
            // routing has to be cast explicitly before it can be read.
            MessageStreamRouting streamRouting = (MessageStreamRouting) streamResult.getV2xMessage().getRouting();
            messageEndTime += streamRouting.getStreamingDuration();
        }

        this.chainManager.notifyStreamListeners(
                new StreamListener.StreamParticipant(senderRegionId, input.getMessageStartTime()),
                new StreamListener.StreamParticipant(input.getRegion().id, messageEndTime),
                new StreamListener.StreamProperties("*", Long.valueOf(streamResult.getConsumedBandwidth()))
        );
    }

    /**
     * Sends a {@link V2xMessageAcknowledgement} to the RTI if the destination of the message
     * uses {@link ProtocolType#TCP}; does nothing for any other protocol type.
     *
     * @param input  the stream processing input holding the message
     * @param result the stream processing result providing the message end time
     */
    private void sendAck(StreamProcessor.Input input, StreamProcessor.Result result) {
        boolean viaTcp = input.getV2xMessage().getRouting().getDestination().getProtocolType().equals(ProtocolType.TCP);
        if (viaTcp) {
            this.chainManager.sendInteractionToRti(
                    new V2xMessageAcknowledgement(result.getMessageEndTime(), input.getV2xMessage()));
        }
    }
}
