import json
import boto3
import os
# boto3 client for the 'kendra' service, created once at module import and
# used by query_kendra() for every query.
kendra_client = boto3.client('kendra')
def query_kendra(index_id, query_text):
    """Run a search query against a Kendra index.

    Args:
        index_id: Identifier of the index, passed to the client as ``IndexId``.
        query_text: The search text, passed to the client as ``QueryText``.

    Returns:
        The response object returned by ``kendra_client.query``, unchanged.

    Raises:
        Exception: Whatever the client call raises. The error is printed and
            then re-raised so the caller decides how to respond.
    """
    try:
        return kendra_client.query(
            IndexId=index_id,
            QueryText=query_text,
        )
    except Exception as e:
        # Report the failure but propagate it instead of swallowing it.
        print(f"Error querying Kendra: {e}")
        raise
def serialize_kendra_response(response):
    """Flatten a Kendra query response into a JSON array string.

    For every entry in ``response['ResultItems']`` the fields ``Id``,
    ``Type``, ``DocumentId`` and ``DocumentURI`` are copied as-is, and the
    ``Text`` value is pulled out of the nested ``DocumentTitle`` and
    ``DocumentExcerpt`` objects. Missing fields become ``null``.

    Args:
        response: Mapping holding an optional ``ResultItems`` list of
            result mappings.

    Returns:
        A JSON string (indented by 4 spaces) encoding the list of flattened
        results; ``"[]"`` when there are no result items.
    """
    results = [
        {
            'Id': result.get('Id'),
            'Type': result.get('Type'),
            'DocumentId': result.get('DocumentId'),
            # `or {}` covers both a missing key and an explicit None value,
            # which a plain .get(key, {}) default would not.
            'DocumentTitle': (result.get('DocumentTitle') or {}).get('Text'),
            'DocumentExcerpt': (result.get('DocumentExcerpt') or {}).get('Text'),
            'DocumentURI': result.get('DocumentURI'),
        }
        # Treat a null ResultItems the same as an absent one.
        for result in response.get('ResultItems') or []
    ]
    return json.dumps(results, indent=4)
def lambda_handler(event, context):
    """Handle an invocation by querying Kendra with ``event['query']``.

    The index id is read from the ``KENDRA_INDEX_ID`` environment variable;
    when it is not set, the original placeholder string is used, so the
    previous behaviour is unchanged for unconfigured deployments.

    Args:
        event: Invocation payload; expected to be a mapping with a
            ``query`` entry holding the search text.
        context: Invocation context object (unused).

    Returns:
        A dict with ``statusCode`` and ``body``:
        * 400 with a JSON message when the query text is missing or empty
          (including when ``event`` is not a mapping),
        * 200 with the serialized results on success,
        * 500 with a generic JSON message when querying or serializing fails.
    """
    # Configurable without editing the source; the fallback keeps the old value.
    index_id = os.environ.get('KENDRA_INDEX_ID', "REPLACE YOUR KENDRA ID")

    # A None / non-mapping event is treated as "no query" rather than crashing.
    query_text = event.get('query', '') if isinstance(event, dict) else ''
    if not query_text:
        return {
            'statusCode': 400,
            'body': json.dumps({'message': 'Missing query text'}),
        }

    try:
        kendra_response = query_kendra(index_id, query_text)
        serialized_response = serialize_kendra_response(kendra_response)
        print(serialized_response)
        return {
            'statusCode': 200,
            'body': serialized_response,
        }
    except Exception as e:
        # Top-level boundary: log the detail, return a generic message.
        print(f"Error: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Internal server error'}),
        }
# (Web-page footer captured together with the snippet; not part of the program.)
# No comments:
# Post a Comment