|
1 | 1 | from __future__ import annotations
|
2 | 2 |
|
3 |
| -import ast |
4 | 3 | import functools
|
5 | 4 | import logging
|
6 | 5 | import warnings
|
@@ -296,33 +295,55 @@ def _apply_action_to_fields(
|
296 | 295 |
|
297 | 296 | def _apply_masking_rules(self, data: dict, masking_rules: dict) -> dict:
|
298 | 297 | """
|
299 |
| - Apply masking rules to data, supporting different rules for each field. |
| 298 | + Apply masking rules to data, supporting both simple field names and complex path expressions. |
| 299 | +
|
| 300 | + Args: |
| 301 | + data: The dictionary containing data to mask |
| 302 | + masking_rules: Dictionary mapping field names or path expressions to masking rules |
| 303 | +
|
| 304 | + Returns: |
| 305 | + dict: The masked data dictionary |
300 | 306 | """
|
301 | 307 | result = data.copy()
|
302 | 308 |
|
303 | 309 | for path, rule in masking_rules.items():
|
304 | 310 | try:
|
305 |
| - # Handle nested paths (e.g., 'address.street') |
306 |
| - parts = path.split(".") |
307 |
| - current = result |
308 |
| - |
309 |
| - for part in parts[:-1]: |
310 |
| - if isinstance(current[part], str) and current[part].startswith("{"): |
311 |
| - try: |
312 |
| - current[part] = ast.literal_eval(current[part]) |
313 |
| - except (ValueError, SyntaxError): |
314 |
| - continue |
315 |
| - current = current[part] |
316 |
| - |
317 |
| - final_field = parts[-1] |
318 |
| - |
319 |
| - # Apply masking rule to the target field |
320 |
| - if final_field in current: |
321 |
| - current[final_field] = self.provider.erase(str(current[final_field]), **rule) |
322 |
| - |
323 |
| - except (KeyError, TypeError, AttributeError): |
324 |
| - # Log warning if field not found or invalid path |
325 |
| - warnings.warn(f"Could not apply masking rule for path: {path}", stacklevel=2) |
| 311 | + if ".." in path: |
| 312 | + # Handle recursive descent paths (e.g., "address..name") |
| 313 | + base_path, field = path.split("..") |
| 314 | + jsonpath_expr = parse(f"$.{base_path}..{field}") |
| 315 | + elif "[" in path: |
| 316 | + # Handle array notation paths (e.g., "address[*].street") |
| 317 | + jsonpath_expr = parse(f"$.{path}") |
| 318 | + else: |
| 319 | + # Handle simple field names (e.g., "email") |
| 320 | + jsonpath_expr = parse(f"$.{path}") |
| 321 | + |
| 322 | + matches = jsonpath_expr.find(result) |
| 323 | + |
| 324 | + if not matches: |
| 325 | + warnings.warn(f"No matches found for path: {path}", stacklevel=2) |
| 326 | + continue |
| 327 | + |
| 328 | + for match in matches: |
| 329 | + try: |
| 330 | + value = match.value |
| 331 | + if value is not None: |
| 332 | + if isinstance(value, dict): |
| 333 | + # Handle dictionary values by masking each field |
| 334 | + for k, v in value.items(): |
| 335 | + if v is not None: |
| 336 | + value[k] = self.provider.erase(str(v), **rule) |
| 337 | + else: |
| 338 | + masked_value = self.provider.erase(str(value), **rule) |
| 339 | + match.full_path.update(result, masked_value) |
| 340 | + |
| 341 | + except Exception as e: |
| 342 | + warnings.warn(f"Error masking value for path {path}: {str(e)}", stacklevel=2) |
| 343 | + continue |
| 344 | + |
| 345 | + except Exception as e: |
| 346 | + warnings.warn(f"Error processing path {path}: {str(e)}", stacklevel=2) |
326 | 347 | continue
|
327 | 348 |
|
328 | 349 | return result
|
|
0 commit comments