package org.eclipse.kura.internal.wire.fifo;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.wire.WireEmitter;
import org.eclipse.kura.wire.WireEnvelope;
import org.eclipse.kura.wire.WireHelperService;
import org.eclipse.kura.wire.WireReceiver;
import org.eclipse.kura.wire.WireSupport;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.wireadmin.Wire;

/* loaded from: input_file:org/eclipse/kura/internal/wire/fifo/Fifo.class */
public class Fifo implements WireEmitter, WireReceiver, ConfigurableComponent {
    private static final String DISCARD_ENVELOPES_PROP_NAME = "discard.envelopes";
    private static final String QUEUE_CAPACITY_PROP_NAME = "queue.capacity";
    private static final Logger logger = LogManager.getLogger(Fifo.class);
    private volatile WireHelperService wireHelperService;
    private WireSupport wireSupport;
    private FifoEmitterThread emitterThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/kura/internal/wire/fifo/Fifo$FifoEmitterThread.class */
    public class FifoEmitterThread extends Thread {
        private final Lock lock = new ReentrantLock();
        private final Condition producer = this.lock.newCondition();
        private final Condition consumer = this.lock.newCondition();
        private boolean run = true;
        private final ArrayList<WireEnvelope> queue = new ArrayList<>();
        private final int queueCapacity;
        private Consumer<WireEnvelope> submitter;

        public FifoEmitterThread(String str, int i, boolean z) {
            this.queueCapacity = i;
            setName(str);
            if (z) {
                this.submitter = getEnvelopeDiscardingSubmitter();
            } else {
                this.submitter = getEmitterBlockingSubmitter();
            }
        }

        private Consumer<WireEnvelope> getEnvelopeDiscardingSubmitter() {
            return wireEnvelope -> {
                try {
                    this.lock.lock();
                    if (!this.run || this.queue.size() >= this.queueCapacity) {
                        Fifo.logger.debug("envelope discarded");
                        return;
                    }
                    this.queue.add(wireEnvelope);
                    this.producer.signal();
                    Fifo.logger.debug("envelope submitted");
                } finally {
                    this.lock.unlock();
                }
            };
        }

        private Consumer<WireEnvelope> getEmitterBlockingSubmitter() {
            return wireEnvelope -> {
                try {
                    this.lock.lock();
                    while (this.run && this.queue.size() >= this.queueCapacity) {
                        this.consumer.await();
                    }
                    if (this.run) {
                        this.queue.add(wireEnvelope);
                        this.producer.signal();
                        Fifo.logger.debug("envelope submitted");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    Fifo.logger.warn("Interrupted while adding new envelope to queue", e);
                } finally {
                    this.lock.unlock();
                }
            };
        }

        public void shutdown() {
            try {
                this.lock.lock();
                this.run = false;
                this.producer.signalAll();
                this.consumer.signalAll();
            } finally {
                this.lock.unlock();
            }
        }

        public void submit(WireEnvelope wireEnvelope) {
            this.submitter.accept(wireEnvelope);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                if (!this.run) {
                    break;
                }
                try {
                    try {
                        this.lock.lock();
                        while (this.run && this.queue.isEmpty()) {
                            this.producer.await();
                        }
                    } catch (Throwable th) {
                        this.lock.unlock();
                        throw th;
                    }
                } catch (Exception e) {
                    Fifo.logger.warn("Unexpected exception while dispatching envelope", e);
                }
                if (!this.run) {
                    this.lock.unlock();
                    break;
                }
                WireEnvelope remove = this.queue.remove(0);
                this.consumer.signal();
                this.lock.unlock();
                Fifo.this.wireSupport.emit(remove.getRecords());
            }
            Fifo.logger.debug("exiting");
        }
    }

    public void bindWireHelperService(WireHelperService wireHelperService) {
        if (Objects.isNull(this.wireHelperService)) {
            this.wireHelperService = wireHelperService;
        }
    }

    public void unbindWireHelperService(WireHelperService wireHelperService) {
        if (this.wireHelperService == wireHelperService) {
            this.wireHelperService = null;
        }
    }

    public void activate(Map<String, Object> map, ComponentContext componentContext) {
        logger.info("Activating Fifo...");
        this.wireSupport = this.wireHelperService.newWireSupport(this, componentContext.getServiceReference());
        updated(map);
        logger.info("Activating Fifo... Done");
    }

    public void deactivate() {
        logger.info("Dectivating Fifo...");
        stopEmitterThread();
        logger.info("Dectivating Fifo... Done");
    }

    public void updated(Map<String, Object> map) {
        logger.info("Updating Fifo...");
        restartEmitterThread(String.valueOf((String) map.getOrDefault("kura.service.pid", "Fifo")) + "-EmitterThread", ((Integer) map.getOrDefault(QUEUE_CAPACITY_PROP_NAME, 50)).intValue(), ((Boolean) map.getOrDefault(DISCARD_ENVELOPES_PROP_NAME, false)).booleanValue());
        logger.info("Updating Fifo... Done");
    }

    private synchronized void stopEmitterThread() {
        if (this.emitterThread != null) {
            this.emitterThread.shutdown();
            this.emitterThread = null;
        }
    }

    private synchronized void restartEmitterThread(String str, int i, boolean z) {
        stopEmitterThread();
        logger.debug("Creating new emitter thread: {}, queue capacity: {}, discard envelopes: {}", str, Integer.valueOf(i), Boolean.valueOf(z));
        this.emitterThread = new FifoEmitterThread(str, i, z);
        this.emitterThread.start();
    }

    public void onWireReceive(WireEnvelope wireEnvelope) {
        Objects.requireNonNull(wireEnvelope, "Wire Envelope cannot be null");
        if (this.emitterThread != null) {
            this.emitterThread.submit(wireEnvelope);
        }
    }

    public Object polled(Wire wire) {
        return this.wireSupport.polled(wire);
    }

    public void consumersConnected(Wire[] wireArr) {
        this.wireSupport.consumersConnected(wireArr);
    }

    public void updated(Wire wire, Object obj) {
        this.wireSupport.updated(wire, obj);
    }

    public void producersConnected(Wire[] wireArr) {
        this.wireSupport.producersConnected(wireArr);
    }
}
