目录

200行代码理解 RxJS 的核心概念

RxJS 非常强大,各种操作符连接在一起便能让数据流动到需要用到它的地方,有人甚至觉得 RxJS 是魔法;

然而,太阳底下无新事,本教程将一步步揭开 RxJS 的神秘面纱;

本教程会通过 10 个有趣的小 demo 渐进式的实现 RxJS 的核心功能,其中包括:

  • 类:Observable 的实现
  • 类的方法:subscribe,pipe 的实现
  • 创建类操作符:of,from,fromEvent,interval,timer 的实现
  • 过滤类操作符:filter,take 的实现
  • 工具类操作符:tap 的实现
  • 组合类操作符:merge 的实现

其中还包括 RxJS v5 链式调用 和 RxJS v6 通过 pipe 来调用 的各自实现。

而这一切,只有 200 行不到的代码,如果感兴趣,开始你的愉快之旅吧!

写在前面

如何使用?

首先安装依赖:

1
yarn

需要注意的是,本教程的 demo 是用 typescript 写的;

所以,为了方便调试运行,建议使用 ts-node,可以通过如下方式安装:

1
yarn global add ts-node

安装好后,你可以用 ts-node 来运行 typescript 文件,例如:

1
ts-node demo01/index.ts

这样就可以看到运行结果了;

文件结构

每个 demo 文件夹下会有 4 个文件,其中:

  • core.ts 是当前阶段实现的 rxjs 的核心代码
  • index.ts 是引用官方 rxjs 写的测试代码
  • index1.ts 是引用前阶段实现的 rxjs 写的测试代码
  • readme.md 是对于当前阶段代码改动的介绍

可以发现 index.tsindex1.ts 中,除了引用不同,代码没有任何不同。

这样做是为了方便对比官方 rxjs 和自己实现 rxjs 的运行效果。

为了保证阅读效果,建议读者边阅读边动手实操,点击这里可以下载源码。

1、实现一个 Observable

RxJS 的一切起源与 Observable,Observable 是 RxJS 世界的基石,没有它,响应式无重谈起。

Observable 表示一个可观察对象,他表示一个可调用的未来值或事件的集合。

比如有以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import { Observable } from "rxjs";

const dataStream$ = new Observable((observer) => {
  observer.next(1);
  setTimeout(() => {
    observer.next(2);
    observer.complete();
  }, 1000);
  observer.next(3);
});

const observer = {
  next: (x) => console.log(x),
  error: (err) => console.error(err),
  complete: () => console.log("done"),
};

dataStream$.subscribe(observer);

这段代码引用的是官方的 Observable, 它在运行后会首先打印一个 1,接着打印一个 3,隔一秒后会再打印一个 2,最后运行结束

仔细观察 Observable 方法,他会接受一个方法传进它的构造函数,这个方法接受一个对象,对象上有 next, error, complete 等属性,但是这个对象是 Observable 实例在调用 subscribe 方法时才传进去的:

有了上面的思路,我们可以大胆的构造出自己的 Observable, 如下:

1
2
3
4
5
6
7
8
9
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    this._subscribe(observer);
  }
}

把官方的 Observable 替换成自己的 Observable 会发现输出没什么差异。

没错,Observable 的核心就是这么简单。到现在,我们只用了 9 行代码实现便了 Rxjs 的核心概念之一 —— Observable

2、实现创建类操作符 of

创建类操作符中,最容易理解的莫过于 of,那么我们就先实现 of 操作符。

比如有如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import { of } from "rxjs";

const dataStream$ = of(1, 2, 3);

const observer = {
  next: (x) => console.log(x),
  error: (err) => console.error(err),
  complete: () => console.log("done"),
};

dataStream$.subscribe(observer);

它在运行后会首先打印一个 1,接着打印一个 2,再会打印一个 3,最后运行结束。

有了前面自己实现的 Observableof 的实现就会变得非常简单,它实际上只是 Observable 外套了一层包装,本质上还是 Observable,实现如下:

1
2
3
4
5
6
7
8
export function of(...args) {
  return new Observable((observer) => {
    args.forEach((arg) => {
      observer.next(arg);
    });
    observer.complete();
  });
}

把官方的 of 替换成自己的 of ,再配上自己实现的 Observable,我们会发现输出和官方一致。

3、Observable.subscribe 可以传入一个方法作为参数

官方 Observablesubscribe 可以传入一个函数进去,这样的话写起来会清爽很多,如下:

1
2
3
4
5
import { of } from "rxjs";

const dataStream$ = of(1, 2, 3);

dataStream$.subscribe(console.log);

为了我们的 Observable 也能这样的好用,我们可以将 subscribe 适当的改造一下,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    const defaultObserver = {
      next: () => {},
      error: () => {},
      complete: () => {},
    };
    if (typeof observer === "function") {
      return this._subscribe({ ...defaultObserver, next: observer });
    } else {
      return this._subscribe({ ...defaultObserver, ...observer });
    }
  }
}

这样我们的 subscribe 也能只传一个方法进去了,变得相当好用。

4、实现创建类操作符 fromEvent

但是,Rxjs 核心要解决的是数据流传输的问题,很多时候,我们的数据源头来自用户的人机交互,比如说点击按钮,这样的话就不得不用到 fromEvent,比如如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import { fromEvent } from "rxjs";
import { JSDOM } from "jsdom";

const element = new JSDOM(
  `<div>Hello world</div>`
).window.document.querySelector("div");

const source$ = fromEvent(element, "click");

source$.subscribe(console.log);

setTimeout(() => {
  element.click();
}, 1000);

为了方便对比和测试,我们引用了 jsdom,它的作用是在 node 端可以做一些 dom 的相关操作。

以上代码渲染了一个 Hello world 的元素盒子,并且在一秒钟之后会点击这个盒子。与此同时,我们又使用了 Rxjs 中的 fromEvent 来监听盒子的事件。

为了实现自己的fromEvent,我们来分析一下 fromEvent 所需要的参数,第一个传的是 dom 元素的实例,第二个则是事件的类型,于是可以猜到, fromEvent 内部本质上还是通过原生的 addEventListener 来实现的。而且需要注意到,除非自己手动取消订阅,否则fromEvent创造的对象永远不会结束,根据这个推测,我们能猜到它的内部很有可能只有 next 方法。

有了上述的推断,我们很容易就写出了一个 fromEvent 方法,如下:

1
2
3
4
5
6
export function fromEvent(element, event) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);
    element.addEventListener(event, handler);
  });
}

连我都惊讶它的实现居然如此简短,但是运行的效果和官方的效果完全一致。

5、实现创建类操作符 from、interval、timer

有了上面构造 offromEvent 的基础,那么 fromintervaltimer 也将不在话下。

例如,interval 操作符可以这样实现:

1
2
3
4
5
6
7
8
export function interval(delay) {
  return new Observable((observer) => {
    let index = 0;
    setInterval(() => {
      observer.next(index++);
    }, delay);
  });
}

timer 操作符可以这样实现:

1
2
3
4
5
6
7
export function timer(delay) {
  return new Observable((observer) => {
    setTimeout(() => {
      observer.next(0);
    }, delay);
  });
}

from 操作符的实现稍微比较复杂,因为它可以接受 Array 或者 Promise 类型的参数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
export function from(param) {
  if (Array.isArray(param)) {
    return new Observable((observer) => {
      param.forEach((val) => observer.next(val));
      observer.complete();
    });
  }
  return new Observable((observer) => {
    Promise.resolve(param)
      .then((val) => {
        observer.next(val);
        observer.complete();
      })
      .catch((e) => {
        observer.error(e);
      });
  });
}

用我们自己实现的 fromintervaltimer 操作符来替换官方的操作符号,会发现执行结果和官方的表现的一致

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { from, timer, interval } from "rxjs";

const dataStream1$ = from([1, 2, 3]);

const dataPromise = new Promise((res) => {
  setTimeout(() => {
    res("dataPromise");
  }, 1500);
});

const dataStream2$ = from(dataPromise);

const dataStream3$ = timer(1000);

const dataStream4$ = interval(1000);

setTimeout(() => {
  console.log("===== test from =====");
  dataStream1$.subscribe(console.log);
  dataStream2$.subscribe(console.log);
}, 1000);

setTimeout(() => {
  console.log("===== test timer =====");
  dataStream3$.subscribe(console.log);
}, 3000);

setTimeout(() => {
  console.log("===== test interval =====");
  dataStream4$.subscribe(console.log);
}, 5000);

6、为创建类操作符添加取消订阅功能

前面的操作复实现的过于顺利,以至于我隐隐觉得不安,是不是忽略了什么东西。

果然,我马上意识到,事情没有这么简单,上面自己实现的操作符,我无法取消订阅它们,处理不好这会造成严重的内存泄漏。

于是我们着手改造,以 fromEvent 为例,只需要给 Observable 构造函数传入的方法一个返回值,在这个返回值加一个 unsubscribe 属性,然后在这个属性中写入取消订阅的操作。

1
2
3
4
5
6
7
8
9
export function fromEvent(element, event) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);
    element.addEventListener(event, handler);
    return {
      unsubscribe: () => element.removeEventListener(event, handler),
    };
  });
}

这样的话,我们的 fromEvent 就能顺利的取消订阅了。

将上面的操作符都整理一下加上返回值,看看我现阶段的成果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  subscribe(observer) {
    const defaultObserver = {
      next: () => {},
      error: () => {},
      complete: () => {},
    };
    if (typeof observer === "function") {
      return this._subscribe({ ...defaultObserver, next: observer });
    } else {
      return this._subscribe({ ...defaultObserver, ...observer });
    }
  }
}

export function of(...args) {
  return new Observable((observer) => {
    args.forEach((arg) => {
      observer.next(arg);
    });
    observer.complete();
    return {
      unsubscribe: () => {},
    };
  });
}

export function fromEvent(element, event) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);
    element.addEventListener(event, handler);
    return {
      unsubscribe: () => element.removeEventListener(event, handler),
    };
  });
}

export function from(param) {
  if (Array.isArray(param)) {
    return new Observable((observer) => {
      param.forEach((val) => observer.next(val));
      observer.complete();
      return {
        unsubscribe: () => {},
      };
    });
  }
  return new Observable((observer) => {
    let canceld = false;
    Promise.resolve(param)
      .then((val) => {
        if (!canceld) {
          observer.next(val);
          observer.complete();
        }
      })
      .catch((e) => {
        observer.error(e);
      });
    return {
      unsubscribe: () => {
        canceld = true;
      },
    };
  });
}

export function interval(delay) {
  return new Observable((observer) => {
    let index = 0;
    const time = setInterval(() => {
      observer.next(index++);
    }, delay);
    return {
      unsubscribe: () => clearInterval(time),
    };
  });
}

export function timer(delay) {
  return new Observable((observer) => {
    const time = setTimeout(() => {
      observer.next(0);
    }, delay);
    return {
      unsubscribe: () => clearTimeout(time),
    };
  });
}

目前,我们用不到 100 行的代码基本实现了 ObservableoffromEventfromintervaltimer,而且实现的效果非常让人惊艳。

但是我们的旅行还没有结束,接下来,笔者将会实现一些好用的转换类操作符,比如 mapfilter。我们将分别用链式调用和 pipe 调用来分别实现他们,以及对比这两种方式使用的优劣。通过这些对比,我们会明白为什么 rxjs6 为什会使用 pipe 调用 而不延续 rxjs5 的链式调用方法。

7、实现转换类操作符 map 和过滤类操作符 filter(链式调用实现)

对于 mapfilter,同学们一定很熟悉,因为在 Array 的原型上就有这两个方法。map 表示转换,filter表示过滤,在 rxjs 中也是这样的功能,那我们应该如何实现这两个操作符呢?

同学可能暂时一头雾水,但是只要换个问题问,同学们一定手到擒来,怎么用 forEach 实现 Array 中的 map 和 filter?,这还不简单,于是大笔一挥,有了如下的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
function map(array, fn) {
  const res = [];
  array.forEach((item) => res.push(fn(item)));
  return res;
}

function filter(array, fn) {
  const res = [];
  array.forEach((item) => (fn(item) ? res.push(item) : ""));
  return res;
}

有了上面的参考,那我们实现 rxjs 中的 mapfilter 就有了很多思路,我们先来考虑链式调用的实现。

有经验的同学肯定知道,上古神器 JQuery 就是典型的链式调用。链式调用有很多优点,特别是写起来非常优雅。

以 rxjs5 为例,有如下的一串代码:

1
2
3
4
5
6
7
const dataStream1$ = of(1, 2, 3);

dataStream1$
  .map((data) => data * 2)
  .filter((data) => data > 3)
  .map((data) => data + 1)
  .subscribe(console.log);

仔细观察我们会发现,实现链式调用的关键是 mapfilter 的返回值必须也要是 Observable 实例,而且 mapfilter 需要挂载到实例对象上。

有了上面的思路,并且结合前面 Array 中的 mapfilter 的实现,我们尝试写出了 rxjs5 中 mapfilter的实现,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  map(fn) {
    return new Observable((observer) => {
      this.subscribe({
        next: (val) => observer.next(fn(val)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
  }

  filter(fn) {
    return new Observable((observer) => {
      this.subscribe({
        next: (val) => (fn(val) ? observer.next(val) : () => {}),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
  }
}

非常完美,之前写的代码片段可以顺利执行,打印出了 5 和 7。

但是我们也发现的了链式调用的缺陷 —— 方法都在实例上

这也就意味着哪怕仅仅用了一个转换操作符,也将会加载全部操作符。如果实例中的方法比较少,这还能忍受。但是像 rxjs 这样的库,转换类操作符和过滤类操作符加起来有几十种,这样的性能影响是无法忽略的。

而且这样的代码组织方式,打包工具的 tree shaking 将无法起作用,于是有了以下的 pipe 实现。

8、实现转换类操作符 map 和过滤类操作符 filter(pipe 调用实现)

pipe 的思想出现的是间比链式调用还要早,早在 Unix 中就有了 pipe,在 Linux 中,我们也能经常看到 | 符号。和前端相关的,有经验的开发同学会知道在 Gulp 也使用了大量的 pipe 的思想。

以一段 Gulp 的脚本为例:

1
2
3
4
5
6
const { src, dest } = require("gulp");
const babel = require("gulp-babel");

exports.default = function () {
  return src("src/*.js").pipe(babel()).pipe(dest("output/"));
};

乍一看和链式调用很像,但是由于 pipe 在中间做了一层隔离,实例对象和具体的转换方法解耦,所以完全没有了上面链式调用出现的问题。

rxjs 中的 pipe 实现更进一步,它能接受多个参数,参考如下写法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import { of } from "rxjs";
import { map, filter } from "rxjs/operators";

const dataStream1$ = of(1, 2, 3);

const dataStream2$ = dataStream1$
  .pipe(map((data) => data * 2))
  .pipe(filter((data) => data > 3))
  .pipe(map((data) => data + 1));

const dataStream3$ = dataStream1$.pipe(
  map((data) => data * 2),
  filter((data) => data > 3),
  map((data) => data + 1)
);

上面的代码中 dataStream2$dataStream3$ 实现的效果完全一致,所以想要 pipe 调用,核心在于如何实现这个加强版 pipe 方法。

一次性实现会有难度,我们可以尝试把这个问题分开,先实现只接受一个参数的 pipe,如下:

1
2
3
4
5
6
7
8
9
import { of } from "rxjs";
import { map, filter } from "rxjs/operators";

const dataStream1$ = of(1, 2, 3);

const dataStream2$ = dataStream1$
  .pipe(map((data) => data * 2))
  .pipe(filter((data) => data > 3))
  .pipe(map((data) => data + 1));

仔细观察 pipe, 我们会发现它接收一个 RxJS 操作符的运行结果作为参数,并返回一个 Observable 实例。

而且 mapfilter 不像之前链式调用那样挂载在 Observable 实例上,而仅仅是一个纯函数。

而且这个纯函数会在传入 pipe 的时候执行一次,目的是把具体 map 的逻辑传进去。然后在 pipe 方法里再执行一次,目的是吐出一个 Observable 以供后续使用。我们可以猜到 mapfilter 这样的操作符实际上是包了两层 “纸” 的 Observable 实例。

有了上面的思路,我们简单推断便可以写出如下的代码(完整的代码查看demo08):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  // 接受单个参数的 pipe 实现
  pipe(operation) {
    return operation(this);
  }
}

export function filter(fn) {
  return (observable) =>
    new Observable((observer) => {
      observable.subscribe({
        next: (val) => (fn(val) ? observer.next(val) : () => {}),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

export function map(fn) {
  return (observable) =>
    new Observable((observer) => {
      observable.subscribe({
        next: (val) => observer.next(fn(val)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

我们在 pipe 中将 当前的 Observable 实例传递给操作符以生成一个新的 Observable 实例。经过测试发现,可以完美的运行。就这样,我们实现了可以接收单个参数的 pipe。

接下来我们来 实现可以接受多个参数的 pipe,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import { of } from "rxjs";
import { map, filter } from "rxjs/operators";

const dataStream1$ = of(1, 2, 3);

const dataStream2$ = dataStream1$.pipe(
  map((data) => data * 2),
  filter((data) => data > 3),
  map((data) => data + 1)
);

尝试将上面的 pipe 方法做一些改造,从

1
2
3
pipe(operation) {
  return operation(this);
}

变成:

1
2
3
pipe(operation1,operation2,operation3) {
  return operation3(operation2(operation1(this)));
}

我们发现可以完美的运行接受三个参数的 pipe, 但是这样实在是太丑陋,而且参数是固定的,于是我们可以再优化一下,如下:

1
2
3
pipe(...operations) {
  return operations.reduce((prev, fn) => fn(prev), this);
}

这样,我们就实现了可以接受任意多参数的 pipe 了,最后写下来才发现,原来一行代码就搞定了。

9、tap 、 take 以及 merge 的实现

最后还有 taptake 以及 merge 的实现,实现的原理和前面提到的操作符大同小异。感兴趣的读者可以继续查阅 demo09demo10,由于篇幅原因,这里就不展开讲了。

10、完整的核心代码

最后,完整的核心代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
export class Observable {
  _subscribe;
  constructor(subscribe) {
    this._subscribe = subscribe;
  }

  pipe(...operations) {
    return operations.reduce((prev, fn) => fn(prev), this);
  }

  subscribe(observer) {
    const defaultObserver = {
      next: () => {},
      error: () => {},
      complete: () => {},
    };
    if (typeof observer === "function") {
      return this._subscribe({ ...defaultObserver, next: observer });
    } else {
      return this._subscribe({ ...defaultObserver, ...observer });
    }
  }
}

export function of(...args) {
  return new Observable((observer) => {
    args.forEach((arg) => {
      observer.next(arg);
    });
    observer.complete();
    return {
      unsubscribe: () => {},
    };
  });
}

export function fromEvent(element, event) {
  return new Observable((observer) => {
    const handler = (e) => observer.next(e);
    element.addEventListener(event, handler);
    return {
      unsubscribe: () => element.removeEventListener(event, handler),
    };
  });
}

/**
 * @param param array or promise
 */
export function from(param) {
  if (Array.isArray(param)) {
    return new Observable((observer) => {
      param.forEach((val) => observer.next(val));
      observer.complete();
      return {
        unsubscribe: () => {},
      };
    });
  }
  return new Observable((observer) => {
    let canceld = false;
    Promise.resolve(param)
      .then((val) => {
        if (!canceld) {
          observer.next(val);
          observer.complete();
        }
      })
      .catch((e) => {
        observer.error(e);
      });
    return {
      unsubscribe: () => {
        canceld = true;
      },
    };
  });
}

export function interval(delay) {
  return new Observable((observer) => {
    let index = 0;
    const time = setInterval(() => {
      observer.next(index++);
    }, delay);
    return {
      unsubscribe: () => clearInterval(time),
    };
  });
}

export function timer(delay) {
  return new Observable((observer) => {
    const time = setTimeout(() => {
      observer.next(0);
    }, delay);
    return {
      unsubscribe: () => clearTimeout(time),
    };
  });
}

export function filter(fn) {
  return (observable) =>
    new Observable((observer) => {
      observable.subscribe({
        next: (val) => (fn(val) ? observer.next(val) : () => {}),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

export function map(fn) {
  return (observable) =>
    new Observable((observer) => {
      observable.subscribe({
        next: (val) => observer.next(fn(val)),
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

export function take(num) {
  return (observable) =>
    new Observable((observer) => {
      let times = 0;
      const subscription = observable.subscribe({
        next: (val) => {
          times++;
          if (num >= times) {
            observer.next(val);
          } else {
            observer.complete();
            subscription.unsubscribe();
          }
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

export function tap(fn) {
  return (observable) =>
    new Observable((observer) => {
      observable.subscribe({
        next: (val) => {
          fn(val);
          observer.next(val);
        },
        error: (err) => observer.error(err),
        complete: () => observer.complete(),
      });
    });
}

export function merge(...observables) {
  return (observable) => {
    let completeNum = 0;
    if (observable) {
      observables = [observable, ...observables];
    }
    return new Observable((observer) => {
      observables.forEach((observable) => {
        observable.subscribe({
          next: (val) => observer.next(val),
          error: (err) => {
            observables.forEach(observable.unsubscribe());
            observer.error(err);
          },
          complete: () => {
            completeNum++;
            if (completeNum === observables.length) {
              observer.complete();
            }
          },
        });
      });
    });
  };
}

写在最后

我们通过 10 个有趣的小 demo,不到 200 行的代码 渐进式的实现 RxJS 的核心功能。

希望读者们阅读完这篇文章后对 RxJS 有更深层次的理解。如果有更优雅的实现方式,欢迎留言区多多交流。

这些 demo 的完整代码可以点这里查看,如果感觉这些 demo 写的不错,可以给笔者一个 star,谢谢大家阅读