
Moving to Google Compute and Storage

written by Martin Durant on 2017-03-23

Many of our cloud-based dask-distributed examples have taken place on Amazon EC2, with longer-term storage in S3. We have built out tools such as dask-ec2 to get up and running quickly.

With increased interest at Continuum in a Google-based solution, and particularly in Kubernetes container workflows for Anaconda Enterprise 5, this is a very short experiment to see whether we can get a working environment up and running just as easily as we could on EC2.

This blog demonstrates the use with dask-distributed of: launching a cluster with Kubernetes on Google's infrastructure; gcsfs, for reading and writing data on Google Cloud Storage (GCS); and fastparquet, for writing and inspecting parquet data on GCS.

Launching the cluster

From the local CLI, this was as simple as

> source scripts/make_cluster.sh
> kubectl get pods       #  wait until pods are alive
> kubectl get services   #  list assigned IPs

... and then, in the browser, go to the external IP of the jupyter-notebook service on port 8888. Note that the password was baked into the supplied config/jupyter_notebook_config.py in the Docker image.
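
For reference, the password stanza in a Jupyter notebook config typically looks something like the minimal sketch below; this is illustrative only, not the actual file shipped in the image.

# illustrative jupyter_notebook_config.py snippet (not the actual file from the image)
from notebook.auth import passwd

c = get_config()                       # provided in the config file's namespace by Jupyter
c.NotebookApp.ip = '0.0.0.0'           # listen on all interfaces inside the pod
c.NotebookApp.port = 8888
c.NotebookApp.password = passwd('choose-a-password')  # or paste a pre-computed 'sha1:...' hash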

Transferring data

Let's take one of the typical datasets often referenced in dataframe examples: NYC taxi data. In this case we pick just the 2014 "green" data, stored in S3 as CSVs. This should seem familiar to anyone who has come across dask-dataframe workflows before. We use s3fs for access.

import s3fs
s3 = s3fs.S3FileSystem(anon=True)
# total size of only the greens: ~4GB
print(sum(size for key, size in s3.du('blaze-data/nyc-taxi/csv/').items() if 'green_' in key) / 2**30)
3.9011352648958564

s3.du('blaze-data/nyc-taxi/csv')
{'blaze-data/nyc-taxi/csv/green_tripdata_2014-01.csv': 129441669,
 'blaze-data/nyc-taxi/csv/green_tripdata_2014-02.csv': 162233821,
 'blaze-data/nyc-taxi/csv/green_tripdata_2014-03.csv': 209397471,
...
# Structure of the data
print(s3.head('blaze-data/nyc-taxi/csv/green_tripdata_2014-01.csv').decode())
lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Total_amount,Payment_type,Distance_between_service,Time_between_service,Trip_type
2014-01-01 00:00:00,2014-01-01 01:08:06,N,1,0,0,-73.865043640136719,40.872306823730469,1,6.47,20,0.5,0.5,0,0,,21,1,1.00,0,
2014-01-01 00:00:00,2014-01-01 06:03:57,N,2,0,0,-73.7763671875,40.645488739013672,1,20.12,52,0,0.5,0,5.33,,57.83,1,.00,0,
2014-01-01 00:00:00,2014-01-01 18:22:44,N,1,0,0,-73.932647705078125,40.852573394775391,2,.81,5,0.5,0.5,0,0,,6,1,21.00,0,
2014-01-01 00:00:00,2014-01-01 00:52:03,N,1,0,0,-73.99407958984375,40.749092102050781,1,9.55,33.5,0.5,0.5,2.17,5.33,,42,1,.00,0,
2014-01-01 00:00:00,2014-01-01 00:49:25,N,1,0,0,-73.936065673828125,40.734725952148437,1,1.22,7,0.5,0.5,2,0,,10,1,.00,0,
2014-01-01 00:00:00,2014-01-01 00:01:15,N,1,0,0,-73.912155151367188,40.684059143066406,2,
# prescribe the dataframe with some typical CSV ingest options
import dask.dataframe as dd
dtype={'Store_and_fwd_flag': 'category', 'Passenger_count': 'uint8', 'Payment_type': 'category', 'Trip_type': 'float32', 'RateCodeID': 'category'}
dtype.update({f: 'float32' for f in ['Fare_aamount', 'Extra', 'MTA_tax', 'Tip_amount', 'Tolls_amount', 'Ehail_fee', 'Total_amount', 'Trip_distance', 'Distance_between_service']})
df = dd.read_csv('s3://blaze-data/nyc-taxi/csv/green*csv', dtype=dtype,
                 parse_dates=['lpep_pickup_datetime', 'Lpep_dropoff_datetime'],
                 storage_options={'anon': True}, blocksize=1000000000)
# blocksize to control number of rows per partition
df.dtypes
lpep_pickup_datetime        datetime64[ns]
Lpep_dropoff_datetime       datetime64[ns]
Store_and_fwd_flag                category
RateCodeID                        category
Pickup_longitude                   float64
Pickup_latitude                    float64
Dropoff_longitude                  float64
Dropoff_latitude                   float64
Passenger_count                      uint8
Trip_distance                      float32
Fare_amount                        float64
Extra                              float32
MTA_tax                            float32
Tip_amount                         float32
Tolls_amount                       float32
Ehail_fee                          float32
Total_amount                       float32
Payment_type                      category
Distance_between_service           float32
Time_between_service                 int64
Trip_type                          float32
dtype: object
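
As a quick sanity check (not part of the original run), the number of partitions implied by the blocksize can be inspected before computing anything:

# each ~1GB block of CSV text becomes one partition; blocks never span file boundaries
df.npartitions
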
# Use a token stored on the local filesystem to log into GCS
import gcsfs
import json
token = json.load(open('application_default_credentials.json'))
gcs = gcsfs.GCSFileSystem(project='continuum-compute', token=token)
# Perform operations on the whole cluster
# 'dask-scheduler' is a resolvable service name within the cluster
from dask.distributed import Client, progress
c = Client('dask-scheduler:8786')
c
<Client: scheduler='tcp://dask-scheduler:8786' processes=10 cores=20>

Finally, an all-in-one task to read from S3, parse the CSV data, convert it to parquet (with snappy compression) and write it to GCS.

df.to_parquet('gcs://temporary_output/NYC.parq', compression='SNAPPY',
              storage_options={'project': 'continuum-compute', 'token': token})
# and we see files! Total size: ~1GB
gcs.du('temporary_output/NYC.parq'), gcs.du('temporary_output/NYC.parq', total=True) / 2**30
({'temporary_output/NYC.parq/_common_metadata': 542,
  'temporary_output/NYC.parq/_metadata': 38492,
  'temporary_output/NYC.parq/part.0.parquet': 36174840,
  ...},
 1.0810074405744672)

Investigate the output

We can choose to interrogate a parquet dataset on GCS directly with fastparquet: notice that we know the data-types, total number of rows and any partitioning before loading any of the data.

import fastparquet
pf = fastparquet.ParquetFile('temporary_output/NYC.parq', open_with=gcs.open)
pf
<Parquet File: {'name': 'temporary_output/NYC.parq/_metadata', 'columns': ['lpep_pickup_datetime', 'Lpep_dropoff_datetime', 'Store_and_fwd_flag', 'RateCodeID', 'Pickup_longitude', 'Pickup_latitude', 'Dropoff_longitude', 'Dropoff_latitude', 'Passenger_count', 'Trip_distance', 'Fare_amount', 'Extra', 'MTA_tax', 'Tip_amount', 'Tolls_amount', 'Ehail_fee', 'Total_amount', 'Payment_type', 'Distance_between_service', 'Time_between_service', 'Trip_type'], 'categories': [], 'rows': 25732985}>
len(pf.row_groups), pf.dtypes, pf.statistics['max']['Lpep_dropoff_datetime']
(18,
 {'Distance_between_service': dtype('float32'),
  'Dropoff_latitude': dtype('float64'),
  'Dropoff_longitude': dtype('float64'),
  'Ehail_fee': dtype('float32'),
  'Extra': dtype('float32'),
  'Fare_amount': dtype('float64'),
  'Lpep_dropoff_datetime': dtype('<M8[ns]'),
  'MTA_tax': dtype('float32'),
  'Passenger_count': dtype('uint8'),
  'Payment_type': dtype('O'),
  'Pickup_latitude': dtype('float64'),
  'Pickup_longitude': dtype('float64'),
  'RateCodeID': dtype('O'),
  'Store_and_fwd_flag': dtype('O'),
  'Time_between_service': dtype('int64'),
  'Tip_amount': dtype('float32'),
  'Tolls_amount': dtype('float32'),
  'Total_amount': dtype('float32'),
  'Trip_distance': dtype('float32'),
  'Trip_type': dtype('float32'),
  'lpep_pickup_datetime': dtype('<M8[ns]')},
 [numpy.datetime64('2014-02-01T23:28:00.000000000'),
  numpy.datetime64('2014-03-01T23:43:57.000000000'),
  ...])
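
Since the metadata already tells us the schema and row count, we can also pull just a column or two into pandas without touching the rest of the data; a minimal sketch, not from the original run:

# read only the listed columns' chunks from GCS into a pandas DataFrame
small = pf.to_pandas(columns=['Passenger_count', 'Tip_amount'])
small.shape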

Or we can prescribe a new dask dataframe for the GCS parquet version, as we did initially for the S3 CSV version. The first few rows look identical, except that 'lpep_pickup_datetime', which happened to be the natural ordering of the data, has been automatically chosen as the index. This could have been avoided, but is probably a fine choice.

df2 = dd.read_parquet('gcs://temporary_output/NYC.parq', categories=['Payment_type'],
                      storage_options={'project': 'continuum-compute'})
# the 'categories' keyword will not be necessary very soon.
df2.head()
Lpep_dropoff_datetime Store_and_fwd_flag RateCodeID Pickup_longitude Pickup_latitude Dropoff_longitude Dropoff_latitude Passenger_count Trip_distance Fare_amount Extra MTA_tax Tip_amount Tolls_amount Ehail_fee Total_amount Payment_type Distance_between_service Time_between_service Trip_type
lpep_pickup_datetime
2014-01-01 2014-01-01 01:08:06 N 1 0.0 0.0 -73.865044 40.872307 1 6.470000 20.0 0.5 0.5 0.00 0.00 NaN 21.000000 1 1.0 0 NaN
2014-01-01 2014-01-01 06:03:57 N 2 0.0 0.0 -73.776367 40.645489 1 20.120001 52.0 0.0 0.5 0.00 5.33 NaN 57.830002 1 0.0 0 NaN
2014-01-01 2014-01-01 18:22:44 N 1 0.0 0.0 -73.932648 40.852573 2 0.810000 5.0 0.5 0.5 0.00 0.00 NaN 6.000000 1 21.0 0 NaN
2014-01-01 2014-01-01 00:52:03 N 1 0.0 0.0 -73.994080 40.749092 1 9.550000 33.5 0.5 0.5 2.17 5.33 NaN 42.000000 1 0.0 0 NaN
2014-01-01 2014-01-01 00:49:25 N 1 0.0 0.0 -73.936066 40.734726 1 1.220000 7.0 0.5 0.5 2.00 0.00 NaN 10.000000 1 0.0 0 NaN
df.head()  # S3 CSV version
lpep_pickup_datetime Lpep_dropoff_datetime Store_and_fwd_flag RateCodeID Pickup_longitude Pickup_latitude Dropoff_longitude Dropoff_latitude Passenger_count Trip_distance ... Extra MTA_tax Tip_amount Tolls_amount Ehail_fee Total_amount Payment_type Distance_between_service Time_between_service Trip_type
0 2014-01-01 2014-01-01 01:08:06 N 1 0.0 0.0 -73.865044 40.872307 1 6.470000 ... 0.5 0.5 0.00 0.00 NaN 21.000000 1 1.0 0 NaN
1 2014-01-01 2014-01-01 06:03:57 N 2 0.0 0.0 -73.776367 40.645489 1 20.120001 ... 0.0 0.5 0.00 5.33 NaN 57.830002 1 0.0 0 NaN
2 2014-01-01 2014-01-01 18:22:44 N 1 0.0 0.0 -73.932648 40.852573 2 0.810000 ... 0.5 0.5 0.00 0.00 NaN 6.000000 1 21.0 0 NaN
3 2014-01-01 2014-01-01 00:52:03 N 1 0.0 0.0 -73.994080 40.749092 1 9.550000 ... 0.5 0.5 2.17 5.33 NaN 42.000000 1 0.0 0 NaN
4 2014-01-01 2014-01-01 00:49:25 N 1 0.0 0.0 -73.936066 40.734726 1 1.220000 ... 0.5 0.5 2.00 0.00 NaN 10.000000 1 0.0 0 NaN

5 rows × 21 columns
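
As noted above, promoting 'lpep_pickup_datetime' to the index could have been avoided; a minimal sketch, assuming the index keyword of dd.read_parquet accepts False in the installed version:

# read the parquet data back without setting any column as the index
df2_noindex = dd.read_parquet('gcs://temporary_output/NYC.parq',
                              categories=['Payment_type'], index=False,
                              storage_options={'project': 'continuum-compute'})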

Test

Let's do some quick timing tests to show that this was generally worthwhile.

%time len(df.Passenger_count)  # data from S3 CSV
Wall time: 20 s

25732985
%time len(df2.Passenger_count)  # data from GCS parquet
Wall time: 2.52 s

25732985
%%time
# data from S3 CSV
df['tip_percent'] = df.Tip_amount / df.Total_amount * 100
df.groupby(df.Payment_type).tip_percent.mean().compute()
Wall time: 20.1 s

Payment_type
1    13.963146
2     0.000032
3     0.151614
4     0.022007
5     0.000000
Name: tip_percent, dtype: float64
%%time
# data from GCS parquet
df2['tip_percent'] = df2.Tip_amount / df2.Total_amount * 100
df2.groupby(df2.Payment_type).tip_percent.mean().compute()
Wall time: 8.5 s

Payment_type
1    13.963146
2     0.000032
3     0.151614
4     0.022007
5     0.000000
Name: tip_percent, dtype: float64

So we see a speedup of between roughly 2.4× (for the multi-column groupby aggregation) and 8× (for the single-column length count), depending on how many columns were accessed.

Conclusions

The purpose of this post was to show that we can get up and running on Google infrastructure just as easily as we could on EC2, and quickly do some real dask data processing. This worked pretty well, both for setting up an environment and for interfacing with GCS as an alternative to S3.

Further thoughts: