Background: I need to read a table of roughly 5 million rows and 2 columns, compute a result set with the same number of rows, and do all of it through a DataFrame on Dataphin: download, compute, then upload. The current bottlenecks are downloading the data (about 3 minutes) and uploading the result (about 10 minutes); the computation itself only takes about 2 minutes.
Goal: improve read/write performance. According to the PyODPS documentation, multi-process reads and writes are supported.
- Multi-process reads are supported. I wrote code almost identical to the documentation example, but it raises an error:
import multiprocessing

t = odps.get_table('test_table')
n_process = multiprocessing.cpu_count()
with t.open_reader() as reader:
    pd_df = reader.to_pandas(n_process=n_process)
Question: why does this raise TypeError: to_pandas() got an unexpected keyword argument 'n_process'?
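One thing worth checking first (an assumption, not confirmed): the n_process keyword may only exist in PyODPS releases newer than the one installed in the Dataphin environment, so the docs and the runtime could be out of sync. A quick sketch to print the installed version and the reader's actual signature (odps is the configured entry object, as above):

import inspect
from odps import __version__ as pyodps_version

print('PyODPS version:', pyodps_version)
with odps.get_table('test_table').open_reader() as reader:
    try:
        # Does the installed reader.to_pandas accept n_process at all?
        print(inspect.signature(reader.to_pandas))
    except (TypeError, ValueError):
        pass  # compiled methods may not expose a signature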
- Writing to a table also supports multiple processes, but in my comparison it is slower than my single-process write. Why?
1. Single-process write
records = df.values.tolist()
o.write_table('test_table', records)
2. Multi-process write
from multiprocessing import Pool
from odps.tunnel import TableTunnel

def write_records(session_id, block_id):
    # Re-attach to the shared upload session inside the worker process
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    with local_session.open_record_writer(block_id) as writer:
        # Each worker writes its own contiguous slice of `records` as one block
        maxlen = len(records)
        strlen = int(maxlen / N_WORKERS) * block_id
        endlen = int(maxlen / N_WORKERS) * (block_id + 1)
        if block_id == N_WORKERS - 1:
            endlen = maxlen
        for row in range(strlen, endlen):
            record = table.new_record(records[row])
            writer.write(record)
if __name__ == '__main__':
    records = df.values.tolist()
    N_WORKERS = 4
    table = odps.get_table("test_table")
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name)
    session_id = upload_session.id
    pool = Pool(processes=N_WORKERS)
    block_ids = []
    for i in range(N_WORKERS):
        pool.apply_async(write_records, (session_id, i))
        block_ids.append(i)
    pool.close()
    pool.join()
    upload_session.commit(block_ids)
**Question: the multi-process version is slower than the single-process one. The single process finishes inserting the ~5 million records in about 10 minutes, while the multi-process version still had not finished after more than 20 minutes (the required CPU count is already set to 4).**
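For reference, a minimal sketch of the same block-writer approach in which each worker is handed only its own slice of records instead of reading the module-level records list. Identifiers (odps, df, test_table) are the same as above; this is illustrative only and has not been benchmarked:

from multiprocessing import Pool
from odps.tunnel import TableTunnel

N_WORKERS = 4
table = odps.get_table("test_table")   # `odps` is the configured entry object, as above
tunnel = TableTunnel(odps)

def write_slice(session_id, block_id, chunk):
    # Each worker re-attaches to the shared upload session and writes one block
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    with local_session.open_record_writer(block_id) as writer:
        for row in chunk:
            writer.write(table.new_record(row))

if __name__ == '__main__':
    records = df.values.tolist()        # `df` is the result DataFrame, as above
    upload_session = tunnel.create_upload_session(table.name)
    chunk_size = (len(records) + N_WORKERS - 1) // N_WORKERS
    chunks = [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]
    with Pool(processes=N_WORKERS) as pool:
        pool.starmap(write_slice,
                     [(upload_session.id, i, chunk) for i, chunk in enumerate(chunks)])
    upload_session.commit(list(range(len(chunks))))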