@@ -14,7 +14,6 @@ import (
1414 "github.com/sirupsen/logrus"
1515 log "github.com/sirupsen/logrus"
1616 "go.amzn.com/lambda/core/directinvoke"
17- "go.amzn.com/lambda/fatalerror"
1817 "go.amzn.com/lambda/interop"
1918 "go.amzn.com/lambda/metering"
2019 "go.amzn.com/lambda/rapi/model"
@@ -60,13 +59,14 @@ func NewInteropServer(server *rapidcore.Server, ls *LocalStackAdapter) *CustomIn
6059 return & CustomInteropServer {
6160 Server : server ,
6261 localStackAdapter : ls ,
62+ mutex : & sync.Mutex {},
6363 }
6464}
6565
6666type CustomInteropServer struct {
6767 * rapidcore.Server
6868 localStackAdapter * LocalStackAdapter
69- initOnce sync.Once
69+ mutex * sync.Mutex
7070}
7171
7272func (c * CustomInteropServer ) Invoke (responseWriter http.ResponseWriter , invoke * interop.Invoke ) error {
@@ -77,69 +77,78 @@ func (c *CustomInteropServer) Invoke(responseWriter http.ResponseWriter, invoke
7777 defer close (releaseRespChan )
7878
7979 go func () {
80- _ , err := c .Server .Reserve (invoke .ID , invoke .TraceID , invoke .LambdaSegmentID )
81- if err != nil && ! errors . Is ( err , rapidcore . ErrAlreadyReserved ) {
80+ reserveResp , err := c .Server .Reserve (invoke .ID , invoke .TraceID , invoke .LambdaSegmentID )
81+ if err != nil {
8282 releaseRespChan <- err
8383 return
8484 }
8585
86- invoke .DeadlineNs = fmt .Sprintf ("%d" , metering .Monotime ()+ c .Server .GetInvokeTimeout ().Nanoseconds ())
86+ invoke .DeadlineNs = fmt .Sprintf ("%d" , metering .Monotime ()+ reserveResp .Token .FunctionTimeout .Nanoseconds ())
87+
88+ // Wait for initialization to complete
89+ if err := c .Server .AwaitInitialized (); err != nil {
90+ switch err {
91+ case rapidcore .ErrInitDoneFailed :
92+ // Init failed, reset and continue with suppressed init
93+ if _ , resetErr := c .Server .Reset ("InitFailed" , 2000 ); resetErr != nil {
94+ log .Errorf ("Reset failed: %v" , resetErr )
95+ }
96+ // Reserve again after reset for suppressed init
97+ if _ , reserveErr := c .Server .Reserve (invoke .ID , invoke .TraceID , invoke .LambdaSegmentID ); reserveErr != nil {
98+ releaseRespChan <- reserveErr
99+ return
100+ }
101+ default :
102+ releaseRespChan <- err
103+ return
104+ }
105+ }
106+
87107 go func () {
88108 isDirect := directinvoke .MaxDirectResponseSize > interop .MaxPayloadSize
89109 if err := c .Server .FastInvoke (responseWriter , invoke , isDirect ); err != nil {
90110 log .Debugf ("FastInvoke() error: %s" , err )
91111 }
92112 }()
93113
94- // Waits for an invocation to finish before calling Release()
95114 _ , err = c .Server .AwaitRelease ()
96115 if err != nil {
97- log .Debugf ("AwaitRelease() error: %s" , err )
98116 switch err {
99117 case rapidcore .ErrReleaseReservationDone :
100- // not an error, expected return value when Reset is called
118+ // Expected when Reset is called
119+ releaseRespChan <- nil
101120 return
102121 case rapidcore .ErrInitDoneFailed , rapidcore .ErrInvokeDoneFailed :
103- c .Server .Reset ("ReleaseFail" , 2000 )
122+ if _ , resetErr := c .Server .Reset ("ReleaseFail" , 2000 ); resetErr != nil {
123+ log .Errorf ("Reset failed: %v" , resetErr )
124+ }
125+ releaseRespChan <- err
126+ return
127+ default :
128+ if _ , resetErr := c .Server .Reset ("UnexpectedError" , 2000 ); resetErr != nil {
129+ log .Errorf ("Reset failed: %v" , resetErr )
130+ }
131+ releaseRespChan <- err
132+ return
104133 }
105134 }
106- releaseRespChan <- err
135+
136+ releaseRespChan <- nil
107137 }()
108138
109139 select {
110140 case err := <- releaseRespChan :
111- if err != nil {
112- return err
113- }
141+ return err
114142 case <- ctx .Done ():
115143 if errors .Is (ctx .Err (), context .DeadlineExceeded ) {
116- c .Server .Reset ("Timeout" , 2000 )
144+ if _ , resetErr := c .Server .Reset ("Timeout" , 2000 ); resetErr != nil {
145+ log .Errorf ("Reset failed: %v" , resetErr )
146+ }
117147 return rapidcore .ErrInvokeTimeout
118148 }
119149 }
120150
121- return c .Server .Release ()
122- }
123-
124- func (c * CustomInteropServer ) AwaitInitialized () error {
125- err := c .Server .AwaitInitialized ()
126- go func () {
127- state , stateErr := c .InternalState ()
128- if stateErr != nil {
129- return
130- }
131-
132- doneResponse := & interop.Done {}
133- if state .FirstFatalError != "" {
134- doneResponse .ErrorType = fatalerror .GetValidRuntimeOrFunctionErrorType (state .FirstFatalError )
135- }
136-
137- c .InitDoneChan <- rapidcore.DoneWithState {
138- Done : doneResponse ,
139- State : c .InternalStateGetter (),
140- }
141- }()
142- return err
151+ return nil
143152}
144153
145154func (c * CustomInteropServer ) SendInitErrorResponse (resp * interop.ErrorInvokeResponse ) error {
0 commit comments