diff --git a/arstomp/arstomp.csproj b/arstomp/arstomp.csproj index 41d4021..0896a5a 100644 --- a/arstomp/arstomp.csproj +++ b/arstomp/arstomp.csproj @@ -1,8 +1,12 @@ - library + Exe netstandard2.0 + + + + diff --git a/arstomp/src/StompClient.cs b/arstomp/src/StompClient.cs index 6801905..50ff89e 100644 --- a/arstomp/src/StompClient.cs +++ b/arstomp/src/StompClient.cs @@ -1,12 +1,13 @@ using System.Security.Cryptography.X509Certificates; using System.Net.Security; -using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using System.Text; using System; using System.Collections.Generic; using System.Linq; +using WebSocketSharp; +using WebSocketSharp.Net; namespace ArStomp { @@ -17,7 +18,8 @@ namespace ArStomp { private Task runner = null; public static bool Debug { get; set; } = false; - private ClientWebSocket ws = new ClientWebSocket(); + private WebSocket ws; + private volatile bool stompConnected = false; public CancellationTokenSource Token { get; } = new CancellationTokenSource(); private readonly X509Certificate2Collection certCollection; private readonly Dictionary subs = new Dictionary(); @@ -34,9 +36,6 @@ namespace ArStomp /// collection of root ca certificates (if TLS is used) public StompClient(X509Certificate2Collection certCollection = null) { - - ws = new ClientWebSocket(); - ws.Options.AddSubProtocol("v12.stomp"); if (certCollection != null && certCollection.Count > 0) { this.certCollection = certCollection; @@ -112,19 +111,35 @@ namespace ArStomp /// uri in format ws://host[:port][/path] or wss://host[:port][/path] /// login name /// password - public async Task Connect(Uri uri, string login, string password) + public Task Connect(Uri uri, string login, string password) { - if (runner != null) throw new Exception("Cannot connect in this state. Should close before"); + if (ws != null) throw new Exception("Cannot connect in this state. Should close before"); + ws = new WebSocket(uri.ToString(), "v12.stomp"); + var ct = Token.Token; - await ws.ConnectAsync(uri, ct); - StompFrm connect = new StompFrm(login, password); - await connect.Serialize(ws, ct); + ws.OnClose += (sender, o) => + { + Token.Cancel(); + }; - Frame fr = await Helpers.GetFrame(ws, ct); + ws.OnOpen += (sender, o) => + { + StompFrm connect = new StompFrm(login, password); + Task.Run(async () => { await connect.Serialize(ws, ct);}); + }; - ExpectFrame(fr, FrameType.Connected); - runner = Run(); // Run is async + ws.OnMessage += (sender, o) => + { + //Frame fr = await Helpers.GetFrame(ws, ct); + //ExpectFrame(fr, FrameType.Connected); + System.Console.WriteLine("Msg: type {0} data: {1}", o.Type.ToString(), o.ToString()); + }; + + ws.Connect(); + + //runner = Run(); // Run is async + return Task.CompletedTask; } /// /// Reports state of conection @@ -132,7 +147,7 @@ namespace ArStomp /// true if it looks like we have proper connection to server public bool IsConnected() { - return ws.CloseStatus == null; + return ws.IsAlive; } /// /// Send message @@ -194,7 +209,7 @@ namespace ArStomp Frame fr = null; try { - fr = await Helpers.GetFrame(ws, ct); + //fr = await Helpers.GetFrame(ws, ct); } catch (ThreadInterruptedException) { @@ -229,18 +244,19 @@ namespace ArStomp /// Cancel current operaton and close connection /// /// - public async Task Close() + public Task Close() { try { Token.Cancel(); var ct = new CancellationTokenSource().Token; - await ws.CloseAsync(WebSocketCloseStatus.Empty, null, ct); + ws.Close(); } catch { // skip } + return Task.CompletedTask; } } /// diff --git a/arstomp/src/StompFrame.cs b/arstomp/src/StompFrame.cs index 780e420..a86946f 100644 --- a/arstomp/src/StompFrame.cs +++ b/arstomp/src/StompFrame.cs @@ -1,11 +1,11 @@ using System; using System.Collections.Generic; -using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; using System.IO; using System.Text; - +using WebSocketSharp; +using WebSocketSharp.Net; namespace ArStomp { @@ -68,7 +68,7 @@ namespace ArStomp return sb.ToString(); } - internal Task Serialize(ClientWebSocket ws, CancellationToken cancellationToken) + internal Task Serialize(WebSocket ws, CancellationToken cancellationToken) { var utf8 = Encoding.UTF8; var EOL = utf8.GetBytes("\n"); @@ -100,7 +100,10 @@ namespace ArStomp stream.Flush(); var array = stream.GetBuffer(); if (StompClient.Debug) Console.WriteLine(">>>\n{0}\n>>>\n", this); - return ws.SendAsync(new ArraySegment(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken); + stream.Position = 0; + ws.Send(stream, (int) stream.Length, false); + return Task.CompletedTask; + //return ws.SendAsync(new ArraySegment(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken); } }