Getting started

What are you building here

This Getting Started guides you to build a very simple app, taking microphone audio as input, and displaying the transcription as output.

You can download the Python script we're going to build here. You can also find other examples on the Git repository.

Get the Python SDK

The SDK is available on pip. It is built for Python 3.7 and higher and can then be installed using pip, including dependencies needed to run the examples:

$ pip install -U uhlive[examples]

Connect to the server

You need an access_token to connect to the API. This token has to be requested on our authentication server, using the client_id and client_secret credentials that were provided to you by your account manager.

Our Python SDK takes care of requesting this token for you. You'll just have to provide the ID and secret.

Once the access_token has been retrieved, a WebSocket connection will be established, the server URL being wss://api.uh.live.

You can read more about the whole authentication flow in the protocol documentation.

We recommend you pass credentials in this Getting Started script as environment variables. A simple way to make credentials available at script run time is to run it as follows:

UHLIVE_API_CLIENT="your-client-id" UHLIVE_API_SECRET="your-client-secret" python myapp.py

Our SDK is designed so that you are free to use the websocket library you want as transport, and to architecture your code the way you like.

Here is a straightforward example of connecting to the API using either websocket-client (sync) or aiohttp (async):

import os
import requests
from uhlive.auth import build_authentication_request


uhlive_client = os.environ["UHLIVE_API_CLIENT"]
uhlive_secret = os.environ["UHLIVE_API_SECRET"]

auth_url, auth_params = build_authentication_request(
uhlive_client, uhlive_secret
)
login = requests.post(auth_url, data=auth_params)
login.raise_for_status()
uhlive_token = login.json()["access_token"]
import asyncio
import os
from uhlive.auth import build_authentication_request
from aiohttp import ClientSession # type: ignore


async def main(uhlive_client, uhlive_secret):
async with ClientSession() as session:
auth_url, auth_params = build_authentication_request(
uhlive_client, uhlive_secret
)
async with session.post(auth_url, data=auth_params) as login:
login.raise_for_status()
body = await login.json()
uhlive_token = body["access_token"]


uhlive_client = os.environ["UHLIVE_API_CLIENT"]
uhlive_secret = os.environ["UHLIVE_API_SECRET"]
asyncio.run(main(uhlive_client, uhlive_secret))

Join a conversation

Think of a conversation as a conference call instance, or a room on a chat service. Only people who have joined the conversation can access the exchanged data. For this example, we'll use a conversation with only one speaker: Alice.

You have to open a new WebSocket connection for each conversation you want to join in. There is a bandwidth limit that would prevent nominal behaviour in case you join several conversations within the same connection.

Update the code above with the following. Add the imports, and append the code at the end:

# Update imports with:
import time
import websocket as ws # type: ignore
from uhlive.stream.conversation import Conversation, build_conversation_url

# Append the following to the code above
url = build_conversation_url(uhlive_token)
socket = ws.create_connection(url, timeout=10)
client = Conversation(uhlive_client, "my-conversation", "Alice")

socket.send(
client.join(
model="fr",
interim_results=False,
rescoring=True,
origin=int(time.time() * 1000),
country="fr",
)
)
# check we didn't get an error on join
client.receive(socket.recv())
# Update imports with
import time
from uhlive.stream.conversation import Conversation, build_conversation_url

# Within the main function...
async def main(uhlive_client, uhlive_secret):
async with ClientSession() as session:

# ... append this code:
async with session.ws_connect(build_conversation_url(uhlive_token)) as socket:
client = Conversation(uhlive_client, "my-conversation-id", "Alice")
# shortcut
await socket.send_str(
client.join(
model="fr",
interim_results=False,
rescoring=True,
origin=int(time.time() * 1000),
country="fr",
)
)
# check we didn't get an error on join
msg = await socket.receive()
client.receive(msg.data)

We've set model as French, but you can also try English en or Spanish es. You can read more about ASR parameters here.

You can name the conversation however you like.

Send audio and receive transcription

This is where the fun begins. Now that we have a conversation we talk to, we can send an audio and receive its transcription.

When you join a conversation as a participant, you must be ready to stream audio immediately. Any gap in the stream longer than a few seconds will be interpreted as a lost connection and will terminate your session.

Send audio

We currently only support audio with a sample rate of 8kHz and a bit depth of 16 bits

We're going to use microphone input as our audio source. Our language models are specialized for phone call conversation, but it would be cumbersome to plug in your phone to our API in a Getting Started :)

We'll have to add some more import, and append the code with microphone streaming:

# Update imports with:
import sounddevice as sd

# Append the previous code block with:
# Audio recording parameters
RATE = 8000
CHUNK = int(RATE / 10) # 100ms


def stream_microphone(socket, client):
def callback(indata, frame_count, time_info, status):
socket.send_binary(client.send_audio_chunk(bytes(indata)))

stream = sd.RawInputStream(
callback=callback, channels=1, samplerate=8000, dtype="int16", blocksize=960
)
stream.start()
return stream

stream = stream_microphone(socket, client)

print("Listening to events")
# Update imports with:
import sounddevice as sd

# Add these two new functions before main:
async def inputstream_generator(channels=1, samplerate=8000, dtype="int16", **kwargs):
"""Generator that yields blocks of input data as bytearrays."""
q_in = asyncio.Queue()
loop = asyncio.get_event_loop()

def callback(indata, frame_count, time_info, status):
loop.call_soon_threadsafe(q_in.put_nowait, bytes(indata))

stream = sd.RawInputStream(
callback=callback,
channels=channels,
samplerate=samplerate,
dtype=dtype,
**kwargs,
)
with stream:
while True:
indata = await q_in.get()
yield indata


async def stream_mic(socket, client):
try:
async for block in inputstream_generator(blocksize=960):
await socket.send_bytes(client.send_audio_chunk(block))
except asyncio.CancelledError:
pass


# Within the main function...
async def main(uhlive_client, uhlive_secret):
async with ClientSession() as session:
async with session.ws_connect(build_conversation_url(uhlive_token)) as socket:

# ... append this code:
streamer = asyncio.create_task(stream_mic(socket, client))
print("Listening…")

This API is meant for realtime streaming. If you try to stream audio files at full bandwidth speed, you'll be throttled. Use our batch API to transcribe files at full speed.

Receive transcription

Now that our microphone is ready to stream, let's take care of handling returned events, and display the transcript.

As before, update the import section, and append the code at the end:

# Update imports with:
from websocket import WebSocketTimeoutException # type: ignore
from uhlive.stream.conversation import Ok

# Append the code with:
try:
while True:
try:
event = client.receive(socket.recv())
except WebSocketTimeoutException:
print("— Silence —")
continue
except KeyboardInterrupt:
break
if not isinstance(event, Ok):
print(event)
finally:
print("Exiting")
stream.stop()
stream.close()
socket.send(client.leave())
socket.close()
# Update imports with
from uhlive.stream.conversation import Ok

# Within the main function...
async def main(uhlive_client, uhlive_secret):
async with ClientSession() as session:
async with session.ws_connect(build_conversation_url(uhlive_token)) as socket:

# ... append this code:
try:
while True:
try:
msg = await socket.receive()
event = client.receive(msg.data)
except KeyboardInterrupt:
break
if not isinstance(event, Ok):
print(event)
finally:
streamer.cancel()
await streamer
await socket.send_str(client.leave())

Receive enrich events

If you've subscribed to our Named Entities Recognition option, from time to time, when an NLU agent find interesting information in the conversation, it will emit enrich events.

The class in python has the same name but in CamelCase.

You can replace the previous snippet with this one:

# Update import with:
from uhlive.stream.conversation import EntityFound

# Replace previous code with:
try:
while True:
try:
event = client.receive(socket.recv())
except WebSocketTimeoutException:
print("— Silence —")
continue
except KeyboardInterrupt:
break
if not isinstance(event, Ok):
if isinstance(event, EntityFound):
print(f"[{event.speaker}] {event.canonical}")
# You can also dig into the event payload to get more data
canonical = event.canonical
original = event.original
start = event.start
end = event.end
print(f"[{start}->{end}]: '{canonical}' replaces '{original}'")
else:
print(event)
finally:
print("Exiting")
stream.stop()
stream.close()
socket.send(client.leave())
socket.close()
# Update import with:
from uhlive.stream.conversation import EntityFound

# Within the main function...
async def main(uhlive_client, uhlive_secret):
async with ClientSession() as session:
async with session.ws_connect(build_conversation_url(uhlive_token)) as socket:

# ... replace the previous code with this:
try:
while True:
try:
msg = await socket.receive()
event = client.receive(msg.data)
except KeyboardInterrupt:
break
if not isinstance(event, Ok):
if isinstance(event, EntityFound):
print(f"[{event.speaker}] {event.canonical}")
# You can also dig into the event payload to get more data
canonical = event.canonical
original = event.original
start = event.start
end = event.end
print(f"[{start}->{end}]: '{canonical}' replaces '{original}'")
else:
print(event)
finally:
streamer.cancel()
await streamer
await socket.send_str(client.leave())

Leave conversation

To cleanly leave a conversation without missing any transcription events from your audio stream, use the Conversation instance's .leave() method and wait for its .left flag to be true.

There you go! You are now able to send an audio and receive its transcription. Full version of these examples, with arguments to pass to the script to ease selection of model for example, are available on our repository:

Further reading

We provide some more examples in the SDK repository.

To dive in deeper, you can browse the API Reference documentation.

If you are stuck, want to suggest something, or just want to say hello, send us an e-mail to support@allo-media.fr.