package cn.rmt.support.kafka;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
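/*
 * Usage sketch (illustrative, not part of this class): with the legacy Kafka
 * high-level consumer API used here, the caller (presumably
 * KafkaConsumerService) would obtain one KafkaStream per thread roughly as
 * below. The property values, thread counts, and the "log-msg" topic name are
 * placeholder assumptions.
 *
 *   Properties props = new Properties();
 *   props.put("zookeeper.connect", "localhost:2181");
 *   props.put("group.id", "log-msg-group");
 *   ConsumerConnector connector =
 *           Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
 *   Map<String, Integer> topicCountMap = new HashMap<>();
 *   topicCountMap.put("log-msg", 2); // two streams -> two consumer threads
 *   CountDownLatch latch = new CountDownLatch(2);
 *   for (KafkaStream<byte[], byte[]> s :
 *           connector.createMessageStreams(topicCountMap).get("log-msg")) {
 *       new ConsumerMsgThread(s, 4, 1000, 500, latch).start();
 *   }
 *   latch.await(); // block until all consumer threads have finished
 */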
/**
 * <br>
 * <b>Function:</b> Kafka log-msg consumer<br>
 * <b>Author:</b> zhouzm<br>
 * <b>Date:</b> 2017-2-7<br>
 * <b>Copyright:</b> 前海乘势科技有限公司 (C) 2015, all rights reserved<br>
 */
public class ConsumerMsgThread extends Thread {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerMsgThread.class);

    private Integer asynTime;
    // Per-instance batch size and buffer queue, so concurrent consumer
    // threads do not share (and clobber) each other's state.
    private Integer batchSize;
    private KafkaStream<byte[], byte[]> stream;
    private CountDownLatch consumerLatch;
    private CountDownLatch loadEslatch;
    private ArrayBlockingQueue<String> queue;
    private ExecutorService execES;
    private ExecutorService execTime;
    public ConsumerMsgThread() {
    }

    public ConsumerMsgThread(KafkaStream<byte[], byte[]> stream, Integer loadEsThreadNum,
            Integer asynTime, Integer batchSize, CountDownLatch consumerLatch) {
        this.stream = stream;
        this.asynTime = asynTime;
        this.batchSize = batchSize;
        this.consumerLatch = consumerLatch;
        loadEslatch = new CountDownLatch(loadEsThreadNum);
        execES = Executors.newFixedThreadPool(loadEsThreadNum);
        execTime = Executors.newSingleThreadExecutor();
        queue = new ArrayBlockingQueue<>(batchSize);
    }
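
    // Pipeline: the consumer thread decorates each Kafka message with its
    // metadata and buffers it in `queue`; a single background task drains the
    // queue every asynTime ms and submits each batch to the ES upload pool.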
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        // Background batching task: every asynTime ms, drain up to batchSize
        // queued messages and hand them to the ES upload pool as one batch.
        execTime.execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    List<String> tmp = new ArrayList<>();
                    queue.drainTo(tmp, batchSize);
                    if (!tmp.isEmpty()) {
                        execES.execute(new MessageToESThread(
                                KafkaConsumerService.topic, tmp, loadEslatch));
                    }
                    // Sleep so messages can accumulate before the next drain.
                    try {
                        Thread.sleep(asynTime);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
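
        // Consume loop: block on the stream iterator, decorate each message
        // with its Kafka metadata, and buffer it for the batching task above.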
        while (consumerIte.hasNext()) {
            MessageAndMetadata<byte[], byte[]> meta = consumerIte.next();
            String msg = new String(meta.message(), StandardCharsets.UTF_8);
            Map<String, Object> msgMap = null;
            try {
                msgMap = JSONObject.parseObject(msg, Map.class);
            } catch (Exception e) {
                logger.error("message analysis error msg=" + msg, e);
            }
            if (msgMap == null) {
                // Non-JSON payload: wrap the raw message so the Kafka metadata
                // below can still be attached instead of failing with an NPE.
                msgMap = new HashMap<>();
                msgMap.put("msg", msg);
            }
            msgMap.put("offset", meta.offset());
            msgMap.put("timestamp", meta.timestamp());
            msgMap.put("partition", meta.partition());
            msgMap.put("topic", meta.topic());
            String result = JSON.toJSONString(msgMap);
            try {
                queue.put(result);
            } catch (InterruptedException e) {
                logger.error("queue put msg item error msg=" + result, e);
                Thread.currentThread().interrupt();
                break;
            }
        }
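
        // The stream is exhausted (consumer shutdown): wait for the ES upload
        // workers to finish before signalling the parent.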
        try {
            loadEslatch.await();
            logger.info("ES upload threads finished; consumer thread exiting");
        } catch (InterruptedException e) {
            logger.error("interrupted while waiting for ES upload threads to exit", e);
        } finally {
            // Release pool threads and signal the parent that this consumer is done.
            execTime.shutdownNow();
            execES.shutdown();
            consumerLatch.countDown();
        }
    }
}