
Commit 10f556f

Dinesh Sajwan committed
feat(merge): merge qa with data ingestion changes
2 parents f04ea6b + 6c17076

File tree

10 files changed: +545 -55 lines changed
@@ -0,0 +1,120 @@
+#
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
+# with the License. A copy of the License is located at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES
+# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions
+# and limitations under the License.
+#
+import base64
+import json
+import os
+from typing import List
+from botocore.exceptions import ClientError
+
+from aws_lambda_powertools import Logger, Tracer
+#from langchain_community.document_loaders.image import UnstructuredImageLoader
+from langchain.docstore.document import Document
+
+import boto3
+
+s3 = boto3.client('s3')
+
+logger = Logger(service="INGESTION_FILE_TRANSFORMER")
+tracer = Tracer(service="INGESTION_FILE_TRANSFORMER")
+
+@tracer.capture_method
+class image_loader():
+    """Loading logic for image documents stored in S3."""
+
+    def __init__(self, bucket: str, image_file: str, image_detail_file: str):
+        """Initialize with bucket and key names."""
+        self.bucket = bucket
+        self.image_file = image_file
+        self.image_detail_file = image_detail_file
+
+    @tracer.capture_method
+    def load(self):
+        """Load the image as a base64-encoded LangChain Document."""
+        try:
+            local_file_path = self.download_file(self.image_file)
+            print(f"file downloaded :: {local_file_path}")
+
+            with open(local_file_path, "rb") as image_file:
+                input_image = base64.b64encode(image_file.read()).decode("utf8")
+
+            s3_resource = boto3.resource('s3')
+            obj = s3_resource.Object(self.bucket, self.image_detail_file)
+            raw_text = obj.get()['Body'].read().decode('utf-8')
+
+            metadata = {"source": self.image_file}
+
+            docs = json.dumps({
+                "inputImage": input_image,
+                #"inputText": raw_text,
+            })
+            #print(f'docs for titan embeddings {docs}')
+            return [Document(page_content=docs, metadata=metadata)]
+
+        except Exception as exception:
+            logger.exception(f"Reason: {exception}")
+            return ""
+
+    @tracer.capture_method
+    def get_presigned_url(self) -> str:
+        try:
+            url = s3.generate_presigned_url(
+                ClientMethod='get_object',
+                Params={'Bucket': self.bucket, 'Key': self.image_file},
+                ExpiresIn=900
+            )
+            print(f"presigned url generated for {self.image_file} from {self.bucket}")
+            return url
+        except Exception as exception:
+            logger.exception(f"Reason: {exception}")
+            return ""
+
+    @tracer.capture_method
+    def download_file(self, key) -> str:
+        try:
+            file_path = "/tmp/" + os.path.basename(key)
+            s3.download_file(self.bucket, key, file_path)
+            print(f"file downloaded {file_path}")
+            return file_path
+        except ClientError as client_err:
+            print(f"Couldn't download file {client_err.response['Error']['Message']}")
+        except Exception as exp:
+            print(f"Couldn't download file : {exp}")
+
+    @tracer.capture_method
+    def prepare_document_for_direct_load(self) -> dict:
+        local_file_path = self.download_file(self.image_file)
+        print("prepare os_document")
+
+        with open(local_file_path, "rb") as image_file:
+            input_image = base64.b64encode(image_file.read()).decode("utf8")
+
+        s3_resource = boto3.resource('s3')
+        obj = s3_resource.Object(self.bucket, self.image_detail_file)
+        raw_text = obj.get()['Body'].read().decode('utf-8')
+
+        metadata = {"source": self.image_file}
+
+        docs = json.dumps({
+            "inputImage": input_image,
+            #"inputText": raw_text,
+        })
+
+        os_document = {
+            "image_words": raw_text,
+            "image_vector": input_image,
+        }
+        print('os_document prepared')
+        return os_document
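For readers following the loader, here is a minimal, hypothetical usage sketch (the bucket and key names are placeholders, not part of this commit; it assumes the resized image and the Rekognition text file already exist in S3):

from helpers.image_loader import image_loader

# Hypothetical keys; the '-resized.png' image and the '.txt' Rekognition output
# are produced by earlier steps of the ingestion pipeline.
loader = image_loader(
    bucket="my-ingestion-input-bucket",
    image_file="photos/dog-resized.png",
    image_detail_file="photos/dog.txt",
)

docs = loader.load()                                      # [Document] whose page_content is a JSON string holding the base64 image
url = loader.get_presigned_url()                          # 15-minute presigned GET URL for the image object
os_document = loader.prepare_document_for_direct_load()   # {"image_words": ..., "image_vector": ...}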

lambda/aws-rag-appsync-stepfn-opensearch/embeddings_job/src/helpers/opensearch_helper.py

+52-3
@@ -37,12 +37,12 @@ def check_if_index_exists(index_name: str, region: str, host: str, http_auth: Tu
     print(f"index_name={index_name}, exists={exists}")
     return exists
 
-def process_shard(shard, os_index_name, os_domain_ep, os_http_auth) -> int:
+def process_shard(shard, os_index_name, os_domain_ep, os_http_auth, model_id) -> int:
     print(f'Starting process_shard of {len(shard)} chunks.')
     bedrock_client = boto3.client('bedrock-runtime')
     embeddings = BedrockEmbeddings(
         client=bedrock_client,
-        model_id="amazon.titan-embed-text-v1")
+        model_id=model_id)
     opensearch_url = os_domain_ep if os_domain_ep.startswith("https://") else f"https://{os_domain_ep}"
     docsearch = OpenSearchVectorSearch(index_name=os_index_name,
                                        embedding_function=embeddings,
@@ -53,4 +53,53 @@ def process_shard(shard, os_index_name, os_domain_ep, os_http_auth) -> int:
                                        connection_class = RequestsHttpConnection)
     docsearch.add_documents(documents=shard)
     print(f'Shard completed')
-    return 0
+    return 0
+
+# TODO - Use this to create the index in OS directly if the langchain community process_shard
+# throws issues with images; otherwise remove this function.
+def create_index_for_image(index_name: str, region: str, host: str, http_auth: Tuple[str, str], document):
+    # Create the index; generates a warning if the index already exists
+    print('create index on os without langchain utility')
+    aos_client = OpenSearch(
+        hosts=[{'host': host.replace("https://", ""), 'port': 443}],
+        http_auth=http_auth,
+        use_ssl=True,
+        verify_certs=True,
+        connection_class=RequestsHttpConnection
+    )
+
+    # Create an index
+    if not aos_client.indices.exists(index=index_name):
+        print('connection made, creating index....')
+        aos_client.indices.create(
+            index=index_name,
+            body={
+                "settings": {
+                    "index.knn": True,
+                    "index.knn.space_type": "cosinesimil",
+                    "analysis": {
+                        "analyzer": {"default": {"type": "standard", "stopwords": "_english_"}}
+                    },
+                },
+                "mappings": {
+                    "properties": {
+                        "image_vector": {
+                            "type": "knn_vector",
+                            "dimension": len(document["image_vector"]),
+                            "store": True,
+                        },
+                        "image_path": {"type": "text", "store": True},
+                        "image_words": {"type": "text", "store": True},
+                        "celebrities": {"type": "text", "store": True},
+                    }
+                }
+            }
+        )
+    else:
+        print('index already exists, loading document....')
+
+    # Index the document
+    result = aos_client.index(
+        index=index_name, body=document
+    )
+    print(f'embeddings uploaded in os {result}')
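A hedged sketch of how this helper could be invoked; the index name, region, endpoint, credentials, and document values below are placeholders for illustration, not values from this commit:

# Placeholder values only.
document = {
    "image_words": "golden retriever, park, ball",      # Rekognition text for the image
    "image_vector": "<base64-encoded image bytes>",     # as returned by prepare_document_for_direct_load
}

create_index_for_image(
    index_name="rag-image-index",                        # assumed index name
    region="us-east-1",                                  # assumed region
    host="my-domain.us-east-1.es.amazonaws.com",         # OpenSearch endpoint (scheme is stripped inside)
    http_auth=("os_user", "os_password"),                # basic-auth tuple, as in check_if_index_exists
    document=document,
)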

lambda/aws-rag-appsync-stepfn-opensearch/embeddings_job/src/lambda.py

+74-26
@@ -18,7 +18,12 @@
 import numpy as np
 import tempfile
 from helpers.credentials_helper import get_credentials
-from helpers.opensearch_helper import check_if_index_exists, process_shard
+from helpers.csv_loader import csv_loader
+from helpers.image_loader import image_loader
+from helpers.msdoc_loader import msdoc_loader
+from helpers.html_loader import html_loader
+
+from helpers.opensearch_helper import check_if_index_exists, process_shard, create_index_for_image
 from helpers.update_ingestion_status import updateIngestionJobStatus
 from langchain_community.embeddings import BedrockEmbeddings
 from helpers.s3inmemoryloader import S3TxtFileLoaderInMemory
@@ -43,6 +48,7 @@
 
 opensearch_secret_id = os.environ['OPENSEARCH_SECRET_ID']
 bucket_name = os.environ['OUTPUT_BUCKET']
+# TODO: add input_bucket for csv|images
 opensearch_index = os.environ['OPENSEARCH_INDEX']
 opensearch_domain = os.environ['OPENSEARCH_DOMAIN_ENDPOINT']
 opensearch_api_name = os.environ['OPENSEARCH_API_NAME']
@@ -56,9 +62,10 @@
 PROCESS_COUNT=5
 INDEX_FILE="index_file"
 
-def process_documents_in_es(index_exists, shards, http_auth):
+def process_documents_in_es(index_exists, shards, http_auth, model_id):
     bedrock_client = boto3.client('bedrock-runtime')
-    embeddings = BedrockEmbeddings(client=bedrock_client)
+    embeddings = BedrockEmbeddings(client=bedrock_client, model_id=model_id)
+    print(f'Bedrock embeddings model id :: {embeddings.model_id}')
 
     if index_exists is False:
         # create an index if the create index hint file exists
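In short, the embeddings model id is no longer hard-coded; the caller threads it in. A minimal sketch of the underlying pattern (the model id below is just the previous default, shown as a placeholder):

import boto3
from langchain_community.embeddings import BedrockEmbeddings

bedrock_client = boto3.client('bedrock-runtime')
# Any Bedrock embeddings model id enabled in the account could be passed here.
embeddings = BedrockEmbeddings(client=bedrock_client, model_id="amazon.titan-embed-text-v1")
vector = embeddings.embed_query("hello world")  # list[float] embedding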
@@ -104,11 +111,13 @@ def process_documents_in_es(index_exists, shards, http_auth):
                                 os_domain_ep=opensearch_domain,
                                 os_http_auth=http_auth)
 
-def process_documents_in_aoss(index_exists, shards, http_auth):
+def process_documents_in_aoss(index_exists, shards, http_auth, model_id):
     # Reference: https://python.langchain.com/docs/integrations/vectorstores/opensearch#using-aoss-amazon-opensearch-service-serverless
     bedrock_client = boto3.client('bedrock-runtime')
-    embeddings = BedrockEmbeddings(client=bedrock_client)
-
+    embeddings = BedrockEmbeddings(client=bedrock_client, model_id=model_id)
+
+    print(f'Bedrock embeddings model id :: {embeddings.model_id}')
+
     shard_start_index = 0
     if index_exists is False:
         OpenSearchVectorSearch.from_documents(
@@ -125,18 +134,19 @@ def process_documents_in_aoss(index_exists, shards, http_auth):
         )
         # we now need to start the loop below for the second shard
         shard_start_index = 1
-
+    print('start processing shards')
     for shard in shards[shard_start_index:]:
         results = process_shard(shard=shard,
                                 os_index_name=opensearch_index,
                                 os_domain_ep=opensearch_domain,
-                                os_http_auth=http_auth)
+                                os_http_auth=http_auth,
+                                model_id=model_id)
 
 @logger.inject_lambda_context(log_event=True)
 @tracer.capture_lambda_handler
 @metrics.log_metrics(capture_cold_start_metric=True)
 def handler(event, context: LambdaContext) -> dict:
-
+    print(f'event {event}')
     # if the secret id is not provided
     # uses username password
     if opensearch_secret_id != 'NONE': # nosec
@@ -151,33 +161,61 @@ def handler(event, context: LambdaContext) -> dict:
             session_token=credentials.token
         )
     job_id = event[0]['s3_transformer_result']['Payload']['jobid']
+    modelid = event[0]['s3_transformer_result']['Payload']['modelid']
+
+    print(f'model id :: {modelid}')
 
     logger.set_correlation_id(job_id)
     metrics.add_metadata(key='correlationId', value=job_id)
     tracer.put_annotation(key="correlationId", value=job_id)
 
     files = []
     for transformed_file in event:
-        files.append({'name':transformed_file['name'], 'status':transformed_file['s3_transformer_result']['Payload']['status']})
+        files.append({'name':transformed_file['name'],
+                      'status':transformed_file['s3_transformer_result']['Payload']['status'],
+                      'imageurl':''})
     updateIngestionJobStatus({'jobid': job_id, 'files': files})
 
+
     print(f'Loading txt raw files from {bucket_name}')
 
     docs = []
+
+    # Images are stored in s3 with a presigned url; embeddings are not required.
 
     for transformed_file in event:
+        print(f"status :: {transformed_file['s3_transformer_result']['Payload']['status']}")
         if transformed_file['s3_transformer_result']['Payload']['status'] == 'File transformed':
             filename = transformed_file['s3_transformer_result']['Payload']['name']
-            loader = S3TxtFileLoaderInMemory(bucket_name, filename)
-            sub_docs = loader.load()
-            for doc in sub_docs:
-                doc.metadata['source'] = filename
-            docs.extend(sub_docs)
+            name, extension = os.path.splitext(filename)
+            print(f"the name {name} and extension {extension}")
+            # TODO: check file format; if pdf then read raw text from output bucket and update docs[],
+            # if csv|image then read file from input bucket using langchain document loader and update docs[]
+            if(extension == '.pdf'):
+                loader = S3TxtFileLoaderInMemory(bucket_name, filename)
+                sub_docs = loader.load()
+                for doc in sub_docs:
+                    doc.metadata['source'] = filename
+                docs.extend(sub_docs)
+            if(extension == '.jpg' or extension == '.jpeg' or extension == '.png'):
+                # Try adding text to the document
+                # image_detail_file is created by aws rekognition
+                img_load = image_loader(bucket_name, f"{name}-resized.png", f"{name}.txt")
+                sub_docs = img_load.load()
+                for doc in sub_docs:
+                    doc.metadata['source'] = filename
+                docs.extend(sub_docs)
+                url = img_load.get_presigned_url()
+                print(f"url set :: {url}")
+                print("prepare os object")
+                os_document = img_load.prepare_document_for_direct_load()
 
     if not docs:
-        return {
-            'status':'nothing to ingest'
-        }
+        return {
+            'status':'nothing to ingest'
+        }
 
     text_splitter = RecursiveCharacterTextSplitter(
         # Set a really small chunk size, just to show.
@@ -192,18 +230,20 @@ def handler(event, context: LambdaContext) -> dict:
     # we can augment data here probably (PII present ? ...)
     for doc in docs:
         doc.metadata['timestamp'] = time.time()
-        doc.metadata['embeddings_model'] = 'amazon.titan-embed-text-v1'
+        # doc.metadata['embeddings_model'] = 'amazon.titan-embed-text-v1'
+        doc.metadata['embeddings_model'] = modelid
     chunks = text_splitter.create_documents([doc.page_content for doc in docs], metadatas=[doc.metadata for doc in docs])
 
     db_shards = (len(chunks) // MAX_OS_DOCS_PER_PUT) + 1
-    print(f'Loading chunks into vector store ... using {db_shards} shards')
     shards = np.array_split(chunks, db_shards)
 
     # first check if index exists, if it does then call the add_documents function
     # otherwise call the from_documents function which would first create the index
     # and then do a bulk add. Both add_documents and from_documents do a bulk add
     # but it is important to call from_documents first so that the index is created
     # correctly for K-NN
+
+    print('check if index exists')
     try:
         index_exists = check_if_index_exists(opensearch_index,
                                              aws_region,
@@ -218,19 +258,27 @@ def handler(event, context: LambdaContext) -> dict:
             'status':'failed'
         }
 
-    if opensearch_api_name == "es":
-        process_documents_in_es(index_exists, shards, http_auth)
-    elif opensearch_api_name == "aoss":
-        process_documents_in_aoss(index_exists, shards, http_auth)
+    print(f'job_id :: {job_id}')
+    if(job_id == "101"):
+        print('running for job_id 101, use os directly')
+        create_index_for_image(os_document)
+    else:
+        print(f'Loading chunks into vector store ... using {db_shards} shards')
+        if opensearch_api_name == "es":
+            process_documents_in_es(index_exists, shards, http_auth, modelid)
+        elif opensearch_api_name == "aoss":
+            process_documents_in_aoss(index_exists, shards, http_auth, modelid)
 
+
 
     for file in files:
         if file['status'] == 'File transformed':
-            file['status'] = 'Ingested'
+            file['status'] = 'Ingested'
+            file['imageurl'] = url
         else:
             file['status'] = 'Error_'+file['status']
     updateIngestionJobStatus({'jobid': job_id, 'files': files})
 
     return {
         'status':'succeed'
-    }
+    }
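For reference, a hedged sketch of the input event shape the handler reads (field names inferred from the accesses above; real Step Functions payloads may carry additional fields, and the values are placeholders):

# Illustrative handler input; not an actual payload from this commit.
event = [
    {
        "name": "dog.jpg",
        "s3_transformer_result": {
            "Payload": {
                "jobid": "101",                            # "101" triggers the direct OpenSearch path above
                "modelid": "amazon.titan-embed-image-v1",  # assumed Bedrock embeddings model id
                "status": "File transformed",
                "name": "dog.jpg"
            }
        }
    }
]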
