KafkaCDC
Category: Middleware programming
Development tool: C#
File size: 0KB
Downloads: 0
Upload date: 2022-11-21 06:56:08
Uploader: sh-1993
Description: Kafka CDC (change data capture)
File list:
.dockerignore (316, 2022-11-20)
KafkaCDC.sln (3079, 2022-11-20)
docker-compose.yml (2594, 2022-11-20)
postman/ (0, 2022-11-20)
postman/Kafka CDC.postman_collection.json (8624, 2022-11-20)
src/ (0, 2022-11-20)
src/KafkaCDC.Common/ (0, 2022-11-20)
src/KafkaCDC.Common/Entities/ (0, 2022-11-20)
src/KafkaCDC.Common/Entities/BaseEntity.cs (128, 2022-11-20)
src/KafkaCDC.Common/Entities/Outbox.cs (349, 2022-11-20)
src/KafkaCDC.Common/Kafka/ (0, 2022-11-20)
src/KafkaCDC.Common/Kafka/BackgroungConsumer.cs (1975, 2022-11-20)
src/KafkaCDC.Common/Kafka/IKafkaHandler.cs (273, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaConsumerConfig.cs (346, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaDeserializer.cs (648, 2022-11-20)
src/KafkaCDC.Common/Kafka/KafkaRegistration.cs (671, 2022-11-20)
src/KafkaCDC.Common/KafkaCDC.Common.csproj (1881, 2022-11-20)
src/KafkaCDC.Common/MigrationManager.cs (561, 2022-11-20)
src/KafkaCDC.Deals/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/AddDealCommand.cs (576, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/ (0, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/AddDealCommandHandler.cs (2494, 2022-11-20)
src/KafkaCDC.Deals/Comands/Handlers/UpdateDealPriceCommandHandler.cs (1479, 2022-11-20)
src/KafkaCDC.Deals/Comands/UpdateDealPriceCommand.cs (253, 2022-11-20)
src/KafkaCDC.Deals/Controllers/ (0, 2022-11-20)
src/KafkaCDC.Deals/Controllers/DealController.cs (704, 2022-11-20)
src/KafkaCDC.Deals/Data/ (0, 2022-11-20)
src/KafkaCDC.Deals/Data/DealDbContext.cs (942, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/ (0, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220523130527_Initial.Designer.cs (3265, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220523130527_Initial.cs (2688, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220524131232_UpdateEnumToString.Designer.cs (3285, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/20220524131232_UpdateEnumToString.cs (1577, 2022-11-20)
src/KafkaCDC.Deals/Data/Migration/DealDbContextModelSnapshot.cs (3202, 2022-11-20)
src/KafkaCDC.Deals/Dockerfile (803, 2022-11-20)
src/KafkaCDC.Deals/Domain/ (0, 2022-11-20)
... ...
# KafkaCDC
# start with:
`docker-compose up`
## Seq
http://localhost:5555/#/events
![image](https://user-images.githubusercontent.com/46414904/202564011-5e87c5f5-464f-4ada-a439-179113330269.png)
## Confluent Kafka
![image](https://user-images.githubusercontent.com/46414904/202567766-433238fc-1444-45c0-a360-0c92297ef9a4.png)
# run 3 services in Visual Studio:
## KafkaCDC.Deals
![image](https://user-images.githubusercontent.com/46414904/202930943-1196bf41-ea77-4047-8f2b-7240c1dd8b88.png)
## KafkaCDC.Traders
![image](https://user-images.githubusercontent.com/46414904/202931044-dbccb6ed-1e95-4c66-a4b7-75c82a824655.png)
## KafkaCDC.Notifications
![image](https://user-images.githubusercontent.com/46414904/202932789-52f385f0-be67-4c13-8630-4121d124bc5c.png)
## Postman requests
- Create new trader:
```json
{
  "email": "tyschenk90@gmail.com",
  "firstName": "ira",
  "lastName": "tysh",
  "address": "home",
  "phoneNumber": "09721121",
  "birthDate": "2022-05-24T13:22:25.667Z",
  "gender": "female"
}
```
A record is added to the traders table and, at the same time, to the outbox table.
![image](https://user-images.githubusercontent.com/46414904/202932939-c4586215-3fdb-4f82-af58-03b409aa6c6a.png)
![image](https://user-images.githubusercontent.com/46414904/202932953-c963df4c-5adf-42e8-bbea-29fd43bee76a.png)
- Create new deal:
```json
{
  "id": "6fa85f64-5717-4562-b3fc-2c963f66afa6",
  "shortName": "swr576skj6",
  "dealType": "Equity",
  "dealStatus": "Open",
  "amount": 12,
  "initialPriceRangeLow": 10,
  "initialPriceRangeHigh": 13,
  "revisedPriceRangeLow": 14,
  "revisedPriceRangeHigh": 15
}
```
When a new deal is added, a record is added to the outbox table at the same time:
![image](https://user-images.githubusercontent.com/46414904/202933024-6bfb8bd8-b665-46a1-9029-d13d5512bed2.png)
- Create a subscription of a trader to a deal:
```json
{
  "traderId": "f14490e4-0410-4d44-a2c5-9274167ceb2a",
  "dealId": "7d5f0271-37c3-4586-9481-6e7f302ee405"
}
```
The new subscription is added to the database and a dealSubscriptions event is written to the outbox table.
![image](https://user-images.githubusercontent.com/46414904/202933149-3dc84fa5-8996-4c33-a538-3bca7bd08188.png)
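The three requests above share one pattern: the business row and its outbox row are written together. A minimal sketch of that idea follows, using Python's built-in `sqlite3` as a stand-in for PostgreSQL and the repository's C# data layer. The outbox column names are taken from the Debezium configuration in this README; the `Traders` table layout, the `add_trader` helper, and the `TraderCreated` event type are assumptions made for illustration, not the repository's code.

```python
import json
import sqlite3
import uuid


def create_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical, simplified schema; the real tables come from the project's migrations.
    conn.execute("CREATE TABLE Traders (Id TEXT PRIMARY KEY, Email TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE OutboxEvents ("
        "Id TEXT PRIMARY KEY, AggregateId TEXT, AggregateType TEXT, "
        "Type TEXT, Payload TEXT)"
    )


def add_trader(conn: sqlite3.Connection, email: str) -> str:
    trader_id = str(uuid.uuid4())
    payload = json.dumps({"id": trader_id, "email": email})
    # "with conn" commits both inserts together, or rolls both back on error.
    with conn:
        conn.execute("INSERT INTO Traders (Id, Email) VALUES (?, ?)", (trader_id, email))
        conn.execute(
            "INSERT INTO OutboxEvents (Id, AggregateId, AggregateType, Type, Payload) "
            "VALUES (?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), trader_id, "traders", "TraderCreated", payload),
        )
    return trader_id


conn = sqlite3.connect(":memory:")
create_schema(conn)
new_id = add_trader(conn, "trader@example.com")
```

Because both inserts share one transaction, the outbox never contains an event for a trader that was not saved, and no saved trader is missing its event.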
## Debezium connector configuration:
The connector for the deals service is configured by calling:
### PUT http://localhost:8083/connectors/outbox-connector/config
```json
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "root",
  "database.dbname": "kafkacdc.deals",
  "database.server.name": "postgres",
  "schema.include.list": "public",
  "table.include.list": "public.OutboxEvents",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "slot.name": "debezium10",
  "transforms.outbox.table.field.event.id": "Id",
  "transforms.outbox.table.field.event.key": "AggregateId",
  "transforms.outbox.table.field.event.payload": "Payload",
  "transforms.outbox.route.by.field": "AggregateType",
  "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
  "transforms.outbox.table.fields.additional.placement": "Type:header:eventType",
  "transforms.outbox.debezium.expand.json.payload": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "plugin.name": "pgoutput"
}
```
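The connector for the traders service is configured by calling the endpoint below. Kafka Connect treats the name in the URL as the connector's identity, so if both configurations are sent to `outbox-connector`, the second replaces the first; each service needs its own connector name for both to run.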
### PUT http://localhost:8083/connectors/outbox-connector/config
```json
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "postgres",
  "database.port": "5432",
  "database.user": "postgres",
  "database.password": "root",
  "database.dbname": "kafkacdc.traders",
  "database.server.name": "postgres",
  "schema.include.list": "public",
  "table.include.list": "public.OutboxEvents",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.table.field.event.id": "Id",
  "transforms.outbox.table.field.event.key": "AggregateId",
  "transforms.outbox.table.field.event.payload": "Payload",
  "transforms.outbox.route.by.field": "AggregateType",
  "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
  "slot.name": "debezium11",
  "transforms.outbox.table.fields.additional.placement": "Type:header:eventType",
  "transforms.outbox.debezium.expand.json.payload": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": false,
  "plugin.name": "pgoutput"
}
```
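The `PUT` calls above can also be scripted instead of being sent from Postman. The sketch below only builds the request with Python's standard library and does not send it; `build_connector_request` and the connector name `deals-outbox-connector` are hypothetical, and the config dictionary is abbreviated.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST endpoint used in this README


def build_connector_request(name: str, config: dict) -> urllib.request.Request:
    """Build PUT /connectors/{name}/config, which creates or updates a connector."""
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Abbreviated config; the full key set is shown in the JSON blocks above.
deals_config = {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "kafkacdc.deals",
    "slot.name": "debezium10",
}
request = build_connector_request("deals-outbox-connector", deals_config)

# To actually send it (requires the docker-compose stack to be running):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Passing the connector name as a parameter makes it easy to register the deals and traders connectors under different names.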
## Status of connectors:
http://localhost:8083/connectors?expand=info&expand=status
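The status endpoint returns one JSON object keyed by connector name. A small helper can reduce it to the states that matter. The sketch below assumes the response shape that Kafka Connect documents for the `expand=status` option; the `summarize_status` helper is hypothetical and the sample data is made up rather than captured from this project.

```python
def summarize_status(connectors: dict) -> dict:
    """Map each connector name to its state and the states of its tasks."""
    summary = {}
    for name, details in connectors.items():
        status = details.get("status", {})
        summary[name] = {
            "connector": status.get("connector", {}).get("state", "UNKNOWN"),
            "tasks": [task.get("state", "UNKNOWN") for task in status.get("tasks", [])],
        }
    return summary


# Hypothetical sample in the shape returned by GET /connectors?expand=status.
sample = {
    "outbox-connector": {
        "status": {
            "name": "outbox-connector",
            "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
            "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
            "type": "source",
        }
    }
}
result = summarize_status(sample)
```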
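The Debezium connectors now stream events from the outbox tables to Kafka, where subscribers consume them.
The topics are created in Kafka first, and each payload is then written to the appropriate topic based on the configuration above; for example, rows whose `AggregateType` is `traders` go to `traders.events`.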
![image](https://user-images.githubusercontent.com/46414904/202933343-dde2dbc6-58e8-4e56-888b-f6a18adc3943.png)
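To make the routing rule concrete, the sketch below models in plain Python what the `EventRouter` settings above ask for: the topic comes from `AggregateType`, the message key from `AggregateId`, and the `Type` column becomes an `eventType` header. It is an illustration of the configuration, not Debezium's implementation; `route_outbox_row` is a hypothetical helper and the sample row values are invented.

```python
import json

TOPIC_TEMPLATE = "${routedByValue}.events"  # transforms.outbox.route.topic.replacement


def route_outbox_row(row: dict) -> dict:
    """Model how one OutboxEvents row becomes a Kafka message."""
    return {
        # route.by.field = AggregateType fills the ${routedByValue} placeholder.
        "topic": TOPIC_TEMPLATE.replace("${routedByValue}", row["AggregateType"]),
        # table.field.event.key = AggregateId becomes the message key.
        "key": row["AggregateId"],
        # "Type:header:eventType" copies the Type column into a header.
        "headers": {"eventType": row["Type"]},
        # expand.json.payload = true: the payload is emitted as JSON, not as an escaped string.
        "value": json.loads(row["Payload"]),
    }


# Invented sample row; "TraderCreated" is a hypothetical event type name.
sample_row = {
    "Id": "0b0e7a52-0000-4000-8000-000000000001",
    "AggregateId": "f14490e4-0410-4d44-a2c5-9274167ceb2a",
    "AggregateType": "traders",
    "Type": "TraderCreated",
    "Payload": '{"email": "trader@example.com", "firstName": "ira"}',
}
message = route_outbox_row(sample_row)
```

Keying messages by `AggregateId` means all events for one trader or deal land in the same partition, which preserves their order for consumers.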
The notification service subscribes to the `traders.events` topic and sends a welcome email to each new trader in the system.
![image](https://user-images.githubusercontent.com/46414904/202983856-8e8816d9-8061-467d-817a-a8a182af078b.png)
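How a consumer might react to such a message can be sketched as a dispatch on the `eventType` header. The event type name `TraderCreated`, the payload fields, the email wording, and the `handle_message` helper are all assumptions for illustration; the repository's C# handlers are not shown in this README.

```python
import json
from typing import Optional


def welcome_email(payload: dict) -> dict:
    """Compose a welcome email from a (hypothetical) trader-created payload."""
    return {
        "to": payload["email"],
        "subject": "Welcome",
        "body": f"Hello {payload['firstName']}, your trader account has been created.",
    }


# Hypothetical event type name mapped to the function that composes the email.
HANDLERS = {"TraderCreated": welcome_email}


def handle_message(headers: dict, value: bytes) -> Optional[dict]:
    """Return the email to send, or None when the event type is not handled."""
    handler = HANDLERS.get(headers.get("eventType"))
    if handler is None:
        return None
    return handler(json.loads(value))


email = handle_message(
    {"eventType": "TraderCreated"},
    b'{"email": "trader@example.com", "firstName": "ira"}',
)
```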
When a deal price changes:
```json
{
  "id": "48a716d1-20bc-418b-8507-feaf95282a46",
  "priceLow": 12,
  "priceHigh": 23
}
```
The change is recorded in the outbox table, and all subscribed traders receive an email with the price update:
![image](https://user-images.githubusercontent.com/46414904/202984272-1b9c03fa-631e-4089-bdc3-057cf4931941.png)
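The fan-out to subscribers can be sketched as one email per subscription. The in-memory `subscriptions` and `trader_emails` dictionaries below stand in for whatever store the notification service really uses; the trader ids, addresses, and message wording are assumptions.

```python
def price_update_emails(event: dict, subscriptions: dict, trader_emails: dict) -> list:
    """Build one email per trader subscribed to the deal named in the event."""
    emails = []
    for trader_id in subscriptions.get(event["id"], []):
        emails.append({
            "to": trader_emails[trader_id],
            "subject": "Deal price update",
            "body": f"New price range: {event['priceLow']} to {event['priceHigh']}",
        })
    return emails


deal_id = "48a716d1-20bc-418b-8507-feaf95282a46"
event = {"id": deal_id, "priceLow": 12, "priceHigh": 23}
subscriptions = {deal_id: ["trader-1", "trader-2"]}  # deal id -> subscribed trader ids
trader_emails = {"trader-1": "a@example.com", "trader-2": "b@example.com"}
emails = price_update_emails(event, subscriptions, trader_emails)
```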