来自 Web前端 2020-05-07 05:42 的文章
当前位置: 网上澳门金莎娱乐 > Web前端 > 正文

网上澳门金莎娱乐:rxjs

时间: 2019-09-03阅读: 170标签: rxjs主题(Subjects)

rxjs使用观察者模式、迭代器模式以及函数式编程实现一种理想的、管理序列事件的方式
rxjs的基础概念

什么是主题?RxJS 主题就是一个特性类型的 Observable 对象,它允许值多路广播给观察者(Observers)。当一个简单的 Observable 是单播的(每个订阅的观察者它们自己都依赖 Observable 的执行)时候,主题(Subjects)就是多播的。

  1. Observable:是一个包含多个值的集合,这些值都是懒推送(lazy push)进集合中的
  2. Observer
  3. Subscription
  4. Operators
  5. Subject
  6. Schedulers

Subjects 就像是一个 Observable,但是它能多播到多个观察者(Observers)。Subjects 就像是事件发射器:它们维护众多侦听者的注册。

pull push system

先搞清楚两个角色:

  1. 生产者(producer): 数据产生的地方
  2. 消费者(consumer): 数据使用的地方

pull系统:consumer决定什么时候接受producer生产的数据,比如函数,就是一个pull system,它只生产数据,并不知道什么时候这些数据会被使用。
push系统:producer决定什么时候把生产的数据传递给消费者,如promise,promise决定什么时候把生产的值“push”给callback函数

每一个 Subject 都是一个 Observable。给定一个 Subject,你就能订阅它,提供一个 Observer,开始正常接收值。从 Observer 它的角度讲,它不知道 Observable 的执行是否来自普通的单播 Observable 或是 Subject 。

Observable

Observable的核心概念:

  1. creating Observables:可以通过Rx.Observable.create创建,或者通过所谓的创建操作如:of、from、interval等创建
  2. Subscribing Observables:Observables的注册就像调用一个函数,这个函数提供一个回调函数,数据最终会在这个回调函数中使用
  3. Executing the Observable:Observable.create(function subscribe(observer) {...})中的一段代码,在Excution中,如果error或者complete执行了,那么后续的observer就不会执行
  4. Disposing Observables:处理Observables。Executing Observables可能是循环的,需要一个unsubscribe()去终止这个无限循环

概念很不好理解,下面是一个js写的简易版Observale,仅帮助理解,注释表明了1,2,3的含义

var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function(val) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  // observer.error('throw a error')
  // observer.complete()
  observer.next(3)
}
// Creating Observables
var observableEx = new observable(subscribeFunEx)
observableEx
  .subscribe()   // Subscribing to Observables
  .subscribe((val) => console.log('next: ', val))   // Subscribing to Observables

对于第四点,官方示例如下

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

在 Subject 内部,subscribe不会调用新的执行来发送值。它只是简单的在观察者列表中注册一个观察者,跟在其他库和语言中的addListener的做法是很相似的。

Observer

Observer是一个对象,这个对象有三个回调函数(next,error,complete),任何一个回调函数都可能调用

每个 Subject 也是一个 Observer。它通过next(v),error(e),complete()是一个对象。为了给 Subject 提供一个新值,只需要调用next(theValue),那么它将会多播给注册侦听到 Subject 的观察者。

Subscription

var subscription = observable.subscribe(x => console.log(x)),subscription有一个unsubscribe()方法释放所有的资源并且取消Observable的执行,也可以通过add()将多个subscription放在一起(个人感觉类似数组的unshift),这个时候调用一个subscription的unsubscribe()方法可能会将多个Subscriptionunsubscribe()

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

Subscription还有一个remove(otherSubscription)方法

下面是一个例子,我们有附加了两个观察者对象,并且我们发送一些值给 Subject:

Subject

一个Subject就是一个Observable,和Observable的区别是,Subject可以多播多个observers,它就是一个注册器,订阅者将自己想要订阅的事件注册到注册中心。Subject的subscribe并不会立即执行传递过来的值,它只是将订阅的事件放到一个observers的list中,类似addListener的作用
一个Subject也是一个Observer,通过next(value)可以将值多播至注册在Subject中的订阅事件

var subject = new Rx.Subject();  // 一个Observables

subject.subscribe({  // subscribe类似于别的语言中的addListener
  next: (v) => console.log('observerA: ' + v)  // 订阅的事件
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v) // 订阅的事件
});

subject.next(1); // 将value值1多播至上面的订阅事件中
subject.next(2);// 将value值2多播至上面的订阅事件中

打印结果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

subject也是一个observer,所以也可以observable.subscribe(subject)

import { Subject } from 'rxjs';const subject = new Subjectnumber();subject.subscribe({ next: (v) = console.log(`observerA: ${v}`)});subject.subscribe({ next: (v) = console.log(`observerB: ${v}`)});subject.next(1);subject.next(2);//Logs:// observerA: 1// observerB: 1// observerA: 2// observerB: 2

Muticasted Observables

multicast返回的的Observable的subscribe方法和Subject的subscribe方法作用相同(即类似其他语言的addListener),connect方法调用的是observable的subscribe

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

因为 Subject 是一个观察者,也就是说你也许会提供一个 Subject 作为参数给subscribe到任何 Observable,就像下面这个例子:

BehaviorSubject

BehaviorSubject表示“随着时间变化的值”,例如人的生日是不变的时间流,使用Subject,那么人的年龄就是随着时间变化的事件流,用BehaviorSubject表示

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

如果将上述的BehaviorSubject换成Subject,输出将变为

observerA: 1
observerA: 2
observerA: 3
observerB: 3
import { Subject, from } from 'rxjs'; const subject = new Subjectnumber();subject.subscribe({ next: (v) = console.log(`observerA: ${v}`)})subject.subscribe({ next: (v) = console.log(`observerB: ${v}`)}); const observable = from([1, 2, 3]); observable.subscribe(subject); // 你可以订阅已经提供的 observable 对象// Logs:// observerA: 1// observerB: 1// observerA: 2// observerB: 2// observerA: 3// observerB: 3

ReplaySubject

ReplaySubject记录多个来自Observable excution的值,并将它们分配给新的subscribes

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

设置一个windowTime来决定,分配多少个并且最近windowTime时间内来自Observable excution的值

// Rx.ReplaySubject(10, 1000),observerB记录1开始至结束的值
// Rx.ReplaySubject(10, 500),observerB记录3开始至结束(500ms~1000ms之前的buffer)的值
// Rx.ReplaySubject(3, 1000),observerB记录3开始至结束(只subscribe3个buffer)的值
var subject = new Rx.ReplaySubject(10, 1000 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

setTimeout(() => {
  subject.unsubscribe();
}, 5000);

通过上面的方法,本质上我们就仅仅只是通过 Subject 把单播的可观察的执行转成了多播的。这个例子演示了主题如何让多个观察者共享 Observable 的执行的唯一方法。

AsyncSubject

AsyncSubject中,只有最后一个值会传递给observers

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出

observerA: 5
observerB: 5

这里还有一些特殊的 Subject 类型:BehaviorSubject,ReplaySubject,AsyncSubject。

Operator

Operator是函数,这个函数会根据原来的Observable创建一个新的Observable,并且不会改变原来的Observable。.map(...), .filter(...), .merge(...)都是Operator

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

输出

10
20
30
40

转化成js(简易版,很多漏洞,望指教)

var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function() {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.next(4)
}

function multiplyByTen(input) {
  var output = new observable(function subscribe(observer) {
    input.subscribe(v => observer.next(10 * v))
  });
  return output;
}
// Creating Observables
var input = new observable(subscribeFunEx)
var output = multiplyByTen(input);
output.subscribe(v => console.log(v))

由上可以看出:output的subscribe会导致input的subscribe,这叫做“operator subscription chain”

多播 Observables

Instance operators versus static operators(实例运算符与静态运算符)

在Instance operators中,this关键字是输入的Observable,通过input
observable创建一个observable。static operators是通过Observable对象从头开始创建一个Observable

一个 “多播Observable” 通过一个 Subject 传递通知,它可能会有很多订阅者,而一个普通的 “单播 Observable” 只会发送通知到单个观察者。

Scheduler

一个scheduler可以定义在什么样的执行环境中,observable会把通知传递给observer

var observable = Rx.Observable.create(function (proxyObserver) {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);

var finalObserver = {
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver是在observeOn(Rx.Scheduler.async)中创建的,scheduler在Observable.create和最终的Observer之间创建了一个proxyObserver,proxyObserver实质上通过setTimeout或者setInterval操作来实现一个延迟执行(delay)

一个多播 Observable 在后台(hood) 用一个 Subject 让多个观察者都能看到相同的 Observable 执行。

Scheduler种类

Scheduler 目的
null 消息同步的递归的传递,
Rx.Scheduler.queue 在当前时间框架的队列中按时间表传递,用于迭代操作
Rx.Scheduler.asap 在微型任务队列中按时间表传递,例如NodeJs的nextTick()、Web Worker的MessageChannel、setTimeout()或者其他的,用于转换成异步操作
Rx.Scheduler.async Scheduler通过setInterval工作,用于基于事件的操作

在后台,multicast又是如何工作的呢:观察者订阅一个基础的 Subject,并且这个 Subject 订阅了源 Observable。下面的例子跟上面的例子很相似,它使用了observable.subscribe(subject):

使用 Schedulers

Static creation operators通常有一个Scheduler作为最后一个函数参数,如 from(array, scheduler)
Scheduler.subscribeOn决定subscribe()在什么环境中执行
Scheduler.observeOn决定在什么环境中传递通知
Instance operators有一个Scheduler作为函数参数

import { from, Subject } from 'rxjs';import { multicast } from 'rxjs/operators';const source = from([1, 2, 3]);const subject = new Subject();const multicasted = source.pipe(multicast(subject));//这里在后台就是 `subject.subscribe({...})`multicasted.subscribe({ next: (v) = console.log(`observableA: ${v}`);});multicasted.subscribe({ next: (v) = console.log(`observableB: ${v}`);});//这个带后台就是 `source.subscribe(subject)`multicasted.connect();

multicast返回一个看起来想平常使用的 Observable,但是工作却像 Subject,当它订阅的时候。multicast返回的实际是ConnectableObservable,它只是一个使用connect()方法的 Observable。

当那些共享的 Observable 的执行开始执行的时候connect()方法明确执行是非常重要的。因为connect()会在后台执行source.subscribe(subject),connect()返回一个 Subscription,它使你能够取消订阅,从而取消那些共享的 Observable 的执行。

引用计数(Reference counting)

手动调用connect()处理订阅(Subscription)是很麻烦的。通常,我们想要当第一个观察者(Observer)到达的时候自动连接,以及当最后一个观察者取消订阅的时候自动取消公共的执行。

考虑下面例子,它的订阅按此列表概述的发生:

第一个观察者订阅多播 Observable多播 Observable 连接next发送值 0 给第一个观察者第二个观察者订阅多播 Observablenext发送值 1 给第一个观察者next发送值 1 给第一个观察者第一个观察者从多播 Observable 取消订阅next发送值 2 给第二个观察者第二个观察者从多播 Observable 取消订阅连接的多播 Observable 取消订阅

为了达成上述过程,显示调用connect(),我们编写如下代码:

本文由网上澳门金莎娱乐发布于Web前端,转载请注明出处:网上澳门金莎娱乐:rxjs

关键词: