
Question about multi-process data read/write in PyODPS #152

Open
@templaryang

Description


Background: I need to read a table of roughly 5 million rows with 2 columns, compute a result set with the same number of rows, and upload it to Dataphin, doing the download, computation, and upload through DataFrames. The current bottlenecks are downloading the data (about 3 minutes) and uploading the result (about 10 minutes); the computation itself takes only about 2 minutes.

Goal: to improve read/write performance. According to the PyODPS documentation, multi-process reads and writes are supported.

  1. Multi-process reads are supported. I wrote code almost identical to the documented example, but it raises an error:
import multiprocessing

t = odps.get_table('test_table')
n_process = multiprocessing.cpu_count()
with t.open_reader() as reader:
    pd_df = reader.to_pandas(n_process=n_process)

Question: why does this fail with TypeError: to_pandas() got an unexpected keyword argument 'n_process'?
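Not an authoritative answer, but a TypeError complaining about an unexpected keyword argument usually means the installed PyODPS release simply predates the n_process parameter of reader.to_pandas(). A minimal sketch to confirm the installed version and degrade gracefully (it reuses the odps entry object from the snippet above; the try/except fallback is illustrative, not a documented pattern):

import multiprocessing

from odps import __version__ as pyodps_version

print(pyodps_version)  # n_process requires a sufficiently recent PyODPS release

t = odps.get_table('test_table')
with t.open_reader() as reader:
    try:
        # Multi-process conversion, only accepted by newer releases
        pd_df = reader.to_pandas(n_process=multiprocessing.cpu_count())
    except TypeError:
        # Older releases: fall back to single-process conversion
        pd_df = reader.to_pandas()

If the printed version turns out to be old, upgrading with pip install -U pyodps is the direct fix; the fallback above only keeps the script running, it does not make the read any faster.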

  2. Multi-process table writes are also supported, but in my comparison they were slower than my single-process write. Why?
    1. Single-process write
records = df.values.tolist()  # df is assumed to be a pandas DataFrame here
odps.write_table('test_table', records)
    2. Multi-process write
from multiprocessing import Pool

from odps.tunnel import TableTunnel

def write_records(session_id, block_id):
    # Re-attach to the shared upload session inside the worker process
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    with local_session.open_record_writer(block_id) as writer:
        # Each worker writes its own contiguous slice of records
        maxlen = len(records)
        strlen = int(maxlen / N_WORKERS) * block_id
        endlen = int(maxlen / N_WORKERS) * (block_id + 1)
        if block_id == N_WORKERS - 1:
            endlen = maxlen  # the last worker picks up the remainder
        for row in range(strlen, endlen):
            record = table.new_record(records[row])
            writer.write(record)

if __name__ == '__main__':
    # write_records relies on records/table/tunnel/N_WORKERS as module globals,
    # which only works with the fork start method (the Linux default); under
    # spawn (the Windows and macOS default) the workers fail with NameError.
    records = df.values.tolist()
    N_WORKERS = 4
    table = odps.get_table("test_table")
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name)
    session_id = upload_session.id
    pool = Pool(processes=N_WORKERS)
    block_ids = []
    for i in range(N_WORKERS):
        pool.apply_async(write_records, (session_id, i))
        block_ids.append(i)
    pool.close()
    pool.join()
    upload_session.commit(block_ids)

Question: the multi-process version is slower than the single-process one. The single-process write inserts the 5 million records in about 10 minutes, while the multi-process write had still not finished after 20 minutes (with 4 CPUs allocated). Why?
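A minimal sketch of a possible cause rather than a definitive diagnosis: the driver loop above calls pool.apply_async without keeping the AsyncResult handles, so any exception raised inside write_records (such as the NameError under the spawn start method noted in the comments) is silently discarded, and upload_session.commit then waits on blocks that were never written. Collecting the handles and calling .get() re-raises worker errors in the parent. Here write_block is a lightly reworked write_records, and all other names reuse the snippet above:

from multiprocessing import Pool

from odps.tunnel import TableTunnel

def write_block(session_id, block_id):
    # Same per-worker logic as write_records, with the slicing inlined;
    # integer floor division covers the remainder without a special case
    local_session = tunnel.create_upload_session(table.name, upload_id=session_id)
    with local_session.open_record_writer(block_id) as writer:
        lo = len(records) * block_id // N_WORKERS
        hi = len(records) * (block_id + 1) // N_WORKERS
        for row in records[lo:hi]:
            writer.write(table.new_record(row))
    return block_id

if __name__ == '__main__':
    records = df.values.tolist()
    N_WORKERS = 4
    table = odps.get_table('test_table')
    tunnel = TableTunnel(odps)
    upload_session = tunnel.create_upload_session(table.name)
    pool = Pool(processes=N_WORKERS)
    # Keep the AsyncResult handles so worker exceptions surface in the parent
    results = [pool.apply_async(write_block, (upload_session.id, i))
               for i in range(N_WORKERS)]
    pool.close()
    pool.join()
    block_ids = [r.get() for r in results]  # .get() re-raises worker errors
    upload_session.commit(block_ids)

Once errors are surfaced, it becomes clear whether the workers are writing in parallel or dying early. Even when they do run, each worker still builds one table.new_record per row in pure Python while all four share the same machine and network link, so the speedup over a single process is limited to begin with.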
