Compare commits
No commits in common. "5de224de85938f3ddc5b5bae1b78ae804db20a24" and "c7a45b4663edb47b908c47401dcc476c57fd54e9" have entirely different histories.
5de224de85
...
c7a45b4663
@ -81,23 +81,19 @@ namespace ArStomp
|
|||||||
|
|
||||||
internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken)
|
internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
var inputstream = new MemoryStream(msgBuffer);
|
var utf8 = Encoding.UTF8;
|
||||||
|
|
||||||
var firstByte = inputstream.ReadByte();
|
var inputstream = new MemoryStream(msgBuffer);
|
||||||
if (firstByte == 10)
|
var bodyoutput = new MemoryStream();
|
||||||
|
|
||||||
|
if (inputstream.ReadByte() == 10)
|
||||||
{
|
{
|
||||||
return HeartbeatFrame;
|
return HeartbeatFrame;
|
||||||
}
|
}
|
||||||
else if (firstByte == 13) {
|
else
|
||||||
var secondByte = inputstream.ReadByte();
|
{
|
||||||
if (secondByte == 10) {
|
|
||||||
return HeartbeatFrame;
|
|
||||||
} else {
|
|
||||||
throw new Exception("Invalid frame");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// start from beginning
|
|
||||||
inputstream.Seek(0, SeekOrigin.Begin);
|
inputstream.Seek(0, SeekOrigin.Begin);
|
||||||
|
}
|
||||||
|
|
||||||
StreamReader reader = findBody(inputstream);
|
StreamReader reader = findBody(inputstream);
|
||||||
|
|
||||||
@ -119,7 +115,7 @@ namespace ArStomp
|
|||||||
var colon = line.IndexOf(":");
|
var colon = line.IndexOf(":");
|
||||||
if (colon < 1) // must exist and cannot by first character in the line
|
if (colon < 1) // must exist and cannot by first character in the line
|
||||||
{
|
{
|
||||||
throw new Exception($"Cannot parse header: colon not found. Cmd: {cmd}, line: `${line}`");
|
throw new Exception("Cannot parse header");
|
||||||
}
|
}
|
||||||
var key = line.Substring(0, colon);
|
var key = line.Substring(0, colon);
|
||||||
var value = line.Substring(colon + 1);
|
var value = line.Substring(colon + 1);
|
||||||
|
@ -16,15 +16,12 @@ namespace ArStomp
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class StompClient
|
public class StompClient
|
||||||
{
|
{
|
||||||
private int heartBeatInMillis;
|
|
||||||
public static bool Debug { get; set; } = false;
|
public static bool Debug { get; set; } = false;
|
||||||
private WebSocket ws;
|
private WebSocket ws;
|
||||||
private volatile bool stompConnected = false;
|
private volatile bool stompConnected = false;
|
||||||
public CancellationTokenSource Token { get; } = new CancellationTokenSource();
|
public CancellationTokenSource Token { get; } = new CancellationTokenSource();
|
||||||
private readonly X509Certificate2Collection certCollection;
|
private readonly X509Certificate2Collection certCollection;
|
||||||
private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>();
|
private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>();
|
||||||
private DateTime lastSend = DateTime.UtcNow;
|
|
||||||
private DateTime lastRcvd = DateTime.UtcNow;
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Handler of incoming messages.
|
/// Handler of incoming messages.
|
||||||
/// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers.
|
/// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers.
|
||||||
@ -36,12 +33,14 @@ namespace ArStomp
|
|||||||
/// Works with binary frames.
|
/// Works with binary frames.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="certCollection">collection of root ca certificates (if TLS is used)</param>
|
/// <param name="certCollection">collection of root ca certificates (if TLS is used)</param>
|
||||||
public StompClient(X509Certificate2Collection certCollection = null, int heartBeatSec = 20)
|
public StompClient(X509Certificate2Collection certCollection = null)
|
||||||
{
|
{
|
||||||
this.heartBeatInMillis = heartBeatSec * 1000;
|
|
||||||
if (certCollection != null && certCollection.Count > 0)
|
if (certCollection != null && certCollection.Count > 0)
|
||||||
{
|
{
|
||||||
this.certCollection = certCollection;
|
this.certCollection = certCollection;
|
||||||
|
|
||||||
|
// In .netstandard2.0 - setup validator globally
|
||||||
|
// System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,55 +66,25 @@ namespace ArStomp
|
|||||||
/// <returns>true if server certificate is valid, false otherwise</returns>
|
/// <returns>true if server certificate is valid, false otherwise</returns>
|
||||||
private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
|
private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
|
||||||
{
|
{
|
||||||
if (Debug)
|
if (Debug) Console.WriteLine("Custom RemoteCertificateValidationCallback");
|
||||||
{
|
|
||||||
System.Console.WriteLine("Subject: {0}", certificate.Subject.ToString());
|
|
||||||
System.Console.WriteLine("Cert: {0}", certificate.ToString());
|
|
||||||
}
|
|
||||||
// if there is no detected problems we can say OK
|
// if there is no detected problems we can say OK
|
||||||
if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0)
|
if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true;
|
||||||
{
|
|
||||||
if (Debug) System.Console.WriteLine("Cert OK: ((sslPolicyErrors & (SslPolicyErrors.None)) > 0)");
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// sins that cannot be forgiven
|
// sins that cannot be forgiven
|
||||||
if (
|
if (
|
||||||
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 ||
|
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 ||
|
||||||
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0
|
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0
|
||||||
)
|
) return false;
|
||||||
{
|
|
||||||
if (Debug) System.Console.WriteLine("Cert Fail: (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 - {0}", (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0);
|
|
||||||
if (Debug) System.Console.WriteLine("Cert Fail: (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0 - {0}", (sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (Debug)
|
|
||||||
{
|
|
||||||
System.Console.WriteLine("Chain:");
|
|
||||||
foreach (var ce in chain.ChainElements)
|
|
||||||
{
|
|
||||||
System.Console.WriteLine("Element: {0}", ce.Certificate);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// last certificate in chain should be one of our trust anchors
|
// last certificate in chain should be one of our trust anchors
|
||||||
X509Certificate2 projectedRootCert = chain.ChainElements[chain.ChainElements.Count - 1].Certificate;
|
X509Certificate2 projectedRootCert = chain.ChainElements[chain.ChainElements.Count - 1].Certificate;
|
||||||
// check if server's root ca is one of our trusted
|
// check if server's root ca is one of our trusted
|
||||||
bool anytrusted = false;
|
bool anytrusted = false;
|
||||||
foreach (var cert in certCollection)
|
foreach (var cert in certCollection)
|
||||||
{
|
{
|
||||||
if (Debug) System.Console.WriteLine("Anytrust: {0}, {1} =? {2}", projectedRootCert.Thumbprint.ToString(), cert.Thumbprint.ToString(), (projectedRootCert.Thumbprint == cert.Thumbprint));
|
|
||||||
anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint);
|
anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint);
|
||||||
}
|
}
|
||||||
if (!anytrusted)
|
if (!anytrusted) return false;
|
||||||
{
|
|
||||||
if (Debug) System.Console.WriteLine("Cert Fail: (!anytrusted)");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// any other problems than unknown CA?
|
// any other problems than unknown CA?
|
||||||
if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot))
|
if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false;
|
||||||
{
|
|
||||||
if (Debug) System.Console.WriteLine("Cert Fail: chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)");
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// everything OK
|
// everything OK
|
||||||
if (Debug) Console.WriteLine("Certificate OK");
|
if (Debug) Console.WriteLine("Certificate OK");
|
||||||
return true;
|
return true;
|
||||||
@ -146,13 +115,10 @@ namespace ArStomp
|
|||||||
public async Task Connect(Uri uri, string login, string password)
|
public async Task Connect(Uri uri, string login, string password)
|
||||||
{
|
{
|
||||||
if (ws != null) throw new Exception("Cannot connect in this state. Should close before");
|
if (ws != null) throw new Exception("Cannot connect in this state. Should close before");
|
||||||
ws = new WebSocket(uri.ToString(), "v12.stomp");
|
ws = new WebSocket( uri.ToString(), "v12.stomp");
|
||||||
if (uri.Scheme == "wss")
|
if (uri.Scheme == "wss" && certCollection != null)
|
||||||
{
|
|
||||||
if (certCollection != null)
|
|
||||||
{
|
{
|
||||||
ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
|
ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
|
||||||
}
|
|
||||||
ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12;
|
ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12;
|
||||||
}
|
}
|
||||||
var ct = Token.Token;
|
var ct = Token.Token;
|
||||||
@ -173,14 +139,12 @@ namespace ArStomp
|
|||||||
ws.OnOpen += (sender, o) =>
|
ws.OnOpen += (sender, o) =>
|
||||||
{
|
{
|
||||||
if (Debug) System.Console.WriteLine("OnOpen");
|
if (Debug) System.Console.WriteLine("OnOpen");
|
||||||
StompFrm connect = new StompFrm(login, password, heartBeatInMillis);
|
StompFrm connect = new StompFrm(login, password);
|
||||||
lastSend = DateTime.UtcNow;
|
|
||||||
Task.Run(() => { return connect.Serialize(ws, ct); });
|
Task.Run(() => { return connect.Serialize(ws, ct); });
|
||||||
};
|
};
|
||||||
|
|
||||||
ws.OnMessage += (sender, o) =>
|
ws.OnMessage += (sender, o) =>
|
||||||
{
|
{
|
||||||
lastRcvd = DateTime.UtcNow;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (Debug)
|
if (Debug)
|
||||||
@ -198,8 +162,7 @@ namespace ArStomp
|
|||||||
ExpectFrame(fr, FrameType.Connected);
|
ExpectFrame(fr, FrameType.Connected);
|
||||||
openTask.TrySetResult(true);
|
openTask.TrySetResult(true);
|
||||||
if (Debug) System.Console.WriteLine("Logon done");
|
if (Debug) System.Console.WriteLine("Logon done");
|
||||||
}
|
} catch (Exception e)
|
||||||
catch (Exception e)
|
|
||||||
{
|
{
|
||||||
openTask.TrySetException(e);
|
openTask.TrySetException(e);
|
||||||
}
|
}
|
||||||
@ -228,7 +191,7 @@ namespace ArStomp
|
|||||||
/// <returns>true if it looks like we have proper connection to server</returns>
|
/// <returns>true if it looks like we have proper connection to server</returns>
|
||||||
public bool IsConnected()
|
public bool IsConnected()
|
||||||
{
|
{
|
||||||
return ws != null && ws.IsAlive && (DateTime.UtcNow - lastRcvd) < TimeSpan.FromMilliseconds(3 * heartBeatInMillis);
|
return ws != null && ws.IsAlive;
|
||||||
}
|
}
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Send message
|
/// Send message
|
||||||
@ -242,7 +205,6 @@ namespace ArStomp
|
|||||||
|
|
||||||
SendFrm send = new SendFrm(destination, correlationId, body);
|
SendFrm send = new SendFrm(destination, correlationId, body);
|
||||||
await send.Serialize(ws, ct);
|
await send.Serialize(ws, ct);
|
||||||
lastSend = DateTime.UtcNow;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private int SubId = 0;
|
private int SubId = 0;
|
||||||
@ -286,8 +248,15 @@ namespace ArStomp
|
|||||||
var ct = Token.Token;
|
var ct = Token.Token;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Frame fr = Helpers.GetFrame(msg.RawData, ct);
|
Frame fr = null;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
fr = Helpers.GetFrame(msg.RawData, ct);
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
|
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
|
||||||
if (fr.Type == FrameType.Message)
|
if (fr.Type == FrameType.Message)
|
||||||
{
|
{
|
||||||
@ -306,12 +275,6 @@ namespace ArStomp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (fr.Type == FrameType.Heartbeat || DateTime.UtcNow - lastSend > TimeSpan.FromSeconds(20))
|
|
||||||
{
|
|
||||||
lastSend = DateTime.UtcNow;
|
|
||||||
ws.Send(new byte[] { 10 }); // send heartbeat
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception e)
|
catch (Exception e)
|
||||||
|
@ -103,18 +103,18 @@ namespace ArStomp
|
|||||||
stream.Position = 0;
|
stream.Position = 0;
|
||||||
ws.Send(stream, (int) stream.Length);
|
ws.Send(stream, (int) stream.Length);
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
|
//return ws.SendAsync(new ArraySegment<byte>(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
internal class StompFrm : Frame
|
internal class StompFrm : Frame
|
||||||
{
|
{
|
||||||
public StompFrm(string login, string passwd, int hbmilis)
|
public StompFrm(string login, string passwd)
|
||||||
{
|
{
|
||||||
Type = FrameType.Stomp;
|
Type = FrameType.Stomp;
|
||||||
Headers["login"] = login;
|
Headers["login"] = login;
|
||||||
Headers["passcode"] = passwd;
|
Headers["passcode"] = passwd;
|
||||||
Headers["accept-version"] = "1.2";
|
Headers["accept-version"] = "1.2";
|
||||||
if (hbmilis > 0) Headers["heart-beat"] = $"{hbmilis},{hbmilis}";
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user