Skip to content

Commit ade11ca

Browse files
committed
JAVA-616: To fix issue with the Semaphore overflowing, made SimplePool class a lot... simpler.
1. Removed support for unlimited pool size. Now there is a single maximum size of the pool, representing the total number of potentially available and in-use members 2. Semaphore now has one permit for each pool member that are potentially available (either in available list or could be created). 3. Synchronized on this. Only enter synchronization block in get method if a permit has been acquired 4. Keeping track of list of available members (_avail) and set of checked out members (_out). Get method removes from _avail or creates new member, and adds to _out. Done method removes from _out and adds to _avail. 5. Removed debug and track leaks support 6. Updated JMX monitoring interface to reflect the new structure.
1 parent fba7ac1 commit ade11ca

File tree

12 files changed

+388
-372
lines changed

12 files changed

+388
-372
lines changed

src/main/com/mongodb/DBPortPool.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.mongodb;
2020

21+
import com.mongodb.util.ConnectionPoolStatisticsBean;
2122
import com.mongodb.util.SimplePool;
2223
import com.mongodb.util.management.JMException;
2324
import com.mongodb.util.management.MBeanServerFactory;
@@ -36,18 +37,6 @@
3637
*/
3738
public class DBPortPool extends SimplePool<DBPort> implements MongoConnectionPoolMXBean {
3839

39-
public InUseConnectionInfo[] getInUseConnections() {
40-
List<InUseConnectionInfo> inUseConnectionInfoList = new ArrayList<InUseConnectionInfo>();
41-
synchronized (_avail) {
42-
for (DBPort port : _all) {
43-
if (!_avail.contains(port)) {
44-
inUseConnectionInfoList.add(new InUseConnectionInfo(port));
45-
}
46-
}
47-
}
48-
return inUseConnectionInfoList.toArray(new InUseConnectionInfo[inUseConnectionInfoList.size()]);
49-
}
50-
5140
@Override
5241
public String getHost() {
5342
return _addr.getHost();
@@ -58,6 +47,21 @@ public int getPort() {
5847
return _addr.getPort();
5948
}
6049

50+
@Override
51+
public synchronized ConnectionPoolStatisticsBean getStatistics() {
52+
return new ConnectionPoolStatisticsBean(getTotal(), getInUse(), getInUseConnections());
53+
}
54+
55+
private InUseConnectionBean[] getInUseConnections() {
56+
List<InUseConnectionBean> inUseConnectionInfoList = new ArrayList<InUseConnectionBean>();
57+
long currentNanoTime = System.nanoTime();
58+
for (DBPort port : _out) {
59+
inUseConnectionInfoList.add(new InUseConnectionBean(port, currentNanoTime));
60+
}
61+
return inUseConnectionInfoList.toArray(new InUseConnectionBean[inUseConnectionInfoList.size()]);
62+
}
63+
64+
6165
static class Holder {
6266

6367
Holder( MongoOptions options ){
@@ -156,7 +160,7 @@ public static class ConnectionWaitTimeOut extends NoMoreConnection {
156160
// ----
157161

158162
DBPortPool( ServerAddress addr , MongoOptions options ){
159-
super( "DBPortPool-" + addr.toString() + ", options = " + options.toString() , options.connectionsPerHost , options.connectionsPerHost );
163+
super( "DBPortPool-" + addr.toString() + ", options = " + options.toString() , options.connectionsPerHost );
160164
_options = options;
161165
_addr = addr;
162166
_waitingSem = new Semaphore( _options.connectionsPerHost * _options.threadsAllowedToBlockForConnectionMultiplier );
@@ -166,24 +170,22 @@ protected long memSize( DBPort p ){
166170
return 0;
167171
}
168172

169-
protected int pick( int iThink , boolean couldCreate ){
170-
final int id = System.identityHashCode(Thread.currentThread());
171-
final int s = _availSafe.size();
172-
for ( int i=0; i<s; i++ ){
173-
DBPort p = _availSafe.get(i);
174-
if ( p._lastThread == id )
173+
@Override
174+
protected int pick( int recommended, boolean couldCreate ){
175+
int id = System.identityHashCode(Thread.currentThread());
176+
for (int i = _avail.size() - 1; i >= 0; i--){
177+
if ( _avail.get(i)._lastThread == id )
175178
return i;
176179
}
177180

178-
if ( couldCreate )
179-
return -1;
180-
return iThink;
181+
return couldCreate ? -1 : recommended;
181182
}
182183

183184
/**
184185
* @return
185186
* @throws MongoException
186187
*/
188+
@Override
187189
public DBPort get() {
188190
DBPort port = null;
189191
if ( ! _waitingSem.tryAcquire() )
@@ -199,7 +201,7 @@ public DBPort get() {
199201
if ( port == null )
200202
throw new ConnectionWaitTimeOut( _options.maxWaitTime );
201203

202-
port._lastThread = System.identityHashCode(Thread.currentThread());
204+
port._lastThread = System.identityHashCode(Thread.currentThread());
203205
return port;
204206
}
205207

@@ -236,18 +238,12 @@ boolean gotError( Exception e ){
236238
return false;
237239
}
238240

239-
void close(){
240-
clear();
241-
}
242-
241+
@Override
243242
public void cleanup( DBPort p ){
244243
p.close();
245244
}
246245

247-
public boolean ok( DBPort t ){
248-
return _addr.getSocketAddress().equals( t._addr );
249-
}
250-
246+
@Override
251247
protected DBPort createNew(){
252248
return new DBPort( _addr , this , _options );
253249
}

src/main/com/mongodb/InUseConnectionInfo.java renamed to src/main/com/mongodb/InUseConnectionBean.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
*/
1616
package com.mongodb;
1717

18+
import java.util.concurrent.TimeUnit;
19+
1820
/**
1921
* This class is NOT part of the public API. Be prepared for non-binary compatible changes in minor releases.
2022
*/
21-
public class InUseConnectionInfo {
23+
public class InUseConnectionBean {
2224

23-
InUseConnectionInfo(final DBPort port) {
25+
InUseConnectionBean(final DBPort port, long currentNanoTime) {
2426
DBPort.ActiveState activeState = port.getActiveState();
2527
if (activeState == null) {
2628
durationMS = 0;
@@ -31,7 +33,7 @@ public class InUseConnectionInfo {
3133
numDocuments = 0;
3234
}
3335
else {
34-
durationMS = (System.nanoTime() - activeState.startTime) / 1000000;
36+
durationMS = TimeUnit.NANOSECONDS.toMillis(currentNanoTime - activeState.startTime);
3537
namespace = activeState.outMessage.getNamespace();
3638
opCode = activeState.outMessage.getOpCode();
3739
query = activeState.outMessage.getQuery() != null ? activeState.outMessage.getQuery().toString() : null;

src/main/com/mongodb/MongoConnectionPoolMXBean.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,45 @@
1616

1717
package com.mongodb;
1818

19-
import com.mongodb.util.SimplePoolMXBean;
19+
import com.mongodb.util.ConnectionPoolStatisticsBean;
2020

2121
/**
2222
* This interface is NOT part of the public API. Be prepared for non-binary compatible changes in minor releases.
2323
*/
24-
public interface MongoConnectionPoolMXBean extends SimplePoolMXBean {
25-
public InUseConnectionInfo[] getInUseConnections();
26-
public String getHost();
27-
public int getPort();
24+
public interface MongoConnectionPoolMXBean {
25+
/**
26+
* Gets the name of the pool.
27+
*
28+
* @return the name of the pool
29+
*/
30+
String getName();
31+
32+
/**
33+
* Gets the maximum allowed size of the pool, including idle and in-use members.
34+
*
35+
* @return the maximum size
36+
*/
37+
int getMaxSize();
38+
39+
40+
/**
41+
* Gets the host that this connection pool is connecting to.
42+
*
43+
* @return the host
44+
*/
45+
String getHost();
46+
47+
/**
48+
* Gets the port that this connection pool is connecting to.
49+
*
50+
* @return the port
51+
*/
52+
int getPort();
53+
54+
/**
55+
* Gets the statistics for this connection pool.
56+
*
57+
* @return the connection pool statistics
58+
*/
59+
ConnectionPoolStatisticsBean getStatistics();
2860
}

src/main/com/mongodb/gridfs/GridFSInputFile.java

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,19 @@
1818

1919
package com.mongodb.gridfs;
2020

21+
import com.mongodb.BasicDBObjectBuilder;
22+
import com.mongodb.DBObject;
23+
import com.mongodb.MongoException;
24+
import com.mongodb.util.Util;
25+
import org.bson.types.ObjectId;
26+
2127
import java.io.IOException;
2228
import java.io.InputStream;
2329
import java.io.OutputStream;
2430
import java.security.MessageDigest;
31+
import java.security.NoSuchAlgorithmException;
2532
import java.util.Date;
2633

27-
import org.bson.types.ObjectId;
28-
29-
import com.mongodb.BasicDBObjectBuilder;
30-
import com.mongodb.DBObject;
31-
import com.mongodb.MongoException;
32-
import com.mongodb.util.SimplePool;
33-
import com.mongodb.util.Util;
34-
3534
/**
3635
* This class represents a GridFS file to be written to the database
3736
* Operations include:
@@ -64,7 +63,11 @@ protected GridFSInputFile( GridFS fs , InputStream in , String filename, boolean
6463
_id = new ObjectId();
6564
_chunkSize = GridFS.DEFAULT_CHUNKSIZE;
6665
_uploadDate = new Date();
67-
_messageDigester = _md5Pool.get();
66+
try {
67+
_messageDigester = MessageDigest.getInstance("MD5");
68+
} catch (NoSuchAlgorithmException e) {
69+
throw new RuntimeException("No MD5!");
70+
}
6871
_messageDigester.reset();
6972
_buffer = new byte[(int) _chunkSize];
7073
}
@@ -253,8 +256,6 @@ public OutputStream getOutputStream() {
253256
* Dumps a new chunk into the chunks collection. Depending on the flag, also
254257
* partial buffers (at the end) are going to be written immediately.
255258
*
256-
* @param data
257-
* Data for chunk.
258259
* @param writePartial
259260
* Write also partial buffers full.
260261
* @throws MongoException
@@ -319,7 +320,6 @@ private int _readStream2Buffer() throws IOException {
319320
private void _finishData() {
320321
if (!_savedChunks) {
321322
_md5 = Util.toHex( _messageDigester.digest() );
322-
_md5Pool.done( _messageDigester );
323323
_messageDigester = null;
324324
_length = _totalBytes;
325325
_savedChunks = true;
@@ -342,25 +342,6 @@ private void _finishData() {
342342
private MessageDigest _messageDigester = null;
343343
private OutputStream _outputStream = null;
344344

345-
/**
346-
* A pool of {@link java.security.MessageDigest} objects.
347-
*/
348-
static SimplePool<MessageDigest> _md5Pool
349-
= new SimplePool<MessageDigest>( "md5" , 10 , -1 , false , false ) {
350-
/**
351-
* {@inheritDoc}
352-
*
353-
* @see com.mongodb.util.SimplePool#createNew()
354-
*/
355-
protected MessageDigest createNew() {
356-
try {
357-
return MessageDigest.getInstance( "MD5" );
358-
} catch ( java.security.NoSuchAlgorithmException e ) {
359-
throw new RuntimeException( "your system doesn't have md5!" );
360-
}
361-
}
362-
};
363-
364345
/**
365346
* An output stream implementation that can be used to successively write to
366347
* a GridFS file.

src/main/com/mongodb/tools/ConnectionPoolStat.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ private static void printUsage() {
151151
System.out.println(" objectName - name of the JMX bean for this connection pool");
152152
System.out.println(" host - host of the mongod/mongos server");
153153
System.out.println(" port - port of the mongod/mongos server");
154-
System.out.println(" size - max # of connections allowed");
154+
System.out.println(" maxSize - max # of connections allowed");
155155
System.out.println(" total - # of connections allocated");
156156
System.out.println(" inUse - # of connections in use");
157157
System.out.println(" inUseConnections - list of all in use connections");
@@ -175,22 +175,31 @@ private void print(PrintWriter pw) throws JMException, IOException {
175175
pw.print(" ");
176176
printAttribute("Host", objectName, pw);
177177
printAttribute("Port", objectName, pw);
178-
printAttribute("Size", objectName, pw);
179-
printAttribute("Total", objectName, pw);
180-
printAttribute("EverCreated", objectName, pw);
181-
printAttribute("InUse", objectName, pw);
182-
pw.println();
183-
printInUseConnections(objectName, pw);
178+
printAttribute("MaxSize", objectName, pw);
179+
printStatistics(pw, objectName);
184180
pw.println(" }" + (i == beanSet.size() - 1 ? "" : ","));
185181
i++;
186182
}
187183
pw.println(" ]");
188184
pw.println("}");
189185
}
190186

191-
private void printInUseConnections(final ObjectName objectName, final PrintWriter pw) throws InstanceNotFoundException, IOException, ReflectionException, AttributeNotFoundException, MBeanException {
192-
String key = "InUseConnections";
193-
CompositeData[] compositeDataArray = (CompositeData[]) mBeanConnection.getAttribute(objectName, key);
187+
private void printStatistics(final PrintWriter pw, final ObjectName objectName) throws InstanceNotFoundException, IOException, ReflectionException, AttributeNotFoundException, MBeanException {
188+
String key = "Statistics";
189+
CompositeData statistics = (CompositeData) mBeanConnection.getAttribute(objectName, key);
190+
printSimpleStatistics(pw, statistics);
191+
printInUseConnections(statistics, pw);
192+
}
193+
194+
private void printSimpleStatistics(final PrintWriter pw, final CompositeData statistics) throws InstanceNotFoundException, IOException, ReflectionException, AttributeNotFoundException, MBeanException {
195+
printCompositeDataAttribute("total", statistics, pw);
196+
printCompositeDataAttribute("inUse", statistics, pw);
197+
pw.println();
198+
}
199+
200+
private void printInUseConnections(final CompositeData statistics, final PrintWriter pw) throws InstanceNotFoundException, IOException, ReflectionException, AttributeNotFoundException, MBeanException {
201+
String key = "inUseConnections";
202+
CompositeData[] compositeDataArray = (CompositeData[]) statistics.get(key);
194203
pw.println(" " + getKeyString(key) + ": [");
195204
for (int i = 0; i < compositeDataArray.length; i++) {
196205
CompositeData compositeData = compositeDataArray[i];
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Copyright (c) 2008 - 2012 10gen, Inc. <http://10gen.com>
3+
* <p/>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p/>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p/>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.util;
17+
18+
import com.mongodb.InUseConnectionBean;
19+
20+
/**
21+
* A bean representing connection pool statistics.
22+
*/
23+
public class ConnectionPoolStatisticsBean {
24+
private final int total;
25+
private final int inUse;
26+
private final InUseConnectionBean[] inUseConnections;
27+
28+
public ConnectionPoolStatisticsBean(final int total, final int inUse, final InUseConnectionBean[] inUseConnections) {
29+
//To change body of created methods use File | Settings | File Templates.
30+
this.total = total;
31+
this.inUse = inUse;
32+
this.inUseConnections = inUseConnections;
33+
}
34+
35+
/**
36+
* Gets the total number of pool members, including idle and and in-use members.
37+
*
38+
* @return total number of members
39+
*/
40+
public int getTotal() {
41+
return total;
42+
}
43+
44+
/**
45+
* Gets the number of pool members that are currently in use.
46+
*
47+
* @return number of in-use members
48+
*/
49+
public int getInUse() {
50+
return inUse;
51+
}
52+
53+
/**
54+
* Gets an array of beans describing all the connections that are currently in use.
55+
*
56+
* @return array of in-use connection beans
57+
*/
58+
public InUseConnectionBean[] getInUseConnections() {
59+
return inUseConnections;
60+
}
61+
}

0 commit comments

Comments
 (0)