26 changes: 26 additions & 0 deletions algebird-core/src/main/scala/com/twitter/algebird/Aggregator.scala
@@ -13,6 +13,8 @@ import scala.collection.generic.CanBuildFrom
object Aggregator extends java.io.Serializable {
implicit def applicative[I]: Applicative[({ type L[O] = Aggregator[I, _, O] })#L] = new AggregatorApplicative[I]

private val DefaultSeed = 471312384

/**
* This is a trivial aggregator that always returns a single value
*/
@@ -211,6 +213,30 @@ object Aggregator extends java.io.Serializable {
*/
def immutableSortedReverseTake[T: Ordering](count: Int): MonoidAggregator[T, TopK[T], Seq[T]] =
new TopKToListAggregator[T](count)(implicitly[Ordering[T]].reverse)
/**
* Randomly selects input items where each item has an independent probability 'prob' of being
* selected. This assumes that all sampled records can fit in memory, so use this only when the
* expected number of sampled values is small.
*/
def randomSample[T](prob: Double, seed: Int = DefaultSeed): MonoidAggregator[T, List[T], List[T]] = {
assert(prob >= 0 && prob <= 1, "randomSample.prob must lie in [0, 1]")
val rng = new java.util.Random(seed)
Preparer[T]
.filter(_ => rng.nextDouble() <= prob)
.monoidAggregate(toList)
}
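A minimal usage sketch (illustrative values, not part of the diff), assuming the aggregator is run locally on a single JVM:

// Keep each Int with independent probability 0.1; the result fits in memory
// because only about 10% of the inputs are retained.
val sampled: List[Int] = Aggregator.randomSample[Int](prob = 0.1).apply(1 to 1000)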
/**
* Selects exactly 'count' of the input records randomly (or all of the records if there are fewer
* than 'count' total records). This assumes that all 'count' of the records can fit in memory,
* so use this only for small values of 'count'.
*/
def reservoirSample[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, PriorityQueue[(Double, T)], Seq[T]] = {
val rng = new java.util.Random(seed)

I chatted with @joshualande offline about this, and this is a recap:
When this runs as a distributed MapReduce job, each JVM ends up using an identical pseudo-RNG that will pick the same record positions when processing its respective split. Since this is not equivalent to single-JVM execution, it's a problem.

We can make rng lazy so that each rng is initialized independently, and of course each one needs an independent seed. It should not be the default JDK Random() seeded with currentTimeMillis, because then it would be impossible to fulfill the MapReduce contract that the computation is deterministic across multiple attempts of the same task. A typical way to solve this is to use mapreduce.task.id to derive the seed deterministically.
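A rough sketch of that approach (hypothetical helper, not part of this change), assuming a task-id string such as the value of mapreduce.task.id is available to each task:

def taskSeed(taskId: String, baseSeed: Int): Int =
  // Mix the task id into the base seed so retries of the same task get the
  // same seed while different tasks get different ones.
  baseSeed ^ scala.util.hashing.MurmurHash3.stringHash(taskId)

// Each task would then build its own generator lazily:
// lazy val rng = new java.util.Random(taskSeed(taskId, DefaultSeed))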

yeah, in map-reduce this is a real issue. We could possibly mitigate it somewhat by doing:

.map { t => ((t.hashCode & Int.MaxValue) ^ rng.nextInt(), t) }

rather than just nextDouble (a fuller sketch of this follows the reservoirSample definition below),

which would at least mix data from the item with the random variable; but still, if the same T instance occurs in the same order on two different machines, both will certainly be selected.

we should make it a bit easier to access the task id in scalding (right now you could do it using https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala#L142 and the UniqueID).

Preparer[T]
.map(rng.nextDouble() -> _)
.monoidAggregate(sortByTake(count)(_._1))
.andThenPresent(_.map(_._2))
}
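A minimal usage sketch (illustrative values, not part of the diff):

// Keep exactly 5 of the inputs, chosen uniformly at random
// (or all of them if there are fewer than 5).
val kept: Seq[Int] = Aggregator.reservoirSample[Int](5).apply(1 to 100)

And a rough sketch of the mitigation suggested in the review comments above, mixing each item's hash into the sort key (a hypothetical variant reusing the surrounding rng and count, not what this change implements):

Preparer[T]
  // XOR the item's hash with the random draw so two JVMs running the same
  // seeded RNG over different data are less likely to pick the same positions.
  .map { t => ((t.hashCode & Int.MaxValue) ^ rng.nextInt(), t) }
  .monoidAggregate(sortByTake(count)(_._1))
  .andThenPresent(_.map(_._2))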
/**
* Put everything in a List. Note, this could fill the memory if the List is very large.
*/