Angular状态管理(RxJS)

RxJS是Angular状态管理的核心,提供了强大的响应式编程能力,让状态管理变得更加可预测和可维护。

RxJS核心概念:Observable(可观察对象)、Observer(观察者)、Subscription(订阅)、Operators(操作符)、Subject(主体)

1. RxJS基础

1.1 Observable(可观察对象)

Observable表示一个可调用的未来值或事件的集合。

// 创建Observable
import { Observable, of, from, interval, fromEvent } from 'rxjs';

// 1. of - 创建同步Observable
const numbers$ = of(1, 2, 3, 4, 5);

// 2. from - 从数组、Promise等创建
const array$ = from([1, 2, 3, 4, 5]);
const promise$ = from(fetch('/api/data'));

// 3. interval - 创建定时器Observable
const timer$ = interval(1000); // 每秒发出一个递增数字

// 4. fromEvent - 从DOM事件创建
const button = document.getElementById('myButton');
const clicks$ = fromEvent(button, 'click');

// 订阅Observable
numbers$.subscribe({
  next: value => console.log('值:', value),
  error: err => console.error('错误:', err),
  complete: () => console.log('完成!')
});

// 输出:
// 值: 1
// 值: 2
// 值: 3
// 值: 4
// 值: 5
// 完成!
Observable时间线演示:
时间轴: 12345完成

1.2 Subject(主体)

Subject既是Observable又是Observer,可以多播给多个观察者。

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// 1. Subject - 基本Subject
const subject = new Subject<string>();

// 订阅Subject
subject.subscribe(value => console.log('观察者A:', value));
subject.subscribe(value => console.log('观察者B:', value));

// 发送值
subject.next('Hello');
subject.next('World');

// 输出:
// 观察者A: Hello
// 观察者B: Hello
// 观察者A: World
// 观察者B: World

// 2. BehaviorSubject - 保存当前值并向新订阅者发送
const behaviorSubject = new BehaviorSubject<number>(0); // 初始值

behaviorSubject.subscribe(value => console.log('订阅1:', value));
behaviorSubject.next(1);
behaviorSubject.next(2);

behaviorSubject.subscribe(value => console.log('订阅2:', value)); // 收到最新值

// 输出:
// 订阅1: 0
// 订阅1: 1
// 订阅1: 2
// 订阅2: 2

// 3. ReplaySubject - 重放指定数量的值
const replaySubject = new ReplaySubject<number>(2); // 缓存最后2个值

replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

replaySubject.subscribe(value => console.log('订阅:', value));

// 输出:
// 订阅: 2
// 订阅: 3

// 4. AsyncSubject - 只在完成时发送最后一个值
const asyncSubject = new AsyncSubject<number>();

asyncSubject.subscribe(value => console.log('订阅:', value));

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete(); // 完成时发送3

// 输出:
// 订阅: 3

2. RxJS操作符

2.1 常用操作符

import { map, filter, tap, take, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

// 1. map - 转换值
const doubled$ = source$.pipe(
  map(x => x * 2)
);
// 结果: 2, 4, 6, 8, 10

// 2. filter - 过滤值
const even$ = source$.pipe(
  filter(x => x % 2 === 0)
);
// 结果: 2, 4

// 3. tap - 执行副作用
const logged$ = source$.pipe(
  tap(value => console.log('原始值:', value)),
  map(x => x * 2),
  tap(value => console.log('转换后:', value))
);

// 4. take - 取前N个值
const firstThree$ = source$.pipe(
  take(3)
);
// 结果: 1, 2, 3

// 5. debounceTime - 防抖
const searchInput = document.getElementById('search');
const search$ = fromEvent(searchInput, 'input').pipe(
  map((event: any) => event.target.value),
  debounceTime(300),
  distinctUntilChanged()
);

// 6. switchMap - 切换Observable(常用于HTTP请求)
search$.pipe(
  switchMap(searchTerm => {
    return this.http.get(`/api/search?q=${searchTerm}`);
  })
).subscribe(results => {
  console.log('搜索结果:', results);
});
操作符弹珠图示例:
源:      1---2---3---4---5---|
map(x => x * 2):
          2---4---6---8---10---|

filter(x => x % 2 === 0):
          2-------4-------10---|

take(3):
          1---2---3---|
                            

3. Angular中的状态管理

3.1 使用BehaviorSubject进行状态管理

// app-state.service.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';

export interface AppState {
  user: User | null;
  isLoading: boolean;
  theme: 'light' | 'dark';
  notifications: Notification[];
  cart: CartItem[];
}

const initialState: AppState = {
  user: null,
  isLoading: false,
  theme: 'light',
  notifications: [],
  cart: []
};

@Injectable({
  providedIn: 'root'
})
export class AppStateService {
  private state = new BehaviorSubject<AppState>(initialState);
  private state$ = this.state.asObservable();

  // 选择器 - 获取特定状态片段
  select<T>(selector: (state: AppState) => T): Observable<T> {
    return this.state$.pipe(
      map(selector),
      distinctUntilChanged()
    );
  }

  // 获取当前状态快照
  getSnapshot(): AppState {
    return this.state.getValue();
  }

  // 更新状态 - 部分更新
  updateState(partialState: Partial<AppState>): void {
    const currentState = this.getSnapshot();
    const newState = { ...currentState, ...partialState };
    this.state.next(newState);
  }

  // 特定action方法
  setLoading(isLoading: boolean): void {
    this.updateState({ isLoading });
  }

  setUser(user: User | null): void {
    this.updateState({ user });
  }

  setTheme(theme: 'light' | 'dark'): void {
    this.updateState({ theme });
  }

  addToCart(item: CartItem): void {
    const currentCart = this.getSnapshot().cart;
    const existingItem = currentCart.find(i => i.id === item.id);

    if (existingItem) {
      const updatedCart = currentCart.map(cartItem =>
        cartItem.id === item.id
          ? { ...cartItem, quantity: cartItem.quantity + item.quantity }
          : cartItem
      );
      this.updateState({ cart: updatedCart });
    } else {
      this.updateState({ cart: [...currentCart, item] });
    }
  }

  removeFromCart(itemId: string): void {
    const currentCart = this.getSnapshot().cart;
    const updatedCart = currentCart.filter(item => item.id !== itemId);
    this.updateState({ cart: updatedCart });
  }
}

3.2 在组件中使用状态服务

// app.component.ts
import { Component, OnInit, OnDestroy } from '@angular/core';
import { AppStateService } from './app-state.service';
import { Subscription } from 'rxjs';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html'
})
export class AppComponent implements OnInit, OnDestroy {
  isLoading = false;
  user: User | null = null;
  cartCount = 0;
  theme = 'light';

  private subscriptions: Subscription[] = [];

  constructor(private stateService: AppStateService) {}

  ngOnInit(): void {
    // 方式1: 直接订阅
    const loadingSub = this.stateService
      .select(state => state.isLoading)
      .subscribe(isLoading => {
        this.isLoading = isLoading;
      });

    // 方式2: 使用async pipe(推荐,自动取消订阅)
    // 在模板中: {{ isLoading$ | async }}
    this.user$ = this.stateService.select(state => state.user);
    this.theme$ = this.stateService.select(state => state.theme);
    this.cartCount$ = this.stateService.select(state =>
      state.cart.reduce((total, item) => total + item.quantity, 0)
    );

    // 方式3: 订阅多个状态
    const combinedSub = this.stateService
      .select(state => ({
        user: state.user,
        theme: state.theme
      }))
      .subscribe(({ user, theme }) => {
        this.user = user;
        this.theme = theme;
      });

    this.subscriptions.push(loadingSub, combinedSub);
  }

  login(): void {
    this.stateService.setLoading(true);

    // 模拟API调用
    setTimeout(() => {
      const user: User = {
        id: '1',
        name: '张三',
        email: 'zhangsan@example.com'
      };
      this.stateService.setUser(user);
      this.stateService.setLoading(false);
    }, 1000);
  }

  logout(): void {
    this.stateService.setUser(null);
  }

  toggleTheme(): void {
    const currentTheme = this.stateService.getSnapshot().theme;
    const newTheme = currentTheme === 'light' ? 'dark' : 'light';
    this.stateService.setTheme(newTheme);
  }

  ngOnDestroy(): void {
    // 手动清理订阅
    this.subscriptions.forEach(sub => sub.unsubscribe());
  }
}
<!-- app.component.html -->
<div [class.dark-theme]="(theme$ | async) === 'dark'" class="app-container">
  <header>
    <div class="user-info" *ngIf="user$ | async as user">
      欢迎, {{user.name}}!
      <button (click)="logout()">退出登录</button>
    </div>
    <button *ngIf="!(user$ | async)" (click)="login()" [disabled]="isLoading">
      {{isLoading ? '登录中...' : '登录'}}
    </button>

    <button (click)="toggleTheme()">
      切换主题 (当前: {{theme$ | async}})
    </button>

    <div class="cart">
      购物车: {{cartCount$ | async}} 件商品
    </div>
  </header>

  <div *ngIf="isLoading" class="loading-overlay">
    <i class="fas fa-spinner fa-spin"></i> 加载中...
  </div>

  <main>
    <!-- 应用内容 -->
  </main>
</div>

4. 高级状态管理模式

4.1 Redux模式实现

// store.types.ts
export interface Action {
  type: string;
  payload?: any;
}

export interface Reducer<T> {
  (state: T, action: Action): T;
}

// store.ts
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';

@Injectable({
  providedIn: 'root'
})
export class Store<T> {
  private stateSubject: BehaviorSubject<T>;
  state$: Observable<T>;

  constructor(private reducer: Reducer<T>, initialState: T) {
    this.stateSubject = new BehaviorSubject<T>(initialState);
    this.state$ = this.stateSubject.asObservable();
  }

  getState(): T {
    return this.stateSubject.getValue();
  }

  dispatch(action: Action): void {
    const currentState = this.getState();
    const newState = this.reducer(currentState, action);
    this.stateSubject.next(newState);
  }

  select<R>(selector: (state: T) => R): Observable<R> {
    return this.state$.pipe(
      map(selector),
      distinctUntilChanged()
    );
  }
}
// counter.reducer.ts
export interface CounterState {
  count: number;
  loading: boolean;
}

const initialState: CounterState = {
  count: 0,
  loading: false
};

export function counterReducer(
  state: CounterState = initialState,
  action: Action
): CounterState {
  switch (action.type) {
    case 'INCREMENT':
      return { ...state, count: state.count + 1 };

    case 'DECREMENT':
      return { ...state, count: state.count - 1 };

    case 'INCREMENT_BY':
      return { ...state, count: state.count + action.payload };

    case 'SET_LOADING':
      return { ...state, loading: action.payload };

    default:
      return state;
  }
}

// counter.store.ts
@Injectable({
  providedIn: 'root'
})
export class CounterStore extends Store<CounterState> {
  constructor() {
    super(counterReducer, initialState);
  }

  increment(): void {
    this.dispatch({ type: 'INCREMENT' });
  }

  decrement(): void {
    this.dispatch({ type: 'DECREMENT' });
  }

  incrementBy(amount: number): void {
    this.dispatch({ type: 'INCREMENT_BY', payload: amount });
  }

  setLoading(loading: boolean): void {
    this.dispatch({ type: 'SET_LOADING', payload: loading });
  }
}

4.2 使用ngrx/store(官方状态管理库)

// 安装ngrx
// npm install @ngrx/store @ngrx/effects @ngrx/store-devtools

// app.module.ts
import { StoreModule } from '@ngrx/store';
import { EffectsModule } from '@ngrx/effects';
import { StoreDevtoolsModule } from '@ngrx/store-devtools';
import { counterReducer } from './store/counter.reducer';
import { CounterEffects } from './store/counter.effects';

@NgModule({
  imports: [
    StoreModule.forRoot({ counter: counterReducer }),
    EffectsModule.forRoot([CounterEffects]),
    StoreDevtoolsModule.instrument({
      maxAge: 25 // 保留25个历史状态
    })
  ]
})
export class AppModule { }

// 在组件中使用
import { Store } from '@ngrx/store';
import { Observable } from 'rxjs';
import { increment, decrement } from './store/counter.actions';

@Component({
  selector: 'app-counter',
  template: `
    <div>计数: {{count$ | async}}</div>
    <button (click)="increment()">+</button>
    <button (click)="decrement()">-</button>
  `
})
export class CounterComponent {
  count$: Observable<number>;

  constructor(private store: Store<{ counter: CounterState }>) {
    this.count$ = this.store.select(state => state.counter.count);
  }

  increment(): void {
    this.store.dispatch(increment());
  }

  decrement(): void {
    this.store.dispatch(decrement());
  }
}

5. 性能优化

// 使用trackBy优化ngFor
@Component({
  template: `
    <div *ngFor="let item of items$ | async; trackBy: trackById">
      {{item.name}}
    </div>
  `
})
export class ItemListComponent {
  items$ = this.stateService.select(state => state.items);

  trackById(index: number, item: Item): string {
    return item.id;
  }
}

// 使用shareReplay避免重复计算
const expensiveData$ = this.http.get('/api/data').pipe(
  shareReplay(1) // 缓存最新的值
);

// 使用debounceTime避免频繁更新
const searchResults$ = this.searchInput.valueChanges.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.searchService.search(term))
);

// 使用OnPush变更检测策略
@Component({
  selector: 'app-user-profile',
  templateUrl: './user-profile.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush // 优化性能
})
export class UserProfileComponent {
  @Input() user: User;
}
状态流图:
Action
Reducer
State
View
单向数据流:Action → Reducer → State → View

6. 最佳实践

RxJS最佳实践:
  • 使用async管道自动管理订阅
  • 使用takeUntil模式手动管理订阅生命周期
  • 使用shareReplay避免重复计算
  • 使用distinctUntilChanged避免不必要的更新
  • 为复杂的流使用类型定义
  • 使用弹珠图理解和设计数据流
  • 在服务中封装业务逻辑
状态管理建议:
  • 根据应用复杂度选择合适的状态管理方案
  • 小型应用:使用BehaviorSubject
  • 中型应用:考虑Redux模式
  • 大型应用:使用ngrxngxs
  • 将状态逻辑与组件逻辑分离
  • 使用不可变数据结构
  • 实现撤销/重做功能考虑使用状态历史

7. 常见错误和解决方案

内存泄漏问题
// ❌ 错误:忘记取消订阅
ngOnInit() {
  this.dataService.getData().subscribe(data => {
    this.data = data;
  });
}

// ✅ 正确:使用takeUntil
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject();

  ngOnInit() {
    this.dataService.getData()
      .pipe(takeUntil(this.destroy$))
      .subscribe(data => {
        this.data = data;
      });
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ 正确:使用async管道
// 模板中: {{ data$ | async }}
调试技巧
// 1. 使用tap进行调试
observable$.pipe(
  tap(value => console.log('值:', value)),
  tap(value => console.log('类型:', typeof value)),
  tap({
    next: value => console.log('Next:', value),
    error: err => console.error('Error:', err),
    complete: () => console.log('Complete')
  })
);

// 2. 使用Redux DevTools
// 安装: npm install @ngrx/store-devtools
// 配置后可以在浏览器开发者工具中查看状态变化

// 3. 创建自定义调试操作符
function debug(tag: string) {
  return <T>(source: Observable<T>) => {
    return source.pipe(
      tap({
        next: value => console.log(`[${tag}] Next:`, value),
        error: err => console.error(`[${tag}] Error:`, err),
        complete: () => console.log(`[${tag}] Complete`)
      })
    );
  };
}

// 使用
data$.pipe(
  debug('Data Stream')
).subscribe();