kafka_cdc_sqlserver

所属分类:数据库系统
开发工具:HTML
文件大小:9588KB
下载次数:0
上传日期:2020-04-13 20:28:53
上 传 者sh-1993
说明:  kafka_cdc_sql服务器,,
(kafka_cdc_sqlserver,,)

文件列表:
docker-compose.yaml (2934, 2020-04-14)
images (0, 2020-04-14)
images\CDC_SQL_SERVER.png (290361, 2020-04-14)
images\sinkDB.png (88748, 2020-04-14)
images\sourceDB.png (63783, 2020-04-14)
kafka-connect-sqlserver (0, 2020-04-14)
kafka-connect-sqlserver\Dockerfile (278, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1 (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets\confluent.png (3156, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\assets\jdbc.jpg (33609, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\LICENSE (7003, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\NOTICE (729, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses.html (1182, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-kafka-connect-jdbc-3.2.0-SNAPSHOT.txt (22794, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-postgresql-42.2.10.txt (6891, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\licenses\LICENSE-sqlite-jdbc-3.8.11.2.txt (11358, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\doc\version.txt (78, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc\sink-quickstart-sqlite.properties (1440, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\etc\source-quickstart-sqlite.properties (1621, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib (0, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\common-utils-5.4.1.jar (17561, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\kafka-connect-jdbc-5.4.1.jar (228877, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\postgresql-42.2.10.jar (927447, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\slf4j-api-1.7.26.jar (41139, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\lib\sqlite-jdbc-3.25.2.jar (7064881, 2020-04-14)
kafka-connect-sqlserver\confluentinc-kafka-connect-jdbc-5.4.1\manifest.json (2699, 2020-04-14)
kafka-connect-sqlserver\mssql-jdbc-8.2.0.jre11.jar (1276479, 2020-04-14)

# CDC para SQL Server com Apache Kafka e Docker ## Overview * ## Kafka O Kafka e usado para criar pipelines de dados em tempo real e aplicativos de streaming. E escalavel horizontalmente, tolerante a falhas, extremamente rapido e e executado em producao em milhares de empresas. * ## Debezium E um projeto de codigo aberto, que oferece varios plugins para buscar os dados de um banco de dados. Ele captura alteracoes em seus bancos de dados para que seus aplicativos possam ver e responder a essas alteracoes. O Debezium em uma transacao registra todas as alteracoes em cada tabela do banco de dados. * ## Kafka Connect O Kafka Connect e uma ferramenta para transmitir dados de forma escalavel e confiavel entre o Apache Kafka e outros sistemas de dados. Torna simples a definicao rapida de conectores que movem grandes conjuntos de dados para dentro e para fora do Kafka. O Kafka Connect pode ingerir bancos de dados inteiros ou coletar metricas de todos os servidores de aplicativos para os topicos do Kafka, disponibilizando os dados para o processamento de fluxo com baixa latencia. Um conector de exportacao pode entregar dados de topicos Kafka em indices secundarios, como Elasticsearch ou sistemas em lote, como o Hadoop, para analise offline. # Arquitetura ![Architeture](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/CDC_SQL_SERVER.png) # Configurando o CDC para o seu RDS SQL Server Para realizarmos a operacao da captura de dados para o SQL Server. Primeiro temos que ativar esse recurso em nossa tabela do banco de dados. Para o banco de dados do RDS, podemos usar os seguintes comandos: [Configurando CDC para RDS SQL Server](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://docs.aws.amazon.com/pt_br/AmazonRDS/latest/UserGuide/Appendix.SQLServer.CommonDBATasks.CDC.html) ``` USE __DATABASE_SOURCE__ EXEC msdb.dbo.rds_cdc_enable_db '__DATABASE_SOURCE__' EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'__TABLE__', @role_name = NULL, @supports_net_changes = 1 EXEC sys.sp_cdc_change_job @job_type = 'capture', @pollinginterval = 300 GO ``` # Configurando imagem docker para a realizacao do sink com JDBC Para a replicacao dos dados vamos utilziar um plugin de conexao JDBC (Java Database Connectivity) sink. Dessa forma, vamos criar um `Dockerfile` baseado na imagem do Debezium e adicionar esse plugin. Vamos adicionar o driver de conexao do SQL Server tambem. Dessa forma baixe e estraia o JDBC plugin no diretorio do `Dockerfile` Baixe e estraia o drive do SQL Server tambem ``` FROM debezium/connect:1.0 COPY confluentinc-kafka-connect-jdbc-5.4.1/lib/* /kafka/connect/kafka-connect-jdbc/ COPY confluentinc-kafka-connect-jdbc-5.4.1/etc/* /etc/ COPY mssql-jdbc-8.2.0.jre11.jar /kafka/connect/kafka-connect-jdbc/ ``` * ## Observarcoes para aplicacao de melhores praticas Para o desenvolvimento das blue prints, sera necessario o armazenamento das libs do kafka connect jdbc e driver de conexao do jdbc para sql server em um bucket s3 ou similiar. Assim, conseguiremos ter nosso repositorio de dependencias bem estabelecido e controlado. Vamos, dessa forma, alterar o `Dockerfile` para que realize o download dessas libs no momento de build, evitando a realizacao das operacoes de `COPY` para o mesmo * ## Porque nao utilizamos a imagem confluentinc/cp-kafka-connect Para esse cenario de extracao e replicacao para sql server, tivemos muitas dificuldades no setup dessa imagem. Ademais, com o setup dessa imagem nao conseguimos realizar a operacao de delecao, o erro era ocaicionado no momento de transforcao dos dados para a realizacao do sink. Utilizamos os seguintes transformadores: `Flatten e ValueToKey`, ambos apresentaram erros na operacao de delecao. Dessa forma, procurarmos alternativas para a utilizacao do transformador nativo do Debezium, entao definimos como transformador o `unwrap` utilizando a classe `io.debezium.transforms.ExtractNewRecordState`. Portanto, precisavamos da aplicacao core do debezium rodando, entao, entendemos que a melhor saida seria utilizar a imagem base do Debezium e configurar o plugin do jdbc sink juntamente com o driver de conexao jdbc do sql server. Porem, o metodo e delecao que conseguimos foi o de rewrite. Esse metodo ira criar uma coluna (`__deleted`) de controle da linha. Quando e feito uma insercao no banco de dados de origem e atribuido o valor `false` a essa coluna no banco de dados de destino, quando a linha e removida no banco de origem e atribuido `true` a essa coluna no banco de dados de destino. Existe um metodo de delecao chamado `drop`. Caso a linha fosse removida no banco de dados ed origem, ela deveria excluir a mesma linha no banco de dados de destino. Porem nos nossos experiementos nao houve esse comportamento, proem nao foi aprsentando nenhum erro o que inviabilizou qualquer analise para a correcao do mesmo. E possivel visualizar o resultado no final deste documento. [unwrap](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://debezium.io/documentation/reference/1.1/configuration/event-flattening.html) [Flatten e ValueToKey](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/https://docs.confluent.io/current/connect/transforms/index.html) * ## Tempo de replicacao E importante resaltar que o tempo de replicacao pode chegar ser maior que **6 minutos**. Pois o Debezium realizando a coleta da alteracoes eventuais no banco de dados (source), pode demorar ate **1 minuto** para registrar as alteracoes no topico do `Kafka` Equanto o Debezium (sink), pode demoarar ate **5 minutos** para realizar a leitura nesse topico. # Criando ambiente com docker compose ``` version: '3' services: zookeeper: image: zookeeper:3.5.7 ports: - '2181:2181' schema_registry: image: confluentinc/cp-schema-registry:5.4.1 links: - kafka:kafaka ports: - 8081:8081 environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 - SCHEMA_REGISTRY_HOST_NAME=localhost - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081 kafka: image: wurstmeister/kafka:2.12-2.4.1 ports: - '9092:9092' - '9093:9093' - '9094:9094' links: - zookeeper:zookeeper environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: source:1:1,sourceOffset:1:1:compact,sourceStatuses:1:1,sink:1:1,sinkOffset:1:1:compact,sinkStatuses:1:1 KAFKA_LISTENERS: INSIDE://:9092,PLAINTEXT://:9093,OUTSIDE://:9094 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' source_connector: image: debezium/connect:1.0 ports: - '8083:8083' links: - zookeeper:zookeeper - kafka:kafka - schema_registry:schema_registry environment: BOOTSTRAP_SERVERS: kafka:9092 GROUP_ID: 1 CONFIG_STORAGE_TOPIC: source OFFSET_STORAGE_TOPIC: sourceOffset STATUS_STORAGE_TOPIC: sourceStatuses KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 sink_connector: build: ./kafka-connect-sqlserver links: - kafka:kafka - schema_registry:schema_registry ports: - 28083:28083 environment: BOOTSTRAP_SERVERS: kafka:9092 REST_PORT: 28083 GROUP_ID: 2 CONFIG_STORAGE_TOPIC: sink OFFSET_STORAGE_TOPIC: sinkOffset STATUS_STORAGE_TOPIC: sinkStatuses CONFIG_STORAGE_REPLICATION_FACTOR: 3 OFFSET_STORAGE_REPLICATION_FACTOR: 3 STATUS_STORAGE_REPLICATION_FACTOR: 3 KEY_CONVERTER: io.confluent.connect.avro.AvroConverter VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema_registry:8081 INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter CONNECT_PLUGIN_PATH: /kafka/connect/ ``` * #### Zookeeper O zookeeper, sera responsavel pela orquestracao das nossas aplicacaoes em Java * #### Schema Registry O schema registry, sera resposnsavel pela comunicacao saudavel entre nossos conectors de source e sink * #### Kafka O Kafka server, ficara responsavel por centralizar os topicos e dar mais flexibilidade e escalabilidade aos nossos conectors * #### Source Connector O Source connector (Debezium), sera responsavel por capturar as alteracoes eventuais do nosso banco de dados. * #### Sink Connector O Sink Connector (JDBC), sera responsavel por replicar essas alteracoes eventuais para o banco de dados de destino. # Criando os conectors por chamadas REST do Kafka Para criar os connectors basta executar as seguintes chamadas REST * ## Sink #### OBS: A Database de destino precisa estar criada no SQL Server de destino ``` curl -i -X POST \ -H "Accept:application/json" \ -H "Content-Type:application/json" \ localhost:28083/connectors/ -d '{ "name": "__SINK_NAME__", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:sqlserver://__JDBC_CONNECTION__", "table.name.format": "__TABLE_SINK__", "tasks.max": 1, "auto.create": "true", "auto.evolve": "true", "pk.mode": "record_key", "insert.mode": "upsert", "pk.fields": "id", "key.converter": "io.confluent.connect.avro.AvroConverter", "value.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "__SCHEMA_REGISTRY__", "value.converter.schema.registry.url": "__SCHEMA_REGISTRY__", "delete.enabled": "true", "topics": "__SQL_SERVER_NAME__.dbo.__TABLE_SOURCE__", "transforms" : "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstone": "true", "transforms.unwrap.delete.handling.mode": "drop" } }' ``` * ## Source #### OBS: Para o Source funcionar e necessario a existencia de dados no SQL Server ``` curl -i -X POST \ -H "Accept:application/json" \ -H "Content-Type:application/json" \ localhost:8083/connectors/ -d '{ "name": "__SOURCE_NAME__", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "__DB_HOSTNAME__", "database.port": "__DB_PORT__", "database.user": "__DB_USERNAME__", "database.password": "__DB_PASSWORD__", "database.dbname": "__DB_DATABASE__", "database.server.name": "__SQL_SERVER_NAME__", "table.whitelist": "dbo.__DB_TABLE__", "tombstones.on.delete": "true", "database.history.kafka.bootstrap.servers": "__KAFKA_HOST__:__KAFKA_PORT__", "database.history.kafka.topic": "dbhistory.__SQL_SERVER_NAME__" } }' ``` # Resultado na tabela Apos feito o sincronismo das alteracoes temos as tabelas nos seguintes formatos * Todos os campos inseridos na origem foram replicados para o destino * Todos os campos atualizados na origem foram replicados para o destino * Todos os campos removidos na origem tiveram o campo __deleted atualizados no destino * A replicacao pode demorar ate 6 minutos ![Source](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/sourceDB.png) ![Sink](https://github.com/samuelbraga/kafka_cdc_sqlserver/blob/master/images/sinkDB.png)

近期下载者

相关文件


收藏者