Cancellation
When a callback message represents a long-running operation, the sender may need to cancel the request before it completes. Postboy supports cancellation through an explicit cancel$ stream passed as part of the message payload and consumed by the handler with takeUntil.
Message with cancellation support
Add an Observable<void> parameter to the message constructor. This stream signals cancellation from the sender.
import {PostboyCallbackMessage} from '@artstesh/postboy';
import {Observable} from 'rxjs';
export class SearchProductsQuery extends PostboyCallbackMessage<Product[]> {
static readonly ID = 'catalog.search';
constructor(
public readonly query: string,
public readonly cancel$: Observable<void>
) {
super();
}
}
Handler: stop work on cancellation
Apply takeUntil(message.cancel$) to the async operation. If cancellation signals before the operation finishes, the inner subscription is torn down and msg.finish() is never called.
import {HttpClient} from '@angular/common/http';
import {takeUntil} from 'rxjs/operators';
import {AppPostboyService} from '@shared/services/app-postboy.service';
import {ConnectMessage} from '@artstesh/postboy';
import {Subject} from 'rxjs';
import {SearchProductsQuery} from './messages/queries/search-products.query';
export class CatalogService {
constructor(private http: HttpClient, private postboy: AppPostboyService) {
this.postboy.exec(new ConnectMessage(SearchProductsQuery, new Subject<SearchProductsQuery>()));
this.postboy.sub(SearchProductsQuery).subscribe((msg) => {
this.http.get<Product[]>('/api/products', {params: {q: msg.query}})
.pipe(takeUntil(msg.cancel$))
.subscribe({
next: (products) => msg.finish(products),
error: () => msg.finish([]),
});
});
}
}
Sender: create cancellation trigger
The sender creates a Subject<void> for cancellation, passes it to the message, and also uses it to clean up its own subscription when the operation is cancelled.
import {Subject} from 'rxjs';
import {takeUntil} from 'rxjs/operators';
import {AppPostboyService} from '@shared/services/app-postboy.service';
import {SearchProductsQuery} from './messages/queries/search-products.query';
export class SearchComponent {
private cancel$ = new Subject<void>();
constructor(private postboy: AppPostboyService) {
}
onSearch(query: string): void {
// Cancel any ongoing search
this.cancel$.next();
const cancel$ = new Subject<void>();
this.cancel$ = cancel$;
this.postboy.fireCallback(new SearchProductsQuery(query, cancel$))
.pipe(takeUntil(cancel$))
.subscribe((products) => {
this.displayResults(products);
});
}
onCancel(): void {
this.cancel$.next();
}
}
Key points
Cancellation is a cooperative mechanism: the handler stops its work via takeUntil, and the sender stops waiting for a result using the same cancel$ stream.
When cancellation fires, msg.finish() is never called, so the sender’s subscription simply completes without a value.
Always clean up cancellation subjects to prevent memory leaks (takeUntil(cancel$) on both sides is sufficient).
This pattern keeps the bus simple — cancellation is entirely owned by the message contract and the application code.
24 апреля 2026