From 475824a0093883979eee716eec67276b712ed8e9 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 4 Apr 2025 15:48:54 -0700 Subject: [PATCH 1/4] interface and initial implementation --- crt/aws-c-s3 | 2 +- .../amazon/awssdk/crt/s3/ResumeToken.java | 25 +++ .../amazon/awssdk/crt/s3/S3MetaRequest.java | 19 ++ src/native/java_class_ids.c | 3 + src/native/java_class_ids.h | 1 + src/native/s3_client.c | 170 ++++++++++++++---- .../amazon/awssdk/crt/test/S3ClientTest.java | 14 +- 7 files changed, 198 insertions(+), 36 deletions(-) diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index 169842b7e..2ca5e0da2 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit 169842b7e2f81d71d0719d4a77f9c3e186512f99 +Subproject commit 2ca5e0da29ca46b5639baf02255c13c90a53fa4b diff --git a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java index 376e01298..6f0f2ce66 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java @@ -11,6 +11,7 @@ static public class PutResumeTokenBuilder { private long totalNumParts; private long numPartsCompleted; private String uploadId; + private String objectLastModified; /** * Default constructor @@ -53,6 +54,15 @@ public PutResumeTokenBuilder withUploadId(String uploadId) { return this; } + /** + * @param objectLastModified objectLastModified time from the server + * @return this resume token object + */ + public PutResumeTokenBuilder withObjectLastModified(String objectLastModified) { + this.objectLastModified = objectLastModified; + return this; + } + public ResumeToken build() { return new ResumeToken(this); } @@ -63,6 +73,7 @@ public ResumeToken build() { private long totalNumParts; private long numPartsCompleted; private String uploadId; + private String objectLastModified; public ResumeToken(PutResumeTokenBuilder builder) { this.nativeType = S3MetaRequestOptions.MetaRequestType.PUT_OBJECT.getNativeValue(); @@ -70,6 +81,7 @@ public ResumeToken(PutResumeTokenBuilder builder) { this.totalNumParts = builder.totalNumParts; this.numPartsCompleted = builder.numPartsCompleted; this.uploadId = builder.uploadId; + this.objectLastModified = builder.objectLastModified; } /** * Default constructor @@ -122,4 +134,17 @@ public String getUploadId() { return uploadId; } + /****** + * Download Specific fields. + ******/ + /** + * @return Object last modified time + */ + public String getObjectLastModifiedString() { + if (getType() != S3MetaRequestOptions.MetaRequestType.GET_OBJECT) { + throw new IllegalArgumentException("ResumeToken - Object last modified time is only defined for Get Object Resume tokens"); + } + + return objectLastModified; + } } diff --git a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java index 7824bac78..596685ce2 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/S3MetaRequest.java @@ -6,6 +6,7 @@ import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; public class S3MetaRequest extends CrtResource { @@ -75,6 +76,20 @@ public ResumeToken pause() { return s3MetaRequestPause(getNativeHandle()); } + public CompletableFuture pauseAsync() { + CompletableFuture future = new CompletableFuture<>(); + if (isNull()) { + throw new IllegalStateException("S3MetaRequest has been closed."); + } + try { + s3MetaRequestPauseAsync(getNativeHandle(), future); + } catch (Exception e) { + future.completeExceptionally(e); + } + + return future; + } + /** * Increment the flow-control window, so that response data continues downloading. *

@@ -114,5 +129,9 @@ public void incrementReadWindow(long bytes) { private static native ResumeToken s3MetaRequestPause(long s3MetaRequest); + private static native void s3MetaRequestPauseAsync( + long s3MetaRequest, + CompletableFuture future) throws CrtRuntimeException; + private static native void s3MetaRequestIncrementReadWindow(long s3MetaRequest, long bytes); } diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index ffa0041f0..621cdf526 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -988,6 +988,9 @@ static void s_cache_s3_meta_request_resume_token(JNIEnv *env) { s3_meta_request_resume_token_properties.upload_id_field_id = (*env)->GetFieldID(env, cls, "uploadId", "Ljava/lang/String;"); AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.upload_id_field_id); + s3_meta_request_resume_token_properties.object_last_modified_field_id = + (*env)->GetFieldID(env, cls, "objectLastModified", "Ljava/lang/String;"); + AWS_FATAL_ASSERT(s3_meta_request_resume_token_properties.object_last_modified_field_id); } struct java_aws_mqtt5_connack_packet_properties mqtt5_connack_packet_properties; diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 304c86fe1..d93db1f5e 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -455,6 +455,7 @@ struct java_aws_s3_meta_request_resume_token { jfieldID total_num_parts_field_id; jfieldID num_parts_completed_field_id; jfieldID upload_id_field_id; + jfieldID object_last_modified_field_id; }; extern struct java_aws_s3_meta_request_resume_token s3_meta_request_resume_token_properties; diff --git a/src/native/s3_client.c b/src/native/s3_client.c index 7754387b3..1b96a5e8a 100644 --- a/src/native/s3_client.c +++ b/src/native/s3_client.c @@ -1210,28 +1210,7 @@ JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRe aws_s3_meta_request_cancel(meta_request); } -JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause( - JNIEnv *env, - jclass jni_class, - jlong jni_s3_meta_request) { - - (void)jni_class; - aws_cache_jni_ids(env); - - struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; - if (!meta_request) { - aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); - aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request"); - return NULL; - } - - struct aws_s3_meta_request_resume_token *resume_token = NULL; - - if (aws_s3_meta_request_pause(meta_request, &resume_token)) { - aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request"); - return NULL; - } - +static jobject s_java_resume_token_from_native(JNIEnv *env, struct aws_s3_meta_request_resume_token *resume_token) { jobject resume_token_jni = NULL; if (resume_token != NULL) { resume_token_jni = (*env)->NewObject( @@ -1240,15 +1219,10 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met s3_meta_request_resume_token_properties.s3_meta_request_resume_token_constructor_method_id); if ((*env)->ExceptionCheck(env) || resume_token_jni == NULL) { aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to create ResumeToken."); - goto on_done; + goto done; } enum aws_s3_meta_request_type type = aws_s3_meta_request_resume_token_type(resume_token); - if (type != AWS_S3_META_REQUEST_TYPE_PUT_OBJECT) { - aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to convert resume token."); - goto on_done; - } - (*env)->SetIntField(env, resume_token_jni, s3_meta_request_resume_token_properties.native_type_field_id, type); (*env)->SetLongField( env, @@ -1267,17 +1241,147 @@ JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3Met aws_s3_meta_request_resume_token_num_parts_completed(resume_token)); struct aws_byte_cursor upload_id_cur = aws_s3_meta_request_resume_token_upload_id(resume_token); - jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur); - (*env)->SetObjectField( - env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni); + if (upload_id_cur.len > 0) { + jstring upload_id_jni = aws_jni_string_from_cursor(env, &upload_id_cur); + (*env)->SetObjectField( + env, resume_token_jni, s3_meta_request_resume_token_properties.upload_id_field_id, upload_id_jni); + + (*env)->DeleteLocalRef(env, upload_id_jni); + } + struct aws_byte_cursor object_last_modified_cur = aws_s3_meta_request_resume_object_last_modified(resume_token); + if (object_last_modified_cur.len > 0) { + jstring object_last_modified = aws_jni_string_from_cursor(env, &object_last_modified_cur); + (*env)->SetObjectField( + env, + resume_token_jni, + s3_meta_request_resume_token_properties.object_last_modified_field_id, + object_last_modified); + + (*env)->DeleteLocalRef(env, object_last_modified); + } + printf("Resume token: %p\n", (void *)resume_token); + printf("Java resume token: %p\n", (void *)resume_token_jni); + printf("Resume token type: %d\n", type); + printf( + "Resume token part size: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_part_size(resume_token)); + printf( + "Resume token total num parts: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_total_num_parts(resume_token)); + printf( + "Resume token num parts completed: %llu\n", + (unsigned long long)aws_s3_meta_request_resume_token_num_parts_completed(resume_token)); + } +done: + return resume_token_jni; +} + +JNIEXPORT jobject JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPause( + JNIEnv *env, + jclass jni_class, + jlong jni_s3_meta_request) { + + (void)jni_class; + aws_cache_jni_ids(env); - (*env)->DeleteLocalRef(env, upload_id_jni); + struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; + if (!meta_request) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + aws_jni_throw_illegal_argument_exception(env, "S3MetaRequest.s3MetaRequestPause: Invalid/null meta request"); + return NULL; + } + + struct aws_s3_meta_request_resume_token *resume_token = NULL; + + if (aws_s3_meta_request_pause(meta_request, &resume_token)) { + aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPause: Failed to pause request"); + return NULL; } -on_done: + jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token); aws_s3_meta_request_resume_token_release(resume_token); return resume_token_jni; } +struct s_pause_callback_data { + JavaVM *jvm; + jobject java_pause_future; +}; + +static void s_s3_meta_request_pause_complete( + struct aws_s3_meta_request *meta_request, + struct aws_s3_meta_request_resume_token *resume_token, + void *user_data) { + (void)meta_request; + struct s_pause_callback_data *callback_data = (struct s_pause_callback_data *)user_data; + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(callback_data->jvm); + JavaVM *jvm = callback_data->jvm; + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + return; + } + + jobject resume_token_jni = s_java_resume_token_from_native(env, resume_token); + if (resume_token_jni == NULL) { + jobject exception; + if (!aws_jni_get_and_clear_exception(env, &exception)) { + /* No exception raised from Java, create our own */ + exception = aws_jni_new_crt_exception_from_error_code(env, aws_last_error()); + } + + (*env)->CallBooleanMethod( + env, + callback_data->java_pause_future, + completable_future_properties.complete_exceptionally_method_id, + exception); + + goto done; + } + + (*env)->CallBooleanMethod( + env, callback_data->java_pause_future, completable_future_properties.complete_method_id, resume_token_jni); + (*env)->DeleteLocalRef(env, resume_token_jni); + +done: + (*env)->DeleteGlobalRef(env, callback_data->java_pause_future); + aws_mem_release(aws_jni_get_allocator(), callback_data); + aws_jni_release_thread_env(jvm, env); + /********** JNI ENV RELEASE **********/ +} + +JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestPauseAsync( + JNIEnv *env, + jclass jni_class, + jlong jni_s3_meta_request, + jobject java_pause_future) { + + (void)jni_class; + aws_cache_jni_ids(env); + + struct aws_s3_meta_request *meta_request = (struct aws_s3_meta_request *)jni_s3_meta_request; + if (!meta_request) { + aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + aws_jni_throw_illegal_argument_exception( + env, "S3MetaRequest.s3MetaRequestPauseAsync: Invalid/null meta request"); + return; + } + + struct aws_allocator *allocator = aws_jni_get_allocator(); + struct s_pause_callback_data *callback_data = aws_mem_calloc(allocator, 1, sizeof(struct s_pause_callback_data)); + jint jvmresult = (*env)->GetJavaVM(env, &callback_data->jvm); + AWS_FATAL_ASSERT(jvmresult == 0); + + callback_data->java_pause_future = (*env)->NewGlobalRef(env, java_pause_future); + AWS_FATAL_ASSERT(callback_data->java_pause_future != NULL); + + if (aws_s3_meta_request_pause_async(meta_request, s_s3_meta_request_pause_complete, callback_data)) { + printf("Failed to pause async, last error: %d\n", aws_last_error()); + (*env)->DeleteGlobalRef(env, callback_data->java_pause_future); + aws_mem_release(allocator, callback_data); + aws_jni_throw_runtime_exception(env, "S3MetaRequest.s3MetaRequestPauseAsync: Failed to pause request"); + return; + } +} JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_s3_S3MetaRequest_s3MetaRequestIncrementReadWindow( JNIEnv *env, diff --git a/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java b/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java index da80c5c75..6156fde9a 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java +++ b/src/test/java/software/amazon/awssdk/crt/test/S3ClientTest.java @@ -337,14 +337,24 @@ public void onFinished(S3FinishedResponseContext context) { }; HttpHeader[] headers = { new HttpHeader("Host", ENDPOINT) }; - HttpRequest httpRequest = new HttpRequest("GET", PRE_EXIST_1MB_PATH, headers, null); + HttpRequest httpRequest = new HttpRequest("GET", "/pre-existing-2GB", headers, null); S3MetaRequestOptions metaRequestOptions = new S3MetaRequestOptions() .withMetaRequestType(MetaRequestType.GET_OBJECT).withHttpRequest(httpRequest) .withResponseHandler(responseHandler); try (S3MetaRequest metaRequest = client.makeMetaRequest(metaRequestOptions)) { - Assert.assertEquals(Integer.valueOf(0), onFinishedFuture.get()); + Thread.sleep(3000); // sleep 1 sec to pause + CompletableFuture pauseFuture = metaRequest.pauseAsync(); + Assert.assertNotNull(pauseFuture); + ResumeToken token = pauseFuture.get(); + // ResumeToken token = metaRequest.pause(); + Assert.assertNotNull(token); + System.err.println("Resume token getObjectLastModifiedString from Java: " + token.getObjectLastModifiedString()); + System.err.println("Resume token part size from Java: " + token.getPartSize()); + System.err.println("Resume token getNumPartsCompleted from Java: " + token.getNumPartsCompleted()); + System.err.println("Resume token getTotalNumParts from Java: " + token.getTotalNumParts()); + // Assert.assertEquals(Integer.valueOf(14352), onFinishedFuture.get()); } } catch (InterruptedException | ExecutionException ex) { Assert.fail(ex.getMessage()); From 411e50d6ad61382716294897ac3e5914bbbeb105 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 4 Apr 2025 15:49:28 -0700 Subject: [PATCH 2/4] return code --- crt/aws-c-s3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index 2ca5e0da2..30862dfb6 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit 2ca5e0da29ca46b5639baf02255c13c90a53fa4b +Subproject commit 30862dfb6a1bd44289d3f76195dcac3f0bdc719e From 74ca8c7f0417d83dfc885e385d700cccb878dd59 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 4 Apr 2025 16:09:35 -0700 Subject: [PATCH 3/4] leak --- crt/aws-c-s3 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crt/aws-c-s3 b/crt/aws-c-s3 index 30862dfb6..f20a0634e 160000 --- a/crt/aws-c-s3 +++ b/crt/aws-c-s3 @@ -1 +1 @@ -Subproject commit 30862dfb6a1bd44289d3f76195dcac3f0bdc719e +Subproject commit f20a0634e65c907a7a024f7107d22dac63841235 From c4731b8407c8b7e719a8497b45b1d95b0be57c82 Mon Sep 17 00:00:00 2001 From: Dengke Tang Date: Fri, 4 Apr 2025 16:13:01 -0700 Subject: [PATCH 4/4] oh, don't add the constructor --- .../software/amazon/awssdk/crt/s3/ResumeToken.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java index 6f0f2ce66..5bd654828 100644 --- a/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java +++ b/src/main/java/software/amazon/awssdk/crt/s3/ResumeToken.java @@ -11,7 +11,6 @@ static public class PutResumeTokenBuilder { private long totalNumParts; private long numPartsCompleted; private String uploadId; - private String objectLastModified; /** * Default constructor @@ -54,15 +53,6 @@ public PutResumeTokenBuilder withUploadId(String uploadId) { return this; } - /** - * @param objectLastModified objectLastModified time from the server - * @return this resume token object - */ - public PutResumeTokenBuilder withObjectLastModified(String objectLastModified) { - this.objectLastModified = objectLastModified; - return this; - } - public ResumeToken build() { return new ResumeToken(this); } @@ -81,7 +71,6 @@ public ResumeToken(PutResumeTokenBuilder builder) { this.totalNumParts = builder.totalNumParts; this.numPartsCompleted = builder.numPartsCompleted; this.uploadId = builder.uploadId; - this.objectLastModified = builder.objectLastModified; } /** * Default constructor