@@ -186,6 +186,103 @@ func TestBulkIndexer(t *testing.T) {
186186 }
187187 })
188188
189+ t .Run ("BulkIndexerConfig.QueueSizeMultiplier" , func (t * testing.T ) {
190+ tests := []struct {
191+ name string
192+ numWorkers int
193+ queueSizeMultiplier int
194+ expectedQueueCap int
195+ }{
196+ {
197+ name : "Default QueueSizeMultiplier with 1 worker" ,
198+ numWorkers : 1 ,
199+ queueSizeMultiplier : 0 ,
200+ expectedQueueCap : 1 ,
201+ },
202+ {
203+ name : "Default QueueSizeMultiplier with 2 workers" ,
204+ numWorkers : 2 ,
205+ queueSizeMultiplier : 0 ,
206+ expectedQueueCap : 2 ,
207+ },
208+ {
209+ name : "QueueSizeMultiplier=2 with 1 worker" ,
210+ numWorkers : 1 ,
211+ queueSizeMultiplier : 2 ,
212+ expectedQueueCap : 2 ,
213+ },
214+ {
215+ name : "QueueSizeMultiplier=5 with 1 worker" ,
216+ numWorkers : 1 ,
217+ queueSizeMultiplier : 5 ,
218+ expectedQueueCap : 5 ,
219+ },
220+ {
221+ name : "QueueSizeMultiplier=10 with 1 worker" ,
222+ numWorkers : 1 ,
223+ queueSizeMultiplier : 10 ,
224+ expectedQueueCap : 10 ,
225+ },
226+ {
227+ name : "QueueSizeMultiplier=1 with 4 workers" ,
228+ numWorkers : 4 ,
229+ queueSizeMultiplier : 1 ,
230+ expectedQueueCap : 4 ,
231+ },
232+ {
233+ name : "QueueSizeMultiplier=2 with 4 workers" ,
234+ numWorkers : 4 ,
235+ queueSizeMultiplier : 2 ,
236+ expectedQueueCap : 8 ,
237+ },
238+ {
239+ name : "QueueSizeMultiplier=5 with 3 workers" ,
240+ numWorkers : 3 ,
241+ queueSizeMultiplier : 5 ,
242+ expectedQueueCap : 15 ,
243+ },
244+ {
245+ name : "QueueSizeMultiplier=-1 with 1 worker" ,
246+ numWorkers : 1 ,
247+ queueSizeMultiplier : - 1 ,
248+ expectedQueueCap : 1 ,
249+ },
250+ }
251+
252+ for _ , tt := range tests {
253+ tt := tt
254+ t .Run (tt .name , func (t * testing.T ) {
255+ es , err := elasticsearch .NewClient (elasticsearch.Config {Transport : & mockTransport {}})
256+ if err != nil {
257+ t .Fatalf ("Unexpected error: %s" , err )
258+ }
259+
260+ cfg := BulkIndexerConfig {
261+ NumWorkers : tt .numWorkers ,
262+ QueueSizeMultiplier : tt .queueSizeMultiplier ,
263+ Client : es ,
264+ }
265+
266+ bi , err := NewBulkIndexer (cfg )
267+ if err != nil {
268+ t .Fatalf ("Unexpected error: %s" , err )
269+ }
270+
271+ bir , ok := bi .(* bulkIndexer )
272+ if ! ok {
273+ t .Fatalf ("Unexpected type: %T" , bi )
274+ }
275+
276+ if queueCap := cap (bir .queue ); queueCap != tt .expectedQueueCap {
277+ t .Errorf ("Unexpected queue capacity: want=%d, got=%d" , tt .expectedQueueCap , queueCap )
278+ }
279+
280+ // Clean up
281+ _ = bi .Close (context .Background ())
282+ })
283+ }
284+ })
285+
189286 t .Run ("Add() Timeout" , func (t * testing.T ) {
190287 es , _ := elasticsearch .NewClient (elasticsearch.Config {Transport : & mockTransport {}})
191288 bi , _ := NewBulkIndexer (BulkIndexerConfig {NumWorkers : 1 , Client : es })
0 commit comments