Thursday 4 July 2024

Check the Kendra from Local System using Boto3 and Lambda

 

import json
import boto3
import os

kendra_client = boto3.client('kendra')

def query_kendra(index_id, query_text):
    try:
        response = kendra_client.query(
            IndexId=index_id,
            QueryText=query_text
        )
        return response
    except Exception as e:
        print(f"Error querying Kendra: {str(e)}")
        raise

def serialize_kendra_response(response):
    results = []
    for result in response.get('ResultItems', []):
        item = {
            'Id': result.get('Id'),
            'Type': result.get('Type'),
            'DocumentId': result.get('DocumentId'),
            'DocumentTitle': result.get('DocumentTitle', {}).get('Text'),
            'DocumentExcerpt': result.get('DocumentExcerpt', {}).get('Text'),
            'DocumentURI': result.get('DocumentURI'),
        }
        results.append(item)
    return json.dumps(results, indent=4)

def lambda_handler(event, context):
    index_id = "REPLACE YOUR KENDRA ID"
    query_text = event.get('query', '')

    if not query_text:
        return {
            'statusCode': 400,
            'body': json.dumps({'message': 'Missing query text'})
        }

    try:
        kendra_response = query_kendra(index_id, query_text)
        serialized_response = serialize_kendra_response(kendra_response)
        print(serialized_response)
        
        return {
            'statusCode': 200,
            'body': serialized_response
        }
    except Exception as e:
        print(f"Error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Internal server error'})
        }