RxPY – Operators

RxPY – Operators

This chapter explains the operators in RxPY in detail. These operators include

  • Using Operators
  • Mathematical Operators
  • Transformation Operators
  • Filter Operators
  • Error Handling Operators
  • Utility Operators
  • Conditional Operators
  • Creation Operators
  • Connectable Operators
  • Combining Operators

Reactive (Rx) Python has many operators that make your life as a Python programmer much easier. You can use multiple operators simultaneously. For example, when working with strings, you can use the map, filter, and merge operators.

Using Operators

You can use multiple operators together using the pipe() method. This method allows you to chain multiple operators together.

Below is an example of using operators.

test = of(1,2,3) // an observable
subscriber = test.pipe(
op1(),
op2(),
op3()
)

In the example above, we use the of() method to create an observable that receives the values 1, 2, and 3. Now, you can use any number of operators on this observable, as shown above, to perform different operations using the pipe() method. The operators will be executed sequentially on the given observable.

To use the operator, first import it, as shown below.

from rx import of, operators as op

Here’s a working example –

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
op.filter(lambda s: s%2==0),
op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

In the above example, we have a list of numbers. We use the filter operator to filter out even numbers, and then use the reduce operator to add them.

Output

E:pyrx>python testrx.py
Sum of even numbers is 30

Below is a list of the operators we will discuss.

  • Creating Observables
  • Mathematical Operators
  • Transformation Operators
  • Filter Operators
  • Error Handling Operators
  • Utility Operators
  • Conditional
  • Connectable
  • Combination Operators

Creating Observables

Below are the observables we will discuss in the Create category.

Observables Description
create This method is used to create an observable.
empty This observable will not output anything and will immediately emit the complete state.
never This method creates an observable that will never reach a complete state.
throw This method creates an observable that will throw an error.
from_ This method converts the given array or object into an observable.
interval This method generates a series of values after a timeout.
just This method converts a given value into an Observable.
range This method generates a range of integers based on the given input value.
repeat_value This method creates an Observable that repeats the given value with a given count.
start This method takes a function as input and returns an Observable that returns the value from the function.
timer This method emits values sequentially after a timeout.

Mathematical Operators

The operators we will discuss in the mathematical operators category are as follows:

Operator Description
average This operator calculates the average from the given source observations and outputs a single observation with the average.
concat This operator takes two or more observations and produces a single observation with all the values in the sequence.
count This operator takes an Observable with values and transforms it into an Observable with a single value. The count function accepts a predicate function as an optional parameter. This function is of Boolean type and adds a value to the output only if the condition is met.
max This operator returns an observation variable with the maximum value from the source observation variable.
min This operator returns an observation variable with the minimum value from the source observation variable.
reduce This operator takes a function, called an accumulator, which is applied to the values from the source observer and returns the accumulated value as an observer, with an optional seed value passed to the accumulator function.
sum This operator returns an observation variable with the sum of all the values from the source observation variable.

Transformation Operators

The operators we will discuss in the Transformation Operators category are as follows –

Operator Category
buffer This operator collects all values from a source observable and emits them at regular intervals once a given boundary condition is met.
ground_by This operator groups values from a source observable according to a given key_mapper function.
map This operator transforms each value from a source observable into a new value based on the output of a given mapper_func.
scan This operator applies an accumulator function to the values from the source observer and returns an observer with the new values.

Filter Operators

The following categories of filter operators are discussed:

Operator Category
debounce This operator returns the values of the source observer up to a given time range, ignoring the rest of the time.
distinct This operator returns all values that are distinct from the source observer.
element_at This operator returns an element from the source observable at the given index.
filter This operator filters values from the source observable according to the given predicate function.
first This operator returns the first element from the source observable.
ignore_elements This operator ignores all values from the source observable and only calls the completion or error callback.
last This operator returns the last element from the source observable.
skip This operator takes an observable that skips the first occurrence of a counted item.
skip_last This operator returns an observable that skips the last occurrence of a counted item.
take This operator takes a list of source values in consecutive order, given a counted number.
take_last This operator takes a list of source values in consecutive order, given a counted number.

Error Handling Operators

The operators we will discuss in the Error Handling Operators category are: –

Operator Description
catch This operator terminates the source observer when an exception occurs.
retry This operator retries on the source observer when an error occurs and terminates once the number of retries has been completed.

Utility Operators

The following operators are from the Utility Operators category that we will discuss.

Operator Description
delay This operator delays the emission of a source observable by a given time or date.
materialize This operator transforms the value of a source observable into a value emitted as a specific notification value.
time_interval This operator returns the elapsed time between values from a source observable.
timeout This operator returns all values from a source observable after the elapsed time, otherwise an error is raised.
timestamp This operator will append a timestamp to all values of the source observable.

Conditional and Boolean Operators

The operators we will discuss in the Conditional and Boolean Operators category are as follows –

Operator Description
all This operator will check if all values of the source observable satisfy the given condition.
contains If the given value exists and is a value of the source observable, this operator will return an observer with the value true or false.
default_if_empty If the source observer is empty, this operator returns a default value.
sequence_equal This operator compares two sequences of observations or arrays of values and returns an observation that evaluates to true or false.
skip_until This operator discards the value of the source observer until the second observer emits a value.
skip_while This operator returns an observer from the source observer with the value that satisfies the passed condition.
take_until This operator discards the value from the source observer after the second observer emits a value or terminates.
take_while This operator discards the value from the source observer if the condition fails.

Connectable Operators

The operator in the connectable operator category we will discuss is −

Operator Description
publish This method converts an observable into a connectable observable.
ref_count This operator will make the observer a normal observer.
replay This method works similarly to replaySubject. This method will return the same value even if the observer has already emitted and some subscribers have not subscribed yet.

Combination Operators

The following operators are discussed in the Combination Operators category.

Operator Description
combine_latest This operator creates a tuple of the observation variables it takes as input.
merge This operator merges the given observables.
start_with This operator takes a given value and appends it to the beginning of the source observable, returning the complete sequence.
zip This operator returns an observable with a tuple of values, where the tuple is formed from the first value of the given observable, and so on.

Leave a Reply

Your email address will not be published. Required fields are marked *