Very first version

This commit is contained in:
Arek 2021-02-09 20:42:57 +01:00
parent df525e70fb
commit 3f5190fd94
8 changed files with 3378 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.vscode/
*.crt
docs/
arstomp/obj/
arstomp/bin/
arstomp/src/internal

2579
Doxyfile Normal file

File diff suppressed because it is too large Load Diff

60
README.md Normal file
View File

@ -0,0 +1,60 @@
# Simple STOMP over WS client
Implemented in C# (.NET Core and Framework)
Works with RabbitMQ.
Uses (and probably requires) binary frames.
Supports 'custom' Root CA certficates.
Has simple RPC helper.
## Usage
```csharp
class Program
{
static async Task Main(string[] args)
{
var utf8 = Encoding.UTF8;
byte[] bytes = File.ReadAllBytes("path/to/ca.crt");
X509Certificate2 myca = new X509Certificate2(bytes);
X509Certificate2Collection mycerts = new X509Certificate2Collection { myca };
StompClient client = new StompClient(mycerts);
// can get messages in handler
//client.OnMessage += OnMessage;
// or use something to make request/response simpler
using var rpc = new RPC(client);
// connect
var uri = new Uri("wss://stomp.server:15673/ws");
await client.Connect(uri, "login", "pass");
// subscriptions
var sub1 = await client.Subscribe("/exchange/ex1/test.#");
sub1.OnMessage += OnBroadcast;
var sub2 = await client.Subscribe("/exchange/ex2/test.#");
sub2.OnMessage += OnBroadcast;
// simple publish (no response)
await client.Send("/exchange/something/test", "1", utf8.GetBytes("Test 1"));
// simple call (sending request and expecting response)
var result = await rpc.Call("/exchange/rpc/test", utf8.GetBytes("Test 2"),
TimeSpan.FromSeconds(3));
Console.WriteLine("RCP: {0}", result);
await client.Close();
}
static void OnBroadcast(object sender, SubscriptionEventArgs ea)
{
Console.WriteLine("Broadcast {0}", ea.Frame);
var body = Encoding.UTF8.GetString(ea.Frame.Body);
Console.WriteLine("Body: {0}", body);
}
}
```

8
arstomp/arstomp.csproj Normal file
View File

@ -0,0 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>library</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
</Project>

183
arstomp/src/Helpers.cs Normal file
View File

@ -0,0 +1,183 @@
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Text;
namespace ArStomp
{
internal static class Helpers
{
/// <summary>
/// Static instance of heartbeat frame
/// </summary>
private readonly static Frame HeartbeatFrame = new Frame() { Type = FrameType.Heartbeat };
internal static readonly Dictionary<string, FrameType> CmdMap = new Dictionary<string, FrameType>()
{
{"CONNECTED", FrameType.Connected},
{"ERROR", FrameType.Error},
{"RECEIPT", FrameType.Receipt},
{"MESSAGE", FrameType.Message},
{"", FrameType.Heartbeat} // fake command
};
internal static string GetCmdString(FrameType type)
{
return type switch
{
FrameType.Unknown => "UNKNOWN",
FrameType.Connected => "CONNECTED",
FrameType.Message => "MESSAGE",
FrameType.Receipt => "RECEIPT",
FrameType.Error => "ERROR",
FrameType.Stomp => "STOMP",
FrameType.Send => "SEND",
FrameType.Subscribe => "SUBSCRIBE",
FrameType.Unsubscribe => "UNSUBSCRIBE",
FrameType.Ack => "ACK",
FrameType.Nack => "NACK",
FrameType.Begin => "BEGIN",
FrameType.Commit => "COMMIT",
FrameType.Abort => "ABORT",
FrameType.Disconnect => "DISCONNECT",
FrameType.Heartbeat => "",
_ => "UNKNOWN"
};
}
private static async Task GetMessage(MemoryStream output, ClientWebSocket ws, CancellationToken cancellationToken)
{
var barray = new byte[128 * 1024];
// read first frame
ArraySegment<byte> buffer = new ArraySegment<byte>(barray);
var result = await ws.ReceiveAsync(buffer, cancellationToken);
if (result.CloseStatus != null)
{
throw new Exception($"Unexpected close: {result.CloseStatus}: {result.CloseStatusDescription}");
}
output.Write(barray, 0, result.Count);
while (result.EndOfMessage != true)
{
buffer = new ArraySegment<byte>(barray);
result = await ws.ReceiveAsync(buffer, cancellationToken);
output.Write(barray, 0, result.Count);
}
output.Seek(0, SeekOrigin.Begin);
}
private static StreamReader findBody(Stream input)
{
var output = new MemoryStream();
int count;
// read headers
do
{
// read one line
count = 0;
while (true)
{
var ch = input.ReadByte();
if (ch == -1) throw new Exception("Unexpected end of data");
byte b = (byte)(0xff & ch); // convert to byte
if (b == 13) continue; //skip CR
output.WriteByte(b);
if (b != 10) // LF - end of line
{
count++; // chars in line
}
else
{
break;
}
}
} while (count > 0); // finish when got empty line
output.Seek(0, SeekOrigin.Begin); // start read from begining
return new StreamReader(output, Encoding.UTF8); // return UTF8 reader
}
internal static async Task<Frame> GetFrame(ClientWebSocket ws, CancellationToken cancellationToken)
{
var utf8 = Encoding.UTF8;
var inputstream = new MemoryStream();
var bodyoutput = new MemoryStream();
try
{
await GetMessage(inputstream, ws, cancellationToken);
}
catch (TaskCanceledException)
{
return HeartbeatFrame; // just return empty frame
}
if (inputstream.ReadByte() == 10)
{
return HeartbeatFrame;
}
else
{
inputstream.Seek(0, SeekOrigin.Begin);
}
StreamReader reader = findBody(inputstream);
var cmd = reader.ReadLine();
if (!CmdMap.ContainsKey(cmd))
{
throw new Exception($"Bad STOMP Frame, unknown command {cmd}");
}
Frame frame = new Frame
{
Type = CmdMap[cmd]
};
// parse headers
var line = reader.ReadLine().TrimEnd();
while (line != "")
{
var colon = line.IndexOf(":");
if (colon < 1) // must exist and cannot by first character in the line
{
throw new Exception("Cannot parse header");
}
var key = line.Substring(0, colon);
var value = line[(colon + 1)..];
frame.Headers[key.ToLower()] = value;
line = reader.ReadLine().TrimEnd(); // next header
}
int length = -1;
if (frame.Headers.ContainsKey("content-length"))
{
if (!int.TryParse(frame.Headers["content-length"], out length))
{
throw new Exception("Error: not valid value of header content-length");
}
byte[] body = new byte[length];
inputstream.Read(body, 0, body.Length);
frame.Body = new ArraySegment<byte>(body);
}
else
{
var bodyStream = new MemoryStream();
int b;
while ((b = inputstream.ReadByte()) > 0) // not -1 and not 0
{
bodyStream.WriteByte((byte)b);
}
var bl = (int)bodyStream.Length;
byte[] data = bodyStream.GetBuffer();
frame.Body = new ArraySegment<byte>(data, 0, bl);
}
if (StompClient.Debug) Console.WriteLine("<<<\n{0}\n<<<\n", frame);
return frame;
}
}
}

101
arstomp/src/RPC.cs Normal file
View File

@ -0,0 +1,101 @@
using System;
using System.Threading.Tasks;
using System.Timers;
using System.Collections.Concurrent;
/// <summary>
/// Minimal implementation of stomp client
/// </summary>
namespace ArStomp
{
/// <summary>
/// Simple helper implementing Remote Procedure Call on stomp.
/// </summary>
public class RPC : IDisposable
{
private readonly ConcurrentDictionary<string, Request> requests = new ConcurrentDictionary<string, Request>(5, 10);
private Timer timer;
private readonly StompClient client;
/// <summary>
/// Creates new RPC layer fror given stomp client
/// </summary>
/// <param name="client">initialized and connected stomp client</param>
public RPC(StompClient client)
{
this.client = client;
client.OnMessage += this.OnMessage;
timer = new Timer(3000);
timer.Elapsed += OnTimer;
timer.Start();
}
/// <summary>
/// Calles remote method
/// </summary>
/// <param name="destination">destination, where request is sent</param>
/// <param name="body">content of the message</param>
/// <param name="timeout">how long we're going to wait for response (default is 15s)</param>
/// <returns>response for our call (<see cref="Frame"/> of returned message)</returns>
/// <exception cref="TimeoutException">if there is no reponse after timeout</exception>
public async Task<Frame> Call(string destination, byte[] body, TimeSpan timeout = default)
{
string correlationId = Guid.NewGuid().ToString();
TimeSpan to = (timeout != default) ? timeout : TimeSpan.FromSeconds(15);
var req = new Request(correlationId, to);
if (!requests.TryAdd(correlationId, req))
throw new Exception("Request with this cid is already processed");
await client.Send(destination, correlationId, body);
return await req.Source.Task;
}
public void Dispose()
{
if (timer != null)
{
var t = timer;
timer = null;
client.OnMessage -= this.OnMessage;
t.Stop();
t.Dispose();
}
}
private void OnTimer(object sender, ElapsedEventArgs ea)
{
var ts = DateTime.UtcNow;
foreach (var i in requests)
{
if (i.Value.Timeout < ts)
{
if (requests.TryRemove(i.Key, out Request req))
{
req.Source.SetException(new TimeoutException());
}
}
}
}
private void OnMessage(object sender, SubscriptionEventArgs ea)
{
var cid = ea.Frame.Headers["correlation-id"];
if (requests.TryRemove(cid, out Request req))
{
req.Source.SetResult(ea.Frame);
}
// else skip unknown message
}
}
internal sealed class Request
{
public readonly DateTime Timeout;
public readonly string Cid;
public TaskCompletionSource<Frame> Source { get; } = new TaskCompletionSource<Frame>();
public Request(string cid, TimeSpan waitingTime)
{
this.Cid = cid;
this.Timeout = DateTime.UtcNow + waitingTime;
}
}
}

291
arstomp/src/StompClient.cs Normal file
View File

@ -0,0 +1,291 @@
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.Text;
using System;
using System.Collections.Generic;
using System.Linq;
namespace ArStomp
{
/// <summary>
/// Stomp client
/// </summary>
public class StompClient
{
private Task runner = null;
public static bool Debug { get; set; } = false;
private ClientWebSocket ws = new ClientWebSocket();
public CancellationTokenSource Token { get; } = new CancellationTokenSource();
private readonly X509Certificate2Collection certCollection;
private readonly Dictionary<string, Subscription> subs = new Dictionary<string, Subscription>();
/// <summary>
/// Handler of incoming messages.
/// Used only for RPC. <see cref="Subscription">Subsscriptions</see> have own handlers.
/// </summary>
public event EventHandler<SubscriptionEventArgs> OnMessage;
/// <summary>
/// Creates new object.
/// Supports RabbitMQ.
/// Works with binary frames.
/// </summary>
/// <param name="certCollection">collection of root ca certificates (if TLS is used)</param>
public StompClient(X509Certificate2Collection certCollection = null)
{
ws = new ClientWebSocket();
ws.Options.AddSubProtocol("v12.stomp");
if (certCollection != null && certCollection.Count > 0)
{
this.certCollection = certCollection;
ws.Options.RemoteCertificateValidationCallback = RemoteCertificateValidationCallback;
}
}
internal void InvokeOnMessage(SubscriptionEventArgs args)
{
try
{
OnMessage?.Invoke(this, args);
}
catch
{
// skip all exceptions
}
}
/// <summary>
/// Verify cert chain in case of using own (custom) ca certificates for TLS
/// </summary>
/// <param name="sender">tls stream</param>
/// <param name="certificate">server's certificate</param>
/// <param name="chain">constructed chain of trust</param>
/// <param name="sslPolicyErrors">detected errors</param>
/// <returns>true if server certificate is valid, false otherwise</returns>
private bool RemoteCertificateValidationCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
// if there is no detected problems we can say OK
if ((sslPolicyErrors & (SslPolicyErrors.None)) > 0) return true;
// sins that cannot be forgiven
if (
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNameMismatch)) > 0 ||
(sslPolicyErrors & (SslPolicyErrors.RemoteCertificateNotAvailable)) > 0
) return false;
// last certificate in chain should be one of our trust anchors
X509Certificate2 projectedRootCert = chain.ChainElements[^1].Certificate;
// check if server's root ca is one of our trusted
bool anytrusted = false;
foreach (var cert in certCollection)
{
anytrusted = anytrusted || (projectedRootCert.Thumbprint == cert.Thumbprint);
}
if (!anytrusted) return false;
// any other problems than unknown CA?
if (chain.ChainStatus.Any(statusFlags => statusFlags.Status != X509ChainStatusFlags.UntrustedRoot)) return false;
// everything OK
return true;
}
private void ExpectFrame(Frame frame, FrameType expected)
{
if (frame.Type != expected)
{
var reason = "Unknown reason";
if (frame.Type == FrameType.Error)
{
if (frame.Body != null)
{
reason = Encoding.UTF8.GetString(frame.Body);
}
}
throw new Exception($"Unexpected frame '{frame.Type}'. Message from server: {reason}");
}
}
/// <summary>
/// Connects to stomp server.
/// Uses <see cref="ClientWebSocket"/>.
/// </summary>
/// <param name="uri">uri in format ws://host[:port][/path] or wss://host[:port][/path]</param>
/// <param name="login">login name</param>
/// <param name="password">password</param>
public async Task Connect(Uri uri, string login, string password)
{
if (runner != null) throw new Exception("Cannot connect in this state. Should close before");
var ct = Token.Token;
await ws.ConnectAsync(uri, ct);
StompFrm connect = new StompFrm(login, password);
await connect.Serialize(ws, ct);
Frame fr = await Helpers.GetFrame(ws, ct);
ExpectFrame(fr, FrameType.Connected);
runner = Run(); // Run is async
}
/// <summary>
/// Reports state of conection
/// </summary>
/// <returns>true if it looks like we have proper connection to server</returns>
public bool IsConnected()
{
return ws.CloseStatus == null;
}
/// <summary>
/// Send message
/// </summary>
/// <param name="destination">queue o exchange (eg /exchange/name/routing-key in case of RabbitMQ)</param>
/// <param name="correlationId">property correlationId for the message</param>
/// <param name="body">content of the message</param>
public ValueTask Send(string destination, string correlationId, byte[] body)
{
var ct = Token.Token;
SendFrm send = new SendFrm(destination, correlationId, body);
return send.Serialize(ws, ct);
}
private int SubId = 0;
/// <summary>
/// Create news subscription
/// </summary>
/// <param name="destination">eg /exchange/name/rouring-key</param>
/// <returns><see cref="Subscription"/> object</returns>
public async Task<Subscription> Subscribe(string destination)
{
var ct = Token.Token;
var id = $"sub-{++SubId}";
var sub = new Subscription()
{
Destination = destination,
Id = id
};
var sframe = new SubscribeFrame(id, destination);
await sframe.Serialize(ws, ct);
subs.Add(id, sub);
return sub;
}
/// <summary>
/// Cancel subscrption
/// </summary>
/// <param name="sub">object of subscription</param>
public async Task Unsubscribe(Subscription sub)
{
if (subs.ContainsKey(sub.Id))
{
subs.Remove(sub.Id);
var ct = Token.Token;
var sframe = new UnsubscribeFrame(sub.Id);
await sframe.Serialize(ws, ct);
}
}
private async Task Run()
{
var ct = Token.Token;
try
{
while (!ct.IsCancellationRequested)
{
Frame fr = null;
try
{
fr = await Helpers.GetFrame(ws, ct);
}
catch (ThreadInterruptedException)
{
break;
}
if (fr.Type == FrameType.Error) ExpectFrame(fr, FrameType.Message);
if (fr.Type == FrameType.Message)
{
if (fr.Headers.ContainsKey("subscription"))
{
if (fr.Headers["subscription"] == "/temp-queue/rpc-replies")
{
InvokeOnMessage(new SubscriptionEventArgs(fr));
}
else
{
var sub = subs[fr.Headers["subscription"]];
if (sub != null) sub.InvokeOnMessage(new SubscriptionEventArgs(fr));
else Console.WriteLine("Nieoczekiwany komunikat {0}", fr);
}
}
}
}
}
finally
{
await Close();
}
}
/// <summary>
/// Cancel current operaton and close connection
/// </summary>
/// <returns></returns>
public async Task Close()
{
try
{
Token.Cancel();
var ct = new CancellationTokenSource().Token;
await ws.CloseAsync(WebSocketCloseStatus.Empty, null, ct);
}
catch
{
// skip
}
}
}
/// <summary>
/// Represents the subscpriotn
/// </summary>
public class Subscription
{
/// <summary>
/// Id of this particular subscription
/// </summary>
public string Id { get; internal set; }
/// <summary>
/// Destination used with this syubscription
/// </summary>
/// <value></value>
public string Destination { get; internal set; }
/// <summary>
/// Handler fro incoming messages
/// </summary>
public event EventHandler<SubscriptionEventArgs> OnMessage;
internal void InvokeOnMessage(SubscriptionEventArgs args)
{
try
{
OnMessage?.Invoke(this, args);
}
catch
{
// skip all exceptions
}
}
}
/// <summary>
/// Arguments for OnMessage handlers
/// </summary>
public class SubscriptionEventArgs : EventArgs
{
/// <summary>
/// The frame (MESSAGE) got from server
/// </summary>
/// <value></value>
public Frame Frame { get; }
internal SubscriptionEventArgs(Frame f)
{
Frame = f;
}
}
}

150
arstomp/src/StompFrame.cs Normal file
View File

@ -0,0 +1,150 @@
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using System.IO;
using System.Text;
namespace ArStomp
{
/// <summary>
/// Possible STOMP frame types. Not everything is used in this library
/// </summary>
public enum FrameType
{
// no type
Unknown = 0,
// server frames
Connected,
Message,
Receipt,
Error,
// client frames
Stomp,
Send,
Subscribe,
Unsubscribe,
Ack,
Nack,
Begin,
Commit,
Abort,
Disconnect,
Heartbeat // fake, but gives common view of server messages
}
/// <summary>
/// Represents the frame of STOMP protocol: message, command or error
/// </summary>
public class Frame
{
/// <summary>
/// Type of frame
/// </summary>
public FrameType Type { get; internal set; }
/// <summary>
/// Headers from STOMP frame
/// </summary>
/// <typeparam name="string">header's name</typeparam>
/// <typeparam name="string">header's value</typeparam>
public Dictionary<string, string> Headers { get; } = new Dictionary<string, string>();
/// <summary>
/// Content (body) of the message.
/// Not parsed and not processed in any way.
/// </summary>s
public ArraySegment<byte> Body { get; internal set; }
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append(Helpers.GetCmdString(Type)).Append("\n");
foreach (var i in Headers)
{
sb.AppendFormat("{0}:{1}\n", i.Key, i.Value);
}
sb.AppendFormat("Body size: {0}", Body.Count);
return sb.ToString();
}
internal ValueTask Serialize(ClientWebSocket ws, CancellationToken cancellationToken)
{
var utf8 = Encoding.UTF8;
var EOL = utf8.GetBytes("\n");
var COLON = utf8.GetBytes(":");
var NUL = new byte[] { 0 };
var stream = new MemoryStream();
// write command
var cmd = Helpers.GetCmdString(Type);
stream.Write(utf8.GetBytes(cmd));
stream.Write(EOL);
// write headers
foreach (var i in Headers)
{
stream.Write(utf8.GetBytes(i.Key));
stream.Write(COLON);
stream.Write(utf8.GetBytes(i.Value));
stream.Write(EOL);
}
// write empty line
stream.Write(EOL);
// write body
if (Body != null && Body.Count > 0)
{
stream.Write(Body);
}
// write NUL character
stream.Write(NUL);
stream.Flush();
var array = stream.GetBuffer();
if (StompClient.Debug) Console.WriteLine(">>>\n{0}\n>>>\n", this);
return ws.SendAsync(array.AsMemory(0, (int)stream.Position), WebSocketMessageType.Binary, true, cancellationToken);
}
}
internal class StompFrm : Frame
{
public StompFrm(string login, string passwd)
{
Type = FrameType.Stomp;
Headers["login"] = login;
Headers["passcode"] = passwd;
Headers["accept-version"] = "1.2";
}
}
internal class SendFrm : Frame
{
public SendFrm(string destination, string correlationId, byte[] body)
{
Type = FrameType.Send;
Headers["destination"] = destination;
Headers["reply-to"] = "/temp-queue/rpc-replies";
if (correlationId != null) Headers["correlation-id"] = correlationId;
Headers["content-length"] = body.Length.ToString();
Body = body;
}
}
internal class SubscribeFrame : Frame
{
public SubscribeFrame(string id, string destination)
{
Type = FrameType.Subscribe;
Headers["destination"] = destination;
Headers["id"] = id;
}
}
internal class UnsubscribeFrame : Frame
{
public UnsubscribeFrame(string id)
{
Type = FrameType.Unsubscribe;
Headers["id"] = id;
}
}
}