From 812061416123b92ad3bf2f32d1e647c6373d5acf Mon Sep 17 00:00:00 2001 From: hhb <455982533@qq.com> Date: Tue, 17 Sep 2024 13:33:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A1=E5=88=92=E4=BB=BB=E5=8A=A1HTTP?= =?UTF-8?q?=E5=AE=8C=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Crontab/Interface/IQuartzService.cs | 4 +- Crontab/Job/HttpJob.cs | 46 +++++++++++++++++-- Crontab/QuartzService.cs | 8 +++- .../Controllers/HadoopController.cs | 2 +- Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs | 3 +- Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs | 2 + Hadoop/ZKLT.Hadoop/TaskService.cs | 29 +++++++++--- 7 files changed, 76 insertions(+), 18 deletions(-) diff --git a/Crontab/Interface/IQuartzService.cs b/Crontab/Interface/IQuartzService.cs index 219c3b2..0e9b73b 100644 --- a/Crontab/Interface/IQuartzService.cs +++ b/Crontab/Interface/IQuartzService.cs @@ -10,11 +10,11 @@ namespace ZKLT.Quartz.Interface { public interface IQuartzService { - public void CreateHttpJob(QZ_JobParams httpParams); + public void CreateHttpJob(QZ_JobParams jobParams); public void CreateSqlJob(); - public void CloseJob(); + public void CloseJob(QZ_JobParams jobParams); } } diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs index 13742da..124d450 100644 --- a/Crontab/Job/HttpJob.cs +++ b/Crontab/Job/HttpJob.cs @@ -35,6 +35,8 @@ namespace ZKLT.Quartz.Job HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post; // 返回数据储存表 string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; + // 返回数组储存表的唯一键,用做 更新/新增 判断 + string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value; // 返回数据集合名 string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value; using (HttpClient client = new HttpClient()) @@ -63,7 +65,7 @@ namespace ZKLT.Quartz.Job // 如果设定了绑定表 插入表中 if (bindDatabase != null) { - HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); + HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs); _HadoopService.PatchCommand(commands); } // 日志开启,插入日志 @@ -85,16 +87,35 @@ namespace ZKLT.Quartz.Job } catch (HttpRequestException e) { - Console.WriteLine($"请求错误{e.Message}"); } } - - await Task.CompletedTask; } + /// + /// + /// + /// 绑定数据表 + /// 数据唯一值 + /// hadoop实例 + /// 请求返回值 + /// 任务配置项 + /// - public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList) + public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List configList) { + //查询数据库是否存在数据 + var queryCommand = new HDP_Command() + { + TableId = bindDatabase + }; + var oldDataDapperRow = _HadoopService.Query(queryCommand).ToList(); + var oldData = new List(); + foreach (var row in oldDataDapperRow) + { + var rowJson = JsonConvert.SerializeObject(row); + JObject rowJObject = JObject.Parse(rowJson); + oldData.Add(rowJObject); + } HDP_Command[] commands = new HDP_Command?[responseList.Count]; foreach (var _item in responseList) { @@ -104,7 +125,22 @@ namespace ZKLT.Quartz.Job var data = new JObject(); foreach (var _config in configList) { + // _config.Key 结果字段 _config.Value 数据库字段 data[_config.Value] = _item[_config.Key]; + + if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT) + { + + var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString()); + if (oldItem != null) + { + command.Where = new JObject() + { + {bindDatabasePrimary, "=" } + }; + command.Type = HDP_CommandType.UPDATE; + } + } } command.Data = data; int _index = responseList.IndexOf(_item); diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs index d1cf79e..42dc9ec 100644 --- a/Crontab/QuartzService.cs +++ b/Crontab/QuartzService.cs @@ -64,9 +64,15 @@ namespace ZKLT.Quartz } - public void CloseJob() + public void CloseJob(QZ_JobParams jobParams) { + JobKey jobkey = new JobKey("j" + jobParams.TaskId, HDP_Task.HTTPTASK); + TriggerKey triggerKey = new TriggerKey("t" + jobParams.TaskId, HDP_Task.HTTPTASK); + _Scheduler.PauseTrigger(triggerKey); + _Scheduler.UnscheduleJob(triggerKey); + _Scheduler.PauseJob(jobkey); + _Scheduler.DeleteJob(jobkey); } diff --git a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs index bf69da3..7bf90fd 100644 --- a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs +++ b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs @@ -299,7 +299,7 @@ namespace ZKLT.Hadoop.API.Controllers { try { - return Ok(_TaskService.Close("1")); + return Ok(_TaskService.Close(taskParams)); } catch (Exception e) { diff --git a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs index 6c1a3d5..8742405 100644 --- a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs +++ b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs @@ -10,7 +10,6 @@ namespace ZKLT.Hadoop.Interface public interface ITaskService { public string Start(HDP_Task taskParams); - public string Pause(string Id); - public string Close(string Id); + public string Close(HDP_Task taskParams); } } diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs index 3d60470..65d9ed8 100644 --- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -13,6 +13,8 @@ namespace ZKLT.Hadoop.Model public const string SQLTASK = "sql"; public const int LOGOPEN = 1; public const int LOGCLOSE = 0; + public const int ACTIVE = 1; + public const int NOTACTIVE = 0; private string _Id; private string? _Title; private int? _IsActive; diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs index 34ff26d..9889b29 100644 --- a/Hadoop/ZKLT.Hadoop/TaskService.cs +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -1,4 +1,5 @@ using Newtonsoft.Json.Linq; +using Org.BouncyCastle.Asn1.Tsp; using System; using System.Collections.Generic; using System.Linq; @@ -28,6 +29,10 @@ namespace ZKLT.Hadoop taskCommand.Where = new JObject() { { "Id","=" } }; taskCommand.Data = new JObject() { {"Id",taskParams.Id } }; var taskInfo = _HadoopService.QuerySingle(taskCommand); + if(taskInfo.IsActive == HDP_Task.ACTIVE) + { + return $"任务已经开启,无需再次开启"; + } //获取计划任务配置 var taskConfigCommand = new HDP_Command(); taskConfigCommand.TableId = "ERP_TaskConfig"; @@ -43,15 +48,25 @@ namespace ZKLT.Hadoop // 调用任务管理类 开启任务管理 _QuartzService.CreateHttpJob(jobParams); - return taskParams.Id; - } - public string Pause(string Id) - { - return Id; + // 更新active + taskCommand.Data = new JObject() { { "Id", taskParams.Id },{ "IsActive",1} }; + _HadoopService.Update(taskCommand); + + return $"任务{taskParams.Id}已开启"; } - public string Close(string Id) + public string Close(HDP_Task taskParams) { - return Id; + QZ_JobParams jobParams = new QZ_JobParams() { TaskId=taskParams.Id}; + _QuartzService.CloseJob(jobParams); + + // 更新active + var taskCommand = new HDP_Command(); + taskCommand.TableId = "ERP_Task"; + taskCommand.Where = new JObject() { { "Id", "=" } }; + taskCommand.Data = new JObject() { { "Id", taskParams.Id }, { "IsActive", 0 } }; + _HadoopService.Update(taskCommand); + + return $"任务{taskParams.Id}已关闭"; } }