RxJS是Angular状态管理的核心,提供了强大的响应式编程能力,让状态管理变得更加可预测和可维护。
Observable表示一个可调用的未来值或事件的集合。
// 创建Observable
import { Observable, of, from, interval, fromEvent } from 'rxjs';
// 1. of - creates a synchronous Observable from the listed values
const numbers$ = of(1, 2, 3, 4, 5);
// 2. from - creates an Observable from an array, a Promise, etc.
const array$ = from([1, 2, 3, 4, 5]);
const promise$ = from(fetch('/api/data'));
// 3. interval - creates a timer Observable
const timer$ = interval(1000); // emits an incrementing number every second
// 4. fromEvent - creates an Observable from DOM events
const button = document.getElementById('myButton');
// getElementById returns null when no element matches; fail with a clear
// message instead of handing null to fromEvent.
if (button === null) {
  throw new Error('Element with id "myButton" was not found');
}
const clicks$ = fromEvent(button, 'click');
// Subscribe to the Observable
numbers$.subscribe({
  next: value => console.log('值:', value),
  error: err => console.error('错误:', err),
  complete: () => console.log('完成!')
});
// Output:
// 值: 1
// 值: 2
// 值: 3
// 值: 4
// 值: 5
// 完成!
Subject既是Observable又是Observer,可以多播给多个观察者。
import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';
// 1. Subject - the basic Subject
const subject = new Subject<string>();
// Subscribe two observers to the Subject
subject.subscribe(value => console.log('观察者A:', value));
subject.subscribe(value => console.log('观察者B:', value));
// Push values; each one is delivered to both observers in subscription order
subject.next('Hello');
subject.next('World');
// Output:
// 观察者A: Hello
// 观察者B: Hello
// 观察者A: World
// 观察者B: World
// 2. BehaviorSubject - keeps the current value and hands it to new subscribers
const behaviorSubject = new BehaviorSubject<number>(0); // initial value
behaviorSubject.subscribe(value => console.log('订阅1:', value));
behaviorSubject.next(1);
behaviorSubject.next(2);
behaviorSubject.subscribe(value => console.log('订阅2:', value)); // receives the latest value
// Output:
// 订阅1: 0
// 订阅1: 1
// 订阅1: 2
// 订阅2: 2
// 3. ReplaySubject - replays a fixed number of past values
const replaySubject = new ReplaySubject<number>(2); // buffers the last 2 values
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
// Subscribing after the three emissions: only the buffered values are replayed
replaySubject.subscribe(value => console.log('订阅:', value));
// Output:
// 订阅: 2
// 订阅: 3
// 4. AsyncSubject - emits only the last value, and only on completion
const asyncSubject = new AsyncSubject<number>();
asyncSubject.subscribe(value => console.log('订阅:', value));
asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // emits 3 on completion
// Output:
// 订阅: 3
import { map, filter, tap, take, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5);

// Shared callbacks for the operator examples below.
const double = (x: number): number => x * 2;
const isEven = (x: number): boolean => x % 2 === 0;

// 1. map - transforms each value
const doubled$ = source$.pipe(map(double));
// Result: 2, 4, 6, 8, 10

// 2. filter - keeps only the values that pass the predicate
const even$ = source$.pipe(filter(isEven));
// Result: 2, 4

// 3. tap - runs side effects without changing the values
const logged$ = source$.pipe(
  tap(value => console.log('原始值:', value)),
  map(double),
  tap(value => console.log('转换后:', value))
);

// 4. take - takes the first N values
const firstThree$ = source$.pipe(take(3));
// Result: 1, 2, 3
// 5. debounceTime - debounce rapid input
const searchInput = document.getElementById('search');
// getElementById returns null when the element is missing; stop here with a
// clear message instead of passing null to fromEvent.
if (searchInput === null) {
  throw new Error('Element with id "search" was not found');
}
const search$ = fromEvent(searchInput, 'input').pipe(
  // The event target is read as an input element so `.value` is typed as string.
  map((event: Event) => (event.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged()
);
// 6. switchMap - switch to a new inner Observable (commonly used for HTTP requests)
// NOTE(review): `this.http` presumably refers to an injected Angular HttpClient;
// this statement only works inside a class that provides it — confirm.
search$.pipe(
  switchMap(searchTerm => {
    return this.http.get(`/api/search?q=${searchTerm}`);
  })
).subscribe(results => {
  console.log('搜索结果:', results);
});
源: 1---2---3---4---5---|
map(x => x * 2):
2---4---6---8---10---|
filter(x => x % 2 === 0):
----2-------4-------|
take(3):
1---2---3---|
// app-state.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';
/** Shape of the application-wide state object. */
export interface AppState {
// Current user, or null (the initial value below).
user: User | null;
// Loading flag; starts as false.
isLoading: boolean;
// UI theme, restricted to the two literal values.
theme: 'light' | 'dark';
notifications: Notification[];
// Cart entries; starts empty.
cart: CartItem[];
}
/** Initial state: no user, not loading, light theme, empty notifications and cart. */
const initialState: AppState = {
user: null,
isLoading: false,
theme: 'light',
notifications: [],
cart: []
};
/**
 * State container that keeps one AppState value in a BehaviorSubject.
 *
 * Reads go through `select` (a stream) or `getSnapshot` (the current value).
 * Writes go through `updateState`, which always emits a new state object
 * rather than mutating the previous one.
 */
@Injectable({
  providedIn: 'root'
})
export class AppStateService {
  private state = new BehaviorSubject<AppState>(initialState);
  private state$ = this.state.asObservable();

  /**
   * Selector: projects the state through `selector`.
   * @param selector maps the full state to the slice of interest
   * @returns a stream of the projected value, passed through distinctUntilChanged
   */
  select<T>(selector: (state: AppState) => T): Observable<T> {
    const projected$ = this.state$.pipe(map(selector));
    return projected$.pipe(distinctUntilChanged());
  }

  /** Returns the state currently held by the subject. */
  getSnapshot(): AppState {
    return this.state.getValue();
  }

  /**
   * Shallow-merges `partialState` over the current state and emits the result.
   * @param partialState the keys to overwrite
   */
  updateState(partialState: Partial<AppState>): void {
    this.state.next({ ...this.getSnapshot(), ...partialState });
  }

  // Named actions: thin wrappers around updateState.

  setLoading(isLoading: boolean): void {
    this.updateState({ isLoading });
  }

  setUser(user: User | null): void {
    this.updateState({ user });
  }

  setTheme(theme: 'light' | 'dark'): void {
    this.updateState({ theme });
  }

  /**
   * Adds `item` to the cart. When an entry with the same id already exists,
   * its quantity is increased by `item.quantity` instead of adding a second entry.
   */
  addToCart(item: CartItem): void {
    const { cart } = this.getSnapshot();
    const alreadyInCart = cart.some(entry => entry.id === item.id);

    if (!alreadyInCart) {
      this.updateState({ cart: [...cart, item] });
      return;
    }

    const merged = cart.map(entry => {
      if (entry.id !== item.id) {
        return entry;
      }
      return { ...entry, quantity: entry.quantity + item.quantity };
    });
    this.updateState({ cart: merged });
  }

  /** Removes every cart entry whose id equals `itemId`. */
  removeFromCart(itemId: string): void {
    const remaining = this.getSnapshot().cart.filter(entry => entry.id !== itemId);
    this.updateState({ cart: remaining });
  }
}
// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { AppStateService } from './app-state.service';
import { Observable, Subscription, combineLatest } from 'rxjs';
/**
 * Root component. Shows three ways of consuming AppStateService:
 * a direct subscription, streams rendered through the async pipe,
 * and a combined subscription over several state slices.
 */
@Component({
  selector: 'app-root',
  templateUrl: './app.component.html'
})
export class AppComponent implements OnInit, OnDestroy {
  isLoading = false;
  user: User | null = null;
  cartCount = 0;
  theme: 'light' | 'dark' = 'light';

  // Streams rendered in the template with the async pipe, e.g. {{ theme$ | async }}.
  // Declared here so the assignments type-check and the template can bind to them.
  readonly user$: Observable<User | null>;
  readonly theme$: Observable<'light' | 'dark'>;
  readonly cartCount$: Observable<number>;

  private subscriptions: Subscription[] = [];

  constructor(private stateService: AppStateService) {
    // Option 2: async pipe (recommended; it unsubscribes automatically).
    // Building the streams subscribes to nothing, so it is safe in the constructor.
    this.user$ = this.stateService.select(state => state.user);
    this.theme$ = this.stateService.select(state => state.theme);
    this.cartCount$ = this.stateService.select(state =>
      state.cart.reduce((total, item) => total + item.quantity, 0)
    );
  }

  ngOnInit(): void {
    // Option 1: subscribe directly and copy the value into a field.
    const loadingSub = this.stateService
      .select(state => state.isLoading)
      .subscribe(isLoading => {
        this.isLoading = isLoading;
      });

    // Option 3: subscribe to several slices at once.
    // Each slice is selected on its own and then combined. Selecting a fresh
    // `{ user, theme }` object would defeat distinctUntilChanged, because a new
    // object is never reference-equal to the previous one.
    const combinedSub = combineLatest([this.user$, this.theme$]).subscribe(
      ([user, theme]) => {
        this.user = user;
        this.theme = theme;
      }
    );

    this.subscriptions.push(loadingSub, combinedSub);
  }

  /** Sets the loading flag, then stores a hard-coded user after one second. */
  login(): void {
    this.stateService.setLoading(true);
    // Simulated API call
    setTimeout(() => {
      const user: User = {
        id: '1',
        name: '张三',
        email: 'zhangsan@example.com'
      };
      this.stateService.setUser(user);
      this.stateService.setLoading(false);
    }, 1000);
  }

  /** Clears the current user. */
  logout(): void {
    this.stateService.setUser(null);
  }

  /** Flips the theme between 'light' and 'dark' based on the current snapshot. */
  toggleTheme(): void {
    const currentTheme = this.stateService.getSnapshot().theme;
    const newTheme = currentTheme === 'light' ? 'dark' : 'light';
    this.stateService.setTheme(newTheme);
  }

  ngOnDestroy(): void {
    // Manually release the subscriptions created in ngOnInit.
    this.subscriptions.forEach(sub => sub.unsubscribe());
  }
}
<!-- app.component.html -->
<div [class.dark-theme]="(theme$ | async) === 'dark'" class="app-container">
  <header>
    <!-- user$ is subscribed once; the else-template covers the signed-out case
         instead of a second `user$ | async` subscription. -->
    <div class="user-info" *ngIf="user$ | async as user; else loginButton">
      欢迎, {{user.name}}!
      <button (click)="logout()">退出登录</button>
    </div>
    <ng-template #loginButton>
      <button (click)="login()" [disabled]="isLoading">
        {{isLoading ? '登录中...' : '登录'}}
      </button>
    </ng-template>
    <button (click)="toggleTheme()">
      切换主题 (当前: {{theme$ | async}})
    </button>
    <div class="cart">
      购物车: {{cartCount$ | async}} 件商品
    </div>
  </header>
  <div *ngIf="isLoading" class="loading-overlay">
    <i class="fas fa-spinner fa-spin"></i> 加载中...
  </div>
  <main>
    <!-- Application content -->
  </main>
</div>
// store.types.ts

/** A message sent to a store: a `type` tag plus an optional payload. */
export interface Action {
  type: string;
  payload?: any;
}

/** A function that takes the current state and an action and returns a state. */
export type Reducer<T> = (state: T, action: Action) => T;
// store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';
/**
 * Minimal reducer-driven store: holds a state of type T in a BehaviorSubject
 * and replaces it with `reducer(currentState, action)` on every dispatch.
 *
 * This generic base is deliberately NOT decorated with @Injectable. Its
 * constructor takes a reducer function and an initial state, and Angular's
 * injector has no token for either. Concrete stores (for example CounterStore)
 * carry the decorator and pass both values through `super(...)`.
 */
export class Store<T> {
  private stateSubject: BehaviorSubject<T>;
  state$: Observable<T>;

  /**
   * @param reducer computes the next state from the current state and an action
   * @param initialState the first value held by the store
   */
  constructor(private readonly reducer: Reducer<T>, initialState: T) {
    this.stateSubject = new BehaviorSubject<T>(initialState);
    this.state$ = this.stateSubject.asObservable();
  }

  /** Returns the state currently held by the subject. */
  getState(): T {
    return this.stateSubject.getValue();
  }

  /** Runs the reducer against the current state and emits its result. */
  dispatch(action: Action): void {
    const currentState = this.getState();
    const newState = this.reducer(currentState, action);
    this.stateSubject.next(newState);
  }

  /**
   * Projects the state through `selector`.
   * @returns a stream of the projected value, passed through distinctUntilChanged
   */
  select<R>(selector: (state: T) => R): Observable<R> {
    return this.state$.pipe(
      map(selector),
      distinctUntilChanged()
    );
  }
}
// counter.reducer.ts
/** State handled by counterReducer. */
export interface CounterState {
  count: number;
  loading: boolean;
}

/** Starting state: count 0, not loading. */
const initialState: CounterState = {
  count: 0,
  loading: false
};

/**
 * Reducer for the counter state.
 *
 * Handles 'INCREMENT', 'DECREMENT', 'INCREMENT_BY' (payload: finite number)
 * and 'SET_LOADING' (payload: boolean). Any other action type returns the
 * incoming state object unchanged. So does a handled type whose payload is
 * missing or has the wrong type, which keeps `count` from becoming NaN or a
 * string and `loading` from becoming undefined.
 *
 * @param state current state; defaults to the initial state when undefined
 * @param action the action to apply
 * @returns a new state object, or `state` itself when nothing changes
 */
export function counterReducer(
  state: CounterState = initialState,
  action: Action
): CounterState {
  switch (action.type) {
    case 'INCREMENT':
      return { ...state, count: state.count + 1 };
    case 'DECREMENT':
      return { ...state, count: state.count - 1 };
    case 'INCREMENT_BY': {
      // The payload is untyped on Action, so validate it before doing arithmetic.
      const amount: unknown = action.payload;
      if (typeof amount !== 'number' || !Number.isFinite(amount)) {
        return state;
      }
      return { ...state, count: state.count + amount };
    }
    case 'SET_LOADING': {
      const loading: unknown = action.payload;
      if (typeof loading !== 'boolean') {
        return state;
      }
      return { ...state, loading };
    }
    default:
      return state;
  }
}
// counter.store.ts
/**
 * Injectable store for the counter. Wires counterReducer and its initial
 * state into the generic Store and exposes one method per action type.
 */
@Injectable({
  providedIn: 'root'
})
export class CounterStore extends Store<CounterState> {
  constructor() {
    super(counterReducer, initialState);
  }

  /** Dispatches 'INCREMENT'. */
  increment(): void {
    const type = 'INCREMENT';
    this.dispatch({ type });
  }

  /** Dispatches 'DECREMENT'. */
  decrement(): void {
    const type = 'DECREMENT';
    this.dispatch({ type });
  }

  /** Dispatches 'INCREMENT_BY' with `amount` as the payload. */
  incrementBy(amount: number): void {
    const payload = amount;
    this.dispatch({ type: 'INCREMENT_BY', payload });
  }

  /** Dispatches 'SET_LOADING' with `loading` as the payload. */
  setLoading(loading: boolean): void {
    const payload = loading;
    this.dispatch({ type: 'SET_LOADING', payload });
  }
}
// 安装ngrx
// npm install @ngrx/store @ngrx/effects @ngrx/store-devtools
// app.module.ts
import { StoreModule } from '@ngrx/store';
import { EffectsModule } from '@ngrx/effects';
import { StoreDevtoolsModule } from '@ngrx/store-devtools';
import { counterReducer } from './store/counter.reducer';
import { CounterEffects } from './store/counter.effects';
// Root module: registers the `counter` reducer, the CounterEffects class and
// the store devtools instrumentation.
@NgModule({
imports: [
StoreModule.forRoot({ counter: counterReducer }),
EffectsModule.forRoot([CounterEffects]),
StoreDevtoolsModule.instrument({
maxAge: 25 // keep the last 25 states of history
})
]
})
export class AppModule { }
// 在组件中使用
import { Store } from '@ngrx/store';
import { Observable } from 'rxjs';
import { increment, decrement } from './store/counter.actions';
/**
 * Counter view backed by the store: renders `count$` through the async pipe
 * and dispatches the increment / decrement actions from its two buttons.
 */
@Component({
  selector: 'app-counter',
  template: `
<div>计数: {{count$ | async}}</div>
<button (click)="increment()">+</button>
<button (click)="decrement()">-</button>
`
})
export class CounterComponent {
  count$: Observable<number>;

  constructor(private store: Store<{ counter: CounterState }>) {
    // Selector that reads the count out of the `counter` slice.
    const selectCount = (state: { counter: CounterState }): number =>
      state.counter.count;
    this.count$ = this.store.select(selectCount);
  }

  /** Dispatches the increment action. */
  increment(): void {
    const action = increment();
    this.store.dispatch(action);
  }

  /** Dispatches the decrement action. */
  decrement(): void {
    const action = decrement();
    this.store.dispatch(action);
  }
}
// 使用trackBy优化ngFor
/**
 * Renders a list of items with `trackBy`, so ngFor identifies rows by item id.
 */
@Component({
  template: `
<div *ngFor="let item of items$ | async; trackBy: trackById">
{{item.name}}
</div>
`
})
export class ItemListComponent {
  // NOTE(review): assumes the state object exposes an `items` array — confirm
  // against the state interface used by the injected service.
  items$ = this.stateService.select(state => state.items);

  // The service must be injected: `items$` above reads `this.stateService`,
  // which was otherwise never defined on this class.
  constructor(private stateService: AppStateService) {}

  /**
   * trackBy callback for ngFor.
   * @param index position of the item in the list (unused)
   * @param item the item being rendered
   * @returns the item's id
   */
  trackById(index: number, item: Item): string {
    return item.id;
  }
}
// Use shareReplay to avoid repeating the work for every subscriber
// NOTE(review): `this.http`, `this.searchInput` and `this.searchService` are not
// defined in this fragment; presumably it lives inside a component class — confirm.
const expensiveData$ = this.http.get('/api/data').pipe(
shareReplay(1) // cache the latest value
);
// Use debounceTime to avoid overly frequent updates
const searchResults$ = this.searchInput.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
switchMap(term => this.searchService.search(term))
);
// 使用OnPush变更检测策略
/** User profile view that opts into OnPush change detection. */
@Component({
  selector: 'app-user-profile',
  templateUrl: './user-profile.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush // performance optimisation
})
export class UserProfileComponent {
  // Inputs are assigned by Angular after construction, so there is no initializer.
  // The definite-assignment assertion keeps the declared type as `User` while
  // satisfying strictPropertyInitialization.
  @Input() user!: User;
}
- async管道:自动管理订阅
- takeUntil模式:手动管理订阅生命周期
- shareReplay:避免重复计算
- distinctUntilChanged:避免不必要的更新
- BehaviorSubject
- ngrx或ngxs

// ❌ 错误:忘记取消订阅
// Anti-pattern, kept on purpose as the counter-example: the Subscription
// returned by subscribe() is discarded, so nothing here can unsubscribe it.
ngOnInit() {
this.dataService.getData().subscribe(data => {
this.data = data;
});
}
// ✅ 正确:使用takeUntil
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
/**
 * Shows the takeUntil teardown pattern: the data subscription stays active
 * until `destroy$` emits, which happens in ngOnDestroy.
 *
 * NOTE(review): `dataService` and `data` are not declared in this fragment;
 * presumably they are provided elsewhere on the class — confirm.
 */
export class MyComponent implements OnInit, OnDestroy {
  // Typed as Subject<void> so the zero-argument next() in ngOnDestroy
  // type-checks; an untyped Subject requires a value argument on RxJS 7.
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        this.data = data;
      });
  }

  ngOnDestroy() {
    // Signal every takeUntil(this.destroy$) pipeline to finish, then close the subject.
    this.destroy$.next();
    this.destroy$.complete();
  }
}
// ✅ 正确:使用async管道
// 模板中: {{ data$ | async }}
// 1. Debug with tap
// NOTE(review): `observable$` is not defined in this fragment, and the piped
// result is neither stored nor subscribed here.
observable$.pipe(
tap(value => console.log('值:', value)),
tap(value => console.log('类型:', typeof value)),
// The observer form of tap also logs error and complete notifications.
tap({
next: value => console.log('Next:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Complete')
})
);
// 2. Use Redux DevTools
// Install: npm install @ngrx/store-devtools
// Once configured, state changes can be inspected in the browser developer tools
// 3. A custom debugging operator
/**
 * Builds a pipeable operator that logs every next / error / complete
 * notification of its source, each prefixed with `[tag]`. The notifications
 * themselves pass through tap unchanged.
 *
 * @param tag label placed in square brackets at the start of each log line
 * @returns a function mapping a source Observable to the logged Observable
 */
function debug(tag: string) {
  return <T>(source: Observable<T>) => {
    const logger = {
      next: (value: T) => console.log(`[${tag}] Next:`, value),
      error: (err: unknown) => console.error(`[${tag}] Error:`, err),
      complete: () => console.log(`[${tag}] Complete`)
    };
    return source.pipe(tap(logger));
  };
}
// Usage: log every notification of data$ under the 'Data Stream' tag
// NOTE(review): `data$` is not defined in this fragment.
data$.pipe(
debug('Data Stream')
).subscribe();