feat: Version for Windows 7

This commit is contained in:
Arek 2021-05-17 17:27:05 +02:00
parent 5011fc6adb
commit c7a45b4663
5 changed files with 93 additions and 78 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ docs/
arstomp/obj/ arstomp/obj/
arstomp/bin/ arstomp/bin/
arstomp/src/internal arstomp/src/internal
/tester

View File

@ -1,12 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Library</OutputType>
<TargetFramework>netstandard2.0</TargetFramework> <TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="websocket-sharp" Version="1.0.0" /> <PackageReference Include="WebSocketSharp-netstandard" Version="1.0.1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -48,28 +48,6 @@ namespace ArStomp
default: return "UNKNOWN"; default: return "UNKNOWN";
}; };
} }
private static async Task GetMessage(MemoryStream output, ClientWebSocket ws, CancellationToken cancellationToken)
{
var barray = new byte[128 * 1024];
// read first frame
ArraySegment<byte> buffer = new ArraySegment<byte>(barray);
var result = await ws.ReceiveAsync(buffer, cancellationToken);
if (result.CloseStatus != null)
{
throw new Exception($"Unexpected close: {result.CloseStatus}: {result.CloseStatusDescription}");
}
output.Write(barray, 0, result.Count);
while (result.EndOfMessage != true)
{
buffer = new ArraySegment<byte>(barray);
result = await ws.ReceiveAsync(buffer, cancellationToken);
output.Write(barray, 0, result.Count);
}
output.Seek(0, SeekOrigin.Begin);
}
private static StreamReader findBody(Stream input) private static StreamReader findBody(Stream input)
{ {
@ -101,20 +79,12 @@ namespace ArStomp
return new StreamReader(output, Encoding.UTF8); // return UTF8 reader return new StreamReader(output, Encoding.UTF8); // return UTF8 reader
} }
internal static async Task<Frame> GetFrame(ClientWebSocket ws, CancellationToken cancellationToken) internal static Frame GetFrame(byte[] msgBuffer, CancellationToken cancellationToken)
{ {
var utf8 = Encoding.UTF8; var utf8 = Encoding.UTF8;
var inputstream = new MemoryStream(); var inputstream = new MemoryStream(msgBuffer);
var bodyoutput = new MemoryStream(); var bodyoutput = new MemoryStream();
try
{
await GetMessage(inputstream, ws, cancellationToken);
}
catch (TaskCanceledException)
{
return HeartbeatFrame; // just return empty frame
}
if (inputstream.ReadByte() == 10) if (inputstream.ReadByte() == 10)
{ {

View File

@ -16,7 +16,6 @@ namespace ArStomp
/// </summary> /// </summary>
public class StompClient public class StompClient
{ {
private Task runner = null;
public static bool Debug { get; set; } = false; public static bool Debug { get; set; } = false;
private WebSocket ws; private WebSocket ws;
private volatile bool stompConnected = false; private volatile bool stompConnected = false;
@ -38,10 +37,10 @@ namespace ArStomp
{ {
if (certCollection != null && certCollection.Count > 0) if (certCollection != null && certCollection.Count > 0)
{ {
this.certCollection = certCollection; this.certCollection = certCollection;
// In .netstandard2.0 - setup validator globally // In .netstandard2.0 - setup validator globally
System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback; // System.Net.ServicePointManager.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
} }
} }
@ -67,6 +66,7 @@ namespace ArStomp
/// <returns>true if server certificate is valid, false otherwise</returns> /// <returns>true if server certificate is valid, false otherwise</returns>
private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors) private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{ {
if (Debug) Console.WriteLine("Custom RemoteCertificateValidationCallback");
// if there is no detected problems we can say OK // if there is no detected problems we can say OK
if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true; if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true;
// sins that cannot be forgiven // sins that cannot be forgiven
@ -86,6 +86,7 @@ namespace ArStomp
// any other problems than unknown CA? // any other problems than unknown CA?
if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false; if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false;
// everything OK // everything OK
if (Debug) Console.WriteLine("Certificate OK");
return true; return true;
} }
@ -111,35 +112,78 @@ namespace ArStomp
/// <param name="uri">uri in format ws://host[:port][/path] or wss://host[:port][/path]</param> /// <param name="uri">uri in format ws://host[:port][/path] or wss://host[:port][/path]</param>
/// <param name="login">login name</param> /// <param name="login">login name</param>
/// <param name="password">password</param> /// <param name="password">password</param>
public Task Connect(Uri uri, string login, string password) public async Task Connect(Uri uri, string login, string password)
{ {
if (ws != null) throw new Exception("Cannot connect in this state. Should close before"); if (ws != null) throw new Exception("Cannot connect in this state. Should close before");
ws = new WebSocket(uri.ToString(), "v12.stomp"); ws = new WebSocket( uri.ToString(), "v12.stomp");
if (uri.Scheme == "wss" && certCollection != null)
{
ws.SslConfiguration.ServerCertificateValidationCallback = RemoteCertificateValidationCallback;
ws.SslConfiguration.EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12;
}
var ct = Token.Token; var ct = Token.Token;
TaskCompletionSource<bool> openTask = new TaskCompletionSource<bool>();
ws.OnClose += (sender, o) => ws.OnClose += (sender, o) =>
{ {
if (Debug) System.Console.WriteLine("OnClose: code: {0}, wasClean: {1}, reason: {2}", o.Code, o.WasClean, o.Reason);
Token.Cancel();
};
ws.OnError += (sender, o) =>
{
if (Debug) System.Console.WriteLine("OnError: {0}, {1}", o.Message, o.Exception);
Token.Cancel(); Token.Cancel();
}; };
ws.OnOpen += (sender, o) => ws.OnOpen += (sender, o) =>
{ {
if (Debug) System.Console.WriteLine("OnOpen");
StompFrm connect = new StompFrm(login, password); StompFrm connect = new StompFrm(login, password);
Task.Run(async () => { await connect.Serialize(ws, ct);}); Task.Run(() => { return connect.Serialize(ws, ct); });
}; };
ws.OnMessage += (sender, o) => ws.OnMessage += (sender, o) =>
{ {
//Frame fr = await Helpers.GetFrame(ws, ct); try
//ExpectFrame(fr, FrameType.Connected); {
System.Console.WriteLine("Msg: type {0} data: {1}", o.Type.ToString(), o.ToString()); if (Debug)
{
string type = o.IsBinary ? "Binary" : (o.IsText ? "Text" : (o.IsPing ? "Ping" : "Unknown"));
string value = o.IsBinary ? $"byte[{o.RawData.Length}]" : (o.IsText ? o.Data : "");
System.Console.WriteLine("OnMessage: type: {0} data: {1}", type, value);
}
if (!stompConnected)
{
Frame fr = Helpers.GetFrame(o.RawData, ct);
stompConnected = true;
try
{
ExpectFrame(fr, FrameType.Connected);
openTask.TrySetResult(true);
if (Debug) System.Console.WriteLine("Logon done");
} catch (Exception e)
{
openTask.TrySetException(e);
}
return;
}
ProcessMessage(o);
}
catch (Exception e)
{
Console.WriteLine("onMessage: Wyjątek {0}", e);
}
}; };
ws.Connect(); ws.Connect();
CancellationTokenSource cts = new CancellationTokenSource(5000); // 5 seconds timeout
//runner = Run(); // Run is async cts.Token.Register(() => openTask.TrySetCanceled(), useSynchronizationContext: false);
return Task.CompletedTask; await openTask.Task;
if (Debug) System.Console.WriteLine("STOMP Connected");
} }
/// <summary> /// <summary>
/// Reports state of conection /// Reports state of conection
@ -147,7 +191,7 @@ namespace ArStomp
/// <returns>true if it looks like we have proper connection to server</returns> /// <returns>true if it looks like we have proper connection to server</returns>
public bool IsConnected() public bool IsConnected()
{ {
return ws.IsAlive; return ws != null && ws.IsAlive;
} }
/// <summary> /// <summary>
/// Send message /// Send message
@ -155,12 +199,12 @@ namespace ArStomp
/// <param name="destination">queue o exchange (eg /exchange/name/routing-key in case of RabbitMQ)</param> /// <param name="destination">queue o exchange (eg /exchange/name/routing-key in case of RabbitMQ)</param>
/// <param name="correlationId">property correlationId for the message</param> /// <param name="correlationId">property correlationId for the message</param>
/// <param name="body">content of the message</param> /// <param name="body">content of the message</param>
public Task Send(string destination, string correlationId, byte[] body) public async Task Send(string destination, string correlationId, byte[] body)
{ {
var ct = Token.Token; var ct = Token.Token;
SendFrm send = new SendFrm(destination, correlationId, body); SendFrm send = new SendFrm(destination, correlationId, body);
return send.Serialize(ws, ct); await send.Serialize(ws, ct);
} }
private int SubId = 0; private int SubId = 0;
@ -199,45 +243,43 @@ namespace ArStomp
} }
} }
private async Task Run() private void ProcessMessage(WebSocketSharp.MessageEventArgs msg)
{ {
var ct = Token.Token; var ct = Token.Token;
try try
{ {
while (!ct.IsCancellationRequested) Frame fr = null;
try
{ {
Frame fr = null; fr = Helpers.GetFrame(msg.RawData, ct);
try }
catch (Exception e)
{
throw e;
}
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
if (fr.Type == FrameType.Message)
{
if (fr.Headers.ContainsKey("subscription"))
{ {
//fr = await Helpers.GetFrame(ws, ct); if (fr.Headers["subscription"] == "/temp-queue/rpc-replies")
}
catch (ThreadInterruptedException)
{
break;
}
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
if (fr.Type == FrameType.Message)
{
if (fr.Headers.ContainsKey("subscription"))
{ {
if (fr.Headers["subscription"] == "/temp-queue/rpc-replies") InvokeOnMessage(new SubscriptionEventArgs(fr));
{ }
InvokeOnMessage(new SubscriptionEventArgs(fr)); else
} {
else var sub = subs[fr.Headers["subscription"]];
{ if (sub != null) sub.InvokeOnMessage(new SubscriptionEventArgs(fr));
var sub = subs[fr.Headers["subscription"]]; else Console.WriteLine("Unexpected message {0}", fr);
if (sub != null) sub.InvokeOnMessage(new SubscriptionEventArgs(fr));
else Console.WriteLine("Nieoczekiwany komunikat {0}", fr);
}
} }
} }
} }
} }
finally catch (Exception e)
{ {
await Close(); System.Console.WriteLine("Excpetion: {0}", e);
} }
} }
/// <summary> /// <summary>
@ -251,6 +293,8 @@ namespace ArStomp
Token.Cancel(); Token.Cancel();
var ct = new CancellationTokenSource().Token; var ct = new CancellationTokenSource().Token;
ws.Close(); ws.Close();
stompConnected = false;
ws = null;
} }
catch catch
{ {

View File

@ -101,7 +101,7 @@ namespace ArStomp
var array = stream.GetBuffer(); var array = stream.GetBuffer();
if (StompClient.Debug) Console.WriteLine(">>>\n{0}\n>>>\n", this); if (StompClient.Debug) Console.WriteLine(">>>\n{0}\n>>>\n", this);
stream.Position = 0; stream.Position = 0;
ws.Send(stream, (int) stream.Length, false); ws.Send(stream, (int) stream.Length);
return Task.CompletedTask; return Task.CompletedTask;
//return ws.SendAsync(new ArraySegment<byte>(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken); //return ws.SendAsync(new ArraySegment<byte>(array, 0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken);
} }