You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
341 lines
12 KiB
C#
341 lines
12 KiB
C#
using Dapr.Actors.Client;
|
|
using Dapr.Actors;
|
|
using DotNetty.Buffers;
|
|
using DotNetty.Handlers.Logging;
|
|
using DotNetty.Handlers.Tls;
|
|
using DotNetty.Transport.Bootstrapping;
|
|
using DotNetty.Transport.Channels;
|
|
using DotNetty.Transport.Channels.Sockets;
|
|
using LanShengInterface;
|
|
using LanShengModel;
|
|
using LanShengService.Tcp;
|
|
using Microsoft.Extensions.Configuration;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging;
|
|
using SqlSugar;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using System.Text.Json;
|
|
using System.Data;
|
|
using Microsoft.AspNetCore.DataProtection.KeyManagement;
|
|
using Dapr.Client;
|
|
using DotNetty.Transport.Channels.Groups;
|
|
using CommonModel;
|
|
using DotNetty.Handlers.Timeout;
|
|
using System.Diagnostics;
|
|
|
|
namespace LanShengService
|
|
{
|
|
/// <summary>
/// Hosts the DotNetty TCP listener, keeps one channel group per device id and
/// pushes parameter frames to devices, waiting for each frame to be acknowledged.
/// </summary>
/// <remarks>
/// <see cref="ChannelGroups"/> is guarded by <see cref="ChannelLock"/> and
/// <see cref="MessageQueue"/> by <see cref="MessageLock"/>; both collections are
/// touched by callers of <see cref="PushParams"/> as well as by whoever calls
/// <see cref="AddChannel"/> / <see cref="RemoveMessageItem"/>.
/// </remarks>
public class TcpService : ITcpService
{
    /// <summary>Frame numbers are drawn from the half-open range [0, FrameNumberCount).</summary>
    private const int FrameNumberCount = 20;

    /// <summary>How many times the pending queue is polled before a reply is considered timed out.</summary>
    private const int ReplyPollAttempts = 20;

    /// <summary>Delay between two polls of the pending queue while waiting for a reply.</summary>
    private static readonly TimeSpan ReplyPollInterval = TimeSpan.FromMilliseconds(500);

    /// <summary>Delay before retrying when every frame number of a device is still pending.</summary>
    private static readonly TimeSpan FrameSlotRetryInterval = TimeSpan.FromMilliseconds(100);

    /// <summary>
    /// Resolves dependencies, initialises the split log table, reads the optional
    /// <c>DotNetty</c> configuration section (<c>Use</c>, <c>ThreadPool</c>, <c>Port</c>)
    /// and starts the listener when <c>Use</c> is <c>"Y"</c>.
    /// </summary>
    /// <param name="services">Service provider used to resolve the logger, device service, database client and configuration.</param>
    public TcpService(IServiceProvider services)
    {
        Services = services;

        Logger = services.GetRequiredService<ILogger<TcpService>>();
        Logger.LogDebug("创建Tcp服务");

        DeviceService = services.GetRequiredService<IDeviceService>();
        Db = services.GetRequiredService<ISqlSugarClient>();

        Db.CodeFirst.SplitTables().InitTables(typeof(TcpDataLog<DeviceData>));
        Logger.LogDebug("加载Tcp服务数据结构");

        IConfiguration configuration = services.GetRequiredService<IConfiguration>()!;
        IConfigurationSection dotNetty = configuration.GetSection("DotNetty");
        if (dotNetty.Exists())
        {
            IConfigurationSection use = dotNetty.GetSection("Use");
            if (use.Exists())
            {
                Use = use.Value!;
            }

            IConfigurationSection threadPool = dotNetty.GetSection("ThreadPool");
            if (threadPool.Exists())
            {
                ThreadPool = Convert.ToInt32(threadPool.Value!);
            }

            IConfigurationSection port = dotNetty.GetSection("Port");
            if (port.Exists())
            {
                Port = Convert.ToInt32(port.Value!);
            }

            Logger.LogDebug("加载Tcp服务参数");
        }

        // A constructor cannot await. The synchronous part of OpenTcpPort (field setup)
        // still runs before the constructor returns; a bind failure is logged inside
        // OpenTcpPort instead of being lost with the discarded task.
        _ = OpenTcpPort();
    }

    private readonly IServiceProvider Services;

    private readonly ILogger Logger;

    // "Y" enables the TCP listener; any other value leaves it off.
    private readonly string Use = "N";

    // Thread count handed to the boss event loop group.
    private readonly int ThreadPool = 1;

    // TCP port the listener binds to.
    private readonly int Port = 8888;

    private readonly ISqlSugarClient Db;

    private readonly IDeviceService DeviceService;

    // Dedicated lock objects: locking on `this` would let external code contend on the same monitor.
    private readonly object ChannelLock = new object();

    private readonly object MessageLock = new object();

    private ServerBootstrap? Bootstrap;

    private IEventLoopGroup? BossGroup;

    private IEventLoopGroup? WorkerGroup;

    // Channel groups keyed by device id; null while the listener is not running.
    private Dictionary<string, IChannelGroup>? ChannelGroups;

    // Frames that were sent and have not been removed (acknowledged or timed out) yet.
    private List<TcpDataLog<DeviceData>>? MessageQueue;

    /// <summary>
    /// Builds the DotNetty server pipeline and binds it to <see cref="Port"/> when the
    /// listener is enabled, then starts the device service timers and registers the
    /// process-exit hook.
    /// </summary>
    /// <remarks>
    /// A bind failure is logged, the event loop groups are shut down and
    /// <see cref="ChannelGroups"/> is reset to null so that <see cref="PushParams"/>
    /// reports the server as unavailable.
    /// </remarks>
    private async Task OpenTcpPort()
    {
        if (Use != "Y")
        {
            return;
        }

        ChannelGroups = new Dictionary<string, IChannelGroup>();
        MessageQueue = new List<TcpDataLog<DeviceData>>();
        Bootstrap = new ServerBootstrap();
        BossGroup = new MultithreadEventLoopGroup(ThreadPool);
        WorkerGroup = new MultithreadEventLoopGroup();

        Logger.LogDebug("加载Tcp端口参数");

        Bootstrap.Group(BossGroup, WorkerGroup);
        Bootstrap.Channel<TcpServerSocketChannel>();
        Bootstrap
            .Option(ChannelOption.SoBacklog, 1024)
            .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
            .Option(ChannelOption.RcvbufAllocator, new AdaptiveRecvByteBufAllocator())
            .ChildOption(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
            .ChildOption(ChannelOption.RcvbufAllocator, new AdaptiveRecvByteBufAllocator())
            .ChildOption(ChannelOption.SoKeepalive, true)
            .ChildOption(ChannelOption.TcpNodelay, true)
            .ChildOption(ChannelOption.SoReuseport, true)
            .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
            {
                IChannelPipeline pipeline = channel.Pipeline;
                // Heartbeat: idle detection with a 10 second reader-idle timeout.
                pipeline.AddLast(new IdleStateHandler(10, 0, 0));
                pipeline.AddLast("encoder", new EncoderHandler());
                pipeline.AddLast("decoder", new DecoderHandler());
                pipeline.AddLast(new HeartBeatHandler());
                pipeline.AddLast(new ChannelHandler(Services));
            }));

        try
        {
            await Bootstrap.BindAsync(Port);
        }
        catch (Exception ex)
        {
            // This task is never awaited by the constructor, so the failure must be
            // surfaced here or it would vanish silently.
            Logger.LogError(ex, $"启动Tcp端口监听{Port}失败");
            ChannelGroups = null;
            await Task.WhenAll(
                BossGroup.ShutdownGracefullyAsync(TimeSpan.Zero, TimeSpan.Zero),
                WorkerGroup.ShutdownGracefullyAsync(TimeSpan.Zero, TimeSpan.Zero));
            return;
        }

        Logger.LogInformation($"启动Tcp端口监听{Port}");

        DeviceService.TimerInsertDataDo();
        DeviceService.TimerInsertDataLogDo();
        AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
    }

    /// <summary>
    /// Process-exit hook: shuts down both event loop groups and marks devices offline.
    /// </summary>
    /// <remarks>
    /// Blocking is intentional here: the handler is synchronous and the work has to
    /// finish before the handler returns.
    /// </remarks>
    private void CurrentDomain_ProcessExit(object? sender, EventArgs e)
    {
        Logger.LogDebug("程序关闭");
        if (Use == "Y")
        {
            Task.WaitAll(
                BossGroup!.ShutdownGracefullyAsync(TimeSpan.Zero, TimeSpan.Zero),
                WorkerGroup!.ShutdownGracefullyAsync(TimeSpan.Zero, TimeSpan.Zero));
        }

        DeviceService.Offline().Wait();
    }

    /// <summary>
    /// Registers the context's channel in the channel group of the given device id,
    /// creating the group on first use. Adding the same channel twice is a no-op.
    /// </summary>
    /// <param name="id">Device id used as the channel group key.</param>
    /// <param name="context">Handler context whose channel (and executor, for a new group) is used.</param>
    /// <returns>A completed task.</returns>
    public Task AddChannel(string id, IChannelHandlerContext context)
    {
        // The whole read-modify-write runs under the lock: Dictionary does not support
        // reads that race with a concurrent Add.
        lock (ChannelLock)
        {
            if (!ChannelGroups!.TryGetValue(id, out var group))
            {
                group = new DefaultChannelGroup(context.Executor);
                ChannelGroups.Add(id, group);
            }

            if (!group.Contains(context.Channel))
            {
                group.Add(context.Channel);
                Logger.LogDebug($"添加通道{id}:{context.Channel.Id.AsLongText()}");
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Sends a parameter frame (message id "1104") to the device identified by
    /// <c>deviceData.Id</c> and waits until the frame leaves the pending queue.
    /// </summary>
    /// <param name="deviceData">Values to push; null fields are filled from the stored record first.</param>
    /// <exception cref="BadRequestException">
    /// The listener is not running, the device has no channel group or no stored record,
    /// the write failed, or no reply arrived in time.
    /// </exception>
    public async Task PushParams(DeviceData deviceData)
    {
        // Skip the serialization cost entirely when debug logging is disabled.
        if (Logger.IsEnabled(LogLevel.Debug))
        {
            Logger.LogDebug($"下发指令{JsonSerializer.Serialize(deviceData)}");
        }

        var groups = ChannelGroups;
        if (groups == null)
        {
            throw new BadRequestException("连接服务器失败");
        }

        var deviceId = deviceData.Id;
        IChannelGroup? group = null;
        if (deviceId != null)
        {
            lock (ChannelLock)
            {
                groups.TryGetValue(deviceId, out group);
            }
        }

        // A missing id is reported the same way as an unknown device instead of
        // surfacing an ArgumentNullException from the dictionary lookup.
        if (deviceId == null || group == null)
        {
            throw new BadRequestException("设备不在线");
        }

        await ReloadPush(deviceData);

        // If every frame number of this device is still pending, wait for one to be
        // released (by a reply or a timeout) rather than spinning on the CPU.
        TcpDataLog<DeviceData>? packet;
        while ((packet = TryEnqueuePacket(deviceId, deviceData)) == null)
        {
            await Task.Delay(FrameSlotRetryInterval);
        }

        try
        {
            await group.WriteAndFlushAsync(packet);
        }
        catch (Exception ex)
        {
            lock (MessageLock)
            {
                MessageQueue!.Remove(packet);
            }

            Logger.LogError(ex, "发送指令失败");
            throw new BadRequestException("发送指令失败");
        }

        await WaitPushReply(packet);
    }

    /// <summary>
    /// Atomically picks a random frame number that is not pending for the device,
    /// builds the frame and appends it to the pending queue.
    /// </summary>
    /// <returns>The queued frame, or null when all frame numbers of the device are in use.</returns>
    private TcpDataLog<DeviceData>? TryEnqueuePacket(string deviceId, DeviceData deviceData)
    {
        lock (MessageLock)
        {
            var pending = MessageQueue!;
            var freeFrameNumbers = Enumerable.Range(0, FrameNumberCount)
                .Where(n => !pending.Any(x => x.DTU_ID == deviceId && x.Frame_number == n))
                .ToList();
            if (freeFrameNumbers.Count == 0)
            {
                return null;
            }

            var packet = new TcpDataLog<DeviceData>()
            {
                Frame_START = "FAAA",
                Frame_number = freeFrameNumbers[Random.Shared.Next(freeFrameNumbers.Count)],
                DTU_ID = deviceId,
                Msg_ID = "1104",
                Frame_END = "BBFB",
                Content = deviceData
            };
            pending.Add(packet);
            return packet;
        }
    }

    /// <summary>
    /// Fills every null field of <paramref name="deviceData"/> from the stored record
    /// with the same id, and sets <c>Data27_B0</c> to 1 when a new, different
    /// <c>ConnectString</c> is supplied (0 otherwise).
    /// </summary>
    /// <exception cref="BadRequestException">No stored record exists for the device id.</exception>
    private Task ReloadPush(DeviceData deviceData)
    {
        var oldData = Db.Queryable<DeviceData>().Where(x => x.Id == deviceData.Id).First();
        if (oldData == null)
        {
            // Without this check the field copies below would fail with a NullReferenceException.
            throw new BadRequestException("设备不存在");
        }

        deviceData.Version ??= oldData.Version;
        deviceData.DataTick ??= oldData.DataTick;
        deviceData.Data27_B7 ??= oldData.Data27_B7;
        deviceData.Data27_B6 ??= oldData.Data27_B6;
        deviceData.Data27_B5 ??= oldData.Data27_B5;
        deviceData.Data27_B4 ??= oldData.Data27_B4;
        deviceData.Data27_B3 ??= oldData.Data27_B3;

        // Must be evaluated before ConnectString is back-filled below; afterwards the
        // two values could no longer differ.
        if (deviceData.ConnectString != null && deviceData.ConnectString != oldData.ConnectString)
        {
            deviceData.Data27_B0 = 1;
        }
        else
        {
            deviceData.Data27_B0 = 0;
        }

        deviceData.Data23 ??= oldData.Data23;
        deviceData.Data20 ??= oldData.Data20;
        deviceData.Data19 ??= oldData.Data19;
        deviceData.Data18 ??= oldData.Data18;
        deviceData.Data16_B2 ??= oldData.Data16_B2;
        deviceData.Data25_B4 ??= oldData.Data25_B4;
        deviceData.Data25_B0 ??= oldData.Data25_B0;
        deviceData.Data26_B4 ??= oldData.Data26_B4;
        deviceData.Data26_B0 ??= oldData.Data26_B0;
        deviceData.UpdateTag ??= 0;
        deviceData.ConnectString ??= oldData.ConnectString;

        return Task.CompletedTask;
    }

    /// <summary>
    /// Polls the pending queue until no entry with the frame's device id and frame
    /// number remains, without blocking a thread between polls.
    /// </summary>
    /// <exception cref="BadRequestException">
    /// The entry is still pending after <see cref="ReplyPollAttempts"/> waits of
    /// <see cref="ReplyPollInterval"/>; the frame is removed from the queue first.
    /// </exception>
    private async Task WaitPushReply(TcpDataLog<DeviceData> packet)
    {
        for (var remaining = ReplyPollAttempts; ; remaining--)
        {
            lock (MessageLock)
            {
                var stillPending = MessageQueue!.Any(
                    x => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number);
                if (!stillPending)
                {
                    return;
                }

                if (remaining <= 0)
                {
                    MessageQueue.Remove(packet);
                    throw new BadRequestException("设备应答超时");
                }
            }

            await Task.Delay(ReplyPollInterval);
        }
    }

    /// <summary>
    /// Removes every pending frame with the same device id and frame number as
    /// <paramref name="packet"/>, which releases a <see cref="PushParams"/> call
    /// waiting on that frame.
    /// </summary>
    /// <param name="packet">Frame whose <c>DTU_ID</c> and <c>Frame_number</c> identify the entries to drop.</param>
    /// <returns>A completed task.</returns>
    public Task RemoveMessageItem(TcpDataLog<DeviceData> packet)
    {
        if (Logger.IsEnabled(LogLevel.Debug))
        {
            Logger.LogDebug($"删除消息{JsonSerializer.Serialize(packet)}");
        }

        lock (MessageLock)
        {
            MessageQueue?.RemoveAll(x => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number);
        }

        return Task.CompletedTask;
    }
}
|
|
}
|