1 /*
2 * Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11
12 #if defined(WEBRTC_WIN)
13 #include "webrtc/base/win32.h"
14 #else // !WEBRTC_WIN
15 #define SEC_E_CERT_EXPIRED (-2146893016)
16 #endif // !WEBRTC_WIN
17
18 #include "webrtc/base/common.h"
19 #include "webrtc/base/httpbase.h"
20 #include "webrtc/base/logging.h"
21 #include "webrtc/base/socket.h"
22 #include "webrtc/base/stringutils.h"
23 #include "webrtc/base/thread.h"
24
25 namespace rtc {
26
27 //////////////////////////////////////////////////////////////////////
28 // Helpers
29 //////////////////////////////////////////////////////////////////////
30
MatchHeader(const char * str,size_t len,HttpHeader header)31 bool MatchHeader(const char* str, size_t len, HttpHeader header) {
32 const char* const header_str = ToString(header);
33 const size_t header_len = strlen(header_str);
34 return (len == header_len) && (_strnicmp(str, header_str, header_len) == 0);
35 }
36
// File-local anonymous enum. MSG_READ is not referenced by any other code
// visible in this file.
enum { MSG_READ };
40
41 //////////////////////////////////////////////////////////////////////
42 // HttpParser
43 //////////////////////////////////////////////////////////////////////
44
HttpParser()45 HttpParser::HttpParser() {
46 reset();
47 }
48
~HttpParser()49 HttpParser::~HttpParser() {
50 }
51
52 void
reset()53 HttpParser::reset() {
54 state_ = ST_LEADER;
55 chunked_ = false;
56 data_size_ = SIZE_UNKNOWN;
57 }
58
59 HttpParser::ProcessResult
Process(const char * buffer,size_t len,size_t * processed,HttpError * error)60 HttpParser::Process(const char* buffer, size_t len, size_t* processed,
61 HttpError* error) {
62 *processed = 0;
63 *error = HE_NONE;
64
65 if (state_ >= ST_COMPLETE) {
66 ASSERT(false);
67 return PR_COMPLETE;
68 }
69
70 while (true) {
71 if (state_ < ST_DATA) {
72 size_t pos = *processed;
73 while ((pos < len) && (buffer[pos] != '\n')) {
74 pos += 1;
75 }
76 if (pos >= len) {
77 break; // don't have a full header
78 }
79 const char* line = buffer + *processed;
80 size_t len = (pos - *processed);
81 *processed = pos + 1;
82 while ((len > 0) && isspace(static_cast<unsigned char>(line[len-1]))) {
83 len -= 1;
84 }
85 ProcessResult result = ProcessLine(line, len, error);
86 LOG(LS_VERBOSE) << "Processed line, result=" << result;
87
88 if (PR_CONTINUE != result) {
89 return result;
90 }
91 } else if (data_size_ == 0) {
92 if (chunked_) {
93 state_ = ST_CHUNKTERM;
94 } else {
95 return PR_COMPLETE;
96 }
97 } else {
98 size_t available = len - *processed;
99 if (available <= 0) {
100 break; // no more data
101 }
102 if ((data_size_ != SIZE_UNKNOWN) && (available > data_size_)) {
103 available = data_size_;
104 }
105 size_t read = 0;
106 ProcessResult result = ProcessData(buffer + *processed, available, read,
107 error);
108 LOG(LS_VERBOSE) << "Processed data, result: " << result << " read: "
109 << read << " err: " << error;
110
111 if (PR_CONTINUE != result) {
112 return result;
113 }
114 *processed += read;
115 if (data_size_ != SIZE_UNKNOWN) {
116 data_size_ -= read;
117 }
118 }
119 }
120
121 return PR_CONTINUE;
122 }
123
124 HttpParser::ProcessResult
ProcessLine(const char * line,size_t len,HttpError * error)125 HttpParser::ProcessLine(const char* line, size_t len, HttpError* error) {
126 LOG_F(LS_VERBOSE) << " state: " << state_ << " line: "
127 << std::string(line, len) << " len: " << len << " err: "
128 << error;
129
130 switch (state_) {
131 case ST_LEADER:
132 state_ = ST_HEADERS;
133 return ProcessLeader(line, len, error);
134
135 case ST_HEADERS:
136 if (len > 0) {
137 const char* value = strchrn(line, len, ':');
138 if (!value) {
139 *error = HE_PROTOCOL;
140 return PR_COMPLETE;
141 }
142 size_t nlen = (value - line);
143 const char* eol = line + len;
144 do {
145 value += 1;
146 } while ((value < eol) && isspace(static_cast<unsigned char>(*value)));
147 size_t vlen = eol - value;
148 if (MatchHeader(line, nlen, HH_CONTENT_LENGTH)) {
149 // sscanf isn't safe with strings that aren't null-terminated, and there
150 // is no guarantee that |value| is.
151 // Create a local copy that is null-terminated.
152 std::string value_str(value, vlen);
153 unsigned int temp_size;
154 if (sscanf(value_str.c_str(), "%u", &temp_size) != 1) {
155 *error = HE_PROTOCOL;
156 return PR_COMPLETE;
157 }
158 data_size_ = static_cast<size_t>(temp_size);
159 } else if (MatchHeader(line, nlen, HH_TRANSFER_ENCODING)) {
160 if ((vlen == 7) && (_strnicmp(value, "chunked", 7) == 0)) {
161 chunked_ = true;
162 } else if ((vlen == 8) && (_strnicmp(value, "identity", 8) == 0)) {
163 chunked_ = false;
164 } else {
165 *error = HE_PROTOCOL;
166 return PR_COMPLETE;
167 }
168 }
169 return ProcessHeader(line, nlen, value, vlen, error);
170 } else {
171 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
172 return ProcessHeaderComplete(chunked_, data_size_, error);
173 }
174 break;
175
176 case ST_CHUNKSIZE:
177 if (len > 0) {
178 char* ptr = NULL;
179 data_size_ = strtoul(line, &ptr, 16);
180 if (ptr != line + len) {
181 *error = HE_PROTOCOL;
182 return PR_COMPLETE;
183 }
184 state_ = (data_size_ == 0) ? ST_TRAILERS : ST_DATA;
185 } else {
186 *error = HE_PROTOCOL;
187 return PR_COMPLETE;
188 }
189 break;
190
191 case ST_CHUNKTERM:
192 if (len > 0) {
193 *error = HE_PROTOCOL;
194 return PR_COMPLETE;
195 } else {
196 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
197 }
198 break;
199
200 case ST_TRAILERS:
201 if (len == 0) {
202 return PR_COMPLETE;
203 }
204 // *error = onHttpRecvTrailer();
205 break;
206
207 default:
208 ASSERT(false);
209 break;
210 }
211
212 return PR_CONTINUE;
213 }
214
215 bool
is_valid_end_of_input() const216 HttpParser::is_valid_end_of_input() const {
217 return (state_ == ST_DATA) && (data_size_ == SIZE_UNKNOWN);
218 }
219
220 void
complete(HttpError error)221 HttpParser::complete(HttpError error) {
222 if (state_ < ST_COMPLETE) {
223 state_ = ST_COMPLETE;
224 OnComplete(error);
225 }
226 }
227
228 //////////////////////////////////////////////////////////////////////
229 // HttpBase::DocumentStream
230 //////////////////////////////////////////////////////////////////////
231
232 class BlockingMemoryStream : public ExternalMemoryStream {
233 public:
BlockingMemoryStream(char * buffer,size_t size)234 BlockingMemoryStream(char* buffer, size_t size)
235 : ExternalMemoryStream(buffer, size) { }
236
DoReserve(size_t size,int * error)237 virtual StreamResult DoReserve(size_t size, int* error) {
238 return (buffer_length_ >= size) ? SR_SUCCESS : SR_BLOCK;
239 }
240 };
241
242 class HttpBase::DocumentStream : public StreamInterface {
243 public:
DocumentStream(HttpBase * base)244 DocumentStream(HttpBase* base) : base_(base), error_(HE_DEFAULT) { }
245
GetState() const246 virtual StreamState GetState() const {
247 if (NULL == base_)
248 return SS_CLOSED;
249 if (HM_RECV == base_->mode_)
250 return SS_OPEN;
251 return SS_OPENING;
252 }
253
Read(void * buffer,size_t buffer_len,size_t * read,int * error)254 virtual StreamResult Read(void* buffer, size_t buffer_len,
255 size_t* read, int* error) {
256 if (!base_) {
257 if (error) *error = error_;
258 return (HE_NONE == error_) ? SR_EOS : SR_ERROR;
259 }
260
261 if (HM_RECV != base_->mode_) {
262 return SR_BLOCK;
263 }
264
265 // DoReceiveLoop writes http document data to the StreamInterface* document
266 // member of HttpData. In this case, we want this data to be written
267 // directly to our buffer. To accomplish this, we wrap our buffer with a
268 // StreamInterface, and replace the existing document with our wrapper.
269 // When the method returns, we restore the old document. Ideally, we would
270 // pass our StreamInterface* to DoReceiveLoop, but due to the callbacks
271 // of HttpParser, we would still need to store the pointer temporarily.
272 scoped_ptr<StreamInterface>
273 stream(new BlockingMemoryStream(reinterpret_cast<char*>(buffer),
274 buffer_len));
275
276 // Replace the existing document with our wrapped buffer.
277 base_->data_->document.swap(stream);
278
279 // Pump the I/O loop. DoReceiveLoop is guaranteed not to attempt to
280 // complete the I/O process, which means that our wrapper is not in danger
281 // of being deleted. To ensure this, DoReceiveLoop returns true when it
282 // wants complete to be called. We make sure to uninstall our wrapper
283 // before calling complete().
284 HttpError http_error;
285 bool complete = base_->DoReceiveLoop(&http_error);
286
287 // Reinstall the original output document.
288 base_->data_->document.swap(stream);
289
290 // If we reach the end of the receive stream, we disconnect our stream
291 // adapter from the HttpBase, and further calls to read will either return
292 // EOS or ERROR, appropriately. Finally, we call complete().
293 StreamResult result = SR_BLOCK;
294 if (complete) {
295 HttpBase* base = Disconnect(http_error);
296 if (error) *error = error_;
297 result = (HE_NONE == error_) ? SR_EOS : SR_ERROR;
298 base->complete(http_error);
299 }
300
301 // Even if we are complete, if some data was read we must return SUCCESS.
302 // Future Reads will return EOS or ERROR based on the error_ variable.
303 size_t position;
304 stream->GetPosition(&position);
305 if (position > 0) {
306 if (read) *read = position;
307 result = SR_SUCCESS;
308 }
309 return result;
310 }
311
Write(const void * data,size_t data_len,size_t * written,int * error)312 virtual StreamResult Write(const void* data, size_t data_len,
313 size_t* written, int* error) {
314 if (error) *error = -1;
315 return SR_ERROR;
316 }
317
Close()318 virtual void Close() {
319 if (base_) {
320 HttpBase* base = Disconnect(HE_NONE);
321 if (HM_RECV == base->mode_ && base->http_stream_) {
322 // Read I/O could have been stalled on the user of this DocumentStream,
323 // so restart the I/O process now that we've removed ourselves.
324 base->http_stream_->PostEvent(SE_READ, 0);
325 }
326 }
327 }
328
GetAvailable(size_t * size) const329 virtual bool GetAvailable(size_t* size) const {
330 if (!base_ || HM_RECV != base_->mode_)
331 return false;
332 size_t data_size = base_->GetDataRemaining();
333 if (SIZE_UNKNOWN == data_size)
334 return false;
335 if (size)
336 *size = data_size;
337 return true;
338 }
339
Disconnect(HttpError error)340 HttpBase* Disconnect(HttpError error) {
341 ASSERT(NULL != base_);
342 ASSERT(NULL != base_->doc_stream_);
343 HttpBase* base = base_;
344 base_->doc_stream_ = NULL;
345 base_ = NULL;
346 error_ = error;
347 return base;
348 }
349
350 private:
351 HttpBase* base_;
352 HttpError error_;
353 };
354
355 //////////////////////////////////////////////////////////////////////
356 // HttpBase
357 //////////////////////////////////////////////////////////////////////
358
HttpBase()359 HttpBase::HttpBase() : mode_(HM_NONE), data_(NULL), notify_(NULL),
360 http_stream_(NULL), doc_stream_(NULL) {
361 }
362
~HttpBase()363 HttpBase::~HttpBase() {
364 ASSERT(HM_NONE == mode_);
365 }
366
367 bool
isConnected() const368 HttpBase::isConnected() const {
369 return (http_stream_ != NULL) && (http_stream_->GetState() == SS_OPEN);
370 }
371
372 bool
attach(StreamInterface * stream)373 HttpBase::attach(StreamInterface* stream) {
374 if ((mode_ != HM_NONE) || (http_stream_ != NULL) || (stream == NULL)) {
375 ASSERT(false);
376 return false;
377 }
378 http_stream_ = stream;
379 http_stream_->SignalEvent.connect(this, &HttpBase::OnHttpStreamEvent);
380 mode_ = (http_stream_->GetState() == SS_OPENING) ? HM_CONNECT : HM_NONE;
381 return true;
382 }
383
384 StreamInterface*
detach()385 HttpBase::detach() {
386 ASSERT(HM_NONE == mode_);
387 if (mode_ != HM_NONE) {
388 return NULL;
389 }
390 StreamInterface* stream = http_stream_;
391 http_stream_ = NULL;
392 if (stream) {
393 stream->SignalEvent.disconnect(this);
394 }
395 return stream;
396 }
397
398 void
send(HttpData * data)399 HttpBase::send(HttpData* data) {
400 ASSERT(HM_NONE == mode_);
401 if (mode_ != HM_NONE) {
402 return;
403 } else if (!isConnected()) {
404 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
405 return;
406 }
407
408 mode_ = HM_SEND;
409 data_ = data;
410 len_ = 0;
411 ignore_data_ = chunk_data_ = false;
412
413 if (data_->document) {
414 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
415 }
416
417 std::string encoding;
418 if (data_->hasHeader(HH_TRANSFER_ENCODING, &encoding)
419 && (encoding == "chunked")) {
420 chunk_data_ = true;
421 }
422
423 len_ = data_->formatLeader(buffer_, sizeof(buffer_));
424 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
425
426 header_ = data_->begin();
427 if (header_ == data_->end()) {
428 // We must call this at least once, in the case where there are no headers.
429 queue_headers();
430 }
431
432 flush_data();
433 }
434
435 void
recv(HttpData * data)436 HttpBase::recv(HttpData* data) {
437 ASSERT(HM_NONE == mode_);
438 if (mode_ != HM_NONE) {
439 return;
440 } else if (!isConnected()) {
441 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
442 return;
443 }
444
445 mode_ = HM_RECV;
446 data_ = data;
447 len_ = 0;
448 ignore_data_ = chunk_data_ = false;
449
450 reset();
451 if (doc_stream_) {
452 doc_stream_->SignalEvent(doc_stream_, SE_OPEN | SE_READ, 0);
453 } else {
454 read_and_process_data();
455 }
456 }
457
458 void
abort(HttpError err)459 HttpBase::abort(HttpError err) {
460 if (mode_ != HM_NONE) {
461 if (http_stream_ != NULL) {
462 http_stream_->Close();
463 }
464 do_complete(err);
465 }
466 }
467
GetDocumentStream()468 StreamInterface* HttpBase::GetDocumentStream() {
469 if (doc_stream_)
470 return NULL;
471 doc_stream_ = new DocumentStream(this);
472 return doc_stream_;
473 }
474
HandleStreamClose(int error)475 HttpError HttpBase::HandleStreamClose(int error) {
476 if (http_stream_ != NULL) {
477 http_stream_->Close();
478 }
479 if (error == 0) {
480 if ((mode_ == HM_RECV) && is_valid_end_of_input()) {
481 return HE_NONE;
482 } else {
483 return HE_DISCONNECTED;
484 }
485 } else if (error == SOCKET_EACCES) {
486 return HE_AUTH;
487 } else if (error == SEC_E_CERT_EXPIRED) {
488 return HE_CERTIFICATE_EXPIRED;
489 }
490 LOG_F(LS_ERROR) << "(" << error << ")";
491 return (HM_CONNECT == mode_) ? HE_CONNECT_FAILED : HE_SOCKET_ERROR;
492 }
493
DoReceiveLoop(HttpError * error)494 bool HttpBase::DoReceiveLoop(HttpError* error) {
495 ASSERT(HM_RECV == mode_);
496 ASSERT(NULL != error);
497
498 // Do to the latency between receiving read notifications from
499 // pseudotcpchannel, we rely on repeated calls to read in order to acheive
500 // ideal throughput. The number of reads is limited to prevent starving
501 // the caller.
502
503 size_t loop_count = 0;
504 const size_t kMaxReadCount = 20;
505 bool process_requires_more_data = false;
506 do {
507 // The most frequent use of this function is response to new data available
508 // on http_stream_. Therefore, we optimize by attempting to read from the
509 // network first (as opposed to processing existing data first).
510
511 if (len_ < sizeof(buffer_)) {
512 // Attempt to buffer more data.
513 size_t read;
514 int read_error;
515 StreamResult read_result = http_stream_->Read(buffer_ + len_,
516 sizeof(buffer_) - len_,
517 &read, &read_error);
518 switch (read_result) {
519 case SR_SUCCESS:
520 ASSERT(len_ + read <= sizeof(buffer_));
521 len_ += read;
522 break;
523 case SR_BLOCK:
524 if (process_requires_more_data) {
525 // We're can't make progress until more data is available.
526 return false;
527 }
528 // Attempt to process the data already in our buffer.
529 break;
530 case SR_EOS:
531 // Clean close, with no error. Fall through to HandleStreamClose.
532 read_error = 0;
533 case SR_ERROR:
534 *error = HandleStreamClose(read_error);
535 return true;
536 }
537 } else if (process_requires_more_data) {
538 // We have too much unprocessed data in our buffer. This should only
539 // occur when a single HTTP header is longer than the buffer size (32K).
540 // Anything longer than that is almost certainly an error.
541 *error = HE_OVERFLOW;
542 return true;
543 }
544
545 // Process data in our buffer. Process is not guaranteed to process all
546 // the buffered data. In particular, it will wait until a complete
547 // protocol element (such as http header, or chunk size) is available,
548 // before processing it in its entirety. Also, it is valid and sometimes
549 // necessary to call Process with an empty buffer, since the state machine
550 // may have interrupted state transitions to complete.
551 size_t processed;
552 ProcessResult process_result = Process(buffer_, len_, &processed,
553 error);
554 ASSERT(processed <= len_);
555 len_ -= processed;
556 memmove(buffer_, buffer_ + processed, len_);
557 switch (process_result) {
558 case PR_CONTINUE:
559 // We need more data to make progress.
560 process_requires_more_data = true;
561 break;
562 case PR_BLOCK:
563 // We're stalled on writing the processed data.
564 return false;
565 case PR_COMPLETE:
566 // *error already contains the correct code.
567 return true;
568 }
569 } while (++loop_count <= kMaxReadCount);
570
571 LOG_F(LS_WARNING) << "danger of starvation";
572 return false;
573 }
574
575 void
read_and_process_data()576 HttpBase::read_and_process_data() {
577 HttpError error;
578 if (DoReceiveLoop(&error)) {
579 complete(error);
580 }
581 }
582
583 void
flush_data()584 HttpBase::flush_data() {
585 ASSERT(HM_SEND == mode_);
586
587 // When send_required is true, no more buffering can occur without a network
588 // write.
589 bool send_required = (len_ >= sizeof(buffer_));
590
591 while (true) {
592 ASSERT(len_ <= sizeof(buffer_));
593
594 // HTTP is inherently sensitive to round trip latency, since a frequent use
595 // case is for small requests and responses to be sent back and forth, and
596 // the lack of pipelining forces a single request to take a minimum of the
597 // round trip time. As a result, it is to our benefit to pack as much data
598 // into each packet as possible. Thus, we defer network writes until we've
599 // buffered as much data as possible.
600
601 if (!send_required && (header_ != data_->end())) {
602 // First, attempt to queue more header data.
603 send_required = queue_headers();
604 }
605
606 if (!send_required && data_->document) {
607 // Next, attempt to queue document data.
608
609 const size_t kChunkDigits = 8;
610 size_t offset, reserve;
611 if (chunk_data_) {
612 // Reserve characters at the start for X-byte hex value and \r\n
613 offset = len_ + kChunkDigits + 2;
614 // ... and 2 characters at the end for \r\n
615 reserve = offset + 2;
616 } else {
617 offset = len_;
618 reserve = offset;
619 }
620
621 if (reserve >= sizeof(buffer_)) {
622 send_required = true;
623 } else {
624 size_t read;
625 int error;
626 StreamResult result = data_->document->Read(buffer_ + offset,
627 sizeof(buffer_) - reserve,
628 &read, &error);
629 if (result == SR_SUCCESS) {
630 ASSERT(reserve + read <= sizeof(buffer_));
631 if (chunk_data_) {
632 // Prepend the chunk length in hex.
633 // Note: sprintfn appends a null terminator, which is why we can't
634 // combine it with the line terminator.
635 sprintfn(buffer_ + len_, kChunkDigits + 1, "%.*x",
636 kChunkDigits, read);
637 // Add line terminator to the chunk length.
638 memcpy(buffer_ + len_ + kChunkDigits, "\r\n", 2);
639 // Add line terminator to the end of the chunk.
640 memcpy(buffer_ + offset + read, "\r\n", 2);
641 }
642 len_ = reserve + read;
643 } else if (result == SR_BLOCK) {
644 // Nothing to do but flush data to the network.
645 send_required = true;
646 } else if (result == SR_EOS) {
647 if (chunk_data_) {
648 // Append the empty chunk and empty trailers, then turn off
649 // chunking.
650 ASSERT(len_ + 5 <= sizeof(buffer_));
651 memcpy(buffer_ + len_, "0\r\n\r\n", 5);
652 len_ += 5;
653 chunk_data_ = false;
654 } else if (0 == len_) {
655 // No more data to read, and no more data to write.
656 do_complete();
657 return;
658 }
659 // Although we are done reading data, there is still data which needs
660 // to be flushed to the network.
661 send_required = true;
662 } else {
663 LOG_F(LS_ERROR) << "Read error: " << error;
664 do_complete(HE_STREAM);
665 return;
666 }
667 }
668 }
669
670 if (0 == len_) {
671 // No data currently available to send.
672 if (!data_->document) {
673 // If there is no source document, that means we're done.
674 do_complete();
675 }
676 return;
677 }
678
679 size_t written;
680 int error;
681 StreamResult result = http_stream_->Write(buffer_, len_, &written, &error);
682 if (result == SR_SUCCESS) {
683 ASSERT(written <= len_);
684 len_ -= written;
685 memmove(buffer_, buffer_ + written, len_);
686 send_required = false;
687 } else if (result == SR_BLOCK) {
688 if (send_required) {
689 // Nothing more we can do until network is writeable.
690 return;
691 }
692 } else {
693 ASSERT(result == SR_ERROR);
694 LOG_F(LS_ERROR) << "error";
695 OnHttpStreamEvent(http_stream_, SE_CLOSE, error);
696 return;
697 }
698 }
699
700 ASSERT(false);
701 }
702
703 bool
queue_headers()704 HttpBase::queue_headers() {
705 ASSERT(HM_SEND == mode_);
706 while (header_ != data_->end()) {
707 size_t len = sprintfn(buffer_ + len_, sizeof(buffer_) - len_,
708 "%.*s: %.*s\r\n",
709 header_->first.size(), header_->first.data(),
710 header_->second.size(), header_->second.data());
711 if (len_ + len < sizeof(buffer_) - 3) {
712 len_ += len;
713 ++header_;
714 } else if (len_ == 0) {
715 LOG(WARNING) << "discarding header that is too long: " << header_->first;
716 ++header_;
717 } else {
718 // Not enough room for the next header, write to network first.
719 return true;
720 }
721 }
722 // End of headers
723 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
724 return false;
725 }
726
727 void
do_complete(HttpError err)728 HttpBase::do_complete(HttpError err) {
729 ASSERT(mode_ != HM_NONE);
730 HttpMode mode = mode_;
731 mode_ = HM_NONE;
732 if (data_ && data_->document) {
733 data_->document->SignalEvent.disconnect(this);
734 }
735 data_ = NULL;
736 if ((HM_RECV == mode) && doc_stream_) {
737 ASSERT(HE_NONE != err); // We should have Disconnected doc_stream_ already.
738 DocumentStream* ds = doc_stream_;
739 ds->Disconnect(err);
740 ds->SignalEvent(ds, SE_CLOSE, err);
741 }
742 if (notify_) {
743 notify_->onHttpComplete(mode, err);
744 }
745 }
746
747 //
748 // Stream Signals
749 //
750
751 void
OnHttpStreamEvent(StreamInterface * stream,int events,int error)752 HttpBase::OnHttpStreamEvent(StreamInterface* stream, int events, int error) {
753 ASSERT(stream == http_stream_);
754 if ((events & SE_OPEN) && (mode_ == HM_CONNECT)) {
755 do_complete();
756 return;
757 }
758
759 if ((events & SE_WRITE) && (mode_ == HM_SEND)) {
760 flush_data();
761 return;
762 }
763
764 if ((events & SE_READ) && (mode_ == HM_RECV)) {
765 if (doc_stream_) {
766 doc_stream_->SignalEvent(doc_stream_, SE_READ, 0);
767 } else {
768 read_and_process_data();
769 }
770 return;
771 }
772
773 if ((events & SE_CLOSE) == 0)
774 return;
775
776 HttpError http_error = HandleStreamClose(error);
777 if (mode_ == HM_RECV) {
778 complete(http_error);
779 } else if (mode_ != HM_NONE) {
780 do_complete(http_error);
781 } else if (notify_) {
782 notify_->onHttpClosed(http_error);
783 }
784 }
785
786 void
OnDocumentEvent(StreamInterface * stream,int events,int error)787 HttpBase::OnDocumentEvent(StreamInterface* stream, int events, int error) {
788 ASSERT(stream == data_->document.get());
789 if ((events & SE_WRITE) && (mode_ == HM_RECV)) {
790 read_and_process_data();
791 return;
792 }
793
794 if ((events & SE_READ) && (mode_ == HM_SEND)) {
795 flush_data();
796 return;
797 }
798
799 if (events & SE_CLOSE) {
800 LOG_F(LS_ERROR) << "Read error: " << error;
801 do_complete(HE_STREAM);
802 return;
803 }
804 }
805
806 //
807 // HttpParser Implementation
808 //
809
810 HttpParser::ProcessResult
ProcessLeader(const char * line,size_t len,HttpError * error)811 HttpBase::ProcessLeader(const char* line, size_t len, HttpError* error) {
812 *error = data_->parseLeader(line, len);
813 return (HE_NONE == *error) ? PR_CONTINUE : PR_COMPLETE;
814 }
815
816 HttpParser::ProcessResult
ProcessHeader(const char * name,size_t nlen,const char * value,size_t vlen,HttpError * error)817 HttpBase::ProcessHeader(const char* name, size_t nlen, const char* value,
818 size_t vlen, HttpError* error) {
819 std::string sname(name, nlen), svalue(value, vlen);
820 data_->addHeader(sname, svalue);
821 return PR_CONTINUE;
822 }
823
824 HttpParser::ProcessResult
ProcessHeaderComplete(bool chunked,size_t & data_size,HttpError * error)825 HttpBase::ProcessHeaderComplete(bool chunked, size_t& data_size,
826 HttpError* error) {
827 StreamInterface* old_docstream = doc_stream_;
828 if (notify_) {
829 *error = notify_->onHttpHeaderComplete(chunked, data_size);
830 // The request must not be aborted as a result of this callback.
831 ASSERT(NULL != data_);
832 }
833 if ((HE_NONE == *error) && data_->document) {
834 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
835 }
836 if (HE_NONE != *error) {
837 return PR_COMPLETE;
838 }
839 if (old_docstream != doc_stream_) {
840 // Break out of Process loop, since our I/O model just changed.
841 return PR_BLOCK;
842 }
843 return PR_CONTINUE;
844 }
845
846 HttpParser::ProcessResult
ProcessData(const char * data,size_t len,size_t & read,HttpError * error)847 HttpBase::ProcessData(const char* data, size_t len, size_t& read,
848 HttpError* error) {
849 if (ignore_data_ || !data_->document) {
850 read = len;
851 return PR_CONTINUE;
852 }
853 int write_error = 0;
854 switch (data_->document->Write(data, len, &read, &write_error)) {
855 case SR_SUCCESS:
856 return PR_CONTINUE;
857 case SR_BLOCK:
858 return PR_BLOCK;
859 case SR_EOS:
860 LOG_F(LS_ERROR) << "Unexpected EOS";
861 *error = HE_STREAM;
862 return PR_COMPLETE;
863 case SR_ERROR:
864 default:
865 LOG_F(LS_ERROR) << "Write error: " << write_error;
866 *error = HE_STREAM;
867 return PR_COMPLETE;
868 }
869 }
870
871 void
OnComplete(HttpError err)872 HttpBase::OnComplete(HttpError err) {
873 LOG_F(LS_VERBOSE);
874 do_complete(err);
875 }
876
877 } // namespace rtc
878