diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs
index a134063..13742da 100644
--- a/Crontab/Job/HttpJob.cs
+++ b/Crontab/Job/HttpJob.cs
@@ -11,6 +11,7 @@ using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface;
using Newtonsoft.Json;
using ZKLT.Hadoop.Model;
+using System.Dynamic;
namespace ZKLT.Quartz.Job
{
///
@@ -18,29 +19,29 @@ namespace ZKLT.Quartz.Job
///
public class HttpJob : IJob
{
- private IHadoopService HadoopService;
- public HttpJob(IHadoopService _hadoopService)
- {
- HadoopService = _hadoopService;
- }
public async Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.JobDetail.JobDataMap;
- string taskId = dataMap.Get("TaskId").ToString();
- Console.WriteLine(taskId);
-
+ IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop");
+ // 任务配置项
List configs = (List)dataMap.Get("Params");
-
+ // 任务Id
+ string taskId = dataMap.Get("TaskId").ToString();
+ // 是否开启日志
+ int isLog = (int)dataMap.Get("IsLog");
+ // 请求地址
string url = configs.Find(_item => _item.Key == "Url").Value;
- HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post;
+ // 请求类型
+ HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
+ // 返回数据储存表
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
-
+ // 返回数据集合名
+ string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
using (HttpClient client = new HttpClient())
{
try
{
var resultColumnConfigs = new List();
-
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
foreach (var _item in configs)
{
@@ -57,11 +58,29 @@ namespace ZKLT.Quartz.Job
HttpResponseMessage response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
- var responseList = JsonConvert.DeserializeObject>(responseBody);
- if(bindDatabase != null)
+ var responseObject = JsonConvert.DeserializeObject(responseBody);
+ JArray responseList = (JArray)responseObject.GetValue(resultSet);
+ // 如果设定了绑定表 插入表中
+ if (bindDatabase != null)
{
HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs);
- HadoopService.PatchCommand(commands);
+ _HadoopService.PatchCommand(commands);
+ }
+ // 日志开启,插入日志
+ if (isLog == HDP_Task.LOGOPEN)
+ {
+ var data = new HDP_TaskLog()
+ {
+ Id = "TL" + DateTime.Now.ToUniversalTime(),
+ TaskId = taskId,
+ Content = responseBody
+ };
+ var logCommand = new HDP_Command()
+ {
+ TableId = HDP_TaskLog.TABLEID,
+ Data = JToken.FromObject(data)
+ };
+ _HadoopService.Insert(logCommand);
}
}
catch (HttpRequestException e)
@@ -69,11 +88,12 @@ namespace ZKLT.Quartz.Job
Console.WriteLine($"请求错误{e.Message}");
}
}
+
await Task.CompletedTask;
}
- public HDP_Command[] makeCommand(string bindDatabase,List responseList,List configList)
+ public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List configList)
{
HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList)
@@ -81,10 +101,14 @@ namespace ZKLT.Quartz.Job
var command = new HDP_Command();
command.TableId = bindDatabase;
command.Type = HDP_CommandType.INSERT;
+ var data = new JObject();
foreach (var _config in configList)
{
- command.Data[_config.Value] = _item[_config.Key];
+ data[_config.Value] = _item[_config.Key];
}
+ command.Data = data;
+ int _index = responseList.IndexOf(_item);
+ commands[_index] = command;
}
return commands;
}
diff --git a/Crontab/Model/QZ_JobParams.cs b/Crontab/Model/QZ_JobParams.cs
index 80790df..d3f3e40 100644
--- a/Crontab/Model/QZ_JobParams.cs
+++ b/Crontab/Model/QZ_JobParams.cs
@@ -16,6 +16,7 @@ namespace ZKLT.Quartz.Model
private string? _Body;
private string? _CronTime;
private string? _BindTable;
+ private int _IsLog;
private List? _Params;
public string TaskId { get => _TaskId; set => _TaskId = value;}
@@ -25,6 +26,7 @@ namespace ZKLT.Quartz.Model
public string? Body { get => _Body; set => _Body = value;}
public string? CronTime { get => _CronTime; set => _CronTime = value; }
public string? BindTable { get => _BindTable; set => _BindTable = value; }
+ public int IsLog { get => _IsLog; set => _IsLog = value; }
public List? Params { get => _Params; set => _Params = value;}
}
}
diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs
index 003526b..d1cf79e 100644
--- a/Crontab/QuartzService.cs
+++ b/Crontab/QuartzService.cs
@@ -2,6 +2,8 @@
using Quartz;
using Quartz.Impl;
using System.Threading;
+using ZKLT.Hadoop.Interface;
+using ZKLT.Hadoop.Model;
using ZKLT.Quartz.Interface;
using ZKLT.Quartz.Job;
using ZKLT.Quartz.Model;
@@ -10,15 +12,16 @@ namespace ZKLT.Quartz
{
public class QuartzService : IQuartzService
{
- private const string HTTPTASK = "http";
- private const string SQLTASK = "sql";
- private IScheduler Scheduler;
- public QuartzService()
+ private IScheduler _Scheduler;
+
+ private IHadoopService _HadoopService;
+ public QuartzService(IHadoopService hadoopService)
{
+ _HadoopService = hadoopService;
//实例化调度器
- Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result;
- Scheduler.Start();
+ _Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result;
+ _Scheduler.Start();
}
public void CreateHttpJob(QZ_JobParams jobParams)
@@ -28,13 +31,17 @@ namespace ZKLT.Quartz
//创建HTTP作业 Id作为名称
IJobDetail job = JobBuilder.Create()
- .WithIdentity("j" + jobParams.TaskId, HTTPTASK)
+ .WithIdentity("j" + jobParams.TaskId, HDP_Task.HTTPTASK)
.UsingJobData(jobDataMap)
.Build();
+
//创建触发器
- ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build();
+ ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HDP_Task.HTTPTASK).StartNow()
+ .WithCronSchedule(jobParams.CronTime)
+ .Build();
+
// 把作业,触发器加入调度器
- Scheduler.ScheduleJob(job, trigger);
+ _Scheduler.ScheduleJob(job, trigger);
}
private JobDataMap MakeJobDataMap(QZ_JobParams jobParams)
@@ -48,6 +55,7 @@ namespace ZKLT.Quartz
if (propValue == null) continue;
jobDataMap.Add(prop.Name, propValue);
}
+ jobDataMap.Add("hadoop", _HadoopService);
return jobDataMap;
}
@@ -58,6 +66,7 @@ namespace ZKLT.Quartz
public void CloseJob()
{
+
}
diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
index 9a4eb48..3d60470 100644
--- a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
+++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs
@@ -9,11 +9,16 @@ namespace ZKLT.Hadoop.Model
{
public class HDP_Task
{
+ public const string HTTPTASK = "http";
+ public const string SQLTASK = "sql";
+ public const int LOGOPEN = 1;
+ public const int LOGCLOSE = 0;
private string _Id;
private string? _Title;
private int? _IsActive;
private string? _Type;
private string? _CronTime;
+ private int _IsLog;
private List? _TaskConfigs;
public string Id { get => _Id; set => _Id = value; }
@@ -24,7 +29,7 @@ namespace ZKLT.Hadoop.Model
public string? Type { get => _Type; set => _Type = value; }
public string? CronTime { get => _CronTime; set => _CronTime = value; }
-
+ public int IsLog { get => _IsLog; set => _IsLog = value; }
public List? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; }
}
}
diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs
new file mode 100644
index 0000000..ec86e1c
--- /dev/null
+++ b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskLog.cs
@@ -0,0 +1,21 @@
+using Newtonsoft.Json.Linq;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace ZKLT.Hadoop.Model
+{
+ public class HDP_TaskLog
+ {
+ public const string TABLEID = "ERP_TaskLog";
+ private string _Id;
+ private string _TaskId;
+ private JToken? _Content;
+
+ public string Id { get =>_Id; set => _Id = value; }
+ public string TaskId { get => _TaskId; set => _TaskId = value; }
+ public JToken? Content { get => _Content; set => _Content = value; }
+ }
+}
diff --git a/Hadoop/ZKLT.Hadoop/HadoopService.cs b/Hadoop/ZKLT.Hadoop/HadoopService.cs
index c6c479d..c2ff30c 100644
--- a/Hadoop/ZKLT.Hadoop/HadoopService.cs
+++ b/Hadoop/ZKLT.Hadoop/HadoopService.cs
@@ -2,6 +2,9 @@
using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
+using Quartz.Impl;
+using Quartz;
+using Quartz.Spi;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -31,6 +34,7 @@ namespace ZKLT.Hadoop
services.AddSingleton();
services.AddSingleton();
services.AddSingleton();
+
return services;
}
diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs
index cece116..34ff26d 100644
--- a/Hadoop/ZKLT.Hadoop/TaskService.cs
+++ b/Hadoop/ZKLT.Hadoop/TaskService.cs
@@ -23,33 +23,26 @@ namespace ZKLT.Hadoop
public string Start(HDP_Task taskParams)
{
//获取计划任务信息
- var command = new HDP_Command();
- command.TableId = "ERP_Task";
- command.Where = new JObject
- {
- { "Id","=" }
- };
- command.Data = new JObject
- {
- {"Id",taskParams.Id }
- };
- var taskInfo = _HadoopService.QuerySingle(command);
- command.TableId = "ERP_TaskConfig";
- command.Where = new JObject
- {
- { "TaskId","=" }
- };
- command.Data = new JObject
- {
- {"TaskId",taskParams.Id }
- };
+ var taskCommand = new HDP_Command();
+ taskCommand.TableId = "ERP_Task";
+ taskCommand.Where = new JObject() { { "Id","=" } };
+ taskCommand.Data = new JObject() { {"Id",taskParams.Id } };
+ var taskInfo = _HadoopService.QuerySingle(taskCommand);
+ //获取计划任务配置
+ var taskConfigCommand = new HDP_Command();
+ taskConfigCommand.TableId = "ERP_TaskConfig";
+ taskConfigCommand.Where = new JObject() { { "TaskId","=" } };
+ taskConfigCommand.Data = new JObject() { {"TaskId",taskParams.Id} };
QZ_JobParams jobParams = new QZ_JobParams();
jobParams.TaskId = taskInfo.Id;
jobParams.CronTime = taskInfo.CronTime;
- jobParams.Params = _HadoopService.Query(command).ToList();
+ jobParams.IsLog = taskInfo.IsLog;
+ jobParams.Params = _HadoopService.Query(taskConfigCommand).ToList();
+
// 调用任务管理类 开启任务管理
_QuartzService.CreateHttpJob(jobParams);
+
return taskParams.Id;
}
public string Pause(string Id)