简述
RxPy 的一个重要特性是并发,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和 observe_on() 将与调度程序一起工作,这将决定订阅任务的执行。
这是一个工作示例,显示了对 subscibe_on()、observe_on() 和调度程序的需求。
例子
在上面的例子中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序执行的。第二个任务只有在第一个任务完成后才开始。
输出
RxPy 支持很多 Scheduler,这里我们将使用 ThreadPoolScheduler。ThreadPoolScheduler 主要会尝试管理可用的 CPU 线程。
在我们之前看到的示例中,我们将使用一个多处理模块,该模块将为我们提供 cpu_count。计数将提供给 ThreadPoolScheduler,它将根据可用线程设法使任务并行工作。
这是一个工作示例 -
在上面的例子中,我有 2 个任务,cpu_count 是 4。因为任务是 2,我们可用的线程是 4,所以这两个任务可以并行启动。
输出
如果您看到输出,则这两个任务已并行启动。
现在,考虑一个场景,其中任务超过 CPU 计数,即 CPU 计数为 4,任务为 5。在这种情况下,我们需要检查任务完成后是否有任何线程空闲,因此,它可以是分配给队列中可用的新任务。
为此,我们可以使用 observe_on() 操作符,它会观察调度器是否有空闲线程。这是一个使用 observe_on() 的工作示例
例子
输出
如果您看到输出,则在任务 4 完成的那一刻,线程被分配给下一个任务,即任务 5,并且同样开始执行。