/**
 * VoiceActivityService — monitors audio levels for the local microphone
 * and remote peer streams, exposing per-user "speaking" state as
 * reactive Angular signals.
 *
 * Usage:
 * ```ts
 * const speaking = voiceActivity.isSpeaking(userId);
 * // speaking() => true when the user's audio level exceeds the threshold
 *
 * const volume = voiceActivity.volume(userId);
 * // volume() => normalised 0–1 audio level
 * ```
 *
 * Internally uses the Web Audio API ({@link AudioContext} +
 * {@link AnalyserNode}) per tracked stream, with a single
 * `requestAnimationFrame` poll loop.
 */
import { Injectable, signal, computed, inject, OnDestroy, Signal, WritableSignal } from '@angular/core';
import { Subscription } from 'rxjs';
import { WebRTCService } from './webrtc.service';

/** RMS volume threshold (0–1) above which a user counts as "speaking". */
const SPEAKING_THRESHOLD = 0.015;

/**
 * How many consecutive silent frames before we flip speaking → false.
 * At a typical 60 fps rAF cadence this gives ~130 ms of hysteresis, so
 * brief pauses between words don't flicker the speaking indicator.
 */
const SILENT_FRAME_GRACE = 8;

/** FFT size for the AnalyserNode (smaller = cheaper analysis per frame). */
const FFT_SIZE = 256;

/** Internal bookkeeping for a single tracked stream. */
|
||
interface TrackedStream {
|
||
/** The AudioContext used for analysis (one per stream to avoid cross-origin issues). */
|
||
ctx: AudioContext;
|
||
/** Source node wired from the MediaStream. */
|
||
source: MediaStreamAudioSourceNode;
|
||
/** Analyser node that provides time-domain data. */
|
||
analyser: AnalyserNode;
|
||
/** Reusable buffer for `getByteTimeDomainData`. */
|
||
dataArray: Uint8Array<ArrayBuffer>;
|
||
/** Writable signal for the normalised volume (0–1). */
|
||
volumeSignal: ReturnType<typeof signal<number>>;
|
||
/** Writable signal for speaking state. */
|
||
speakingSignal: ReturnType<typeof signal<boolean>>;
|
||
/** Counter of consecutive silent frames. */
|
||
silentFrames: number;
|
||
/** The MediaStream being analysed (for identity checks). */
|
||
stream: MediaStream;
|
||
}
|
||
|
||
@Injectable({ providedIn: 'root' })
|
||
export class VoiceActivityService implements OnDestroy {
|
||
private readonly webrtc = inject(WebRTCService);
|
||
|
||
/** All tracked streams keyed by user/peer ID. */
|
||
private readonly tracked = new Map<string, TrackedStream>();
|
||
|
||
/** Animation frame handle. */
|
||
private animFrameId: number | null = null;
|
||
|
||
/** RxJS subscriptions managed by this service. */
|
||
private readonly subs: Subscription[] = [];
|
||
|
||
/** Exposed map: userId → speaking (reactive snapshot). */
|
||
private readonly _speakingMap = signal<ReadonlyMap<string, boolean>>(new Map());
|
||
|
||
/** Reactive snapshot of all speaking users (for debugging / bulk consumption). */
|
||
readonly speakingMap: Signal<ReadonlyMap<string, boolean>> = this._speakingMap;
|
||
|
||
constructor() {
|
||
// Wire up remote stream events
|
||
this.subs.push(
|
||
this.webrtc.onRemoteStream.subscribe(({ peerId, stream }) => {
|
||
this.trackStream(peerId, stream);
|
||
}),
|
||
);
|
||
|
||
this.subs.push(
|
||
this.webrtc.onPeerDisconnected.subscribe((peerId) => {
|
||
this.untrackStream(peerId);
|
||
}),
|
||
);
|
||
}
|
||
|
||
// ── Public API ──────────────────────────────────────────────────
|
||
|
||
/**
|
||
* Start monitoring the current user's local microphone stream.
|
||
* Should be called after voice is enabled (mic captured).
|
||
*
|
||
* @param userId - The local user's ID (used as the key in the speaking map).
|
||
* @param stream - The local {@link MediaStream} from `getUserMedia`.
|
||
*/
|
||
trackLocalMic(userId: string, stream: MediaStream): void {
|
||
this.trackStream(userId, stream);
|
||
}
|
||
|
||
/**
|
||
* Stop monitoring the current user's local microphone.
|
||
*
|
||
* @param userId - The local user's ID.
|
||
*/
|
||
untrackLocalMic(userId: string): void {
|
||
this.untrackStream(userId);
|
||
}
|
||
|
||
/**
|
||
* Returns a read-only signal that is `true` when the given user
|
||
* is currently speaking (audio level above threshold).
|
||
*
|
||
* If the user is not tracked yet, the returned signal starts as
|
||
* `false` and will become reactive once a stream is tracked.
|
||
*/
|
||
isSpeaking(userId: string): Signal<boolean> {
|
||
const entry = this.tracked.get(userId);
|
||
if (entry) return entry.speakingSignal.asReadonly();
|
||
|
||
// Return a computed that re-checks the map so it becomes live
|
||
// once the stream is tracked.
|
||
return computed(() => this._speakingMap().get(userId) ?? false);
|
||
}
|
||
|
||
/**
|
||
* Returns a read-only signal with the normalised (0–1) volume
|
||
* for the given user.
|
||
*/
|
||
volume(userId: string): Signal<number> {
|
||
const entry = this.tracked.get(userId);
|
||
if (entry) return entry.volumeSignal.asReadonly();
|
||
return computed(() => 0);
|
||
}
|
||
|
||
// ── Stream tracking ─────────────────────────────────────────────
|
||
|
||
/**
|
||
* Begin analysing a {@link MediaStream} for audio activity.
|
||
*
|
||
* If a stream is already tracked for `id`, it is replaced.
|
||
*/
|
||
trackStream(id: string, stream: MediaStream): void {
|
||
// If we already track this exact stream, skip.
|
||
const existing = this.tracked.get(id);
|
||
if (existing && existing.stream === stream) return;
|
||
|
||
// Clean up any previous entry for this id.
|
||
if (existing) this.disposeEntry(existing);
|
||
|
||
const ctx = new AudioContext();
|
||
const source = ctx.createMediaStreamSource(stream);
|
||
const analyser = ctx.createAnalyser();
|
||
analyser.fftSize = FFT_SIZE;
|
||
|
||
source.connect(analyser);
|
||
// Do NOT connect analyser to ctx.destination — we don't want to
|
||
// double-play audio; playback is handled elsewhere.
|
||
|
||
const dataArray = new Uint8Array(analyser.fftSize) as Uint8Array<ArrayBuffer>;
|
||
const volumeSignal = signal(0);
|
||
const speakingSignal = signal(false);
|
||
|
||
this.tracked.set(id, {
|
||
ctx,
|
||
source,
|
||
analyser,
|
||
dataArray,
|
||
volumeSignal,
|
||
speakingSignal,
|
||
silentFrames: 0,
|
||
stream,
|
||
});
|
||
|
||
// Ensure the poll loop is running.
|
||
this.ensurePolling();
|
||
}
|
||
|
||
/** Stop tracking and dispose resources for a given ID. */
|
||
untrackStream(id: string): void {
|
||
const entry = this.tracked.get(id);
|
||
if (!entry) return;
|
||
this.disposeEntry(entry);
|
||
this.tracked.delete(id);
|
||
this.publishSpeakingMap();
|
||
|
||
// Stop polling when nothing is tracked.
|
||
if (this.tracked.size === 0) this.stopPolling();
|
||
}
|
||
|
||
// ── Polling loop ────────────────────────────────────────────────
|
||
|
||
private ensurePolling(): void {
|
||
if (this.animFrameId !== null) return;
|
||
this.poll();
|
||
}
|
||
|
||
private stopPolling(): void {
|
||
if (this.animFrameId !== null) {
|
||
cancelAnimationFrame(this.animFrameId);
|
||
this.animFrameId = null;
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Single `requestAnimationFrame`-based loop that reads audio levels
|
||
* from every tracked analyser and updates signals accordingly.
|
||
*/
|
||
private poll = (): void => {
|
||
let mapDirty = false;
|
||
|
||
this.tracked.forEach((entry) => {
|
||
const { analyser, dataArray, volumeSignal, speakingSignal } = entry;
|
||
|
||
analyser.getByteTimeDomainData(dataArray);
|
||
|
||
// Compute RMS volume from time-domain data (values 0–255, centred at 128).
|
||
let sumSquares = 0;
|
||
for (let i = 0; i < dataArray.length; i++) {
|
||
const normalised = (dataArray[i] - 128) / 128;
|
||
sumSquares += normalised * normalised;
|
||
}
|
||
const rms = Math.sqrt(sumSquares / dataArray.length);
|
||
|
||
volumeSignal.set(rms);
|
||
|
||
const wasSpeaking = speakingSignal();
|
||
if (rms >= SPEAKING_THRESHOLD) {
|
||
entry.silentFrames = 0;
|
||
if (!wasSpeaking) {
|
||
speakingSignal.set(true);
|
||
mapDirty = true;
|
||
}
|
||
} else {
|
||
entry.silentFrames++;
|
||
if (wasSpeaking && entry.silentFrames >= SILENT_FRAME_GRACE) {
|
||
speakingSignal.set(false);
|
||
mapDirty = true;
|
||
}
|
||
}
|
||
});
|
||
|
||
if (mapDirty) this.publishSpeakingMap();
|
||
|
||
this.animFrameId = requestAnimationFrame(this.poll);
|
||
};
|
||
|
||
/** Rebuild the public speaking-map signal from current entries. */
|
||
private publishSpeakingMap(): void {
|
||
const map = new Map<string, boolean>();
|
||
this.tracked.forEach((entry, id) => {
|
||
map.set(id, entry.speakingSignal());
|
||
});
|
||
this._speakingMap.set(map);
|
||
}
|
||
|
||
// ── Cleanup ─────────────────────────────────────────────────────
|
||
|
||
private disposeEntry(entry: TrackedStream): void {
|
||
try { entry.source.disconnect(); } catch { /* already disconnected */ }
|
||
try { entry.ctx.close(); } catch { /* already closed */ }
|
||
}
|
||
|
||
ngOnDestroy(): void {
|
||
this.stopPolling();
|
||
this.tracked.forEach((entry) => this.disposeEntry(entry));
|
||
this.tracked.clear();
|
||
this.subs.forEach((s) => s.unsubscribe());
|
||
}
|
||
}
|