1- // OutMessage.java
2-
31/**
42 * Copyright (C) 2008 10gen Inc.
53 *
2826import java .util .Collection ;
2927import java .util .concurrent .atomic .AtomicInteger ;
3028
31- import java .io .IOException ;
32- import java .io .OutputStream ;
33- import java .util .concurrent .atomic .AtomicInteger ;
34-
3529class OutMessage extends BasicBSONEncoder {
3630
3731 enum OpCode {
@@ -57,58 +51,26 @@ public int getValue() {
5751
5852 public static OutMessage insert (final DBCollection collection , final DBEncoder encoder , WriteConcern concern ) {
5953 OutMessage om = new OutMessage (collection , OpCode .OP_INSERT , encoder );
60-
61- int flags = 0 ;
62- if (concern .getContinueOnErrorForInsert ()) {
63- flags |= 1 ;
64- }
65- om .writeInt ( flags );
66- om .writeCString ( collection .getFullName () );
54+ om .writeInsertPrologue (concern );
6755
6856 return om ;
6957 }
7058
7159 public static OutMessage update (final DBCollection collection , final DBEncoder encoder ,
7260 final boolean upsert , final boolean multi , final DBObject query , final DBObject o ) {
73- OutMessage om = new OutMessage (collection , OpCode .OP_UPDATE , encoder );
74-
75- om .writeInt ( 0 ); // reserved
76- om .writeCString (collection .getFullName ());
77-
78- int flags = 0 ;
79- if ( upsert ) flags |= 1 ;
80- if ( multi ) flags |= 2 ;
81- om .writeInt ( flags );
82-
83- om .putObject ( query );
84- om .putObject ( o );
85-
86- om ._query = query ;
61+ OutMessage om = new OutMessage (collection , OpCode .OP_UPDATE , encoder , query );
62+ om .writeUpdate (upsert , multi , query , o );
8763
8864 return om ;
8965 }
9066
9167 public static OutMessage remove (final DBCollection collection , final DBEncoder encoder , final DBObject query ) {
92- OutMessage om = new OutMessage (collection , OpCode .OP_DELETE , encoder );
93-
94- om .writeInt ( 0 ); // reserved
95- om .writeCString ( collection .getFullName () );
96-
97- Collection <String > keys = query .keySet ();
98-
99- if ( keys .size () == 1 && keys .iterator ().next ().equals ( "_id" ) && query .get ( keys .iterator ().next () ) instanceof ObjectId )
100- om .writeInt ( 1 );
101- else
102- om .writeInt ( 0 );
103-
104- om .putObject ( query );
105-
106- om ._query = query ;
68+ OutMessage om = new OutMessage (collection , OpCode .OP_DELETE , encoder , query );
69+ om .writeRemove ();
10770
10871 return om ;
10972 }
11073
111-
11274 static OutMessage query ( DBCollection collection , int options , int numToSkip , int batchSize , DBObject query , DBObject fields ){
11375 return query ( collection , options , numToSkip , batchSize , query , fields , ReadPreference .primary () );
11476 }
@@ -118,95 +80,140 @@ static OutMessage query( DBCollection collection , int options , int numToSkip ,
11880 }
11981
12082 static OutMessage query ( DBCollection collection , int options , int numToSkip , int batchSize , DBObject query , DBObject fields , ReadPreference readPref , DBEncoder enc ){
121- OutMessage out = new OutMessage (collection , OpCode .OP_QUERY , enc );
83+ OutMessage om = new OutMessage (collection , enc , query , options , readPref );
84+ om .writeQuery (fields , numToSkip , batchSize );
12285
123- out ._appendQuery (options , collection , numToSkip , batchSize , query , fields , readPref );
124-
125- return out ;
86+ return om ;
12687 }
12788
12889 static OutMessage getMore (DBCollection collection , long cursorId , int batchSize ) {
12990 OutMessage om = new OutMessage (collection , OpCode .OP_GETMORE );
130-
131- om .writeInt (0 );
132- om .writeCString (collection .getFullName ());
133- om .writeInt (batchSize );
134- om .writeLong (cursorId );
91+ om .writeGetMore (cursorId , batchSize );
13592
13693 return om ;
13794 }
13895
13996 static OutMessage killCursors (Mongo mongo , int numCursors ) {
14097 OutMessage om = new OutMessage (mongo , OpCode .OP_KILL_CURSORS );
141-
142- om .writeInt (0 ); // reserved
143- om .writeInt (numCursors );
144-
98+ om .writeKillCursorsPrologue (numCursors );
14599
146100 return om ;
147101 }
148102
149- OutMessage (final DBCollection collection , final OpCode opQuery , final DBEncoder enc ) {
150- this (collection .getDB ().getMongo (), opQuery , enc );
151- this ._collection = collection ;
103+ private OutMessage ( Mongo m , OpCode opCode ){
104+ this (null , m , opCode , null );
152105 }
153106
154- OutMessage (final DBCollection collection , final OpCode opQuery ) {
155- this (collection .getDB ().getMongo (), opQuery );
156- this ._collection = collection ;
107+ private OutMessage (final DBCollection collection , final OpCode opCode ) {
108+ this (collection , opCode , null );
157109 }
158110
111+ private OutMessage (final DBCollection collection , final OpCode opCode , final DBEncoder enc ) {
112+ this (collection , collection .getDB ().getMongo (), opCode , enc );
113+ }
159114
160- OutMessage ( Mongo m , OpCode opCode ){
161- this ( m , DefaultDBEncoder .FACTORY .create () );
162- reset (opCode );
115+ private OutMessage (final DBCollection collection , final Mongo m , final OpCode opCode , final DBEncoder enc ) {
116+ this (collection , m , opCode , enc , null , -1 , null );
163117 }
164118
165- OutMessage ( Mongo m , OpCode opCode , DBEncoder enc ) {
166- this ( m , enc );
167- reset ( opCode );
119+ private OutMessage (final DBCollection collection , final OpCode opCode , final DBEncoder enc , final DBObject query ) {
120+ this (collection , collection .getDB ().getMongo (), opCode , enc , query , 0 , null );
168121 }
169122
170- private OutMessage ( Mongo m , DBEncoder encoder ) {
171- _encoder = encoder ;
123+ private OutMessage (final DBCollection collection , final DBEncoder enc , final DBObject query , final int options , final ReadPreference readPref ) {
124+ this (collection , collection .getDB ().getMongo (), OpCode .OP_QUERY , enc , query , options , readPref );
125+ }
126+
127+ private OutMessage (final DBCollection collection , final Mongo m , OpCode opCode , final DBEncoder enc , final DBObject query , final int options , final ReadPreference readPref ) {
128+ _collection = collection ;
172129 _mongo = m ;
173- _buffer = _mongo == null ? new PoolOutputBuffer () : _mongo ._bufferPool .get ();
130+ _encoder = enc ;
131+
132+ _buffer = _mongo ._bufferPool .get ();
174133 _buffer .reset ();
134+ set (_buffer );
175135
176- set ( _buffer );
177- }
136+ _id = REQUEST_ID .getAndIncrement ();
137+ _opCode = opCode ;
138+
139+ writeMessagePrologue (opCode );
178140
179- private void _appendQuery ( int options , DBCollection collection , int numToSkip , int batchSize , DBObject query , DBObject fields , ReadPreference readPref ){
180- _queryOptions = options ;
181- _readPref = readPref ;
141+ if (query == null ) {
142+ _query = null ;
143+ _queryOptions = 0 ;
144+ } else {
145+ _query = query ;
182146
183- //If the readPrefs are non-null and non-primary, set slaveOk query option
184- if (_readPref != null && _readPref .isSlaveOk ()) {
185- _queryOptions |= Bytes .QUERYOPTION_SLAVEOK ;
147+ int allOptions = options ;
148+ if (readPref != null && readPref .isSlaveOk ()) {
149+ allOptions |= Bytes .QUERYOPTION_SLAVEOK ;
150+ }
151+
152+ _queryOptions = allOptions ;
186153 }
154+ }
155+
156+ private void writeInsertPrologue (final WriteConcern concern ) {
157+ int flags = 0 ;
158+ if (concern .getContinueOnErrorForInsert ()) {
159+ flags |= 1 ;
160+ }
161+ writeInt (flags );
162+ writeCString (_collection .getFullName ());
163+ }
187164
188- writeInt ( _queryOptions );
189- writeCString ( collection .getFullName () );
165+ private void writeUpdate (final boolean upsert , final boolean multi , final DBObject query , final DBObject o ) {
166+ writeInt (0 ); // reserved
167+ writeCString (_collection .getFullName ());
190168
191- writeInt ( numToSkip );
192- writeInt ( batchSize );
169+ int flags = 0 ;
170+ if ( upsert ) flags |= 1 ;
171+ if ( multi ) flags |= 2 ;
172+ writeInt (flags );
173+
174+ putObject (query );
175+ putObject (o );
176+ }
193177
194- putObject ( query );
195- if ( fields != null )
196- putObject ( fields );
178+ private void writeRemove () {
179+ writeInt ( 0 ); // reserved
180+ writeCString ( _collection . getFullName () );
197181
198- this . _query = query ;
182+ Collection < String > keys = _query . keySet () ;
199183
184+ if ( keys .size () == 1 && keys .iterator ().next ().equals ( "_id" ) && _query .get ( keys .iterator ().next () ) instanceof ObjectId )
185+ writeInt ( 1 );
186+ else
187+ writeInt ( 0 );
188+
189+ putObject (_query );
200190 }
201191
202- private void reset ( OpCode opCode ){
203- done ();
204- _buffer .reset ();
205- set ( _buffer );
192+ private void writeGetMore (final long cursorId , final int batchSize ) {
193+ writeInt (0 );
194+ writeCString (_collection .getFullName ());
195+ writeInt (batchSize );
196+ writeLong (cursorId );
197+ }
206198
207- _id = REQUEST_ID .getAndIncrement ();
208- _opCode = opCode ;
199+ private void writeKillCursorsPrologue (final int numCursors ) {
200+ writeInt (0 ); // reserved
201+ writeInt (numCursors );
202+ }
203+
204+ private void writeQuery (final DBObject fields , final int numToSkip , final int batchSize ) {
205+ writeInt (_queryOptions );
206+ writeCString (_collection .getFullName ());
207+
208+ writeInt (numToSkip );
209+ writeInt (batchSize );
209210
211+ putObject (_query );
212+ if (fields != null )
213+ putObject (fields );
214+ }
215+
216+ private void writeMessagePrologue (final OpCode opCode ) {
210217 writeInt ( 0 ); // length: will set this later
211218 writeInt ( _id );
212219 writeInt ( 0 ); // response to
@@ -217,28 +224,17 @@ void prepare(){
217224 _buffer .writeInt ( 0 , _buffer .size () );
218225 }
219226
220-
221- void pipe ( OutputStream out )
222- throws IOException {
227+ void pipe ( OutputStream out ) throws IOException {
223228 _buffer .pipe ( out );
224229 }
225230
226231 int size (){
227232 return _buffer .size ();
228233 }
229234
230- byte [] toByteArray (){
231- return _buffer .toByteArray ();
232- }
233-
234235 void doneWithMessage (){
235- if ( _buffer != null && _mongo != null ) {
236- _buffer .reset ();
237- _mongo ._bufferPool .done ( _buffer );
238- }
239-
240- _buffer = null ;
241- _mongo = null ;
236+ _buffer .reset ();
237+ _mongo ._bufferPool .done (_buffer );
242238 }
243239
244240 boolean hasOption ( int option ){
@@ -264,30 +260,19 @@ String getNamespace() {
264260 @ Override
265261 public int putObject (BSONObject o ) {
266262 // check max size
267- int sz = _encoder .writeObject (_buf , o );
268- if (_mongo != null ) {
269- int maxsize = _mongo .getConnector ().getMaxBsonObjectSize ();
270- maxsize = Math .max (maxsize , Bytes .MAX_OBJECT_SIZE );
271- if (sz > maxsize ) {
272- throw new MongoInternalException ("DBObject of size " + sz + " is over Max BSON size " + _mongo .getMaxBsonObjectSize ());
273- }
263+ int objectSize = _encoder .writeObject (_buf , o );
264+ if (objectSize > Math .max (_mongo .getConnector ().getMaxBsonObjectSize (), Bytes .MAX_OBJECT_SIZE )) {
265+ throw new MongoInternalException ("DBObject of size " + objectSize + " is over Max BSON size " + _mongo .getMaxBsonObjectSize ());
274266 }
275- return sz ;
267+ return objectSize ;
276268 }
277269
278-
279- public ReadPreference getReadPreference (){
280- return _readPref ;
281- }
282-
283- private Mongo _mongo ;
284- private PoolOutputBuffer _buffer ;
285- private int _id ;
286- private OpCode _opCode ;
287- private DBCollection _collection ;
288- private int _queryOptions = 0 ;
289- private DBObject _query ;
290- private ReadPreference _readPref = ReadPreference .primary ();
291- private DBEncoder _encoder ;
292-
270+ private final Mongo _mongo ;
271+ private final DBCollection _collection ;
272+ private final PoolOutputBuffer _buffer ;
273+ private final int _id ;
274+ private final OpCode _opCode ;
275+ private final int _queryOptions ;
276+ private final DBObject _query ;
277+ private final DBEncoder _encoder ;
293278}
0 commit comments