package org.apache.drill.exec.work;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.QueryStatus;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/WorkManager.class */
public class WorkManager implements Closeable {
    static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
    private BootStrapContext bContext;
    private DrillbitContext dContext;
    private ExecutorService executor;
    private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps.newConcurrentMap());
    private LinkedBlockingQueue<RunnableWrapper> pendingTasks = Queues.newLinkedBlockingQueue();
    private Map<ExecProtos.FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
    private ConcurrentMap<UserBitShared.QueryId, Foreman> queries = Maps.newConcurrentMap();
    private ConcurrentMap<UserBitShared.QueryId, QueryStatus> status = Maps.newConcurrentMap();
    private final WorkerBee bee = new WorkerBee();
    private final WorkEventBus workBus = new WorkEventBus(this.bee);
    private final ControlMessageHandler controlMessageWorker = new ControlHandlerImpl(this.bee);
    private final UserWorker userWorker = new UserWorker(this.bee);
    private final EventThread eventThread = new EventThread();
    private final DataResponseHandler dataHandler = new DataResponseHandlerImpl(this.bee);

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$EventThread.class */
    private class EventThread extends Thread {
        public EventThread() {
            setDaemon(true);
            setName("WorkManager Event Thread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    RunnableWrapper runnableWrapper = (RunnableWrapper) WorkManager.this.pendingTasks.take();
                    if (runnableWrapper != null) {
                        WorkManager.logger.debug("Starting pending task {}", runnableWrapper);
                        if (runnableWrapper.inner instanceof FragmentExecutor) {
                            FragmentExecutor fragmentExecutor = (FragmentExecutor) runnableWrapper.inner;
                            WorkManager.this.runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor);
                        }
                        WorkManager.this.executor.execute(runnableWrapper);
                    }
                } catch (InterruptedException e) {
                    WorkManager.logger.info("Work Manager stopping as it was interrupted.");
                    return;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$RunnableWrapper.class */
    public class RunnableWrapper implements Runnable {
        final Runnable inner;
        private final String id;

        public RunnableWrapper(Runnable runnable, String str) {
            this.inner = runnable;
            this.id = str;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.inner.run();
            } catch (Error | Exception e) {
                WorkManager.logger.error("Failure while running wrapper [{}]", this.id, e);
            }
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/work/WorkManager$WorkerBee.class */
    public class WorkerBee {
        public WorkerBee() {
        }

        public void addFragmentRunner(FragmentExecutor fragmentExecutor) {
            WorkManager.logger.debug("Adding pending task {}", fragmentExecutor);
            WorkManager.this.pendingTasks.add(new RunnableWrapper(fragmentExecutor, WorkManager.getId(fragmentExecutor.getContext().getHandle())));
        }

        public void addNewForeman(Foreman foreman) {
            WorkManager.this.pendingTasks.add(new RunnableWrapper(foreman, "Foreman: " + QueryIdHelper.getQueryId(foreman.getQueryId())));
            WorkManager.this.queries.put(foreman.getQueryId(), foreman);
        }

        public void addFragmentPendingRemote(FragmentManager fragmentManager) {
            WorkManager.this.incomingFragments.add(fragmentManager);
        }

        public void startFragmentPendingRemote(FragmentManager fragmentManager) {
            WorkManager.this.incomingFragments.remove(fragmentManager);
            FragmentExecutor runnable = fragmentManager.getRunnable();
            WorkManager.this.pendingTasks.add(new RunnableWrapper(runnable, WorkManager.getId(runnable.getContext().getHandle())));
        }

        public FragmentExecutor getFragmentRunner(ExecProtos.FragmentHandle fragmentHandle) {
            return (FragmentExecutor) WorkManager.this.runningFragments.get(fragmentHandle);
        }

        public void removeFragment(ExecProtos.FragmentHandle fragmentHandle) {
            WorkManager.this.runningFragments.remove(fragmentHandle);
        }

        public Foreman getForemanForQueryId(UserBitShared.QueryId queryId) {
            return (Foreman) WorkManager.this.queries.get(queryId);
        }

        public void retireForeman(Foreman foreman) {
            WorkManager.this.queries.remove(foreman.getQueryId(), foreman);
        }

        public DrillbitContext getContext() {
            return WorkManager.this.dContext;
        }
    }

    public WorkManager(BootStrapContext bootStrapContext) {
        this.bContext = bootStrapContext;
    }

    public void start(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, DistributedCache distributedCache, Controller controller, DataConnectionCreator dataConnectionCreator, ClusterCoordinator clusterCoordinator, PStoreProvider pStoreProvider) {
        this.dContext = new DrillbitContext(drillbitEndpoint, this.bContext, clusterCoordinator, controller, dataConnectionCreator, distributedCache, this.workBus, pStoreProvider);
        this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
        this.eventThread.start();
        this.dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.running_fragments." + this.dContext.getEndpoint().getUserPort(), new String[0]), new Gauge<Integer>() { // from class: org.apache.drill.exec.work.WorkManager.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m1161getValue() {
                return Integer.valueOf(WorkManager.this.runningFragments.size());
            }
        });
        this.dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.pendingTasks" + this.dContext.getEndpoint().getUserPort(), new String[0]), new Gauge<Integer>() { // from class: org.apache.drill.exec.work.WorkManager.2
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m1162getValue() {
                return Integer.valueOf(WorkManager.this.pendingTasks.size());
            }
        });
    }

    public WorkEventBus getWorkBus() {
        return this.workBus;
    }

    public DataResponseHandler getDataHandler() {
        return this.dataHandler;
    }

    public ControlMessageHandler getControlMessageHandler() {
        return this.controlMessageWorker;
    }

    public UserWorker getUserWorker() {
        return this.userWorker;
    }

    public WorkerBee getBee() {
        return this.bee;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (this.executor != null) {
                this.executor.awaitTermination(1L, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            logger.warn("Executor interrupted while awaiting termination");
        }
    }

    public DrillbitContext getContext() {
        return this.dContext;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getId(ExecProtos.FragmentHandle fragmentHandle) {
        return "FragmentExecutor: " + QueryIdHelper.getQueryId(fragmentHandle.getQueryId()) + ':' + fragmentHandle.getMajorFragmentId() + ':' + fragmentHandle.getMinorFragmentId();
    }
}
