Compare commits

..

10 Commits

@ -22,14 +22,38 @@ namespace ZKLT.Hadoop.API.Controllers
private IHadoopService _HadoopService;
[HttpGet("getid")]
public ActionResult GetId([FromQuery] string? prefix) {
string _result = "";
if (!string.IsNullOrEmpty(prefix)) {
_result += prefix;
}
var _date = DateTime.Now;
_result += $@"{_date.Year.ToString().PadLeft(4,'0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(1000).ToString().PadLeft(4,'0')}";
return Ok(_result);
public ActionResult GetId([FromQuery] string? prefix, [FromQuery] int? count) {
if (count != null && count > 0)
{
List<string> _result = new List<string>();
for (int i = 0; i < count; i++) {
string _temp = "";
if (!string.IsNullOrEmpty(prefix))
{
_temp += prefix;
}
var _date = DateTime.Now;
_temp += $@"{_date.Year.ToString().PadLeft(4, '0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(9999).ToString().PadLeft(4, '0')}";
if (_result.Any(v => v == _temp))
{
i--;
}
else {
_result.Add(_temp);
}
}
return Ok(_result);
}
else {
string _result = "";
if (!string.IsNullOrEmpty(prefix))
{
_result += prefix;
}
var _date = DateTime.Now;
_result += $@"{_date.Year.ToString().PadLeft(4, '0')}{_date.Month.ToString().PadLeft(2, '0')}{_date.Day.ToString().PadLeft(2, '0')}{_date.Hour.ToString().PadLeft(2, '0')}{_date.Minute.ToString().PadLeft(2, '0')}{_date.Second.ToString().PadLeft(2, '0')}{_date.Millisecond.ToString().PadLeft(3, '0')}{new Random().Next(9999).ToString().PadLeft(4, '0')}";
return Ok(_result);
}
}
[HttpGet("getsource")]

@ -40,25 +40,25 @@ namespace ZKLT.Hadoop.API
app.UseCors("all");
#endregion
app.UseHadoop((c) =>
{
c.Host = "127.0.0.1";
c.Account = "root";
c.PassWord = "root";
c.Key = "hadoopdb";
c.Port = 3306;
});
//app.UseHadoop((c) =>
//{
// c.Host = "172.17.0.1";
// c.Host = "127.0.0.1";
// c.Account = "root";
// c.PassWord = "root";
// c.Key = "testdb";
// c.Port = 4000;
// c.Key = "hadoopdb";
// c.Port = 3306;
//});
app.UseHadoop((c) =>
{
c.Host = app.Configuration["ConnectionStrings:Host"];
c.Account = app.Configuration["ConnectionStrings:Account"];
c.PassWord = app.Configuration["ConnectionStrings:PassWord"];
c.Key = app.Configuration["ConnectionStrings:Key"];
c.Port = Convert.ToInt32(app.Configuration["ConnectionStrings:Port"]);
});
//app.UseHadoop((c) =>
//{
// c.Host = "118.195.165.218";

@ -8,7 +8,7 @@
"ASPNETCORE_ENVIRONMENT": "Development"
},
"dotnetRunMessages": true,
"applicationUrl": "http://localhost:5171"
"applicationUrl": "http://*:5171"
},
"IIS Express": {
"commandName": "IISExpress",
@ -33,7 +33,7 @@
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:59844",
"applicationUrl": "http://localhost:43666",
"sslPort": 0
}
}

@ -4,5 +4,12 @@
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"ConnectionStrings": {
"Host": "1.94.127.210",
"Account": "root",
"PassWord": "Panjiandong1994",
"Key": "erptest",
"Port": "3306"
}
}

@ -5,5 +5,12 @@
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"ConnectionStrings": {
"Host": "1.94.127.210",
"Account": "root",
"PassWord": "Panjiandong1994",
"Key": "erptest",
"Port": "3306"
}
}

@ -0,0 +1 @@
docker build -f ./Dockerfile -t hadoop:1.0.3 ../../.

@ -2,9 +2,10 @@ version: '3'
services:
hadoop:
restart: always
image: hadoop:latest
image: hadoop:1.0.0
container_name: hadoop
# volumes:
volumes:
- ./appsettings.json:/app/appsettings.json
# - /apps/mysql/mydir:/mydir
# - /apps/mysql/datadir:/var/lib/mysql
# - /apps/mysql/conf/my.cnf:/etc/my.cnf
@ -16,3 +17,4 @@ services:
# 使用宿主机的3306端口映射到容器的3306端口
# 宿主机:容器
- 5000:8080
network_mode: bridge

@ -1,4 +1,5 @@
using System;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -32,7 +33,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="table">数据表</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Insert(HDP_Source source, HDP_Table table, Dictionary<string, object> row);
public bool Insert(HDP_Source source, HDP_Table table, JContainer? row);
/// <summary>
/// 更新
@ -42,7 +43,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Update(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row);
public bool Update(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row);
/// <summary>
/// 删除
@ -52,7 +53,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Delete(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row);
public bool Delete(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row);
/// <summary>
/// 查询单个
@ -62,7 +63,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>结果</returns>
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row, string[]? col);
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row, string[]? col);
/// <summary>
/// 查询列表
@ -73,8 +74,8 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>结果集</returns>
public T[] Query<T>(HDP_Source source, HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row,
Dictionary<string, object>? order, string[]? col);
public T[] Query<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? row,
JContainer? order, string[]? col);
/// <summary>
/// 查询列表
@ -85,7 +86,7 @@ namespace ZKLT.Hadoop.Interface
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>结果集</returns>
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, Dictionary<string, string>? where, Dictionary<string, object>? row, Dictionary<string, object>? order, string[]? col);
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, JContainer? where, JContainer? row, JContainer? order, string[]? col);
/// <summary>

@ -1,4 +1,5 @@
using System;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
@ -23,11 +24,11 @@ namespace ZKLT.Hadoop.Model
private string[]? _Col;
private Dictionary<string, string>? _Where;
private JContainer? _Where;
private Dictionary<string, object>? _Data;
private JContainer? _Data;
private Dictionary<string, object>? _Order;
private JContainer? _Order;
/// <summary>
/// 源
@ -42,12 +43,12 @@ namespace ZKLT.Hadoop.Model
/// <summary>
/// 条件
/// </summary>
public Dictionary<string, string>? Where { get => _Where; set => _Where = value; }
public JContainer? Where { get => _Where; set => _Where = value; }
/// <summary>
/// 数据
/// </summary>
public Dictionary<string, object>? Data { get => _Data; set => _Data = value; }
public JContainer? Data { get => _Data; set => _Data = value; }
/// <summary>
/// 分页下标
@ -62,7 +63,7 @@ namespace ZKLT.Hadoop.Model
/// <summary>
/// 排序
/// </summary>
public Dictionary<string, object>? Order { get => _Order; set => _Order = value; }
public JContainer? Order { get => _Order; set => _Order = value; }
/// <summary>
/// 命令类型

@ -1,8 +1,10 @@
using System;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Text.RegularExpressions;
namespace ZKLT.Hadoop.Model
{
@ -27,7 +29,7 @@ namespace ZKLT.Hadoop.Model
/// <param name="action">函数</param>
/// <param name="param">参数</param>
/// <returns>命令</returns>
public static object Convert(string action, Dictionary<string, object> param)
public static object Convert(string action, JContainer param)
{
if (action == DATENOW)
{
@ -41,5 +43,35 @@ namespace ZKLT.Hadoop.Model
return action;
}
}
/// <summary>
/// 列转换
/// </summary>
/// <param name="column">列</param>
/// <param name="param">参数</param>
/// <param name="action">公式</param>
/// <returns></returns>
public static string ColConvert(HDP_Column column, Dictionary<string, object> param,object value) {
string _guid;
if (value is string) {
string _action = value.ToString()!;
Regex _regex1 = new Regex(@"(?<=(\+\=))[\d\.]+");
if(_regex1.IsMatch(_action))
{
_guid = Guid.NewGuid().ToString("N");
param.Add(_guid,_regex1.Match(_action).Value);
return @$"`{column.Key}` + @{_guid}";
}
Regex _regex2 = new Regex(@"(?<=(\-\=))[\d\.]+");
if (_regex2.IsMatch(_action))
{
_guid = Guid.NewGuid().ToString("N");
param.Add(_guid, _regex2.Match(_action).Value);
return @$"`{column.Key}` - @{_guid}";
}
}
param.Add(column.Key!, value);
return $@"@{column.Key}";
}
}
}

@ -1,4 +1,6 @@
using System;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
@ -138,5 +140,14 @@ namespace ZKLT.Hadoop.Model
return _result;
}
/// <summary>
/// 类转JObject
/// </summary>
/// <param name="data">数据</param>
/// <returns></returns>
public static JObject Class2JObject(object data) {
return JsonConvert.DeserializeObject<JObject>(JsonConvert.SerializeObject(data)!)!;
}
}
}

@ -10,4 +10,8 @@
<None Remove="HDP_Table.cs~RF4db01d6c.TMP" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
</Project>

@ -1,6 +1,7 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using MySqlX.XDevAPI.Relational;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
@ -140,10 +141,11 @@ namespace ZKLT.Hadoop
{
return _Source;
}
var _result = _TableService.QuerySingle<HDP_Source>(_Source, GetTable("HDP_Source")!, new Dictionary<string, string>
var _result = _TableService.QuerySingle<HDP_Source>(_Source, GetTable("HDP_Source")!, new JObject
{
{ "Id","=" }
}, new Dictionary<string, object> {
}, new JObject {
{ "Id",sourceid}
}, null);
return _result;
@ -186,7 +188,7 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号已存在");
}
return _TableService.Insert(_Source, GetTable("HDP_Source")!, HDP_Table.Class2Dictionary(source));
return _TableService.Insert(_Source, GetTable("HDP_Source")!, HDP_Table.Class2JObject(source));
}
/// <summary>
@ -226,9 +228,9 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号不存在");
}
return _TableService.Update(_Source, GetTable("HDP_Source")!, new Dictionary<string, string> {
return _TableService.Update(_Source, GetTable("HDP_Source")!, new JObject {
{"Id","=" }
}, HDP_Table.Class2Dictionary(source));
}, HDP_Table.Class2JObject(source));
}
/// <summary>
@ -248,9 +250,9 @@ namespace ZKLT.Hadoop
throw new ArgumentException("编号不存在");
}
return _TableService.Delete(_Source, GetTable("HDP_Source")!, new Dictionary<string, string> {
return _TableService.Delete(_Source, GetTable("HDP_Source")!, new JObject{
{"Id","=" }
}, new Dictionary<string, object> {
}, new JObject{
{"Id",sourceid }
});
}
@ -282,18 +284,18 @@ namespace ZKLT.Hadoop
return _Tables.First(x => x.Id == tableid);
}
var _result = _TableService.QuerySingle<HDP_Table>(_Source, GetTable("HDP_Table")!, new Dictionary<string, string>
var _result = _TableService.QuerySingle<HDP_Table>(_Source, GetTable("HDP_Table")!, new JObject
{
{ "Id","=" }
}, new Dictionary<string, object> {
}, new JObject{
{ "Id",tableid}
}, null);
if (_result != null)
{
_result.Columns = _TableService.Query<HDP_Column>(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> {
_result.Columns = _TableService.Query<HDP_Column>(_Source, GetTable("HDP_Column")!, new JObject {
{ "TableId","="}
}, new Dictionary<string, object> {
}, new JObject{
{"TableId",_result.Id! }
}, null, null);
}
@ -320,7 +322,7 @@ namespace ZKLT.Hadoop
{
using (TransactionScope _scope = new TransactionScope())
{
if (!_TableService.Insert(_Source, GetTable("HDP_Table")!, HDP_Table.Class2Dictionary(table)))
if (!_TableService.Insert(_Source, GetTable("HDP_Table")!, HDP_Table.Class2JObject(table)))
{
return false;
}
@ -328,7 +330,7 @@ namespace ZKLT.Hadoop
{
var _column = table.Columns![i];
_column.TableId = table.Id;
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2Dictionary(_column)))
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2JObject(_column)))
{
return false;
}
@ -362,9 +364,9 @@ namespace ZKLT.Hadoop
{
using (TransactionScope _scope = new TransactionScope())
{
if (!_TableService.Update(_Source, GetTable("HDP_Table")!, new Dictionary<string, string> {
if (!_TableService.Update(_Source, GetTable("HDP_Table")!, new JObject{
{ "Id","="}
}, HDP_Table.Class2Dictionary(table)))
}, HDP_Table.Class2JObject(table)))
{
return false;
}
@ -373,21 +375,21 @@ namespace ZKLT.Hadoop
{
var _column = table.Columns![i];
_column.TableId = table.Id;
if (_TableService.QuerySingle<HDP_Column>(_Source, GetTable("HDP_Column")!, new Dictionary<string, string>
if (_TableService.QuerySingle<HDP_Column>(_Source, GetTable("HDP_Column")!, new JObject
{
{"Id","=" }
}, HDP_Table.Class2Dictionary(_column), null) == null)
}, HDP_Table.Class2JObject(_column), null) == null)
{
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2Dictionary(_column)))
if (!_TableService.Insert(_Source, GetTable("HDP_Column")!, HDP_Table.Class2JObject(_column)))
{
return false;
}
}
else
{
if (!_TableService.Update(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> {
if (!_TableService.Update(_Source, GetTable("HDP_Column")!, new JObject {
{"Id","=" }
}, HDP_Table.Class2Dictionary(_column)))
}, HDP_Table.Class2JObject(_column)))
{
return false;
}
@ -423,17 +425,17 @@ namespace ZKLT.Hadoop
using (TransactionScope _scope = new TransactionScope())
{
if (!_TableService.Delete(_Source, GetTable("HDP_Table")!, new Dictionary<string, string> {
if (!_TableService.Delete(_Source, GetTable("HDP_Table")!, new JObject{
{"Id","=" }
}, new Dictionary<string, object> {
}, new JObject {
{"Id",tableId }
}))
{
return false;
}
if (!_TableService.Delete(_Source, GetTable("HDP_Column")!, new Dictionary<string, string> {
if (!_TableService.Delete(_Source, GetTable("HDP_Column")!, new JObject{
{ "TableId","="}
}, new Dictionary<string, object> {
}, new JObject {
{"TableId",tableId }
}))
{

@ -12,6 +12,8 @@ using MySqlX.XDevAPI.Relational;
using Mysqlx.Crud;
using Newtonsoft.Json;
using Mysqlx.Resultset;
using Newtonsoft.Json.Linq;
using System.Transactions;
namespace ZKLT.Hadoop
{
@ -22,60 +24,80 @@ namespace ZKLT.Hadoop
/// </summary>
/// <param name="table">表</param>
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <param name="command">命令</param>
/// <param name="data">数据</param>
/// <param name="param">参数</param>
private void MergeWhere(HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row, StringBuilder command, Dictionary<string, object> param)
private string MergeWhere(HDP_Table table, JContainer? where, JContainer? data, Dictionary<string, object> param)
{
string _guid = "";
//执行条件
StringBuilder _wherestr = new StringBuilder();
_wherestr.Append("WHERE 1 = 1");
if (where != null && where.Count > 0)
string _guid = "";
if (where != null && data != null)
{
for (var i = 0; i < table.Columns!.Length; i++)
_wherestr.AppendLine("WHERE");
var _wheres = new List<JObject>();
var _datas = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_wheres.Add((JObject)where);
_datas.Add((JObject)data);
}
else if (where.Type == JTokenType.Array)
{
var _column = table.Columns[i];
if (where.ContainsKey(_column.Key!))
for(var i = 0;i < where.Count;i++)
{
switch (where[_column.Key!])
_wheres.Add((JObject)where.Children().ToArray()[i]);
_datas.Add((JObject)data.Children().ToArray()[i]);
}
}
for (var j = 0; j < _wheres.Count; j++) {
if (j > 0) {
_wherestr.AppendLine("OR");
}
_wherestr.AppendLine("(");
_wherestr.AppendLine("1 = 1");
var _where = _wheres[j];
var _data = _datas[j];
if (_where.Count > 0)
{
var _fileds = _where.Children().ToArray();
for (var i = 0; i < _fileds.Length; i++)
{
case HDP_WhereType.LIKE:
_guid = Guid.NewGuid().ToString("N");
_wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} CONCAT('%',@{_guid},'%')");
param.Add(_guid, row![_column.Key!]);
break;
case HDP_WhereType.BETWEEN:
if (row![_column.Key!] != null)
var _item = (JProperty)_fileds[i];
if (table.Columns!.Any(x => x.Key == _item.Name) && _data.ContainsKey(_item.Name))
{
if (_item.Value.Type == JTokenType.String)
{
var _betweendata = JsonConvert.DeserializeObject<object[]>(JsonConvert.SerializeObject(row![_column.Key!]));
if (_betweendata != null)
_guid = Guid.NewGuid().ToString("N");
_wherestr.AppendLine(@$"AND `{_item.Name}` {_item.Value.ToString()} @{_guid}");
if (_data[_item.Name]!.Type == JTokenType.Object || _data[_item.Name]!.Type == JTokenType.Array)
{
param.Add(_guid, _data[_item.Name]!);
}
else if (_item.Value.ToString() == HDP_WhereType.LIKE)
{
if (_betweendata[0] != null && _betweendata[0].ToString() != "")
{
_guid = Guid.NewGuid().ToString("N");
_wherestr.Append($@" AND `{_column.Key!}` >= @{_guid}");
param.Add(_guid, _betweendata[0]);
}
if (_betweendata[1] != null && _betweendata[1].ToString() != "")
{
_guid = Guid.NewGuid().ToString("N");
_wherestr.Append($@" AND `{_column.Key!}` <= @{_guid}");
param.Add(_guid, _betweendata[1]);
}
param.Add(_guid,$@"%{((JValue)_data[_item.Name]!).Value!}%" );
}
else {
param.Add(_guid, ((JValue)_data[_item.Name]!).Value!);
}
}
break;
default:
_guid = Guid.NewGuid().ToString("N");
_wherestr.Append($@" AND `{_column.Key!}` {where[_column.Key!]} @{_guid}");
param.Add(_guid, row![_column.Key!]);
break;
else if (_item.Value.Type == JTokenType.Array)
{
string[] _itemv = _item.Value.ToObject<string[]>()!;
object[] _colv = _data[_item.Name]!.ToObject<object[]>()!;
for (var k = 0; k < _itemv.Length; k++)
{
_guid = Guid.NewGuid().ToString("N");
_wherestr.AppendLine(@$"AND `{_item.Name}` {_itemv[k]} @{_guid}");
param.Add(_guid, _colv.Length > k ? _colv[k] : _colv[_colv.Length - 1]);
}
}
}
}
}
_wherestr.AppendLine(")");
}
command.AppendLine(_wherestr.ToString());
}
return _wherestr.ToString();
}
/// <summary>
@ -111,21 +133,23 @@ namespace ZKLT.Hadoop
/// <param name="table">表</param>
/// <param name="order">排序</param>
/// <returns></returns>
private string MergeOrder(HDP_Table table, Dictionary<string, object>? order, Dictionary<string, object> param)
private string MergeOrder(HDP_Table table, JContainer? order, Dictionary<string, object> param)
{
string _guid = "";
StringBuilder _orderstr = new StringBuilder();
if (order != null && order.Count > 0)
{
_orderstr.Append("ORDER BY ");
foreach (var key in order.Keys)
var _fields = order.Children().ToArray();
foreach (var field in _fields)
{
if (table.Columns!.Any(x => x.Key == key))
var _field = (JProperty)field;
if (table.Columns!.Any(x => x.Key == _field.Name))
{
var _column = table.Columns!.First(x => x.Key == key);
if (order[key] is string)
var _column = table.Columns!.First(x => x.Key == _field.Name);
if (_field.Value.Type == JTokenType.String)
{
switch (order[key])
switch (_field.Value.ToString())
{
case "DESC":
_orderstr.Append($@"`{_column.Key!}` DESC,");
@ -135,9 +159,9 @@ namespace ZKLT.Hadoop
break;
}
}
else if (order[key] is Newtonsoft.Json.Linq.JArray)
else if (_field.Value.Type == JTokenType.Array)
{
var _orderTemp = JsonConvert.DeserializeObject<object[]>(JsonConvert.SerializeObject(order[key]));
var _orderTemp = _field.Value.ToObject<object[]>();
_orderstr.Append(@$"CASE `{_column.Key!}`");
for (var i = 0; i < _orderTemp!.Length; i++)
{
@ -373,9 +397,9 @@ namespace ZKLT.Hadoop
/// </summary>
/// <param name="source">数据源</param>
/// <param name="table">数据表</param>
/// <param name="row">数据</param>
/// <param name="data">数据</param>
/// <returns>是否成功</returns>
public bool Insert(HDP_Source source, HDP_Table table, Dictionary<string, object> row)
public bool Insert(HDP_Source source, HDP_Table table, JContainer? data)
{
//数据校验
if (string.IsNullOrEmpty(table.Key))
@ -386,80 +410,99 @@ namespace ZKLT.Hadoop
{
throw new ArgumentNullException("列无效");
}
if (row == null || row.Count == 0)
if (data == null || data.Count == 0)
{
throw new ArgumentNullException("数据无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
StringBuilder _command = new StringBuilder();
//主键检查
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
for (var i = 0; i < _primarys.Length; i++)
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
var _primary = _primarys[i];
if (!string.IsNullOrEmpty(_primary.InsertDefault) || (row.ContainsKey(_primary.Key!) && row[_primary.Key!] != null))
try
{
continue;
_connection.Open();
}
else
catch
{
throw new ArgumentException($@"主键{_primary.Key}值无效");
throw new ArgumentException("数据源连接失败");
}
}
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//插入命令
_command.AppendLine($@"INSERT INTO `{table.Key}` (");
StringBuilder _colstr = new StringBuilder();
StringBuilder _parmstr = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (row.ContainsKey(_column.Key!) && row[_column.Key!] != null)
StringBuilder _command = new StringBuilder();
//主键检查
var _primarys = table.Columns.Where(x => x.IsPrimary == true).ToArray();
for (var i = 0; i < _primarys.Length; i++)
{
var _primary = _primarys[i];
if (!string.IsNullOrEmpty(_primary.InsertDefault) || _row.ContainsKey(_primary.Key!))
{
continue;
}
else
{
throw new ArgumentException($@"主键{_primary.Key}值无效");
}
}
//插入命令
_command.AppendLine($@"INSERT INTO `{table.Key}` (");
StringBuilder _colstr = new StringBuilder();
StringBuilder _parmstr = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, ((JValue)_row[_column.Key!]!).Value!);
}
else if (!string.IsNullOrEmpty(_column.InsertDefault))
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
if (_parmstr[_parmstr.Length - 1] == ',')
{
_parmstr.Remove(_parmstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
_command.AppendLine(") VALUES (");
_command.AppendLine(_parmstr.ToString());
_command.AppendLine(")");
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, row[_column.Key!]);
_scope.Complete();
return true;
}
else if (!string.IsNullOrEmpty(_column.InsertDefault))
else
{
_colstr.Append($@"`{_column.Key!}`,");
_parmstr.Append($@"@{_column.Key},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.InsertDefault, row));
return false;
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
if (_parmstr[_parmstr.Length - 1] == ',')
{
_parmstr.Remove(_parmstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
_command.AppendLine(") VALUES (");
_command.AppendLine(_parmstr.ToString());
_command.AppendLine(")");
var _result = _connection.Execute(_command.ToString(), _params);
_connection.Close();
if (_result > 0)
{
return true;
}
else
{
return false;
}
}
}
@ -471,7 +514,7 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Update(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row)
public bool Update(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data)
{
//数据校验
if (string.IsNullOrEmpty(table.Key))
@ -482,64 +525,89 @@ namespace ZKLT.Hadoop
{
throw new ArgumentNullException("列无效");
}
if (where == null || where.Count == 0)
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
if (row == null || row.Count == 0)
List<JObject> _where = new List<JObject>();
if (where.Type == JTokenType.Object)
{
_where.Add((JObject)where);
}
else if (where.Type == JTokenType.Array)
{
_where.AddRange(((JArray)where).ToObject<JObject[]>()!);
}
if (data == null || data.Count == 0)
{
throw new ArgumentNullException("数据无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
//更新命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"UPDATE `{table.Key}` SET ");
//更新列
StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns.Length; i++)
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
var _column = table.Columns[i];
if (row.ContainsKey(_column.Key!) && !where.ContainsKey(_column.Key!) && row[_column.Key!] != null)
try
{
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},");
_params.Add(_column.Key!, row[_column.Key!]);
_connection.Open();
}
else if (!string.IsNullOrEmpty(_column.UpdateDefault))
catch
{
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, row));
throw new ArgumentException("数据源连接失败");
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//更新命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"UPDATE `{table.Key}` SET ");
//执行条件
MergeWhere(table, where, row, _command, _params);
//更新列
StringBuilder _colstr = new StringBuilder();
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (_row.ContainsKey(_column.Key!) && !_where.Any(x => x.ContainsKey(_column.Key!)))
{
_colstr.Append(@$"`{_column.Key!}`={HDP_CommandAction.ColConvert(_column,_params, ((JValue)_row[_column.Key!]!).Value!)},");
}
else if (!string.IsNullOrEmpty(_column.UpdateDefault))
{
_colstr.Append($@"`{_column.Key!}`=@{_column.Key!},");
_params.Add(_column.Key!, HDP_CommandAction.Convert(_column.UpdateDefault, _row));
}
}
if (_colstr[_colstr.Length - 1] == ',')
{
_colstr.Remove(_colstr.Length - 1, 1);
}
_command.AppendLine(_colstr.ToString());
var _result = _connection.Execute(_command.ToString(), _params);
_connection.Close();
if (_result > 0)
{
return true;
}
else
{
return false;
//执行条件
_command.AppendLine(MergeWhere(table, where, _row, _params));
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_scope.Complete();
return true;
}
else
{
return false;
}
}
}
}
@ -552,7 +620,7 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>是否成功</returns>
public bool Delete(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row)
public bool Delete(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data)
{
//数据校验
if (string.IsNullOrEmpty(table.Key))
@ -563,41 +631,59 @@ namespace ZKLT.Hadoop
{
throw new ArgumentNullException("列无效");
}
if (where == null || where.Count == 0)
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
if (row == null || row.Count == 0)
if (data == null || data.Count == 0)
{
throw new ArgumentNullException("数据无效");
}
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
List<JObject> _data = new List<JObject>();
if (data.Type == JTokenType.Object)
{
try
{
_connection.Open();
}
catch
_data.Add((JObject)data);
}
else if (data.Type == JTokenType.Array)
{
_data = data.ToObject<List<JObject>>()!;
}
var _result = 0;
using (TransactionScope _scope = new TransactionScope())
{
using (MySqlConnection _connection = new MySqlConnection(source.GetConnectString()))
{
throw new ArgumentException("数据源连接失败");
}
try
{
_connection.Open();
}
catch
{
throw new ArgumentException("数据源连接失败");
}
//更新命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"DELETE FROM `{table.Key}`");
for (var j = 0; j < _data.Count; j++)
{
var _row = _data[j];
//删除命令
StringBuilder _command = new StringBuilder();
Dictionary<string, object> _params = new Dictionary<string, object>();
_command.AppendLine(@$"DELETE FROM `{table.Key}`");
MergeWhere(table, where, row, _command, _params);
_command.AppendLine(MergeWhere(table, where, _row, _params));
var _result = _connection.Execute(_command.ToString(), _params);
_connection.Close();
if (_result > 0)
{
return true;
}
else
{
return false;
_result += _connection.Execute(_command.ToString(), _params);
}
_connection.Close();
if (_result >= _data.Count)
{
_scope.Complete();
return true;
}
else
{
return false;
}
}
}
}
@ -608,9 +694,9 @@ namespace ZKLT.Hadoop
/// <param name="source">数据源</param>
/// <param name="table">数据表</param>
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <param name="data">数据</param>
/// <returns>结果</returns>
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, Dictionary<string, string> where, Dictionary<string, object> row,
public T? QuerySingle<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data,
string[]? col)
{
//数据校验
@ -622,11 +708,11 @@ namespace ZKLT.Hadoop
{
throw new ArgumentNullException("列无效");
}
if (where == null || where.Count == 0)
if (where == null)
{
throw new ArgumentNullException("条件无效");
}
if (row == null || row.Count == 0)
if (data == null || data.Count == 0)
{
throw new ArgumentNullException("数据无效");
}
@ -656,7 +742,7 @@ namespace ZKLT.Hadoop
}
//执行条件
MergeWhere(table, where, row, _command, _params);
_command.AppendLine(MergeWhere(table, where, data, _params));
var _result = _connection.Query<T>(_command.ToString(), _params).ToArray();
_connection.Close();
@ -678,10 +764,10 @@ namespace ZKLT.Hadoop
/// <param name="source">数据源</param>
/// <param name="table">数据表</param>
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <param name="data">数据</param>
/// <returns>结果集</returns>
public T[] Query<T>(HDP_Source source, HDP_Table table, Dictionary<string, string>? where, Dictionary<string, object>? row,
Dictionary<string, object>? order, string[]? col)
public T[] Query<T>(HDP_Source source, HDP_Table table, JContainer? where, JContainer? data,
JContainer? order, string[]? col)
{
//数据校验
if (string.IsNullOrEmpty(table.Key))
@ -718,7 +804,7 @@ namespace ZKLT.Hadoop
}
//执行条件
MergeWhere(table, where, row, _command, _params);
_command.AppendLine(MergeWhere(table, where, data, _params));
//执行排序
_command.AppendLine(MergeOrder(table, order, _params));
@ -738,8 +824,8 @@ namespace ZKLT.Hadoop
/// <param name="where">条件</param>
/// <param name="row">数据</param>
/// <returns>结果集</returns>
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, Dictionary<string, string>? where,
Dictionary<string, object>? row, Dictionary<string, object>? order, string[]? col)
public HDP_Page<T> QueryPage<T>(HDP_Source source, HDP_Table table, int pageIndex, int pageSize, JContainer? where,
JContainer? data, JContainer? order, string[]? col)
{
//数据校验
if (string.IsNullOrEmpty(table.Key))
@ -776,35 +862,10 @@ namespace ZKLT.Hadoop
}
//执行条件
MergeWhere(table, where, row, _command, _params);
_command.AppendLine(MergeWhere(table, where, data, _params));
//执行排序
StringBuilder _orderstr = new StringBuilder();
_orderstr.Append("ORDER BY ");
if (order != null && order.Count > 0)
{
for (var i = 0; i < table.Columns.Length; i++)
{
var _column = table.Columns[i];
if (order.ContainsKey(_column.Key!))
{
switch (order[_column.Key!])
{
case "DESC":
_orderstr.Append($@"`{_column.Key!}` DESC,");
break;
default:
_orderstr.Append($@"`{_column.Key!}` ASC,");
break;
}
}
}
if (_orderstr[_orderstr.Length - 1] == ',')
{
_orderstr.Remove(_orderstr.Length - 1, 1);
}
_command.AppendLine(_orderstr.ToString());
}
_command.AppendLine(MergeOrder(table, order, _params));
var _result = new HDP_Page<T>();
_result.PageIndex = pageIndex;
_result.PageSize = pageSize;

@ -1 +0,0 @@
docker build -f Hadoop\ZKLT.Hadoop.API\Dockerfile -t hadoop:latest .

@ -13,12 +13,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.Interface", "Ha
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ZKLT.Hadoop.API", "Hadoop\ZKLT.Hadoop.API\ZKLT.Hadoop.API.csproj", "{398DF3E5-B94B-4B2E-9DC6-B6741D7CF4DB}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker-compose", "docker-compose", "{1E26F2C2-6BCB-4E5D-ADC5-C6AD923155F9}"
ProjectSection(SolutionItems) = preProject
docker-compose\hadoop.yml = docker-compose\hadoop.yml
docker-compose\mysql.yml = docker-compose\mysql.yml
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU

@ -1,20 +0,0 @@
version: '3'
services:
hadoop_mysql:
restart: always
image: mysql:8
container_name: hadoop_mysql
# volumes:
# - /apps/mysql/mydir:/mydir
# - /apps/mysql/datadir:/var/lib/mysql
# - /apps/mysql/conf/my.cnf:/etc/my.cnf
# # 数据库还原目录 可将需要还原的sql文件放在这里
# - /apps/mysql/source:/docker-entrypoint-initdb.d
environment:
- "MYSQL_ROOT_PASSWORD=root"
- "MYSQL_DATABASE=hadoopdb"
- "TZ=Asia/Shanghai"
ports:
# 使用宿主机的3306端口映射到容器的3306端口
# 宿主机:容器
- 3306:3306
Loading…
Cancel
Save