diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java index 5d136e3c2b..6aa0b05f98 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java @@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; @@ -68,10 +67,6 @@ long getCommittedSize() { "File " + getRelativePath() + " size is unknown."); } - void flush() throws IOException { - // no-op - } - ByteString read(CheckedFunction resolver, long offset, long length, boolean readCommitted) throws IOException { if (readCommitted && offset + length > getCommittedSize()) { @@ -186,8 +181,7 @@ CompletableFuture submitCreate( + close + ") @" + id + ":" + index; final CheckedSupplier task = LogUtils.newCheckedSupplier(LOG, () -> { if (out == null) { - out = new FileStore.FileStoreDataChannel(new RandomAccessFile(resolver.apply(getRelativePath()).toFile(), - "rw")); + out = new FileStore.FileStoreDataChannel(resolver.apply(getRelativePath())); } return write(0L, data, close, sync); }, name); diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java index e910a99e3e..8896462a49 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java @@ -259,7 +259,7 @@ public CompletableFuture createDataChannel(String p) { return CompletableFuture.supplyAsync(() -> { try { final Path full = resolve(normalize(p)); - return new FileStoreDataChannel(new RandomAccessFile(full.toFile(), "rw")); + return new FileStoreDataChannel(full); } catch (IOException e) { throw new CompletionException("Failed to create " + p, e); } @@ -267,14 +267,17 @@ public CompletableFuture createDataChannel(String p) { } static class FileStoreDataChannel implements StateMachine.DataChannel { + private final Path path; private final RandomAccessFile randomAccessFile; - FileStoreDataChannel(RandomAccessFile file) { - randomAccessFile = file; + FileStoreDataChannel(Path path) throws FileNotFoundException { + this.path = path; + this.randomAccessFile = new RandomAccessFile(path.toFile(), "rw"); } @Override public void force(boolean metadata) throws IOException { + LOG.debug("force({}) at {}", metadata, path); randomAccessFile.getChannel().force(metadata); } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java index 509b1441cb..d6e68511bd 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java @@ -153,7 +153,7 @@ public long write(String path, long offset, boolean close, ByteBuffer buffer, bo return WriteReplyProto.parseFrom(reply).getLength(); } - public DataStreamOutput getStreamOutput(String path, int dataSize) { + public DataStreamOutput getStreamOutput(String path, long dataSize) { final StreamWriteRequestProto header = StreamWriteRequestProto.newBuilder() .setPath(ProtoUtils.toByteString(path)) .setLength(dataSize) diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java index 8dab01aecb..a5b4a7213e 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java @@ -24,8 +24,8 @@ import org.apache.ratis.examples.filestore.FileStoreClient; import org.apache.ratis.protocol.DataStreamReply; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; -import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator; import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import java.io.File; @@ -34,35 +34,74 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; /** * Subcommand to generate load in filestore data stream state machine. */ @Parameters(commandDescription = "Load Generator for FileStore DataStream") public class DataStream extends Client { + enum Type { + DirectByteBuffer(DirectByteBufferType::new), + MappedByteBuffer(MappedByteBufferType::new), + NettyFileRegion(NettyFileRegionType::new); - @Parameter(names = {"--type"}, description = "DirectByteBuffer, MappedByteBuffer, NettyFileRegion", required = true) - private String dataStreamType = "NettyFileRegion"; + private final BiFunction constructor; + + Type(BiFunction constructor) { + this.constructor = constructor; + } + + BiFunction getConstructor() { + return constructor; + } + + static Type valueOfIgnoreCase(String s) { + for (Type type : values()) { + if (type.name().equalsIgnoreCase(s)) { + return type; + } + } + return null; + } + } + + // To be used as a Java annotation attribute value + private static final String DESCRIPTION = "[DirectByteBuffer, MappedByteBuffer, NettyFileRegion]"; + + { + // Assert if the description is correct. + final String expected = Arrays.asList(Type.values()).toString(); + Preconditions.assertTrue(expected.equals(DESCRIPTION), + () -> "Unexpected description: " + DESCRIPTION + " does not equal to the expected string " + expected); + } + + @Parameter(names = {"--type"}, description = DESCRIPTION, required = true) + private String dataStreamType = Type.NettyFileRegion.name(); @Parameter(names = {"--syncSize"}, description = "Sync every syncSize, syncSize % bufferSize should be zero," + "-1 means on sync", required = true) private int syncSize = -1; + int getSyncSize() { + return syncSize; + } + private boolean checkParam() { if (syncSize != -1 && syncSize % getBufferSizeInBytes() != 0) { System.err.println("Error: syncSize % bufferSize should be zero"); return false; } - if (!dataStreamType.equals("DirectByteBuffer") && - !dataStreamType.equals("MappedByteBuffer") && - !dataStreamType.equals("NettyFileRegion")) { - System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo"); + if (Type.valueOfIgnoreCase(dataStreamType) == null) { + System.err.println("Error: dataStreamType should be one of " + DESCRIPTION); return false; } @@ -101,18 +140,11 @@ private Map>> streamWrite( final long fileLength = file.length(); Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is " + getFileSizeInBytes() + " but actual size is " + fileLength); - FileInputStream fis = new FileInputStream(file); - final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length()); - - if (dataStreamType.equals("DirectByteBuffer")) { - fileMap.put(path, writeByDirectByteBuffer(dataStreamOutput, fis.getChannel())); - } else if (dataStreamType.equals("MappedByteBuffer")) { - fileMap.put(path, writeByMappedByteBuffer(dataStreamOutput, fis.getChannel())); - } else if (dataStreamType.equals("NettyFileRegion")) { - fileMap.put(path, writeByNettyFileRegion(dataStreamOutput, file)); - } - dataStreamOutput.closeAsync(); + final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType)) + .orElseThrow(IllegalStateException::new); + final TransferType writer = type.getConstructor().apply(path, this); + fileMap.put(path, writer.transfer(fileStoreClient)); } return fileMap; } @@ -134,46 +166,126 @@ private long waitStreamFinish(Map> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput, - FileChannel fileChannel) throws IOException { - final int fileSize = getFileSizeInBytes(); - final int bufferSize = getBufferSizeInBytes(); - if (fileSize <= 0) { - return Collections.emptyList(); + abstract static class TransferType { + private final String path; + private final File file; + private final long fileSize; + private final int bufferSize; + private final long syncSize; + private long syncPosition = 0; + + TransferType(String path, DataStream cli) { + this.path = path; + this.file = new File(path); + this.fileSize = cli.getFileSizeInBytes(); + this.bufferSize = cli.getBufferSizeInBytes(); + this.syncSize = cli.getSyncSize(); + + final long actualSize = file.length(); + Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is " + + fileSize + " but actual size is " + actualSize + ", path=" + path); + } + + File getFile() { + return file; + } + + int getBufferSize() { + return bufferSize; + } + + long getPacketSize(long offset) { + return Math.min(bufferSize, fileSize - offset); } - List> futures = new ArrayList<>(); - final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT; - for(long offset = 0L; offset < fileSize;) { - final ByteBuf buf = alloc.directBuffer(bufferSize); - final int bytesRead = buf.writeBytes(fileChannel, bufferSize); + boolean isSync(long position) { + if (syncSize > 0) { + if (position >= fileSize || position - syncPosition >= syncSize) { + syncPosition = position; + return true; + } + } + return false; + } + + List> transfer(FileStoreClient client) throws IOException { + if (fileSize <= 0) { + return Collections.emptyList(); + } + + final List> futures = new ArrayList<>(); + final DataStreamOutput out = client.getStreamOutput(path, fileSize); + try (FileInputStream fis = new FileInputStream(file)) { + final FileChannel in = fis.getChannel(); + for (long offset = 0L; offset < fileSize; ) { + offset += write(in, out, offset, futures); + } + } catch (Throwable e) { + throw new IOException("Failed to transfer " + path); + } finally { + futures.add(out.closeAsync()); + } + return futures; + } + + abstract long write(FileChannel in, DataStreamOutput out, long offset, + List> futures) throws IOException; + + @Override + public String toString() { + return JavaUtils.getClassSimpleName(getClass()) + "{" + path + ", size=" + fileSize + "}"; + } + } + + static class DirectByteBufferType extends TransferType { + DirectByteBufferType(String path, DataStream cli) { + super(path, cli); + } + + @Override + long write(FileChannel in, DataStreamOutput out, long offset, List> futures) + throws IOException { + final int bufferSize = getBufferSize(); + final ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(bufferSize); + final int bytesRead = buf.writeBytes(in, bufferSize); if (bytesRead < 0) { - throw new IllegalStateException("Failed to read " + fileSize - + " byte(s). The channel has reached end-of-stream at " + offset); + throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this + + ". The channel has reached end-of-stream at " + offset); } else if (bytesRead > 0) { - offset += bytesRead; - final CompletableFuture f = dataStreamOutput.writeAsync(buf.nioBuffer(), - syncSize > 0 && (offset == fileSize || offset % syncSize == 0)); + final CompletableFuture f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead)); f.thenRun(buf::release); futures.add(f); } + return bytesRead; } - - return futures; } - private List> writeByMappedByteBuffer(DataStreamOutput dataStreamOutput, - FileChannel fileChannel) throws IOException { - List> futures = new ArrayList<>(); - MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, getFileSizeInBytes()); - futures.add(dataStreamOutput.writeAsync(mappedByteBuffer)); - return futures; + static class MappedByteBufferType extends TransferType { + MappedByteBufferType(String path, DataStream cli) { + super(path, cli); + } + + @Override + long write(FileChannel in, DataStreamOutput out, long offset, List> futures) + throws IOException { + final long packetSize = getPacketSize(offset); + final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize); + final int remaining = mappedByteBuffer.remaining(); + futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining))); + return remaining; + } } - private List> writeByNettyFileRegion( - DataStreamOutput dataStreamOutput, File file) { - List> futures = new ArrayList<>(); - futures.add(dataStreamOutput.writeAsync(file)); - return futures; + static class NettyFileRegionType extends TransferType { + NettyFileRegionType(String path, DataStream cli) { + super(path, cli); + } + + @Override + long write(FileChannel in, DataStreamOutput out, long offset, List> futures) { + final long packetSize = getPacketSize(offset); + futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize))); + return packetSize; + } } }