import time
import xmlrpclib
from locust import Locust, events, task, TaskSet
class XmlRpcClient(xmlrpclib.ServerProxy):
"""
Simple, sample XML RPC client implementation that wraps xmlrpclib.ServerProxy and
fires locust events on request_success and request_failure, so that all requests
gets tracked in locust's statistics.
"""
def __getattr__(self, name):
func = xmlrpclib.ServerProxy.__getattr__(self, name)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
except xmlrpclib.Fault as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="xmlrpc", name=name, response_time=total_time, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="xmlrpc", name=name, response_time=total_time, response_length=0)
# In this example, I've hardcoded response_length=0. If we would want the response length to be
# reported correctly in the statistics, we would probably need to hook in at a lower level
return wrapper
class XmlRpcLocust(Locust):
"""
This is the abstract Locust class which should be subclassed. It provides an XML-RPC client
that can be used to make XML-RPC requests that will be tracked in Locust's statistics.
"""
def __init__(self, *args, **kwargs):
super(XmlRpcLocust, self).__init__(*args, **kwargs)
self.client = XmlRpcClient(self.host)
class ApiUser(XmlRpcLocust):
host = "http://127.0.0.1:8877/"
min_wait = 100
max_wait = 1000
class task_set(TaskSet):
@task(10)
def get_time(self):
self.client.get_time()
@task(5)
def get_random_number(self):
self.client.get_random_number(0, 100)