Skip to main content

What is RxJS?

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using Observables. Reactive programming is a programming paradigm that deals with asynchronous data streams (sequences of events) and the propagation of change. Here's why you might consider using RxJS:

  1. Handling Asynchronous Operations: Traditional callbacks and promises are the common ways to deal with asynchronous operations in JavaScript. However, they can become cumbersome when dealing with complex scenarios, especially when chaining multiple async operations. RxJS Observables provide a more powerful and flexible way to handle asynchronous tasks.

  2. Unified API for Different Kinds of Data Sources: Whether you're dealing with events, AJAX requests, WebSockets, or even Promises, RxJS provides a consistent API to handle them all.

  3. Powerful Operators: RxJS comes with a wide range of operators that can be used to filter, transform, combine, and manipulate data streams. This allows for elegant solutions to complex problems.

  4. Error Handling: RxJS provides robust mechanisms for handling errors throughout the entire chain of operations.

  5. Lazy Computation: Observables are lazy by nature. This means they don't start producing values until someone subscribes to them. This can be advantageous in terms of performance and resource management.

  6. Cancelling Subscriptions: With RxJS, you can unsubscribe from an Observable, effectively canceling an ongoing async operation. This is particularly useful for avoiding potential memory leaks and unnecessary operations.

  7. Debugging Tools: Tools like rxjs-spy allow for debugging of reactive streams, making it easier to trace and diagnose issues in complex applications.

  8. Wide Adoption in Frameworks/Libraries: Libraries and frameworks like Angular heavily use RxJS, making it a crucial part of the ecosystem.

  9. Backpressure Management: RxJS offers tools to handle backpressure – a situation where data is produced faster than it can be consumed. This is particularly useful in scenarios like streaming large amounts of data over WebSockets.

  10. Clear Separation of Concerns: RxJS can help in structuring applications in a way that separates side effects from the core logic, leading to more maintainable and testable code.

While RxJS offers numerous advantages, it also comes with a learning curve. It introduces a shift in thinking, and developers need to familiarize themselves with the concepts of reactive programming. However, once grasped, it can lead to cleaner, more scalable, and more maintainable code, especially in applications that deal with a high degree of asynchronicity.

Why we need to use RxJS Operators?

RxJS operators are functions that can be used with Observables to create new Observables, transform the emitted items, handle errors, or perform other operations on the data stream. Using operators is central to the power and flexibility of RxJS. Here are the benefits of using RxJS operators:

  1. Data Transformation: Operators like map, pluck, and scan allow you to transform the items emitted by an Observable. This means you can easily modify data in the stream to suit your application's needs without altering the source Observable.

  2. Filtering: With operators like filter, take, skip, and distinct, you can selectively emit specific items from an Observable, allowing you to process only the data you're interested in.

  3. Combining Multiple Streams: Operators such as merge, concat, combineLatest, and zip let you combine multiple Observables into a single Observable, enabling powerful ways to coordinate and combine asynchronous tasks.

  4. Error Handling: Operators like catchError and retry provide robust mechanisms to handle errors in the Observable chain, ensuring that your application remains resilient in the face of unexpected issues.

  5. Controlling the Flow of Data: Operators like debounceTime, throttleTime, and buffer allow you to control the rate at which data is processed, which can be essential for scenarios like search-as-you-type to prevent overloading the server with requests.

  6. Conditional and Boolean Operations: Using operators like every, some, and isEmpty, you can perform conditional checks on the items emitted by an Observable.

  7. Utility Operations: Operators like tap (formerly do), delay, and timeout offer utility functions that can be useful for debugging, logging, or introducing delays in the data stream.

  8. Resource Management: Operators like finalize (formerly finally) and using help in managing resources, ensuring that certain actions are taken when the Observable completes or errors out.

  9. Conversion: Operators such as toArray, toPromise, and pairwise allow you to convert the data stream into different forms, making it more suitable for specific use cases.

  10. Increased Readability: By chaining operators, you can express complex asynchronous logic in a more readable and declarative manner. This can make the code easier to understand and maintain.

  11. Optimized Performance: Some operators can improve performance by reducing the amount of unnecessary computation or network requests. For instance, the switchMap operator can cancel redundant network requests if a new value arrives before the previous request completes.

In essence, RxJS operators empower developers to handle asynchronous data streams with precision and expressiveness. They provide the tools to create, transform, and manage Observables, making it easier to build reactive applications that are both powerful and maintainable.

RxJs building blocks?

Certainly. RxJS provides several core building blocks that are essential for creating and working with reactive data streams. Here are the primary building blocks of RxJS:

  1. Observables:

    • The central concept in RxJS. An Observable represents a lazy push-based collection of multiple values over time.
    • Observables produce values and "emit" them to subscribers.
  2. Observers:

    • An object with callback functions that will be notified of new values (next), errors (error), or completion (complete) of the Observable.
    • It's essentially the consumer of the values emitted by the Observable.
  3. Subscription:

    • Represents the execution of an Observable.
    • When you subscribe to an Observable, you get a Subscription object, which you can use to unsubscribe (or cancel) the execution.
  4. Operators:

    • Pure functions that are used to transform or modify data sequences.
    • They can be chained together using the pipe method.
    • Examples include map, filter, merge, and concat.
  5. Subjects:

    • A special type of Observable that can multicast to many Observers.
    • Unlike plain Observables, Subjects maintain a list of subscribers and can have values "nexted" into them.
    • Types of Subjects: Subject, BehaviorSubject, ReplaySubject, and AsyncSubject.
  6. Schedulers:

    • A central dispatcher for controlling concurrency, allowing you to decide when computation happens (e.g., immediately, as an animation frame, or at an interval).
    • Examples include asapScheduler, queueScheduler, and animationFrameScheduler.
  7. Pipe:

    • A method used for chaining multiple operators together.
    • Operators are piped to Observables, and the result is a new Observable with the applied transformations.
  8. Notification:

    • Represents the idea of push-based events with values, errors, or completion signals.
    • Can be used for meta-operations and custom handling of Observable behaviors.
  9. Marble Testing:

    • A visual representation and testing methodology for Observables using diagrams.
    • Allows for easy and intuitive testing of complex asynchronous operations.
  10. Error Handling:

  • RxJS provides mechanisms to handle errors gracefully using operators like catchError and retry.
  1. Utilities:
  • Functions like create, of, from, and interval that help in creating and working with Observables.

Each of these building blocks plays a crucial role in the RxJS ecosystem, allowing developers to create, transform, and manage reactive data streams effectively. Understanding these core concepts is vital for anyone looking to harness the full power of RxJS in their applications.

What is Observables in RxJS?

In RxJS, an Observable is a foundational building block that represents a lazy push-based collection of values over time. It's the core concept around which most of RxJS revolves. Here's a deeper dive into Observables:

  1. Definition:

    • An Observable is akin to a stream, which can emit multiple values over time, either synchronously or asynchronously.
    • It can emit zero or more values and then complete or produce an error.
  2. Creation:

    • Observables can be created using various utility functions provided by RxJS, such as of, from, interval, and create.
  3. Subscription:

    • Observables are lazy by nature, meaning they don't produce values until they are subscribed to.
    • When you subscribe to an Observable using its subscribe method, the Observable begins to emit values to a subscriber (an Observer).
  4. Observer:

    • The subscriber or consumer of an Observable's values is known as an Observer.
    • An Observer is an object with callback methods: next, error, and complete.
    • The next method is called to receive emitted values, error is called if there's an error during execution, and complete is called when the Observable has finished emitting values.
  5. Completion & Error:

    • An Observable can complete, signaling that it won't emit any more values.
    • It can also produce an error, indicating that something went wrong during its execution. When an error occurs, the Observable stops emitting values.
  6. Multicasting:

    • By default, Observables are unicast, meaning each subscriber gets its own execution of the Observable.
    • However, Observables can be converted into multicasting using Subjects, where multiple subscribers share the same execution.
  7. Unsubscribing:

    • To release resources and prevent potential memory leaks, it's essential to unsubscribe from an Observable when you no longer need to listen to it. The subscribe method returns a Subscription object that has an unsubscribe method for this purpose.
  8. Operators:

    • Observables can be transformed, filtered, combined, and more using operators. Operators are functions that return a new Observable based on the original one but with some modification.

In summary, Observables in RxJS provide a way to work with asynchronous data streams. Whether it's events from the DOM, HTTP requests, timers, or any other asynchronous data source, Observables offer a powerful and flexible tool to handle and manipulate these data streams in a reactive manner.4

Subjects in RxJS

In RxJS, a Subject is a special type of Observable that can multicast values to many Observers. While standard Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast. Here are the main types of Subjects in RxJS:

  1. Subject:

    • Basic Description: The regular Subject allows for multicasting to multiple Observers. It doesn't have an initial value and doesn't replay any old values to new subscribers.
    • Usage: When you need a simple event emitter or need to multicast values to multiple Observers.
    • Behavior: When a value is emitted, it will be sent to all subscribers. New subscribers won't receive any previously emitted values.
  2. BehaviorSubject:

    • Basic Description: A variant of Subject that requires an initial value and emits its current value (last emitted item) to new subscribers.
    • Usage: When you want new subscribers to get the latest value immediately upon subscription.
    • Behavior: Stores the latest value and provides it to all new subscribers. If no values have been emitted, it provides the initial value.
  3. ReplaySubject:

    • Basic Description: Records multiple values from the Observable execution and replays them to new subscribers.
    • Usage: When you want new subscribers to receive a specified number of latest values or all values from a specific time window.
    • Behavior: You can specify the buffer size (number of recent values to store) and a window time (time frame for which to keep values). New subscribers will receive values based on these settings.
  4. AsyncSubject:

    • Basic Description: Only emits the last value (and only the last value) emitted by the source Observable, and only after the source Observable completes.
    • Usage: When you only care about the final value and want to receive it once the Observable completes.
    • Behavior: It doesn't emit any values to subscribers until the source Observable completes. Once completed, it emits the last value to all current and future subscribers.

Each type of Subject serves a different use case. The choice of which one to use depends on the desired behavior regarding how values are emitted and how new subscribers should receive those values. In practice, Subjects are instrumental in scenarios where you want to maintain state, share values among multiple subscribers, or control the emission of values in a more granular way.

Pipes in Rxjs

In RxJS, the term "pipe" refers to a method used to chain multiple operators together to process and transform values emitted by Observables. The pipe method is available on both Observables and Subjects. Instead of discussing "types of pipes," it's more accurate to discuss the various operators that can be used within the pipe method.

Here are categories of operators that can be used within the pipe method, along with examples from each category:

  1. Transformation Operators:

    • Transform the data emitted by an Observable.
    • Examples: map, buffer, scan, pluck, switchMap, mergeMap, concatMap, exhaustMap
  2. Filtering Operators:

    • Emit only the values that satisfy a given condition.
    • Examples: filter, first, last, skip, take, debounceTime, distinctUntilChanged
  3. Combination Operators:

    • Combine multiple Observables or values in various ways.
    • Examples: combineLatest, concat, merge, startWith, withLatestFrom, zip, race
  4. Error Handling Operators:

    • Handle errors that occur within an Observable sequence.
    • Examples: catchError, retry, retryWhen
  5. Multicasting Operators:

    • Share a single Observable execution with multiple subscribers.
    • Examples: share, shareReplay, multicast, publish
  6. Utility Operators:

    • Provide useful utilities for Observables.
    • Examples: tap, delay, toPromise, finalize, timeInterval
  7. Conditional Operators:

    • Emit values or Observables based on a condition.
    • Examples: defaultIfEmpty, every, find, iif, isEmpty
  8. Mathematical and Aggregate Operators:

    • Perform mathematical or aggregate operations on an Observable's emitted values.
    • Examples: count, max, min, reduce, average
  9. Creation Operators:

    • Create an Observable from various other data types or sources.
    • Examples: from, of, interval, timer, range
  10. Join Creation Operators:

  • Join multiple Observables together.
  • Examples: forkJoin, pairwise

To use these operators, you'd use the pipe method on an Observable, like so:

import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const numbers$ = of(1, 2, 3, 4, 5);

numbers$.pipe(
filter(value => value % 2 === 0), // Filter even numbers
map(value => value * 2) // Double the value
).subscribe(console.log); // Output: 4, 8

The above example uses the pipe method to chain the filter and map operators together. The Observable emits the numbers 1 through 5, the filter operator filters out the odd numbers, and the map operator then doubles the remaining values.

Exception Handling in RxJS

Certainly. Error handling is a crucial aspect of working with Observables in RxJS. When an error is thrown within an Observable sequence, it will be caught and passed down the chain to be handled by subscribers. RxJS provides several operators to handle errors gracefully:

  1. catchError:

    • This operator catches errors on the source Observable by returning a new Observable or an error.
    • Allows you to handle the error or switch to a new Observable.
    • Example:
      import { throwError, of } from 'rxjs';
      import { catchError } from 'rxjs/operators';

      throwError('This is an error!')
      .pipe(
      catchError(err => of(`Caught error: ${err}`))
      )
      .subscribe(console.log); // Output: Caught error: This is an error!
  2. retry:

    • If the source Observable errors, resubscribe to it for a specified number of times.
    • Useful for scenarios where errors might be transient (like network requests).
    • Example:
      import { interval, throwError } from 'rxjs';
      import { mergeMap, retry } from 'rxjs/operators';

      interval(1000)
      .pipe(
      mergeMap(val => {
      if (val > 2) {
      return throwError('Error!');
      }
      return of(val);
      }),
      retry(2)
      )
      .subscribe(console.log, console.error);
  3. retryWhen:

    • Similar to retry, but lets you specify custom logic for when to retry.
    • For example, you can add a delay before retrying.
    • Example:
      import { timer, throwError } from 'rxjs';
      import { mergeMap, retryWhen, delay } from 'rxjs/operators';

      throwError('Error!')
      .pipe(
      retryWhen(errors => errors.pipe(delay(1000)))
      )
      .subscribe(console.log, console.error);
  4. throwIfEmpty:

    • Throws an error if the Observable completes without emitting any values.
    • Example:
      import { of } from 'rxjs';
      import { filter, throwIfEmpty } from 'rxjs/operators';

      of(1, 2, 3)
      .pipe(
      filter(val => val > 3),
      throwIfEmpty()
      )
      .subscribe(console.log, console.error); // Output: Error: Empty result
  5. timeout:

    • Emits an error if a particular time span has passed without any emitted value.
    • Example:
      import { of, throwError } from 'rxjs';
      import { timeout, catchError } from 'rxjs/operators';

      of(1).pipe(
      timeout(1000),
      catchError(() => throwError('Timeout!'))
      )
      .subscribe(console.log, console.error);
  6. timeoutWith:

    • Similar to timeout, but switches to another Observable if the source doesn't emit a value within a certain time frame.
    • Example:
      import { of } from 'rxjs';
      import { timeoutWith, catchError } from 'rxjs/operators';

      of(1).pipe(
      timeoutWith(1000, of('Timeout value'))
      )
      .subscribe(console.log); // Output: 1

When working with Observables, especially in the context of asynchronous operations like HTTP requests, it's essential to handle errors appropriately to ensure a smooth user experience. Using these error-handling operators in RxJS allows you to manage errors effectively and provide fallback mechanisms or retry strategies.

Observer in RxJS

In RxJS, an Observer is an interface for a consumer of Observable values. It allows you to "observe" the values emitted by the Observable, as well as any errors or the completion notification. An Observer essentially consists of three callback methods:

  1. next:

    • Called whenever the Observable emits a new value.
    • Receives the emitted value as its argument.
    • Example:
      const nextCallback = value => console.log(`Received value: ${value}`);
  2. error:

    • Called if the Observable encounters an error during its execution.
    • Receives the error thrown by the Observable as its argument.
    • Once this method is called, the Observable terminates and no more values will be emitted.
    • Example:
      const errorCallback = error => console.error(`Encountered error: ${error}`);
  3. complete:

    • Called if the Observable successfully completes without any errors.
    • Does not receive any arguments.
    • Once this method is called, the Observable terminates and no more values will be emitted.
    • Example:
      const completeCallback = () => console.log('Observable completed.');

You can provide these callbacks when you subscribe to an Observable:

import { of } from 'rxjs';

const observable = of(1, 2, 3);

const observer = {
next: value => console.log(value),
error: err => console.error(err),
complete: () => console.log('Completed')
};

observable.subscribe(observer);
// Output:
// 1
// 2
// 3
// Completed

It's important to note that while you can provide all three callbacks, they are all optional. For example, you could just provide the next callback if you're only interested in the emitted values and not in errors or completion.

In addition to the above, RxJS provides some shorthand methods:

  • You can directly pass the callbacks as arguments to subscribe without explicitly defining an observer object:

    observable.subscribe(
    value => console.log(value),
    err => console.error(err),
    () => console.log('Completed')
    );
  • If you're only interested in one type of notification (e.g., just the emitted values), you can directly pass a single callback to subscribe:

    observable.subscribe(value => console.log(value));

In summary, the Observer in RxJS serves as the bridge between the Observable (producer of values) and the code that consumes those values. By providing callback methods, you can react to each emitted value, handle errors, and take action upon completion.

Rxjs complete Notes

Please click the below link for more details.

Ref: Learn RxJS

Ref: RxJS Book