azure-hubs-sql-stream-analytics

所属分类:数据库系统
开发工具:GO
文件大小:0KB
下载次数:0
上传日期:2022-02-25 01:41:59
上 传 者sh-1993
说明:  azure中心sql流分析
(azure hubs sql stream analytics)

文件列表:
database/ (0, 2022-02-24)
database/tables.sql (639, 2022-02-24)
deployment/ (0, 2022-02-24)
deployment/main.bicep (7192, 2022-02-24)
images/ (0, 2022-02-24)
images/Architecture-Hubs-Stream-Sql.png (37919, 2022-02-24)
src/ (0, 2022-02-24)
src/AzFuncDemo.sln (1616, 2022-02-24)
src/Domain.Common/ (0, 2022-02-24)
src/Domain.Common/Domain.Common.csproj (138, 2022-02-24)
src/Domain.Common/Models/ (0, 2022-02-24)
src/Domain.Common/Models/ACEvent.cs (320, 2022-02-24)
src/Domain.Common/Models/BaseEvent.cs (302, 2022-02-24)
src/Domain.Common/Models/GeneratorEvent.cs (369, 2022-02-24)
src/Domain.Common/Models/MotorEvent.cs (303, 2022-02-24)
src/EventsApi.Dockerfile (1045, 2022-02-24)
src/GenerateMessages/ (0, 2022-02-24)
src/GenerateMessages/Function1.cs (3401, 2022-02-24)
src/GenerateMessages/GenerateMessages.csproj (797, 2022-02-24)
src/GenerateMessages/Properties/ (0, 2022-02-24)
src/GenerateMessages/Properties/serviceDependencies.json (177, 2022-02-24)
src/GenerateMessages/Properties/serviceDependencies.local.json (190, 2022-02-24)
src/GenerateMessages/host.json (217, 2022-02-24)
src/GenerateMessages/local.settings.json (157, 2022-02-24)
src/Monitor.Dockerfile (1037, 2022-02-24)
src/Receiver.Dockerfile (811, 2022-02-24)
src/Sender.Dockerfile (1030, 2022-02-24)
src/azclidacrbuild.sh (695, 2022-02-24)
src/common/ (0, 2022-02-24)
src/common/apphelper.go (224, 2022-02-24)
src/common/build.go (50, 2022-02-24)
src/common/hubshelper.go (441, 2022-02-24)
src/common/messages.go (1448, 2022-02-24)
src/common/structs.go (2374, 2022-02-24)
src/eventsapi/ (0, 2022-02-24)
src/eventsapi/main.go (3779, 2022-02-24)
src/eventsapi/run.sh (487, 2022-02-24)
... ...

# Demo of Azure integration with Event Hubs, Azure Stream Analytics, and Azure SQL ## Problem Statement A customer is sending messages of different types to Event Hubs and wants to save these messages to Azure SQL tables without having to write custom code. They also want to detect anomalies and process these anomalies as they are raised. ## Solution with Stream Analytics Azure Stream Analytics offers functionality that allows messages to be read from Event Hubs, filtered, processed, and saved to different destinations including Azure SQL tables and back to Event Hubs. ## Solution Diagram ![Solution Diagram](https://github.com/msalemor/azure-hubs-sql-stream-analytics/blob/master/images/Architecture-Hubs-Stream-Sql.png) ## Services Deployment [Bicep template](https://github.com/msalemor/azure-hubs-sql-stream-analytics/blob/master/deployment/main.bicep) ## Azure SQL ### Table Definitions ```sql create table ACEvents ( Id int not null primary key identity, DeviceId varchar(10) not null, Ts DateTime not null, CoolantTemperature float not null, AirFlow float not null, AirTemperature float not null ) create table GeneratorEvents ( Id int not null primary key identity, DeviceId varchar(10) not null, Ts DateTime not null, Hertz float not null, Amps float not null, Voltage float not null, GasPercentage float not null ) create table MotorEvents ( Id int not null primary key identity, DeviceId varchar(10) not null, Ts DateTime not null, Temperature float not null, Revolutions float not null ) ``` ## Stream Analytics Setup ### Event Hubs Input - Azure Hubs [hub-ecloud1-location1] - Hub [hub-location1] - Consumer group [hub_location1_cg] ### SQL Output Azure SQL Tables: - ACEvents [hubdb-ACEvents] - GeneratorEvents [hubdb-GeneratorEvents] - MotorEvents [hubdb-MotorEvents] ### Event Hub Output - Anomalies Hub [anomalies-hub] ### Stream Analytics Jobs > Note: One Stream Analytics instance can process many jobs. 
The Stream Analytics query language can perform time-based operations, aggregations, etc. ```sql with [allData] as ( select * FROM [hub-ecloud1-location1] ), anomalies AS ( SELECT a.ts,a.deviceId,'ACAnomality' as eventType, 'airFlow' as property, a.airflow as value FROM allData a where a.type='ACEvent' and a.airFlow=0 UNION SELECT a.ts,a.deviceId,'GeneratorAnomality' as eventType, 'voltage' as property, a.voltage as value FROM allData a where a.type='GeneratorEvent' and a.voltage=0 UNION SELECT a.ts,a.deviceId,'GeneratorAnomality' as eventType, 'gasPercentage' as property, a.gasPercentage as value FROM allData a where a.type='GeneratorEvent' and a.gasPercentage<20 UNION SELECT a.ts,a.deviceId,'MotorAnomality' as eventType, 'revolutions' as property, a.revolutions as value FROM allData a where a.type='MotorEvent' and a.revolutions=0 ) select a.ts,a.deviceId,a.eventType,a.property,a.value into [anomalies-hub] from anomalies a; select a.deviceId,a.ts,a.coolantTemperature,a.airFlow,a.airTemperature into [hubdb-ACEvents] from allData a where type='ACEvent'; select a.deviceId,a.ts,a.hertz,a.amps,a.voltage,a.gasPercentage into [hubdb-GeneratorEvents] from allData a where type='GeneratorEvent'; select a.deviceId,a.ts,a.temperature,a.revolutions into [hubdb-MotorEvents] from allData a where type='MotorEvent'; ``` ## Services ### Common structures For the purposes of this demo, the code has been implemented in Go and the different executables share these structures in common: ```go type acEvent struct { Ts time.Time `json:"ts"` Type string `json:"type"` DeviceID string `json:"deviceId"` AirFlow float64 `json:"airflow"` AirTemperature float64 `json:"airTemperature"` CoolantTemperature float64 `json:"coolantTemperature"` } type generatorEvent struct { Ts time.Time `json:"ts"` Type string `json:"type"` DeviceID string `json:"deviceId"` Hertz float64 `json:"hertz"` Amps float64 `json:"amps"` Voltage float64 `json:"voltage"` GasPercentage float64 `json:"gasPercentage"` } type 
motorEvent struct { Ts time.Time `json:"ts"` Type string `json:"type"` DeviceID string `json:"deviceId"` Temperature float64 `json:"temperature"` Revolutions float64 `json:"revolutions"` } type EventsRequest struct { Count int `json:"count"` Delay int `json:"delay"` Batch bool `json:"batch"` } type EventsResponse struct { Message string `json:"message"` Count int `json:"count"` Delay int `json:"delay"` Batch bool `json:"batch"` } type AnomalyEvent struct { Ts time.Time `json:"ts"` DeviceID string `json:"deviceId"` EventType string `json:"eventType"` Property string `json:"Property"` Value float64 `json:"Value"` } ``` ### Sender - Emitting Events to Event Hubs The sender application is an API server that can receive a message to emit events via a POST request. ```go func GetRandomEvent() string { anomaly := getRandom(1, 6) eventType := getRandom(1, 4) airTemperature := float64(getRandom(600, 800)) / 10.0 airFlow := float64(getRandom(30, 40)) / 10.0 coolantTemperature := float64(getRandom(200, 400)) / 10.0 gasPercentage := float64(getRandom(1, 1000)) / 10.0 voltage := float64(getRandom(2300, 2450)) / 10.0 motorTemp := float64(getRandom(1800, 2000)) / 10.0 motorRevolutions := float64(getRandom(2000, 5000)) / 10.0 hertz := float64(getRandom(580, 650)) / 10.0 amps := float64(getRandom(150, 250)) / 10.0 if anomaly == RaiseAnomaly { voltage = 0 motorTemp = 0 motorRevolutions = 0 gasPercentage = 10 airFlow = 0 airTemperature = 90 } var event Event if eventType == AC_VENT { event = NewACEvent(airFlow, airTemperature, coolantTemperature) jsonBytes, _ := json.Marshal(event) return string(jsonBytes) } else if eventType == GENERATOR_EVENT { event = NewGeneratorEvent(hertz, amps, voltage, gasPercentage) jsonBytes, _ := json.Marshal(event) return string(jsonBytes) } else { event = NewMotorEvent(motorTemp, motorRevolutions) jsonBytes, _ := json.Marshal(event) return string(jsonBytes) } } func processInBatch(eventRequest common.EventsRequest) error { ctx, cancel := 
context.WithTimeout(context.Background(), time.Duration(30)*time.Second) defer cancel() hub, err := eventhub.NewHubFromConnectionString(connectionString) if err != nil { logger.Error(err) return err } if verbose { logger.Debug(fmt.Sprintf("Sending %d messages in batch", eventRequest.Count)) } var events []*eventhub.Event for i := 1; i <= eventRequest.Count; i++ { evt := common.GetRandomEvent() events = append(events, eventhub.NewEventFromString(evt)) time.Sleep(1 * time.Millisecond) } hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...)) if verbose { logger.Debug(fmt.Sprintf("Sent: %d messages in batch", eventRequest.Count)) } wg.Done() return nil } ``` The expected POST message can be: ```{ "count": 50, "delay": 10, "batch": false}``` or ```{ "count": 100, "delay": 1, "batch": true}``` Where: - Count: The number of messages to send - Delay: Delay between messages (ignored when in batch mode) - Batch: Send messages in batch or not ### Monitor The monitor application polls the SQL tables and reports the number of rows in the tables. ```go func getRowTotals() { for { acRows, _ := getRowCount("ACEvents") genRows, _ := getRowCount("GeneratorEvents") motorRows, _ := getRowCount("MotorEvents") fmt.Println(styles.Bold(colors.Green("AC Events:")), acRows) fmt.Println(styles.Bold(colors.Green("Generator Events:")), genRows) fmt.Println(styles.Bold(colors.Green("Motor Events:")), motorRows) time.Sleep(250 * time.Millisecond) ansi.HideCursor() fmt.Print(ansi.CursorUp(3)) ansi.ShowCursor() } wg.Done() } ``` ### Receiver The receiver application subscribes to the Anomaly hub and processes the messages as they are raised via a handler. The application is able to keep in state the last message received and avoids re-processing. 
```go func main() { // Azure Storage account information storageAccountName := common.MustPassEvn("STORAGE_NAME") storageAccountKey := common.MustPassEvn("STORAGE_KEY") storageContainerName := common.MustPassEvn("STORAGE_CONTAINER") // Azure Event Hub connection string eventHubConnStr := common.MustPassEvn("EVENT_HUBS_STRING") parsed, err := conn.ParsedConnectionFromStr(eventHubConnStr) if err != nil { // handle error logger.Error(err) os.Exit(1) } // create a new Azure Storage Leaser / Checkpointer cred, err := azblob.NewSharedKeyCredential(storageAccountName, storageAccountKey) if err != nil { logger.Error(err) os.Exit(1) } leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, storageAccountName, storageContainerName, azure.PublicCloud) if err != nil { logger.Error(err) os.Exit(1) } // SAS token provider for Azure Event Hubs provider, err := sas.NewTokenProvider(sas.TokenProviderWithKey(parsed.KeyName, parsed.Key)) if err != nil { logger.Error(err) os.Exit(1) } ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() // create a new EPH processor processor, err := eph.New(ctx, parsed.Namespace, parsed.HubName, provider, leaserCheckpointer, leaserCheckpointer) if err != nil { fmt.Println(err) return } // register a message handler -- many can be registered handlerID, err := processor.RegisterHandler(ctx, func(c context.Context, e *eventhub.Event) error { var anomaly common.AnomalyEvent err = json.Unmarshal(e.Data, &anomaly) if err == nil { fmt.Println(colors.Green("Device ID:"), anomaly.DeviceID) fmt.Println(colors.Green("Type:"), anomaly.EventType) fmt.Println(colors.Yellow("Property:"), anomaly.Property) strValue := strconv.FormatFloat(anomaly.Value, 'f', 5, 64) fmt.Println(colors.Yellow("Value:"), colors.Red(strValue)) } return nil }) if err != nil { logger.Error(err) os.Exit(1) } fmt.Printf("handler id: %q is running\n", handlerID) // start handling messages from all of the partitions balancing across multiple 
consumers err = processor.StartNonBlocking(ctx) if err != nil { logger.Error(err) os.Exit(1) } // Wait for a signal to quit: logger.Info("Listening for events. Press CTRL+C to exit.") signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, os.Kill) <-signalChan logger.Debug("Terminating program & closing EPH Processor") err = processor.Close(context.Background()) if err != nil { logger.Error(err) os.Exit(1) } } ```

近期下载者

相关文件


收藏者