1 /*
2 * libjingle
3 * Copyright 2004--2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include <string>
29 #include "pseudotcpchannel.h"
30 #include "talk/p2p/base/candidate.h"
31 #include "talk/p2p/base/transportchannel.h"
32 #include "webrtc/base/basictypes.h"
33 #include "webrtc/base/common.h"
34 #include "webrtc/base/logging.h"
35 #include "webrtc/base/scoped_ptr.h"
36 #include "webrtc/base/stringutils.h"
37
38 using namespace rtc;
39
40 namespace cricket {
41
42 extern const rtc::ConstantLabel SESSION_STATES[];
43
44 // MSG_WK_* - worker thread messages
45 // MSG_ST_* - stream thread messages
46 // MSG_SI_* - signal thread messages
47
48 enum {
49 MSG_WK_CLOCK = 1,
50 MSG_WK_PURGE,
51 MSG_ST_EVENT,
52 MSG_SI_DESTROYCHANNEL,
53 MSG_SI_DESTROY,
54 };
55
56 struct EventData : public MessageData {
57 int event, error;
EventDatacricket::EventData58 EventData(int ev, int err = 0) : event(ev), error(err) { }
59 };
60
61 ///////////////////////////////////////////////////////////////////////////////
62 // PseudoTcpChannel::InternalStream
63 ///////////////////////////////////////////////////////////////////////////////
64
65 class PseudoTcpChannel::InternalStream : public StreamInterface {
66 public:
67 InternalStream(PseudoTcpChannel* parent);
68 virtual ~InternalStream();
69
70 virtual StreamState GetState() const;
71 virtual StreamResult Read(void* buffer, size_t buffer_len,
72 size_t* read, int* error);
73 virtual StreamResult Write(const void* data, size_t data_len,
74 size_t* written, int* error);
75 virtual void Close();
76
77 private:
78 // parent_ is accessed and modified exclusively on the event thread, to
79 // avoid thread contention. This means that the PseudoTcpChannel cannot go
80 // away until after it receives a Close() from TunnelStream.
81 PseudoTcpChannel* parent_;
82 };
83
84 ///////////////////////////////////////////////////////////////////////////////
85 // PseudoTcpChannel
86 // Member object lifetime summaries:
87 // session_ - passed in constructor, cleared when channel_ goes away.
88 // channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
89 // tcp_ - created in Connect, destroyed when channel_ goes away, or connection
90 // closes.
91 // worker_thread_ - created when channel_ is created, purged when channel_ is
92 // destroyed.
93 // stream_ - created in GetStream, destroyed by owner at arbitrary time.
94 // this - created in constructor, destroyed when worker_thread_ and stream_
95 // are both gone.
96 ///////////////////////////////////////////////////////////////////////////////
97
98 //
99 // Signal thread methods
100 //
101
PseudoTcpChannel(Thread * stream_thread,Session * session)102 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
103 : signal_thread_(session->session_manager()->signaling_thread()),
104 worker_thread_(NULL),
105 stream_thread_(stream_thread),
106 session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
107 stream_readable_(false), pending_read_event_(false),
108 ready_to_connect_(false) {
109 ASSERT(signal_thread_->IsCurrent());
110 ASSERT(NULL != session_);
111 }
112
~PseudoTcpChannel()113 PseudoTcpChannel::~PseudoTcpChannel() {
114 ASSERT(signal_thread_->IsCurrent());
115 ASSERT(worker_thread_ == NULL);
116 ASSERT(session_ == NULL);
117 ASSERT(channel_ == NULL);
118 ASSERT(stream_ == NULL);
119 ASSERT(tcp_ == NULL);
120 }
121
Connect(const std::string & content_name,const std::string & channel_name,int component)122 bool PseudoTcpChannel::Connect(const std::string& content_name,
123 const std::string& channel_name,
124 int component) {
125 ASSERT(signal_thread_->IsCurrent());
126 CritScope lock(&cs_);
127
128 if (channel_)
129 return false;
130
131 ASSERT(session_ != NULL);
132 worker_thread_ = session_->session_manager()->worker_thread();
133 content_name_ = content_name;
134 channel_ = session_->CreateChannel(
135 content_name, channel_name, component);
136 channel_name_ = channel_name;
137 channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
138
139 channel_->SignalDestroyed.connect(this,
140 &PseudoTcpChannel::OnChannelDestroyed);
141 channel_->SignalWritableState.connect(this,
142 &PseudoTcpChannel::OnChannelWritableState);
143 channel_->SignalReadPacket.connect(this,
144 &PseudoTcpChannel::OnChannelRead);
145 channel_->SignalRouteChange.connect(this,
146 &PseudoTcpChannel::OnChannelConnectionChanged);
147
148 ASSERT(tcp_ == NULL);
149 tcp_ = new PseudoTcp(this, 0);
150 if (session_->initiator()) {
151 // Since we may try several protocols and network adapters that won't work,
152 // waiting until we get our first writable notification before initiating
153 // TCP negotiation.
154 ready_to_connect_ = true;
155 }
156
157 return true;
158 }
159
GetStream()160 StreamInterface* PseudoTcpChannel::GetStream() {
161 ASSERT(signal_thread_->IsCurrent());
162 CritScope lock(&cs_);
163 ASSERT(NULL != session_);
164 if (!stream_)
165 stream_ = new PseudoTcpChannel::InternalStream(this);
166 //TODO("should we disallow creation of new stream at some point?");
167 return stream_;
168 }
169
OnChannelDestroyed(TransportChannel * channel)170 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
171 LOG_F(LS_INFO) << "(" << channel->component() << ")";
172 ASSERT(signal_thread_->IsCurrent());
173 CritScope lock(&cs_);
174 ASSERT(channel == channel_);
175 signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
176 // When MSG_WK_PURGE is received, we know there will be no more messages from
177 // the worker thread.
178 worker_thread_->Clear(this, MSG_WK_CLOCK);
179 worker_thread_->Post(this, MSG_WK_PURGE);
180 session_ = NULL;
181 channel_ = NULL;
182 if ((stream_ != NULL)
183 && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
184 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
185 if (tcp_) {
186 tcp_->Close(true);
187 AdjustClock();
188 }
189 SignalChannelClosed(this);
190 }
191
OnSessionTerminate(Session * session)192 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
193 // When the session terminates before we even connected
194 CritScope lock(&cs_);
195 if (session_ != NULL && channel_ == NULL) {
196 ASSERT(session == session_);
197 ASSERT(worker_thread_ == NULL);
198 ASSERT(tcp_ == NULL);
199 LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
200 session_ = NULL;
201 if (stream_ != NULL)
202 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
203 }
204
205 // Even though session_ is being destroyed, we mustn't clear the pointer,
206 // since we'll need it to tear down channel_.
207 //
208 // TODO: Is it always the case that if channel_ != NULL then we'll get
209 // a channel-destroyed notification?
210 }
211
GetOption(PseudoTcp::Option opt,int * value)212 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
213 ASSERT(signal_thread_->IsCurrent());
214 CritScope lock(&cs_);
215 ASSERT(tcp_ != NULL);
216 tcp_->GetOption(opt, value);
217 }
218
SetOption(PseudoTcp::Option opt,int value)219 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
220 ASSERT(signal_thread_->IsCurrent());
221 CritScope lock(&cs_);
222 ASSERT(tcp_ != NULL);
223 tcp_->SetOption(opt, value);
224 }
225
226 //
227 // Stream thread methods
228 //
229
GetState() const230 StreamState PseudoTcpChannel::GetState() const {
231 ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
232 CritScope lock(&cs_);
233 if (!session_)
234 return SS_CLOSED;
235 if (!tcp_)
236 return SS_OPENING;
237 switch (tcp_->State()) {
238 case PseudoTcp::TCP_LISTEN:
239 case PseudoTcp::TCP_SYN_SENT:
240 case PseudoTcp::TCP_SYN_RECEIVED:
241 return SS_OPENING;
242 case PseudoTcp::TCP_ESTABLISHED:
243 return SS_OPEN;
244 case PseudoTcp::TCP_CLOSED:
245 default:
246 return SS_CLOSED;
247 }
248 }
249
Read(void * buffer,size_t buffer_len,size_t * read,int * error)250 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
251 size_t* read, int* error) {
252 ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
253 CritScope lock(&cs_);
254 if (!tcp_)
255 return SR_BLOCK;
256
257 stream_readable_ = false;
258 int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
259 //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
260 if (result > 0) {
261 if (read)
262 *read = result;
263 // PseudoTcp doesn't currently support repeated Readable signals. Simulate
264 // them here.
265 stream_readable_ = true;
266 if (!pending_read_event_) {
267 pending_read_event_ = true;
268 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
269 }
270 return SR_SUCCESS;
271 } else if (IsBlockingError(tcp_->GetError())) {
272 return SR_BLOCK;
273 } else {
274 if (error)
275 *error = tcp_->GetError();
276 return SR_ERROR;
277 }
278 // This spot is never reached.
279 }
280
Write(const void * data,size_t data_len,size_t * written,int * error)281 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
282 size_t* written, int* error) {
283 ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
284 CritScope lock(&cs_);
285 if (!tcp_)
286 return SR_BLOCK;
287 int result = tcp_->Send(static_cast<const char*>(data), data_len);
288 //LOG_F(LS_VERBOSE) << "Send returned: " << result;
289 if (result > 0) {
290 if (written)
291 *written = result;
292 return SR_SUCCESS;
293 } else if (IsBlockingError(tcp_->GetError())) {
294 return SR_BLOCK;
295 } else {
296 if (error)
297 *error = tcp_->GetError();
298 return SR_ERROR;
299 }
300 // This spot is never reached.
301 }
302
Close()303 void PseudoTcpChannel::Close() {
304 ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
305 CritScope lock(&cs_);
306 stream_ = NULL;
307 // Clear out any pending event notifications
308 stream_thread_->Clear(this, MSG_ST_EVENT);
309 if (tcp_) {
310 tcp_->Close(false);
311 AdjustClock();
312 } else {
313 CheckDestroy();
314 }
315 }
316
317 //
318 // Worker thread methods
319 //
320
OnChannelWritableState(TransportChannel * channel)321 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
322 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
323 ASSERT(worker_thread_->IsCurrent());
324 CritScope lock(&cs_);
325 if (!channel_) {
326 LOG_F(LS_WARNING) << "NULL channel";
327 return;
328 }
329 ASSERT(channel == channel_);
330 if (!tcp_) {
331 LOG_F(LS_WARNING) << "NULL tcp";
332 return;
333 }
334 if (!ready_to_connect_ || !channel->writable())
335 return;
336
337 ready_to_connect_ = false;
338 tcp_->Connect();
339 AdjustClock();
340 }
341
OnChannelRead(TransportChannel * channel,const char * data,size_t size,const rtc::PacketTime & packet_time,int flags)342 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
343 const char* data, size_t size,
344 const rtc::PacketTime& packet_time,
345 int flags) {
346 //LOG_F(LS_VERBOSE) << "(" << size << ")";
347 ASSERT(worker_thread_->IsCurrent());
348 CritScope lock(&cs_);
349 if (!channel_) {
350 LOG_F(LS_WARNING) << "NULL channel";
351 return;
352 }
353 ASSERT(channel == channel_);
354 if (!tcp_) {
355 LOG_F(LS_WARNING) << "NULL tcp";
356 return;
357 }
358 tcp_->NotifyPacket(data, size);
359 AdjustClock();
360 }
361
OnChannelConnectionChanged(TransportChannel * channel,const Candidate & candidate)362 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
363 const Candidate& candidate) {
364 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
365 ASSERT(worker_thread_->IsCurrent());
366 CritScope lock(&cs_);
367 if (!channel_) {
368 LOG_F(LS_WARNING) << "NULL channel";
369 return;
370 }
371 ASSERT(channel == channel_);
372 if (!tcp_) {
373 LOG_F(LS_WARNING) << "NULL tcp";
374 return;
375 }
376
377 uint16 mtu = 1280; // safe default
378 int family = candidate.address().family();
379 Socket* socket =
380 worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM);
381 rtc::scoped_ptr<Socket> mtu_socket(socket);
382 if (socket == NULL) {
383 LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU.";
384 } else {
385 if (mtu_socket->Connect(candidate.address()) < 0 ||
386 mtu_socket->EstimateMTU(&mtu) < 0) {
387 LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
388 << mtu_socket->GetError();
389 }
390 }
391
392 LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
393 tcp_->NotifyMTU(mtu);
394 AdjustClock();
395 }
396
OnTcpOpen(PseudoTcp * tcp)397 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
398 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
399 ASSERT(cs_.CurrentThreadIsOwner());
400 ASSERT(worker_thread_->IsCurrent());
401 ASSERT(tcp == tcp_);
402 if (stream_) {
403 stream_readable_ = true;
404 pending_read_event_ = true;
405 stream_thread_->Post(this, MSG_ST_EVENT,
406 new EventData(SE_OPEN | SE_READ | SE_WRITE));
407 }
408 }
409
OnTcpReadable(PseudoTcp * tcp)410 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
411 //LOG_F(LS_VERBOSE);
412 ASSERT(cs_.CurrentThreadIsOwner());
413 ASSERT(worker_thread_->IsCurrent());
414 ASSERT(tcp == tcp_);
415 if (stream_) {
416 stream_readable_ = true;
417 if (!pending_read_event_) {
418 pending_read_event_ = true;
419 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
420 }
421 }
422 }
423
OnTcpWriteable(PseudoTcp * tcp)424 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
425 //LOG_F(LS_VERBOSE);
426 ASSERT(cs_.CurrentThreadIsOwner());
427 ASSERT(worker_thread_->IsCurrent());
428 ASSERT(tcp == tcp_);
429 if (stream_)
430 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
431 }
432
OnTcpClosed(PseudoTcp * tcp,uint32 nError)433 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
434 LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
435 ASSERT(cs_.CurrentThreadIsOwner());
436 ASSERT(worker_thread_->IsCurrent());
437 ASSERT(tcp == tcp_);
438 if (stream_)
439 stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
440 }
441
442 //
443 // Multi-thread methods
444 //
445
OnMessage(Message * pmsg)446 void PseudoTcpChannel::OnMessage(Message* pmsg) {
447 if (pmsg->message_id == MSG_WK_CLOCK) {
448
449 ASSERT(worker_thread_->IsCurrent());
450 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
451 CritScope lock(&cs_);
452 if (tcp_) {
453 tcp_->NotifyClock(PseudoTcp::Now());
454 AdjustClock(false);
455 }
456
457 } else if (pmsg->message_id == MSG_WK_PURGE) {
458
459 ASSERT(worker_thread_->IsCurrent());
460 LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
461 // At this point, we know there are no additional worker thread messages.
462 CritScope lock(&cs_);
463 ASSERT(NULL == session_);
464 ASSERT(NULL == channel_);
465 worker_thread_ = NULL;
466 CheckDestroy();
467
468 } else if (pmsg->message_id == MSG_ST_EVENT) {
469
470 ASSERT(stream_thread_->IsCurrent());
471 //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
472 // << data->event << ", " << data->error << ")";
473 ASSERT(stream_ != NULL);
474 EventData* data = static_cast<EventData*>(pmsg->pdata);
475 if (data->event & SE_READ) {
476 CritScope lock(&cs_);
477 pending_read_event_ = false;
478 }
479 stream_->SignalEvent(stream_, data->event, data->error);
480 delete data;
481
482 } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
483
484 ASSERT(signal_thread_->IsCurrent());
485 LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
486 ASSERT(session_ != NULL);
487 ASSERT(channel_ != NULL);
488 session_->DestroyChannel(content_name_, channel_->component());
489
490 } else if (pmsg->message_id == MSG_SI_DESTROY) {
491
492 ASSERT(signal_thread_->IsCurrent());
493 LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
494 // The message queue is empty, so it is safe to destroy ourselves.
495 delete this;
496
497 } else {
498 ASSERT(false);
499 }
500 }
501
TcpWritePacket(PseudoTcp * tcp,const char * buffer,size_t len)502 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
503 PseudoTcp* tcp, const char* buffer, size_t len) {
504 ASSERT(cs_.CurrentThreadIsOwner());
505 ASSERT(tcp == tcp_);
506 ASSERT(NULL != channel_);
507 rtc::PacketOptions packet_options;
508 int sent = channel_->SendPacket(buffer, len, packet_options);
509 if (sent > 0) {
510 //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
511 return IPseudoTcpNotify::WR_SUCCESS;
512 } else if (IsBlockingError(channel_->GetError())) {
513 LOG_F(LS_VERBOSE) << "Blocking";
514 return IPseudoTcpNotify::WR_SUCCESS;
515 } else if (channel_->GetError() == EMSGSIZE) {
516 LOG_F(LS_ERROR) << "EMSGSIZE";
517 return IPseudoTcpNotify::WR_TOO_LARGE;
518 } else {
519 PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
520 ASSERT(false);
521 return IPseudoTcpNotify::WR_FAIL;
522 }
523 }
524
AdjustClock(bool clear)525 void PseudoTcpChannel::AdjustClock(bool clear) {
526 ASSERT(cs_.CurrentThreadIsOwner());
527 ASSERT(NULL != tcp_);
528
529 long timeout = 0;
530 if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
531 ASSERT(NULL != channel_);
532 // Reset the next clock, by clearing the old and setting a new one.
533 if (clear)
534 worker_thread_->Clear(this, MSG_WK_CLOCK);
535 worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
536 return;
537 }
538
539 delete tcp_;
540 tcp_ = NULL;
541 ready_to_connect_ = false;
542
543 if (channel_) {
544 // If TCP has failed, no need for channel_ anymore
545 signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
546 }
547 }
548
CheckDestroy()549 void PseudoTcpChannel::CheckDestroy() {
550 ASSERT(cs_.CurrentThreadIsOwner());
551 if ((worker_thread_ != NULL) || (stream_ != NULL))
552 return;
553 signal_thread_->Post(this, MSG_SI_DESTROY);
554 }
555
556 ///////////////////////////////////////////////////////////////////////////////
557 // PseudoTcpChannel::InternalStream
558 ///////////////////////////////////////////////////////////////////////////////
559
InternalStream(PseudoTcpChannel * parent)560 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
561 : parent_(parent) {
562 }
563
~InternalStream()564 PseudoTcpChannel::InternalStream::~InternalStream() {
565 Close();
566 }
567
GetState() const568 StreamState PseudoTcpChannel::InternalStream::GetState() const {
569 if (!parent_)
570 return SS_CLOSED;
571 return parent_->GetState();
572 }
573
Read(void * buffer,size_t buffer_len,size_t * read,int * error)574 StreamResult PseudoTcpChannel::InternalStream::Read(
575 void* buffer, size_t buffer_len, size_t* read, int* error) {
576 if (!parent_) {
577 if (error)
578 *error = ENOTCONN;
579 return SR_ERROR;
580 }
581 return parent_->Read(buffer, buffer_len, read, error);
582 }
583
Write(const void * data,size_t data_len,size_t * written,int * error)584 StreamResult PseudoTcpChannel::InternalStream::Write(
585 const void* data, size_t data_len, size_t* written, int* error) {
586 if (!parent_) {
587 if (error)
588 *error = ENOTCONN;
589 return SR_ERROR;
590 }
591 return parent_->Write(data, data_len, written, error);
592 }
593
Close()594 void PseudoTcpChannel::InternalStream::Close() {
595 if (!parent_)
596 return;
597 parent_->Close();
598 parent_ = NULL;
599 }
600
601 ///////////////////////////////////////////////////////////////////////////////
602
603 } // namespace cricket
604