Hummingbird

Category: Redis
Development tool: C#
File size: 374KB
Downloads: 0
Upload date: 2023-05-14 03:53:08
Uploader: sh-1993
Description: Hummingbird: distributed lock, distributed ID, distributed message queue, configuration center, registry center, service registration and discovery, timeout, retry, circuit breaking, load balancing

File list:
Hummingbird.sln (28617, 2023-06-15)
Hummingbird.sln.DotSettings.user (1367, 2023-06-15)
LICENSE (11357, 2023-06-15)
ci-clear.sh (94, 2023-06-15)
ci-nuget-pack.sh (190, 2023-06-15)
dingding.jpg (166280, 2023-06-15)
example (0, 2023-06-15)
example\Hummingbird.Example (0, 2023-06-15)
example\Hummingbird.Example\Config (0, 2023-06-15)
example\Hummingbird.Example\Config\appsettings-local.json (3313, 2023-06-15)
example\Hummingbird.Example\Config\bootstrap.json (1092, 2023-06-15)
example\Hummingbird.Example\Controllers (0, 2023-06-15)
example\Hummingbird.Example\Controllers\CacheingController.cs (1436, 2023-06-15)
example\Hummingbird.Example\Controllers\ConfigController.cs (724, 2023-06-15)
example\Hummingbird.Example\Controllers\DistribuctedLockController.cs (1581, 2023-06-15)
example\Hummingbird.Example\Controllers\HttpController.cs (1029, 2023-06-15)
example\Hummingbird.Example\Controllers\MQController.cs (3782, 2023-06-15)
example\Hummingbird.Example\Controllers\UniqueIdController.cs (663, 2023-06-15)
example\Hummingbird.Example\Dockerfile (474, 2023-06-15)
example\Hummingbird.Example\Events (0, 2023-06-15)
example\Hummingbird.Example\Events\CanalEvent (0, 2023-06-15)
example\Hummingbird.Example\Events\CanalEvent\CanalEntryEvent.cs (750, 2023-06-15)
example\Hummingbird.Example\Events\CanalEvent\CanalEntryEventHandler.cs (767, 2023-06-15)
example\Hummingbird.Example\Events\MongoSharkEvent (0, 2023-06-15)
example\Hummingbird.Example\Events\MongoSharkEvent\MongodbSharkEvent.cs (1269, 2023-06-15)
example\Hummingbird.Example\Events\MongoSharkEvent\MongodbSharkEventHandler.cs (565, 2023-06-15)
example\Hummingbird.Example\Events\TestEvent (0, 2023-06-15)
example\Hummingbird.Example\Events\TestEvent\TestEvent.cs (228, 2023-06-15)
example\Hummingbird.Example\Events\TestEvent\TestEventHandle.cs (756, 2023-06-15)
example\Hummingbird.Example\Hummingbird.Example.csproj (6742, 2023-06-15)
example\Hummingbird.Example\Hummingbird.Example.csproj.user (406, 2023-06-15)
example\Hummingbird.Example\Interceptors (0, 2023-06-15)
example\Hummingbird.Example\Interceptors\DependencyInjectionExtersion.cs (1329, 2023-06-15)
example\Hummingbird.Example\Interceptors\MetricInterceptor.cs (781, 2023-06-15)
example\Hummingbird.Example\Interceptors\TracerInterceptor.cs (661, 2023-06-15)
example\Hummingbird.Example\Jobs (0, 2023-06-15)
example\Hummingbird.Example\Jobs\HelloJob.cs (805, 2023-06-15)
... ...

# Hummingbird

[toc]

## 1. Project Overview

A scaffold of components commonly needed in project development.

## 2. Feature Summary

* Distributed lock
  * Redis-based
  * Consul-based
* Distributed cache
  * Redis-based
* Distributed ID
  * Snowflake-based
* Distributed tracing (OpenTracing)
  * Jaeger-based
* Message bus
  * Message queue
    * RabbitMQ-based
    * Kafka-based
  * Message reliability guarantee
    * MySql-based
    * SqlServer-based
* Health checks
  * MongoDB health check
  * MySql health check
  * SqlServer health check
  * Redis health check
  * RabbitMQ health check
  * Kafka health check
* Load balancing
  * Random load balancing
  * Round-robin load balancing
* Configuration center
  * Apollo-based configuration center
  * Nacos-based configuration center
* Service registration
  * Consul-based service registration and discovery
  * Nacos-based service registration and discovery
* Service invocation
  * Resilient HTTP client (supports service discovery, load balancing, timeout, retry, circuit breaking)
  * Non-resilient HTTP client (supports service discovery, load balancing)
* Canal data integration
  * Output to console
  * Output to RabbitMQ (to be implemented)
  * Output to Kafka (to be implemented)

## 3. How to Use It in a Project

### 3.1 Distributed Lock

Step 1: Install the NuGet package

```shell
Install-Package Hummingbird.Extensions.DistributedLock -Version 1.15.0
```

Step 2: Configure the connection settings

```c#
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHummingbird(hb =>
        {
            hb.AddDistributedLock(option =>
            {
                option.WithDb(0);
                option.WithKeyPrefix("");
                option.WithPassword("123456");
                option.WithServerList("127.0.0.1:6379");
                option.WithSsl(false);
            });
        });
    }
}
```

Step 3: Test the distributed lock

```c#
using Microsoft.AspNetCore.Mvc;
using System;
using System.Threading.Tasks;

[Route("api/[controller]")]
public class DistributedLockController : Controller
{
    private readonly IDistributedLock distributedLock;

    public DistributedLockController(IDistributedLock distributedLock)
    {
        this.distributedLock = distributedLock;
    }

    [HttpGet]
    [Route("Test")]
    public async Task<string> Test()
    {
        var lockName = "name";
        var lockToken = Guid.NewGuid().ToString("N");

        // Try to acquire the lock for up to 30 seconds
        if (!distributedLock.Enter(lockName, lockToken, TimeSpan.FromSeconds(30)))
        {
            return "error";
        }

        try
        {
            // do something
            return "ok";
        }
        finally
        {
            // Release the lock only after it was actually acquired
            distributedLock.Exit(lockName, lockToken);
        }
    }
}
```

### 3.2 Distributed Cache

Step 1: Install the NuGet package

```shell
Install-Package Hummingbird.Extensions.Cacheing -Version 1.15.0
```

Step 2: Configure the cache connection settings

```c#
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHummingbird(hb =>
        {
            hb.AddCacheing(option =>
            {
                option.WithDb(0);
                option.WithKeyPrefix("");
                option.WithPassword("123456");
                option.WithReadServerList("192.168.109.44:6379");
                option.WithWriteServerList("192.168.109.44:6379");
                option.WithSsl(false);
            });
        });
    }
}
```

Step 3: Test the Redis cache

```c#
using Hummingbird.Extensions.Cacheing;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;

[Route("api/[controller]")]
public class CacheingController : Controller
{
    private readonly ICacheManager cacheManager;

    public CacheingController(ICacheManager cacheManager)
    {
        this.cacheManager = cacheManager;
    }

    [HttpGet]
    [Route("Test")]
    public async Task<string> Test()
    {
        var cacheKey = "cacheKey";
        var cacheValue = cacheManager.StringGet<string>(cacheKey);

        // Cache miss: store a value so the next read hits the cache
        if (cacheValue == null)
        {
            cacheValue = "value";
            cacheManager.StringSet(cacheKey, cacheValue);
        }

        return cacheValue;
    }
}
```

### 3.3 Distributed ID

Step 1: Install the NuGet packages

```shell
Install-Package Hummingbird.Extensions.UidGenerator -Version 1.15.5
Install-Package Hummingbird.Extensions.UidGenerator.ConsulWorkIdStrategy -Version 1.15.7
```

Step 2: Configure the Snowflake algorithm to generate unique IDs

```c#
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHummingbird(hb =>
        {
            hb.AddSnowflakeUniqueIdGenerator(workIdBuilder =>
            {
                workIdBuilder.CenterId = 0; // set the CenterId
                workIdBuilder.AddConsulWorkIdCreateStrategy("Example"); // use Consul to allocate the WorkId
            });
        });
    }
}
```

Step 3: Test ID generation

```c#
using Hummingbird.Extensions.UidGenerator;
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;

[Route("api/[controller]")]
public class UniqueIdController : Controller
{
    private readonly IUniqueIdGenerator uniqueIdGenerator;

    public UniqueIdController(IUniqueIdGenerator uniqueIdGenerator)
    {
        this.uniqueIdGenerator = uniqueIdGenerator;
    }

    [HttpGet]
    [Route("Test")]
    public async Task<long> Test()
    {
        return uniqueIdGenerator.NewId();
    }
}
```

### 3.4 Distributed Tracing

Step 1: Install the NuGet packages

```shell
Install-Package Hummingbird.Extensions.OpenTracing -Version 1.15.0
Install-Package Hummingbird.Extensions.OpenTracking.Jaeger -Version 1.15.0
```

Step 2: Create the tracing.json configuration

```json
{
  "Tracing": {
    "Open": false,
    "SerivceName": "SERVICE_EXAMPLE",
    "FlushIntervalSeconds": 15,
    "SamplerType": "const",
    "LogSpans": true,
    "AgentPort": "5775", // agent port
    "AgentHost": "dev.jaeger-agent.service.consul", // agent address
    "EndPoint": "http://dev.jaeger-collector.service.consul:14268/api/traces"
  }
}
```

Step 3: Add tracing.json as a configuration source

```c#
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

public class Program
{
    public static void Main(string[] args)
    {
        BuildWebHost(args).Run();
    }

    public static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>()
            .ConfigureAppConfiguration((builderContext, config) =>
            {
                config.SetBasePath(Directory.GetCurrentDirectory());
                config.AddJsonFile("tracing.json");
                config.AddEnvironmentVariables();
            })
            .ConfigureLogging((hostingContext, logging) =>
            {
                logging.ClearProviders();
            })
            .Build();
}
```

Step 4: Configure OpenTracing with the Jaeger implementation

```c#
public class Startup
{
    public IConfiguration Configuration { get; }

    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHummingbird(hb =>
        {
            hb.AddOpenTracing(builder =>
            {
                // Bind the "Tracing" section loaded from tracing.json
                builder.AddJaeger(Configuration.GetSection("Tracing"));
            });
        });
    }
}
```

Step 5: Test manual instrumentation

```c#
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;

[Route("api/[controller]")]
public class OpenTracingController : Controller
{
    [HttpGet]
    [Route("Test")]
    public async Task Test()
    {
        using (Hummingbird.Extensions.Tracing.Tracer tracer = new Hummingbird.Extensions.Tracing.Tracer("Test"))
        {
            tracer.SetTag("tag1", "value1");
            tracer.SetError();
            tracer.Log("key1", "value1");
        }
    }
}
```

### 3.5 Message Bus

Step 1: Install the NuGet packages

```shell
Install-Package Hummingbird.Extensions.EventBus -Version 1.15.0
Install-Package Hummingbird.Extensions.EventBus.RabbitMQ -Version 1.15.3
Install-Package Hummingbird.Extensions.EventBus.MySqlLogging -Version 1.15.3
```

Step 2: Create the consumer side (message handlers)

```c#
using Hummingbird.Extensions.EventBus.Abstractions;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class TestEvent
{
    public string EventType { get; set; }
}

public class TestEventHandler1 : IEventHandler<TestEvent>
{
    public Task<bool> Handle(TestEvent @event, Dictionary<string, object> headers, CancellationToken cancellationToken)
    {
        // Run business operation 1 and return its result
        return Task.FromResult(true);
    }
}

public class TestEventHandler2 : IEventHandler<TestEvent>
{
    public Task<bool> Handle(TestEvent @event, Dictionary<string, object> headers, CancellationToken cancellationToken)
    {
        // Run business operation 2 and return its result
        return Task.FromResult(true);
    }
}
```

Step 3: Create the producer side (message publisher)

```c#
using Hummingbird.Extensions.EventBus.Abstractions;
using Hummingbird.Extensions.EventBus.Models;
using Microsoft.AspNetCore.Mvc;
using MySql.Data.MySqlClient;
using System.Collections.Generic;
using System.Threading.Tasks;
using Dapper;
using System.Linq;
using System.Threading;

[Route("api/[controller]")]
public class MQPublisherTestController : Controller
{
    private readonly IEventLogger eventLogger;
    private readonly IEventBus eventBus;

    public MQPublisherTestController(
        IEventLogger eventLogger,
        IEventBus eventBus)
    {
        this.eventLogger = eventLogger;
        this.eventBus = eventBus;
    }

    /// <summary>
    /// Publish without a local transaction: messages are written directly to the queue
    /// </summary>
    [HttpGet]
    [Route("Test1")]
    public async Task<string> Test1()
    {
        var events = new List<EventLogEntry>() {
            new EventLogEntry("TestEvent", new Events.TestEvent() {
                EventType = "Test1"
            }),
            new EventLogEntry("TestEvent", new {
                EventType = "Test1"
            }),
        };

        var ret = await eventBus.PublishAsync(events);

        return ret.ToString();
    }

    /// <summary>
    /// Publish with a local transaction: messages are persisted to the database to keep the transaction consistent
    /// </summary>
    [HttpGet]
    [Route("Test2")]
    public async Task<string> Test2()
    {
        var connectionString = "Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180";

        using (var sqlConnection = new MySqlConnection(connectionString))
        {
            await sqlConnection.OpenAsync();

            var sqlTran = await sqlConnection.BeginTransactionAsync();

            var events = new List<EventLogEntry>() {
                new EventLogEntry("TestEvent", new Events.TestEvent() {
                    EventType = "Test2"
                }),
                new EventLogEntry("TestEvent", new {
                    EventType = "Test2"
                }),
            };

            // Save the messages to the business database so that writing the
            // messages and the business operation share one transaction
            await eventLogger.SaveEventAsync(events, sqlTran);

            var ret = await sqlConnection.ExecuteAsync("your sql code", transaction: sqlTran);

            // Commit so the saved messages and the business change become visible together
            sqlTran.Commit();

            return ret.ToString();
        }
    }

    /// <summary>
    /// Publish with a local transaction: messages are persisted to the database, then read back and sent to the queue
    /// </summary>
    [HttpGet]
    [Route("Test3")]
    public async Task Test3()
    {
        // Fetch up to 1000 events that have not been published yet
        var unPublishedEventList = eventLogger.GetUnPublishedEventList(1000);
        // Publish them through the message bus
        var ret = await eventBus.PublishAsync(unPublishedEventList);

        if (ret)
        {
            await eventLogger.MarkEventAsPublishedAsync(unPublishedEventList.Select(a => a.EventId).ToList(), CancellationToken.None);
        }
        else
        {
            await eventLogger.MarkEventAsPublishedFailedAsync(unPublishedEventList.Select(a => a.EventId).ToList(), CancellationToken.None);
        }
    }
}
```

Step 4: Configure RabbitMQ as the message queue and MySql for message persistence

```c#
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHummingbird(hb =>
        {
            hb.AddEventBus((builder) =>
            {
                // Use RabbitMQ as the message queue
                builder.AddRabbitmq(factory =>
                {
                    factory.WithEndPoint("192.168.109.2,192.168.109.3", "5672");
                    factory.WithAuth("guest", "guest");
                    factory.WithExchange("/");
                    factory.WithReceiver(PreFetch: 10, ReceiverMaxConnections: 1, ReveiverMaxDegreeOfParallelism: 1);
                    factory.WithSender(10);
                });

                // Use Kafka as the message queue
                //builder.AddKafka(option =>
                //{
                //    option.WithSenderConfig(new Confluent.Kafka.ProducerConfig()
                //    {
                //        EnableDeliveryReports = true,
                //        BootstrapServers = "192.168.78.29:9092,192.168.78.30:9092,192.168.78.31:9092",
                //        // Debug = "msg"
                //        // Debug = "broker,topic,msg"
                //    });
                //    option.WithReceiverConfig(new Confluent.Kafka.ConsumerConfig()
                //    {
                //        // Debug = "consumer,cgrp,topic,fetch",
                //        GroupId = "test-consumer-group",
                //        BootstrapServers = "192.168.78.29:9092,192.168.78.30:9092,192.168.78.31:9092",
                //    });
                //    option.WithReceiver(1, 1);
                //    option.WithSender(10, 3, 1000 * 5, 50);
                //});

                // Persist messages in a MySql database, for cases where distributed transaction problems exist
                builder.AddMySqlEventLogging(o =>
                {
                    o.WithEndpoint("Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180");
                });

                // Persist messages in a SqlServer database, for cases where distributed transaction problems exist
                //builder.AddSqlServerEventLogging(a =>
                //{
                //    a.WithEndpoint("Data Source=localhost,63341;Initial Catalog=test;User Id=sa;Password=123456");
                //});
            });
        });
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
        var logger = app.ApplicationServices.GetRequiredService<ILogger<Startup>>();

        app.UseHummingbird(humming =>
        {
            humming.UseEventBus(sp =>
            {
                sp.UseSubscriber(eventbus =>
                {
                    eventbus.Register<TestEvent, TestEventHandler1>("TestEventHandler1", "TestEvent");
                    eventbus.Register<TestEvent, TestEventHandler2>("TestEventHandler2", "TestEvent");

                    // Subscribe to messages
                    eventbus.Subscribe((messages) =>
                    {
                        foreach (var message in messages)
                        {
                            logger.LogDebug($"ACK: queue {message.QueueName} route={message.RouteKey} messageId:{message.MessageId}");
                        }
                    }, async (obj) =>
                    {
                        foreach (var message in obj.Messages)
                        {
                            logger.LogError($"NAck: queue {message.QueueName} route={message.RouteKey} messageId:{message.MessageId}");
                        }

                        // The code below runs when message consumption fails
                        if (obj.Exception != null)
                        {
                            logger.LogError(obj.Exception, obj.Exception.Message);
                        }

                        // Retry each message after waiting 5 seconds, at most 3 times
                        var events = obj.Messages.Select(message => message.WaitAndRetry(a => 5, 3)).ToList();

                        // Write the messages to the retry queue
                        var ret = !(await eventBus.PublishAsync(events));

                        return ret;
                    });
                });
            });
        });
    }
}
```

### 3.6 Health Checks

Step 1: Install the NuGet packages

```shell
Install-Package Hummingbird.Extensions.HealthChecks -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.Redis -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.Rabbitmq -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.MySql -Version 1.15.0
Install-Package Hummingbird.Extensions.HealthChecks.SqlServer -Version 1.15.0
```

Step 2: Configure the health check endpoint

```c#
using System.IO;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

public class Program
{
    public static void Main(string[] args)
    {
        BuildWebHost(args).Run();
    }

    public static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseStartup<Startup>()
            .UseHealthChecks("/healthcheck")
            .ConfigureAppConfiguration((builderContext, config) =>
            {
                config.SetBasePath(Directory.GetCurrentDirectory());
                config.AddEnvironmentVariables();
            })
            .ConfigureLogging((hostingContext, logging) =>
            {
                logging.ClearProviders();
            })
            .Build();
}
```

Step 3: Configure the health check items

```c#
public class Startup
{
    public IConfiguration Configuration { get; }

    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddHealthChecks(checks =>
        {
            checks.WithDefaultCacheDuration(TimeSpan.FromSeconds(5));
            checks.AddMySqlCheck("mysql", "Server=localhost;Port=63307;Database=test; User=root;Password=123456;pooling=True;minpoolsize=1;maxpoolsize=100;connectiontimeout=180;SslMode=None");
            checks.AddSqlCheck("sqlserver", "Data Source=localhost,63341;Initial Catalog=test;User Id=sa;Password=123456");
            checks.AddRedisCheck("redis", "localhost:637 ... ...
```
