【Python】SQLAlchemyでバルクインサート/バルクアップデートしたい

結論

sqlalchemy.sql.schema.Table.insert() , sqlalchemy.sql.schema.Table.update() を使います。

バルクインサート

 1import sqlalchemy as sa
 2
 3table = sa.Table('table_name', sa.MetaData(),
 4                 sa.Column('id', sa.Integer),
 5                 sa.Column('name', sa.String),
 6                 )
 7
 8with engine.connect() as conn:
 9    data = [
10        {'id': 1, 'name': 'alice', 'email': 'alice@example.com'},
11        {'id': 2, 'name': 'bob', 'email': 'bob@example.com'},
12        {'id': 2, 'name': 'carol', 'email': 'carol@example.com'}
13    ]
14
15    stmt: sa.sql.expression.Insert = table.insert()
16
17    result = conn.execute(stmt, data)

バルクアップデート

 1import sqlalchemy as sa
 2
 3table = sa.Table('table_name', sa.MetaData(),
 4                 sa.Column('id', sa.Integer),
 5                 sa.Column('name', sa.String),
 6                 )
 7
 8with engine.connect() as conn:
 9    data = [
10        {'_id': 1, 'name': 'alice', 'email': 'alice@example.com'},
11        {'_id': 2, 'name': 'bob', 'email': 'bob@example.com'},
12        {'_id': 2, 'name': 'carol', 'email': 'carol@example.com'}
13    ]
14
15    stmt: sa.sql.expression.Update = table.update().where(
16        table.c.id == sa.sql.expression.bindparam('_id')  # 仕様の関係上 id という名称のカラムは _id という辞書のキーにバインドする
17    ).values({
18        table.c.name: sa.sql.expression.bindparam('name'),
19        table.c.email: sa.sql.expression.bindparam('email')
20    })
21
22    result = conn.execute(stmt, sql)

サンプルソースコード

 1from sqlalchemy.engine.base import Engine
 2import sqlalchemy as sa
 3import os
 4
 5DB_USER = os.getenv('DB_USER') or 'root'
 6DB_PASS = os.getenv('DB_PASS') or 'root'
 7DB_HOST = os.getenv('DB_HOST') or '127.0.0.1'
 8DB_PORT = os.getenv('DB_PORT') or '3306'
 9DB_NAME = os.getenv('DB_NAME') or 'test_db'
10
11dsn = f'mysql+mysqldb://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
12
13engine: Engine = sa.create_engine(dsn, encoding='utf-8', echo=False)
14
15table_name = 'table_name'
16metadata_obj = sa.MetaData()
17
18table = sa.Table(table_name, metadata_obj,
19                 sa.Column('id', sa.Integer),
20                 sa.Column('name', sa.String),
21                 sa.Column('email', sa.String),
22                 )
23
24with engine.connect() as conn:
25    data = [
26        {'id': 1, 'name': 'alice', 'email': 'alice@example.com'},
27        {'id': 2, 'name': 'bob', 'email': 'bob@example.com'},
28        {'id': 2, 'name': 'carol', 'email': 'carol@example.com'}
29    ]
30
31    stmt: sa.sql.expression.Insert = table.insert()
32
33    result = conn.execute(stmt, data)
34
35with engine.connect() as conn:
36    data = [
37        {'_id': 1, 'name': 'alice', 'email': 'alice@example.com'},
38        {'_id': 2, 'name': 'bob', 'email': 'bob@example.com'},
39        {'_id': 2, 'name': 'carol', 'email': 'carol@example.com'}
40    ]
41
42    stmt: sa.sql.expression.Update = table.update().where(
43        table.c.id == sa.sql.expression.bindparam('_id')  # 仕様の関係上 id という名称のカラムは _id という辞書のキーにバインドする
44    ).values({
45        table.c.name: sa.sql.expression.bindparam('name'),
46        table.c.email: sa.sql.expression.bindparam('email')
47    })
48
49    result = conn.execute(stmt, data)