Compare commits

...

5 Commits

Author SHA1 Message Date
Arkadiusz Rychliński
5de224de85 chore: Remove some unused variables 2021-12-30 14:48:26 +01:00
Arek
6884d5715f fix: Check for 13/10 EOL in heartbeat frame 2021-06-27 16:23:56 +02:00
Arek
8b7d743dad feat: slightly more expressive exception message
More datails in exception during header parsing.
2021-06-27 16:06:11 +02:00
Arek
a15c0c2a87 feat: Simple implementation of heartbeats 2021-06-01 16:53:51 +02:00
Arek
3d6dbc5aa5 fix: EnabledSslProtocols now is set always when wss scheme is used 2021-05-21 11:11:29 +02:00
3 changed files with 76 additions and 35 deletions

View File

@ -81,19 +81,23 @@ namespace ArStomp
internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken) internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken)
{ {
var utf8 = Encoding.UTF8;
var inputstream = new MemoryStream(msgBuffer); var inputstream = new MemoryStream(msgBuffer);
var bodyoutput = new MemoryStream();
if (inputstream.ReadByte() == 10) var firstByte = inputstream.ReadByte();
if (firstByte == 10)
{ {
return HeartbeatFrame; return HeartbeatFrame;
} }
else else if (firstByte == 13) {
{ var secondByte = inputstream.ReadByte();
inputstream.Seek(0, SeekOrigin.Begin); if (secondByte == 10) {
return HeartbeatFrame;
} else {
throw new Exception("Invalid frame");
}
} }
// start from beginning
inputstream.Seek(0, SeekOrigin.Begin);
StreamReader reader = findBody(inputstream); StreamReader reader = findBody(inputstream);
@ -115,7 +119,7 @@ namespace ArStomp
var colon = line.IndexOf(":"); var colon = line.IndexOf(":");
if (colon < 1) // must exist and cannot by first character in the line if (colon < 1) // must exist and cannot by first character in the line
{ {
throw new Exception("Cannot parse header"); throw new Exception($"Cannot parse header: colon not found. Cmd: {cmd}, line: `${line}`");
} }
var key = line.Substring(0, colon); var key = line.Substring(0, colon);
var value = line.Substring(colon + 1); var value = line.Substring(colon + 1);

View File

@ -16,12 +16,15 @@ namespace ArStomp
/// </summary> /// </summary>
public class StompClient public class StompClient
{ {
private int heartBeatInMillis;
public static bool Debug { get; set; } = false; public static bool Debug { get; set; } = false;
private WebSocket ws; private WebSocket ws;
private volatile bool stompConnected = false; private volatile bool stompConnected = false;
public CancellationTokenSource Token { get; } = new CancellationTokenSource(); public CancellationTokenSource Token { get; } = new CancellationTokenSource();
private readonly X509Certificate2Collection certCollection; private readonly X509Certificate2Collection certCollection;
private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>(); private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>();
private DateTime lastSend = DateTime.UtcNow;
private DateTime lastRcvd = DateTime.UtcNow;
/// <summary> /// <summary>
/// Handler of incoming messages. /// Handler of incoming messages.
/// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers. /// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers.
@ -33,14 +36,12 @@ namespace ArStomp
/// Works with binary frames. /// Works with binary frames.
/// </summary> /// </summary>
/// <param name="certCollection">collection of root ca certificates (if TLS is used)</param> /// <param name="certCollection">collection of root ca certificates (if TLS is used)</param>
public StompClient(X509Certificate2Collection certCollection = null) public StompClient(X509Certificate2Collection certCollection = null, int heartBeatSec = 20)
{ {
this.heartBeatInMillis = heartBeatSec * 1000;
if (certCollection != null && certCollection.Count > 0) if (certCollection != null && certCollection.Count > 0)
{ {
this.certCollection = certCollection; this.certCollection = certCollection;
// In .netstandard2.0 - setup validator globally
// System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
} }
} }
@ -66,25 +67,55 @@ namespace ArStomp
/// <returns>true if server certificate is valid, false otherwise</returns> /// <returns>true if server certificate is valid, false otherwise</returns>
private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{ {
if (Debug) Console.WriteLine("Custom RemoteCertificateValidationCallback"); if (Debug)
{
System.Console.WriteLine("Subject: {0}", certificate.Subject.ToString());
System.Console.WriteLine("Cert: {0}", certificate.ToString());
}
// if there is no detected problems we can say OK // if there is no detected problems we can say OK
if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true; if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0)
{
if (Debug) System.Console.WriteLine("Cert OK: ((sslPolicyErrors & (SslPolicyErrors.None)) > 0)");
return true;
}
// sins that cannot be forgiven // sins that cannot be forgiven
if ( if (
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 || (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 ||
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0 (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0
) return false; )
{
if (Debug) System.Console.WriteLine("Cert Fail: (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 - {0}", (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0);
if (Debug) System.Console.WriteLine("Cert Fail: (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0 - {0}", (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0);
return false;
}
if (Debug)
{
System.Console.WriteLine("Chain:");
foreach (var ce in chain.ChainElements)
{
System.Console.WriteLine("Element: {0}", ce.Certificate);
}
}
// last certificate in chain should be one of our trust anchors // last certificate in chain should be one of our trust anchors
X509Certificate2 projectedRootCert = chain.ChainElements[chain.ChainElements.Count - 1].Certificate; X509Certificate2 projectedRootCert = chain.ChainElements[chain.ChainElements.Count - 1].Certificate;
// check if server's root ca is one of our trusted // check if server's root ca is one of our trusted
bool anytrusted = false; bool anytrusted = false;
foreach (var cert in certCollection) foreach (var cert in certCollection)
{ {
if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(), cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint));
anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint); anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint);
} }
if (!anytrusted) return false; if (!anytrusted)
{
if (Debug) System.Console.WriteLine("Cert Fail: (!anytrusted)");
return false;
}
// any other problems than unknown CA? // any other problems than unknown CA?
if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false; if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot))
{
if (Debug) System.Console.WriteLine("Cert Fail: chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)");
return false;
}
// everything OK // everything OK
if (Debug) Console.WriteLine("Certificate OK"); if (Debug) Console.WriteLine("Certificate OK");
return true; return true;
@ -115,10 +146,13 @@ namespace ArStomp
public async Task Connect(Uri uri, string login, string password) public async Task Connect(Uri uri, string login, string password)
{ {
if (ws != null) throw new Exception("Cannot connect in this state. Should close before"); if (ws != null) throw new Exception("Cannot connect in this state. Should close before");
ws = new WebSocket( uri.ToString(), "v12.stomp"); ws = new WebSocket(uri.ToString(), "v12.stomp");
if (uri.Scheme == "wss" && certCollection != null) if (uri.Scheme == "wss")
{ {
ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; if (certCollection != null)
{
ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
}
ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12; ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12;
} }
var ct = Token.Token; var ct = Token.Token;
@ -139,12 +173,14 @@ namespace ArStomp
ws.OnOpen += (sender, o) => ws.OnOpen += (sender, o) =>
{ {
if (Debug) System.Console.WriteLine("OnOpen"); if (Debug) System.Console.WriteLine("OnOpen");
StompFrm connect = new StompFrm(login, password); StompFrm connect = new StompFrm(login, password, heartBeatInMillis);
lastSend = DateTime.UtcNow;
Task.Run(() => { return connect.Serialize(ws, ct); }); Task.Run(() => { return connect.Serialize(ws, ct); });
}; };
ws.OnMessage += (sender, o) => ws.OnMessage += (sender, o) =>
{ {
lastRcvd = DateTime.UtcNow;
try try
{ {
if (Debug) if (Debug)
@ -162,7 +198,8 @@ namespace ArStomp
ExpectFrame(fr, FrameType.Connected); ExpectFrame(fr, FrameType.Connected);
openTask.TrySetResult(true); openTask.TrySetResult(true);
if (Debug) System.Console.WriteLine("Logon done"); if (Debug) System.Console.WriteLine("Logon done");
} catch (Exception e) }
catch (Exception e)
{ {
openTask.TrySetException(e); openTask.TrySetException(e);
} }
@ -191,7 +228,7 @@ namespace ArStomp
/// <returns>true if it looks like we have proper connection to server</returns> /// <returns>true if it looks like we have proper connection to server</returns>
public bool IsConnected() public bool IsConnected()
{ {
return ws != null && ws.IsAlive; return ws != null && ws.IsAlive && (DateTime.UtcNow - lastRcvd) < TimeSpan.FromMilliseconds(3 * heartBeatInMillis);
} }
/// <summary> /// <summary>
/// Send message /// Send message
@ -205,6 +242,7 @@ namespace ArStomp
SendFrm send = new SendFrm(destination, correlationId, body); SendFrm send = new SendFrm(destination, correlationId, body);
await send.Serialize(ws, ct); await send.Serialize(ws, ct);
lastSend = DateTime.UtcNow;
} }
private int SubId = 0; private int SubId = 0;
@ -248,15 +286,8 @@ namespace ArStomp
var ct = Token.Token; var ct = Token.Token;
try try
{ {
Frame fr = null; Frame fr = Helpers.GetFrame(msg.RawData, ct);
try
{
fr = Helpers.GetFrame(msg.RawData, ct);
}
catch (Exception e)
{
throw e;
}
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message); if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
if (fr.Type == FrameType.Message) if (fr.Type == FrameType.Message)
{ {
@ -275,6 +306,12 @@ namespace ArStomp
} }
} }
} }
if (fr.Type == FrameType.Heartbeat || DateTime.UtcNow - lastSend > TimeSpan.FromSeconds(20))
{
lastSend = DateTime.UtcNow;
ws.Send(new byte[] { 10 }); // send heartbeat
}
} }
catch (Exception e) catch (Exception e)

View File

@ -103,18 +103,18 @@ namespace ArStomp
stream.Position = 0; stream.Position = 0;
ws.Send(stream, (int) stream.Length); ws.Send(stream, (int) stream.Length);
return Task.CompletedTask; return Task.CompletedTask;
//return ws.SendAsync(new ArraySegment<byte>(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken);
} }
} }
internal class StompFrm : Frame internal class StompFrm : Frame
{ {
public StompFrm(string login, string passwd) public StompFrm(string login, string passwd, int hbmilis)
{ {
Type = FrameType.Stomp; Type = FrameType.Stomp;
Headers["login"] = login; Headers["login"] = login;
Headers["passcode"] = passwd; Headers["passcode"] = passwd;
Headers["accept-version"] = "1.2"; Headers["accept-version"] = "1.2";
if (hbmilis > 0) Headers["heart-beat"] = $"{hbmilis},{hbmilis}";
} }
} }