@@ -27,6 +27,8 @@ import com.mongodb.ServerAddress
2727import com.mongodb.async.FutureResultCallback
2828import com.mongodb.async.SingleResultCallback
2929import com.mongodb.event.ConnectionListener
30+ import com.mongodb.event.ConnectionMessageReceivedEvent
31+ import com.mongodb.event.ConnectionMessagesSentEvent
3032import org.bson.BsonBinaryWriter
3133import org.bson.BsonDocument
3234import org.bson.BsonInt32
@@ -50,6 +52,7 @@ import java.util.concurrent.ExecutorService
5052import java.util.concurrent.Executors
5153
5254import static MongoNamespace.COMMAND_COLLECTION_NAME
55+ import static com.mongodb.CustomMatchers.compare
5356import static com.mongodb.connection.ConnectionDescription.getDefaultMaxMessageSize
5457import static com.mongodb.connection.ConnectionDescription.getDefaultMaxWriteBatchSize
5558import static com.mongodb.connection.ServerDescription.getDefaultMaxDocumentSize
@@ -61,7 +64,9 @@ class InternalStreamConnectionSpecification extends Specification {
6164
6265 def helper = new StreamHelper ()
6366 def serverAddress = new ServerAddress ()
64- def connectionDescription = new ConnectionDescription (new ConnectionId (SERVER_ID , 1 , 1 ), new ServerVersion (), ServerType . STANDALONE ,
67+ def connectionId = new ConnectionId (SERVER_ID , 1 , 1 )
68+
69+ def connectionDescription = new ConnectionDescription (connectionId, new ServerVersion (), ServerType . STANDALONE ,
6570 getDefaultMaxWriteBatchSize(), getDefaultMaxDocumentSize(),
6671 getDefaultMaxMessageSize())
6772 def stream = Mock (Stream ) {
@@ -106,23 +111,31 @@ class InternalStreamConnectionSpecification extends Specification {
106111 given :
107112 def connection = getOpenedConnection()
108113 def (buffers1, messageId1) = helper. isMaster()
109- stream. read(_) >>> helper. read([messageId1])
114+ def messageSize = helper. remaining(buffers1)
115+ stream. write(_) >> {
116+ helper. write(buffers1)
117+ }
110118 when :
111119 connection. sendMessage(buffers1, messageId1)
112120
121+
113122 then :
114- 1 * listener. messagesSent(_)
123+ 1 * listener. messagesSent {
124+ compare(new ConnectionMessagesSentEvent (connectionId, messageId1, messageSize), it)
125+ }
115126 }
116127
117128 @Category (Async )
118129 @IgnoreIf ({ javaVersion < 1.7 })
119130 def ' should fire message sent event asynchronously' () {
120- stream. writeAsync(_, _) >> { List<ByteBuf > buffers , AsyncCompletionHandler<Void > callback ->
121- callback. completed(null )
122- }
123131 def (buffers1, messageId1) = helper. isMaster()
132+ def messageSize = helper. remaining(buffers1)
124133 def connection = getOpenedConnection()
125134 def latch = new CountDownLatch (1 );
135+ stream. writeAsync(_, _) >> { List<ByteBuf > buffers , AsyncCompletionHandler<Void > callback ->
136+ helper. write(buffers1)
137+ callback. completed(null )
138+ }
126139
127140 when :
128141 connection. sendMessageAsync(buffers1, messageId1, new SingleResultCallback<Void > () {
@@ -134,7 +147,9 @@ class InternalStreamConnectionSpecification extends Specification {
134147 latch. await()
135148
136149 then :
137- 1 * listener. messagesSent(_)
150+ 1 * listener. messagesSent {
151+ compare(new ConnectionMessagesSentEvent (connectionId, messageId1, messageSize), it)
152+ }
138153 }
139154
140155 def ' should fire message received event' () {
@@ -147,7 +162,9 @@ class InternalStreamConnectionSpecification extends Specification {
147162 connection. receiveMessage(messageId1)
148163
149164 then :
150- 1 * listener. messageReceived(_)
165+ 1 * listener. messageReceived {
166+ compare(new ConnectionMessageReceivedEvent (connectionId, messageId1, 110 ), it)
167+ }
151168 }
152169
153170 @Category (Async )
@@ -175,7 +192,9 @@ class InternalStreamConnectionSpecification extends Specification {
175192 latch. await()
176193
177194 then :
178- 1 * listener. messageReceived(_)
195+ 1 * listener. messageReceived {
196+ compare(new ConnectionMessageReceivedEvent (connectionId, messageId1, 110 ), it)
197+ }
179198 }
180199
181200 def ' should change the connection description when opened' () {
@@ -653,6 +672,21 @@ class InternalStreamConnectionSpecification extends Specification {
653672 class StreamHelper {
654673 int nextMessageId = 900000 // Generates a message then adds one to the id
655674
675+ def remaining (List<ByteBuf > buffers ) {
676+ int remaining = 0
677+ buffers. each {
678+ remaining + = it. remaining()
679+ }
680+ remaining
681+ }
682+
683+ def write (List<ByteBuf > buffers ) {
684+ buffers. each {
685+ it. get(new byte [it. remaining()])
686+ }
687+ }
688+
689+
656690 def read (List<Integer > messageIds ) {
657691 read(messageIds, true )
658692 }
@@ -706,10 +740,10 @@ class InternalStreamConnectionSpecification extends Specification {
706740 def command = new CommandMessage (new MongoNamespace (' admin' , COMMAND_COLLECTION_NAME ). getFullName(),
707741 new BsonDocument (' ismaster' , new BsonInt32 (1 )),
708742 false , MessageSettings . builder(). build());
709- OutputBuffer buffer = new BasicOutputBuffer ();
710- command. encode(buffer );
743+ OutputBuffer outputBuffer = new BasicOutputBuffer ();
744+ command. encode(outputBuffer );
711745 nextMessageId++
712- [buffer . byteBuffers, nextMessageId]
746+ [outputBuffer . byteBuffers, nextMessageId]
713747 }
714748
715749 def isMasterAsync () {
0 commit comments