From a15c0c2a87bde001202cad0f3ae28bb6b6f528c4 Mon Sep 17 00:00:00 2001 From: Arek Date: Tue, 1 Jun 2021 16:53:51 +0200 Subject: [PATCH] feat: Simple implementation of heartbeats --- arstomp/src/StompClient.cs | 43 ++++++++++++++++++++++++-------------- arstomp/src/StompFrame.cs | 4 ++-- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/arstomp/src/StompClient.cs b/arstomp/src/StompClient.cs index c12f9fe..3fdf878 100644 --- a/arstomp/src/StompClient.cs +++ b/arstomp/src/StompClient.cs @@ -16,12 +16,15 @@ namespace ArStomp /// public class StompClient { + private int heartBeatInMillis; public static bool Debug { get; set; } = false; private WebSocket ws; private volatile bool stompConnected = false; public CancellationTokenSource Token { get; } = new CancellationTokenSource(); private readonly X509Certificate2Collection certCollection; private readonly Dictionary subs = new Dictionary(); + private DateTime lastSend = DateTime.UtcNow; + private DateTime lastRcvd = DateTime.UtcNow; /// /// Handler of incoming messages. /// Used only for RPC. Subsscriptions have own handlers. @@ -33,14 +36,12 @@ namespace ArStomp /// Works with binary frames. /// /// collection of root ca certificates (if TLS is used) - public StompClient(X509Certificate2Collection certCollection = null) + public StompClient(X509Certificate2Collection certCollection = null, int heartBeatSec = 20) { + this.heartBeatInMillis = heartBeatSec * 1000; if (certCollection != null && certCollection.Count > 0) { - this.certCollection = certCollection; - - // In .netstandard2.0 - setup validator globally - // System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; + this.certCollection = certCollection; } } @@ -67,10 +68,10 @@ namespace ArStomp private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { if (Debug) - { + { System.Console.WriteLine("Subject: {0}", certificate.Subject.ToString()); System.Console.WriteLine("Cert: {0}", certificate.ToString()); - } + } // if there is no detected problems we can say OK if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) { @@ -88,20 +89,20 @@ namespace ArStomp return false; } if (Debug) - { + { System.Console.WriteLine("Chain:"); foreach (var ce in chain.ChainElements) - { + { System.Console.WriteLine("Element: {0}", ce.Certificate); - } - } + } + } // last certificate in chain should be one of our trust anchors X509Certificate2 projectedRootCert = chain.ChainElements[chain.ChainElements.Count - 1].Certificate; // check if server's root ca is one of our trusted bool anytrusted = false; foreach (var cert in certCollection) { - if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(),cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint)); + if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(), cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint)); anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint); } if (!anytrusted) @@ -145,7 +146,7 @@ namespace ArStomp public async Task Connect(Uri uri, string login, string password) { if (ws != null) throw new Exception("Cannot connect in this state. Should close before"); - ws = new WebSocket( uri.ToString(), "v12.stomp"); + ws = new WebSocket(uri.ToString(), "v12.stomp"); if (uri.Scheme == "wss") { if (certCollection != null) @@ -172,12 +173,14 @@ namespace ArStomp ws.OnOpen += (sender, o) => { if (Debug) System.Console.WriteLine("OnOpen"); - StompFrm connect = new StompFrm(login, password); + StompFrm connect = new StompFrm(login, password, heartBeatInMillis); + lastSend = DateTime.UtcNow; Task.Run(() => { return connect.Serialize(ws, ct); }); }; ws.OnMessage += (sender, o) => { + lastRcvd = DateTime.UtcNow; try { if (Debug) @@ -195,7 +198,8 @@ namespace ArStomp ExpectFrame(fr, FrameType.Connected); openTask.TrySetResult(true); if (Debug) System.Console.WriteLine("Logon done"); - } catch (Exception e) + } + catch (Exception e) { openTask.TrySetException(e); } @@ -224,7 +228,7 @@ namespace ArStomp /// true if it looks like we have proper connection to server public bool IsConnected() { - return ws != null && ws.IsAlive; + return ws != null && ws.IsAlive && (DateTime.UtcNow - lastRcvd) < TimeSpan.FromMilliseconds(3 * heartBeatInMillis); } /// /// Send message @@ -238,6 +242,7 @@ namespace ArStomp SendFrm send = new SendFrm(destination, correlationId, body); await send.Serialize(ws, ct); + lastSend = DateTime.UtcNow; } private int SubId = 0; @@ -308,6 +313,12 @@ namespace ArStomp } } } + if (fr.Type == FrameType.Heartbeat || DateTime.UtcNow - lastSend > TimeSpan.FromSeconds(20)) + { + lastSend = DateTime.UtcNow; + ws.Send(new byte[] { 10 }); // send heartbeat + } + } catch (Exception e) diff --git a/arstomp/src/StompFrame.cs b/arstomp/src/StompFrame.cs index e67a4b1..921e9f5 100644 --- a/arstomp/src/StompFrame.cs +++ b/arstomp/src/StompFrame.cs @@ -103,18 +103,18 @@ namespace ArStomp stream.Position = 0; ws.Send(stream, (int) stream.Length); return Task.CompletedTask; - //return ws.SendAsync(new ArraySegment(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken); } } internal class StompFrm : Frame { - public StompFrm(string login, string passwd) + public StompFrm(string login, string passwd, int hbmilis) { Type = FrameType.Stomp; Headers["login"] = login; Headers["passcode"] = passwd; Headers["accept-version"] = "1.2"; + if (hbmilis > 0) Headers["heart-beat"] = $"{hbmilis},{hbmilis}"; } }