Many of our cloud-based dask-distributed examples have taken place on Amazon EC2, with longer-term storage in S3, and we have built tools such as dask-ec2 to get up and running quickly.
With increased interest at Continuum in a Google-based solution, and particularly in Kubernetes container workflows for Anaconda Enterprise 5, this is a short experiment to see whether we can get a working environment up and running just as easily as we could on EC2.
This blog demonstrates the use of dask-distributed with:
From the local CLI, this was as simple as
> source scripts/make_cluster.sh
> kubectl get pods      # wait until pods are alive
> kubectl get services  # list assigned IPs
... and in the browser go to the external IP of the jupyter-notebook service, port 8888. Note that the password was baked into the supplied config/jupyter_notebook_config.py in the docker image.
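For reference, the relevant part of such a config looks something like the following. This is an illustrative sketch only; the actual file and password hash baked into the image will differ.

# config/jupyter_notebook_config.py -- illustrative sketch, not the exact file from the image
c = get_config()
c.NotebookApp.ip = '0.0.0.0'         # listen on all interfaces inside the container
c.NotebookApp.open_browser = False
c.NotebookApp.password = 'sha1:...'  # placeholder; hash generated with notebook.auth.passwd()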
Let's take one of the typical datasets often referenced in dataframe examples: NYC taxi data. In this case we pick just 2014 "green" data, stored in S3 as CSVs. This should seem familiar to anyone who has come across dask-dataframe workflows before. We use s3fs for access.
import s3fs
s3 = s3fs.S3FileSystem(anon=True)
# total size of only the greens: ~4GB
print(sum(size for key, size in s3.du('blaze-data/nyc-taxi/csv/').items() if 'green_' in key) / 2**30)
s3.du('blaze-data/nyc-taxi/csv')
3.9011352648958564
{'blaze-data/nyc-taxi/csv/green_tripdata_2014-01.csv': 129441669,
'blaze-data/nyc-taxi/csv/green_tripdata_2014-02.csv': 162233821,
'blaze-data/nyc-taxi/csv/green_tripdata_2014-03.csv': 209397471,
...
print(s3.head('blaze-data/nyc-taxi/csv/green_tripdata_2014-01.csv').decode()) # Structure of the data
lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,Total_amount,Payment_type,Distance_between_service,Time_between_service,Trip_type
2014-01-01 00:00:00,2014-01-01 01:08:06,N,1,0,0,-73.865043640136719,40.872306823730469,1,6.47,20,0.5,0.5,0,0,,21,1,1.00,0,
2014-01-01 00:00:00,2014-01-01 06:03:57,N,2,0,0,-73.7763671875,40.645488739013672,1,20.12,52,0,0.5,0,5.33,,57.83,1,.00,0,
2014-01-01 00:00:00,2014-01-01 18:22:44,N,1,0,0,-73.932647705078125,40.852573394775391,2,.81,5,0.5,0.5,0,0,,6,1,21.00,0,
2014-01-01 00:00:00,2014-01-01 00:52:03,N,1,0,0,-73.99407958984375,40.749092102050781,1,9.55,33.5,0.5,0.5,2.17,5.33,,42,1,.00,0,
2014-01-01 00:00:00,2014-01-01 00:49:25,N,1,0,0,-73.936065673828125,40.734725952148437,1,1.22,7,0.5,0.5,2,0,,10,1,.00,0,
2014-01-01 00:00:00,2014-01-01 00:01:15,N,1,0,0,-73.912155151367188,40.684059143066406,2,
# prescribe the dataframe with some typical CSV ingest options
import dask.dataframe as dd

dtype = {'Store_and_fwd_flag': 'category', 'Passenger_count': 'uint8',
         'Payment_type': 'category', 'Trip_type': 'float32', 'RateCodeID': 'category'}
dtype.update({f: 'float32' for f in ['Fare_aamount', 'Extra', 'MTA_tax', 'Tip_amount',
                                     'Tolls_amount', 'Ehail_fee', 'Total_amount',
                                     'Trip_distance', 'Distance_between_service']})

df = dd.read_csv('s3://blaze-data/nyc-taxi/csv/green*csv', dtype=dtype,
                 parse_dates=['lpep_pickup_datetime', 'Lpep_dropoff_datetime'],
                 storage_options={'anon': True},
                 blocksize=1000000000)  # blocksize to control number of rows per partition
df.dtypes
lpep_pickup_datetime datetime64[ns]
Lpep_dropoff_datetime datetime64[ns]
Store_and_fwd_flag category
RateCodeID category
Pickup_longitude float64
Pickup_latitude float64
Dropoff_longitude float64
Dropoff_latitude float64
Passenger_count uint8
Trip_distance float32
Fare_amount float64
Extra float32
MTA_tax float32
Tip_amount float32
Tolls_amount float32
Ehail_fee float32
Total_amount float32
Payment_type category
Distance_between_service float32
Time_between_service int64
Trip_type float32
dtype: object
# Test token on the local filesystem to log into GCS
import gcsfs
import json

token = json.load(open('application_default_credentials.json'))
gcs = gcsfs.GCSFileSystem(project='continuum-compute', token=token)
# Perform operations on the whole cluster
# 'dask-scheduler' is a known IP within a container
from dask.distributed import Client, progress

c = Client('dask-scheduler:8786')
c
<Client: scheduler='tcp://dask-scheduler:8786' processes=10 cores=20>
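Note that `progress` was imported above but not otherwise used in this session; a typical pattern would be to persist a collection into distributed memory and watch the work complete. A minimal sketch (the name `df_in_memory` is just for illustration, and this was not part of the timed run below):

# keep the CSV-backed dataframe in cluster memory; persist returns immediately
df_in_memory = c.persist(df)
# live progress bar for the background work
progress(df_in_memory)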
Finally, an all-in-one task to read from S3, parse the data, convert to parquet (with snappy compression), and write to GCS:
df.to_parquet('gcs://temporary_output/NYC.parq', compression='SNAPPY', storage_options={'project': 'continuum-compute', 'token': token})
# and we see files! Total size: ~1GB
gcs.du('temporary_output/NYC.parq'), gcs.du('temporary_output/NYC.parq', total=True) / 2**30
({'temporary_output/NYC.parq/_common_metadata': 542,
'temporary_output/NYC.parq/_metadata': 38492,
'temporary_output/NYC.parq/part.0.parquet': 36174840,
...},
1.0810074405744672)
We can choose to interrogate a parquet dataset on GCS directly with fastparquet: notice that we know the data-types, total number of rows and any partitioning before loading any of the data.
import fastparquet

pf = fastparquet.ParquetFile('temporary_output/NYC.parq', open_with=gcs.open)
pf
<Parquet File: {'name': 'temporary_output/NYC.parq/_metadata', 'columns': ['lpep_pickup_datetime', 'Lpep_dropoff_datetime', 'Store_and_fwd_flag', 'RateCodeID', 'Pickup_longitude', 'Pickup_latitude', 'Dropoff_longitude', 'Dropoff_latitude', 'Passenger_count', 'Trip_distance', 'Fare_amount', 'Extra', 'MTA_tax', 'Tip_amount', 'Tolls_amount', 'Ehail_fee', 'Total_amount', 'Payment_type', 'Distance_between_service', 'Time_between_service', 'Trip_type'], 'categories': [], 'rows': 25732985}>
len(pf.row_groups), pf.dtypes, pf.statistics['max']['Lpep_dropoff_datetime']
(18,
{'Distance_between_service': dtype('float32'),
'Dropoff_latitude': dtype('float64'),
'Dropoff_longitude': dtype('float64'),
'Ehail_fee': dtype('float32'),
'Extra': dtype('float32'),
'Fare_amount': dtype('float64'),
'Lpep_dropoff_datetime': dtype('<M8[ns]'),
'MTA_tax': dtype('float32'),
'Passenger_count': dtype('uint8'),
'Payment_type': dtype('O'),
'Pickup_latitude': dtype('float64'),
'Pickup_longitude': dtype('float64'),
'RateCodeID': dtype('O'),
'Store_and_fwd_flag': dtype('O'),
'Time_between_service': dtype('int64'),
'Tip_amount': dtype('float32'),
'Tolls_amount': dtype('float32'),
'Total_amount': dtype('float32'),
'Trip_distance': dtype('float32'),
'Trip_type': dtype('float32'),
'lpep_pickup_datetime': dtype('<M8[ns]')},
[numpy.datetime64('2014-02-01T23:28:00.000000000'),
numpy.datetime64('2014-03-01T23:43:57.000000000'),
...])
Or we can prescribe a new dask dataframe for the GCS parquet version, as we did initially for the S3 CSV version. The first few rows look identical, except that 'lpep_pickup_datetime', which happened to be the natural ordering of the data, has been automatically chosen as the index. This could have been avoided, but is probably a fine choice.
# the 'categories' keyword will not be necessary very soon
df2 = dd.read_parquet('gcs://temporary_output/NYC.parq', categories=['Payment_type'],
                      storage_options={'project': 'continuum-compute'})
df2.head()
lpep_pickup_datetime | Lpep_dropoff_datetime | Store_and_fwd_flag | RateCodeID | Pickup_longitude | Pickup_latitude | Dropoff_longitude | Dropoff_latitude | Passenger_count | Trip_distance | Fare_amount | Extra | MTA_tax | Tip_amount | Tolls_amount | Ehail_fee | Total_amount | Payment_type | Distance_between_service | Time_between_service | Trip_type
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
2014-01-01 | 2014-01-01 01:08:06 | N | 1 | 0.0 | 0.0 | -73.865044 | 40.872307 | 1 | 6.470000 | 20.0 | 0.5 | 0.5 | 0.00 | 0.00 | NaN | 21.000000 | 1 | 1.0 | 0 | NaN |
2014-01-01 | 2014-01-01 06:03:57 | N | 2 | 0.0 | 0.0 | -73.776367 | 40.645489 | 1 | 20.120001 | 52.0 | 0.0 | 0.5 | 0.00 | 5.33 | NaN | 57.830002 | 1 | 0.0 | 0 | NaN |
2014-01-01 | 2014-01-01 18:22:44 | N | 1 | 0.0 | 0.0 | -73.932648 | 40.852573 | 2 | 0.810000 | 5.0 | 0.5 | 0.5 | 0.00 | 0.00 | NaN | 6.000000 | 1 | 21.0 | 0 | NaN |
2014-01-01 | 2014-01-01 00:52:03 | N | 1 | 0.0 | 0.0 | -73.994080 | 40.749092 | 1 | 9.550000 | 33.5 | 0.5 | 0.5 | 2.17 | 5.33 | NaN | 42.000000 | 1 | 0.0 | 0 | NaN |
2014-01-01 | 2014-01-01 00:49:25 | N | 1 | 0.0 | 0.0 | -73.936066 | 40.734726 | 1 | 1.220000 | 7.0 | 0.5 | 0.5 | 2.00 | 0.00 | NaN | 10.000000 | 1 | 0.0 | 0 | NaN |
df.head() # S3 CSV version
 | lpep_pickup_datetime | Lpep_dropoff_datetime | Store_and_fwd_flag | RateCodeID | Pickup_longitude | Pickup_latitude | Dropoff_longitude | Dropoff_latitude | Passenger_count | Trip_distance | ... | Extra | MTA_tax | Tip_amount | Tolls_amount | Ehail_fee | Total_amount | Payment_type | Distance_between_service | Time_between_service | Trip_type
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 2014-01-01 | 2014-01-01 01:08:06 | N | 1 | 0.0 | 0.0 | -73.865044 | 40.872307 | 1 | 6.470000 | ... | 0.5 | 0.5 | 0.00 | 0.00 | NaN | 21.000000 | 1 | 1.0 | 0 | NaN |
1 | 2014-01-01 | 2014-01-01 06:03:57 | N | 2 | 0.0 | 0.0 | -73.776367 | 40.645489 | 1 | 20.120001 | ... | 0.0 | 0.5 | 0.00 | 5.33 | NaN | 57.830002 | 1 | 0.0 | 0 | NaN |
2 | 2014-01-01 | 2014-01-01 18:22:44 | N | 1 | 0.0 | 0.0 | -73.932648 | 40.852573 | 2 | 0.810000 | ... | 0.5 | 0.5 | 0.00 | 0.00 | NaN | 6.000000 | 1 | 21.0 | 0 | NaN |
3 | 2014-01-01 | 2014-01-01 00:52:03 | N | 1 | 0.0 | 0.0 | -73.994080 | 40.749092 | 1 | 9.550000 | ... | 0.5 | 0.5 | 2.17 | 5.33 | NaN | 42.000000 | 1 | 0.0 | 0 | NaN |
4 | 2014-01-01 | 2014-01-01 00:49:25 | N | 1 | 0.0 | 0.0 | -73.936066 | 40.734726 | 1 | 1.220000 | ... | 0.5 | 0.5 | 2.00 | 0.00 | NaN | 10.000000 | 1 | 0.0 | 0 | NaN |
5 rows × 21 columns
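As noted above, the automatic promotion of 'lpep_pickup_datetime' to the index could have been avoided; dask's read_parquet takes an index keyword for this. A sketch only, assuming index=False is supported by the installed version (not run here):

# keep a plain integer index instead of using the stored ordering column
df2_flat = dd.read_parquet('gcs://temporary_output/NYC.parq', index=False,
                           categories=['Payment_type'],
                           storage_options={'project': 'continuum-compute'})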
Let's do some quick timing tests to show that the conversion to parquet was generally worthwhile.
%time len(df.Passenger_count) # data from S3 CSV
Wall time: 20 s
25732985
%time len(df2.Passenger_count) # data from GCS parquet
Wall time: 2.52 s
25732985
%%time
# data from S3 CSV
df['tip_percent'] = df.Tip_amount / df.Total_amount * 100
df.groupby(df.Payment_type).tip_percent.mean().compute()
Wall time: 20.1 s
Payment_type
1 13.963146
2 0.000032
3 0.151614
4 0.022007
5 0.000000
Name: tip_percent, dtype: float64
%%time
# data from GCS parquet
df2['tip_percent'] = df2.Tip_amount / df2.Total_amount * 100
df2.groupby(df2.Payment_type).tip_percent.mean().compute()
Wall time: 8.5 s
Payment_type
1 13.963146
2 0.000032
3 0.151614
4 0.022007
5 0.000000
Name: tip_percent, dtype: float64
So we see a speedup of between a factor of 2.4 and 8, depending on how many columns were accessed: 20 s versus 2.52 s for the single-column count, and 20.1 s versus 8.5 s for the groupby.
The purpose of this post is to show that we can get up and running on Google infrastructure just as easily as we could on EC2, and quickly do some real dask data processing. This worked pretty well, both for setting up an environment and for interfacing with GCS as an alternative to S3.
Further thoughts: