package com.nepxion.thunder.protocol.kafka;

import com.nepxion.thunder.common.constant.ThunderConstant;
import com.nepxion.thunder.common.entity.ApplicationEntity;
import com.nepxion.thunder.common.entity.MQPropertyEntity;
import com.nepxion.thunder.common.thread.ThreadPoolFactory;
import com.nepxion.thunder.protocol.ProtocolRequest;
import com.nepxion.thunder.protocol.ProtocolResponse;
import com.nepxion.thunder.serialization.SerializerExecutor;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nepxion/thunder/protocol/kafka/KafkaMQServerHandler.class */
/**
 * Server-side Kafka handler: polls a request topic, dispatches each request to the
 * server executor adapter and, when the request asks for feedback, sends the response
 * back through a {@link KafkaMQProducer}.
 */
public class KafkaMQServerHandler extends KafkaMQConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMQServerHandler.class);

    /** Poll timeout used when the configured value cannot be read (presumably milliseconds - confirm against the consumer API in use). */
    private static final int DEFAULT_CONSUMER_SERVER_POLL_TIMEOUT = 500;

    private KafkaMQProducer producer;
    private int consumerServerPollTimeout = DEFAULT_CONSUMER_SERVER_POLL_TIMEOUT;
    private boolean transportLogPrint;

    /**
     * Creates the handler and its response producer.
     *
     * <p>The two optional properties are read independently, so a failure to read one
     * does not prevent the other from being applied; a property that cannot be read
     * keeps its default (poll timeout {@code 500}, transport logging {@code false}).
     *
     * @param mQPropertyEntity MQ configuration; also used to build the response producer
     * @param str              forwarded unchanged to the {@code KafkaMQConsumer} constructor
     */
    public KafkaMQServerHandler(MQPropertyEntity mQPropertyEntity, String str) {
        super(mQPropertyEntity, str);
        try {
            this.consumerServerPollTimeout = mQPropertyEntity.getInteger(ThunderConstant.KAFKA_CONSUMER_SERVER_POLL_TIMEOUT_ATTRIBUTE_NAME);
        } catch (Exception e) {
            LOG.error("Get properties failed", e);
        }
        try {
            this.transportLogPrint = mQPropertyEntity.getBoolean(ThunderConstant.TRANSPORT_LOG_PRINT_ATTRIBUTE_NAME);
        } catch (Exception e) {
            LOG.error("Get properties failed", e);
        }
        this.producer = new KafkaMQProducer(mQPropertyEntity);
    }

    /**
     * Submits an endless poll loop to the server thread pool obtained for the given
     * application and service. The method itself returns as soon as the task is submitted.
     *
     * <p>A record that fails to deserialize or process is logged and skipped so that a
     * single bad message cannot stop consumption. A failure of {@code subscribe} or
     * {@code poll} is logged and then ends the task.
     *
     * @param str               topic the consumer subscribes to
     * @param str2              first argument passed to {@code producer.produceResponse}
     *                          (presumably the response topic - verify against KafkaMQProducer)
     * @param str3              service name; used to select the thread pool and in log output
     * @param applicationEntity application whose URL selects the thread pool; also passed to the producer
     * @throws Exception declared for callers; nothing in this method body throws a checked exception itself
     */
    public void consume(final String str, final String str2, final String str3, final ApplicationEntity applicationEntity) throws Exception {
        ThreadPoolFactory.createThreadPoolServerExecutor(applicationEntity.toUrl(), str3).submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    KafkaMQServerHandler.this.consumer.subscribe(Arrays.asList(str));
                    while (true) {
                        ConsumerRecords<?, ?> records = KafkaMQServerHandler.this.consumer.poll(KafkaMQServerHandler.this.consumerServerPollTimeout);
                        if (records == null || records.count() == 0) {
                            continue;
                        }
                        for (ConsumerRecord<?, ?> record : records) {
                            try {
                                handleRecord(record, str2, str3, applicationEntity);
                            } catch (Exception e) {
                                // Skip the bad record: letting this escape would end the loop,
                                // and the Future that would carry the exception is discarded.
                                KafkaMQServerHandler.LOG.error("Handle record failed, record skipped, topic=" + str + ", service=" + str3, e);
                            }
                        }
                    }
                } catch (Exception e) {
                    // Nobody inspects the Future returned by submit(), so log before propagating.
                    KafkaMQServerHandler.LOG.error("Consumer loop terminated, topic=" + str + ", service=" + str3, e);
                    throw e;
                }
            }
        });
    }

    /** Deserializes one record, dispatches it to the server executor adapter and sends feedback if requested. */
    private void handleRecord(ConsumerRecord<?, ?> record, String str2, String str3, ApplicationEntity applicationEntity) {
        ProtocolRequest protocolRequest = (ProtocolRequest) SerializerExecutor.deserialize((byte[]) record.value());
        String requestSource = protocolRequest.getRequestSource().toString();
        if (this.transportLogPrint) {
            LOG.info("Request from client={}, service={}", requestSource, str3);
        }

        ProtocolResponse protocolResponse = new ProtocolResponse();
        try {
            this.executorContainer.getServerExecutorAdapter().handle(protocolRequest, protocolResponse);
        } catch (Exception e) {
            LOG.error("Consume request failed", e);
        }

        // The response is still sent after a handler failure, in whatever state the handler left it.
        if (protocolRequest.isFeedback()) {
            try {
                this.producer.produceResponse(str2, applicationEntity, protocolResponse, requestSource);
            } catch (Exception e) {
                LOG.error("Produce response failed", e);
            }
        }
    }
}
