ecs-observable/src/observable/observe.ts

150 lines
4.1 KiB
TypeScript
Raw Normal View History

import { Subject } from "rxjs";
import type { Query } from "../query";
import type { Entity } from "../entity";
import type { WorldEvent, QueryUpdate } from "./events";
// ── Internal observer state per query ────────────────
interface QueryObserverState {
query: Query;
/** Cached set of entities currently matching the query. */
matched: Set<Entity>;
subject: Subject<QueryUpdate>;
}
// ── Observable layer ─────────────────────────────────
/**
* Manages observable subscriptions for a World.
* Kept separate from the World class for clarity.
*/
export class ObservableLayer {
/** Raw event stream. */
readonly events$ = new Subject<WorldEvent>();
/** Active query observers. */
private _observers: QueryObserverState[] = [];
/**
* Get or create a Subject for a query.
* If this is the first subscription, seed the matched set using
* the provided queryMatches callback.
*/
observe(query: Query): Subject<QueryUpdate> {
const existing = this._observers.find(
(o) => o.query === query || queriesEqual(o.query, query),
);
if (existing) return existing.subject;
const state: QueryObserverState = {
query,
matched: new Set(),
subject: new Subject<QueryUpdate>(),
};
// Seeding is handled by World (we don't have entity iteration here).
// The World's observe() method seeds via a separate path.
this._observers.push(state);
return state.subject;
}
/**
* Seed the initial matched set for an observer.
* Called once by World.observe() with all currently-matching entities.
*/
seed(query: Query, entities: Entity[]): void {
const obs = this._observers.find(
(o) => o.query === query || queriesEqual(o.query, query),
);
if (!obs) return;
for (const e of entities) {
obs.matched.add(e);
}
}
/**
* Feed an event into the observable system.
* Called by the World after state mutation.
*/
onEvent(
event: WorldEvent,
queryMatches: (query: Query, e: Entity) => boolean,
): void {
// Forward to the global stream
this.events$.next(event);
// Update each observer
for (const observer of this._observers) {
this._updateObserver(observer, event, queryMatches);
}
}
private _updateObserver(
obs: QueryObserverState,
event: WorldEvent,
queryMatches: (query: Query, e: Entity) => boolean,
): void {
const e = event.entity;
const wasMatched = obs.matched.has(e);
const nowMatches = queryMatches(obs.query, e);
switch (event.type) {
case "spawned":
// Entity is bare; won't match unless components added later
break;
case "destroyed":
if (wasMatched) {
obs.matched.delete(e);
obs.subject.next({ added: [], removed: [e], changed: [] });
}
break;
case "componentAdded":
case "componentRemoved": {
if (wasMatched && !nowMatches) {
obs.matched.delete(e);
obs.subject.next({ added: [], removed: [e], changed: [] });
} else if (!wasMatched && nowMatches) {
obs.matched.add(e);
obs.subject.next({ added: [e], removed: [], changed: [] });
}
break;
}
case "componentChanged": {
if (wasMatched && nowMatches) {
obs.subject.next({ added: [], removed: [], changed: [e] });
}
break;
}
}
}
/** Reset all observer state (useful for tests). */
reset(): void {
for (const obs of this._observers) {
obs.subject.complete();
obs.matched.clear();
}
this._observers = [];
}
/** Complete all streams. */
complete(): void {
this.events$.complete();
for (const obs of this._observers) {
obs.subject.complete();
}
this._observers = [];
}
}
function queriesEqual(a: Query, b: Query): boolean {
if (a.with.length !== b.with.length) return false;
if (a.not.length !== b.not.length) return false;
return (
a.with.every((c, i) => c === b.with[i]) &&
a.not.every((c, i) => c === b.not[i])
);
}