RxJS – Observables, observers 和 operators 简介

RxJS 是响应式编程 (reactive programming) 强大的工具,今天我们将深入介绍 Observables 和 Observers 的内容,以及介绍如何创建自己的操作符 (operators)。

如果你之前已经使用过 RxJS,并希望了解 Observable 及 Operators (操作符) 的内部工作原理,那么这篇文章非常适合你。

什么是 Observable

Observable 就是一个拥有以下特性的函数:

  • 它接收一个 observer 对象作为参数,该对象中包含 nexterrorcomplete 方法

  • 它返回一个函数,用于在销毁 Observable 时,执行清理操作

在我们实现的示例中,我们将定义一个简单的 unsubscribe 函数来实现取消订阅的功能。然而在 RxJS 中,返回的是 Subcription 对象,该对象中包含一个 unsubscribe 方法。

一个 Observable 对象设置观察者 (observer),并将它与生产者关联起来。该生产者可能是 DOM 元素产生的 clickinput 事件,也可能是更复杂的事件,如 HTTP。

为了更好地理解 Observable,我们来自定义 Observable。首先,我们先来看一个订阅的例子:

const node = document.querySelector('input[type=text]');const input$ = Rx.Observable.fromEvent(node, 'input');input$.subscribe({next: (event) => console.log(`You just typed ${event.target.value}!`),error: (err) => console.log(`Oops... ${err}`),complete: () => console.log(`Complete!`)
});

该示例中,Rx.Observable.formEvent() 方法接收一个 input 元素和事件名作为参数,然后返回一个 $input Observable 对象。接下来我们使用 subscribe() 方法来定于该 Observable 对象。当触发 input 事件后,对应的值将会传递给 observer 对象。

什么是 Observer

Observer (观察者) 非常简单,在上面的示例中,观察者是一个普通的对象,该对象会作为 subscribe() 方法的参数。此外 subscribe(next, error, complete) 也是一个有效的语法,但在本文中我们将讨论对象字面量的形式。

当 Observable 对象产生新值的时候,我们可以通过调用 next() 方法来通知对应的观察者。若出现异常,则会调用观察者的 error() 方法。当我们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者。但在以下两种情况下,新的值不会再通知对应的观察者:

  • 已调用 observer 对象的 complete() 方法

  • 消费者对数据不再感兴趣,执行取消订阅操作

此外在执行最终的 subscribe() 订阅操作前,我们传递的值可以经过一系列的链式处理操作。执行对应操作的东西叫操作符,每个操作符执行完后会返回一个新的 Observable 对象,然后继续我们的处理流程。

什么是 Operator

正如上面所说的,Observable 对象能够执行链式操作,具体如下所示:

const input$ = Rx.Observable.fromEvent(node, 'input').map(event => event.target.value).filter(value => value.length >= 2).subscribe(value => {// use the `value`
});

上面代码的执行流程如下:

  • 假设用户在输入框中输入字符 a

  • Observable 对象响应对应的 input 事件,然后把值传递给 observer

  • map() 操作符返回一个新的 Observable 对象

  • filter() 操作符执行过滤操作,然后又返回一个新的 Observable 对象

  • 最后我们通过调用 subscribe() 方法,来获取最终的值

简而言之,Operator 就是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。

自定义 Observable

Observable 构造函数

function Observable(subscribe) {this.subscribe = subscribe;
}

每个 subscribe 回调函数被赋值给 this.subscribe 属性,该回调函数将会被我们或其它 Observable 对象调用。

Observer 示例

在我们深入介绍前,我们先来看一个简单的示例。之前我们已经创建完 Observable 函数,现在我们可以调用我们的观察者 (observer),然后传递数值 1,然后订阅它:

const one$ = new Observable((observer) => {observer.next(1);observer.complete();
});one$.subscribe({next: (value) => console.log(value) // 1
});

即我们订阅我们创建的 Observable 实例,然后通过 subscribe() 方法调用通过构造函数设置的回调函数。

Observable.fromEvent

下面就是我们需要的基础结构,即在 Observable 对象上需要新增一个静态方法 fromEvent

Observable.fromEvent = (element, name) => { };

接下来我们将参考 RxJS 为我们提供的方法来实现自定义的 fromEvent() 方法:

const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');

按照上面的使用方式,我们的 fromEvent() 方法需要接收两个参数,同时需要返回一个新的 Observable 对象,具体如下:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {});
};

接下来我们来实现事件监听功能:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {}, false);});
};

那么我们的 observer 参数来自哪里? 其实 observer 对象就是包含 nexterrorcomplete 方法的对象字面量。

需要注意的是,我们的 observer 参数不会被传递,直到 subscribe() 方法被调用。这意味着 addEventListener() 方法不会被调用,除非你订阅该 Observable 对象。

当我们调用 subscribe() 方法,之前设置的 this.subscribe 回调函数会被调用,对应的参数是我们定义的 observer 对象字面量,接下来将使用新的值,作为 next() 方法的参数,调用该方法。

很好,那接下来我们要做什么?之前版本我们只是设置了监听,但没有调用 observer 对象的 next() 方法,接下来让我们来修复这个问题:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {observer.next(event);}, false);});
};

如你所知,当销毁 Observables 对象时,需要调用一个函数用来执行清理操作。针对目前的场景,在销毁时我们需要移除事件监听:

Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};

我们没有调用 complete() 方法,因为该 Observable 对象处理的 DOM 相关的事件,在时间维度上它们可能是无终止的。

现在让我们来验证一下最终实现的功能:

const node = document.querySelector('input');
const p = document.querySelector('p');function Observable(subscribe) {this.subscribe = subscribe;
}Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};const input$ = Observable.fromEvent(node, 'input');const unsubscribe = input$.subscribe({next: (event) => {p.innerHTML = event.target.value;}
});// automatically unsub after 5s
setTimeout(unsubscribe, 5000);

自定义操作符

创建我们自己的操作符应该会更容易一些,现在我们了解 ObservableObservable 背后的概念。我们将在 Observable 的原型对象上添加一个方法:

Observable.prototype.map = function (mapFn) { };

该方法的功能与 JavaScript 中的 Array.prototype.map 方法类似:

const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);

所以我们需要应用回调函数并调用它,这用于获取我们所需要的数据。在我们这样做之前,我们需要流中的最新值。这里是巧妙的部分,在 map() 操作符中,我们需要访问 Observable 实例。因为 map 方法在原型上,我们可以通过以下方式访问 Observable 实例:

Observable.prototype.map = function (mapFn) {const input = this;
};

接下来我们在返回的 Observable 对象中执行 input 对象的订阅操作:

Observable.prototype.map = function(mapFn) {const input = this;return new Observable((observer) => {return input.subscribe();});
};

我们返回了 input.subscribe() 方法执行的结果,因为当我们执行取消订阅操作时,将会依次调用每个 Observable 对象取消订阅的方法。

最后我们来完善一下 map 操作符的内部代码:

Observable.prototype.map = function (mapFn) {const input = this;return new Observable((observer) => {return input.subscribe({next: (value) => observer.next(mapFn(value)),error: (err) => observer.error(err),complete: () => observer.complete()});});
};

现在我们已经可以执行链式操作了:

const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);input$.subscribe({next: (value) => {p.innerHTML = value;}
});

我有话说

Observable 与 Promise 有什么区别?

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ 单值 多值
拉取(Pull) 函数 遍历器
推送(Push) Promise Observable
  • Promise

    • 返回单个值

    • 不可取消的

  • Observable

    • 随着时间的推移发出多个值

    • 可以取消的

    • 支持 map、filter、reduce 等操作符

    • 延迟执行,当订阅的时候才会开始执行

什么是 SafeObserver ?

上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

  • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)

  • complete 或者 error 触发之后再调用 next 方法是没用的

  • 调用 unsubscribe 方法后,任何方法都不能再被调用了

  • completeerror 触发后,unsubscribe 也会自动调用

  • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费

  • nextcompleteerror是可选的。按需处理即可,不必全部处理

为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。

若想进一步了解详细信息,请参考 Observable详解 文章中 "自定义 Observable" 章节的内容。

参考资源

  • rxjs-observables-observers-operators

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注