进步始于交流
收获源于分享

记一次 Python 大规模数据导入的性能优化

上来先说说背景,数据导入就是把 A 利用 B 导入到 C,就让我们先了解一下这次的 ABC 分别是什么:

  • A 是 csv 文件,原始数据;GB 级别,以逗号作为分隔符,构成了千万行级别的单文件大表格
  • B 是 Python,导入脚本;想到要解析表格,就想起了之前用 java pio 的 hssf 的痛苦,所以人生苦短我用 python
  • C 是 Mysql,与 A 不一样的表结构, InnoDB 引擎,顽强的运行在没有 RAID 的机械硬盘

再让我们说说性能优化的前后:

  • 优化前 2GB/1500W 数据量读取-转换结构-写入时长大于 30h,在 64G 内存的设备上无法正常运行
  • 优化后相同数据规模总市场小于3h,内存占用小于 50G

第一节,小学生广播体操

csv 文件表头范例

origin_id,phone_number,name,id_card,gender,address,birthday,profile_photo,……

DB table 范例

create table user
(
    id            bigint auto_increment comment '自增id' primary key,  
    gender        char(6)      not null default 'MALE' comment 'FEMALE/MALE',
    birthday      date         not null default '1899-12-31' comment '出生日期',
    name          varchar(40)  not null default '' comment '姓名',
    address       varchar(128) not null default '' comment '详细地址',
    profile_photo varchar(255) not null default 'http://xxxx/empty.png' comment '头像存储地址url'
);

create table account
(
    id            bigint auto_increment comment '自增id' primary key,
    user_id       bigint       not null comment '姓名',
    account_type  int          not null comment '账号类型',
    account_value varchar(40)  not null default '' comment '账号'
);

基础代码

代码较多,来个浓缩版

导入思路: csv 文件读取到内存->数据处理->写入到目标表

技术栈选择:sqlalchemy 操作 Mysql,pandas 读取 csv

代码结构:创建目标表的 domain entity,一个脚本用 pandas.read_csv() 读取中间表写入到目标表

如果看代码,请注意里面的锚点注释,后面会分别作优化

代码锚点总结

class User(Base):   # 锚点-实体  利用 sqlalchemy 创建了一个 db table entity
session.add_all([,,,,])   # 锚点-插入  利用 sqlalchemy 批量的创建 insert 语句
session.commit()   # 锚点-提交   利用 sqlalchemy 向 mysql 提交 sql 语句
chunks = pd.read_csv(user_csv_file_path, chunksize=10)  # 锚点-读取  用 pandas 分块读取
for index, row in chunk.iterrows()   # 锚点-遍历   遍历每一行数据(注意这个是在遍历 chunk 不是 chunks)
thread_pool = ProcessPoolExecutor(max_workers=15)  # 锚点-多进程

先来创建一个目标表的 domain entity

from sqlalchemy import Column, Integer, String, DateTime, create_engine
from sqlalchemy.ext.declarative import declarative_base

from config.app_config import *

Base = declarative_base()

class User(Base):   # 锚点-实体
    __tablename__ = "user"

    user_id = Column(String(32), primary_key=True)
    phone_number = Column(String(11), unique=True)
    name = Column(String(64))
    nick_name = Column(String(40))
    id_card = Column(String(32))
    gender = Column(String(64))
    birthday = Column(Date(64))
    address = Column(String(255))

    created_time = Column(String(32))
    updated_time = Column(String(32))

    def __init__(self, org_id, ………… updated_time):
        self.org_id = org_id
                ……
        self.created_time = created_time
        self.updated_time = updated_time

再来创建读写脚本

from sqlalchemy.orm import sessionmaker
import pandas as pd
from sqlalchemy import create_engine
from domain.user import UserRaw
engine = create_engine("mysql+pymysql://%s:%s@%s:%s/%s" % (db_username, db_password, db_url, db_port, db_database))
DBSession = sessionmaker(bind=engine)

if __name__ == '__main__':
    user_csv_file_path = '/xxx/xxx/xxx.csv'
  chunks = pd.read_csv(user_csv_file_path, chunksize=10, sep=',', keep_default_na=False)  # 锚点-读取
  read_number = 0
  session = DBSession()
  for chunk in chunks:
    # 锚点-插入
    session.add_all([User(row["源用户ID"], row["手机号"], row["姓名"], row["昵称"], row["密码"],
                          row["密码加盐值"], row["微信unionID"], row["性别"], row["生日"], row["省份"],
                          row["城市"], row["区县"], row["详细地址"], row["头像"], row["爱好"],
                          row["创建时间"], row["更新时间"]) for index, row in chunk.iterrows()])  # 锚点-遍历
    session.commit()   # 锚点-提交
    read_number += len(chunk)
    print("read_number: %s" % read_number) 
    session.close()

第一版竟然会写成这样……很抱歉用了这个标题名

不是说第一套广播体操不好,是后面的越来越好

第二节,初升的太阳

让我们来优化一下这个

“`chunks = pd.read_csv(user_csv_file_path, chunksize=10) # 锚点-读取 用 pandas 分块读取“`

注意磁盘 IO

要了解自己的程序运行在什么环境下,如果是固态硬盘可能影响不大,如果是机械硬盘,一定要充分地利用每一次磁盘 IO,如果 chunksize 太小,那磁盘一次扫描读取的数据只有一部分被使用了,一次 IO 就这么被浪费了一部分,太可惜了

根据数据行长度选择一个足够利用一次 IO 的数据量,且读取后内存占用符合要求的数量:

“`chunks = pd.read_csv(user_csv_file_path, chunksize=3000) # 锚点-读取 用 pandas 分块读取“`

完整读取到内存

如果数据处理时涉及到了跨行分析,那我们需要全文件读取,在确定能存足够存放时可以换一种读取方法:

import datatable as dt
data: DataFrame = dt.fread(csv_file_path).to_pandas()

datatable 读取以后是行数据的 list,并没有做解析,读取过程可以充分地利用磁盘 IO 快速的入内存。然后利用它的一系列 to_xxxx 方法转换成各种工具库的数据结构

对于目标格式为 pandas 的,通过此方式整体速度会更快一点,1GB 数据也许能节省几十秒

第三节,雏鹰起飞

for index, row in chunk.iterrows()   # 锚点-遍历   遍历每一行数据(注意这个是在遍历 chunk 不是 chunks)

先罗列一下 Pandas 的遍历方法:

  • iloc(index):基于 index 进行指定行数据的获取
  • iterrows(): 按行遍历,将DataFrame的每一行迭代为(index, Series)对,可以通过row[name]对元素进行访问。
  • itertuples(): 按行遍历,将DataFrame的每一行迭代为元祖,可以通过row[name]对元素进行访问,比iterrows()效率高。
  • iteritems():按列遍历,将DataFrame的每一列迭代为(列名, Series)对,可以通过row[index]对元素进行访问。
  • zip(data_frame): Python 函数将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表
  • to_dict(orient=’records’):将整体转换成 python 的 list[dict[]] 的形式,每行是list 的一项,list 内为 header 与 value 的 dict

先排除 iteritems,剩下的做个对比:

for index in df.index: df[index]  # 这个到后面会以为死机了
for index, row in chunk.iterrows()  # 最慢的,但可以获得 index 及 header
for row in chunk.itertuples()  # 没有 index 了,也没有 header 了,但是比 iterrows 快很多,我的 case 快了 50 多倍
for row in zip(data_frame)  # 没有 index 了,也没有 header 了,但是比 itertuples 稍微快点,我的 case 快了一倍
to_dict(orient='records')  # 没有 index,有 header,比 iterrows 快,比 itertuples 慢点

选择什么要看是否需要 header 信息,如果 csv 数据的列排序不稳定,可以用包含 header 的方法,当然也可以选择先用 pd 进行列重拍,再使用 zip 处理

第四节,希望风帆

session.commit()   # 锚点-提交   利用 sqlalchemy 向 mysql 提交 sql 语句

数据导入并不是正常的业务逻辑操作,我们不需要每次操作行为都必须进行 commit,可以根据实际场景进行操作聚合后整体 commit,其实 add_all() 就是将很多 insert 进行了聚合。通过降低 commit 的总数量也是可以提高速度的

做 commit 次数优化时注意:

  • 不会被参考的数据:对一个数据的最终操作,不会再被其他逻辑查询使用,才能直接省略,否则需要修改查询逻辑,如 phone_number 如果需要全表去重,在保证无其他数据操作者的情况下,可以对当前 chunk 的所有 phone_number 进行批量查询判断
  • 不要丢掉最后的 commit:丢了会导致最后一批数据没有写入,也没有报错
  • 局部变量初始化:如果逻辑被改变很可能出现一个 for 循环拆成了多个的情况,在完成一个循环后务必注意局部变量的初始化操作,最好保证用过的变量及时主动地 del 以免逻辑不断地拆分、合并的过程中出错

第五节,七彩阳光

session.add_all([,,,,])   # 锚点-插入  利用 sqlalchemy 批量的创建 insert 语句

有没有尝试看看上面的返回结果?真的是想象中的那样么?

engine = create_engine("<db_rul>", echo=True) 
-----
INSERT INTO `xxx`.user (user_id, ……) VALUES (1, ……);
INSERT INTO `xxx`.user (user_id, ……) VALUES (2, ……);
INSERT INTO `xxx`.user (user_id, ……) VALUES (3, ……);

Add all 只是打包了我们要执行的 sql,并不是想象中的使用了批量插入语句,下面才是批量插入的方法:

session.execute(User.__table__.insert(), [{'id':1,'name':'a'}, ['id':2,'name':'b']])
-----
INSERT INTO `xxx`.user (user_id, ……) VALUES (1, ……),(2,……);

批量插入有多种方式,除了上面的还有:session.bulk_save_objects、session.bulk_insert_mappings 方法,可以自行测一下性能差异,并根据使用场景选择合适的 method

除此以外如果要把多行的某个字段改为相同的值还可以用批量 update:

session.query(User).filter(User.id.in__([1,2,3,4,5]).update({User.name:'coologic'})

第六节,青春的活力

class User(Base):   # 锚点-实体  利用 sqlalchemy 创建了一个 db table entity

青春的活力,让我们减重前行,这一步进行内存占用优化

继承一个父类,一定要看看他都做了什么……1GB 的 csv 文件读取后占用了 30GB 的内存,是否会让人怀疑内存都去哪了?

这一步的优化是和批量写入联合生效的,注意代码,批量写入并没有使用任何的 User object,只是用了 User 的声明,那为什么还要把读取的数据放到 User 里?

session.execute(User.__table__.insert(), [{'id':1,'name':'a'}, ['id':2,'name':'b']])
  • 方案1 — 创建一个新的
    “`class UserDto“` 不继承 Base 即可,读取到的信息都放到 UserDto 即可,内存占用会急剧下降
  • 方案2 — 联合
    “`to_dict(orient=’records’)“`直接生成了 dict 写入即可,此方案适合于没有逻辑处理直接写入的导入过程,但要注意变量命名,不要单行代码完成任务导致难以解读

优化内存的同时,其实也提高了运行速度

第七节,时代在召唤

thread_pool = ProcessPoolExecutor(max_workers=15)  # 锚点-多进程

对于读取 csv 导入到 mysql 这个流程来说,这一步优化的效果并不显著,但如果有一个中间库存储了 csv 的内容,在进行中间库到目标库的拷贝,此时大量的 select 等待时间会被浪费掉,我们就要多线程的跑起来,充分地利用 CPU 时间片

开始尝试过多线程、修改 create_engine 参数提高连接数量……各种方式发现效果不明显,数据库完全没有被充分调用,看来是被 GIL 锁“限流”了,那只能多进程跑了

用到多进程不要忘记:通过 fork 创建的子进程会拷贝父进程的代码段、数据段、静态数据段、堆、栈、IO缓冲区

由于在 linux 下 python 多进程是调用的 os.fork(),所以尽早的创建出来进程池的进程们,并保证主进程足够的 clean,降低每次 fork 的无关数据量

否则在内存已经占用 10GB 的时候再去 fork 出 10 个………………

第八节,舞动青春

最后来点其他的注意事项

  • 备份:不要完全依赖 binlog,还是要备份的
  • 索引要不要:大规模写入的时候,索引很拖后腿,能干掉么?看目标表的使用情况吧
  • 批量 insert:小心一颗老鼠屎坏了一锅粥
  • 断点续传的导入:也许关键时刻能让自己早下班几个小时
  • 测试:如果不想每次都人眼扫描一遍所有数据提升“找茬”功底,那测试必不可少,复杂的数据处理逻辑单元测试、整体导入的端到端测试……
  • 多写多写多写:又不是自己用笔写,各种 log 不要吝啬。可能有用的、不出事用不上的、也就业务看看技术不关心的、这地方如果崩溃了反正都要重头再来的……等真的步入深渊,你会后悔这些地方为啥不加 log!
  • 监控监控监控:当 log 不足、不明所以的失败以后,最少还有地方知道自己的脚本在哪个时间点挂了,我不是个什么都不知道的人……
  • 自动化、流程固化:主流程自动化,关键性预案的执行脚本固化,降低人工介入,尤其是避免灾难发生时的恐慌性随机试探(肆无忌惮的破坏)……
  • 注意考虑硬件特性:机械硬盘的随机 4k 读写性能很差会影响什么呢?CPU 单核主频低要如何进行代码优化?单核主频高核心数少呢?网络带宽有限情况下如何保证导入速度?
  • 生产环境账号:一个每天都在删表、清表、delete、重新导入、测试的人,要是某天选错了 db 呢?最多给个只读账号!切记!不要以为禁用了 drop 权限就够了,当脚本连到了生产库,那瞬间的操作量也许比今晚团队的宵夜都多
  • 性能测试不要到最后才做:大规模数据操作的过程中,操作耗时本身就是一个很重要的指标,而且再优化的过程中很可能触发大量的代码变动甚至方案变化
  • 评估性能时要力所能及的控制变量:用 4c16g、8c32g 的服务器都测了一次以后,就能预测到 16c128g 服务器上跑的会更快么?
  • 预案要多一些:集思广益,全面分析,虽然力求一次成功,但灾难下的生存能力也不要忘了
  • 注意!注意!注意!上面说的所有都未考虑:生产库一般并不是脱机离线状态!数据一致性,负载控制,避免死锁,数据恢复……

第九节,放飞理想

突然想到广播体操,看完这篇博客,适当的运动一下吧:收藏关注与点赞

看着床外零星的灯光,脑海里回荡着白天楼上装修的电钻声,也许换个黑轴键盘能盖住这挥之不去的……

赞(2) 打赏
未经允许不得转载:Coologic » 记一次 Python 大规模数据导入的性能优化

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏