feat: Simple implementation of heartbeats
This commit is contained in:
parent
3d6dbc5aa5
commit
a15c0c2a87
@ -16,12 +16,15 @@ namespace ArStomp
|
||||
/// </summary>
|
||||
public class StompClient
|
||||
{
|
||||
private int heartBeatInMillis;
|
||||
public static bool Debug { get; set; } = false;
|
||||
private WebSocket ws;
|
||||
private volatile bool stompConnected = false;
|
||||
public CancellationTokenSource Token { get; } = new CancellationTokenSource();
|
||||
private readonly X509Certificate2Collection certCollection;
|
||||
private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>();
|
||||
private DateTime lastSend = DateTime.UtcNow;
|
||||
private DateTime lastRcvd = DateTime.UtcNow;
|
||||
/// <summary>
|
||||
/// Handler of incoming messages.
|
||||
/// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers.
|
||||
@ -33,14 +36,12 @@ namespace ArStomp
|
||||
/// Works with binary frames.
|
||||
/// </summary>
|
||||
/// <param name="certCollection">collection of root ca certificates (if TLS is used)</param>
|
||||
public StompClient(X509Certificate2Collection certCollection = null)
|
||||
public StompClient(X509Certificate2Collection certCollection = null, int heartBeatSec = 20)
|
||||
{
|
||||
this.heartBeatInMillis = heartBeatSec * 1000;
|
||||
if (certCollection != null && certCollection.Count > 0)
|
||||
{
|
||||
this.certCollection = certCollection;
|
||||
|
||||
// In .netstandard2.0 - setup validator globally
|
||||
// System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
|
||||
}
|
||||
}
|
||||
|
||||
@ -101,7 +102,7 @@ namespace ArStomp
|
||||
bool anytrusted = false;
|
||||
foreach (var cert in certCollection)
|
||||
{
|
||||
if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(),cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint));
|
||||
if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(), cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint));
|
||||
anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint);
|
||||
}
|
||||
if (!anytrusted)
|
||||
@ -145,7 +146,7 @@ namespace ArStomp
|
||||
public async Task Connect(Uri uri, string login, string password)
|
||||
{
|
||||
if (ws != null) throw new Exception("Cannot connect in this state. Should close before");
|
||||
ws = new WebSocket( uri.ToString(), "v12.stomp");
|
||||
ws = new WebSocket(uri.ToString(), "v12.stomp");
|
||||
if (uri.Scheme == "wss")
|
||||
{
|
||||
if (certCollection != null)
|
||||
@ -172,12 +173,14 @@ namespace ArStomp
|
||||
ws.OnOpen += (sender, o) =>
|
||||
{
|
||||
if (Debug) System.Console.WriteLine("OnOpen");
|
||||
StompFrm connect = new StompFrm(login, password);
|
||||
StompFrm connect = new StompFrm(login, password, heartBeatInMillis);
|
||||
lastSend = DateTime.UtcNow;
|
||||
Task.Run(() => { return connect.Serialize(ws, ct); });
|
||||
};
|
||||
|
||||
ws.OnMessage += (sender, o) =>
|
||||
{
|
||||
lastRcvd = DateTime.UtcNow;
|
||||
try
|
||||
{
|
||||
if (Debug)
|
||||
@ -195,7 +198,8 @@ namespace ArStomp
|
||||
ExpectFrame(fr, FrameType.Connected);
|
||||
openTask.TrySetResult(true);
|
||||
if (Debug) System.Console.WriteLine("Logon done");
|
||||
} catch (Exception e)
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
openTask.TrySetException(e);
|
||||
}
|
||||
@ -224,7 +228,7 @@ namespace ArStomp
|
||||
/// <returns>true if it looks like we have proper connection to server</returns>
|
||||
public bool IsConnected()
|
||||
{
|
||||
return ws != null && ws.IsAlive;
|
||||
return ws != null && ws.IsAlive && (DateTime.UtcNow - lastRcvd) < TimeSpan.FromMilliseconds(3 * heartBeatInMillis);
|
||||
}
|
||||
/// <summary>
|
||||
/// Send message
|
||||
@ -238,6 +242,7 @@ namespace ArStomp
|
||||
|
||||
SendFrm send = new SendFrm(destination, correlationId, body);
|
||||
await send.Serialize(ws, ct);
|
||||
lastSend = DateTime.UtcNow;
|
||||
}
|
||||
|
||||
private int SubId = 0;
|
||||
@ -308,6 +313,12 @@ namespace ArStomp
|
||||
}
|
||||
}
|
||||
}
|
||||
if (fr.Type == FrameType.Heartbeat || DateTime.UtcNow - lastSend > TimeSpan.FromSeconds(20))
|
||||
{
|
||||
lastSend = DateTime.UtcNow;
|
||||
ws.Send(new byte[] { 10 }); // send heartbeat
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
catch (Exception e)
|
||||
|
@ -103,18 +103,18 @@ namespace ArStomp
|
||||
stream.Position = 0;
|
||||
ws.Send(stream, (int) stream.Length);
|
||||
return Task.CompletedTask;
|
||||
//return ws.SendAsync(new ArraySegment<byte>(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
internal class StompFrm : Frame
|
||||
{
|
||||
public StompFrm(string login, string passwd)
|
||||
public StompFrm(string login, string passwd, int hbmilis)
|
||||
{
|
||||
Type = FrameType.Stomp;
|
||||
Headers["login"] = login;
|
||||
Headers["passcode"] = passwd;
|
||||
Headers["accept-version"] = "1.2";
|
||||
if (hbmilis > 0) Headers["heart-beat"] = $"{hbmilis},{hbmilis}";
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user