@@ -31,7 +31,8 @@
 from . import methods
 from .indexing import (_partition_of_index_value, _loc, _try_loc,
                        _coerce_loc_index, _maybe_partial_time_string)
-from .utils import meta_nonempty, make_meta, insert_meta_param_description
+from .utils import (meta_nonempty, make_meta, insert_meta_param_description,
+                    raise_on_meta_error)
 
 no_default = '__no_default__'
 
@@ -157,11 +158,7 @@ def _get_unary_operator(cls, op):
         def f(self):
             name = funcname(op) + '-' + tokenize(self)
             dsk = {(name, 0): (op, (self._name, 0))}
-            try:
-                meta = op(self._meta_nonempty)
-            except:
-                raise ValueError("Metadata inference failed in operator "
-                                 "{0}.".format(funcname(op)))
+            meta = op(self._meta_nonempty)
             return Scalar(merge(dsk, self.dask), name, meta)
         return f
 
@@ -189,16 +186,12 @@ def _scalar_binary(op, self, other, inv=False):
     else:
         dsk.update({(name, 0): (op, (self._name, 0), other_key)})
 
-    try:
-        other_meta = make_meta(other)
-        other_meta_nonempty = meta_nonempty(other_meta)
-        if inv:
-            meta = op(other_meta_nonempty, self._meta_nonempty)
-        else:
-            meta = op(self._meta_nonempty, other_meta_nonempty)
-    except:
-        raise ValueError("Metadata inference failed in operator "
-                         "{0}.".format(funcname(op)))
+    other_meta = make_meta(other)
+    other_meta_nonempty = meta_nonempty(other_meta)
+    if inv:
+        meta = op(other_meta_nonempty, self._meta_nonempty)
+    else:
+        meta = op(self._meta_nonempty, other_meta_nonempty)
 
     if return_type is not Scalar:
         return return_type(dsk, name, meta,
@@ -1631,11 +1624,7 @@ def map(self, arg, na_action=None, meta=no_default):
                    enumerate(self._keys()))
         dsk.update(self.dask)
         if meta is no_default:
-            try:
-                meta = self._meta_nonempty.map(arg, na_action=na_action)
-            except Exception:
-                raise ValueError("Metadata inference failed, please provide "
-                                 "`meta` keyword")
+            meta = _emulate(M.map, self, arg, na_action=na_action)
         else:
             meta = make_meta(meta)
 
@@ -1761,13 +1750,9 @@ def apply(self, func, convert_dtype=True, meta=no_default,
                    " or: .apply(func, meta=('x', 'f8')) for series result")
             warnings.warn(msg)
 
-            try:
-                meta = _emulate(M.apply, self._meta_nonempty, func,
-                                convert_dtype=convert_dtype,
-                                args=args, **kwds)
-            except Exception:
-                raise ValueError("Metadata inference failed, please provide "
-                                 "`meta` keyword")
+            meta = _emulate(M.apply, self._meta_nonempty, func,
+                            convert_dtype=convert_dtype,
+                            args=args, **kwds)
 
         return map_partitions(M.apply, self, func,
                               convert_dtype, args, meta=meta, **kwds)
@@ -2350,12 +2335,8 @@ def apply(self, func, axis=0, args=(), meta=no_default,
                    " or: .apply(func, meta=('x', 'f8')) for series result")
             warnings.warn(msg)
 
-            try:
-                meta = _emulate(M.apply, self._meta_nonempty, func,
-                                axis=axis, args=args, **kwds)
-            except Exception:
-                raise ValueError("Metadata inference failed, please provide "
-                                 "`meta` keyword")
+            meta = _emulate(M.apply, self._meta_nonempty, func,
+                            axis=axis, args=args, **kwds)
 
         return map_partitions(M.apply, self, func, axis,
                               False, False, None, args, meta=meta, **kwds)
@@ -2605,13 +2586,9 @@ def apply_concat_apply(args, chunk=None, aggregate=None, meta=no_default,
     dsk2 = {(b, 0): (apply, aggregate, [conc], aggregate_kwargs)}
 
     if meta is no_default:
-        try:
-            meta_chunk = _emulate(apply, chunk, args, chunk_kwargs)
-            meta = _emulate(apply, aggregate, [_concat([meta_chunk])],
-                            aggregate_kwargs)
-        except Exception:
-            raise ValueError("Metadata inference failed, please provide "
-                             "`meta` keyword")
+        meta_chunk = _emulate(apply, chunk, args, chunk_kwargs)
+        meta = _emulate(apply, aggregate, [_concat([meta_chunk])],
+                        aggregate_kwargs)
     meta = make_meta(meta)
 
     dasks = [arg.dask for arg in args if isinstance(arg, _Frame)]
@@ -2645,7 +2622,8 @@ def _emulate(func, *args, **kwargs):
     Apply a function using args / kwargs. If arguments contain dd.DataFrame /
     dd.Series, using internal cache (``_meta``) for calculation
     """
-    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
+    with raise_on_meta_error(funcname(func)):
+        return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
 
 
 @insert_meta_param_description
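The `raise_on_meta_error` context manager that `_emulate` now wraps its call in is imported from `.utils` and its definition is not part of this diff. A minimal sketch of the kind of wrapper this change relies on, assuming it simply converts any failure during meta computation into a `ValueError` that names the offending function (the real implementation may format the message differently):

from contextlib import contextmanager
import traceback


@contextmanager
def raise_on_meta_error(funcname=None):
    # Illustrative sketch only, not the actual dask.dataframe.utils code:
    # run the wrapped block and, on any failure, re-raise as a ValueError
    # that names the function whose metadata inference failed and carries
    # the original traceback, instead of a generic one-line message.
    try:
        yield
    except Exception:
        tb = traceback.format_exc()
        where = " in `{0}`".format(funcname) if funcname else ""
        raise ValueError("Metadata inference failed{0}.\n\n"
                         "Original error:\n{1}".format(where, tb))

Centralizing the error handling here lets the per-call-site try/except blocks above be deleted while still giving users a single, more informative failure message.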
@@ -2678,11 +2656,7 @@ def map_partitions(func, *args, **kwargs):
     args = _maybe_align_partitions(args)
 
     if meta is no_default:
-        try:
-            meta = _emulate(func, *args, **kwargs)
-        except Exception:
-            raise ValueError("Metadata inference failed, please provide "
-                             "`meta` keyword")
+        meta = _emulate(func, *args, **kwargs)
 
     if all(isinstance(arg, Scalar) for arg in args):
         dask = {(name, 0):
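For callers, the advice embedded in the warning messages above is unchanged: if metadata inference fails or is unwanted, pass `meta` explicitly so `_emulate` is never consulted. A small usage sketch with illustrative column names and dtypes:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({'x': [1, 2, 3, 4]}), npartitions=2)

# Explicit meta skips inference entirely, so no emulation against the
# non-empty metadata is attempted and no inference error can be raised.
result = ddf.x.apply(lambda v: v + 1.0, meta=('x', 'f8'))
print(result.compute())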