-
Notifications
You must be signed in to change notification settings - Fork 636
/
Copy pathminimal_assistant.py
94 lines (73 loc) · 2.64 KB
/
minimal_assistant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
metrics,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero
load_dotenv()
logger = logging.getLogger("voice-assistant")
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
async def entrypoint(ctx: JobContext):
initial_ctx = llm.ChatContext().append(
role="system",
text=(
"You are a voice assistant created by LiveKit. Your interface with users will be voice. "
"You should use short and concise responses, and avoiding usage of unpronouncable punctuation."
),
)
logger.info(f"connecting to room {ctx.room.name}")
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# wait for the first participant to connect
participant = await ctx.wait_for_participant()
logger.info(f"starting voice assistant for participant {participant.identity}")
dg_model = "nova-3-general"
if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
# use a model optimized for telephony
dg_model = "nova-2-phonecall"
agent = VoicePipelineAgent(
vad=ctx.proc.userdata["vad"],
stt=deepgram.STT(model=dg_model),
llm=openai.LLM(),
tts=openai.TTS(),
chat_ctx=initial_ctx,
)
agent.start(ctx.room, participant)
usage_collector = metrics.UsageCollector()
@agent.on("metrics_collected")
def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
metrics.log_metrics(mtrcs)
usage_collector.collect(mtrcs)
async def log_usage():
summary = usage_collector.get_summary()
logger.info(f"Usage: ${summary}")
ctx.add_shutdown_callback(log_usage)
# listen to incoming chat messages, only required if you'd like the agent to
# answer incoming messages from Chat
chat = rtc.ChatManager(ctx.room)
async def answer_from_text(txt: str):
chat_ctx = agent.chat_ctx.copy()
chat_ctx.append(role="user", text=txt)
stream = agent.llm.chat(chat_ctx=chat_ctx)
await agent.say(stream)
@chat.on("message_received")
def on_chat_received(msg: rtc.ChatMessage):
if msg.message:
asyncio.create_task(answer_from_text(msg.message))
await agent.say("Hey, how can I help you today?", allow_interruptions=True)
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm,
),
)