You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

93 lines
3.4 KiB
C#

using Newtonsoft.Json.Linq;
using Quartz;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection.PortableExecutable;
using System.Text;
using System.Threading.Tasks;
using ZKLT.Quartz.Model;
using ZKLT.Hadoop.Interface;
using Newtonsoft.Json;
using ZKLT.Hadoop.Model;
namespace ZKLT.Quartz.Job
{
/// <summary>
/// HTTP作业类
/// </summary>
public class HttpJob : IJob
{
private IHadoopService HadoopService;
public HttpJob(IHadoopService _hadoopService)
{
HadoopService = _hadoopService;
}
public async Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.JobDetail.JobDataMap;
string taskId = dataMap.Get("TaskId").ToString();
Console.WriteLine(taskId);
List<QZ_JobConfig> configs = (List<QZ_JobConfig>)dataMap.Get("Params");
string url = configs.Find(_item => _item.Key == "Url").Value;
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "Get" ? HttpMethod.Get : HttpMethod.Post;
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
using (HttpClient client = new HttpClient())
{
try
{
var resultColumnConfigs = new List<QZ_JobConfig>();
HttpRequestMessage request = new HttpRequestMessage(httpMethod, url);
foreach (var _item in configs)
{
if (_item.Group! == "Headers")
{
request.Headers.Add(_item.Key, _item.Value);
}
else if (_item.Group! == "ResultColumns")
{
resultColumnConfigs.Add(_item);
}
}
//请求提交
HttpResponseMessage response = await client.SendAsync(request);
response.EnsureSuccessStatusCode();
string responseBody = await response.Content.ReadAsStringAsync();
var responseList = JsonConvert.DeserializeObject<List<JObject>>(responseBody);
if(bindDatabase != null)
{
HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs);
HadoopService.PatchCommand(commands);
}
}
catch (HttpRequestException e)
{
Console.WriteLine($"请求错误{e.Message}");
}
}
await Task.CompletedTask;
}
public HDP_Command[] makeCommand(string bindDatabase,List<JObject> responseList,List<QZ_JobConfig> configList)
{
HDP_Command[] commands = new HDP_Command?[responseList.Count];
foreach (var _item in responseList)
{
var command = new HDP_Command();
command.TableId = bindDatabase;
command.Type = HDP_CommandType.INSERT;
foreach (var _config in configList)
{
command.Data[_config.Value] = _item[_config.Key];
}
}
return commands;
}
}
}