You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1142 lines
45 KiB
C#

using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using Dapper;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using MySqlX.XDevAPI.Relational;
using Mysqlx.Crud;
using Newtonsoft.Json;
using Mysqlx.Resultset;
using Newtonsoft.Json.Linq;
using System.Transactions;
namespace ZKLT.Hadoop
{
public class TableService : ITableService
{
public TableService()
{
_Source = new HDP_Source();
_Tables = new List<HDP_Table>();
}
private HDP_Source _Source;
private List<HDP_Table> _Tables;
/// <summary>
/// 合并条件
/// </summary>
/// <param name="table">表</param>
/// <param name="where">条件</param>
/// <param name="data">数据</param>
/// <param name="param">参数</param>
private string MergeWhere(HDP_Table table, JToken? where, JToken? data, Dictionary<string, object> param)
{
StringBuilder _wherestr = new StringBuilder();
string _guid = "";
if (where != null && data != null)
{
_wherestr.AppendLine("WHERE");
var _wheres = new List<JObject>();
var _datas = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_wheres.Add((JObject)where);
_datas.Add((JObject)data);
}
else if (where.Type == JTokenType.Array)
{
for (var i = 0; i < where.Count(); i++)
{
_wheres.Add((JObject)where.Children().ToArray()[i]);
_datas.Add((JObject)data.Children().ToArray()[i]);
}
}
for (var j = 0; j < _wheres.Count; j++)
{
if (j > 0)
{
_wherestr.AppendLine("OR");
}
_wherestr.AppendLine("(");
_wherestr.AppendLine("1 = 1");
var _where = _wheres[j];
var _data = _datas[j];
if (_where.Count > 0)
{
var _fileds = _where.Children().ToArray();
for (var i = 0; i < _fileds.Length; i++)
{
var _item = (JProperty)_fileds[i];
if (table.Columns!.Any(x => x.Key == _item.Name) && _data.ContainsKey(_item.Name))
{
List<string> _Itemv = new List<string>();
List<JToken> _Colv = new List<JToken>();
if (_item.Value.Type == JTokenType.String)
{
_Itemv.Add(_item.Value.ToString());
_Colv.Add(_data[_item.Name]!);
}
else if (_item.Value.Type == JTokenType.Array)
{
_Itemv.AddRange(_item.Value.ToObject<string[]>()!);
_Colv.AddRange(_data[_item.Name]!);
}
for (var k = 0; k < _Itemv.Count; k++)
{
_guid = Guid.NewGuid().ToString("N");
switch (_Itemv[k])
{
case HDP_WhereType.LIKE:
_wherestr.AppendLine(@$"AND `{_item.Name}` {_Itemv[k]} @{_guid}");
param.Add(_guid, $@"%{((JValue)_Colv[k]!).Value!}%");
break;
case HDP_WhereType.IN:
_wherestr.AppendLine(@$"AND `{_item.Name}` {_Itemv[k]} @{_guid}");
param.Add(_guid, _Colv[k]!);
break;
case HDP_WhereType.QUERYIN:
var _Command = JsonConvert.DeserializeObject<HDP_Command>(_Colv[k].ToString())!;
var _Query = QueryString(_Command.SourceId!, _Command.TableId!, _Command.Where, _Command.Data, _Command.Order, _Command.Col, param);
_wherestr.AppendLine(@$"AND `{_item.Name}` IN ({_Query})");
break;
case HDP_WhereType.EQUAL:
case HDP_WhereType.NOTEQUAL:
case HDP_WhereType.MORE:
case HDP_WhereType.LESS:
case HDP_WhereType.MORETHEN:
case HDP_WhereType.LESSTHEN:
case HDP_WhereType.BETWEEN:
default:
_wherestr.AppendLine(@$"AND `{_item.Name}` {_Itemv[k]} @{_guid}");
param.Add(_guid, ((JValue)_Colv[k]!).Value!);
break;
}
}
}
}
}
_wherestr.AppendLine(")");
}
}
return _wherestr.ToString();
}
/// <summary>
/// 合并列
/// </summary>
/// <param name="table">表</param>
/// <param name="cols">列</param>
/// <param name="command">命令</param>
private void MergeCols(HDP_Table table, string[]? cols, StringBuilder command)
{
if (cols == null || cols.Length == 0)
{
command.AppendLine(" * ");
}
StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns!.Length; i++)
{
if (cols!.Any(x => x == table.Columns[i].Key))
{
_colstr.Append($@"`{table.Columns[i].Key}`,");
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
command.AppendLine(_colstr.ToString());
}
/// <summary>
/// 合并排序
/// </summary>
/// <param name="table">表</param>
/// <param name="order">排序</param>
/// <returns></returns>
private string MergeOrder(HDP_Table table, JToken? order, Dictionary<string, object> param)
{
string _guid = "";
StringBuilder _orderstr = new StringBuilder();
if (order != null && order.Count() > 0)
{
_orderstr.Append("ORDER BY ");
var _fields = order.Children().ToArray();
foreach (var field in _fields)
{
var _field = (JProperty)field;
if (table.Columns!.Any(x => x.Key == _field.Name))
{
var _column = table.Columns!.First(x => x.Key == _field.Name);
if (_field.Value.Type == JTokenType.String)
{
switch (_field.Value.ToString())
{
case "DESC":
_orderstr.Append($@"`{_column.Key!}` DESC,");
break;
default:
_orderstr.Append($@"`{_column.Key!}` ASC,");
break;
}
}
else if (_field.Value.Type == JTokenType.Array)
{
var _orderTemp = _field.Value.ToObject<object[]>();
_orderstr.Append(@$"CASE `{_column.Key!}`");
for (var i = 0; i < _orderTemp!.Length; i++)
{
_guid = Guid.NewGuid().ToString("N");
_orderstr.Append(@$" WHEN @{_guid} THEN {i}");
param.Add(_guid, _orderTemp[i]);
}
_orderstr.Append($@" ELSE {_orderTemp.Length} END,");
}
}
}
if (_orderstr.Length > 0 && _orderstr[_orderstr.Length - 1] == ',')
{
_orderstr.Remove(_orderstr.Length - 1, 1);
}
}
return _orderstr.ToString();
}
/// <summary>
/// 初始化云计算
/// </summary>
/// <param name="config">配置</param>
public void Init(Action<HDP_Source> config)
{
if (config == null)
{
throw new ArgumentNullException("配置无效");
}
config(_Source);
//参数校验
if (string.IsNullOrEmpty(_Source.Host))
{
throw new ArgumentException("主机无效");
}
if (string.IsNullOrEmpty(_Source.Key))
{
throw new ArgumentException("源无效");
}
if (string.IsNullOrEmpty(_Source.Account))
{
throw new ArgumentException("用户名无效");
}
if (string.IsNullOrEmpty(_Source.PassWord))
{
throw new ArgumentException("密码无效");
}
//初始化
if (_Source.Port == null)
{
_Source.Port = 3306;
}
if (string.IsNullOrEmpty(_Source.Id))
{
_Source.Id = "";
}
if (string.IsNullOrEmpty(_Source.Description))
{
_Source.Description = "云计算系统";
}
var _source = HDP_Table.Class2Table<HDP_Source>();
_Tables.Add(_source);
if (!InitStruct(_Source.Id, _source))
{
throw new Exception("初始化数据源失败");
}
var _table = HDP_Table.Class2Table<HDP_Table>();
_Tables.Add(_table);
if (!InitStruct(_Source.Id, _table))
{
throw new Exception("初始化数据表失败");
}
var _column = HDP_Table.Class2Table<HDP_Column>();
_Tables.Add(_column);
if (!InitStruct(_Source.Id, _column))
{
throw new Exception("初始化数据列失败");
}
}
/// <summary>
/// 获取源
/// </summary>
/// <param name="sourceid">数据源编号</param>
/// <returns>结果</returns>
public HDP_Source GetSource(string sourceid)
{
if (string.IsNullOrEmpty(sourceid) || _Source.Id == sourceid)
{
return _Source;
}
var _result = QuerySingle<HDP_Source>("", "HDP_Source", new JObject
{
{ "Id","=" }
}, new JObject {
{ "Id",sourceid}
}, null);
if (_result == null)
{
throw new ArgumentNullException("数据源不存在");
}
return _result;
}
/// <summary>
/// 获取表
/// </summary>
/// <param name="tableid">表编号</param>
/// <returns>结果</returns>
public HDP_Table GetTable(string tableid)
{
if (string.IsNullOrEmpty(tableid))
{
throw new ArgumentNullException("数据表编号无效");
}
if (_Tables.Any(x => x.Id == tableid))
{
return _Tables.First(x => x.Id == tableid);
}
var _result = QuerySingle<HDP_Table>("", "HDP_Table", new JObject
{
{ "Id","=" }
}, new JObject{
{ "Id",tableid}
}, null);
if (_result != null)
{
_result.Columns = Query<HDP_Column>("", "HDP_Column", new JObject {
{ "TableId","="}
}, new JObject{
{"TableId",_result.Id! }
}, null, null);
}
else
{
throw new ArgumentNullException("数据表不存在");
}
return _result;
}
/// <summary>
/// 同步结构
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="table">数据表</param>
/// <returns>是否成功</returns>
public bool InitStruct(string sourceId, HDP_Table table)
{
var source = GetSource(sourceId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
StringBuilder _command = new StringBuilder();
if (DbExistTable(sourceId, table.Key))
{
var _dbColumns = DbGetColumns(sourceId, table.Key);
_command.AppendLine($@"ALTER TABLE `{table.Key}`");
_command.AppendLine("DROP PRIMARY KEY,");
//加载列
for (var i = 0; i < table.Columns.Length; i++)
{
StringBuilder _colstr = new StringBuilder();
var _column = table.Columns[i];
if (_dbColumns.Any(x => x.Key!.ToLower() == _column.Key!.ToLower()))
{
_colstr.Append($@"MODIFY `{_column.Key}`");
}
else
{
_colstr.Append($@"ADD `{_column.Key}`");
}
switch (_column.DataType)
{
case HDP_ColumnDataType.VARCHAR:
if (_column.Length == 0)
{
throw new ArgumentNullException(@$"列{_column.Key}长度无效");
}
_colstr.Append($@" {_column.DataType}({_column.Length})");
break;
case HDP_ColumnDataType.DECIMAL:
if (_column.Length == 0 || _column.DecimalLength == 0)
{
throw new ArgumentNullException(@$"列{_column.Key}长度无效");
}
_colstr.Append($@" {_column.DataType}({_column.Length},{_column.DecimalLength})");
break;
default:
_colstr.Append(@$" {_column.DataType}");
break;
}
if (!string.IsNullOrEmpty(_column.Description))
{
_colstr.Append($@" COMMENT '{_column.Description}'");
}
_colstr.Append(",");
_command.AppendLine(_colstr.ToString());
}
//加载主键
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
if (_primarys.Length > 0)
{
StringBuilder _primaryStr = new StringBuilder();
_primaryStr.Append("ADD PRIMARY KEY (");
for (var i = 0; i < _primarys.Length; i++)
{
var _primary = _primarys[i];
_primaryStr.Append($@"`{_primary.Key}`");
if (i < _primarys.Length - 1)
{
_primaryStr.Append(",");
}
}
_primaryStr.Append(")");
_command.AppendLine(_primaryStr.ToString());
}
if (!string.IsNullOrEmpty(table.Description))
{
_command.AppendLine(@$"COMMENT '{table.Description}'");
}
_connection.Execute(_command.ToString());
return true;
}
else
{
_command.AppendLine($@"CREATE TABLE `{table.Key}` (");
//加载列
for (int i = 0; i < table.Columns.Length; i++)
{
StringBuilder _colstr = new StringBuilder();
var _column = table.Columns[i];
_colstr.Append($@"`{_column.Key}`");
switch (_column.DataType)
{
case HDP_ColumnDataType.VARCHAR:
if (_column.Length == 0)
{
throw new ArgumentNullException(@$"列{_column.Key}长度无效");
}
_colstr.Append($@" {_column.DataType}({_column.Length})");
break;
case HDP_ColumnDataType.DECIMAL:
if (_column.Length == 0 || _column.DecimalLength == 0)
{
throw new ArgumentNullException(@$"列{_column.Key}长度无效");
}
_colstr.Append($@" {_column.DataType}({_column.Length},{_column.DecimalLength})");
break;
default:
_colstr.Append(@$" {_column.DataType}");
break;
}
if (!string.IsNullOrEmpty(_column.Description))
{
_colstr.Append($@" COMMENT '{_column.Description}'");
}
if (i < table.Columns.Length - 1)
{
_colstr.Append(",");
}
_command.AppendLine(_colstr.ToString());
}
//加载主键
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
if (_primarys.Length > 0)
{
_command.Append(",");
StringBuilder _primaryStr = new StringBuilder();
_primaryStr.Append("PRIMARY KEY (");
for (var i = 0; i < _primarys.Length; i++)
{
var _primary = _primarys[i];
_primaryStr.Append($@"`{_primary.Key}`");
if (i < _primarys.Length - 1)
{
_primaryStr.Append(",");
}
}
_primaryStr.Append(")");
_command.AppendLine(_primaryStr.ToString());
}
_command.AppendLine(")");
if (!string.IsNullOrEmpty(table.Description))
{
_command.AppendLine(@$"COMMENT '{table.Description}'");
}
_connection.Execute(_command.ToString());
}
_connection.Close();
return true;
}
}
/// <summary>
/// 删除结构
/// </summary>
/// <param name="sourceId">源</param>
/// <param name="tableId">表</param>
/// <returns>是否成功</returns>
public bool RemoveStruct(string sourceId, string tableId)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
StringBuilder _command = new StringBuilder();
_command.AppendLine(@$"DROP TABLE {table.Key}");
_connection.Execute(_command.ToString());
_connection.Close();
return !DbExistTable(sourceId, table.Key!);
}
}
/// <summary>
/// 插入数据
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="tableId">数据表</param>
/// <param name="data">数据</param>
/// <returns>是否成功</returns>
public bool Insert(string sourceId, string tableId, JToken? data)
{
var source = GetSource(sourceId)!;
var table = GetTable(tableId)!;
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
if (data == null || data.Count() == 0)
{
throw new ArgumentNullException("数据无效");
}
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
StringBuilder _command = new StringBuilder();
//主键检查
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
for (var i = 0; i < _primarys.Length; i++)
{
var _primary = _primarys[i];
if (!string.IsNullOrEmpty(_primary.InsertDefault) || _row.ContainsKey(_primary.Key!))
{
continue;
}
else
{
throw new ArgumentException($@"主键{_primary.Key}值无效");
}
}
//插入命令
_command.AppendLine($@"INSERT INTO `{table.Key}` (");
StringBuilder _colstr = new StringBuilder();
StringBuilder _parmstr = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, ((JValue)_row[_column.Key!]!).Value!);
}
else if (!string.IsNullOrEmpty(_column.InsertDefault))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
if (_parmstr[_parmstr.Length - 1] == ',')
{
_parmstr.Remove(_parmstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
_command.AppendLine(") VALUES (");
_command.AppendLine(_parmstr.ToString());
_command.AppendLine(")");
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_scope.Complete();
return true;
}
else
{
return false;
}
}
}
}
/// <summary>
/// 更新
/// </summary>
/// <param name="source">数据源</param>
/// <param name="table">数据表</param>
/// <param name="where">条件</param>
/// <param name="data">数据</param>
/// <returns>是否成功</returns>
public bool Update(string sourceId, string tableId, JToken? where, JToken? data)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
List<JObject> _where = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_where.Add((JObject)where);
}
else if (where.Type == JTokenType.Array)
{
_where.AddRange(((JArray)where).ToObject<JObject[]>()!);
}
if (data == null || data.Count() == 0)
{
throw new ArgumentNullException("数据无效");
}
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//更新命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"UPDATE `{table.Key}` SET ");
//更新列
StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!) && !_where.Any(x => x.ContainsKey(_column.Key!)))
{
_colstr.Append(@$"`{_column.Key!}`={HDP_CommandAction.ColConvert(_column, _params, ((JValue)_row[_column.Key!]!).Value!)},");
}
else if (!string.IsNullOrEmpty(_column.UpdateDefault))
{
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
//执行条件
_command.AppendLine(MergeWhere(table, where, _row, _params));
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_scope.Complete();
return true;
}
else
{
return false;
}
}
}
}
/// <summary>
/// 删除
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="tableId">数据表</param>
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Delete(string sourceId, string tableId, JToken? where, JToken? data)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
if (data == null || data.Count() == 0)
{
throw new ArgumentNullException("数据无效");
}
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//删除命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"DELETE FROM `{table.Key}`");
_command.AppendLine(MergeWhere(table, where, _row, _params));
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_scope.Complete();
return true;
}
else
{
return false;
}
}
}
}
/// <summary>
/// 查询单个
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="tableId">数据表</param>
/// <param name="where">条件</param>
/// <param name="data">数据</param>
/// <returns>结果</returns>
public T? QuerySingle<T>(string sourceId, string tableId, JToken? where, JToken? data,
string[]? col)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
if (data == null || data.Count() == 0)
{
throw new ArgumentNullException("数据无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
//查询命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
if (col == null || col.Length == 0)
{
_command.AppendLine(@$"SELECT * FROM `{table.Key}`");
}
else
{
_command.AppendLine("SELECT");
MergeCols(table, col, _command);
_command.AppendLine($@"FROM `{table.Key}`");
}
//执行条件
_command.AppendLine(MergeWhere(table, where, data, _params));
var _result = _connection.Query<T>(_command.ToString(), _params).ToArray();
_connection.Close();
if (_result.Length > 0)
{
return _result[0];
}
else
{
return default(T);
}
}
}
/// <summary>
/// 查询列表
/// </summary>
/// <typeparam name="T">返回类型</typeparam>
/// <param name="source">数据源</param>
/// <param name="table">数据表</param>
/// <param name="where">条件</param>
/// <param name="data">数据</param>
/// <param name="order">排序</param>
/// <param name="col">筛选返回字段</param>
/// <returns>结果集</returns>
public T[] Query<T>(string sourceId, string tableId, JToken? where, JToken? data,
JToken? order, string[]? col)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
Dictionary<string, object> _params = new Dictionary<string, object>();
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
var _result = _connection.Query<T>(QueryString(sourceId, tableId, where, data, order, col, _params), _params);
_connection.Close();
return _result.ToArray();
}
}
public string QueryString(string sourceId, string tableId, JToken? where, JToken? data,
JToken? order, string[]? col, Dictionary<string, object> param)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
//查询命令
StringBuilder _command = new StringBuilder();
if (col == null || col.Length == 0)
{
_command.AppendLine(@$"SELECT * FROM `{table.Key}`");
}
else
{
_command.AppendLine("SELECT");
MergeCols(table, col, _command);
_command.AppendLine($@"FROM `{table.Key}`");
}
//执行条件
_command.AppendLine(MergeWhere(table, where, data, param));
//执行排序
_command.AppendLine(MergeOrder(table, order, param));
return _command.ToString();
}
/// <summary>
/// 查询分页列表
/// </summary>
/// <typeparam name="T">返回类型</typeparam>
/// <param name="sourceId">数据源</param>
/// <param name="tableId">数据表</param>
/// <param name="pageIndex">分页下标</param>
/// <param name="pageSize">分页大小</param>
/// <param name="where">条件</param>
/// <param name="data">数据</param>
/// <param name="order">排序</param>
/// <param name="col">返回咧</param>
/// <returns>结果集</returns>
public HDP_Page<T> QueryPage<T>(string sourceId, string tableId, int pageIndex, int pageSize, JToken? where,
JToken? data, JToken? order, string[]? col)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
//数据校验
if (string.IsNullOrEmpty(table.Key))
{
throw new ArgumentNullException("表键无效");
}
if (table.Columns == null || table.Columns.Length == 0)
{
throw new ArgumentNullException("列无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
//查询命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
if (col == null || col.Length == 0)
{
_command.AppendLine(@$"SELECT * FROM `{table.Key}`");
}
else
{
_command.AppendLine("SELECT");
MergeCols(table, col, _command);
_command.AppendLine($@"FROM `{table.Key}`");
}
//执行条件
_command.AppendLine(MergeWhere(table, where, data, _params));
//执行排序
_command.AppendLine(MergeOrder(table, order, _params));
var _result = new HDP_Page<T>();
_result.PageIndex = pageIndex;
_result.PageSize = pageSize;
StringBuilder _totalstr = new StringBuilder();
_totalstr.AppendLine($@"SELECT COUNT(0) FROM (");
_totalstr.AppendLine(_command.ToString());
_totalstr.AppendLine(") AS Temp");
_result.Total = _connection.QuerySingle<int>(_totalstr.ToString(), _params);
StringBuilder _pagestr = new StringBuilder();
_pagestr.AppendLine($@"SELECT * FROM (");
_pagestr.AppendLine(_command.ToString());
_pagestr.AppendLine(") AS Temp");
_pagestr.AppendLine(@$"LIMIT {(_result.PageIndex - 1) * _result.PageSize},{_result.PageSize}");
_result.Data = _connection.Query<T>(_pagestr.ToString(), _params).ToArray();
_connection.Close();
return _result;
}
}
/// <summary>
/// 判断数据源是否存在表
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="tableName">表名</param>
/// <returns>是否存在</returns>
public bool DbExistTable(string sourceId, string tableName)
{
var source = GetSource(sourceId);
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
StringBuilder _command = new StringBuilder();
_command.AppendLine($@"SELECT COUNT(0) FROM INFORMATION_SCHEMA.TABLES WHERE `TABLE_TYPE`='BASE TABLE' AND `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME` =@TableKey");
int _result = _connection.QuerySingle<int>(_command.ToString(), new { SourceKey = source.Key, TableKey = tableName });
_connection.Close();
if (_result > 0)
{
return true;
}
else
{
return false;
}
}
}
/// <summary>
/// 查询数据列
/// </summary>
/// <param name="sourceId">数据源</param>
/// <param name="tableId">表</param>
/// <returns>列</returns>
public HDP_Column[] DbGetColumns(string sourceId, string tableId)
{
var source = GetSource(sourceId);
var table = GetTable(tableId);
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
var _result = new List<HDP_Column>();
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
StringBuilder _command = new StringBuilder();
_command.AppendLine($@"SELECT `COLUMN_NAME` AS `Key`,`DATA_TYPE` AS `DataType`,`COLUMN_COMMENT` AS `Description` FROM INFORMATION_SCHEMA.COLUMNS
WHERE `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME`=@TableKey");
_result.AddRange(_connection.Query<HDP_Column>(_command.ToString(), new { SourceKey = source.Key, TableKey = table.Key }));
_connection.Close();
return _result.ToArray();
}
}
}
}