-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathutils.py
162 lines (134 loc) · 5.48 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import inspect
import re
from abc import ABCMeta
from enum import Enum
from typing import TypeVar, Optional, Union, Iterable, Type, Callable
T = TypeVar("T", bound=Enum)
SNAKE_CASE_RE = re.compile(r'^([a-z]+\d*_[a-z\d_]*|_+[a-z\d]+[a-z\d_]*)$',
re.IGNORECASE)
WORD_RE = re.compile(r'^([a-z]+\d*)$', re.IGNORECASE)
from azure.functions._json import StringifyEnum, StringifyEnumJsonEncoder # NOQA
class BuildDictMeta(type):
def __new__(mcs, name, bases, dct):
"""BuildDictMeta will apply to every binding.
It will apply :meth:`add_to_dict` decorator to :meth:`__init__` of
every binding class to collect list of params to include in building
json dictionary which corresponds to function.json in legacy app.
It will also apply :meth:`skip_none` to :meth:`get_dict_repr` to
enable json dictionary generated for every binding has non-empty
value fields. It is needed for enabling binding param optionality.
"""
cls = super().__new__(mcs, name, bases, dct)
setattr(cls, '__init__',
cls.add_to_dict(getattr(cls, '__init__')))
setattr(cls, 'get_dict_repr',
cls.skip_none(getattr(cls, 'get_dict_repr')))
return cls
@staticmethod
def skip_none(func):
def wrapper(*args, **kw):
res = func(*args, **kw)
return BuildDictMeta.clean_nones(res)
return wrapper
@staticmethod
def add_to_dict(func: Callable):
def wrapper(*args, **kwargs):
if args is None or len(args) == 0:
raise ValueError(
f'{func.__name__} has no args. Please ensure func is an '
f'object method.')
func(*args, **kwargs)
self = args[0]
init_params = set(inspect.signature(func).parameters.keys())
init_params.update(kwargs.keys())
for key in kwargs.keys():
if not hasattr(self, key):
setattr(self, key, kwargs[key])
setattr(self, 'init_params', init_params)
return wrapper
@staticmethod
def clean_nones(value):
"""
Recursively remove all None values from dictionaries and lists,
and returns
the result as a new dictionary or list.
"""
if isinstance(value, list):
return [BuildDictMeta.clean_nones(x) for x in value if
x is not None]
elif isinstance(value, dict):
return {
key: BuildDictMeta.clean_nones(val)
for key, val in value.items()
if val is not None
}
else:
return value
class ABCBuildDictMeta(ABCMeta, BuildDictMeta):
pass
def parse_singular_param_to_enum(param: Optional[Union[T, str]],
class_name: Type[T]) -> Optional[T]:
if param is None:
return None
if isinstance(param, str):
try:
return class_name[param.upper()]
except KeyError:
raise KeyError(
f"Can not parse str '{param}' to {class_name.__name__}. "
f"Allowed values are {[e.name for e in class_name]}")
return param
def parse_iterable_param_to_enums(
param_values: Optional[Union[Iterable[str], Iterable[T]]],
class_name: Type[T]) -> Optional[Iterable[T]]:
if param_values is None:
return None
try:
return [class_name[value.upper()] if isinstance(value, str) else value
for value in param_values]
except KeyError:
raise KeyError(
f"Can not parse '{param_values}' to "
f"Optional[Iterable[{class_name.__name__}]]. "
f"Please ensure param all list elements exist in "
f"{[e.name for e in class_name]}")
def to_camel_case(snake_case_str: str):
if snake_case_str is None or len(snake_case_str) == 0:
raise ValueError(
f"Please ensure arg name {snake_case_str} is not empty!")
if not is_snake_case(snake_case_str) and not is_word(snake_case_str):
raise ValueError(
f"Please ensure {snake_case_str} is a word or snake case "
f"string with underscore as separator.")
words = snake_case_str.split('_')
return words[0] + ''.join(ele.title() for ele in words[1:])
def is_snake_case(input_string: str) -> bool:
"""
Checks if a string is formatted as "snake case".
A string is considered snake case when:
- it's composed only by lowercase/uppercase letters and digits
- it contains at least one underscore
- it does not start with a number
*Examples:*
>>> is_snake_case('foo_bar_baz') # returns true
>>> is_snake_case('foo') # returns false
:param input_string: String to test.
:return: True for a snake case string, false otherwise.
"""
return SNAKE_CASE_RE.match(input_string) is not None
def is_word(input_string: str) -> bool:
"""
Checks if a string is one word.
A string is considered one word when:
- it's composed only by lowercase/uppercase letters and digits
- it does not start with a number
*Examples:*
>>> is_word('1foo') # returns false
>>> is_word('foo_') # returns false
>>> is_word('foo') # returns true
:param input_string: String to test.
:return: True for one word string, false otherwise.
"""
return WORD_RE.match(input_string) is not None