package com.github.phantomthief.zookeeper.broadcast;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/phantomthief/zookeeper/broadcast/ZkBroadcaster.class */
/**
 * Small publish/subscribe helper that uses the data of a ZooKeeper node as the signal.
 *
 * <p>Every topic {@code path} maps to the znode {@code makePath(zkPrefix, path)}.
 * {@link #broadcast(String, String)} writes that znode's data (creating it when missing), and
 * {@link #subscribe(String, Runnable, Executor)} watches it through a Curator {@link NodeCache}
 * and runs the registered callbacks whenever the cache notifies its listener.
 *
 * <p>The {@link NodeCache} instances created by this class are kept for the lifetime of the
 * instance; this class offers no unsubscribe or close operation.
 */
public class ZkBroadcaster {
    private static final String DEFAULT_ZK_PREFIX = "/broadcast";

    private final Logger logger = LoggerFactory.getLogger(getClass());
    /** Topic path -> callbacks registered for it. The sets are concurrent (see subscribe). */
    private final ConcurrentMap<String, Set<Runnable>> subscribeMap = new ConcurrentHashMap<>();
    /** Topic path -> the single started NodeCache watching that topic's znode. */
    private final ConcurrentMap<String, NodeCache> nodeCacheMap = new ConcurrentHashMap<>();
    private final Supplier<CuratorFramework> curatorFactory;
    private final String zkPrefix;

    /**
     * Creates a broadcaster whose topics live under {@code zkPrefix}.
     *
     * @param supplier provides the Curator client; it is invoked each time a client is needed
     * @param zkPrefix parent znode path under which topic nodes are placed
     */
    public ZkBroadcaster(Supplier<CuratorFramework> supplier, String zkPrefix) {
        this.curatorFactory = supplier;
        this.zkPrefix = zkPrefix;
    }

    /**
     * Creates a broadcaster using the default prefix {@code /broadcast}.
     *
     * @param supplier provides the Curator client
     */
    public ZkBroadcaster(Supplier<CuratorFramework> supplier) {
        this(supplier, DEFAULT_ZK_PREFIX);
    }

    /**
     * Subscribes to a topic, registering the cache listener with Guava's direct executor.
     *
     * @param path topic path, relative to the configured prefix
     * @param subscriber callback to run when the topic node changes
     */
    public void subscribe(String path, Runnable subscriber) {
        subscribe(path, subscriber, MoreExecutors.directExecutor());
    }

    /**
     * Subscribes to a topic.
     *
     * <p>Only one {@link NodeCache} (and one listener) is created per topic path, by the first
     * subscription for that path. The {@code executor} of that first subscription is the one the
     * listener is registered with; executors passed by later subscriptions to the same path are
     * not used. When the listener fires, the callbacks are run via a parallel stream over the
     * subscriber set, and a failure in one callback is logged without affecting the others.
     *
     * @param path topic path, relative to the configured prefix
     * @param subscriber callback to run when the topic node changes
     * @param executor executor handed to the node cache listener registration
     * @throws NullPointerException if any argument is {@code null}
     * @throws RuntimeException if starting or initially loading the node cache fails
     */
    public void subscribe(@Nonnull String path, @Nonnull Runnable subscriber, @Nonnull Executor executor) {
        Preconditions.checkNotNull(path);
        Preconditions.checkNotNull(subscriber);
        Preconditions.checkNotNull(executor);
        // The set is read by the cache listener (possibly from several threads at once) while
        // other threads keep subscribing, so it has to be a concurrent set rather than a HashSet.
        Set<Runnable> subscribers =
                this.subscribeMap.computeIfAbsent(path, key -> ConcurrentHashMap.<Runnable>newKeySet());
        subscribers.add(subscriber);
        this.nodeCacheMap.computeIfAbsent(path,
                key -> startNodeCache(ZKPaths.makePath(this.zkPrefix, key), subscribers, executor));
    }

    /**
     * Publishes {@code content} on a topic by writing it, UTF-8 encoded, as the topic node's data.
     * The node (and its parents) is created when it does not exist yet.
     *
     * @param path topic path, relative to the configured prefix
     * @param content payload to store in the topic node
     * @throws NullPointerException if {@code content} is {@code null}
     * @throws RuntimeException wrapping any checked failure reported by the ZooKeeper client
     */
    public void broadcast(String path, String content) {
        Preconditions.checkNotNull(content);
        String nodePath = ZKPaths.makePath(this.zkPrefix, path);
        // Explicit charset: the platform default differs between JVMs/hosts.
        byte[] payload = content.getBytes(StandardCharsets.UTF_8);
        try {
            try {
                this.curatorFactory.get().setData().forPath(nodePath, payload);
            } catch (KeeperException.NoNodeException absent) {
                try {
                    this.curatorFactory.get().create().creatingParentsIfNeeded().forPath(nodePath, payload);
                } catch (KeeperException.NodeExistsException raced) {
                    // Another broadcaster created the node between our setData and create;
                    // the node exists now, so write our payload to it.
                    this.curatorFactory.get().setData().forPath(nodePath, payload);
                }
            }
        } catch (Throwable th) {
            Throwables.throwIfUnchecked(th);
            throw new RuntimeException(th);
        }
    }

    /** Creates, starts and loads a NodeCache for {@code nodePath} and attaches the notifying listener. */
    private NodeCache startNodeCache(String nodePath, Set<Runnable> subscribers, Executor executor) {
        NodeCache nodeCache = new NodeCache(this.curatorFactory.get(), nodePath);
        try {
            // Order kept as in the original implementation: start, load current data, then listen.
            nodeCache.start();
            nodeCache.rebuild();
            nodeCache.getListenable().addListener(() -> notifySubscribers(nodePath, subscribers), executor);
            return nodeCache;
        } catch (Throwable th) {
            // The cache is not going to be stored anywhere, so release it before propagating.
            try {
                nodeCache.close();
            } catch (Throwable closeFailure) {
                th.addSuppressed(closeFailure);
            }
            Throwables.throwIfUnchecked(th);
            throw new RuntimeException(th);
        }
    }

    /** Runs every subscriber of {@code nodePath}; a failing subscriber is logged and skipped. */
    private void notifySubscribers(String nodePath, Set<Runnable> subscribers) {
        subscribers.parallelStream().forEach(subscriber -> {
            try {
                subscriber.run();
            } catch (Throwable th) {
                this.logger.error("Ops. fail to do handle for:{}->{}", nodePath, subscriber, th);
            }
        });
    }
}
