package elephantdb.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.tap.TapException;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import elephantdb.DomainSpec;
import elephantdb.Utils;
import elephantdb.hadoop.ElephantInputFormat;
import elephantdb.hadoop.ElephantOutputFormat;
import elephantdb.hadoop.LocalElephantManager;
import elephantdb.index.IdentityIndexer;
import elephantdb.index.Indexer;
import elephantdb.store.DomainStore;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;

/* loaded from: input_file:elephantdb/cascading/ElephantDBTap.class */
public class ElephantDBTap extends Hfs {
    public static final Logger LOG = Logger.getLogger(ElephantDBTap.class);
    String domainDir;
    DomainSpec spec;
    Args args;
    String newVersionPath;

    /* loaded from: input_file:elephantdb/cascading/ElephantDBTap$Args.class */
    public static class Args implements Serializable {
        public List<String> tmpDirs = null;
        public int timeoutMs = 7200000;
        public Gateway gateway = new IdentityGateway();
        public Fields sourceFields = Fields.ALL;
        public Long version = null;
        public Fields sinkFields = Fields.ALL;
        public Indexer indexer = new IdentityIndexer();
        public boolean incremental = false;
    }

    public ElephantDBTap(String str, DomainSpec domainSpec, Args args) throws IOException {
        this.domainDir = str;
        this.args = args;
        this.spec = new DomainStore(str, domainSpec).getSpec();
        setStringPath(this.domainDir);
        setScheme(new ElephantScheme(this.args.sourceFields, this.args.sinkFields, this.spec, this.args.gateway));
    }

    public DomainStore getDomainStore() throws IOException {
        return new DomainStore(this.domainDir, this.spec);
    }

    public DomainSpec getSpec() {
        return this.spec;
    }

    public void sourceConfInit(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        super.sourceConfInit(flowProcess, jobConf);
        FileInputFormat.setInputPaths(jobConf, "/" + UUID.randomUUID().toString());
        ElephantInputFormat.Args args = new ElephantInputFormat.Args(this.domainDir);
        args.inputDirHdfs = this.domainDir;
        if (this.args.tmpDirs != null) {
            LocalElephantManager.setTmpDirs(jobConf, this.args.tmpDirs);
        }
        args.version = this.args.version;
        jobConf.setInt("mapred.task.timeout", this.args.timeoutMs);
        Utils.setObject(jobConf, "elephant.input.args", args);
    }

    public void sinkConfInit(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        super.sinkConfInit(flowProcess, jobConf);
        try {
            Utils.setObject(jobConf, "elephant.output.args", outputArgs(jobConf));
            jobConf.setInt("mapred.task.timeout", this.args.timeoutMs);
            jobConf.setNumReduceTasks(this.spec.getNumShards());
            jobConf.setSpeculativeExecution(false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public ElephantOutputFormat.Args outputArgs(JobConf jobConf) throws IOException {
        DomainStore domainStore = getDomainStore();
        FileSystem fileSystem = domainStore.getFileSystem();
        if (this.newVersionPath == null) {
            this.newVersionPath = domainStore.createVersion();
            this.newVersionPath = new Path(this.newVersionPath).makeQualified(fileSystem).toString();
        }
        ElephantOutputFormat.Args args = new ElephantOutputFormat.Args(this.spec, this.newVersionPath);
        if (this.args.tmpDirs != null) {
            LocalElephantManager.setTmpDirs(jobConf, this.args.tmpDirs);
        }
        if (this.args.indexer != null) {
            args.indexer = this.args.indexer;
        }
        if (this.args.incremental) {
            args.updateDirHdfs = domainStore.mostRecentVersionPath();
        }
        return args;
    }

    public Path getPath() {
        return new Path(this.domainDir);
    }

    public boolean createResource(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public boolean deleteResource(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public long getModifiedTime(JobConf jobConf) throws IOException {
        return System.currentTimeMillis();
    }

    public boolean commitResource(JobConf jobConf) {
        try {
            try {
                DomainStore domainStore = getDomainStore();
                domainStore.getFileSystem().mkdirs(new Path(this.newVersionPath));
                if (this.args.incremental) {
                    domainStore.synchronizeInProgressVersion(this.newVersionPath);
                }
                domainStore.succeedVersion(this.newVersionPath);
                this.newVersionPath = null;
                return true;
            } catch (IOException e) {
                throw new TapException("Couldn't finalize new elephant domain version", e);
            }
        } catch (Throwable th) {
            this.newVersionPath = null;
            throw th;
        }
    }

    public boolean rollbackResource(JobConf jobConf) throws IOException {
        getDomainStore().failVersion(this.newVersionPath);
        return true;
    }

    public boolean onThrowable(Flow flow, Throwable th) {
        return false;
    }

    public /* bridge */ /* synthetic */ void sinkConfInit(FlowProcess flowProcess, Object obj) {
        sinkConfInit((FlowProcess<JobConf>) flowProcess, (JobConf) obj);
    }

    public /* bridge */ /* synthetic */ void sourceConfInit(FlowProcess flowProcess, Object obj) {
        sourceConfInit((FlowProcess<JobConf>) flowProcess, (JobConf) obj);
    }
}
