
Creating Observables

Before we can start manipulating observables and subscribing to them, we need to create an observable. In the wild, you’ll often receive an observable from a library or framework, or from a Subject (we’ll cover that later), but when you do need to create one, these are your options.

of

RxJS provides helper functions that create an Observable from some other type of value. of is the most straightforward; it is commonly used in tests and for handling edge cases in logic involving Observables. As we briefly mentioned in the previous section, of creates an Observable that emits the value passed to it and then completes. (Given several arguments, it emits each one in order before completing.) The emitted value is what the subscriber’s next handler receives.

const foo$ = of('foo');

foo$.subscribe({
    next: x => console.log(`received value: ${x}`),
    error: e => console.error(`ERROR: ${e}`),
    complete: () => console.log('complete')
})
  received value: foo
  complete

This is very useful for testing, when you just need an observable to confirm that the code subscribing to it handles next correctly. It is also useful when a function normally returns an asynchronous value but has a synchronous default:

function getUserDisplayName(userId: string): Observable<string> {
    if (userId) {
        // fetchUserDisplayName is a hypothetical asynchronous lookup that returns Observable<string>
        return fetchUserDisplayName(userId);
    } else {
        return of('Anonymous');
    }
}

Note: This is the RxJS equivalent of Promise.resolve().
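
One difference from Promise.resolve() is worth keeping in mind: with no scheduler involved, of delivers its value synchronously during the subscribe call, whereas a Promise callback always runs later, as a microtask. The following is a minimal sketch of that ordering; the log array is only there to make the order visible.

```typescript
import { of } from 'rxjs';

const log: string[] = [];

log.push('before');
// The then callback is queued and runs only after the current synchronous code finishes.
Promise.resolve('foo').then(x => log.push(`promise: ${x}`));
// The next handler runs right away, inside the subscribe call.
of('foo').subscribe(x => log.push(`of: ${x}`));
log.push('after');

// At this point the Observable has already delivered its value,
// while the Promise callback is still waiting in the microtask queue.
console.log(log.join(', ')); // before, of: foo, after
```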

throwError

The flip side of of is throwError. This creates an Observable that emits no values and immediately notifies its subscriber’s error handler with the provided error. It is useful in the same situations as of, but for the error case.

Example:

const foo$ = throwError('foo');

foo$.subscribe({
    next: x => console.log(`received value: ${x}`),
    error: e => console.error(`ERROR: ${e}`),
    complete: () => console.log('complete')
});
x ERROR: foo
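
As a sketch of how of and throwError pair up in practice, the hypothetical parsePort helper below returns of for valid input and throwError otherwise. It assumes RxJS 7 or later, where throwError accepts a factory function that creates the error at subscription time; parsePort and the port range check are illustrative only.

```typescript
import { Observable, of, throwError } from 'rxjs';

// Hypothetical helper: validates a port number and reports the result as an Observable.
function parsePort(input: string): Observable<number> {
    const port = Number(input);
    if (Number.isInteger(port) && port > 0 && port <= 65535) {
        return of(port);
    }
    // Factory form (assumes RxJS 7+): the Error is created when someone subscribes.
    return throwError(() => new Error(`invalid port: ${input}`));
}

const results: string[] = [];
for (const input of ['8080', 'pizza']) {
    parsePort(input).subscribe({
        next: port => results.push(`port ${port}`),
        error: (e: Error) => results.push(`ERROR: ${e.message}`)
    });
}

console.log(results); // [ 'port 8080', 'ERROR: invalid port: pizza' ]
```

Callers subscribe the same way in both cases, so the validation failure flows through the normal error handler instead of a thrown exception.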

from

Whereas of and throwError don’t care what they are passed and blindly emit or throw the provided value, from attempts to convert existing streams of data into an Observable. The idea is for from to be a function where you just throw anything vaguely observable-ish into it, and it just “figures it out” to create an Observable out of it.

The simplest usage is passing in an array. An Observable is created that emits each value of the array immediately, in order, and then completes:

const foo$ = from(['foo', 'bar', 'baz']);

foo$.subscribe({
    next: x => console.log(`received value: ${x}`),
    error: e => console.error(`ERROR: ${e}`),
    complete: () => console.log('complete')
});
  received value: foo
  received value: bar
  received value: baz
  complete

The from function can also take a Promise. It is converted into an Observable that emits and completes when the Promise resolves, or errors when the Promise rejects:

const foo$ = from(new Promise((resolve, reject) => {
    setTimeout(() => resolve('pizza'), 300);
}));

const bar$ = from(new Promise((resolve, reject) => {
    setTimeout(() => reject('taco'), 900);
}));

foo$.subscribe({
    next: x => console.log(`foo received value: ${x}`),
    error: e => console.error(`foo ERROR: ${e}`),
    complete: () => console.log('foo complete')
});

bar$.subscribe({
    next: x => console.log(`bar received value: ${x}`),
    error: e => console.error(`bar ERROR: ${e}`),
    complete: () => console.log('bar complete')
});
  foo received value: pizza
  foo complete
  bar ERROR: taco

Arrays and Promises are by far the most common uses of from in the wild, but it also accepts other things, like iterables and generators. The resulting Observable will call next for each value in the iterable or generated by the generator, and if the sequence ever ends, it will call complete (similarly to an array).
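
For instance, a generator can be handed to from directly. This is a small sketch; countdown is a made-up generator used only for illustration.

```typescript
import { from } from 'rxjs';

// Hypothetical generator: yields start, start - 1, ..., 1 and then finishes.
function* countdown(start: number): Generator<number> {
    for (let i = start; i > 0; i--) {
        yield i;
    }
}

const received: number[] = [];
let completed = false;

from(countdown(3)).subscribe({
    next: x => received.push(x),          // called once per yielded value
    complete: () => { completed = true; } // called when the generator finishes
});

console.log(received, completed); // [ 3, 2, 1 ] true
```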

EMPTY

This constant, exported from rxjs as EMPTY, is an observable that never emits or errors. It just completes immediately.

EMPTY.subscribe({
    next: x => console.log(`received value: ${x}`),
    error: e => console.error(`ERROR: ${e}`),
    complete: () => console.log('complete')
});
  complete

A common mistake is confusing EMPTY with of(undefined). The key to remembering the difference is that of emits one value for each argument it is given. Called with no arguments at all, of() has nothing to emit and behaves just like EMPTY. Called with undefined, however, it does have a value to emit: of(undefined) creates an Observable that emits undefined and then completes, whereas EMPTY does not emit at all. It only completes.

Compare the following usage of of(undefined) to the previous example:

of(undefined).subscribe({
    next: x => console.log(`received value: ${x}`),
    error: e => console.error(`ERROR: ${e}`),
    complete: () => console.log('complete')
});
  received value: undefined
  complete
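
In a test, this difference shows up as a different number of next calls. The sketch below counts notifications for both cases. It assumes the empty Observable is available as the EMPTY export of the rxjs package; countNotifications is a hypothetical helper, not part of RxJS.

```typescript
import { EMPTY, Observable, of } from 'rxjs';

// Hypothetical helper: subscribes and counts how often next and complete are called.
function countNotifications<T>(source$: Observable<T>): { next: number; complete: number } {
    const counts = { next: 0, complete: 0 };
    source$.subscribe({
        next: () => { counts.next++; },
        complete: () => { counts.complete++; }
    });
    return counts;
}

const emptyCounts = countNotifications(EMPTY);
const undefinedCounts = countNotifications(of(undefined));

console.log(emptyCounts);     // { next: 0, complete: 1 }
console.log(undefinedCounts); // { next: 1, complete: 1 }
```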

new Observable()

If you want to create an Observable from scratch with complex logic, you can use its constructor. It takes a function, sort of like Promise, but instead of taking resolve and reject callbacks, this function takes an Observer. This function runs whenever subscribe is called, generating values for the subscribed Observer. Note that even though subscribe takes a Partial<Observer>, the parameter for this function is just an Observer, so you don’t have to worry about nullish-checking the next, error, and complete handlers.

Example:

const foo$ = new Observable(observer => {
    observer.next('foo');
    setInterval(() => observer.next('bar'), 500);
    setTimeout(() => observer.complete(), 5000);
});

foo$.subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log('complete')
});
  foo
  bar
  bar
  bar
  bar
  bar
  bar
  bar
  bar
  bar
  complete

This emits 'foo' immediately, then 'bar' every 500ms until, after 5 seconds, it completes. However, beware of hanging async code when creating Observables manually. Here there is an interval running that is never cleared. When the Observable completes, it stops passing any future calls to next along to subscribers, but it does not clear your timers for you (or resolve other hanging async activity). Running the above code prints the expected output, but the program never exits, because the setInterval is still running in the background, calling next and generating no output.

In this particular case, it can be resolved like this:

const foo$ = new Observable(observer => {
    observer.next('foo');
    const timer = setInterval(() => observer.next('bar'), 500);
    setTimeout(() => {
        observer.complete();
        clearInterval(timer);
    }, 5000);
});

foo$.subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log('complete')
});
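
Another way to handle cleanup is to return a teardown function from the function passed to the constructor. RxJS calls the returned function when the subscription ends, whether that happens through complete, error, or the subscriber calling unsubscribe. The sketch below demonstrates the unsubscribe case; the names ticks$, timerCleared, and received are illustrative.

```typescript
import { Observable } from 'rxjs';

let timerCleared = false;

const ticks$ = new Observable<string>(observer => {
    observer.next('foo');
    const timer = setInterval(() => observer.next('bar'), 500);

    // Teardown: runs when the subscription ends, so the interval never outlives it.
    return () => {
        clearInterval(timer);
        timerCleared = true;
    };
});

const received: string[] = [];
const subscription = ticks$.subscribe(x => received.push(x));

// Unsubscribing right away triggers the teardown before any 'bar' is emitted.
subscription.unsubscribe();

console.log(received, timerCleared); // [ 'foo' ] true
```

Keeping the cleanup next to the code that starts the timer means a subscriber who unsubscribes early does not leave the interval running.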

Also, note that the Observer passed to this function isn’t literally the object you provided to subscribe(). It is a wrapper that holds references to your handlers, and its methods rely on this to find them. Therefore, you shouldn’t pass its methods directly as callbacks; wrap them in an arrow function instead:

// Incorrect
const bad$ = new Observable(observer => {
    observer.next(42);
    setTimeout(observer.complete, 500); // Don't do this
})
bad$.subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log('complete')
});
  42
x TypeError: this._complete is not a function

// Correct
const good$ = new Observable(observer => {
    observer.next(42);
    setTimeout(() => observer.complete(), 500); // Do this instead
})
good$.subscribe({
    next: console.log,
    error: console.error,
    complete: () => console.log('complete')
});
  42
  complete

Promise vs Observable

Now that we know how to create an Observable, we can properly discuss the differences between a Promise and an Observable.

The difference between a Promise and an Observable is that a Promise represents a single asynchronous value, whereas an Observable represents a stream of asynchronous values. Promise is to Observable as a single value is to an array, and Promise is to a single value as Observable is to an array.

This chart lays this relationship out more clearly:

| | Single Value | Many Values |
| --- | --- | --- |
| Synchronous | Value | Array |
| Asynchronous | Promise | Observable |

One thing to point out is that we can represent any of these using the type below or to the right of them, but not the other way around, almost like a subset relationship. In this example, we’ll avoid using things like of and from for the sake of demonstrating the behavior:

const aValue = 5;
const aArray = [5]; // Equivalent to [aValue]
const aPromise = new Promise((resolve, _) => resolve(5)); // Equivalent to Promise.resolve(aValue)
const aObservable = new Observable(observer => {
    observer.next(5);
    observer.complete();
}); // Equivalent to of(aValue)

const bArray = [5, 8, 9, 1];
const bObservable = new Observable(observer => {
    observer.next(5);
    observer.next(8);
    observer.next(9);
    observer.next(1);
    observer.complete();
}); // Equivalent to from(bArray)

const cPromise = new Promise((resolve, reject) => {
    setTimeout(() => resolve(8), 900);
});
const cObservable = new Observable(observer => {
    setTimeout(() => {
        observer.next(8);
        observer.complete();
    }, 900);
}); // Equivalent to from(cPromise)

const dObservable = new Observable(observer => {
    observer.next(5);
    if (flipACoin()) {
        observer.next(6);
        setTimeout(() => observer.next(82), 500);
        setTimeout(() => {
            observer.next(503);
            if (flipACoin()) {
                observer.complete();
            } else {
                setTimeout(() => observer.complete(), 800);
                observer.next(47);
            }
        })
    } else {
        observer.next(18);
        observer.error({ error: 'it failed' });
    }
});

A couple of things to note for the above example:

First of all, you might be thinking, “But you can represent an array using a Promise! Just use Promise.resolve([5, 8, 9, 1])!” (or new Promise((resolve, _) => resolve([5, 8, 9, 1]))). The problem here is that the Promise still isn’t aware of the multiple values. In order to access them or remove/insert values, we need to first resolve the Promise so we’re working directly with the array, and then manipulate the array. The key thing here is that the multiple values are still being handled by the array, not the Promise itself, whereas the Observable needs no array. It handles each value itself.

Second, notice how dObservable has logic in it, but cObservable and cPromise do not. This is due to another difference between Promise and Observable: both take a function in their constructor, but the function for an Observable is evaluated every time it is subscribed to, whereas the function for a Promise is evaluated only once, when the constructor is called. For example:

const foo = new Promise((resolve, reject) => {
    if (flipACoin()) {
        resolve(generateRandomNumber());
    } else {
        reject({ error: 'it failed' });
    }
});

foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
** First Run **
  0.15903534568282507
  0.15903534568282507
  0.15903534568282507
  0.15903534568282507
  0.15903534568282507

** Second Run **
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }

** Third Run **
  0.27123801558195737
  0.27123801558195737
  0.27123801558195737
  0.27123801558195737
  0.27123801558195737

Here when we ran the code, we could see that it either resolves with the same random number 5 times or rejects with the same error 5 times. What if we used the same(ish) code to create an Observable?

const foo$ = new Observable(observer => {
    if (flipACoin()) {
        observer.next(generateRandomNumber());
        observer.complete();
    } else {
        observer.error({ error: 'it failed' });
    }
});

foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
** First Run **
x { error: 'it failed' }
x { error: 'it failed' }
  0.10426312822332862
  0.06675352215521824
x { error: 'it failed' }

** Second Run **
x { error: 'it failed' }
x { error: 'it failed' }
  0.031318969080073256
x { error: 'it failed' }
  0.12634657092572255

Here, we can see that the code re-ran for each subscription and generated new random numbers. If we want to achieve the behavior of a Promise where the code is evaluated once, there are a few options. The one I would recommend is using a Subject, which we will cover later.
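
The same contrast can be made deterministic by counting how often each constructor function runs instead of relying on random numbers. This is a minimal sketch; the counter variables are illustrative.

```typescript
import { Observable } from 'rxjs';

let promiseRuns = 0;
const promise = new Promise<number>(resolve => {
    promiseRuns++; // runs once, as soon as the Promise is constructed
    resolve(promiseRuns);
});

let observableRuns = 0;
const counter$ = new Observable<number>(observer => {
    observableRuns++; // runs once per subscribe() call
    observer.next(observableRuns);
    observer.complete();
});

// Before anyone consumes them: the Promise has already run, the Observable has not.
const runsBeforeUse = { promise: promiseRuns, observable: observableRuns };

promise.then(() => {});
promise.then(() => {});

const seen: number[] = [];
counter$.subscribe(x => seen.push(x));
counter$.subscribe(x => seen.push(x));
counter$.subscribe(x => seen.push(x));

console.log(runsBeforeUse);  // { promise: 1, observable: 0 }
console.log(promiseRuns);    // 1
console.log(observableRuns); // 3
console.log(seen);           // [ 1, 2, 3 ]
```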

So, which should I use?

In situations where we would normally use a Promise, if we’re using RxJS, we may want to consider an Observable instead. This may seem like an “I have a hammer, so everything’s a nail” situation at first, but there is benefit to using Observables.

First of all, as we’ll discuss later, Observable operators are very convenient, even when working with only a single value.

Second, it is easier to keep track of one system for asynchronous values, rather than needing to consider which we’re working with in each situation and remembering the differences.

Finally, it is uncommon for each piece of data in your application to be completely separate. When Promise-based code and Observable-based code interact, or calculations need to be done involving values from both, it can be confusing and error-prone to be converting back and forth. It is far easier to just use Observables for everything.

Plus, thanks to the shorthand we just discussed, changing from a Promise to an Observable is as simple as changing then to subscribe (with the exception of chaining, which has slightly different syntax that we’ll cover later).

somePromise.then(value => {
    // do some stuff
}, error => {
    // do some other stuff
});

someObservable$.subscribe(value => {
    // do some stuff
}, error => {
    // do some other stuff
});
