package com.github.xiaolyuh.listener;

import com.alibaba.fastjson.JSON;
import com.github.xiaolyuh.cache.Cache;
import com.github.xiaolyuh.cache.LayeringCache;
import com.github.xiaolyuh.manager.AbstractCacheManager;
import com.github.xiaolyuh.redis.clinet.RedisClient;
import com.github.xiaolyuh.support.LayeringCacheRedisLock;
import com.github.xiaolyuh.util.BeanFactory;
import com.github.xiaolyuh.util.GlobalConfig;
import com.github.xiaolyuh.util.StringUtils;
import io.lettuce.core.ScriptOutputType;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/github/xiaolyuh/listener/RedisMessageService.class */
public class RedisMessageService {
    private static final Logger logger = LoggerFactory.getLogger(RedisMessageService.class);
    private static final AtomicLong OFFSET = new AtomicLong(-1);
    private static volatile long LAST_PUSH_TIME = 0;
    private static volatile long LAST_PULL_TIME = 0;
    public static final long RECONNECTION_TIME = 10000;
    private AbstractCacheManager cacheManager;
    public static final String LUA_SCRIPT = "local messageKey = KEYS[1] local oldOffset = tonumber(ARGV[1]) local maxOffset = redis.call('llen', messageKey) - 1 if maxOffset < 0 then     return { maxOffset, {} } end if oldOffset >= maxOffset then     return { maxOffset, {} } end local messages = redis.call('lrange', messageKey, 0, maxOffset - oldOffset - 1) return { maxOffset, messages }";

    public RedisMessageService init(AbstractCacheManager abstractCacheManager) {
        this.cacheManager = abstractCacheManager;
        return this;
    }

    public void pullMessage() {
        RedisClient redisClient = this.cacheManager.getRedisClient();
        String messageRedisKey = GlobalConfig.getMessageRedisKey();
        synchronized (this) {
            List list = (List) redisClient.eval(LUA_SCRIPT, ScriptOutputType.MULTI, Collections.singletonList(messageRedisKey), Collections.singletonList(String.valueOf(OFFSET.get())));
            if (CollectionUtils.isEmpty(list) || list.size() < 2) {
                throw new RuntimeException("拉取清除一级缓存的消息失败，Lua表达式执行错误");
            }
            long longValue = ((Long) list.get(0)).longValue();
            List list2 = (List) list.get(1);
            if (longValue < 0 || CollectionUtils.isEmpty(list2)) {
                return;
            }
            OFFSET.set(longValue > 0 ? longValue : 0L);
            updateLastPullTime();
            Iterator it = list2.iterator();
            while (it.hasNext()) {
                String str = (String) GlobalConfig.GLOBAL_REDIS_SERIALIZER.deserialize((byte[]) it.next(), String.class);
                if (logger.isDebugEnabled()) {
                    logger.debug("redis 通过PULL方式处理本地缓，消息内容：{}", str);
                }
                if (!StringUtils.isBlank(str)) {
                    RedisPubSubMessage redisPubSubMessage = (RedisPubSubMessage) JSON.parseObject(str, RedisPubSubMessage.class);
                    for (Cache cache : this.cacheManager.getCache(redisPubSubMessage.getCacheName())) {
                        if (cache instanceof LayeringCache) {
                            removeSecondCache(redisClient, redisPubSubMessage, (LayeringCache) cache);
                            removeFirstCache(redisPubSubMessage, (LayeringCache) cache);
                        }
                    }
                }
            }
        }
    }

    private void removeFirstCache(RedisPubSubMessage redisPubSubMessage, LayeringCache layeringCache) {
        switch (redisPubSubMessage.getMessageType()) {
            case EVICT:
                layeringCache.getFirstCache().evict(redisPubSubMessage.getKey());
                logger.info("删除一级缓存 {} 数据,key={}", redisPubSubMessage.getCacheName(), redisPubSubMessage.getKey());
                return;
            case CLEAR:
                layeringCache.getFirstCache().clear();
                logger.info("清除一级缓存 {} 数据", redisPubSubMessage.getCacheName());
                return;
            default:
                logger.error("接收到没有定义的消息数据");
                return;
        }
    }

    private void removeSecondCache(RedisClient redisClient, RedisPubSubMessage redisPubSubMessage, LayeringCache layeringCache) {
        if (RedisPubSubMessage.SOURCE.equals(redisPubSubMessage.getSource())) {
            LayeringCacheRedisLock layeringCacheRedisLock = new LayeringCacheRedisLock(redisClient, (RedisPubSubMessageType.EVICT.equals(redisPubSubMessage.getMessageType()) ? redisPubSubMessage.getKey() : redisPubSubMessage.getCacheName()) + "_remove_lock");
            try {
                try {
                    if (layeringCacheRedisLock.lock()) {
                        switch (redisPubSubMessage.getMessageType()) {
                            case EVICT:
                                layeringCache.getSecondCache().evict(redisPubSubMessage.getKey());
                                logger.info("删除二级缓存 {} 数据,key={}", redisPubSubMessage.getCacheName(), redisPubSubMessage.getKey());
                                break;
                            case CLEAR:
                                layeringCache.getSecondCache().clear();
                                logger.info("清除二级级缓存 {} 数据", redisPubSubMessage.getCacheName());
                                break;
                            default:
                                logger.error("接收到没有定义的消息数据");
                                break;
                        }
                        Thread.sleep(5000L);
                    }
                    layeringCacheRedisLock.unlock();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    layeringCacheRedisLock.unlock();
                }
            } catch (Throwable th) {
                layeringCacheRedisLock.unlock();
                throw th;
            }
        }
    }

    public void clearMessageQueue() {
        if (new LayeringCacheRedisLock(this.cacheManager.getRedisClient(), GlobalConfig.getMessageRedisKey(), 60).lock()) {
            this.cacheManager.getRedisClient().delete(GlobalConfig.getMessageRedisKey());
        }
        OFFSET.getAndSet(-1L);
    }

    public void syncOffset() {
        long longValue = this.cacheManager.getRedisClient().llen(GlobalConfig.getMessageRedisKey()).longValue() - 1;
        if (longValue < 0) {
            return;
        }
        OFFSET.getAndSet(longValue > 0 ? longValue : 0L);
    }

    public void reconnection() {
        if (LAST_PULL_TIME - LAST_PUSH_TIME >= RECONNECTION_TIME) {
            try {
                updateLastPushTime();
                ((RedisMessageListener) BeanFactory.getBean(RedisMessageListener.class)).init(this.cacheManager);
            } catch (Exception e) {
                logger.error("layering-cache 清楚一级缓存异常：{}", e.getMessage(), e);
            }
        }
    }

    public static void updateLastPullTime() {
        LAST_PULL_TIME = System.currentTimeMillis();
    }

    public static void updateLastPushTime() {
        LAST_PUSH_TIME = System.currentTimeMillis();
    }
}
