77from typing import Callable , List , Literal , Optional , Sequence , Union
88
99import boto3
10+ from botocore .config import Config
1011from pydantic import StrictStr
1112from tqdm import tqdm
1213
3334from feast .version import get_version
3435
3536DEFAULT_BATCH_SIZE = 10_000
37+ DEFAULT_TIMEOUT = 600
38+ LAMBDA_TIMEOUT_RETRIES = 5
3639
3740logger = logging .getLogger (__name__ )
3841
@@ -52,11 +55,16 @@ class LambdaMaterializationEngineConfig(FeastConfigBaseModel):
5255
5356@dataclass
5457class LambdaMaterializationJob (MaterializationJob ):
55- def __init__ (self , job_id : str , status : MaterializationJobStatus ) -> None :
58+ def __init__ (
59+ self ,
60+ job_id : str ,
61+ status : MaterializationJobStatus ,
62+ error : Optional [BaseException ] = None ,
63+ ) -> None :
5664 super ().__init__ ()
5765 self ._job_id : str = job_id
5866 self ._status = status
59- self ._error = None
67+ self ._error = error
6068
6169 def status (self ) -> MaterializationJobStatus :
6270 return self ._status
@@ -97,7 +105,7 @@ def update(
97105 PackageType = "Image" ,
98106 Role = self .repo_config .batch_engine .lambda_role ,
99107 Code = {"ImageUri" : self .repo_config .batch_engine .materialization_image },
100- Timeout = 600 ,
108+ Timeout = DEFAULT_TIMEOUT ,
101109 Tags = {
102110 "feast-owned" : "True" ,
103111 "project" : project ,
@@ -149,7 +157,8 @@ def __init__(
149157 self .lambda_name = f"feast-materialize-{ self .repo_config .project } "
150158 if len (self .lambda_name ) > 64 :
151159 self .lambda_name = self .lambda_name [:64 ]
152- self .lambda_client = boto3 .client ("lambda" )
160+ config = Config (read_timeout = DEFAULT_TIMEOUT + 10 )
161+ self .lambda_client = boto3 .client ("lambda" , config = config )
153162
154163 def materialize (
155164 self , registry , tasks : List [MaterializationTask ]
@@ -200,47 +209,83 @@ def _materialize_one(
200209 )
201210
202211 paths = offline_job .to_remote_storage ()
203- max_workers = len (paths ) if len (paths ) <= 20 else 20
204- executor = ThreadPoolExecutor (max_workers = max_workers )
205- futures = []
206-
207- for path in paths :
208- payload = {
209- FEATURE_STORE_YAML_ENV_NAME : self .feature_store_base64 ,
210- "view_name" : feature_view .name ,
211- "view_type" : "batch" ,
212- "path" : path ,
213- }
214- # Invoke a lambda to materialize this file.
215-
216- logger .info ("Invoking materialization for %s" , path )
217- futures .append (
218- executor .submit (
219- self .lambda_client .invoke ,
220- FunctionName = self .lambda_name ,
221- InvocationType = "RequestResponse" ,
222- Payload = json .dumps (payload ),
223- )
212+ if (num_files := len (paths )) == 0 :
213+ logger .warning ("No values to update for the given time range." )
214+ return LambdaMaterializationJob (
215+ job_id = job_id , status = MaterializationJobStatus .SUCCEEDED
224216 )
217+ else :
218+ max_workers = num_files if num_files <= 20 else 20
219+ executor = ThreadPoolExecutor (max_workers = max_workers )
220+ futures = []
221+
222+ for path in paths :
223+ payload = {
224+ FEATURE_STORE_YAML_ENV_NAME : self .feature_store_base64 ,
225+ "view_name" : feature_view .name ,
226+ "view_type" : "batch" ,
227+ "path" : path ,
228+ }
229+ # Invoke a lambda to materialize this file.
230+
231+ logger .info ("Invoking materialization for %s" , path )
232+ futures .append (
233+ executor .submit (
234+ self .invoke_with_retries ,
235+ FunctionName = self .lambda_name ,
236+ InvocationType = "RequestResponse" ,
237+ Payload = json .dumps (payload ),
238+ )
239+ )
225240
226- done , not_done = wait (futures )
227- logger .info ("Done: %s Not Done: %s" , done , not_done )
228- for f in done :
229- response = f . result ()
230- output = json . loads ( response [ "Payload" ]. read () )
241+ done , not_done = wait (futures )
242+ logger .info ("Done: %s Not Done: %s" , done , not_done )
243+ errors = []
244+ for f in done :
245+ response , payload = f . result ( )
231246
232- logger .info (
233- f"Ingested task; request id { response ['ResponseMetadata' ]['RequestId' ]} , "
234- f"Output: { output } "
235- )
247+ logger .info (
248+ f"Ingested task; request id { response ['ResponseMetadata' ]['RequestId' ]} , "
249+ f"Output: { payload } "
250+ )
251+ if "errorMessage" in payload .keys ():
252+ errors .append (payload ["errorMessage" ])
236253
237- for f in not_done :
238- response = f .result ()
239- logger .error (f"Ingestion failed: { response } " )
254+ for f in not_done :
255+ response , payload = f .result ()
256+ logger .error (f"Ingestion failed: { response = } , { payload = } " )
240257
241- return LambdaMaterializationJob (
242- job_id = job_id ,
243- status = MaterializationJobStatus .SUCCEEDED
244- if not not_done
245- else MaterializationJobStatus .ERROR ,
246- )
258+ if len (not_done ) == 0 and len (errors ) == 0 :
259+ return LambdaMaterializationJob (
260+ job_id = job_id , status = MaterializationJobStatus .SUCCEEDED
261+ )
262+ else :
263+ return LambdaMaterializationJob (
264+ job_id = job_id ,
265+ status = MaterializationJobStatus .ERROR ,
266+ error = RuntimeError (
267+ f"Lambda functions did not finish successfully: { errors } "
268+ ),
269+ )
270+
271+ def invoke_with_retries (self , ** kwargs ):
272+ """Invoke the Lambda function and retry if it times out.
273+
274+ The Lambda function may time out initially if many values are updated
275+ and DynamoDB throttles requests. As soon as the DynamoDB tables
276+ are scaled up, the Lambda function can succeed upon retry with higher
277+ throughput.
278+
279+ """
280+ retries = 0
281+ while retries < LAMBDA_TIMEOUT_RETRIES :
282+ response = self .lambda_client .invoke (** kwargs )
283+ payload = json .loads (response ["Payload" ].read ()) or {}
284+ if "Task timed out after" not in payload .get ("errorMessage" , "" ):
285+ break
286+ retries += 1
287+ logger .warning (
288+ "Retrying lambda function after lambda timeout in request"
289+ f"{ response ['ResponseMetadata' ]['RequestId' ]} "
290+ )
291+ return response , payload
0 commit comments