Getting started

What are we building here?

This Getting Started guide walks you through building a very simple app that takes audio as input and displays its transcription as output.

You can download the Python script we're going to build here. You can also find other examples in the Git repository.

Get the Python SDK

The SDK is available on PyPI. It supports Python 3.7 and higher and can be installed with pip:

$ pip install -U uhlive

Or, to automatically install the dependencies needed to run the examples:

$ pip install -U uhlive[examples]

Connect to the server

First things first, you need to initiate a connection to our servers. Our API URL is: wss://api.uh.live

Make sure you have valid credentials (token and identifier), otherwise you'll receive an error. We recommend you pass credentials in as environment variables, or persist them in a database that is accessed at runtime. You can add the credentials to the environment by starting your app as:

UHLIVE_API_TOKEN="some-token" UHLIVE_API_ID="your_identifier" UHLIVE_API_URL="wss://api.uh.live" python myapp.py
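
Inside your script, you can then read those variables with the standard os module and fail fast with a clear message if one is missing (a minimal sketch; the variable names are the ones used above):

import os

try:
    uhlive_url = os.environ["UHLIVE_API_URL"]
    uhlive_id = os.environ["UHLIVE_API_ID"]
    uhlive_token = os.environ["UHLIVE_API_TOKEN"]
except KeyError as missing:
    # refuse to start without credentials rather than failing later on connect
    raise SystemExit(f"Missing environment variable: {missing}")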

Our SDK is designed so that you are free to use whatever websocket library you want as transport, and to architect your code the way you like.

Here is a straightforward example of connecting to the API using either websocket-client (sync) or aiohttp (async):

With websocket-client (sync):

# connect with SDK
import os
import time

import websocket as ws  # type: ignore
from websocket import WebSocketTimeoutException  # type: ignore

from uhlive.stream.conversation import *


def main(uhlive_url, uhlive_token, uhlive_id):
    url = build_conversation_url(uhlive_url, uhlive_token)
    # create transport
    socket = ws.create_connection(url, timeout=10)
    print("connected!")


uhlive_url = os.environ["UHLIVE_API_URL"]
uhlive_id = os.environ["UHLIVE_API_ID"]
uhlive_token = os.environ["UHLIVE_API_TOKEN"]
main(uhlive_url, uhlive_token, uhlive_id)
With aiohttp (async):

# connect with SDK
import asyncio
import os

from aiohttp import ClientSession  # type: ignore

from uhlive.stream.conversation import *


async def main(uhlive_url, uhlive_token, uhlive_id):
    # create transport
    async with ClientSession() as session:
        async with session.ws_connect(
            build_conversation_url(uhlive_url, uhlive_token),
        ) as socket:
            print("connected!")


uhlive_url = os.environ["UHLIVE_API_URL"]
uhlive_id = os.environ["UHLIVE_API_ID"]
uhlive_token = os.environ["UHLIVE_API_TOKEN"]
asyncio.run(main(uhlive_url, uhlive_token, uhlive_id))

Join a conversation

With a single connection, you can join several different conversations. Think of a conversation as a conference call instance, or a room on a chat service. Only people who have joined the conversation can access the exchanged data. For this example, we'll use a conversation with only one speaker: Alice.

Replace the main function with:

With websocket-client (sync):

def main(uhlive_url, uhlive_token, uhlive_id):
    url = build_conversation_url(uhlive_url, uhlive_token)
    socket = ws.create_connection(url, timeout=10)

    # join a conversation
    client = Conversation(uhlive_id, "my-conversation-id", speaker="Alice")
    # commands are sent as text frames
    socket.send(client.join())

    # you can also join a conversation as an observer
    # client = Conversation(uhlive_id, "my-conversation-id", speaker="Bob")
    # socket.send(client.join(readonly=True))

    # if you are an active speaker, you can explicitly set some ASR parameters or the audio codec
    # client = Conversation(uhlive_id, "my-conversation-id", speaker="Alice")
    # socket.send(
    #     client.join(model="fr", interim_results=False, rescoring=True, origin=int(time.time()*1000), audio_codec="g711a")
    # )

    # check we didn't get an error on join
    client.receive(socket.recv())
    print("joined!")
With aiohttp (async):

async def main(uhlive_url, uhlive_token, uhlive_id):
    async with ClientSession() as session:
        async with session.ws_connect(
            build_conversation_url(uhlive_url, uhlive_token),
        ) as socket:
            # join a conversation
            client = Conversation(uhlive_id, "my-conversation-id", speaker="Alice")
            # commands are sent as text frames
            await socket.send_str(client.join())

            # you can also join a conversation as an observer
            # client = Conversation(uhlive_id, "my-conversation-id", speaker="Bob")
            # await socket.send_str(client.join(readonly=True))

            # if you are an active speaker, you can explicitly set some ASR parameters
            # client = Conversation(uhlive_id, "my-conversation-id", speaker="Alice")
            # await socket.send_str(
            #     client.join(model="fr", interim_results=False, rescoring=True, origin=int(time.time()*1000))
            # )

            # check we didn't get an error on join
            msg = await socket.receive()
            client.receive(msg.data)
            print("joined!")

You can read more about ASR parameters here.

You can name the conversation however you like.

Send audio and receive transcription

This is where the fun begins. Now that we have a conversation to talk to, we can send audio and receive its transcription.

When you join a conversation as a participant, you must be ready to stream audio immediately. Any gap in the stream longer than a few seconds will be interpreted as a lost connection and will terminate your session.
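
If your audio source can pause, for example while you wait for a microphone or for the next file, one way to avoid such gaps is to send silence instead of nothing. Here is a minimal sketch for the sync flavor; for 8 kHz, 16-bit mono PCM, a chunk of 8000 zero bytes is half a second of silence:

# half a second of silence: 8000 bytes = 4000 16-bit samples at 8 kHz
SILENCE = b"\x00" * 8000

def keep_alive(socket, client):
    # send one chunk of silence to fill a gap in the real audio stream
    socket.send_binary(client.send_audio_chunk(SILENCE))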

Send audio

We currently only support audio with a sample rate of 8 kHz and a bit depth of 16 bits.
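
If you are not sure what format a wav file is in, you can check it with Python's standard wave module before converting or streaming it (a minimal sketch; the mono check matches the -c 1 flag in the SoX command below):

import wave

with wave.open("audio_file.wav", "rb") as f:
    assert f.getframerate() == 8000, "sample rate must be 8 kHz"
    assert f.getsampwidth() == 2, "bit depth must be 16 bits"
    assert f.getnchannels() == 1, "audio must be mono"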

You first need to take an audio file (here, audio_file.mp3) and build a raw audio file from it using SoX:

$ sox audio_file.mp3 -t raw -c 1 -b 16 -r 8k -e signed-integer audio_file.pcm

Don't have an audio file at hand? Download this audio file as an example.

Now you can stream the audio for transcription. Streaming and listening for recognition events should be done in different threads (or coroutines if you are using asyncio) or different scripts so that they run concurrently.

For example, to stream an audio file, insert this code before your main(…) function:

With websocket-client (sync):

class AudioSender(Thread):
    def __init__(self, socket, client, audio_file):
        Thread.__init__(self)
        self.socket = socket
        self.client = client
        self.audio_file = audio_file

    def run(self):
        print(f"Streaming file in realtime: {self.audio_file} for transcription!")
        with open(self.audio_file, "rb") as audio_file:
            while True:
                audio_chunk = audio_file.read(8000)
                if not audio_chunk:
                    break
                # Audio is sent as binary messages
                self.socket.send_binary(self.client.send_audio_chunk(audio_chunk))
                time.sleep(0.5)  # Simulate realtime audio

        print(f"File {self.audio_file} successfully streamed")
        # Leave the conversation after the file has been streamed (see below for explanation)
        self.socket.send(self.client.leave())
With aiohttp (async):

async def stream_file(audio_path, socket, client):
    with open(audio_path, "rb") as audio_file:
        while True:
            audio_chunk = audio_file.read(8000)
            if not audio_chunk:
                break
            # audio is sent as binary frames
            await socket.send_bytes(client.send_audio_chunk(audio_chunk))
            await asyncio.sleep(0.5)  # Simulate realtime audio
    print(f"File {audio_path} successfully streamed")
    # Leave the conversation after the file has been streamed (see below for explanation)
    await socket.send_str(client.leave())

For the websocket-client example, do not forget to add the following import at the top of the file: from threading import Thread.

Please don't abuse our realtime streaming API to transcribe files at full bandwidth speed. You'll be throttled. Use our batch API to transcribe files at full speed.

And, to start the stream, append these lines to your main(…) function:

With websocket-client (sync):

    sender = AudioSender(socket, client, "path/to/the/file/to/stream")
    sender.start()
    print("Stream started")

With aiohttp (async):

            sender = asyncio.create_task(
                stream_file("path/to/the/file/to/stream", socket, client)
            )
            print("Stream started")

If you try to run your script in this state, it will fail: the main thread will terminate before the sender thread (or coroutine) does, and the latter will raise an error.

Receive transcription

We receive the transcription as a succession of transcription events, which are either intermediate transcriptions of words or the full transcription of a segment of audio.

Intermediate transcription events are named WordsDecoded. They happen many times per second, with incremental results.

Final transcription events are named SegmentDecoded. They happen less frequently than WordsDecoded and expose the final result.
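
If you also want live feedback while a segment is still being decoded, you can handle WordsDecoded events inside the receive loop shown below. Here is a minimal sketch of the isinstance branch for the sync flavor, assuming WordsDecoded events expose the same speaker and transcript attributes as SegmentDecoded:

if isinstance(event, WordsDecoded):
    # interim hypothesis: overwrite the current line in place
    print(f"\r[{event.speaker}] {event.transcript}", end="", flush=True)
elif isinstance(event, SegmentDecoded):
    # final result: commit the line
    print(f"\r[{event.speaker}] {event.transcript}")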

You can display the whole final transcription with the following snippet; in the main(…) function, append it after print("Stream started"):

With websocket-client (sync):

    # Print each transcribed segment
    try:
        while True:
            try:
                event = client.receive(socket.recv())  # we set a 10-second timeout on the socket earlier
            except WebSocketTimeoutException:
                print("Silence")
                continue
            if client.left:
                break
            if isinstance(event, SegmentDecoded):
                print(f"[{event.speaker} - {event.utterance_id}] {event.transcript}")
                # You can also dig into the event to get more data
                transcript = event.transcript
                start = event.start
                end = event.end
                print(f"[{start}->{end}]: {transcript}")
    finally:
        sender.join()
        socket.close()
With aiohttp (async):

            try:
                while True:
                    msg = await socket.receive()
                    event = client.receive(msg.data)
                    if client.left:
                        break
                    if isinstance(event, SegmentDecoded):
                        print(f"[{event.speaker} - {event.utterance_id}] {event.transcript}")
                        # You can also dig into the event to get more data
                        transcript = event.transcript
                        start = event.start
                        end = event.end
                        print(f"[{start}->{end}]: {transcript}")
            finally:
                sender.cancel()
                await sender

When a speaker joins the conversation, a SpeakerJoined event is emitted. When a speaker leaves the conversation, a SpeakerLeft event is emitted.
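
You can watch for these events in the same receive loop. A minimal sketch, assuming both classes expose a speaker attribute like the transcription events:

if isinstance(event, SpeakerJoined):
    print(f"{event.speaker} joined the conversation")
elif isinstance(event, SpeakerLeft):
    print(f"{event.speaker} left the conversation")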

Receive enrich events

From time to time, when an NLU agent finds interesting information in the conversation, it emits enrich events.

In Python, the corresponding event class has the same name, but in CamelCase: for example, an entity_found event is delivered as an EntityFound instance.

You can replace the previous snippet with this one, or merge them:

With websocket-client (sync):

    # Print EntityFound events
    try:
        while True:
            try:
                event = client.receive(socket.recv())  # we set a 10-second timeout on the socket earlier
            except WebSocketTimeoutException:
                print("Silence")
                continue
            if client.left:
                break
            if isinstance(event, EntityFound):
                print(f"[{event.speaker}] {event.canonical}")
                # You can also dig into the event payload to get more data
                canonical = event.canonical
                original = event.original
                start = event.start
                end = event.end
                print(f"[{start}->{end}]: '{canonical}' replaces '{original}'")
    finally:
        sender.join()
        socket.close()
With aiohttp (async):

            try:
                while True:
                    msg = await socket.receive()
                    event = client.receive(msg.data)
                    if client.left:
                        break
                    if isinstance(event, EntityFound):
                        print(f"[{event.speaker}] {event.canonical}")
                        # You can also dig into the event payload to get more data
                        canonical = event.canonical
                        original = event.original
                        start = event.start
                        end = event.end
                        print(f"[{start}->{end}]: '{canonical}' replaces '{original}'")
            finally:
                sender.cancel()
                await sender

Leave conversation

To cleanly leave a conversation without missing any transcription events from your audio stream, use the Conversation instance's .leave() method and wait for its .left flag to be true.

As you can see in this example, the .leave() call has been added in the Send audio part, at the end of the sender. The event loop above also checks the .left flag.
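
If you drive the connection yourself instead of letting the sender initiate the leave, the same pattern looks like this in the sync flavor (a minimal sketch):

# ask to leave, then drain the remaining events until the server confirms
socket.send(client.leave())
while not client.left:
    client.receive(socket.recv())
socket.close()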

There you go! You are now able to send audio and receive its transcription.

We provide some more examples in the SDK repository.

To dive in deeper, you can browse the API Reference documentation.

If you are stuck, want to suggest something, or just want to say hello, send us an e-mail at support@allo-media.fr.