RxPY – Operators
RxPY – Operators
This chapter explains the operators in RxPY in detail. These operators include
- Using Operators
- Mathematical Operators
- Transformation Operators
- Filter Operators
- Error Handling Operators
- Utility Operators
- Conditional Operators
- Creation Operators
- Connectable Operators
- Combining Operators
Reactive (Rx) Python has many operators that make your life as a Python programmer much easier. You can use multiple operators simultaneously. For example, when working with strings, you can use the map, filter, and merge operators.
Using Operators
You can use multiple operators together using the pipe() method. This method allows you to chain multiple operators together.
Below is an example of using operators.
test = of(1,2,3) // an observable
subscriber = test.pipe(
op1(),
op2(),
op3()
)
In the example above, we use the of() method to create an observable that receives the values 1, 2, and 3. Now, you can use any number of operators on this observable, as shown above, to perform different operations using the pipe() method. The operators will be executed sequentially on the given observable.
To use the operator, first import it, as shown below.
from rx import of, operators as op
Here’s a working example –
testrx.py
from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
op.filter(lambda s: s%2==0),
op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))
In the above example, we have a list of numbers. We use the filter operator to filter out even numbers, and then use the reduce operator to add them.
Output
E:pyrx>python testrx.py
Sum of even numbers is 30
Below is a list of the operators we will discuss.
- Creating Observables
- Mathematical Operators
- Transformation Operators
- Filter Operators
- Error Handling Operators
- Utility Operators
- Conditional
- Connectable
- Combination Operators
Creating Observables
Below are the observables we will discuss in the Create category.
Observables | Description |
---|---|
create | This method is used to create an observable. |
empty | This observable will not output anything and will immediately emit the complete state. |
never | This method creates an observable that will never reach a complete state. |
throw | This method creates an observable that will throw an error. |
from_ | This method converts the given array or object into an observable. |
interval | This method generates a series of values after a timeout. |
just | This method converts a given value into an Observable. |
range | This method generates a range of integers based on the given input value. |
repeat_value | This method creates an Observable that repeats the given value with a given count. |
start | This method takes a function as input and returns an Observable that returns the value from the function. |
timer | This method emits values sequentially after a timeout. |
Mathematical Operators
The operators we will discuss in the mathematical operators category are as follows:
Operator | Description |
---|---|
average | This operator calculates the average from the given source observations and outputs a single observation with the average. |
concat | This operator takes two or more observations and produces a single observation with all the values in the sequence. |
count | This operator takes an Observable with values and transforms it into an Observable with a single value. The count function accepts a predicate function as an optional parameter. This function is of Boolean type and adds a value to the output only if the condition is met. |
max | This operator returns an observation variable with the maximum value from the source observation variable. |
min | This operator returns an observation variable with the minimum value from the source observation variable. |
reduce | This operator takes a function, called an accumulator, which is applied to the values from the source observer and returns the accumulated value as an observer, with an optional seed value passed to the accumulator function. |
sum | This operator returns an observation variable with the sum of all the values from the source observation variable. |
Transformation Operators
The operators we will discuss in the Transformation Operators category are as follows –
Operator | Category |
---|---|
buffer | This operator collects all values from a source observable and emits them at regular intervals once a given boundary condition is met. |
ground_by | This operator groups values from a source observable according to a given key_mapper function. |
map | This operator transforms each value from a source observable into a new value based on the output of a given mapper_func. |
scan | This operator applies an accumulator function to the values from the source observer and returns an observer with the new values. |
Filter Operators
The following categories of filter operators are discussed:
Operator | Category |
---|---|
debounce | This operator returns the values of the source observer up to a given time range, ignoring the rest of the time. |
distinct | This operator returns all values that are distinct from the source observer. |
element_at | This operator returns an element from the source observable at the given index. |
filter | This operator filters values from the source observable according to the given predicate function. |
first | This operator returns the first element from the source observable. |
ignore_elements | This operator ignores all values from the source observable and only calls the completion or error callback. |
last | This operator returns the last element from the source observable. |
skip | This operator takes an observable that skips the first occurrence of a counted item. |
skip_last | This operator returns an observable that skips the last occurrence of a counted item. |
take | This operator takes a list of source values in consecutive order, given a counted number. |
take_last | This operator takes a list of source values in consecutive order, given a counted number. |
Error Handling Operators
The operators we will discuss in the Error Handling Operators category are: –
Operator | Description |
---|---|
catch | This operator terminates the source observer when an exception occurs. |
retry | This operator retries on the source observer when an error occurs and terminates once the number of retries has been completed. |
Utility Operators
The following operators are from the Utility Operators category that we will discuss.
Operator | Description |
---|---|
delay | This operator delays the emission of a source observable by a given time or date. |
materialize | This operator transforms the value of a source observable into a value emitted as a specific notification value. |
time_interval | This operator returns the elapsed time between values from a source observable. |
timeout | This operator returns all values from a source observable after the elapsed time, otherwise an error is raised. |
timestamp | This operator will append a timestamp to all values of the source observable. |
Conditional and Boolean Operators
The operators we will discuss in the Conditional and Boolean Operators category are as follows –
Operator | Description |
---|---|
all | This operator will check if all values of the source observable satisfy the given condition. |
contains | If the given value exists and is a value of the source observable, this operator will return an observer with the value true or false. |
default_if_empty | If the source observer is empty, this operator returns a default value. |
sequence_equal | This operator compares two sequences of observations or arrays of values and returns an observation that evaluates to true or false. |
skip_until | This operator discards the value of the source observer until the second observer emits a value. |
skip_while | This operator returns an observer from the source observer with the value that satisfies the passed condition. |
take_until | This operator discards the value from the source observer after the second observer emits a value or terminates. |
take_while | This operator discards the value from the source observer if the condition fails. |
Connectable Operators
The operator in the connectable operator category we will discuss is −
Operator | Description |
---|---|
publish | This method converts an observable into a connectable observable. |
ref_count | This operator will make the observer a normal observer. |
replay | This method works similarly to replaySubject. This method will return the same value even if the observer has already emitted and some subscribers have not subscribed yet. |
Combination Operators
The following operators are discussed in the Combination Operators category.
Operator | Description |
---|---|
combine_latest | This operator creates a tuple of the observation variables it takes as input. |
merge | This operator merges the given observables. |
start_with | This operator takes a given value and appends it to the beginning of the source observable, returning the complete sequence. |
zip | This operator returns an observable with a tuple of values, where the tuple is formed from the first value of the given observable, and so on. |