Skip to content

Commit 0681ad9

Browse files
Adding TransferFamilyAuthorizer and TransferFamilyAuthorizerResponse class
1 parent 7a05f43 commit 0681ad9

File tree

9 files changed

+348
-11
lines changed

9 files changed

+348
-11
lines changed

aws_lambda_powertools/utilities/data_classes/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from .ses_event import SESEvent
4646
from .sns_event import SNSEvent
4747
from .sqs_event import SQSEvent
48+
from .transfer_family_event import TransferFamilyAuthorizer, TransferFamilyAuthorizerResponse
4849
from .vpc_lattice import VPCLatticeEvent, VPCLatticeEventV2
4950

5051
__all__ = [
@@ -87,4 +88,6 @@
8788
"VPCLatticeEvent",
8889
"VPCLatticeEventV2",
8990
"CloudFormationCustomResourceEvent",
91+
"TransferFamilyAuthorizerResponse",
92+
"TransferFamilyAuthorizer",
9093
]
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
from __future__ import annotations
2+
3+
from typing import Any, Literal
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import (
6+
DictWrapper,
7+
)
8+
9+
10+
class TransferFamilyAuthorizer(DictWrapper):
11+
@property
12+
def username(self) -> str:
13+
"""The username used for authentication"""
14+
return self["username"]
15+
16+
@property
17+
def password(self) -> str | None:
18+
"""
19+
The password used for authentication.
20+
None in case customer authenticating with certificates
21+
"""
22+
return self["password"]
23+
24+
@property
25+
def protocol(self) -> str:
26+
"""The protocol can be SFTP, FTP or FTPS"""
27+
return self["protocol"]
28+
29+
@property
30+
def server_id(self) -> str:
31+
"""The AWS Transfer Family ServerID"""
32+
return self["serverId"]
33+
34+
@property
35+
def source_ip(self) -> str:
36+
"""The customer IP used for connection"""
37+
return self["sourceIp"]
38+
39+
40+
class TransferFamilyAuthorizerResponse:
41+
42+
def _build_authentication_response(
43+
self,
44+
role_arn: str,
45+
policy: str | None = None,
46+
home_directory: str | None = None,
47+
home_directory_details: dict | None = None,
48+
home_directory_type: Literal["LOGICAL", "PATH"] = "PATH",
49+
user_gid: int | None = None,
50+
user_uid: int | None = None,
51+
public_keys: str | None = None,
52+
) -> dict[str, Any]:
53+
54+
response: dict[str, Any] = {}
55+
56+
if home_directory_type == "PATH":
57+
if not home_directory:
58+
raise ValueError("home_directory must be set when home_directory_type is PATH")
59+
60+
response["HomeDirectory"] = home_directory
61+
elif home_directory_type == "LOGICAL":
62+
if not home_directory_details:
63+
raise ValueError("home_directory_details must be set when home_directory_type is LOGICAL")
64+
65+
response["HomeDirectoryDetails"] = [home_directory_details]
66+
67+
else:
68+
raise ValueError(f"Invalid home_directory_type: {home_directory_type}")
69+
70+
if user_uid is not None:
71+
response["PosixProfile"] = {"Gid": user_gid, "Uid": user_gid}
72+
73+
if policy:
74+
response["Policy"] = policy
75+
76+
if public_keys:
77+
response["PublicKeys"] = public_keys
78+
79+
response["Role"] = role_arn
80+
response["HomeDirectoryType"] = home_directory_type
81+
82+
return response
83+
84+
def build_authentication_response_efs(
85+
self,
86+
role_arn: str,
87+
user_gid: int,
88+
user_uid: int,
89+
policy: str | None = None,
90+
home_directory: str | None = None,
91+
home_directory_details: dict | None = None,
92+
home_directory_type: Literal["LOGICAL", "PATH"] = "PATH",
93+
public_keys: str | None = None,
94+
) -> dict[str, Any]:
95+
"""
96+
Build an authentication response for AWS Transfer Family using EFS (Elastic File System).
97+
98+
Parameters:
99+
-----------
100+
role_arn : str
101+
The Amazon Resource Name (ARN) of the IAM role.
102+
user_gid : int
103+
The group ID of the user.
104+
user_uid : int
105+
The user ID.
106+
policy : str | None, optional
107+
The IAM policy document. Defaults to None.
108+
home_directory : str | None, optional
109+
The home directory path. Required if home_directory_type is "PATH". Defaults to None.
110+
home_directory_details : dict | None, optional
111+
Details of the home directory. Required if home_directory_type is "LOGICAL". Defaults to None.
112+
home_directory_type : Literal["LOGICAL", "PATH"], optional
113+
The type of home directory. Must be either "LOGICAL" or "PATH". Defaults to "PATH".
114+
public_keys : str | None, optional
115+
The public keys associated with the user. Defaults to None.
116+
117+
Returns:
118+
--------
119+
dict[str, Any]
120+
A dictionary containing the authentication response with various details such as
121+
role ARN, policy, home directory information, and user details.
122+
123+
Raises:
124+
-------
125+
ValueError
126+
If an invalid home_directory_type is provided or if required parameters are missing
127+
for the specified home_directory_type.
128+
"""
129+
130+
return self._build_authentication_response(
131+
role_arn=role_arn,
132+
policy=policy,
133+
home_directory=home_directory,
134+
home_directory_details=home_directory_details,
135+
home_directory_type=home_directory_type,
136+
public_keys=public_keys,
137+
user_gid=user_gid,
138+
user_uid=user_uid,
139+
)
140+
141+
def build_authentication_response_s3(
142+
self,
143+
role_arn: str,
144+
policy: str | None = None,
145+
home_directory: str | None = None,
146+
home_directory_details: dict | None = None,
147+
home_directory_type: Literal["LOGICAL", "PATH"] = "PATH",
148+
public_keys: str | None = None,
149+
) -> dict[str, Any]:
150+
"""
151+
Build an authentication response for Amazon S3.
152+
153+
This method constructs an authentication response tailored for S3 access,
154+
likely by calling an internal method with the provided parameters.
155+
156+
Parameters:
157+
-----------
158+
role_arn : str
159+
The Amazon Resource Name (ARN) of the IAM role for S3 access.
160+
policy : str | None, optional
161+
The IAM policy document for S3 access. Defaults to None.
162+
home_directory : str | None, optional
163+
The home directory path in S3. Required if home_directory_type is "PATH". Defaults to None.
164+
home_directory_details : dict | None, optional
165+
Details of the home directory in S3. Required if home_directory_type is "LOGICAL". Defaults to None.
166+
home_directory_type : Literal["LOGICAL", "PATH"], optional
167+
The type of home directory in S3. Must be either "LOGICAL" or "PATH". Defaults to "PATH".
168+
public_keys : str | None, optional
169+
The public keys associated with the user for S3 access. Defaults to None.
170+
171+
Returns:
172+
--------
173+
dict[str, Any]
174+
A dictionary containing the authentication response with various details such as
175+
role ARN, policy, home directory information, and potentially other S3-specific attributes.
176+
177+
Raises:
178+
-------
179+
ValueError
180+
If an invalid home_directory_type is provided or if required parameters are missing
181+
for the specified home_directory_type.
182+
"""
183+
return self._build_authentication_response(
184+
role_arn=role_arn,
185+
policy=policy,
186+
home_directory=home_directory,
187+
home_directory_details=home_directory_details,
188+
home_directory_type=home_directory_type,
189+
public_keys=public_keys,
190+
)

aws_lambda_powertools/utilities/parser/models/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@
109109
)
110110
from .sns import SnsModel, SnsNotificationModel, SnsRecordModel
111111
from .sqs import SqsAttributesModel, SqsModel, SqsMsgAttributeModel, SqsRecordModel
112-
from .transfer_family import TransferFamily
112+
from .transfer_family import TransferFamilyAuthorizer
113113
from .vpc_lattice import VpcLatticeModel
114114
from .vpc_latticev2 import VpcLatticeV2Model
115115

@@ -180,7 +180,7 @@
180180
"SqsAttributesModel",
181181
"S3SqsEventNotificationModel",
182182
"S3SqsEventNotificationRecordModel",
183-
"TransferFamily",
183+
"TransferFamilyAuthorizer",
184184
"APIGatewayProxyEventModel",
185185
"APIGatewayEventRequestContext",
186186
"APIGatewayEventAuthorizer",

aws_lambda_powertools/utilities/parser/models/transfer_family.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pydantic.networks import IPvAnyAddress
55

66

7-
class TransferFamily(BaseModel):
7+
class TransferFamilyAuthorizer(BaseModel):
88
username: str
99
password: Optional[str] = None
1010
protocol: Literal["SFTP", "FTP", "FTPS"]

docs/utilities/data_classes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ Log Data Event for Troubleshooting
108108
| [SES](#ses) | `SESEvent` |
109109
| [SNS](#sns) | `SNSEvent` |
110110
| [SQS](#sqs) | `SQSEvent` |
111+
| [TransferFamilyAuthorizer] | `TransferFamilyAuthorizer` |
112+
| [TransferFamilyAuthorizerResponse] | `TransferFamilyAuthorizerResponse` |
111113
| [VPC Lattice V2](#vpc-lattice-v2) | `VPCLatticeV2Event` |
112114
| [VPC Lattice V1](#vpc-lattice-v1) | `VPCLatticeEvent` |
113115

docs/utilities/parser.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ The example above uses `SqsModel`. Other built-in models can be found below.
132132
| **SesModel** | Lambda Event Source payload for Amazon Simple Email Service |
133133
| **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service |
134134
| **SqsModel** | Lambda Event Source payload for Amazon SQS |
135-
| **TransferFamily** | Lambda Event Source payload for AWS Transfer Family custom identity provider |
135+
| **TransferFamilyAuthorizer** | Lambda Event Source payload for AWS Transfer Family Lambda authorizer |
136136
| **VpcLatticeModel** | Lambda Event Source payload for Amazon VPC Lattice |
137137
| **VpcLatticeV2Model** | Lambda Event Source payload for Amazon VPC Lattice v2 payload |
138138

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import pytest
2+
3+
from aws_lambda_powertools.utilities.data_classes.transfer_family_event import (
4+
TransferFamilyAuthorizer,
5+
TransferFamilyAuthorizerResponse,
6+
)
7+
from tests.functional.utils import load_event
8+
9+
10+
def test_transfer_family_event():
11+
raw_event = load_event("transferFamilyAuthorizer.json")
12+
parsed_event = TransferFamilyAuthorizer(raw_event)
13+
14+
assert parsed_event.username == raw_event["username"]
15+
assert parsed_event.password == raw_event["password"]
16+
assert parsed_event.protocol == raw_event["protocol"]
17+
assert parsed_event.server_id == raw_event["serverId"]
18+
assert parsed_event.source_ip == raw_event["sourceIp"]
19+
20+
21+
@pytest.mark.parametrize("home_directory_type", ["LOGICAL", "PATH"])
22+
def test_build_authentication_response_s3(home_directory_type):
23+
24+
# GIVEN a Authorizer response
25+
response = TransferFamilyAuthorizerResponse()
26+
27+
role_arn = "arn:aws:iam::123456789012:role/S3Access"
28+
policy = '{"Version": "2012-10-17", "Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}]}'
29+
home_directory = "/bucket/user" if home_directory_type == "PATH" else None
30+
home_directory_details = (
31+
{"Entry": "/", "Target": "/bucket/${transfer:UserName}"} if home_directory_type == "LOGICAL" else None
32+
)
33+
public_keys = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC0g+Z"
34+
35+
# WHEN building an authentication response for S3 with different home directory types
36+
response = response.build_authentication_response_s3(
37+
role_arn=role_arn,
38+
policy=policy,
39+
home_directory=home_directory,
40+
home_directory_details=home_directory_details,
41+
home_directory_type=home_directory_type,
42+
public_keys=public_keys,
43+
)
44+
45+
# THEN the authentication response is correctly built
46+
assert isinstance(response, dict)
47+
assert response.get("Role") == role_arn
48+
assert response.get("Policy") == policy
49+
assert response.get("PublicKeys") == public_keys
50+
51+
if home_directory_type == "PATH":
52+
assert response.get("HomeDirectory") == home_directory
53+
assert "HomeDirectoryDetails" not in response
54+
else:
55+
assert response.get("HomeDirectoryDetails") == [home_directory_details]
56+
assert "HomeDirectory" not in response
57+
58+
59+
@pytest.mark.parametrize("home_directory_type", ["LOGICAL", "PATH"])
60+
def test_build_authentication_response_efs(home_directory_type):
61+
62+
# GIVEN a Authorizer response
63+
response = TransferFamilyAuthorizerResponse()
64+
65+
role_arn = "arn:aws:iam::123456789012:role/S3Access"
66+
home_directory = "/bucket/user" if home_directory_type == "PATH" else None
67+
home_directory_details = (
68+
{"Entry": "/", "Target": "/bucket/${transfer:UserName}"} if home_directory_type == "LOGICAL" else None
69+
)
70+
71+
# WHEN building an authentication response for EFS with different home directory types
72+
response = response.build_authentication_response_efs(
73+
role_arn=role_arn,
74+
home_directory=home_directory,
75+
home_directory_details=home_directory_details,
76+
home_directory_type=home_directory_type,
77+
user_gid=0,
78+
user_uid=0,
79+
)
80+
81+
# THEN the authentication response is correctly built
82+
assert isinstance(response, dict)
83+
assert response.get("Role") == role_arn
84+
85+
if home_directory_type == "PATH":
86+
assert response.get("HomeDirectory") == home_directory
87+
assert "HomeDirectoryDetails" not in response
88+
else:
89+
assert response.get("HomeDirectoryDetails") == [home_directory_details]
90+
assert "HomeDirectory" not in response
91+
92+
93+
def test_build_authentication_missing_home_directory():
94+
95+
# GIVEN a Authorizer response
96+
response = TransferFamilyAuthorizerResponse()
97+
98+
# WHEN home_directory_details is empty and type is LOGICAL
99+
role_arn = "arn:aws:iam::123456789012:role/S3Access"
100+
home_directory_details = {}
101+
home_directory_type = "LOGICAL"
102+
103+
# THEN must raise an exception
104+
with pytest.raises(ValueError):
105+
response = response.build_authentication_response_efs(
106+
role_arn=role_arn,
107+
home_directory_details=home_directory_details,
108+
home_directory_type=home_directory_type,
109+
user_gid=0,
110+
user_uid=0,
111+
)
112+
113+
114+
def test_build_authentication_response_invalid_type():
115+
# GIVEN a Authorizer response
116+
response = TransferFamilyAuthorizerResponse()
117+
118+
# WHEN set an invalid home_directory_type
119+
invalid_type = "INVALID"
120+
121+
# THEN must raise an exception
122+
with pytest.raises(ValueError):
123+
response.build_authentication_response_s3(
124+
role_arn="arn:aws:iam::123456789012:role/S3Access",
125+
home_directory_type=invalid_type,
126+
)
127+
128+
129+
def test_build_authentication_response_missing_required_params():
130+
# GIVEN a Authorizer response
131+
response = TransferFamilyAuthorizerResponse()
132+
133+
# WHEN set a PATH without home_directory
134+
home_directory_type = "PATH"
135+
136+
# THEN must raise an exception
137+
with pytest.raises(ValueError):
138+
response.build_authentication_response_s3(
139+
role_arn="arn:aws:iam::123456789012:role/S3Access",
140+
home_directory_type=home_directory_type,
141+
# Missing required home_directory for PATH type
142+
)

0 commit comments

Comments
 (0)