azure-iot-durable-patterns

Category: Middleware programming
Language: JavaScript
Uploaded: 2020-11-23 by sh-1993
Description: Using Azure Durable Functions to process IoT messages into IoT Hub from an external service.

File list:
.funcignore (66, 2020-11-23)
.vscode/ (0, 2020-11-23)
.vscode/extensions.json (73, 2020-11-23)
.vscode/launch.json (260, 2020-11-23)
.vscode/settings.json (236, 2020-11-23)
.vscode/tasks.json (540, 2020-11-23)
Downlink/ (0, 2020-11-23)
Downlink/function.json (286, 2020-11-23)
Downlink/index.js (1239, 2020-11-23)
EnrichGeofence/ (0, 2020-11-23)
EnrichGeofence/example.json (1624, 2020-11-23)
EnrichGeofence/function.json (285, 2020-11-23)
EnrichGeofence/index.js (674, 2020-11-23)
EnrichZoneStatus/ (0, 2020-11-23)
EnrichZoneStatus/function.json (317, 2020-11-23)
EnrichZoneStatus/index.js (536, 2020-11-23)
LICENSE (1071, 2020-11-23)
LifecycleReplication/ (0, 2020-11-23)
LifecycleReplication/function.json (293, 2020-11-23)
LifecycleReplication/index.js (1882, 2020-11-23)
LifecycleReplication/sample.dat (14, 2020-11-23)
OrchestratorStarter/ (0, 2020-11-23)
OrchestratorStarter/function.json (305, 2020-11-23)
OrchestratorStarter/index.js (2150, 2020-11-23)
ProcessMessage/ (0, 2020-11-23)
ProcessMessage/example.json (341, 2020-11-23)
ProcessMessage/function.json (123, 2020-11-23)
ProcessMessage/index.js (488, 2020-11-23)
TwinReplication/ (0, 2020-11-23)
TwinReplication/function.json (288, 2020-11-23)
TwinReplication/index.js (3773, 2020-11-23)
UplinkBridge/ (0, 2020-11-23)
UplinkBridge/engine.js (9965, 2020-11-23)
UplinkBridge/error.js (294, 2020-11-23)
UplinkBridge/function.json (114, 2020-11-23)
UplinkBridge/index.js (766, 2020-11-23)
assets/ (0, 2020-11-23)
assets/architecture.png (104517, 2020-11-23)
... ...

# Architecture/code patterns: IoT message processing from external systems using Azure Durable Functions

> DISCLAIMER: The code and all other documents in this repository are provided as is under the MIT License.

Many Azure IoT projects use Azure Functions in some shape or form as a serverless runtime to process messages. While this works fine in simple scenarios, more complex scenarios can become unreliable, difficult to monitor, or unwieldy. Azure Durable Functions make life easier by allowing you to write stateful workflows using orchestrator functions. Using these orchestrators you won't have to worry about managing (or passing through) state, timeouts, restarts, etc. This document describes a sample scenario and architecture for this pattern.

**Disclaimer: this code sample shows a pattern; it does not implement all (code) best practices regarding security, validation, testing, etc.**

**Note: this repo has some breaking changes, as it no longer relies on the (serverless) [IoT Central Bridge](https://github.com/Azure/iotc-device-bridge), but instead uses the (stateful) [IoT Hub Gateway](https://github.com/jessevl/IoTHubGateway) so that it can multiplex all device connections over a single connection to IoT Hub (making it more scalable).**

> Also be aware of the following (known) limitations of this sample:
> 1. Cosmos DB does not currently support partial updates, so the twin replication function might insert outdated twin data when multiple functions run in parallel.
> 2. The UplinkBridge function does not properly cache device credentials, nor does it cache the device twin (hence it retrieves the twin on each incoming message). A production environment should implement both caches to prevent throttling on the IoT Hub side.

## Scenario: Ingesting and enriching messages from a third-party solution into IoT Hub

![Architecture](/assets/architecture.png "Architecture")

In this scenario we are building a solution that ingests messages into IoT Hub from a third-party system, in this case a LoRa network server. To ingest the messages into IoT Hub we use the [IoT Hub Gateway](https://github.com/jessevl/IoTHubGateway); this gateway has the ability to send reported as well as desired properties to the relevant device twins in IoT Hub. The bridge ingests messages and posts them as the corresponding individual (impersonated) devices from the source system. Authorization/authentication is done at the Azure Functions level (we create short-lived SAS tokens for each device). The IoT Hub Gateway then uses these SAS tokens for each device individually over a *multiplexed* AMQP connection. The gateway does not need any access or authorization into IoT Hub, and as such it does not need to be secured. Most parts of this solution should also work with IoT Central rather than IoT Hub.

Before we send in telemetry, however, we want to enrich the messages. In this sample the messages might contain location data, but we are in fact interested in which of the specified 'zones' a device/tracker/vehicle is in, not in the raw location. Secondly, we want to know whether these zones are marked as safe, and add that as a flag to the incoming message. Thirdly, we want to trigger an alert to the LoRa device if it turns out to be in an unsafe zone; we call this (in LoRa terms) a downlink message, and it is triggered by changing the device twin. Lastly, we configure LoRa to send messages more frequently if one or more zones are unsafe (in distress) by setting a broadcast setting on the gateways (this setting is in fact represented as a property of a 'broadcast' device in IoT Hub).
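This enrichment sequence is exactly what the Durable Functions orchestrator coordinates. As a minimal sketch (illustrative only, using the `durable-functions` npm package; the repo's actual ProcessMessage orchestrator may differ in detail), chaining the activities described below looks like this:

```javascript
const df = require("durable-functions");

// Illustrative orchestrator: chains the enrichment activities and forwards
// the result to the uplink bridge. State is carried in `message`; Durable
// Functions checkpoints it after every `yield`, so restarts and retries are
// handled by the runtime rather than by hand-written code.
module.exports = df.orchestrator(function* (context) {
    let message = context.df.getInput();

    // Add the zone(s) the device is in, based on the GeoJSON definitions.
    message = yield context.df.callActivity("EnrichGeofence", message);

    // Flag whether any matched zone is in distress.
    message = yield context.df.callActivity("EnrichZoneStatus", message);

    // Forward the enriched message to the IoT Hub Gateway.
    yield context.df.callActivity("UplinkBridge", message);

    return message;
});
```

Note that orchestrator code must stay deterministic; all I/O (blob reads, table lookups, HTTP calls) belongs in the activity functions it calls.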
We also include a set of APIs that allow a user to read or change the geofences that are defined, as well as the status of the zones. Zone states are stored in a Storage Table, and the GeoJSON zone definitions in a Blob store. Lastly, we copy the telemetry (both the latest values and the history) and the device twins to a Cosmos DB to prevent end users from running into throttling limits in IoT Hub. Someone could build a complete safety dashboard using these APIs and the information stored in Cosmos DB.

The solution contains the following functions:

* **OrchestratorStarter**: acts as the HTTP endpoint that receives the incoming message and starts the orchestrator function. It may be replaced by a function that retrieves, for example, an Event Hub message. An example message can be found in example.json in the function folder.
* **ProcessMessage**: describes the sequence of functions to be called and the state to be passed through. It also includes some built-in functions to track status, etc.
* **EnrichGeofence**: binds a file from Azure Blob storage as input, where the definitions of the zones (geofences) can be found in GeoJSON format (example in example.json). It performs a check for each defined geofence to see whether the device is in that zone and adds the result as a measurement to the message (see the point-in-polygon sketch after this list).
* **EnrichZoneStatus**: checks the matched zone names against an Azure Table (partition) to see what the status of the matched zones is; the result is added as a 'distress' property on the message. The table rows should have a PartitionKey (the same for all entries), a RowKey (the unique identifier of the zone), and a 'Distress' property with an integer value.
* **UplinkBridge**: finally, the message is forwarded to an IoT Hub Gateway with device-specific credentials (the function retrieves or creates those). *Be aware that in production scenarios it is strongly advised to cache the connection strings.*
* **Downlink**: once a device twin is changed (most likely because the 'distress' state changed), this triggers an event with the updated twin to be put onto the Event Hub, which this function picks up. The function forwards it to the downlink service of the specific LoRa provider on an HTTPS endpoint defined in the app settings.
* **TwinReplication**: since IoT Hub imposes throttling limits on reading device twins, we replicate the twins to a Cosmos DB container. Once a device twin is updated in IoT Hub, the message-routing feature for twin change events in IoT Hub sends an event to an Event Hub, which then triggers this function to update the twin in a Cosmos DB container. This function is fed by the same Event Hub with twin change events as the Downlink function above.
* **LifecycleReplication**: similar to the TwinReplication function, the message-routing feature in IoT Hub is used for lifecycle events, sending an event to the Event Hub when a device is created or deleted, which triggers this function to replicate that operation on the device twin in the Cosmos DB container.
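The geofence check in EnrichGeofence boils down to a point-in-polygon test against each zone in the GeoJSON file. A minimal ray-casting sketch of that check (illustrative; the repo's implementation may use a geospatial library, and the `name` property on each feature is an assumption about the zone definition format):

```javascript
// Ray-casting point-in-polygon test. `point` is [longitude, latitude];
// `ring` is the outer ring of a GeoJSON Polygon: an array of [lon, lat] pairs.
function pointInRing(point, ring) {
    const [x, y] = point;
    let inside = false;
    for (let i = 0, j = ring.length - 1; i < ring.length; j = i++) {
        const [xi, yi] = ring[i];
        const [xj, yj] = ring[j];
        // Toggle `inside` each time a ray cast to the right crosses an edge.
        if ((yi > y) !== (yj > y) &&
            x < ((xj - xi) * (y - yi)) / (yj - yi) + xi) {
            inside = !inside;
        }
    }
    return inside;
}

// Return the names of all zones in a GeoJSON FeatureCollection that contain
// the device location. Assumes each zone is a Polygon feature with a `name`
// property (hypothetical; match this to your own zone definitions).
function matchZones(location, zoneGeoJson) {
    return zoneGeoJson.features
        .filter(f => f.geometry.type === "Polygon" &&
                     pointInRing(location, f.geometry.coordinates[0]))
        .map(f => f.properties.name);
}
```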
The APIs are provided by the following functions:

* **getZones**: provides an API endpoint for GETting the entire GeoJSON with zone definitions.
* **getZoneStatus**: GETs the latest zone status for all zones (from Table storage).
* **putZones**: allows a user to replace the GeoJSON with zone definitions. Please make sure to follow the right format, as represented in example.json. When a new GeoJSON is uploaded, this function also deletes any zones from the status table that no longer exist and creates the ones that are new.
* **putZoneStatus**: allows a user to PUT the distress status of a specific zone. It also turns the 'broadcast' capability of the gateways on or off by changing the properties of a 'broadcast' device, depending on whether any zones are in distress (which in turn triggers the downlink).

You will need to set the following app settings (or local.settings.json when testing locally):

```javascript
"AzureWebJobsStorage": "connection-string-to-storage-account",
"StorageConnection": "connection-string-to-storage-account",
"uplinkIdScope": "id-scope-from-iot-hub",
"uplinkSasToken": "sas-token-from-iot-hub",
"uplinkRegistrationHost": "global.azure-devices-provisioning.net",
"gatewayHost": "https://[your_iot_gateway_appservice].azurewebsites.net/gateway/",
"zoneDefinitionPath": "folder/file.json",
"zoneStatusPartition": "name-of-partition",
"uplinkClientConnectionString": "A management key for the IoT Hub",
"EventHubIngestConnectionstring": "Connection string for the event hub",
"tripletDBendpoint": "Cosmos DB endpoint for the Device Twin DB",
"tripletDBkey": "Key for the Device Twin DB",
"tripletDBname": "Name of the Device Twin Database",
"tripletDBcontainer": "Container of the Device Twin DB",
"downlinkHost": "Downlink host",
"downlinkPath": "Downlink Path"
```

Note that you will need to deploy:

* A storage account with a table ('zones') and a blob container ('zones').
* An IoT Hub with a Device Provisioning Service and an enrollment group defined. The key from the enrollment group is the one we use as SAS token; you can get the ID scope from the DPS instance. Also make sure to configure a route in the IoT Hub that outputs all twin changes to the 'twinchanges' event hub.
* The [IoT Hub Gateway](https://github.com/jessevl/IoTHubGateway) deployed to an App Service. Make sure to set the app setting 'IoTHubHostName' to the hostname of your IoT Hub, and 'SharedAccessPolicyKeyEnabled' to 'false'.
* An Azure Functions app to deploy all these functions to.
* An Event Hub namespace with 3 event hubs: 'twinchanges' (where changes will be posted), 'twtgingest' (where messages will be ingested from), and 'alldevicetelemetry'. You can obviously change the names to your liking; also make sure that each reader has its own consumer group.
* Two Stream Analytics jobs: one to stream telemetry to a history collection in Cosmos DB with all telemetry, and one to stream telemetry to a collection with the current state (which gets overwritten every time). For the first one, use the 'alldevicetelemetry' event hub as input and the Cosmos DB as output (the collection called 'telemetry'). For the second, use the same event hub as input, but use the 'latesttelemetry' collection and make sure to enable the Stream Analytics option to overwrite records ('Document ID') based on the deviceId. The queries are the same for both jobs:

```
SELECT
    *,
    GetMetadataPropertyValue(telemetry, '[EventHub].[IoTConnectionDeviceId]') AS deviceId
INTO
    cosmos
FROM
    telemetry
```
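As the deployment notes above mention, the enrollment-group key doubles as the SAS token setting: UplinkBridge sends as each device using short-lived, device-specific SAS tokens derived from that group key. A minimal sketch of that derivation (illustrative; the function names are hypothetical, and the actual logic in UplinkBridge/engine.js may differ):

```javascript
const crypto = require("crypto");

// Derive a device-specific key from the DPS enrollment-group key, using the
// standard group-enrollment pattern: HMAC-SHA256 of the device ID.
function deriveDeviceKey(enrollmentGroupKey, deviceId) {
    return crypto.createHmac("sha256", Buffer.from(enrollmentGroupKey, "base64"))
        .update(deviceId, "utf8")
        .digest("base64");
}

// Build a short-lived device SAS token for IoT Hub. The signed payload is the
// URL-encoded resource URI plus the expiry timestamp, per the IoT Hub SAS format.
function createDeviceSasToken(iotHubHost, deviceId, deviceKey, ttlSeconds = 3600) {
    const resourceUri = encodeURIComponent(`${iotHubHost}/devices/${deviceId}`);
    const expiry = Math.floor(Date.now() / 1000) + ttlSeconds;
    const signature = crypto.createHmac("sha256", Buffer.from(deviceKey, "base64"))
        .update(`${resourceUri}\n${expiry}`)
        .digest("base64");
    return `SharedAccessSignature sr=${resourceUri}&sig=${encodeURIComponent(signature)}&se=${expiry}`;
}
```

Keep the TTL short, and (as limitation 2 above notes) cache the derived keys and tokens in production rather than recomputing them on every incoming message.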
