Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
node_modules
coverage

dist
dist

.github/copilot-instructions.md
.gitconfig
6 changes: 6 additions & 0 deletions .prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"trailingComma": "all",
"tabWidth": 2,
"singleQuote": true,
"semi": false
}
9 changes: 6 additions & 3 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// @ts-check

import eslint from '@eslint/js';
import tseslint from 'typescript-eslint';
import eslint from '@eslint/js'
import tseslint from 'typescript-eslint'

export default tseslint.config(
eslint.configs.recommended,
...tseslint.configs.recommended,
);
{
ignores: ['node_modules/**', 'dist/**'],
},
)
16 changes: 10 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"version": "",
"description": "Queue over Nats",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"repository": "https://github.com/arusakov/node-nats-queue.git",
"author": "Aleksandr Rusakov <aleksandr.s.rusakov@gmail.com>",
"license": "MIT",
Expand All @@ -13,18 +14,15 @@
"build": "tsc -p .",
"compile": "tsc --noEmit -p ./test",
"lint": "eslint .",
"test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/*.test.ts",
"test:all": "TS_NODE_PROJECT=test/tsconfig.json yarn test ./test/**/*.test.ts",
"test:coverage:html": "c8 --reporter=html --reporter=text yarn test:all",
"test:coverage": "c8 --reporter=lcovonly --reporter=text yarn test:all",
"test": "node --test --test-concurrency=1 --require=ts-node/register"
},
"devDependencies": {
"@eslint/js": "9.11.0",
"@nats-io/jetstream": "3.0.0-10",
"@nats-io/nats-core": "3.0.0-27",
"@nats-io/transport-node": "3.0.0-12",
"@types/eslint__js": "8.42.3",
"@types/node": "^20.0.0",
"@types/node": "22.0.0",
"c8": "10.1.2",
"eslint": "9.11.0",
"ts-node": "10.9.2",
Expand All @@ -33,5 +31,11 @@
},
"engines": {
"node": ">=20.0.0"
},
"dependencies": {
"@nats-io/kv": "3.0.2",
"@nats-io/jetstream": "3.0.2",
"@nats-io/nats-core": "3.0.2",
"@nats-io/transport-node": "3.0.2"
}
}
}
14 changes: 14 additions & 0 deletions src/flowJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Job } from './job'
import { FlowJobCreateData } from './types'

export class FlowJob {
job: Job
children?: FlowJob[]

constructor(data: FlowJobCreateData) {
this.job = new Job(data.job)
this.children = data.children
? data.children.map((j) => new FlowJob(j))
: []
}
}
132 changes: 132 additions & 0 deletions src/flowQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import { KV, Kvm } from '@nats-io/kv'
import { FlowJob } from './flowJob'
import { Job } from './job'
import { Queue } from './queue'
import { ChildToParentsKVValue, DependenciesKVValue } from './types'

export class FlowQueue extends Queue {
private parentChildrenStore: KV | null = null
private childParentsStore: KV | null = null

public override async setup(): Promise<void> {
try {
await super.setup()
const kvm = await new Kvm(this.connection)
this.parentChildrenStore = await kvm.create(`${this.name}_parent_id`)
this.childParentsStore = await kvm.create(`${this.name}_parents`)
} catch (e) {
console.error(`Error connecting to JetStream: ${e}`)
throw e
}
}

public async addFlowJob(tree: FlowJob, priority: number = 1): Promise<void> {
const deepestJobs = await this.traverseJobTree(tree)
await this.addJobs(deepestJobs, priority)
}

private async traverseJobTree(
node: FlowJob,
parentId: string | null = null,
): Promise<Job[]> {
const currentJob = node.job
if (parentId) {
currentJob.meta.parentId = parentId
}

const children = node.children || []
if (children.length === 0) {
return [currentJob]
}

const newChildrenForCurrentJobCount: number =
await this.updateChildDependencies({
parentJob: currentJob,
childJobs: children.map((child) => child.job),
})

await this.updateParentDependecies({
parentJob: currentJob,
newChildrentCount: newChildrenForCurrentJobCount,
})

const deepestJobs: Job[] = []
for (const child of children) {
const traverseResult = await this.traverseJobTree(child, currentJob.id)
deepestJobs.push(...traverseResult)
}

return deepestJobs
}

private async updateChildDependencies({
parentJob,
childJobs,
}: {
parentJob: Job
childJobs: Job[]
}) {
let newChildrenForCurrentJobCount: number = 0
for (const childJob of childJobs) {
const childParentsKVEntry = await this.childParentsStore!.get(childJob.id)
if (!childParentsKVEntry) {
await this.childParentsStore!.put(
childJob.id,
JSON.stringify({
parentIds: [parentJob.id],
}),
)
newChildrenForCurrentJobCount++
continue
}

const parentsInfo: ChildToParentsKVValue = childParentsKVEntry.json()

if (parentsInfo.parentIds.includes(parentJob.id)) continue

parentsInfo.parentIds.push(parentJob.id)
await this.childParentsStore!.put(
childJob.id,
JSON.stringify(parentsInfo),
{
previousSeq: childParentsKVEntry.revision,
},
)
newChildrenForCurrentJobCount++
}

return newChildrenForCurrentJobCount
}

private async updateParentDependecies({
parentJob,
newChildrentCount,
}: {
parentJob: Job
newChildrentCount: number
}) {
const existingParentDependencies = await this.parentChildrenStore!.get(
parentJob.id,
)
if (!existingParentDependencies) {
await this.parentChildrenStore!.put(
parentJob.id,
JSON.stringify({
...parentJob,
childrenCount: newChildrentCount,
}),
)
} else {
const parentDependencies: DependenciesKVValue =
existingParentDependencies.json()
parentDependencies.childrenCount += newChildrentCount
await this.parentChildrenStore!.put(
parentJob.id,
JSON.stringify(parentDependencies),
{
previousSeq: existingParentDependencies.revision,
},
)
}
}
}
Loading
Loading