package com.nepxion.thunder.protocol.kafka;

import com.nepxion.thunder.common.constant.ThunderConstant;
import com.nepxion.thunder.common.entity.ApplicationEntity;
import com.nepxion.thunder.common.entity.MQPropertyEntity;
import com.nepxion.thunder.common.entity.ProtocolType;
import com.nepxion.thunder.common.thread.ThreadPoolFactory;
import com.nepxion.thunder.event.protocol.ProtocolEventFactory;
import com.nepxion.thunder.protocol.ProtocolRequest;
import com.nepxion.thunder.protocol.ProtocolResponse;
import com.nepxion.thunder.serialization.SerializerExecutor;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nepxion/thunder/protocol/kafka/KafkaMQProducer.class */
public class KafkaMQProducer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMQProducer.class);
    protected MQPropertyEntity mqPropertyEntity;
    protected Producer<String, byte[]> producer;

    public KafkaMQProducer(MQPropertyEntity mQPropertyEntity) {
        Map<String, Object> map = null;
        try {
            map = mQPropertyEntity.summarizeProperties(ThunderConstant.KAFKA_PRODUCER_ATTRIBUTE_NAME);
        } catch (Exception e) {
            LOG.error("Extract properties failed", e);
        }
        this.mqPropertyEntity = mQPropertyEntity;
        this.producer = new KafkaProducer(map, new StringSerializer(), new ByteArraySerializer());
    }

    public MQPropertyEntity getMQPropertyEntity() {
        return this.mqPropertyEntity;
    }

    public Producer<String, byte[]> getProducer() {
        return this.producer;
    }

    public void produceRequest(final String str, ApplicationEntity applicationEntity, final ProtocolRequest protocolRequest) throws Exception {
        final String url = applicationEntity.toUrl();
        ThreadPoolFactory.createThreadPoolClientExecutor(url, protocolRequest.getInterface()).submit(new Callable<Object>() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQProducer.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                if (StringUtils.isEmpty(str)) {
                    KafkaMQProducer.LOG.error("Topic can't be null");
                    return null;
                }
                protocolRequest.setRequestSource(url);
                KafkaMQProducer.this.producer.send(new ProducerRecord(str, SerializerExecutor.serialize(protocolRequest)), new Callback() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQProducer.1.1
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc == null) {
                            return;
                        }
                        KafkaMQProducer.LOG.error("Produce request failed", exc);
                        ProtocolEventFactory.postClientProducerEvent(ProtocolType.KAFKA, protocolRequest);
                    }
                });
                return null;
            }
        });
    }

    public void produceResponse(String str, ApplicationEntity applicationEntity, final ProtocolResponse protocolResponse, String str2) throws Exception {
        if (StringUtils.isEmpty(str)) {
            LOG.error("Topic can't be null");
            return;
        }
        if (StringUtils.isEmpty(str2)) {
            LOG.error("Request source can't be null");
            return;
        }
        String url = applicationEntity.toUrl();
        protocolResponse.setRequestSource(str2);
        protocolResponse.setResponseSource(url);
        this.producer.send(new ProducerRecord(str, str2, SerializerExecutor.serialize(protocolResponse)), new Callback() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQProducer.2
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    return;
                }
                KafkaMQProducer.LOG.error("Produce response failed", exc);
                ProtocolEventFactory.postServerProducerEvent(ProtocolType.KAFKA, protocolResponse);
            }
        });
    }
}
