python用户负载测试工具:locust

logo


locust是什么

locust是一个易于使用的,分布式的,用户负载测试工具。用于web站点(或其他系统)的负载测试,然后算出系统能够处理多少并发用户。
locust的思想是:在测试期间,一大群"蝗虫"会攻击你的网站,每一个"蝗虫"的行为都是由你自己定义的,同时,可以在一个web界面上实时的监控这群进程。这会帮助你更好的"进行战斗",在真正的用户进入之前,就找出代码中的瓶颈。
locust完全是事件驱动的,因此它能够在单机支持数以千计的并发用户,相比许多其他的基于事件的应用,locust不使用回调函数。它使用轻量进程---gevent。每一个访问你的网站的locust实际上都在它自己的进程内部运行(准确地说,是greenlet)。这允许你在不使用带回调函数的复杂代码的情形下,使用python写出非常具有表现力的脚本。

特性

  • 使用普通的python编写用户测试脚本
    不需要笨重的UI或臃肿的XML,只需像平时一样写代码。locust基于协程而不是回调函数,允许你像平时写阻塞的python代码一样写异步代码。

  • 分布式&可扩展---支持无数的用户
    locust支持在多台机器上分布式地运行负载测试,因为是事件驱动的,所以甚至一个locust节点能够在单线程中处理数以千计的用户。模拟大量用户的一部分原因是,即使你模拟了许多用户,但是并非所有的用户都同时访问你的系统,通常情况下用户都是空闲的,思考接下来要做什么,所以每秒的请求数 != 在线的用户数

  • 基于web的UI
    locust有一个整洁的HTML+JS用户接口,它实时的展示了相关的测试细节。因为UI是基于web的,所以它是跨平台的,并且很容易扩展。

  • 能够测试任何系统
    即使locust是面向web的,但是它也能够用于测试几乎任何系统。只需要写一个客户端,然后使用locust运行,非常容易。

  • 易于hack
    locust非常小,并且非常容易hack,我们打算使它保持这样,io事件和协程这些重活都被委派给gevent。

作者

  • Jonatan Heyman (@jonatanheyman on Twitter)
  • Carl Byström (@cgbystrom on Twitter)
  • Joakim Hamrén (@Jahaaja on Twitter)
  • Hugo Heyman (@hugoheyman on Twitter)

License

MIT


安装

pip install locustio 或者
easy_install locustio
当locust被安装的时候,在shell当中可以使用locust命令。为了查看可用的选项,运行:
locust --help

支持python版本
locust支持python2.6+,它现在不兼容Python 3.x

安装ZeroMQ
如果你打算通过多进程或多机,分布式地运行Locust,我们建议你也安装pyzmq
pip install pyzmq或者:
easy_install pyzmq

增加max_open_files
一台机器上的每个HTTP连接,都打开一个新的文件(术语是:文件描述符)。操作系统可能会给每个用户所能打开的最大文件数量设置一个较低的限制,如果这个限制少于测试中模拟的用户数,就会发生失败。
因此,应该增加操作系统的默认的最大文件描述符数量到一个比模拟的用户数量更大的值,如何做,依赖于使用的操作系统。


快速教程

locustfile.py的例子

下面是一个简单的locustfile.py的小例子:

from locust import HttpLocust, TaskSet

def login(l):
    l.client.post("/login", {"username":"ellen_key", "password":"education"})

def index(l):
    l.client.get("/")

def profile(l):
    l.client.get("/profile")

class UserBehavior(TaskSet):
    tasks = {index:2, profile:1}

    def on_start(self):
        login(self)

class WebsiteUser(HttpLocust):
    task_set = UserBehavior
    min_wait=5000
    max_wait=9000

在这里,我们定义了许多locust任务,一个locust任务就是一个带有一个参数(参数是TaskSet类实例)的python可调用对象。这些任务被收集到一个TaskSet类的tasks属性。然后我们有一个代表模拟用户的HttpLocust类,在这个类中,我们定义了一个模拟用户在两次执行任务之间应该等待多久,以及定义用户行为的TaskSet类。TaskSet能够嵌套。
HttpLocust类继承自Locust类,它添加了一个用于生成http请求的client属性---HttpSession类的实例。
另外一种更加方便声明任务的方式是使用@task装饰器,下面的代码等价于上面的代码:

from locust import HttpLocust, TaskSet, task

class UserBehavior(TaskSet):
    def on_start(self):
        """ on_start is called when a Locust start before any task is scheduled """
        self.login()

    def login(self):
        self.client.post("/login", {"username":"ellen_key", "password":"education"})

    @task(2)
    def index(self):
        self.client.get("/")

    @task(1)
    def profile(self):
        self.client.get("/profile")

class WebsiteUser(HttpLocust):
    task_set = UserBehavior
    min_wait=5000
    max_wait=9000

Locust类(以及HttpLocust,因为它是Locust的子类)允许指定每个模拟用户在两次执行任务之间等待的最大和最小时间(min_waitmax_wait),以及其他的用户行为。

启动Locust

为了使用上面的locust文件来运行locust,如果它被命名为locustfile.py,我们可以运行(在与locustfile.py相同的目录下):
locust --host=http://example.com
如果locust file被放到了其他的地方,我们可以运行:
locust -f ../locust_files/my_locust_file.py --host=http://example.com
为了通过多进程来分布式地运行locust,我们应该在启动master进程的时候指定--master选项:
locust -f ../locust_files/my_locust_file.py --master --host=http://example.com
然后我们可以启动任意数量的slave进程:
locust -f ../locust_files/my_locust_file.py --slave --host=http://example.com
如果我们想要在多台机器上分布式的运行locust,在启动slave的时候,我们也必须指定master的host。
locust -f ../locust_files/my_locust_file.py --slave --master-host=192.168.0.100 --host=http://example.com

打开Locust的web接口

一旦你使用了上面的命令行中的任意一个启动了locust,你应该打开浏览器,使他指向http://127.0.0.1:8089 ,然后你会看到下面的欢迎页面:
webinterface


编写locustfile

locustfile是一个普通的python文件,唯一的要求是它至少要声明一个类---我们管它叫locust类---它继承自Locust类。

Locust类

一个Locust类代表一个用户,locust会为每一个模拟用户生成一个locust类的实例,一个Locust类应该定义一些属性:

  • task_set属性
    task_set属性应该指向一个定义用户行为的TaskSet类。

  • min_waitmax_wait属性
    除了task_set属性,也可以声明min_waitmax_wait属性,它们是一个模拟用户在执行任务之间等待的最大和最小时间,单位是毫秒。min_waitmax_wait默认是1000,因此如果没有声明min_waitmax_wait,locust在执行每个任务之间总是会等待1秒。
    使用下面的locustfile,在任务之间每个用户等待5-15秒:

from locust import Locust, TaskSet, task

class MyTaskSet(TaskSet):  
    @task
    def my_task(self):
        print "executing my_task"

class MyLocust(Locust):  
    task_set = MyTaskSet
    min_wait = 5000
    max_wait = 15000

min_waitmax_wait也可以在TaskSet类中重写。

  • weight属性
    你可以像这样从相同的文件中运行两个locust:
    locust -f locust_file.py WebUserLocust MobileUserLocust
    如果你希望使这些locust中的一个执行的更加频繁,你可以在这些类上设置weight属性。对于这个例子而言,web用户可能是mobile用户的三倍:
class WebUserLocust(Locust):  
    weight = 3
    ....

class MobileUserLocust(Locust):  
    weight = 1
    ....
  • host属性
    host属性是一个指向将要被加载的host的URL前缀(比如:http://google.com )。通常而言,当locust被启动的时候,在命令行中使用--host选项来指定它。如果在locust类中声明了host属性,当命令行中没提供host选项的时候,它会被使用。

TaskSet类

如果Locust类代表一群蝗虫,你应该说TaskSet类代表了蝗虫的大脑。每个Locust类必须有一个指向一个TaskSet的task_set属性。
TaskSet是任务的集合,这些任务是普通的python可调用对象。
当负载测试启动的时候,产生的每一个Locust类的实例都会开始执行它们的TaskSet。接下来发生的是,每个TaskSet会选择它的任务中的一个,并且调用它。接下来等待min_waitmax_wait毫秒,然后它会再选择下一个要被调用的任务,再等待,等等。

  • 声明任务
    为TaskSet声明任务最典型的方式是使用task装饰器。
    下面是一个例子:
from locust import Locust, TaskSet, task

class MyTaskSet(TaskSet):  
    @task
    def my_task(self):
        print "Locust instance (%r) executing my_task" % (self.locust)

class MyLocust(Locust):  
    task_set = MyTaskSet

@task装饰器带一个可选的weight参数,它用于指定任务的执行比例。在下面的例子中,task2的执行次数是task1的两倍:

from locust import Locust, TaskSet, task

class MyTaskSet(TaskSet):  
    min_wait = 5000
    max_wait = 15000

    @task(3)
    def task1(self):
        pass

    @task(6)
    def task2(self):
        pass

class MyLocust(Locust):  
    task_set = MyTaskSet
  • tasks属性

使用@task装饰器来声明任务很方便,并且通常是最好的方式。然而,也可以通过设置tasks属性,来定义TaskSet的任务。
tasks属性既可以是python可调用对象的列表,也可以是一个<callable : int>字典,每个任务都是接受一个参数(正在执行这个任务的TaskSet实例)的python可调用对象,下面是一个简单的例子:

from locust import Locust, TaskSet

def my_task(l):  
    pass

class MyTaskSet(TaskSet):  
    tasks = [my_task]

class MyLocust(Locust):  
    task_set = MyTaskSet

如果tasks属性被指定为一个列表,每次将要执行任务的时候,都会随机地从tasks属性中选择,如果tasks是键为可调用对象,值为整型的字典,那么将要被执行的任务,会使用整型作为比例,随机地选取。
因此如下的tasks:
{my_task: 3, another_task:1}
my_stask被执行的次数是another_task的三倍。

  • TaskSet支持嵌套

TaskSet一个非常重要的特性是它支持嵌套。因为真正的网站通常是由多个子区域,以分层的方式组合起来的。嵌套的TaskSet支持把模拟用户的行为定义的更加逼真,比如我们可以定义拥有下面结构的TaskSet:

Main user behaviour  
    Index page

    Forum page
    Read thread
        Reply
    New thread
    View next page

    Browse categories
        Watch movie
        Filter movies
    About page

就像指定任务的时候一样,通过使用tasks属性来定义嵌套的TaskSet。只不过tasks中的元素不指向python可调用对象,而是指向另外一个TaskSet。

class ForumPage(TaskSet):  
    @task(20)
    def read_thread(self):
        pass

    @task(1)
    def new_thread(self):
        pass

    @task(5)
    def stop(self):
        self.interrupt()

class UserBehaviour(TaskSet):  
    tasks = {ForumPage:10}

    @task
    def index(self):
        pass

在上面的例子中,当UserBehaviour TaskSet执行的时候,ForumPage会被选择执行,也就是说ForumPage TaskSet会开始执行。ForumPage TaskSet会选择它自己的任务中的一个,并且执行它,然后等待,等等。
在上面的例子中,有个非常重要的地方要注意,那就是在ForumPage的stop方法里调用了self.interrupt()。它的作用是中断ForumPage TaskSet,将执行控制权交回给UserBehaviour TaskSet。如果没有在ForumPage中调用interrupt(),那么一旦ForumPage启动,locust会不停的执行ForumPage TaskSet。
可以通过对内部类使用@task装饰器来声明嵌套的TaskSet,就像我们声明普通的任务一样:

class MyTaskSet(TaskSet):  
    @task
    class SubTaskSet(TaskSet):
        @task
        def my_task(self):
            pass
  • on_start函数

TaskSet类可以定义一个on_start方法,当模拟用户开始执行TaskSet类的时候,on_start方法会被调用。

  • 引用Locust实例或父TaskSet实例

TaskSet实例的locust属性指向它的Locust类,parent属性指向它的父TaskSet。

生成HTTP请求

到现在为止,本文已经包含了Locust用户的任务调度部分,为了真正的给一个系统进行负载测试,我们需要生成HTTP请求,HttpLocust类的存在,就是为了解决这个问题。当使用HttpLocust类的时候,每个实例都有一个client属性---它是能够用于生成HTTP请求的HttpSession类的实例。

    class HttpLocust
        代表一个被孵化出的用于“攻击”要进行负载测试的系统的HTTP“用户”。  
        这个用户的行为是由指向TaskSet类的task_set属性定义的。  
        这个类在初始化的时候,会创建一个client属性,client属性是一个支持在请求之间保持用户会话的HTTP客户端。  

        client = None 
            在Locust实例化的时候被创建的HttpSession实例,这个客户端支持cookie,因此可以在HTTP请求之间保持会话。  


当从HttpLocust类继承的时候,我们可以使用它的client属性来生成HTTP请求,下面是一个例子:

from locust import HttpLocust, TaskSet, task

class MyTaskSet(TaskSet):  
    @task(2)
    def index(self):
        self.client.get("/")

    @task(1)
    def about(self):
        self.client.get("/about/")

class MyLocust(HttpLocust):  
    task_set = MyTaskSet
    min_wait = 5000
    max_wait = 15000


使用上面的Locust类,每个模拟用户在请求之间都会等待5-15秒,并且/的请求次数是/about/的两倍。
用心的读者可能会觉得很奇怪:在TaskSet内部我们使用self.client而非self.locust.client开引用HttpSession实例,我们能这么做是因为:TaskSet类有一个便捷的被称作client的属性,它简单的返回self.locust.client。

使用HTTP客户端

每个HttpLocust实例都有一个指向HttpSession实例的client属性。HttpSession类其实是requests.Session的子类,能够使用get, post, put, delete, head, patch和options方法来生成HTTP请求,并且会被报告到Locust的统计。HttpSession实例会在请求之间保持cookie,以便它能登陆到网站,在请求之间保持会话。client属性也能从Locust实例的TaskSet实例引用,以便在任务内部,能够很容易的取出client,生成HTTP请求。
下面是一个简单的例子,用于生成到/about路径的GET请求(在这个例子中,我们假定self是TaskSet或HttpLocust类的实例):

response = self.client.get("/about")  
print "Response status code:", response.status_code  
print "Response content:", response.content  


下面是一个生成POST请求的例子:

response = self.client.post("/login", {"username":"testuser", "password":"secret"})  
  • 安全模式
    HTTP客户端被配置成以安全模式运行,任何由于连接错误,超时之类导致失败的请求都不会抛出异常,而是返回一个空的虚拟的Response对象,在Locust的统计中请求会被报告为一个失败。被返回的虚拟的Response对象的content属性被设置为None,status_code属性被设置为0。

  • 人工控制一个请求被视为成功还是失败
    默认情况下,除非HTTP响应码是ok(2xx),否则请求就会被标记为失败。大多数情况下,默认的情况就是我们想要的。然而有时---比如说你期望返回404,或者是测试一个即使发生错误,仍然返回200 OK的系统,就存在人工控制locust将请求视为成功还是失败的需求。
    通过使用catch_response参数和with语句,可以把一个响应码是okay的请求标记成失败:

with client.get("/", catch_response=True) as response:  
    if response.content != "Success":
        response.failure("Got wrong response")


正如可以把响应码为OK的请求标记为失败,也可以使用catch_response参数和with语句,将返回http错误代码的请求在统计中报告为成功。

with client.get("/does_not_exist/", catch_response=True) as response:  
    if response.status_code == 404:
        response.success()
  • 将到具有动态参数的URL的请求分组
    对于网站来说,拥有URL中包含某种动态参数的页面是非常普遍的。通常在Locust的统计中,把这些URL分成一组是有意义的。可以通过给HttpSession实例的请求方法传递name参数,来完成这件事。
    例子:
# Statistics for these requests will be grouped under: /blog/?id=[id]
for i in range(10):  
    client.get("/blog?id=%i" % i, name="/blog?id=[id]")

分布式运行Locust

一旦单机无法满足你需要模拟的用户的数量,Locust支持通过多机分布式的运行负载测试。
为了分布式的运行Locust,需要使用--master标记,以master模式启动一个Locust实例,master节点上会运行Locust的web接口,通过web接口可以开始测试以及查看统计。master节点本身不需要模拟任何用户。必须使用--slave和--master-host(用来指定master节点的ip或主机名)标记来启动一个或多个slave节点。
一个通用的设置是:在一台机器上运行一个单独的master,在slave机器的每个核心上运行一个slave实例。

当分布式的运行Locust的时候,master节点和每个slave节点上都必须有locust测试脚本的拷贝。

例子

以master模式启动Locust:
locust -f my_locustfile.py --master
然后在每一个slave上(用master机器的ip替换192.168.0.14):
locust -f my_locustfile.py --slave --master-host=192.168.0.14

选项

--master
以master的模式运行locust,web接口会运行在这个节点上。

--slave
以slave模式运行locust。

--master-host=X.X.X.X
和--slave一起使用,用来设置master节点的ip或主机名(默认是127.0.0.1)。

--master-port=5557
和--slave一起使用,用来设置master节点的端口号(默认是5557),注意:locust既会使用指定的端口号,又会使用指定的端口号+1,因此如果设置为5557,那么locust既会使用5557,也会使用5558。

--master-bind-host=X.X.X.X
和--master一起使用,决定master节点绑定到哪一个网络接口,默认是*(所有可用的网络接口)。

--master-bind-port=5557
和--master一起使用,决定master节点监听哪一个网络端口(默认是5557)。注意:locust既会使用指定的端口号,又会使用指定的端口号+1,因此如果设置为5557,那么locust既会使用5557,也会使用5558。


使用自定义的客户端测试其他系统

locust因为它的主要目标,内建了HTTP。然而它能够通过编写触发request_successrequest_failure事件的客户端,很容易的扩展成可以测试任何基于请求/响应的系统。

简单的XML-RPC Locust客户端

下面是一个locust类XmlRpcLocust的例子。XmlRpcLocust提供了一个XML-RPC客户端XmlRpcClient,追踪所有产生的请求。

import time  
import xmlrpclib

from locust import Locust, events, task, TaskSet


class XmlRpcClient(xmlrpclib.ServerProxy):  
    """
    Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and 
    fires locust events on request_success and request_failure, so that all requests 
    gets tracked in locust's statistics.
    """
    def __getattr__(self, name):
        func = xmlrpclib.ServerProxy.__getattr__(self, name)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except xmlrpclib.Fault as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type="xmlrpc", name=name, response_time=total_time, exception=e)
            else:
                total_time = int((time.time() - start_time) * 1000)
                events.request_success.fire(request_type="xmlrpc", name=name, response_time=total_time, response_length=0)
                # In this example, I've hardcoded response_length=0. If we would want the response length to be 
                # reported correctly in the statistics, we would probably need to hook in at a lower level

        return wrapper


class XmlRpcLocust(Locust):  
    """
    This is the abstract Locust class which should be subclassed. It provides an XML-RPC client
    that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
    """
    def __init__(self, *args, **kwargs):
        super(XmlRpcLocust, self).__init__(*args, **kwargs)
        self.client = XmlRpcClient(self.host)


class ApiUser(XmlRpcLocust):

    host = "http://127.0.0.1:8877/"
    min_wait = 100
    max_wait = 1000

    class task_set(TaskSet):
        @task(10)
        def get_time(self):
            self.client.get_time()

        @task(5)
        def get_random_number(self):
            self.client.get_random_number(0, 100)

如果你之前写过locust测试,你会看出ApiUser类是一个普通的Locust类,它的task_set属性是一个包含任务的TaskSet类。ApiUser类从XmlRpcLocust继承,XmlRpcLocust的client属性提供了一个XmlRpcClient实例,XmlRpcClient是标准库的xmlrpclib.ServerProxy的封装,它主要作用只是代理函数调用,除此之外更重要的是触发locust.events.request_successlocust.events.request_failure事件,它会把所有的调用报告到Locust的统计中。
下面是XML-RPC server的实现:

import time  
import random  
from SimpleXMLRPCServer import SimpleXMLRPCServer  
import xmlrpclib

def get_time():  
    time.sleep(random.random())
    return time.time()

def get_random_number(low, high):  
    time.sleep(random.random())
    return random.randint(low, high)

server = SimpleXMLRPCServer(("localhost", 8877))  
print "Listening on port 8877..."  
server.register_function(get_time, "get_time")  
server.register_function(get_random_number, "get_random_number")  
server.serve_forever()  

扩展Locust

Locust带有许多事件,事件提供了钩子用于扩展locust。
可以在locust文件的模块级别注册event listener,下面是一个例子:

from locust import events

def my_success_handler(method, path, response_time, response, **kw):  
    print "Successfully fetched: %s" % (path)

events.request_success += my_success_handler  
  • 强烈建议:在listener函数中增加一个通配的关键字参数(上面代码中的**kw)。这样做可以防止:在将来的版本中因为增加了新的参数,而导致代码出现异常。
  • 为了查看所有的可用的事件,请看events

增加web路由

Locust使用Flask来提供web服务,因此向web server中增加一个URI非常方便,只需要在locustfile中导入Flask app,然后设置一个新的路由:

from locust import web

@web.app.route("/added_page")
def my_added_page():  
    return "Another page"

(That's all, thanks for your reading.)

感谢浏览tim chow的作品!

如果您喜欢,可以分享到: 更多

如果您有任何疑问或想要与tim chow进行交流

可点此给tim chow发信

如有问题,也可在下面留言: