In a recent project, I had the opportunity to explore a technology called the Dubber API. Despite the scarcity of references and blog posts about it, I'll be sharing my insights into this API in this post.
Note: This content is distinct from the series of DevSecOps pipeline blogs.
Introduction
Firstly, the Dubber API is an interface provided by Dubber, a company specializing in voice AI technology and call recording across channels such as PSTN, VoIP, and ISDN. This API allows developers to integrate Dubber's call recording and voice AI capabilities into their own applications or systems. With the Dubber API, developers can programmatically access features such as call recording, transcription, sentiment analysis, and keyword spotting.
Flowchart
Now that we know what Dubber and the Dubber API are, I will explain in detail the process we will follow to get the recordings using AWS services. Here is a simple flowchart of the process:
- To begin, I am using AWS Lambda (referred to as GetCallDetails henceforth) to fetch recording details from the Dubber Portal.
- These retrieved items are dispatched to AWS SQS.
- Then, another AWS Lambda function (GetDetailsFromSQS) retrieves the queued messages from SQS and handles them.
- Post-processing, the media data is uploaded to an S3 bucket.
- Afterwards, I retrieve the metadata of the media object and store it in AWS DynamoDB.
Generating an Access Token
Generating an access token for the Dubber API is, in my opinion, a complicated process. Simply put, it requires 4 different tokens from 2 different portals.
- Dubber developer portal
  - In the Dubber developer portal you can get the Mashery client ID and Mashery client secret, which constitute half of the credentials/tokens required to generate the access token.
  - Note: In order to get the Mashery client ID and Mashery client secret from the portal, you need to register a new application in the portal and wait for the approval process from the Dubber team, which takes 2-3 business days.
  - After the successful registration of the application you should be able to see the 2 tokens under the My Applications tab in your developer dashboard.
- Dubber portal
  - The Dubber portal (not to be confused with the Dubber developer portal) is the one which contains all the recordings.
  - You can access your Auth ID and Auth token on the My profile page after you log in to your account.
After you get all 4 tokens, you can use the curl command below to generate the access token.
- Curl Command
curl -i -k -X POST "https://api.dubber.net/[REGION]/v1/token" \
  -d 'client_id=[ClientID]&client_secret=[ClientSecret]&username=[AuthID]&password=[AuthToken]&grant_type=password' \
  -H 'content-type: application/x-www-form-urlencoded'
Note: Remember to replace the placeholders given inside the square brackets in the above curl command. A Python equivalent is sketched below.
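Since the rest of this post uses Python 3, here is a minimal sketch of the same token request with the requests library. The bracketed placeholders are yours to fill in, just like in the curl command.

import requests

# A Python equivalent of the curl command above (a sketch; replace the
# placeholder values with your own region and credentials).
REGION = "us"  # or "sandbox" when working against the sandbox API
TOKEN_URL = f"https://api.dubber.net/{REGION}/v1/token"

payload = {
    "client_id": "[ClientID]",          # Mashery client ID
    "client_secret": "[ClientSecret]",  # Mashery client secret
    "username": "[AuthID]",             # Auth ID from the Dubber portal
    "password": "[AuthToken]",          # Auth token from the Dubber portal
    "grant_type": "password",
}

response = requests.post(
    TOKEN_URL,
    data=payload,
    headers={"content-type": "application/x-www-form-urlencoded"},
)
print(response.json())  # the access token is returned in the response body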
Accessing the recordings
Coming to the main point: we will use AWS Lambda to authorize with the token we generated and fetch the details of the recordings stored in the Dubber portal. Below is the code to get the call details from the Dubber portal using Python 3.
import requests
import json
import os

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

def lambda_handler(event, context):
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
    print(response)
Note:
- The Account ID is a required parameter, which can be found on the My profile page as the last part of your account URL.
- The count parameter in the URL can be changed from 0 to 100 and determines the number of calls that will be fetched.
- The Account ID and the Access Token are referenced as environment variables in the Lambda function (see the sketch below for one way to set them).
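For reference, here is one way to set those environment variables with boto3; doing it through the AWS console or the AWS CLI works just as well. The function name and bracketed values below are placeholders, not details from the original setup.

import boto3

# Hypothetical one-off configuration of the Lambda's environment variables.
lambda_client = boto3.client('lambda')
lambda_client.update_function_configuration(
    FunctionName='GetCallDetails',
    Environment={
        'Variables': {
            'ACCESS_TOKEN': '[AccessToken]',
            'ACCOUNT_ID': '[AccountID]',
            'SQS_QUEUE_URL': '[QueueURL]',
        }
    },
)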
Pushing to SQS
To get individual call details from the above response, simply use a for loop to iterate through all the records and push each call record to the SQS queue. Here is the code:
import boto3

def send_to_sqs(calls):
    # Uses the json module and the SQS_QUEUE_URL constant defined earlier
    sqs = boto3.client('sqs')
    try:
        for call in calls["recordings"]:
            message_body = {
                "call_id": call["id"],
                "call_to": call["to"],
                "call_from": call["from"],
                "call_channel": call["channel"]
                # More details can be added after this line if you need them
            }
            response = sqs.send_message(
                QueueUrl=SQS_QUEUE_URL,
                MessageBody=json.dumps(message_body)
            )
        return True
    except Exception:
        return False
With this, the GetCallDetails Lambda is done.
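Before wiring everything up, it can help to sanity-check send_to_sqs locally. Below is a minimal sketch with a hand-built payload; the field names mirror what the code reads from the Dubber response, but the values are made up and the real response contains many more fields. It assumes SQS_QUEUE_URL is set and your AWS credentials allow sqs:SendMessage.

# Hypothetical sample payload mirroring the fields send_to_sqs() reads.
sample_response = {
    "recordings": [
        {
            "id": "rec_0001",        # made-up recording ID
            "to": "+61400000001",
            "from": "+61400000002",
            "channel": "Jane Doe"    # later used to derive the user's first name
        }
    ]
}

if send_to_sqs(sample_response):
    print("Sample record pushed to SQS")
else:
    print("Push failed")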
Processing recording and pushing to S3
Moving on to the second Lambda, i.e. GetDetailsFromSQS, this stage has several sub-stages.
Polling messages from the SQS queue and deleting the message
import json
import boto3
import os
import requests

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

    # Receive messages from the SQS queue
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30,
        WaitTimeSeconds=10
    )
    messages = response.get('Messages', [])  # Empty list if there are no messages to be polled

    if len(messages) == 0:
        return "No messages pulled"

    for message in messages:
        # Process the message
        body = json.loads(message['Body'])
        # ... the processing steps described below go here ...

        # Delete the message from the queue
        sqs.delete_message(
            QueueUrl=SQS_QUEUE_URL,
            ReceiptHandle=message['ReceiptHandle']
        )

    return f"{len(messages)} recordings uploaded"
Get the account ID of the user
Just like our Account ID, each user has their own ID, which is needed as a parameter (the listener) in order to fetch the media URL of the recorded call.
def get_user_id(message):
    # Get the first name from the channel string
    channel = message["call_channel"]
    channel_list = channel.split(" ")
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])  # Derive the first name depending on the length of the username
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]

    # Get the account response
    url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100"  # Lists the top 100 users in the subscription
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    for user in response["users"]:
        if user["first_name"] == first_name:
            user_id = user["id"]
            return user_id
Get the recording URL
We can now get the recording URL of the recorded media using the code below. Note that you will not be able to get the recording URL without the listener parameter, even if you hit the same API endpoint.
def get_recording_media_url(recording_id, user_id):
    # Get the recording details, passing the user ID as the listener
    url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response["recording_url"]
Download the recording and upload the content to S3
This stage is quite simple, as we just need to upload the contents of the recorded media file to the desired S3 bucket.
def download_media(url, filename):
    # Download the recording and upload the content to S3
    response = requests.get(url)
    if response.status_code == 200:
        s3 = boto3.client('s3')
        s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
        return True
    else:
        return False
Writing S3 object metadata to DynamoDB
Now all that is left is writing the metadata to the DynamoDB table.
def write_metadata(filename):
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
    metadata = response['Metadata']

    # Write the object metadata to DynamoDB
    dynamodb = boto3.client('dynamodb')
    dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            'object_key': {'S': filename},
            'metadata': {'S': json.dumps(metadata)}
        }
    )
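For completeness, here is a hedged sketch of a one-off setup for the destination table, with object_key as the partition key to match the item written above. The table name and billing mode are assumptions, not details from the original setup.

import boto3

# Hypothetical one-off creation of the metadata table used by write_metadata().
dynamodb = boto3.client('dynamodb')
dynamodb.create_table(
    TableName='dubber-recording-metadata',  # placeholder name
    AttributeDefinitions=[{'AttributeName': 'object_key', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'object_key', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'
)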
Code
Here is what the completed code looks like.
Lambda1 - GetCallDetails
import requests
import json
import boto3
import os

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

def send_to_sqs(calls):
    # Push each call record from the Dubber response onto the SQS queue
    sqs = boto3.client('sqs')
    try:
        for call in calls["recordings"]:
            message_body = {
                "call_id": call["id"],
                "call_to": call["to"],
                "call_from": call["from"],
                "call_channel": call["channel"]
            }
            response = sqs.send_message(
                QueueUrl=SQS_QUEUE_URL,
                MessageBody=json.dumps(message_body)
            )
        return True
    except Exception:
        return False

def lambda_handler(event, context):
    # Fetch the latest recordings from the Dubber API and push them to SQS
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
    if send_to_sqs(response):
        return 'Push to SQS successful'
    else:
        return 'Push to SQS failed'
Lambda2 - GetDetailsFromSQS
import json
import boto3
import os
import requests

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

def get_user_id(message):
    # Derive the user's first name from the channel string
    channel = message["call_channel"]
    channel_list = channel.split(" ")
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]

    # Look up the user ID from the subscription's user list
    url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    for user in response["users"]:
        if user["first_name"] == first_name:
            user_id = user["id"]
            return user_id

def get_recording_media_url(recording_id, user_id):
    # Get the recording details, passing the user ID as the listener
    url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response["recording_url"]

def download_media(url, filename):
    # Download the recording and upload the content to S3
    response = requests.get(url)
    if response.status_code == 200:
        s3 = boto3.client('s3')
        s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
        return True
    else:
        return False

def write_metadata(filename):
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
    metadata = response['Metadata']

    # Write metadata to DynamoDB
    dynamodb = boto3.client('dynamodb')
    response = dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            'object_key': {'S': filename},
            'metadata': {'S': json.dumps(metadata)}
        }
    )
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return True
    else:
        return False

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

    # Receive messages from the SQS queue
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30,
        WaitTimeSeconds=10
    )
    messages = response.get('Messages', [])

    if len(messages) == 0:
        return "No messages pulled"

    for message in messages:
        # Process the message
        body = json.loads(message['Body'])
        response_media = get_recording_media_url(body["call_id"], get_user_id(body))
        if response_media:
            if download_media(response_media, f"{body['call_id']}.mp3"):
                if write_metadata(f"{body['call_id']}.mp3"):
                    # Delete the message from the queue
                    sqs.delete_message(
                        QueueUrl=SQS_QUEUE_URL,
                        ReceiptHandle=message['ReceiptHandle']
                    )
        else:
            return "Error in fetching media URL"

    return f"{len(messages)} recordings uploaded and metadata stored"
Points to Note
- Remember to use the exact same REGION as the one you used when you created the application. If you are going to work with the sandbox or the interactive API, remember to change the REGION in the URL to sandbox.
- When you register an application you start a 30-day trial period, and once it expires you are charged for every user that uses the Dubber portal. So, discuss pricing with the Dubber team in advance.
- Remember to store the call recording media in an EU S3 bucket if the call recordings are made inside the European region, as per the GDPR (see the sketch below).
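For that last point, here is a minimal sketch of creating the destination bucket in an EU region. The region and bucket name are assumptions; use whichever EU region and naming applies to your setup.

import boto3

# Hypothetical setup: create the call-recording bucket in an EU region for GDPR compliance.
s3 = boto3.client('s3', region_name='eu-west-1')
s3.create_bucket(
    Bucket='dubber-eu-call-recordings',  # placeholder bucket name
    CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}
)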
That’s it for this post! See you in the next one.