/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
/** \defgroup PtlRPC Portal RPC and networking module.
 *
 * PortalRPC is the layer used by the rest of the Lustre code to achieve
 * network communications: establish connections with corresponding export
 * and import states, listen for a service, send and receive RPCs.
 * PortalRPC also includes the base recovery framework: packet resending and
 * replaying, reconnections, and the pinger.
 *
 * PortalRPC utilizes LNet as its transport layer.
 *
 * @{
 */

#ifndef _LUSTRE_NET_H
#define _LUSTRE_NET_H

/** \defgroup net net
 *
 * @{
 */

#include "../../include/linux/libcfs/libcfs.h"
#include "../../include/linux/lnet/nidstr.h"
#include "../../include/linux/lnet/api.h"
#include "lustre/lustre_idl.h"
#include "lustre_ha.h"
#include "lustre_sec.h"
#include "lustre_import.h"
#include "lprocfs_status.h"
#include "lu_object.h"
#include "lustre_req_layout.h"

#include "obd_support.h"
#include "lustre_ver.h"

/* MD flags we _always_ use */
#define PTLRPC_MD_OPTIONS  0

/**
 * Max # of bulk operations in one request.
 * In order for the client and server to properly negotiate the maximum
 * possible transfer size, PTLRPC_BULK_OPS_COUNT must be a power-of-two
 * value.  The client is free to limit the actual RPC size for any bulk
 * transfer via cl_max_pages_per_rpc to some non-power-of-two value.
 */
#define PTLRPC_BULK_OPS_BITS	2
#define PTLRPC_BULK_OPS_COUNT	(1U << PTLRPC_BULK_OPS_BITS)
/**
 * PTLRPC_BULK_OPS_MASK is for the convenience of the client only, and
 * should not be used on the server at all.  Otherwise, it imposes a
 * protocol limitation on the maximum RPC size that can be used by any
 * RPC sent to that server in the future.  Instead, the server should
 * use the negotiated per-client ocd_brw_size to determine the bulk
 * RPC count.
 */
#define PTLRPC_BULK_OPS_MASK	(~((__u64)PTLRPC_BULK_OPS_COUNT - 1))

/**
 * Define maxima for bulk I/O.
 *
 * A single PTLRPC BRW request is sent via up to PTLRPC_BULK_OPS_COUNT
 * of LNET_MTU sized RDMA transfers.  Clients and servers negotiate the
 * currently supported maximum between peers at connect via ocd_brw_size.
 */
#define PTLRPC_MAX_BRW_BITS	(LNET_MTU_BITS + PTLRPC_BULK_OPS_BITS)
#define PTLRPC_MAX_BRW_SIZE	(1 << PTLRPC_MAX_BRW_BITS)
#define PTLRPC_MAX_BRW_PAGES	(PTLRPC_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)

#define ONE_MB_BRW_SIZE		(1 << LNET_MTU_BITS)
#define MD_MAX_BRW_SIZE		(1 << LNET_MTU_BITS)
#define MD_MAX_BRW_PAGES	(MD_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)
#define DT_MAX_BRW_SIZE		PTLRPC_MAX_BRW_SIZE
#define DT_MAX_BRW_PAGES	(DT_MAX_BRW_SIZE >> PAGE_CACHE_SHIFT)
#define OFD_MAX_BRW_SIZE	(1 << LNET_MTU_BITS)
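
/*
 * Worked example (illustrative; assumes the common LNET_MTU of 1 MiB,
 * i.e. LNET_MTU_BITS = 20, and 4 KiB pages, i.e. PAGE_CACHE_SHIFT = 12):
 *
 *	PTLRPC_MAX_BRW_BITS  = 20 + 2      = 22
 *	PTLRPC_MAX_BRW_SIZE  = 1 << 22     = 4 MiB
 *	PTLRPC_MAX_BRW_PAGES = 4 MiB >> 12 = 1024 pages
 *
 * i.e. one BRW RPC can carry up to four 1 MiB RDMA fragments.
 */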

/* When PAGE_SIZE is a constant, we can check our arithmetic here with cpp! */
# if ((PTLRPC_MAX_BRW_PAGES & (PTLRPC_MAX_BRW_PAGES - 1)) != 0)
#  error "PTLRPC_MAX_BRW_PAGES isn't a power of two"
# endif
# if (PTLRPC_MAX_BRW_SIZE != (PTLRPC_MAX_BRW_PAGES * PAGE_CACHE_SIZE))
#  error "PTLRPC_MAX_BRW_SIZE isn't PTLRPC_MAX_BRW_PAGES * PAGE_CACHE_SIZE"
# endif
# if (PTLRPC_MAX_BRW_SIZE > LNET_MTU * PTLRPC_BULK_OPS_COUNT)
#  error "PTLRPC_MAX_BRW_SIZE too big"
# endif
# if (PTLRPC_MAX_BRW_PAGES > LNET_MAX_IOV * PTLRPC_BULK_OPS_COUNT)
#  error "PTLRPC_MAX_BRW_PAGES too big"
# endif

#define PTLRPC_NTHRS_INIT	2

/**
 * Buffer Constants
 *
 * Constants determine how memory is used to buffer incoming service requests.
 *
 * ?_NBUFS		# buffers to allocate when growing the pool
 * ?_BUFSIZE		# bytes in a single request buffer
 * ?_MAXREQSIZE		# maximum request the service will receive
 *
 * When fewer than ?_NBUFS/2 buffers are posted for receive, another chunk
 * of ?_NBUFS is added to the pool.
 *
 * Messages larger than ?_MAXREQSIZE are dropped.  Request buffers are
 * considered full when less than ?_MAXREQSIZE is left in them.
 */
/**
 * Thread Constants
 *
 * Constants determine how threads are created for the ptlrpc service.
 *
 * ?_NTHRS_INIT		# threads to create for each service partition on
 *			  initialization. If it's a non-affinity service and
 *			  there is only one partition, it's the overall #
 *			  threads for the service while initializing.
 * ?_NTHRS_BASE		# threads that should be created at minimum for each
 *			  ptlrpc partition to keep the service healthy.
 *			  It's the low-water mark of the threads upper-limit
 *			  for each partition.
 * ?_THR_FACTOR		# threads that can be added to the threads upper-limit
 *			  for each CPU core. This factor is only for reference;
 *			  we might decrease the value of the factor if the
 *			  number of cores per CPT is above a limit.
 * ?_NTHRS_MAX		# overall threads that can be created for a service.
 *			  It's a soft limit because if the service is running
 *			  on a machine with hundreds of cores and tens of
 *			  CPU partitions, we need to guarantee each partition
 *			  has ?_NTHRS_BASE threads, which means the total
 *			  thread count will be ?_NTHRS_BASE * number_of_cpts,
 *			  which can exceed ?_NTHRS_MAX.
 *
 * Examples
 *
 * #define MDS_NTHRS_INIT	2
 * #define MDS_NTHRS_BASE	64
 * #define MDS_NTHRS_FACTOR	8
 * #define MDS_NTHRS_MAX	1024
 *
 * Example 1):
 * ---------------------------------------------------------------------
 * Server(A) has 16 cores, the user configured it with 4 partitions, so
 * each partition has 4 cores; the actual number of service threads on
 * each partition is:
 *     MDS_NTHRS_BASE(64) + cores(4) * MDS_NTHRS_FACTOR(8) = 96
 *
 * The total number of threads for the service is:
 *     96 * partitions(4) = 384
 *
 * Example 2):
 * ---------------------------------------------------------------------
 * Server(B) has 32 cores, the user configured it with 4 partitions, so
 * each partition has 8 cores; the actual number of service threads on
 * each partition is:
 *     MDS_NTHRS_BASE(64) + cores(8) * MDS_NTHRS_FACTOR(8) = 128
 *
 * The total number of threads for the service is:
 *     128 * partitions(4) = 512
 *
 * Example 3):
 * ---------------------------------------------------------------------
 * Server(B) has 96 cores, the user configured it with 8 partitions, so
 * each partition has 12 cores; the actual number of service threads on
 * each partition is:
 *     MDS_NTHRS_BASE(64) + cores(12) * MDS_NTHRS_FACTOR(8) = 160
 *
 * The total number of threads for the service is:
 *     160 * partitions(8) = 1280
 *
 * However, this is above the soft limit MDS_NTHRS_MAX, so we choose this
 * number as the upper limit of the thread count for each partition:
 *     MDS_NTHRS_MAX(1024) / partitions(8) = 128
 *
 * Example 4):
 * ---------------------------------------------------------------------
 * Server(C) has a thousand cores and the user configured it with 32
 * partitions:
 *     MDS_NTHRS_BASE(64) * 32 = 2048
 *
 * which is already above the soft limit MDS_NTHRS_MAX(1024), but we still
 * need to guarantee that each partition has at least MDS_NTHRS_BASE(64)
 * threads to keep the service healthy, so the total number of threads
 * will just be 2048.
 *
 * NB: we don't suggest choosing a server with that many cores because the
 *     backend filesystem itself, the buffer cache, or the underlying
 *     network stack might have SMP scalability issues at that large scale.
 *
 *     If the user already has a fat machine with hundreds or thousands of
 *     cores, there are two choices for configuration:
 *     a) create a CPU table from a subset of all CPUs and run Lustre on
 *	top of this subset
 *     b) bind service threads on a few partitions, see the module
 *	parameters of MDS and OSS for details
 *
 * NB: these calculations (and the examples above) are simplified to help
 *     understanding; the real implementation is a little more complex,
 *     please see ptlrpc_server_nthreads_check() for details.  A simplified
 *     sketch is given below.
 */

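/*
 * A minimal sketch of the per-partition calculation described above; the
 * function name and parameter names are hypothetical, and the real logic
 * in ptlrpc_server_nthreads_check() is more involved.
 */
static inline int ptlrpc_example_nthrs_per_cpt(int nthrs_base, int thr_factor,
					       int nthrs_max,
					       int ncores_per_cpt, int ncpts)
{
	int nthrs = nthrs_base + ncores_per_cpt * thr_factor;

	/* Respect the soft overall limit, but never drop below the
	 * per-partition low-water mark nthrs_base (cf. example 4 above). */
	if (nthrs * ncpts > nthrs_max) {
		nthrs = nthrs_max / ncpts;
		if (nthrs < nthrs_base)
			nthrs = nthrs_base;
	}
	return nthrs;
}
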
 /*
  * LDLM threads constants:
  *
  * Given 8 as the factor and 24 as the base thread count:
  *
  * example 1)
  * On a 4-core machine we will have 24 + 8 * 4 = 56 threads.
  *
  * example 2)
  * On an 8-core machine with 2 partitions we will have 24 + 4 * 8 = 56
  * threads for each partition, and the total thread count will be 112.
  *
  * example 3)
  * On a 64-core machine with 8 partitions we will need LDLM_NTHRS_BASE(24)
  * threads for each partition to keep the service healthy, so the total
  * thread count should be 24 * 8 = 192.
  *
  * So with these constants, the thread count will be at a similar level
  * to old versions, unless the target machine has over a hundred cores.
  */
#define LDLM_THR_FACTOR		8
#define LDLM_NTHRS_INIT		PTLRPC_NTHRS_INIT
#define LDLM_NTHRS_BASE		24
#define LDLM_NTHRS_MAX		(num_online_cpus() == 1 ? 64 : 128)

#define LDLM_BL_THREADS   LDLM_NTHRS_AUTO_INIT
#define LDLM_CLIENT_NBUFS 1
#define LDLM_SERVER_NBUFS 64
#define LDLM_BUFSIZE      (8 * 1024)
#define LDLM_MAXREQSIZE   (5 * 1024)
#define LDLM_MAXREPSIZE   (1024)

#define MDS_MAXREQSIZE		(5 * 1024)	/* >= 4736 */

#define OST_MAXREQSIZE		(5 * 1024)

/* Macro to hide a typecast. */
#define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args)

/**
 * Structure to define a single portal connection.
 */
struct ptlrpc_connection {
	/** linkage for connections hash table */
	struct hlist_node	c_hash;
	/** Our own lnet nid for this connection */
	lnet_nid_t	      c_self;
	/** Remote side nid for this connection */
	lnet_process_id_t       c_peer;
	/** UUID of the other side */
	struct obd_uuid	 c_remote_uuid;
	/** reference counter for this connection */
	atomic_t	    c_refcount;
};

/** Client definition for PortalRPC */
struct ptlrpc_client {
	/** What lnet portal does this client send messages to by default */
	__u32		   cli_request_portal;
	/** What portal do we expect replies on */
	__u32		   cli_reply_portal;
	/** Name of the client */
	char		   *cli_name;
};

/** state flags of requests */
/* XXX only ones left are those used by the bulk descs as well! */
#define PTL_RPC_FL_INTR      (1 << 0)  /* reply wait was interrupted by user */
#define PTL_RPC_FL_TIMEOUT   (1 << 7)  /* request timed out waiting for reply */

#define REQ_MAX_ACK_LOCKS 8

union ptlrpc_async_args {
	/**
	 * Scratchpad for passing args to completion interpreter. Users
	 * cast to the struct of their choosing, and CLASSERT that this is
	 * big enough.  For _tons_ of context, kmalloc a struct and store
	 * a pointer to it here.  The pointer_arg ensures this struct is at
	 * least big enough for that.
	 */
	void      *pointer_arg[11];
	__u64      space[7];
};
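
/*
 * Illustrative use of the scratchpad (hypothetical struct and field names;
 * struct ptlrpc_request is only defined further below, so this lives in a
 * comment rather than compiled code):
 *
 *	struct example_async_args {
 *		struct obd_import *aa_imp;
 *		int		   aa_flags;
 *	};
 *
 *	CLASSERT(sizeof(struct example_async_args) <=
 *		 sizeof(union ptlrpc_async_args));
 *	aa = ptlrpc_req_async_args(req);
 *	aa->aa_flags = 0;
 */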

struct ptlrpc_request_set;
typedef int (*set_interpreter_func)(struct ptlrpc_request_set *, void *, int);
typedef int (*set_producer_func)(struct ptlrpc_request_set *, void *);

/**
 * Definition of the request set structure.
 * A request set is a list of requests (not necessarily to the same target)
 * that once populated with RPCs could be sent in parallel.
 * There are two kinds of request sets: general purpose, and with a dedicated
 * serving thread. An example of the latter is the ptlrpcd set.
 * For general purpose sets, once the set has started sending it is
 * impossible to add new requests to it.
 * Provides a way to call "completion callbacks" when all requests in the set
 * have returned.
 */
struct ptlrpc_request_set {
	atomic_t	  set_refcount;
	/** number of in queue requests */
	atomic_t	  set_new_count;
	/** number of uncompleted requests */
	atomic_t	  set_remaining;
	/** wait queue to wait on for request events */
	wait_queue_head_t	   set_waitq;
	wait_queue_head_t	  *set_wakeup_ptr;
	/** List of requests in the set */
	struct list_head	    set_requests;
	/**
	 * List of completion callbacks to be called when the set is completed
	 * This is only used if \a set_interpret is NULL.
	 * Links struct ptlrpc_set_cbdata.
	 */
	struct list_head	    set_cblist;
	/** Completion callback, if only one. */
	set_interpreter_func  set_interpret;
	/** opaque argument passed to completion \a set_interpret callback. */
	void		 *set_arg;
	/**
	 * Lock for \a set_new_requests manipulations
	 * locked so that any old caller can communicate requests to
	 * the set holder who can then fold them into the lock-free set
	 */
	spinlock_t		set_new_req_lock;
	/** List of new yet unsent requests. Only used with ptlrpcd now. */
	struct list_head	    set_new_requests;

	/** rq_status of requests that have been freed already */
	int		   set_rc;
	/** Additional fields used by the flow control extension */
	/** Maximum number of RPCs in flight */
	int		   set_max_inflight;
	/** Callback function used to generate RPCs */
	set_producer_func     set_producer;
	/** opaque argument passed to the producer callback */
	void		 *set_producer_arg;
};
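
/*
 * Sketch of typical client-side use of a general-purpose set; the helpers
 * named here (ptlrpc_prep_set() and friends) are part of the ptlrpc API,
 * while the request-building steps are elided:
 *
 *	struct ptlrpc_request_set *set;
 *
 *	set = ptlrpc_prep_set();
 *	(for each request to send in parallel)
 *		ptlrpc_set_add_req(set, req);
 *	rc = ptlrpc_set_wait(set);	(sends all, waits for completion)
 *	ptlrpc_set_destroy(set);
 */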

/**
 * Description of a single ptlrpc_set callback
 */
struct ptlrpc_set_cbdata {
	/** List linkage item */
	struct list_head	      psc_item;
	/** Pointer to interpreting function */
	set_interpreter_func    psc_interpret;
	/** Opaque argument to pass to the callback */
	void		   *psc_data;
};

struct ptlrpc_bulk_desc;
struct ptlrpc_service_part;
struct ptlrpc_service;

/**
 * ptlrpc callback & work item stuff
 */
struct ptlrpc_cb_id {
	void   (*cbid_fn)(lnet_event_t *ev);     /* specific callback fn */
	void    *cbid_arg;		      /* additional arg */
};

/** Maximum number of locks to fit into reply state */
#define RS_MAX_LOCKS 8
#define RS_DEBUG     0

/**
 * Structure to define the reply state on the server.
 * Reply state holds various reply message information. Also for "difficult"
 * replies (rep-ack case) we store the state after sending the reply and wait
 * for the client to acknowledge the reception. In these cases locks could be
 * added to the state for replay/failover consistency guarantees.
 */
struct ptlrpc_reply_state {
	/** Callback description */
	struct ptlrpc_cb_id    rs_cb_id;
	/** Linkage for list of all reply states in a system */
	struct list_head	     rs_list;
	/** Linkage for list of all reply states on same export */
	struct list_head	     rs_exp_list;
	/** Linkage for list of all reply states for same obd */
	struct list_head	     rs_obd_list;
#if RS_DEBUG
	struct list_head	     rs_debug_list;
#endif
	/** A spinlock to protect the reply state flags */
	spinlock_t		rs_lock;
	/** Reply state flags */
	unsigned long	  rs_difficult:1;     /* ACK/commit stuff */
	unsigned long	  rs_no_ack:1;    /* no ACK, even for
					   * difficult requests */
	unsigned long	  rs_scheduled:1;     /* being handled? */
	unsigned long	  rs_scheduled_ever:1;/* any schedule attempts? */
	unsigned long	  rs_handled:1;  /* been handled yet? */
	unsigned long	  rs_on_net:1;   /* reply_out_callback pending? */
	unsigned long	  rs_prealloc:1; /* rs from prealloc list */
	unsigned long	  rs_committed:1;/* the transaction was committed
					  * and the rs was dispatched */
	/** Size of the state */
	int		    rs_size;
	/** opcode */
	__u32		  rs_opc;
	/** Transaction number */
	__u64		  rs_transno;
	/** xid */
	__u64		  rs_xid;
	struct obd_export     *rs_export;
	struct ptlrpc_service_part *rs_svcpt;
	/** Lnet metadata handle for the reply */
	lnet_handle_md_t       rs_md_h;
	atomic_t	   rs_refcount;

	/** Context for the service thread */
	struct ptlrpc_svc_ctx *rs_svc_ctx;
	/** Reply buffer (actually sent to the client), encoded if needed */
	struct lustre_msg     *rs_repbuf;       /* wrapper */
	/** Size of the reply buffer */
	int		    rs_repbuf_len;   /* wrapper buf length */
	/** Size of the reply message */
	int		    rs_repdata_len;  /* wrapper msg length */
	/**
	 * Actual reply message. Its content is encrypted (if needed) to
	 * produce the reply buffer for actual sending. In the simple case
	 * of no network encryption we just set \a rs_repbuf to \a rs_msg
	 */
	struct lustre_msg     *rs_msg;	  /* reply message */

	/** Number of locks awaiting client ACK */
	int		    rs_nlocks;
	/** Handles of locks awaiting client reply ACK */
	struct lustre_handle   rs_locks[RS_MAX_LOCKS];
	/** Lock modes of locks in \a rs_locks */
	ldlm_mode_t	    rs_modes[RS_MAX_LOCKS];
};

struct ptlrpc_thread;

/** RPC stages */
enum rq_phase {
	RQ_PHASE_NEW	    = 0xebc0de00,
	RQ_PHASE_RPC	    = 0xebc0de01,
	RQ_PHASE_BULK	   = 0xebc0de02,
	RQ_PHASE_INTERPRET      = 0xebc0de03,
	RQ_PHASE_COMPLETE       = 0xebc0de04,
	RQ_PHASE_UNREGISTERING  = 0xebc0de05,
	RQ_PHASE_UNDEFINED      = 0xebc0de06
};

/** Type of request interpreter call-back */
typedef int (*ptlrpc_interpterer_t)(const struct lu_env *env,
				    struct ptlrpc_request *req,
				    void *arg, int rc);

/**
 * Definition of the request pool structure.
 * The pool is used to store empty preallocated requests for the case
 * when we would actually need to send something without performing
 * any allocations (to avoid e.g. OOM).
 */
struct ptlrpc_request_pool {
	/** Locks the list */
	spinlock_t prp_lock;
	/** list of ptlrpc_request structs */
	struct list_head prp_req_list;
	/** Maximum message size that would fit into a request from this pool */
	int prp_rq_size;
	/** Function to allocate more requests for this pool */
	int (*prp_populate)(struct ptlrpc_request_pool *, int);
};
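
/*
 * Sketch of the intended pool usage (the helpers named here, e.g.
 * ptlrpc_init_rq_pool() and ptlrpc_free_rq_pool(), are part of the ptlrpc
 * API; the sizes are illustrative):
 *
 *	pool = ptlrpc_init_rq_pool(4, OST_MAXREQSIZE,
 *				   ptlrpc_add_rqs_to_pool);
 *	...
 *	req = ptlrpc_request_alloc_pool(imp, pool, ...);
 *	...
 *	ptlrpc_free_rq_pool(pool);
 */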

struct lu_context;
struct lu_env;

struct ldlm_lock;

/**
 * \defgroup nrs Network Request Scheduler
 * @{
 */
struct ptlrpc_nrs_policy;
struct ptlrpc_nrs_resource;
struct ptlrpc_nrs_request;

/**
 * NRS control operations.
 *
 * These are common for all policies.
 */
enum ptlrpc_nrs_ctl {
	/**
	 * Not a valid opcode.
	 */
	PTLRPC_NRS_CTL_INVALID,
	/**
	 * Activate the policy.
	 */
	PTLRPC_NRS_CTL_START,
	/**
	 * Reserved for multiple primary policies, which may be a possibility
	 * in the future.
	 */
	PTLRPC_NRS_CTL_STOP,
	/**
	 * Policies can start using opcodes from this value and onwards for
	 * their own purposes; the assigned value itself is arbitrary.
	 */
	PTLRPC_NRS_CTL_1ST_POL_SPEC = 0x20,
};

/**
 * ORR policy operations
 */
enum nrs_ctl_orr {
	NRS_CTL_ORR_RD_QUANTUM = PTLRPC_NRS_CTL_1ST_POL_SPEC,
	NRS_CTL_ORR_WR_QUANTUM,
	NRS_CTL_ORR_RD_OFF_TYPE,
	NRS_CTL_ORR_WR_OFF_TYPE,
	NRS_CTL_ORR_RD_SUPP_REQ,
	NRS_CTL_ORR_WR_SUPP_REQ,
};

/**
 * NRS policy operations.
 *
 * These determine the behaviour of a policy, and are called in response to
 * NRS core events.
 */
struct ptlrpc_nrs_pol_ops {
	/**
	 * Called during policy registration; this operation is optional.
	 *
	 * \param[in,out] policy The policy being initialized
	 */
	int	(*op_policy_init) (struct ptlrpc_nrs_policy *policy);
	/**
	 * Called during policy unregistration; this operation is optional.
	 *
	 * \param[in,out] policy The policy being unregistered/finalized
	 */
	void	(*op_policy_fini) (struct ptlrpc_nrs_policy *policy);
	/**
	 * Called when activating a policy via lprocfs; policies allocate and
	 * initialize their resources here; this operation is optional.
	 *
	 * \param[in,out] policy The policy being started
	 *
	 * \see nrs_policy_start_locked()
	 */
	int	(*op_policy_start) (struct ptlrpc_nrs_policy *policy);
	/**
	 * Called when deactivating a policy via lprocfs; policies deallocate
	 * their resources here; this operation is optional
	 *
	 * \param[in,out] policy The policy being stopped
	 *
	 * \see nrs_policy_stop0()
	 */
	void	(*op_policy_stop) (struct ptlrpc_nrs_policy *policy);
	/**
	 * Used for policy-specific operations; i.e. not generic ones like
	 * \e PTLRPC_NRS_CTL_START and \e PTLRPC_NRS_CTL_GET_INFO; analogous
	 * to an ioctl; this operation is optional.
	 *
	 * \param[in,out]	 policy The policy carrying out operation \a opc
	 * \param[in]	  opc	 The command operation being carried out
	 * \param[in,out] arg	 A generic buffer for communication between the
	 *			 user and the control operation
	 *
	 * \retval -ve error
	 * \retval   0 success
	 *
	 * \see ptlrpc_nrs_policy_control()
	 */
	int	(*op_policy_ctl) (struct ptlrpc_nrs_policy *policy,
				  enum ptlrpc_nrs_ctl opc, void *arg);

	/**
	 * Called when obtaining references to the resources of the resource
	 * hierarchy for a request that has arrived for handling at the PTLRPC
	 * service. Policies should return -ve for requests they do not wish
	 * to handle. This operation is mandatory.
	 *
	 * \param[in,out] policy  The policy we're getting resources for.
	 * \param[in,out] nrq	  The request we are getting resources for.
	 * \param[in]	  parent  The parent resource of the resource being
	 *			  requested; set to NULL if none.
	 * \param[out]	  resp	  The resource is to be returned here; the
	 *			  fallback policy in an NRS head should
	 *			  \e always return a non-NULL pointer value.
	 * \param[in]  moving_req When set, signifies that this is an attempt
	 *			  to obtain resources for a request being moved
	 *			  to the high-priority NRS head by
	 *			  ldlm_lock_reorder_req().
	 *			  This implies two things:
	 *			  1. We are under obd_export::exp_rpc_lock and
	 *			  so should not sleep.
	 *			  2. We should not perform non-idempotent
	 *			  operations, and can skip idempotent operations
	 *			  that were carried out when resources were
	 *			  first taken for the request when it was
	 *			  initialized in ptlrpc_nrs_req_initialize().
	 *
	 * \retval 0, +ve The level of the returned resource in the resource
	 *		  hierarchy; currently only 0 (for a non-leaf resource)
	 *		  and 1 (for a leaf resource) are supported by the
	 *		  framework.
	 * \retval -ve	  error
	 *
	 * \see ptlrpc_nrs_req_initialize()
	 * \see ptlrpc_nrs_hpreq_add_nolock()
	 */
	int	(*op_res_get) (struct ptlrpc_nrs_policy *policy,
			       struct ptlrpc_nrs_request *nrq,
			       const struct ptlrpc_nrs_resource *parent,
			       struct ptlrpc_nrs_resource **resp,
			       bool moving_req);
	/**
	 * Called when releasing references taken for resources in the resource
	 * hierarchy for the request; this operation is optional.
	 *
	 * \param[in,out] policy The policy the resource belongs to
	 * \param[in] res	 The resource to be freed
	 *
	 * \see ptlrpc_nrs_req_finalize()
	 * \see ptlrpc_nrs_hpreq_add_nolock()
	 */
	void	(*op_res_put) (struct ptlrpc_nrs_policy *policy,
			       const struct ptlrpc_nrs_resource *res);

	/**
	 * Obtains a request for handling from the policy, and optionally
	 * removes the request from the policy; this operation is mandatory.
	 *
	 * \param[in,out] policy The policy to poll
	 * \param[in]	  peek	 When set, signifies that we just want to
	 *			 examine the request, and not handle it, so the
	 *			 request is not removed from the policy.
	 * \param[in]	  force	 When set, it will force a policy to return a
	 *			 request if it has one queued.
	 *
	 * \retval NULL No request available for handling
	 * \retval valid-pointer The request polled for handling
	 *
	 * \see ptlrpc_nrs_req_get_nolock()
	 */
	struct ptlrpc_nrs_request *
		(*op_req_get) (struct ptlrpc_nrs_policy *policy, bool peek,
			       bool force);
	/**
	 * Called when attempting to add a request to a policy for later
	 * handling; this operation is mandatory.
	 *
	 * \param[in,out] policy  The policy on which to enqueue \a nrq
	 * \param[in,out] nrq The request to enqueue
	 *
	 * \retval 0	success
	 * \retval != 0	error
	 *
	 * \see ptlrpc_nrs_req_add_nolock()
	 */
	int	(*op_req_enqueue) (struct ptlrpc_nrs_policy *policy,
				   struct ptlrpc_nrs_request *nrq);
	/**
	 * Removes a request from the policy's set of pending requests. Normally
	 * called after a request has been polled successfully from the policy
	 * for handling; this operation is mandatory.
	 *
	 * \param[in,out] policy The policy the request \a nrq belongs to
	 * \param[in,out] nrq    The request to dequeue
	 */
	void	(*op_req_dequeue) (struct ptlrpc_nrs_policy *policy,
				   struct ptlrpc_nrs_request *nrq);
	/**
	 * Called after the request has been carried out. Could be used for
	 * job/resource control; this operation is optional.
	 *
	 * \param[in,out] policy The policy which is stopping to handle request
	 *			 \a nrq
	 * \param[in,out] nrq	 The request
	 *
	 * \pre assert_spin_locked(&svcpt->scp_req_lock)
	 *
	 * \see ptlrpc_nrs_req_stop_nolock()
	 */
	void	(*op_req_stop) (struct ptlrpc_nrs_policy *policy,
				struct ptlrpc_nrs_request *nrq);
	/**
	 * Registers the policy's lprocfs interface with a PTLRPC service.
	 *
	 * \param[in] svc The service
	 *
	 * \retval 0	success
	 * \retval != 0	error
	 */
	int	(*op_lprocfs_init) (struct ptlrpc_service *svc);
	/**
	 * Unregisters the policy's lprocfs interface from a PTLRPC service.
	 *
	 * In cases of failed policy registration in
	 * \e ptlrpc_nrs_policy_register(), this function may be called for a
	 * service which has not registered the policy successfully, so
	 * implementations of this method should make sure their operations are
	 * safe in such cases.
	 *
	 * \param[in] svc The service
	 */
	void	(*op_lprocfs_fini) (struct ptlrpc_service *svc);
};
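
/*
 * Sketch of how a policy wires up the mandatory hooks (handler names are
 * hypothetical; the in-tree FIFO policy in ptlrpc/nrs_fifo.c is the
 * reference implementation):
 *
 *	static const struct ptlrpc_nrs_pol_ops nrs_example_ops = {
 *		.op_res_get	= example_res_get,
 *		.op_res_put	= example_res_put,	(optional)
 *		.op_req_get	= example_req_get,
 *		.op_req_enqueue	= example_req_enqueue,
 *		.op_req_dequeue	= example_req_dequeue,
 *	};
 */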

/**
 * Policy flags
 */
enum nrs_policy_flags {
	/**
	 * Fallback policy; use this flag only on a single supported policy per
	 * service. The flag cannot be used on policies that use
	 * \e PTLRPC_NRS_FL_REG_EXTERN
	 */
	PTLRPC_NRS_FL_FALLBACK		= (1 << 0),
	/**
	 * Start policy immediately after registering.
	 */
	PTLRPC_NRS_FL_REG_START		= (1 << 1),
	/**
	 * This is a policy registering from a module different to the one the
	 * NRS core ships in (currently ptlrpc).
	 */
	PTLRPC_NRS_FL_REG_EXTERN	= (1 << 2),
};

/**
 * NRS queue type.
 *
 * Denotes whether an NRS instance is for handling normal or high-priority
 * RPCs, or whether an operation pertains to one or both of the NRS instances
 * in a service.
 */
enum ptlrpc_nrs_queue_type {
	PTLRPC_NRS_QUEUE_REG	= (1 << 0),
	PTLRPC_NRS_QUEUE_HP	= (1 << 1),
	PTLRPC_NRS_QUEUE_BOTH	= (PTLRPC_NRS_QUEUE_REG | PTLRPC_NRS_QUEUE_HP)
};

/**
 * NRS head
 *
 * A PTLRPC service has at least one NRS head instance for handling normal
 * priority RPCs, and may optionally have a second NRS head instance for
 * handling high-priority RPCs. Each NRS head maintains a list of available
 * policies, of which one and only one policy is acting as the fallback policy,
 * and optionally a different policy may be acting as the primary policy. For
 * all RPCs handled by this NRS head instance, NRS core will first attempt to
 * enqueue the RPC using the primary policy (if any). The fallback policy is
 * used in the following cases:
 * - when there was no primary policy in the
 *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state at the time the request
 *   was initialized.
 * - when the primary policy that was at the
 *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state at the time the
 *   RPC was initialized denoted it did not wish, or for some other reason was
 *   not able to handle the request, by returning a non-valid NRS resource
 *   reference.
 * - when the primary policy that was at the
 *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state at the time the
 *   RPC was initialized fails later during the request enqueueing stage.
 *
 * \see nrs_resource_get_safe()
 * \see nrs_request_enqueue()
 */
struct ptlrpc_nrs {
	spinlock_t			nrs_lock;
	/** XXX Possibly replace svcpt->scp_req_lock with another lock here. */
	/**
	 * List of registered policies
	 */
	struct list_head			nrs_policy_list;
	/**
	 * List of policies with queued requests. Policies that have any
	 * outstanding requests are queued here, and this list is queried
	 * in a round-robin manner from NRS core when obtaining a request
	 * for handling. This ensures that requests from policies that at some
	 * point transition away from the
	 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED state are drained.
	 */
	struct list_head			nrs_policy_queued;
	/**
	 * Service partition for this NRS head
	 */
	struct ptlrpc_service_part     *nrs_svcpt;
	/**
	 * Primary policy, which is the preferred policy for handling RPCs
	 */
	struct ptlrpc_nrs_policy       *nrs_policy_primary;
	/**
	 * Fallback policy, which is the backup policy for handling RPCs
	 */
	struct ptlrpc_nrs_policy       *nrs_policy_fallback;
	/**
	 * This NRS head handles either HP or regular requests
	 */
	enum ptlrpc_nrs_queue_type	nrs_queue_type;
	/**
	 * # queued requests from all policies in this NRS head
	 */
	unsigned long			nrs_req_queued;
	/**
	 * # scheduled requests from all policies in this NRS head
	 */
	unsigned long			nrs_req_started;
	/**
	 * # policies on this NRS
	 */
	unsigned			nrs_num_pols;
	/**
	 * This NRS head is in progress of starting a policy
	 */
	unsigned			nrs_policy_starting:1;
	/**
	 * In progress of shutting down the whole NRS head; used during
	 * unregistration
	 */
	unsigned			nrs_stopping:1;
};

#define NRS_POL_NAME_MAX		16

struct ptlrpc_nrs_pol_desc;

/**
 * Service compatibility predicate; this determines whether a policy is adequate
 * for handling RPCs of a particular PTLRPC service.
 *
 * XXX: This should give the same result during policy registration and
 * unregistration, and for all partitions of a service; so the result should
 * not depend on temporal service or other properties that may influence the
 * result.
 */
typedef bool (*nrs_pol_desc_compat_t) (const struct ptlrpc_service *svc,
				       const struct ptlrpc_nrs_pol_desc *desc);

struct ptlrpc_nrs_pol_conf {
	/**
	 * Human-readable policy name
	 */
	char				   nc_name[NRS_POL_NAME_MAX];
	/**
	 * NRS operations for this policy
	 */
	const struct ptlrpc_nrs_pol_ops	  *nc_ops;
	/**
	 * Service compatibility predicate
	 */
	nrs_pol_desc_compat_t		   nc_compat;
	/**
	 * Set for policies that support a single ptlrpc service, i.e. ones that
	 * have \a pd_compat set to nrs_policy_compat_one(). The variable value
	 * depicts the name of the single service that such policies are
	 * compatible with.
	 */
	const char			  *nc_compat_svc_name;
	/**
	 * Owner module for this policy descriptor; policies registering from a
	 * different module to the one the NRS framework is held within
	 * (currently ptlrpc), should set this field to THIS_MODULE.
	 */
	struct module			  *nc_owner;
	/**
	 * Policy registration flags; a bitmask of \e nrs_policy_flags
	 */
	unsigned			   nc_flags;
};
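
/*
 * Sketch of an external module registering a policy (names and flags are
 * illustrative; see ptlrpc_nrs_policy_register() and the compatibility
 * predicates such as nrs_policy_compat_all()):
 *
 *	static struct ptlrpc_nrs_pol_conf nrs_example_conf = {
 *		.nc_name	= "example",
 *		.nc_ops		= &nrs_example_ops,
 *		.nc_compat	= nrs_policy_compat_all,
 *		.nc_owner	= THIS_MODULE,
 *		.nc_flags	= PTLRPC_NRS_FL_REG_EXTERN,
 *	};
 *
 *	rc = ptlrpc_nrs_policy_register(&nrs_example_conf);
 */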

/**
 * NRS policy registering descriptor
 *
 * Is used to hold a description of a policy that can be passed to NRS core in
 * order to register the policy with NRS heads in different PTLRPC services.
 */
struct ptlrpc_nrs_pol_desc {
	/**
	 * Human-readable policy name
	 */
	char					pd_name[NRS_POL_NAME_MAX];
	/**
	 * Link into nrs_core::nrs_policies
	 */
	struct list_head				pd_list;
	/**
	 * NRS operations for this policy
	 */
	const struct ptlrpc_nrs_pol_ops	       *pd_ops;
	/**
	 * Service compatibility predicate
	 */
	nrs_pol_desc_compat_t			pd_compat;
	/**
	 * Set for policies that are compatible with only one PTLRPC service.
	 *
	 * \see ptlrpc_nrs_pol_conf::nc_compat_svc_name
	 */
	const char			       *pd_compat_svc_name;
	/**
	 * Owner module for this policy descriptor.
	 *
	 * We need to hold a reference to the module whenever we might make use
	 * of any of the module's contents, i.e.
	 * - If one or more instances of the policy are at a state where they
	 *   might be handling a request, i.e.
	 *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
	 *   ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING as we will have to
	 *   call into the policy's ptlrpc_nrs_pol_ops() handlers. A reference
	 *   is taken on the module when
	 *   \e ptlrpc_nrs_pol_desc::pd_refs becomes 1, and released when it
	 *   becomes 0, so that we hold only one reference to the module maximum
	 *   at any time.
	 *
	 *   We do not need to hold a reference to the module, even though we
	 *   might use code and data from the module, in the following cases:
	 * - During external policy registration, because this should happen in
	 *   the module's init() function, in which case the module is safe from
	 *   removal because a reference is being held on the module by the
	 *   kernel, and kmod (and module-init-tools) will serialize any racing
	 *   processes properly anyway.
	 * - During external policy unregistration, because this should happen
	 *   in a module's exit() function, and any attempts to start a policy
	 *   instance would need to take a reference on the module, and this is
	 *   not possible once we have reached the point where the exit()
	 *   handler is called.
	 * - During service registration and unregistration, as service setup
	 *   and cleanup, and policy registration, unregistration and policy
	 *   instance starting, are serialized by \e nrs_core::nrs_mutex, so
	 *   as long as users adhere to the convention of registering policies
	 *   in init() and unregistering them in module exit() functions, there
	 *   should not be a race between these operations.
	 * - During any policy-specific lprocfs operations, because a reference
	 *   is held by the kernel on a proc entry that has been entered by a
	 *   syscall, so as long as proc entries are removed at unregistration
	 *   time, unregistration and lprocfs operations will be properly
	 *   serialized.
	 */
	struct module			       *pd_owner;
	/**
	 * Bitmask of \e nrs_policy_flags
	 */
	unsigned				pd_flags;
	/**
	 * # of references on this descriptor
	 */
	atomic_t				pd_refs;
};

/**
 * NRS policy state
 *
 * Policies transition from one state to the other during their lifetime
 */
enum ptlrpc_nrs_pol_state {
	/**
	 * Not a valid policy state.
	 */
	NRS_POL_STATE_INVALID,
	/**
	 * Policies are at this state either at the start of their life, or
	 * transition here when the user selects a different policy to act
	 * as the primary one.
	 */
	NRS_POL_STATE_STOPPED,
	/**
	 * Policy is in the process of stopping
	 */
	NRS_POL_STATE_STOPPING,
	/**
	 * Policy is in the process of starting
	 */
	NRS_POL_STATE_STARTING,
	/**
	 * A policy is in this state in two cases:
	 * - it is the fallback policy, which is always in this state.
	 * - it has been activated by the user; i.e. it is the primary policy.
	 */
	NRS_POL_STATE_STARTED,
};
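
/*
 * Summary of the lifecycle implied by the states above (the fallback
 * policy is started at head setup time and remains STARTED):
 *
 *	STOPPED -> STARTING -> STARTED -> STOPPING -> STOPPED
 */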

/**
 * NRS policy information
 *
 * Used for obtaining information for the status of a policy via lprocfs
 */
struct ptlrpc_nrs_pol_info {
	/**
	 * Policy name
	 */
	char				pi_name[NRS_POL_NAME_MAX];
	/**
	 * Current policy state
	 */
	enum ptlrpc_nrs_pol_state	pi_state;
	/**
	 * # RPCs enqueued for later dispatching by the policy
	 */
	long				pi_req_queued;
	/**
	 * # RPCs started for dispatch by the policy
	 */
	long				pi_req_started;
	/**
	 * Is this a fallback policy?
	 */
	unsigned			pi_fallback:1;
};

/**
 * NRS policy
 *
 * There is one instance of this for each policy in each NRS head of each
 * PTLRPC service partition.
 */
struct ptlrpc_nrs_policy {
	/**
	 * Linkage into the NRS head's list of policies,
	 * ptlrpc_nrs:nrs_policy_list
	 */
	struct list_head			pol_list;
	/**
	 * Linkage into the NRS head's list of policies with enqueued
	 * requests ptlrpc_nrs:nrs_policy_queued
	 */
	struct list_head			pol_list_queued;
	/**
	 * Current state of this policy
	 */
	enum ptlrpc_nrs_pol_state	pol_state;
	/**
	 * Bitmask of nrs_policy_flags
	 */
	unsigned			pol_flags;
	/**
	 * # RPCs enqueued for later dispatching by the policy
	 */
	long				pol_req_queued;
	/**
	 * # RPCs started for dispatch by the policy
	 */
	long				pol_req_started;
	/**
	 * Usage reference count taken on the policy instance
	 */
	long				pol_ref;
	/**
	 * The NRS head this policy was created at
	 */
	struct ptlrpc_nrs	       *pol_nrs;
	/**
	 * Private policy data; varies by policy type
	 */
	void			       *pol_private;
	/**
	 * Policy descriptor for this policy instance.
	 */
	struct ptlrpc_nrs_pol_desc     *pol_desc;
};

/**
 * NRS resource
 *
 * Resources are embedded into two types of NRS entities:
 * - Inside NRS policies, in the policy's private data in
 *   ptlrpc_nrs_policy::pol_private
 * - In objects that act as prime-level scheduling entities in different NRS
 *   policies; e.g. on a policy that performs round robin or similar order
 *   scheduling across client NIDs, there would be one NRS resource per unique
 *   client NID. On a policy which performs round robin scheduling across
 *   backend filesystem objects, there would be one resource associated with
 *   each of the backend filesystem objects partaking in the scheduling
 *   performed by the policy.
 *
 * NRS resources share a parent-child relationship, in which resources embedded
 * in policy instances are the parent entities, with all scheduling entities
 * a policy schedules across being the children, thus forming a simple resource
 * hierarchy. This hierarchy may be extended with one or more levels in the
 * future if the ability to have more than one primary policy is added.
 *
 * Upon request initialization, references to the then active NRS policies are
 * taken and used to later handle the dispatching of the request with one of
 * these policies.
 *
 * \see nrs_resource_get_safe()
 * \see ptlrpc_nrs_req_add()
 */
struct ptlrpc_nrs_resource {
	/**
	 * This NRS resource's parent; is NULL for resources embedded in NRS
	 * policy instances; i.e. those are top-level ones.
	 */
	struct ptlrpc_nrs_resource     *res_parent;
	/**
	 * The policy associated with this resource.
	 */
	struct ptlrpc_nrs_policy       *res_policy;
};

enum {
	NRS_RES_FALLBACK,
	NRS_RES_PRIMARY,
	NRS_RES_MAX
};

/* \name fifo
 *
 * FIFO policy
 *
 * This policy is a logical wrapper around previous, non-NRS functionality.
 * It dispatches RPCs in the same order as they arrive from the network. This
 * policy is currently used as the fallback policy, and the only enabled policy
 * on all NRS heads of all PTLRPC service partitions.
 * @{
 */

/**
 * Private data structure for the FIFO policy
 */
struct nrs_fifo_head {
	/**
	 * Resource object for policy instance.
	 */
	struct ptlrpc_nrs_resource	fh_res;
	/**
	 * List of queued requests.
	 */
	struct list_head			fh_list;
	/**
	 * For debugging purposes.
	 */
	__u64				fh_sequence;
};

struct nrs_fifo_req {
	struct list_head		fr_list;
	__u64			fr_sequence;
};
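
/*
 * Sketch of how the FIFO policy uses these fields, simplified from the
 * in-tree implementation in ptlrpc/nrs_fifo.c: enqueue tags the request
 * with a monotonically increasing sequence number and appends it to the
 * list; dequeue pops the head.
 *
 *	nrq->nr_u.fifo.fr_sequence = head->fh_sequence++;
 *	list_add_tail(&nrq->nr_u.fifo.fr_list, &head->fh_list);
 */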

/** @} fifo */

/**
 * NRS request
 *
 * Instances of this object exist embedded within ptlrpc_request; the main
 * purpose of this object is to hold references to the request's resources
 * for the lifetime of the request, and to hold properties that policies
 * use for determining the request's scheduling priority.
 */
struct ptlrpc_nrs_request {
	/**
	 * The request's resource hierarchy.
	 */
	struct ptlrpc_nrs_resource     *nr_res_ptrs[NRS_RES_MAX];
	/**
	 * Index into ptlrpc_nrs_request::nr_res_ptrs of the resource of the
	 * policy that was used to enqueue the request.
	 *
	 * \see nrs_request_enqueue()
	 */
	unsigned			nr_res_idx;
	unsigned			nr_initialized:1;
	unsigned			nr_enqueued:1;
	unsigned			nr_started:1;
	unsigned			nr_finalized:1;

	/**
	 * Policy-specific fields, used for determining a request's scheduling
	 * priority, and other supporting functionality.
	 */
	union {
		/**
		 * Fields for the FIFO policy
		 */
		struct nrs_fifo_req	fifo;
	} nr_u;
	/**
	 * Externally-registering policies may want to use this to allocate
	 * their own request properties.
	 */
	void			       *ext;
};

/** @} nrs */

/**
 * Basic request prioritization operations structure.
 * The whole idea is centered around locks and RPCs that might affect locks.
 * When a lock is contended we try to give priority to RPCs that might lead
 * to the fastest release of that lock.
 * Currently implemented only for OSTs, in a way that gives IO and truncate
 * RPCs coming from a locked region where the lock is contended priority
 * over other requests.
 */
struct ptlrpc_hpreq_ops {
	/**
	 * Check if the lock handle of the given lock is the same as
	 * taken from the request.
	 */
	int  (*hpreq_lock_match)(struct ptlrpc_request *, struct ldlm_lock *);
	/**
	 * Check if the request is a high priority one.
	 */
	int  (*hpreq_check)(struct ptlrpc_request *);
	/**
	 * Called after the request has been handled.
	 */
	void (*hpreq_fini)(struct ptlrpc_request *);
};

/**
 * Represents a remote procedure call.
 *
 * This is a staple structure used by everybody wanting to send a request
 * in Lustre.
 */
struct ptlrpc_request {
	/* Request type: one of PTL_RPC_MSG_* */
	int rq_type;
	/** Result of request processing */
	int rq_status;
	/**
	 * Linkage item through which this request is included into
	 * sending/delayed lists on client and into rqbd list on server
	 */
	struct list_head rq_list;
	/**
	 * Server side list of incoming unserved requests sorted by arrival
	 * time.  Traversed from time to time to notice requests that are
	 * about to expire, and to send back "early replies" to clients to
	 * let them know the server is alive and well, just too busy to
	 * service their requests in time
	 */
	struct list_head rq_timed_list;
	/** server-side history, used for debugging purposes. */
	struct list_head rq_history_list;
	/** server-side per-export list */
	struct list_head rq_exp_list;
	/** server-side hp handlers */
	struct ptlrpc_hpreq_ops *rq_ops;

	/** initial thread servicing this request */
	struct ptlrpc_thread *rq_svc_thread;

	/** history sequence # */
	__u64 rq_history_seq;
	/** \addtogroup  nrs
	 * @{
	 */
	/** stub for NRS request */
	struct ptlrpc_nrs_request rq_nrq;
	/** @} nrs */
	/** the index of service's srv_at_array into which request is linked */
	u32 rq_at_index;
	/** Lock to protect request flags and some other important bits, like
	 * rq_list
	 */
	spinlock_t rq_lock;
	/** client-side flags are serialized by rq_lock */
	unsigned int rq_intr:1, rq_replied:1, rq_err:1,
		rq_timedout:1, rq_resend:1, rq_restart:1,
		/**
		 * when ->rq_replay is set, the request is kept by the client
		 * even after the server commits the corresponding transaction.
		 * This is used for operations that require a sequence of
		 * multiple requests to be replayed. The only example currently
		 * is file open/close. When the last request in such a sequence
		 * is committed, ->rq_replay is cleared on all requests in the
		 * sequence.
		 */
		rq_replay:1,
		rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
		rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1,
		rq_early:1,
		rq_req_unlink:1, rq_reply_unlink:1,
		rq_memalloc:1,      /* req originated from "kswapd" */
		/* server-side flags */
		rq_packed_final:1,  /* packed final reply */
		rq_hp:1,	    /* high priority RPC */
		rq_at_linked:1,     /* link into service's srv_at_array */
		rq_reply_truncate:1,
		rq_committed:1,
		/* whether the "rq_set" is a valid one */
		rq_invalid_rqset:1,
		rq_generation_set:1,
		/* do not resend request on -EINPROGRESS */
		rq_no_retry_einprogress:1,
		/* allow the req to be sent if the import is in recovery
		 * status */
		rq_allow_replay:1;

	unsigned int rq_nr_resend;

	enum rq_phase rq_phase; /* one of RQ_PHASE_* */
	enum rq_phase rq_next_phase; /* one of RQ_PHASE_* to be used next */
	atomic_t rq_refcount;/* client-side refcount for SENT race,
				    server-side refcount for multiple replies */

	/** Portal to which this request would be sent */
	short rq_request_portal;  /* XXX FIXME bug 249 */
	/** Portal where to wait for reply and where reply would be sent */
	short rq_reply_portal;    /* XXX FIXME bug 249 */

	/**
	 * client-side:
	 * !rq_truncate : # reply bytes actually received,
	 *  rq_truncate : required repbuf_len for resend
	 */
	int rq_nob_received;
	/** Request length */
	int rq_reqlen;
	/** Reply length */
	int rq_replen;
	/** Request message - what client sent */
	struct lustre_msg *rq_reqmsg;
	/** Reply message - server response */
	struct lustre_msg *rq_repmsg;
	/** Transaction number */
	__u64 rq_transno;
	/** xid */
	__u64 rq_xid;
	/**
	 * List item for the replay list. Not yet committed requests get linked
	 * there.
	 * Also see \a rq_replay comment above.
	 */
	struct list_head rq_replay_list;

	/**
	 * security and encryption data
	 * @{ */
	struct ptlrpc_cli_ctx   *rq_cli_ctx;     /**< client's half ctx */
	struct ptlrpc_svc_ctx   *rq_svc_ctx;     /**< server's half ctx */
	struct list_head	       rq_ctx_chain;   /**< link to waited ctx */

	struct sptlrpc_flavor    rq_flvr;	/**< for client & server */
	enum lustre_sec_part     rq_sp_from;

	/* client/server security flags */
	unsigned int
				 rq_ctx_init:1,      /* context initiation */
				 rq_ctx_fini:1,      /* context destroy */
				 rq_bulk_read:1,     /* request bulk read */
				 rq_bulk_write:1,    /* request bulk write */
				 /* server authentication flags */
				 rq_auth_gss:1,      /* authenticated by gss */
				 rq_auth_remote:1,   /* authed as remote user */
				 rq_auth_usr_root:1, /* authed as root */
				 rq_auth_usr_mdt:1,  /* authed as mdt */
				 rq_auth_usr_ost:1,  /* authed as ost */
				 /* security tfm flags */
				 rq_pack_udesc:1,
				 rq_pack_bulk:1,
				 /* doesn't expect reply FIXME */
				 rq_no_reply:1,
				 rq_pill_init:1;     /* pill initialized */

	uid_t		    rq_auth_uid;	/* authed uid */
	uid_t		    rq_auth_mapped_uid; /* authed uid mapped to */

	/* (server side), pointed directly into req buffer */
	struct ptlrpc_user_desc *rq_user_desc;

	/* various buffer pointers */
	struct lustre_msg       *rq_reqbuf;      /* req wrapper */
	char		    *rq_repbuf;      /* rep buffer */
	struct lustre_msg       *rq_repdata;     /* rep wrapper msg */
	struct lustre_msg       *rq_clrbuf;      /* only in priv mode */
	int		      rq_reqbuf_len;  /* req wrapper buf len */
	int		      rq_reqdata_len; /* req wrapper msg len */
	int		      rq_repbuf_len;  /* rep buffer len */
	int		      rq_repdata_len; /* rep wrapper msg len */
	int		      rq_clrbuf_len;  /* only in priv mode */
	int		      rq_clrdata_len; /* only in priv mode */

	/** early replies go to offset 0, regular replies go after that */
	unsigned int	     rq_reply_off;

	/** @} */

	/** Fields that help to see if request and reply were swabbed or not */
	__u32 rq_req_swab_mask;
	__u32 rq_rep_swab_mask;

	/** What was import generation when this request was sent */
	int rq_import_generation;
	enum lustre_imp_state rq_send_state;

	/** how many early replies (for stats) */
	int rq_early_count;

	/** client+server request */
	lnet_handle_md_t     rq_req_md_h;
	struct ptlrpc_cb_id  rq_req_cbid;
	/** optional time limit for send attempts */
	long       rq_delay_limit;
	/** time request was first queued */
	unsigned long	   rq_queued_time;

	/* server-side... */
	/** request arrival time */
	struct timespec64	rq_arrival_time;
	/** separated reply state */
	struct ptlrpc_reply_state *rq_reply_state;
	/** incoming request buffer */
	struct ptlrpc_request_buffer_desc *rq_rqbd;

	/** client-only incoming reply */
	lnet_handle_md_t     rq_reply_md_h;
	wait_queue_head_t	  rq_reply_waitq;
	struct ptlrpc_cb_id  rq_reply_cbid;

	/** our LNet NID */
	lnet_nid_t	   rq_self;
	/** Peer description (the other side) */
	lnet_process_id_t    rq_peer;
	/** Server-side, export on which request was received */
	struct obd_export   *rq_export;
	/** Client side, import where request is being sent */
	struct obd_import   *rq_import;

	/** Replay callback, called after request is replayed at recovery */
	void (*rq_replay_cb)(struct ptlrpc_request *);
	/**
	 * Commit callback, called when request is committed and about to be
	 * freed.
	 */
	void (*rq_commit_cb)(struct ptlrpc_request *);
	/** Opaque data for replay and commit callbacks. */
	void  *rq_cb_data;

	/** For bulk requests on client only: bulk descriptor */
	struct ptlrpc_bulk_desc *rq_bulk;

	/** client outgoing req */
	/**
	 * when request/reply sent (secs), or time when request should be sent
	 */
	time64_t rq_sent;
	/** time for request really sent out */
	time64_t rq_real_sent;

	/** when request must finish. volatile
	 * so that servers' early reply updates to the deadline aren't
	 * kept in per-cpu cache */
	volatile time64_t rq_deadline;
	/** when req reply unlink must finish. */
	time64_t rq_reply_deadline;
	/** when req bulk unlink must finish. */
	time64_t rq_bulk_deadline;
	/**
	 * service time estimate (secs)
	 * If the request is not served by this time, it is marked as timed out.
	 */
	int    rq_timeout;

	/** Multi-rpc bits */
	/** Per-request waitq introduced by bug 21938 for recovery waiting */
	wait_queue_head_t rq_set_waitq;
	/** Link item for request set lists */
	struct list_head  rq_set_chain;
	/** Link back to the request set */
	struct ptlrpc_request_set *rq_set;
	/** Async completion handler, called when reply is received */
	ptlrpc_interpterer_t rq_interpret_reply;
	/** Async completion context */
	union ptlrpc_async_args rq_async_args;

	/** Pool if request is from preallocated list */
	struct ptlrpc_request_pool *rq_pool;

	struct lu_context	   rq_session;
	struct lu_context	   rq_recov_session;

	/** request format description */
	struct req_capsule	  rq_pill;
};

/**
 * Call the completion handler for the rpc, if any; return its status, or
 * the original rc if no handler was defined for this request.
 */
static inline int ptlrpc_req_interpret(const struct lu_env *env,
				       struct ptlrpc_request *req, int rc)
{
	if (req->rq_interpret_reply != NULL) {
		req->rq_status = req->rq_interpret_reply(env, req,
							 &req->rq_async_args,
							 rc);
		return req->rq_status;
	}
	return rc;
}
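
/*
 * Sketch of the typical client pattern tying the interpreter and the
 * rq_async_args scratchpad together (hypothetical names; the request is
 * built elsewhere and example_interpret follows ptlrpc_interpterer_t):
 *
 *	req->rq_interpret_reply = example_interpret;
 *	aa = ptlrpc_req_async_args(req);
 *	aa->aa_flags = ...;
 *	ptlrpcd_add_req(req);	(or queue it on a request set)
 */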
1529 
1530 /*
1531  * Can the request be moved from the regular NRS head to the high-priority NRS
1532  * head (of the same PTLRPC service partition), if any?
1533  *
1534  * For a reliable result, this should be checked under svcpt->scp_req lock.
1535  */
ptlrpc_nrs_req_can_move(struct ptlrpc_request * req)1536 static inline bool ptlrpc_nrs_req_can_move(struct ptlrpc_request *req)
1537 {
1538 	struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
1539 
1540 	/**
1541 	 * LU-898: Check ptlrpc_nrs_request::nr_enqueued to make sure the
1542 	 * request has been enqueued first, and ptlrpc_nrs_request::nr_started
1543 	 * to make sure it has not been scheduled yet (analogous to previous
1544 	 * (non-NRS) checking of !list_empty(&ptlrpc_request::rq_list)).
1545 	 */
1546 	return nrq->nr_enqueued && !nrq->nr_started && !req->rq_hp;
1547 }
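
/*
 * Hedged usage sketch: the result is only stable while scp_req_lock is
 * held, so a caller would sample it under that lock (svcpt being the
 * request's service partition):
 *
 *	spin_lock(&svcpt->scp_req_lock);
 *	can_move = ptlrpc_nrs_req_can_move(req);
 *	spin_unlock(&svcpt->scp_req_lock);
 */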
1548 
1549 /** @} nrs */
1550 
1551 /**
1552  * Returns 1 if request buffer at offset \a index was already swabbed
1553  */
1554 static inline int lustre_req_swabbed(struct ptlrpc_request *req, int index)
1555 {
1556 	LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
1557 	return req->rq_req_swab_mask & (1 << index);
1558 }
1559 
1560 /**
1561  * Returns 1 if request reply buffer at offset \a index was already swabbed
1562  */
1563 static inline int lustre_rep_swabbed(struct ptlrpc_request *req, int index)
1564 {
1565 	LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
1566 	return req->rq_rep_swab_mask & (1 << index);
1567 }
1568 
1569 /**
1570  * Returns 1 if request needs to be swabbed into local cpu byteorder
1571  */
1572 static inline int ptlrpc_req_need_swab(struct ptlrpc_request *req)
1573 {
1574 	return lustre_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
1575 }
1576 
1577 /**
1578  * Returns 1 if request reply needs to be swabbed into local cpu byteorder
1579  */
1580 static inline int ptlrpc_rep_need_swab(struct ptlrpc_request *req)
1581 {
1582 	return lustre_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
1583 }
1584 
1585 /**
1586  * Mark request buffer at offset \a index that it was already swabbed
1587  */
1588 static inline void lustre_set_req_swabbed(struct ptlrpc_request *req, int index)
1589 {
1590 	LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
1591 	LASSERT((req->rq_req_swab_mask & (1 << index)) == 0);
1592 	req->rq_req_swab_mask |= 1 << index;
1593 }
1594 
1595 /**
1596  * Mark request reply buffer at offset \a index that it was already swabbed
1597  */
1598 static inline void lustre_set_rep_swabbed(struct ptlrpc_request *req, int index)
1599 {
1600 	LASSERT(index < sizeof(req->rq_rep_swab_mask) * 8);
1601 	LASSERT((req->rq_rep_swab_mask & (1 << index)) == 0);
1602 	req->rq_rep_swab_mask |= 1 << index;
1603 }
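
/*
 * Hedged unpacking sketch: a reply buffer at offset \a index is converted
 * at most once, with the mask recording what was already swabbed.  The
 * per-structure swabber called below is hypothetical:
 *
 *	if (ptlrpc_rep_need_swab(req) && !lustre_rep_swabbed(req, index)) {
 *		lustre_swab_my_body(body);
 *		lustre_set_rep_swabbed(req, index);
 *	}
 */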
1604 
1605 /**
1606  * Convert numerical request phase value \a phase into text string description
1607  */
1608 static inline const char *
1609 ptlrpc_phase2str(enum rq_phase phase)
1610 {
1611 	switch (phase) {
1612 	case RQ_PHASE_NEW:
1613 		return "New";
1614 	case RQ_PHASE_RPC:
1615 		return "Rpc";
1616 	case RQ_PHASE_BULK:
1617 		return "Bulk";
1618 	case RQ_PHASE_INTERPRET:
1619 		return "Interpret";
1620 	case RQ_PHASE_COMPLETE:
1621 		return "Complete";
1622 	case RQ_PHASE_UNREGISTERING:
1623 		return "Unregistering";
1624 	default:
1625 		return "?Phase?";
1626 	}
1627 }
1628 
1629 /**
1630  * Convert numerical request phase of the request \a req into text string
1631  * description
1632  */
1633 static inline const char *
1634 ptlrpc_rqphase2str(struct ptlrpc_request *req)
1635 {
1636 	return ptlrpc_phase2str(req->rq_phase);
1637 }
1638 
1639 /**
1640  * Debugging functions and helpers to print request structure into debug log
1641  * @{
1642  */
1643 /* Spare the preprocessor, spoil the bugs. */
1644 #define FLAG(field, str) (field ? str : "")
1645 
1646 /** Convert bit flags into a string */
1647 #define DEBUG_REQ_FLAGS(req)						    \
1648 	ptlrpc_rqphase2str(req),						\
1649 	FLAG(req->rq_intr, "I"), FLAG(req->rq_replied, "R"),		    \
1650 	FLAG(req->rq_err, "E"),						 \
1651 	FLAG(req->rq_timedout, "X") /* eXpired */, FLAG(req->rq_resend, "S"),   \
1652 	FLAG(req->rq_restart, "T"), FLAG(req->rq_replay, "P"),		  \
1653 	FLAG(req->rq_no_resend, "N"),					   \
1654 	FLAG(req->rq_waiting, "W"),					     \
1655 	FLAG(req->rq_wait_ctx, "C"), FLAG(req->rq_hp, "H"),		     \
1656 	FLAG(req->rq_committed, "M")
1657 
1658 #define REQ_FLAGS_FMT "%s:%s%s%s%s%s%s%s%s%s%s%s%s"
1659 
1660 void _debug_req(struct ptlrpc_request *req,
1661 		struct libcfs_debug_msg_data *data, const char *fmt, ...)
1662 	__printf(3, 4);
1663 
1664 /**
1665  * Helper that decides if we need to print request according to current debug
1666  * level settings
1667  */
1668 #define debug_req(msgdata, mask, cdls, req, fmt, a...)			\
1669 do {									  \
1670 	CFS_CHECK_STACK(msgdata, mask, cdls);				 \
1671 									      \
1672 	if (((mask) & D_CANTMASK) != 0 ||				     \
1673 	    ((libcfs_debug & (mask)) != 0 &&				  \
1674 	     (libcfs_subsystem_debug & DEBUG_SUBSYSTEM) != 0))		\
1675 		_debug_req((req), msgdata, fmt, ##a);			 \
1676 } while (0)
1677 
1678 /**
1679  * This is the debug print function you need to use to print request structure
1680  * content into lustre debug log.
1681  * For most callers (level is a constant) this is resolved at compile time. */
1682 #define DEBUG_REQ(level, req, fmt, args...)				   \
1683 do {									  \
1684 	if ((level) & (D_ERROR | D_WARNING)) {				\
1685 		static struct cfs_debug_limit_state cdls;			  \
1686 		LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, level, &cdls);	    \
1687 		debug_req(&msgdata, level, &cdls, req, "@@@ "fmt" ", ## args);\
1688 	} else {							      \
1689 		LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, level, NULL);	     \
1690 		debug_req(&msgdata, level, NULL, req, "@@@ "fmt" ", ## args); \
1691 	}								     \
1692 } while (0)
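
/*
 * Typical call, as used throughout ptlrpc; the trailing arguments are
 * ordinary printk-style varargs:
 *
 *	DEBUG_REQ(D_ERROR, req, "send failed: rc = %d", rc);
 */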
1693 /** @} */
1694 
1695 /**
1696  * Structure that defines a single page of a bulk transfer
1697  */
1698 struct ptlrpc_bulk_page {
1699 	/** Linkage to list of pages in a bulk */
1700 	struct list_head       bp_link;
1701 	/**
1702 	 * Number of bytes in a page to transfer starting from \a bp_pageoffset
1703 	 */
1704 	int	      bp_buflen;
1705 	/** offset within a page */
1706 	int	      bp_pageoffset;
1707 	/** The page itself */
1708 	struct page     *bp_page;
1709 };
1710 
1711 #define BULK_GET_SOURCE   0
1712 #define BULK_PUT_SINK     1
1713 #define BULK_GET_SINK     2
1714 #define BULK_PUT_SOURCE   3
1715 
1716 /**
1717  * Definition of bulk descriptor.
1718  * Bulks are special "two-phase" RPCs where the initial request message
1719  * is sent first and is followed by a transfer (or reception) of a large
1720  * amount of data to be settled into pages referenced from the bulk descriptors.
1721  * Bulk transfers (the actual data following the small requests) are done
1722  * on separate LNet portals.
1723  * In Lustre we use bulk transfers for READ and WRITE transfers from/to OSTs.
1724  * Another user is readpage for MDT.
1725  */
1726 struct ptlrpc_bulk_desc {
1727 	/** completed with failure */
1728 	unsigned long bd_failure:1;
1729 	/** {put,get}{source,sink} */
1730 	unsigned long bd_type:2;
1731 	/** client side */
1732 	unsigned long bd_registered:1;
1733 	/** For serialization with callback */
1734 	spinlock_t bd_lock;
1735 	/** Import generation when request for this bulk was sent */
1736 	int bd_import_generation;
1737 	/** LNet portal for this bulk */
1738 	__u32 bd_portal;
1739 	/** Server side - export for which this bulk was created */
1740 	struct obd_export *bd_export;
1741 	/** Client side - import this bulk was sent on */
1742 	struct obd_import *bd_import;
1743 	/** Back pointer to the request */
1744 	struct ptlrpc_request *bd_req;
1745 	wait_queue_head_t	    bd_waitq;	/* server side only WQ */
1746 	int		    bd_iov_count;    /* # entries in bd_iov */
1747 	int		    bd_max_iov;      /* allocated size of bd_iov */
1748 	int		    bd_nob;	  /* # bytes covered */
1749 	int		    bd_nob_transferred; /* # bytes GOT/PUT */
1750 
1751 	__u64		  bd_last_xid;
1752 
1753 	struct ptlrpc_cb_id    bd_cbid;	 /* network callback info */
1754 	lnet_nid_t	     bd_sender;       /* stash event::sender */
1755 	int			bd_md_count;	/* # valid entries in bd_mds */
1756 	int			bd_md_max_brw;	/* max entries in bd_mds */
1757 	/** array of associated MDs */
1758 	lnet_handle_md_t	bd_mds[PTLRPC_BULK_OPS_COUNT];
1759 
1760 	/*
1761 	 * encrypt iov, size is either 0 or bd_iov_count.
1762 	 */
1763 	lnet_kiov_t	   *bd_enc_iov;
1764 
1765 	lnet_kiov_t	    bd_iov[0];
1766 };
1767 
1768 enum {
1769 	SVC_STOPPED     = 1 << 0,
1770 	SVC_STOPPING    = 1 << 1,
1771 	SVC_STARTING    = 1 << 2,
1772 	SVC_RUNNING     = 1 << 3,
1773 	SVC_EVENT       = 1 << 4,
1774 	SVC_SIGNAL      = 1 << 5,
1775 };
1776 
1777 #define PTLRPC_THR_NAME_LEN		32
1778 /**
1779  * Definition of server service thread structure
1780  */
1781 struct ptlrpc_thread {
1782 	/**
1783 	 * List of active threads in svc->srv_threads
1784 	 */
1785 	struct list_head t_link;
1786 	/**
1787 	 * thread-private data (preallocated memory)
1788 	 */
1789 	void *t_data;
1790 	__u32 t_flags;
1791 	/**
1792 	 * service thread index, from ptlrpc_start_threads
1793 	 */
1794 	unsigned int t_id;
1795 	/**
1796 	 * service thread pid
1797 	 */
1798 	pid_t t_pid;
1799 	/**
1800 	 * put watchdog in the structure per thread b=14840
1801 	 *
1802 	 * The Lustre watchdog was removed for the client in the hope
1803 	 * that a generic watchdog can be merged into the kernel.
1804 	 * When that happens, we should add the field below back:
1805 	 *
1806 	 * struct lc_watchdog *t_watchdog;
1807 	 */
1808 	/**
1809 	 * the svc this thread belongs to b=18582
1810 	 */
1811 	struct ptlrpc_service_part	*t_svcpt;
1812 	wait_queue_head_t			t_ctl_waitq;
1813 	struct lu_env			*t_env;
1814 	char				t_name[PTLRPC_THR_NAME_LEN];
1815 };
1816 
1817 static inline int thread_is_init(struct ptlrpc_thread *thread)
1818 {
1819 	return thread->t_flags == 0;
1820 }
1821 
1822 static inline int thread_is_stopped(struct ptlrpc_thread *thread)
1823 {
1824 	return !!(thread->t_flags & SVC_STOPPED);
1825 }
1826 
1827 static inline int thread_is_stopping(struct ptlrpc_thread *thread)
1828 {
1829 	return !!(thread->t_flags & SVC_STOPPING);
1830 }
1831 
1832 static inline int thread_is_starting(struct ptlrpc_thread *thread)
1833 {
1834 	return !!(thread->t_flags & SVC_STARTING);
1835 }
1836 
1837 static inline int thread_is_running(struct ptlrpc_thread *thread)
1838 {
1839 	return !!(thread->t_flags & SVC_RUNNING);
1840 }
1841 
1842 static inline int thread_is_event(struct ptlrpc_thread *thread)
1843 {
1844 	return !!(thread->t_flags & SVC_EVENT);
1845 }
1846 
1847 static inline int thread_is_signal(struct ptlrpc_thread *thread)
1848 {
1849 	return !!(thread->t_flags & SVC_SIGNAL);
1850 }
1851 
1852 static inline void thread_clear_flags(struct ptlrpc_thread *thread, __u32 flags)
1853 {
1854 	thread->t_flags &= ~flags;
1855 }
1856 
1857 static inline void thread_set_flags(struct ptlrpc_thread *thread, __u32 flags)
1858 {
1859 	thread->t_flags = flags;
1860 }
1861 
1862 static inline void thread_add_flags(struct ptlrpc_thread *thread, __u32 flags)
1863 {
1864 	thread->t_flags |= flags;
1865 }
1866 
1867 static inline int thread_test_and_clear_flags(struct ptlrpc_thread *thread,
1868 					      __u32 flags)
1869 {
1870 	if (thread->t_flags & flags) {
1871 		thread->t_flags &= ~flags;
1872 		return 1;
1873 	}
1874 	return 0;
1875 }
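
/*
 * Hedged life-cycle sketch, simplified from the pattern in ptlrpc_main():
 * the thread body flips these flags and kicks t_ctl_waitq so that the
 * starter/stopper can wait for the transitions (do_work() is hypothetical):
 *
 *	thread_add_flags(thread, SVC_RUNNING);
 *	wake_up(&thread->t_ctl_waitq);
 *	while (!thread_is_stopping(thread))
 *		do_work();
 *	thread_clear_flags(thread, SVC_RUNNING);
 *	thread_add_flags(thread, SVC_STOPPED);
 *	wake_up(&thread->t_ctl_waitq);
 */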
1876 
1877 /**
1878  * Request buffer descriptor structure.
1879  * This is a structure that contains one posted request buffer for service.
1880  * Once data lands in a buffer, the event callback creates the actual request
1881  * and wakes one of the service threads to process the new incoming request.
1882  * More than one request can fit into the buffer.
1883  */
1884 struct ptlrpc_request_buffer_desc {
1885 	/** Link item for rqbds on a service */
1886 	struct list_head	     rqbd_list;
1887 	/** History of requests for this buffer */
1888 	struct list_head	     rqbd_reqs;
1889 	/** Back pointer to service for which this buffer is registered */
1890 	struct ptlrpc_service_part *rqbd_svcpt;
1891 	/** LNet descriptor */
1892 	lnet_handle_md_t       rqbd_md_h;
1893 	int		    rqbd_refcount;
1894 	/** The buffer itself */
1895 	char		  *rqbd_buffer;
1896 	struct ptlrpc_cb_id    rqbd_cbid;
1897 	/**
1898 	 * This "embedded" request structure is only used for the
1899 	 * last request to fit into the buffer
1900 	 */
1901 	struct ptlrpc_request  rqbd_req;
1902 };
1903 
1904 typedef int  (*svc_handler_t)(struct ptlrpc_request *req);
1905 
1906 struct ptlrpc_service_ops {
1907 	/**
1908 	 * if non-NULL, called during thread creation (ptlrpc_start_thread())
1909 	 * to initialize service-specific per-thread state.
1910 	 */
1911 	int		(*so_thr_init)(struct ptlrpc_thread *thr);
1912 	/**
1913 	 * if non-NULL, called during thread shutdown (ptlrpc_main()) to
1914 	 * destruct state created by ->so_thr_init().
1915 	 */
1916 	void		(*so_thr_done)(struct ptlrpc_thread *thr);
1917 	/**
1918 	 * Handler function for incoming requests for this service
1919 	 */
1920 	int		(*so_req_handler)(struct ptlrpc_request *req);
1921 	/**
1922 	 * function to determine the priority of the request; it is called
1923 	 * on every new request
1924 	 */
1925 	int		(*so_hpreq_handler)(struct ptlrpc_request *);
1926 	/**
1927 	 * service-specific print fn
1928 	 */
1929 	void		(*so_req_printer)(void *, struct ptlrpc_request *);
1930 };
1931 
1932 #ifndef __cfs_cacheline_aligned
1933 /* NB: put it here to reduce patch dependence */
1934 # define __cfs_cacheline_aligned
1935 #endif
1936 
1937 /**
1938  * How many high priority requests to serve before serving one normal
1939  * priority request
1940  */
1941 #define PTLRPC_SVC_HP_RATIO 10
1942 
1943 /**
1944  * Definition of PortalRPC service.
1945  * The service listens on a particular portal (like a tcp port)
1946  * and performs actions for a specific server, like the I/O service for OST
1947  * or the general metadata service for MDS.
1948  */
1949 struct ptlrpc_service {
1950 	/** serialize /proc operations */
1951 	spinlock_t			srv_lock;
1952 	/** most often accessed fields */
1953 	/** chain thru all services */
1954 	struct list_head		      srv_list;
1955 	/** service operations table */
1956 	struct ptlrpc_service_ops	srv_ops;
1957 	/** only statically allocated strings here; we don't clean them */
1958 	char			   *srv_name;
1959 	/** only statically allocated strings here; we don't clean them */
1960 	char			   *srv_thread_name;
1961 	/** service thread list */
1962 	struct list_head		      srv_threads;
1963 	/** # of threads to create for each partition at initialization */
1964 	int				srv_nthrs_cpt_init;
1965 	/** limit of threads number for each partition */
1966 	int				srv_nthrs_cpt_limit;
1967 	/** Root of debugfs dir tree for this service */
1968 	struct dentry		   *srv_debugfs_entry;
1969 	/** Pointer to statistic data for this service */
1970 	struct lprocfs_stats	   *srv_stats;
1971 	/** # hp per lp reqs to handle */
1972 	int			     srv_hpreq_ratio;
1973 	/** biggest request to receive */
1974 	int			     srv_max_req_size;
1975 	/** biggest reply to send */
1976 	int			     srv_max_reply_size;
1977 	/** size of individual buffers */
1978 	int			     srv_buf_size;
1979 	/** # buffers to allocate in 1 group */
1980 	int			     srv_nbuf_per_group;
1981 	/** Local portal on which to receive requests */
1982 	__u32			   srv_req_portal;
1983 	/** Portal on the client to send replies to */
1984 	__u32			   srv_rep_portal;
1985 	/**
1986 	 * Tags for lu_context associated with this thread, see struct
1987 	 * lu_context.
1988 	 */
1989 	__u32			   srv_ctx_tags;
1990 	/** soft watchdog timeout multiplier */
1991 	int			     srv_watchdog_factor;
1992 	/** under unregister_service */
1993 	unsigned			srv_is_stopping:1;
1994 
1995 	/** max # request buffers in history per partition */
1996 	int				srv_hist_nrqbds_cpt_max;
1997 	/** number of CPTs this service bound on */
1998 	int				srv_ncpts;
1999 	/** CPTs array this service bound on */
2000 	__u32				*srv_cpts;
2001 	/** 2^srv_cpt_bits >= cfs_cpt_number(srv_cptable) */
2002 	int				srv_cpt_bits;
2003 	/** CPT table this service is running over */
2004 	struct cfs_cpt_table		*srv_cptable;
2005 
2006 	/* sysfs object */
2007 	struct kobject			 srv_kobj;
2008 	struct completion		 srv_kobj_unregister;
2009 	/**
2010 	 * partition data for ptlrpc service
2011 	 */
2012 	struct ptlrpc_service_part	*srv_parts[0];
2013 };
2014 
2015 /**
2016  * Definition of PortalRPC service partition data.
2017  * Although a service only has one instance of it right now, we will
2018  * have multiple instances very soon (one instance per CPT).
2019  *
2020  * it has four locks:
2021  * \a scp_lock
2022  *    serialize operations on rqbd and requests waiting for preprocess
2023  * \a scp_req_lock
2024  *    serialize operations on active requests sent to this portal
2025  * \a scp_at_lock
2026  *    serialize adaptive timeout stuff
2027  * \a scp_rep_lock
2028  *    serialize operations on RS list (reply states)
2029  *
2030  * We don't have any use-case to take two or more locks at the same time
2031  * for now, so there is no lock order issue.
2032  */
2033 struct ptlrpc_service_part {
2034 	/** back reference to owner */
2035 	struct ptlrpc_service		*scp_service __cfs_cacheline_aligned;
2036 	/* CPT id, reserved */
2037 	int				scp_cpt;
2038 	/** always increasing number */
2039 	int				scp_thr_nextid;
2040 	/** # of starting threads */
2041 	int				scp_nthrs_starting;
2042 	/** # of stopping threads, reserved for shrinking threads */
2043 	int				scp_nthrs_stopping;
2044 	/** # running threads */
2045 	int				scp_nthrs_running;
2046 	/** service threads list */
2047 	struct list_head			scp_threads;
2048 
2049 	/**
2050 	 * serialize the following fields, used for protecting
2051 	 * rqbd list and incoming requests waiting for preprocess,
2052 	 * threads starting & stopping are also protected by this lock.
2053 	 */
2054 	spinlock_t scp_lock __cfs_cacheline_aligned;
2055 	/** total # req buffer descs allocated */
2056 	int				scp_nrqbds_total;
2057 	/** # posted request buffers for receiving */
2058 	int				scp_nrqbds_posted;
2059 	/** in progress of allocating rqbd */
2060 	int				scp_rqbd_allocating;
2061 	/** # incoming reqs */
2062 	int				scp_nreqs_incoming;
2063 	/** request buffers to be reposted */
2064 	struct list_head			scp_rqbd_idle;
2065 	/** req buffers receiving */
2066 	struct list_head			scp_rqbd_posted;
2067 	/** incoming reqs */
2068 	struct list_head			scp_req_incoming;
2069 	/** timeout before re-posting reqs, in ticks */
2070 	long			scp_rqbd_timeout;
2071 	/**
2072 	 * all threads sleep on this. This wait-queue is signalled when a new
2073 	 * incoming request arrives and when a difficult reply has to be handled.
2074 	 */
2075 	wait_queue_head_t			scp_waitq;
2076 
2077 	/** request history */
2078 	struct list_head			scp_hist_reqs;
2079 	/** request buffer history */
2080 	struct list_head			scp_hist_rqbds;
2081 	/** # request buffers in history */
2082 	int				scp_hist_nrqbds;
2083 	/** sequence number for request */
2084 	__u64				scp_hist_seq;
2085 	/** highest seq culled from history */
2086 	__u64				scp_hist_seq_culled;
2087 
2088 	/**
2089 	 * serialize the following fields, used for processing requests
2090 	 * sent to this portal
2091 	 */
2092 	spinlock_t			scp_req_lock __cfs_cacheline_aligned;
2093 	/** # reqs in either of the NRS heads below */
2094 	/** # reqs being served */
2095 	int				scp_nreqs_active;
2096 	/** # HPreqs being served */
2097 	int				scp_nhreqs_active;
2098 	/** # hp requests handled */
2099 	int				scp_hreq_count;
2100 
2101 	/** NRS head for regular requests */
2102 	struct ptlrpc_nrs		scp_nrs_reg;
2103 	/** NRS head for HP requests; this is only valid for services that can
2104 	 *  handle HP requests */
2105 	struct ptlrpc_nrs	       *scp_nrs_hp;
2106 
2107 	/** AT stuff */
2108 	/** @{ */
2109 	/**
2110 	 * serialize the following fields, used for changes on
2111 	 * adaptive timeout
2112 	 */
2113 	spinlock_t			scp_at_lock __cfs_cacheline_aligned;
2114 	/** estimated rpc service time */
2115 	struct adaptive_timeout		scp_at_estimate;
2116 	/** reqs waiting for replies */
2117 	struct ptlrpc_at_array		scp_at_array;
2118 	/** early reply timer */
2119 	struct timer_list		scp_at_timer;
2120 	/** debug */
2121 	unsigned long			scp_at_checktime;
2122 	/** check early replies */
2123 	unsigned			scp_at_check;
2124 	/** @} */
2125 
2126 	/**
2127 	 * serialize the following fields, used for processing
2128 	 * replies for this portal
2129 	 */
2130 	spinlock_t			scp_rep_lock __cfs_cacheline_aligned;
2131 	/** all the active replies */
2132 	struct list_head			scp_rep_active;
2133 	/** List of free reply_states */
2134 	struct list_head			scp_rep_idle;
2135 	/** waitq to signal when adding entries to scp_rep_idle */
2136 	wait_queue_head_t			scp_rep_waitq;
2137 	/** # 'difficult' replies */
2138 	atomic_t			scp_nreps_difficult;
2139 };
2140 
2141 #define ptlrpc_service_for_each_part(part, i, svc)			\
2142 	for (i = 0;							\
2143 	     i < (svc)->srv_ncpts &&					\
2144 	     (svc)->srv_parts != NULL &&				\
2145 	     ((part) = (svc)->srv_parts[i]) != NULL; i++)
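
/*
 * Hedged iteration sketch: walk every partition of a service, e.g. to sum
 * per-CPT counters (part, i and svc are caller-provided):
 *
 *	struct ptlrpc_service_part *part;
 *	int i, active = 0;
 *
 *	ptlrpc_service_for_each_part(part, i, svc)
 *		active += part->scp_nreqs_active;
 */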
2146 
2147 /**
2148  * Declaration of ptlrpcd control structure
2149  */
2150 struct ptlrpcd_ctl {
2151 	/**
2152 	 * Ptlrpc thread control flags (LIOD_START, LIOD_STOP, LIOD_FORCE)
2153 	 */
2154 	unsigned long			pc_flags;
2155 	/**
2156 	 * Thread lock protecting structure fields.
2157 	 */
2158 	spinlock_t			pc_lock;
2159 	/**
2160 	 * Start completion.
2161 	 */
2162 	struct completion		pc_starting;
2163 	/**
2164 	 * Stop completion.
2165 	 */
2166 	struct completion		pc_finishing;
2167 	/**
2168 	 * Thread requests set.
2169 	 */
2170 	struct ptlrpc_request_set  *pc_set;
2171 	/**
2172 	 * Thread name used in kthread_run()
2173 	 */
2174 	char			pc_name[16];
2175 	/**
2176 	 * Environment for request interpreters to run in.
2177 	 */
2178 	struct lu_env	       pc_env;
2179 	/**
2180 	 * CPT the thread is bound on.
2181 	 */
2182 	int				pc_cpt;
2183 	/**
2184 	 * Index of ptlrpcd thread in the array.
2185 	 */
2186 	int				pc_index;
2187 	/**
2188 	 * Pointer to the array of partners' ptlrpcd_ctl structure.
2189 	 */
2190 	struct ptlrpcd_ctl	**pc_partners;
2191 	/**
2192 	 * Number of the ptlrpcd's partners.
2193 	 */
2194 	int				pc_npartners;
2195 	/**
2196 	 * Record the partner index to be processed next.
2197 	 */
2198 	int			 pc_cursor;
2199 	/**
2200 	 * Error code if the thread failed to fully start.
2201 	 */
2202 	int				pc_error;
2203 };
2204 
2205 /* Bits for pc_flags */
2206 enum ptlrpcd_ctl_flags {
2207 	/**
2208 	 * Ptlrpc thread start flag.
2209 	 */
2210 	LIOD_START       = 1 << 0,
2211 	/**
2212 	 * Ptlrpc thread stop flag.
2213 	 */
2214 	LIOD_STOP	= 1 << 1,
2215 	/**
2216 	 * Ptlrpc thread force flag (only forced stop so far).
2217 	 * This causes any in-flight RPCs handled by the thread
2218 	 * to be aborted when LIOD_STOP is also specified.
2219 	 */
2220 	LIOD_FORCE       = 1 << 2,
2221 	/**
2222 	 * This is a recovery ptlrpc thread.
2223 	 */
2224 	LIOD_RECOVERY    = 1 << 3,
2225 };
2226 
2227 /**
2228  * \addtogroup nrs
2229  * @{
2230  *
2231  * Service compatibility function; the policy is compatible with all services.
2232  *
2233  * \param[in] svc  The service the policy is attempting to register with.
2234  * \param[in] desc The policy descriptor
2235  *
2236  * \retval true The policy is compatible with the service
2237  *
2238  * \see ptlrpc_nrs_pol_desc::pd_compat()
2239  */
2240 static inline bool nrs_policy_compat_all(const struct ptlrpc_service *svc,
2241 					 const struct ptlrpc_nrs_pol_desc *desc)
2242 {
2243 	return true;
2244 }
2245 
2246 /**
2247  * Service compatibility function; the policy is compatible only with a
2248  * specific service, identified by its human-readable name at
2249  * ptlrpc_service::srv_name.
2250  *
2251  * \param[in] svc  The service the policy is attempting to register with.
2252  * \param[in] desc The policy descriptor
2253  *
2254  * \retval false The policy is not compatible with the service
2255  * \retval true	 The policy is compatible with the service
2256  *
2257  * \see ptlrpc_nrs_pol_desc::pd_compat()
2258  */
2259 static inline bool nrs_policy_compat_one(const struct ptlrpc_service *svc,
2260 					 const struct ptlrpc_nrs_pol_desc *desc)
2261 {
2262 	LASSERT(desc->pd_compat_svc_name != NULL);
2263 	return strcmp(svc->srv_name, desc->pd_compat_svc_name) == 0;
2264 }
2265 
2266 /** @} nrs */
2267 
2268 /* ptlrpc/events.c */
2269 extern lnet_handle_eq_t ptlrpc_eq_h;
2270 int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
2271 			lnet_process_id_t *peer, lnet_nid_t *self);
2272 /**
2273  * These callbacks are invoked by LNet when something happens to the
2274  * underlying buffer
2275  * @{
2276  */
2277 void request_out_callback(lnet_event_t *ev);
2278 void reply_in_callback(lnet_event_t *ev);
2279 void client_bulk_callback(lnet_event_t *ev);
2280 void request_in_callback(lnet_event_t *ev);
2281 void reply_out_callback(lnet_event_t *ev);
2282 /** @} */
2283 
2284 /* ptlrpc/connection.c */
2285 struct ptlrpc_connection *ptlrpc_connection_get(lnet_process_id_t peer,
2286 						lnet_nid_t self,
2287 						struct obd_uuid *uuid);
2288 int ptlrpc_connection_put(struct ptlrpc_connection *c);
2289 struct ptlrpc_connection *ptlrpc_connection_addref(struct ptlrpc_connection *);
2290 int ptlrpc_connection_init(void);
2291 void ptlrpc_connection_fini(void);
2292 
2293 /* ptlrpc/niobuf.c */
2294 /**
2295  * Actual interfacing with LNet to put/get/register/unregister stuff
2296  * @{
2297  */
2298 
2299 int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async);
2300 
2301 static inline int ptlrpc_client_bulk_active(struct ptlrpc_request *req)
2302 {
2303 	struct ptlrpc_bulk_desc *desc;
2304 	int		      rc;
2305 
2306 	LASSERT(req != NULL);
2307 	desc = req->rq_bulk;
2308 
2309 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
2310 	    req->rq_bulk_deadline > ktime_get_real_seconds())
2311 		return 1;
2312 
2313 	if (!desc)
2314 		return 0;
2315 
2316 	spin_lock(&desc->bd_lock);
2317 	rc = desc->bd_md_count;
2318 	spin_unlock(&desc->bd_lock);
2319 	return rc;
2320 }
2321 
2322 #define PTLRPC_REPLY_MAYBE_DIFFICULT 0x01
2323 #define PTLRPC_REPLY_EARLY	   0x02
2324 int ptlrpc_send_reply(struct ptlrpc_request *req, int flags);
2325 int ptlrpc_reply(struct ptlrpc_request *req);
2326 int ptlrpc_send_error(struct ptlrpc_request *req, int difficult);
2327 int ptlrpc_error(struct ptlrpc_request *req);
2328 void ptlrpc_resend_req(struct ptlrpc_request *request);
2329 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req);
2330 int ptl_send_rpc(struct ptlrpc_request *request, int noreply);
2331 int ptlrpc_register_rqbd(struct ptlrpc_request_buffer_desc *rqbd);
2332 /** @} */
2333 
2334 /* ptlrpc/client.c */
2335 /**
2336  * Client-side portals API. Everything to send requests, receive replies,
2337  * request queues, request management, etc.
2338  * @{
2339  */
2340 void ptlrpc_request_committed(struct ptlrpc_request *req, int force);
2341 
2342 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
2343 			struct ptlrpc_client *);
2344 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid);
2345 
2346 int ptlrpc_queue_wait(struct ptlrpc_request *req);
2347 int ptlrpc_replay_req(struct ptlrpc_request *req);
2348 int ptlrpc_unregister_reply(struct ptlrpc_request *req, int async);
2349 void ptlrpc_abort_inflight(struct obd_import *imp);
2350 void ptlrpc_abort_set(struct ptlrpc_request_set *set);
2351 
2352 struct ptlrpc_request_set *ptlrpc_prep_set(void);
2353 struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
2354 					     void *arg);
2355 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *);
2356 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set);
2357 int ptlrpc_set_wait(struct ptlrpc_request_set *);
2358 int ptlrpc_expired_set(void *data);
2359 void ptlrpc_interrupted_set(void *data);
2360 void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
2361 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
2362 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
2363 void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
2364 			    struct ptlrpc_request *req);
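
/*
 * Hedged sketch of the set API: batch several prepared requests and wait
 * for all of them to complete (error handling elided; ptlrpc_set_wait()
 * sends everything in the set and waits for completion):
 *
 *	struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *	ptlrpc_set_add_req(set, req1);
 *	ptlrpc_set_add_req(set, req2);
 *	rc = ptlrpc_set_wait(set);
 *	ptlrpc_set_destroy(set);
 */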
2365 
2366 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool);
2367 int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq);
2368 
2369 struct ptlrpc_request_pool *
2370 ptlrpc_init_rq_pool(int, int,
2371 		    int (*populate_pool)(struct ptlrpc_request_pool *, int));
2372 
2373 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req);
2374 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
2375 					    const struct req_format *format);
2376 struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
2377 					    struct ptlrpc_request_pool *,
2378 					    const struct req_format *format);
2379 void ptlrpc_request_free(struct ptlrpc_request *request);
2380 int ptlrpc_request_pack(struct ptlrpc_request *request,
2381 			__u32 version, int opcode);
2382 struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
2383 						const struct req_format *format,
2384 						__u32 version, int opcode);
2385 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
2386 			     __u32 version, int opcode, char **bufs,
2387 			     struct ptlrpc_cli_ctx *ctx);
2388 void ptlrpc_req_finished(struct ptlrpc_request *request);
2389 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req);
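
/*
 * Hedged synchronous round-trip sketch, using the OBD_PING layout as an
 * illustration (compare ptlrpc_prep_ping()/ptlrpc_obd_ping()):
 *
 *	req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *					LUSTRE_OBD_VERSION, OBD_PING);
 *	if (req == NULL)
 *		return -ENOMEM;
 *	ptlrpc_request_set_replen(req);
 *	rc = ptlrpc_queue_wait(req);
 *	ptlrpc_req_finished(req);
 */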
2390 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
2391 					      unsigned npages, unsigned max_brw,
2392 					      unsigned type, unsigned portal);
2393 void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *bulk, int pin);
2394 static inline void ptlrpc_free_bulk_pin(struct ptlrpc_bulk_desc *bulk)
2395 {
2396 	__ptlrpc_free_bulk(bulk, 1);
2397 }
2398 
2399 static inline void ptlrpc_free_bulk_nopin(struct ptlrpc_bulk_desc *bulk)
2400 {
2401 	__ptlrpc_free_bulk(bulk, 0);
2402 }
2403 
2404 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
2405 			     struct page *page, int pageoffset, int len, int);
2406 static inline void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
2407 					     struct page *page, int pageoffset,
2408 					     int len)
2409 {
2410 	__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
2411 }
2412 
2413 static inline void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
2414 					       struct page *page, int pageoffset,
2415 					       int len)
2416 {
2417 	__ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
2418 }
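
/*
 * Hedged client bulk setup sketch: attach a descriptor to the request and
 * add the pages to transfer; the bulk is registered when the request is
 * sent.  A bulk read is shown (the server PUTs, the client is the sink);
 * pages/npages are caller-provided:
 *
 *	desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
 *				    OST_BULK_PORTAL);
 *	if (desc == NULL)
 *		return -ENOMEM;
 *	for (i = 0; i < npages; i++)
 *		ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_SIZE);
 */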
2419 
2420 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2421 				      struct obd_import *imp);
2422 __u64 ptlrpc_next_xid(void);
2423 __u64 ptlrpc_sample_next_xid(void);
2424 __u64 ptlrpc_req_xid(struct ptlrpc_request *request);
2425 
2426 /* Set of routines to run a function in ptlrpcd context */
2427 void *ptlrpcd_alloc_work(struct obd_import *imp,
2428 			 int (*cb)(const struct lu_env *, void *), void *data);
2429 void ptlrpcd_destroy_work(void *handler);
2430 int ptlrpcd_queue_work(void *handler);
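
/*
 * Hedged sketch: run a callback asynchronously in ptlrpcd context (my_cb
 * and my_data are hypothetical):
 *
 *	handler = ptlrpcd_alloc_work(imp, my_cb, my_data);
 *	if (IS_ERR(handler))
 *		return PTR_ERR(handler);
 *	ptlrpcd_queue_work(handler);
 *	...
 *	ptlrpcd_destroy_work(handler);
 */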
2431 
2432 /** @} */
2433 struct ptlrpc_service_buf_conf {
2434 	/* nbufs is the # of buffers to allocate when growing the pool */
2435 	unsigned int			bc_nbufs;
2436 	/* buffer size to post */
2437 	unsigned int			bc_buf_size;
2438 	/* portal to listen for requests on */
2439 	unsigned int			bc_req_portal;
2440 	/* portal to send replies to */
2441 	unsigned int			bc_rep_portal;
2442 	/* maximum request size to be accepted for this service */
2443 	unsigned int			bc_req_max_size;
2444 	/* maximum reply size this service can ever send */
2445 	unsigned int			bc_rep_max_size;
2446 };
2447 
2448 struct ptlrpc_service_thr_conf {
2449 	/* threadname should be 8 characters or less - 6 will be added on */
2450 	char				*tc_thr_name;
2451 	/* threads increasing factor for each CPU */
2452 	unsigned int			tc_thr_factor;
2453 	/* service threads # to start on each partition while initializing */
2454 	unsigned int			tc_nthrs_init;
2455 	/*
2456 	 * low-water mark of the per-partition upper limit on thread # while
2457 	 * running; service availability may be impacted if the thread count is lower
2458 	 * than this value. It can be ZERO if the service doesn't require
2459 	 * CPU affinity or there is only one partition.
2460 	 */
2461 	unsigned int			tc_nthrs_base;
2462 	/* "soft" limit for total threads number */
2463 	unsigned int			tc_nthrs_max;
2464 	/* user-specified thread count; it will be validated against
2465 	 * other members of this structure. */
2466 	unsigned int			tc_nthrs_user;
2467 	/* set NUMA node affinity for service threads */
2468 	unsigned int			tc_cpu_affinity;
2469 	/* Tags for lu_context associated with service thread */
2470 	__u32				tc_ctx_tags;
2471 };
2472 
2473 struct ptlrpc_service_cpt_conf {
2474 	struct cfs_cpt_table		*cc_cptable;
2475 	/* string pattern to describe CPTs for a service */
2476 	char				*cc_pattern;
2477 };
2478 
2479 struct ptlrpc_service_conf {
2480 	/* service name */
2481 	char				*psc_name;
2482 	/* soft watchdog timeout multiplier to print stuck service traces */
2483 	unsigned int			psc_watchdog_factor;
2484 	/* buffer information */
2485 	struct ptlrpc_service_buf_conf	psc_buf;
2486 	/* thread information */
2487 	struct ptlrpc_service_thr_conf	psc_thr;
2488 	/* CPU partition information */
2489 	struct ptlrpc_service_cpt_conf	psc_cpt;
2490 	/* function table */
2491 	struct ptlrpc_service_ops	psc_ops;
2492 };
2493 
2494 /* ptlrpc/service.c */
2495 /**
2496  * Server-side services API. Register/unregister service, request state
2497  * management, service thread management
2498  *
2499  * @{
2500  */
2501 void ptlrpc_dispatch_difficult_reply(struct ptlrpc_reply_state *rs);
2502 void ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs);
2503 struct ptlrpc_service *ptlrpc_register_service(
2504 				struct ptlrpc_service_conf *conf,
2505 				struct kset *parent,
2506 				struct dentry *debugfs_entry);
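
/*
 * Hedged registration sketch; the service name, portal numbers, buffer
 * sizes, thread counts and handler below are illustrative placeholders,
 * not values defined in this header:
 *
 *	static struct ptlrpc_service_conf conf = {
 *		.psc_name		= "example",
 *		.psc_watchdog_factor	= 1,
 *		.psc_buf = {
 *			.bc_nbufs		= 64,
 *			.bc_buf_size		= 4096,
 *			.bc_req_max_size	= 4096,
 *			.bc_rep_max_size	= 4096,
 *			.bc_req_portal		= 30,
 *			.bc_rep_portal		= 31,
 *		},
 *		.psc_thr = {
 *			.tc_thr_name		= "ll_example",
 *			.tc_nthrs_init		= 2,
 *			.tc_nthrs_max		= 8,
 *			.tc_ctx_tags		= LCT_MD_THREAD,
 *		},
 *		.psc_ops = {
 *			.so_req_handler		= example_req_handler,
 *		},
 *	};
 *
 *	svc = ptlrpc_register_service(&conf, parent_kset, debugfs_dir);
 *	if (IS_ERR(svc))
 *		return PTR_ERR(svc);
 */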
2507 
2508 int ptlrpc_start_threads(struct ptlrpc_service *svc);
2509 int ptlrpc_unregister_service(struct ptlrpc_service *service);
2510 int liblustre_check_services(void *arg);
2511 
2512 int ptlrpc_hr_init(void);
2513 void ptlrpc_hr_fini(void);
2514 
2515 /** @} */
2516 
2517 /* ptlrpc/import.c */
2518 /**
2519  * Import API
2520  * @{
2521  */
2522 int ptlrpc_connect_import(struct obd_import *imp);
2523 int ptlrpc_init_import(struct obd_import *imp);
2524 int ptlrpc_disconnect_import(struct obd_import *imp, int noclose);
2525 int ptlrpc_import_recovery_state_machine(struct obd_import *imp);
2526 
2527 /* ptlrpc/pack_generic.c */
2528 int ptlrpc_reconnect_import(struct obd_import *imp);
2529 /** @} */
2530 
2531 /**
2532  * ptlrpc msg buffer and swab interface
2533  *
2534  * @{
2535  */
2536 int ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
2537 			 int index);
2538 void ptlrpc_buf_set_swabbed(struct ptlrpc_request *req, const int inout,
2539 				int index);
2540 int ptlrpc_unpack_rep_msg(struct ptlrpc_request *req, int len);
2541 int ptlrpc_unpack_req_msg(struct ptlrpc_request *req, int len);
2542 
2543 void lustre_init_msg_v2(struct lustre_msg_v2 *msg, int count, __u32 *lens,
2544 			char **bufs);
2545 int lustre_pack_request(struct ptlrpc_request *, __u32 magic, int count,
2546 			__u32 *lens, char **bufs);
2547 int lustre_pack_reply(struct ptlrpc_request *, int count, __u32 *lens,
2548 		      char **bufs);
2549 int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
2550 			 __u32 *lens, char **bufs, int flags);
2551 #define LPRFL_EARLY_REPLY 1
2552 int lustre_pack_reply_flags(struct ptlrpc_request *, int count, __u32 *lens,
2553 			    char **bufs, int flags);
2554 int lustre_shrink_msg(struct lustre_msg *msg, int segment,
2555 		      unsigned int newlen, int move_data);
2556 void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
2557 int __lustre_unpack_msg(struct lustre_msg *m, int len);
2558 int lustre_msg_hdr_size(__u32 magic, int count);
2559 int lustre_msg_size(__u32 magic, int count, __u32 *lengths);
2560 int lustre_msg_size_v2(int count, __u32 *lengths);
2561 int lustre_packed_msg_size(struct lustre_msg *msg);
2562 int lustre_msg_early_size(void);
2563 void *lustre_msg_buf_v2(struct lustre_msg_v2 *m, int n, int min_size);
2564 void *lustre_msg_buf(struct lustre_msg *m, int n, int minlen);
2565 int lustre_msg_buflen(struct lustre_msg *m, int n);
2566 int lustre_msg_bufcount(struct lustre_msg *m);
2567 char *lustre_msg_string(struct lustre_msg *m, int n, int max_len);
2568 __u32 lustre_msghdr_get_flags(struct lustre_msg *msg);
2569 void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags);
2570 __u32 lustre_msg_get_flags(struct lustre_msg *msg);
2571 void lustre_msg_add_flags(struct lustre_msg *msg, int flags);
2572 void lustre_msg_set_flags(struct lustre_msg *msg, int flags);
2573 void lustre_msg_clear_flags(struct lustre_msg *msg, int flags);
2574 __u32 lustre_msg_get_op_flags(struct lustre_msg *msg);
2575 void lustre_msg_add_op_flags(struct lustre_msg *msg, int flags);
2576 struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg);
2577 __u32 lustre_msg_get_type(struct lustre_msg *msg);
2578 void lustre_msg_add_version(struct lustre_msg *msg, int version);
2579 __u32 lustre_msg_get_opc(struct lustre_msg *msg);
2580 __u64 lustre_msg_get_last_committed(struct lustre_msg *msg);
2581 __u64 *lustre_msg_get_versions(struct lustre_msg *msg);
2582 __u64 lustre_msg_get_transno(struct lustre_msg *msg);
2583 __u64 lustre_msg_get_slv(struct lustre_msg *msg);
2584 __u32 lustre_msg_get_limit(struct lustre_msg *msg);
2585 void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv);
2586 void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit);
2587 int lustre_msg_get_status(struct lustre_msg *msg);
2588 __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
2589 __u32 lustre_msg_get_magic(struct lustre_msg *msg);
2590 __u32 lustre_msg_get_timeout(struct lustre_msg *msg);
2591 __u32 lustre_msg_get_service_time(struct lustre_msg *msg);
2592 __u32 lustre_msg_get_cksum(struct lustre_msg *msg);
2593 __u32 lustre_msg_calc_cksum(struct lustre_msg *msg);
2594 void lustre_msg_set_handle(struct lustre_msg *msg,
2595 			   struct lustre_handle *handle);
2596 void lustre_msg_set_type(struct lustre_msg *msg, __u32 type);
2597 void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
2598 void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions);
2599 void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno);
2600 void lustre_msg_set_status(struct lustre_msg *msg, __u32 status);
2601 void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
2602 void ptlrpc_request_set_replen(struct ptlrpc_request *req);
2603 void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout);
2604 void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time);
2605 void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid);
2606 void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum);
2607 
2608 static inline void
2609 lustre_shrink_reply(struct ptlrpc_request *req, int segment,
2610 		    unsigned int newlen, int move_data)
2611 {
2612 	LASSERT(req->rq_reply_state);
2613 	LASSERT(req->rq_repmsg);
2614 	req->rq_replen = lustre_shrink_msg(req->rq_repmsg, segment,
2615 					   newlen, move_data);
2616 }
2617 
2618 #ifdef CONFIG_LUSTRE_TRANSLATE_ERRNOS
2619 
2620 static inline int ptlrpc_status_hton(int h)
2621 {
2622 	/*
2623 	 * Positive errnos must be network errnos, such as LUSTRE_EDEADLK,
2624 	 * ELDLM_LOCK_ABORTED, etc.
2625 	 */
2626 	if (h < 0)
2627 		return -lustre_errno_hton(-h);
2628 	else
2629 		return h;
2630 }
2631 
2632 static inline int ptlrpc_status_ntoh(int n)
2633 {
2634 	/*
2635 	 * See the comment in ptlrpc_status_hton().
2636 	 */
2637 	if (n < 0)
2638 		return -lustre_errno_ntoh(-n);
2639 	else
2640 		return n;
2641 }
2642 
2643 #else
2644 
2645 #define ptlrpc_status_hton(h) (h)
2646 #define ptlrpc_status_ntoh(n) (n)
2647 
2648 #endif
2649 /** @} */
2650 
2651 /** Change request phase of \a req to \a new_phase */
2652 static inline void
2653 ptlrpc_rqphase_move(struct ptlrpc_request *req, enum rq_phase new_phase)
2654 {
2655 	if (req->rq_phase == new_phase)
2656 		return;
2657 
2658 	if (new_phase == RQ_PHASE_UNREGISTERING) {
2659 		req->rq_next_phase = req->rq_phase;
2660 		if (req->rq_import)
2661 			atomic_inc(&req->rq_import->imp_unregistering);
2662 	}
2663 
2664 	if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
2665 		if (req->rq_import)
2666 			atomic_dec(&req->rq_import->imp_unregistering);
2667 	}
2668 
2669 	DEBUG_REQ(D_INFO, req, "move req \"%s\" -> \"%s\"",
2670 		  ptlrpc_rqphase2str(req), ptlrpc_phase2str(new_phase));
2671 
2672 	req->rq_phase = new_phase;
2673 }
2674 
2675 /**
2676  * Returns true if request \a req got an early reply and the hard deadline has not been met
2677  */
2678 static inline int
2679 ptlrpc_client_early(struct ptlrpc_request *req)
2680 {
2681 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2682 	    req->rq_reply_deadline > ktime_get_real_seconds())
2683 		return 0;
2684 	return req->rq_early;
2685 }
2686 
2687 /**
2688  * Returns true if we got a real reply from the server for this request
2689  */
2690 static inline int
2691 ptlrpc_client_replied(struct ptlrpc_request *req)
2692 {
2693 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2694 	    req->rq_reply_deadline > ktime_get_real_seconds())
2695 		return 0;
2696 	return req->rq_replied;
2697 }
2698 
2699 /** Returns true if request \a req is in the process of receiving the server reply */
2700 static inline int
2701 ptlrpc_client_recv(struct ptlrpc_request *req)
2702 {
2703 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2704 	    req->rq_reply_deadline > ktime_get_real_seconds())
2705 		return 1;
2706 	return req->rq_receiving_reply;
2707 }
2708 
2709 static inline int
2710 ptlrpc_client_recv_or_unlink(struct ptlrpc_request *req)
2711 {
2712 	int rc;
2713 
2714 	spin_lock(&req->rq_lock);
2715 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2716 	    req->rq_reply_deadline > ktime_get_real_seconds()) {
2717 		spin_unlock(&req->rq_lock);
2718 		return 1;
2719 	}
2720 	rc = req->rq_receiving_reply;
2721 	rc = rc || req->rq_req_unlink || req->rq_reply_unlink;
2722 	spin_unlock(&req->rq_lock);
2723 	return rc;
2724 }
2725 
2726 static inline void
2727 ptlrpc_client_wake_req(struct ptlrpc_request *req)
2728 {
2729 	if (req->rq_set == NULL)
2730 		wake_up(&req->rq_reply_waitq);
2731 	else
2732 		wake_up(&req->rq_set->set_waitq);
2733 }
2734 
2735 static inline void
2736 ptlrpc_rs_addref(struct ptlrpc_reply_state *rs)
2737 {
2738 	LASSERT(atomic_read(&rs->rs_refcount) > 0);
2739 	atomic_inc(&rs->rs_refcount);
2740 }
2741 
2742 static inline void
2743 ptlrpc_rs_decref(struct ptlrpc_reply_state *rs)
2744 {
2745 	LASSERT(atomic_read(&rs->rs_refcount) > 0);
2746 	if (atomic_dec_and_test(&rs->rs_refcount))
2747 		lustre_free_reply_state(rs);
2748 }
2749 
2750 /* Should only be called once per req */
2751 static inline void ptlrpc_req_drop_rs(struct ptlrpc_request *req)
2752 {
2753 	if (req->rq_reply_state == NULL)
2754 		return; /* shouldn't occur */
2755 	ptlrpc_rs_decref(req->rq_reply_state);
2756 	req->rq_reply_state = NULL;
2757 	req->rq_repmsg = NULL;
2758 }
2759 
2760 static inline __u32 lustre_request_magic(struct ptlrpc_request *req)
2761 {
2762 	return lustre_msg_get_magic(req->rq_reqmsg);
2763 }
2764 
2765 static inline int ptlrpc_req_get_repsize(struct ptlrpc_request *req)
2766 {
2767 	switch (req->rq_reqmsg->lm_magic) {
2768 	case LUSTRE_MSG_MAGIC_V2:
2769 		return req->rq_reqmsg->lm_repsize;
2770 	default:
2771 		LASSERTF(0, "incorrect message magic: %08x\n",
2772 			 req->rq_reqmsg->lm_magic);
2773 		return -EFAULT;
2774 	}
2775 }
2776 
2777 static inline int ptlrpc_send_limit_expired(struct ptlrpc_request *req)
2778 {
2779 	if (req->rq_delay_limit != 0 &&
2780 	    time_before(cfs_time_add(req->rq_queued_time,
2781 				     cfs_time_seconds(req->rq_delay_limit)),
2782 			cfs_time_current())) {
2783 		return 1;
2784 	}
2785 	return 0;
2786 }
2787 
2788 static inline int ptlrpc_no_resend(struct ptlrpc_request *req)
2789 {
2790 	if (!req->rq_no_resend && ptlrpc_send_limit_expired(req)) {
2791 		spin_lock(&req->rq_lock);
2792 		req->rq_no_resend = 1;
2793 		spin_unlock(&req->rq_lock);
2794 	}
2795 	return req->rq_no_resend;
2796 }
2797 
2798 static inline int
2799 ptlrpc_server_get_timeout(struct ptlrpc_service_part *svcpt)
2800 {
2801 	int at = AT_OFF ? 0 : at_get(&svcpt->scp_at_estimate);
2802 
2803 	return svcpt->scp_service->srv_watchdog_factor *
2804 	       max_t(int, at, obd_timeout);
2805 }
2806 
2807 static inline struct ptlrpc_service *
2808 ptlrpc_req2svc(struct ptlrpc_request *req)
2809 {
2810 	LASSERT(req->rq_rqbd != NULL);
2811 	return req->rq_rqbd->rqbd_svcpt->scp_service;
2812 }
2813 
2814 /* ldlm/ldlm_lib.c */
2815 /**
2816  * Target client logic
2817  * @{
2818  */
2819 int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg);
2820 int client_obd_cleanup(struct obd_device *obddev);
2821 int client_connect_import(const struct lu_env *env,
2822 			  struct obd_export **exp, struct obd_device *obd,
2823 			  struct obd_uuid *cluuid, struct obd_connect_data *,
2824 			  void *localdata);
2825 int client_disconnect_export(struct obd_export *exp);
2826 int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid,
2827 			   int priority);
2828 int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
2829 int client_import_find_conn(struct obd_import *imp, lnet_nid_t peer,
2830 			    struct obd_uuid *uuid);
2831 int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
2832 void client_destroy_import(struct obd_import *imp);
2833 /** @} */
2834 
2835 /* ptlrpc/pinger.c */
2836 /**
2837  * Pinger API (client side only)
2838  * @{
2839  */
2840 enum timeout_event {
2841 	TIMEOUT_GRANT = 1
2842 };
2843 
2844 struct timeout_item;
2845 typedef int (*timeout_cb_t)(struct timeout_item *, void *);
2846 int ptlrpc_pinger_add_import(struct obd_import *imp);
2847 int ptlrpc_pinger_del_import(struct obd_import *imp);
2848 int ptlrpc_add_timeout_client(int time, enum timeout_event event,
2849 			      timeout_cb_t cb, void *data,
2850 			      struct list_head *obd_list);
2851 int ptlrpc_del_timeout_client(struct list_head *obd_list,
2852 			      enum timeout_event event);
2853 struct ptlrpc_request *ptlrpc_prep_ping(struct obd_import *imp);
2854 int ptlrpc_obd_ping(struct obd_device *obd);
2855 void ptlrpc_pinger_ir_up(void);
2856 void ptlrpc_pinger_ir_down(void);
2857 /** @} */
2858 int ptlrpc_pinger_suppress_pings(void);
2859 
2860 /* ptlrpc/ptlrpcd.c */
2861 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
2862 void ptlrpcd_free(struct ptlrpcd_ctl *pc);
2863 void ptlrpcd_wake(struct ptlrpc_request *req);
2864 void ptlrpcd_add_req(struct ptlrpc_request *req);
2865 int ptlrpcd_addref(void);
2866 void ptlrpcd_decref(void);
2867 
2868 /* ptlrpc/lproc_ptlrpc.c */
2869 /**
2870  * procfs output related functions
2871  * @{
2872  */
2873 const char *ll_opcode2str(__u32 opcode);
2874 void ptlrpc_lprocfs_register_obd(struct obd_device *obd);
2875 void ptlrpc_lprocfs_unregister_obd(struct obd_device *obd);
2876 void ptlrpc_lprocfs_brw(struct ptlrpc_request *req, int bytes);
2877 /** @} */
2878 
2879 /* ptlrpc/llog_client.c */
2880 extern struct llog_operations llog_client_ops;
2881 /** @} net */
2882 
2883 #endif
2884 /** @} PtlRPC */
2885