From ad2f1d7953873045ff5ca48029a61ff583f81309 Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Wed, 26 Nov 2014 13:07:59 -1000 Subject: [PATCH 1/2] Add aggregate to summingbird --- .../com/twitter/summingbird/Producer.scala | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala index a0db69e2f..b1b64f93b 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala @@ -16,7 +16,7 @@ package com.twitter.summingbird -import com.twitter.algebird.Semigroup +import com.twitter.algebird.{ Aggregator, Semigroup } object Producer { @@ -249,6 +249,24 @@ case class Summer[P <: Platform[P], K, V]( */ sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] { + /** + * This applies an Aggregator to the values. The result type is similar to sumByKey with + * a crucial difference: the tuple is (Option(previous aggregated value), current aggregated value). + * In sumByKey you get the previous value and the delta, but after agg.present, the delta cannot be combined + * and is not meaningful in the general case. 
+ */ + def aggregate[V1, V2](store: P#Store[K, V1], agg: Aggregator[V, V1, V2]): KeyedProducer[P, K, (Option[V2], V2)] = { + // When the next version of algebird is added, use agg.semigroup + val sg = Semigroup.from[V1](agg.reduce) + mapValues(agg.prepare) + .sumByKey(store)(sg) + .mapValues { + case (optv1, v1) => + val resultv1 = optv1.map(sg.plus(_, v1)).getOrElse(v1) // fold the delta into the previous value, if any + (optv1.map(agg.present), agg.present(resultv1)) + } + } + + /** Builds a new KeyedProvider by applying a partial function to keys of elements of this one on which the function is defined.*/ def collectKeys[K2](pf: PartialFunction[K, K2]): KeyedProducer[P, K2, V] = IdentityKeyedProducer(collect { case (k, v) if pf.isDefinedAt(k) => (pf(k), v) }) From a6a67d1fb2ef1293f821669e5b66bb19c710e64c Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Wed, 20 Jul 2016 18:23:53 -1000 Subject: [PATCH 2/2] Add tests for aggregate --- .../summingbird/memory/MemoryLaws.scala | 28 ++++++++++++++++++- .../com/twitter/summingbird/Producer.scala | 3 +- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/summingbird-core-test/src/test/scala/com/twitter/summingbird/memory/MemoryLaws.scala b/summingbird-core-test/src/test/scala/com/twitter/summingbird/memory/MemoryLaws.scala index 3460d98cc..b9ccd3f07 100644 --- a/summingbird-core-test/src/test/scala/com/twitter/summingbird/memory/MemoryLaws.scala +++ b/summingbird-core-test/src/test/scala/com/twitter/summingbird/memory/MemoryLaws.scala @@ -16,7 +16,7 @@ limitations under the License. 
package com.twitter.summingbird.memory -import com.twitter.algebird.{ MapAlgebra, Monoid, Semigroup } +import com.twitter.algebird.{ Aggregator, MapAlgebra, Monoid, Semigroup } import com.twitter.summingbird._ import com.twitter.summingbird.option.JobId import org.scalacheck.{ Arbitrary, _ } @@ -214,6 +214,32 @@ class MemoryLaws extends WordSpec { assert(store1.toMap == ((0 to 100).groupBy(_ % 3).mapValues(_.sum))) assert(store2.toMap == ((0 to 100).groupBy(_ % 3).mapValues(_.sum))) } + "aggregate should work" in { + val source = Memory.toSource((0 to 100).reverse) + val store = MutableMap.empty[Int, Int] + val buf = MutableMap.empty[Int, List[(Option[Int], Int)]] + val prod = source.map { t => (t % 2, t) } + .aggregate(store, Aggregator.max[Int].andThenPresent(_ * 2).composePrepare(_ / 2)) + .write { kv => + val (k, vs) = kv + buf(k) = vs :: buf.getOrElse(k, Nil) + } + val mem = new Memory + mem.run(mem.plan(prod)) + + assert(store.keySet == Set(0, 1)) + assert(store(0) == (0 to 100).filter(_ % 2 == 0).map(_ / 2).max) + assert(store(1) == (0 to 100).filter(_ % 2 == 1).map(_ / 2).max) + assert(buf.keySet == Set(0, 1)) + assert(buf(0).map(_._2) == + List.fill(51)(100)) // 51 even inputs, largest first: the max is (100 / 2) from the start, presented as 100 + assert(buf(0).map(_._1) == + (List.fill(50)(Option(100)) :+ None)) // buf is newest-first, so the one result without a previous value is last + assert(buf(1).map(_._2) == + List.fill(50)(98)) // 50 odd inputs, largest first: the max is (99 / 2) from the start, presented as 98 + assert(buf(1).map(_._1) == + (List.fill(49)(Option(98)) :+ None)) // buf is newest-first, so the one result without a previous value is last + } "self also shouldn't duplicate work" in { val platform = new Memory diff --git a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala index cf4970b7d..9aa7fe4bf 100644 --- a/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala +++ b/summingbird-core/src/main/scala/com/twitter/summingbird/Producer.scala @@ 
-258,8 +258,7 @@ sealed trait KeyedProducer[P <: Platform[P], K, V] extends Producer[P, (K, V)] { * and is not meaningful in the general case. */ def aggregate[V1, V2](store: P#Store[K, V1], agg: Aggregator[V, V1, V2]): KeyedProducer[P, K, (Option[V2], V2)] = { - // When the next version of algebird is added, use agg.semigroup - val sg = Semigroup.from[V1](agg.reduce) + val sg = agg.semigroup mapValues(agg.prepare) .sumByKey(store)(sg) .mapValues {