RxPY – Connectable Operators

publish

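This operator converts an observable into a connectable observable. A connectable observable does not start emitting when a subscriber subscribes; it starts only when connect() is called, and all subscribers then share the same emissions.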

Syntax

publish(mapper=None)

Parameters

mapper: Optional. A selector function that can use the multicasted source sequence as many times as needed without causing multiple subscriptions to the source.

Example

from rx import create, operators as op
import random

def test_observable(observer, scheduler):
   # Emit a single random value, then complete.
   observer.on_next(random.random())
   observer.on_completed()

# publish() returns a connectable observable; nothing is emitted until connect().
source = create(test_observable).pipe(op.publish())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
# Start the shared subscription so both subscribers receive the same value.
source.connect()

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.1475160727331849
From subscriber 2 - 0.1475160727331849
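To make the publish/connect behaviour more concrete, here is a minimal pure-Python sketch of the idea. The class ConnectableSketch and the function produce are hypothetical names invented for illustration; this is a toy model under simplifying assumptions (no completion, errors, or disposal), not how RxPY implements publish().

```python
import random


class ConnectableSketch:
    """Toy stand-in for a connectable observable (hypothetical, not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer   # callable that pushes values into an emit callback
        self._subscribers = []      # callbacks registered so far
        self.runs = 0               # how many times the producer has been executed

    def subscribe(self, on_next):
        # Subscribing only registers the callback; nothing is produced yet.
        self._subscribers.append(on_next)

    def connect(self):
        # Run the producer once and fan every value out to all subscribers.
        self.runs += 1

        def emit(value):
            for callback in list(self._subscribers):
                callback(value)

        self._producer(emit)


def produce(emit):
    # Mirrors the example above: a single random value.
    emit(random.random())


received_1, received_2 = [], []
source = ConnectableSketch(produce)
source.subscribe(received_1.append)
source.subscribe(received_2.append)

# Snapshot taken before connect(): both lists are still empty.
before_connect = (list(received_1), list(received_2))

source.connect()
print(before_connect)            # ([], [])
print(received_1 == received_2)  # True
print(source.runs)               # 1
```

The producer runs once, so both subscribers see the same random number, which matches the identical values in the output above.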

ref_count

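This operator makes a connectable observable behave like an ordinary observable: it connects automatically when the first subscriber subscribes and disconnects when the last subscriber unsubscribes. Because the connection is made on the first subscription, a source that emits synchronously has already delivered its value by the time a second subscriber arrives, which is why only subscriber 1 appears in the output below.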

Syntax

ref_count()

Example

from rx import create, operators as op
import random
def test_observable(observer, scheduler):
   observer.on_next(random.random())
source = create(test_observable).pipe(op.publish(),op.ref_count())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.8230640432381131
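The reason only the first subscriber prints a value can be illustrated with a small pure-Python model of reference counting. RefCountSketch and produce are hypothetical names used only for this sketch; it assumes a synchronous producer and is not RxPY's actual implementation.

```python
class RefCountSketch:
    """Toy model of publish() followed by ref_count() (hypothetical, not RxPY code)."""

    def __init__(self, producer):
        self._producer = producer
        self._subscribers = []
        self.connected = False

    def subscribe(self, on_next):
        self._subscribers.append(on_next)
        if not self.connected:
            # First subscriber: connect immediately and run the producer.
            self.connected = True
            self._producer(self._emit)

        def dispose():
            # Drop this subscriber; disconnect once nobody is left.
            self._subscribers.remove(on_next)
            if not self._subscribers:
                self.connected = False

        return dispose

    def _emit(self, value):
        for callback in list(self._subscribers):
            callback(value)


def produce(emit):
    # Synchronous source: the value is emitted during the first subscribe call.
    emit(42)


first, second = [], []
shared = RefCountSketch(produce)
dispose_first = shared.subscribe(first.append)    # triggers the connection
dispose_second = shared.subscribe(second.append)  # arrives after the value was sent

print(first)   # [42]
print(second)  # []

dispose_first()
dispose_second()
print(shared.connected)  # False
```

With an asynchronous or long-running source, later subscribers would still receive whatever is emitted after they subscribe; they only miss what was sent earlier.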

replay

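This operator works like a ReplaySubject. It returns a connectable observable that buffers the values the source emits and replays them to any subscriber that subscribes later, so a late subscriber receives the same values even though the source has already emitted them.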

Syntax

replay()

Example

from rx import create, operators as op
import random
from threading import Timer
def test_observable(observer, scheduler):
   observer.on_next(random.random())
   observer.on_completed()
source = create(test_observable).pipe(op.replay())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()
print("subscriber called after delay ")
def last_subscriber():
   test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))
t = Timer(5.0, last_subscriber)
t.start()

Output

E:\pyrx>python testrx.py
From subscriber 1 - 0.8340998157725388
From subscriber 2 - 0.8340998157725388
subscriber called after delay
From subscriber 3 - 0.8340998157725388
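The replay behaviour seen with subscriber 3 can be sketched in plain Python as a buffer that is played back on subscription. ReplaySketch, produce, and the buffer_size argument of this class are hypothetical names chosen for illustration; the sketch ignores completion, errors, and time windows, and is not RxPY's implementation.

```python
from collections import deque


class ReplaySketch:
    """Toy model of a replaying connectable observable (hypothetical, not RxPY code)."""

    def __init__(self, producer, buffer_size=None):
        self._producer = producer
        # deque(maxlen=None) is unbounded; otherwise it keeps only the newest items.
        self._buffer = deque(maxlen=buffer_size)
        self._subscribers = []

    def subscribe(self, on_next):
        # Replay everything still in the buffer, then listen for new values.
        for value in list(self._buffer):
            on_next(value)
        self._subscribers.append(on_next)

    def connect(self):
        self._producer(self._emit)

    def _emit(self, value):
        self._buffer.append(value)
        for callback in list(self._subscribers):
            callback(value)


def produce(emit):
    for value in (1, 2, 3):
        emit(value)


early, late = [], []
source = ReplaySketch(produce, buffer_size=2)
source.subscribe(early.append)
source.connect()
source.subscribe(late.append)   # subscribes after all values were emitted

print(early)  # [1, 2, 3]
print(late)   # [2, 3]
```

The early subscriber sees every value live, while the late subscriber receives only what the buffer still holds, here the last two values.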
