4
4
BaseHeadersSerializer ,
5
5
HttpApiHeadersSerializer ,
6
6
)
7
- from aws_lambda_powertools .utilities .data_classes .common import BaseProxyEvent
7
+ from aws_lambda_powertools .utilities .data_classes .common import BaseProxyEvent , DictWrapper
8
8
from aws_lambda_powertools .utilities .data_classes .shared_functions import (
9
9
base64_decode ,
10
10
get_header_value ,
11
11
get_query_string_value ,
12
12
)
13
13
14
14
15
- class VPCLatticeEvent (BaseProxyEvent ):
15
+ class VPCLatticeEventBase (BaseProxyEvent ):
16
16
@property
17
17
def body (self ) -> str :
18
18
"""The VPC Lattice body."""
@@ -30,11 +30,6 @@ def headers(self) -> Dict[str, str]:
30
30
"""The VPC Lattice event headers."""
31
31
return self ["headers" ]
32
32
33
- @property
34
- def is_base64_encoded (self ) -> bool :
35
- """A boolean flag to indicate if the applicable request payload is Base64-encode"""
36
- return self ["is_base64_encoded" ]
37
-
38
33
@property
39
34
def decoded_body (self ) -> str :
40
35
"""Dynamically base64 decode body as a str"""
@@ -48,24 +43,6 @@ def method(self) -> str:
48
43
"""The VPC Lattice method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
49
44
return self ["method" ]
50
45
51
- @property
52
- def query_string_parameters (self ) -> Dict [str , str ]:
53
- """The request query string parameters."""
54
- return self ["query_string_parameters" ]
55
-
56
- @property
57
- def raw_path (self ) -> str :
58
- """The raw VPC Lattice request path."""
59
- return self ["raw_path" ]
60
-
61
- # VPCLattice event has no path field
62
- # Added here for consistency with the BaseProxyEvent class
63
- @property
64
- def path (self ) -> str :
65
- return self ["raw_path" ]
66
-
67
- # VPCLattice event has no http_method field
68
- # Added here for consistency with the BaseProxyEvent class
69
46
@property
70
47
def http_method (self ) -> str :
71
48
"""The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT."""
@@ -140,3 +117,137 @@ def get_header_value(
140
117
def header_serializer (self ) -> BaseHeadersSerializer :
141
118
# When using the VPC Lattice integration, we have multiple HTTP Headers.
142
119
return HttpApiHeadersSerializer ()
120
+
121
+
122
+ class VPCLatticeEvent (VPCLatticeEventBase ):
123
+ @property
124
+ def raw_path (self ) -> str :
125
+ """The raw VPC Lattice request path."""
126
+ return self ["raw_path" ]
127
+
128
+ @property
129
+ def is_base64_encoded (self ) -> bool :
130
+ """A boolean flag to indicate if the applicable request payload is Base64-encode"""
131
+ return self ["is_base64_encoded" ]
132
+
133
+ # VPCLattice event has no path field
134
+ # Added here for consistency with the BaseProxyEvent class
135
+ @property
136
+ def path (self ) -> str :
137
+ return self ["raw_path" ]
138
+
139
+ @property
140
+ def query_string_parameters (self ) -> Dict [str , str ]:
141
+ """The request query string parameters."""
142
+ return self ["query_string_parameters" ]
143
+
144
+
145
+ class vpcLatticeEventV2Identity (DictWrapper ):
146
+ @property
147
+ def source_vpc_arn (self ) -> Optional [str ]:
148
+ """The VPC Lattice v2 Event requestContext Identity sourceVpcArn"""
149
+ return self .get ("sourceVpcArn" )
150
+
151
+ @property
152
+ def get_type (self ) -> Optional [str ]:
153
+ """The VPC Lattice v2 Event requestContext Identity type"""
154
+ return self .get ("type" )
155
+
156
+ @property
157
+ def principal (self ) -> Optional [str ]:
158
+ """The VPC Lattice v2 Event requestContext principal"""
159
+ return self .get ("principal" )
160
+
161
+ @property
162
+ def principal_org_id (self ) -> Optional [str ]:
163
+ """The VPC Lattice v2 Event requestContext principalOrgID"""
164
+ return self .get ("principalOrgID" )
165
+
166
+ @property
167
+ def session_name (self ) -> Optional [str ]:
168
+ """The VPC Lattice v2 Event requestContext sessionName"""
169
+ return self .get ("sessionName" )
170
+
171
+ @property
172
+ def x509_subject_cn (self ) -> Optional [str ]:
173
+ """The VPC Lattice v2 Event requestContext X509SubjectCn"""
174
+ return self .get ("X509SubjectCn" )
175
+
176
+ @property
177
+ def x509_issuer_ou (self ) -> Optional [str ]:
178
+ """The VPC Lattice v2 Event requestContext X509IssuerOu"""
179
+ return self .get ("X509IssuerOu" )
180
+
181
+ @property
182
+ def x509_san_dns (self ) -> Optional [str ]:
183
+ """The VPC Lattice v2 Event requestContext X509SanDns"""
184
+ return self .get ("x509SanDns" )
185
+
186
+ @property
187
+ def x509_san_uri (self ) -> Optional [str ]:
188
+ """The VPC Lattice v2 Event requestContext X509SanUri"""
189
+ return self .get ("X509SanUri" )
190
+
191
+ @property
192
+ def x509_san_name_cn (self ) -> Optional [str ]:
193
+ """The VPC Lattice v2 Event requestContext X509SanNameCn"""
194
+ return self .get ("X509SanNameCn" )
195
+
196
+
197
+ class vpcLatticeEventV2RequestContext (DictWrapper ):
198
+ @property
199
+ def service_network_arn (self ) -> str :
200
+ """The VPC Lattice v2 Event requestContext serviceNetworkArn"""
201
+ return self ["serviceNetworkArn" ]
202
+
203
+ @property
204
+ def service_arn (self ) -> str :
205
+ """The VPC Lattice v2 Event requestContext serviceArn"""
206
+ return self ["serviceArn" ]
207
+
208
+ @property
209
+ def target_group_arn (self ) -> str :
210
+ """The VPC Lattice v2 Event requestContext targetGroupArn"""
211
+ return self ["targetGroupArn" ]
212
+
213
+ @property
214
+ def identity (self ) -> vpcLatticeEventV2Identity :
215
+ """The VPC Lattice v2 Event requestContext identity"""
216
+ return vpcLatticeEventV2Identity (self ["identity" ])
217
+
218
+ @property
219
+ def region (self ) -> str :
220
+ """The VPC Lattice v2 Event requestContext serviceNetworkArn"""
221
+ return self ["region" ]
222
+
223
+ @property
224
+ def time_epoch (self ) -> float :
225
+ """The VPC Lattice v2 Event requestContext timeEpoch"""
226
+ return self ["timeEpoch" ]
227
+
228
+
229
+ class VPCLatticeEventV2 (VPCLatticeEventBase ):
230
+ @property
231
+ def version (self ) -> str :
232
+ """The VPC Lattice v2 Event version"""
233
+ return self ["version" ]
234
+
235
+ @property
236
+ def is_base64_encoded (self ) -> Optional [bool ]:
237
+ """A boolean flag to indicate if the applicable request payload is Base64-encode"""
238
+ return self .get ("isBase64Encoded" )
239
+
240
+ @property
241
+ def path (self ) -> str :
242
+ """The VPC Lattice v2 Event path"""
243
+ return self ["path" ]
244
+
245
+ @property
246
+ def request_context (self ) -> vpcLatticeEventV2RequestContext :
247
+ """he VPC Lattice v2 Event request context."""
248
+ return vpcLatticeEventV2RequestContext (self ["requestContext" ])
249
+
250
+ @property
251
+ def query_string_parameters (self ) -> Optional [Dict [str , str ]]:
252
+ """The request query string parameters."""
253
+ return self .get ("queryStringParameters" )
0 commit comments