package alluxio.worker.job.task;

import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.grpc.RunTaskCommand;
import alluxio.job.ErrorUtils;
import alluxio.job.RunTaskContext;
import alluxio.job.wire.Status;
import alluxio.job.wire.TaskInfo;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.logging.SamplingLogger;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the tasks submitted to this worker's task execution thread pool.
 *
 * <p>For every submitted task the manager keeps its {@link Future}, its latest {@link TaskInfo}
 * while it is unfinished, and a map of pending task updates that is drained through
 * {@link #getAndClearTaskUpdates()}. All mutable bookkeeping state is guarded by this object's
 * monitor; tasks are identified by a {@code (jobId, taskId)} pair.
 */
@ThreadSafe
public class TaskExecutorManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorManager.class);
    private static final SamplingLogger SAMPLING_LOGGER = new SamplingLogger(LOG, 30000);
    /** Upper bound for the thread pool; also the largest core pool size accepted. */
    private static final int MAX_TASK_EXECUTOR_POOL_SIZE = 10000;

    private final PausableThreadPoolExecutor mTaskExecutionService;
    private final WorkerNetAddress mAddress;
    /** Core pool size applied whenever the manager is not throttled. */
    private int mDefaultTaskExecutorPoolSize;
    /** Futures of the unfinished tasks, keyed by (jobId, taskId). */
    private final Map<Pair<Long, Long>, Future<?>> mTaskFutures = Maps.newHashMap();
    /** Latest info of the unfinished tasks, keyed by (jobId, taskId). */
    private final Map<Pair<Long, Long>, TaskInfo> mUnfinishedTasks = Maps.newHashMap();
    /** Task infos recorded since the last {@link #getAndClearTaskUpdates()} call. */
    private final Map<Pair<Long, Long>, TaskInfo> mTaskUpdates = Maps.newHashMap();
    /** Whether {@link #throttle()} is currently in effect. */
    private boolean mThrottled = false;

    /**
     * Creates a manager backed by a pausable thread pool.
     *
     * @param i the initial (and default) core size of the task executor pool
     * @param workerNetAddress the address recorded in every {@link TaskInfo} created here
     */
    public TaskExecutorManager(int i, WorkerNetAddress workerNetAddress) {
        this.mTaskExecutionService = new PausableThreadPoolExecutor(i, MAX_TASK_EXECUTOR_POOL_SIZE,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
                ThreadFactoryUtils.build("task-execution-service-%d", true));
        this.mDefaultTaskExecutorPoolSize = i;
        this.mAddress = workerNetAddress;
    }

    /**
     * @return the number of active tasks as reported by the underlying executor
     */
    public int getNumActiveTasks() {
        return this.mTaskExecutionService.getNumActiveTasks();
    }

    /**
     * @return the current core pool size of the underlying executor
     */
    public int getTaskExecutorPoolSize() {
        return this.mTaskExecutionService.getCorePoolSize();
    }

    /**
     * Applies a new core pool size; a size of zero pauses the executor, any other size resumes it.
     *
     * @param i the new core pool size, in {@code [0, MAX_TASK_EXECUTOR_POOL_SIZE]}
     * @throws IllegalArgumentException if the size is out of range
     */
    private synchronized void setTaskExecutorPoolSize(int i) {
        checkPoolSize(i);
        if (i == 0) {
            this.mTaskExecutionService.pause();
        } else {
            this.mTaskExecutionService.resume();
        }
        this.mTaskExecutionService.setCorePoolSize(i);
    }

    /**
     * Updates the default pool size. The new size is applied immediately unless the manager is
     * throttled, in which case it takes effect on the next {@link #unthrottle()}.
     *
     * @param i the new default core pool size, in {@code [0, MAX_TASK_EXECUTOR_POOL_SIZE]}
     * @throws IllegalArgumentException if the size is out of range
     */
    public synchronized void setDefaultTaskExecutorPoolSize(int i) {
        // Validate before storing: otherwise a bad value recorded while throttled would only
        // blow up later inside unthrottle(), after the throttled flag had already been cleared.
        checkPoolSize(i);
        this.mDefaultTaskExecutorPoolSize = i;
        if (!this.mThrottled) {
            setTaskExecutorPoolSize(i);
        }
    }

    /**
     * Marks the manager as throttled and sets the pool size to zero, pausing the executor.
     */
    public synchronized void throttle() {
        this.mThrottled = true;
        setTaskExecutorPoolSize(0);
    }

    /**
     * Clears the throttled flag and restores the default pool size.
     */
    public synchronized void unthrottle() {
        this.mThrottled = false;
        setTaskExecutorPoolSize(this.mDefaultTaskExecutorPoolSize);
    }

    /**
     * @return the number of tasks currently tracked as unfinished
     */
    public synchronized int unfinishedTasks() {
        // Synchronized because the backing HashMap is mutated under this monitor by other methods.
        return this.mUnfinishedTasks.size();
    }

    /**
     * Marks an unfinished task as running. Does nothing (besides logging) if the task is no
     * longer tracked as unfinished.
     *
     * @param j the job id
     * @param j2 the task id
     */
    public synchronized void notifyTaskRunning(long j, long j2) {
        TaskInfo taskInfo = this.mUnfinishedTasks.get(taskKey(j, j2));
        if (taskInfo == null) {
            LOG.warn("Task {} for job {} reported running but is not tracked as unfinished", j2, j);
            return;
        }
        taskInfo.setStatus(Status.RUNNING);
        LOG.info("Task {} for job {} started", j2, j);
    }

    /**
     * Records the successful completion of a task and moves it to the pending updates. Does
     * nothing (besides logging) if the task is no longer tracked as unfinished, e.g. because it
     * was cancelled in the meantime.
     *
     * @param j the job id
     * @param j2 the task id
     * @param serializable the task result
     */
    public synchronized void notifyTaskCompletion(long j, long j2, Serializable serializable) {
        Pair<Long, Long> key = taskKey(j, j2);
        TaskInfo taskInfo = this.mUnfinishedTasks.get(key);
        if (taskInfo == null) {
            LOG.warn("Task {} for job {} reported completion but is not tracked as unfinished",
                    j2, j);
            return;
        }
        taskInfo.setStatus(Status.COMPLETED);
        taskInfo.setResult(serializable);
        finishTask(key);
        LOG.info("Task {} for job {} completed.", j2, j);
    }

    /**
     * Records the failure of a task and moves it to the pending updates. The error message is the
     * full stack trace when {@link PropertyKey#DEBUG} is enabled, otherwise the throwable's
     * message (which is left unset when {@code null}). Does nothing (besides logging) if the task
     * is no longer tracked as unfinished.
     *
     * @param j the job id
     * @param j2 the task id
     * @param th the cause of the failure
     */
    public synchronized void notifyTaskFailure(long j, long j2, Throwable th) {
        Pair<Long, Long> key = taskKey(j, j2);
        TaskInfo taskInfo = this.mUnfinishedTasks.get(key);
        if (taskInfo == null) {
            LOG.warn("Task {} for job {} reported failure but is not tracked as unfinished",
                    j2, j, th);
            return;
        }
        taskInfo.setStatus(Status.FAILED);
        String errorMessage = ServerConfiguration.getBoolean(PropertyKey.DEBUG)
                ? Throwables.getStackTraceAsString(th)
                : th.getMessage();
        taskInfo.setErrorType(ErrorUtils.getErrorType(th));
        if (errorMessage != null) {
            taskInfo.setErrorMessage(errorMessage);
        }
        finishTask(key);
        LOG.info("Task {} for job {} failed: {}", j2, j, errorMessage);
        SAMPLING_LOGGER.info("Stack trace for taskId: {} jobId: {} : {}",
                new Object[] {j2, j, Throwables.getStackTraceAsString(th)});
    }

    /**
     * Submits a task to the execution service and starts tracking it with status
     * {@link Status#CREATED}.
     *
     * @param j the job id
     * @param j2 the task id
     * @param runTaskCommand the command describing the task to run
     * @param runTaskContext the context handed to the task executor
     */
    public synchronized void executeTask(long j, long j2, RunTaskCommand runTaskCommand,
            RunTaskContext runTaskContext) {
        // The executor thread calls back into synchronized notify* methods, so it cannot observe
        // the bookkeeping maps before this method has finished registering the task.
        Future<?> future = this.mTaskExecutionService.submit(
                new TaskExecutor(j, j2, runTaskCommand, runTaskContext, this));
        Pair<Long, Long> key = taskKey(j, j2);
        this.mTaskFutures.put(key, future);
        TaskInfo taskInfo = new TaskInfo(j, j2, Status.CREATED, this.mAddress, (Object) null);
        this.mUnfinishedTasks.put(key, taskInfo);
        this.mTaskUpdates.put(key, taskInfo);
        LOG.info("Task {} for job {} received", j2, j);
    }

    /**
     * Cancels an unfinished task. The task ends up {@link Status#CANCELED} if its future could be
     * cancelled and {@link Status#FAILED} (error type {@code FailedCancel}) otherwise. Unknown,
     * already finished, or already cancelled tasks are ignored.
     *
     * @param j the job id
     * @param j2 the task id
     */
    public synchronized void cancelTask(long j, long j2) {
        Pair<Long, Long> key = taskKey(j, j2);
        Future<?> future = this.mTaskFutures.get(key);
        TaskInfo taskInfo = this.mUnfinishedTasks.get(key);
        if (future == null || taskInfo == null
                || Status.CANCELED.equals(taskInfo.getStatus())) {
            return;
        }
        LOG.info("Task {} for job {} canceled", j2, j);
        if (future.cancel(true)) {
            taskInfo.setStatus(Status.CANCELED);
        } else {
            taskInfo.setStatus(Status.FAILED);
            taskInfo.setErrorType("FailedCancel");
            taskInfo.setErrorMessage("Failed to cancel the task");
        }
        finishTask(key);
    }

    /**
     * Returns the pending task updates and clears them.
     *
     * @return an immutable snapshot of the updates recorded since the previous call
     */
    public synchronized List<TaskInfo> getAndClearTaskUpdates() {
        List<TaskInfo> updates = ImmutableList.copyOf(this.mTaskUpdates.values());
        this.mTaskUpdates.clear();
        return updates;
    }

    /**
     * Puts previously drained updates back into the pending updates. An entry is skipped when a
     * newer update for the same task has been recorded in the meantime.
     *
     * @param list the task infos to restore
     */
    public synchronized void restoreTaskUpdates(List<TaskInfo> list) {
        for (TaskInfo taskInfo : list) {
            Pair<Long, Long> key = new Pair<>(taskInfo.getParentId(), taskInfo.getId());
            if (!this.mTaskUpdates.containsKey(key)) {
                this.mTaskUpdates.put(key, taskInfo);
            }
        }
    }

    /**
     * Stops tracking a task as unfinished and records its final info as a pending update.
     * Callers must hold this object's monitor.
     *
     * @param pair the (jobId, taskId) key of the task
     */
    private void finishTask(Pair<Long, Long> pair) {
        this.mTaskFutures.remove(pair);
        TaskInfo taskInfo = this.mUnfinishedTasks.remove(pair);
        if (taskInfo != null) {
            // Never publish a null update for a task that was not tracked.
            this.mTaskUpdates.put(pair, taskInfo);
        }
    }

    /** Builds the map key identifying a task. */
    private static Pair<Long, Long> taskKey(long jobId, long taskId) {
        return new Pair<>(jobId, taskId);
    }

    /** Rejects pool sizes outside {@code [0, MAX_TASK_EXECUTOR_POOL_SIZE]}. */
    private static void checkPoolSize(int size) {
        Preconditions.checkArgument(size >= 0, "pool size must be non-negative: %s", size);
        Preconditions.checkArgument(size <= MAX_TASK_EXECUTOR_POOL_SIZE,
                "pool size must not exceed %s: %s", MAX_TASK_EXECUTOR_POOL_SIZE, size);
    }
}
