1 // Copyright 2011 Google Inc. All rights reserved. |
|
2 // Use of this source code is governed by the Apache 2.0 |
|
3 // license that can be found in the LICENSE file. |
|
4 |
|
5 // +build !appengine |
|
6 // +build !go1.7 |
|
7 |
|
8 package internal |
|
9 |
|
10 import ( |
|
11 "bytes" |
|
12 "errors" |
|
13 "fmt" |
|
14 "io/ioutil" |
|
15 "log" |
|
16 "net" |
|
17 "net/http" |
|
18 "net/url" |
|
19 "os" |
|
20 "runtime" |
|
21 "strconv" |
|
22 "strings" |
|
23 "sync" |
|
24 "sync/atomic" |
|
25 "time" |
|
26 |
|
27 "github.com/golang/protobuf/proto" |
|
28 netcontext "golang.org/x/net/context" |
|
29 |
|
30 basepb "google.golang.org/appengine/internal/base" |
|
31 logpb "google.golang.org/appengine/internal/log" |
|
32 remotepb "google.golang.org/appengine/internal/remote_api" |
|
33 ) |
|
34 |
|
35 const ( |
|
36 apiPath = "/rpc_http" |
|
37 defaultTicketSuffix = "/default.20150612t184001.0" |
|
38 ) |
|
39 |
|
40 var ( |
|
41 // Incoming headers. |
|
42 ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") |
|
43 dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") |
|
44 traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context") |
|
45 curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") |
|
46 userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP") |
|
47 remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") |
|
48 |
|
49 // Outgoing headers. |
|
50 apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") |
|
51 apiEndpointHeaderValue = []string{"app-engine-apis"} |
|
52 apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") |
|
53 apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"} |
|
54 apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") |
|
55 apiContentType = http.CanonicalHeaderKey("Content-Type") |
|
56 apiContentTypeValue = []string{"application/octet-stream"} |
|
57 logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") |
|
58 |
|
59 apiHTTPClient = &http.Client{ |
|
60 Transport: &http.Transport{ |
|
61 Proxy: http.ProxyFromEnvironment, |
|
62 Dial: limitDial, |
|
63 }, |
|
64 } |
|
65 |
|
66 defaultTicketOnce sync.Once |
|
67 defaultTicket string |
|
68 ) |
|
69 |
|
70 func apiURL() *url.URL { |
|
71 host, port := "appengine.googleapis.internal", "10001" |
|
72 if h := os.Getenv("API_HOST"); h != "" { |
|
73 host = h |
|
74 } |
|
75 if p := os.Getenv("API_PORT"); p != "" { |
|
76 port = p |
|
77 } |
|
78 return &url.URL{ |
|
79 Scheme: "http", |
|
80 Host: host + ":" + port, |
|
81 Path: apiPath, |
|
82 } |
|
83 } |
|
84 |
|
85 func handleHTTP(w http.ResponseWriter, r *http.Request) { |
|
86 c := &context{ |
|
87 req: r, |
|
88 outHeader: w.Header(), |
|
89 apiURL: apiURL(), |
|
90 } |
|
91 stopFlushing := make(chan int) |
|
92 |
|
93 ctxs.Lock() |
|
94 ctxs.m[r] = c |
|
95 ctxs.Unlock() |
|
96 defer func() { |
|
97 ctxs.Lock() |
|
98 delete(ctxs.m, r) |
|
99 ctxs.Unlock() |
|
100 }() |
|
101 |
|
102 // Patch up RemoteAddr so it looks reasonable. |
|
103 if addr := r.Header.Get(userIPHeader); addr != "" { |
|
104 r.RemoteAddr = addr |
|
105 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { |
|
106 r.RemoteAddr = addr |
|
107 } else { |
|
108 // Should not normally reach here, but pick a sensible default anyway. |
|
109 r.RemoteAddr = "127.0.0.1" |
|
110 } |
|
111 // The address in the headers will most likely be of these forms: |
|
112 // 123.123.123.123 |
|
113 // 2001:db8::1 |
|
114 // net/http.Request.RemoteAddr is specified to be in "IP:port" form. |
|
115 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { |
|
116 // Assume the remote address is only a host; add a default port. |
|
117 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") |
|
118 } |
|
119 |
|
120 // Start goroutine responsible for flushing app logs. |
|
121 // This is done after adding c to ctx.m (and stopped before removing it) |
|
122 // because flushing logs requires making an API call. |
|
123 go c.logFlusher(stopFlushing) |
|
124 |
|
125 executeRequestSafely(c, r) |
|
126 c.outHeader = nil // make sure header changes aren't respected any more |
|
127 |
|
128 stopFlushing <- 1 // any logging beyond this point will be dropped |
|
129 |
|
130 // Flush any pending logs asynchronously. |
|
131 c.pendingLogs.Lock() |
|
132 flushes := c.pendingLogs.flushes |
|
133 if len(c.pendingLogs.lines) > 0 { |
|
134 flushes++ |
|
135 } |
|
136 c.pendingLogs.Unlock() |
|
137 go c.flushLog(false) |
|
138 w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) |
|
139 |
|
140 // Avoid nil Write call if c.Write is never called. |
|
141 if c.outCode != 0 { |
|
142 w.WriteHeader(c.outCode) |
|
143 } |
|
144 if c.outBody != nil { |
|
145 w.Write(c.outBody) |
|
146 } |
|
147 } |
|
148 |
|
149 func executeRequestSafely(c *context, r *http.Request) { |
|
150 defer func() { |
|
151 if x := recover(); x != nil { |
|
152 logf(c, 4, "%s", renderPanic(x)) // 4 == critical |
|
153 c.outCode = 500 |
|
154 } |
|
155 }() |
|
156 |
|
157 http.DefaultServeMux.ServeHTTP(c, r) |
|
158 } |
|
159 |
|
160 func renderPanic(x interface{}) string { |
|
161 buf := make([]byte, 16<<10) // 16 KB should be plenty |
|
162 buf = buf[:runtime.Stack(buf, false)] |
|
163 |
|
164 // Remove the first few stack frames: |
|
165 // this func |
|
166 // the recover closure in the caller |
|
167 // That will root the stack trace at the site of the panic. |
|
168 const ( |
|
169 skipStart = "internal.renderPanic" |
|
170 skipFrames = 2 |
|
171 ) |
|
172 start := bytes.Index(buf, []byte(skipStart)) |
|
173 p := start |
|
174 for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { |
|
175 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 |
|
176 if p < 0 { |
|
177 break |
|
178 } |
|
179 } |
|
180 if p >= 0 { |
|
181 // buf[start:p+1] is the block to remove. |
|
182 // Copy buf[p+1:] over buf[start:] and shrink buf. |
|
183 copy(buf[start:], buf[p+1:]) |
|
184 buf = buf[:len(buf)-(p+1-start)] |
|
185 } |
|
186 |
|
187 // Add panic heading. |
|
188 head := fmt.Sprintf("panic: %v\n\n", x) |
|
189 if len(head) > len(buf) { |
|
190 // Extremely unlikely to happen. |
|
191 return head |
|
192 } |
|
193 copy(buf[len(head):], buf) |
|
194 copy(buf, head) |
|
195 |
|
196 return string(buf) |
|
197 } |
|
198 |
|
199 var ctxs = struct { |
|
200 sync.Mutex |
|
201 m map[*http.Request]*context |
|
202 bg *context // background context, lazily initialized |
|
203 // dec is used by tests to decorate the netcontext.Context returned |
|
204 // for a given request. This allows tests to add overrides (such as |
|
205 // WithAppIDOverride) to the context. The map is nil outside tests. |
|
206 dec map[*http.Request]func(netcontext.Context) netcontext.Context |
|
207 }{ |
|
208 m: make(map[*http.Request]*context), |
|
209 } |
|
210 |
|
211 // context represents the context of an in-flight HTTP request. |
|
212 // It implements the appengine.Context and http.ResponseWriter interfaces. |
|
213 type context struct { |
|
214 req *http.Request |
|
215 |
|
216 outCode int |
|
217 outHeader http.Header |
|
218 outBody []byte |
|
219 |
|
220 pendingLogs struct { |
|
221 sync.Mutex |
|
222 lines []*logpb.UserAppLogLine |
|
223 flushes int |
|
224 } |
|
225 |
|
226 apiURL *url.URL |
|
227 } |
|
228 |
|
229 var contextKey = "holds a *context" |
|
230 |
|
231 // fromContext returns the App Engine context or nil if ctx is not |
|
232 // derived from an App Engine context. |
|
233 func fromContext(ctx netcontext.Context) *context { |
|
234 c, _ := ctx.Value(&contextKey).(*context) |
|
235 return c |
|
236 } |
|
237 |
|
238 func withContext(parent netcontext.Context, c *context) netcontext.Context { |
|
239 ctx := netcontext.WithValue(parent, &contextKey, c) |
|
240 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { |
|
241 ctx = withNamespace(ctx, ns) |
|
242 } |
|
243 return ctx |
|
244 } |
|
245 |
|
246 func toContext(c *context) netcontext.Context { |
|
247 return withContext(netcontext.Background(), c) |
|
248 } |
|
249 |
|
250 func IncomingHeaders(ctx netcontext.Context) http.Header { |
|
251 if c := fromContext(ctx); c != nil { |
|
252 return c.req.Header |
|
253 } |
|
254 return nil |
|
255 } |
|
256 |
|
257 func ReqContext(req *http.Request) netcontext.Context { |
|
258 return WithContext(netcontext.Background(), req) |
|
259 } |
|
260 |
|
261 func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { |
|
262 ctxs.Lock() |
|
263 c := ctxs.m[req] |
|
264 d := ctxs.dec[req] |
|
265 ctxs.Unlock() |
|
266 |
|
267 if d != nil { |
|
268 parent = d(parent) |
|
269 } |
|
270 |
|
271 if c == nil { |
|
272 // Someone passed in an http.Request that is not in-flight. |
|
273 // We panic here rather than panicking at a later point |
|
274 // so that stack traces will be more sensible. |
|
275 log.Panic("appengine: NewContext passed an unknown http.Request") |
|
276 } |
|
277 return withContext(parent, c) |
|
278 } |
|
279 |
|
280 // DefaultTicket returns a ticket used for background context or dev_appserver. |
|
281 func DefaultTicket() string { |
|
282 defaultTicketOnce.Do(func() { |
|
283 if IsDevAppServer() { |
|
284 defaultTicket = "testapp" + defaultTicketSuffix |
|
285 return |
|
286 } |
|
287 appID := partitionlessAppID() |
|
288 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) |
|
289 majVersion := VersionID(nil) |
|
290 if i := strings.Index(majVersion, "."); i > 0 { |
|
291 majVersion = majVersion[:i] |
|
292 } |
|
293 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) |
|
294 }) |
|
295 return defaultTicket |
|
296 } |
|
297 |
|
298 func BackgroundContext() netcontext.Context { |
|
299 ctxs.Lock() |
|
300 defer ctxs.Unlock() |
|
301 |
|
302 if ctxs.bg != nil { |
|
303 return toContext(ctxs.bg) |
|
304 } |
|
305 |
|
306 // Compute background security ticket. |
|
307 ticket := DefaultTicket() |
|
308 |
|
309 ctxs.bg = &context{ |
|
310 req: &http.Request{ |
|
311 Header: http.Header{ |
|
312 ticketHeader: []string{ticket}, |
|
313 }, |
|
314 }, |
|
315 apiURL: apiURL(), |
|
316 } |
|
317 |
|
318 // TODO(dsymonds): Wire up the shutdown handler to do a final flush. |
|
319 go ctxs.bg.logFlusher(make(chan int)) |
|
320 |
|
321 return toContext(ctxs.bg) |
|
322 } |
|
323 |
|
324 // RegisterTestRequest registers the HTTP request req for testing, such that |
|
325 // any API calls are sent to the provided URL. It returns a closure to delete |
|
326 // the registration. |
|
327 // It should only be used by aetest package. |
|
328 func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { |
|
329 c := &context{ |
|
330 req: req, |
|
331 apiURL: apiURL, |
|
332 } |
|
333 ctxs.Lock() |
|
334 defer ctxs.Unlock() |
|
335 if _, ok := ctxs.m[req]; ok { |
|
336 log.Panic("req already associated with context") |
|
337 } |
|
338 if _, ok := ctxs.dec[req]; ok { |
|
339 log.Panic("req already associated with context") |
|
340 } |
|
341 if ctxs.dec == nil { |
|
342 ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context) |
|
343 } |
|
344 ctxs.m[req] = c |
|
345 ctxs.dec[req] = decorate |
|
346 |
|
347 return req, func() { |
|
348 ctxs.Lock() |
|
349 delete(ctxs.m, req) |
|
350 delete(ctxs.dec, req) |
|
351 ctxs.Unlock() |
|
352 } |
|
353 } |
|
354 |
|
355 var errTimeout = &CallError{ |
|
356 Detail: "Deadline exceeded", |
|
357 Code: int32(remotepb.RpcError_CANCELLED), |
|
358 Timeout: true, |
|
359 } |
|
360 |
|
361 func (c *context) Header() http.Header { return c.outHeader } |
|
362 |
|
363 // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status |
|
364 // codes do not permit a response body (nor response entity headers such as |
|
365 // Content-Length, Content-Type, etc). |
|
366 func bodyAllowedForStatus(status int) bool { |
|
367 switch { |
|
368 case status >= 100 && status <= 199: |
|
369 return false |
|
370 case status == 204: |
|
371 return false |
|
372 case status == 304: |
|
373 return false |
|
374 } |
|
375 return true |
|
376 } |
|
377 |
|
378 func (c *context) Write(b []byte) (int, error) { |
|
379 if c.outCode == 0 { |
|
380 c.WriteHeader(http.StatusOK) |
|
381 } |
|
382 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { |
|
383 return 0, http.ErrBodyNotAllowed |
|
384 } |
|
385 c.outBody = append(c.outBody, b...) |
|
386 return len(b), nil |
|
387 } |
|
388 |
|
389 func (c *context) WriteHeader(code int) { |
|
390 if c.outCode != 0 { |
|
391 logf(c, 3, "WriteHeader called multiple times on request.") // error level |
|
392 return |
|
393 } |
|
394 c.outCode = code |
|
395 } |
|
396 |
|
397 func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { |
|
398 hreq := &http.Request{ |
|
399 Method: "POST", |
|
400 URL: c.apiURL, |
|
401 Header: http.Header{ |
|
402 apiEndpointHeader: apiEndpointHeaderValue, |
|
403 apiMethodHeader: apiMethodHeaderValue, |
|
404 apiContentType: apiContentTypeValue, |
|
405 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, |
|
406 }, |
|
407 Body: ioutil.NopCloser(bytes.NewReader(body)), |
|
408 ContentLength: int64(len(body)), |
|
409 Host: c.apiURL.Host, |
|
410 } |
|
411 if info := c.req.Header.Get(dapperHeader); info != "" { |
|
412 hreq.Header.Set(dapperHeader, info) |
|
413 } |
|
414 if info := c.req.Header.Get(traceHeader); info != "" { |
|
415 hreq.Header.Set(traceHeader, info) |
|
416 } |
|
417 |
|
418 tr := apiHTTPClient.Transport.(*http.Transport) |
|
419 |
|
420 var timedOut int32 // atomic; set to 1 if timed out |
|
421 t := time.AfterFunc(timeout, func() { |
|
422 atomic.StoreInt32(&timedOut, 1) |
|
423 tr.CancelRequest(hreq) |
|
424 }) |
|
425 defer t.Stop() |
|
426 defer func() { |
|
427 // Check if timeout was exceeded. |
|
428 if atomic.LoadInt32(&timedOut) != 0 { |
|
429 err = errTimeout |
|
430 } |
|
431 }() |
|
432 |
|
433 hresp, err := apiHTTPClient.Do(hreq) |
|
434 if err != nil { |
|
435 return nil, &CallError{ |
|
436 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), |
|
437 Code: int32(remotepb.RpcError_UNKNOWN), |
|
438 } |
|
439 } |
|
440 defer hresp.Body.Close() |
|
441 hrespBody, err := ioutil.ReadAll(hresp.Body) |
|
442 if hresp.StatusCode != 200 { |
|
443 return nil, &CallError{ |
|
444 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), |
|
445 Code: int32(remotepb.RpcError_UNKNOWN), |
|
446 } |
|
447 } |
|
448 if err != nil { |
|
449 return nil, &CallError{ |
|
450 Detail: fmt.Sprintf("service bridge response bad: %v", err), |
|
451 Code: int32(remotepb.RpcError_UNKNOWN), |
|
452 } |
|
453 } |
|
454 return hrespBody, nil |
|
455 } |
|
456 |
|
457 func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { |
|
458 if ns := NamespaceFromContext(ctx); ns != "" { |
|
459 if fn, ok := NamespaceMods[service]; ok { |
|
460 fn(in, ns) |
|
461 } |
|
462 } |
|
463 |
|
464 if f, ctx, ok := callOverrideFromContext(ctx); ok { |
|
465 return f(ctx, service, method, in, out) |
|
466 } |
|
467 |
|
468 // Handle already-done contexts quickly. |
|
469 select { |
|
470 case <-ctx.Done(): |
|
471 return ctx.Err() |
|
472 default: |
|
473 } |
|
474 |
|
475 c := fromContext(ctx) |
|
476 if c == nil { |
|
477 // Give a good error message rather than a panic lower down. |
|
478 return errNotAppEngineContext |
|
479 } |
|
480 |
|
481 // Apply transaction modifications if we're in a transaction. |
|
482 if t := transactionFromContext(ctx); t != nil { |
|
483 if t.finished { |
|
484 return errors.New("transaction context has expired") |
|
485 } |
|
486 applyTransaction(in, &t.transaction) |
|
487 } |
|
488 |
|
489 // Default RPC timeout is 60s. |
|
490 timeout := 60 * time.Second |
|
491 if deadline, ok := ctx.Deadline(); ok { |
|
492 timeout = deadline.Sub(time.Now()) |
|
493 } |
|
494 |
|
495 data, err := proto.Marshal(in) |
|
496 if err != nil { |
|
497 return err |
|
498 } |
|
499 |
|
500 ticket := c.req.Header.Get(ticketHeader) |
|
501 // Use a test ticket under test environment. |
|
502 if ticket == "" { |
|
503 if appid := ctx.Value(&appIDOverrideKey); appid != nil { |
|
504 ticket = appid.(string) + defaultTicketSuffix |
|
505 } |
|
506 } |
|
507 // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. |
|
508 if ticket == "" { |
|
509 ticket = DefaultTicket() |
|
510 } |
|
511 req := &remotepb.Request{ |
|
512 ServiceName: &service, |
|
513 Method: &method, |
|
514 Request: data, |
|
515 RequestId: &ticket, |
|
516 } |
|
517 hreqBody, err := proto.Marshal(req) |
|
518 if err != nil { |
|
519 return err |
|
520 } |
|
521 |
|
522 hrespBody, err := c.post(hreqBody, timeout) |
|
523 if err != nil { |
|
524 return err |
|
525 } |
|
526 |
|
527 res := &remotepb.Response{} |
|
528 if err := proto.Unmarshal(hrespBody, res); err != nil { |
|
529 return err |
|
530 } |
|
531 if res.RpcError != nil { |
|
532 ce := &CallError{ |
|
533 Detail: res.RpcError.GetDetail(), |
|
534 Code: *res.RpcError.Code, |
|
535 } |
|
536 switch remotepb.RpcError_ErrorCode(ce.Code) { |
|
537 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: |
|
538 ce.Timeout = true |
|
539 } |
|
540 return ce |
|
541 } |
|
542 if res.ApplicationError != nil { |
|
543 return &APIError{ |
|
544 Service: *req.ServiceName, |
|
545 Detail: res.ApplicationError.GetDetail(), |
|
546 Code: *res.ApplicationError.Code, |
|
547 } |
|
548 } |
|
549 if res.Exception != nil || res.JavaException != nil { |
|
550 // This shouldn't happen, but let's be defensive. |
|
551 return &CallError{ |
|
552 Detail: "service bridge returned exception", |
|
553 Code: int32(remotepb.RpcError_UNKNOWN), |
|
554 } |
|
555 } |
|
556 return proto.Unmarshal(res.Response, out) |
|
557 } |
|
558 |
|
559 func (c *context) Request() *http.Request { |
|
560 return c.req |
|
561 } |
|
562 |
|
563 func (c *context) addLogLine(ll *logpb.UserAppLogLine) { |
|
564 // Truncate long log lines. |
|
565 // TODO(dsymonds): Check if this is still necessary. |
|
566 const lim = 8 << 10 |
|
567 if len(*ll.Message) > lim { |
|
568 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) |
|
569 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) |
|
570 } |
|
571 |
|
572 c.pendingLogs.Lock() |
|
573 c.pendingLogs.lines = append(c.pendingLogs.lines, ll) |
|
574 c.pendingLogs.Unlock() |
|
575 } |
|
576 |
|
577 var logLevelName = map[int64]string{ |
|
578 0: "DEBUG", |
|
579 1: "INFO", |
|
580 2: "WARNING", |
|
581 3: "ERROR", |
|
582 4: "CRITICAL", |
|
583 } |
|
584 |
|
585 func logf(c *context, level int64, format string, args ...interface{}) { |
|
586 if c == nil { |
|
587 panic("not an App Engine context") |
|
588 } |
|
589 s := fmt.Sprintf(format, args...) |
|
590 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. |
|
591 c.addLogLine(&logpb.UserAppLogLine{ |
|
592 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), |
|
593 Level: &level, |
|
594 Message: &s, |
|
595 }) |
|
596 log.Print(logLevelName[level] + ": " + s) |
|
597 } |
|
598 |
|
599 // flushLog attempts to flush any pending logs to the appserver. |
|
600 // It should not be called concurrently. |
|
601 func (c *context) flushLog(force bool) (flushed bool) { |
|
602 c.pendingLogs.Lock() |
|
603 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. |
|
604 n, rem := 0, 30<<20 |
|
605 for ; n < len(c.pendingLogs.lines); n++ { |
|
606 ll := c.pendingLogs.lines[n] |
|
607 // Each log line will require about 3 bytes of overhead. |
|
608 nb := proto.Size(ll) + 3 |
|
609 if nb > rem { |
|
610 break |
|
611 } |
|
612 rem -= nb |
|
613 } |
|
614 lines := c.pendingLogs.lines[:n] |
|
615 c.pendingLogs.lines = c.pendingLogs.lines[n:] |
|
616 c.pendingLogs.Unlock() |
|
617 |
|
618 if len(lines) == 0 && !force { |
|
619 // Nothing to flush. |
|
620 return false |
|
621 } |
|
622 |
|
623 rescueLogs := false |
|
624 defer func() { |
|
625 if rescueLogs { |
|
626 c.pendingLogs.Lock() |
|
627 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) |
|
628 c.pendingLogs.Unlock() |
|
629 } |
|
630 }() |
|
631 |
|
632 buf, err := proto.Marshal(&logpb.UserAppLogGroup{ |
|
633 LogLine: lines, |
|
634 }) |
|
635 if err != nil { |
|
636 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) |
|
637 rescueLogs = true |
|
638 return false |
|
639 } |
|
640 |
|
641 req := &logpb.FlushRequest{ |
|
642 Logs: buf, |
|
643 } |
|
644 res := &basepb.VoidProto{} |
|
645 c.pendingLogs.Lock() |
|
646 c.pendingLogs.flushes++ |
|
647 c.pendingLogs.Unlock() |
|
648 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { |
|
649 log.Printf("internal.flushLog: Flush RPC: %v", err) |
|
650 rescueLogs = true |
|
651 return false |
|
652 } |
|
653 return true |
|
654 } |
|
655 |
|
656 const ( |
|
657 // Log flushing parameters. |
|
658 flushInterval = 1 * time.Second |
|
659 forceFlushInterval = 60 * time.Second |
|
660 ) |
|
661 |
|
662 func (c *context) logFlusher(stop <-chan int) { |
|
663 lastFlush := time.Now() |
|
664 tick := time.NewTicker(flushInterval) |
|
665 for { |
|
666 select { |
|
667 case <-stop: |
|
668 // Request finished. |
|
669 tick.Stop() |
|
670 return |
|
671 case <-tick.C: |
|
672 force := time.Now().Sub(lastFlush) > forceFlushInterval |
|
673 if c.flushLog(force) { |
|
674 lastFlush = time.Now() |
|
675 } |
|
676 } |
|
677 } |
|
678 } |
|
679 |
|
680 func ContextForTesting(req *http.Request) netcontext.Context { |
|
681 return toContext(&context{req: req}) |
|
682 } |
|