RxPY - 转换运算符
-
简述
-
缓冲
该操作符将从可观察源收集所有值,并在满足给定边界条件后定期发出它们。句法
buffer(boundaries)
参数
boundaries:输入是可观察的,它将决定何时停止,以便发出收集的值。返回值
返回值是可观察的,它将具有从源可观察对象收集的所有值,即持续时间由所采用的输入可观察对象决定。例子
from rx import of, interval, operators as op from datetime import date test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.buffer(interval(1.0)) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python test1.py The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
-
ground_by
该运算符将根据给定的 key_mapper 函数对来自源 observable 的值进行分组。句法
group_by(key_mapper)
参数
key_mapper:这个函数将负责从源 observable 中提取键。返回值
它返回一个 observable,其值基于 key_mapper 函数分组。例子
from rx import from_, interval, operators as op test = from_(["A", "B", "C", "D"]) sub1 = test.pipe( op.group_by(lambda v: v[0]) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E65C0> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6588> The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550>
-
map
该运算符将根据给定的 mapper_func 的输出将源 observable 中的每个值更改为新值。句法
map(mapper_func:None)
参数
mapper_func:(可选)它将根据来自此函数的输出更改源 observable 的值。例子
from rx import of, interval, operators as op test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.map(lambda x :x*x) ) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is 1 The element is 4 The element is 9 The element is 16 The element is 25 The element is 36 The element is 49 The element is 64 The element is 81 The element is 100
-
scan
该运算符将对来自源 observable 的值应用一个累加器函数,并返回一个带有新值的 observable。句法
scan(accumulator_func, seed=NotSet)
参数
accumulator_func:此函数应用于源 observable 中的所有值。seed:(可选)在 accumular_func 中使用的初始值。返回值
该运算符将返回一个可观察对象,该可观察对象将基于应用于源可观察对象的每个值的累加器函数具有新值。例子
from rx import of, interval, operators as op test = of(1, 2,3,4,5,6,7,8,9,10) sub1 = test.pipe( op.scan(lambda acc, a: acc + a, 0)) sub1.subscribe(lambda x: print("The element is {0}".format(x)))
输出
E:\pyrx>python testrx.py The element is 1 The element is 3 The element is 6 The element is 10 The element is 15 The element is 21 The element is 28 The element is 36 The element is 45 The element is 55