From eaaa054e58f80594d2a5b4897239505912964826 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 25 Nov 2015 17:56:32 -0700 Subject: [PATCH 1/5] Implement an appendMonoid Aggregator factory which yields aggregators that can take advantage of an efficient append method for faster aggregations --- .../com/twitter/algebird/Aggregator.scala | 41 +++++++++++++ .../algebird/AppendAggregatorTest.scala | 60 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala index ad9b4a5f6..4a3f512b2 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala @@ -47,6 +47,47 @@ object Aggregator extends java.io.Serializable { def present(reduction: T) = reduction } + /** + * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation + * @tparam F Data input type + * @tparam T Aggregating [[Monoid]] type + * @param appnd Function that appends the [[Monoid]]. Defines the [[append]] method for this aggregator. + * Analogous to the 'seqop' function in Scala's sequence 'aggregate' method + * @param m The [[Monoid]] type class + */ + def appendMonoid[F, T](appnd: (T, F) => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = + appendMonoid(appnd, identity[T]_)(m) + + /** + * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation + * @tparam F Data input type + * @tparam T Aggregating [[Monoid]] type + * @tparam P Presentation (output) type + * @param appnd Function that appends the [[Monoid]]. Defines the [[append]] method for this aggregator. 
+ * Analogous to the 'seqop' function in Scala's sequence 'aggregate' method + * @param pres The presentation function + * @param m The [[Monoid]] type class + */ + def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] = + new MonoidAggregator[F, T, P] { + def monoid: Monoid[T] = m + def prepare(input: F): T = appnd(m.zero, input) + def present(reduction: T): P = pres(reduction) + + override def apply(inputs: TraversableOnce[F]): P = present(agg(inputs)) + + override def applyOption(inputs: TraversableOnce[F]): Option[P] = + if (inputs.isEmpty) None else Some(apply(inputs)) + + override def append(l: T, r: F): T = appnd(l, r) + + override def appendAll(old: T, items: TraversableOnce[F]): T = reduce(old, agg(items)) + + override def appendAll(items: TraversableOnce[F]): T = agg(items) + + private def agg(inputs: TraversableOnce[F]): T = inputs.aggregate(m.zero)(appnd, m.plus) + } + /** * How many items satisfy a predicate */ diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala new file mode 100644 index 000000000..1dcab3585 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala @@ -0,0 +1,60 @@ +package com.twitter.algebird + +import org.scalatest._ + +class AppendAggregatorTest extends WordSpec with Matchers { + val data = Vector.fill(100) { scala.util.Random.nextInt(100) } + val mpty = Vector.empty[Int] + + // test the methods that appendMonoid method defines or overrides + def testMethods[E, M, P]( + agg1: MonoidAggregator[E, M, P], + agg2: MonoidAggregator[E, M, P], + data: Seq[E], + empty: Seq[E]) { + + val n = data.length + val (half1, half2) = data.splitAt(n / 2) + val lhs = agg1.appendAll(half1) + + data.foreach { e => + agg1.prepare(e) should be(agg2.prepare(e)) + } + + agg1.present(lhs) should be(agg2.present(lhs)) + + agg1(data) should be 
(agg2(data)) + agg1(empty) should be (agg2(empty)) + + agg1.applyOption(data) should be(agg2.applyOption(data)) + agg1.applyOption(empty) should be(agg2.applyOption(empty)) + + half2.foreach { e => + agg1.append(lhs, e) should be(agg2.append(lhs, e)) + } + + agg1.appendAll(lhs, half2) should be(agg2.appendAll(lhs, half2)) + + agg2.appendAll(data) should be(agg2.appendAll(data)) + } + + "appendMonoid" should { + "be equivalent to integer monoid aggregator" in { + val agg1 = Aggregator.fromMonoid[Int] + val agg2 = Aggregator.appendMonoid((m: Int, e: Int) => m + e) + testMethods(agg1, agg2, data, mpty) + } + + "be equivalent to set monoid aggregator" in { + object setMonoid extends Monoid[Set[Int]] { + val zero = Set.empty[Int] + def plus(m1: Set[Int], m2: Set[Int]) = m1 ++ m2 + } + + val agg1 = Aggregator.prepareMonoid((e: Int) => Set(e))(setMonoid) + val agg2 = Aggregator.appendMonoid((m: Set[Int], e: Int) => m + e)(setMonoid) + + testMethods(agg1, agg2, data, mpty) + } + } +} From fd9b8668f0c7e43f7bb814c3fbb44688002028c6 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 2 Dec 2015 09:48:38 -0700 Subject: [PATCH 2/5] Add appendSemigroup factory function --- .../com/twitter/algebird/Aggregator.scala | 52 +++++++++++++++++-- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala index 4a3f512b2..af731a47b 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala @@ -35,6 +35,11 @@ object Aggregator extends java.io.Serializable { def fromMonoid[F, T](implicit mon: Monoid[T], prep: F => T): MonoidAggregator[F, T, T] = prepareMonoid(prep)(mon) + def prepareSemigroup[F, T](prep: F => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] = new Aggregator[F, T, T] { + def prepare(input: F) = prep(input) + def semigroup = sg + def 
present(reduction: T) = reduction + } def prepareMonoid[F, T](prep: F => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = new MonoidAggregator[F, T, T] { def prepare(input: F) = prep(input) def monoid = m @@ -48,12 +53,50 @@ object Aggregator extends java.io.Serializable { } /** - * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation + * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation. + * Equivalent to {{{ appendSemigroup(prep, appnd, identity[T]_)(sg) }}} + */ + def appendSemigroup[F, T](prep: F => T, appnd: (T, F) => T)(implicit sg: Semigroup[T]): Aggregator[F, T, T] = + appendSemigroup(prep, appnd, identity[T]_)(sg) + + /** + * Obtain an [[Aggregator]] that uses an efficient append operation for faster aggregation * @tparam F Data input type - * @tparam T Aggregating [[Monoid]] type - * @param appnd Function that appends the [[Monoid]]. Defines the [[append]] method for this aggregator. + * @tparam T Aggregating [[Semigroup]] type + * @tparam P Presentation (output) type + * @param prep The preparation function. Expected to construct an instance of type T from a single data element. + * @param appnd Function that appends the [[Semigroup]]. Defines the [[append]] method for this aggregator. 
* Analogous to the 'seqop' function in Scala's sequence 'aggregate' method - * @param m The [[Monoid]] type class + * @param pres The presentation function + * @param sg The [[Semigroup]] type class + * @note The functions 'appnd' and 'prep' are expected to obey the law: {{{ appnd(t, f) == sg.plus(t, prep(f)) }}} + */ + def appendSemigroup[F, T, P](prep: F => T, appnd: (T, F) => T, pres: T => P)(implicit sg: Semigroup[T]): Aggregator[F, T, P] = + new Aggregator[F, T, P] { + def semigroup: Semigroup[T] = sg + def prepare(input: F): T = prep(input) + def present(reduction: T): P = pres(reduction) + + override def apply(inputs: TraversableOnce[F]): P = applyOption(inputs).get + + override def applyOption(inputs: TraversableOnce[F]): Option[P] = agg(inputs).map(pres) + + override def append(l: T, r: F): T = appnd(l, r) + + override def appendAll(old: T, items: TraversableOnce[F]): T = + if (items.isEmpty) old else reduce(old, agg(items).get) + + private def agg(inputs: TraversableOnce[F]): Option[T] = + if (inputs.isEmpty) None else { + val itr = inputs.toIterator + val t = prepare(itr.next) + Some(itr.foldLeft(t)(appnd)) + } + } + + /** + * Obtain a [[MonoidAggregator]] that uses an efficient append operation for faster aggregation. 
+ * Equivalent to {{{ appendMonoid(appnd, identity[T]_)(m) }}} */ def appendMonoid[F, T](appnd: (T, F) => T)(implicit m: Monoid[T]): MonoidAggregator[F, T, T] = appendMonoid(appnd, identity[T]_)(m) @@ -67,6 +110,7 @@ object Aggregator extends java.io.Serializable { * Analogous to the 'seqop' function in Scala's sequence 'aggregate' method * @param pres The presentation function * @param m The [[Monoid]] type class + * @note The function 'appnd' is expected to obey the law: {{{ appnd(t, f) == m.plus(t, appnd(m.zero, f)) }}} */ def appendMonoid[F, T, P](appnd: (T, F) => T, pres: T => P)(implicit m: Monoid[T]): MonoidAggregator[F, T, P] = new MonoidAggregator[F, T, P] { From c9f2e06555c694ce31f70338693341c773ad108e Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 2 Dec 2015 09:50:48 -0700 Subject: [PATCH 3/5] Optimize the AlgebirdRDD aggregateOption method to take advantage of faster appendAll methods for Aggregator objects --- .../com/twitter/algebird/spark/AlgebirdRDD.scala | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala b/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala index 88fe1a425..f930a2965 100644 --- a/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala +++ b/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala @@ -18,10 +18,17 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal { * requires a commutative Semigroup. To generalize to non-commutative, we need a sorted partition for * T. 
*/ - def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = - (new AlgebirdRDD(rdd.map(agg.prepare))) - .sumOption(agg.semigroup, implicitly) - .map(agg.present) + def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = { + val pr = rdd.mapPartitions({ data => + if (data.isEmpty) Iterator.empty else { + val sg = agg.prepare(data.next) + Iterator(agg.appendAll(sg, data)) + } + }, preservesPartitioning = true) + pr.coalesce(1, shuffle = true) + .mapPartitions(pr => Iterator(agg.semigroup.sumOption(pr))) + .collect.head.map(agg.present) + } /** * This will throw if you use a non-MonoidAggregator with an empty RDD From ba9df9f6871007c86b163ef0574d4b25f65e0945 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 2 Dec 2015 09:51:07 -0700 Subject: [PATCH 4/5] unit testing for semigroup variants --- .../algebird/AppendAggregatorTest.scala | 47 +++++++++++++++---- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala b/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala index 1dcab3585..64769a2f6 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/AppendAggregatorTest.scala @@ -6,16 +6,16 @@ class AppendAggregatorTest extends WordSpec with Matchers { val data = Vector.fill(100) { scala.util.Random.nextInt(100) } val mpty = Vector.empty[Int] - // test the methods that appendMonoid method defines or overrides - def testMethods[E, M, P]( - agg1: MonoidAggregator[E, M, P], - agg2: MonoidAggregator[E, M, P], + // test the methods that appendSemigroup method defines or overrides + def testMethodsSemigroup[E, M, P]( + agg1: Aggregator[E, M, P], + agg2: Aggregator[E, M, P], data: Seq[E], empty: Seq[E]) { val n = data.length val (half1, half2) = data.splitAt(n / 2) - val lhs = agg1.appendAll(half1) + val lhs = 
agg1.appendAll(agg1.prepare(half1.head), half1.tail) data.foreach { e => agg1.prepare(e) should be(agg2.prepare(e)) @@ -24,7 +24,6 @@ class AppendAggregatorTest extends WordSpec with Matchers { agg1.present(lhs) should be(agg2.present(lhs)) agg1(data) should be (agg2(data)) - agg1(empty) should be (agg2(empty)) agg1.applyOption(data) should be(agg2.applyOption(data)) agg1.applyOption(empty) should be(agg2.applyOption(empty)) @@ -34,15 +33,26 @@ class AppendAggregatorTest extends WordSpec with Matchers { } agg1.appendAll(lhs, half2) should be(agg2.appendAll(lhs, half2)) + } + + // test the methods that appendMonoid method defines or overrides + def testMethodsMonoid[E, M, P]( + agg1: MonoidAggregator[E, M, P], + agg2: MonoidAggregator[E, M, P], + data: Seq[E], + empty: Seq[E]) { - agg2.appendAll(data) should be(agg2.appendAll(data)) + testMethodsSemigroup(agg1, agg2, data, empty) + + agg1(empty) should be (agg2(empty)) + agg1.appendAll(data) should be(agg2.appendAll(data)) } "appendMonoid" should { "be equivalent to integer monoid aggregator" in { val agg1 = Aggregator.fromMonoid[Int] val agg2 = Aggregator.appendMonoid((m: Int, e: Int) => m + e) - testMethods(agg1, agg2, data, mpty) + testMethodsMonoid(agg1, agg2, data, mpty) } "be equivalent to set monoid aggregator" in { @@ -54,7 +64,26 @@ class AppendAggregatorTest extends WordSpec with Matchers { val agg1 = Aggregator.prepareMonoid((e: Int) => Set(e))(setMonoid) val agg2 = Aggregator.appendMonoid((m: Set[Int], e: Int) => m + e)(setMonoid) - testMethods(agg1, agg2, data, mpty) + testMethodsMonoid(agg1, agg2, data, mpty) + } + } + + "appendSemigroup" should { + "be equivalent to integer semigroup aggregator" in { + val agg1 = Aggregator.fromSemigroup[Int] + val agg2 = Aggregator.appendSemigroup(identity[Int]_, (m: Int, e: Int) => m + e) + testMethodsSemigroup(agg1, agg2, data, mpty) + } + + "be equivalent to set semigroup aggregator" in { + object setSemigroup extends Semigroup[Set[Int]] { + def plus(m1: Set[Int], 
m2: Set[Int]) = m1 ++ m2 + } + + val agg1 = Aggregator.prepareSemigroup((e: Int) => Set(e))(setSemigroup) + val agg2 = Aggregator.appendSemigroup((e: Int) => Set(e), (m: Set[Int], e: Int) => m + e)(setSemigroup) + + testMethodsSemigroup(agg1, agg2, data, mpty) } } } From f0dc4f27504eb76cdd2130d55dc37b97d030c235 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 2 Dec 2015 13:12:05 -0700 Subject: [PATCH 5/5] rename sg -> b to represent proper type --- .../main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala b/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala index f930a2965..ea07466e6 100644 --- a/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala +++ b/algebird-spark/src/main/scala/com/twitter/algebird/spark/AlgebirdRDD.scala @@ -21,8 +21,8 @@ class AlgebirdRDD[T](val rdd: RDD[T]) extends AnyVal { def aggregateOption[B: ClassTag, C](agg: Aggregator[T, B, C]): Option[C] = { val pr = rdd.mapPartitions({ data => if (data.isEmpty) Iterator.empty else { - val sg = agg.prepare(data.next) - Iterator(agg.appendAll(sg, data)) + val b = agg.prepare(data.next) + Iterator(agg.appendAll(b, data)) } }, preservesPartitioning = true) pr.coalesce(1, shuffle = true)