KafkaCDC

所属分类:中间件编程
开发工具:C#
文件大小:0KB
下载次数:0
上传日期:2022-11-21 06:56:08
上 传 者sh-1993
说明:  卡夫卡疾控中心,,
(KafkaCDC,,)

文件列表:
.dockerignore (316, 2022-11-20)
KafkaCDC.sln (3079, 2022-11-20)
docker-compose.yml (2594, 2022-11-20)
postman/ (0, 2022-11-20)
postman/Kafka CDC.postman_collection.json (8624, 2022-11-20)
src/ (0, 2022-11-20)
src/KafkaCDC.Common/ (0, 2022-11-20)
src/KafkaCDC.Common/Entities/ (0, 2022-11-20)
src/KafkaCDC.Common/Entities/BaseEntity.cs (128, 2022-11-20)
src/KafkaCDC.Common/Entities/Outbox.cs (349, 2022-11-20)
src/KafkaCDC.Common/Kafka/ (0, 2022-11-20)
src/KafkaCDC.Common/Kafka/BackgroungConsumer.cs (1975, 2022-11-20)
src/KafkaCDC.Common/Kafka/IKafkaHandler.cs (273, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaConsumerConfig.cs (346, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaDeserializer.cs (648, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaRegistration.cs (671, 2022-11-20)
src/KafkaCDC.Common/KafkaCDC.Common.csproj (1881, 2022-11-20)
src/KafkaCDC.Common/MigrationManager.cs (561, 2022-11-20)
src/KafkaCDC.Deals/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/AddDealCommand.cs (576, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/AddDealCommandHandler.cs (2494, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/UpdateDealPriceCommandHandler.cs (1479, 2022-11-20)
src/KafkaCDC.Deals/Comands/UpdateDealPriceCommand.cs (253, 2022-11-20)
src/KafkaCDC.Deals/Controllers/ (0, 2022-11-20)
src/KafkaCDC.Deals/Controllers/DealController.cs (704, 2022-11-20)
src/KafkaCDC.Deals/Data/ (0, 2022-11-20)
src/KafkaCDC.Deals/Data/DealDbContext.cs (942, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/ (0, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220523130527_Initial.Designer.cs (3265, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220523130527_Initial.cs (2688, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220524131232_UpdateEnumToString.Designer.cs (3285, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220524131232_UpdateEnumToString.cs (1577, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/DealDbContextModelSnapshot.cs (3202, 2022-11-20)
src/KafkaCDC.Deals/Dockerfile (803, 2022-11-20)
src/KafkaCDC.Deals/Domain/ (0, 2022-11-20)
... ...

# KafkaCDC # start with: docker-compose up ## Seq http://localhost:5555/#/events ![image](https://user-images.githubusercontent.com/46414904/202564011-5e87c5f5-464f-4ada-a439-179113330269.png) ## Confluent Kafka ![image](https://user-images.githubusercontent.com/46414904/202567766-433238fc-1444-45c0-a360-0c92297ef9a4.png) # run 3 services in VS: ## KafkaCDC.Deals ![image](https://user-images.githubusercontent.com/46414904/202930943-1196bf41-ea77-4047-8f2b-7240c1dd8b88.png) ## KafkaCDC.Traders ![image](https://user-images.githubusercontent.com/46414904/202931044-dbccb6ed-1e95-4c66-a4b7-75c82a824655.png) ## KafkaCDC.Notifications ![image](https://user-images.githubusercontent.com/46414904/202932789-52f385f0-be67-4c13-8630-4121d124bc5c.png) ## postman requests - Create new trader: ```json { "email": "tyschenk90@gmail.com", "firstName": "ira", "lastName": "tysh", "address": "home", "phoneNumber": "09721121", "birthDate": "2022-05-24T13:22:25.667Z", "gender": "female" } ``` Record will be added to traders table as well as to outbox table. ![image](https://user-images.githubusercontent.com/46414904/202932939-c4586215-3fdb-4f82-af58-03b409aa6c6a.png) ![image](https://user-images.githubusercontent.com/46414904/202932953-c963df4c-5adf-42e8-bbea-29fd43bee76a.png) -Create new deal ```json { "id":"6fa85f64-5717-4562-b3fc-2c963f66afa6", "shortName": "swr576skj6", "dealType": "Equity", "dealStatus": "Open", "amount": 12, "initialPriceRangeLow": 10, "initialPriceRangeHigh": 13, "revisedPriceRangeLow": 14, "revisedPriceRangeHigh": 15 } ``` When new deal is added in the same time record wis added to outbox table: ![image](https://user-images.githubusercontent.com/46414904/202933024-6bfb8bd8-b665-46a1-9029-d13d5512bed2.png) - Create subscrition of trader to deal: ```json { "traderId": "f14490e4-0410-4d44-a2c5-9274167ceb2a", "dealId": "7d5f0271-37c3-4586-9481-6e7f302ee405" } ``` New subscription will be added to the database and dealSubscriptions event will be written to outbox table. ![image](https://user-images.githubusercontent.com/46414904/202933149-3dc84fa5-8996-4c33-a538-3bca7bd08188.png) ## Debezium connector configuration: configuration for deals service will be configured by calling ### PUT http://localhost:8083/connectors/outbox-connector/config ```json { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "root", "database.dbname": "kafkacdc.deals", "database.server.name": "postgres", "schema.include.list": "public", "table.include.list": "public.OutboxEvents", "tombstones.on.delete": "false", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "slot.name": "debezium10", "transforms.outbox.table.field.event.id": "Id", "transforms.outbox.table.field.event.key": "AggregateId", "transforms.outbox.table.field.event.payload": "Payload", "transforms.outbox.route.by.field": "AggregateType", "transforms.outbox.route.topic.replacement": "${routedByValue}.events", "transforms.outbox.table.fields.additional.placement": "Type:header:eventType", "transforms.outbox.debezium.expand.json.payload": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "plugin.name": "pgoutput" } ``` configuration for traders service will be configured by calling ### PUT http://localhost:8083/connectors/outbox-connector/config ```json { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "root", "database.dbname": "kafkacdc.traders", "database.server.name": "postgres", "schema.include.list": "public", "table.include.list": "public.OutboxEvents", "tombstones.on.delete": "false", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.id": "Id", "transforms.outbox.table.field.event.key": "AggregateId", "transforms.outbox.table.field.event.payload": "Payload", "transforms.outbox.route.by.field": "AggregateType", "transforms.outbox.route.topic.replacement": "${routedByValue}.events", "slot.name": "debezium11", "transforms.outbox.table.fields.additional.placement": "Type:header:eventType", "transforms.outbox.debezium.expand.json.payload": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "plugin.name": "pgoutput" } ``` ## status of connectors: http://localhost:8083/connectors?expand=info&expand=status Debezium connector now will stream events to subscribers from outbox tables. First it will create topics in kafka and stream payload to appropriate topic based on configuration defined above. ![image](https://user-images.githubusercontent.com/46414904/202933343-dde2dbc6-58e8-4e56-888b-f6a18adc3943.png) Notification service subscribed to traders.events topic and will send welcomw email to a new trader in system. ![image](https://user-images.githubusercontent.com/46414904/202983856-8e8816d9-8061-467d-817a-a8a182af078b.png) In case of deal price change: ```json { "id": "48a716d1-20bc-418b-8507-feaf95282a46", "priceLow": 12, "priceHigh": 23 } ``` The information about chnage will be recorded to outbox table and all subscribed traders will receaive an email with price update: ![image](https://user-images.githubusercontent.com/46414904/202984272-1b9c03fa-631e-4089-bdc3-057cf4931941.png)

近期下载者

相关文件


收藏者