You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

153 lines
6.5 KiB
C#

using Newtonsoft.Json.Linq;
using Quartz;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface;
using Newtonsoft.Json;
using ZKLT.Hadoop.Model;
using System.Dynamic;
namespace ZKLT.Quartz.Job
{
/// <summary>
/// HTTP作业类
/// </summary>
public class HttpJob : IJob
{
public async Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.JobDetail.JobDataMap;
IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop");
// 任务配置项
List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params");
// 任务Id
string taskId = dataMap.Get("TaskId").ToString();
// 是否开启日志
int isLog = (int)dataMap.Get("IsLog");
// 请求地址
string url = configs.Find(_item => _item.Key == "Url").Value;
// 请求类型
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
// 返回数据储存表
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
// 返回数组储存表的唯一键,用做 更新/新增 判断
string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value;
// 返回数据集合名
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
using (HttpClient client = new HttpClient())
{
try
{
var resultColumnConfigs = new List<QZ_JobConfig>();
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
foreach (var _item in configs)
{
if (_item.Group! == "Headers")
{
request.Headers.Add(_item.Key, _item.Value);
}
else if (_item.Group! == "ResultColumns")
{
resultColumnConfigs.Add(_item);
}
}
//请求提交
HttpResponseMessage response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
var responseObject = JsonConvert.DeserializeObject<JObject>(responseBody);
JArray responseList = (JArray)responseObject.GetValue(resultSet);
// 如果设定了绑定表 插入表中
if (bindDatabase != null)
{
HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs);
_HadoopService.PatchCommand(commands);
}
// 日志开启,插入日志
if (isLog == HDP_Task.LOGOPEN)
{
var data = new HDP_TaskLog()
{
Id = "TL" + DateTime.Now.ToUniversalTime(),
TaskId = taskId,
Content = responseBody
};
var logCommand = new HDP_Command()
{
TableId = HDP_TaskLog.TABLEID,
Data = JToken.FromObject(data)
};
_HadoopService.Insert(logCommand);
}
}
catch (HttpRequestException e)
{
}
}
await Task.CompletedTask;
}
/// <summary>
///
/// </summary>
/// <param name="bindDatabase">绑定数据表</param>
/// <param name="bindDatabasePrimary">数据唯一值</param>
/// <param name="_HadoopService">hadoop实例</param>
/// <param name="responseList">请求返回值</param>
/// <param name="configList">任务配置项</param>
/// <returns></returns>
public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List<QZ_JobConfig> configList)
{
//查询数据库是否存在数据
var queryCommand = new HDP_Command()
{
TableId = bindDatabase
};
var oldDataDapperRow = _HadoopService.Query<dynamic>(queryCommand).ToList();
var oldData = new List<JObject>();
foreach (var row in oldDataDapperRow)
{
var rowJson = JsonConvert.SerializeObject(row);
JObject rowJObject = JObject.Parse(rowJson);
oldData.Add(rowJObject);
}
HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList)
{
var command = new HDP_Command();
command.TableId = bindDatabase;
command.Type = HDP_CommandType.INSERT;
var data = new JObject();
foreach (var _config in configList)
{
// _config.Key 结果字段 _config.Value 数据库字段
data[_config.Value] = _item[_config.Key];
if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT)
{
var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString());
if (oldItem != null)
{
command.Where = new JObject()
{
{bindDatabasePrimary, "=" }
};
command.Type = HDP_CommandType.UPDATE;
}
}
}
command.Data = data;
int _index = responseList.IndexOf(_item);
commands[_index] = command;
}
return commands;
}
}
}