Operators

Operators are optional methods that can be added to channels to manipulate them in various ways.

Instance Methods

These methods can be added to the instances created by the Channel constructor.

broadcast

source.broadcast(...channels) -> source

The broadcast method enables multicasting from one channel to multiple channels. As soon as a value is inserted into the source, it will be emitted to listening channels.

require("@jfet97/csp/dist/operators/broadcast");
// or...
import "@jfet97/csp/dist/operators/broadcast";


const source = new Channel();
const dest1 = new Channel();
const dest2 = new Channel();
const dest3 = new Channel();

source.broadcast(dest1, dest2, dest3);

const m = 42;
source.put(m);

const res1 = await dest1.take(); // will receive 42
const res2 = await dest2.take(); // will receive 42
const res3 = await dest3.take(); // will receive 42
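
To illustrate the multicasting idea, here is a minimal sketch that runs without the library. MiniChannel and broadcastSketch are hypothetical stand-ins written for this example. They are not the library's Channel class or its broadcast implementation, and the fan-out strategy shown is only an assumption about how such an operator could work.

```javascript
// Hypothetical stand-in for a channel; NOT the library's Channel class.
class MiniChannel {
  constructor() {
    this.puts = [];  // pending puts: { value, done }
    this.takes = []; // pending takers: resolve callbacks
  }
  put(value) {
    return new Promise(done => {
      const taker = this.takes.shift();
      if (taker) { taker(value); done(); }  // hand the value to a waiting taker
      else this.puts.push({ value, done }); // otherwise queue it
    });
  }
  take() {
    return new Promise(resolve => {
      const pending = this.puts.shift();
      if (pending) { pending.done(); resolve(pending.value); }
      else this.takes.push(resolve);        // wait for a future put
    });
  }
}

// Assumed strategy: take each value from the source and copy it to every destination.
function broadcastSketch(source, ...dests) {
  (async () => {
    while (true) {
      const value = await source.take();
      for (const dest of dests) dest.put(value); // not awaited: no destination blocks the others
    }
  })();
  return source;
}

async function demo() {
  const source = new MiniChannel();
  const dest1 = new MiniChannel();
  const dest2 = new MiniChannel();
  broadcastSketch(source, dest1, dest2);
  source.put(42);
  return [await dest1.take(), await dest2.take()];
}

demo().then(values => console.log(values)); // [ 42, 42 ]
```

Each destination receives its own copy of the value, so a slow consumer on one destination does not prevent the others from reading.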

delay

source.delay(number) -> channel

The delay method creates a new channel that will receive all the values coming from its source, but with a delay expressed in milliseconds.

require("@jfet97/csp/dist/operators/delay");
// or...
import "@jfet97/csp/dist/operators/delay";


const source = new Channel();
const delayed = source.delay(3000);

source.put(42);

const res = await delayed.take(); // will receive 42 after 3 seconds

filter

source.filter(value => boolean) -> channel

The filter method takes a predicate function and returns a new channel. Each value inserted into the source is passed to the predicate, and only the values for which it returns true are inserted into the returned channel. The others are discarded.

require("@jfet97/csp/dist/operators/filter");
// or...
import "@jfet97/csp/dist/operators/filter";


const source = new Channel();
const resCh = source.filter(v => Boolean(v % 2));

source.put(1);
source.put(2);
source.put(3);
source.put(4);
            
const result1 = await resCh.take(); // will receive 1
const result2 = await resCh.take(); // will receive 3

map

source.map(value => value) -> channel

The map method takes a mapper function and returns a new channel. Each value inserted into the source will be passed to the mapper function and the result of each computation will be inserted into the returned channel.

require("@jfet97/csp/dist/operators/map");
// or...
import "@jfet97/csp/dist/operators/map";
  

const source = new Channel();
const resCh = source.map(v => 10 * v);

source.put(1);
source.put(2);
source.put(3);
source.put(4);
            
const result1 = await resCh.take(); // will receive 10
const result2 = await resCh.take(); // will receive 20
const result3 = await resCh.take(); // will receive 30
const result4 = await resCh.take(); // will receive 40

pipe

source.pipe(dest) -> dest

The pipe method simply takes all the values from the source and inserts them into the dest. It returns the destination channel to allow chained operations on it.

require("@jfet97/csp/dist/operators/pipe");
// or...
import "@jfet97/csp/dist/operators/pipe";
  

const source = new Channel();
const dest = new Channel();
source.pipe(dest);

source.put(1);
source.put(2);
source.put(3);
source.put(4);
            
const result1 = await dest.take(); // will receive 1
const result2 = await dest.take(); // will receive 2
const result3 = await dest.take(); // will receive 3
const result4 = await dest.take(); // will receive 4
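
Because pipe returns the destination channel, calls can be chained. The sketch below shows that idea without the library. MiniChannel and pipeSketch are hypothetical stand-ins invented for this example, and awaiting each put before taking the next value is an assumption, not a statement about the library's behaviour.

```javascript
// Hypothetical stand-in for a channel; NOT the library's Channel class.
class MiniChannel {
  constructor() {
    this.puts = [];  // pending puts: { value, done }
    this.takes = []; // pending takers: resolve callbacks
  }
  put(value) {
    return new Promise(done => {
      const taker = this.takes.shift();
      if (taker) { taker(value); done(); }
      else this.puts.push({ value, done });
    });
  }
  take() {
    return new Promise(resolve => {
      const pending = this.puts.shift();
      if (pending) { pending.done(); resolve(pending.value); }
      else this.takes.push(resolve);
    });
  }
}

// Assumed behaviour: forward every value from source to dest, then return dest.
function pipeSketch(source, dest) {
  (async () => {
    while (true) await dest.put(await source.take());
  })();
  return dest;
}

async function demo() {
  const a = new MiniChannel();
  const b = new MiniChannel();
  const c = new MiniChannel();
  // Mirrors the shape of a.pipe(b).pipe(c): the inner call returns b.
  const last = pipeSketch(pipeSketch(a, b), c);
  a.put(1);
  a.put(2);
  const values = [await c.take(), await c.take()];
  return { returnsDest: last === c, values };
}

demo().then(result => console.log(result));
```

The values put into the first channel arrive at the last one in order, and the value returned by the outer call is the final destination.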

fromIterable

channel.fromIterable(iterable) -> channel

The fromIterable method takes all the values from a synchronous iterable and puts them all synchronously into the channel. Do not use with endless iterables.

require("@jfet97/csp/dist/operators/fromIterable");
// or...
import "@jfet97/csp/dist/operators/fromIterable";


const chan = new Channel();
const iterable = [1, 2, 3];

chan.fromIterable(iterable);

const result = await chan.drain(); // will receive [1, 2, 3]

fromIterableDelayed

channel.fromIterableDelayed(iterable) -> channel

The fromIterableDelayed method takes all the values from a synchronous iterable and puts them into the channel one at a time, waiting for each value to be taken from the channel before putting the next one.

require("@jfet97/csp/dist/operators/fromIterableDelayed");
// or...
import "@jfet97/csp/dist/operators/fromIterableDelayed";


const chan = new Channel();
const iterable = [1, 2, 3];

chan.fromIterableDelayed(iterable);

const result = await chan.drain(); // will receive [1]
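
The difference between fromIterable and fromIterableDelayed can be sketched without the library. Everything below (MiniChannel, fromIterableSketch, fromIterableDelayedSketch) is a hypothetical stand-in, and the semantics assumed here are that a put completes once its value is taken and that drain returns whatever is queued at that moment.

```javascript
// Hypothetical stand-in for a channel; NOT the library's Channel class.
class MiniChannel {
  constructor() {
    this.puts = [];  // pending puts: { value, done }
    this.takes = []; // pending takers: resolve callbacks
  }
  put(value) {
    return new Promise(done => {
      const taker = this.takes.shift();
      if (taker) { taker(value); done(); }
      else this.puts.push({ value, done });
    });
  }
  take() {
    return new Promise(resolve => {
      const pending = this.puts.shift();
      if (pending) { pending.done(); resolve(pending.value); }
      else this.takes.push(resolve);
    });
  }
  // Assumed semantics: return the values queued right now and complete their puts.
  drain() {
    const pending = this.puts.splice(0);
    pending.forEach(p => p.done());
    return Promise.resolve(pending.map(p => p.value));
  }
}

// Eager variant: put every value without waiting for a consumer.
function fromIterableSketch(chan, iterable) {
  for (const value of iterable) chan.put(value);
  return chan;
}

// Delayed variant: wait until the previous value is taken before putting the next.
function fromIterableDelayedSketch(chan, iterable) {
  (async () => {
    for (const value of iterable) await chan.put(value);
  })();
  return chan;
}

async function demo() {
  const eager = fromIterableSketch(new MiniChannel(), [1, 2, 3]);
  const delayed = fromIterableDelayedSketch(new MiniChannel(), [1, 2, 3]);
  const eagerDrained = await eager.drain();     // all three values are already queued
  const delayedDrained = await delayed.drain(); // only the first value has been put
  const next = await delayed.take();            // consuming a value lets the next one in
  return { eagerDrained, delayedDrained, next };
}

demo().then(result => console.log(result));
```

The delayed variant gives the consumer control over the pace, which is why draining it right away yields a single value.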

fromAsyncIterable

channel.fromAsyncIterable(asyncIterable) -> channel

The fromAsyncIterable method takes each value from an asynchronous iterable and puts it into the channel. It does not wait for a take operation, so each value is inserted into the channel as soon as it becomes available.

require("@jfet97/csp/dist/operators/fromAsyncIterable");
// or...
import "@jfet97/csp/dist/operators/fromAsyncIterable";


const chan = new Channel();
const asyncIterable = {
    async *[Symbol.asyncIterator]() {
        yield* [1, 2, 3, 4, 5];
    }
};

chan.fromAsyncIterable(asyncIterable);

const result = await chan.drain(); // will receive [1, 2, 3, 4, 5]

fromAsyncIterableDelayed

channel.fromAsyncIterableDelayed(asyncIterable) -> channel

The fromAsyncIterableDelayed method takes all the values from an asynchronous iterable and puts them into the channel one at a time, waiting for each value to be taken from the channel before putting the next one.

require("@jfet97/csp/dist/operators/fromAsyncIterableDelayed");
// or...
import "@jfet97/csp/dist/operators/fromAsyncIterableDelayed";


const chan = new Channel();
const asyncIterable = {
    async *[Symbol.asyncIterator]() {
        yield* [1, 2, 3, 4, 5];
    }
};

chan.fromAsyncIterableDelayed(asyncIterable);

const result = await chan.drain(); // will receive [1]