• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2011 Google Inc. All rights reserved.
2// Use of this source code is governed by the Apache 2.0
3// license that can be found in the LICENSE file.
4
5// +build !appengine
6// +build go1.7
7
8package internal
9
10import (
11	"bytes"
12	"errors"
13	"fmt"
14	"io/ioutil"
15	"log"
16	"net"
17	"net/http"
18	"net/url"
19	"os"
20	"runtime"
21	"strconv"
22	"strings"
23	"sync"
24	"sync/atomic"
25	"time"
26
27	"github.com/golang/protobuf/proto"
28	netcontext "golang.org/x/net/context"
29
30	basepb "google.golang.org/appengine/internal/base"
31	logpb "google.golang.org/appengine/internal/log"
32	remotepb "google.golang.org/appengine/internal/remote_api"
33)
34
// apiPath is the URL path component of the API endpoint built by apiURL.
const apiPath = "/rpc_http"

// defaultTicketSuffix is appended to an application ID to form a ticket when
// no per-request ticket is available (see DefaultTicket and Call).
const defaultTicketSuffix = "/default.20150612t184001.0"
39
40var (
41	// Incoming headers.
42	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
43	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
44	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
45	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
46	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
47	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
48
49	// Outgoing headers.
50	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
51	apiEndpointHeaderValue = []string{"app-engine-apis"}
52	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
53	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
54	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
55	apiContentType         = http.CanonicalHeaderKey("Content-Type")
56	apiContentTypeValue    = []string{"application/octet-stream"}
57	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
58
59	apiHTTPClient = &http.Client{
60		Transport: &http.Transport{
61			Proxy: http.ProxyFromEnvironment,
62			Dial:  limitDial,
63		},
64	}
65
66	defaultTicketOnce     sync.Once
67	defaultTicket         string
68	backgroundContextOnce sync.Once
69	backgroundContext     netcontext.Context
70)
71
72func apiURL() *url.URL {
73	host, port := "appengine.googleapis.internal", "10001"
74	if h := os.Getenv("API_HOST"); h != "" {
75		host = h
76	}
77	if p := os.Getenv("API_PORT"); p != "" {
78		port = p
79	}
80	return &url.URL{
81		Scheme: "http",
82		Host:   host + ":" + port,
83		Path:   apiPath,
84	}
85}
86
87func handleHTTP(w http.ResponseWriter, r *http.Request) {
88	c := &context{
89		req:       r,
90		outHeader: w.Header(),
91		apiURL:    apiURL(),
92	}
93	r = r.WithContext(withContext(r.Context(), c))
94	c.req = r
95
96	stopFlushing := make(chan int)
97
98	// Patch up RemoteAddr so it looks reasonable.
99	if addr := r.Header.Get(userIPHeader); addr != "" {
100		r.RemoteAddr = addr
101	} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
102		r.RemoteAddr = addr
103	} else {
104		// Should not normally reach here, but pick a sensible default anyway.
105		r.RemoteAddr = "127.0.0.1"
106	}
107	// The address in the headers will most likely be of these forms:
108	//	123.123.123.123
109	//	2001:db8::1
110	// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
111	if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
112		// Assume the remote address is only a host; add a default port.
113		r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
114	}
115
116	// Start goroutine responsible for flushing app logs.
117	// This is done after adding c to ctx.m (and stopped before removing it)
118	// because flushing logs requires making an API call.
119	go c.logFlusher(stopFlushing)
120
121	executeRequestSafely(c, r)
122	c.outHeader = nil // make sure header changes aren't respected any more
123
124	stopFlushing <- 1 // any logging beyond this point will be dropped
125
126	// Flush any pending logs asynchronously.
127	c.pendingLogs.Lock()
128	flushes := c.pendingLogs.flushes
129	if len(c.pendingLogs.lines) > 0 {
130		flushes++
131	}
132	c.pendingLogs.Unlock()
133	go c.flushLog(false)
134	w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
135
136	// Avoid nil Write call if c.Write is never called.
137	if c.outCode != 0 {
138		w.WriteHeader(c.outCode)
139	}
140	if c.outBody != nil {
141		w.Write(c.outBody)
142	}
143}
144
145func executeRequestSafely(c *context, r *http.Request) {
146	defer func() {
147		if x := recover(); x != nil {
148			logf(c, 4, "%s", renderPanic(x)) // 4 == critical
149			c.outCode = 500
150		}
151	}()
152
153	http.DefaultServeMux.ServeHTTP(c, r)
154}
155
// renderPanic formats the recovered panic value x followed by the current
// goroutine's stack trace, in the form "panic: <x>\n\n<stack>".
//
// It is meant to be called from a deferred recover closure. When the trace
// contains the frame of this function, that frame and the following one
// (the recover closure in the caller) are removed so that the trace is
// rooted at the site of the panic. When the frame cannot be located, the
// trace is returned untouched.
func renderPanic(x interface{}) string {
	buf := make([]byte, 16<<10) // 16 KB should be plenty
	buf = buf[:runtime.Stack(buf, false)]

	// Remove the first few stack frames:
	//   this func
	//   the recover closure in the caller
	const (
		skipStart  = "internal.renderPanic"
		skipFrames = 2
	)
	if at := bytes.Index(buf, []byte(skipStart)); at >= 0 {
		// Cut from the beginning of the line so that an import-path prefix
		// in front of the function name does not remain in the output.
		start := bytes.LastIndexByte(buf[:at], '\n') + 1

		// Each frame takes two lines (function, then file:line); advance
		// end past that many newlines, stopping early if the trace ends.
		end := at
		for i := 0; i < skipFrames*2; i++ {
			nl := bytes.IndexByte(buf[end:], '\n')
			if nl < 0 {
				break
			}
			end += nl + 1
		}
		buf = append(buf[:start], buf[end:]...)
	}

	// Prepend the panic heading without sacrificing the tail of the trace.
	return fmt.Sprintf("panic: %v\n\n", x) + string(buf)
}
194
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
	// req is the request being served; its headers supply the API ticket
	// and trace information used for outgoing API calls.
	req *http.Request

	// Buffered response state, filled in by WriteHeader, Header and Write.
	outCode   int
	outHeader http.Header
	outBody   []byte

	// pendingLogs holds log lines not yet sent to the log service and the
	// number of flush attempts so far. The embedded mutex guards both.
	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int
	}

	// apiURL is the endpoint that post sends API requests to.
	apiURL *url.URL
}
212
// contextKey's address (&contextKey) is the key under which withContext
// stores the *context and fromContext looks it up.
var contextKey = "holds a *context"
214
215// jointContext joins two contexts in a superficial way.
216// It takes values and timeouts from a base context, and only values from another context.
217type jointContext struct {
218	base       netcontext.Context
219	valuesOnly netcontext.Context
220}
221
222func (c jointContext) Deadline() (time.Time, bool) {
223	return c.base.Deadline()
224}
225
226func (c jointContext) Done() <-chan struct{} {
227	return c.base.Done()
228}
229
230func (c jointContext) Err() error {
231	return c.base.Err()
232}
233
234func (c jointContext) Value(key interface{}) interface{} {
235	if val := c.base.Value(key); val != nil {
236		return val
237	}
238	return c.valuesOnly.Value(key)
239}
240
241// fromContext returns the App Engine context or nil if ctx is not
242// derived from an App Engine context.
243func fromContext(ctx netcontext.Context) *context {
244	c, _ := ctx.Value(&contextKey).(*context)
245	return c
246}
247
248func withContext(parent netcontext.Context, c *context) netcontext.Context {
249	ctx := netcontext.WithValue(parent, &contextKey, c)
250	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
251		ctx = withNamespace(ctx, ns)
252	}
253	return ctx
254}
255
256func toContext(c *context) netcontext.Context {
257	return withContext(netcontext.Background(), c)
258}
259
260func IncomingHeaders(ctx netcontext.Context) http.Header {
261	if c := fromContext(ctx); c != nil {
262		return c.req.Header
263	}
264	return nil
265}
266
// ReqContext returns the context carried by req (req.Context()).
func ReqContext(req *http.Request) netcontext.Context {
	return req.Context()
}
270
271func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context {
272	return jointContext{
273		base:       parent,
274		valuesOnly: req.Context(),
275	}
276}
277
278// DefaultTicket returns a ticket used for background context or dev_appserver.
279func DefaultTicket() string {
280	defaultTicketOnce.Do(func() {
281		if IsDevAppServer() {
282			defaultTicket = "testapp" + defaultTicketSuffix
283			return
284		}
285		appID := partitionlessAppID()
286		escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1)
287		majVersion := VersionID(nil)
288		if i := strings.Index(majVersion, "."); i > 0 {
289			majVersion = majVersion[:i]
290		}
291		defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID())
292	})
293	return defaultTicket
294}
295
296func BackgroundContext() netcontext.Context {
297	backgroundContextOnce.Do(func() {
298		// Compute background security ticket.
299		ticket := DefaultTicket()
300
301		c := &context{
302			req: &http.Request{
303				Header: http.Header{
304					ticketHeader: []string{ticket},
305				},
306			},
307			apiURL: apiURL(),
308		}
309		backgroundContext = toContext(c)
310
311		// TODO(dsymonds): Wire up the shutdown handler to do a final flush.
312		go c.logFlusher(make(chan int))
313	})
314
315	return backgroundContext
316}
317
318// RegisterTestRequest registers the HTTP request req for testing, such that
319// any API calls are sent to the provided URL. It returns a closure to delete
320// the registration.
321// It should only be used by aetest package.
322func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) {
323	c := &context{
324		req:    req,
325		apiURL: apiURL,
326	}
327	ctx := withContext(decorate(req.Context()), c)
328	req = req.WithContext(ctx)
329	c.req = req
330	return req, func() {}
331}
332
// errTimeout is the error post returns when its timeout timer fired before
// the API request completed.
var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}
338
// Header returns the response header map. handleHTTP sets it to nil once the
// handler has finished, so later header changes are not respected.
func (c *context) Header() http.Header { return c.outHeader }
340
// bodyAllowedForStatus reports whether a response with the given status code
// may carry a body. Informational (1xx), 204 and 304 responses may not (nor
// may they carry entity headers such as Content-Length or Content-Type).
// Adapted from net/http's transfer.go.
func bodyAllowedForStatus(status int) bool {
	if status >= 100 && status <= 199 {
		return false
	}
	return status != 204 && status != 304
}
355
356func (c *context) Write(b []byte) (int, error) {
357	if c.outCode == 0 {
358		c.WriteHeader(http.StatusOK)
359	}
360	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
361		return 0, http.ErrBodyNotAllowed
362	}
363	c.outBody = append(c.outBody, b...)
364	return len(b), nil
365}
366
367func (c *context) WriteHeader(code int) {
368	if c.outCode != 0 {
369		logf(c, 3, "WriteHeader called multiple times on request.") // error level
370		return
371	}
372	c.outCode = code
373}
374
375func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) {
376	hreq := &http.Request{
377		Method: "POST",
378		URL:    c.apiURL,
379		Header: http.Header{
380			apiEndpointHeader: apiEndpointHeaderValue,
381			apiMethodHeader:   apiMethodHeaderValue,
382			apiContentType:    apiContentTypeValue,
383			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
384		},
385		Body:          ioutil.NopCloser(bytes.NewReader(body)),
386		ContentLength: int64(len(body)),
387		Host:          c.apiURL.Host,
388	}
389	if info := c.req.Header.Get(dapperHeader); info != "" {
390		hreq.Header.Set(dapperHeader, info)
391	}
392	if info := c.req.Header.Get(traceHeader); info != "" {
393		hreq.Header.Set(traceHeader, info)
394	}
395
396	tr := apiHTTPClient.Transport.(*http.Transport)
397
398	var timedOut int32 // atomic; set to 1 if timed out
399	t := time.AfterFunc(timeout, func() {
400		atomic.StoreInt32(&timedOut, 1)
401		tr.CancelRequest(hreq)
402	})
403	defer t.Stop()
404	defer func() {
405		// Check if timeout was exceeded.
406		if atomic.LoadInt32(&timedOut) != 0 {
407			err = errTimeout
408		}
409	}()
410
411	hresp, err := apiHTTPClient.Do(hreq)
412	if err != nil {
413		return nil, &CallError{
414			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
415			Code:   int32(remotepb.RpcError_UNKNOWN),
416		}
417	}
418	defer hresp.Body.Close()
419	hrespBody, err := ioutil.ReadAll(hresp.Body)
420	if hresp.StatusCode != 200 {
421		return nil, &CallError{
422			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
423			Code:   int32(remotepb.RpcError_UNKNOWN),
424		}
425	}
426	if err != nil {
427		return nil, &CallError{
428			Detail: fmt.Sprintf("service bridge response bad: %v", err),
429			Code:   int32(remotepb.RpcError_UNKNOWN),
430		}
431	}
432	return hrespBody, nil
433}
434
435func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error {
436	if ns := NamespaceFromContext(ctx); ns != "" {
437		if fn, ok := NamespaceMods[service]; ok {
438			fn(in, ns)
439		}
440	}
441
442	if f, ctx, ok := callOverrideFromContext(ctx); ok {
443		return f(ctx, service, method, in, out)
444	}
445
446	// Handle already-done contexts quickly.
447	select {
448	case <-ctx.Done():
449		return ctx.Err()
450	default:
451	}
452
453	c := fromContext(ctx)
454	if c == nil {
455		// Give a good error message rather than a panic lower down.
456		return errNotAppEngineContext
457	}
458
459	// Apply transaction modifications if we're in a transaction.
460	if t := transactionFromContext(ctx); t != nil {
461		if t.finished {
462			return errors.New("transaction context has expired")
463		}
464		applyTransaction(in, &t.transaction)
465	}
466
467	// Default RPC timeout is 60s.
468	timeout := 60 * time.Second
469	if deadline, ok := ctx.Deadline(); ok {
470		timeout = deadline.Sub(time.Now())
471	}
472
473	data, err := proto.Marshal(in)
474	if err != nil {
475		return err
476	}
477
478	ticket := c.req.Header.Get(ticketHeader)
479	// Use a test ticket under test environment.
480	if ticket == "" {
481		if appid := ctx.Value(&appIDOverrideKey); appid != nil {
482			ticket = appid.(string) + defaultTicketSuffix
483		}
484	}
485	// Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver.
486	if ticket == "" {
487		ticket = DefaultTicket()
488	}
489	req := &remotepb.Request{
490		ServiceName: &service,
491		Method:      &method,
492		Request:     data,
493		RequestId:   &ticket,
494	}
495	hreqBody, err := proto.Marshal(req)
496	if err != nil {
497		return err
498	}
499
500	hrespBody, err := c.post(hreqBody, timeout)
501	if err != nil {
502		return err
503	}
504
505	res := &remotepb.Response{}
506	if err := proto.Unmarshal(hrespBody, res); err != nil {
507		return err
508	}
509	if res.RpcError != nil {
510		ce := &CallError{
511			Detail: res.RpcError.GetDetail(),
512			Code:   *res.RpcError.Code,
513		}
514		switch remotepb.RpcError_ErrorCode(ce.Code) {
515		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
516			ce.Timeout = true
517		}
518		return ce
519	}
520	if res.ApplicationError != nil {
521		return &APIError{
522			Service: *req.ServiceName,
523			Detail:  res.ApplicationError.GetDetail(),
524			Code:    *res.ApplicationError.Code,
525		}
526	}
527	if res.Exception != nil || res.JavaException != nil {
528		// This shouldn't happen, but let's be defensive.
529		return &CallError{
530			Detail: "service bridge returned exception",
531			Code:   int32(remotepb.RpcError_UNKNOWN),
532		}
533	}
534	return proto.Unmarshal(res.Response, out)
535}
536
// Request returns the HTTP request this context was created for.
func (c *context) Request() *http.Request {
	return c.req
}
540
541func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
542	// Truncate long log lines.
543	// TODO(dsymonds): Check if this is still necessary.
544	const lim = 8 << 10
545	if len(*ll.Message) > lim {
546		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
547		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
548	}
549
550	c.pendingLogs.Lock()
551	c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
552	c.pendingLogs.Unlock()
553}
554
// logLevelName maps the numeric log levels accepted by logf to the names
// used as prefixes when the message is echoed through the standard logger.
var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}
562
563func logf(c *context, level int64, format string, args ...interface{}) {
564	if c == nil {
565		panic("not an App Engine context")
566	}
567	s := fmt.Sprintf(format, args...)
568	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
569	c.addLogLine(&logpb.UserAppLogLine{
570		TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
571		Level:         &level,
572		Message:       &s,
573	})
574	log.Print(logLevelName[level] + ": " + s)
575}
576
577// flushLog attempts to flush any pending logs to the appserver.
578// It should not be called concurrently.
579func (c *context) flushLog(force bool) (flushed bool) {
580	c.pendingLogs.Lock()
581	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
582	n, rem := 0, 30<<20
583	for ; n < len(c.pendingLogs.lines); n++ {
584		ll := c.pendingLogs.lines[n]
585		// Each log line will require about 3 bytes of overhead.
586		nb := proto.Size(ll) + 3
587		if nb > rem {
588			break
589		}
590		rem -= nb
591	}
592	lines := c.pendingLogs.lines[:n]
593	c.pendingLogs.lines = c.pendingLogs.lines[n:]
594	c.pendingLogs.Unlock()
595
596	if len(lines) == 0 && !force {
597		// Nothing to flush.
598		return false
599	}
600
601	rescueLogs := false
602	defer func() {
603		if rescueLogs {
604			c.pendingLogs.Lock()
605			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
606			c.pendingLogs.Unlock()
607		}
608	}()
609
610	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
611		LogLine: lines,
612	})
613	if err != nil {
614		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
615		rescueLogs = true
616		return false
617	}
618
619	req := &logpb.FlushRequest{
620		Logs: buf,
621	}
622	res := &basepb.VoidProto{}
623	c.pendingLogs.Lock()
624	c.pendingLogs.flushes++
625	c.pendingLogs.Unlock()
626	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
627		log.Printf("internal.flushLog: Flush RPC: %v", err)
628		rescueLogs = true
629		return false
630	}
631	return true
632}
633
// Log flushing parameters.
const (
	// flushInterval is the period at which logFlusher attempts a flush.
	flushInterval = time.Second
	// forceFlushInterval is the time since the last successful flush after
	// which logFlusher flushes even when nothing is pending.
	forceFlushInterval = 60 * time.Second
)
639
640func (c *context) logFlusher(stop <-chan int) {
641	lastFlush := time.Now()
642	tick := time.NewTicker(flushInterval)
643	for {
644		select {
645		case <-stop:
646			// Request finished.
647			tick.Stop()
648			return
649		case <-tick.C:
650			force := time.Now().Sub(lastFlush) > forceFlushInterval
651			if c.flushLog(force) {
652				lastFlush = time.Now()
653			}
654		}
655	}
656}
657
// ContextForTesting returns a context wrapping req. The underlying *context
// has no apiURL set.
func ContextForTesting(req *http.Request) netcontext.Context {
	return toContext(&context{req: req})
}
661