package org.apache.drill.exec.physical.impl.partitionsender;

import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.CopyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.class */
/**
 * Root executor that pulls batches from an upstream {@link RecordBatch} and hands them to a
 * run-time generated {@link Partitioner}, which distributes rows over
 * {@code outGoingBatchCount} outgoing buckets.
 *
 * <p>The bucket for a row is {@code abs(expr % outGoingBatchCount)}, where {@code expr} is the
 * expression supplied by the {@link HashPartitionSender} configuration (see
 * {@link #createPartitioner()}).
 *
 * <p>Threading: {@link #receivingFragmentFinished} communicates with {@link #innerNext()} through
 * atomics and volatile fields, so it is presumably invoked from a different thread than the one
 * driving {@code innerNext()} - confirm against the caller.
 *
 * <p>NOTE(review): no constructor is visible in this file although several fields are
 * {@code final}; whatever initialises them is expected to size {@code remainingReceivers} and
 * {@code remaingReceiverCount} by the number of receivers - confirm.
 */
public class PartitionSenderRootExec extends BaseRootExec {
    static final Logger logger = LoggerFactory.getLogger(PartitionSenderRootExec.class);

    /** Upstream batch that is partitioned. */
    private RecordBatch incoming;
    /** Operator configuration; supplies the partitioning expression. */
    private HashPartitionSender operator;
    /**
     * Generated partitioner; {@code null} until the first {@code OK_NEW_SCHEMA} batch arrives.
     * Volatile because it is written by {@link #createPartitioner()} and read by
     * {@link #receivingFragmentFinished}.
     */
    private volatile Partitioner partitioner;
    private FragmentContext context;
    /** While {@code false}, {@link #innerNext()} only calls {@link #stop()} and returns. */
    private boolean ok;
    private final SendingAccountor sendCount;
    /** Number of outgoing buckets; used as the modulus of the partitioning expression. */
    private final int outGoingBatchCount;
    private final HashPartitionSender popConfig;
    private final StatusHandler statusHandler;
    /** One slot per receiver minor fragment id: 0 while active, 1 once it reported completion. */
    private final AtomicIntegerArray remainingReceivers;
    /** Decremented once per finished receiver; reaching zero sets {@link #done}. */
    private final AtomicInteger remaingReceiverCount;
    /** Set once every receiver has finished; makes {@link #innerNext()} kill the upstream. */
    private volatile boolean done;

    /* loaded from: input_file:org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec$Metric.class */
    /** Operator metrics; the metric id of each constant is its declaration position. */
    public enum Metric implements MetricDef {
        BATCHES_SENT,
        RECORDS_SENT,
        MIN_RECORDS,
        MAX_RECORDS,
        N_RECEIVERS,
        BYTES_SENT;

        /**
         * @return the ordinal of this constant, so reordering the constants changes the ids
         */
        @Override // org.apache.drill.exec.ops.MetricDef
        public int metricId() {
            return ordinal();
        }
    }

    /**
     * Processes one upstream batch.
     *
     * <ul>
     *   <li>{@code NONE} (or all receivers finished): flushes the partitioner, or sends an empty
     *       batch to every destination if no partitioner was ever created; returns {@code false}.
     *   <li>{@code STOP}: clears the partitioner, returns {@code false}.
     *   <li>{@code OK_NEW_SCHEMA}: flushes and clears the previous partitioner, generates a new
     *       one, then partitions the batch.
     *   <li>{@code OK}: partitions the batch.
     * </ul>
     *
     * <p>I/O and schema failures are logged, reported through {@code context.fail} and the
     * upstream is killed; the method then returns {@code false}.
     *
     * @return {@code true} if a batch was partitioned and more input may follow
     * @throws IllegalStateException if the upstream reports {@code NOT_YET} or an unknown outcome
     */
    @Override // org.apache.drill.exec.physical.impl.BaseRootExec
    public boolean innerNext() {
        if (!this.ok) {
            stop();
            return false;
        }

        final RecordBatch.IterOutcome outcome;
        if (this.done) {
            // Nobody is listening any more: stop the upstream and finish as if input ended.
            this.incoming.kill(true);
            outcome = RecordBatch.IterOutcome.NONE;
        } else {
            outcome = next(this.incoming);
        }
        logger.debug("Partitioner.next(): got next record batch with status {}", outcome);

        switch (outcome) {
            case NONE:
                try {
                    final Partitioner active = this.partitioner;
                    if (active != null) {
                        active.flushOutgoingBatches(true, false);
                    } else {
                        // No batch ever arrived, so no partitioner exists to do the final flush.
                        sendEmptyBatch();
                    }
                    return false;
                } catch (IOException e) {
                    this.incoming.kill(false);
                    logger.error("Error while creating partitioning sender or flushing outgoing batches", (Throwable) e);
                    this.context.fail(e);
                    return false;
                }
            case STOP: {
                final Partitioner active = this.partitioner;
                if (active != null) {
                    active.clear();
                }
                return false;
            }
            case OK_NEW_SCHEMA:
                try {
                    final Partitioner previous = this.partitioner;
                    if (previous != null) {
                        previous.flushOutgoingBatches(false, true);
                        previous.clear();
                    }
                    createPartitioner();
                } catch (IOException e) {
                    this.incoming.kill(false);
                    logger.error("Error while flushing outgoing batches", (Throwable) e);
                    this.context.fail(e);
                    return false;
                } catch (SchemaChangeException e) {
                    this.incoming.kill(false);
                    logger.error("Error while setting up partitioner", (Throwable) e);
                    this.context.fail(e);
                    return false;
                }
                break;
            case OK:
                break;
            case NOT_YET:
            default:
                throw new IllegalStateException("Unexpected outcome from incoming batch: " + outcome);
        }

        try {
            this.partitioner.partitionBatch(this.incoming);
            // The rows have been handed to the partitioner; clear the upstream vectors.
            for (VectorWrapper<?> wrapper : this.incoming) {
                wrapper.clear();
            }
            return true;
        } catch (IOException e) {
            logger.error("Error while partitioning incoming batch", (Throwable) e);
            this.context.fail(e);
            this.incoming.kill(false);
            return false;
        }
    }

    /**
     * Generates, loads and sets up a {@link Partitioner} for the current schema of
     * {@code incoming}, then re-applies the termination of every receiver that has already
     * reported completion.
     *
     * @throws SchemaChangeException if the partitioning expression cannot be materialized against
     *     the incoming schema, or the generated class cannot be loaded or set up
     */
    private void createPartitioner() throws SchemaChangeException {
        final LogicalExpression partitionExpr = this.operator.getExpr();
        final ErrorCollectorImpl collector = new ErrorCollectorImpl();
        final ClassGenerator root = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, this.context.getFunctionRegistry());
        final ClassGenerator innerGenerator = root.getInnerGenerator("OutgoingRecordBatch");

        final LogicalExpression materialized = ExpressionTreeMaterializer.materialize(partitionExpr, this.incoming, collector, this.context.getFunctionRegistry());
        if (collector.hasErrors()) {
            throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema.  Errors:\n %s.", collector.toErrorString()));
        }

        // Generated code: int bucket = <expr> % outGoingBatchCount; return Math.abs(bucket);
        // The modulus is taken first, so abs() never sees a value outside (-count, count).
        final JExpression bucket = JExpr.direct("bucket");
        root.getEvalBlock().decl(JType.parse(root.getModel(), "int"), "bucket", root.addExpr(materialized).getValue().mod(JExpr.lit(this.outGoingBatchCount)));
        root.getEvalBlock()._return(root.getModel().ref(Math.class).staticInvoke("abs").arg(bucket));

        final boolean fourByteSelection = this.incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE;
        CopyUtil.generateCopies(innerGenerator, this.incoming, fourByteSelection);

        try {
            final Partitioner created = (Partitioner) this.context.getImplementationClass(root);
            // Published before setup() so that stop() can still clear it if setup() fails.
            this.partitioner = created;
            created.setup(this.context, this.incoming, this.popConfig, this.stats, this.sendCount, this.oContext, this.statusHandler);
            terminateFinishedReceivers(created);
        } catch (IOException | ClassTransformationException e) {
            throw new SchemaChangeException("Failure while attempting to load generated class", e);
        }
    }

    /**
     * Terminates the outgoing batch of every receiver already flagged in
     * {@code remainingReceivers}, so that a freshly created partitioner does not resume sending
     * to receivers that finished earlier.
     *
     * <p>NOTE(review): if {@link #receivingFragmentFinished} runs concurrently, the same batch
     * may receive {@code terminate()} twice; this assumes {@code terminate()} is idempotent -
     * confirm.
     */
    private void terminateFinishedReceivers(Partitioner target) {
        final int receivers = Math.min(this.remainingReceivers.length(), target.getOutgoingBatches().size());
        for (int id = 0; id < receivers; id++) {
            if (this.remainingReceivers.get(id) != 0) {
                target.getOutgoingBatches().get(id).terminate();
            }
        }
    }

    /**
     * Records that the receiver identified by {@code fragmentHandle}'s minor fragment id no
     * longer wants data. Only the first notification per receiver has an effect. When the last
     * receiver finishes, {@code done} is set so the next {@link #innerNext()} shuts down.
     *
     * <p>If no partitioner exists yet, only the flag is recorded; {@link #createPartitioner()}
     * applies the termination once a partitioner is available.
     *
     * @param fragmentHandle handle of the receiving fragment that finished
     */
    @Override // org.apache.drill.exec.physical.impl.BaseRootExec, org.apache.drill.exec.physical.impl.RootExec
    public void receivingFragmentFinished(ExecProtos.FragmentHandle fragmentHandle) {
        final int receiverId = fragmentHandle.getMinorFragmentId();
        if (!this.remainingReceivers.compareAndSet(receiverId, 0, 1)) {
            return; // already recorded
        }
        final Partitioner current = this.partitioner;
        if (current != null) {
            current.getOutgoingBatches().get(receiverId).terminate();
        }
        if (this.remaingReceiverCount.decrementAndGet() == 0) {
            this.done = true;
        }
    }

    /**
     * Shuts the sender down: marks it not-ok, clears the partitioner (if any), waits for
     * outstanding sends, reports any failure recorded by the status handler, then closes the
     * operator context and cleans up the upstream.
     */
    @Override // org.apache.drill.exec.physical.impl.RootExec
    public void stop() {
        logger.debug("Partition sender stopping.");
        this.ok = false;
        final Partitioner active = this.partitioner;
        if (active != null) {
            active.clear();
        }
        this.sendCount.waitForSendComplete();
        if (!this.statusHandler.isOk()) {
            this.context.fail(this.statusHandler.getException());
        }
        this.oContext.close();
        this.incoming.cleanup();
    }

    /**
     * Sends one empty batch carrying the incoming schema to every destination endpoint.
     * Destinations are numbered in iteration order, and that number is used as the receiver's
     * minor fragment id. Each send is counted in {@code sendCount}; time spent in the send call
     * is recorded as wait time in {@code stats}.
     */
    public void sendEmptyBatch() {
        final ExecProtos.FragmentHandle handle = this.context.getHandle();
        // A handler local to this call, distinct from the instance-level statusHandler.
        final StatusHandler emptyBatchHandler = new StatusHandler(this.sendCount, this.context);
        int receiverId = 0;
        for (CoordinationProtos.DrillbitEndpoint endpoint : this.popConfig.getDestinations()) {
            final ExecProtos.FragmentHandle opposite = this.context.getHandle().toBuilder()
                    .setMajorFragmentId(this.popConfig.getOppositeMajorFragmentId())
                    .setMinorFragmentId(receiverId)
                    .build();
            final DataTunnel tunnel = this.context.getDataTunnel(endpoint, opposite);
            final FragmentWritableBatch emptyBatch = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), this.operator.getOppositeMajorFragmentId(), receiverId, this.incoming.getSchema());
            this.stats.startWait();
            try {
                tunnel.sendRecordBatch(emptyBatchHandler, emptyBatch);
            } finally {
                // Exactly one stopWait() per startWait(), whether or not the send throws.
                this.stats.stopWait();
            }
            this.sendCount.increment();
            receiverId++;
        }
    }
}
