22
22
from helpers .update_ingestion_status import updateIngestionJobStatus
23
23
from langchain_community .embeddings import BedrockEmbeddings
24
24
from helpers .s3inmemoryloader import S3TxtFileLoaderInMemory
25
- from opensearchpy import OpenSearch , RequestsHttpConnection
25
+ from opensearchpy import RequestsHttpConnection
26
26
from langchain_community .vectorstores import OpenSearchVectorSearch
27
27
from langchain .text_splitter import RecursiveCharacterTextSplitter
28
28
import multiprocessing as mp
39
39
aws_region = boto3 .Session ().region_name
40
40
session = boto3 .session .Session ()
41
41
credentials = session .get_credentials ()
42
- sts_client = boto3 .client ("sts" )
43
42
44
- def get_bedrock_client (service_name = "bedrock-runtime" ):
45
- config = {}
46
- bedrock_config = config .get ("bedrock" , {})
47
- bedrock_enabled = bedrock_config .get ("enabled" , False )
48
- if not bedrock_enabled :
49
- print ("bedrock not enabled" )
50
- return None
51
-
52
- bedrock_config_data = {"service_name" : service_name }
53
- region_name = bedrock_config .get ("region" )
54
- endpoint_url = bedrock_config .get ("endpointUrl" )
55
- role_arn = bedrock_config .get ("roleArn" )
56
-
57
- if region_name :
58
- bedrock_config_data ["region_name" ] = region_name
59
- if endpoint_url :
60
- bedrock_config_data ["endpoint_url" ] = endpoint_url
61
-
62
- if role_arn :
63
- assumed_role_object = sts_client .assume_role (
64
- RoleArn = role_arn ,
65
- RoleSessionName = "AssumedRoleSession" ,
66
- )
67
-
68
- credentials = assumed_role_object ["Credentials" ]
69
- bedrock_config_data ["aws_access_key_id" ] = credentials ["AccessKeyId" ]
70
- bedrock_config_data ["aws_secret_access_key" ] = credentials ["SecretAccessKey" ]
71
- bedrock_config_data ["aws_session_token" ] = credentials ["SessionToken" ]
72
-
73
- return boto3 .client (** bedrock_config_data )
74
43
75
44
opensearch_secret_id = os .environ ['OPENSEARCH_SECRET_ID' ]
76
45
bucket_name = os .environ ['OUTPUT_BUCKET' ]
@@ -88,7 +57,7 @@ def get_bedrock_client(service_name="bedrock-runtime"):
88
57
INDEX_FILE = "index_file"
89
58
90
59
def process_documents_in_es (index_exists , shards , http_auth ):
91
- bedrock_client = get_bedrock_client ( )
60
+ bedrock_client = boto3 . client ( 'bedrock-runtime' )
92
61
embeddings = BedrockEmbeddings (client = bedrock_client )
93
62
94
63
if index_exists is False :
@@ -136,52 +105,32 @@ def process_documents_in_es(index_exists, shards, http_auth):
136
105
os_http_auth = http_auth )
137
106
138
107
def process_documents_in_aoss (index_exists , shards , http_auth ):
108
+ # Reference: https://python.langchain.com/docs/integrations/vectorstores/opensearch#using-aoss-amazon-opensearch-service-serverless
109
+ bedrock_client = boto3 .client ('bedrock-runtime' )
110
+ embeddings = BedrockEmbeddings (client = bedrock_client )
111
+
112
+ shard_start_index = 0
139
113
if index_exists is False :
140
- vector_db = OpenSearch (
141
- hosts = [{'host' : opensearch_domain .replace ("https://" , "" ), 'port' : 443 }],
142
- http_auth = http_auth ,
143
- use_ssl = True ,
144
- verify_certs = True ,
145
- connection_class = RequestsHttpConnection
114
+ OpenSearchVectorSearch .from_documents (
115
+ shards [0 ],
116
+ embeddings ,
117
+ opensearch_url = opensearch_domain ,
118
+ http_auth = http_auth ,
119
+ timeout = 300 ,
120
+ use_ssl = True ,
121
+ verify_certs = True ,
122
+ connection_class = RequestsHttpConnection ,
123
+ index_name = opensearch_index ,
124
+ engine = "faiss" ,
146
125
)
147
- index_body = {
148
- 'settings' : {
149
- "index.knn" : True
150
- },
151
- "mappings" : {
152
- "properties" : {
153
- "vector_field" : {
154
- "type" : "knn_vector" ,
155
- "dimension" : 1536 ,
156
- "method" : {
157
- "engine" : "nmslib" ,
158
- "space_type" : "cosinesimil" ,
159
- "name" : "hnsw" ,
160
- "parameters" : {},
161
- }
162
- },
163
- "id" : {
164
- "type" : "text" ,
165
- "fields" : {"keyword" : {"type" : "keyword" , "ignore_above" : 256 }},
166
- },
167
- }
168
- }
169
- }
170
- response = vector_db .indices .create (opensearch_index , body = index_body )
171
- print (response )
126
+ # we now need to start the loop below for the second shard
127
+ shard_start_index = 1
172
128
173
- print (f"index={ opensearch_index } Adding Documents" )
174
- bedrock_client = get_bedrock_client ()
175
- embeddings = BedrockEmbeddings (client = bedrock_client , model_id = "amazon.titan-embed-text-v1" )
176
- docsearch = OpenSearchVectorSearch (index_name = opensearch_index ,
177
- embedding_function = embeddings ,
178
- opensearch_url = opensearch_domain ,
179
- http_auth = http_auth ,
180
- use_ssl = True ,
181
- verify_certs = True ,
182
- connection_class = RequestsHttpConnection )
183
- for shard in shards :
184
- docsearch .add_documents (documents = shard )
129
+ for shard in shards [shard_start_index :]:
130
+ results = process_shard (shard = shard ,
131
+ os_index_name = opensearch_index ,
132
+ os_domain_ep = opensearch_domain ,
133
+ os_http_auth = http_auth )
185
134
186
135
@logger .inject_lambda_context (log_event = True )
187
136
@tracer .capture_lambda_handler
0 commit comments