diff --git a/Crontab/Interface/IQuartzService.cs b/Crontab/Interface/IQuartzService.cs
index 219c3b2..0e9b73b 100644
--- a/Crontab/Interface/IQuartzService.cs
+++ b/Crontab/Interface/IQuartzService.cs
@@ -10,11 +10,11 @@ namespace ZKLT.Quartz.Interface
{
public interface IQuartzService
{
- public void CreateHttpJob(QZ_JobParams httpParams);
+ public void CreateHttpJob(QZ_JobParams jobParams);
public void CreateSqlJob();
- public void CloseJob();
+ public void CloseJob(QZ_JobParams jobParams);
}
}
diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs
index 13742da..124d450 100644
--- a/Crontab/Job/HttpJob.cs
+++ b/Crontab/Job/HttpJob.cs
@@ -35,6 +35,8 @@ namespace ZKLT.Quartz.Job
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
// 返回数据储存表
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
+ // 返回数组储存表的唯一键,用做 更新/新增 判断
+ string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value;
// 返回数据集合名
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
using (HttpClient client = new HttpClient())
@@ -63,7 +65,7 @@ namespace ZKLT.Quartz.Job
// 如果设定了绑定表 插入表中
if (bindDatabase != null)
{
- HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs);
+ HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs);
_HadoopService.PatchCommand(commands);
}
// 日志开启,插入日志
@@ -85,16 +87,35 @@ namespace ZKLT.Quartz.Job
}
catch (HttpRequestException e)
{
- Console.WriteLine($"请求错误{e.Message}");
}
}
-
-
await Task.CompletedTask;
}
+ ///
+ ///
+ ///
+ /// 绑定数据表
+ /// 数据唯一值
+ /// hadoop实例
+ /// 请求返回值
+ /// 任务配置项
+ ///
- public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList)
+ public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List configList)
{
+ //查询数据库是否存在数据
+ var queryCommand = new HDP_Command()
+ {
+ TableId = bindDatabase
+ };
+ var oldDataDapperRow = _HadoopService.Query(queryCommand).ToList();
+ var oldData = new List();
+ foreach (var row in oldDataDapperRow)
+ {
+ var rowJson = JsonConvert.SerializeObject(row);
+ JObject rowJObject = JObject.Parse(rowJson);
+ oldData.Add(rowJObject);
+ }
HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList)
{
@@ -104,7 +125,22 @@ namespace ZKLT.Quartz.Job
var data = new JObject();
foreach (var _config in configList)
{
+ // _config.Key 结果字段 _config.Value 数据库字段
data[_config.Value] = _item[_config.Key];
+
+ if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT)
+ {
+
+ var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString());
+ if (oldItem != null)
+ {
+ command.Where = new JObject()
+ {
+ {bindDatabasePrimary, "=" }
+ };
+ command.Type = HDP_CommandType.UPDATE;
+ }
+ }
}
command.Data = data;
int _index = responseList.IndexOf(_item);
diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs
index d1cf79e..42dc9ec 100644
--- a/Crontab/QuartzService.cs
+++ b/Crontab/QuartzService.cs
@@ -64,9 +64,15 @@ namespace ZKLT.Quartz
}
- public void CloseJob()
+ public void CloseJob(QZ_JobParams jobParams)
{
+ JobKey jobkey = new JobKey("j" + jobParams.TaskId, HDP_Task.HTTPTASK);
+ TriggerKey triggerKey = new TriggerKey("t" + jobParams.TaskId, HDP_Task.HTTPTASK);
+ _Scheduler.PauseTrigger(triggerKey);
+ _Scheduler.UnscheduleJob(triggerKey);
+ _Scheduler.PauseJob(jobkey);
+ _Scheduler.DeleteJob(jobkey);
}
diff --git a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs
index bf69da3..7bf90fd 100644
--- a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs
+++ b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs
@@ -299,7 +299,7 @@ namespace ZKLT.Hadoop.API.Controllers
{
try
{
- return Ok(_TaskService.Close("1"));
+ return Ok(_TaskService.Close(taskParams));
}
catch (Exception e)
{
diff --git a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs
index 6c1a3d5..8742405 100644
--- a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs
+++ b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs
@@ -10,7 +10,6 @@ namespace ZKLT.Hadoop.Interface
public interface ITaskService
{
public string Start(HDP_Task taskParams);
- public string Pause(string Id);
- public string Close(string Id);
+ public string Close(HDP_Task taskParams);
}
}
diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
index 3d60470..65d9ed8 100644
--- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
+++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
@@ -13,6 +13,8 @@ namespace ZKLT.Hadoop.Model
public const string SQLTASK = "sql";
public const int LOGOPEN = 1;
public const int LOGCLOSE = 0;
+ public const int ACTIVE = 1;
+ public const int NOTACTIVE = 0;
private string _Id;
private string? _Title;
private int? _IsActive;
diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs
index 34ff26d..9889b29 100644
--- a/Hadoop/ZKLT.Hadoop/TaskService.cs
+++ b/Hadoop/ZKLT.Hadoop/TaskService.cs
@@ -1,4 +1,5 @@
using Newtonsoft.Json.Linq;
+using Org.BouncyCastle.Asn1.Tsp;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -28,6 +29,10 @@ namespace ZKLT.Hadoop
taskCommand.Where = new JObject() { { "Id","=" } };
taskCommand.Data = new JObject() { {"Id",taskParams.Id } };
var taskInfo = _HadoopService.QuerySingle(taskCommand);
+ if(taskInfo.IsActive == HDP_Task.ACTIVE)
+ {
+ return $"任务已经开启,无需再次开启";
+ }
//获取计划任务配置
var taskConfigCommand = new HDP_Command();
taskConfigCommand.TableId = "ERP_TaskConfig";
@@ -43,15 +48,25 @@ namespace ZKLT.Hadoop
// 调用任务管理类 开启任务管理
_QuartzService.CreateHttpJob(jobParams);
- return taskParams.Id;
- }
- public string Pause(string Id)
- {
- return Id;
+ // 更新active
+ taskCommand.Data = new JObject() { { "Id", taskParams.Id },{ "IsActive",1} };
+ _HadoopService.Update(taskCommand);
+
+ return $"任务{taskParams.Id}已开启";
}
- public string Close(string Id)
+ public string Close(HDP_Task taskParams)
{
- return Id;
+ QZ_JobParams jobParams = new QZ_JobParams() { TaskId=taskParams.Id};
+ _QuartzService.CloseJob(jobParams);
+
+ // 更新active
+ var taskCommand = new HDP_Command();
+ taskCommand.TableId = "ERP_Task";
+ taskCommand.Where = new JObject() { { "Id", "=" } };
+ taskCommand.Data = new JObject() { { "Id", taskParams.Id }, { "IsActive", 0 } };
+ _HadoopService.Update(taskCommand);
+
+ return $"任务{taskParams.Id}已关闭";
}
}