BigData-News

Category: Cloud Computing
Development tool: Scala
File size: 292KB
Downloads: 0
Upload date: 2019-04-03 06:34:02
Uploader: sh-1993
Description: A real-time big data system for a news website, based on Spark 2.2

File list:
.idea (0, 2019-04-03)
.idea\BigData-News.iml (742, 2019-04-03)
.idea\artifacts (0, 2019-04-03)
.idea\artifacts\structured_streaming_demo_jar.xml (26640, 2019-04-03)
.idea\compiler.xml (928, 2019-04-03)
.idea\encodings.xml (197, 2019-04-03)
.idea\hydra.xml (383, 2019-04-03)
.idea\libraries (0, 2019-04-03)
.idea\libraries\Maven__antlr_antlr_2_7_7.xml (462, 2019-04-03)
.idea\libraries\Maven__aopalliance_aopalliance_1_0.xml (514, 2019-04-03)
.idea\libraries\Maven__asm_asm_3_1.xml (426, 2019-04-03)
.idea\libraries\Maven__com_101tec_zkclient_0_10.xml (484, 2019-04-03)
.idea\libraries\Maven__com_clearspring_analytics_stream_2_7_0.xml (549, 2019-04-03)
.idea\libraries\Maven__com_databricks_spark_csv_2_11_1_5_0.xml (561, 2019-04-03)
.idea\libraries\Maven__com_esotericsoftware_kryo_shaded_3_0_3.xml (564, 2019-04-03)
.idea\libraries\Maven__com_esotericsoftware_minlog_1_3_0.xml (529, 2019-04-03)
.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_annotations_2_6_5.xml (644, 2019-04-03)
.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_core_2_6_5.xml (595, 2019-04-03)
.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_databind_2_6_5.xml (623, 2019-04-03)
.idea\libraries\Maven__com_fasterxml_jackson_module_jackson_module_paranamer_2_6_5.xml (687, 2019-04-03)
.idea\libraries\Maven__com_fasterxml_jackson_module_jackson_module_scala_2_11_2_6_5.xml (694, 2019-04-03)
.idea\libraries\Maven__com_github_stephenc_findbugs_findbugs_annotations_1_3_9_1.xml (673, 2019-04-03)
.idea\libraries\Maven__com_github_stephenc_high_scale_lib_high_scale_lib_1_1_1.xml (641, 2019-04-03)
.idea\libraries\Maven__com_github_stephenc_jcip_jcip_annotations_1_0_1.xml (615, 2019-04-03)
.idea\libraries\Maven__com_google_code_findbugs_jsr305_1_3_9.xml (545, 2019-04-03)
.idea\libraries\Maven__com_google_code_gson_gson_2_2_2.xml (515, 2019-04-03)
.idea\libraries\Maven__com_google_code_gson_gson_2_2_4.xml (515, 2019-04-03)
.idea\libraries\Maven__com_google_guava_guava_11_0_2.xml (513, 2019-04-03)
.idea\libraries\Maven__com_google_guava_guava_16_0_1.xml (513, 2019-04-03)
.idea\libraries\Maven__com_google_inject_extensions_guice_assistedinject_3_0.xml (645, 2019-04-03)
.idea\libraries\Maven__com_google_inject_extensions_guice_servlet_3_0.xml (596, 2019-04-03)
.idea\libraries\Maven__com_google_inject_guice_3_0.xml (496, 2019-04-03)
.idea\libraries\Maven__com_google_protobuf_protobuf_java_2_5_0.xml (574, 2019-04-03)
.idea\libraries\Maven__com_googlecode_javaewah_JavaEWAH_0_3_2.xml (555, 2019-04-03)
.idea\libraries\Maven__com_jamesmurty_utils_java_xmlbuilder_0_4.xml (578, 2019-04-03)
.idea\libraries\Maven__com_jamesmurty_utils_java_xmlbuilder_1_0.xml (578, 2019-04-03)
.idea\libraries\Maven__com_jcraft_jsch_0_1_42.xml (482, 2019-04-03)
.idea\libraries\Maven__com_jolbox_bonecp_0_8_0_RELEASE.xml (545, 2019-04-03)
... ...

## News-Website Big Data Real-Time System Based on Spark 2.2

### 1. Overview

The [project code](https://github.com/pkeropen/BigData-News) follows [基于Spark2.x新闻网大数据实时分析可视化系统项目](https://blog.csdn.net/u011254180/article/details/80172452) and [大数据项目实战之新闻话题的实时统计分析](http://www.raincent.com/content-10-11077-1.html). Thanks to the authors for sharing their experience!

### 2. Environment Configuration

##### 2.1 CDH-5.14.2

Installation steps can be found [here](https://blog.51cto.com/kaliarch/2122467). The exact version can follow whatever you actually run; CDH's cross-version compatibility is good.

|Service |hadoop01 |hadoop02 |hadoop03 |
|:----------|:----------|:----------|:---------|
|HDFS |NameNode |DataNode |DataNode |
|HBase |HMaster, HRegionServer |HRegionServer |HRegionServer |
|Hive |Hive | | |
|Flume |Flume |Flume |Flume |
|Kafka |Kafka | | |
|YARN |ResourceManager |NodeManager |NodeManager |
|Oozie |Oozie | | |
|Hue |Hue | | |
|Spark2 |Spark | | |
|Zookeeper |Zookeeper | | |
|MySQL |MySQL | | |

##### 2.2 Host configuration

```
1. Hadoop01: 4 cores, 16 GB RAM, CentOS 7.2
2. Hadoop02: 2 cores, 8 GB RAM, CentOS 7.2
3. Hadoop03: 2 cores, 8 GB RAM, CentOS 7.2
```

##### 2.3 Project architecture

![Project architecture](https://github.com/pkeropen/BigData-News/blob/master/pic/Architecture.png)

##### 2.4 Install dependency packages

```
# yum -y install psmisc MySQL-python at bc bind-libs bind-utils cups-client cups-libs cyrus-sasl-gssapi cyrus-sasl-plain ed fuse fuse-libs httpd httpd-tools keyutils-libs-devel krb5-devel libcom_err-devel libselinux-devel libsepol-devel libverto-devel mailcap noarch mailx mod_ssl openssl-devel pcre-devel postgresql-libs python-psycopg2 redhat-lsb-core redhat-lsb-submod-security x86_*** spax time zlib-devel wget psmisc
# chmod +x /etc/rc.d/rc.local
# echo "echo 0 > /proc/sys/vm/swappiness" >> /etc/rc.d/rc.local
# echo "echo never > /sys/kernel/mm/transparent_hugepage/defrag" >> /etc/rc.d/rc.local
# echo 0 > /proc/sys/vm/swappiness
# echo never > /sys/kernel/mm/transparent_hugepage/defrag
# yum -y install rpcbind
# systemctl start rpcbind
# echo "systemctl start rpcbind" >> /etc/rc.d/rc.local

# install Perl support
yum install perl*    # Perl packages via yum
yum install cpan     # CPAN, the library support required by Perl programs
```

### 3. Writing the Data Generation Simulator

##### 3.1 Simulate nginx-style log generation

The data comes from the Sogou Labs user query logs ([download](https://www.sogou.com/labs/resource/q.php)): a collection of web query logs from the Sogou search engine covering roughly one month (June 2008) of query and click activity.

##### 3.2 Data cleaning

Record format: access time\tuser ID\t[query]\trank of the URL in the results\tsequence number of the user's click\tclicked URL. The user ID is assigned automatically from the cookie set when the browser visits the search engine, so different queries issued in the same browser session share the same user ID.

1. Replace the tabs in the file with commas:
```
cat weblog.log | tr "\t" "," > weblog2.log
```
2. Replace the spaces in the file with commas:
```
cat weblog2.log | tr " " "," > weblog.log
```

##### 3.3 Main code snippet

```
public static void readFileByLines(String fileName) {
    FileInputStream fis = null;
    InputStreamReader isr = null;
    BufferedReader br = null;
    String tempString = null;
    try {
        System.out.println("Reading the file line by line:");
        fis = new FileInputStream(fileName); // get bytes from the file
        isr = new InputStreamReader(fis, "GBK");
        br = new BufferedReader(isr);
        int count = 0;
        while ((tempString = br.readLine()) != null) {
            count++; // line number
            Thread.sleep(300);
            String str = new String(tempString.getBytes("GBK"), "UTF8");
            System.out.println("row:" + count + ">>>>>>>>" + str);
            // writeFile/writeFileName are defined elsewhere in the project
            writeFile(writeFileName, str);
        }
        isr.close();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        if (isr != null) {
            try {
                isr.close();
            } catch (IOException e1) {
            }
        }
    }
}
```

##### 3.4 Package the program as weblogs.jar ([packaging steps](https://blog.csdn.net/xuemengrui12/article/details/74***4731)) and write the shell script weblog-shell.sh

```
#!/bin/bash
echo "start log......"
# the first argument is the source log file, the second is the output file for the generated log
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
```

##### 3.5 Make weblog-shell.sh executable

```
chmod 777 weblog-shell.sh
```
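For reference, the replay logic from the Java snippet in 3.3 can also be sketched in Scala. This is only a minimal sketch, not the project's actual generator; the default paths follow weblog-shell.sh above.

```scala
import java.io.{FileOutputStream, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets
import scala.io.Source

object WeblogReplaySketch {
  def main(args: Array[String]): Unit = {
    // defaults follow weblog-shell.sh: source log and the file tailed by Flume
    val in  = if (args.length > 0) args(0) else "/opt/datas/weblog.log"
    val out = if (args.length > 1) args(1) else "/opt/datas/weblog-flume.log"

    val source = Source.fromFile(in, "GBK") // the raw Sogou data is GBK-encoded
    val writer = new PrintWriter(
      new OutputStreamWriter(new FileOutputStream(out, true), StandardCharsets.UTF_8)) // append as UTF-8
    try {
      for ((line, idx) <- source.getLines().zipWithIndex) {
        writer.println(line)
        writer.flush()
        println(s"row:${idx + 1}>>>>>>>>$line")
        Thread.sleep(300) // throttle to imitate a live log stream
      }
    } finally {
      source.close()
      writer.close()
    }
  }
}
```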
### 4. Flume Data Collection Configuration

##### 4.1 Collect the Flume data from hadoop02 and hadoop03 on hadoop01; the Flume configuration on hadoop02 and hadoop03 is essentially identical

```
# flume-collect-conf.properties

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/datas/weblog-flume.log

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01
a1.sinks.k1.port = 5555

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.keep-alive = 5

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

##### 4.2 hadoop01 receives the data from the Flume agents on hadoop02 and hadoop03 and delivers it to both HBase and Kafka; configuration:

```
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink

a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = hadoop01
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5

#**************************** flume + hbase ******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20

a1.sinks.hbaseSink.type = asynchbase
## HBase table name
a1.sinks.hbaseSink.table = weblogs
## HBase column family
a1.sinks.hbaseSink.columnFamily = info
## custom async HBase serializer
a1.sinks.hbaseSink.serializer = main.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
## HBase column names
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl

#**************************** flume + kafka ******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20

a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = hadoop01:9092
a1.sinks.kafkaSink.topic = webCount
a1.sinks.kafkaSink.zookeeperConnect = hadoop01:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
```

##### 4.3 Flume startup shell scripts

```
# flume-collect-start.sh, distributed to /opt/shell/ on hadoop02 and hadoop03
#!/bin/bash
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-collect-conf.properties -n a1 -Dflume.root.logger=INFO,console
```

```
# flume-kfk-hb-start.sh, distributed to /opt/shell/ on hadoop01
#!/bin/bash
echo "flume-collect start ......"
sh /bin/flume-ng agent --conf conf -f /opt/conf/flume-hbase-kafka-conf.properties -n a1 -Dflume.root.logger=INFO,console
```

##### 4.4 Flume-to-HBase integration

Download the Flume source and import it into IDEA:

1) Download apache-flume-1.7.0-src.tar.gz and extract it locally
2) Import the Flume source into IDEA
3) Modify the flume-ng-hbase-sink module
4) Base the changes on SimpleAsyncHbaseEventSerializer.java
5) See the project source for the full code

```
// key section of KfkAsyncHbaseEventSerializer.java
@Override
public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn != null) {
        byte[] rowKey;
        try {
            /*--------------------------- modified code begins ---------------------------------*/
            // parse the configured column names
            String[] columns = new String(this.payloadColumn).split(",");
            // parse the values of each line collected by Flume
            String[] values = new String(this.payload).split(",");
            for (int i = 0; i < columns.length; i++) {
                byte[] colColumn = columns[i].getBytes();
                byte[] colValue = values[i].getBytes(Charsets.UTF_8);

                // sanity check: the number of columns must match the number of values
                if (columns.length != values.length) break;

                // access time
                String datetime = values[0].toString();
                // user id
                String userid = values[1].toString();
                // business-specific rowkey
                rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid, datetime);
                // write one cell per column
                PutRequest putRequest = new PutRequest(table, rowKey, cf, colColumn, colValue);
                actions.add(putRequest);
            }
            /*--------------------------- modified code ends ---------------------------------*/
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
    }
    return actions;
}
```

##### 4.5 Package the module as vita-flume-ng-hbase-sink.jar and copy it into the Flume lib directory of the CDH installation
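The serializer above calls SimpleRowKeyGenerator.getKfkRowKey(userid, datetime), a custom method added to Flume's SimpleRowKeyGenerator that is not shown in this README. The general idea, sketched in Scala purely for illustration (the project's actual implementation is Java inside the flume-ng-hbase-sink module and may differ), is to build a unique rowkey from the user id, the access time, and a nanosecond timestamp:

```scala
import java.nio.charset.StandardCharsets

// Illustrative only: mirrors what a getKfkRowKey(userid, datetime) helper is assumed to do.
object RowKeySketch {
  def kfkRowKey(userid: String, datetime: String): Array[Byte] = {
    // userid + datetime identify the event; the nano timestamp keeps keys unique
    val key = s"$userid-$datetime-${System.nanoTime()}"
    key.getBytes(StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    println(new String(kfkRowKey("demo-userid", "00:00:00"), StandardCharsets.UTF_8))
  }
}
```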
### 5. Kafka Configuration (test environment: Kafka deployed on hadoop01 only, no high availability)

##### 5.1 Configuration

Set advertised.listeners=PLAINTEXT://xxxx:9092

##### 5.2 Verify that producing and consuming work

```
# create the topic with replication factor 1 and 1 partition
# (can be skipped if auto.create.topics.enable is set to true)
sh bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --topic webCount --replication-factor 1 --partitions 1

# producer
sh /bin/kafka-console-producer --broker-list hadoop01:9092 --topic webCount

# consumer
sh /bin/kafka-console-consumer --zookeeper hadoop01:2181 --topic webCount --from-beginning

# delete topic
sh /bin/kafka-topics --delete --zookeeper hadoop01 --topic webCount

# topic list
sh /bin/kafka-topics --zookeeper hadoop01:2181 --list
```

##### 5.3 Write the Kafka consumer script kfk-test-consumer.sh and distribute it to /opt/shell/

```
#!/bin/bash
echo "kfk-kafka-consumer.sh start......"
/bin/kafka-console-consumer --zookeeper hadoop01:2181 --from-beginning --topic webCount
```

### 6. HBase Configuration

##### 6.1 Create the business table

```
create 'weblogs','info'

# check the data
count 'weblogs'
```

### 7. Hive Configuration

##### 7.1 CDH configuration

Enable the Hive-HBase integration in CDH, or configure the property directly:

```
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop01,hadoop02,hadoop03</value>
</property>
```

##### 7.2 Create the external Hive table integrated with HBase

```
CREATE EXTERNAL TABLE weblogs(
  id string,
  datetime string,
  userid string,
  searchname string,
  retorder string,
  cliorder string,
  cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
TBLPROPERTIES("hbase.table.name"="weblogs");

-- count the records backed by HBase
select count(*) from weblogs;

-- list tables
show tables;

-- first 10 rows
select * from weblogs limit 10;
```

### 8. Structured Streaming Configuration

##### 8.1 Test Spark SQL against the weblogs table

```
spark.sql("select count(1) from weblogs").show()
```

##### 8.2 Structured Streaming and MySQL integration: create the database and table that will receive the results

```
create database test;
use test;

CREATE TABLE `webCount` (
  `titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```

##### 8.3 Key Structured Streaming code

```
import org.apache.log4j.{LogManager, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ProcessingTime

/**
 * Reads data from Kafka with Structured Streaming and stores the results in MySQL.
 * Structured Streaming requires Kafka 0.10 or later.
 */
object StructuredStreamingKafka {

  case class Weblog(datatime: String,
                    userid: String,
                    searchname: String,
                    retorder: String,
                    cliorder: String,
                    cliurl: String)

  val LOGGER: Logger = LogManager.getLogger("vita")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("yarn")
      .appName("streaming")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop01:9092")
      .option("subscribe", "webCount")
      .load()

    import spark.implicits._
    val lines = df.selectExpr("CAST(value AS STRING)").as[String]

    // parse the comma-separated records produced by the Flume/Kafka pipeline
    val weblog = lines.map(_.split(","))
      .map(x => Weblog(x(0), x(1), x(2), x(3), x(4), x(5)))

    // count occurrences of each search term
    val titleCount = weblog
      .groupBy("searchname")
      .count()
      .toDF("titleName", "count")

    val url = "jdbc:mysql://hadoop01:3306/test"
    val username = "root"
    val password = "root"

    val writer = new JDBCSink(url, username, password)

    val query = titleCount
      .writeStream
      .foreach(writer)
      .outputMode("update")
      .trigger(ProcessingTime("5 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

##### 8.4 Package the project as spark-weblogs.jar
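The JDBCSink used in 8.3 is not listed in this README. Below is a minimal sketch of such a sink, assuming it extends Spark's ForeachWriter[Row] and writes each updated aggregate into the webCount table from 8.2; the project's actual class may differ.

```scala
import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Sketch of a ForeachWriter-based MySQL sink; Structured Streaming calls
// open/process/close for every partition of every trigger.
class JDBCSink(url: String, user: String, password: String) extends ForeachWriter[Row] {
  var conn: Connection = _
  var stmt: Statement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url, user, password)
    stmt = conn.createStatement()
    true // process this partition/version
  }

  override def process(value: Row): Unit = {
    val title = value.getAs[String]("titleName").replace("'", "\\'")
    val count = value.getAs[Long]("count")
    // update mode re-emits a row whenever a count changes, and webCount has no
    // primary key, so delete any previous row for this title before inserting
    stmt.executeUpdate(s"DELETE FROM webCount WHERE titleName = '$title'")
    stmt.executeUpdate(s"INSERT INTO webCount (titleName, `count`) VALUES ('$title', $count)")
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```

Note that the MySQL JDBC driver (mysql-connector-java) must be available on the executor classpath for the jdbc:mysql URL to resolve.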
### 9. Startup Procedure

##### 9.1 Start Zookeeper, Hadoop, HBase, MySQL, YARN, Flume and Kafka from CDH

##### 9.2 On hadoop01, run /opt/shell/flume-kfk-hb-start.sh first, so that incoming data is delivered to both HBase and Kafka

##### 9.3 On hadoop02 and hadoop03, run /opt/shell/flume-collect-start.sh to send the data to hadoop01

##### 9.4 On hadoop01, submit the Spark job

```
# Spark on YARN, with the spark-sql-kafka package
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 2 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
/opt/jars/spark-weblogs.jar \
10
```

To debug the Spark code remotely from IDEA, see [this guide](https://blog.csdn.net/yiluohan0307/article/details/80048765):

```
sh /bin/spark2-submit \
--class com.vita.spark.StructuredStreamingKafka \
--master yarn \
--deploy-mode cluster \
--executor-memory 1G \
--executor-cores 1 \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
--driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005" \
/opt/jars/spark-weblogs.jar \
10
```

* To kill the Spark job on YARN: yarn application -kill [application id]

##### 9.5 On hadoop02 and hadoop03, run /opt/weblog-shell.sh to start generating log data; StructuredStreamingKafka then reads it from Kafka, processes it, and stores the results in MySQL

##### 9.6 Log in to MySQL and check the webCount table
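As a quick end-to-end check for 9.6, the webCount table can also be queried over JDBC, reusing the connection settings from 8.3; a minimal sketch:

```scala
import java.sql.DriverManager

object CheckWebCount {
  def main(args: Array[String]): Unit = {
    // connection settings as in section 8.3
    val conn = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
    try {
      // top 10 search terms by count
      val rs = conn.createStatement().executeQuery(
        "SELECT titleName, `count` FROM webCount ORDER BY `count` DESC LIMIT 10")
      while (rs.next()) {
        println(s"${rs.getString("titleName")}\t${rs.getInt("count")}")
      }
    } finally {
      conn.close()
    }
  }
}
```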
