kafka elasticsearch

  • R4_706701
    了解作者
  • 6.7KB
    文件大小
  • zip
    文件格式
  • 0
    收藏次数
  • VIP专享
    资源类型
  • 0
    下载次数
  • 2022-06-08 12:43
    上传日期
通过获取kafka消息列队消费发送到elasticsearch做持久存储
kafka.zip
  • kafka
  • MessageToESThread.java
    1KB
  • service-es-beans.xml
    1.6KB
  • KafkaConsumerService.java
    3.3KB
  • ESClientFactory.java
    2.2KB
  • ESUtil.java
    3.5KB
  • ConsumerMsgThread.java
    3.6KB
内容介绍
package cn.rmt.support.kafka; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.message.MessageAndMetadata; /** * <br> * <b>功能:</b>kafka log-msg consumer<br> * <b>作者:</b>zhouzm<br> * <b>日期:</b>2017-2-7<br> * <b>版权所有:<b>前海乘势科技有限公司版权所有(C) 2015<br> */ public class ConsumerMsgThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(ConsumerMsgThread.class); private Integer asynTime; private static Integer batchSize; private KafkaStream<byte[], byte[]> stream; private CountDownLatch consumerLatch; private CountDownLatch loadEslatch; private static ArrayBlockingQueue<String> queue; private ExecutorService execES; private ExecutorService execTime; public ConsumerMsgThread() { } public ConsumerMsgThread(KafkaStream<byte[], byte[]> stream, Integer loadEsThreadNum, Integer asynTime, Integer batchSize,CountDownLatch consumerLatch) { this.stream = stream; this.asynTime = asynTime; ConsumerMsgThread.batchSize = batchSize; this.consumerLatch = consumerLatch; loadEslatch = new CountDownLatch(loadEsThreadNum); execES = Executors.newFixedThreadPool(loadEsThreadNum); execTime = Executors.newFixedThreadPool(1); queue = new ArrayBlockingQueue<>(batchSize); } @Override public void run() { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); //守护进程 execTime.execute( new Runnable() { @Override public void run() { while(true){ List<String> tmp = new ArrayList<>(); for(int i=0; i<batchSize && queue.size()>0; i++){ String msg = ""; try { msg = queue.take(); tmp.add(msg); } catch (InterruptedException e) { logger.error(" add msg item error msg="+msg); } } if(tmp.size() > 0){ execES.execute(new MessageToESThread( KafkaConsumerService.topic, tmp, loadEslatch) ); } //堆积数据 try { Thread.sleep(asynTime); } catch (InterruptedException e) { } } } }); while (consumerIte.hasNext() ) { MessageAndMetadata<byte[], byte[]> meta = consumerIte.next(); String msg = new String(meta.message(), Charset.forName("UTF-8")); Map<String, Object> msgMap = null; try{ msgMap = JSONObject.parseObject(msg, Map.class); // msgMap = new HashMap<>(); // msgMap.put("msg", msg); }catch(Exception e){ logger.error("message anlysis error msg="+msg); e.printStackTrace(); } msgMap.put("offset", meta.offset()); msgMap.put("timestamp", meta.timestamp()); msgMap.put("partition", meta.partition()); msgMap.put("topic", meta.topic()); String result = JSON.toJSONString(msgMap); try { queue.put(result); } catch (InterruptedException e) { logger.error("queue put msg item error msg="+result); } } try { logger.info("upload data to es thread exit succ ... "); loadEslatch.await(); } catch (InterruptedException e) { logger.error("upload data to es thread exit error "+e.getMessage() ); e.printStackTrace(); }finally{ consumerLatch.countDown(); } } }
评论
    相关推荐
    • Kafka技术内幕
      Kafka技术内幕 Kafka技术内幕:图文详解Kafka源码设计与实现.郑奇煌
    • chaperone:Kafka审核系统
      基本上,伴侣将时间线切成10分钟的存储桶,并根据其事件时间将消息分配给相应的存储桶。 存储桶的统计信息会相应更新,例如消息总数。 这些统计数据会定期发送给专门的Kafka主题,例如“陪伴审核”。 ...
    • Kafka管理工具Kafka Tool
      Kafka Tool是一个用于管理和使用Apache Kafka集群的GUI应用程序。它提供了一个直观的UI,允许用户快速查看Kafka群集中的对象以及存储在群集主题中的消息。
    • Kafka技术内幕
      本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者和控制器如何确保Kafka集群的分布式和容错特性,两种同步集群工具...
    • kafka-playground
      卡夫卡游乐场 一个用于存放一些与Kafka相关的测试代码的运动场存储库。
    • Kafka_Learn.zip
      该代码包含kafka的生产者、消费者原理详解,各种参数解析,主题、分区、存储等的代码演示,可用于搭配博客学习
    • kafka-journal:使用Kafka作为主要存储的事件源日志实现
      kafka-journal:使用Kafka作为主要存储的事件源日志实现
    • zipkin-storage-kafka:基于Kafka的Zipkin存储
      邮编存储Kafka [EXPERIMENTAL] 基于Kafka的Zipkin存储。 +----------------------------*zipkin*---------------------------------------------- | [ dependency-storage ]--->( dependencies ) | ^ +--&...
    • kafka
      该项目需要将ojdbc jar文件添加到本地Maven存储库中 jar文件在仓库中可用 mvn install:安装文件-Dfile = ojdbc6.jar -DgroupId = com.oracle -DartifactId = ojdbc6 -Dversion = 12.1.0.2 -Dpackaging = jar
    • SIM800C_MQTT.rar
      使用SIM800C模块,使用MQTT协议,连接中国移动onenet平台,能实现数据的订阅、发布、存储等