observants
mini RxJS implementation
Last updated 3 years ago by andy*() .
MIT · Repository · Bugs · Original npm · Tarball · package.json
$ cnpm install observants 
SYNC missed versions from official npm registry.

observants

observants is a mini RxJS implementation with the following operators: map, filter, reduce, scan, every, of, from, skip, take, range, timer, interval, fromEvent, fromPromise, pipe. The examples below additionally use create, do, and defaultIfEmpty.

Examples

const { Observable, Observer, Subscriber } = require('observants');

let observable1 = Observable.create(observer => {
  let i = 0
  observer.next(i++)
  const interval = setInterval(() => {
    observer.next(i++)
  }, 200)

  setTimeout(() => {
    observer.next('next') // next not dispatched after unsubscribe
    observer.complete() // complete not dispatched after unsubscribe
    observer.error('error') // error not dispatched after unsubscribe
  }, 600)

  return () => {
    console.log('clearInterval')
    clearInterval(interval)
  }
})

let subscription1 = observable1.subscribe(
  value => console.log(value),
  error => console.log(error),
  () => console.log('completed')
)

setTimeout(() => {
  subscription1.unsubscribe()
}, 500)

//=> 0
//=> 1
//=> 2
//=> clearInterval


let observable2 = new Observable(observer => {
  let timer = setTimeout(() => {
    observer.next('hello')
    observer.complete() // complete will trigger unsubscribe
    observer.next('world') // next not dispatched after complete
  }, 1000)

  return () => {
    clearTimeout(timer)
    console.log('clearTimeout')
  }
})

let subscription2 = observable2.subscribe(
  value => console.log(value),
  undefined,
  () => console.log('completed')
)

//=> hello
//=> completed
//=> clearTimeout


let observable3 = new Observable(observer => {
  let timer = setTimeout(() => {
    observer.next('hello')
    observer.error('error') // error will trigger unsubscribe
    observer.next('world') // next not dispatched after error
  }, 1000)

  return () => {
    clearTimeout(timer)
    console.log('clearTimeout')
  }
})

let subscription3 = observable3.subscribe(
  value => console.log(value),
  error => console.log(error),
  undefined
)

//=> hello
//=> error
//=> clearTimeout


setTimeout(() => {
  Observable.of(4, 5, 6).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
}, 1000)

//=> 4
//=> 5
//=> 6
//=> completed


setTimeout(() => {
  Observable.of(5, 6).map(x => x + 2).filter(x => x % 2 === 0).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
}, 1500)

//=> 8
//=> completed


setTimeout(() => {
  Observable.of(1, 2, 3).reduce((acc, x) => acc + x).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
}, 2000)

//=> 6
//=> completed


setTimeout(() => {
  Observable.of(1, 2, 3).scan((acc, x) => acc + x).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
}, 2500)

//=> 1
//=> 3
//=> 6
//=> completed


setTimeout(() => {
  Observable.of(5, 6).map(x => x + 2).do(x => console.log('do:' + x))
  .filter(x => x % 2 === 0).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
}, 3000)

//=> do:7
//=> do:8
//=> 8
//=> completed


setTimeout(() => {
  let subscription1 = Observable.interval(200).subscribe(
    x => console.log(x),
    error => console.log(error),
    () => console.log('completed')
  )
  setTimeout(() => {
    subscription1.unsubscribe()
  }, 500)
}, 3500)

//=> 0
//=> 1

setTimeout(() => {

  Observable.of().defaultIfEmpty('Empty!').subscribe(val => console.log(val))
  //=> Empty!

  Observable.range(1, 3).subscribe(val => console.log(val))
  //=> 1
  //=> 2
  //=> 3

  Observable.of(1, 2, 3).every(val => val % 2 === 0).subscribe(val => console.log(val))
  //=> false

  Observable.from([1, 2, 3]).subscribe(val => console.log(val))
  //=> 1
  //=> 2
  //=> 3

  Observable.of(1,2,3,4).skip(2).subscribe(console.log)
  //=> 3
  //=> 4

  Observable.of(1, 2, 3).take(1).subscribe(val => console.log(val))
  //=> 1

  Observable.fromPromise(new Promise(resolve => {resolve('Resolved!')}))
    .subscribe(val => console.log(val))
  //=> Resolved!

  Observable.timer(1000).subscribe(val => console.log(val))
  //=> 0

  let subscription1 = Observable.timer(1000, 2000).subscribe(val => console.log(val))

  setTimeout(() => {
    subscription1.unsubscribe()
  }, 3500)
  //=> 0
  //=> 1

}, 4500)

Installation

npm install --save observants

Usage

You can import one or more of the exported classes (Observable, Observer, Subscriber) from observants, using either CommonJS or ES module syntax:

const { Observable, Observer, Subscriber } = require('observants');
// or
import { Observable, Observer, Subscriber } from 'observants';

Todo

Subject

Scheduler

Current Tags

  • 0.1.0                                ...           latest (3 years ago)

2 Versions

  • 0.1.0                                ...           3 years ago
  • 0.0.1                                ...           3 years ago
Maintainers (1)
Downloads
Today 0
This Week 0
This Month 0
Last Day 0
Last Week 0
Last Month 1
Dependencies (0)
None
Dependents (0)
None

Copyright 2014 - 2017 © taobao.org |