kafka_cdc_sqlserver
所属分类:数据库系统
开发工具:HTML
文件大小:9588KB
下载次数:0
上传日期:2020-04-13 20:28:53
上 传 者:
sh-1993
说明: kafka_cdc_sql服务器,,
(kafka_cdc_sqlserver,,)
文件列表:
docker-compose.yaml (2934, 2020-04-14)
images (0, 2020-04-14)
images\CDC_SQL_SERVER.png (290361, 2020-04-14)
images\sinkDB.png (88748, 2020-04-14)
images\sourceDB.png (63783, 2020-04-14)
kafka-connect-sqlserver (0, 2020-04-14)
kafka-connect-sqlserver\Dockerfile (278, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1 (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets\confluent.png (3156, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets\jdbc.jpg (33609, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\LICENSE (7003, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\NOTICE (729, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses.html (1182, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-kafka-connect-jdbc-3.2.0-SNAPSHOT.txt (22794, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-postgresql-42.2.10.txt (6891, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-sqlite-jdbc-3.8.11.2.txt (11358, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\version.txt (78, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc\sink-quickstart-sqlite.properties (1440, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc\source-quickstart-sqlite.properties (1621, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\common-utils-5.4.1.jar (17561, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\kafka-connect-jdbc-5.4.1.jar (228877, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\postgresql-42.2.10.jar (927447, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\slf4j-api-1.7.26.jar (41139, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\sqlite-jdbc-3.25.2.jar (7064881, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\manifest.json (2699, 2020-04-14)
kafka-connect-sqlserver\mssql-jdbc-8.2.0.jre11.jar (1276479, 2020-04-14)
# CDC para SQL Server com Apache Kafka e Docker
## Overview
* ## Kafka
O Kafka e usado para criar pipelines de dados em tempo real e aplicativos de streaming. E escalavel horizontalmente, tolerante a falhas, extremamente rapido e e executado em producao em milhares de empresas.
* ## Debezium
E um projeto de codigo aberto, que oferece varios plugins para buscar os dados de um banco de dados. Ele captura alteracoes em seus bancos de dados para que seus aplicativos possam ver e responder a essas alteracoes. O Debezium em uma transacao registra todas as alteracoes em cada tabela do banco de dados.
* ## Kafka Connect
O Kafka Connect e uma ferramenta para transmitir dados de forma escalavel e confiavel entre o Apache Kafka e outros sistemas de dados. Torna simples a definicao rapida de conectores que movem grandes conjuntos de dados para dentro e para fora do Kafka. O Kafka Connect pode ingerir bancos de dados inteiros ou coletar metricas de todos os servidores de aplicativos para os topicos do Kafka, disponibilizando os dados para o processamento de fluxo com baixa latencia. Um conector de exportacao pode entregar dados de topicos Kafka em indices secundarios, como Elasticsearch ou sistemas em lote, como o Hadoop, para analise offline.
# Arquitetura
![Architeture](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/CDC_SQL_SERVER.png)
# Configurando o CDC para o seu RDS SQL Server
Para realizarmos a operacao da captura de dados para o SQL Server. Primeiro temos que ativar esse recurso em nossa tabela do banco de dados. Para o banco de dados do RDS, podemos usar os seguintes comandos:
[Configurando CDC para RDS SQL Server](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://docs.aws.amazon.com/pt_br/AmazonRDS/latest/UserGuide/Appendix.SQLServer.CommonDBATasks.CDC.html)
```
USE __DATABASE_SOURCE__
EXEC msdb.dbo.rds_cdc_enable_db '__DATABASE_SOURCE__'
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'__TABLE__', @role_name = NULL, @supports_net_changes = 1
EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 300
GO
```
# Configurando imagem docker para a realizacao do sink com JDBC
Para a replicacao dos dados vamos utilziar um plugin de conexao JDBC (Java Database Connectivity) sink. Dessa forma, vamos criar um `Dockerfile` baseado na imagem do Debezium e adicionar esse plugin. Vamos adicionar o driver de conexao do SQL Server tambem.
Dessa forma baixe e estraia o JDBC plugin no diretorio do `Dockerfile`
Baixe e estraia o drive do SQL Server tambem
```
FROM debezium/connect:1.0
COPY confluentinc-kafka-connect-jdbc-5.4.1/lib/* /kafka/connect/kafka-connect-jdbc/
COPY confluentinc-kafka-connect-jdbc-5.4.1/etc/* /etc/
COPY mssql-jdbc-8.2.0.jre11.jar /kafka/connect/kafka-connect-jdbc/
```
* ## Observarcoes para aplicacao de melhores praticas
Para o desenvolvimento das blue prints, sera necessario o armazenamento das libs do kafka connect jdbc e driver de conexao do jdbc para sql server em um bucket s3 ou similiar. Assim, conseguiremos ter nosso repositorio de dependencias bem estabelecido e controlado. Vamos, dessa forma, alterar o `Dockerfile` para que realize o download dessas libs no momento de build, evitando a realizacao das operacoes de `COPY` para o mesmo
* ## Porque nao utilizamos a imagem confluentinc/cp-kafka-connect
Para esse cenario de extracao e replicacao para sql server, tivemos muitas dificuldades no setup dessa imagem. Ademais, com o setup dessa imagem nao conseguimos realizar a operacao de delecao, o erro era ocaicionado no momento de transforcao dos dados para a realizacao do sink. Utilizamos os seguintes transformadores: `Flatten e ValueToKey`, ambos apresentaram erros na operacao de delecao.
Dessa forma, procurarmos alternativas para a utilizacao do transformador nativo do Debezium, entao definimos como transformador o `unwrap` utilizando a classe `io.debezium.transforms.ExtractNewRecordState`.
Portanto, precisavamos da aplicacao core do debezium rodando, entao, entendemos que a melhor saida seria utilizar a imagem base do Debezium e configurar o plugin do jdbc sink juntamente com o driver de conexao jdbc do sql server.
Porem, o metodo e delecao que conseguimos foi o de rewrite. Esse metodo ira criar uma coluna (`__deleted`) de controle da linha. Quando e feito uma insercao no banco de dados de origem e atribuido o valor `false` a essa coluna no banco de dados de destino, quando a linha e removida no banco de origem e atribuido `true` a essa coluna no banco de dados de destino.
Existe um metodo de delecao chamado `drop`. Caso a linha fosse removida no banco de dados ed origem, ela deveria excluir a mesma linha no banco de dados de destino. Porem nos nossos experiementos nao houve esse comportamento, proem nao foi aprsentando nenhum erro o que inviabilizou qualquer analise para a correcao do mesmo.
E possivel visualizar o resultado no final deste documento.
[unwrap](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://debezium.io/documentation/reference/1.1/configuration/event-flattening.html)
[Flatten e ValueToKey](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://docs.confluent.io/current/connect/transforms/index.html)
* ## Tempo de replicacao
E importante resaltar que o tempo de replicacao pode chegar ser maior que **6 minutos**.
Pois o Debezium realizando a coleta da alteracoes eventuais no banco de dados (source), pode demorar ate **1 minuto** para registrar as alteracoes no topico do `Kafka`
Equanto o Debezium (sink), pode demoarar ate **5 minutos** para realizar a leitura nesse topico.
# Criando ambiente com docker compose
```
version: '3'
services:
zookeeper:
image: zookeeper:3.5.7
ports:
- '2181:2181'
schema_registry:
image: confluentinc/cp-schema-registry:5.4.1
links:
- kafka:kafaka
ports:
- 8081:8081
environment:
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
- SCHEMA_REGISTRY_HOST_NAME=localhost
- SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
kafka:
image: wurstmeister/kafka:2.12-2.4.1
ports:
- '9092:9092'
- '9093:9093'
- '9094:9094'
links:
- zookeeper:zookeeper
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: source:1:1,sourceOffset:1:1:compact,sourceStatuses:1:1,sink:1:1,sinkOffset:1:1:compact,sinkStatuses:1:1
KAFKA_LISTENERS: INSIDE://:9092,PLAINTEXT://:9093,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
source_connector:
image: debezium/connect:1.0
ports:
- '8083:8083'
links:
- zookeeper:zookeeper
- kafka:kafka
- schema_registry:schema_registry
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: source
OFFSET_STORAGE_TOPIC: sourceOffset
STATUS_STORAGE_TOPIC: sourceStatuses
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
sink_connector:
build: ./kafka-connect-sqlserver
links:
- kafka:kafka
- schema_registry:schema_registry
ports:
- 28083:28083
environment:
BOOTSTRAP_SERVERS: kafka:9092
REST_PORT: 28083
GROUP_ID: 2
CONFIG_STORAGE_TOPIC: sink
OFFSET_STORAGE_TOPIC: sinkOffset
STATUS_STORAGE_TOPIC: sinkStatuses
CONFIG_STORAGE_REPLICATION_FACTOR: 3
OFFSET_STORAGE_REPLICATION_FACTOR: 3
STATUS_STORAGE_REPLICATION_FACTOR: 3
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: /kafka/connect/
```
* #### Zookeeper
O zookeeper, sera responsavel pela orquestracao das nossas aplicacaoes em Java
* #### Schema Registry
O schema registry, sera resposnsavel pela comunicacao saudavel entre nossos conectors de source e sink
* #### Kafka
O Kafka server, ficara responsavel por centralizar os topicos e dar mais flexibilidade e escalabilidade aos nossos conectors
* #### Source Connector
O Source connector (Debezium), sera responsavel por capturar as alteracoes eventuais do nosso banco de dados.
* #### Sink Connector
O Sink Connector (JDBC), sera responsavel por replicar essas alteracoes eventuais para o banco de dados de destino.
# Criando os conectors por chamadas REST do Kafka
Para criar os connectors basta executar as seguintes chamadas REST
* ## Sink
#### OBS: A Database de destino precisa estar criada no SQL Server de destino
```
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
localhost:28083/connectors/ -d '{
"name": "__SINK_NAME__",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:sqlserver://__JDBC_CONNECTION__",
"table.name.format": "__TABLE_SINK__",
"tasks.max": 1,
"auto.create": "true",
"auto.evolve": "true",
"pk.mode": "record_key",
"insert.mode": "upsert",
"pk.fields": "id",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "__SCHEMA_REGISTRY__",
"value.converter.schema.registry.url": "__SCHEMA_REGISTRY__",
"delete.enabled": "true",
"topics": "__SQL_SERVER_NAME__.dbo.__TABLE_SOURCE__",
"transforms" : "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstone": "true",
"transforms.unwrap.delete.handling.mode": "drop"
}
}'
```
* ## Source
#### OBS: Para o Source funcionar e necessario a existencia de dados no SQL Server
```
curl -i -X POST \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{
"name": "__SOURCE_NAME__",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "__DB_HOSTNAME__",
"database.port": "__DB_PORT__",
"database.user": "__DB_USERNAME__",
"database.password": "__DB_PASSWORD__",
"database.dbname": "__DB_DATABASE__",
"database.server.name": "__SQL_SERVER_NAME__",
"table.whitelist": "dbo.__DB_TABLE__",
"tombstones.on.delete": "true",
"database.history.kafka.bootstrap.servers": "__KAFKA_HOST__:__KAFKA_PORT__",
"database.history.kafka.topic": "dbhistory.__SQL_SERVER_NAME__"
}
}'
```
# Resultado na tabela
Apos feito o sincronismo das alteracoes temos as tabelas nos seguintes formatos
* Todos os campos inseridos na origem foram replicados para o destino
* Todos os campos atualizados na origem foram replicados para o destino
* Todos os campos removidos na origem tiveram o campo __deleted atualizados no destino
* A replicacao pode demorar ate 6 minutos
![Source](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/sourceDB.png) ![Sink](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/sinkDB.png)
近期下载者:
相关文件:
收藏者: