Skip to content

Commit c9d8910

Browse files
committed
JAVA-613: Expand connection pool JMX stats to include the current operation being executed by each in use connection. Did this by by using MXBean instean of MBean
1 parent 5664f79 commit c9d8910

File tree

12 files changed

+296
-132
lines changed

12 files changed

+296
-132
lines changed

src/main/com/mongodb/DBApiLayer.java

Lines changed: 10 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.bson.types.ObjectId;
2424

2525
import java.util.ArrayList;
26-
import java.util.Collection;
2726
import java.util.Collections;
2827
import java.util.HashMap;
2928
import java.util.Iterator;
@@ -162,10 +161,7 @@ void killCursors( ServerAddress addr , List<Long> all ){
162161
if ( all == null || all.size() == 0 )
163162
return;
164163

165-
OutMessage om = new OutMessage( _mongo , 2007 );
166-
om.writeInt( 0 ); // reserved
167-
168-
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) );
164+
OutMessage om = OutMessage.killCursors(_mongo, Math.min( NUM_CURSORS_PER_BATCH , all.size()));
169165

170166
int soFar = 0;
171167
int totalSoFar = 0;
@@ -177,9 +173,7 @@ void killCursors( ServerAddress addr , List<Long> all ){
177173

178174
if ( soFar >= NUM_CURSORS_PER_BATCH ){
179175
_connector.say( this , om ,com.mongodb.WriteConcern.NONE );
180-
om = new OutMessage( _mongo , 2007 );
181-
om.writeInt( 0 ); // reserved
182-
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) );
176+
om = OutMessage.killCursors(_mongo, Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar));
183177
soFar = 0;
184178
}
185179
}
@@ -232,15 +226,11 @@ protected WriteResult insert(DBObject[] arr, boolean shouldApply , com.mongodb.W
232226

233227
int cur = 0;
234228
int maxsize = _mongo.getMaxBsonObjectSize();
235-
while ( cur < arr.length ){
236-
OutMessage om = new OutMessage( _mongo , 2002, encoder );
229+
while ( cur < arr.length ) {
237230

238-
int flags = 0;
239-
if ( concern.getContinueOnErrorForInsert() ) flags |= 1;
240-
om.writeInt( flags );
241-
om.writeCString( _fullNameSpace );
231+
OutMessage om = OutMessage.insert( this , encoder, concern );
242232

243-
for ( ; cur<arr.length; cur++ ){
233+
for ( ; cur<arr.length; cur++ ){
244234
DBObject o = arr[cur];
245235
om.putObject( o );
246236

@@ -264,21 +254,7 @@ public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern, DBEnco
264254

265255
if ( willTrace() ) trace( "remove: " + _fullNameSpace + " " + JSON.serialize( o ) );
266256

267-
OutMessage om = new OutMessage( _mongo , 2006, encoder );
268-
269-
om.writeInt( 0 ); // reserved
270-
om.writeCString( _fullNameSpace );
271-
272-
Collection<String> keys = o.keySet();
273-
274-
if ( keys.size() == 1 &&
275-
keys.iterator().next().equals( "_id" ) &&
276-
o.get( keys.iterator().next() ) instanceof ObjectId )
277-
om.writeInt( 1 );
278-
else
279-
om.writeInt( 0 );
280-
281-
om.putObject( o );
257+
OutMessage om = OutMessage.remove(this, encoder, o);
282258

283259
return _connector.say( _db , om , concern );
284260
}
@@ -298,7 +274,7 @@ Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int
298274

299275
if ( willTrace() ) trace( "find: " + _fullNameSpace + " " + JSON.serialize( ref ) );
300276

301-
OutMessage query = OutMessage.query( _mongo , options , _fullNameSpace , numToSkip , chooseBatchSize(batchSize, limit, 0) , ref , fields, readPref,
277+
OutMessage query = OutMessage.query( this , options , numToSkip , chooseBatchSize(batchSize, limit, 0) , ref , fields, readPref,
302278
encoder);
303279

304280
Response res = _connector.call( _db , this , query , null , 2, readPref, decoder );
@@ -334,17 +310,7 @@ public WriteResult update( DBObject query , DBObject o , boolean upsert , boolea
334310
trace( "update: " + _fullNameSpace + " " + JSON.serialize( query ) + " " + JSON.serialize( o ) );
335311
}
336312

337-
OutMessage om = new OutMessage( _mongo , 2001, encoder );
338-
om.writeInt( 0 ); // reserved
339-
om.writeCString( _fullNameSpace );
340-
341-
int flags = 0;
342-
if ( upsert ) flags |= 1;
343-
if ( multi ) flags |= 2;
344-
om.writeInt( flags );
345-
346-
om.putObject( query );
347-
om.putObject( o );
313+
OutMessage om = OutMessage.update(this, encoder, upsert, multi, query, o);
348314

349315
return _connector.say( _db , om , concern );
350316
}
@@ -446,12 +412,8 @@ private void _advance(){
446412
if ( _curResult.cursor() <= 0 )
447413
throw new RuntimeException( "can't advance a cursor <= 0" );
448414

449-
OutMessage m = new OutMessage( _mongo , 2005 );
450-
451-
m.writeInt( 0 );
452-
m.writeCString( _collection._fullNameSpace );
453-
m.writeInt( chooseBatchSize(_batchSize, _limit, _numFetched) );
454-
m.writeLong( _curResult.cursor() );
415+
OutMessage m = OutMessage.getMore(_collection, _curResult.cursor(),
416+
chooseBatchSize(_batchSize, _limit, _numFetched));
455417

456418
Response res = _connector.call( DBApiLayer.this , _collection , m , _host, _decoder );
457419
_numGetMores++;

src/main/com/mongodb/DBPort.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,8 @@ Response call( OutMessage msg , DBCollection coll ) throws IOException{
7070
return go( msg, coll );
7171
}
7272

73-
Response call( OutMessage msg , DBCollection coll , DBDecoder decoder) throws IOException{
74-
return go( msg, coll, false, null, decoder);
75-
}
76-
77-
Response call( OutMessage msg , DBCollection coll , ReadPreference readPref , DBDecoder decoder) throws IOException{
78-
return go( msg, coll, false, readPref, decoder);
73+
Response call(OutMessage msg, DBCollection coll, DBDecoder decoder) throws IOException{
74+
return go( msg, coll, false, decoder);
7975
}
8076

8177
void say( OutMessage msg )
@@ -85,14 +81,14 @@ void say( OutMessage msg )
8581

8682
private synchronized Response go( OutMessage msg , DBCollection coll )
8783
throws IOException {
88-
return go( msg , coll , false, null, null );
84+
return go( msg , coll , false, null );
8985
}
9086

9187
private synchronized Response go( OutMessage msg , DBCollection coll , DBDecoder decoder ) throws IOException{
92-
return go( msg, coll, false, null, decoder );
88+
return go( msg, coll, false, decoder );
9389
}
9490

95-
private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse , ReadPreference readPref, DBDecoder decoder)
91+
private synchronized Response go(OutMessage msg, DBCollection coll, boolean forceResponse, DBDecoder decoder)
9692
throws IOException {
9793

9894
if ( _processingResponse ){
@@ -114,13 +110,14 @@ private synchronized Response go( OutMessage msg , DBCollection coll , boolean f
114110
throw new IllegalStateException( "_out shouldn't be null" );
115111

116112
try {
113+
_activeOutMessage = msg;
117114
msg.prepare();
118115
msg.pipe( _out );
119116

120117
if ( _pool != null )
121118
_pool._everWorked = true;
122119

123-
if ( coll == null && ! forceReponse )
120+
if ( coll == null && ! forceResponse )
124121
return null;
125122

126123
_processingResponse = true;
@@ -131,6 +128,7 @@ private synchronized Response go( OutMessage msg , DBCollection coll , boolean f
131128
throw ioe;
132129
}
133130
finally {
131+
_activeOutMessage = null;
134132
_processingResponse = false;
135133
}
136134
}
@@ -141,17 +139,11 @@ synchronized CommandResult getLastError( DB db , WriteConcern concern ) throws I
141139
}
142140

143141
synchronized private Response findOne( DB db , String coll , DBObject q ) throws IOException {
144-
OutMessage msg = OutMessage.query( db._mongo , 0 , db.getName() + "." + coll , 0 , -1 , q , null );
142+
OutMessage msg = OutMessage.query( db.getCollection(coll) , 0 , 0 , -1 , q , null );
145143
Response res = go( msg , db.getCollection( coll ) , null );
146144
return res;
147145
}
148146

149-
synchronized private Response findOne( String ns , DBObject q ) throws IOException{
150-
OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null );
151-
Response res = go( msg , null , true, null, null );
152-
return res;
153-
}
154-
155147
synchronized CommandResult runCommand( DB db , DBObject cmd ) throws IOException {
156148
Response res = findOne( db , "$cmd" , cmd );
157149
return convertToCommandResult(cmd, res);
@@ -273,6 +265,10 @@ protected void finalize() throws Throwable{
273265
close();
274266
}
275267

268+
OutMessage getOutMessageBeingProcessed() {
269+
return _activeOutMessage;
270+
}
271+
276272
/**
277273
* closes the underlying connection and streams
278274
*/
@@ -338,6 +334,7 @@ public DBPortPool getPool() {
338334
private Map<DB,Boolean> _authed = new ConcurrentHashMap<DB, Boolean>( );
339335
int _lastThread;
340336
long _calls = 0;
337+
private volatile OutMessage _activeOutMessage;
341338

342339
private static Logger _rootLogger = Logger.getLogger( "com.mongodb.port" );
343340
}

src/main/com/mongodb/DBPortPool.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,27 +31,44 @@
3131
import java.util.concurrent.atomic.AtomicInteger;
3232
import java.util.logging.Level;
3333

34-
public class DBPortPool extends SimplePool<DBPort> {
34+
public class DBPortPool extends SimplePool<DBPort> implements MongoConnectionPoolMXBean {
35+
36+
public MongoConnection[] getInUseConnections() {
37+
List<MongoConnection> connectionList = new ArrayList<MongoConnection>();
38+
synchronized (_avail) {
39+
for (DBPort port : _all) {
40+
if (!_avail.contains(port)) {
41+
OutMessage curOutMessage = port.getOutMessageBeingProcessed();
42+
if (curOutMessage != null) {
43+
connectionList.add(new MongoConnection(curOutMessage.getNamespace(),
44+
curOutMessage.getOpCode(),
45+
curOutMessage.getQuery() != null ? curOutMessage.getQuery().toString() : null));
46+
}
47+
}
48+
}
49+
}
50+
return connectionList.toArray(new MongoConnection[connectionList.size()]);
51+
}
3552

3653
static class Holder {
37-
54+
3855
Holder( MongoOptions options ){
3956
_options = options;
4057
}
4158

4259
DBPortPool get( ServerAddress addr ){
43-
60+
4461
DBPortPool p = _pools.get( addr );
45-
46-
if (p != null)
62+
63+
if (p != null)
4764
return p;
48-
65+
4966
synchronized (_pools) {
5067
p = _pools.get( addr );
5168
if (p != null) {
5269
return p;
5370
}
54-
71+
5572
p = new DBPortPool( addr , _options );
5673
_pools.put( addr , p);
5774

@@ -105,15 +122,15 @@ private String createObjectName( ServerAddress addr ) {
105122
}
106123

107124
// ----
108-
125+
109126
public static class NoMoreConnection extends MongoInternalException {
110127
private static final long serialVersionUID = -4415279469780082174L;
111-
128+
112129
NoMoreConnection( String msg ){
113130
super( msg );
114131
}
115132
}
116-
133+
117134
public static class SemaphoresOut extends NoMoreConnection {
118135
private static final long serialVersionUID = -4415279469780082174L;
119136
SemaphoresOut(){
@@ -129,7 +146,7 @@ public static class ConnectionWaitTimeOut extends NoMoreConnection {
129146
}
130147

131148
// ----
132-
149+
133150
DBPortPool( ServerAddress addr , MongoOptions options ){
134151
super( "DBPortPool-" + addr.toString() + ", options = " + options.toString() , options.connectionsPerHost , options.connectionsPerHost );
135152
_options = options;
@@ -154,10 +171,10 @@ protected int pick( int iThink , boolean couldCreate ){
154171
return -1;
155172
return iThink;
156173
}
157-
174+
158175
/**
159176
* @return
160-
* @throws MongoException
177+
* @throws MongoException
161178
*/
162179
public DBPort get() {
163180
DBPort port = null;
@@ -173,20 +190,20 @@ public DBPort get() {
173190

174191
if ( port == null )
175192
throw new ConnectionWaitTimeOut( _options.maxWaitTime );
176-
193+
177194
port._lastThread = System.identityHashCode(Thread.currentThread());
178195
return port;
179196
}
180197

181198
// return true if the exception is recoverable
182199
boolean gotError( Exception e ){
183-
if ( e instanceof java.nio.channels.ClosedByInterruptException ||
200+
if ( e instanceof java.nio.channels.ClosedByInterruptException ||
184201
e instanceof InterruptedException ){
185202
// this is probably a request that is taking too long
186203
// so usually doesn't mean there is a real db problem
187204
return true;
188205
}
189-
206+
190207
if ( e instanceof java.net.SocketTimeoutException ){
191208
// we don't want to clear the port pool for a connection timing out
192209
return true;
@@ -202,7 +219,7 @@ boolean gotError( Exception e ){
202219
break;
203220
all.add( temp );
204221
}
205-
222+
206223
for ( DBPort p : all ){
207224
p.close();
208225
done(p);
@@ -222,7 +239,7 @@ public void cleanup( DBPort p ){
222239
public boolean ok( DBPort t ){
223240
return _addr.getSocketAddress().equals( t._addr );
224241
}
225-
242+
226243
protected DBPort createNew(){
227244
return new DBPort( _addr , this , _options );
228245
}

src/main/com/mongodb/DBTCPConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public Response call( DB db, DBCollection coll, OutMessage m, ServerAddress host
267267
boolean retry = false;
268268
try {
269269
port.checkAuth( db );
270-
res = port.call( m , coll, readPref, decoder );
270+
res = port.call( m , coll, decoder );
271271
if ( res._responseTo != m.getId() )
272272
throw new MongoException( "ids don't match" );
273273
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright (c) 2008 - 2011 10gen, Inc. <http://10gen.com>
3+
* <p/>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p/>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p/>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb;
17+
18+
public class MongoConnection {
19+
20+
public MongoConnection(final String namespace, final OutMessage.OpCode opCode, final String query) {
21+
this.namespace = namespace;
22+
this.opCode = opCode;
23+
this.query = query;
24+
}
25+
26+
public String getNamespace() {
27+
return namespace;
28+
}
29+
30+
public OutMessage.OpCode getOpCode() {
31+
return opCode;
32+
}
33+
34+
public String getQuery() {
35+
return query;
36+
}
37+
38+
private final String namespace;
39+
private final OutMessage.OpCode opCode;
40+
private final String query;
41+
42+
}

0 commit comments

Comments
 (0)