Client Guide
The following is a quick guide as to how the Client JS library is used and the options available.
Loading the Client
The rubris JS library is loaded into the browser using the following pattern.
<script src="/$some_path$/js/rubris-io.js"></script>
Dependencies
The following dependency is required at a minimum:
<script src="/$some_path$/js/engine-io/1.6.8/engine.io.js"></script>
This provides the engine-io basic functionality that Rubris hooks into.
IE11 does not support TextEncoder natively, so for IE11 you will need to load the TextEncoder polyfill:
<script src="/$some_path$/js/text-encoding/encoding.js"></script>
Initialising
Multiple instances of the rubris object can be held on a page or on different tabs and are constructed like so:
var rubris = new Rubris({});
The constructor call above sets no options in the library.
Limitations
NOTE: ALL messageIds must be wrapped to unsigned 32-bit integers or the server/client libraries will truncate the values.
The following function wraps at 4294967295.
function wrap32BitUnsigned(num){
return num >>> 0
}
messageId = wrap32BitUnsigned(messageId+1)
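A small counter built on the same wrapping keeps ids in range without repeating the wrap at every call site. This is only a sketch: createMessageIdSequence and the nextMessageId function it returns are illustrative names, not part of the Rubris library.

```javascript
// Hypothetical helper (not part of Rubris): returns a function that hands out
// successive message ids, wrapping to 0 after 4294967295.
function createMessageIdSequence(start) {
  var current = (start || 0) >>> 0;
  return function nextMessageId() {
    current = (current + 1) >>> 0;
    return current;
  };
}

// Start just below the limit to show the wrap.
var nextMessageId = createMessageIdSequence(4294967294);
var first = nextMessageId();  // 4294967295
var second = nextMessageId(); // 0 (wrapped)
```

Each Rubris instance would normally own one such sequence so that ids stay unique per connection.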
IMPORTANT: the “message” instance returned is reused (to prevent the creation of thousands of objects by the library) for each callback. You MUST extract the data you want from the message in the callback. It is NOT safe to cache the actual message itself unless a deep copy is made.
Callback APIs
Rather than wrapping EngineIO, the Rubris library plugs itself into the EngineIO library so that it intrudes as little as possible on the natural style of using EngineIO.
// create engine socket as normal
// use http as the protocol if no ws required
socket = eio.Socket({ path : uri.path , port: uri.port,
protocol: "ws", host: uri.host });
socket.binaryType = 'arraybuffer';
// on socket open set socket on rubris to enable use of socket event emitter
socket.once('open', function(){
rubris.setSocket(socket)
// make the initial channels call and set up the library
// this must be done first as all messages require a channelid
rubris.initChannels();
....
});
Once the root object is initialised, we can register callback functions to receive the response data.
The callbacks shown below are provided by hooking into the Engine-IO event mechanism to extend the events generated. Bear in mind that the message callback is consumed by Rubris (although you can register a proxy function that wraps the Rubris message parsing if required).
IMPORTANT: the “message” callback MUST be set in order for Rubris to parse the response messages.
You MUST extract the data you want from the message in the callback. It is NOT safe to cache the actual message itself unless a deep copy is made.
// The engineIO callback for messages.
// Register Rubris library to parse or wrap with own function
socket.on('message', function(data){
rubris.parseMessage(data);
});
// Following callback functions will be called with above message registration
// callback resulting from init message with the all channels and status
socket.on('channels', function(data){
...
});
// RPC responses occur in this callback
socket.on('rpc', function(data){
...
});
// subscribe ack message with status of subscribe
socket.on('subscribeResponse', function(data){
...
});
// data payload messages from topic subscribed to (can arrive before
// subscribe ack)
socket.on('push', function(data){
...
});
// conversation response messages
socket.on('conversation', function(data){
...
});
// if rubris login/logout is used (optional and may be removed)
socket.on('logout', function(data){
...
});
socket.on('login', function(message){
...
});
As Engine-IO is asynchronous, messages can only be sent after the channels message has been processed (you require the channelId to create a message). Therefore all RPC and subscribe messages can only be created after the on("channels"...) callback has been called.
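One way to respect this ordering is to queue outgoing work until the channels callback fires. The sketch below is an assumption about how an application might do this; createChannelGate, whenReady and open are hypothetical names and are not provided by Rubris or Engine-IO.

```javascript
// Hypothetical helper (not part of Rubris): queues work until the channels
// callback has fired, then runs new work immediately from that point on.
function createChannelGate() {
  var ready = false;
  var pending = [];
  return {
    // run fn now if channels have arrived, otherwise queue it
    whenReady: function (fn) {
      if (ready) { fn(); } else { pending.push(fn); }
    },
    // call this from the 'channels' callback
    open: function () {
      ready = true;
      var queued = pending;
      pending = [];
      queued.forEach(function (fn) { fn(); });
    }
  };
}

// Usage with stand-in functions instead of a real socket:
var gate = createChannelGate();
var sent = [];
gate.whenReady(function () { sent.push('first'); });  // queued
gate.open();                                          // queued work runs here
gate.whenReady(function () { sent.push('second'); }); // runs immediately
```

In a page, gate.open() would be called inside the socket.on('channels', ...) callback and each socket.send(...) would be wrapped in gate.whenReady(...).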
Sending data
Sending data uses the Engine-IO socket library send method, with a few methods provided to generate the appropriate message formats for the Rubris protocol itself.
The general format to send data is:
socket.send(rubris.xxxxMessage($PAYLOAD));
The xxxxMessage above is a placeholder for one of the specialized message constructors Rubris provides for each message type. You cannot mix and match message types and Endpoints.
Message types
Channels message
The channels message is the first message that must be sent. No other Endpoint can be addressed without this. The recommended mechanism is to set up the callbacks first for the Engine-IO socket then set up the channels initialisation as shown above. e.g:
rubris.initChannels();
Processing the channels then involves iterating over the returned set and building whatever structures suit our purposes. For example, below we partition the channels by type and populate three drop-down lists, one per type:
function displayChannels(channels){
    var d = new Date(channels.timestamp);
    $('#rpcChannelsTime').text(d.getHours() + ":" + d.getMinutes() +
        ":" + d.getSeconds());
    var rpcContainer = setupChannelList('#channelList');
    var sharedListContainer = setupChannelList('#sharedChannelList');
    var conversationListContainer = setupChannelList('#conversationChannelList');
    for (var key in channels) {
        var chan = channels[key];
        // skip entries that are not channels keyed by their own name (e.g. timestamp)
        if (chan && channels.hasOwnProperty(chan.name)) {
            channelsById[chan.id] = {};
            // build the option so that its value is the channel id
            var option = $('<option>').text(chan.name).val(chan.id);
            if (chan.type == 0) {
                rpcContainer.append(option);
            }
            if (chan.type == 1) {
                sharedListContainer.append(option);
            }
            if (chan.type == 2) {
                conversationListContainer.append(option);
            }
        }
    }
}
Available types are:
public static final byte RPC_ENDPOINT = 0;
public static final byte SHARED_ENDPOINT = 1;
public static final byte CONVERSATION_ENDPOINT = 2;
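The same partitioning can be done without any DOM work, which is easier to reuse and test. The following is a minimal sketch under the assumption that the channels object has the shape used by displayChannels; groupChannelsByType and the sample channel names and ids are made up for illustration.

```javascript
// Endpoint type values as listed above.
var RPC_ENDPOINT = 0, SHARED_ENDPOINT = 1, CONVERSATION_ENDPOINT = 2;

// Hypothetical helper: groups channel entries by their numeric type.
// Entries without a numeric type (e.g. a timestamp) are skipped.
function groupChannelsByType(channels) {
  var groups = { rpc: [], shared: [], conversation: [] };
  Object.keys(channels).forEach(function (key) {
    var chan = channels[key];
    if (!chan || typeof chan.type !== 'number') { return; }
    if (chan.type === RPC_ENDPOINT) { groups.rpc.push(chan); }
    else if (chan.type === SHARED_ENDPOINT) { groups.shared.push(chan); }
    else if (chan.type === CONVERSATION_ENDPOINT) { groups.conversation.push(chan); }
  });
  return groups;
}

// Sample data; names and ids are invented.
var sampleChannels = {
  timestamp: 1500000000000,
  session: { type: 0, id: 1, name: 'session' },
  news: { type: 1, id: 2, name: 'news' },
  chat: { type: 2, id: 3, name: 'chat' }
};
var grouped = groupChannelsByType(sampleChannels);
```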
Although the channels response is asynchronous, listening for the channels callback tells us when this has completed. The resulting Channel set is retained in the Rubris object as an array of Channel objects accessible as rubris.channels. The Channel array is keyed on channel name and is constructed in the following way:
channels[channelName] = {
type: type,
id: id,
name: channelName
};
This will generally result in a call along the lines of:
socket.send(rubris.rpcMessage(PAYLOAD, rubris.channels[SESSION_DATA_CHANNEL].id,
nextMessageId()));
Subscribe/Unsubscribe messages
To send a subscription one must construct a subscribe call as shown below:
var channelId = rubris.channels["news"].id
socket.send(rubris.subscribeMessage([subscribe_array],[unsubscribe_array]
, channelId, messageId));
ChannelId is the numeric id of the channel you want to send on as returned from the channels callback. This means that you must wait for the channels call to return before being able to send any message, as the channelIds are entirely controlled on the server and are not guaranteed to be the same across restarts.
MessageId is the sequence number of the message.
The messageId can be any number and is used in the response of the subscribe ack as a correlation id. Note: the messageId is a 32-bit unsigned number and will wrap at this limit, so a helper method to wrap the message id is useful. e.g:
function wrap32BitUnsigned(num){
    return num >>> 0;
}
// use messageId + 1 here: messageId++ would assign the old value back
messageId = wrap32BitUnsigned(messageId + 1);
For a subscribe only message, simply send an empty [] as the unsubscribe list and for unsubscribe only send an empty [] as the subscribe list.
The total list size is currently recommended to be no more than 1,000 topics in a single message (although 2048 is the max size). For more than this simply chunk up the topics into multiple messages. There is no need to wait for each response in order to do this. The response messages can be correlated using the MessageId.
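Chunking the topic list is a simple slice loop. The sketch below assumes the recommended batch size of 1,000; chunkTopics and the generated topic names are illustrative only.

```javascript
// Hypothetical helper: splits a topic list into batches of at most `size`.
function chunkTopics(topics, size) {
  var chunks = [];
  for (var i = 0; i < topics.length; i += size) {
    chunks.push(topics.slice(i, i + size));
  }
  return chunks;
}

// 2,500 made-up topic names become batches of 1000, 1000 and 500.
var topics = [];
for (var n = 0; n < 2500; n++) { topics.push('topic/' + n); }
var batches = chunkTopics(topics, 1000);
var batchSizes = batches.map(function (b) { return b.length; });
```

Each batch would then be sent in its own subscribeMessage with its own messageId. Keeping a map from messageId to the batch that was sent makes it possible to resolve failed indexes later, since they refer to positions in the array sent with that message.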
Compressing Subscribe messages
Although mostly one would not need to compress the subscription topics, occasionally there is a need to do this.
For instance, the topic names may be extremely long and very numerous, or you may need to send multiple batches of thousands of topics in sequence. Engine-IO batches up sent messages while a POST is in flight, so when the batch is finally sent under the covers (once the XHR connection is writable again) a single HTTP POST can contain a very large number of messages, and the resulting request size could be an issue for intermediate proxies.
To ameliorate some of this sizing effect, it is possible to tell Rubris to compress the subscription strings by configuring a JS callback that compresses the bytes to be sent, and by using the overloaded send call that includes a compress parameter.
// set the compressor on rubris
// here we use pako in a similar way to dealing with the decompressor
// path from the server for large messages
rubris.setCompressor(function(data){
    return pako.deflate(data, { level: 1 });
});
...
// overloaded compress flag at end of args
socket.send(rubris.subscribeMessage(subArr, [], channelId, messageid, true));
This flag tells rubris to call back into the compressor after the subscribe bytes are set and to send the result. Additionally, a compress flag is ORed (|) into the message header. The MessageId and channel remain in plain text, but the message body is compressed and the returned array is used instead.
The contract for the compressor function is that it must accept a Uint8Array buffer and return a Uint8Array buffer.
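If the compression library you choose returns something other than a Uint8Array, a thin adapter can enforce the contract. This is only a sketch: asUint8ArrayFunction is a hypothetical name, and the stand-in "compressor" below merely truncates its input to keep the example self-contained.

```javascript
// Hypothetical adapter (not part of Rubris): wraps a function so that it
// only accepts a Uint8Array and always returns a Uint8Array.
function asUint8ArrayFunction(fn) {
  return function (data) {
    if (!(data instanceof Uint8Array)) {
      throw new TypeError('expected a Uint8Array');
    }
    var result = fn(data);
    if (result instanceof Uint8Array) { return result; }
    if (result instanceof ArrayBuffer) { return new Uint8Array(result); }
    if (Array.isArray(result)) { return Uint8Array.from(result); }
    throw new TypeError('function returned an unsupported type');
  };
}

// Stand-in function that returns a plain array, for demonstration only.
var wrapped = asUint8ArrayFunction(function (bytes) {
  return Array.prototype.slice.call(bytes, 0, 2);
});
var out = wrapped(new Uint8Array([1, 2, 3]));
```

The wrapped function could then be passed to rubris.setCompressor in place of the raw library call.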
This will not allow more topics to be sent in each send (as we do not want a single client to be able to spam the subscribes on the server), but it will make each message in a batch of messages smaller. Obviously, there will be some time/memory overhead in uncompressing this on the server, but if you have many topics and this is resulting in large HTTP POST bodies, the win at the network layer should outweigh the extra work. Note: there can be multiple compressed messages in a single HTTP Request, or a mix of compressed and non-compressed if message types are mixed, or queued in the send function.
The compressor is arbitrary and you can register your own on the server to deal with whatever library is chosen. See Large Messages for how to set the compression function on the server.
There is NO compress flag for RPC or conversation messages as the body is directly accessible on the server in your own code (and opaque to rubris) and if you want to compress this, you should deal with it prior to sending, and on the server decompress in your own handlers.
If the compressed data is >= the plain data size the message will be sent as is without compression.
Subscribe Responses
The subscribe response message returns the number and numeric index (in the original array that was sent) of the topics that resulted in an error on the server. An error here means a topic that threw an Exception on the subscribe attempt (due to error or rejection).
This can be checked by examining message.failedCount in the subscribeResponse callback.
If failedCount > 0 then the payload body of the message will be a JS Object with a property for each failed index set to 1 (if there are multiple failures then multiple entries will be present).
e.g. you can then work out the failed ones by examining the returned object:
// assume sub below is an object whose index property recorded the
// outgoing topic index
// for each subscription
if (message.payload && message.payload.type==
rubris.protocol.payload.jsObject){
if (message.payload.data[sub.index]){
// we know this subscribe topic failed ...
}else{
// this subscribe topic succeeded
}
}
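The check above can be turned into a helper that maps failed indexes back to topic names. The sketch assumes you kept the array of topics that was sent with the message; failedTopics is a hypothetical name, and the value 4 is the jsObject payload type taken from the protocol definition.

```javascript
// Hypothetical helper: returns the topics from the array that was sent whose
// index is flagged in the subscribeResponse payload.
// jsObjectType is expected to be rubris.protocol.payload.jsObject.
function failedTopics(sentTopics, message, jsObjectType) {
  if (!message.failedCount || !message.payload ||
      message.payload.type !== jsObjectType) {
    return [];
  }
  return sentTopics.filter(function (topic, index) {
    return Boolean(message.payload.data[index]);
  });
}

// Stand-in response: the topics at index 1 and 3 failed.
var sentTopics = ['a', 'b', 'c', 'd'];
var response = { failedCount: 2, payload: { type: 4, data: { 1: 1, 3: 1 } } };
var failed = failedTopics(sentTopics, response, 4);
```

Because the message instance is reused, the returned array should be built inside the callback, as here, rather than by holding on to the message.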
RPC messages
To construct an RPC the following form can be used.
socket.send(rubris.rpcMessage($PAYLOAD, channelId, messageid));
ChannelId is the numeric id of the channel you want to send on as returned from the channels callback.
MessageId is the sequence number of the message.
The payload can be a String or an ArrayBuffer. A String is converted internally to an ArrayBuffer using the native or shim TextEncoder (if you want control over how this is serialised, perform the conversion yourself before calling send).
Responses are correlated using the messageId you originally sent. Note that, as the server can send multiple responses on the handle (for chunking etc.), you can get multiple responses with the same id; in this case some identifier inside the payload should be used to distinguish them.
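A small lookup table keyed on messageId is usually enough to route RPC responses back to the code that made the request. The following is a sketch only; createCorrelator and its methods are hypothetical and not part of Rubris. Handlers stay registered until removed, because several responses may arrive with the same id.

```javascript
// Hypothetical helper (not part of Rubris): routes responses to handlers by
// the messageId used in the request.
function createCorrelator() {
  var handlers = {};
  return {
    register: function (messageId, handler) { handlers[messageId] = handler; },
    remove: function (messageId) { delete handlers[messageId]; },
    // call from the 'rpc' callback; returns false when no handler matches
    dispatch: function (message) {
      var handler = handlers[message.id];
      if (!handler) { return false; }
      handler(message);
      return true;
    }
  };
}

// Usage with stand-in messages; real ones arrive in the 'rpc' callback.
var correlator = createCorrelator();
var seen = [];
correlator.register(7, function (message) { seen.push(message.payload); });
correlator.dispatch({ id: 7, payload: 'chunk-1' });
correlator.dispatch({ id: 7, payload: 'chunk-2' });
var unmatched = correlator.dispatch({ id: 8, payload: 'other' });
```

Handlers should copy what they need from the message before returning, since the message instance is reused between callbacks.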
Conversation messages
Conversations are slightly different in that they are essentially a mix of a send and a subscribe at once. Accordingly the format is:
socket.send(rubris.conversationMessage(topic, message , channelId, messageId));
ChannelId is the numeric id of the channel you want to send on as returned from the channels callback.
MessageId is the sequence number of the message.
Each message includes a topic string that is used to implicitly create a subscription on the server (if it does not exist yet) or send it to a handle with the same name if a previous subscribe has been sent on the same topic. As with SharedSubscriptions the topic can be any string and any number of personal topics can be active at once.
There is no bulk subscribe equivalent for Private Subscriptions as the expectation is that these types of conversations are likely to be less numerous than in the Shared use cases.
Closing a conversation is achieved in the following manner:
socket.send(rubris.closeConversationMessage(topic, message ,
channelId, messageid));
The close has no acknowledgment message from the server. Sending a message to a topic that has previously been closed will recreate the topic on the server. Similarly, it is possible that the server will send a response on a topic that the client has already closed.
Response messages
The response messages in all the callbacks are defined in the following prototype:
function Message(type, timestamp, channel, id, status, payload) {
this.type = type;
this.timestamp = timestamp;
this.id = id;
this.channel = channel;
this.status = status;
this.payload = payload;
};
Not all fields are on each message type. Push messages from Shared or Private Subscriptions contain a topic field that is the string value of the topic. This value is not present on RPC responses. Similarly, Push messages do not carry timestamps (as this adds to the message overhead for high volume updates).
The payload object is:
function Payload(data, type, start, end) {
this.data = data;
this.start = start;
this.end = end;
this.type = type;
}
The payload types are shown below in the protocol definition. In most cases the payload will be a DataView for a push message.
This can be overridden by changing the function pushMessageProvider in the Rubris object once constructed.
It is worth bearing in mind that the response messages can be (and often are) batched and the response messages are really views over a single ArrayBuffer. Therefore one should take a slice of the buffer if you want to retain it, or if you need to transform it. e.g:
var length = message.payload.end - message.payload.start;
var ab = message.payload.data.buffer.slice(message.payload.start, message.payload.end);
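Since the message object itself is reused, retaining a message means copying its fields as well as its bytes. The sketch below assumes payload.data is a DataView or typed array and that start and end are offsets into its underlying ArrayBuffer, as in the slice above; snapshotMessage is a hypothetical helper name.

```javascript
// Hypothetical helper: copies the fields of a (reused) message into a new
// object and slices the payload bytes out of the shared buffer.
function snapshotMessage(message) {
  var payload = message.payload;
  return {
    type: message.type,
    timestamp: message.timestamp,
    id: message.id,
    channel: message.channel,
    status: message.status,
    topic: message.topic,
    payloadType: payload.type,
    bytes: payload.data.buffer.slice(payload.start, payload.end)
  };
}

// Stand-in push message (type 9, dataview payload type 3) over a shared buffer.
var shared = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]);
var message = {
  type: 9, id: 1, channel: 2, status: 0, topic: 'news',
  payload: { data: new DataView(shared.buffer), type: 3, start: 2, end: 5 }
};
var copy = snapshotMessage(message);
shared[2] = 99; // later changes to the shared buffer do not affect the copy
var copiedBytes = Array.from(new Uint8Array(copy.bytes));
```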
Rubris provides a convenience function to extract the payload as text if your messages on a channel happen to be in something like JSON or FIX and you want to deal with them as Strings:
var sessions = JSON.parse(rubris.getPayloadAsText(data.payload));
This uses the native TextDecoder (or polyfill if provided) and is generally very fast. However, you can always extract the payload yourself if you want more flexibility.
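Extracting the text yourself might look like the following sketch. It is not the library's implementation; payloadToText is a hypothetical name, and the code assumes payload.data is a DataView or typed array with start and end as offsets into its underlying ArrayBuffer.

```javascript
// Hypothetical helper: decodes the bytes between payload.start and
// payload.end as UTF-8 without copying them first.
function payloadToText(payload) {
  var bytes = new Uint8Array(payload.data.buffer, payload.start,
                             payload.end - payload.start);
  return new TextDecoder('utf-8').decode(bytes);
}

// Stand-in payload: JSON text embedded in the middle of a larger buffer.
var json = '{"user":"anna"}';
var encoded = new TextEncoder().encode(json);
var buffer = new Uint8Array(encoded.length + 4);
buffer.set(encoded, 2);
var payload = {
  data: new DataView(buffer.buffer),
  start: 2,
  end: 2 + encoded.length
};
var parsed = JSON.parse(payloadToText(payload));
```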
Protocol
Rubris externalises a protocol object which can be used as the source for constants and type information for the messages. This is obtained by calling:
rubris.protocol
// defined as
var prot = {
littleEndian: true,
version: 1,
batchIndicator: 255,
serverTokenLength: 24,
maxTopicBatch: MAX_TOPICS,
endpoint: {
rpc: 0,
shared: 1
},
payload: {
text: 0,
uint8: 1,
arraybuffer: 2,
dataview: 3,
jsObject: 4
},
status: {
success: 0,
error: 1,
authFail: 2
},
message: {
channels: 1,
rpc: 2,
subscribe: 3,
conversation: 4,
closeConversation: 5,
login: 6,
logout: 7,
push: 9
}
};
The protocol can be used to determine message type, payload type, message status and whether the binary data is treated as little endian or not (by default this is true; the server generally uses native ordering, which on Intel is always little endian).
An example of its use would be something like:
$('#myselector').html( (message.payload.data.getFloat64(message.payload.start,
rubris.protocol.littleEndian)).toFixed(5));
This assumes the data is of the correct type to construct a view from.
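The numeric codes in the protocol object can also be mapped back to names, which is handy for logging. The sketch below copies a subset of the protocol definition so that it runs on its own; in a page you would pass rubris.protocol.message or rubris.protocol.status instead. nameOf is a hypothetical helper.

```javascript
// Subset of the protocol object, copied from the definition above.
var protocol = {
  status: { success: 0, error: 1, authFail: 2 },
  message: { channels: 1, rpc: 2, subscribe: 3, conversation: 4,
             closeConversation: 5, login: 6, logout: 7, push: 9 }
};

// Hypothetical helper: finds the name for a numeric code in one of the
// protocol tables, or returns 'unknown' when the code is not listed.
function nameOf(table, code) {
  var names = Object.keys(table);
  for (var i = 0; i < names.length; i++) {
    if (table[names[i]] === code) { return names[i]; }
  }
  return 'unknown';
}

var typeName = nameOf(protocol.message, 9);   // 'push'
var statusName = nameOf(protocol.status, 2);  // 'authFail'
var missing = nameOf(protocol.message, 8);    // 'unknown'
```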
Payload types
The following are the current payload types:
- RPC - uint8 array
- channels - String []
- Subscribe/unsubscribe ack - text string (for auth failure) or jsObject (for failed topics), with failedCount as an additional property
- Push/Conversation - dataView
Rubris will always batch the data on the server if it can, so a single EngineIO message from the server can represent multiple Rubris messages (potentially of different types). Accordingly the library tries to choose a payload type which suits this in the common case.
As the push data is assumed to be correlated and frequently batched, the dataview allows the library to provide a slice into the message batch without having to copy the array buffer from the socket. This small overhead can be done away with as shown above if you really want to access the raw uint8 array without having to construct a dataview for each message (although this cost is mostly minimal).
Message correlation
For all RPC/subscribe messages the returned messageId will match the original messageId sent. For Push messages the messageId is a sequence in ascending order.
For conversation messages the response sequence will always start at the messageId given in the previous send message on that topic. This enables the user to use the sequence to pass semantic information.
Compressed Messages from Server
As detailed in the Large Message & Compression section of the server docs, it is possible for messages from the server to be configured to be compressed.
This cannot be handled directly by the browser as the compression is set per message, rather than for the transport. It is intended that compression is used sparingly (if at all), as Rubris is a messaging library and other solutions are better suited to sending megabytes of data to the browser.
However, when it is required it is straightforward to configure. For the default Deflate compression it is recommended to use a library such as Pako (http://nodeca.github.io/pako/).
This is added to Rubris by adding the pako library to your HTML file.
<script src="/mypath/js/pako/1.0.5/pako-inflate-min.js"></script>
Pako (or whatever library you use) needs to be hooked into the rubris library by adding a small wrapper function to act as the decoder interface. Like so:
var inflator = new pako.Inflate();
rubris.setDecompressor(function(data){
    return pako.inflate(data);
});
The contract for the decompressor function is that it must accept a Uint8Array buffer and return a Uint8Array buffer.
Note: this version of the Rubris JS is backwards compatible with the earlier versions. However, earlier versions of the client will not understand compressed messages and will report them as unknown message types.
This is the mirror of the compressed subscribes detailed above.
Querying endpoints
Rubris allows the lookup of the service path and any direct endpoints configured using a call to service.json at the root of the domain.
e.g. assuming this is running locally
get 127.0.0.1/service.json
This returns something like:
{"servicepath": "login", "directpaths": ["/login/sso/xhr","/login/sso"]}
The service path is required as the base for all protocol calls and the directpaths are otherwise configured direct urls. Using this mechanism enables the application to bootstrap its service path knowledge from the server if required.
Note: this path can also be used for load balancer keepalive or similar ping behaviours if required without doing any extra work.
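Once the service.json body has been fetched, bootstrapping from it is a matter of parsing and validating two fields. The sketch below works on the sample response shown earlier; parseServiceInfo and the camel-cased property names it returns are hypothetical, and how the body is fetched is left to the application.

```javascript
// Hypothetical helper: parses the service.json body and normalises it.
function parseServiceInfo(jsonText) {
  var info = JSON.parse(jsonText);
  if (typeof info.servicepath !== 'string') {
    throw new Error('service.json did not contain a servicepath');
  }
  return {
    servicePath: info.servicepath,
    directPaths: Array.isArray(info.directpaths) ? info.directpaths : []
  };
}

// The sample response from this section.
var service = parseServiceInfo(
  '{"servicepath": "login", "directpaths": ["/login/sso/xhr","/login/sso"]}');
```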