FlinkCDC-Hudi
Description: This example uses Flink SQL to capture MySQL CDC data into Hudi, following the official website example.
Included files:
* flink-sql-connector-mysql-cdc-2.2.0.jar
* hudi-hadoop-mr-bundle-0.10.0.jar
# Real-Time Capture of MySQL Data into the Data Lake
## 1. Overview
Real-time lake ingestion with Flink CDC is one of the most representative data-processing architectures today. This example captures MySQL table changes in real time and continuously writes the joined result of the products, orders, and shipments tables into the lake. For the Hudi table type, choose MOR (MERGE_ON_READ) for write-heavy, read-light workloads and COW (COPY_ON_WRITE) for read-heavy, write-light workloads. Versions used: Flink 1.13.2, Hudi 0.10, Hadoop 3.2.2, Hive 3.1.2. It mainly addresses the following scenarios:
* Replaces the ODS layer of a traditional architecture;
* Dynamically captures upstream data in real time;
* Provides stream storage, closing the loop between stream computation and storage;
* Supports ACID, making zipper (slowly-changing) tables and archiving simpler;
* Forms an integrated lakehouse for downstream data applications.
## 2. Preparation
Copy hudi-flink-bundle_2.12-0.10.0-SNAPSHOT.jar and flink-shaded-hadoop-2-uber-2.8.3-10.0.jar into $FLINK_HOME/lib.
Copy hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar into $HIVE_HOME/auxlib/.
**Start Flink:**
```bash
start-cluster.sh
```
**Start Hive:**
```bash
hive --service metastore &
hive --service hiveserver2 &
```
The statements in the following sections are run from the Flink SQL client, started with `sql-client.sh`.
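Flink CDC reads the MySQL binlog, so binary logging must be enabled in ROW format on the source database; a quick check from a MySQL session:
```sql
-- Run on the source MySQL server; the mysql-cdc connector requires both.
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
```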
## 3. Create the MySQL Tables
```sql
-- MySQL
CREATE DATABASE appdb;
USE appdb;
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
CREATE TABLE shipments (
shipment_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_id INTEGER NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL
) AUTO_INCREMENT = 10001;
INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
(default,10002,'Hangzhou','Shanghai',false),
(default,10003,'Shanghai','Hangzhou',false);
```
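Before registering anything in Flink, a quick sanity check that the seed data landed:
```sql
-- Expected row counts from the INSERT statements above.
SELECT COUNT(*) FROM products;   -- 9
SELECT COUNT(*) FROM orders;     -- 3
SELECT COUNT(*) FROM shipments;  -- 3
```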
## 4. Register Tables in Flink
[MySQL CDC connector configuration reference](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html#supported-databases)
[Hudi configuration reference](https://hudi.apache.org/docs/configurations#FLINK_SQL)
```sql
--First, enable checkpoints every 3 seconds
-- Flink SQL
Flink SQL> SET execution.checkpointing.interval = 3s;
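-- Note: the Hudi Flink writer commits data on checkpoint completion, so
-- checkpointing must be enabled for writes to become visible in the table.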
--Then, create tables that capture the change data from the corresponding database tables.
-- Flink SQL
Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.161',
'port' = '3306',
'username' = 'root',
'password' = '@Rq834009465',
'database-name' = 'appdb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.161',
'port' = '3306',
'username' = 'root',
'password' = '@Rq834009465',
'database-name' = 'appdb',
'table-name' = 'orders'
);
Flink SQL> CREATE TABLE shipments (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.161',
'port' = '3306',
'username' = 'root',
'password' = '@Rq834009465',
'database-name' = 'appdb',
'table-name' = 'shipments'
);
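-- Optional: smoke-test a CDC source before creating the Hudi sinks. This
-- submits a streaming job that returns the current snapshot plus any changes.
Flink SQL> SELECT * FROM products;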
--Finally, create the enriched_orders tables used to write the joined data to Hudi.
-- Flink SQL: COPY_ON_WRITE (COW) ingestion
Flink SQL> CREATE TABLE enriched_orders1 (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (`origin`)
WITH (
'connector'='hudi',
'path' ='hdfs://cluster/hudi/enriched_orders1',
'table.type'='COPY_ON_WRITE', -- with MERGE_ON_READ, Hive shows no output until a parquet file has been generated
'hive_sync.enable'='true', -- required: enable Hive sync
'hive_sync.mode' = 'hms', -- required: set hive sync mode to hms (default is jdbc)
'hive_sync.metastore.uris' = 'thrift://192.168.1.161:9083', -- required: Hive metastore URI
'hive_sync.table'='enriched_orders1', -- required: table name to create in Hive
'hive_sync.db'='zhruiqi' , -- required: database name to create in Hive
'compaction.async.enabled'='true',
'compaction.trigger.strategy'='num_commits',
'compaction.delta_commits'='3',
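-- Note: compaction applies to MERGE_ON_READ tables; on this COPY_ON_WRITE
-- table the compaction options above are not expected to take effect.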
'write.tasks' = '4',
'compaction.max_memory'='1024',
'execution.checkpointing.interval'='3000',
'hoodie.clustering.async.enabled' = 'true',
'hoodie.clustering.async.max.commits' = '3',
'hoodie.clustering.preserve.commit.metadata'='true',
'hive_sync.file_format' ='PARQUET',
'write.insert.cluster'='true',
'hive_sync.username'='', -- required: HMS username
'hive_sync.password'=''
) ;
-- Flink SQL: MERGE_ON_READ (MOR) ingestion
Flink SQL> CREATE TABLE enriched_orders2 (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (`origin`)
WITH (
'connector'='hudi',
'path' ='hdfs://cluster/hudi/enriched_orders2',
'table.type'='MERGE_ON_READ', -- with MERGE_ON_READ, Hive shows no output until a parquet file has been generated
'hive_sync.enable'='true', -- required: enable Hive sync
'hive_sync.mode' = 'hms', -- required: set hive sync mode to hms (default is jdbc)
'read.streaming.enabled' = 'true',
--'read.streaming.start-commit' = '20210901151206' ,
'read.streaming.check-interval' = '2',
'hive_sync.metastore.uris' = 'thrift://192.168.1.161:9083' , -- required: Hive metastore URI
'hive_sync.table'='enriched_orders2', -- required: table name to create in Hive
'hive_sync.db'='zhruiqi', -- required: database name to create in Hive
'write.tasks'='1',
'compaction.tasks'='1'
) ;
```
## 5. Run the Flink SQL Jobs
```sql
--Use Flink SQL to join the orders table with the products and shipments tables to enrich the orders, and write the result to Hudi.
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders1
SELECT o.order_id,
o.order_date,
o.customer_name,
o.price,
o.product_id,
o.order_status,
p.name,
p.description,
s.shipment_id,
s.origin,
s.destination,
s.is_arrived
FROM orders AS o
LEFT JOIN products AS p
ON o.product_id = p.id
LEFT JOIN shipments AS s
ON o.order_id = s.order_id ;
Flink SQL> INSERT INTO enriched_orders2
SELECT o.order_id,
o.order_date,
o.customer_name,
o.price,
o.product_id,
o.order_status,
p.name,
p.description,
s.shipment_id,
s.origin,
s.destination,
s.is_arrived
FROM orders AS o
LEFT JOIN products AS p
ON o.product_id = p.id
LEFT JOIN shipments AS s
ON o.order_id = s.order_id ;
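-- Each INSERT above submits a continuous streaming job; progress can be
-- monitored in the Flink web UI (port 8081 by default).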
```
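Because enriched_orders2 sets 'read.streaming.enabled' = 'true', the join result can also be tailed directly from the SQL client; a minimal check:
```sql
-- Streaming read of the MOR table; new commits surface roughly every
-- read.streaming.check-interval (2 s above).
Flink SQL> SELECT * FROM enriched_orders2;
```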
## 6. Test
```sql
--Next, make some changes in the MySQL database; the enriched orders in the Hudi tables will be updated in real time after each step.
--Insert a new order in MySQL
INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);
--Insert a shipment in MySQL
INSERT INTO shipments
VALUES (default,10004,'Shanghai','Beijing',false);
--Update the order status in MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;
--Update the shipment status in MySQL
UPDATE shipments SET is_arrived = true WHERE shipment_id = 10004;
--Delete the order in MySQL
DELETE FROM orders WHERE order_id = 10004;
```
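To verify that the changes propagated, re-check the enriched result for the test order from the Flink SQL client; order 10004 should appear, have its status flip, and finally drop out after the delete:
```sql
Flink SQL> SELECT * FROM enriched_orders2 WHERE order_id = 10004;
```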
## 7. Query from Hive
When querying through beeline, the input format must be set manually:
```sql
SET hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
```
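A minimal beeline session, using the hive_sync database and table names configured above (Hudi syncs a MOR table into Hive as two tables: `enriched_orders2_ro` for read-optimized queries and `enriched_orders2_rt` for real-time queries):
```sql
SET hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
USE zhruiqi;
SHOW TABLES; -- expect enriched_orders1, enriched_orders2_ro, enriched_orders2_rt
SELECT * FROM enriched_orders1 LIMIT 10;     -- COW table
SELECT * FROM enriched_orders2_rt LIMIT 10;  -- MOR real-time view
```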