using Newtonsoft.Json.Linq;
using Quartz;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface;
using Newtonsoft.Json;
using ZKLT.Hadoop.Model;
using System.Dynamic;

namespace ZKLT.Quartz.Job
{
    /// <summary>
    /// HTTP job: sends the configured HTTP request, optionally writes the returned
    /// records into a bound table (insert or update), and optionally stores the raw
    /// response body as a task log entry.
    /// </summary>
    public class HttpJob : IJob
    {
        // One shared client for all executions; creating a client per run can exhaust sockets.
        private static readonly HttpClient SharedHttpClient = new HttpClient();

        /// <summary>
        /// Runs the job using the values stored in the job data map
        /// ("hadoop", "Params", "TaskId", "IsLog").
        /// </summary>
        /// <param name="context">Quartz execution context of the current run.</param>
        /// <exception cref="JobExecutionException">
        /// The "Url" config entry is missing, or the HTTP request failed / returned a
        /// non-success status code.
        /// </exception>
        public async Task Execute(IJobExecutionContext context)
        {
            JobDataMap dataMap = context.JobDetail.JobDataMap;
            IHadoopService hadoopService = (IHadoopService)dataMap.Get("hadoop");

            // Task configuration entries.
            // NOTE(review): the element type argument of this List is missing in the reviewed
            // copy of the file (generic arguments appear to have been stripped). Restore it from
            // the project; the element type must expose Key, Value and Group.
            List configs = (List)dataMap.Get("Params");
            // Task id.
            string taskId = dataMap.Get("TaskId").ToString();
            // Whether logging is enabled.
            int isLog = (int)dataMap.Get("IsLog");

            // Null-safe lookup: a missing entry yields null instead of a NullReferenceException.
            string? ConfigValue(string key) => configs.Find(_item => _item.Key == key)?.Value;

            // Request address.
            string? url = ConfigValue("Url");
            if (string.IsNullOrEmpty(url))
            {
                throw new JobExecutionException($"HTTP task '{taskId}' has no 'Url' config entry.");
            }

            // Request method: anything other than "GET" is sent as POST.
            HttpMethod httpMethod = ConfigValue("Method") == "GET" ? HttpMethod.Get : HttpMethod.Post;
            // Table the returned data is stored in (the key spelling is part of the stored config).
            string? bindDatabase = ConfigValue("BindDatabse");
            // Unique column of that table, used to decide between update and insert.
            string? bindDatabasePrimary = ConfigValue("BindDatabasePrimary");
            // Name of the property in the response that holds the record array.
            string? resultSet = ConfigValue("ResultSet");

            try
            {
                using HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
                foreach (var header in configs.FindAll(_item => _item.Group == "Headers"))
                {
                    request.Headers.Add(header.Key, header.Value);
                }

                // Mapping entries: Key = response field, Value = table column.
                var resultColumnConfigs = configs.FindAll(_item => _item.Group == "ResultColumns");

                // Send the request.
                using HttpResponseMessage response =
                    await SharedHttpClient.SendAsync(request, context.CancellationToken);
                response.EnsureSuccessStatusCode();
                string responseBody = await response.Content.ReadAsStringAsync();

                // A body that is not a JSON object, or a result set that is missing or not an
                // array, leaves responseList null and skips persistence (logging still runs).
                var responseObject = JsonConvert.DeserializeObject(responseBody) as JObject;
                JArray? responseList = string.IsNullOrEmpty(resultSet)
                    ? null
                    : responseObject?.GetValue(resultSet) as JArray;

                // If a bound table is configured, write the records into it.
                if (!string.IsNullOrEmpty(bindDatabase) && responseList != null)
                {
                    HDP_Command[] commands = makeCommand(
                        bindDatabase,
                        bindDatabasePrimary ?? string.Empty,
                        hadoopService,
                        responseList,
                        resultColumnConfigs);
                    hadoopService.PatchCommand(commands);
                }

                // If logging is enabled, store the raw response.
                if (isLog == HDP_Task.LOGOPEN)
                {
                    var data = new HDP_TaskLog()
                    {
                        // NOTE(review): this id is culture-formatted and only second-precise;
                        // confirm that collisions are acceptable for the log table.
                        Id = "TL" + DateTime.UtcNow,
                        TaskId = taskId,
                        Content = responseBody
                    };
                    var logCommand = new HDP_Command()
                    {
                        TableId = HDP_TaskLog.TABLEID,
                        Data = JToken.FromObject(data)
                    };
                    hadoopService.Insert(logCommand);
                }
            }
            catch (HttpRequestException e)
            {
                // Report the failure to the scheduler instead of silently swallowing it.
                throw new JobExecutionException($"HTTP task '{taskId}' request to '{url}' failed.", e);
            }
        }

        /// <summary>
        /// Builds one command per response record. A record becomes an UPDATE when its
        /// primary-key value already exists in the bound table, otherwise an INSERT.
        /// </summary>
        /// <param name="bindDatabase">Bound table id.</param>
        /// <param name="bindDatabasePrimary">Unique column of the bound table; when empty every record is inserted.</param>
        /// <param name="_HadoopService">Hadoop service used to query the existing rows.</param>
        /// <param name="responseList">Records returned by the request.</param>
        /// <param name="configList">Column mapping entries (Key = response field, Value = table column).</param>
        /// <returns>Commands in the same order as <paramref name="responseList"/>; empty when there are no records.</returns>
        public HDP_Command[] makeCommand(string bindDatabase, string bindDatabasePrimary, IHadoopService _HadoopService, JArray responseList, List configList)
        {
            if (responseList == null || responseList.Count == 0)
            {
                return Array.Empty<HDP_Command>();
            }

            bool hasPrimary = !string.IsNullOrEmpty(bindDatabasePrimary);

            // Collect the primary-key values already stored so each record is checked in O(1).
            var existingKeys = new HashSet<string>();
            if (hasPrimary)
            {
                var queryCommand = new HDP_Command() { TableId = bindDatabase };
                foreach (var row in _HadoopService.Query(queryCommand).ToList())
                {
                    // Round-trip through JSON so rows are compared the same way as response tokens.
                    string rowJson = JsonConvert.SerializeObject(row);
                    JObject rowJObject = JObject.Parse(rowJson);
                    JToken? existingKey = rowJObject.GetValue(bindDatabasePrimary);
                    if (existingKey != null)
                    {
                        existingKeys.Add(existingKey.ToString());
                    }
                }
            }

            var commands = new HDP_Command[responseList.Count];
            for (int index = 0; index < responseList.Count; index++)
            {
                JToken item = responseList[index];
                var command = new HDP_Command();
                command.TableId = bindDatabase;
                command.Type = HDP_CommandType.INSERT;

                var data = new JObject();
                foreach (var _config in configList)
                {
                    // _config.Key is the response field, _config.Value is the table column.
                    data[_config.Value] = item[_config.Key];

                    if (hasPrimary
                        && command.Type == HDP_CommandType.INSERT
                        && _config.Value == bindDatabasePrimary)
                    {
                        string incomingKey = data.GetValue(bindDatabasePrimary)?.ToString() ?? string.Empty;
                        if (existingKeys.Contains(incomingKey))
                        {
                            command.Where = new JObject() { { bindDatabasePrimary, "=" } };
                            command.Type = HDP_CommandType.UPDATE;
                        }
                    }
                }

                command.Data = data;
                commands[index] = command;
            }

            return commands;
        }
    }
}