投稿: 2022/09/01
更新: 2023/02/15
【Python】SQLAlchemyでバルクインサート/バルクアップデートしたい
結論
sqlalchemy.sql.schema.Table.insert() , sqlalchemy.sql.schema.Table.update() を使います。
バルクインサート
1import sqlalchemy as sa
2
3table = sa.Table('table_name', sa.MetaData(),
4 sa.Column('id', sa.Integer),
5 sa.Column('name', sa.String),
6 )
7
8with engine.connect() as conn:
9 data = [
10 {'id': 1, 'name': 'alice', 'email': 'alice@example.com'},
11 {'id': 2, 'name': 'bob', 'email': 'bob@example.com'},
12 {'id': 2, 'name': 'carol', 'email': 'carol@example.com'}
13 ]
14
15 stmt: sa.sql.expression.Insert = table.insert()
16
17 result = conn.execute(stmt, data)バルクアップデート
1import sqlalchemy as sa
2
3table = sa.Table('table_name', sa.MetaData(),
4 sa.Column('id', sa.Integer),
5 sa.Column('name', sa.String),
6 )
7
8with engine.connect() as conn:
9 data = [
10 {'_id': 1, 'name': 'alice', 'email': 'alice@example.com'},
11 {'_id': 2, 'name': 'bob', 'email': 'bob@example.com'},
12 {'_id': 2, 'name': 'carol', 'email': 'carol@example.com'}
13 ]
14
15 stmt: sa.sql.expression.Update = table.update().where(
16 table.c.id == sa.sql.expression.bindparam('_id') # 仕様の関係上 id という名称のカラムは _id という辞書のキーにバインドする
17 ).values({
18 table.c.name: sa.sql.expression.bindparam('name'),
19 table.c.email: sa.sql.expression.bindparam('email')
20 })
21
22 result = conn.execute(stmt, sql)サンプルソースコード
1from sqlalchemy.engine.base import Engine
2import sqlalchemy as sa
3import os
4
5DB_USER = os.getenv('DB_USER') or 'root'
6DB_PASS = os.getenv('DB_PASS') or 'root'
7DB_HOST = os.getenv('DB_HOST') or '127.0.0.1'
8DB_PORT = os.getenv('DB_PORT') or '3306'
9DB_NAME = os.getenv('DB_NAME') or 'test_db'
10
11dsn = f'mysql+mysqldb://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
12
13engine: Engine = sa.create_engine(dsn, encoding='utf-8', echo=False)
14
15table_name = 'table_name'
16metadata_obj = sa.MetaData()
17
18table = sa.Table(table_name, metadata_obj,
19 sa.Column('id', sa.Integer),
20 sa.Column('name', sa.String),
21 sa.Column('email', sa.String),
22 )
23
24with engine.connect() as conn:
25 data = [
26 {'id': 1, 'name': 'alice', 'email': 'alice@example.com'},
27 {'id': 2, 'name': 'bob', 'email': 'bob@example.com'},
28 {'id': 2, 'name': 'carol', 'email': 'carol@example.com'}
29 ]
30
31 stmt: sa.sql.expression.Insert = table.insert()
32
33 result = conn.execute(stmt, data)
34
35with engine.connect() as conn:
36 data = [
37 {'_id': 1, 'name': 'alice', 'email': 'alice@example.com'},
38 {'_id': 2, 'name': 'bob', 'email': 'bob@example.com'},
39 {'_id': 2, 'name': 'carol', 'email': 'carol@example.com'}
40 ]
41
42 stmt: sa.sql.expression.Update = table.update().where(
43 table.c.id == sa.sql.expression.bindparam('_id') # 仕様の関係上 id という名称のカラムは _id という辞書のキーにバインドする
44 ).values({
45 table.c.name: sa.sql.expression.bindparam('name'),
46 table.c.email: sa.sql.expression.bindparam('email')
47 })
48
49 result = conn.execute(stmt, data)