package org.jgroups.protocols.pbcast;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Digest;
import org.jgroups.util.ProcessingQueue;
import org.jgroups.util.ShutdownRejectedExecutionHandler;
import org.jgroups.util.StateTransferResult;
import org.jgroups.util.Tuple;
import org.jgroups.util.Util;

@MBean(description = "Streaming state transfer protocol base class")
/* loaded from: classes4.dex */
public abstract class StreamingStateTransfer extends Protocol implements ProcessingQueue.Handler<Address> {
    protected double avg_state_size;
    protected Address local_addr;
    protected volatile Address state_provider;
    protected ThreadPoolExecutor thread_pool;

    @Property(description = "Size (in bytes) of the state transfer buffer")
    protected int buffer_size = 8192;

    @Property(description = "Maximum number of pool threads serving state requests")
    protected int max_pool = 5;

    @Property(description = "Keep alive for pool threads serving state requests")
    protected long pool_thread_keep_alive = 20000;
    protected final AtomicInteger num_state_reqs = new AtomicInteger(0);
    protected final AtomicLong num_bytes_sent = new AtomicLong(0);
    protected final List<Address> members = new ArrayList();
    protected volatile boolean flushProtocolInStack = false;
    protected final ProcessingQueue<Address> state_requesters = new ProcessingQueue().setHandler(this);

    /* loaded from: classes4.dex */
    protected class StateGetter implements Runnable {
        protected final OutputStream output;
        protected final Address requester;

        public StateGetter(Address address, OutputStream outputStream) {
            this.requester = address;
            this.output = outputStream;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ((Protocol) StreamingStateTransfer.this).log.debug("%s: getting the state from the application", StreamingStateTransfer.this.local_addr);
                ((Protocol) StreamingStateTransfer.this).up_prot.up(new Event(72, this.output));
                this.output.flush();
                StreamingStateTransfer.this.sendEof(this.requester);
                if (!StreamingStateTransfer.this.isDigestNeeded()) {
                }
            } catch (Throwable th) {
                try {
                    StreamingStateTransfer.this.sendException(this.requester, th);
                } finally {
                    if (StreamingStateTransfer.this.isDigestNeeded()) {
                        StreamingStateTransfer.this.resumeStable();
                        StreamingStateTransfer.this.closeHoleFor(this.requester);
                    }
                }
            }
        }
    }

    /* loaded from: classes4.dex */
    public static class StateHeader extends Header {
        public static final byte STATE_EOF = 4;
        public static final byte STATE_EX = 5;
        public static final byte STATE_PART = 3;
        public static final byte STATE_REQ = 1;
        public static final byte STATE_RSP = 2;
        protected IpAddress bind_addr;
        protected Digest digest;
        protected byte type;

        public StateHeader() {
            this.type = (byte) 0;
        }

        public StateHeader(byte b) {
            this.type = b;
        }

        public StateHeader(byte b, IpAddress ipAddress, Digest digest) {
            this.type = b;
            this.digest = digest;
            this.bind_addr = ipAddress;
        }

        public StateHeader(byte b, Digest digest) {
            this.type = b;
            this.digest = digest;
        }

        static String type2Str(int i) {
            return i != 1 ? i != 2 ? i != 3 ? i != 4 ? i != 5 ? "<unknown>" : "STATE_EX" : "STATE_EOF" : "STATE_PART" : "STATE_RSP" : "STATE_REQ";
        }

        public Digest getDigest() {
            return this.digest;
        }

        public int getType() {
            return this.type;
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readByte();
            this.digest = (Digest) Util.readStreamable(Digest.class, dataInput);
            this.bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, dataInput);
        }

        @Override // org.jgroups.Header
        public int size() {
            Digest digest = this.digest;
            return (digest != null ? (int) (2 + digest.serializedSize(true)) : 2) + Util.size(this.bind_addr);
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("type=");
            sb.append(type2Str(this.type));
            if (this.digest != null) {
                sb.append(", digest=");
                sb.append(this.digest);
            }
            if (this.bind_addr != null) {
                sb.append(", bind_addr=" + this.bind_addr);
            }
            return sb.toString();
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(this.type);
            Util.writeStreamable(this.digest, dataOutput);
            Util.writeStreamable(this.bind_addr, dataOutput);
        }
    }

    protected void close(Object obj) {
    }

    @ManagedOperation(description = "Closes BARRIER and suspends STABLE")
    public void closeBarrierAndSuspendStable() {
        if (isDigestNeeded()) {
            this.log.trace("%s: sending down CLOSE_BARRIER and SUSPEND_STABLE", this.local_addr);
            this.down_prot.down(new Event(76));
            this.down_prot.down(new Event(65));
        }
    }

    protected void closeHoleFor(Address address) {
        this.down_prot.down(new Event(107, address));
    }

    protected abstract Tuple<InputStream, Object> createStreamToProvider(Address address, StateHeader stateHeader) throws Exception;

    protected void createStreamToRequester(Address address) {
    }

    protected ThreadPoolExecutor createThreadPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, this.max_pool, this.pool_thread_keep_alive, TimeUnit.MILLISECONDS, new SynchronousQueue());
        ThreadFactory threadFactory = new ThreadFactory() { // from class: org.jgroups.protocols.pbcast.StreamingStateTransfer.1
            private final AtomicInteger thread_id = new AtomicInteger(1);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return StreamingStateTransfer.this.getThreadFactory().newThread(runnable, "StreamingStateTransfer-sender-" + this.thread_id.getAndIncrement());
            }
        };
        threadPoolExecutor.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler()));
        threadPoolExecutor.setThreadFactory(threadFactory);
        return threadPoolExecutor;
    }

    @Override // org.jgroups.stack.Protocol
    public void destroy() {
        this.thread_pool.shutdown();
        super.destroy();
    }

    protected Address determineCoordinator() {
        synchronized (this.members) {
            for (Address address : this.members) {
                if (!this.local_addr.equals(address)) {
                    return address;
                }
            }
            return null;
        }
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        int type = event.getType();
        if (type == 6) {
            handleViewChange((View) event.getArg());
        } else if (type == 8) {
            this.local_addr = (Address) event.getArg();
        } else {
            if (type == 19) {
                Address address = ((StateTransferInfo) event.getArg()).target;
                if (address != null && address.equals(this.local_addr)) {
                    this.log.error("%s: cannot fetch state from myself", this.local_addr);
                    address = null;
                }
                if (address == null) {
                    address = determineCoordinator();
                }
                if (address == null) {
                    this.log.debug("%s: first member (no state)", this.local_addr);
                    this.up_prot.up(new Event(73, new StateTransferResult()));
                } else {
                    this.state_provider = address;
                    Message flag = new Message(address).putHeader(this.id, new StateHeader((byte) 1)).setFlag(Message.Flag.SKIP_BARRIER, Message.Flag.DONT_BUNDLE, Message.Flag.OOB);
                    this.log.debug("%s: asking %s for state", this.local_addr, address);
                    this.down_prot.down(new Event(1, flag));
                }
                return null;
            }
            if (type == 56) {
                handleConfig((Map) event.getArg());
            }
        }
        return this.down_prot.down(event);
    }

    @ManagedAttribute
    public double getAverageStateSize() {
        return this.avg_state_size;
    }

    @ManagedAttribute
    public long getNumberOfStateBytesSent() {
        return this.num_bytes_sent.get();
    }

    @ManagedAttribute
    public int getNumberOfStateRequests() {
        return this.num_state_reqs.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void getStateFromApplication(Address address, OutputStream outputStream, boolean z) {
        if (outputStream == null || address == null) {
            throw new IllegalArgumentException("output stream and requester's address have to be non-null");
        }
        StateGetter stateGetter = new StateGetter(address, outputStream);
        if (z) {
            this.thread_pool.execute(stateGetter);
        } else {
            stateGetter.run();
        }
    }

    @ManagedAttribute
    public long getThreadPoolCompletedTasks() {
        return this.thread_pool.getCompletedTaskCount();
    }

    @ManagedAttribute
    public int getThreadPoolSize() {
        return this.thread_pool.getPoolSize();
    }

    @Override // org.jgroups.util.ProcessingQueue.Handler
    public void handle(Address address) {
        handleStateReq(address);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleConfig(Map<String, Object> map) {
        if (map != null && map.containsKey("flush_supported")) {
            this.flushProtocolInStack = true;
        }
        if (map != null && map.containsKey("state_transfer")) {
            throw new IllegalArgumentException("Protocol stack must have only one state transfer protocol");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleEOF(Address address) {
        this.state_provider = null;
        this.down_prot.down(new Event(108));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleException(Throwable th) {
        this.state_provider = null;
        this.up_prot.up(new Event(73, new StateTransferResult(th)));
    }

    protected void handleStateChunk(Address address, byte[] bArr, int i, int i2) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleStateReq(Address address) {
        Digest digest;
        if (address == null) {
            return;
        }
        this.log.debug("%s: received state request from %s", this.local_addr, address);
        if (isDigestNeeded()) {
            try {
                punchHoleFor(address);
                closeBarrierAndSuspendStable();
                digest = (Digest) this.down_prot.down(Event.GET_DIGEST_EVT);
            } catch (Throwable th) {
                try {
                    sendException(address, th);
                    resumeStable();
                    closeHoleFor(address);
                    return;
                } finally {
                    openBarrier();
                }
            }
        } else {
            digest = null;
        }
        StateHeader stateHeader = new StateHeader((byte) 2, null, digest);
        modifyStateResponseHeader(stateHeader);
        Message putHeader = new Message(address).putHeader(this.id, stateHeader);
        this.log.debug("%s: responding to state requester %s", this.local_addr, address);
        this.down_prot.down(new Event(1, putHeader));
        if (this.stats) {
            this.num_state_reqs.incrementAndGet();
        }
        try {
            createStreamToRequester(address);
        } catch (Throwable th2) {
            sendException(address, th2);
        }
    }

    protected void handleStateRsp(final Address address, StateHeader stateHeader) {
        final InputStream inputStream;
        if (isDigestNeeded()) {
            try {
                punchHoleFor(address);
                closeBarrierAndSuspendStable();
                this.down_prot.down(new Event(42, stateHeader.getDigest()));
            } catch (Throwable th) {
                handleException(th);
                openBarrierAndResumeStable();
                closeHoleFor(address);
                return;
            }
        }
        try {
            Tuple<InputStream, Object> createStreamToProvider = createStreamToProvider(address, stateHeader);
            inputStream = createStreamToProvider.getVal1();
            try {
                final Object val2 = createStreamToProvider.getVal2();
                if (useAsyncStateDelivery()) {
                    getThreadFactory().newThread(new Runnable() { // from class: org.jgroups.protocols.pbcast.StreamingStateTransfer.2
                        @Override // java.lang.Runnable
                        public void run() {
                            StreamingStateTransfer.this.setStateInApplication(inputStream, val2, address);
                        }
                    }, "STATE state reader").start();
                } else {
                    setStateInApplication(inputStream, val2, address);
                }
            } catch (Throwable th2) {
                th = th2;
                handleException(th);
                Util.close(inputStream);
                close(null);
                if (isDigestNeeded()) {
                    openBarrierAndResumeStable();
                    closeHoleFor(address);
                }
            }
        } catch (Throwable th3) {
            th = th3;
            inputStream = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleViewChange(View view) {
        List<Address> members = view.getMembers();
        synchronized (this.members) {
            this.members.clear();
            this.members.addAll(members);
        }
        this.state_requesters.retainAll(members);
    }

    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        super.init();
        this.thread_pool = createThreadPool();
    }

    protected boolean isDigestNeeded() {
        return !this.flushProtocolInStack;
    }

    protected void modifyStateResponseHeader(StateHeader stateHeader) {
    }

    protected void openBarrier() {
        this.down_prot.down(new Event(77));
    }

    @ManagedOperation(description = "Opens BARRIER and resumes STABLE")
    public void openBarrierAndResumeStable() {
        if (isDigestNeeded()) {
            this.log.trace("%s: sending down OPEN_BARRIER and RESUME_STABLE", this.local_addr);
            openBarrier();
            resumeStable();
        }
    }

    protected void punchHoleFor(Address address) {
        this.down_prot.down(new Event(106, address));
    }

    @Override // org.jgroups.stack.Protocol
    public List<Integer> requiredDownServices() {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(39);
        arrayList.add(42);
        return arrayList;
    }

    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        this.num_state_reqs.set(0);
        this.num_bytes_sent.set(0L);
        this.avg_state_size = 0.0d;
    }

    protected void resumeStable() {
        this.down_prot.down(new Event(66));
    }

    protected void sendEof(Address address) {
        try {
            Message putHeader = new Message(address).putHeader(getId(), new StateHeader((byte) 4));
            this.log.trace("%s --> EOF --> %s", this.local_addr, address);
            down(new Event(1, putHeader));
        } catch (Throwable unused) {
            this.log.error("%s: failed sending EOF to %s", this.local_addr, address);
        }
    }

    protected void sendException(Address address, Throwable th) {
        try {
            down(new Event(1, new Message(address, (Address) null, th).putHeader(getId(), new StateHeader((byte) 5))));
        } catch (Throwable unused) {
            this.log.error("%s: failed sending exception %s to %s", this.local_addr, th.toString(), address);
        }
    }

    protected void setStateInApplication(InputStream inputStream, Object obj, Address address) {
        this.log.debug("%s: setting the state in the aplication", this.local_addr);
        try {
            this.up_prot.up(new Event(71, inputStream));
            this.up_prot.up(new Event(73, new StateTransferResult()));
            this.down_prot.down(new Event(108));
            Util.close(inputStream);
            close(obj);
            if (!isDigestNeeded()) {
            }
        } catch (Throwable th) {
            try {
                handleException(th);
            } finally {
                Util.close(inputStream);
                close(obj);
                if (isDigestNeeded()) {
                    openBarrierAndResumeStable();
                    closeHoleFor(address);
                }
            }
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("state_transfer", Boolean.TRUE);
        hashMap.put("protocol_class", getClass().getName());
        this.up_prot.up(new Event(56, hashMap));
        if (this.buffer_size <= 0) {
            throw new IllegalArgumentException("buffer_size has to be > 0");
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        super.stop();
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        int type = event.getType();
        if (type == 1) {
            Message message = (Message) event.getArg();
            StateHeader stateHeader = (StateHeader) message.getHeader(this.id);
            if (stateHeader != null) {
                Address src = message.getSrc();
                byte b = stateHeader.type;
                if (b == 1) {
                    this.state_requesters.add(message.getSrc());
                    return null;
                }
                if (b == 2) {
                    handleStateRsp(src, stateHeader);
                    return null;
                }
                if (b == 3) {
                    handleStateChunk(src, message.getRawBuffer(), message.getOffset(), message.getLength());
                    return null;
                }
                if (b == 4) {
                    this.log.trace("%s <-- EOF <-- %s", this.local_addr, src);
                    handleEOF(src);
                    return null;
                }
                if (b != 5) {
                    this.log.error("%s: type %d not known in StateHeader", this.local_addr, Byte.valueOf(b));
                    return null;
                }
                handleException((Throwable) message.getObject());
                return null;
            }
        } else if (type == 6 || type == 15) {
            handleViewChange((View) event.getArg());
        } else if (type == 56) {
            handleConfig((Map) event.getArg());
        }
        return this.up_prot.up(event);
    }

    protected boolean useAsyncStateDelivery() {
        return false;
    }
}
