In a recent project, I had the opportunity to explore a technology called the Dubber API. Since there is a scarcity of references and blog posts about it, I'll be sharing my insights into this API in this blog post.
Note: This content is distinct from the series of DevSecOps pipeline blogs.
Introduction
Firstly, the Dubber API is an interface provided by Dubber, a company specializing in voice AI technology and call recording across various types of channels such as PSTN, VoIP, and ISDN. This API allows developers to integrate Dubber's call recording and voice AI capabilities into their own applications or systems. With the Dubber API, developers can programmatically access features such as call recording, transcription, sentiment analysis, and keyword spotting.
Flowchart
Now that we know what Dubber and the Dubber API are, I will explain in detail the process we will follow to get the recordings using AWS services. Here is a simple flowchart of the process:
- To begin, I am using AWS Lambda (referred to as `GetCallDetails` henceforth) to fetch recording details from the Dubber portal.
- These retrieved items are dispatched to AWS SQS.
- Then, another AWS Lambda function (`GetDetailsFromSQS`) is used to retrieve queued messages from AWS SQS and handle them.
- Post-processing, the media data is uploaded to an S3 bucket.
- Afterwards, I retrieve the metadata of the media object and store it in AWS DynamoDB.
Generating an Access Token
Generating an access token for the Dubber API is, in my opinion, a complicated process. Simply put, it requires four different credentials gathered from two different portals.
- Dubber developer portal
  - In the Dubber developer portal you can get the `Mashery client ID` and `Mashery client secret`, which constitute half of the credentials/tokens required to generate the access token.
  - Note: in order to get the `Mashery client ID` and `Mashery client secret` from the portal, you need to register a new application in the portal and wait for the approval process from the Dubber team, which takes 2-3 business days.
  - After the successful registration of the application you should be able to see the two tokens under the `My Applications` tab in your developer dashboard.
- Dubber portal
  - The Dubber portal (not to be confused with the Dubber developer portal) is the one that contains all the recordings.
  - You can access your `Auth ID` and `Auth token` on the `My profile` page after you log in to your account.
After you have all four tokens, you can use the curl command below to generate the access token.
- Curl Command
curl -i -k -X POST "https://api.dubber.net/[REGION]/v1/token" -d 'client_id=[ClientID]&client_secret=[ClientSecret]&username=[AuthID]&password=[AuthToken]&grant_type=password' \
-H 'content-type: application/x-www-form-urlencoded'
Note: Remember to replace the placeholders given inside square brackets in the above curl command.
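If you prefer to generate the token programmatically rather than with curl, below is a minimal Python sketch of the same request. The access_token key in the response is an assumption based on a standard OAuth-style password grant; verify it against the body Dubber actually returns.
import requests

REGION = "us"  # use the same region as your application, or "sandbox"

def generate_access_token(client_id, client_secret, auth_id, auth_token):
    url = f"https://api.dubber.net/{REGION}/v1/token"
    payload = {
        "client_id": client_id,
        "client_secret": client_secret,
        "username": auth_id,
        "password": auth_token,
        "grant_type": "password",
    }
    # requests encodes a dict passed via data= as x-www-form-urlencoded
    response = requests.post(url, data=payload)
    response.raise_for_status()
    return response.json()["access_token"]  # assumed response key; check your actual response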
Accessing the recordings
Coming to the main point: we will use AWS Lambda to authorize with the token we generated and fetch the details of the recordings stored in the Dubber portal. Below is the code to get the call details from the Dubber portal using Python 3.
import requests
import json
import os
#Constants
BASE_URL="https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
def lambda_handler(event, context):
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
print(response)
Note:
- The Account ID is a required parameter which can be found on the `My profile` page as the last part of your account URL.
- The count parameter in the URL can be set anywhere from 0 to 100 and determines the number of calls that should be fetched.
- The `Account ID` and the `Access Token` are referenced as environment variables in the Lambda function.
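One defensive touch worth adding: the handler above parses the JSON response without checking the HTTP status, so an expired or invalid token surfaces later as a confusing KeyError. A small wrapper like the sketch below fails loudly instead (get_recordings is a hypothetical helper name, not part of any Dubber SDK):
def get_recordings(headers):
    url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(url, headers=headers, timeout=10)
    if response.status_code != 200:
        # Surface the body Dubber returned instead of a KeyError downstream
        print(f"Dubber API returned {response.status_code}: {response.text}")
        return None
    return response.json()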
Pushing to SQS
To get each individual call detail from the above response, simply use a for loop to iterate through all the records and push each call record to the SQS queue. Here is the code:
import boto3
def send_to_sqs(calls):
sqs = boto3.client('sqs')
try:
for call in calls["recordings"]:
message_body = {
"call_id": call["id"],
"call_to": call["to"],
"call_from": call["from"],
"call_channel": call["channel"] # More details can be added after this line if you need it
}
response = sqs.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps(message_body)
)
return True
    except Exception:
return False
With this, the `GetCallDetails` Lambda is done.
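One optional optimization before moving on: since the messages are small and we already have them in a list, SQS's send_message_batch can push up to 10 messages per API call instead of one send_message per record. A sketch, assuming the same message shape as above:
def send_to_sqs_batched(calls):
    sqs = boto3.client('sqs')
    recordings = calls["recordings"]
    for i in range(0, len(recordings), 10):  # SQS allows at most 10 messages per batch
        entries = [
            {
                "Id": str(index),  # must be unique within a single batch
                "MessageBody": json.dumps({
                    "call_id": call["id"],
                    "call_to": call["to"],
                    "call_from": call["from"],
                    "call_channel": call["channel"],
                }),
            }
            for index, call in enumerate(recordings[i:i + 10])
        ]
        sqs.send_message_batch(QueueUrl=SQS_QUEUE_URL, Entries=entries)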
Processing recording and pushing to S3
Moving on to the second Lambda, i.e., `GetDetailsFromSQS`: this stage has a lot of sub-stages.
Polling messages from the SQS queue and deleting the message
import json
import boto3
import os
import requests
# Constants
BASE_URL="https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
def lambda_handler(event, context):
sqs = boto3.client('sqs')
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
# Receive messages from SQS queue
response = sqs.receive_message(
QueueUrl=SQS_QUEUE_URL,
MaxNumberOfMessages=10,
VisibilityTimeout=30,
WaitTimeSeconds=10
)
messages = response.get('Messages', []) # Returns an empty list if there are no messages to be polled
if len(messages) == 0:
return "No messages pulled"
else:
for message in messages:
# Process the message
body = json.loads(message['Body'])
# Delete the message from the queue
sqs.delete_message(
QueueUrl=SQS_QUEUE_URL,
ReceiptHandle=message['ReceiptHandle']
)
    return f"{len(messages)} messages processed"
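As an aside, instead of calling receive_message yourself you can configure the queue as a Lambda event source, in which case AWS does the polling for you and deletes each message automatically when the handler returns without raising. Under that setup the handler shape would look roughly like this:
def lambda_handler(event, context):
    # With an SQS event source mapping, the messages arrive in event["Records"]
    for record in event["Records"]:
        body = json.loads(record["body"])
        # process body["call_id"], body["call_channel"], etc. here
    return f"{len(event['Records'])} messages processed"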
Get the account ID of the user
Just like our account, each user has their own user ID, which is needed as a parameter in order to fetch the media URL of the recorded call.
def get_user_id(message):
# Get the first name parameter
channel = message["call_channel"]
channel_list = channel.split(" ")
    # Derive the first name; how many tokens belong to the name depends on
    # the overall length of the username
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]
# Get the account response
url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100" # This lists the top 100 users in the subscription
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
response = requests.get(url, headers=headers).json()
for user in response["users"]:
if user["first_name"] == first_name:
user_id = user["id"]
return user_id
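To make the splitting rule concrete, here is the same logic factored into a standalone function, exercised with a few hypothetical channel strings (the real format of Dubber's channel field may differ in your tenancy):
def extract_first_name(channel):
    channel_list = channel.split(" ")
    if len(channel_list) > 4:
        return ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        return ' '.join(channel_list[0:2])
    return channel_list[0]

print(extract_first_name("Mary Anne Lou Smith 1001"))  # 5 tokens -> "Mary Anne Lou"
print(extract_first_name("Mary Anne Smith 1001"))      # 4 tokens -> "Mary Anne"
print(extract_first_name("Mary Smith"))                # 2 tokens -> "Mary"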
Get the recording URL
We can now get the recording URL of the recorded media using the code below. Note that you will not be able to get the recording URL without the listener parameter, even if you hit the same API endpoint.
def get_recording_media_url(recording_id, user_id):
# Get the account response
url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
response = requests.get(url, headers=headers).json()
return response["recording_url"]
Download the recording and upload the content to S3
This stage is quite simple: we just need to upload the contents of the recorded media file to the desired S3 bucket.
def download_media(url, filename):
response = requests.get(url)
if response.status_code == 200:
s3 = boto3.client('s3')
s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
return True
else:
return False
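One caveat: response.content buffers the entire recording in memory, which can bite on long calls given Lambda's memory limits. A streaming variant of the same upload (a sketch, assuming the media URL serves the file directly):
def download_media_streaming(url, filename):
    s3 = boto3.client('s3')
    with requests.get(url, stream=True) as response:
        if response.status_code != 200:
            return False
        response.raw.decode_content = True  # let urllib3 undo any gzip/deflate encoding
        # upload_fileobj streams in chunks rather than loading the whole file
        s3.upload_fileobj(response.raw, S3_BUCKET, filename)
    return True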
Writing S3 object metadata to DynamoDB
Now all that is left is writing the metadata to the DynamoDB table.
def write_metadata(filename):
s3 = boto3.client('s3')
response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
metadata = response['Metadata']
dynamodb = boto3.client('dynamodb')
dynamodb.put_item(
TableName=DYNAMODB_TABLE,
Item={
'object_key': {'S': filename},
'metadata': {'S': json.dumps(metadata)}
}
)
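For completeness, reading an item back looks like this; the sketch assumes the table's partition key is object_key, matching the put_item call above:
def read_metadata(filename):
    dynamodb = boto3.client('dynamodb')
    response = dynamodb.get_item(
        TableName=DYNAMODB_TABLE,
        Key={'object_key': {'S': filename}}
    )
    item = response.get('Item')  # absent if no item matched the key
    return json.loads(item['metadata']['S']) if item else None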
Code
Here is what the completed code looks like for each Lambda.
Lambda1 - GetCallDetails
import requests
import json
import boto3
import os
# Constants
BASE_URL="https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
def send_to_sqs(calls):
sqs = boto3.client('sqs')
try:
for call in calls["recordings"]:
message_body = {
"call_id": call["id"],
"call_to": call["to"],
"call_from": call["from"],
"call_channel": call["channel"]
}
response = sqs.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps(message_body)
)
return True
    except Exception:
return False
def lambda_handler(event, context):
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
if send_to_sqs(response):
return 'Push to SQS successful'
else:
return 'Push to SQS failed'
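Before deploying, you can smoke-test this handler locally. Since the constants are read at import time, set the environment variables before importing; every value below is a placeholder, and lambda1.py is an assumed file name for the code above:
import os

os.environ['ACCESS_TOKEN'] = '<your-access-token>'
os.environ['ACCOUNT_ID'] = '<your-account-id>'
os.environ['SQS_QUEUE_URL'] = 'https://sqs.us-east-1.amazonaws.com/123456789012/dubber-calls'

from lambda1 import lambda_handler  # import after the environment is set

print(lambda_handler({}, None))  # the handler ignores the event, so an empty dict is fine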
Lambda2 - GetDetailsFromSQS
import json
import boto3
import os
import requests
# Constants
BASE_URL="https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']
def get_user_id(message):
# Get the first name parameter
channel = message["call_channel"]
channel_list = channel.split(" ")
    # Derive the first name; how many tokens belong to the name depends on
    # the overall length of the username
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]
# Get the account response
url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100"
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
response = requests.get(url, headers=headers).json()
for user in response["users"]:
if user["first_name"] == first_name:
user_id = user["id"]
return user_id
def get_recording_media_url(recording_id, user_id):
# Get the account response
url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
response = requests.get(url, headers=headers).json()
return response["recording_url"]
def download_media(url, filename):
# Download the recording and upload the content to S3
response = requests.get(url)
if response.status_code == 200:
s3 = boto3.client('s3')
s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
return True
else:
return False
def write_metadata(filename):
s3 = boto3.client('s3')
response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
metadata = response['Metadata']
# Write metadata to DynamoDB
dynamodb = boto3.client('dynamodb')
response = dynamodb.put_item(
TableName=DYNAMODB_TABLE,
Item={
'object_key': {'S': filename},
'metadata': {'S': json.dumps(metadata)}
}
)
    # put_item's response has no status_code attribute; check the HTTP status
    # in the response metadata instead
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return True
    else:
        return False
def lambda_handler(event, context):
sqs = boto3.client('sqs')
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
# Receive messages from SQS queue
response = sqs.receive_message(
QueueUrl=SQS_QUEUE_URL,
MaxNumberOfMessages=10,
VisibilityTimeout=30,
WaitTimeSeconds=10
)
messages = response.get('Messages', [])
if len(messages) == 0:
return "No message pulled"
else:
for message in messages:
# Process the message
body = json.loads(message['Body'])
            response_media = get_recording_media_url(body["call_id"], get_user_id(body))
            if response_media:
                if download_media(response_media, f"{body['call_id']}.mp3"):
                    if write_metadata(f"{body['call_id']}.mp3"):
# Delete the message from the queue
sqs.delete_message(
QueueUrl=SQS_QUEUE_URL,
ReceiptHandle=message['ReceiptHandle']
)
else:
return "Error in fetching media URL"
return f"{len(messages)} recordings uploaded and metadata stored"
Points to Note
- Remember to use the exact same REGION as the one used when you created the application. If you are going to work with the sandbox or the interactive API, remember to change the REGION in the URL to `sandbox`.
- When you register an application you start a 30-day trial period, and when it expires you are charged for every user that uses the Dubber portal. So, discuss the pricing with the Dubber team in advance.
- As per the GDPR, remember to store the call recording media in an EU S3 bucket if the call recordings are made inside the European region.
That’s it for this post!! See you in the next one.