ASP.NET Core microservices architecture: distributed automated computing (Part 2)

I. Introduction

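The previous article described how ZooKeeper performs distributed coordination. This one covers how Quartz uses ZooKeeper for distributed computing. Because the previous article only explained the principles and this one puts them to real use, part of the ZookeeperService code has changed; think of it as integration and optimization work.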

The system architecture is shown below:

[Figure: system architecture diagram]

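The figure above shows two parts: a failover cluster and a sharding feature. Failover clustering is built into Quartz, so I will not go into it; the sharding feature is my own implementation. A few configuration caveats apply when running Quartz as a failover cluster:

[Figure: Quartz failover cluster configuration notes]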

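Also, .NET Core does not support Remoting. This matters a great deal, because it means you have to build your own web service interface. This example does not implement one; it only uses the database to configure and track the Quartz service, which is the essential part. An API would add the ability to start and stop jobs in real time and to monitor the state of the Quartz hosts and the sharding hosts. Keep in mind that these features are missing for now; after working through this article you should be able to add them yourself without much trouble.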

Thanks are due to weizhong1988/Weiz.TaskManager on GitHub, a Quartz management project.

My examples all run on Linux and MySQL, whereas that project targets SQL Server, so I replaced all of the SQL and changed a few other things. The full code will be published later. The UI looks like this:

[Figure: task management UI screenshots]

Now let's look at the implementation.

II. Quartz failover and sharding

First, the project structure:

[Figure: project structure]

Next, the Program entry method:

var host = new HostBuilder()
    .UseEnvironment(EnvironmentName.Development)
    .ConfigureAppConfiguration((hostContext, configApp) =>
    {
        configApp.SetBasePath(Directory.GetCurrentDirectory());
        configApp.AddJsonFile(
            $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
            optional: true);
        configApp.AddEnvironmentVariables("PREFIX_");
        configApp.AddCommandLine(args);
        var rootConfiguration = configApp.Build();
        QuartzOpt = new QuartzOption();
        rootConfiguration.GetSection("Quartz").Bind(QuartzOpt); // bind the Quartz section to the options class
    })
    .ConfigureLogging((hostContext, configBuild) =>
    {
        configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
        configBuild.AddConsole();
        configBuild.AddCustomizationLogger();
    })
    .ConfigureServices((hostContext, service) =>
    {
        service.AddKafka(KafkaBuilder =>
        {
            KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
        });
        service.AddZookeeper(zookeeperBuilder =>
        {
            zookeeperBuilder.AddConfiguration(hostContext.Configuration.GetSection("zookeeperService"));
        });
        // Database service for the task-management tables; it is separate from
        // the data provider that Quartz uses for its cluster tables.
        service.AddDbContext<QuartzDbContext>(option =>
            option.UseMySQL(hostContext.Configuration.GetConnectionString("QuartzDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient);
    })
    .Build();

Host = host;
ILoggerFactory loggerFact = host.Services.GetService<ILoggerFactory>();
LogProvider.SetCurrentLogProvider(new ConsoleLogProvider(loggerFact)); // hand the host's logging to Quartz's logging interface
var ischema = RunProgramRunExample(loggerFact); // builds the jobs from the database
host.WaitForShutdown(); // the .NET Core generic host
ischema.Shutdown(true); // Quartz's own scheduler

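Quartz's architecture does not fit .NET Core's current one. Quartz has no built-in support for .NET Core's pipeline-and-services style, so you cannot pass it any context, whether the host context, the configuration, or the service container. I therefore used static properties: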

     private ILoggerFactory _loggerFact;

        public static IHost Host { get; set; }

        public static String QUARTZ_INSTANCE_ID = "PREFIX_QUARTZ_INSTANCE_ID";

        public static QuartzOption QuartzOpt { get; set; }

Inside Quartz code they are used like this:

      ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
var _logger=loggerFact.CreateLogger<ZookeeperService>();

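As for the lack of Remoting mentioned earlier, the .NET Core answer is Kestrel. The .NET Core team has said that WebHost will be merged into the generic host later on. .NET Core is evolving quickly, and internal refactoring causes large code changes. For example, when I last extended the logging module, almost nothing from the previous version's design was reusable in the current one. For application developers this makes little difference, since compatibility is expected to be maintained.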

Now Quartz's logging module, ConsoleLogProvider:

using System;
using Microsoft.Extensions.Logging;
using Quartz.Logging;

namespace Walt.Framework.Quartz.Host
{
    public class ConsoleLogProvider : ILogProvider
    {
        private ILoggerFactory _logFactory;

        public ConsoleLogProvider(ILoggerFactory logFactory)
        {
            _logFactory=logFactory;
        }

        public Logger GetLogger(string name)
        {
            return (level, func, exception, parameters) =>
            {
                if (func != null)
                {
                    string logInfo=string.Format(func(), parameters);
                    var log=_logFactory.CreateLogger<ConsoleLogProvider>(); // routes Quartz output to the custom distributed log provider
                    log.LogDebug(logInfo);
                }
                return true;
            };
        }

        public IDisposable OpenNestedContext(string message)
        {
            throw new NotImplementedException();
        }

        public IDisposable OpenMappedContext(string key, string value)
        {
            throw new NotImplementedException();
        }
    }
}
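
The provider above writes every Quartz message at debug level, whatever level Quartz passes in. One possible refinement is to translate the level first. The sketch below shows only that mapping. `QuartzLevel` and `HostLevel` are local stand-ins that mirror the member names of `Quartz.Logging.LogLevel` and `Microsoft.Extensions.Logging.LogLevel` (an assumption made so the sketch compiles without package references), and `LevelMapper` is a hypothetical helper that belongs to neither library.

```csharp
using System;

// Stand-in for Quartz.Logging.LogLevel (member names assumed).
public enum QuartzLevel { Trace, Debug, Info, Warn, Error, Fatal }

// Stand-in for Microsoft.Extensions.Logging.LogLevel (None omitted).
public enum HostLevel { Trace, Debug, Information, Warning, Error, Critical }

// Hypothetical helper: not part of Quartz or the logging framework.
public static class LevelMapper
{
    // Translates a Quartz level into the closest host logging level.
    public static HostLevel ToHostLevel(QuartzLevel level)
    {
        switch (level)
        {
            case QuartzLevel.Trace: return HostLevel.Trace;
            case QuartzLevel.Debug: return HostLevel.Debug;
            case QuartzLevel.Info: return HostLevel.Information;
            case QuartzLevel.Warn: return HostLevel.Warning;
            case QuartzLevel.Error: return HostLevel.Error;
            case QuartzLevel.Fatal: return HostLevel.Critical;
            default: throw new ArgumentOutOfRangeException(nameof(level));
        }
    }
}
```

Inside `GetLogger`, the delegate could then log at the mapped level and pass the exception along, rather than calling `LogDebug` for everything.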

Next, the Quartz options class:

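namespace Walt.Framework.Quartz.Host
{
    public class QuartzOption
    {
        public string InsatanceId{get;set;} // important: every instance in the cluster must have a different id
        public string InstanceName{get;set;} // Quartz instance name, generally used as a display name
        public bool IsClear{get;set;} // whether to clear jobs at startup, since a cluster keeps historical data in the database
        public bool IsSlave{get;set;} // whether this instance is a slave; reserved, currently unused
        public int CustomerRecordCountForTest{get;set;} // when sharding, the number of records each machine is given to process
    }
}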
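
For reference, the matching part of `appsettings.Development.json` might look like the fragment below. This is a sketch: the section names, key names, and connection-string names come from the code in this article (including the `InsatanceId` spelling, which has to match the property name for binding to work), while every value is hypothetical. A second cluster node would use the same file with a different `InsatanceId`.

```json
{
  "Quartz": {
    "InsatanceId": "instance-a",
    "InstanceName": "QuartzCluster",
    "IsClear": false,
    "IsSlave": false,
    "CustomerRecordCountForTest": 100
  },
  "ConnectionStrings": {
    "QuartzDatabase": "<connection string for the task-management database>",
    "QuatrzClustDatabase": "<connection string for the Quartz cluster tables>"
  }
}
```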

Then the main method, RunProgramRunExample:

  private static IScheduler RunProgramRunExample(ILoggerFactory loggerFact)
{
var log = loggerFact.CreateLogger<Program>();
try
{
var config = Host.Services.GetService<IConfiguration>();
// Grab the Scheduler instance from the Factory
NameValueCollection properties = new NameValueCollection
{
["quartz.scheduler.instanceName"] = QuartzOpt.InstanceName,
["quartz.scheduler.instanceId"] = QuartzOpt.InsatanceId,
["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz",
["quartz.threadPool.threadCount"] = "",
["quartz.jobStore.misfireThreshold"] = "",
["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
["quartz.jobStore.useProperties"] = "false",
["quartz.jobStore.dataSource"] = "default",
["quartz.jobStore.tablePrefix"] = "QRTZ_",
["quartz.jobStore.clustered"] = "true",
["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
["quartz.dataSource.default.connectionString"] = config.GetConnectionString("QuatrzClustDatabase"),
["quartz.dataSource.default.provider"] = "MySql",
["quartz.serializer.type"] = "json",
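["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz",
// Everything from the exporter settings down is unsupported on .NET Core. The settings above are the cluster configuration:
// Quartz instances with different instance ids that share one data source run as a cluster automatically.
// If the instances are spread across machines, configure an NTP time server so their clocks stay in sync.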
["quartz.scheduler.exporter.port"] = "",
["quartz.scheduler.exporter.bindName"] = "QuartzScheduler",
["quartz.scheduler.exporter.channelType"] = "tcp",
["quartz.scheduler.exporter.channelName"] = "httpQuartz",
["quartz.scheduler.exporter.rejectRemoteRequests"] = "true"
};
StdSchedulerFactory factory = new StdSchedulerFactory(properties);
IScheduler scheduler = factory.GetScheduler().GetAwaiter().GetResult();
string machine = Environment.MachineName; // name of the current machine
QuartzDbContext db = Host.Services.GetService<QuartzDbContext>();
var listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
&& w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
.ToListAsync().GetAwaiter().GetResult(); // load the jobs assigned to this machine and instance
log.LogDebug("Loaded task records from the database: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
Dictionary<string,Assembly> collAssembly=new Dictionary<string, Assembly>(); // loaded job assemblies, keyed by name
foreach (var item in listQuartzTask) // first pass: load every assembly
{
if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
{
try
{
collAssembly[item.AssemblyName] =
AssemblyHelp.GetAssemblyByteByAssemblyName(
Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
}
catch (Exception ep)
{
log.LogError(ep, "Assembly not found.");
continue;
}
}
} // and start it off
scheduler.Start();
// if (!QuartzOpt.IsSlave)
// {
var task = Task.Run(() =>
{
bool isClear = QuartzOpt.IsClear;
log.LogInformation("Job monitor loop started, polling every 15 seconds");
while (true) // polls the database records so that added or modified jobs are rescheduled and run automatically
{
try
{
if (scheduler != null)
{
log.LogDebug("Checking whether the scheduler has started");
if (scheduler.IsStarted)
{
if (isClear) // clear stored jobs on startup
{
scheduler.Clear().GetAwaiter().GetResult();
isClear = false;
}
log.LogDebug("Scheduler has started");
db = Host.Services.GetService<QuartzDbContext>();
listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
&& w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
.ToListAsync().GetAwaiter().GetResult(); // reload this machine and instance's job records on every pass
log.LogDebug("Loaded task records from the database: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
foreach (var item in listQuartzTask)
{
// load the assembly of any newly added job ahead of time
if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName))
{
try
{
collAssembly[item.AssemblyName] =
AssemblyHelp.GetAssemblyByteByAssemblyName(
Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
}
catch (Exception ep)
{
log.LogError(ep, "Assembly not found.");
continue;
}
}
log.LogDebug("Checking task: {0}", Newtonsoft.Json.JsonConvert.SerializeObject(item));
var jobKey = new JobKey(item.TaskName, item.GroupName);
var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
if (scheduler.CheckExists(jobKey).Result) // the job exists: act on its recorded state
{
var jobDetai = scheduler.GetJobDetail(jobKey);
var trigger = scheduler.GetTrigger(triggerKey);
log.LogDebug("Task already exists in the scheduler. Database state: {0}, scheduler state: {1}, trigger state: {2}"
, ((OperateStatus)item.OperateStatus).ToString(), jobDetai.Status.ToString(), trigger.Status.ToString());
// If the database marks the job as stopped, delete it. With a remote API this could happen in real time
// and this code would be redundant, but it is worth keeping as a safety net for failed remote calls.
if ((OperateStatus)item.OperateStatus == OperateStatus.Stop)
{
log.LogInformation("Deleting job from the scheduler: {0}", jobKey.ToString());
if (!scheduler.DeleteJob(jobKey).GetAwaiter().GetResult())
{
log.LogError("Failed to delete job. name: {0}, group: {1}", jobKey.Name, jobKey.Group);
}
}
else
{
if (jobDetai.IsFaulted) // on failure, update the job's state in the database; with a remote API this also serves as a safety net
{
if (jobDetai.Exception != null)
{
log.LogError(jobDetai.Exception, "job faulted");
}
var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
&& w.TaskName == jobKey.Name
&& w.GroupName == jobKey.Group
&& w.MachineName == machine
&& w.InstanceId == scheduler.SchedulerInstanceId);
item.Status = (int)TaskStatus.Faulted;
item.OperateStatus = (int)OperateStatus.Stop;
db.Update<QuartzTask>(jobItem);
db.SaveChanges();
}
else // not in a running state: interrupt the job
{
if (jobDetai.Status != TaskStatus.Running
&& jobDetai.Status != TaskStatus.RanToCompletion
&& jobDetai.Status != TaskStatus.WaitingForActivation
&& jobDetai.Status != TaskStatus.WaitingForChildrenToComplete
&& jobDetai.Status != TaskStatus.WaitingToRun)
{
var interTask = scheduler.Interrupt(jobKey, new CancellationToken(true))
.GetAwaiter().GetResult();
jobDetai.Start();
}
}
}
var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
if (triggerListener == null)
{
triggerListener = new TriggerUpdateListens("trigger"+item.TaskName);
IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
}
var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
if (jobListener == null)
{
IJobListener jobUpdateListener = new JobUpdateListens("job"+item.TaskName);
IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
}
}
else // the job does not exist yet: add it and let it run
{
log.LogInformation("New job found; checking whether it is marked as stopped.");
if ((OperateStatus)item.OperateStatus != OperateStatus.Stop)
{
log.LogInformation("Adding new job");
var assemblyName = item.AssemblyName;
var className = item.ClassName;
Type jobTaskType = null;
try
{
jobTaskType = AssemblyHelp.GetTypeByAssemblyNameAndClassName(collAssembly[item.AssemblyName], className);
log.LogInformation("Type found: {0}",className);
}
catch (Exception ep)
{
log.LogError(ep, "Type not found.");
}
if (jobTaskType == null)
{
try
{
jobTaskType = AssemblyHelp
.GetTypeByCurrentAssemblyNameAndClassName(className, Assembly.GetExecutingAssembly());
if (jobTaskType == null)
{
log.LogInformation("Type not found");
continue;
}
log.LogInformation("Type found: {0}",className);
}
catch (Exception ep)
{
log.LogError(ep, "Type not found.");
continue;
}
}
IJobDetail job = JobBuilder.Create(jobTaskType)
.WithIdentity(item.TaskName, item.GroupName)
.Build();
ITrigger trigger = TriggerBuilder.Create()
.WithIdentity(item.TaskName, item.GroupName)
.StartNow()
.WithCronSchedule(item.CronExpressionString)
.Build();
scheduler.ScheduleJob(job, trigger).GetAwaiter().GetResult();
log.LogInformation("Job added, type: {0}",className);
ITriggerListener triggerListener = new TriggerUpdateListens("trigger"+item.TaskName);
IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(trigger.Key);
scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
IJobListener jobUpdateListener = new JobUpdateListens("job"+item.TaskName);
IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(job.Key);
scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
}
}
}
}
else
{
log.LogInformation("scheduler is not IsStarted");
}
}
else
{
log.LogInformation("scheduler is null");
}
}
catch (Exception ep)
{
log.LogError(ep, "Task monitor loop failed.");
}
Thread.Sleep(15000); // 15 seconds, matching the interval logged when the loop starts
}
});
// }
// else
// {
// db = Host.Services.GetService<QuartzDbContext>();
// listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
// && w.MachineName == machine
// && w.InstanceId == QuartzOpt.InsatanceId)
// .ToListAsync().GetAwaiter().GetResult();
// foreach (var item in listQuartzTask)
// {
// var jobKey = new JobKey(item.TaskName, item.GroupName);
// var triggerKey = new TriggerKey(item.TaskName, item.GroupName); // // var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
// // && w.TaskName == jobKey.Name
// // && w.GroupName == jobKey.Group
// // && w.MachineName == machine
// // && w.InstanceId == scheduler.SchedulerInstanceId);
// // item.Status = (int)TaskStatus.Faulted;
// // item.OperateStatus = (int)OperateStatus.Stop;
// // db.Update<QuartzTask>(jobItem);
// // db.SaveChanges(); // if (scheduler.CheckExists(jobKey).Result)
// {
// var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
// if (triggerListener == null)
// {
// triggerListener = new TriggerUpdateListens();
// IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
// scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
// } // var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
// if (jobListener == null)
// {
// IJobListener jobUpdateListener = new JobUpdateListens();
// IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
// scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
// }
// }
// }
//}
return scheduler;
// Tell quartz to schedule the job using our trigger
//await scheduler.ScheduleJob(job, trigger);
}
catch (SchedulerException sep)
{
log.LogError(sep, "Job execution failed.");
}
return null;
}
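
Stripped of logging and assembly loading, each pass of the monitoring loop makes one decision per database record. The sketch below isolates that decision under simplifying assumptions: `SyncAction` and `JobReconciler` are hypothetical names, and `DesiredState` is a two-value stand-in for the project's `OperateStatus` enum, of which only the `Stop` member appears in this article.

```csharp
using System;

// Stand-in for the project's OperateStatus; only Stop is confirmed by the article.
public enum DesiredState { Run, Stop }

// What the monitoring loop should do with one task record (hypothetical names).
public enum SyncAction { None, ScheduleJob, DeleteJob, KeepAndAttachListeners }

public static class JobReconciler
{
    // Compares the scheduler's view with the state recorded in the database.
    public static SyncAction Decide(bool existsInScheduler, DesiredState desired)
    {
        if (existsInScheduler)
        {
            // A stopped record removes the job; otherwise the job stays and its listeners are ensured.
            return desired == DesiredState.Stop
                ? SyncAction.DeleteJob
                : SyncAction.KeepAndAttachListeners;
        }
        // Unknown job: schedule it unless the record says it is stopped.
        return desired == DesiredState.Stop
            ? SyncAction.None
            : SyncAction.ScheduleJob;
    }
}
```

Keeping the decision in a pure function like this would make the polling loop easier to test without a scheduler or a database.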

Now let's look at the trigger listener and the job listener classes:

using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Logging;
using Quartz.Impl;
using Walt.Framework.Service.Zookeeper;
using Microsoft.Extensions.DependencyInjection;
using org.apache.zookeeper;
using System.Linq;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using static org.apache.zookeeper.KeeperException;

namespace Walt.Framework.Quartz.Host
{
public class TriggerUpdateListens : ITriggerListener
{
public string Name { get; set; }

public TriggerUpdateListens(string name)
{
Name = name;
}

private bool VoteJob{ get; set;}

public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default(CancellationToken))
{
ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
var _logger=loggerFact.CreateLogger<ZookeeperService>();
_logger.LogInformation("Job executed successfully. name: {0} group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
return Task.FromResult(true);
}
// Called when the trigger fires; this is the first callback in a job run.
public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
{
ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
var _logger=loggerFact.CreateLogger<ZookeeperService>();
_logger.LogInformation("Starting job. name: {0} group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
string machine = Environment.MachineName; // name of the current machine
try
{
var customerAttri = context.JobDetail.JobType.GetCustomAttributes(false);
foreach (var customer in customerAttri)
{
if (customer is DistributingAttributes) // jobs carrying this attribute are sharded jobs
{
var distri = customer as DistributingAttributes;
var zookeeper = Program.Host.Services.GetService<IZookeeperService>();
string currentTempNodeName = string.Empty;
string fullPath = "/lock/"+ context.JobDetail.Key.Name + context.JobDetail.Key.Group;
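int flag = 0;
Repeat: // the lock attempt can fail for transient reasons, so retry a few times
string jsonData = zookeeper.GetDataByLockNode(fullPath, "getlock"
, ZooDefs.Ids.OPEN_ACL_UNSAFE, out currentTempNodeName);
if(jsonData==null)
{
_logger.LogError("Failed to acquire the lock. node: {0}, lock prefix: {1}, retry: {2}",fullPath,"getlock",flag);
if(flag<=3) // assumed retry limit; the literal is missing from the source
{
flag = flag + 1;
goto Repeat;
}
VoteJob = true; // the lock could not be acquired, so veto this run; VetoJobExecution below reads this flag
//context.Scheduler.Interrupt(context.JobDetail.Key);
return Task.FromResult(false); // Quartz calls VetoJobExecution next, which returns VoteJob
}

// Lock acquired: process the shard data and build the shard context.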
QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
var item = db.QuartzTask.Where(w => w.IsDelete == 0
&& w.TaskName == context.JobDetail.Key.Name
&& w.GroupName == context.JobDetail.Key.Group
&& w.MachineName == machine
&& w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
if (item != null)
{
// TODO: look up the machine name here and build the remote API URL to check whether the shard host is alive, so that suspended tasks can be reassigned.
}
string distributeFlag = item.MachineName + item.InstanceId;
List<DistributingData> distriData = new List<DistributingData>();
DistributingData currentDistriEntity = null;
if (string.IsNullOrEmpty(jsonData))
{
currentDistriEntity= new DistributingData // shard metadata
{
DistributeFlag =distributeFlag, // shard key, built from the machine name and instance id
PageIndex = 1, // assumed first page; the literal is missing from the source
PageSize = Program.QuartzOpt.CustomerRecordCountForTest // configured number of records per shard
};
distriData.Add(currentDistriEntity);
}
else
{
distriData = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DistributingData>>(jsonData);
if (distriData == null || distriData.Count() < 1)
{
distriData = distriData ?? new List<DistributingData>(); // avoid a null list before adding to it
currentDistriEntity= new DistributingData
{
DistributeFlag =distributeFlag,
PageIndex = 1, // assumed first page; the literal is missing from the source
PageSize = Program.QuartzOpt.CustomerRecordCountForTest // from configuration
};
distriData.Add(currentDistriEntity);
}
else
{
currentDistriEntity= distriData.Where(w => w.DistributeFlag == distributeFlag).SingleOrDefault();
var maxPageIndex = distriData.Max(w => w.PageIndex) + 1; // next unused page
if (currentDistriEntity == null) // this host has not been sharded yet: add it to the shard set
{
// Assign to currentDistriEntity so the new entry is what gets stored in Remark and the job data map.
currentDistriEntity = new DistributingData
{
DistributeFlag = distributeFlag,
PageIndex = maxPageIndex,
PageSize = Program.QuartzOpt.CustomerRecordCountForTest // from configuration
};
distriData.Add(currentDistriEntity);
}
else
{
currentDistriEntity.PageIndex = maxPageIndex;
}
}
}
item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(currentDistriEntity);
db.Update(item);
db.SaveChanges();
string resultData = Newtonsoft.Json.JsonConvert.SerializeObject(distriData);
context.JobDetail.JobDataMap.Put("distriData", currentDistriEntity); // expose the shard data to the job through the job data map
zookeeper.SetDataAsync(fullPath
, resultData, false).GetAwaiter().GetResult();
zookeeper.DeleteNode(currentTempNodeName); // done: delete this host's node to release the lock
_logger.LogInformation("Shard run: {0}",resultData);
}
}
}
catch(ConnectionLossException cle)
{
VoteJob = true;
_logger.LogError(cle, "Failed to acquire the lock: connection lost");
}
catch(SessionExpiredException sep)
{
VoteJob = true;
_logger.LogError(sep, "Failed to acquire the lock: session expired");
}
catch(KeeperException kep)
{
VoteJob = true;
_logger.LogError(kep, "Failed to acquire the lock: ZooKeeper operation error");
}
catch(Exception ep)
{
try
{
_logger.LogError(ep,"Sharding failed.");
//context.Scheduler.DeleteJob(context.JobDetail.Key).GetAwaiter().GetResult();
VoteJob = true;
QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
var item = db.QuartzTask.Where(w => w.IsDelete == 0
&& w.TaskName == context.JobDetail.Key.Name
&& w.GroupName == context.JobDetail.Key.Group
&& w.MachineName == machine
&& w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault();
if (item == null)
{
_logger.LogError(ep, "Sharding failed and the database record could not be found.");
}
else
{
item.Status = (int)TaskStatus.Canceled;
item.OperateStatus = (int)OperateStatus.Stop;
item.Remark = ep.ToString();
db.Update(item);
db.SaveChanges();
}
}
catch (Exception eep)
{
_logger.LogError(eep, "Sharding failed and the database update failed.");
}
}
return Task.FromResult(true);
}

public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.FromResult(true);
}

// Quartz calls this after TriggerFired; returning true vetoes the job run.
public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
{
ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
var _logger=loggerFact.CreateLogger<ZookeeperService>();
if (VoteJob)
{
_logger.LogInformation("Vetoing job run. name: {0} group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
}
// VoteJob is set in TriggerFired and defaults to false. It becomes true when the lock cannot be acquired;
// returning true here means only JobUpdateListens.JobExecutionVetoed runs and the job is skipped this time.
return Task.FromResult(VoteJob);
}
}
}
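
The page-index bookkeeping inside `TriggerFired` can be looked at on its own. The following is a minimal sketch of that allocation rule, not the listener's actual code: `ShardAllocator` is a hypothetical helper, `DistributingData` is reduced to the three properties used above, and the first page index is assumed to be 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Reduced copy of the shard descriptor used by the listener.
public class DistributingData
{
    public string DistributeFlag { get; set; }
    public int PageIndex { get; set; }
    public int PageSize { get; set; }
}

// Hypothetical helper that isolates the allocation rule.
public static class ShardAllocator
{
    // Gives the host identified by flag the next unused page and returns its entry.
    // shards is the list stored on the lock node; it is updated in place.
    public static DistributingData Allocate(List<DistributingData> shards, string flag, int pageSize)
    {
        // Assumption: pages are numbered from 1.
        int nextPage = shards.Count == 0 ? 1 : shards.Max(s => s.PageIndex) + 1;

        var current = shards.SingleOrDefault(s => s.DistributeFlag == flag);
        if (current == null)
        {
            // First time this host takes part: register it in the shard list.
            current = new DistributingData { DistributeFlag = flag, PageSize = pageSize };
            shards.Add(current);
        }
        current.PageIndex = nextPage;
        return current;
    }
}
```

With this rule, two hosts that fire the same job in turn receive pages 1 and 2, and when the first host fires again it moves on to page 3. No page is handed out twice as long as the list is only read and written while the ZooKeeper lock is held.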

Next, the methods that run before and after the job executes:

using System;
using System.Threading;
using System.Threading.Tasks;
using Quartz;
using Quartz.Logging;
using Quartz.Impl;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;

namespace Walt.Framework.Quartz.Host
{
public class JobUpdateListens : IJobListener
{
public string Name { get; set; }

public JobUpdateListens(string name)
{
Name = name;
}

public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
{
return Task.FromResult(true);
}
// Runs before the job executes; records the state in the database.
public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
string machine = Environment.MachineName;
QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
&& w.TaskName == context.JobDetail.Key.Name
&& w.GroupName == context.JobDetail.Key.Group
&& w.MachineName == machine
&& w.InstanceId == context.Scheduler.SchedulerInstanceId);
item.Status = (int)TaskStatus.WaitingToRun;
db.Update<QuartzTask>(item);
db.SaveChanges();
}
catch (Exception ep)
{
//context.Scheduler.Interrupt(context.JobDetail.Key);
var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
var log = logFaoctory.CreateLogger<JobUpdateListens>();
log.LogError(ep, "JobToBeExecuted: job failed, name: {0}, group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
}
return Task.FromResult(true);
}
// Runs after the job executes; stores the result in the database and handles exceptions.
public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
var log = logFaoctory.CreateLogger<JobUpdateListens>();
string machine = Environment.MachineName;
var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
&& w.TaskName == context.JobDetail.Key.Name
&& w.GroupName == context.JobDetail.Key.Group
&& w.MachineName == machine
&& w.InstanceId == context.Scheduler.SchedulerInstanceId);
if (jobException != null)
{
item.Status = (int)TaskStatus.Faulted;
item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(jobException);
log.LogError("Job failed, name: {0}, group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
}
else
{
item.Status = (int)TaskStatus.RanToCompletion;
item.RecentRunTime = context.FireTimeUtc.DateTime;
if (context.NextFireTimeUtc.HasValue)
{
item.NextFireTime = context.NextFireTimeUtc.Value.DateTime;
}
}
db.Update<QuartzTask>(item);
db.SaveChanges();
}
catch (Exception ep)
{
//context.Scheduler.Interrupt(context.JobDetail.Key);
var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
var log = logFaoctory.CreateLogger<JobUpdateListens>();
log.LogError(ep, "JobWasExecuted: job failed, name: {0}, group: {1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
}
return Task.FromResult(true);
}
} }
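
The listeners only record state; the sharded job itself is expected to read the shard descriptor that the trigger listener placed in the `JobDataMap` under the key `distriData` and process just its own slice of records. A rough sketch of that slicing follows, assuming 1-based page indexes and an in-memory sequence in place of a real query; `ShardSlice` and `PageOf` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper showing how a job could pick out its own page of records.
public static class ShardSlice
{
    // Returns the records that belong to the given 1-based page.
    // orderedRecords must have a stable order, otherwise pages can overlap.
    public static List<T> PageOf<T>(IEnumerable<T> orderedRecords, int pageIndex, int pageSize)
    {
        if (pageIndex < 1) throw new ArgumentOutOfRangeException(nameof(pageIndex));
        if (pageSize < 1) throw new ArgumentOutOfRangeException(nameof(pageSize));
        return orderedRecords
            .Skip((pageIndex - 1) * pageSize)
            .Take(pageSize)
            .ToList();
    }
}
```

In a real job the same `Skip`/`Take` pair would typically be applied to an ordered database query, so that each host reads only the page it was assigned.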

For this article, some of the ZookeeperService code was optimized:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using org.apache.zookeeper;
using org.apache.zookeeper.data;
using static org.apache.zookeeper.ZooKeeper;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using static org.apache.zookeeper.Watcher.Event;
using Newtonsoft.Json;
using System.Collections.Concurrent;
using static org.apache.zookeeper.ZooDefs;
using static org.apache.zookeeper.KeeperException;

namespace Walt.Framework.Service.Zookeeper
{
internal class WaitLockWatch : Watcher
{
private AutoResetEvent _autoResetEvent;
private ManualResetEvent _mutex;
private ILogger _logger;
private string _path;

public WaitLockWatch(AutoResetEvent autoResetEvent
, ILogger logger, string path
, ManualResetEvent mutex)
{
_autoResetEvent = autoResetEvent;
_logger = logger;
_path = path;
_mutex = mutex;
}

public override Task process(WatchedEvent @event)
{
_mutex.Set();
return Task.FromResult(true);
}
}

internal class WaitConnWatch : Watcher
{
private AutoResetEvent _autoResetEvent;
private ILogger _logger;
private ManualResetEvent _mutex;

public WaitConnWatch(AutoResetEvent autoResetEvent
,ILogger logger
,ManualResetEvent mutex)
{
_autoResetEvent=autoResetEvent;
_logger=logger;
_mutex = mutex;
}

public override Task process(WatchedEvent @event)
{
_logger.LogInformation("Watcher fired, callback state: {0}",@event.getState().ToString());
if(@event.getState()== KeeperState.SyncConnected
||@event.getState()== KeeperState.ConnectedReadOnly)
{
_logger.LogInformation("Releasing the connection wait");
_autoResetEvent.Set();
}
else
{
_logger.LogInformation("Connection lost, releasing the distributed-lock wait");
_mutex.Set();
}
return Task.FromResult(0);
}
}

public class ZookeeperService : IZookeeperService
{
private ZookeeperOptions _zookeeperOptions;
private ZooKeeper _zookeeper;
private static readonly byte[] NO_PASSWORD = new byte[0];
public Watcher Wathcer {get;set;}
public ILoggerFactory LoggerFac { get; set; }
private ILogger _logger;
internal Thread CurrThread{ get; }
// Index 0 is used for the connection wait and index 1 for the lock watcher.
// The indexes are missing from the source and are assumed here.
AutoResetEvent[] autoResetEvent=new AutoResetEvent[]
{new AutoResetEvent(false),new AutoResetEvent(false)};
ManualResetEvent _manualReset = new ManualResetEvent(false);
public ZookeeperService(IOptionsMonitor<ZookeeperOptions> zookeeperOptions
,ILoggerFactory loggerFac)
{
LoggerFac=loggerFac;
_logger=LoggerFac.CreateLogger<ZookeeperService>();
_zookeeperOptions=zookeeperOptions.CurrentValue;
_logger.LogInformation("Configuration: {0}",JsonConvert.SerializeObject(_zookeeperOptions));
zookeeperOptions.OnChange((zookopt,s)=>{
_zookeeperOptions=zookopt;
});
_logger.LogInformation("Connecting");
Conn(_zookeeperOptions);
CurrThread = System.Threading.Thread.CurrentThread;
}

private void Conn(ZookeeperOptions zookeeperOptions)
{
bool isReadOnly=default(Boolean);
Wathcer=new WaitConnWatch(autoResetEvent[0],_logger,_manualReset);
if(isReadOnly!=zookeeperOptions.IsReadOnly)
{
isReadOnly=zookeeperOptions.IsReadOnly;
}
byte[] pwd=new byte[0];
// neither a password nor a session id is configured
if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
&&_zookeeperOptions.SessionId==default(int))
{
_zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
}
else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
{
pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
// session id 0 is assumed; the literal is missing from the source
_zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
}
else
{
_zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
}
if(_zookeeper.getState()==States.CONNECTING)
{
_logger.LogInformation("Current state: CONNECTING. Blocking until connected");
autoResetEvent[0].WaitOne();
}
}

public Task<string> CreateZNode(string path,string data,CreateMode createMode,List<ACL> aclList)
{
ReConn();
if(string.IsNullOrEmpty(path)||!path.StartsWith('/'))
{
_logger.LogInformation("Invalid path. path: {0}",path);
return null;
}
byte[] dat=new byte[0];
if(!string.IsNullOrEmpty(data)) // encode only when there is data to write
{
dat=System.Text.Encoding.Default.GetBytes(data);
}
if(createMode==null)
{
_logger.LogInformation("createMode is null, defaulting to CreateMode.PERSISTENT");
createMode=CreateMode.PERSISTENT;
}
return _zookeeper.createAsync(path,dat,aclList,createMode);
}

public async void Sync(string path)
{
try
{
await _zookeeper.sync(path);
_logger.LogInformation("Sync succeeded");
}
catch (Exception ep)
{
_logger.LogError(ep, "Sync failed.");
}
}

public async Task<DataResult> GetDataAsync(string path,Watcher watcher,bool isSync)
{
ReConn();
if(await _zookeeper.existsAsync(path)==null )
{
_logger.LogInformation("path does not exist");
return null;
}
if (isSync)
{
_logger.LogInformation("About to sync.");
try
{
await _zookeeper.sync(path);
_logger.LogInformation("Sync succeeded");
}
catch (Exception ep)
{
_logger.LogError(ep, "Sync failed.");
}
}
return await _zookeeper.getDataAsync(path,watcher);
}

public async Task<Stat> SetDataAsync(string path,string data,bool isSync)
{
ReConn();
if(await _zookeeper.existsAsync(path)==null )
{
_logger.LogInformation("path does not exist");
return null;
}
byte[] dat=new byte[0];
if(!string.IsNullOrEmpty(data))
{
dat=System.Text.Encoding.Default.GetBytes(data);
}
return await _zookeeper.setDataAsync(path,dat);
}

public async Task<ChildrenResult> GetChildrenAsync(string path, Watcher watcher, bool isSync)
{
ReConn();
if (await _zookeeper.existsAsync(path) == null)
{
_logger.LogInformation("path does not exist");
return null;
}
if (isSync)
{
_logger.LogInformation("About to sync.");
try
{
_logger.LogInformation("Sync started");
await _zookeeper.sync(path);
_logger.LogInformation("Sync succeeded");
}
catch (Exception ep)
{
_logger.LogError(ep, "Sync failed.");
}
}
return await _zookeeper.getChildrenAsync(path, watcher);
}

public async Task DeleteNode(string path)
{
ReConn();
if(await _zookeeper.existsAsync(path)==null )
{
_logger.LogDebug("Delete: path does not exist");
return;
}
try
{
_logger.LogDebug("Deleting node: {0}", path);
await _zookeeper.deleteAsync(path);
}
catch (Exception ep)
{
_logger.LogError(ep, "Delete failed");
return;
}
}

public async Task<bool> SetWatcher(string path,Watcher watcher)
{
ReConn();
var stat = await _zookeeper.existsAsync(path);
if(stat==null )
{
_logger.LogDebug("Exists check: path does not exist");
return false;
}
try
{
_logger.LogDebug("Setting watcher: {0}", path);
await _zookeeper.getDataAsync(path,watcher);
return true;
}
catch (Exception ep)
{
_logger.LogError(ep, "Failed to set the watcher");
return false;
}
}

public string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
{
_logger.LogInformation("Acquiring the distributed lock.");
string tempNode=string.Empty;
tempNodeOut=string.Empty;
try
{
ReConn();
if (_zookeeper.existsAsync(path).Result == null)
{
_logger.LogDebug("path does not exist, creating it");
CreateZNode(path, "", CreateMode.PERSISTENT, aclList).GetAwaiter().GetResult();
}
tempNode = CreateZNode(path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
_logger.LogDebug("Created node: {0}", tempNode);
if (tempNode == null)
{
_logger.LogDebug("Failed to create the ephemeral sequential node. path: {0}, data: {1}, CreateMode: {2}"
, path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL);
return null;
}
_logger.LogInformation("Node created.");
// var taskGetData=Task.Run(async () =>{
// int circleCount = 0;
// while (true)
// {
// Thread.Sleep(200);
// circleCount++;
// _logger.LogInformation("循环获取锁。当前循环次数:{0}", circleCount);
// try
// {
// var childList =await GetChildrenAsync(path, null, true);
// if (childList == null || childList.Children == null || childList.Children.Count < 1)
// {
// _logger.LogWarning("获取子序列失败,计数为零.path:{0}", path);
// return null;
// }
// _logger.LogInformation("获取path:{0}的子节点:{1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children)); // var top = childList.Children.OrderBy(or => or).First();
// if (path + "/" + top == tempNode)
// {
// return tempNode;
// }
// }
// catch (Exception ep)
// {
// _logger.LogError(ep,"循环获取锁出错。");
// return null;
// }
// }
// });
// tempNode = taskGetData.GetAwaiter().GetResult();
// if (!string.IsNullOrEmpty(tempNode))
// {
// byte[] da = null;
// tempNodeOut = tempNode;
// da = GetDataAsync(path, null, true).Result.Data;
// if (da == null || da.Length < 1)
// {
// return string.Empty;
// }
// return System.Text.Encoding.Default.GetString(da);
// }
int clycleCount = 0;
GetChild: // Re-entry point: list the children again to guard against concurrent changes.
clycleCount++;
var childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
if (childList == null || childList.Children == null || childList.Children.Count < 1)
{
_logger.LogWarning("Failed to get child nodes: count is zero. path:{0}", path);
return null;
}
_logger.LogInformation("Children of path:{0}: {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
// The child with the lowest sequence number holds the lock.
var top = childList.Children.OrderBy(or => or).First();
if (path + "/" + top == tempNode)
{
tempNodeOut = tempNode;
var da = GetDataAsync(path, null, true).Result.Data;
if (da == null || da.Length < 1)
{
return string.Empty;
}
return System.Text.Encoding.Default.GetString(da);
}
// bool isSet=
// SetWatcher(path + "/" + top,).Result;
// if(!isSet)
// {
// goto GetChild;
// }
// Not the lock holder: watch the current holder and block until the watcher signals.
bool isSet = SetWatcher(path + "/" + top, new WaitLockWatch(autoResetEvent[], _logger, path, _manualReset)).Result;
if (!isSet)
{
_logger.LogWarning("Watcher could not be set; running the child listing again.");
goto GetChild;
}
_manualReset.WaitOne();
childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
if (childList == null || childList.Children == null || childList.Children.Count < 1)
{
_logger.LogWarning("Failed to get child nodes on re-check: count is zero. path:{0}", path);
return null;
}
_logger.LogInformation("Children of path:{0} on re-check: {1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
top = childList.Children.OrderBy(or => or).First();
if (path + "/" + top == tempNode)
{
_logger.LogDebug("This node now holds the lock.");
tempNodeOut = tempNode;
var da = GetDataAsync(path, null, true).Result.Data;
if (da == null || da.Length < 1)
{
return string.Empty;
}
return System.Text.Encoding.Default.GetString(da);
}
else
{
_logger.LogDebug("Lock not acquired; looping. Attempt: {0}", clycleCount);
Thread.Sleep();
goto GetChild;
// Sync(path);
// DeleteNode(tempNode).GetAwaiter().GetResult();
// DeleteNode(tempNode).GetAwaiter().GetResult();
// _logger.LogError("Lock not acquired; the watcher has a problem, check the log.");
// if (_zookeeper.existsAsync(tempNode).Result== null)
// {
// _logger.LogWarning("tempNode:{0} exists but the lock was not acquired; the thread-check routine released the wait by mistake"
// ,tempNode);
// }
// else
// {
// _logger.LogError("Lock not acquired; the watcher has a problem, check the log.");
// }
}
}
catch (ConnectionLossException cle)
{
_logger.LogError(cle, "Error acquiring the lock: connection lost.");
}
catch (SessionExpiredException sep)
{
_logger.LogError(sep, "Error acquiring the lock: session expired.");
}
catch (KeeperException kep)
{
_logger.LogError(kep, "Error acquiring the lock: zookeeper operation failed.");
}
catch (Exception ep)
{
_logger.LogError(ep, "Error acquiring the lock.");
if (!string.IsNullOrEmpty(tempNode))
{
try
{
// Best-effort cleanup of our own queue node; failures here are ignored.
DeleteNode(tempNode).GetAwaiter().GetResult();
}
catch (Exception)
{ }
}
}
return null;
}

private void ReConn()
{
_logger.LogInformation("Checking connection state, status:{0}", _zookeeper.getState());
if (_zookeeper.getState() == States.CLOSED
|| _zookeeper.getState() == States.NOT_CONNECTED)
{
_logger.LogInformation("Connection is closed; reconnecting.");
Conn(_zookeeperOptions);
}
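}

public async void Close(string tempNode)
{
try
{
await _zookeeper.closeAsync();
}
catch (Exception ep)
{
// Pass the exception as the first argument so it is logged, not treated as a format argument.
_logger.LogError(ep, "Failed to close zookeeper.");
}
}
}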
}
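
GetDataByLockNode above sets its watcher on the lowest-numbered child (`top`), so every waiting client is woken when that node changes. The lock recipe in the ZooKeeper documentation instead has each waiter watch only the node directly ahead of its own. The selection step can be sketched as a pure function with no ZooKeeper calls. `LockQueue` and `NodeToWatch` are hypothetical names that are not part of the article's code, and the sketch assumes all children share one prefix followed by ZooKeeper's zero-padded sequence number, so ordinal string order matches creation order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LockQueue
{
    // Returns null when ownNode is the lowest child (it holds the lock);
    // otherwise returns the child directly ahead of it, the one to watch.
    // Hypothetical helper: assumes a common prefix plus a zero-padded sequence suffix.
    public static string NodeToWatch(IEnumerable<string> children, string ownNode)
    {
        var ordered = children.OrderBy(c => c, StringComparer.Ordinal).ToList();
        int index = ordered.IndexOf(ownNode);
        if (index < 0)
        {
            throw new ArgumentException("ownNode is not among the children", nameof(ownNode));
        }
        return index == 0 ? null : ordered[index - 1];
    }
}
```

With children `lock-0000000001`, `lock-0000000002` and `lock-0000000003`, the client that owns `lock-0000000003` would watch `lock-0000000002`, and the owner of `lock-0000000001` gets null because it already holds the lock.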

Now let's look at the results.

[Screenshot: run results]

We shut down master2 and then look at slave1:

[Screenshot: slave1 after master2 was shut down]

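Judging by the timestamps, master2 has not run for 2 minutes. For good monitoring it is still best to implement the remote interface, which is not hard: change the generic host to a WebHost and write a few APIs. The state of the quartz hosts can then be monitored in real time, and sharding will also drop failed hosts automatically and reassign their tasks. Let's look at the sharding state:

[Screenshot: sharding state]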

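master2 has failed but has not been removed here. That does not affect the tasks that follow. However, if master2's task had not finished when it failed, part of the data will be left unprocessed.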
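
One way to picture the automatic reassignment: if every surviving host sees the same list of live hosts (for example, the ephemeral children of a ZooKeeper node), each host can work out its own shard index locally. The following is only a sketch under that assumption. `ShardAssignment` and `ForHost` are hypothetical names and are not part of the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ShardAssignment
{
    // Sorts the live hosts so that every host computes the same ordering,
    // then returns this host's 0-based shard index and the number of live hosts.
    public static (int Index, int Total) ForHost(IEnumerable<string> liveHosts, string self)
    {
        var ordered = liveHosts.Distinct().OrderBy(h => h, StringComparer.Ordinal).ToList();
        int index = ordered.IndexOf(self);
        if (index < 0)
        {
            throw new ArgumentException("self is not in the live host list", nameof(self));
        }
        return (index, ordered.Count);
    }
}
```

With master1, master2 and slave1 alive, slave1 is shard 2 of 3. Once master2 disappears from the list, the same call gives slave1 shard 1 of 2, so the remaining hosts cover the whole data set on the next run.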

3. Summary

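There are two main points to watch when extending quartz. The first is job listeners and trigger listeners: handle exceptions properly, especially in trigger listeners. If one fails, the job goes out of control and stops firing, its state becomes invalid, and it has to be stopped and started again. The second concerns business code: the data source to be sharded must either grow in a regular, auto-incrementing way or be static, so that paging can meet the sharding requirement. If the keys are not auto-incrementing or are added at random, quartz has to store the primary keys that need processing, but such requirements are a minority.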
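
To illustrate why regularly increasing keys make sharding simple: given a known minimum and maximum ID, the key space can be cut into contiguous ranges, one per shard, without storing any keys. This is a minimal sketch. `ShardRange.For` is a hypothetical helper that is not part of the article's code, and inclusive bounds are assumed.

```csharp
using System;

public static class ShardRange
{
    // Splits the inclusive ID interval [minId, maxId] into `total` contiguous
    // ranges and returns the inclusive range for shard `index` (0-based).
    // The first (count % total) shards each receive one extra ID.
    public static (long From, long To) For(long minId, long maxId, int index, int total)
    {
        if (total <= 0) throw new ArgumentOutOfRangeException(nameof(total));
        if (index < 0 || index >= total) throw new ArgumentOutOfRangeException(nameof(index));
        if (maxId < minId) throw new ArgumentException("maxId must be >= minId");

        long count = maxId - minId + 1;
        long baseSize = count / total;
        long remainder = count % total;

        long from = minId + index * baseSize + Math.Min(index, remainder);
        long size = baseSize + (index < remainder ? 1 : 0);
        // When there are fewer IDs than shards, size is 0 and To < From (an empty range).
        return (from, from + size - 1);
    }
}
```

For IDs 1 to 10 split across 3 shards, this gives the ranges 1 to 4, 5 to 7 and 8 to 10. Each host can then page through its own range with a `WHERE id BETWEEN from AND to` style query.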

GitHub repository for the microservices series: https://github.com/ck0074451665/Walt.Framework.git

Test example: https://github.com/ck0074451665/Walt.MicroServicesTest.git

Management UI: https://pan.baidu.com/s/1gYNDX1j3-XctuPiejV2XPQ
