diff --git a/.gitignore b/.gitignore index 88ec626..c5e5832 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ docs/ arstomp/obj/ arstomp/bin/ arstomp/src/internal +/tester diff --git a/arstomp/arstomp.csproj b/arstomp/arstomp.csproj index 0896a5a..c82c723 100644 --- a/arstomp/arstomp.csproj +++ b/arstomp/arstomp.csproj @@ -1,12 +1,12 @@ - Exe + Library netstandard2.0 - + diff --git a/arstomp/src/Helpers.cs b/arstomp/src/Helpers.cs index 6b2ea86..541caac 100644 --- a/arstomp/src/Helpers.cs +++ b/arstomp/src/Helpers.cs @@ -48,28 +48,6 @@ namespace ArStomp default: return "UNKNOWN"; }; } - private static async Task GetMessage(MemoryStream output, ClientWebSocket ws, CancellationToken cancellationToken) - { - var barray = new byte[128 * 1024]; - // read first frame - ArraySegment buffer = new ArraySegment(barray); - var result = await ws.ReceiveAsync(buffer, cancellationToken); - - if (result.CloseStatus != null) - { - throw new Exception($"Unexpected close: {result.CloseStatus}: {result.CloseStatusDescription}"); - } - - output.Write(barray, 0, result.Count); - - while (result.EndOfMessage != true) - { - buffer = new ArraySegment(barray); - result = await ws.ReceiveAsync(buffer, cancellationToken); - output.Write(barray, 0, result.Count); - } - output.Seek(0, SeekOrigin.Begin); - } private static StreamReader findBody(Stream input) { @@ -101,20 +79,12 @@ namespace ArStomp return new StreamReader(output, Encoding.UTF8); // return UTF8 reader } - internal static async Task GetFrame(ClientWebSocket ws, CancellationToken cancellationToken) + internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken) { var utf8 = Encoding.UTF8; - var inputstream = new MemoryStream(); + var inputstream = new MemoryStream(msgBuffer); var bodyoutput = new MemoryStream(); - try - { - await GetMessage(inputstream, ws, cancellationToken); - } - catch (TaskCanceledException) - { - return HeartbeatFrame; // just return empty frame - } if (inputstream.ReadByte() == 10) { diff --git a/arstomp/src/StompClient.cs b/arstomp/src/StompClient.cs index 50ff89e..60e4a52 100644 --- a/arstomp/src/StompClient.cs +++ b/arstomp/src/StompClient.cs @@ -16,7 +16,6 @@ namespace ArStomp /// public class StompClient { - private Task runner = null; public static bool Debug { get; set; } = false; private WebSocket ws; private volatile bool stompConnected = false; @@ -38,10 +37,10 @@ namespace ArStomp { if (certCollection != null && certCollection.Count > 0) { - this.certCollection = certCollection; + this.certCollection = certCollection; - // In .netstandard2.0 - setup validator globally - System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; + // In .netstandard2.0 - setup validator globally + // System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; } } @@ -67,6 +66,7 @@ namespace ArStomp /// true if server certificate is valid, false otherwise private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) { + if (Debug) Console.WriteLine("Custom RemoteCertificateValidationCallback"); // if there is no detected problems we can say OK if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true; // sins that cannot be forgiven @@ -86,6 +86,7 @@ namespace ArStomp // any other problems than unknown CA? if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false; // everything OK + if (Debug) Console.WriteLine("Certificate OK"); return true; } @@ -111,35 +112,78 @@ namespace ArStomp /// uri in format ws://host[:port][/path] or wss://host[:port][/path] /// login name /// password - public Task Connect(Uri uri, string login, string password) + public async Task Connect(Uri uri, string login, string password) { if (ws != null) throw new Exception("Cannot connect in this state. Should close before"); - ws = new WebSocket(uri.ToString(), "v12.stomp"); - + ws = new WebSocket( uri.ToString(), "v12.stomp"); + if (uri.Scheme == "wss" && certCollection != null) + { + ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; + ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12; + } var ct = Token.Token; + TaskCompletionSource openTask = new TaskCompletionSource(); ws.OnClose += (sender, o) => { + if (Debug) System.Console.WriteLine("OnClose: code: {0}, wasClean: {1}, reason: {2}", o.Code, o.WasClean, o.Reason); + Token.Cancel(); + }; + + ws.OnError += (sender, o) => + { + if (Debug) System.Console.WriteLine("OnError: {0}, {1}", o.Message, o.Exception); Token.Cancel(); }; ws.OnOpen += (sender, o) => { + if (Debug) System.Console.WriteLine("OnOpen"); StompFrm connect = new StompFrm(login, password); - Task.Run(async () => { await connect.Serialize(ws, ct);}); + Task.Run(() => { return connect.Serialize(ws, ct); }); }; ws.OnMessage += (sender, o) => { - //Frame fr = await Helpers.GetFrame(ws, ct); - //ExpectFrame(fr, FrameType.Connected); - System.Console.WriteLine("Msg: type {0} data: {1}", o.Type.ToString(), o.ToString()); + try + { + if (Debug) + { + string type = o.IsBinary ? "Binary" : (o.IsText ? "Text" : (o.IsPing ? "Ping" : "Unknown")); + string value = o.IsBinary ? $"byte[{o.RawData.Length}]" : (o.IsText ? o.Data : ""); + System.Console.WriteLine("OnMessage: type: {0} data: {1}", type, value); + } + if (!stompConnected) + { + Frame fr = Helpers.GetFrame(o.RawData, ct); + stompConnected = true; + try + { + ExpectFrame(fr, FrameType.Connected); + openTask.TrySetResult(true); + if (Debug) System.Console.WriteLine("Logon done"); + } catch (Exception e) + { + openTask.TrySetException(e); + } + return; + } + + ProcessMessage(o); + } + catch (Exception e) + { + Console.WriteLine("onMessage: Wyjątek {0}", e); + } + + }; ws.Connect(); - - //runner = Run(); // Run is async - return Task.CompletedTask; + CancellationTokenSource cts = new CancellationTokenSource(5000); // 5 seconds timeout + cts.Token.Register(() => openTask.TrySetCanceled(), useSynchronizationContext: false); + await openTask.Task; + if (Debug) System.Console.WriteLine("STOMP Connected"); } /// /// Reports state of conection @@ -147,7 +191,7 @@ namespace ArStomp /// true if it looks like we have proper connection to server public bool IsConnected() { - return ws.IsAlive; + return ws != null && ws.IsAlive; } /// /// Send message @@ -155,12 +199,12 @@ namespace ArStomp /// queue o exchange (eg /exchange/name/routing-key in case of RabbitMQ) /// property correlationId for the message /// content of the message - public Task Send(string destination, string correlationId, byte[] body) + public async Task Send(string destination, string correlationId, byte[] body) { var ct = Token.Token; SendFrm send = new SendFrm(destination, correlationId, body); - return send.Serialize(ws, ct); + await send.Serialize(ws, ct); } private int SubId = 0; @@ -199,45 +243,43 @@ namespace ArStomp } } - private async Task Run() + private void ProcessMessage(WebSocketSharp.MessageEventArgs msg) { var ct = Token.Token; try { - while (!ct.IsCancellationRequested) + Frame fr = null; + try { - Frame fr = null; - try + fr = Helpers.GetFrame(msg.RawData, ct); + } + catch (Exception e) + { + throw e; + } + if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message); + if (fr.Type == FrameType.Message) + { + if (fr.Headers.ContainsKey("subscription")) { - //fr = await Helpers.GetFrame(ws, ct); - } - catch (ThreadInterruptedException) - { - break; - } - if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message); - if (fr.Type == FrameType.Message) - { - if (fr.Headers.ContainsKey("subscription")) + if (fr.Headers["subscription"] == "/temp-queue/rpc-replies") { - if (fr.Headers["subscription"] == "/temp-queue/rpc-replies") - { - InvokeOnMessage(new SubscriptionEventArgs(fr)); - } - else - { - var sub = subs[fr.Headers["subscription"]]; - if (sub != null) sub.InvokeOnMessage(new SubscriptionEventArgs(fr)); - else Console.WriteLine("Nieoczekiwany komunikat {0}", fr); + InvokeOnMessage(new SubscriptionEventArgs(fr)); + } + else + { + var sub = subs[fr.Headers["subscription"]]; + if (sub != null) sub.InvokeOnMessage(new SubscriptionEventArgs(fr)); + else Console.WriteLine("Unexpected message {0}", fr); - } } } } + } - finally + catch (Exception e) { - await Close(); + System.Console.WriteLine("Excpetion: {0}", e); } } /// @@ -251,6 +293,8 @@ namespace ArStomp Token.Cancel(); var ct = new CancellationTokenSource().Token; ws.Close(); + stompConnected = false; + ws = null; } catch { diff --git a/arstomp/src/StompFrame.cs b/arstomp/src/StompFrame.cs index a86946f..e67a4b1 100644 --- a/arstomp/src/StompFrame.cs +++ b/arstomp/src/StompFrame.cs @@ -101,7 +101,7 @@ namespace ArStomp var array = stream.GetBuffer(); if (StompClient.Debug) Console.WriteLine(">>>\n{0}\n>>>\n", this); stream.Position = 0; - ws.Send(stream, (int) stream.Length, false); + ws.Send(stream, (int) stream.Length); return Task.CompletedTask; //return ws.SendAsync(new ArraySegment(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken); }