package com.github.ddth.queue.impl;

import com.github.ddth.queue.IQueueMessage;
import com.github.ddth.queue.utils.QueueException;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

/* loaded from: input_file:com/github/ddth/queue/impl/InmemQueue.class */
public class InmemQueue extends AbstractEphemeralSupportQueue {
    private Queue<IQueueMessage> queue;
    private ConcurrentMap<Object, IQueueMessage> ephemeralStorage;
    private int boundary = -1;

    public InmemQueue() {
    }

    public InmemQueue(int i) {
        setBoundary(i);
    }

    public int getBoundary() {
        return this.boundary;
    }

    public InmemQueue setBoundary(int i) {
        this.boundary = i;
        return this;
    }

    protected Queue<IQueueMessage> createQueue(int i) {
        return i > 0 ? i > 1024 ? new LinkedBlockingQueue(i) : new ArrayBlockingQueue(i) : new ConcurrentLinkedQueue();
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public InmemQueue init() {
        this.queue = createQueue(this.boundary);
        if (!isEphemeralDisabled()) {
            this.ephemeralStorage = new ConcurrentHashMap();
        }
        return this;
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue
    public void destroy() {
    }

    @Override // com.github.ddth.queue.impl.AbstractQueue, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        destroy();
    }

    protected void putToQueue(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        if (!this.queue.offer(iQueueMessage)) {
            throw new QueueException.QueueIsFull(getBoundary());
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean queue(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        Date date = new Date();
        mo5clone.qNumRequeues(0).qOriginalTimestamp(date).qTimestamp(date);
        putToQueue(mo5clone);
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeue(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        mo5clone.qIncNumRequeues().qTimestamp(new Date());
        putToQueue(mo5clone);
        if (isEphemeralDisabled()) {
            return true;
        }
        this.ephemeralStorage.remove(mo5clone.qId());
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean requeueSilent(IQueueMessage iQueueMessage) throws QueueException.QueueIsFull {
        IQueueMessage mo5clone = iQueueMessage.mo5clone();
        putToQueue(mo5clone);
        if (isEphemeralDisabled()) {
            return true;
        }
        this.ephemeralStorage.remove(mo5clone.qId());
        return true;
    }

    @Override // com.github.ddth.queue.IQueue
    public void finish(IQueueMessage iQueueMessage) {
        if (isEphemeralDisabled()) {
            return;
        }
        this.ephemeralStorage.remove(iQueueMessage.qId());
    }

    protected IQueueMessage takeFromQueue() {
        return this.queue.poll();
    }

    @Override // com.github.ddth.queue.IQueue
    public IQueueMessage take() throws QueueException.EphemeralIsFull {
        int ephemeralMaxSize;
        if (!isEphemeralDisabled() && (ephemeralMaxSize = getEphemeralMaxSize()) > 0 && this.ephemeralStorage.size() >= ephemeralMaxSize) {
            throw new QueueException.EphemeralIsFull(ephemeralMaxSize);
        }
        IQueueMessage takeFromQueue = takeFromQueue();
        if (takeFromQueue != null && !isEphemeralDisabled()) {
            this.ephemeralStorage.putIfAbsent(takeFromQueue.qId(), takeFromQueue);
        }
        return takeFromQueue;
    }

    @Override // com.github.ddth.queue.IQueue
    public Collection<IQueueMessage> getOrphanMessages(long j) {
        if (isEphemeralDisabled()) {
            return null;
        }
        HashSet hashSet = new HashSet();
        long currentTimeMillis = System.currentTimeMillis();
        Iterator<Map.Entry<Object, IQueueMessage>> it = this.ephemeralStorage.entrySet().iterator();
        while (it.hasNext()) {
            IQueueMessage value = it.next().getValue();
            if (value.qTimestamp().getTime() + j < currentTimeMillis) {
                hashSet.add(value);
            }
        }
        return hashSet;
    }

    @Override // com.github.ddth.queue.IQueue
    public boolean moveFromEphemeralToQueueStorage(IQueueMessage iQueueMessage) {
        IQueueMessage remove;
        if (isEphemeralDisabled() || (remove = this.ephemeralStorage.remove(iQueueMessage.qId())) == null) {
            return true;
        }
        try {
            putToQueue(remove);
            return true;
        } catch (QueueException.QueueIsFull e) {
            this.ephemeralStorage.putIfAbsent(remove.qId(), remove);
            return false;
        }
    }

    @Override // com.github.ddth.queue.IQueue
    public int queueSize() {
        return this.queue.size();
    }

    @Override // com.github.ddth.queue.IQueue
    public int ephemeralSize() {
        if (isEphemeralDisabled()) {
            return 0;
        }
        return this.ephemeralStorage.size();
    }
}
