Commit 4e160c7

Merge branch 'master' into framework-processor-python3
2 parents 30417db + 0eaa7c0

File tree

4 files changed (+54 -11 lines)

CHANGELOG.md

+10
@@ -1,5 +1,15 @@
 # Changelog

+## v2.54.0 (2021-08-16)
+
+### Features
+
+ * add pytorch 1.5.1 eia configuration
+
+### Bug Fixes and Other Changes
+
+ * issue #2253 where Processing job in Local mode would call Describe…
+
 ## v2.53.0 (2021-08-12)

 ### Features

VERSION

+1 -1
@@ -1 +1 @@
-2.53.1.dev0
+2.54.1.dev0

src/sagemaker/feature_store/feature_group.py

+32 -10
@@ -207,7 +207,8 @@ def _ingest_single_batch(
         for row in data_frame[start_index:end_index].itertuples():
             record = [
                 FeatureValue(
-                    feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
+                    feature_name=data_frame.columns[index - 1],
+                    value_as_string=str(row[index]),
                 )
                 for index in range(1, len(row))
                 if pd.notna(row[index])
@@ -270,13 +271,24 @@ def _run_multi_process(self, data_frame: DataFrame, wait=True, timeout=None):
             timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
                 if timeout is reached.
         """
+        # pylint: disable=I1101
         batch_size = math.ceil(data_frame.shape[0] / self.max_processes)
+        # pylint: enable=I1101

         args = []
         for i in range(self.max_processes):
             start_index = min(i * batch_size, data_frame.shape[0])
             end_index = min(i * batch_size + batch_size, data_frame.shape[0])
-            args += [(data_frame[start_index:end_index], start_index, timeout)]
+            args += [
+                (
+                    self.max_workers,
+                    self.feature_group_name,
+                    self.sagemaker_fs_runtime_client_config,
+                    data_frame[start_index:end_index],
+                    start_index,
+                    timeout,
+                )
+            ]

         def init_worker():
             # ignore keyboard interrupts in child processes.
@@ -285,13 +297,21 @@ def init_worker():
         self._processing_pool = ProcessingPool(self.max_processes, init_worker)
         self._processing_pool.restart(force=True)

-        f = lambda x: self._run_multi_threaded(*x)  # noqa: E731
+        f = lambda x: IngestionManagerPandas._run_multi_threaded(*x)  # noqa: E731
         self._async_result = self._processing_pool.amap(f, args)

         if wait:
             self.wait(timeout=timeout)

-    def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None) -> List[int]:
+    @staticmethod
+    def _run_multi_threaded(
+        max_workers: int,
+        feature_group_name: str,
+        sagemaker_fs_runtime_client_config: Config,
+        data_frame: DataFrame,
+        row_offset=0,
+        timeout=None,
+    ) -> List[int]:
         """Start the ingestion process.

         Args:
@@ -305,21 +325,23 @@ def _run_multi_threaded(self, data_frame: DataFrame, row_offset=0, timeout=None)
         Returns:
             List of row indices that failed to be ingested.
         """
-        executor = ThreadPoolExecutor(max_workers=self.max_workers)
-        batch_size = math.ceil(data_frame.shape[0] / self.max_workers)
+        executor = ThreadPoolExecutor(max_workers=max_workers)
+        # pylint: disable=I1101
+        batch_size = math.ceil(data_frame.shape[0] / max_workers)
+        # pylint: enable=I1101

         futures = {}
-        for i in range(self.max_workers):
+        for i in range(max_workers):
             start_index = min(i * batch_size, data_frame.shape[0])
             end_index = min(i * batch_size + batch_size, data_frame.shape[0])
             futures[
                 executor.submit(
-                    self._ingest_single_batch,
-                    feature_group_name=self.feature_group_name,
+                    IngestionManagerPandas._ingest_single_batch,
+                    feature_group_name=feature_group_name,
                     data_frame=data_frame,
                     start_index=start_index,
                     end_index=end_index,
-                    client_config=self.sagemaker_fs_runtime_client_config,
+                    client_config=sagemaker_fs_runtime_client_config,
                 )
             ] = (start_index + row_offset, end_index + row_offset)

src/sagemaker/lineage/association.py

+11
@@ -15,11 +15,14 @@

 from typing import Optional, Iterator
 from datetime import datetime
+import logging

 from sagemaker.apiutils import _base_types
 from sagemaker.lineage import _api_types
 from sagemaker.lineage._api_types import AssociationSummary

+logger = logging.getLogger(__name__)
+

 class Association(_base_types.Record):
     """An Amazon SageMaker artifact, which is part of a SageMaker lineage.
@@ -73,6 +76,10 @@ def set_tag(self, tag=None):
         Returns:
             list({str:str}): a list of key value pairs
         """
+        logger.warning(
+            "set_tag on Association is deprecated. Use set_tag on the source or destination\
+            entity instead."
+        )
         return self._set_tags(resource_arn=self.source_arn, tags=[tag])

     def set_tags(self, tags=None):
@@ -84,6 +91,10 @@ def set_tags(self, tags=None):
         Returns:
             list({str:str}): a list of key value pairs
         """
+        logger.warning(
+            "set_tags on Association is deprecated. Use set_tags on the source or destination\
+            entity instead."
+        )
         return self._set_tags(resource_arn=self.source_arn, tags=tags)

     @classmethod
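
The association.py change adds a module-level logger and emits a deprecation warning from `set_tag` and `set_tags` before delegating to the existing `_set_tags` call. Here is a small, self-contained sketch of that pattern; the class body, ARN, and tag values are placeholders for illustration, not the SDK implementation.

```python
# Deprecation-warning pattern: warn via a module logger, keep the method working.
import logging

logger = logging.getLogger(__name__)


class Association:
    def __init__(self, source_arn):
        self.source_arn = source_arn

    def set_tags(self, tags=None):
        logger.warning(
            "set_tags on Association is deprecated. Use set_tags on the "
            "source or destination entity instead."
        )
        # Stub standing in for the real self._set_tags(resource_arn=..., tags=...) call.
        return tags


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    assoc = Association("arn:aws:sagemaker:us-east-1:111122223333:artifact/example")
    assoc.set_tags([{"Key": "project", "Value": "demo"}])
```

Existing callers keep working; they simply see a warning in their logs nudging them toward tagging the source or destination entity directly.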
