|
|
@ -0,0 +1,152 @@
|
|
|
|
|
|
|
|
using Newtonsoft.Json.Linq;
|
|
|
|
|
|
|
|
using Quartz;
|
|
|
|
|
|
|
|
using System;
|
|
|
|
|
|
|
|
using System.Collections.Generic;
|
|
|
|
|
|
|
|
using System.Linq;
|
|
|
|
|
|
|
|
using System.Net;
|
|
|
|
|
|
|
|
using System.Reflection.PortableExecutable;
|
|
|
|
|
|
|
|
using System.Text;
|
|
|
|
|
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
|
|
using ZKLT.Quartz.Model;
|
|
|
|
|
|
|
|
using ZKLT.Hadoop.Interface;
|
|
|
|
|
|
|
|
using Newtonsoft.Json;
|
|
|
|
|
|
|
|
using ZKLT.Hadoop.Model;
|
|
|
|
|
|
|
|
using System.Dynamic;
|
|
|
|
|
|
|
|
namespace ZKLT.Quartz.Job
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
|
|
/// HTTP作业类
|
|
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
|
|
public class HttpJob : IJob
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
public async Task Execute(IJobExecutionContext context)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
JobDataMap dataMap = context.JobDetail.JobDataMap;
|
|
|
|
|
|
|
|
IHadoopService _HadoopService = (IHadoopService)dataMap.Get("hadoop");
|
|
|
|
|
|
|
|
// 任务配置项
|
|
|
|
|
|
|
|
List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params");
|
|
|
|
|
|
|
|
// 任务Id
|
|
|
|
|
|
|
|
string taskId = dataMap.Get("TaskId").ToString();
|
|
|
|
|
|
|
|
// 是否开启日志
|
|
|
|
|
|
|
|
int isLog = (int)dataMap.Get("IsLog");
|
|
|
|
|
|
|
|
// 请求地址
|
|
|
|
|
|
|
|
string url = configs.Find(_item => _item.Key == "Url").Value;
|
|
|
|
|
|
|
|
// 请求类型
|
|
|
|
|
|
|
|
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
|
|
|
|
|
|
|
|
// 返回数据储存表
|
|
|
|
|
|
|
|
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
|
|
|
|
|
|
|
|
// 返回数组储存表的唯一键,用做 更新/新增 判断
|
|
|
|
|
|
|
|
string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value;
|
|
|
|
|
|
|
|
// 返回数据集合名
|
|
|
|
|
|
|
|
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
|
|
|
|
|
|
|
|
using (HttpClient client = new HttpClient())
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
try
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
var resultColumnConfigs = new List<QZ_JobConfig>();
|
|
|
|
|
|
|
|
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
|
|
|
|
|
|
|
|
foreach (var _item in configs)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
if (_item.Group! == "Headers")
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
request.Headers.Add(_item.Key, _item.Value);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
else if (_item.Group! == "ResultColumns")
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
resultColumnConfigs.Add(_item);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
//请求提交
|
|
|
|
|
|
|
|
HttpResponseMessage response = await client.SendAsync(request);
|
|
|
|
|
|
|
|
response.EnsureSuccessStatusCode();
|
|
|
|
|
|
|
|
string responseBody = await response.Content.ReadAsStringAsync();
|
|
|
|
|
|
|
|
var responseObject = JsonConvert.DeserializeObject<JObject>(responseBody);
|
|
|
|
|
|
|
|
JArray responseList = (JArray)responseObject.GetValue(resultSet);
|
|
|
|
|
|
|
|
// 如果设定了绑定表 插入表中
|
|
|
|
|
|
|
|
if (bindDatabase != null)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs);
|
|
|
|
|
|
|
|
_HadoopService.PatchCommand(commands);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// 日志开启,插入日志
|
|
|
|
|
|
|
|
if (isLog == HDP_Task.LOGOPEN)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
var data = new HDP_TaskLog()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
Id = "TL" + DateTime.Now.ToUniversalTime(),
|
|
|
|
|
|
|
|
TaskId = taskId,
|
|
|
|
|
|
|
|
Content = responseBody
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
var logCommand = new HDP_Command()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
TableId = HDP_TaskLog.TABLEID,
|
|
|
|
|
|
|
|
Data = JToken.FromObject(data)
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
_HadoopService.Insert(logCommand);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
catch (HttpRequestException e)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
await Task.CompletedTask;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
/// <summary>
|
|
|
|
|
|
|
|
///
|
|
|
|
|
|
|
|
/// </summary>
|
|
|
|
|
|
|
|
/// <param name="bindDatabase">绑定数据表</param>
|
|
|
|
|
|
|
|
/// <param name="bindDatabasePrimary">数据唯一值</param>
|
|
|
|
|
|
|
|
/// <param name="_HadoopService">hadoop实例</param>
|
|
|
|
|
|
|
|
/// <param name="responseList">请求返回值</param>
|
|
|
|
|
|
|
|
/// <param name="configList">任务配置项</param>
|
|
|
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List<QZ_JobConfig> configList)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
//查询数据库是否存在数据
|
|
|
|
|
|
|
|
var queryCommand = new HDP_Command()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
TableId = bindDatabase
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
var oldDataDapperRow = _HadoopService.Query<dynamic>(queryCommand).ToList();
|
|
|
|
|
|
|
|
var oldData = new List<JObject>();
|
|
|
|
|
|
|
|
foreach (var row in oldDataDapperRow)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
var rowJson = JsonConvert.SerializeObject(row);
|
|
|
|
|
|
|
|
JObject rowJObject = JObject.Parse(rowJson);
|
|
|
|
|
|
|
|
oldData.Add(rowJObject);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
HDP_Command[] commands = new HDP_Command?[responseList.Count];
|
|
|
|
|
|
|
|
foreach (var _item in responseList)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
var command = new HDP_Command();
|
|
|
|
|
|
|
|
command.TableId = bindDatabase;
|
|
|
|
|
|
|
|
command.Type = HDP_CommandType.INSERT;
|
|
|
|
|
|
|
|
var data = new JObject();
|
|
|
|
|
|
|
|
foreach (var _config in configList)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
// _config.Key 结果字段 _config.Value 数据库字段
|
|
|
|
|
|
|
|
data[_config.Value] = _item[_config.Key];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString());
|
|
|
|
|
|
|
|
if (oldItem != null)
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
command.Where = new JObject()
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
|
|
{bindDatabasePrimary, "=" }
|
|
|
|
|
|
|
|
};
|
|
|
|
|
|
|
|
command.Type = HDP_CommandType.UPDATE;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
command.Data = data;
|
|
|
|
|
|
|
|
int _index = responseList.IndexOf(_item);
|
|
|
|
|
|
|
|
commands[_index] = command;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return commands;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|