You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

341 lines
12 KiB
C#

9 months ago
using Dapr.Actors.Client;
using Dapr.Actors;
using DotNetty.Buffers;
using DotNetty.Handlers.Logging;
using DotNetty.Handlers.Tls;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using LanShengInterface;
using LanShengModel;
using LanShengService.Tcp;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using SqlSugar;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Text.Json;
using System.Data;
using Microsoft.AspNetCore.DataProtection.KeyManagement;
using Dapr.Client;
using DotNetty.Transport.Channels.Groups;
using CommonModel;
using DotNetty.Handlers.Timeout;
using System.Diagnostics;
namespace LanShengService
{
public class TcpService : ITcpService
{
public TcpService(IServiceProvider services)
{
Services = services;
Logger = services.GetRequiredService<ILogger<TcpService>>();
Logger.LogDebug("创建Tcp服务");
DeviceService = services.GetRequiredService<IDeviceService>();
Db = services.GetRequiredService<ISqlSugarClient>();
Db.CodeFirst.SplitTables().InitTables(typeof(TcpDataLog<DeviceData>));
Logger.LogDebug("加载Tcp服务数据结构");
IConfiguration configuration = services.GetRequiredService<IConfiguration>()!;
if (configuration.GetSection("DotNetty").Exists())
{
if (configuration.GetSection("DotNetty:Use").Exists())
{
Use = configuration.GetSection("DotNetty:Use").Value!;
}
if (configuration.GetSection("DotNetty:ThreadPool").Exists())
{
ThreadPool = Convert.ToInt32(configuration.GetSection("DotNetty:ThreadPool").Value!);
}
if (configuration.GetSection("DotNetty:Port").Exists())
{
Port = Convert.ToInt32(configuration.GetSection("DotNetty:Port").Value!);
}
Logger.LogDebug("加载Tcp服务参数");
}
OpenTcpPort();
}
private readonly IServiceProvider Services;
private readonly ILogger Logger;
private readonly string Use = "N";
private readonly int ThreadPool = 1;
private readonly int Port = 8888;
private readonly ISqlSugarClient Db;
private readonly IDeviceService DeviceService;
private ServerBootstrap? Bootstrap;
private IEventLoopGroup? BossGroup;
private IEventLoopGroup? WorkerGroup;
private Dictionary<string, IChannelGroup>? ChannelGroups;
private List<TcpDataLog<DeviceData>>? MessageQueue;
private async Task OpenTcpPort()
{
if (Use == "Y")
{
ChannelGroups = new Dictionary<string, IChannelGroup>();
MessageQueue = new List<TcpDataLog<DeviceData>>();
Bootstrap = new ServerBootstrap();
BossGroup = new MultithreadEventLoopGroup(ThreadPool);
WorkerGroup = new MultithreadEventLoopGroup();
Logger.LogDebug("加载Tcp端口参数");
Bootstrap!.Group(BossGroup, WorkerGroup);
Bootstrap.Channel<TcpServerSocketChannel>();
Bootstrap
.Option(ChannelOption.SoBacklog, 1024)
.Option(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.Option(ChannelOption.RcvbufAllocator,new AdaptiveRecvByteBufAllocator())
.ChildOption(ChannelOption.Allocator, UnpooledByteBufferAllocator.Default)
.ChildOption(ChannelOption.RcvbufAllocator, new AdaptiveRecvByteBufAllocator())
.ChildOption(ChannelOption.SoKeepalive, true)
.ChildOption(ChannelOption.TcpNodelay, true)
.ChildOption(ChannelOption.SoReuseport, true)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline Pipeline = channel.Pipeline;
Pipeline.AddLast(new IdleStateHandler(10, 0, 0));//心跳
Pipeline.AddLast("encoder", new EncoderHandler());
Pipeline.AddLast("decoder", new DecoderHandler());
Pipeline.AddLast(new HeartBeatHandler());
Pipeline.AddLast(new ChannelHandler(Services));
}));
await Bootstrap.BindAsync(Port);
Logger.LogInformation($"启动Tcp端口监听{Port}");
DeviceService.TimerInsertDataDo();
DeviceService.TimerInsertDataLogDo();
AppDomain.CurrentDomain.ProcessExit += CurrentDomain_ProcessExit;
}
}
private void CurrentDomain_ProcessExit(object? sender, EventArgs e)
{
Logger.LogDebug("程序关闭");
if (Use == "Y")
{
Task.WaitAll(
BossGroup!.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)),
WorkerGroup!.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(0), TimeSpan.FromSeconds(0)));
}
DeviceService.Offline().Wait();
}
public Task AddChannel(string id, IChannelHandlerContext context)
{
if (!ChannelGroups!.ContainsKey(id))
{
lock (this)
{
if (!ChannelGroups!.ContainsKey(id))
{
ChannelGroups.Add(id, new DefaultChannelGroup(context.Executor));
}
}
}
if (!ChannelGroups[id].Contains(context.Channel))
{
lock (this)
{
if (!ChannelGroups[id].Contains(context.Channel))
{
ChannelGroups[id].Add(context.Channel);
Logger.LogDebug($"添加通道{id}:{context.Channel.Id.AsLongText()}");
}
}
}
return Task.CompletedTask;
}
public async Task PushParams(DeviceData deviceData)
{
Logger.LogDebug($"下发指令{JsonSerializer.Serialize(deviceData)}");
if (ChannelGroups == null)
{
throw new BadRequestException("连接服务器失败");
}
else
{
if (!ChannelGroups.ContainsKey(deviceData.Id!))
{
throw new BadRequestException("设备不在线");
}
else
{
await ReloadPush(deviceData);
Random random = new Random();
var Frame_number = 0;
do
{
Frame_number = random.Next(0, 20);
}
while (MessageQueue!.Any(x => x.DTU_ID == deviceData.Id && x.Frame_number == Frame_number));
TcpDataLog<DeviceData> packet = new TcpDataLog<DeviceData>()
{
Frame_START = "FAAA",
Frame_number = Frame_number,
DTU_ID = deviceData.Id,
Msg_ID = "1104",
Frame_END = "BBFB",
Content = deviceData
};
try
{
MessageQueue!.Add(packet);
await ChannelGroups[packet.DTU_ID!].WriteAndFlushAsync(packet);
}
catch
{
MessageQueue!.Remove(packet);
throw new BadRequestException("发送指令失败");
}
await WaitPushReply(packet);
}
}
}
private Task ReloadPush(DeviceData deviceData)
{
var oldData = Db.Queryable<DeviceData>().Where(x => x.Id == deviceData.Id).First();
if (deviceData.Version == null)
{
deviceData.Version = oldData.Version;
}
if (deviceData.DataTick == null)
{
deviceData.DataTick = oldData.DataTick;
}
if (deviceData.Data27_B7 == null)
{
deviceData.Data27_B7 = oldData.Data27_B7;
}
if (deviceData.Data27_B6 == null)
{
deviceData.Data27_B6 = oldData.Data27_B6;
}
if (deviceData.Data27_B5 == null)
{
deviceData.Data27_B5 = oldData.Data27_B5;
}
if (deviceData.Data27_B4 == null)
{
deviceData.Data27_B4 = oldData.Data27_B4;
}
if (deviceData.Data27_B3 == null)
{
deviceData.Data27_B3 = oldData.Data27_B3;
}
if (deviceData.ConnectString != null && deviceData.ConnectString != oldData.ConnectString)
{
deviceData.Data27_B0 = 1;
}
else
{
deviceData.Data27_B0 = 0;
}
if (deviceData.Data23 == null)
{
deviceData.Data23 = oldData.Data23;
}
if (deviceData.Data20 == null)
{
deviceData.Data20 = oldData.Data20;
}
if (deviceData.Data19 == null)
{
deviceData.Data19 = oldData.Data19;
}
if (deviceData.Data18 == null)
{
deviceData.Data18 = oldData.Data18;
}
if (deviceData.Data16_B2 == null)
{
deviceData.Data16_B2 = oldData.Data16_B2;
}
if (deviceData.Data25_B4 == null)
{
deviceData.Data25_B4 = oldData.Data25_B4;
}
if (deviceData.Data25_B0 == null)
{
deviceData.Data25_B0 = oldData.Data25_B0;
}
if (deviceData.Data26_B4 == null)
{
deviceData.Data26_B4 = oldData.Data26_B4;
}
if (deviceData.Data26_B0 == null)
{
deviceData.Data26_B0 = oldData.Data26_B0;
}
if (deviceData.UpdateTag == null)
{
deviceData.UpdateTag = 0;
}
if (deviceData.ConnectString == null)
{
deviceData.ConnectString = oldData.ConnectString;
}
return Task.CompletedTask;
}
private Task WaitPushReply(TcpDataLog<DeviceData> packet)
{
var tryCount = 20;
while (MessageQueue!.Any((x => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number)))
{
if (tryCount <= 0)
{
MessageQueue!.Remove(packet);
throw new BadRequestException("设备应答超时");
}
Thread.Sleep(500);
tryCount--;
}
return Task.CompletedTask;
}
public Task RemoveMessageItem(TcpDataLog<DeviceData> packet)
{
Logger.LogDebug($"删除消息{JsonSerializer.Serialize(packet)}");
MessageQueue!.RemoveAll((x) => x.DTU_ID == packet.DTU_ID && x.Frame_number == packet.Frame_number);
return Task.CompletedTask;
}
}
}