Reactive Programming in Java: How, Why, and Is It Worth Doing? Part II. Implementing and subscribing to an observer

We're continuing our series on Reactive Programming in Java. This time we look at implementing and subscribing to an observer.

Implementing and subscribing to an observer


Java 9 itself provides only the reactive streams specification, as the interfaces in java.util.concurrent.Flow, rather than a full implementation with operators. However, several libraries implement the reactive approach. In this example, we use the RxJava library. We subscribe to a data stream and define several handlers, that is, methods that run at the start of stream processing (onSubscribe), on receiving each message (onNext), when an error occurs (onError), and on completion of the stream (onComplete):

[Image: implementing and subscribing to an observer]
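The four handlers are not unique to RxJava. The JDK's own java.util.concurrent.Flow.Subscriber interface (Java 9 and later) declares the same callbacks, so the idea can be sketched without any external library. The following is only an illustration under stated assumptions, not the RxJava listing from this article: the class names FlowObserverSketch and LengthSubscriber are made up, the handlers record their output in a list instead of printing directly, and the mapping and filtering are done by hand before publishing because the JDK provides no operators. Unlike an RxJava Observer, a Flow subscriber must explicitly request items in onSubscribe.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowObserverSketch {

    // Hypothetical subscriber: records every event so the result can be inspected.
    static class LengthSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("subscribed");
            subscription.request(Long.MAX_VALUE); // ask for every item
        }

        @Override
        public void onNext(Integer value) {
            events.add("Length: " + value);
        }

        @Override
        public void onError(Throwable e) {
            events.add("Error: " + e.getMessage());
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("Done.");
            finished.countDown();
        }
    }

    static List<String> run() {
        LengthSubscriber subscriber = new LengthSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String city : List.of("Minsk", "Krakow", "Moscow", "Kiev", "Sofia")) {
                int length = city.length();   // the "map" step, done by hand
                if (length >= 5) {            // the "filter" step, done by hand
                    publisher.submit(length);
                }
            }
        } // closing the publisher signals onComplete once pending items are delivered

        try {
            // Handlers run on another thread, so wait until the stream has ended.
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints the subscription event, one "Length: ..." line per value that passed the filter, and finally "Done.".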


Look at the last line:

locations.map(String::length).filter(l -> l >= 5).subscribe(observer);

We use the map and filter operators. If you have worked with Java 8 streams, you already know map and filter, and here they work the same way. The difference is that in reactive programming the values may arrive gradually. Every time a new value arrives, it passes through all the transformations. Thus, String::length replaces each string with its length.
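Because map and filter behave like their Java 8 stream counterparts, the result of the pipeline can be checked with an ordinary finite stream. This is a minimal sketch that uses the five city names from this example; the class and method names (LengthFilterCheck, lengthsAtLeastFive) are chosen only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LengthFilterCheck {

    // Same two steps as the reactive pipeline, applied to a finite Java 8 stream.
    static List<Integer> lengthsAtLeastFive() {
        return Stream.of("Minsk", "Krakow", "Moscow", "Kiev", "Sofia")
                .map(String::length)      // each string becomes its length
                .filter(l -> l >= 5)      // keep lengths of 5 or more
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengthsAtLeastFive()); // prints [5, 6, 6, 5]
    }
}
```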
In this case, we get 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). The filter keeps only the lengths that are 5 or greater, so 4 (Kiev) is dropped. We then subscribe to the resulting stream, and the observer is called to react to each value in it. For each value, it prints the length:

@Override
public void onNext(Integer value) {
    System.out.println("Length: " + value);
}

First "Length: 5" appears, then "Length: 6", and so on. Once the stream is complete, onComplete is called and "Done." is printed at the end:

@Override
public void onComplete() {
    System.out.println("Done.");
}

Not every stream completes, but some do. For example, if we are reading from a file, the stream completes when the file ends.

If an error occurs, we can react to it:

@Override
public void onError(Throwable e) {
    e.printStackTrace();
}

So we can react to different events in different ways: to each new value, to stream completion, and to errors.
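The error path can be exercised in the same spirit. The sketch below again uses only the JDK's java.util.concurrent.Flow API rather than RxJava, and the class name ErrorPathSketch and the message "source failed" are invented for illustration. It publishes one value, waits until the subscriber has handled it, and then closes the publisher with an exception, so the stream ends through onError and onComplete is never called.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class ErrorPathSketch {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch firstItem = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer value) {
                events.add("Length: " + value);
                firstItem.countDown();
            }

            @Override
            public void onError(Throwable e) {
                events.add("Error: " + e.getMessage());
                finished.countDown();
            }

            @Override
            public void onComplete() {
                events.add("Done.");
                finished.countDown();
            }
        });

        try {
            publisher.submit(5);
            // Wait until the value has been handled before failing the stream.
            firstItem.await(5, TimeUnit.SECONDS);
            // Simulated failure (hypothetical): ends the stream via onError.
            publisher.closeExceptionally(new IllegalStateException("source failed"));
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded events are the single value followed by the error, with no "Done." entry, which shows that a stream ends either by completing or by failing, not both.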

The original article can be found here.

Interested in learning how to program with Java or in upgrading your Java programming skills? Check out our trainings.

