简介
PyMySQL 库可以用来操作 MySQL。
官方文档:
- https://pypi.org/project/PyMySQL/
- https://pymysql.readthedocs.io/en/latest/
- https://github.com/PyMySQL/PyMySQL
安装
使用 pip 安装 PyMySQL:
$ python3 -m pip install PyMySQL
获取 DB 列表
方式1:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306)
cursor = conn.cursor()
cursor.execute('show databases;')
for item in cursor.fetchall():
print(type(item), item) # item 是 tuple 元组
cursor.close()
conn.close()
运行结果示例:
<class 'tuple'> ('information_schema',)
<class 'tuple'> ('mysql',)
<class 'tuple'> ('performance_schema',)
<class 'tuple'> ('school',)
<class 'tuple'> ('test',)
方式2:
查询结果可以是 dict 类型,不过需要生成 cursor 时指定 cursor 参数为 pymysql.cursors.DictCursor 。
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定 cursor 后,结果会被处理为 dict
cursor.execute('show databases;')
for item in cursor.fetchall():
print(type(item), item)
cursor.close()
conn.close()
运行结果示例:
<class 'dict'> {'Database': 'information_schema'}
<class 'dict'> {'Database': 'mysql'}
<class 'dict'> {'Database': 'performance_schema'}
<class 'dict'> {'Database': 'school'}
<class 'dict'> {'Database': 'test'}
创建数据库
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('create database blog;')
print(affect_rows) # 输出 1
cursor.close()
conn.close()
运行后,可以在 MySQL 中看到 blog 库。
切换数据库
我们以查询某个库下所有的表为例。
方式1
先创建数据库连接,然后使用 use dbname 选择数据库。
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute('use information_schema;')
cursor.execute('show tables;')
for item in cursor.fetchall():
print(type(item), item)
cursor.close()
conn.close()
运行结果示例(省略部分内容):
<class 'dict'> {'Tables_in_information_schema': 'CHARACTER_SETS'}
<class 'dict'> {'Tables_in_information_schema': 'COLLATIONS'}
<class 'dict'> {'Tables_in_information_schema': 'COLLATION_CHARACTER_SET_APPLICABILITY'}
<class 'dict'> {'Tables_in_information_schema': 'COLUMNS'}
<class 'dict'> {'Tables_in_information_schema': 'COLUMN_PRIVILEGES'}
<class 'dict'> {'Tables_in_information_schema': 'ENGINES'}
<class 'dict'> {'Tables_in_information_schema': 'EVENTS'}
<class 'dict'> {'Tables_in_information_schema': 'FILES'}
<class 'dict'> {'Tables_in_information_schema': 'GLOBAL_STATUS'}
<class 'dict'> {'Tables_in_information_schema': 'GLOBAL_VARIABLES'}
方式2
连接时指定数据库:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='information_schema') # 指定数据库
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute('show tables;')
for item in cursor.fetchall():
print(type(item), item)
cursor.close()
conn.close()
创建表
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
''')
print(affect_rows) # 输出0
cursor.close()
conn.close()
添加索引
创建表
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
''')
print(affect_rows) # 输出0
cursor.close()
conn.close()
给表添加索引
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
ALTER TABLE `user_info` ADD INDEX idx_name(`name`);
''')
print(affect_rows) # 输出0
cursor.close()
conn.close()
执行后,通过mysql客户端查询表的创建语句,会发现多了新的索引:
mysql> show create table user_info\G
*************************** 1. row ***************************
Table: user_info
Create Table: CREATE TABLE `user_info` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(45) NOT NULL DEFAULT '',
PRIMARY KEY (`id`),
KEY `idx_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)
插入数据
建表
在 blog 库下建表:
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
直接执行组装后的 sql
注意,这种方式有sql注入风险,会影响数据安全!!
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
sql = '''INSERT INTO `user_info` (name) values('{}');'''.format('name001')
affect_rows = cursor.execute(sql)
print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)
conn.commit() # 必须加这个
cursor.close()
conn.close()
运行结果示例:
影响行数: 1
插入数据对应的主键id: 2
查询数据库中的数据:
mysql> select * from user_info;
+----+---------+
| id | name |
+----+---------+
| 2 | name001 |
+----+---------+
若不想每次都添加conn.commit()
,可以在创建数据库连接时,指定 autocommit 为 True。
示例:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True # 指定 autocommit 为 True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
sql = '''INSERT INTO `user_info` (name) values('{}');'''.format('name002')
affect_rows = cursor.execute(sql)
print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)
cursor.close()
conn.close()
使用参数化语句添加数据 - 使用元组/列表指定数据
这个和 Java jdbc 的 prepare statement 参数化声明不同。pymysql 的参数化语句本质上仍然是组装 sql,不过会对数据进行安全转义,以防止 sql 注入。
示例:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name003'])
print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)
cursor.close()
conn.close()
使用参数化语句添加数据 - 使用字典指定数据
示例:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('INSERT INTO `user_info` (name) values(%(name)s);', {'name': 'name004'})
print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)
cursor.close()
conn.close()
如何使用参数化声明
上面提到,pymysql 的参数化语句和 Java jdbc 的 prepare statement 参数化声明不同。pymysql 的参数化语句本质上仍然是组装 sql,不过会对数据进行安全转义,以防止 sql 注入。
MySQL 的官方 Python 库 mysql.connector
是支持的,具体可参考:
获取插入数据的主键 ID
插入数据后,通过cursor.lastrowid
可以获取对应数据的主键id。具体使用方式见上面的示例代码。
一次插入多条数据
使用 executemany 方法。
示例:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.executemany('INSERT INTO `user_info` (name) values(%(name)s);',
[
{'name': 'name005'},
{'name': 'name006'}
])
print('影响行数: ', affect_rows) # 输出 2
print('插入数据对应的主键id: ', cursor.lastrowid) # 这里只返回第一个数据的id
cursor.close()
conn.close()
运行结果示例:
影响行数: 2
插入数据对应的主键id: 6
根据注释,executemany 方法可以针对 insert、replace SQL语句使用。
查询数据
执行 select 后,可以通过 cursor.fetchone、cursor.fetchmany、cursor.fetchall 获取数据。注意,查询结果中的每一条数据只能被fetch一次,被取出后。下一次 fetch 就拿不到数据了。
以下是示例:
建表和数据准备
在 blog 库下建表:
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
插入数据,数据内容如下:
mysql> select * from user_info;
+----+---------+
| id | name |
+----+---------+
| 2 | name001 |
| 3 | name002 |
| 4 | name003 |
| 5 | name004 |
| 6 | name005 |
| 7 | name006 |
+----+---------+
6 rows in set (0.01 sec)
指定 id 查询
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id=%s', [2])
print('影响行数: ', affect_rows)
print('fetchone 结果: ', cursor.fetchone())
print('fetchall 结果: ', cursor.fetchall())
cursor.close()
conn.close()
运行结果示例:
影响行数: 1
fetchone 结果: {'id': 2, 'name': 'name001'}
fetchall 结果: []
可以看到 fetchall 的时候没有数据。
如果将 fetch 的顺序调整成:
print('fetchall 结果: ', cursor.fetchall())
print('fetchone 结果: ', cursor.fetchone())
运行结果是:
fetchall 结果: [{'id': 2, 'name': 'name001'}]
fetchone 结果: None
根据 id 范围查询
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id>%s', [2])
print('影响行数: ', affect_rows)
print('fetchall 结果: ', cursor.fetchall())
print('fetchone 结果: ', cursor.fetchone())
cursor.close()
conn.close()
运行结果示例:
影响行数: 5
fetchall 结果: [{'id': 3, 'name': 'name002'}, {'id': 4, 'name': 'name003'}, {'id': 5, 'name': 'name004'}, {'id': 6, 'name': 'name005'}, {'id': 7, 'name': 'name006'}]
fetchone 结果: None
如果只用fetchone 不停的取数据:
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id>%s', [2])
print('影响行数: ', affect_rows)
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
cursor.close()
conn.close()
运行结果如下:
影响行数: 5
fetchone 结果: {'id': 3, 'name': 'name002'}
fetchone 结果: {'id': 4, 'name': 'name003'}
fetchone 结果: {'id': 5, 'name': 'name004'}
fetchone 结果: {'id': 6, 'name': 'name005'}
fetchone 结果: {'id': 7, 'name': 'name006'}
fetchone 结果: None
fetchone 结果: None
当 fetchone 取不到数据时会返回 None,当 fetchall 取不到数据时会返回空list。
删除数据
建表和数据准备
在 blog 库下建表:
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
插入数据,数据内容如下:
mysql> select * from user_info;
+----+---------+
| id | name |
+----+---------+
| 2 | name001 |
| 3 | name002 |
| 4 | name003 |
| 5 | name004 |
| 6 | name005 |
| 7 | name006 |
+----+---------+
6 rows in set (0.01 sec)
删除数据
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('delete from user_info where id=%s', [2])
print('影响行数: ', affect_rows)
cursor.close()
conn.close()
运行结果:
影响行数: 1
更新数据
建表和数据准备
在 blog 库下建表:
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
插入数据,数据内容如下:
mysql> select * from user_info;
+----+---------+
| id | name |
+----+---------+
| 2 | name001 |
| 3 | name002 |
| 4 | name003 |
| 5 | name004 |
| 6 | name005 |
| 7 | name006 |
+----+---------+
6 rows in set (0.01 sec)
更新数据
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('update user_info set name=%s where id=%s', ['新名字', 2])
print('影响行数: ', affect_rows)
cursor.close()
conn.close()
运行结果:
影响行数: 1
如果再次执行,运行结果是:
影响行数: 0
为什么影响行数变成0了 ?因为 MySQL 发现要更新的 name 值和已有值相同,就不更新了。
数据库事务
- 开启事务:conn.begin()
- 提交事务:conn.commit()
- 回滚事务:conn.rollback()
以下为示例。
建表
在 blog 库下建表:
create table `user_info` (
`id` bigint unsigned not null auto_increment,
`name` varchar(45) not null default '',
primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
插入数据,数据内容如下:
mysql> select * from user_info;
+----+---------+
| id | name |
+----+---------+
| 2 | name001 |
| 3 | name002 |
| 4 | name003 |
| 5 | name004 |
| 6 | name005 |
| 7 | name006 |
+----+---------+
6 rows in set (0.01 sec)
示例1:开启和提交事务
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
conn.begin()
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name100'])
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name101'])
conn.commit()
cursor.close()
conn.close()
示例2:回滚事务
import pymysql
conn = pymysql.connect(host='127.0.0.1',
user='root',
password='123456',
port=3306,
database='blog',
autocommit=True
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
try:
conn.begin()
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name102'])
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name103'])
raise Exception('模拟异常')
conn.commit()
except Exception as ex:
print('出现异常,回滚')
conn.rollback()
cursor.close()
conn.close()