Мне нужно провести стресс-тест системы, и http://locust.io кажется лучшим способом сделать это. Однако похоже, что он настроен на использование одного и того же пользователя каждый раз. Мне нужно, чтобы каждый респаун входил в систему как другой пользователь. Как мне это настроить? В качестве альтернативы, есть ли еще одна система, которую можно было бы использовать?
Способ использования locust.io путем предоставления списка пользователей
Ответы (4)
Автор Locust здесь.
По умолчанию у каждого пользовательского экземпляра HttpLocust есть HTTP-клиент, у которого есть отдельный сеанс.
Locust не имеет функции для предоставления списка учетных данных пользователя или чего-то подобного. Однако ваши сценарии нагрузочного тестирования - это просто код Python, и, к счастью, реализовать это самостоятельно - тривиально.
Вот небольшой пример:
# locustfile.py
from locust import HttpLocust, TaskSet, task
USER_CREDENTIALS = [
("user1", "password"),
("user2", "password"),
("user3", "password"),
]
class UserBehaviour(TaskSet):
def on_start(self):
if len(USER_CREDENTIALS) > 0:
user, passw = USER_CREDENTIALS.pop()
self.client.post("/login", {"username":user, "password":passw})
@task
def some_task(self):
# user should be logged in here (unless the USER_CREDENTIALS ran out)
self.client.get("/protected/resource")
class User(HttpLocust):
task_set = UserBehaviour
min_wait = 5000
max_wait = 60000
Приведенный выше код не будет работать при запуске распределенного Locust, поскольку один и тот же код работает на каждом подчиненном узле, и они не имеют общего состояния. Поэтому вам нужно будет ввести какое-то внешнее хранилище данных, которое подчиненные узлы могут использовать для обмена состояниями (например, PostgreSQL, redis, memcached или что-то еще).
В качестве альтернативы вы можете создать модуль users.py
для хранения информации о пользователях, которая вам нужна в ваших тестовых случаях, в моем примере он содержит email
и cookies
. Затем вы можете вызывать их случайным образом в своих задачах. См. ниже:
# locustfile.py
from locust import HttpLocust, TaskSet, task
from user_agent import *
from users import users_info
class UserBehaviour(TaskSet):
def get_user(self):
user = random.choice(users_info)
return user
@task(10)
def get_siparislerim(self):
user = self.get_user()
user_agent = self.get_user_agent()
r = self.client.get("/orders", headers = {"Cookie": user[1], 'User-Agent': user_agent})
class User(HttpLocust):
task_set = UserBehaviour
min_wait = 5000
max_wait = 60000
Пользователь и пользовательский агент могут быть вызваны функцией. Таким образом, мы могли бы распространить тест среди множества пользователей и различных пользовательских агентов.
# users.py
users_info = [
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user'],
['[email protected]', 'cookies_created_by_each_user']]
При реализации этого для распределенной системы я применил несколько иной подход. Я использовал очень простой фляжный сервер, к которому я обратился во время части on_start TaskSet.
from flask import Flask, jsonify
app = Flask(__name__)
count = 0 #Shared Variable
@app.route("/")
def counter():
global count
count = count+1
tenant = count // 5 + 1
user = count % 5 + 1
return jsonify({'count':count,'tenant':"load_tenant_{}".format(str(tenant)),'admin':"admin",'user':"load_user_{}".format(str(user))})
if __name__ == "__main__":
app.run()
Таким образом, теперь у меня есть конечная точка, которую я могу получить на http://localhost:5000/ на любом хосте, на котором я запускаю этот . Мне просто нужно сделать эту конечную точку доступной для ведомых систем, и мне не придется беспокоиться о дублировании пользователей или о каком-либо эффекте циклического перебора, вызванном ограниченным набором user_info.
Копирование на ответ @heyman здесь. Код примера будет работать, но продолжение запуска / остановки тестов в конечном итоге очистит список USER_CREDENTIALS
и начнет выдавать ошибки.
В итоге я добавил следующее:
from locust import events # in addition to the other locust modules needed
def hatch_complete_handler(**kw):
global USER_CREDENTIALS
USER_CREDENTIALS = generate_users() # some function here to regenerate your list
events.hatch_complete += hatch_complete_handler
Это обновит ваш список пользователей, когда ваш рой завершит вылупление.
Также имейте в виду, что вам понадобится список, длина которого превышает количество пользователей, которых вы хотите создать.