vendor/google.golang.org/appengine/internal/api_pre17.go
changeset 251 1c52a0eeb952
parent 250 c040f992052f
child 252 8399cd48111b
equal deleted inserted replaced
250:c040f992052f 251:1c52a0eeb952
     1 // Copyright 2011 Google Inc. All rights reserved.
       
     2 // Use of this source code is governed by the Apache 2.0
       
     3 // license that can be found in the LICENSE file.
       
     4 
       
     5 // +build !appengine
       
     6 // +build !go1.7
       
     7 
       
     8 package internal
       
     9 
       
    10 import (
       
    11 	"bytes"
       
    12 	"errors"
       
    13 	"fmt"
       
    14 	"io/ioutil"
       
    15 	"log"
       
    16 	"net"
       
    17 	"net/http"
       
    18 	"net/url"
       
    19 	"os"
       
    20 	"runtime"
       
    21 	"strconv"
       
    22 	"strings"
       
    23 	"sync"
       
    24 	"sync/atomic"
       
    25 	"time"
       
    26 
       
    27 	"github.com/golang/protobuf/proto"
       
    28 	netcontext "golang.org/x/net/context"
       
    29 
       
    30 	basepb "google.golang.org/appengine/internal/base"
       
    31 	logpb "google.golang.org/appengine/internal/log"
       
    32 	remotepb "google.golang.org/appengine/internal/remote_api"
       
    33 )
       
    34 
       
    35 const (
       
    36 	apiPath             = "/rpc_http"
       
    37 	defaultTicketSuffix = "/default.20150612t184001.0"
       
    38 )
       
    39 
       
    40 var (
       
    41 	// Incoming headers.
       
    42 	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
       
    43 	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
       
    44 	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
       
    45 	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
       
    46 	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
       
    47 	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
       
    48 
       
    49 	// Outgoing headers.
       
    50 	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
       
    51 	apiEndpointHeaderValue = []string{"app-engine-apis"}
       
    52 	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
       
    53 	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
       
    54 	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
       
    55 	apiContentType         = http.CanonicalHeaderKey("Content-Type")
       
    56 	apiContentTypeValue    = []string{"application/octet-stream"}
       
    57 	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
       
    58 
       
    59 	apiHTTPClient = &http.Client{
       
    60 		Transport: &http.Transport{
       
    61 			Proxy: http.ProxyFromEnvironment,
       
    62 			Dial:  limitDial,
       
    63 		},
       
    64 	}
       
    65 
       
    66 	defaultTicketOnce sync.Once
       
    67 	defaultTicket     string
       
    68 )
       
    69 
       
    70 func apiURL() *url.URL {
       
    71 	host, port := "appengine.googleapis.internal", "10001"
       
    72 	if h := os.Getenv("API_HOST"); h != "" {
       
    73 		host = h
       
    74 	}
       
    75 	if p := os.Getenv("API_PORT"); p != "" {
       
    76 		port = p
       
    77 	}
       
    78 	return &url.URL{
       
    79 		Scheme: "http",
       
    80 		Host:   host + ":" + port,
       
    81 		Path:   apiPath,
       
    82 	}
       
    83 }
       
    84 
       
    85 func handleHTTP(w http.ResponseWriter, r *http.Request) {
       
    86 	c := &context{
       
    87 		req:       r,
       
    88 		outHeader: w.Header(),
       
    89 		apiURL:    apiURL(),
       
    90 	}
       
    91 	stopFlushing := make(chan int)
       
    92 
       
    93 	ctxs.Lock()
       
    94 	ctxs.m[r] = c
       
    95 	ctxs.Unlock()
       
    96 	defer func() {
       
    97 		ctxs.Lock()
       
    98 		delete(ctxs.m, r)
       
    99 		ctxs.Unlock()
       
   100 	}()
       
   101 
       
   102 	// Patch up RemoteAddr so it looks reasonable.
       
   103 	if addr := r.Header.Get(userIPHeader); addr != "" {
       
   104 		r.RemoteAddr = addr
       
   105 	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
       
   106 		r.RemoteAddr = addr
       
   107 	} else {
       
   108 		// Should not normally reach here, but pick a sensible default anyway.
       
   109 		r.RemoteAddr = "127.0.0.1"
       
   110 	}
       
   111 	// The address in the headers will most likely be of these forms:
       
   112 	//	123.123.123.123
       
   113 	//	2001:db8::1
       
   114 	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
       
   115 	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
       
   116 		// Assume the remote address is only a host; add a default port.
       
   117 		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
       
   118 	}
       
   119 
       
   120 	// Start goroutine responsible for flushing app logs.
       
   121 	// This is done after adding c to ctx.m (and stopped before removing it)
       
   122 	// because flushing logs requires making an API call.
       
   123 	go c.logFlusher(stopFlushing)
       
   124 
       
   125 	executeRequestSafely(c, r)
       
   126 	c.outHeader = nil // make sure header changes aren't respected any more
       
   127 
       
   128 	stopFlushing <- 1 // any logging beyond this point will be dropped
       
   129 
       
   130 	// Flush any pending logs asynchronously.
       
   131 	c.pendingLogs.Lock()
       
   132 	flushes := c.pendingLogs.flushes
       
   133 	if len(c.pendingLogs.lines) > 0 {
       
   134 		flushes++
       
   135 	}
       
   136 	c.pendingLogs.Unlock()
       
   137 	go c.flushLog(false)
       
   138 	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
       
   139 
       
   140 	// Avoid nil Write call if c.Write is never called.
       
   141 	if c.outCode != 0 {
       
   142 		w.WriteHeader(c.outCode)
       
   143 	}
       
   144 	if c.outBody != nil {
       
   145 		w.Write(c.outBody)
       
   146 	}
       
   147 }
       
   148 
       
   149 func executeRequestSafely(c *context, r *http.Request) {
       
   150 	defer func() {
       
   151 		if x := recover(); x != nil {
       
   152 			logf(c, 4, "%s", renderPanic(x)) // 4 == critical
       
   153 			c.outCode = 500
       
   154 		}
       
   155 	}()
       
   156 
       
   157 	http.DefaultServeMux.ServeHTTP(c, r)
       
   158 }
       
   159 
       
   160 func renderPanic(x interface{}) string {
       
   161 	buf := make([]byte, 16<<10) // 16 KB should be plenty
       
   162 	buf = buf[:runtime.Stack(buf, false)]
       
   163 
       
   164 	// Remove the first few stack frames:
       
   165 	//   this func
       
   166 	//   the recover closure in the caller
       
   167 	// That will root the stack trace at the site of the panic.
       
   168 	const (
       
   169 		skipStart  = "internal.renderPanic"
       
   170 		skipFrames = 2
       
   171 	)
       
   172 	start := bytes.Index(buf, []byte(skipStart))
       
   173 	p := start
       
   174 	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
       
   175 		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
       
   176 		if p < 0 {
       
   177 			break
       
   178 		}
       
   179 	}
       
   180 	if p >= 0 {
       
   181 		// buf[start:p+1] is the block to remove.
       
   182 		// Copy buf[p+1:] over buf[start:] and shrink buf.
       
   183 		copy(buf[start:], buf[p+1:])
       
   184 		buf = buf[:len(buf)-(p+1-start)]
       
   185 	}
       
   186 
       
   187 	// Add panic heading.
       
   188 	head := fmt.Sprintf("panic: %v\n\n", x)
       
   189 	if len(head) > len(buf) {
       
   190 		// Extremely unlikely to happen.
       
   191 		return head
       
   192 	}
       
   193 	copy(buf[len(head):], buf)
       
   194 	copy(buf, head)
       
   195 
       
   196 	return string(buf)
       
   197 }
       
   198 
       
   199 var ctxs = struct {
       
   200 	sync.Mutex
       
   201 	m  map[*http.Request]*context
       
   202 	bg *context // background context, lazily initialized
       
   203 	// dec is used by tests to decorate the netcontext.Context returned
       
   204 	// for a given request. This allows tests to add overrides (such as
       
   205 	// WithAppIDOverride) to the context. The map is nil outside tests.
       
   206 	dec map[*http.Request]func(netcontext.Context) netcontext.Context
       
   207 }{
       
   208 	m: make(map[*http.Request]*context),
       
   209 }
       
   210 
       
   211 // context represents the context of an in-flight HTTP request.
       
   212 // It implements the appengine.Context and http.ResponseWriter interfaces.
       
   213 type context struct {
       
   214 	req *http.Request
       
   215 
       
   216 	outCode   int
       
   217 	outHeader http.Header
       
   218 	outBody   []byte
       
   219 
       
   220 	pendingLogs struct {
       
   221 		sync.Mutex
       
   222 		lines   []*logpb.UserAppLogLine
       
   223 		flushes int
       
   224 	}
       
   225 
       
   226 	apiURL *url.URL
       
   227 }
       
   228 
       
   229 var contextKey = "holds a *context"
       
   230 
       
   231 // fromContext returns the App Engine context or nil if ctx is not
       
   232 // derived from an App Engine context.
       
   233 func fromContext(ctx netcontext.Context) *context {
       
   234 	c, _ := ctx.Value(&contextKey).(*context)
       
   235 	return c
       
   236 }
       
   237 
       
   238 func withContext(parent netcontext.Context, c *context) netcontext.Context {
       
   239 	ctx := netcontext.WithValue(parent, &contextKey, c)
       
   240 	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
       
   241 		ctx = withNamespace(ctx, ns)
       
   242 	}
       
   243 	return ctx
       
   244 }
       
   245 
       
   246 func toContext(c *context) netcontext.Context {
       
   247 	return withContext(netcontext.Background(), c)
       
   248 }
       
   249 
       
   250 func IncomingHeaders(ctx netcontext.Context) http.Header {
       
   251 	if c := fromContext(ctx); c != nil {
       
   252 		return c.req.Header
       
   253 	}
       
   254 	return nil
       
   255 }
       
   256 
       
   257 func ReqContext(req *http.Request) netcontext.Context {
       
   258 	return WithContext(netcontext.Background(), req)
       
   259 }
       
   260 
       
   261 func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
       
   262 	ctxs.Lock()
       
   263 	c := ctxs.m[req]
       
   264 	d := ctxs.dec[req]
       
   265 	ctxs.Unlock()
       
   266 
       
   267 	if d != nil {
       
   268 		parent = d(parent)
       
   269 	}
       
   270 
       
   271 	if c == nil {
       
   272 		// Someone passed in an http.Request that is not in-flight.
       
   273 		// We panic here rather than panicking at a later point
       
   274 		// so that stack traces will be more sensible.
       
   275 		log.Panic("appengine: NewContext passed an unknown http.Request")
       
   276 	}
       
   277 	return withContext(parent, c)
       
   278 }
       
   279 
       
   280 // DefaultTicket returns a ticket used for background context or dev_appserver.
       
   281 func DefaultTicket() string {
       
   282 	defaultTicketOnce.Do(func() {
       
   283 		if IsDevAppServer() {
       
   284 			defaultTicket = "testapp" + defaultTicketSuffix
       
   285 			return
       
   286 		}
       
   287 		appID := partitionlessAppID()
       
   288 		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
       
   289 		majVersion := VersionID(nil)
       
   290 		if i := strings.Index(majVersion, "."); i > 0 {
       
   291 			majVersion = majVersion[:i]
       
   292 		}
       
   293 		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
       
   294 	})
       
   295 	return defaultTicket
       
   296 }
       
   297 
       
   298 func BackgroundContext() netcontext.Context {
       
   299 	ctxs.Lock()
       
   300 	defer ctxs.Unlock()
       
   301 
       
   302 	if ctxs.bg != nil {
       
   303 		return toContext(ctxs.bg)
       
   304 	}
       
   305 
       
   306 	// Compute background security ticket.
       
   307 	ticket := DefaultTicket()
       
   308 
       
   309 	ctxs.bg = &context{
       
   310 		req: &http.Request{
       
   311 			Header: http.Header{
       
   312 				ticketHeader: []string{ticket},
       
   313 			},
       
   314 		},
       
   315 		apiURL: apiURL(),
       
   316 	}
       
   317 
       
   318 	// TODO(dsymonds): Wire up the shutdown handler to do a final flush.
       
   319 	go ctxs.bg.logFlusher(make(chan int))
       
   320 
       
   321 	return toContext(ctxs.bg)
       
   322 }
       
   323 
       
   324 // RegisterTestRequest registers the HTTP request req for testing, such that
       
   325 // any API calls are sent to the provided URL. It returns a closure to delete
       
   326 // the registration.
       
   327 // It should only be used by aetest package.
       
   328 func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
       
   329 	c := &context{
       
   330 		req:    req,
       
   331 		apiURL: apiURL,
       
   332 	}
       
   333 	ctxs.Lock()
       
   334 	defer ctxs.Unlock()
       
   335 	if _, ok := ctxs.m[req]; ok {
       
   336 		log.Panic("req already associated with context")
       
   337 	}
       
   338 	if _, ok := ctxs.dec[req]; ok {
       
   339 		log.Panic("req already associated with context")
       
   340 	}
       
   341 	if ctxs.dec == nil {
       
   342 		ctxs.dec = make(map[*http.Request]func(netcontext.Context) netcontext.Context)
       
   343 	}
       
   344 	ctxs.m[req] = c
       
   345 	ctxs.dec[req] = decorate
       
   346 
       
   347 	return req, func() {
       
   348 		ctxs.Lock()
       
   349 		delete(ctxs.m, req)
       
   350 		delete(ctxs.dec, req)
       
   351 		ctxs.Unlock()
       
   352 	}
       
   353 }
       
   354 
       
   355 var errTimeout = &CallError{
       
   356 	Detail:  "Deadline exceeded",
       
   357 	Code:    int32(remotepb.RpcError_CANCELLED),
       
   358 	Timeout: true,
       
   359 }
       
   360 
       
   361 func (c *context) Header() http.Header { return c.outHeader }
       
   362 
       
   363 // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
       
   364 // codes do not permit a response body (nor response entity headers such as
       
   365 // Content-Length, Content-Type, etc).
       
   366 func bodyAllowedForStatus(status int) bool {
       
   367 	switch {
       
   368 	case status >= 100 && status <= 199:
       
   369 		return false
       
   370 	case status == 204:
       
   371 		return false
       
   372 	case status == 304:
       
   373 		return false
       
   374 	}
       
   375 	return true
       
   376 }
       
   377 
       
   378 func (c *context) Write(b []byte) (int, error) {
       
   379 	if c.outCode == 0 {
       
   380 		c.WriteHeader(http.StatusOK)
       
   381 	}
       
   382 	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
       
   383 		return 0, http.ErrBodyNotAllowed
       
   384 	}
       
   385 	c.outBody = append(c.outBody, b...)
       
   386 	return len(b), nil
       
   387 }
       
   388 
       
   389 func (c *context) WriteHeader(code int) {
       
   390 	if c.outCode != 0 {
       
   391 		logf(c, 3, "WriteHeader called multiple times on request.") // error level
       
   392 		return
       
   393 	}
       
   394 	c.outCode = code
       
   395 }
       
   396 
       
   397 func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
       
   398 	hreq := &http.Request{
       
   399 		Method: "POST",
       
   400 		URL:    c.apiURL,
       
   401 		Header: http.Header{
       
   402 			apiEndpointHeader: apiEndpointHeaderValue,
       
   403 			apiMethodHeader:   apiMethodHeaderValue,
       
   404 			apiContentType:    apiContentTypeValue,
       
   405 			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
       
   406 		},
       
   407 		Body:          ioutil.NopCloser(bytes.NewReader(body)),
       
   408 		ContentLength: int64(len(body)),
       
   409 		Host:          c.apiURL.Host,
       
   410 	}
       
   411 	if info := c.req.Header.Get(dapperHeader); info != "" {
       
   412 		hreq.Header.Set(dapperHeader, info)
       
   413 	}
       
   414 	if info := c.req.Header.Get(traceHeader); info != "" {
       
   415 		hreq.Header.Set(traceHeader, info)
       
   416 	}
       
   417 
       
   418 	tr := apiHTTPClient.Transport.(*http.Transport)
       
   419 
       
   420 	var timedOut int32 // atomic; set to 1 if timed out
       
   421 	t := time.AfterFunc(timeout, func() {
       
   422 		atomic.StoreInt32(&timedOut, 1)
       
   423 		tr.CancelRequest(hreq)
       
   424 	})
       
   425 	defer t.Stop()
       
   426 	defer func() {
       
   427 		// Check if timeout was exceeded.
       
   428 		if atomic.LoadInt32(&timedOut) != 0 {
       
   429 			err = errTimeout
       
   430 		}
       
   431 	}()
       
   432 
       
   433 	hresp, err := apiHTTPClient.Do(hreq)
       
   434 	if err != nil {
       
   435 		return nil, &CallError{
       
   436 			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
       
   437 			Code:   int32(remotepb.RpcError_UNKNOWN),
       
   438 		}
       
   439 	}
       
   440 	defer hresp.Body.Close()
       
   441 	hrespBody, err := ioutil.ReadAll(hresp.Body)
       
   442 	if hresp.StatusCode != 200 {
       
   443 		return nil, &CallError{
       
   444 			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
       
   445 			Code:   int32(remotepb.RpcError_UNKNOWN),
       
   446 		}
       
   447 	}
       
   448 	if err != nil {
       
   449 		return nil, &CallError{
       
   450 			Detail: fmt.Sprintf("service bridge response bad: %v", err),
       
   451 			Code:   int32(remotepb.RpcError_UNKNOWN),
       
   452 		}
       
   453 	}
       
   454 	return hrespBody, nil
       
   455 }
       
   456 
       
   457 func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
       
   458 	if ns := NamespaceFromContext(ctx); ns != "" {
       
   459 		if fn, ok := NamespaceMods[service]; ok {
       
   460 			fn(in, ns)
       
   461 		}
       
   462 	}
       
   463 
       
   464 	if f, ctx, ok := callOverrideFromContext(ctx); ok {
       
   465 		return f(ctx, service, method, in, out)
       
   466 	}
       
   467 
       
   468 	// Handle already-done contexts quickly.
       
   469 	select {
       
   470 	case <-ctx.Done():
       
   471 		return ctx.Err()
       
   472 	default:
       
   473 	}
       
   474 
       
   475 	c := fromContext(ctx)
       
   476 	if c == nil {
       
   477 		// Give a good error message rather than a panic lower down.
       
   478 		return errNotAppEngineContext
       
   479 	}
       
   480 
       
   481 	// Apply transaction modifications if we're in a transaction.
       
   482 	if t := transactionFromContext(ctx); t != nil {
       
   483 		if t.finished {
       
   484 			return errors.New("transaction context has expired")
       
   485 		}
       
   486 		applyTransaction(in, &t.transaction)
       
   487 	}
       
   488 
       
   489 	// Default RPC timeout is 60s.
       
   490 	timeout := 60 * time.Second
       
   491 	if deadline, ok := ctx.Deadline(); ok {
       
   492 		timeout = deadline.Sub(time.Now())
       
   493 	}
       
   494 
       
   495 	data, err := proto.Marshal(in)
       
   496 	if err != nil {
       
   497 		return err
       
   498 	}
       
   499 
       
   500 	ticket := c.req.Header.Get(ticketHeader)
       
   501 	// Use a test ticket under test environment.
       
   502 	if ticket == "" {
       
   503 		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
       
   504 			ticket = appid.(string) + defaultTicketSuffix
       
   505 		}
       
   506 	}
       
   507 	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
       
   508 	if ticket == "" {
       
   509 		ticket = DefaultTicket()
       
   510 	}
       
   511 	req := &remotepb.Request{
       
   512 		ServiceName: &service,
       
   513 		Method:      &method,
       
   514 		Request:     data,
       
   515 		RequestId:   &ticket,
       
   516 	}
       
   517 	hreqBody, err := proto.Marshal(req)
       
   518 	if err != nil {
       
   519 		return err
       
   520 	}
       
   521 
       
   522 	hrespBody, err := c.post(hreqBody, timeout)
       
   523 	if err != nil {
       
   524 		return err
       
   525 	}
       
   526 
       
   527 	res := &remotepb.Response{}
       
   528 	if err := proto.Unmarshal(hrespBody, res); err != nil {
       
   529 		return err
       
   530 	}
       
   531 	if res.RpcError != nil {
       
   532 		ce := &CallError{
       
   533 			Detail: res.RpcError.GetDetail(),
       
   534 			Code:   *res.RpcError.Code,
       
   535 		}
       
   536 		switch remotepb.RpcError_ErrorCode(ce.Code) {
       
   537 		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
       
   538 			ce.Timeout = true
       
   539 		}
       
   540 		return ce
       
   541 	}
       
   542 	if res.ApplicationError != nil {
       
   543 		return &APIError{
       
   544 			Service: *req.ServiceName,
       
   545 			Detail:  res.ApplicationError.GetDetail(),
       
   546 			Code:    *res.ApplicationError.Code,
       
   547 		}
       
   548 	}
       
   549 	if res.Exception != nil || res.JavaException != nil {
       
   550 		// This shouldn't happen, but let's be defensive.
       
   551 		return &CallError{
       
   552 			Detail: "service bridge returned exception",
       
   553 			Code:   int32(remotepb.RpcError_UNKNOWN),
       
   554 		}
       
   555 	}
       
   556 	return proto.Unmarshal(res.Response, out)
       
   557 }
       
   558 
       
   559 func (c *context) Request() *http.Request {
       
   560 	return c.req
       
   561 }
       
   562 
       
   563 func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
       
   564 	// Truncate long log lines.
       
   565 	// TODO(dsymonds): Check if this is still necessary.
       
   566 	const lim = 8 << 10
       
   567 	if len(*ll.Message) > lim {
       
   568 		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
       
   569 		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
       
   570 	}
       
   571 
       
   572 	c.pendingLogs.Lock()
       
   573 	c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
       
   574 	c.pendingLogs.Unlock()
       
   575 }
       
   576 
       
   577 var logLevelName = map[int64]string{
       
   578 	0: "DEBUG",
       
   579 	1: "INFO",
       
   580 	2: "WARNING",
       
   581 	3: "ERROR",
       
   582 	4: "CRITICAL",
       
   583 }
       
   584 
       
   585 func logf(c *context, level int64, format string, args ...interface{}) {
       
   586 	if c == nil {
       
   587 		panic("not an App Engine context")
       
   588 	}
       
   589 	s := fmt.Sprintf(format, args...)
       
   590 	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
       
   591 	c.addLogLine(&logpb.UserAppLogLine{
       
   592 		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
       
   593 		Level:         &level,
       
   594 		Message:       &s,
       
   595 	})
       
   596 	log.Print(logLevelName[level] + ": " + s)
       
   597 }
       
   598 
       
   599 // flushLog attempts to flush any pending logs to the appserver.
       
   600 // It should not be called concurrently.
       
   601 func (c *context) flushLog(force bool) (flushed bool) {
       
   602 	c.pendingLogs.Lock()
       
   603 	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
       
   604 	n, rem := 0, 30<<20
       
   605 	for ; n < len(c.pendingLogs.lines); n++ {
       
   606 		ll := c.pendingLogs.lines[n]
       
   607 		// Each log line will require about 3 bytes of overhead.
       
   608 		nb := proto.Size(ll) + 3
       
   609 		if nb > rem {
       
   610 			break
       
   611 		}
       
   612 		rem -= nb
       
   613 	}
       
   614 	lines := c.pendingLogs.lines[:n]
       
   615 	c.pendingLogs.lines = c.pendingLogs.lines[n:]
       
   616 	c.pendingLogs.Unlock()
       
   617 
       
   618 	if len(lines) == 0 && !force {
       
   619 		// Nothing to flush.
       
   620 		return false
       
   621 	}
       
   622 
       
   623 	rescueLogs := false
       
   624 	defer func() {
       
   625 		if rescueLogs {
       
   626 			c.pendingLogs.Lock()
       
   627 			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
       
   628 			c.pendingLogs.Unlock()
       
   629 		}
       
   630 	}()
       
   631 
       
   632 	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
       
   633 		LogLine: lines,
       
   634 	})
       
   635 	if err != nil {
       
   636 		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
       
   637 		rescueLogs = true
       
   638 		return false
       
   639 	}
       
   640 
       
   641 	req := &logpb.FlushRequest{
       
   642 		Logs: buf,
       
   643 	}
       
   644 	res := &basepb.VoidProto{}
       
   645 	c.pendingLogs.Lock()
       
   646 	c.pendingLogs.flushes++
       
   647 	c.pendingLogs.Unlock()
       
   648 	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
       
   649 		log.Printf("internal.flushLog: Flush RPC: %v", err)
       
   650 		rescueLogs = true
       
   651 		return false
       
   652 	}
       
   653 	return true
       
   654 }
       
   655 
       
   656 const (
       
   657 	// Log flushing parameters.
       
   658 	flushInterval      = 1 * time.Second
       
   659 	forceFlushInterval = 60 * time.Second
       
   660 )
       
   661 
       
   662 func (c *context) logFlusher(stop <-chan int) {
       
   663 	lastFlush := time.Now()
       
   664 	tick := time.NewTicker(flushInterval)
       
   665 	for {
       
   666 		select {
       
   667 		case <-stop:
       
   668 			// Request finished.
       
   669 			tick.Stop()
       
   670 			return
       
   671 		case <-tick.C:
       
   672 			force := time.Now().Sub(lastFlush) > forceFlushInterval
       
   673 			if c.flushLog(force) {
       
   674 				lastFlush = time.Now()
       
   675 			}
       
   676 		}
       
   677 	}
       
   678 }
       
   679 
       
   680 func ContextForTesting(req *http.Request) netcontext.Context {
       
   681 	return toContext(&context{req: req})
       
   682 }