package org.iplass.mtp.impl.infinispan.cluster.channel;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedExecutorService;
import org.iplass.mtp.impl.cluster.ClusterService;
import org.iplass.mtp.impl.cluster.Message;
import org.iplass.mtp.impl.cluster.channel.MessageChannel;
import org.iplass.mtp.impl.cluster.channel.MessageReceiver;
import org.iplass.mtp.impl.infinispan.InfinispanService;
import org.iplass.mtp.spi.Config;
import org.iplass.mtp.spi.ServiceInitListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/iplass/mtp/impl/infinispan/cluster/channel/InfinispanMessageChannel.class */
/**
 * {@link MessageChannel} implementation that sends cluster messages by submitting an
 * {@link InfinispanMessageTask} through an Infinispan {@link DistributedExecutorService}
 * ({@code submitEverywhere}).
 *
 * <p>Two modes are supported, selected by {@link #setSync(boolean)} before
 * {@link #inited(ClusterService, Config)} is called:
 * <ul>
 * <li><b>sync</b>: {@link #sendMessage(Message)} submits the message on the calling thread and
 * waits for the returned futures.</li>
 * <li><b>async</b> (default): {@link #sendMessage(Message)} only enqueues the message; a single
 * background worker drains the queue and sends the messages in batches of at most
 * {@value #MAX_BATCH_SIZE}.</li>
 * </ul>
 */
public class InfinispanMessageChannel implements MessageChannel, ServiceInitListener<ClusterService> {
    /** Upper bound on the number of messages the async worker packs into one distributed task. */
    private static final int MAX_BATCH_SIZE = 32;

    private static final Logger logger = LoggerFactory.getLogger(InfinispanMessageChannel.class);
    private static final Logger fatalLog = LoggerFactory.getLogger("mtp.fatal.cluster");

    private DistributedExecutorService ds;
    private InfinispanService is;
    private MessageReceiver receiver;
    private boolean sync;
    /** Pending outgoing messages; only created in async mode. */
    private BlockingQueue<Message> msgQueue;
    /** Single-thread executor running the async send worker; only created in async mode. */
    private ExecutorService ats;

    /**
     * @return {@code true} if messages are sent on the caller's thread, {@code false} if they are
     *         queued and sent by the background worker
     */
    public boolean isSync() {
        return this.sync;
    }

    /**
     * Selects sync or async sending. The value is read by {@link #inited(ClusterService, Config)}
     * to decide whether the background worker is started, and by {@link #sendMessage(Message)}.
     *
     * @param sync {@code true} for synchronous sending
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /**
     * Resolves the {@link InfinispanService}, creates the distributed executor on its default
     * cache and, in async mode, starts the background send worker.
     *
     * @param clusterService the service being initialized (not used here)
     * @param config         configuration used to look up the dependent {@link InfinispanService}
     */
    public void inited(ClusterService clusterService, Config config) {
        this.is = (InfinispanService) config.getDependentService(InfinispanService.class);
        this.ds = new DefaultExecutorService(this.is.getDefaultCache());
        if (this.sync) {
            return;
        }

        final BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        final ExecutorService worker = Executors.newSingleThreadExecutor();
        this.msgQueue = queue;
        this.ats = worker;
        worker.submit(new Callable<Void>() {
            @Override
            public Void call() {
                return runSendWorker(queue, worker);
            }
        });
    }

    /**
     * Worker loop for async mode: blocks until at least one message is available, tops the batch
     * up with whatever else is already queued (up to {@value #MAX_BATCH_SIZE}) and sends it.
     *
     * <p>Failures are logged and the loop keeps running; it only ends when the thread is
     * interrupted after {@code owner} has been shut down.
     *
     * @param queue source of outgoing messages
     * @param owner executor running this loop; its shutdown state decides whether an interrupt
     *              terminates the loop
     * @return always {@code null}
     */
    private Void runSendWorker(BlockingQueue<Message> queue, ExecutorService owner) {
        ArrayList<Message> batch = new ArrayList<>(MAX_BATCH_SIZE);
        while (true) {
            try {
                // Non-blocking top-up. drainTo removes only what it stores, so no message is
                // taken from the queue and then dropped when the batch is full.
                queue.drainTo(batch, MAX_BATCH_SIZE - batch.size());
                if (!batch.isEmpty()) {
                    Message[] messages = batch.toArray(new Message[batch.size()]);
                    // Clear before sending so a failed send does not resend the same batch.
                    batch.clear();
                    doSendMessage(messages);
                }
                // Block until the next message arrives; it seeds the next batch.
                batch.add(queue.take());
            } catch (Error | RuntimeException e) {
                fatalLog.error("send message worker failed.error=" + e, e);
            } catch (InterruptedException e) {
                if (owner.isShutdown()) {
                    return null;
                }
                fatalLog.error("send message worker failed.error=" + e, e);
            }
        }
    }

    /**
     * Stops the async worker (if any) and shuts down the distributed executor.
     * Safe to call when initialization did not complete or when called more than once.
     */
    public void destroyed() {
        // Stop the worker first so it is not left using an executor that is already shut down.
        if (this.ats != null) {
            this.ats.shutdownNow();
        }
        if (this.ds != null) {
            this.ds.shutdown();
            this.ds = null;
        }
    }

    /**
     * @param messageReceiver receiver that {@link #receiveMessage(Message[])} forwards to
     */
    public void setMessageReceiver(MessageReceiver messageReceiver) {
        this.receiver = messageReceiver;
    }

    /**
     * Forwards each message, in array order, to the configured {@link MessageReceiver}.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private; it is
     * kept public here so that existing callers keep compiling.
     *
     * @param messageArr messages to deliver
     */
    public void receiveMessage(Message[] messageArr) {
        for (Message message : messageArr) {
            if (logger.isDebugEnabled()) {
                logger.debug("receive message :" + message);
            }
            this.receiver.receiveMessage(message);
        }
    }

    /**
     * Sends a message: immediately on the calling thread in sync mode, otherwise by enqueueing it
     * for the background worker. A failed enqueue is logged, not thrown.
     *
     * @param message message to send
     */
    public void sendMessage(Message message) {
        if (this.sync) {
            doSendMessage(new Message[]{message});
            return;
        }
        if (!this.msgQueue.offer(message)) {
            fatalLog.error("send message failed. cause cant put to messageQueue. message=" + message);
        }
    }

    /**
     * Submits the messages as one {@link InfinispanMessageTask} via {@code submitEverywhere} and
     * waits for every returned future, logging (not throwing) each failure.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and
     * the remaining futures are no longer awaited, so that callers (in particular the async
     * worker during shutdown) can observe the interruption.
     *
     * <p>NOTE(review): the decompiler reported this method as originally private; it is kept
     * public here so that existing callers keep compiling.
     *
     * @param messageArr messages to send in a single task
     */
    public void doSendMessage(Message[] messageArr) {
        if (logger.isDebugEnabled()) {
            logger.debug("send message over infinispan. message=" + Arrays.toString(messageArr));
        }
        InfinispanMessageTask task = new InfinispanMessageTask(messageArr, this.is.getCacheManager().getAddress());
        // Elements are iterated as Object and cast, so this does not depend on the exact
        // Future subtype declared by the Infinispan version in use.
        for (Object pending : this.ds.submitEverywhere(task)) {
            try {
                ((Future<?>) pending).get();
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting.
                Thread.currentThread().interrupt();
                fatalLog.error("send message failed.error=" + e + ", message=" + Arrays.toString(messageArr), e);
                return;
            } catch (Exception e) {
                fatalLog.error("send message failed.error=" + e + ", message=" + Arrays.toString(messageArr), e);
            }
        }
    }
}
