using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Policy;
using System.Text;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using Dapper;
using ZKLT.Hadoop.Interface;
using ZKLT.Hadoop.Model;
using MySqlX.XDevAPI.Relational;
using Mysqlx.Crud;
using Newtonsoft.Json;
using Mysqlx.Resultset;
using Newtonsoft.Json.Linq;
using System.Transactions;

namespace ZKLT.Hadoop {
    /// <summary>
    /// MySQL implementation of <see cref="ITableService"/>: synchronises table
    /// structure and builds/executes INSERT, UPDATE, DELETE and SELECT statements
    /// from JSON-described rows, conditions and orderings.
    /// </summary>
    public class TableService : ITableService {
        /// <summary>
        /// Comparison operators that may be taken from a JSON condition and placed
        /// into SQL text. The operator cannot be sent as a bound parameter, so it is
        /// validated against this list instead of being interpolated as-is.
        /// </summary>
        private static readonly HashSet<string> AllowedOperators = new HashSet<string>(StringComparer.OrdinalIgnoreCase) {
            "=", "<=>", "!=", "<>", "<", "<=", ">", ">=",
            "LIKE", "NOT LIKE", "IN", "NOT IN", "IS", "IS NOT",
            "REGEXP", "NOT REGEXP", "RLIKE", "NOT RLIKE"
        };

        /// <summary>
        /// Wraps an identifier in backticks, doubling any embedded backtick so the
        /// identifier cannot terminate the quoting.
        /// </summary>
        private static string QuoteName(string? name) {
            return "`" + (name ?? string.Empty).Replace("`", "``") + "`";
        }

        /// <summary>
        /// Trims and whitespace-normalises a condition operator and checks it
        /// against <see cref="AllowedOperators"/>.
        /// </summary>
        /// <exception cref="ArgumentException">The operator is not in the allowed list.</exception>
        private static string NormalizeOperator(string? op) {
            string _normalized = string.Join(" ", (op ?? string.Empty).Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries));
            if (!AllowedOperators.Contains(_normalized)) {
                throw new ArgumentException($@"条件运算符{op}无效");
            }
            return _normalized;
        }

        /// <summary>
        /// Creates and opens a connection for the data source.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// The connection could not be opened; the underlying error is kept as the inner exception.
        /// </exception>
        private static MySqlConnection OpenConnection(HDP_Source source) {
            MySqlConnection _connection = new MySqlConnection(source.GetConnectString());
            try {
                _connection.Open();
            }
            catch (Exception ex) {
                _connection.Dispose();
                throw new ArgumentException("数据源连接失败", ex);
            }
            return _connection;
        }

        /// <summary>
        /// Shared validation: the table needs a key and at least one column.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the text is passed to the single-argument
        /// ArgumentNullException constructor, i.e. as the parameter name. Left
        /// untouched because callers may depend on the resulting Message/ParamName.
        /// </remarks>
        private static void ValidateTable(HDP_Table table) {
            if (string.IsNullOrEmpty(table.Key)) {
                throw new ArgumentNullException("表键无效");
            }
            if (table.Columns == null || table.Columns.Length == 0) {
                throw new ArgumentNullException("列无效");
            }
        }

        /// <summary>
        /// Normalises row data into a list: a JSON object is one row, a JSON array
        /// is converted to a list of rows, any other container yields no rows.
        /// </summary>
        private static List<JObject> ToRows(JContainer data) {
            if (data.Type == JTokenType.Object) {
                return new List<JObject> { (JObject)data };
            }
            if (data.Type == JTokenType.Array) {
                return data.ToObject<List<JObject>>()!;
            }
            return new List<JObject>();
        }

        /// <summary>
        /// Appends one <c>AND `column` op @param</c> line and registers the value
        /// under a freshly generated parameter name.
        /// </summary>
        private static void AppendCondition(StringBuilder sql, Dictionary<string, object> param, string column, string? op, object value) {
            string _guid = Guid.NewGuid().ToString("N");
            sql.AppendLine($@"AND {QuoteName(column)} {NormalizeOperator(op)} @{_guid}");
            param.Add(_guid, value);
        }

        /// <summary>
        /// Builds the start of a SELECT statement (column list and FROM clause).
        /// </summary>
        private StringBuilder BuildSelect(HDP_Table table, string[]? col) {
            StringBuilder _command = new StringBuilder();
            if (col == null || col.Length == 0) {
                _command.AppendLine($@"SELECT * FROM {QuoteName(table.Key)}");
            }
            else {
                _command.AppendLine("SELECT");
                MergeCols(table, col, _command);
                _command.AppendLine($@"FROM {QuoteName(table.Key)}");
            }
            return _command;
        }

        /// <summary>
        /// Builds the WHERE clause.
        /// </summary>
        /// <param name="table">Table definition; only fields naming one of its columns are used.</param>
        /// <param name="where">
        /// Condition description: an object mapping column name to an operator
        /// (string) or to several operators (array of strings), or an array of such
        /// objects whose groups are combined with OR.
        /// </param>
        /// <param name="data">
        /// Values for the conditions: an object, or an array aligned with an array
        /// <paramref name="where"/>. A single object is reused for every group.
        /// </param>
        /// <param name="param">Receives the generated parameter names and their values.</param>
        /// <returns>The WHERE clause, or an empty string when <paramref name="where"/> or <paramref name="data"/> is null.</returns>
        /// <remarks>
        /// A non-empty group whose fields match no column/data key contributes only
        /// <c>1 = 1</c>; an empty group contributes an empty pair of parentheses.
        /// </remarks>
        /// <exception cref="ArgumentException">A condition uses an operator outside the allowed list.</exception>
        private string MergeWhere(HDP_Table table, JContainer? where, JContainer? data, Dictionary<string, object> param) {
            StringBuilder _wherestr = new StringBuilder();
            if (where == null || data == null) {
                return _wherestr.ToString();
            }
            _wherestr.AppendLine("WHERE");

            var _wheres = new List<JObject>();
            var _datas = new List<JObject>();
            if (where.Type == JTokenType.Object) {
                _wheres.Add((JObject)where);
                _datas.Add((JObject)data);
            }
            else if (where.Type == JTokenType.Array) {
                // Materialise once instead of re-enumerating the children per iteration.
                var _whereItems = where.Children().ToArray();
                var _dataItems = data.Type == JTokenType.Array ? data.Children().ToArray() : null;
                for (var i = 0; i < _whereItems.Length; i++) {
                    _wheres.Add((JObject)_whereItems[i]);
                    // Update/Delete pass a single row object together with an array
                    // of condition groups: the same row feeds every group.
                    _datas.Add(_dataItems == null ? (JObject)data : (JObject)_dataItems[i]);
                }
            }

            var _columnKeys = new HashSet<string?>(table.Columns!.Select(x => x.Key));
            for (var j = 0; j < _wheres.Count; j++) {
                if (j > 0) {
                    _wherestr.AppendLine("OR");
                }
                _wherestr.AppendLine("(");
                var _where = _wheres[j];
                var _data = _datas[j];
                if (_where.Count > 0) {
                    _wherestr.AppendLine("1 = 1");
                    foreach (var _item in _where.Properties()) {
                        if (!_columnKeys.Contains(_item.Name) || !_data.TryGetValue(_item.Name, out var _value)) {
                            continue;
                        }
                        if (_item.Value.Type == JTokenType.String) {
                            AppendCondition(_wherestr, param, _item.Name, _item.Value.ToString(), ((JValue)_value!).Value!);
                        }
                        else if (_item.Value.Type == JTokenType.Array) {
                            string[] _itemv = _item.Value.ToObject<string[]>()!;
                            object[] _colv = _value!.ToObject<object[]>()!;
                            for (var k = 0; k < _itemv.Length; k++) {
                                // When there are fewer values than operators the last value is reused.
                                AppendCondition(_wherestr, param, _item.Name, _itemv[k], _colv.Length > k ? _colv[k] : _colv[_colv.Length - 1]);
                            }
                        }
                    }
                }
                _wherestr.AppendLine(")");
            }
            return _wherestr.ToString();
        }

        /// <summary>
        /// Appends the select list to <paramref name="command"/>.
        /// </summary>
        /// <param name="table">Table definition; only requested names that are columns of it are emitted.</param>
        /// <param name="cols">Requested column names; null or empty selects every column.</param>
        /// <param name="command">Command text being built.</param>
        /// <exception cref="ArgumentException">None of the requested names is a column of the table.</exception>
        private void MergeCols(HDP_Table table, string[]? cols, StringBuilder command) {
            if (cols == null || cols.Length == 0) {
                command.AppendLine(" * ");
                return;
            }
            var _selected = table.Columns!
                .Where(x => cols.Any(c => c == x.Key))
                .Select(x => QuoteName(x.Key))
                .ToArray();
            if (_selected.Length == 0) {
                throw new ArgumentException("列无效");
            }
            command.AppendLine(string.Join(",", _selected));
        }

        /// <summary>
        /// Builds the ORDER BY clause.
        /// </summary>
        /// <param name="table">Table definition; fields not naming one of its columns are ignored.</param>
        /// <param name="order">
        /// Object mapping column name to "DESC" (case-insensitive; any other string
        /// sorts ascending) or to an array of values giving an explicit ordering,
        /// which is rendered as a CASE expression.
        /// </param>
        /// <param name="param">Receives the parameters used by CASE expressions.</param>
        /// <returns>The ORDER BY clause, or an empty string when nothing is sortable.</returns>
        private string MergeOrder(HDP_Table table, JContainer? order, Dictionary<string, object> param) {
            if (order == null || order.Count == 0) {
                return string.Empty;
            }
            var _columnKeys = new HashSet<string?>(table.Columns!.Select(x => x.Key));
            var _terms = new List<string>();
            foreach (var _token in order.Children()) {
                var _field = (JProperty)_token;
                if (!_columnKeys.Contains(_field.Name)) {
                    continue;
                }
                string _name = QuoteName(_field.Name);
                if (_field.Value.Type == JTokenType.String) {
                    bool _descending = string.Equals(_field.Value.ToString().Trim(), "DESC", StringComparison.OrdinalIgnoreCase);
                    _terms.Add(_descending ? $@"{_name} DESC" : $@"{_name} ASC");
                }
                else if (_field.Value.Type == JTokenType.Array) {
                    var _orderTemp = _field.Value.ToObject<object[]>()!;
                    if (_orderTemp.Length == 0) {
                        // CASE needs at least one WHEN branch.
                        continue;
                    }
                    StringBuilder _case = new StringBuilder($@"CASE {_name}");
                    for (var i = 0; i < _orderTemp.Length; i++) {
                        string _guid = Guid.NewGuid().ToString("N");
                        _case.Append($@" WHEN @{_guid} THEN {i}");
                        param.Add(_guid, _orderTemp[i]);
                    }
                    // Values not listed sort after all listed ones.
                    _case.Append($@" ELSE {_orderTemp.Length} END");
                    _terms.Add(_case.ToString());
                }
            }
            return _terms.Count == 0 ? string.Empty : "ORDER BY " + string.Join(",", _terms);
        }

        /// <summary>
        /// Synchronises the table structure: alters the table when it already
        /// exists, otherwise creates it.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <returns>true once the statement has been executed.</returns>
        /// <exception cref="ArgumentNullException">The table key, the columns, or a VARCHAR/DECIMAL length is missing.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public bool InitStruct(HDP_Source source, HDP_Table table) {
            ValidateTable(table);
            using (MySqlConnection _connection = OpenConnection(source)) {
                var _columns = table.Columns!;

                // Column definitions are identical for CREATE and ALTER; build them once.
                var _definitions = new List<string>();
                foreach (var _column in _columns) {
                    StringBuilder _colstr = new StringBuilder(QuoteName(_column.Key));
                    switch (_column.DataType) {
                        case HDP_ColumnDataType.VARCHAR:
                            if (_column.Length == 0) {
                                throw new ArgumentNullException(@$"列{_column.Key}长度无效");
                            }
                            _colstr.Append($@" {_column.DataType}({_column.Length})");
                            break;
                        case HDP_ColumnDataType.DECIMAL:
                            if (_column.Length == 0 || _column.DecimalLength == 0) {
                                throw new ArgumentNullException(@$"列{_column.Key}长度无效");
                            }
                            _colstr.Append($@" {_column.DataType}({_column.Length},{_column.DecimalLength})");
                            break;
                        default:
                            _colstr.Append(@$" {_column.DataType}");
                            break;
                    }
                    if (!string.IsNullOrEmpty(_column.Description)) {
                        _colstr.Append($@" COMMENT '{MySqlHelper.EscapeString(_column.Description)}'");
                    }
                    _definitions.Add(_colstr.ToString());
                }

                var _primaryNames = _columns.Where(x => x.IsPrimary == true).Select(x => QuoteName(x.Key)).ToArray();
                string _separator = "," + Environment.NewLine;
                StringBuilder _command = new StringBuilder();

                if (DbExistTable(source, table.Key!)) {
                    var _dbColumns = DbGetColumns(source, table.Key!);
                    var _clauses = new List<string>();
                    // DROP PRIMARY KEY fails when the table has none, so only emit it when one exists.
                    if (DbHasPrimaryKey(source, table.Key!)) {
                        _clauses.Add("DROP PRIMARY KEY");
                    }
                    for (var i = 0; i < _columns.Length; i++) {
                        var _column = _columns[i];
                        bool _exists = _dbColumns.Any(x => string.Equals(x.Key, _column.Key, StringComparison.OrdinalIgnoreCase));
                        _clauses.Add((_exists ? "MODIFY " : "ADD ") + _definitions[i]);
                    }
                    if (_primaryNames.Length > 0) {
                        _clauses.Add($@"ADD PRIMARY KEY ({string.Join(",", _primaryNames)})");
                    }
                    if (!string.IsNullOrEmpty(table.Description)) {
                        // A separate, comma-delimited clause so it is the table comment
                        // rather than an option of the preceding index definition.
                        _clauses.Add($@"COMMENT '{MySqlHelper.EscapeString(table.Description)}'");
                    }
                    _command.AppendLine($@"ALTER TABLE {QuoteName(table.Key)}");
                    _command.AppendLine(string.Join(_separator, _clauses));
                }
                else {
                    var _items = new List<string>(_definitions);
                    if (_primaryNames.Length > 0) {
                        _items.Add($@"PRIMARY KEY ({string.Join(",", _primaryNames)})");
                    }
                    _command.AppendLine($@"CREATE TABLE {QuoteName(table.Key)} (");
                    _command.AppendLine(string.Join(_separator, _items));
                    _command.AppendLine(")");
                    if (!string.IsNullOrEmpty(table.Description)) {
                        _command.AppendLine($@"COMMENT '{MySqlHelper.EscapeString(table.Description)}'");
                    }
                }

                _connection.Execute(_command.ToString());
                return true;
            }
        }

        /// <summary>
        /// Drops a table.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="tableKey">Table name.</param>
        /// <returns>true when the table no longer exists afterwards.</returns>
        /// <exception cref="ArgumentNullException">The table key is empty or the table does not exist.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public bool RemoveStruct(HDP_Source source, string tableKey) {
            if (string.IsNullOrEmpty(tableKey)) {
                throw new ArgumentNullException("表键无效");
            }
            if (!DbExistTable(source, tableKey)) {
                throw new ArgumentNullException("表不存在");
            }
            using (MySqlConnection _connection = OpenConnection(source)) {
                _connection.Execute($@"DROP TABLE {QuoteName(tableKey)}");
            }
            return !DbExistTable(source, tableKey);
        }

        /// <summary>
        /// Inserts one row (JSON object) or several rows (JSON array) inside a transaction scope.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="data">Row or rows to insert.</param>
        /// <returns>
        /// true when the total affected-row count is at least the number of rows;
        /// otherwise false and the scope is left uncompleted.
        /// </returns>
        /// <exception cref="ArgumentNullException">Table key, columns or data are missing.</exception>
        /// <exception cref="ArgumentException">
        /// The connection failed, a primary key has neither a value nor an insert
        /// default, or a row supplies no column of the table.
        /// </exception>
        public bool Insert(HDP_Source source, HDP_Table table, JContainer? data) {
            ValidateTable(table);
            if (data == null || data.Count == 0) {
                throw new ArgumentNullException("数据无效");
            }
            List<JObject> _data = ToRows(data);
            var _columns = table.Columns!;
            var _primarys = _columns.Where(x => x.IsPrimary == true).ToArray();
            var _result = 0;
            using (TransactionScope _scope = new TransactionScope()) {
                using (MySqlConnection _connection = OpenConnection(source)) {
                    foreach (var _row in _data) {
                        // Every primary key needs either a supplied value or an insert default.
                        foreach (var _primary in _primarys) {
                            if (string.IsNullOrEmpty(_primary.InsertDefault) && !_row.ContainsKey(_primary.Key!)) {
                                throw new ArgumentException($@"主键{_primary.Key}值无效");
                            }
                        }

                        var _names = new List<string>();
                        var _placeholders = new List<string>();
                        Dictionary<string, object> _params = new Dictionary<string, object>();
                        foreach (var _column in _columns) {
                            if (_row.TryGetValue(_column.Key!, out var _value)) {
                                _params.Add(_column.Key!, ((JValue)_value!).Value!);
                            }
                            else if (!string.IsNullOrEmpty(_column.InsertDefault)) {
                                _params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, _row));
                            }
                            else {
                                continue;
                            }
                            _names.Add(QuoteName(_column.Key));
                            _placeholders.Add($@"@{_column.Key}");
                        }
                        if (_names.Count == 0) {
                            throw new ArgumentException("数据无效");
                        }

                        StringBuilder _command = new StringBuilder();
                        _command.AppendLine($@"INSERT INTO {QuoteName(table.Key)} (");
                        _command.AppendLine(string.Join(",", _names));
                        _command.AppendLine(") VALUES (");
                        _command.AppendLine(string.Join(",", _placeholders));
                        _command.AppendLine(")");
                        _result += _connection.Execute(_command.ToString(), _params);
                    }
                    // Kept from the original flow: the connection is closed before the scope is completed.
                    _connection.Close();
                    if (_result >= _data.Count) {
                        _scope.Complete();
                        return true;
                    }
                    return false;
                }
            }
        }

        /// <summary>
        /// Updates rows inside a transaction scope; one UPDATE is issued per data row.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="where">Condition description (see <c>MergeWhere</c>).</param>
        /// <param name="data">Row or rows carrying both the new values and the condition values.</param>
        /// <returns>
        /// true when the total affected-row count is at least the number of rows;
        /// otherwise false and the scope is left uncompleted.
        /// </returns>
        /// <exception cref="ArgumentNullException">Table key, columns, condition or data are missing.</exception>
        /// <exception cref="ArgumentException">The connection failed, or a row supplies no column of the table.</exception>
        public bool Update(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data) {
            ValidateTable(table);
            if (where == null) {
                throw new ArgumentNullException("条件无效");
            }
            if (data == null || data.Count == 0) {
                throw new ArgumentNullException("数据无效");
            }
            List<JObject> _data = ToRows(data);
            var _columns = table.Columns!;
            var _result = 0;
            using (TransactionScope _scope = new TransactionScope()) {
                using (MySqlConnection _connection = OpenConnection(source)) {
                    foreach (var _row in _data) {
                        var _assignments = new List<string>();
                        Dictionary<string, object> _params = new Dictionary<string, object>();
                        foreach (var _column in _columns) {
                            if (_row.TryGetValue(_column.Key!, out var _value)) {
                                _params.Add(_column.Key!, ((JValue)_value!).Value!);
                            }
                            else if (!string.IsNullOrEmpty(_column.UpdateDefault)) {
                                _params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, _row));
                            }
                            else {
                                continue;
                            }
                            _assignments.Add($@"{QuoteName(_column.Key)}=@{_column.Key}");
                        }
                        if (_assignments.Count == 0) {
                            throw new ArgumentException("数据无效");
                        }

                        StringBuilder _command = new StringBuilder();
                        _command.AppendLine($@"UPDATE {QuoteName(table.Key)} SET ");
                        _command.AppendLine(string.Join(",", _assignments));
                        _command.AppendLine(MergeWhere(table, where, _row, _params));
                        _result += _connection.Execute(_command.ToString(), _params);
                    }
                    // Kept from the original flow: the connection is closed before the scope is completed.
                    _connection.Close();
                    if (_result >= _data.Count) {
                        _scope.Complete();
                        return true;
                    }
                    return false;
                }
            }
        }

        /// <summary>
        /// Deletes rows inside a transaction scope; one DELETE is issued per data row.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="where">Condition description (see <c>MergeWhere</c>).</param>
        /// <param name="data">Row or rows carrying the condition values.</param>
        /// <returns>
        /// true when the total affected-row count is at least the number of rows;
        /// otherwise false and the scope is left uncompleted.
        /// </returns>
        /// <exception cref="ArgumentNullException">Table key, columns, condition or data are missing.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public bool Delete(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data) {
            ValidateTable(table);
            if (where == null) {
                throw new ArgumentNullException("条件无效");
            }
            if (data == null || data.Count == 0) {
                throw new ArgumentNullException("数据无效");
            }
            List<JObject> _data = ToRows(data);
            var _result = 0;
            using (TransactionScope _scope = new TransactionScope()) {
                using (MySqlConnection _connection = OpenConnection(source)) {
                    foreach (var _row in _data) {
                        Dictionary<string, object> _params = new Dictionary<string, object>();
                        StringBuilder _command = new StringBuilder();
                        _command.AppendLine($@"DELETE FROM {QuoteName(table.Key)}");
                        _command.AppendLine(MergeWhere(table, where, _row, _params));
                        _result += _connection.Execute(_command.ToString(), _params);
                    }
                    // Kept from the original flow: the connection is closed before the scope is completed.
                    _connection.Close();
                    if (_result >= _data.Count) {
                        _scope.Complete();
                        return true;
                    }
                    return false;
                }
            }
        }

        /// <summary>
        /// Queries a single record.
        /// </summary>
        /// <typeparam name="T">Result type each row is mapped to.</typeparam>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="where">Condition description (see <c>MergeWhere</c>).</param>
        /// <param name="data">Condition values.</param>
        /// <param name="col">Columns to select; null or empty selects all.</param>
        /// <returns>The first matching row, or the default value of <typeparamref name="T"/> when there is none.</returns>
        /// <exception cref="ArgumentNullException">Table key, columns, condition or data are missing.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public T? QuerySingle<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data, string[]? col) {
            ValidateTable(table);
            if (where == null) {
                throw new ArgumentNullException("条件无效");
            }
            if (data == null || data.Count == 0) {
                throw new ArgumentNullException("数据无效");
            }
            using (MySqlConnection _connection = OpenConnection(source)) {
                Dictionary<string, object> _params = new Dictionary<string, object>();
                StringBuilder _command = BuildSelect(table, col);
                _command.AppendLine(MergeWhere(table, where, data, _params));
                return _connection.Query<T>(_command.ToString(), _params).FirstOrDefault();
            }
        }

        /// <summary>
        /// Queries a list of records.
        /// </summary>
        /// <typeparam name="T">Result type each row is mapped to.</typeparam>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="where">Condition description; no WHERE clause is emitted when this or <paramref name="data"/> is null.</param>
        /// <param name="data">Condition values.</param>
        /// <param name="order">Ordering description (see <c>MergeOrder</c>).</param>
        /// <param name="col">Columns to select; null or empty selects all.</param>
        /// <returns>The matching rows.</returns>
        /// <exception cref="ArgumentNullException">Table key or columns are missing.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public T[] Query<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data, JContainer? order, string[]? col) {
            ValidateTable(table);
            using (MySqlConnection _connection = OpenConnection(source)) {
                Dictionary<string, object> _params = new Dictionary<string, object>();
                StringBuilder _command = BuildSelect(table, col);
                _command.AppendLine(MergeWhere(table, where, data, _params));
                _command.AppendLine(MergeOrder(table, order, _params));
                return _connection.Query<T>(_command.ToString(), _params).ToArray();
            }
        }

        /// <summary>
        /// Queries one page of records together with the total row count.
        /// </summary>
        /// <typeparam name="T">Result type each row is mapped to.</typeparam>
        /// <param name="source">Data source.</param>
        /// <param name="table">Table definition.</param>
        /// <param name="pageIndex">1-based page number; values below 1 read from the first row.</param>
        /// <param name="pageSize">Rows per page.</param>
        /// <param name="where">Condition description; no WHERE clause is emitted when this or <paramref name="data"/> is null.</param>
        /// <param name="data">Condition values.</param>
        /// <param name="order">Ordering description (see <c>MergeOrder</c>).</param>
        /// <param name="col">Columns to select; null or empty selects all.</param>
        /// <returns>The page with its index, size, total count and rows.</returns>
        /// <exception cref="ArgumentNullException">Table key or columns are missing.</exception>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, JContainer? where, JContainer? data, JContainer? order, string[]? col) {
            ValidateTable(table);
            using (MySqlConnection _connection = OpenConnection(source)) {
                Dictionary<string, object> _params = new Dictionary<string, object>();
                StringBuilder _command = BuildSelect(table, col);
                _command.AppendLine(MergeWhere(table, where, data, _params));
                _command.AppendLine(MergeOrder(table, order, _params));
                string _inner = _command.ToString();

                var _result = new HDP_Page<T>();
                _result.PageIndex = pageIndex;
                _result.PageSize = pageSize;

                StringBuilder _totalstr = new StringBuilder();
                _totalstr.AppendLine($@"SELECT COUNT(0) FROM (");
                _totalstr.AppendLine(_inner);
                _totalstr.AppendLine(") AS Temp");
                _result.Total = _connection.QuerySingle<int>(_totalstr.ToString(), _params);

                // Computed in long to avoid int overflow; clamped because LIMIT rejects a negative offset.
                long _offset = Math.Max(0L, ((long)pageIndex - 1) * pageSize);
                StringBuilder _pagestr = new StringBuilder();
                _pagestr.AppendLine($@"SELECT * FROM (");
                _pagestr.AppendLine(_inner);
                _pagestr.AppendLine(") AS Temp");
                _pagestr.AppendLine(@$"LIMIT {_offset},{pageSize}");
                _result.Data = _connection.Query<T>(_pagestr.ToString(), _params).ToArray();
                return _result;
            }
        }

        /// <summary>
        /// Checks whether a base table exists, looking it up in
        /// INFORMATION_SCHEMA.TABLES with the source key as the schema name.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="tableName">Table name.</param>
        /// <returns>true when the table exists.</returns>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public bool DbExistTable(HDP_Source source, string tableName) {
            using (MySqlConnection _connection = OpenConnection(source)) {
                const string _command = "SELECT COUNT(0) FROM INFORMATION_SCHEMA.TABLES WHERE `TABLE_TYPE`='BASE TABLE' AND `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME` =@TableKey";
                int _result = _connection.QuerySingle<int>(_command, new { SourceKey = source.Key, TableKey = tableName });
                return _result > 0;
            }
        }

        /// <summary>
        /// Checks whether a table currently has a PRIMARY KEY constraint, using the
        /// same schema/table lookup convention as <see cref="DbExistTable"/>.
        /// </summary>
        private bool DbHasPrimaryKey(HDP_Source source, string tableName) {
            using (MySqlConnection _connection = OpenConnection(source)) {
                const string _command = "SELECT COUNT(0) FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE `CONSTRAINT_TYPE`='PRIMARY KEY' AND `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME`=@TableKey";
                int _result = _connection.QuerySingle<int>(_command, new { SourceKey = source.Key, TableKey = tableName });
                return _result > 0;
            }
        }

        /// <summary>
        /// Reads the name, data type and comment of each column of a table from
        /// INFORMATION_SCHEMA.COLUMNS.
        /// </summary>
        /// <param name="source">Data source.</param>
        /// <param name="tableName">Table name.</param>
        /// <returns>The columns found; empty when there are none.</returns>
        /// <exception cref="ArgumentException">The data source connection failed.</exception>
        public HDP_Column[] DbGetColumns(HDP_Source source, string tableName) {
            using (MySqlConnection _connection = OpenConnection(source)) {
                const string _command = "SELECT `COLUMN_NAME` AS `Key`,`DATA_TYPE` AS `DataType`,`COLUMN_COMMENT` AS `Description` FROM INFORMATION_SCHEMA.COLUMNS WHERE `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME`=@TableKey";
                return _connection.Query<HDP_Column>(_command, new { SourceKey = source.Key, TableKey = tableName }).ToArray();
            }
        }
    }
}