Wednesday, December 31, 2014

Elasticsearch 5-node PicoCluster test - 1,000,000 tweets and 3495.1316MB in just under 9 hours.

We've been doing some benchmarks with Elasticsearch on the 5-node PicoCluster. We were able to push 1,000,000 tweets and 3495.1316MB in just under 9 hours. The tweets are small, around 1KB each, but have a lot of fields and are pretty complicated. That's pretty good considering that the SD cards are not very good at small reads and writes.


Pushed 1000 docs in 34.457 secs at 29.021679 docs/sec 3.4431486 MB/sec, 980000 total docs in 520 minutes 33 seconds at 31.37682 docs/second 98.0% complete
Pushed 1000 docs in 36.632 secs at 27.298536 docs/sec 3.5004349 MB/sec, 981000 total docs in 521 minutes 9 seconds at 31.372042 docs/second 98.1% complete
Pushed 1000 docs in 34.607 secs at 28.89589 docs/sec 3.5908194 MB/sec, 982000 total docs in 521 minutes 44 seconds at 31.369303 docs/second 98.2% complete
Pushed 1000 docs in 30.67 secs at 32.605152 docs/sec 3.3349895 MB/sec, 983000 total docs in 522 minutes 15 seconds at 31.370514 docs/second 98.299995% complete
Pushed 1000 docs in 31.243 secs at 32.007168 docs/sec 3.431964 MB/sec, 984000 total docs in 522 minutes 46 seconds at 31.37115 docs/second 98.4% complete
Pushed 1000 docs in 28.858 secs at 34.652435 docs/sec 3.4087648 MB/sec, 985000 total docs in 523 minutes 15 seconds at 31.374163 docs/second 98.5% complete
Pushed 1000 docs in 29.598 secs at 33.786068 docs/sec 3.4104357 MB/sec, 986000 total docs in 523 minutes 44 seconds at 31.376436 docs/second 98.6% complete
Pushed 1000 docs in 32.356 secs at 30.90617 docs/sec 3.4084692 MB/sec, 987000 total docs in 524 minutes 17 seconds at 31.375952 docs/second 98.7% complete
Pushed 1000 docs in 37.807 secs at 26.450129 docs/sec 3.4255342 MB/sec, 988000 total docs in 524 minutes 55 seconds at 31.370039 docs/second 98.799995% complete
Pushed 1000 docs in 33.404 secs at 29.936535 docs/sec 3.4184904 MB/sec, 989000 total docs in 525 minutes 28 seconds at 31.36852 docs/second 98.9% complete
Pushed 1000 docs in 34.465 secs at 29.014942 docs/sec 3.4793549 MB/sec, 990000 total docs in 526 minutes 2 seconds at 31.36595 docs/second 99.0% complete
Pushed 1000 docs in 30.792 secs at 32.475967 docs/sec 3.4305592 MB/sec, 991000 total docs in 526 minutes 33 seconds at 31.367033 docs/second 99.1% complete
Pushed 1000 docs in 29.749 secs at 33.614574 docs/sec 3.4574842 MB/sec, 992000 total docs in 527 minutes 3 seconds at 31.369146 docs/second 99.2% complete
Pushed 1000 docs in 32.825 secs at 30.464584 docs/sec 3.370614 MB/sec, 993000 total docs in 527 minutes 36 seconds at 31.368208 docs/second 99.299995% complete
Pushed 1000 docs in 37.048 secs at 26.99201 docs/sec 3.451209 MB/sec, 994000 total docs in 528 minutes 13 seconds at 31.36309 docs/second 99.4% complete
Pushed 1000 docs in 35.307 secs at 28.322996 docs/sec 3.3885374 MB/sec, 995000 total docs in 528 minutes 48 seconds at 31.35971 docs/second 99.5% complete
Pushed 1000 docs in 37.64 secs at 26.567482 docs/sec 3.4242926 MB/sec, 996000 total docs in 529 minutes 26 seconds at 31.35403 docs/second 99.6% complete
Pushed 1000 docs in 28.108 secs at 35.57706 docs/sec 3.4203835 MB/sec, 997000 total docs in 529 minutes 54 seconds at 31.357765 docs/second 99.7% complete
Pushed 1000 docs in 28.886 secs at 34.618847 docs/sec 3.4412107 MB/sec, 998000 total docs in 530 minutes 23 seconds at 31.360725 docs/second 99.8% complete
Pushed 1000 docs in 40.074 secs at 24.953835 docs/sec 3.4108858 MB/sec, 999000 total docs in 531 minutes 3 seconds at 31.352667 docs/second 99.9% complete
Pushed 1000 docs in 0.0 secs at Infinity docs/sec 3.4554148 MB/sec, 1000000 total docs in 531 minutes 3 seconds at 31.38405 docs/second 100.0% complete

Pushed 1000000 total docs and 3495.1316MB in 531 minutes 3 seconds at 31.38405 per second,
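
As a sanity check on the summary line above, the overall rates can be recomputed from the totals. This is only a rough sketch: the variable names are mine, and it assumes 1MB = 1000KB when working out the average document size.

```python
# Recompute overall throughput from the totals in the final log line.
total_docs = 1_000_000
total_mb = 3495.1316
elapsed_secs = 531 * 60 + 3                  # 531 minutes 3 seconds

docs_per_sec = total_docs / elapsed_secs
mb_per_sec = total_mb / elapsed_secs
kb_per_doc = total_mb * 1000 / total_docs    # assumes 1 MB = 1000 KB

print(f"{docs_per_sec:.2f} docs/sec")
print(f"{mb_per_sec:.3f} MB/sec overall")
print(f"{kb_per_doc:.2f} KB per doc on average")
```

By this arithmetic the overall data rate is about 0.11 MB/sec, which suggests the per-batch "MB/sec" column in the log may really be MB per 1000-doc batch. That is my reading of the numbers, not something the log states.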

Thursday, December 18, 2014

10GB Terasort Benchmark on 5-node Raspberry PI Cluster - 2H 52m 56s

We just set a new record for the 10GB terasort on a 5-node PicoCluster! We cut over an hour off the benchmark time, bringing the total to under 3 hours! Pretty amazing!
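
For anyone who wants the improvement as numbers, here is a small sketch that compares this run with the 4H 12m 5s run from November 13. The run times come from the two JobTracker pages; the variable names and the choice of 1MB = 1,000,000 bytes are my own assumptions.

```python
from datetime import timedelta

# Run times reported by the JobTracker for the two 10GB TeraSort runs.
november_run = timedelta(hours=4, minutes=12, seconds=5)
december_run = timedelta(hours=2, minutes=52, seconds=56)

saved = november_run - december_run
speedup = november_run / december_run

# 10,000,000,000 bytes sorted, expressed as MB (10^6 bytes) per second.
mb_per_sec = 10_000_000_000 / 1_000_000 / december_run.total_seconds()

print(f"time saved: {saved}")
print(f"speedup: {speedup:.2f}x")
print(f"sort rate: {mb_per_sec:.2f} MB/sec")
```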

Hadoop job_201412181311_0002 on master

User: hadoop
Job Name: TeraSort
Job File: hdfs://pi0:54310/tmp/hadoop-hadoop/mapred/staging/hadoop/.staging/job_201412181311_0002/job.xml
Submit Host: pi0
Submit Host Address: 10.1.10.120
Job-ACLs: All users are allowed
Job Setup: Successful
Status: Succeeded
Started at: Thu Dec 18 14:54:20 MST 2014
Finished at: Thu Dec 18 17:47:16 MST 2014
Finished in: 2hrs, 52mins, 56sec
Job Cleanup: Successful

Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     100.00%     80         0        0        80        0       0 / 0
reduce  100.00%     80         0        0        80        0       0 / 0


Counter Map Reduce Total
Map-Reduce Framework
  Spilled Records 0 0 300,000,000
  Map output materialized bytes 0 0 10,200,038,400
  Reduce input records 0 0 100,000,000
  Virtual memory (bytes) snapshot 0 0 46,356,074,496
  Map input records 0 0 100,000,000
  SPLIT_RAW_BYTES 8,800 0 8,800
  Map output bytes 0 0 10,000,000,000
  Reduce shuffle bytes 0 0 10,200,038,400
  Physical memory (bytes) snapshot 0 0 32,931,528,704
  Map input bytes 0 0 10,000,000,000
  Reduce input groups 0 0 100,000,000
  Combine output records 0 0 0
  Reduce output records 0 0 100,000,000
  Map output records 0 0 100,000,000
  Combine input records 0 0 0
  CPU time spent (ms) 0 0 27,827,080
  Total committed heap usage (bytes) 0 0 32,344,113,152
File Input Format Counters
  Bytes Read 0 0 10,000,144,320
FileSystemCounters
  HDFS_BYTES_READ 10,000,153,120 0 10,000,153,120
  FILE_BYTES_WRITTEN 20,404,679,750 10,204,290,230 30,608,969,980
  FILE_BYTES_READ 10,265,248,834 10,200,000,960 20,465,249,794
  HDFS_BYTES_WRITTEN 0 10,000,000,000 10,000,000,000
File Output Format Counters
  Bytes Written 0 0 10,000,000,000
Job Counters
  Launched map tasks 0 0 80
  Launched reduce tasks 0 0 80
  SLOTS_MILLIS_REDUCES 0 0 28,079,434
  Total time spent by all reduces waiting after reserving slots (ms) 0 0 0
  SLOTS_MILLIS_MAPS 0 0 22,051,330
  Total time spent by all maps waiting after reserving slots (ms) 0 0 0
  Rack-local map tasks 0 0 30
  Data-local map tasks 0 0 50


This is Apache Hadoop release 1.2.1

Thursday, December 4, 2014

The Daily WTF: The Robot Guys

Not really Big Data, but pretty funny :)

http://thedailywtf.com/articles/the-robot-guys

  - Craig

Thursday, November 13, 2014

10GB Terasort Benchmark on 5-node Raspberry PI Cluster - 4H 12m 5s

After more than a week of testing, tweaking, and retesting, we were able to successfully run a 10GB Terasort on a 5-node Raspberry PI cluster! Each node has 512MB of RAM and a 16GB SD card.

Hadoop job_201411131351_0001 on master

User: hadoop
Job Name: TeraSort
Job File: hdfs://pi0:54310/tmp/hadoop-hadoop/mapred/staging/hadoop/.staging/job_201411131351_0001/job.xml
Submit Host: pi0
Submit Host Address: 10.1.10.120
Job-ACLs: All users are allowed
Job Setup: Successful
Status: Succeeded
Started at: Thu Nov 13 14:23:35 MST 2014
Finished at: Thu Nov 13 18:35:40 MST 2014
Finished in: 4hrs, 12mins, 5sec
Job Cleanup: Successful

Kind    % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
map     100.00%     152        0        0        152       0       0 / 0
reduce  100.00%     152        0        0        152       0       0 / 0


Counter Map Reduce Total
File Input Format Counters
  Bytes Read 0 0 10,000,298,372
Job Counters
  SLOTS_MILLIS_MAPS 0 0 24,993,499
  Launched reduce tasks 0 0 152
  Total time spent by all reduces waiting after reserving slots (ms) 0 0 0
  Rack-local map tasks 0 0 144
  Total time spent by all maps waiting after reserving slots (ms) 0 0 0
  Launched map tasks 0 0 152
  Data-local map tasks 0 0 8
  SLOTS_MILLIS_REDUCES 0 0 34,824,665
File Output Format Counters
  Bytes Written 0 0 10,000,000,000
FileSystemCounters
  FILE_BYTES_READ 10,341,496,856 10,200,000,912 20,541,497,768
  HDFS_BYTES_READ 10,000,315,092 0 10,000,315,092
  FILE_BYTES_WRITTEN 20,409,243,506 10,208,123,719 30,617,367,225
  HDFS_BYTES_WRITTEN 0 10,000,000,000 10,000,000,000
Map-Reduce Framework
  Map output materialized bytes 0 0 10,200,138,624
  Map input records 0 0 100,000,000
  Reduce shuffle bytes 0 0 10,200,138,624
  Spilled Records 0 0 300,000,000
  Map output bytes 0 0 10,000,000,000
  Total committed heap usage (bytes) 0 0 57,912,754,176
  CPU time spent (ms) 0 0 40,328,090
  Map input bytes 0 0 10,000,000,000
  SPLIT_RAW_BYTES 16,720 0 16,720
  Combine input records 0 0 0
  Reduce input records 0 0 100,000,000
  Reduce input groups 0 0 100,000,000
  Combine output records 0 0 0
  Physical memory (bytes) snapshot 0 0 52,945,952,768
  Reduce output records 0 0 100,000,000
  Virtual memory (bytes) snapshot 0 0 123,024,928,768
  Map output records 0 0 100,000,000


This is Apache Hadoop release 1.2.1

Installing Hadoop on a Raspberry PI cluster

This document details setting up and testing Hadoop on a Raspberry PI cluster. The details are almost exactly the same for any Linux flavor, particularly Debian/Ubuntu. There are only a couple of PI specifics.

In this case, we have a 5-node cluster. If your cluster is a different size, make sure the hadoop distribution is on each of the machines, make sure you update the slaves file, and copy the config files to each machine as shown.
 

Machine Setup

The first thing to do is to add DNS entries for all of your cluster machines. If you have a DNS server available, then that's great. If you don't, then you can edit the /etc/hosts file and add the correct entries there. This has to be done for every machine in the cluster. My entries look like this:

10.1.10.120 pi0 master
10.1.10.121 pi1
10.1.10.122 pi2
10.1.10.123 pi3
10.1.10.124 pi4
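
If your cluster is a different size, the entries can be generated rather than typed. The following is just a sketch: hosts_entries is a hypothetical helper of mine, and it assumes the nodes have consecutive IP addresses and are named pi0, pi1, and so on, as in my cluster.

```python
import ipaddress

def hosts_entries(first_ip: str, node_count: int, prefix: str = "pi") -> list[str]:
    """Build /etc/hosts lines for consecutively addressed nodes."""
    base = ipaddress.IPv4Address(first_ip)
    lines = []
    for i in range(node_count):
        names = f"{prefix}{i}"
        if i == 0:
            names += " master"   # the first node also gets the master alias
        lines.append(f"{base + i} {names}")
    return lines

print("\n".join(hosts_entries("10.1.10.120", 5)))
```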
 
For each machine, the hostname needs to be changed to reflect the hostnames defined above. The default Raspbian hostname is raspberrypi.

Edit the /etc/hostname file and change the entry to the hostname that you want. In my case, the first PI is pi0. A quick reboot of each node should verify the correct hostname of each node.

 
We want to add a hadoop user to each machine. This is not strictly necessary for hadoop to run, but we do this to keep our hadoop stuff separate from anything else on these machines.

Add a new user to each machine to run hadoop.
pi@pi0 ~ $ sudo adduser hadoop 
Adding user `hadoop' ... 
Adding new group `hadoop' (1004) ... 
Adding new user `hadoop' (1001) with group `hadoop' ... 
Creating home directory `/home/hadoop' ... 
Copying files from `/etc/skel' ... 
Enter new UNIX password: 
Retype new UNIX password: 
passwd: password updated successfully 
Changing the user information for hadoop 
Enter the new value, or press ENTER for the default 
 Full Name []: Hadoop 
 Room Number []: 
 Work Phone []: 
 Home Phone []: 
 Other []: 
Is the information correct? [Y/n] Y 
 
 
Log out as the pi user and log back in as the hadoop user on your master node. Create an SSH key pair and append the public key to your authorized_keys file.
 
ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Copy the identity key to all of your other nodes.
ssh-copy-id -i .ssh/id_dsa.pub hadoop@pi1
ssh-copy-id -i .ssh/id_dsa.pub hadoop@pi2
ssh-copy-id -i .ssh/id_dsa.pub hadoop@pi3
ssh-copy-id -i .ssh/id_dsa.pub hadoop@pi4
 
 


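You can also copy the key directly to a new machine with something like this. Be aware that this overwrites any existing authorized_keys file on the target and assumes the .ssh directory already exists there.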

scp .ssh/id_dsa.pub hadoop@pi1:.ssh/authorized_keys
 

Hadoop Setup



Copy the distribution to the hadoop user account and decompress.
I always create a symlink to make things a bit easier:
ln -s hadoop-1.x.x hadoop


Now we need to edit some hadoop config files to make everything work.

cd hadoop/conf
ls


This should give you a listing that looks like this:
hadoop@raspberrypi ~/hadoop/conf $ ls 
capacity-scheduler.xml  core-site.xml       hadoop-env.sh               hadoop-policy.xml  
log4j.properties       mapred-site.xml  slaves                  ssl-server.xml.example 
configuration.xsl       fair-scheduler.xml  hadoop-metrics2.properties  hdfs-site.xml      
mapred-queue-acls.xml  masters          ssl-client.xml.example  taskcontroller.cfg 


The files we're interested in are masters and slaves. Add the address of the master node, the one that will run the NameNode and JobTracker, to the masters file. (In Hadoop 1.x the masters file actually controls where the SecondaryNameNode starts, while the NameNode and JobTracker start on the machine where you run the start scripts.) Add the addresses of the nodes that will be the DataNodes and TaskTrackers to the slaves file. You can also add the master node address to the slaves file if you want it to store data and run map reduce jobs, but I'm not doing that in this case. These files look like this for my cluster:
 
hadoop@raspberrypi ~/hadoop/conf $ more masters 
pi0 
hadoop@raspberrypi ~/hadoop/conf $ more slaves 
pi1
pi2
pi3
pi4 


These files need to be copied to all the nodes in the cluster. The easy way to do this would be as follows:

hadoop@pi0 ~/hadoop/conf $ scp masters slaves pi1:hadoop/conf/. 
masters       100%   12     0.0KB/s   00:00                                        
slaves       100%   48     0.1KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp masters slaves pi2:hadoop/conf/. 
masters       100%   12     0.0KB/s   00:00    
slaves       100%   48     0.1KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp masters slaves pi3:hadoop/conf/. 
masters       100%   12     0.0KB/s   00:00    
slaves       100%   48     0.1KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp masters slaves pi4:hadoop/conf/. 
masters       100%   12     0.0KB/s   00:00    
slaves       100%   48     0.1KB/s   00:00
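
With more nodes, typing one scp per host gets tedious. Here is a rough sketch that builds the same command lines from a list of hosts; scp_commands is a hypothetical helper of mine, and it only prints the commands rather than running them.

```python
import shlex

def scp_commands(files, hosts, remote_dir="hadoop/conf/."):
    """Return one scp command line per host, copying all files at once."""
    return [
        shlex.join(["scp", *files, f"{host}:{remote_dir}"])
        for host in hosts
    ]

slaves = ["pi1", "pi2", "pi3", "pi4"]   # same hosts as in my slaves file
for cmd in scp_commands(["masters", "slaves"], slaves):
    print(cmd)
```

The same helper works for the other files that get copied around in this walkthrough by changing the file list and the remote directory.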


We need to define the JAVA_HOME setting in the hadoop-env.sh file. Since Hadoop runs on Java, it needs to know where Java is installed. The Oracle JDK comes standard with the latest Raspbian release. We can set JAVA_HOME as follows:

# The java implementation to use.  Required. 
export JAVA_HOME=/usr/lib/jvm/jdk-7-oracle-armhf


Next we need to look at the core-site.xml, hdfs-site.xml and mapred-site.xml files. These are the configuration files for the common settings, for the distributed file system (HDFS), and for the code that runs the map reduce programs that we want to run. By default, HDFS and MapReduce both put their files in temporary file system space. That's fine for playing around, but as soon as you reboot, all of your data disappears and you have to start over again. We first need to tell each machine where the NameNode is. This is configured in the core-site.xml file.

<property>
 <name>fs.default.name</name>
 <value>hdfs://master:54310</value>
</property>

Next we need to tell each node where the JobTracker is and a safe place to store data so that it stays permanent. This is configured in the mapred-site.xml file.

<property>
 <name>mapred.job.tracker</name>
 <value>master:54311</value>
</property>
 
<property>
 <name>mapred.local.dir</name>
 <value>/home/hadoop/mapred</value>
</property>


Lastly we need to specify safe directories to store our files for HDFS. This is configured in the hdfs-site.xml file.

<property>
 <name>dfs.name.dir</name>
 <value>/home/hadoop/name</value>
</property>
<property>
 <name>dfs.data.dir</name>
 <value>/home/hadoop/data</value>
</property> 
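
The snippets above show only the property elements; in the actual files they sit inside a single configuration root element. As a rough illustration, here is a sketch that renders a complete file from a dictionary. The site_xml helper is hypothetical, not part of Hadoop, and the property values are the ones used in this post.

```python
import xml.etree.ElementTree as ET

def site_xml(properties: dict) -> str:
    """Render Hadoop properties as a <configuration> document."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    ET.indent(root, space=" ")          # pretty-print; needs Python 3.9+
    return ET.tostring(root, encoding="unicode")

hdfs_site = site_xml({
    "dfs.name.dir": "/home/hadoop/name",
    "dfs.data.dir": "/home/hadoop/data",
})
print(hdfs_site)
```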
 
 
All of these files need to be copied to each of the other nodes in the cluster. We can do that like we did before with the masters and slaves files.
 
hadoop@pi0 ~/hadoop/conf $ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml hadoop@pi1:hadoop/conf/. 
hadoop-env.sh       100% 2218     2.2KB/s   00:00    
core-site.xml      100%  268     0.3KB/s   00:00    
hdfs-site.xml      100%  347     0.3KB/s   00:00    
mapred-site.xml      100%  356     0.4KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml hadoop@pi2:hadoop/conf/. 
hadoop-env.sh       100% 2218     2.2KB/s   00:00    
core-site.xml      100%  268     0.3KB/s   00:00    
hdfs-site.xml      100%  347     0.3KB/s   00:00    
mapred-site.xml      100%  356     0.4KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml hadoop@pi3:hadoop/conf/. 
hadoop-env.sh       100% 2218     2.2KB/s   00:00    
core-site.xml      100%  268     0.3KB/s   00:00    
hdfs-site.xml      100%  347     0.3KB/s   00:00    
mapred-site.xml      100%  356     0.4KB/s   00:00    
hadoop@pi0 ~/hadoop/conf $ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml hadoop@pi4:hadoop/conf/. 
hadoop-env.sh       100% 2218     2.2KB/s   00:00    
core-site.xml      100%  268     0.3KB/s   00:00    
hdfs-site.xml      100%  347     0.3KB/s   00:00    
mapred-site.xml      100%  356     0.4KB/s   00:00


We have one interesting task left. The hadoop script by default wants to run the datanode in server mode. The Oracle Java distribution does not support this option. We need to edit the bin/hadoop script to remove this option.

cd bin
vi hadoop

Search for “-server”
Change this:
    HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS" 
to this:
    HADOOP_OPTS="$HADOOP_OPTS $HADOOP_DATANODE_OPTS" 
 
 
We need to copy this file to all of the nodes in the cluster.

hadoop@pi0 ~/hadoop/bin $ scp hadoop hadoop@pi1:hadoop/bin/. 
hadoop       100%   14KB  13.8KB/s   00:00    
hadoop@pi0 ~/hadoop/bin $ scp hadoop hadoop@pi2:hadoop/bin/. 
hadoop       100%   14KB  13.8KB/s   00:00    
hadoop@pi0 ~/hadoop/bin $ scp hadoop hadoop@pi3:hadoop/bin/. 
hadoop       100%   14KB  13.8KB/s   00:00    
hadoop@pi0 ~/hadoop/bin $ scp hadoop hadoop@pi4:hadoop/bin/. 
hadoop       100%   14KB  13.8KB/s   00:00    


Now we're ready to get things up and running! We need to format the NameNode, which sets up the storage structure for HDFS. This is done with the bin/hadoop namenode -format command as follows:

hadoop@pi0 ~/hadoop $ bin/hadoop namenode -format 
14/11/04 10:53:28 INFO namenode.NameNode: STARTUP_MSG: 
/************************************************************ 
STARTUP_MSG: Starting NameNode 
STARTUP_MSG:   host = pi0/10.1.10.120 
STARTUP_MSG:   args = [-format] 
STARTUP_MSG:   version = 1.1.1 
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1 -r 1411108; compiled by 'hortonfo' on Mon Nov 19 10:48:11 UTC 2012 
************************************************************/ 
14/11/04 10:53:30 INFO util.GSet: VM type       = 32-bit 
14/11/04 10:53:30 INFO util.GSet: 2% max memory = 19.335 MB 
14/11/04 10:53:30 INFO util.GSet: capacity      = 2^22 = 4194304 entries 
14/11/04 10:53:30 INFO util.GSet: recommended=4194304, actual=4194304 
14/11/04 10:53:34 INFO namenode.FSNamesystem: fsOwner=hadoop 
14/11/04 10:53:35 INFO namenode.FSNamesystem: supergroup=supergroup 
14/11/04 10:53:35 INFO namenode.FSNamesystem: isPermissionEnabled=true 
14/11/04 10:53:35 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100 
14/11/04 10:53:35 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s) 
14/11/04 10:53:35 INFO namenode.NameNode: Caching file names occuring more than 10 times 
14/11/04 10:53:37 INFO common.Storage: Image file of size 112 saved in 0 seconds. 
14/11/04 10:53:38 INFO namenode.FSEditLog: closing edit log: position=4, editlog=/home/hadoop/name/current/edits 
14/11/04 10:53:38 INFO namenode.FSEditLog: close success: truncate to 4, editlog=/home/hadoop/name/current/edits 
14/11/04 10:53:39 INFO common.Storage: Storage directory /home/hadoop/name has been successfully formatted. 
14/11/04 10:53:39 INFO namenode.NameNode: SHUTDOWN_MSG: 
/************************************************************ 
SHUTDOWN_MSG: Shutting down NameNode at pi0/10.1.10.120 
************************************************************/
 
 
Since we set up the SSH login niceness before, we can start the entire cluster now with a single command!
 
hadoop@pi0 ~/hadoop $ bin/start-all.sh 
starting namenode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-namenode-pi0.out 
pi1: starting datanode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-datanode-pi1.out 
pi3: starting datanode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-datanode-pi3.out 
pi4: starting datanode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-datanode-pi4.out 
pi2: starting datanode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-datanode-pi2.out 
pi0: starting secondarynamenode, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-secondarynamenode-pi0.out 
starting jobtracker, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-jobtracker-pi0.out 
pi1: starting tasktracker, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-tasktracker-pi1.out 
pi2: starting tasktracker, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-tasktracker-pi2.out 
pi4: starting tasktracker, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-tasktracker-pi4.out 
pi3: starting tasktracker, logging to /home/hadoop/hadoop-1.1.1/libexec/../logs/hadoop-hadoop-tasktracker-pi3.out 


Open up a browser and go to the following URLs, where pi0 is the master (NameNode) in your cluster:
http://pi0:50070/dfshealth.jsp
http://pi0:50030/jobtracker.jsp

Testing the Setup

Now we can run something simple and quick and make sure things are working.

hadoop@pi0 ~/hadoop $ bin/hadoop jar hadoop-examples-1.1.1.jar pi 4 1000 
Number of Maps  = 4 
Samples per Map = 1000 
Wrote input for Map #0 
Wrote input for Map #1 
Wrote input for Map #2 
Wrote input for Map #3 
Starting Job 
14/11/04 11:50:50 INFO mapred.FileInputFormat: Total input paths to process : 4 
14/11/04 11:50:55 INFO mapred.JobClient: Running job: job_201411041127_0001 
14/11/04 11:50:56 INFO mapred.JobClient:  map 0% reduce 0% 
14/11/04 11:51:49 INFO mapred.JobClient:  map 25% reduce 0% 
14/11/04 11:51:54 INFO mapred.JobClient:  map 100% reduce 0% 
14/11/04 11:52:22 INFO mapred.JobClient:  map 100% reduce 66% 
14/11/04 11:52:25 INFO mapred.JobClient:  map 100% reduce 100% 
14/11/04 11:52:44 INFO mapred.JobClient: Job complete: job_201411041127_0001 
14/11/04 11:52:44 INFO mapred.JobClient: Counters: 30 
14/11/04 11:52:44 INFO mapred.JobClient:   Job Counters 
14/11/04 11:52:44 INFO mapred.JobClient:     Launched reduce tasks=1 
14/11/04 11:52:44 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=129653 
14/11/04 11:52:44 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0 
14/11/04 11:52:44 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0 
14/11/04 11:52:44 INFO mapred.JobClient:     Launched map tasks=4 
14/11/04 11:52:44 INFO mapred.JobClient:     Data-local map tasks=4 
14/11/04 11:52:44 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=34464 
14/11/04 11:52:44 INFO mapred.JobClient:   File Input Format Counters 
14/11/04 11:52:44 INFO mapred.JobClient:     Bytes Read=472 
14/11/04 11:52:44 INFO mapred.JobClient:   File Output Format Counters 
14/11/04 11:52:44 INFO mapred.JobClient:     Bytes Written=97 
14/11/04 11:52:44 INFO mapred.JobClient:   FileSystemCounters 
14/11/04 11:52:44 INFO mapred.JobClient:     FILE_BYTES_READ=94 
14/11/04 11:52:44 INFO mapred.JobClient:     HDFS_BYTES_READ=956 
14/11/04 11:52:44 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=119665 
14/11/04 11:52:44 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=215 
14/11/04 11:52:44 INFO mapred.JobClient:   Map-Reduce Framework 
14/11/04 11:52:44 INFO mapred.JobClient:     Map output materialized bytes=112 
14/11/04 11:52:44 INFO mapred.JobClient:     Map input records=4 
14/11/04 11:52:44 INFO mapred.JobClient:     Reduce shuffle bytes=112 
14/11/04 11:52:44 INFO mapred.JobClient:     Spilled Records=16 
14/11/04 11:52:44 INFO mapred.JobClient:     Map output bytes=72 
14/11/04 11:52:44 INFO mapred.JobClient:     Total committed heap usage (bytes)=819818496 
14/11/04 11:52:44 INFO mapred.JobClient:     CPU time spent (ms)=16580 
14/11/04 11:52:44 INFO mapred.JobClient:     Map input bytes=96 
14/11/04 11:52:44 INFO mapred.JobClient:     SPLIT_RAW_BYTES=484 
14/11/04 11:52:44 INFO mapred.JobClient:     Combine input records=0 
14/11/04 11:52:44 INFO mapred.JobClient:     Reduce input records=8 
14/11/04 11:52:44 INFO mapred.JobClient:     Reduce input groups=8 
14/11/04 11:52:44 INFO mapred.JobClient:     Combine output records=0 
14/11/04 11:52:44 INFO mapred.JobClient:     Physical memory (bytes) snapshot=586375168 
14/11/04 11:52:44 INFO mapred.JobClient:     Reduce output records=0 
14/11/04 11:52:44 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1716187136 
14/11/04 11:52:44 INFO mapred.JobClient:     Map output records=8 
Job Finished in 116.488 seconds 
Estimated value of Pi is 3.14000000000000000000
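
For the curious, the pi example estimates the value by scattering points over a unit square and counting how many land inside the inscribed circle. Below is a rough single-machine sketch of that idea. It uses plain pseudo-random sampling instead of the quasi-random sequence the Hadoop example uses, so it will not reproduce the exact figure above; estimate_pi and the seed are my own choices.

```python
import random

def estimate_pi(num_maps: int, samples_per_map: int, seed: int = 42) -> float:
    """Estimate pi from the fraction of random points inside a circle."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_maps):                 # each "map" counts its own hits
        for _ in range(samples_per_map):
            x, y = rng.random(), rng.random()
            # Circle of radius 0.5 centered in the unit square.
            if (x - 0.5) ** 2 + (y - 0.5) ** 2 <= 0.25:
                inside += 1
    total = num_maps * samples_per_map
    return 4 * inside / total                 # the "reduce" step combines counts

print(estimate_pi(4, 1000))
```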

Tuesday, August 26, 2014

All About ElasticSearch Scripting

This is taken from the ElasticSearch blog of a similar name, all about scripting.

There is a lot of good news in here. This is as of the 1.3 branch:

  • Dynamic scripting disabled by default for security
  • Moving from MVEL to Groovy
  • Added Lucene Expressions
  • field_value_factor functions

Here's an earlier article about scripting and script security.

I personally haven't done that much with ElasticSearch scripting, but I need to start playing with it to see how it can help. I'd encourage you to do the same.

  - Craig

Tuesday, July 29, 2014

ElasticSearch Resiliency and Failover

ElasticSearch is designed with the assumption that things break. Hardware fails and software crashes. That’s a fact of life. ElasticSearch mainly deals with this through the use of clustering. That means that many machines can be combined together to work as a single unit.

Data can be replicated or copied across multiple servers so that the loss of one or more nodes can be tolerated by the cluster and still have the cluster respond to requests in a timely manner. This obviously depends on the exact configuration.

The primary mechanism for ElasticSearch resiliency is divided into 3 parts: nodes, shards, and replicas. Nodes are the individual servers, and as nodes are added to the cluster, data is spread across them in terms of shards and replicas. A shard is a slice or section of the data; the number of shards is specified when an index is created and cannot be changed afterward. A replica is simply a copy of a shard. The number of replicas can be changed on the fly.
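
To make the shard/replica distinction concrete, here is a sketch of the two request bodies involved. The index name tweets and the counts are made up for illustration; number_of_shards and number_of_replicas are the index settings I have in mind.

```python
import json

index = "tweets"   # hypothetical index name

# Sent once, at index creation: PUT /tweets
create_body = {"settings": {"number_of_shards": 5, "number_of_replicas": 1}}

# Can be sent at any time afterwards: PUT /tweets/_settings
update_body = {"index": {"number_of_replicas": 2}}

print(f"PUT /{index}", json.dumps(create_body))
print(f"PUT /{index}/_settings", json.dumps(update_body))
```

Note that the update body carries only the replica count, since the shard count is fixed once the index exists.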

If a cluster is formed from a single node, then all primary shards are on that node. If there is a failure of the node or of a shard, then data will likely be lost.

When you add a node to the cluster so you now have 2 nodes, the primary shards will be spread across both nodes. If a node is lost, then some data will not be available until that node is restored. If that node cannot be restored, then its data is lost.

The next step would be to add a replica. Again, a replica is a copy of the data. In a 2-node scenario, the primary shards will be spread across both nodes, but a replica of each shard will be allocated to the alternate node. The effect is that both nodes will have a full copy of the data, but one node will not hold all of the primary shards. If a node is lost in this case, the remaining node can handle all requests. When the failed node is restored or a replacement node is added to the cluster, ElasticSearch will replicate shards to the second node and achieve equilibrium again.

Adding a third node to the cluster will spread the 2 copies of the data across all 3 nodes. The cluster can handle the failure of a single node, but not 2 nodes.

If a second replica is added (making 3 copies of the data), then each node would effectively have a copy of the data. This configuration should be able to handle the loss of 2 of the 3 nodes, though you really don’t want that to happen.
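
The scenarios above can be checked with a toy model. This is only a sketch under simplifying assumptions: shard copies are placed round-robin, copies of the same shard never share a node, and primaries are not distinguished from replicas. The real ElasticSearch allocator is more elaborate, and all function names here are mine.

```python
from itertools import combinations

def allocate(num_shards, num_replicas, nodes):
    """Round-robin placement: copy c of shard s goes to node (s + c) % len(nodes).

    Copies of one shard never share a node; copies that do not fit
    (more copies than nodes) are left unassigned.
    """
    placement = {node: set() for node in nodes}
    copies = min(num_replicas + 1, len(nodes))
    for shard in range(num_shards):
        for copy in range(copies):
            placement[nodes[(shard + copy) % len(nodes)]].add(shard)
    return placement

def survives(placement, lost, num_shards):
    """True if every shard still has a copy on some surviving node."""
    remaining = set()
    for node, shards in placement.items():
        if node not in lost:
            remaining |= shards
    return remaining == set(range(num_shards))

def tolerates(placement, num_shards, failures):
    """True if the data survives the loss of any `failures` nodes."""
    return all(survives(placement, set(lost), num_shards)
               for lost in combinations(placement, failures))

three = ["node1", "node2", "node3"]
print(tolerates(allocate(5, 0, ["node1", "node2"]), 5, 1))  # no replicas, 2 nodes
print(tolerates(allocate(5, 1, three), 5, 1))               # 1 replica, lose 1 of 3
print(tolerates(allocate(5, 1, three), 5, 2))               # 1 replica, lose 2 of 3
print(tolerates(allocate(5, 2, three), 5, 2))               # 2 replicas, lose 2 of 3
```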

ElasticSearch has some features that allow you to influence shard allocation. There are a couple of different algorithms that it can use to control shard placement. Recently, they have added the size of the shard into the allocation strategy such that one node does not end up having lots of large shards while another node has primarily small shards.

ElasticSearch also has a rack awareness feature. This allows you to tell ElasticSearch that some nodes are physically close to each other while other nodes are physically far apart. For example, you can have some nodes in one data center and other nodes of the same cluster in another data center. ElasticSearch will try to keep requests localized as much as possible. Having a cluster split across data centers is not really recommended for performance reasons, but is an option.

ElasticSearch has a federated search feature. This allows 2 clusters to be in physically separate data centers while essentially a third cluster will arbitrate requests across clusters. This is a very welcome feature for ElasticSearch.

ElasticSearch has also added a snapshot-restore feature as of the 1.0 release. This allows an entire cluster to be backed up and restored, or one or more indices can be specified for a particular snapshot. Snapshots can be taken on a fully running cluster and can be scripted and taken periodically.

Once the initial snapshot occurs, subsequent snapshots taken are incremental in nature. http://www.elasticsearch.org/blog/introducing-snapshot-restore/

One of the great things about snapshot/restore is that it can be used to keep a near-line cluster that mirrors the production cluster. If the production cluster goes down, the backup cluster can be brought online at roughly the same point in time, document-wise.

This feature works by shuttling the snapshots to the near-line cluster, then applying the snapshots to that cluster. Since the snapshots are incremental, the process should be pretty quick, even for clusters with a good amount of volume.
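
One snapshot-and-restore cycle might look roughly like the sketch below. It only builds the REST requests and does not send them. The host names, repository name, snapshot name and location are all made up, and it assumes both clusters can reach the same shared filesystem repository.

```python
import json

def snapshot_requests(source, target, repo, snapshot, location):
    """Return (method, url, body) tuples for one snapshot-and-restore cycle."""
    repo_body = json.dumps({"type": "fs", "settings": {"location": location}})
    return [
        # Register the shared filesystem repository on both clusters.
        ("PUT", f"{source}/_snapshot/{repo}", repo_body),
        ("PUT", f"{target}/_snapshot/{repo}", repo_body),
        # Take the snapshot on production and wait for it to finish.
        ("PUT", f"{source}/_snapshot/{repo}/{snapshot}?wait_for_completion=true", None),
        # Restore that snapshot on the near-line cluster.
        ("POST", f"{target}/_snapshot/{repo}/{snapshot}/_restore", None),
    ]

requests = snapshot_requests(
    "http://prod:9200", "http://nearline:9200",      # hypothetical hosts
    "nearline_backup", "snap_1", "/mnt/es_backups",  # hypothetical names
)
for method, url, body in requests:
    print(method, url, body or "")
```

In practice a restore over indices that already exist on the near-line cluster needs those indices closed first, so a real script would have a couple more steps than this.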

There are a variety of ways that ElasticSearch can be made resilient depending on the exact architecture. ElasticSearch continues to evolve in this regard as more and more companies rely on it for mission critical applications.

  - Craig

Tuesday, July 22, 2014

What is Big Data?

I've been wanting to write this for a long time, but now have time to put some thoughts down based on discussions and comments from various groups of technical and business people.

The question is, What is Big Data?

Is Big Data just that, big ... data? Well, there have always been very large data sets, certainly constrained to the technology limitations of the time. Storage systems that once were measured in gigabytes, then terabytes, are now measured in petabytes. Certainly our ability to store vast quantities of data has increased dramatically.

With the advent of the Internet and the World Wide Web, our ability to collect information has also grown dramatically. Where companies and individuals were limited to collecting data within their particular sphere, now collecting data from locations and individuals around the globe is a daily occurrence.

I've heard people sometimes assert that Big Data is simply doing the same things we were before, just in a different way, maybe with different tools. I can understand that point of view. I think that is generally the first step in the process of moving towards a Big Data mindset.

I would put forth that Big Data really is something different than just working with large data sets. To me, Big Data is really a different mindset. We live in a vastly more connected world than just a decade ago. The daily effect of data in people's lives in general is incredibly pervasive. It's almost impossible to escape.

Big Data is about thinking differently about your data.

Big Data is about connecting data sources. What other data is available that can enhance what you have? Can you add user behavior from your web site? How about advertising or industry data? How about public sources from government or private sources like Twitter, Facebook, search engines like Google, Bing, Yahoo and others?

Big Data is more about openness and sharing. Not everything can be shared and you really, really need to be careful of user identifiable information, but what can you share? What can you publish or what data sets can you contribute to that enrich the overall community?

Big Data many times involves working with unstructured data. SQL databases, by their nature, enforce very strict structures on your data. If your data doesn't conform, then you're kind of out of luck. There are some things that SQL databases do extremely well, but there are real limits.

There are many new tools and techniques for working with unstructured data and extracting value from it. So-called NoSQL data stores are designed to work with data that has little or no structure, providing new capabilities. Open source search engines like ElasticSearch and Solr provide incredible search and faceting/aggregation abilities.
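
To give a feel for what faceting/aggregation looks like, here is a minimal sketch that builds a search request body with a terms aggregation. The field names (`text`, `hashtags`) and the aggregation name `top_values` are made up for illustration, and the sketch only builds and prints the JSON; it does not talk to a cluster.

```python
import json


def build_facet_query(text, facet_field, top_n=10):
    """Build a search body: a full-text match plus a terms aggregation.

    The "text" field and the "top_values" name are hypothetical; use
    whatever fields your own index actually has.
    """
    return {
        "query": {"match": {"text": text}},
        "size": 0,  # we only want the aggregation buckets, not the hits
        "aggs": {
            "top_values": {"terms": {"field": facet_field, "size": top_n}}
        },
    }


body = build_facet_query("big data", "hashtags", top_n=5)
print(json.dumps(body, indent=2))
```

The printed JSON is what you would send to a search endpoint; the response would then contain one bucket per distinct value of the facet field along with a document count.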

We have many machine learning algorithms and tools that let us dissect and layer structure on top of our data to help us make sense of it. Algorithms help us to cluster, classify, and figure out which documents/data are similar to other ones.
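
As a small illustration of the "which documents are similar" idea, here is a sketch using bag-of-words vectors and cosine similarity. This is just one simple technique chosen for the example, not a claim about what any particular tool does internally, and the sample documents are made up.

```python
import math
from collections import Counter


def vectorize(text):
    """Turn text into a bag-of-words vector (word -> count)."""
    return Counter(text.lower().split())


def cosine_similarity(a, b):
    """Cosine of the angle between two word-count vectors (0.0 to 1.0)."""
    dot = sum(count * b[word] for word, count in a.items() if word in b)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def most_similar(query, documents):
    """Return the document whose word counts are closest to the query."""
    query_vec = vectorize(query)
    return max(documents, key=lambda d: cosine_similarity(query_vec, vectorize(d)))


docs = [
    "hadoop moves the algorithm to the data",
    "elasticsearch provides search and aggregation",
    "raspberry pi cluster running hadoop",
]
best = most_similar("search and aggregation with elasticsearch", docs)
print(best)
```

Real systems usually add term weighting and better tokenization, but the basic shape is the same: turn documents into vectors, then compare the vectors.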

We can process volumes of data in ways that we couldn't before. Traditional computing requires the data to be moved to the application or algorithm, with the answer then written back to storage. Now we have platforms like Hadoop that distribute large data sets across machines and move the algorithm to the data, allowing it to be processed and the answers written in place or distributed elsewhere.
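
Here is a toy, single-process sketch of that map/reduce idea. Each inner list stands in for the data held on one node (the partitions and sentences are made up). The map step runs against each partition where it "lives", and only the small partial counts are combined in the reduce step. This is not Hadoop's API, just the pattern.

```python
from collections import Counter
from functools import reduce


def map_partition(lines):
    """Map step: count words locally, within a single partition."""
    counts = Counter()
    for line in lines:
        counts.update(line.lower().split())
    return counts


def reduce_counts(left, right):
    """Reduce step: merge two partial word counts into one."""
    return left + right


# Each inner list stands in for the data stored on one node.
partitions = [
    ["big data is big", "data moves slowly"],
    ["move the algorithm to the data"],
]

partials = [map_partition(p) for p in partitions]
totals = reduce(reduce_counts, partials, Counter())
print(totals.most_common(3))
```

The point is that the raw lines never leave their partition. Only the much smaller per-partition summaries travel, which is what makes the approach scale.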

Does Big Data require big/large data to be useful? You can run most of these tools on your laptop, so no, not really. You can even run many tools like Map/Reduce on a $35 Raspberry Pi. With Big Data tools, we can do things on our laptops that required data centers in the past.

Big Data requires experimentation. It requires people to ask new questions and develop new answers. What can we do now that we couldn't do before? It requires a different mindset.

  - Craig

Elasticsearch 1.2.2 released

Spreading the word about the latest ElasticSearch release. The latest release is based on Lucene 4.8.1 and includes some nice bug fixes.

There are fixes for possible translog corruption and some file locking issues with previous Lucene releases. It also includes a fix for caching issues with percolation of nested documents.

Great work by the ElasticSearch team. Be sure and get the new release! http://www.elasticsearch.org/downloads/1-2-2/

  - Craig


Beta launch of ExecuteSales.com

https://beta.executesales.com/ is now open and available for Beta. ExecuteSales is a platform connecting companies and sales service providers, enabling both to generate new business and achieve higher sales revenue.

I've been working on the search core of ExecuteSales for the last 18 months along with some other fantastic team members and owners, and I am very excited to get to beta! Beta is totally free, so check it out!

  - Craig

Tuesday, April 8, 2014

Blue Raspberry Pi Cluster - 10 Nodes Running ElasticSearch, Hadoop, and Tomcat!

This is my latest project. It's a 10-node Raspberry Pi Cluster! This was built as part of the Utah Big Mountain Data Conference. It is a competition prize and will be given away as a promotional item from NosqlRevolution LLC, a NoSQL/ElasticSearch/Hadoop/Machine Learning consulting company that I founded.

The idea behind the box is to be able to show and demonstrate many of the concepts that are being talked about during the conference. It also gives an idea of how an individual may be able to work and study these concepts in a very small form factor.

From the Application node, the USB and HDMI ports are extended to the outside of the box. A network port from the 16-port switch is also extended. You can plug in a keyboard, mouse, video, and network and then use the box much as you would a Linux PC.

The cluster is currently pulling Big Data related tweets directly into ElasticSearch via the Twitter river plugin. Periodically, some of the tweets are pulled into the Hadoop cluster for some basic processing, then written back to ElasticSearch for display. A Java REST application with an AngularJS front end provides a search interface for the tweets and displays the results and basic trending provided by Hadoop.

There you go. A single box that lets us run all of this software and demonstrate an end-to-end "Big Data" system.

Stats:

  • 10 Raspberry Pi Model B with 512MB RAM and a 32GB SD card each.
  • Total cluster memory - 5GB
  • Total cluster storage - 320GB
  • 5 Hadoop nodes - 1 name node and 4 data nodes
  • 4 ElasticSearch nodes
  • 1 Application node running Tomcat
  • 1 16-port network switch
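
The totals above fall straight out of the per-node numbers. A quick sketch of the arithmetic, using only the figures from the list (the variable names are mine):

```python
NODES = 10
RAM_MB_PER_NODE = 512
STORAGE_GB_PER_NODE = 32

# Node roles as listed in the stats above.
roles = {"hadoop": 5, "elasticsearch": 4, "application": 1}

total_ram_gb = NODES * RAM_MB_PER_NODE / 1024
total_storage_gb = NODES * STORAGE_GB_PER_NODE

print(f"nodes accounted for: {sum(roles.values())} of {NODES}")
print(f"total RAM: {total_ram_gb:g} GB")
print(f"total storage: {total_storage_gb} GB")
```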


Please join us on April 12, 2014 in Salt Lake City, UT. See www.uhug.org and utahbigmountaindata.com and utahcodecamp.com for more information.

  - Craig

Wednesday, January 15, 2014

OSv: The Open Source Cloud Operating System That is Not Linux

I just ran across this today and found it very interesting: "OSv: The Open Source Cloud Operating System That is Not Linux" on linux.com. It looks like they are building a stripped-down OS that is designed to run JVM applications. Since the JVM runs Java, Ruby, Scala, JavaScript and others, it gives a lot of options to run your applications. Of course, not everything will run on the JVM, so they indicate they'll be working with other projects to port them to the JVM.

Part of what makes this interesting is that OSv is a single-user, single-application OS layer that runs on either KVM or Xen. To quote one of the authors:
Avi: The traditional OS supports multiple users with multiple applications running on top. OSV doesn't do any of that. It trusts the hypervisor to provide the multi-tenancy and it just runs a single one. We do a smaller job and that allows us to do it well and allows the administration of that system to be easier as well because there's simply less to administer.

From the web site:
OSv reduces the memory and cpu overhead imposed by traditional OS. Scheduling is lightweight, the application and the kernel cooperate, memory pools are shared. It provides unparalleled short latencies and constant predictable performance, translated directly to capex saving by reduction of the number of OS instances/sizes.

I'm excited to check this out since you can run it easily in an existing Linux environment. I'd really like to see how ElasticSearch performs on OSv. You can find out more information on their site, osv.io.

  - Craig