package org.eclipse.kura.core.data;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.eclipse.kura.KuraConnectException;
import org.eclipse.kura.KuraException;
import org.eclipse.kura.KuraNotConnectedException;
import org.eclipse.kura.KuraStoreException;
import org.eclipse.kura.KuraTimeoutException;
import org.eclipse.kura.KuraTooManyInflightMessagesException;
import org.eclipse.kura.configuration.ConfigurableComponent;
import org.eclipse.kura.core.data.store.DbDataStore;
import org.eclipse.kura.data.DataService;
import org.eclipse.kura.data.DataServiceListener;
import org.eclipse.kura.data.DataTransportListener;
import org.eclipse.kura.data.DataTransportService;
import org.eclipse.kura.data.DataTransportToken;
import org.eclipse.kura.db.DbService;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.ComponentException;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/kura/core/data/DataServiceImpl.class */
public class DataServiceImpl implements DataService, DataTransportListener, ConfigurableComponent {
    private static final Logger s_logger = LoggerFactory.getLogger(DataServiceImpl.class);

    // NOTE(review): presumably a timeout in seconds for transport-related waits; in this
    // (decompiled) source it is only referenced inside startReconnectTask() — confirm intent.
    private static final int TRANSPORT_TASK_TIMEOUT = 1;

    // Keys looked up in m_properties. Values are cast to Boolean or Integer at the point of use.
    private static final String AUTOCONNECT_PROP_NAME = "connect.auto-on-startup";
    private static final String CONNECT_DELAY_PROP_NAME = "connect.retry-interval";
    private static final String DISCONNECT_DELAY_PROP_NAME = "disconnect.quiesce-timeout";
    private static final String STORE_HOUSEKEEPER_INTERVAL_PROP_NAME = "store.housekeeper-interval";
    private static final String STORE_PURGE_AGE_PROP_NAME = "store.purge-age";
    private static final String STORE_CAPACITY_PROP_NAME = "store.capacity";
    private static final String REPUBLISH_IN_FLIGHT_MSGS_PROP_NAME = "in-flight-messages.republish-on-new-session";
    private static final String MAX_IN_FLIGHT_MSGS_PROP_NAME = "in-flight-messages.max-number";
    private static final String IN_FLIGHT_MSGS_CONGESTION_TIMEOUT_PROP_NAME = "in-flight-messages.congestion-timeout";

    // Local copy of the configuration handed to activate() and updated().
    private Map<String, Object> m_properties = new HashMap();

    // Collaborators assigned through the set*/unset* methods below.
    private DataTransportService m_dataTransportService;
    private DbService m_dbService;

    // Created in activate(); receives the notifications this class forwards to DataServiceListener trackers.
    private DataServiceListeners m_dataServiceListeners;

    // Single-thread scheduler and handle for the periodic reconnect task (see startReconnectTask()).
    protected ScheduledExecutorService m_reconnectExecutor;
    private ScheduledFuture<?> m_reconnectFuture;

    // Single-thread executor that runs the publishing work submitted by submitPublishingWork().
    private ScheduledExecutorService m_publisherExecutor;

    // Message store started in activate() and stopped in deactivate().
    private DataStore m_store;

    // Maps the token returned by the transport on publish to the store ID of the published message.
    private Map<DataTransportToken, Integer> m_inFlightMsgIds;

    // Single-thread scheduler and handle for the in-flight congestion timeout (see handleInFlightCongestion()).
    private ScheduledExecutorService m_congestionExecutor;
    private ScheduledFuture<?> m_congestionFuture;

    /**
     * Activates the component.
     * <p>
     * Creates the three single-thread executors, copies the supplied configuration, starts the
     * message store, rebuilds the in-flight token map from the messages the store reports as
     * in-flight, creates the listener tracker and finally starts the reconnect task.
     *
     * @param componentContext component context used to obtain the bundle context for the listener tracker
     * @param map              initial configuration properties
     * @throws ComponentException if the store operations fail with a {@link KuraStoreException}
     */
    protected void activate(ComponentContext componentContext, Map<String, Object> map) {
        s_logger.info("Activating...");

        m_reconnectExecutor = Executors.newSingleThreadScheduledExecutor();
        m_publisherExecutor = Executors.newSingleThreadScheduledExecutor();
        m_congestionExecutor = Executors.newSingleThreadScheduledExecutor();

        m_properties.putAll(map);
        m_store = new DbDataStore();

        try {
            final int housekeeperInterval = ((Integer) m_properties.get(STORE_HOUSEKEEPER_INTERVAL_PROP_NAME)).intValue();
            final int purgeAge = ((Integer) m_properties.get(STORE_PURGE_AGE_PROP_NAME)).intValue();
            final int capacity = ((Integer) m_properties.get(STORE_CAPACITY_PROP_NAME)).intValue();
            m_store.start(m_dbService, housekeeperInterval, purgeAge, capacity);

            // Re-create the token -> store ID mapping for messages that were still in flight.
            final List<DataMessage> inFlight = m_store.allInFlightMessagesNoPayload();
            m_inFlightMsgIds = new ConcurrentHashMap<DataTransportToken, Integer>();
            if (inFlight != null) {
                for (DataMessage restored : inFlight) {
                    final DataTransportToken token = new DataTransportToken(restored.getPublishedMessageId(), restored.getSessionId());
                    m_inFlightMsgIds.put(token, Integer.valueOf(restored.getId()));
                    s_logger.debug("Restored in-fligh messages from store. Topic: {}, ID: {}, MQTT message ID: {}", new Object[]{restored.getTopic(), Integer.valueOf(restored.getId()), Integer.valueOf(restored.getPublishedMessageId())});
                }
            }

            final ServiceTracker listenersTracker = new ServiceTracker(componentContext.getBundleContext(), DataServiceListener.class, (ServiceTrackerCustomizer) null);
            m_dataServiceListeners = new DataServiceListeners(listenersTracker);

            startReconnectTask();
        } catch (KuraStoreException storeFailure) {
            s_logger.error("Failed to start store", storeFailure);
            throw new ComponentException("Failed to start store", storeFailure);
        }
    }

    /**
     * Applies a new configuration.
     * <p>
     * Stops any running reconnect task, replaces the stored properties with the supplied ones,
     * pushes the store-related settings to the store and, when the transport is not connected,
     * starts the reconnect task again.
     *
     * @param map the new configuration properties
     */
    public void updated(Map<String, Object> map) {
        s_logger.info("Updating...");
        stopReconnectTask();

        m_properties.clear();
        m_properties.putAll(map);

        final int housekeeperInterval = ((Integer) m_properties.get(STORE_HOUSEKEEPER_INTERVAL_PROP_NAME)).intValue();
        final int purgeAge = ((Integer) m_properties.get(STORE_PURGE_AGE_PROP_NAME)).intValue();
        final int capacity = ((Integer) m_properties.get(STORE_CAPACITY_PROP_NAME)).intValue();
        m_store.update(housekeeperInterval, purgeAge, capacity);

        if (!m_dataTransportService.isConnected()) {
            startReconnectTask();
        }
    }

    /**
     * Deactivates the component.
     * <p>
     * Shuts down the congestion executor, lets the publisher executor drain already submitted
     * work for at most {@link #TRANSPORT_TASK_TIMEOUT} seconds before forcing it down, stops the
     * reconnect task and its executor, disconnects the transport, closes the listener tracker
     * and stops the store.
     *
     * @param componentContext the component context (not used)
     */
    protected void deactivate(ComponentContext componentContext) {
        s_logger.info("Deactivating...");
        this.m_congestionExecutor.shutdownNow();

        // awaitTermination() only ever succeeds after a shutdown request: without shutdown() it
        // always blocked for the whole timeout, even when the executor was idle. Request an
        // orderly shutdown first so queued publishing work can finish and the wait ends as soon
        // as it has.
        this.m_publisherExecutor.shutdown();
        try {
            this.m_publisherExecutor.awaitTermination(TRANSPORT_TASK_TIMEOUT, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Deliberately keep going: the rest of the teardown must still run.
            s_logger.info("Interrupted", e);
        }
        // Force termination of whatever did not complete within the grace period.
        this.m_publisherExecutor.shutdownNow();

        stopReconnectTask();
        this.m_reconnectExecutor.shutdownNow();
        disconnect();
        this.m_dataServiceListeners.close();
        this.m_store.stop();
    }

    /**
     * Stores the transport service this component delegates to.
     *
     * @param dataTransportService the transport service to use
     */
    public void setDataTransportService(DataTransportService dataTransportService) {
        m_dataTransportService = dataTransportService;
    }

    /**
     * Clears the transport service reference.
     *
     * @param dataTransportService ignored; the reference is cleared unconditionally
     */
    public void unsetDataTransportService(DataTransportService dataTransportService) {
        m_dataTransportService = null;
    }

    /**
     * Stores the database service that is handed to the message store on activation.
     *
     * @param dbService the database service to use
     */
    public void setDbService(DbService dbService) {
        m_dbService = dbService;
    }

    /**
     * Clears the database service reference.
     *
     * @param dbService ignored; the reference is cleared unconditionally
     */
    public void unsetDbService(DbService dbService) {
        m_dbService = null;
    }

    /**
     * Transport callback invoked when a connection has been established.
     * <p>
     * When {@code z} is set, the in-flight messages are either unpublished (made eligible for
     * publishing again) or dropped, depending on the
     * {@code in-flight-messages.republish-on-new-session} property, and the in-flight map is
     * cleared. Listeners are then notified and publishing work is submitted.
     *
     * @param z {@code true} when the connection is treated as a new session (per the log messages below)
     */
    public void onConnectionEstablished(boolean z) {
        s_logger.info("Notified connected");

        if (z) {
            final boolean republish = ((Boolean) m_properties.get(REPUBLISH_IN_FLIGHT_MSGS_PROP_NAME)).booleanValue();
            if (!republish) {
                s_logger.info("New session established. Dropping all in-flight messages.");
                try {
                    m_store.dropAllInFlightMessages();
                    m_inFlightMsgIds.clear();
                } catch (KuraStoreException dropFailure) {
                    s_logger.error("Failed to drop in-flight messages", dropFailure);
                }
            } else {
                s_logger.info("New session established. Unpublishing all in-flight messages. Disregarding the QoS level, this may cause duplicate messages.");
                try {
                    m_store.unpublishAllInFlighMessages();
                    m_inFlightMsgIds.clear();
                } catch (KuraStoreException unpublishFailure) {
                    s_logger.error("Failed to unpublish in-flight messages", unpublishFailure);
                }
            }
        }

        m_dataServiceListeners.onConnectionEstablished();
        submitPublishingWork();
    }

    /**
     * Transport callback invoked before a disconnect.
     * <p>
     * Notifies the listeners, then submits one more round of publishing work and waits up to
     * one second for it to complete. Failures while waiting are only logged.
     */
    public void onDisconnecting() {
        s_logger.info("Notified disconnecting");
        m_dataServiceListeners.onDisconnecting();

        final Future<?> lastPublishingRound = submitPublishingWork();
        try {
            lastPublishingRound.get(1L, TimeUnit.SECONDS);
        } catch (TimeoutException ignored) {
            s_logger.warn("Timeout while waiting for the publishing work to complete");
        } catch (ExecutionException failure) {
            s_logger.warn("ExecutionException while waiting for the publishing work to complete", failure);
        } catch (InterruptedException ignored) {
            s_logger.info("Interrupted while waiting for the publishing work to complete");
        }
    }

    /**
     * Transport callback invoked after a disconnect; forwards the notification to the listeners.
     */
    public void onDisconnected() {
        s_logger.info("Notified disconnected");
        m_dataServiceListeners.onDisconnected();
    }

    /**
     * Transport callback invoked before the transport configuration changes.
     * <p>
     * Stops the reconnect task and disconnects with a zero timeout.
     *
     * @param z not used by this implementation
     */
    public void onConfigurationUpdating(boolean z) {
        s_logger.info("Notified DataTransportService configuration updating...");
        stopReconnectTask();
        final long noQuiesce = 0L;
        disconnect(noQuiesce);
    }

    /**
     * Transport callback invoked after the transport configuration changed.
     * <p>
     * Always tries to start the reconnect task. Only when that task was not scheduled
     * (auto-connect disabled) and {@code z} is set does it connect directly; a connect failure
     * is logged and not propagated.
     *
     * @param z when {@code true}, a direct connect is attempted if auto-connect is disabled
     *          (presumably "was connected before the update" — TODO confirm against the transport)
     */
    public void onConfigurationUpdated(boolean z) {
        s_logger.info("Notified DataTransportService configuration updated.");

        final boolean reconnectScheduled = startReconnectTask();
        if (!reconnectScheduled && z) {
            try {
                connect();
            } catch (KuraConnectException connectFailure) {
                s_logger.error("Error during re-connect after configuration update.", connectFailure);
            }
        }
    }

    /**
     * Transport callback invoked when the connection is lost.
     * <p>
     * Restarts the reconnect task (stop followed by start) and then notifies the listeners.
     *
     * @param th the cause reported by the transport, passed on to the listeners
     */
    public void onConnectionLost(Throwable th) {
        s_logger.info("connectionLost");

        // Stop first: startReconnectTask() refuses to run while a previous task is still active.
        stopReconnectTask();
        startReconnectTask();

        m_dataServiceListeners.onConnectionLost(th);
    }

    /**
     * Transport callback invoked when a message arrives; forwards it to the listeners and
     * submits publishing work.
     *
     * @param str  topic the message arrived on
     * @param bArr message payload
     * @param i    passed through to the listeners unchanged (presumably the QoS — TODO confirm)
     * @param z    passed through to the listeners unchanged (presumably the retain flag — TODO confirm)
     */
    public void onMessageArrived(String str, byte[] bArr, int i, boolean z) {
        s_logger.debug("Message arrived on topic: {}", str);
        m_dataServiceListeners.onMessageArrived(str, bArr, i, z);
        submitPublishingWork();
    }

    /**
     * Transport callback invoked when a published message has been confirmed.
     * <p>
     * Removes the token from the in-flight map. For a tracked token the store is told the
     * message is confirmed, the message is loaded back and the listeners are notified with its
     * store ID and topic. Afterwards the congestion timeout is cancelled if the number of
     * in-flight messages is below the configured maximum, and publishing work is submitted.
     *
     * @param dataTransportToken token identifying the confirmed message
     */
    public synchronized void onMessageConfirmed(DataTransportToken dataTransportToken) {
        s_logger.debug("Confirmed message with MQTT message ID: {} on session ID: {}", Integer.valueOf(dataTransportToken.getMessageId()), dataTransportToken.getSessionId());

        final Integer storeId = m_inFlightMsgIds.remove(dataTransportToken);
        if (storeId != null) {
            DataMessage confirmedMessage = null;
            try {
                s_logger.info("Confirmed message ID: {} to store", storeId);
                m_store.confirmed(storeId.intValue());
                confirmedMessage = m_store.get(storeId.intValue());
            } catch (KuraStoreException storeFailure) {
                s_logger.error("Cannot confirm message to store", storeFailure);
            }

            if (confirmedMessage == null) {
                s_logger.error("Confirmed Message with ID {} could not be loaded from the DataStore.", storeId);
            } else {
                m_dataServiceListeners.onMessageConfirmed(storeId.intValue(), confirmedMessage.getTopic());
            }
        } else {
            s_logger.info("Confirmed message published with MQTT message ID: {} not tracked in the map of in-flight messages", Integer.valueOf(dataTransportToken.getMessageId()));
        }

        final int inFlightCount = m_inFlightMsgIds.size();
        final int maxInFlight = ((Integer) m_properties.get(MAX_IN_FLIGHT_MSGS_PROP_NAME)).intValue();
        if (inFlightCount < maxInFlight) {
            handleInFlightDecongestion();
        }

        submitPublishingWork();
    }

    /**
     * Stops the reconnect task and connects the transport unless it is already connected.
     *
     * @throws KuraConnectException if the transport fails to connect
     */
    public void connect() throws KuraConnectException {
        stopReconnectTask();
        if (!m_dataTransportService.isConnected()) {
            m_dataTransportService.connect();
        }
    }

    /**
     * @return whether the underlying transport reports being connected
     */
    public boolean isConnected() {
        final boolean connected = m_dataTransportService.isConnected();
        return connected;
    }

    /**
     * @return the value of the {@code connect.auto-on-startup} property
     */
    public boolean isAutoConnectEnabled() {
        final Boolean autoConnect = (Boolean) m_properties.get(AUTOCONNECT_PROP_NAME);
        return autoConnect.booleanValue();
    }

    /**
     * @return the value of the {@code connect.retry-interval} property
     */
    public int getRetryInterval() {
        final Integer retryInterval = (Integer) m_properties.get(CONNECT_DELAY_PROP_NAME);
        return retryInterval.intValue();
    }

    /**
     * Stops the reconnect task and asks the transport to disconnect.
     *
     * @param j value passed unchanged to the transport's {@code disconnect} call
     */
    public void disconnect(long j) {
        stopReconnectTask();
        m_dataTransportService.disconnect(j);
    }

    /**
     * Delegates a subscription to the transport.
     *
     * @param str topic to subscribe to
     * @param i   passed unchanged to the transport (presumably the QoS — TODO confirm)
     */
    public void subscribe(String str, int i) throws KuraTimeoutException, KuraException, KuraNotConnectedException {
        m_dataTransportService.subscribe(str, i);
    }

    /**
     * Delegates an unsubscription to the transport.
     *
     * @param str topic to unsubscribe from
     */
    public void unsubscribe(String str) throws KuraTimeoutException, KuraException, KuraNotConnectedException {
        m_dataTransportService.unsubscribe(str);
    }

    /**
     * Stores a message for publishing and triggers the publishing work.
     * <p>
     * The message is not sent synchronously: it is written to the store and the publisher
     * executor picks it up.
     *
     * @param str  topic of the message
     * @param bArr message payload
     * @param i    passed unchanged to the store (presumably the QoS — TODO confirm)
     * @param z    passed unchanged to the store (presumably the retain flag — TODO confirm)
     * @param i2   priority, as named in the log messages
     * @return the ID the store assigned to the message
     * @throws KuraStoreException if the message cannot be stored
     */
    public int publish(String str, byte[] bArr, int i, boolean z, int i2) throws KuraStoreException {
        final Integer priority = Integer.valueOf(i2);

        s_logger.info("Storing message on topic :{}, priority: {}", str, priority);
        final DataMessage stored = m_store.store(str, bArr, i, z, i2);
        s_logger.info("Stored message on topic :{}, priority: {}", str, priority);

        submitPublishingWork();
        return stored.getId();
    }

    /**
     * @param str regular expression the message topic must fully match
     * @return store IDs of the unpublished messages whose topic matches
     * @throws KuraStoreException if the store query fails
     */
    public List<Integer> getUnpublishedMessageIds(String str) throws KuraStoreException {
        final List<DataMessage> unpublished = m_store.allUnpublishedMessagesNoPayload();
        return buildMessageIds(unpublished, str);
    }

    /**
     * @param str regular expression the message topic must fully match
     * @return store IDs of the in-flight messages whose topic matches
     * @throws KuraStoreException if the store query fails
     */
    public List<Integer> getInFlightMessageIds(String str) throws KuraStoreException {
        final List<DataMessage> inFlight = m_store.allInFlightMessagesNoPayload();
        return buildMessageIds(inFlight, str);
    }

    /**
     * @param str regular expression the message topic must fully match
     * @return store IDs of the dropped in-flight messages whose topic matches
     * @throws KuraStoreException if the store query fails
     */
    public List<Integer> getDroppedInFlightMessageIds(String str) throws KuraStoreException {
        final List<DataMessage> dropped = m_store.allDroppedInFlightMessagesNoPayload();
        return buildMessageIds(dropped, str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules the periodic reconnect task when auto-connect is enabled.
     * <p>
     * The task runs at the configured retry interval, after a random initial delay of less than
     * a fifth of that interval. Each run connects the transport unless it is already connected.
     * On success the task ends itself by throwing, because an exception thrown from a
     * fixed-rate task suppresses its subsequent executions. On a connect failure the error is
     * logged and the task stays scheduled so the next run retries.
     *
     * @return {@code true} if auto-connect is enabled and the task was scheduled, {@code false} otherwise
     * @throws IllegalStateException if a previously scheduled reconnect task has not completed
     */
    public boolean startReconnectTask() {
        if (this.m_reconnectFuture != null && !this.m_reconnectFuture.isDone()) {
            s_logger.error("Reconnect task already running");
            throw new IllegalStateException("Reconnect task already running");
        }

        final boolean autoConnect = ((Boolean) this.m_properties.get(AUTOCONNECT_PROP_NAME)).booleanValue();
        final int retryInterval = ((Integer) this.m_properties.get(CONNECT_DELAY_PROP_NAME)).intValue();
        if (!autoConnect) {
            return false;
        }

        // Random.nextInt requires a strictly positive bound, hence the fallback to 1 (delay 0).
        final int maxInitialDelay = retryInterval / 5;
        final int initialDelay = new Random().nextInt(maxInitialDelay > 0 ? maxInitialDelay : 1);
        s_logger.info("Starting reconnect task with initial delay {}", Integer.valueOf(initialDelay));

        this.m_reconnectFuture = this.m_reconnectExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                final Thread worker = Thread.currentThread();
                final String originalName = worker.getName();
                worker.setName("DataServiceImpl:ReconnectTask");

                boolean connected = false;
                try {
                    s_logger.info("Connecting...");
                    if (m_dataTransportService.isConnected()) {
                        s_logger.info("Already connected. Reconnect task will be terminated.");
                    } else {
                        m_dataTransportService.connect();
                        s_logger.info("Connected. Reconnect task will be terminated.");
                    }
                    connected = true;
                } catch (Exception e) {
                    // Log the exception itself: the previous code dereferenced e.getCause(),
                    // which is null for many exceptions, and the resulting NPE silently killed
                    // the reconnect task. Swallowing here keeps the task scheduled for a retry.
                    s_logger.warn("Connect failed", e);
                } catch (Error e) {
                    s_logger.error("Unexpected Error. Task will be terminated", e);
                    throw e;
                } finally {
                    worker.setName(originalName);
                }

                if (connected) {
                    // Intentional: throwing is how a fixed-rate task cancels its own future runs.
                    throw new RuntimeException("Connected. Reconnect task will be terminated.");
                }
            }
        }, initialDelay, retryInterval, TimeUnit.SECONDS);

        return true;
    }

    /**
     * Cancels the reconnect task, interrupting it if it is running, when one is scheduled and
     * has not completed yet. Does nothing otherwise.
     */
    private void stopReconnectTask() {
        if (m_reconnectFuture != null && !m_reconnectFuture.isDone()) {
            s_logger.info("Reconnect task running. Stopping it");
            m_reconnectFuture.cancel(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Disconnects the transport using the configured {@code disconnect.quiesce-timeout},
     * multiplied by 1000 before being passed to the transport.
     * <p>
     * Unlike {@link #disconnect(long)} this does not stop the reconnect task.
     */
    public void disconnect() {
        final int quiesceTimeout = ((Integer) this.m_properties.get(DISCONNECT_DELAY_PROP_NAME)).intValue();
        // Multiply as long: the int product overflowed for timeouts above Integer.MAX_VALUE / 1000.
        this.m_dataTransportService.disconnect(quiesceTimeout * 1000L);
    }

    /**
     * Submits a publishing round to the publisher executor.
     * <p>
     * The round does nothing when the transport is not connected. Otherwise it repeatedly takes
     * the next message from the store, publishes it and notifies the listeners, until the store
     * has no more messages. It stops early, starting the congestion timeout, when a message with
     * QoS above zero is next and the configured maximum of in-flight messages is reached, or
     * when the transport rejects a publish for too many in-flight messages. Any other failure
     * is logged and ends the round.
     *
     * @return the future of the submitted round
     */
    private Future<?> submitPublishingWork() {
        return this.m_publisherExecutor.submit(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("DataServiceImpl:Submit");
                if (!m_dataTransportService.isConnected()) {
                    s_logger.info("DataPublisherService not connected");
                    return;
                }

                // The specific Kura exceptions must be caught before the generic Exception:
                // with the handlers in the opposite order they were unreachable (which does not
                // even compile) and the congestion handling was never triggered.
                try {
                    DataMessage next;
                    while ((next = m_store.getNextMessage()) != null) {
                        // The limit is read on every iteration so configuration updates apply immediately.
                        final int maxInFlight = ((Integer) m_properties.get(MAX_IN_FLIGHT_MSGS_PROP_NAME)).intValue();
                        if (next.getQos() > 0 && m_inFlightMsgIds.size() >= maxInFlight) {
                            s_logger.warn("The configured maximum number of in-flight messages has been reached");
                            handleInFlightCongestion();
                            return;
                        }
                        publishInternal(next);
                        m_dataServiceListeners.onMessagePublished(next.getId(), next.getTopic());
                    }
                } catch (KuraConnectException e) {
                    s_logger.info("DataPublisherService is not connected", e);
                } catch (KuraTooManyInflightMessagesException e) {
                    s_logger.info("Too many in-flight messages", e);
                    handleInFlightCongestion();
                } catch (Exception e) {
                    s_logger.error("Probably an unrecoverable exception", e);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Publishes one stored message through the transport and records the outcome in the store.
     * <p>
     * When the transport returns no token the message is simply marked as published. When it
     * returns a token, the token is mapped to the message's store ID in the in-flight map and
     * the store is updated with the token's message ID and session ID.
     *
     * @param dataMessage the stored message to publish
     */
    public synchronized void publishInternal(DataMessage dataMessage) throws KuraConnectException, KuraTooManyInflightMessagesException, KuraStoreException, KuraException {
        final String msgTopic = dataMessage.getTopic();
        final byte[] msgPayload = dataMessage.getPayload();
        final int msgQos = dataMessage.getQos();
        final boolean msgRetain = dataMessage.isRetain();
        final int storeId = dataMessage.getId();

        s_logger.debug("Publishing message with ID: {} on topic: {}, priority: {}", new Object[]{Integer.valueOf(storeId), msgTopic, Integer.valueOf(dataMessage.getPriority())});

        final DataTransportToken token = m_dataTransportService.publish(msgTopic, msgPayload, msgQos, msgRetain);
        if (token != null) {
            // A token that is already present is reported but still overwritten below.
            if (m_inFlightMsgIds.containsKey(token)) {
                s_logger.error("Token already tracked: " + token.getSessionId() + "-" + token.getMessageId());
            }
            m_inFlightMsgIds.put(token, Integer.valueOf(storeId));
            m_store.published(storeId, token.getMessageId(), token.getSessionId());
            s_logger.debug("Published message with ID: {} and MQTT message ID: {}", Integer.valueOf(storeId), Integer.valueOf(token.getMessageId()));
        } else {
            m_store.published(storeId);
            s_logger.debug("Published message with ID: {}", Integer.valueOf(storeId));
        }
    }

    /**
     * Collects the store IDs of the messages whose topic fully matches a regular expression.
     *
     * @param list messages to filter; {@code null} yields an empty result
     * @param str  regular expression applied to each message topic
     * @return a new list with the IDs of the matching messages, in iteration order
     */
    private List<Integer> buildMessageIds(List<DataMessage> list, String str) {
        // Compiled up front, before the null check, so an invalid expression fails regardless of the list.
        final Pattern topicPattern = Pattern.compile(str);
        final List<Integer> matchingIds = new ArrayList<Integer>();
        if (list == null) {
            return matchingIds;
        }
        for (DataMessage candidate : list) {
            final boolean topicMatches = topicPattern.matcher(candidate.getTopic()).matches();
            if (topicMatches) {
                matchingIds.add(Integer.valueOf(candidate.getId()));
            }
        }
        return matchingIds;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the in-flight congestion timeout unless it is disabled or already pending.
     * <p>
     * A configured {@code in-flight-messages.congestion-timeout} of zero disables the mechanism.
     * When the timeout elapses without being cancelled, the transport is disconnected and the
     * reconnect task is started.
     */
    public void handleInFlightCongestion() {
        final int congestionTimeout = ((Integer) m_properties.get(IN_FLIGHT_MSGS_CONGESTION_TIMEOUT_PROP_NAME)).intValue();
        if (congestionTimeout == 0) {
            return;
        }
        if (m_congestionFuture != null && !m_congestionFuture.isDone()) {
            // A timeout is already pending; do not schedule a second one.
            return;
        }

        s_logger.warn("In-flight message congestion timeout started");
        final Runnable onTimeout = new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName("DataServiceImpl:InFlightCongestion");
                s_logger.warn("In-flight message congestion timeout elapsed. Disconnecting and reconnecting again");
                disconnect();
                startReconnectTask();
            }
        };
        m_congestionFuture = m_congestionExecutor.schedule(onTimeout, congestionTimeout, TimeUnit.SECONDS);
    }

    /**
     * Cancels a pending congestion timeout, interrupting it if it is running. Does nothing when
     * no timeout is scheduled or it has already completed.
     */
    private void handleInFlightDecongestion() {
        if (m_congestionFuture != null && !m_congestionFuture.isDone()) {
            m_congestionFuture.cancel(true);
        }
    }
}
