RxPY – Transformation Operators

buffer

This operator collects values from the source observable and emits them as a list each time the given boundary observable emits. Any values still buffered when the source completes are emitted as a final list.

Syntax

buffer(boundaries)

Parameters

boundaries: The input observable whose emissions mark where one buffer ends and the next begins.

Return Value

The return value is an observable that emits lists. Each list holds the values collected from the source observable between two consecutive emissions of the boundaries observable.

Example

from rx import of, interval, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# Collect the source values into a list; interval(1.0) supplies the boundaries
sub1 = test.pipe(
   op.buffer(interval(1.0))
)
sub1.subscribe(lambda x: print("The elements are {0}".format(x)))

Output

E:\pyrx>python test1.py
The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

group_by

This operator groups the values from the source observable according to the given key_mapper function.

Syntax

group_by(key_mapper)

Parameters

key_mapper: A function that extracts the grouping key from each value of the source observable.

Return Value

It returns an observable that emits one GroupedObservable per distinct key; each GroupedObservable carries the source values for which key_mapper returned that key.

Example

from rx import from_, operators as op
test = from_(["A", "B", "C", "D"])
sub1 = test.pipe(
   op.group_by(lambda v: v[0])
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Output

E:\pyrx>python testrx.py
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E65C0>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6588>
The element is <rx.core.observable.groupedobservable.GroupedObservable object at 0x000000C99A2E6550>

map

This operator transforms each value of the source observable into a new value based on the output of the given mapper_func.

Syntax

map(mapper_func=None)

Parameters

mapper_func: (Optional) A function applied to each value of the source observable; its return value is emitted in place of the original value.

Example

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# Square every value emitted by the source
sub1 = test.pipe(
   op.map(lambda x: x * x)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Output

E:\pyrx>python testrx.py
The element is 1
The element is 4
The element is 9
The element is 16
The element is 25
The element is 36
The element is 49
The element is 64
The element is 81
The element is 100

scan

This operator applies an accumulator function to the values from the source observable and returns an observable that emits each intermediate result.

Syntax

scan(accumulator_func, seed=NotSet)

Parameters

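accumulator_func: The function applied to each value of the source observable. It receives the accumulated value so far and the current value, and returns the new accumulated value.

seed: (Optional) The initial accumulated value passed to accumulator_func.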

Return Value

This operator returns an observable that emits one accumulated value for each value of the source observable.

Example

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# Running sum, starting from a seed of 0
sub1 = test.pipe(
   op.scan(lambda acc, a: acc + a, 0)
)
sub1.subscribe(lambda x: print("The element is {0}".format(x)))

Output

E:\pyrx>python testrx.py
The element is 1
The element is 3
The element is 6
The element is 10
The element is 15
The element is 21
The element is 28
The element is 36
The element is 45
The element is 55
