12
12
from ._common import app , request , db
13
13
from ._exceptions import MissingAPIKeyException , UnAuthenticatedException
14
14
from ._db import metadata , TABLE_OPTIONS
15
+ from ._logger import get_structured_logger
16
+ import re
15
17
16
18
API_KEY_HARD_WARNING = API_KEY_REQUIRED_STARTING_AT - timedelta (days = 14 )
17
19
API_KEY_SOFT_WARNING = API_KEY_HARD_WARNING - timedelta (days = 14 )
@@ -130,8 +132,8 @@ class UserRole(str, Enum):
130
132
131
133
class User :
132
134
api_key : str
133
- roles : Set [UserRole ]
134
135
authenticated : bool
136
+ roles : Set [UserRole ]
135
137
tracking : bool = True
136
138
registered : bool = True
137
139
@@ -141,18 +143,31 @@ def __init__(self, api_key: str, authenticated: bool, roles: Set[UserRole], trac
141
143
self .roles = roles
142
144
self .tracking = tracking
143
145
self .registered = registered
146
+
147
+ def get_apikey (self ) -> str :
148
+ return self .api_key
149
+
150
+ def is_authenticated (self ) -> bool :
151
+ return self .authenticated
144
152
145
153
def has_role (self , role : UserRole ) -> bool :
146
154
return role in self .roles
147
155
148
- def log_info (self , msg : str , * args , ** kwargs ) -> None :
149
- if self .authenticated and self .tracking :
150
- app .logger .info (f"apikey: { self .api_key } , { msg } " , * args , ** kwargs )
151
- else :
152
- app .logger .info (msg , * args , ** kwargs )
153
-
154
156
def is_rate_limited (self ) -> bool :
155
157
return not self .registered
158
+
159
+ def is_tracking (self ) -> bool :
160
+ return self .tracking
161
+
162
+ def log_info (self , msg : str , * args , ** kwargs ) -> None :
163
+ logger = get_structured_logger ("api_key_logs" , filename = "api_key_logs.log" )
164
+ if self .is_authenticated ():
165
+ if self .is_tracking ():
166
+ logger .info (msg , * args , ** dict (kwargs , apikey = self .get_apikey ()))
167
+ else :
168
+ logger .info (msg , * args , ** dict (kwargs , apikey = "*****" ))
169
+ else :
170
+ logger .info (msg , * args , ** kwargs )
156
171
157
172
158
173
ANONYMOUS_USER = User ("anonymous" , False , set ())
@@ -169,7 +184,6 @@ def _find_user(api_key: Optional[str]) -> User:
169
184
return User (user .api_key , True , set (user .roles .split ("," )), user .tracking , user .registered )
170
185
171
186
def resolve_auth_token () -> Optional [str ]:
172
- # auth request param
173
187
for name in ('auth' , 'api_key' , 'token' ):
174
188
if name in request .values :
175
189
return request .values [name ]
@@ -187,12 +201,23 @@ def _get_current_user() -> User:
187
201
if "user" not in g :
188
202
api_key = resolve_auth_token ()
189
203
user = _find_user (api_key )
190
- if not user .authenticated and require_api_key ():
204
+ request_path = request .full_path
205
+ if not user .is_authenticated () and require_api_key ():
191
206
raise MissingAPIKeyException ()
192
- user .log_info (request .full_path )
207
+ # If the user configured no-track option, mask the API key
208
+ if not user .is_tracking ():
209
+ request_path = mask_apikey (request_path )
210
+ user .log_info ("Get path" , path = request_path )
193
211
g .user = user
194
212
return g .user
195
213
214
+ def mask_apikey (path : str ) -> str :
215
+ # Function to mask API key query string from a request path
216
+ regexp = re .compile (r'[\\?&]api_key=([^&#]*)' )
217
+ if regexp .search (path ):
218
+ path = re .sub (regexp , "&api_key=*****" , path )
219
+ return path
220
+
196
221
197
222
current_user : User = cast (User , LocalProxy (_get_current_user ))
198
223
@@ -204,12 +229,12 @@ def require_api_key() -> bool:
204
229
205
230
def show_soft_api_key_warning () -> bool :
206
231
n = date .today ()
207
- return not current_user .authenticated and not app .config .get ('TESTING' , False ) and n > API_KEY_SOFT_WARNING and n < API_KEY_HARD_WARNING
232
+ return not current_user .is_authenticated () and not app .config .get ('TESTING' , False ) and n > API_KEY_SOFT_WARNING and n < API_KEY_HARD_WARNING
208
233
209
234
210
235
def show_hard_api_key_warning () -> bool :
211
236
n = date .today ()
212
- return not current_user .authenticated and not app .config .get ('TESTING' , False ) and n > API_KEY_HARD_WARNING
237
+ return not current_user .is_authenticated () and not app .config .get ('TESTING' , False ) and n > API_KEY_HARD_WARNING
213
238
214
239
215
240
def _is_public_route () -> bool :
0 commit comments