Java Test Client
The main class used in the client is com.rubris.client.EngineIOClient.
The EngineIOClient currently emulates both the long polling and websocket behaviour. It is intended to enable load testing and scale testing.
Currently the client does not perform Engine-IO heartbeats automatically. This matters especially for the Websocket channel (as there is no poll): to avoid server timeouts you must either manage heartbeats in your own code or ensure that outgoing requests are sent within the server timeout period.
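As a minimal sketch of managing this yourself, assuming you keep a reference to the connected client in your own code (the doPing() call here is hypothetical and used for illustration only; substitute whatever send call your scenario actually makes):

// Minimal sketch: keep the connection alive by scheduling a periodic send.
// doPing() is hypothetical - any outgoing request inside the server timeout
// period (an RPC, a subscribe, etc.) is sufficient.
ScheduledExecutorService keepAlive = Executors.newSingleThreadScheduledExecutor();
keepAlive.scheduleAtFixedRate(() -> client.doPing(), 10, 10, TimeUnit.SECONDS);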
Instantiating the Client
The loadtest uses Netty as its basic communication library as the performance is reasonable, and because we do not tend to do much application logic in a test, things like queueing, context switching, blocking and memory management are less of an issue than on the server.
Configuring Netty for the Client
The initial Netty setup is relatively lightweight and should follow a pattern similar to:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

public void execute() throws Exception {
    // Netty executor worker threads
    NioEventLoopGroup group = new NioEventLoopGroup(8);
    // Netty handler setup
    Bootstrap b = new Bootstrap();
    try {
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new ClientInitialiser(false));
        b.option(ChannelOption.TCP_NODELAY, true);
        b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
        b.option(ChannelOption.SO_REUSEADDR, true);

        // set up our clients here

    } finally {
        // Shut down executor threads to exit.
        group.shutdownGracefully();
    }
}
The Group and Bootstrap objects for Netty should be shared by all clients, so usually the Netty object setup would occur once and be supplied as a parameter to all the client instances.
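As a sketch, the "set up our clients here" step above might create many clients against the shared Bootstrap b (MyProtocolListener, clientCount, the latch and the port/path values are placeholders for your own scenario):

// Sketch: one shared Bootstrap (b) driving many EngineIOClient instances.
// MyProtocolListener, clientCount and the latch are placeholders.
CountDownLatch latch = new CountDownLatch(clientCount);
List<EngineIOClient> clients = new ArrayList<>();
for (int i = 0; i < clientCount; i++) {
    EngineIOClient client = new EngineIOClient(b, "127.0.0.1", 9001, "/meta",
            new MyProtocolListener(latch, "content", 100), "none");
    client.init();
    clients.add(client);
}
latch.await(120000, TimeUnit.MILLISECONDS);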
The NIO event loop setting above means that the instance will top out at around 700% CPU. However, care must be taken in simply scaling this up, as more CPUs do not necessarily increase throughput due to NIC, GC etc. See the AWS Testing for more details. Often multiple instances are better.
The ClientInitialiser defined in the com.rubris.client.runners package determines the Handlers in the Netty pipeline and can be configured to be secure or not using the boolean param.
You are free to use your own pipeline; the above is just an example for convenience. Please see the example com.rubris.client.runners.HttpSubscribeClientRunner.
Configuring the client
The client currently relies on the Netty Bootstrap object as shown above, which provides the communication objects, and a ProtocolListener that provides all the callback methods to drive the scenario. The ProtocolListener is generally the only class you need to implement, although in theory all parts of the client are replaceable.
A convenience stub implementation listener is provided as com.rubris.client.scenarios.BaseScenario so the implementor only needs to override the methods they are interested in.
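A minimal sketch of such a listener, assuming BaseScenario exposes no-op implementations and a no-argument constructor, might look like:

// Sketch: a scenario listener that only overrides the callbacks it needs.
// The latch lets the calling code wait for the scenario to finish or fail.
public class MyScenario extends BaseScenario {

    private final CountDownLatch latch;

    public MyScenario(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onClientReady(HttpClient client) {
        // start the scenario here, e.g. request the available channels
    }

    @Override
    public void onClientFailed(HttpClient client) {
        latch.countDown();
    }
}

The listener instance is then passed to the EngineIOClient constructor: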
EngineIOClient runner = new EngineIOClient(bootstrap, host, port, path,
        protocolListener, upgrade);
The client is inactive until the init() method is called. Once this is called the client will asynchronously initialise its connections to the server and call back on the protocolListener's onClientReady method once the long polling and post sockets have been connected and the EngineIO handshake has completed.
The ProtocolListener supplies the following callbacks:
public interface ProtocolListener {

    public void onBeforeHandshake(HttpClient client);
    public void onClientFailed(HttpClient client);
    public void onClientReady(HttpClient client);
    public void onClientUpgrade(HttpClient client);
    public void onError(HttpClient client, Object error);

    public void onChannelSent(HttpClient client);
    public void onChannelResponse(HttpClient client, ChannelSet channelSet);

    public void onSubscribeSent(HttpClient client);
    public void onSubscribeResponse(HttpClient client, Message message);

    public void onRPCSent(HttpClient client);
    public void onRPCResponse(HttpClient client, Message message);

    public void onPush(HttpClient client, PushMessage message);

    public void onConversationPush(HttpClient client, PushMessage message);
    public void onConversationSent(HttpClient client);

    public void onPingSent(HttpClient client);
    public void onPingResponse(HttpClient client, byte[] data);

    public void onPollResponse(HttpClient client, byte[] data);
    public void onPollRequest(HttpClient client);
    public void onBeforePollRequest(HttpClient client);

    public Results getClientStats();
}
In the browser, the single-threaded nature means that we cannot get HTTP pipelining effects (i.e. where the browser sends another request on the same socket before the previous one has completed). As the Engine-IO client uses at least 2 sockets (one for the POST and one for the GET), the browser's internal structure means that a subsequent POST cannot be sent prior to the first completing, and similarly for GET poll requests.
However, for the Java client this is not true as we want a large number of sockets and a reasonable number of threads servicing these sockets. Additionally we have no real control of the threading within the Netty communication library as to ordering and queueing. We therefore have 2 choices:
- we can add some queue structures which simulate this behaviour at the expense of much greater internal memory management and complexity
- adopt a callback mechanism that enables the calling code to coordinate this activity if it chooses (as in some circumstances we do not want to do this)
The library adopts the latter approach and therefore there are some patterns that should be used for the HttpPolling transport that are detailed below.
HttpPolling Client
To configure the client as HttpPolling it is simply a matter of supplying an upgrade parameter that does not match any protocol returned by the server, e.g.:
EngineIOClient runner = new EngineIOClient(bootstrap, "127.0.0.1", 9001, "/meta",
        myProtocolListener, "none");
As shown above we set “none” as a dummy protocol (it could be any non-matching string) so we do not match the websocket protocol returned by the server as an optional upgrade, and we therefore default back to HTTP polling.
Given the callback nature of the client, the normal pattern to create a client and wait for termination is something like:
CountDownLatch latch = new CountDownLatch(1);
MyProtocolListener listener = new MyProtocolListener(latch, "content", 100);
EngineIOClient runner = new EngineIOClient(helper.b, "127.0.0.1", helper.port,
        helper.path, listener, "none");
runner.init();
latch.await(10000, TimeUnit.MILLISECONDS);
Internally in the listener the countdown latch should be triggered when the desired state of the client (e.g. number of messages received) has been reached.
Client Ready
For the Http Polling mode, the onClientReady callback is called following initialisation.
@Override
public void onClientReady(HttpClient client) {
    // do things here as the client is ready
}
Ready is defined as both channels connected and the EngineIO handshake having been performed. Failure will be returned as either listener.onClientFailed(httpClient) or listener.onError(httpClient, obj) depending on what stage of the setup the failure occurs at.
It is important to realise that Netty does not use a dedicated thread for the callbacks (it is a thread pool), so the callbacks can arrive on different threads from the sends.
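In practice this means any state shared between callbacks in your listener should be thread-safe; the examples below use atomics for exactly this reason, e.g.:

// Listener state shared across Netty callback threads - prefer atomics or
// volatile fields, as sends and responses may be delivered on different threads.
private final AtomicInteger rpcSentCount = new AtomicInteger();
private final AtomicInteger pushCount = new AtomicInteger();
private final AtomicBoolean finished = new AtomicBoolean(false);
private volatile int channelId;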
Setting up the channels
Unless you have a specific scenario to test in mind, the normal first step would be to make the “Channels” call to retrieve the available Channels configured on the server (without this you will not be able to make any calls to the server other than direct HTTP calls or pings).
As shown above, the code would be triggered from the onClientReady callback:
@Override
public void onClientReady(HttpClient client) {
    this.sid = client.getSid();
    this.client = client;
    client.doChannelRequest(MessageFactory.channelMessage);
}
This sends the Channel message to the server. You CANNOT send another message until the Channel message has been sent (you will also need to wait for the Channel response in order to know what channels are available). Therefore you must wait for both these events to complete.
The easiest way to do this is something like the following:
// start the request cycle
protected void startRequest(){
    if (channelsComplete.incrementAndGet() % 2 == 0){
        client.doRPCRequest(MessageFactory.createRPCMessage(sid, channelId,
                rpcSentCount.getAndIncrement()));
    }
}

// Returned by the Poll socket
@Override
public void onChannelResponse(HttpClient client, ChannelSet channelSet) {
    for (Channel channel : channelSet.getChannels()){
        if (channelName.equals(channel.getName())){
            channelId = channel.getId();
            // start the initial request
            startRequest();
        }
    }
}

// returned by the thread allocated to the post
@Override
public void onChannelSent(HttpClient client) {
    startRequest();
}
Here we can see that we trigger the RPC call by calling startRequest() irrespective of which callback returns first. If you fail to follow this pattern (for the channels request) you will get a client that appears to work, but given a pause in the sending thread callback you will get an error callback stating that HTTP pipelining is not supported.
So now we can make RPC requests and get the responses:
@Override
public void onRPCSent(HttpClient client) {
    // trigger send on each send
    if (rpcSentCount.get() < maxMessages){
        client.doRPCRequest(MessageFactory.createRPCMessage(sid, channelId,
                rpcSentCount.getAndIncrement()));
    }
}

@Override
public void onRPCResponse(HttpClient client, Message message) {
    recordValue(System.currentTimeMillis() - message.getTimestamp());
    int received = pushCount.incrementAndGet();
    if (received >= maxMessages){
        // we use finished here in case we get errors or multiple responses
        // that trigger the latch multiple times
        if (finished.compareAndSet(false, true)){
            client.close();
            latch.countDown();
        }
    }
}
For the above call we can see that we do not wait for the RPC response before making the next RPC send call - we only wait for the RPC send to ensure we are not pipelining requests. In this case the background Poll thread will pick up multiple responses and batch them back to us.
Obviously with this approach (which is not really a situation encountered in the browser except under load or delays, and even then it tends to be 2 or 3 in sequence) we could easily send enough RPC requests to overwhelm the Client Direct Queue Size on the server for the user replies, leading to rejections when the server tries to enqueue responses. One must therefore be sensible about the number sent before responses are received.
If we wanted to more correctly emulate the browser and wait for the response before making another request, one would have to adopt the same idiom as shown for the Channels (e.g. a method that triggers a new send once both the send and the response have completed). A simple way to do this is something like:
protected void sendMessage(){
    // send on each cycle of 2 responses - cannot be out of order as we have
    // not triggered another send until both complete
    if (send.incrementAndGet() % 2 == 0){
        client.doRPCRequest(MessageFactory.createRPCMessage(sid, channelId,
                rpcSentCount.getAndIncrement()));
    }
}

@Override
public void onRPCSent(HttpClient client) {
    // trigger send on each send - count incremented on send
    if (rpcSentCount.get() < maxMessages){
        sendMessage();
    }
}

@Override
public void onRPCResponse(HttpClient client, Message message) {
    recordValue(System.currentTimeMillis() - message.getTimestamp());
    int received = pushCount.incrementAndGet();
    if (received >= maxMessages){
        // we use finished here in case we get errors that have already closed
        // the socket or multiple responses that trigger the latch multiple times
        if (finished.compareAndSet(false, true)){
            client.close();
            latch.countDown();
        }
    } else {
        sendMessage();
    }
}
Scenarios
Subscribing
For shared subscriptions the Client setup would be similar to:
public void startRequest(){
    if (channelsComplete.incrementAndGet() % 2 == 0){
        byte[] content = MessageFactory.createSubscribeMessage(subPool,
                new String[0], channelId, messageId++);
        client.doSubscribeRequest(content);
    }
}
Here the SubscribeMessage (as with the JS client) takes two String arrays (the first is the subscribe array and the second the unsubscribe array).
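For example (keeping the call shape shown above and using purely illustrative topic names), a later call could subscribe to one topic and unsubscribe from another in the same message:

// Sketch: subscribe to one topic and unsubscribe from another in one message.
// The topic names are illustrative only.
byte[] content = MessageFactory.createSubscribeMessage(
        new String[]{"prices/GBPUSD"},   // topics to subscribe to
        new String[]{"prices/EURUSD"},   // topics to unsubscribe from
        channelId, messageId++);
client.doSubscribeRequest(content);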
Following a subscribe we can listen for the subscribe sent and subscribe response messages:
@Override
public void onSubscribeSent(HttpClient client) {
    // callback to say that subscribe message has been posted
}

@Override
public void onSubscribeResponse(HttpClient client, Message message) {
    // response message here with the list of failed (if any) subscriptions
}
Again, given the thread pool nature of Netty we can get the actual push messages before the above 2 messages have returned.
@Override
public void onPush(HttpClient client, PushMessage message) {
    // check if we get responses from something we did not subscribe to
    if (!subs.contains(message.getTopic())){
        LOGGER.warn(client.getSid() + " Unexpected message topic " + message.getTopic());
        if (finished.compareAndSet(false, true)){
            client.close();
            latch.countDown();
        }
        return;
    }
    // are we over our desired limit - we just close but could unsubscribe
    if (pushCount.incrementAndGet() > messageLimit){
        if (finished.compareAndSet(false, true)){
            client.close();
            latch.countDown();
        }
    }
    // our test endpoint sends time in message payload
    int offset = message.getPayload().getIndex();
    long send = Utils.readLong64(((byte[]) message.getPayload().getData()), offset);
    send = System.currentTimeMillis() - send;
    // record the longest time difference
    if (send > longestMessage){
        longestMessage = send;
        results.setLongestGapId(message.getId());
    }
    recordValue(send);
}
The above uses an endpoint which sends back timestamps and records the time between send and receive (although this is not strictly accurate as it depends on Netty's internal thread model and queueing, so in general as we ramp the number of clients these numbers increase as the sends and receives are queued internally).
Private Conversations
In a similar manner to the scenarios shown above, we use the Channel response as the start for the conversation subscribe, e.g.:
// Assumes conversation topic array is already defined
protected void sendSubscribe(HttpClient client, int topicIndex) {
    String payload = "payload" + topicIndex;
    String topic = conversations[topicIndex];
    counts.put(topic, 0);
    client.doConversation(MessageFactory.createConversationMessage(topic, payload,
            channelId, messageId++, false));
}

public void startRequest(){
    if (channelsComplete.incrementAndGet() % 2 == 0){
        sendSubscribe(client, conversationPosition++);
    }
}
Again the callback methods can be used to determine when the message has been sent and when the conversation push occurs:
@Override
public void onConversationSent(HttpClient client) {
    // here we have the notification the conversation has been sent
}

@Override
public void onConversationPush(HttpClient client, PushMessage message) {
    // deal with conversation response - there may be more than one depending on
    // the nature of the endpoint but it is up to the behaviour for the channel
    // to determine what we need to do here
}
Websocket Client
Instantiation
The Websocket client is instantiated in the same manner as the HttpClient except for the upgrade parameter, e.g.:
CountDownLatch latch = new CountDownLatch(1);
WsRunner wsScenario = new WsRunner(latch, "content", 50);
EngineIOClient runner = new EngineIOClient(helper.b, "127.0.0.1", helper.port,
        helper.path, wsScenario, "websocket");
runner.init();
Assert.assertTrue(latch.await(120000, TimeUnit.MILLISECONDS));
In this case the parameter “websocket” is passed to force the client to perform an upgrade.
Websocket Differences
The Websocket client is very similar in function to the HttpPolling mechanism. However there are some differences to be aware of:
- The initial callback is triggered from onClientUpgrade instead of onClientReady:
@Override
public void onClientUpgrade(HttpClient client) {
    // we are done if we have chosen to upgrade
    sid = client.getSid();
    client.doChannelRequest(MessageFactory.channelMessage);
}
- There is no need to wait for both the send and the response, as the websocket is full duplex and does not suffer from the pipelining issue that HTTP does.
RPC Scenario
Following the initial RPC example for the HttpClient, the Websocket equivalent is identical. However, if we want to wait for the RPC response before sending the next one we DO NOT need to wait for both the sent and response callbacks. Instead we can trigger the next message directly on the response callback:
@Override
public void onRPCResponse(HttpClient client, Message message) {
    int num = sent.incrementAndGet();
    if (num < messages){
        client.doRPCRequest(MessageFactory.createRPCMessage(sid, channelId, num));
    }
}
Subscribe Scenario
Apart from the initial trigger method and not needing to wait for both callbacks before subsequent sends, the Websocket subscribe is identical to the HTTP Client subscribe shown above. The callbacks are the same and the data is the same.
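For instance, a sketch of driving a sequence of subscribes directly from the response over the websocket (topics and subscribePosition are placeholder fields) could be:

@Override
public void onSubscribeResponse(HttpClient client, Message message) {
    // over the websocket we can send the next subscribe straight from the
    // response, without waiting for the matching onSubscribeSent callback
    if (subscribePosition < topics.length) {
        byte[] content = MessageFactory.createSubscribeMessage(
                new String[]{topics[subscribePosition++]},
                new String[0], channelId, messageId++);
        client.doSubscribeRequest(content);
    }
}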
Conversation Scenario
Again the change in websocket behaviour allows us to use only the conversation response to drive other behaviour, e.g.:
@Override
public void onConversationSent(HttpClient client){
    // do nothing
}

@Override
public void onConversationPush(HttpClient client, PushMessage message) {
    int lcount = received.incrementAndGet();
    if (lcount == messages){
        client.doConversation(MessageFactory.createConversationMessage("test",
                "test", channelId, messageId++, true));
    } else {
        client.doConversation(MessageFactory.createConversationMessage("test",
                "test", channelId, messageId++, false));
    }
}
The above example shows a sequence of conversation messages driven until a count is reached, at which point a close conversation message is sent.
We could then for instance close the Client following the notification that the onConversationSent has occurred.
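A sketch of that, assuming a placeholder closing flag set at the point the close message is created in the example above, might be:

// Sketch: close the client once the close-conversation message has actually
// been sent. "closing" is a placeholder flag set in onConversationPush at the
// point the close (true) message is created.
private final AtomicBoolean closing = new AtomicBoolean(false);

@Override
public void onConversationSent(HttpClient client) {
    if (closing.get() && finished.compareAndSet(false, true)) {
        client.close();
        latch.countDown();
    }
}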
It is important to realise that as we ramp the number of Clients in a test instance (even using the websocket protocol), the threads and sockets servicing the outgoing and incoming data will suffer temporary stalls from ordering, epoll quirks, GC etc., and that just sending thousands of messages for each Client at once in a tight loop will lead to full or slow processing of response buffers in the test client. This will eventually push back into the server and manifest as full outbound queues and blocked sockets.
Therefore one should be somewhat realistic in how the clients behave in real life.
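One simple way to do this is to pace each client's sends with a scheduler rather than a tight loop; a sketch is below (the 50ms delay is arbitrary, and for HTTP polling you would still need the send/response coordination shown earlier):

// Sketch: pace RPC sends with a scheduler instead of sending in a tight loop.
// The 50ms delay is arbitrary - tune it to the client behaviour you want
// to emulate.
private final ScheduledExecutorService pacer = Executors.newSingleThreadScheduledExecutor();

@Override
public void onRPCResponse(HttpClient client, Message message) {
    if (rpcSentCount.get() < maxMessages) {
        pacer.schedule(() -> client.doRPCRequest(
                MessageFactory.createRPCMessage(sid, channelId,
                        rpcSentCount.getAndIncrement())),
                50, TimeUnit.MILLISECONDS);
    }
}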
That said, it is easy with the above patterns to generate ~10Gb/s of load from 1000 clients with 1000 rapidly ticking subscriptions facing a 2-module server.
Sample Scenarios
There are currently 4 scenarios supplied as examples in the com.rubris.client.scenarios package that show:
- Http Polling Conversations
  - command line args: host port url clientNum SSL privatesubscriptionsPerClient
  - example: 127.0.0.1 8091 login 2000 false 100
- Http Polling RPC
  - command line args: host port url clientNum SSL rpcMessagesPerClient
  - example: 127.0.0.1 8091 login 2000 false 1000
- Http Polling Subscribe with no messages
  - command line args: host port url clientNum SSL topicsPerClient
  - example: 127.0.0.1 8091 login 2000 false 100
- Http Polling Subscribe with messages
  - command line args: host port url clientNum SSL topicsPerClient channelName messageToWaitForOnEachTopic
  - example: 127.0.0.1 8091 login 200 false 100 timeHandler 100000
Each scenario is driven from the com.rubris.client.runners package.
The runner is used to parse the command line args, set up the Scenario and initialise the Netty Channel instances.
The goal of the scenarios is to run a number of clients, and perform actions that the Web client normally performs.
Each instance supports up to 200 users (before JVM sizing starts to become an issue), so for numbers above this it is better to run multiple instances simultaneously.