diff --git a/code/src/main/java/com/googlecode/cqengine/CapacityLimitedIndexedCollection.java b/code/src/main/java/com/googlecode/cqengine/CapacityLimitedIndexedCollection.java new file mode 100644 index 00000000..aa23bc19 --- /dev/null +++ b/code/src/main/java/com/googlecode/cqengine/CapacityLimitedIndexedCollection.java @@ -0,0 +1,170 @@ +package com.googlecode.cqengine; + +import com.googlecode.cqengine.attribute.Attribute; +import com.googlecode.cqengine.persistence.Persistence; +import com.googlecode.cqengine.query.option.AttributeOrder; +import com.googlecode.cqengine.query.option.QueryOptions; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.googlecode.cqengine.query.QueryFactory.*; + +/** + * @author wayne + * @since 0.1.0 + */ +public class CapacityLimitedIndexedCollection> extends ConcurrentIndexedCollection { + + private static final int DEF_CAPACITY_LIMIT = 1000; + private static final int DEF_RELEASE_SIZE = 3; + + /** + * The capacity size of collection + */ + private final int capacityLimit; + + /** + * The size of each release capacity + */ + private final int releaseSize; + + /** + * Sort field + */ + private final AttributeOrder attributeOrder; + + /** + * Expiration strategy: LRU,LFU,FIFO,NONE + */ + private Eviction eviction; + + public CapacityLimitedIndexedCollection() { + this(DEF_CAPACITY_LIMIT); + } + + public CapacityLimitedIndexedCollection(int capacityLimit) { + this(capacityLimit, DEF_RELEASE_SIZE); + } + + public CapacityLimitedIndexedCollection(int capacityLimit, int releaseSize) { + this(capacityLimit, 0, Eviction.NONE, null); + } + + public CapacityLimitedIndexedCollection( + int capacityLimit, + int releaseSize, + Eviction eviction, + Attribute orderControlAttribute + ) { + this.capacityLimit = capacityLimit; + this.releaseSize = releaseSize; + this.eviction = eviction; + this.attributeOrder = new AttributeOrder<>(orderControlAttribute, true); + } + + public CapacityLimitedIndexedCollection( + Persistence persistence, + int capacityLimit, + int releaseSize, + Attribute orderControlAttribute + ) { + super(persistence); + this.capacityLimit = capacityLimit; + this.releaseSize = releaseSize; + this.attributeOrder = new AttributeOrder<>(orderControlAttribute, true); + } + + @Override + public boolean add(O o) { + return addAll(Collections.singleton(o)); + } + + @Override + @SuppressWarnings("unchecked") + public boolean addAll(Collection c) { + if (c == null || c.isEmpty()) { + return true; + } + if (isReachedCapacityLimit(c.size())) { + if (this.eviction == Eviction.NONE) { + throw new RuntimeException("Reached maximum capacity limit: " + capacityLimit); + } + int evictCount = getEvictCount(c.size()); + + Class clazz = (Class) c.stream().findFirst().get().getClass(); + doEviction(evictCount, clazz); + } + return super.addAll(c); + } + + private void doEviction(int evictCount, Class clazz) { + List toEvictEntries = retrieve(all(clazz), getQueryOptionsByEviction()).stream().limit(evictCount).collect(Collectors.toList()); + + if (toEvictEntries.size() > 0) { + toEvictEntries.forEach(this::remove); + } + } + + private boolean isReachedCapacityLimit() { + return size() >= capacityLimit; + } + + private boolean isReachedCapacityLimit(int increment) { + return (size() + increment) > capacityLimit; + } + + private int getEvictCount(int increment) { + int overSize = (size() + increment) - capacityLimit; + return overSize + releaseSize; + } + + public static class OrderedEntry { + private O o; + private long timestamp; + + public OrderedEntry(O o) { + this(o, System.currentTimeMillis()); + } + + public OrderedEntry(O o, long timestamp) { + this.o = o; + this.timestamp = timestamp; + } + + public O getO() { + return o; + } + + public void setO(O o) { + this.o = o; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + } + + public QueryOptions getQueryOptionsByEviction() { + if (this.eviction == Eviction.NONE) { + return null; + } + return queryOptions(orderBy(attributeOrder)); + } + + /** + * Expiration strategy + */ + public enum Eviction { + LRU, + LFU, + FIFO, + NONE + } +} diff --git a/code/src/test/java/com/googlecode/cqengine/CapacityLimitedIndexedCollectionTest.java b/code/src/test/java/com/googlecode/cqengine/CapacityLimitedIndexedCollectionTest.java new file mode 100644 index 00000000..44909502 --- /dev/null +++ b/code/src/test/java/com/googlecode/cqengine/CapacityLimitedIndexedCollectionTest.java @@ -0,0 +1,146 @@ +package com.googlecode.cqengine; + +import com.google.common.collect.testing.TestStringSetGenerator; +import com.googlecode.cqengine.attribute.Attribute; +import com.googlecode.cqengine.attribute.SimpleAttribute; +import com.googlecode.cqengine.index.hash.HashIndex; +import com.googlecode.cqengine.persistence.offheap.OffHeapPersistence; +import com.googlecode.cqengine.persistence.onheap.OnHeapPersistence; +import com.googlecode.cqengine.query.Query; +import com.googlecode.cqengine.query.QueryFactory; +import com.googlecode.cqengine.query.option.QueryOptions; +import junit.framework.TestCase; +import junit.framework.TestSuite; +import org.junit.Assert; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Set; + +import static com.googlecode.cqengine.query.QueryFactory.equal; +import static java.util.Arrays.asList; + +/** + * @author wayne + * @since 0.1.0 + */ +public class CapacityLimitedIndexedCollectionTest extends TestCase { + + public static junit.framework.Test suite() { + TestSuite suite = new TestSuite(); + suite.addTestSuite(CapacityLimitedIndexedCollectionTest.class); + return suite; + } + + private static TestStringSetGenerator onHeapIndexedCollectionGenerator() { + return new TestStringSetGenerator() { + @Override protected Set create(String[] elements) { + IndexedCollection indexedCollection = new ConcurrentIndexedCollection<>(OnHeapPersistence.onPrimaryKey(QueryFactory.selfAttribute(String.class))); + indexedCollection.addAll(asList(elements)); + return indexedCollection; + } + }; + } + + private static TestStringSetGenerator offHeapIndexedCollectionGenerator() { + return new TestStringSetGenerator() { + @Override protected Set create(String[] elements) { + IndexedCollection indexedCollection = new ConcurrentIndexedCollection<>(OffHeapPersistence.onPrimaryKey(QueryFactory.selfAttribute(String.class))); + indexedCollection.addAll(asList(elements)); + return indexedCollection; + } + }; + } + + public void testAdd() { + IndexedCollection coll = new CapacityLimitedIndexedCollection<>(5); + + coll.addIndex(HashIndex.onAttribute(TestEntry.NAME)); + + for (int i = 0; i < 5; i++) { + coll.add(new TestEntry(String.valueOf(i), (long) i)); + } + + Query q = equal(TestEntry.NAME, "3"); + coll.retrieve(q).forEach(System.out::println); + + Exception ex = Assert.assertThrows(RuntimeException.class, () -> coll.add(new TestEntry("5", 5L))); + Assert.assertTrue(ex.getMessage().startsWith("Reached maximum")); + + System.out.println("Expected error occurred: " + ex.getMessage()); + } + + public void testAddWithFIFO() { + IndexedCollection coll = new CapacityLimitedIndexedCollection<>( + 5, + 3, + CapacityLimitedIndexedCollection.Eviction.FIFO, + TestEntry.ORDER + ); + + coll.addIndex(HashIndex.onAttribute(TestEntry.NAME)); + + for (int i = 0; i <= 100; i++) { + coll.add(new TestEntry(String.valueOf(i), (long) i)); + } + + Assert.assertTrue(coll.size() <= 5); + } + + public void testAddAll() { + IndexedCollection coll = new CapacityLimitedIndexedCollection<>(5); + + coll.addIndex(HashIndex.onAttribute(TestEntry.NAME)); + + for (int i = 0; i < 3; i++) { + coll.add(new TestEntry(String.valueOf(i), (long) i)); + } + + Query q = equal(TestEntry.NAME, "2"); + coll.retrieve(q).forEach(System.out::println); + + Collection list = new ArrayList<>(); + list.add(new TestEntry("3", 3L)); + list.add(new TestEntry("4", 4L)); + list.add(new TestEntry("5", 5L)); + + Exception ex = Assert.assertThrows(RuntimeException.class, () -> coll.addAll(list)); + Assert.assertTrue(ex.getMessage().startsWith("Reached maximum")); + + System.out.println("Expected error occurred: " + ex.getMessage()); + } + + static class TestEntry { + private final String name; + private final Long order; + + public TestEntry(String name, Long order) { + this.name = name; + this.order = order; + } + + public String getName() { + return name; + } + + public Long getOrder() { + return order; + } + + // -------------------------- Attributes -------------------------- + public static final Attribute NAME = new SimpleAttribute("name") { + public String getValue(TestEntry entry, QueryOptions queryOptions) { return entry.name; } + }; + + public static final Attribute ORDER = new SimpleAttribute("order") { + public Long getValue(TestEntry entry, QueryOptions queryOptions) { return entry.order; } + }; + + @Override + public String toString() { + return "TestEntry{" + + "name='" + name + '\'' + + '}'; + } + } +}