Skip to content

Commit

Permalink
Update data stream API (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasIO authored Jan 31, 2025
1 parent a5311f8 commit f32e411
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
10 changes: 5 additions & 5 deletions examples/data-streams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const sendFile = async (room: Room, recipient: RemoteParticipant) => {
await room.localParticipant?.sendFile('./assets/maybemexico.jpg', {
destinationIdentities: [recipient.identity],
name: 'mex.jpg',
topic: 'welcome',
topic: 'files',
mimeType: 'image/jpg',
});
console.log('done sending file');
Expand All @@ -53,14 +53,14 @@ const main = async () => {
room.on(RoomEvent.ParticipantDisconnected, resolve);
});

room.setTextStreamHandler(async (reader: TextStreamReader, { identity }) => {
room.registerTextStreamHandler('chat', async (reader: TextStreamReader, { identity }) => {
console.log(`chat message from ${identity}: ${await reader.readAll()}`);
// for await (const { collected } of reader) {
// console.log(collected);
// }
}, 'chat');
});

room.setByteStreamHandler(async (reader: ByteStreamReader, { identity }) => {
room.registerByteStreamHandler('files', async (reader: ByteStreamReader, { identity }) => {
console.log(`welcome image received from ${identity}: ${reader.info.name}`);

// create write stream and write received file to disk, make sure ./temp folder exists
Expand All @@ -70,7 +70,7 @@ const main = async () => {
writer.write(chunk);
}
writer.close();
}, 'welcome');
});

room.on(RoomEvent.ParticipantConnected, async (participant) => {
await sendFile(room, participant);
Expand Down
12 changes: 6 additions & 6 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,25 +194,25 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.removeAllListeners();
}

setTextStreamHandler(callback: TextStreamHandler, topic: string = '') {
registerTextStreamHandler(topic: string, callback: TextStreamHandler) {
if (this.textStreamHandlers.has(topic)) {
throw new TypeError(`A text stream handler for topic "${topic}" has already been set.`);
throw new Error(`A text stream handler for topic "${topic}" has already been set.`);
}
this.textStreamHandlers.set(topic, callback);
}

removeTextStreamHandler(topic: string = '') {
unregisterTextStreamHandler(topic: string) {
this.textStreamHandlers.delete(topic);
}

setByteStreamHandler(callback: ByteStreamHandler, topic: string = '') {
registerByteStreamHandler(topic: string, callback: ByteStreamHandler) {
if (this.byteStreamHandlers.has(topic)) {
throw new TypeError(`A byte stream handler for topic "${topic}" has already been set.`);
throw new Error(`A byte stream handler for topic "${topic}" has already been set.`);
}
this.byteStreamHandlers.set(topic, callback);
}

removeByteStreamHandler(topic: string = '') {
unregisterByteStreamHandler(topic: string) {
this.byteStreamHandlers.delete(topic);
}

Expand Down

0 comments on commit f32e411

Please sign in to comment.