RxPY – Working with Subjects
A subject is both an observable sequence and an observer, and it can multicast, that is, deliver the same values to many subscribed observers.
We will discuss the following topics regarding subjects:
- Creating a Subject
- Subscribing to a Subject
- Passing Data to a Subject
- Behavior Subject
- Replay Subject
- Async Subject
Creating a Subject
To work with a subject, we first need to import the Subject class, as shown below.
from rx.subject import Subject
You can create a subject as follows:
subject_test = Subject()
This object is an observer and has three methods:
- on_next(value)
- on_error(error)
- on_completed()
Subscribing to a Subject
You can create multiple subscriptions on a subject, as shown below.
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
Passing Data to a Subject
You can use the on_next(value) method to pass data to the subject you created, as shown below.
subject_test.on_next("A")
subject_test.on_next("B")
Each value is delivered to every subscription currently attached to the subject.
Here is a working example of a subject.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
The subject_test object is created by calling Subject(). It provides the on_next(value), on_error(error), and on_completed() methods. The output of the above example is shown below.
Output
E:pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
We can use the on_completed() method to stop the subject from emitting further values, as shown below.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
    lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
Once on_completed() has been called, later calls to on_next() are ignored, so the value B is never delivered.
Output
E:pyrx>python testrx.py
The value is A
The value is A
Now let’s see how to call the on_error(error) method.
Example
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))
Output
E:pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!
BehaviorSubject
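A BehaviorSubject is created with an initial value and remembers the most recent value passed to it. Every new subscriber immediately receives that latest value, followed by all later ones. You can create a behavior subject, as shown below.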
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject")  # initial value: "Testing Behaviour Subject"
Here is a working example using a behavior subject.
Example
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject")
behavior_subject.subscribe(
    lambda x: print("Observer A: {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
    lambda x: print("Observer B: {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
Output
E:pyrx>python testrx.py
Observer A: Testing Behaviour Subject
Observer A: Hello
Observer B: Hello
Observer A: Last call to Behaviour Subject
Observer B: Last call to Behaviour Subject
ReplaySubject
A ReplaySubject is similar to a behavior subject, except that it can buffer several values and replay them to new subscribers. Here is a working example of a replay subject.
Example
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)))
replay_subject.on_next(5)
The buffer size passed to the replay subject is 2. Therefore, the last two values are buffered and replayed to each new subscriber.
Output
E:pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5
AsyncSubject
An AsyncSubject delivers only the last value it received, and it does so only once the on_completed() method is called. A subscriber that arrives after completion still receives that last value.
Example
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, the last value passed to the subject before on_completed() is called is 2, so both subscribers receive 2.
Output
E:pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2