Reactive programming with RxJava

ReactiveX is one of the most established frameworks for reactive programming, and RxJava is its Java-based implementation. Let's see what we can do with RxJava.


Reactive programming takes the functional paradigm and layers on sophisticated programming-in-the-large capabilities. These capabilities allow for using functional-like semantics in application architectures. ReactiveX is one of the strongest projects in the reactive world, providing a set of common specifications for language implementers. This article is a hands-on exploration of RxJava, the Java implementation of ReactiveX.

Getting started with RxJava

To test out RxJava, we’ll write a command-line application that watches the public event stream developed by CoinCap. This event stream provides a WebSocket API, which is like a firehose of JSON-formatted events for every transaction on a wide range of crypto exchanges. We’ll begin by simply grabbing these events and printing them to the console. Then we'll add some more sophisticated handling to show off RxJava’s capabilities.

Listing 1 gets us started with the Maven quickstart archetype, which provides the scaffolding for our demo application.

Listing 1. The Maven quickstart


mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart

Now we have a simple project scaffold stored in the new rxjava/ directory. We can modify the pom.xml to include the dependencies we need. We also set the Java version of the program, as shown in Listing 2.

Listing 2. Modified pom.xml


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.infoworld</groupId>
  <artifactId>rxjava</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>rxjava</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>16</source>
          <target>16</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.21</version>
    </dependency>
    <dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.9.1</version>
    </dependency>
    <!-- JSON library for parsing the CoinCap event payloads -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
  </dependencies>
</project>

To confirm that things are working, type: $ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App". This command should result in the classic “Hello World” output.

Now, we’ll add the code for the basic feature of pulling events from the WebSocket endpoint and displaying them in the console. You can see this code in Listing 3.

Listing 3. Adding a feature


package com.infoworld;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class App {
  public static void main(String[] args) {
    String websocketUrl = "wss://ws.coincap.io/trades/binance";
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder().url(websocketUrl).build();

    Observable<String> observable = Observable.create(emitter -> {
      WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, okhttp3.Response response) {
          // WebSocket connection is open
        }
        @Override
        public void onMessage(WebSocket webSocket, String text) {
          emitter.onNext(text); // Emit received message
        }
        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes) {
          // Handle binary message if needed
        }
        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
          webSocket.close(code, reason);
        }
        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
          emitter.onComplete(); // WebSocket connection is closed
        }
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
          emitter.onError(t); // WebSocket connection failure
        }
      });
      // Close the WebSocket connection when the observer is disposed
      emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
    });
    observable
      .subscribeOn(Schedulers.io())
      .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
          // No-op
        }

        @Override
        public void onNext(String event) {
          // Process each event here
          System.out.println(event);
        }

        @Override
        public void onError(Throwable e) {
          e.printStackTrace();
        }

        @Override
        public void onComplete() {
          System.out.println("Completed");
        }
      });

    // Wait indefinitely (or use another mechanism to keep the program running)
    try {
      Thread.sleep(Long.MAX_VALUE);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

If you run this program, you’ll get a line-by-line output of JSON events, with one event per line. To kill it, press Ctrl-C.
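Sleeping forever is fine for a demo, but when you only want a bounded sample of events it can be simpler to let the stream end on its own and block until it does. The following is a minimal sketch of that idea, not part of the article's application: it uses Observable.interval as a stand-in for the WebSocket source, and the class and method names are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

class BlockingSketch {
  // Takes the first `count` ticks from a timer-driven source and returns them.
  static List<Long> firstTicks(int count) {
    return Observable.interval(10, TimeUnit.MILLISECONDS) // stand-in for a live event source
        .take(count)    // complete the stream after `count` events
        .toList()       // collect the events into a Single<List<Long>>
        .blockingGet(); // block the calling thread until the stream completes
  }

  public static void main(String[] args) {
    System.out.println(firstTicks(3)); // prints [0, 1, 2]
  }
}
```

Because take() completes the stream, the program exits without Thread.sleep or Ctrl-C. The same pattern would apply to the WebSocket Observable if you wanted to print, say, the first 10 trades and quit.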

Modeling event streams

Listing 3 gives us a good look at some RxJava fundamentals. We obtain a connection to the Binance push endpoint (wss://ws.coincap.io/trades/binance) with the OkHttpClient, which makes it easy to consume the WebSocket API. (See the OkHttpClient documentation.)

Once we have the connection details in place, we create a new Observable. An Observable is the basic type for emitting events, an object that can be watched (or listened to). In other words, an Observable is an event source of some kind, and it can model many different sources. In this case, we're creating a new source with the Observable.create method, which is a higher-order function accepting a function with a single argument, which we name emitter.
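To see Observable.create in isolation, here is a small sketch that replaces the WebSocket with two hard-coded strings. The class name, method name, and event text are invented for this illustration; only the Observable.create and emitter calls come from RxJava.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class CreateSketch {
  // Builds an Observable that emits two fixed strings and then completes.
  static Observable<String> twoEvents() {
    return Observable.create(emitter -> {
      emitter.onNext("first event");
      emitter.onNext("second event");
      emitter.onComplete();
    });
  }

  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    // No scheduler is involved, so subscribe() runs the emitter function synchronously.
    twoEvents().subscribe(received::add);
    System.out.println(received); // prints [first event, second event]
  }
}
```

Note that nothing is emitted until subscribe() is called. The function passed to create runs once per subscriber.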

The emitter object has all the callback methods we need in order to produce our event stream. In a sense, we want to wrap the WebSocket stream in a custom RxJava event source. To do this, we take the callbacks we want from WebSocketListener (in particular, the String version of onMessage) and call the matching emitter method, in this case emitter.onNext(text). (The life cycle callbacks onClosed and onFailure are mapped to emitter.onComplete() and emitter.onError(t) in the same way.)

What this gives us is an Observable that can be handed around to whoever needs it in order to be informed of what’s going on. This is a standardized, portable way to model event streams. Moreover, it's highly malleable, with a range of functional transformations, which you'll see momentarily. 

Here's how we register cleanup on the emitter:


emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));

Registering a disposable this way ensures that we close the WebSocket connection when the subscription is disposed or the stream terminates.
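The cleanup hook can be checked without a network connection. In this sketch, which uses made-up class and method names, a boolean flag stands in for the webSocket.close(...) call so we can observe that the registered Runnable fires when the subscriber disposes.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposeSketch {
  // Subscribes to a source that never completes, disposes it, and reports whether cleanup ran.
  static boolean cleanupRunsOnDispose() {
    AtomicBoolean closed = new AtomicBoolean(false);
    Observable<String> source = Observable.create(emitter -> {
      // Stand-in for webSocket.close(...): flip a flag instead of closing a socket.
      emitter.setDisposable(Disposables.fromRunnable(() -> closed.set(true)));
      emitter.onNext("event");
      // No onComplete() call: like an open socket, this source stays open.
    });
    Disposable subscription = source.subscribe(event -> { });
    subscription.dispose(); // runs the Runnable registered on the emitter
    return closed.get();
  }

  public static void main(String[] args) {
    System.out.println("closed = " + cleanupRunsOnDispose()); // prints closed = true
  }
}
```

In the real application, keeping the Disposable returned by subscribe() (or received in onSubscribe) gives you a handle for shutting the connection down cleanly.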

Observing events

To observe events coming off the Observable, we use the subscribe method on the Observable object. We first call .subscribeOn(Schedulers.io()), which tells RxJava to run the subscription logic (the function we passed to Observable.create) on a background thread. This is a (very) easy way to get multithreaded concurrency. The I/O scheduler even manages a thread pool for you.
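A quick way to convince yourself that subscribeOn moves work off the calling thread is to have the source report which thread it runs on. This is an illustrative sketch with invented names. The exact thread name depends on RxJava's internals, so the code only compares it with the caller's thread.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class SubscribeOnSketch {
  // Returns the name of the thread on which the source function ran.
  static String emittingThread() {
    return Observable.<String>create(emitter -> {
          emitter.onNext(Thread.currentThread().getName());
          emitter.onComplete();
        })
        .subscribeOn(Schedulers.io()) // run the source function on an I/O scheduler thread
        .blockingFirst();             // block the caller until the first item arrives
  }

  public static void main(String[] args) {
    System.out.println("calling thread:  " + Thread.currentThread().getName());
    System.out.println("emitting thread: " + emittingThread());
  }
}
```

The two lines should show different thread names. Removing the subscribeOn call makes them match, because the source function then runs on whichever thread calls subscribe.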

The main work of handling the events is done by passing in an Observer to the subscribe method. The Observer interface is the other side of the coin to Observable: the basic type of anything that wants to watch for events. In this case, we create a new anonymous Observer (parameterized with a <String> generic) inline in the subscribe() call. The actual job of writing the event to the console happens in the onNext(String) method of the Observer.

Manipulating event streams

Now let’s perform a couple of operations on the stream. First, we'll use Gson to transform the String into a JSON object. Then, we'll use the object to keep only the transactions whose currency is Solana.

To do this, we can use the map() and filter() methods on the Observable class. With map(), we can transform the strings on an event-by-event basis into JSON objects. Then, we use the JSON inside the filter() method to keep only those events with “solana” as the currency (in the CoinCap spec, the crypto being used is in the “base” field). You can see this new code in Listing 4.

Listing 4. Using map() and filter()


import com.google.gson.Gson;
import com.google.gson.JsonObject;
//… The rest is the same
observable
  .subscribeOn(Schedulers.io())
  .map(event -> {
    // Parse each raw JSON string into a Gson JsonObject
    Gson gson = new Gson();
    return gson.fromJson(event, JsonObject.class);
  })
  .filter(jsonObject -> {
    // Keep only events whose "base" field is "solana"; skip events that lack the field
    return jsonObject.has("base")
        && "solana".equals(jsonObject.get("base").getAsString());
  })
  .subscribe(
    jsonObject -> System.out.println(jsonObject),
    Throwable::printStackTrace,
    () -> System.out.println("Completed")
  );

The map and filter calls are fairly easy to read. map() turns our String stream into a JsonObject stream. filter() takes the JsonObjects as they arrive. It only keeps the ones with a base field equal to “solana”. 
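The same pipeline can be exercised offline by feeding it a few hand-written JSON strings instead of the live feed. In the sketch below, the class name, the method name, and the sample payloads are all invented for illustration. Real CoinCap events carry more fields than the ones shown here.

```java
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.reactivex.Observable;
import java.util.List;

class FilterSketch {
  private static final Gson GSON = new Gson();

  // Parses each raw event and keeps only those whose "base" field equals `base`.
  static List<JsonObject> keepBase(String base, String... rawEvents) {
    return Observable.fromArray(rawEvents)
        .map(raw -> GSON.fromJson(raw, JsonObject.class))
        .filter(json -> json.has("base") && base.equals(json.get("base").getAsString()))
        .toList()
        .blockingGet(); // safe here because the in-memory source completes immediately
  }

  public static void main(String[] args) {
    // Hypothetical sample events, not captured from the real API.
    List<JsonObject> kept = keepBase("solana",
        "{\"base\":\"solana\",\"quote\":\"tether\"}",
        "{\"base\":\"bitcoin\",\"quote\":\"tether\"}",
        "{\"base\":\"solana\",\"quote\":\"usd-coin\"}");
    kept.forEach(System.out::println);
  }
}
```

Running it prints the two events whose base is solana and drops the bitcoin one. Testing operators against a fixed source like this is a convenient way to develop a pipeline before pointing it at a live stream.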

Listing 4 also shows us a different overload of the subscribe() method. Instead of an Observer instance, this one takes three arguments: the onNext, onError, and onComplete functions. It works the same. There is also a single-argument version that just takes the onNext handler.
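To make the three callbacks concrete, this sketch records which ones fire for a source that completes normally and for one that fails. The class and method names are hypothetical. Observable.just and Observable.error are standard RxJava factory methods used here as stand-in sources.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class SubscribeOverloads {
  // Subscribes with the three-argument overload and records every callback that fires.
  static List<String> trace(Observable<String> source) {
    List<String> calls = new ArrayList<>();
    source.subscribe(
        item -> calls.add("next:" + item),              // onNext
        error -> calls.add("error:" + error.getMessage()), // onError
        () -> calls.add("complete"));                   // onComplete
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(trace(Observable.just("a", "b")));
    // prints [next:a, next:b, complete]
    System.out.println(trace(Observable.error(new IllegalStateException("boom"))));
    // prints [error:boom]
  }
}
```

A stream ends with either onError or onComplete, never both. If you use the single-argument overload, there is no error handler, so it is best reserved for sources you do not expect to fail.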

Also, notice that map and filter are the same functional-style operations we know and love from Java streams and other languages like JavaScript. But now, we can apply them to a wide range of event sources. In fact, we can apply these operations to anything that can be handled with Observers and Observables.
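For comparison, here is the same map-then-filter shape written with plain Java streams. It uses only the JDK. The class name, method name, and symbol strings are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamsComparison {
  // Upper-cases each symbol and keeps those starting with "SOL", using java.util.stream.
  static List<String> solSymbols(List<String> symbols) {
    return symbols.stream()
        .map(String::toUpperCase)
        .filter(symbol -> symbol.startsWith("SOL"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(solSymbols(Arrays.asList("sol-usdt", "btc-usdt", "sol-btc")));
    // prints [SOL-USDT, SOL-BTC]
  }
}
```

The operators read the same, but the direction of data flow differs: a Java stream pulls items from a collection that already exists, while an Observable pushes events to its subscribers as they arrive, possibly over a long period and from another thread.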

Conclusion

Reactive programming in RxJava puts some serious power in your hands, in a flexible syntax. It can be used in a variety of circumstances. As we’ve seen, it’s quite handy in dealing with streaming data sources like the CoinCap API. The ability to pass around streams of events as objects is an important abstraction in modern software, and one every developer should know about. You can find the full source for the example application on GitHub.

Copyright © 2023 IDG Communications, Inc.