Server Examples
Starting and Configuring
Following the quick start guide we will start with a simple launcher for the application and build this out. There is nothing stopping you wiring things together using Spring or any other framework if you choose; the example below simply keeps the clutter to a minimum.
public class ServerLauncher {

    public static void main(String[] args) throws Exception {
        ServerLauncher launcher = new ServerLauncher();
        int port = 0;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        String path = "mypath";
        if (args.length > 1) {
            path = args[1];
        }
        launcher.run(port, path);
    }

    public void run(int port, String path) throws Exception {
        Server s = new Server();
        s.getConfig().getSocketConfig().setListenPort(port);
        ((HttpConfig) s.getConfig().getTransportConfig(ModuleConfig.HTTP))
                .setServicePath(path);
        s.init();

        CountDownLatch l = new CountDownLatch(1);
        l.await();
        s.close();
    }
}
The above setup is not particularly useful as there are no endpoints to use. However, let us first tune the common settings.
Configuring the Engine-IO Heartbeat
If the Engine-IO heartbeat default of every 30 seconds (with a 30 second timeout) does not suit, we can alter this:
public void run(int port, String path) throws Exception {
    Server s = new Server();
    s.getConfig().getSocketConfig().setListenPort(port);
    ((HttpConfig) s.getConfig().getTransportConfig(ModuleConfig.HTTP))
            .setServicePath(path);
    configureServer(s);
    s.init();

    CountDownLatch l = new CountDownLatch(1);
    l.await();
    s.close();
}

protected void configureServer(Server server) {
    EngineIOConfig eioC = ((EngineIOConfig) server.getConfig()
            .getTransportConfig(ModuleConfig.ENGINEIO));
    eioC.setPingInterval(25000);
    eioC.setPingTimeout(25000);
}
Be aware that long-running processing, network stalls, proxy bottlenecking or browser suspension can cause significant deviation from these timings, and the Client may well close its current instance as a result. This is especially true if you try to use very short heartbeat times as an indication of immediate liveness. That pattern is strongly discouraged; other approaches, such as auto-roundtripped confirmation tokens, are a far better way of dealing with this. A minimal sketch of that idea follows.
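For illustration only, the sketch below shows one way a confirmation-token channel could look. It is an assumption rather than part of the guide, it reuses the request/reply handler interfaces (AsynchronousHandler, DataHandle) introduced later in this page, and the channel name and last-seen bookkeeping are purely illustrative:

// Hypothetical liveness-token channel: the client periodically sends a token
// and treats the echoed reply as proof of an end-to-end round trip.
// The name "LivenessToken" and the lastSeen map are illustrative assumptions.
@Endpoint(name = "LivenessToken")
class LivenessTokenHandler implements AsynchronousHandler {

    // last time each user round-tripped a token (User.toString() as a key is an assumption)
    private final ConcurrentHashMap<String, Long> lastSeen = new ConcurrentHashMap<String, Long>();

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        lastSeen.put(String.valueOf(user), System.currentTimeMillis());
        // echo the token straight back so the client can confirm liveness itself
        if (!handle.onMessage(message)) {
            LOGGER.warn(user + " Unable to echo liveness token");
        }
    }
}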
Configuring the Server Timeout
As well as configuring the heartbeat from the client, we probably want to change the server's monitoring of client liveness. This is defined as the last time any inbound activity (of any type) occurred on the Server for a client.
protected void configureServer(Server server) {
    EngineIOConfig eioC = ((EngineIOConfig) server.getConfig()
            .getTransportConfig(ModuleConfig.ENGINEIO));
    eioC.setPingInterval(25000);
    eioC.setPingTimeout(25000);

    server.getConfig().setTimeoutCheck(60000);
    server.getConfig().setClientTimeout(45000);
}
These two settings are different in that the Engine-IO heartbeat is the Client's view of the responsiveness of the server, whereas the timeout settings are the server's view of the Client's activity.
Adding Request Reply
Now we have set up the server we can add our first Endpoint (or Channel). The channel name and id will be returned to the Client as part of the Channels call, which returns all the available channels on the server instance as well as their types and Ids (the ID is required prior to making any call - do not hardcode the ID in the client as it can change depending on the startup order).
public void run(int port, String path) throws Exception {
    Server s = new Server();
    s.getConfig().getSocketConfig().setListenPort(port);
    ((HttpConfig) s.getConfig().getTransportConfig(ModuleConfig.HTTP))
            .setServicePath(path);
    configureServer(s);

    List<Object> userHandlers = new ArrayList<Object>();
    createAPI(userHandlers);
    s.setUserHandlers(userHandlers);
    s.init();

    CountDownLatch l = new CountDownLatch(1);
    l.await();
    s.close();
}

protected void createAPI(List<Object> userHandlers) {
    userHandlers.add(new RPCEchoHandler());
}

@Endpoint(name = "RPCEcho")
class RPCEchoHandler implements AsynchronousHandler {

    AtomicInteger mCount = new AtomicInteger();

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        if (handle.onMessage(message)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Sent message count " + mCount.incrementAndGet());
            }
        } else {
            LOGGER.warn(user + " Unable to send message");
        }
    }
}
A More Realistic Handler
In a normal situation we want to ensure that any processing does NOT BLOCK the calling thread under any circumstances. Accordingly, the common pattern is to ensure that all processing is done asynchronously. Any of the thread handoff idioms are adaptable to this task (Executor Handoff, Work Queue, Sharded Queues etc.); it purely depends on how you want to approach it. For this example we will choose a direct Executor handoff as we do not care about ordering.
@Endpoint(name = "MyChannel")
class MyChannelHandler implements AsynchronousHandler {

    private ExecutorService service;
    private DownStreamProcessor processor;

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        service.execute(new Runnable() {
            @Override
            public void run() {
                // Do some processing
                Result result = processor.process(message);
                handle.onMessage(result.toSerialisedBytes());
            }
        });
    }

    public void setService(ExecutorService service) {
        this.service = service;
    }
}
That is pretty much all there is to adding an asynchronous endpoint and in many ways it is not much different than regular HTTP request response handling.
Of course one would expect a number of Channels in the application, as it is usually best to limit each channel to a specific function where possible and compose functionality.
Be aware that when using a simple thread pool as above, and given the nature of the asynchronous handling, you can receive further messages from the same client before the original request has been processed or replied to. Although Rubris delivers the messages in order, if you want to retain that ordering during processing you must use some form of handoff that preserves it (e.g. a sharded work queue or similar, as sketched below). However, this is completely controlled by how you have written the client and whether it waits for replies returned in its poll method before submitting again on the same channel.
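A minimal sketch of such a handoff is shown below. It is an assumption rather than part of the guide: the shard key (the User's string form) and the number of shards are illustrative, and the DownStreamProcessor usage is copied from the earlier example. The point is simply that all messages for a given client land on the same single-threaded executor and are therefore processed in arrival order.

@Endpoint(name = "MyOrderedChannel")
class ShardedChannelHandler implements AsynchronousHandler {

    // one single-threaded executor per shard; messages for the same client
    // always hash to the same shard and so are processed in arrival order
    private final ExecutorService[] shards = new ExecutorService[8];
    private DownStreamProcessor processor;

    ShardedChannelHandler() {
        for (int i = 0; i < shards.length; i++) {
            shards[i] = Executors.newSingleThreadExecutor();
        }
    }

    @Override
    public void onMessage(User user, Object message, DataHandle handle) {
        // the shard key is an assumption - any stable per-client key will do
        int shard = (String.valueOf(user).hashCode() & 0x7fffffff) % shards.length;
        shards[shard].execute(new Runnable() {
            @Override
            public void run() {
                Result result = processor.process(message);
                handle.onMessage(result.toSerialisedBytes());
            }
        });
    }

    public void setProcessor(DownStreamProcessor processor) {
        this.processor = processor;
    }
}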
Adding Shared Subscriptions
Let's do something a little more useful by adding a subscription Endpoint that anyone can create topics on, which automatically caches the last message for new joiners and sends a new random double value until all the clients have unsubscribed.
protected void createAPI(List<Object> userHandlers) {
    userHandlers.add(new RPCEchoHandler());
    userHandlers.add(new SimpleSharedHandler());
}

@LastMessageSnapshot
class SimpleSharedHandler implements SharedSubscriptionHandler, EndpointNameProvider {

    private ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "SimpleShared";
    }

    @Override
    public void onSubscribe(TopicHandle handle) {
        // do not use putIfAbsent as we need to overwrite and cancel the existing
        // future if a subscription arrives before the unsubscribe completes
        cancelFuture(subscriptions.put(handle.getTopic(), service.scheduleAtFixedRate(new Runnable() {
            int count = 0;

            @Override
            public void run() {
                double d = ThreadLocalRandom.current().nextDouble();
                long longBits = Double.doubleToLongBits(d);
                byte[] bytes = new byte[8];
                // encode to little endian
                BitUtils.writeLEfloat64(bytes, longBits, 0);
                if (!handle.onMessage(bytes)) {
                    LOGGER.warn(handle.getTopic() + " Failed to push message " + count);
                } else {
                    count++;
                }
            }
        }, 1, Math.max(1, ThreadLocalRandom.current().nextInt(2000)), TimeUnit.MILLISECONDS)));
    }

    @Override
    public void onUnsubscribe(TopicHandle handle) {
        cancelFuture(subscriptions.remove(handle.getTopic()));
    }

    protected void cancelFuture(ScheduledFuture<?> future) {
        if (future != null) {
            future.cancel(true);
        }
    }
}
Safely Sharing Handles
The handles are not thread-safe, so if we have a situation where we could get concurrent access (say from some form of messaging system) we need to ensure serial access is enforced.
A useful pattern is something similar to the striped lock shown below (assuming we have something like a Notifier interface and a QueueService which pushes the data upwards):
@LastMessageSnapshot
class StripedSharedHandler implements SharedSubscriptionHandler, EndpointNameProvider, Notifier {

    private ConcurrentHashMap<String, TopicHandle> subscriptions =
            new ConcurrentHashMap<String, TopicHandle>();
    private ExecutorService service;
    private QueueService queue;
    private Striped<Lock> locks = Striped.lock(1000);

    @Override
    public String name() {
        return "StripedShared";
    }

    @Override
    public void onSubscribe(TopicHandle handle) {
        // do not use putIfAbsent as we need to overwrite
        service.execute(new Runnable() {
            @Override
            public void run() {
                if (subscriptions.put(handle.getTopic(), handle) == null) {
                    queue.newSubscription(handle.getTopic(), StripedSharedHandler.this);
                }
            }
        });
    }

    @Override
    public void onUnsubscribe(TopicHandle handle) {
        // make sure we remove the object instance itself - not rely on the key
        if (subscriptions.remove(handle.getTopic(), handle)) {
            service.execute(new Runnable() {
                @Override
                public void run() {
                    queue.removeSubscription(handle.getTopic(), StripedSharedHandler.this);
                }
            });
        }
    }

    @Override
    public void notify(String topic, Object message) {
        TopicHandle h = subscriptions.get(topic);
        if (h != null) {
            Lock l = locks.get(topic);
            l.lock();
            try {
                h.onMessage(message);
            } finally {
                l.unlock();
            }
        }
    }
}
Other alternatives include wrapping the handle with a class that enforces serial access, either in the map (although this can have many complications) or before passing it to the downstream service; using the handle itself as the lock object; or locking on an object stored on the handle using the data field. A sketch of the wrapper approach follows.
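A minimal sketch of such a wrapper is shown below. It is an assumption rather than part of the library, and it only delegates the two TopicHandle methods used in these examples (getTopic and onMessage), synchronising on the underlying handle:

// Hypothetical wrapper enforcing serial access to a TopicHandle.
// Only the methods used in these examples are delegated; a real wrapper
// would cover whatever parts of the handle API your code touches.
class SerialTopicHandle {

    private final TopicHandle delegate;

    SerialTopicHandle(TopicHandle delegate) {
        this.delegate = delegate;
    }

    public String getTopic() {
        return delegate.getTopic();
    }

    public boolean onMessage(Object message) {
        // serialise concurrent pushes onto the underlying handle
        synchronized (delegate) {
            return delegate.onMessage(message);
        }
    }
}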
Adding Queuing
Now let's add some queuing to the shared handler so that late-joining users can start at the beginning of the queue and existing users can receive a number of messages without losing them (note: this is a weak guarantee that depends on the push frequency and client consume speed if we get queue wrapping behaviour).
@Queue(depth = 64, startAtOldest = true)
class OrderedMessageHandler implements SharedSubscriptionHandler, EndpointNameProvider {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "news";
    }

    @Override
    public void onSubscribe(TopicHandle handle) {
        subscriptions.put(handle.getTopic(), service.scheduleAtFixedRate(new Runnable() {
            int count = 1;

            @Override
            public void run() {
                handle.onMessage("some news message here " + count);
            }
        }, 10, Math.max(3, ThreadLocalRandom.current().nextInt(10000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onUnsubscribe(TopicHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }
}
We can see from the above example that the code has not really changed from the normal way of writing the Channel; only the annotation has changed. This particular annotation sets up a circular queue of 64 elements, and new users will start at the oldest message and fast-forward themselves up to the last message.
Monitoring Topic Membership
@Queue(depth = 64, startAtOldest = true)
class OrderedMessageHandler implements SharedSubscriptionHandler, EndpointNameProvider,
        SharedSubscriptionMembershipHandler {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "news";
    }

    @Override
    public void onSubscribe(TopicHandle handle) {
        subscriptions.put(handle.getTopic(), service.scheduleAtFixedRate(new Runnable() {
            int count = 1;

            @Override
            public void run() {
                handle.onMessage("some news message here " + count);
            }
        }, 10, Math.max(3, ThreadLocalRandom.current().nextInt(10000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onUnsubscribe(TopicHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }

    @Override
    public void join(User user, String sid, TopicHandle topic) {
        LOGGER.info("User joined " + user + " sid:" + sid + " " + topic.getTopic());
    }

    @Override
    public void leave(User user, String sid, TopicHandle handle) {
        LOGGER.info("User left " + user + " sid:" + sid + " " + handle.getTopic());
    }
}
The topic membership enables us to get callbacks when users join or leave a topic.
However, this is not free and imposes a somewhat onerous cost on the subscribe mechanism if you are dealing with many topics in a single subscribe message from the client.
DO NOT use this on a handler where you expect to have thousands of subscriptions, as it will impact the overall processing speed.
The callbacks are also synchronous, so anything other than logging should be run on a separate thread, as sketched below.
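For example, a minimal sketch of offloading the membership work - the executor field and the idea of doing presence/audit work in the background are assumptions, not part of the guide:

// Assumed field on the membership handler above (illustrative):
private final ExecutorService membershipExecutor = Executors.newSingleThreadExecutor();

@Override
public void join(User user, String sid, TopicHandle topic) {
    // return quickly from the synchronous callback; do the real work elsewhere
    membershipExecutor.execute(new Runnable() {
        @Override
        public void run() {
            // e.g. update presence state, notify a downstream audit service, etc.
            LOGGER.info("User joined " + user + " sid:" + sid + " " + topic.getTopic());
        }
    });
}

@Override
public void leave(User user, String sid, TopicHandle handle) {
    membershipExecutor.execute(new Runnable() {
        @Override
        public void run() {
            LOGGER.info("User left " + user + " sid:" + sid + " " + handle.getTopic());
        }
    });
}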
Creating a Topic at Startup
Sometimes we want to create the topic at startup, allowing us to set a value for the first client and avoid the overhead of perhaps going to downstream systems. This is particularly useful for LastMessageSnapshot or Queue topics which we want to act as memory-persistent topics that do not disappear with the clients themselves.
@Queue(depth = 64, startAtOldest = true)
class OrderedMessageHandler implements SharedSubscriptionHandler, EndpointNameProvider, SubscriptionManager {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "news";
    }

    @Override
    public void onSubscribe(TopicHandle handle) {
        subscriptions.put(handle.getTopic(), service.scheduleAtFixedRate(new Runnable() {
            int count = 1;

            @Override
            public void run() {
                handle.onMessage("some news message here " + count);
            }
        }, 10, Math.max(3, ThreadLocalRandom.current().nextInt(10000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onUnsubscribe(TopicHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }

    @Override
    public void topicManager(TopicManagementHandle handle) {
        handle.createOrPinTopic("EMEA News");
    }
}
The above code will create a subscription, which will also trigger the subscribe callback.
We could also stash the TopicManagementHandle somewhere and use it at any time (for instance, send off for the set of startup topics from a remote service and create them using this handle when they return), as sketched below.
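A minimal sketch of that idea, assuming a hypothetical StartupTopicService that returns topic names asynchronously (only createOrPinTopic from the example above comes from the guide; the fields and the callback shape are assumptions):

// Illustrative fields: StartupTopicService is a hypothetical remote lookup service.
private volatile TopicManagementHandle managementHandle;
private StartupTopicService startupTopicService;

@Override
public void topicManager(TopicManagementHandle handle) {
    // stash the handle so it can be used at any later point
    this.managementHandle = handle;

    // hypothetical asynchronous lookup of the startup topic names
    startupTopicService.fetchStartupTopics(new Consumer<List<String>>() {
        @Override
        public void accept(List<String> topics) {
            for (String topic : topics) {
                // create/pin each topic once the remote call returns
                managementHandle.createOrPinTopic(topic);
            }
        }
    });
}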
Adding Group Subscriptions
Multi-window or Device User Scoped Group
@LastMessageSnapshot
class TimingGroupHandler implements GroupSubscriptionHandler, EndpointNameProvider {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "groupsHandler";
    }

    @Override
    public void onSubscribe(GroupTopicHandle handle) {
        subscriptions.put(handle.getTopic(), service.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    double d = ThreadLocalRandom.current().nextDouble();
                    long longBits = Double.doubleToLongBits(d);
                    byte[] bytes = new byte[1024];
                    BitUtils.writeLEfloat64(bytes, longBits, 0);
                    handle.onMessage(bytes);
                } catch (Exception e) {
                    LOGGER.warn("Fail to push message ", e);
                }
            }
        }, 10, Math.max(3, ThreadLocalRandom.current().nextInt(2000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onUnsubscribe(GroupTopicHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }
}
The Group above is defaulted to the username. The User object is entirely defined by you (as shown below). This means that all subscriptions to the same topic from the same username share a subscription that is effectively private to that User. This makes it straightforward to build multi-window or multi-device apps, for example by setting the group to the username, without needing complex cross-window or synchronized messaging.
Scoped to an Arbitrary Group
@LastMessageSnapshot
class GroupCustomHandler implements GroupSubscriptionHandler, EndpointNameProvider, GroupProvider {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "groupFirmHandler";
    }

    @Override
    public void onSubscribe(GroupTopicHandle handle) {
        subscriptions.put(handle.getTopic(), service.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    double d = ThreadLocalRandom.current().nextDouble();
                    long longBits = Double.doubleToLongBits(d);
                    byte[] bytes = new byte[1024];
                    BitUtils.writeLEfloat64(bytes, longBits, 0);
                    handle.onMessage(bytes);
                } catch (Exception e) {
                    LOGGER.warn("Fail to push message ", e);
                }
            }
        }, 10, Math.max(3, ThreadLocalRandom.current().nextInt(2000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onUnsubscribe(GroupTopicHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }

    @Override
    public ProviderResult group(User user) {
        MyUser myUser = (MyUser) user;
        return new GroupClientProcessor.DefaultProvider(myUser.getFirmId(), null);
    }
}
The code above shows how we can use attributes of the User object (or anything else we choose) to scope users to a firm or any other axis we want, so that all users belonging to the same group get the same message. A sketch of what such a custom User might look like follows.
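For illustration only, the MyUser class referenced above might look something like the sketch below. This is an assumption, not part of the library - including extending the SessionUser type used in the later examples and its two-argument constructor:

// Hypothetical custom User carrying a firm id for group scoping.
// Extending SessionUser and its (name, sid) constructor is an assumption
// based on how SessionUser is used elsewhere in these examples.
class MyUser extends SessionUser {

    private final String firmId;

    MyUser(String name, String sid, String firmId) {
        super(name, sid);
        this.firmId = firmId;
    }

    public String getFirmId() {
        return firmId;
    }
}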
Adding Private Subscriptions
class ConversationHandler implements PrivateSubscriptionHandler, EndpointNameProvider {

    ConcurrentHashMap<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<String, ScheduledFuture<?>>();
    private ScheduledExecutorService service;

    @Override
    public String name() {
        return "RFQ";
    }

    @Override
    public void onMessage(User user, Object message, PrivateSubscriptionHandle handle) {
        subscriptions.put(((SessionUser) user).getSid() + "_" + handle.getTopic(),
                service.scheduleWithFixedDelay(new Runnable() {
                    int count = 0;

                    @Override
                    public void run() {
                        handle.onMessage("reply " + (count++));
                    }
                }, 10, Math.max(1, ThreadLocalRandom.current().nextInt(5000)), TimeUnit.MILLISECONDS));
    }

    @Override
    public void onClose(User user, Object message, PrivateSubscriptionHandle handle) {
        ScheduledFuture<?> future = subscriptions.remove(((SessionUser) user).getSid()
                + "_" + handle.getTopic());
        if (future != null) {
            future.cancel(true);
        }
    }
}
Here we set up a PrivateConversation with a default queue size and send data until the client calls close. Unlike with the shared subscriptions, we have access to the User on the subscribe, as this queue is entirely private to that User instance. Queue size control can be applied using the Queue annotation or APIs as detailed in the Handlers guide.
Creating and Authorising Users
The User object is entirely under the control of your own code. It is created by adding a UserConnectionHandler:
@Token(name = "X-AUTH-SSO", type = Type.Cookie)
class AsynchUserConnectionHandler implements UserClientHandler {

    private final AtomicInteger count = new AtomicInteger();
    private ExecutorService service;

    @Override
    public void onNewClient(SocketAddress remoteAddress, final Map<String, TokenData> tokens,
            ClientHandle handle) {
        service.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    TokenData t = tokens.get("X-AUTH-SSO");
                    if (validate(t.getData())) {
                        User u = new SessionUser("user" + count.getAndIncrement(), handle.getSID());
                        LOGGER.info(remoteAddress + " New Client for user " + u.getName()
                                + " " + handle.getSID());
                        handle.accept(u);
                    } else {
                        handle.deny();
                    }
                } catch (Exception e) {
                    LOGGER.warn("Error processing token ", e);
                    // make sure we callback to enable outbound write
                    handle.deny();
                }
            }
        });
    }
}
Here we get a custom value from the X-AUTH-SSO cookie, which we can use to check the state of something like an auth session (or anything else we want to), and then accept the user. A minimal sketch of the validate method follows.
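For completeness, a sketch of the validate method referenced above. Everything here is an assumption: the parameter type depends on what TokenData.getData() returns in your version of the API (a String cookie value is assumed), and ssoService is a hypothetical placeholder for your own auth system.

// ssoService is a hypothetical field on the handler representing your auth system.
private SsoService ssoService;

// The parameter type is an assumption; adapt it to whatever TokenData.getData() returns.
private boolean validate(String token) {
    if (token == null || token.isEmpty()) {
        return false;
    }
    // verify the SSO token against the (hypothetical) auth system
    return ssoService.isSessionValid(token);
}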