package org.apache.drill.exec.work.batch;

import com.google.common.collect.Queues;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.class */
/**
 * A {@link RawBatchBuffer} that stores incoming {@link RawFragmentBatch}es in an unbounded
 * {@link LinkedBlockingDeque}.
 *
 * <p>The deque itself never rejects a batch. Instead, a soft limit is applied: while the deque
 * holds fewer than {@code softlimit} batches, {@code sendOk()} is invoked on each batch as soon as
 * it is queued. Once the deque reaches {@code softlimit}, the batch's sender is parked in
 * {@code readController} and only flushed after {@link #getNext()} has drained the deque down to
 * {@code startlimit} (half of the soft limit, but at least 1).
 *
 * <p>A batch whose header is flagged out-of-memory switches the buffer into an out-of-memory
 * state, which {@link #getNext()} clears again once the backlog is small.
 */
public class UnlimitedRawBatchBuffer implements RawBatchBuffer {
    static final Logger logger = LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);

    /** Backlog size below which {@link #getNext()} leaves the out-of-memory state. */
    private static final int OUT_OF_MEMORY_RESUME_THRESHOLD = 10;

    /** Deque size at (or above) which responses to senders start being withheld. */
    private final int softlimit;
    /** Deque size at (or below) which withheld responses are flushed again. */
    private final int startlimit;
    /** Number of streams that have not yet delivered a batch flagged as last. */
    private int streamCounter;
    private final FragmentContext context;
    private volatile boolean finished = false;
    private final AtomicBoolean overlimit = new AtomicBoolean(false);
    private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
    private final ResponseSenderQueue readController = new ResponseSenderQueue();
    private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();

    /**
     * Creates a buffer whose soft limit is the configured
     * {@link ExecConstants#INCOMING_BUFFER_SIZE} multiplied by {@code i}.
     *
     * @param fragmentContext context used to read the configuration and to report failures
     * @param i multiplier for the configured buffer size; also the initial number of streams that
     *     must each deliver a last batch before the buffer marks itself finished
     */
    public UnlimitedRawBatchBuffer(FragmentContext fragmentContext, int i) {
        this.softlimit = fragmentContext.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE) * i;
        this.startlimit = Math.max(this.softlimit / 2, 1);
        this.streamCounter = i;
        this.context = fragmentContext;
    }

    /**
     * Adds an incoming batch to the buffer and decides whether to respond to its sender now or
     * to withhold the response until the backlog has been drained.
     *
     * @param rawFragmentBatch the batch to enqueue
     * @throws RuntimeException if the buffer has already been marked finished
     */
    @Override
    public void enqueue(RawFragmentBatch rawFragmentBatch) {
        if (this.finished) {
            throw new RuntimeException("Attempted to enqueue batch after finished");
        }

        if (rawFragmentBatch.getHeader().getIsOutOfMemory()) {
            logger.debug("Setting autoread false");
            RawFragmentBatch head = this.buffer.peekFirst();
            BitData.FragmentRecordBatch headHeader = head == null ? null : head.getHeader();
            // NOTE(review): the marker batch is only pushed to the front when the current head is
            // itself an out-of-memory batch; otherwise it is dropped and only the flag is set.
            // This is kept exactly as found - confirm the intended condition with the author.
            if (!this.outOfMemory.get() && headHeader != null && headHeader.getIsOutOfMemory()) {
                this.buffer.addFirst(rawFragmentBatch);
            }
            this.outOfMemory.set(true);
            return;
        }

        this.buffer.add(rawFragmentBatch);
        // Use >= rather than ==: the size is read after the add without any lock, so it may pass
        // the soft limit without ever being observed at exactly that value. An equality test
        // would then leave back-pressure off for every subsequent batch.
        if (this.buffer.size() >= this.softlimit) {
            this.overlimit.set(true);
            this.readController.enqueueResponse(rawFragmentBatch.getSender());
        } else {
            rawFragmentBatch.sendOk();
        }
    }

    /**
     * Verifies the buffer is in a terminal state and releases the bodies of any batches still
     * queued.
     *
     * <p>If batches remain while the context is neither failed nor cancelled, the context is
     * failed and every leftover batch is logged before its body is released.
     *
     * @throws IllegalStateException if the buffer is not finished and the context is not cancelled
     */
    @Override
    public void cleanup() {
        if (!this.finished && !this.context.isCancelled()) {
            IllegalStateException notFinished = new IllegalStateException("Cleanup before finished");
            this.context.fail(notFinished);
            throw notFinished;
        }
        if (this.buffer.isEmpty()) {
            return;
        }

        // Evaluate before calling fail(), which may itself change what isFailed() reports.
        boolean unexpectedLeftovers = !this.context.isFailed() && !this.context.isCancelled();
        if (unexpectedLeftovers) {
            this.context.fail(new IllegalStateException("Batches still in queue during cleanup"));
            logger.error("{} Batches in queue.", Integer.valueOf(this.buffer.size()));
        }

        // Single drain loop: previously a log-only loop emptied the deque first, so the release
        // loop that followed never saw those batches and their bodies were never released.
        RawFragmentBatch leftover;
        while ((leftover = this.buffer.poll()) != null) {
            if (unexpectedLeftovers) {
                logger.error("Batch left in queue: {}", leftover);
            }
            releaseBody(leftover);
        }
    }

    /**
     * Discards every queued batch, releasing each body that is present.
     *
     * @param fragmentContext not used by this implementation
     */
    @Override
    public void kill(FragmentContext fragmentContext) {
        // Poll until null instead of isEmpty()+poll(): the deque can be emptied between the two
        // calls, and a batch may carry no body at all (cleanup() guards against that as well).
        RawFragmentBatch discarded;
        while ((discarded = this.buffer.poll()) != null) {
            releaseBody(discarded);
        }
    }

    /**
     * Marks the buffer as finished. The flag is set even when the check below fails.
     *
     * @throws IllegalStateException if batches are still queued
     */
    @Override
    public void finished() {
        this.finished = true;
        if (!this.buffer.isEmpty()) {
            throw new IllegalStateException("buffer not empty when finished");
        }
    }

    /**
     * Returns the next queued batch, blocking while the buffer is empty and not yet finished.
     *
     * <p>Along the way this method leaves the out-of-memory state when the backlog is small,
     * flushes withheld responses once the backlog has dropped to {@code startlimit}, and marks
     * the buffer finished after the last batch of the final stream has been taken.
     *
     * @return the next batch; {@code null} once the buffer is finished and empty, or if the
     *     calling thread was interrupted while waiting (the interrupt flag is restored)
     * @throws IllegalStateException if {@code null} would be returned while batches remain queued
     *     or while the buffer is not finished
     */
    @Override
    public RawFragmentBatch getNext() {
        if (this.outOfMemory.get() && this.buffer.size() < OUT_OF_MEMORY_RESUME_THRESHOLD) {
            logger.debug("Setting autoread true");
            this.outOfMemory.set(false);
            this.readController.flushResponses();
        }

        RawFragmentBatch next = this.buffer.poll();
        if (next == null && (!this.finished || !this.buffer.isEmpty())) {
            try {
                next = this.buffer.take();
            } catch (InterruptedException e) {
                // Restore the flag so callers further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                return null;
            }
        }

        if (next != null && next.getHeader().getIsOutOfMemory()) {
            this.outOfMemory.set(true);
            return next;
        }

        // Use <= rather than ==: with a soft limit of 1 the start limit is also 1, but the size
        // after polling is already 0, so an equality test would never release held senders.
        if (!this.finished && this.overlimit.get() && this.buffer.size() <= this.startlimit) {
            this.overlimit.set(false);
            this.readController.flushResponses();
        }

        if (next != null && next.getHeader().getIsLastBatch()) {
            this.streamCounter--;
            if (this.streamCounter == 0) {
                finished();
            }
        }

        if (next == null && this.buffer.size() > 0) {
            throw new IllegalStateException("Returning null when there are batches left in queue");
        }
        if (next != null || this.finished) {
            return next;
        }
        throw new IllegalStateException("Returning null when not finished");
    }

    /** Releases the body of {@code batch} if it has one. */
    private static void releaseBody(RawFragmentBatch batch) {
        if (batch.getBody() != null) {
            batch.getBody().release();
        }
    }
}
