From 67c6688d2ea04390a04201e6acb895515df073c5 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Sat, 29 Oct 2016 08:01:07 -0600 Subject: [PATCH 01/17] exponential histogram, slow but tested. --- .../algebird/benchmark/ExpHistBenchmark.scala | 53 ++++ .../scala/com/twitter/algebird/ExpHist.scala | 255 ++++++++++++++++++ .../com/twitter/algebird/ExpHistLaws.scala | 164 +++++++++++ 3 files changed, 472 insertions(+) create mode 100644 algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala create mode 100644 algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala create mode 100644 algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala new file mode 100644 index 000000000..50aa454fe --- /dev/null +++ b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala @@ -0,0 +1,53 @@ +package com.twitter.algebird +package benchmark + +import org.openjdk.jmh.annotations._ +import org.openjdk.jmh.infra.Blackhole +import scala.util.Random + +/** + * Benchmarks the Exponential Histogram implementation in Algebird. 
+ */ +object ExpHistBenchmark { + @State(Scope.Benchmark) + class ExpHistState { + @Param(Array("0.1", "0.005")) + var eps: Double = 0.0 + + @Param(Array("1000")) + var window: Int = 0 + + // number of data values to combine into an ExpHist + @Param(Array("10", "100", "1000")) + var numElements: Int = 0 + + var empty: ExpHist = _ + var inputData: Vector[(Long, Long)] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + val rng = new Random(3) + + val conf = ExpHist.Config(math.ceil(1 / eps).toInt, window) + empty = ExpHist.empty(conf) + + inputData = (0L until numElements).map { _ => + val timestamp = rng.nextInt(window).toLong + val item = rng.nextInt(Int.MaxValue).toLong + (item, timestamp) + }.sortBy(_._2).toVector + + } + } +} + +class ExpHistBenchmark { + import ExpHistBenchmark._ + + @Benchmark + def timeAdd(state: ExpHistState, bh: Blackhole) = { + state.inputData.foreach { pair => + bh.consume(state.empty.add(pair._1, pair._2)) + } + } +} diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala new file mode 100644 index 000000000..4f10b02ef --- /dev/null +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -0,0 +1,255 @@ +package com.twitter.algebird + +/** + * Exponential Histogram algorithm from + * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf. + * + * + * Next step - code up the l-canonical representation of numbers, + * then do some of the fancier business in the paper! + */ + +object ExpHist { + private[this] def log2(i: Float): Double = math.log(i) / math.log(2) + + def minBuckets(k: Int) = math.ceil(k / 2.0).toInt + def maxBuckets(k: Int) = minBuckets(k) + 1 + + /** + * returns a vector of the total number of buckets of size `s^i`, + * where `i` is the vector index, used to represent s for a given + * k. + * + * `s` is the "sum of the sizes of all of the buckets tracked by + * the ExpHist instance. 
+ * + * `k` is `math.ceil(epsilon / 2)`, where epsilon is the relative + * error of `s`. + */ + def lNormalize(s: Long, k: Int): Vector[Int] = { + val l = minBuckets(k) + val j = log2(s / l + 1).toInt + // returns the little-endian bit rep of the supplied number. + def binarize(sh: Long): Vector[Int] = + ((0 until j).map { i => l + ((sh.toInt >> i) % 2) }).toVector + + val twoJ = 1 << j + val sPrime = s - (twoJ - 1) * l + + if (sPrime >= twoJ) { + val m = (sPrime >> j) + val sHat = (sPrime - m * twoJ) + binarize(sHat) :+ m.toInt + } else binarize(sPrime) + } + + /** + * Expand out a number's l-normalized form into the original + * number. + */ + def expand(form: Vector[Int]): Long = + form.zipWithIndex + .map { case (i, exp) => i.toLong << exp } + .reduce(_ + _) + + def relativeError(e: ExpHist): Double = + if (e.total == 0) 0.0 + else { + val maxOutsideWindow = e.last - 1 + val minInsideWindow = 1 + e.total - e.last + val absoluteError = maxOutsideWindow / 2.0 + absoluteError / minInsideWindow + } + + case class Config(k: Int, windowSize: Long) { + // Maximum number of buckets of size 2^i allowed in the repr of + // this exponential histogram. + def maxBuckets: Int = ExpHist.maxBuckets(k) + + // Returns the last timestamp before the window. any ts <= [the + // returned timestamp] is outside the window. + def expiration(currTime: Long): Long = currTime - windowSize + } + + def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L, 0L) + def empty(k: Int, windowSize: Long): ExpHist = empty(Config(k, windowSize)) +} + +/** + * @param total counter for the total size of all buckets. + * @param last the size of the oldest bucket. + */ +case class ExpHist(conf: ExpHist.Config, buckets: Vector[BucketSeq], time: Long, last: Long, total: Long) { + /** + * Returns the same ExpHist with a new window. If the new window is + * smaller than the current window, evicts older items. 
+ */ + def withWindow(newWindow: Long): ExpHist = copy(conf = conf.copy(windowSize = newWindow)).step(time) + + /** + * Step forward to `newTime`, evicting any wrapped buckets that + * fall outside of the window. + */ + def step(newTime: Long): ExpHist = + if (newTime <= time) this + else { + val t = conf.expiration(newTime) + + // TODO this is junk, but now the madness is contained. + val newBuckets = buckets.flatMap(_.expire(t)) + val newLast = newBuckets.headOption.map(_.bucketSize).getOrElse(0L) + val newTotal = newBuckets.map(_.count).reduceLeftOption(_ + _).getOrElse(0L) + + copy( + buckets = buckets.flatMap(_.expire(t)), + time = newTime, total = newTotal, last = newLast + ) + } + + /** + * "inc" algorithm: + * + * * Step forward to the new timestamp + * * create a new bucket with size 1 and timestamp + * * Traverse the list of buckets in order of increasing sizes. + * + * If there are (k / 2 + 2) or more buckets of the same size, merge + * the oldest two of these buckets into a single bucket of double + * the size. Repeat until the bucket size is less than the limit. + */ + def inc(timestamp: Long): ExpHist = { + val stepped = step(timestamp) + val newBuckets = + stepped.buckets.lastOption match { + case None => Vector(BucketSeq.one(timestamp)) + case Some(bucketSeq) => stepped.buckets.init :+ (bucketSeq + timestamp) + } + stepped.copy( + total = total + 1, + buckets = BucketSeq.normalize(conf.maxBuckets, newBuckets)) + } + + // Stupid implementation of `add` - just inc a bunch of times with + // the same timestamp. + def add(i: Long, timestamp: Long): ExpHist = + (0L until i).foldLeft(step(timestamp)) { + case (acc, _) => acc.inc(timestamp) + } + + def lowerBoundSum: Long = total - last + def upperBoundSum: Long = total + + // For testing. Returns the vector of bucket sizes from largest to + // smallest. 
+ def windows: Vector[Long] = + for { + b <- buckets + t <- b.timestamps + } yield b.bucketSize +} + +/** + * "compressed" representation of `timestamps.length` buckets of size + * `exp`. + * + * Bucket timestamps are sorted in time-increasing order (so the + * oldest bucket is at the head). + */ +case class BucketSeq(exp: BucketSeq.Pow2, timestamps: Vector[Long]) { l => + require(timestamps.sorted == timestamps) + require(timestamps.nonEmpty) + + // bucketSize, as defined in the paper. + def bucketSize: Long = exp.value + + // Total number of ticks recorded in this BucketSeq. + def count: Long = bucketSize * length + + // Total number of buckets tracked by the BucketSeq. + def length: Int = timestamps.length + + // Add a new timestamp. TODO: NOT sure if it's legit to not require + // that the new timestamp be > all existing timestamps. + def +(ts: Long): BucketSeq = copy(timestamps = (timestamps :+ ts).sorted) + + def ++(r: BucketSeq): BucketSeq = { + require(l.exp == r.exp) + copy(timestamps = (l.timestamps ++ r.timestamps).sorted) + } + + /** + * Remove all timestamps <= the cutoff. Returns: + * + * - None if the resulting [[BucketSeq]] is empty, + * - Some(the filtered [[BucketSeq]]) otherwise. + */ + def expire(cutoff: Long): Option[BucketSeq] = + Some(timestamps.filter(_ > cutoff)) + .filter(_.nonEmpty) + .map { v => copy(timestamps = v) } + + /** + * Returns the number of pairs to drop to get this BucketSeq's final + * length <= the supplied limit. + */ + private[this] def pairsToDrop(limit: Int): Int = + 0 max math.ceil((length - limit) / 2.0).toInt + + /** + * if this.length <= limit, returns (this, None). + * + * else, splits this BucketSeq into two by combining pairs of + * timestamps into new buckets until enough timestamps have been + * removed that this.length <= limit. + * + * The new pairs form a new [[BucketSeq]] instance with a doubled + * bucketSize. 
+ * + * The return value in this case is a pair of the new, slimmed-down + * current BucketSeq and Some(the new, doubled BucketSeq). + */ + def evolve(limit: Int): (BucketSeq, Option[BucketSeq]) = + pairsToDrop(limit) match { + case 0 => (this, None) + case pairs => + val childTs = (0 until pairs).map(i => timestamps(i * 2 + 1)) + val child = Some(BucketSeq(exp.double, childTs.toVector)) + (copy(timestamps = timestamps.drop(pairs * 2)), child) + } + + /** + * Expands this BucketSeq out into a vector of BucketSeq instances + * that all have lengths <= the supplied limit. (Used on the oldest + * bucket after normalizing a sequence of BucketSeqs.) + */ + def expand(limit: Int): Vector[BucketSeq] = + evolve(limit) match { + case (bs, None) => Vector(bs) + case (bs, Some(remaining)) => remaining.expand(limit) :+ bs + } +} + +object BucketSeq { + case class Pow2(exp: Int) extends AnyVal { l => + def double: Pow2 = Pow2(exp + 1) + def value: Long = 1 << exp + def *(r: Pow2) = Pow2(l.exp + r.exp) + } + + // Returns a singleton bucketseq. + def one(timestamp: Long): BucketSeq = BucketSeq(Pow2(0), Vector(timestamp)) + + // evolves all BucketSeq instances in `seqs` from newest -> oldest + // until every [[BucketSeq]] length is <= the supplied limit. 
+ def normalize(limit: Int, seqs: Vector[BucketSeq]): Vector[BucketSeq] = { + val empty = (Vector.empty[BucketSeq], None: Option[BucketSeq]) + + val (ret, extra) = seqs.foldRight(empty) { + case (bs, (acc, optCarry)) => + val withCarry = optCarry.map(_ ++ bs).getOrElse(bs) + val (evolved, carry) = withCarry.evolve(limit) + (evolved +: acc, carry) + } + extra.map(_.expand(limit) ++ ret).getOrElse(ret) + } +} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala new file mode 100644 index 000000000..4aa2fe729 --- /dev/null +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -0,0 +1,164 @@ +package com.twitter.algebird + +import org.scalatest.{ PropSpec, Matchers } +import org.scalatest.prop.PropertyChecks +import org.scalatest.prop.Checkers.check +import org.scalacheck.{ Gen, Arbitrary } +import Arbitrary.arbitrary + +case class PosNum[T: Numeric](value: T) + +object PosNum { + implicit def arb[T: Numeric: Gen.Choose]: Arbitrary[PosNum[T]] = + Arbitrary(for { p <- Gen.posNum[T] } yield PosNum(p)) +} + +case class Tick(count: Long, timestamp: Long) + +object Tick { + implicit val ord: Ordering[Tick] = + Ordering.by { t: Tick => (t.timestamp, t.count) } + + implicit val arb: Arbitrary[Tick] = + Arbitrary(for { + ts <- Gen.posNum[Long] + tick <- Gen.posNum[Long] + } yield Tick(tick - 1L, ts)) +} + +case class NonEmptyList[T](items: List[T]) { + def sorted(implicit ev: Ordering[T]): List[T] = items.sorted +} + +object NonEmptyList { + implicit def arb[T: Ordering: Arbitrary]: Arbitrary[NonEmptyList[T]] = + Arbitrary(for { + l <- Gen.nonEmptyListOf(arbitrary[T]) + } yield NonEmptyList[T](l)) +} + +class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { + import BaseProperties._ + + implicit val conf: Arbitrary[ExpHist.Config] = + Arbitrary(for { + k <- Gen.posNum[Short] + windowSize <- Gen.posNum[Long] + } yield ExpHist.Config(k, 
windowSize)) + + def addAll(e: ExpHist, ticks: List[Tick]): ExpHist = + ticks.foldLeft(e) { + case (e, Tick(count, timestamp)) => + e.add(count, timestamp) + } + + implicit val expHist: Arbitrary[ExpHist] = + Arbitrary(for { + ticks <- arbitrary[List[Tick]] + conf <- arbitrary[ExpHist.Config] + } yield addAll(ExpHist.empty(conf), ticks)) + + // The example in the paper is actually busted, based on his + // algorithm. He says to assume that k/2 == 2, but then he promotes + // as if k/2 == 1, ie k == 2. + property("example from paper") { + val e = ExpHist.empty(2, 100) + val plus76 = e.add(76, 0) + + val inc = plus76.inc(0) + val incinc = inc.add(2, 0) + + plus76.windows shouldBe Vector(32, 16, 8, 8, 4, 4, 2, 1, 1) + inc.windows shouldBe Vector(32, 16, 8, 8, 4, 4, 2, 2, 1) + incinc.windows shouldBe Vector(32, 16, 16, 8, 4, 2, 1) + } + + property("adding i results in upperBoundSum == i") { + forAll { (conf: ExpHist.Config, tick: Tick) => + assert(ExpHist.empty(conf) + .add(tick.count, tick.timestamp) + .upperBoundSum == tick.count) + } + } + + property("step should be idempotent") { + forAll { (expHist: ExpHist, tick: Tick) => + val ts = tick.timestamp + val stepped = expHist.step(ts) + assert(stepped == stepped.step(ts)) + + val added = expHist.add(tick.count, ts) + val addThenStep = added.step(ts) + val stepThenAdd = stepped.add(tick.count, ts) + + assert(added == addThenStep) + assert(added == stepThenAdd) + assert(addThenStep == stepThenAdd) + } + } + + property("stepping is the same as adding 0 at the same ts") { + forAll { (expHist: ExpHist, ts: PosNum[Long]) => + assert(expHist.step(ts.value) == expHist.add(0, ts.value)) + } + } + + // This is the meat! 
+ property("bounded error of the query") { + forAll { (items: NonEmptyList[Tick], conf: ExpHist.Config) => + val ticks = items.sorted + + val full = addAll(ExpHist.empty(conf), ticks) + + val finalTs = ticks.last.timestamp + + val actualSum = ticks.collect { + case Tick(count, ts) if ts > (finalTs - conf.windowSize) => count + }.sum + + val upperBound = full.upperBoundSum + val lowerBound = full.lowerBoundSum + assert(lowerBound <= actualSum) + assert(upperBound >= actualSum) + + val maxOutsideWindow = full.last - 1 + val minInsideWindow = 1 + full.total - full.last + val absoluteError = maxOutsideWindow / 2.0 + val relativeError = absoluteError / minInsideWindow + assert(ExpHist.relativeError(full) <= 1.0 / conf.k) + } + } + + property("add and inc should generate the same results") { + forAll { (tick: Tick, conf: ExpHist.Config) => + val e = ExpHist.empty(conf) + + val incs = (0L until tick.count).foldLeft(e) { + case (acc, _) => acc.inc(tick.timestamp) + } + + val adds = e.add(tick.count, tick.timestamp) + + assert(incs.total == adds.total) + assert(incs.lowerBoundSum == adds.lowerBoundSum) + assert(incs.upperBoundSum == adds.upperBoundSum) + } + } + + property("l-canonical representation round-trips") { + forAll { (i: PosNum[Long], k: PosNum[Short]) => + assert(ExpHist.expand(ExpHist.lNormalize(i.value, k.value)) == i.value) + } + } + + property("all i except last have either k/2, k/2 + 1 buckets") { + forAll { (i: PosNum[Long], k: PosNum[Short]) => + val lower = ExpHist.minBuckets(k.value) + val upper = ExpHist.maxBuckets(k.value) + assert( + ExpHist.lNormalize(i.value, k.value).init.forall { numBuckets => + lower <= numBuckets && numBuckets <= upper + }) + } + } +} From 0a6234787a83b751ef6485b8782b1957eccc0f26 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Mon, 31 Oct 2016 13:03:19 -0600 Subject: [PATCH 02/17] better impl working --- .../scala/com/twitter/algebird/ExpHist.scala | 364 ++++++++---------- .../com/twitter/algebird/ExpHistLaws.scala | 36 +- 2 
files changed, 194 insertions(+), 206 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 4f10b02ef..4ff1e219a 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -1,255 +1,221 @@ package com.twitter.algebird +import java.lang.{ Long => JLong } +import scala.annotation.tailrec + /** * Exponential Histogram algorithm from * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf. - * - * - * Next step - code up the l-canonical representation of numbers, - * then do some of the fancier business in the paper! */ - object ExpHist { - private[this] def log2(i: Float): Double = math.log(i) / math.log(2) + import Canonical.Exp2 - def minBuckets(k: Int) = math.ceil(k / 2.0).toInt - def maxBuckets(k: Int) = minBuckets(k) + 1 + // Same as math.ceil(x / 2.0).toInt + def div2Ceil(x: Int): Int = (x >> 1) + (if ((x & 1) == 0) 0 else 1) - /** - * returns a vector of the total number of buckets of size `s^i`, - * where `i` is the vector index, used to represent s for a given - * k. - * - * `s` is the "sum of the sizes of all of the buckets tracked by - * the ExpHist instance. - * - * `k` is `math.ceil(epsilon / 2)`, where epsilon is the relative - * error of `s`. - */ - def lNormalize(s: Long, k: Int): Vector[Int] = { - val l = minBuckets(k) - val j = log2(s / l + 1).toInt - // returns the little-endian bit rep of the supplied number. - def binarize(sh: Long): Vector[Int] = - ((0 until j).map { i => l + ((sh.toInt >> i) % 2) }).toVector - - val twoJ = 1 << j - val sPrime = s - (twoJ - 1) * l - - if (sPrime >= twoJ) { - val m = (sPrime >> j) - val sHat = (sPrime - m * twoJ) - binarize(sHat) :+ m.toInt - } else binarize(sPrime) - } - - /** - * Expand out a number's l-normalized form into the original - * number. 
- */ - def expand(form: Vector[Int]): Long = - form.zipWithIndex - .map { case (i, exp) => i.toLong << exp } - .reduce(_ + _) - - def relativeError(e: ExpHist): Double = - if (e.total == 0) 0.0 - else { - val maxOutsideWindow = e.last - 1 - val minInsideWindow = 1 + e.total - e.last - val absoluteError = maxOutsideWindow / 2.0 - absoluteError / minInsideWindow - } - - case class Config(k: Int, windowSize: Long) { - // Maximum number of buckets of size 2^i allowed in the repr of - // this exponential histogram. - def maxBuckets: Int = ExpHist.maxBuckets(k) + case class Config(epsilon: Double, windowSize: Long) { + val l: Int = div2Ceil(math.ceil(1 / epsilon).toInt) // Returns the last timestamp before the window. any ts <= [the // returned timestamp] is outside the window. def expiration(currTime: Long): Long = currTime - windowSize } - def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L, 0L) - def empty(k: Int, windowSize: Long): ExpHist = empty(Config(k, windowSize)) -} + def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, Vector.empty, 0L) + + def from(i: Long, ts: Long, conf: Config): ExpHist = { + val sizes = Canonical.fromLong(i, conf.l) + ExpHist(conf, Vector.fill(sizes.sum)(ts), sizes, ts) + } -/** - * @param total counter for the total size of all buckets. - * @param last the size of the oldest bucket. - */ -case class ExpHist(conf: ExpHist.Config, buckets: Vector[BucketSeq], time: Long, last: Long, total: Long) { /** - * Returns the same ExpHist with a new window. If the new window is - * smaller than the current window, evicts older items. + * Takes a vector of timestamps sorted in descending order (newest + * first) and a cutoff and returns a vector containing only + * timestamps AFTER the cutoff. 
*/ - def withWindow(newWindow: Long): ExpHist = copy(conf = conf.copy(windowSize = newWindow)).step(time) + def dropExpired(timestamps: Vector[Long], cutoff: Long): Vector[Long] = + timestamps.reverse.dropWhile(_ <= cutoff).reverse /** - * Step forward to `newTime`, evicting any wrapped buckets that - * fall outside of the window. + * @param x total count to remove from the head of `input`. + * @param input pairs of (count, T). + * @return Vector with x total items removed from the head; if + * an element wasn't fully consumed, the remainder will be + * stuck back onto the head. */ - def step(newTime: Long): ExpHist = - if (newTime <= time) this - else { - val t = conf.expiration(newTime) - - // TODO this is junk, but now the madness is contained. - val newBuckets = buckets.flatMap(_.expire(t)) - val newLast = newBuckets.headOption.map(_.bucketSize).getOrElse(0L) - val newTotal = newBuckets.map(_.count).reduceLeftOption(_ + _).getOrElse(0L) + @tailrec private[this] def take[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = { + val (count, t) +: tail = input + (x - count) match { + case 0 => tail + case x if x < 0 => (-x, t) +: tail + case x if x > 0 => take(x, tail) + } + } - copy( - buckets = buckets.flatMap(_.expire(t)), - time = newTime, total = newTotal, last = newLast - ) + @tailrec def take2[T](x: Long, input: Vector[T])(from: T => Long)(to: (Long, T) => T): Vector[T] = { + val head +: tail = input + val count = from(head) + (x - count) match { + case 0 => tail + case x if x < 0 => to(-x, head) +: tail + case x if x > 0 => take2(x, tail)(from)(to) } + } /** - * "inc" algorithm: - * - * * Step forward to the new timestamp - * * create a new bucket with size 1 and timestamp - * * Traverse the list of buckets in order of increasing sizes. - * - * If there are (k / 2 + 2) or more buckets of the same size, merge - * the oldest two of these buckets into a single bucket of double - * the size. Repeat until the bucket size is less than the limit. 
+ * Rebuckets the vector of inputs into buckets of the supplied + * sequence of sizes. Returns only the associated `T` information + * (timestamp, for example). */ - def inc(timestamp: Long): ExpHist = { - val stepped = step(timestamp) - val newBuckets = - stepped.buckets.lastOption match { - case None => Vector(BucketSeq.one(timestamp)) - case Some(bucketSeq) => stepped.buckets.init :+ (bucketSeq + timestamp) - } - stepped.copy( - total = total + 1, - buckets = BucketSeq.normalize(conf.maxBuckets, newBuckets)) + def rebucket[T](v: Vector[(Long, T)], buckets: Vector[Exp2]): Vector[T] = { + if (buckets.isEmpty) Vector.empty + else { + val exp2 +: tail = buckets + val remaining = take(exp2.value, v) // (_._1) { (l, pair) => (l + pair._1, pair._2) } + v.head._2 +: rebucket(remaining, tail) + } } +} - // Stupid implementation of `add` - just inc a bunch of times with - // the same timestamp. - def add(i: Long, timestamp: Long): ExpHist = - (0L until i).foldLeft(step(timestamp)) { - case (acc, _) => acc.inc(timestamp) +// TODO: Interesting that relative error only depends on sizes, not on +// timestamps. You could totally get a relative error off of some +// number without ever going through this business, then use that to +// decide when to evict stuff from the bucket. +case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector[Int], time: Long) { + def step(newTime: Long): ExpHist = + if (newTime <= time) this + else { + val filtered = ExpHist.dropExpired(timestamps, conf.expiration(newTime)) + val bucketsDropped = timestamps.size - filtered.size + copy( + time = newTime, + timestamps = filtered, + sizes = Canonical.dropBiggest(sizes, bucketsDropped)) } + // Efficient implementation of add. To make this solid we'll want to + // keep a buffer of new items and only add when the buffer expires. 
+ def add(i: Long, timestamp: Long): ExpHist = { + val self = step(timestamp) + if (i == 0) self + else { + val newSizes = Canonical.fromLong(self.total + i, conf.l) + val bucketSizes = Canonical.toBuckets(self.sizes).map(_.value) + val inputs = (i, timestamp) +: (bucketSizes zip self.timestamps) + self.copy( + timestamps = ExpHist.rebucket(inputs, Canonical.toBuckets(newSizes)), + sizes = newSizes) + } + } + + def inc(timestamp: Long): ExpHist = add(1L, timestamp) + + def last: Long = 1 << (sizes.size - 1) + + def total: Long = Canonical.expand(sizes) + def lowerBoundSum: Long = total - last + def upperBoundSum: Long = total - // For testing. Returns the vector of bucket sizes from largest to - // smallest. + def guess: Double = total - (last - 1) / 2.0 + + def relativeError: Double = + if (total == 0) 0.0 + else { + val minInsideWindow = total + 1 - last + val absoluteError = (last - 1) / 2.0 + absoluteError / minInsideWindow + } + + /** + * Returns the same ExpHist with a new window. If the new window is + * smaller than the current window, evicts older items. + */ + def withWindow(newWindow: Long): ExpHist = + copy(conf = conf.copy(windowSize = newWindow)).step(time) + + // Returns the vector of bucket sizes from largest to smallest. def windows: Vector[Long] = for { - b <- buckets - t <- b.timestamps - } yield b.bucketSize + (numBuckets, idx) <- sizes.zipWithIndex.reverse + bucket <- List.fill(numBuckets)(1L << idx) + } yield bucket } -/** - * "compressed" representation of `timestamps.length` buckets of size - * `exp`. - * - * Bucket timestamps are sorted in time-increasing order (so the - * oldest bucket is at the head). - */ -case class BucketSeq(exp: BucketSeq.Pow2, timestamps: Vector[Long]) { l => - require(timestamps.sorted == timestamps) - require(timestamps.nonEmpty) - - // bucketSize, as defined in the paper. - def bucketSize: Long = exp.value - - // Total number of ticks recorded in this BucketSeq. 
- def count: Long = bucketSize * length +object Canonical { + case class Exp2(exp: Int) extends AnyVal { l => + def double: Exp2 = Exp2(exp + 1) + def value: Long = 1 << exp + } - // Total number of buckets tracked by the BucketSeq. - def length: Int = timestamps.length + @inline private[this] def prevPowerOfTwo(x: Long): Int = + JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) - // Add a new timestamp. TODO: NOT sure if it's legit to not require - // that the new timestamp be > all existing timestamps. - def +(ts: Long): BucketSeq = copy(timestamps = (timestamps :+ ts).sorted) + @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) + @inline private[this] def quotient(i: Int, exp2: Int): Int = i >> exp2 + @inline private[this] def bit(i: Int, idx: Int): Int = (i >> idx) & 1 - def ++(r: BucketSeq): BucketSeq = { - require(l.exp == r.exp) - copy(timestamps = (l.timestamps ++ r.timestamps).sorted) - } + private[this] def binarize(i: Int, bits: Int, offset: Int): Vector[Int] = + (0 until bits).map { idx => offset + bit(i, idx) }.toVector /** - * Remove all timestamps <= the cutoff. Returns: + * returns a vector of the total number of buckets of size `s^i`, + * where `i` is the vector index, used to represent s for a given + * k. + * + * `s` is the "sum of the sizes of all of the buckets tracked by + * the ExpHist instance. * - * - None if the resulting [[BucketSeq]] is empty, - * - Some(the filtered [[BucketSeq]]) otherwise. + * `l` is... well, gotta explain that based on the paper. 
*/ - def expire(cutoff: Long): Option[BucketSeq] = - Some(timestamps.filter(_ > cutoff)) - .filter(_.nonEmpty) - .map { v => copy(timestamps = v) } + def fromLong(s: Long, l: Int): Vector[Int] = { + val num = s + l + val denom = l + 1 + val j = prevPowerOfTwo(num / denom) + val offset = (num - (denom << j)).toInt + binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1) + } /** - * Returns the number of pairs to drop to get this BucketSeq's final - * length <= the supplied limit. + * Total number of buckets required to represent this number. */ - private[this] def pairsToDrop(limit: Int): Int = - 0 max math.ceil((length - limit) / 2.0).toInt + def bucketsRequired(s: Long, l: Short): Int = fromLong(s, l).sum /** - * if this.length <= limit, returns (this, None). - * - * else, splits this BucketSeq into two by combining pairs of - * timestamps into new buckets until enough timestamps have been - * removed that this.length <= limit. - * - * The new pairs form a new [[BucketSeq]] instance with a doubled - * bucketSize. - * - * The return value in this case is a pair of the new, slimmed-down - * current BucketSeq and Some(the new, doubled BucketSeq). + * Expand out a number's l-canonical form into the original number. 
*/ - def evolve(limit: Int): (BucketSeq, Option[BucketSeq]) = - pairsToDrop(limit) match { - case 0 => (this, None) - case pairs => - val childTs = (0 until pairs).map(i => timestamps(i * 2 + 1)) - val child = Some(BucketSeq(exp.double, childTs.toVector)) - (copy(timestamps = timestamps.drop(pairs * 2)), child) + def expand(rep: Vector[Int]): Long = + if (rep.isEmpty) 0L + else { + rep.iterator.zipWithIndex + .map { case (i, exp) => i.toLong << exp } + .reduce(_ + _) } + // Expands out the compressed form to a list of the bucket sizes (sized with exponent only) + // + // TODO: law - toBuckets(rep).size == rep.sum + def toBuckets(rep: Vector[Int]): Vector[Exp2] = + rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(Exp2(exp)) } + /** - * Expands this BucketSeq out into a vector of BucketSeq instances - * that all have lengths <= the supplied limit. (Used on the oldest - * bucket after normalizing a sequence of BucketSeqs.) + * Given a canonical representation, takes a number of buckets to + * drop (starting with the largest buckets!) and removes that + * number of buckets from the end. + * + * (The sum of `canonical` is the total number of buckets in the + * representation.) */ - def expand(limit: Int): Vector[BucketSeq] = - evolve(limit) match { - case (bs, None) => Vector(bs) - case (bs, Some(remaining)) => remaining.expand(limit) :+ bs + @tailrec def dropBiggest(canonical: Vector[Int], bucketsToDrop: Int): Vector[Int] = + (canonical, bucketsToDrop) match { + case (l, 0) => l + case (l @ (init :+ last), toDrop) => + (toDrop - last) match { + case 0 => init + case x if x < 0 => init :+ -x + case x if x > 0 => dropBiggest(init, x) + } + case _ => Vector.empty[Int] } } - -object BucketSeq { - case class Pow2(exp: Int) extends AnyVal { l => - def double: Pow2 = Pow2(exp + 1) - def value: Long = 1 << exp - def *(r: Pow2) = Pow2(l.exp + r.exp) - } - - // Returns a singleton bucketseq. 
- def one(timestamp: Long): BucketSeq = BucketSeq(Pow2(0), Vector(timestamp)) - - // evolves all BucketSeq instances in `seqs` from newest -> oldest - // until every [[BucketSeq]] length is <= the supplied limit. - def normalize(limit: Int, seqs: Vector[BucketSeq]): Vector[BucketSeq] = { - val empty = (Vector.empty[BucketSeq], None: Option[BucketSeq]) - - val (ret, extra) = seqs.foldRight(empty) { - case (bs, (acc, optCarry)) => - val withCarry = optCarry.map(_ ++ bs).getOrElse(bs) - val (evolved, carry) = withCarry.evolve(limit) - (evolved +: acc, carry) - } - extra.map(_.expand(limit) ++ ret).getOrElse(ret) - } -} diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index 4aa2fe729..58b6c359e 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -44,7 +44,7 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { Arbitrary(for { k <- Gen.posNum[Short] windowSize <- Gen.posNum[Long] - } yield ExpHist.Config(k, windowSize)) + } yield ExpHist.Config(1 / k.toDouble, windowSize)) def addAll(e: ExpHist, ticks: List[Tick]): ExpHist = ticks.foldLeft(e) { @@ -62,7 +62,7 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { // algorithm. He says to assume that k/2 == 2, but then he promotes // as if k/2 == 1, ie k == 2. 
property("example from paper") { - val e = ExpHist.empty(2, 100) + val e = ExpHist.empty(ExpHist.Config(0.5, 100)) val plus76 = e.add(76, 0) val inc = plus76.inc(0) @@ -81,6 +81,14 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { } } + property("empty.add and from are identical") { + forAll { (conf: ExpHist.Config, tick: Tick) => + val byAdd = ExpHist.empty(conf).add(tick.count, tick.timestamp) + val byFrom = ExpHist.from(tick.count, tick.timestamp, conf) + assert(byAdd == byFrom) + } + } + property("step should be idempotent") { forAll { (expHist: ExpHist, tick: Tick) => val ts = tick.timestamp @@ -125,7 +133,15 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { val minInsideWindow = 1 + full.total - full.last val absoluteError = maxOutsideWindow / 2.0 val relativeError = absoluteError / minInsideWindow - assert(ExpHist.relativeError(full) <= 1.0 / conf.k) + + // local relative error calc: + assert(relativeError <= conf.epsilon) + + // Instance's relative error calc: + assert(full.relativeError <= conf.epsilon) + + // Error can never be above 0.5. 
+ assert(full.relativeError <= 0.5) } } @@ -147,18 +163,24 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { property("l-canonical representation round-trips") { forAll { (i: PosNum[Long], k: PosNum[Short]) => - assert(ExpHist.expand(ExpHist.lNormalize(i.value, k.value)) == i.value) + assert(Canonical.expand(Canonical.fromLong(i.value, k.value)) == i.value) } } property("all i except last have either k/2, k/2 + 1 buckets") { forAll { (i: PosNum[Long], k: PosNum[Short]) => - val lower = ExpHist.minBuckets(k.value) - val upper = ExpHist.maxBuckets(k.value) + val lower = k.value + val upper = lower + 1 assert( - ExpHist.lNormalize(i.value, k.value).init.forall { numBuckets => + Canonical.fromLong(i.value, k.value).init.forall { numBuckets => lower <= numBuckets && numBuckets <= upper }) } } + + property("impl of `div2Ceil` matches the simpler impl") { + forAll { i: Int => + assert(ExpHist.div2Ceil(i) == math.ceil(i / 2.0).toInt) + } + } } From a4a3731d3efe7c6331be47f37af5f97a8e89a738 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Mon, 31 Oct 2016 16:09:59 -0600 Subject: [PATCH 03/17] working again --- .../scala/com/twitter/algebird/ExpHist.scala | 101 ++++++++++++------ 1 file changed, 67 insertions(+), 34 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 4ff1e219a..3ecef74a9 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -36,32 +36,6 @@ object ExpHist { def dropExpired(timestamps: Vector[Long], cutoff: Long): Vector[Long] = timestamps.reverse.dropWhile(_ <= cutoff).reverse - /** - * @param x total count to remove from the head of `input`. - * @param input pairs of (count, T). - * @return Vector with x total items removed from the head; if - * an element wasn't fully consumed, the remainder will be - * stuck back onto the head. 
- */ - @tailrec private[this] def take[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = { - val (count, t) +: tail = input - (x - count) match { - case 0 => tail - case x if x < 0 => (-x, t) +: tail - case x if x > 0 => take(x, tail) - } - } - - @tailrec def take2[T](x: Long, input: Vector[T])(from: T => Long)(to: (Long, T) => T): Vector[T] = { - val head +: tail = input - val count = from(head) - (x - count) match { - case 0 => tail - case x if x < 0 => to(-x, head) +: tail - case x if x > 0 => take2(x, tail)(from)(to) - } - } - /** * Rebuckets the vector of inputs into buckets of the supplied * sequence of sizes. Returns only the associated `T` information @@ -71,7 +45,7 @@ object ExpHist { if (buckets.isEmpty) Vector.empty else { val exp2 +: tail = buckets - val remaining = take(exp2.value, v) // (_._1) { (l, pair) => (l + pair._1, pair._2) } + val remaining = Canonical.drop(exp2.value, v) v.head._2 +: rebucket(remaining, tail) } } @@ -90,7 +64,7 @@ case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector copy( time = newTime, timestamps = filtered, - sizes = Canonical.dropBiggest(sizes, bucketsDropped)) + sizes = Canonical.dropBiggest(bucketsDropped, sizes)) } // Efficient implementation of add. To make this solid we'll want to @@ -99,8 +73,15 @@ case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector val self = step(timestamp) if (i == 0) self else { + // calculate canonical representation of the NEW total. val newSizes = Canonical.fromLong(self.total + i, conf.l) + + // Vector[TotalSizeOfEachBucket] + // Vector(3,3,2) expands out to + // Vector((1, ts),(1, ts),.... 
1,2,2,2,4,4) == bucketSizes val bucketSizes = Canonical.toBuckets(self.sizes).map(_.value) + + // this is how we'd implement addAll: val inputs = (i, timestamp) +: (bucketSizes zip self.timestamps) self.copy( timestamps = ExpHist.rebucket(inputs, Canonical.toBuckets(newSizes)), @@ -110,7 +91,7 @@ case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector def inc(timestamp: Long): ExpHist = add(1L, timestamp) - def last: Long = 1 << (sizes.size - 1) + def last: Long = if (sizes.isEmpty) 0L else (1 << (sizes.size - 1)) def total: Long = Canonical.expand(sizes) @@ -149,7 +130,7 @@ object Canonical { def value: Long = 1 << exp } - @inline private[this] def prevPowerOfTwo(x: Long): Int = + @inline private[this] def floorPowerOfTwo(x: Long): Int = JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) @@ -168,13 +149,49 @@ object Canonical { * the ExpHist instance. * * `l` is... well, gotta explain that based on the paper. + * + * 1 + * 11 + * 111 + * 112 + * 1112 + * 1122 + * + * 1 + * 2 + * 3 + * 2 1 + * 3 1 + * 2 2 + * 3 2 + * 2 3 + * 3 3 + * 2 2 1 + * 3 2 1 + * 2 3 1 + * 3 3 1 + * 2 2 2 + * 3 2 2 + * 2 3 2 + * 3 3 2 */ def fromLong(s: Long, l: Int): Vector[Int] = { + if (s <= 0) Vector.empty + else { + val num = s + l + val denom = l + 1 + val j = floorPowerOfTwo(num / denom) + val offset = (num - (denom << j)).toInt + binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1) + } + } + + def lastBucketSize(s: Long, l: Int): Int = { val num = s + l val denom = l + 1 - val j = prevPowerOfTwo(num / denom) + val j = floorPowerOfTwo(num / denom) val offset = (num - (denom << j)).toInt - binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1) + (quotient(offset, j) + 1) } /** @@ -207,15 +224,31 @@ object Canonical { * (The sum of `canonical` is the total number of buckets in the * representation.) 
*/ - @tailrec def dropBiggest(canonical: Vector[Int], bucketsToDrop: Int): Vector[Int] = + @tailrec def dropBiggest(bucketsToDrop: Int, canonical: Vector[Int]): Vector[Int] = (canonical, bucketsToDrop) match { case (l, 0) => l case (l @ (init :+ last), toDrop) => (toDrop - last) match { case 0 => init case x if x < 0 => init :+ -x - case x if x > 0 => dropBiggest(init, x) + case x if x > 0 => dropBiggest(x, init) } case _ => Vector.empty[Int] } + + /** + * @param x total count to remove from the head of `input`. + * @param input pairs of (count, T). + * @return Vector with x total items removed from the head; if + * an element wasn't fully consumed, the remainder will be + * stuck back onto the head. + */ + @tailrec def drop[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = { + val (count, t) +: tail = input + (x - count) match { + case 0 => tail + case x if x < 0 => (-x, t) +: tail + case x if x > 0 => drop(x, tail) + } + } } From f6df59218850f259229c519a8c6661abbb4b72bd Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Mon, 31 Oct 2016 16:16:11 -0600 Subject: [PATCH 04/17] get drop impl together --- .../scala/com/twitter/algebird/ExpHist.scala | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 3ecef74a9..5eb62d3ff 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -64,7 +64,7 @@ case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector copy( time = newTime, timestamps = filtered, - sizes = Canonical.dropBiggest(bucketsDropped, sizes)) + sizes = Canonical.dropBiggest(bucketsDropped, sizes.reverse).reverse) } // Efficient implementation of add. To make this solid we'll want to @@ -225,13 +225,13 @@ object Canonical { * representation.) 
*/ @tailrec def dropBiggest(bucketsToDrop: Int, canonical: Vector[Int]): Vector[Int] = - (canonical, bucketsToDrop) match { - case (l, 0) => l - case (l @ (init :+ last), toDrop) => - (toDrop - last) match { - case 0 => init - case x if x < 0 => init :+ -x - case x if x > 0 => dropBiggest(x, init) + (bucketsToDrop, canonical) match { + case (0, l) => l + case (toDrop, count +: tail) => + (toDrop - count) match { + case 0 => tail + case x if x < 0 => -x +: tail + case x if x > 0 => dropBiggest(x, tail) } case _ => Vector.empty[Int] } @@ -243,12 +243,15 @@ object Canonical { * an element wasn't fully consumed, the remainder will be * stuck back onto the head. */ - @tailrec def drop[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = { - val (count, t) +: tail = input - (x - count) match { - case 0 => tail - case x if x < 0 => (-x, t) +: tail - case x if x > 0 => drop(x, tail) + @tailrec def drop[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = + (x, input) match { + case (0, input) => input + case (toDrop, (count, t) +: tail) => + (toDrop - count) match { + case 0 => tail + case x if x < 0 => (-x, t) +: tail + case x if x > 0 => drop(x, tail) + } + case _ => Vector.empty[(Long, T)] } - } } From 370abf928e3996fa3d7a7475018f9895a78cfc14 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Mon, 31 Oct 2016 20:47:51 -0600 Subject: [PATCH 05/17] checkpoint --- .../src/main/scala/com/twitter/algebird/ExpHist.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 5eb62d3ff..5f743a520 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -11,7 +11,8 @@ object ExpHist { import Canonical.Exp2 // Same as math.ceil(x / 2.0).toInt - def div2Ceil(x: Int): Int = (x >> 1) + (if ((x & 1) == 0) 0 else 1) + @inline def 
div2Ceil(x: Int): Int = + (x >>> 1) + (if ((x & 1) == 0) 0 else case) case class Config(epsilon: Double, windowSize: Long) { val l: Int = div2Ceil(math.ceil(1 / epsilon).toInt) @@ -134,8 +135,8 @@ object Canonical { JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) - @inline private[this] def quotient(i: Int, exp2: Int): Int = i >> exp2 - @inline private[this] def bit(i: Int, idx: Int): Int = (i >> idx) & 1 + @inline private[this] def quotient(i: Int, exp2: Int): Int = i >>> exp2 + @inline private[this] def bit(i: Int, idx: Int): Int = (i >>> idx) & 1 private[this] def binarize(i: Int, bits: Int, offset: Int): Vector[Int] = (0 until bits).map { idx => offset + bit(i, idx) }.toVector From 48fbcc92dbe65523bea36fc4ebd6810568aa4af2 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 1 Nov 2016 08:34:48 -0600 Subject: [PATCH 06/17] convert to buckets with addAll --- .../scala/com/twitter/algebird/ExpHist.scala | 249 +++++++----------- .../com/twitter/algebird/ExpHistLaws.scala | 179 +++++++------ 2 files changed, 199 insertions(+), 229 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 5f743a520..eb7fcec91 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -8,129 +8,140 @@ import scala.annotation.tailrec * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf. 
*/ object ExpHist { - import Canonical.Exp2 + case class Bucket(size: Long, timestamp: Long) - // Same as math.ceil(x / 2.0).toInt - @inline def div2Ceil(x: Int): Int = - (x >>> 1) + (if ((x & 1) == 0) 0 else case) + object Bucket { + implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } + } case class Config(epsilon: Double, windowSize: Long) { - val l: Int = div2Ceil(math.ceil(1 / epsilon).toInt) + val k: Int = math.ceil(1 / epsilon).toInt + val l: Int = math.ceil(k / 2.0).toInt // Returns the last timestamp before the window. any ts <= [the // returned timestamp] is outside the window. def expiration(currTime: Long): Long = currTime - windowSize + def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = + ExpHist.dropExpired(buckets, expiration(currTime)) } - def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, Vector.empty, 0L) + /** + * Create an empty instance with the supplied Config. + */ + def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) + /** + * Create an instance from a number. + */ def from(i: Long, ts: Long, conf: Config): ExpHist = { - val sizes = Canonical.fromLong(i, conf.l) - ExpHist(conf, Vector.fill(sizes.sum)(ts), sizes, ts) + val buckets = Canonical.longToBuckets(i, conf.l).map(Bucket(_, ts)) + ExpHist(conf, buckets, i, ts) } /** - * Takes a vector of timestamps sorted in descending order (newest - * first) and a cutoff and returns a vector containing only - * timestamps AFTER the cutoff. + * Takes a vector of buckets sorted in descending order (newest + * first) and a cutoff and returns + * + * - The sum of the sizes of the dropped buckets + * - a vector containing only buckets with timestamps AFTER the cutoff. 
*/ - def dropExpired(timestamps: Vector[Long], cutoff: Long): Vector[Long] = - timestamps.reverse.dropWhile(_ <= cutoff).reverse + def dropExpired(timestamps: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { + val (dropped, remaining) = timestamps.reverse.span(_.timestamp <= cutoff) + (dropped.map(_.size).sum, remaining.reverse) + } /** * Rebuckets the vector of inputs into buckets of the supplied * sequence of sizes. Returns only the associated `T` information * (timestamp, for example). */ - def rebucket[T](v: Vector[(Long, T)], buckets: Vector[Exp2]): Vector[T] = { - if (buckets.isEmpty) Vector.empty + def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = + if (desired.isEmpty) Vector.empty else { - val exp2 +: tail = buckets - val remaining = Canonical.drop(exp2.value, v) - v.head._2 +: rebucket(remaining, tail) + val bucketSize +: tail = desired + val remaining = drop(bucketSize, buckets) + buckets.head.copy(size = bucketSize) +: rebucket(remaining, tail) + } + + /** + * @param toDrop total count to remove from the left of `input`. + * @param input buckets + * @return Vector with buckets, or pieces of buckets, with sizes + * totalling `toDrop` items removed from the head. If an + * element wasn't fully consumed, the remainder will be + * stuck back onto the head. + */ + @tailrec private[this] def drop(toDrop: Long, input: Vector[Bucket]): Vector[Bucket] = { + val (b @ Bucket(count, _)) +: tail = input + (toDrop - count) match { + case 0 => tail + case x if x < 0 => b.copy(size = -x) +: tail + case x if x > 0 => drop(x, tail) } } } -// TODO: Interesting that relative error only depends on sizes, not on -// timestamps. You could totally get a relative error off of some -// number without ever going through this business, then use that to -// decide when to evict stuff from the bucket. 
-case class ExpHist(conf: ExpHist.Config, timestamps: Vector[Long], sizes: Vector[Int], time: Long) { +case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: Long) { + import ExpHist.Bucket + def step(newTime: Long): ExpHist = if (newTime <= time) this else { - val filtered = ExpHist.dropExpired(timestamps, conf.expiration(newTime)) - val bucketsDropped = timestamps.size - filtered.size - copy( - time = newTime, - timestamps = filtered, - sizes = Canonical.dropBiggest(bucketsDropped, sizes.reverse).reverse) + val (dropped, filtered) = conf.dropExpired(buckets, newTime) + copy(time = newTime, buckets = filtered, total = total - dropped) } - // Efficient implementation of add. To make this solid we'll want to - // keep a buffer of new items and only add when the buffer expires. - def add(i: Long, timestamp: Long): ExpHist = { - val self = step(timestamp) - if (i == 0) self - else { - // calculate canonical representation of the NEW total. - val newSizes = Canonical.fromLong(self.total + i, conf.l) - - // Vector[TotalSizeOfEachBucket] - // Vector(3,3,2) expands out to - // Vector((1, ts),(1, ts),.... 
1,2,2,2,4,4) == bucketSizes - val bucketSizes = Canonical.toBuckets(self.sizes).map(_.value) + def inc(timestamp: Long): ExpHist = add(1L, timestamp) - // this is how we'd implement addAll: - val inputs = (i, timestamp) +: (bucketSizes zip self.timestamps) - self.copy( - timestamps = ExpHist.rebucket(inputs, Canonical.toBuckets(newSizes)), - sizes = newSizes) - } + def add(delta: Long, timestamp: Long): ExpHist = { + val self = step(timestamp) + if (delta == 0) self + else self.addAllWithoutStep(Vector(Bucket(delta, timestamp)), delta) } - def inc(timestamp: Long): ExpHist = add(1L, timestamp) + def addAll(unsorted: Vector[Bucket]): ExpHist = + if (unsorted.isEmpty) this + else { + val sorted = unsorted.sorted(Ordering[Bucket].reverse) + val delta = sorted.map(_.size).sum + val timestamp = sorted.head.timestamp + if (delta == 0) + step(timestamp) + else + addAllWithoutStep(sorted, delta).step(timestamp) + } - def last: Long = if (sizes.isEmpty) 0L else (1 << (sizes.size - 1)) + // Note that this internal method assumes that the instance is + // stepped forward already, and does NOT try to step internally. It + // also assumes that `items` is sorted in ASCENDING order, with + // newer items on the right side. 
+ // private[ExpHist] + def addAllWithoutStep(items: Vector[Bucket], delta: Long): ExpHist = { + val inputs = items ++ buckets + val desiredBuckets = Canonical.longToBuckets(total + delta, conf.l) + copy( + buckets = ExpHist.rebucket(inputs, desiredBuckets), + total = total + delta) + } - def total: Long = Canonical.expand(sizes) + def oldestBucketSize: Long = if (buckets.isEmpty) 0L else buckets.last.size - def lowerBoundSum: Long = total - last + def lowerBoundSum: Long = total - oldestBucketSize def upperBoundSum: Long = total - def guess: Double = total - (last - 1) / 2.0 + def guess: Double = total - (oldestBucketSize - 1) / 2.0 def relativeError: Double = if (total == 0) 0.0 else { - val minInsideWindow = total + 1 - last - val absoluteError = (last - 1) / 2.0 + val minInsideWindow = total + 1 - oldestBucketSize + val absoluteError = (oldestBucketSize - 1) / 2.0 absoluteError / minInsideWindow } - - /** - * Returns the same ExpHist with a new window. If the new window is - * smaller than the current window, evicts older items. - */ - def withWindow(newWindow: Long): ExpHist = - copy(conf = conf.copy(windowSize = newWindow)).step(time) - - // Returns the vector of bucket sizes from largest to smallest. - def windows: Vector[Long] = - for { - (numBuckets, idx) <- sizes.zipWithIndex.reverse - bucket <- List.fill(numBuckets)(1L << idx) - } yield bucket } object Canonical { - case class Exp2(exp: Int) extends AnyVal { l => - def double: Exp2 = Exp2(exp + 1) - def value: Long = 1 << exp - } - @inline private[this] def floorPowerOfTwo(x: Long): Int = JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) @@ -146,35 +157,10 @@ object Canonical { * where `i` is the vector index, used to represent s for a given * k. * - * `s` is the "sum of the sizes of all of the buckets tracked by + * `s` is the sum of the sizes of all of the buckets tracked by * the ExpHist instance. * - * `l` is... well, gotta explain that based on the paper. 
- * - * 1 - * 11 - * 111 - * 112 - * 1112 - * 1122 - * - * 1 - * 2 - * 3 - * 2 1 - * 3 1 - * 2 2 - * 3 2 - * 2 3 - * 3 3 - * 2 2 1 - * 3 2 1 - * 2 3 1 - * 3 3 1 - * 2 2 2 - * 3 2 2 - * 2 3 2 - * 3 3 2 + * `l` or `l + 1` buckets allowed per size. */ def fromLong(s: Long, l: Int): Vector[Int] = { if (s <= 0) Vector.empty @@ -187,23 +173,10 @@ object Canonical { } } - def lastBucketSize(s: Long, l: Int): Int = { - val num = s + l - val denom = l + 1 - val j = floorPowerOfTwo(num / denom) - val offset = (num - (denom << j)).toInt - (quotient(offset, j) + 1) - } - - /** - * Total number of buckets required to represent this number. - */ - def bucketsRequired(s: Long, l: Short): Int = fromLong(s, l).sum - /** * Expand out a number's l-canonical form into the original number. */ - def expand(rep: Vector[Int]): Long = + def toLong(rep: Vector[Int]): Long = if (rep.isEmpty) 0L else { rep.iterator.zipWithIndex @@ -211,48 +184,16 @@ object Canonical { .reduce(_ + _) } - // Expands out the compressed form to a list of the bucket sizes (sized with exponent only) - // - // TODO: law - toBuckets(rep).size == rep.sum - def toBuckets(rep: Vector[Int]): Vector[Exp2] = - rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(Exp2(exp)) } - /** - * Given a canonical representation, takes a number of buckets to - * drop (starting with the largest buckets!) and removes that - * number of buckets from the end. - * - * (The sum of `canonical` is the total number of buckets in the - * representation.) + * Expands out the compressed form to a list of the bucket sizes in + * ascending order. 
*/ - @tailrec def dropBiggest(bucketsToDrop: Int, canonical: Vector[Int]): Vector[Int] = - (bucketsToDrop, canonical) match { - case (0, l) => l - case (toDrop, count +: tail) => - (toDrop - count) match { - case 0 => tail - case x if x < 0 => -x +: tail - case x if x > 0 => dropBiggest(x, tail) - } - case _ => Vector.empty[Int] - } + def toBuckets(rep: Vector[Int]): Vector[Long] = + rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) } /** - * @param x total count to remove from the head of `input`. - * @param input pairs of (count, T). - * @return Vector with x total items removed from the head; if - * an element wasn't fully consumed, the remainder will be - * stuck back onto the head. + * Returns a sequence of (ascending-order) bucket sizes required to + * store the supplied `s` in l-canonical form. */ - @tailrec def drop[T](x: Long, input: Vector[(Long, T)]): Vector[(Long, T)] = - (x, input) match { - case (0, input) => input - case (toDrop, (count, t) +: tail) => - (toDrop - count) match { - case 0 => tail - case x if x < 0 => (-x, t) +: tail - case x if x > 0 => drop(x, tail) - } - case _ => Vector.empty[(Long, T)] - } + def longToBuckets(s: Long, l: Int): Vector[Long] = toBuckets(fromLong(s, l)) } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index 58b6c359e..707daef82 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -13,91 +13,81 @@ object PosNum { Arbitrary(for { p <- Gen.posNum[T] } yield PosNum(p)) } -case class Tick(count: Long, timestamp: Long) - -object Tick { - implicit val ord: Ordering[Tick] = - Ordering.by { t: Tick => (t.timestamp, t.count) } - - implicit val arb: Arbitrary[Tick] = - Arbitrary(for { - ts <- Gen.posNum[Long] - tick <- Gen.posNum[Long] - } yield Tick(tick - 1L, ts)) +case class NonEmptyVector[T](items: 
Vector[T]) { + def sorted(implicit ev: Ordering[T]): Vector[T] = items.sorted } -case class NonEmptyList[T](items: List[T]) { - def sorted(implicit ev: Ordering[T]): List[T] = items.sorted -} - -object NonEmptyList { - implicit def arb[T: Ordering: Arbitrary]: Arbitrary[NonEmptyList[T]] = +object NonEmptyVector { + implicit def arb[T: Ordering: Arbitrary]: Arbitrary[NonEmptyVector[T]] = Arbitrary(for { l <- Gen.nonEmptyListOf(arbitrary[T]) - } yield NonEmptyList[T](l)) + } yield NonEmptyVector[T](l.toVector)) } class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { - import BaseProperties._ + import ExpHist.{ Bucket, Config } + + implicit val arb: Arbitrary[Bucket] = + Arbitrary(for { + count <- Gen.posNum[Long] + timestamp <- Gen.posNum[Long] + } yield Bucket(count - 1L, timestamp)) - implicit val conf: Arbitrary[ExpHist.Config] = + implicit val conf: Arbitrary[Config] = Arbitrary(for { k <- Gen.posNum[Short] windowSize <- Gen.posNum[Long] - } yield ExpHist.Config(1 / k.toDouble, windowSize)) - - def addAll(e: ExpHist, ticks: List[Tick]): ExpHist = - ticks.foldLeft(e) { - case (e, Tick(count, timestamp)) => - e.add(count, timestamp) - } + } yield Config(1 / k.toDouble, windowSize)) implicit val expHist: Arbitrary[ExpHist] = Arbitrary(for { - ticks <- arbitrary[List[Tick]] + buckets <- arbitrary[Vector[Bucket]] conf <- arbitrary[ExpHist.Config] - } yield addAll(ExpHist.empty(conf), ticks)) + } yield ExpHist.empty(conf).addAll(buckets)) // The example in the paper is actually busted, based on his // algorithm. He says to assume that k/2 == 2, but then he promotes // as if k/2 == 1, ie k == 2. property("example from paper") { + // Returns the vector of bucket sizes from largest to smallest. 
+ def windows(e: ExpHist): Vector[Long] = e.buckets.reverse.map(_.size) + val e = ExpHist.empty(ExpHist.Config(0.5, 100)) val plus76 = e.add(76, 0) val inc = plus76.inc(0) val incinc = inc.add(2, 0) - plus76.windows shouldBe Vector(32, 16, 8, 8, 4, 4, 2, 1, 1) - inc.windows shouldBe Vector(32, 16, 8, 8, 4, 4, 2, 2, 1) - incinc.windows shouldBe Vector(32, 16, 16, 8, 4, 2, 1) + assert(windows(plus76) == Vector(32, 16, 8, 8, 4, 4, 2, 1, 1)) + assert(windows(inc) == Vector(32, 16, 8, 8, 4, 4, 2, 2, 1)) + assert(windows(incinc) == Vector(32, 16, 16, 8, 4, 2, 1)) } property("adding i results in upperBoundSum == i") { - forAll { (conf: ExpHist.Config, tick: Tick) => + forAll { (conf: ExpHist.Config, bucket: Bucket) => assert(ExpHist.empty(conf) - .add(tick.count, tick.timestamp) - .upperBoundSum == tick.count) + .add(bucket.size, bucket.timestamp) + .upperBoundSum == bucket.size) } } property("empty.add and from are identical") { - forAll { (conf: ExpHist.Config, tick: Tick) => - val byAdd = ExpHist.empty(conf).add(tick.count, tick.timestamp) - val byFrom = ExpHist.from(tick.count, tick.timestamp, conf) + forAll { (conf: ExpHist.Config, bucket: Bucket) => + val byAdd = ExpHist.empty(conf).add(bucket.size, bucket.timestamp) + val byFrom = ExpHist.from(bucket.size, bucket.timestamp, conf) assert(byAdd == byFrom) } } property("step should be idempotent") { - forAll { (expHist: ExpHist, tick: Tick) => - val ts = tick.timestamp + forAll { (expHist: ExpHist, bucket: Bucket) => + val ts = bucket.timestamp val stepped = expHist.step(ts) assert(stepped == stepped.step(ts)) - val added = expHist.add(tick.count, ts) + val added = expHist.add(bucket.size, ts) val addThenStep = added.step(ts) - val stepThenAdd = stepped.add(tick.count, ts) + val stepThenAdd = stepped.add(bucket.size, ts) assert(added == addThenStep) assert(added == stepThenAdd) @@ -111,59 +101,104 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { } } + // property("add matches addAll") { + // 
forAll { (unsorted: NonEmptyVector[Bucket], conf: ExpHist.Config) => + // val e = ExpHist.empty(conf) + // val items = unsorted.sorted + // val withAdd = items.foldLeft(e) { + // case (e, Bucket(count, timestamp)) => + // e.add(count, timestamp) + // } + + // assert(withAdd == e.addAll(items)) + // } + // } + // This is the meat! - property("bounded error of the query") { - forAll { (items: NonEmptyList[Tick], conf: ExpHist.Config) => - val ticks = items.sorted + def testBounds(eh: ExpHist, actualSum: Long) { + val upperBound = eh.upperBoundSum + val lowerBound = eh.lowerBoundSum + assert(lowerBound <= actualSum) + assert(upperBound >= actualSum) + + val maxOutsideWindow = eh.oldestBucketSize - 1 + val minInsideWindow = 1 + eh.total - eh.oldestBucketSize + val absoluteError = maxOutsideWindow / 2.0 + val relativeError = absoluteError / minInsideWindow - val full = addAll(ExpHist.empty(conf), ticks) + // local relative error calc: + assert(relativeError <= eh.conf.epsilon) - val finalTs = ticks.last.timestamp + // Instance's relative error calc: + assert(eh.relativeError <= eh.conf.epsilon) - val actualSum = ticks.collect { - case Tick(count, ts) if ts > (finalTs - conf.windowSize) => count - }.sum + // Error can never be above 0.5. 
+ assert(eh.relativeError <= 0.5) + } - val upperBound = full.upperBoundSum - val lowerBound = full.lowerBoundSum - assert(lowerBound <= actualSum) - assert(upperBound >= actualSum) + def actualBucketSum(buckets: Vector[Bucket], exclusiveCutoff: Long): Long = + buckets.collect { + case Bucket(count, ts) if ts > exclusiveCutoff => count + }.sum - val maxOutsideWindow = full.last - 1 - val minInsideWindow = 1 + full.total - full.last - val absoluteError = maxOutsideWindow / 2.0 - val relativeError = absoluteError / minInsideWindow + property("bounded error of the query with addAll") { + forAll { (v: NonEmptyVector[Bucket], conf: Config) => + val buckets = v.items + val recentTs = buckets.maxBy(_.timestamp).timestamp + val cutoff = conf.expiration(recentTs) - // local relative error calc: - assert(relativeError <= conf.epsilon) + testBounds( + ExpHist.empty(conf).addAll(buckets), + actualBucketSum(buckets, cutoff)) + } + } - // Instance's relative error calc: - assert(full.relativeError <= conf.epsilon) + property("bounded error of the query with add") { + forAll { (items: NonEmptyVector[Bucket], conf: Config) => + val buckets = items.sorted + val recentTs = buckets.last.timestamp + val cutoff = conf.expiration(recentTs) - // Error can never be above 0.5. 
- assert(full.relativeError <= 0.5) + val full = buckets.foldLeft(ExpHist.empty(conf)) { + case (e, Bucket(c, t)) => e.add(c, t) + } + + testBounds(full, actualBucketSum(buckets, cutoff)) } } property("add and inc should generate the same results") { - forAll { (tick: Tick, conf: ExpHist.Config) => + forAll { (bucket: Bucket, conf: ExpHist.Config) => val e = ExpHist.empty(conf) - val incs = (0L until tick.count).foldLeft(e) { - case (acc, _) => acc.inc(tick.timestamp) + val incs = (0L until bucket.size).foldLeft(e) { + case (acc, _) => acc.inc(bucket.timestamp) } - val adds = e.add(tick.count, tick.timestamp) + val adds = e.add(bucket.size, bucket.timestamp) assert(incs.total == adds.total) assert(incs.lowerBoundSum == adds.lowerBoundSum) assert(incs.upperBoundSum == adds.upperBoundSum) } } +} +class CanonicalLaws extends PropSpec with PropertyChecks with Matchers { property("l-canonical representation round-trips") { + forAll { (i: PosNum[Long], l: PosNum[Short]) => + assert(Canonical.toLong(Canonical.fromLong(i.value, l.value)) == i.value) + } + } + + property("Sum of canonical rep is # of buckets required.") { forAll { (i: PosNum[Long], k: PosNum[Short]) => - assert(Canonical.expand(Canonical.fromLong(i.value, k.value)) == i.value) + val rep = Canonical.fromLong(i.value, k.value) + val numBuckets = Canonical.toBuckets(rep).size + assert(numBuckets == rep.sum) + + // longToBuckets works. 
+ assert(Canonical.longToBuckets(i.value, k.value).size == numBuckets) } } @@ -177,10 +212,4 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { }) } } - - property("impl of `div2Ceil` matches the simpler impl") { - forAll { i: Int => - assert(ExpHist.div2Ceil(i) == math.ceil(i / 2.0).toInt) - } - } } From 344acdc8da9e60a3a3f001f95918b2cfb2702d80 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 1 Nov 2016 11:19:45 -0600 Subject: [PATCH 07/17] cleaned up and ready for the dance --- .../algebird/benchmark/ExpHistBenchmark.scala | 53 --- .../scala/com/twitter/algebird/ExpHist.scala | 225 +++++++++--- .../com/twitter/algebird/ExpHistLaws.scala | 320 +++++++++++------- .../com/twitter/algebird/Generators.scala | 31 +- 4 files changed, 402 insertions(+), 227 deletions(-) delete mode 100644 algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala diff --git a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala b/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala deleted file mode 100644 index 50aa454fe..000000000 --- a/algebird-benchmark/src/main/scala/com/twitter/algebird/benchmark/ExpHistBenchmark.scala +++ /dev/null @@ -1,53 +0,0 @@ -package com.twitter.algebird -package benchmark - -import org.openjdk.jmh.annotations._ -import org.openjdk.jmh.infra.Blackhole -import scala.util.Random - -/** - * Benchmarks the Exponential Histogram implementation in Algebird. 
- */ -object ExpHistBenchmark { - @State(Scope.Benchmark) - class ExpHistState { - @Param(Array("0.1", "0.005")) - var eps: Double = 0.0 - - @Param(Array("1000")) - var window: Int = 0 - - // number of data values to combine into an ExpHist - @Param(Array("10", "100", "1000")) - var numElements: Int = 0 - - var empty: ExpHist = _ - var inputData: Vector[(Long, Long)] = _ - - @Setup(Level.Trial) - def setup(): Unit = { - val rng = new Random(3) - - val conf = ExpHist.Config(math.ceil(1 / eps).toInt, window) - empty = ExpHist.empty(conf) - - inputData = (0L until numElements).map { _ => - val timestamp = rng.nextInt(window).toLong - val item = rng.nextInt(Int.MaxValue).toLong - (item, timestamp) - }.sortBy(_._2).toVector - - } - } -} - -class ExpHistBenchmark { - import ExpHistBenchmark._ - - @Benchmark - def timeAdd(state: ExpHistState, bh: Blackhole) = { - state.inputData.foreach { pair => - bh.consume(state.empty.add(pair._1, pair._2)) - } - } -} diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index eb7fcec91..d5b61de83 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -5,7 +5,30 @@ import scala.annotation.tailrec /** * Exponential Histogram algorithm from - * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf. + * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf + * + * An Exponential Histogram is a sliding window counter that can + * guarantee a bounded relative error. You configure the data structure with + * + * - epsilon, the relative error you're willing to tolerate + * - windowSize, the number of time ticks that you want to track + * + * You interact with the data structure by adding (number, timestamp) + * pairs into the exponential histogram, and querying it for + * approximate counts. 
+ * + * The approximate count is guaranteed to be within conf.epsilon + * relative error of the true count seen across the supplied + * `windowSize`. + * + * + * Next steps: + * + * - combine Canonical.fromLong and Canonical.bucketsFromLong + * - efficient serialization using Canonical + * - Query EH with a shorter window than the configured window + * - Discussion of epsilon vs memory tradeoffs + * - Discussion of how Canonical.fromLong works */ object ExpHist { case class Bucket(size: Long, timestamp: Long) @@ -14,6 +37,11 @@ object ExpHist { implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } } + /** + * ExpHist guarantees that the returned guess will be within + * `epsilon` relative error of the true count across a sliding + * window of size `windowSize`. + */ case class Config(epsilon: Double, windowSize: Long) { val k: Int = math.ceil(1 / epsilon).toInt val l: Int = math.ceil(k / 2.0).toInt @@ -21,6 +49,9 @@ object ExpHist { // Returns the last timestamp before the window. any ts <= [the // returned timestamp] is outside the window. def expiration(currTime: Long): Long = currTime - windowSize + + // Drops all buckets with an expired timestamp, based on the + // configured window and the supplied current time. def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = ExpHist.dropExpired(buckets, expiration(currTime)) } @@ -31,36 +62,44 @@ object ExpHist { def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) /** - * Create an instance from a number. + * Generate an instance directly from a number. All buckets in the + * returned ExpHist will have the same timestamp, equal to `ts`. 
*/ def from(i: Long, ts: Long, conf: Config): ExpHist = { - val buckets = Canonical.longToBuckets(i, conf.l).map(Bucket(_, ts)) + val buckets = Canonical.bucketsFromLong(i, conf.l).map(Bucket(_, ts)) ExpHist(conf, buckets, i, ts) } /** - * Takes a vector of buckets sorted in descending order (newest - * first) and a cutoff and returns - * - * - The sum of the sizes of the dropped buckets - * - a vector containing only buckets with timestamps AFTER the cutoff. + * @param buckets [buckets] sorted in DESCENDING order (recent first) + * @param cutoff buckets with ts <= cutoff are expired + * @return the sum of evicted bucket sizes and the unexpired buckets */ - def dropExpired(timestamps: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { - val (dropped, remaining) = timestamps.reverse.span(_.timestamp <= cutoff) + def dropExpired(buckets: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { + val (dropped, remaining) = buckets.reverse.span(_.timestamp <= cutoff) (dropped.map(_.size).sum, remaining.reverse) } /** - * Rebuckets the vector of inputs into buckets of the supplied - * sequence of sizes. Returns only the associated `T` information - * (timestamp, for example). + * Converts the supplied buckets into a NEW vector of buckets + * satisfying this law: + * + * {{{ + * rebucket(buckets, desired).map(_.size).sum == desired + * }}} + * + * (rebucket only works if desired.sum == buckets.map(_.size).sum) + * + * @param buckets [buckets] sorted in DESCENDING order (recent first) + * @param desired bucket sizes to rebucket `buckets` into. 
*/ - def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = + private[algebird] def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = if (desired.isEmpty) Vector.empty else { + val input = buckets.dropWhile(_.size == 0) val bucketSize +: tail = desired - val remaining = drop(bucketSize, buckets) - buckets.head.copy(size = bucketSize) +: rebucket(remaining, tail) + val remaining = drop(bucketSize, input) + input.head.copy(size = bucketSize) +: rebucket(remaining, tail) } /** @@ -81,9 +120,27 @@ object ExpHist { } } +/** + * @param conf the config values for this isntance. + * @param buckets Vector of timestamps of each (powers of 2) + * ticks. This is the core of the exponential + * histogram representation. See [[Canonical]] for + * more info. + * @param total total ticks tracked, == buckets.map(_.size).sum + * @param time current timestamp of this instance. + * Used with conf.windowSize to expire buckets. + */ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: Long) { import ExpHist.Bucket + /** + * Steps this instance forward to the new supplied time. Any + * buckets with a timestamp <= (newTime - conf.windowSize) will be + * evicted. + * + * @param newTime the new current time. + * @return ExpHist instance stepped forward to newTime. + */ def step(newTime: Long): ExpHist = if (newTime <= time) this else { @@ -91,14 +148,27 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: copy(time = newTime, buckets = filtered, total = total - dropped) } + /** + * Increment ExpHist by 1 at the supplied timestamp. + */ def inc(timestamp: Long): ExpHist = add(1L, timestamp) + /** + * Increment ExpHist by delta at the supplied timestamp. + */ def add(delta: Long, timestamp: Long): ExpHist = { val self = step(timestamp) if (delta == 0) self else self.addAllWithoutStep(Vector(Bucket(delta, timestamp)), delta) } + /** + * Efficiently add many buckets at once. 
+ * + * @param unsorted [bucket]. All timestamps must be >= this.time. + * @return ExpHist instance with all buckets added, stepped + * forward to the most timestamp in `unsorted`. + */ def addAll(unsorted: Vector[Bucket]): ExpHist = if (unsorted.isEmpty) this else { @@ -107,31 +177,49 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: val timestamp = sorted.head.timestamp if (delta == 0) step(timestamp) - else + else { addAllWithoutStep(sorted, delta).step(timestamp) + } } - // Note that this internal method assumes that the instance is - // stepped forward already, and does NOT try to step internally. It - // also assumes that `items` is sorted in ASCENDING order, with - // newer items on the right side. - // private[ExpHist] - def addAllWithoutStep(items: Vector[Bucket], delta: Long): ExpHist = { + // This internal method assumes that the instance is stepped forward + // already, and does NOT try to step internally. It also assumes + // that `items` is sorted in ASCENDING order, with newer items on + // the right side, and that `items.map(_.size).sum == delta`. + private[ExpHist] def addAllWithoutStep(items: Vector[Bucket], delta: Long): ExpHist = { val inputs = items ++ buckets - val desiredBuckets = Canonical.longToBuckets(total + delta, conf.l) + val desiredBuckets = Canonical.bucketsFromLong(total + delta, conf.l) copy( buckets = ExpHist.rebucket(inputs, desiredBuckets), total = total + delta) } - def oldestBucketSize: Long = if (buckets.isEmpty) 0L else buckets.last.size + def oldestBucketSize: Long = if (total == 0) 0L else buckets.last.size + /** + * Smallest possible count seen in the last conf.windowSize + * timestamps. + */ def lowerBoundSum: Long = total - oldestBucketSize + /** + * Largest possible count seen in the last conf.windowSize + * timestamps. 
+ */ def upperBoundSum: Long = total - def guess: Double = total - (oldestBucketSize - 1) / 2.0 + /** + * Estimate of the count seen across the last conf.windowSize + * timestamps. Guaranteed to be within conf.epsilon of the true + * count. + */ + def guess: Double = + if (total == 0) 0.0 + else (total - (oldestBucketSize - 1) / 2.0) + /** + * relative error of guess, guaranteed to be <= conf.epsilon. + */ def relativeError: Double = if (total == 0) 0.0 else { @@ -141,6 +229,39 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } } +/** + * The paper that introduces the exponential histogram proves that, + * given a positive number `l`, every integer can be uniquely + * represented as the sum of + * + * (`l` or `l + 1`) * 2^i + * + * for i = 0 to j, given some j. + * + * The paper calls this the "l-canonical" representation of the + * number. + * + * It turns out that if you follow the exponential histogram + * bucket-merging algorithm, you end up with the invariant that the + * number of buckets with size 2^i exactly matches that power of 2's + * coefficient in its l-canonical representation. + * + * Put another way - only sequences of buckets with sizes matching + * the l-canonical representation are valid exponential histograms. + * + * (We use this idea in `ExpHist.rebucket` to take a sequence of + * buckets of any size and rebucket them into a sequence where the + * above invariant holds.) + * + * This is huge. This means that you can implement `addAll(newBuckets)` by + * + * - calculating newTotal = total + delta contributed by newBuckets + * - generating the l-canonical sequence of bucket sizes for newTotal + * - rebucketing newBuckets ++ oldBuckets into those bucket sizes + * + * The resulting sequence of buckets is a valid exponential + * histogram. 
+ */ object Canonical { @inline private[this] def floorPowerOfTwo(x: Long): Int = JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) @@ -153,14 +274,15 @@ object Canonical { (0 until bits).map { idx => offset + bit(i, idx) }.toVector /** - * returns a vector of the total number of buckets of size `s^i`, - * where `i` is the vector index, used to represent s for a given - * k. + * (i = vector index, j = index of last entry) * - * `s` is the sum of the sizes of all of the buckets tracked by - * the ExpHist instance. + * returns a vector of the the coefficients of s^i in the + * l-canonical representation of s. * - * `l` or `l + 1` buckets allowed per size. + * the "l" means that + * + * - ret(i) for all i < j == l or l + 1 + * - ret(j) < l + 1 */ def fromLong(s: Long, l: Int): Vector[Int] = { if (s <= 0) Vector.empty @@ -174,7 +296,31 @@ object Canonical { } /** - * Expand out a number's l-canonical form into the original number. + * Expands `s` out into a list of numbers that all sum to s. Each + * entry is a power of 2, and the number of entries of each power + * of 2 matches `s`'s l-canonical representation (for the supplied + * l). + */ + def bucketsFromLong(s: Long, l: Int): Vector[Long] = { + if (s <= 0) Vector.empty + else { + val num = s + l + val denom = l + 1 + val j = floorPowerOfTwo(num / denom) + val offset = (num - (denom << j)).toInt + val prefixRep = modPow2(offset, j) + + (0 until j).toVector.flatMap { + idx => Vector.fill(l + bit(prefixRep, idx))(1L << idx) + } ++ List.fill(quotient(offset, j) + 1)(1L << j) + } + } + + /** + * Expands out an l-canonical representation into the original number. + * + * @param rep l-canonical representation of some number s for some l + * @return The original s */ def toLong(rep: Vector[Int]): Long = if (rep.isEmpty) 0L @@ -185,15 +331,12 @@ object Canonical { } /** - * Expands out the compressed form to a list of the bucket sizes in - * ascending order. 
+ * Expands out the l-canonical representation of some number s into + * a list of bucket sizes in ascending order. + * + * @param rep l-canonical representation of some number s for some l + * @return vector of powers of 2 (where ret.sum == the original s) */ def toBuckets(rep: Vector[Int]): Vector[Long] = rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) } - - /** - * Returns a sequence of (ascending-order) bucket sizes required to - * store the supplied `s` in l-canonical form. - */ - def longToBuckets(s: Long, l: Int): Vector[Long] = toBuckets(fromLong(s, l)) } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index 707daef82..d4fe85d6e 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -1,78 +1,132 @@ package com.twitter.algebird -import org.scalatest.{ PropSpec, Matchers } +import org.scalatest.PropSpec import org.scalatest.prop.PropertyChecks import org.scalatest.prop.Checkers.check import org.scalacheck.{ Gen, Arbitrary } import Arbitrary.arbitrary -case class PosNum[T: Numeric](value: T) +class ExpHistLaws extends PropSpec with PropertyChecks { + import ExpHistGenerators._ + import ExpHist.{ Bucket, Config } -object PosNum { - implicit def arb[T: Numeric: Gen.Choose]: Arbitrary[PosNum[T]] = - Arbitrary(for { p <- Gen.posNum[T] } yield PosNum(p)) -} + property("Increment example from DGIM paper") { + // Returns a vector of bucket sizes from largest to smallest. + def bSizes(e: ExpHist): Vector[Long] = e.buckets.reverse.map(_.size) -case class NonEmptyVector[T](items: Vector[T]) { - def sorted(implicit ev: Ordering[T]): Vector[T] = items.sorted -} + // epsilon of 0.5 gives us window sizes of 1 or 2. 
+ val e = ExpHist.empty(Config(0.5, 100)) -object NonEmptyVector { - implicit def arb[T: Ordering: Arbitrary]: Arbitrary[NonEmptyVector[T]] = - Arbitrary(for { - l <- Gen.nonEmptyListOf(arbitrary[T]) - } yield NonEmptyVector[T](l.toVector)) -} + val plus76 = e.add(76, 0) + val inc = plus76.inc(0) + val twoMore = inc.add(2, 0) -class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { - import ExpHist.{ Bucket, Config } + assert(bSizes(plus76) == Vector(32, 16, 8, 8, 4, 4, 2, 1, 1)) + assert(bSizes(inc) == Vector(32, 16, 8, 8, 4, 4, 2, 2, 1)) + assert(bSizes(twoMore) == Vector(32, 16, 16, 8, 4, 2, 1)) + } - implicit val arb: Arbitrary[Bucket] = - Arbitrary(for { - count <- Gen.posNum[Long] - timestamp <- Gen.posNum[Long] - } yield Bucket(count - 1L, timestamp)) + /** + * An "exponential histogram" tracks the count of a sliding window + * with a fixed maximum relative error. The core guarantees are: + * + * - The actual sum will always be within the tracked bounds + * - The EH's guess is always within epsilon the actual. + * - The relative error of the count is at most epsilon + * - the relative error is always between 0 and 0.5. 
+ */ + def checkCoreProperties(eh: ExpHist, actualSum: Long) { + assert(eh.lowerBoundSum <= actualSum) + assert(eh.upperBoundSum >= actualSum) + + val maxError = actualSum * eh.relativeError + assert(eh.guess <= actualSum + maxError) + assert(eh.guess >= actualSum - maxError) - implicit val conf: Arbitrary[Config] = - Arbitrary(for { - k <- Gen.posNum[Short] - windowSize <- Gen.posNum[Long] - } yield Config(1 / k.toDouble, windowSize)) + assert(eh.relativeError <= eh.conf.epsilon) - implicit val expHist: Arbitrary[ExpHist] = - Arbitrary(for { - buckets <- arbitrary[Vector[Bucket]] - conf <- arbitrary[ExpHist.Config] - } yield ExpHist.empty(conf).addAll(buckets)) + assert(eh.relativeError <= 0.5) + assert(eh.relativeError >= 0) + } + + /** + * Returns the ACTUAL sum of the supplied vector of buckets, + * filtering out any bucket with a timestamp <= exclusiveCutoff. + */ + def actualBucketSum(buckets: Vector[Bucket], exclusiveCutoff: Long): Long = + buckets.collect { + case Bucket(count, ts) if ts > exclusiveCutoff => count + }.sum - // The example in the paper is actually busted, based on his - // algorithm. He says to assume that k/2 == 2, but then he promotes - // as if k/2 == 1, ie k == 2. - property("example from paper") { - // Returns the vector of bucket sizes from largest to smallest. - def windows(e: ExpHist): Vector[Long] = e.buckets.reverse.map(_.size) + // addAll and add are NOT guaranteed to return the same exponential + // histogram, but either method of inserting buckets needs to return + // an EH that satisfies the core properties. 
+ property("ExpHist.addAll satisfies core properties") { + forAll { (v: NonEmptyVector[Bucket], conf: Config) => + val buckets = v.items + val mostRecentTs = buckets.maxBy(_.timestamp).timestamp + val cutoff = conf.expiration(mostRecentTs) - val e = ExpHist.empty(ExpHist.Config(0.5, 100)) - val plus76 = e.add(76, 0) + val fullViaAddAll = ExpHist.empty(conf).addAll(buckets) + val actualSum = actualBucketSum(buckets, cutoff) + checkCoreProperties(fullViaAddAll, actualSum) + } + } - val inc = plus76.inc(0) - val incinc = inc.add(2, 0) + property("ExpHist.add satisfies core properties") { + forAll { (items: NonEmptyVector[Bucket], conf: Config) => + val buckets = items.sorted + val mostRecentTs = buckets.last.timestamp + val cutoff = conf.expiration(mostRecentTs) + + val fullViaAdd = buckets.foldLeft(ExpHist.empty(conf)) { + case (e, Bucket(c, t)) => e.add(c, t) + } - assert(windows(plus76) == Vector(32, 16, 8, 8, 4, 4, 2, 1, 1)) - assert(windows(inc) == Vector(32, 16, 8, 8, 4, 4, 2, 2, 1)) - assert(windows(incinc) == Vector(32, 16, 16, 8, 4, 2, 1)) + val actualSum = actualBucketSum(buckets, cutoff) + checkCoreProperties(fullViaAdd, actualSum) + } + } + + def isPowerOfTwo(i: Long): Boolean = (i & -i) == i + + property("verify isPowerOfTwo") { + forAll { i: PosNum[Int] => + val power = math.pow(2, i.value % 64).toLong + assert(isPowerOfTwo(power)) + } + } + + property("bucket sizes are all powers of two") { + forAll { e: ExpHist => + assert(e.buckets.forall { b => isPowerOfTwo(b.size) }) + } + } + + property("Total tracked by e is the sum of all bucket sizes") { + forAll { e: ExpHist => + assert(e.buckets.map(_.size).sum == e.total) + } + } + + property("ExpHist bucket sizes are the l-canonical rep of the tracked total") { + forAll { e: ExpHist => + assert(e.buckets.map(_.size) == Canonical.bucketsFromLong(e.total, e.conf.l)) + } } property("adding i results in upperBoundSum == i") { - forAll { (conf: ExpHist.Config, bucket: Bucket) => + // since every bucket has the 
same timestamp, if nothing expires + // then the upper bound is equal to the actual total. + forAll { (conf: Config, bucket: Bucket) => assert(ExpHist.empty(conf) .add(bucket.size, bucket.timestamp) .upperBoundSum == bucket.size) } } - property("empty.add and from are identical") { - forAll { (conf: ExpHist.Config, bucket: Bucket) => + property("ExpHist.empty.add(i) and ExpHist.from(i) are identical") { + forAll { (conf: Config, bucket: Bucket) => val byAdd = ExpHist.empty(conf).add(bucket.size, bucket.timestamp) val byFrom = ExpHist.from(bucket.size, bucket.timestamp, conf) assert(byAdd == byFrom) @@ -81,13 +135,13 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { property("step should be idempotent") { forAll { (expHist: ExpHist, bucket: Bucket) => - val ts = bucket.timestamp - val stepped = expHist.step(ts) - assert(stepped == stepped.step(ts)) + val t = bucket.timestamp + val stepped = expHist.step(t) + assert(stepped == stepped.step(t)) - val added = expHist.add(bucket.size, ts) - val addThenStep = added.step(ts) - val stepThenAdd = stepped.add(bucket.size, ts) + val added = expHist.add(bucket.size, t) + val addThenStep = added.step(t) + val stepThenAdd = stepped.add(bucket.size, t) assert(added == addThenStep) assert(added == stepThenAdd) @@ -95,110 +149,122 @@ class ExpHistLaws extends PropSpec with PropertyChecks with Matchers { } } - property("stepping is the same as adding 0 at the same ts") { + property("step(t) == add(0, t)") { forAll { (expHist: ExpHist, ts: PosNum[Long]) => assert(expHist.step(ts.value) == expHist.add(0, ts.value)) } } - // property("add matches addAll") { - // forAll { (unsorted: NonEmptyVector[Bucket], conf: ExpHist.Config) => - // val e = ExpHist.empty(conf) - // val items = unsorted.sorted - // val withAdd = items.foldLeft(e) { - // case (e, Bucket(count, timestamp)) => - // e.add(count, timestamp) - // } - - // assert(withAdd == e.addAll(items)) - // } - // } + property("add(i) and inc i times should 
generate the same EH") { + forAll { (bucket: Bucket, conf: Config) => + val e = ExpHist.empty(conf) - // This is the meat! - def testBounds(eh: ExpHist, actualSum: Long) { - val upperBound = eh.upperBoundSum - val lowerBound = eh.lowerBoundSum - assert(lowerBound <= actualSum) - assert(upperBound >= actualSum) + val incs = (0L until bucket.size).foldLeft(e) { + case (acc, _) => acc.inc(bucket.timestamp) + }.step(bucket.timestamp) + val adds = e.add(bucket.size, bucket.timestamp) + assert(incs == adds) + } + } - val maxOutsideWindow = eh.oldestBucketSize - 1 - val minInsideWindow = 1 + eh.total - eh.oldestBucketSize - val absoluteError = maxOutsideWindow / 2.0 - val relativeError = absoluteError / minInsideWindow + property("empty EH returns 0 for all metrics") { + forAll { (conf: Config) => + val e = ExpHist.empty(conf) + assert(e.guess == 0) + assert(e.total == 0) + assert(e.relativeError == 0) + assert(e.upperBoundSum == 0) + assert(e.lowerBoundSum == 0) + } + } - // local relative error calc: - assert(relativeError <= eh.conf.epsilon) + property("dropExpired works properly") { + forAll { (v: NonEmptyVector[Bucket], window: PosNum[Long]) => + val buckets = v.sorted.reverse + val cutoff = buckets.head.timestamp - window.value - // Instance's relative error calc: - assert(eh.relativeError <= eh.conf.epsilon) + val (droppedSum, remaining) = ExpHist.dropExpired(buckets, cutoff) - // Error can never be above 0.5. 
- assert(eh.relativeError <= 0.5) + assert(droppedSum == buckets.filter(_.timestamp <= cutoff).map(_.size).sum) + assert(remaining == buckets.filter(_.timestamp > cutoff)) + } } - def actualBucketSum(buckets: Vector[Bucket], exclusiveCutoff: Long): Long = - buckets.collect { - case Bucket(count, ts) if ts > exclusiveCutoff => count - }.sum + property("rebucketing into bucket sizes from a canonical rep works") { + forAll { (v: NonEmptyVector[Bucket], k: PosNum[Short]) => + val buckets = v.sorted + val total = buckets.map(_.size).sum + val desired = Canonical.bucketsFromLong(total, k.value) + val rebucketed = ExpHist.rebucket(buckets, desired) - property("bounded error of the query with addAll") { - forAll { (v: NonEmptyVector[Bucket], conf: Config) => - val buckets = v.items - val recentTs = buckets.maxBy(_.timestamp).timestamp - val cutoff = conf.expiration(recentTs) + // rebucketing doesn't change the sum of bucket sizes. + assert(rebucketed.map(_.size).sum == total) + + // rebucketing works - the final sequence of sizes matches the + // desired sequence. + assert(rebucketed.map(_.size) == desired) - testBounds( - ExpHist.empty(conf).addAll(buckets), - actualBucketSum(buckets, cutoff)) + // all bucket sizes are now powers of two. 
+ assert(rebucketed.forall { b => isPowerOfTwo(b.size) }) } } +} - property("bounded error of the query with add") { - forAll { (items: NonEmptyVector[Bucket], conf: Config) => - val buckets = items.sorted - val recentTs = buckets.last.timestamp - val cutoff = conf.expiration(recentTs) +object ExpHistGenerators { + import ExpHist.{ Bucket, Config } - val full = buckets.foldLeft(ExpHist.empty(conf)) { - case (e, Bucket(c, t)) => e.add(c, t) - } + implicit val arb: Arbitrary[Bucket] = + Arbitrary(for { + count <- Gen.posNum[Long] + timestamp <- Gen.posNum[Long] + } yield Bucket(count - 1L, timestamp)) - testBounds(full, actualBucketSum(buckets, cutoff)) - } - } + implicit val conf: Arbitrary[Config] = + Arbitrary(for { + k <- Gen.posNum[Short] + windowSize <- Gen.posNum[Long] + } yield Config(1 / k.toDouble, windowSize)) - property("add and inc should generate the same results") { - forAll { (bucket: Bucket, conf: ExpHist.Config) => - val e = ExpHist.empty(conf) + implicit val expHist: Arbitrary[ExpHist] = + Arbitrary(for { + buckets <- arbitrary[Vector[Bucket]] + conf <- arbitrary[Config] + } yield ExpHist.empty(conf).addAll(buckets)) +} - val incs = (0L until bucket.size).foldLeft(e) { - case (acc, _) => acc.inc(bucket.timestamp) - } +class CanonicalLaws extends PropSpec with PropertyChecks { + import Canonical._ - val adds = e.add(bucket.size, bucket.timestamp) + property("l-canonical representation is all l or l+1s except for last") { + forAll { (i: PosNum[Long], l: PosNum[Short]) => + val rep = fromLong(i.value, l.value) - assert(incs.total == adds.total) - assert(incs.lowerBoundSum == adds.lowerBoundSum) - assert(incs.upperBoundSum == adds.upperBoundSum) + // all values but the last are l or l + 1 + assert(rep.init.forall(v => v == l.value || v == l.value + 1)) + + assert(rep.last <= l.value + 1) } } -} -class CanonicalLaws extends PropSpec with PropertyChecks with Matchers { - property("l-canonical representation round-trips") { + property("canonical 
representation round-trips") { forAll { (i: PosNum[Long], l: PosNum[Short]) => - assert(Canonical.toLong(Canonical.fromLong(i.value, l.value)) == i.value) + assert(toLong(fromLong(i.value, l.value)) == i.value) } } - property("Sum of canonical rep is # of buckets required.") { + property("fromLong(i, k).sum == # of buckets required to encode i") { forAll { (i: PosNum[Long], k: PosNum[Short]) => - val rep = Canonical.fromLong(i.value, k.value) - val numBuckets = Canonical.toBuckets(rep).size - assert(numBuckets == rep.sum) + val rep = fromLong(i.value, k.value) + val numBuckets = toBuckets(rep).size + + assert(rep.sum == numBuckets) + } + } - // longToBuckets works. - assert(Canonical.longToBuckets(i.value, k.value).size == numBuckets) + property("bucketsFromLong(i, k).sum generates buckets directly") { + forAll { (i: PosNum[Long], k: PosNum[Short]) => + val rep = fromLong(i.value, k.value) + assert(bucketsFromLong(i.value, k.value) == toBuckets(rep)) } } @@ -207,7 +273,7 @@ class CanonicalLaws extends PropSpec with PropertyChecks with Matchers { val lower = k.value val upper = lower + 1 assert( - Canonical.fromLong(i.value, k.value).init.forall { numBuckets => + fromLong(i.value, k.value).init.forall { numBuckets => lower <= numBuckets && numBuckets <= upper }) } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/Generators.scala b/algebird-test/src/test/scala/com/twitter/algebird/Generators.scala index 506bbdb1e..f63fff3bf 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/Generators.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/Generators.scala @@ -19,6 +19,25 @@ package com.twitter.algebird import org.scalacheck.Arbitrary import org.scalacheck.Gen import org.scalacheck.Gen._ +import Arbitrary.arbitrary + +case class PosNum[T: Numeric](value: T) + +object PosNum { + implicit def arb[T: Numeric: Gen.Choose]: Arbitrary[PosNum[T]] = + Arbitrary(for { p <- Gen.posNum[T] } yield PosNum(p)) +} + +case class 
NonEmptyVector[T](items: Vector[T]) { + def sorted(implicit ev: Ordering[T]): Vector[T] = items.sorted +} + +object NonEmptyVector { + implicit def arb[T: Ordering: Arbitrary]: Arbitrary[NonEmptyVector[T]] = + Arbitrary(for { + l <- Gen.nonEmptyListOf(arbitrary[T]) + } yield NonEmptyVector[T](l.toVector)) +} /** * Generators useful in testing Interval @@ -55,27 +74,27 @@ object Generators { def genInclusiveLower[T: Arbitrary: Ordering] = for { - l <- Arbitrary.arbitrary[T] + l <- arbitrary[T] } yield InclusiveLower(l) def genExclusiveLower[T: Arbitrary: Ordering] = for { - l <- Arbitrary.arbitrary[T] + l <- arbitrary[T] } yield ExclusiveLower(l) def genInclusiveUpper[T: Arbitrary: Ordering] = for { - u <- Arbitrary.arbitrary[T] + u <- arbitrary[T] } yield InclusiveUpper(u) def genExclusiveUpper[T: Arbitrary: Ordering] = for { - u <- Arbitrary.arbitrary[T] + u <- arbitrary[T] } yield ExclusiveUpper(u) def genIntersection[T: Arbitrary: Ordering] = for { - l <- Arbitrary.arbitrary[Lower[T]] - u <- Arbitrary.arbitrary[Upper[T]] if l.intersects(u) + l <- arbitrary[Lower[T]] + u <- arbitrary[Upper[T]] if l.intersects(u) } yield Intersection(l, u) } From 9c5f375b7fe8e54d6b00f486680c23356e1814e6 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 1 Nov 2016 12:59:54 -0600 Subject: [PATCH 08/17] add docs, address oscar's comments --- .../scala/com/twitter/algebird/ExpHist.scala | 393 ++++++++++-------- .../com/twitter/algebird/ExpHistLaws.scala | 16 +- 2 files changed, 218 insertions(+), 191 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index d5b61de83..1e7515a5e 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -29,98 +29,9 @@ import scala.annotation.tailrec * - Query EH with a shorter window than the configured window * - Discussion of epsilon vs memory tradeoffs 
* - Discussion of how Canonical.fromLong works - */ -object ExpHist { - case class Bucket(size: Long, timestamp: Long) - - object Bucket { - implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } - } - - /** - * ExpHist guarantees that the returned guess will be within - * `epsilon` relative error of the true count across a sliding - * window of size `windowSize`. - */ - case class Config(epsilon: Double, windowSize: Long) { - val k: Int = math.ceil(1 / epsilon).toInt - val l: Int = math.ceil(k / 2.0).toInt - - // Returns the last timestamp before the window. any ts <= [the - // returned timestamp] is outside the window. - def expiration(currTime: Long): Long = currTime - windowSize - - // Drops all buckets with an expired timestamp, based on the - // configured window and the supplied current time. - def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = - ExpHist.dropExpired(buckets, expiration(currTime)) - } - - /** - * Create an empty instance with the supplied Config. - */ - def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) - - /** - * Generate an instance directly from a number. All buckets in the - * returned ExpHist will have the same timestamp, equal to `ts`. 
- */ - def from(i: Long, ts: Long, conf: Config): ExpHist = { - val buckets = Canonical.bucketsFromLong(i, conf.l).map(Bucket(_, ts)) - ExpHist(conf, buckets, i, ts) - } - - /** - * @param buckets [buckets] sorted in DESCENDING order (recent first) - * @param cutoff buckets with ts <= cutoff are expired - * @return the sum of evicted bucket sizes and the unexpired buckets - */ - def dropExpired(buckets: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { - val (dropped, remaining) = buckets.reverse.span(_.timestamp <= cutoff) - (dropped.map(_.size).sum, remaining.reverse) - } - - /** - * Converts the supplied buckets into a NEW vector of buckets - * satisfying this law: - * - * {{{ - * rebucket(buckets, desired).map(_.size).sum == desired - * }}} - * - * (rebucket only works if desired.sum == buckets.map(_.size).sum) - * - * @param buckets [buckets] sorted in DESCENDING order (recent first) - * @param desired bucket sizes to rebucket `buckets` into. - */ - private[algebird] def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = - if (desired.isEmpty) Vector.empty - else { - val input = buckets.dropWhile(_.size == 0) - val bucketSize +: tail = desired - val remaining = drop(bucketSize, input) - input.head.copy(size = bucketSize) +: rebucket(remaining, tail) - } - - /** - * @param toDrop total count to remove from the left of `input`. - * @param input buckets - * @return Vector with buckets, or pieces of buckets, with sizes - * totalling `toDrop` items removed from the head. If an - * element wasn't fully consumed, the remainder will be - * stuck back onto the head. - */ - @tailrec private[this] def drop(toDrop: Long, input: Vector[Bucket]): Vector[Bucket] = { - val (b @ Bucket(count, _)) +: tail = input - (toDrop - count) match { - case 0 => tail - case x if x < 0 => b.copy(size = -x) +: tail - case x if x > 0 => drop(x, tail) - } - } -} - -/** + * + * + * * @param conf the config values for this isntance. 
* @param buckets Vector of timestamps of each (powers of 2) * ticks. This is the core of the exponential @@ -131,7 +42,7 @@ object ExpHist { * Used with conf.windowSize to expire buckets. */ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: Long) { - import ExpHist.Bucket + import ExpHist.{ Bucket, Canonical } /** * Steps this instance forward to the new supplied time. Any @@ -229,114 +140,230 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } } -/** - * The paper that introduces the exponential histogram proves that, - * given a positive number `l`, every integer can be uniquely - * represented as the sum of - * - * (`l` or `l + 1`) * 2^i - * - * for i = 0 to j, given some j. - * - * The paper calls this the "l-canonical" representation of the - * number. - * - * It turns out that if you follow the exponential histogram - * bucket-merging algorithm, you end up with the invariant that the - * number of buckets with size 2^i exactly matches that power of 2's - * coefficient in its l-canonical representation. - * - * Put another way - only sequences of buckets with sizes matching - * the l-canonical representation are valid exponential histograms. - * - * (We use this idea in `ExpHist.rebucket` to take a sequence of - * buckets of any size and rebucket them into a sequence where the - * above invariant holds.) - * - * This is huge. This means that you can implement `addAll(newBuckets)` by - * - * - calculating newTotal = total + delta contributed by newBuckets - * - generating the l-canonical sequence of bucket sizes for newTotal - * - rebucketing newBuckets ++ oldBuckets into those bucket sizes - * - * The resulting sequence of buckets is a valid exponential - * histogram. 
- */ -object Canonical { - @inline private[this] def floorPowerOfTwo(x: Long): Int = - JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) +object ExpHist { + case class Bucket(size: Long, timestamp: Long) - @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) - @inline private[this] def quotient(i: Int, exp2: Int): Int = i >>> exp2 - @inline private[this] def bit(i: Int, idx: Int): Int = (i >>> idx) & 1 + object Bucket { + implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } + } - private[this] def binarize(i: Int, bits: Int, offset: Int): Vector[Int] = - (0 until bits).map { idx => offset + bit(i, idx) }.toVector + /** + * ExpHist guarantees that the returned guess will be within + * `epsilon` relative error of the true count across a sliding + * window of size `windowSize`. + */ + case class Config(epsilon: Double, windowSize: Long) { + val k: Int = math.ceil(1 / epsilon).toInt + val l: Int = math.ceil(k / 2.0).toInt + + // Returns the last timestamp before the window. any ts <= [the + // returned timestamp] is outside the window. + def expiration(currTime: Long): Long = currTime - windowSize + + // Drops all buckets with an expired timestamp, based on the + // configured window and the supplied current time. + def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = + ExpHist.dropExpired(buckets, expiration(currTime)) + } /** - * (i = vector index, j = index of last entry) - * - * returns a vector of the the coefficients of s^i in the - * l-canonical representation of s. - * - * the "l" means that - * - * - ret(i) for all i < j == l or l + 1 - * - ret(j) < l + 1 + * Create an empty instance with the supplied Config. 
*/ - def fromLong(s: Long, l: Int): Vector[Int] = { - if (s <= 0) Vector.empty - else { - val num = s + l - val denom = l + 1 - val j = floorPowerOfTwo(num / denom) - val offset = (num - (denom << j)).toInt - binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1) - } + def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) + + /** + * Generate an instance directly from a number. All buckets in the + * returned ExpHist will have the same timestamp, equal to `ts`. + */ + def from(i: Long, ts: Long, conf: Config): ExpHist = { + val buckets = Canonical.bucketsFromLong(i, conf.l).map(Bucket(_, ts)) + ExpHist(conf, buckets, i, ts) } /** - * Expands `s` out into a list of numbers that all sum to s. Each - * entry is a power of 2, and the number of entries of each power - * of 2 matches `s`'s l-canonical representation (for the supplied - * l). + * @param buckets [buckets] sorted in DESCENDING order (recent first) + * @param cutoff buckets with ts <= cutoff are expired + * @return the sum of evicted bucket sizes and the unexpired buckets */ - def bucketsFromLong(s: Long, l: Int): Vector[Long] = { - if (s <= 0) Vector.empty - else { - val num = s + l - val denom = l + 1 - val j = floorPowerOfTwo(num / denom) - val offset = (num - (denom << j)).toInt - val prefixRep = modPow2(offset, j) - - (0 until j).toVector.flatMap { - idx => Vector.fill(l + bit(prefixRep, idx))(1L << idx) - } ++ List.fill(quotient(offset, j) + 1)(1L << j) - } + def dropExpired(buckets: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { + val (dropped, remaining) = buckets.reverse.span(_.timestamp <= cutoff) + (dropped.map(_.size).sum, remaining.reverse) } /** - * Expands out an l-canonical representation into the original number. 
+ * Converts the supplied buckets into a NEW vector of buckets + * satisfying this law: + * + * {{{ + * rebucket(buckets, desired).map(_.size).sum == desired + * }}} * - * @param rep l-canonical representation of some number s for some l - * @return The original s + * (rebucket only works if desired.sum == buckets.map(_.size).sum) + * + * @param buckets [buckets] sorted in DESCENDING order (recent first) + * @param desired bucket sizes to rebucket `buckets` into. */ - def toLong(rep: Vector[Int]): Long = - if (rep.isEmpty) 0L + private[algebird] def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = + if (desired.isEmpty) Vector.empty else { - rep.iterator.zipWithIndex - .map { case (i, exp) => i.toLong << exp } - .reduce(_ + _) + val input = buckets.dropWhile(_.size == 0) + val bucketSize +: tail = desired + val remaining = drop(bucketSize, input) + input.head.copy(size = bucketSize) +: rebucket(remaining, tail) + } + + /** + * @param toDrop total count to remove from the left of `input`. + * @param input buckets + * @return Vector with buckets, or pieces of buckets, with sizes + * totalling `toDrop` items removed from the head. If an + * element wasn't fully consumed, the remainder will be + * stuck back onto the head. + */ + @tailrec private[this] def drop(toDrop: Long, input: Vector[Bucket]): Vector[Bucket] = { + val (b @ Bucket(count, _)) +: tail = input + (toDrop - count) match { + case 0 => tail + case x if x < 0 => b.copy(size = -x) +: tail + case x if x > 0 => drop(x, tail) } + } /** - * Expands out the l-canonical representation of some number s into - * a list of bucket sizes in ascending order. + * The paper that introduces the exponential histogram proves that, + * given a positive number `l`, every integer can be uniquely + * represented as the sum of + * + * (l or (l + 1)) * 2^i + (# from 1 to (l + 1)) 2^j + * + * for i = (0 to j - 1), given some j. 
+ * + * The paper calls this the "l-canonical" representation of the + * number. + * + * It turns out that if you follow the exponential histogram + * bucket-merging algorithm, you end up with the invariant that the + * number of buckets with size 2^i exactly matches that power of 2's + * coefficient in its l-canonical representation. + * + * Put another way - only sequences of buckets with sizes matching + * the l-canonical representation are valid exponential histograms. + * + * (We use this idea in `ExpHist.rebucket` to take a sequence of + * buckets of any size and rebucket them into a sequence where the + * above invariant holds.) * - * @param rep l-canonical representation of some number s for some l - * @return vector of powers of 2 (where ret.sum == the original s) + * This is huge. This means that you can implement `addAll(newBuckets)` by + * + * - calculating newTotal = total + delta contributed by newBuckets + * - generating the l-canonical sequence of bucket sizes for newTotal + * - rebucketing newBuckets ++ oldBuckets into those bucket sizes + * + * The resulting sequence of buckets is a valid exponential + * histogram. */ - def toBuckets(rep: Vector[Int]): Vector[Long] = - rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) } + + object Canonical { + @inline private[this] def floorPowerOfTwo(x: Long): Int = + JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) + + @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) + @inline private[this] def quotient(i: Int, exp2: Int): Int = i >>> exp2 + @inline private[this] def bit(i: Int, idx: Int): Int = (i >>> idx) & 1 + + private[this] def binarize(i: Int, bits: Int, offset: Int): Vector[Int] = + (0 until bits).map { idx => offset + bit(i, idx) }.toVector + + /** + * @param s the number to convert to l-canonical form + * @param l the "l" in l-canonical form + * @return vector of the coefficients of 2^i in the + * l-canonical representation of s. 
+ * + * the "l" in l-canonical means that + * + * - all return vector entries but the last one == `l` or `l + 1` + * - 1 <= `returnVector.last` <= l + 1 + * + * The return vector's size is the largest k such that + * + * 2^k <= (s + l) / (l + 1) + * + * For example: + * + * {{{ + * scala> Canonical.fromLong(15, 2) + * res0: Vector[Int] = Vector(3, 2, 2) + * }}} + * 15 = (3 * 2^0) + (2 * 2^1) + (2 * 2^2) + */ + def fromLong(s: Long, l: Int): CanonicalVector = + if (s <= 0) CanonicalVector(Vector.empty) + else { + val num = s + l + val denom = l + 1 + val j = floorPowerOfTwo(num / denom) + val offset = (num - (denom << j)).toInt + CanonicalVector( + binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1)) + } + + /** + * @param s the number to convert to l-canonical form + * @param l the "l" in l-canonical form + * @return vector of numbers that sum to s. Each + * entry is a power of 2, and the number of entries of + * each power of 2 matches the l-canonical + * representation of s. + * + * Note that: + * + * {{{ + * bucketsFromLong(s, l) == fromLong(s, k).toBuckets + * }}} + * + * bucketsFromLong is more efficient. + */ + def bucketsFromLong(s: Long, l: Int): Vector[Long] = + if (s <= 0) Vector.empty + else { + val num = s + l + val denom = l + 1 + val j = floorPowerOfTwo(num / denom) + val offset = (num - (denom << j)).toInt + val prefixRep = modPow2(offset, j) + + (0 until j).toVector.flatMap { + idx => Vector.fill(l + bit(prefixRep, idx))(1L << idx) + } ++ List.fill(quotient(offset, j) + 1)(1L << j) + } + } + + case class CanonicalVector(rep: Vector[Int]) extends AnyVal { + def sum: Int = rep.sum + + /** + * Expands out an l-canonical representation into the original number. 
+ * + * @param rep l-canonical representation of some number s for some l + * @return The original s + */ + def toLong: Long = + if (rep.isEmpty) 0L + else { + rep.iterator.zipWithIndex + .map { case (i, exp) => i.toLong << exp } + .reduce(_ + _) + } + + /** + * Expands out the l-canonical representation of some number s into + * a list of bucket sizes in ascending order. + * + * @param rep l-canonical representation of some number s for some l + * @return vector of powers of 2 (where ret.sum == the original s) + */ + def toBuckets: Vector[Long] = + rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) } + } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index d4fe85d6e..40fe94bbb 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -8,7 +8,7 @@ import Arbitrary.arbitrary class ExpHistLaws extends PropSpec with PropertyChecks { import ExpHistGenerators._ - import ExpHist.{ Bucket, Config } + import ExpHist.{ Bucket, Canonical, Config } property("Increment example from DGIM paper") { // Returns a vector of bucket sizes from largest to smallest. 
@@ -92,7 +92,7 @@ class ExpHistLaws extends PropSpec with PropertyChecks { property("verify isPowerOfTwo") { forAll { i: PosNum[Int] => - val power = math.pow(2, i.value % 64).toLong + val power = math.pow(2, i.value % 32).toLong assert(isPowerOfTwo(power)) } } @@ -233,11 +233,11 @@ object ExpHistGenerators { } class CanonicalLaws extends PropSpec with PropertyChecks { - import Canonical._ + import ExpHist.Canonical._ property("l-canonical representation is all l or l+1s except for last") { forAll { (i: PosNum[Long], l: PosNum[Short]) => - val rep = fromLong(i.value, l.value) + val rep = fromLong(i.value, l.value).rep // all values but the last are l or l + 1 assert(rep.init.forall(v => v == l.value || v == l.value + 1)) @@ -248,14 +248,14 @@ class CanonicalLaws extends PropSpec with PropertyChecks { property("canonical representation round-trips") { forAll { (i: PosNum[Long], l: PosNum[Short]) => - assert(toLong(fromLong(i.value, l.value)) == i.value) + assert(fromLong(i.value, l.value).toLong == i.value) } } property("fromLong(i, k).sum == # of buckets required to encode i") { forAll { (i: PosNum[Long], k: PosNum[Short]) => val rep = fromLong(i.value, k.value) - val numBuckets = toBuckets(rep).size + val numBuckets = rep.toBuckets.size assert(rep.sum == numBuckets) } @@ -264,7 +264,7 @@ class CanonicalLaws extends PropSpec with PropertyChecks { property("bucketsFromLong(i, k).sum generates buckets directly") { forAll { (i: PosNum[Long], k: PosNum[Short]) => val rep = fromLong(i.value, k.value) - assert(bucketsFromLong(i.value, k.value) == toBuckets(rep)) + assert(bucketsFromLong(i.value, k.value) == rep.toBuckets) } } @@ -273,7 +273,7 @@ class CanonicalLaws extends PropSpec with PropertyChecks { val lower = k.value val upper = lower + 1 assert( - fromLong(i.value, k.value).init.forall { numBuckets => + fromLong(i.value, k.value).rep.init.forall { numBuckets => lower <= numBuckets && numBuckets <= upper }) } From df313a7224d91d547188672aa9ac9a90ee2319c9 Mon Sep 17 
00:00:00 2001 From: Sam Ritchie Date: Tue, 1 Nov 2016 13:47:04 -0600 Subject: [PATCH 09/17] add more tests based on the paper --- .../com/twitter/algebird/ExpHistLaws.scala | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index 40fe94bbb..dbebdae07 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -97,9 +97,32 @@ class ExpHistLaws extends PropSpec with PropertyChecks { } } - property("bucket sizes are all powers of two") { + // The next two properties are invariants from the paper. + property("Invariant 1: relative error bound applies as old buckets expire") { + forAll { hist: ExpHist => + val numBuckets = hist.buckets.size + + // sequence of histograms, each with one more oldest bucket + // dropped off of its tail. + val histograms = (0 until numBuckets).scanLeft(hist) { + case (e, _) => + e.copy( + buckets = e.buckets.init, + total = e.total - e.oldestBucketSize) + } + + // every histogram's relative error stays within bounds. 
+ histograms.foreach { e => assert(e.relativeError <= e.conf.epsilon) } + } + } + + property("Invariant 2: bucket sizes are nondecreasing powers of two") { forAll { e: ExpHist => assert(e.buckets.forall { b => isPowerOfTwo(b.size) }) + + // sizes are nondecreasing: + val sizes = e.buckets.map(_.size) + assert(sizes.sorted == sizes) } } From c360cade3704e378658b3f428676d7e2ea14fae6 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Wed, 2 Nov 2016 08:30:40 -0600 Subject: [PATCH 10/17] update paper link --- .../src/main/scala/com/twitter/algebird/ExpHist.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 1e7515a5e..e43142198 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -5,7 +5,7 @@ import scala.annotation.tailrec /** * Exponential Histogram algorithm from - * http://ilpubs.stanford.edu:8090/504/1/2001-34.pdf + * http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf * * An Exponential Histogram is a sliding window counter that can * guarantee a bounded relative error. You configure the data structure with @@ -21,7 +21,6 @@ import scala.annotation.tailrec * relative error of the true count seen across the supplied * `windowSize`. * - * * Next steps: * * - combine Canonical.fromLong and Canonical.bucketsFromLong @@ -31,7 +30,6 @@ import scala.annotation.tailrec * - Discussion of how Canonical.fromLong works * * - * * @param conf the config values for this isntance. * @param buckets Vector of timestamps of each (powers of 2) * ticks. 
This is the core of the exponential From 8ebc9863a91a92cf371bafbe700b48acab2c514d Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Thu, 3 Nov 2016 12:26:42 -0600 Subject: [PATCH 11/17] big documentation push --- .../scala/com/twitter/algebird/ExpHist.scala | 210 +++++++++++++++--- .../com/twitter/algebird/ExpHistLaws.scala | 37 +-- 2 files changed, 193 insertions(+), 54 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index e43142198..db1d7facf 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -8,14 +8,15 @@ import scala.annotation.tailrec * http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf * * An Exponential Histogram is a sliding window counter that can - * guarantee a bounded relative error. You configure the data structure with + * guarantee a bounded relative error. You configure the data + * structure with * * - epsilon, the relative error you're willing to tolerate * - windowSize, the number of time ticks that you want to track * * You interact with the data structure by adding (number, timestamp) - * pairs into the exponential histogram, and querying it for - * approximate counts. + * pairs into the exponential histogram. querying it for an + * approximate counts with `guess`. * * The approximate count is guaranteed to be within conf.epsilon * relative error of the true count seen across the supplied @@ -23,21 +24,18 @@ import scala.annotation.tailrec * * Next steps: * - * - combine Canonical.fromLong and Canonical.bucketsFromLong - * - efficient serialization using Canonical + * - efficient serialization * - Query EH with a shorter window than the configured window * - Discussion of epsilon vs memory tradeoffs - * - Discussion of how Canonical.fromLong works * * - * @param conf the config values for this isntance. 
+ * @param conf the config values for this instance. * @param buckets Vector of timestamps of each (powers of 2) - * ticks. This is the core of the exponential - * histogram representation. See [[Canonical]] for - * more info. - * @param total total ticks tracked, == buckets.map(_.size).sum + * ticks. This is the key to the exponential histogram + * representation. See [[ExpHist.Canonical]] for more + * info. + * @param total total ticks tracked. `total == buckets.map(_.size).sum` * @param time current timestamp of this instance. - * Used with conf.windowSize to expire buckets. */ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: Long) { import ExpHist.{ Bucket, Canonical } @@ -74,9 +72,9 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: /** * Efficiently add many buckets at once. * - * @param unsorted [bucket]. All timestamps must be >= this.time. + * @param unsorted vector of buckets. All timestamps must be >= this.time. * @return ExpHist instance with all buckets added, stepped - * forward to the most timestamp in `unsorted`. + * forward to the max timestamp in `unsorted`. */ def addAll(unsorted: Vector[Bucket]): ExpHist = if (unsorted.isEmpty) this @@ -139,6 +137,10 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } object ExpHist { + /** + * @param size number of items tracked by this bucket. + * @param timestamp timestamp of the most recent item tracked by this bucket. + */ case class Bucket(size: Long, timestamp: Long) object Bucket { @@ -149,6 +151,8 @@ object ExpHist { * ExpHist guarantees that the returned guess will be within * `epsilon` relative error of the true count across a sliding * window of size `windowSize`. 
+ * @param epsilon relative error, from [0, 0.5] + * @param windowSize number of time ticks to track */ case class Config(epsilon: Double, windowSize: Long) { val k: Int = math.ceil(1 / epsilon).toInt @@ -165,13 +169,14 @@ object ExpHist { } /** - * Create an empty instance with the supplied Config. + * Returns an empty instance with the supplied Config. */ def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) /** - * Generate an instance directly from a number. All buckets in the - * returned ExpHist will have the same timestamp, equal to `ts`. + * Returns an instance directly from a number `i`. All buckets in + * the returned ExpHist will have the same timestamp, equal to + * `ts`. */ def from(i: Long, ts: Long, conf: Config): ExpHist = { val buckets = Canonical.bucketsFromLong(i, conf.l).map(Bucket(_, ts)) @@ -198,7 +203,7 @@ object ExpHist { * * (rebucket only works if desired.sum == buckets.map(_.size).sum) * - * @param buckets [buckets] sorted in DESCENDING order (recent first) + * @param buckets vector of buckets sorted in DESCENDING order (recent first) * @param desired bucket sizes to rebucket `buckets` into. */ private[algebird] def rebucket(buckets: Vector[Bucket], desired: Vector[Long]): Vector[Bucket] = @@ -229,23 +234,23 @@ object ExpHist { /** * The paper that introduces the exponential histogram proves that, - * given a positive number `l`, every integer can be uniquely + * given a positive number `l`, every integer s can be uniquely * represented as the sum of * * (l or (l + 1)) * 2^i + (# from 1 to (l + 1)) 2^j * * for i = (0 to j - 1), given some j. * - * The paper calls this the "l-canonical" representation of the - * number. + * The paper calls this the "l-canonical" representation of s. 
* * It turns out that if you follow the exponential histogram * bucket-merging algorithm, you end up with the invariant that the * number of buckets with size 2^i exactly matches that power of 2's - * coefficient in its l-canonical representation. + * coefficient in s's l-canonical representation. * * Put another way - only sequences of buckets with sizes matching - * the l-canonical representation are valid exponential histograms. + * the l-canonical representation of some number s are valid + * exponential histograms. * * (We use this idea in `ExpHist.rebucket` to take a sequence of * buckets of any size and rebucket them into a sequence where the @@ -253,20 +258,19 @@ object ExpHist { * * This is huge. This means that you can implement `addAll(newBuckets)` by * - * - calculating newTotal = total + delta contributed by newBuckets - * - generating the l-canonical sequence of bucket sizes for newTotal + * - calculating newS = s + delta contributed by newBuckets + * - generating the l-canonical sequence of bucket sizes for newS * - rebucketing newBuckets ++ oldBuckets into those bucket sizes * * The resulting sequence of buckets is a valid exponential * histogram. */ - object Canonical { @inline private[this] def floorPowerOfTwo(x: Long): Int = JLong.numberOfTrailingZeros(JLong.highestOneBit(x)) - @inline private[this] def modPow2(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) - @inline private[this] def quotient(i: Int, exp2: Int): Int = i >>> exp2 + @inline private[this] def modPow2Minus1(i: Int, exp2: Int): Int = i & ((1 << exp2) - 1) + @inline private[this] def quotientPow2(i: Int, exp2: Int): Int = i >>> exp2 @inline private[this] def bit(i: Int, idx: Int): Int = (i >>> idx) & 1 private[this] def binarize(i: Int, bits: Int, offset: Int): Vector[Int] = @@ -278,22 +282,154 @@ object ExpHist { * @return vector of the coefficients of 2^i in the * l-canonical representation of s. 
* + * For example: + * + * {{{ + * scala> Canonical.fromLong(15, 2) + * res0: Vector[Int] = Vector(3, 2, 2) + * }}} + * 15 = (3 * 2^0) + (2 * 2^1) + (2 * 2^2) + * + * * the "l" in l-canonical means that * * - all return vector entries but the last one == `l` or `l + 1` * - 1 <= `returnVector.last` <= l + 1 * - * The return vector's size is the largest k such that + * ## L-Canonical Representation Generation: * - * 2^k <= (s + l) / (l + 1) + * - Find the largest j s.t. 2^j <= (s + l) / (1 + l) + * - let s' = 2^j(1 + l) - l * - * For example: + * - let diff = (s - s') is the position of s within that group. + * - let b = the little-endian binary rep of diff % (2^j - 1) + * - let ret = return vector of length j: * * {{{ - * scala> Canonical.fromLong(15, 2) - * res0: Vector[Int] = Vector(3, 2, 2) + * (0 until j).map { i => ret(i) = b(i) + l } + * ret(j) = math.floor(diff / 2^j) + * }}} + * + * + * ## Implementation Discussion + * + * The exponential histogram algorithm tracks buckets of size + * 2^i. Every new increment to the histogram adds a bucket of + * size 1. + * + * Because only l or l+1 buckets of size 2^i are allowed for each + * i, this increment might trigger an incremental merge of + * smaller buckets into larger buckets. + * + * Let's look at 10 steps of the algorithm with l == 2: + * + * 1: 1 (1 added) + * 2: 1 1 (1 added) + * 3: 1 1 1 (1 added) + * 4: 1 1 2 (1 added, triggering a 1 + 1 = 2 merge) + * 5: 1 1 1 2 (1 added) + * 6: 1 1 2 2 (1 added, triggering a 1 + 1 = 2 merge) + * 7: 1 1 1 2 2 (1 added) + * 8: 1 1 2 2 2 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) + * 9: 1 1 1 2 2 2 (1 added) + * 10: 1 1 2 2 4 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) + * + * Notice that the bucket sizes always sum to the algorithm step, + * ie (10 == 1 + 1 + 1 + 2 + 2 + 4). + * + * Now let's write out a list of the number of buckets of each size, ie + * [bucketsOfSize(1), bucketsOfSize(2), bucketsOfSize(4), ....] 
+ * + * Here's the above sequence in the new representation, plus a + * few more steps: + * + * 1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 + * 2: 2 + * 3: 3 + * 4: 2 1 <-- (l + 1)2^1 - l = 3 * 2^0 - 2 = 4 + * 5: 3 1 + * 6: 2 2 + * 7: 3 2 + * 8: 2 3 + * 9: 3 3 + * 10: 2 2 1 <-- (l + 1)2^2 - l = 3 * 2^0 - 2 = 10 + * 11: 3 2 1 + * 12: 2 3 1 + * 13: 3 3 1 + * 14: 2 2 2 + * 15: 3 2 2 + * 16: 2 3 2 + * 16: 3 3 2 + * 17: 2 2 3 + * + * This sequence is called the "l-canonical representation" of s. + * + * A pattern emerges! Every bucket size except the largest looks + * like a binary counter... if you added `l + 1` to the bit, and + * made the counter little-endian, so the least-significant bits + * came first. Let's call this the "binary" prefix, or "bin(_)". + * + * Here's the above sequence with the prefix decoded from + * "binary": + * + * 1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 + * 2: 2 + * 3: 3 + * + * 4: bin(0) 1 <-- (l + 1)2^1 - l = 3 * 2^0 - 2 = 4 + * 5: bin(1) 1 + * 6: bin(0) 2 + * 7: bin(1) 2 + * 8: bin(0) 3 + * 9: bin(1) 3 + * + * 10: bin(0) 1 <-- (l + 1)2^2 - l = 3 * 2^0 - 2 = 10 + * 11: bin(1) 1 + * 12: bin(2) 1 + * 13: bin(3) 1 + * 14: bin(0) 2 + * 15: bin(1) 2 + * 16: bin(2) 2 + * 16: bin(3) 2 + * 17: bin(0) 3 + * + * Some observations about the pattern: + * + * The l-canonical representation groups the natural numbers into + * groups of size (l + 1)2^i for i >= 0. + * + * Each group starts at (l + 1)2^i - l (see 1, 4, 10... above) + * + * Within each group, the "binary" prefix of the l-canonical rep + * cycles from 0 to (2^i - 1), l + 1 total times. (This makes + * sense; each cycle increments the final entry by one until it + * hits l + 1; after that an increment triggers a merge and a new + * "group" begins.) + * + * The final l-canonical entry == + * + * floor((position within the group) / 2^i), or the "quotient" of + * that position and 2^i. + * + * That's all we need to know to write a procedure to generate + * the l-canonical representation! 
Here it is again: + * + * + * ## L-Canonical Representation Procedure: + * + * - Find the largest j s.t. 2^j <= (s + l) / (1 + l) + * - let s' = 2^j(1 + l) - l + * + * (s' is the position if the start of a group, ie 1, 4, 10...) + * + * - let diff = (s - s') is the position of s within that group. + * - let b = the little-endian binary rep of diff % (2^j - 1) + * - let ret = return vector of length j: + * + * {{{ + * (0 until j).map { i => ret(i) = b(i) + l } + * ret(j) = math.floor(diff / 2^j) * }}} - * 15 = (3 * 2^0) + (2 * 2^1) + (2 * 2^2) */ def fromLong(s: Long, l: Int): CanonicalVector = if (s <= 0) CanonicalVector(Vector.empty) @@ -303,7 +439,7 @@ object ExpHist { val j = floorPowerOfTwo(num / denom) val offset = (num - (denom << j)).toInt CanonicalVector( - binarize(modPow2(offset, j), j, l) :+ (quotient(offset, j) + 1)) + binarize(modPow2Minus1(offset, j), j, l) :+ (quotientPow2(offset, j) + 1)) } /** @@ -329,11 +465,11 @@ object ExpHist { val denom = l + 1 val j = floorPowerOfTwo(num / denom) val offset = (num - (denom << j)).toInt - val prefixRep = modPow2(offset, j) + val prefixRep = modPow2Minus1(offset, j) (0 until j).toVector.flatMap { idx => Vector.fill(l + bit(prefixRep, idx))(1L << idx) - } ++ List.fill(quotient(offset, j) + 1)(1L << j) + } ++ List.fill(quotientPow2(offset, j) + 1)(1L << j) } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index dbebdae07..38ab89646 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -236,23 +236,26 @@ class ExpHistLaws extends PropSpec with PropertyChecks { object ExpHistGenerators { import ExpHist.{ Bucket, Config } - implicit val arb: Arbitrary[Bucket] = - Arbitrary(for { - count <- Gen.posNum[Long] - timestamp <- Gen.posNum[Long] - } yield Bucket(count - 1L, timestamp)) - - implicit val conf: 
Arbitrary[Config] = - Arbitrary(for { - k <- Gen.posNum[Short] - windowSize <- Gen.posNum[Long] - } yield Config(1 / k.toDouble, windowSize)) - - implicit val expHist: Arbitrary[ExpHist] = - Arbitrary(for { - buckets <- arbitrary[Vector[Bucket]] - conf <- arbitrary[Config] - } yield ExpHist.empty(conf).addAll(buckets)) + implicit val genBucket: Gen[Bucket] = for { + count <- Gen.posNum[Long] + timestamp <- Gen.posNum[Long] + } yield Bucket(count - 1L, timestamp) + + implicit val genConfig: Gen[Config] = for { + k <- Gen.posNum[Short] + windowSize <- Gen.posNum[Long] + } yield Config(1 / k.toDouble, windowSize) + + implicit val genExpHist: Gen[ExpHist] = + for { + buckets <- Gen.containerOf[Vector, Bucket](genBucket) + conf <- genConfig + } yield ExpHist.empty(conf).addAll(buckets) + + implicit val arbBucket: Arbitrary[Bucket] = Arbitrary(genBucket) + implicit val arbConfig: Arbitrary[Config] = Arbitrary(genConfig) + implicit val arbExpHist: Arbitrary[ExpHist] = Arbitrary(genExpHist) + } class CanonicalLaws extends PropSpec with PropertyChecks { From b6a8544a5955f5546ac983c957770c6092ecb2c7 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 4 Nov 2016 16:23:46 -0600 Subject: [PATCH 12/17] move docs to tut template --- .../scala/com/twitter/algebird/ExpHist.scala | 121 ----------------- docs/src/main/tut/datatypes/exphist.md | 128 ++++++++++++++++++ 2 files changed, 128 insertions(+), 121 deletions(-) create mode 100644 docs/src/main/tut/datatypes/exphist.md diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index db1d7facf..6e8291ee4 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -296,132 +296,11 @@ object ExpHist { * - all return vector entries but the last one == `l` or `l + 1` * - 1 <= `returnVector.last` <= l + 1 * - * ## L-Canonical Representation Generation: - * - * 
- Find the largest j s.t. 2^j <= (s + l) / (1 + l) - * - let s' = 2^j(1 + l) - l - * - * - let diff = (s - s') is the position of s within that group. - * - let b = the little-endian binary rep of diff % (2^j - 1) - * - let ret = return vector of length j: - * - * {{{ - * (0 until j).map { i => ret(i) = b(i) + l } - * ret(j) = math.floor(diff / 2^j) - * }}} - * - * - * ## Implementation Discussion - * - * The exponential histogram algorithm tracks buckets of size - * 2^i. Every new increment to the histogram adds a bucket of - * size 1. - * - * Because only l or l+1 buckets of size 2^i are allowed for each - * i, this increment might trigger an incremental merge of - * smaller buckets into larger buckets. - * - * Let's look at 10 steps of the algorithm with l == 2: - * - * 1: 1 (1 added) - * 2: 1 1 (1 added) - * 3: 1 1 1 (1 added) - * 4: 1 1 2 (1 added, triggering a 1 + 1 = 2 merge) - * 5: 1 1 1 2 (1 added) - * 6: 1 1 2 2 (1 added, triggering a 1 + 1 = 2 merge) - * 7: 1 1 1 2 2 (1 added) - * 8: 1 1 2 2 2 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) - * 9: 1 1 1 2 2 2 (1 added) - * 10: 1 1 2 2 4 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) - * - * Notice that the bucket sizes always sum to the algorithm step, - * ie (10 == 1 + 1 + 1 + 2 + 2 + 4). - * - * Now let's write out a list of the number of buckets of each size, ie - * [bucketsOfSize(1), bucketsOfSize(2), bucketsOfSize(4), ....] - * - * Here's the above sequence in the new representation, plus a - * few more steps: - * - * 1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 - * 2: 2 - * 3: 3 - * 4: 2 1 <-- (l + 1)2^1 - l = 3 * 2^0 - 2 = 4 - * 5: 3 1 - * 6: 2 2 - * 7: 3 2 - * 8: 2 3 - * 9: 3 3 - * 10: 2 2 1 <-- (l + 1)2^2 - l = 3 * 2^0 - 2 = 10 - * 11: 3 2 1 - * 12: 2 3 1 - * 13: 3 3 1 - * 14: 2 2 2 - * 15: 3 2 2 - * 16: 2 3 2 - * 16: 3 3 2 - * 17: 2 2 3 - * - * This sequence is called the "l-canonical representation" of s. - * - * A pattern emerges! 
Every bucket size except the largest looks - * like a binary counter... if you added `l + 1` to the bit, and - * made the counter little-endian, so the least-significant bits - * came first. Let's call this the "binary" prefix, or "bin(_)". - * - * Here's the above sequence with the prefix decoded from - * "binary": - * - * 1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 - * 2: 2 - * 3: 3 - * - * 4: bin(0) 1 <-- (l + 1)2^1 - l = 3 * 2^0 - 2 = 4 - * 5: bin(1) 1 - * 6: bin(0) 2 - * 7: bin(1) 2 - * 8: bin(0) 3 - * 9: bin(1) 3 - * - * 10: bin(0) 1 <-- (l + 1)2^2 - l = 3 * 2^0 - 2 = 10 - * 11: bin(1) 1 - * 12: bin(2) 1 - * 13: bin(3) 1 - * 14: bin(0) 2 - * 15: bin(1) 2 - * 16: bin(2) 2 - * 16: bin(3) 2 - * 17: bin(0) 3 - * - * Some observations about the pattern: - * - * The l-canonical representation groups the natural numbers into - * groups of size (l + 1)2^i for i >= 0. - * - * Each group starts at (l + 1)2^i - l (see 1, 4, 10... above) - * - * Within each group, the "binary" prefix of the l-canonical rep - * cycles from 0 to (2^i - 1), l + 1 total times. (This makes - * sense; each cycle increments the final entry by one until it - * hits l + 1; after that an increment triggers a merge and a new - * "group" begins.) - * - * The final l-canonical entry == - * - * floor((position within the group) / 2^i), or the "quotient" of - * that position and 2^i. - * - * That's all we need to know to write a procedure to generate - * the l-canonical representation! Here it is again: - * - * * ## L-Canonical Representation Procedure: * * - Find the largest j s.t. 2^j <= (s + l) / (1 + l) * - let s' = 2^j(1 + l) - l * - * (s' is the position if the start of a group, ie 1, 4, 10...) - * * - let diff = (s - s') is the position of s within that group. 
* - let b = the little-endian binary rep of diff % (2^j - 1) * - let ret = return vector of length j: diff --git a/docs/src/main/tut/datatypes/exphist.md b/docs/src/main/tut/datatypes/exphist.md new file mode 100644 index 000000000..23806ce70 --- /dev/null +++ b/docs/src/main/tut/datatypes/exphist.md @@ -0,0 +1,128 @@ +--- +layout: docs +title: "Exponential Histogram" +section: "data" +source: "algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala" +scaladoc: "#com.twitter.algebird.ExpHist" +--- + +# Exponential Histogram + +Exponential Histogram algorithm from [Maintaining Stream Statistics over Sliding Windows](http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf), by Datar, Gionis, Indyk and Motwani. + +An Exponential Histogram is a sliding window counter that can guarantee a bounded relative error. You configure the data structure with + +- epsilon, the relative error you're willing to tolerate +- windowSize, the number of time ticks that you want to track + +You interact with the data structure by adding (number, timestamp) pairs into the exponential histogram. querying it for an approximate counts with `guess`. + +The approximate count is guaranteed to be within conf.epsilon relative error of the true count seen across the supplied `windowSize`. + +## l-Canonical Representation + +The exponential histogram algorithm tracks buckets of size 2^i. Every new increment to the histogram adds a bucket of size 1. + +Because only l or l+1 buckets of size 2^i are allowed for each i, this increment might trigger an incremental merge of smaller buckets into larger buckets. 
+ +Let's look at 10 steps of the algorithm with l == 2: + +``` +1: 1 (1 added) +2: 1 1 (1 added) +3: 1 1 1 (1 added) +4: 1 1 2 (1 added, triggering a 1 + 1 = 2 merge) +5: 1 1 1 2 (1 added) +6: 1 1 2 2 (1 added, triggering a 1 + 1 = 2 merge) +7: 1 1 1 2 2 (1 added) +8: 1 1 2 2 2 (1 added, triggering a 1 + 1 = 2 merge) +9: 1 1 1 2 2 2 (1 added) +10: 1 1 2 2 4 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) +``` + +Notice that the bucket sizes always sum to the algorithm step, ie (10 == 1 + 1 + 1 + 2 + 2 + 4). + +Now let's write out a list of the number of buckets of each size, ie [bucketsOfSize(1), bucketsOfSize(2), bucketsOfSize(4), ....] + +Here's the above sequence in the new representation, plus a few more steps: + +``` +1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 +2: 2 +3: 3 +4: 2 1 <-- (l + 1)2^1 - l = 3 * 2^1 - 2 = 4 +5: 3 1 +6: 2 2 +7: 3 2 +8: 2 3 +9: 3 3 +10: 2 2 1 <-- (l + 1)2^2 - l = 3 * 2^2 - 2 = 10 +11: 3 2 1 +12: 2 3 1 +13: 3 3 1 +14: 2 2 2 +15: 3 2 2 +16: 2 3 2 +16: 3 3 2 +17: 2 2 3 +``` + +This sequence is called the "l-canonical representation" of s. + +A pattern emerges! Every bucket size except the largest looks like a binary counter... if you added `l + 1` to the bit, and made the counter little-endian, so the least-significant bits came first. Let's call this the "binary" prefix, or "bin(_)". + +Here's the above sequence with the prefix decoded from "binary": + +``` +1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 +2: 2 +3: 3 + +4: bin(0) 1 <-- (l + 1)2^1 - l = 3 * 2^1 - 2 = 4 +5: bin(1) 1 +6: bin(0) 2 +7: bin(1) 2 +8: bin(0) 3 +9: bin(1) 3 + +10: bin(0) 1 <-- (l + 1)2^2 - l = 3 * 2^2 - 2 = 10 +11: bin(1) 1 +12: bin(2) 1 +13: bin(3) 1 +14: bin(0) 2 +15: bin(1) 2 +16: bin(2) 2 +16: bin(3) 2 +17: bin(0) 3 +``` + +Some observations about the pattern: + +The l-canonical representation groups the natural numbers into groups of size (l + 1)2^i for i >= 0. + +Each group starts at (l + 1)2^i - l (see 1, 4, 10...
above) + +Within each group, the "binary" prefix of the l-canonical rep cycles from 0 to (2^i - 1), l + 1 total times. (This makes sense; each cycle increments the final entry by one until it hits l + 1; after that an increment triggers a merge and a new "group" begins.) + +The final l-canonical entry == + +floor((position within the group) / 2^i), or the "quotient" of that position and 2^i. + +That's all we need to know to write a procedure to generate the l-canonical representation! Here it is again: + +## L-Canonical Representation Procedure: + +- Find the largest j s.t. 2^j <= (s + l) / (1 + l) +- let s' = 2^j(1 + l) - l + +(s' is the position if the start of a group, ie 1, 4, 10...) + +- let diff = (s - s') is the position of s within that group. +- let b = the little-endian binary rep of diff % (2^j - 1) +- let ret = return vector of length j: + + +```scala +(0 until j).map { i => ret(i) = b(i) + l } +ret(j) = math.floor(diff / 2^j) + 1 +``` From 6017ad7e799dc99a59a2e0b835d0698f25125bae Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 4 Nov 2016 16:45:47 -0600 Subject: [PATCH 13/17] fix api docs --- README.md | 2 +- docs/src/main/tut/index.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 88dd5f82a..a1930c382 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ In the above, the class `Max[T]` signifies that the `+` operator should actually To learn more and find links to tutorials and information around the web, check out the [website](https://twitter.github.io/algebird). -The latest API docs are hosted on Algebird's [ScalaDoc index](http://twitter.github.io/algebird/api/#index). +The latest API docs are hosted on Algebird's [ScalaDoc index](http://twitter.github.io/algebird/api/).
## Get Involved + Code of Conduct diff --git a/docs/src/main/tut/index.md b/docs/src/main/tut/index.md index 02e28c0aa..0ebe48a95 100644 --- a/docs/src/main/tut/index.md +++ b/docs/src/main/tut/index.md @@ -24,7 +24,7 @@ In the above, the class `Max[T]` signifies that the `+` operator should actually ## Documentation -The latest API docs are hosted at Algebird's [ScalaDoc index](api/#index). +The latest API docs are hosted at Algebird's [ScalaDoc index](api/). ## Get Involved + Code of Conduct From 606715e4acd0f00c6b55af5deaad05909774d554 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 8 Nov 2016 11:25:18 -0700 Subject: [PATCH 14/17] final push --- .../scala/com/twitter/algebird/ExpHist.scala | 22 ++++++++++ .../{exphist.md => exponential_histogram.md} | 44 +++++++++---------- 2 files changed, 42 insertions(+), 24 deletions(-) rename docs/src/main/tut/datatypes/{exphist.md => exponential_histogram.md} (58%) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 6e8291ee4..1081588be 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -89,6 +89,13 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } } + /** + * Returns a [[Fold]] instance that uses `add` to accumulate deltas + * into this exponential histogram instance. + */ + def fold: Fold[ExpHist, (Long, Long)] = + Fold.foldLeft(this) { case (e, (delta, timestamp)) => e.add(delta, timestamp) } + // This internal method assumes that the instance is stepped forward // already, and does NOT try to step internally. 
It also assumes // that `items` is sorted in ASCENDING order, with newer items on @@ -124,6 +131,13 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: if (total == 0) 0.0 else (total - (oldestBucketSize - 1) / 2.0) + /** + * Returns an Approximate instance encoding the bounds and the + * closest long to the estimated count tracked by this instance. + */ + def approx: Approximate[Long] = + Approximate(lowerBoundSum, math.round(guess), upperBoundSum, 1.0) + /** * relative error of guess, guaranteed to be <= conf.epsilon. */ @@ -166,6 +180,14 @@ object ExpHist { // configured window and the supplied current time. def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = ExpHist.dropExpired(buckets, expiration(currTime)) + + /** + * Returns a [[Fold]] instance that uses `add` to accumulate deltas + * into an empty exponential histogram instance configured with + * this Config. + */ + def fold: Fold[ExpHist, (Long, Long)] = + Fold.foldLeft(ExpHist.empty(this)) { case (e, (delta, timestamp)) => e.add(delta, timestamp) } } /** diff --git a/docs/src/main/tut/datatypes/exphist.md b/docs/src/main/tut/datatypes/exponential_histogram.md similarity index 58% rename from docs/src/main/tut/datatypes/exphist.md rename to docs/src/main/tut/datatypes/exponential_histogram.md index 23806ce70..c8d3e5eee 100644 --- a/docs/src/main/tut/datatypes/exphist.md +++ b/docs/src/main/tut/datatypes/exponential_histogram.md @@ -12,20 +12,20 @@ Exponential Histogram algorithm from [Maintaining Stream Statistics over Sliding An Exponential Histogram is a sliding window counter that can guarantee a bounded relative error. 
You configure the data structure with -- epsilon, the relative error you're willing to tolerate -- windowSize, the number of time ticks that you want to track +- `epsilon`, the relative error you're willing to tolerate +- `windowSize`, the number of time ticks that you want to track You interact with the data structure by adding (number, timestamp) pairs into the exponential histogram. querying it for an approximate counts with `guess`. -The approximate count is guaranteed to be within conf.epsilon relative error of the true count seen across the supplied `windowSize`. +The approximate count is guaranteed to be within `conf.epsilon` relative error of the true count seen across the supplied `windowSize`. ## l-Canonical Representation -The exponential histogram algorithm tracks buckets of size 2^i. Every new increment to the histogram adds a bucket of size 1. +The exponential histogram algorithm tracks buckets of size `2^i`. Every new increment to the histogram adds a bucket of size 1. -Because only l or l+1 buckets of size 2^i are allowed for each i, this increment might trigger an incremental merge of smaller buckets into larger buckets. +Because only `l` or `l+1` buckets of size `2^i` are allowed for each `i`, this increment might trigger an incremental merge of smaller buckets into larger buckets. -Let's look at 10 steps of the algorithm with l == 2: +Let's look at 10 steps of the algorithm with `l == 2`: ``` 1: 1 (1 added) @@ -40,11 +40,9 @@ Let's look at 10 steps of the algorithm with l == 2: 10: 1 1 2 2 4 (1 added, triggering a 1 + 1 = 2 merge AND a 2 + 2 = 4 merge) ``` -Notice that the bucket sizes always sum to the algorithm step, ie (10 == 1 + 1 + 1 + 2 + 2 + 4). +Notice that the bucket sizes always sum to the algorithm step, ie `10 == 1 + 1 + 2 + 2 + 4`. -Now let's write out a list of the number of buckets of each size, ie [bucketsOfSize(1), bucketsOfSize(2), bucketsOfSize(4), ....]
- -Here's the above sequence in the new representation, plus a few more steps: +Now let's write out a list of the number of buckets of each size, ie `[bucketsOfSize(1), bucketsOfSize(2), bucketsOfSize(4), ....]`. Here's the above sequence in the new representation, plus a few more steps: ``` 1: 1 <-- (l + 1)2^0 - l = 3 * 2^0 - 2 = 1 @@ -67,9 +65,9 @@ Here's the above sequence in the new representation, plus a few more steps: 17: 2 2 3 ``` -This sequence is called the "l-canonical representation" of s. +This sequence is called the "l-canonical representation" of `s`. -A pattern emerges! Every bucket size except the largest looks like a binary counter... if you added `l + 1` to the bit, and made the counter little-endian, so the least-significant bits came first. Let's call this the "binary" prefix, or "bin(_)". +A pattern emerges! Every bucket size except the largest looks like a binary counter (if you added `l + 1` to the bit, and made the counter little-endian). Let's call this the "binary" prefix, or `bin(_)`. Here's the above sequence with the prefix decoded from "binary": @@ -98,28 +96,26 @@ Here's the above sequence with the prefix decoded from "binary": Some observations about the pattern: -The l-canonical representation groups the natural numbers into groups of size (l + 1)2^i for i >= 0. - -Each group starts at (l + 1)2^i - l (see 1, 4, 10... above) +The l-canonical representation groups the natural numbers into groups of size `(l + 1)2^i` for `i >= 0`. -Within each group, the "binary" prefix of the l-canonical rep cycles from 0 to (2^i - 1), l + 1 total times. (This makes sense; each cycle increments the final entry by one until it hits l + 1; after that an increment triggers a merge and a new "group" begins.) +Each group starts at `(l + 1)2^i - l` (see 1, 4, 10... above) -The final l-canonical entry == +Within each group, the "binary" prefix of the l-canonical rep cycles from `0` to `(2^i - 1)`, `l + 1` total times. 
(This makes sense; each cycle increments the final entry by one until it hits `l + 1`; after that an increment triggers a merge and a new "group" begins.) -floor((position within the group) / 2^i), or the "quotient" of that position and 2^i. +The final l-canonical entry == `floor((position within the group) / 2^i) + 1`, i.e. one more than the "quotient" of that position and `2^i`. That's all we need to know to write a procedure to generate the l-canonical representation! Here it is again: ## L-Canonical Representation Procedure: -- Find the largest j s.t. 2^j <= (s + l) / (1 + l) -- let s' = 2^j(1 + l) - l +- Find the largest `j` s.t. `2^j <= (s + l) / (1 + l)` +- let `s' := 2^j(1 + l) - l` -(s' is the position if the start of a group, ie 1, 4, 10...) +(`s'` is the position of the start of a group, ie 1, 4, 10...) -- let diff = (s - s') is the position of s within that group. -- let b = the little-endian binary rep of diff % (2^j - 1) -- let ret = return vector of length j: +- `diff := (s - s')` is the position of s within that group.
+- let `b :=` the little-endian binary rep of `diff % 2^j` (equivalently `diff & (2^j - 1)`) +- let `ret :=` return vector of length `j + 1`: ```scala From 45f8c5aa04e3f82093442323e303e86d054082a7 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Tue, 8 Nov 2016 11:50:02 -0700 Subject: [PATCH 15/17] add demo --- .../scala/com/twitter/algebird/ExpHist.scala | 25 ++++++------ .../tut/datatypes/exponential_histogram.md | 38 ++++++++++++++++++- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 1081588be..3d8047d6d 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -90,11 +90,13 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } /** - * Returns a [[Fold]] instance that uses `add` to accumulate deltas - * into this exponential histogram instance. - */ - def fold: Fold[ExpHist, (Long, Long)] = - Fold.foldLeft(this) { case (e, (delta, timestamp)) => e.add(delta, timestamp) } + * Returns a [[Fold]] instance that uses `add` to accumulate deltas + * into this exponential histogram instance. + */ + def fold: Fold[Bucket, ExpHist] = + Fold.foldLeft(this) { + case (e, Bucket(delta, timestamp)) => e.add(delta, timestamp) + } // This internal method assumes that the instance is stepped forward // already, and does NOT try to step internally. It also assumes @@ -132,10 +134,10 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: else (total - (oldestBucketSize - 1) / 2.0) /** - * Returns an Approximate instance encoding the bounds and the - * closest long to the estimated count tracked by this instance. - */ - def approx: Approximate[Long] = + * Returns an Approximate instance encoding the bounds and the + * closest long to the estimated sum tracked by this instance.
+ */ + def approximateSum: Approximate[Long] = Approximate(lowerBoundSum, math.round(guess), upperBoundSum, 1.0) /** @@ -159,7 +161,7 @@ object ExpHist { object Bucket { implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } - } + } /** * ExpHist guarantees that the returned guess will be within @@ -186,8 +188,7 @@ object ExpHist { * into an empty exponential histogram instance configured with * this Config. */ - def fold: Fold[ExpHist, (Long, Long)] = - Fold.foldLeft(ExpHist.empty(this)) { case (e, (delta, timestamp)) => e.add(delta, timestamp) } + def fold: Fold[Bucket, ExpHist] = ExpHist.empty(this).fold } /** diff --git a/docs/src/main/tut/datatypes/exponential_histogram.md b/docs/src/main/tut/datatypes/exponential_histogram.md index c8d3e5eee..15be226c5 100644 --- a/docs/src/main/tut/datatypes/exponential_histogram.md +++ b/docs/src/main/tut/datatypes/exponential_histogram.md @@ -8,7 +8,7 @@ scaladoc: "#com.twitter.algebird.ExpHist" # Exponential Histogram -Exponential Histogram algorithm from [Maintaining Stream Statistics over Sliding Windows](http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf), by Datar, Gionis, Indyk and Motwani. +The `ExpHist` data structure implements the Exponential Histogram algorithm from [Maintaining Stream Statistics over Sliding Windows](http://www-cs-students.stanford.edu/~datar/papers/sicomp_streams.pdf), by Datar, Gionis, Indyk and Motwani. An Exponential Histogram is a sliding window counter that can guarantee a bounded relative error. You configure the data structure with @@ -19,6 +19,42 @@ You interact with the data structure by adding (number, timestamp) pairs into th The approximate count is guaranteed to be within `conf.epsilon` relative error of the true count seen across the supplied `windowSize`. +## Example Usage + +Let's set up a bunch of buckets to add into our exponential histogram. Each bucket tracks a delta and a timestamp. 
This example uses the same number for both, for simplicity. + +```tut:book +import com.twitter.algebird.ExpHist +import ExpHist.{ Bucket, Config } + +val maxTimestamp = 200 +val inputs = (1 to maxTimestamp).map { i => ExpHist.Bucket(i, i) }.toVector + +val actualSum = inputs.map(_.size).sum +``` + +Now we'll configure an instance of `ExpHist` to track the count and add each of our buckets in. + +```tut:book +val epsilon = 0.01 +val windowSize = maxTimestamp +val eh = ExpHist.empty(Config(epsilon, windowSize)) +val full = inputs.foldLeft(eh) { + case (histogram, Bucket(delta, timestamp)) => histogram.add(delta, timestamp) +} +``` + +Now we can query the full exponential histogram and compare the guess to the actual sum: + +```tut:book +val approximateSum = full.guess +full.relativeError +val maxError = actualSum * full.relativeError + +assert(full.guess <= actualSum + maxError) +assert(full.guess >= actualSum - maxError) +``` + ## l-Canonical Representation The exponential histogram algorithm tracks buckets of size `2^i`. Every new increment to the histogram adds a bucket of size 1. 
From 752fc07aecef5601adad59b7e5b3894cb7b027ac Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 11 Nov 2016 14:17:26 -0500 Subject: [PATCH 16/17] oscar's comments --- .../scala/com/twitter/algebird/ExpHist.scala | 72 +++++++++++-------- .../com/twitter/algebird/ExpHistLaws.scala | 39 ++++++---- 2 files changed, 70 insertions(+), 41 deletions(-) diff --git a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala index 3d8047d6d..87ae911e5 100644 --- a/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala +++ b/algebird-core/src/main/scala/com/twitter/algebird/ExpHist.scala @@ -2,6 +2,7 @@ package com.twitter.algebird import java.lang.{ Long => JLong } import scala.annotation.tailrec +import scala.collection.mutable.Builder /** * Exponential Histogram algorithm from @@ -37,8 +38,8 @@ import scala.annotation.tailrec * @param total total ticks tracked. `total == buckets.map(_.size).sum` * @param time current timestamp of this instance. */ -case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: Long) { - import ExpHist.{ Bucket, Canonical } +case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: Long, time: ExpHist.Timestamp) { + import ExpHist.{ Bucket, Canonical, Timestamp } /** * Steps this instance forward to the new supplied time. Any @@ -48,7 +49,7 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: * @param newTime the new current time. * @return ExpHist instance stepped forward to newTime. */ - def step(newTime: Long): ExpHist = + def step(newTime: Timestamp): ExpHist = if (newTime <= time) this else { val (dropped, filtered) = conf.dropExpired(buckets, newTime) @@ -58,15 +59,15 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: /** * Increment ExpHist by 1 at the supplied timestamp. 
*/ - def inc(timestamp: Long): ExpHist = add(1L, timestamp) + def inc(ts: Timestamp): ExpHist = add(1L, ts) /** * Increment ExpHist by delta at the supplied timestamp. */ - def add(delta: Long, timestamp: Long): ExpHist = { - val self = step(timestamp) + def add(delta: Long, ts: Timestamp): ExpHist = { + val self = step(ts) if (delta == 0) self - else self.addAllWithoutStep(Vector(Bucket(delta, timestamp)), delta) + else self.addAllWithoutStep(Vector(Bucket(delta, ts)), delta) } /** @@ -79,12 +80,13 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: def addAll(unsorted: Vector[Bucket]): ExpHist = if (unsorted.isEmpty) this else { - val sorted = unsorted.sorted(Ordering[Bucket].reverse) - val delta = sorted.map(_.size).sum - val timestamp = sorted.head.timestamp - if (delta == 0) - step(timestamp) - else { + val delta = unsorted.map(_.size).sum + + if (delta == 0) { + step(unsorted.maxBy(_.timestamp).timestamp) + } else { + val sorted = unsorted.sorted(Ordering[Bucket].reverse) + val timestamp = sorted.head.timestamp addAllWithoutStep(sorted, delta).step(timestamp) } } @@ -94,9 +96,10 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: * into this exponential histogram instance. */ def fold: Fold[Bucket, ExpHist] = - Fold.foldLeft(this) { - case (e, Bucket(delta, timestamp)) => e.add(delta, timestamp) - } + Fold.foldMutable[Builder[Bucket, Vector[Bucket]], Bucket, ExpHist]( + { case (b, bucket) => b += bucket }, + { _ => Vector.newBuilder[Bucket] }, + { x => addAll(x.result) }) // This internal method assumes that the instance is stepped forward // already, and does NOT try to step internally. It also assumes @@ -153,15 +156,26 @@ case class ExpHist(conf: ExpHist.Config, buckets: Vector[ExpHist.Bucket], total: } object ExpHist { + /** + * Value class wrapper around timestamps (>= 0) used by each bucket. 
+ */ + case class Timestamp(toLong: Long) extends AnyVal { + def <=(r: Timestamp): Boolean = toLong <= r.toLong + def >(r: Timestamp): Boolean = toLong > r.toLong + } + object Timestamp { + implicit val ord: Ordering[Timestamp] = Ordering.by(_.toLong) + } + /** * @param size number of items tracked by this bucket. * @param timestamp timestamp of the most recent item tracked by this bucket. */ - case class Bucket(size: Long, timestamp: Long) + case class Bucket(size: Long, timestamp: Timestamp) object Bucket { implicit val ord: Ordering[Bucket] = Ordering.by { b: Bucket => (b.timestamp, b.size) } - } + } /** * ExpHist guarantees that the returned guess will be within @@ -176,11 +190,11 @@ object ExpHist { // Returns the last timestamp before the window. any ts <= [the // returned timestamp] is outside the window. - def expiration(currTime: Long): Long = currTime - windowSize + def expiration(currTime: Timestamp): Timestamp = Timestamp(currTime.toLong - windowSize) // Drops all buckets with an expired timestamp, based on the // configured window and the supplied current time. - def dropExpired(buckets: Vector[Bucket], currTime: Long): (Long, Vector[Bucket]) = + def dropExpired(buckets: Vector[Bucket], currTime: Timestamp): (Long, Vector[Bucket]) = ExpHist.dropExpired(buckets, expiration(currTime)) /** @@ -194,14 +208,14 @@ object ExpHist { /** * Returns an empty instance with the supplied Config. */ - def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, 0L) + def empty(conf: Config): ExpHist = ExpHist(conf, Vector.empty, 0L, Timestamp(0L)) /** * Returns an instance directly from a number `i`. All buckets in * the returned ExpHist will have the same timestamp, equal to * `ts`. 
*/ - def from(i: Long, ts: Long, conf: Config): ExpHist = { + def from(i: Long, ts: Timestamp, conf: Config): ExpHist = { val buckets = Canonical.bucketsFromLong(i, conf.l).map(Bucket(_, ts)) ExpHist(conf, buckets, i, ts) } @@ -211,7 +225,7 @@ object ExpHist { * @param cutoff buckets with ts <= cutoff are expired * @return the sum of evicted bucket sizes and the unexpired buckets */ - def dropExpired(buckets: Vector[Bucket], cutoff: Long): (Long, Vector[Bucket]) = { + def dropExpired(buckets: Vector[Bucket], cutoff: Timestamp): (Long, Vector[Bucket]) = { val (dropped, remaining) = buckets.reverse.span(_.timestamp <= cutoff) (dropped.map(_.size).sum, remaining.reverse) } @@ -385,12 +399,9 @@ object ExpHist { * @return The original s */ def toLong: Long = - if (rep.isEmpty) 0L - else { + Monoid.sum( rep.iterator.zipWithIndex - .map { case (i, exp) => i.toLong << exp } - .reduce(_ + _) - } + .map { case (i, exp) => i.toLong << exp }) /** * Expands out the l-canonical representation of some number s into @@ -400,6 +411,9 @@ object ExpHist { * @return vector of powers of 2 (where ret.sum == the original s) */ def toBuckets: Vector[Long] = - rep.zipWithIndex.flatMap { case (i, exp) => List.fill(i)(1L << exp) } + rep.iterator + .zipWithIndex + .flatMap { case (i, exp) => Iterator.fill(i)(1L << exp) } + .toVector } } diff --git a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala index 38ab89646..f2b139a4e 100644 --- a/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala +++ b/algebird-test/src/test/scala/com/twitter/algebird/ExpHistLaws.scala @@ -8,7 +8,7 @@ import Arbitrary.arbitrary class ExpHistLaws extends PropSpec with PropertyChecks { import ExpHistGenerators._ - import ExpHist.{ Bucket, Canonical, Config } + import ExpHist.{ Bucket, Canonical, Config, Timestamp } property("Increment example from DGIM paper") { // Returns a vector of bucket sizes from largest to 
smallest. @@ -17,9 +17,9 @@ class ExpHistLaws extends PropSpec with PropertyChecks { // epsilon of 0.5 gives us window sizes of 1 or 2. val e = ExpHist.empty(Config(0.5, 100)) - val plus76 = e.add(76, 0) - val inc = plus76.inc(0) - val twoMore = inc.add(2, 0) + val plus76 = e.add(76, Timestamp(0)) + val inc = plus76.inc(Timestamp(0)) + val twoMore = inc.add(2, Timestamp(0)) assert(bSizes(plus76) == Vector(32, 16, 8, 8, 4, 4, 2, 1, 1)) assert(bSizes(inc) == Vector(32, 16, 8, 8, 4, 4, 2, 2, 1)) @@ -53,7 +53,7 @@ class ExpHistLaws extends PropSpec with PropertyChecks { * Returns the ACTUAL sum of the supplied vector of buckets, * filtering out any bucket with a timestamp <= exclusiveCutoff. */ - def actualBucketSum(buckets: Vector[Bucket], exclusiveCutoff: Long): Long = + def actualBucketSum(buckets: Vector[Bucket], exclusiveCutoff: Timestamp): Long = buckets.collect { case Bucket(count, ts) if ts > exclusiveCutoff => count }.sum @@ -73,6 +73,18 @@ class ExpHistLaws extends PropSpec with PropertyChecks { } } + property("ExpHist.fold satisfies core properties") { + forAll { (v: NonEmptyVector[Bucket], conf: Config) => + val buckets = v.items + val mostRecentTs = buckets.maxBy(_.timestamp).timestamp + val cutoff = conf.expiration(mostRecentTs) + + val fullViaFold = ExpHist.empty(conf).fold.overTraversable(buckets) + val actualSum = actualBucketSum(buckets, cutoff) + checkCoreProperties(fullViaFold, actualSum) + } + } + property("ExpHist.add satisfies core properties") { forAll { (items: NonEmptyVector[Bucket], conf: Config) => val buckets = items.sorted @@ -173,8 +185,8 @@ class ExpHistLaws extends PropSpec with PropertyChecks { } property("step(t) == add(0, t)") { - forAll { (expHist: ExpHist, ts: PosNum[Long]) => - assert(expHist.step(ts.value) == expHist.add(0, ts.value)) + forAll { (expHist: ExpHist, ts: Timestamp) => + assert(expHist.step(ts) == expHist.add(0, ts)) } } @@ -202,9 +214,9 @@ class ExpHistLaws extends PropSpec with PropertyChecks { } 
property("dropExpired works properly") { - forAll { (v: NonEmptyVector[Bucket], window: PosNum[Long]) => + forAll { (v: NonEmptyVector[Bucket], conf: Config) => val buckets = v.sorted.reverse - val cutoff = buckets.head.timestamp - window.value + val cutoff = conf.expiration(buckets.head.timestamp) val (droppedSum, remaining) = ExpHist.dropExpired(buckets, cutoff) @@ -234,11 +246,14 @@ class ExpHistLaws extends PropSpec with PropertyChecks { } object ExpHistGenerators { - import ExpHist.{ Bucket, Config } + import ExpHist.{ Bucket, Config, Timestamp } + + implicit val genTimestamp: Gen[Timestamp] = + Gen.posNum[Long].map(Timestamp(_)) implicit val genBucket: Gen[Bucket] = for { count <- Gen.posNum[Long] - timestamp <- Gen.posNum[Long] + timestamp <- genTimestamp } yield Bucket(count - 1L, timestamp) implicit val genConfig: Gen[Config] = for { @@ -252,10 +267,10 @@ object ExpHistGenerators { conf <- genConfig } yield ExpHist.empty(conf).addAll(buckets) + implicit val arbTs: Arbitrary[Timestamp] = Arbitrary(genTimestamp) implicit val arbBucket: Arbitrary[Bucket] = Arbitrary(genBucket) implicit val arbConfig: Arbitrary[Config] = Arbitrary(genConfig) implicit val arbExpHist: Arbitrary[ExpHist] = Arbitrary(genExpHist) - } class CanonicalLaws extends PropSpec with PropertyChecks { From 84b299be4376958f51b768b9beba97f12ea33549 Mon Sep 17 00:00:00 2001 From: Sam Ritchie Date: Fri, 11 Nov 2016 14:35:45 -0500 Subject: [PATCH 17/17] fix docs --- docs/src/main/tut/datatypes/exponential_histogram.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/src/main/tut/datatypes/exponential_histogram.md b/docs/src/main/tut/datatypes/exponential_histogram.md index 15be226c5..cc96ffddb 100644 --- a/docs/src/main/tut/datatypes/exponential_histogram.md +++ b/docs/src/main/tut/datatypes/exponential_histogram.md @@ -25,10 +25,12 @@ Let's set up a bunch of buckets to add into our exponential histogram. 
Each buck ```tut:book import com.twitter.algebird.ExpHist -import ExpHist.{ Bucket, Config } +import ExpHist.{ Bucket, Config, Timestamp } val maxTimestamp = 200 -val inputs = (1 to maxTimestamp).map { i => ExpHist.Bucket(i, i) }.toVector +val inputs = (1 to maxTimestamp).map { + i => ExpHist.Bucket(i, Timestamp(i)) + }.toVector val actualSum = inputs.map(_.size).sum ```