diff --git a/src/utils/async-queue.ts b/src/utils/async-queue.ts new file mode 100644 index 0000000..239ee44 --- /dev/null +++ b/src/utils/async-queue.ts @@ -0,0 +1,32 @@ +export class AsyncQueue { + private items: T[] = []; + private resolvers: ((value: T) => void)[] = []; + + push(item: T): void { + if (this.resolvers.length > 0) { + const resolve = this.resolvers.shift()!; + resolve(item); + } else { + this.items.push(item); + } + } + + pushAll(items: Iterable): void { + for (const item of items) { + this.push(item); + } + } + + async pop(): Promise { + if (this.items.length > 0) { + return this.items.shift()!; + } + return new Promise((resolve) => { + this.resolvers.push(resolve); + }); + } + + get length(): number { + return this.items.length - this.resolvers.length; + } +} diff --git a/tests/utils/async-queue.test.ts b/tests/utils/async-queue.test.ts new file mode 100644 index 0000000..b4c23e7 --- /dev/null +++ b/tests/utils/async-queue.test.ts @@ -0,0 +1,104 @@ +import { describe, it, expect } from 'vitest'; +import { AsyncQueue } from '../../src/utils/async-queue'; + +describe('AsyncQueue', () => { + describe('push', () => { + it('should add item to queue', () => { + const queue = new AsyncQueue(); + queue.push(1); + expect(queue.length).toBe(1); + }); + }); + + describe('pop', () => { + it('should return item immediately if queue is not empty', async () => { + const queue = new AsyncQueue(); + queue.push(1); + queue.push(2); + + const result = await queue.pop(); + expect(result).toBe(1); + expect(queue.length).toBe(1); + }); + + it('should wait for item if queue is empty', async () => { + const queue = new AsyncQueue(); + + const popPromise = queue.pop(); + + queue.push(42); + + const result = await popPromise; + expect(result).toBe(42); + expect(queue.length).toBe(0); + }); + + it('should return items in FIFO order', async () => { + const queue = new AsyncQueue(); + queue.push(1); + queue.push(2); + queue.push(3); + + expect(await queue.pop()).toBe(1); + expect(await queue.pop()).toBe(2); + expect(await queue.pop()).toBe(3); + }); + }); + + describe('pushAll', () => { + it('should add all items to queue', async () => { + const queue = new AsyncQueue(); + queue.pushAll([1, 2, 3]); + + expect(queue.length).toBe(3); + expect(await queue.pop()).toBe(1); + expect(await queue.pop()).toBe(2); + expect(await queue.pop()).toBe(3); + }); + + it('should accept any iterable', async () => { + const queue = new AsyncQueue(); + queue.pushAll(new Set([10, 20, 30])); + + expect(await queue.pop()).toBe(10); + expect(await queue.pop()).toBe(20); + expect(await queue.pop()).toBe(30); + }); + + it('should resolve waiting pops', async () => { + const queue = new AsyncQueue(); + + const pop1 = queue.pop(); + const pop2 = queue.pop(); + + queue.pushAll([1, 2]); + + expect(await pop1).toBe(1); + expect(await pop2).toBe(2); + expect(queue.length).toBe(0); + }); + }); + + describe('length', () => { + it('should return 0 for empty queue', () => { + const queue = new AsyncQueue(); + expect(queue.length).toBe(0); + }); + + it('should reflect pending consumers as negative', () => { + const queue = new AsyncQueue(); + queue.pop(); // no await, so it's pending + expect(queue.length).toBe(-1); + }); + + it('should update after push and pop', async () => { + const queue = new AsyncQueue(); + queue.push(1); + queue.push(2); + expect(queue.length).toBe(2); + + await queue.pop(); + expect(queue.length).toBe(1); + }); + }); +});