标签:
关键字:
udpclient、Udp超时、软备份、事件通知
问题场景:
现实环境中为了保证程序健壮性会采用守护进程(看门狗)、备份程序等方式,实现理论上的热备。下文采用UDP协议配合自定义上层协议流程,实现一种简单的双机备份策略。两个功能程序 具有相同业务功能 但同一时刻只有一个进行服务,另外一个下文称为兄弟,进行待命。当主失去连接或发生命令指派时,改变自身状态为主进行服务。
该模型可抽象理解为一种进程间互斥,即保证二者状态不一致。程序初始双方状态都为未指定,即二者初始地位平等,通过协商报文,随机指定其一为主,则兄弟程序自动改变自身状态为备。主进行状态保持报文发送,备进行响应以确定对方状态。通过超时控制解决兄弟其一断线通知。并通过事件将状态改变、连接状态及重要报文信息抛出给订阅者。
主 | 备 | 协商 | 指派 | 连接 | 回执 | 在线 | |
active | standby | negotiate | designed | connect | receipt | Frame | Online |
1 /// <summary> 2 /// 2015-05-11 3 /// kisi 4 /// 双机备份协商协议实现 5 /// 定长 6 /// FH | Head | ASState | ID | data | 7 /// </summary> 8 /// <summary> 9 /// 数据帧开始标志 10 /// </summary> 11 private byte FrameHead = 0xFF; 12 13 /// <summary> 14 /// 定义端口 15 /// </summary> 16 private int listenPort = 1101; 17 18 /// <summary> 19 /// 数据帧类型 20 /// 协商,发送token 21 /// 指定,确定主从 22 /// 连接,保持联系 23 /// </summary> 24 private enum HeadOptions 25 { 26 // The flag for 协商 is 10100000. 27 Negotiate = 0xA0, 28 // The flag for 指定 is 10100101. 29 Designate = 0xA5, 30 // The flag for 连接保持 is 10101111. 31 Connected = 0xAA, 32 // The flag for 响应报文 33 Responsed = 0xAF 34 } 35 36 private enum StateOptions 37 { 38 // The Initial state 39 Initial = 0x00, 40 // 主,进行数据采集及业务逻辑 41 Active = 0x01, 42 // 备,进行数据同步及连接状态保持 43 Standby = 0x02 44 }
UDP封装部分
1 private IPEndPoint _RemoteIPEndPoint = null; 2 // 定义UDP发送和接收client 3 private UdpClient uclientReceive = null; 4 private UdpClient uclientSend = null; 5 private ManualResetEvent sendDone = new ManualResetEvent(false); 6 private ManualResetEvent receiveDone = new ManualResetEvent(false); 7 //初始状态未指定 8 private StateOptions state = StateOptions.Initial; 9 private byte value = 0; 10 private byte frameid = 0; 11 private byte lossFramecount = 0; 12 private List<SendFrame> listSend = new List<SendFrame>(); 13 private List<int> listRemaindel = new List<int>(); 14 private bool initial = true; 15 private bool initialconnect = true; 16 private DateTime lastConnectTime = DateTime.Now; 17 object lockList = new object(); 18 public StateEvent trevent = new StateEvent(); 19 20 public ASNegotiate(string remoteip, int port) 21 { 22 if (port > 1000) 23 listenPort = port; 24 _RemoteIPEndPoint = new IPEndPoint(IPAddress.Parse(remoteip), listenPort); 25 this.value = GetRandom(); 26 } 27 28 private byte GetRandom() 29 { 30 return Convert.ToByte(new Random().Next(0, 255)); 31 } 32 33 private byte[] Package(HeadOptions head, byte id, StateOptions state, byte value) 34 { 35 byte[] byteData = new byte[5]; 36 byteData[0] = FrameHead; 37 byteData[1] = (byte)head; 38 byteData[2] = (byte)state; 39 byteData[3] = id; 40 byteData[4] = value; 41 return byteData; 42 } 43 44 #region 发送 45 private void SendMsg(byte[] sendbytes) 46 { 47 if (uclientSend == null) 48 uclientSend = new UdpClient(); 49 uclientSend.BeginSend(sendbytes, sendbytes.Length, _RemoteIPEndPoint, new AsyncCallback(SendCallback), uclientSend); 50 sendDone.WaitOne(); 51 } 52 53 private void SendCallback(IAsyncResult iar) 54 { 55 UdpClient u = (UdpClient)iar.AsyncState; 56 string msg = string.Format("{0}: Frameid={1} : value ={2}number of bytes sent: {3}", state.ToString(), frameid, value, u.EndSend(iar)); 57 trevent.RaiseEvent((byte)state,0,msg); 58 Console.WriteLine(msg); 59 sendDone.Set(); 60 } 61 #endregion 62 63 #region 接收 64 public void ReceiveMsg() 65 { 66 IPEndPoint e = new IPEndPoint(IPAddress.Any, listenPort); 67 uclientReceive = new UdpClient(e); 68 while (true) 69 { 70 uclientReceive.BeginReceive(new AsyncCallback(ReceiveCallback), uclientReceive); 71 receiveDone.WaitOne(); 72 Thread.Sleep(100); 73 } 74 75 } 76 77 private void ReceiveCallback(IAsyncResult iar) 78 { 79 UdpClient u = (UdpClient)iar.AsyncState; 80 IPEndPoint e = null; 81 if (iar.IsCompleted) 82 { 83 Byte[] receiveBytes = u.EndReceive(iar, ref e); 84 if (CheckFrame(receiveBytes)) 85 { 86 string msg = string.Format("Received: headoption:{0}-state:{1}-{2}-{3}", receiveBytes[1], receiveBytes[2], receiveBytes[3], receiveBytes[4]); 87 Console.WriteLine(msg); 88 trevent.RaiseEvent((byte)state, 0, msg); 89 AnalysisFrame(receiveBytes[1], receiveBytes[2], receiveBytes[3], receiveBytes[4]); 90 } 91 } 92 receiveDone.Set(); 93 } 94 #endregion
1 public class SendFrame 2 { 3 private int _ID; 4 5 public int ID 6 { 7 get { return _ID; } 8 set { _ID = value; } 9 } 10 11 private DateTime _SendTime; 12 13 public DateTime SendTime 14 { 15 get { return _SendTime; } 16 set { _SendTime = value; } 17 } 18 19 public SendFrame(int id, DateTime datetime) 20 { 21 this.ID = id; 22 this.SendTime = datetime; 23 } 24 25 public override int GetHashCode() 26 { 27 return ID; 28 } 29 30 public override bool Equals(object obj) 31 { 32 if (obj == null) return false; 33 SendFrame objAsFrame = obj as SendFrame; 34 if (objAsFrame == null) return false; 35 else return Equals(objAsFrame); 36 } 37 38 public bool Equals(SendFrame other) 39 { 40 if (other == null) return false; 41 return (this.ID.Equals(other.ID)); 42 } 43 } 44 45 //以下为ASNegotiate类内容定义一个发送的对象的list 46 private List<SendFrame> listSend = new List<SendFrame>(); 47 private List<int> listRemaindel = new List<int>(); 48 public void ReceiptTimeout() 49 { 50 while (true) 51 { 52 if (state == StateOptions.Standby) 53 { 54 if ((DateTime.Now - lastConnectTime).TotalSeconds > 60) 55 { 56 state = StateOptions.Active; 57 string msg = "失去连接,我要激活"; 58 Console.WriteLine(msg); 59 trevent.RaiseEvent((byte)state, 0, msg); 60 } 61 } 62 listRemaindel.Clear(); 63 lock (lockList) 64 { 65 foreach (SendFrame _frame in listSend) 66 { 67 if ((DateTime.Now - _frame.SendTime).TotalSeconds > 10) 68 { 69 lossFramecount++; 70 listRemaindel.Add(_frame.ID); 71 } 72 } 73 foreach (int _frameid in listRemaindel) 74 { 75 listSend.Remove(new SendFrame(_frameid, DateTime.Now)); 76 } 77 if (lossFramecount > 5) 78 { 79 lossFramecount = 0; 80 if (state == StateOptions.Active) 81 { 82 string msg = "standy 端超时"; 83 Console.WriteLine(msg); 84 trevent.RaiseEvent((byte)state, 0, msg); 85 } 86 else if (state == StateOptions.Initial) 87 { 88 state = StateOptions.Active; 89 initial = false; 90 string msg = "无法协商,我要激活"; 91 Console.WriteLine(msg); 92 trevent.RaiseEvent((byte)state, 0, msg); 93 } 94 } 95 } 96 Thread.Sleep(1000); 97 } 98 }
事件通知
1 public class StateEvent 2 { 3 //定义事件参数类 4 public class StateEventArgs : EventArgs 5 { 6 public readonly byte state; 7 8 public readonly byte online; 9 10 public readonly string msg; 11 12 public StateEventArgs(byte _state, byte _online, string value) 13 { 14 state = _state; 15 online = _online; 16 msg = value; 17 } 18 } 19 20 public delegate void MyEventHandler(object sender, StateEventArgs e); 21 22 //用event 关键字声明事件对象 23 24 public event MyEventHandler ASEvent; 25 26 //事件触发方法 27 protected virtual void OnTestEvent(StateEventArgs e) 28 { 29 30 if (ASEvent != null) 31 ASEvent(this, e); 32 33 } 34 35 //引发事件 36 public void RaiseEvent(byte _state, byte _online, string value) 37 { 38 StateEventArgs e = new StateEventArgs(_state, _online, value); 39 OnTestEvent(e); 40 } 41 }
调用方式,采用winform窗体调用,订阅事件,注意事件响应回显非界面线程,窗体显示内容需要invoke。
1 private void button3_Click(object sender, EventArgs e) 2 { 3 string remoteip = this.txtbrotherip.Text.Trim(); 4 ASNegotiate m_negotiate = new ASNegotiate(remoteip, 0); 5 m_negotiate.trevent.ASEvent += new StateEvent.MyEventHandler(GenerateEvent); 6 m_negotiate.Start(); 7 } 8 9 public void GenerateEvent(object sender, StateEvent.StateEventArgs e) 10 { 11 string state = ""; 12 string online = ""; 13 if (e.state == 0x00) 14 { 15 state = "未指定"; 16 } 17 else if (e.state == 0x01) 18 { 19 state = "主"; 20 } 21 else if (e.state == 0x02) 22 { 23 state = "备"; 24 } 25 else 26 { 27 state = ""; 28 } 29 if (e.online == 0x00) 30 { 31 online = "兄弟在线"; 32 } 33 else if (e.online == 0x01) 34 { 35 online = "兄弟离线"; 36 } 37 else 38 { 39 online = ""; 40 } 41 lock (objlock) 42 { 43 stateout(state); 44 onlineout(online); 45 MessageOut(e.msg); 46 } 47 }
标签:
原文地址:http://www.cnblogs.com/beeshow00/p/4498886.html