diff --git a/core/src/main/java/com/scalar/db/common/CoreError.java b/core/src/main/java/com/scalar/db/common/CoreError.java index d907f54375..d93f19d182 100644 --- a/core/src/main/java/com/scalar/db/common/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/CoreError.java @@ -1028,6 +1028,12 @@ public enum CoreError implements ScalarDbError { + "you may be able to adjust the settings to enable consistent reads. Please refer to the storage configuration for details. Storage: %s", "", ""), + OBJECT_STORAGE_BLOB_EXCEEDS_MAX_LENGTH_ALLOWED( + Category.USER_ERROR, + "0279", + "The size of a BLOB column value exceeds the maximum allowed size of %d bytes. Column: %s; Size: %d bytes", + "", + ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationChecker.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationChecker.java index 38322b88c4..30849ffb55 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationChecker.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationChecker.java @@ -23,12 +23,61 @@ import com.scalar.db.io.TimeColumn; import com.scalar.db.io.TimestampColumn; import com.scalar.db.io.TimestampTZColumn; +import java.nio.ByteBuffer; public class ObjectStorageOperationChecker extends OperationChecker { private static final char[] ILLEGAL_CHARACTERS_IN_PRIMARY_KEY = { ObjectStorageUtils.OBJECT_KEY_DELIMITER, ObjectStorageUtils.CONCATENATED_KEY_DELIMITER, }; + private static final ColumnVisitor COMMON_COLUMN_CHECKER = + new ColumnVisitor() { + @Override + public void visit(BooleanColumn column) {} + + @Override + public void visit(IntColumn column) {} + + @Override + public void visit(BigIntColumn column) {} + + @Override + public void visit(FloatColumn column) {} + + @Override + public void visit(DoubleColumn column) {} + + @Override + public void visit(TextColumn column) {} + + @Override + public void visit(BlobColumn column) { + ByteBuffer buffer = column.getBlobValue(); + if (buffer == null) { + return; + } + // Calculate the maximum allowed blob length after Base64 encoding. + long allowedLength = (long) Serializer.MAX_STRING_LENGTH_ALLOWED / 4 * 3; + if (buffer.remaining() > allowedLength) { + throw new IllegalArgumentException( + CoreError.OBJECT_STORAGE_BLOB_EXCEEDS_MAX_LENGTH_ALLOWED.buildMessage( + Serializer.MAX_STRING_LENGTH_ALLOWED, column.getName(), buffer.remaining())); + } + } + + @Override + public void visit(DateColumn column) {} + + @Override + public void visit(TimeColumn column) {} + + @Override + public void visit(TimestampColumn column) {} + + @Override + public void visit(TimestampTZColumn column) {} + }; + private static final ColumnVisitor PRIMARY_KEY_COLUMN_CHECKER = new ColumnVisitor() { @Override @@ -104,6 +153,7 @@ public void check(Scan scan) throws ExecutionException { @Override public void check(Put put) throws ExecutionException { super.check(put); + put.getColumns().values().forEach(column -> column.accept(COMMON_COLUMN_CHECKER)); checkPrimaryKey(put); } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/Serializer.java b/core/src/main/java/com/scalar/db/storage/objectstorage/Serializer.java index 93c41822d5..9e9cb346a9 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/Serializer.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/Serializer.java @@ -1,5 +1,6 @@ package com.scalar.db.storage.objectstorage; +import com.fasterxml.jackson.core.StreamReadConstraints; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -7,9 +8,14 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; public class Serializer { + public static final int MAX_STRING_LENGTH_ALLOWED = Integer.MAX_VALUE; private static final ObjectMapper mapper = new ObjectMapper(); static { + mapper + .getFactory() + .setStreamReadConstraints( + StreamReadConstraints.builder().maxStringLength(MAX_STRING_LENGTH_ALLOWED).build()); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false); mapper.registerModule(new JavaTimeModule()); diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationCheckerTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationCheckerTest.java index 7ab328e823..6bdbd2e599 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationCheckerTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/ObjectStorageOperationCheckerTest.java @@ -9,6 +9,8 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.openMocks; @@ -24,9 +26,14 @@ import com.scalar.db.common.TableMetadataManager; import com.scalar.db.config.DatabaseConfig; import com.scalar.db.exception.storage.ExecutionException; +import com.scalar.db.io.BlobColumn; +import com.scalar.db.io.Column; import com.scalar.db.io.DataType; import com.scalar.db.io.Key; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Map; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -807,6 +814,57 @@ public void check_ForMutationsWithDeleteWithCondition_ShouldBehaveProperly() .doesNotThrowAnyException(); } + @Test + public void check_PutGiven_WhenBlobColumnIsWithinLimit_ShouldNotThrowException() + throws ExecutionException { + // Arrange + when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1); + + byte[] blob = new byte[100]; + Put put = + Put.newBuilder() + .namespace(NAMESPACE_NAME) + .table(TABLE_NAME) + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 0)) + .blobValue(COL4, blob) + .build(); + + // Act Assert + assertThatCode(() -> operationChecker.check(put)).doesNotThrowAnyException(); + } + + @Test + public void check_PutGiven_WhenBlobColumnExceedsLimit_ShouldThrowIllegalArgumentException() + throws ExecutionException { + // Arrange + when(metadataManager.getTableMetadata(any())).thenReturn(TABLE_METADATA1); + + int allowedLength = Serializer.MAX_STRING_LENGTH_ALLOWED / 4 * 3; + ByteBuffer mockBuffer = mock(ByteBuffer.class); + when(mockBuffer.remaining()).thenReturn(allowedLength + 1); + + BlobColumn blobColumn = mock(BlobColumn.class); + when(blobColumn.getName()).thenReturn(COL4); + when(blobColumn.getBlobValue()).thenReturn(mockBuffer); + + Put put = + spy( + Put.newBuilder() + .namespace(NAMESPACE_NAME) + .table(TABLE_NAME) + .partitionKey(Key.ofInt(PKEY1, 0)) + .clusteringKey(Key.ofInt(CKEY1, 0)) + .build()); + Map> columns = new LinkedHashMap<>(); + columns.put(COL4, blobColumn); + when(put.getColumns()).thenReturn(columns); + + // Act Assert + assertThatThrownBy(() -> operationChecker.check(put)) + .isInstanceOf(IllegalArgumentException.class); + } + private Put buildPutWithCondition(MutationCondition condition) { return Put.newBuilder() .namespace(NAMESPACE_NAME) diff --git a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java index 0e6b21e47f..c166a002f1 100644 --- a/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/api/DistributedStorageIntegrationTestBase.java @@ -2396,7 +2396,7 @@ private void populateRecords() { puts.forEach(p -> assertThatCode(() -> storage.put(p)).doesNotThrowAnyException()); } - private Get prepareGet(int pKey, int cKey) { + protected Get prepareGet(int pKey, int cKey) { Key partitionKey = Key.ofInt(getColumnName1(), pKey); Key clusteringKey = Key.ofInt(getColumnName4(), cKey); return Get.newBuilder() @@ -2407,7 +2407,7 @@ private Get prepareGet(int pKey, int cKey) { .build(); } - private List preparePuts() { + protected List preparePuts() { List puts = new ArrayList<>(); IntStream.range(0, 5)