Skip to content

Commit c959433

Browse files
authored
Fix bug in FileAsyncRequestBody where inflight parts were negative (#6564)
* Fix bug in FileAsyncRequestBody where inflight parts were negative * cleanups * Update test * Alternative approach * Cleanup/rename * Fix spotbugs
1 parent e7032e8 commit c959433

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Amazon S3",
4+
"contributor": "",
5+
"description": "Fix bug in S3 Multipart uploads with FileAsyncRequestBody - ensure that concurrency is limited correctly by bufferSizeInBytes"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelper.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {
166166

167167
private final FileAsyncRequestBody fileAsyncRequestBody;
168168
private final SimplePublisher<AsyncRequestBody> simplePublisher;
169+
private final AtomicBoolean hasCompleted = new AtomicBoolean(false);
169170

170171
FileAsyncRequestBodyWrapper(FileAsyncRequestBody fileAsyncRequestBody,
171172
SimplePublisher<AsyncRequestBody> simplePublisher) {
@@ -175,16 +176,22 @@ private final class FileAsyncRequestBodyWrapper implements AsyncRequestBody {
175176

176177
@Override
177178
public void subscribe(Subscriber<? super ByteBuffer> s) {
178-
fileAsyncRequestBody.doAfterOnComplete(() -> startNextRequestBody(simplePublisher))
179+
fileAsyncRequestBody.doAfterOnComplete(this::startNextIfNeeded)
179180
// The reason we still need to call startNextRequestBody when the subscription is
180181
// cancelled is that upstream could cancel the subscription even though the stream has
181182
// finished successfully before onComplete. If this happens, doAfterOnComplete callback
182183
// will never be invoked, and if the current buffer is full, the publisher will stop
183184
// sending new FileAsyncRequestBody, leading to uncompleted future.
184-
.doAfterOnCancel(() -> startNextRequestBody(simplePublisher))
185+
.doAfterOnCancel(this::startNextIfNeeded)
185186
.subscribe(s);
186187
}
187188

189+
private void startNextIfNeeded() {
190+
if (hasCompleted.compareAndSet(false, true)) {
191+
startNextRequestBody(simplePublisher);
192+
}
193+
}
194+
188195
@Override
189196
public Optional<Long> contentLength() {
190197
return fileAsyncRequestBody.contentLength();

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodySplitHelperTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private static Runnable verifyConcurrentRequests(FileAsyncRequestBodySplitHelper
9292
if (concurrency > maxConcurrency.get()) {
9393
maxConcurrency.set(concurrency);
9494
}
95-
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueLessThan(10);
95+
assertThat(helper.numAsyncRequestBodiesInFlight()).hasValueBetween(0,10);
9696
};
9797
}
9898
}

0 commit comments

Comments
 (0)