Add an Aggregator.randomSample aggregator #529
Conversation
This looks great. One thing I'm not sure about: should the API be able to take a random seed instead of a Random?
Hi Avi! Great question. I thought about this a bit, but I don't have a strong preference for one way over the other. I did a code search in Algebird before posting the review, and picked this way to be consistent with the API of the GaussianDistribution class.

I guess in principle, it might be more efficient to use one random number generator throughout all the code, instead of building one per object? Although I would bet this is a totally unnecessary optimization. Looking more through the code base, it seems that other classes (including SketchMapParams) take in a seed and build the RNG directly. Also, that class does not take a default seed. My fear with allowing there to be a default seed is that users would be hesitant to change the default, which seems like it isn't ideal?

Another cool thing might be to make the RNG implicit:

def randomSample[T](count: Int)(implicit r: java.util.Random) = ...

That way, you could set it once at the top of your code. But I guess this would only be nice if the rest of Algebird expected an implicit RNG.

Anyway, this is a long-winded way of saying I am happy to take in the RNG in whatever way you think is best!
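For concreteness, here is a rough sketch of the three API shapes being weighed. The names, the DefaultSeed value, and the ??? bodies are illustrative placeholders, not the signatures that actually landed in Algebird:

import com.twitter.algebird.MonoidAggregator

object SamplingApiSketch {
  val DefaultSeed: Int = 123 // illustrative placeholder only

  // 1. Caller supplies the RNG directly:
  def withRng[T](rng: java.util.Random, count: Int): MonoidAggregator[T, List[T], List[T]] = ???

  // 2. Caller supplies a seed, with a library-chosen default:
  def withSeed[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, List[T], List[T]] = ???

  // 3. RNG threaded implicitly, set once at the top of the calling code:
  def withImplicitRng[T](count: Int)(implicit rng: java.util.Random): MonoidAggregator[T, List[T], List[T]] = ???
}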
 * Selects exactly 'count' of the input records randomly (or all of the records if there are fewer
 * than 'count' total records). This assumes that all 'count' of the records can fit in memory,
 * so use this only for small values of 'count'.
 */
I think this is more commonly called Reservoir Sampling: https://en.wikipedia.org/wiki/Reservoir_sampling

"random sample" is imprecise (it could also mean sampling a constant fraction).

My preference would be for a default (constant) seed version also, and in Scala, we generally prefer multiple parameter groups only for functions (or in some esoteric cases where path-dependent types require them). So, can we do:

def reservoirSample[T](r: java.util.Random, count: Int): ...
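For reference, the classic single-pass form of reservoir sampling ("Algorithm R" on the Wikipedia page above) looks roughly like this; an illustrative sketch, not the MonoidAggregator formulation this PR uses:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Keep a uniform sample of `count` items from a stream of unknown length:
// fill the reservoir with the first `count` items, then replace a random
// slot with probability count/seen for each later item.
def reservoirSampleSketch[T](rng: Random, count: Int, items: Iterator[T]): Seq[T] = {
  val reservoir = ArrayBuffer.empty[T]
  var seen = 0
  items.foreach { item =>
    seen += 1
    if (reservoir.size < count) {
      reservoir += item
    } else {
      val j = rng.nextInt(seen) // uniform in [0, seen)
      if (j < count) reservoir(j) = item
    }
  }
  reservoir.toSeq
}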
@johnynek thanks for your feedback! I updated the code to be called reservoirSample and take a non-curried default seed as @avibryant suggested! I also realized that, for completeness, it could be useful to include an aggregator that does the sampling where we pick each item with a given probability. So I added another randomSample aggregator that takes in a probability to do this other kind of sampling. Does this seem useful? And also, is there a more precise name we can give this? I did a brief Google search, but couldn't find an obviously better name. Thanks again for the help!

A bit of testing code:

scala> import com.twitter.algebird.Aggregator
import com.twitter.algebird.Aggregator
scala> val randomSample = Aggregator.randomSample[Int](prob=0.03, seed=123)
randomSample: com.twitter.algebird.MonoidAggregator[Int,List[Int],List[Int]] = com.twitter.algebird.MonoidAggregator$$anon$6@69b78d37
scala> randomSample(1 to 100)
res0: List[Int] = List(40, 73)
scala> randomSample(1 to 100)
res1: List[Int] = List(3, 8, 15, 65, 74)
scala> randomSample(1 to 100)
res2: List[Int] = List(42, 47, 64, 100)
scala> randomSample(1 to 100)
res3: List[Int] = List(7, 16, 42, 46, 48, 78)
scala> randomSample(1 to 100)
res4: List[Int] = List(11, 82, 92)
scala> randomSample(1 to 100)
res5: List[Int] = List(34, 46, 49, 67, 68, 94)
scala> randomSample(1 to 100)
res6: List[Int] = List(1, 13, 42, 49, 83, 88, 92)
scala> randomSample(1 to 100)
res7: List[Int] = List(12, 18, 20, 91)
scala> randomSample(1 to 2)
res8: List[Int] = List()
scala>
scala> val reservoirSample = Aggregator.reservoirSample[Int](count=3, seed=123)
reservoirSample: com.twitter.algebird.MonoidAggregator[Int,java.util.PriorityQueue[(Double, Int)],Seq[Int]] = com.twitter.algebird.MonoidAggregator$$anon$5@66a77f76
scala> reservoirSample(1 to 100)
res9: Seq[Int] = List(73, 40, 41)
scala> reservoirSample(1 to 100)
res10: Seq[Int] = List(3, 65, 15)
scala> reservoirSample(1 to 100)
res11: Seq[Int] = List(47, 100, 64)
scala> reservoirSample(1 to 100)
res12: Seq[Int] = List(16, 48, 42)
scala> reservoirSample(1 to 100)
res13: Seq[Int] = List(92, 11, 82)
scala> reservoirSample(1 to 100)
res14: Seq[Int] = List(94, 46, 67)
scala> reservoirSample(1 to 2)
res15: Seq[Int] = List(1, 2)
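For readers following along, here is a rough sketch of how a probability-based sampler like the randomSample above could be assembled as a MonoidAggregator. It assumes Algebird's Aggregator.prepareMonoid helper and is only an illustration of the idea, not necessarily the code in this PR:

import com.twitter.algebird.{Aggregator, MonoidAggregator}

// Keep each element independently with probability `prob` (Bernoulli
// sampling). Each element maps to a one-element or empty list, and the
// list monoid concatenates the survivors.
def bernoulliSample[T](prob: Double, seed: Int): MonoidAggregator[T, List[T], List[T]] = {
  val rng = new java.util.Random(seed)
  Aggregator.prepareMonoid { (t: T) =>
    if (rng.nextDouble() < prob) List(t) else Nil
  }
}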
👍

@avibryant thanks so much!!!!!
 * than 'count' total records). This assumes that all 'count' of the records can fit in memory,
 * so use this only for small values of 'count'.
 */
def reservoirSample[T](count: Int, seed: Int = DefaultSeed): MonoidAggregator[T, PriorityQueue[(Double, T)], Seq[T]] = {
I chatted with @joshualande offline about this; this is a recap:

When this runs as distributed MapReduce, each JVM ends up using an identical pseudo-RNG that will pick the same record positions when processing its respective split. Since this is not equivalent to the single-JVM execution, it's a problem.

We can make the RNG lazy so that each one is initialized independently, and of course we need an independent seed. It should not be the default JDK Random() seeded with currentTimeMillis, since with that it would be impossible to fulfill the MapReduce contract that the computation be deterministic across multiple attempts of the same task. A typical way to solve this is to derive the seed deterministically from mapreduce.task.id.
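As a sketch of that idea (the property name comes from the comment above; the mixing scheme is just an illustration):

import org.apache.hadoop.conf.Configuration

// Derive a per-task seed deterministically: different tasks get different
// random streams, while re-attempts of the same task replay the same stream.
def taskSeed(conf: Configuration, baseSeed: Int): Long = {
  val taskId = conf.get("mapreduce.task.id", "") // stable across attempts of a task
  baseSeed.toLong ^ taskId.hashCode.toLong
}

// lazy val rng = new java.util.Random(taskSeed(conf, seed))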
Yeah, in map-reduce this is a real issue. We could possibly mitigate it somewhat by doing:

.map { t => ((t.hashCode & Int.MaxValue) ^ rng.nextInt(), t) }

rather than just nextDouble, which would at least mix data from the item along with the random variable. But still, if the same T instance occurs in the same order on two different machines, they will certainly both be selected.

We should make it a bit easier to access the task id in Scalding (right now you could do it using https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala#L142 and the UniqueID).
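Spelling the hash-mixing suggestion out as a standalone key function (a sketch only, not committed code):

// Mix the item's own hash with the RNG output so the sort key depends on
// the data as well as the random stream; identical RNG streams on two
// machines no longer produce identical keys for different data.
def mixedKey[T](rng: java.util.Random)(t: T): (Int, T) =
  ((t.hashCode & Int.MaxValue) ^ rng.nextInt(), t)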
For my work, I needed an Aggregator that would generate a random sample of a few records. I created an Aggregator that generates this random sample by appending a random number to each record and then taking the smallest n records based on the random number. I think this is related to reservoir sampling.

I did a code search in Algebird and couldn't find anything exactly like what I needed, although I did find that in Scalding the Grouped class has a bufferedTake function:

https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/Grouped.scala

That code alludes to the fact that using the hashCode does not ensure randomness and that it is better to sort based on a random number. So overall, I am hopeful that my code is a good approach.
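As an illustration of the random-key idea described above (a minimal sketch, not the PR's actual implementation, which keeps a bounded PriorityQueue so partial samples can be merged):

// Attach a uniform random key to each record and keep the `count` records
// with the smallest keys. Two partial samples merge by keeping the overall
// smallest `count` keys, which is what makes the approach mergeable.
def bottomKSample[T](rng: java.util.Random, count: Int, items: Iterable[T]): Seq[T] =
  items
    .map(t => (rng.nextDouble(), t)) // random sort key per record
    .toSeq
    .sortBy(_._1)                    // smallest keys win
    .take(count)
    .map(_._2)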
I am not sure if there is a good way to write unit tests for this code, although I would be open to any suggestions!
Here is some quick REPL code to test it out: