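Introduction
When building a project we often need to run background tasks. These generally fall into two kinds: tasks that run continuously, such as a message queue consumer, and scheduled tasks.
With .NET Framework on Windows, we would typically use a Windows service for these cases.
Is there an equivalent for .NET Core on Linux? I know of two approaches:
- Web Host: create an ASP.NET Core web project (such as MVC or Web API) and handle background tasks with IHostedService or BackgroundService. With this approach the web application and the background tasks run together in the same process.
- Generic Host: new in .NET Core 2.1, the generic host separates the HTTP pipeline from the Web Host API so that more kinds of host become possible, such as background tasks and other non-HTTP workloads. It still gives convenient access to cross-cutting features such as configuration, dependency injection (DI), and logging.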
Basic usage
- Create a console application and add the hosting NuGet packages.
    Install-Package Microsoft.Extensions.Hosting
    Install-Package Microsoft.Extensions.Configuration.EnvironmentVariables
    Install-Package Microsoft.Extensions.Configuration.CommandLine
    Install-Package Microsoft.Extensions.Configuration.Json
    Install-Package Microsoft.Extensions.Logging.Console
    Install-Package Microsoft.Extensions.Logging.Debug
- Create a simple timer-based hosted service that derives from the abstract class BackgroundService.
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class TimedHostedService : BackgroundService
    {
        private readonly ILogger _logger;
        private Timer _timer;

        public TimedHostedService(ILogger<TimedHostedService> logger)
        {
            this._logger = logger;
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Fire immediately, then every 5 seconds.
            _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
            return Task.CompletedTask;
        }

        private void DoWork(object state)
        {
            // "HH" prints a 24-hour clock; "hh" would print a 12-hour clock.
            _logger.LogInformation(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] Timed Background Service is working.", DateTime.Now));
        }

        public override Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Timed Background Service is stopping.");
            // Stop the timer from firing again.
            _timer?.Change(Timeout.Infinite, 0);
            return Task.CompletedTask;
        }

        public override void Dispose()
        {
            _timer?.Dispose();
            base.Dispose();
        }
    }
- Add the host setup code to the Main method.
    // An async Main method requires C# 7.1 or later.
    static async Task Main(string[] args)
    {
        var builder = new HostBuilder()
            // Host configuration
            .ConfigureHostConfiguration(configHost =>
            {
                // Set the base path
                //configHost.SetBasePath(Directory.GetCurrentDirectory());
                // Read the host settings JSON file, similar to appsettings
                //configHost.AddJsonFile("hostsettings.json", true, true);
                // Read environment variables. ASP.NET Core uses the ASPNETCORE_ prefix by default; the same prefix is used here for consistency
                configHost.AddEnvironmentVariables(prefix: "ASPNETCORE_");
                // Accept command-line arguments passed when the host starts
                if (args != null)
                {
                    configHost.AddCommandLine(args);
                }
            })
            // App configuration
            .ConfigureAppConfiguration((hostContext, configApp) =>
            {
                // Read the application settings JSON file
                configApp.AddJsonFile("appsettings.json", optional: true);
                // Read the environment-specific application settings JSON file
                configApp.AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true);
                // Read environment variables
                configApp.AddEnvironmentVariables();
                // Accept command-line arguments passed when the host starts
                if (args != null)
                {
                    configApp.AddCommandLine(args);
                }
            })
            // Configure services and dependency injection registrations
            .ConfigureServices((hostContext, services) =>
            {
                // Add TimedHostedService
                services.AddHostedService<TimedHostedService>();
            })
            // Logging configuration
            .ConfigureLogging((hostContext, configLogging) =>
            {
                // Write logs to the console
                configLogging.AddConsole();
                // Write logs to the debug output
                if (hostContext.HostingEnvironment.EnvironmentName == EnvironmentName.Development)
                {
                    configLogging.AddDebug();
                }
            });

        // Use the console lifetime; press Ctrl + C to exit
        await builder.RunConsoleAsync();
    }
Run and test.
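As a point of comparison with the timer-based service above, the same periodic work can be written as a loop inside ExecuteAsync. This is only a sketch: the class name LoopingHostedService is hypothetical and not part of the original project. Because each iteration finishes before the next delay starts, iterations cannot overlap, and the stopping token ends the loop when the host shuts down.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical alternative to TimedHostedService: a Task.Delay loop instead of a Timer.
public class LoopingHostedService : BackgroundService
{
    private readonly ILogger<LoopingHostedService> _logger;

    public LoopingHostedService(ILogger<LoopingHostedService> logger)
    {
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Looping Background Service is working.");
            try
            {
                // Wait 5 seconds, or return early when the host is stopping.
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // The host requested shutdown during the delay.
                break;
            }
        }

        _logger.LogInformation("Looping Background Service is stopping.");
    }
}
```

It would be registered the same way, with services.AddHostedService<LoopingHostedService>().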
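Advanced usage
Integrating the third-party logger NLog
Many third-party logging libraries support .NET Core. The following uses NLog as an example and integrates it into the generic host.
- Add the NLog.Extensions.Hosting NuGet package.
    Install-Package NLog.Extensions.Hosting
- Add a configuration file.
  Create a file named nlog.config (all lowercase is recommended, since file names are case-sensitive on Linux), then right-click it, open its properties, and set "Copy to Output Directory" to "Copy always". The file contents are as follows: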
    <?xml version="1.0" encoding="utf-8" ?>
    <nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          autoReload="true"
          internalLogLevel="info"
          internalLogFile="logs/internal-nlog.txt">

      <!-- the targets to write to -->
      <targets>
        <!-- write logs to file -->
        <target xsi:type="File" name="fileTarget" fileName="logs/${shortdate}.log"
                layout="${date}|${level:uppercase=true}|${message} ${exception:format=tostring}|${logger}|${all-event-properties}" />
        <target xsi:type="Console" name="consoleTarget"
                layout="${date}|${level:uppercase=true}|${message} ${exception:format=tostring}|${logger}|${all-event-properties}" />
      </targets>

      <!-- rules to map from logger name to target -->
      <rules>
        <logger name="*" minlevel="Trace" writeTo="fileTarget,consoleTarget" />
      </rules>
    </nlog>

  The paths use forward slashes so that they resolve to a logs directory on Linux as well as on Windows. Setting "Copy always" corresponds to the following entry in the project file (.csproj):

    <ItemGroup>
      <None Update="nlog.config">
        <CopyToOutputDirectory>Always</CopyToOutputDirectory>
      </None>
    </ItemGroup>
- Modify the Main method:
    static async Task Main(string[] args)
    {
        var logger = LogManager.GetCurrentClassLogger();
        try
        {
            var builder = new HostBuilder()
                //Host config
                .ConfigureHostConfiguration(configHost =>
                {
                })
                //App config
                .ConfigureAppConfiguration((hostContext, configApp) =>
                {
                })
                //Services
                .ConfigureServices((hostContext, services) =>
                {
                })
                //Log
                //.ConfigureLogging((hostContext, configLogging) =>
                //{
                //    configLogging.AddConsole();
                //    if (hostContext.HostingEnvironment.EnvironmentName == EnvironmentName.Development)
                //    {
                //        configLogging.AddDebug();
                //    }
                //})
                .UseNLog();

            await builder.RunConsoleAsync();
        }
        catch (Exception ex)
        {
            logger.Fatal(ex, "Stopped program because of exception");
            throw;
        }
        finally
        {
            // Ensure to flush and stop internal timers/threads before application-exit (Avoid segmentation fault on Linux)
            LogManager.Shutdown();
        }
    }
Run and test.
Integrating EF Core
Integrating EF Core is fairly straightforward; usage is essentially the same as in an MVC or Web API project.
- Add the NuGet package.
    // SQL Server
    Install-Package Microsoft.EntityFrameworkCore.SqlServer
- Add the entity.
    public class Product
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Count { get; set; }
    }

    public class ProductConfiguration : IEntityTypeConfiguration<Product>
    {
        public void Configure(EntityTypeBuilder<Product> builder)
        {
            builder.Property(t => t.Name).IsRequired().HasMaxLength(100);

            // Seed one row so the hosted service has something to update.
            builder.HasData(new Product { Id = 1, Name = "Test_Product_1", Count = 0 });
        }
    }
- Add the database context.
    public class HostDemoContext : DbContext
    {
        public HostDemoContext(DbContextOptions<HostDemoContext> options) : base(options)
        {
        }

        public DbSet<Product> Products { get; set; }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            modelBuilder.ApplyConfiguration(new ProductConfiguration());
        }
    }
- Add the database connection string to appsettings.json and modify the Main method.
    {
      "sqlserverconnection": "Server=.\\SQLEXPRESS;Database=GenericHostDemo;Trusted_Connection=True;"
    }

    //Services
    .ConfigureServices((hostContext, services) =>
    {
        var connectionString = hostContext.Configuration["sqlserverconnection"];
        // Registered as a singleton here so that the hosted service, which lives as long as the host, can take it as a constructor dependency.
        services.AddDbContext<HostDemoContext>(o => o.UseSqlServer(connectionString), ServiceLifetime.Singleton);

        services.AddHostedService<TimedHostedService>();
    })
- Use the context in the hosted service.
    private readonly ILogger _logger;
    private readonly HostDemoContext _context;

    // The logger is still injected, because DoWork below writes to it.
    public TimedHostedService(ILogger<TimedHostedService> logger, HostDemoContext context)
    {
        this._logger = logger;
        this._context = context;
    }

    private void DoWork(object state)
    {
        int id = 1;
        var product = _context.Products.Find(id);
        product.Count++;
        _context.SaveChanges();

        _logger.LogInformation($"Processed {product.Name} at {DateTime.Now:yyyy-MM-dd HH:mm:ss}, current count is {product.Count}.");
        //_logger.LogInformation(string.Format("[{0:yyyy-MM-dd hh:mm:ss}] Timed Background Service is working.", DateTime.Now));
    }
- Migrations
  - Add the reference through NuGet.
        Install-Package Microsoft.EntityFrameworkCore.Tools
  - Create a DesignTimeDbContextFactory class.
    Unlike in a web project, running the Add-Migration command here may fail because the tooling cannot obtain the connection string:
    Unable to create an object of type 'HostDemoContext'. Add an implementation of 'IDesignTimeDbContextFactory<HostDemoContext>' to the project, or see https://go.microsoft.com/fwlink/?linkid=851728 for additional patterns supported at design time.
    To fix this, create a DesignTimeDbContextFactory file in the same directory as the DbContext, implement the interface method CreateDbContext, and configure the connection string there:
        using Microsoft.EntityFrameworkCore;
        using Microsoft.EntityFrameworkCore.Design;

        public class DesignTimeDbContextFactory : IDesignTimeDbContextFactory<HostDemoContext>
        {
            public HostDemoContext CreateDbContext(string[] args)
            {
                var builder = new DbContextOptionsBuilder<HostDemoContext>();
                builder.UseSqlServer("Server=.\\SQLEXPRESS;Database=GenericHostDemo;Trusted_Connection=True;");
                return new HostDemoContext(builder.Options);
            }
        }
  - Generate the migration. Open the Package Manager Console and run:
        Add-Migration InitialCreate
  - Apply the migration to the database by running:
        Update-Database
Run and test.
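The walkthrough above registers the context as a singleton. Two general facts are worth keeping in mind with that setup: a DbContext instance is not thread-safe, and System.Threading.Timer callbacks can overlap when one run takes longer than the timer period. A commonly used alternative, sketched below, is to keep the default scoped registration and create a scope for each unit of work. The class name ScopedTimedHostedService is hypothetical, and the sketch assumes the HostDemoContext and Product types defined earlier in this article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical variant; relies on HostDemoContext and Product from this article.
public class ScopedTimedHostedService : BackgroundService
{
    private readonly ILogger<ScopedTimedHostedService> _logger;
    private readonly IServiceScopeFactory _scopeFactory;

    public ScopedTimedHostedService(ILogger<ScopedTimedHostedService> logger, IServiceScopeFactory scopeFactory)
    {
        _logger = logger;
        _scopeFactory = scopeFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // A fresh scope (and therefore a fresh DbContext) per iteration.
            using (var scope = _scopeFactory.CreateScope())
            {
                var context = scope.ServiceProvider.GetRequiredService<HostDemoContext>();
                var product = await context.Products.FindAsync(new object[] { 1 }, stoppingToken);
                if (product != null)
                {
                    product.Count++;
                    await context.SaveChangesAsync(stoppingToken);
                    _logger.LogInformation("Processed {Name}, current count is {Count}.", product.Name, product.Count);
                }
            }

            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}
```

With this variant the registration would drop the lifetime argument, for example services.AddDbContext<HostDemoContext>(o => o.UseSqlServer(connectionString)), which registers the context as scoped by default.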
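Consuming MQ messages
Next, we implement a background task that listens for and consumes RabbitMQ messages.
This uses EasyNetQ, an API library that wraps RabbitMQ.Client. True to its name, it is easy to use: it wraps many of the raw RabbitMQ.Client operations, which saves developers a good deal of work. For details see https://github.com/EasyNetQ/EasyNetQ
- Add the EasyNetQ NuGet packages.
    Install-Package EasyNetQ
    Install-Package EasyNetQ.DI.Microsoft
- Create the background service RabbitMQDemoHostedService.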
    public class RabbitMQDemoHostedService : BackgroundService
    {
        private readonly ILogger _logger;
        private readonly IBus _bus;

        public RabbitMQDemoHostedService(ILogger<RabbitMQDemoHostedService> logger, IBus bus)
        {
            this._logger = logger;
            this._bus = bus;
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Subscribe once; EasyNetQ invokes the handler for each incoming message.
            _bus.Subscribe<DemoMessage>("demo_subscription_1", HandleDemoMessage);
            return Task.CompletedTask;
        }

        private void HandleDemoMessage(DemoMessage demoMessage)
        {
            _logger.LogInformation($"Got Message : {demoMessage.Id} {demoMessage.Text} {demoMessage.CreatedTime}");
        }
    }
- Configure the related services in Program.
    //Services
    .ConfigureServices((hostContext, services) =>
    {
        //Rabbit MQ
        string rabbitMqConnection = hostContext.Configuration["rabbitmqconnection"];
        services.RegisterEasyNetQ(rabbitMqConnection);

        services.AddHostedService<RabbitMQDemoHostedService>();
    })
Start an MQ sender program to simulate sending messages, then test.
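The article does not show DemoMessage or the sender program, so the following is only a sketch of what they might look like. The property names come from the handler above; their types, the connection string "host=localhost", and the DemoPublisher class are assumptions. The bus.Publish call matches the older EasyNetQ API that exposes Subscribe directly on IBus, as the service above does; newer EasyNetQ versions move these calls under bus.PubSub.

```csharp
using System;
using EasyNetQ;

// Hypothetical message contract: only the property names appear in the article,
// the types are assumptions.
public class DemoMessage
{
    public int Id { get; set; }
    public string Text { get; set; }
    public DateTime CreatedTime { get; set; }
}

// Hypothetical sender, intended to live in its own console project.
public static class DemoPublisher
{
    public static void Main()
    {
        // "host=localhost" is a placeholder connection string.
        using (var bus = RabbitHutch.CreateBus("host=localhost"))
        {
            bus.Publish(new DemoMessage
            {
                Id = 1,
                Text = "Hello from the publisher",
                CreatedTime = DateTime.Now
            });
        }
    }
}
```

Since EasyNetQ routes messages by their .NET type, the sender and the consumer would normally reference the same DemoMessage class, for example through a shared class library.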
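Integrating Quartz
Quartz is an open-source job scheduling framework; for details on Quartz.NET see http://www.quartz-scheduler.net/ . Integrating Quartz provides more powerful scheduling than a plain timer.
- Add the Quartz NuGet packages.
    Install-Package Quartz
    Install-Package Quartz.Plugins
- Create the Quartz configuration class QuartzOption.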
    // More settings refer to: https://github.com/quartznet/quartznet/blob/master/src/Quartz/Impl/StdSchedulerFactory.cs
    public class QuartzOption
    {
        public QuartzOption(IConfiguration config)
        {
            if (config == null)
            {
                throw new ArgumentNullException(nameof(config));
            }

            // Bind the "quartz" section of appsettings.json onto this object.
            var section = config.GetSection("quartz");
            section.Bind(this);
        }

        public Scheduler Scheduler { get; set; }

        public ThreadPool ThreadPool { get; set; }

        public Plugin Plugin { get; set; }

        // Convert the bound options into the key/value form StdSchedulerFactory expects.
        public NameValueCollection ToProperties()
        {
            var properties = new NameValueCollection
            {
                ["quartz.scheduler.instanceName"] = Scheduler?.InstanceName,
                ["quartz.threadPool.type"] = ThreadPool?.Type,
                ["quartz.threadPool.threadPriority"] = ThreadPool?.ThreadPriority,
                ["quartz.threadPool.threadCount"] = ThreadPool?.ThreadCount.ToString(),
                ["quartz.plugin.jobInitializer.type"] = Plugin?.JobInitializer?.Type,
                ["quartz.plugin.jobInitializer.fileNames"] = Plugin?.JobInitializer?.FileNames
            };

            return properties;
        }
    }

    public class Scheduler
    {
        public string InstanceName { get; set; }
    }

    public class ThreadPool
    {
        public string Type { get; set; }

        public string ThreadPriority { get; set; }

        public int ThreadCount { get; set; }
    }

    public class Plugin
    {
        public JobInitializer JobInitializer { get; set; }
    }

    public class JobInitializer
    {
        public string Type { get; set; }
        public string FileNames { get; set; }
    }
- Implement a custom JobFactory.
    public class JobFactory : IJobFactory
    {
        private readonly IServiceProvider _serviceProvider;

        public JobFactory(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
        {
            // Resolve the job from the DI container so its constructor dependencies are injected.
            var job = _serviceProvider.GetService(bundle.JobDetail.JobType) as IJob;
            return job;
        }

        public void ReturnJob(IJob job)
        {
            var disposable = job as IDisposable;
            disposable?.Dispose();
        }
    }
- Write the Quartz hosted service.
    public class QuartzService : BackgroundService
    {
        private readonly ILogger _logger;
        private readonly IScheduler _scheduler;

        public QuartzService(ILogger<QuartzService> logger, IScheduler scheduler)
        {
            _logger = logger;
            _scheduler = scheduler;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("Start quartz scheduler...");
            await _scheduler.Start(stoppingToken);
        }

        public override async Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Stop quartz scheduler...");
            await _scheduler.Shutdown(cancellationToken);
            await base.StopAsync(cancellationToken);
        }
    }
- Write a test job.
    public class TestJob : IJob
    {
        private readonly ILogger _logger;

        public TestJob(ILogger<TestJob> logger)
        {
            _logger = logger;
        }

        public Task Execute(IJobExecutionContext context)
        {
            // "HH" prints a 24-hour clock.
            _logger.LogInformation(string.Format("[{0:yyyy-MM-dd HH:mm:ss}] TestJob is working.", DateTime.Now));
            return Task.CompletedTask;
        }
    }
- Prepare the appsettings.json file.
    "quartz": {
      "scheduler": {
        "instanceName": "GenericHostDemo.Quartz"
      },
      "threadPool": {
        "type": "Quartz.Simpl.SimpleThreadPool, Quartz",
        "threadPriority": "Normal",
        "threadCount": 10
      },
      "plugin": {
        "jobInitializer": {
          "type": "Quartz.Plugin.Xml.XMLSchedulingDataProcessorPlugin, Quartz.Plugins",
          "fileNames": "quartz_jobs.xml"
        }
      }
    }
- Prepare the Quartz scheduling file quartz_jobs.xml, then right-click it, open its properties, and set "Copy to Output Directory" to "Copy always".
    <?xml version="1.0" encoding="UTF-8"?>
    <job-scheduling-data xmlns="http://quartznet.sourceforge.net/JobSchedulingData"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         version="2.0">
      <processing-directives>
        <overwrite-existing-data>true</overwrite-existing-data>
      </processing-directives>
      <schedule>
        <job>
          <name>TestJob</name>
          <group>TestGroup</group>
          <description>TestJob Description</description>
          <job-type>GenericHostDemo.Jobs.TestJob, GenericHostDemo</job-type>
          <durable>true</durable>
          <recover>false</recover>
        </job>
        <trigger>
          <simple>
            <name>TestJobTrigger</name>
            <group>TestGroup</group>
            <description>TestJobTrigger Description</description>
            <job-name>TestJob</job-name>
            <job-group>TestGroup</job-group>
            <repeat-count>-1</repeat-count>
            <repeat-interval>5000</repeat-interval>
          </simple>
        </trigger>
      </schedule>
    </job-scheduling-data>
- Register the Quartz hosted service and TestJob in Program.
    //Quartz
    services.AddSingleton<IJobFactory, JobFactory>();
    services.AddSingleton(provider =>
    {
        var option = new QuartzOption(hostContext.Configuration);
        var sf = new StdSchedulerFactory(option.ToProperties());
        var scheduler = sf.GetScheduler().Result;
        // Let Quartz create jobs through the DI-aware factory.
        scheduler.JobFactory = provider.GetService<IJobFactory>();
        return scheduler;
    });
    services.AddHostedService<QuartzService>();

    //Jobs
    services.AddSingleton<TestJob>();
Run and check the results.
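The walkthrough defines the job and trigger in quartz_jobs.xml. As a sketch of an alternative, the same schedule can be built in code with the Quartz fluent builders. The helper class TestJobSchedule below is hypothetical; the job, trigger, and group names mirror the XML file, and the 5-second interval matches its repeat-interval of 5000 ms.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;

// Hypothetical helper: schedules a job in code instead of through quartz_jobs.xml.
public static class TestJobSchedule
{
    public static Task ScheduleAsync<TJob>(IScheduler scheduler, CancellationToken cancellationToken)
        where TJob : IJob
    {
        // Equivalent of the <job> element.
        IJobDetail job = JobBuilder.Create<TJob>()
            .WithIdentity("TestJob", "TestGroup")
            .WithDescription("TestJob Description")
            .StoreDurably()
            .Build();

        // Equivalent of the <trigger><simple> element: repeat forever, every 5 seconds.
        ITrigger trigger = TriggerBuilder.Create()
            .WithIdentity("TestJobTrigger", "TestGroup")
            .WithDescription("TestJobTrigger Description")
            .StartNow()
            .WithSimpleSchedule(s => s.WithInterval(TimeSpan.FromSeconds(5)).RepeatForever())
            .Build();

        return scheduler.ScheduleJob(job, trigger, cancellationToken);
    }
}
```

One possible place to call it is QuartzService.ExecuteAsync, before _scheduler.Start, as await TestJobSchedule.ScheduleAsync<TestJob>(_scheduler, stoppingToken). With this approach the jobInitializer plugin settings would no longer be needed.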
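Deployment
Deploying a generic host essentially means keeping it running in the background.
On Linux there are many ways to run a program in the background, for example systemd (managed with systemctl) or Supervisor.
It can also be deployed in a Docker container.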
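For the systemd option, a unit file along the following lines is one possible starting point. Everything specific in it is an assumption for illustration: the install directory /opt/generichostdemo, the dotnet path /usr/bin/dotnet, and the restart policy. The assembly name GenericHostDemo.dll follows the assembly name used in quartz_jobs.xml, and the ASPNETCORE_ENVIRONMENT variable relies on the ASPNETCORE_ prefix configured in ConfigureHostConfiguration.

```ini
# /etc/systemd/system/generichostdemo.service (hypothetical path and name)
[Unit]
Description=GenericHostDemo background worker

[Service]
# Hypothetical install location of the published output
WorkingDirectory=/opt/generichostdemo
ExecStart=/usr/bin/dotnet /opt/generichostdemo/GenericHostDemo.dll
Restart=always
RestartSec=10
# Read by the host through the ASPNETCORE_ prefix
Environment=ASPNETCORE_ENVIRONMENT=Production

[Install]
WantedBy=multi-user.target
```

The service could then be enabled and started with systemctl enable generichostdemo and systemctl start generichostdemo.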
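BackgroundService and IHostedService
Before .NET Core 2.1, you had to implement the IHostedService interface yourself to create a service. Because most background tasks have similar needs around cancellation token management and other routine operations, .NET Core 2.1 added a convenient abstract base class, BackgroundService. Its source code is as follows: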
    // Copyright (c) .NET Foundation. Licensed under the Apache License, Version 2.0.
    /// <summary>
    /// Base class for implementing a long running <see cref="IHostedService"/>.
    /// </summary>
    public abstract class BackgroundService : IHostedService, IDisposable
    {
        private Task _executingTask;
        private readonly CancellationTokenSource _stoppingCts =
            new CancellationTokenSource();

        protected abstract Task ExecuteAsync(CancellationToken stoppingToken);

        public virtual Task StartAsync(CancellationToken cancellationToken)
        {
            // Store the task we're executing
            _executingTask = ExecuteAsync(_stoppingCts.Token);

            // If the task is completed then return it,
            // this will bubble cancellation and failure to the caller
            if (_executingTask.IsCompleted)
            {
                return _executingTask;
            }

            // Otherwise it's running
            return Task.CompletedTask;
        }

        public virtual async Task StopAsync(CancellationToken cancellationToken)
        {
            // Stop called without start
            if (_executingTask == null)
            {
                return;
            }

            try
            {
                // Signal cancellation to the executing method
                _stoppingCts.Cancel();
            }
            finally
            {
                // Wait until the task completes or the stop token triggers
                await Task.WhenAny(_executingTask, Task.Delay(Timeout.Infinite,
                    cancellationToken));
            }
        }

        public virtual void Dispose()
        {
            _stoppingCts.Cancel();
        }
    }
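For contrast with the base class above, here is a sketch of the same timer-based service written directly against IHostedService, which is roughly what had to be done by hand before BackgroundService existed. The class name ManualTimedHostedService is hypothetical. Note that this version has to own start, stop, and disposal itself, and it has no stopping token unless it creates its own CancellationTokenSource.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

// Hypothetical example: a hosted service implemented without BackgroundService.
public class ManualTimedHostedService : IHostedService, IDisposable
{
    private readonly ILogger<ManualTimedHostedService> _logger;
    private Timer _timer;

    public ManualTimedHostedService(ILogger<ManualTimedHostedService> logger)
    {
        _logger = logger;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Manual hosted service is starting.");
        // Fire immediately, then every 5 seconds.
        _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
        return Task.CompletedTask;
    }

    private void DoWork(object state)
    {
        _logger.LogInformation("Manual hosted service is working.");
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Manual hosted service is stopping.");
        // Stop the timer from firing again.
        _timer?.Change(Timeout.Infinite, 0);
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _timer?.Dispose();
    }
}
```

It is registered like any other hosted service, with services.AddHostedService<ManualTimedHostedService>().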