Многопроцессорная загрузка на FTP с точным числом подключений

Итак, я смог использовать многопроцессорность для одновременной загрузки нескольких файлов на заданный сервер со следующими двумя функциями:

import ftplib,multiprocessing,subprocess

def upload(t):
    server=locker.server,user=locker.user,password=locker.password,service=locker.service #These all just return strings representing the various fields I will need.
    ftp=ftplib.FTP(server)
    ftp.login(user=user,passwd=password,acct="")
    ftp.storbinary("STOR "+t.split('/')[-1], open(t,"rb"))
    ftp.close() # Doesn't seem to be necessary, same thing happens whether I close this or not

def ftp_upload(t=files,server=locker.server,user=locker.user,password=locker.password,service=locker.service):
    parsed_targets=parse_it(t)
    ftp=ftplib.FTP(server)
    ftp.login(user=user,passwd=password,acct="")
    remote_files=ftp.nlst(".")
    ftp.close()
    files_already_on_server=[f for f in t if f.split("/")[-1] in remote_files]
    files_to_upload=[f for f in t if not f in files_already_on_server]
    connections_to_make=3 #The maximum connections allowed the the server is 5, and this error will pop up even if I use 1
    pool=multiprocessing.Pool(processes=connections_to_make)
    pool.map(upload,files_to_upload)

Моя проблема в том, что я (очень регулярно) получаю такие ошибки, как:

File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
ftplib.error_temp: 421 Too many connections (5) from this IP

Примечание. Также иногда возникает ошибка тайм-аута, но я жду, когда она снова поднимет свою уродливую голову, и в этот момент я опубликую ее.

Я не получаю эту ошибку, когда использую командную строку (например, «ftp -inv», «open SERVER», «userNAME PASSWORD», «mput *.rar»), даже если у меня есть (например) 3 экземпляра этого работает сразу.

Я прочитал документацию по ftplib и multiprocessing и не могу понять, что вызывает эти ошибки. Это некоторая проблема, потому что я регулярно делаю резервные копии большого количества данных и большого количества файлов.

  1. Можно ли каким-то образом избежать этих ошибок или есть другой способ сделать это с помощью сценария?
  2. Есть ли способ сказать скрипту, что если у него есть эта ошибка, он должен подождать секунду, а затем возобновить свою работу?
  3. Есть ли способ, которым я могу заставить скрипт загружать файлы в том же порядке, в котором они находятся в списке (конечно, разница в скорости будет означать, что они не всегда будут 4 последовательными файлами, но на данный момент порядок кажется в основном случайным)?
  4. Может ли кто-нибудь объяснить, почему/как одновременно выполняется больше подключений к этому серверу, чем требует сценарий?

Итак, кажется, что просто обработка исключений работает (за исключением случайной ошибки рекурсии... до сих пор понятия не имею, что, черт возьми, там происходит).

Согласно № 3, я не искал, чтобы это было на 100% в порядке, только чтобы сценарий выбирал следующий файл в списке для загрузки (поэтому различия в скоростях процессов могли / все равно могли привести к тому, что порядок не будет полностью последовательной, было бы меньше изменчивости, чем в нынешней системе, которая кажется почти неупорядоченной).


person Robin Hood    schedule 24.02.2013    source источник
comment
Возможно, библиотека автоматически создает одновременные соединения для повышения производительности. Я опубликовал подробный ответ об этой проблеме в целом, который может быть полезен   -  person Armin Šupuk    schedule 16.07.2017


Ответы (2)


Вы можете попробовать использовать один экземпляр ftp для каждого процесса:

def init(*credentials):
    global ftp
    server, user, password, acct = credentials
    ftp = ftplib.FTP(server)
    ftp.login(user=user, passwd=password, acct=acct)

def upload(path):
    with open(path, 'rb') as file:
        try:
            ftp.storbinary("STOR " + os.path.basename(path), file)
        except ftplib.error_temp as error: # handle temporary error
            return path, error
        else:
            return path, None

def main():
    # ...
    pool = multiprocessing.Pool(processes=connections_to_make,
                                initializer=init, initargs=credentials)
    for path, error in pool.imap_unordered(upload, files_to_upload):
        if error is not None:
           print("failed to upload %s" % (path,))
person jfs    schedule 24.02.2013

конкретно отвечая (2) Есть ли способ сказать скрипту, что если у него есть эта ошибка, он должен подождать секунду, а затем возобновить работу?

Да.

ftplib.error_temp: 421 Too many connections (5) from this IP

Это исключение. Вы можете поймать его и справиться с ним. Хотя python не поддерживает хвостовые вызовы, так что это ужасная форма, она может быть такой простой:

def upload(t):
    server=locker.server,user=locker.user,password=locker.password,service=locker.service #These all just return strings representing the various fields I will need.
    try:
        ftp=ftplib.FTP(server)
        ftp.login(user=user,passwd=password,acct="")
        ftp.storbinary("STOR "+t.split('/')[-1], open(t,"rb"))
        ftp.close() # Doesn't seem to be necessary, same thing happens whether I close this or not
    except ftplib.error_temp:
        ftp.close()
        sleep(2)
        upload(t)

Что касается вашего вопроса (3), если это то, что вы хотите, выполняйте загрузку последовательно, а не параллельно.

Я с нетерпением жду, когда вы опубликуете обновление с ответом на (4). Единственное, что мне приходит в голову, это какой-то другой процесс с ftp-подключением к этому IP.

person jrwren    schedule 24.02.2013