|
|
|
@ -35,6 +35,8 @@ namespace ZKLT.Quartz.Job
|
|
|
|
|
HttpMethod httpMethod = configs.Find(_item => _item.Key == "Method").Value == "GET" ? HttpMethod.Get : HttpMethod.Post;
|
|
|
|
|
// 返回数据储存表
|
|
|
|
|
string bindDatabase = configs.Find(_item => _item.Key == "BindDatabse").Value;
|
|
|
|
|
// 返回数组储存表的唯一键,用做 更新/新增 判断
|
|
|
|
|
string bindDatabasePrimary = configs.Find(_item => _item.Key == "BindDatabasePrimary").Value;
|
|
|
|
|
// 返回数据集合名
|
|
|
|
|
string resultSet = configs.Find(_item => _item.Key == "ResultSet").Value;
|
|
|
|
|
using (HttpClient client = new HttpClient())
|
|
|
|
@ -63,7 +65,7 @@ namespace ZKLT.Quartz.Job
|
|
|
|
|
// 如果设定了绑定表 插入表中
|
|
|
|
|
if (bindDatabase != null)
|
|
|
|
|
{
|
|
|
|
|
HDP_Command[] commands = makeCommand(bindDatabase, responseList, resultColumnConfigs);
|
|
|
|
|
HDP_Command[] commands = makeCommand(bindDatabase,bindDatabasePrimary,_HadoopService,responseList, resultColumnConfigs);
|
|
|
|
|
_HadoopService.PatchCommand(commands);
|
|
|
|
|
}
|
|
|
|
|
// 日志开启,插入日志
|
|
|
|
@ -85,16 +87,35 @@ namespace ZKLT.Quartz.Job
|
|
|
|
|
}
|
|
|
|
|
catch (HttpRequestException e)
|
|
|
|
|
{
|
|
|
|
|
Console.WriteLine($"请求错误{e.Message}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
await Task.CompletedTask;
|
|
|
|
|
}
|
|
|
|
|
/// <summary>
|
|
|
|
|
///
|
|
|
|
|
/// </summary>
|
|
|
|
|
/// <param name="bindDatabase">绑定数据表</param>
|
|
|
|
|
/// <param name="bindDatabasePrimary">数据唯一值</param>
|
|
|
|
|
/// <param name="_HadoopService">hadoop实例</param>
|
|
|
|
|
/// <param name="responseList">请求返回值</param>
|
|
|
|
|
/// <param name="configList">任务配置项</param>
|
|
|
|
|
/// <returns></returns>
|
|
|
|
|
|
|
|
|
|
public HDP_Command[] makeCommand(string bindDatabase,JArray responseList,List<QZ_JobConfig> configList)
|
|
|
|
|
public HDP_Command[] makeCommand(string bindDatabase,string bindDatabasePrimary,IHadoopService _HadoopService, JArray responseList,List<QZ_JobConfig> configList)
|
|
|
|
|
{
|
|
|
|
|
//查询数据库是否存在数据
|
|
|
|
|
var queryCommand = new HDP_Command()
|
|
|
|
|
{
|
|
|
|
|
TableId = bindDatabase
|
|
|
|
|
};
|
|
|
|
|
var oldDataDapperRow = _HadoopService.Query<dynamic>(queryCommand).ToList();
|
|
|
|
|
var oldData = new List<JObject>();
|
|
|
|
|
foreach (var row in oldDataDapperRow)
|
|
|
|
|
{
|
|
|
|
|
var rowJson = JsonConvert.SerializeObject(row);
|
|
|
|
|
JObject rowJObject = JObject.Parse(rowJson);
|
|
|
|
|
oldData.Add(rowJObject);
|
|
|
|
|
}
|
|
|
|
|
HDP_Command[] commands = new HDP_Command?[responseList.Count];
|
|
|
|
|
foreach (var _item in responseList)
|
|
|
|
|
{
|
|
|
|
@ -104,7 +125,22 @@ namespace ZKLT.Quartz.Job
|
|
|
|
|
var data = new JObject();
|
|
|
|
|
foreach (var _config in configList)
|
|
|
|
|
{
|
|
|
|
|
// _config.Key 结果字段 _config.Value 数据库字段
|
|
|
|
|
data[_config.Value] = _item[_config.Key];
|
|
|
|
|
|
|
|
|
|
if (_config.Value == bindDatabasePrimary && command.Type == HDP_CommandType.INSERT)
|
|
|
|
|
{
|
|
|
|
|
|
|
|
|
|
var oldItem = oldData.Find(_oItem => _oItem.GetValue(bindDatabasePrimary).ToString() == data.GetValue(bindDatabasePrimary).ToString());
|
|
|
|
|
if (oldItem != null)
|
|
|
|
|
{
|
|
|
|
|
command.Where = new JObject()
|
|
|
|
|
{
|
|
|
|
|
{bindDatabasePrimary, "=" }
|
|
|
|
|
};
|
|
|
|
|
command.Type = HDP_CommandType.UPDATE;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
command.Data = data;
|
|
|
|
|
int _index = responseList.IndexOf(_item);
|
|
|
|
|