Marmicode
Blog Post
Younes Jaaidi

End-to-End HTTP request cancelation with RxJS & NestJS

by Younes Jaaidi • Feb 20, 2020 • 6 minutes

Life is too short. When searching for something, we can’t afford to type a whole word or sentence in a search field, or to fill in all the fields and then hit our old keyboard’s half-broken enter key, just to finally see the first results... or nothing at all because our search criteria were too restrictive.

Don’t look at me like that! We can probably agree that most of us, if not all, are used to features like typeahead and live search results. We get frustrated every time we have to submit a search form.

TL;DR:

  • if you are using NestJS, you will need this interceptor,
  • if you are not using NestJS then maybe you should,
  • we have to think reactively. I agree that it can have a steep learning curve, but think about the pleasure of sliding down the other side of the hill ⛷,
  • we can and should use RxJS everywhere,
  • we should use observables even for single value streams,
  • we should not ignore observables teardown logic.

🚨 Reactive Programming & RxJS to the rescue

Implementing these kinds of features can be tricky, especially if developed from scratch and with an imperative approach. That’s when reactive programming and RxJS come to the rescue. In fact, RxJS provides the right tooling and operators to implement these features in a few lines. RxJS is such a perfect fit for these scenarios that most courses and tutorials cover the live search topic. It helps us understand both how reactive programming works and how it can easily solve some challenging issues.

That’s when we end up with this common recipe:

keywords$ = this.keywordsControl.valueChanges;
data$ = this.keywords$.pipe(
  /* Wait for the user to stop typing for 100ms and emit last value. */
  debounceTime(100),
  /* Ignore identical successive values
   * (e.g. user pastes the same value in the input). */
  distinctUntilChanged(),
  /* When new keywords are emitted, this unsubscribes from the previous
   * search result (canceling the underlying http request)
   * and subscribes to the new one. */
  switchMap(keywords => this.search(keywords))
);
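
To make the "cancel the previous search" behavior of switchMap a bit more concrete, here is a minimal sketch that does not use RxJS at all. It swaps the observable subscription for an AbortController, and LatestOnly is a hypothetical helper name, not something provided by RxJS or Angular.

```typescript
/* Hypothetical helper (not an RxJS API): mimics switchMap's
 * "cancel the previous one" behavior using AbortController. */
class LatestOnly {
  private current: AbortController | undefined;

  /* Aborts the previous run, if any, and returns a signal for the new one. */
  start(): AbortSignal {
    this.current?.abort();
    this.current = new AbortController();
    return this.current.signal;
  }
}

const latest = new LatestOnly();
const firstSearch = latest.start(); // user typed "r"
const secondSearch = latest.start(); // user typed "rx" before "r" came back
/* firstSearch.aborted is now true, secondSearch.aborted is still false. */
```

In a real app, each signal would be handed to the request itself (for instance through the signal option of fetch), which is roughly what unsubscribing from an HTTP observable does for us.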

The illustration below might help you notice the difference between RxJS flattening strategies and the related operators:

RxJS Flattening Strategies

but if it doesn't help, you should definitely check out the great work by my buddy Shai Reznik: https://medium.com/@shairez/a-super-ninja-trick-to-learn-rxjss-switchmap-mergemap-concatmap-and-exhaustmap-forever-88e178a75f1b

🐢 Hey Debounce! Stop bullying my low latency!

The problem is that you are probably investing a lot of energy and money in producing low latency architectures and APIs but all these efforts just vanish when we introduce the artificial latency created by the debounceTime operator.

What if we just get rid of the debounce? We are using switchMap after all, and unnecessary requests are immediately canceled.

Request cancelation

Wait a second! What happens on the back-end? Is the back-end "work" interrupted by some voodoo magic? Or did we just trigger some crazy chaos where the back-end is working for nothing until it realizes that the consumer is not there anymore?

🐈 Here comes the big cat

In a few words, NestJS is THE feature-rich NodeJS framework.

Among its wealth of features is native support for observables. This is quite handy even if we respond with a single value and not a stream of values. In fact, the observable property we are interested in here is cancelability.

🧨 Observables Teardown Logic

Observables are said to be cancelable because we can unsubscribe whenever we need to and interrupt the work. Cancelation works thanks to the teardown logic function returned when creating an observable.

Here’s an example of wrapping setInterval in an observable:

import { Observable } from 'rxjs';

function interval(period) {
  return new Observable(observer => {
    let i = 0;
    const handle = setInterval(() => observer.next(i++), period);
    /* This is the teardown logic. */
    return () => clearInterval(handle);
  });
}

As you can see, the subscribe function given to the Observable's constructor returns the teardown logic function, which calls clearInterval in order to cancel the tasks scheduled by setInterval.

⚠️ This is exactly how you should NOT implement an interval. This implementation is scheduler-naive. You should use RxJS's built-in interval or timer instead.
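
If you want to see the teardown mechanics without timers or even RxJS, here is a rough, dependency-free sketch. TinyObservable, ManualTicker and ticks are hypothetical names made up for this illustration. They are a stripped-down stand-in and not how RxJS is actually implemented.

```typescript
/* Hypothetical, stripped-down stand-in for RxJS's Observable (illustration only). */
type Teardown = () => void;
type Producer<T> = (next: (value: T) => void) => Teardown;

class TinyObservable<T> {
  private readonly producer: Producer<T>;

  constructor(producer: Producer<T>) {
    this.producer = producer;
  }

  /* Starts the producer and returns a function that stops it. */
  subscribe(next: (value: T) => void): Teardown {
    let closed = false;
    const teardown = this.producer((value) => {
      if (!closed) next(value);
    });
    return () => {
      if (closed) return;
      closed = true;
      teardown(); // this is where the work actually gets interrupted
    };
  }
}

/* Hypothetical event source that we drive by hand instead of using a real timer. */
class ManualTicker {
  private readonly listeners = new Set<() => void>();
  add(listener: () => void): void { this.listeners.add(listener); }
  remove(listener: () => void): void { this.listeners.delete(listener); }
  tick(): void { this.listeners.forEach((listener) => listener()); }
  get listenerCount(): number { return this.listeners.size; }
}

function ticks(ticker: ManualTicker): TinyObservable<number> {
  return new TinyObservable<number>((next) => {
    let i = 0;
    const listener = () => next(i++);
    ticker.add(listener);
    /* Teardown logic: detach from the source so no more work is scheduled. */
    return () => ticker.remove(listener);
  });
}

const ticker = new ManualTicker();
const received: number[] = [];
const unsubscribe = ticks(ticker).subscribe((value) => received.push(value));

ticker.tick();
ticker.tick();
unsubscribe();
ticker.tick(); // nobody is listening anymore, so nothing is emitted
```

The point to notice is that unsubscribing does more than ignore future values: it calls the function returned by the producer, which is the only place where the underlying work can really be stopped.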

🧪 The experiment

For the experiment, I needed to run some slow CPU, filesystem and memory intensive work on the back-end for every request. The first idea that crossed my mind was reading a big text file line by line and matching every one of them against the given keywords. It turned out that even with a 1GB file, it was still quite fast.

That’s when I thought that reading multiple small files should be less efficient. I just needed to generate a directory with lots of files... but wait! What about using the node_modules directory 🤔

Bingo! It could not be worse and that is exactly what I needed.

The implementation looks something like this and as you can see, the teardown logic immediately stops crawling the directory and reading files when the observer unsubscribes.

function getFiles(directoryPath) {
  return new Observable(observer => {
    ...
    /* Teardown logic: stop crawling the directory. */
    return () => walker.pause();
  });
}

function readLines(filePath) {
  return new Observable(observer => {
    ...
    /* Teardown logic: stop reading the file. */
    return () => reader.close();
  });
}

function search(): Observable<Line[]> {
  return getFiles(nodeModulesPath)
    .pipe(
      mergeMap(file => readLines(file)),
      ...
    );
}

😔 The disappointment

In the animation below, we can observe high CPU usage and exponentially growing memory usage on the back-end. We can also see that canceling the requests, even the last one, doesn’t interrupt the work.

CPU & Memory Usage

By diving a little bit into Nest’s source code, we can see that our observable is converted to a promise using the toPromise method. In fact, Nest has to adapt to frameworks like ExpressJS that don’t handle observables.

public async transformToResult(resultOrDeferred: any) {
  if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
    return resultOrDeferred.toPromise();
  }
  return resultOrDeferred;
}

🔍 Detecting request cancelation

In Nest, request objects are instances of NodeJS’ IncomingMessage, which emit a close event when the connection is closed or when the HTTP2 stream is closed.

If we can detect when the request is canceled, then we can interrupt the work in our RxJS response stream.
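
Before bringing Nest into the picture, the idea can be sketched in a framework-free way: listen for close, flip a cancelation flag, and let the slow work check that flag. Everything named here (Closable, FakeRequest, abortOnClose, runSteps) is hypothetical and only for illustration. The fake request stands in for a real IncomingMessage, and AbortController stands in for unsubscribing from an observable. Also keep in mind that the exact timing of the close event depends on the Node.js version you run, so it is worth checking its documentation.

```typescript
/* Minimal shape we rely on. A real IncomingMessage is an EventEmitter
 * and offers a compatible once() method. */
interface Closable {
  once(event: 'close', listener: () => void): unknown;
}

/* Hypothetical fake request so the sketch runs without an HTTP server. */
class FakeRequest implements Closable {
  private listeners: Array<() => void> = [];

  once(_event: 'close', listener: () => void): this {
    this.listeners.push(listener);
    return this;
  }

  /* Simulates the client dropping the connection. */
  simulateClose(): void {
    const pending = this.listeners;
    this.listeners = [];
    pending.forEach((listener) => listener());
  }
}

/* Returns a signal that is aborted as soon as the request emits close. */
function abortOnClose(request: Closable): AbortSignal {
  const controller = new AbortController();
  request.once('close', () => controller.abort());
  return controller.signal;
}

/* Hypothetical slow job that checks the signal between steps. */
function runSteps(
  signal: AbortSignal,
  maxSteps: number,
  onStep: (step: number) => void,
): number {
  let steps = 0;
  while (steps < maxSteps && !signal.aborted) {
    steps++;
    onStep(steps);
  }
  return steps;
}

const fakeRequest = new FakeRequest();
const closeSignal = abortOnClose(fakeRequest);

/* The "client" goes away during step 3, so the remaining steps are skipped. */
const completedSteps = runSteps(closeSignal, 10, (step) => {
  if (step === 3) fakeRequest.simulateClose();
});
```

With observables, the flag checking disappears: unsubscribing runs the teardown logic for us, which is why the rest of this post wires the close event to the response stream instead.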

Nest has an interesting concept called interceptors. A minimal interceptor that does nothing looks like this:

@Injectable()
export class NoopInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
    return next.handle();
  }
}

This makes it possible to write, in a single place, a function that:

  • intercepts every incoming HTTP request,
  • listens to the request’s close event,
  • does something to interrupt the work.

NestJS interceptors are Observable friendly

One of the interesting properties of Nest interceptors, compared to Express middlewares for example, is that the next parameter is not just a function that triggers the route function or the next middleware but it is an object with a handle method that returns an Observable.

Thanks to this feature, we can manipulate the response and the whole stream by adding operators to the given Observable.

For instance, we can detect the request cancelation by listening to the close event using RxJS's fromEvent and interrupt the Observable returned by the route handler using the takeUntil operator.

The final interceptor should look like this:

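import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
/* Assumption: the Express platform is used, hence Express's Request type. */
import { Request } from 'express';
import { fromEvent, Observable } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Injectable()
export class UnsubscribeOnCloseInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
    /* Only HTTP requests have a request object we can listen to. */
    if (context.getType() !== 'http') {
      return next.handle();
    }

    const request = context.switchToHttp().getRequest() as Request;

    /* Emits when the request's close event is triggered. */
    const close$ = fromEvent(request, 'close');

    /* Unsubscribe from the route handler's observable when the request is closed. */
    return next.handle().pipe(takeUntil(close$));
  }
}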

Let's try it out!

CPU & memory usage with interceptor

As you can observe, thanks to the interceptor, canceling an HTTP request will automatically and almost immediately cancel the work by unsubscribing from the observable returned by the route handler. This reduces CPU, memory, and other resource usage, and it interrupts all the work even when the user simply closes the window.

🧠 Think reactive

The key takeaway here is that by adopting a reactive approach and using observables everywhere, we can easily benefit from observables' cancelability and boost API performance with a generic interceptor.

MongoDB query cancelation

What if our data source was a database like MongoDB? Can we interrupt the query?

🔗 Links

💻 Source code: Nx monorepo with an Angular app, a NestJS API, and a custom CPU / memory graphing app using Angular & GraphQL subscriptions.