diff --git a/Crontab/Interface/IQuartzService.cs b/Crontab/Interface/IQuartzService.cs new file mode 100644 index 0000000..219c3b2 --- /dev/null +++ b/Crontab/Interface/IQuartzService.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Newtonsoft.Json.Linq; +using ZKLT.Quartz.Model; + +namespace ZKLT.Quartz.Interface +{ + public interface IQuartzService + { + public void CreateHttpJob(QZ_JobParams httpParams); + + public void CreateSqlJob(); + + public void CloseJob(); + } + +} + diff --git a/Crontab/Job/HttpJob.cs b/Crontab/Job/HttpJob.cs new file mode 100644 index 0000000..a134063 --- /dev/null +++ b/Crontab/Job/HttpJob.cs @@ -0,0 +1,92 @@ +using Newtonsoft.Json.Linq; +using Quartz; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Reflection.PortableExecutable; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Quartz.Model; +using ZKLT.Hadoop.Interface; +using Newtonsoft.Json; +using ZKLT.Hadoop.Model; +namespace ZKLT.Quartz.Job +{ + /// + /// HTTP作业类 + /// + public class HttpJob : IJob + { + private IHadoopService HadoopService; + public HttpJob(IHadoopService _hadoopService) + { + HadoopService = _hadoopService; + } + public async Task Execute(IJobExecutionContext context) + { + JobDataMap dataMap = context.JobDetail.JobDataMap; + string taskId = dataMap.Get("TaskId").ToString(); + Console.WriteLine(taskId); + + List configs = (List)dataMap.Get("Params"); + + string url = configs.Find(_item => _item.Key == "Url").Value; + HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post; + string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value; + + using (HttpClient client = new HttpClient()) + { + try + { + var resultColumnConfigs = new List(); + + HttpRequestMessage request = new HttpRequestMessage(httpMethod, url); + foreach (var _item in configs) + { + if (_item.Group! == "Headers") + { + request.Headers.Add(_item.Key, _item.Value); + } + else if (_item.Group! == "ResultColumns") + { + resultColumnConfigs.Add(_item); + } + } + //请求提交 + HttpResponseMessage response = await client.SendAsync(request); + response.EnsureSuccessStatusCode(); + string responseBody = await response.Content.ReadAsStringAsync(); + var responseList = JsonConvert.DeserializeObject>(responseBody); + if(bindDatabase != null) + { + HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs); + HadoopService.PatchCommand(commands); + } + } + catch (HttpRequestException e) + { + Console.WriteLine($"请求错误{e.Message}"); + } + } + + await Task.CompletedTask; + } + + public HDP_Command[] makeCommand(string bindDatabase,List responseList,List configList) + { + HDP_Command[] commands = new HDP_Command?[responseList.Count]; + foreach (var _item in responseList) + { + var command = new HDP_Command(); + command.TableId = bindDatabase; + command.Type = HDP_CommandType.INSERT; + foreach (var _config in configList) + { + command.Data[_config.Value] = _item[_config.Key]; + } + } + return commands; + } + } +} diff --git a/Crontab/Model/QZ_JobConfig.cs b/Crontab/Model/QZ_JobConfig.cs new file mode 100644 index 0000000..189cc1e --- /dev/null +++ b/Crontab/Model/QZ_JobConfig.cs @@ -0,0 +1,24 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Quartz.Model +{ + public class QZ_JobConfig + { + private string _Id; + private string _TaskId; + private string _Key; + private string? _Value; + private string? _Group; + + public string Id { get => _Id; set => _Id = value; } + public string TaskId { get => _TaskId; set => _TaskId = value; } + public string Key { get => _Key; set => _Key = value; } + public string? Value { get => _Value; set => _Value = value; } + public string? Group { get => _Group; set => _Group = value; } + } +} diff --git a/Crontab/Model/QZ_JobParams.cs b/Crontab/Model/QZ_JobParams.cs new file mode 100644 index 0000000..80790df --- /dev/null +++ b/Crontab/Model/QZ_JobParams.cs @@ -0,0 +1,30 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Quartz.Model +{ + public class QZ_JobParams + { + private string _TaskId; + private string? _Url; + private string? _Method; + private string? _Headers; + private string? _Body; + private string? _CronTime; + private string? _BindTable; + private List? _Params; + + public string TaskId { get => _TaskId; set => _TaskId = value;} + public string? Url { get => _Url; set => _Url = value; } + public string? Method { get => _Method; set => _Method = value; } + public string? Headers { get => _Headers; set => _Headers = value;} + public string? Body { get => _Body; set => _Body = value;} + public string? CronTime { get => _CronTime; set => _CronTime = value; } + public string? BindTable { get => _BindTable; set => _BindTable = value; } + public List? Params { get => _Params; set => _Params = value;} + } +} diff --git a/Crontab/QuartzService.cs b/Crontab/QuartzService.cs new file mode 100644 index 0000000..003526b --- /dev/null +++ b/Crontab/QuartzService.cs @@ -0,0 +1,102 @@ +using Newtonsoft.Json.Linq; +using Quartz; +using Quartz.Impl; +using System.Threading; +using ZKLT.Quartz.Interface; +using ZKLT.Quartz.Job; +using ZKLT.Quartz.Model; + +namespace ZKLT.Quartz +{ + public class QuartzService : IQuartzService + { + private const string HTTPTASK = "http"; + private const string SQLTASK = "sql"; + private IScheduler Scheduler; + + public QuartzService() + { + //实例化调度器 + Scheduler = StdSchedulerFactory.GetDefaultScheduler().Result; + Scheduler.Start(); + } + + public void CreateHttpJob(QZ_JobParams jobParams) + { + //生成jobDataMap + JobDataMap jobDataMap = MakeJobDataMap(jobParams); + + //创建HTTP作业 Id作为名称 + IJobDetail job = JobBuilder.Create() + .WithIdentity("j" + jobParams.TaskId, HTTPTASK) + .UsingJobData(jobDataMap) + .Build(); + //创建触发器 + ITrigger trigger = TriggerBuilder.Create().WithIdentity("t" + jobParams.TaskId, HTTPTASK).StartNow().WithCronSchedule(jobParams.CronTime).Build(); + // 把作业,触发器加入调度器 + Scheduler.ScheduleJob(job, trigger); + } + + private JobDataMap MakeJobDataMap(QZ_JobParams jobParams) + { + JobDataMap jobDataMap = new JobDataMap(); + Type jobType = jobParams.GetType(); + foreach (var prop in jobType.GetProperties()) + { + var propValue = prop.GetValue(jobParams); + // 过滤掉 null 值 + if (propValue == null) continue; + jobDataMap.Add(prop.Name, propValue); + } + return jobDataMap; + } + + public void CreateSqlJob() + { + + } + + public void CloseJob() + { + } + + + /// + /// 暂停任务 + /// + /// + /// + //public string Pause(TaskInfoModel taskInfo) + //{ + // JobKey jobkey = new JobKey("j" + taskInfo.Id, "http"); + // TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http"); + // scheduler.PauseJob(jobkey); + // scheduler.PauseTrigger(triggerKey); + // return taskInfo.Id; + //} + + ///// + ///// 关闭任务 + ///// + ///// + ///// + //public string Close(TaskInfoModel taskInfo) + //{ + // JobKey jobkey = new JobKey("j" + taskInfo.Id, "http"); + // TriggerKey triggerKey = new TriggerKey("t" + taskInfo.Id, "http"); + + // scheduler.PauseTrigger(triggerKey); + // scheduler.UnscheduleJob(triggerKey); + // scheduler.PauseJob(jobkey); + // scheduler.DeleteJob(jobkey); + + // return taskInfo.Id; + //} + + private JobKey GetJobKey(string Id,string GroupName) + { + JobKey jobKey = new JobKey("j" + Id, GroupName); + return jobKey; + } + } +} diff --git a/Crontab/ZKLT.Quartz.csproj b/Crontab/ZKLT.Quartz.csproj new file mode 100644 index 0000000..f95af7f --- /dev/null +++ b/Crontab/ZKLT.Quartz.csproj @@ -0,0 +1,18 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + + diff --git a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs index 74af2ef..bf69da3 100644 --- a/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs +++ b/Hadoop/ZKLT.Hadoop.API/Controllers/HadoopController.cs @@ -4,7 +4,6 @@ using MySqlX.XDevAPI.Relational; using Newtonsoft.Json.Linq; using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Model; - namespace ZKLT.Hadoop.API.Controllers { /// @@ -14,16 +13,19 @@ namespace ZKLT.Hadoop.API.Controllers [ApiController] public class HadoopController : ControllerBase { - public HadoopController(IHadoopService hadoop,ITableService table) + public HadoopController(IHadoopService hadoop,ITableService table, ITaskService task) { _HadoopService = hadoop; _TableService = table; + _TaskService = task; } private IHadoopService _HadoopService; private ITableService _TableService; + private ITaskService _TaskService; + [HttpGet("getid")] public ActionResult GetId([FromQuery] string? prefix, [FromQuery] int? count) { if (count != null && count > 0) @@ -279,5 +281,30 @@ namespace ZKLT.Hadoop.API.Controllers return BadRequest(e.Message); } } + + [HttpPost("createTask")] + public ActionResult CreateTask(HDP_Task taskParams) + { + try + { + return Ok(_TaskService.Start(taskParams)); + } + catch (Exception e) + { + return BadRequest(e.Message); + } + } + [HttpPost("closeTask")] + public ActionResult CloseTask(HDP_Task taskParams) + { + try + { + return Ok(_TaskService.Close("1")); + } + catch (Exception e) + { + return BadRequest(e.Message); + } + } } } diff --git a/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs new file mode 100644 index 0000000..6c1a3d5 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Interface/ITaskService.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Hadoop.Model; + +namespace ZKLT.Hadoop.Interface +{ + public interface ITaskService + { + public string Start(HDP_Task taskParams); + public string Pause(string Id); + public string Close(string Id); + } +} diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs new file mode 100644 index 0000000..9a4eb48 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_Task.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_Task + { + private string _Id; + private string? _Title; + private int? _IsActive; + private string? _Type; + private string? _CronTime; + private List? _TaskConfigs; + + public string Id { get => _Id; set => _Id = value; } + + public string? Title { get => _Title; set => _Title = value; } + + public int? IsActive { get => _IsActive; set => _IsActive = value; } + + public string? Type { get => _Type; set => _Type = value; } + public string? CronTime { get => _CronTime; set => _CronTime = value; } + + public List? TaskConfigs { get => _TaskConfigs; set => _TaskConfigs = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs new file mode 100644 index 0000000..125440c --- /dev/null +++ b/Hadoop/ZKLT.Hadoop.Model/HDP_TaskConfig.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace ZKLT.Hadoop.Model +{ + public class HDP_TaskConfig + { + + private string _Id; + private string _TaskId; + private string _Key; + private string _Value; + + public string Id { get => _Id; set => _Id = value; } + + public string TaskId { get => _TaskId; set => _TaskId = value; } + + public string Key { get => _Key; set => _Key = value; } + + public string Value { get => _Value; set => _Value = value; } + } +} diff --git a/Hadoop/ZKLT.Hadoop/HadoopService.cs b/Hadoop/ZKLT.Hadoop/HadoopService.cs index 64152ea..c6c479d 100644 --- a/Hadoop/ZKLT.Hadoop/HadoopService.cs +++ b/Hadoop/ZKLT.Hadoop/HadoopService.cs @@ -10,6 +10,8 @@ using System.Threading.Tasks; using System.Transactions; using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Model; +using ZKLT.Quartz; +using ZKLT.Quartz.Interface; namespace ZKLT.Hadoop { @@ -27,6 +29,8 @@ namespace ZKLT.Hadoop { services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); return services; } @@ -477,5 +481,6 @@ namespace ZKLT.Hadoop } return _result.ToArray(); } + } } diff --git a/Hadoop/ZKLT.Hadoop/TaskService.cs b/Hadoop/ZKLT.Hadoop/TaskService.cs new file mode 100644 index 0000000..cece116 --- /dev/null +++ b/Hadoop/ZKLT.Hadoop/TaskService.cs @@ -0,0 +1,65 @@ +using Newtonsoft.Json.Linq; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using ZKLT.Hadoop.Interface; +using ZKLT.Hadoop.Model; +using ZKLT.Quartz.Interface; +using ZKLT.Quartz.Model; + +namespace ZKLT.Hadoop +{ + public class TaskService : ITaskService + { + private IHadoopService _HadoopService; + private IQuartzService _QuartzService; + public TaskService(IHadoopService hadoopService, IQuartzService quartzService) { + _HadoopService = hadoopService; + _QuartzService = quartzService; + } + + public string Start(HDP_Task taskParams) + { + //获取计划任务信息 + var command = new HDP_Command(); + command.TableId = "ERP_Task"; + command.Where = new JObject + { + { "Id","=" } + }; + command.Data = new JObject + { + {"Id",taskParams.Id } + }; + var taskInfo = _HadoopService.QuerySingle(command); + command.TableId = "ERP_TaskConfig"; + command.Where = new JObject + { + { "TaskId","=" } + }; + command.Data = new JObject + { + {"TaskId",taskParams.Id } + }; + + QZ_JobParams jobParams = new QZ_JobParams(); + jobParams.TaskId = taskInfo.Id; + jobParams.CronTime = taskInfo.CronTime; + jobParams.Params = _HadoopService.Query(command).ToList(); + // 调用任务管理类 开启任务管理 + _QuartzService.CreateHttpJob(jobParams); + return taskParams.Id; + } + public string Pause(string Id) + { + return Id; + } + public string Close(string Id) + { + return Id; + } + + } +} diff --git a/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj b/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj index 231ebe7..da8bde4 100644 --- a/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj +++ b/Hadoop/ZKLT.Hadoop/ZKLT.Hadoop.csproj @@ -13,6 +13,7 @@ + diff --git a/ZKLT.sln b/ZKLT.sln index d9ae477..fe1cbc3 100644 --- a/ZKLT.sln +++ b/ZKLT.sln @@ -4,6 +4,9 @@ Microsoft Visual Studio Solution File, Format Version 12.00 VisualStudioVersion = 17.8.34408.163 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop", "Hadoop\ZKLT.Hadoop\ZKLT.Hadoop.csproj", "{CD7387DB-B80A-412E-89B9-830EFD28C0F4}" + ProjectSection(ProjectDependencies) = postProject + {D189A534-BABE-4B96-8106-F860B94C1F99} = {D189A534-BABE-4B96-8106-F860B94C1F99} + EndProjectSection EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Hadoop", "Hadoop", "{14EA48C3-3F3A-4789-BB0B-5485771D3840}" EndProject @@ -13,6 +16,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.Interface", "Ha EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Quartz", "Crontab\ZKLT.Quartz.csproj", "{D189A534-BABE-4B96-8106-F860B94C1F99}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +40,10 @@ Global {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Debug|Any CPU.Build.0 = Debug|Any CPU {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.ActiveCfg = Release|Any CPU {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}.Release|Any CPU.Build.0 = Release|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D189A534-BABE-4B96-8106-F860B94C1F99}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -44,6 +53,7 @@ Global {070B12FF-0B5A-4D0B-B444-70D6F91FB338} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} {893FE0B2-8D14-42BB-B19E-0FD09EEA2433} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} {398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} + {D189A534-BABE-4B96-8106-F860B94C1F99} = {14EA48C3-3F3A-4789-BB0B-5485771D3840} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {87BED7C0-F181-4EA3-85CD-6D146DA33FF3}