This function will read an array of questions and run them against a specified capability (Kendra, OpenSearch, Q), return the response received, the duration of the request, and any token counts, and write them to a DynamoDB database.
Here is an example implementation using AWS SDK for Python (Boto3):
Prerequisites
- AWS CLI: Ensure AWS CLI is installed and configured.
- Boto3: Install Boto3 if not already installed using
pip install boto3
. - AWS Lambda Execution Role: Create an IAM role with the necessary permissions for Lambda, DynamoDB, and the services you're using (Kendra, OpenSearch, Q).
Lambda Function Code
import json
import time
import boto3
from boto3.dynamodb.conditions import Key
# Initialize clients
dynamodb = boto3.resource('dynamodb')
kendra = boto3.client('kendra')
opensearch = boto3.client('opensearch')
# DynamoDB table name
TABLE_NAME = 'your_dynamodb_table'
def query_kendra(question):
# Replace with your Kendra index ID
index_id = 'your_kendra_index_id'
response = kendra.query(
IndexId=index_id,
QueryText=question
)
return response
def query_opensearch(question):
# Replace with your OpenSearch endpoint
endpoint = 'your_opensearch_endpoint'
response = opensearch.search(
body={
"query": {
"match": {
"content": question
}
}
},
index="your_index"
)
return response
def query_q(question):
# Implement the query logic for Q
response = {"answer": "Response from Q"}
return response
def lambda_handler(event, context):
questions = event['questions']
capability = event['capability']
results = []
for question in questions:
start_time = time.time()
if capability == 'kendra':
response = query_kendra(question)
elif capability == 'opensearch':
response = query_opensearch(question)
elif capability == 'q':
response = query_q(question)
else:
response = {"error": "Invalid capability"}
duration = time.time() - start_time
# Optional: Get token count if applicable
token_count = len(question.split()) # Example token count
result = {
'question': question,
'response': response,
'duration': duration,
'token_count': token_count
}
# Write result to DynamoDB
table = dynamodb.Table(TABLE_NAME)
table.put_item(Item=result)
results.append(result)
return {
'statusCode': 200,
'body': json.dumps(results)
}Instructions
Create DynamoDB Table:
- Create a DynamoDB table with a suitable primary key structure to store the results.
Deploy Lambda Function:
- Deploy this Lambda function through the AWS Management Console or using AWS CLI.
Test Event:
- Create a test event in AWS Lambda with the following structure:
json
{
"questions": ["What is AI?", "Explain machine learning."],
"capability": "kendra"
}- Adjust the
capability
field to test different capabilities (kendra
,opensearch
,q
)- Permissions:
- Ensure the Lambda function has permissions to access DynamoDB, Kendra, OpenSearch, and any other necessary AWS resources.
By following these steps, you will have a Lambda function that reads questions, queries different services, measures the response time, counts tokens, and writes the results to DynamoDB.