Кэширование сетевого потока в golang?

Я пытаюсь написать кеширующий прокси для видеопотоков в golang.

Мой вопрос: как распределить потоковую копию больших кусков данных между несколькими соединениями?

Или как хранить (кешировать) и безопасно (и быстро) получать доступ к данным из нескольких горутин?

Я пробовал несколько вариантов с мьютексами и каналами, но они не работали. Вот несколько примеров, которые работали с ошибками.

Это упрощенная версия:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        conn: conn,
    }
    clients = append(clients, client)
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            wn, e := client.conn.Write(buf[0:n])
            // blocks here for all clients if one of clients stops reading
        }
    }
}

Проблема этой версии в том, что когда один клиент перестает читать, но не закрывает соединение, вызов Write() начинает блокироваться. Обертывание вызова Write() в горутине (с блокировками мьютекса на клиенте) не помогло - там такая же задержка, как и с каналами (следующий пример), кроме того, go не гарантирует порядок выполнения горутин.

Я пытался исправить это так:

        for _, client := range clients {
            client.conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond))
            wn, e := client.conn.Write(buf[0:n])
        }

Помогает с блокировкой, но медленные клиенты не успевают прочитать, увеличивая таймаут - возвращают задержки.

Я также пробовал что-то вроде этого:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        buf_chan: make(chan []byte, 100),
    }
    clients = append(clients, client)
    for {
        buf <- client.buf_chan
        n, e := client.conn.Write(buf)
    }
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            client.buf_chan <- buf[0:n]
        }
    }
}

Но в этой версии - есть некоторая задержка между отправкой на канал и получением на другом конце, поэтому видеопоток в плеере начинает тормозить, лагать.

Может быть, посоветуете какие-нибудь пакеты в ходу или шаблоны проектирования для такого рода задач?

Спасибо за любую помощь!


person user1579228    schedule 19.12.2014    source источник


Ответы (1)


В канальной версии медленный клиент также может увеличить задержку. Поскольку медленный клиент может заполнить свой buf_chan, запись в его buf_chan будет заблокирована. Wrapper select может избежать этого:

select {
case client.buf_chan <- buf[0:n]:
default:
//handle slow client ...    
}
person qianyong    schedule 21.12.2014
comment
спасибо, я следил за размером буфера канала buf_chan, после чтения он всегда равен 1 (0), я сделал его буферизованным только для проверки, я думаю, что проблема с задержкой где-то еще - person user1579228; 21.12.2014