python数据库sqlite3操作类
发表于:2024-07-02 12:00:40浏览:127次
引言
这是数据库sqlite3操作类
类
import os
import sqlite3
import math
class Basic(object):
"""
基础模型
"""
PREFIX = "py_"
DB_PATH = os.path.join(os.getcwd(),'database.db')
def __init__(self,table,sql = None) -> None:
self.table = f"{self.PREFIX}{table}"
self.__error = None
self.__last_id = []
self.execute(sql)
def select(self,where = None,page = None,limit = None,sort = "id DESC",field = "*"):
"""
查询多条数据:
where 查询条件,为list格式,字段:值
page 当前页
limit 每页条数
"""
if where == None:
self.__set_error("缺少查询条件")
return None
# 分页
sql_page = ""
if page != None and limit !=None:
offset = (page - 1) * limit
sql_page = f"LIMIT {limit} OFFSET {offset}"
sql = f"SELECT {field} FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort} {sql_page};"
rows = []
result = None
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
result = cursor.fetchall()
# 获取查询结果的列名称
columns = [description[0] for description in cursor.description]
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
if result != None:
# 遍历每条记录,打印列名和对应的值
for row in result:
# 将字段名称和数据对应起来
rows.append(dict(zip(columns, row)))
count = self.count(where)
total_page = math.ceil(count/limit) if isinstance(limit,int) and limit > 0 else count
return {"count":count,"rows":rows,"total_page":total_page,"page":page,"limit":limit}
def count(self,where):
"""
查询条数:
where 查询条件,为list格式,字段:值
"""
if where == None:
self.__set_error("缺少查询条件")
return None
sql = f"SELECT COUNT(*) FROM {self.table} WHERE {self.__build_where(where)};"
result = 0
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
result = cursor.fetchone()[0]
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
return result
def sum(self,where,field):
"""
求和:
where 查询条件,为list格式,字段:值
field 求和的字段
"""
if where == None:
self.__set_error("缺少查询条件")
return None
sql = f"SELECT SUM({field}) FROM {self.table} WHERE {self.__build_where(where)};"
result = 0
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
result = cursor.fetchone()[0]
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
return 0 if result == None else result
def value(self,where = None,field = None):
"""
查询1一个字段的值:
where 查询条件,为list格式
field 字段名
"""
row = self.find(where)
if row == None or field == None:
return None
return row.get(field)
def find(self,where = None,sort = "id DESC"):
"""
查询1条数据:
where 查询条件,为list格式
"""
if where == None:
self.__set_error("缺少查询条件")
return None
sql = f"SELECT * FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort} LIMIT 1;"
result = None
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
result = cursor.fetchone()
# 获取查询结果的列名称
columns = [description[0] for description in cursor.description]
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
if result == None:
return result
return dict(zip(columns, result))
def get_last_id(self):
"""
获取新增数据的ID
"""
if len(self.__last_id) == 0:
return None
elif len(self.__last_id) == 1:
return self.__last_id[0]
else:
return self.__last_id
def __set_last_id(self,id):
"""
设置新增数据的ID
"""
self.__last_id.append(id)
def insert(self,data):
"""
插入数据(1条或多条):
data 数据,list/dict格式
"""
if isinstance(data,dict):
data = [data]
sql = []
for row in data:
sql_field = []
sql_values = []
for field in row:
sql_field.append(field)
val = row.get(field)
if isinstance(val,(str,int,float)):
sql_values.append(f"'{val}'")
else:
sql_values.append(val)
sql.append(f"INSERT INTO {self.table} ({','.join(sql_field)}) VALUES ({','.join(sql_values)});")
result = 0
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
for sql_item in sql:
cursor.execute(sql_item)
# 获取最新插入行的ID
self.__set_last_id(cursor.lastrowid)
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 插入条数
result = conn.total_changes
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
return result
def update(self,where,data):
"""
更新数据(1条或多条):
where 查询条件,为list格式
data 数据,dict格式
"""
sql_data = []
for field,value in data.items():
if isinstance(value,str):
value = f"'{value}'"
sql_data.append(f"{field}={value}")
sql = f"UPDATE {self.table} SET {','.join(sql_data)} WHERE {self.__build_where(where)};"
result = 0
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 更新条数
result = cursor.rowcount
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
return result
def delete(self,where):
"""
删除数据:
where 条件,dict格式
"""
sql = f"DELETE FROM {self.table} WHERE {self.__build_where(where)};"
result = 0
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 更新条数
result = cursor.rowcount
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
return result
def dict(self,where = None,key_field = None,value_field = None,sort = "id DESC"):
"""
查询多条数据:
where 查询条件,为list格式,字段:值
page 当前页
limit 每页条数
"""
if where == None or key_field == None or value_field == None:
self.__set_error("缺少参数")
return None
sql = f"SELECT {key_field},{value_field} FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort};"
rows = {}
result = None
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
cursor.execute(sql)
result = cursor.fetchall()
# 获取查询结果的列名称
columns = [description[0] for description in cursor.description]
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
if result != None:
# 遍历每条记录,打印列名和对应的值
for row in result:
rows[row[0]] = row[1]
return rows
def execute(self,sql):
"""
执行sql数据:
sql sql语句
"""
result = False
conn = self.__conn()
# 创建一个游标对象Cursor
cursor = conn.cursor()
# 执行SQL语句
try:
if isinstance(sql,list):
result = []
for sql_item in sql:
cursor.execute(sql_item)
else:
cursor.execute(sql)
# 提交
conn.commit()
except Exception as err:
conn.rollback()
self.__set_error(err)
finally:
# 关闭游标
cursor.close()
# 关闭Connection对象
conn.close()
# 返回数据
return result
def get_error(self):
"""
获取错误信息
"""
return self.__error
def __set_error(self,error):
"""
设置错误信息
"""
self.__error = error
return False
def __result(self,status,msg = None,data = None):
"""
返回数据:
"""
return {
"status":status,
"msg":msg,
"data":data
}
def __build_where(self,where = None,parent = 0):
"""
构建WHERE
示例:
[
["or",["field1","=","china"],["field2","LIKE","张三"]],
["field2","=","李四"],
["field3","is","null"],
["field4","between",[100,200]],
["field5","in",['张三',200]],
["field6","in","李四,王五"]
]
遍历where里面的item
item[0] 格式:字段名、OR
字段名:常规查询
OR:则表示OR查询,后面可能就会有item[1],item[2],item[3] ...,每项的格式则为 ["字段名","表达式","值"]
item[1] 格式:=、!=、>=、<=、LIKE、IS、BETWEEN、LIKE、IN、["字段名","表达式","值"]
IS:item[2]则为NULL 或 NOT NULL
BETWEEN:item[2]则为list,格式为["值1","值2"]
IN:则需判断item[2]类型是否为list,若是,则需转成字符串
["字段名","表达式","值"]:表示OR查询
其他:表达式
item[2] 格式:值、["字段名","表达式","值"]、NULL、NOT NULL
值:常规的值,可以是字符串、整型、浮点型等
NULL、NOT NULL:则说明item[1]为IS
["字段名","表达式","值"]:表示OR查询
"""
if where == None:
return False
sql = []
sql_child = []
for item in where:
i0 = item[0]
i1 = item[1].upper() if isinstance(item[1],str) else item[1]
i2 = item[2]
match i1:
case 'IS':
sql.append(f"{i0} {i1} {i2.upper()}")
case 'BETWEEN':
sql.append(f"{i0} {i1} {i2[0]} AND {i2[1]}")
case 'LIKE':
sql.append(f"{i0} {i1} '%{i2}%'")
case 'IN':
# 若IN里面有数字,则需要转成字符串
if isinstance(i2,str):
i2 = i2.split(",")
i2 = ",".join([str(a) if not isinstance(a,str) else "'"+a+"'" for a in i2])
sql.append(f"{i0} {i1} ({i2})")
case _:
# OR查询
if i0.upper() == 'OR':
sql_child = self.__build_where(item[1:],1)
else:
sql.append(f"{i0} {i1} '{i2}'")
if sql_child != []:
sql = sql + sql_child
return " AND ".join(sql) if parent==0 else sql
def __conn(self):
"""
链接数据库:
"""
return sqlite3.connect(self.DB_PATH)
栏目分类全部>
推荐文章
- mysql报错:SQLSTATE[HY000]: General error: 1881 Operation not allowed when innodb_forced_recovery > 0.
- python之js逆向爬虫实战
- 【linux版】tp5中使用GatewayWorker和Workerman
- 使用layer.photos图片放大
- composer指定php版本运行
- composer指定用户执行
- fastadmin如何给自定义按钮增加权限
- js常用数组函数
- 微信支付错误:openssl_x509_read(): supplied parameter cannot be coerced into an X509 certi
- Thinkphp5/TP5/PHP通过经纬度计算距离获取附近信息