package org.jgroups.util;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.logging.Log;
import org.jgroups.stack.Protocol;

/* loaded from: input_file:WEB-INF/lib/jgroups-3.6.3.Final.jar:org/jgroups/util/ForwardQueue.class */
/**
 * Queue of messages that are forwarded to a target and kept until they are acknowledged.
 *
 * <p>Outgoing messages are stored in {@link #forward_table} under a caller-supplied id and passed
 * to {@link #down_prot}; {@link #ack(long)} removes them again. When the target changes
 * ({@link #flush(Address, List)}), a {@link Flusher} thread re-sends all still-pending messages to
 * the new target while callers of {@link #send(long, Message)} are blocked. Incoming messages are
 * de-duplicated per sender through {@link #delivery_table} before being passed to {@link #up_prot}.
 */
public class ForwardQueue {
    protected Protocol up_prot;
    protected Protocol down_prot;
    protected Address local_addr;

    /** Thread currently (or most recently) re-sending the pending messages; may be null. */
    protected volatile Flusher flusher;
    protected final Log log;

    /** Pending (not yet acknowledged) messages, ordered by id. */
    protected final NavigableMap<Long, Message> forward_table = new ConcurrentSkipListMap<>();

    /** Guards {@link #send_cond}; senders wait on it while a flush is in progress. */
    protected final Lock send_lock = new ReentrantLock();
    protected final java.util.concurrent.locks.Condition send_cond = this.send_lock.newCondition();

    /** True while pending messages are being re-sent to a new target. */
    protected volatile boolean flushing = false;
    protected volatile boolean running = true;

    /** Number of threads currently inside the critical part of {@link #send(long, Message)}. */
    protected final AtomicInteger in_flight_sends = new AtomicInteger(0);

    /** Per-sender bounded tables of already delivered ids, used to drop duplicates. */
    protected final ConcurrentMap<Address, BoundedHashMap<Long, Long>> delivery_table = Util.createConcurrentMap();

    /** Carries the id of the most recent ack to the flusher thread. */
    protected final Promise<Long> ack_promise = new Promise<>();

    /** Capacity passed to each per-sender table created in {@link #canDeliver(Address, long)}. */
    protected int delivery_table_max_size = 500;

    /**
     * Thread that runs {@link ForwardQueue#doFlush(Address)} for a given new target.
     */
    public class Flusher extends Thread {
        protected final Address new_coord;

        public Flusher(Address address) {
            this.new_coord = address;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ForwardQueue.this.doFlush(this.new_coord);
            } catch (InterruptedException ignored) {
                // stopFlusher() interrupts this thread to abort the flush; the thread simply ends here
            }
        }
    }

    /**
     * Creates a queue that logs through the given logger.
     *
     * @param log the logger used for trace, warn and error output
     */
    public ForwardQueue(Log log) {
        this.log = log;
    }

    public Protocol getUpProt() {
        return this.up_prot;
    }

    public void setUpProt(Protocol protocol) {
        this.up_prot = protocol;
    }

    public Protocol getDownProt() {
        return this.down_prot;
    }

    public void setDownProt(Protocol protocol) {
        this.down_prot = protocol;
    }

    public Address getLocalAddr() {
        return this.local_addr;
    }

    public void setLocalAddr(Address address) {
        this.local_addr = address;
    }

    public int getDeliveryTableMaxSize() {
        return this.delivery_table_max_size;
    }

    public void setDeliveryTableMaxSize(int i) {
        this.delivery_table_max_size = i;
    }

    /**
     * Returns the total number of entries over all per-sender delivery tables.
     *
     * @return the sum of the sizes of all tables in {@link #delivery_table}
     */
    public int deliveryTableSize() {
        int total = 0;
        for (BoundedHashMap<Long, Long> perSender : this.delivery_table.values()) {
            total += perSender.size();
        }
        return total;
    }

    /** Marks the queue as running. */
    public void start() {
        this.running = true;
    }

    /**
     * Stops the queue: wakes up all blocked senders, stops the flusher thread and discards all
     * pending messages.
     */
    public void stop() {
        this.running = false;
        unblockAll();
        stopFlusher();
        this.forward_table.clear();
    }

    /**
     * Stores the message under the given id and passes it down, unless the queue is stopped or a
     * flush is in progress (in which case the message only stays in the table).
     *
     * <p>If a flush is in progress when this method is entered, the caller blocks until it ends.
     *
     * @param j       the id under which the message is kept until {@link #ack(long)} is called
     * @param message the message to send
     */
    public void send(long j, Message message) {
        if (this.flushing) {
            block();
        }
        // doFlush() waits for this counter to reach 0 before it re-sends the pending messages
        this.in_flight_sends.incrementAndGet();
        try {
            this.forward_table.put(j, message);
            if (this.running && !this.flushing) {
                // NOTE(review): event type 1 is presumably Event.MSG - confirm
                this.down_prot.down(new Event(1, message));
            }
        } finally {
            this.in_flight_sends.decrementAndGet();
        }
    }

    /**
     * Passes a received message up, unless it has no sender or was already delivered.
     *
     * @param j       the id of the message, unique per sender
     * @param message the received message; its source address identifies the sender
     */
    public void receive(long j, Message message) {
        Address sender = message.getSrc();
        if (sender == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(this.local_addr + ": sender is null, cannot deliver message ::" + j);
            }
            return;
        }
        if (!canDeliver(sender, j)) {
            if (this.log.isWarnEnabled()) {
                this.log.warn(this.local_addr + ": dropped duplicate message " + sender + "::" + j);
            }
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": delivering " + sender + "::" + j);
        }
        this.up_prot.up(new Event(1, message));
    }

    /**
     * Drops the delivery tables of all senders not contained in {@code list} and, if a new target
     * is given, restarts the flusher thread to re-send all pending messages to it.
     *
     * @param address the new target, or null if pending messages should not be re-sent
     * @param list    the senders whose delivery tables are retained
     */
    public void flush(Address address, List<Address> list) {
        this.delivery_table.keySet().retainAll(list);
        if (address != null) {
            stopFlusher();
            startFlusher(address);
        }
    }

    /**
     * Removes the pending message with the given id and hands the id to the flusher thread.
     *
     * @param j the id of the acknowledged message
     */
    public void ack(long j) {
        this.forward_table.remove(j);
        this.ack_promise.setResult(j);
    }

    /**
     * Returns the number of pending (not yet acknowledged) messages.
     *
     * @return the size of {@link #forward_table}
     */
    public int size() {
        return this.forward_table.size();
    }

    /**
     * Waits until no sender is in flight, then re-sends all pending messages to the new target.
     * Afterwards (also on failure) flushing is switched off and blocked senders are woken up.
     *
     * @param address the new target
     * @throws InterruptedException if the thread is interrupted while waiting or acquiring the lock
     */
    protected void doFlush(Address address) throws InterruptedException {
        // poll until all senders that passed the flushing check have left send()
        while (this.flushing && this.running && this.in_flight_sends.get() != 0) {
            Thread.sleep(100L);
        }
        this.send_lock.lockInterruptibly();
        try {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": target changed to " + address);
            }
            flushMessagesInForwardTable(address);
        } finally {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing completed");
            }
            this.flushing = false;
            this.send_cond.signalAll();
            this.send_lock.unlock();
        }
    }

    /**
     * Re-sends copies of all pending messages to the given target.
     *
     * <p>The first pending message is re-sent repeatedly (waiting up to 500 ms for an ack each
     * time) until it is acknowledged or no longer in the table; then every message still in the
     * table is re-sent once. Stops early when flushing is switched off or the queue is stopped.
     *
     * @param address the new target
     */
    protected void flushMessagesInForwardTable(Address address) {
        Map.Entry<Long, Message> first = this.forward_table.firstEntry();
        if (first == null) {
            return;
        }
        Long firstKey = first.getKey();
        Message firstMsg = first.getValue();
        while (this.flushing && this.running && !this.forward_table.isEmpty()) {
            Message copy = copyForTarget(firstKey, firstMsg, address);
            // reset right before sending so that only acks arriving from now on are seen below
            this.ack_promise.reset();
            this.down_prot.down(new Event(1, copy));
            Long acked = this.ack_promise.getResult(500L);
            boolean ackedFirst = acked != null && acked.equals(firstKey);
            if (ackedFirst || !this.forward_table.containsKey(firstKey)) {
                break;
            }
        }
        for (Map.Entry<Long, Message> pending : this.forward_table.entrySet()) {
            if (this.flushing && this.running) {
                Message copy = copyForTarget(pending.getKey(), pending.getValue(), address);
                this.down_prot.down(new Event(1, copy));
            }
        }
    }

    /** Returns a DONT_BUNDLE copy of the message addressed to the target and traces the re-send. */
    private Message copyForTarget(Long key, Message message, Address target) {
        Message copy = message.copy();
        copy.setDest(target);
        copy.setFlag(Message.Flag.DONT_BUNDLE);
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": flushing (forwarding) ::" + key + " to target " + target);
        }
        return copy;
    }

    /**
     * Records the id for the given sender and reports whether it was new.
     *
     * @param address the sender of the message
     * @param j       the id of the message
     * @return the result of adding the id to the sender's table; {@link #receive(long, Message)}
     *         treats {@code false} as a duplicate
     */
    protected boolean canDeliver(Address address, long j) {
        BoundedHashMap<Long, Long> seen = this.delivery_table.get(address);
        if (seen == null) {
            BoundedHashMap<Long, Long> fresh = new BoundedHashMap<>(this.delivery_table_max_size);
            // putIfAbsent keeps a table installed concurrently by another thread instead of
            // replacing it, so ids recorded there are not lost
            BoundedHashMap<Long, Long> existing = this.delivery_table.putIfAbsent(address, fresh);
            seen = existing != null ? existing : fresh;
        }
        return seen.add(j, j);
    }

    /**
     * Blocks the calling thread until flushing ends or the queue is stopped.
     *
     * <p>The lock is held for the whole wait loop and released exactly once, also when the loop
     * is never entered.
     */
    protected void block() {
        this.send_lock.lock();
        try {
            while (this.flushing && this.running) {
                try {
                    this.send_cond.await();
                } catch (InterruptedException ignored) {
                    // deliberately ignored: keep waiting until flushing ends or the queue stops
                }
            }
        } finally {
            this.send_lock.unlock();
        }
    }

    /**
     * Switches flushing off, wakes up all blocked senders and releases a flusher thread that is
     * waiting for an ack.
     */
    protected void unblockAll() {
        this.flushing = false;
        this.send_lock.lock();
        try {
            this.send_cond.signalAll();
            this.ack_promise.setResult(null);
        } finally {
            this.send_lock.unlock();
        }
    }

    /**
     * Starts a new flusher thread for the given target unless one is still alive.
     *
     * @param address the new target passed to the flusher
     */
    protected synchronized void startFlusher(Address address) {
        if (this.flusher == null || !this.flusher.isAlive()) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": flushing started");
            }
            this.flushing = true;
            this.flusher = new Flusher(address);
            this.flusher.setName("Flusher");
            this.flusher.start();
        }
    }

    /**
     * Switches flushing off and waits until the current flusher thread (if any) has terminated.
     */
    protected void stopFlusher() {
        this.flushing = false;
        Flusher current = this.flusher;
        while (current != null && current.isAlive()) {
            current.interrupt();
            // wakes the flusher if it is waiting for an ack
            this.ack_promise.setResult(null);
            try {
                current.join();
            } catch (InterruptedException ignored) {
                // deliberately ignored: the loop retries until the flusher has terminated
            }
        }
    }
}
