From a5b6c387f83dde70185ea7531c2a6256dce02d45 Mon Sep 17 00:00:00 2001 From: hhb <455982533@qq.com> Date: Tue, 10 Sep 2024 16:03:10 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Crontab/Interface/IQuartzService.cs | 21 ++++ Crontab/Job/HttpJob.cs | 92 ++++++++++++++++ Crontab/Model/QZ_JobConfig.cs | 24 +++++ Crontab/Model/QZ_JobParams.cs | 30 ++++++ Crontab/QuartzService.cs | 102 ++++++++++++++++++ Crontab/ZKLT.Quartz.csproj | 18 ++++ .../Controllers/HadoopController.cs | 31 +++++- Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs | 16 +++ Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs | 30 ++++++ Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs | 25 +++++ Hadoop/ZKLT.Hadoop/HadoopService.cs | 5 + Hadoop/ZKLT.Hadoop/TaskService.cs | 65 +++++++++++ Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj | 1 + ZKLT.sln | 10 ++ 14 files changed, 468 insertions(+), 2 deletions(-) create mode 100644 Crontab/Interface/IQuartzService.cs create mode 100644 Crontab/Job/HttpJob.cs create mode 100644 Crontab/Model/QZ_JobConfig.cs create mode 100644 Crontab/Model/QZ_JobParams.cs create mode 100644 Crontab/QuartzService.cs create mode 100644 Crontab/ZKLT.Quartz.csproj create mode 100644 Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs create mode 100644 Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs create mode 100644 Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs create mode 100644 Hadoop/ZKLT.Hadoop/TaskService.cs diff --git a/Crontab/Interface/IQuartzService.cs b/Crontab/Interface/IQuartzService.cs new file mode 100644 index 0000000..219c3b2 --- /dev/null +++ b/Crontab/Interface/IQuartzService.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json.Linq; +using ZKLT.Quartz.Model; + +namespace ZKLT.Quartz.Interface +{ + public interface IQuartzService + { + public void CreateHttpJob(QZ_JobParams httpParams); + + public void CreateSqlJob(); + + public void CloseJob(); + } + +} + diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs new file mode 100644 index 0000000..a134063 --- /dev/null +++ b/Crontab/Job/HttpJob.cs @@ -0,0 +1,92 @@ +using Newtonsoft.Json.Linq; +using Quartz; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Reflection.PortableExecutable; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Quartz.Model; +using ZKLT.Hadoop.Interface; +using Newtonsoft.Json; +using ZKLT.Hadoop.Model; +namespace ZKLT.Quartz.Job +{ + /// + /// HTTP作业类 + /// + public class HttpJob : IJob + { + private IHadoopService HadoopService; + public HttpJob(IHadoopService _hadoopService) + { + HadoopService = _hadoopService; + } + public async Task Execute(IJobExecutionContext context) + { + JobDataMap dataMap = context.JobDetail.JobDataMap; + string taskId = dataMap.Get("TaskId").ToString(); + Console.WriteLine(taskId); + + List configs = (List)dataMap.Get("Params"); + + string url = configs.Find(_item => _item.Key == "Url").Value; + HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post; + string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; + + using (HttpClient client = new HttpClient()) + { + try + { + var resultColumnConfigs = new List(); + + HttpRequestMessage request = new HttpRequestMessage(httpMethod, url); + foreach (var _item in configs) + { + if (_item.Group! == "Headers") + { + request.Headers.Add(_item.Key, _item.Value); + } + else if (_item.Group! == "ResultColumns") + { + resultColumnConfigs.Add(_item); + } + } + //请求提交 + HttpResponseMessage response = await client.SendAsync(request); + response.EnsureSuccessStatusCode(); + string responseBody = await response.Content.ReadAsStringAsync(); + var responseList = JsonConvert.DeserializeObject>(responseBody); + if(bindDatabase != null) + { + HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); + HadoopService.PatchCommand(commands); + } + } + catch (HttpRequestException e) + { + Console.WriteLine($"请求错误{e.Message}"); + } + } + + await Task.CompletedTask; + } + + public HDP_Command[] makeCommand(string bindDatabase,List responseList,List configList) + { + HDP_Command[] commands = new HDP_Command?[responseList.Count]; + foreach (var _item in responseList) + { + var command = new HDP_Command(); + command.TableId = bindDatabase; + command.Type = HDP_CommandType.INSERT; + foreach (var _config in configList) + { + command.Data[_config.Value] = _item[_config.Key]; + } + } + return commands; + } + } +} diff --git a/Crontab/Model/QZ_JobConfig.cs b/Crontab/Model/QZ_JobConfig.cs new file mode 100644 index 0000000..189cc1e --- /dev/null +++ b/Crontab/Model/QZ_JobConfig.cs @@ -0,0 +1,24 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Quartz.Model +{ + public class QZ_JobConfig + { + private string _Id; + private string _TaskId; + private string _Key; + private string? _Value; + private string? _Group; + + public string Id { get => _Id; set => _Id = value; } + public string TaskId { get => _TaskId; set => _TaskId = value; } + public string Key { get => _Key; set => _Key = value; } + public string? Value { get => _Value; set => _Value = value; } + public string? Group { get => _Group; set => _Group = value; } + } +} diff --git a/Crontab/Model/QZ_JobParams.cs b/Crontab/Model/QZ_JobParams.cs new file mode 100644 index 0000000..80790df --- /dev/null +++ b/Crontab/Model/QZ_JobParams.cs @@ -0,0 +1,30 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Quartz.Model +{ + public class QZ_JobParams + { + private string _TaskId; + private string? _Url; + private string? _Method; + private string? _Headers; + private string? _Body; + private string? _CronTime; + private string? _BindTable; + private List? _Params; + + public string TaskId { get => _TaskId; set => _TaskId = value;} + public string? Url { get => _Url; set => _Url = value; } + public string? Method { get => _Method; set => _Method = value; } + public string? Headers { get => _Headers; set => _Headers = value;} + public string? Body { get => _Body; set => _Body = value;} + public string? CronTime { get => _CronTime; set => _CronTime = value; } + public string? BindTable { get => _BindTable; set => _BindTable = value; } + public List? Params { get => _Params; set => _Params = value;} + } +} diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs new file mode 100644 index 0000000..003526b --- /dev/null +++ b/Crontab/QuartzService.cs @@ -0,0 +1,102 @@ +using Newtonsoft.Json.Linq; +using Quartz; +using Quartz.Impl; +using System.Threading; +using ZKLT.Quartz.Interface; +using ZKLT.Quartz.Job; +using ZKLT.Quartz.Model; + +namespace ZKLT.Quartz +{ + public class QuartzService : IQuartzService + { + private const string HTTPTASK = "http"; + private const string SQLTASK = "sql"; + private IScheduler Scheduler; + + public QuartzService() + { + //实例化调度器 + Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; + Scheduler.Start(); + } + + public void CreateHttpJob(QZ_JobParams jobParams) + { + //生成jobDataMap + JobDataMap jobDataMap = MakeJobDataMap(jobParams); + + //创建HTTP作业 Id作为名称 + IJobDetail job = JobBuilder.Create() + .WithIdentity("j" + jobParams.TaskId, HTTPTASK) + .UsingJobData(jobDataMap) + .Build(); + //创建触发器 + ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build(); + // 把作业,触发器加入调度器 + Scheduler.ScheduleJob(job, trigger); + } + + private JobDataMap MakeJobDataMap(QZ_JobParams jobParams) + { + JobDataMap jobDataMap = new JobDataMap(); + Type jobType = jobParams.GetType(); + foreach (var prop in jobType.GetProperties()) + { + var propValue = prop.GetValue(jobParams); + // 过滤掉 null 值 + if (propValue == null) continue; + jobDataMap.Add(prop.Name, propValue); + } + return jobDataMap; + } + + public void CreateSqlJob() + { + + } + + public void CloseJob() + { + } + + + /// + /// 暂停任务 + /// + /// + /// + //public string Pause(TaskInfoModel taskInfo) + //{ + // JobKey jobkey = new JobKey("j" + taskInfo.Id, "http"); + // TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http"); + // scheduler.PauseJob(jobkey); + // scheduler.PauseTrigger(triggerKey); + // return taskInfo.Id; + //} + + ///// + ///// 关闭任务 + ///// + ///// + ///// + //public string Close(TaskInfoModel taskInfo) + //{ + // JobKey jobkey = new JobKey("j" + taskInfo.Id, "http"); + // TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http"); + + // scheduler.PauseTrigger(triggerKey); + // scheduler.UnscheduleJob(triggerKey); + // scheduler.PauseJob(jobkey); + // scheduler.DeleteJob(jobkey); + + // return taskInfo.Id; + //} + + private JobKey GetJobKey(string Id,string GroupName) + { + JobKey jobKey = new JobKey("j" + Id, GroupName); + return jobKey; + } + } +} diff --git a/Crontab/ZKLT.Quartz.csproj b/Crontab/ZKLT.Quartz.csproj new file mode 100644 index 0000000..f95af7f --- /dev/null +++ b/Crontab/ZKLT.Quartz.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + diff --git a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs index 74af2ef..bf69da3 100644 --- a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs +++ b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs @@ -4,7 +4,6 @@ using MySqlX.XDevAPI.Relational; using Newtonsoft.Json.Linq; using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Model; - namespace ZKLT.Hadoop.API.Controllers { /// @@ -14,16 +13,19 @@ namespace ZKLT.Hadoop.API.Controllers [ApiController] public class HadoopController : ControllerBase { - public HadoopController(IHadoopService hadoop,ITableService table) + public HadoopController(IHadoopService hadoop,ITableService table, ITaskService task) { _HadoopService = hadoop; _TableService = table; + _TaskService = task; } private IHadoopService _HadoopService; private ITableService _TableService; + private ITaskService _TaskService; + [HttpGet("getid")] public ActionResult GetId([FromQuery] string? prefix, [FromQuery] int? count) { if (count != null && count > 0) @@ -279,5 +281,30 @@ namespace ZKLT.Hadoop.API.Controllers return BadRequest(e.Message); } } + + [HttpPost("createTask")] + public ActionResult CreateTask(HDP_Task taskParams) + { + try + { + return Ok(_TaskService.Start(taskParams)); + } + catch (Exception e) + { + return BadRequest(e.Message); + } + } + [HttpPost("closeTask")] + public ActionResult CloseTask(HDP_Task taskParams) + { + try + { + return Ok(_TaskService.Close("1")); + } + catch (Exception e) + { + return BadRequest(e.Message); + } + } } } diff --git a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs new file mode 100644 index 0000000..6c1a3d5 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Hadoop.Model; + +namespace ZKLT.Hadoop.Interface +{ + public interface ITaskService + { + public string Start(HDP_Task taskParams); + public string Pause(string Id); + public string Close(string Id); + } +} diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs new file mode 100644 index 0000000..9a4eb48 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_Task + { + private string _Id; + private string? _Title; + private int? _IsActive; + private string? _Type; + private string? _CronTime; + private List? _TaskConfigs; + + public string Id { get => _Id; set => _Id = value; } + + public string? Title { get => _Title; set => _Title = value; } + + public int? IsActive { get => _IsActive; set => _IsActive = value; } + + public string? Type { get => _Type; set => _Type = value; } + public string? CronTime { get => _CronTime; set => _CronTime = value; } + + public List? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs new file mode 100644 index 0000000..125440c --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_TaskConfig + { + + private string _Id; + private string _TaskId; + private string _Key; + private string _Value; + + public string Id { get => _Id; set => _Id = value; } + + public string TaskId { get => _TaskId; set => _TaskId = value; } + + public string Key { get => _Key; set => _Key = value; } + + public string Value { get => _Value; set => _Value = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop/HadoopService.cs b/Hadoop/ZKLT.Hadoop/HadoopService.cs index 64152ea..c6c479d 100644 --- a/Hadoop/ZKLT.Hadoop/HadoopService.cs +++ b/Hadoop/ZKLT.Hadoop/HadoopService.cs @@ -10,6 +10,8 @@ using System.Threading.Tasks; using System.Transactions; using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Model; +using ZKLT.Quartz; +using ZKLT.Quartz.Interface; namespace ZKLT.Hadoop { @@ -27,6 +29,8 @@ namespace ZKLT.Hadoop { services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); return services; } @@ -477,5 +481,6 @@ namespace ZKLT.Hadoop } return _result.ToArray(); } + } } diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs new file mode 100644 index 0000000..cece116 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -0,0 +1,65 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Hadoop.Interface; +using ZKLT.Hadoop.Model; +using ZKLT.Quartz.Interface; +using ZKLT.Quartz.Model; + +namespace ZKLT.Hadoop +{ + public class TaskService : ITaskService + { + private IHadoopService _HadoopService; + private IQuartzService _QuartzService; + public TaskService(IHadoopService hadoopService, IQuartzService quartzService) { + _HadoopService = hadoopService; + _QuartzService = quartzService; + } + + public string Start(HDP_Task taskParams) + { + //获取计划任务信息 + var command = new HDP_Command(); + command.TableId = "ERP_Task"; + command.Where = new JObject + { + { "Id","=" } + }; + command.Data = new JObject + { + {"Id",taskParams.Id } + }; + var taskInfo = _HadoopService.QuerySingle(command); + command.TableId = "ERP_TaskConfig"; + command.Where = new JObject + { + { "TaskId","=" } + }; + command.Data = new JObject + { + {"TaskId",taskParams.Id } + }; + + QZ_JobParams jobParams = new QZ_JobParams(); + jobParams.TaskId = taskInfo.Id; + jobParams.CronTime = taskInfo.CronTime; + jobParams.Params = _HadoopService.Query(command).ToList(); + // 调用任务管理类 开启任务管理 + _QuartzService.CreateHttpJob(jobParams); + return taskParams.Id; + } + public string Pause(string Id) + { + return Id; + } + public string Close(string Id) + { + return Id; + } + + } +} diff --git a/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj b/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj index 231ebe7..da8bde4 100644 --- a/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj +++ b/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj @@ -13,6 +13,7 @@ + diff --git a/ZKLT.sln b/ZKLT.sln index d9ae477..fe1cbc3 100644 --- a/ZKLT.sln +++ b/ZKLT.sln @@ -4,6 +4,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00 VisualStudioVersion = 17.8.34408.163 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop", "Hadoop\ZKLT.Hadoop\ZKLT.Hadoop.csproj", "{CD7387DB-B80A-412E-89B9-830EFD28C0F4}" + ProjectSection(ProjectDependencies) = postProject + {D189A534-BABE-4B96-8106-F860B94C1F99} = {D189A534-BABE-4B96-8106-F860B94C1F99} + EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hadoop", "Hadoop", "{14EA48C3-3F3A-4789-BB0B-5485771D3840}" EndProject @@ -13,6 +16,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.Interface", "Ha EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Quartz", "Crontab\ZKLT.Quartz.csproj", "{D189A534-BABE-4B96-8106-F860B94C1F99}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +40,10 @@ Global {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Debug|Any CPU.Build.0 = Debug|Any CPU {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.ActiveCfg = Release|Any CPU {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.Build.0 = Release|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -44,6 +53,7 @@ Global {070B12FF-0B5A-4D0B-B444-70D6F91FB338} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} {893FE0B2-8D14-42BB-B19E-0FD09EEA2433} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} + {D189A534-BABE-4B96-8106-F860B94C1F99} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {87BED7C0-F181-4EA3-85CD-6D146DA33FF3} From 3cd7a42c4bb1591b5a2a26e0f9fe4fbc189a37e1 Mon Sep 17 00:00:00 2001 From: hhb <455982533@qq.com> Date: Wed, 11 Sep 2024 17:16:03 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E6=8F=92=E5=85=A5=E7=BB=91=E5=AE=9A?= =?UTF-8?q?=E8=A1=A8=E3=80=81=E6=97=A5=E5=BF=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Crontab/Job/HttpJob.cs | 58 +++++++++++++++++-------- Crontab/Model/QZ_JobParams.cs | 2 + Crontab/QuartzService.cs | 27 ++++++++---- Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs | 7 ++- Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs | 21 +++++++++ Hadoop/ZKLT.Hadoop/HadoopService.cs | 4 ++ Hadoop/ZKLT.Hadoop/TaskService.cs | 35 ++++++--------- 7 files changed, 106 insertions(+), 48 deletions(-) create mode 100644 Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs index a134063..13742da 100644 --- a/Crontab/Job/HttpJob.cs +++ b/Crontab/Job/HttpJob.cs @@ -11,6 +11,7 @@ using ZKLT.Quartz.Model; using ZKLT.Hadoop.Interface; using Newtonsoft.Json; using ZKLT.Hadoop.Model; +using System.Dynamic; namespace ZKLT.Quartz.Job { /// @@ -18,29 +19,29 @@ namespace ZKLT.Quartz.Job /// public class HttpJob : IJob { - private IHadoopService HadoopService; - public HttpJob(IHadoopService _hadoopService) - { - HadoopService = _hadoopService; - } public async Task Execute(IJobExecutionContext context) { JobDataMap dataMap = context.JobDetail.JobDataMap; - string taskId = dataMap.Get("TaskId").ToString(); - Console.WriteLine(taskId); - + IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop"); + // 任务配置项 List configs = (List)dataMap.Get("Params"); - + // 任务Id + string taskId = dataMap.Get("TaskId").ToString(); + // 是否开启日志 + int isLog = (int)dataMap.Get("IsLog"); + // 请求地址 string url = configs.Find(_item => _item.Key == "Url").Value; - HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post; + // 请求类型 + HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post; + // 返回数据储存表 string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; - + // 返回数据集合名 + string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value; using (HttpClient client = new HttpClient()) { try { var resultColumnConfigs = new List(); - HttpRequestMessage request = new HttpRequestMessage(httpMethod, url); foreach (var _item in configs) { @@ -57,11 +58,29 @@ namespace ZKLT.Quartz.Job HttpResponseMessage response = await client.SendAsync(request); response.EnsureSuccessStatusCode(); string responseBody = await response.Content.ReadAsStringAsync(); - var responseList = JsonConvert.DeserializeObject>(responseBody); - if(bindDatabase != null) + var responseObject = JsonConvert.DeserializeObject(responseBody); + JArray responseList = (JArray)responseObject.GetValue(resultSet); + // 如果设定了绑定表 插入表中 + if (bindDatabase != null) { HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); - HadoopService.PatchCommand(commands); + _HadoopService.PatchCommand(commands); + } + // 日志开启,插入日志 + if (isLog == HDP_Task.LOGOPEN) + { + var data = new HDP_TaskLog() + { + Id = "TL" + DateTime.Now.ToUniversalTime(), + TaskId = taskId, + Content = responseBody + }; + var logCommand = new HDP_Command() + { + TableId = HDP_TaskLog.TABLEID, + Data = JToken.FromObject(data) + }; + _HadoopService.Insert(logCommand); } } catch (HttpRequestException e) @@ -69,11 +88,12 @@ namespace ZKLT.Quartz.Job Console.WriteLine($"请求错误{e.Message}"); } } + await Task.CompletedTask; } - public HDP_Command[] makeCommand(string bindDatabase,List responseList,List configList) + public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList) { HDP_Command[] commands = new HDP_Command?[responseList.Count]; foreach (var _item in responseList) @@ -81,10 +101,14 @@ namespace ZKLT.Quartz.Job var command = new HDP_Command(); command.TableId = bindDatabase; command.Type = HDP_CommandType.INSERT; + var data = new JObject(); foreach (var _config in configList) { - command.Data[_config.Value] = _item[_config.Key]; + data[_config.Value] = _item[_config.Key]; } + command.Data = data; + int _index = responseList.IndexOf(_item); + commands[_index] = command; } return commands; } diff --git a/Crontab/Model/QZ_JobParams.cs b/Crontab/Model/QZ_JobParams.cs index 80790df..d3f3e40 100644 --- a/Crontab/Model/QZ_JobParams.cs +++ b/Crontab/Model/QZ_JobParams.cs @@ -16,6 +16,7 @@ namespace ZKLT.Quartz.Model private string? _Body; private string? _CronTime; private string? _BindTable; + private int _IsLog; private List? _Params; public string TaskId { get => _TaskId; set => _TaskId = value;} @@ -25,6 +26,7 @@ namespace ZKLT.Quartz.Model public string? Body { get => _Body; set => _Body = value;} public string? CronTime { get => _CronTime; set => _CronTime = value; } public string? BindTable { get => _BindTable; set => _BindTable = value; } + public int IsLog { get => _IsLog; set => _IsLog = value; } public List? Params { get => _Params; set => _Params = value;} } } diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs index 003526b..d1cf79e 100644 --- a/Crontab/QuartzService.cs +++ b/Crontab/QuartzService.cs @@ -2,6 +2,8 @@ using Quartz; using Quartz.Impl; using System.Threading; +using ZKLT.Hadoop.Interface; +using ZKLT.Hadoop.Model; using ZKLT.Quartz.Interface; using ZKLT.Quartz.Job; using ZKLT.Quartz.Model; @@ -10,15 +12,16 @@ namespace ZKLT.Quartz { public class QuartzService : IQuartzService { - private const string HTTPTASK = "http"; - private const string SQLTASK = "sql"; - private IScheduler Scheduler; - public QuartzService() + private IScheduler _Scheduler; + + private IHadoopService _HadoopService; + public QuartzService(IHadoopService hadoopService) { + _HadoopService = hadoopService; //实例化调度器 - Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; - Scheduler.Start(); + _Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; + _Scheduler.Start(); } public void CreateHttpJob(QZ_JobParams jobParams) @@ -28,13 +31,17 @@ namespace ZKLT.Quartz //创建HTTP作业 Id作为名称 IJobDetail job = JobBuilder.Create() - .WithIdentity("j" + jobParams.TaskId, HTTPTASK) + .WithIdentity("j" + jobParams.TaskId, HDP_Task.HTTPTASK) .UsingJobData(jobDataMap) .Build(); + //创建触发器 - ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build(); + ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HDP_Task.HTTPTASK).StartNow() + .WithCronSchedule(jobParams.CronTime) + .Build(); + // 把作业,触发器加入调度器 - Scheduler.ScheduleJob(job, trigger); + _Scheduler.ScheduleJob(job, trigger); } private JobDataMap MakeJobDataMap(QZ_JobParams jobParams) @@ -48,6 +55,7 @@ namespace ZKLT.Quartz if (propValue == null) continue; jobDataMap.Add(prop.Name, propValue); } + jobDataMap.Add("hadoop", _HadoopService); return jobDataMap; } @@ -58,6 +66,7 @@ namespace ZKLT.Quartz public void CloseJob() { + } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs index 9a4eb48..3d60470 100644 --- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -9,11 +9,16 @@ namespace ZKLT.Hadoop.Model { public class HDP_Task { + public const string HTTPTASK = "http"; + public const string SQLTASK = "sql"; + public const int LOGOPEN = 1; + public const int LOGCLOSE = 0; private string _Id; private string? _Title; private int? _IsActive; private string? _Type; private string? _CronTime; + private int _IsLog; private List? _TaskConfigs; public string Id { get => _Id; set => _Id = value; } @@ -24,7 +29,7 @@ namespace ZKLT.Hadoop.Model public string? Type { get => _Type; set => _Type = value; } public string? CronTime { get => _CronTime; set => _CronTime = value; } - + public int IsLog { get => _IsLog; set => _IsLog = value; } public List? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; } } } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs new file mode 100644 index 0000000..ec86e1c --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs @@ -0,0 +1,21 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_TaskLog + { + public const string TABLEID = "ERP_TaskLog"; + private string _Id; + private string _TaskId; + private JToken? _Content; + + public string Id { get =>_Id; set => _Id = value; } + public string TaskId { get => _TaskId; set => _TaskId = value; } + public JToken? Content { get => _Content; set => _Content = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop/HadoopService.cs b/Hadoop/ZKLT.Hadoop/HadoopService.cs index c6c479d..c2ff30c 100644 --- a/Hadoop/ZKLT.Hadoop/HadoopService.cs +++ b/Hadoop/ZKLT.Hadoop/HadoopService.cs @@ -2,6 +2,9 @@ using Microsoft.Extensions.DependencyInjection; using MySqlX.XDevAPI.Relational; using Newtonsoft.Json.Linq; +using Quartz.Impl; +using Quartz; +using Quartz.Spi; using System; using System.Collections.Generic; using System.Linq; @@ -31,6 +34,7 @@ namespace ZKLT.Hadoop services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + return services; } diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs index cece116..34ff26d 100644 --- a/Hadoop/ZKLT.Hadoop/TaskService.cs +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -23,33 +23,26 @@ namespace ZKLT.Hadoop public string Start(HDP_Task taskParams) { //获取计划任务信息 - var command = new HDP_Command(); - command.TableId = "ERP_Task"; - command.Where = new JObject - { - { "Id","=" } - }; - command.Data = new JObject - { - {"Id",taskParams.Id } - }; - var taskInfo = _HadoopService.QuerySingle(command); - command.TableId = "ERP_TaskConfig"; - command.Where = new JObject - { - { "TaskId","=" } - }; - command.Data = new JObject - { - {"TaskId",taskParams.Id } - }; + var taskCommand = new HDP_Command(); + taskCommand.TableId = "ERP_Task"; + taskCommand.Where = new JObject() { { "Id","=" } }; + taskCommand.Data = new JObject() { {"Id",taskParams.Id } }; + var taskInfo = _HadoopService.QuerySingle(taskCommand); + //获取计划任务配置 + var taskConfigCommand = new HDP_Command(); + taskConfigCommand.TableId = "ERP_TaskConfig"; + taskConfigCommand.Where = new JObject() { { "TaskId","=" } }; + taskConfigCommand.Data = new JObject() { {"TaskId",taskParams.Id} }; QZ_JobParams jobParams = new QZ_JobParams(); jobParams.TaskId = taskInfo.Id; jobParams.CronTime = taskInfo.CronTime; - jobParams.Params = _HadoopService.Query(command).ToList(); + jobParams.IsLog = taskInfo.IsLog; + jobParams.Params = _HadoopService.Query(taskConfigCommand).ToList(); + // 调用任务管理类 开启任务管理 _QuartzService.CreateHttpJob(jobParams); + return taskParams.Id; } public string Pause(string Id) From 812061416123b92ad3bf2f32d1e647c6373d5acf Mon Sep 17 00:00:00 2001 From: hhb <455982533@qq.com> Date: Tue, 17 Sep 2024 13:33:22 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=BB=BB=E5=8A=A1HTTP?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Crontab/Interface/IQuartzService.cs | 4 +- Crontab/Job/HttpJob.cs | 46 +++++++++++++++++-- Crontab/QuartzService.cs | 8 +++- .../Controllers/HadoopController.cs | 2 +- Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs | 3 +- Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs | 2 + Hadoop/ZKLT.Hadoop/TaskService.cs | 29 +++++++++--- 7 files changed, 76 insertions(+), 18 deletions(-) diff --git a/Crontab/Interface/IQuartzService.cs b/Crontab/Interface/IQuartzService.cs index 219c3b2..0e9b73b 100644 --- a/Crontab/Interface/IQuartzService.cs +++ b/Crontab/Interface/IQuartzService.cs @@ -10,11 +10,11 @@ namespace ZKLT.Quartz.Interface { public interface IQuartzService { - public void CreateHttpJob(QZ_JobParams httpParams); + public void CreateHttpJob(QZ_JobParams jobParams); public void CreateSqlJob(); - public void CloseJob(); + public void CloseJob(QZ_JobParams jobParams); } } diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs index 13742da..124d450 100644 --- a/Crontab/Job/HttpJob.cs +++ b/Crontab/Job/HttpJob.cs @@ -35,6 +35,8 @@ namespace ZKLT.Quartz.Job HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post; // 返回数据储存表 string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; + // 返回数组储存表的唯一键,用做 更新/新增 判断 + string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value; // 返回数据集合名 string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value; using (HttpClient client = new HttpClient()) @@ -63,7 +65,7 @@ namespace ZKLT.Quartz.Job // 如果设定了绑定表 插入表中 if (bindDatabase != null) { - HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); + HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs); _HadoopService.PatchCommand(commands); } // 日志开启,插入日志 @@ -85,16 +87,35 @@ namespace ZKLT.Quartz.Job } catch (HttpRequestException e) { - Console.WriteLine($"请求错误{e.Message}"); } } - - await Task.CompletedTask; } + /// + /// + /// + /// 绑定数据表 + /// 数据唯一值 + /// hadoop实例 + /// 请求返回值 + /// 任务配置项 + /// - public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList) + public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List configList) { + //查询数据库是否存在数据 + var queryCommand = new HDP_Command() + { + TableId = bindDatabase + }; + var oldDataDapperRow = _HadoopService.Query(queryCommand).ToList(); + var oldData = new List(); + foreach (var row in oldDataDapperRow) + { + var rowJson = JsonConvert.SerializeObject(row); + JObject rowJObject = JObject.Parse(rowJson); + oldData.Add(rowJObject); + } HDP_Command[] commands = new HDP_Command?[responseList.Count]; foreach (var _item in responseList) { @@ -104,7 +125,22 @@ namespace ZKLT.Quartz.Job var data = new JObject(); foreach (var _config in configList) { + // _config.Key 结果字段 _config.Value 数据库字段 data[_config.Value] = _item[_config.Key]; + + if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT) + { + + var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString()); + if (oldItem != null) + { + command.Where = new JObject() + { + {bindDatabasePrimary, "=" } + }; + command.Type = HDP_CommandType.UPDATE; + } + } } command.Data = data; int _index = responseList.IndexOf(_item); diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs index d1cf79e..42dc9ec 100644 --- a/Crontab/QuartzService.cs +++ b/Crontab/QuartzService.cs @@ -64,9 +64,15 @@ namespace ZKLT.Quartz } - public void CloseJob() + public void CloseJob(QZ_JobParams jobParams) { + JobKey jobkey = new JobKey("j" + jobParams.TaskId, HDP_Task.HTTPTASK); + TriggerKey triggerKey = new TriggerKey("t" + jobParams.TaskId, HDP_Task.HTTPTASK); + _Scheduler.PauseTrigger(triggerKey); + _Scheduler.UnscheduleJob(triggerKey); + _Scheduler.PauseJob(jobkey); + _Scheduler.DeleteJob(jobkey); } diff --git a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs index bf69da3..7bf90fd 100644 --- a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs +++ b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs @@ -299,7 +299,7 @@ namespace ZKLT.Hadoop.API.Controllers { try { - return Ok(_TaskService.Close("1")); + return Ok(_TaskService.Close(taskParams)); } catch (Exception e) { diff --git a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs index 6c1a3d5..8742405 100644 --- a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs +++ b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs @@ -10,7 +10,6 @@ namespace ZKLT.Hadoop.Interface public interface ITaskService { public string Start(HDP_Task taskParams); - public string Pause(string Id); - public string Close(string Id); + public string Close(HDP_Task taskParams); } } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs index 3d60470..65d9ed8 100644 --- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -13,6 +13,8 @@ namespace ZKLT.Hadoop.Model public const string SQLTASK = "sql"; public const int LOGOPEN = 1; public const int LOGCLOSE = 0; + public const int ACTIVE = 1; + public const int NOTACTIVE = 0; private string _Id; private string? _Title; private int? _IsActive; diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs index 34ff26d..9889b29 100644 --- a/Hadoop/ZKLT.Hadoop/TaskService.cs +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -1,4 +1,5 @@ using Newtonsoft.Json.Linq; +using Org.BouncyCastle.Asn1.Tsp; using System; using System.Collections.Generic; using System.Linq; @@ -28,6 +29,10 @@ namespace ZKLT.Hadoop taskCommand.Where = new JObject() { { "Id","=" } }; taskCommand.Data = new JObject() { {"Id",taskParams.Id } }; var taskInfo = _HadoopService.QuerySingle(taskCommand); + if(taskInfo.IsActive == HDP_Task.ACTIVE) + { + return $"任务已经开启,无需再次开启"; + } //获取计划任务配置 var taskConfigCommand = new HDP_Command(); taskConfigCommand.TableId = "ERP_TaskConfig"; @@ -43,15 +48,25 @@ namespace ZKLT.Hadoop // 调用任务管理类 开启任务管理 _QuartzService.CreateHttpJob(jobParams); - return taskParams.Id; - } - public string Pause(string Id) - { - return Id; + // 更新active + taskCommand.Data = new JObject() { { "Id", taskParams.Id },{ "IsActive",1} }; + _HadoopService.Update(taskCommand); + + return $"任务{taskParams.Id}已开启"; } - public string Close(string Id) + public string Close(HDP_Task taskParams) { - return Id; + QZ_JobParams jobParams = new QZ_JobParams() { TaskId=taskParams.Id}; + _QuartzService.CloseJob(jobParams); + + // 更新active + var taskCommand = new HDP_Command(); + taskCommand.TableId = "ERP_Task"; + taskCommand.Where = new JObject() { { "Id", "=" } }; + taskCommand.Data = new JObject() { { "Id", taskParams.Id }, { "IsActive", 0 } }; + _HadoopService.Update(taskCommand); + + return $"任务{taskParams.Id}已关闭"; } }