package org.apache.drill.exec.work.foreman;

import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/foreman/QueryManager.class */
public class QueryManager implements FragmentStatusListener {
    static final Logger logger;
    private final QueryStatus status;
    private final Controller controller;
    private Foreman.ForemanManagerListener foremanManagerListener;
    private WorkEventBus workBus;
    private UserBitShared.QueryId queryId;
    private FragmentExecutor rootRunner;
    private UserProtos.RunQuery query;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean running = false;
    private volatile boolean cancelled = false;
    private volatile boolean stopped = false;
    private AtomicInteger remainingFragmentCount = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/QueryManager$CancelListener.class */
    public class CancelListener extends EndpointListener<GeneralRPCProtos.Ack, ExecProtos.FragmentHandle> {
        public CancelListener(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, ExecProtos.FragmentHandle fragmentHandle) {
            super(drillbitEndpoint, fragmentHandle);
        }

        @Override // org.apache.drill.exec.rpc.BaseRpcOutcomeListener, org.apache.drill.exec.rpc.RpcOutcomeListener
        public void failed(RpcException rpcException) {
            QueryManager.logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", this.value, this.endpoint, rpcException);
        }

        @Override // org.apache.drill.exec.rpc.BaseRpcOutcomeListener, org.apache.drill.exec.rpc.RpcOutcomeListener
        public void success(GeneralRPCProtos.Ack ack, ByteBuf byteBuf) {
            if (ack.getOk()) {
                return;
            }
            QueryManager.logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", this.endpoint, ack);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/QueryManager$FragmentSubmitListener.class */
    public class FragmentSubmitListener extends EndpointListener<GeneralRPCProtos.Ack, BitControl.PlanFragment> {
        public FragmentSubmitListener(CoordinationProtos.DrillbitEndpoint drillbitEndpoint, BitControl.PlanFragment planFragment) {
            super(drillbitEndpoint, planFragment);
        }

        @Override // org.apache.drill.exec.rpc.BaseRpcOutcomeListener, org.apache.drill.exec.rpc.RpcOutcomeListener
        public void failed(RpcException rpcException) {
            QueryManager.logger.debug("Failure while sending fragment.  Stopping query.", (Throwable) rpcException);
            QueryManager.this.stopQuery();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/QueryManager$RootStatusHandler.class */
    public class RootStatusHandler extends AbstractStatusReporter {
        private RootStatusHandler(FragmentContext fragmentContext, BitControl.PlanFragment planFragment) {
            super(fragmentContext);
        }

        @Override // org.apache.drill.exec.work.fragment.AbstractStatusReporter
        protected void statusChange(ExecProtos.FragmentHandle fragmentHandle, BitControl.FragmentStatus fragmentStatus) {
            QueryManager.this.statusUpdate(fragmentStatus);
        }
    }

    public QueryManager(UserBitShared.QueryId queryId, UserProtos.RunQuery runQuery, PStoreProvider pStoreProvider, Foreman.ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) {
        this.foremanManagerListener = foremanManagerListener;
        this.query = runQuery;
        this.queryId = queryId;
        this.controller = controller;
        this.status = new QueryStatus(runQuery, queryId, pStoreProvider, foreman);
    }

    public QueryStatus getStatus() {
        return this.status;
    }

    public void runFragments(WorkManager.WorkerBee workerBee, BitControl.PlanFragment planFragment, FragmentRoot fragmentRoot, UserServer.UserClientConnection userClientConnection, List<BitControl.PlanFragment> list, List<BitControl.PlanFragment> list2) throws ExecutionSetupException {
        logger.debug("Setting up fragment runs.");
        this.remainingFragmentCount.set(list2.size() + list.size() + 1);
        if (!$assertionsDisabled && this.queryId != planFragment.getHandle().getQueryId()) {
            throw new AssertionError();
        }
        this.workBus = workerBee.getContext().getWorkBus();
        logger.debug("Setting up root context.");
        FragmentContext fragmentContext = new FragmentContext(workerBee.getContext(), planFragment, userClientConnection, workerBee.getContext().getFunctionImplementationRegistry());
        logger.debug("Setting up incoming buffers");
        IncomingBuffers incomingBuffers = new IncomingBuffers(fragmentRoot, fragmentContext);
        logger.debug("Setting buffers on root context.");
        fragmentContext.setBuffers(incomingBuffers);
        this.status.add(new FragmentData(planFragment.getHandle(), null, true));
        logger.debug("Fragment added to local node.");
        this.rootRunner = new FragmentExecutor(fragmentContext, workerBee, fragmentRoot, new RootStatusHandler(fragmentContext, planFragment));
        RootFragmentManager rootFragmentManager = new RootFragmentManager(planFragment.getHandle(), incomingBuffers, this.rootRunner);
        if (incomingBuffers.isDone()) {
            workerBee.addFragmentRunner(rootFragmentManager.getRunnable());
        } else {
            this.workBus.setRootFragmentManager(rootFragmentManager);
        }
        for (BitControl.PlanFragment planFragment2 : list2) {
            logger.debug("Tracking intermediate remote node {} with data {}", planFragment2.getAssignment(), planFragment2.getFragmentJson());
            this.status.add(new FragmentData(planFragment2.getHandle(), planFragment2.getAssignment(), false));
        }
        Iterator<BitControl.PlanFragment> it = list.iterator();
        while (it.hasNext()) {
            sendRemoteFragment(it.next());
        }
        logger.debug("Fragment runs setup is complete.");
        this.running = true;
        if (!this.cancelled || this.stopped) {
            return;
        }
        stopQuery();
    }

    private void sendRemoteFragment(BitControl.PlanFragment planFragment) {
        logger.debug("Sending remote fragment to node {} with data {}", planFragment.getAssignment(), planFragment.getFragmentJson());
        this.status.add(new FragmentData(planFragment.getHandle(), planFragment.getAssignment(), false));
        this.controller.getTunnel(planFragment.getAssignment()).sendFragment(new FragmentSubmitListener(planFragment.getAssignment(), planFragment), planFragment);
    }

    @Override // org.apache.drill.exec.work.foreman.FragmentStatusListener
    public void statusUpdate(BitControl.FragmentStatus fragmentStatus) {
        logger.debug("New fragment status was provided to Foreman of {}", fragmentStatus);
        switch (fragmentStatus.getProfile().getState()) {
            case AWAITING_ALLOCATION:
                updateStatus(fragmentStatus, true);
                return;
            case CANCELLED:
                return;
            case FAILED:
                fail(fragmentStatus);
                return;
            case FINISHED:
                finished(fragmentStatus);
                return;
            case RUNNING:
                updateStatus(fragmentStatus, false);
                return;
            default:
                throw new UnsupportedOperationException(String.format("Received status of %s", fragmentStatus));
        }
    }

    private void updateStatus(BitControl.FragmentStatus fragmentStatus, boolean z) {
        this.status.update(fragmentStatus, z);
    }

    private void finished(BitControl.FragmentStatus fragmentStatus) {
        if (this.remainingFragmentCount.decrementAndGet() == 0) {
            logger.info("Outcome status: {}", this.status);
            this.foremanManagerListener.cleanupAndSendResult(UserBitShared.QueryResult.newBuilder().setQueryState(UserBitShared.QueryResult.QueryState.COMPLETED).setQueryId(this.queryId).build());
            this.workBus.removeFragmentStatusListener(this.queryId);
        }
        this.status.setEndTime(System.currentTimeMillis());
        this.status.incrementFinishedFragments();
        updateStatus(fragmentStatus, true);
    }

    private void fail(BitControl.FragmentStatus fragmentStatus) {
        stopQuery();
        this.foremanManagerListener.cleanupAndSendResult(UserBitShared.QueryResult.newBuilder().setQueryId(this.queryId).setQueryState(UserBitShared.QueryResult.QueryState.FAILED).addError(fragmentStatus.getProfile().getError()).build());
        this.status.setEndTime(System.currentTimeMillis());
        updateStatus(fragmentStatus, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopQuery() {
        this.workBus.removeFragmentStatusListener(this.queryId);
        List<FragmentData> fragmentData = this.status.getFragmentData();
        Collections.sort(fragmentData, new Comparator<FragmentData>() { // from class: org.apache.drill.exec.work.foreman.QueryManager.1
            @Override // java.util.Comparator
            public int compare(FragmentData fragmentData2, FragmentData fragmentData3) {
                return fragmentData3.getHandle().getMajorFragmentId() - fragmentData2.getHandle().getMajorFragmentId();
            }
        });
        for (FragmentData fragmentData2 : fragmentData) {
            ExecProtos.FragmentHandle handle = fragmentData2.getStatus().getHandle();
            switch (fragmentData2.getStatus().getProfile().getState()) {
                case AWAITING_ALLOCATION:
                case RUNNING:
                case SENDING:
                    if (fragmentData2.isLocal()) {
                        this.rootRunner.cancel();
                        break;
                    } else {
                        this.controller.getTunnel(fragmentData2.getEndpoint()).cancelFragment(new CancelListener(fragmentData2.getEndpoint(), handle), handle);
                        break;
                    }
            }
        }
    }

    public void cancel() {
        this.cancelled = true;
        if (this.running) {
            stopQuery();
            this.stopped = true;
        }
    }

    static {
        $assertionsDisabled = !QueryManager.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(QueryManager.class);
    }
}
