Опитвам се да напиша кеширащ прокси за видео потоци в golang.
Въпросът ми е как да разпределя поточно копие на големи парчета данни между множество връзки?
Или как да съхранявате (кеширате) и безопасно (и бързо) да осъществявате достъп до данни от множество goroutines?
Пробвах няколко варианта, с мютексове и канали, но не се получиха. Ето няколко примера, които работят с грешки.
Това е опростена версия:
...
var clients []*client
func new_client(conn net.Conn) {
client := &client{
conn: conn,
}
clients = append(clients, client)
}
...
func stream(source io.Reader) {
buf := make([]byte, 32*1024)
for {
n, _ := source.Read(buf)
for _, client := range clients {
wn, e := client.conn.Write(buf[0:n])
// blocks here for all clients if one of clients stops reading
}
}
}
Проблемът с тази версия е, че когато един клиент спре да чете, но не затвори връзката, извикването на Write() започва да блокира. Обвивката на Write() в goroutine (със заключвания на mutex на клиента) не помогна - има същото забавяне като при каналите (следващия пример), освен това go не гарантира реда на изпълнение на goroutines.
Опитах се да го поправя така:
for _, client := range clients {
client.conn.SetWriteDeadline(time.Now().Add(1 * time.Millisecond))
wn, e := client.conn.Write(buf[0:n])
}
Помага при блокирането, но бавните клиенти не могат да четат навреме, увеличавайки времето за изчакване - връща закъснения.
Аз също опитах нещо подобно:
...
var clients []*client
func new_client(conn net.Conn) {
client := &client{
buf_chan: make(chan []byte, 100),
}
clients = append(clients, client)
for {
buf <- client.buf_chan
n, e := client.conn.Write(buf)
}
}
...
func stream(source io.Reader) {
buf := make([]byte, 32*1024)
for {
n, _ := source.Read(buf)
for _, client := range clients {
client.buf_chan <- buf[0:n]
}
}
}
Но в тази версия - има известно забавяне между изпращане към канал и получаване от другия край, така че видеопотокът в плейъра започва да получава забавяния, закъснения.
Може би съвет за някои пакети в go или шаблони за проектиране за този вид задачи?
Благодаря за всяка помощ!