How to create an RxJS Observable - manually and by use of Creation Operators

Reactive programming is something of a shift in thinking about how you get values from a data source. Instead of asking the source for an update every few seconds, you let the source send you the data you are interested in as it becomes available. In other words, we are talking about a stream of data that arrives over time.

In this first article (in my RxJS series), I want to focus on how to create such a data source.

Terminology

  • Observable: A data source that outputs a stream of data over time.
  • Observer: A consumer that subscribes to listen for data from the observable.

Most of the time you will be observing (as an Observer) an existing Observable. However, there are cases where you will want to create an Observable yourself, for example when you want to listen to DOM events or create mock data for your unit tests.

Manual Observable creation

To create an Observable manually, we can use the Observable constructor:

import { Observable } from 'rxjs';
let source$ = new Observable(subscriber => {
  subscriber.next("Hello");
  subscriber.next("World");
});
source$.subscribe(console.log);

// Output on console:
// Hello
// World

We can use the following two methods on the subscriber parameter:

  1. error(): Emits an error. Nothing will be emitted from then on.
  2. complete(): Completes the source. Nothing will be emitted from then on.

import { Observable } from 'rxjs';
let source$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.error('oops');
  subscriber.next(2);  // will not be emitted
});

source$.subscribe({
  next: console.log,
  error: console.error,
  complete: () => console.log('Done')
});

// Output on console, oops in red:
// 1
// oops

Creation Operators

In RxJS terminology, operators that let you create an Observable are called Creation Operators.

of operator

The of operator takes a list of values as arguments and emits them one after the other.

import { of } from 'rxjs';

let source$ = of(1,2,3,4);
source$.subscribe(console.log);

// Output on the console will be: 
// 1
// 2
// 3
// 4

Keep in mind that the of operator emits its arguments one at a time, exactly as given. It doesn't care if one of the values is an array - it does not flatten the array (or manipulate it in any other way).

import { of } from 'rxjs';
let source$ = of(1,2,3,[4,5,6]);
source$.subscribe(console.log);

// Output on the console will be:
// 1
// 2
// 3
// [4,5,6]

from operator

This operator works much like the of operator, except that it is a bit more intelligent. The from operator looks at the type of the value passed in and tries to emit its contents one by one. For example, if you pass it an array, instead of emitting the whole array (the way 'of' does), from will emit each item in the array one at a time. Note that it does not break down nested items - if the third element is itself an array, that inner array is emitted as a single value. If you pass the from operator a string, it will emit the characters of the string one by one.

import { from } from 'rxjs';
let source$ = from('test');
source$.subscribe(console.log);

// Output on the console:
// t
// e
// s
// t

Compare the output above to the 'of' operator, which would emit 'test' as the only item. Now try passing a promise to the from operator and see if it plays nice: from emits the resolved value, or calls the error callback if the promise rejects.

import { from } from 'rxjs';
let source$ = from(Promise.reject('oops'));
source$.subscribe({ next: console.log, error: console.error });

// Output will show in red - telling us that console.error was called!
// oops

interval operator

This operator emits an increasing sequence of numbers, starting at 0, at a fixed time interval. For example, passing 5000 to the interval operator produces a new value every 5 seconds (the first value arrives once the first 5 seconds have passed).

import { interval } from 'rxjs';
let source$ = interval(5000);
source$.subscribe(console.log);

// Output on the console every 5 seconds, incremented:
// 0
// 1
// 2
// .... n

timer operator

If you want to wait before emitting some value, use this operator. The timer operator takes in two parameters. The first one tells it how long to wait before emitting the first value. The second parameter tells it how much time to wait between subsequent emissions. Again, values start with 0 and keep on incrementing by 1.

import { timer } from 'rxjs';
let source$ = timer(1000);
source$.subscribe(console.log);

// Output after 1 second:
// 0

Remember, if you specify only the first parameter, the observable will complete after emitting the first value. You must specify a second parameter to make the values continue indefinitely.

fromEvent operator

This operator lets you subscribe to DOM events: every time the event fires, your observable emits the corresponding event object.

Here is an example of listening to a click event on the document:

import { fromEvent } from 'rxjs';
let documentClick$ = fromEvent(document, 'click');
documentClick$.subscribe(console.log);

Try using the 'keypress' event and you can imagine how powerful this operator is. You can limit the event to a specific HTML element too. For example, imagine that you have a div tag with an id of 'container':

import { fromEvent } from 'rxjs';
let containerDiv = document.getElementById('container');
let containerClick$ = fromEvent(containerDiv, 'click');
containerClick$.subscribe(console.log);

Important Points

We talked about how Observables are a source of streaming data, but there are a few things I must point out:

  1. Observables don't emit any data until they are subscribed to.
  2. You subscribe to an Observable through the 'subscribe' method. However, subscribing by itself doesn't specify when the subscriber stops listening. Remember, as long as there is even a single subscriber listening, an Observable that never completes (interval, for example) will keep on emitting data (hence a potential memory issue).

To stop an observable from emitting data, one of the three things below must happen:

  1. The source errors (subscriber.error()).
  2. The source completes (subscriber.complete(), or an operator such as timer(1000) that completes on its own).
  3. The subscriber unsubscribes. Remember, if there are no subscribers, the source will stop.

Every subscribe call returns a subscription object. All we have to do is call 'unsubscribe' on this returned object.

import { interval } from 'rxjs';
let source$ = interval(1000);
let subscription = source$.subscribe(console.log);
setTimeout(() => subscription.unsubscribe(), 1500);

// Output
// 0

We use setTimeout to wait a little longer than one second, so that the source has a chance to emit a value for us to see before we unsubscribe.

The subscribe method takes in an object with the following three keys:

  1. next: The callback to fire when the source emits a data value successfully.
  2. error: The callback to fire when an error is fired by the source.
  3. complete: The callback to fire when the Observable completes.

import { interval } from 'rxjs';
let source$ = interval(1000); // stand-in: any source observable works here
let subscription = source$.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.log(err),
  complete: () => console.log('Completed')
});
// In the future, unsubscribe
setTimeout(() => subscription.unsubscribe(), 5000);

I purposely kept my examples verbose. You can easily save some lines of code as shown below:

import { of } from 'rxjs';
of([1,2,3]).subscribe(console.log).unsubscribe();

In an upcoming article in this series, I will go over some important operators that combine multiple Observables.