• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package runtime
6
7// This file contains the implementation of Go channels.
8
9// Invariants:
10//  At least one of c.sendq and c.recvq is empty,
11//  except for the case of an unbuffered channel with a single goroutine
12//  blocked on it for both sending and receiving using a select statement,
13//  in which case the length of c.sendq and c.recvq is limited only by the
14//  size of the select statement.
15//
16// For buffered channels, also:
17//  c.qcount > 0 implies that c.recvq is empty.
18//  c.qcount < c.dataqsiz implies that c.sendq is empty.
19
20import (
21	"internal/abi"
22	"internal/runtime/atomic"
23	"runtime/internal/math"
24	"unsafe"
25)
26
// maxAlign is the largest element alignment makechan accepts and the boundary
// to which hchanSize is rounded.
// hchanSize is unsafe.Sizeof(hchan{}) rounded up to a multiple of maxAlign; it
// is the offset at which makechan places the buffer when header and buffer
// share one allocation.
// debugChan enables the diagnostic print calls in makechan, chansend and
// chanrecv.
const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)
32
// hchan is the runtime representation of a channel. makechan allocates and
// initializes it; every other function in this file operates on it.
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size in bytes of one element (elem.Size_, checked < 1<<16 in makechan)
	closed   uint32         // set to 1 by closechan; never reset
	timer    *timer // timer feeding this chan
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
54
// waitq is a FIFO queue of sudogs, linked through their next/prev fields.
// enqueue appends at last; dequeue removes from first. Both ends are nil
// when the queue is empty.
type waitq struct {
	first *sudog
	last  *sudog
}
59
// reflect_makechan is exposed to package reflect as reflect.makechan via the
// linkname directive below. It forwards unchanged to makechan.
//
//go:linkname reflect_makechan reflect.makechan
func reflect_makechan(t *chantype, size int) *hchan {
	return makechan(t, size)
}
64
65func makechan64(t *chantype, size int64) *hchan {
66	if int64(int(size)) != size {
67		panic(plainError("makechan: size out of range"))
68	}
69
70	return makechan(t, int(size))
71}
72
73func makechan(t *chantype, size int) *hchan {
74	elem := t.Elem
75
76	// compiler checks this but be safe.
77	if elem.Size_ >= 1<<16 {
78		throw("makechan: invalid channel element type")
79	}
80	if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
81		throw("makechan: bad alignment")
82	}
83
84	mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
85	if overflow || mem > maxAlloc-hchanSize || size < 0 {
86		panic(plainError("makechan: size out of range"))
87	}
88
89	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
90	// buf points into the same allocation, elemtype is persistent.
91	// SudoG's are referenced from their owning thread so they can't be collected.
92	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
93	var c *hchan
94	switch {
95	case mem == 0:
96		// Queue or element size is zero.
97		c = (*hchan)(mallocgc(hchanSize, nil, true))
98		// Race detector uses this location for synchronization.
99		c.buf = c.raceaddr()
100	case !elem.Pointers():
101		// Elements do not contain pointers.
102		// Allocate hchan and buf in one call.
103		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
104		c.buf = add(unsafe.Pointer(c), hchanSize)
105	default:
106		// Elements contain pointers.
107		c = new(hchan)
108		c.buf = mallocgc(mem, elem, true)
109	}
110
111	c.elemsize = uint16(elem.Size_)
112	c.elemtype = elem
113	c.dataqsiz = uint(size)
114	lockInit(&c.lock, lockRankHchan)
115
116	if debugChan {
117		print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
118	}
119	return c
120}
121
122// chanbuf(c, i) is pointer to the i'th slot in the buffer.
123//
124// chanbuf should be an internal detail,
125// but widely used packages access it using linkname.
126// Notable members of the hall of shame include:
127//   - github.com/fjl/memsize
128//
129// Do not remove or change the type signature.
130// See go.dev/issue/67401.
131//
132//go:linkname chanbuf
133func chanbuf(c *hchan, i uint) unsafe.Pointer {
134	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
135}
136
137// full reports whether a send on c would block (that is, the channel is full).
138// It uses a single word-sized read of mutable state, so although
139// the answer is instantaneously true, the correct answer may have changed
140// by the time the calling function receives the return value.
141func full(c *hchan) bool {
142	// c.dataqsiz is immutable (never written after the channel is created)
143	// so it is safe to read at any time during channel operation.
144	if c.dataqsiz == 0 {
145		// Assumes that a pointer read is relaxed-atomic.
146		return c.recvq.first == nil
147	}
148	// Assumes that a uint read is relaxed-atomic.
149	return c.qcount == c.dataqsiz
150}
151
// chansend1 is the entry point for c <- x from compiled code.
// It performs a blocking send by calling chansend with block set to true and
// the caller's PC; chansend's boolean result is discarded.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}
158
// chansend is the generic single-channel send. It sends the value pointed to
// by ep on channel c.
//
// If block is false, chansend never sleeps: it returns false when the send
// cannot complete immediately. If block is true, it parks the calling
// goroutine until a receiver takes the value or the channel is closed.
//
// It returns true once the value has been handed to a receiver or stored in
// the buffer. It panics with "send on closed channel" if c is closed, either
// on entry or while the sender is parked (in which case the sender is woken by
// closechan with mysg.success == false). A blocking send on a nil channel
// parks forever.
//
// callerpc is used only for the race detector report.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays zero unless block profiling is enabled; a non-zero t0 is what
	// later causes mysg.releasetime to be armed.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog as wanting a release timestamp; whoever wakes
		// us replaces it with cputicks().
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park has been committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is set to true by recv (value taken) and to false by closechan.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
298
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// skip is passed on to goready as skip+1.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// A nil sg.elem means the receiver does not want the value
	// (chanrecv documents that ep may be nil), so nothing is copied.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Drop the channel lock before readying gp: hchan.lock must not be held
	// while another G's status is changed.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
335
336// timerchandrain removes all elements in channel c's buffer.
337// It reports whether any elements were removed.
338// Because it is only intended for timers, it does not
339// handle waiting senders at all (all timer channels
340// use non-blocking sends to fill the buffer).
341func timerchandrain(c *hchan) bool {
342	// Note: Cannot use empty(c) because we are called
343	// while holding c.timer.sendLock, and empty(c) will
344	// call c.timer.maybeRunChan, which will deadlock.
345	// We are emptying the channel, so we only care about
346	// the count, not about potentially filling it up.
347	if atomic.Loaduint(&c.qcount) == 0 {
348		return false
349	}
350	lock(&c.lock)
351	any := false
352	for c.qcount > 0 {
353		any = true
354		typedmemclr(c.elemtype, chanbuf(c, c.recvx))
355		c.recvx++
356		if c.recvx == c.dataqsiz {
357			c.recvx = 0
358		}
359		c.qcount--
360	}
361	unlock(&c.lock)
362	return any
363}
364
365// Sends and receives on unbuffered or empty-buffered channels are the
366// only operations where one running goroutine writes to the stack of
367// another running goroutine. The GC assumes that stack writes only
368// happen when the goroutine is running and are only done by that
369// goroutine. Using a write barrier is sufficient to make up for
370// violating that assumption, but the write barrier has to work.
371// typedmemmove will call bulkBarrierPreWrite, but the target bytes
372// are not in the heap, so that will not help. We arrange to call
373// memmove and typeBitsBulkBarrier instead.
374
// sendDirect copies one value of type t from src into the slot recorded in
// sg.elem. It calls typeBitsBulkBarrier for the destination range and then
// copies t.Size_ bytes with memmove.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}
387
// recvDirect copies one value of type t from the slot recorded in sg.elem
// into dst. It calls typeBitsBulkBarrier for the destination range and then
// copies t.Size_ bytes with memmove.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}
396
397func closechan(c *hchan) {
398	if c == nil {
399		panic(plainError("close of nil channel"))
400	}
401
402	lock(&c.lock)
403	if c.closed != 0 {
404		unlock(&c.lock)
405		panic(plainError("close of closed channel"))
406	}
407
408	if raceenabled {
409		callerpc := getcallerpc()
410		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
411		racerelease(c.raceaddr())
412	}
413
414	c.closed = 1
415
416	var glist gList
417
418	// release all readers
419	for {
420		sg := c.recvq.dequeue()
421		if sg == nil {
422			break
423		}
424		if sg.elem != nil {
425			typedmemclr(c.elemtype, sg.elem)
426			sg.elem = nil
427		}
428		if sg.releasetime != 0 {
429			sg.releasetime = cputicks()
430		}
431		gp := sg.g
432		gp.param = unsafe.Pointer(sg)
433		sg.success = false
434		if raceenabled {
435			raceacquireg(gp, c.raceaddr())
436		}
437		glist.push(gp)
438	}
439
440	// release all writers (they will panic)
441	for {
442		sg := c.sendq.dequeue()
443		if sg == nil {
444			break
445		}
446		sg.elem = nil
447		if sg.releasetime != 0 {
448			sg.releasetime = cputicks()
449		}
450		gp := sg.g
451		gp.param = unsafe.Pointer(sg)
452		sg.success = false
453		if raceenabled {
454			raceacquireg(gp, c.raceaddr())
455		}
456		glist.push(gp)
457	}
458	unlock(&c.lock)
459
460	// Ready all Gs now that we've dropped the channel lock.
461	for !glist.empty() {
462		gp := glist.pop()
463		gp.schedlink = 0
464		goready(gp, 3)
465	}
466}
467
468// empty reports whether a read from c would block (that is, the channel is
469// empty).  It is atomically correct and sequentially consistent at the moment
470// it returns, but since the channel is unlocked, the channel may become
471// non-empty immediately afterward.
472func empty(c *hchan) bool {
473	// c.dataqsiz is immutable.
474	if c.dataqsiz == 0 {
475		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
476	}
477	// c.timer is also immutable (it is set after make(chan) but before any channel operations).
478	// All timer channels have dataqsiz > 0.
479	if c.timer != nil {
480		c.timer.maybeRunChan()
481	}
482	return atomic.Loaduint(&c.qcount) == 0
483}
484
// entry points for <- c from compiled code.
//
// chanrecv1 performs a blocking receive into elem by calling chanrecv with
// block set to true. Both results of chanrecv are discarded.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}
491
// chanrecv2 performs a blocking receive into elem by calling chanrecv with
// block set to true, and returns chanrecv's second result: true if a value
// was received, false if the zero value was produced because c is closed.
//
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
497
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
//
// A blocking receive on a nil channel parks forever; a non-blocking one
// returns (false, false).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays zero unless block profiling is enabled; a non-zero t0 is what
	// later causes mysg.releasetime to be armed.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still has data;
		// fall through to the buffered receive below.
	} else {
		// The channel is not closed: check for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Zero the vacated slot.
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog as wanting a release timestamp; whoever wakes
		// us replaces it with cputicks().
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park has been committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// success is set to true by send (a value was delivered) and to false by
	// closechan (the slot was zeroed instead).
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
659
// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
// skip is passed on to goready as skip+1.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	// Drop the channel lock before readying gp: hchan.lock must not be held
	// while another G's status is changed.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
715
// chanparkcommit is the callback chansend and chanrecv hand to gopark,
// together with a pointer to the channel's lock as chanLock. It sets
// gp.activeStackChans, clears gp.parkingOnChan, and only then releases the
// channel lock. It always returns true.
// NOTE(review): the meaning gopark attaches to the return value is not
// visible in this file — presumably true lets the park proceed; confirm
// against gopark.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}
735
// selectnbsend performs a non-blocking send of *elem on c by calling chansend
// with block set to false, and reports whether the send happened.
//
// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	return chansend(c, elem, false, getcallerpc())
}
755
// selectnbrecv performs a non-blocking receive from c into *elem by calling
// chanrecv with block set to false, and returns chanrecv's results unchanged.
//
// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	return chanrecv(c, elem, false)
}
775
// reflect_chansend is exposed to package reflect as reflect.chansend0 via the
// linkname directive below. nb requests a non-blocking send, so it is
// inverted to form chansend's block argument.
//
//go:linkname reflect_chansend reflect.chansend0
func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
	return chansend(c, elem, !nb, getcallerpc())
}
780
// reflect_chanrecv is exposed to package reflect as reflect.chanrecv via the
// linkname directive below. nb requests a non-blocking receive, so it is
// inverted to form chanrecv's block argument.
//
//go:linkname reflect_chanrecv reflect.chanrecv
func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
	return chanrecv(c, elem, !nb)
}
785
786func chanlen(c *hchan) int {
787	if c == nil {
788		return 0
789	}
790	async := debug.asynctimerchan.Load() != 0
791	if c.timer != nil && async {
792		c.timer.maybeRunChan()
793	}
794	if c.timer != nil && !async {
795		// timer channels have a buffered implementation
796		// but present to users as unbuffered, so that we can
797		// undo sends without users noticing.
798		return 0
799	}
800	return int(c.qcount)
801}
802
803func chancap(c *hchan) int {
804	if c == nil {
805		return 0
806	}
807	if c.timer != nil {
808		async := debug.asynctimerchan.Load() != 0
809		if async {
810			return int(c.dataqsiz)
811		}
812		// timer channels have a buffered implementation
813		// but present to users as unbuffered, so that we can
814		// undo sends without users noticing.
815		return 0
816	}
817	return int(c.dataqsiz)
818}
819
// reflect_chanlen is exposed to package reflect as reflect.chanlen via the
// linkname directive below. It forwards unchanged to chanlen.
//
//go:linkname reflect_chanlen reflect.chanlen
func reflect_chanlen(c *hchan) int {
	return chanlen(c)
}
824
// reflectlite_chanlen is exposed to internal/reflectlite as chanlen via the
// linkname directive below. It forwards unchanged to chanlen.
//
//go:linkname reflectlite_chanlen internal/reflectlite.chanlen
func reflectlite_chanlen(c *hchan) int {
	return chanlen(c)
}
829
// reflect_chancap is exposed to package reflect as reflect.chancap via the
// linkname directive below. It forwards unchanged to chancap.
//
//go:linkname reflect_chancap reflect.chancap
func reflect_chancap(c *hchan) int {
	return chancap(c)
}
834
// reflect_chanclose is exposed to package reflect as reflect.chanclose via
// the linkname directive below. It forwards unchanged to closechan and
// therefore panics in the same cases (nil or already-closed channel).
//
//go:linkname reflect_chanclose reflect.chanclose
func reflect_chanclose(c *hchan) {
	closechan(c)
}
839
840func (q *waitq) enqueue(sgp *sudog) {
841	sgp.next = nil
842	x := q.last
843	if x == nil {
844		sgp.prev = nil
845		q.first = sgp
846		q.last = sgp
847		return
848	}
849	sgp.prev = x
850	x.next = sgp
851	q.last = sgp
852}
853
// dequeue unlinks and returns the first usable sudog in q, or nil once the
// queue is empty. A sudog enqueued by a select (isSelect) is usable only if
// this call wins the CompareAndSwap on its goroutine's selectDone flag;
// otherwise it is dropped from the queue and the next entry is tried.
func (q *waitq) dequeue() *sudog {
	for {
		sgp := q.first
		if sgp == nil {
			return nil
		}
		y := sgp.next
		if y == nil {
			// sgp was the only element; the queue is now empty.
			q.first = nil
			q.last = nil
		} else {
			y.prev = nil
			q.first = y
			sgp.next = nil // mark as removed (see dequeueSudoG)
		}

		// if a goroutine was put on this queue because of a
		// select, there is a small window between the goroutine
		// being woken up by a different case and it grabbing the
		// channel locks. Once it has the lock
		// it removes itself from the queue, so we won't see it after that.
		// We use a flag in the G struct to tell us when someone
		// else has won the race to signal this goroutine but the goroutine
		// hasn't removed itself from the queue yet.
		if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) {
			continue
		}

		return sgp
	}
}
885
// raceaddr returns the address that represents channel c to the race
// detector: the address of the c.buf field.
func (c *hchan) raceaddr() unsafe.Pointer {
	// Treat read-like and write-like operations on the channel to
	// happen at this address. Avoid using the address of qcount
	// or dataqsiz, because the len() and cap() builtins read
	// those addresses, and we don't want them racing with
	// operations like close().
	return unsafe.Pointer(&c.buf)
}
894
895func racesync(c *hchan, sg *sudog) {
896	racerelease(chanbuf(c, 0))
897	raceacquireg(sg.g, chanbuf(c, 0))
898	racereleaseg(sg.g, chanbuf(c, 0))
899	raceacquire(chanbuf(c, 0))
900}
901
902// Notify the race detector of a send or receive involving buffer entry idx
903// and a channel c or its communicating partner sg.
904// This function handles the special case of c.elemsize==0.
905func racenotify(c *hchan, idx uint, sg *sudog) {
906	// We could have passed the unsafe.Pointer corresponding to entry idx
907	// instead of idx itself.  However, in a future version of this function,
908	// we can use idx to better handle the case of elemsize==0.
909	// A future improvement to the detector is to call TSan with c and idx:
910	// this way, Go will continue to not allocating buffer entries for channels
911	// of elemsize==0, yet the race detector can be made to handle multiple
912	// sync objects underneath the hood (one sync object per idx)
913	qp := chanbuf(c, idx)
914	// When elemsize==0, we don't allocate a full buffer for the channel.
915	// Instead of individual buffer entries, the race detector uses the
916	// c.buf as the only buffer entry.  This simplification prevents us from
917	// following the memory model's happens-before rules (rules that are
918	// implemented in racereleaseacquire).  Instead, we accumulate happens-before
919	// information in the synchronization object associated with c.buf.
920	if c.elemsize == 0 {
921		if sg == nil {
922			raceacquire(qp)
923			racerelease(qp)
924		} else {
925			raceacquireg(sg.g, qp)
926			racereleaseg(sg.g, qp)
927		}
928	} else {
929		if sg == nil {
930			racereleaseacquire(qp)
931		} else {
932			racereleaseacquireg(sg.g, qp)
933		}
934	}
935}
936