1717package com .mongodb .async .client ;
1818
1919
20+ import com .mongodb .MongoBulkWriteException ;
2021import com .mongodb .MongoException ;
2122import com .mongodb .ReadConcern ;
2223import com .mongodb .ReadConcernLevel ;
2324import com .mongodb .ReadPreference ;
2425import com .mongodb .WriteConcern ;
2526import com .mongodb .async .FutureResultCallback ;
27+ import com .mongodb .bulk .BulkWriteError ;
2628import com .mongodb .bulk .BulkWriteResult ;
2729import com .mongodb .bulk .BulkWriteUpsert ;
2830import com .mongodb .client .model .BulkWriteOptions ;
6163import java .lang .reflect .InvocationTargetException ;
6264import java .lang .reflect .Method ;
6365import java .util .ArrayList ;
66+ import java .util .Collections ;
6467import java .util .List ;
6568
6669import static java .lang .String .format ;
@@ -145,7 +148,8 @@ BsonDocument toResult(final UpdateResult updateResult) {
145148 return toResult (resultDoc );
146149 }
147150
148- BsonDocument toResult (final BulkWriteResult bulkWriteResult , final List <WriteModel <BsonDocument >> writeModels ) {
151+ BsonDocument toResult (final BulkWriteResult bulkWriteResult , final List <? extends WriteModel <BsonDocument >> writeModels ,
152+ final List <BulkWriteError > writeErrors ) {
149153
150154 BsonDocument resultDoc = new BsonDocument ();
151155 if (bulkWriteResult .wasAcknowledged ()) {
@@ -155,8 +159,7 @@ BsonDocument toResult(final BulkWriteResult bulkWriteResult, final List<WriteMod
155159 BsonDocument insertedIds = new BsonDocument ();
156160 for (int i = 0 ; i < writeModels .size (); i ++) {
157161 WriteModel <BsonDocument > cur = writeModels .get (i );
158- // TODO: Any need to suport InsertManyModel, and if so, how to represent it?
159- if (cur instanceof InsertOneModel ) {
162+ if (cur instanceof InsertOneModel && writeSuccessful (i , writeErrors )) {
160163 InsertOneModel <BsonDocument > insertOneModel = (InsertOneModel <BsonDocument >) cur ;
161164 insertedIds .put (Integer .toString (i ), insertOneModel .getDocument ().get ("_id" ));
162165 }
@@ -179,6 +182,15 @@ BsonDocument toResult(final BulkWriteResult bulkWriteResult, final List<WriteMod
179182 return toResult (resultDoc );
180183 }
181184
185+ private boolean writeSuccessful (final int index , final List <BulkWriteError > writeErrors ) {
186+ for (BulkWriteError cur : writeErrors ) {
187+ if (cur .getIndex () == index ) {
188+ return false ;
189+ }
190+ }
191+ return true ;
192+ }
193+
182194 BsonDocument toResult (@ Nullable final BsonValue results ) {
183195 return new BsonDocument ("result" , results != null ? results : BsonNull .VALUE );
184196 }
@@ -485,27 +497,40 @@ BsonDocument getInsertManyResult(final BsonDocument collectionOptions, final Bso
485497 documents .add (document .asDocument ());
486498 }
487499 FutureResultCallback <Void > futureResultCallback = new FutureResultCallback <Void >();
488- if (clientSession == null ) {
489- getCollection (collectionOptions )
490- .insertMany (documents ,
491- new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
492- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
493- futureResultCallback );
494- } else {
495- getCollection (collectionOptions )
496- .insertMany (clientSession ,
497- documents ,
498- new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
499- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
500- futureResultCallback );
501- }
502- futureResult (futureResultCallback );
503500
504- BsonDocument insertedIds = new BsonDocument ();
505- for (int i = 0 ; i < documents .size (); i ++) {
506- insertedIds .put (Integer .toString (i ), documents .get (i ).get ("_id" ));
501+ try {
502+ if (clientSession == null ) {
503+ getCollection (collectionOptions )
504+ .insertMany (documents ,
505+ new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
506+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
507+ futureResultCallback );
508+ } else {
509+ getCollection (collectionOptions )
510+ .insertMany (clientSession ,
511+ documents ,
512+ new InsertManyOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
513+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
514+ futureResultCallback );
515+ }
516+ futureResult (futureResultCallback );
517+
518+ BsonDocument insertedIds = new BsonDocument ();
519+ for (int i = 0 ; i < documents .size (); i ++) {
520+ insertedIds .put (Integer .toString (i ), documents .get (i ).get ("_id" ));
521+ }
522+ return toResult (new BsonDocument ("insertedIds" , insertedIds ));
523+ } catch (MongoBulkWriteException e ) {
524+ // Test results are expecting this to look just like bulkWrite error, so translate to InsertOneModel so the result
525+ // translation code can be reused.
526+ List <InsertOneModel <BsonDocument >> writeModels = new ArrayList <InsertOneModel <BsonDocument >>();
527+ for (BsonValue document : arguments .getArray ("documents" )) {
528+ writeModels .add (new InsertOneModel <BsonDocument >(document .asDocument ()));
529+ }
530+ BsonDocument result = toResult (e .getWriteResult (), writeModels , e .getWriteErrors ());
531+ result .put ("error" , BsonBoolean .TRUE );
532+ return result ;
507533 }
508- return toResult (new BsonDocument ("insertedIds" , insertedIds ));
509534 }
510535
511536 BsonDocument getReplaceOneResult (final BsonDocument collectionOptions , final BsonDocument arguments ,
@@ -603,33 +628,41 @@ BsonDocument getBulkWriteResult(final BsonDocument collectionOptions, final Bson
603628 requestArguments .getDocument ("update" ),
604629 getUpdateOptions (requestArguments )));
605630 } else if (name .equals ("deleteOne" )) {
606- writeModels .add (new DeleteOneModel <BsonDocument >(requestArguments .getDocument ("filter" )));
631+ writeModels .add (new DeleteOneModel <BsonDocument >(requestArguments .getDocument ("filter" ),
632+ getDeleteOptions (requestArguments )));
607633 } else if (name .equals ("deleteMany" )) {
608- writeModels .add (new DeleteManyModel <BsonDocument >(requestArguments .getDocument ("filter" )));
634+ writeModels .add (new DeleteManyModel <BsonDocument >(requestArguments .getDocument ("filter" ),
635+ getDeleteOptions (requestArguments )));
609636 } else if (name .equals ("replaceOne" )) {
610637 writeModels .add (new ReplaceOneModel <BsonDocument >(requestArguments .getDocument ("filter" ),
611- requestArguments .getDocument ("replacement" )));
638+ requestArguments .getDocument ("replacement" ), getReplaceOptions ( requestArguments ) ));
612639 } else {
613640 throw new UnsupportedOperationException (format ("Unsupported write request type: %s" , name ));
614641 }
615642 }
616643
617- FutureResultCallback <BulkWriteResult > futureResultCallback = new FutureResultCallback <BulkWriteResult >();
618- if (clientSession == null ) {
619- getCollection (collectionOptions ).withWriteConcern (writeConcern )
620- .bulkWrite (writeModels ,
621- new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
622- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
623- futureResultCallback );
624- } else {
625- getCollection (collectionOptions ).withWriteConcern (writeConcern )
626- .bulkWrite (clientSession ,
627- writeModels ,
628- new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
629- .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
630- futureResultCallback );
644+ try {
645+ FutureResultCallback <BulkWriteResult > futureResultCallback = new FutureResultCallback <BulkWriteResult >();
646+ if (clientSession == null ) {
647+ getCollection (collectionOptions ).withWriteConcern (writeConcern )
648+ .bulkWrite (writeModels ,
649+ new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
650+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
651+ futureResultCallback );
652+ } else {
653+ getCollection (collectionOptions ).withWriteConcern (writeConcern )
654+ .bulkWrite (clientSession ,
655+ writeModels ,
656+ new BulkWriteOptions ().ordered (arguments .getDocument ("options" , new BsonDocument ())
657+ .getBoolean ("ordered" , BsonBoolean .TRUE ).getValue ()),
658+ futureResultCallback );
659+ }
660+ return toResult (futureResult (futureResultCallback ), writeModels , Collections .<BulkWriteError >emptyList ());
661+ } catch (MongoBulkWriteException e ) {
662+ BsonDocument result = toResult (e .getWriteResult (), writeModels , e .getWriteErrors ());
663+ result .put ("error" , BsonBoolean .TRUE );
664+ return result ;
631665 }
632- return toResult (futureResult (futureResultCallback ), writeModels );
633666 }
634667
635668 Collation getCollation (final BsonDocument bsonCollation ) {
@@ -675,6 +708,28 @@ private UpdateOptions getUpdateOptions(final BsonDocument requestArguments) {
675708 if (requestArguments .containsKey ("arrayFilters" )) {
676709 options .arrayFilters (getArrayFilters (requestArguments .getArray ("arrayFilters" )));
677710 }
711+ if (requestArguments .containsKey ("collation" )) {
712+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
713+ }
714+ return options ;
715+ }
716+
717+ private DeleteOptions getDeleteOptions (final BsonDocument requestArguments ) {
718+ DeleteOptions options = new DeleteOptions ();
719+ if (requestArguments .containsKey ("collation" )) {
720+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
721+ }
722+ return options ;
723+ }
724+
725+ private ReplaceOptions getReplaceOptions (final BsonDocument requestArguments ) {
726+ ReplaceOptions options = new ReplaceOptions ();
727+ if (requestArguments .containsKey ("upsert" )) {
728+ options .upsert (true );
729+ }
730+ if (requestArguments .containsKey ("collation" )) {
731+ options .collation (getCollation (requestArguments .getDocument ("collation" )));
732+ }
678733 return options ;
679734 }
680735
0 commit comments