当前位置:网站首页 / 个人 / 正文

[代码片段]python仿照ThinkphpDB类 链式操作[未完成]

时间:2019年04月15日 | 作者 : admin | 分类 : 个人 | 浏览: 762次 | 评论 0

一时无聊写的   还没完成。。。。睡觉了   

# -*- coding: UTF-8 -*-
import MySQLdb.cursors
import MySQLdb

import logging
import sys

reload(sys)
sys.setdefaultencoding('utf-8')

class Db:
    __prefix = ''
    __conn = None
    __sql = ''
    __sql_error = False
    __sql_error_info = ''
    '''
    是否多次调用
    '''
    __c_where = False

    '''
    各种查询语法
    '''
    __is_select = 0
    __is_delete = 0
    __is_update = 0
    __is_insert = 0
    __is_find = 0
    __is_where = 0
    __dumpsql = 0

    '''
    各种拼接数据
    '''
    __table = ''
    __field = '*'
    __where = ''

    def __init__(self, db_config):
        logging.basicConfig(
            format="%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s"
        )
        default_config = {
            'db_host': 'localhost',
            'db_port': '3306',
            'db_user': '',
            'db_pass': '',
            'db_name': '',
            'db_prefix': ''
        }
        config = dict(default_config, **db_config)
        try:
            self.__conn = MySQLdb.connect(
                host=config['db_host'], port=int(config['db_port']),
                user=config['db_user'], passwd=config['db_pass'],
                db=config['db_name'], charset='utf8',
                cursorclass=MySQLdb.cursors.DictCursor
            )
        except MySQLdb.OperationalError, e:
            logging.critical('DatabsesError:' + str(e.args))
            exit()
        self.__prefix = config['db_prefix']

    def __del__(self):
        if self.__conn:
            self.__conn.close()

    def query(self, sql):
        try:
            cursor = self.__conn.cursor()
        except self.__conn.InternalError, e:
            logging.error('QueryError:' + str(e.args))
            return False

        try:
            cursor.execute(sql)
        except self.__conn.Error, e:
            logging.error('QueryError:' + str(e.args))
            return False
        '''获取所有数据'''
        res = cursor.fetchall()
        if not res:
            return None
        return res

    @staticmethod
    def __field_handle(string):
        return '`' + string + '`'

    def table(self, table):
        self.__table = table
        return self

    def name(self, table):
        self.__table = self.__prefix + table
        return self

    def field(self, field):
        db_field = ''
        i = str(field)
        i = i.split(',')
        for value in i:
            if '.' in value:
                j = ''
                value1 = value.split('.', 1)
                for v in value1:
                    j += self.__field_handle(v) + '.'
                db_field += j[:-1] + ','
            else:
                db_field += self.__field_handle(value) + ','
        self.__field = db_field[:-1]
        return self

    def where(self, condition, symbol='', value=''):
        self.__is_where = True
        sql = ''
        if not value and symbol:
            '''使用 = 作为操作符'''
            if isinstance(symbol, int):
                sql = '%s = %s' % (condition, str(symbol))
            elif isinstance(symbol, str):
                sql = '%s = \'%s\'' % (condition, symbol)
            else:
                self.__sql_error = True
                self.__sql_error_info = 'where中value类型错误'
        else:
            '''使用自定义运算符'''
            if symbol:
                if isinstance(value, int):
                    sql = '%s %s %s' % (condition, symbol, str(value))
                elif isinstance(value, str):
                    sql = '%s %s \'%s\'' % (condition, symbol, str(value))
            else:
                self.__sql_error = True
                self.__sql_error_info = 'where中操作符类型错误'

        if self.__c_where:
            '''多次调用'''
            self.__where = '(' + self.__where + ')'
            self.__where += ' AND (' + sql + ')'
        else:
            '''第一次调用'''
            self.__c_where = True
            self.__where = sql
        return self

    def dump_sql(self, isdump):
        if isdump:
            self.__dumpsql = 1
        else:
            self.__dumpsql = 0
        return self

    def select(self):
        self.__sql = 'SELECT ' + self.__field + ' from ' + self.__table
        if self.__is_where:
            self.__sql += ' where ' + self.__where

        if self.__dumpsql:
            return self.__sql
        return self.query(self.__sql)
aaa = {
    'db_user': 'root',
    'db_pass': 'root',
    'db_name': 'web'
}
a = Db(aaa)
data = a.table('ld_user').where('isadmin', '0').field('id,user').dump_sql(False).select()
print(data[0])


推荐您阅读更多有关于“mysqlpython,”的文章

猜你喜欢

网站分类
搜索
友情链接
Top