Custom Combine RateLimitedScheduler

Я пытаюсь реализовать планировщик ограничения скорости в сочетании. Документы плохо подходят для Scheduler, поэтому я делаю некоторые предположения о том, как они работают внутри. Я предполагаю, что функция schedule вызывается для каждого события в потоке, но это не то, что я наблюдаю на практике. Я вижу только пару вызовов этой функции в потоке многих других событий.

Вот моя реализация, которая сейчас практически ничего не делает и позволяет всем событиям проходить ...

public class RateLimitedScheduler: Scheduler {
    public typealias SchedulerTimeType = DispatchQueue.SchedulerTimeType
    public typealias SchedulerOptions = DispatchQueue.SchedulerOptions
    
    // MARK: API
    
    public var now: SchedulerTimeType { queue.now }
    public var minimumTolerance: SchedulerTimeType.Stride { queue.minimumTolerance }

    public func schedule(options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
        queue.schedule(after: nextDeadline(), tolerance: .init(floatLiteral: period / 10), options: options, action)
    }

    public func schedule(after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions? = nil, _ action: @escaping () -> Void) {
        let deadline = max(nextDeadline(), date)
        queue.schedule(after: deadline, tolerance: tolerance, options: options, action)
    }

    public func schedule(after date: SchedulerTimeType,
                  interval: SchedulerTimeType.Stride,
                  tolerance: SchedulerTimeType.Stride,
                  options: SchedulerOptions? = nil,
                  _ action: @escaping () -> Void) -> Cancellable {
        let deadline = max(nextDeadline(), date)
        return queue.schedule(after: deadline, interval: interval, tolerance: tolerance, options: options, action)
    }
    
    // MARK: Initialization
    
    public init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        self.maxEvents = maxEvents
        self.period = period
        self.queue = queue
    }
    
    // MARK: Private
    
    private let maxEvents: Int
    private let period: TimeInterval
    private let queue: DispatchQueue

    private var eventCount = 0
    private var windowStartTime = DispatchTime.now()

    private func nextDeadline() -> SchedulerTimeType {
        let now = DispatchTime.now()

        if eventCount < maxEvents {
            eventCount += 1
            return SchedulerTimeType(max(windowStartTime, now))
        }

        let nextStartTime = windowStartTime + period
        eventCount = 1
        
        if now > nextStartTime {
            windowStartTime = now
            return SchedulerTimeType(max(windowStartTime, now))
        }
        
        windowStartTime = nextStartTime
            
        return SchedulerTimeType(max(nextStartTime, now))
    }
}

И для проверки:

let cancellable = Array(repeating: 0, count: 20).publisher
    .map { $0 * 1 }
    .subscribe(on: RateLimitedScheduler(maxEvents: 5, period: 1))
    .sink(receiveValue: { _ in
        print(Date())
    })

Мой алгоритм для nextDeadline определенно работает, поскольку я тестировал его вне планировщика, так что проблема не в этом.

Итак, мои вопросы:

  1. Верно ли мое предположение, что каждое событие должно запускать функцию расписания в планировщике? И если нет, то почему?
  2. Может ли кто-нибудь помочь разобраться в этом или предоставить мне альтернативу?

person doovers    schedule 29.01.2021    source источник
comment
Я не реализовал Scheduler, поэтому не могу вам помочь, но посмотрите, поможет ли это создать ограничитель скорости: stackoverflow.com/ a / 62542206/968155   -  person New Dev    schedule 29.01.2021
comment
Чего вы пытаетесь достичь с помощью этого планировщика? Я спрашиваю, потому что этого можно было бы достичь без настраиваемого планировщика.   -  person LuLuGaGa    schedule 29.01.2021
comment
@LuLuGaGa Я пытаюсь ограничить количество запросов до 5. в секунду, например, в объединенном потоке   -  person doovers    schedule 29.01.2021
comment
@NewDev Хороший трюк! Спасибо.   -  person doovers    schedule 29.01.2021
comment
Мне все еще любопытно понять, почему планировщик не работает, хотя   -  person doovers    schedule 29.01.2021
comment
.debounce(for: 0.2, scheduler: RunLoop.main) или подобное не решит вашу проблему?   -  person LuLuGaGa    schedule 29.01.2021
comment
@LuLuGaGa, если я не ошибаюсь, debounce сбрасывает значения, а не буферизует и излучает с постоянной скоростью   -  person New Dev    schedule 29.01.2021
comment
@doovers, не стесняйтесь проголосовать за мой другой ответ;)   -  person New Dev    schedule 29.01.2021
comment
Из документации: используйте оператор debounce (for: scheduler: options :), чтобы контролировать количество значений и время между доставкой значений от вышестоящего издателя. Этот оператор полезен для обработки пакетных или больших потоков событий, когда вам нужно уменьшить количество значений, доставляемых в нисходящий поток, до указанной вами скорости.   -  person LuLuGaGa    schedule 29.01.2021
comment
@NewDev Мой плохой забыл проголосовать за !!   -  person doovers    schedule 29.01.2021
comment
@LuLuGaGa Да, я могу подтвердить debounce, и дроссельная заслонка снизит значения. Кроме того, идея состоит в том, чтобы разрешить прохождение всех элементов для определенного окна, а затем отложить до следующего окна.   -  person doovers    schedule 29.01.2021