Dealing with NULL in PySpark transformations

Lately I’ve been dealing with nested data on a semi-regular basis with PySpark. One of the scenarios that comes up a lot is applying transformations to semi-structured or unstructured data to generate a tabular dataset for consumption by data scientists. When processing and transforming data I’ve previously found it beneficial to use the RDD data structure so that I can easily apply custom transformations the same way I would with normal Python data structures, but with the benefit of Spark and the functionality provided by the RDD API.

With my most recent project, though, I decided to spend more time working with the Spark Dataframe data structure, specifically for the potential performance gains from Catalyst and Tungsten. Along with this, Spark offers a set of complex types for Dataframe columns to make interacting with collection types a little bit easier.

Diving in, I immediately used the Databricks XML library to load some data into my dataframe, which had a similar shape (although different contents) to this:

from pyspark.sql import Row
from pyspark.sql.functions import explode, first, col, monotonically_increasing_id, when, array, lit
from pyspark.sql.column import Column, _to_java_column

df = spark.createDataFrame([  
 Row(dataCells=[Row(posx=0, posy=1, posz=.5, value=1.5, shape=[Row(type='square', len=1)]),  
 Row(posx=1, posy=3, posz=.5, value=4.5, shape=[]),  
 Row(posx=2, posy=5, posz=.5, value=7.5, shape=[Row(type='circle', len=.5)])  
 ])  
])  

df.printSchema()  

 root  
 |-- dataCells: array (nullable = true)  
 | |-- element: struct (containsNull = true)  
 | | |-- posx: long (nullable = true)  
 | | |-- posy: long (nullable = true)  
 | | |-- posz: double (nullable = true)  
 | | |-- shape: array (nullable = true)  
 | | | |-- element: struct (containsNull = true)  
 | | | | |-- len: long (nullable = true)  
 | | | | |-- type: string (nullable = true)  
 | | |-- value: double (nullable = true)

df.show()

 +--------------------+  
 | dataCells|  
 +--------------------+  
 |[[0, 1, 0.5, [[1,...|  
 +--------------------+

Perfect. Nothing too crazy, but I wanted to transform the nested array of structs into columns representing the members of each struct type. So I started by looking at the options available to flatten my array column, and I came across explode, which appeared to do exactly what I needed. Next I needed to take the member attributes of the structs and turn those into columns. I wasn’t able to find a built-in function for this, but using the select syntax available on dataframes along with the * wildcard available on structs, I was able to write my own function to do this.

def flatten_struct_cols(df):
    flat_cols = [column[0] for column in df.dtypes if 'struct' not in column[1][:6]]
    struct_columns = [column[0] for column in df.dtypes if 'struct' in column[1][:6]]

    df = df.select(flat_cols +
                   [col(sc + '.' + c).alias(sc + '_' + c)
                    for sc in struct_columns
                    for c in df.select(sc + '.*').columns])

    return df

And with that out of the way I’m ready to go.

flat_df = df.withColumn('dataCells', explode(col('dataCells')))
flat_df = flatten_struct_cols(flat_df)
flat_df.show(3)

 +--------------+--------------+--------------+---------------+---------------+
 |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
 +--------------+--------------+--------------+---------------+---------------+
 |             0|             1|           0.5|  [[1, square]]|            1.5|
 |             1|             3|           0.5|             []|            4.5|
 |             2|             5|           0.5|   [[, circle]]|            7.5|
 +--------------+--------------+--------------+---------------+---------------+

flat_df.printSchema()

 root
 |-- dataCells_posx: long (nullable = true)
 |-- dataCells_posy: long (nullable = true)
 |-- dataCells_posz: double (nullable = true)
 |-- dataCells_shape: array (nullable = true)
 | |-- element: struct (containsNull = true)
 | | |-- len: long (nullable = true)
 | | |-- type: string (nullable = true)
 |-- dataCells_value: double (nullable = true)

So far so good. Let’s try it again, and if all goes well we can throw this in a loop, flatten the nested columns, and be on our way.

flat_df = flat_df.withColumn('dataCells_shape', explode(col('dataCells_shape')))
flat_df = flatten_struct_cols(flat_df)
flat_df.show(3)

 +--------------+--------------+--------------+---------------+-------------------+--------------------+
 |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_value|dataCells_shape_len|dataCells_shape_type|
 +--------------+--------------+--------------+---------------+-------------------+--------------------+
 |             0|             1|           0.5|            1.5|                  1|              square|
 |             2|             5|           0.5|            7.5|               null|              circle|
 +--------------+--------------+--------------+---------------+-------------------+--------------------+

And now we have a problem. After backtracking I found that explode was silently dropping my row with null in it. Let’s check the docs. Interestingly, I didn’t see anything about this. So I checked the latest docs and happened to notice explode_outer listed right below it. It turns out that in Spark 2.2.0 a set of _outer functions were added that retain null for operations such as explode. Unfortunately, some of these are not available in PySpark until 2.3, and I didn’t have the option to migrate from 2.2.x to 2.3.x.
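For reference, on Spark 2.3+ where explode_outer is exposed directly in PySpark, the null-preserving version is a one-liner. A quick sketch of what I couldn’t use on 2.2.x:

from pyspark.sql.functions import explode_outer

# Same flatten step as above, but rows where the array is null or empty are kept.
flat_df = flat_df.withColumn('dataCells_shape', explode_outer(col('dataCells_shape')))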

StackOverflow to the rescue. After reviewing the PySpark tag I didn't find any solutions with accepted answers so I went ahead and wrote my own question. Thanks to that I learned a lot about PySpark/JVM interop and about some of the disparities between the JVM API and other language APIs.

Otherwise()

flat_df = df.withColumn('dataCells', explode(col('dataCells')))
flat_df = flatten_struct_cols(flat_df)
flat_df.withColumn('dataCells_shape_test', explode(when(col('dataCells_shape').isNotNull(), col('dataCells_shape'))
                                                   .otherwise(array(lit(None).cast(flat_df.select(col('dataCells_shape')
                                                                                                   .getItem(0))
                                                                                    .dtypes[0][1]))))).show()

 +--------------+--------------+--------------+---------------+---------------+--------------------+
 |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|dataCells_shape_test|
 +--------------+--------------+--------------+---------------+---------------+--------------------+
 |             0|             1|           0.5|  [[1, square]]|            1.5|         [1, square]|
 |             2|             5|           0.5|   [[, circle]]|            7.5|          [, circle]|
 +--------------+--------------+--------------+---------------+---------------+--------------------+

Based on some responses to my question, I found another question that provided a Scala solution involving .otherwise and casting a null literal (None in Python) to the nested structure’s type. This seemed like the more direct solution, since it avoids the private functionality in the library, so I opted to try implementing the Scala solution in PySpark first.

But unfortunately it appears that explode is evaluated in a way that drops the row before the otherwise branch is applied. With a deadline quickly approaching and other options on the table, I did not have time to dig deep into why this was.

Into the JVM

def explode_outer(col):
    """
    Calling the explode_outer Java function from PySpark
    """
    explode_outer = sc._jvm.org.apache.spark.sql.functions.explode_outer
    return Column(explode_outer(_to_java_column(col)))

flat_df_with_null = df.withColumn('dataCells', explode(col('dataCells')))
flat_df_with_null = flatten_struct_cols(flat_df_with_null)
flat_df_with_null = flat_df_with_null.withColumn("dataCells_shape", explode_outer(col("dataCells_shape")))
flat_df_with_null.show()

 +--------------+--------------+--------------+---------------+---------------+
 |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_shape|dataCells_value|
 +--------------+--------------+--------------+---------------+---------------+
 |             0|             1|           0.5|    [1, square]|            1.5|
 |             1|             3|           0.5|           null|            4.5|
 |             2|             5|           0.5|     [, circle]|            7.5|
 +--------------+--------------+--------------+---------------+---------------+

flat_df_with_null = flatten_struct_cols(flat_df_with_null)
flat_df_with_null.show()

 +--------------+--------------+--------------+---------------+-------------------+--------------------+
 |dataCells_posx|dataCells_posy|dataCells_posz|dataCells_value|dataCells_shape_len|dataCells_shape_type|
 +--------------+--------------+--------------+---------------+-------------------+--------------------+
 |             0|             1|           0.5|            1.5|                  1|              square|
 |             1|             3|           0.5|            4.5|               null|                null|
 |             2|             5|           0.5|            7.5|               null|              circle|
 +--------------+--------------+--------------+---------------+-------------------+--------------------+

While reviewing suggested solutions I found out that SparkContext has a _jvm object that provides access to org.apache.* functionality. Along with this I also noticed that PySpark has an entire "private" API used to bridge Python and Java. Part of this API is _to_java_column, which makes it possible to transform a PySpark column into a Java column to match Java method signatures. Learning all of this, and knowing that the JVM API already had explode_outer implemented, I reviewed the Java explode_outer method to verify the type signature and built my own function in Python to call the Java function and return the column with nulls in place.

And it works! With that I am able to flatten out arbitrarily nested collections in PySpark dataframes while retaining nulls when using Spark 2.2.x.
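Roughly, the pieces combine into a loop like the sketch below (not my exact production code): keep exploding array columns with explode_outer and flattening struct columns until the schema contains neither.

def flatten_df(df):
    # A sketch: repeatedly explode array columns (null-preserving) and flatten
    # struct columns until no array or struct columns remain.
    while True:
        array_cols = [name for name, dtype in df.dtypes if dtype.startswith('array')]
        struct_cols = [name for name, dtype in df.dtypes if dtype.startswith('struct')]
        if not array_cols and not struct_cols:
            return df
        for name in array_cols:
            df = df.withColumn(name, explode_outer(col(name)))
        df = flatten_struct_cols(df)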

Wrapping Up

A couple of things to note: if you have an array with more than one struct as a member this will fail, and if you have a deeply nested structure, the column growth from this transformation is typically not sustainable on a large dataset.

I have questions that I hope to continue spending time on. For instance, why are rows with null dropped at all? I wonder if the operation makes a new dataframe from the column being operated on and then joins it back on an index, and somewhere along the way that join loses the nulls. Why aren't functions that are lossy identified as such? Is there always a version lag between the JVM API and the PySpark API? I'm also curious how Catalyst handles de-nesting operations and adding new columns from the result of exploding arrays or flattening structs.

Finally, instead of adding new columns, I want to try using MapType to create a single column of key/value pairs, so that I can flatten arbitrarily deep collections with the same methodology without adding a lot of columns that are mostly null.
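Something along these lines with create_map, where the column names are just illustrative and the values are cast to a common string type so they can share one map:

from pyspark.sql.functions import create_map

# A sketch of the MapType idea: collapse flattened fields into one map column.
flat_map_df = flat_df.withColumn(
    'dataCells_map',
    create_map(
        lit('posx'), col('dataCells_posx').cast('string'),
        lit('posy'), col('dataCells_posy').cast('string'),
        lit('posz'), col('dataCells_posz').cast('string')
    )
)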

DerbyPy Intro to PySpark

This month at DerbyPy I provided a high level introduction to PySpark. For this talk I went over the Spark execution model at a high level, talked about the difference between the PySpark Dataframe and RDD api, and provided some examples of how to use both. As part of this I put together a jupyter notebook and some scripts that can be used via spark-submit along with instructions on how to run PySpark locally.

If you’re interested in the material and presentation they can be found here.

DerbyPy Introduction to Python Modules and Packages

Most programming languages offer ways to organize your code into namespaces. These namespaces are logical containers that group different names and behaviors together and isolate them to that namespace. Organizing your code with namespaces makes it easier to structure your application without naming collisions, and it can make the code easier for you and others to maintain by adding some additional organization to your project.

In Python we can use modules and packages to create namespaces that we can then reference in other modules as we build our application.

A Python module is a .py file containing Python definitions and statements. The file name is the module name with the suffix .py appended.
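As an example, the pprintext module used below might look something like this; the docstring and the list_directory name match the output further down, while the function body is just a plausible sketch.

# pprintext.py - a plausible sketch; only the docstring and the name
# list_directory are taken from the output shown below.
"""A module providing extensions to pretty print structures that pprint may not handle well."""

import os


def list_directory(path):
    """Print an indented listing of the directory tree rooted at path."""
    for root, dirs, files in os.walk(path):
        depth = root.replace(path, '').count(os.sep)
        print('{}{}/'.format(' ' * 4 * depth, os.path.basename(root)))
        for name in files:
            print('{}{}'.format(' ' * 4 * (depth + 1), name))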

As with all things in Python when we import a module it is an object, and just like other objects it has dunder (double underscore) attributes that define additional data about that module. We can use that to learn more about the module before we ever start to use it.

import pprintext

print(pprintext.__doc__)
print(dir(pprintext))

 A module providing extensions to pretty print structures that pprint may not handle well.

 ['__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'list_directory', 'os']

From the output of dir() we can see there is a function called list_directory that is part of this module.

pprintext.list_directory("plugins")

 plugins/  
 ipynb/  
 __init__.py
 liquid.py  
 markup.py  
 requirements.txt  
 .git  
 README.md  
 ipynb.py  
 LICENSE  
 .gitignore  
 core.py  
 __pycache__/
 core.cpython-36.pyc  
 __init__.cpython-36.pyc
 markup.cpython-36.pyc  
 ipynb.cpython-36.pyc  
 tests/  
 pelican/  
 pelicanconfmarkup.py  
 pelicanconfliquid.py  
 theme/  
 templates/  
 base.html  
 content/  
 with-meta-file.ipynb-meta  
 with-liquid-tag.ipynb  
 with-metacell.ipynb  
 with-meta-file.ipynb  
 with-liquid-tag.md

Finally, we can see where we are importing this module from with .__file__, and we see that this is a module local to our application.

pprintext.__file__

 '/home/alex/projects/alexhagerman.github.io/pprintext.py'

Packages

For the sake of brevity and simplicity tonight we can say that a Python package is a collection of Python modules. It is a folder that contains .py files and provides a parent namespace for the modules in the folder.
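For example, the pprintextension package used below might be laid out like this (a sketch based on the help() output later in this post):

# pprintextension/      <- the package: a folder providing the parent namespace
#     __init__.py       <- package docstring, __all__, and any init behavior
#     filesystem.py     <- module: pprintextension.filesystem
#     network.py        <- module: pprintextension.network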

Another way of saying this is:

Just like we did with our module we can call dir() on our package to see associated attributes and objects.

import pprintextension  
dir(pprintextension)  

 ['__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 'network',  
 'pprintextension']

Additionally, we can call help(), which may provide more information about the package as defined in __init__.py. You can think of __init__.py as a place to put initialization behavior and documentation for your package. In the same way that __init__ handles initializing your class, __init__.py handles the initialization of your package during import. __init__.py used to be required to make a directory a package, but as of Python 3.3, thanks to PEP 420, it is no longer required. More links and information are provided at the end of the notebook.
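For reference, the __init__.py behind the help() output below might look something like this; the docstring and __all__ mirror that output, while the submodule import is an assumption on my part.

# pprintextension/__init__.py - a sketch; the docstring and __all__ mirror the
# help() output below, the import of the network submodule is an assumption.
"""A package providing functions to pretty print structures that may have
alternative renderings from the standard pprint package."""

import pprintextension.network

__all__ = ['filesystem']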

help(pprintextension)  

 Help on package pprintextension:  

 NAME  
 pprintextension  

 DESCRIPTION  
 A package providing functions to pretty print structures that may have alternative renderings from the standard  
 pprint package.  

 PACKAGE CONTENTS  
 filesystem  
 network  

 DATA
 __all__ = ['filesystem']

 FILE
 /home/alex/projects/modules-and-packages-into/pprintextension/__init__.py

Additionally, we can import modules from packages and refer to them directly instead of using the fully qualified namespace syntax <package>.<module>.<object>.

from pprintextension import filesystem
filesystem.list_hidden_directory()

 ./
 .ipynb_checkpoints/
 .git/
 .idea/

Packages go way beyond what we have covered here. As you build packages you want to consider their structure relative to the public API you’re creating. Publishing and distributing packages is a talk or series of talks on its own. For now, what we have covered is how we can group modules together in a package and some basics for how to control the initialization behavior of a package.

Finishing up

Now that we know what a Python module and a package are, next month we will look at the import statement. As a sneak peek I'll leave you with sys.path, and you can begin exploring how it relates to our own packages and modules that make up our application, as well as those we might install with tools such as pip or conda.

import sys  
sys.path  

 ['',  
 '/home/alex/miniconda3/envs/blogging/lib/python36.zip',  
 '/home/alex/miniconda3/envs/blogging/lib/python3.6',  
 '/home/alex/miniconda3/envs/blogging/lib/python3.6/lib-dynload',  
 '/home/alex/miniconda3/envs/blogging/lib/python3.6/site-packages',  
 '/home/alex/miniconda3/envs/blogging/lib/python3.6/site-packages/IPython/extensions',  
 '/home/alex/.ipython']

<https://docs.python.org/3/library/sys.html#sys.path>

Additional Reading

Recursive Search with Python

Recently I received some JSON-like data that I needed to transform into a tabular dataset. As part of that, there was a specific key that could occur as a child of different keys at different depths in the structure. Not only could the key I needed appear at different locations and depths, but when it was located it could have N sibling occurrences I needed to retrieve at the same location. Finally, there was a set of id and date keys at the top level of the structure that I was asked to include with each search key result.

I took a couple of different paths on my way to solving this. One of the first things I found was that the total depth was inconsistent across the structures. Not only that, it wasn’t uncommon to find the key scattered across depths up to 5 or 6 levels deep. The function below is what I ended up using. It’s a recursive search that relies on the fact that the data is JSON-like. Instead of trying to pull the parent keys out as part of the search, I have a separate function that parses out the id and date keys and passes those into this function as the base row. The search then walks the input object, checking the dictionary collections for all instances of the search key; when one is located, the hit is appended to the base data, and that row is added to a list of results, which is returned once the entire collection has been searched.

Gotchas

  • This needed to be Python 2 and 3 compatible, so pay attention to how you iterate dictionary keys and values when you have this requirement. There are different ways to handle this; I used future.
  • The way that Python appends to lists can be tricky. This bit me when I found that results contained the right number of rows, but all of them were the same and were based on the last hit. This is because I was calling append on base, which created bindings that I then mutated on each search result. Luckily Python has a copy module in the standard library to help with this scenario; there is a short illustration after this list.
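Here's a tiny illustration of that second gotcha; the names are made up for the example.

import copy

base = ['id-1', '2018-01-01']

# Appending the same base object: every "row" is the same underlying list.
shared = []
for hit in ['a', 'b']:
    row = base
    row.append(hit)
    shared.append(row)
print(shared)  # [['id-1', '2018-01-01', 'a', 'b'], ['id-1', '2018-01-01', 'a', 'b']]

# Deep copying base first: each row gets its own independent list.
base = ['id-1', '2018-01-01']
fixed = []
for hit in ['a', 'b']:
    row = copy.deepcopy(base)
    row.append(hit)
    fixed.append(row)
print(fixed)  # [['id-1', '2018-01-01', 'a'], ['id-1', '2018-01-01', 'b']]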

Problem Solved

The function below represents my final result. This worked well on the sample data, and eventually was used on PySpark RDDs to process hundreds of millions of structures quickly.

import copy
from future.utils import iteritems


def search(input, row_base, search_key, results):
    """A search function to help transform nested JSON-like
    objects into tabular rows. The function takes a JSON-like
    object as input along with a search key and returns a row
    for each occurrence of the key in the object. row_base is
    expected to be a list containing any base data you would
    like associated with the search_key data.
    """
    if input:
        for i in input:
            # If input contains a list run it through search
            # again since it may contain dictionaries with
            # the key being searched
            if isinstance(i, list):
                search(i, row_base, search_key, results)
            # If input contains a dictionary check if it
            # contains the search_key. Also check if any of
            # the values are lists or dictionaries that need
            # to be searched
            if isinstance(i, dict):
                for k, v in iteritems(i):
                    # If the search_key is located deepcopy
                    # row_base to prevent changing row_base
                    # on future hits. Create the full row and
                    # append to results
                    if k == search_key:
                        row = copy.deepcopy(row_base)
                        row.append(i)
                        results.append(row)
                        continue
                    elif isinstance(v, list):
                        search(v, row_base, search_key, results)
                    elif isinstance(v, dict):
                        search(v, row_base, search_key, results)
        # Search has been exhausted, return search
        # results to the caller. Results will be a
        # list of lists.
        return results
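Here's a quick usage sketch with made-up data (the key names are illustrative); each result row carries the base data plus the dictionary where the search key was found.

record = {
    'id': 42,
    'date': '2018-01-01',
    'items': [
        {'target': 'a', 'noise': 1},
        {'nested': [{'target': 'b'}]},
    ],
}

base = [record['id'], record['date']]
rows = search([record], base, 'target', [])
print(rows)
# [[42, '2018-01-01', {'target': 'a', 'noise': 1}],
#  [42, '2018-01-01', {'target': 'b'}]]

# Roughly how it could be applied per record on a PySpark RDD (extract_base is
# a hypothetical helper that pulls the top-level id and date keys):
# rows_rdd = records_rdd.flatMap(lambda r: search([r], extract_base(r), 'target', []))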

Next Steps

Since this works there are a couple of ideas I want to explore with it.

  • This seems like a good place to gain experience with Python type annotations.
  • Since this needs to work in a pure Python environment as well as a PySpark environment I want to do some profiling, but I’m not sure how tools like Cython or Numba will work/interact with the PySpark piece of this. That will be interesting to explore.
  • It would be interesting to add depth tracking and see if there are any levels where the search key never occurs so that the function could potentially skip iteritems at that level.

Docs

For more information on copy and future, you can check out their documentation.

I’m sure others will have different ideas and approaches to something like this, or suggestions on something that could be done to make this faster or easier to read. If you have feedback or suggestions, feel free to send them my way via email.

Publishing with Pelican on Windows

To get things started I thought it might be a good idea to document using Pelican on Windows with Github and Gandi for blog publishing. I’ll start by configuring Pelican and Github. Once that’s working I’ll then talk about configuring Gandi so you can use a custom domain. If you’re using a different domain provider you may need to use different settings, but Github has plenty of documentation around this that I’ll provide links for. Using Pelican on Windows isn’t that much different than macOS or Linux, but you won’t find as many tutorials or be able to use the quickstart makefile.

Github Pages Setup

The first thing you should do is log in to Github and set up a Github pages repo. You can read more detailed instructions here: https://pages.github.com/ or create a repo that follows the pattern username.github.io.

I followed the pattern for User Github pages. This will be important when publishing with Pelican.

https://help.github.com/articles/user-organization-and-project-pages/

Pelican Local

With that out of the way we want to move on to setting up our project on Windows. I’m using Anaconda and I will be creating a new conda environment for this project.

The main thing to pay attention to when you go through the quickstart prompts is that you won’t need or be able to use the makefile with Windows. Once you have completed the quickstart there are a couple of things to note.

  1. Your articles should be markdown documents in the content folder.
  2. pelicanconf.py contains various settings related to you blog.
  3. publishconf.py can be left alone because we are using ghp-import

Publishing

Go ahead and create a file under content, something like gettingstarted.md, and add some text. Once you’ve done that, switch back to the terminal prompt.

Custom Domain URL

Ok, now that we have Github set up and we can see our blog pages, I want to look at the steps required to use my custom domain, hosted by Gandi, with Github pages. With Gandi we want to modify our A records to allow routing to Github. Log into your Gandi dashboard, select domains from the menu and then DNS records. On this page you should be able to edit your DNS record and add the following:

https://wiki.gandi.net/en/dns/zone/a-record

Finally, navigate back to your Github repo and go to the settings page. Under settings, scroll down until you see Github Pages. You should see a textbox allowing you to enter a custom domain. Add yours, and if possible I recommend checking the enforce HTTPS box below it.

Wrapping Up

With that done you should be good to go. Whenever you want to write a new article, create a markdown document in the content folder and follow the same steps above for publishing. One last note: if this doesn’t work immediately, you might want to wait before changing settings, since A record changes can take some time to propagate.

Getting Started

I’ve needed to start this for a while. I think most programmers, sysadmins, DBAs and others work on projects large and small where we quickly figure out how to make x work with y or have a produce b. Then we check it into source control, walk away and forget until we need to remember a detail six weeks later. So that’s what I plan to record here: little notes and snippets of code, settings, whatever might have been on my mind or come across my keyboard in recent days or weeks. Oftentimes it will have something to do with Python, SQL or clustered systems.

The first few posts will be on getting Pelican set up with GitHub Pages and Gandi. After that I’ll probably jump into PySpark, HBase, Python and execution plans. Chances are, if databases, Python or distributed systems are involved, I’m interested in learning and writing about it. If that sounds interesting, definitely keep an eye out for more posts here.

A little bit about myself. I am currently a data engineer at Humana writing a lot of Python and SQL mixing software engineering practices with data science projects. Before that I worked at a few different companies focused on the Microsoft Data Stack mixing SQL Server, SSIS, SSAS and C# to build data intensive applications. There’s more information on my LinkedIn and GitHub pages.

That’s all for now. More to come soon.