Published: May 28, 2020

A few months ago I had the opportunity to collaborate with some Data Scientists porting PySpark queries to raw Python. One of the primary areas of concern was aggregation statements, which were seen as functionality that would be particularly troublesome to write in Python. As an example I was provided a Spark SQL query similar to this:

```python
text = ocrdata.filter(col('lvl') == 5).filter(f.length(f.trim(col('txt'))) > 0).select(txtcols) \
    .withColumn('ctxt', casestd(col('txt'))) \
    .drop('txt') \
    .withColumn('tkpos', f.struct('wrdnm', 'ctxt')) \
    .groupBy(lncols) \
    .agg(f.sort_array(f.collect_list('tkpos')).alias('txtarray')) \
    .withColumn('txtln', f.concat_ws(' ', col('txtarray.ctxt'))) \
    .drop('txtarray')
```

This query was transforming token data generated by Tesseract into lines. Beyond the aggregation operation there was also some concern that the operation might be run against quite large datasets, depending on how much [Tesseract](https://github.com/tesseract-ocr) output was being manipulated at once.

Outside of the raw functionality, I was asked if the data could be structured to provide an interface with named columns, in a style similar to SQL, rather than having to reference positional data.
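As a quick sketch of what that means in plain Python (using a made-up three-column CSV just for illustration), csv.reader hands back positional rows while csv.DictReader gives named access:

```python
import csv
from io import StringIO

# Hypothetical sample data for illustration only
raw = StringIO("level,wordnum,text\n5,1,HELLO\n5,2,world\n")

positional = list(csv.reader(raw))
print(positional[1][2])   # 'HELLO' -- you have to know that column 2 is the text

raw.seek(0)
named = list(csv.DictReader(raw))
print(named[0]["text"])   # 'HELLO' -- referenced by name, like a SQL column
```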

All of this seemed fairly straightforward. Provided with some sample data, I pulled in the UDF that was already written in Python and set out to apply the transformations, first illustrating how we could interact with the data in a SQL-like way with pipeline transformations and named references.

Porting Transformations

```python
import itertools
from csv import DictReader
from collections import namedtuple

def casestd(x):
    if x.isupper():
        return x.title()
    elif not x.islower() and not x.isupper():
        return x.title()
    else:
        return x

with open("sampledata/export.csv") as sample:
    reader = DictReader(sample)
    data = [row for row in reader]

a = filter(lambda x: int(x["level"]) == 5, data)
filtered = filter(lambda x: len(x["text"].strip()) > 0, a)
fixed = ({**row, 'text': casestd(row["text"])} for row in filtered)

tkpos = namedtuple("tkpos", "wordnum, text")
result = (dict(row, **{"tkpos": tkpos(row["wrdnum"], row["text"])}) for row in fixed)
```

To start I read the data in with a [DictReader](https://docs.python.org/3.7/library/csv.html#csv.DictReader), which allowed me to reference values by name like “level” and “text”. I then applied similar data transformations making use of [filter](https://docs.python.org/3.7/library/functions.html#filter), [comprehensions](https://docs.python.org/3/tutorial/datastructures.html#list-comprehensions), and [unpacking](https://docs.python.org/3/reference/expressions.html) to try and keep a style similar to some PySpark operations.

Finally I put the rest of the transformations into a generator expression containing a dict of namedtuple values so that later operations could continue working on named values in a manner similar to SQL columns.
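To make that concrete, here is a small illustrative peek at what downstream code can do with those named values (the keys and field names follow the sample data assumed above):

```python
# Illustrative only: pulling items like this consumes the generator,
# so you would only do it while exploring the data.
row = next(result)
print(row["text"])            # dict key, referenced like a named SQL column
print(row["tkpos"].wordnum)   # namedtuple fields, referenced by attribute
print(row["tkpos"].text)
```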

GROUPBY

With the transformation and named values part out of the way I moved on to the GROUPBY aggregations. Thinking about GROUPBY, the goal is to apply an aggregation function to each group of rows sharing a unique key. That key can be represented in multiple ways, but I wanted to show the idea behind what was happening to help with future port efforts. So on my first pass I wrote:

```python
grouped = []
seen = set()

# Order is known because it represents data generated by tesseract.
# Note: `fixed` is iterated more than once below, so here it needs to be
# a concrete sequence (e.g. a list) rather than a single-use generator.
for row in fixed:
    key = (row["pagenum"], row["blocknum"], row["parnum"], row["linenum"])
    if key in seen:
        continue
    seen.add(key)
    line = []
    for r in fixed:
        rkey = (r["pagenum"], r["blocknum"], r["parnum"], r["linenum"])
        if key == rkey:
            line.append(r["text"])
    txt = " ".join(line)
    cleantxt = txt.strip()
    if cleantxt:
        grouped.append(
            {
                "pagenum": row["pagenum"],
                "blocknum": row["blocknum"],
                "parnum": row["parnum"],
                "linenum": row["linenum"],
                "text": cleantxt,
            }
        )
```

Keeping in mind that this was meant to conceptualize what could be happening behind the scenes for the GROUPBY and AGG operations: here we loop over our rows, generating a key from a tuple of values. Once we have this key we check whether we have seen it before by referencing a set. If it is a new value we find all rows with that key in our transformed data, append the tokens, handle empty tokens, and finally add the data to our final dataset. At the end we have lines of text (instead of individual tokens) that can be referenced by page, block, paragraph, and line number.

While this works, it's horribly inefficient: we re-iterate our transformed data every time we find a new key. But the goal here wasn't efficiency. It was to show the ideas expressed in SQL with Python. Specifically, it highlighted how to express a GROUPBY/AGG operation manually, using keys built from values and tracking what we have and have not seen, producing a final dataset that matched the output of the SQL statement.
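As an aside (this wasn't part of the original exercise), the same manual idea can be done in a single pass by bucketing tokens in a dict keyed on that tuple. A rough sketch, assuming the same row keys as above:

```python
from collections import defaultdict

# One-pass alternative: bucket tokens by their (page, block, paragraph, line) key.
lines = defaultdict(list)
for row in fixed:
    key = (row["pagenum"], row["blocknum"], row["parnum"], row["linenum"])
    lines[key].append(row["text"])

grouped = []
for (pagenum, blocknum, parnum, linenum), tokens in lines.items():
    cleantxt = " ".join(tokens).strip()
    if cleantxt:
        grouped.append({
            "pagenum": pagenum,
            "blocknum": blocknum,
            "parnum": parnum,
            "linenum": linenum,
            "text": cleantxt,
        })
```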

itertools

Continuing on from that point, one of my favorite Python modules is itertools. If you haven't spent much time with it I highly recommend taking some of your existing code and looking it over while scanning the itertools docs. I've used islice, chain and zip_longest innumerable times. Because of that I knew there was a handy groupby function stowed in there too:

> Make an iterator that returns consecutive keys and groups from the iterable. The key is a function computing a key value for each element. If not specified or is None, key defaults to an identity function and returns the element unchanged.
>
> Generally, the iterable needs to already be sorted on the same key function.

Replacing the block above:

```python
final = []
# Relies on the rows already being ordered by these keys, as the docs note above.
for key, group in itertools.groupby(
    fixed, key=lambda x: (x["pagenum"], x["blocknum"], x["parnum"], x["linenum"])
):
    line = " ".join(row["text"] for row in group)
    final.append({
        "pagenum": key[0],
        "blocknum": key[1],
        "parnum": key[2],
        "linenum": key[3],
        "text": line,
    })
```

And with that change we have a clean, faster implementation. Additionally, since this was a port of Spark SQL, if the data were to get truly large it wouldn't be much work to start iterating through the whole pipeline in batches, since we can use generators all the way through.
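To gesture at what that batching could look like, here is a rough sketch that is not part of the original pipeline, using itertools.islice to pull fixed-size chunks from the generator (the `batched` helper and the batch size are made up for illustration):

```python
from itertools import islice

def batched(iterable, size):
    """Yield lists of up to `size` items from any iterable, lazily."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Hypothetical usage: process the transformed rows a few thousand at a time
# instead of holding everything in memory. In practice the batch boundaries
# would need to respect the line keys so a line isn't split across batches.
for batch in batched(result, 5000):
    ...  # run the groupby/aggregation logic over each batch
```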

Conclusion

So what was the point of sharing that here? Nothing specific. It was a fun exercise at the time, and it made me pause to consider how I would express GROUPBY on my own. The exercise also helped introduce some of my colleagues to the filter expression and, in turn, map and reduce. Using those, they were able to express a lot of their pipeline concepts without reaching for the explicit iteration structures that Spark had previously abstracted away for them. If you find yourself doing a lot of pipelining I recommend checking out itertools and functools. Both are built into the Python stdlib and provide a lot of helpful functionality.
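As a closing toy example (not code from the project), the same filter/map/reduce shape covers a surprising amount of small pipeline work:

```python
from functools import reduce

words = ["Lorem", "IPSUM", "dolor", " ", "SIT"]

# filter -> map -> reduce: the same shape as a small SQL-ish pipeline
non_empty = filter(lambda w: w.strip(), words)
normalized = map(str.title, non_empty)
sentence = reduce(lambda acc, w: f"{acc} {w}".strip(), normalized, "")

print(sentence)  # "Lorem Ipsum Dolor Sit"
```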

python, sql, programming