package org.eclipse.kura.core.data.store;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.eclipse.kura.KuraStoreCapacityReachedException;
import org.eclipse.kura.KuraStoreException;
import org.eclipse.kura.core.data.DataMessage;
import org.eclipse.kura.core.data.DataStore;
import org.eclipse.kura.core.db.HsqlDbServiceImpl;
import org.eclipse.kura.db.DbService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/kura/core/data/store/DbDataStore.class */
public class DbDataStore implements DataStore {
    private static final Logger s_logger = LoggerFactory.getLogger(DbDataStore.class);
    private DbService m_dbService;
    private Calendar m_utcCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    private ScheduledExecutorService m_houseKeeperExecutor;
    private ScheduledFuture<?> m_houseKeeperTask;
    int m_capacity;

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void start(DbService dbService, int i, int i2, int i3) throws KuraStoreException {
        this.m_dbService = dbService;
        this.m_houseKeeperExecutor = Executors.newSingleThreadScheduledExecutor();
        init(i, i2, i3);
    }

    private void init(int i, int i2, int i3) throws KuraStoreException {
        execute("CREATE TABLE IF NOT EXISTS ds_messages (id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, topic VARCHAR(32767 CHARACTERS), qos INTEGER, retain BOOLEAN, createdOn TIMESTAMP, publishedOn TIMESTAMP, publishedMessageId INTEGER, confirmedOn TIMESTAMP, payload VARBINARY(16777216), priority INTEGER, sessionId VARCHAR(32767 CHARACTERS), droppedOn TIMESTAMP);", new Integer[0]);
        execute("DROP INDEX IF EXISTS ds_messages_publishedOn;", new Integer[0]);
        try {
            execute("CREATE INDEX ds_messages_nextMsg ON ds_messages (priority ASC, createdOn ASC, publishedOn, qos);", new Integer[0]);
        } catch (KuraStoreException e) {
            boolean z = false;
            if (e.getCause() != null && (e.getCause() instanceof SQLException) && ((SQLException) e.getCause()).getErrorCode() == -5504) {
                z = true;
            }
            if (!z) {
                throw e;
            }
        }
        update(i, i2, i3);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void stop() {
        s_logger.info("Canceling the Housekeeper Task...");
        if (this.m_houseKeeperTask != null) {
            this.m_houseKeeperTask.cancel(true);
        }
        this.m_houseKeeperExecutor.shutdownNow();
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void update(int i, int i2, int i3) {
        this.m_capacity = i3;
        if (this.m_houseKeeperTask != null) {
            this.m_houseKeeperTask.cancel(true);
        }
        this.m_houseKeeperTask = this.m_houseKeeperExecutor.scheduleAtFixedRate(new HouseKeeperTask(this, i2, !((HsqlDbServiceImpl) this.m_dbService).isLogDataEnabled()), 1L, i, TimeUnit.SECONDS);
    }

    private synchronized int getMessageCount() throws KuraStoreException {
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        int i = -1;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("SELECT COUNT(*) FROM ds_messages;");
                resultSet = preparedStatement.executeQuery();
                if (resultSet.next()) {
                    i = resultSet.getInt(1);
                }
                close(resultSet);
                close(preparedStatement);
                close(connection);
                return i;
            } catch (Exception e) {
                throw new KuraStoreException(e, "Cannot get message count");
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    private synchronized void resetIdentityGenerator() throws KuraStoreException {
        execute("ALTER TABLE ds_messages ALTER COLUMN id RESTART WITH 0;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized DataMessage store(String str, byte[] bArr, int i, boolean z, int i2) throws KuraStoreException {
        DataMessage storeInternal;
        if (str == null || str.trim().length() == 0) {
            throw new IllegalArgumentException("topic");
        }
        if (i2 != 0 && i2 != 1) {
            int messageCount = getMessageCount();
            s_logger.debug("Store message count: {}", Integer.valueOf(messageCount));
            if (messageCount >= this.m_capacity) {
                s_logger.error("Store capacity exceeded");
                throw new KuraStoreCapacityReachedException("Store capacity exceeded");
            }
        }
        try {
            storeInternal = storeInternal(str, bArr, i, z, i2);
        } catch (KuraStoreException e) {
            Throwable cause = e.getCause();
            if (!(cause instanceof SQLException)) {
                throw e;
            }
            if (((SQLException) cause).getErrorCode() != -3416) {
                throw e;
            }
            s_logger.warn("Identity generator limit exceeded. Resetting it...");
            resetIdentityGenerator();
            storeInternal = storeInternal(str, bArr, i, z, i2);
        }
        return storeInternal;
    }

    private synchronized DataMessage storeInternal(String str, byte[] bArr, int i, boolean z, int i2) throws KuraStoreException {
        if (str == null || str.trim().length() == 0) {
            throw new IllegalArgumentException("topic");
        }
        Timestamp timestamp = new Timestamp(new Date().getTime());
        int i3 = -1;
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        PreparedStatement preparedStatement2 = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("INSERT INTO ds_messages (topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, payload, priority, sessionId, droppedOn) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);");
                preparedStatement.setString(1, str);
                preparedStatement.setInt(2, i);
                preparedStatement.setBoolean(3, z);
                preparedStatement.setTimestamp(4, timestamp, this.m_utcCalendar);
                preparedStatement.setTimestamp(5, null);
                preparedStatement.setInt(6, -1);
                preparedStatement.setTimestamp(7, null);
                preparedStatement.setBytes(8, bArr);
                preparedStatement.setInt(9, i2);
                preparedStatement.setString(10, null);
                preparedStatement.setTimestamp(11, null);
                preparedStatement.execute();
                preparedStatement2 = connection.prepareStatement("CALL IDENTITY();");
                resultSet = preparedStatement2.executeQuery();
                if (resultSet != null && resultSet.next()) {
                    i3 = resultSet.getInt(1);
                }
                connection.commit();
                close(resultSet);
                close(preparedStatement2);
                close(preparedStatement);
                close(connection);
                return get(i3);
            } catch (SQLException e) {
                rollback(connection);
                s_logger.error("SQL error code: {}", Integer.valueOf(e.getErrorCode()));
                throw new KuraStoreException(e, "Cannot store message");
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement2);
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized DataMessage get(int i) throws KuraStoreException {
        DataMessage dataMessage = null;
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, payload, priority, sessionId, droppedOn FROM ds_messages WHERE id = ?");
                preparedStatement.setInt(1, i);
                resultSet = preparedStatement.executeQuery();
                if (resultSet.next()) {
                    dataMessage = buildDataMessage(resultSet);
                }
                close(resultSet);
                close(preparedStatement);
                close(connection);
                return dataMessage;
            } catch (Exception e) {
                throw new KuraStoreException(e, "Cannot get message by ID: " + i);
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized DataMessage getNextMessage() throws KuraStoreException {
        DataMessage dataMessage = null;
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("SELECT d.id, d.topic, d.qos, d.retain, d.createdOn, d.publishedOn, d.publishedMessageId, d.confirmedOn, d.payload, d.priority, d.sessionId, d.droppedOn FROM (SELECT id FROM ds_messages WHERE publishedOn IS NULL ORDER BY priority ASC, createdOn ASC LIMIT 1 USING INDEX) a, ds_messages d WHERE a.id = d.id;");
                resultSet = preparedStatement.executeQuery();
                if (resultSet != null && resultSet.next()) {
                    dataMessage = buildDataMessage(resultSet);
                }
                close(resultSet);
                close(preparedStatement);
                close(connection);
                return dataMessage;
            } catch (Exception e) {
                throw new KuraStoreException(e, "Cannot get message next message");
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void published(int i, int i2, String str) throws KuraStoreException {
        Timestamp timestamp = new Timestamp(new Date().getTime());
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("UPDATE ds_messages SET publishedOn = ?, publishedMessageId = ?, sessionId = ? WHERE id = ?;");
                preparedStatement.setTimestamp(1, timestamp, this.m_utcCalendar);
                preparedStatement.setInt(2, i2);
                preparedStatement.setString(3, str);
                preparedStatement.setInt(4, i);
                preparedStatement.execute();
                connection.commit();
                close(preparedStatement);
                close(connection);
            } catch (SQLException e) {
                rollback(connection);
                throw new KuraStoreException(e, "Cannot update timestamp");
            }
        } catch (Throwable th) {
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void published(int i) throws KuraStoreException {
        updateTimestamp("UPDATE ds_messages SET publishedOn = ? WHERE id = ?;", Integer.valueOf(i));
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void confirmed(int i) throws KuraStoreException {
        updateTimestamp("UPDATE ds_messages SET confirmedOn = ? WHERE id = ?;", Integer.valueOf(i));
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized List<DataMessage> allUnpublishedMessagesNoPayload() throws KuraStoreException {
        return listMessages("SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM ds_messages WHERE publishedOn IS NULL ORDER BY priority ASC, createdOn ASC;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized List<DataMessage> allInFlightMessagesNoPayload() throws KuraStoreException {
        return listMessages("SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM ds_messages WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL AND droppedOn IS NULL ORDER BY priority ASC, createdOn ASC;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized List<DataMessage> allDroppedInFlightMessagesNoPayload() throws KuraStoreException {
        return listMessages("SELECT id, topic, qos, retain, createdOn, publishedOn, publishedMessageId, confirmedOn, priority, sessionId, droppedOn FROM ds_messages WHERE droppedOn IS NOT NULL ORDER BY priority ASC, createdOn ASC;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void unpublishAllInFlighMessages() throws KuraStoreException {
        execute("UPDATE ds_messages SET publishedOn = NULL WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void dropAllInFlightMessages() throws KuraStoreException {
        updateTimestamp("UPDATE ds_messages SET droppedOn = ? WHERE publishedOn IS NOT NULL AND qos > 0 AND confirmedOn IS NULL;", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void deleteStaleMessages(int i) throws KuraStoreException {
        Timestamp timestamp = new Timestamp(new Date().getTime());
        try {
            execute("DELETE FROM ds_messages WHERE DATEDIFF('ss', droppedOn, ?) > ? AND droppedOn IS NOT NULL;", timestamp, Integer.valueOf(i));
        } catch (KuraStoreException e) {
            Throwable cause = e.getCause();
            if (cause == null || !(cause instanceof SQLDataException) || ((SQLDataException) cause).getErrorCode() != -3435) {
                throw e;
            }
            s_logger.info("Delete all dropped messages older than one year");
            execute("DELETE FROM ds_messages WHERE DATEDIFF('yy', droppedOn, ?) > ? AND droppedOn IS NOT NULL;", timestamp, 0);
        }
        try {
            execute("DELETE FROM ds_messages WHERE DATEDIFF('ss', confirmedOn, ?) > ? AND confirmedOn IS NOT NULL;", timestamp, Integer.valueOf(i));
        } catch (KuraStoreException e2) {
            Throwable cause2 = e2.getCause();
            if (cause2 == null || !(cause2 instanceof SQLDataException) || ((SQLDataException) cause2).getErrorCode() != -3435) {
                throw e2;
            }
            s_logger.info("Delete all confirmed messages older than one year");
            execute("DELETE FROM ds_messages WHERE DATEDIFF('yy', confirmedOn, ?) > ? AND confirmedOn IS NOT NULL;", timestamp, 0);
        }
        try {
            execute("DELETE FROM ds_messages WHERE qos = 0 AND DATEDIFF('ss', publishedOn, ?) > ? AND publishedOn IS NOT NULL;", timestamp, Integer.valueOf(i));
        } catch (KuraStoreException e3) {
            Throwable cause3 = e3.getCause();
            if (cause3 == null || !(cause3 instanceof SQLDataException) || ((SQLDataException) cause3).getErrorCode() != -3435) {
                throw e3;
            }
            s_logger.info("Delete all published messages older than one year");
            execute("DELETE FROM ds_messages WHERE qos = 0 AND DATEDIFF('yy', publishedOn, ?) > ? AND publishedOn IS NOT NULL;", timestamp, 0);
        }
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void defrag() throws KuraStoreException {
        execute("CHECKPOINT DEFRAG", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void checkpoint() throws KuraStoreException {
        execute("CHECKPOINT", new Integer[0]);
    }

    @Override // org.eclipse.kura.core.data.DataStore
    public synchronized void repair() throws KuraStoreException {
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        Statement statement = null;
        int i = -1;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement("SELECT count(*) FROM (SELECT id, COUNT(id) FROM ds_messages GROUP BY id HAVING (COUNT(id) > 1)) dups;");
                resultSet = preparedStatement.executeQuery();
                if (resultSet.next()) {
                    i = resultSet.getInt(1);
                }
                if (i <= 0) {
                    close(resultSet);
                    close(preparedStatement);
                    close(null);
                    close(connection);
                    return;
                }
                s_logger.error("Found messages with duplicate ID. Count of IDs for which duplicates exist: {}. Attempting to repair...", Integer.valueOf(i));
                statement = connection.createStatement();
                statement.execute("ALTER TABLE ds_messages DROP PRIMARY KEY;");
                s_logger.info("Primary key dropped");
                statement.execute("DELETE FROM ds_messages WHERE id IN (SELECT id FROM ds_messages GROUP BY id HAVING COUNT(*) > 1);");
                s_logger.info("Duplicate messages deleted");
                statement.execute("ALTER TABLE ds_messages ADD PRIMARY KEY (id);");
                s_logger.info("Primary key created");
                connection.commit();
                statement.execute("CHECKPOINT DEFRAG");
                s_logger.info("Checkpoint defrag");
                connection.commit();
                close(resultSet);
                close(preparedStatement);
                close(statement);
                close(connection);
            } catch (SQLException e) {
                rollback(connection);
                throw new KuraStoreException(e, "Cannot repair database");
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            close(statement);
            close(connection);
            throw th;
        }
    }

    private synchronized void updateTimestamp(String str, Integer... numArr) throws KuraStoreException {
        Timestamp timestamp = new Timestamp(new Date().getTime());
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement(str);
                preparedStatement.setTimestamp(1, timestamp, this.m_utcCalendar);
                for (int i = 0; i < numArr.length; i++) {
                    preparedStatement.setInt(2 + i, numArr[i].intValue());
                }
                preparedStatement.execute();
                connection.commit();
                close(preparedStatement);
                close(connection);
            } catch (SQLException e) {
                rollback(connection);
                throw new KuraStoreException(e, "Cannot update timestamp");
            }
        } catch (Throwable th) {
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    private synchronized List<DataMessage> listMessages(String str, Integer... numArr) throws KuraStoreException {
        new ArrayList();
        ResultSet resultSet = null;
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement(str);
                if (numArr != null) {
                    for (int i = 0; i < numArr.length; i++) {
                        preparedStatement.setInt(2 + i, numArr[i].intValue());
                    }
                }
                resultSet = preparedStatement.executeQuery();
                List<DataMessage> buildDataMessagesNoPayload = buildDataMessagesNoPayload(resultSet);
                close(resultSet);
                close(preparedStatement);
                close(connection);
                return buildDataMessagesNoPayload;
            } catch (Exception e) {
                throw new KuraStoreException(e, "Cannot list messages");
            }
        } catch (Throwable th) {
            close(resultSet);
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    private synchronized void execute(String str, Integer... numArr) throws KuraStoreException {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement(str);
                for (int i = 0; i < numArr.length; i++) {
                    preparedStatement.setInt(1 + i, numArr[i].intValue());
                }
                preparedStatement.execute();
                connection.commit();
                close(preparedStatement);
                close(connection);
            } catch (SQLException e) {
                rollback(connection);
                throw new KuraStoreException(e, "Cannot execute query");
            }
        } catch (Throwable th) {
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    private synchronized void execute(String str, Timestamp timestamp, Integer... numArr) throws KuraStoreException {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            try {
                connection = getConnection();
                preparedStatement = connection.prepareStatement(str);
                if (timestamp != null) {
                    preparedStatement.setTimestamp(1, timestamp, this.m_utcCalendar);
                }
                for (int i = 0; i < numArr.length; i++) {
                    preparedStatement.setInt(2 + i, numArr[i].intValue());
                }
                preparedStatement.execute();
                connection.commit();
                close(preparedStatement);
                close(connection);
            } catch (SQLException e) {
                rollback(connection);
                throw new KuraStoreException(e, "Cannot execute query");
            }
        } catch (Throwable th) {
            close(preparedStatement);
            close(connection);
            throw th;
        }
    }

    private List<DataMessage> buildDataMessagesNoPayload(ResultSet resultSet) throws SQLException, IOException {
        ArrayList arrayList = new ArrayList();
        while (resultSet.next()) {
            arrayList.add(buildDataMessageNoPayload(resultSet));
        }
        return arrayList;
    }

    private DataMessage buildDataMessageNoPayload(ResultSet resultSet) throws SQLException {
        return buildDataMessageBuilder(resultSet).build();
    }

    private DataMessage buildDataMessage(ResultSet resultSet) throws SQLException {
        return buildDataMessageBuilder(resultSet).withPayload(resultSet.getBytes("payload")).build();
    }

    private DataMessage.Builder buildDataMessageBuilder(ResultSet resultSet) throws SQLException {
        return new DataMessage.Builder(resultSet.getInt("id")).withTopic(resultSet.getString("topic")).withQos(resultSet.getInt("qos")).withRetain(resultSet.getBoolean("retain")).withCreatedOn(resultSet.getTimestamp("createdOn", this.m_utcCalendar)).withPublishedOn(resultSet.getTimestamp("publishedOn", this.m_utcCalendar)).withPublishedMessageId(resultSet.getInt("publishedMessageId")).withConfirmedOn(resultSet.getTimestamp("confirmedOn", this.m_utcCalendar)).withPriority(resultSet.getInt("priority")).withSessionId(resultSet.getString("sessionId")).withDroppedOn(resultSet.getTimestamp("droppedOn"));
    }

    private Connection getConnection() throws SQLException {
        return this.m_dbService.getConnection();
    }

    private void rollback(Connection connection) {
        this.m_dbService.rollback(connection);
    }

    private void close(ResultSet... resultSetArr) {
        this.m_dbService.close(resultSetArr);
    }

    private void close(Statement... statementArr) {
        this.m_dbService.close(statementArr);
    }

    private void close(Connection connection) {
        this.m_dbService.close(connection);
    }
}
