Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
90f4399
first commit
LCHammer May 30, 2020
ec73202
fixed wrong import
JuliusNmn May 30, 2020
7cfed35
removed type variable from DeltaMergable
JuliusNmn Jun 6, 2020
0fd1f7b
Added DeltaHandler trait, allowing a DCRDT to explicitly send messages
JuliusNmn Jun 6, 2020
bc54b62
Added DCRDT demo module with simple AddOnlySet implementation, WIP
JuliusNmn Jun 6, 2020
d958e23
updated dcrdt example
JuliusNmn Jun 6, 2020
534e8bb
added dcrdt jconsistencylevel
JuliusNmn Jun 6, 2020
2fb10b7
fixes to dcrdt demo, still WIP
JuliusNmn Jun 6, 2020
578b87e
dcrdt-demo works, framework needs further adaption
JuliusNmn Jun 12, 2020
be1d935
added deltacrdt to AkkaReplicaSystemFactory, added debug statements (…
JuliusNmn Jun 12, 2020
85ac83e
changed dcrdt behavior. removed transmit delta function. if a functio…
JuliusNmn Jun 22, 2020
d955a3c
added AddOnlySetString
LCHammer Jun 22, 2020
d7e906b
edited benchmark
JuliusNmn Jun 22, 2020
4be6da8
added Delta and ResultWrapper classes
JuliusNmn Jun 22, 2020
6f06e06
Removed Generics
JuliusNmn Jun 22, 2020
de50017
adapted addonlysetstring
JuliusNmn Jun 22, 2020
f07c42a
added IntegerVector
LCHammer Jun 22, 2020
2026eec
Merge branch 'develop-Kris' of github.com:c00lius/consysT-code into d…
LCHammer Jun 22, 2020
8d89161
added IntegerVector
LCHammer Jun 22, 2020
5f46087
Renamed demo project
JuliusNmn Jun 26, 2020
347458a
added AddRemoveSet
LCHammer Jun 26, 2020
1fc4095
added Hashmap
LCHammer Jun 27, 2020
c892a8d
fixed compile time errors in Hashmap, added essential methods
JuliusNmn Jun 29, 2020
88703f7
added dcrdt to demos/pom.xml
JuliusNmn Jun 29, 2020
ea5e284
added stringhashmap
JuliusNmn Jun 29, 2020
6ceffb8
fixed compile time errors
JuliusNmn Jun 29, 2020
12503cd
Merge branch 'develop-Kris' of github.com:JuliusNmn/consysT-code into…
JuliusNmn Jun 29, 2020
d2b66b4
added dotStore and Events
LCHammer Jul 13, 2020
d07a5f5
added dcrdt hashmap
JuliusNmn Jul 14, 2020
295c97b
added comments
LCHammer Aug 2, 2020
a2c5f6d
changed dotstore
LCHammer Aug 4, 2020
7c29e03
added comments
LCHammer Aug 4, 2020
f991181
added comments
LCHammer Aug 4, 2020
a5c781f
big testbench
LCHammer Aug 7, 2020
02160a0
Merge remote-tracking branch 'upstream/develop' into develop-Kris
JuliusNmn Aug 7, 2020
a39859e
Merge branch 'develop-Kris' of github.com:JuliusNmn/consysT-code into…
JuliusNmn Aug 7, 2020
40e53d1
fixed merge conflicts
JuliusNmn Aug 9, 2020
4a6201c
added comments to ShtringHashmap
LCHammer Aug 10, 2020
b9dd1bf
Merge branch 'develop-Kris' of https://github.com/JuliusNmn/consysT-c…
LCHammer Aug 10, 2020
a8e835c
commented StringHashmap
LCHammer Aug 10, 2020
fec3e7a
commented Hashmap
LCHammer Aug 10, 2020
1af1eb6
added readme
JuliusNmn Aug 11, 2020
648d94c
Merge branch 'develop-Kris' of github.com:JuliusNmn/consysT-code into…
JuliusNmn Aug 11, 2020
ea322f6
improvements to readme
JuliusNmn Aug 12, 2020
beaaf07
improvements to readme
JuliusNmn Aug 12, 2020
d23521a
added disclaimer to StringHashmap
JuliusNmn Aug 12, 2020
f2c96e2
refactor
JuliusNmn Aug 12, 2020
b8b68c7
cleanup
JuliusNmn Aug 12, 2020
a0a43a8
cleanup
JuliusNmn Aug 12, 2020
3b33918
cleanup
JuliusNmn Aug 12, 2020
a2dbfd4
Fixed DCRDTHashMap. wrong value was being passed to value.merge() on …
JuliusNmn Aug 12, 2020
2365dca
added Stringmethod in Dotstore
LCHammer Aug 12, 2020
25995b6
final change in benchmark
LCHammer Aug 12, 2020
3cc71c3
cleanup
JuliusNmn Aug 12, 2020
23649c3
cleanup
JuliusNmn Aug 12, 2020
ed8e22e
Merge branch 'develop-Kris' of github.com:JuliusNmn/consysT-code into…
JuliusNmn Aug 12, 2020
147d93b
cleanup
JuliusNmn Aug 12, 2020
a680725
cleanup
JuliusNmn Aug 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ object ConsistencyLabel {
/* CRDTs */
case object CvRDT extends ConsistencyLabel
case object CmRDT extends ConsistencyLabel
case object DCRDT extends ConsistencyLabel


case class Cassandra(replicas : Int) extends ConsistencyLabel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ object AkkaReplicaSystemFactory extends ReplicaSystemFactory {
with CausalAkkaReplicaSystem
with CmRDTAkkaReplicaSystem
with CvRDTAkkaReplicaSystem
with DeltaCRDTAkkaReplicaSystem
{
override protected def freshAddr() : String =
"$" + String.valueOf(Random.alphanumeric.take(16).toArray)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package de.tuda.stg.consys.core.akka
import java.io

import akka.actor.{ActorRef, ActorSystem}
import de.tuda.stg.consys.core.Address

import scala.reflect.runtime.universe._
import de.tuda.stg.consys.core.akka.DeltaCRDTAkkaReplicaSystem.{DeltaCRDTReplicatedObject, DeltaUpdateReq}
import de.tuda.stg.consys.core.ConsistencyLabel.DCRDT
import de.tuda.stg.consys.core.akka.Requests.{InvokeOp, NoAnswerRequest, Operation, Request, SynchronousRequest}

import scala.concurrent.duration.FiniteDuration
import scala.reflect.runtime.universe
/**
@author: Kris Frühwein und Julius Näumann
*/
trait DeltaCRDTAkkaReplicaSystem extends AkkaReplicaSystem {


//creates master replica
override protected def createMasterReplica[T <: Obj : TypeTag](l: ConsistencyLevel, addr: Addr, obj: T): AkkaReplicatedObject[Addr, T] = {
val result = l match {

case DCRDT => new DeltaCRDTReplicatedObject[Addr, T](obj, addr, this)
case _ => super.createMasterReplica[T](l, addr, obj)
}
println("created master replica")
result
}

//creates follower replica
override protected def createFollowerReplica[T <: Obj : TypeTag](l: ConsistencyLevel, addr: Addr, obj: T, masterRef: ActorRef): AkkaReplicatedObject[Addr, T] = {
val result = l match {
case DCRDT => new DeltaCRDTReplicatedObject[Addr, T](obj, addr, this)
case _ => super.createFollowerReplica[T](l, addr, obj, masterRef)
}
println("created follower replicas")
result
}
}


trait DeltaHandler {
def transmitDelta(delta: Any)
}

object DeltaCRDTAkkaReplicaSystem {

private case class RequestOperation(op: Operation[_]) extends SynchronousRequest[Unit]

private case class RequestSync(tx: Transaction) extends SynchronousRequest[Unit]

case class Message(obj: Any) extends NoAnswerRequest

case class DeltaUpdateReq(obj: Any) extends NoAnswerRequest



//Class for an Replicated Object. must be serializable
class DeltaCRDTReplicatedObject[Loc, T]
(
init: T, val addr: Loc, val replicaSystem: AkkaReplicaSystem {type Addr = Loc}
)(
protected implicit val ttt: TypeTag[T]
) extends AkkaSECReplicatedObject[Loc, T]
with Lockable[T]
with Serializable
with DeltaHandler
{
setObject(init)
val t = init.asInstanceOf[DeltaCRDT]

override final def consistencyLevel: ConsistencyLevel = DCRDT

override def handleRequest[R](request: Request[R]): R = request match {
case DeltaUpdateReq(state: AkkaReplicaSystem#Obj) =>
getObject.asInstanceOf[DeltaCRDT].merge(state)

None.asInstanceOf[R]
case _ =>
super.handleRequest(request)

}

override def internalInvoke[R](tx: Transaction, methodName: String, args: Seq[Seq[Any]]): R = {
val result = super.internalInvoke[R](tx, methodName, args)
if (result.isInstanceOf[Delta]) {
val d = result.asInstanceOf[Delta]
replicaSystem.foreachOtherReplica(handler => handler.request(addr, DeltaUpdateReq(d.delta)))
}
result
}



override def sync(): Unit = {
// todo:
// traditional sync method does not make sense in the context of dcrdt
// should there be an option to force sync?
println(s"DCRDT '$addr' sync")
}

override protected def transactionStarted(tx: Transaction): Unit = {
super.transactionStarted(tx)
}

override protected def transactionFinished(tx: Transaction): Unit = {
super.transactionFinished(tx)
}

override def toString: String = s"@DCRDT($addr, $getObject)"

override def transmitDelta(delta: Any): Unit = {
replicaSystem.foreachOtherReplica(handler => handler.request(addr, DeltaUpdateReq(delta)))
}
}
}

//abstract class for all deltaCRDT
abstract class DeltaCRDT extends DeltaMergeable {

}

//general delta return type
class Delta (
d: AkkaReplicaSystem#Obj
) {
var delta :AkkaReplicaSystem#Obj = d
}

//delta return type if method returns something too
class ResultWrapper[T <: Object] (v: T, d: AkkaReplicaSystem#Obj)
extends Delta(d) {
val value: T = v
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.tuda.stg.consys.core.akka

/**
* Classes with this trait allow being updated with a delta state
* @tparam T delta state type
*/
trait DeltaMergeable {

private[akka] def merge(other:AkkaReplicaSystem#Obj)

}


Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ public interface JConsistencyLevels {

@Local ConsistencyLabel CMRDT = ConsistencyLabel.CmRDT$.MODULE$;
@Local ConsistencyLabel CVRDT = ConsistencyLabel.CvRDT$.MODULE$;

@Local ConsistencyLabel DCRDT = ConsistencyLabel.DCRDT$.MODULE$;
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package de.tuda.stg.consys.demo.counter;
package de.tuda.stg.consys.demo.dcrdt;

import com.typesafe.config.Config;
import de.tuda.stg.consys.bench.OutputFileResolver;
import de.tuda.stg.consys.demo.DemoBenchmark;
import de.tuda.stg.consys.demo.counter.schema.Counter;
import de.tuda.stg.consys.demo.dcrdt.schema.Counter;
import de.tuda.stg.consys.japi.JRef;
import de.tuda.stg.consys.japi.impl.JReplicaSystems;
import de.tuda.stg.consys.japi.impl.akka.JAkkaReplicaSystem;
import org.checkerframework.com.google.common.collect.Sets;
import scala.Option;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Created on 10.10.19.
*
Expand All @@ -32,6 +26,7 @@ public CounterBenchmark(Config config, Option<OutputFileResolver> outputResolver

@Override
public void setup() {

if (processId() == 0) {
counter = system().replicate("counter", new Counter(0), getWeakLevel());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package de.tuda.stg.consys.demo.counter;
package de.tuda.stg.consys.demo.dcrdt;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package de.tuda.stg.consys.demo.counter;
package de.tuda.stg.consys.demo.dcrdt;

import de.tuda.stg.consys.core.ConsistencyLabel;
import de.tuda.stg.consys.demo.counter.schema.Counter;
import de.tuda.stg.consys.demo.dcrdt.schema.Counter;
import de.tuda.stg.consys.japi.JConsistencyLevels;
import de.tuda.stg.consys.japi.JRef;
import de.tuda.stg.consys.japi.JReplicaSystem;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package de.tuda.stg.consys.demo.counter.schema;
package de.tuda.stg.consys.demo.dcrdt.schema;

import java.io.Serializable;

Expand Down
81 changes: 81 additions & 0 deletions demos/dcrdt-demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Implementing Delta-CRDTs in Consys

Consys supports implementing custom Delta-CRDTs, as presented in `Almeida et al. - 2018 - Delta State Replicated Data Types`.

## Delta-CRDT overview
Delta-CRDTs are an extension of CRDTs, which are a replicated data structure synchronized using a `merge` method, which merges two states into a new one.

Delta-CRDTs provide the advantage of reduced data transfer, as they are synchronized through _delta-states_. If a replica has changed, it will only need to send a difference.
Other replicas can incorporate these changes by implementing the `merge` method, which takes a _delta-state_ parameter.

The structure must implement a _join-semilattice_; Please see the referenced publication for more details on the formal requirements it has to meet.
## Implementing a Delta-CRDT

Contrary to the typical workflow of Consys, using a Delta-CRDT structure requires implementing a custom class defining a `merge` method. Additionally, methods must follow a certain convention to convey whether they result in a delta state.
Automatically inferring delta states is not currently in scope of this project.

The following exemplifies a DCRDT implementation using an AddOnlySet.

```
public class AddOnlySetString extends DeltaCRDT implements Serializable {

private Set<String> set = new HashSet<String>();


@Override
public void merge(Object other) {
if (other instanceof Set) {
Set<String> s = (Set<String>) other;
set.addAll(s);
}
}

// adds a new String to this set
public Delta addElement(String str) {
set.add(str);
Set<String> s = new HashSet<String>();
s.add(str);
return new Delta(s);
}

// adds a new String to this set.
// Returns whether the current local set did not yet include this string.
public ResultWrapper<Boolean> addElement2(String str) {
boolean result = set.add(str);
Set<String> s = new HashSet<String>();
s.add(str);
return new ResultWrapper<>(result, s);
}


}
```
The `addElement` method returns a Delta instance containing _delta-state_.
`merge()` takes an `Object` parameter. Any value wrapped in a `Delta` or `ResultWrapper` instance will be passed here.


Things to note:
* The class must be Serializable. When a replica is initially registered using `replicate()`, it is transmitted to other clients as a whole, without the use of delta states.
* As of yet, akka does not support generics, which is why the merge method only takes an `Object`. This is also why this example explicitly uses strings.
* If a method results in a changed state, it must return a `Delta` instance that includes the delta state. Akka will transmit this Delta to other replicas by invoking their `merge` method.
* If a method intended to return a value results in a changed state, it must return a `ResultWrapper` object, which allows setting a delta state and an arbitrary value. `ResultWrapper` takes a type parameter, as akka's generics limitation does not apply here.
* Please not that DCRDT methods are currently not yet atomic. This can pose a problem if a set being iterated over is modified by a `merge()` call. This issue will be fixed in a future version.

Once implemented correctly, instances of DCRDT classes can be used just like any other data type in akka:

```
// ...
if (master) {
set = system().replicate("counter", new AddOnlySetString(), JConsistencyLevels.DCRDT);
} else {
set = system().lookup("counter", AddOnlySetString.class, JConsistencyLevels.DCRDT);
}

// ...
set.ref().addElement("Hello");

// ...
if (! set.ref().addElement2("Hello").value) {
System.out.println("element already in set");
}
```
Loading