workerrolejsonsqldb

所属分类:数据库系统
开发工具:C#
文件大小:0KB
下载次数:0
上传日期:2016-05-09 09:55:49
上 传 者sh-1993
说明:  此示例显示如何使用EventProcessorHost从事件中心检索事件,并将它们以批处理模式存储到Azure SQL D...,
(This sample shows how to use the EventProcessorHost to retrieve events from an Event Hub and store them in a batch mode to an Azure SQL Database using the OPENJSON function.)

文件列表:
.nuget/ (0, 2016-05-09)
.nuget/NuGet.Config (164, 2016-05-09)
.nuget/NuGet.exe (1664000, 2016-05-09)
.nuget/NuGet.targets (7484, 2016-05-09)
CONTRIBUTING.md (815, 2016-05-09)
Entities/ (0, 2016-05-09)
Entities/Entities.csproj (3562, 2016-05-09)
Entities/Payload.cs (1779, 2016-05-09)
Entities/Properties/ (0, 2016-05-09)
Entities/Properties/AssemblyInfo.cs (1392, 2016-05-09)
Entities/packages.config (139, 2016-05-09)
EventProcessorHostWorkerRole/ (0, 2016-05-09)
EventProcessorHostWorkerRole/EventProcessor.cs (6335, 2016-05-09)
EventProcessorHostWorkerRole/EventProcessorFactory.cs (2134, 2016-05-09)
EventProcessorHostWorkerRole/EventProcessorHostWorkerRole.csproj (7193, 2016-05-09)
EventProcessorHostWorkerRole/Properties/ (0, 2016-05-09)
EventProcessorHostWorkerRole/Properties/AssemblyInfo.cs (1432, 2016-05-09)
EventProcessorHostWorkerRole/WorkerRole.cs (21143, 2016-05-09)
EventProcessorHostWorkerRole/app.config (5605, 2016-05-09)
EventProcessorHostWorkerRole/packages.config (929, 2016-05-09)
Helpers/ (0, 2016-05-09)
Helpers/CloudConfigurationHelper.cs (2045, 2016-05-09)
Helpers/Helpers.csproj (4053, 2016-05-09)
Helpers/Properties/ (0, 2016-05-09)
Helpers/Properties/AssemblyInfo.cs (1390, 2016-05-09)
Helpers/TraceEventSource.cs (15690, 2016-05-09)
Helpers/packages.config (167, 2016-05-09)
Images/ (0, 2016-05-09)
Images/Client.png (48203, 2016-05-09)
Images/ETW01.png (36457, 2016-05-09)
Images/ETW02.png (13356, 2016-05-09)
Images/Prototype.png (37463, 2016-05-09)
LICENSE (1087, 2016-05-09)
Scripts/ (0, 2016-05-09)
Scripts/CreateIoTDb.sql (1216, 2016-05-09)
Scripts/Sample.sql (659, 2016-05-09)
Sender/ (0, 2016-05-09)
... ...

--- services: cloud-services, event-hubs, sql-database platforms: dotnet author: paolosalvatori --- # How to use a Worker Role to read telemetry data from an Event Hub and store it to Azure SQL Database using JSON functionalities This sample shows how to use the **EventProcessorHost** to retrieve events from an **Event Hub** and store them in a batch mode to an **Azure SQL Database** using the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function.The solution demonstrates how the use the following techniques: * Send events to an [Event Hub](https://msdn.microsoft.com/en-us/library/azure/dn789973.aspx) using both AMQP and HTTPS transport protocols. * Create an entity level shared access policy with only the Send claim. This key will be used to create SAS tokens, one for each publisher endpoint. * Issue a SAS token to secure individual publisher endpoints. * Use a SAS token to authenticate at a publisher endpoint level. * Use the[EventProcessorHost](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventprocessorhost(v=azure.95).aspx)to retrieve and process events from an event hub. * Perform structured and semantic logging using a custom[EventSource](https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396)class and ETW Logs introduced by Azure SDK 2.5. * Use the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-value function in a stored procedure to process a batch of rows. **NOTE**: this article is not intended to provide an exhaustive analysis of the various batching techniques offered by Azure SQL Database. Relying on batching to optimize data ingestion is a topic by itself, if you’re interested in the details take a look at this dedicated article: [How to use batching to improve SQL Database application performance](https://azure.microsoft.com/en-us/documentation/articles/sql-database-use-batching-to-improve-performance/). Also look at [How to store Event Hub events to Azure SQL Database](https://code.msdn.microsoft.com/How-to-integrate-store-828769eb) for a version of the sample where the event processor uses a stored procedure with a [Table-Valued Parameter](https://msdn.microsoft.com/en-us/library/bb675163%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) to store multiple events in a batch mode to a table on an Azure SQL database. # Scenario This solution simulates an Internet of Things (IoT) scenario where thousands of devices send events (e.g. sensor readings) to a backend system via a message broker. The backend system retrieves events from the messaging infrastructure and store them to a persistent repository in a scalable manner. # Architecture The sample is structured as follows: * A Windows Forms application can be used to create an event hub and an entity level shared access policy with only the Send access right.The same application can be used to simulate a configurable amount of devices that send readings into the event hub. Each device uses a separate publisher endpoint to send data to the underlying event hub and a separate SAS token to authenticate with the **Service Bus** namespace. * An **Event Hub** is used to ingest device events. * A worker role with multiple instances uses an[EventProcessorHost](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventprocessorhost(v=azure.95).aspx) to read and process messages from the partitions of the event hub. * The custom [EventProcessor](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.ieventprocessor.aspx) class inserts a collection of events into a table of a **SQL Database** in a batch mode by invoking a stored procedure. * The worker role uses a custom [EventSource](https://msdn.microsoft.com/en-us/library/system.diagnostics.tracing.eventsource%28v=vs.110%29.aspx?f=255&MSPPError=-2147217396) class and the Windows Azure Diagnostics support for ETW Events to write log data to table storage. * The stored procedure uses the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-value function and the [MERGE](https://msdn.microsoft.com/en-us/library/bb510625.aspx) statement to implement an **UPSERT** mechanism. The following picture shows the architecture of the solution: ![](https://i1.code.msdn.s-msft.com/how-to-integrate-store-828769eb/image/file/134594/1/prototype.png) # References JSON Functionalities of Azure SQL Database * [JSON in SQL Server 2016: Part 1 of 4](https://blogs.technet.microsoft.com/dataplatforminsider/2016/01/05/json-in-sql-server-2016-part-1-of-4/) * [Channel9 Video: SQL Server 2016 and JSON Support](https://channel9.msdn.com/Shows/Data-Exposed/SQL-Server-2016-and-JSON-Support) * [Reference Documentation](https://msdn.microsoft.com/en-us/library/dn921897.aspx) Event Hubs * [Event Hubs](http://azure.microsoft.com/en-us/services/event-hubs/) * [Get started with Event Hubs](http://azure.microsoft.com/en-us/documentation/articles/service-bus-event-hubs-csharp-ephcs-getstarted/) * [Event Hubs Programming Guide](https://msdn.microsoft.com/en-us/library/azure/dn789972.aspx) * [Service Bus Event Hubs Getting Started](https://code.msdn.microsoft.com/windowsazure/Service-Bus-Event-Hub-286fd097) * [Event Hubs Authentication and Security Model Overview](https://msdn.microsoft.com/en-us/library/azure/dn789974.aspx) * [Service Bus Event Hubs Large Scale Secure Publishing](https://code.msdn.microsoft.com/windowsazure/Service-Bus-Event-Hub-99ce67ab) * [Service Bus Event Hubs Direct Receivers](https://code.msdn.microsoft.com/windowsazure/Event-Hub-Direct-Receivers-13fa95c6) * [Service Bus Explorer](https://code.msdn.microsoft.com/windowsapps/Service-Bus-Explorer-f2abca5a) * [Episode 160: Event Hubs with ElioDamaggio](http://channel9.msdn.com/Shows/Cloud+Cover/Episode-160-Event-Hubs-with-Elio-Damaggio) (video) * [Telemetry and Data Flow at Hyper-Scale: Azure Event Hub](http://channel9.msdn.com/Events/TechEd/Europe/2014/CDP-B307) (video) * [Data Pipeline Guidance](https://github.com/mspnp/data-pipeline) (Patterns & Practices solution) * [Event Processor Host Best Practices Part 1](http://blogs.msdn.com/b/servicebus/archive/2015/01/16/event-processor-host-best-practices-part-1.aspx) * [Event Processor Host Best Practices Part 2](http://blogs.msdn.com/b/servicebus/archive/2015/01/21/event-processor-host-best-practices-part-2.aspx) * [How to create a Service Bus Namespace and an Event Hub using a PowerShell script](http://blogs.msdn.com/b/paolos/archive/2014/12/01/how-to-create-a-service-bus-namespace-and-an-event-hub-using-a-powershell-script.aspx) ETW Logs * [Diagnostics: Improved diagnostics logging with ETW](http://azure.microsoft.com/blog/2014/11/12/announcing-azure-sdk-2-5-for-net-and-visual-studio-2015-preview/) # Visual Studio Solution The Visual Studio solution includes the following projects: * **CreateIoTDbWithMerge.sql**: this script can be used to create the **SQL Database** used to store device events. * **Entities**: this library contains the **Payload** class. This class defines the structure and content of the[EventData](https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.eventdata.aspx?f=255&MSPPError=-2147217396) message body. * **EventProcessorHostWorkerRole**: this library defines the worker role used to handle the events from the event hub. * **Helpers**: this library defines the **TraceEventSource** class used by the worker role to create ETW logs at runtime. * **DeviceSimulator**: this **Windows Forms** application can be used to create the **Event Hub** used by the sample and simulate a configurable amount of devices sending telemetry events to the IoT application. * **StoreEventsToAzureSqlDatabase**: this project defines the cloud service hosting the **Worker Role** used to handle the events from the event hub. **NOTE**:To reduce the size of the zip file, I deleted the NuGet packages. To repair the solution, make sure to right click the solution and select**Enable NuGet Package Restore**. For more information on this topic, see the following[post](http://blogs.4ward.it/enable-nuget-package-restore-in-visual-studio-and-tfs-2012-rc-to-building-windows-8-metro-apps/). # Solution This section briefly describes the individual components of the solution. ## SQL Azure Database Run the**CreateIoTDbWithMerge.sql**script to create the database used by the solution. In particular, the script create the following artifacts: * The **Events** table used to store events. * The **sp_InsertJsonEvents** stored procedure used to store events. The stored procedure receives a single input parameter of type **nvarchar(max)** which contains the events to store in **JSON** format and uses the[MERGE](https://msdn.microsoft.com/en-us/library/bb510625.aspx)statement to implement an **UPSERT** mechanism. This technique is commonly used to implement idempotency: if an row already exists in the table with the a given EventId, the store procedure updates its columns, otherwise a new record is created. The stored procedure uses the [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) table-value function that parses JSON text and returns objects and properties in JSON as rows and columns. [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) provides a rowset view over a JSON document, with the ability to explicitly specify the columns in the rowset and the property paths to use to populate the columns. Since OPENJSON returns a set of rows, you can use [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function in FROM clause of Transact-SQL statements like any other table, view, or table-value function. The [OPENJSON](https://msdn.microsoft.com/en-us/library/dn921885.aspx) function is available only under compatibility level 130. If your database compatibility level is lower than 130, SQL Server will not be able to find and execute OPENJSON function. Other JSON functions are available at all compatibility levels. You can check compatibility level in sys.databases view or in database properties. You can change a compatibility level of database using the following command: **ALTER DATABASE DatabaseName SET COMPATIBILITY_LEVEL = 130**. Note that compatibility level 120 might be default even in new Azure SQL Databases. For more information on the new JSON support in Azure SQL Database, see [JSON functionalities in Azure SQL Database](https://azure.microsoft.com/en-us/blog/json-functionalities-in-azure-sql-database-public-preview/). ```sql USE IoTDB GO -- Drop sp_InsertJsonEvents stored procedure DROP PROCEDURE IF EXISTS [dbo].[sp_InsertJsonEvents] GO -- Drop Events table DROP TABLE IF EXISTS [dbo].[Events] GO SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO -- Create Events table CREATE TABLE [dbo].[Events]( [EventId] [int] NOT NULL, [DeviceId] [int] NOT NULL, [Value] [int] NOT NULL, [Timestamp] [datetime2](7) NULL, PRIMARY KEY CLUSTERED ( [EventId] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY] GO -- Create sp_InsertEvents stored procedure CREATE PROCEDURE dbo.sp_InsertJsonEvents @Events NVARCHAR(MAX) AS BEGIN MERGE INTO dbo.Events AS A USING (SELECT * FROM OPENJSON(@Events) WITH ([eventId] int, [deviceId] int, [value] int, [timestamp] datetime2(7))) B ON (A.EventId = B.EventId) WHEN MATCHED THEN UPDATE SET A.DeviceId = B.DeviceId, A.Value = B.Value, A.Timestamp = B.Timestamp WHEN NOT MATCHED THEN INSERT (EventId, DeviceId, Value, Timestamp) VALUES(B.EventId, B.DeviceId, B.Value, B.Timestamp); END GO ``` ## Entities The following table contains the code of the **Payload** class. This class is used to define the body of the messages sent to the event hub.Note that the properties of the class are decorated with the **JsonPropertyAttribute**. In fact, the code the client application uses [Json.NET](http://www.newtonsoft.com/json) to serialize and deserialize the message content in JSON format. ```csharp #region Using Directives using System; using Newtonsoft.Json; #endregion namespace Microsoft.AzureCat.Samples.Entities { [Serializable] public class Payload { /// /// Gets or sets the device id. /// [JsonProperty(PropertyName = "eventId", Order = 1)] public int EventId { get; set; } /// /// Gets or sets the device id. /// [JsonProperty(PropertyName = "deviceId", Order = 2)] public int DeviceId { get; set; } /// /// Gets or sets the device value. /// [JsonProperty(PropertyName = "value", Order = 3)] public int Value { get; set; } /// /// Gets or sets the event timestamp. /// [JsonProperty(PropertyName = "timestamp", Order = 4)] public DateTime Timestamp { get; set; } } } ``` ## Helpers
This library defines the **TraceEventSource** class used by the worker role to trace events to ETW logs. The WAD agent running on the worker role instances will read data out of local ETW logs and persist this data to a couple of storage tables (**WASDiagnosticTable** and **WADEventProcessorTable**) in the storage account configured for Windows Azure Diagnostics. ```csharp #region Using Directives using System; using System.Diagnostics.Tracing; using System.Runtime.CompilerServices; using Microsoft.WindowsAzure.ServiceRuntime; #endregion namespace Microsoft.AzureCat.Samples.Helpers { [EventSource(Name = "TraceEventSource")] public sealed class TraceEventSource : EventSource { #region Internal Enums public class Keywords { public const EventKeywords EventHub = (EventKeywords)1; public const EventKeywords DataBase = (EventKeywords)2; public const EventKeywords Diagnostic = (EventKeywords)4; public const EventKeywords Performance = (EventKeywords)8; } #endregion #region Public Static Properties public static readonly TraceEventSource Log = new TraceEventSource(); #endregion #region Private Methods [Event(1, Message = "TraceIn", Keywords = Keywords.Diagnostic, Level = EventLevel.Verbose)] private void TraceIn(string application, string instance, Guid activityId, string description, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance)) { return; } WriteEvent(1, application, instance, activityId, description, source, method); } [Event(2, Message = "TraceOut", Keywords = Keywords.Diagnostic, Level = EventLevel.Verbose)] private void TraceOut(string application, string instance, Guid activityId, string description, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance)) { return; } WriteEvent(2, application, instance, activityId, description, source, method); } [Event(3, Message = "TraceApi", Keywords = Keywords.Diagnostic, Level = EventLevel.Informational)] private void TraceExec(string application, string instance, Guid activityId, double elapsed, string description, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance) || string.IsNullOrWhiteSpace(description)) { return; } WriteEvent(3, application, instance, activityId, elapsed, description, source, method); } [Event(4, Message = "TraceInfo", Keywords = Keywords.Diagnostic, Level = EventLevel.Informational)] private void TraceInfo(string application, string instance, string description, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance) || string.IsNullOrWhiteSpace(description)) { return; } WriteEvent(4, application, instance, description, source, method); } [Event(5, Message = "TraceError", Keywords = Keywords.Diagnostic, Level = EventLevel.Error)] private void TraceError(string application, string instance, Guid activityId, string exception, string innerException, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance) || string.IsNullOrWhiteSpace(exception)) { return; } WriteEvent(5, application, instance, activityId, exception, string.IsNullOrWhiteSpace(innerException) ? string.Empty : innerException, source, method); } [Event(6, Message = "OpenPartition", Keywords = Keywords.EventHub, Level = EventLevel.Informational)] private void OpenPartition(string application, string instance, string eventHub, string consumerGroup, string partitionId, string source, string method) { if (string.IsNullOrWhiteSpace(application) || string.IsNullOrWhiteSpace(instance) || string.IsNullOrWhiteSpace(eventHub) || string.IsNullOrWhiteSpace(consumerGroup) || string.IsNullOrWhiteSpace(partitionId)) ... ...

近期下载者

相关文件


收藏者