using Dapr.Actors.Client;
using Dapr.Actors;
using DotNetty.Buffers;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using LanShengInterface;
using LanShengModel;
using LanShengService.Tcp;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Text.Json;
using System.Data;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Dapr.Client;
using DotNetty.Transport.Channels.Groups;
using CommonModel;
using DotNetty.Handlers.Timeout;
using System.Diagnostics;
using System.Reflection;

namespace LanShengService
{
    /// <summary>
    /// Hosts the DotNetty TCP listener, tracks the channels registered per device id and
    /// pushes command packets to devices, waiting for each packet to be acknowledged.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the reviewed text of this file had lost every generic type argument
    /// (e.g. <c>GetRequiredService&gt;()</c>, <c>List&gt;?</c>). The arguments below were
    /// reconstructed from how the values are used; <c>TcpDataLog&lt;DeviceData&gt;</c> and
    /// <c>ActionChannelInitializer&lt;IChannel&gt;</c> in particular should be confirmed
    /// against the project before merging.
    /// </remarks>
    public class TcpService : ITcpService
    {
        /// <summary>Value of <c>DotNetty:Use</c> that enables the TCP listener.</summary>
        private const string Enabled = "Y";

        /// <summary>Frame numbers are drawn from the range [0, FrameNumberCount).</summary>
        private const int FrameNumberCount = 20;

        /// <summary>Number of polls performed before a push is reported as timed out.</summary>
        private const int ReplyPollCount = 20;

        /// <summary>Delay between two polls of the pending-message queue, in milliseconds.</summary>
        private const int ReplyPollIntervalMs = 500;

        private readonly IServiceProvider Services;
        private readonly ILogger<TcpService> Logger;
        private readonly string Use = "N";
        private readonly int ThreadPool = 1;
        private readonly int Port = 8888;
        private readonly ISqlSugarClient Db;
        private readonly IDeviceService DeviceService;

        // Private gates instead of lock(this): external code can lock on the service instance
        // and would otherwise contend (or deadlock) with these critical sections.
        private readonly object ChannelGate = new object();
        private readonly object MessageGate = new object();

        private ServerBootstrap? Bootstrap;
        private IEventLoopGroup? BossGroup;
        private IEventLoopGroup? WorkerGroup;

        // Both collections stay null while the listener is disabled; PushParams relies on
        // that to report "连接服务器失败".
        private Dictionary<string, IChannelGroup>? ChannelGroups;
        private List<TcpDataLog<DeviceData>>? MessageQueue;

        /// <summary>
        /// Resolves dependencies, initialises the split tables for the TCP data log, reads the
        /// optional <c>DotNetty</c> configuration section (<c>Use</c>, <c>ThreadPool</c>,
        /// <c>Port</c>) and starts the listener in the background.
        /// </summary>
        /// <param name="services">Service provider used to resolve all dependencies.</param>
        public TcpService(IServiceProvider services)
        {
            Services = services;
            Logger = services.GetRequiredService<ILogger<TcpService>>();
            Logger.LogDebug("创建Tcp服务");
            DeviceService = services.GetRequiredService<IDeviceService>();
            Db = services.GetRequiredService<ISqlSugarClient>();
            Db.CodeFirst.SplitTables().InitTables(typeof(TcpDataLog<DeviceData>));
            Logger.LogDebug("加载Tcp服务数据结构");

            IConfiguration configuration = services.GetRequiredService<IConfiguration>()!;
            if (configuration.GetSection("DotNetty").Exists())
            {
                IConfigurationSection useSection = configuration.GetSection("DotNetty:Use");
                if (useSection.Exists())
                {
                    Use = useSection.Value!;
                }

                IConfigurationSection threadPoolSection = configuration.GetSection("DotNetty:ThreadPool");
                if (threadPoolSection.Exists())
                {
                    ThreadPool = Convert.ToInt32(threadPoolSection.Value!);
                }

                IConfigurationSection portSection = configuration.GetSection("DotNetty:Port");
                if (portSection.Exists())
                {
                    Port = Convert.ToInt32(portSection.Value!);
                }

                Logger.LogDebug("加载Tcp服务参数");
            }

            // A constructor cannot await. OpenTcpPort handles and logs its own failures, so
            // discarding the task no longer hides a failed start-up.
            _ = OpenTcpPort();
        }

        /// <summary>
        /// Starts the TCP listener when it is enabled and logs any start-up failure instead
        /// of leaving it on an unobserved task.
        /// </summary>
        private async Task OpenTcpPort()
        {
            if (Use != Enabled)
            {
                return;
            }

            try
            {
                await StartListening();
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, "启动Tcp服务失败");
            }
        }

        /// <summary>
        /// Creates the collections and event loops, configures the server bootstrap, binds
        /// the port, starts the device-service timers and registers the process-exit hook.
        /// </summary>
        private async Task StartListening()
        {
            // Everything up to the first await runs synchronously inside the constructor, so
            // the collections exist before any caller can reach the public methods.
            ChannelGroups = new Dictionary<string, IChannelGroup>();
            MessageQueue = new List<TcpDataLog<DeviceData>>();
            Bootstrap = new ServerBootstrap();
            BossGroup = new MultithreadEventLoopGroup(ThreadPool);
            WorkerGroup = new MultithreadEventLoopGroup();
            Logger.LogDebug("加载Tcp端口参数");

            Bootstrap.Group(BossGroup, WorkerGroup);
            Bootstrap.Channel<TcpServerSocketChannel>();
            Bootstrap
                .Option(ChannelOption.SoBacklog, 1024)
                .Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
                .Option(ChannelOption.RcvbufAllocator, new AdaptiveRecvByteBufAllocator())
                .ChildOption(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
                .ChildOption(ChannelOption.RcvbufAllocator, new AdaptiveRecvByteBufAllocator())
                .ChildOption(ChannelOption.SoKeepalive, true)
                .ChildOption(ChannelOption.TcpNodelay, true)
                .ChildOption(ChannelOption.SoReuseport, true)
                .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    // Heartbeat: idle-state detection feeding HeartBeatHandler below.
                    pipeline.AddLast(new IdleStateHandler(10, 0, 0));
                    pipeline.AddLast("encoder", new EncoderHandler());
                    pipeline.AddLast("decoder", new DecoderHandler());
                    pipeline.AddLast(new HeartBeatHandler());
                    pipeline.AddLast(new ChannelHandler(Services));
                }));

            try
            {
                await Bootstrap.BindAsync(Port);
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, $"启动Tcp端口监听{Port}失败");
                // The bind failed, so nothing will ever use the event loops: release them.
                await Task.WhenAll(
                    BossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)),
                    WorkerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)));
                return;
            }

            Logger.LogInformation($"启动Tcp端口监听{Port}");
            DeviceService.TimerInsertDataDo();
            DeviceService.TimerInsertDataLogDo();
            AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
        }

        /// <summary>
        /// Process-exit hook: stops the event loops and marks devices offline. Blocking is
        /// intentional here because the event handler is synchronous.
        /// </summary>
        private void CurrentDomain_ProcessExit(object? sender, EventArgs e)
        {
            Logger.LogDebug("程序关闭");
            try
            {
                if (Use == Enabled && BossGroup != null && WorkerGroup != null)
                {
                    Task.WaitAll(
                        BossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)),
                        WorkerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)));
                }
            }
            catch (Exception ex)
            {
                // A failed event-loop shutdown must not prevent the offline call below.
                Logger.LogError(ex, "关闭Tcp服务失败");
            }

            DeviceService.Offline().Wait();
        }

        /// <summary>
        /// Registers <paramref name="context"/>'s channel under the device id, creating the
        /// channel group for that id on first use. Adding the same channel twice is a no-op.
        /// </summary>
        /// <param name="id">Device id used as the channel-group key.</param>
        /// <param name="context">Handler context whose channel is registered.</param>
        /// <returns>A completed task.</returns>
        public Task AddChannel(string id, IChannelHandlerContext context)
        {
            // Lookup and mutation share one critical section: Dictionary is not safe for
            // reads that run concurrently with a write.
            lock (ChannelGate)
            {
                if (!ChannelGroups!.TryGetValue(id, out IChannelGroup? group))
                {
                    group = new DefaultChannelGroup(context.Executor);
                    ChannelGroups.Add(id, group);
                }

                if (!group.Contains(context.Channel))
                {
                    group.Add(context.Channel);
                    Logger.LogDebug($"添加通道{id}:{context.Channel.Id.AsLongText()}");
                }
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Sends <paramref name="deviceData"/> to the device's channels as a command packet
        /// and waits until the packet is removed from the pending queue or the wait times out.
        /// </summary>
        /// <param name="deviceData">Parameters to push; unset properties are filled from the stored record.</param>
        /// <exception cref="BadRequestException">
        /// The listener is not running, the device has no registered channel, the device has
        /// no stored record, no frame number is free, sending failed, or the reply timed out.
        /// </exception>
        public async Task PushParams(DeviceData deviceData)
        {
            Logger.LogDebug($"下发指令{JsonSerializer.Serialize(deviceData)}");
            if (ChannelGroups == null)
            {
                throw new BadRequestException("连接服务器失败");
            }

            IChannelGroup? group;
            lock (ChannelGate)
            {
                ChannelGroups.TryGetValue(deviceData.Id!, out group);
            }

            if (group == null)
            {
                throw new BadRequestException("设备不在线");
            }

            await ReloadPush(deviceData);
            TcpDataLog<DeviceData> packet = EnqueuePacket(deviceData);

            try
            {
                await group.WriteAndFlushAsync(packet);
            }
            catch (Exception ex)
            {
                lock (MessageGate)
                {
                    MessageQueue!.Remove(packet);
                }

                // Keep the underlying cause in the log; callers still see the same exception type.
                Logger.LogError(ex, "发送指令失败");
                throw new BadRequestException("发送指令失败");
            }

            await WaitPushReply(packet);
        }

        /// <summary>
        /// Builds the packet for <paramref name="deviceData"/> with a frame number that is
        /// not pending for the same device and adds it to the pending queue, atomically.
        /// </summary>
        /// <exception cref="BadRequestException">Every frame number is already pending for this device.</exception>
        private TcpDataLog<DeviceData> EnqueuePacket(DeviceData deviceData)
        {
            lock (MessageGate)
            {
                // Choosing among the free numbers (instead of retrying random draws) cannot
                // spin forever when all numbers are in use.
                List<int> freeNumbers = Enumerable.Range(0, FrameNumberCount)
                    .Where(n => !MessageQueue!.Any(x => x.DTU_ID == deviceData.Id && x.Frame_number == n))
                    .ToList();
                if (freeNumbers.Count == 0)
                {
                    throw new BadRequestException("设备待应答指令过多");
                }

                int frameNumber = freeNumbers[Random.Shared.Next(freeNumbers.Count)];
                var packet = new TcpDataLog<DeviceData>()
                {
                    Frame_START = "FAAA",
                    Frame_number = frameNumber,
                    DTU_ID = deviceData.Id,
                    Msg_ID = string.IsNullOrEmpty(deviceData.MsgType) ? "1104" : deviceData.MsgType,
                    Frame_END = "BBFB",
                    Content = deviceData
                };
                MessageQueue!.Add(packet);
                return packet;
            }
        }

        /// <summary>
        /// Completes <paramref name="deviceData"/> from the stored record: sets
        /// <c>Data27_B0</c> to 1 when a new connect string is supplied (otherwise 0),
        /// defaults <c>UpdateTag</c>, <c>Data8_B7_1104</c> and <c>BinSize</c> to 0, and
        /// copies the stored value into every property that is still null.
        /// </summary>
        /// <exception cref="BadRequestException">No stored record exists for the device id.</exception>
        private async Task ReloadPush(DeviceData deviceData)
        {
            var oldData = await Db.Queryable<DeviceData>().Where(x => x.Id == deviceData.Id).FirstAsync();
            if (oldData == null)
            {
                // The query yields null when no row matches; fail with a handled error
                // instead of a NullReferenceException further down.
                throw new BadRequestException("设备不存在");
            }

            if (deviceData.ConnectString != null && deviceData.ConnectString != oldData.ConnectString)
            {
                deviceData.Data27_B0 = 1;
            }
            else
            {
                deviceData.Data27_B0 = 0;
            }

            if (deviceData.UpdateTag == null)
            {
                deviceData.UpdateTag = 0;
            }

            if (deviceData.Data8_B7_1104 == null)
            {
                deviceData.Data8_B7_1104 = 0;
            }

            if (deviceData.BinSize == null)
            {
                deviceData.BinSize = 0;
            }

            foreach (PropertyInfo property in typeof(DeviceData).GetProperties())
            {
                // Read-only properties and indexers cannot be copied and would make
                // GetValue/SetValue throw.
                if (!property.CanRead || !property.CanWrite || property.GetIndexParameters().Length > 0)
                {
                    continue;
                }

                if (property.GetValue(deviceData) == null)
                {
                    property.SetValue(deviceData, property.GetValue(oldData));
                }
            }
        }

        /// <summary>
        /// Polls the pending queue until <paramref name="packet"/> is no longer pending,
        /// without blocking the calling thread between polls.
        /// </summary>
        /// <exception cref="BadRequestException">The packet is still pending after all polls.</exception>
        private async Task WaitPushReply(TcpDataLog<DeviceData> packet)
        {
            int remaining = ReplyPollCount;
            while (IsPending(packet))
            {
                if (remaining <= 0)
                {
                    lock (MessageGate)
                    {
                        MessageQueue!.Remove(packet);
                    }

                    throw new BadRequestException("设备应答超时");
                }

                await Task.Delay(ReplyPollIntervalMs);
                remaining--;
            }
        }

        /// <summary>
        /// Returns whether a queued message has the same device id and frame number as
        /// <paramref name="packet"/>.
        /// </summary>
        private bool IsPending(TcpDataLog<DeviceData> packet)
        {
            lock (MessageGate)
            {
                return MessageQueue!.Any(x => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number);
            }
        }

        /// <summary>
        /// Removes every pending message with the same device id and frame number as
        /// <paramref name="packet"/>, which releases the matching <see cref="PushParams"/> wait.
        /// </summary>
        /// <param name="packet">Packet identifying the pending message(s) to remove.</param>
        /// <returns>A completed task.</returns>
        public Task RemoveMessageItem(TcpDataLog<DeviceData> packet)
        {
            Logger.LogDebug($"删除消息{JsonSerializer.Serialize(packet)}");
            lock (MessageGate)
            {
                MessageQueue!.RemoveAll(x => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number);
            }

            return Task.CompletedTask;
        }
    }
}