
Dubber API - Call recordings


In a recent project, I had the opportunity to explore a technology called the Dubber API. Despite the scarcity of references and blog posts about it, I'll be sharing my insights into this API in this blog post.

Note: This content is distinct from the series of DevSecOps pipeline blogs.


Introduction


Firstly, the Dubber API is an interface provided by Dubber, a company specializing in voice AI technology and call recording across various channel types such as PSTN, VoIP, and ISDN. This API allows developers to integrate Dubber's call recording and voice AI capabilities into their own applications or systems. With the Dubber API, developers can programmatically access features such as call recording, transcription, sentiment analysis, and keyword spotting.


Flowchart


Now that we know what Dubber and the Dubber API are, I will explain in detail the process we will follow to get the recordings using AWS services. Here is a simple flowchart of the process:
Dubber_Architecture

  1. To begin, I use an AWS Lambda function (referred to as GetCallDetails henceforth) to fetch recording details from the Dubber portal.
  2. The retrieved items are dispatched to AWS SQS.
  3. Then, another AWS Lambda function (GetDetailsFromSQS) retrieves the queued messages from AWS SQS and handles them.
  4. After processing, the media data is uploaded to an S3 bucket.
  5. Finally, I retrieve the metadata of the media object and store it in AWS DynamoDB.

Generating an Access Token


Generating an access token for the Dubber API is, in my opinion, a complicated process. Simply put, it requires four different credentials from two different portals.

  1. Dubber developer portal
    • In the Dubber developer portal you can get the Mashery client ID and Mashery client secret, which constitute half of the credentials required to generate the access token.
    • Note: In order to get the Mashery client ID and Mashery client secret from the portal, you need to register a new application in the portal and wait for the approval process from the Dubber team, which takes 2-3 business days.
    • After the application is successfully registered, you should be able to see the two tokens under the My Applications tab in your developer dashboard.
  2. Dubber portal
    • The Dubber portal (not to be confused with the Dubber developer portal) is the one that contains all the recordings.
    • You can find your Auth ID and Auth token on the My Profile page after you log in to your account.


After you get all four tokens, you can use the curl command below to generate the access token.

  • Curl Command
curl -i -k -X POST "https://api.dubber.net/[REGION]/v1/token" -d 'client_id=[ClientID]&client_secret=[ClientSecret]&username=[AuthID]&password=[AuthToken]&grant_type=password' \
    -H 'content-type: application/x-www-form-urlencoded'

Note: Remember to replace the placeholders given inside square brackets in the above curl command.
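If you would rather generate the token from Python (for example, inside a Lambda that refreshes it periodically), here is a minimal sketch of the same request using the requests library. Note that the access_token field name in the parsed response is an assumption based on the standard OAuth 2.0 password-grant convention, so verify it against the actual response body.

import requests

# Placeholder values; substitute your own region and credentials
REGION = "us"
TOKEN_URL = f"https://api.dubber.net/{REGION}/v1/token"

payload = {
    "client_id": "[ClientID]",          # from the Dubber developer portal
    "client_secret": "[ClientSecret]",  # from the Dubber developer portal
    "username": "[AuthID]",             # from the Dubber portal, My Profile page
    "password": "[AuthToken]",          # from the Dubber portal, My Profile page
    "grant_type": "password",
}

response = requests.post(
    TOKEN_URL,
    data=payload,
    headers={"content-type": "application/x-www-form-urlencoded"},
)
response.raise_for_status()

# Assumption: the token is returned in an "access_token" field,
# following the usual OAuth 2.0 response shape
access_token = response.json().get("access_token")
print(access_token)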


Accessing the recordings


Coming to the main point: we will use AWS Lambda to authorize with the token we generated and fetch the details of the recordings stored in the Dubber portal. Below is the Python 3 code to get the call details from the Dubber portal.

import requests
import json
import os

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

def lambda_handler(event, context):
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
    print(response)
    

Note:

  1. The Account ID is a required parameter; it can be found on the My Profile page as the last part of your account URL.
  2. The count parameter in the URL can be set anywhere from 0 to 100 and controls the number of calls fetched.
  3. The Account ID and the Access Token are referenced as environment variables in the Lambda function.
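For reference, the relevant part of the response looks roughly like the sketch below. This shape is inferred from the fields the code in the next section accesses (id, to, from, channel); the real payload carries more fields and the values here are made up.

# Illustrative response shape only; values are invented
response = {
    "recordings": [
        {
            "id": "some-recording-id",
            "to": "+15550100",
            "from": "+15550199",
            "channel": "John 1234 5678"
        }
        # ... up to `count` entries
    ]
}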

Pushing to SQS


To get a single call detail from the above response, simply use a for loop to iterate over all the records and push each call record to the SQS queue. Here is the code:

import boto3
import json  # needed for json.dumps below; SQS_QUEUE_URL is the constant defined earlier

def send_to_sqs(calls):
    sqs = boto3.client('sqs')
    try:
        for call in calls["recordings"]:
            message_body = {
                "call_id": call["id"],
                "call_to": call["to"],
                "call_from": call["from"],
                "call_channel": call["channel"] # More details can be added after this line if you need them
            }
            sqs.send_message(
                QueueUrl=SQS_QUEUE_URL,
                MessageBody=json.dumps(message_body)
            )
        return True
    except Exception:
        return False
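Note that send_message makes one API call per record. If you expect to push a lot of records at once, SQS also supports sending up to 10 messages per call; here is a minimal sketch of that variant using send_message_batch, with the same message fields as above.

def send_to_sqs_batched(calls):
    # Alternative sketch: batch up to 10 messages per SQS API call
    sqs = boto3.client('sqs')
    recordings = calls["recordings"]
    for i in range(0, len(recordings), 10):
        entries = [
            {
                "Id": str(idx),  # entry ID, unique within this batch request
                "MessageBody": json.dumps({
                    "call_id": call["id"],
                    "call_to": call["to"],
                    "call_from": call["from"],
                    "call_channel": call["channel"]
                })
            }
            for idx, call in enumerate(recordings[i:i + 10])
        ]
        sqs.send_message_batch(QueueUrl=SQS_QUEUE_URL, Entries=entries)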

With this, the GetCallDetails Lambda is done.


Processing recording and pushing to S3


Moving to the second Lambda, i.e., GetDetailsFromSQS: this stage has several sub-stages.

Polling messages from the SQS queue and deleting them

import json
import boto3
import os
import requests

# Constants
BASE_URL="https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
    
    # Receive messages from SQS queue
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30,
        WaitTimeSeconds=10
    )

    messages = response.get('Messages', []) # Returns an empty list if there are no messages to be polled
    if len(messages) == 0:
        return "No messages pulled"
    for message in messages:
        # Process the message; fetching the media URL, downloading the
        # recording, and uploading it happen here (see the sub-stages below)
        body = json.loads(message['Body'])

        # Delete the message from the queue once it has been processed
        sqs.delete_message(
            QueueUrl=SQS_QUEUE_URL,
            ReceiptHandle=message['ReceiptHandle']
        )
    return f"{len(messages)} recordings uploaded"
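As a design note, instead of invoking this Lambda on a schedule and polling manually, you could attach the queue to the Lambda as an event source, in which case AWS delivers the messages in event['Records'] and deletes them automatically on a successful invocation. A minimal sketch with hypothetical names and ARNs:

import boto3

lambda_client = boto3.client('lambda')

# Hypothetical queue ARN and function name; substitute your own
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:sqs:us-east-1:123456789012:dubber-call-queue',
    FunctionName='GetDetailsFromSQS',
    BatchSize=10
)

Both approaches work; the manual poll shown above just keeps the receive/delete cycle explicit.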

Get the account ID of the user

Just like our account ID, each user has their own user ID, which is needed as a parameter in order to fetch the media URL of the recorded call.

def get_user_id(message):
    # Derive the first name from the channel string, depending on its length
    channel = message["call_channel"]
    channel_list = channel.split(" ")
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]
    
    # Get the account response 
    url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100" # This lists the top 100 users in the subscription
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    
    for user in response["users"]:
        if user["first_name"] == first_name:
            user_id = user["id"]
            return user_id
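To make the branching concrete, here is the same logic run against a few made-up channel strings (the names and numbers are purely illustrative):

def extract_first_name(channel):
    # Same branching as in get_user_id above
    tokens = channel.split(" ")
    if len(tokens) > 4:
        return ' '.join(tokens[0:3])
    elif len(tokens) == 4:
        return ' '.join(tokens[0:2])
    return tokens[0]

# Hypothetical channel strings, invented for illustration
print(extract_first_name("John 1234 5678"))              # -> "John"
print(extract_first_name("Mary Jane 1234 5678"))         # -> "Mary Jane"
print(extract_first_name("Anna Maria Lopez 1234 5678"))  # -> "Anna Maria Lopez"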

Get the recording URL

We can now get the recording URL of the recorded media using the code below. Note that you will not be able to get the recording URL without the listener parameter, even if you hit the same API endpoint.

def get_recording_media_url(recording_id, user_id):
    # Get the account response
    url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response["recording_url"]

Download the recording and upload the content to S3

This stage is quite simple: we just need to upload the contents of the recorded media file to the desired S3 bucket.

def download_media(url, filename):
    response = requests.get(url)
    if response.status_code == 200:
        s3 = boto3.client('s3')
        s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
        return True
    else:
        return False
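One caveat: the version above buffers the entire recording in memory before uploading, which is fine for short calls but can strain a small Lambda on long ones. If that becomes an issue, requests can stream the download and boto3 can upload the file object directly; a minimal sketch:

def download_media_streamed(url, filename):
    # Alternative sketch: stream the download instead of buffering
    # the whole recording in the Lambda's memory
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        s3 = boto3.client('s3')
        response.raw.decode_content = True  # decode gzip/deflate transparently
        s3.upload_fileobj(response.raw, S3_BUCKET, filename)
        return True
    return False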

Writing S3 object metadata to DynamoDB


Now all that is left is writing the metadata to the DynamoDB table.

def write_metadata(filename):
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
    metadata = response['Metadata']
    
    dynamodb = boto3.client('dynamodb')
    dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            'object_key': {'S': filename},
            'metadata': {'S': json.dumps(metadata)}
        }
    )
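If you want to sanity-check that the write succeeded, you can read the item back with the same key; a quick sketch, assuming object_key is the table's partition key (matching the put above):

def read_metadata(filename):
    # Fetch the item we just wrote, keyed on the object name
    dynamodb = boto3.client('dynamodb')
    response = dynamodb.get_item(
        TableName=DYNAMODB_TABLE,
        Key={'object_key': {'S': filename}}
    )
    item = response.get('Item')
    return json.loads(item['metadata']['S']) if item else None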

Code


Here is what the completed code looks like for each Lambda.

Lambda1 - GetCallDetails

import requests
import json
import boto3
import os

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']

def send_to_sqs(calls):
    sqs = boto3.client('sqs')
    try:
        for call in calls["recordings"]:
            message_body = {
                "call_id": call["id"],
                "call_to": call["to"],
                "call_from": call["from"],
                "call_channel": call["channel"]
            }
            sqs.send_message(
                QueueUrl=SQS_QUEUE_URL,
                MessageBody=json.dumps(message_body)
            )
        return True
    except Exception:
        return False

def lambda_handler(event, context):
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    get_recordings_url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/recordings?count=10"
    response = requests.get(get_recordings_url, headers=headers).json()
    if send_to_sqs(response):
        return 'Push to SQS successful'
    else:
        return 'Push to SQS failed'

Lambda2 - GetDetailsFromSQS

import json
import boto3
import os
import requests

# Constants
BASE_URL = "https://api.dubber.net/us/v1"
ACCESS_TOKEN = os.environ['ACCESS_TOKEN']
ACCOUNT_ID = os.environ['ACCOUNT_ID']
S3_BUCKET = os.environ['S3_BUCKET']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']


def get_user_id(message):
    # Derive the first name from the channel string, depending on its length
    channel = message["call_channel"]
    channel_list = channel.split(" ")
    if len(channel_list) > 4:
        first_name = ' '.join(channel_list[0:3])
    elif len(channel_list) == 4:
        first_name = ' '.join(channel_list[0:2])
    else:
        first_name = channel_list[0]
    
    # Get the account response 
    url = f"{BASE_URL}/accounts/{ACCOUNT_ID}/users?count=100"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    
    for user in response["users"]:
        if user["first_name"] == first_name:
            user_id = user["id"]
            return user_id
    

def get_recording_media_url(recording_id, user_id):
    # Get the account response
    url = f"{BASE_URL}/recordings/{recording_id}?listener={user_id}"
    headers = {"Authorization": f"Bearer {ACCESS_TOKEN}"}
    response = requests.get(url, headers=headers).json()
    return response["recording_url"] 


def download_media(url, filename):
    # Download the recording and upload the content to S3
    response = requests.get(url)
    if response.status_code == 200:
        s3 = boto3.client('s3')
        s3.put_object(Bucket=S3_BUCKET, Key=filename, Body=response.content)
        return True
    else:
        return False
        

def write_metadata(filename):
    s3 = boto3.client('s3')
    response = s3.head_object(Bucket=S3_BUCKET, Key=filename)
    metadata = response['Metadata']
    
    # Write metadata to DynamoDB
    dynamodb = boto3.client('dynamodb')
    response = dynamodb.put_item(
        TableName=DYNAMODB_TABLE,
        Item={
            'object_key': {'S': filename},
            'metadata': {'S': json.dumps(metadata)}
        }
    )
    # put_item returns a dict, not an HTTP response object
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        return True
    else:
        return False


def lambda_handler(event, context):
    sqs = boto3.client('sqs')
    SQS_QUEUE_URL = os.environ['SQS_QUEUE_URL']
    
    # Receive messages from SQS queue
    response = sqs.receive_message(
        QueueUrl=SQS_QUEUE_URL,
        MaxNumberOfMessages=10,
        VisibilityTimeout=30,
        WaitTimeSeconds=10
    )

    messages = response.get('Messages', [])
    if len(messages) == 0: 
        return "No message pulled"
    else:
        for message in messages:
            # Process the message
            body = json.loads(message['Body'])
            response_media = get_recording_media_url(body["call_id"], get_user_id(body))
            if response_media:
                if download_media(response_media, f"{body['call_id']}.mp3"):
                    if write_metadata(f"{body['call_id']}.mp3"):
                        # Delete the message from the queue
                        sqs.delete_message(
                            QueueUrl=SQS_QUEUE_URL,
                            ReceiptHandle=message['ReceiptHandle']
                        )
            else:
                return "Error in fetching media URL"
    return f"{len(messages)} recordings uploaded and metadata stored"

Points to Note

  1. Remember to use the exact same REGION as the one you used when you created the application. If you are going to work with the sandbox or the interactive API, remember to change the REGION in the URL to sandbox.
  2. When you register an application, you start a 30-day trial period; once it expires, you are charged for every user that uses the Dubber portal, so discuss pricing with the Dubber team in advance.
  3. As per the GDPR, remember to store the call recording media in an EU S3 bucket if the call recordings are made inside the European region (see the sketch below).
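For the third point, here is a minimal sketch of creating an EU-resident bucket with boto3; the bucket name and region are placeholders:

import boto3

# Hypothetical bucket name and EU region; substitute your own
s3 = boto3.client('s3', region_name='eu-west-1')
s3.create_bucket(
    Bucket='my-dubber-recordings-eu',
    CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'}
)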

That’s it for this post!! See you guys in the next one.

This post is licensed under CC BY 4.0 by the author.