package com.nepxion.thunder.protocol.kafka;

import com.nepxion.thunder.common.entity.ApplicationEntity;
import com.nepxion.thunder.common.entity.ConnectionEntity;
import com.nepxion.thunder.protocol.AbstractClientExecutor;
import com.nepxion.thunder.protocol.ProtocolException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nepxion/thunder/protocol/kafka/KafkaMQClientExecutor.class */
public class KafkaMQClientExecutor extends AbstractClientExecutor implements KafkaMQExecutorDelegate {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMQClientExecutor.class);

    @Override // com.nepxion.thunder.protocol.ClientExecutor
    public void start(final String str, final ApplicationEntity applicationEntity) throws Exception {
        final String server = getServer(str);
        final Map<String, KafkaMQContext> referenceContextMap = KafkaMQCacheContainer.getReferenceContextMap();
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Executors.newCachedThreadPool().submit(new Callable<Object>() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQClientExecutor.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                try {
                    KafkaMQContext kafkaMQContext = (KafkaMQContext) referenceContextMap.get(server);
                    if (kafkaMQContext == null) {
                        kafkaMQContext = new KafkaMQContext(KafkaMQClientExecutor.this);
                        kafkaMQContext.initializeContext(str, server);
                        referenceContextMap.put(server, kafkaMQContext);
                    }
                    kafkaMQContext.initializeRequestContext(str, applicationEntity);
                } catch (Exception e) {
                    KafkaMQClientExecutor.LOG.error("Start MQ failed", e);
                }
                cyclicBarrier.await();
                return null;
            }
        });
        cyclicBarrier.await();
    }

    private String getServer(String str) {
        return this.cacheContainer.getReferenceEntityMap().get(str).getServer();
    }

    @Override // com.nepxion.thunder.protocol.AbstractClientExecutor, com.nepxion.thunder.protocol.ClientExecutor
    public boolean started(String str, ApplicationEntity applicationEntity) throws Exception {
        return KafkaMQCacheContainer.getReferenceContextMap().get(getServer(str)) != null;
    }

    @Override // com.nepxion.thunder.protocol.AbstractClientExecutor, com.nepxion.thunder.protocol.ClientExecutor
    public ConnectionEntity online(String str, ApplicationEntity applicationEntity, Object obj) throws Exception {
        throw new ProtocolException("Online feature isn't supported in KafkaMQClientExecutor");
    }

    @Override // com.nepxion.thunder.protocol.AbstractClientExecutor, com.nepxion.thunder.protocol.ClientExecutor
    public void offline(String str, ApplicationEntity applicationEntity) throws Exception {
        throw new ProtocolException("Offline feature isn't supported in KafkaMQClientExecutor");
    }
}
