//===-- Shared memory RPC client / server interface -------------*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file implements a remote procedure call mechanism to communicate
// between heterogeneous devices that can share an address space atomically. We
// provide a client and a server to facilitate the remote call. The client
// makes requests to the server using a shared communication channel. We use
// separate atomic signals to indicate which side, the client or the server,
// owns the buffer.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_LIBC_SHARED_RPC_H
#define LLVM_LIBC_SHARED_RPC_H

#include "rpc_util.h"

namespace rpc {

/// Use scoped atomic variants if they are available for the target.
#if !__has_builtin(__scoped_atomic_load_n)
#define __scoped_atomic_load_n(src, ord, scp) __atomic_load_n(src, ord)
#define __scoped_atomic_store_n(dst, src, ord, scp)                            \
  __atomic_store_n(dst, src, ord)
#define __scoped_atomic_fetch_or(src, val, ord, scp)                           \
  __atomic_fetch_or(src, val, ord)
#define __scoped_atomic_fetch_and(src, val, ord, scp)                          \
  __atomic_fetch_and(src, val, ord)
#endif
#if !__has_builtin(__scoped_atomic_thread_fence)
#define __scoped_atomic_thread_fence(ord, scp) __atomic_thread_fence(ord)
#endif

/// Generic codes that can be used when implementing the server.
enum Status {
  RPC_SUCCESS = 0x0,
  RPC_ERROR = 0x1000,
  RPC_UNHANDLED_OPCODE = 0x1001,
};

/// A fixed size channel used to communicate between the RPC client and server.
struct Buffer {
  uint64_t data[8];
};
static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");

/// The information associated with a packet. This indicates which operations
/// to perform and which threads are active in the slots.
struct Header {
  uint64_t mask;
  uint32_t opcode;
};

/// The maximum number of parallel ports that the RPC interface can support.
constexpr static uint64_t MAX_PORT_COUNT = 4096;

/// A common process used to synchronize communication between a client and a
/// server. The process contains a read-only inbox and a write-only outbox used
/// for signaling ownership of the shared buffer between both sides. We assign
/// ownership of the buffer to the client if the inbox and outbox bits match,
/// otherwise it is owned by the server.
///
/// This process is designed to allow the client and the server to exchange
/// data using a fixed size packet in a mostly arbitrary order using the 'send'
/// and 'recv' operations; a minimal exchange is sketched below. The following
/// restrictions to this scheme apply:
/// - The client will always start with a 'send' operation.
/// - The server will always start with a 'recv' operation.
/// - Every 'send' or 'recv' call is mirrored by the other process.
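///
/// For example, a single round-trip request (a sketch, not normative)
/// proceeds as follows, with each call mirrored by the other process:
///
///   Client                      Server
///   ------                      ------
///   port.send(fill);    <-->    port.recv(use);
///   port.recv(use);     <-->    port.send(fill);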
template <bool Invert> struct Process {
  RPC_ATTRS Process() = default;
  RPC_ATTRS Process(const Process &) = delete;
  RPC_ATTRS Process &operator=(const Process &) = delete;
  RPC_ATTRS Process(Process &&) = default;
  RPC_ATTRS Process &operator=(Process &&) = default;
  RPC_ATTRS ~Process() = default;

  const uint32_t port_count = 0;
  const uint32_t *const inbox = nullptr;
  uint32_t *const outbox = nullptr;
  Header *const header = nullptr;
  Buffer *const packet = nullptr;

  static constexpr uint64_t NUM_BITS_IN_WORD = sizeof(uint32_t) * 8;
  uint32_t lock[MAX_PORT_COUNT / NUM_BITS_IN_WORD] = {0};

  RPC_ATTRS Process(uint32_t port_count, void *buffer)
      : port_count(port_count),
        inbox(reinterpret_cast<uint32_t *>(
            advance(buffer, inbox_offset(port_count)))),
        outbox(reinterpret_cast<uint32_t *>(
            advance(buffer, outbox_offset(port_count)))),
        header(reinterpret_cast<Header *>(
            advance(buffer, header_offset(port_count)))),
        packet(reinterpret_cast<Buffer *>(
            advance(buffer, buffer_offset(port_count)))) {}

  /// Allocate a memory buffer sufficient to store the following equivalent
  /// representation in memory.
  ///
  /// struct Equivalent {
  ///   Atomic<uint32_t> primary[port_count];
  ///   Atomic<uint32_t> secondary[port_count];
  ///   Header header[port_count];
  ///   Buffer packet[port_count][lane_size];
  /// };
  RPC_ATTRS static constexpr uint64_t allocation_size(uint32_t port_count,
                                                      uint32_t lane_size) {
    return buffer_offset(port_count) + buffer_bytes(port_count, lane_size);
  }
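
  /// As a worked example of the layout above (a sketch; assumes
  /// sizeof(Header) == 16 and 8-byte alignment for Header and Buffer): with
  /// port_count = 64 and lane_size = 32, each mailbox occupies
  /// 64 * sizeof(uint32_t) = 256 bytes, the headers start at
  /// align_up(512, alignof(Header)) = 512 and occupy 64 * 16 = 1024 bytes,
  /// and the packets start at align_up(1536, alignof(Buffer)) = 1536 and
  /// occupy 64 * 32 * sizeof(Buffer) = 131072 bytes, so
  /// allocation_size(64, 32) returns 1536 + 131072 = 132608 bytes.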

  /// Retrieve the inbox state from memory shared between processes.
  RPC_ATTRS uint32_t load_inbox(uint64_t lane_mask, uint32_t index) const {
    return rpc::broadcast_value(
        lane_mask, __scoped_atomic_load_n(&inbox[index], __ATOMIC_RELAXED,
                                          __MEMORY_SCOPE_SYSTEM));
  }

  /// Retrieve the outbox state from memory shared between processes.
  RPC_ATTRS uint32_t load_outbox(uint64_t lane_mask, uint32_t index) const {
    return rpc::broadcast_value(
        lane_mask, __scoped_atomic_load_n(&outbox[index], __ATOMIC_RELAXED,
                                          __MEMORY_SCOPE_SYSTEM));
  }

  /// Signal to the other process that this one is finished with the buffer.
  /// Equivalent to loading the outbox followed by a store of the inverted
  /// value. The outbox is written only by this warp, and tracking the value
  /// locally is cheaper than calling load_outbox to get the value to store.
  RPC_ATTRS uint32_t invert_outbox(uint32_t index, uint32_t current_outbox) {
    uint32_t inverted_outbox = !current_outbox;
    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_SYSTEM);
    __scoped_atomic_store_n(&outbox[index], inverted_outbox, __ATOMIC_RELAXED,
                            __MEMORY_SCOPE_SYSTEM);
    return inverted_outbox;
  }

  // Given the current outbox and inbox values, wait until the inbox changes
  // to indicate that this thread owns the buffer element.
  RPC_ATTRS void wait_for_ownership(uint64_t lane_mask, uint32_t index,
                                    uint32_t outbox, uint32_t in) {
    while (buffer_unavailable(in, outbox)) {
      sleep_briefly();
      in = load_inbox(lane_mask, index);
    }
    __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_SYSTEM);
  }

  /// The packet is a linearly allocated array of buffers used to communicate
  /// with the other process. This function returns the appropriate slot in
  /// this array such that the process can operate on an entire warp or
  /// wavefront.
  RPC_ATTRS Buffer *get_packet(uint32_t index, uint32_t lane_size) {
    return &packet[index * lane_size];
  }

  /// Determines if this process needs to wait for ownership of the buffer. We
  /// invert the condition on one of the processes to indicate that if one
  /// process owns the buffer then the other does not.
  RPC_ATTRS static bool buffer_unavailable(uint32_t in, uint32_t out) {
    bool cond = in != out;
    return Invert ? !cond : cond;
  }
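
  /// Concretely, since the client instantiates Process<false> and the server
  /// instantiates Process<true>, the ownership rule reduces to:
  ///
  ///   in == out | owner  | waiting side
  ///   ----------+--------+-------------------------
  ///     yes     | client | server (Invert == true)
  ///     no      | server | client (Invert == false)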

  /// Attempt to claim the lock at index. Returns true if the lock was taken.
  /// lane_mask is a bitmap of the threads in the warp that would hold the
  /// single lock on success, e.g. the result of rpc::get_lane_mask().
  /// The lock is held when the n-th bit of the lock bitfield is set.
  RPC_ATTRS bool try_lock(uint64_t lane_mask, uint32_t index) {
    // On amdgpu, a test-and-set of the nth lock bit and a sync_lane would
    // suffice. On volta, we need to handle differences between the threads
    // running and the threads that were detected in the previous call to
    // get_lane_mask().
    //
    // All threads in lane_mask try to claim the lock. At most one can succeed.
    // There may be threads active which are not in lane_mask which must not
    // succeed in taking the lock, as otherwise it will leak. This is handled
    // by having threads that are not in lane_mask perform a fetch_or with 0,
    // making the call a no-op.
    uint32_t id = rpc::get_lane_id();
    bool id_in_lane_mask = lane_mask & (1ul << id);

    // All threads in the warp call fetch_or, possibly at the same time.
    bool before = set_nth(lock, index, id_in_lane_mask);
    uint64_t packed = rpc::ballot(lane_mask, before);

    // If every bit set in lane_mask is also set in packed, every single thread
    // in the warp failed to get the lock. Ballot returns unset for threads not
    // in the lane mask.
    //
    // Cases, per thread:
    //   mask == 0 -> unspecified before, discarded by ballot -> 0
    //   mask == 1 and before == 0 (success), set zero by ballot -> 0
    //   mask == 1 and before == 1 (failure), set one by ballot -> 1
    //
    // mask != packed implies at least one of the threads got the lock; the
    // atomic semantics of fetch_or mean at most one of the threads got the
    // lock.

    // If holding the lock then the caller can load values knowing said loads
    // won't move past the lock. No such guarantee is needed if the lock
    // acquire failed. This conditional branch is expected to fold in the
    // caller after inlining the current function.
    bool holding_lock = lane_mask != packed;
    if (holding_lock)
      __scoped_atomic_thread_fence(__ATOMIC_ACQUIRE, __MEMORY_SCOPE_DEVICE);
    return holding_lock;
  }

  /// Unlock the lock at index. We need a lane sync to keep this function
  /// convergent, otherwise the compiler will sink the store and deadlock.
  RPC_ATTRS void unlock(uint64_t lane_mask, uint32_t index) {
    // Do not move any writes past the unlock.
    __scoped_atomic_thread_fence(__ATOMIC_RELEASE, __MEMORY_SCOPE_DEVICE);

    // Use exactly one thread to clear the nth bit in the lock array. We must
    // restrict this to a single thread to avoid one thread dropping the lock,
    // then an unrelated warp claiming the lock, then a second thread in this
    // warp dropping the lock again.
    clear_nth(lock, index, rpc::is_first_lane(lane_mask));
    rpc::sync_lane(lane_mask);
  }

  /// Number of bytes to allocate for an inbox or outbox.
  RPC_ATTRS static constexpr uint64_t mailbox_bytes(uint32_t port_count) {
    return port_count * sizeof(uint32_t);
  }

  /// Number of bytes to allocate for the buffer containing the packets.
  RPC_ATTRS static constexpr uint64_t buffer_bytes(uint32_t port_count,
                                                   uint32_t lane_size) {
    return port_count * lane_size * sizeof(Buffer);
  }

  /// Offset of the inbox in memory. This is the same as the outbox if
  /// inverted.
  RPC_ATTRS static constexpr uint64_t inbox_offset(uint32_t port_count) {
    return Invert ? mailbox_bytes(port_count) : 0;
  }

  /// Offset of the outbox in memory. This is the same as the inbox if
  /// inverted.
  RPC_ATTRS static constexpr uint64_t outbox_offset(uint32_t port_count) {
    return Invert ? 0 : mailbox_bytes(port_count);
  }

  /// Offset of the packet headers in memory, following the inbox and outbox.
  RPC_ATTRS static constexpr uint64_t header_offset(uint32_t port_count) {
    return align_up(2 * mailbox_bytes(port_count), alignof(Header));
  }

  /// Offset of the buffer containing the packets, following the headers.
  RPC_ATTRS static constexpr uint64_t buffer_offset(uint32_t port_count) {
    return align_up(header_offset(port_count) + port_count * sizeof(Header),
                    alignof(Buffer));
  }

  /// Conditionally set the n-th bit in the atomic bitfield.
  RPC_ATTRS static constexpr uint32_t set_nth(uint32_t *bits, uint32_t index,
                                              bool cond) {
    uint32_t slot = index / NUM_BITS_IN_WORD;
    uint32_t bit = index % NUM_BITS_IN_WORD;
    return __scoped_atomic_fetch_or(&bits[slot],
                                    static_cast<uint32_t>(cond) << bit,
                                    __ATOMIC_RELAXED, __MEMORY_SCOPE_DEVICE) &
           (1u << bit);
  }

  /// Conditionally clear the n-th bit in the atomic bitfield.
  RPC_ATTRS static constexpr uint32_t clear_nth(uint32_t *bits, uint32_t index,
                                                bool cond) {
    uint32_t slot = index / NUM_BITS_IN_WORD;
    uint32_t bit = index % NUM_BITS_IN_WORD;
    return __scoped_atomic_fetch_and(&bits[slot],
                                     ~0u ^ (static_cast<uint32_t>(cond) << bit),
                                     __ATOMIC_RELAXED, __MEMORY_SCOPE_DEVICE) &
           (1u << bit);
  }
};

/// Invokes a function across every active buffer across the total lane size.
template <typename F>
RPC_ATTRS static void invoke_rpc(F &&fn, uint32_t lane_size, uint64_t lane_mask,
                                 Buffer *slot) {
  if constexpr (is_process_gpu()) {
    fn(&slot[rpc::get_lane_id()], rpc::get_lane_id());
  } else {
    for (uint32_t i = 0; i < lane_size; i += rpc::get_num_lanes())
      if (lane_mask & (1ul << i))
        fn(&slot[i], i);
  }
}
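
/// For example, a callback that zeroes every active slot might look like the
/// following (a sketch; the lane_size, lane_mask, and slot arguments are
/// hypothetical placeholders supplied by the surrounding port):
///
///   invoke_rpc([](Buffer *buffer, uint32_t id) {
///     for (uint64_t &word : buffer->data)
///       word = 0;
///   }, lane_size, lane_mask, slot);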

/// The port provides the interface to communicate between the multiple
/// processes. A port is conceptually an index into the memory provided by the
/// underlying process that is guarded by a lock bit.
template <bool T> struct Port {
  RPC_ATTRS Port(Process<T> &process, uint64_t lane_mask, uint32_t lane_size,
                 uint32_t index, uint32_t out)
      : process(process), lane_mask(lane_mask), lane_size(lane_size),
        index(index), out(out), receive(false), owns_buffer(true) {}
  RPC_ATTRS ~Port() = default;

private:
  RPC_ATTRS Port(const Port &) = delete;
  RPC_ATTRS Port &operator=(const Port &) = delete;
  RPC_ATTRS Port(Port &&) = default;
  RPC_ATTRS Port &operator=(Port &&) = default;

  friend struct Client;
  friend struct Server;
  friend class rpc::optional<Port<T>>;

public:
  template <typename U> RPC_ATTRS void recv(U use);
  template <typename F> RPC_ATTRS void send(F fill);
  template <typename F, typename U>
  RPC_ATTRS void send_and_recv(F fill, U use);
  template <typename W> RPC_ATTRS void recv_and_send(W work);
  RPC_ATTRS void send_n(const void *const *src, uint64_t *size);
  RPC_ATTRS void send_n(const void *src, uint64_t size);
  template <typename A>
  RPC_ATTRS void recv_n(void **dst, uint64_t *size, A &&alloc);

  RPC_ATTRS uint32_t get_opcode() const {
    return process.header[index].opcode;
  }

  RPC_ATTRS uint32_t get_index() const { return index; }

  RPC_ATTRS void close() {
    // Wait for all lanes to finish using the port.
    rpc::sync_lane(lane_mask);

    // The server is passive; if it owns the buffer when it closes, we need to
    // give ownership back to the client.
    if (owns_buffer && T)
      out = process.invert_outbox(index, out);
    process.unlock(lane_mask, index);
  }

private:
  Process<T> &process;
  uint64_t lane_mask;
  uint32_t lane_size;
  uint32_t index;
  uint32_t out;
  bool receive;
  bool owns_buffer;
};

/// The RPC client used to make requests to the server.
struct Client {
  RPC_ATTRS Client() = default;
  RPC_ATTRS Client(const Client &) = delete;
  RPC_ATTRS Client &operator=(const Client &) = delete;
  RPC_ATTRS ~Client() = default;

  RPC_ATTRS Client(uint32_t port_count, void *buffer)
      : process(port_count, buffer) {}

  using Port = rpc::Port<false>;
  template <uint32_t opcode> RPC_ATTRS Port open();

private:
  Process<false> process;
};

/// The RPC server used to respond to the client.
struct Server {
  RPC_ATTRS Server() = default;
  RPC_ATTRS Server(const Server &) = delete;
  RPC_ATTRS Server &operator=(const Server &) = delete;
  RPC_ATTRS ~Server() = default;

  RPC_ATTRS Server(uint32_t port_count, void *buffer)
      : process(port_count, buffer) {}

  using Port = rpc::Port<true>;
  RPC_ATTRS rpc::optional<Port> try_open(uint32_t lane_size,
                                         uint32_t start = 0);
  RPC_ATTRS Port open(uint32_t lane_size);

  RPC_ATTRS static uint64_t allocation_size(uint32_t lane_size,
                                            uint32_t port_count) {
    return Process<true>::allocation_size(port_count, lane_size);
  }

private:
  Process<true> process;
};

/// Applies \p fill to the shared buffer and initiates a send operation.
template <bool T> template <typename F> RPC_ATTRS void Port<T>::send(F fill) {
  uint32_t in = owns_buffer ? out ^ T : process.load_inbox(lane_mask, index);

  // We need to wait until we own the buffer before sending.
  process.wait_for_ownership(lane_mask, index, out, in);

  // Apply the \p fill function to initialize the buffer and release the
  // memory.
  invoke_rpc(fill, lane_size, process.header[index].mask,
             process.get_packet(index, lane_size));
  out = process.invert_outbox(index, out);
  owns_buffer = false;
  receive = false;
}

/// Applies \p use to the shared buffer and acknowledges the send.
template <bool T> template <typename U> RPC_ATTRS void Port<T>::recv(U use) {
  // We only exchange ownership of the buffer during a receive if we are
  // waiting for a previous receive to finish.
  if (receive) {
    out = process.invert_outbox(index, out);
    owns_buffer = false;
  }

  uint32_t in = owns_buffer ? out ^ T : process.load_inbox(lane_mask, index);

  // We need to wait until we own the buffer before receiving.
  process.wait_for_ownership(lane_mask, index, out, in);

  // Apply the \p use function to read the memory out of the buffer.
  invoke_rpc(use, lane_size, process.header[index].mask,
             process.get_packet(index, lane_size));
  receive = true;
  owns_buffer = true;
}

/// Combines a send and receive into a single function.
template <bool T>
template <typename F, typename U>
RPC_ATTRS void Port<T>::send_and_recv(F fill, U use) {
  send(fill);
  recv(use);
}

/// Combines a receive and send operation into a single function. The \p work
/// function modifies the buffer in-place and the send is only used to initiate
/// the copy back.
template <bool T>
template <typename W>
RPC_ATTRS void Port<T>::recv_and_send(W work) {
  recv(work);
  send([](Buffer *, uint32_t) { /* no-op */ });
}

/// Helper routine to simplify the interface when sending from the GPU using
/// thread private pointers to the underlying value.
template <bool T>
RPC_ATTRS void Port<T>::send_n(const void *src, uint64_t size) {
  const void **src_ptr = &src;
  uint64_t *size_ptr = &size;
  send_n(src_ptr, size_ptr);
}

/// Sends an arbitrarily sized data buffer \p src across the shared channel in
/// multiples of the packet length.
template <bool T>
RPC_ATTRS void Port<T>::send_n(const void *const *src, uint64_t *size) {
  uint64_t num_sends = 0;
  send([&](Buffer *buffer, uint32_t id) {
    reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
    num_sends = is_process_gpu() ? lane_value(size, id)
                                 : rpc::max(lane_value(size, id), num_sends);
    uint64_t len =
        lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
            ? sizeof(Buffer::data) - sizeof(uint64_t)
            : lane_value(size, id);
    rpc_memcpy(&buffer->data[1], lane_value(src, id), len);
  });
  uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
  uint64_t mask = process.header[index].mask;
  while (rpc::ballot(mask, idx < num_sends)) {
    send([=](Buffer *buffer, uint32_t id) {
      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                         ? sizeof(Buffer::data)
                         : lane_value(size, id) - idx;
      if (idx < lane_value(size, id))
        rpc_memcpy(buffer->data, advance(lane_value(src, id), idx), len);
    });
    idx += sizeof(Buffer::data);
  }
}
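
/// As a worked example of the splitting above (a sketch): the first packet
/// reserves sizeof(uint64_t) = 8 bytes for the size, leaving 56 bytes of
/// payload, and each subsequent packet carries up to the full 64 bytes. So
/// sending 200 bytes takes four 'send' calls carrying 56 + 64 + 64 + 16
/// bytes.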

/// Receives an arbitrarily sized data buffer across the shared channel in
/// multiples of the packet length. The \p alloc function is called with the
/// size of the data so that we can initialize the size of the \p dst buffer.
template <bool T>
template <typename A>
RPC_ATTRS void Port<T>::recv_n(void **dst, uint64_t *size, A &&alloc) {
  uint64_t num_recvs = 0;
  recv([&](Buffer *buffer, uint32_t id) {
    lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
    lane_value(dst, id) =
        reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
    num_recvs = is_process_gpu() ? lane_value(size, id)
                                 : rpc::max(lane_value(size, id), num_recvs);
    uint64_t len =
        lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
            ? sizeof(Buffer::data) - sizeof(uint64_t)
            : lane_value(size, id);
    rpc_memcpy(lane_value(dst, id), &buffer->data[1], len);
  });
  uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
  uint64_t mask = process.header[index].mask;
  while (rpc::ballot(mask, idx < num_recvs)) {
    recv([=](Buffer *buffer, uint32_t id) {
      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                         ? sizeof(Buffer::data)
                         : lane_value(size, id) - idx;
      if (idx < lane_value(size, id))
        rpc_memcpy(advance(lane_value(dst, id), idx), buffer->data, len);
    });
    idx += sizeof(Buffer::data);
  }
}

/// Continually attempts to open a port to use as the client. The client can
/// only open a port if we find an index that is in a valid sending state. That
/// is, there are send operations pending that haven't been serviced on this
/// port. Each port instance uses an associated \p opcode to tell the server
/// what to do. The Client interface provides the appropriate lane size to the
/// port using the platform's returned value.
template <uint32_t opcode> RPC_ATTRS Client::Port Client::open() {
  // Repeatedly perform a naive linear scan for a port that can be opened to
  // send data.
  for (uint32_t index = 0;; ++index) {
    // Start from the beginning if we run out of ports to check.
    if (index >= process.port_count)
      index = 0;

    // Attempt to acquire the lock on this index.
    uint64_t lane_mask = rpc::get_lane_mask();
    if (!process.try_lock(lane_mask, index))
      continue;

    uint32_t in = process.load_inbox(lane_mask, index);
    uint32_t out = process.load_outbox(lane_mask, index);

    // Once we acquire the index we need to check if we are in a valid sending
    // state.
    if (process.buffer_unavailable(in, out)) {
      process.unlock(lane_mask, index);
      continue;
    }

    if (rpc::is_first_lane(lane_mask)) {
      process.header[index].opcode = opcode;
      process.header[index].mask = lane_mask;
    }
    rpc::sync_lane(lane_mask);
    return Port(process, lane_mask, rpc::get_num_lanes(), index, out);
  }
}
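
/// A typical client-side call (a minimal sketch; MY_OPCODE and device_buffer
/// are hypothetical placeholders, not part of this interface):
///
///   rpc::Client client(port_count, device_buffer);
///   rpc::Client::Port port = client.open<MY_OPCODE>();
///   port.send([](rpc::Buffer *buffer, uint32_t) {
///     buffer->data[0] = 42; // Fill the packet with the request payload.
///   });
///   port.close();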

/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation.
RPC_ATTRS rpc::optional<typename Server::Port>
Server::try_open(uint32_t lane_size, uint32_t start) {
  // Perform a naive linear scan for a port that has a pending request.
  for (uint32_t index = start; index < process.port_count; ++index) {
    uint64_t lane_mask = rpc::get_lane_mask();
    uint32_t in = process.load_inbox(lane_mask, index);
    uint32_t out = process.load_outbox(lane_mask, index);

    // The server is passive; if there is no work pending, don't bother
    // opening a port.
    if (process.buffer_unavailable(in, out))
      continue;

    // Attempt to acquire the lock on this index.
    if (!process.try_lock(lane_mask, index))
      continue;

    // Recheck the state now that we hold the lock, since it may have changed
    // between the first check and the lock acquisition.
    in = process.load_inbox(lane_mask, index);
    out = process.load_outbox(lane_mask, index);

    if (process.buffer_unavailable(in, out)) {
      process.unlock(lane_mask, index);
      continue;
    }

    return Port(process, lane_mask, lane_size, index, out);
  }
  return rpc::nullopt;
}

RPC_ATTRS Server::Port Server::open(uint32_t lane_size) {
  for (;;) {
    if (rpc::optional<Server::Port> p = try_open(lane_size))
      return rpc::move(p.value());
    sleep_briefly();
  }
}
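
/// A typical server-side loop (a minimal sketch; host_buffer, MY_OPCODE, and
/// handle_request are hypothetical placeholders, not part of this interface):
///
///   rpc::Server server(port_count, host_buffer);
///   for (;;) {
///     rpc::optional<rpc::Server::Port> port = server.try_open(lane_size);
///     if (!port)
///       continue;
///     rpc::Server::Port &p = port.value();
///     switch (p.get_opcode()) {
///     case MY_OPCODE:
///       p.recv([](rpc::Buffer *buffer, uint32_t) {
///         handle_request(buffer->data); // Consume the request payload.
///       });
///       break;
///     default:
///       break;
///     }
///     p.close();
///   }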

#undef RPC_ATTRS
#if !__has_builtin(__scoped_atomic_load_n)
#undef __scoped_atomic_load_n
#undef __scoped_atomic_store_n
#undef __scoped_atomic_fetch_or
#undef __scoped_atomic_fetch_and
#endif
#if !__has_builtin(__scoped_atomic_thread_fence)
#undef __scoped_atomic_thread_fence
#endif

} // namespace rpc

#endif // LLVM_LIBC_SHARED_RPC_H