-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feature: support multiprocess feature group ingest (#2111) #2288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
data_frame (DataFrame): source DataFrame to be ingested. | ||
wait (bool): whether to wait for the ingestion to finish or not. | ||
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised | ||
if timeout is reached. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this need to indented
self.message = message | ||
|
||
def __str__(self): | ||
return f'{self.failed_rows} -> {self.message}' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's convention to use double quotation marks
args = [] | ||
for i in range(self.max_processes): | ||
start_index = min(i * batch_size, data_frame.shape[0]) | ||
end_index = min(i * batch_size + batch_size, data_frame.shape[0]) | ||
args += [(data_frame[start_index:end_index], start_index, timeout)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional
args = [] | |
for i in range(self.max_processes): | |
start_index = min(i * batch_size, data_frame.shape[0]) | |
end_index = min(i * batch_size + batch_size, data_frame.shape[0]) | |
args += [(data_frame[start_index:end_index], start_index, timeout)] | |
args = [ | |
data_frame[ | |
min(i * batch_size, data_frame.shape[0]): | |
min(i * batch_size + batch_size, data_frame.shape[0])) | |
], | |
min(i * batch_size, data_frame.shape[0]), timeout) | |
for i in range(self.max_processes) | |
] |
fs_runtime_client_config_mock, | ||
): | ||
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")}) | ||
manager = IngestionManagerPandas( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.
Problem:
This line of code lacks validation when processing input data through the following parameter: 'fs_runtime_client_config_mock' (index: 0 | type: Unknown). The parameter is exposed to external callers, because its enclosing class and method are publicly accessible. This means that upstream validation, if it exists, can be bypassed. Other methods validating the parameter: 'test_ingest'. The same parameter type is validated here for example: src/sagemaker/feature_store/feature_group.py:196. Malicious, malformed, or unbounded inputs can cause unexpected runtime behavior or crashes, and can slow performance.
Fix:
Add checks to ensure the validity of the parameter's value, such as testing it for nullness, emptiness, or equality. Or to prevent direct calls to it, reduce the method's visibility.
Learn more about potential threats and guidance from the Common Weakness Enumeration website and the OWASP Cheat Sheet series.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
Co-authored-by: Alex <[email protected]> Co-authored-by: icywang86rui <[email protected]>
*Issue #, if available: 2111
*Description of changes:
*Testing done:
wait=True
andwait=False
functionality with IngestionManager.wait()Merge Checklist
Put an
x
in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your pull request.General
Tests
unique_name_from_base
to create resource names in integ tests (if appropriate)By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.