From fc76b4ec6713854ac3a65b308c50ab4c52b240dc Mon Sep 17 00:00:00 2001 From: aeslampanah Date: Thu, 6 Apr 2023 13:43:37 -0400 Subject: [PATCH 1/2] Added ChronicleMap persistence as an alternative to SQLite --- code/pom.xml | 16 ++ .../disk/chronicle/ChronicleMapIterator.java | 34 +++ .../disk/chronicle/ChroniclePersistence.java | 205 ++++++++++++++++++ .../disk/chronicle/TimestampDelegate.java | 37 ++++ 4 files changed, 292 insertions(+) create mode 100644 code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChronicleMapIterator.java create mode 100644 code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java create mode 100644 code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/TimestampDelegate.java diff --git a/code/pom.xml b/code/pom.xml index ac4e7200a..0e23225cc 100644 --- a/code/pom.xml +++ b/code/pom.xml @@ -317,6 +317,22 @@ test + + net.openhft + chronicle-map + 3.24ea2 + + + io.protostuff + protostuff-core + 1.8.0 + + + io.protostuff + protostuff-runtime + 1.8.0 + + diff --git a/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChronicleMapIterator.java b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChronicleMapIterator.java new file mode 100644 index 000000000..4ec29cdeb --- /dev/null +++ b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChronicleMapIterator.java @@ -0,0 +1,34 @@ +package com.googlecode.cqengine.persistence.disk.chronicle; + +import com.googlecode.cqengine.index.support.CloseableIterator; +import net.openhft.chronicle.map.ChronicleMap; + +import java.util.Iterator; +import java.util.Map; + +public class ChronicleMapIterator> implements CloseableIterator { + private final Iterator> iterator; + + public ChronicleMapIterator(ChronicleMap map) { + iterator = map.entrySet().iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public O next() { + return iterator.next().getValue(); + } + + @Override + public void remove() { + iterator.remove(); + } + + @Override + public void close() { + } +} diff --git a/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java new file mode 100644 index 000000000..7bf87f746 --- /dev/null +++ b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java @@ -0,0 +1,205 @@ +package com.googlecode.cqengine.persistence.disk.chronicle; + +import com.googlecode.cqengine.attribute.SimpleAttribute; +import com.googlecode.cqengine.index.Index; +import com.googlecode.cqengine.index.support.CloseableIterator; +import com.googlecode.cqengine.persistence.Persistence; +import com.googlecode.cqengine.persistence.support.ObjectStore; +import com.googlecode.cqengine.query.option.QueryOptions; +import io.protostuff.*; +import io.protostuff.runtime.*; +import net.openhft.chronicle.map.ChronicleMap; +import net.openhft.chronicle.map.ChronicleMapBuilder; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; + +public class ChroniclePersistence> implements ObjectStore, Persistence { + + private final File dbFile; + + final SimpleAttribute primaryKeyAttribute; + + Schema objectSchema; + Schema indexSchema; + + int objectMaxSize; + int indexMaxSize; + + + private ChronicleMap chronicleMap; + + public ChroniclePersistence(SimpleAttribute primaryKeyAttribute, File dbFile, Class indexClass, Class objectClass, int indexMaxSize, int objectMaxSize) throws + IOException { + // Timestamps aren't correctly decoded without this delegate.. + TimestampDelegate timestampDelegate = new TimestampDelegate(); + io.protostuff.runtime.DefaultIdStrategy sessionIdStrategy = new DefaultIdStrategy(); + sessionIdStrategy.registerDelegate(timestampDelegate); + + this.indexMaxSize = indexMaxSize; + this.objectMaxSize = objectMaxSize; + + this.indexSchema = RuntimeSchema.getSchema(indexClass, sessionIdStrategy); + this.objectSchema = RuntimeSchema.getSchema(objectClass, sessionIdStrategy); + + this.primaryKeyAttribute = primaryKeyAttribute; + this.dbFile = dbFile; + + ChronicleMapBuilder mapBuilder = ChronicleMapBuilder.of(indexClass, objectClass) + .name(dbFile.getName()) + .averageKeySize(indexMaxSize) + .averageValueSize(objectMaxSize) + .entries(1000000); // Adjust the expected number of entries as needed + try { + chronicleMap = mapBuilder.createPersistedTo(dbFile); + } catch (Exception e) { + chronicleMap = mapBuilder.createOrRecoverPersistedTo(dbFile); + } + } + + + @Override + public ObjectStore createObjectStore() { + return this; + } + + @Override + public boolean supportsIndex(Index index) { + return true; + } + + @Override + public void openRequestScopeResources(QueryOptions queryOptions) { + // No resources need to be opened for this implementation + } + + @Override + public void closeRequestScopeResources(QueryOptions queryOptions) { + // No resources need to be closed for this implementation + } + + @Override + public SimpleAttribute getPrimaryKeyAttribute() { + return primaryKeyAttribute; + } + + @Override + public int size(QueryOptions queryOptions) { + return (int) chronicleMap.longSize(); + } + + @Override + public boolean contains(Object o, QueryOptions queryOptions) { + A key = primaryKeyAttribute.getValue((O) o, queryOptions); + return chronicleMap.containsKey(key); + } + + @Override + public boolean add(O o, QueryOptions queryOptions) { + A key = primaryKeyAttribute.getValue(o, queryOptions); + O existingValue = chronicleMap.putIfAbsent(key, o); + return existingValue == null; + } + + @Override + public boolean remove(Object o, QueryOptions queryOptions) { + A key = primaryKeyAttribute.getValue((O) o, queryOptions); + O removedValue = chronicleMap.remove(key); + return removedValue != null; + } + + @Override + public void clear(QueryOptions queryOptions) { + chronicleMap.clear(); + } + + @Override + public CloseableIterator iterator(QueryOptions queryOptions) { + return new ChronicleMapIterator<>(chronicleMap); + } + + @Override + public boolean isEmpty(QueryOptions queryOptions) { + return size(queryOptions) == 0; + } + + @Override + public boolean containsAll(Collection collection, QueryOptions queryOptions) { + for (Object o : collection) { + if (!contains(o, queryOptions)) { + return false; + } + } + return true; + } + + @Override + public boolean addAll(Collection collection, QueryOptions queryOptions) { + boolean modified = false; + for (O o : collection) { + if (add(o, queryOptions)) { + modified = true; + } + } + return modified; + } + + @Override + public boolean retainAll(Collection collection, QueryOptions queryOptions) { + boolean modified = false; + try (CloseableIterator iterator = iterator(queryOptions)) { + while (iterator.hasNext()) { + O o = iterator.next(); + if (!collection.contains(o)) { + iterator.remove(); + modified = true; + } + } + } + return modified; + } + + @Override + public boolean removeAll(Collection collection, QueryOptions queryOptions) { + boolean modified = false; + for (Object o : collection) { + if (remove(o, queryOptions)) { + modified = true; + } + } + return modified; + } + + + // Helper methods to convert keys and values to ByteBuffers + + private ByteBuffer serializeKey(A key) { + LinkedBuffer buffer = LinkedBuffer.allocate(indexMaxSize); + byte[] bytes = ProtostuffIOUtil.toByteArray(key, indexSchema, buffer); + return ByteBuffer.wrap(bytes); + } + + private ByteBuffer serializeValue(O value) { + LinkedBuffer buffer = LinkedBuffer.allocate(objectMaxSize); + byte[] bytes = ProtostuffIOUtil.toByteArray(value, objectSchema, buffer); + return ByteBuffer.wrap(bytes); + } + + private A deserializeKey(ByteBuffer keyBuffer) { + A key = indexSchema.newMessage(); + ProtostuffIOUtil.mergeFrom(keyBuffer.array(), key, indexSchema); + return key; + } + + private O deserializeValue(ByteBuffer valueBuffer) { + O value = objectSchema.newMessage(); + ProtostuffIOUtil.mergeFrom(valueBuffer.array(), value, objectSchema); + return value; + } + + public File getDbFile() { + return dbFile; + } +} diff --git a/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/TimestampDelegate.java b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/TimestampDelegate.java new file mode 100644 index 000000000..91a20eccd --- /dev/null +++ b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/TimestampDelegate.java @@ -0,0 +1,37 @@ +package com.googlecode.cqengine.persistence.disk.chronicle; + +import io.protostuff.*; +import io.protostuff.WireFormat.FieldType; +import io.protostuff.runtime.*; + +import java.io.IOException; +import java.sql.Timestamp; + +public class TimestampDelegate implements Delegate { + + @Override + public FieldType getFieldType() { + return FieldType.SFIXED64; + } + + @Override + public Class typeClass() { + return Timestamp.class; + } + + @Override + public Timestamp readFrom(Input input) throws IOException { + return new Timestamp(input.readSFixed64()); + } + + @Override + public void writeTo(Output output, int number, Timestamp value, boolean repeated) throws IOException { + output.writeFixed64(number, value.getTime(), repeated); + } + + @Override + public void transfer(Pipe pipe, Input input, Output output, int number, boolean repeated) throws + IOException { + output.writeFixed64(number, input.readSFixed64(), repeated); + } +} From dcc3c47c9ecd4c60904973e93a6548e604935f30 Mon Sep 17 00:00:00 2001 From: aeslampanah Date: Mon, 10 Apr 2023 10:21:21 -0400 Subject: [PATCH 2/2] Added some javadoc, and made maxEntries parameterized as part of the ChroniclePersistence constructor --- .../disk/chronicle/ChroniclePersistence.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java index 7bf87f746..7255a95d4 100644 --- a/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java +++ b/code/src/main/java/com/googlecode/cqengine/persistence/disk/chronicle/ChroniclePersistence.java @@ -31,7 +31,21 @@ public class ChroniclePersistence> implements ObjectS private ChronicleMap chronicleMap; - public ChroniclePersistence(SimpleAttribute primaryKeyAttribute, File dbFile, Class indexClass, Class objectClass, int indexMaxSize, int objectMaxSize) throws + + /** + * Create's a chronicle persistence object, this will save the state to disk with a filesize proportional to the settings + * + * Note that the maximums below presume the worst case, (eg if your actual object size average is lower than what is set then you will be able to store more entries than indicated) + * @param primaryKeyAttribute The primary attribute + * @param dbFile The file to store in + * @param indexClass The class of the indexing object + * @param objectClass The class of the stored object + * @param indexMaxSize The maximum expected size of an indexing object + * @param objectMaxSize The maximum expected size of a stored object + * @param maxEntries The maximum number of entries expected for this store + * @throws IOException + */ + public ChroniclePersistence(SimpleAttribute primaryKeyAttribute, File dbFile, Class indexClass, Class objectClass, int indexMaxSize, int objectMaxSize, long maxEntries) throws IOException { // Timestamps aren't correctly decoded without this delegate.. TimestampDelegate timestampDelegate = new TimestampDelegate(); @@ -51,7 +65,7 @@ public ChroniclePersistence(SimpleAttribute primaryKeyAttribute, File dbFi .name(dbFile.getName()) .averageKeySize(indexMaxSize) .averageValueSize(objectMaxSize) - .entries(1000000); // Adjust the expected number of entries as needed + .entries(maxEntries); // Adjust the expected number of entries as needed try { chronicleMap = mapBuilder.createPersistedTo(dbFile); } catch (Exception e) {