Postboy Help

Cancellation

When a callback message represents a long-running operation, the sender may need to cancel the request before it completes. Postboy supports cancellation through an explicit cancel$ stream passed as part of the message payload and consumed by the handler with takeUntil.

Message with cancellation support

Add an Observable<void> parameter to the message constructor. This stream signals cancellation from the sender.

import {PostboyCallbackMessage} from '@artstesh/postboy';
import {Observable} from 'rxjs';

export class SearchProductsQuery extends PostboyCallbackMessage<Product[]> {
  static readonly ID = 'catalog.search';

  constructor(
    public readonly query: string,
    public readonly cancel$: Observable<void>
  ) {
    super();
  }
}

Handler: stop work on cancellation

Apply takeUntil(msg.cancel$) to the async operation. If cancel$ emits before the operation finishes, the inner subscription is unsubscribed (for Angular's HttpClient this also aborts the in-flight request) and msg.finish() is never called.

import {HttpClient} from '@angular/common/http';
import {takeUntil} from 'rxjs/operators';
import {AppPostboyService} from '@shared/services/app-postboy.service';
import {ConnectMessage} from '@artstesh/postboy';
import {Subject} from 'rxjs';
import {SearchProductsQuery} from './messages/queries/search-products.query';

export class CatalogService {
  constructor(private http: HttpClient, private postboy: AppPostboyService) {
    this.postboy.exec(new ConnectMessage(SearchProductsQuery, new Subject<SearchProductsQuery>()));
    this.postboy.sub(SearchProductsQuery).subscribe((msg) => {
      this.http.get<Product[]>('/api/products', {params: {q: msg.query}})
        .pipe(takeUntil(msg.cancel$))
        .subscribe({
          next: (products) => msg.finish(products),
          error: () => msg.finish([]),
        });
    });
  }
}
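
To make the handler's behavior easier to see in isolation, here is a minimal sketch that uses only RxJS. FakeQuery and handle are hypothetical stand-ins (they are not part of Postboy), and a manually driven Subject takes the place of the HTTP call so that the order of events is explicit.

```typescript
import {Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

// Hypothetical stand-in for a callback message: it only records finish() calls.
class FakeQuery {
  readonly finished: string[][] = [];
  constructor(public readonly cancel$: Subject<void>) {}
  finish(result: string[]): void {
    this.finished.push(result);
  }
}

// Same shape as the handler: forward the response unless cancel$ emits first.
function handle(msg: FakeQuery, response$: Subject<string[]>): void {
  response$.pipe(takeUntil(msg.cancel$)).subscribe({
    next: (products) => msg.finish(products),
    error: () => msg.finish([]),
  });
}

// Case 1: cancellation arrives first, so the late response is dropped.
const cancelled = new FakeQuery(new Subject<void>());
const lateResponse$ = new Subject<string[]>();
handle(cancelled, lateResponse$);
cancelled.cancel$.next();
lateResponse$.next(['too late']);

// Case 2: no cancellation, so finish() receives the response.
const completed = new FakeQuery(new Subject<void>());
const response$ = new Subject<string[]>();
handle(completed, response$);
response$.next(['apple', 'pear']);
```

In the first case cancelled.finished stays empty; in the second, completed.finished holds the single response.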

Sender: create cancellation trigger

The sender creates a Subject<void> for cancellation, passes it to the message, and also uses it to clean up its own subscription when the operation is cancelled.

import {Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';
import {AppPostboyService} from '@shared/services/app-postboy.service';
import {SearchProductsQuery} from './messages/queries/search-products.query';

export class SearchComponent {
  private cancel$ = new Subject<void>();

  constructor(private postboy: AppPostboyService) { }

  onSearch(query: string): void {
    // Cancel any ongoing search
    this.cancel$.next();
    const cancel$ = new Subject<void>();
    this.cancel$ = cancel$;

    this.postboy.fireCallback(new SearchProductsQuery(query, cancel$))
      .pipe(takeUntil(cancel$))
      .subscribe((products) => {
        this.displayResults(products);
      });
  }

  onCancel(): void {
    this.cancel$.next();
  }
}
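
The "latest search wins" behavior of the sender can be checked without the bus. The sketch below is an illustration under assumptions: SearchBox is a hypothetical stand-in for the component, and its send function replaces postboy.fireCallback with a manually driven Subject per query.

```typescript
import {Observable, Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

// Hypothetical stand-in for the sender; `send` plays the role of fireCallback.
class SearchBox {
  readonly shown: string[] = [];
  private cancel$ = new Subject<void>();

  constructor(
    private send: (query: string, cancel$: Observable<void>) => Observable<string>
  ) {}

  onSearch(query: string): void {
    this.cancel$.next();                 // cancel the previous search, if any
    const cancel$ = new Subject<void>(); // fresh trigger for this search only
    this.cancel$ = cancel$;
    this.send(query, cancel$)
      .pipe(takeUntil(cancel$))
      .subscribe((result) => this.shown.push(result));
  }

  onCancel(): void {
    this.cancel$.next();
  }
}

// Each query gets its own result stream so replies can be emitted in any order.
const results = new Map<string, Subject<string>>();
const box = new SearchBox((query) => {
  const result$ = new Subject<string>();
  results.set(query, result$);
  return result$;
});

box.onSearch('app');
box.onSearch('apple');                      // supersedes 'app'
results.get('app')!.next('stale result');   // ignored: its trigger already fired
results.get('apple')!.next('fresh result'); // delivered
```

After these calls box.shown contains only 'fresh result', because the second onSearch fired the first search's trigger before the stale reply arrived.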

Key points

  • Cancellation is a cooperative mechanism: the handler stops its work via takeUntil, and the sender stops waiting for a result using the same cancel$ stream.

  • When cancellation fires before the operation finishes, msg.finish() is never called. The sender's subscription still ends cleanly: its own takeUntil(cancel$) completes it without emitting a value.

  • Always clean up cancellation subjects to prevent memory leaks. Applying takeUntil(cancel$) on both sides is sufficient: once cancel$ emits, both the handler's and the sender's subscriptions are torn down.

  • This pattern keeps the bus simple — cancellation is entirely owned by the message contract and the application code.
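
The sender-side outcome described in the key points can be observed directly. This is a minimal sketch, assuming a plain Subject (reply$, a hypothetical name) in place of the stream returned by fireCallback; it records which subscriber callbacks run when cancellation comes first.

```typescript
import {Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

const events: string[] = [];
const cancel$ = new Subject<void>();
const reply$ = new Subject<string[]>(); // hypothetical stand-in for the callback result

reply$.pipe(takeUntil(cancel$)).subscribe({
  next: (products) => events.push(`next:${products.length}`),
  complete: () => events.push('complete'),
});

cancel$.next();        // the sender cancels before any reply
reply$.next(['late']); // dropped: the subscription has already completed
```

Only the complete callback runs, so events ends up as ['complete']. In an Angular component, one option is to fire the current trigger from ngOnDestroy as well, so that a pending request does not outlive the view.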

April 24, 2026