package org.apache.asterix.event.service;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.event.error.EventException;
import org.apache.asterix.event.model.AsterixInstance;
import org.apache.asterix.installer.schema.conf.Configuration;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/asterix/event/service/ZooKeeperService.class */
public class ZooKeeperService implements ILookupService {
    private static final int ZOOKEEPER_LEADER_CONN_PORT = 2222;
    private static final int ZOOKEEPER_LEADER_ELEC_PORT = 2223;
    private static final int ZOOKEEPER_SESSION_TIME_OUT = 40000;
    private ZooKeeper zk;
    private String zkConnectionString;
    public static final int DEFAULT_NODE_VERSION = -1;
    private static final Logger LOGGER = Logger.getLogger(ZooKeeperService.class.getName());
    private static final String ZOOKEEPER_HOME = AsterixEventService.getEventHome() + File.separator + "zookeeper";
    private static final String ZOO_KEEPER_CONFIG = ZOOKEEPER_HOME + File.separator + "zk.cfg";
    public static final String ASTERIX_INSTANCE_BASE_PATH = File.separator + "Asterix";
    public static final String ASTERIX_INSTANCE_STATE_PATH = File.separator + "state";
    public static final String ASTERIX_INSTANCE_STATE_REPORT = File.separator + "clusterState";
    private boolean isRunning = false;
    private LinkedBlockingQueue<String> msgQ = new LinkedBlockingQueue<>();
    private ZooKeeperWatcher watcher = new ZooKeeperWatcher(this.msgQ);

    @Override // org.apache.asterix.event.service.ILookupService
    public boolean isRunning(Configuration configuration) throws Exception {
        List<String> server = configuration.getZookeeper().getServers().getServer();
        int intValue = configuration.getZookeeper().getClientPort().intValue();
        StringBuilder sb = new StringBuilder();
        Iterator<String> it = server.iterator();
        while (it.hasNext()) {
            sb.append(it.next());
            sb.append(":");
            sb.append(intValue);
            sb.append(",");
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        this.zkConnectionString = sb.toString();
        this.zk = new ZooKeeper(this.zkConnectionString, ZOOKEEPER_SESSION_TIME_OUT, this.watcher);
        try {
            this.zk.exists("/dummy", this.watcher);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("ZooKeeper running at " + ((Object) sb));
            }
            createRootIfNotExist();
            this.isRunning = true;
        } catch (KeeperException e) {
            this.isRunning = false;
        }
        return this.isRunning;
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void startService(Configuration configuration) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Starting ZooKeeper at " + this.zkConnectionString);
        }
        ZookeeperUtil.writeConfiguration(ZOO_KEEPER_CONFIG, configuration, ZOOKEEPER_LEADER_CONN_PORT, ZOOKEEPER_LEADER_ELEC_PORT);
        String str = ZOOKEEPER_HOME + File.separator + "bin" + File.separator + "zk.init";
        StringBuilder sb = new StringBuilder();
        sb.append(str + " ");
        sb.append(configuration.getZookeeper().getHomeDir() + " ");
        sb.append(configuration.getZookeeper().getServers().getJavaHome() + " ");
        Iterator<String> it = configuration.getZookeeper().getServers().getServer().iterator();
        while (it.hasNext()) {
            sb.append(it.next() + " ");
        }
        Process exec = Runtime.getRuntime().exec(sb.toString());
        int waitFor = exec.waitFor();
        Pair<CharSequence, CharSequence> processStreams = getProcessStreams(exec);
        if (waitFor != 0) {
            StringBuilder sb2 = new StringBuilder("Error starting zookeeper server; output code = ");
            sb2.append(waitFor);
            appendNonEmptyStreams(processStreams, sb2);
            throw new Exception(sb2.toString());
        }
        this.zk = new ZooKeeper(this.zkConnectionString, ZOOKEEPER_SESSION_TIME_OUT, this.watcher);
        if (this.msgQ.poll(60L, TimeUnit.SECONDS) == null) {
            StringBuilder sb3 = new StringBuilder("Unable to start Zookeeper Service. This could be because of the following reasons.\n1) Managix is incorrectly configured. Please run managix validate to run a validation test and correct the errors reported.\n2) If validation in (1) is successful, ensure that java_home parameter is set correctly in Managix configuration (" + AsterixEventServiceUtil.MANAGIX_CONF_XML + ")");
            appendNonEmptyStreams(processStreams, sb3);
            throw new Exception(sb3.toString());
        }
        this.msgQ.take();
        createRootIfNotExist();
    }

    private void appendNonEmptyStreams(Pair<CharSequence, CharSequence> pair, StringBuilder sb) {
        appendIfNotEmpty(sb, (CharSequence) pair.getLeft(), "stdout");
        appendIfNotEmpty(sb, (CharSequence) pair.getRight(), "stderr");
    }

    private Pair<CharSequence, CharSequence> getProcessStreams(Process process) throws IOException {
        StringWriter stringWriter = new StringWriter();
        StringWriter stringWriter2 = new StringWriter();
        IOUtils.copy(process.getInputStream(), stringWriter, Charset.defaultCharset());
        IOUtils.copy(process.getErrorStream(), stringWriter2, Charset.defaultCharset());
        return new ImmutablePair(stringWriter.getBuffer(), stringWriter2.getBuffer());
    }

    private void appendIfNotEmpty(StringBuilder sb, CharSequence charSequence, String str) {
        if (charSequence.length() > 0) {
            sb.append(", ").append(str).append(" = ").append(charSequence);
        }
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void stopService(Configuration configuration) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Stopping ZooKeeper running at " + this.zkConnectionString);
        }
        String str = ZOOKEEPER_HOME + File.separator + "bin" + File.separator + "stop_zk";
        StringBuilder sb = new StringBuilder();
        sb.append(str + " ");
        sb.append(configuration.getZookeeper().getHomeDir() + " ");
        Iterator<String> it = configuration.getZookeeper().getServers().getServer().iterator();
        while (it.hasNext()) {
            sb.append(it.next() + " ");
        }
        Runtime.getRuntime().exec(sb.toString());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Stopped ZooKeeper service at " + this.zkConnectionString);
        }
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void writeAsterixInstance(AsterixInstance asterixInstance) throws Exception {
        String str = ASTERIX_INSTANCE_BASE_PATH + File.separator + asterixInstance.getName();
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new ObjectOutputStream(byteArrayOutputStream).writeObject(asterixInstance);
        this.zk.create(str, byteArrayOutputStream.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        this.zk.create(str + ASTERIX_INSTANCE_STATE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    private void createRootIfNotExist() throws Exception {
        try {
            if (this.zk.exists(ASTERIX_INSTANCE_BASE_PATH, false) == null) {
                this.zk.create(ASTERIX_INSTANCE_BASE_PATH, "root".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            LOGGER.error("An error took place when creating the root in Zookeeper");
            e.printStackTrace();
            createRootIfNotExist();
        }
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public AsterixInstance getAsterixInstance(String str) throws Exception {
        String str2 = ASTERIX_INSTANCE_BASE_PATH + File.separator + str;
        if (this.zk.exists(ASTERIX_INSTANCE_BASE_PATH + File.separator + str, false) == null) {
            return null;
        }
        return readAsterixInstanceObject(this.zk.getData(str2, false, new Stat()));
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public boolean exists(String str) throws Exception {
        return this.zk.exists(new StringBuilder().append(ASTERIX_INSTANCE_BASE_PATH).append(File.separator).append(str).toString(), false) != null;
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void removeAsterixInstance(String str) throws Exception {
        if (!exists(str)) {
            throw new EventException("Asterix instance by name " + str + " does not exists.");
        }
        if (exists(str + ASTERIX_INSTANCE_STATE_PATH)) {
            if (exists(str + ASTERIX_INSTANCE_STATE_PATH + File.separator + "clusterState")) {
                this.zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + str + ASTERIX_INSTANCE_STATE_PATH + ASTERIX_INSTANCE_STATE_REPORT, -1);
            }
            this.zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + str + ASTERIX_INSTANCE_STATE_PATH, -1);
        }
        this.zk.delete(ASTERIX_INSTANCE_BASE_PATH + File.separator + str, -1);
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public List<AsterixInstance> getAsterixInstances() throws Exception {
        List children = this.zk.getChildren(ASTERIX_INSTANCE_BASE_PATH, false);
        ArrayList arrayList = new ArrayList();
        Iterator it = children.iterator();
        while (it.hasNext()) {
            arrayList.add(readAsterixInstanceObject(this.zk.getData(ASTERIX_INSTANCE_BASE_PATH + File.separator + ((String) it.next()), false, new Stat())));
        }
        return arrayList;
    }

    private AsterixInstance readAsterixInstanceObject(byte[] bArr) throws IOException, ClassNotFoundException {
        return (AsterixInstance) new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void updateAsterixInstance(AsterixInstance asterixInstance) throws Exception {
        removeAsterixInstance(asterixInstance.getName());
        writeAsterixInstance(asterixInstance);
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public ClusterStateWatcher startWatchingClusterState(String str) {
        ClusterStateWatcher clusterStateWatcher = new ClusterStateWatcher(this.zk, str);
        clusterStateWatcher.startMonitoringThread();
        return clusterStateWatcher;
    }

    @Override // org.apache.asterix.event.service.ILookupService
    public void reportClusterState(String str, IClusterManagementWork.ClusterState clusterState) throws Exception {
        this.zk.create((ASTERIX_INSTANCE_BASE_PATH + File.separator + str + ASTERIX_INSTANCE_STATE_PATH) + ASTERIX_INSTANCE_STATE_REPORT, new byte[]{Integer.valueOf(clusterState.ordinal()).byteValue()}, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
}
