Skip to content

Commit d3823c0

Browse files
committed
send_throughput test runnable in different processes.
1 parent b5c30da commit d3823c0

File tree

3 files changed

+125
-67
lines changed

3 files changed

+125
-67
lines changed

tests/src/org/jsl/tests/send_throughput/Client.java

Lines changed: 76 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.jsl.tests.Util;
2424

2525
import java.io.IOException;
26+
import java.net.InetSocketAddress;
2627
import java.net.SocketAddress;
2728
import java.nio.ByteBuffer;
2829
import java.nio.channels.SocketChannel;
@@ -42,8 +43,8 @@ public void run()
4243
{
4344
try
4445
{
45-
SocketChannel socketChannel = SocketChannel.open();
46-
socketChannel.socket().setReceiveBufferSize( m_socketBufferSize );
46+
final SocketChannel socketChannel = SocketChannel.open();
47+
socketChannel.socket().setReceiveBufferSize(m_socketBufferSize);
4748

4849
StreamDefragger streamDefragger = new StreamDefragger(4)
4950
{
@@ -59,49 +60,54 @@ protected int validateHeader( ByteBuffer header )
5960
return;
6061
}
6162

62-
ByteBuffer bb = ByteBuffer.allocateDirect( m_socketBufferSize );
63-
64-
ByteBuffer startRequest = m_startRequest.duplicate();
65-
final int bytesSent = socketChannel.write( startRequest );
63+
final ByteBuffer startRequest = m_startRequest.duplicate();
64+
final int bytesSent = socketChannel.write(startRequest);
6665
if (bytesSent != startRequest.capacity())
6766
{
6867
System.out.println( "SocketChannel.send() failed." );
6968
return;
7069
}
7170

71+
final ByteBuffer bb = ByteBuffer.allocateDirect(1024*128);
7272
int messages = 0;
7373
int bytesReceivedTotal = 0;
74-
long startTime = 0;
75-
76-
readSocketLoop: for (;;)
74+
int bytesReceived = socketChannel.read(bb);
75+
if (bytesReceived > 0)
7776
{
78-
final int bytesReceived = socketChannel.read( bb );
79-
if (bytesReceived > 0)
77+
final long startTime = System.nanoTime();
78+
for (;;)
8079
{
81-
if (bytesReceivedTotal == 0)
82-
startTime = System.nanoTime();
8380
bytesReceivedTotal += bytesReceived;
8481

85-
bb.position( 0 );
86-
bb.limit( bytesReceived );
87-
ByteBuffer msg = streamDefragger.getNext( bb );
82+
bb.position(0);
83+
bb.limit(bytesReceived);
84+
ByteBuffer msg = streamDefragger.getNext(bb);
8885
while (msg != null)
8986
{
9087
final int messageLength = msg.getInt();
91-
assert( messageLength == m_messageLength );
92-
if (++messages == m_messages)
93-
break readSocketLoop;
88+
assert(messageLength == m_messageLength);
89+
messages++;
9490
msg = streamDefragger.getNext();
9591
}
92+
93+
if (messages == m_messages)
94+
break;
95+
9696
bb.clear();
97+
bytesReceived = socketChannel.read(bb);
98+
if (bytesReceived <= 0)
99+
break;
97100
}
101+
long entTime = System.nanoTime();
102+
103+
System.out.println(
104+
"Received " + messages + " messages (" + bytesReceivedTotal +
105+
" bytes) at " + Util.formatDelay(startTime, entTime));
98106
}
99-
long entTime = System.nanoTime();
100-
socketChannel.close();
107+
else
108+
System.out.println("Socket read error.");
101109

102-
System.out.println(
103-
"Received " + messages + " messages (" + bytesReceivedTotal +
104-
" bytes) at " + Util.formatDelay(startTime, entTime) + "." );
110+
socketChannel.close();
105111
}
106112
catch (IOException ex)
107113
{
@@ -110,23 +116,23 @@ protected int validateHeader( ByteBuffer header )
110116
}
111117
}
112118

113-
public Client( int sessions, int messages, int messageLength, int socketBufferSize )
119+
public Client(int sessions, int messages, int messageLength, int socketBufferSize)
114120
{
115121
m_session = new SessionThread[sessions];
116122
m_messages = messages;
117123
m_messageLength = messageLength;
118124
m_socketBufferSize = socketBufferSize;
119125

120126
/* length + sessions + messages + message length */
121-
m_startRequest = ByteBuffer.allocateDirect( 4 + 4 + 4 + 4 );
122-
m_startRequest.putInt( 16 );
123-
m_startRequest.putInt( sessions );
124-
m_startRequest.putInt( messages );
125-
m_startRequest.putInt( messageLength );
126-
m_startRequest.position( 0 );
127+
m_startRequest = ByteBuffer.allocateDirect(4 + 4 + 4 + 4);
128+
m_startRequest.putInt(16);
129+
m_startRequest.putInt(sessions);
130+
m_startRequest.putInt(messages);
131+
m_startRequest.putInt(messageLength);
132+
m_startRequest.position(0);
127133
}
128134

129-
public void start( SocketAddress addr )
135+
public void start(SocketAddress addr)
130136
{
131137
m_addr = addr;
132138
for (int idx=0; idx<m_session.length; idx++)
@@ -143,9 +149,46 @@ public void stopAndWait()
143149
for (SessionThread session : m_session)
144150
session.join();
145151
}
146-
catch (InterruptedException ex)
152+
catch (final InterruptedException ex)
147153
{
148154
ex.printStackTrace();
149155
}
150156
}
157+
158+
public static void main(String [] args)
159+
{
160+
int port;
161+
int sessions = 1;
162+
int messages = 1000000;
163+
int messageLength = 500;
164+
int socketBufferSize = (64*1024);
165+
166+
if (args.length > 0)
167+
{
168+
port = Integer.parseInt(args[0]);
169+
if (args.length > 1)
170+
{
171+
sessions = Integer.parseInt(args[1]);
172+
if (args.length > 2)
173+
{
174+
messages = Integer.parseInt(args[2]);
175+
if (args.length > 3)
176+
{
177+
messageLength = Integer.parseInt(args[3]);
178+
if (args.length > 4)
179+
socketBufferSize = Integer.parseInt(args[4]);
180+
}
181+
}
182+
}
183+
}
184+
else
185+
{
186+
System.out.println("Usage: <port> [sessions] [messages] [message length] [socket buffer size]");
187+
return;
188+
}
189+
190+
final Client client = new Client(sessions, messages, messageLength, socketBufferSize);
191+
client.start(new InetSocketAddress("localhost", port));
192+
client.stopAndWait();
193+
}
151194
}

tests/src/org/jsl/tests/send_throughput/Main.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,32 @@
2121

2222
public class Main
2323
{
24-
public static void main( String [] args )
24+
public static void main(String [] args)
2525
{
26+
final int port = 0; // system assigned free port will be used
2627
int sessions = 1;
27-
int messages = 100000;
28+
int messages = 1000000;
2829
int messageLength = 500;
29-
int socketBufferSize = (64*1024);
30+
int socketBufferSize = (64 * 1024);
3031

3132
if (args.length > 0)
3233
{
33-
sessions = Integer.parseInt( args[0] );
34+
sessions = Integer.parseInt(args[0]);
3435
if (args.length > 1)
3536
{
36-
messages = Integer.parseInt( args[1] );
37+
messages = Integer.parseInt(args[1]);
3738
if (args.length > 2)
3839
{
39-
messageLength = Integer.parseInt( args[2] );
40+
messageLength = Integer.parseInt(args[2]);
4041
if (args.length > 3)
41-
socketBufferSize = Integer.parseInt( args[3] );
42+
socketBufferSize = Integer.parseInt(args[3]);
4243
}
4344
}
4445
}
4546

46-
Client client = new Client( sessions, messages, messageLength, socketBufferSize );
47-
new Server(client, socketBufferSize).run();
47+
final Client client = new Client(sessions, messages, messageLength, socketBufferSize);
48+
final Server server = new Server();
49+
server.run(port, socketBufferSize, client);
50+
client.stopAndWait();
4851
}
4952
}

tests/src/org/jsl/tests/send_throughput/Server.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333

3434
public class Server
3535
{
36-
private final Client m_client;
37-
private final int m_socketBufferSize;
3836
private final Semaphore m_semStart;
3937
private final AtomicInteger m_sessionsConnected;
4038
private final AtomicInteger m_sessionsReady;
@@ -47,19 +45,19 @@ private class ServerListener implements Session.Listener
4745
{
4846
private Session m_session;
4947

50-
public ServerListener( Session session )
48+
public ServerListener(Session session)
5149
{
5250
m_session = session;
53-
System.out.println( "Connection accepted from " + session.getRemoteAddress() );
51+
System.out.println("Connection accepted from " + session.getRemoteAddress());
5452
}
5553

56-
public void onDataReceived( RetainableByteBuffer data )
54+
public void onDataReceived(RetainableByteBuffer data)
5755
{
5856
/* We expect only one small message from the client,
5957
* does not make a sense to handle possible fragmentation.
6058
*/
6159
if (data.remaining() == 0)
62-
throw new RuntimeException( "zero ByteBuffer" );
60+
throw new RuntimeException("zero ByteBuffer");
6361

6462
data.getInt(); // skip packet length
6563
final int sessionsExpected = data.getInt();
@@ -81,7 +79,7 @@ public void onDataReceived( RetainableByteBuffer data )
8179
else
8280
while (m_sender == null);
8381

84-
m_sender[sessions] = new Sender( m_session );
82+
m_sender[sessions] = new Sender(m_session);
8583
m_sender[sessions].start();
8684

8785
sessions = m_sessionsReady.incrementAndGet();
@@ -93,15 +91,15 @@ public void onConnectionClosed()
9391
{
9492
System.out.println(
9593
m_session.getLocalAddress() + " -> " + m_session.getRemoteAddress() +
96-
": connection closed." );
94+
": connection closed.");
9795
}
9896
}
9997

10098
private class Sender extends Thread
10199
{
102100
private final Session m_session;
103101

104-
public Sender( Session session )
102+
Sender(Session session)
105103
{
106104
m_session = session;
107105
}
@@ -119,7 +117,7 @@ public void run()
119117

120118
System.out.println(
121119
"Sent " + m_messages + " messages at " +
122-
Util.formatDelay(startTime, endTime) + " sec." );
120+
Util.formatDelay(startTime, endTime) + " sec.");
123121

124122
m_session.closeConnection();
125123
final int sessions = m_sessionsDone.decrementAndGet();
@@ -130,48 +128,62 @@ public void run()
130128

131129
private class TestAcceptor extends Acceptor
132130
{
133-
public TestAcceptor()
131+
private final Client m_client;
132+
133+
public TestAcceptor(int port, int socketBufferSize, Client client)
134134
{
135-
super( new InetSocketAddress(0) );
136-
tcpNoDelay = true;
137-
socketSendBufSize = m_socketBufferSize;
135+
super(new InetSocketAddress(port));
136+
this.tcpNoDelay = true;
137+
this.socketSendBufSize = socketBufferSize;
138+
this.m_client = client;
138139
}
139140

140-
public void onAcceptorStarted( Collider collider, int localPort )
141+
public void onAcceptorStarted(Collider collider, int localPort)
141142
{
142143
System.out.println( "Server started at port " + localPort );
143144
if (m_client != null)
144-
m_client.start( new InetSocketAddress("localhost", localPort) );
145+
m_client.start(new InetSocketAddress("localhost", localPort));
145146
}
146147

147-
public Session.Listener createSessionListener( Session session )
148+
public Session.Listener createSessionListener(Session session)
148149
{
149-
return new ServerListener( session );
150+
return new ServerListener(session);
150151
}
151152
}
152153

153-
public Server( Client client, int socketBufferSize )
154+
public Server()
154155
{
155-
m_client = client;
156-
m_socketBufferSize = socketBufferSize;
157156
m_semStart = new Semaphore(0);
158157
m_sessionsConnected = new AtomicInteger();
159158
m_sessionsReady = new AtomicInteger();
160159
m_sessionsDone = new AtomicInteger();
161160
}
162161

163-
public void run()
162+
public void run(int port, int socketBufferSize, Client client)
164163
{
165164
try
166165
{
167166
final Collider collider = Collider.create();
168-
collider.addAcceptor( new TestAcceptor() );
167+
collider.addAcceptor(new TestAcceptor(port, socketBufferSize, client));
169168
collider.run();
170-
m_client.stopAndWait();
171169
}
172-
catch (IOException ex)
170+
catch (final IOException ex)
173171
{
174172
ex.printStackTrace();
175173
}
176174
}
175+
176+
public static void main(String [] args)
177+
{
178+
int port = 0;
179+
int socketBufferSize = (64 * 1024);
180+
if (args.length > 0)
181+
{
182+
port = Integer.parseInt(args[0]);
183+
if (args.length > 1)
184+
socketBufferSize = Integer.parseInt(args[1]);
185+
}
186+
final Server server = new Server();
187+
server.run(port, socketBufferSize, null);
188+
}
177189
}

0 commit comments

Comments
 (0)