Jupyter + Spark cluster + HDFS

With Spark (How to install Spark?) and HDFS (How to install HDFS?) in place, we are ready to form a cluster. I hope you are using a virtual machine if you are following this series. It matters because you don’t have to go through every step again — wait, no dashes — because you can simply clone the VM instead of repeating every step!

So make a full clone of the virtual machine that has Spark and HDFS installed. Boot it and run ifconfig to get its IP, e.g. 192.168.11.138. Let us call this copied VM the slave, and the original VM the master, with IP 192.168.11.136.

HDFS cluster

  1. In the master, reformat the namenode, giving it a cluster name (whatever you want to call it):

    $ hdfs namenode -format <cluster name>

  2. In the slave machine, edit core-site.xml and replace localhost with the master's real IP address, i.e. 192.168.11.136 (you can always use ifconfig on the master to find it out).
  3. In the master, edit etc/hadoop/workers and add one line for the additional worker, i.e. 192.168.11.138. If you are on hadoop-2.7, the file is etc/hadoop/slaves instead. (No idea why they changed the filename; personally, I think slaves says it better.)
  4. Start the services

    $ /opt/hadoop/sbin/start-dfs.sh

  5. Check by running 'jps' on 192.168.11.138 to see whether the DataNode service has started. Also, from the slave machine you can put a README.md file into HDFS and list it from the master, and try the Spark script using the HDFS URL hdfs://192.168.11.136:9000/README.md (see the sketch below).
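As a quick sanity check, something like the following should work (a sketch only; the file name is just an example, and it assumes hdfs is on the PATH as set up in How to install HDFS?):

# on the slave (192.168.11.138): confirm the DataNode is up, then upload a file
$ jps
$ echo "hello cluster" > README.md
$ hdfs dfs -put README.md /

# on the master (192.168.11.136): the same file should be visible cluster-wide
$ hdfs dfs -ls /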

That’s it! Goddamn it! These Hadoop guys really make the basic deployment easy!

Spark Cluster

Spark uses a similar architecture to HDFS, i.e. master-slave mode.


$ $SPARK_HOME/sbin/start-master.sh

Launch Spark monitoring: http://localhost:8080/

Copy $SPARK_HOME/conf/slaves.template to $SPARK_HOME/conf/slaves and add 192.168.11.138 at the end of the file.

$ $SPARK_HOME/sbin/start-slaves.sh

From the Spark monitoring UI, I can see the local worker (on the master machine) is alive, but the worker on the slave machine (192.168.11.138) never shows up. Not that easy, huh!

[Troubleshooting]

I log in to the slave and run jps. Surprisingly enough, it shows that the worker is running. I check the log: nothing!!!

I looked at the parameters of the worker process with 'ps -aux'. There is a 'host' option, which is set to a hostname. Since I copied the virtual machine, my slave, i.e. 192.168.11.138, has the same hostname as my master. That's why!!!!

But even after I run start-slave.sh on my slave, trying to connect to my master, it still fails. Meanwhile, the worker on the master always shows up in the Spark monitoring UI.

$ $SPARK_HOME/sbin/start-slave.sh spark://192.168.11.136:7077

Why? Because my Spark master is listening on localhost!!! I need to start the master on its IP instead of the hostname (in fact, it should be the FQDN). So I went back, shut down the master, and restarted it with the following command:

$ SPARK_MASTER_HOST=192.168.11.136  $SPARK_HOME/sbin/start-master.sh

Now I run start-slave.sh on both machines, and two workers show up. For start-slaves.sh, if I want the master to also be a worker, I need to list the master's real IP in conf/slaves instead of localhost.


There is a script, conf/spark-env.sh, which makes all these problems go away. The idea is that you configure all these environment variables in that script and place it on every node you want to turn into a worker. Easy, eh!
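A minimal sketch of conf/spark-env.sh for this setup, assuming the master's IP is 192.168.11.136 (copy conf/spark-env.sh.template to conf/spark-env.sh and put it on every node):

# conf/spark-env.sh -- sketch only; adjust the addresses to your own machines
export SPARK_MASTER_HOST=192.168.11.136     # bind the master to its real IP, not localhost
# export SPARK_LOCAL_IP=192.168.11.138      # optionally pin each node to its own IP

With that in place, sbin/start-master.sh and sbin/start-slaves.sh should pick up the right addresses without extra arguments.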


Jupyter

Now, let us connect Jupyter to the Spark master so we can parallelize our data processing while keeping the power of Jupyter. How?

$ pyspark --master spark://192.168.11.136:7077

That is it?! Yes, that’s it!
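If the PYSPARK_DRIVER_PYTHON variables described in the Spark in Jupyter section below are already in your ~/.bashrc, that same command opens a notebook server whose SparkContext points at the cluster (a sketch; the IP is my master's):

$ pyspark --master spark://192.168.11.136:7077
# in a notebook cell, sc.master should print 'spark://192.168.11.136:7077',
# and the application should appear under Running Applications at http://192.168.11.136:8080/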


https://spark.apache.org/docs/1.1.0/spark-standalone.html

Spark in Jupyter

Jupyter is the Swiss Army knife for data scientists. I have been addicted to it since I discovered the tool. Given the limitations of Python, especially when dealing with high-volume data, we naturally wonder how to use Spark, another fantastic tool, for parallel data processing. This tutorial intends to bind them together into a powerful cannon.

If you don’t have Jupyter yet, install it.

$ sudo apt-get install python-pip python3-pip
$ sudo pip install jupyter
$ sudo pip3 install jupyter
$ jupyter notebook

Although you installed Jupyter for both Python 2 and 3, if you check the 'New' menu you may see only one kernel, either 'Python 2' or 'Python 3', but not both. To enable both kernels, you can simply run the following:

$ python3 -m ipykernel install --user
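To confirm that both kernels are registered, you can list them (a quick check; the exact names may differ):

$ jupyter kernelspec list
# expect to see both a python2 and a python3 entry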

Create a file for testing

$ echo "abcdabcef" >> README.md

Now, run ‘$ jupyter notebook’ anywhere you want. Once the page is up, create a new notebook using python 3 (or python 2 if you want). Try this example.

from pyspark import SparkContext
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Of course you'll get an "ImportError: No module named 'pyspark'", because Jupyter has not been integrated with Spark yet.

Or if you run ‘$ pyspark’, it should give you the Spark welcome doodle.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__/ .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Python version 2.7.12 (default, Jul 1 2016 15:12:24)
SparkSession available as 'spark'.
>>>

Ok. Here comes the magic. You copy and paste the following lines to ~/.bashrc.

export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.port=8880"
export PYSPARK_PYTHON=/usr/bin/python3.5
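Reload the shell configuration so the new variables take effect in the current terminal:

$ source ~/.bashrc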

Run '$ pyspark' again, and you will see a different interface like the one below:

[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using encryption. This is not recommended.
[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using authentication. This is highly insecure and not recommended.
[I 09:24:01.839 NotebookApp] Serving notebooks from local directory: /home/dvlabs
[I 09:24:01.839 NotebookApp] 0 active kernels
[I 09:24:01.839 NotebookApp] The Jupyter Notebook is running at: http://[all ip addresses on your system]:8880/
[I 09:24:01.840 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

Now, open the browser, point it at 'http://localhost:8880', create a notebook with Python 3, and rerun our example. And?! It works. Just like that.

from pyspark import SparkContext
# 'sc' is the SparkContext that the pyspark-launched notebook already provides
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

The reason I keep mentioning 'Python 3' is syntax. If you are more comfortable with Python 2, go for it! Just modify the code accordingly.

How to install HDFS?

I have written about HDFS before, with more background knowledge about it. In this post, I mainly show the steps to install and configure HDFS on Ubuntu 16.04. This tutorial assumes you have oracle-java8-installer installed; if not, please refer to the first step in How to install Spark?

  1. Download the latest Hadoop tarball from Apache Hadoop. I chose the 3.0.0-alpha1 binary.
  2. unzip the tar to a directory and create a soft link

    $ sudo tar zxvf  hadoop-3.0.0-alpha1.tar.gz -C /opt
    $ sudo ln -s /opt/hadoop-3.0.0-alpha1 /opt/hadoop

  3. Edit your ~/.bashrc file and add the following lines:

    export HADOOP_HOME=/opt/hadoop
    export PATH=$HADOOP_HOME/bin:$PATH

  4. Edit /opt/hadoop/etc/hadoop/hadoop-env.sh and set:

    export JAVA_HOME=/usr/lib/jvm/java-8-oracle

  5. Now you can check whether your Hadoop installation is picked up:

    $ hadoop

II. Configure the pseudo-distributed mode

The core-site.xml configuration

<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>

the hdfs-site.xml configuration

<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/tmp/dfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/tmp/dfs/datanode</value>
</property>
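The /tmp paths above are fine for a quick test but are wiped on reboot (see the notice in the next section). A sketch of a more permanent setup, with illustrative paths that you would then reference in hdfs-site.xml instead:

# example only: create permanent directories, hand them to the user running HDFS,
# then point dfs.namenode.name.dir / dfs.datanode.data.dir at them
$ sudo mkdir -p /var/lib/hadoop/dfs/namenode /var/lib/hadoop/dfs/datanode
$ sudo chown -R $USER:$USER /var/lib/hadoop/dfs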

III. Start the NameNode and DataNode services

  1. Initialize the HDFS database. (NOTICE!!!! The database will be gone after you restart this machine, and the namenode service will fail the next time you run start-dfs.sh, because this command creates the database under /tmp, as specified in hdfs-site.xml. To avoid this issue, simply reconfigure the two properties dfs.datanode.data.dir and dfs.namenode.name.dir to some permanent path in hdfs-site.xml; a sketch is given above.)

    $ hdfs namenode -format

  2. Now the HDFS "database" is ready, but the HDFS service isn't running. If you run '$ hdfs dfs -ls', you will get a ConnectionRefused error. To start the HDFS service (the API for operating this database), run the following script:

    $ /opt/hadoop/sbin/start-dfs.sh

  3. But you may get a port 22 connection refused error. Create a passphraseless SSH key and add it to the authorized list. The way to check is to try 'ssh localhost' and see whether it prompts for a passphrase.
      $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
      $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
      $ chmod 0600 ~/.ssh/authorized_keys

    [Troubleshooting] Oddly enough, it complains 'permission denied'. Searching around, I found the following command to see what is going on during the SSH connection, i.e. debug mode:

    $ ssh -v localhost

    What I can see is the following. You are smart enough to figure out what is going wrong here (notice which keys the client offers: the DSA key we just generated is never tried, so an RSA key is needed instead; see the sketch after this list).

    debug1: kex_input_ext_info: server-sig-algs=<rsa-sha2-256,rsa-sha2-512>
    debug1: SSH2_MSG_SERVICE_ACCEPT received
    debug1: Authentications that can continue: publickey,password
    debug1: Next authentication method: publickey
    debug1: Trying private key: /home/xxx/.ssh/id_rsa
    debug1: Trying private key: /home/xxx/.ssh/id_ecdsa
    debug1: Trying private key: /home/xxx/.ssh/id_ed25519
    debug1: Next authentication method: password

  4. Rerun start-dfs.sh and you will see the namenode and datanode start. You can always use the jps command to check whether they are running. Also, you can use netstat to check whether HDFS is listening on port 9000:

    netstat -tulp
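About the SSH 'permission denied' in step 3: the debug output shows that the client never offers the DSA key, because newer OpenSSH releases disable DSA keys by default. A passphraseless RSA key set up the same way works:

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys
$ ssh localhost    # should now log in without asking for a password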

IV. Sanity check: you should be able to put files into the database and also list them

$ hdfs dfs -put /opt/hadoop/README.txt /
$ hdfs dfs -ls /


If you have Spark installed, you can also test HDFS with Spark, which is extremely straightforward: simply use the HDFS path instead of a local path. Use '$ hdfs dfs -cat /output/part-00000' to see the results. It shows that HDFS is just another way of storing files, and Spark is compatible with it.

>>> myfile = sc.textFile('hdfs://localhost:9000/README.txt')
>>> counts = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1 + v2)
>>> counts.saveAsTextFile("hdfs://localhost:9000/output")

To stop HDFS... (really, I am not going to tell you anything; look up the manual for handy commands 🙂)

$ /opt/hadoop/sbin/stop-dfs.sh

V. Last Word as a Gift

If anything goes wrong, check /opt/hadoop/logs/*.log; you may easily solve whatever problem you unfortunately run into.
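For instance, to skim the tail of every daemon log at once:

$ tail -n 50 /opt/hadoop/logs/*.log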


https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html

How to install Spark?

It is fairly easy to install Spark. This tutorial is mostly adapted from this one.

  1. Install Oracle Java 8 if you haven’t

    $ sudo add-apt-repository ppa:webupd8team/java
    $ sudo apt-get update
    $ sudo apt-get install oracle-java8-installer

  2. Download the latest binary Spark distribution from the Spark website.
  3. unzip the tar file

    $ cd Downloads; tar zxvf spark-2.0.1-bin-hadoop2.7.tgz

  4. Copy it under /opt (personal choice):

    $ sudo cp -r ~/Downloads/spark-2.0.1-bin-hadoop2.7 /opt/spark-2.0.1

  5. Create a soft link for easy maintenance, e.g. upgrade spark

    $ sudo ln -s /opt/spark-2.0.1 /opt/spark

  6. Set environment in your .bashrc by adding two lines as below

    export SPARK_HOME=/opt/spark
    export PATH=$SPARK_HOME/bin:$PATH

  7. test your Spark

    $ pyspark
    Python 2.7.12 (default, Jul 1 2016, 15:12:24)
    [GCC 5.4.0 20160609] on linux2
    Type "help", "copyright", "credits" or "license" for more information.
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel).
    16/10/04 13:32:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    16/10/04 13:32:43 WARN Utils: Your hostname, [yourhostname] resolves to a loopback address: 127.0.0.1; using [some ip] instead (on interface ens33)
    16/10/04 13:32:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/ '_/
       /__/ .__/\_,_/_/ /_/\_\   version 2.0.1
          /_/

    Using Python version 2.7.12 (default, Jul 1 2016 15:12:24)
    SparkSession available as 'spark'.
    >>>

[Troubleshooting] If you run into the error java.net.UnknownHostException, edit the /etc/hosts file to add the following line:

127.0.0.1    [your hostname]

If you don't know what your hostname is, run the following command to find it out:

$ hostname

Run your first spark script in python

Using Python version 2.7.12 (default, Jul 1 2016 15:12:24)
SparkSession available as 'spark'.
>>> myfile = sc.textFile('/tmp/test.txt')
>>> counts = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1 + v2)
>>> counts.saveAsTextFile("/tmp/output")

If there is no error, you should find an output folder created under /tmp, containing several files with the name pattern 'part-xxxxx'.
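One way to inspect the result (the path matches the saveAsTextFile call above):

$ ls /tmp/output
$ cat /tmp/output/part-*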

Since Spark can also work with HDFS, if you have HDFS ready to use, here is another test you can run:

>>> myfile = sc.textFile("hdfs://namenode_host:8020/path/to/input")
>>> counts = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1 + v2)
>>> counts.saveAsTextFile("hdfs://namenode_host:8020/path/to/output")

The word count example is copied and modified from the article. You can also refer to my “copied” tutorial How to set up HDFS.

You can run the 'jps' command to see that a 'SparkSubmit' job is also running.

HDFS

Notes about the HDFS daemons

$ bin/hdfs namenode -format

HDFS isn't a filesystem that runs directly on a hard drive like ext3. It stores its data on a regular file system such as ext3 and provides an API to access that data; it is more like a database. This command initializes that database.

By default the namenode location: /tmp/hadoop-/dfs/name

To change the namenode location, add the following properties to hdfs-site.xml:

<property>
   <name>dfs.namenode.name.dir</name>
   <value>file:/dfs/namenode</value>
</property>

 <property>
   <name>dfs.datanode.data.dir</name>
   <value>file:/dfs/datanode</value>
</property>

Make sure you have the right permissions to access the paths specified.
Make sure you got dfs.namenode.name.dir and dfs.datanode.data.dir right (when copy-pasting the property, it is easy to leave a 'name' that should have been changed to 'data') 😦
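For example, assuming the paths from the snippet above, you could pre-create the directories and give your user access to them like this:

$ sudo mkdir -p /dfs/namenode /dfs/datanode
$ sudo chown -R $USER:$USER /dfs
$ ls -ld /dfs/namenode /dfs/datanode    # verify ownership and permissions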

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. There are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files.

The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes.

A file is split into one or more blocks and these blocks are stored in a set of DataNodes.

The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

——

; Start NameNode daemon and DataNode daemon
$ sbin/start-dfs.sh

; create folder /usr and /usr/abc in the hdfs
$ bin/hdfs dfs -mkdir /usr
$ bin/hdfs dfs -mkdir /usr/abc

; put a local file to hdfs location
$ bin/hdfs dfs -put <local file> /usr/abc

—-

HDFS daemons are NameNode, SecondaryNameNode, and DataNode.

– etc/hadoop/core-site.xml
fs.defaultFS: the NameNode URI, e.g. hdfs://host:port/

– etc/hadoop/hdfs-site.xml
dfs.namenode.name.dir: Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently. If this is a comma-delimited list of directories, then the name table is replicated in all of the directories, for redundancy.

dfs.datanode.data.dir: Comma-separated list of paths on the local filesystem of a DataNode where it should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices.

—- TROUBLESHOOTING
Connection Refused
– jps # check whether the Hadoop daemons are running: namenode, datanode
– /etc/hosts -> remove the line '127.0.1.1 ubuntu'
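A hypothetical quick check for that case:

$ jps                          # NameNode and DataNode should both be listed
$ grep 127.0.1.1 /etc/hosts    # if the line is there, comment it out or delete it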

http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/
http://stackoverflow.com/questions/27143409/what-the-command-hadoop-namenode-format-will-do
https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html
https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/ClusterSetup.html

Hadoop

  1. a clean vm: Ubuntu 16.04 LTS
  2. follow single node cluster tutorial: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html
    • download hadoop-2.7.3-src
    • follow BUILDING.txt
      • install all packages listed
        $ sudo apt-get purge openjdk*
        $ sudo apt-get install software-properties-common
        $ sudo add-apt-repository ppa:webupd8team/java
        $ sudo apt-get update
        $ sudo apt-get install oracle-java7-installer
        $ sudo apt-get -y install maven
        $ sudo apt-get -y install build-essential autoconf automake libtool cmake zlib1g-dev pkg-config libssl-dev
        $ sudo apt-get -y install libprotobuf-dev protobuf-compiler
        $ sudo apt-get install snappy libsnappy-dev
        $ sudo apt-get install bzip2 libbz2-dev
        $ sudo apt-get install libjansson-dev
        $ sudo apt-get install fuse libfuse-dev
    • building hadoop binary
      $ mvn package -Pdist -DskipTests -Dtar

      • ERROR: protoc version is 'libprotoc 2.6.1', expected version is '2.5.0'
      • FIX: http://codetips.coloza.com/compile-hadoop-from-source/
      • ERROR: libprotoc.so.8: cannot open shared object file: No such file or directory
      • FIX: $ sudo ldconfig /usr/local/lib    (Note: libprotoc.so.8 should be in /usr/local/lib)
    • install: $ mvn install
    • edit hadoop-dist/target/etc/hadoop/hadoop-env.sh
    • set export JAVA_HOME=/usr/lib/jvm/java-7-oracle
    • standalone mode test passed
    • pseudo-distributed mode:
      • $ bin/hdfs dfs -mkdir -p /user/<username>/input
        $ bin/hdfs dfs -put etc/hadoop/*.xml /user/<username>/input
        $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
        (see below for inspecting the output)
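To inspect the result of the grep example, the usual follow-up from the Apache tutorial should work (the relative output path resolves to /user/<username>/output):

$ bin/hdfs dfs -cat output/*
# or copy it back to the local filesystem first and read it there
$ bin/hdfs dfs -get output output
$ cat output/*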