package com.nepxion.thunder.protocol.kafka;

import com.nepxion.thunder.common.constant.ThunderConstant;
import com.nepxion.thunder.common.entity.ApplicationEntity;
import com.nepxion.thunder.common.entity.MQPropertyEntity;
import com.nepxion.thunder.common.thread.ThreadPoolFactory;
import com.nepxion.thunder.protocol.ProtocolResponse;
import com.nepxion.thunder.serialization.SerializerExecutor;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/nepxion/thunder/protocol/kafka/KafkaMQClientHandler.class */
public class KafkaMQClientHandler extends KafkaMQConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaMQClientHandler.class);
    private int consumerClientPollTimeout;
    private boolean transportLogPrint;

    public KafkaMQClientHandler(MQPropertyEntity mQPropertyEntity, String str) {
        super(mQPropertyEntity, str);
        this.consumerClientPollTimeout = 500;
        try {
            this.consumerClientPollTimeout = mQPropertyEntity.getInteger(ThunderConstant.KAFKA_CONSUMER_CLIENT_POLL_TIMEOUT_ATTRIBUTE_NAME);
            this.transportLogPrint = mQPropertyEntity.getBoolean(ThunderConstant.TRANSPORT_LOG_PRINT_ATTRIBUTE_NAME);
        } catch (Exception e) {
            LOG.error("Get properties failed", e);
        }
    }

    public void consume(final String str, final String str2, ApplicationEntity applicationEntity) throws Exception {
        final String url = applicationEntity.toUrl();
        ThreadPoolFactory.createThreadPoolClientExecutor(url, str2).submit(new Callable<Object>() { // from class: com.nepxion.thunder.protocol.kafka.KafkaMQClientHandler.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                KafkaMQClientHandler.this.consumer.assign(Arrays.asList(new TopicPartition(str, KafkaMQClientHandler.this.getPartitionIndex(KafkaMQClientHandler.this.consumer, str, url))));
                while (true) {
                    ConsumerRecords poll = KafkaMQClientHandler.this.consumer.poll(KafkaMQClientHandler.this.consumerClientPollTimeout);
                    if (poll != null && poll.count() != 0) {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            ProtocolResponse protocolResponse = (ProtocolResponse) SerializerExecutor.deserialize((byte[]) ((ConsumerRecord) it.next()).value());
                            try {
                                String obj = protocolResponse.getResponseSource().toString();
                                if (KafkaMQClientHandler.this.transportLogPrint) {
                                    KafkaMQClientHandler.LOG.info("Response from server={}, service={}", obj, str2);
                                }
                                KafkaMQClientHandler.this.executorContainer.getClientExecutorAdapter().handle(protocolResponse);
                            } catch (Exception e) {
                                KafkaMQClientHandler.LOG.error("Consume request failed", e);
                            }
                        }
                    }
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getPartitionIndex(Consumer<String, byte[]> consumer, String str, String str2) {
        return (Utils.murmur2(new StringSerializer().serialize(str, str2)) & Integer.MAX_VALUE) % consumer.partitionsFor(str).size();
    }
}
