Python 操作 MySQL 数据库的三个模块

​python使用MySQL主要有两个模块,pymysql(MySQLdb)和SQLAchemy。

  • pymysql(MySQLdb)为原生模块,直接执行sql语句,其中pymysql模块支持python 2和python3,MySQLdb只支持python2,两者使用起来几乎一样。
  • SQLAchemy为一个ORM框架,将数据对象转换成SQL,然后使用数据API执行SQL并获取执行结果
  • 另外DBUtils模块提供了一个数据库连接池,方便多线程场景中python操作数据库。

1.pymysql模块

安装:pip install pymysql

创建表格操作(注意中文格式设置)

#coding:utf-8
importpymysql

#关于中文问题
#1. mysql命令行创建数据库,设置编码为gbk:createdatabsedemo2charactersetutf8;
#2. python代码中连接时设置charset="gbk"
#3. 创建表格时设置defaultcharset=utf8

#连接数据库
conn=pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
#创建游标
cursor=conn.cursor()
#执行sql语句
cursor.execute("""create table if not exists t_sales(
idintprimarykeyauto_incrementnotnull,
nickNamevarchar(128) notnull,
colorvarchar(128) notnull,
sizevarchar(128) notnull,
commenttextnotnull,
saledatevarchar(128) notnull)engine=InnoDBdefaultcharset=utf8;""")

# cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
# values('%s','%s','%s','%s','%s');""" % ("zack", "黑色", "L", "大小合适", "2019-04-20"))

cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
values(%s,%s,%s,%s,%s);""" , ("zack", "黑色", "L", "大小合适", "2019-04-20"))
#提交
conn.commit()
#关闭游标
cursor.close()
#关闭连接
conn.close()

增删改查:

注意execute执行sql语句参数的两种情况:

  • execute(“insert into t_sales(nickName, size) values(‘%s’,’%s’);” % (“zack”,”L”) )  #此时的%s为字符窜拼接占位符,需要引号加’%s’  (有sql注入风险)
  • execute(“insert into t_sales(nickName, size) values(%s,%s);” , (“zack”,”L”) ) #此时的%s为sql语句占位符,不需要引号%s
#***************************增删改查******************************************************
conn=pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
#创建游标
cursor=conn.cursor()

insert_sql="insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
#返回受影响的行数
row1=cursor.execute(insert_sql,("Bob", "黑色", "XL", "便宜实惠", "2019-04-20"))

update_sql="update t_sales set color='白色' where id=%s;"
#返回受影响的行数
row2=cursor.execute(update_sql,(1,))

select_sql="select * from t_sales where id>%s;"
#返回受影响的行数
row3=cursor.execute(select_sql,(1,))

delete_sql="delete from t_sales where id=%s;"
#返回受影响的行数
row4=cursor.execute(delete_sql,(4,))

#提交,不然无法保存新建或者修改的数据(增删改得提交)
conn.commit()
cursor.close()
conn.close()

批量插入和自增id

#***************************批量插入******************************************************
conn=pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
#创建游标
cursor=conn.cursor()

insert_sql="insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
data= [("Bob", "黑色", "XL", "便宜实惠", "2019-04-20"),("Ted", "黄色", "M", "便宜实惠", "2019-04-20"),("Gary", "黑色", "M", "穿着舒服", "2019-04-20")]
row1=cursor.executemany(insert_sql, data)

conn.commit()
#为插入的第一条数据的id,即插入的为5,6,7,new_id=5
new_id=cursor.lastrowid
print(new_id)
cursor.close()
conn.close()

获取查询数据

#***************************获取查找sql的查询数据******************************************************
conn=pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
#创建游标
cursor=conn.cursor()

select_sql="select id,nickname,size from t_sales where id>%s;"
cursor.execute(select_sql, (3,))

row1=cursor.fetchone() #获取第一条数据,获取后游标会向下移动一行
row_n=cursor.fetchmany(3) #获取前n条数据,获取后游标会向下移动n行
row_all=cursor.fetchall() #获取所有数据,获取后游标会向下移动到末尾
print(row1)
print(row_n)
print(row_all)
#conn.commit()
cursor.close()
conn.close()

注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:

  • cursor.scroll(1,mode=’relative’)  # 相对当前位置移动
  • cursor.scroll(2,mode=’absolute’) # 相对绝对位置移动

fetch获取数据类型

fetch获取的数据默认为元组格式,还可以获取字典类型的,如下:

#***************************获取字典格式数据******************************************************
conn=pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306) #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
#创建游标
cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)

select_sql="select id,nickname,size from t_sales where id>%s;"
cursor.execute(select_sql, (3,))

row1=cursor.fetchall()

print(row1)

conn.commit()
cursor.close()
conn.close()

2.SQLAlchmy框架

SQLAlchemy的整体架构如下,建立在第三方的DB API上,将类和对象操作转换为数据库sql,然后利用DB API执sql语句得到结果。其适用于多种数据库。另外其内部实现了数据库连接池,方便进行多线程操作。

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • ​​Dialect​​​,选择连接数据库的DB API种类,(pymysql,mysqldb等)“
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言
  • ​​DB API​​:Python Database API Specification

2.1 执行原生sql

安装:pip install sqlalchemy

SQLAlchmy也可以不利用ORM,使用数据库连接池,类似pymysql模块执行原生sql。

#coding:utf-8

fromsqlalchemyimportcreate_engine
fromsqlalchemy.ext.declarativeimportdeclarative_base
fromsqlalchemyimportColumn, String, Integer
importthreading

engine=create_engine(
"mysql+pymysql://[email protected]:3306/learningsql?charset=utf8",
max_overflow=0, #超过连接池大小外最多创建的连接,为0表示超过5个连接后,其他连接请求会阻塞(默认为10)
pool_size=5, #连接池大小(默认为5)
pool_timeout=30, #连接线程池中,没有连接时最多等待的时间,不设置无连接时直接报错(默认为30)
pool_recycle=-1) #多久之后对线程池中的线程进行一次连接的回收(重置)(默认为-1

# deftask():
# conn=engine.raw_connection() #建立原生连接,和pymysql的连接一样
# cur=conn.cursor()
# cur.execute("select * from t_sales where id>%s",(2,))
# result=cur.fetchone()
# cur.close()
# conn.close()
# print(result)



# deftask():
# conn=engine.contextual_connect() #建立上下文管理器连接,自动打开和关闭
# withconn:
# cur=conn.execute("select * from t_sales where id>%s",(2,))
# result=cur.fetchone()
# print(result)


deftask():
cur=engine.execute("select * from t_sales where id>%s",(2,)) #engine直接执行
result=cur.fetchone()
cur.close()
print(result)

if__name__=="__main__":
foriinrange(10):
t=threading.Thread(target=task)
t.start()

2.2 执行ORM语句

A. 创建和删除表

#coding:utf-8

importdatetime
fromsqlalchemyimportcreate_engine
fromsqlalchemy.ext.declarativeimportdeclarative_base
fromsqlalchemyimportColumn, String, Integer, DateTime, Text

Base=declarative_base()

classUser(Base):
__tablename__="users"
id=Column(Integer,primary_key=True)
name=Column(String(32),index=True, nullable=False) #创建索引,不为空
email=Column(String(32),unique=True)
ctime=Column(DateTime, default=datetime.datetime.now) #传入方法名datetime.datetime.now
extra=Column(Text,nullable=True)

__table_args__= {

# UniqueConstraint('id', 'name', name='uix_id_name'), #设置联合唯一约束
# Index('ix_id_name', 'name', 'email'), # 创建索引
}

defcreate_tbs():
engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
Base.metadata.create_all(engine) #创建所有定义的表

defdrop_dbs():
engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
Base.metadata.drop_all(engine) #删除所有创建的表

if__name__=="__main__":
create_tbs() #创建表
#drop_dbs() #删除表

B.表中定义外键关系(一对多,多对多)

思考:下面代码中的一对多关系,relationship 定义在了 customer 表中,应该定义在 PurchaseOrder 更合理?

注意:mysql 数据库中避免使用 order做为表的名字,order 为一个 mysql 关键字,做为表名字时必须用反引号order (键盘数字1旁边的符号)。

#coding:utf-8

fromsqlalchemyimportcreate_engine
fromsqlalchemy.ext.declarativeimportdeclarative_base
fromsqlalchemyimportColumn,Integer,String,Text,DateTime,ForeignKey,Float
fromsqlalchemy.ormimportrelationship
importdatetime

engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8") #数据库有密码时,//root:[email protected]:3306/
Base=declarative_base()

classCustomer(Base):
__tablename__="customer"#数据库中保存的表名字

id=Column(Integer,primary_key=True)
name=Column(String(64),index=True,nullable=False)
phone=Column(String(16),nullable=False)
address=Column(String(256),nullable=False)
purchase_order_id=Column(Integer,ForeignKey("purchase_order.id")) #关键关系,关联表的__tablename__="purchase_order"

# 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer
purchase_order=relationship("PurchaseOrder",backref="customer")



classPurchaseOrder(Base):
__tablename__="purchase_order"#mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号)
id=Column(Integer,primary_key=True)
cost=Column(Float,nullable=True)
ctime=Column(DateTime,default=datetime.datetime.now)
desc=Column(String(528))

#多对多关系时,secondary为中间表
product=relationship("Product",secondary="order_to_product",backref="purchase_order")

classProduct(Base):
__tablename__="product"
id=Column(Integer,primary_key=True)
name=Column(String(256))
price=Column(Float,nullable=False)

classOrdertoProduct(Base):
__tablename__="order_to_product"
id=Column(Integer,primary_key=True)
product_id=Column(Integer,ForeignKey("product.id"))
purchase_order_id=Column(Integer,ForeignKey("purchase_order.id"))



if__name__=="__main__":
Base.metadata.create_all(engine)
#Base.metadata.drop_all(engine)

C.增删改查操作

增删改查

#coding:utf-8

fromsqlalchemyimportcreate_engine
fromsqlalchemy.ext.declarativeimportdeclarative_base
fromsqlalchemyimportColumn,Integer,String,Text,DateTime,ForeignKey,Float
fromsqlalchemy.ormimportrelationship,sessionmaker
fromsqlalchemy.sqlimporttext
importdatetime

engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8") #数据库有密码时,//root:[email protected]:3306/, 设置utf8防止中文乱码
Base=declarative_base()

classCustomer(Base):
__tablename__="customer"#数据库中保存的表名字

id=Column(Integer,primary_key=True)
name=Column(String(64),index=True,nullable=False)
phone=Column(String(16),nullable=False)
address=Column(String(256),nullable=False)
purchase_order_id=Column(Integer,ForeignKey("purchase_order.id")) #关键关系,关联表的__tablename__="purchase_order"

# 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer
purchase_order=relationship("PurchaseOrder",backref="customer")


classPurchaseOrder(Base):
__tablename__="purchase_order"#mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号)
id=Column(Integer,primary_key=True)
cost=Column(Float,nullable=True)
ctime=Column(DateTime,default=datetime.datetime.now)
desc=Column(String(528))

#多对多关系时,secondary为中间表
product=relationship("Product",secondary="order_to_product",backref="purchase_order")

classProduct(Base):
__tablename__="product"
id=Column(Integer,primary_key=True)
name=Column(String(256))
price=Column(Float,nullable=False)

classOrdertoProduct(Base):
__tablename__="order_to_product"
id=Column(Integer,primary_key=True)
product_id=Column(Integer,ForeignKey("product.id"))
purchase_order_id=Column(Integer,ForeignKey("purchase_order.id"))


if__name__=="__main__":
#Base.metadata.create_all(engine)
#Base.metadata.drop_all(engine)

Session=sessionmaker(bind=engine)

#每次进行数据库操作时都要创建session
session=Session()

#*****************增加数据********************
# pur_order=PurchaseOrder(cost=19.7,desc="python编程之路")

# session.add(pur_order)
# session.add_all(
# [PurchaseOrder(cost=39.7,desc="linux操作系统"),
# PurchaseOrder(cost=59.6,desc="python cookbook")])
# session.commit()

#*****************修改数据********************

#session.query(PurchaseOrder).filter(PurchaseOrder.id>2).update({"cost":29.7})
#session.query(PurchaseOrder).filter(PurchaseOrder.id==2).update({"cost":PurchaseOrder.cost+40.1},synchronize_session=False) #synchronize_session用于query在进行deleteorupdate操作时,对session的同步策略。
#session.commit()

#*****************删除数据********************
#session.query(PurchaseOrder).filter(PurchaseOrder.id==1).delete()
#session.commit()

#*****************查询数据********************
#ret=session.query(PurchaseOrder).all()
# ret=session.query(PurchaseOrder).filter(PurchaseOrder.id==2).all() #包含对象的列表
# ret=session.query(PurchaseOrder).filter(PurchaseOrder.id==2).first() #单个对象
# ret=session.query(PurchaseOrder).filter_by(id=2).all() #通过列名字的表达式
# ret=session.query(PurchaseOrder).filter_by(id=2).first()
#ret=session.query(PurchaseOrder).filter(text("id<:value and cost>:price")).params(value=6,price=15).order_by(PurchaseOrder.cost).all()
#ret=session.query(PurchaseOrder).from_statement(text("SELECT * FROM purchase_order WHERE cost>:price")).params(price=40).all()
# printret
# foriinret:
# printi.id, i.cost, i.ctime,i.desc

#ret2=session.query(PurchaseOrder.id,PurchaseOrder.cost.label('totalcost')).all() #只查询两列,ret2为列表
#printret2

#关闭session
session.close()

查询语句

# 条件
ret=session.query(Users).filter_by(name='alex').all()
ret=session.query(Users).filter(Users.id>1, Users.name=='eric').all()
ret=session.query(Users).filter(Users.id.between(1, 3), Users.name=='eric').all()
ret=session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret=session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret=session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
fromsqlalchemyimportand_, or_
ret=session.query(Users).filter(and_(Users.id>3, Users.name=='eric')).all()
ret=session.query(Users).filter(or_(Users.id<2, Users.name=='eric')).all()
ret=session.query(Users).filter(
or_(
Users.id<2,
and_(Users.name=='eric', Users.id>3),
Users.extra!=""
)).all()


# 通配符
ret=session.query(Users).filter(Users.name.like('e%')).all()
ret=session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret=session.query(Users)[1:2]

# 排序
ret=session.query(Users).order_by(Users.name.desc()).all()
ret=session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
fromsqlalchemy.sqlimportfunc

ret=session.query(Users).group_by(Users.extra).all()
ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()

ret=session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret=session.query(Users, Favor).filter(Users.id==Favor.nid).all()

ret=session.query(Person).join(Favor).all()

ret=session.query(Person).join(Favor, isouter=True).all()


# 组合
q1=session.query(Users.name).filter(Users.id>2)
q2=session.query(Favor.caption).filter(Favor.nid<2)
ret=q1.union(q2).all()

q1=session.query(Users.name).filter(Users.id>2)
q2=session.query(Favor.caption).filter(Favor.nid<2)
ret=q1.union_all(q2).all()

补充

#coding:utf-8

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text, func
from sqlalchemy_orm2 import PurchaseOrder #导入定义的PurchaseOrder表格类

engine = create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8")
Session = sessionmaker(bind=engine)
session = Session()
#查询
ret = session.execute("select * from purchase_order where id=:value",params={"value":3})
print ret
for i in ret:
print i.id, i.cost, i.ctime,i.desc

#插入
purchase_order = PurchaseOrder.__table__ #拿到PurchaseOrder表格对象
ret=session.execute(purchase_order.insert(),
[{"cost":46.3,"desc":'python2'},
{"cost":43.3,"desc":'python3'}])
session.commit()
print(ret.lastrowid)
session.close()

# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""

D.多线程操作

#coding:utf-8

fromsqlalchemyimportcreate_engine
fromsqlalchemy.ormimportsessionmaker
fromsqlalchemy_orm2importProduct
fromthreadingimportThread

engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8",max_overflow=0,pool_size=5)
Session=sessionmaker(bind=engine)

deftask(name,price):
session=Session()
pro=Product(name=name,price=price)
session.add(pro)
session.commit()
session.close()

if__name__=="__main__":
foriinrange(6):
t=Thread(target=task,args=("pro"+str(i),i*5))
t.start()

E. 通过relationship操纵一对多和多对多关系

一对多

#coding:utf-8

fromsqlalchemyimportcreate_engine
fromsqlalchemy.ormimportsessionmaker
fromsqlalchemy.sqlimporttext, func
fromsqlalchemy_orm2importPurchaseOrder,Product,OrdertoProduct,Customer#导入定义的表格类


engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8")
Session=sessionmaker(bind=engine)
session=Session()

# #通过定义的关键关系添加(id值)
# cus1=Customer(name="zack",phone="13567682333",address="Nanjing",purchase_order_id=3)
# session.add(cus1)


# #通过relationship正向添加
# cus2=Customer(name="zack2",phone="13567682333",address="Nanjing",purchase_order=PurchaseOrder(cost=53,desc="java"))
# session.add(cus2)
# session.commit()

#通过relationship反向添加
# purchase_order=PurchaseOrder(cost=53,desc="php")
# cus3=Customer(name="zack3",phone="13567682333",address="Nanjing")
# cus4=Customer(name="zack4",phone="13567682333",address="Nanjing")
# purchase_order.customer=[cus3,cus4] #cus3,cus4的purchase_order_id都是purchase_order.id值,即同时添加了两组外键关系
# session.add(purchase_order)
# session.commit()

##通过relationship正向查询
cus=session.query(Customer).first()
print(cus.purchase_order_id)
print(cus.purchase_order.desc)

#通过relationship反向查询
pur=session.query(PurchaseOrder).filter(PurchaseOrder.id==3).first()
print(pur.desc)
print(pur.customer) #返回一个list

多对多

#coding:utf-8


fromsqlalchemyimportcreate_engine
fromsqlalchemy.ormimportsessionmaker
fromsqlalchemy.sqlimporttext, func
fromsqlalchemy_orm2importPurchaseOrder,Product,OrdertoProduct,Customer#导入定义的表格类

engine=create_engine("mysql+pymysql://[email protected]:3306/learningsql?charset=utf8")
Session=sessionmaker(bind=engine)
session=Session()

# session.add_all([Product(name="java",price=24),
# Product(name="python",price=34),
# Product(name="php",price=27)])
# session.commit()

# #通过定义的关键关系添加(id值)
# op=OrdertoProduct(product_id=1,purchase_order_id=16)
# session.add(op)
# session.commit()

# #通过relationship添加
# pur=PurchaseOrder(cost=27,desc="xxxx")
# pur.product= [Product(name="C++",price=60),] #正向
# session.add(pur)

# pro=Product(name="C",price=40)
# pro.purchase_order=[PurchaseOrder(cost=27,desc="xxxx"),] #反向
# session.add(pro)
# session.commit()


#通过relationship正向查询
pur=session.query(PurchaseOrder).filter(PurchaseOrder.id==19).first()
print(pur.desc)
print(pur.product) #结果为列表

#通过relationship反向查询
pro=session.query(Product).filter(Product.id==5).first()
print(pro.name)
print(pro.purchase_order) #结果为列表

session.close()

3.数据库连接池

对于ORM框架,其内部维护了链接池,可以直接通过多线程操控数据库。对于pymysql模块,通过多线程操控数据库容易出错,得加锁串行执行。进行并发时,可以利用DBUtils模块来维护数据库连接池。

3.1 多线程操控pymysql

不采用DBUtils连接池时, pymysql多线程代码如下:

每个线程创建链接

importpymysql
importthreadind

#**************************无连接池*******************************
#每个线程都要创立一次连接,线程并发操作间可能有问题?
deffunc():
conn=pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")
cursor=conn.cursor()
cursor.execute("select * from user where nid>1;")
result=cursor.fetchone()
print(result)
cursor.close()
conn.close()

if__name__=="__main__":
foriinrange(5):
t=threading.Thread(target=func,name="thread-%s"%i)
t.start()

一个连接串行执行

#**************************无连接池*******************************
#创建一个连接,加锁串行执行
fromthreadingimportLock
importpymysql
importthreading
conn=pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")


lock=Lock()
deffunc():
withlock:
cursor=conn.cursor()
cursor.execute("select * from user where nid>1;")
result=cursor.fetchone()
print(result)
cursor.close()

#conn.close()不能在线程中关闭连接,否则其他线程不可用了

if__name__=="__main__":
threads= []
foriinrange(5):
t=threading.Thread(target=func,name="thread-%s"%i)
threads.append(t)
t.start()

fortinthreads:
t.join()

conn.close()

3.2 DBUtils连接池

DBUtils连接池有两种连接模式:PersistentDB和PooledDB

官网文档:https://cito.github.io/DBUtils/UsersGuide.html

模式一(DBUtils.PersistentDB):

为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。

PersistentDB使用代码如下:

#coding:utf-8

fromDBUtils.PersistentDBimportPersistentDB
importpymysql
importthreading

pool=PersistentDB(
creator=pymysql, # 使用链接数据库的模块
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # pingMySQL服务端,检查是否服务可用。# 如:0=None=never, 1=default=wheneveritisrequested, 2=whenacursoriscreated, 4=whenaqueryisexecuted, 7=always
closeable=False, # 如果为False时,conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时,conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal=None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host="127.0.0.1",
port=3306,
user="root",
password="",
database="learningsql",
charset="utf8"
)

deffunc():
conn=pool.connection()
cursor=conn.cursor()
cursor.execute("select * from user where nid>1;")
result=cursor.fetchone()
print(result)
cursor.close()
conn.close()

if__name__=="__main__":
foriinrange(5):
t=threading.Thread(target=func,name="thread-%s"%i)
t.start()

模式二(DBUtils.PooledDB):

创建一批连接到连接池,供所有线程共享使用。

(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)

PooledDB使用代码如下:

fromDBUtils.PooledDBimportPooledDB
importpymysql
importthreading
importtime

pool=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession= [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # pingMySQL服务端,检查是否服务可用。# 如:0=None=never, 1=default=wheneveritisrequested, 2=whenacursoriscreated, 4=whenaqueryisexecuted, 7=always
host="127.0.0.1",
port=3306,
user="root",
password="",
database="learningsql",
charset="utf8"
)

deffunc():
conn=pool.connection()
cursor=conn.cursor()
cursor.execute("select * from user where nid>1;")
result=cursor.fetchone()
print(result)
time.sleep(5) #为了查看mysql端的线程数量
cursor.close()
conn.close()

if__name__=="__main__":
foriinrange(5):
t=threading.Thread(target=func,name="thread-%s"%i)
t.start()

上述代码中加入了sleep(5)使线程连接数据库时间延长,方便查看mysql数据库连接线程情况,下图分别为代码执行中和执行后的线程连接情况,可以发现,代码执行时,同时有6个线程连接上了数据库(有一个为mysql命令客户端),代码执行后,只有一个线程连接数据库,但仍有5个线程等待连接。

(show status like “Threads%” 查看线程连接情况)

文章来源网络,作者:运维,如若转载,请注明出处:https://shuyeidc.com/wp/232171.html<

(0)
运维的头像运维
上一篇2025-04-20 01:15
下一篇 2025-04-20 01:16

相关推荐

  • 个人主题怎么制作?

    制作个人主题是一个将个人风格、兴趣或专业领域转化为视觉化或结构化内容的过程,无论是用于个人博客、作品集、社交媒体账号还是品牌形象,核心都是围绕“个人特色”展开,以下从定位、内容规划、视觉设计、技术实现四个维度,详细拆解制作个人主题的完整流程,明确主题定位:找到个人特色的核心主题定位是所有工作的起点,需要先回答……

    2025-11-20
    0
  • 社群营销管理关键是什么?

    社群营销的核心在于通过建立有温度、有价值、有归属感的社群,实现用户留存、转化和品牌传播,其管理需贯穿“目标定位-内容运营-用户互动-数据驱动-风险控制”全流程,以下从五个维度展开详细说明:明确社群定位与目标社群管理的首要任务是精准定位,需明确社群的核心价值(如行业交流、产品使用指导、兴趣分享等)、目标用户画像……

    2025-11-20
    0
  • 香港公司网站备案需要什么材料?

    香港公司进行网站备案是一个涉及多部门协调、流程相对严谨的过程,尤其需兼顾中国内地与香港两地的监管要求,由于香港公司注册地与中国内地不同,其网站若主要服务内地用户或使用内地服务器,需根据服务器位置、网站内容性质等,选择对应的备案路径(如工信部ICP备案或公安备案),以下从备案主体资格、流程步骤、材料准备、注意事项……

    2025-11-20
    0
  • 如何企业上云推广

    企业上云已成为数字化转型的核心战略,但推广过程中需结合行业特性、企业痛点与市场需求,构建系统性、多维度的推广体系,以下从市场定位、策略设计、执行落地及效果优化四个维度,详细拆解企业上云推广的实践路径,精准定位:明确目标企业与核心价值企业上云并非“一刀切”的方案,需先锁定目标客户群体,提炼差异化价值主张,客户分层……

    2025-11-20
    0
  • PS设计搜索框的实用技巧有哪些?

    在PS中设计一个美观且功能性的搜索框需要结合创意构思、视觉设计和用户体验考量,以下从设计思路、制作步骤、细节优化及交互预览等方面详细说明,帮助打造符合需求的搜索框,设计前的规划明确使用场景:根据网站或APP的整体风格确定搜索框的调性,例如极简风适合细线条和纯色,科技感适合渐变和发光效果,电商类则可能需要突出搜索……

    2025-11-20
    0

发表回复

您的邮箱地址不会被公开。必填项已用 * 标注