Несмотря на то, что в реализации классы BufferedIOBase
обертывают объект IOBase
, их интерфейс является потоком (все наследуется от IOBase
), поэтому обычное поведение объекта IOBase
заключается в том, чтобы закрыться, когда они выходят за пределы области видимости. BufferedIOBase
просто делегирует вызов close()
базовому потоку.
Вы не должны рассматривать BufferedReader
как оболочку потока (хотя именно так это и реализовано), а как приведение типа существующего потока. Состояние двух потоков полностью связано друг с другом. Однако вы можете развязать обернутый поток с помощью detach()
, но это сделает объект BufferedIOBase
бесполезным.
Кроме того, io.open
уже возвращает BufferedReader
, когда режим равен rb
, поэтому вы выполняете двойную буферизацию. Вместо этого вы должны использовать io.FileIO
.
У вас есть несколько вариантов:
Создайте новый поток и новый базовый файловый дескриптор и передавайте имена файлов вместо потоков. Это самый простой и безопасный вариант.
Создавайте необработанные файловые дескрипторы и создавайте из них потоки по мере необходимости. Это требует некоторой осторожности, чтобы несколько потоков не использовали один и тот же файловый дескриптор одновременно. Например:
fd = os.open('test.txt', os.O_RDONLY)
file1 = FileIO(fd, 'r', closefd=False)
file2 = FileIO(fd, 'r', closefd=False)
file1.read(100)
assert file1.tell() == 100
file2.read(100)
assert file1.tell() == 200
detach()
основной поток до того, как ваш BufferedIOBase
объект закроет свой поток. (Не забудьте перемотать поток!)
def test(streams):
for stream in streams:
b=TestReader(stream)
do_something(b)
wrappedstream = b.detach()
assert wrappedstream is stream
Вы даже можете реализовать это в своем деструкторе:
class TestReader(BufferedReader):
def __del__(self):
self.detach()
# self.raw will not be closed,
# rather left in the state it was in at detachment
Или просто полностью отключите делегирование close()
, если считаете, что семантика неверна:
class TestReader(BufferedReader):
def close(self):
self.closed = True
У меня нет общей картины того, что вы делаете (возможно, вам нужен другой дизайн), но я бы реализовал код, который вижу, так:
from io import FileIO, BufferedReader
import io
import os
class TestReader(BufferedReader):
pass
def test(streams):
for stream in streams:
b = TestReader(stream)
def test_reset(streams):
"""Will try to leave stream state unchanged"""
for stream in streams:
pos = stream.tell()
b = TestReader(stream)
do_something(b)
b.detach()
stream.seek(pos)
filenames = ['test1.txt', 'test2.txt']
# option 1: just make new streams
streams = [FileIO(name, 'r') for name in filenames]
test(streams)
streams = [io.open(name, 'rb') for name in filenames]
#etc
# option 2: use file descriptors
fds = [os.open(name, os.O_RDONLY) for name in filenames]
#closefd = False means "do not close fd on __del__ or __exit__"
#this is only an option when you pass a fd instead of a file name
streams = [FileIO(fd, 'r', closefd=False) for fd in fds]
test(streams)
streams = []
for fd in fds:
os.lseek(fd, 0, os.SEEK_SET)
streams.append(io.open(fd, 'rb', closefd=False))
# you can also .seek(0) on the BufferedReader objects
# instead of os.lseek on the fds
# option 3: detach
streams = [FileIO(name, 'r') for name in filenames]
test_reset(streams)
# streams[*] should still be in the same state as when you passed it in
person
Francis Avila
schedule
25.12.2011