---
services: cloud-services, event-hubs, sql-database
platforms: dotnet
author: paolosalvatori
---
# How to use a Worker Role to read telemetry data from an Event Hub and store it to Azure SQL Database using JSON functionalities
This sample shows how to use the **EventProcessorHost** to retrieve events from an **Event Hub** and store them in batch mode to an **Azure SQL Database** using the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function. The solution demonstrates how to use the following techniques:
* Send events to an [Event Hub](https://msdn.microsoft.com/en-us/library/azure/dn789973.aspx) using both AMQP and HTTPS transport protocols.
* Create an entity level shared access policy with only the Send claim. This key will be used to create SAS tokens, one for each publisher endpoint.
* Issue a SAS token to secure individual publisher endpoints.
* Use a SAS token to authenticate at a publisher endpoint level.
* Use the [EventProcessorHost](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventprocessorhost(v=azure.95).aspx) to retrieve and process events from an event hub.
* Perform structured and semantic logging using a custom [EventSource](https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) class and the ETW logs introduced by Azure SDK 2.5.
* Use the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-valued function in a stored procedure to process a batch of rows.
**NOTE**: this article is not intended to provide an exhaustive analysis of the various batching techniques offered by Azure SQL Database. Relying on batching to optimize data ingestion is a topic in itself; if you're interested in the details, take a look at this dedicated article: [How to use batching to improve SQL Database application performance](https://azure.microsoft.com/en-us/documentation/articles/sql-database-use-batching-to-improve-performance/).
Also look at [How to store Event Hub events to Azure SQL Database](https://code.msdn.microsoft.com/How-to-integrate-store-828769eb) for a version of the sample where the event processor uses a stored procedure with a [Table-Valued Parameter](https://msdn.microsoft.com/en-us/library/bb675163%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) to store multiple events in batch mode to a table on an Azure SQL database.
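As an illustration of the SAS-token technique listed above, the sketch below uses the **SharedAccessSignatureTokenProvider** class from the Service Bus SDK to issue a token scoped to a single publisher endpoint. The namespace, event hub path, policy name, and key are placeholder values, not the sample's actual configuration:

```csharp
#region Using Directives
using System;
using Microsoft.ServiceBus;
#endregion

public static class SasTokenFactory
{
    // Issues a SAS token scoped to a single publisher endpoint of the event hub.
    // The namespace, event hub, policy name, and key below are placeholders:
    // replace them with your Service Bus namespace and the Send-only shared
    // access policy created at the event hub level.
    public static string CreatePublisherToken(string deviceId)
    {
        var serviceUri = ServiceBusEnvironment.CreateServiceUri(
                "sb", "yournamespace", $"yourhub/publishers/{deviceId}")
            .ToString()
            .Trim('/');
        return SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
            "SendRule",              // name of the Send-only shared access policy
            "yourSharedAccessKey",   // primary key of the policy
            serviceUri,              // publisher-scoped resource URI
            TimeSpan.FromDays(7));   // token time to live
    }
}
```

Because the token is scoped to `.../publishers/{deviceId}`, a compromised device token can only send to its own publisher endpoint and can be revoked individually.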
# Scenario
This solution simulates an Internet of Things (IoT) scenario where thousands of devices send events (e.g. sensor readings) to a backend system via a message broker. The backend system retrieves events from the messaging infrastructure and stores them to a persistent repository in a scalable manner.
# Architecture
The sample is structured as follows:
* A Windows Forms application can be used to create an event hub and an entity level shared access policy with only the Send access right. The same application can be used to simulate a configurable number of devices that send readings to the event hub. Each device uses a separate publisher endpoint to send data to the underlying event hub and a separate SAS token to authenticate with the **Service Bus** namespace.
* An **Event Hub** is used to ingest device events.
* A worker role with multiple instances uses an [EventProcessorHost](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventprocessorhost(v=azure.95).aspx) to read and process messages from the partitions of the event hub.
* The custom [EventProcessor](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.ieventprocessor.aspx) class inserts a collection of events into a table of a **SQL Database** in batch mode by invoking a stored procedure.
* The worker role uses a custom [EventSource](https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) class and the Windows Azure Diagnostics support for ETW Events to write log data to table storage.
* The stored procedure uses the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-valued function and the [MERGE](https://msdn.microsoft.com/en-us/library/bb510625.aspx) statement to implement an **UPSERT** mechanism.
The following picture shows the architecture of the solution:
![](https://i1.code.msdn.s-msft.com/how-to-integrate-store-828769eb/image/file/134594/1/prototype.png)
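The event-processing side of the architecture can be sketched as follows. This is a minimal, simplified version of the sample's **EventProcessor** class: it implements the [IEventProcessor](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.ieventprocessor.aspx) interface, decodes the events received from a partition, and checkpoints its position after each batch. The `StoreEventsAsync` helper is a placeholder standing in for the database call shown later in this article.

```csharp
#region Using Directives
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
#endregion

// Minimal sketch of an IEventProcessor implementation: the sample's actual
// EventProcessor class adds logging, configuration, and error handling.
public class SampleEventProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        // Called once when the host acquires a lease on a partition.
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context,
                                         IEnumerable<EventData> messages)
    {
        // Decode the JSON payload of each event in the batch.
        var payloads = messages
            .Select(e => Encoding.UTF8.GetString(e.GetBytes()))
            .ToList();

        // Placeholder for the database call that invokes the
        // sp_InsertJsonEvents stored procedure with the whole batch.
        await StoreEventsAsync(payloads);

        // Record the position in the partition so that, after a failover,
        // another host instance resumes from the last checkpoint.
        await context.CheckpointAsync();
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // Called when the lease is lost or the host shuts down.
        return Task.FromResult<object>(null);
    }

    private Task StoreEventsAsync(IReadOnlyCollection<string> payloads)
    {
        // Placeholder: batch the JSON payloads and write them to the database.
        return Task.FromResult<object>(null);
    }
}
```

Checkpointing after the batch (rather than per event) keeps the processing at-least-once: after a crash, the events since the last checkpoint are redelivered, which is why the stored procedure implements an idempotent UPSERT.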
# References
## JSON Functionalities of Azure SQL Database
* [JSON in SQL Server 2016: Part 1 of 4](https://blogs.technet.microsoft.com/dataplatforminsider/2016/01/05/json-in-sql-server-2016-part-1-of-4/)
* [Channel9 Video: SQL Server 2016 and JSON Support](https://channel9.msdn.com/Shows/Data-Exposed/SQL-Server-2016-and-JSON-Support)
* [Reference Documentation](https://msdn.microsoft.com/en-us/library/dn921897.aspx)
## Event Hubs
* [Event Hubs](http://azure.microsoft.com/en-us/services/event-hubs/)
* [Get started with Event Hubs](http://azure.microsoft.com/en-us/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/)
* [Event Hubs Programming Guide](https://msdn.microsoft.com/en-us/library/azure/dn789972.aspx)
* [Service Bus Event Hubs Getting Started](https://code.msdn.microsoft.com/windowsazure/Service-Bus-Event-Hub-286fd097)
* [Event Hubs Authentication and Security Model Overview](https://msdn.microsoft.com/en-us/library/azure/dn789974.aspx)
* [Service Bus Event Hubs Large Scale Secure Publishing](https://code.msdn.microsoft.com/windowsazure/Service-Bus-Event-Hub-99ce67ab)
* [Service Bus Event Hubs Direct Receivers](https://code.msdn.microsoft.com/windowsazure/Event-Hub-Direct-Receivers-13fa95c6)
* [Service Bus Explorer](https://code.msdn.microsoft.com/windowsapps/Service-Bus-Explorer-f2abca5a)
* [Episode 160: Event Hubs with ElioDamaggio](http://channel9.msdn.com/Shows/Cloud+Cover/Episode-160-Event-Hubs-with-Elio-Damaggio) (video)
* [Telemetry and Data Flow at Hyper-Scale: Azure Event Hub](http://channel9.msdn.com/Events/TechEd/Europe/2014/CDP-B307) (video)
* [Data Pipeline Guidance](https://github.com/mspnp/data-pipeline) (Patterns & Practices solution)
* [Event Processor Host Best Practices Part 1](http://blogs.msdn.com/b/servicebus/archive/2015/01/16/event-processor-host-best-practices-part-1.aspx)
* [Event Processor Host Best Practices Part 2](http://blogs.msdn.com/b/servicebus/archive/2015/01/21/event-processor-host-best-practices-part-2.aspx)
* [How to create a Service Bus Namespace and an Event Hub using a PowerShell script](http://blogs.msdn.com/b/paolos/archive/2014/12/01/how-to-create-a-service-bus-namespace-and-an-event-hub-using-a-powershell-script.aspx)
## ETW Logs
* [Diagnostics: Improved diagnostics logging with ETW](http://azure.microsoft.com/blog/2014/11/12/announcing-azure-sdk-2-5-for-net-and-visual-studio-2015-preview/)
# Visual Studio Solution
The Visual Studio solution includes the following projects:
* **CreateIoTDbWithMerge.sql**: this script can be used to create the **SQL Database** used to store device events.
* **Entities**: this library contains the **Payload** class. This class defines the structure and content of the [EventData](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventdata.aspx?f=255&MSPPError=-2147217396) message body.
* **EventProcessorHostWorkerRole**: this library defines the worker role used to handle the events from the event hub.
* **Helpers**: this library defines the **TraceEventSource** class used by the worker role to create ETW logs at runtime.
* **DeviceSimulator**: this **Windows Forms** application can be used to create the **Event Hub** used by the sample and simulate a configurable number of devices sending telemetry events to the IoT application.
* **StoreEventsToAzureSqlDatabase**: this project defines the cloud service hosting the **Worker Role** used to handle the events from the event hub.
**NOTE**: To reduce the size of the zip file, I deleted the NuGet packages. To repair the solution, make sure to right-click the solution and select **Enable NuGet Package Restore**. For more information on this topic, see the following [post](http://blogs.4ward.it/enable-nuget-package-restore-in-visual-studio-and-tfs-2012-rc-to-building-windows-8-metro-apps/).
# Solution
This section briefly describes the individual components of the solution.
## Azure SQL Database
Run the **CreateIoTDbWithMerge.sql** script to create the database used by the solution. In particular, the script creates the following artifacts:
* The **Events** table used to store events.
* The **sp_InsertJsonEvents** stored procedure used to store events.
The stored procedure receives a single input parameter of type **nvarchar(max)** that contains the events to store in **JSON** format, and uses the [MERGE](https://msdn.microsoft.com/en-us/library/bb510625.aspx) statement to implement an **UPSERT** mechanism. This technique is commonly used to implement idempotency: if a row with a given EventId already exists in the table, the stored procedure updates its columns; otherwise, it inserts a new record.
The stored procedure uses the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-valued function, which parses JSON text and returns the objects and properties in the JSON as rows and columns. [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) provides a rowset view over a JSON document, with the ability to explicitly specify the columns in the rowset and the property paths to use to populate them. Since OPENJSON returns a set of rows, you can use the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function in the FROM clause of Transact-SQL statements like any other table, view, or table-valued function.
The [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function is available only under compatibility level 130. If your database compatibility level is lower than 130, SQL Server will not be able to find and execute the OPENJSON function. The other JSON functions are available at all compatibility levels. You can check the compatibility level in the sys.databases view or in the database properties, and change it with the following command: **ALTER DATABASE DatabaseName SET COMPATIBILITY_LEVEL = 130**. Note that compatibility level 120 might be the default even in new Azure SQL Databases. For more information on the new JSON support in Azure SQL Database, see [JSON functionalities in Azure SQL Database](https://azure.microsoft.com/en-us/blog/json-functionalities-in-azure-sql-database-public-preview/).
```sql
USE IoTDB
GO
-- Drop sp_InsertJsonEvents stored procedure
DROP PROCEDURE IF EXISTS [dbo].[sp_InsertJsonEvents]
GO
-- Drop Events table
DROP TABLE IF EXISTS [dbo].[Events]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Create Events table
CREATE TABLE [dbo].[Events](
    [EventId] [int] NOT NULL,
    [DeviceId] [int] NOT NULL,
    [Value] [int] NOT NULL,
    [Timestamp] [datetime2](7) NULL,
    PRIMARY KEY CLUSTERED
    (
        [EventId] ASC
    ) WITH (PAD_INDEX = OFF,
            STATISTICS_NORECOMPUTE = OFF,
            IGNORE_DUP_KEY = OFF,
            ALLOW_ROW_LOCKS = ON,
            ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
-- Create sp_InsertJsonEvents stored procedure
CREATE PROCEDURE dbo.sp_InsertJsonEvents
    @Events NVARCHAR(MAX)
AS
BEGIN
    MERGE INTO dbo.Events AS A
    USING (SELECT *
           FROM OPENJSON(@Events)
           WITH ([eventId] int, [deviceId] int, [value] int, [timestamp] datetime2(7))) B
    ON (A.EventId = B.EventId)
    WHEN MATCHED THEN
        UPDATE SET A.DeviceId = B.DeviceId,
                   A.Value = B.Value,
                   A.Timestamp = B.Timestamp
    WHEN NOT MATCHED THEN
        INSERT (EventId, DeviceId, Value, Timestamp)
        VALUES (B.EventId, B.DeviceId, B.Value, B.Timestamp);
END
GO
```
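On the worker role side, the stored procedure can be invoked with a single round trip per batch. The sketch below serializes a batch of events with Json.NET and passes the resulting JSON array as the **@Events** parameter; it assumes the **Payload** class defined in this sample is in scope, and the connection string is a placeholder:

```csharp
#region Using Directives
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Newtonsoft.Json;
#endregion

public static class EventRepository
{
    // Serializes a batch of events to a JSON array and stores it with a
    // single call to the sp_InsertJsonEvents stored procedure.
    public static async Task InsertEventsAsync(IList<Payload> events,
                                               string connectionString)
    {
        // One JSON document for the whole batch, e.g.
        // [{"eventId":1,"deviceId":5,"value":42,"timestamp":"..."}, ...]
        var json = JsonConvert.SerializeObject(events);

        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand("sp_InsertJsonEvents", connection))
        {
            command.CommandType = CommandType.StoredProcedure;
            // -1 marks the parameter as NVARCHAR(MAX).
            command.Parameters.Add("@Events", SqlDbType.NVarChar, -1).Value = json;
            await connection.OpenAsync();
            await command.ExecuteNonQueryAsync();
        }
    }
}
```

Passing the whole batch as one NVARCHAR(MAX) parameter keeps the round-trip count at one per batch, which is the point of the OPENJSON-based approach compared to row-by-row inserts.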
## Entities
The following listing contains the code of the **Payload** class, which defines the body of the messages sent to the event hub. Note that the properties of the class are decorated with the **JsonPropertyAttribute**: the client application uses [Json.NET](http://www.newtonsoft.com/json) to serialize and deserialize the message content in JSON format.
```csharp
#region Using Directives
using System;
using Newtonsoft.Json;
#endregion
namespace Microsoft.AzureCat.Samples.Entities
{
    [Serializable]
    public class Payload
    {
        /// <summary>
        /// Gets or sets the event id.
        /// </summary>
        [JsonProperty(PropertyName = "eventId", Order = 1)]
        public int EventId { get; set; }

        /// <summary>
        /// Gets or sets the device id.
        /// </summary>
        [JsonProperty(PropertyName = "deviceId", Order = 2)]
        public int DeviceId { get; set; }

        /// <summary>
        /// Gets or sets the device value.
        /// </summary>
        [JsonProperty(PropertyName = "value", Order = 3)]
        public int Value { get; set; }

        /// <summary>
        /// Gets or sets the event timestamp.
        /// </summary>
        [JsonProperty(PropertyName = "timestamp", Order = 4)]
        public DateTime Timestamp { get; set; }
    }
}
```
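For instance, serializing an instance of the class with Json.NET produces the camelCase property names that match the WITH clause of the stored procedure. The snippet below is a small illustrative example that assumes the **Payload** class above is in scope:

```csharp
#region Using Directives
using System;
using Newtonsoft.Json;
#endregion

public static class PayloadSample
{
    public static string Serialize()
    {
        var payload = new Payload
        {
            EventId = 1,
            DeviceId = 5,
            Value = 42,
            Timestamp = new DateTime(2016, 5, 9, 12, 0, 0, DateTimeKind.Utc)
        };
        // The JsonProperty attributes map the Pascal-case .NET properties to
        // the camelCase names expected by OPENJSON, producing a document like
        // {"eventId":1,"deviceId":5,"value":42,"timestamp":"..."}.
        return JsonConvert.SerializeObject(payload);
    }
}
```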
## Helpers
This library defines the **TraceEventSource** class used by the worker role to trace events to ETW logs. The WAD agent running on the worker role instances will read data out of local ETW logs and persist this data to a couple of storage tables (**WASDiagnosticTable** and **WADEventProcessorTable**) in the storage account configured for Windows Azure Diagnostics.
```csharp
#region Using Directives
using System;
using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;
using Microsoft.WindowsAzure.ServiceRuntime;
#endregion
namespace Microsoft.AzureCat.Samples.Helpers
{
    [EventSource(Name = "TraceEventSource")]
    public sealed class TraceEventSource : EventSource
    {
        #region Internal Enums
        public class Keywords
        {
            public const EventKeywords EventHub = (EventKeywords)1;
            public const EventKeywords DataBase = (EventKeywords)2;
            public const EventKeywords Diagnostic = (EventKeywords)4;
            public const EventKeywords Performance = (EventKeywords)8;
        }
        #endregion

        #region Public Static Properties
        public static readonly TraceEventSource Log = new TraceEventSource();
        #endregion

        #region Private Methods
        [Event(1,
            Message = "TraceIn",
            Keywords = Keywords.Diagnostic,
            Level = EventLevel.Verbose)]
        private void TraceIn(string application,
                             string instance,
                             Guid activityId,
                             string description,
                             string source,
                             string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance))
            {
                return;
            }
            WriteEvent(1, application, instance, activityId, description, source, method);
        }

        [Event(2,
            Message = "TraceOut",
            Keywords = Keywords.Diagnostic,
            Level = EventLevel.Verbose)]
        private void TraceOut(string application,
                              string instance,
                              Guid activityId,
                              string description,
                              string source,
                              string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance))
            {
                return;
            }
            WriteEvent(2, application, instance, activityId, description, source, method);
        }

        [Event(3,
            Message = "TraceApi",
            Keywords = Keywords.Diagnostic,
            Level = EventLevel.Informational)]
        private void TraceExec(string application,
                               string instance,
                               Guid activityId,
                               double elapsed,
                               string description,
                               string source,
                               string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(description))
            {
                return;
            }
            WriteEvent(3, application, instance, activityId, elapsed, description, source, method);
        }

        [Event(4,
            Message = "TraceInfo",
            Keywords = Keywords.Diagnostic,
            Level = EventLevel.Informational)]
        private void TraceInfo(string application,
                               string instance,
                               string description,
                               string source,
                               string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(description))
            {
                return;
            }
            WriteEvent(4, application, instance, description, source, method);
        }

        [Event(5,
            Message = "TraceError",
            Keywords = Keywords.Diagnostic,
            Level = EventLevel.Error)]
        private void TraceError(string application,
                                string instance,
                                Guid activityId,
                                string exception,
                                string innerException,
                                string source,
                                string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(exception))
            {
                return;
            }
            WriteEvent(5,
                       application,
                       instance,
                       activityId,
                       exception,
                       string.IsNullOrWhiteSpace(innerException) ?
                           string.Empty :
                           innerException,
                       source,
                       method);
        }

        [Event(6,
            Message = "OpenPartition",
            Keywords = Keywords.EventHub,
            Level = EventLevel.Informational)]
        private void OpenPartition(string application,
                                   string instance,
                                   string eventHub,
                                   string consumerGroup,
                                   string partitionId,
                                   string source,
                                   string method)
        {
            if (string.IsNullOrWhiteSpace(application) ||
                string.IsNullOrWhiteSpace(instance) ||
                string.IsNullOrWhiteSpace(eventHub) ||
                string.IsNullOrWhiteSpace(consumerGroup) ||
                string.IsNullOrWhiteSpace(partitionId))
... ...
```