收藏文章 楼主

Websocket协议压测记录

版块:网站建设   类型:普通   作者:小绿叶技术博客   查看:1183   回复:0   获赞:8   时间:2020-09-13 01:57:50

Websocket协议压测记录


背景:


    公司的行情系统是采用的websocket协议,有请求和订阅两种方式向服务器申请最新行情信息。请求方式是一次的,订阅方式是建立连接后,服务器定时向客户端推送行情信息。


初步测试方案:


因考虑到websocket是双工通讯,是长连接,并且本次压测的性能指标是系统能建立的最大连接数,并且是建立连接后服务器能持续向客户端推送行情信息。


基于以上原因考虑用python采用多线程建立连接,为了验证能否收到推送的信息,把返回的行情信息保存到文本文件中。Python脚本如下:


 


import websocket


import time


import threading


import gzip


#import json


#from threadpool import ThreadPool, makeRequests


#from websocket import create_connection


 


SERVER_URL = "ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws"


#SERVER_URL = "wss://i.cg.net/wi/ws"


#SERVER_URL = "wss://www.exshell.com/r1/main/ws"


def on_message(ws, message):


    print(message)


 


def on_error(ws, error):


    print(error)


 


def on_close(ws):


    print("### closed ###")


 


def on_open(ws):


    def send_trhead():


 


        send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


        #send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'


        while True:


            #time.sleep(5)


            #ws.send(json.dumps(send_info))


            ws.send(send_info)


            while (1):


                compressData = ws.recv()


                result = gzip.decompress(compressData).decode('utf-8')


                if result[:7] == '{"ping"':


                    ts = result[8:21]


                    pong = '{"pong":' + ts + '}'


                    ws.send(pong)


                    ws.send(send_info)


                else:


                    #print(result)


                    with open('./test_result.txt', 'a') as f:


                        f.write(threading.currentThread().name+'\n')


                        f.write(result+'\n')


 


    t = threading.Thread(target=send_trhead)


    t.start()


    print(threading.currentThread().name)


def on_start(a):


 


    # time.sleep(2)


    # websocket.enableTrace(True)


    # ws = websocket.WebSocketApp(SERVER_URL,


    #                             on_message=on_message,


    #                             on_error=on_error,


    #                             on_close=on_close)


    # ws.on_open = on_open


    # ws.run_forever()


    #print(a[2])


    try:


        ws = websocket.create_connection(SERVER_URL)


        on_open(ws)


    except Exception as e:


        print('error is :',e)


        print('connect ws error,retry...')


        time.sleep(5)


if __name__ == "__main__":


 


    # pool = ThreadPool(3)


    # test = list()


    # for ir in range(3):


    #     test.append(ir)


    #


    # requests = makeRequests(on_start, test)


    # [pool.putRequest(req) for req in requests]


    # pool.wait()


    # # #on_start(1)


 


    for ir in range(20):


        on_start(1)


        time.sleep(0.1)


 


初步测试结果:


    在压测的过程中,发现连接数达到一定程度(单机1400连接),连接就断掉了,监控发现压力机内存基本消耗光了,因建立连接,并接收返回的信息,随着连接数增加,内存消耗大,只能断开连接,释放内存。


 


调整测试方案:


    和架构、开发讨论后,准备在websocket客户端采用AIO异步通讯方式增大压力,因当时是考虑到长连接未考虑这种方式,查询资料,发现websocket服务端可以采用AIO异步通讯方式,在websocket客户端尝试一下,采用locust + python的方式,也查找了一些资料,发现方案可行。


    Locust是一款可扩展的,分布式的,性能测试的,开源的,用Python编写的性能测试工具。对于测试HTTP协议的接口是比较方便的,但是它也支持测试别的协议的接口,不过需要重写Locust类。脚本如下:


 


from locust import Locust, events, task, TaskSet


import websocket


import time


import gzip


 


class WebSocketClient():


     def __init__(self, host, port):


         self.host = host


         self.port = port


 


class WebSocketLocust(Locust):


     def __init__(self, *args, **kwargs):


         self.client = WebSocketClient("172.31.15.85", 9503)


 


class UserBehavior(TaskSet):


 


    ws = websocket.WebSocket()


     #self.ws.connect("ws://10.98.64.103:8807")


     ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")


 


     @task(1)


     def buy(self):


         try:


             start_time = time.time()


 


             #self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')


            #result = self.ws.recv()


 


             send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


             # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'


             while True:


                 # time.sleep(5)


                # ws.send(json.dumps(send_info))


                 ws.send(send_info)


                 while (1):


                    compressData = ws.recv()


                    result = gzip.decompress(compressData).decode('utf-8')


                     if result[:7] == '{"ping"':


                        ts = result[8:21]


                         pong = '{"pong":' + ts + '}'


                         ws.send(pong)


                        ws.send(send_info)


                     else:


                         # print(result)


                        with open('./test_result.txt', 'a') as f:


                             #f.write(threading.currentThread().name + '\n')


                             f.write(result + '\n')


         except Exception as e:


             print("error is:",e)


 


class ApiUser(WebSocketLocust):


    task_set = UserBehavior


     min_wait = 100


     max_wait = 200


 


 


 


用命令执行脚本:


Locust -f websocket_client_locust.py –-no-web -c 1 -r 1 -t 60s


单个用户执行成功,并能生成文件。但多个用户执行的时候就报错,报错信息如下:This socket is already used by another greenlet: <bound method Waiter.switch of <gevent.hub.Waiter object at 0x7f01f0594900>>


错误原因说的是socke正在被使用,但是我的代码中是新的socket,简单分析了一下,应该不会出现问题,但是我的socek的使用部分是一个全局的client,然后程序运行的时候出现了上述错误.仔细推测我找出了原因:


geven是个协程库,那么多个协程共用一个socek的时候就会出现上述错误了,于是我把socket改成了局部的,问题解决.


修改前:


 


 


 


 


 


修改后:


 


 


 


 


修改后代码:


from locust import Locust, events, task, TaskSet


import websocket


import time


import gzip


 


class WebSocketClient():


     def __init__(self, host):


         self.host = host


         #self.port = port


 


class WebSocketLocust(Locust):


     def __init__(self, *args, **kwargs):


         self.client = WebSocketClient("172.31.15.85")


 


class UserBehavior(TaskSet):


 


     # ws = websocket.WebSocket()


    # #self.ws.connect("ws://10.98.64.103:8807")


    # ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")


 


     @task(1)


     def buy(self):


         try:


            ws = websocket.WebSocket()


             # self.ws.connect("ws://10.98.64.103:8807")


             ws.connect("ws://pro-web-new.devtest.exshell-dev.com/r1/main/ws")


 


             start_time = time.time()


 


             #self.ws.send('{"url":"/buy","data":{"id":"123","issue":"20170822","doubled_num":2}}')


            #result = self.ws.recv()


 


            send_info = '{"sub": "market.ethusdt.kline.1min","id": "id10"}'


             # send_info = '{"event":"subscribe", "channel":"btc_usdt.deep"}'


             while True:


                 # time.sleep(5)


                # ws.send(json.dumps(send_info))


                 ws.send(send_info)


                 while (1):


                    compressData = ws.recv()


                    result = gzip.decompress(compressData).decode('utf-8')


                     if result[:7] == '{"ping"':


                        ts = result[8:21]


                         pong = '{"pong":' + ts + '}'


                         ws.send(pong)


                        ws.send(send_info)


                     # else:


                    #     # print(result)


                    #     with open('./test_result.txt', 'a') as f:


                    #         #f.write(threading.currentThread().name + '\n')


                    #         f.write(result + '\n')


         except Exception as e:


             print("error is:",e)


 


class ApiUser(WebSocketLocust):


    task_set = UserBehavior


     min_wait = 100


     max_wait = 200


 


 


 


压测开始,随着用户数上升,压力机端发生如下错误:500和502错误


 


 


 


这是协议进行握手时失败,查询后端行情应用服务器,也有大量报错。


查看服务器发现打开最大文件数是1024,调整到65535,用如下命令调整:


 


第一步,修改/etc/sysctl.conf文件,在文件中添加如下行:


  net.ipv4.ip_local_port_range = 1024 65000


  这表明将系统对本地端口范围限制设置为1024~65000之间。请注意,本地端口范围的最小值必须大于或等于1024;而端口范围的最大值则应小于或等于65535.修改完后保存此文件。


  第二步,执行sysctl命令:


  [speng@as4 ~]$ sysctl -p


  如果系统没有错误提示,就表明新的本地端口范围设置成功。如果按上述端口范围进行设置,则理论上单独一个进程最多可以同时建立60000多个TCP客户端连接。


 


调整完成后复测,发现还是报这个错误,请开发进行定位分析应用程序。

提供企业建站服务,免费网防系统,提交信息登录 http://yundun.ddoss.cn 邮箱: proposal@ddoss.cn 
回复列表
默认   热门   正序   倒序

回复:Websocket协议压测记录

头像

用户名:

粉丝数:

签名:

资料 关注 好友 消息