Handler API
Almost all interaction with the application is via the use of Handlers. Each handler defines an Endpoint/Channel in the Server. Communication from the Client mostly consists of Messages addressed to a particular Endpoint (apart from Direct or Cross Functional Handlers).
Handlers are not loaded by class name or similar, as is common in some config-oriented approaches. Instead, you must supply a List of fully constructed handlers to the Server code. This allows complete control of how the objects are created and wired together.
As each Endpoint has a specific behaviour depending on its type (and config), most applications will consist of multiple Endpoints each of which provides a particular function and behaviour. When designing applications, one thing to keep in mind is that the micro-service concept is a pattern that works well within the server as well as across servers. Accordingly, it is recommended that each Endpoint should be kept simple and narrow (and ideally do one thing), and complex behaviour should be constructed by composing Endpoint behaviours.
NOTE: All Handlers (except the ConnectionHandler) are asynchronous in behaviour, and it is VERY IMPORTANT that any long-running activity is delegated to a different thread; otherwise the read thread will be blocked from servicing other users.
Handler Types
There are a number of Handlers in the Rubris API:
- AsynchronousHandler [E]
- This type of handler is for Request/Reply messages
- ConnectionHandler
- This handler authorises browser connections
- UserClientHandler
- This handler authorises new User Sessions
- SharedSubscriptionHandler [E]
- this handler represents common subscription topics that all users on the same channel share
- GroupSubscriptionHandler [E]
- this handler represents subscription topics that users in the same group on a channel share. These are private topics scoped to the group with the same semantics as SharedSubscription topics
- PrivateSubscriptionHandler [E]
- This handler represents topics that are private to users
- DirectPathHandler [E]
- This handler is for handling direct HTTP request/response outside of the Rubris/EngineIO protocol
- LoginHandler
- handles login/logout using the Rubris protocol (may be removed as it is not that useful with SSO)
- EndpointAuthorisationHandler
- this handler is used to allow or deny user access to an endpoint (defined using one of the user handlers above on the server)
Note: The handlers marked with [E] represent user endpoints. The others are functional handlers for other behaviours.
In essence then there are really 5 types of User Endpoints that can be defined:
- Request/Reply
- Shared Subscription
- Group/User Scoped shared Subscription
- Private Subscription
- Direct Path
The other handlers are supporting callbacks for cross-cutting functions.
Configuring an Endpoint Name
A name for an endpoint is mandatory and can be provided either using the Endpoint annotation or implementing the EndpointNameProvider interface:
@Endpoint(name = "meta") class MetaHandler implements AsynchronousHandler { ... }
Or:
class LoggingHandler implements AsynchronousHandler, EndpointNameProvider { @Override public String name() { return "content"; } ... }
RPC Endpoints
For RPC Handlers there is really only the name to be chosen as an option. All other behaviour is contained in your own code.
@Endpoint(name = "meta")
class MetaHandler implements AsynchronousHandler {
    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        // payloads arrive as byte[] by default
        String s = new String((byte[]) message);
        // log the decoded text rather than the raw byte[] reference
        LOGGER.debug("Meta Message received " + s);
        handle.onMessage(s + " response META");
    }
}
The code above shows the simplest possible reply to the original message. The method is completely asynchronous and returns no value. Instead all communication takes place using the DataHandle. It is perfectly valid to ignore the data handle and return no response.
As covered more fully in the Threading section, unless you are doing something that is very simple it is strongly recommended that ALL ACTIVITY takes place asynchronously on another thread and the calling thread is never blocked. In that case the Class would look something like:
@Endpoint(name = "meta")
class MetaHandler implements AsynchronousHandler {
    // you probably want this injected onto the class
    ExecutorService service = Executors.newFixedThreadPool(2);

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        // hand the work off so the calling (read) thread is never blocked
        service.execute(new Runnable() {
            @Override
            public void run() {
                String s = new String((byte[]) message);
                // log the decoded text rather than the raw byte[] reference
                LOGGER.debug("Meta Message received " + s);
                handle.onMessage(s + " response META");
            }
        });
    }
}
All messages will be delivered as a byte[] payload by default. The DataHandle is entirely private to that message and can be used only once.
From the browser’s perspective each RPC message carries with it a messageId. When replying to the message the response will carry back the messageId of the original request Message as the messageId of the response to enable correlation of RPC messages.
There is no need for the messageIds to be sequential, or even distinct, from the server’s perspective; they are entirely controlled by the client sending the message.
Multiple Responses for RPC messages
Although the term RPC is used here, it is in fact possible to use the RPC handle multiple times to send multiple responses. An example use case is a message that is too big to send all at once and needs to be split into multiple pages/fragments to prevent blocked sends when writing large buffers to the network. A common case is paging large State of The World messages.
A contrived example:
@Endpoint(name = "RPCPaging")
class RPCPagingHandler implements AsynchronousHandler {

    Executor exec = new RetryExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        // execute with up to 10 retries
        exec.execute(new PagingRunner(user, handle, 10));
    }

    // dummy results method
    protected List<String> getResults() {
        int size = 2000;
        List<String> res = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            res.add("Str" + i);
        }
        return res;
    }

    // simple runnable that keeps track of its state
    class PagingRunner implements Runnable {
        int i = 0;
        int retries = 0;
        List<String> res;
        DataHandle handle;
        int maxRetries;
        User user;

        PagingRunner(User user, DataHandle handle, int maxRetries) {
            this.user = user;
            this.handle = handle;
            this.maxRetries = maxRetries;
        }

        @Override
        public void run() {
            // fetch the results once, on the first run only
            if (res == null) {
                res = getResults();
            }
            while (i < res.size()) {
                if (handle.onMessage("page " + res.get(i))) {
                    i++;
                } else {
                    // the send was rejected: break and let the resubmit deal
                    // with this, as the client is probably a bit slow
                    break;
                }
            }
        }

        // note: each call counts as one retry
        protected boolean isIncomplete() {
            return retries++ < maxRetries && i < res.size();
        }

        protected int remaining() {
            return res.size() - i;
        }
    }

    // actual work is here: resubmits a runner that has not finished sending
    public class RetryExecutor extends ThreadPoolExecutor {
        private final Logger LOGGER = LoggerProxy.getLogger(RetryExecutor.class);

        public RetryExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            PagingRunner pr = (PagingRunner) r;
            if (pr.isIncomplete()) {
                LOGGER.info("Sending incomplete data with retry " + pr.retries);
                this.execute(r);
            } else if (pr.remaining() > 0) {
                // we probably want a bit more action here than logging
                LOGGER.info("Unable to send complete data to " + pr.user + " with topics remaining " + pr.remaining());
            }
        }
    }
}
The main factor to be aware of is that all the outgoing messages sent on a handle instance will have the same message id, so that the client can correlate them with the request. If adopting this approach, your messages should carry enough information for the client to work out when it has received all the segments of the message (e.g. some sort of m of n indicator or finished flag).
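As an illustration of the "m of n" indicator mentioned above, the following is a minimal sketch of framing a result list into pages that each carry their position and the total page count. The `PageFramer` class and the `m/n|` header format are assumptions made for this example; they are not part of the Rubris API, and any framing the client can parse would serve equally well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the Rubris API): frames a result list
// into pages that each carry an "m/n" header so the client can tell when
// it has received the final fragment.
final class PageFramer {

    // Splits items into pages of at most pageSize entries. Each page is
    // rendered as "<m>/<n>|item,item,..." where m is 1-based.
    static List<String> frame(List<String> items, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // an empty result still produces a single (empty) page
        int pageCount = Math.max(1, (items.size() + pageSize - 1) / pageSize);
        List<String> pages = new ArrayList<>(pageCount);
        for (int page = 0; page < pageCount; page++) {
            int from = page * pageSize;
            int to = Math.min(items.size(), from + pageSize);
            String body = String.join(",", items.subList(from, to));
            pages.add((page + 1) + "/" + pageCount + "|" + body);
        }
        return pages;
    }

    // True when the header says this is the final page (m == n).
    static boolean isLast(String page) {
        String header = page.substring(0, page.indexOf('|'));
        String[] parts = header.split("/");
        return parts[0].equals(parts[1]);
    }
}
```

Each framed page would then be passed to the handle's onMessage in order, advancing only when the send is accepted.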
It is not recommended that you keep RPC handles beyond a single request -> response cycle as otherwise you will lose the benefit of the messageId correlations and it will become tricky to manage which ones are active, especially as there is no unsubscribe equivalent, placing a large burden on your own code to track handles to client state.
They are intended to be discarded after a single response or set of responses that are logically grouped together.
If you find the RPC message rejected it is advised that you look at the settings for clientDirectQueueSize.
Other strategies, such as deferred scheduling for rejected messages, are also possible.
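A deferred-scheduling strategy could look roughly like the sketch below, which retries a rejected send after a fixed delay using a standard ScheduledExecutorService. The `SendHandle` interface is a stand-in that only mirrors the boolean onMessage contract described in this document, and `DeferredSender` is a hypothetical name; the delay and retry count are assumptions you would tune for your own application.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Stand-in for a handle whose onMessage returns false when the send is
// rejected. The real handle type comes from the Rubris API.
interface SendHandle {
    boolean onMessage(Object message);
}

// Hypothetical helper: retries a rejected send after a fixed delay.
final class DeferredSender {
    private final ScheduledExecutorService scheduler;
    private final long delayMillis;

    DeferredSender(ScheduledExecutorService scheduler, long delayMillis) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
    }

    // Tries to send immediately on the calling thread. If the handle
    // rejects the message, the send is rescheduled until retriesLeft is
    // exhausted, at which point onGiveUp is run.
    void send(SendHandle handle, Object message, int retriesLeft, Runnable onGiveUp) {
        if (handle.onMessage(message)) {
            return; // accepted
        }
        if (retriesLeft <= 0) {
            onGiveUp.run();
            return;
        }
        scheduler.schedule(
                () -> send(handle, message, retriesLeft - 1, onGiveUp),
                delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Compared with resubmitting immediately, the delay gives a slow client time to drain before the next attempt, at the cost of holding the handle a little longer.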
Sending Messages
All handlers implement much the same mechanism for sending messages. Each Handler type supplies a Handle that can be used to send one or more replies (or none) to the user depending on the type of the Endpoint. Each handle has an onMessage(Object obj) method that is used to send data to the client, e.g.:
public interface MessageHandle { public boolean onMessage(Object message); }
Although defined as Object, currently the only allowed type is byte[] or String (Strings are converted to byte[] (using UTF-8) internally prior to sending). Further types or plugin serialisers may be added in the future.
RPC Endpoint handle types also support an onError method.
public interface DataHandle { public boolean onError(Object message); }
The onError method sets a fail status on the RPC message received by the Client in its callback.
Endpoint Authorisation
In addition to Connection and User level Authorisation (see below), Rubris enables coarse-grained authorisation at a channel level.
The mechanism to do this is to implement the EndpointAuthorisationHandler interface:
/**
 * A mixin interface that when implemented on an endpoint handler will receive
 * callbacks for the user object to enable restriction as to whether the user
 * is allowed access to the endpoint.
 *
 * For SharedSubscriptions particularly this enables the implementor to veto a
 * user being able to subscribe to an endpoint, even though normally the
 * subscribe handle provides no indication of the users attached to the
 * endpoint.
 *
 * For the other types of endpoint this is less useful, but may have a role in
 * enabling some forms of authorisation.
 *
 * Unlike the {@link ClientHandle} or {@link ConnectionHandle}, this results
 * in a message with an AUTH_FAIL status being returned to the client, rather
 * than the connection level error that the latter 2 handles produce.
 *
 * @author steve
 */
public interface EndpointAuthorisationHandler {
    /**
     * Return whether to authorise the user object for the {@link Endpoint}.
     * Return true to enable and false to deny access.
     *
     * @param user the user to authorise
     * @return true to allow access, false to deny it
     */
    public boolean allow(User user);
}
This interface will be called back on every user message to the channel. For RPC endpoints a failure will result in a success status message being returned to the client with a status of AUTH_FAIL. This should be checked on the client. For subscribe endpoints this results in an authorisation failure message and the client being disconnected.
An accept will allow the processing of the message to continue.
Finer grained topic permissions can be achieved using Namespaced topics or Topic authorisation as outlined below, or using Groups to partition the topics transparently. Alternatively, depending on the requirements of the authorisation (e.g. for applications that have a restricted, well-defined user base), it may be a better design to implement the SharedSubscriptionMembershipHandler, deal with users on a topic or channel asynchronously, and then kill the User object and blacklist the user. This keeps the model asynchronous while still controlling users’ membership without the overhead of individual subscription.
In general, for RPC and PrivateChannels it is better to deal with authorisation individually rather than this form of auth.
Shared Subscriptions
A Shared Subscription endpoint is essentially a name space under which an arbitrary number of topics can be defined. The topic namespace has no intrinsic hierarchy and is entirely flat. Similarly, there is no concept of default wildcard topics or subscriptions.
The absence of default wildcards or pattern subscriptions is predicated on the fact that bulk subscribe/unsubscribe of topics (up to 2048 topics per subscribe message) is the preferred mechanism for dealing with large numbers of subscriptions. It is simpler to reason about and gives the client clear visibility of its topic set.
Generally, it is a far more useful pattern to retrieve a set of topics that are available for the user using an RPC call and subscribe to those in bulk.
Notifications for new topic availability can easily be added on an update channel to inform the client when to retrieve updates to the topics available to it. Combining this with sequence Ids or hashes means only the new/deleted topics need to be retrieved; this is a lot simpler to reason about and provides a much cleaner authorisation model.
Topics are entirely arbitrary and are created through the act of subscription. For example, in a trading application, topics for a PRICE Endpoint may be the ISIN of a bond, an equity ticker symbol (AAPL, IBM etc.) or an FX code (EUR, USD etc.). However, a topic can easily represent something more abstract. For example, on an endpoint that streams trade amendments or new trades, each topic may be a time period, such as a day for live trade updates, or an hourly time slot.
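The retrieve-and-bulk-subscribe pattern can be sketched as a set difference followed by batching. The example below is only an illustration under stated assumptions: `TopicDiff` is a hypothetical helper, the topic lists are presumed to come from your own RPC endpoint, and the batch size passed in would be the 2048-topic limit per subscribe message mentioned earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of the Rubris API): works out which topics
// to subscribe to and unsubscribe from after the available set changes.
final class TopicDiff {
    final Set<String> added;   // in the current set but not yet known
    final Set<String> removed; // known but no longer in the current set

    private TopicDiff(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    // known: topics the client holds now; current: topics just retrieved.
    static TopicDiff between(Set<String> known, Set<String> current) {
        Set<String> added = new TreeSet<>(current);
        added.removeAll(known);
        Set<String> removed = new TreeSet<>(known);
        removed.removeAll(current);
        return new TopicDiff(added, removed);
    }

    // Splits a topic set into batches no larger than maxPerMessage so each
    // batch fits in a single bulk subscribe/unsubscribe message.
    static List<List<String>> batches(Set<String> topics, int maxPerMessage) {
        if (maxPerMessage <= 0) {
            throw new IllegalArgumentException("maxPerMessage must be positive");
        }
        List<String> all = new ArrayList<>(topics);
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < all.size(); i += maxPerMessage) {
            int end = Math.min(all.size(), i + maxPerMessage);
            out.add(new ArrayList<>(all.subList(i, end)));
        }
        return out;
    }
}
```

A sequence Id or hash carried on the notification lets the client skip the retrieval entirely when its known set is already up to date.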
A Shared Subscription is one where all the users connected to the same topic on the Endpoint see the same updates. See the Batching docs for discussion on how queueing affects message visibility.
Shared subscriptions are named as shown above:
class LoggingSharedHandler implements SharedSubscriptionHandler, EndpointNameProvider { @Override public String name() { return "LOGGING"; } ... }
Lifecycle behaviour
The lifecycle of a subscription is:
- The first subscription on a topic will result in a call to onSubscribe(TopicHandle handle)
- Subsequent subscribes will be added to the Topic Handle and will receive copies of messages sent using the handle. The user code will NOT receive further subscribe callbacks for any subsequent subscribes to the same topic by other users unless all users unsubscribe first and a new subscription to the same topic occurs.
- The last unsubscribe on a handle will result in a call to onUnsubscribe(TopicHandle handle)
Given the threading behaviour in Java, the following interleaving is possible: all users unsubscribe from a topic, which generates an unsubscribe; in the meantime a new subscribe arrives, which creates a new TopicHandle; and the callback code receives the new handle before the unsubscribe for the old handle has run.
If you are keeping the handles in a map, it is important to call remove using the handle instance as the object to remove. Using just the topic name is liable to result in a race condition, i.e. if using something like ConcurrentHashMap, use the remove(Object key, Object value) method rather than remove(Object key).
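The race described above can be sketched with a plain ConcurrentHashMap. In this minimal example the handle type is left generic because only instance identity matters; `HandleRegistry` is a hypothetical name, and passing the topic name explicitly is an assumption about how your own code keys the map.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry of live handles keyed by topic name. H stands in
// for the TopicHandle type.
final class HandleRegistry<H> {
    private final ConcurrentMap<String, H> handles = new ConcurrentHashMap<>();

    // Called from the subscribe callback: the newest handle wins.
    void onSubscribe(String topic, H handle) {
        handles.put(topic, handle);
    }

    // Called from the unsubscribe callback. remove(key, value) removes the
    // entry only if it is still mapped to this handle (compared with
    // equals, which is identity unless the handle overrides it), so a late
    // unsubscribe for an old handle cannot evict its replacement.
    boolean onUnsubscribe(String topic, H handle) {
        return handles.remove(topic, handle);
    }

    H current(String topic) {
        return handles.get(topic);
    }
}
```

With remove(Object key) alone, the late unsubscribe for the old handle would have deleted the entry for the new handle and left the topic with no registered handle.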
Thread safety
Endpoints are themselves inherently multi-threaded. However, TopicHandles are NOT thread safe and if you have multiple threads writing to the same topic you must synchronise this externally. It is far better if using multi-threaded Endpoints to partition the topicHandles by thread or adopt some other strategy to ensure they are not shared.
A recommended pattern in this case is something like StripedLock, where each lock covers a (small) subset of the handles.
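A striped lock can be built from the standard library alone. The sketch below is one possible shape, not a Rubris class: `StripedLocks` is a hypothetical name, and hashing on the topic name is an assumption about how handles are grouped.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical striped lock: a fixed array of locks where each lock guards
// the subset of topics that hash to its index.
final class StripedLocks {
    private final Lock[] stripes;

    StripedLocks(int stripeCount) {
        if (stripeCount <= 0) {
            throw new IllegalArgumentException("stripeCount must be positive");
        }
        stripes = new Lock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    // The same topic always maps to the same lock. floorMod keeps the
    // index non-negative even for negative hash codes.
    Lock lockFor(String topic) {
        return stripes[Math.floorMod(topic.hashCode(), stripes.length)];
    }

    // Runs the action while holding the lock for the given topic.
    void withLock(String topic, Runnable action) {
        Lock lock = lockFor(topic);
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

A write to a handle would be wrapped in withLock using that handle's topic, so writers to unrelated topics rarely contend while writers to the same topic are serialised.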
Message behaviour
The TopicHandle can be used to send messages to all users currently subscribed to it. By default the queue for a Topic is a slot representing a single message. Therefore repeated writing to the onMessage method over-writes this slot with the new value.
Deltas are not supported directly (although they can be emulated using queues to some extent). Delta payloads are generally more trouble than the advantages they give and are not considered a valid use case for Shared topics. Keep your messages small and self contained.
On a send the Client objects on the server subscribed on a handle are notified that there is a message pending each time the onMessage is called. The notified Clients then consume the message from this Handle just prior to sending to the browser (rather than have each message pushed into a queue for them).
The Client objects therefore have no Queue of messages themselves (for shared subscription type endpoints) and will consume whatever message was last written to the handle as part of their activity cycle following notification. If messages are produced in quick succession then (depending on the performance of the client and frequency of polling) clients may not see intermediate messages, and not all clients will necessarily see all messages. They are all, however, guaranteed to see the last message.
For endpoints with streaming data where the last element only is important (such as prices), this substantially reduces the latency and potential for staleness and means there is in effect no back-pressure to manage for this type of data (apart from the individual connection’s TCP RX/TX queues).
This also vastly reduces the need for conflation type approaches and reduces the memory issues and latency effects of queues.
By default clients joining a shared topic will have to wait until the next message tick to receive an update. However, various options are also available to change this behaviour as detailed below.
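The single-slot behaviour can be modelled in a few lines. The sketch below is an illustrative model only, not the Rubris implementation: `LatestValueSlot` and its `Reader` are hypothetical names, and a single writer per slot is assumed (matching the requirement that handles are not shared between writing threads).

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of a one-message slot: each write replaces the
// previous value, and readers pick up whatever is current when they poll.
final class LatestValueSlot<T> {

    private static final class Entry<T> {
        final long seq;
        final T value;

        Entry(long seq, T value) {
            this.seq = seq;
            this.value = value;
        }
    }

    private final AtomicReference<Entry<T>> slot =
            new AtomicReference<>(new Entry<T>(0L, null));

    // Overwrites the slot. Assumes a single writer thread.
    void write(T message) {
        slot.set(new Entry<>(slot.get().seq + 1, message));
    }

    // A new reader starts at the current sequence, so it sees nothing
    // until the next write (the default "wait for the next tick").
    Reader newReader() {
        return new Reader(slot.get().seq);
    }

    final class Reader {
        private long lastSeen;

        private Reader(long lastSeen) {
            this.lastSeen = lastSeen;
        }

        // Returns the latest message if it is newer than the last one this
        // reader consumed, otherwise null. Intermediate writes are skipped.
        T poll() {
            Entry<T> current = slot.get();
            if (current.seq == lastSeen) {
                return null;
            }
            lastSeen = current.seq;
            return current.value;
        }
    }
}
```

A reader that polls after three quick writes receives only the third value, which is the property that makes this model attractive for prices and similar last-value data.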
Enabling lastMessage snapshot
Optionally you may want new users joining on a topic to be sent the last message written to the handle without waiting for the next tick on the topic and without wanting to maintain the overhead of a queue yourself. This functionality is extremely useful and removes the need to implement a cache of data yourself when combined with pinned topics.
This can be achieved using the LastMessageSnapshot annotation as shown below:
@LastMessageSnapshot class SnapshotSharedHandler implements SharedSubscriptionHandler, EndpointNameProvider { .... }
This annotation can provide a reasonably effective near side cache for messages (at a depth of 1) for new clients and an implicit state of the world on the topic (if the messages are self-contained) without the application code having to do anything else to support this.
This annotation is only applicable to Shared or Group handles. When used in conjunction with the Queue annotation it is only valid if startAtOldest is set to false. In this case the last message on the Queue is returned to the user on subscription. It is invalid to use in combination with startAtOldest=true and will cause an error to be thrown at start up.
Enabling Queued messages
For some use cases it is preferable that the users on a shared topic are able to obtain an ordered history of messages (up to a certain point) and be able to both see older messages and ensure that messages are to some extent not lost if they are blocked or fall behind. Note: this is a weak guarantee of delivery, rather than a strong guarantee (see below).
This is enabled using either the Queue annotation as shown below:
@Queue(depth=64, startAtOldest=true) class OrderedMessageHandler implements SharedSubscriptionHandler, EndpointNameProvider{ @Override public String name() { return "news"; } ... }
Or implementing the EndpointQueueProvider which provides the same functionality as the annotation.
class OrderedMessageHandler implements SharedSubscriptionHandler, EndpointNameProvider, EndpointQueueProvider{ @Override public String name() { return "news"; } @Override public int queueSize() { return 64; } @Override public boolean startAtOldest() { return true; } }
The Queue configuration above creates a circular Queue on the handle with a default depth of 64 (the depth can be altered, but must be a power of 2; if it is not, it is rounded up to the next power of 2). Up to depth (64) messages are then available to be delivered to late arriving consumers or slow clients. A 65th message will overwrite the first message and so on. The startAtOldest flag starts late arriving clients at the oldest message in the queue, and the client will attempt to retrieve all of them in sequence in a fast forward manner until the latest is reached. Setting this to false will ignore messages already in the queue, and adding LastMessageSnapshot will return just the last message (if any) in the queue.
The Queue is managed with a single Circular buffer in the Handle and each client keeps track by index as to where it is up to. If a client is very slow (or the updates are very fast) and is lapped by the writing thread it is possible for message loss to occur from the perspective of that client. Under these circumstances the client will fast forward itself to the oldest message that is still valid and start sending again from that point.
Missed messages for clients can be determined by implementing:
public interface QueueMessageLossCallback { public void onMessageLoss(User user, String topic, String channel); }
on your SharedSubscriptionHandler.
Back-pressure for a topic is therefore limited to the Queue depth specified, irrespective as to the number of clients.
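The circular buffer, per-client index and fast-forward-on-lap behaviour described above can be modelled as follows. This is a single-threaded illustration under stated assumptions, not the Rubris implementation: `TopicRing` and `Cursor` are hypothetical names, and the real server's concurrency handling is not represented.

```java
// Illustrative single-threaded model of a per-topic circular queue in
// which every client tracks its own read index.
final class TopicRing<T> {
    private final Object[] slots;
    private final int mask;
    private long written; // total number of messages written so far

    TopicRing(int requestedDepth) {
        if (requestedDepth < 1 || requestedDepth > (1 << 30)) {
            throw new IllegalArgumentException("depth out of range");
        }
        int depth = 1;
        while (depth < requestedDepth) {
            depth <<= 1; // round up to the next power of 2
        }
        slots = new Object[depth];
        mask = depth - 1;
    }

    int depth() {
        return slots.length;
    }

    // Overwrites the oldest slot once the ring is full.
    void write(T message) {
        slots[(int) (written & mask)] = message;
        written++;
    }

    // Index of the oldest message that has not been overwritten.
    private long oldest() {
        return Math.max(0L, written - slots.length);
    }

    Cursor cursor(boolean startAtOldest) {
        return new Cursor(startAtOldest ? oldest() : written);
    }

    final class Cursor {
        private long next;
        private long lost;

        private Cursor(long start) {
            next = start;
        }

        // Returns the next message, or null when caught up. If the writer
        // has lapped this cursor, it fast forwards to the oldest valid
        // message and records how many messages were skipped.
        @SuppressWarnings("unchecked")
        T poll() {
            if (next == written) {
                return null;
            }
            long oldest = oldest();
            if (next < oldest) {
                lost += oldest - next;
                next = oldest;
            }
            T message = (T) slots[(int) (next & mask)];
            next++;
            return message;
        }

        long lost() {
            return lost;
        }
    }
}
```

Memory use depends only on the depth and not on the number of cursors, which is why back-pressure stays bounded by the queue depth regardless of how many clients are attached.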
All TopicHandles in an endpoint inherit the same depth and behaviour. For varied behaviours on different topics you should create different endpoints which themselves can be configured with different parameters.
This is not a general panacea for all topics keeping a history. The queue is not free: it consumes more memory and has a higher processing cost (so if you have hundreds of topics configured in this manner the memory profile will be substantially higher). It also means that the payloads for the last queue-depth messages are not eligible for garbage collection until they are overwritten on the next cycle. This is because it is not viable to work out whether all consumers have consumed a message without sampling all consumers on each push (which is just too slow to be viable).
As well as filling memory, setting very large queue depths will also cause longer latencies for clients. Like most things it is a tradeoff and while this pattern proves very useful for things like news/event/desk messaging streams where the last n messages are important it is Very Strongly discouraged for things like pricing.
Although queue usage is intentionally constructed to provide a reasonable attempt to deliver messages in order without loss to users, it does not attempt to guarantee this behaviour, and for stability reasons slow consumers or very fast updates should not create large knock-on effects for other users or the application as a whole. Rather, clients report message loss at their own pace and the application can decide whether this matters on this particular topic/channel and what to do about it (such as disconnecting and reconnecting the user).
For this reason one must be extremely careful about trying to construct a delta type approach on this type of Queue. Further discussion on this and the reasoning behind this design is presented in the Batching doc.
Permissioning Topics
In Rubris there are essentially 3 ways to “permission” topics depending on how we want to treat the constraints and the trade offs we want to incur.
- Group Topics
A group is an automatic namespacing for topics where all users in the same “group” on the server have their topics automatically partitioned on the server. This is really useful for simple partitioning for things like multiple windows for the same user, or firm groups, and is discussed further in Group Subscriptions. The cost for this is low and the group addition is transparent to the Client.
- Namespaced Topics
This provides similar functionality to Groups except that the user can be a member of multiple arbitrary namespaces rather than a single group. This is further outlined in the next section. There is more flexibility, but at the price of a lookup into an internal data structure for the topic prefix matching on each topic subscription.
- Topic Level Authorisation
Topic level authorisation provides very granular permissioning in a more traditional manner where a callback is invoked for each topic dynamically. This allows the user code to take responsibility for authorising the topics individually; however, it can introduce significant performance overheads compared to the other 2 approaches. This is outlined in Topic Level Authorisation.
Namespaced Topics
Namespaced topics are a topic permission restriction on top of the normal shared topic behaviour. The topic semantic namespace is used to restrict which topics a user can join. For example we have UserA and want to restrict him to “/mypath/prices/eprices/*” and “/mypath/prices/quotes/*” topics in the channel.
In general this approach provides much of the functionality we require from topic authorisation, with less overhead than individual topic permissioning, because:
- it is much more efficient than checking each topic manually
- fine grained permissions tend to become annoying very quickly (everything then gets set to true)
- it avoids in-path lookups per topic during the subscribe (if the subscribe is in the 1000 topic range this can have a significant impact)
However, fine grained permissioning is available when this is too restrictive or dynamic permissioning is required (see next section).
Namespacing is achieved on the server by implementing on the SubscriptionHandler the NamespaceProvider interface:
public interface NamespaceProvider { public NamespaceProviderResult namespaces(User user); public int namespaceCacheSize(); }
In a manner similar to GroupSubscriptions, the namespaces a user can subscribe to are defined in the NamespaceProviderResult. This callback is used on first connect to the Channel by the user. All subsequent subscribes use this result until the user disconnects and reconnects. Unlike groups, a user can be a member of any number of namespaces (or all), and the namespaces are part of the topic string that is roundtripped to the user and must be used in its complete form in the subscribe/unsubscribe. The namespace interface is not applicable to Group channels.
For the client code this introduces some slight burden if the code wants to hide or split at the separator in the view and developers should be aware of this.
The cache size is used to tell the server how many namespace entries should be cached as they are converted from strings (this is an overall String cache per channel and is not related to the namespace data structure used by each user connection). Generally this should be slightly more than the total number of namespaces on a channel if it is in the sub 20,000 range. This is an upper bound to prevent excessive garbage accumulating over time rather than an allocation. Setting the value to 0 prevents any cache buildup.
The NamespaceProviderResult must implement this interface:
public interface NamespaceProviderResult { public static final String ANY ="*"; List<String> allowedNamespaces(); }
The list of Strings returned defines the namespaces that a user is allowed to subscribe to. The namespaces are always prefixes in a topic and are a sequence of characters up until a predefined separator. The namespace itself can contain the separator character. The topics in a namespace channel therefore must follow the pattern “[[separator|namespace]][separator][characters]” e.g “myprefix_restoftopic”, or “/myprefix/prefix2/restoftopic” (using _ or / as separators).
“*” (ANY) is a special character that is not a real wildcard; rather, it is a directive to allow any topic to be subscribed to for that user.
Returning an empty or null list will prevent the user subscribing to any topic.
The separator character for a channel is “_” by default but can be set to any byte character (ASCII printable) using the EndpointTopicSeparatorProvider interface:
public interface EndpointTopicSeparatorProvider { public byte getTopicSeparator(); }
If a returned namespace does not end with the separator you have specified (or the default), one will be added to the end of the String when building the permission structure (this means you cannot have a double separator at the end of the namespace). There is no wildcard or equivalent regex support as part of the namespace; each namespace is a complete prefix, so each must be specified in full. Defining a namespace that is a prefix of another in the same list will result in the longer namespace being removed and the shorter taking precedence. There is no use setting [“/pathA”, “/pathA/pathB”] as “/pathA/pathB” will be discarded.
Using this mechanism it is therefore easy to represent a tree semantic space among topics on the channel and give each user permissions into different parts of the tree. For example: for userA we define [“pathA_pathB”, “pathA_pathC”] and userB we define [“pathA”]. UserB can connect to any topic starting “pathA_”, while userA can only subscribe to those starting “pathA_pathB_”, or “pathA_pathC_”.
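The prefix rules above can be illustrated with a simple linear check. This sketch deliberately uses a plain list scan rather than the radix-tree style structure the server uses internally, and `NamespaceCheck` is a hypothetical name; it only demonstrates separator normalisation, shorter-prefix precedence and the “*” directive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of namespace permission checks using a linear scan.
final class NamespaceCheck {
    private final List<String> prefixes = new ArrayList<>();
    private boolean any;

    NamespaceCheck(List<String> allowed, char separator) {
        if (allowed == null) {
            return; // null (like an empty list) allows nothing
        }
        for (String ns : allowed) {
            if ("*".equals(ns)) {
                any = true; // directive: allow every topic
                continue;
            }
            // make sure every namespace ends with the separator
            String prefix = ns.endsWith(String.valueOf(separator)) ? ns : ns + separator;
            boolean covered = false;
            for (String existing : prefixes) {
                if (prefix.startsWith(existing)) {
                    covered = true; // a shorter namespace already covers it
                    break;
                }
            }
            if (!covered) {
                // drop longer namespaces that the new, shorter one covers
                prefixes.removeIf(existing -> existing.startsWith(prefix));
                prefixes.add(prefix);
            }
        }
    }

    boolean allows(String topic) {
        if (any) {
            return true;
        }
        for (String prefix : prefixes) {
            if (topic.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    int size() {
        return prefixes.size();
    }
}
```

Using the userA and userB lists from the text, userA is allowed “pathA_pathB_x” but not “pathA_pathD_x”, while userB is allowed both.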
Internally the namespaces are represented similarly to a compressed radix tree per user/channel so the lookup cost is approximately proportional to the length of the namespaces, rather than the number of prefixes. The tree is built in the order of the list so a bias is present in checking skewed to the list order.
The topic check overhead, while not substantial, is not free, and some thought should be given as to how the namespace tree is structured and the lengths involved. This is especially true if your subscription messages contain 1,000 or more topics, as some slowdown will occur compared to unrestricted subscribes.
Topic Level Authorisation
If Namespaced or Group topics do not provide enough flexibility it is possible to provide arbitrary authorisation via the TopicAuthorisationProvider interface.
public interface TopicAuthorisationProvider {
    /**
     * Callback to allow access to a topic or not. The user object is the user
     * and the sessionId represents the session instance in the browser.
     *
     * @param user the user
     * @param sessionId the session instance in the browser
     * @param topic the full topic name
     *
     * @return true to allow and false to deny
     */
    public boolean allow(User user, String sessionId, String topic);
}
There are some caveats in using this interface that one must be aware of:
- The checking is inline in the main server execution thread and therefore you MUST NOT perform any call that will block, result in a network call, or take a long time, as this will block other pending subscriptions in the module.
- A fail will result in an ERROR returned to the client.
- There is some overhead in creating the topic String for the first subscribe on a topic.
- Repeated subscribes/unsubscribes on the same topic by a user will always result in a callback.
The timing sensitivity of the call cannot be stressed enough, especially for endpoints where bulk subscribes happen. For example, if a client subscribes to 1000 items and each check takes 10ms, a second client cannot be processed for 10 seconds. Some structures can introduce blocking where you may not expect it, so be careful that readers and writers on whatever structures you use do not give rise to this sort of behaviour. Generally, any structure that is included in the checking should either be updated out of band and swapped in, or be known never to block readers, rather than being a concurrent structure that has the potential to block readers.
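The "updated out of band and swapped in" approach can be sketched with a volatile reference to a table that is never mutated after publication. `TopicPermissions` is a hypothetical class, and keying the table by a user id String is an assumption for the example; how you derive a key from the User object depends on your own application.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical permission table that readers consult without locking.
final class TopicPermissions {
    // The published table is never modified after the volatile write, so
    // the check below is a plain, non-blocking read.
    private volatile Map<String, Set<String>> snapshot = Collections.emptyMap();

    // Called out of band (for example from a background refresh thread).
    // Builds a private deep copy, then publishes it with a single write.
    void replace(Map<String, Set<String>> latest) {
        Map<String, Set<String>> copy = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : latest.entrySet()) {
            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }
        snapshot = copy;
    }

    // Suitable for calling inline from an authorisation callback: no
    // locks, no I/O, just two hash lookups against the current snapshot.
    boolean allow(String userId, String topic) {
        Set<String> topics = snapshot.get(userId);
        return topics != null && topics.contains(topic);
    }
}
```

The trade-off is that a permission change only takes effect after the next swap, which is usually acceptable given that the alternative is stalling every pending subscription.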
Server Only Topic Creation
For some channels we may not want the clients to be able to create subscription topics on the fly. Instead it is preferable for the server to define all the topics and allow clients to subscribe only to pre-existing topics.
This is achieved by adding the marker interface:
public interface ServerManagedTopics { }
The interface instructs the server to reject all topics that are in the subscribe from remote clients if they do not already exist on the server. The ONLY way to create topics on a channel using this interface is to use the Server Managed topics functionality as outlined below.
This allows us to completely control channels where we know the topic universe and do not want to let the client add them dynamically.
Server initiation of Topics
As detailed above, on channels with server-restricted topic semantics, or on occasions when it is useful for the server to generate topics in advance or to ensure that topics are not torn down when the last subscription is removed, we can register a callback that lets the server code control topic creation. For example, you may want to push a desk message every day to a set of subscribers and have the messages keep accumulating as a backlog for new users even if no one is subscribed. To achieve this you can use the Pin mechanism with a queue depth defined for the endpoint.
Alternatively, without the queue enabled it will keep a simple subscription alive so no subscribe/unsubscribe cycle will happen for the last user leaving/first user joining. This enables a simple but widely applicable in-memory cache type snapshot mechanism to be set up for things like reference data/opening prices without relying too much on user arrival/departure to drive this behaviour.
The mechanism to manage this on the shared subscription is the SubscriptionManager interface:
public interface SubscriptionManager { public void topicManager(TopicManagementHandle handle); }
Implementing this interface on a shared subscription endpoint will result in a callback to the topicManager method with a TopicManagementHandle. This handle can be used to create a topic subscription in the absence of subscribers, or to pin an existing topic to prevent a final unsubscribe destroying the TopicHandle.
The handle can be used at any time to create or pin topic handles and will result in an onSubscribe callback if this is the first subscribe on the topic.
This is particularly useful in context with the snapshot/queued mechanism if one wants a behaviour similar to a bounded persistent queue of messages which users can join at any time and is always available on the server (note that persistence in this sense means while the server is running - it is not persisted in any storage sense).
The same handle can also be used to unpin a pinned topic to allow the normal unsubscribe mechanics to occur.
public interface TopicManagementHandle { /** * Supplies the channel name of the handle. * * @return channel name */ public String getChannel(); public boolean createOrPinTopic(String topic); public boolean createOrPinTopic(String topic, byte[] initialPayload); public void unpinTopic(String topic); }
The 2 argument createOrPinTopic method allows us to set an initial payload on the created handle. However, if used without ServerManagedTopics there is an inherent race where the handle can be created and a message added in the normal subscribe path. Accordingly, the 2 argument form is restricted to channels which also implement ServerManagedTopics, and attempting to use it on a non-managed channel will result in a runtime exception.
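As a rough illustration of the pinning flow, the fragment below pins two topics when the topicManager callback arrives, one of them with an initial payload. The three interfaces are local copies of the ones quoted above so that the sketch compiles on its own. `OpeningPricesEndpoint`, the topic names, the payload and the in-memory `RecordingHandle` (including what its boolean return value means) are assumptions made purely for demonstration, and a real endpoint would also implement SharedSubscriptionHandler.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Local stand-ins mirroring the interfaces quoted above.
interface ServerManagedTopics { }

interface TopicManagementHandle {
    String getChannel();
    boolean createOrPinTopic(String topic);
    boolean createOrPinTopic(String topic, byte[] initialPayload);
    void unpinTopic(String topic);
}

interface SubscriptionManager {
    void topicManager(TopicManagementHandle handle);
}

// Hypothetical endpoint fragment: server managed, so the two-argument pin is permitted.
class OpeningPricesEndpoint implements SubscriptionManager, ServerManagedTopics {
    private volatile TopicManagementHandle topics;

    @Override
    public void topicManager(TopicManagementHandle handle) {
        this.topics = handle; // keep the handle, it can be used at any later time
        handle.createOrPinTopic("EURUSD", "1.0842".getBytes(StandardCharsets.UTF_8));
        handle.createOrPinTopic("GBPUSD");
    }

    // Returns the topic to the normal unsubscribe mechanics.
    void release(String topic) {
        TopicManagementHandle h = topics;
        if (h != null) {
            h.unpinTopic(topic);
        }
    }
}

// Hypothetical in-memory handle, used only to exercise the endpoint outside the server.
class RecordingHandle implements TopicManagementHandle {
    final Map<String, byte[]> pinned = new LinkedHashMap<>();

    public String getChannel() { return "OpeningPrices"; }

    public boolean createOrPinTopic(String topic) {
        return createOrPinTopic(topic, new byte[0]);
    }

    public boolean createOrPinTopic(String topic, byte[] initialPayload) {
        return pinned.put(topic, initialPayload) == null; // true when newly pinned (assumed meaning)
    }

    public void unpinTopic(String topic) { pinned.remove(topic); }
}
```

Keeping a reference to the handle is what allows topics to be pinned or released later, for example from a scheduled job.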
Controlling memory allocation
For Shared and Group endpoints the default number of clients expected to be added to an individual topic is 64 for Shared topics and 8 for GroupSharedTopics. This number affects the initial size of the data structure underlying the subscriptions.
If you expect to have large subscribe numbers then a large initial value prevents a large number of resizes (with associated memory costs). However, it does not bound the number that can be added. It is simply a dial for performance and GC. Similarly if you know you have small numbers for certain endpoints you can reduce wasted allocation by reducing these numbers.
To control this it is possible to use either the ClientsPerSubscription annotation:
@ClientsPerSubscription(expected=32) class TimingHandlerNoPush implements SharedSubscriptionHandler, EndpointNameProvider { .... }
or implement the EndpointClientPerSubscriptionSizeProvider
class TimingHandlerNoPush implements SharedSubscriptionHandler, EndpointNameProvider, EndpointClientPerSubscriptionSizeProvider { @Override public int expected() { return 32; } }
The mechanics are generally that a smaller number will allocate less memory per handle, while it will suffer slightly more of an amortised cost if resizes occur.
Users joining and leaving topics
As the Subscribe/Unsubscribe is not the same lifecycle as the users sharing the topic handle there is no User information passed on these callbacks. However, we sometimes want visibility of the membership of the topic. This can be achieved by implementing:
public interface SharedSubscriptionMembershipHandler { /** * Called when a user session joins the topic. If the User object is not supplied as part of the authentication mechanism this can be null. * @param user - A user Object (if attached to the client) * @param sid - a session identifier for the user * @param handle - the handle of the topic */ public void join(User user, String sid, TopicHandle handle); /** * Called when a user session leaves the topic through either an unsubscribe or the client being destroyed. * @param user - the user object (if applicable) * @param sid - the session identifier * @param handle - the handle */ public void leave(User user, String sid, TopicHandle handle); }
This callback can be used to see User sessions joining and leaving the handle. There are some caveats with this, however.
For Channels where the pattern is many topics with many users this is NOT recommended as the callback is per handle and the bulk subscribes/unsubscribes will be significantly affected by each topic resulting in a callback. This may result in many thousands of callbacks and will potentially affect to a significant degree the subscribe performance. Any activity driven by the callback should be in its own thread if non-trivial or if potentially blocking actions are called.
A join will be issued for the first user on the handle which will occur in addition to the subscribe callback.
The leave is issued when an unsubscribe is made by the client but can also occur when the client is destroyed or times out on the server (for example if the browser is closed).
The handle is fully usable from this callback, however multi-threaded access must be managed by the calling code as no effort is made to synchronise usage internally between the subscribe callback and the join callback.
As a single user can have multiple sessions the User object on its own may not be enough to distinguish a unique subscribe (depending on client usage). Therefore the sid, representing the Browser Engine-IO session is included.
As this is concurrent with subscribe/unsubscribe for the last user on the handle, take care when using the handle from this callback as it may no longer be valid, in which case any onMessage call will return false.
Note: If you use topic pinning you will get a callback for a user that is the system user. The default UserName is $SYSTEM_USER$. This can be changed in the module config, but if you cast the User interface to your own implementation you should be aware this may fail if the System user is active for any topics.
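A membership handler that only counts sessions per topic could look like the sketch below. It keys on the sid rather than the User, tolerates a null User, and skips the system user by name instead of casting. The interfaces are local stand-ins so that the sketch compiles alone. `TopicHandle.getTopic()` is assumed by analogy with the private handle shown elsewhere in this document, and `MembershipTracker` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-ins; getTopic() on the shared handle is an assumption.
interface User { String getName(); }
interface TopicHandle { String getTopic(); }

interface SharedSubscriptionMembershipHandler {
    void join(User user, String sid, TopicHandle handle);
    void leave(User user, String sid, TopicHandle handle);
}

// Hypothetical tracker: cheap map updates only, nothing that blocks.
class MembershipTracker implements SharedSubscriptionMembershipHandler {
    static final String SYSTEM_USER = "$SYSTEM_USER$"; // default name, configurable in the module config

    private final ConcurrentHashMap<String, Set<String>> sessionsByTopic = new ConcurrentHashMap<>();

    @Override
    public void join(User user, String sid, TopicHandle handle) {
        if (isSystem(user)) {
            return; // pinning produces a join for the system user
        }
        // compute() keeps the add atomic with respect to a concurrent leave on the same topic.
        sessionsByTopic.compute(handle.getTopic(), (topic, sids) -> {
            Set<String> target = (sids == null) ? ConcurrentHashMap.newKeySet() : sids;
            target.add(sid);
            return target;
        });
    }

    @Override
    public void leave(User user, String sid, TopicHandle handle) {
        if (isSystem(user)) {
            return;
        }
        sessionsByTopic.computeIfPresent(handle.getTopic(), (topic, sids) -> {
            sids.remove(sid);
            return sids.isEmpty() ? null : sids; // returning null drops the empty entry
        });
    }

    int sessionCount(String topic) {
        Set<String> sids = sessionsByTopic.get(topic);
        return sids == null ? 0 : sids.size();
    }

    private static boolean isSystem(User user) {
        return user != null && SYSTEM_USER.equals(user.getName());
    }
}
```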
Sending Errors
As with the RPC handle, the Shared handles also provide an onError method. This behaves slightly differently than in the RPC case. The resulting message is not marked with a fail status (as no such concept exists for push messages). Instead it is primarily useful to send messages that bypass any of the snapshot or queue behaviour to prevent errors littering the handles for new clients arriving.
As with normal sends, the onError message sends the data to all the currently connected clients on the topic. However, it does not interfere with the normal messages, and is not kept in the Handle for the LastMessageSnapshot or added to the Queue for Shared Handles that have a Queue configured. This is primarily useful for sending a transient error type response (for instance if the backend has a temporary failure) when you do not want to retain this message for future clients or make it part of the Queued set of messages for the fast forward functionality.
The errors are not queued internally and therefore subsequent errors on the same handle (if produced very quickly) will potentially overwrite the previous ones depending on the Client’s speed in consuming the messages. Therefore the client may receive all the errors, some of the errors or at a minimum the last error only. Although the errors will be delivered in order with respect to themselves, they may be delivered out of order with respect to the normal Messages for shared topics. The error message does not overwrite or interact with the normal message path and vice versa. Be aware that the error path is more expensive than the normal path and does allocate some memory per message.
For Private Topics the Error is in effect a normal message and is treated accordingly in respect of queue offering and ordering. It therefore offers no benefit at the moment to use this method for Private Topics.
The message for the Client is a normal Push Message and the payload should be used if you want to indicate this data is different to a normal message.
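Since the error arrives at the client as an ordinary push message, the payload itself has to say what it is. One possible convention, sketched below, wraps every payload in a small JSON envelope with a `kind` field. The envelope layout, the `Envelopes` and `PriceFeedBridge` names and the exact `onMessage`/`onError` signatures on the stand-in handle are all assumptions for illustration. Real code would normally use a JSON library rather than the minimal escaping shown here.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for a shared topic handle; the parameter and return types are assumptions.
interface SharedTopicHandle {
    boolean onMessage(byte[] payload);
    void onError(byte[] payload);
}

// Hypothetical envelope helper so the client can tell data from transient errors.
final class Envelopes {
    private Envelopes() { }

    static byte[] data(String jsonBody) {
        return ("{\"kind\":\"data\",\"body\":" + jsonBody + "}").getBytes(StandardCharsets.UTF_8);
    }

    static byte[] error(String code, String text) {
        String json = "{\"kind\":\"error\",\"code\":\"" + escape(code)
                + "\",\"text\":\"" + escape(text) + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Minimal escaping of backslashes and quotes only.
    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }
}

class PriceFeedBridge {
    private final SharedTopicHandle handle;

    PriceFeedBridge(SharedTopicHandle handle) {
        this.handle = handle;
    }

    // Normal path: eligible for the snapshot and queue behaviour of the endpoint.
    void onPrice(String jsonBody) {
        handle.onMessage(Envelopes.data(jsonBody));
    }

    // Transient path: reaches the currently connected clients only.
    void onBackendDown(String reason) {
        handle.onError(Envelopes.error("BACKEND_DOWN", reason));
    }
}
```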
Closing Handles
Occasionally it is useful to be able to force close a handle on the server, rather than wait for all the clients to unsubscribe.
This is achieved using the close() method on the handle.
The close does not send a message to the clients. Instead it forces each client on the server to unsubscribe and remove itself from the subscription, as well as cleaning up any associated resources. It does not generate an unsubscribe callback, nor is the usermanagement callback (if present) invoked as clients are removed from the handle.
It is safe just to discard any reference you have to the handle after calling close.
As the close() does not generate a message to the client it is up to the calling code to do this. This is simply achieved by calling onMessage prior to close, and this last message will be delivered to the attached clients prior to the handle being disposed.
If you do not send a final message the client will simply not get any more messages from that topic.
There is no restriction on a client re-creating the topic if it wants to as the close only applies to the handle instance and not to the topic id.
All future messages on the handle will be rejected.
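The "final message, then close" sequence can be wrapped in a small helper, as in the sketch below. The `TopicHandle` interface is a local stand-in exposing only the two calls used here. The notice payload, `TopicShutdown` and `FakeHandle` are hypothetical, and the fake only imitates the behaviour described above, where sends after close are rejected.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Local stand-in exposing only the calls used in this sketch.
interface TopicHandle {
    boolean onMessage(byte[] payload);
    void close();
}

final class TopicShutdown {
    // Hypothetical payload telling the client the topic has gone away.
    static final byte[] CLOSED_NOTICE = "{\"kind\":\"closed\"}".getBytes(StandardCharsets.UTF_8);

    private TopicShutdown() { }

    // Sends a last message and then closes. Returns whether the notice was accepted.
    static boolean closeWithNotice(TopicHandle handle) {
        boolean accepted = handle.onMessage(CLOSED_NOTICE);
        handle.close();
        return accepted;
    }
}

// Hypothetical fake that imitates the documented behaviour for demonstration only.
class FakeHandle implements TopicHandle {
    final List<String> sent = new ArrayList<>();
    boolean closed;

    public boolean onMessage(byte[] payload) {
        if (closed) {
            return false; // messages after close are rejected
        }
        sent.add(new String(payload, StandardCharsets.UTF_8));
        return true;
    }

    public void close() {
        closed = true;
    }
}
```

After `closeWithNotice` returns, the caller can simply drop its reference to the handle.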
Group Subscriptions
Group subscriptions have all the same characteristics as Shared Subscriptions above except that a group of users have effectively a private Topic space on the same Channel. Multiple topics with the same name can exist on the Channel provided the members are in different groups.
This provides a useful mechanism that bridges the behaviour of the Shared and Private Subscriptions and provides a simple but effective “permission” behaviour with little overhead. The Group subscriptions enable the sharing of private information for a group of users that behaves as if each group had its own channel. Although we use Group in the sense of different related users (e.g. in an organisation or desk) it is also a useful pattern for 1 user with multiple active sessions where sharing of the data is required across multiple browser windows or devices.
Groups cannot be combined with the namespace or Topic Authorisation functionality and differ from this in a number of aspects:
- A user session on a channel can only be in 1 group
- The prefixes added to the topic on the server are not visible to the client
- it is not a permission mechanism as such, rather it is a way of generating dynamic group specific topics that are private to a subset of users.
- It greatly simplifies circumstances where we want the client code to use the same topic names without group specific knowledge.
Note: Group endpoints may be deprecated in future versions as the Namespace functionality provides similar behaviour, but in a more flexible form.
Creating Groups
Groups for users can be defined either through the use of the Group annotation or the endpoint implementing the GroupProvider interface. By default a group is derived from User.getUserName(). However, defining a custom Group provider allows an arbitrary grouping of users.
Further, for something like private price streams that have little or no queueing for a user (or group of users) - this is a much lower cost manner of providing this behaviour than using private topics directly (which have much higher setup and queue costs). Group channels combined with queues possess the behaviours described in the shared queue functionality and there are drawbacks in using large queues in this manner that should lead one to strongly prefer private queues if large queue sizes are needed.
The endpoint is specified as:
class LoggingGroupHandler implements GroupSubscriptionHandler, EndpointNameProvider, GroupProvider { @Override public String name() { return "BidDepthGroup"; } @Override public ProviderResult group(User user) { return new ProviderResult() { @Override public String name() { return user.getName(); } @Override public Object data() { return null; } }; } ... }
When a user subscribes to a Channel/Endpoint for the first time it will callback into the group method. The ProviderResult response consists of a name and an opaque data object.
The ProviderResult.name becomes the Group name and the data item is added to the topic handles created on this subscribe. The ProviderResult used for the initial subscribe supplies the data object that is permanently attached to the User for that Channel. Subsequent ProviderResults for other users assigned to the same group will be ignored (as far as the handle is concerned).
Once a user has been assigned a Provider result for a channel (for the current User session), all subsequent subscribes to the channel will inherit this Provider and no other callbacks will be made.
If the user subscribes on a new channel a new GroupProvider callback will occur.
Different channels for the same user can therefore have different group memberships and different Data objects.
Generally Group subscriptions would be expected to be used where reference type/pricing information is specific to a group of users, or multiple devices/windows exist for the same user and this can greatly simplify keeping these windows in sync.
Group info behaves identically to SharedSubscriptions and supports LastMessageSnapshot, Queues, MessageLoss and Subscription pinning in the same manner as Shared Subscriptions.
One may expect that Groups could provide guaranteed message queued behaviour in the same way as PrivateSubscriptions. However, this is NOT supported, primarily because Group membership and ordered sequential delivery are essentially mutually exclusive. See the Batching docs for further discussion.
To discuss this briefly: if 2 devices are attached to the same group and device 2 suffers network blocking or is very slow, the choices are either to withhold any new data from device 1 until device 2 has space in its queue, or to register a callback to disconnect device 2, at which point the use of the group becomes pointless. For this reason (among others) a Group behaves like a shared topic, albeit with a restricted, server-enforced membership.
For example Tiered reference data for groups of users for the same data topic (as far as the browser is concerned) is easily then achieved by using a Group topic with LastSnapshot enabled, or Tiered pricing can be supported in a similar manner as an alternative to having Channels that vary the pricing.
Generally these handles are lighter weight than the Private Subscriptions and provide a good balance of isolation and delivery. Many patterns that look initially as if they require Private Subscriptions can be solved with Groups with snapshots or small Queues. Conceptually one often thinks that no message loss must occur on a topic. However, in reality there are relatively few instances where large amounts of fast data for groups of users possesses this characteristic, and if it is a definite requirement either private topics should be used or the message depth should be of a known limited number.
Note: The Browser does not see nor has any visibility of the group name or that the subscription is part of a Group Endpoint. This means the browser for all users can use the same topics and the users can be isolated entirely on the server. Similarly, if the client tries to subscribe to a Topic using a group prefix its own group prefix will always be prepended to the topic on the server. This means it is impossible for a user not part of GroupA to subscribe to topics in GroupA.
Server initiation of Group Topics
Creating a group topic using the pinning mechanism works identically to normal topics however the format of the topic name in the method call should be GROUP_TOPIC.
This is how the group topic is represented internally and any group_topic combination can be pinned. For normal users the group part is invisible. As the group is always added to non-system subscribes, a user cannot reach another group's topic even maliciously: whatever topic they supply will always be prepended with their own group and separator.
Users joining and leaving GroupSubscriptions
A parallel interface exists for GroupSubscriptions in the same way as described above for SharedSubscriptions:
public interface GroupSubscriptionMembershipHandler {
    /**
     * Called when a user session joins the topic. If the User object is not supplied as part of the authentication mechanism this can be null.
     * @param user - A user Object (if attached to the client)
     * @param sid - a session identifier for the user
     * @param handle - the handle of the topic
     */
    public void join(User user, String sid, GroupTopicHandle handle);

    /**
     * Called when a user session leaves the topic through either an unsubscribe or the client being destroyed.
     * @param user - the user object (if applicable)
     * @param sid - the session identifier
     * @param handle - the handle
     */
    public void leave(User user, String sid, GroupTopicHandle handle);
}
It is identical in behaviour to the SharedSubscriptionMembershipHandler but supplies a GroupTopicHandle.
Groups And Server Managed Topics
Server managed topics operate on groups in exactly the same way as with normal SharedSubscriptionEndpoints. However, the pin/unpin functionality must pass the group_topicname as the topic in the interface. This is split internally on the separator character “_” (or whatever separator is used on the channel) and the group subscription created.
For this reason the group name cannot contain the separator character.
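A small helper can build the group_topicname string and reject group names that contain the separator before they reach the pin call. The sketch below is illustrative only. `GroupTopics` is a hypothetical utility, and the `split` method reflects our reading of the rule above (everything before the first separator is the group), not a confirmed description of the internal implementation.

```java
// Hypothetical helper for building names passed to createOrPinTopic/unpinTopic on group channels.
final class GroupTopics {
    static final char DEFAULT_SEPARATOR = '_';

    private GroupTopics() { }

    // Builds "group<separator>topic", refusing group names that would make the split ambiguous.
    static String pinnedName(String group, String topic, char separator) {
        if (group == null || group.isEmpty()) {
            throw new IllegalArgumentException("group name is required");
        }
        if (group.indexOf(separator) >= 0) {
            throw new IllegalArgumentException("group name must not contain '" + separator + "': " + group);
        }
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic is required");
        }
        return group + separator + topic;
    }

    // Assumed split rule: the group is everything before the first separator.
    static String[] split(String pinnedName, char separator) {
        int at = pinnedName.indexOf(separator);
        if (at <= 0 || at == pinnedName.length() - 1) {
            throw new IllegalArgumentException("expected group" + separator + "topic but got: " + pinnedName);
        }
        return new String[] { pinnedName.substring(0, at), pinnedName.substring(at + 1) };
    }
}
```

Under that rule the topic part may itself contain the separator, which is why only the group name is restricted.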
Private Subscriptions
Private subscriptions have a number of differences to shared subscriptions.
- The client can send messages to a private subscription
- Even if the topic name is the same, each user actually has a private TopicHandle on the server
- All messages are queued on the server in the handle instance for each client individually as a non-lossy queue
- The handle is valid until either side calls close
- If the topic is closed on the server and the client sends another message the topic is recreated automatically
However, a PrivateSubscription endpoint still acts similarly to the Shared Subscriptions as a namespace at the topic level. Therefore a single user can have multiple private subscriptions to different topics simultaneously on the same endpoint.
Private endpoints are defined using:
class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider { @Override public String name() { return "RFQ"; } ... }
Queueing behaviour
PrivateSubscriptions have a default limit of 64 as their queue size. This can be altered by setting a Queue depth as shown on the Shared Subscriptions (with either annotation or the EndpointQueueProvider interface):
@Queue(depth=128) class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider { @Override public String name() { return "RFQ"; } ... }
The startAtOldest option has no effect for private subscriptions as the Queue is NOT circular. Unlike shared topics, messages sent on the handle over this limit will be rejected, as indicated by a response of false from the onMessage method. The queue size is not a count of the total messages that can be sent on the topic; rather it is the number of messages that can be outstanding at any one time.
A queue size of 0 can be set to assign an unbounded queue for Private Subscriptions. In reality this sets the queue size to default to the maxPrivateQueueSize in the ModuleConfig (default 1024*256). This queue will also be a linked list and has no pre-allocation.
This is potentially dangerous as slow consumers or very fast message rates will not lead to back-pressure on the submit until the max number of messages is reached. If this occurs for a number of clients this can cause instability or Out Of Memory issues if the queues build up in the VM - use AT YOUR OWN RISK.
Unbounded queues are a linked-list type structure and therefore this form of queue also introduces a higher allocation overhead and more garbage as nodes are released. However, they can be useful to limit pre-allocation overhead from using fixed size queues and can be used to deal with uncertain or very bursty topics which may overflow the pre-allocation type queues.
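With a bounded queue, the false returned by onMessage is the back-pressure signal, and the calling code decides what to do with it. The sketch below shows one possible policy: park a small, bounded number of rejected messages, retry them in order, and report failure to the producer once the overflow is full. `BackPressureSender`, `maxPending` and the single-method stand-in handle are hypothetical. The class is synchronised because the handle itself is not thread safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Local stand-in exposing only the send call; false means the queue is full.
interface PrivateSubscriptionHandle {
    boolean onMessage(byte[] payload);
}

// Hypothetical wrapper giving a producer an explicit, bounded overflow.
class BackPressureSender {
    private final PrivateSubscriptionHandle handle;
    private final Deque<byte[]> pending = new ArrayDeque<>();
    private final int maxPending;

    BackPressureSender(PrivateSubscriptionHandle handle, int maxPending) {
        this.handle = handle;
        this.maxPending = maxPending;
    }

    // Returns false when both the handle queue and the local overflow are full:
    // the caller should slow down, drop, or close the conversation.
    synchronized boolean send(byte[] payload) {
        flush();
        if (pending.isEmpty() && handle.onMessage(payload)) {
            return true;
        }
        if (pending.size() >= maxPending) {
            return false;
        }
        pending.addLast(payload); // parked behind earlier messages to preserve order
        return true;
    }

    // Retries parked messages oldest first and stops at the first rejection.
    synchronized int flush() {
        int sent = 0;
        while (!pending.isEmpty() && handle.onMessage(pending.peekFirst())) {
            pending.removeFirst();
            sent++;
        }
        return sent;
    }

    synchronized int parked() {
        return pending.size();
    }
}
```

Keeping the overflow bounded matters: an unbounded local buffer would simply move the memory risk described above from the handle into application code.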
Preallocation
Private Queues are also able to be configured without preallocation:
@Queue(depth=128) @QueueType(preallocated=false) class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider { @Override public String name() { return "RFQ"; } ... }
Or
@Queue(depth=128) class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider, EndpointQueueTypeProvider { @Override public String name() { return "RFQ"; } @Override public boolean preallocated(){ return false; } ... }
The primary use case for this is when an endpoint needs a very large queue depth (in the thousands) but only intermittently (for instance a State Of the World initial population). In this case the memory cost of keeping the arrays backing the queues is wasteful. Setting the queue to not be preallocated can save this memory cost at the expense of more GC, memory churn and slightly increased latency.
Messaging for Private Queues
Unlike shared subscriptions, the idiom to create private topics is for the client to send a message to a topic string, rather than subscribe (removing the need for the subscription bootstrap). This will create a TopicHandle if one does not exist and reuse an existing one if the TopicHandle has not been closed. Each message then results in a callback:
@Override public void onMessage(User user,Object message,PrivateSubscriptionHandle handle) { LOGGER.debug("Conversation message from "+user.getName()+ handle.getTopic()); }
The PrivateSubscriptionHandle can be used to send multiple responses to the client. A Conversation type behaviour over the same Topic can then easily be constructed from these components. Similarly to the other handles the individual PrivateSubscriptionHandle is Not thread safe.
As with Request/Reply handles, the message payload is a byte[] and any action should take place in a separate thread and use the handle asynchronously.
Generally one would not expect private subscriptions to be used for pricing or similar as they are more expensive than the shared subscriptions, in terms of memory, garbage produced and performance. They are more suited to transient conversational scenarios such as RFQ or trading.
In order for the handle to be garbage collected and not be kept indefinitely it is necessary to call close either from the server or the client. For the client the close can include a payload, which is delivered to the server in the onClose callback:
@Override public void onClose(User user, Object message, PrivateSubscriptionHandle handle) { LOGGER.warn("Close message from "+user.getName()+ handle.getTopic()); }
The server calling close does not deliver a message to the client, and a subsequent send by the client to the same topic will just automatically recreate the handle. However, calling close on the server enables the handle to be freed up for GC.
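Putting the pieces together, a conversation handler might hand each message to a worker, reply through the handle, and close it when the exchange ends. The sketch below is a minimal illustration under several assumptions: the interfaces are local stand-ins based on the callbacks shown above, `onMessage(byte[])` and `close()` on the handle are assumed signatures, and the `QUOTE:` prefix and `DONE` message form a made-up protocol.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;

// Local stand-ins based on the callbacks shown above; handle methods are assumptions.
interface User { String getName(); }

interface PrivateSubscriptionHandle {
    String getTopic();
    boolean onMessage(byte[] payload);
    void close();
}

interface PrivateSubscriptionHandler {
    void onMessage(User user, Object message, PrivateSubscriptionHandle handle);
    void onClose(User user, Object message, PrivateSubscriptionHandle handle);
}

// Hypothetical handler: does no work on the read thread beyond handing off.
class QuoteConversationHandler implements PrivateSubscriptionHandler {
    private final Executor worker;

    QuoteConversationHandler(Executor worker) {
        this.worker = worker;
    }

    @Override
    public void onMessage(User user, Object message, PrivateSubscriptionHandle handle) {
        byte[] payload = (byte[]) message; // the payload is a byte[]
        worker.execute(() -> reply(payload, handle));
    }

    private void reply(byte[] payload, PrivateSubscriptionHandle handle) {
        String request = new String(payload, StandardCharsets.UTF_8);
        // The handle is not thread safe, so serialise access per handle.
        synchronized (handle) {
            if (request.equals("DONE")) {
                handle.close(); // lets the server-side handle be garbage collected
                return;
            }
            // A false return here would mean the private queue is full.
            handle.onMessage(("QUOTE:" + request).getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public void onClose(User user, Object message, PrivateSubscriptionHandle handle) {
        // The client ended the conversation; this sketch holds no per-topic state to release.
    }
}
```

In production the executor would typically be a thread pool. A same-thread executor such as `Runnable::run` is handy for tests but must not be used on the server, since it would run the work on the read thread.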
Batching within Topics
Batching for messages within a single private topic can be achieved by using the Batch annotation or implementing the BatchProvider interface:
class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider, BatchProvider {
    @Override
    public String name() {
        return "RFQ";
    }

    @Override
    public int size() {
        // a batch size of 10
        return 10;
    }
    ...
}
A batch for a single topic enables a single push cycle to attempt to batch together up to (n) messages into a single group of messages from the same Topic in a single message sent to the client. Whether a full batch is available depends on the space remaining in the outbound buffer for the user, the number of message queued and the size of the messages. E.g with a batchsize of 10, a default 256k outbound buffer and each message being ~5k and a single handle it is likely each batch will consist of 10 messages.
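The arithmetic behind that example can be written down as a rough model: a push carries at most the batch size, at most what is queued, and at most what fits in the free buffer space. The sketch below is a deliberately simplified estimate that ignores framing overhead and other topics competing for the same buffer. `BatchEstimate` is a hypothetical helper, not how the server computes its batches.

```java
// Hypothetical back-of-envelope model for the batching example above.
final class BatchEstimate {
    private BatchEstimate() { }

    // Upper bound on messages per push: limited by batch size, queue contents and buffer space.
    static int messagesPerPush(int batchSize, int freeBufferBytes, int messageBytes, int queued) {
        if (batchSize < 1 || messageBytes < 1) {
            throw new IllegalArgumentException("batchSize and messageBytes must be positive");
        }
        int fitInBuffer = freeBufferBytes / messageBytes;
        return Math.max(0, Math.min(batchSize, Math.min(queued, fitInBuffer)));
    }
}
```

With a batch size of 10, a free 256k buffer and 5k messages, ten messages need about 50k, so the batch size is the limiting factor. With only 20k of buffer free, the same settings would yield at most 4 messages per push.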
There are some major caveats with batching:
- A topic with a constant flow of messages will potentially starve other topic handles if its batch occupies the majority of the output buffer.
- Very large batch sizes can lead to buffer bloat, which slows messages being pushed to the user as the TCP buffers will repeatedly reject large writes, causing the socket channel to be penalised for having blocked writes.
- It is really only safe to use with a few topics concurrently, or for topics that have bursty but intermittent messaging. A single Topic pattern is an anti-pattern and, unless data characteristics force you down this path, you are strongly encouraged to split data by topics and channels to take advantage of their inherent parallel behaviour (as explained in the Batching docs), rather than try to batch what is effectively a single HTTP Stream.
However, if you do have single private topics, or a few topics with sequences that have a burst type behaviour, batching can ease the HTTP polling roundtrip issues.
Direct Endpoints
Direct endpoints are outside the normal Rubris protocol and EngineIO stack. They are more analogous to the traditional HttpRequest/HttpResponse mechanism. However, there are differences.
- The endpoint name is actually a url path that is available to the browser
- A simplified form of HTTP request and HTTP response is provided
- The body and headers of the request are available in relatively raw form
This is easiest to show with an example: a pair of direct handlers that process an OKTA SAML request/redirect and the corresponding response.
Note: The Token parsing for HTTP headers always converts the keys to lower case (the values and other Token types are unaffected).
import java.nio.charset.StandardCharsets;
// Rubris API imports omitted. samlProvider and LOGGER are fields defined elsewhere.

@Endpoint(name = "/sso")
class OktaInitialPathHandler implements DirectPathHandler {
    @Override
    public void onRequest(DirectRequest req, DirectResponse response) {
        byte[] out;
        try {
            // Build the SAML request and send the browser to OKTA.
            out = samlProvider.buildOutboundParams("http://www.mydomain.com/meta/sso/response");
            response.sendRedirect(samlProvider.redirectUrl + "?SAMLRequest=" + new String(out, StandardCharsets.US_ASCII));
        } catch (Exception e) {
            LOGGER.warn(e);
            response.sendError(401);
        }
    }
}

@Endpoint(name = "/sso/response")
class OktaSamlResponseHandler implements DirectPathHandler {
    ParamParser bodyParser = new ParamParser("SAMLResponse", Type.PARAM);

    @Override
    public void onRequest(DirectRequest req, DirectResponse response) {
        if (req.getBody() == null) {
            response.sendError(401);
            // The response can only be used once, and the body must not be dereferenced below.
            return;
        }
        byte[] token = bodyParser.parse(req.getBody(), req.getBody().length);
        if (token == null) {
            response.sendError(401);
        } else {
            try {
                String user = samlProvider.decodeResponse(token);
                // The cookie is what the ConnectionHandler checks on later XHR requests.
                response.setHeader("Set-Cookie", "X-AUTH-SSO=" + (user + "; path=/;"));
                response.sendRedirect("http://www.mydomain.com/startpage.html");
            } catch (Exception e) {
                response.sendError(500);
            }
        }
    }
}
Here we can see that the direct path will be (HttpConfig.servicePath + the endpoint name). In this case, assuming a service path of “meta”, the paths become http://www.mydomain.com/meta/sso and http://www.mydomain.com/meta/sso/response.
The paths are greedy, so unless another path is more specific a handler will also match any path that is an extension of its own path (as we can see in the example). In the example above we initially land at the /sso url and issue a redirect to OKTA after building a SAML request. After logging in to the OKTA site the browser is redirected back to the sso/response url and the response is processed. If correct we set a cookie and redirect the httpRequest to the test landing page.
The DirectRequest is a simple version of the more traditional HttpRequest and gives access to
- the headers (all headernames are lower case by default - but can be accessed by mixed case lookups).
- the URI including the query params (if any) in its raw form (e.g. “/meta/sso/response?xx=yyy”)
- the body as a byte[]
- If the method type was POST or GET
- If the request was over XHR (if the header “x-requested-with” is set)
As with other Endpoints, the DirectResponse can be used asynchronously to respond, but can only be used once.
It has a number of convenience methods to make error responses simpler, e.g. response.sendError(401). It can be used to set arbitrary headers, or send a redirect, or a body payload as a response. The content-length, date and CORS headers (if required) are added automatically and will be overwritten if set by the user.
From the example above we can see that we are using the set-cookie header to specify a cookie based on the SAML response which will then be used by the browser on the subsequent XHR EngineIO requests to enable for example the ConnectionHandler to validate this cookie and accept connections or not.
It is expected that DirectHandlers are not used for much outside SSO or liveness status type connections for monitoring, load balancers etc. Their usage is generally discouraged as all other messaging should be within the confines of the Rubris/Engine protocol. If you find yourself creating more than a few of these handlers you should reconsider how they can be expressed in one of the messaging forms (e.g. RPC), or perhaps consider switching to one of the good quality traditional Webservers or HTTP communication libraries.
General differences are:
- There is no such thing as a HTTP session in this interface (in fact Rubris does not use them at all)
- The user of the connection is not accessible as it will most likely not exist and has no meaning outside of the Rubris protocol.
Although one could see a use case in serving static files such as javascript, css, html etc., Rubris comes with a FileCache mechanism entirely for that purpose that is far more efficient and supports GZIP and caching. The use of direct path handlers for that purpose is STRONGLY discouraged.
This type of endpoint is much more a get-out-of-jail-free card for cases where the engine-IO protocol does not allow certain types of behaviour, e.g. setting up a general cookie, pre-flight authorisation for Cross-Domain XHR connections etc.
Connection Handling
The ConnectionHandler is one of the non-Endpoint types and is instead provided to enable optional authorisation for physical connections. The ConnectionHandler is called each time a new connection from the browser is made for a path that is under the service path (if running behind a proxy server/load balancer then the connection is the one from the proxy server only). Static resource requests, Direct Paths and the service.json payload are always served without authorising the connection.
A new connection is defined as a new TCP connection that is not currently known on the server. As browsers tend to make TCP connections in an ad hoc manner and recycle existing socket connections, the arrival of new connections is somewhat unpredictable.
For instance, in a normal flow with Chrome, the browser will make ~5 connections for static resources. It will then immediately close 1 or 2 of these and reuse the remaining TCP sockets for subsequent XHR connections or when upgrading to Websockets. It may also issue the websocket upgrade on a socket that has already made an HTTP poll or similar request, although as far as EngineIO on the client is concerned these are new transport connections.
Similarly, apart from websocket connections, the browser may shut down a TCP connection used for HTTP requests and start a new one even though no error has been received on the existing one.
This means that Rubris treats the TCP connections as entirely orthogonal to the protocol. The user code is entirely responsible for accepting or denying these connections, and the mechanism by which this is achieved is entirely within the User’s control. An example is given below.
By default a dummy connectionHandler is configured which will accept all connections. To override this behaviour it is necessary to declare our own handler:
Note: The Token parsing for HTTP headers always converts the keys to lower case (the cookie names and uri params are provided as is).
@Token(name = "X-AUTH-SSO", type = Type.Cookie)
@Token(name = "x-forwarded-for", type = Type.Header)
class OKTAConnectionHandler implements ConnectionHandler {

    @Override
    public void onConnection(SocketAddress remoteAddress, final String uri,
            final Map<String, TokenData> tokens, boolean isCrossOrigin, boolean isXhr,
            final ConnectionHandle handle) {
        try {
            TokenData t = tokens.get("X-AUTH-SSO");
            if (t != null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("XHR auth request ");
                }
                final String st = new String(t.getData(), StandardCharsets.UTF_8);
                // This is where we would check the format of the token or the originating IP address.
                // accept() must be called synchronously to allow the request to proceed.
                /* DO NOT DO ANYTHING LONG RUNNING OR ANY NETWORK LOOKUP, INCLUDING ADDRESS RESOLUTION, HERE */
                handle.accept();
            } else {
                handle.deny();
            }
        } catch (Exception e) {
            LOGGER.warn("Error processing token ", e);
            // Make sure we call back to enable the outbound write.
            handle.deny();
        }
    }
}
Following on from our OKTA example we have defined a ConnectionHandler that is looking for the cookie we have previously set in our Direct Path handler and rejecting connections that do not have such a token. The callback provides us with the remote address and some other information that we can use to determine if we want to accept the connection or not.
This is the only API which is synchronous in nature. As it is synchronous every effort should be made to perform this action as quickly as possible and actions such as remote network calls are a REALLY BAD IDEA.
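One way to keep the callback quick is to make the token verifiable locally, so that no remote lookup is needed. The following is a minimal sketch of that idea, not part of Rubris: it assumes a hypothetical token format of "payload.signature" where the signature is an HMAC-SHA256 of the payload under a shared secret. The class name LocalTokenCheck, the token layout and the secret handling are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper (not a Rubris class): verifies "payload.signature" tokens locally. */
final class LocalTokenCheck {
    private static final String ALGORITHM = "HmacSHA256";
    private final SecretKeySpec key;

    LocalTokenCheck(byte[] sharedSecret) {
        this.key = new SecretKeySpec(sharedSecret, ALGORITHM);
    }

    /** Signs a payload; this would normally run wherever the cookie is issued. */
    String sign(String payload) {
        String signature = Base64.getUrlEncoder().withoutPadding().encodeToString(hmac(payload));
        return payload + "." + signature;
    }

    /** Pure CPU work (no I/O, no DNS), so it is cheap enough for a synchronous callback. */
    boolean isValid(String token) {
        int dot = token.lastIndexOf('.');
        if (dot <= 0 || dot == token.length() - 1) {
            return false; // payload or signature is missing
        }
        byte[] presented;
        try {
            presented = Base64.getUrlDecoder().decode(token.substring(dot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature is not valid Base64
        }
        // Constant-time comparison avoids leaking how many bytes matched.
        return MessageDigest.isEqual(hmac(token.substring(0, dot)), presented);
    }

    private byte[] hmac(String payload) {
        try {
            Mac mac = Mac.getInstance(ALGORITHM);
            mac.init(key);
            return mac.doFinal(payload.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }
}
```

In a handler such as the one above, the decoded cookie string could be passed to isValid before choosing between handle.accept() and handle.deny().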
As this is per physical connection, this may be called for the same user more than once (as we do not know who the user is until the authorisation has been performed) depending on the browser and how it handles connections.
However, once accept is called the physical connection is marked as authorised, and subsequent HTTP requests on the same physical socket (even if EngineIO believes this is a new transport connection) will not trigger a call to this interface. In many ways this is similar to the in-memory scoping of a transient cookie; however, the physical socket cannot survive in the way that a transient cookie could. It also means that the User code can, if it so desires, tie cookies to IP ranges or other information to prevent cookies being used from other IPs (although this is dangerous given leased IP blocks and mobile clients).
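The "authorise once per physical socket" behaviour described above can be pictured with a small model. This is purely conceptual and is not how Rubris is implemented internally: the class SocketAuthModel, the string socket ids and the Predicate standing in for the ConnectionHandler are all hypothetical names chosen for the sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

/** Conceptual model only: tracks which physical sockets have already been authorised. */
final class SocketAuthModel {
    private final Set<String> authorisedSockets = new HashSet<>();
    private final Predicate<String> connectionCheck; // stand-in for the ConnectionHandler decision
    private int handlerCalls = 0;

    SocketAuthModel(Predicate<String> connectionCheck) {
        this.connectionCheck = connectionCheck;
    }

    /** Returns true if a request arriving on this socket may proceed. */
    boolean onHttpRequest(String socketId, String cookie) {
        if (authorisedSockets.contains(socketId)) {
            return true; // already authorised: the handler is not consulted again
        }
        handlerCalls++;
        if (connectionCheck.test(cookie)) {
            authorisedSockets.add(socketId);
            return true;
        }
        return false;
    }

    /** The authorisation dies with the socket; a new TCP connection starts from scratch. */
    void onSocketClosed(String socketId) {
        authorisedSockets.remove(socketId);
    }

    int handlerCalls() {
        return handlerCalls;
    }
}
```

In this model a second request on "sock-1" passes without consulting the check again, whereas the same browser arriving on a fresh socket is treated as unknown.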
As this is not a general purpose HTTP request, the mechanism to obtain parts of the original request is defined using the Token annotations. This is a repeating annotation which can be used to grab headers, cookie values or URL parameters from the actual request triggering this callback.
As we can see above, this endpoint is asking for a cookie called “X-AUTH-SSO” and the “x-forwarded-for” header. The values (if present on the request) will be present in the tokens map. The payload data is a byte[] on the TokenData object stored against the key.
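Because header keys are lower-cased while cookie names and URI params are kept as is, it can help to wrap the lookups in a small helper. The sketch below is an illustration only: TokenLookup is a hypothetical class, and a Map<String, byte[]> stands in for the Map<String, TokenData> passed to the callback (in real code the bytes would come from TokenData.getData()).

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: a Map of byte[] stands in for Map<String, TokenData> here. */
final class TokenLookup {

    /** Header keys arrive lower-cased, so normalise the requested name before looking it up. */
    static Optional<String> header(Map<String, byte[]> tokens, String name) {
        return decode(tokens.get(name.toLowerCase(Locale.ROOT)));
    }

    /** Cookie names are provided as is, so the lookup is case-sensitive. */
    static Optional<String> cookie(Map<String, byte[]> tokens, String name) {
        return decode(tokens.get(name));
    }

    /** The payload is raw bytes; UTF-8 is assumed, as in the handler example. */
    private static Optional<String> decode(byte[] data) {
        return data == null
                ? Optional.empty()
                : Optional.of(new String(data, StandardCharsets.UTF_8));
    }
}
```

With this in place, header(tokens, "X-Forwarded-For") and header(tokens, "x-forwarded-for") resolve to the same entry, while cookie(tokens, "x-auth-sso") does not match a cookie stored as "X-AUTH-SSO".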
This enables us to do some simple things, such as: accept a multi-domain cookie set on another instance in a micro-architecture without having to go around the login/re-auth loop; use custom headers or request params on redirects to allow the connection to be authenticated without cookies; or use cookie upgrade mechanisms, token negotiation etc. Rubris is entirely agnostic as to how the User code achieves this and does not use its own session or similar cookie mechanism. See the Engine-IO guide for details on the connection lifecycle and how its SIDs are managed.
Further information on Connection and User authorisation can be found in the UserManagement docs.
Note: Behind proxy servers (such as when running inside AWS), where a connection is generally load balanced among multiple active users, this mechanism is of little use (apart from identifying the originating IP) and hence is not recommended as a way of authorising users. Rather, it is a way of providing an extra restriction, such as limiting access by IP.
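As an illustration of the IP restriction mentioned in the note, the sketch below takes the first address from an x-forwarded-for value and checks it against an IPv4 CIDR range. It is not part of Rubris: ForwardedForCheck and its methods are hypothetical, only IPv4 is handled, and the address is parsed by hand so that no name resolution can occur inside the synchronous callback. Bear in mind that the left-most x-forwarded-for entry is supplied by the client unless the proxy overwrites it, so it should only be trusted when the proxy is known to set it.

```java
/** Hypothetical helper (not a Rubris class): IPv4 allow-list check with no DNS lookups. */
final class ForwardedForCheck {

    /** Returns the first (client-most) address in an X-Forwarded-For value, or null if absent. */
    static String originatingIp(String forwardedFor) {
        if (forwardedFor == null) {
            return null;
        }
        String first = forwardedFor.split(",", 2)[0].trim();
        return first.isEmpty() ? null : first;
    }

    /** Parses a dotted-quad literal by hand so that no name resolution can ever happen. */
    static long parseIpv4(String ip) {
        String[] parts = ip.split("\\.", -1);
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 literal: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part); // NumberFormatException extends IllegalArgumentException
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** True if ip falls inside cidr; a bare address is treated as a /32. */
    static boolean inRange(String ip, String cidr) {
        String[] parts = cidr.split("/", 2);
        int prefix = parts.length == 2 ? Integer.parseInt(parts[1]) : 32;
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Bad prefix length in: " + cidr);
        }
        long mask = prefix == 0 ? 0L : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        return (parseIpv4(ip) & mask) == (parseIpv4(parts[0]) & mask);
    }
}
```

A handler could combine the two calls, denying the connection when originatingIp returns null or when inRange is false for every permitted range.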
Client Handles
The UserClientHandler is used to authenticate a User when a new EngineIO session is created on the browser. This is orthogonal to the connection handler and indeed the ConnectionHandler should be used for Connection level information and the UserClientHandler for ALL User level authorisation.
This is especially true for running behind a Proxy such as AWS ELB as the connections will not be 1-to-1 with the Users but the UserClientHandler will be.
As with other normal handlers that provide a Handle parameter, all operations in this callback should be run asynchronously.
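The general shape of running the authorisation off the read thread might look like the sketch below. The UserClientHandler callback signature is not shown in this section, so everything here is an assumption: UserAuthHandle is a hypothetical stand-in for whatever handle the callback receives (modelled on the accept()/deny() of ConnectionHandle), and AsyncUserAuthoriser and the Predicate representing a slow lookup are illustrative names only.

```java
import java.util.concurrent.Executor;
import java.util.function.Predicate;

/** Hypothetical stand-in for the handle passed to the callback; the real type may differ. */
interface UserAuthHandle {
    void accept();
    void deny();
}

/** Sketch: hands the slow check to another thread so the calling thread returns at once. */
final class AsyncUserAuthoriser {
    private final Executor workers;
    private final Predicate<String> slowLookup; // e.g. a database or SSO call (assumed)

    AsyncUserAuthoriser(Executor workers, Predicate<String> slowLookup) {
        this.workers = workers;
        this.slowLookup = slowLookup;
    }

    /** Queues the work; the handle is completed later by whichever thread runs the task. */
    void authorise(String credentials, UserAuthHandle handle) {
        workers.execute(() -> {
            boolean allowed;
            try {
                allowed = slowLookup.test(credentials);
            } catch (RuntimeException e) {
                allowed = false; // a failed lookup is treated as a denial
            }
            // Always complete the handle, otherwise the client would be left waiting.
            if (allowed) {
                handle.accept();
            } else {
                handle.deny();
            }
        });
    }
}
```

In a real application the Executor would typically be a bounded thread pool owned by the application, so that a burst of new sessions cannot exhaust resources.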
Further information on Connection and User authorisation can be found in the UserManagement docs.