1// Copyright 2011 Google Inc. All rights reserved. 2// Use of this source code is governed by the Apache 2.0 3// license that can be found in the LICENSE file. 4 5// +build !appengine 6// +build go1.7 7 8package internal 9 10import ( 11 "bytes" 12 "errors" 13 "fmt" 14 "io/ioutil" 15 "log" 16 "net" 17 "net/http" 18 "net/url" 19 "os" 20 "runtime" 21 "strconv" 22 "strings" 23 "sync" 24 "sync/atomic" 25 "time" 26 27 "github.com/golang/protobuf/proto" 28 netcontext "golang.org/x/net/context" 29 30 basepb "google.golang.org/appengine/internal/base" 31 logpb "google.golang.org/appengine/internal/log" 32 remotepb "google.golang.org/appengine/internal/remote_api" 33) 34 35const ( 36 apiPath = "/rpc_http" 37 defaultTicketSuffix = "/default.20150612t184001.0" 38) 39 40var ( 41 // Incoming headers. 42 ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") 43 dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") 44 traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context") 45 curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") 46 userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP") 47 remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") 48 49 // Outgoing headers. 50 apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") 51 apiEndpointHeaderValue = []string{"app-engine-apis"} 52 apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") 53 apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"} 54 apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") 55 apiContentType = http.CanonicalHeaderKey("Content-Type") 56 apiContentTypeValue = []string{"application/octet-stream"} 57 logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") 58 59 apiHTTPClient = &http.Client{ 60 Transport: &http.Transport{ 61 Proxy: http.ProxyFromEnvironment, 62 Dial: limitDial, 63 }, 64 } 65 66 defaultTicketOnce sync.Once 67 defaultTicket string 68 backgroundContextOnce sync.Once 69 backgroundContext netcontext.Context 70) 71 72func apiURL() *url.URL { 73 host, port := "appengine.googleapis.internal", "10001" 74 if h := os.Getenv("API_HOST"); h != "" { 75 host = h 76 } 77 if p := os.Getenv("API_PORT"); p != "" { 78 port = p 79 } 80 return &url.URL{ 81 Scheme: "http", 82 Host: host + ":" + port, 83 Path: apiPath, 84 } 85} 86 87func handleHTTP(w http.ResponseWriter, r *http.Request) { 88 c := &context{ 89 req: r, 90 outHeader: w.Header(), 91 apiURL: apiURL(), 92 } 93 r = r.WithContext(withContext(r.Context(), c)) 94 c.req = r 95 96 stopFlushing := make(chan int) 97 98 // Patch up RemoteAddr so it looks reasonable. 99 if addr := r.Header.Get(userIPHeader); addr != "" { 100 r.RemoteAddr = addr 101 } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { 102 r.RemoteAddr = addr 103 } else { 104 // Should not normally reach here, but pick a sensible default anyway. 105 r.RemoteAddr = "127.0.0.1" 106 } 107 // The address in the headers will most likely be of these forms: 108 // 123.123.123.123 109 // 2001:db8::1 110 // net/http.Request.RemoteAddr is specified to be in "IP:port" form. 111 if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { 112 // Assume the remote address is only a host; add a default port. 113 r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") 114 } 115 116 // Start goroutine responsible for flushing app logs. 117 // This is done after adding c to ctx.m (and stopped before removing it) 118 // because flushing logs requires making an API call. 119 go c.logFlusher(stopFlushing) 120 121 executeRequestSafely(c, r) 122 c.outHeader = nil // make sure header changes aren't respected any more 123 124 stopFlushing <- 1 // any logging beyond this point will be dropped 125 126 // Flush any pending logs asynchronously. 127 c.pendingLogs.Lock() 128 flushes := c.pendingLogs.flushes 129 if len(c.pendingLogs.lines) > 0 { 130 flushes++ 131 } 132 c.pendingLogs.Unlock() 133 go c.flushLog(false) 134 w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) 135 136 // Avoid nil Write call if c.Write is never called. 137 if c.outCode != 0 { 138 w.WriteHeader(c.outCode) 139 } 140 if c.outBody != nil { 141 w.Write(c.outBody) 142 } 143} 144 145func executeRequestSafely(c *context, r *http.Request) { 146 defer func() { 147 if x := recover(); x != nil { 148 logf(c, 4, "%s", renderPanic(x)) // 4 == critical 149 c.outCode = 500 150 } 151 }() 152 153 http.DefaultServeMux.ServeHTTP(c, r) 154} 155 156func renderPanic(x interface{}) string { 157 buf := make([]byte, 16<<10) // 16 KB should be plenty 158 buf = buf[:runtime.Stack(buf, false)] 159 160 // Remove the first few stack frames: 161 // this func 162 // the recover closure in the caller 163 // That will root the stack trace at the site of the panic. 164 const ( 165 skipStart = "internal.renderPanic" 166 skipFrames = 2 167 ) 168 start := bytes.Index(buf, []byte(skipStart)) 169 p := start 170 for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { 171 p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 172 if p < 0 { 173 break 174 } 175 } 176 if p >= 0 { 177 // buf[start:p+1] is the block to remove. 178 // Copy buf[p+1:] over buf[start:] and shrink buf. 179 copy(buf[start:], buf[p+1:]) 180 buf = buf[:len(buf)-(p+1-start)] 181 } 182 183 // Add panic heading. 184 head := fmt.Sprintf("panic: %v\n\n", x) 185 if len(head) > len(buf) { 186 // Extremely unlikely to happen. 187 return head 188 } 189 copy(buf[len(head):], buf) 190 copy(buf, head) 191 192 return string(buf) 193} 194 195// context represents the context of an in-flight HTTP request. 196// It implements the appengine.Context and http.ResponseWriter interfaces. 197type context struct { 198 req *http.Request 199 200 outCode int 201 outHeader http.Header 202 outBody []byte 203 204 pendingLogs struct { 205 sync.Mutex 206 lines []*logpb.UserAppLogLine 207 flushes int 208 } 209 210 apiURL *url.URL 211} 212 213var contextKey = "holds a *context" 214 215// jointContext joins two contexts in a superficial way. 216// It takes values and timeouts from a base context, and only values from another context. 217type jointContext struct { 218 base netcontext.Context 219 valuesOnly netcontext.Context 220} 221 222func (c jointContext) Deadline() (time.Time, bool) { 223 return c.base.Deadline() 224} 225 226func (c jointContext) Done() <-chan struct{} { 227 return c.base.Done() 228} 229 230func (c jointContext) Err() error { 231 return c.base.Err() 232} 233 234func (c jointContext) Value(key interface{}) interface{} { 235 if val := c.base.Value(key); val != nil { 236 return val 237 } 238 return c.valuesOnly.Value(key) 239} 240 241// fromContext returns the App Engine context or nil if ctx is not 242// derived from an App Engine context. 243func fromContext(ctx netcontext.Context) *context { 244 c, _ := ctx.Value(&contextKey).(*context) 245 return c 246} 247 248func withContext(parent netcontext.Context, c *context) netcontext.Context { 249 ctx := netcontext.WithValue(parent, &contextKey, c) 250 if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { 251 ctx = withNamespace(ctx, ns) 252 } 253 return ctx 254} 255 256func toContext(c *context) netcontext.Context { 257 return withContext(netcontext.Background(), c) 258} 259 260func IncomingHeaders(ctx netcontext.Context) http.Header { 261 if c := fromContext(ctx); c != nil { 262 return c.req.Header 263 } 264 return nil 265} 266 267func ReqContext(req *http.Request) netcontext.Context { 268 return req.Context() 269} 270 271func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { 272 return jointContext{ 273 base: parent, 274 valuesOnly: req.Context(), 275 } 276} 277 278// DefaultTicket returns a ticket used for background context or dev_appserver. 279func DefaultTicket() string { 280 defaultTicketOnce.Do(func() { 281 if IsDevAppServer() { 282 defaultTicket = "testapp" + defaultTicketSuffix 283 return 284 } 285 appID := partitionlessAppID() 286 escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) 287 majVersion := VersionID(nil) 288 if i := strings.Index(majVersion, "."); i > 0 { 289 majVersion = majVersion[:i] 290 } 291 defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) 292 }) 293 return defaultTicket 294} 295 296func BackgroundContext() netcontext.Context { 297 backgroundContextOnce.Do(func() { 298 // Compute background security ticket. 299 ticket := DefaultTicket() 300 301 c := &context{ 302 req: &http.Request{ 303 Header: http.Header{ 304 ticketHeader: []string{ticket}, 305 }, 306 }, 307 apiURL: apiURL(), 308 } 309 backgroundContext = toContext(c) 310 311 // TODO(dsymonds): Wire up the shutdown handler to do a final flush. 312 go c.logFlusher(make(chan int)) 313 }) 314 315 return backgroundContext 316} 317 318// RegisterTestRequest registers the HTTP request req for testing, such that 319// any API calls are sent to the provided URL. It returns a closure to delete 320// the registration. 321// It should only be used by aetest package. 322func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { 323 c := &context{ 324 req: req, 325 apiURL: apiURL, 326 } 327 ctx := withContext(decorate(req.Context()), c) 328 req = req.WithContext(ctx) 329 c.req = req 330 return req, func() {} 331} 332 333var errTimeout = &CallError{ 334 Detail: "Deadline exceeded", 335 Code: int32(remotepb.RpcError_CANCELLED), 336 Timeout: true, 337} 338 339func (c *context) Header() http.Header { return c.outHeader } 340 341// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status 342// codes do not permit a response body (nor response entity headers such as 343// Content-Length, Content-Type, etc). 344func bodyAllowedForStatus(status int) bool { 345 switch { 346 case status >= 100 && status <= 199: 347 return false 348 case status == 204: 349 return false 350 case status == 304: 351 return false 352 } 353 return true 354} 355 356func (c *context) Write(b []byte) (int, error) { 357 if c.outCode == 0 { 358 c.WriteHeader(http.StatusOK) 359 } 360 if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { 361 return 0, http.ErrBodyNotAllowed 362 } 363 c.outBody = append(c.outBody, b...) 364 return len(b), nil 365} 366 367func (c *context) WriteHeader(code int) { 368 if c.outCode != 0 { 369 logf(c, 3, "WriteHeader called multiple times on request.") // error level 370 return 371 } 372 c.outCode = code 373} 374 375func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { 376 hreq := &http.Request{ 377 Method: "POST", 378 URL: c.apiURL, 379 Header: http.Header{ 380 apiEndpointHeader: apiEndpointHeaderValue, 381 apiMethodHeader: apiMethodHeaderValue, 382 apiContentType: apiContentTypeValue, 383 apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, 384 }, 385 Body: ioutil.NopCloser(bytes.NewReader(body)), 386 ContentLength: int64(len(body)), 387 Host: c.apiURL.Host, 388 } 389 if info := c.req.Header.Get(dapperHeader); info != "" { 390 hreq.Header.Set(dapperHeader, info) 391 } 392 if info := c.req.Header.Get(traceHeader); info != "" { 393 hreq.Header.Set(traceHeader, info) 394 } 395 396 tr := apiHTTPClient.Transport.(*http.Transport) 397 398 var timedOut int32 // atomic; set to 1 if timed out 399 t := time.AfterFunc(timeout, func() { 400 atomic.StoreInt32(&timedOut, 1) 401 tr.CancelRequest(hreq) 402 }) 403 defer t.Stop() 404 defer func() { 405 // Check if timeout was exceeded. 406 if atomic.LoadInt32(&timedOut) != 0 { 407 err = errTimeout 408 } 409 }() 410 411 hresp, err := apiHTTPClient.Do(hreq) 412 if err != nil { 413 return nil, &CallError{ 414 Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), 415 Code: int32(remotepb.RpcError_UNKNOWN), 416 } 417 } 418 defer hresp.Body.Close() 419 hrespBody, err := ioutil.ReadAll(hresp.Body) 420 if hresp.StatusCode != 200 { 421 return nil, &CallError{ 422 Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), 423 Code: int32(remotepb.RpcError_UNKNOWN), 424 } 425 } 426 if err != nil { 427 return nil, &CallError{ 428 Detail: fmt.Sprintf("service bridge response bad: %v", err), 429 Code: int32(remotepb.RpcError_UNKNOWN), 430 } 431 } 432 return hrespBody, nil 433} 434 435func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { 436 if ns := NamespaceFromContext(ctx); ns != "" { 437 if fn, ok := NamespaceMods[service]; ok { 438 fn(in, ns) 439 } 440 } 441 442 if f, ctx, ok := callOverrideFromContext(ctx); ok { 443 return f(ctx, service, method, in, out) 444 } 445 446 // Handle already-done contexts quickly. 447 select { 448 case <-ctx.Done(): 449 return ctx.Err() 450 default: 451 } 452 453 c := fromContext(ctx) 454 if c == nil { 455 // Give a good error message rather than a panic lower down. 456 return errNotAppEngineContext 457 } 458 459 // Apply transaction modifications if we're in a transaction. 460 if t := transactionFromContext(ctx); t != nil { 461 if t.finished { 462 return errors.New("transaction context has expired") 463 } 464 applyTransaction(in, &t.transaction) 465 } 466 467 // Default RPC timeout is 60s. 468 timeout := 60 * time.Second 469 if deadline, ok := ctx.Deadline(); ok { 470 timeout = deadline.Sub(time.Now()) 471 } 472 473 data, err := proto.Marshal(in) 474 if err != nil { 475 return err 476 } 477 478 ticket := c.req.Header.Get(ticketHeader) 479 // Use a test ticket under test environment. 480 if ticket == "" { 481 if appid := ctx.Value(&appIDOverrideKey); appid != nil { 482 ticket = appid.(string) + defaultTicketSuffix 483 } 484 } 485 // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. 486 if ticket == "" { 487 ticket = DefaultTicket() 488 } 489 req := &remotepb.Request{ 490 ServiceName: &service, 491 Method: &method, 492 Request: data, 493 RequestId: &ticket, 494 } 495 hreqBody, err := proto.Marshal(req) 496 if err != nil { 497 return err 498 } 499 500 hrespBody, err := c.post(hreqBody, timeout) 501 if err != nil { 502 return err 503 } 504 505 res := &remotepb.Response{} 506 if err := proto.Unmarshal(hrespBody, res); err != nil { 507 return err 508 } 509 if res.RpcError != nil { 510 ce := &CallError{ 511 Detail: res.RpcError.GetDetail(), 512 Code: *res.RpcError.Code, 513 } 514 switch remotepb.RpcError_ErrorCode(ce.Code) { 515 case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: 516 ce.Timeout = true 517 } 518 return ce 519 } 520 if res.ApplicationError != nil { 521 return &APIError{ 522 Service: *req.ServiceName, 523 Detail: res.ApplicationError.GetDetail(), 524 Code: *res.ApplicationError.Code, 525 } 526 } 527 if res.Exception != nil || res.JavaException != nil { 528 // This shouldn't happen, but let's be defensive. 529 return &CallError{ 530 Detail: "service bridge returned exception", 531 Code: int32(remotepb.RpcError_UNKNOWN), 532 } 533 } 534 return proto.Unmarshal(res.Response, out) 535} 536 537func (c *context) Request() *http.Request { 538 return c.req 539} 540 541func (c *context) addLogLine(ll *logpb.UserAppLogLine) { 542 // Truncate long log lines. 543 // TODO(dsymonds): Check if this is still necessary. 544 const lim = 8 << 10 545 if len(*ll.Message) > lim { 546 suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) 547 ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) 548 } 549 550 c.pendingLogs.Lock() 551 c.pendingLogs.lines = append(c.pendingLogs.lines, ll) 552 c.pendingLogs.Unlock() 553} 554 555var logLevelName = map[int64]string{ 556 0: "DEBUG", 557 1: "INFO", 558 2: "WARNING", 559 3: "ERROR", 560 4: "CRITICAL", 561} 562 563func logf(c *context, level int64, format string, args ...interface{}) { 564 if c == nil { 565 panic("not an App Engine context") 566 } 567 s := fmt.Sprintf(format, args...) 568 s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. 569 c.addLogLine(&logpb.UserAppLogLine{ 570 TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), 571 Level: &level, 572 Message: &s, 573 }) 574 log.Print(logLevelName[level] + ": " + s) 575} 576 577// flushLog attempts to flush any pending logs to the appserver. 578// It should not be called concurrently. 579func (c *context) flushLog(force bool) (flushed bool) { 580 c.pendingLogs.Lock() 581 // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. 582 n, rem := 0, 30<<20 583 for ; n < len(c.pendingLogs.lines); n++ { 584 ll := c.pendingLogs.lines[n] 585 // Each log line will require about 3 bytes of overhead. 586 nb := proto.Size(ll) + 3 587 if nb > rem { 588 break 589 } 590 rem -= nb 591 } 592 lines := c.pendingLogs.lines[:n] 593 c.pendingLogs.lines = c.pendingLogs.lines[n:] 594 c.pendingLogs.Unlock() 595 596 if len(lines) == 0 && !force { 597 // Nothing to flush. 598 return false 599 } 600 601 rescueLogs := false 602 defer func() { 603 if rescueLogs { 604 c.pendingLogs.Lock() 605 c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) 606 c.pendingLogs.Unlock() 607 } 608 }() 609 610 buf, err := proto.Marshal(&logpb.UserAppLogGroup{ 611 LogLine: lines, 612 }) 613 if err != nil { 614 log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) 615 rescueLogs = true 616 return false 617 } 618 619 req := &logpb.FlushRequest{ 620 Logs: buf, 621 } 622 res := &basepb.VoidProto{} 623 c.pendingLogs.Lock() 624 c.pendingLogs.flushes++ 625 c.pendingLogs.Unlock() 626 if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { 627 log.Printf("internal.flushLog: Flush RPC: %v", err) 628 rescueLogs = true 629 return false 630 } 631 return true 632} 633 634const ( 635 // Log flushing parameters. 636 flushInterval = 1 * time.Second 637 forceFlushInterval = 60 * time.Second 638) 639 640func (c *context) logFlusher(stop <-chan int) { 641 lastFlush := time.Now() 642 tick := time.NewTicker(flushInterval) 643 for { 644 select { 645 case <-stop: 646 // Request finished. 647 tick.Stop() 648 return 649 case <-tick.C: 650 force := time.Now().Sub(lastFlush) > forceFlushInterval 651 if c.flushLog(force) { 652 lastFlush = time.Now() 653 } 654 } 655 } 656} 657 658func ContextForTesting(req *http.Request) netcontext.Context { 659 return toContext(&context{req: req}) 660} 661