Marmicode
Blog Post
Younes Jaaidi

End-to-End HTTP request cancelation with RxJS & NestJS

by Younes Jaaidi • 
Feb 20, 2020 • 6 minutes
End-to-End HTTP request cancelation with RxJS & NestJS

Life is too short. When searching for something, we can’t afford to type a whole word or sentence in a search field, or filling all the fields then hitting our old keyboard’s half-broken enter key to finally be able to see the first results... or nothing at all because our search criteria were too restrictive.

Don’t look at me like that! We can probably agree that most of us, if not all, are . We get frustrated every time we have to submit a search form.

TL;DR:

🚨 Reactive Programming & RxJS to the rescue

Implementing these kinds of features can be tricky, especially if developed from scratch and with an imperative approach. That’s when reactive programming and RxJS come to the rescue. In fact, RxJS provides the right tooling and operators to implement these features in a few lines. RxJS is such a perfect fit for these scenarios that most courses and tutorials cover the live search topic. It helps understand both how reactive programming works and how it can easily solve some challenging issues.

That’s when we end up with this common recipe:

keywords$ = this.keywordsControl.valueChanges;
data$ = keywords$.pipe(
  /* Wait for the user to stop typing for 100ms and emit last value. */
  debounceTime(100),
  /* Ignore identical successive values
   * (e.g. user pastes the same value in the input). */
  distinctUntilChanged(), 
  /* when new keywords are emitted, this unsubscribes from the previous
   * search result (canceling the underlying http request)
   * and subscribes to the new one. */
  switchMap(keywords => this.search(keywords))
)

function interval(period) {
  return new Observable(observer => {
    let i = 0;
    const handle = setInterval(() => observer.next(i++), period);
    /* This is the teardown logic. */
    return () => clearInterval(handle);
  });
}

function getFiles(directoryPath) {
  return new Observable(observer => {
    ...
    return () => walker.pause();
  }
}

function readLines(filePath) {
  return new Observable(observer => {
    ...
    return () => reader.close();
  }
}

function search(): Observable<Line[]> {
  return getFiles(nodeModulesPath)
    .pipe(
      mergeMap(file => readLines(file)),
      ...
    );
}

  public async transformToResult(resultOrDeferred: any) {
    if (resultOrDeferred && isFunction(resultOrDeferred.subscribe)) {
      return resultOrDeferred.toPromise();
    }
    return resultOrDeferred;
  }

@Injectable()
export class NoopInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
    return next.handle();
  }
}

@Injectable()
export class UnsubscribeOnCloseInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
    if (context.getType() !== 'http') {
      return next.handle();
    }

    const request = context.switchToHttp().getRequest() as Request;

    const close$ = fromEvent(request, 'close');

    return next.handle().pipe(takeUntil(close$));
  }
}