Pyspark Practice (2) – SPARK_LOCAL_DIRS

 

You may have run into the error that there is no space left on the disk for shuffle RDD data, although you seem to have more than enough disk space.

It happens because we usually allocate a fairly small space for the system directory /tmp, while Spark by default uses /tmp for shuffle RDD data, which can be quite large. (Some posts question whether Spark ever cleans up this temporary data, which would be a severe problem; I have not personally confirmed it.) Anyway, as you can guess by now, SPARK_LOCAL_DIRS is designed for exactly this purpose: it specifies the location for temporary data.

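You can configure this variable in conf/spark-env.sh. Note that Spark expects directories on the local filesystem here (a comma-separated list is allowed), not an HDFS URL, so point it at a local disk with enough room. The path below is only a placeholder:

SPARK_LOCAL_DIRS=/data/spark-tmp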

There is also spark.local.dir in conf/spark-defaults.conf for the same purpose; however, it will be overridden by SPARK_LOCAL_DIRS.
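
Before pointing SPARK_LOCAL_DIRS somewhere, it can help to see how much room each candidate directory really has. The sketch below uses only the Python standard library; the helper names free_gib and roomiest, and the candidate list, are assumptions made for illustration and not anything Spark provides.

```python
import shutil
import tempfile


def free_gib(path):
    """Return the free space, in GiB, of the filesystem that holds `path`."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (bytes)
    return usage.free / 2**30


def roomiest(candidates):
    """Pick the candidate directory whose filesystem has the most free space."""
    return max(candidates, key=free_gib)


if __name__ == "__main__":
    # Hypothetical candidates: the system temp dir and the current directory.
    candidates = [tempfile.gettempdir(), "."]
    for path in candidates:
        print("%s: %.1f GiB free" % (path, free_gib(path)))
    print("roomiest:", roomiest(candidates))
```

If /tmp turns out to be the small one, that is a hint the shuffle data should live elsewhere.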

Pyspark Practice (1) – PYTHONHASHSEED

Here is a small chunk of code for testing Spark RDD join function.


a = [(1, 'a'), (2, 'b')]
b = [(1, 'c'), (4, 'd')]
ardd = sc.parallelize(a)
brdd = sc.parallelize(b)

def merge(a, b):
    # After a full outer join, the side without a match is None
    if a is None:
        return b
    if b is None:
        return a
    return a + b

ardd.fullOuterJoin(brdd).map(lambda x: (x[0], merge(x[1][0], x[1][1]))).collect()
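
To see what the join should return without a cluster, here is a plain-Python sketch of the same logic. full_outer_join is a hypothetical stand-in written for this illustration (it is not a PySpark API) and it assumes each key appears at most once per side, as in the toy data. It also sorts the keys, whereas Spark makes no promise about the order of the collected result.

```python
def merge(a, b):
    """Combine two joined values, tolerating a missing side."""
    if a is None:
        return b
    if b is None:
        return a
    return a + b


def full_outer_join(left, right):
    """Mimic a full outer join on two lists of (key, value) pairs.

    Assumes keys are unique within each list; a missing side becomes None.
    """
    left_map, right_map = dict(left), dict(right)
    keys = sorted(set(left_map) | set(right_map))
    return [(k, (left_map.get(k), right_map.get(k))) for k in keys]


a = [(1, 'a'), (2, 'b')]
b = [(1, 'c'), (4, 'd')]
joined = full_outer_join(a, b)
merged = [(key, merge(lv, rv)) for key, (lv, rv) in joined]
print(merged)  # [(1, 'ac'), (2, 'b'), (4, 'd')]
```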

This code works fine. But when I applied it to my real data (reading from HDFS, joining, and writing the result back), I ran into the PYTHONHASHSEED problem again! YES, AGAIN. I never got the chance to fix this problem before.

This problem happens with Python 3.3+. The code responsible for the trouble is in python/pyspark/rdd.py, line 74.

    if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
        raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
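
The background to this check, as far as I can tell, is that Python 3.3+ randomizes string hashes per interpreter process, so two worker processes could disagree on which partition a string key belongs to. The sketch below (standard library only, not Spark code) starts fresh interpreters to show the effect; hash_in_child is a hypothetical helper name.

```python
import os
import subprocess
import sys


def hash_in_child(text, seed=None):
    """Return hash(text) as computed by a freshly started Python process."""
    env = dict(os.environ)
    env.pop("PYTHONHASHSEED", None)        # start from a clean slate
    if seed is not None:
        env["PYTHONHASHSEED"] = str(seed)  # fix the hash seed for the child
    out = subprocess.check_output(
        [sys.executable, "-c", "import sys; print(hash(sys.argv[1]))", text],
        env=env,
    )
    return int(out.decode().strip())


if __name__ == "__main__":
    # Without a seed, the two values will almost certainly differ.
    print("random seed:", hash_in_child("spark"), hash_in_child("spark"))
    # With PYTHONHASHSEED=0, every process agrees on the hash.
    print("seed 0:     ", hash_in_child("spark", 0), hash_in_child("spark", 0))
```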

After searching around and trying many different proposals, I got really frustrated. The community seems to know this issue well and the Spark GitHub repository seems to have fixed it (2015), yet my version (2016) still does not work.

A few options I found:

  1. put export PYTHONHASHSEED=0 in .bashrc
    • Failed. In a notebook, I could read os.environ['PYTHONHASHSEED'] and it was correctly set. This is the correct way for a standalone Python program, but not for a Spark cluster.
    • A possible reason is that pyspark has a different set of environment variables. It is not about propagating this variable across workers either, because even if all workers have this variable exported in .bashrc, it will still complain.
  2. SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
    • Doesn’t work. Some suggested passing this to pyspark when starting the notebook. Unfortunately, nothing fortunate happened, and I don’t think I am even using YARN.

Anyway, in the end, I found the solution from this link. Most of the pssh part can be ignored. The only line that matters is to place ‘export PYTHONHASHSEED=0’ into conf/spark-env.sh on each worker, which confirms that PYTHONHASHSEED=0 has to be placed into the Spark run-time environment somehow.

Thanks to this post, which saves my ass: http://comments.gmane.org/gmane.comp.lang.scala.spark.user/24459

module ‘urllib’ has no attribute ‘request’

Ran into an error: “module ‘urllib’ has no attribute ‘request’”.

The script ran well before I threw it into parallel mode by calling sc.parallelize(data, 8). The Spark log shows the above error. So far, I could not find any solution by googling. I have printed the Python version used, which is 3.5. No clue where it goes wrong.

Update.

After a bit of exploration, I finally found the solution: put an import urllib.request statement right before the call to urllib.request.urlopen(…). Is this caused by the fact that I am using Jupyter, where the import statement was in another cell?
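
A plausible explanation (not confirmed here) is that a plain import urllib does not load the urllib.request submodule, and that code shipped to Spark workers runs in fresh Python processes which never executed the earlier notebook cell. Importing inside the function that gets shipped avoids depending on that. In the sketch below, fetch is a hypothetical helper, and a data: URL stands in for a real address so that no network is needed.

```python
def fetch(url):
    """Download `url` and return the body as bytes."""
    # Import here so the submodule is loaded in whichever process runs this
    # function, e.g. a Spark worker when used as rdd.map(fetch).
    import urllib.request
    with urllib.request.urlopen(url) as response:
        return response.read()


if __name__ == "__main__":
    print(fetch("data:text/plain,hello"))
```

With a SparkContext at hand, the function would be used as sc.parallelize(urls, 8).map(fetch), where urls is whatever list of addresses the script works on.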

Jupyter + Spark cluster + HDFS

With Spark (see “How to install Spark?”) and HDFS (see “How to install HDFS?”) in place, we are moving on to form a cluster. Oh, I hope you are using virtual machines if you are following this series. It matters because by just cloning a VM you don’t have to go through every step again!

So make a complete clone of the virtual machine with Spark and HDFS installed. Boot it and run ifconfig to get the IP, e.g. 192.168.11.138. Let us call this cloned VM the slave, and the original VM the master, with IP 192.168.11.136.

HDFS cluster

  1. On the master, reformat the namenode, giving it a cluster name (whatever you want to call it)

    $ hdfs namenode -format <cluster name>

  2. On the slave machine, edit your core-site.xml to replace localhost with the master’s real IP address, e.g. 192.168.11.136 (you can always use ifconfig to find it out).
  3. On the master, edit etc/hadoop/workers and add one line for the additional worker, i.e. 192.168.11.138. If you are using hadoop-2.7, the file should be etc/hadoop/slaves. (No idea why they changed the filename. Personally, I think slaves is the better name, as it says what it is.)
  4. Start the services

    $ /opt/hadoop/sbin/start-dfs.sh

  5. Check by running ‘jps’ on 192.168.11.138 to see whether the DataNode service has been started. Also, on your slave machine, you can put the README.md file into HDFS and list it from the master. Also, try the Spark script using the hdfs url: hdfs://192.168.11.136:9000/README.md

That’s it! Goddamn it! These Hadoop guys really make the basic deployment easy!

Spark Cluster

Spark has a framework similar to that of HDFS, i.e. master-slave mode.


$ $SPARK_HOME/sbin/start-master.sh

Launch Spark monitoring: http://localhost:8080/

Copy $SPARK_HOME/conf/slaves.template to $SPARK_HOME/conf/slaves and add 192.168.11.138 at the end of the file.

$ $SPARK_HOME/sbin/start-slaves.sh

From the Spark monitor, I can see that the local worker (on the master machine) is alive, but the worker on the slave machine (192.168.11.138) never shows up. Not that easy, hah!

[Trouble shooting]

I log in to the slave and run jps. Surprisingly enough, it shows that the worker is running. I check the log: nothing!!!

I noticed the parameters of the worker process via ‘ps -aux’. There is an option ‘host’, which is specified as a hostname. Since I cloned the virtual machine, my slave, i.e. 192.168.11.138, has the same hostname as my master. That’s why!!!!

But even after I run start-slave.sh on my slave, trying to connect to my master, it fails too. However, the worker on the master always shows up in the Spark monitor.

$ $SPARK_HOME/sbin/start-slave.sh spark://192.168.11.136:7077

Why? Because my Spark master is running on localhost!!! So, I need to start my master on its IP instead of its hostname (in fact, it should be the FQDN). So I went back to shut down the master and restarted it with the following command:

$ SPARK_MASTER_HOST=192.168.11.136  $SPARK_HOME/sbin/start-master.sh

Now, I run start-slave.sh on both machines. Two workers show up. For start-slaves.sh, I need to put the real IP of my master instead of localhost if I want my master to be a worker as well.
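
A quick way to catch this kind of problem from the slave is to test whether the master’s port is reachable at all. The sketch below uses only the Python standard library; port_open is a hypothetical helper, and 7077 is the master port from the command above.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # A cloned VM may report the same hostname as the machine it came from.
    print("this machine's hostname:", socket.gethostname())
    # Run on the slave; False suggests the master is listening on localhost
    # only, or that something else blocks the connection.
    print("master reachable:", port_open("192.168.11.136", 7077))
```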


There is a script, conf/spark-env.sh, which is meant to make all these problems go away. The idea is that you configure all these environment variables in the script and use it on every node that you want to turn into a worker. Easy, ah!


Jupyter

Now, let us connect jupyter to Spark master so we can parallelize our data processing but still have the power of jupyter. How?

$ pyspark --master spark://192.168.11.136:7077

That is it?! Yes, that’s it!
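
The same connection can also be made from a regular Python script or an already running kernel. This is a sketch assuming pyspark is importable; master_url and connect are hypothetical helper names and the application name is made up, while SparkConf.setMaster, SparkConf.setAppName and SparkContext(conf=...) are standard PySpark calls.

```python
def master_url(host, port=7077):
    """Build a standalone-mode master URL such as spark://192.168.11.136:7077."""
    return "spark://%s:%d" % (host, port)


def connect(host, app_name="jupyter-test"):
    """Create a SparkContext attached to the standalone master on `host`."""
    # Imported lazily so this file can be loaded where pyspark is not installed.
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster(master_url(host)).setAppName(app_name)
    return SparkContext(conf=conf)


if __name__ == "__main__":
    print(master_url("192.168.11.136"))
    # sc = connect("192.168.11.136")  # uncomment on a machine that can reach the master
```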

https://spark.apache.org/docs/1.1.0/spark-standalone.html

Spark in Jupyter

Jupyter is the Swiss Army knife of the data scientist. I have been addicted to it since I discovered this tool. Given the limitations of Python, especially when dealing with high-volume data, we may naturally wonder how to use Spark, another fantastic tool, but for parallel data processing. This tutorial intends to bind them together into a powerful cannon.

If you don’t have Jupyter yet, install it.

$ sudo apt-get install python-pip python3-pip
$ sudo pip install jupyter
$ sudo pip3 install jupyter
$ jupyter notebook

Although you installed Jupyter for both Python 2 and 3, if you check the ‘New’ menu you may see only one kernel, either ‘Python 2’ or ‘Python 3’, but not both. To enable both kernels, you can simply run the following:

$ python3 -m ipykernel install --user

Create a file for testing

$ echo "abcdabcef" >> README.md

Now, run ‘$ jupyter notebook’ anywhere you want. Once the page is up, create a new notebook using python 3 (or python 2 if you want). Try this example.

from pyspark import SparkContext
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Of course you’ll get an “ImportError: No module named 'pyspark'” because Jupyter has not been integrated with Spark yet.

Or if you run ‘$ pyspark’, it should give you the Spark welcome doodle.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.1
      /_/

Using Python version 2.7.12 (default, Jul 1 2016 15:12:24)
SparkSession available as 'spark'.
>>>

Ok. Here comes the magic. You copy and paste the following lines to ~/.bashrc.

export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=/usr/local/bin/jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.port=8880"
export PYSPARK_PYTHON=/usr/bin/python3.5

Run ‘$ pyspark’ again and you will see different output, like the below:

[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using encryption. This is not recommended.
[W 09:24:01.833 NotebookApp] WARNING: The notebook server is listening on all IP addresses and not using authentication. This is highly insecure and not recommended.
[I 09:24:01.839 NotebookApp] Serving notebooks from local directory: /home/dvlabs
[I 09:24:01.839 NotebookApp] 0 active kernels
[I 09:24:01.839 NotebookApp] The Jupyter Notebook is running at: http://[all ip addresses on your system]:8880/
[I 09:24:01.840 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).

Now, open the browser, type in ‘http://localhost:8880’, create a notebook with Python 3, and rerun our example. And?! It works. Just like that.

from pyspark import SparkContext
logFile = "README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

I keep mentioning ‘python 3’ because of the syntax. If you are comfortable with Python 2, go for it! Just modify the code accordingly.
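
If you want to double-check the numbers Spark reports, the same count is easy to reproduce in plain Python. This is only a sanity-check sketch: count_lines_with is a hypothetical helper, and the file is recreated in a temporary directory with the single line from the echo command above.

```python
import os
import tempfile


def count_lines_with(path, letter):
    """Count the lines of the text file at `path` that contain `letter`."""
    with open(path) as handle:
        return sum(1 for line in handle if letter in line)


# Recreate the one-line test file in a temporary directory.
tmp_dir = tempfile.mkdtemp()
log_file = os.path.join(tmp_dir, "README.md")
with open(log_file, "w") as handle:
    handle.write("abcdabcef\n")

num_as = count_lines_with(log_file, "a")
num_bs = count_lines_with(log_file, "b")
print("Lines with a: %i, lines with b: %i" % (num_as, num_bs))
```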

How to install HDFS?

I wrote a post about HDFS before, which has more background knowledge about HDFS. In this post, I mainly show the steps to install and configure HDFS on Ubuntu 16.04. This tutorial assumes you have oracle-java8-installer installed. If not, please refer to the first step in

  1. Download latest Hadoop tar from Apache Hadoop. I chose 3.0.0-alpha1 binary.
  2. unzip the tar to a directory and create a soft link

    $ sudo tar zxvf  hadoop-3.0.0-alpha1.tar.gz -C /opt
    $ sudo ln -s /opt/hadoop-3.0.0-alpha1 /opt/hadoop

  3. edit ~/.bashrc file and add the following lines.

    export HADOOP_HOME=/opt/hadoop
    export PATH=$HADOOP_HOME/bin:$PATH

  4. edit /opt/hadoop/etc/hadoop/hadoop-env.sh

    export JAVA_HOME=/usr/lib/jvm/java-8-oracle

  5. Now, you can check whether your hdfs is available or not

    $ hadoop

II. Configure the pseudo-distributed mode

The core-site.xml configuration

<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>

the hdfs-site.xml configuration

<property>

<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/tmp/dfs/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/tmp/dfs/datanode</value>
</property>
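
Both directory properties above point under /tmp, which does not survive a reboot. A small standard-library sketch that flags such settings is shown below; volatile_dirs is a hypothetical helper, and the sample assumes the properties sit inside the usual <configuration> root element of a Hadoop config file.

```python
import xml.etree.ElementTree as ET

# Sample hdfs-site.xml content, wrapped in a <configuration> root (assumed).
SAMPLE = """<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/tmp/dfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/tmp/dfs/datanode</value>
  </property>
</configuration>"""


def volatile_dirs(xml_text):
    """Return {property name: value} for *.dir properties stored under /tmp."""
    root = ET.fromstring(xml_text)
    found = {}
    for prop in root.findall("property"):
        name = prop.findtext("name", "").strip()
        value = prop.findtext("value", "").strip()
        # Strip the file: scheme so only the path itself is checked.
        path = value[len("file:"):] if value.startswith("file:") else value
        if name.endswith(".dir") and path.startswith("/tmp/"):
            found[name] = value
    return found


print(volatile_dirs(SAMPLE))
```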

III. Start the NameNode and DataNode services

  1. Initialize the HDFS database. (NOTICE!!!! The database will be gone after you restart this machine and your namenode service will fail when you run start-dfs.sh, because this command creates a temporary database under /tmp, as specified in hdfs-site.xml. To avoid this issue, simply reconfigure the two properties dfs.datanode.data.dir and dfs.namenode.name.dir in hdfs-site.xml to point to some permanent path.)

    $ hdfs namenode -format

  2. Now, the HDFS “database” is ready but HDFS service isn’t running. If you run ‘$hdfs dfs -ls’, you will get an error saying, ConnectionRefused. To start the HDFS service (the API for operating this database), you have to run the following script.

    $ /opt/hadoop/sbin/start-dfs.sh

  3. But you may get the port 22 connection refused error. Create a passphraseless SSH key and add it to the authorized list. To check whether it worked, try ‘ssh localhost’ and see whether it still prompts for a passphrase.
      $ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
      $ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
      $ chmod 0600 ~/.ssh/authorized_keys

    [Trouble Shooting] Oddly enough, it complains ‘permission denied’. Searching around, I found that I can use the following command to check what is going on during the SSH connection, i.e. the debug mode.

    $ ssh -v localhost

    What I can see is the following. You are smart enough to figure out what is going wrong here (notice which key types are tried: the id_dsa key generated above is not among them).

    debug1: kex_input_ext_info: server-sig-algs=<rsa-sha2-256,rsa-sha2-512>
    debug1: SSH2_MSG_SERVICE_ACCEPT received
    debug1: Authentications that can continue: publickey,password
    debug1: Next authentication method: publickey
    debug1: Trying private key: /home/xxx/.ssh/id_rsa
    debug1: Trying private key: /home/xxx/.ssh/id_ecdsa
    debug1: Trying private key: /home/xxx/.ssh/id_ed25519
    debug1: Next authentication method: password

  4. Rerun start-dfs.sh and you will see the namenode and datanode started. You can always use the command jps to see whether they are running. Also, you can use netstat to check whether HDFS is active on port 9000.

    netstat -tulp

IV. Sanity Check: you should be able to put files into the database and also to list

$ hdfs dfs -put /opt/hadoop/README.txt /
$ hdfs dfs -ls /


If you have Spark installed, you can also test HDFS with Spark, which is extremely straightforward: simply use the HDFS path instead of a local path. Use ‘$hdfs dfs -cat /output/part-00000’ to see the results. It shows that HDFS is just another way of storing files and that Spark is compatible with HDFS.

>>> myfile = sc.textFile('hdfs://localhost:9000/README.txt')
>>> counts = myfile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
>>> counts.saveAsTextFile("hdfs://localhost:9000/output")
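
For readers new to the flatMap / map / reduceByKey chain, here is roughly what it computes, written as plain Python with no Spark involved. word_count is a hypothetical helper and the sample lines are made up for illustration.

```python
from collections import Counter


def word_count(lines):
    """Count words by splitting each line on a single space and adding 1
    for every occurrence of each word."""
    counts = Counter()
    for line in lines:                 # flatMap: one line -> many words
        for word in line.split(" "):   # same separator as the Spark code
            counts[word] += 1          # map to (word, 1), then reduceByKey
    return dict(counts)


sample = ["to be or", "not to be"]
print(word_count(sample))
```

Spark does the same thing, except that the lines are spread over partitions and the per-word sums are combined across workers.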

To stop HDFS … (really, I am not going to tell you anything more, look up the manual for handy commands 🙂)

$ /opt/hadoop/sbin/stop-dfs.sh

V. Last Word as a Gift

If anything goes wrong, check /opt/hadoop/logs/*.log; you may easily solve any problem you unfortunately run into.

 

 

https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html