RabbitMQ-Iotdata

所属分类:中间件编程
开发工具:Java
文件大小:0KB
下载次数:0
上传日期:2023-11-12 06:34:27
上 传 者sh-1993
说明:  网络编程最终项目
(Final project of network programming)

文件列表:
AMQP-broker/ (0, 2023-05-23)
AMQP-broker/Dockerfile (161, 2023-05-23)
AMQP-gateway/ (0, 2023-05-23)
AMQP-gateway/pom.xml (2132, 2023-05-23)
AMQP-gateway/src/ (0, 2023-05-23)
AMQP-gateway/src/main/ (0, 2023-05-23)
AMQP-gateway/src/main/java/ (0, 2023-05-23)
AMQP-gateway/src/main/java/com/ (0, 2023-05-23)
AMQP-gateway/src/main/java/com/network/ (0, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/ (0, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/ (0, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/AmqpGatewayApplication.java (455, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/Censor.java (1069, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/CensorController.java (2265, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/CensorRepository.java (431, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/ConfigGateway.java (8863, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/IOTMessage.java (1795, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/IoTDataController.java (1035, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/IotData.java (1953, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/IotDataRepository.java (371, 2023-05-23)
AMQP-gateway/src/main/java/com/network/programming/amqpgateway/SendMessage.java (1480, 2023-05-23)
AMQP-gateway/src/main/resources/ (0, 2023-05-23)
AMQP-gateway/src/main/resources/application.properties (230, 2023-05-23)
AMQP-gateway/src/main/resources/sampleDB.js (255, 2023-05-23)
AMQP-node/ (0, 2023-05-23)
AMQP-node/pom.xml (2211, 2023-05-23)
AMQP-node/src/ (0, 2023-05-23)
AMQP-node/src/main/ (0, 2023-05-23)
AMQP-node/src/main/java/ (0, 2023-05-23)
AMQP-node/src/main/java/com/ (0, 2023-05-23)
AMQP-node/src/main/java/com/network/ (0, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/ (0, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/ (0, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/AmqpNodeApplication.java (689, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/ConfigNode.java (2258, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/CsvProcessor.java (1599, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/IOTData.java (1829, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/IOTMessage.java (1977, 2023-05-23)
AMQP-node/src/main/java/com/network/programming/amqpnode/NodeCensor.java (2701, 2023-05-23)
... ...

# **network-programming-group6** ## ***`Ch : ng dng truyn d liu các thit b IOT bng giao thc AMQP`*** ## **Mc lc:** - [1. Tng quan v giao thc AMQP](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#tng-quan-v-giao-thc-amqp) - [2. Kin trúc project](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#kin-trúc-project) - [3. c t chng trình](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#c-t-chng-trình) - [3.1. Chuong trình hin th thng tin các cm bin thc hin kt ni vi gateway](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#chuong-trình-hin-th-thng-tin-các-cm-bin-thc-hin-kt-ni-vi-gateway) - [3.2. Chuong trình sinh d liu cho các cm bin: t ng sinh d liu cm bin và gi d liu lên gateway. Chuong trình hin th d liu nhn uc phía gateway (s dng các biu line chart, bar chart, pie chart hin th)](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#chuong-trình-sinh-d-liu-cho-các-cm-bin-t-ng-sinh-d-liu-cm-bin-và-gi-d-liu-lên-gateway-chuong-trình-hin-th-d-liu-nhn-uc-phía-gateway-s-dng-các-biu--line-chart-bar-chart-pie-chart--hin-th) - [3.3. Chuong trình hin th thng tin iu khin ti các cm bin (uc gi t gateway)](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#chuong-trình-hin-th-thng-tin-iu-khin-ti-các-cm-bin-uc-gi-t-gateway) - [3.4. Chuong trình hin th ánh giá hiu nng ca giao thc: Throughput, delay, v.v. khi s lung node cm bin tng lên.](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#chuong-trình-hin-th-ánh-giá-hiu-nng-ca-giao-thc-throughput-delay-vv-khi-s-lung-node-cm-bin-tng-lên) - [3.5. Chuong trình hin th ánh giá s lung ti a các node cm bin có th kt ni n gateway](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#chuong-trình-hin-th-ánh-giá-s-lung-ti-a-các-node-cm-bin-có-th-kt-ni-n-gateway) - [4. Tài liu tham kho](https://github.com/batamsieuhang/RabbitMQ-Iotdata/blob/master/#tài-liu-tham-kho) ## **Danh sách thành viên** | Thành viên | Cng vic | ánh giá | | :------------ |:---------------:| -----:| | Anh Tú | Làm phn backend(1,2,3,4,5), làm báo cáo | 22% | | H Thanh Huyn | Làm backend(2), làm báo cáo | 19,5% | | V Vit Thành | Làm backend(1), Làm frontend(1,2,3) | 21% | | Phan Duy Thng | Làm backend(1,5) | 19,5% | | Nguyn Tun Hip | Vit báo cáo, làm frontend(2) | 18% | ## **Tng quan v giao thc AMQP** ### Gii thiu v Rabbit AMQP RabbitMQ là mt h thng phn mm m ngun m cung cp mt giao thc trung gian tin nhn phn tán gi là Advanced Message Queuing Protocol (AMQP). AMQP là mt giao thc chun giao tip gia các h thng phn tán và cung cp tính nng nh m bo tin nhn n, xác nhn gi và giao nhn tin nhn theo th t. RabbitMQ, nh mt trình trin khai AMQP ph bin, cung cp kh nng x l hàng triu tin nhn mt cách tin cy và hiu qu. Nó c thit k h tr gi và nhn các tin nhn gia các ng dng, các dch v và các thành phn trong mt h thng phn tán. RabbitMQ s dng m hình hàng i (queue) lu tr tin nhn trong quá trình gi và nhn. Các ng dng có th gi tin nhn n mt hàng i và các ng dng khác có th ly tin nhn t hàng i ó x l. iu này cho phép các ng dng hot ng c lp và tách bit vi nhau, giúp tng tính m rng và linh hot trong h thng phn tán. RabbitMQ là mt giao thc có th lp trình. Nó h tr nhiu ngn ng lp trình và cung cp các th vin và SDK cho Java, Python, .NET, Ruby và nhiu ngn ng khác. iu này cho phép các nhà phát trin s dng RabbitMQ trong các ng dng ca h mt cách d dàng và linh hot. Tuy nhiên, nó cng òi hi các lp trình viên phi xác nh c xung t nh ngha tim tàng. RabbitMQ và giao thc AMQP em li nhiu li ích quan trng cho vic xy dng và trin khai các h thng phn tán: - m bo tin cy: RabbitMQ s dng giao thc AMQP m bo rng tin nhn c gi và nhn mt cách áng tin cy. Nó h tr xác nhn gi và giao nhn tin nhn, m bo rng khng có tin nhn nào b mt trong quá trình truyn và m bo các tin nhn n úng th t. - Tính m rng và linh hot: RabbitMQ cho phép xy dng các h thng phn tán có tính m rng cao và linh hot. Bng cách s dng m hình hàng i, các ng dng có th gi và nhn tin nhn mt cách khng ng b và c lp. iu này cho phép các thành phn trong h thng hot ng c lp và có th c m rng d dàng khi cn thit. - Kin trúc phn tán: RabbitMQ h tr kin trúc phn tán, cho phép trin khai các m hình phn tán phc tp. Vi RabbitMQ, bn có th xy dng các mng li hàng i và các kch bn giao tip phc tp gia các ng dng và dch v khác nhau. iu tit ti: RabbitMQ có th c s dng iu tit ti trong các h thng có ti cng vic cao. Bng cách s dng hàng i, nó cho phép phn phi cng vic mt cách cng bng và kim soát tc x l ca các ng dng tiêu th tin nhn. - Tích hp d dàng: RabbitMQ cung cp các th vin và SDK cho nhiu ngn ng lp trình ph bin, cho phép tích hp d dàng vào các ng dng hin có. Bn có th s dng RabbitMQ trong các ng dng Java, Python, .NET, Ruby và nhiu ngn ng khác. H tr nhiu tính nng: RabbitMQ cung cp nhiu tính nng hu ích nh x l hàng triu tin nhn, nh tuyn linh hot, x l li và s kin, iu khin quyn truy cp,... ## **Kin trúc project** ### M hình kin trúc dch v
### Cng ngh s dng trong project - **`RabbitMQ`**: Dng broker - **`Spring AMQP`**: Framework phc v code node và gateway bng giao thc AMQP - **`Spring boot`**: S dng làm backend cho project - **`Reactjs`**: S dng làm frontend cho project - **`Mongodb`**: S dng làm database - **`Prometheus`**: Thu thp metric t rabbit và các node container giám sát - **`Grafana`**: M hình hóa các thng s t rabbit và node phc v cho vic giám sát(c th là mc 4 và 5) - **`Docker`**: Chy project di dng container File docker compose thc thi chng trình ```yml version: '3' # https://docs.docker.com/compose/compose-file/#networks networks: rabbitmq-prometheus: # https://docs.docker.com/compose/compose-file/#volumes volumes: rabbitmq-prometheus_prometheus: rabbitmq-prometheus_grafana: services: mongo: image: mongo:6.0 restart: always ports: - 27017:27017 environment: MONGO_INITDB_ROOT_USERNAME: admin MONGO_INITDB_ROOT_PASSWORD: pass MONGO_INITDB_DATABASE: iot_data volumes: - ./mongo-entrypoint/:/docker-entrypoint-initdb.d/ command: mongod rabbitmq: build: ./AMQP-broker ports: - 15671:15671 - 15672:15672 - 15692:15692 - 25672:25672 - 5672:5672 networks: - "rabbitmq-prometheus" reactjs: build: ./frontend ports: - 3006:3000 grafana: # https://hub.docker.com/r/grafana/grafana/tags image: grafana/grafana:8.3.4 ports: - "3000:3000" networks: - "rabbitmq-prometheus" volumes: - rabbitmq-prometheus_grafana:/var/lib/grafana - ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/rabbitmq.yaml - ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/prometheus.yaml - ./grafana/dashboards:/dashboards environment: # https://grafana.com/plugins/flant-statusmap-panel # https://grafana.com/plugins/grafana-piechart-panel # https://grafana.com/plugins/grafana-polystat-panel # https://grafana.com/plugins/jdbranham-diagram-panel # https://grafana.com/plugins/michaeldmoore-multistat-panel # https://grafana.com/plugins/vonage-status-panel # https://grafana.com/plugins/yesoreyeram-boomtable-panel GF_INSTALL_PLUGINS: "flant-statusmap-panel,grafana-piechart-panel" prometheus: # https://hub.docker.com/r/prom/prometheus/tags image: prom/prometheus:v2.28.1 networks: - "rabbitmq-prometheus" ports: - "9090:9090" volumes: - rabbitmq-prometheus_prometheus:/prometheus - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro node-exporter: command: - '--path.procfs=/host/proc' - '--path.rootfs=/rootfs' - '--path.sysfs=/host/sys' - '--collector.filesystem.ignored-mount-points=^/(sys|proc|dev|host|etc)($$|/)' expose: - 9100 # https://hub.docker.com/r/prom/node-exporter/tags image: prom/node-exporter:v1.2.0 networks: - "rabbitmq-prometheus" volumes: - /proc:/host/proc:ro - /sys:/host/sys:ro - /:/rootfs:ro ``` ## **c t chng trình** **Các bc chy chng trình** Bc 1 ```bash ~/project/network-programming-group6$ docker compose up ``` Bc 2 Do chúng em cha kp build thành file jar nên chy project ti 2 tab AMQP-node và AMQP-gateway t intellij ### **`Chuong trình hin th thng tin các cm bin thc hin kt ni vi gateway`** **1. Backend:** - Tt c các censor c lu vào trong database có dng mu nh sau: ```json iot_plug_censor = { "id": "10", "name": 10, "status": "on", "have_turn_on": "no" } ``` - Khi ó hin th thng tin các cm bin thc hin kt ni vi gateway bên gateway ch cn có chng trình xut thng tin censor t database lên. ```java @GetMapping("/censor") public ResponseEntity findAllCensors() { return new ResponseEntity(censorRepository.findAll(), HttpStatus.OK); } ``` - Kt qu:
### `Chuong trình sinh d liu cho các cm bin: t ng sinh d liu cm bin và gi d liu lên gateway. Chuong trình hin th d liu nhn uc phía gateway (s dng các biu line chart, bar chart, pie chart hin th)` 1. M t database - B d liu trong DEBS 2014 Grand Challenge bao gm thng tin v các s kin và hot ng din ra trong các ngi nhà thng minh. C th, d liu bao gm các b d liu c ghi li t các cm bin trong nhà (các phích cm thng minh), thu thp thng tin v nhit , ánh sáng, m, in nng tiêu th và các hot ng ca c dn. Tuy nhiên, tp d liu c thu thp t mi trng thc t, do ó, kh nng d liu khng úng nh dng cng nh thiu các phép o. - Mc ích ca vic cung cp d liu này là thúc y s phát trin và th nghim các gii pháp x l d liu thi gian thc bng cách gi nh cu trúc phn cp vi mt ngi nhà thng minh. Lc ca lung c s c m t nh sau: - id: m nh danh duy nht ca phép o [32 bit unsigned int]. - timestamp: du thi gian ca phép o (s giy k t ngày 1 tháng 1 nm 1970, 00:00:00 GMT) [32 bit unsigned int]. - value: phép o [32 bit floating point]. - property: loi phép o: 0 cho cng vic hoc 1 cho ti [boolean]. - plug_id: m nh danh duy nht (trong mt h gia ình) ca phích cm thng minh [32 bit unsigned int]. - household_id: m nh danh duy nht ca h gia ình ni t phích cm [32 bit unsigned int]. - house_id: m nh danh duy nht ca ngi nhà ni có h gia ình có phích cm [32 bit unsigned int]. - D liu mu khi c lu vào database ```json iot_data = { "id": 23711, "unixTime": 937597, "value": 80.1, "workLoad": 0, "plugId": 1, "houseHoldId": 1, "houseId": 0 } ``` - Do vn v thi gian code project nên chúng em ch gii hn data y là 1 nhà trong 1 tòa nhà và các censor là các plug_id. Khi ó d liu chính s là lng in tng mà 1 censor ti 1 cm o c vi n v là wat. Vì vy `plugId` s có giá tr mc nh là 1(lng in tiêu th t lúc cm in ti lúc gi), `houseHoldId` có giá tr mc nh là 1(ngi nhà s 1), `houseId` có giá tr mc nh là 0(tòa nhà s 0). 2. Chng trình sinh d liu t phía Node ```java public void processData(RabbitTemplate iotData, int plugId) throws IOException, InterruptedException { String csvFilePath = "/home/mr8/project/network-programming-group6/AMQP-node/src/main/resources/debs40houses16h/house-0.csv"; CSVReader reader = new CSVReader(new FileReader(csvFilePath)); String[] nextLine; while ((nextLine = reader.readNext()) != null) { if ((Integer.parseInt(nextLine[5]) == 1) && (Integer.parseInt(nextLine[4]) == plugId) && (Integer.parseInt(nextLine[3]) == 1)&&(Double.parseDouble(nextLine[2])!=0)) { int id = Integer.parseInt(nextLine[0]); long unixTime = Long.parseLong(nextLine[1]); double watValue = Double.parseDouble(nextLine[2]); int workLoad = Integer.parseInt(nextLine[3]); int houseHoldId = Integer.parseInt(nextLine[5]); int houseId = Integer.parseInt(nextLine[6]); String msg = String.format("{\"id\": %d, \"unixTime\": %d, \"watValue\": %.3f, \"workLoad\": %d, \"plugId\": %d, \"houseHoldId\": %d, \"houseId\": %d}", id, unixTime, watValue, workLoad, plugId, houseHoldId, houseId); String routingKey = getRoutingKeyByPlugId(plugId); iotData.convertAndSend("IOT_data", routingKey, msg); System.out.println(msg); Thread.sleep(250); } } } } ``` - u tiên chúng em ch nh ng dn là 1 file d liu t tòa nhà s 0. - D liu s c lc theo plugId c ch nh t bên gateway. - File s c gi di nh dng json string tin cho bên gateway thu thp d liu. - Delay c ch i khi c d liu t 1 dòng csv là 0.25s khác vi thc t là 2-3s s gi 1 ln d liu thì chúng em 0.25s là cho ta có th theo di d liu nhanh hn. - Khi gi d liu thì msg s c gi vi exchange là `IOT_data` và routingkey phù hp vi plugId. Ví d plugId = 2 thì routingkey s là `IOTQueue2`. L do là bi vic ch nh riêng routingkey cho tng msg ca plugId khi gi lên trên broker thì cng s ch nh riêng queue cho tng plug id, vi routingkey `IOTQueue2` thì queue trên broker s là `IOTQueue2` nh vy s gim ti cho queue, thay vì gi ht data vào 1 queue thì data ca riêng tng loi s c gi vào queue riêng bit. Vic này giúp chng trình gi d liu sang bên gateway ít khi xut hin tr, thun tin cho vic data realtime. - Hàm chn routingkey theo plugid t gateway: ```java private String getRoutingKeyByPlugId(int plugId) { switch (plugId) { case 1: return "IOTQueue"; case 2: return "IOTQueue2"; case 3: return "IOTQueue3"; case 4: return "IOTQueue4"; case 5: return "IOTQueue5"; case 6: return "IOTQueue6"; case 7: return "IOTQueue7"; case 8: return "IOTQueue8"; case 9: return "IOTQueue9"; case 10: return "IOTQueue10"; // Add more cases for other plugIds and routing keys default: throw new IllegalArgumentException("Invalid plugId"); } } ``` 3. Chng trình nhn d liu t gateway - Hàm lu data t message c ly t queue t broker ```java @Service public class IOTMessage implements MessageListener { @Autowired IotDataRepository myMongoRepository; @Override public void onMessage(Message message) { String msg = new String(message.getBody()); System.out.println(msg); IotData dataObject = parseSensorData(msg); // Save data object to MongoDB using repository myMongoRepository.save(dataObject); } // function convert message to iot data private IotData parseSensorData(String messageBody) { // Parse message data into a Java object using Gson Gson gson = new Gson(); JsonObject dataObject = gson.fromJson(messageBody, JsonObject.class); IotData sampleData = new IotData(dataObject.get("unixTime").getAsLong(),dataObject.get("watValue").getAsDouble(),dataObject.get("workLoad").getAsInt(),dataObject.get("plugId").getAsInt(),dataObject.get("houseHoldId").getAsInt(),dataObject.get("houseId").getAsInt()); return sampleData; } } ``` - u tiên message khi nhn t broker s c convert t string sang nh dng json bng hàm ```java private IotData parseSensorData(String messageBody) { // Parse message data into a Java object using Gson Gson gson = new Gson(); JsonObject dataObject = gson.fromJson(messageBody, JsonObject.class); IotData sampleData = new IotData(dataObject.get("unixTime").getAsLong(),dataObject.get("watValue").getAsDouble(),dataObject.get("workLoad").getAsInt(),dataObject.get("plugId").getAsInt(),dataObject.get("houseHoldId").getAsInt(),dataObject.get("houseId").getAsInt()); return sampleData; } ``` - Sau khi x l message thì data s c lu vào trong db ca mongodb `myMongoRepository.save(dataObject);` - Hàm gi api d liu ca tng plug id ```java @RestController @RequestMapping("/api/v1") public class IoTDataController { @Autowired IotDataRepository iotDataRepository; @GetMapping("/iotdata") public ResponseEntity findAllData() { return new ResponseEntity(iotDataRepository.findAll(), HttpStatusCode.valueOf(200)); } @GetMapping("/iotdata/{plugId}") public ResponseEntity findDataByPlugId(@PathVariable int plugId) { List data = iotDataRepository.findByPlugId(plugId); return new ResponseEntity(data, HttpStatusCode.valueOf(200)); } } ``` - Hàm u tiên là hàm gi api ca tt c các plug, hàm th hai là hàm gi api theo tng plugId ch nh 3. Kt qu - Kt qu api tr v ca `plugId=3`
- Kt qu linechart `plugId=3`
### `Chuong trình hin th thng tin iu khin ti các cm bin (uc gi t gateway)` 1. tng trin khai Vic iu khin thng tin các cm bin s c thc hin frontend, vì vy cn xy dng method post gi thng tin cn bt censor cho gateway. Sau khi nhn c thng tin cn bt censor t ngi dùng, gateway s gi 1 message theo mt lung riêng(`routingkey` là `IOTController`, `exchange` là `IOTController` ) ti node. Nh vy vi mi thng tin c gi t `IOTController` routingkey thì node s coi nh là thng tin iu khin, vic này giúp phn bit cho các bên phn bit c gia thng tin iu khin và thng tin là d liu. Node có vai trò nhn message và bt censor theo yêu cu ca ngi dùng. 2. Post method bt censor ```java @PostMapping("/check") public void checkAndSend(@RequestBody Censor requestCensor) { // Retrieve entries from the database where status is "on" and have_turn_on is "no" Iterable statusCensors = censorRepository.findByNameAndStatus(requestCensor.getName(), "on", "no"); for (Censor censor : statusCensors) { int plugId = censor.getName(); // Assuming name field in Censor corresponds to plugId String msg = "Enable " + plugId; // Send message to IOTController iotData.convertAndSend("IOTController", "IOTController", msg); // Update have_turn_on field to "yes" censor.setHave_turn_on("yes"); censorRepository.save(censor); } } ``` - Khi nhn post method gi ti server gateway, gateway s kim tra trong db có censor ó khng. Nu censor ó ang bt thì s khng bt li và ngc li nu censor ó cha c bt thì s gi message `"Enable " + plugId` ti node. Nh vy theo nh phn 2 thì node s gi data theo úng routingkey và queue theo úng plugid ch nh. - Cui cùng khi censor c bt thì `have_turn_on` ca censor c chuyn thành `yes` ln sau chng trình s khng phi bt li censor. 3. Kt qu - Thêm censor s 4
- Trng thái ca censor 4 khi cha bt là status `Off`
- Tích vào du `+` action bt Censor, khi ó status `On` tc là censor c bt
- Kim tra chart ca plug 4
### `Chuong trình hin th ánh giá hiu nng ca giao thc: Throughput, delay, v.v. khi s lung node cm bin tng lên.` 1. M t chng trình - Cng ngh: - `Prometheus`: Thu thp các metric t node exporter - `Grafana`: M hình hóa các metric c gi t prometheus - ánh giá hiu nng ca ... ...

近期下载者

相关文件


收藏者