RxPY – Connectable Difference Operator
RxPY – Connectable Difference Operator
publish
This method converts an observer into a connectable observer.
Syntax
publish(mapper=None)
Parameters
mapper: Optional. A function that multicasts the source value multiple times, without requiring multiple subscriptions.
Example
from rx import create, range, operators as op
import random
def test_observable(observer, scheduler):
observer.on_next(random.random())
observer.on_completed()
source = create(test_observable).pipe(op.publish())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 –
{0}".format(i)))
source.connect()
Output
E:pyrx>python testrx.py
From subscriber 1 - 0.14751607273318490
From subscriber 2 - 0.1475160727331849
ref_count
This operator will make the observer a normal observer.
Grammar
ref_count()
Example
from rx import create, operators as op
import random
def test_observable(observer, scheduler):
observer.on_next(random.random())
source = create(test_observable).pipe(op.publish(),op.ref_count())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
Output
E:pyrx>python testrx.py
From subscriber 1 - 0.8230640432381131
replay
This method works similarly to replaySubject. It will return the same value even if the observer has already emitted and some subscribers have not subscribed yet.
Grammar
replay()
Example
from rx import create, range, operators as op
import random
from threading import Timer
def test_observable(observer, scheduler):
observer.on_next(random.random())
observer.on_completed()
source = create(test_observable).pipe(op.replay())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()
print("subscriber called after delay ")
def last_subscriber():
test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))
t = Timer(5.0, last_subscriber)
t.start()
Output
E:pyrx>python testrx.py
From subscriber 1 - 0.8340998157725388
From subscriber 2 - 0.8340998157725388
subscriber called after delay
From subscriber 3 - 0.8340998157725388