• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * libjingle
3  * Copyright 2004--2006, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27 
28 #include <string>
29 #include "talk/base/basictypes.h"
30 #include "talk/base/common.h"
31 #include "talk/base/logging.h"
32 #include "talk/base/scoped_ptr.h"
33 #include "talk/base/stringutils.h"
34 #include "talk/p2p/base/transportchannel.h"
35 #include "pseudotcpchannel.h"
36 
37 using namespace talk_base;
38 
39 namespace cricket {
40 
41 extern const talk_base::ConstantLabel SESSION_STATES[];
42 
43 // MSG_WK_* - worker thread messages
44 // MSG_ST_* - stream thread messages
45 // MSG_SI_* - signal thread messages
46 
// Message identifiers handled by PseudoTcpChannel::OnMessage.  The prefix
// names the thread that processes the message: WK = worker thread,
// ST = stream thread, SI = signal thread.
enum {
  MSG_WK_CLOCK = 1,
  MSG_WK_PURGE = 2,
  MSG_ST_EVENT = 3,
  MSG_SI_DESTROYCHANNEL = 4,
  MSG_SI_DESTROY = 5,
};
54 
55 struct EventData : public MessageData {
56   int event, error;
EventDatacricket::EventData57   EventData(int ev, int err = 0) : event(ev), error(err) { }
58 };
59 
60 ///////////////////////////////////////////////////////////////////////////////
61 // PseudoTcpChannel::InternalStream
62 ///////////////////////////////////////////////////////////////////////////////
63 
64 class PseudoTcpChannel::InternalStream : public StreamInterface {
65 public:
66   InternalStream(PseudoTcpChannel* parent);
67   virtual ~InternalStream();
68 
69   virtual StreamState GetState() const;
70   virtual StreamResult Read(void* buffer, size_t buffer_len,
71                                        size_t* read, int* error);
72   virtual StreamResult Write(const void* data, size_t data_len,
73                                         size_t* written, int* error);
74   virtual void Close();
75 
76 private:
77   // parent_ is accessed and modified exclusively on the event thread, to
78   // avoid thread contention.  This means that the PseudoTcpChannel cannot go
79   // away until after it receives a Close() from TunnelStream.
80   PseudoTcpChannel* parent_;
81 };
82 
83 ///////////////////////////////////////////////////////////////////////////////
84 // PseudoTcpChannel
85 // Member object lifetime summaries:
86 //   session_ - passed in constructor, cleared when channel_ goes away.
87 //   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
88 //   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
89 //     closes.
90 //   worker_thread_ - created when channel_ is created, purged when channel_ is
91 //     destroyed.
92 //   stream_ - created in GetStream, destroyed by owner at arbitrary time.
93 //   this - created in constructor, destroyed when worker_thread_ and stream_
94 //     are both gone.
95 ///////////////////////////////////////////////////////////////////////////////
96 
97 //
98 // Signal thread methods
99 //
100 
PseudoTcpChannel(Thread * stream_thread,Session * session)101 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
102   : signal_thread_(session->session_manager()->signaling_thread()),
103     worker_thread_(NULL),
104     stream_thread_(stream_thread),
105     session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
106     stream_readable_(false), pending_read_event_(false),
107     ready_to_connect_(false) {
108   ASSERT(signal_thread_->IsCurrent());
109   ASSERT(NULL != session_);
110 }
111 
~PseudoTcpChannel()112 PseudoTcpChannel::~PseudoTcpChannel() {
113   ASSERT(signal_thread_->IsCurrent());
114   ASSERT(worker_thread_ == NULL);
115   ASSERT(session_ == NULL);
116   ASSERT(channel_ == NULL);
117   ASSERT(stream_ == NULL);
118   ASSERT(tcp_ == NULL);
119 }
120 
Connect(const std::string & content_name,const std::string & channel_name)121 bool PseudoTcpChannel::Connect(const std::string& content_name,
122                                const std::string& channel_name) {
123   ASSERT(signal_thread_->IsCurrent());
124   CritScope lock(&cs_);
125 
126   if (channel_)
127     return false;
128 
129   ASSERT(session_ != NULL);
130   worker_thread_ = session_->session_manager()->worker_thread();
131   content_name_ = content_name;
132   channel_ = session_->CreateChannel(content_name, channel_name);
133   channel_name_ = channel_name;
134   channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
135 
136   channel_->SignalDestroyed.connect(this,
137     &PseudoTcpChannel::OnChannelDestroyed);
138   channel_->SignalWritableState.connect(this,
139     &PseudoTcpChannel::OnChannelWritableState);
140   channel_->SignalReadPacket.connect(this,
141     &PseudoTcpChannel::OnChannelRead);
142   channel_->SignalRouteChange.connect(this,
143     &PseudoTcpChannel::OnChannelConnectionChanged);
144 
145   ASSERT(tcp_ == NULL);
146   tcp_ = new PseudoTcp(this, 0);
147   if (session_->initiator()) {
148     // Since we may try several protocols and network adapters that won't work,
149     // waiting until we get our first writable notification before initiating
150     // TCP negotiation.
151     ready_to_connect_ = true;
152   }
153 
154   return true;
155 }
156 
GetStream()157 StreamInterface* PseudoTcpChannel::GetStream() {
158   ASSERT(signal_thread_->IsCurrent());
159   CritScope lock(&cs_);
160   ASSERT(NULL != session_);
161   if (!stream_)
162     stream_ = new PseudoTcpChannel::InternalStream(this);
163   //TODO("should we disallow creation of new stream at some point?");
164   return stream_;
165 }
166 
OnChannelDestroyed(TransportChannel * channel)167 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
168   LOG_F(LS_INFO) << "(" << channel->name() << ")";
169   ASSERT(signal_thread_->IsCurrent());
170   CritScope lock(&cs_);
171   ASSERT(channel == channel_);
172   signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
173   // When MSG_WK_PURGE is received, we know there will be no more messages from
174   // the worker thread.
175   worker_thread_->Clear(this, MSG_WK_CLOCK);
176   worker_thread_->Post(this, MSG_WK_PURGE);
177   session_ = NULL;
178   channel_ = NULL;
179   if ((stream_ != NULL)
180       && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
181     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
182   if (tcp_) {
183     tcp_->Close(true);
184     AdjustClock();
185   }
186   SignalChannelClosed(this);
187 }
188 
OnSessionTerminate(Session * session)189 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
190   // When the session terminates before we even connected
191   CritScope lock(&cs_);
192   if (session_ != NULL && channel_ == NULL) {
193     ASSERT(session == session_);
194     ASSERT(worker_thread_ == NULL);
195     ASSERT(tcp_ == NULL);
196     LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
197     session_ = NULL;
198     if (stream_ != NULL)
199       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
200   }
201 
202   // Even though session_ is being destroyed, we mustn't clear the pointer,
203   // since we'll need it to tear down channel_.
204   //
205   // TODO(wez): Is it always the case that if channel_ != NULL then we'll get
206   // a channel-destroyed notification?
207 }
208 
GetOption(PseudoTcp::Option opt,int * value)209 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
210   ASSERT(signal_thread_->IsCurrent());
211   CritScope lock(&cs_);
212   ASSERT(tcp_ != NULL);
213   tcp_->GetOption(opt, value);
214 }
215 
SetOption(PseudoTcp::Option opt,int value)216 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
217   ASSERT(signal_thread_->IsCurrent());
218   CritScope lock(&cs_);
219   ASSERT(tcp_ != NULL);
220   tcp_->SetOption(opt, value);
221 }
222 
223 //
224 // Stream thread methods
225 //
226 
GetState() const227 StreamState PseudoTcpChannel::GetState() const {
228   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
229   CritScope lock(&cs_);
230   if (!session_)
231     return SS_CLOSED;
232   if (!tcp_)
233     return SS_OPENING;
234   switch (tcp_->State()) {
235     case PseudoTcp::TCP_LISTEN:
236     case PseudoTcp::TCP_SYN_SENT:
237     case PseudoTcp::TCP_SYN_RECEIVED:
238       return SS_OPENING;
239     case PseudoTcp::TCP_ESTABLISHED:
240       return SS_OPEN;
241     case PseudoTcp::TCP_CLOSED:
242     default:
243       return SS_CLOSED;
244   }
245 }
246 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)247 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
248                                     size_t* read, int* error) {
249   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
250   CritScope lock(&cs_);
251   if (!tcp_)
252     return SR_BLOCK;
253 
254   stream_readable_ = false;
255   int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
256   //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
257   if (result > 0) {
258     if (read)
259       *read = result;
260     // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
261     // them here.
262     stream_readable_ = true;
263     if (!pending_read_event_) {
264       pending_read_event_ = true;
265       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
266     }
267     return SR_SUCCESS;
268   } else if (IsBlockingError(tcp_->GetError())) {
269     return SR_BLOCK;
270   } else {
271     if (error)
272       *error = tcp_->GetError();
273     return SR_ERROR;
274   }
275   // This spot is never reached.
276 }
277 
Write(const void * data,size_t data_len,size_t * written,int * error)278 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
279                                      size_t* written, int* error) {
280   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
281   CritScope lock(&cs_);
282   if (!tcp_)
283     return SR_BLOCK;
284   int result = tcp_->Send(static_cast<const char*>(data), data_len);
285   //LOG_F(LS_VERBOSE) << "Send returned: " << result;
286   if (result > 0) {
287     if (written)
288       *written = result;
289     return SR_SUCCESS;
290   } else if (IsBlockingError(tcp_->GetError())) {
291     return SR_BLOCK;
292   } else {
293     if (error)
294       *error = tcp_->GetError();
295     return SR_ERROR;
296   }
297   // This spot is never reached.
298 }
299 
Close()300 void PseudoTcpChannel::Close() {
301   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
302   CritScope lock(&cs_);
303   stream_ = NULL;
304   // Clear out any pending event notifications
305   stream_thread_->Clear(this, MSG_ST_EVENT);
306   if (tcp_) {
307     tcp_->Close(false);
308     AdjustClock();
309   } else {
310     CheckDestroy();
311   }
312 }
313 
314 //
315 // Worker thread methods
316 //
317 
OnChannelWritableState(TransportChannel * channel)318 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
319   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
320   ASSERT(worker_thread_->IsCurrent());
321   CritScope lock(&cs_);
322   if (!channel_) {
323     LOG_F(LS_WARNING) << "NULL channel";
324     return;
325   }
326   ASSERT(channel == channel_);
327   if (!tcp_) {
328     LOG_F(LS_WARNING) << "NULL tcp";
329     return;
330   }
331   if (!ready_to_connect_ || !channel->writable())
332     return;
333 
334   ready_to_connect_ = false;
335   tcp_->Connect();
336   AdjustClock();
337 }
338 
OnChannelRead(TransportChannel * channel,const char * data,size_t size)339 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
340                                      const char* data, size_t size) {
341   //LOG_F(LS_VERBOSE) << "(" << size << ")";
342   ASSERT(worker_thread_->IsCurrent());
343   CritScope lock(&cs_);
344   if (!channel_) {
345     LOG_F(LS_WARNING) << "NULL channel";
346     return;
347   }
348   ASSERT(channel == channel_);
349   if (!tcp_) {
350     LOG_F(LS_WARNING) << "NULL tcp";
351     return;
352   }
353   tcp_->NotifyPacket(data, size);
354   AdjustClock();
355 }
356 
OnChannelConnectionChanged(TransportChannel * channel,const SocketAddress & addr)357 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
358                                                   const SocketAddress& addr) {
359   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
360   ASSERT(worker_thread_->IsCurrent());
361   CritScope lock(&cs_);
362   if (!channel_) {
363     LOG_F(LS_WARNING) << "NULL channel";
364     return;
365   }
366   ASSERT(channel == channel_);
367   if (!tcp_) {
368     LOG_F(LS_WARNING) << "NULL tcp";
369     return;
370   }
371 
372   uint16 mtu = 1280;  // safe default
373   talk_base::scoped_ptr<Socket> mtu_socket(
374       worker_thread_->socketserver()->CreateSocket(SOCK_DGRAM));
375   if (mtu_socket->Connect(addr) < 0 ||
376       mtu_socket->EstimateMTU(&mtu) < 0) {
377     LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
378                       << mtu_socket->GetError();
379   }
380 
381   LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
382   tcp_->NotifyMTU(mtu);
383   AdjustClock();
384 }
385 
OnTcpOpen(PseudoTcp * tcp)386 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
387   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
388   ASSERT(cs_.CurrentThreadIsOwner());
389   ASSERT(worker_thread_->IsCurrent());
390   ASSERT(tcp == tcp_);
391   if (stream_) {
392     stream_readable_ = true;
393     pending_read_event_ = true;
394     stream_thread_->Post(this, MSG_ST_EVENT,
395                          new EventData(SE_OPEN | SE_READ | SE_WRITE));
396   }
397 }
398 
OnTcpReadable(PseudoTcp * tcp)399 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
400   //LOG_F(LS_VERBOSE);
401   ASSERT(cs_.CurrentThreadIsOwner());
402   ASSERT(worker_thread_->IsCurrent());
403   ASSERT(tcp == tcp_);
404   if (stream_) {
405     stream_readable_ = true;
406     if (!pending_read_event_) {
407       pending_read_event_ = true;
408       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
409     }
410   }
411 }
412 
OnTcpWriteable(PseudoTcp * tcp)413 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
414   //LOG_F(LS_VERBOSE);
415   ASSERT(cs_.CurrentThreadIsOwner());
416   ASSERT(worker_thread_->IsCurrent());
417   ASSERT(tcp == tcp_);
418   if (stream_)
419     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
420 }
421 
OnTcpClosed(PseudoTcp * tcp,uint32 nError)422 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
423   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
424   ASSERT(cs_.CurrentThreadIsOwner());
425   ASSERT(worker_thread_->IsCurrent());
426   ASSERT(tcp == tcp_);
427   if (stream_)
428     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
429 }
430 
431 //
432 // Multi-thread methods
433 //
434 
OnMessage(Message * pmsg)435 void PseudoTcpChannel::OnMessage(Message* pmsg) {
436   if (pmsg->message_id == MSG_WK_CLOCK) {
437 
438     ASSERT(worker_thread_->IsCurrent());
439     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
440     CritScope lock(&cs_);
441     if (tcp_) {
442       tcp_->NotifyClock(PseudoTcp::Now());
443       AdjustClock(false);
444     }
445 
446   } else if (pmsg->message_id == MSG_WK_PURGE) {
447 
448     ASSERT(worker_thread_->IsCurrent());
449     LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
450     // At this point, we know there are no additional worker thread messages.
451     CritScope lock(&cs_);
452     ASSERT(NULL == session_);
453     ASSERT(NULL == channel_);
454     worker_thread_ = NULL;
455     CheckDestroy();
456 
457   } else if (pmsg->message_id == MSG_ST_EVENT) {
458 
459     ASSERT(stream_thread_->IsCurrent());
460     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
461     //             << data->event << ", " << data->error << ")";
462     ASSERT(stream_ != NULL);
463     EventData* data = static_cast<EventData*>(pmsg->pdata);
464     if (data->event & SE_READ) {
465       CritScope lock(&cs_);
466       pending_read_event_ = false;
467     }
468     stream_->SignalEvent(stream_, data->event, data->error);
469     delete data;
470 
471   } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
472 
473     ASSERT(signal_thread_->IsCurrent());
474     LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
475     ASSERT(session_ != NULL);
476     ASSERT(channel_ != NULL);
477     session_->DestroyChannel(content_name_, channel_->name());
478 
479   } else if (pmsg->message_id == MSG_SI_DESTROY) {
480 
481     ASSERT(signal_thread_->IsCurrent());
482     LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
483     // The message queue is empty, so it is safe to destroy ourselves.
484     delete this;
485 
486   } else {
487     ASSERT(false);
488   }
489 }
490 
TcpWritePacket(PseudoTcp * tcp,const char * buffer,size_t len)491 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
492     PseudoTcp* tcp, const char* buffer, size_t len) {
493   ASSERT(cs_.CurrentThreadIsOwner());
494   ASSERT(tcp == tcp_);
495   ASSERT(NULL != channel_);
496   int sent = channel_->SendPacket(buffer, len);
497   if (sent > 0) {
498     //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
499     return IPseudoTcpNotify::WR_SUCCESS;
500   } else if (IsBlockingError(channel_->GetError())) {
501     LOG_F(LS_VERBOSE) << "Blocking";
502     return IPseudoTcpNotify::WR_SUCCESS;
503   } else if (channel_->GetError() == EMSGSIZE) {
504     LOG_F(LS_ERROR) << "EMSGSIZE";
505     return IPseudoTcpNotify::WR_TOO_LARGE;
506   } else {
507     PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
508     ASSERT(false);
509     return IPseudoTcpNotify::WR_FAIL;
510   }
511 }
512 
AdjustClock(bool clear)513 void PseudoTcpChannel::AdjustClock(bool clear) {
514   ASSERT(cs_.CurrentThreadIsOwner());
515   ASSERT(NULL != tcp_);
516 
517   long timeout = 0;
518   if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
519     ASSERT(NULL != channel_);
520     // Reset the next clock, by clearing the old and setting a new one.
521     if (clear)
522       worker_thread_->Clear(this, MSG_WK_CLOCK);
523     worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
524     return;
525   }
526 
527   delete tcp_;
528   tcp_ = NULL;
529   ready_to_connect_ = false;
530 
531   if (channel_) {
532     // If TCP has failed, no need for channel_ anymore
533     signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
534   }
535 }
536 
CheckDestroy()537 void PseudoTcpChannel::CheckDestroy() {
538   ASSERT(cs_.CurrentThreadIsOwner());
539   if ((worker_thread_ != NULL) || (stream_ != NULL))
540     return;
541   signal_thread_->Post(this, MSG_SI_DESTROY);
542 }
543 
544 ///////////////////////////////////////////////////////////////////////////////
545 // PseudoTcpChannel::InternalStream
546 ///////////////////////////////////////////////////////////////////////////////
547 
InternalStream(PseudoTcpChannel * parent)548 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
549   : parent_(parent) {
550 }
551 
~InternalStream()552 PseudoTcpChannel::InternalStream::~InternalStream() {
553   Close();
554 }
555 
GetState() const556 StreamState PseudoTcpChannel::InternalStream::GetState() const {
557   if (!parent_)
558     return SS_CLOSED;
559   return parent_->GetState();
560 }
561 
Read(void * buffer,size_t buffer_len,size_t * read,int * error)562 StreamResult PseudoTcpChannel::InternalStream::Read(
563     void* buffer, size_t buffer_len, size_t* read, int* error) {
564   if (!parent_) {
565     if (error)
566       *error = ENOTCONN;
567     return SR_ERROR;
568   }
569   return parent_->Read(buffer, buffer_len, read, error);
570 }
571 
Write(const void * data,size_t data_len,size_t * written,int * error)572 StreamResult PseudoTcpChannel::InternalStream::Write(
573     const void* data, size_t data_len,  size_t* written, int* error) {
574   if (!parent_) {
575     if (error)
576       *error = ENOTCONN;
577     return SR_ERROR;
578   }
579   return parent_->Write(data, data_len, written, error);
580 }
581 
Close()582 void PseudoTcpChannel::InternalStream::Close() {
583   if (!parent_)
584     return;
585   parent_->Close();
586   parent_ = NULL;
587 }
588 
589 ///////////////////////////////////////////////////////////////////////////////
590 
591 } // namespace cricket
592