@@ -4,22 +4,32 @@ package main
44
55import (
66 "context"
7+ "fmt"
8+ "net"
79 "os"
10+ "os/signal"
811 "runtime/debug"
912 "strconv"
1013 "strings"
14+ "sync"
1115
1216 "github.com/aws/aws-sdk-go-v2/config"
1317 "github.com/localstack/lambda-runtime-init/internal/aws/xray"
1418 "github.com/localstack/lambda-runtime-init/internal/bootstrap"
19+ "github.com/localstack/lambda-runtime-init/internal/events"
1520 "github.com/localstack/lambda-runtime-init/internal/hotreloading"
1621 "github.com/localstack/lambda-runtime-init/internal/logging"
1722 "github.com/localstack/lambda-runtime-init/internal/server"
23+
24+ "github.com/localstack/lambda-runtime-init/internal/supervisor"
1825 "github.com/localstack/lambda-runtime-init/internal/tracing"
1926 "github.com/localstack/lambda-runtime-init/internal/utils"
2027 log "github.com/sirupsen/logrus"
2128 "go.amzn.com/lambda/core/directinvoke"
29+ "go.amzn.com/lambda/interop"
2230 "go.amzn.com/lambda/rapidcore"
31+ supv "go.amzn.com/lambda/supervisor"
32+ "go.amzn.com/lambda/telemetry"
2333)
2434
2535func InitLsOpts () * server.LsOpts {
@@ -97,6 +107,7 @@ func main() {
97107 lsOpts := InitLsOpts ()
98108 functionConf := InitFunctionConfig ()
99109 awsEnvConf , _ := config .NewEnvConfig ()
110+ awsEnvConf .Credentials .AccountID = lsOpts .AccountId
100111
101112 UnsetLsEnvs ()
102113
@@ -136,6 +147,10 @@ func main() {
136147 log .Panicln ("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE" )
137148 }
138149 directinvoke .MaxDirectResponseSize = int64 (payloadSize )
150+ if directinvoke .MaxDirectResponseSize > interop .MaxPayloadSize {
151+ log .Infof ("Large response size detected (%d bytes), forcing streaming mode" , directinvoke .MaxDirectResponseSize )
152+ directinvoke .InvokeResponseMode = interop .InvokeResponseModeStreaming
153+ }
139154
140155 // download code archive if env variable is set
141156 if err := utils .DownloadCodeArchives (lsOpts .CodeArchives ); err != nil {
@@ -166,13 +181,26 @@ func main() {
166181 }
167182 }
168183
184+ ctx , stop := signal .NotifyContext (context .Background (), os .Interrupt )
185+ defer stop ()
186+
169187 // file watcher for hot-reloading
170- fileWatcherContext , cancelFileWatcher := context .WithCancel (context .Background ())
188+ fileWatcherContext , cancelFileWatcher := context .WithCancel (ctx )
189+ defer cancelFileWatcher ()
171190
191+ // Custom Interop Server
192+ defaultServer := rapidcore .NewServer ()
193+ lsAdapter := server .NewLocalStackAdapter (lsOpts .RuntimeEndpoint , lsOpts .RuntimeId )
194+ interopServer := server .NewInteropServer (defaultServer , lsAdapter )
195+
196+ // Services required for Sandbox environment
172197 logCollector := logging .NewLogCollector ()
173198 localStackLogsEgressApi := logging .NewLocalStackLogsEgressAPI (logCollector )
174199 tracer := tracing .NewLocalStackTracer ()
175- // localSupervisor := supervisor.NewLocalSupervisor()
200+ eventsListener := events .NewEventsListener (lsAdapter , & telemetry.NoOpEventsAPI {})
201+
202+ defaultSupv := supv .NewLocalSupervisor ()
203+ wrappedSupv := supervisor .NewLocalStackSupervisor (ctx , defaultSupv , eventsListener , interopServer .InternalState )
176204
177205 // build sandbox
178206 sandbox := rapidcore .
@@ -185,22 +213,26 @@ func main() {
185213 SetExtensionsFlag (true ).
186214 SetInitCachingFlag (true ).
187215 SetLogsEgressAPI (localStackLogsEgressApi ).
188- SetTracer (tracer )
216+ SetTracer (tracer ).
217+ SetInteropServer (interopServer ).
218+ SetSupervisor (wrappedSupv )
219+
220+ // SetEventsAPI(eventsListener)
189221
190222 // Externally set supervisor for metrics tracking
191223 // sandbox.SetSupervisor(localSupervisor)
192224 // sandbox.SetRuntimeFsRootPath(localSupervisor.RootPath)
193225
194226 // xray daemon
195- endpoint := "http://" + lsOpts .LocalstackIP + ":" + lsOpts .EdgePort
227+ endpoint := "http://" + net . JoinHostPort ( lsOpts .LocalstackIP , lsOpts .EdgePort )
196228 xrayConfig := xray .NewConfig (endpoint , xRayLogLevel )
197229 d := xray .NewDaemon (xrayConfig , lsOpts .EnableXRayTelemetry == "1" )
198- sandbox . AddShutdownFunc ( func () {
230+ defer func () {
199231 log .Debugln ("Shutting down xray daemon" )
200232 d .Stop ()
201233 log .Debugln ("Flushing segments in xray daemon" )
202234 d .Close ()
203- })
235+ }( )
204236 d .Run () // async
205237
206238 if len (handler ) > 0 {
@@ -214,37 +246,63 @@ func main() {
214246 // initialize all flows and start runtime API
215247 sandboxContext , internalStateFn := sandbox .Create ()
216248 // Populate our interop server
217- sandbox . DefaultInteropServer () .SetSandboxContext (sandboxContext )
218- sandbox . DefaultInteropServer () .SetInternalStateGetter (internalStateFn )
249+ interopServer .SetSandboxContext (sandboxContext )
250+ interopServer .SetInternalStateGetter (internalStateFn )
219251
220- localStackService := server .NewLocalStackAPI ( sandbox . LambdaInvokeAPI (), bootstrap , logCollector , xrayConfig .Endpoint , lsOpts , functionConf , awsEnvConf )
252+ localStackService := server .NewLocalStackService ( interopServer , logCollector , lsAdapter , xrayConfig .Endpoint , lsOpts , functionConf , awsEnvConf )
221253
222254 // start runtime init. It is important to start `InitHandler` synchronously because we need to ensure the
223255 // notification channels and status fields are properly initialized before `AwaitInitialized`
224256 log .Debugln ("Starting runtime init." )
225- localStackService .Initialize ()
257+ if err := localStackService .Initialize (bootstrap ); err != nil {
258+ log .Fatalf ("Failed to initialize runtime: %s" , err )
259+ return
260+ }
226261
227262 invokeServer := server .NewServer (lsOpts .InteropPort , localStackService )
228263 invokeServer .RegisterOnShutdown (localStackService .Close )
229264
230265 defer invokeServer .Shutdown (context .Background ())
231266
232- go invokeServer .ListenAndServe ()
233- go hotreloading .RunHotReloadingListener (sandbox .DefaultInteropServer (), lsOpts .HotReloadingPaths , fileWatcherContext , lsOpts .FileWatcherStrategy )
267+ var wg sync.WaitGroup
268+
269+ wg .Add (1 )
270+ go func () {
271+ defer wg .Done ()
272+ listener , err := net .Listen ("tcp" , fmt .Sprintf (":%s" , lsOpts .InteropPort ))
273+
274+ if err != nil {
275+ log .Fatalf ("failed to start listener for custom interops server: %s" , err )
276+ }
277+ go invokeServer .Serve (listener )
278+ log .Debugf ("LocalStack API gateway listening on %s" , listener .Addr ().String ())
279+ }()
280+
281+ wg .Add (1 )
282+ go func () {
283+ defer wg .Done ()
284+ hotreloading .RunHotReloadingListener (interopServer , lsOpts .HotReloadingPaths , fileWatcherContext , lsOpts .FileWatcherStrategy )
285+ }()
286+
287+ wg .Wait ()
234288
235289 log .Debugln ("Awaiting initialization of runtime init." )
236- if err := sandbox . DefaultInteropServer () .AwaitInitialized (); err != nil {
290+ if err := interopServer .AwaitInitialized (); err != nil {
237291 // Error cases: ErrInitDoneFailed or ErrInitResetReceived
238292 log .Errorln ("Runtime init failed to initialize: " + err .Error () + ". Exiting." )
239293 // NOTE: Sending the error status to LocalStack is handled beforehand in the custom_interop.go through the
240294 // callback SendInitErrorResponse because it contains the correct error response payload.
241- return
295+ // return
296+ } else {
297+ log .Debugln ("Completed initialization of runtime init. Sending status ready to LocalStack." )
298+ if err := localStackService .SendStatus (server .Ready , []byte {}); err != nil {
299+ log .Fatalln ("Failed to send status ready to LocalStack " + err .Error () + ". Exiting." )
300+ }
242301 }
243302
244- log . Debugln ( "Completed initialization of runtime init. Sending status ready to LocalStack." )
245- if err := localStackService . SendStatus ( server . Ready , [] byte {}); err != nil {
246- log . Fatalln ( "Failed to send status ready to LocalStack " + err . Error () + ". Exiting." )
303+ select {
304+ case <- ctx . Done ():
305+ case <- exitChan :
247306 }
248307
249- <- exitChan
250308}
0 commit comments