插入绑定表、日志表

pull/3/head
hhb 2 months ago
parent a5b6c387f8
commit 3cd7a42c4b

@ -11,6 +11,7 @@ using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Interface;
using Newtonsoft.Json; using Newtonsoft.Json;
using ZKLT.Hadoop.Model; using ZKLT.Hadoop.Model;
using System.Dynamic;
namespace ZKLT.Quartz.Job namespace ZKLT.Quartz.Job
{ {
/// <summary> /// <summary>
@ -18,29 +19,29 @@ namespace ZKLT.Quartz.Job
/// </summary> /// </summary>
public class HttpJob : IJob public class HttpJob : IJob
{ {
private IHadoopService HadoopService;
public HttpJob(IHadoopService _hadoopService)
{
HadoopService = _hadoopService;
}
public async Task Execute(IJobExecutionContext context) public async Task Execute(IJobExecutionContext context)
{ {
JobDataMap dataMap = context.JobDetail.JobDataMap; JobDataMap dataMap = context.JobDetail.JobDataMap;
string taskId = dataMap.Get("TaskId").ToString(); IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop");
Console.WriteLine(taskId); // 任务配置项
List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params"); List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params");
// 任务Id
string taskId = dataMap.Get("TaskId").ToString();
// 是否开启日志
int isLog = (int)dataMap.Get("IsLog");
// 请求地址
string url = configs.Find(_item => _item.Key == "Url").Value; string url = configs.Find(_item => _item.Key == "Url").Value;
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post; // 请求类型
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
// 返回数据储存表
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
// 返回数据集合名
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
using (HttpClient client = new HttpClient()) using (HttpClient client = new HttpClient())
{ {
try try
{ {
var resultColumnConfigs = new List<QZ_JobConfig>(); var resultColumnConfigs = new List<QZ_JobConfig>();
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url); HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
foreach (var _item in configs) foreach (var _item in configs)
{ {
@ -57,11 +58,29 @@ namespace ZKLT.Quartz.Job
HttpResponseMessage response = await client.SendAsync(request); HttpResponseMessage response = await client.SendAsync(request);
response.EnsureSuccessStatusCode(); response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync(); string responseBody = await response.Content.ReadAsStringAsync();
var responseList = JsonConvert.DeserializeObject<List<JObject>>(responseBody); var responseObject = JsonConvert.DeserializeObject<JObject>(responseBody);
JArray responseList = (JArray)responseObject.GetValue(resultSet);
// 如果设定了绑定表 插入表中
if (bindDatabase != null) if (bindDatabase != null)
{ {
HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs);
HadoopService.PatchCommand(commands); _HadoopService.PatchCommand(commands);
}
// 日志开启,插入日志
if (isLog == HDP_Task.LOGOPEN)
{
var data = new HDP_TaskLog()
{
Id = "TL" + DateTime.Now.ToUniversalTime(),
TaskId = taskId,
Content = responseBody
};
var logCommand = new HDP_Command()
{
TableId = HDP_TaskLog.TABLEID,
Data = JToken.FromObject(data)
};
_HadoopService.Insert(logCommand);
} }
} }
catch (HttpRequestException e) catch (HttpRequestException e)
@ -70,10 +89,11 @@ namespace ZKLT.Quartz.Job
} }
} }
await Task.CompletedTask; await Task.CompletedTask;
} }
public HDP_Command[] makeCommand(string bindDatabase,List<JObject> responseList,List<QZ_JobConfig> configList) public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List<QZ_JobConfig> configList)
{ {
HDP_Command[] commands = new HDP_Command?[responseList.Count]; HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList) foreach (var _item in responseList)
@ -81,10 +101,14 @@ namespace ZKLT.Quartz.Job
var command = new HDP_Command(); var command = new HDP_Command();
command.TableId = bindDatabase; command.TableId = bindDatabase;
command.Type = HDP_CommandType.INSERT; command.Type = HDP_CommandType.INSERT;
var data = new JObject();
foreach (var _config in configList) foreach (var _config in configList)
{ {
command.Data[_config.Value] = _item[_config.Key]; data[_config.Value] = _item[_config.Key];
} }
command.Data = data;
int _index = responseList.IndexOf(_item);
commands[_index] = command;
} }
return commands; return commands;
} }

@ -16,6 +16,7 @@ namespace ZKLT.Quartz.Model
private string? _Body; private string? _Body;
private string? _CronTime; private string? _CronTime;
private string? _BindTable; private string? _BindTable;
private int _IsLog;
private List<QZ_JobConfig>? _Params; private List<QZ_JobConfig>? _Params;
public string TaskId { get => _TaskId; set => _TaskId = value;} public string TaskId { get => _TaskId; set => _TaskId = value;}
@ -25,6 +26,7 @@ namespace ZKLT.Quartz.Model
public string? Body { get => _Body; set => _Body = value;} public string? Body { get => _Body; set => _Body = value;}
public string? CronTime { get => _CronTime; set => _CronTime = value; } public string? CronTime { get => _CronTime; set => _CronTime = value; }
public string? BindTable { get => _BindTable; set => _BindTable = value; } public string? BindTable { get => _BindTable; set => _BindTable = value; }
public int IsLog { get => _IsLog; set => _IsLog = value; }
public List<QZ_JobConfig>? Params { get => _Params; set => _Params = value;} public List<QZ_JobConfig>? Params { get => _Params; set => _Params = value;}
} }
} }

@ -2,6 +2,8 @@
using Quartz; using Quartz;
using Quartz.Impl; using Quartz.Impl;
using System.Threading; using System.Threading;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using ZKLT.Quartz.Interface; using ZKLT.Quartz.Interface;
using ZKLT.Quartz.Job; using ZKLT.Quartz.Job;
using ZKLT.Quartz.Model; using ZKLT.Quartz.Model;
@ -10,15 +12,16 @@ namespace ZKLT.Quartz
{ {
public class QuartzService : IQuartzService public class QuartzService : IQuartzService
{ {
private const string HTTPTASK = "http";
private const string SQLTASK = "sql";
private IScheduler Scheduler;
public QuartzService() private IScheduler _Scheduler;
private IHadoopService _HadoopService;
public QuartzService(IHadoopService hadoopService)
{ {
_HadoopService = hadoopService;
//实例化调度器 //实例化调度器
Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; _Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result;
Scheduler.Start(); _Scheduler.Start();
} }
public void CreateHttpJob(QZ_JobParams jobParams) public void CreateHttpJob(QZ_JobParams jobParams)
@ -28,13 +31,17 @@ namespace ZKLT.Quartz
//创建HTTP作业 Id作为名称 //创建HTTP作业 Id作为名称
IJobDetail job = JobBuilder.Create<HttpJob>() IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity("j" + jobParams.TaskId, HTTPTASK) .WithIdentity("j" + jobParams.TaskId, HDP_Task.HTTPTASK)
.UsingJobData(jobDataMap) .UsingJobData(jobDataMap)
.Build(); .Build();
//创建触发器 //创建触发器
ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build(); ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HDP_Task.HTTPTASK).StartNow()
.WithCronSchedule(jobParams.CronTime)
.Build();
// 把作业,触发器加入调度器 // 把作业,触发器加入调度器
Scheduler.ScheduleJob(job, trigger); _Scheduler.ScheduleJob(job, trigger);
} }
private JobDataMap MakeJobDataMap(QZ_JobParams jobParams) private JobDataMap MakeJobDataMap(QZ_JobParams jobParams)
@ -48,6 +55,7 @@ namespace ZKLT.Quartz
if (propValue == null) continue; if (propValue == null) continue;
jobDataMap.Add(prop.Name, propValue); jobDataMap.Add(prop.Name, propValue);
} }
jobDataMap.Add("hadoop", _HadoopService);
return jobDataMap; return jobDataMap;
} }
@ -58,6 +66,7 @@ namespace ZKLT.Quartz
public void CloseJob() public void CloseJob()
{ {
} }

@ -9,11 +9,16 @@ namespace ZKLT.Hadoop.Model
{ {
public class HDP_Task public class HDP_Task
{ {
public const string HTTPTASK = "http";
public const string SQLTASK = "sql";
public const int LOGOPEN = 1;
public const int LOGCLOSE = 0;
private string _Id; private string _Id;
private string? _Title; private string? _Title;
private int? _IsActive; private int? _IsActive;
private string? _Type; private string? _Type;
private string? _CronTime; private string? _CronTime;
private int _IsLog;
private List<HDP_TaskConfig>? _TaskConfigs; private List<HDP_TaskConfig>? _TaskConfigs;
public string Id { get => _Id; set => _Id = value; } public string Id { get => _Id; set => _Id = value; }
@ -24,7 +29,7 @@ namespace ZKLT.Hadoop.Model
public string? Type { get => _Type; set => _Type = value; } public string? Type { get => _Type; set => _Type = value; }
public string? CronTime { get => _CronTime; set => _CronTime = value; } public string? CronTime { get => _CronTime; set => _CronTime = value; }
public int IsLog { get => _IsLog; set => _IsLog = value; }
public List<HDP_TaskConfig>? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; } public List<HDP_TaskConfig>? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; }
} }
} }

@ -0,0 +1,21 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Hadoop.Model
{
public class HDP_TaskLog
{
public const string TABLEID = "ERP_TaskLog";
private string _Id;
private string _TaskId;
private JToken? _Content;
public string Id { get =>_Id; set => _Id = value; }
public string TaskId { get => _TaskId; set => _TaskId = value; }
public JToken? Content { get => _Content; set => _Content = value; }
}
}

@ -2,6 +2,9 @@
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational; using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using Quartz.Impl;
using Quartz;
using Quartz.Spi;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -31,6 +34,7 @@ namespace ZKLT.Hadoop
services.AddSingleton<ITableService, TableService>(); services.AddSingleton<ITableService, TableService>();
services.AddSingleton<ITaskService, TaskService>(); services.AddSingleton<ITaskService, TaskService>();
services.AddSingleton<IQuartzService, QuartzService>(); services.AddSingleton<IQuartzService, QuartzService>();
return services; return services;
} }

@ -23,33 +23,26 @@ namespace ZKLT.Hadoop
public string Start(HDP_Task taskParams) public string Start(HDP_Task taskParams)
{ {
//获取计划任务信息 //获取计划任务信息
var command = new HDP_Command(); var taskCommand = new HDP_Command();
command.TableId = "ERP_Task"; taskCommand.TableId = "ERP_Task";
command.Where = new JObject taskCommand.Where = new JObject() { { "Id","=" } };
{ taskCommand.Data = new JObject() { {"Id",taskParams.Id } };
{ "Id","=" } var taskInfo = _HadoopService.QuerySingle<HDP_Task>(taskCommand);
}; //获取计划任务配置
command.Data = new JObject var taskConfigCommand = new HDP_Command();
{ taskConfigCommand.TableId = "ERP_TaskConfig";
{"Id",taskParams.Id } taskConfigCommand.Where = new JObject() { { "TaskId","=" } };
}; taskConfigCommand.Data = new JObject() { {"TaskId",taskParams.Id} };
var taskInfo = _HadoopService.QuerySingle<HDP_Task>(command);
command.TableId = "ERP_TaskConfig";
command.Where = new JObject
{
{ "TaskId","=" }
};
command.Data = new JObject
{
{"TaskId",taskParams.Id }
};
QZ_JobParams jobParams = new QZ_JobParams(); QZ_JobParams jobParams = new QZ_JobParams();
jobParams.TaskId = taskInfo.Id; jobParams.TaskId = taskInfo.Id;
jobParams.CronTime = taskInfo.CronTime; jobParams.CronTime = taskInfo.CronTime;
jobParams.Params = _HadoopService.Query<QZ_JobConfig>(command).ToList(); jobParams.IsLog = taskInfo.IsLog;
jobParams.Params = _HadoopService.Query<QZ_JobConfig>(taskConfigCommand).ToList();
// 调用任务管理类 开启任务管理 // 调用任务管理类 开启任务管理
_QuartzService.CreateHttpJob(jobParams); _QuartzService.CreateHttpJob(jobParams);
return taskParams.Id; return taskParams.Id;
} }
public string Pause(string Id) public string Pause(string Id)

Loading…
Cancel
Save