Sparkstreaming_News

Category: Cloud computing
Development tool: Scala
File size: 6345 KB
Downloads: 0
Upload date: 2022-07-01 17:42:04
Uploader: sh-1993
Description: Hands-on big data project: real-time statistical analysis of news topics based on Spark 2.x.

File list:
SparkWeb (0, 2019-06-02)
SparkWeb\.idea (0, 2019-06-02)
SparkWeb\.idea\artifacts (0, 2019-06-02)
SparkWeb\.idea\artifacts\SparkWeb_war_exploded.xml (495, 2019-06-02)
SparkWeb\.idea\compiler.xml (628, 2019-06-02)
SparkWeb\.idea\libraries (0, 2019-06-02)
SparkWeb\.idea\libraries\Maven__antlr_antlr_2_7_7.xml (474, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_101tec_zkclient_0_3.xml (501, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_alibaba_fastjson_1_2_13.xml (526, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_clearspring_analytics_stream_2_7_0.xml (561, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_esotericsoftware_kryo_shaded_3_0_3.xml (576, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_esotericsoftware_minlog_1_3_0.xml (541, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_annotations_2_6_5.xml (656, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_core_2_6_5.xml (607, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_fasterxml_jackson_core_jackson_databind_2_6_5.xml (635, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_fasterxml_jackson_module_jackson_module_paranamer_2_6_5.xml (699, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_fasterxml_jackson_module_jackson_module_scala_2_11_2_6_5.xml (706, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_google_code_findbugs_jsr305_1_3_9.xml (557, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_google_guava_guava_16_0_1.xml (525, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_google_protobuf_protobuf_java_2_5_0.xml (586, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_googlecode_javaewah_JavaEWAH_0_3_2.xml (567, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_jamesmurty_utils_java_xmlbuilder_1_0.xml (590, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_jolbox_bonecp_0_8_0_RELEASE.xml (557, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_ning_compress_lzf_1_0_3.xml (535, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_sun_jersey_jersey_client_1_9.xml (552, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_sun_jersey_jersey_core_1_9.xml (538, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_sun_mail_javax_mail_1_5_0.xml (537, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_thoughtworks_paranamer_paranamer_2_3.xml (572, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_twitter_chill_2_11_0_8_0.xml (533, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_twitter_chill_java_0_8_0.xml (533, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_twitter_parquet_hadoop_bundle_1_6_0.xml (610, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_univocity_univocity_parsers_2_2_1.xml (590, 2019-06-02)
SparkWeb\.idea\libraries\Maven__com_yammer_metrics_metrics_core_2_2_0.xml (575, 2019-06-02)
SparkWeb\.idea\libraries\Maven__commons_beanutils_commons_beanutils_1_7_0.xml (606, 2019-06-02)
SparkWeb\.idea\libraries\Maven__commons_beanutils_commons_beanutils_core_1_8_0.xml (641, 2019-06-02)
SparkWeb\.idea\libraries\Maven__commons_cli_commons_cli_1_2.xml (526, 2019-06-02)
SparkWeb\.idea\libraries\Maven__commons_codec_commons_codec_1_10.xml (555, 2019-06-02)
SparkWeb\.idea\libraries\Maven__commons_collections_commons_collections_3_2_1.xml (628, 2019-06-02)
... ...

# Sparkstreaming_News: real-time statistical analysis of news topics based on Spark 2.x (a hands-on big data project)

## I. Business requirements

1. Capture user browsing log data.
2. Analyze the top 20 news topics by traffic in near real time.
3. Count the news topics currently exposed online in real time.
4. Determine which time period has the highest user page views.

## II. System architecture

![image](https://github.com/huashishaojie/Sparkstreaming_News/blob/master/images/architecture.png)

## III. Project overview

The project consists of two parts: SparkS and SparkWeb.

SparkS uses Spark Streaming to consume the data in Kafka in near real time and writes the analyzed results to MySQL through a connection pool (mysqlPool).

SparkWeb is the front-end display page built with WebSocket and WebService; it looks like this:

![image](https://github.com/huashishaojie/Sparkstreaming_News/blob/master/images/SparkWeb.jpg)

## IV. Reference steps

### 1. Create the HBase table

    create 'weblogs','info'

### 2. Configure the Flume agents

On node2:

    a2.sources = r1
    a2.sinks = k1
    a2.channels = c1
    a2.sources.r1.type = exec
    a2.sources.r1.command = tail -F /opt/data/weblog-flume.log
    a2.sources.r1.channels = c1
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 1000
    a2.channels.c1.keep-alive = 5
    a2.sinks.k1.type = avro
    a2.sinks.k1.channel = c1
    a2.sinks.k1.hostname = node1
    a2.sinks.k1.port = 5555

On node3:

    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    a3.sources.r1.type = exec
    a3.sources.r1.command = tail -F /opt/data/weblog-flume.log
    a3.sources.r1.channels = c1
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 1000
    a3.channels.c1.keep-alive = 5
    a3.sinks.k1.type = avro
    a3.sinks.k1.channel = c1
    a3.sinks.k1.hostname = node1
    a3.sinks.k1.port = 5555

On node1:

    a1.sources = r1
    a1.channels = kafkaC hbaseC
    a1.sinks = kafkaSink hbaseSink
    a1.sources.r1.type = avro
    a1.sources.r1.channels = hbaseC kafkaC
    a1.sources.r1.bind = node1
    a1.sources.r1.port = 5555
    a1.sources.r1.threads = 5

    #****************************flume + hbase******************************
    a1.channels.hbaseC.type = memory
    a1.channels.hbaseC.capacity = 10000
    a1.channels.hbaseC.transactionCapacity = 10000
    a1.channels.hbaseC.keep-alive = 20
    a1.sinks.hbaseSink.type = asynchbase
    a1.sinks.hbaseSink.table = weblogs
    a1.sinks.hbaseSink.columnFamily = info
    a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
    a1.sinks.hbaseSink.channel = hbaseC
    a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl

    #****************************flume + kafka******************************
    a1.channels.kafkaC.type = memory
    a1.channels.kafkaC.capacity = 10000
    a1.channels.kafkaC.transactionCapacity = 10000
    a1.channels.kafkaC.keep-alive = 20
    a1.sinks.kafkaSink.channel = kafkaC
    a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
    a1.sinks.kafkaSink.topic = weblogs
    a1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
    a1.sinks.kafkaSink.requiredAcks = 1
    a1.sinks.kafkaSink.batchSize = 1
    a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

### 3. Normalize the log data format

    cat weblog.log | tr "\t" "," > weblog2.log      # replace tabs with commas
    cat word.txt | sed 's/[ ][ ]*/,/g'              # replace runs of spaces with commas

### 4. Build the custom Flume HBase sink, package it as a jar, and upload it to flume/lib

### 5. Create the weblogs project that replays the collected data, package it as a jar, and deploy it to node2 and node3 (/opt/jars); a minimal sketch of such a replay program follows
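The jar deployed here is invoked in step 6 as `java -jar /opt/jars/weblogs.jar /opt/data/weblog.log /opt/data/weblog-flume.log`, i.e. it reads the prepared log and replays it into the file that Flume tails. A minimal sketch of what such a replay program might look like (the object name and the replay interval are illustrative assumptions, not the project's actual code):

```scala
import java.io.{FileWriter, PrintWriter}
import scala.io.Source

// Hypothetical replay tool: reads records from a prepared log file and appends
// them one by one to the file that Flume tails (exec source with tail -F).
object WeblogReplay {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("usage: WeblogReplay <source log> <target log>")
      sys.exit(1)
    }
    val Array(sourcePath, targetPath) = args
    val writer = new PrintWriter(new FileWriter(targetPath, true)) // append mode
    try {
      for (line <- Source.fromFile(sourcePath).getLines()) {
        writer.println(line)
        writer.flush()      // make the record visible to tail -F immediately
        Thread.sleep(300)   // throttle so the stream looks "live"
      }
    } finally {
      writer.close()
    }
  }
}
```

Package it with a Main-Class manifest entry so it can be started with `java -jar` as in step 6.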
### 6. Write a shell script on node2 and node3 (/opt/shell) to start the weblogs jar

    touch weblog-shell.sh

weblog-shell.sh:

    #!/bin/bash
    echo "start log......"
    java -jar /opt/jars/weblogs.jar /opt/data/weblog.log /opt/data/weblog-flume.log

### 7. Write a startup script for the Flume agents (in the flume directory on node1, node2 and node3; the example below is for node2 — on node3 change a2 to a3)

    #!/bin/bash
    echo "flume-2 start"
    bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a2 -Dflume.root.logger=INFO,console

### 8. Write a shell script to test Kafka consumption

    vi kfk-test-consumer.sh

    #!/bin/bash
    echo "kfk-kafka-consumer.sh start......"
    bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic weblogs

### 9. Test the end-to-end Flume collection flow

1) Start HDFS, ZooKeeper, Kafka and Flume.
2) Run the weblog-shell.sh script.
3) Create a topic named weblogs:

    bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic weblogs --partitions 1 --replication-factor 3

4) Start the agents on node2 and node3 to send data to node1.
5) Start the agent on node1 to receive the data from node2 and node3 and forward it to HBase and Kafka.

### 10. Install MySQL

### 11. Install Hive (start YARN before starting Hive, since its MapReduce jobs run on YARN)

1) Start it: bin/hive
2) Test loading data into Hive:

    load data local inpath '/opt/data/test.txt' into table test;

3) Create the table structure required by the business logic:

    CREATE EXTERNAL TABLE weblogs(
      id string,
      datetime string,
      userid string,
      searchname string,
      retorder string,
      cliorder string,
      cliurl string
    )
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES("hbase.columns.mapping" =
      ":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
    TBLPROPERTIES("hbase.table.name" = "weblogs");

### 12. Integrate Hive with HBase

1) First approach (more cumbersome): copy the HBase configuration files into hive/conf.
2) Second approach:

a) Configure hive-site.xml:

    <property>
      <name>hbase.zookeeper.quorum</name>
      <value>node1,node2,node3</value>
    </property>

b) Link nine HBase jars into hive/lib (download high-scale-lib-1.1.2.jar yourself):

    export HBASE_HOME=/opt/soft/hbase
    export HIVE_LIB=/opt/soft/hive-1.2.1-bin
    ln -s $HBASE_HOME/lib/hbase-server-1.1.3.jar $HIVE_LIB/lib/hbase-server-1.1.3.jar
    ln -s $HBASE_HOME/lib/hbase-client-1.1.3.jar $HIVE_LIB/lib/hbase-client-1.1.3.jar
    ln -s $HBASE_HOME/lib/hbase-protocol-1.1.3.jar $HIVE_LIB/lib/hbase-protocol-1.1.3.jar
    ln -s $HBASE_HOME/lib/hbase-it-1.1.3.jar $HIVE_LIB/lib/hbase-it-1.1.3.jar
    ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_LIB/lib/htrace-core-3.1.0-incubating.jar
    ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.1.3.jar $HIVE_LIB/lib/hbase-hadoop2-compat-1.1.3.jar
    ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.1.3.jar $HIVE_LIB/lib/hbase-hadoop-compat-1.1.3.jar
    ln -s $HBASE_HOME/lib/high-scale-lib-1.1.2.jar $HIVE_LIB/lib/high-scale-lib-1.1.2.jar
    ln -s $HBASE_HOME/lib/hbase-common-1.1.3.jar $HIVE_LIB/lib/hbase-common-1.1.3.jar
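With the weblogs Hive external table from step 11 mapped onto the HBase table via step 12, business requirement 2 (the top 20 news topics by traffic) can be checked interactively once Spark SQL is wired to Hive and HBase (steps 15-17 below). A minimal spark-shell sketch, assuming only the column names from the DDL above:

```scala
// Run inside bin/spark-shell after the Spark SQL + Hive + HBase integration (steps 15-17).
// Counts how often each search topic (searchname) appears and keeps the 20 most frequent.
val topTopics = spark.sql(
  """
    |SELECT searchname, COUNT(*) AS cnt
    |FROM weblogs
    |WHERE searchname IS NOT NULL AND searchname != ''
    |GROUP BY searchname
    |ORDER BY cnt DESC
    |LIMIT 20
  """.stripMargin)

topTopics.show(20, truncate = false)
```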
### 13. Install and deploy Hue

1) Download it.
2) Build Hue:

a) Install the required dependency packages (the list below may include a few extras):

    yum install ant asciidoc cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain gcc gcc-c++ krb5-devel libtidy libffi-devel libxml2-devel libxslt-devel make mysql mysql-devel openldap-devel python-devel sqlite-devel gmp-devel openssl-devel mysql-devel

b) In the hue directory: make apps

3) Configure it (vi $HUE_HOME/desktop/conf/hue.ini):

    secret_key=jFE93j;2[290-eiw.KEiwN2s3['d;/.q[eIW^y#e=+Iei*@Mn < qW5o
    http_host=node3
    http_port=8888
    time_zone=Asia/Shanghai

4) Set permissions on desktop.db:

    [root@node3 desktop]# chmod o+w desktop.db

5) Start the service:

    [root@node3 hue-4.0.0]# ./build/env/bin/supervisor

If you hit the error KeyError: "Couldn't get user id for user hue", add the user (adduser hue) and change the ownership of desktop.db to hue:hue instead of leaving it under root:

    chown -R hue:hue desktop.db

6) Integrate Hue with HDFS (hue.ini):

    fs_defaultfs=hdfs://node1:8020            # default HDFS path
    webhdfs_url=http://node1:50070/webhdfs/v1
    hadoop_conf_dir=/opt/soft/hadoop-2.***/etc/hadoop
    hadoop_bin=/opt/soft/hadoop-2.***/bin
    hadoop_hdfs_home=/opt/soft/hadoop-2.***

Add the following to core-site.xml on all three Hadoop nodes:

    <property>
      <name>hadoop.proxyuser.hue.hosts</name>
      <value>*</value>
    </property>
    <property>
      <name>hadoop.proxyuser.hue.groups</name>
      <value>*</value>
    </property>

Start HDFS:

    start-dfs.sh

Access the URL: http://node3:8888/filebrowser/

7) Integrate Hue with YARN (hue.ini):

    resourcemanager_host=zxl2
    resourcemanager_port=8032
    resourcemanager_api_url=http://node1:8088
    proxy_api_url=http://node1:8088
    history_server_api_url=http://node1:1***88

Start YARN:

    start-yarn.sh

8) Integrate Hue with Hive (hue.ini):

    hive_server_host=node3
    hive_server_port=10000
    hive_conf_dir=/opt/soft/hive-1.2.1-bin/conf

Start Hive:

    [root@node3 bin]# ./hive --service hiveserver2

9) Integrate Hue with MySQL (hue.ini):

    nice_name="My SQL DB"      # any display name
    name=metastore             # database name
    engine=mysql
    host=node3
    port=3306
    user=root
    password=1234

Note: remove the leading ## in front of the [[[mysql]]] section.

10) Integrate Hue with HBase (hue.ini):

    hbase_clusters=(Cluster|node1:9090)      # any one HBase server in the cluster
    hbase_conf_dir=/opt/soft/hbase/conf

Start HBase and its Thrift server:

    [root@node1 hbase]# bin/start-hbase.sh
    [root@node1 hbase]# bin/hbase-daemon.sh start thrift     # starting one Thrift server is enough

### 14. Configure the Spark cluster; due to memory constraints it runs in standalone mode

### 15. Integrate Spark SQL with Hive (configured in the Spark installation on the server where Hive is installed)

1) Copy Hive's hive-site.xml into Spark's conf directory and add the metastore URL:

    vi hive-site.xml

    <property>
      <name>hive.metastore.uris</name>
      <value>thrift://node3:9083</value>
    </property>

2) Copy the MySQL JDBC jar from Hive into Spark's jars directory:

    cp hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar spark-2.2.0/jars/

3) Check the settings in spark-env.sh:

    vi spark-env.sh
    HADOOP_CONF_DIR=/opt/soft/hadoop-2.***/etc/hadoop

4) Start MySQL:

    service mysqld start

5) Start the Hive metastore service:

    bin/hive --service metastore

6) Start Hive and run a quick test:

    bin/hive
    show databases;
    create database zxl;
    use zxl;
    create table if not exists test(userid string,username string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS textfile;
    load data local inpath "/opt/data/test.txt" into table test;

    more /opt/data/test.txt
    0001 spark
    0002 hive
    0003 hbase
    0004 hadoop

7) Start spark-shell:

    bin/spark-shell
    spark.sql("select * from zxl.test").show

8) Alternatively, start spark-sql:

    bin/spark-sql
    show databases;        # list databases (returns: default, zxl)
    use zxl;               # switch to the zxl database
    show tables;           # list tables (returns: test)
    select * from test;    # query the table data

9) Using Spark SQL's ThriftServer and beeline:

a) Start the ThriftServer:

    sbin/start-thriftserver.sh

b) Start beeline:

    bin/beeline
    !connect jdbc:hive2://node3:10000
    show databases;          # list databases
    select * from kfk.test;  # query the table data

### 16. Integrate Spark SQL with MySQL (spark1 is a table in the test database)

Start spark-shell:

    bin/spark-shell

    :paste    # multi-line input, comments allowed; paste the code flush with the left margin
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node3:3306/test")
      .option("dbtable", "spark1")
      .option("user", "root")
      .option("password", "1234")
      .load()
    # press Ctrl+D to leave paste mode

    # print the loaded data
    jdbcDF.show

### 17. Integrate Spark SQL with HBase

The core of the Spark SQL and HBase integration is that Spark SQL reads the HBase table data through the Hive external table.

1) Copy the HBase jars and the Hive handler jar into Spark's jars directory:

    hbase-client-1.1.3.jar
    hbase-common-1.1.3.jar
    hbase-protocol-1.1.3.jar
    hbase-server-1.1.3.jar
    hive-hbase-handler-1.2.1.jar
    htrace-core-3.1.0-incubating.jar      # "incubating" indicates an early (incubator) release
    mysql-connector-java-5.1.35-bin.jar

2) Start spark-shell:

    bin/spark-shell
    val df = spark.sql("select count(1) from weblogs").show

### 18. Install nc as an external data source

    yum -y install nc

or install the rpm directly:

    rpm -ivh nc-1.84-24.el6.x86_***.rpm

### 19. Run a simple nc + Spark example

    [root@node2 ~]# nc -lk 9999
    [root@node2 spark-2.2.0]# bin/run-example --master local[2] streaming.NetworkWordCount localhost 9999

Note: when setting the master to local[n], n must be greater than the number of receivers (workers), so that cores remain for processing.

### 20. Save Spark Streaming results to an external database (MySQL)

When writing to a database, use foreachPartition so that a connection is created per partition rather than per record. The write itself looks like this (`line` stands for the iterator of (titleName, count) records within one partition; a complete sketch follows):

    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager
      .getConnection("jdbc:mysql://node3:3306/test","root","1234")
    try {
      for (row <- line) {
        val sql = "insert into webCount(titleName,count) values('" + row._1 + "'," + row._2 + ")"
        conn.prepareStatement(sql).executeUpdate()
      }
    } finally {
      conn.close()
    }
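The fragment above omits the foreachRDD / foreachPartition wrapping it refers to. A fuller sketch of that write path (the DStream name and helper are illustrative; the JDBC URL, credentials and webCount schema follow steps 16, 20 and 22):

```scala
import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: 'topicCounts' is a DStream[(String, Int)] of (titleName, count)
// pairs produced by the streaming aggregation. One JDBC connection is opened per
// partition, not per record, which is the point of using foreachPartition here.
def saveToMysql(topicCounts: DStream[(String, Int)]): Unit = {
  topicCounts.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://node3:3306/test", "root", "1234")
      try {
        // PreparedStatement parameters avoid building the SQL string by hand.
        val stmt = conn.prepareStatement("insert into webCount(titleName,count) values(?,?)")
        for ((title, count) <- partition) {
          stmt.setString(1, title)
          stmt.setInt(2, count)
          stmt.executeUpdate()
        }
      } finally {
        conn.close()
      }
    }
  }
}
```

Replacing DriverManager with a real connection pool (the mysqlPool mentioned in the project overview) would avoid reopening a connection for every partition of every batch.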
### 21. Integrate Structured Streaming with Kafka and MySQL

Add the required Spark jars for Spark + Kafka and Spark + HBase (a sketch of such a streaming job is given at the end of this README).

### 22. Create the webCount table to receive the results

    CREATE TABLE `webCount` (
      `titleName` varchar(255) DEFAULT NULL,
      `count` int(11) DEFAULT NULL
    ) ENGINE=MyISAM DEFAULT CHARSET=utf8;

### 23. Simple end-to-end test

1) Start ZooKeeper: zkServer.sh start
2) Start DFS: start-dfs.sh
3) Start HBase: start-hbase.sh
4) Start MySQL: service mysqld start
5) Start Flume on node2 (node3): flume-kfk-start.sh
6) Start Flume on node1: flume-kfk-start.sh
7) Start Kafka 0.10 (preferably on all three nodes, otherwise errors are likely):

    bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &

8) Run the weblog-shell.sh script on node2 (node3).
9) Start the application.
10) Fix garbled characters when Structured Streaming writes to the database:

a) Edit the MySQL configuration file my.cnf (on Linux):

    vi my.cnf

    [client]
    socket=/var/lib/mysql/mysql.sock     # added
    default-character-set=utf8           # added

    [mysqld]
    character-set-server=utf8            # added
    datadir=/var/lib/mysql
    socket=/var/lib/mysql/mysql.sock
    user=mysql
    # Disabling symbolic-links is recommended to prevent assorted security risks
    symbolic-links=0

    [mysqld_safe]
    log-error=/var/log/mysqld.log
    pid-file=/var/run/mysqld/mysqld.pid

b) Create tables like this:

    CREATE TABLE `webCount` (
      `titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
      `count` int(11) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
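Relating back to step 21: the repository's actual streaming job is not reproduced on this page, but a minimal Structured Streaming sketch of the same idea - read the weblogs topic from Kafka, count topics, and write each result into webCount - could look as follows. The object name, the field parsing, the checkpoint path and the use of a ForeachWriter sink are assumptions made for Spark 2.2, not the project's code:

```scala
import java.sql.{Connection, DriverManager}

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.OutputMode

object NewsTopicCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NewsTopicCount")
      .master("local[2]")   // or the standalone master URL from step 14
      .getOrCreate()
    import spark.implicits._

    // Kafka source: the weblogs topic fed by the Flume kafkaSink configured in step 2.
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1:9092,node2:9092,node3:9092")
      .option("subscribe", "weblogs")
      .load()
      .selectExpr("CAST(value AS STRING) AS line")
      .as[String]

    // Assumed record layout after step 3: datetime,userid,searchname,retorder,cliorder,cliurl.
    val counts = lines
      .map { line => val f = line.split(","); if (f.length >= 3) f(2) else "" }
      .filter(_.nonEmpty)
      .toDF("titleName")
      .groupBy("titleName")
      .count()

    // Foreach sink (available on Spark 2.2): one JDBC connection per partition and trigger.
    val mysqlWriter = new ForeachWriter[Row] {
      var conn: Connection = _
      override def open(partitionId: Long, version: Long): Boolean = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(
          "jdbc:mysql://node3:3306/test?useUnicode=true&characterEncoding=utf8", "root", "1234")
        true
      }
      override def process(row: Row): Unit = {
        val stmt = conn.prepareStatement("insert into webCount(titleName,count) values(?,?)")
        stmt.setString(1, row.getString(0))
        stmt.setLong(2, row.getLong(1))
        stmt.executeUpdate()
        stmt.close()
      }
      override def close(errorOrNull: Throwable): Unit = if (conn != null) conn.close()
    }

    val query = counts.writeStream
      .outputMode(OutputMode.Complete())
      .foreach(mysqlWriter)
      .option("checkpointLocation", "/tmp/newscount-checkpoint")
      .start()

    query.awaitTermination()
  }
}
```

Because the sink only issues inserts (mirroring step 20), every trigger appends the full updated counts; deduplication or an upsert would be needed if webCount should hold exactly one row per topic.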
