Let's write some useful custom RxJS operators

Angular Dynamic Overlay

RxJS comes with a decent number of operators out of the box that help you combine, filter or transform observables the way you want. I like to see them as the swiss army knife of observable tools: They will always get the job done no matter what. But sometimes you just don’t want to cut a loaf of bread with the tiny swiss army knife. I mean - it works, but it’s much nicer to cut your bread with a bread knife. So lets make some RxJS bread knives!

We’ll be hands on and there won’t be too much theory in this post. The operators we are about to write are all based on existing RxJS operators. If you are looking for a lower level look into the fundamentals of writing operators from scratch take a look at this article by Netanel Basal.

All code mentioned in this post is also available in this GitHub Repo.
Let’s get started!

tapLog

To warm up, lets start with a classic example of custom RxJS operators: Logging emissions of an observable. We all know, most devs shy away from “advanced” debugging tools like breakpoints and the vscode debugger. So naturally the first thing we want to do if something does not work as expected is console-logging emissions of an observable to know what actually is going on.
The usual way to achieve this is by using the tap operator and then logging the emitted value to the console.

1
2
3
4
5
6
const fruits$ = from(['banana', 'coconut', 'pineapple'])
fruits$.pipe(tap((v) => console.log(v))).subscribe()
// Log output:
// banana
// coconut
// pineapple

This approach is not too bulky at first glance, but if you want to use this on a regular basis you quickly get annoyed by writing the same tap over and over again. Writing a custom operator can fix that! Also this easy example is a great way to understand how RxJS operators work:

1
2
3
4
5
6
7
8
9
// an operator always has to accept an observable as input
// and return one again to allow piping multiple operators
export function tapLog<T>( // the generic parameter helps us conserve the type of the observable
prefix = 'Tap Log: ' // add an optional parameter to prefix the logging
) {
return (source$: Observable<T>): Observable<T> =>
// on te source observable we can pipe any existing operators
source$.pipe(tap((value) => console.log(prefix, value)))
}

So basically an operator is a function that accepts and returns an observable and adds some logic to the stream in between.

Using our new tapLog operator, we now can conveniently log values of any existing observable with ease:

1
2
3
4
5
6
const fruits$ = from(['banana', 'coconut', 'pineapple'])
fruits$.pipe(tapLog()).subscribe()
// Log output:
// Tap Log: banana
// Tap Log: coconut
// Tap Log: pineapple

As a tiny bonus feature we can also pass a custom prefix to the operator to make it easier to find the log output in the console!

invert

Another operation that i see myself writing over and over again is probably even more simple: Logically inverting a boolean (or actually any truthy) value.

1
2
3
4
5
6
7
export function invert() {
return (source$: Observable<boolean>): Observable<boolean> =>
source$.pipe(
map((v) => !v),
distinctUntilChanged() // to avoid redundant boolean emissions
)
}

Using the invert operator, this

1
obs$.pipe(map((v) => !v))

becomes this:

1
obs$.pipe(invert())

Just like tapLog, it does not look like a huge difference at first glance, but keep in mind if we use this all over the place in a large project, every instance of mapping and inverting adds another logical branch to the complexity of your code.

every, none and some

Now lets have a look at three new operators together: every, none and some.

Especially when using any redux-ish state management in a larger project, you will occasionally want to aggregate observables using combineLatest. Let’s look at a real world example for this:

1
2
3
4
5
const buttonDisabled$ = combineLatest([
conditionA$,
conditionB$,
conditionC$,
]).pipe(map(([conA, conB, conC]) => conA && conB && conC))

Here we want to determine if we need to disable a button, based on the truthyness of three values that all come from different observables. To achieve that, we use map and then just check all of them for truthyness individually. This again comes with the cost of increasing complexity and being a little verbose. Of course we can also tackle this issue with custom operators. In this case i opted for creating three of them for the most common combinations of conditions:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Returns a truthy observable if all input values are truthy
*/
export function every<T>() {
return (source$: Observable<any[]>): Observable<boolean> =>
source$.pipe(
map((values: any[]) => !values.some((v) => !v)),
distinctUntilChanged()
)
}

/**
* Returns a truthy observable if no input values are truthy
*/
export function none<T>() {
return (source$: Observable<any[]>): Observable<boolean> =>
source$.pipe(
map((values: any[]) => !values.some((v) => !!v)),
distinctUntilChanged()
)
}

/**
* Returns a truthy observable if at least one input value is truthy
*/
export function some<T>() {
return (source$: Observable<any[]>): Observable<boolean> =>
source$.pipe(
map((values: any[]) => values.some((v) => !!v)),
distinctUntilChanged()
)
}

Using these operators, our button disabled example becomes much easier to read. Also the other conditions can be implemented quick and easy:

1
2
3
4
5
const conditions$ = combineLatest([conditionA$, conditionB$, conditionC$])

const allTruthy$ = conditions$.pipe(every())
const allFalsy$ = conditions$.pipe(none())
const atLeastOneTruthy$ = conditions$.pipe(some())

notEmpty

Now let’s get into the realm of filtering operators. Let’s start again with a super simple but yet also super useful one. Let’s say we have an observable that emits an array and we need to ensure it never emits an empty array.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
export function notEmpty<T>() {
return (source$: Observable<T[]>): Observable<T[]> =>
source$.pipe(
filter((values: T[]) => values.length > 0),
distinctUntilChanged()
)
}

// usage
const obs$ = from([['banana'], [], ['coconut']])
obs$.pipe(notEmpty()).subscribe()

// Emissions:
// ['banana']
// ['coconut']

This approach can easily be adapted to many kinds of empty checks. Empty objects, null-ish values, empty properties, zero values etc.
Sky is the limit.

distinctUntilArrayChanged

A filtering operator that you most likely already know and use a lot is distinctUntilChanged. It filters redundant successive emissions and is therefore especially useful to mitigate unneeded expensive operations like re-rendering of your templates (looking at you, angular ;).
The downside is though, that it only works with primitive values, as the operator uses a simple strict equality check (===) under the hood. We can pass a compare function to work around this, but at this point you might have guessed already that this it is not an acceptable solution for me.
The distinctUntilArrayChanged operator makes use of the customization of the normal distinctUntilChanged and encapsulates the logic for checking the (shallow) equality of two arrays.

1
2
3
4
5
6
7
8
9
10
11
12
export function distinctUntilArrayChanged<T, U extends T[] = T[]>() {
return (source$: Observable<U>): Observable<U> =>
source$.pipe(
distinctUntilChanged((prevArr: U, nextArr: U): boolean => {
const indexes = prevArr.map((_, i) => i)
return (
prevArr.length === nextArr.length &&
indexes.every((cur) => prevArr[cur] === nextArr[cur])
)
})
)
}

Usage:

1
2
3
4
5
6
const obs$ = from([['foo', 'bar'], ['foo', 'bar'], ['foo']])
obs$.pipe(distinctUntilArrayChanged()).subscribe()

// emissions:
// ['foo', 'bar']
// ['foo']

It is possible to extend this operator again to add a deep equality check between the arrays if needed. You could consider adding a check using deep equal or add your own logic that is optimized for your exact use case.

unPluck

This one I would consider at least “special interest”, as its usage really requires a very specific scenario.

Let’s say we need to combine emissions from different observables into a single one with an object containing all the values from the others. This could look something like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const banana$ = from(['banana value'])
const coconut$ = from(['coconut value'])

merge(
banana$.pipe(map((banana) => ({ banana }))),
coconut$.pipe(map((coconut) => ({ coconut })))
)
.pipe(scan((acc, val) => ({ ...acc, ...val })))
.subscribe()

// ultimate emission:
// {
// banana: 'banana value',
// coconut: 'coconut value'
// }

To simplify this, we can create an operator that constructs the partial objects for us:

1
2
3
4
5
6
7
export function unPluck<T, K extends string | number = string>(key: K) {
return (source$: Observable<T>) =>
source$.pipe(
distinctUntilChanged(),
map((value: T) => ({ [key]: value } as { [key in K]: T }))
)
}

I named it unPluck because it basically does the opposite of RxJS pluck. Using unPluck the example now looks like this:

1
2
3
4
// `'banana value' -> unPluck('banana') -> { banana: 'banana value' }`
merge(banana$.pipe(unPluck('banana')), coconut$.pipe(unPluck('coconut')))
.pipe(scan((acc, val) => ({ ...acc, ...val })))
.subscribe()

Bonus: selectWithParams

This one is a very specific operator that i use on a daily basis working with memoized projections in ngrx. So if you are not familiar with that you can easily ignore this one.

Using the defaultMemoize function inside of an ngrx selector will result in an Observable containing an object with a function memoized when selecting state. Parameters can be passed to this function to call the business logic inside the selector. An advantage of this approach is the built-in memoization of former calls to the parametrized selector. A disadvantage is the bulky way to actually get to the memoized function when selecting state. Usually this would look something like this:

1
2
3
4
5
6
7
8
9
10
namespace fromExample {
// definition of the parametrized selector using ngrx `defaultMemoize`
export const selectById = createSelector(state, (s) =>
defaultMemoize((id: string) => e[id])
)
}
// selecting state using the parametrized selector, mapping to the memoized function and passing params
const selectedObject$ = this.store
.select(fromExample.selectById)
.pipe(map(({ memoized }) => memoized('exampleid')))

Now if you have to deal with a bunch of this type of selectors, it gets annoying very fast to always map to the memoized function and then passing parameters inside the map. To simplify this process we can encapsulate the logic of selecting state and mapping to the memoized function in a new custom operator:

1
2
3
4
5
6
7
8
9
10
export function selectWithParams<State, Proj extends (...args: any[]) => any>(
mapFn: (state: State) => MemoizedProjection,
...params: Parameters<Proj>
) {
return (source$: Observable<State>) =>
source$.pipe(
select(mapFn),
map(({ memoized }) => memoized(...params))
)
}

Using this, selecting state with a parametrized selector looks nice and clean:

1
2
3
4
const selectedObject$ = this.store.selectWithParams(
fromExample.selectById,
'exampleid'
)

And now: You!

I hope I was able to show you how to create different kinds of useful every-day RxJS operators in this post. Of course there are virtually endless possibilities of creating even more of them. I’m curious to hear from you what other operators should be created. If you have an idea or already wrote some that you want to share, drop me a DM on any platform or just do a PR on the repo with your custom operator. Maybe at some point its even worth it to release an npm package :)
Until then i can only say thanks for reading this post and have a good day.

PS. I promise the next one is not gonna take a year again!