RxPY – Utility Operators
RxPY – Utility Operators
delay
This operator will delay the emission of a source observable by a given time or date.
Syntax
delay(timespan)
Parameters
timespan: This will be the time or date in seconds.
Return Value
It will return an observable with the source value emitted after the timeout.
Example
from rx import of, operators as op
import datetime
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.delay(5.0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exitn")
Output
E:pyrx>python testrx.py
Press any key to exit
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
materialize
This operator transforms the value from the source observable into a value emitted as an explicit notification value.
Syntax
materialize()
Return Value
This returns an observable that emits the value as an explicit notification value.
Example
from rx import of, operators as op
import datetime
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.materialize()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
Output
E:pyrx>python testrx.py
The value is OnNext(1.0)
The value is OnNext(2.0)
The value is OnNext(3.0)
The value is OnNext(4.0)
The value is OnNext(5.0)
The value is OnCompleted()
time_interval
This operator returns the time interval between values emitted from a source observable.
Syntax
time_interval()
Return Value
This operator returns an observable containing the time elapsed between the emission of the source value.
Example
from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
op.time_interval()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
Output
E:pyrx>python testrx.py
The value is TimeInterval(value=1, interval=datetime.timedelta(microseconds=1000
))
The value is TimeInterval(value=2, interval=datetime.timedelta(0))
The value is TimeInterval(value=3, interval=datetime.timedelta(0))
The value is TimeInterval(value=4, interval=datetime.timedelta(microseconds=1000
))
The value is TimeInterval(value=5, interval=datetime.timedelta(0))
The value is TimeInterval(value=6, interval=datetime.timedelta(0))
timeout
This operator will return all the values of the source observable after the time has passed, otherwise an error will be triggered.
Syntax
timeout(duetime)
Parameters
duetime: The time in seconds.
Return Value
It will return an observable with all the values of the source observable.
Example
from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
op.timeout(5.0)
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
Output
E:pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
Timestamp
This operator appends a timestamp to all values of the source observation variable.
Syntax
timestamp()
Return Value
This returns an Observable containing all the values of the source observable and a timestamp.
Example
from rx import of, operators as op
from datetime import date
test = of(1,2,3,4,5,6)
sub1 = test.pipe(
op.timestamp()
)
sub1.subscribe(lambda x: print("The value is {0}".format(x)))
Output
E:pyrx>python testrx.py
The value is Timestamp(value=1, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 667243))
The value is Timestamp(value=2, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 668243))
The value is Timestamp(value=3, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 668243))
The value is Timestamp(value=4, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 668243))
The value is Timestamp(value=5, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 669243))
The value is Timestamp(value=6, timestamp=datetime.datetime(2019, 11, 4, 4, 57,
44, 669243))