Analyzing the blockchain.
# download the blockchain bootstrap db to get a large number of transactions
# prepared by bitcoin.org, from here: https://bitcoin.org/en/download
wget https://bitcoin.org/bin/blockchain/bootstrap.dat.torrent
# now install rtorrent to pull this down (blazing fast!)
apt-get install rtorrent
rtorrent bootstrap.dat.torrent
# Now move this dump into the canonical location of reference implementation blockchain dat files
mkdir ~/.bitcoin
cp bootstrap.dat ~/.bitcoin/blk0001.dat
# Install the blockchain parser so we can read what's in here
sudo apt-get install -y libssl-dev build-essential g++-4.4 libboost-all-dev libsparsehash-dev git-core perl
git clone git://github.com/witoff/blockparser.git
cd blockparser
make
sudo -s
fdisk -l
mkfs.ext4 /dev/xvdb
mount -t ext4 /dev/xvdb /mnt
mount -a
mv blockparser /mnt
cd /mnt
# Move blockchain onto this volume and redefine home
mkdir .bitcoin
mv ~/.bitcoin/blk* .bitcoin/
export HOME=/mnt
# parse
./parser <option>
# export all balances
./parser allBalances > ~/all_balances.txt
# exports 5.8GB+ of data into the output text file
# export all transactions (custom to my fork)
./parser allTransactions > ~/all_tx.txt
# Run `./parser man` to see full docs.
Exporting balances looks like this:
| balance | Hash160 | Base58 | nbIn | lastTimeIn | nbOut | lastTimeOut |
|---|---|---|---|---|---|---|
| 144341.5 | a0e6ca5444e... | 1FfmbHf... | 589 | Sun Apr 6 12:56:29 2014 | 0 | Thu Jan 1 00:00:00 1970 |
| 97831.5 | 1855055056b9... | 13Df4x5... | 63 | Wed Apr 2 21:02:53 2014 | 39 | Wed Mar 12 18:07:06 2014 |
where:
- `balance` is the outstanding net balance on this address
- `hash160` is the public key hash that appears directly in the outputs of transactions paying this address (= RIPEMD160(SHA256(x)); see the sketch below)
- `base58` is the Base58-encoded address derived from that hash (more here)
- `nbIn` is the number of transactions sending to this address
- `lastTimeIn` is the last time a tx was received here
- `nbOut` is the number of transactions sent from this address
- `lastTimeOut` is the last time a tx was broadcast pulling from this address.
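As an illustration of that formula, here is a minimal Scala sketch of the hash160 computation. It assumes Bouncy Castle is on the classpath for RIPEMD-160 (the JDK does not ship it), and the `hash160` helper is illustrative, not part of blockparser:
// hash160 = RIPEMD160(SHA256(pubkey))
import java.security.MessageDigest
import org.bouncycastle.crypto.digests.RIPEMD160Digest
def hash160(pubKey: Array[Byte]): Array[Byte] = {
  val sha = MessageDigest.getInstance("SHA-256").digest(pubKey) // inner SHA-256
  val ripemd = new RIPEMD160Digest()                            // outer RIPEMD-160
  ripemd.update(sha, 0, sha.length)
  val out = new Array[Byte](20)
  ripemd.doFinal(out, 0)
  out                                                           // 20-byte hash160
}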
And exporting transactions looks like this:
| time | address | txId | txAmount |
|---|---|---|---|
| Sat Jan 3 18:15:05 2009 | 62e907.. | 4a5e1e... | 50 |
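For reference, a minimal Scala sketch of splitting one row of this export, assuming whitespace-separated columns with the 5-token date prefix shown above (the `parseTx` helper and tuple layout are illustrative, not a guarantee about the parser's output format):
// split one allTransactions row into (time, address, txId, amount)
def parseTx(line: String): (String, String, String, Double) = {
  val t = line.trim.split("\\s+")
  (t.slice(0, 5).mkString(" "), t(5), t(6), t(7).toDouble)
}
parseTx("Sat Jan 3 18:15:05 2009 62e907.. 4a5e1e... 50")
// (Sat Jan 3 18:15:05 2009,62e907..,4a5e1e...,50.0)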
From here
AWS Tokens:
- key: ...
- secret: ...
# N.B. Work in US-East
# setup keys
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
# clone bdas setup scripts
git clone git://github.com/witoff/bdas-scripts.git
# Launch Cluster
cd training-scripts
# launch cluster with ganglia, using my keys
./spark-ec2 -i ~/.ssh/bdas.pem -k bdas -w 240 -g -t m3.xlarge --root-vol-size 31 --copy launch amplab-training
# ssh into master e.g.
ssh -i ~/.ssh/bdas.pem [email protected]
# slaves
ssh -i ~/.ssh/bdas.pem [email protected]
ssh -i ~/.ssh/bdas.pem [email protected]
ssh -i ~/.ssh/bdas.pem [email protected]
ssh -i ~/.ssh/bdas.pem [email protected]
ssh -i ~/.ssh/bdas.pem [email protected]
Spark Example:
/root/spark/bin/spark-shell
# load data from hdfs
val pagecounts = sc.textFile("/wiki/pagecounts")
# look at some datafiles and print each on a line
pagecounts.take(10).map(println)
# each line in data contains stats for one page. Schema is:
# <date_time> <project_code/language> <page_title> <num_hits for hour> <page_size in bytes>
# 20090505-000000 aa Main_Page 2 9980
# count number of lines
pagecounts.count
# get all english pages and cache rdd
val enPages = pagecounts.filter(_.split(" ")(1) == "en").cache
# count en pages from the cache
enPages.count
# count number of visits on each day
val enTuples = enPages.map(_.split(" "))
val enKeyValuePairs = enTuples.map(tuple => (tuple(0).substring(0, 8), tuple(3).toInt))
enKeyValuePairs.reduceByKey(_+_, 1).collect
# Array((20090505,207698578), (20090506,204190442), (20090507,202617618))
enPages.map( line => (line.substring(0,8), line.split(" ")(3).toInt)).reduceByKey(_ + _).collect
# Array((20090505,207698578), (20090506,204190442), (20090507,202617618))
# find the biggest pages
enPages.map(_.split(" ")).map(tuple => (tuple(2), tuple(3).toInt)).reduceByKey(_+_).filter(el => el._2 > 200000).collect.map(println)
enPages.map(l => l.split(" ")).map(l => (l(2), l(3).toInt)).reduceByKey(_+_, 40).filter(x => x._2 > 200000).map(x => (x._2, x._1)).collect.foreach(println)
# finished up here: http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html
Full Scala API here. Full Python API here.
Clear the Tachyon cache with `./tachyon/bin/tachyon clearCache`.
Shark / Spark SQL
# copying in btc data
cd /ampcamp-data/
wget https://s3.amazonaws.com/witoff-bitcoin/all_balances.tar.gz
gunzip all_balances.tar.gz
mv all_balances.tar all_balances.txt
# copy data into HDFS
/root/ephemeral-hdfs/bin/hadoop fs -copyFromLocal /ampcamp-data/all_balances.txt /all_balances.txt
val data = sc.textFile("/all_balances.txt")
data.take(10).map(println)
data.count
# 32994774
val balances = data.filter(x => x.length>0 && x(0) !='-')
balances.count
# Balance Hash160 Base58 nbIn lastTimeIn nbOut lastTimeOut
# 144341.5 a0e6... 1Ffmb... 589 Sun Apr 6 12:56:29 2014 0 Thu Jan 1 00:00:00 1970
# 97831.54 1855... 13Df4... 63 Wed Apr 2 21:02:53 2014 39 Wed Mar 12 18:07:06 2014
val parts = balances.map(_.trim).filter(_(0) != 'B').map(_.split("\\s+"))
val biggest = parts.map(_(0).toFloat).filter(_ > 1000)
biggest.collect().map(println)
# sum of all mined blocks
val sum = parts.map(_(0).toFloat).reduce(_+_)
# sum: Float = 1.2573022E7
val tx_sum = parts.map(x => x(3).toInt + x(9).toInt).reduce(_+_)
# 176,227,748
val hashes = parts.map(x => x(2).toLowerCase())
hashes.filter(_.contains("d")).count
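Since this section is headed Shark / Spark SQL, here is a minimal sketch of querying the parsed balances through SQL as well, assuming a Spark 1.0-era spark-shell with Spark SQL available (the `Balance` case class, table name, and query are illustrative):
// register the parsed rows as a table and query it
// column indices follow the split above: 0=balance, 1=hash160, 2=base58, 3=nbIn, 9=nbOut
case class Balance(balance: Double, hash160: String, base58: String, nbIn: Int, nbOut: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
val rows = parts.map(p => Balance(p(0).toDouble, p(1), p(2), p(3).toInt, p(9).toInt))
rows.registerAsTable("balances")
sqlContext.sql("SELECT base58, balance FROM balances WHERE balance > 10000").collect.foreach(println)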
# save as a text file (`nums` here stands for whatever RDD you want to persist)
nums.saveAsTextFile("hdfs:///nums")
./ephemeral-hdfs/bin/hadoop fs -copyToLocal /nums/part-00000 /root/nums
cat /root/nums
# DONE!
GraphX Notes
From here