Numaflow-JS

Numaflow-JS is the JavaScript/TypeScript SDK for Numaflow. It provides the interfaces needed to implement the different types of data processing tasks that Numaflow supports.

Currently, these include:

This SDK is powered by the Numaflow Rust SDK, which it calls through napi.rs bindings to interact with Numaflow.

Installation

Replace npm with your favourite package manager (pnpm, yarn, etc.) in the following command to install the SDK.

npm install @numaproj/numaflow-js

Examples

The examples are available in the examples directory. They give a basic overview of how to implement different types of data processing tasks using Numaflow-JS. Each example focuses on implementing and building one specific Numaflow component.

Most of the examples follow a similar structure:

  • Dockerfile: Contains the instructions to build the Docker image for the example.
  • Makefile: Contains helper commands to build the Docker image.
  • README.md: Provides details on implementing the concerned type of component and instructions to run the specific example.
  • <example-name>.ts: Contains the TypeScript implementation of the example.
  • <example-pipeline>.yaml: Contains the pipeline configuration which uses the image built from the specific example.

The implementation file of each example, <example-name>.ts, follows mostly the same pattern: instantiate and start an async server for the component being implemented.

Implementation details with examples

For example, to implement a user-defined (UD) sink component:

  • Use sink.AsyncServer to implement a UD sink.
  • To instantiate the server, provide a function sinkFn whose signature satisfies the AsyncServer constructor.
  • Start the server using the start method of AsyncServer.
  • Stop the server using the stop method of AsyncServer.
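The lifecycle described in the steps above can be sketched as follows. So that the sketch runs without the SDK installed, it uses a hypothetical stand-in class, StandInAsyncServer, in place of sink.AsyncServer. Only the overall shape (a constructor that takes a function, plus start and stop methods) comes from the text above. The Message shape, the promise-returning methods, and the deliver helper are assumptions made for illustration; check the sink example's README for the real signatures.

```typescript
// Assumed message shape, for illustration only.
type Message = { value: string }
type SinkFn = (message: Message) => void | Promise<void>

// Hypothetical stand-in for sink.AsyncServer; not the SDK's implementation.
class StandInAsyncServer {
    private readonly sinkFn: SinkFn
    private running = false

    constructor(sinkFn: SinkFn) {
        this.sinkFn = sinkFn
    }

    // Assumption: start() is asynchronous.
    async start(): Promise<void> {
        this.running = true
    }

    // Illustration-only helper that pushes one message through sinkFn.
    async deliver(message: Message): Promise<void> {
        if (!this.running) {
            throw new Error("server is not running")
        }
        await this.sinkFn(message)
    }

    // Assumption: stop() is asynchronous.
    async stop(): Promise<void> {
        this.running = false
    }
}

const received: string[] = []

// The user-defined sink function handed to the server constructor.
const sinkFn: SinkFn = (message) => {
    received.push(message.value)
}

async function main(): Promise<string[]> {
    const server = new StandInAsyncServer(sinkFn) // instantiate
    await server.start()                          // start
    await server.deliver({ value: "hello" })      // stand-in for incoming data
    await server.stop()                           // stop
    return received
}
```

With the real SDK, the stand-in class would be replaced by sink.AsyncServer and the messages would arrive from Numaflow rather than from a deliver call.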

Currently, the source and session-reduce components require implementing all methods of an interface and passing an instance of that implementation to their respective async server constructors. The remaining components only require a function whose signature satisfies the constructor of the async server.

There are several ways to implement a function whose signature satisfies the constructor of the async server:

  1. Using an arrow function
    If the function is small and simple, we can use an arrow function.
const sinkFn = (message: Message) => {
    console.log(message)
}

const server = new sink.AsyncServer(sinkFn)

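This also works when the arrow function is defined as a class property, because an arrow function keeps the this of the instance that created it.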

class Sink {
    counter = 0
    sinkFn = (message: Message) => {
        this.counter++
        console.log(this.counter, message)
    }
}

const sinker = new Sink()
const server = new sink.AsyncServer(sinker.sinkFn)

  2. Using a named function

Simple named functions work the same way.

function sinkFn(message: Message) {
    console.log(message)
}

const server = new sink.AsyncServer(sinkFn)

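Named functions defined as methods of a class need to be bound to the class instance if they use this; otherwise this no longer refers to the instance once the method is passed around as a plain function.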

class Sink {
    counter = 0
    sinkFn(message: Message) {
        this.counter++
        console.log(this.counter, message)
    }
}

const sinker = new Sink()
const server = new sink.AsyncServer(sinker.sinkFn.bind(sinker))
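To see why the bind call matters, the sketch below passes the same method once unbound and once bound. It does not use the SDK: invoke is a hypothetical stand-in for any code that receives a function and calls it later, and the Message shape is assumed for illustration.

```typescript
// Assumed message shape, for illustration only.
type Message = { value: string }

class Sink {
    counter = 0
    sinkFn(message: Message): void {
        this.counter++
        console.log(this.counter, message)
    }
}

// Hypothetical stand-in for a caller that stores a function and invokes it later.
function invoke(fn: (message: Message) => void, message: Message): void {
    fn(message)
}

const sinker = new Sink()

// Unbound: the method is detached from its instance. Class bodies run in
// strict mode, so this is undefined inside sinkFn and the call throws.
try {
    invoke(sinker.sinkFn, { value: "a" })
} catch (err) {
    console.log("unbound call failed:", err instanceof TypeError)
}
const counterAfterUnbound = sinker.counter

// Bound: this refers to sinker again, so the counter is updated.
invoke(sinker.sinkFn.bind(sinker), { value: "b" })
```

The unbound call never touches sinker.counter, while the bound call increments it. Arrow-function class properties avoid the issue because they capture this when the instance is created.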

If any of the examples fail to build or need further clarification, please open an issue.