xmysql是一个mysql操作映射为api的js库,方便自动化测试数据构建、读取提供方便,但是使用xmysql需要学习新的语法,何不如动手写映射代码直接传递sql语句呢?

可能很多公司存在跟我们一样的情况:mysql服务是关闭了远程连接的,如果需要链接到数据库,需要通过SSH连接到服务器,再通过服务器操作连接到服务器本地的SQL。
而这一情况对我们测试人员很不友好,在校验数据的时候,要么只能手工去查,要么需要编写SSH->SQL的中转插件。
而部门的同事提议并部署了xmysql在测试服,我一开始认为该方案非常好,原理是通过http携带参数,通过xmysql去进行数据库擦操作,但是问题在于两点:
1、使用xmysql需要学习新的语法,如果还熟练使用,还要学不少语法
2、不知道为什么,我调用不了xmysql的dynamic方法,死活用不了。这个方法才是精髓,因为可以无需学习成本,直接传sql语句给xmysql去操作数据库

基于以上问题,想着要不自己写一个吧,根据xmyql把http映射成sql操作的思路,说干就干
实现思路
1、建立http通道
2、接收Body里的sql语句
3、连接sql数据库,传递sql语句执行
4、对结果集处理成json,返回给客户端

根据这个思路写了ymysql后,这个ymysql脚本是通用了,只要你用的是mysql就能用(源码在文末)

优点

不需要像xmysql一样学习新的语法,你平常sql怎么写,现在就怎么写

只用于本地调试?

ymysql.py脚本基于python3,依赖bottle、DBUtils和pyMysql,安装好在CMD或IDE中传入参数启动脚本

python3 ymysql.py -u root -p 数据库密码 -d 数据库名 -id 鉴权字符串

浏览器直接get请求:http://localhost:8999/?id=123
如果返回“请使用post请求!”说明脚本启动成功(get仅用启动时测试,具体使用见下面)

用户端的请求方式

1、必须是POST请求
2、请求连接中必须有id这个参数(这个怎么来下面有讲)
3、body不要选择form-data等格式,选择RAW
注意:
1、body里面直接写select 语句即可,不需要什么括号,双引号等,你在mysql管理工具怎么查询的,你在这里就怎么写
2、这个id怎么来?这个id是动态的,每次启用这个查询程序,都需要设定这个参数,写什么取决于启动该程序的人。这么设定是防止别人拿到链接后从此一直可以查询数据。比如我一开始开放给某个需要的人员访问。但是我不希望他一直能用,那么我只需要重新启动下程序,修改id,即可让对方没办法查询数据。不做成加密的形式,是基于简便考虑(本来就是为了简便才写的ymysql)

示例:查询v2_order的用户id为229878的订单号和姓名和收货地址

postman示例

[POST] http://test-api.rrzuji.com/ymysql/?id=abc

postman示例.png

jmeter示例

jmeter请求示例.png

jmeter响应示例.png

注意事项

1、我没有对请求的结果进行分页,建议自己写sql时每次都自己分页,可使用limit
2、不要写select * from这种,查询会变得缓慢,如果你写了,post后程序会提醒你
3、仅支持select操作,因为如果别人知道了链接,那岂不是可以直接删库?为此,限制了只能进行select,其他关键字不允许

服务端使用方式

启动方式

把脚本丢到服务器上面
如果你只是想试一下,则直接调用python3执行,这种方式你账户登录进程就会被杀掉:

python3 ymysql.py -u root -p 数据库密码 -d 数据库名 -id 鉴权字符串

如果你想让ymysql挂后台运行则执行(退出SSH登录不影响脚本运行):

nohup python3 -u ymysql.py -u root -p 数据库密码-d 数据库名称 -id id

最好的启动方式:后台运行+写运行日志,报错了就去日志文件查(推荐):
nohup python3 -u ymysql.py -u root -p 数据库密码-d 数据库名称 -id id 1> ./log/ymysql_info.log 2>&1 &

启动成功的表现:如下图,会出现监听提醒。脚本默认运行在8999端口

ymysql启动.png

注意

1、命令中必须加-u,因为python默认对输出有缓冲,如果不加-u,等很久都没法在日志中看到输出
2、这里把标准输出和错误都写在一个文件ymysql_info.log上,如果后续需要拆分标准输出和错误日志,可以分开:1./log/ymysql_error.log 2>./log/ymysql_info.log &
3、为了防止日志过大,命令中使用了>来确保每次启动清空日志,如果需要每次启动追加日志,则把>改为>>即可:1>> ./log/ymysql_info.log 2>>&1 &

日志查询

日志位置:/log/ymysql_info.log
如果你需要实时查看日志,则使用taif命令来帮助在控制台显示新写入的内容
taif -f ymysql.log

帮助

我写了比较详细的帮助,服务端执行:pyhton3 ymysql.py -h,或者你输入任意错误的参数,都会触发

ymysql帮助.png

服务端部署

如果你是本地调试就不用了,这里用的nginx转发的,所以你请求的是80端口,而ymysql脚本监听的是8999端口

环境需求

1、python3
2、安装Bottle:pip3 install Bottle
3、安装DBUtils:pip3 install DBUtils

配置Nginx

/nginx/conf/vhost/ 对conf文件进行配置,
按一下i进入编辑模式。在server{}里面新增。如图所示

nginx.png

最后

这样一来,因为所有的测试工具都会支持http协议,所以可以很简单的在postman或jmeter中从数据库取信息校验。算是一个通用的的解决方案

附上代码(烂代码,不要介意),源码下载

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# create by lonelylizard 2020-05-09
from bottle import Bottle,run,get,post,request,error,response,template
import pymysql
import re
import json
import collections
from copy import deepcopy
import argparse
from DBUtils.PooledDB import PooledDB

description = '''
        通用帮助:

        前端请求步骤示例:
        1、[POST] http://sumver.cn:8999/ymyql?id=sumver
        2、Body选择raw格式,直接填写sql语句,无需任何额外符号修饰
        正确示例:select id,user,pwd from user where id between 1 AND 5
        ==============================================================
        '''
epilog = '''
        ymysql [参数]
        ===================================================
        参数列表:
        -u    user 数据库用户名(必填)
        -p    password 数据库密码(必填)
        -d    database 指定数据库名(必填)
        -s    sercet 鉴权参数(必填),该参数将要求在请求时携带
        示例:ymysql -u root -p 1234 -d zulin -s sumver
        ===================================================
        '''
#args,参数列表
parser = argparse.ArgumentParser(description,epilog)
parser.add_argument("-u","-uname",required=True)
parser.add_argument("-p","-password",required=True)
parser.add_argument("-d","-database",required=True)
parser.add_argument("-id",required=True)

args = parser.parse_args()

#数据库连接配置
__config = {
    "host":"localhost",
    "port":3306,
    "user":args.u,
    "password":args.p,
    "database":args.d,
    "charset":'utf8'
}
#配置连接池,加快查询速度
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,
    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。
    **__config
)

# conn  = pymysql.connect(host='localhost',user='root',password='xxx',database='xxx',charset='utf8',)

app = Bottle()
nameArr = []#存放需要返回的字段数组

#获取到select语句中的每个字段,返回数组
def getNames(sql):
    nameArr.clear()
    rule = re.compile("select ([\\s\\S]*) from")
    namesArr = rule.findall(sql)[0]
    for a in namesArr.split(","):
        nameArr.append(a.strip())
        # nameArr.append(a)
        # print(nameArr)
    # print("id数组:",nameArr)



def query(sql):
    try:
       getNames(sql)
    except Exception as e:
        return '<p>前端传输的参数不正确!<br>为了安全起见,仅支持select操作<br>前端请求步骤示例:<br>1、[POST] http://sumver.cn:8999/ymsql?id=sumver<br>2、Body选择raw格式,直接填写sql语句,无需任何额外符号修饰<br>正确示例:select id,user,pwd from user where id between 1 AND 5</p>'
    else:
        if nameArr[0]=="*":
            return "请不要使用select *,你应当明确你要取的字段"
        else:
            all_data = collections.OrderedDict()
            mdata = collections.OrderedDict()
            num = 1
            # conn = getConnect()
            conn = POOL.connection()
            cursor = conn.cursor()
            try:
                cursor.execute(sql)
            except Exception as e:
                print("连接数据库或查询语句出现错误:",e)
            cursor.close()
            conn.close()
            result = cursor.fetchall()#取出结果集
            all_data["count"]=len(result)
            #读取数据库返回结果,拼接成字典返回
            for item in result:
                mdata.clear()#每次都清空mdata
                i = 0
                for item2 in item:
                    mdata[nameArr[i]]=item2
                    # print(mdata[nameArr[i]])
                    i+=1
                all_data[str(num)]=mdata.copy() #这里是坑,如果不是深拷贝,输出的结果会被最后一个mdata覆盖
                # print("all_data:",all_data)
                num+=1
            # print("mdata:",mdata)
            return all_data

@app.route("/",method="GET")
def hello():
    return "请使用post请求!"

@app.route("/",method="POST")
def do_hello():
    if request.params.get("id") != args.id :
        return "鉴权参数不对"
    else:
        sql = request.body.read().decode('utf-8')#read读到的是字节流,转化为Str才处理
        st = query(sql)
        if type(st).__name__=="OrderedDict":
            return json.dumps(st,sort_keys=False)#返回json,不要排序,否则json里的键值对会根据类型、大小进行排序
        else:
            return st

run(app,host='0.0.0.0',port=8999,debug=True,reloader=False)