using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Transactions;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
namespace ZKLT.Hadoop
{
/// <summary>
/// Extension methods that plug the cloud-computing (Hadoop) services into an application.
/// </summary>
public static class HadoopServiceExtend
{
    /// <summary>
    /// Registers the cloud-computing services as singletons.
    /// </summary>
    /// <param name="services">Service collection to register into.</param>
    /// <returns>The service collection, so further registrations can be chained.</returns>
    public static IServiceCollection AddHadoop(this IServiceCollection services)
    {
        return services
            .AddSingleton<IHadoopService, HadoopService>()
            .AddSingleton<ITableService, TableService>();
    }

    /// <summary>
    /// Resolves the registered <see cref="IHadoopService"/> and initializes it with the given source configuration.
    /// </summary>
    /// <param name="app">Application builder whose service provider holds the service.</param>
    /// <param name="config">Callback that fills in the source configuration.</param>
    /// <returns>The same <paramref name="app"/> instance.</returns>
    public static IApplicationBuilder UseHadoop(this IApplicationBuilder app, Action<HDP_Source> config)
    {
        app.ApplicationServices
            .GetRequiredService<IHadoopService>()
            .Init(config);
        return app;
    }
}
/// <summary>
/// 云计算服务
/// </summary>
public class HadoopService : IHadoopService
{
/// <summary>
/// Creates the service with an empty source configuration and an empty table cache;
/// both are filled in by <see cref="Init"/>.
/// </summary>
/// <param name="tableService">Table service used for all storage operations.</param>
public HadoopService(ITableService tableService)
{
    _TableService = tableService;
    _Source = new HDP_Source();
    _Tables = new List<HDP_Table>();
}

// The references below are assigned once in the constructor and never replaced;
// only the objects they point to are mutated (Init configures _Source and adds to _Tables).

// Storage backend used by every operation of this service.
private readonly ITableService _TableService;

// Source configured through Init; also returned by GetSource for an empty or matching id.
private readonly HDP_Source _Source;

// In-memory cache of the metadata table definitions registered by Init.
private readonly List<HDP_Table> _Tables;
/// <summary>
/// Initializes the service: applies the caller's configuration to the source, validates it,
/// fills in defaults, then registers the three metadata tables (source, table, column).
/// </summary>
/// <param name="config">Callback that populates the source configuration.</param>
/// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
/// <exception cref="ArgumentException">Host, Key, Account or PassWord is missing after configuration.</exception>
/// <exception cref="Exception">The table service reported failure while initializing a metadata table.</exception>
public void Init(Action<HDP_Source> config)
{
    if (config == null)
    {
        // The single-string constructor interprets its argument as the parameter NAME,
        // so the (paramName, message) overload is required to surface the message.
        throw new ArgumentNullException(nameof(config), "配置无效");
    }
    config(_Source);

    // Validate the mandatory connection settings.
    if (string.IsNullOrEmpty(_Source.Host))
    {
        throw new ArgumentException("主机无效");
    }
    if (string.IsNullOrEmpty(_Source.Key))
    {
        throw new ArgumentException("源无效");
    }
    if (string.IsNullOrEmpty(_Source.Account))
    {
        throw new ArgumentException("用户名无效");
    }
    if (string.IsNullOrEmpty(_Source.PassWord))
    {
        throw new ArgumentException("密码无效");
    }

    // Apply defaults for optional settings.
    if (_Source.Port == null)
    {
        _Source.Port = 3306;
    }
    if (string.IsNullOrEmpty(_Source.Id))
    {
        _Source.Id = "";
    }
    if (string.IsNullOrEmpty(_Source.Description))
    {
        _Source.Description = "云计算系统";
    }

    // Caches a metadata table first, then asks the table service to initialize its structure.
    // The cache entry is added before InitStruct, exactly as in the original sequence.
    void Register(HDP_Table table, string failureMessage)
    {
        _Tables.Add(table);
        if (!_TableService.InitStruct(_Source, table))
        {
            throw new Exception(failureMessage);
        }
    }

    Register(HDP_Table.Class2Table<HDP_Source>(), "初始化数据源失败");
    Register(HDP_Table.Class2Table<HDP_Table>(), "初始化数据表失败");
    Register(HDP_Table.Class2Table<HDP_Column>(), "初始化数据列失败");
}
/// <summary>
/// Looks up a data source by id.
/// </summary>
/// <param name="sourceid">Source id; null or empty selects the configured source.</param>
/// <returns>
/// The configured source when <paramref name="sourceid"/> is empty or matches its Id;
/// otherwise the result of querying the HDP_Source table, which may be null.
/// </returns>
public HDP_Source? GetSource(string sourceid)
{
    if (string.IsNullOrEmpty(sourceid) || _Source.Id == sourceid)
    {
        return _Source;
    }

    // Third argument is the filter ("Id" with "="), fourth carries the value to match.
    return _TableService.QuerySingle<HDP_Source>(
        _Source,
        GetTable("HDP_Source")!,
        new JObject { { "Id", "=" } },
        new JObject { { "Id", sourceid } },
        null);
}
/// <summary>
/// Creates a new data source record after validating its required fields.
/// </summary>
/// <param name="source">Source to store; Id, Host, Port, Key, Account and PassWord are required.</param>
/// <returns>The result reported by the table service insert.</returns>
/// <exception cref="ArgumentNullException">A required field is null or empty.</exception>
/// <exception cref="ArgumentException">A source with the same Id can already be resolved.</exception>
public bool InsertSource(HDP_Source source)
{
    // Validation. The (paramName, message) overload is used because the single-string
    // ArgumentNullException constructor treats its argument as the parameter name.
    if (string.IsNullOrEmpty(source.Id))
    {
        throw new ArgumentNullException(nameof(source), "编号无效");
    }
    if (string.IsNullOrEmpty(source.Host))
    {
        throw new ArgumentNullException(nameof(source), "主机无效");
    }
    if (source.Port == null)
    {
        throw new ArgumentNullException(nameof(source), "端口无效");
    }
    if (string.IsNullOrEmpty(source.Key))
    {
        throw new ArgumentNullException(nameof(source), "数据源键无效");
    }
    if (string.IsNullOrEmpty(source.Account))
    {
        throw new ArgumentNullException(nameof(source), "用户名无效");
    }
    if (string.IsNullOrEmpty(source.PassWord))
    {
        throw new ArgumentNullException(nameof(source), "密码无效");
    }
    if (GetSource(source.Id) != null)
    {
        throw new ArgumentException("编号已存在");
    }

    return _TableService.Insert(_Source, GetTable("HDP_Source")!, HDP_Table.Class2JObject(source));
}
/// <summary>
/// Updates an existing data source record after validating its required fields.
/// </summary>
/// <param name="source">Source to store; Id, Host, Port, Key, Account and PassWord are required.</param>
/// <returns>The result reported by the table service update.</returns>
/// <exception cref="ArgumentNullException">A required field is null or empty.</exception>
/// <exception cref="ArgumentException">No source with the given Id can be resolved.</exception>
public bool UpdateSource(HDP_Source source)
{
    // Validation. The (paramName, message) overload is used because the single-string
    // ArgumentNullException constructor treats its argument as the parameter name.
    if (string.IsNullOrEmpty(source.Id))
    {
        throw new ArgumentNullException(nameof(source), "编号无效");
    }
    if (string.IsNullOrEmpty(source.Host))
    {
        throw new ArgumentNullException(nameof(source), "主机无效");
    }
    if (source.Port == null)
    {
        throw new ArgumentNullException(nameof(source), "端口无效");
    }
    if (string.IsNullOrEmpty(source.Key))
    {
        throw new ArgumentNullException(nameof(source), "数据源键无效");
    }
    if (string.IsNullOrEmpty(source.Account))
    {
        throw new ArgumentNullException(nameof(source), "用户名无效");
    }
    if (string.IsNullOrEmpty(source.PassWord))
    {
        throw new ArgumentNullException(nameof(source), "密码无效");
    }
    if (GetSource(source.Id) == null)
    {
        throw new ArgumentException("编号不存在");
    }

    // Filter on "Id"; presumably the table service reads the Id value to match
    // from the serialized source passed as data (verify against ITableService).
    return _TableService.Update(
        _Source,
        GetTable("HDP_Source")!,
        new JObject { { "Id", "=" } },
        HDP_Table.Class2JObject(source));
}
/// <summary>
/// Deletes the data source record with the given id.
/// </summary>
/// <param name="sourceid">Id of the source to delete.</param>
/// <returns>The result reported by the table service delete.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sourceid"/> is null or empty.</exception>
/// <exception cref="ArgumentException">No source with the given id can be resolved.</exception>
public bool DeleteSource(string sourceid)
{
    if (string.IsNullOrEmpty(sourceid))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(sourceid), "编号无效");
    }
    if (GetSource(sourceid) == null)
    {
        throw new ArgumentException("编号不存在");
    }

    return _TableService.Delete(
        _Source,
        GetTable("HDP_Source")!,
        new JObject { { "Id", "=" } },
        new JObject { { "Id", sourceid } });
}
/// <summary>
/// Queries data source records.
/// </summary>
/// <param name="command">Command supplying the filter (Where), values (Data), ordering (Order) and columns (Col).</param>
/// <returns>The sources returned by the table service.</returns>
public HDP_Source[] QuerySource(HDP_Command command)
{
    return _TableService.Query<HDP_Source>(
        _Source,
        GetTable("HDP_Source")!,
        command.Where!,
        command.Data!,
        command.Order!,
        command.Col);
}
/// <summary>
/// Resolves a table definition by id, first from the in-memory cache and then from the HDP_Table table.
/// </summary>
/// <param name="tableid">Id of the table to resolve.</param>
/// <returns>
/// The cached definition if present; otherwise the stored definition with its columns loaded,
/// or null when no such table is stored.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="tableid"/> is null or empty.</exception>
/// <exception cref="InvalidOperationException">
/// A metadata table needed for the lookup is not cached, i.e. the service has not been initialized.
/// </exception>
public HDP_Table? GetTable(string tableid)
{
    if (string.IsNullOrEmpty(tableid))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(tableid), "数据表编号无效");
    }

    // Single scan instead of Any() followed by First().
    var _cached = _Tables.FirstOrDefault(x => x.Id == tableid);
    if (_cached != null)
    {
        return _cached;
    }

    // The metadata tables are only ever served from the cache. Resolving them through
    // GetTable itself would recurse without end when they are missing (before Init),
    // so they are read from the cache directly and a clear error is raised instead.
    HDP_Table RequireMetaTable(string id)
    {
        var _meta = _Tables.FirstOrDefault(x => x.Id == id);
        if (_meta == null)
        {
            throw new InvalidOperationException("云计算服务未初始化");
        }
        return _meta;
    }

    var _result = _TableService.QuerySingle<HDP_Table>(
        _Source,
        RequireMetaTable("HDP_Table"),
        new JObject { { "Id", "=" } },
        new JObject { { "Id", tableid } },
        null);
    if (_result != null)
    {
        // Attach the column definitions that belong to this table.
        _result.Columns = _TableService.Query<HDP_Column>(
            _Source,
            RequireMetaTable("HDP_Column"),
            new JObject { { "TableId", "=" } },
            new JObject { { "TableId", _result.Id! } },
            null,
            null);
    }
    return _result;
}
/// <summary>
/// Creates a table: initializes its structure through the table service, then stores the
/// table definition and each of its column definitions inside one transaction scope.
/// </summary>
/// <param name="table">Table definition to create; its Id is required.</param>
/// <returns>
/// True when every insert succeeded and the scope was completed; false when structure
/// initialization or any insert reported failure.
/// </returns>
/// <exception cref="ArgumentNullException">The table Id is empty, or a table with that Id already exists.</exception>
public bool InsertTable(HDP_Table table)
{
    // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
    if (string.IsNullOrEmpty(table.Id))
    {
        throw new ArgumentNullException(nameof(table), "表编号无效");
    }
    if (GetTable(table.Id) != null)
    {
        // Exception type kept as-is so existing catch blocks keep working.
        throw new ArgumentNullException(nameof(table), "表编号已存在");
    }
    if (!_TableService.InitStruct(_Source, table))
    {
        return false;
    }

    // Returning before Complete() disposes the scope without committing it.
    using (TransactionScope _scope = new TransactionScope())
    {
        if (!_TableService.Insert(_Source, GetTable("HDP_Table")!, HDP_Table.Class2JObject(table)))
        {
            return false;
        }

        // Resolved once; the lookup does not depend on the loop variable.
        var _columnTable = GetTable("HDP_Column")!;
        for (var i = 0; i < table.Columns!.Length; i++)
        {
            var _column = table.Columns![i];
            _column.TableId = table.Id;
            if (!_TableService.Insert(_Source, _columnTable, HDP_Table.Class2JObject(_column)))
            {
                return false;
            }
        }

        _scope.Complete();
        return true;
    }
}
/// <summary>
/// Updates a table: re-initializes its structure through the table service, then updates the
/// table definition and inserts or updates each column definition inside one transaction scope.
/// </summary>
/// <param name="table">Table definition to update; its Id is required.</param>
/// <returns>
/// True when every write succeeded and the scope was completed; false when structure
/// initialization or any write reported failure.
/// </returns>
/// <exception cref="ArgumentNullException">The table Id is empty, or no table with that Id exists.</exception>
public bool UpdateTable(HDP_Table table)
{
    // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
    if (string.IsNullOrEmpty(table.Id))
    {
        throw new ArgumentNullException(nameof(table), "表编号无效");
    }
    if (GetTable(table.Id) == null)
    {
        // Exception type kept as-is so existing catch blocks keep working.
        throw new ArgumentNullException(nameof(table), "表编号不存在");
    }
    if (!_TableService.InitStruct(_Source, table))
    {
        return false;
    }

    // Returning before Complete() disposes the scope without committing it.
    using (TransactionScope _scope = new TransactionScope())
    {
        if (!_TableService.Update(
                _Source,
                GetTable("HDP_Table")!,
                new JObject { { "Id", "=" } },
                HDP_Table.Class2JObject(table)))
        {
            return false;
        }

        // Resolved once; the lookup does not depend on the loop variable.
        var _columnTable = GetTable("HDP_Column")!;

        // NOTE(review): only columns present in table.Columns are written; stored columns
        // that are no longer listed are not removed here.
        for (var i = 0; i < table.Columns!.Length; i++)
        {
            var _column = table.Columns![i];
            _column.TableId = table.Id;

            var _exists = _TableService.QuerySingle<HDP_Column>(
                _Source,
                _columnTable,
                new JObject { { "Id", "=" } },
                HDP_Table.Class2JObject(_column),
                null) != null;

            // Update the column when it is already stored, insert it otherwise.
            var _saved = _exists
                ? _TableService.Update(
                    _Source,
                    _columnTable,
                    new JObject { { "Id", "=" } },
                    HDP_Table.Class2JObject(_column))
                : _TableService.Insert(_Source, _columnTable, HDP_Table.Class2JObject(_column));
            if (!_saved)
            {
                return false;
            }
        }

        _scope.Complete();
        return true;
    }
}
/// <summary>
/// Deletes a table definition and its column definitions inside one transaction scope.
/// </summary>
/// <param name="tableId">Id of the table to delete.</param>
/// <returns>True when both deletes succeeded and the scope was completed; otherwise false.</returns>
/// <exception cref="ArgumentNullException"><paramref name="tableId"/> is null or empty.</exception>
/// <exception cref="ArgumentException">No table with the given id exists.</exception>
public bool DeleteTable(string tableId)
{
    if (string.IsNullOrEmpty(tableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(tableId), "编号无效");
    }
    if (GetTable(tableId) == null)
    {
        throw new ArgumentException("编号不存在");
    }

    // Returning before Complete() disposes the scope without committing it.
    using (TransactionScope _scope = new TransactionScope())
    {
        // Remove the table definition first...
        if (!_TableService.Delete(
                _Source,
                GetTable("HDP_Table")!,
                new JObject { { "Id", "=" } },
                new JObject { { "Id", tableId } }))
        {
            return false;
        }

        // ...then every column definition that references it.
        if (!_TableService.Delete(
                _Source,
                GetTable("HDP_Column")!,
                new JObject { { "TableId", "=" } },
                new JObject { { "TableId", tableId } }))
        {
            return false;
        }

        _scope.Complete();
        return true;
    }
}
/// <summary>
/// Queries table definition records.
/// </summary>
/// <param name="command">Command supplying the filter (Where), values (Data), ordering (Order) and columns (Col).</param>
/// <returns>The table definitions returned by the table service.</returns>
public HDP_Table[] QueryTable(HDP_Command command)
{
    return _TableService.Query<HDP_Table>(
        _Source,
        GetTable("HDP_Table")!,
        command.Where!,
        command.Data!,
        command.Order!,
        command.Col!);
}
/// <summary>
/// Inserts the command's data into the table named by the command.
/// </summary>
/// <param name="command">Command carrying the target TableId and the Data to insert.</param>
/// <returns>The result reported by the table service insert.</returns>
/// <exception cref="ArgumentNullException">The command's TableId is null or empty.</exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public bool Insert(HDP_Command command)
{
    if (string.IsNullOrEmpty(command.TableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.Insert(_source, _table, command.Data!);
}
/// <summary>
/// Updates rows of the table named by the command.
/// </summary>
/// <param name="command">Command carrying the target TableId, the Where filter and the Data to write.</param>
/// <returns>The result reported by the table service update.</returns>
/// <exception cref="ArgumentNullException">The command's TableId is null or empty.</exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public bool Update(HDP_Command command)
{
    if (string.IsNullOrEmpty(command.TableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.Update(_source, _table, command.Where!, command.Data!);
}
/// <summary>
/// Deletes rows of the table named by the command.
/// </summary>
/// <param name="command">Command carrying the target TableId, the Where filter and the Data values.</param>
/// <returns>The result reported by the table service delete.</returns>
/// <exception cref="ArgumentNullException">The command's TableId is null or empty.</exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public bool Delete(HDP_Command command)
{
    if (string.IsNullOrEmpty(command.TableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.Delete(_source, _table, command.Where!, command.Data!);
}
/// <summary>
/// Queries a single row from the table named by the command.
/// </summary>
/// <typeparam name="T">Type of the returned row.</typeparam>
/// <param name="command">Command carrying the target TableId, the Where filter, the Data values and the columns (Col).</param>
/// <returns>The row returned by the table service, which may be null.</returns>
/// <exception cref="ArgumentNullException">The command's TableId is null or empty.</exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public T? QuerySingle<T>(HDP_Command command)
{
    if (string.IsNullOrEmpty(command.TableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.QuerySingle<T>(_source, _table, command.Where!, command.Data!, command.Col);
}
/// <summary>
/// Queries a list of rows from the table named by the command.
/// </summary>
/// <typeparam name="T">Type of the returned rows.</typeparam>
/// <param name="command">Command carrying the target TableId, the Where filter, the Data values, the ordering (Order) and the columns (Col).</param>
/// <returns>The rows returned by the table service.</returns>
/// <exception cref="ArgumentNullException">The command's TableId is null or empty.</exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public T[] Query<T>(HDP_Command command)
{
    if (string.IsNullOrEmpty(command.TableId))
    {
        // (paramName, message) overload: the single-string constructor would treat the text as the parameter name.
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.Query<T>(_source, _table, command.Where!, command.Data!, command.Order!, command.Col);
}
/// <summary>
/// Queries one page of rows from the table named by the command.
/// </summary>
/// <typeparam name="T">Type of the returned rows.</typeparam>
/// <param name="command">
/// Command carrying the target TableId, PageIndex and PageSize (both must be greater than zero),
/// the Where filter, the Data values, the ordering (Order) and the columns (Col).
/// </param>
/// <returns>The page returned by the table service.</returns>
/// <exception cref="ArgumentNullException">
/// The command's TableId is null or empty, or PageIndex / PageSize is null or not positive.
/// </exception>
/// <exception cref="ArgumentException">The table, or the source it refers to, does not exist.</exception>
public HDP_Page<T> Page<T>(HDP_Command command)
{
    // (paramName, message) overload: the single-string ArgumentNullException constructor would
    // treat the text as the parameter name. The exception type is kept for non-positive values
    // too, so existing catch blocks keep working.
    if (string.IsNullOrEmpty(command.TableId))
    {
        throw new ArgumentNullException(nameof(command), "表无效");
    }
    if (command.PageIndex == null || command.PageIndex <= 0)
    {
        throw new ArgumentNullException(nameof(command), "分页下标无效");
    }
    if (command.PageSize == null || command.PageSize <= 0)
    {
        throw new ArgumentNullException(nameof(command), "分页大小无效");
    }
    var _table = GetTable(command.TableId);
    if (_table == null)
    {
        throw new ArgumentException("表不存在");
    }
    var _source = GetSource(_table.SourceId!);
    if (_source == null)
    {
        // GetSource returns null for an unknown id; fail here instead of handing null to the table service.
        throw new ArgumentException("数据源不存在");
    }
    return _TableService.QueryPage<T>(
        _source,
        _table,
        (int)command.PageIndex,
        (int)command.PageSize,
        command.Where!,
        command.Data!,
        command.Order!,
        command.Col!);
}
/// <summary>
/// Executes a batch of commands in two passes: all write commands (insert, update, delete)
/// run first inside one transaction scope, then all read commands (single, list, page) run
/// outside of it.
/// </summary>
/// <param name="command">Commands to execute.</param>
/// <returns>
/// One entry per command, at the command's index: the boolean result of a write, the result
/// of a read, or a message string when the command failed or its type is not supported.
/// </returns>
public object?[] PatchCommand(HDP_Command[] command)
{
    var _outcomes = new object?[command.Length];

    // Pass 1: writes. The scope is completed only when every write succeeded.
    // NOTE(review): entries of writes that returned true stay true even when a later
    // failure prevents the scope from being completed.
    using (var _tx = new TransactionScope())
    {
        var _allSucceeded = true;
        for (var idx = 0; idx < command.Length; idx++)
        {
            try
            {
                var _kind = command[idx].Type;
                bool _ok;
                if (_kind == HDP_CommandType.INSERT)
                {
                    _ok = Insert(command[idx]);
                }
                else if (_kind == HDP_CommandType.UPDATE)
                {
                    _ok = Update(command[idx]);
                }
                else if (_kind == HDP_CommandType.DELETE)
                {
                    _ok = Delete(command[idx]);
                }
                else if (_kind == HDP_CommandType.QUERYSINGLE || _kind == HDP_CommandType.QUERY || _kind == HDP_CommandType.PAGE)
                {
                    // Reads are handled in the second pass.
                    continue;
                }
                else
                {
                    _outcomes[idx] = "不支持该命令";
                    _allSucceeded = false;
                    continue;
                }

                if (!_ok)
                {
                    _allSucceeded = false;
                }
                _outcomes[idx] = _ok;
            }
            catch (Exception ex)
            {
                // A failing write is reported through its message and blocks the commit.
                _outcomes[idx] = ex.Message;
                _allSucceeded = false;
            }
        }

        if (_allSucceeded)
        {
            _tx.Complete();
        }
    }

    // Pass 2: reads, executed after the transaction scope has been disposed.
    for (var idx = 0; idx < command.Length; idx++)
    {
        try
        {
            var _kind = command[idx].Type;
            if (_kind == HDP_CommandType.INSERT || _kind == HDP_CommandType.UPDATE || _kind == HDP_CommandType.DELETE)
            {
                // Already handled in the first pass.
                continue;
            }

            if (_kind == HDP_CommandType.QUERYSINGLE)
            {
                _outcomes[idx] = QuerySingle<dynamic>(command[idx]);
            }
            else if (_kind == HDP_CommandType.QUERY)
            {
                _outcomes[idx] = Query<dynamic>(command[idx]);
            }
            else if (_kind == HDP_CommandType.PAGE)
            {
                _outcomes[idx] = Page<dynamic>(command[idx]);
            }
            else
            {
                _outcomes[idx] = "不支持该命令";
            }
        }
        catch (Exception ex)
        {
            _outcomes[idx] = ex.Message;
        }
    }

    return _outcomes.ToArray();
}
}
}