RxPY – Creating Observables

RxPY – Creating Observables

create

This method is used to create an Observable object. It will have one observer method, namely:

  • on_next() – This function is called when the Observable emits an item.
  • on_completed() – This function is called when the Observable completes.

  • on_error() – This function is called when an error occurs on the Observable.

Here is a working example –

testrx.py

from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occurred")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)

Here is the created observable’s output

E:pyrx>python testrx.py
Got - Hello
Job Done!

empty

This observable does not output anything, but instead emits the complete state directly.

Syntax

empty()

Return Value

This returns an observable with no elements.

Example

from rx import empty
test = empty()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
Job Done!

never

This method creates an observable that never completes.

Syntax

never()

Return Value

It returns an observable that never completes.

Example

from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)

Output

It does not show any output.

throw

This method creates an observable that throws errors.

Syntax

throw(exception)

Parameters

exception: An object with details about the error.

Return Value

Returns an observable with error details.

Example

from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
Error: There is an Error!

from_

This method converts the given array or object into an observable.

Syntax

from_(iterator)

Parameters

Iterator: This is an object or array.

Return Value

This returns the observable data for the given iterator.

Example

from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!

interval

This method generates a series of values after a timeout.

Syntax

interval(period)

Parameters

period: The starting integer sequence.

Return Value

It returns an observable containing the sequence of all values.

Example

import rx
from rx import operators as ops
rx.interval(1).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exitn")

Output

E:pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400

just

This method converts the given value into an Observable.

Syntax

just(value)

Parameters

value: The value to be converted into an Observable.

Return Value

It returns an Observable with the given value.

Example

from rx import just
test = just([15, 25, 50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!

range

This method returns a range of integers based on the given input.

Syntax

range(start, stop=None)

Parameters

start: The first value at which the range will begin.

stop: Optional, the last value at which the range will stop.

Return Value

This returns an observable whose values are integers based on the given input.

Example

from rx import range
test = range(0,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!

repeat_value

This method creates an observation variable that repeats the given value at the given count.

Syntax

repeat_value(value=None, repeat_count=None)

Parameters

value: Optional. The value to repeat.

repeat_count: Optional. The number of times to repeat the given value.

Return Value

Returns an observation variable that repeats the given value at the given count.

Example

from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!

start

This method takes a function as input and returns an observable that will hold the return value of the input function.

Syntax

start(func)

Parameters

func: A function to be called.

Return Value

It returns an observable that will hold the return value of the input function.

Example

from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)

Output

E:pyrx>python testrx.py
The value is Hello World
Job Done!

timer

This method will emit values sequentially after the timeout expires.

Syntax

timer(duetime)

Parameters

duetime: The time after which the first value should be emitted.

Return Value

Returns an observable containing the value emitted after duetime.

Example

import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exitn")

Output

E:pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64

Leave a Reply

Your email address will not be published. Required fields are marked *