@@ -19,7 +19,6 @@ package main
1919import (
2020 "fmt"
2121 "io/ioutil"
22- "log"
2322 "net/http"
2423 "os"
2524 "os/exec"
@@ -28,9 +27,12 @@ import (
2827 "sync"
2928 "time"
3029
30+ "github.com/google/uuid"
3131 "github.com/kelseyhightower/envconfig"
32+ "go.uber.org/zap"
3233
3334 "github.com/triggermesh/aws-custom-runtime/pkg/converter"
35+ "github.com/triggermesh/aws-custom-runtime/pkg/logger"
3436 "github.com/triggermesh/aws-custom-runtime/pkg/metrics"
3537 "github.com/triggermesh/aws-custom-runtime/pkg/sender"
3638)
@@ -62,7 +64,7 @@ type Specification struct {
6264 // Request body size limit, Mb
6365 RequestSizeLimit int64 `envconfig:"request_size_limit" default:"5"`
6466 // Funtions deadline, seconds
65- FunctionTTL int64 `envconfig:"function_ttl" default:"10 "`
67+ FunctionTTL time. Duration `envconfig:"function_ttl" default:"10s "`
6668 // Lambda runtime API port for functions
6769 InternalAPIport string `envconfig:"internal_api_port" default:"80"`
6870 // Lambda API port to put function requests and get results
@@ -77,14 +79,15 @@ type Handler struct {
7779 sender * sender.Sender
7880 converter converter.Converter
7981 reporter * metrics.EventProcessingStatsReporter
82+ logger * zap.SugaredLogger
8083
8184 requestSizeLimit int64
82- functionTTL int64
85+ functionTTL time. Duration
8386}
8487
8588type message struct {
8689 id string
87- deadline int64
90+ deadline time. Time
8891 data []byte
8992 context map [string ]string
9093 statusCode int
@@ -114,6 +117,7 @@ func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
114117 body , err := ioutil .ReadAll (http .MaxBytesReader (w , r .Body , requestSizeLimitInBytes ))
115118 if err != nil {
116119 h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
120+ h .logger .Error ("Request exceeds allowed size limit, rejecting" )
117121 http .Error (w , err .Error (), http .StatusRequestEntityTooLarge )
118122 return
119123 }
@@ -122,34 +126,37 @@ func (h *Handler) serve(w http.ResponseWriter, r *http.Request) {
122126 req , context , err := h .converter .Request (body , r .Header )
123127 if err != nil {
124128 h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
129+ h .logger .Errorf ("Cannot convert request: %v" , err )
125130 http .Error (w , err .Error (), http .StatusInternalServerError )
126131 return
127132 }
128133
129134 eventTypeTag , eventSrcTag = metrics .CETagsFromContext (context )
130135
131- result := enqueue (req , context , h .functionTTL * 1e+9 )
136+ h .logger .Debugf ("Enqueuing request: %+v, %s" , context , string (req ))
137+ result := enqueue (req , context , h .functionTTL )
138+ h .logger .Debugf ("Result: %+v, %s" , result .context , string (result .data ))
139+
132140 result .data , err = h .converter .Response (result .data )
133141 if err != nil {
134142 result .data = []byte (fmt .Sprintf ("Response conversion error: %v" , err ))
143+ h .logger .Errorf ("Cannot convert response: %v" , err )
135144 }
136145 if err := h .sender .Send (result .data , result .statusCode , w ); err != nil {
137146 h .reporter .ReportProcessingError (false , eventTypeTag , eventSrcTag )
138- log . Printf ( "! %s %s %v\n " , result . id , result . data , err )
147+ h . logger . Errorf ( "Cannot send response: %v" , err )
139148 return
140149 }
141150 h .reporter .ReportProcessingSuccess (eventTypeTag , eventSrcTag )
142151}
143152
144- func enqueue (request []byte , context map [string ]string , ttl int64 ) message {
145- now := time .Now ().UnixNano ()
153+ func enqueue (request []byte , context map [string ]string , ttl time.Duration ) message {
146154 task := message {
147- id : fmt . Sprintf ( "%d" , now ),
148- deadline : now + ttl ,
155+ id : uuid . New (). String ( ),
156+ deadline : time . Now (). Add ( ttl ) ,
149157 data : request ,
150158 context : context ,
151159 }
152- log .Printf ("<- %s\n " , task .id )
153160
154161 resultsChannel := make (chan message )
155162 mutex .Lock ()
@@ -161,7 +168,7 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
161168
162169 var resp message
163170 select {
164- case <- time .After (time . Duration ( ttl ) ):
171+ case <- time .After (ttl ):
165172 resp = message {
166173 id : task .id ,
167174 data : []byte (fmt .Sprintf ("Deadline is reached, data %s" , task .data )),
@@ -173,7 +180,6 @@ func enqueue(request []byte, context map[string]string, ttl int64) message {
173180 mutex .Lock ()
174181 delete (results , task .id )
175182 mutex .Unlock ()
176- log .Printf ("-> %s %d\n " , resp .id , resp .statusCode )
177183 return resp
178184}
179185
@@ -182,7 +188,7 @@ func getTask(w http.ResponseWriter, r *http.Request) {
182188
183189 // Dummy headers required by Rust client. Replace with something meaningful
184190 w .Header ().Set ("Lambda-Runtime-Aws-Request-Id" , task .id )
185- w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .deadline )))
191+ w .Header ().Set ("Lambda-Runtime-Deadline-Ms" , strconv .Itoa (int (task .deadline . UnixMilli () )))
186192 w .Header ().Set ("Lambda-Runtime-Invoked-Function-Arn" , "arn:aws:lambda:us-east-1:123456789012:function:custom-runtime" )
187193 w .Header ().Set ("Lambda-Runtime-Trace-Id" , "0" )
188194 for k , v := range task .context {
@@ -193,36 +199,39 @@ func getTask(w http.ResponseWriter, r *http.Request) {
193199 w .Write (task .data )
194200}
195201
196- func initError (w http.ResponseWriter , r * http.Request ) {
202+ func ( h * Handler ) initError (w http.ResponseWriter , r * http.Request ) {
197203 data , err := ioutil .ReadAll (r .Body )
198204 if err != nil {
199- log . Fatalln ( err )
205+ h . logger . Fatalf ( "Cannot read initialization error data: %v" , err )
200206 }
201207 defer r .Body .Close ()
202208
203- log . Fatalf ("Runtime initialization error: %s\n " , data )
209+ h . logger . Fatalf ("Runtime initialization error: %s" , data )
204210}
205211
206212func parsePath (query string ) (string , string , error ) {
207213 path := strings .TrimPrefix (query , awsEndpoint + "/invocation/" )
208214 request := strings .Split (path , "/" )
209215 if len (request ) != 2 {
210- return "" , "" , fmt .Errorf ("incorrect URL query size " )
216+ return "" , "" , fmt .Errorf ("incorrect URL path " )
211217 }
212218 return request [0 ], request [1 ], nil
213219}
214220
215- func responseHandler (w http.ResponseWriter , r * http.Request ) {
221+ func ( h * Handler ) responseHandler (w http.ResponseWriter , r * http.Request ) {
216222 id , kind , err := parsePath (r .URL .Path )
217223 if err != nil {
224+ h .logger .Errorf ("Runtime response error: %v" , err )
218225 w .WriteHeader (http .StatusBadRequest )
219226 w .Write ([]byte (err .Error ()))
220227 return
221228 }
222229
223230 data , err := ioutil .ReadAll (r .Body )
224231 if err != nil {
225- log .Printf ("! %s\n " , err )
232+ h .logger .Errorf ("Cannot read response data: %v" , err )
233+ w .WriteHeader (http .StatusBadRequest )
234+ w .Write ([]byte (err .Error ()))
226235 return
227236 }
228237 defer r .Body .Close ()
@@ -260,16 +269,16 @@ func ping(w http.ResponseWriter, r *http.Request) {
260269 w .Write ([]byte ("pong" ))
261270}
262271
263- func api () error {
272+ func ( h * Handler ) internalAPI () error {
264273 internalSocket , _ := os .LookupEnv ("AWS_LAMBDA_RUNTIME_API" )
265274 if internalSocket == "" {
266275 return fmt .Errorf ("AWS_LAMBDA_RUNTIME_API is not set" )
267276 }
268277
269278 apiRouter := http .NewServeMux ()
270- apiRouter .HandleFunc (awsEndpoint + "/init/error" , initError )
279+ apiRouter .HandleFunc (awsEndpoint + "/init/error" , h . initError )
271280 apiRouter .HandleFunc (awsEndpoint + "/invocation/next" , getTask )
272- apiRouter .HandleFunc (awsEndpoint + "/invocation/" , responseHandler )
281+ apiRouter .HandleFunc (awsEndpoint + "/invocation/" , h . responseHandler )
273282 apiRouter .HandleFunc ("/2018-06-01/ping" , ping )
274283
275284 err := http .ListenAndServe (internalSocket , apiRouter )
@@ -280,35 +289,37 @@ func api() error {
280289}
281290
282291func main () {
292+ logger := logger .New ()
293+
283294 // parse env
284295 var spec Specification
285296 if err := envconfig .Process ("" , & spec ); err != nil {
286- log .Fatalf ("Cannot process env variables: %v" , err )
297+ logger .Fatalf ("Cannot process env variables: %v" , err )
287298 }
288- log .Printf ("%+v\n " , spec )
289-
290- log .Println ("Setting up runtime env" )
299+ logger .Debugf ("Runtime specification: %+v" , spec )
300+ logger .Debug ("Setting up runtime env" )
291301 if err := setupEnv (spec .InternalAPIport ); err != nil {
292- log .Fatalf ("Cannot setup runime env: %v" , err )
302+ logger .Fatalf ("Cannot setup runime env: %v" , err )
293303 }
294304
295305 // create converter
296306 conv , err := converter .New (spec .ResponseFormat )
297307 if err != nil {
298- log .Fatalf ("Cannot create converter: %v" , err )
308+ logger .Fatalf ("Cannot create converter: %v" , err )
299309 }
300310
301311 // start metrics reporter
302312 mr , err := metrics .StatsExporter ()
303313 if err != nil {
304- log .Fatalf ("Cannot start stats exporter: %v" , err )
314+ logger .Fatalf ("Cannot start stats exporter: %v" , err )
305315 }
306316
307317 // setup sender
308318 handler := Handler {
309319 sender : sender .New (spec .Sink , conv .ContentType ()),
310320 converter : conv ,
311321 reporter : mr ,
322+ logger : logger ,
312323 requestSizeLimit : spec .RequestSizeLimit ,
313324 functionTTL : spec .FunctionTTL ,
314325 }
@@ -319,33 +330,33 @@ func main() {
319330 defer close (tasks )
320331
321332 // start Lambda API
322- log . Println ("Starting API" )
333+ logger . Debug ("Starting API" )
323334 go func () {
324- if err := api (); err != nil {
325- log .Fatalf ("Runtime internal API error: %v" , err )
335+ if err := handler . internalAPI (); err != nil {
336+ logger .Fatalf ("Runtime internal API error: %v" , err )
326337 }
327338 }()
328339
329340 // start invokers
330341 for i := 0 ; i < spec .NumberOfinvokers ; i ++ {
331- log . Println ("Starting bootstrap" , i + 1 )
342+ logger . Debug ("Starting bootstrap" , i + 1 )
332343 go func (i int ) {
333344 cmd := exec .Command ("sh" , "-c" , environment ["LAMBDA_TASK_ROOT" ]+ "/bootstrap" )
334345 cmd .Env = append (os .Environ (), fmt .Sprintf ("BOOTSTRAP_INDEX=%d" , i ))
335346 cmd .Stdout = os .Stdout
336347 cmd .Stderr = os .Stderr
337348 if err := cmd .Run (); err != nil {
338- log .Fatalf ("Cannot start bootstrap process: %v" , err )
349+ logger .Fatalf ("Cannot start bootstrap process: %v" , err )
339350 }
340351 }(i )
341352 }
342353
343354 // start external API
344355 taskRouter := http .NewServeMux ()
345356 taskRouter .Handle ("/" , http .HandlerFunc (handler .serve ))
346- log . Println ( "Listening... " )
357+ logger . Info ( "Runtime initialized " )
347358 err = http .ListenAndServe (":" + spec .ExternalAPIport , taskRouter )
348359 if err != nil && err != http .ErrServerClosed {
349- log .Fatalf ("Runtime external API error: %v" , err )
360+ logger .Fatalf ("Runtime external API error: %v" , err )
350361 }
351362}
0 commit comments