package com.nepxion.thunder.protocol.kafka;

import com.nepxion.thunder.common.entity.ApplicationEntity;
import com.nepxion.thunder.protocol.AbstractServerExecutor;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nepxion/thunder/protocol/kafka/KafkaMQServerExecutor.class */
public class KafkaMQServerExecutor extends AbstractServerExecutor implements KafkaMQExecutorDelegate {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMQServerExecutor.class);

    @Override // com.nepxion.thunder.protocol.ServerExecutor
    public void start(final String str, final ApplicationEntity applicationEntity) throws Exception {
        final String server = getServer(str);
        final Map<String, KafkaMQContext> serviceContextMap = KafkaMQCacheContainer.getServiceContextMap();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Executors.newCachedThreadPool().submit(new Callable<Object>() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQServerExecutor.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                try {
                    KafkaMQContext kafkaMQContext = (KafkaMQContext) serviceContextMap.get(server);
                    if (kafkaMQContext == null) {
                        kafkaMQContext = new KafkaMQContext(KafkaMQServerExecutor.this);
                        kafkaMQContext.initializeContext(str, server);
                        serviceContextMap.put(server, kafkaMQContext);
                    }
                    kafkaMQContext.initializeResponseContext(str, applicationEntity);
                } catch (Exception e) {
                    KafkaMQServerExecutor.LOG.error("Start MQ failed", e);
                }
                cyclicBarrier.await();
                return null;
            }
        });
        cyclicBarrier.await();
    }

    @Override // com.nepxion.thunder.protocol.ServerExecutor
    public boolean started(String str, ApplicationEntity applicationEntity) throws Exception {
        return KafkaMQCacheContainer.getServiceContextMap().get(getServer(str)) != null;
    }

    private String getServer(String str) {
        return this.cacheContainer.getServiceEntityMap().get(str).getServer();
    }
}
