Jupyter + Spark cluster + HDFS

With Spark (How to install Spark?) and HDFS (How to install HDFS?) in place, we are ready to form a cluster. I hope you are using a virtual machine if you are following this series. It matters because you don't have to go through every step again: you can simply clone a VM!

So make a complete clone of the virtual machine with Spark and HDFS installed. Boot it and run ifconfig to get its IP, e.g. 192.168.11.138. Let us call this copied VM the slave, and the original VM the master, with IP 192.168.11.136.

HDFS cluster

  1. On the master, reformat the namenode, giving it a cluster name, whatever you want to call it:

    $ hdfs namenode -format <cluster name>

  2. On the slave machine, edit your core-site.xml to replace localhost with the master's real IP address, e.g. 192.168.11.136 (you can always use ifconfig to find it out).
  3. On the master, edit etc/hadoop/workers and add one line for the additional worker, i.e. 192.168.11.138. If you are on Hadoop 2.7, the file is etc/hadoop/slaves instead. (No idea why they changed the filename. Personally, I think slaves is the better name, as it says what it is.)
  4. Start the services

    $ /opt/hadoop/sbin/start-dfs.sh

  5. Check by running jps on 192.168.11.138 to see whether the DataNode service has started. Also, from your slave machine, you can put the README.md file into HDFS and list it from the master. Finally, try the Spark script using the HDFS URL hdfs://192.168.11.136:9000/README.md, as in the sketch after this list.
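
Here is a minimal sketch of that last check, reusing the word-count example from the Spark in Jupyter post. It assumes README.md has already been uploaded to the HDFS root (e.g. with hdfs dfs -put README.md /) and that you run it inside pyspark, where sc is already defined.

# The path assumes README.md sits in the HDFS root served by the master (192.168.11.136).
logFile = "hdfs://192.168.11.136:9000/README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))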

That’s it! Goddamn it! These Hadoop guys really make the basic deployment easy!

Spark Cluster

Spark has a similar architecture to HDFS, i.e. master-slave mode. Start the master first:


$ $SPARK_HOME/sbin/start-master.sh

Then open the Spark monitoring UI at http://localhost:8080/.

Copy $SPARK_HOME/conf/slaves.template to $SPARK_HOME/conf/slaves and add 192.168.11.138 at the end of the file. Then start the workers:

$ $SPARK_HOME/sbin/start-slaves.sh

From the Spark monitor, I can see the local worker (on the master machine) is alive, but the worker on the slave machine (192.168.11.138) never shows up. Not that easy, hah!

[Troubleshooting]

I log in to the slave and run jps. Surprisingly enough, it shows that the worker is running. I check the log: nothing!!!

I then looked at the parameters of the worker process with 'ps aux'. There is a 'host' option, which is set to a hostname. Since I copied the virtual machine, my slave, i.e. 192.168.11.138, has the same hostname as my master. That's why!!!!

But even after I ran start-slave.sh on my slave, trying to connect to my master, it still failed. However, the worker on the master always shows up in the Spark monitoring.

$ $SPARK_HOME/sbin/start-slave.sh spark://192.168.11.136:7077

Why? Because my Spark master is running on localhost!!! So I need to start my master on its IP instead of the hostname (in fact, it should be the FQDN). So I went back, shut down the master, and restarted it with the following command:

$ SPARK_MASTER_HOST=192.168.11.136  $SPARK_HOME/sbin/start-master.sh

Now I run start-slave.sh on both machines, and two workers show up. For start-slaves.sh, I need to put the real IP of my master in the slaves file instead of localhost if I want the master to be a worker as well.


There is a script, conf/spark-env.sh, that makes all these problems go away. The idea is that you configure all these environment variables (e.g. SPARK_MASTER_HOST) in that script on every node you want to turn into a worker. Easy, ah!


Jupyter

Now, let us connect Jupyter to the Spark master so we can parallelize our data processing but still have the power of Jupyter. How?

$ pyspark --master spark://192.168.11.136:7077

That is it?! Yes, that’s it!
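
If you want to double-check that the notebook is really attached to the cluster, here is a minimal sketch to run in a cell; sc is the SparkContext that pyspark creates for you.

print(sc.master)   # expect: spark://192.168.11.136:7077, not local[*]
# a tiny job that actually runs on the cluster workers
print(sc.parallelize(range(1000)).sum())   # expect: 499500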


https://spark.apache.org/docs/1.1.0/spark-standalone.html


Spark in Jupyter

Jupyter is the Swiss Army knife for data scientists. I have been addicted to it since I discovered the tool. Given the limitations of Python, especially when dealing with high-volume data, we may naturally wonder how to use Spark, another fantastic tool, but one built for parallel data processing. This tutorial intends to bind them together into a powerful cannon.

If you don’t have Jupyter yet, install it.

$ sudo apt-get install python-pip python3-pip
$ sudo pip install jupyter
$ sudo pip3 install jupyter
$ jupyter notebook

Although you installed Jupyter for both Python 2 and 3, if you check the 'New' menu you may see only one kernel, either 'Python 2' or 'Python 3', but not both. To enable both kernels, you can simply run the following:

$ python3 -m ipykernel install --user

Create a file for testing

$ echo "abcdabcef" >> README.md

Now, run '$ jupyter notebook' anywhere you want. Once the page is up, create a new notebook using Python 3 (or Python 2 if you want). Try this example:

from pyspark import SparkContext
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Of course, you'll get an "ImportError: No module named 'pyspark'" because Jupyter has not been integrated with Spark yet.

Or if you run ‘$ pyspark’, it should give you the Spark welcome doodle.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Python version 2.7.12 (default, Jul 1 2016 15:12:24)
SparkSession available as 'spark'.
>>>

OK. Here comes the magic. Copy and paste the following lines into ~/.bashrc.

export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.port=8880"
export PYSPARK_PYTHON=/usr/bin/python3.5

Run '$ pyspark' again (in a fresh shell, so the new variables are picked up), and you will see a different interface like the one below:

[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using encryption. This is not recommended.
[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using authentication. This is highly insecure and not recommended.
[I 09:24:01.839 NotebookApp] Serving notebooks from local directory: /home/dvlabs
[I 09:24:01.839 NotebookApp] 0 active kernels
[I 09:24:01.839 NotebookApp] The Jupyter Notebook is running at: http://[all ip addresses on your system]:8880/
[I 09:24:01.840 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

Now, open the browser, go to http://localhost:8880, create a notebook with Python 3, and rerun our example. And?! It works. Just like that.

from pyspark import SparkContext
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

The reason I keep mentioning Python 3 is purely the syntax. If you are more comfortable with Python 2, go for it! Just modify the code accordingly.

Yield in Python

What is the use of the yield keyword in Python? What does it do?

Copied line by line from: http://stackoverflow.com/questions/231767/the-python-yield-keyword-explained

To understand what yield does, you must understand what generators are. And before generators come iterables.

Iterables

When you create a list, you can read its items one by one, and it’s called iteration:

>>> mylist = [1, 2, 3]
>>> for i in mylist:
...    print(i)
1
2
3

Mylist is an iterable. When you use a list comprehension, you create a list, and so an iterable:

>>> mylist = [x*x for x in range(3)]
>>> for i in mylist:
...    print(i)
0
1
4

Everything you can use “for… in…” on is an iterable: lists, strings, files… These iterables are handy because you can read them as much as you wish, but you store all the values in memory and it’s not always what you want when you have a lot of values.

Generators

Generators are iterators, but you can only iterate over them once. It’s because they do not store all the values in memory, they generate the values on the fly:
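
>>> mygenerator = (x*x for x in range(3))
>>> for i in mygenerator:
...    print(i)
0
1
4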

It is just the same except you used () instead of []. BUT, you can not perform for i in mygenerator a second time since generators can only be used once: they calculate 0, then forget about it and calculate 1, and end calculating 4, one by one.

Yield

Yield is a keyword that is used like return, except the function will return a generator.

>>> def createGenerator():
...    mylist = range(3)
...    for i in mylist:
...        yield i*i
...
>>> mygenerator = createGenerator() # create a generator
>>> print(mygenerator) # mygenerator is an object!
<generator object createGenerator at 0xb7555c34>
>>> for i in mygenerator:
...     print(i)
0
1
4

Here it’s a useless example, but it’s handy when you know your function will return a huge set of values that you will only need to read once.
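
As a slightly more realistic sketch (the file name is just an illustration), a generator lets you walk through a huge file one line at a time without loading it all into memory:

def read_large_lines(path):
    # yields one stripped line at a time instead of building a giant list in memory
    with open(path) as f:
        for line in f:
            yield line.strip()

for line in read_large_lines("huge.log"):   # hypothetical file name
    pass  # process each line here, one at a time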

To master yield, you must understand that when you call the function, the code you have written in the function body does not run. The function only returns the generator object, this is a bit tricky 🙂

Then, your code will be run each time the for uses the generator.

Now the hard part:

The first time the for calls the generator object created from your function, it will run the code in your function from the beginning until it hits yield, then it’ll return the first value of the loop. Then, each other call will run the loop you have written in the function one more time, and return the next value, until there is no value to return.

The generator is considered empty once the function runs but does not hit yield anymore. It can be because the loop had come to an end, or because you do not satisfy a “if/else” anymore.
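
To see that laziness concretely, here is a small sketch with prints marking when the body actually runs:

>>> def create_generator():
...     print("body starts running")        # only printed on the first next()
...     yield 1
...     print("resumed after the first yield")
...     yield 2
...
>>> gen = create_generator()   # nothing printed yet: the body has not run
>>> next(gen)
body starts running
1
>>> next(gen)
resumed after the first yield
2
>>> next(gen)                  # the body ends without hitting yield: the generator is empty
Traceback (most recent call last):
  ...
StopIteration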

Your code explained

Generator:

# Here you create the method of the node object that will return the generator
# (shown on its own here; in the real code it is defined inside the node class)
def _get_child_candidates(self, distance, min_dist, max_dist):

    # Here is the code that will be called each time you use the generator object:

    # If there is still a child of the node object on its left
    # AND if distance is ok, return the next child
    if self._leftchild and distance - max_dist < self._median:
        yield self._leftchild

    # If there is still a child of the node object on its right
    # AND if distance is ok, return the next child
    if self._rightchild and distance + max_dist >= self._median:
        yield self._rightchild

    # If the function arrives here, the generator will be considered empty:
    # there are no more than two values, the left and the right children

Caller:

# Create an empty list and a list with the current object reference
result, candidates = list(), [self]

# Loop on candidates (they contain only one element at the beginning)
while candidates:

    # Get the last candidate and remove it from the list
    node = candidates.pop()

    # Get the distance between obj and the candidate
    distance = node._get_dist(obj)

    # If distance is ok, then you can fill the result
    if distance <= max_dist and distance >= min_dist:
        result.extend(node._values)

    # Add the children of the candidate in the candidates list
    # so the loop will keep running until it will have looked
    # at all the children of the children of the children, etc. of the candidate
    candidates.extend(node._get_child_candidates(distance, min_dist, max_dist))

return result

This code contains several smart parts:

  • The loop iterates on a list but the list expands while the loop is being iterated 🙂 It’s a concise way to go through all these nested data even if it’s a bit dangerous since you can end up with an infinite loop. In this case, candidates.extend(node._get_child_candidates(distance, min_dist, max_dist)) exhausts all the values of the generator, but while keeps creating new generator objects which will produce different values from the previous ones since it’s not applied on the same node.
  • The extend() method is a list object method that expects an iterable and adds its values to the list.

Usually we pass a list to it:

>>> a = [1, 2]
>>> b = [3, 4]
>>> a.extend(b)
>>> print(a)
[1, 2, 3, 4]

But in your code it gets a generator, which is good because:

  1. You don’t need to read the values twice.
  2. You can have a lot of children and you don’t want them all stored in memory.

And it works because Python does not care whether the argument of a method is a list or not. Python expects iterables, so it will work with strings, lists, tuples and generators! This is called duck typing and is one of the reasons why Python is so cool. But this is another story, for another question…
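
For instance, extend() happily consumes a generator expression:

>>> a = [1, 2]
>>> a.extend(x*x for x in range(3))   # any iterable works, not just a list
>>> print(a)
[1, 2, 0, 1, 4]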

You can stop here, or read a little bit more to see an advanced use of generators:

Controlling a generator exhaustion

>>> class Bank(): # let's create a bank, building ATMs
...    crisis = False
...    def create_atm(self):
...        while not self.crisis:
...            yield "$100"
>>> hsbc = Bank() # when everything's ok the ATM gives you as much as you want
>>> corner_street_atm = hsbc.create_atm()
>>> print(corner_street_atm.next())
$100
>>> print(corner_street_atm.next())
$100
>>> print([corner_street_atm.next() for cash in range(5)])
['$100', '$100', '$100', '$100', '$100']
>>> hsbc.crisis = True # crisis is coming, no more money!
>>> print(corner_street_atm.next())
<type 'exceptions.StopIteration'>
>>> wall_street_atm = hsbc.create_atm() # it's even true for new ATMs
>>> print(wall_street_atm.next())
<type 'exceptions.StopIteration'>
>>> hsbc.crisis = False # trouble is, even post-crisis the ATM remains empty
>>> print(corner_street_atm.next())
<type 'exceptions.StopIteration'>
>>> brand_new_atm = hsbc.create_atm() # build a new one to get back in business
>>> for cash in brand_new_atm:
...    print cash
$100

$100
$100
$100
$100
$100
$100
$100
$100
...

The itertools module contains special functions to manipulate iterables. Ever wish to duplicate a generator? Chain two generators? Group values in a nested list with a one liner? Map / Zip without creating another list?

Then just import itertools.

An example? Let’s see the possible orders of arrival for a 4 horse race:

>>> horses = [1, 2, 3, 4]
>>> races = itertools.permutations(horses)
>>> print(races)
<itertools.permutations object at 0xb754f1dc>
>>> print(list(itertools.permutations(horses)))
[(1, 2, 3, 4),
 (1, 2, 4, 3),
 (1, 3, 2, 4),
 (1, 3, 4, 2),
 (1, 4, 2, 3),
 (1, 4, 3, 2),
 (2, 1, 3, 4),
 (2, 1, 4, 3),
 (2, 3, 1, 4),
 (2, 3, 4, 1),
 (2, 4, 1, 3),
 (2, 4, 3, 1),
 (3, 1, 2, 4),
 (3, 1, 4, 2),
 (3, 2, 1, 4),
 (3, 2, 4, 1),
 (3, 4, 1, 2),
 (3, 4, 2, 1),
 (4, 1, 2, 3),
 (4, 1, 3, 2),
 (4, 2, 1, 3),
 (4, 2, 3, 1),
 (4, 3, 1, 2),
 (4, 3, 2, 1)]

Understanding the inner mechanisms of iteration

Iteration is a process implying iterables (implementing the __iter__() method) and iterators (implementing the __next__() method). Iterables are any objects you can get an iterator from. Iterators are objects that let you iterate on iterables.
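
As a quick sketch of those two methods in action:

>>> mylist = [1, 2]
>>> iterator = iter(mylist)    # calls mylist.__iter__()
>>> next(iterator)             # calls iterator.__next__()
1
>>> next(iterator)
2
>>> next(iterator)             # nothing left: StopIteration is raised
Traceback (most recent call last):
  ...
StopIteration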

More about it in this article about how the for loop works.

 

Use Cookies in Python

I was trying to download tons of files from a website, so I wrote a Python script to automate the procedure. However, the website requires a login, so I had to make my script log in automatically. This led me to cookies, which maintain the session and keep it alive after the login succeeds.

import urllib
import urllib2
from cookielib import CookieJar

# the CookieJar stores the session cookie returned by the login request
cookie = CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookie))
formdata = {"username": <myusername>, "password": <mypassword>}
data_encoded = urllib.urlencode(formdata)
response = opener.open("http://<mywebsite>/user/login.html", data_encoded)
content = response.read()
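
Once the login succeeds, the same opener carries the session cookie, so later downloads stay authenticated. A minimal sketch (the file URL here is hypothetical):

# the CookieJar attached to the opener keeps the session alive across requests
file_response = opener.open("http://<mywebsite>/files/data_001.zip")  # hypothetical URL
with open("data_001.zip", "wb") as f:
    f.write(file_response.read())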

Luckily, the website I am connecting to does not use any verification code, i.e. a CAPTCHA.