Creating Observables
Before we can start manipulating observables and subscribing to them, we need to create an observable. In the wild, you’ll often receive an observable from a library or framework, or from a `Subject` (we’ll cover that later), but when you do need to create one, these are your options.
of
RxJS provides some helper functions to create an `Observable` from a different type. `of` is the most straightforward, and is commonly used in tests or for cleaning up edge cases of logic involving `Observable`s. As we briefly mentioned in the previous section, `of` creates an `Observable` that emits a single value and then completes. The parameter passed to it is the value delivered to `next`.
const foo$ = of('foo');
foo$.subscribe({
next: x => console.log(`received value: ${x}`),
error: e => console.error(`ERROR: ${e}`),
complete: () => console.log('complete')
})
received value: foo
complete
This is very useful for testing, when you just need to create an observable to confirm that it is subscribed to with the correct `next` behavior. It is also useful when a function returns an asynchronous value but needs to fall back to a default synchronous value:
function getUserDisplayName(userId: string): Observable<string> {
  if (userId) {
    // fetchUserDisplayName is a hypothetical async lookup returning Observable<string>
    return fetchUserDisplayName(userId);
  } else {
    return of('Anonymous');
  }
}
Note: This is the RxJS equivalent of Promise.resolve()
.
throwError
The flip side of `of` is `throwError`. This creates an `Observable` that immediately throws the provided error. This is useful in similar situations to `of`, but when handling the error case.
Example:
const foo$ = throwError('foo');
foo$.subscribe({
next: x => console.log(`received value: ${x}`),
error: e => console.error(`ERROR: ${e}`),
complete: () => console.log('complete')
});
x ERROR: foo
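As a rough sketch of where `throwError` tends to show up, the function below mirrors the `of` fallback pattern but for an error branch. The `parsePositive` helper is hypothetical, and the factory form `throwError(() => error)` assumes RxJS 7 or later.

```typescript
import { Observable, of, throwError } from 'rxjs';

// Hypothetical helper: validates synchronously but always returns an Observable.
function parsePositive(input: string): Observable<number> {
  const value = Number(input);
  if (Number.isNaN(value) || value <= 0) {
    // Assumes RxJS 7+: the factory builds the error when someone subscribes.
    return throwError(() => new Error(`not a positive number: ${input}`));
  }
  return of(value);
}

const log: string[] = [];
['42', 'abc'].forEach(input => {
  parsePositive(input).subscribe({
    next: x => log.push(`value: ${x}`),
    error: (e: Error) => log.push(`error: ${e.message}`),
    complete: () => log.push('complete')
  });
});

// log: ['value: 42', 'complete', 'error: not a positive number: abc']
console.log(log);
```

Note that the failing call never reaches `complete`: an `Observable` ends with either `error` or `complete`, not both.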
from
Whereas `of` and `throwError` don’t care what they are passed and blindly emit or throw the provided value, `from` attempts to convert existing streams of data into an `Observable`. The idea is for `from` to be a function where you just throw anything vaguely observable-ish into it, and it just “figures it out” to create an `Observable` out of it.
The simplest usage is passing in an array. An `Observable` is created that emits each value of the array immediately, in order, and then completes:
const foo$ = from(['foo', 'bar', 'baz']);
foo$.subscribe({
next: x => console.log(`received value: ${x}`),
error: e => console.error(`ERROR: ${e}`),
complete: () => console.log('complete')
});
received value: foo
received value: bar
received value: baz
complete
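Because `of` emits whatever it is handed as-is, passing an array to `of` behaves differently from passing it to `from`. The sketch below counts emissions for each variant; the variable names are illustrative only.

```typescript
import { from, of } from 'rxjs';

const letters = ['foo', 'bar', 'baz'];
const fromArray: unknown[] = [];
const ofArray: unknown[] = [];
const ofSpread: unknown[] = [];

from(letters).subscribe(x => fromArray.push(x)); // emits each element
of(letters).subscribe(x => ofArray.push(x));     // emits the array itself, once
of(...letters).subscribe(x => ofSpread.push(x)); // emits each argument

// Prints: 3 1 3
console.log(fromArray.length, ofArray.length, ofSpread.length);
```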
The `from` function can also take a `Promise`. It is converted into an `Observable` that emits and completes when the `Promise` resolves, or errors when the `Promise` rejects:
const foo$ = from(new Promise((resolve, reject) => {
setTimeout(() => resolve('pizza'), 300);
}));
const bar$ = from(new Promise((resolve, reject) => {
setTimeout(() => reject('taco'), 900);
}));
foo$.subscribe({
next: x => console.log(`foo received value: ${x}`),
error: e => console.error(`foo ERROR: ${e}`),
complete: () => console.log('foo complete')
});
bar$.subscribe({
next: x => console.log(`bar received value: ${x}`),
error: e => console.error(`bar ERROR: ${e}`),
complete: () => console.log('bar complete')
});
foo received value: pizza
foo complete
bar ERROR: taco
Arrays and `Promise`s are by far the most common uses of `from` in the wild, but it also accepts other things, like iterables and generators. The resulting `Observable` will call `next` for each value in the iterable or generated by the generator, and if the sequence ever ends, it will call `complete` (similarly to an array).
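A minimal sketch of the iterable case, assuming a finite generator (the `countdown` function is made up for illustration) and a `Set` as a second iterable:

```typescript
import { from } from 'rxjs';

// Hypothetical finite generator: yields start, start - 1, ..., 1 and then returns.
function* countdown(start: number): Generator<number> {
  for (let n = start; n > 0; n--) {
    yield n;
  }
}

const seen: Array<number | string> = [];
from(countdown(3)).subscribe({
  next: x => seen.push(x),
  complete: () => seen.push('complete')
});

// Any iterable works the same way; a Set drops the duplicate 'a' before emitting.
const fromSet: string[] = [];
from(new Set(['a', 'b', 'a'])).subscribe(x => fromSet.push(x));

// seen: [3, 2, 1, 'complete'], fromSet: ['a', 'b']
console.log(seen, fromSet);
```

If the generator never returns, the `Observable` never completes on its own, so an infinite generator needs something downstream to stop it.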
Observable.EMPTY
This constant is an observable that never emits or errors. It just completes immediately. In RxJS 6 and later, it is exported from the rxjs package as the standalone constant `EMPTY`.
Observable.EMPTY.subscribe({
next: x => console.log(`received value: ${x}`),
error: e => console.error(`ERROR: ${e}`),
complete: () => console.log('complete')
});
complete
A common mistake is confusing `of()` (that’s `of` with no parameter provided) with `of(undefined)`. It is tempting to assume they are the same, since JavaScript treats any parameter that is not provided as `undefined`. However, `of` is variadic: it emits one value for each argument it actually receives. This means that `of()` has nothing to emit and only completes, exactly like `Observable.EMPTY`, whereas `of(undefined)` received one argument, so it creates an `Observable` that emits `undefined` and then completes.
Compare the following usage of `of(undefined)` to the previous example:
of(undefined).subscribe({
  next: x => console.log(`received value: ${x}`),
  error: e => console.error(`ERROR: ${e}`),
  complete: () => console.log('complete')
});
received value: undefined
complete
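One way to check how the empty constant, `of()`, and `of(undefined)` differ is to record every notification each one delivers. This is a sketch: the `record` helper is hypothetical, and the constant is imported as `EMPTY` from the rxjs package, which is how RxJS 6 and later export it.

```typescript
import { EMPTY, Observable, of } from 'rxjs';

// Hypothetical helper: records every notification delivered synchronously.
function record(source$: Observable<unknown>): string[] {
  const events: string[] = [];
  source$.subscribe({
    next: x => events.push(`next: ${String(x)}`),
    error: e => events.push(`error: ${String(e)}`),
    complete: () => events.push('complete')
  });
  return events;
}

const emptyEvents = record(EMPTY);             // ['complete']
const noArgEvents = record(of());              // ['complete']
const undefinedEvents = record(of(undefined)); // ['next: undefined', 'complete']

console.log(emptyEvents, noArgEvents, undefinedEvents);
```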
new Observable()
If you want to create an `Observable` from scratch with complex logic, you can use its constructor. It takes a function, sort of like `Promise`, but instead of taking `resolve` and `reject` callbacks, this function takes an `Observer`. This function runs whenever `subscribe` is called, generating values for the subscribed `Observer`. Note that even though `subscribe` takes a `Partial<Observer>`, the parameter for this function is just an `Observer`, so you don’t have to worry about nullish-checking the `next`, `error`, and `complete` handlers.
Example:
const foo$ = new Observable(observer => {
observer.next('foo');
setInterval(() => observer.next('bar'), 500);
setTimeout(() => observer.complete(), 5000);
});
foo$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('complete')
});
foo
bar
bar
bar
bar
bar
bar
bar
bar
bar
complete
This emits `'foo'` immediately, then `'bar'` every 500ms until, after 5 seconds, it completes. However, beware of hanging async code when creating `Observable`s manually. Here there is an interval running that is never cleared. When the `Observable` completes, it prevents any future calls to `next` from being passed along to subscribers, but it does not clear your timers for you (or resolve other hanging async activity). Running the above code prints the expected output, but the program never exits, as the `setInterval` is still running in the background, calling `next` and generating no output.
In this particular case, it can be resolved like this:
const foo$ = new Observable(observer => {
observer.next('foo');
const timer = setInterval(() => observer.next('bar'), 500);
setTimeout(() => {
observer.complete()
clearInterval(timer);
}, 5000);
});
foo$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('complete')
});
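The fix above only clears the interval when the 5-second timeout fires. If a subscriber unsubscribes earlier, the timer would still keep running. A common way to cover that case is to return a teardown function from the function passed to the constructor. The sketch below assumes that approach, and the `timerCleared` flag exists only to make the effect visible.

```typescript
import { Observable } from 'rxjs';

let timerCleared = false;

const ticks$ = new Observable<string>(observer => {
  observer.next('foo');
  const timer = setInterval(() => observer.next('bar'), 500);

  // Teardown: RxJS calls this on unsubscribe, and also after complete or error.
  return () => {
    clearInterval(timer);
    timerCleared = true;
  };
});

const received: string[] = [];
const subscription = ticks$.subscribe(x => received.push(x));

// Unsubscribing before any 'bar' arrives still clears the interval.
subscription.unsubscribe();

// received: ['foo'], timerCleared: true
console.log(received, timerCleared);
```

Because the cleanup lives in one place, the same function runs whether the stream ends by itself or the subscriber walks away.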
Also, note that since the `Observer` being passed in isn’t literally the `Observer` provided to `subscribe()` (but rather an object with a reference to your `Observer`’s functions), it needs to preserve `this` when running the handlers. Therefore, you shouldn’t pass its methods directly as callbacks. Wrap them in an arrow function instead:
// Incorrect
const bad$ = new Observable(observer => {
observer.next(42);
setTimeout(observer.complete, 500); // Don't do this
})
bad$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('complete')
});
42
x TypeError: this._complete is not a function
// Correct
const good$ = new Observable(observer => {
observer.next(42);
setTimeout(() => observer.complete(), 500); // Do this instead
})
good$.subscribe({
next: console.log,
error: console.error,
complete: () => console.log('complete')
});
42
complete
Promise vs Observable
Now that we know how to create an `Observable`, we can properly discuss the differences between a `Promise` and an `Observable`.
The difference between a `Promise` and an `Observable` is that a `Promise` represents a single asynchronous value, whereas an `Observable` represents a stream of asynchronous values. `Promise` is to `Observable` as a single value is to an array, and `Promise` is to a single value as `Observable` is to an array.
This chart lays this relationship out more clearly:

| | Single Value | Many Values |
| --- | --- | --- |
| Synchronous | Value | Array |
| Asynchronous | Promise | Observable |
One thing to point out is that we can represent any of these using the type below or to the right of them, but not the other way around, almost like a subset relationship. In this example, we’ll avoid using things like `of` and `from` for the sake of demonstrating the behavior:
const aValue = 5;
const aArray = [5]; // Equivalent to [aValue]
const aPromise = new Promise((resolve, _) => resolve(5)); // Equivalent to Promise.resolve(aValue)
const aObservable = new Observable(observer => {
observer.next(5);
observer.complete();
}); // Equivalent to of(aValue)
const bArray = [5, 8, 9, 1];
const bObservable = new Observable(observer => {
observer.next(5);
observer.next(8);
observer.next(9);
observer.next(1);
observer.complete();
}); // Equivalent to from(bArray)
const cPromise = new Promise((resolve, reject) => {
setTimeout(() => resolve(8), 900);
});
const cObservable = new Observable(observer => {
  setTimeout(() => {
    observer.next(8);
    observer.complete(); // from(cPromise) also completes after emitting
  }, 900);
}); // Equivalent to from(cPromise)
const dObservable = new Observable(observer => {
observer.next(5);
if (flipACoin()) {
observer.next(6);
setTimeout(() => observer.next(82), 500);
setTimeout(() => {
observer.next(503);
if (flipACoin()) {
observer.complete();
} else {
setTimeout(() => observer.complete(), 800);
observer.next(47);
}
})
} else {
observer.next(18);
observer.error({ error: 'it failed' });
}
});
A couple things to note for the above example:
First of all, you might be thinking, “But you can represent an array using a `Promise`! Just use `Promise.resolve([5, 8, 9, 1])`!” (or `new Promise((resolve, _) => resolve([5, 8, 9, 1]))`). The problem here is that the `Promise` still isn’t aware of the multiple values. In order to access them or remove/insert values, we need to first resolve the `Promise` so we’re working directly with the array, and then manipulate the array. The key thing here is that the multiple values are still being handled by the array, not the `Promise` itself, whereas the `Observable` needs no array. It handles each value itself.
Second, we can notice something interesting here. Notice how `dObservable` has logic in it, but `cObservable` and `cPromise` do not? This is due to another difference between `Promise` and `Observable`: both take a function in their constructor, but the function for the `Observable` is evaluated every time it is subscribed to, whereas the function for a `Promise` is only evaluated once when the constructor is called. For example:
const foo = new Promise((resolve, reject) => {
if (flipACoin()) {
resolve(generateRandomNumber());
} else {
reject({ error: 'it failed' });
}
});
foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
foo.then(console.log, console.error);
** First Run **
0.15903534568282507
0.15903534568282507
0.15903534568282507
0.15903534568282507
0.15903534568282507
** Second Run **
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }
x { error: 'it failed' }
** Third Run **
0.27123801558195737
0.27123801558195737
0.27123801558195737
0.27123801558195737
0.27123801558195737
Here when we ran the code, we could see that it either resolves with the same random number 5 times or rejects with the same error 5 times. What if we used the same(ish) code to create an Observable?
const foo$ = new Observable(observer => {
if (flipACoin()) {
observer.next(generateRandomNumber());
observer.complete();
} else {
observer.error({ error: 'it failed' });
}
});
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
foo$.subscribe(console.log, console.error);
** First Run **
x { error: 'it failed' }
x { error: 'it failed' }
0.10426312822332862
0.06675352215521824
x { error: 'it failed' }
** Second Run **
x { error: 'it failed' }
x { error: 'it failed' }
0.031318969080073256
x { error: 'it failed' }
0.12634657092572255
Here, we can see that the code re-ran for each subscription and generated new random numbers. If we want to achieve the behavior of a `Promise` where the code is evaluated once, there are a few options. The one I would recommend is using a `Subject`, which we will cover later.
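To make the re-evaluation deterministic, the sketch below counts how often the constructor function runs instead of relying on random output. It also shows one of the other options for evaluating once, the `shareReplay` operator. This is not the `Subject` approach recommended above, just an illustration, and the variable names are made up.

```typescript
import { Observable } from 'rxjs';
import { shareReplay } from 'rxjs/operators';

let executions = 0;

const cold$ = new Observable<number>(observer => {
  executions++; // counts how often this function runs
  observer.next(Math.random());
  observer.complete();
});

const coldValues: number[] = [];
cold$.subscribe(x => coldValues.push(x));
cold$.subscribe(x => coldValues.push(x));
const coldExecutions = executions; // 2: once per subscription

executions = 0;
const shared$ = cold$.pipe(shareReplay(1));
const sharedValues: number[] = [];
shared$.subscribe(x => sharedValues.push(x));
shared$.subscribe(x => sharedValues.push(x));
const sharedExecutions = executions; // 1: the second subscriber gets the replayed value

console.log(coldExecutions, sharedExecutions, sharedValues[0] === sharedValues[1]);
```

With `shareReplay(1)`, the first subscription triggers the underlying function and later subscribers receive the cached value, which is close to how a `Promise` behaves.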
So, which should I use?
In situations where we would normally use a `Promise`, if we’re using RxJS, we may want to consider an `Observable` instead. This may seem like an “I have a hammer, so everything’s a nail” situation at first, but there is a benefit to using `Observable`s.
First of all, as we’ll discuss later, `Observable` operators are very convenient, even when working with only a single value.
Second, it is easier to keep track of one system for asynchronous values, rather than needing to consider which we’re working with in each situation and remembering the differences.
Finally, it is uncommon for each piece of data in your application to be completely separate. When `Promise`-based code and `Observable`-based code interact, or calculations need to be done involving values from both, it can be confusing and error-prone to be converting back and forth. It is far easier to just use `Observable`s for everything.
Plus, thanks to the shorthand we just discussed, changing from a `Promise` to an `Observable` is as simple as changing `then` to `subscribe` (with the exception of chaining, which has slightly different syntax that we’ll cover later).
somePromise.then(value => {
// do some stuff
}, error => {
// do some other stuff
});
someObservable$.subscribe(value => {
// do some stuff
}, error => {
// do some other stuff
});