计划任务 #3

Merged
panjiandong merged 6 commits from dev into main 2 months ago

@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
using ZKLT.Quartz.Model;
namespace ZKLT.Quartz.Interface
{
public interface IQuartzService
{
public void CreateHttpJob(QZ_JobParams jobParams);
public void CreateSqlJob();
public void CloseJob(QZ_JobParams jobParams);
}
}

@ -0,0 +1,152 @@
using Newtonsoft.Json.Linq;
using Quartz;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface;
using Newtonsoft.Json;
using ZKLT.Hadoop.Model;
using System.Dynamic;
namespace ZKLT.Quartz.Job
{
/// <summary>
/// HTTP作业类
/// </summary>
public class HttpJob : IJob
{
public async Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.JobDetail.JobDataMap;
IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop");
// 任务配置项
List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params");
// 任务Id
string taskId = dataMap.Get("TaskId").ToString();
// 是否开启日志
int isLog = (int)dataMap.Get("IsLog");
// 请求地址
string url = configs.Find(_item => _item.Key == "Url").Value;
// 请求类型
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
// 返回数据储存表
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
// 返回数组储存表的唯一键,用做 更新/新增 判断
string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value;
// 返回数据集合名
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
using (HttpClient client = new HttpClient())
{
try
{
var resultColumnConfigs = new List<QZ_JobConfig>();
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
foreach (var _item in configs)
{
if (_item.Group! == "Headers")
{
request.Headers.Add(_item.Key, _item.Value);
}
else if (_item.Group! == "ResultColumns")
{
resultColumnConfigs.Add(_item);
}
}
//请求提交
HttpResponseMessage response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
var responseObject = JsonConvert.DeserializeObject<JObject>(responseBody);
JArray responseList = (JArray)responseObject.GetValue(resultSet);
// 如果设定了绑定表 插入表中
if (bindDatabase != null)
{
HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs);
_HadoopService.PatchCommand(commands);
}
// 日志开启,插入日志
if (isLog == HDP_Task.LOGOPEN)
{
var data = new HDP_TaskLog()
{
Id = "TL" + DateTime.Now.ToUniversalTime(),
TaskId = taskId,
Content = responseBody
};
var logCommand = new HDP_Command()
{
TableId = HDP_TaskLog.TABLEID,
Data = JToken.FromObject(data)
};
_HadoopService.Insert(logCommand);
}
}
catch (HttpRequestException e)
{
}
}
await Task.CompletedTask;
}
/// <summary>
///
/// </summary>
/// <param name="bindDatabase">绑定数据表</param>
/// <param name="bindDatabasePrimary">数据唯一值</param>
/// <param name="_HadoopService">hadoop实例</param>
/// <param name="responseList">请求返回值</param>
/// <param name="configList">任务配置项</param>
/// <returns></returns>
public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List<QZ_JobConfig> configList)
{
//查询数据库是否存在数据
var queryCommand = new HDP_Command()
{
TableId = bindDatabase
};
var oldDataDapperRow = _HadoopService.Query<dynamic>(queryCommand).ToList();
var oldData = new List<JObject>();
foreach (var row in oldDataDapperRow)
{
var rowJson = JsonConvert.SerializeObject(row);
JObject rowJObject = JObject.Parse(rowJson);
oldData.Add(rowJObject);
}
HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList)
{
var command = new HDP_Command();
command.TableId = bindDatabase;
command.Type = HDP_CommandType.INSERT;
var data = new JObject();
foreach (var _config in configList)
{
// _config.Key 结果字段 _config.Value 数据库字段
data[_config.Value] = _item[_config.Key];
if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT)
{
var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString());
if (oldItem != null)
{
command.Where = new JObject()
{
{bindDatabasePrimary, "=" }
};
command.Type = HDP_CommandType.UPDATE;
}
}
}
command.Data = data;
int _index = responseList.IndexOf(_item);
commands[_index] = command;
}
return commands;
}
}
}

@ -0,0 +1,24 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Quartz.Model
{
public class QZ_JobConfig
{
private string _Id;
private string _TaskId;
private string _Key;
private string? _Value;
private string? _Group;
public string Id { get => _Id; set => _Id = value; }
public string TaskId { get => _TaskId; set => _TaskId = value; }
public string Key { get => _Key; set => _Key = value; }
public string? Value { get => _Value; set => _Value = value; }
public string? Group { get => _Group; set => _Group = value; }
}
}

@ -0,0 +1,32 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Quartz.Model
{
public class QZ_JobParams
{
private string _TaskId;
private string? _Url;
private string? _Method;
private string? _Headers;
private string? _Body;
private string? _CronTime;
private string? _BindTable;
private int _IsLog;
private List<QZ_JobConfig>? _Params;
public string TaskId { get => _TaskId; set => _TaskId = value;}
public string? Url { get => _Url; set => _Url = value; }
public string? Method { get => _Method; set => _Method = value; }
public string? Headers { get => _Headers; set => _Headers = value;}
public string? Body { get => _Body; set => _Body = value;}
public string? CronTime { get => _CronTime; set => _CronTime = value; }
public string? BindTable { get => _BindTable; set => _BindTable = value; }
public int IsLog { get => _IsLog; set => _IsLog = value; }
public List<QZ_JobConfig>? Params { get => _Params; set => _Params = value;}
}
}

@ -0,0 +1,117 @@
using Newtonsoft.Json.Linq;
using Quartz;
using Quartz.Impl;
using System.Threading;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using ZKLT.Quartz.Interface;
using ZKLT.Quartz.Job;
using ZKLT.Quartz.Model;
namespace ZKLT.Quartz
{
public class QuartzService : IQuartzService
{
private IScheduler _Scheduler;
private IHadoopService _HadoopService;
public QuartzService(IHadoopService hadoopService)
{
_HadoopService = hadoopService;
//实例化调度器
_Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result;
_Scheduler.Start();
}
public void CreateHttpJob(QZ_JobParams jobParams)
{
//生成jobDataMap
JobDataMap jobDataMap = MakeJobDataMap(jobParams);
//创建HTTP作业 Id作为名称
IJobDetail job = JobBuilder.Create<HttpJob>()
.WithIdentity("j" + jobParams.TaskId, HDP_Task.HTTPTASK)
.UsingJobData(jobDataMap)
.Build();
//创建触发器
ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HDP_Task.HTTPTASK).StartNow()
.WithCronSchedule(jobParams.CronTime)
.Build();
// 把作业,触发器加入调度器
_Scheduler.ScheduleJob(job, trigger);
}
private JobDataMap MakeJobDataMap(QZ_JobParams jobParams)
{
JobDataMap jobDataMap = new JobDataMap();
Type jobType = jobParams.GetType();
foreach (var prop in jobType.GetProperties())
{
var propValue = prop.GetValue(jobParams);
// 过滤掉 null 值
if (propValue == null) continue;
jobDataMap.Add(prop.Name, propValue);
}
jobDataMap.Add("hadoop", _HadoopService);
return jobDataMap;
}
public void CreateSqlJob()
{
}
public void CloseJob(QZ_JobParams jobParams)
{
JobKey jobkey = new JobKey("j" + jobParams.TaskId, HDP_Task.HTTPTASK);
TriggerKey triggerKey = new TriggerKey("t" + jobParams.TaskId, HDP_Task.HTTPTASK);
_Scheduler.PauseTrigger(triggerKey);
_Scheduler.UnscheduleJob(triggerKey);
_Scheduler.PauseJob(jobkey);
_Scheduler.DeleteJob(jobkey);
}
/// <summary>
/// 暂停任务
/// </summary>
/// <param name="taskInfo"></param>
/// <returns></returns>
//public string Pause(TaskInfoModel taskInfo)
//{
// JobKey jobkey = new JobKey("j" + taskInfo.Id, "http");
// TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http");
// scheduler.PauseJob(jobkey);
// scheduler.PauseTrigger(triggerKey);
// return taskInfo.Id;
//}
///// <summary>
///// 关闭任务
///// </summary>
///// <param name="taskInfo"></param>
///// <returns></returns>
//public string Close(TaskInfoModel taskInfo)
//{
// JobKey jobkey = new JobKey("j" + taskInfo.Id, "http");
// TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http");
// scheduler.PauseTrigger(triggerKey);
// scheduler.UnscheduleJob(triggerKey);
// scheduler.PauseJob(jobkey);
// scheduler.DeleteJob(jobkey);
// return taskInfo.Id;
//}
private JobKey GetJobKey(string Id,string GroupName)
{
JobKey jobKey = new JobKey("j" + Id, GroupName);
return jobKey;
}
}
}

@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Quartz" Version="3.13.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Hadoop\ZKLT.Hadoop.Interface\ZKLT.Hadoop.Interface.csproj" />
</ItemGroup>
</Project>

@ -4,7 +4,6 @@ using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
namespace ZKLT.Hadoop.API.Controllers
{
/// <summary>
@ -14,16 +13,19 @@ namespace ZKLT.Hadoop.API.Controllers
[ApiController]
public class HadoopController : ControllerBase
{
public HadoopController(IHadoopService hadoop,ITableService table)
public HadoopController(IHadoopService hadoop,ITableService table, ITaskService task)
{
_HadoopService = hadoop;
_TableService = table;
_TaskService = task;
}
private IHadoopService _HadoopService;
private ITableService _TableService;
private ITaskService _TaskService;
[HttpGet("getid")]
public ActionResult GetId([FromQuery] string? prefix, [FromQuery] int? count) {
if (count != null && count > 0)
@ -280,5 +282,30 @@ namespace ZKLT.Hadoop.API.Controllers
return BadRequest(e.Message);
}
}
[HttpPost("createTask")]
public ActionResult CreateTask(HDP_Task taskParams)
{
try
{
return Ok(_TaskService.Start(taskParams));
}
catch (Exception e)
{
return BadRequest(e.Message);
}
}
[HttpPost("closeTask")]
public ActionResult CloseTask(HDP_Task taskParams)
{
try
{
return Ok(_TaskService.Close(taskParams));
}
catch (Exception e)
{
return BadRequest(e.Message);
}
}
}
}

@ -1 +1 @@
docker build -f ./Dockerfile -t hadoop:1.0.6 ../../.
docker build -f ./Dockerfile -t hadoop:1.0.7 ../../.

@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Hadoop.Model;
namespace ZKLT.Hadoop.Interface
{
public interface ITaskService
{
public string Start(HDP_Task taskParams);
public string Close(HDP_Task taskParams);
}
}

@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Hadoop.Model
{
public class HDP_Task
{
public const string HTTPTASK = "http";
public const string SQLTASK = "sql";
public const int LOGOPEN = 1;
public const int LOGCLOSE = 0;
public const int ACTIVE = 1;
public const int NOTACTIVE = 0;
private string _Id;
private string? _Title;
private int? _IsActive;
private string? _Type;
private string? _CronTime;
private int _IsLog;
private List<HDP_TaskConfig>? _TaskConfigs;
public string Id { get => _Id; set => _Id = value; }
public string? Title { get => _Title; set => _Title = value; }
public int? IsActive { get => _IsActive; set => _IsActive = value; }
public string? Type { get => _Type; set => _Type = value; }
public string? CronTime { get => _CronTime; set => _CronTime = value; }
public int IsLog { get => _IsLog; set => _IsLog = value; }
public List<HDP_TaskConfig>? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; }
}
}

@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Hadoop.Model
{
public class HDP_TaskConfig
{
private string _Id;
private string _TaskId;
private string _Key;
private string _Value;
public string Id { get => _Id; set => _Id = value; }
public string TaskId { get => _TaskId; set => _TaskId = value; }
public string Key { get => _Key; set => _Key = value; }
public string Value { get => _Value; set => _Value = value; }
}
}

@ -0,0 +1,21 @@
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ZKLT.Hadoop.Model
{
public class HDP_TaskLog
{
public const string TABLEID = "ERP_TaskLog";
private string _Id;
private string _TaskId;
private JToken? _Content;
public string Id { get =>_Id; set => _Id = value; }
public string TaskId { get => _TaskId; set => _TaskId = value; }
public JToken? Content { get => _Content; set => _Content = value; }
}
}

@ -2,6 +2,9 @@
using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
using Quartz.Impl;
using Quartz;
using Quartz.Spi;
using System;
using System.Collections.Generic;
using System.Linq;
@ -10,6 +13,8 @@ using System.Threading.Tasks;
using System.Transactions;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using ZKLT.Quartz;
using ZKLT.Quartz.Interface;
namespace ZKLT.Hadoop
{
@ -27,6 +32,9 @@ namespace ZKLT.Hadoop
{
services.AddSingleton<IHadoopService, HadoopService>();
services.AddSingleton<ITableService, TableService>();
services.AddSingleton<ITaskService, TaskService>();
services.AddSingleton<IQuartzService, QuartzService>();
return services;
}
@ -477,5 +485,6 @@ namespace ZKLT.Hadoop
}
return _result.ToArray();
}
}
}

@ -0,0 +1,73 @@
using Newtonsoft.Json.Linq;
using Org.BouncyCastle.Asn1.Tsp;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using ZKLT.Quartz.Interface;
using ZKLT.Quartz.Model;
namespace ZKLT.Hadoop
{
public class TaskService : ITaskService
{
private IHadoopService _HadoopService;
private IQuartzService _QuartzService;
public TaskService(IHadoopService hadoopService, IQuartzService quartzService) {
_HadoopService = hadoopService;
_QuartzService = quartzService;
}
public string Start(HDP_Task taskParams)
{
//获取计划任务信息
var taskCommand = new HDP_Command();
taskCommand.TableId = "ERP_Task";
taskCommand.Where = new JObject() { { "Id","=" } };
taskCommand.Data = new JObject() { {"Id",taskParams.Id } };
var taskInfo = _HadoopService.QuerySingle<HDP_Task>(taskCommand);
if(taskInfo.IsActive == HDP_Task.ACTIVE)
{
return $"任务已经开启,无需再次开启";
}
//获取计划任务配置
var taskConfigCommand = new HDP_Command();
taskConfigCommand.TableId = "ERP_TaskConfig";
taskConfigCommand.Where = new JObject() { { "TaskId","=" } };
taskConfigCommand.Data = new JObject() { {"TaskId",taskParams.Id} };
QZ_JobParams jobParams = new QZ_JobParams();
jobParams.TaskId = taskInfo.Id;
jobParams.CronTime = taskInfo.CronTime;
jobParams.IsLog = taskInfo.IsLog;
jobParams.Params = _HadoopService.Query<QZ_JobConfig>(taskConfigCommand).ToList();
// 调用任务管理类 开启任务管理
_QuartzService.CreateHttpJob(jobParams);
// 更新active
taskCommand.Data = new JObject() { { "Id", taskParams.Id },{ "IsActive",1} };
_HadoopService.Update(taskCommand);
return $"任务{taskParams.Id}已开启";
}
public string Close(HDP_Task taskParams)
{
QZ_JobParams jobParams = new QZ_JobParams() { TaskId=taskParams.Id};
_QuartzService.CloseJob(jobParams);
// 更新active
var taskCommand = new HDP_Command();
taskCommand.TableId = "ERP_Task";
taskCommand.Where = new JObject() { { "Id", "=" } };
taskCommand.Data = new JObject() { { "Id", taskParams.Id }, { "IsActive", 0 } };
_HadoopService.Update(taskCommand);
return $"任务{taskParams.Id}已关闭";
}
}
}

@ -13,6 +13,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Crontab\ZKLT.Quartz.csproj" />
<ProjectReference Include="..\ZKLT.Hadoop.Interface\ZKLT.Hadoop.Interface.csproj" />
<ProjectReference Include="..\ZKLT.Hadoop.Model\ZKLT.Hadoop.Model.csproj" />
</ItemGroup>

@ -4,6 +4,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00
VisualStudioVersion = 17.8.34408.163
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop", "Hadoop\ZKLT.Hadoop\ZKLT.Hadoop.csproj", "{CD7387DB-B80A-412E-89B9-830EFD28C0F4}"
ProjectSection(ProjectDependencies) = postProject
{D189A534-BABE-4B96-8106-F860B94C1F99} = {D189A534-BABE-4B96-8106-F860B94C1F99}
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hadoop", "Hadoop", "{14EA48C3-3F3A-4789-BB0B-5485771D3840}"
EndProject
@ -13,6 +16,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.Interface", "Ha
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Quartz", "Crontab\ZKLT.Quartz.csproj", "{D189A534-BABE-4B96-8106-F860B94C1F99}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -35,6 +40,10 @@ Global
{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.Build.0 = Release|Any CPU
{D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -44,6 +53,7 @@ Global
{070B12FF-0B5A-4D0B-B444-70D6F91FB338} = {14EA48C3-3F3A-4789-BB0B-5485771D3840}
{893FE0B2-8D14-42BB-B19E-0FD09EEA2433} = {14EA48C3-3F3A-4789-BB0B-5485771D3840}
{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB} = {14EA48C3-3F3A-4789-BB0B-5485771D3840}
{D189A534-BABE-4B96-8106-F860B94C1F99} = {14EA48C3-3F3A-4789-BB0B-5485771D3840}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {87BED7C0-F181-4EA3-85CD-6D146DA33FF3}

Loading…
Cancel
Save