尽量降低Python语句调用数据库的次数~
综合
架构:

配置数据库
在项目下建立config包和utils包,每个包必须有__init__.py文件不然不能导包。
# 数据库信息配置
DB_CONFIG = {"host": "127.0.0.1",
"user": "root",
"password": "123456",
"database": "jwdatabases",
"port": 3306,
"charset": "utf8"}日志目录配置:
def is_run_data(self,web):
list_run=[]
for case in self.load_all_case(web):
if case['isdel']==1:
list_run.append(case)
return list_runutils工具类包下mysqlutils调用settings:
自定义用例工具类:
现在已经有了一个通用的工具类,可以同过类的对象化调用(封装工具类API),自定义工具类的框架:
#其他工具类
from utils.mysqlutil import MysqlUtil
gongju1=MysqlUtil() # gongju1 封装pymysql的API(导包然后对象化类)
#我是用例工具类
class RdTestCase:
# 1.1加载所有用例
def load_all_case(self):
print("我要写代码了~~~")
# 1.2加载需要执行用例
def is_run_data(self):
pass
# 1.3加载用例配置
def loadConfKey(self):
pass
# 1.4回写存放用例结果
def updateResults(self):
pass
config包下放settings文件,配置数据库连接信息。
# 导入PyMySQL库
import pymysql
# 导入数据库的配置信息
from config.settings import DB_CONFIG
# 设置数据库工具类的名称
class MysqlUtil:
def __init__(self):
# self.db = pymysql.connect(host="127.0.0.1", user="root", password="123456", database="jwtest1", port=3306,
# charset="utf8")
# 读取配置文件,初始化pymysql数据库连接
self.db = pymysql.connect(**DB_CONFIG)
# 创建数据库游标 返回字典类型的数据
self.cursor = self.db.cursor(cursor=pymysql.cursors.DictCursor)
# 获取单条数据
def get_fetchone(self, sql):
# 执行sql
self.cursor.execute(sql)
# 查询单条数据,结果返回
return self.cursor.fetchone()
# 获取多条数据
def get_fetchall(self, sql):
# 执行sql
self.cursor.execute(sql)
# 查询多条数据,结果返回
return self.cursor.fetchall()
# 执行更新类sql
def sql_execute(self, sql):
try:
# db对象和指针对象同时存在
if self.db and self.cursor:
# 执行sql
print("sql是",sql)
self.cursor.execute(sql)
# 提交执行sql到数据库,完成insert或者update相关命令操作,非查询时使用
self.db.commit()
print("sql执行成功~!")
except Exception as e:
# 出现异常时,数据库回滚
self.db.rollback()
# 返回结果为失败
return False
# 关闭对象,staticmethod静态方法,可以直接使用类名.静态方法。
@staticmethod
def close(self):
# 判断游标对象是否存在
if self.cursor is not None:
# 存在则关闭指针
self.cursor.close()
# 判断数据库对象是否存在
if self.db is not None:
# 存在则关闭数据库对象
self.db.close()
# 测试代码
if __name__ == '__main__':
# 验证编写的方法
mysql = MysqlUtil()
res1=mysql.get_fetchone("select * from jwtest_case_list")
print(res1)
res2 = mysql.get_fetchall("select * from jwtest_case_list")
print(res2)
res3=mysql.sql_execute("insert into jwtest_result_record (case_id,result) values ('9999','测试通过');")
print(res3)第一个功能,拿出所有的用例:
……
def load_all_case(self):
sql='select * from jwtest_case_list;'
return gongju1.get_fetchall(sql)为了更加通用可以参数化:
def load_all_case(self,web):
sql=f'select * from jwtest_case_list where web="{web}";'
return gongju1.get_fetchall(sql)第二个功能,判断是否执行用例:
def is_run_data(self,web):
list_run=[]
for case in self.load_all_case(web):
if case['isdel']==1:
list_run.append(case)
return list_runPython提供了更加简化的方法:
return [case for case in self.load_all_case(jwweb) if case['isdel']==1]加载配置:(拿到对应地址)
# 1.3加载用例配置
def loadConfKey(self,web,key):
sql=f'select * from jwtest_config where web="{web}" and key1="{key}"'
return gongju1.get_fetchone(sql)['value']这里的列表名key1不能写key会冲突,key是MySQL的关键字。
第四个功能,传参对数据库进行修改:
def updateResults(self,response,is_pass,case_id):
current_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
sql=f'insert into '\
f'jwtest_result_record(case_id,times,response,result) '\
f'values("{case_id}","{current_time}","{str(response)}","{str(is_pass)}")'
return gongju1.sql_execute(sql)这个功能用来保存用例的结果。
这样,RdTestCase工具类做完了。
日志工具类
from config.settings import get_log_path
import logging
import time
import os
class LogUtil:
def __init__(self):
# 初始化日志对象,设置日志名称
self.logger = logging.getLogger("jwlogger")
# 设置总的日志级别开关
self.logger.setLevel(logging.DEBUG)
# 避免日志重复
if not self.logger.handlers:
########################1.设置在文件中输出###################
# 定义日志名称
self.log_name = '{}.log'.format(time.strftime("%Y_%m_%d", time.localtime()))
# 定义日志路径及文件名称
self.log_path_file = os.path.join(get_log_path(), self.log_name)
# 定义文件处理handler
fh = logging.FileHandler(self.log_path_file, encoding='utf-8', mode='a')
# 设置文件处理handler的日志级别
fh.setLevel(logging.DEBUG)
# 日志格式变量
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
# 设置打印格式
fh.setFormatter(formatter)
# 添加handler
self.logger.addHandler(fh)
# 关闭handler
fh.close()
##################################2.设置在控制台输出########
# 定义控制台输出流handler
fh_stream = logging.StreamHandler()
# 控制台输出日志级别
fh_stream.setLevel(logging.DEBUG)
# 设置打印格式
fh_stream.setFormatter(formatter)
# 添加handler
self.logger.addHandler(fh_stream)
def log(self):
# 返回定义好的logger对象,对外直接使用log函数即可
return self.logger
logger = LogUtil().log()
# 测试代码
if __name__ == '__main__':
logger.info('jwtest11')
logger.info('jwtest22')
logger.info('jwtest33')

发表回复