Streaming Resources in Angular 19.2 – Details and Semantics

With Streaming Resources, the Resource API and Signals expand into territory previously covered by RxJS: they can represent data streams. Despite the similarities to RxJS, there are some non-obvious but significant semantic differences to consider.

This article explains the details of Streaming Resources, the semantic subtleties, and the interaction with RxJS.

Big thanks to Alex Rickabaugh from the Angular team for reviewing the examples and for the enlightening discussions about semantics and the idiomatic use of Streaming Resources.

This article was written against Angular 19.2.0-next.0. I will update the text to reflect changes when necessary.

Example Application

The example used here is the “Hello World” of reactive programming: a simple timer that counts up every second:

[Figure: A simple timer based on a streaming resource]

The beauty of this elementary example is that it already contains a lot of material to discuss the small but important nuances of Streaming Resources. A variation of this example is also used to demonstrate interoperability with RxJS.

Anatomy of a Streaming Resource

In contrast to conventional resources, the Loader of a streaming resource always returns the following structure:

PromiseLike<
  Signal<
    | { value: T }
    | { error: unknown }
  >
>;

This is a Promise that resolves to a Signal holding an object. The object contains either the most recently published value or an error. Because the Signal takes on several values or error states over time, it represents a data stream.

To simplify the program code, I define a type called StreamItem for the content of this Signal:

export type StreamItem<T> =
  | {
      value: T;
    }
  | {
      error: unknown;
    };

Angular does not currently offer an explicit type for this. However, this will change during further development.
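
When consuming such items, it helps to distinguish the two branches of the union in a type-safe way. The following is a minimal sketch; the helpers isErrorItem, isValueItem, and describeItem are hypothetical names introduced for illustration and are not part of Angular:

```typescript
// Same union as the StreamItem type defined above.
type StreamItem<T> = { value: T } | { error: unknown };

// Hypothetical type guards (not provided by Angular):
// each one narrows a StreamItem to one branch of the union.
function isErrorItem<T>(item: StreamItem<T>): item is { error: unknown } {
  return 'error' in item;
}

function isValueItem<T>(item: StreamItem<T>): item is { value: T } {
  return 'value' in item;
}

// Hypothetical helper that renders an item as text.
function describeItem(item: StreamItem<number>): string {
  return isErrorItem(item)
    ? `error: ${String(item.error)}`
    : `value: ${item.value}`;
}

const ok: StreamItem<number> = { value: 4711 };
const failed: StreamItem<number> = { error: 'bad luck!' };

console.log(describeItem(ok));
console.log(describeItem(failed));
```

Since the union has no discriminator property, the sketch checks for the presence of the error or value key.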

The program code uses the resource function, which is also used for conventional resources, to create a streaming resource. However, instead of a Loader, this function is passed a Streaming Loader via the stream property:

const myResource = resource({
  request: requestSignal,
  stream: async (params) => {

    // 1. Create Signal representing the Stream
    const result = signal<StreamItem<number>>({ 
      value: 4711 
    });

    // 2. Set up async logic updating the Signal
    […]

    // 3. Set up clean-up handler triggered by AbortSignal
    params.abortSignal.addEventListener('abort', () => {
      […]
    });

    // 4. Return Signal
    return result;
  },
});

Because of the async keyword, the Promise that provides the Signal is implicit. The Loader can be divided into four parts, which are indicated with comments in the example shown:

  1. First, the Streaming Loader creates a new Signal representing the data stream. This Signal is given an initial value.
  2. The Streaming Loader starts an asynchronous operation that produces multiple results over time. It publishes these results one by one via the Signal.
  3. The Streaming Loader is also responsible for providing cleanup logic that terminates the underlying asynchronous operation when its values are no longer needed.
  4. At the end, the Streaming Loader returns the Signal.

To provide the cleanup logic, the Streaming Loader uses the AbortSignal provided by the Resource API, which can be found in the passed parameter object. Interestingly, these points correspond more or less to the typical approach when using the Observable constructor directly in the RxJS world.
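
The cleanup part can be tried out without Angular, because AbortController and AbortSignal are standard platform APIs. The following sketch assumes a hypothetical helper called startTicker, which is not taken from the article's source code. The abort handler plays the role that the teardown function plays when using the Observable constructor:

```typescript
// Hypothetical helper: starts a ticker and stops it once the AbortSignal fires.
function startTicker(
  intervalMs: number,
  abortSignal: AbortSignal,
  onTick: (n: number) => void
): { isStopped: () => boolean } {
  let counter = 0;
  let stopped = false;

  // Async logic producing several results over time
  const ref = setInterval(() => onTick(++counter), intervalMs);

  // Cleanup handler triggered by the AbortSignal
  abortSignal.addEventListener('abort', () => {
    clearInterval(ref);
    stopped = true;
  });

  return { isStopped: () => stopped };
}

const controller = new AbortController();
const ticker = startTicker(1000, controller.signal, (n) => console.log('tick', n));

const stoppedBeforeAbort = ticker.isStopped();

// In a resource, the Resource API triggers the abort; here we do it by hand.
controller.abort();
const stoppedAfterAbort = ticker.isStopped();
```

Calling abort() dispatches the abort event synchronously, so the interval is already cleared when the next statement runs.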

Switch-Map Semantics when Transitioning between Streams

The defined cleanup logic runs whenever the application no longer needs the current data stream. There are two reasons for this. The first occurs when Angular destroys the building block that hosts the resource. Imagine a component with a resource: when the user navigates away, the router destroys this component, and Angular destroys the resource along with it.

The second reason arises when the Resource’s request Signal changes. Each change triggers the Streaming Loader, which then delivers a new stream. When transitioning between these streams, the Resource API uses the same semantics as switchMap in RxJS:

[Figure: Switch-map semantics when transitioning to a new stream]

This means that the resource always consumes only the most recent stream. This approach is usually used when loading data. As is generally the case with Signals in Angular, the aim is to offer simple concepts for standard cases. For more complex scenarios, the application can use libraries such as RxJS.
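
To illustrate these semantics, here is a simplified, framework-free model. The class StreamSwitcher is a hypothetical stand-in and does not reflect how Angular implements resources internally; in particular, the exact order of aborting the old stream and starting the new one is an assumption of this sketch:

```typescript
// Hypothetical model of switch-map semantics: only the latest stream stays alive.
class StreamSwitcher<R> {
  private current?: AbortController;

  // Called for each new request: abort the previous stream, then start a new one.
  switchTo(
    request: R,
    start: (request: R, abortSignal: AbortSignal) => void
  ): void {
    this.current?.abort();
    this.current = new AbortController();
    start(request, this.current.signal);
  }

  // Called when the hosting building block is destroyed.
  destroy(): void {
    this.current?.abort();
    this.current = undefined;
  }
}

const events: string[] = [];
const switcher = new StreamSwitcher<number>();

const start = (startValue: number, abortSignal: AbortSignal): void => {
  events.push(`start ${startValue}`);
  abortSignal.addEventListener('abort', () => {
    events.push(`clean up ${startValue}`);
  });
};

switcher.switchTo(0, start);   // first stream
switcher.switchTo(100, start); // request changed: the old stream is cleaned up
switcher.destroy();            // host destroyed: the last stream is cleaned up

console.log(events);
// ['start 0', 'clean up 0', 'start 100', 'clean up 100']
```

The sketch covers both reasons for running the cleanup logic: a changed request and the destruction of the host.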

A Simple Resource-based Timer

Now that we have discussed the basic structure of a Streaming Resource, I would like to demonstrate its use with a first example: a timer that increments a number at a given interval. The next listing shows this timerResource from the perspective of a consumer:

@Component([…])
export class TimerResourceComponent {
  ResourceStatus = ResourceStatus;

  startValue = signal(0);
  timer = timerResource(1000, this.startValue);

  forward(): void {
    this.startValue.update((v) => nextSegment(v));
  }
}

function nextSegment(currentValue: number): number {
  return Math.floor(currentValue / 100) * 100 + 100;
}

The timer returns a stream that starts counting at the given start value. The first parameter of timerResource represents the desired interval in milliseconds. The start value is defined by the startValue Signal. Whenever it changes, the timer switches to a new stream. To demonstrate this, the forward method jumps to the next full hundred, e.g., from 17 to 100 or from 123 to 200.
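
The behavior of nextSegment can be checked in isolation. The following sketch repeats the function from the listing above and applies it to a few sample values chosen for illustration:

```typescript
// Same function as in the component listing above.
function nextSegment(currentValue: number): number {
  return Math.floor(currentValue / 100) * 100 + 100;
}

// Sample inputs chosen for illustration
const samples = [0, 17, 99, 100, 123].map((v) => [v, nextSegment(v)]);

console.log(samples);
// [[0, 100], [17, 100], [99, 100], [100, 200], [123, 200]]
```

Note that a value that already is a full hundred, such as 100, also moves on to the next segment.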

Factory for the Streaming Resource

To simplify using the timer's streaming resource, it is wrapped in a factory function called timerResource:

export function timerResource(
  timeout: number,
  startValue: () => number
): ResourceRef<number | undefined> {

  const request = computed(() => ({
    startValue: startValue(),
  }));

  const result = resource({
    request: request,
    stream: async (params) => {
      const counter = params.request.startValue;
      […]
    }
  });

  return result;
}

This factory receives the desired interval (timeout) and a signal with the start value. From this signal, timerResource only needs the getter. Therefore, it types the parameter as () => number.

The return value is a ResourceRef<number | undefined>. The type parameter corresponds to the values in the stream. Since the resource can generate the stream asynchronously, it initially sets this value to undefined. To avoid undefined, Angular will also offer the option of setting your own initial value in the future.

The computed signal request represents all parameters that trigger the Loader. Since only startValue is used as a trigger in this example, the computed signal feels a bit like a middleman without a purpose. Nevertheless, I like to stick with this approach, because it allows the trigger to be extended later and makes the name startValue available within the (Streaming) Loader: in this case, the Loader can obtain the current value via params.request.startValue.

Streaming Loader for the Timer

The structure of the streaming loader corresponds to the four sections discussed above:

const result = resource({
  request: request,
  stream: async (params) => {
    let counter = params.request.startValue;

    // 1. Create Signal representing the Stream
    const resultSignal = signal<StreamItem<number>>({
      value: params.request.startValue,
    });

    // 2. Set up async logic updating the Signal
    const ref = setInterval(() => {
      counter++;
      console.log('tick', counter);

      if (counter === 7 || counter === 13) {
        resultSignal.set({ error: 'bad luck!' });
      } else {
        resultSignal.set({ value: counter });
      }
    }, timeout);

    // 3. Set up clean-up handler triggered by AbortSignal
    params.abortSignal.addEventListener('abort', () => {
      console.log('clean up!');
      clearInterval(ref);
    });

    // 4. Return Signal
    return resultSignal;
  },
});

The Loader implements the asynchronous counting operation using the traditional JavaScript setInterval function. To demonstrate the behavior of error states, the timer reports an error for the values 7 and 13. Hence, superstitious users also have their needs met.

Due to the asynchronous behavior of setInterval, step 4 takes place before the first execution of the callback in step 2. This means that the Loader first returns the Signal with its initial value, and only then does the Signal gradually receive new values.
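
This ordering can be observed in a small framework-free sketch. The function loaderLike and the log array are hypothetical names; a plain object stands in for the Signal, and the interval clears itself after the first tick so that the script terminates:

```typescript
const log: string[] = [];

// Hypothetical stand-in for a Streaming Loader without Angular.
function loaderLike(): { value: number } {
  // 1. Create the state representing the stream
  const state = { value: 0 };
  log.push('1: state created');

  // 2. Set up async logic; the callback never runs synchronously
  const ref = setInterval(() => {
    state.value++;
    log.push('2: first tick');
    clearInterval(ref);
  }, 0);

  // 4. Return the state before any tick has happened
  log.push('4: state returned');
  return state;
}

const returned = loaderLike();

// Snapshot taken synchronously, right after the loader has returned
const logRightAfterReturn = [...log];
const valueRightAfterReturn = returned.value;

console.log(logRightAfterReturn, valueRightAfterReturn);
```

Even with a delay of 0 milliseconds, the callback only runs after the current synchronous code has finished, so the snapshot contains the entries for steps 1 and 4 only.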

Trying Out the Streaming Resource

When you run the example, the counter increments every second. In place of the values 7 and 13, the resource reports an error. Unlike in RxJS, however, such an error does not end the stream: as soon as the Streaming Loader publishes a new value, the resource makes it available.
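
The reason is that an error is just another item written to the Signal and can be overwritten by the next item. The following sketch replays this for the counter values 6 to 8. The function toStreamItem is a hypothetical extraction of the condition inside the Loader, and a plain variable stands in for the Signal:

```typescript
type StreamItem<T> = { value: T } | { error: unknown };

// Hypothetical extraction of the condition used inside the Streaming Loader.
function toStreamItem(counter: number): StreamItem<number> {
  return counter === 7 || counter === 13
    ? { error: 'bad luck!' }
    : { value: counter };
}

// Stand-in for the Signal: it always holds the most recent item.
let latest: StreamItem<number> = { value: 5 };
const seen: string[] = [];

for (let counter = 6; counter <= 8; counter++) {
  latest = toStreamItem(counter); // corresponds to resultSignal.set(...)
  seen.push('error' in latest ? 'error' : `value ${latest.value}`);
}

console.log(seen);
// ['value 6', 'error', 'value 8']
```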

The following image shows the console output of the Loader:

[Figure: Transition to new streams]

The output emphasizes that the Streaming Resource only uses the newest stream, which results in switch-map semantics.

Shortly before each "clean up!" message, the user triggered the forward method. This sets a new value in startValue, which in turn triggers the Loader again. The resource fires the AbortSignal of the old stream, and the registered abort handler stops that stream's interval. From then on, the resource uses the new stream provided by the Loader, which continues from the next full hundred.

Interop with RxJS and Observables

Together with the Resource API, Angular 19 also introduced the rxResource, which allows bridging the gap to the RxJS world. Starting with Angular 19.2, rxResources are always Streaming Resources. This means that an rxResource gradually delivers the values of the Observable returned by the Loader. To demonstrate this behavior, the next listing shows a variation of the previously discussed timer based on rxResource:

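// Assumes RxJS 7.2+, where operators are exported from 'rxjs'.
import { computed, ResourceRef } from '@angular/core';
import { rxResource } from '@angular/core/rxjs-interop';
import { interval, map, startWith, switchMap, tap, throwError } from 'rxjs';

export function timerResource(
  timeout: number,
  startValue: () => number
): ResourceRef<number | undefined> {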

  const request = computed(() => ({
    startValue: startValue(),
  }));

  return rxResource({
    request: request,
    loader: (params) => {
      const startValue = params.request.startValue;
      return interval(timeout).pipe(
        map((v) => v + startValue + 1),
        startWith(startValue),
        tap((v) => console.log('counter', v)),
        switchMap((value) => {
          if (value === 7 || value === 13) {
            return throwError(() => 'bad luck');
          }
          return [value];
        })
      );
    },
  });
}

This implementation is somewhat shorter, thanks to the numerous operators RxJS offers. In principle, analogous helper functions could also be provided for Signals and Resources. However, I feel that the trend in the Signals world is moving towards more use case-specific and thus coarse-grained building blocks such as timerResource. Time will tell whether this impression proves to be true.

Important Details When Using Streams with rxResource

A small detail of rxResource is that the application provides the Streaming Loader via the loader property rather than via stream. This may be because the loader of an rxResource is always a Streaming Loader. If the application only wants to use the first value of the Observable, as in the original implementation in Angular 19.0 and 19.1, it must now resort to operators such as first or take(1).

A critical difference arises when an error occurs. This is where the semantics of Observables and Resources collide. An Observable terminates at the first unhandled error. For this reason, the Observable in the rxResource does not recover after the first error, which in our example occurs in place of the value 7.

However, the rxResource as a whole can recover from the error state by switching to a new stream. This happens, for example, when the request Signal changes: the change triggers the Loader again, which returns a new Observable.

As usual, switch-map semantics are used when switching to new Observables. This means that the rxResource only uses the most recent Observable at a time. It closes predecessors by unsubscribing.

Summary

Streaming Resources, introduced with Angular 19.2, represent data streams without relying on RxJS. The asynchronous Streaming Loader updates a Signal representing the stream. Whenever the Resource’s request Signal changes, it triggers the Streaming Loader, which delivers a new data stream while the old one is terminated. This corresponds to the switch-map semantics known from RxJS.

The rxResource, which bridges to RxJS, is now always a Streaming Resource. If an application only wants to use the first value of the underlying Observable, it must use operators such as first or take(1). Unlike an Observable, a Resource that encounters an error is not closed and can continue publishing values. Since the rxResource is based on an Observable, however, its current stream cannot recover from an error state. The rxResource as a whole can still recover by switching to a new stream when the application triggers the Streaming Loader again.
