接口自动化(2)

尽量降低Python语句调用数据库的次数~

综合

架构:

配置数据库

在项目下建立config包和utils包,每个包必须有__init__.py文件不然不能导包。

# 数据库信息配置
DB_CONFIG = {"host": "127.0.0.1",
             "user": "root",
             "password": "123456",
             "database": "jwdatabases",
             "port": 3306,
             "charset": "utf8"}

日志目录配置:

    def is_run_data(self,web):
        list_run=[]
        for case in self.load_all_case(web):
            if case['isdel']==1:
                list_run.append(case)
        return list_run

utils工具类包下mysqlutils调用settings:

自定义用例工具类:

现在已经有了一个通用的工具类,可以同过类的对象化调用(封装工具类API),自定义工具类的框架

#其他工具类
from utils.mysqlutil import MysqlUtil

gongju1=MysqlUtil() # gongju1 封装pymysql的API(导包然后对象化类)



#我是用例工具类
class RdTestCase:
    # 1.1加载所有用例
    def load_all_case(self):
        print("我要写代码了~~~")
    # 1.2加载需要执行用例
    def is_run_data(self):
        pass
    # 1.3加载用例配置
    def loadConfKey(self):
        pass
    # 1.4回写存放用例结果
    def updateResults(self):
        pass


config包下放settings文件,配置数据库连接信息。

# 导入PyMySQL库
import pymysql
# 导入数据库的配置信息
from config.settings import DB_CONFIG
# 设置数据库工具类的名称
class MysqlUtil:
    def __init__(self):
        # self.db = pymysql.connect(host="127.0.0.1", user="root", password="123456", database="jwtest1", port=3306,
        #                           charset="utf8")
        # 读取配置文件,初始化pymysql数据库连接
        self.db = pymysql.connect(**DB_CONFIG)
        # 创建数据库游标  返回字典类型的数据
        self.cursor = self.db.cursor(cursor=pymysql.cursors.DictCursor)
    # 获取单条数据
    def get_fetchone(self, sql):
        # 执行sql
        self.cursor.execute(sql)
        # 查询单条数据,结果返回
        return self.cursor.fetchone()
    # 获取多条数据
    def get_fetchall(self, sql):
        # 执行sql
        self.cursor.execute(sql)
        # 查询多条数据,结果返回
        return self.cursor.fetchall()
    # 执行更新类sql
    def sql_execute(self, sql):
        try:
            # db对象和指针对象同时存在
            if self.db and self.cursor:
                # 执行sql
                print("sql是",sql)
                self.cursor.execute(sql)
                # 提交执行sql到数据库,完成insert或者update相关命令操作,非查询时使用
                self.db.commit()
                print("sql执行成功~!")
        except Exception as e:
            # 出现异常时,数据库回滚
            self.db.rollback()
            # 返回结果为失败
            return False
    # 关闭对象,staticmethod静态方法,可以直接使用类名.静态方法。
    @staticmethod
    def close(self):
        # 判断游标对象是否存在
        if self.cursor is not None:
            # 存在则关闭指针
            self.cursor.close()
        # 判断数据库对象是否存在
        if self.db is not None:
            # 存在则关闭数据库对象
            self.db.close()

# 测试代码
if __name__ == '__main__':


    # 验证编写的方法

    mysql = MysqlUtil()
    res1=mysql.get_fetchone("select * from jwtest_case_list")
    print(res1)
    res2 = mysql.get_fetchall("select * from jwtest_case_list")
    print(res2)
    res3=mysql.sql_execute("insert into jwtest_result_record (case_id,result) values ('9999','测试通过');")
    print(res3)

第一个功能,拿出所有的用例:

……
def load_all_case(self):
        sql='select * from jwtest_case_list;'
        return gongju1.get_fetchall(sql)

为了更加通用可以参数化:

    def load_all_case(self,web):
        sql=f'select * from jwtest_case_list where web="{web}";'
        return gongju1.get_fetchall(sql)

第二个功能,判断是否执行用例:

        def is_run_data(self,web):
        list_run=[]
        for case in self.load_all_case(web):
            if case['isdel']==1:
                list_run.append(case)
        return list_run

Python提供了更加简化的方法:

return  [case for case in self.load_all_case(jwweb) if case['isdel']==1]

加载配置:(拿到对应地址)

    # 1.3加载用例配置
    def loadConfKey(self,web,key):
        sql=f'select * from jwtest_config where web="{web}" and key1="{key}"'
        return gongju1.get_fetchone(sql)['value']

这里的列表名key1不能写key会冲突,key是MySQL的关键字。

第四个功能,传参对数据库进行修改:

    def updateResults(self,response,is_pass,case_id):
        current_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        sql=f'insert into '\
            f'jwtest_result_record(case_id,times,response,result) '\
            f'values("{case_id}","{current_time}","{str(response)}","{str(is_pass)}")'
        return gongju1.sql_execute(sql)

这个功能用来保存用例的结果。

这样,RdTestCase工具类做完了。

日志工具类

from config.settings import get_log_path
import logging
import time
import os


class LogUtil:
    def __init__(self):
        # 初始化日志对象,设置日志名称
        self.logger = logging.getLogger("jwlogger")
        # 设置总的日志级别开关
        self.logger.setLevel(logging.DEBUG)
        # 避免日志重复
        if not self.logger.handlers:
########################1.设置在文件中输出###################
            # 定义日志名称
            self.log_name = '{}.log'.format(time.strftime("%Y_%m_%d", time.localtime()))
            # 定义日志路径及文件名称
            self.log_path_file = os.path.join(get_log_path(), self.log_name)
            # 定义文件处理handler
            fh = logging.FileHandler(self.log_path_file, encoding='utf-8', mode='a')
            # 设置文件处理handler的日志级别
            fh.setLevel(logging.DEBUG)
            # 日志格式变量
            formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
            # 设置打印格式
            fh.setFormatter(formatter)
            # 添加handler
            self.logger.addHandler(fh)
            # 关闭handler
            fh.close()
##################################2.设置在控制台输出########
            # 定义控制台输出流handler
            fh_stream = logging.StreamHandler()
            # 控制台输出日志级别
            fh_stream.setLevel(logging.DEBUG)
            # 设置打印格式
            fh_stream.setFormatter(formatter)
            # 添加handler
            self.logger.addHandler(fh_stream)

    def log(self):
        # 返回定义好的logger对象,对外直接使用log函数即可
        return self.logger


logger = LogUtil().log()
# 测试代码
if __name__ == '__main__':
    logger.info('jwtest11')
    logger.info('jwtest22')
    logger.info('jwtest33')


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注