JavaScript
Article
By Peleke Sengstacke

10 Need-to-Know RxJS Functions with Examples

By Peleke Sengstacke

This article was peer reviewed by Florian Rappl and Moritz Kröger. Thanks to all of SitePoint’s peer reviewers for making SitePoint content the best it can be!

As the interest in functional reactive programming (FRP) has grown, RxJS has emerged as one of the most popular JavaScript libraries for programming in this paradigm. In this article, we’re going to take a look at what I consider to be the ten must-know functions from RxJS.

Note: This article presupposes familiarity with the basics of RxJS, as presented in the article Introduction to Functional Reactive Programming with RxJS.

Reactive Programming

Reactive programming is a programming paradigm that treats streams of data, called Observables, as its basic units of programming.

Streams—or Observables , in RxJS jargon—are analogous to event listeners: Both wait for something to happen, and notify you when it does. The series of asynchronous notifications you get from an onClick listener is a perfect example of a stream of data.

Put another way, an Observable is nothing more than an array that populates over time.

That array’s elements can come from just about anywhere: The file system, DOM events, API calls, or even converted synchronous data, like arrays. At heart, reactive programming is nothing more than using Observables as the building blocks of your programs.

Relationship to arrays

Arrays are simple because their contents are final unless explicitly changed. In that sense, there’s nothing essentially temporal about an array.

An Observable, on the other hand, is defined by time. The most you can know about a stream is that it has received [1, 2, 3] so far. You can’t be certain that you’ll ever get a 4—nor that you won’t—and it’s the data source, not your program, that decides.

The relationship between streams and arrays is so profound that most of the reactive extensions derive from the world of functional programming, where list operations are bread and butter.

Warming up to RxJS

Consider the all-too-common to-do app. Let’s see how the problem of displaying only the names of a user’s incomplete tasks looks with RxJS:

const task_stream =
  // Makes a stream of all the tasks in the database
  getTasks().
    // Get tasks only for this user
    filter((task) => task.user_id == user_id).
    // Get tasks that are incompleted
    filter((task) => !task.completed).
    // Only get name of task
    map((task) => task.name)

/* Tasks look like this:
   task = {
    user_id   : number,
    completed : boolean,
    name      : string
   }
 */

So far, this is nothing more than array extras, but demonstrates reactive programming’s functional flavor.

It’s declarative nature becomes clear with the addition of more complicated, “real-world” features. Let’s say we want to:

  • Kick off the request in response to the user’s selection to view either complete or incomplete tasks;
  • Only send a request for the last selection every second, so as not to waste bandwidth if the user quickly changes their choice;
  • Retry failed requests up to three times; and
  • Only redraw the view when the server sends a different response from last time.
const task_stream =
  parameter_stream.
    debounce(1000).
    map((parameter) => {
      getTasks().
        retry(3).
        filter((task) => task.user_id === user_id).
        filter((task) => task.completed === parameter).
        map((task)    => task.name)
    }).
    flatMap(Rx.Observable.from).
    distinctUntilChanged().
    update()

Step by step:

  • parameter_stream tells us whether the user wants complete or incomplete tasks, storing the choice in parameter;
  • debounce() ensures we only pay attention to the last button click every second;
  • The section around getTasks() does the same as before;
  • distinctUntilChanged() ensures we only pay attention when the server’s response is different than it was last time; and
  • update() takes care of updating the UI to reflect what we got from the server.

Handling debounce, retry, and “distinct until changed” logic in an imperative, callback-based style is valid, but it can be both brittle and complicated.

The takeaway is that programming with RxJS allows for:

  1. Declarative programs;
  2. Extensible systems; and
  3. Straightforward, robust error-handling.

We’ll meet each of the functions in the above example on our tour through the ten must-know functions of RxJS.

Operations on Simple Streams

The fundamental functions on simple streams—those that emit simple values, like strings—include:

With the exception of take() and takeWhile(), these are analogous to JavaScript’s higher-order array functions.

We’ll apply each of these by solving an example problem: Finding all the users in our database who have a .com or .org website, and calculating the average length of their websites’ names.

JSONPlaceholder will be our source of users. Here’s the JSON representation of the user data we’ll be working with.

1. Using map() to transform data

Using map() on an Observable is identical to using it on an array. It:

  1. Accepts a callback as an argument;
  2. Executes it on every element of the array you called it on; and
  3. Returns a new array with each element of the original array replaced with the result of calling the callback on it.

The only differences when using map() on Observables are that:

  1. Instead of returning a new array, it returns a new Observable; and
  2. It executes every time the Observable emits a new item, instead of immediately and all at once.

We can use map() to transform our stream of user data into just a list of their website names:

source.
  map((user) => user.website)

See the Pen 10 Functions from RxJS // map by SitePoint (@SitePoint) on CodePen.

Here, we’ve used map to “replace” every user object in the incoming stream with each user’s website.

RxJS also allows you to call map() as select(). Both names refer to the same function.

2. Filtering results

Like map(), filter() is much the same on Observables as on arrays. To find every user with either a .net or a .org website address, we’d write:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'));
})

See the Pen 10 Functions from RxJS // filter by SitePoint (@SitePoint) on CodePen.

This selects only the users whose websites end with ‘net’ or ‘org’.

filter() also has the alias where().

3. Collecting results with reduce()

reduce() allows us to use all of our individual values and turn them into a single result.

reduce() tends to be the most confusing of the basic list operations, because, unlike filter() or map(), it behaves differently from use to use.

In general, reduce() takes a collection of values, and turns it into a single data point. In our case, we’ll feed it a stream of website name, and use reduce() to convert that stream into an object that counts how many websites we’ve found, and the sum of the lengths of their names.

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  reduce((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 })

See the Pen 10 Functions from RxJS // reduce by SitePoint (@SitePoint) on CodePen.

Here, we reduce our stream to a single object, which tracks:

  1. How many sites we’ve seen; and
  2. The total length of all their names.

Keep in mind that reduce() only returns a result when the source Observable completes. If you want to know the state of the accumulator every time the stream receives a new item, use scan() instead.

4. Limiting results with take()

take() and takeWhile() round out the basic functions on simple streams.

take(n) reads n values from a stream, and then unsubscribes.

We can use scan() to emit the our object every time we receive a website, and only take() the first two values.

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
      return {
        count       : data.count += 1,
        name_length : data.name_length += website.length
      }
    }, { count : 0, name_length : 0 }).
  take(2);

See the Pen 10 Functions from RxJS // scan & take/takeWhile by SitePoint (@SitePoint) on CodePen.

RxJS also offers takeWhile(), which allows you to take values until some boolean test holds true. We can write the above stream with takeWhile() like this:

source.
  map((user) => user.website).
  filter((website) => (website.endsWith('net') || website.endsWith('org'))).
  scan((data, website) => {
    return {
      count       : data.count += 1,
      name_length : data.name_length += website.length
    }
  }, { count : 0, name_length : 0 }).
  takeWhile((data) =>  data.count < 3)
--ADVERTISEMENT--

Operations on Higher-Order Streams

Aside from the fact that they work on Observables, not arrays, these functions are almost identical the familiar list operations.

“[I]f you know how to program against Arrays using the Array#extras, then you already know how to use RxJS!” ~ RxJS Documentation

Just as arrays can contain more complicated data than simple values, like arrays or objects, so too can Observables emit higher-order data, like Promises or other Observables. This is where more specialized tools come into play.

5. Squashing streams with flatMap()

. . . As a matter of fact, we’re already using one!

We made calls to fromPromise() and flatMap() when we defined our source stream:

const source   =
        // Take a Promise and convert it to an Observable
        Rx.Observable.fromPromise(makeRequest(ENDPOINT))
          // Flatten Promise
          .flatMap(Rx.Observable.from); 

This uses three pieces of new machinery:

  1. fromPromise;
  2. Rx.Observable.from; and
  3. flatMap .

Observables from promises

A Promise represents a single future value we’ll get back asynchronously—the result of a call to the server, for instance.

One of the defining features of a Promise is that it represents just one future value. It can’t return multiple asynchronous pieces of data; that’s what Observables do, and is a fundamental difference between the two.

This means that, when we use Rx.Observable.fromPromise(), we get an Observable that emits a single value—either:

  1. The value the Promise resolves to; or
  2. The error the Promise rejects with.

When a Promise returns a string or a number, we don’t need to do anything special. But when it returns an array, as it does in our case, we’d prefer to create an Observable that emits the contents of the array, not the array itself as a single value.

6. Using flatMap()

This process is called flattening, which flatMap() takes care of. It has a number of overloads, but we’ll only use the simplest and the most common of them.

When using flatMap(), we:

  1. Call flatMap() on an Observable that emits the single-value resolution or rejection of a Promise; and
  2. Pass it a function to create a new Observable with.

In our case, we pass Rx.Observable.from(), which creates a sequence from the values of an array:

Rx.Observable.from([1, 2, 3]).
  subscribe(
      onNext (value) => console.log(`Next: ${value}`))

// Prints: 
//  Next: 1
//  Next: 2
//  Next: 3

That covers the code in our little prelude:

const source =
  // Create an Observable emitting the VALUE or REJECTION of a Promise...
  Rx.Observable.fromPromise(makeRequest(ENDPOINT))
    // ...And turn it into a new Observable that emits every item of the
    //  array the Promise resolves to.
    .flatMap(Rx.Observable.from)

RxJS has an alias for flatMap(), as well: selectMany().

Composing Multiple Streams

Often, we’ll have multiple streams that we need to put together. There are many ways to combine streams, but there are a few that come up more than others.

7. Combining streams with concat() and merge()

Concatenation and merging are two of the most common ways to combine streams.

Concatenation creates a new stream by emitting the values of a first stream until it completes, and then emitting the values of a second stream.

Merging creates a new stream from many streams by emitting values from whichever stream is active

Think of talking to two people at once on Facebook Messenger. concat() is the scenario where you get messages from both, but finish your conversation with one person before responding to the other. merge() is like creating a group chat and receiving both streams of messages simultaneously.

source1.
  concat(source2).
  subscribe(
    onNext(value) => console.log(`Next: ${value}`))
    // Prints 'Source 1' values first, THEN 'Source 2'

source1.
  merge(source2).
  subscribe(
    onNext(value) => console.log(`Next: ${value}`))
    // INTERLEAVES 'Source 1' and 'Source 2' values

See the Pen 10 Functions from RxJS // merge & concat by SitePoint (@SitePoint) on CodePen.

The concat() stream will print all of the values from source1 first, and only begin printing values from source2 after source1 completes.

The merge() stream will print values from source1 and source2 as it receives them: It won’t wait for the first stream to complete before emitting values from the second.

8. Using switch()

Often, we want to listen to an Observable emitting Observables, but only pay attention to the latest emission from the source.

To extend the Facebook Messenger analogy further, switch() is the case where you . . . Well, switch who you respond to, based on who’s currently sending messages.

For this, RxJS provides switch.

User interfaces furnish several good use cases for switch(). If our app fires off a request every time a user selects what they want to search for, we can assume they only want to see results from the latest selection. So, we use switch() to listen to only the result from the latest selection.

While we’re at it, we should make sure not to waste bandwitdh by only hitting the server for the last selection a user makes every second. The function we use for this is called debounce()

If you want to go in the other direction, and only honor the first selection, you’d use throttle(). It has the same API, but opposite behavior.

See the Pen 10 Functions from RxJS // switch, combineLatest, & distinctUntilChanged by SitePoint (@SitePoint) on CodePen.

9. Coordinating streams

What if we want to allow the user to search for a post or user with a specific ID?

To demonstrate, we’ll create another dropdown, and allow users to choose the ID of the item they want to retrieve.

There are two scenarios. We kick off a request when the user:

  1. Changes either selection; or
  2. Changes both selections.

Respond to changes to either stream with combineLatest()

In the former case, we need to create a stream that fires a network request with:

  1. Whichever endpoint the user most recently selected; and
  2. Whichever ID the user most recently selected.

. . . And do so whenever the user updates either selection.

This is what combineLatest() is for:

// User's selection for either POSTS or USERS data
const endpoint_stream = 
  Rx.Observable.fromEvent(select_endpoint, 'click').
    map(event  => event.target).
    map(target => (target.options[target.selectedIndex].text.toLowerCase()));

// Which item ID the user wants to retrieve
const id_stream = 
  Rx.Observable.fromEvent(select_id, 'click').
    map(event  => event.target).
    map(target => (target.options[target.selectedIndex].text));

// Emits a pair of the most recent selections from BOTH streams 
//   when EITHER emits a value
const complete_endpoint_stream = 
  endpoint_stream.combineLatest(id_stream);

See the Pen 10 Functions from RxJS // combineLatest & distinctUntilChanged by SitePoint (@SitePoint) on CodePen.

Whenever either stream emits a value, combineLatest() takes the emitted value and pairs it with the last item the other stream emitted, and emits the pair in an array.

This is easier to visualize in a diagram:

// stream1 : Emits 1
// stream2 : Emits 1

combined : Emits [1, 1]

// stream2: Emits 2

combined : Emits [1, 2]

// stream2: Emits 3

combined : Emits [1, 3]

Respond only to changes in both streams with zip

To wait until the user updates their selection for both the id and the endpoint fields, replace combineLatest() with zip().

Again, this is easier to understand through a diagram:

// stream1 : Emits A
// stream2 : Emits 1
zipped : Emits [A, 1]

// stream2: Emits 2
zipped : Emits NOTHING

// stream2: Emits 3
zipped : Emits NOTHING

// stream1: Emits B
zipped : Emits [B, 2]

// stream1: Emits C
zipped : Emits [C, 3]

Unlike combineLatest(), zip() waits until both Observables have emitted something new before emitting its array of updated values.

10. takeUntil

Finally, takeUntil() allows us to listen to a first stream until a second one starts emitting values.

source1.
  takeUntil(source2);

This is useful when you need to coordinate streams, but not necessarily combine them.

Wrapping Up

The simple fact of adding a time dimension to arrays opens the doors to a whole new way of thinking about programs.

There’s far more to RxJS than what we’ve seen here, but this is enough to get surprisingly far.

Get started with RxJS Lite, keep the documentation handy, and take the time to get your hands dirty. Before you know it, everything will look like a stream . . . Because everything is.

More:
Login or Create Account to Comment
Login Create Account
Recommended
Sponsors
Get the most important and interesting stories in tech. Straight to your inbox, daily.