\ Local Exceptions — Martin Durant

Martin Durant

Local Exceptions

written by Martin Durant on 2017-03-06

Replaying exceptions from a remote worker

If you are a spark user, or other programmer of distributed software, you will certainly have been in this situation. You have executed a computation across a cluster (or perhaps just in some processes on one machine), and something has gone awry. The screen shows a traceback, and probably you can find which line in which function failed, but you still don't know why it failed.

If this were not a distributed problem, you would turn to your favourite debugger, like pdb for python, and inspect the state of the system when the exception occurred. You can't do that here.

Dask's distributed scheduler is easy to hack for a distributed computation engine, and here I'll demonstrate a new feature which allows you to pull just the part of a failed computation responsible for an exception into the local thread, so that you can do real debugging.

Let's make an error

Here is a typical workflow: we may wrap functions in dask.delayed to define a computation for parallel execution. The output variable out (which hasn't been executed yet) carries the definition of all the tasks that are to be carried out.

from dask import delayed
from dask.distributed import Client
c = Client()  # default cluster, some local processes
def dec(x):
    return x - 1

def recip(x):
    return 1 / x

inputs = [9, 4, 2, 76, 2, 1, 5]

ddec = delayed(dec)
drecip = delayed(recip)
dsum = delayed(sum)

out = dsum([drecip(ddec(x)) for x in inputs])
dict(out.dask)
{'dec-2320282a-2fe3-4c08-8d05-ec7bcfa3c9a0': (<function __main__.dec>, 9),
 'dec-2c3e77ed-4d41-4d24-a77e-65ca2931d90b': (<function __main__.dec>, 2),
 'dec-73e20b21-c38c-4e2f-96e5-ce0b337e1a17': (<function __main__.dec>, 2),
 'dec-9574e24f-3df4-4b11-acf9-b31f0d831eda': (<function __main__.dec>, 5),
 'dec-bcf44c7d-5eee-4c69-befb-4bb673007a12': (<function __main__.dec>, 1),
 'dec-e0272ce2-ab4f-48b5-9c3a-3534d3d9d288': (<function __main__.dec>, 4),
 'dec-e9439552-450e-427f-a85d-ad1c2159c859': (<function __main__.dec>, 76),
 'recip-29bcb4aa-197d-4c61-bf22-001914b2fceb': (<function __main__.recip>,
  'dec-2320282a-2fe3-4c08-8d05-ec7bcfa3c9a0'),
 'recip-7497b4b9-43b7-49af-9b7d-aa3f91b5a78f': (<function __main__.recip>,
  'dec-e0272ce2-ab4f-48b5-9c3a-3534d3d9d288'),
 'recip-8cbece21-25cc-4a63-9585-905e99ef54ea': (<function __main__.recip>,
  'dec-bcf44c7d-5eee-4c69-befb-4bb673007a12'),
 'recip-9911ba10-cee0-40f1-a3b1-9df451d4724e': (<function __main__.recip>,
  'dec-e9439552-450e-427f-a85d-ad1c2159c859'),
 'recip-a329bb60-948d-4bc1-9024-8b8169e7a141': (<function __main__.recip>,
  'dec-9574e24f-3df4-4b11-acf9-b31f0d831eda'),
 'recip-cafbb029-33f9-40db-95e8-fc5d9dc80cb2': (<function __main__.recip>,
  'dec-73e20b21-c38c-4e2f-96e5-ce0b337e1a17'),
 'recip-d4c3b499-2106-47a7-ad5a-f512f1c62492': (<function __main__.recip>,
  'dec-2c3e77ed-4d41-4d24-a77e-65ca2931d90b'),
 'sum-ab3de01e-565f-407a-8563-e363f3e17138': (<function sum>,
  ['recip-29bcb4aa-197d-4c61-bf22-001914b2fceb',
   'recip-7497b4b9-43b7-49af-9b7d-aa3f91b5a78f',
   'recip-cafbb029-33f9-40db-95e8-fc5d9dc80cb2',
   'recip-9911ba10-cee0-40f1-a3b1-9df451d4724e',
   'recip-d4c3b499-2106-47a7-ad5a-f512f1c62492',
   'recip-8cbece21-25cc-4a63-9585-905e99ef54ea',
   'recip-a329bb60-948d-4bc1-9024-8b8169e7a141'])}

When we compute, we get an exception. Since this is a contrived case, the name and location of the exception are enough to have a good idea of what happened, but let's suppose we don't know.

out.compute()
...
<ipython-input-2-990705f387fe> in recip()
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

Debugging?

Actually, that's the stack-trace from the worker process, but we can't run debug now, the exception didn't hapen in our main process/thread. Until now, the recommended course of action would have been to execute the whole computation graph in the local thread as follows:

import dask
with dask.set_options(get=dask.async.get_sync):
    out.compute()
...
<ipython-input-2-990705f387fe> in recip(x)
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

Which does work, and allows us to debug

debug
> <ipython-input-2-990705f387fe>(5)recip()
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]

ipdb> p x
0
ipdb> q

...but the whole point of running a process through Dask is usually because we can't fit all the data in this one machine's memory, so pulling everything into one thread may well not be possible.

Another alternative: rewrite everything using the distributed client's map/submit model. Now when we have the error, we have kept pointers to all the intermediate tasks, so we can go back and find out which first failed and infer when that input to it must have been. We'd rather not have to go through this process.

futs1 = c.map(dec, inputs)
futs2 = c.map(recip, futs1)
fut3 = c.submit(sum, futs2)
c.gather(fut3)
...
<ipython-input-2-990705f387fe> in recip()
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero
# all the futures; note that all intermediates are kept in memory
# by the cluster.
futs1, futs2, fut3
# we can infer that recip-d6051c1952b3e227443c7bca1924d10b caused the problem,
# and that dec-610d6bc3a7402e586ae307a8a5da6cda was its input - which value
# was that?
([<Future: status: finished, type: int, key: dec-3867c905cd3db6d8a90a8bc3895adf16>,
  <Future: status: finished, type: int, key: dec-c0050b7dad0484be378457b1f04cd6d2>,
  <Future: status: finished, type: int, key: dec-2b0735c32fef85ee7864e4906c404348>,
  <Future: status: finished, type: int, key: dec-30f697d4b15a3fb82e63663f90e79028>,
  <Future: status: finished, type: int, key: dec-2b0735c32fef85ee7864e4906c404348>,
  <Future: status: finished, type: int, key: dec-610d6bc3a7402e586ae307a8a5da6cda>,
  <Future: status: finished, type: int, key: dec-2ca0787ac027e76e69ef4a72f73b099f>],
 [<Future: status: finished, type: float, key: recip-5f366fbd566b4b1753f34ced032e75cf>,
  <Future: status: finished, type: float, key: recip-b3c91f2dea621849c636422c889374ab>,
  <Future: status: finished, type: float, key: recip-0623a01548741b79eac6cb0d27d03207>,
  <Future: status: finished, type: float, key: recip-6f2f6a852a7fddca81d7aebfb47a80b5>,
  <Future: status: finished, type: float, key: recip-0623a01548741b79eac6cb0d27d03207>,
  <Future: status: error, key: recip-d6051c1952b3e227443c7bca1924d10b>,
  <Future: status: finished, type: float, key: recip-e1bae5f1997f1ba6e7f074315b4fbce0>],
 <Future: status: error, key: sum-0029496e8d105b2ccc25156278d931f4>)

This method doesn't play well with collections, though, where you don't have easy access to the intermediates. Below is the same calculation: bag2 points a future, each of which is the result of more than one task, things are not simple

import dask.bag as db
bag = db.from_sequence(inputs)
bag2 = c.persist(bag.map(dec).map(recip))
bag2.sum().compute()
...
<ipython-input-2-990705f387fe> in recip()
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

and our previous best method also fails, because bag2 depends on data held on the cluster, not locally.

import dask
with dask.set_options(get=dask.async.get_sync):
    bag2.sum().compute()
TypeError: object of type 'Future' has no len()

Recreate error locally

With new functionality, whenever we have a future that comes back from the scheduler as an "error" (see below), we can call the method recreate_error_locally. This analyses the graph for that future, finds the task responsible for the exception, and pulls only it, and its input values, from the cluster for local execution.

f = c.compute(bag2.sum())
f
<Future: status: error, key: finalize-999fffbd587f6214fc6327ae914debe1>
c.recreate_error_locally(f)
...
<ipython-input-2-990705f387fe> in recip(x)
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

Now we can debug again:

debug
> <ipython-input-2-990705f387fe>(5)recip()
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]

ipdb> p x
0
ipdb> q

This also works for output created using only dask.delayed:

f = c.compute(out)
c.recreate_error_locally(f)
...
<ipython-input-2-990705f387fe> in recip(x)
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

and output created using map/submit

c.recreate_error_locally(fut3)
...
<ipython-input-2-990705f387fe> in recip(x)
      3 
      4 def recip(x):
----> 5     return 1 / x
      6 
      7 inputs = [9, 4, 2, 76, 2, 1, 5]


ZeroDivisionError: division by zero

Conclusion

I am not aware of any other distributed execution engine with comparable functionality.

The real lesson, though, is that it took relatively little effort and few lines of code to get this working, because the distributed scheduler is pure-python, and very approachable.