3
3
import enum
4
4
import re
5
5
import warnings
6
- from typing import Any , overload
6
+ from typing import Any , overload , override
7
7
8
8
from typing_extensions import deprecated
9
9
@@ -31,6 +31,7 @@ def __init__(
31
31
http_method : str ,
32
32
resource : str ,
33
33
partition : str = "aws" ,
34
+ is_websocket_authorizer : bool = False ,
34
35
):
35
36
self .partition = partition
36
37
self .region = region
@@ -40,39 +41,54 @@ def __init__(
40
41
self .http_method = http_method
41
42
# Remove matching "/" from `resource`.
42
43
self .resource = resource .lstrip ("/" )
44
+ self .is_websocket_authorizer = is_websocket_authorizer
43
45
44
46
@property
45
47
def arn (self ) -> str :
46
48
"""Build an arn from its parts
47
49
eg: arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request"""
48
- return (
49
- f"arn:{ self .partition } :execute-api:{ self .region } :{ self .aws_account_id } :{ self .api_id } /{ self .stage } /"
50
- f"{ self .http_method } /{ self .resource } "
51
- )
50
+ base_arn = f"arn:{ self .partition } :execute-api:{ self .region } :{ self .aws_account_id } :{ self .api_id } /{ self .stage } "
51
+
52
+ if not self .is_websocket_authorizer :
53
+ return f"{ base_arn } /{ self .http_method } /{ self .resource } "
54
+ else :
55
+ return f"{ base_arn } /{ self .resource } "
52
56
53
57
54
- def parse_api_gateway_arn (arn : str ) -> APIGatewayRouteArn :
58
+ def parse_api_gateway_arn (arn : str , is_websocket_authorizer : bool = False ) -> APIGatewayRouteArn :
55
59
"""Parses a gateway route arn as a APIGatewayRouteArn class
56
60
57
61
Parameters
58
62
----------
59
63
arn : str
60
64
ARN string for a methodArn or a routeArn
65
+ is_websocket_authorizer: bool
66
+ If it's a API Gateway Websocket
67
+
61
68
Returns
62
69
-------
63
70
APIGatewayRouteArn
64
71
"""
65
72
arn_parts = arn .split (":" )
66
73
api_gateway_arn_parts = arn_parts [5 ].split ("/" )
74
+
75
+ if not is_websocket_authorizer :
76
+ http_method = api_gateway_arn_parts [2 ]
77
+ resource = "/" .join (api_gateway_arn_parts [3 :]) if len (api_gateway_arn_parts ) >= 4 else ""
78
+ else :
79
+ http_method = None
80
+ resource = "/" .join (api_gateway_arn_parts [2 :])
81
+
67
82
return APIGatewayRouteArn (
68
83
partition = arn_parts [1 ],
69
84
region = arn_parts [3 ],
70
85
aws_account_id = arn_parts [4 ],
71
86
api_id = api_gateway_arn_parts [0 ],
72
87
stage = api_gateway_arn_parts [1 ],
73
- http_method = api_gateway_arn_parts [ 2 ] ,
88
+ http_method = http_method ,
74
89
# conditional allow us to handle /path/{proxy+} resources, as their length changes.
75
- resource = "/" .join (api_gateway_arn_parts [3 :]) if len (api_gateway_arn_parts ) >= 4 else "" ,
90
+ resource = resource ,
91
+ is_websocket_authorizer = is_websocket_authorizer ,
76
92
)
77
93
78
94
@@ -512,13 +528,14 @@ def _add_route(self, effect: str, http_method: str, resource: str, conditions: l
512
528
raise ValueError (f"Invalid resource path: { resource } . Path should match { self .path_regex } " )
513
529
514
530
resource_arn = APIGatewayRouteArn (
515
- self .region ,
516
- self .aws_account_id ,
517
- self .api_id ,
518
- self .stage ,
519
- http_method ,
520
- resource ,
521
- self .partition ,
531
+ region = self .region ,
532
+ aws_account_id = self .aws_account_id ,
533
+ api_id = self .api_id ,
534
+ stage = self .stage ,
535
+ http_method = http_method ,
536
+ resource = resource ,
537
+ partition = self .partition ,
538
+ is_websocket_authorizer = False ,
522
539
).arn
523
540
524
541
route = {"resourceArn" : resource_arn , "conditions" : conditions }
@@ -617,3 +634,124 @@ def asdict(self) -> dict[str, Any]:
617
634
response ["context" ] = self .context
618
635
619
636
return response
637
+
638
+
639
+ class APIGatewayAuthorizerResponseWebSocket (APIGatewayAuthorizerResponse ):
640
+ """The IAM Policy Response required for API Gateway WebSocket APIs
641
+
642
+ Based on: - https://github.com/awslabs/aws-apigateway-lambda-authorizer-blueprints/blob/\
643
+ master/blueprints/python/api-gateway-authorizer-python.py
644
+
645
+ Documentation:
646
+ -------------
647
+ - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html
648
+ - https://docs.aws.amazon.com/apigateway/latest/developerguide/api-gateway-lambda-authorizer-output.html
649
+ """
650
+
651
+ @staticmethod
652
+ def from_route_arn (
653
+ arn : str ,
654
+ principal_id : str ,
655
+ context : dict | None = None ,
656
+ usage_identifier_key : str | None = None ,
657
+ ) -> APIGatewayAuthorizerResponseWebSocket :
658
+ parsed_arn = parse_api_gateway_arn (arn , is_websocket_authorizer = True )
659
+ return APIGatewayAuthorizerResponseWebSocket (
660
+ principal_id ,
661
+ parsed_arn .region ,
662
+ parsed_arn .aws_account_id ,
663
+ parsed_arn .api_id ,
664
+ parsed_arn .stage ,
665
+ context ,
666
+ usage_identifier_key ,
667
+ )
668
+
669
+ @override
670
+ def _add_route (self , effect : str , resource : str , conditions : list [dict ] | None = None ):
671
+ """Adds a route to the internal lists of allowed or denied routes. Each object in
672
+ the internal list contains a resource ARN and a condition statement. The condition
673
+ statement can be null."""
674
+ resource_arn = APIGatewayRouteArn (
675
+ region = self .region ,
676
+ aws_account_id = self .aws_account_id ,
677
+ api_id = self .api_id ,
678
+ stage = self .stage ,
679
+ http_method = None ,
680
+ resource = resource ,
681
+ partition = self .partition ,
682
+ is_websocket_authorizer = True ,
683
+ ).arn
684
+
685
+ route = {"resourceArn" : resource_arn , "conditions" : conditions }
686
+
687
+ if effect .lower () == "allow" :
688
+ self ._allow_routes .append (route )
689
+ else : # deny
690
+ self ._deny_routes .append (route )
691
+
692
+ @override
693
+ def allow_all_routes (self ):
694
+ """Adds a '*' allow to the policy to authorize access to all methods of an API"""
695
+ self ._add_route (effect = "Allow" , resource = "*" )
696
+
697
+ @override
698
+ def deny_all_routes (self ):
699
+ """Adds a '*' allow to the policy to deny access to all methods of an API"""
700
+
701
+ self ._add_route (effect = "Deny" , resource = "*" )
702
+
703
+ @override
704
+ def allow_route (self , resource : str , conditions : list [dict ] | None = None ):
705
+ """
706
+ Add an API Gateway Websocket method to the list of allowed methods for the policy.
707
+
708
+ This method adds an API Gateway Websocket method Resource path) to the list of
709
+ allowed methods for the policy. It optionally includes conditions for the policy statement.
710
+
711
+ Parameters
712
+ ----------
713
+ resource : str
714
+ The API Gateway resource path to allow.
715
+ conditions : list[dict] | None, optional
716
+ A list of condition dictionaries to apply to the policy statement.
717
+ Default is None.
718
+
719
+ Notes
720
+ -----
721
+ For more information on AWS policy conditions, see:
722
+ https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition
723
+
724
+ Example
725
+ --------
726
+ >>> policy = APIGatewayAuthorizerResponseWebSocket(...)
727
+ >>> policy.allow_route("/api/users", [{"StringEquals": {"aws:RequestTag/Environment": "Production"}}])
728
+ """
729
+ self ._add_route (effect = "Allow" , resource = resource , conditions = conditions )
730
+
731
+ @override
732
+ def deny_route (self , resource : str , conditions : list [dict ] | None = None ):
733
+ """
734
+ Add an API Gateway Websocket method to the list of allowed methods for the policy.
735
+
736
+ This method adds an API Gateway Websocket method Resource path) to the list of
737
+ denied methods for the policy. It optionally includes conditions for the policy statement.
738
+
739
+ Parameters
740
+ ----------
741
+ resource : str
742
+ The API Gateway resource path to allow.
743
+ conditions : list[dict] | None, optional
744
+ A list of condition dictionaries to apply to the policy statement.
745
+ Default is None.
746
+
747
+ Notes
748
+ -----
749
+ For more information on AWS policy conditions, see:
750
+ https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition
751
+
752
+ Example
753
+ --------
754
+ >>> policy = APIGatewayAuthorizerResponseWebSocket(...)
755
+ >>> policy.deny_route("/api/users", [{"StringEquals": {"aws:RequestTag/Environment": "Production"}}])
756
+ """
757
+ self ._add_route (effect = "Deny" , resource = resource , conditions = conditions )
0 commit comments