package backtype.storm.contrib.signals.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.api.BackgroundPathable;
import com.netflix.curator.retry.RetryNTimes;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/contrib/signals/spout/BaseSignalSpout.class */
public abstract class BaseSignalSpout extends BaseRichSpout implements Watcher {
    private static final Logger LOG = LoggerFactory.getLogger(BaseSignalSpout.class);
    private static final String namespace = "storm-signals";
    private String name;
    private CuratorFramework client;

    /* renamed from: backtype.storm.contrib.signals.spout.BaseSignalSpout$1, reason: invalid class name */
    /* loaded from: input_file:backtype/storm/contrib/signals/spout/BaseSignalSpout$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType = new int[Watcher.Event.EventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeCreated.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeDataChanged.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[Watcher.Event.EventType.NodeDeleted.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public BaseSignalSpout(String str) {
        this.name = str;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        try {
            initZookeeper(map);
        } catch (Exception e) {
            LOG.error("Error creating zookeeper client.", e);
        }
    }

    private void initZookeeper(Map map) throws Exception {
        this.client = CuratorFrameworkFactory.builder().namespace(namespace).connectString(zkHosts(map)).retryPolicy(new RetryNTimes(((Integer) map.get("storm.zookeeper.retry.times")).intValue(), ((Integer) map.get("storm.zookeeper.retry.interval")).intValue())).build();
        this.client.start();
        if (((Stat) ((BackgroundPathable) this.client.checkExists().usingWatcher(this)).forPath(this.name)) == null) {
            LOG.info("Created: " + ((String) this.client.create().creatingParentsIfNeeded().forPath(this.name)));
        }
    }

    private String zkHosts(Map map) {
        int intValue = ((Integer) map.get("storm.zookeeper.port")).intValue();
        Iterator it = ((List) map.get("storm.zookeeper.servers")).iterator();
        StringBuffer stringBuffer = new StringBuffer();
        while (it.hasNext()) {
            stringBuffer.append((String) it.next());
            stringBuffer.append(":");
            stringBuffer.append(intValue);
            if (it.hasNext()) {
                stringBuffer.append(",");
            }
        }
        return stringBuffer.toString();
    }

    public void close() {
        super.close();
        this.client.close();
    }

    public void process(WatchedEvent watchedEvent) {
        try {
            ((BackgroundPathable) this.client.checkExists().usingWatcher(this)).forPath(this.name);
            LOG.debug("Renewed watch for path %s", this.name);
        } catch (Exception e) {
            LOG.error("Error renewing watch.", e);
        }
        switch (AnonymousClass1.$SwitchMap$org$apache$zookeeper$Watcher$Event$EventType[watchedEvent.getType().ordinal()]) {
            case 1:
                LOG.debug("Node created.");
                return;
            case 2:
                LOG.debug("Received signal.");
                try {
                    onSignal((byte[]) this.client.getData().forPath(watchedEvent.getPath()));
                    return;
                } catch (Exception e2) {
                    LOG.warn("Unable to process signal.", e2);
                    return;
                }
            case 3:
                LOG.debug("NodeDeleted");
                return;
            default:
                return;
        }
    }

    protected abstract void onSignal(byte[] bArr);
}
