您的当前位置:首页>全部文章>文章详情

python数据库sqlite3操作类

发表于:2024-07-02 12:00:40浏览:127次TAG: #python #sqlite3

引言

这是数据库sqlite3操作类


import os
import sqlite3
import math

class Basic(object):
    """
    基础模型
    """
    PREFIX = "py_"
    DB_PATH = os.path.join(os.getcwd(),'database.db')

    def __init__(self,table,sql = None) -> None:
        self.table = f"{self.PREFIX}{table}"
        self.__error = None
        self.__last_id = []
        self.execute(sql)

    def select(self,where = None,page = None,limit = None,sort = "id DESC",field = "*"):
        """
        查询多条数据:
        where 查询条件,为list格式,字段:值
        page  当前页
        limit 每页条数
        """
        if where == None:
            self.__set_error("缺少查询条件")
            return None
        # 分页
        sql_page = ""
        if page != None and limit !=None:
            offset = (page - 1) * limit
            sql_page = f"LIMIT {limit} OFFSET {offset}"

        sql = f"SELECT {field} FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort} {sql_page};"
        rows = []
        result = None
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            result = cursor.fetchall()
            # 获取查询结果的列名称
            columns = [description[0] for description in cursor.description]
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据

        if result != None:
            # 遍历每条记录,打印列名和对应的值
            for row in result:
                # 将字段名称和数据对应起来
                rows.append(dict(zip(columns, row)))
        count = self.count(where)
        total_page = math.ceil(count/limit) if isinstance(limit,int) and limit > 0 else count
        return {"count":count,"rows":rows,"total_page":total_page,"page":page,"limit":limit}

    def count(self,where):
        """
        查询条数:
        where 查询条件,为list格式,字段:值
        """
        if where == None:
            self.__set_error("缺少查询条件")
            return None
        sql = f"SELECT COUNT(*) FROM {self.table} WHERE {self.__build_where(where)};"
        result = 0
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            result = cursor.fetchone()[0]
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        return result

    def sum(self,where,field):
        """
        求和:
        where 查询条件,为list格式,字段:值
        field 求和的字段
        """
        if where == None:
            self.__set_error("缺少查询条件")
            return None
        sql = f"SELECT SUM({field}) FROM {self.table} WHERE {self.__build_where(where)};"
        result = 0
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            result = cursor.fetchone()[0]
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        return 0 if result == None else result

    def value(self,where = None,field = None):
        """
        查询1一个字段的值:
        where 查询条件,为list格式
        field 字段名
        """
        row = self.find(where)
        if row == None or field == None:
            return None
        return row.get(field)

    def find(self,where = None,sort = "id DESC"):
        """
        查询1条数据:
        where 查询条件,为list格式
        """
        if where == None:
            self.__set_error("缺少查询条件")
            return None
        sql = f"SELECT * FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort} LIMIT 1;"
        result = None
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            result = cursor.fetchone()
            # 获取查询结果的列名称
            columns = [description[0] for description in cursor.description]
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据
        if result == None:
            return result
        return dict(zip(columns, result))

    def get_last_id(self):
        """
        获取新增数据的ID
        """
        if len(self.__last_id) == 0:
            return None
        elif len(self.__last_id) == 1:
            return self.__last_id[0]
        else:
            return self.__last_id

    def __set_last_id(self,id):
        """
        设置新增数据的ID
        """
        self.__last_id.append(id)

    def insert(self,data):
        """
        插入数据(1条或多条):
        data 数据,list/dict格式
        """
        if isinstance(data,dict):
            data = [data]
        sql = []
        for row in data:
            sql_field = []
            sql_values = []
            for field in row:
                sql_field.append(field)
                val = row.get(field)
                if isinstance(val,(str,int,float)):
                    sql_values.append(f"'{val}'")
                else:
                    sql_values.append(val)
            sql.append(f"INSERT INTO {self.table} ({','.join(sql_field)}) VALUES ({','.join(sql_values)});")
        result = 0
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            for sql_item in sql:
                cursor.execute(sql_item)
                # 获取最新插入行的ID
                self.__set_last_id(cursor.lastrowid)
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 插入条数
            result = conn.total_changes
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        return result

    def update(self,where,data):
        """
        更新数据(1条或多条):
        where 查询条件,为list格式
        data 数据,dict格式
        """
        sql_data = []
        for field,value in data.items():
            if isinstance(value,str):
                value = f"'{value}'"
            sql_data.append(f"{field}={value}")
        sql = f"UPDATE {self.table} SET {','.join(sql_data)} WHERE {self.__build_where(where)};"
        result = 0
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 更新条数
            result = cursor.rowcount
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据
        return result

    def delete(self,where):
        """
        删除数据:
        where 条件,dict格式
        """
        sql = f"DELETE FROM {self.table} WHERE {self.__build_where(where)};"
        result = 0
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 更新条数
            result = cursor.rowcount
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据
        return result

    def dict(self,where = None,key_field = None,value_field = None,sort = "id DESC"):
        """
        查询多条数据:
        where 查询条件,为list格式,字段:值
        page  当前页
        limit 每页条数
        """
        if where == None or key_field == None or value_field == None:
            self.__set_error("缺少参数")
            return None
        sql = f"SELECT {key_field},{value_field} FROM {self.table} WHERE {self.__build_where(where)} ORDER BY {sort};"
        rows = {}
        result = None
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            cursor.execute(sql)
            result = cursor.fetchall()
            # 获取查询结果的列名称
            columns = [description[0] for description in cursor.description]
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据
        if result != None:
            # 遍历每条记录,打印列名和对应的值
            for row in result:
                rows[row[0]] = row[1]
        return rows

    def execute(self,sql):
        """
        执行sql数据:
        sql sql语句
        """
        result = False
        conn = self.__conn()
        # 创建一个游标对象Cursor
        cursor = conn.cursor()
        # 执行SQL语句
        try:
            if isinstance(sql,list):
                result = []
                for sql_item in sql:
                    cursor.execute(sql_item)
            else:
                cursor.execute(sql)
            # 提交
            conn.commit()
        except Exception as err:
            conn.rollback()
            self.__set_error(err)
        finally:
            # 关闭游标
            cursor.close()
            # 关闭Connection对象
            conn.close()
        # 返回数据
        return result

    def get_error(self):
        """
        获取错误信息
        """
        return self.__error

    def __set_error(self,error):
        """
        设置错误信息
        """
        self.__error = error
        return False

    def __result(self,status,msg = None,data = None):
        """
        返回数据:
        """
        return {
                "status":status,
                "msg":msg,
                "data":data
            }

    def __build_where(self,where = None,parent = 0):
        """
        构建WHERE
        示例:
        [
            ["or",["field1","=","china"],["field2","LIKE","张三"]],
            ["field2","=","李四"],
            ["field3","is","null"],
            ["field4","between",[100,200]],
            ["field5","in",['张三',200]],
            ["field6","in","李四,王五"]
        ]
        遍历where里面的item
        item[0] 格式:字段名、OR
                字段名:常规查询
                OR:则表示OR查询,后面可能就会有item[1],item[2],item[3] ...,每项的格式则为 ["字段名","表达式","值"]
        item[1] 格式:=、!=、>=、<=、LIKE、IS、BETWEEN、LIKE、IN、["字段名","表达式","值"]
                IS:item[2]则为NULL 或 NOT NULL
                BETWEEN:item[2]则为list,格式为["值1","值2"]
                IN:则需判断item[2]类型是否为list,若是,则需转成字符串
                ["字段名","表达式","值"]:表示OR查询
                其他:表达式
        item[2] 格式:值、["字段名","表达式","值"]、NULL、NOT NULL
                值:常规的值,可以是字符串、整型、浮点型等
                NULL、NOT NULL:则说明item[1]为IS
                ["字段名","表达式","值"]:表示OR查询
        """
        if where == None:
            return False
        sql = []
        sql_child = []
        for item in where:
            i0 = item[0]
            i1 = item[1].upper() if isinstance(item[1],str) else item[1]
            i2 = item[2]
            match i1:
                case 'IS':
                    sql.append(f"{i0} {i1} {i2.upper()}")
                case 'BETWEEN':
                    sql.append(f"{i0} {i1} {i2[0]} AND {i2[1]}")
                case 'LIKE':
                    sql.append(f"{i0} {i1} '%{i2}%'")
                case 'IN':
                    # 若IN里面有数字,则需要转成字符串
                    if isinstance(i2,str):
                        i2 = i2.split(",")
                    i2 = ",".join([str(a) if not isinstance(a,str) else "'"+a+"'" for a in i2])
                    sql.append(f"{i0} {i1} ({i2})")
                case _:
                    # OR查询
                    if i0.upper() == 'OR':
                        sql_child = self.__build_where(item[1:],1)
                    else:
                        sql.append(f"{i0} {i1} '{i2}'")
        if sql_child != []:
            sql = sql + sql_child
        return " AND ".join(sql) if parent==0 else sql

    def __conn(self):
        """
        链接数据库:
        """
        return sqlite3.connect(self.DB_PATH)