package com.github.ddth.queue.impl;

import com.github.ddth.commons.utils.IdGenerator;
import com.github.ddth.queue.IQueue;
import com.github.ddth.queue.IQueueMessage;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.io.FileUtils;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.RocksObject;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ddth/queue/impl/RocksDbQueue.class */
/**
 * Base class for an {@link IQueue} stored in a RocksDB database.
 *
 * <p>Three named column families are used:
 * <ul>
 * <li><b>queue</b>: messages waiting to be taken, keyed by a generated 128-bit hex id;</li>
 * <li><b>metadata</b>: currently only the key of the last fetched message;</li>
 * <li><b>ephemeral</b>: messages that were taken but not yet finished or re-queued, keyed by
 * the message's queue id (skipped entirely when {@link #isEphemeralDisabled()} is true).</li>
 * </ul>
 *
 * <p>Writers are serialized by one lock and takers by another, so a put and a take may run
 * concurrently. Subclasses provide the message (de)serialization.
 *
 * <p>Typical life cycle: configure via the setters, call {@link #init()}, use the queue, then call
 * {@link #destroy()} (or {@link #close()}).
 */
public abstract class RocksDbQueue implements IQueue, Closeable, AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocksDbQueue.class);

    /** Key, inside the metadata column family, under which the last fetched id is stored. */
    private static final byte[] KEY_LAST_FETCHED_ID = "last-fetched-id".getBytes();

    private RocksDB rocksDb;
    private RocksIterator itQueue;
    private DBOptions dbOptions;
    private WriteOptions writeOptions;
    private ReadOptions readOptions;
    private WriteBatch batchPutToQueue;
    private WriteBatch batchTake;
    private ColumnFamilyHandle cfQueue;
    private ColumnFamilyHandle cfMetadata;
    private ColumnFamilyHandle cfEphemeral;

    private String storageDir = "/tmp/ddth-rocksdb-queue";
    private final IdGenerator idGen = IdGenerator.getInstance(IdGenerator.getMacAddr());
    private byte[] lastFetchedId = null;
    private final Lock lockPut = new ReentrantLock();
    private final Lock lockTake = new ReentrantLock();
    private boolean ephemeralDisabled = false;
    private String cfNameQueue = "queue";
    private String cfNameMetadata = "metadata";
    private String cfNameEphemeral = "ephemeral";

    /** Every handle returned by {@code RocksDB.open}, including the default column family. */
    private final List<ColumnFamilyHandle> cfHandleList = new ArrayList<ColumnFamilyHandle>();

    /** @return the underlying database, or {@code null} before {@link #init()} / after {@link #destroy()} */
    protected RocksDB getRocksDb() {
        return this.rocksDb;
    }

    /** @return handle of the column family holding queued messages */
    protected ColumnFamilyHandle getCfQueue() {
        return this.cfQueue;
    }

    /** @return handle of the column family holding queue metadata */
    protected ColumnFamilyHandle getCfMetadata() {
        return this.cfMetadata;
    }

    /** @return handle of the column family holding taken-but-unfinished messages */
    protected ColumnFamilyHandle getCfEphemeral() {
        return this.cfEphemeral;
    }

    /** @return the read options shared by all reads of this queue */
    protected ReadOptions getReadOptions() {
        return this.readOptions;
    }

    /** @return the write options shared by all writes of this queue */
    protected WriteOptions getWriteOptions() {
        return this.writeOptions;
    }

    /** @return {@code true} if the ephemeral storage is not used; same as {@link #isEphemeralDisabled()} */
    public boolean getEphemeralDisabled() {
        return this.ephemeralDisabled;
    }

    /** @return {@code true} if the ephemeral storage is not used */
    public boolean isEphemeralDisabled() {
        return this.ephemeralDisabled;
    }

    /**
     * Enables or disables the ephemeral storage.
     *
     * @param z {@code true} to stop recording taken messages in the ephemeral column family
     * @return this instance, for chaining
     */
    public RocksDbQueue setEphemeralDisabled(boolean z) {
        this.ephemeralDisabled = z;
        return this;
    }

    /** @return directory where the RocksDB files are stored */
    public String getStorageDir() {
        return this.storageDir;
    }

    /**
     * Sets the directory where the RocksDB files are stored; read by {@link #init()}.
     *
     * @param str directory path
     * @return this instance, for chaining
     */
    public RocksDbQueue setStorageDir(String str) {
        this.storageDir = str;
        return this;
    }

    /** @return name of the column family holding queued messages */
    public String getCfNameQueue() {
        return this.cfNameQueue;
    }

    /**
     * Sets the name of the column family holding queued messages; read by {@link #init()}.
     *
     * @param str column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameQueue(String str) {
        this.cfNameQueue = str;
        return this;
    }

    /** @return name of the column family holding queue metadata */
    public String getCfNameMetadata() {
        return this.cfNameMetadata;
    }

    /**
     * Sets the name of the column family holding queue metadata; read by {@link #init()}.
     *
     * @param str column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameMetadata(String str) {
        this.cfNameMetadata = str;
        return this;
    }

    /** @return name of the column family holding taken-but-unfinished messages */
    public String getCfNameEphemeral() {
        return this.cfNameEphemeral;
    }

    /**
     * Sets the name of the ephemeral column family; read by {@link #init()}.
     *
     * @param str column family name
     * @return this instance, for chaining
     */
    public RocksDbQueue setCfNameEphemeral(String str) {
        this.cfNameEphemeral = str;
        return this;
    }

    /**
     * Creates the storage directory if needed, opens the database with its column families,
     * creates the queue iterator and loads the last fetched id from the metadata column family.
     *
     * <p>If opening or reading the database fails, {@link #destroy()} is called to release whatever
     * was already allocated before the failure is re-thrown.
     *
     * @return this instance, for chaining
     * @throws RuntimeException wrapping the {@link IOException} or {@link RocksDBException} that
     *         made initialization fail
     */
    public RocksDbQueue init() {
        File storage = new File(this.storageDir);
        LOGGER.info("Storage Directory: {}", storage.getAbsolutePath());
        try {
            FileUtils.forceMkdir(storage);
            RocksDB.loadLibrary();

            this.batchPutToQueue = new WriteBatch();
            this.batchTake = new WriteBatch();

            this.dbOptions = new DBOptions();
            this.dbOptions.setCreateIfMissing(true).setCreateMissingColumnFamilies(true).setMaxBackgroundFlushes(2).setMaxBackgroundCompactions(2);
            this.dbOptions.setAllowMmapReads(true).setAllowMmapWrites(true).setAllowOsBuffer(true);
            this.writeOptions = new WriteOptions().setSync(false).setDisableWAL(false);
            this.readOptions = new ReadOptions().setTailing(true);

            // The order of the descriptors fixes the order of the returned handles.
            List<ColumnFamilyDescriptor> descriptors = new ArrayList<ColumnFamilyDescriptor>();
            descriptors.add(new ColumnFamilyDescriptor(this.cfNameEphemeral.getBytes()));
            descriptors.add(new ColumnFamilyDescriptor(this.cfNameMetadata.getBytes()));
            descriptors.add(new ColumnFamilyDescriptor(this.cfNameQueue.getBytes()));
            descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));

            // Start from an empty list so the indexes below always refer to this open() call.
            this.cfHandleList.clear();
            this.rocksDb = RocksDB.open(this.dbOptions, storage.getAbsolutePath(), descriptors, this.cfHandleList);
            this.cfEphemeral = this.cfHandleList.get(0);
            this.cfMetadata = this.cfHandleList.get(1);
            this.cfQueue = this.cfHandleList.get(2);

            this.itQueue = this.rocksDb.newIterator(this.cfQueue, this.readOptions);
            this.lastFetchedId = loadLastFetchedId();
            return this;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (RocksDBException e) {
            destroy();
            throw new RuntimeException(e);
        }
    }

    /**
     * Disposes a RocksDB native object, logging (not propagating) any failure.
     *
     * @param rocksObject object to dispose; {@code null} is ignored
     */
    private void disposeRocksObject(RocksObject rocksObject) {
        if (rocksObject == null) {
            return;
        }
        try {
            rocksObject.dispose();
        } catch (Exception e) {
            LOGGER.warn(e.getMessage(), e);
        }
    }

    /**
     * Persists the last fetched id and releases every RocksDB object owned by this queue.
     *
     * <p>All references are cleared afterwards, so calling this method again (for example
     * {@link #close()} after a failed {@link #init()}) is a no-op instead of touching objects
     * that were already disposed.
     */
    public void destroy() {
        // Only write when the database was actually opened and has not been released yet.
        if (this.rocksDb != null && this.cfMetadata != null) {
            try {
                saveLastFetchedId(this.lastFetchedId);
            } catch (RocksDBException e) {
                LOGGER.error(e.getMessage(), e);
            }
        }

        // Dispose order: write batches, iterator, column family handles, database, options.
        disposeRocksObject(this.batchPutToQueue);
        disposeRocksObject(this.batchTake);
        disposeRocksObject(this.itQueue);
        // The list holds the three named handles plus the default column family's handle,
        // so every handle handed out by open() is released.
        for (ColumnFamilyHandle handle : this.cfHandleList) {
            disposeRocksObject(handle);
        }
        this.cfHandleList.clear();
        disposeRocksObject(this.rocksDb);
        disposeRocksObject(this.dbOptions);
        disposeRocksObject(this.readOptions);
        disposeRocksObject(this.writeOptions);

        this.batchPutToQueue = null;
        this.batchTake = null;
        this.itQueue = null;
        this.cfEphemeral = null;
        this.cfMetadata = null;
        this.cfQueue = null;
        this.rocksDb = null;
        this.dbOptions = null;
        this.readOptions = null;
        this.writeOptions = null;
    }

    /** Same as {@link #destroy()}. */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        destroy();
    }

    /**
     * Reads the last fetched id from the metadata column family.
     *
     * @return the stored value as returned by the database lookup
     * @throws RocksDBException if the read fails
     */
    private byte[] loadLastFetchedId() throws RocksDBException {
        return this.rocksDb.get(this.cfMetadata, this.readOptions, KEY_LAST_FETCHED_ID);
    }

    /**
     * Stores the last fetched id in the metadata column family.
     *
     * @param id value to store; nothing is written when {@code null}
     * @throws RocksDBException if the write fails
     */
    private void saveLastFetchedId(byte[] id) throws RocksDBException {
        if (id != null) {
            this.rocksDb.put(this.cfMetadata, this.writeOptions, KEY_LAST_FETCHED_ID, id);
        }
    }

    /**
     * Converts a message to the bytes stored in the database.
     *
     * @param iQueueMessage message to serialize
     * @return serialized form
     */
    protected abstract byte[] serialize(IQueueMessage iQueueMessage);

    /**
     * Rebuilds a message from the bytes stored in the database.
     *
     * @param bArr serialized form
     * @return the message; {@link #take()} tolerates a {@code null} result
     */
    protected abstract IQueueMessage deserialize(byte[] bArr);

    /**
     * Appends a message to the queue column family under a newly generated key and, optionally,
     * removes it from the ephemeral column family in the same atomic write batch.
     *
     * @param iQueueMessage message to store
     * @param z {@code true} to also remove the message (by its queue id) from the ephemeral
     *        storage; ignored when the ephemeral storage is disabled
     * @return always {@code true}; failures are reported by exception
     * @throws RuntimeException wrapping the {@link RocksDBException} if the write fails
     */
    protected boolean putToQueue(IQueueMessage iQueueMessage, boolean z) {
        byte[] payload = serialize(iQueueMessage);
        this.lockPut.lock();
        try {
            byte[] key = this.idGen.generateId128Hex().toLowerCase().getBytes();
            this.batchPutToQueue.put(this.cfQueue, key, payload);
            if (z && !this.ephemeralDisabled) {
                this.batchPutToQueue.remove(this.cfEphemeral, iQueueMessage.qId().toString().getBytes());
            }
            this.rocksDb.write(this.writeOptions, this.batchPutToQueue);
            return true;
        } catch (RocksDBException e) {
            throw new RuntimeException(e);
        } finally {
            // The batch is reused across calls, so it is emptied on success and failure alike;
            // the nested finally guarantees the lock is released even if clear() fails.
            try {
                this.batchPutToQueue.clear();
            } finally {
                this.lockPut.unlock();
            }
        }
    }

    /**
     * Queues a copy of the message with its requeue counter reset and both timestamps set to now.
     *
     * @param iQueueMessage message to queue; it is cloned, not modified
     * @return result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage iQueueMessage) {
        Date now = new Date();
        IQueueMessage copy = iQueueMessage.mo2clone();
        copy.qNumRequeues(0).qOriginalTimestamp(now).qTimestamp(now);
        return putToQueue(copy, false);
    }

    /**
     * Re-queues a copy of the message with its requeue counter incremented and its timestamp set
     * to now, removing it from the ephemeral storage.
     *
     * @param iQueueMessage message to re-queue; it is cloned, not modified
     * @return result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage iQueueMessage) {
        IQueueMessage copy = iQueueMessage.mo2clone();
        copy.qIncNumRequeues().qTimestamp(new Date());
        return putToQueue(copy, true);
    }

    /**
     * Re-queues the message as-is (no counter or timestamp update), removing it from the
     * ephemeral storage.
     *
     * @param iQueueMessage message to re-queue
     * @return result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage, true);
    }

    /**
     * Marks a taken message as done by removing it from the ephemeral storage. Does nothing when
     * the ephemeral storage is disabled.
     *
     * @param iQueueMessage message that has been processed
     * @throws RuntimeException wrapping the {@link RocksDBException} if the removal fails
     */
    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage iQueueMessage) {
        if (this.ephemeralDisabled) {
            return;
        }
        try {
            this.rocksDb.remove(this.cfEphemeral, this.writeOptions, iQueueMessage.qId().toString().getBytes());
        } catch (RocksDBException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Takes the next message from the queue.
     *
     * <p>In one write batch the message is removed from the queue column family, its key is
     * recorded as the last fetched id and, unless the ephemeral storage is disabled, the message
     * is copied to the ephemeral column family. A failure of that write is logged and the
     * message is still returned.
     *
     * @return the next message, or {@code null} if the iterator finds no entry
     */
    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage take() {
        this.lockTake.lock();
        try {
            // Position the iterator at the first entry at or after the last fetched key.
            if (this.lastFetchedId == null) {
                this.itQueue.seekToFirst();
            } else {
                this.itQueue.seek(this.lastFetchedId);
            }
            if (!this.itQueue.isValid()) {
                return null;
            }

            this.lastFetchedId = this.itQueue.key();
            byte[] value = this.itQueue.value();
            IQueueMessage message = deserialize(value);
            try {
                this.batchTake.remove(this.cfQueue, this.lastFetchedId);
                this.batchTake.put(this.cfMetadata, KEY_LAST_FETCHED_ID, this.lastFetchedId);
                if (!this.ephemeralDisabled && message != null) {
                    this.batchTake.put(this.cfEphemeral, message.qId().toString().getBytes(), value);
                }
                this.rocksDb.write(this.writeOptions, this.batchTake);
            } catch (RocksDBException e) {
                LOGGER.error(e.getMessage(), e);
            } finally {
                // The batch is reused across calls, so it is always emptied.
                this.batchTake.clear();
            }
            this.itQueue.next();
            return message;
        } finally {
            // Single release point: unlocking here AND before the return would unlock a lock
            // that is no longer held and throw IllegalMonitorStateException.
            this.lockTake.unlock();
        }
    }

    /**
     * Not implemented by this class.
     *
     * @param j threshold argument; unused
     * @return always {@code null}
     */
    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        return null;
    }

    /**
     * Moves a message from the ephemeral storage back to the queue storage.
     *
     * @param iQueueMessage message to move
     * @return result of {@link #putToQueue(IQueueMessage, boolean)}
     */
    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        return putToQueue(iQueueMessage, true);
    }

    /**
     * Counts the entries of the queue column family by iterating over all of them.
     *
     * @return number of queued messages
     */
    @Override // com.github.ddth.queue.IQueue
    public synchronized int queueSize() {
        return countEntries(this.cfQueue, this.readOptions);
    }

    /**
     * Counts the entries of the ephemeral column family by iterating over all of them.
     *
     * @return number of taken-but-unfinished messages
     */
    @Override // com.github.ddth.queue.IQueue
    public synchronized int ephemeralSize() {
        return countEntries(getCfEphemeral(), getReadOptions());
    }

    /**
     * Counts the entries of one column family with a short-lived iterator.
     *
     * @param columnFamily column family to scan
     * @param options read options for the iterator
     * @return number of entries visited
     */
    private int countEntries(ColumnFamilyHandle columnFamily, ReadOptions options) {
        RocksIterator it = getRocksDb().newIterator(columnFamily, options);
        try {
            int count = 0;
            for (it.seekToFirst(); it.isValid(); it.next()) {
                count++;
            }
            return count;
        } finally {
            it.dispose();
        }
    }
}
