After this short introduction to ReactiveX and RxPY, the time has come to see some concrete code and write a first example. This first RxPY application is a command line interface (CLI) program that echoes the parameters that are provided as input. Save the following code in a file called echo1.py, or use the echo1.py script from the Git repository of this book:
import sys
from rx import Observable
argv = Observable.from_(sys.argv[1:])
argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))
Ensure that the virtualenv is activated, as shown in the following command:
$ source venv-rx/bin/activate
And when you run it, you should see the following output:
(venv-rx)$ python3 echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
We ran the program with three parameters (hello, world, and !), and it printed these three parameters as well as a notification of the end of the Observable. Let's detail each line of this program. We start by importing the modules that we will use, as in the following code:
import sys
from rx import Observable
The sys module allows us to access the command line arguments. The rx module is the module provided by the RxPY package, which we installed with pip. We do not import the complete rx module, but just the Observable class. In many cases we will only need this class, or a few other ones. Then we can create an observable from the command line arguments, as in the following example:
argv = Observable.from_(sys.argv[1:])
sys.argv is a list containing the command line arguments that were used to run the program. The first element is the name of the script being executed; in this case, its value is echo1.py. Since we do not want to echo this element, we omit it with a slice that keeps the second through the last elements of the list. An observable is created from this list with the from_ creation operator. This operator creates an observable from a Python iterable object, which our argument list is. We assign the reference to this observable to the argv variable. So, argv is a reference to an observable that will emit items containing the arguments provided on the command line, one item per argument. After this assignment the observable is created, but it does not emit any items yet; items are emitted only once the observable is subscribed to. In the last part of the program, we subscribe to this observable and print text depending on the event being received, as can be seen in the following code:
argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))
Three callback arguments are provided to the subscribe method: on_next, on_error, and on_completed. They are all optional, and each one is called when the associated event is received. As already explained, the on_next callback will be called zero or more times, and the on_error and on_completed callbacks can each be called at most once (and never if the observable never ends, which is not the case here). The call to the subscribe method is what makes the argv observable start emitting items. In this simple application, the code of each callback is very simple, so we use lambdas instead of named functions.
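The behavior described so far can be imitated in a few lines of plain Python. The following is only a sketch: SketchObservable, failing_source, and fake_argv are hypothetical names introduced for this illustration, and the class is not the RxPY implementation. It assumes that a sequence ends with either on_completed or on_error, never both, and it shows that nothing is emitted until subscribe is called.

```python
class SketchObservable:
    """Hypothetical, simplified stand-in for an observable (not RxPY code)."""

    def __init__(self, iterable):
        # Creation only stores the source; no item is emitted here.
        self._iterable = iterable

    def subscribe(self, on_next=None, on_error=None, on_completed=None):
        try:
            for item in self._iterable:
                if on_next is not None:
                    on_next(item)
        except Exception as error:
            # In this sketch, any exception raised during the loop ends
            # the sequence with on_error; on_completed is then skipped.
            if on_error is not None:
                on_error(error)
            return
        if on_completed is not None:
            on_completed()


def failing_source():
    """Hypothetical source that emits one item and then fails."""
    yield "hello"
    raise ValueError("boom")


# Stand-in for sys.argv when running: python3 echo1.py hello world !
fake_argv = ["echo1.py", "hello", "world", "!"]

log = []
argv = SketchObservable(fake_argv[1:])  # the slice drops the script name
log.append("created")                   # nothing has been emitted so far

argv.subscribe(
    on_next=lambda i: log.append("on_next: {}".format(i)),
    on_error=lambda e: log.append("on_error: {}".format(e)),
    on_completed=lambda: log.append("on_completed"))

error_log = []
SketchObservable(failing_source()).subscribe(
    on_next=lambda i: error_log.append("on_next: {}".format(i)),
    on_error=lambda e: error_log.append("on_error: {}".format(e)),
    on_completed=lambda: error_log.append("on_completed"))

print(log)
print(error_log)
```

In the first list, the "created" entry comes before any on_next entry, which mirrors the fact that items are emitted only on subscription. The second list contains an on_error entry and no on_completed entry.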
Lambdas are anonymous functions; that is, functions that can be referenced only from where they are defined, because they have no name. However, lambdas are more restricted than regular functions, which makes them suitable only for simple manipulations of data:
- Lambdas can use only expressions, not statements
- Lambdas contain only one expression
- Lambdas cannot contain assignment statements, so they work only with their parameters and with names from the enclosing scope
So, lambdas are very useful when you need to perform a simple action on one or several input parameters. For more complex logic, writing a function is mandatory. Lambdas are used a lot when developing RxPY code because many operators take functions as input. Such operators are therefore functions that accept functions as input parameters. Functions that accept functions as input are called higher-order functions in functional programming, and this is another aspect of functional programming that is used a lot in ReactiveX.
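To make the difference concrete, here is a small plain Python sketch. The names format_item, describe, and apply_to_all are hypothetical and introduced only for this illustration. It shows a lambda and an equivalent named function, a named function whose logic needs statements and a local variable, and a higher-order function that accepts any of them.

```python
# A lambda holds a single expression and returns its value.
format_item = lambda i: "on_next: {}".format(i)


# The same logic written as a named function.
def format_item_func(i):
    return "on_next: {}".format(i)


# This logic needs a local variable and an if statement,
# so it has to be written as a regular function.
def describe(item):
    length = len(item)
    if length == 1:
        return "{} (1 character)".format(item)
    return "{} ({} characters)".format(item, length)


# A higher-order function: it takes another function as a parameter.
def apply_to_all(func, items):
    return [func(item) for item in items]


print(apply_to_all(format_item, ["hello", "world"]))
print(apply_to_all(describe, ["hello", "!"]))
```

The lambda and format_item_func are interchangeable here; the choice between them is mostly a matter of readability.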
As the output of echo1.py shows, the on_next callback is called once for each argument provided on the command line, and the on_completed callback is called right after. In this example application, we use a synchronous Observable. In practice, this means that all items are emitted in the context of the subscribe call. To confirm this, add another print call after the subscribe call:
argv.subscribe(
    on_next=lambda i: print("on_next: {}".format(i)),
    on_error=lambda e: print("on_error: {}".format(e)),
    on_completed=lambda: print("on_completed"))
print("done")
Then run the program again. You should see the following output:
(venv-rx)$ python3 ch1/echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
done
As you can see, the done message is displayed after the observable completes, because the observable emits all its items during the call to subscribe.
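To see why this ordering is a property of synchronous emission, it can help to compare it with a source that emits from another thread. The following plain Python sketch does not use RxPY; subscribe_sync and subscribe_deferred are hypothetical helper names, and the threading.Event is used only to make the ordering of this demonstration deterministic.

```python
import threading


def subscribe_sync(items, on_next, on_completed):
    # All callbacks run before this function returns.
    for item in items:
        on_next(item)
    on_completed()


def subscribe_deferred(items, on_next, on_completed, gate):
    # The callbacks run in a worker thread, after the gate is opened.
    def run():
        gate.wait()
        for item in items:
            on_next(item)
        on_completed()

    worker = threading.Thread(target=run)
    worker.start()
    return worker


sync_log = []
subscribe_sync(["hello", "world"], sync_log.append,
               lambda: sync_log.append("on_completed"))
sync_log.append("done")

deferred_log = []
gate = threading.Event()
worker = subscribe_deferred(["hello", "world"], deferred_log.append,
                            lambda: deferred_log.append("on_completed"), gate)
deferred_log.append("done")  # runs before any item is emitted
gate.set()                   # let the worker thread emit its items
worker.join()

print(sync_log)
print(deferred_log)
```

In the synchronous case, "done" is the last entry of the list, as in echo1.py. In the deferred case, "done" is the first entry, because the call that starts the emission returns before any item is delivered.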
We will now add some functionality to this echo application. Instead of simply printing each argument, we will print them with the first letter in uppercase. This is the typical case of an action that must be applied to each item of an observable. In the current code, there are two possible locations to do it:
- By using an operator on the argv observable
- By modifying the on_next callback in the subscribe call
In a real application, there will usually be only a single place where the action must be applied, depending on whether the action must be done in the observer or on the observable directly. Implementing the action in the observer allows you to isolate the change to this single observer. Implementing the action on the observable, on the other hand, allows you to share this behavior with several observers. Here we will implement the action on the observable with the map operator. Modify the code as in the following example, or use the echo2.py script from the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of the book:
argv = Observable.from_(sys.argv[1:]) \
    .map(lambda i: i.capitalize())
The map operator takes an observable as input, applies a transformation function to each item of this observable, and returns an observable that emits all input items with the transformation applied to them. Here we have used a lambda that returns the item (a string) capitalized; that is, with its first letter in uppercase and the remaining letters in lowercase. If you run this new code you should get the following output:
(venv-rx)$ python3 ch1/echo2.py hello world !
on_next: Hello
on_next: World
on_next: !
on_completed
done
As you can see, the output is the same as before, but with capitalized arguments. Congratulations; you have just written your first reactive application!
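The two possible locations for the capitalization discussed earlier can be compared with a short plain Python sketch. It does not use RxPY: make_source and sketch_map are hypothetical helpers, and observables are modeled here simply as subscribe functions. Under these assumptions, a transformation applied on the source is seen by every observer, while a transformation placed in one callback affects only that observer.

```python
def make_source(items):
    """Hypothetical source: returns a subscribe function for the items."""
    def subscribe(on_next, on_completed):
        for item in items:
            on_next(item)
        on_completed()
    return subscribe


def sketch_map(transform, source):
    """Hypothetical map: wraps a source and transforms each item."""
    def subscribe(on_next, on_completed):
        source(lambda item: on_next(transform(item)), on_completed)
    return subscribe


plain = make_source(["hello", "world", "!"])

# Action on the source: both observers receive capitalized items.
mapped = sketch_map(lambda i: i.capitalize(), plain)
first, second = [], []
mapped(first.append, lambda: first.append("on_completed"))
mapped(second.append, lambda: second.append("on_completed"))

# Action in one observer: only that observer sees capitalized items.
local, untouched = [], []
plain(lambda i: local.append(i.capitalize()),
      lambda: local.append("on_completed"))
plain(untouched.append, lambda: untouched.append("on_completed"))

print(first)
print(second)
print(local)
print(untouched)
```

Note that "!" is left unchanged by str.capitalize, which is why the third item is identical in all four lists.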