Кеширане на мрежов поток в golang?

Опитвам се да напиша кеширащ прокси за видео потоци в golang.

Въпросът ми е как да разпределя поточно копие на големи парчета данни между множество връзки?

Или как да съхранявате (кеширате) и безопасно (и бързо) да осъществявате достъп до данни от множество goroutines?

Пробвах няколко варианта, с мютексове и канали, но не се получиха. Ето няколко примера, които работят с грешки.

Това е опростена версия:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        conn: conn,
    }
    clients = append(clients, client)
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            wn, e := client.conn.Write(buf[0:n])
            // blocks here for all clients if one of clients stops reading
        }
    }
}

Проблемът с тази версия е, че когато един клиент спре да чете, но не затвори връзката, извикването на Write() започва да блокира. Обвивката на Write() в goroutine (със заключвания на mutex на клиента) не помогна - има същото забавяне като при каналите (следващия пример), освен това go не гарантира реда на изпълнение на goroutines.

Опитах се да го поправя така:

        for _, client := range clients {
            client.conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond))
            wn, e := client.conn.Write(buf[0:n])
        }

Помага при блокирането, но бавните клиенти не могат да четат навреме, увеличавайки времето за изчакване - връща закъснения.

Аз също опитах нещо подобно:

...
var clients []*client
func new_client(conn net.Conn) {
    client := &client{
        buf_chan: make(chan []byte, 100),
    }
    clients = append(clients, client)
    for {
        buf <- client.buf_chan
        n, e := client.conn.Write(buf)
    }
}
...
func stream(source io.Reader) {
    buf := make([]byte, 32*1024)
    for {
        n, _ := source.Read(buf)
        for _, client := range clients {
            client.buf_chan <- buf[0:n]
        }
    }
}

Но в тази версия - има известно забавяне между изпращане към канал и получаване от другия край, така че видеопотокът в плейъра започва да получава забавяния, закъснения.

Може би съвет за някои пакети в go или шаблони за проектиране за този вид задачи?

Благодаря за всяка помощ!


person user1579228    schedule 19.12.2014    source източник


Отговори (1)


Във версията на канала бавният клиент също може да увеличи забавянето. Тъй като бавен клиент може да направи своя buf_chan пълен, тогава писането в неговия buf_chan ще блокира. Wrappper select може да го избегне:

select {
case client.buf_chan <- buf[0:n]:
default:
//handle slow client ...    
}
person qianyong    schedule 21.12.2014
comment
благодаря, наблюдавах размера на буфера на канала buf_chan, винаги е 1 (0) след прочитане, направих го буфериран само за да тествам, мисля, че проблемът със забавянето е някъде другаде - person user1579228; 21.12.2014