using System; using System.Collections.Generic; using System.Linq; using System.Security.Policy; using System.Text; using System.Threading.Tasks; using MySql.Data.MySqlClient; using Dapper; using ZKLT.Hadoop.Interface; using ZKLT.Hadoop.Model; using MySqlX.XDevAPI.Relational; using Mysqlx.Crud; namespace ZKLT.Hadoop { public class TableService : ITableService { /// /// 合并条件 /// /// 表 /// 条件 /// 数据 /// 命令 /// 参数 private void MergeWhere(HDP_Table table, Dictionary? where, Dictionary? row, StringBuilder command, Dictionary param) { //执行条件 StringBuilder _wherestr = new StringBuilder(); _wherestr.Append("WHERE 1 = 1"); if (where != null && where.Count > 0) { for (var i = 0; i < table.Columns!.Length; i++) { var _column = table.Columns[i]; if (where.ContainsKey(_column.Key!)) { switch (where[_column.Key!]) { case HDP_WhereType.LIKE: _wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} CONCAT('%',@{_column.Key!},'%')"); break; default: _wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} @{_column.Key!}"); break; } param.Add(_column.Key!, row![_column.Key!]); } } command.AppendLine(_wherestr.ToString()); } } /// /// 同步结构 /// /// 数据源 /// 数据表 /// 是否成功 public bool InitStruct(HDP_Source source, HDP_Table table) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } StringBuilder _command = new StringBuilder(); if (DbExistTable(source, table.Key)) { var _dbColumns = DbGetColumns(source, table.Key); _command.AppendLine($@"ALTER TABLE `{table.Key}`"); _command.AppendLine("DROP PRIMARY KEY,"); //加载列 for (var i = 0; i < table.Columns.Length; i++) { StringBuilder _colstr = new StringBuilder(); var _column = table.Columns[i]; if (_dbColumns.Any(x => x.Key!.ToLower() == _column.Key!.ToLower())) { _colstr.Append($@"MODIFY `{_column.Key}`"); } else { _colstr.Append($@"ADD `{_column.Key}`"); } switch (_column.DataType) { case HDP_ColumnDataType.VARCHAR: if (_column.Length == 0) { throw new ArgumentNullException(@$"列{_column.Key}长度无效"); } _colstr.Append($@" {_column.DataType}({_column.Length})"); break; case HDP_ColumnDataType.DECIMAL: if (_column.Length == 0 || _column.DecimalLength == 0) { throw new ArgumentNullException(@$"列{_column.Key}长度无效"); } _colstr.Append($@" {_column.DataType}({_column.Length},{_column.DecimalLength})"); break; default: _colstr.Append(@$" {_column.DataType}"); break; } if (!string.IsNullOrEmpty(_column.Description)) { _colstr.Append($@" COMMENT '{_column.Description}'"); } _colstr.Append(","); _command.AppendLine(_colstr.ToString()); } //加载主键 var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray(); if (_primarys.Length > 0) { StringBuilder _primaryStr = new StringBuilder(); _primaryStr.Append("ADD PRIMARY KEY ("); for (var i = 0; i < _primarys.Length; i++) { var _primary = _primarys[i]; _primaryStr.Append($@"`{_primary.Key}`"); if (i < _primarys.Length - 1) { _primaryStr.Append(","); } } _primaryStr.Append(")"); _command.AppendLine(_primaryStr.ToString()); } if (!string.IsNullOrEmpty(table.Description)) { _command.AppendLine(@$"COMMENT '{table.Description}'"); } _connection.Execute(_command.ToString()); return true; } else { _command.AppendLine($@"CREATE TABLE `{table.Key}` ("); //加载列 for (int i = 0; i < table.Columns.Length; i++) { StringBuilder _colstr = new StringBuilder(); var _column = table.Columns[i]; _colstr.Append($@"`{_column.Key}`"); switch (_column.DataType) { case HDP_ColumnDataType.VARCHAR: if (_column.Length == 0) { throw new ArgumentNullException(@$"列{_column.Key}长度无效"); } _colstr.Append($@" {_column.DataType}({_column.Length})"); break; case HDP_ColumnDataType.DECIMAL: if (_column.Length == 0 || _column.DecimalLength == 0) { throw new ArgumentNullException(@$"列{_column.Key}长度无效"); } _colstr.Append($@" {_column.DataType}({_column.Length},{_column.DecimalLength})"); break; default: _colstr.Append(@$" {_column.DataType}"); break; } if (!string.IsNullOrEmpty(_column.Description)) { _colstr.Append($@" COMMENT '{_column.Description}'"); } if (i < table.Columns.Length - 1) { _colstr.Append(","); } _command.AppendLine(_colstr.ToString()); } //加载主键 var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray(); if (_primarys.Length > 0) { _command.Append(","); StringBuilder _primaryStr = new StringBuilder(); _primaryStr.Append("PRIMARY KEY ("); for (var i = 0; i < _primarys.Length; i++) { var _primary = _primarys[i]; _primaryStr.Append($@"`{_primary.Key}`"); if (i < _primarys.Length - 1) { _primaryStr.Append(","); } } _primaryStr.Append(")"); _command.AppendLine(_primaryStr.ToString()); } _command.AppendLine(")"); if (!string.IsNullOrEmpty(table.Description)) { _command.AppendLine(@$"COMMENT '{table.Description}'"); } _connection.Execute(_command.ToString()); } _connection.Close(); return true; } } /// /// 删除结构 /// /// 源 /// 表 /// 是否成功 public bool RemoveStruct(HDP_Source source, string tableKey) { //数据校验 if (string.IsNullOrEmpty(tableKey)) { throw new ArgumentNullException("表键无效"); } if (!DbExistTable(source, tableKey)) { throw new ArgumentNullException("表不存在"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } StringBuilder _command = new StringBuilder(); _command.AppendLine(@$"DROP TABLE {tableKey}"); _connection.Execute(_command.ToString()); _connection.Close(); return !DbExistTable(source, tableKey); } } /// /// 插入数据 /// /// 数据源 /// 数据表 /// 数据 /// 是否成功 public bool Insert(HDP_Source source, HDP_Table table, Dictionary row) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } if (row == null || row.Count == 0) { throw new ArgumentNullException("数据无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } StringBuilder _command = new StringBuilder(); //主键检查 var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray(); for (var i = 0; i < _primarys.Length; i++) { var _primary = _primarys[i]; if (row[_primary.Key!] == null || string.IsNullOrEmpty(row[_primary.Key!].ToString())) { throw new ArgumentException($@"主键{_primary.Key}值无效"); } } //插入命令 _command.AppendLine($@"INSERT INTO `{table.Key}` ("); StringBuilder _colstr = new StringBuilder(); StringBuilder _parmstr = new StringBuilder(); Dictionary _params = new Dictionary(); for (var i = 0; i < table.Columns.Length; i++) { var _column = table.Columns[i]; if (row.ContainsKey(_column.Key!)) { _colstr.Append($@"`{_column.Key!}`,"); _parmstr.Append($@"@{_column.Key},"); _params.Add(_column.Key!, row[_column.Key!]); } else if (!string.IsNullOrEmpty(_column.InsertDefault)) { if (HDP_CommandAction.IsAction(_column.InsertDefault)) { _colstr.Append($@"`{_column.Key!}`,"); _parmstr.Append($"{_column.InsertDefault},"); } else { _colstr.Append($@"`{_column.Key!}`,"); _parmstr.Append($@"@{_column.Key},"); _params.Add(_column.Key!, _column.InsertDefault); } } } if (_colstr[_colstr.Length - 1] == ',') { _colstr.Remove(_colstr.Length - 1, 1); } if (_parmstr[_parmstr.Length - 1] == ',') { _parmstr.Remove(_parmstr.Length - 1, 1); } _command.AppendLine(_colstr.ToString()); _command.AppendLine(") VALUES ("); _command.AppendLine(_parmstr.ToString()); _command.AppendLine(")"); var _result = _connection.Execute(HDP_CommandAction.ConvertCommand(_command.ToString(), row), _params); _connection.Close(); if (_result > 0) { return true; } else { return false; } } } /// /// 更新 /// /// 数据源 /// 数据表 /// 条件 /// 数据 /// 是否成功 public bool Update(HDP_Source source, HDP_Table table, Dictionary where, Dictionary row) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } if (where == null || where.Count == 0) { throw new ArgumentNullException("条件无效"); } if (row == null || row.Count == 0) { throw new ArgumentNullException("数据无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } //更新命令 StringBuilder _command = new StringBuilder(); Dictionary _params = new Dictionary(); _command.AppendLine(@$"UPDATE `{table.Key}` SET "); //更新列 StringBuilder _colstr = new StringBuilder(); for (var i = 0; i < table.Columns.Length; i++) { var _column = table.Columns[i]; if (row.ContainsKey(_column.Key!) && !where.ContainsKey(_column.Key!)) { _colstr.Append($@"`{_column.Key!}`=@{_column.Key!},"); _params.Add(_column.Key!, row[_column.Key!]); } else if (!string.IsNullOrEmpty(_column.UpdateDefault)) { if (HDP_CommandAction.IsAction(_column.UpdateDefault)) { _colstr.Append($@"`{_column.Key!}`={_column.UpdateDefault}"); } else { _colstr.Append($@"`{_column.Key!}`=@{_column.Key!},"); _params.Add(_column.Key!, _column.UpdateDefault); } } } if (_colstr[_colstr.Length - 1] == ',') { _colstr.Remove(_colstr.Length - 1, 1); } _command.AppendLine(_colstr.ToString()); //执行条件 MergeWhere(table, where, row, _command, _params); var _result = _connection.Execute(HDP_CommandAction.ConvertCommand(_command.ToString(), row), _params); _connection.Close(); if (_result > 0) { return true; } else { return false; } } } /// /// 删除 /// /// 数据源 /// 数据表 /// 条件 /// 数据 /// 是否成功 public bool Delete(HDP_Source source, HDP_Table table, Dictionary where, Dictionary row) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } if (where == null || where.Count == 0) { throw new ArgumentNullException("条件无效"); } if (row == null || row.Count == 0) { throw new ArgumentNullException("数据无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } //更新命令 StringBuilder _command = new StringBuilder(); Dictionary _params = new Dictionary(); _command.AppendLine(@$"DELETE FROM `{table.Key}`"); MergeWhere(table, where, row, _command, _params); var _result = _connection.Execute(HDP_CommandAction.ConvertCommand(_command.ToString(), row), _params); _connection.Close(); if (_result > 0) { return true; } else { return false; } } } /// /// 查询单个 /// /// 数据源 /// 数据表 /// 条件 /// 数据 /// 结果 public T? QuerySingle(HDP_Source source, HDP_Table table, Dictionary where, Dictionary row) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } if (where == null || where.Count == 0) { throw new ArgumentNullException("条件无效"); } if (row == null || row.Count == 0) { throw new ArgumentNullException("数据无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } //查询命令 StringBuilder _command = new StringBuilder(); Dictionary _params = new Dictionary(); _command.AppendLine(@$"SELECT * FROM `{table.Key}`"); //执行条件 MergeWhere(table, where, row, _command, _params); var _result = _connection.Query(HDP_CommandAction.ConvertCommand(_command.ToString(), row), _params).ToArray(); _connection.Close(); if (_result.Length > 0) { return _result[0]; } else { return default(T); } } } /// /// 查询列表 /// /// 返回类型 /// 数据源 /// 数据表 /// 条件 /// 数据 /// 结果集 public T[] Query(HDP_Source source, HDP_Table table, Dictionary? where = null, Dictionary? row = null, Dictionary? order = null) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } //查询命令 StringBuilder _command = new StringBuilder(); Dictionary _params = new Dictionary(); _command.AppendLine(@$"SELECT * FROM `{table.Key}`"); //执行条件 MergeWhere(table, where, row, _command, _params); //执行排序 StringBuilder _orderstr = new StringBuilder(); _orderstr.Append("ORDER BY "); if (order != null && order.Count > 0) { for (var i = 0; i < table.Columns.Length; i++) { var _column = table.Columns[i]; if (order.ContainsKey(_column.Key!)) { switch (order[_column.Key!]) { case "DESC": _orderstr.Append($@"`{_column.Key!}` DESC,"); break; default: _orderstr.Append($@"`{_column.Key!}` ASC,"); break; } } } if (_orderstr[_orderstr.Length - 1] == ',') { _orderstr.Remove(_orderstr.Length - 1, 1); } _command.AppendLine(_orderstr.ToString()); } var _result = _connection.Query(HDP_CommandAction.ConvertCommand(_command.ToString(), row), _params); _connection.Close(); return _result.ToArray(); } } /// /// 查询列表 /// /// 返回类型 /// 数据源 /// 数据表 /// 条件 /// 数据 /// 结果集 public HDP_Page QueryPage(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, Dictionary? where = null, Dictionary? row = null, Dictionary? order = null) { //数据校验 if (string.IsNullOrEmpty(table.Key)) { throw new ArgumentNullException("表键无效"); } if (table.Columns == null || table.Columns.Length == 0) { throw new ArgumentNullException("列无效"); } using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } //查询命令 StringBuilder _command = new StringBuilder(); Dictionary _params = new Dictionary(); _command.AppendLine(@$"SELECT * FROM `{table.Key}`"); //执行条件 MergeWhere(table, where, row, _command, _params); //执行排序 StringBuilder _orderstr = new StringBuilder(); _orderstr.Append("ORDER BY "); if (order != null && order.Count > 0) { for (var i = 0; i < table.Columns.Length; i++) { var _column = table.Columns[i]; if (order.ContainsKey(_column.Key!)) { switch (order[_column.Key!]) { case "DESC": _orderstr.Append($@"`{_column.Key!}` DESC,"); break; default: _orderstr.Append($@"`{_column.Key!}` ASC,"); break; } } } if (_orderstr[_orderstr.Length - 1] == ',') { _orderstr.Remove(_orderstr.Length - 1, 1); } _command.AppendLine(_orderstr.ToString()); } var _result = new HDP_Page(); _result.PageIndex = pageIndex; _result.PageSize = pageSize; StringBuilder _totalstr = new StringBuilder(); _totalstr.AppendLine($@"SELECT COUNT(0) FROM ("); _totalstr.AppendLine(_command.ToString()); _totalstr.AppendLine(") AS Temp"); _result.Total = _connection.QuerySingle(_totalstr.ToString(), _params); StringBuilder _pagestr = new StringBuilder(); _pagestr.AppendLine($@"SELECT * FROM ("); _pagestr.AppendLine(_command.ToString()); _pagestr.AppendLine(") AS Temp"); _pagestr.AppendLine(@$"LIMIT {(_result.PageIndex - 1) * _result.PageSize},{_result.PageSize}"); _result.Data = _connection.Query(HDP_CommandAction.ConvertCommand(_pagestr.ToString(), row), _params).ToArray(); _connection.Close(); return _result; } } /// /// 判断数据源是否存在表 /// /// 数据源 /// 表名 /// 是否存在 public bool DbExistTable(HDP_Source source, string tableName) { using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } StringBuilder _command = new StringBuilder(); _command.AppendLine($@"SELECT COUNT(0) FROM INFORMATION_SCHEMA.TABLES WHERE `TABLE_TYPE`='BASE TABLE' AND `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME` =@TableKey"); int _result = _connection.QuerySingle(_command.ToString(), new { SourceKey = source.Key, TableKey = tableName }); _connection.Close(); if (_result > 0) { return true; } else { return false; } } } /// /// 查询数据列 /// /// 数据源 /// 表名 /// public HDP_Column[] DbGetColumns(HDP_Source source, string tableName) { using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) { var _result = new List(); try { _connection.Open(); } catch { throw new ArgumentException("数据源连接失败"); } StringBuilder _command = new StringBuilder(); _command.AppendLine($@"SELECT `COLUMN_NAME` AS `Key`,`DATA_TYPE` AS `DataType`,`COLUMN_COMMENT` AS `Description` FROM INFORMATION_SCHEMA.COLUMNS WHERE `TABLE_SCHEMA`=@SourceKey AND `TABLE_NAME`=@TableKey"); _result.AddRange(_connection.Query(_command.ToString(), new { SourceKey = source.Key, TableKey = tableName })); _connection.Close(); return _result.ToArray(); } } } }