package com.mnt.sio.core.context;

import com.mnt.base.util.CommonUtil;
import com.mnt.sio.core.dtd.StreamData;
import com.mnt.sio.util.DateUtil;
import com.mnt.sio.util.MapUtil;
import com.mnt.sio.util.UtilData;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;

/* loaded from: input_file:com/mnt/sio/core/context/AbstractStreamContext.class */
public abstract class AbstractStreamContext implements StreamContext {
    /** Key under which {@link #markToUpdate(Map)} stores the last-modification timestamp of an entry. */
    protected static final String UPDATE_TIMESTAMP_KEY = "updateTs";

    /** Initial capacity requested for the two top-level cache maps below. */
    private static final int CACHE_MAP_INITIAL_SIZE = 100000;

    /** Number of updated entries processed between intermediate bulk writes in {@link #persistent()}. */
    @Value("${ticker.persist.num:500}")
    private Integer persistNum;

    /** Comma-separated list of field names; parsed into {@link #contextAllowedFieldSet} by {@link #init()}. */
    @Value("${md.context.latest.allowed.fields:}")
    private String contextAllowedFields;

    /** Remote cache backend. Injection is optional, so this may be {@code null}. */
    @Autowired(required = false)
    private RemoteCacheSupport remoteCacheSupport;

    /** Logger named after the concrete subclass. */
    protected Logger logger = LogManager.getLogger(getClass());

    /** Start time (epoch millis) of the most recent {@link #persistent()} run; {@code 0} before the first run. */
    private Long lastFlushTs = 0L;

    /** In-memory field map per cache key, as handed out by {@link #latest(String, long)}. */
    protected final Map<String, Map<String, Object>> latestCache = new ConcurrentHashMap<>(CACHE_MAP_INITIAL_SIZE);

    /** Per cache key, the string field values last loaded from or queued for the remote cache. */
    protected final Map<String, Map<String, String>> savedCacheMap = new ConcurrentHashMap<>(CACHE_MAP_INITIAL_SIZE);

    /** Field names accepted by the maps built in {@code createLatestMap()}; empty means no filtering. */
    private Set<String> contextAllowedFieldSet = new HashSet<>();

    /**
     * Populates the allowed-field set from the configured comma-separated list.
     *
     * <p>Each token is trimmed before being added. When the configured value is
     * empty the set is left untouched.
     */
    @PostConstruct
    public void init() {
        if (!CommonUtil.isEmpty(this.contextAllowedFields)) {
            Arrays.stream(this.contextAllowedFields.split(","))
                    .map(String::trim)
                    .forEach(this.contextAllowedFieldSet::add);
        }
    }

    /**
     * Creates the per-key field map stored in {@code latestCache}.
     *
     * <p>When no allowed fields are configured a plain {@link ConcurrentHashMap} is
     * returned. Otherwise the returned map only accepts keys contained in
     * {@code contextAllowedFieldSet}:
     * <ul>
     *   <li>{@code put} and {@code putIfAbsent} ignore a disallowed key and return {@code null};</li>
     *   <li>{@code putAll} copies only the allowed entries;</li>
     *   <li>{@code computeIfAbsent}, {@code computeIfPresent} and {@code compute} throw a
     *       {@link RuntimeException} for a disallowed key.</li>
     * </ul>
     * Other mutators are not overridden here.
     *
     * @return a new, empty, thread-safe map
     */
    private Map<String, Object> createLatestMap() {
        if (this.contextAllowedFieldSet.isEmpty()) {
            return new ConcurrentHashMap<>();
        }
        return new ConcurrentHashMap<String, Object>() {
            private static final long serialVersionUID = 1L;

            /** Tells whether the given key is on the configured whitelist. */
            private boolean isAllowed(String key) {
                return AbstractStreamContext.this.contextAllowedFieldSet.contains(key);
            }

            /** Builds the exception raised by the compute-style methods for a disallowed key. */
            private RuntimeException notAllowed(String key) {
                return new RuntimeException("not allow the key to put into the map: " + key);
            }

            @Override
            public Object put(String key, Object value) {
                return isAllowed(key) ? super.put(key, value) : null;
            }

            @Override
            public void putAll(Map<? extends String, ? extends Object> source) {
                Map<String, Object> accepted = new HashMap<>();
                source.forEach((key, value) -> {
                    if (isAllowed(key)) {
                        accepted.put(key, value);
                    }
                });
                super.putAll(accepted);
            }

            @Override
            public Object putIfAbsent(String key, Object value) {
                return isAllowed(key) ? super.putIfAbsent(key, value) : null;
            }

            @Override
            public Object computeIfAbsent(String key, Function<? super String, ? extends Object> mappingFunction) {
                if (!isAllowed(key)) {
                    throw notAllowed(key);
                }
                return super.computeIfAbsent(key, mappingFunction);
            }

            @Override
            public Object computeIfPresent(String key,
                    BiFunction<? super String, ? super Object, ? extends Object> remappingFunction) {
                if (!isAllowed(key)) {
                    throw notAllowed(key);
                }
                return super.computeIfPresent(key, remappingFunction);
            }

            @Override
            public Object compute(String key,
                    BiFunction<? super String, ? super Object, ? extends Object> remappingFunction) {
                if (!isAllowed(key)) {
                    throw notAllowed(key);
                }
                return super.compute(key, remappingFunction);
            }
        };
    }

    /**
     * Returns the in-memory field map for {@code str}, wrapped in a {@link StreamData}.
     *
     * <p>If no map exists yet, one is created, filled from the remote cache and
     * registered in {@code latestCache}. If a map exists and {@code j} is not
     * {@code -1}, the stored TID is compared with {@code j}; the map is reloaded from
     * the remote cache when {@code j} is {@code 0}, is smaller than the stored TID, or
     * is more than one ahead of it. Whenever {@code j} is not {@code -1} it is then
     * written into the map as the new TID.
     *
     * @param str cache key
     * @param j   incoming TID, or {@code -1} to skip the TID check and update
     * @return a {@link StreamData} backed by the cached map
     */
    @Override
    public StreamData latest(String str, long j) {
        final boolean tidProvided = j != -1;
        Map<String, Object> fields = this.latestCache.get(str);

        if (fields == null) {
            // First access for this key: build the map and seed it from the remote cache.
            fields = createLatestMap();
            loadCacheFromRemote(str, fields);
            this.latestCache.put(str, fields);
        } else if (tidProvided) {
            final long cachedTid = CommonUtil.parseAsLong(fields.get(UtilData.RTDataKey.TID), -1L);
            final boolean contiguous = j - cachedTid <= 1 && j != 0 && j >= cachedTid;
            if (!contiguous) {
                this.logger.info("ticker {} tid: {} 不连续或初始加载，内存: {}, 重新加载缓存。", str, Long.valueOf(j), Long.valueOf(cachedTid));
                loadCacheFromRemote(str, fields);
            }
        }

        if (tidProvided) {
            fields.put(UtilData.RTDataKey.TID, Long.valueOf(j));
        }
        return new StreamData(fields);
    }

    /**
     * Applies the entries of {@code map2} onto {@code map} and marks {@code map} as updated.
     *
     * <p>An entry of {@code map2} with a {@code null} value removes that key from
     * {@code map}; any other entry is put into {@code map}. Any exception is logged
     * and not propagated.
     *
     * @param str  cache key, used only in the error log
     * @param map  target field map
     * @param map2 changes to apply
     */
    @Override
    public void merge(String str, Map<String, Object> map, Map<String, Object> map2) {
        try {
            for (Map.Entry<String, Object> change : map2.entrySet()) {
                final Object value = change.getValue();
                if (value == null) {
                    map.remove(change.getKey());
                } else {
                    map.put(change.getKey(), value);
                }
            }
            markToUpdate(map);
        } catch (Exception e) {
            // The exception is passed as a trailing argument with no placeholder of its own,
            // so the logger records it as a throwable with its stack trace.
            this.logger.error("error when save single cache for:{}", str, e);
        }
    }

    /**
     * Loads the remote cache content for {@code str} into {@code map}.
     *
     * <p>When {@link #getCache(String)} returns a non-null result, its entries are
     * copied into {@code map} and into the saved snapshot for {@code str} in
     * {@code savedCacheMap}, creating that snapshot if necessary. Any exception is
     * logged and not propagated.
     *
     * @param str cache key
     * @param map in-memory field map to fill
     */
    public void loadCacheFromRemote(String str, Map<String, Object> map) {
        try {
            final Map<String, String> remote = getCache(str);
            if (remote != null) {
                map.putAll(remote);
                this.savedCacheMap.computeIfAbsent(str, key -> new HashMap<>()).putAll(remote);
            }
            this.logger.debug("TickerCache[{}]未初始化，初始化后：[{}]", str, map);
        } catch (Exception e) {
            // The exception is passed as a trailing argument with no placeholder of its own,
            // so the logger records it as a throwable with its stack trace.
            this.logger.error("error when get cache from remote:{}", str, e);
        }
    }

    /**
     * Pre-destroy hook: logs the shutdown and runs one final {@link #persistent()} pass.
     */
    @PreDestroy
    public void destory() {
        logger.info("====>>>process exit, cache write to redis<<<====");
        this.persistent();
    }

    /**
     * Scheduled entry point that delegates to {@link #persistent()}.
     *
     * <p>The fixed delay is read from {@code rtcache.flush.interval} and defaults to
     * {@code 100}.
     */
    @Scheduled(fixedDelayString = "${rtcache.flush.interval:100}")
    private void backUpFlush() {
        this.persistent();
    }

    /**
     * Writes the fields that changed since the previous run to the remote cache.
     *
     * <p>For every key in {@code latestCache} whose update timestamp is greater than
     * the start time of the previous run, the in-memory map is diffed against the
     * saved snapshot in {@code savedCacheMap}. The snapshot is brought in line with
     * the diff, and the diff is queued for {@link #putToCache(Map)}. Removed fields
     * (a {@code null} value in the diff) are sent as an empty string. Queued diffs
     * are written every {@code persistNum} processed entries and once more at the
     * end. A failure while processing one key is logged and does not stop the others.
     *
     * <p>NOTE(review): the snapshot is updated before the remote write happens, so a
     * failing {@code putToCache} may leave changes that are not re-sent later — confirm.
     * <p>NOTE(review): the timestamp check is strict ({@code >}); an update stamped
     * in the same millisecond as a run's start may be skipped by the next run — confirm.
     */
    protected synchronized void persistent() {
        final long startedAt = System.currentTimeMillis();
        final long previousFlush = this.lastFlushTs.longValue();
        final Map<String, Map<String, String>> pending = new HashMap<>();
        int updatedSeen = 0;
        long processed = 0L;

        // Iterate over a copy of the key set, taken once at the start of the run.
        for (String key : new ArrayList<String>(this.latestCache.keySet())) {
            try {
                final Map<String, Object> latest = this.latestCache.get(key);
                if (latest == null) {
                    // Entry is no longer present; nothing to persist for it.
                    continue;
                }
                if (CommonUtil.parseAsLong(latest.get(UPDATE_TIMESTAMP_KEY)) > previousFlush) {
                    final Map<String, String> saved =
                            this.savedCacheMap.computeIfAbsent(key, k -> new HashMap<>());
                    // uncheckedCast keeps this assignment independent of the helper's declared generics.
                    final Map<String, String> changed =
                            CommonUtil.uncheckedCast(MapUtil.getChanged(saved, MapUtil.convertStringMap(latest)));
                    if (!CommonUtil.isEmpty(changed)) {
                        final Set<String> removedKeys = new HashSet<>();
                        for (Map.Entry<String, String> change : changed.entrySet()) {
                            if (change.getValue() == null) {
                                saved.remove(change.getKey());
                                removedKeys.add(change.getKey());
                            } else {
                                saved.put(change.getKey(), change.getValue());
                            }
                        }
                        // Removed fields are sent to the remote cache as empty strings.
                        for (String removedKey : removedKeys) {
                            changed.replace(removedKey, null, "");
                        }
                        pending.put(key, changed);
                    }
                    updatedSeen++;
                    if (updatedSeen % this.persistNum.intValue() == 0 && !pending.isEmpty()) {
                        putToCache(pending);
                        pending.clear();
                    }
                    processed++;
                }
            } catch (Exception e) {
                // The exception is passed as a trailing argument with no placeholder of its own,
                // so the logger records it as a throwable with its stack trace.
                this.logger.error("error when persist data to redis, key:{}", key, e);
            }
        }

        if (!pending.isEmpty()) {
            putToCache(pending);
            pending.clear();
        }

        final long finishedAt = System.currentTimeMillis();
        this.lastFlushTs = Long.valueOf(startedAt);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("====<<< write cache to redis finished at :{} for size:{} >>>====", DateUtil.prettyTime(Long.valueOf(finishedAt - startedAt)), Long.valueOf(processed));
        }
    }

    /**
     * Stamps {@code map} with the current time.
     *
     * <p>The current epoch-millisecond value is stored as a string under
     * {@link #UPDATE_TIMESTAMP_KEY}.
     *
     * @param map field map to mark as modified
     */
    public static void markToUpdate(Map<String, ? extends Object> map) {
        final String nowMillis = String.valueOf(System.currentTimeMillis());
        map.put(UPDATE_TIMESTAMP_KEY, CommonUtil.uncheckedCast(nowMillis));
    }

    /**
     * Reads the remote hash stored for {@code str}.
     *
     * @param str cache key; may be {@code null}
     * @return the remote hash for the key, or {@code null} when {@code str} is
     *         {@code null} or no remote cache support has been injected
     */
    public Map<String, String> getCache(String str) {
        // remoteCacheSupport is injected with required = false, so it can be absent.
        if (str == null || this.remoteCacheSupport == null) {
            return null;
        }
        return this.remoteCacheSupport.hash(cacheKey(str));
    }

    /**
     * Writes the given field maps to the remote cache in one bulk operation.
     *
     * <p>Each entry is written as a hash under {@code cacheKey(key)}. Nothing is done
     * when {@code map} is {@code null} or empty, or when no remote cache support has
     * been injected.
     *
     * @param map field values to write, keyed by cache key
     */
    public void putToCache(Map<String, Map<String, String>> map) {
        // remoteCacheSupport is injected with required = false, so it can be absent.
        if (this.remoteCacheSupport == null || map == null || map.isEmpty()) {
            return;
        }
        this.remoteCacheSupport.bulkOp(setter ->
                map.forEach((key, fields) -> setter.hash(cacheKey(key), fields)));
    }
}
