3 Data Processing Pipelines You Can Build With Python Generators

Using generators to create data processing pipelines in Python

Photo by maxxyustas, rights obtained via Envato Elements.

Ever since I saw David Beazley’s presentation about generators in Python, I have been intrigued by them. If you haven’t seen it yet, I recommend watching Python Discovery.

Python generators are a powerful concept. You can use them to create very efficient data processing pipelines. In this article, I will show you three practical examples.

We will see how to string together several filters to create a data processing pipeline. This pipeline is able to process large datasets or streams of data. The big advantage of using generators is that you won’t be maxing out your machine’s memory.

The diagram below shows what we want to create. We want to process a stream of data by combining a set of filters. The ultimate solution should make it easy to add, remove, or change filters.

Chaining multiple filters to create a processing pipeline. Image by the author.

This Pipe and Filter pattern is not new. It is frequently used in Unix-like operating systems to construct powerful command lines by combining commands. Take, for example, this command line, where several commands are chained together using the pipe symbol (|):

$ ps aux | grep [processtokill_name] | grep -v grep | awk '{print $2}' | xargs kill

The pattern is also described as an architectural pattern. For example, in Pattern-Oriented Software Architecture, Buschmann et al. describe it as:

“The Pipes and Filters architectural pattern provides a structure for systems that process a stream of data. Each processing step is encapsulated in a filter component. Data is passed through pipes between adjacent filters. Recombining filters allows you to build families of related systems.”

In Enterprise Integration Patterns, Gregor Hohpe writes:

“Use the Pipes and Filters architectural style to divide a larger processing task into a sequence of smaller, independent processing steps (Filters) that are connected by channels (Pipes).”

The funny thing is that Hohpe and Buschmann describe this pattern in the context of large software systems. I will instead demonstrate it on a much smaller scale, using three practical examples in Python. These examples will:

  • Calculate the sum of all Series A funding from a CSV file.
  • Calculate the number of bytes transferred by a webserver.
  • Calculate the size of a directory on the file system.

I will go through each part of the source code. If you can’t wait, you can find it in this GitHub repository.


1. Calculate the Sum of All Series A Funding

In the first example, we create a processing pipeline that calculates the total amount of money raised through Series A funding. We calculate the total by parsing a CSV file from Crunchbase.

The diagram below shows the processing pipeline for parsing the CSV file:

Converting the CSV to a total using filters. Image by the author.

Crunchbase is a platform that provides information about investments and funding. Besides an API, they provide their data via a CSV. You have to pay for the complete version. I used this free sample.

The Python script for this example reads the CSV file and returns the total sum of all Series A funding. Series A funding is the first round of venture capital that a startup receives.

In the script, we define the pipeline using a Python list. I call each item in the list a filter. We start with a file_filter that opens the file. The second filter uses Python’s DictReader to read the CSV. The series_a_filter selects only the Series A records. The last filter, sum_filter, sums the funding of each Series A record.

The log_filter is a filter that you can place anywhere in the pipeline. It logs each item as it passes through the channel.
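As a rough sketch, the pipeline definition looks something like this. The filter names come from the description above; the CSV file name is just a placeholder, and create_processing_pipeline is covered further down.

    pipeline_filters = [
        file_filter,      # open each CSV file
        dict_filter,      # map every row to a dictionary
        log_filter,       # log items as they pass through (optional)
        series_a_filter,  # keep only the Series A records
        sum_filter,       # add up the funding amounts
    ]

    # "crunchbase_sample.csv" is a placeholder file name.
    pipeline = create_processing_pipeline(pipeline_filters, ["crunchbase_sample.csv"])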

If we run the pipeline using print(next(pipeline)), we get the following output:

Executing the processing pipelines. Image by the author.

The individual filters

In this example, we saw several filters being used. Each filter is implemented using a Python function. The general rule for a filter is that it accepts a generator and returns a generator. The generator is created using the yield statement.

Let's look at each filter and examine its implementation.

File filter

The file filter accepts a list of file names, opens each file, and yields the opened file.
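A minimal sketch of this filter:

    def file_filter(filenames):
        # Open each incoming file name and hand the opened file downstream.
        for filename in filenames:
            with open(filename) as file:
                yield file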

Dict filter

The dict filter uses DictReader from Python’s csv module. It creates a reader that maps the columns in each row to a dictionary. The keys of the dictionary are taken from the first row of the file, which in a CSV usually contains the column names.

The dict_filter receives an opened file and creates a DictReader from it. It then iterates over all the rows and yields each row.
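Roughly, it can look like this:

    import csv

    def dict_filter(files):
        # Map every CSV row to a dictionary keyed by the header row.
        for file in files:
            for row in csv.DictReader(file):
                yield row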

Series A filter

The CSV contains records for several types of funding, such as Series A, B, and C. We only want the Series A records, and the Series A filter does exactly that: it lets only the Series A records pass through.

The implementation iterates over all the rows. We determine whether a row contains Series A funding by checking if the column “round” contains an “a.” If it does, the filter yields the row.
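A sketch of this filter, assuming the column is literally named “round” as described above:

    def series_a_filter(rows):
        # Keep only the rows whose funding round is Series A.
        for row in rows:
            if row["round"].strip().lower() == "a":
                yield row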

I think you are beginning to see a pattern here: every filter takes an iterable as input and produces its output with the yield statement.

Sum filter

The last functional filter is the sum filter. It is responsible for calculating the total of the Series A funding.

As with every filter, it receives an iterator; it consumes the incoming records completely and yields the total as a single value.
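A sketch of this filter; the name of the funding amount column (“raisedAmt” below) is an assumption about the sample CSV and may need adjusting:

    def sum_filter(rows):
        # Add up the raised amount of every record that reaches this filter.
        # "raisedAmt" is an assumed column name; adjust it to your CSV.
        yield sum(int(row["raisedAmt"]) for row in rows)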

Logging filter

The logging filter can be placed before or after any other filter. It does one thing: it iterates over the incoming items, logs each item, and yields it unchanged.
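A sketch using a plain print call (the logging module would work just as well):

    def log_filter(items):
        # Print every item as it passes through and pass it along unchanged.
        for item in items:
            print(item)
            yield item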

Combining the filters

Finally, we need to combine the filters to create the pipeline. This is the job of the create_processing_pipeline function, which strings the individual filters together.
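One way to write such a function is to fold the filters over the initial input, so that the output generator of one filter becomes the input of the next:

    def create_processing_pipeline(filters, source):
        # Feed the source into the first filter; each filter's output
        # generator becomes the input of the next filter.
        generator = source
        for pipeline_filter in filters:
            generator = pipeline_filter(generator)
        return generator

Because every filter only returns a generator, nothing is read or computed until we call next(pipeline) on the result.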


2. Calculating the Number of Bytes Transferred by a Webserver

In the second example, we create a processing pipeline to calculate the total number of bytes served by an Apache webserver. The total is calculated by parsing the access logs.

The diagram below shows the processing pipeline for parsing the Apache webserver logs:

Calculating the total bytes served by the Apache webserver. Image by the author.

The creation of the pipeline looks a lot like the previous example. The first two filters iterate over the given access log filenames, open each file, and read it line by line.

A single line of the access log looks like this:

127.0.0.1 - - [10/May/2013:10:35:35 +0200] "GET /favicon.ico HTTP/1.1" 404 295

The number at the end, 295 in this case, indicates the number of bytes transferred by the webserver. When no bytes were transferred, the field contains a dash (“-”).

The bytes filter parses this line and returns the last part. The convert bytes filter converts the string into an integer. When it indicates “-”, the convert bytes filter returns zero.

The last filter, the sum filter, calculates the total bytes.

The individual filters

As in the first example, each filter is implemented using a Python function that accepts a generator and returns a generator, created using the yield statement.

Let’s look at the filters and examine their implementation.

File filter

The file filter is the same as in the previous example. It receives a list of filenames and returns opened files using yield.

Line filter

The line filter is responsible for reading the opened files line by line. It yields each line.
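A minimal sketch:

    def line_filter(files):
        # Read every opened access log file line by line.
        for file in files:
            for line in file:
                yield line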

Bytes filter

The bytes filter splits the line and returns the token at the end of the line. This token is either a number or a dash:

127.0.0.1 - - [10/May/2013:10:35:35 +0200] "GET /favicon.ico HTTP/1.1" 404 295

It uses rsplit to split the line from the right with a maximum of one split. That gives us two items, and the number of bytes is the item at index 1.
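A sketch of this filter:

    def bytes_filter(lines):
        # The transferred byte count (or a dash) is the last token of the line.
        for line in lines:
            yield line.rsplit(" ", 1)[1].strip()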

Convert bytes filter

The convert bytes filter is responsible for converting the bytes string into a number. The bytes string can contain a number or a dash.

I implemented this filter using a generator expression instead of the yield statement. Generator expressions look like list comprehensions but return a generator instead of a list. You convert a list comprehension into a generator expression by replacing the square brackets with parentheses.
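A sketch of what that can look like:

    def convert_bytes_filter(tokens):
        # A dash means no bytes were transferred, so count it as zero.
        return (0 if token == "-" else int(token) for token in tokens)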

Sum filter

The last filter is the sum filter. It simply returns the sum using the yield statement.
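In this pipeline, it can be as simple as this sketch:

    def sum_filter(byte_counts):
        # Consume the whole stream of numbers and yield a single total.
        yield sum(byte_counts)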

Combining the filters

The filters are combined the same way as in the previous example. If we run the script, we get the following output:

Executing the processing pipelines. Image by the author.

3. Calculating the Total Size of All Files in a Directory

In this third and last example, we create a processing pipeline that calculates the total size of all the files in a directory.

The diagram below shows the processing pipeline for calculating the sum of all files in a directory:

Calculating the total size of all files in a directory. Image by the author.

Again, the implementation looks a lot like the previous examples, but with different filters. The first filter, collect_filter, recursively collects all the files in a directory.

The size_filter receives a filename and determines the size of the file. The last filter, the sum_filter, calculates the total size.

The individual filters

As with the other two examples, each filter is implemented using a Python function that accepts and returns a generator. Let’s look at the filters and examine their implementation.

Collect filter

The collect filter is responsible for collecting all the files in a directory. To collect the files, we use iglob from the glob module. This function can recursively search through a directory and returns a generator.

Because I want to include hidden files, I chain two glob calls together; the second one matches the hidden files. The result is an iterator that delivers all the file names.
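A sketch of this filter; the exact glob patterns are an assumption:

    import os
    from glob import iglob
    from itertools import chain

    def collect_filter(directory):
        # Recursively yield every file in the directory, including hidden
        # files, which iglob only matches with an explicit dot pattern.
        visible = iglob(os.path.join(directory, "**", "*"), recursive=True)
        hidden = iglob(os.path.join(directory, "**", ".*"), recursive=True)
        for path in chain(visible, hidden):
            if os.path.isfile(path):
                yield path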

Size filter

The size filter is responsible for getting the size of the file. It uses os.path.getsize() to retrieve the size of the file in bytes.
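A minimal sketch:

    import os

    def size_filter(filenames):
        # Yield the size in bytes of every file that flows through.
        for filename in filenames:
            yield os.path.getsize(filename)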

Sum filter

The last filter is the sum filter. It simply returns the sum of all sizes using the yield statement.

Combining the filters

We combine the individual filters the same way as in the previous pipelines. If we run the script, we get the following output:

Executing the processing pipelines. Image by the author.

Conclusion

After going over these three examples, I hope you agree that Python generators are a powerful concept.

Generators allow you to decouple iteration from the code that uses the results of the iteration. Because the pipeline handles a single item at a time, there is no need to load an entire dataset into memory, so the size of the data you can process is not limited by your machine’s memory.

All three examples use a common way to define and connect the filters using Python functions and the yield statement. With this structure, it is easy to construct different pipelines by modifying or rearranging the filters.

You can find the source code of the examples in this GitHub repository.

Thanks for reading. I hope this article gives you the inspiration to create your own processing pipelines. Let me know what types of pipelines you created by leaving a response below.