Compare commits

...

10 Commits

@ -22,14 +22,38 @@ namespace ZKLT.Hadoop.API.Controllers
private IHadoopService _HadoopService; private IHadoopService _HadoopService;
[HttpGet("getid")] [HttpGet("getid")]
public ActionResult GetId([FromQuery] string? prefix) { public ActionResult GetId([FromQuery] string? prefix, [FromQuery] int? count) {
string _result = ""; if (count != null && count > 0)
if (!string.IsNullOrEmpty(prefix)) { {
_result += prefix; List<string> _result = new List<string>();
} for (int i = 0; i < count; i++) {
var _date = DateTime.Now; string _temp = "";
_result += $@"{_date.Year.ToString().PadLeft(4,'0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(1000).ToString().PadLeft(4,'0')}"; if (!string.IsNullOrEmpty(prefix))
return Ok(_result); {
_temp += prefix;
}
var _date = DateTime.Now;
_temp += $@"{_date.Year.ToString().PadLeft(4, '0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(9999).ToString().PadLeft(4, '0')}";
if (_result.Any(v => v == _temp))
{
i--;
}
else {
_result.Add(_temp);
}
}
return Ok(_result);
}
else {
string _result = "";
if (!string.IsNullOrEmpty(prefix))
{
_result += prefix;
}
var _date = DateTime.Now;
_result += $@"{_date.Year.ToString().PadLeft(4, '0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(9999).ToString().PadLeft(4, '0')}";
return Ok(_result);
}
} }
[HttpGet("getsource")] [HttpGet("getsource")]

@ -40,25 +40,25 @@ namespace ZKLT.Hadoop.API
app.UseCors("all"); app.UseCors("all");
#endregion #endregion
app.UseHadoop((c) =>
{
c.Host = "127.0.0.1";
c.Account = "root";
c.PassWord = "root";
c.Key = "hadoopdb";
c.Port = 3306;
});
//app.UseHadoop((c) => //app.UseHadoop((c) =>
//{ //{
// c.Host = "172.17.0.1"; // c.Host = "127.0.0.1";
// c.Account = "root"; // c.Account = "root";
// c.PassWord = "root"; // c.PassWord = "root";
// c.Key = "testdb"; // c.Key = "hadoopdb";
// c.Port = 4000; // c.Port = 3306;
//}); //});
app.UseHadoop((c) =>
{
c.Host = app.Configuration["ConnectionStrings:Host"];
c.Account = app.Configuration["ConnectionStrings:Account"];
c.PassWord = app.Configuration["ConnectionStrings:PassWord"];
c.Key = app.Configuration["ConnectionStrings:Key"];
c.Port = Convert.ToInt32(app.Configuration["ConnectionStrings:Port"]);
});
//app.UseHadoop((c) => //app.UseHadoop((c) =>
//{ //{
// c.Host = "118.195.165.218"; // c.Host = "118.195.165.218";

@ -8,7 +8,7 @@
"ASPNETCORE_ENVIRONMENT": "Development" "ASPNETCORE_ENVIRONMENT": "Development"
}, },
"dotnetRunMessages": true, "dotnetRunMessages": true,
"applicationUrl": "http://localhost:5171" "applicationUrl": "http://*:5171"
}, },
"IIS Express": { "IIS Express": {
"commandName": "IISExpress", "commandName": "IISExpress",
@ -33,7 +33,7 @@
"windowsAuthentication": false, "windowsAuthentication": false,
"anonymousAuthentication": true, "anonymousAuthentication": true,
"iisExpress": { "iisExpress": {
"applicationUrl": "http://localhost:59844", "applicationUrl": "http://localhost:43666",
"sslPort": 0 "sslPort": 0
} }
} }

@ -4,5 +4,12 @@
"Default": "Information", "Default": "Information",
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning"
} }
},
"ConnectionStrings": {
"Host": "1.94.127.210",
"Account": "root",
"PassWord": "Panjiandong1994",
"Key": "erptest",
"Port": "3306"
} }
} }

@ -5,5 +5,12 @@
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning"
} }
}, },
"AllowedHosts": "*" "AllowedHosts": "*",
"ConnectionStrings": {
"Host": "1.94.127.210",
"Account": "root",
"PassWord": "Panjiandong1994",
"Key": "erptest",
"Port": "3306"
}
} }

@ -0,0 +1 @@
docker build -f ./Dockerfile -t hadoop:1.0.3 ../../.

@ -2,9 +2,10 @@ version: '3'
services: services:
hadoop: hadoop:
restart: always restart: always
image: hadoop:latest image: hadoop:1.0.0
container_name: hadoop container_name: hadoop
# volumes: volumes:
- ./appsettings.json:/app/appsettings.json
# - /apps/mysql/mydir:/mydir # - /apps/mysql/mydir:/mydir
# - /apps/mysql/datadir:/var/lib/mysql # - /apps/mysql/datadir:/var/lib/mysql
# - /apps/mysql/conf/my.cnf:/etc/my.cnf # - /apps/mysql/conf/my.cnf:/etc/my.cnf
@ -16,3 +17,4 @@ services:
# 使用宿主机的3306端口映射到容器的3306端口 # 使用宿主机的3306端口映射到容器的3306端口
# 宿主机:容器 # 宿主机:容器
- 5000:8080 - 5000:8080
network_mode: bridge

@ -1,4 +1,5 @@
using System; using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -32,7 +33,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="table">数据表</param> /// <param name="table">数据表</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Insert(HDP_Source source, HDP_Table table, Dictionary<string, object> row); public bool Insert(HDP_Source source, HDP_Table table, JContainer? row);
/// <summary> /// <summary>
/// 更新 /// 更新
@ -42,7 +43,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Update(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row); public bool Update(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row);
/// <summary> /// <summary>
/// 删除 /// 删除
@ -52,7 +53,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Delete(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row); public bool Delete(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row);
/// <summary> /// <summary>
/// 查询单个 /// 查询单个
@ -62,7 +63,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>结果</returns> /// <returns>结果</returns>
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row, string[]? col); public T? QuerySingle<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row, string[]? col);
/// <summary> /// <summary>
/// 查询列表 /// 查询列表
@ -73,8 +74,8 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>结果集</returns> /// <returns>结果集</returns>
public T[] Query<T>(HDP_Source source, HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row, public T[] Query<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row,
Dictionary<string, object>? order, string[]? col); JContainer? order, string[]? col);
/// <summary> /// <summary>
/// 查询列表 /// 查询列表
@ -85,7 +86,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>结果集</returns> /// <returns>结果集</returns>
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, Dictionary<string, string>? where, Dictionary<string, object>? row, Dictionary<string, object>? order, string[]? col); public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, JContainer? where, JContainer? row, JContainer? order, string[]? col);
/// <summary> /// <summary>

@ -1,4 +1,5 @@
using System; using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
@ -23,11 +24,11 @@ namespace ZKLT.Hadoop.Model
private string[]? _Col; private string[]? _Col;
private Dictionary<string, string>? _Where; private JContainer? _Where;
private Dictionary<string, object>? _Data; private JContainer? _Data;
private Dictionary<string, object>? _Order; private JContainer? _Order;
/// <summary> /// <summary>
/// 源 /// 源
@ -42,12 +43,12 @@ namespace ZKLT.Hadoop.Model
/// <summary> /// <summary>
/// 条件 /// 条件
/// </summary> /// </summary>
public Dictionary<string, string>? Where { get => _Where; set => _Where = value; } public JContainer? Where { get => _Where; set => _Where = value; }
/// <summary> /// <summary>
/// 数据 /// 数据
/// </summary> /// </summary>
public Dictionary<string, object>? Data { get => _Data; set => _Data = value; } public JContainer? Data { get => _Data; set => _Data = value; }
/// <summary> /// <summary>
/// 分页下标 /// 分页下标
@ -62,7 +63,7 @@ namespace ZKLT.Hadoop.Model
/// <summary> /// <summary>
/// 排序 /// 排序
/// </summary> /// </summary>
public Dictionary<string, object>? Order { get => _Order; set => _Order = value; } public JContainer? Order { get => _Order; set => _Order = value; }
/// <summary> /// <summary>
/// 命令类型 /// 命令类型

@ -1,8 +1,10 @@
using System; using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Text.RegularExpressions;
namespace ZKLT.Hadoop.Model namespace ZKLT.Hadoop.Model
{ {
@ -27,7 +29,7 @@ namespace ZKLT.Hadoop.Model
/// <param name="action">函数</param> /// <param name="action">函数</param>
/// <param name="param">参数</param> /// <param name="param">参数</param>
/// <returns>命令</returns> /// <returns>命令</returns>
public static object Convert(string action, Dictionary<string, object> param) public static object Convert(string action, JContainer param)
{ {
if (action == DATENOW) if (action == DATENOW)
{ {
@ -41,5 +43,35 @@ namespace ZKLT.Hadoop.Model
return action; return action;
} }
} }
/// <summary>
/// 列转换
/// </summary>
/// <param name="column">列</param>
/// <param name="param">参数</param>
/// <param name="action">公式</param>
/// <returns></returns>
public static string ColConvert(HDP_Column column, Dictionary<string, object> param,object value) {
string _guid;
if (value is string) {
string _action = value.ToString()!;
Regex _regex1 = new Regex(@"(?<=(\+\=))[\d\.]+");
if(_regex1.IsMatch(_action))
{
_guid = Guid.NewGuid().ToString("N");
param.Add(_guid,_regex1.Match(_action).Value);
return @$"`{column.Key}` + @{_guid}";
}
Regex _regex2 = new Regex(@"(?<=(\-\=))[\d\.]+");
if (_regex2.IsMatch(_action))
{
_guid = Guid.NewGuid().ToString("N");
param.Add(_guid, _regex2.Match(_action).Value);
return @$"`{column.Key}` - @{_guid}";
}
}
param.Add(column.Key!, value);
return $@"@{column.Key}";
}
} }
} }

@ -1,4 +1,6 @@
using System; using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema; using System.ComponentModel.DataAnnotations.Schema;
using System.Linq; using System.Linq;
@ -138,5 +140,14 @@ namespace ZKLT.Hadoop.Model
return _result; return _result;
} }
/// <summary>
/// 类转JObject
/// </summary>
/// <param name="data">数据</param>
/// <returns></returns>
public static JObject Class2JObject(object data) {
return JsonConvert.DeserializeObject<JObject>(JsonConvert.SerializeObject(data)!)!;
}
} }
} }

@ -10,4 +10,8 @@
<None Remove="HDP_Table.cs~RF4db01d6c.TMP" /> <None Remove="HDP_Table.cs~RF4db01d6c.TMP" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
</Project> </Project>

@ -1,6 +1,7 @@
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational; using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
@ -140,10 +141,11 @@ namespace ZKLT.Hadoop
{ {
return _Source; return _Source;
} }
var _result = _TableService.QuerySingle<HDP_Source>(_Source, GetTable("HDP_Source")!, new Dictionary<string, string>
var _result = _TableService.QuerySingle<HDP_Source>(_Source, GetTable("HDP_Source")!, new JObject
{ {
{ "Id","=" } { "Id","=" }
}, new Dictionary<string, object> { }, new JObject {
{ "Id",sourceid} { "Id",sourceid}
}, null); }, null);
return _result; return _result;
@ -186,7 +188,7 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号已存在"); throw new ArgumentException("编号已存在");
} }
return _TableService.Insert(_Source, GetTable("HDP_Source")!, HDP_Table.Class2Dictionary(source)); return _TableService.Insert(_Source, GetTable("HDP_Source")!, HDP_Table.Class2JObject(source));
} }
/// <summary> /// <summary>
@ -226,9 +228,9 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号不存在"); throw new ArgumentException("编号不存在");
} }
return _TableService.Update(_Source, GetTable("HDP_Source")!, new Dictionary<string, string> { return _TableService.Update(_Source, GetTable("HDP_Source")!, new JObject {
{"Id","=" } {"Id","=" }
}, HDP_Table.Class2Dictionary(source)); }, HDP_Table.Class2JObject(source));
} }
/// <summary> /// <summary>
@ -248,9 +250,9 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号不存在"); throw new ArgumentException("编号不存在");
} }
return _TableService.Delete(_Source, GetTable("HDP_Source")!, new Dictionary<string, string> { return _TableService.Delete(_Source, GetTable("HDP_Source")!, new JObject{
{"Id","=" } {"Id","=" }
}, new Dictionary<string, object> { }, new JObject{
{"Id",sourceid } {"Id",sourceid }
}); });
} }
@ -282,18 +284,18 @@ namespace ZKLT.Hadoop
return _Tables.First(x => x.Id == tableid); return _Tables.First(x => x.Id == tableid);
} }
var _result = _TableService.QuerySingle<HDP_Table>(_Source, GetTable("HDP_Table")!, new Dictionary<string, string> var _result = _TableService.QuerySingle<HDP_Table>(_Source, GetTable("HDP_Table")!, new JObject
{ {
{ "Id","=" } { "Id","=" }
}, new Dictionary<string, object> { }, new JObject{
{ "Id",tableid} { "Id",tableid}
}, null); }, null);
if (_result != null) if (_result != null)
{ {
_result.Columns = _TableService.Query<HDP_Column>(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> { _result.Columns = _TableService.Query<HDP_Column>(_Source, GetTable("HDP_Column")!, new JObject {
{ "TableId","="} { "TableId","="}
}, new Dictionary<string, object> { }, new JObject{
{"TableId",_result.Id! } {"TableId",_result.Id! }
}, null, null); }, null, null);
} }
@ -320,7 +322,7 @@ namespace ZKLT.Hadoop
{ {
using (TransactionScope _scope = new TransactionScope()) using (TransactionScope _scope = new TransactionScope())
{ {
if (!_TableService.Insert(_Source, GetTable("HDP_Table")!, HDP_Table.Class2Dictionary(table))) if (!_TableService.Insert(_Source, GetTable("HDP_Table")!, HDP_Table.Class2JObject(table)))
{ {
return false; return false;
} }
@ -328,7 +330,7 @@ namespace ZKLT.Hadoop
{ {
var _column = table.Columns![i]; var _column = table.Columns![i];
_column.TableId = table.Id; _column.TableId = table.Id;
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2Dictionary(_column))) if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2JObject(_column)))
{ {
return false; return false;
} }
@ -362,9 +364,9 @@ namespace ZKLT.Hadoop
{ {
using (TransactionScope _scope = new TransactionScope()) using (TransactionScope _scope = new TransactionScope())
{ {
if (!_TableService.Update(_Source, GetTable("HDP_Table")!, new Dictionary<string, string> { if (!_TableService.Update(_Source, GetTable("HDP_Table")!, new JObject{
{ "Id","="} { "Id","="}
}, HDP_Table.Class2Dictionary(table))) }, HDP_Table.Class2JObject(table)))
{ {
return false; return false;
} }
@ -373,21 +375,21 @@ namespace ZKLT.Hadoop
{ {
var _column = table.Columns![i]; var _column = table.Columns![i];
_column.TableId = table.Id; _column.TableId = table.Id;
if (_TableService.QuerySingle<HDP_Column>(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> if (_TableService.QuerySingle<HDP_Column>(_Source, GetTable("HDP_Column")!, new JObject
{ {
{"Id","=" } {"Id","=" }
}, HDP_Table.Class2Dictionary(_column), null) == null) }, HDP_Table.Class2JObject(_column), null) == null)
{ {
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2Dictionary(_column))) if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2JObject(_column)))
{ {
return false; return false;
} }
} }
else else
{ {
if (!_TableService.Update(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> { if (!_TableService.Update(_Source, GetTable("HDP_Column")!, new JObject {
{"Id","=" } {"Id","=" }
}, HDP_Table.Class2Dictionary(_column))) }, HDP_Table.Class2JObject(_column)))
{ {
return false; return false;
} }
@ -423,17 +425,17 @@ namespace ZKLT.Hadoop
using (TransactionScope _scope = new TransactionScope()) using (TransactionScope _scope = new TransactionScope())
{ {
if (!_TableService.Delete(_Source, GetTable("HDP_Table")!, new Dictionary<string, string> { if (!_TableService.Delete(_Source, GetTable("HDP_Table")!, new JObject{
{"Id","=" } {"Id","=" }
}, new Dictionary<string, object> { }, new JObject {
{"Id",tableId } {"Id",tableId }
})) }))
{ {
return false; return false;
} }
if (!_TableService.Delete(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> { if (!_TableService.Delete(_Source, GetTable("HDP_Column")!, new JObject{
{ "TableId","="} { "TableId","="}
}, new Dictionary<string, object> { }, new JObject {
{"TableId",tableId } {"TableId",tableId }
})) }))
{ {

@ -12,6 +12,8 @@ using MySqlX.XDevAPI.Relational;
using Mysqlx.Crud; using Mysqlx.Crud;
using Newtonsoft.Json; using Newtonsoft.Json;
using Mysqlx.Resultset; using Mysqlx.Resultset;
using Newtonsoft.Json.Linq;
using System.Transactions;
namespace ZKLT.Hadoop namespace ZKLT.Hadoop
{ {
@ -22,60 +24,80 @@ namespace ZKLT.Hadoop
/// </summary> /// </summary>
/// <param name="table">表</param> /// <param name="table">表</param>
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="data">数据</param>
/// <param name="command">命令</param>
/// <param name="param">参数</param> /// <param name="param">参数</param>
private void MergeWhere(HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row, StringBuilder command, Dictionary<string, object> param) private string MergeWhere(HDP_Table table, JContainer? where, JContainer? data, Dictionary<string, object> param)
{ {
string _guid = "";
//执行条件
StringBuilder _wherestr = new StringBuilder(); StringBuilder _wherestr = new StringBuilder();
_wherestr.Append("WHERE 1 = 1"); string _guid = "";
if (where != null && where.Count > 0) if (where != null && data != null)
{ {
for (var i = 0; i < table.Columns!.Length; i++) _wherestr.AppendLine("WHERE");
var _wheres = new List<JObject>();
var _datas = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_wheres.Add((JObject)where);
_datas.Add((JObject)data);
}
else if (where.Type == JTokenType.Array)
{ {
var _column = table.Columns[i]; for(var i = 0;i < where.Count;i++)
if (where.ContainsKey(_column.Key!))
{ {
switch (where[_column.Key!]) _wheres.Add((JObject)where.Children().ToArray()[i]);
_datas.Add((JObject)data.Children().ToArray()[i]);
}
}
for (var j = 0; j < _wheres.Count; j++) {
if (j > 0) {
_wherestr.AppendLine("OR");
}
_wherestr.AppendLine("(");
_wherestr.AppendLine("1 = 1");
var _where = _wheres[j];
var _data = _datas[j];
if (_where.Count > 0)
{
var _fileds = _where.Children().ToArray();
for (var i = 0; i < _fileds.Length; i++)
{ {
case HDP_WhereType.LIKE: var _item = (JProperty)_fileds[i];
_guid = Guid.NewGuid().ToString("N"); if (table.Columns!.Any(x => x.Key == _item.Name) && _data.ContainsKey(_item.Name))
_wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} CONCAT('%',@{_guid},'%')"); {
param.Add(_guid, row![_column.Key!]); if (_item.Value.Type == JTokenType.String)
break;
case HDP_WhereType.BETWEEN:
if (row![_column.Key!] != null)
{ {
var _betweendata = JsonConvert.DeserializeObject<object[]>(JsonConvert.SerializeObject(row![_column.Key!])); _guid = Guid.NewGuid().ToString("N");
if (_betweendata != null) _wherestr.AppendLine(@$"AND `{_item.Name}` {_item.Value.ToString()} @{_guid}");
if (_data[_item.Name]!.Type == JTokenType.Object || _data[_item.Name]!.Type == JTokenType.Array)
{
param.Add(_guid, _data[_item.Name]!);
}
else if (_item.Value.ToString() == HDP_WhereType.LIKE)
{ {
if (_betweendata[0] != null && _betweendata[0].ToString() != "") param.Add(_guid,$@"%{((JValue)_data[_item.Name]!).Value!}%" );
{ }
_guid = Guid.NewGuid().ToString("N"); else {
_wherestr.Append($@" AND `{_column.Key!}` >= @{_guid}"); param.Add(_guid, ((JValue)_data[_item.Name]!).Value!);
param.Add(_guid, _betweendata[0]);
}
if (_betweendata[1] != null && _betweendata[1].ToString() != "")
{
_guid = Guid.NewGuid().ToString("N");
_wherestr.Append($@" AND `{_column.Key!}` <= @{_guid}");
param.Add(_guid, _betweendata[1]);
}
} }
} }
break; else if (_item.Value.Type == JTokenType.Array)
default: {
_guid = Guid.NewGuid().ToString("N"); string[] _itemv = _item.Value.ToObject<string[]>()!;
_wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} @{_guid}"); object[] _colv = _data[_item.Name]!.ToObject<object[]>()!;
param.Add(_guid, row![_column.Key!]); for (var k = 0; k < _itemv.Length; k++)
break; {
_guid = Guid.NewGuid().ToString("N");
_wherestr.AppendLine(@$"AND `{_item.Name}` {_itemv[k]} @{_guid}");
param.Add(_guid, _colv.Length > k ? _colv[k] : _colv[_colv.Length - 1]);
}
}
}
} }
} }
_wherestr.AppendLine(")");
} }
command.AppendLine(_wherestr.ToString());
} }
return _wherestr.ToString();
} }
/// <summary> /// <summary>
@ -111,21 +133,23 @@ namespace ZKLT.Hadoop
/// <param name="table">表</param> /// <param name="table">表</param>
/// <param name="order">排序</param> /// <param name="order">排序</param>
/// <returns></returns> /// <returns></returns>
private string MergeOrder(HDP_Table table, Dictionary<string, object>? order, Dictionary<string, object> param) private string MergeOrder(HDP_Table table, JContainer? order, Dictionary<string, object> param)
{ {
string _guid = ""; string _guid = "";
StringBuilder _orderstr = new StringBuilder(); StringBuilder _orderstr = new StringBuilder();
if (order != null && order.Count > 0) if (order != null && order.Count > 0)
{ {
_orderstr.Append("ORDER BY "); _orderstr.Append("ORDER BY ");
foreach (var key in order.Keys) var _fields = order.Children().ToArray();
foreach (var field in _fields)
{ {
if (table.Columns!.Any(x => x.Key == key)) var _field = (JProperty)field;
if (table.Columns!.Any(x => x.Key == _field.Name))
{ {
var _column = table.Columns!.First(x => x.Key == key); var _column = table.Columns!.First(x => x.Key == _field.Name);
if (order[key] is string) if (_field.Value.Type == JTokenType.String)
{ {
switch (order[key]) switch (_field.Value.ToString())
{ {
case "DESC": case "DESC":
_orderstr.Append($@"`{_column.Key!}` DESC,"); _orderstr.Append($@"`{_column.Key!}` DESC,");
@ -135,9 +159,9 @@ namespace ZKLT.Hadoop
break; break;
} }
} }
else if (order[key] is Newtonsoft.Json.Linq.JArray) else if (_field.Value.Type == JTokenType.Array)
{ {
var _orderTemp = JsonConvert.DeserializeObject<object[]>(JsonConvert.SerializeObject(order[key])); var _orderTemp = _field.Value.ToObject<object[]>();
_orderstr.Append(@$"CASE `{_column.Key!}`"); _orderstr.Append(@$"CASE `{_column.Key!}`");
for (var i = 0; i < _orderTemp!.Length; i++) for (var i = 0; i < _orderTemp!.Length; i++)
{ {
@ -373,9 +397,9 @@ namespace ZKLT.Hadoop
/// </summary> /// </summary>
/// <param name="source">数据源</param> /// <param name="source">数据源</param>
/// <param name="table">数据表</param> /// <param name="table">数据表</param>
/// <param name="row">数据</param> /// <param name="data">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Insert(HDP_Source source, HDP_Table table, Dictionary<string, object> row) public bool Insert(HDP_Source source, HDP_Table table, JContainer? data)
{ {
//数据校验 //数据校验
if (string.IsNullOrEmpty(table.Key)) if (string.IsNullOrEmpty(table.Key))
@ -386,80 +410,99 @@ namespace ZKLT.Hadoop
{ {
throw new ArgumentNullException("列无效"); throw new ArgumentNullException("列无效");
} }
if (row == null || row.Count == 0) if (data == null || data.Count == 0)
{ {
throw new ArgumentNullException("数据无效"); throw new ArgumentNullException("数据无效");
} }
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{ {
try _data.Add((JObject)data);
{ }
_connection.Open(); else if (data.Type == JTokenType.Array)
} {
catch _data = data.ToObject<List<JObject>>()!;
{ }
throw new ArgumentException("数据源连接失败"); var _result = 0;
} using (TransactionScope _scope = new TransactionScope())
StringBuilder _command = new StringBuilder(); {
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
//主键检查
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
for (var i = 0; i < _primarys.Length; i++)
{ {
var _primary = _primarys[i]; try
if (!string.IsNullOrEmpty(_primary.InsertDefault) || (row.ContainsKey(_primary.Key!) && row[_primary.Key!] != null))
{ {
continue; _connection.Open();
} }
else catch
{ {
throw new ArgumentException($@"主键{_primary.Key}值无效"); throw new ArgumentException("数据源连接失败");
} }
} for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//插入命令 StringBuilder _command = new StringBuilder();
_command.AppendLine($@"INSERT INTO `{table.Key}` (");
StringBuilder _colstr = new StringBuilder(); //主键检查
StringBuilder _parmstr = new StringBuilder(); var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
Dictionary<string, object> _params = new Dictionary<string, object>(); for (var i = 0; i < _primarys.Length; i++)
for (var i = 0; i < table.Columns.Length; i++) {
{ var _primary = _primarys[i];
var _column = table.Columns[i]; if (!string.IsNullOrEmpty(_primary.InsertDefault) || _row.ContainsKey(_primary.Key!))
if (row.ContainsKey(_column.Key!) && row[_column.Key!] != null) {
continue;
}
else
{
throw new ArgumentException($@"主键{_primary.Key}值无效");
}
}
//插入命令
_command.AppendLine($@"INSERT INTO `{table.Key}` (");
StringBuilder _colstr = new StringBuilder();
StringBuilder _parmstr = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, ((JValue)_row[_column.Key!]!).Value!);
}
else if (!string.IsNullOrEmpty(_column.InsertDefault))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
if (_parmstr[_parmstr.Length - 1] == ',')
{
_parmstr.Remove(_parmstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
_command.AppendLine(") VALUES (");
_command.AppendLine(_parmstr.ToString());
_command.AppendLine(")");
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{ {
_colstr.Append($@"`{_column.Key!}`,"); _scope.Complete();
_parmstr.Append($@"@{_column.Key},"); return true;
_params.Add(_column.Key!, row[_column.Key!]);
} }
else if (!string.IsNullOrEmpty(_column.InsertDefault)) else
{ {
_colstr.Append($@"`{_column.Key!}`,"); return false;
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, row));
} }
} }
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
if (_parmstr[_parmstr.Length - 1] == ',')
{
_parmstr.Remove(_parmstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
_command.AppendLine(") VALUES (");
_command.AppendLine(_parmstr.ToString());
_command.AppendLine(")");
var _result = _connection.Execute(_command.ToString(), _params);
_connection.Close();
if (_result > 0)
{
return true;
}
else
{
return false;
}
} }
} }
@ -471,7 +514,7 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Update(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row) public bool Update(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data)
{ {
//数据校验 //数据校验
if (string.IsNullOrEmpty(table.Key)) if (string.IsNullOrEmpty(table.Key))
@ -482,64 +525,89 @@ namespace ZKLT.Hadoop
{ {
throw new ArgumentNullException("列无效"); throw new ArgumentNullException("列无效");
} }
if (where == null || where.Count == 0) if (where == null)
{ {
throw new ArgumentNullException("条件无效"); throw new ArgumentNullException("条件无效");
} }
if (row == null || row.Count == 0) List<JObject> _where = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_where.Add((JObject)where);
}
else if (where.Type == JTokenType.Array)
{
_where.AddRange(((JArray)where).ToObject<JObject[]>()!);
}
if (data == null || data.Count == 0)
{ {
throw new ArgumentNullException("数据无效"); throw new ArgumentNullException("数据无效");
} }
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{ {
try _data.Add((JObject)data);
{ }
_connection.Open(); else if (data.Type == JTokenType.Array)
} {
catch _data = data.ToObject<List<JObject>>()!;
{ }
throw new ArgumentException("数据源连接失败"); var _result = 0;
} using (TransactionScope _scope = new TransactionScope())
{
//更新命令 using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"UPDATE `{table.Key}` SET ");
//更新列
StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns.Length; i++)
{ {
var _column = table.Columns[i]; try
if (row.ContainsKey(_column.Key!) && !where.ContainsKey(_column.Key!) && row[_column.Key!] != null)
{ {
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},"); _connection.Open();
_params.Add(_column.Key!, row[_column.Key!]);
} }
else if (!string.IsNullOrEmpty(_column.UpdateDefault)) catch
{ {
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},"); throw new ArgumentException("数据源连接失败");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, row));
} }
} for (var j = 0; j < _data.Count; j++)
if (_colstr[_colstr.Length - 1] == ',') {
{ var _row = _data[j];
_colstr.Remove(_colstr.Length - 1, 1); //更新命令
} StringBuilder _command = new StringBuilder();
_command.AppendLine(_colstr.ToString()); Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"UPDATE `{table.Key}` SET ");
//执行条件 //更新列
MergeWhere(table, where, row, _command, _params); StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!) && !_where.Any(x => x.ContainsKey(_column.Key!)))
{
_colstr.Append(@$"`{_column.Key!}`={HDP_CommandAction.ColConvert(_column,_params, ((JValue)_row[_column.Key!]!).Value!)},");
}
else if (!string.IsNullOrEmpty(_column.UpdateDefault))
{
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
var _result = _connection.Execute(_command.ToString(), _params); //执行条件
_connection.Close(); _command.AppendLine(MergeWhere(table, where, _row, _params));
if (_result > 0)
{ _result += _connection.Execute(_command.ToString(), _params);
return true; }
} _connection.Close();
else if (_result >= _data.Count)
{ {
return false; _scope.Complete();
return true;
}
else
{
return false;
}
} }
} }
} }
@ -552,7 +620,7 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>是否成功</returns> /// <returns>是否成功</returns>
public bool Delete(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row) public bool Delete(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data)
{ {
//数据校验 //数据校验
if (string.IsNullOrEmpty(table.Key)) if (string.IsNullOrEmpty(table.Key))
@ -563,41 +631,59 @@ namespace ZKLT.Hadoop
{ {
throw new ArgumentNullException("列无效"); throw new ArgumentNullException("列无效");
} }
if (where == null || where.Count == 0) if (where == null)
{ {
throw new ArgumentNullException("条件无效"); throw new ArgumentNullException("条件无效");
} }
if (row == null || row.Count == 0) if (data == null || data.Count == 0)
{ {
throw new ArgumentNullException("数据无效"); throw new ArgumentNullException("数据无效");
} }
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString())) List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{ {
try _data.Add((JObject)data);
{ }
_connection.Open(); else if (data.Type == JTokenType.Array)
} {
catch _data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{ {
throw new ArgumentException("数据源连接失败"); try
} {
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
//更新命令 for (var j = 0; j < _data.Count; j++)
StringBuilder _command = new StringBuilder(); {
Dictionary<string, object> _params = new Dictionary<string, object>(); var _row = _data[j];
_command.AppendLine(@$"DELETE FROM `{table.Key}`"); //删除命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"DELETE FROM `{table.Key}`");
MergeWhere(table, where, row, _command, _params); _command.AppendLine(MergeWhere(table, where, _row, _params));
var _result = _connection.Execute(_command.ToString(), _params); _result += _connection.Execute(_command.ToString(), _params);
_connection.Close(); }
if (_result > 0) _connection.Close();
{ if (_result >= _data.Count)
return true; {
} _scope.Complete();
else return true;
{ }
return false; else
{
return false;
}
} }
} }
} }
@ -608,9 +694,9 @@ namespace ZKLT.Hadoop
/// <param name="source">数据源</param> /// <param name="source">数据源</param>
/// <param name="table">数据表</param> /// <param name="table">数据表</param>
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="data">数据</param>
/// <returns>结果</returns> /// <returns>结果</returns>
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row, public T? QuerySingle<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data,
string[]? col) string[]? col)
{ {
//数据校验 //数据校验
@ -622,11 +708,11 @@ namespace ZKLT.Hadoop
{ {
throw new ArgumentNullException("列无效"); throw new ArgumentNullException("列无效");
} }
if (where == null || where.Count == 0) if (where == null)
{ {
throw new ArgumentNullException("条件无效"); throw new ArgumentNullException("条件无效");
} }
if (row == null || row.Count == 0) if (data == null || data.Count == 0)
{ {
throw new ArgumentNullException("数据无效"); throw new ArgumentNullException("数据无效");
} }
@ -656,7 +742,7 @@ namespace ZKLT.Hadoop
} }
//执行条件 //执行条件
MergeWhere(table, where, row, _command, _params); _command.AppendLine(MergeWhere(table, where, data, _params));
var _result = _connection.Query<T>(_command.ToString(), _params).ToArray(); var _result = _connection.Query<T>(_command.ToString(), _params).ToArray();
_connection.Close(); _connection.Close();
@ -678,10 +764,10 @@ namespace ZKLT.Hadoop
/// <param name="source">数据源</param> /// <param name="source">数据源</param>
/// <param name="table">数据表</param> /// <param name="table">数据表</param>
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="data">数据</param>
/// <returns>结果集</returns> /// <returns>结果集</returns>
public T[] Query<T>(HDP_Source source, HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row, public T[] Query<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data,
Dictionary<string, object>? order, string[]? col) JContainer? order, string[]? col)
{ {
//数据校验 //数据校验
if (string.IsNullOrEmpty(table.Key)) if (string.IsNullOrEmpty(table.Key))
@ -718,7 +804,7 @@ namespace ZKLT.Hadoop
} }
//执行条件 //执行条件
MergeWhere(table, where, row, _command, _params); _command.AppendLine(MergeWhere(table, where, data, _params));
//执行排序 //执行排序
_command.AppendLine(MergeOrder(table, order, _params)); _command.AppendLine(MergeOrder(table, order, _params));
@ -738,8 +824,8 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param> /// <param name="where">条件</param>
/// <param name="row">数据</param> /// <param name="row">数据</param>
/// <returns>结果集</returns> /// <returns>结果集</returns>
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, Dictionary<string, string>? where, public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, JContainer? where,
Dictionary<string, object>? row, Dictionary<string, object>? order, string[]? col) JContainer? data, JContainer? order, string[]? col)
{ {
//数据校验 //数据校验
if (string.IsNullOrEmpty(table.Key)) if (string.IsNullOrEmpty(table.Key))
@ -776,35 +862,10 @@ namespace ZKLT.Hadoop
} }
//执行条件 //执行条件
MergeWhere(table, where, row, _command, _params); _command.AppendLine(MergeWhere(table, where, data, _params));
//执行排序 //执行排序
StringBuilder _orderstr = new StringBuilder(); _command.AppendLine(MergeOrder(table, order, _params));
_orderstr.Append("ORDER BY ");
if (order != null && order.Count > 0)
{
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (order.ContainsKey(_column.Key!))
{
switch (order[_column.Key!])
{
case "DESC":
_orderstr.Append($@"`{_column.Key!}` DESC,");
break;
default:
_orderstr.Append($@"`{_column.Key!}` ASC,");
break;
}
}
}
if (_orderstr[_orderstr.Length - 1] == ',')
{
_orderstr.Remove(_orderstr.Length - 1, 1);
}
_command.AppendLine(_orderstr.ToString());
}
var _result = new HDP_Page<T>(); var _result = new HDP_Page<T>();
_result.PageIndex = pageIndex; _result.PageIndex = pageIndex;
_result.PageSize = pageSize; _result.PageSize = pageSize;

@ -1 +0,0 @@
docker build -f Hadoop\ZKLT.Hadoop.API\Dockerfile -t hadoop:latest .

@ -13,12 +13,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.Interface", "Ha
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker-compose", "docker-compose", "{1E26F2C2-6BCB-4E5D-ADC5-C6AD923155F9}"
ProjectSection(SolutionItems) = preProject
docker-compose\hadoop.yml = docker-compose\hadoop.yml
docker-compose\mysql.yml = docker-compose\mysql.yml
EndProjectSection
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU

@ -1,20 +0,0 @@
version: '3'
services:
hadoop_mysql:
restart: always
image: mysql:8
container_name: hadoop_mysql
# volumes:
# - /apps/mysql/mydir:/mydir
# - /apps/mysql/datadir:/var/lib/mysql
# - /apps/mysql/conf/my.cnf:/etc/my.cnf
# # 数据库还原目录 可将需要还原的sql文件放在这里
# - /apps/mysql/source:/docker-entrypoint-initdb.d
environment:
- "MYSQL_ROOT_PASSWORD=root"
- "MYSQL_DATABASE=hadoopdb"
- "TZ=Asia/Shanghai"
ports:
# 使用宿主机的3306端口映射到容器的3306端口
# 宿主机:容器
- 3306:3306
Loading…
Cancel
Save