In the last post I demonstrated a typical work-flow for processing data on Google Container Engine with Dask and fastparquet. A critical piece for making this quick and easy was dask-kubernetes, a simple set of scripts and templates for spinning up a distributed Dask cluster.
Since then, dask-kubernetes has progressed into a much more complete package, with a CLI and a variety of useful options. The repo has now graduated to the Dask organization, joining several other Dask deployment methods. Given anecdotal reports of GCE and GCS outperforming EC2 and S3, usage is expected to increase in the near future.
Dask is a parallel execution engine for Python, with a particular emphasis on data-processing and analytic workflows. It makes the transition from single-machine, in-memory computation with NumPy and pandas to out-of-core, parallelized and distributed (across a cluster) computation trivial in many cases, and also allows for complex custom parallel algorithm development. Kubernetes is a container orchestration system: it produces many isolated, identical runtimes from templates, distributed across a cluster of machines. The two play very nicely together: Kubernetes can easily create a set of homogeneous Dask workers on Google Container Engine (GCE) and present the user with both the scheduler endpoint, for external access, and an in-container Jupyter notebook environment.
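To make the pandas-to-Dask transition concrete, here is a minimal sketch; the file path and column names are hypothetical:

import dask.dataframe as dd

# looks like pandas, but the data is split into partitions that are
# processed in parallel, on one machine or across a cluster
df = dd.read_csv('data/records-*.csv')        # hypothetical path
means = df.groupby('key')['value'].mean()     # hypothetical columns

# nothing has run yet; .compute() triggers the actual parallel work
print(means.compute())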
Dask-kubernetes provides a simple command-line interface for setting up such clusters on GCE, with properties suitable for a wide variety of computational needs.
All the containers use an image which includes the typical Python numerical packages, as well as everything Dask users generally need for big-data applications, visualization, and interaction. The origin of the image is configurable, so interested users can also build their own.
One particularly nice feature is the inclusion of gcsfs, which has also graduated to the Dask organization on GitHub. In the clustered deployment, you can now pass token='cloud' when instantiating the file system, or when using a Dask IO function.
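A minimal sketch of both usages; the bucket and file names are hypothetical:

import gcsfs
import dask.dataframe as dd

# 'cloud' picks up credentials from the Google metadata service,
# which is reachable from inside the cluster's containers
fs = gcsfs.GCSFileSystem(token='cloud')
print(fs.ls('my-bucket'))                     # hypothetical bucket

# the same token can be passed through any Dask IO function
df = dd.read_parquet('gcs://my-bucket/data.parquet',
                     storage_options={'token': 'cloud'})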
A default configuration is provided in a simple YAML file. This can be copied, edited, and passed to dask-kubernetes at cluster-creation time; any settings not specified in the given file take their default values.
dask-kubernetes create my_new_cluster settings.yaml
In addition, individual parameters can be set on the command line. If the same setting is specified both in the file and on the command line, the command-line value takes precedence.
dask-kubernetes create -s jupyter-port=443 my_other_cluster
You can still use kubectl to interact directly with containers, but there should be much less cause to do so.
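For instance, a couple of standard invocations that remain handy (these are generic kubectl commands, not part of dask-kubernetes itself):

kubectl get pods                        # list scheduler, notebook and worker pods
kubectl logs <pod-name>                 # read a pod's output
kubectl exec -it <pod-name> -- bash     # open a shell inside a container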
A few highlights of the main commands:
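(Of these, only create appears verbatim in this post; the other sub-command names below are assumptions about the CLI surface, so check dask-kubernetes --help for the authoritative list.)

dask-kubernetes create my_cluster           # spin up a new cluster
dask-kubernetes info my_cluster             # show endpoints and status (assumed name)
dask-kubernetes resize my_cluster 10        # change the number of workers (assumed name)
dask-kubernetes delete my_cluster           # tear everything down (assumed name)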
Google Container Engine supports auto-scaling: when more pods are scheduled than the current nodes can accommodate, more nodes are spun up until all pods can be scheduled. Similarly, if pods are terminated, leaving nodes underutilized, pods will be moved so that a node is left empty of work, and that node will eventually be reaped. This means that, with auto-scaling enabled, one need only set the number of desired pods (workers) and the rest takes care of itself.
In dask-kubernetes, auto-scaling is controlled with the cluster.autoscaling parameter, for which the minimum and maximum number of permitted nodes should be given. Note that the Dask scheduler and Jupyter notebook are pinned to the first node, so that if Kubernetes decides to move pods around, those will not get moved and restarted. The Dask scheduler is resilient to worker restarts, and will rerun tasks if necessary to generate the requested output.
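As a sketch, enabling this at creation time might look like the following; only cluster.autoscaling is named in this post, so the value syntax and the min/max parameter names here are purely hypothetical:

dask-kubernetes create -s cluster.autoscaling=on -s cluster.min-nodes=2 -s cluster.max-nodes=10 my_scaling_cluster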
Note that GCE typically waits more than ten minutes between a node becoming idle and stopping it. This is partly to prevent rapid starting and stopping of nodes while the number of pods is in flux, and partly because of internal billing requirements.