Conversation

@joshualande
Contributor

For my work, I needed an Aggregator that would generate a random sample of a few records. I created an Aggregator that generates this random sample by appending a random number to each record and then taking the smallest n records based on the random number. I think this is related to reservoir sampling.
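The approach described above can be sketched in plain Scala (an illustration of the idea only, not the Algebird implementation; `sampleBySmallestKeys` is a hypothetical helper name):

```scala
import scala.util.Random

// Sketch: tag each record with a random key, then keep the n records
// with the smallest keys. Each record is equally likely to get one of
// the n smallest keys, so this yields a uniform random sample of size n.
def sampleBySmallestKeys[T](items: Iterable[T], n: Int, rng: Random): Seq[T] =
  items.toSeq
    .map(t => (rng.nextDouble(), t))
    .sortBy(_._1)
    .take(n)
    .map(_._2)

val sample = sampleBySmallestKeys(1 to 100, 3, new Random(123L))
assert(sample.size == 3)
```

The real Aggregator keeps only the n smallest keys in a bounded priority queue rather than sorting everything, but the sampling semantics are the same.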

I did a code search in Algebird and couldn't find anything exactly like what I needed, although I did find that in Scalding the Grouped class has a bufferedTake function:

https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala

      // By default, there is no ordering. This method is overridden
      // in IdentityValueSortedReduce
      // Note, this is going to bias toward low hashcode items.
      // If you care which items you take, you should sort by a random number
      // or the value itself.
      val fakeOrdering: Ordering[V1] = Ordering.by { v: V1 => v.hashCode }
      implicit val mon = new PriorityQueueMonoid[V1](n)(fakeOrdering)

That code notes that using hashCode does not ensure randomness, and that it is better to sort by a random number. So overall, I am hopeful that my code is a good approach.

I am not sure if there is a good way to write unit tests for this code, although I would be open to any suggestions!

Here is some quick repl code to test it out:

 ./sbt algebird-core/console
scala> import com.twitter.algebird.Aggregator
import com.twitter.algebird.Aggregator

scala> val rng = new java.util.Random(123L)
rng: java.util.Random = java.util.Random@3f44d99d

scala> val agg = Aggregator.randomSample[Int](rng)(3)
agg: com.twitter.algebird.MonoidAggregator[Int,java.util.PriorityQueue[(Double, Int)],Seq[Int]] = com.twitter.algebird.MonoidAggregator$$anon$5@3297aa7b

scala> agg(1 to 100)
res9: Seq[Int] = List(73, 40, 41)

scala> agg(1 to 100)
res10: Seq[Int] = List(3, 65, 15)

scala> agg(1 to 100)
res11: Seq[Int] = List(47, 100, 64)

scala> agg(1 to 2)
res12: Seq[Int] = List(1, 2)

@avibryant
Contributor

This looks great. One thing I'm not sure about: should the API be able to take a random seed instead of a Random instance? Should there be a default seed so that you don't have to provide anything?

@joshualande
Contributor Author

Hi Avi!

Great question. I thought about this a bit, but I don't have a strong preference one way or the other.

I did a code search in Algebird before posting the review, and picked this way to be consistent with the API of GaussianDistribution class (
https://github.com/twitter/algebird/blob/7bcbc68776b7ecc01cc005a223755a7722c6f6dd/algebird-core/src/main/scala/com/twitter/algebird/statistics/GaussianDistributionMonoid.scala).

I guess in principle it might be more efficient to use one random number generator throughout all the code, instead of building one per object? Although I would bet this is a totally unnecessary optimization.

Looking more through the code base, it seems that other classes (including SketchMapParams) take in a seed and build the RNG directly. Also, that class does not take a default seed. My fear with allowing a default seed is that users would be hesitant to change it, which seems less than ideal?

Another cool thing might be to make the RNG implicit:

def randomSample[T](count: Int)(implicit r: java.util.Random) ...

That way, you could set it once at the top of your code. But I guess this would only be nice if the rest of Algebird expected an implicit RNG.
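A minimal sketch of that implicit-RNG style (a hypothetical API, not what was merged):

```scala
import java.util.Random

// Hypothetical sketch of the implicit-rng idea: set the rng once at the
// top of your code and let sampling helpers pick it up implicitly.
def randomSample[T](count: Int)(implicit r: Random): Iterable[T] => Seq[T] =
  items =>
    items.toSeq
      .map(t => (r.nextDouble(), t))
      .sortBy(_._1)
      .take(count)
      .map(_._2)

implicit val rng: Random = new Random(123L)
val sample = randomSample[Int](3).apply(1 to 100)
assert(sample.size == 3)
```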

Anyway, this is a long-winded way of saying I am happy to take in the RNG in whatever way you think is best!

* Selects exactly 'count' of the input records randomly (or all of the records if there are fewer
* than 'count' total records). This assumes that all 'count' of the records can fit in memory,
* so use this only for small values of 'count'.
*/
Collaborator


I think this is more commonly called reservoir sampling: https://en.wikipedia.org/wiki/Reservoir_sampling

"Random sample" is imprecise (it could also mean sampling a constant fraction).

My preference would be for a default (constant) seed version as well. In Scala, we generally prefer multiple parameter groups only for functions (or in some esoteric cases where path-dependent types require them). So, can we do:
def reservoirSample[T](r: java.util.Random, count: Int): ...

@joshualande
Contributor Author

@johnynek thanks for your feedback!

I updated the code to be called reservoirSample and take a non-curried default seed as @avibryant suggested!

I also realized that for completeness, it could be useful to include an aggregator that does the sampling where we pick each item with a given probability. So I added another randomSample aggregator that takes in a probability to do this other kind of sampling. Does this seem useful? And also, is there a more precise name we can give this? I did a brief google search, but couldn't find an obviously-better name.
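This per-item-probability scheme is often called Bernoulli sampling; its semantics can be sketched in plain Scala (an illustration only, not the Algebird implementation):

```scala
import scala.util.Random

// Sketch of the per-item-probability scheme (Bernoulli sampling): keep
// each record independently with probability `prob`. Unlike the fixed-size
// reservoir sample, the output size here is random, with mean prob * n.
def bernoulliSample[T](items: Iterable[T], prob: Double, rng: Random): List[T] =
  items.filter(_ => rng.nextDouble() < prob).toList
```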

Thanks again for the help!


A bit of testing code:

scala> import com.twitter.algebird.Aggregator
import com.twitter.algebird.Aggregator

scala> val randomSample = Aggregator.randomSample[Int](prob=0.03, seed=123)
randomSample: com.twitter.algebird.MonoidAggregator[Int,List[Int],List[Int]] = com.twitter.algebird.MonoidAggregator$$anon$6@69b78d37

scala> randomSample(1 to 100)
res0: List[Int] = List(40, 73)

scala> randomSample(1 to 100)
res1: List[Int] = List(3, 8, 15, 65, 74)

scala> randomSample(1 to 100)
res2: List[Int] = List(42, 47, 64, 100)

scala> randomSample(1 to 100)
res3: List[Int] = List(7, 16, 42, 46, 48, 78)

scala> randomSample(1 to 100)
res4: List[Int] = List(11, 82, 92)

scala> randomSample(1 to 100)
res5: List[Int] = List(34, 46, 49, 67, 68, 94)

scala> randomSample(1 to 100)
res6: List[Int] = List(1, 13, 42, 49, 83, 88, 92)

scala> randomSample(1 to 100)
res7: List[Int] = List(12, 18, 20, 91)

scala> randomSample(1 to 2)
res8: List[Int] = List()

scala> val reservoirSample = Aggregator.reservoirSample[Int](count=3, seed=123)
reservoirSample: com.twitter.algebird.MonoidAggregator[Int,java.util.PriorityQueue[(Double, Int)],Seq[Int]] = com.twitter.algebird.MonoidAggregator$$anon$5@66a77f76

scala> reservoirSample(1 to 100)
res9: Seq[Int] = List(73, 40, 41)

scala> reservoirSample(1 to 100)
res10: Seq[Int] = List(3, 65, 15)

scala> reservoirSample(1 to 100)
res11: Seq[Int] = List(47, 100, 64)

scala> reservoirSample(1 to 100)
res12: Seq[Int] = List(16, 48, 42)

scala> reservoirSample(1 to 100)
res13: Seq[Int] = List(92, 11, 82)

scala> reservoirSample(1 to 100)
res14: Seq[Int] = List(94, 46, 67)

scala> reservoirSample(1 to 2)
res15: Seq[Int] = List(1, 2)

@johnynek
Collaborator

👍

@johnynek johnynek merged commit b8fa8e0 into twitter:develop Jun 14, 2016
@joshualande
Contributor Author

@avibryant thanks so much!!!!!

* than 'count' total records). This assumes that all 'count' of the records can fit in memory,
* so use this only for small values of 'count'.
*/
def reservoirSample[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, PriorityQueue[(Double, T)], Seq[T]] = {


I chatted with @joshualande offline about this; here is a recap:
When this runs as distributed MapReduce, each JVM ends up using an identical pseudo-RNG that will pick the same record positions when processing its respective split. Since this is not equivalent to the single-JVM execution, it's a problem.

We can define the rng lazily to make sure each rng is initialized independently, and of course each needs an independent seed. It should not be the default JDK Random() seeded from currentTimeMillis: with that, it will be impossible to fulfill the MapReduce contract that the computation be deterministic across multiple attempts of the same task. A typical way to solve this is to derive the seed deterministically from mapreduce.task.id.
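One way to sketch that per-task seeding (a hypothetical helper, not part of this PR; the task-id string format is illustrative):

```scala
import java.util.Random

// Hypothetical sketch: derive each task's seed deterministically from a
// base seed and the task id, so retries of the same task replay the same
// random stream while distinct tasks get independent streams.
def taskRng(baseSeed: Long, taskId: String): Random =
  new Random(baseSeed ^ (taskId.hashCode.toLong & 0xffffffffL))

val a = taskRng(123L, "attempt_201606_0001_m_000001_0")
val b = taskRng(123L, "attempt_201606_0001_m_000001_0")
assert(a.nextDouble() == b.nextDouble()) // same task id => same stream
```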

Collaborator


Yeah, in map-reduce this is a real issue. We could possibly mitigate it somewhat by doing:

.map { t => ((t.hashCode & Int.MaxValue) ^ rng.nextInt(), t) }

rather than just nextDouble

which would at least mix data from the item with the random variable, but still, if the same T instance occurs in the same order on two different machines, it will certainly be selected on both.

We should make it a bit easier to access the task id in scalding (right now you could do it using https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala#L142 and the UniqueID).
