
Announcing fastparquet

written by Martin Durant on 2016-12-06


A compliant, flexible and speedy interface to Parquet format files for Python. It provides seamless translation between in-memory pandas DataFrames and on-disc storage.

We will introduce the two functions that will most commonly be used within fastparquet, followed by a discussion of the current Big Data landscape, Python's place within it, and then details of how fastparquet fills one of the gaps on the way to building out a full end-to-end Big Data pipeline in Python.

Teaser

New users of fastparquet will mainly use the functions write and ParquetFile.to_pandas. Both functions offer good performance with default values and both have a number of options to improve performance further.

import fastparquet

# write data
fastparquet.write('out.parq', df, compression='SNAPPY')

# load data
pfile = fastparquet.ParquetFile('out.parq')
df2 = pfile.to_pandas()  # all columns
df3 = pfile.to_pandas(columns=['floats', 'times']) # pick some columns
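
For example, a few of those options (a sketch: the keyword names follow the fastparquet documentation, the 'codes' column is hypothetical, and defaults may differ between versions):

# writing: declare that the columns contain no NULLs, fix the width of a
# text column, and control how many rows go into each row-group
fastparquet.write('out.parq', df, compression='SNAPPY',
                  has_nulls=False,             # skip NULL handling entirely
                  fixed_text={'codes': 8},     # hypothetical fixed-width string column
                  row_group_offsets=5000000)   # rows per row-group

# reading: load a low-cardinality column directly as a pandas category
dfc = pfile.to_pandas(columns=['codes'], categories=['codes'])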

Introduction: Python and big data

Python was named a favourite tool for data science by 45% of data scientists in 2016. Many reasons can be given for this, and near the top are:

"Big data", however, has been typically based on traditional databases and, in latter years, the Hadoop ecosystem. Hadoop provides a distributed file-system, cluster resource management (YARN, Mesos), and a set of frameworks for processing data (map-reduce, pig, kafka, and many more). In the past few years, Spark has rapidly increased in usage to become a major force, even though 62% of users use Python to execute Spark jobs (via pySpark).

The Hadoop ecosystem and its tools, including Spark, are heavily based around the Java Virtual Machine (JVM), which creates a gap between the familiar, rich Python data ecosystem and clustered big data with Hadoop. One such missing piece is a data format which can efficiently store large amounts of tabular data, in a columnar layout, and split it into blocks on a distributed file-system.

Parquet has become the de-facto standard file format for tabular data in Spark, Impala and other clustered frameworks. Parquet provides several advantages relevant to big-data processing:

fastparquet bridges the gap to provide native Python read and write access with no need to use Java.

Until now, Spark's Python interface provided the only way to write Parquet files from Python. Much of the time in that route is spent de/serializing the data across the Java-Python bridge, and note that the times column comes back as plain integers rather than the correct datetime type. Not only does fastparquet provide native access to Parquet files, it in fact makes the transfer of data to Spark much faster.

# make a large-ish DataFrame
import pandas as pd
import numpy as np
N = 10000000

df = pd.DataFrame({'ints': np.random.randint(0, 1000, size=N),
                   'floats': np.random.randn(N),
                   'times': pd.date_range('1980', freq='s', periods=N)})

# start a local Spark context and SQL context
import pyspark
sc = pyspark.SparkContext()
sql = pyspark.SQLContext(sc)

The default Spark single-machine configuration cannot handle the above DataFrame (out-of-memory error), so we perform the timing on 1/10 of the data:

# sending data to spark via pySpark serialization, 1/10 of the data
%time o = sql.createDataFrame(df[::10]).count()
CPU times: user 3.45 s, sys: 96.6 ms, total: 3.55 s
Wall time: 4.14 s
%%time
# sending data to spark via a file made with fastparquet, all the data
fastparquet.write('out.parq', df, compression='SNAPPY')
df4 = sql.read.parquet('out.parq').count()
CPU times: user 2.75 s, sys: 285 ms, total: 3.04 s
Wall time: 3.27 s

The fastparquet library

fastparquet is an open source library providing a Python interface to the Parquet file format. It uses Numba and NumPy to provide speed, and writes data to and from pandas DataFrames, the most typical starting point for Python data science operations.

fastparquet can be installed using conda:

> conda install -c conda-forge fastparquet

(currently only available for Python 3)

Bleeding edge installation directly from the GitHub repo is also supported, so long as Numba, pandas, pytest and ThriftPy are installed.
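
A pip-based install is also possible; a sketch, assuming the package is on PyPI and the repository lives under the dask organization on GitHub:

> pip install fastparquet

> pip install git+https://github.com/dask/fastparquet   # bleeding edge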

Reading parquet files into pandas is simple and, again, much faster than via pySpark serialization.

import fastparquet
pfile = fastparquet.ParquetFile('out.parq')
%time df2 = pfile.to_pandas()
CPU times: user 812 ms, sys: 291 ms, total: 1.1 s
Wall time: 1.1 s

The parquet format is more compact and faster to load than the ubiquitous CSV format.

df.to_csv('out.csv')
!du -sh out.csv out.parq
490M    out.csv
162M    out.parq

In this case, the data is 229MB in memory, and this translates to 162MB on-disc as Parquet or 490MB as CSV. Loading from CSV takes substantially longer than from Parquet.

%time df2 = pd.read_csv('out.csv', parse_dates=True)
CPU times: user 9.85 s, sys: 1 s, total: 10.9 s
Wall time: 10.9 s

The biggest advantage, however, is the ability to pick only some columns of interest. In CSV this still means scanning through the whole file (even if not all the values are parsed), but the columnar nature of Parquet means only reading the data you need.

%time df3 = pd.read_csv('out.csv', usecols=['floats'])
CPU times: user 4.04 s, sys: 176 ms, total: 4.22 s
Wall time: 4.22 s
%time df3 = pfile.to_pandas(columns=['floats'])
CPU times: user 40 ms, sys: 96.9 ms, total: 137 ms
Wall time: 137 ms

Example

We have taken the airlines dataset and converted it into Parquet format using fastparquet. The original data was in CSV format, one file per year, 1987-2008. The total data size is 11GB as CSV, uncompressed, which becomes about double that in memory as a pandas DataFrame for typical dtypes. This is approaching, if not Big Data, then Sizable Data, because it cannot fit into my machine's memory.

The Parquet data is stored as a multi-file dataset. The total size is 2.5GB, with Snappy compression throughout.

ls airlines-parq/
_common_metadata  part.12.parquet   part.18.parquet   part.4.parquet
_metadata         part.13.parquet   part.19.parquet   part.5.parquet
part.0.parquet    part.14.parquet   part.2.parquet    part.6.parquet
part.1.parquet    part.15.parquet   part.20.parquet   part.7.parquet
part.10.parquet   part.16.parquet   part.21.parquet   part.8.parquet
part.11.parquet   part.17.parquet   part.3.parquet    part.9.parquet
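
A layout like this can be produced with fastparquet's write function by choosing the multi-file ("hive") scheme, in which each row-group is written to its own part.N.parquet file and the footer information for all parts is collected into _metadata. A minimal sketch, assuming the combined airlines data is held in a single DataFrame df_airlines (a hypothetical name):

import fastparquet

# write a directory containing part.N.parquet files plus the
# _metadata/_common_metadata summary files, one part per row-group
fastparquet.write('airlines-parq', df_airlines, file_scheme='hive',
                  compression='SNAPPY')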

To load the metadata:

import fastparquet
pf = fastparquet.ParquetFile('airlines-parq')

The ParquetFile instance provides various information about the data-set in attributes:

pf.info      # summary information about the data-set
pf.schema    # the Parquet schema
pf.dtypes    # the dtypes the columns will have when loaded into pandas
pf.count     # total number of rows

Furthermore, we have information available about the "row-groups" (logical chunks) and the 29 column fragments contained within each. In this case, we have one row-group for each of the original CSV files, that is, one per year.
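
For instance (a sketch; these attribute names follow the fastparquet API at the time of writing):

len(pf.row_groups)   # number of row-groups, here one per original CSV file
pf.columns           # the 29 column names available in every row-group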

fastparquet will not generally be as fast as a direct memory dump such as numpy.save or Feather, nor will it be as fast or compact as custom tuned formats like bcolz. However, it provides good trade-offs and options which can be tuned to the nature of the data. For example, the column/row-group chunking of the data allows pre-selection of only some portions of the total, so that the other parts need never be read from disc at all. The load speed will depend on the data-type of the column, the efficiency of compression, and whether there are any NULLs.

There is, in general, a trade-off between compression and processing speed: uncompressed will tend to be faster but larger on disc, and gzip compression will be the most compact and the slowest. Snappy compression, in this example, provides moderate space efficiency without too much processing cost.
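
Compression is selected at write time and, assuming the write function accepts a dict mapping column names to codecs (as the fastparquet documentation describes), it can be set per column; a sketch using the example DataFrame from earlier:

# per-column compression: fastest read-back for the floats, most compact
# storage for the timestamps, and a middle ground for the ints
fastparquet.write('out2.parq', df,
                  compression={'floats': 'UNCOMPRESSED',
                               'times': 'GZIP',
                               'ints': 'SNAPPY'})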

fastparquet has no problem loading a very large number of rows or columns (memory allowing).

%%time
# 124M bool values
d = pf.to_pandas(columns=['Cancelled'])
CPU times: user 436 ms, sys: 167 ms, total: 603 ms
Wall time: 620 ms
%%time
d = pf.to_pandas(columns=['Distance'])
CPU times: user 964 ms, sys: 466 ms, total: 1.43 s
Wall time: 1.47 s
%%time
# just the first portion of the data, 1.3M rows, 29 columns.
d = pf.to_pandas(filters=(('Year', '==', 1987), ))
CPU times: user 1.37 s, sys: 212 ms, total: 1.58 s
Wall time: 1.59 s

The following factors are known to reduce performance:

The Python big-data ecosystem

fastparquet provides one of the necessary links for Python to be a first-class citizen within Big Data processing. Although useful alone, it is intended to work seamlessly with the following libraries:

With the blossoming of interactive visualization technologies for Python, the prospect of end-to-end big data processing projects is now fully realizable.

Status and plans

As of the publication of this article, the library can be considered beta: useful to the general public and able to cope with many situations, but with some caveats (see below). Please try it for your own use case and report issues and comments on the GitHub tracker. The code will continue to develop (contributions welcome), and we will endeavour to keep the documentation in sync and provide regular updates.

A number of nice-to-haves are planned, and work to improve the performance should be completed by about the new year, 2017.

Further information

We don't have the space to cover it here, but the documentation at RTD gives further details on:

Caveats

Aside from the performance pointers above, some specific things do not work in fastparquet, and for some of these, fixes are not planned unless there is substantial community interest.

This work is fully open source (Apache-2.0), and contributions are welcome.

Development of the library has been supported by Continuum Analytics.