автор: Хайдун Лан

Nvidia представила свой GPU-ускоритель Tesla V100, который с тех пор стал обязательной моделью для глубокого обучения, на GTC (GPU Technology Conference) 2017 в Пекине. Именно тогда Дженсен Хуанг, генеральный директор Nvidia, торжественно дал нам самый искренний совет, который долгие годы звучал в наших головах:

И все мы знаем, что произошло потом. Несколько интернет-гигантов собрали в своих центрах обработки данных десятки тысяч чипов V100. Искусственный интеллект (ИИ) становится все более интеллектуальным: распознавание лиц и речи становится более точным; машинный перевод дает более человеческие результаты; Геймеры с искусственным интеллектом побеждают профессиональных игроков в мгновение ока. За кулисами огромные ресурсы исследований и разработок и первоклассный опыт были направлены на разработку распределенных сред машинного обучения, которые автоматически разбивают тяжелую учебную нагрузку на более мелкие задачи для выполнения на нескольких видеокартах.

Можно ли применять распределенные фреймворки к секторам, отличным от машинного обучения? Члены сообщества Taichi часто обнаруживают, что им приходится иметь дело со сверхбольшими сетками для моделирования жидкостей или частиц, где объем данных намного превышает возможности одного графического процессора. Распределенное решение, которое позволяет программе Taichi работать на нескольких картах GPU или даже на нескольких машинах GPU, значительно повысит эффективность крупномасштабных вычислений.

Что ж, это доказывает, что параллельные вычисления и распределенные вычисления могут быть идеально объединены и аккуратно реализованы в нескольких строках кода, и этот блог демонстрирует, как это сделать. Спойлер: секрет кроется в Taichi и MPI4Py.

Быстрый старт с MPI4Py

MPI (интерфейс передачи сообщений) — это стандартизированный коммуникационный интерфейс, созданный более двух десятилетий назад и с тех пор постоянно совершенствующийся. Он позволяет тысячам компьютеров взаимодействовать и координировать друг с другом, передавать данные и выполнять крупномасштабные вычисления. Следовательно, вы не можете обойти MPI, пока используете суперкомпьютеры. Внедрить программу MPI с нуля непросто. К счастью, MPI доступен через пакет Python, MPI4Py, и следующий фрагмент показывает самую простую программу MPI4Py:

# mpi_hello.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
print(f"Hello from MPI worker {comm.rank}, the MPI world has {comm.size} peers.")

Этот фрагмент кода содержит наиболее важную концепцию MPI — MPI_COMM_WORLD, представленную в строке 4. Все рабочие процессы, созданные MPI, однозначно идентифицируются с помощью rank, независимо от того, размещены они на одном устройстве или нет. Например, rank=0 относится к первому работнику, rank=1 — ко второму и так далее. Общее количество рабочих хранится в comm.size. Оператор print будет выполняться каждым работником отдельно, печатая свою личность comm.rank в MPI_COMM_WORLD, а также общую сумму comm.size.

Чтобы запустить эту программу, добавьте mpirun перед командой python следующим образом. -np указывает, сколько рабочих должно быть запущено, а --hosts называет машины, участвующие в распределенных вычислениях.

mpirun -np 8 --hosts node1,node2 python mpi_hello.py

Если все пройдет гладко, вы должны увидеть следующую информацию:

Это все, что нам нужно знать об основах MPI4Py, чтобы понять следующие разделы. Для получения дополнительной информации о пакете Python см. эту страницу. Теперь мы можем перейти к простой программе Taichi и изучить, как ее можно разделить на несколько небольших задач и запустить на нескольких машинах с помощью MPI4Py.

Простая программа Тайчи

Давайте реализуем простой решатель Пуассона в Taichi. Решатель Пуассона решает следующее уравнение Лапласа:

Граничное условие естественно задается как f(x,y) = 0.

Вам не нужно быть кристально ясным в отношении обоснования этого решателя. По сути, это ядро ​​Taichi для трафаретных вычислений. Написать такое ядро ​​более чем просто:

x = ti.field(dtype=float, shape=(N + 2, N + 2))
xt = ti.field(dtype=float, shape=(N + 2, N + 2))

@ti.kernel                                                                                                                          
def substep():                                                                                                                  
    for i,j in ti.ndrange((1, N+1), (1, N+1)):                                                                            
        xt[i,j] = (-b[i,j]*dx**2+x[i+1,j]+x[i-1,j]+x[i,j+1]+x[i,j-1]) / 4.0                  
    for I in ti.grouped(x):                                                                                                    
        x[I] = xt[I]

Поле Taichi x формы (N+2) x (N+2) определено как представление f. + 2 обеспечивает заполнение, чтобы избежать ошибочной индексации за пределами границ. Вычисление каждой точки зависит от ее четырех соседних соседей, как показано на следующем рисунке:

Мы можем постепенно аппроксимировать решение f, многократно вызывая substep.

Теперь, когда у нас есть программа Taichi, которая в настоящее время использует один процессор и одно устройство, мы готовы представить MPI4Py и распространить эту программу на несколько машин.

Исходная версия MPI: Устранение зависимостей данных и достижение распределенных вычислений

Первым шагом в распространении на несколько процессоров является разделение данных. Есть два основных момента: форма разделов данных и зависимости данных между разделами.

В нашем случае простой подход к разбиению данных состоит в том, чтобы разделить поле x на четыре горизонтально выровненных подполя. Вычисление внутренних ячеек в каждом подполе не зависит от других подполей, поскольку функция substep включает только четыре соседа точки. Однако два смежных подполя имеют общую границу, где возникают зависимости данных. Мы также резервируем дополнительное пространство для каждого подполя (серые области на изображении ниже) для хранения данных границ, совместно используемых со смежными подполями. MPI синхронизирует пограничные данные между несколькими машинами.

После разделения MPI запускает несколько рабочих процессов. Подполе видно только в работнике, которому оно назначено. Мы должны соответствующим образом изменить объявление поля:

n = N // 4
x = ti.field(dtype=float, shape=(N + 2, n + 2))

x больше не является полным полем, как определено в последнем разделе. Вместо этого теперь оно представляет собой подполе, выделенное одному из четырех рабочих процессов. Другими словами, x различается у разных рабочих.

MPI4Py поддерживает использование ndarray NumPy в качестве буферных областей передачи данных, если передаваемые массивы NumPy непрерывны в памяти. Мы можем обмениваться данными между полями Taichi и массивами NumPy, используя функции to_numpy и from_numpy.

В следующем фрагменте кода показано, как отправить правую границу подполя его правой рабочей части:

comm = MPI.COMM_WORLD

x_arr = x.to_numpy()
x_right = np.ascontiguousarray(x_arr[:, -2])

if comm.rank < num_workers - 1:
    comm.Send(x_right, dest=comm.rank+1)

Предположим, что четыре подполя X0, X1, X2 и X3 соответствуют Работникам 0, 1, 2 и 3 соответственно. comm.rank+1 обозначает работника справа от текущего подполя. Оператор if гарантирует, что только рабочие процессы 0, 1 и 2 отправляют данные своему соседу справа, поскольку рабочий процесс 3 является самым правым.

Нам также нужно использовать пары recv и send; в противном случае коммуникационный тупик возникает без предупреждения, когда MPI не может определить местонахождение получателя данных. Следующий фрагмент кода отвечает за логику приема, т. е. Recv.

shadow_left = np.zeros(N + 2)
if comm.rank > 0:
    comm.Recv(shadow_left, source=comm.rank-1)

# Fill in the left edge
x_arr[:, 0] = shadow_left
x.from_numpy(x_arr)

shadow_left — буферная область для хранения полученных данных.

Логика приема — это обратная версия отправки данных: каждый рабочий процесс получает данные от своего левого соседа и заполняет данными левую границу своего подполя. Имейте в виду, что Worker 0 не получает никаких данных, поскольку он самый левый, и оператор if выше должен гарантировать, что только Worker 1, 2 и 3 удовлетворяют условию получения данных.

Отправка и получение граничных данных в обратном направлении аналогичны процессу, описанному выше, поэтому я не буду углубляться в детали. Полный код можно найти здесь.

Теперь наша программа может работать на нескольких машинах, но с неудовлетворительной производительностью из-за следующих проблем:

  1. Вызов field.to_numpy() преобразует все поле в массив NumPy, но необходимо передать только граничные данные. Поэтому мы можем копировать данные в меньшем масштабе.
  2. В настоящее время каждое подполе не перейдет к следующей итерации, пока Recv не получит данные. Но на самом деле им не нужно ждать передачи данных, так как большая часть вычислений происходит внутри.

Давайте решать проблемы одну за другой.

Оптимизация 01: копирование данных границ с помощью Taichi

В исходной версии мы сначала копируем все поле в массив NumPy, а затем нарезаем массив, чтобы извлечь и передать данные границы. Но нам не нужно оперировать все поле; вместо этого мы можем точно определить границы с помощью нескольких аккуратно написанных ядер Taichi. В следующем фрагменте кода показано, как заполнить данные границы в серых областях:

@ti.kernel
def fill_shadow(shadow_left : ti.types.ndarray(), 
                shadow_right : ti.types.ndarray()):
    for i in ti.ndrange((0, N + 2)):
        x[i, 0] = shadow_left[i]
        x[i, n+1] = shadow_right[i]

shadow_left и shadow_right — буферы NumPy для приема данных. Они могут быть переданы непосредственно ядрам Taichi без каких-либо дополнительных накладных расходов, кроме передачи данных CPU-GPU, или вообще без дополнительных накладных расходов, если вы используете серверную часть ЦП, где ядра Taichi напрямую работают с массивами NumPy.

Оптимизация 02: Асинхронная связь

В большинстве случаев трафаретных вычислений есть только небольшая область, подверженная зависимостям данных. В этом решателе Пуассона внутренние ячейки подполей не зависят от граничных ячеек и, таким образом, не играют никакой роли в коммуникации. Мы можем разделить substep на две функции, чтобы работать с ними по отдельности, т. е. substep_edge для вычисления граничных ячеек и substep_bulk для внутренних ячеек, как показано ниже.

Таким образом, внутренние ячейки могут быть вычислены параллельно с обменом данными, что позволяет значительно сократить накладные расходы. Чтобы перекрыть связь с вычислениями, нам нужно немного изменить реализацию, заменив блокирующие примитивы Send и Recv на неблокирующие Isend и Irecv. Последняя пара не блокирует выполнение последующих операторов, пока не встретит wait. Нам также нужен дополнительный параметр tag для маркировки данных, отправленных и полученных в каждой группе сообщений.

Соответственно, коммуникационный код для отправки данных должен быть изменен следующим образом:

if comm.rank < num_workers - 1:
    comm.Isend(edge_right, dest=comm.rank+1, tag=11)

И принимающая часть также обновляется:

if comm.rank > 0:
    comm.Irecv(shadow_left, source=comm.rank-1, tag=11).Wait()

Чтобы уменьшить общие накладные расходы, мы вызываем substep_bulk после отправки данных и вызываем substep_edge после завершения связи.

Процесс вычислений и связи завершается следующим образом:

def step():
    mpi_send_edges() # Trigger communication
    substep_bulk() # Compute the bulk
    mpi_recv_edges() # Wait for communications
    substep_edge(shadow_left, shadow_right) # Compute the edges

При такой оптимизации время вычисления substep_bulk и время связи MPI перекрываются друг с другом, следовательно, получается более компактная временная шкала и приличный прирост производительности.

Оценка эффективности

Чтобы оценить и сравнить производительность различных реализаций, я настроил среду MPI, соединив две рабочие станции с GPU напрямую с одним и тем же маршрутизатором. Один (node0) оснащен одним графическим процессором Nvidia RTX3080 и процессором Intel i9-11900k, а другой (node1) — GTX1080Ti и E3-1230v5. Оба установили систему Ubuntu 20.04.

Я тестировал поля размером 8192 × 8192 на подключенных машинах и оценивал производительность процессора и графического процессора соответственно. С Taichi мы можем легко переключаться между реализациями CPU и GPU, просто указав arch=ti.cpu или arch=ti.gpu при инициализации.

В идеале распределенные вычисления могут обеспечить линейную масштабируемость, т. е. прирост производительности пропорционально количеству добавленных машин.

Каков же разрыв между нашей оптимизированной реализацией и идеальной линейной масштабируемостью?

  • Реализации ЦП

Мы можем наблюдать значительную разницу в производительности ЦП двух компьютеров, как показано оранжевыми столбцами. Поскольку поля разделены поровну, node0 нужно ждать node1 на каждом шаге вычисления. Как показывают данные, производительность полностью оптимизированной версии MPI примерно в два раза выше, чем у node1. Если предположить, что на двух машинах установлена ​​одна и та же модель процессора, наша реализация MPI может идеально обеспечить линейную масштабируемость.

  • Реализации графического процессора

Исходная реализация MPI на бэкенде GPU даже отстает от версий CPU из-за дополнительных накладных расходов на передачу данных CPU-GPU. После оптимизации с помощью копирования данных и асинхронного обмена данными с помощью Taichi производительность резко возросла почти в два раза по сравнению с node1 (171,2 / 89,68 = 1,91). Разгон приближается к идеальному сценарию.

Если мы хотим добиться линейной масштабируемости на аппаратных платформах с большими различиями в производительности, нам нужно переразбить поля в неравных пропорциях. Однако это выходит за рамки этого блога, и я прекращу обсуждение здесь.

Заключение

В этом блоге я показываю, как использовать Taichi и MPI4Py для реализации параллельных и распределенных вычислений. Полная программа занимает всего около 100 строк кода Python, но обеспечивает удовлетворительную масштабируемость. Надеюсь, этот блог может вдохновить тех, кто сталкивается с крупномасштабными численными вычислениями. Тем не менее, я коснулся здесь самого основного случая. Реальные программы MPI могут быть намного сложнее, особенно когда речь идет о случайном доступе к данным или сложных зависимостях данных, которые трудно решить.

Если вы столкнулись с какими-либо трудностями при использовании MPI и Taichi или у вас есть лучшие решения, поделитесь ими с нами в слабом канале или в нашем сообществе субреддита!

Код ссылки: https://github.com/turbo0628/Taichi-MPI/tree/main/poisson