RxPY – Using Observables

RxPY – Using Observables

An observer is a function that creates an observer and attaches it to the source of the expected value, such as a click, a mouse event from a DOM element, etc.

The following topics will be examined in detail in this chapter.

  • Creating Observers
  • Subscribing to and Executing an Observable

Creating Observables

To create an Observable, we will use the create() method and pass it a function with the following items.

  • on_next() – This function is called when the Observable emits an item.
  • on_completed() – This function is called when the Observable completes.

  • on_error() – This function is called when an error occurs on the Observable.

To use the create() method, first import it, as shown below.

from rx import create

Here is a working example that creates an observable:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()
source = create(test_observable).

Subscribing and Executing an Observable

To subscribe to an observable, we use the subscribe() function and pass in the callback functions on_next, on_error, and on_completed.

Here’s a working example –

testrx.py

from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)

The subscribe() method is responsible for executing the observable’s contents. The callback functions on_next, on_error, and on_completed must be passed to the subscribe method. The call to the subscribe method, in turn, executes the test_observable() function.

You don’t have to pass all three callback functions to the subscribe() method. You can pass on_next(), on_error(), and on_completed() as needed.

The lambda function is used for on_next, on_error, and on_completed. It receives an argument and executes the given expression.

Below is the output of the created observable variable –

E:pyrx>python testrx.py
Got - Hello
Job Done!

Leave a Reply

Your email address will not be published. Required fields are marked *