From 3cd7a42c4bb1591b5a2a26e0f9fe4fbc189a37e1 Mon Sep 17 00:00:00 2001 From: hhb <455982533@qq.com> Date: Wed, 11 Sep 2024 17:16:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=92=E5=85=A5=E7=BB=91=E5=AE=9A=E8=A1=A8?= =?UTF-8?q?=E3=80=81=E6=97=A5=E5=BF=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Crontab/Job/HttpJob.cs | 58 +++++++++++++++++-------- Crontab/Model/QZ_JobParams.cs | 2 + Crontab/QuartzService.cs | 27 ++++++++---- Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs | 7 ++- Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs | 21 +++++++++ Hadoop/ZKLT.Hadoop/HadoopService.cs | 4 ++ Hadoop/ZKLT.Hadoop/TaskService.cs | 35 ++++++--------- 7 files changed, 106 insertions(+), 48 deletions(-) create mode 100644 Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs index a134063..13742da 100644 --- a/Crontab/Job/HttpJob.cs +++ b/Crontab/Job/HttpJob.cs @@ -11,6 +11,7 @@ using ZKLT.Quartz.Model; using ZKLT.Hadoop.Interface; using Newtonsoft.Json; using ZKLT.Hadoop.Model; +using System.Dynamic; namespace ZKLT.Quartz.Job { /// @@ -18,29 +19,29 @@ namespace ZKLT.Quartz.Job /// public class HttpJob : IJob { - private IHadoopService HadoopService; - public HttpJob(IHadoopService _hadoopService) - { - HadoopService = _hadoopService; - } public async Task Execute(IJobExecutionContext context) { JobDataMap dataMap = context.JobDetail.JobDataMap; - string taskId = dataMap.Get("TaskId").ToString(); - Console.WriteLine(taskId); - + IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop"); + // 任务配置项 List configs = (List)dataMap.Get("Params"); - + // 任务Id + string taskId = dataMap.Get("TaskId").ToString(); + // 是否开启日志 + int isLog = (int)dataMap.Get("IsLog"); + // 请求地址 string url = configs.Find(_item => _item.Key == "Url").Value; - HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post; + // 请求类型 + HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post; + // 返回数据储存表 string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; - + // 返回数据集合名 + string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value; using (HttpClient client = new HttpClient()) { try { var resultColumnConfigs = new List(); - HttpRequestMessage request = new HttpRequestMessage(httpMethod, url); foreach (var _item in configs) { @@ -57,11 +58,29 @@ namespace ZKLT.Quartz.Job HttpResponseMessage response = await client.SendAsync(request); response.EnsureSuccessStatusCode(); string responseBody = await response.Content.ReadAsStringAsync(); - var responseList = JsonConvert.DeserializeObject>(responseBody); - if(bindDatabase != null) + var responseObject = JsonConvert.DeserializeObject(responseBody); + JArray responseList = (JArray)responseObject.GetValue(resultSet); + // 如果设定了绑定表 插入表中 + if (bindDatabase != null) { HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); - HadoopService.PatchCommand(commands); + _HadoopService.PatchCommand(commands); + } + // 日志开启,插入日志 + if (isLog == HDP_Task.LOGOPEN) + { + var data = new HDP_TaskLog() + { + Id = "TL" + DateTime.Now.ToUniversalTime(), + TaskId = taskId, + Content = responseBody + }; + var logCommand = new HDP_Command() + { + TableId = HDP_TaskLog.TABLEID, + Data = JToken.FromObject(data) + }; + _HadoopService.Insert(logCommand); } } catch (HttpRequestException e) @@ -69,11 +88,12 @@ namespace ZKLT.Quartz.Job Console.WriteLine($"请求错误{e.Message}"); } } + await Task.CompletedTask; } - public HDP_Command[] makeCommand(string bindDatabase,List responseList,List configList) + public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList) { HDP_Command[] commands = new HDP_Command?[responseList.Count]; foreach (var _item in responseList) @@ -81,10 +101,14 @@ namespace ZKLT.Quartz.Job var command = new HDP_Command(); command.TableId = bindDatabase; command.Type = HDP_CommandType.INSERT; + var data = new JObject(); foreach (var _config in configList) { - command.Data[_config.Value] = _item[_config.Key]; + data[_config.Value] = _item[_config.Key]; } + command.Data = data; + int _index = responseList.IndexOf(_item); + commands[_index] = command; } return commands; } diff --git a/Crontab/Model/QZ_JobParams.cs b/Crontab/Model/QZ_JobParams.cs index 80790df..d3f3e40 100644 --- a/Crontab/Model/QZ_JobParams.cs +++ b/Crontab/Model/QZ_JobParams.cs @@ -16,6 +16,7 @@ namespace ZKLT.Quartz.Model private string? _Body; private string? _CronTime; private string? _BindTable; + private int _IsLog; private List? _Params; public string TaskId { get => _TaskId; set => _TaskId = value;} @@ -25,6 +26,7 @@ namespace ZKLT.Quartz.Model public string? Body { get => _Body; set => _Body = value;} public string? CronTime { get => _CronTime; set => _CronTime = value; } public string? BindTable { get => _BindTable; set => _BindTable = value; } + public int IsLog { get => _IsLog; set => _IsLog = value; } public List? Params { get => _Params; set => _Params = value;} } } diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs index 003526b..d1cf79e 100644 --- a/Crontab/QuartzService.cs +++ b/Crontab/QuartzService.cs @@ -2,6 +2,8 @@ using Quartz; using Quartz.Impl; using System.Threading; +using ZKLT.Hadoop.Interface; +using ZKLT.Hadoop.Model; using ZKLT.Quartz.Interface; using ZKLT.Quartz.Job; using ZKLT.Quartz.Model; @@ -10,15 +12,16 @@ namespace ZKLT.Quartz { public class QuartzService : IQuartzService { - private const string HTTPTASK = "http"; - private const string SQLTASK = "sql"; - private IScheduler Scheduler; - public QuartzService() + private IScheduler _Scheduler; + + private IHadoopService _HadoopService; + public QuartzService(IHadoopService hadoopService) { + _HadoopService = hadoopService; //实例化调度器 - Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; - Scheduler.Start(); + _Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; + _Scheduler.Start(); } public void CreateHttpJob(QZ_JobParams jobParams) @@ -28,13 +31,17 @@ namespace ZKLT.Quartz //创建HTTP作业 Id作为名称 IJobDetail job = JobBuilder.Create() - .WithIdentity("j" + jobParams.TaskId, HTTPTASK) + .WithIdentity("j" + jobParams.TaskId, HDP_Task.HTTPTASK) .UsingJobData(jobDataMap) .Build(); + //创建触发器 - ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build(); + ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HDP_Task.HTTPTASK).StartNow() + .WithCronSchedule(jobParams.CronTime) + .Build(); + // 把作业,触发器加入调度器 - Scheduler.ScheduleJob(job, trigger); + _Scheduler.ScheduleJob(job, trigger); } private JobDataMap MakeJobDataMap(QZ_JobParams jobParams) @@ -48,6 +55,7 @@ namespace ZKLT.Quartz if (propValue == null) continue; jobDataMap.Add(prop.Name, propValue); } + jobDataMap.Add("hadoop", _HadoopService); return jobDataMap; } @@ -58,6 +66,7 @@ namespace ZKLT.Quartz public void CloseJob() { + } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs index 9a4eb48..3d60470 100644 --- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -9,11 +9,16 @@ namespace ZKLT.Hadoop.Model { public class HDP_Task { + public const string HTTPTASK = "http"; + public const string SQLTASK = "sql"; + public const int LOGOPEN = 1; + public const int LOGCLOSE = 0; private string _Id; private string? _Title; private int? _IsActive; private string? _Type; private string? _CronTime; + private int _IsLog; private List? _TaskConfigs; public string Id { get => _Id; set => _Id = value; } @@ -24,7 +29,7 @@ namespace ZKLT.Hadoop.Model public string? Type { get => _Type; set => _Type = value; } public string? CronTime { get => _CronTime; set => _CronTime = value; } - + public int IsLog { get => _IsLog; set => _IsLog = value; } public List? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; } } } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs new file mode 100644 index 0000000..ec86e1c --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs @@ -0,0 +1,21 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_TaskLog + { + public const string TABLEID = "ERP_TaskLog"; + private string _Id; + private string _TaskId; + private JToken? _Content; + + public string Id { get =>_Id; set => _Id = value; } + public string TaskId { get => _TaskId; set => _TaskId = value; } + public JToken? Content { get => _Content; set => _Content = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop/HadoopService.cs b/Hadoop/ZKLT.Hadoop/HadoopService.cs index c6c479d..c2ff30c 100644 --- a/Hadoop/ZKLT.Hadoop/HadoopService.cs +++ b/Hadoop/ZKLT.Hadoop/HadoopService.cs @@ -2,6 +2,9 @@ using Microsoft.Extensions.DependencyInjection; using MySqlX.XDevAPI.Relational; using Newtonsoft.Json.Linq; +using Quartz.Impl; +using Quartz; +using Quartz.Spi; using System; using System.Collections.Generic; using System.Linq; @@ -31,6 +34,7 @@ namespace ZKLT.Hadoop services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + return services; } diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs index cece116..34ff26d 100644 --- a/Hadoop/ZKLT.Hadoop/TaskService.cs +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -23,33 +23,26 @@ namespace ZKLT.Hadoop public string Start(HDP_Task taskParams) { //获取计划任务信息 - var command = new HDP_Command(); - command.TableId = "ERP_Task"; - command.Where = new JObject - { - { "Id","=" } - }; - command.Data = new JObject - { - {"Id",taskParams.Id } - }; - var taskInfo = _HadoopService.QuerySingle(command); - command.TableId = "ERP_TaskConfig"; - command.Where = new JObject - { - { "TaskId","=" } - }; - command.Data = new JObject - { - {"TaskId",taskParams.Id } - }; + var taskCommand = new HDP_Command(); + taskCommand.TableId = "ERP_Task"; + taskCommand.Where = new JObject() { { "Id","=" } }; + taskCommand.Data = new JObject() { {"Id",taskParams.Id } }; + var taskInfo = _HadoopService.QuerySingle(taskCommand); + //获取计划任务配置 + var taskConfigCommand = new HDP_Command(); + taskConfigCommand.TableId = "ERP_TaskConfig"; + taskConfigCommand.Where = new JObject() { { "TaskId","=" } }; + taskConfigCommand.Data = new JObject() { {"TaskId",taskParams.Id} }; QZ_JobParams jobParams = new QZ_JobParams(); jobParams.TaskId = taskInfo.Id; jobParams.CronTime = taskInfo.CronTime; - jobParams.Params = _HadoopService.Query(command).ToList(); + jobParams.IsLog = taskInfo.IsLog; + jobParams.Params = _HadoopService.Query(taskConfigCommand).ToList(); + // 调用任务管理类 开启任务管理 _QuartzService.CreateHttpJob(jobParams); + return taskParams.Id; } public string Pause(string Id)