"Better" is a subjective term, but there are a few approaches you can try.
-
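The simplest thing you can do in this particular case is to avoid exceptions altogether. All you need is a flatMap and some slicing:

    log.flatMap(lambda s: s.split(' ')[8:9])

Unlike indexing with [8], the slice [8:9] returns an empty list when a line has fewer than nine fields, so flatMap simply drops such lines. As you can see, there is no need for exception handling or a subsequent filter.
-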
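The previous idea can be extended with a simple wrapper:

    def seq_try(f, *args, **kwargs):
        try:
            return [f(*args, **kwargs)]
        except Exception:
            # Any failure yields an empty list, which flatMap drops.
            return []

and example usage:

    # operator.div exists only in Python 2; truediv behaves the same for
    # float division and is available in both Python 2 and 3.
    from operator import truediv as div  # FYI operator provides getitem as well.

    rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"])

    rdd.flatMap(lambda x: seq_try(div, 1., x)).collect()
    ## [1.0, 0.5, 0.3333333333333333, 0.2]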
-
Finally, a more OO approach:
    import inspect as _inspect

    class _Try(object): pass

    class Failure(_Try):
        def __init__(self, e):
            if Exception not in _inspect.getmro(e.__class__):
                msg = "Invalid type for Failure: {0}"
                raise TypeError(msg.format(e.__class__))
            self._e = e
            self.isSuccess = False
            self.isFailure = True

        def get(self): raise self._e

        def __repr__(self):
            return "Failure({0})".format(repr(self._e))

    class Success(_Try):
        def __init__(self, v):
            self._v = v
            self.isSuccess = True
            self.isFailure = False

        def get(self): return self._v

        def __repr__(self):
            return "Success({0})".format(repr(self._v))

    def Try(f, *args, **kwargs):
        try:
            return Success(f(*args, **kwargs))
        except Exception as e:
            return Failure(e)
and example usage:
    tries = rdd.map(lambda x: Try(div, 1.0, x))
    tries.collect()
    ## [Success(1.0),
    ##  Success(0.5),
    ##  Failure(ZeroDivisionError('float division by zero',)),
    ##  Success(0.3333333333333333),
    ##  Failure(ZeroDivisionError('float division by zero',)),
    ##  Success(0.2),
    ##  Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))]

    tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect()
    ## [1.0, 0.5, 0.3333333333333333, 0.2]
You can even use pattern matching with multipledispatch:

    from multipledispatch import dispatch
    from operator import getitem

    @dispatch(Success)
    def check(x):
        return "Another great success"

    @dispatch(Failure)
    def check(x):
        return "What a failure"

    a_list = [1, 2, 3]

    check(Try(getitem, a_list, 1))
    ## 'Another great success'

    check(Try(getitem, a_list, 10))
    ## 'What a failure'
If you like this approach, I've pushed a somewhat more complete implementation to GitHub and PyPI.
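If you want to convince yourself of why the slicing trick from the first approach works without starting Spark, here is a minimal plain-Python sketch. The log lines and the helper names flat_map and ninth_field are made up for illustration, and flat_map only imitates what flatMap does on an RDD (apply a function, then flatten the resulting lists).

```python
from itertools import chain

# Hypothetical log lines: the first has at least nine space-separated
# fields, the second is too short.
lines = [
    '127.0.0.1 - - [01/Jan/2015:00:00:00 +0000] "GET / HTTP/1.1" 200 512',
    'malformed line',
]

def flat_map(f, xs):
    """Local stand-in for RDD.flatMap: apply f, then flatten the results."""
    return list(chain.from_iterable(f(x) for x in xs))

def ninth_field(s):
    # A slice never raises: it yields [] when index 8 does not exist.
    return s.split(' ')[8:9]

# Plain indexing, by contrast, raises on the short line.
try:
    lines[1].split(' ')[8]
    indexing_failed = False
except IndexError:
    indexing_failed = True

result = flat_map(ninth_field, lines)
print(result)           # ['200']
print(indexing_failed)  # True
```

The short line contributes an empty list, so it disappears after flattening, which is the same effect as catching the exception and filtering afterwards.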