eventstreamer
Category: Internet of Things
Development tool: C#
File size: 16KB
Downloads: 0
Upload date: 2018-09-06 18:18:28
Uploader: sh-1993
Description: An implementation of a function to process events coming in from Event Hub, i.e. from small on-body sensors.
File list:
LICENSE (1071, 2018-09-07)
appveyor.yml (172, 2018-09-07)
src (0, 2018-09-07)
src\DispatcherFunction (0, 2018-09-07)
src\DispatcherFunction\Common (0, 2018-09-07)
src\DispatcherFunction\Common\DataPoint.cs (831, 2018-09-07)
src\DispatcherFunction\DispatcherFunction.cs (7148, 2018-09-07)
src\DispatcherFunction\DispatcherFunction.csproj (859, 2018-09-07)
src\DispatcherFunction\host.json (3, 2018-09-07)
src\EventProcessing.sln (1128, 2018-09-07)
# EventStreamer - Azure Function & Redis
This is the Redis implementation of the event streaming through Azure Functions and Redis.
[![Build status](https://ci.appveyor.com/api/projects/status/90vp8weadevbwf3m/branch/master?svg=true)](https://ci.appveyor.com/project/avodovnik/eventstreamer/branch/master)
## What is it?
We're using an Azure Function to listen to events coming into _Event Hub_ in a _canonical format_. Each event contains a `DataPoint`, which looks like this:
```csharp
public class DataPoint
{
    public string Key { get; set; }
    public DateTime Timestamp { get; set; }
    public string DeviceId { get; set; }
    public string SessionId { get; set; }
    public string SensorType { get; set; }

    // Parallel lists of channel names and readings
    // (element types assumed: string names, double values).
    public List<string> Names { get; set; } = new List<string>();
    public List<double> Values { get; set; } = new List<double>();
}
```
The reason for this structure is simple: we believed there are many use cases where we will be collecting similarly structured data from sensors, and we knew there would be some similarities (e.g. `SessionId`). The `Names` and `Values` fields are split to make processing them with something like _Azure Stream Analytics_ easier, should we choose to use it in the future.
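For illustration, a single reading in this canonical format might look like the following (the channel names and values are made up for the example; only `AL` is assumed by the function, as explained below):

```json
{
  "Key": "session-42:7",
  "Timestamp": "2018-09-06T18:18:28Z",
  "DeviceId": "device-01",
  "SessionId": "session-42",
  "SensorType": "on-body",
  "Names": ["HR", "AL"],
  "Values": [72, 7]
}
```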
Events come in from multiple _players_, i.e. uniquely identifiable streams. The function separates them based on `SessionId`. Because this is a specific implementation based on a real-world example from one of the partners we're working with, we've also made an assumption about a specific field being present: we build the `Key` from `SessionId` and `AL`, the field we can _assume_ exists.
This happens in the following snippet of code:
```csharp
private static (DataPoint point, string message) SafelyConvertToDataPoint(byte[] data, ILogger log)
{
    try
    {
        // ...
        var key = $"{point.SessionId}:{point.Values[1]}";
        // ...
    }
    // ...
}
```
You can think of this function as a sort of [demultiplexer](https://en.wikipedia.org/wiki/Multiplexer). It looks at a continuous stream of data, and splits out multiple individual streams (in our case, per player).
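A minimal sketch of that demultiplexing step is below. It is illustrative only, not the actual function body; the `Demux` class and `GroupByPlayer` helper are hypothetical, and a trimmed copy of `DataPoint` is repeated so the sketch compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Trimmed copy of the DataPoint shown above, so this sketch is self-contained.
public class DataPoint
{
    public string SessionId { get; set; }
    public List<double> Values { get; set; } = new List<double>();
}

public static class Demux
{
    // Build the per-player key from SessionId and the assumed "AL" value,
    // mirroring the SafelyConvertToDataPoint snippet above.
    public static string BuildKey(DataPoint point) =>
        $"{point.SessionId}:{point.Values[1]}";

    // Split one mixed batch of events into per-player streams.
    public static Dictionary<string, List<DataPoint>> GroupByPlayer(IEnumerable<DataPoint> batch) =>
        batch.GroupBy(BuildKey).ToDictionary(g => g.Key, g => g.ToList());
}
```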
## What does it do with the data?
When data comes in, the function buffers it in _Redis_ and tries to aggregate until it has **a full second** of data. The reason is simple: the customer uses sensors which emit data at 100 Hz, so to make meaningful processing possible, we need to aggregate it into a wider time window. To achieve this, the `Task ProcessPlayerAsync(string playerId, (DataPoint point, string strRepresentation)[] messages, ILogger log)` method looks at the buffer available for each player and decides whether to _push time forward_. When called, `Task PushTimeAsync(string playerId, ILogger log)` then pops all the data from the buffer (Redis) queue and aggregates the values within the `DataPoint`.
The result of this is an aggregated row that can be sent forward for processing, or used for calculations.
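The aggregation itself can be sketched roughly as follows. This is illustrative only, not the actual `PushTimeAsync` implementation, and it assumes the buffered samples for one player share the same channel layout:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class Aggregator
{
    // Collapse one second's worth of 100 Hz samples for a single player into
    // a single row of per-channel averages (the assumed aggregation; the real
    // code may aggregate differently).
    public static double[] AggregateSecond(IEnumerable<double[]> samples)
    {
        var buffer = samples.ToList();
        if (buffer.Count == 0)
            throw new ArgumentException("empty buffer", nameof(samples));

        int channels = buffer[0].Length;
        var sums = new double[channels];
        foreach (var sample in buffer)
            for (int i = 0; i < channels; i++)
                sums[i] += sample[i];

        return sums.Select(s => s / buffer.Count).ToArray();
    }
}
```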
## How to Run it?
The function needs **two configuration settings** to function properly:
- `incomingEventHub`, the connection string of the Event Hub to which the events are sent, and
- `RedisConnectionString`, which of course is the connection string of the _Redis_ cache used as a buffer for the events inside the Azure Function.
When deploying _locally_, edit the `local.settings.json` file and add the above fields into `Values`.
> Note: even though these are connection strings, they don't go into the `ConnectionStrings` set of values, as that's reserved for SQL connections only.
When running on Azure, make sure to add the above settings into the `Application Settings` section of the Function.
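For local development, a `local.settings.json` might look like the following sketch (the connection strings are placeholders, and `AzureWebJobsStorage` is shown with the local storage emulator value commonly used in Functions development):

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "incomingEventHub": "<event hub connection string>",
    "RedisConnectionString": "<redis connection string>"
  }
}
```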
## Generating sample data
If you want to generate sample data, you can use the [Streamer.CLI](https://github.com/avodovnik/servicefabric-eventstreamer/tree/master/src/Streamer.CLI) which is a part of this project's `master` branch (for now). The streamer generates random events that satisfy the above assumption. It allows streaming to, and listening from, an `EventHub` with multiple partitions, and ensures each `player` (for our definition of such an entity) is only ever streamed to a single partition.
You can run the streamer using the following command:
```powershell
dotnet run stream --eh "event hub connection string" --num 5000 --interval 10 -s "***ef1baf-02d3-4122-8d54-02f83577d38f"
```
The `stream` command tells the application to _stream_ data into the Event Hub, `--num` defines the number of events it produces, `--interval` specifies the simulated frequency (e.g. `10` means 10 ms, or 100 Hz), and the optional `-s` lets you specify a `Session ID` (this way you can test without flooding Redis with too many new keys).
## Known Problems
1) At the moment, Redis is never cleaned, which means that we are getting a ton of unused keys. [Issue #9](https://github.com/avodovnik/eventstreamer/issues/9).
2) Sometimes we see timeout exceptions, but the recent addition of `async` code seems to have fixed that.