Python 使用 PyMySQL 操作 MySQL


#Python 笔记


简介

PyMySQL 库可以用来操作 MySQL。

官方文档:

安装

使用 pip 安装 PyMySQL:

$ python3 -m pip install PyMySQL

获取 DB 列表

方式1:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306)
cursor = conn.cursor()
cursor.execute('show databases;')

for item in cursor.fetchall():
    print(type(item), item)   # item 是 tuple 元组

cursor.close()
conn.close()

运行结果示例:

<class 'tuple'> ('information_schema',)
<class 'tuple'> ('mysql',)
<class 'tuple'> ('performance_schema',)
<class 'tuple'> ('school',)
<class 'tuple'> ('test',)

方式2:

查询结果可以是 dict 类型,不过需要生成 cursor 时指定 cursor 参数为 pymysql.cursors.DictCursor 。

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 指定 cursor 后,结果会被处理为 dict
cursor.execute('show databases;')

for item in cursor.fetchall():
    print(type(item), item)

cursor.close()
conn.close()

运行结果示例:

<class 'dict'> {'Database': 'information_schema'}
<class 'dict'> {'Database': 'mysql'}
<class 'dict'> {'Database': 'performance_schema'}
<class 'dict'> {'Database': 'school'}
<class 'dict'> {'Database': 'test'}

创建数据库

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('create database blog;')

print(affect_rows)  # 输出 1

cursor.close()
conn.close()

运行后,可以在 MySQL 中看到 blog 库。

切换数据库

我们以查询某个库下所有的表为例。

方式1

先创建数据库连接,然后使用 use dbname 选择数据库。

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute('use information_schema;')
cursor.execute('show tables;')

for item in cursor.fetchall():
    print(type(item), item)

cursor.close()
conn.close()

运行结果示例(省略部分内容):

<class 'dict'> {'Tables_in_information_schema': 'CHARACTER_SETS'}
<class 'dict'> {'Tables_in_information_schema': 'COLLATIONS'}
<class 'dict'> {'Tables_in_information_schema': 'COLLATION_CHARACTER_SET_APPLICABILITY'}
<class 'dict'> {'Tables_in_information_schema': 'COLUMNS'}
<class 'dict'> {'Tables_in_information_schema': 'COLUMN_PRIVILEGES'}
<class 'dict'> {'Tables_in_information_schema': 'ENGINES'}
<class 'dict'> {'Tables_in_information_schema': 'EVENTS'}
<class 'dict'> {'Tables_in_information_schema': 'FILES'}
<class 'dict'> {'Tables_in_information_schema': 'GLOBAL_STATUS'}
<class 'dict'> {'Tables_in_information_schema': 'GLOBAL_VARIABLES'}

方式2

连接时指定数据库:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='information_schema')   # 指定数据库
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute('show tables;')

for item in cursor.fetchall():
    print(type(item), item)

cursor.close()
conn.close()

创建表

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
''')

print(affect_rows)  # 输出0

cursor.close()
conn.close()

添加索引

创建表

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;
''')

print(affect_rows)  # 输出0

cursor.close()
conn.close()

给表添加索引

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('''
ALTER TABLE `user_info` ADD INDEX idx_name(`name`); 
''')

print(affect_rows)  # 输出0

cursor.close()
conn.close()

执行后,通过mysql客户端查询表的创建语句,会发现多了新的索引:

mysql> show create table user_info\G
*************************** 1. row ***************************
       Table: user_info
Create Table: CREATE TABLE `user_info` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(45) NOT NULL DEFAULT '',
  PRIMARY KEY (`id`),
  KEY `idx_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
1 row in set (0.00 sec)

插入数据

建表

在 blog 库下建表:

create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;

直接执行组装后的 sql

注意,这种方式有sql注入风险,会影响数据安全!!

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
sql = '''INSERT INTO `user_info` (name) values('{}');'''.format('name001')
affect_rows = cursor.execute(sql)

print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)

conn.commit()  # 必须加这个

cursor.close()
conn.close()

运行结果示例:

影响行数:  1
插入数据对应的主键id:  2

查询数据库中的数据:

mysql> select * from user_info;
+----+---------+
| id | name    |
+----+---------+
|  2 | name001 |
+----+---------+

若不想每次都添加conn.commit(),可以在创建数据库连接时,指定 autocommit 为 True。

示例:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True    # 指定 autocommit 为 True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
sql = '''INSERT INTO `user_info` (name) values('{}');'''.format('name002')
affect_rows = cursor.execute(sql)

print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)

cursor.close()
conn.close()

使用参数化语句添加数据 - 使用元组/列表指定数据

这个和 Java jdbc 的 prepare statement 参数化声明不同。pymysql 的参数化语句本质上仍然是组装 sql,不过会对数据进行安全转义,以防止 sql 注入。

示例:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name003'])

print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)

cursor.close()
conn.close()

使用参数化语句添加数据 - 使用字典指定数据

示例:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('INSERT INTO `user_info` (name) values(%(name)s);', {'name': 'name004'})

print('影响行数: ', affect_rows)
print('插入数据对应的主键id: ', cursor.lastrowid)

cursor.close()
conn.close()

如何使用参数化声明

上面提到,pymysql 的参数化语句和 Java jdbc 的 prepare statement 参数化声明不同。pymysql 的参数化语句本质上仍然是组装 sql,不过会对数据进行安全转义,以防止 sql 注入。

MySQL 的官方 Python 库 mysql.connector 是支持的,具体可参考:

获取插入数据的主键 ID

插入数据后,通过cursor.lastrowid可以获取对应数据的主键id。具体使用方式见上面的示例代码。

一次插入多条数据

使用 executemany 方法。

示例:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.executemany('INSERT INTO `user_info` (name) values(%(name)s);',
                                 [
                                     {'name': 'name005'},
                                     {'name': 'name006'}
                                 ])

print('影响行数: ', affect_rows)   # 输出 2
print('插入数据对应的主键id: ', cursor.lastrowid)   # 这里只返回第一个数据的id

cursor.close()
conn.close()

运行结果示例:

影响行数:  2
插入数据对应的主键id:  6

根据注释,executemany 方法可以针对 insert、replace SQL语句使用。

查询数据

执行 select 后,可以通过 cursor.fetchone、cursor.fetchmany、cursor.fetchall 获取数据。注意,查询结果中的每一条数据只能被fetch一次,被取出后。下一次 fetch 就拿不到数据了。

以下是示例:

建表和数据准备

在 blog 库下建表:

create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;

插入数据,数据内容如下:

mysql> select * from user_info;
+----+---------+
| id | name    |
+----+---------+
|  2 | name001 |
|  3 | name002 |
|  4 | name003 |
|  5 | name004 |
|  6 | name005 |
|  7 | name006 |
+----+---------+
6 rows in set (0.01 sec)

指定 id 查询

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id=%s', [2])

print('影响行数: ', affect_rows)

print('fetchone 结果: ', cursor.fetchone())
print('fetchall 结果: ', cursor.fetchall())

cursor.close()
conn.close()

运行结果示例:

影响行数:  1
fetchone 结果:  {'id': 2, 'name': 'name001'}
fetchall 结果:  []

可以看到 fetchall 的时候没有数据。

如果将 fetch 的顺序调整成:

print('fetchall 结果: ', cursor.fetchall())
print('fetchone 结果: ', cursor.fetchone())

运行结果是:

fetchall 结果:  [{'id': 2, 'name': 'name001'}]
fetchone 结果:  None

根据 id 范围查询

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id>%s', [2])

print('影响行数: ', affect_rows)

print('fetchall 结果: ', cursor.fetchall())
print('fetchone 结果: ', cursor.fetchone())

cursor.close()
conn.close()

运行结果示例:

影响行数:  5
fetchall 结果:  [{'id': 3, 'name': 'name002'}, {'id': 4, 'name': 'name003'}, {'id': 5, 'name': 'name004'}, {'id': 6, 'name': 'name005'}, {'id': 7, 'name': 'name006'}]
fetchone 结果:  None

如果只用fetchone 不停的取数据:

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('select id, name from user_info where id>%s', [2])

print('影响行数: ', affect_rows)

print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())
print('fetchone 结果: ', cursor.fetchone())

cursor.close()
conn.close()

运行结果如下:

影响行数:  5
fetchone 结果:  {'id': 3, 'name': 'name002'}
fetchone 结果:  {'id': 4, 'name': 'name003'}
fetchone 结果:  {'id': 5, 'name': 'name004'}
fetchone 结果:  {'id': 6, 'name': 'name005'}
fetchone 结果:  {'id': 7, 'name': 'name006'}
fetchone 结果:  None
fetchone 结果:  None

当 fetchone 取不到数据时会返回 None,当 fetchall 取不到数据时会返回空list。

删除数据

建表和数据准备

在 blog 库下建表:

create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;

插入数据,数据内容如下:

mysql> select * from user_info;
+----+---------+
| id | name    |
+----+---------+
|  2 | name001 |
|  3 | name002 |
|  4 | name003 |
|  5 | name004 |
|  6 | name005 |
|  7 | name006 |
+----+---------+
6 rows in set (0.01 sec)

删除数据

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('delete from user_info where id=%s', [2])

print('影响行数: ', affect_rows)

cursor.close()
conn.close()

运行结果:

影响行数:  1

更新数据

建表和数据准备

在 blog 库下建表:

create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;

插入数据,数据内容如下:

mysql> select * from user_info;
+----+---------+
| id | name    |
+----+---------+
|  2 | name001 |
|  3 | name002 |
|  4 | name003 |
|  5 | name004 |
|  6 | name005 |
|  7 | name006 |
+----+---------+
6 rows in set (0.01 sec)

更新数据

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
affect_rows = cursor.execute('update user_info set name=%s where id=%s', ['新名字', 2])

print('影响行数: ', affect_rows)

cursor.close()
conn.close()

运行结果:

影响行数:  1

如果再次执行,运行结果是:

影响行数:  0

为什么影响行数变成0了 ?因为 MySQL 发现要更新的 name 值和已有值相同,就不更新了。

数据库事务

  • 开启事务:conn.begin()
  • 提交事务:conn.commit()
  • 回滚事务:conn.rollback()

以下为示例。

建表

在 blog 库下建表:

create table `user_info` (
    `id` bigint unsigned not null auto_increment,
    `name` varchar(45) not null default '',
    primary key (`id`)
) engine = InnoDB default charset = utf8mb4;

插入数据,数据内容如下:

mysql> select * from user_info;
+----+---------+
| id | name    |
+----+---------+
|  2 | name001 |
|  3 | name002 |
|  4 | name003 |
|  5 | name004 |
|  6 | name005 |
|  7 | name006 |
+----+---------+
6 rows in set (0.01 sec)

示例1:开启和提交事务

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

conn.begin()
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name100'])
cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name101'])
conn.commit()


cursor.close()
conn.close()

示例2:回滚事务

import pymysql

conn = pymysql.connect(host='127.0.0.1',
                       user='root',
                       password='123456',
                       port=3306,
                       database='blog',
                       autocommit=True
                       )
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

try:
    conn.begin()
    cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name102'])
    cursor.execute('INSERT INTO `user_info` (name) values(%s);', ['name103'])
    raise Exception('模拟异常')
    conn.commit()
except Exception as ex:
    print('出现异常,回滚')
    conn.rollback()


cursor.close()
conn.close()


( 本文完 )