RxPY – Using Observables
An observable is a function that creates an observer and attaches it to a source from which values are expected, such as clicks or mouse events from a DOM element.
The following topics will be examined in detail in this chapter.
- Creating Observables
- Subscribing to and Executing an Observable
Creating Observables
To create an observable, use the create() method and pass it a function. That function receives an observer, on which it can call the following methods.
- on_next() – This function is called when the Observable emits an item.
- on_completed() – This function is called when the Observable completes.
- on_error() – This function is called when an error occurs on the Observable.
To use the create() method, first import it, as shown below.
from rx import create
Here is a working example that creates an observable:
testrx.py
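from rx import create

# Called on subscription; "observer" receives the notifications.
def test_observable(observer, scheduler):
    observer.on_next("Hello")
    # An error is a terminal notification for the sequence.
    observer.on_error("Error")
    observer.on_completed()

source = create(test_observable)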
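The example above calls on_error() before on_completed(). Under the Rx contract, on_error and on_completed are both terminal notifications: once either has been sent, the sequence is over and later notifications are dropped. The following is a minimal pure-Python sketch of that rule, not RxPY's actual implementation; the class name GuardedObserver and the log list are hypothetical names used only for illustration.

```python
class GuardedObserver:
    """Hypothetical stand-in: forwards notifications until a terminal one arrives."""

    def __init__(self, on_next, on_error, on_completed):
        self._on_next = on_next
        self._on_error = on_error
        self._on_completed = on_completed
        self._stopped = False  # becomes True after on_error or on_completed

    def on_next(self, value):
        if not self._stopped:
            self._on_next(value)

    def on_error(self, error):
        if not self._stopped:
            self._stopped = True
            self._on_error(error)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True
            self._on_completed()


log = []  # records which callbacks actually ran
observer = GuardedObserver(
    on_next=lambda i: log.append("Got - {0}".format(i)),
    on_error=lambda e: log.append("Error : {0}".format(e)),
    on_completed=lambda: log.append("Job Done!"),
)

# Same call order as in test_observable
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()  # dropped: the sequence already ended with an error

print(log)
```

In this sketch the "Job Done!" callback never runs, because the error has already ended the sequence.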
Subscribing to and Executing an Observable
To subscribe to an observable, we use the subscribe() function and pass in the callback functions on_next, on_error, and on_completed.
Here is a working example:
testrx.py
from rx import create

# Called each time an observer subscribes to the source.
def test_observable(observer, scheduler):
    observer.on_next("Hello")
    observer.on_completed()

source = create(test_observable)

# Subscribing triggers test_observable and routes its notifications to the callbacks.
source.subscribe(
    on_next = lambda i: print("Got - {0}".format(i)),
    on_error = lambda e: print("Error : {0}".format(e)),
    on_completed = lambda: print("Job Done!"),
)
The subscribe() method is responsible for executing the observable. The callback functions on_next, on_error, and on_completed are passed to subscribe(), and the call to subscribe() in turn executes the test_observable() function.
You don't have to pass all three callback functions to the subscribe() method. Pass only the ones you need from on_next, on_error, and on_completed.
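Lambda functions are used here for on_next, on_error, and on_completed. The on_next and on_error lambdas each receive one argument (the emitted item or the error), while the on_completed lambda receives none. Each one simply evaluates the given expression.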
Below is the output of running the script:
E:\pyrx>python testrx.py
Got - Hello
Job Done!