1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36 
37 /** Implementation of client-side PortalRPC interfaces */
38 
39 #define DEBUG_SUBSYSTEM S_RPC
40 
41 #include "../include/obd_support.h"
42 #include "../include/obd_class.h"
43 #include "../include/lustre_lib.h"
44 #include "../include/lustre_ha.h"
45 #include "../include/lustre_import.h"
46 #include "../include/lustre_req_layout.h"
47 
48 #include "ptlrpc_internal.h"
49 
50 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
51 static int ptlrpcd_check_work(struct ptlrpc_request *req);
52 
53 /**
54  * Initialize passed in client structure \a cl.
55  */
56 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
57 			struct ptlrpc_client *cl)
58 {
59 	cl->cli_request_portal = req_portal;
60 	cl->cli_reply_portal = rep_portal;
61 	cl->cli_name = name;
62 }
63 EXPORT_SYMBOL(ptlrpc_init_client);
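/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * a caller typically fills in its struct ptlrpc_client once at setup time,
 * for example
 *
 *	ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
 *			   "ldlm", &obddev->obd_ldlm_client);
 *
 * The portal constants and the obd_ldlm_client field here are assumptions
 * drawn from common callers, shown only to illustrate the calling convention.
 */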
64 
65 /**
66  * Return PortalRPC connection for remote uuid \a uuid
67  */
68 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
69 {
70 	struct ptlrpc_connection *c;
71 	lnet_nid_t self;
72 	lnet_process_id_t peer;
73 	int err;
74 
75 	/*
76 	 * ptlrpc_uuid_to_peer() initializes its 2nd parameter
77 	 * before accessing its values.
78 	 * coverity[uninit_use_in_call]
79 	 */
80 	err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
81 	if (err != 0) {
82 		CNETERR("cannot find peer %s!\n", uuid->uuid);
83 		return NULL;
84 	}
85 
86 	c = ptlrpc_connection_get(peer, self, uuid);
87 	if (c) {
88 		memcpy(c->c_remote_uuid.uuid,
89 		       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
90 	}
91 
92 	CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
93 
94 	return c;
95 }
96 EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
97 
98 /**
99  * Allocate and initialize new bulk descriptor on the sender.
100  * Returns pointer to the descriptor or NULL on error.
101  */
102 struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
103 					 unsigned type, unsigned portal)
104 {
105 	struct ptlrpc_bulk_desc *desc;
106 	int i;
107 
108 	desc = kzalloc(offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]),
109 		       GFP_NOFS);
110 	if (!desc)
111 		return NULL;
112 
113 	spin_lock_init(&desc->bd_lock);
114 	init_waitqueue_head(&desc->bd_waitq);
115 	desc->bd_max_iov = npages;
116 	desc->bd_iov_count = 0;
117 	desc->bd_portal = portal;
118 	desc->bd_type = type;
119 	desc->bd_md_count = 0;
120 	LASSERT(max_brw > 0);
121 	desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
122 	/*
123 	 * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
124 	 * node. Negotiated ocd_brw_size will always be <= this number.
125 	 */
126 	for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
127 		LNetInvalidateHandle(&desc->bd_mds[i]);
128 
129 	return desc;
130 }
131 
132 /**
133  * Prepare bulk descriptor for specified outgoing request \a req that
134  * can fit \a npages pages. \a type is the bulk type. \a portal is where
135  * the bulk is to be sent. Used on the client side.
136  * Returns pointer to newly allocated initialized bulk descriptor or NULL on
137  * error.
138  */
139 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
140 					      unsigned npages, unsigned max_brw,
141 					      unsigned type, unsigned portal)
142 {
143 	struct obd_import *imp = req->rq_import;
144 	struct ptlrpc_bulk_desc *desc;
145 
146 	LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
147 	desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
148 	if (desc == NULL)
149 		return NULL;
150 
151 	desc->bd_import_generation = req->rq_import_generation;
152 	desc->bd_import = class_import_get(imp);
153 	desc->bd_req = req;
154 
155 	desc->bd_cbid.cbid_fn = client_bulk_callback;
156 	desc->bd_cbid.cbid_arg = desc;
157 
158 	/* This makes req own desc; desc is freed when req itself is freed */
159 	req->rq_bulk = desc;
160 
161 	return desc;
162 }
163 EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
164 
165 /**
166  * Add a page \a page to the bulk descriptor \a desc.
167  * Data to transfer in the page starts at offset \a pageoffset and
168  * amount of data to transfer from the page is \a len
169  */
170 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
171 			     struct page *page, int pageoffset, int len, int pin)
172 {
173 	LASSERT(desc->bd_iov_count < desc->bd_max_iov);
174 	LASSERT(page != NULL);
175 	LASSERT(pageoffset >= 0);
176 	LASSERT(len > 0);
177 	LASSERT(pageoffset + len <= PAGE_CACHE_SIZE);
178 
179 	desc->bd_nob += len;
180 
181 	if (pin)
182 		page_cache_get(page);
183 
184 	ptlrpc_add_bulk_page(desc, page, pageoffset, len);
185 }
186 EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
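/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * a client doing bulk I/O allocates the descriptor against the request and
 * then attaches each data page to it, roughly
 *
 *	desc = ptlrpc_prep_bulk_imp(req, page_count, 1, BULK_GET_SOURCE,
 *				    OST_BULK_PORTAL);
 *	if (desc == NULL)
 *		return -ENOMEM;
 *	for (i = 0; i < page_count; i++)
 *		ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
 *
 * OST_BULK_PORTAL and the pinning wrapper ptlrpc_prep_bulk_page_pin() are
 * assumptions based on common callers; the portal and the pin/nopin variant
 * depend on the actual caller.
 */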
187 
188 /**
189  * Uninitialize and free bulk descriptor \a desc.
190  * Works on bulk descriptors both from server and client side.
191  */
192 void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
193 {
194 	int i;
195 
196 	LASSERT(desc != NULL);
197 	LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
198 	LASSERT(desc->bd_md_count == 0);	 /* network hands off */
199 	LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
200 
201 	sptlrpc_enc_pool_put_pages(desc);
202 
203 	if (desc->bd_export)
204 		class_export_put(desc->bd_export);
205 	else
206 		class_import_put(desc->bd_import);
207 
208 	if (unpin) {
209 		for (i = 0; i < desc->bd_iov_count; i++)
210 			page_cache_release(desc->bd_iov[i].kiov_page);
211 	}
212 
213 	kfree(desc);
214 }
215 EXPORT_SYMBOL(__ptlrpc_free_bulk);
216 
217 /**
218  * Set the server time limit for this req, i.e. how long we are willing to wait
219  * for reply before timing out this request.
220  */
221 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
222 {
223 	__u32 serv_est;
224 	int idx;
225 	struct imp_at *at;
226 
227 	LASSERT(req->rq_import);
228 
229 	if (AT_OFF) {
230 		/*
231 		 * non-AT settings
232 		 *
233 		 * \a imp_server_timeout means this is a reverse import and
234 		 * we send (currently only) ASTs to the client and cannot afford
235 		 * to wait too long for the reply, otherwise the other client
236 		 * (because of which we are sending this request) would
237 		 * time out waiting for us
238 		 */
239 		req->rq_timeout = req->rq_import->imp_server_timeout ?
240 				  obd_timeout / 2 : obd_timeout;
241 	} else {
242 		at = &req->rq_import->imp_at;
243 		idx = import_at_get_index(req->rq_import,
244 					  req->rq_request_portal);
245 		serv_est = at_get(&at->iat_service_estimate[idx]);
246 		req->rq_timeout = at_est2timeout(serv_est);
247 	}
248 	/*
249 	 * We could get even fancier here, using history to predict increased
250 	 * loading...
251 	 */
252 
253 	/*
254 	 * Let the server know what this RPC timeout is by putting it in the
255 	 * reqmsg
256 	 */
257 	lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
258 }
259 EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
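/*
 * Worked example (editorial note): with adaptive timeouts enabled and a
 * service estimate of, say, 20 seconds, at_est2timeout() as characterized
 * elsewhere in this file ("AT service time x 125% + 5s") yields roughly
 * 20 * 1.25 + 5 = 30 seconds for rq_timeout.  The exact rounding is an
 * assumption; see at_est2timeout() for the authoritative formula.
 */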
260 
261 /* Adjust max service estimate based on server value */
262 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
263 				  unsigned int serv_est)
264 {
265 	int idx;
266 	unsigned int oldse;
267 	struct imp_at *at;
268 
269 	LASSERT(req->rq_import);
270 	at = &req->rq_import->imp_at;
271 
272 	idx = import_at_get_index(req->rq_import, req->rq_request_portal);
273 	/*
274 	 * max service estimates are tracked on the server side,
275 	 * so just keep minimal history here
276 	 */
277 	oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
278 	if (oldse != 0)
279 		CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
280 		       req->rq_import->imp_obd->obd_name, req->rq_request_portal,
281 		       oldse, at_get(&at->iat_service_estimate[idx]));
282 }
283 
284 /* Expected network latency per remote node (secs) */
285 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
286 {
287 	return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
288 }
289 
290 /* Adjust expected network latency */
291 static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
292 				      unsigned int service_time)
293 {
294 	unsigned int nl, oldnl;
295 	struct imp_at *at;
296 	time64_t now = ktime_get_real_seconds();
297 
298 	LASSERT(req->rq_import);
299 
300 	if (service_time > now - req->rq_sent + 3) {
301 		/*
302 		 * bz16408: this can also happen if an early reply is lost
303 		 * and the client RPC expires and is resent; the early reply
304 		 * or the reply to the original RPC can still fit in the reply
305 		 * buffer of the resent RPC. The client then measures time from
306 		 * the resend, but the server reported the service time of the
307 		 * original RPC.
308 		 */
309 		CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
310 		       D_ADAPTTO : D_WARNING,
311 		       "Reported service time %u > total measured time "
312 		       CFS_DURATION_T"\n", service_time,
313 		       (long)(now - req->rq_sent));
314 		return;
315 	}
316 
317 	/* Network latency is total time less server processing time */
318 	nl = max_t(int, now - req->rq_sent -
319 			service_time, 0) + 1; /* st rounding */
320 	at = &req->rq_import->imp_at;
321 
322 	oldnl = at_measured(&at->iat_net_latency, nl);
323 	if (oldnl != 0)
324 		CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) has changed from %d to %d\n",
325 		       req->rq_import->imp_obd->obd_name,
326 		       obd_uuid2str(
327 			       &req->rq_import->imp_connection->c_remote_uuid),
328 		       oldnl, at_get(&at->iat_net_latency));
329 }
330 
331 static int unpack_reply(struct ptlrpc_request *req)
332 {
333 	int rc;
334 
335 	if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
336 		rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
337 		if (rc) {
338 			DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
339 			return -EPROTO;
340 		}
341 	}
342 
343 	rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
344 	if (rc) {
345 		DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
346 		return -EPROTO;
347 	}
348 	return 0;
349 }
350 
351 /**
352  * Handle an early reply message, called with the rq_lock held.
353  * If anything goes wrong just ignore it - same as if it never happened
354  */
355 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
356 {
357 	struct ptlrpc_request *early_req;
358 	time64_t olddl;
359 	int rc;
360 
361 	req->rq_early = 0;
362 	spin_unlock(&req->rq_lock);
363 
364 	rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
365 	if (rc) {
366 		spin_lock(&req->rq_lock);
367 		return rc;
368 	}
369 
370 	rc = unpack_reply(early_req);
371 	if (rc == 0) {
372 		/* Expecting to increase the service time estimate here */
373 		ptlrpc_at_adj_service(req,
374 			lustre_msg_get_timeout(early_req->rq_repmsg));
375 		ptlrpc_at_adj_net_latency(req,
376 			lustre_msg_get_service_time(early_req->rq_repmsg));
377 	}
378 
379 	sptlrpc_cli_finish_early_reply(early_req);
380 
381 	if (rc != 0) {
382 		spin_lock(&req->rq_lock);
383 		return rc;
384 	}
385 
386 	/* Adjust the local timeout for this req */
387 	ptlrpc_at_set_req_timeout(req);
388 
389 	spin_lock(&req->rq_lock);
390 	olddl = req->rq_deadline;
391 	/*
392 	 * server assumes it now has rq_timeout from when it sent the
393 	 * early reply, so client should give it at least that long.
394 	 */
395 	req->rq_deadline = ktime_get_real_seconds() + req->rq_timeout +
396 			   ptlrpc_at_get_net_latency(req);
397 
398 	DEBUG_REQ(D_ADAPTTO, req,
399 		  "Early reply #%d, new deadline in %lds (%lds)",
400 		  req->rq_early_count,
401 		  (long)(req->rq_deadline - ktime_get_real_seconds()),
402 		  (long)(req->rq_deadline - olddl));
403 
404 	return rc;
405 }
406 
407 static struct kmem_cache *request_cache;
408 
409 int ptlrpc_request_cache_init(void)
410 {
411 	request_cache = kmem_cache_create("ptlrpc_cache",
412 					  sizeof(struct ptlrpc_request),
413 					  0, SLAB_HWCACHE_ALIGN, NULL);
414 	return request_cache == NULL ? -ENOMEM : 0;
415 }
416 
417 void ptlrpc_request_cache_fini(void)
418 {
419 	kmem_cache_destroy(request_cache);
420 }
421 
422 struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
423 {
424 	struct ptlrpc_request *req;
425 
426 	req = kmem_cache_alloc(request_cache, flags | __GFP_ZERO);
427 	return req;
428 }
429 
430 void ptlrpc_request_cache_free(struct ptlrpc_request *req)
431 {
432 	kmem_cache_free(request_cache, req);
433 }
434 
435 /**
436  * Wind down request pool \a pool.
437  * Frees all requests from the pool too
438  */
439 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
440 {
441 	struct list_head *l, *tmp;
442 	struct ptlrpc_request *req;
443 
444 	LASSERT(pool != NULL);
445 
446 	spin_lock(&pool->prp_lock);
447 	list_for_each_safe(l, tmp, &pool->prp_req_list) {
448 		req = list_entry(l, struct ptlrpc_request, rq_list);
449 		list_del(&req->rq_list);
450 		LASSERT(req->rq_reqbuf);
451 		LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
452 		kvfree(req->rq_reqbuf);
453 		ptlrpc_request_cache_free(req);
454 	}
455 	spin_unlock(&pool->prp_lock);
456 	kfree(pool);
457 }
458 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
459 
460 /**
461  * Allocates, initializes and adds \a num_rq requests to the pool \a pool
462  */
463 int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
464 {
465 	int i;
466 	int size = 1;
467 
468 	while (size < pool->prp_rq_size)
469 		size <<= 1;
470 
471 	LASSERTF(list_empty(&pool->prp_req_list) ||
472 		 size == pool->prp_rq_size,
473 		 "Trying to change pool size with nonempty pool from %d to %d bytes\n",
474 		 pool->prp_rq_size, size);
475 
476 	spin_lock(&pool->prp_lock);
477 	pool->prp_rq_size = size;
478 	for (i = 0; i < num_rq; i++) {
479 		struct ptlrpc_request *req;
480 		struct lustre_msg *msg;
481 
482 		spin_unlock(&pool->prp_lock);
483 		req = ptlrpc_request_cache_alloc(GFP_NOFS);
484 		if (!req)
485 			return i;
486 		msg = libcfs_kvzalloc(size, GFP_NOFS);
487 		if (!msg) {
488 			ptlrpc_request_cache_free(req);
489 			return i;
490 		}
491 		req->rq_reqbuf = msg;
492 		req->rq_reqbuf_len = size;
493 		req->rq_pool = pool;
494 		spin_lock(&pool->prp_lock);
495 		list_add_tail(&req->rq_list, &pool->prp_req_list);
496 	}
497 	spin_unlock(&pool->prp_lock);
498 	return num_rq;
499 }
500 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
501 
502 /**
503  * Create and initialize new request pool with given attributes:
504  * \a num_rq - initial number of requests to create for the pool
505  * \a msgsize - maximum message size possible for requests in this pool
506  * \a populate_pool - function to be called when more requests need to be added
507  *		    to the pool
508  * Returns pointer to newly created pool or NULL on error.
509  */
510 struct ptlrpc_request_pool *
511 ptlrpc_init_rq_pool(int num_rq, int msgsize,
512 		    int (*populate_pool)(struct ptlrpc_request_pool *, int))
513 {
514 	struct ptlrpc_request_pool *pool;
515 
516 	pool = kzalloc(sizeof(struct ptlrpc_request_pool), GFP_NOFS);
517 	if (!pool)
518 		return NULL;
519 
520 	/*
521 	 * Request the next power of two for the allocation, because the
522 	 * kernel would do exactly this internally
523 	 */
524 
525 	spin_lock_init(&pool->prp_lock);
526 	INIT_LIST_HEAD(&pool->prp_req_list);
527 	pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
528 	pool->prp_populate = populate_pool;
529 
530 	populate_pool(pool, num_rq);
531 
532 	return pool;
533 }
534 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
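/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * a subsystem that must make forward progress under memory pressure (e.g.
 * the writeout path) pre-creates a pool and later draws requests from it,
 * roughly
 *
 *	pool = ptlrpc_init_rq_pool(num_rq, msgsize, ptlrpc_add_rqs_to_pool);
 *	if (pool == NULL)
 *		return -ENOMEM;
 *	...
 *	req = ptlrpc_request_alloc_pool(imp, pool, format);
 *
 * num_rq, msgsize and format are placeholders; see
 * ptlrpc_request_alloc_pool() below for the allocation side.
 */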
535 
536 /**
537  * Fetches one request from pool \a pool
538  */
539 static struct ptlrpc_request *
540 ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
541 {
542 	struct ptlrpc_request *request;
543 	struct lustre_msg *reqbuf;
544 
545 	if (!pool)
546 		return NULL;
547 
548 	spin_lock(&pool->prp_lock);
549 
550 	/*
551 	 * See if we have anything in the pool and bail out if not.
552 	 * In the writeout path, where this matters, this is safe to do
553 	 * because nothing is lost in this case, and when some in-flight
554 	 * requests complete, this code will be called again.
555 	 */
556 	if (unlikely(list_empty(&pool->prp_req_list))) {
557 		spin_unlock(&pool->prp_lock);
558 		return NULL;
559 	}
560 
561 	request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
562 				 rq_list);
563 	list_del_init(&request->rq_list);
564 	spin_unlock(&pool->prp_lock);
565 
566 	LASSERT(request->rq_reqbuf);
567 	LASSERT(request->rq_pool);
568 
569 	reqbuf = request->rq_reqbuf;
570 	memset(request, 0, sizeof(*request));
571 	request->rq_reqbuf = reqbuf;
572 	request->rq_reqbuf_len = pool->prp_rq_size;
573 	request->rq_pool = pool;
574 
575 	return request;
576 }
577 
578 /**
579  * Returns freed \a request to pool.
580  */
581 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
582 {
583 	struct ptlrpc_request_pool *pool = request->rq_pool;
584 
585 	spin_lock(&pool->prp_lock);
586 	LASSERT(list_empty(&request->rq_list));
587 	LASSERT(!request->rq_receiving_reply);
588 	list_add_tail(&request->rq_list, &pool->prp_req_list);
589 	spin_unlock(&pool->prp_lock);
590 }
591 
592 static int __ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
593 				      __u32 version, int opcode,
594 				      int count, __u32 *lengths, char **bufs,
595 				      struct ptlrpc_cli_ctx *ctx)
596 {
597 	struct obd_import *imp = request->rq_import;
598 	int rc;
599 
600 	if (unlikely(ctx))
601 		request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
602 	else {
603 		rc = sptlrpc_req_get_ctx(request);
604 		if (rc)
605 			goto out_free;
606 	}
607 
608 	sptlrpc_req_set_flavor(request, opcode);
609 
610 	rc = lustre_pack_request(request, imp->imp_msg_magic, count,
611 				 lengths, bufs);
612 	if (rc) {
613 		LASSERT(!request->rq_pool);
614 		goto out_ctx;
615 	}
616 
617 	lustre_msg_add_version(request->rq_reqmsg, version);
618 	request->rq_send_state = LUSTRE_IMP_FULL;
619 	request->rq_type = PTL_RPC_MSG_REQUEST;
620 	request->rq_export = NULL;
621 
622 	request->rq_req_cbid.cbid_fn = request_out_callback;
623 	request->rq_req_cbid.cbid_arg = request;
624 
625 	request->rq_reply_cbid.cbid_fn = reply_in_callback;
626 	request->rq_reply_cbid.cbid_arg = request;
627 
628 	request->rq_reply_deadline = 0;
629 	request->rq_phase = RQ_PHASE_NEW;
630 	request->rq_next_phase = RQ_PHASE_UNDEFINED;
631 
632 	request->rq_request_portal = imp->imp_client->cli_request_portal;
633 	request->rq_reply_portal = imp->imp_client->cli_reply_portal;
634 
635 	ptlrpc_at_set_req_timeout(request);
636 
637 	spin_lock_init(&request->rq_lock);
638 	INIT_LIST_HEAD(&request->rq_list);
639 	INIT_LIST_HEAD(&request->rq_timed_list);
640 	INIT_LIST_HEAD(&request->rq_replay_list);
641 	INIT_LIST_HEAD(&request->rq_ctx_chain);
642 	INIT_LIST_HEAD(&request->rq_set_chain);
643 	INIT_LIST_HEAD(&request->rq_history_list);
644 	INIT_LIST_HEAD(&request->rq_exp_list);
645 	init_waitqueue_head(&request->rq_reply_waitq);
646 	init_waitqueue_head(&request->rq_set_waitq);
647 	request->rq_xid = ptlrpc_next_xid();
648 	atomic_set(&request->rq_refcount, 1);
649 
650 	lustre_msg_set_opc(request->rq_reqmsg, opcode);
651 
652 	return 0;
653 out_ctx:
654 	sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
655 out_free:
656 	class_import_put(imp);
657 	return rc;
658 }
659 
660 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
661 			     __u32 version, int opcode, char **bufs,
662 			     struct ptlrpc_cli_ctx *ctx)
663 {
664 	int count;
665 
666 	count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
667 	return __ptlrpc_request_bufs_pack(request, version, opcode, count,
668 					  request->rq_pill.rc_area[RCL_CLIENT],
669 					  bufs, ctx);
670 }
671 EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
672 
673 /**
674  * Pack request buffers for network transfer, performing any necessary
675  * encryption steps.
676  */
677 int ptlrpc_request_pack(struct ptlrpc_request *request,
678 			__u32 version, int opcode)
679 {
680 	int rc;
681 
682 	rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
683 	if (rc)
684 		return rc;
685 
686 	/*
687 	 * Some old 1.8 clients (< 1.8.7) will LASSERT that the size of the
688 	 * ptlrpc_body sent from the server equals the local ptlrpc_body size, so we
689 	 * have to send old ptlrpc_body to keep interoperability with these
690 	 * clients.
691 	 *
692 	 * Only three kinds of server->client RPCs so far:
693 	 *  - LDLM_BL_CALLBACK
694 	 *  - LDLM_CP_CALLBACK
695 	 *  - LDLM_GL_CALLBACK
696 	 *
697 	 * XXX This should be removed whenever we drop the interoperability with
698  *     these old clients.
699 	 */
700 	if (opcode == LDLM_BL_CALLBACK || opcode == LDLM_CP_CALLBACK ||
701 	    opcode == LDLM_GL_CALLBACK)
702 		req_capsule_shrink(&request->rq_pill, &RMF_PTLRPC_BODY,
703 				   sizeof(struct ptlrpc_body_v2), RCL_CLIENT);
704 
705 	return rc;
706 }
707 EXPORT_SYMBOL(ptlrpc_request_pack);
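/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * callers that need to size variable-length request buffers use the two-step
 * form instead of ptlrpc_request_alloc_pack(), roughly
 *
 *	req = ptlrpc_request_alloc(imp, &RQF_...);
 *	if (req == NULL)
 *		return -ENOMEM;
 *	req_capsule_set_size(&req->rq_pill, &RMF_..., RCL_CLIENT, namelen);
 *	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode);
 *	if (rc) {
 *		ptlrpc_request_free(req);
 *		return rc;
 *	}
 *
 * The RQF_/RMF_ names, LUSTRE_MDS_VERSION and namelen are placeholders; the
 * point is that variable-sized fields are declared on rq_pill before packing.
 */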
708 
709 /**
710  * Helper function to allocate new request on import \a imp
711  * and possibly using an existing request from pool \a pool if provided.
712  * Returns allocated request structure with import field filled or
713  * NULL on error.
714  */
715 static inline
716 struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
717 					      struct ptlrpc_request_pool *pool)
718 {
719 	struct ptlrpc_request *request;
720 
721 	request = ptlrpc_request_cache_alloc(GFP_NOFS);
722 
723 	if (!request && pool)
724 		request = ptlrpc_prep_req_from_pool(pool);
725 
726 	if (request) {
727 		LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
728 		LASSERT(imp != LP_POISON);
729 		LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p",
730 			imp->imp_client);
731 		LASSERT(imp->imp_client != LP_POISON);
732 
733 		request->rq_import = class_import_get(imp);
734 	} else {
735 		CERROR("request allocation out of memory\n");
736 	}
737 
738 	return request;
739 }
740 
741 /**
742  * Helper function for creating a request.
743  * Calls __ptlrpc_request_alloc to allocate new request structure and inits
744  * buffer structures according to capsule template \a format.
745  * Returns allocated request structure pointer or NULL on error.
746  */
747 static struct ptlrpc_request *
748 ptlrpc_request_alloc_internal(struct obd_import *imp,
749 			      struct ptlrpc_request_pool *pool,
750 			      const struct req_format *format)
751 {
752 	struct ptlrpc_request *request;
753 
754 	request = __ptlrpc_request_alloc(imp, pool);
755 	if (request == NULL)
756 		return NULL;
757 
758 	req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
759 	req_capsule_set(&request->rq_pill, format);
760 	return request;
761 }
762 
763 /**
764  * Allocate new request structure for import \a imp and initialize its
765  * buffer structure according to capsule template \a format.
766  */
767 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
768 					    const struct req_format *format)
769 {
770 	return ptlrpc_request_alloc_internal(imp, NULL, format);
771 }
772 EXPORT_SYMBOL(ptlrpc_request_alloc);
773 
774 /**
775  * Allocate new request structure for import \a imp from pool \a pool and
776  * initialize its buffer structure according to capsule template \a format.
777  */
778 struct ptlrpc_request *ptlrpc_request_alloc_pool(struct obd_import *imp,
779 						 struct ptlrpc_request_pool *pool,
780 						 const struct req_format *format)
781 {
782 	return ptlrpc_request_alloc_internal(imp, pool, format);
783 }
784 EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
785 
786 /**
787  * For requests not from pool, free memory of the request structure.
788  * For requests obtained from a pool earlier, return request back to pool.
789  */
790 void ptlrpc_request_free(struct ptlrpc_request *request)
791 {
792 	if (request->rq_pool)
793 		__ptlrpc_free_req_to_pool(request);
794 	else
795 		ptlrpc_request_cache_free(request);
796 }
797 EXPORT_SYMBOL(ptlrpc_request_free);
798 
799 /**
800  * Allocate new request for operation \a opcode and immediately pack it for
801  * network transfer.
802  * Only used for simple requests like OBD_PING where the only important
803  * part of the request is the operation itself.
804  * Returns allocated request or NULL on error.
805  */
806 struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
807 						 const struct req_format *format,
808 						 __u32 version, int opcode)
809 {
810 	struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
811 	int rc;
812 
813 	if (req) {
814 		rc = ptlrpc_request_pack(req, version, opcode);
815 		if (rc) {
816 			ptlrpc_request_free(req);
817 			req = NULL;
818 		}
819 	}
820 	return req;
821 }
822 EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
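/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * the OBD_PING case mentioned above reduces to something like
 *
 *	req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *					LUSTRE_OBD_VERSION, OBD_PING);
 *	if (req == NULL)
 *		return -ENOMEM;
 *	ptlrpc_request_set_replen(req);
 *	rc = ptlrpc_queue_wait(req);
 *	ptlrpc_req_finished(req);
 *
 * RQF_OBD_PING and LUSTRE_OBD_VERSION are taken from common callers and are
 * assumptions here; ptlrpc_queue_wait() is the synchronous send path defined
 * later in this file.
 */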
823 
824 /**
825  * Allocate and initialize new request set structure on the current CPT.
826  * Returns a pointer to the newly allocated set structure or NULL on error.
827  */
828 struct ptlrpc_request_set *ptlrpc_prep_set(void)
829 {
830 	struct ptlrpc_request_set *set;
831 	int cpt;
832 
833 	cpt = cfs_cpt_current(cfs_cpt_table, 0);
834 	set = kzalloc_node(sizeof(*set), GFP_NOFS,
835 			   cfs_cpt_spread_node(cfs_cpt_table, cpt));
836 	if (!set)
837 		return NULL;
838 	atomic_set(&set->set_refcount, 1);
839 	INIT_LIST_HEAD(&set->set_requests);
840 	init_waitqueue_head(&set->set_waitq);
841 	atomic_set(&set->set_new_count, 0);
842 	atomic_set(&set->set_remaining, 0);
843 	spin_lock_init(&set->set_new_req_lock);
844 	INIT_LIST_HEAD(&set->set_new_requests);
845 	INIT_LIST_HEAD(&set->set_cblist);
846 	set->set_max_inflight = UINT_MAX;
847 	set->set_producer = NULL;
848 	set->set_producer_arg = NULL;
849 	set->set_rc = 0;
850 
851 	return set;
852 }
853 EXPORT_SYMBOL(ptlrpc_prep_set);
854 
855 /**
856  * Allocate and initialize new request set structure with flow control
857  * extension. This extension allows controlling the number of requests in flight
858  * for the whole set. A callback function to generate requests must be provided
859  * and the request set will keep the number of requests sent over the wire to
860  * @max_inflight.
861  * Returns a pointer to the newly allocated set structure or NULL on error.
862  */
863 struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
864 					     void *arg)
865 
866 {
867 	struct ptlrpc_request_set *set;
868 
869 	set = ptlrpc_prep_set();
870 	if (!set)
871 		return NULL;
872 
873 	set->set_max_inflight = max;
874 	set->set_producer = func;
875 	set->set_producer_arg = arg;
876 
877 	return set;
878 }
879 EXPORT_SYMBOL(ptlrpc_prep_fcset);
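/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * a producer callback queues one more request per call and returns -ENOENT
 * once it has nothing left to generate, matching ptlrpc_set_producer() below:
 *
 *	static int my_producer(struct ptlrpc_request_set *set, void *arg)
 *	{
 *		struct my_args *a = arg;
 *
 *		if (a->next == a->count)
 *			return -ENOENT;
 *		ptlrpc_set_add_req(set, a->reqs[a->next++]);
 *		return 0;
 *	}
 *
 *	set = ptlrpc_prep_fcset(8, my_producer, &args);
 *
 * my_producer, my_args and the in-flight limit of 8 are hypothetical.
 */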
880 
881 /**
882  * Wind down and free request set structure previously allocated with
883  * ptlrpc_prep_set.
884  * Ensures that all requests on the set have completed and removes
885  * all requests from the request list in a set.
886  * If any unsent requests happen to be on the list, pretends that they got
887  * an error in flight and calls their completion handler.
888  */
889 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
890 {
891 	struct list_head *tmp;
892 	struct list_head *next;
893 	int expected_phase;
894 	int n = 0;
895 
896 	/* Requests on the set should either all be completed, or all be new */
897 	expected_phase = (atomic_read(&set->set_remaining) == 0) ?
898 			 RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
899 	list_for_each(tmp, &set->set_requests) {
900 		struct ptlrpc_request *req =
901 			list_entry(tmp, struct ptlrpc_request,
902 				       rq_set_chain);
903 
904 		LASSERT(req->rq_phase == expected_phase);
905 		n++;
906 	}
907 
908 	LASSERTF(atomic_read(&set->set_remaining) == 0 ||
909 		 atomic_read(&set->set_remaining) == n, "%d / %d\n",
910 		 atomic_read(&set->set_remaining), n);
911 
912 	list_for_each_safe(tmp, next, &set->set_requests) {
913 		struct ptlrpc_request *req =
914 			list_entry(tmp, struct ptlrpc_request,
915 				       rq_set_chain);
916 		list_del_init(&req->rq_set_chain);
917 
918 		LASSERT(req->rq_phase == expected_phase);
919 
920 		if (req->rq_phase == RQ_PHASE_NEW) {
921 			ptlrpc_req_interpret(NULL, req, -EBADR);
922 			atomic_dec(&set->set_remaining);
923 		}
924 
925 		spin_lock(&req->rq_lock);
926 		req->rq_set = NULL;
927 		req->rq_invalid_rqset = 0;
928 		spin_unlock(&req->rq_lock);
929 
930 		ptlrpc_req_finished(req);
931 	}
932 
933 	LASSERT(atomic_read(&set->set_remaining) == 0);
934 
935 	ptlrpc_reqset_put(set);
936 }
937 EXPORT_SYMBOL(ptlrpc_set_destroy);
938 
939 /**
940  * Add a new request to the general purpose request set.
941  * Assumes request reference from the caller.
942  */
943 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
944 			struct ptlrpc_request *req)
945 {
946 	LASSERT(list_empty(&req->rq_set_chain));
947 
948 	/* The set takes over the caller's request reference */
949 	list_add_tail(&req->rq_set_chain, &set->set_requests);
950 	req->rq_set = set;
951 	atomic_inc(&set->set_remaining);
952 	req->rq_queued_time = cfs_time_current();
953 
954 	if (req->rq_reqmsg != NULL)
955 		lustre_msg_set_jobid(req->rq_reqmsg, NULL);
956 
957 	if (set->set_producer != NULL)
958 		/*
959 		 * If the request set has a producer callback, the RPC must be
960 		 * sent straight away
961 		 */
962 		ptlrpc_send_new_req(req);
963 }
964 EXPORT_SYMBOL(ptlrpc_set_add_req);
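/*
 * Illustrative usage sketch (editorial note, not part of the original file):
 * the usual pattern for issuing several RPCs in parallel is
 *
 *	set = ptlrpc_prep_set();
 *	if (set == NULL)
 *		return -ENOMEM;
 *	... for each prepared req ...
 *		ptlrpc_set_add_req(set, req);
 *	rc = ptlrpc_set_wait(set);
 *	ptlrpc_set_destroy(set);
 *
 * ptlrpc_set_wait() is the synchronous completion path defined later in this
 * file; error handling is omitted for brevity.
 */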
965 
966 /**
967  * Add a request to a request set with a dedicated server thread
968  * and wake the thread to do any necessary processing.
969  * Currently only used for ptlrpcd.
970  */
971 void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
972 			    struct ptlrpc_request *req)
973 {
974 	struct ptlrpc_request_set *set = pc->pc_set;
975 	int count, i;
976 
977 	LASSERT(req->rq_set == NULL);
978 	LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
979 
980 	spin_lock(&set->set_new_req_lock);
981 	/* The set takes over the caller's request reference.  */
982 	req->rq_set = set;
983 	req->rq_queued_time = cfs_time_current();
984 	list_add_tail(&req->rq_set_chain, &set->set_new_requests);
985 	count = atomic_inc_return(&set->set_new_count);
986 	spin_unlock(&set->set_new_req_lock);
987 
988 	/* Only need to call wakeup once for the first entry. */
989 	if (count == 1) {
990 		wake_up(&set->set_waitq);
991 
992 		/*
993 		 * XXX: It may be unnecessary to wake up all the partners, but
994 		 *      to guarantee the async RPC can be processed ASAP, we
995 		 *      have no better choice. This may be fixed in the future.
996 		 */
997 		for (i = 0; i < pc->pc_npartners; i++)
998 			wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
999 	}
1000 }
1001 EXPORT_SYMBOL(ptlrpc_set_add_new_req);
1002 
1003 /**
1004  * Based on the current state of the import, determine if the request
1005  * can be sent, is an error, or should be delayed.
1006  *
1007  * Returns true if this request should be delayed. If false, and
1008  * *status is set, then the request cannot be sent and *status is the
1009  * error code.  If false and status is 0, then request can be sent.
1010  *
1011  * The imp->imp_lock must be held.
1012  */
1013 static int ptlrpc_import_delay_req(struct obd_import *imp,
1014 				   struct ptlrpc_request *req, int *status)
1015 {
1016 	int delay = 0;
1017 
1018 	LASSERT(status != NULL);
1019 	*status = 0;
1020 
1021 	if (req->rq_ctx_init || req->rq_ctx_fini) {
1022 		/* always allow ctx init/fini rpc go through */
1023 	} else if (imp->imp_state == LUSTRE_IMP_NEW) {
1024 		DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
1025 		*status = -EIO;
1026 	} else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1027 		/* pings may safely race with umount */
1028 		DEBUG_REQ(lustre_msg_get_opc(req->rq_reqmsg) == OBD_PING ?
1029 			  D_HA : D_ERROR, req, "IMP_CLOSED ");
1030 		*status = -EIO;
1031 	} else if (ptlrpc_send_limit_expired(req)) {
1032 		/* probably doesn't need to be a D_ERROR after initial testing */
1033 		DEBUG_REQ(D_ERROR, req, "send limit expired ");
1034 		*status = -EIO;
1035 	} else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1036 		   imp->imp_state == LUSTRE_IMP_CONNECTING) {
1037 		/* allow CONNECT even if import is invalid */
1038 		if (atomic_read(&imp->imp_inval_count) != 0) {
1039 			DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1040 			*status = -EIO;
1041 		}
1042 	} else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1043 		if (!imp->imp_deactive)
1044 			DEBUG_REQ(D_NET, req, "IMP_INVALID");
1045 		*status = -ESHUTDOWN; /* bz 12940 */
1046 	} else if (req->rq_import_generation != imp->imp_generation) {
1047 		DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1048 		*status = -EIO;
1049 	} else if (req->rq_send_state != imp->imp_state) {
1050 		/* invalidate in progress - any requests should be dropped */
1051 		if (atomic_read(&imp->imp_inval_count) != 0) {
1052 			DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1053 			*status = -EIO;
1054 		} else if (imp->imp_dlm_fake || req->rq_no_delay) {
1055 			*status = -EWOULDBLOCK;
1056 		} else if (req->rq_allow_replay &&
1057 			  (imp->imp_state == LUSTRE_IMP_REPLAY ||
1058 			   imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1059 			   imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1060 			   imp->imp_state == LUSTRE_IMP_RECOVER)) {
1061 			DEBUG_REQ(D_HA, req, "allow during recovery.\n");
1062 		} else {
1063 			delay = 1;
1064 		}
1065 	}
1066 
1067 	return delay;
1068 }
1069 
1070 /**
1071  * Decide if the error message regarding provided request \a req
1072  * should be printed to the console or not.
1073  * Makes its decision based on request status and other properties.
1074  * Returns 1 to print error on the system console or 0 if not.
1075  */
1076 static int ptlrpc_console_allow(struct ptlrpc_request *req)
1077 {
1078 	__u32 opc;
1079 	int err;
1080 
1081 	LASSERT(req->rq_reqmsg != NULL);
1082 	opc = lustre_msg_get_opc(req->rq_reqmsg);
1083 
1084 	/*
1085 	 * Suppress particular reconnect errors which are to be expected.  No
1086 	 * errors are suppressed for the initial connection on an import
1087 	 */
1088 	if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) &&
1089 	    (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) {
1090 
1091 		/* Suppress timed out reconnect requests */
1092 		if (req->rq_timedout)
1093 			return 0;
1094 
1095 		/* Suppress unavailable/again reconnect requests */
1096 		err = lustre_msg_get_status(req->rq_repmsg);
1097 		if (err == -ENODEV || err == -EAGAIN)
1098 			return 0;
1099 	}
1100 
1101 	return 1;
1102 }
1103 
1104 /**
1105  * Check request processing status.
1106  * Returns the status.
1107  */
1108 static int ptlrpc_check_status(struct ptlrpc_request *req)
1109 {
1110 	int err;
1111 
1112 	err = lustre_msg_get_status(req->rq_repmsg);
1113 	if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1114 		struct obd_import *imp = req->rq_import;
1115 		__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1116 
1117 		if (ptlrpc_console_allow(req))
1118 			LCONSOLE_ERROR_MSG(0x011, "%s: Communicating with %s, operation %s failed with %d.\n",
1119 					   imp->imp_obd->obd_name,
1120 					   libcfs_nid2str(
1121 						   imp->imp_connection->c_peer.nid),
1122 					   ll_opcode2str(opc), err);
1123 		return err < 0 ? err : -EINVAL;
1124 	}
1125 
1126 	if (err < 0)
1127 		DEBUG_REQ(D_INFO, req, "status is %d", err);
1128 	else if (err > 0)
1129 		/* XXX: translate this error from net to host */
1130 		DEBUG_REQ(D_INFO, req, "status is %d", err);
1131 
1132 	return err;
1133 }
1134 
1135 /**
1136  * save pre-versions of objects into request for replay.
1137  * Versions are obtained from server reply.
1138  * Used for VBR (Version Based Recovery).
1139  */
1140 static void ptlrpc_save_versions(struct ptlrpc_request *req)
1141 {
1142 	struct lustre_msg *repmsg = req->rq_repmsg;
1143 	struct lustre_msg *reqmsg = req->rq_reqmsg;
1144 	__u64 *versions = lustre_msg_get_versions(repmsg);
1145 
1146 	if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1147 		return;
1148 
1149 	LASSERT(versions);
1150 	lustre_msg_set_versions(reqmsg, versions);
1151 	CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1152 	       versions[0], versions[1]);
1153 }
1154 
1155 /**
1156  * Callback function called when client receives RPC reply for \a req.
1157  * Returns 0 on success or error code.
1158  * The return value would be assigned to req->rq_status by the caller
1159  * as request processing status.
1160  * This function also decides if the request needs to be saved for later replay.
1161  */
1162 static int after_reply(struct ptlrpc_request *req)
1163 {
1164 	struct obd_import *imp = req->rq_import;
1165 	struct obd_device *obd = req->rq_import->imp_obd;
1166 	int rc;
1167 	struct timespec64 work_start;
1168 	long timediff;
1169 
1170 	LASSERT(obd != NULL);
1171 	/* repbuf must be unlinked */
1172 	LASSERT(!req->rq_receiving_reply && !req->rq_reply_unlink);
1173 
1174 	if (req->rq_reply_truncate) {
1175 		if (ptlrpc_no_resend(req)) {
1176 			DEBUG_REQ(D_ERROR, req, "reply buffer overflow, expected: %d, actual size: %d",
1177 				  req->rq_nob_received, req->rq_repbuf_len);
1178 			return -EOVERFLOW;
1179 		}
1180 
1181 		sptlrpc_cli_free_repbuf(req);
1182 		/*
1183 		 * Pass the required reply buffer size (include space for early
1184 		 * reply).  NB: no need to round up because alloc_repbuf will
1185 		 * round it up
1186 		 */
1187 		req->rq_replen       = req->rq_nob_received;
1188 		req->rq_nob_received = 0;
1189 		spin_lock(&req->rq_lock);
1190 		req->rq_resend       = 1;
1191 		spin_unlock(&req->rq_lock);
1192 		return 0;
1193 	}
1194 
1195 	/*
1196 	 * NB Until this point, the whole of the incoming message,
1197 	 * including buflens, status etc is in the sender's byte order.
1198 	 */
1199 	rc = sptlrpc_cli_unwrap_reply(req);
1200 	if (rc) {
1201 		DEBUG_REQ(D_ERROR, req, "unwrap reply failed (%d):", rc);
1202 		return rc;
1203 	}
1204 
1205 	/* Security layer unwrap might ask resend this request. */
1206 	if (req->rq_resend)
1207 		return 0;
1208 
1209 	rc = unpack_reply(req);
1210 	if (rc)
1211 		return rc;
1212 
1213 	/* retry indefinitely on EINPROGRESS */
1214 	if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1215 	    ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1216 		time64_t now = ktime_get_real_seconds();
1217 
1218 		DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
1219 		spin_lock(&req->rq_lock);
1220 		req->rq_resend = 1;
1221 		spin_unlock(&req->rq_lock);
1222 		req->rq_nr_resend++;
1223 
1224 		/* allocate new xid to avoid reply reconstruction */
1225 		if (!req->rq_bulk) {
1226 			/* new xid is already allocated for bulk in ptlrpc_check_set() */
1227 			req->rq_xid = ptlrpc_next_xid();
1228 			DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
1229 		}
1230 
1231 		/* Readjust the timeout for current conditions */
1232 		ptlrpc_at_set_req_timeout(req);
1233 		/*
1234 		 * delay resend to give a chance to the server to get ready.
1235 		 * The delay is increased by 1s on every resend and is capped to
1236 		 * the current request timeout (i.e. obd_timeout if AT is off,
1237 		 * or AT service time x 125% + 5s, see at_est2timeout)
1238 		 */
1239 		if (req->rq_nr_resend > req->rq_timeout)
1240 			req->rq_sent = now + req->rq_timeout;
1241 		else
1242 			req->rq_sent = now + req->rq_nr_resend;
1243 
1244 		return 0;
1245 	}
1246 
1247 	ktime_get_real_ts64(&work_start);
1248 	timediff = (work_start.tv_sec - req->rq_arrival_time.tv_sec) * USEC_PER_SEC +
1249 		   (work_start.tv_nsec - req->rq_arrival_time.tv_nsec) / NSEC_PER_USEC;
1250 	if (obd->obd_svc_stats != NULL) {
1251 		lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1252 				    timediff);
1253 		ptlrpc_lprocfs_rpc_sent(req, timediff);
1254 	}
1255 
1256 	if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1257 	    lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1258 		DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1259 			  lustre_msg_get_type(req->rq_repmsg));
1260 		return -EPROTO;
1261 	}
1262 
1263 	if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1264 		CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1265 	ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1266 	ptlrpc_at_adj_net_latency(req,
1267 				  lustre_msg_get_service_time(req->rq_repmsg));
1268 
1269 	rc = ptlrpc_check_status(req);
1270 	imp->imp_connect_error = rc;
1271 
1272 	if (rc) {
1273 		/*
1274 		 * Either we've been evicted, or the server has failed for
1275 		 * some reason. Try to reconnect, and if that fails, punt to
1276 		 * the upcall.
1277 		 */
1278 		if (ll_rpc_recoverable_error(rc)) {
1279 			if (req->rq_send_state != LUSTRE_IMP_FULL ||
1280 			    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1281 				return rc;
1282 			}
1283 			ptlrpc_request_handle_notconn(req);
1284 			return rc;
1285 		}
1286 	} else {
1287 		/*
1288 		 * Let's look if server sent slv. Do it only for RPC with
1289 		 * rc == 0.
1290 		 */
1291 		ldlm_cli_update_pool(req);
1292 	}
1293 
1294 	/* Store transno in reqmsg for replay. */
1295 	if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1296 		req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1297 		lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1298 	}
1299 
1300 	if (imp->imp_replayable) {
1301 		spin_lock(&imp->imp_lock);
1302 		/*
1303 		 * No point in adding already-committed requests to the replay
1304 		 * list, we will just remove them immediately. b=9829
1305 		 */
1306 		if (req->rq_transno != 0 &&
1307 		    (req->rq_transno >
1308 		     lustre_msg_get_last_committed(req->rq_repmsg) ||
1309 		     req->rq_replay)) {
1310 			/* version recovery */
1311 			ptlrpc_save_versions(req);
1312 			ptlrpc_retain_replayable_request(req, imp);
1313 		} else if (req->rq_commit_cb != NULL &&
1314 			   list_empty(&req->rq_replay_list)) {
1315 			/*
1316 			 * NB: don't call rq_commit_cb if it's already on
1317 			 * rq_replay_list, ptlrpc_free_committed() will call
1318 			 * it later, see LU-3618 for details
1319 			 */
1320 			spin_unlock(&imp->imp_lock);
1321 			req->rq_commit_cb(req);
1322 			spin_lock(&imp->imp_lock);
1323 		}
1324 
1325 		/* Replay-enabled imports return commit-status information. */
1326 		if (lustre_msg_get_last_committed(req->rq_repmsg)) {
1327 			imp->imp_peer_committed_transno =
1328 				lustre_msg_get_last_committed(req->rq_repmsg);
1329 		}
1330 
1331 		ptlrpc_free_committed(imp);
1332 
1333 		if (!list_empty(&imp->imp_replay_list)) {
1334 			struct ptlrpc_request *last;
1335 
1336 			last = list_entry(imp->imp_replay_list.prev,
1337 					      struct ptlrpc_request,
1338 					      rq_replay_list);
1339 			/*
1340 			 * Requests with rq_replay stay on the list even if no
1341 			 * commit is expected.
1342 			 */
1343 			if (last->rq_transno > imp->imp_peer_committed_transno)
1344 				ptlrpc_pinger_commit_expected(imp);
1345 		}
1346 
1347 		spin_unlock(&imp->imp_lock);
1348 	}
1349 
1350 	return rc;
1351 }
1352 
1353 /**
1354  * Helper function to send request \a req over the network for the first time.
1355  * Also adjusts request phase.
1356  * Returns 0 on success or error code.
1357  */
1358 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1359 {
1360 	struct obd_import *imp = req->rq_import;
1361 	int rc;
1362 
1363 	LASSERT(req->rq_phase == RQ_PHASE_NEW);
1364 	if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
1365 	    (!req->rq_generation_set ||
1366 	     req->rq_import_generation == imp->imp_generation))
1367 		return 0;
1368 
1369 	ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1370 
1371 	spin_lock(&imp->imp_lock);
1372 
1373 	if (!req->rq_generation_set)
1374 		req->rq_import_generation = imp->imp_generation;
1375 
1376 	if (ptlrpc_import_delay_req(imp, req, &rc)) {
1377 		spin_lock(&req->rq_lock);
1378 		req->rq_waiting = 1;
1379 		spin_unlock(&req->rq_lock);
1380 
1381 		DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: (%s != %s)",
1382 			  lustre_msg_get_status(req->rq_reqmsg),
1383 			  ptlrpc_import_state_name(req->rq_send_state),
1384 			  ptlrpc_import_state_name(imp->imp_state));
1385 		LASSERT(list_empty(&req->rq_list));
1386 		list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1387 		atomic_inc(&req->rq_import->imp_inflight);
1388 		spin_unlock(&imp->imp_lock);
1389 		return 0;
1390 	}
1391 
1392 	if (rc != 0) {
1393 		spin_unlock(&imp->imp_lock);
1394 		req->rq_status = rc;
1395 		ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1396 		return rc;
1397 	}
1398 
1399 	LASSERT(list_empty(&req->rq_list));
1400 	list_add_tail(&req->rq_list, &imp->imp_sending_list);
1401 	atomic_inc(&req->rq_import->imp_inflight);
1402 	spin_unlock(&imp->imp_lock);
1403 
1404 	lustre_msg_set_status(req->rq_reqmsg, current_pid());
1405 
1406 	rc = sptlrpc_req_refresh_ctx(req, -1);
1407 	if (rc) {
1408 		if (req->rq_err) {
1409 			req->rq_status = rc;
1410 			return 1;
1411 		}
1412 		spin_lock(&req->rq_lock);
1413 		req->rq_wait_ctx = 1;
1414 		spin_unlock(&req->rq_lock);
1415 		return 0;
1416 	}
1417 
1418 	CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1419 	       current_comm(),
1420 	       imp->imp_obd->obd_uuid.uuid,
1421 	       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1422 	       libcfs_nid2str(imp->imp_connection->c_peer.nid),
1423 	       lustre_msg_get_opc(req->rq_reqmsg));
1424 
1425 	rc = ptl_send_rpc(req, 0);
1426 	if (rc) {
1427 		DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1428 		spin_lock(&req->rq_lock);
1429 		req->rq_net_err = 1;
1430 		spin_unlock(&req->rq_lock);
1431 		return rc;
1432 	}
1433 	return 0;
1434 }
1435 
1436 static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1437 {
1438 	int remaining, rc;
1439 
1440 	LASSERT(set->set_producer != NULL);
1441 
1442 	remaining = atomic_read(&set->set_remaining);
1443 
1444 	/*
1445 	 * populate the ->set_requests list with requests until we
1446 	 * reach the maximum number of RPCs in flight for this set
1447 	 */
1448 	while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1449 		rc = set->set_producer(set, set->set_producer_arg);
1450 		if (rc == -ENOENT) {
1451 			/* no more RPC to produce */
1452 			set->set_producer     = NULL;
1453 			set->set_producer_arg = NULL;
1454 			return 0;
1455 		}
1456 	}
1457 
1458 	return (atomic_read(&set->set_remaining) - remaining);
1459 }
1460 
1461 /**
1462  * This sends any unsent RPCs in \a set and returns 1 if all are sent
1463  * and no more replies are expected.
1464  * (it is possible to get fewer replies than requests sent, e.g. due to
1465  * timed-out requests or requests that we had trouble sending out)
1466  *
1467  * NOTE: This function contains a potential schedule point (cond_resched()).
1468  */
1469 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1470 {
1471 	struct list_head *tmp, *next;
1472 	struct list_head comp_reqs;
1473 	int force_timer_recalc = 0;
1474 
1475 	if (atomic_read(&set->set_remaining) == 0)
1476 		return 1;
1477 
1478 	INIT_LIST_HEAD(&comp_reqs);
1479 	list_for_each_safe(tmp, next, &set->set_requests) {
1480 		struct ptlrpc_request *req =
1481 			list_entry(tmp, struct ptlrpc_request,
1482 				       rq_set_chain);
1483 		struct obd_import *imp = req->rq_import;
1484 		int unregistered = 0;
1485 		int rc = 0;
1486 
1487 		/*
1488 		 * This schedule point is mainly for the ptlrpcd caller of this
1489 		 * function.  Most ptlrpc sets are not long-lived and unbounded
1490 		 * in length, but at the least the set used by the ptlrpcd is.
1491 		 * Since the processing time is unbounded, we need to insert an
1492 		 * explicit schedule point to make the thread well-behaved.
1493 		 */
1494 		cond_resched();
1495 
1496 		if (req->rq_phase == RQ_PHASE_NEW &&
1497 		    ptlrpc_send_new_req(req)) {
1498 			force_timer_recalc = 1;
1499 		}
1500 
1501 		/* delayed send - skip */
1502 		if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1503 			continue;
1504 
1505 		/* delayed resend - skip */
1506 		if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1507 		    req->rq_sent > ktime_get_real_seconds())
1508 			continue;
1509 
1510 		if (!(req->rq_phase == RQ_PHASE_RPC ||
1511 		      req->rq_phase == RQ_PHASE_BULK ||
1512 		      req->rq_phase == RQ_PHASE_INTERPRET ||
1513 		      req->rq_phase == RQ_PHASE_UNREGISTERING ||
1514 		      req->rq_phase == RQ_PHASE_COMPLETE)) {
1515 			DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1516 			LBUG();
1517 		}
1518 
1519 		if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
1520 			LASSERT(req->rq_next_phase != req->rq_phase);
1521 			LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1522 
1523 			/*
1524 			 * Skip processing until reply is unlinked. We
1525 			 * can't return to pool before that and we can't
1526 			 * call interpret before that. We need to make
1527 			 * sure that all rdma transfers finished and will
1528 			 * not corrupt any data.
1529 			 */
1530 			if (ptlrpc_client_recv_or_unlink(req) ||
1531 			    ptlrpc_client_bulk_active(req))
1532 				continue;
1533 
1534 			/*
1535 			 * Turn fail_loc off to prevent it from looping
1536 			 * forever.
1537 			 */
1538 			if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1539 				OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1540 						     OBD_FAIL_ONCE);
1541 			}
1542 			if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1543 				OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1544 						     OBD_FAIL_ONCE);
1545 			}
1546 
1547 			/* Move to next phase if reply was successfully
1548 			 * unlinked.
1549 			 */
1550 			ptlrpc_rqphase_move(req, req->rq_next_phase);
1551 		}
1552 
1553 		if (req->rq_phase == RQ_PHASE_COMPLETE) {
1554 			list_move_tail(&req->rq_set_chain, &comp_reqs);
1555 			continue;
1556 		}
1557 
1558 		if (req->rq_phase == RQ_PHASE_INTERPRET)
1559 			goto interpret;
1560 
1561 		/* Note that this also will start async reply unlink. */
1562 		if (req->rq_net_err && !req->rq_timedout) {
1563 			ptlrpc_expire_one_request(req, 1);
1564 
1565 			/* Check if we still need to wait for unlink. */
1566 			if (ptlrpc_client_recv_or_unlink(req) ||
1567 			    ptlrpc_client_bulk_active(req))
1568 				continue;
1569 			/* If there is no need to resend, fail it now. */
1570 			if (req->rq_no_resend) {
1571 				if (req->rq_status == 0)
1572 					req->rq_status = -EIO;
1573 				ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1574 				goto interpret;
1575 			} else {
1576 				continue;
1577 			}
1578 		}
1579 
1580 		if (req->rq_err) {
1581 			spin_lock(&req->rq_lock);
1582 			req->rq_replied = 0;
1583 			spin_unlock(&req->rq_lock);
1584 			if (req->rq_status == 0)
1585 				req->rq_status = -EIO;
1586 			ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1587 			goto interpret;
1588 		}
1589 
1590 		/*
1591 		 * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
1592 		 * so it sets rq_intr regardless of individual rpc
1593 		 * timeouts. The synchronous IO waiting path sets
1594 		 * rq_intr irrespective of whether ptlrpcd
1595 		 * has seen a timeout.  Our policy is to only interpret
1596 		 * interrupted rpcs after they have timed out, so we
1597 		 * need to enforce that here.
1598 		 */
1599 
1600 		if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1601 				     req->rq_wait_ctx)) {
1602 			req->rq_status = -EINTR;
1603 			ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1604 			goto interpret;
1605 		}
1606 
1607 		if (req->rq_phase == RQ_PHASE_RPC) {
1608 			if (req->rq_timedout || req->rq_resend ||
1609 			    req->rq_waiting || req->rq_wait_ctx) {
1610 				int status;
1611 
1612 				if (!ptlrpc_unregister_reply(req, 1))
1613 					continue;
1614 
1615 				spin_lock(&imp->imp_lock);
1616 				if (ptlrpc_import_delay_req(imp, req,
1617 							    &status)) {
1618 					/*
1619 					 * put on delay list - only while we
1620 					 * wait for recovery to finish - before send
1621 					 */
1622 					list_del_init(&req->rq_list);
1623 					list_add_tail(&req->rq_list,
1624 							  &imp->
1625 							  imp_delayed_list);
1626 					spin_unlock(&imp->imp_lock);
1627 					continue;
1628 				}
1629 
1630 				if (status != 0) {
1631 					req->rq_status = status;
1632 					ptlrpc_rqphase_move(req,
1633 						RQ_PHASE_INTERPRET);
1634 					spin_unlock(&imp->imp_lock);
1635 					goto interpret;
1636 				}
1637 				if (ptlrpc_no_resend(req) &&
1638 				    !req->rq_wait_ctx) {
1639 					req->rq_status = -ENOTCONN;
1640 					ptlrpc_rqphase_move(req,
1641 							    RQ_PHASE_INTERPRET);
1642 					spin_unlock(&imp->imp_lock);
1643 					goto interpret;
1644 				}
1645 
1646 				list_del_init(&req->rq_list);
1647 				list_add_tail(&req->rq_list,
1648 						  &imp->imp_sending_list);
1649 
1650 				spin_unlock(&imp->imp_lock);
1651 
1652 				spin_lock(&req->rq_lock);
1653 				req->rq_waiting = 0;
1654 				spin_unlock(&req->rq_lock);
1655 
1656 				if (req->rq_timedout || req->rq_resend) {
1657 					/* This is re-sending anyway, let's mark req as resend. */
1658 					spin_lock(&req->rq_lock);
1659 					req->rq_resend = 1;
1660 					spin_unlock(&req->rq_lock);
1661 					if (req->rq_bulk) {
1662 						__u64 old_xid;
1663 
1664 						if (!ptlrpc_unregister_bulk(req, 1))
1665 							continue;
1666 
1667 						/* ensure previous bulk fails */
1668 						old_xid = req->rq_xid;
1669 						req->rq_xid = ptlrpc_next_xid();
1670 						CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
1671 						       old_xid, req->rq_xid);
1672 					}
1673 				}
1674 				/*
1675 				 * rq_wait_ctx is only touched by ptlrpcd,
1676 				 * so no lock is needed here.
1677 				 */
1678 				status = sptlrpc_req_refresh_ctx(req, -1);
1679 				if (status) {
1680 					if (req->rq_err) {
1681 						req->rq_status = status;
1682 						spin_lock(&req->rq_lock);
1683 						req->rq_wait_ctx = 0;
1684 						spin_unlock(&req->rq_lock);
1685 						force_timer_recalc = 1;
1686 					} else {
1687 						spin_lock(&req->rq_lock);
1688 						req->rq_wait_ctx = 1;
1689 						spin_unlock(&req->rq_lock);
1690 					}
1691 
1692 					continue;
1693 				} else {
1694 					spin_lock(&req->rq_lock);
1695 					req->rq_wait_ctx = 0;
1696 					spin_unlock(&req->rq_lock);
1697 				}
1698 
1699 				rc = ptl_send_rpc(req, 0);
1700 				if (rc) {
1701 					DEBUG_REQ(D_HA, req,
1702 						  "send failed: rc = %d", rc);
1703 					force_timer_recalc = 1;
1704 					spin_lock(&req->rq_lock);
1705 					req->rq_net_err = 1;
1706 					spin_unlock(&req->rq_lock);
1707 					continue;
1708 				}
1709 				/* need to reset the timeout */
1710 				force_timer_recalc = 1;
1711 			}
1712 
1713 			spin_lock(&req->rq_lock);
1714 
1715 			if (ptlrpc_client_early(req)) {
1716 				ptlrpc_at_recv_early_reply(req);
1717 				spin_unlock(&req->rq_lock);
1718 				continue;
1719 			}
1720 
1721 			/* Still waiting for a reply? */
1722 			if (ptlrpc_client_recv(req)) {
1723 				spin_unlock(&req->rq_lock);
1724 				continue;
1725 			}
1726 
1727 			/* Did we actually receive a reply? */
1728 			if (!ptlrpc_client_replied(req)) {
1729 				spin_unlock(&req->rq_lock);
1730 				continue;
1731 			}
1732 
1733 			spin_unlock(&req->rq_lock);
1734 
1735 			/*
1736 			 * unlink from the net because we are going to
1737 			 * swab the reply buffer in place
1738 			 */
1739 			unregistered = ptlrpc_unregister_reply(req, 1);
1740 			if (!unregistered)
1741 				continue;
1742 
1743 			req->rq_status = after_reply(req);
1744 			if (req->rq_resend)
1745 				continue;
1746 
1747 			/*
1748 			 * If there is no bulk associated with this request,
1749 			 * then we're done and should let the interpreter
1750 			 * process the reply. Similarly if the RPC returned
1751 			 * an error, and therefore the bulk will never arrive.
1752 			 */
1753 			if (req->rq_bulk == NULL || req->rq_status < 0) {
1754 				ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1755 				goto interpret;
1756 			}
1757 
1758 			ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1759 		}
1760 
1761 		LASSERT(req->rq_phase == RQ_PHASE_BULK);
1762 		if (ptlrpc_client_bulk_active(req))
1763 			continue;
1764 
1765 		if (req->rq_bulk->bd_failure) {
1766 			/*
1767 			 * The RPC reply arrived OK, but the bulk transfer
1768 			 * failed.  This is odd, since the server told us the
1769 			 * RPC was good after getting the REPLY for its GET or
1770 			 * the ACK for its PUT.
1771 			 */
1772 			DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1773 			req->rq_status = -EIO;
1774 		}
1775 
1776 		ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1777 
1778 interpret:
1779 		LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1780 
1781 		/*
1782 		 * This moves the request to the "unregistering" phase; we
1783 		 * need to wait for the reply to be unlinked.
1784 		 */
1785 		if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
1786 			/* start async bulk unlink too */
1787 			ptlrpc_unregister_bulk(req, 1);
1788 			continue;
1789 		}
1790 
1791 		if (!ptlrpc_unregister_bulk(req, 1))
1792 			continue;
1793 
1794 		/* Receiving must already have finished when we call interpret. */
1795 		LASSERT(!req->rq_receiving_reply);
1796 
1797 		ptlrpc_req_interpret(env, req, req->rq_status);
1798 
1799 		if (ptlrpcd_check_work(req)) {
1800 			atomic_dec(&set->set_remaining);
1801 			continue;
1802 		}
1803 		ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1804 
1805 		CDEBUG(req->rq_reqmsg != NULL ? D_RPCTRACE : 0,
1806 		       "Completed RPC pname:cluuid:pid:xid:nid:opc %s:%s:%d:%llu:%s:%d\n",
1807 		       current_comm(), imp->imp_obd->obd_uuid.uuid,
1808 		       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1809 		       libcfs_nid2str(imp->imp_connection->c_peer.nid),
1810 		       lustre_msg_get_opc(req->rq_reqmsg));
1811 
1812 		spin_lock(&imp->imp_lock);
1813 		/*
1814 		 * The request may no longer be on the sending or delayed
1815 		 * list.  This can happen when it was marked erroneous because
1816 		 * ptlrpc_import_delay_req(req, status) found it impossible to
1817 		 * allow sending this rpc and returned *status != 0.
1818 		 */
1819 		if (!list_empty(&req->rq_list)) {
1820 			list_del_init(&req->rq_list);
1821 			atomic_dec(&imp->imp_inflight);
1822 		}
1823 		spin_unlock(&imp->imp_lock);
1824 
1825 		atomic_dec(&set->set_remaining);
1826 		wake_up_all(&imp->imp_recovery_waitq);
1827 
1828 		if (set->set_producer) {
1829 			/* produce a new request if possible */
1830 			if (ptlrpc_set_producer(set) > 0)
1831 				force_timer_recalc = 1;
1832 
1833 			/*
1834 			 * free the request that has just been completed
1835 			 * in order not to pollute set->set_requests
1836 			 */
1837 			list_del_init(&req->rq_set_chain);
1838 			spin_lock(&req->rq_lock);
1839 			req->rq_set = NULL;
1840 			req->rq_invalid_rqset = 0;
1841 			spin_unlock(&req->rq_lock);
1842 
1843 			/* record rq_status to compute the final status later */
1844 			if (req->rq_status != 0)
1845 				set->set_rc = req->rq_status;
1846 			ptlrpc_req_finished(req);
1847 		} else {
1848 			list_move_tail(&req->rq_set_chain, &comp_reqs);
1849 		}
1850 	}
1851 
1852 	/*
1853 	 * move completed requests to the head of the list so it's easier
1854 	 * for the caller to find them
1855 	 */
1856 	list_splice(&comp_reqs, &set->set_requests);
1857 
1858 	/* If we hit an error, we want to recover promptly. */
1859 	return atomic_read(&set->set_remaining) == 0 || force_timer_recalc;
1860 }
1861 EXPORT_SYMBOL(ptlrpc_check_set);
1862 
1863 /**
1864  * Time out request \a req. If \a async_unlink is set, do not wait
1865  * until LNet actually confirms network buffer unlinking.
1866  * Return 1 if we should give up further retrying attempts or 0 otherwise.
1867  */
1868 int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1869 {
1870 	struct obd_import *imp = req->rq_import;
1871 	int rc = 0;
1872 
1873 	spin_lock(&req->rq_lock);
1874 	req->rq_timedout = 1;
1875 	spin_unlock(&req->rq_lock);
1876 
1877 	DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
1878 		  req->rq_net_err ? "failed due to network error" :
1879 		     ((req->rq_real_sent == 0 ||
1880 		       req->rq_real_sent < req->rq_sent ||
1881 		       req->rq_real_sent >= req->rq_deadline) ?
1882 		      "timed out for sent delay" : "timed out for slow reply"),
1883 		  (s64)req->rq_sent, (s64)req->rq_real_sent);
1884 
1885 	if (imp != NULL && obd_debug_peer_on_timeout)
1886 		LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
1887 
1888 	ptlrpc_unregister_reply(req, async_unlink);
1889 	ptlrpc_unregister_bulk(req, async_unlink);
1890 
1891 	if (obd_dump_on_timeout)
1892 		libcfs_debug_dumplog();
1893 
1894 	if (imp == NULL) {
1895 		DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1896 		return 1;
1897 	}
1898 
1899 	atomic_inc(&imp->imp_timeouts);
1900 
1901 	/* The DLM server doesn't want recovery run on its imports. */
1902 	if (imp->imp_dlm_fake)
1903 		return 1;
1904 
1905 	/*
1906 	 * If this request is for recovery or other primordial tasks,
1907 	 * then error it out here.
1908 	 */
1909 	if (req->rq_ctx_init || req->rq_ctx_fini ||
1910 	    req->rq_send_state != LUSTRE_IMP_FULL ||
1911 	    imp->imp_obd->obd_no_recov) {
1912 		DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1913 			  ptlrpc_import_state_name(req->rq_send_state),
1914 			  ptlrpc_import_state_name(imp->imp_state));
1915 		spin_lock(&req->rq_lock);
1916 		req->rq_status = -ETIMEDOUT;
1917 		req->rq_err = 1;
1918 		spin_unlock(&req->rq_lock);
1919 		return 1;
1920 	}
1921 
1922 	/*
1923 	 * if a request can't be resent we can't wait for an answer after
1924 	 * the timeout
1925 	 */
1926 	if (ptlrpc_no_resend(req)) {
1927 		DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1928 		rc = 1;
1929 	}
1930 
1931 	ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1932 
1933 	return rc;
1934 }
1935 
1936 /**
1937  * Time out all uncompleted requests in the request set pointed to by \a data.
1938  * Callback used when waiting on sets with l_wait_event.
1939  * Always returns 1.
1940  */
1941 int ptlrpc_expired_set(void *data)
1942 {
1943 	struct ptlrpc_request_set *set = data;
1944 	struct list_head *tmp;
1945 	time64_t now = ktime_get_real_seconds();
1946 
1947 	LASSERT(set != NULL);
1948 
1949 	/* A timeout expired. See which reqs it applies to...  */
1950 	list_for_each(tmp, &set->set_requests) {
1951 		struct ptlrpc_request *req =
1952 			list_entry(tmp, struct ptlrpc_request,
1953 				       rq_set_chain);
1954 
1955 		/* don't expire request waiting for context */
1956 		if (req->rq_wait_ctx)
1957 			continue;
1958 
1959 		/* Request in-flight? */
1960 		if (!((req->rq_phase == RQ_PHASE_RPC &&
1961 		       !req->rq_waiting && !req->rq_resend) ||
1962 		      (req->rq_phase == RQ_PHASE_BULK)))
1963 			continue;
1964 
1965 		if (req->rq_timedout ||     /* already dealt with */
1966 		    req->rq_deadline > now) /* not expired */
1967 			continue;
1968 
1969 		/*
1970 		 * Deal with this request.  Do it asynchronously so we do not
1971 		 * block the ptlrpcd thread.
1972 		 */
1973 		ptlrpc_expire_one_request(req, 1);
1974 	}
1975 
1976 	/*
1977 	 * When waiting for a whole set, we always break out of the
1978 	 * sleep so we can recalculate the timeout, or enable interrupts
1979 	 * if everyone's timed out.
1980 	 */
1981 	return 1;
1982 }
1983 EXPORT_SYMBOL(ptlrpc_expired_set);
1984 
1985 /**
1986  * Sets rq_intr flag in \a req under spinlock.
1987  */
1988 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
1989 {
1990 	spin_lock(&req->rq_lock);
1991 	req->rq_intr = 1;
1992 	spin_unlock(&req->rq_lock);
1993 }
1994 EXPORT_SYMBOL(ptlrpc_mark_interrupted);
1995 
1996 /**
1997  * Interrupts (sets interrupted flag) all uncompleted requests in
1998  * a set \a data. Callback for l_wait_event for interruptible waits.
1999  */
2000 void ptlrpc_interrupted_set(void *data)
2001 {
2002 	struct ptlrpc_request_set *set = data;
2003 	struct list_head *tmp;
2004 
2005 	LASSERT(set != NULL);
2006 	CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2007 
2008 	list_for_each(tmp, &set->set_requests) {
2009 		struct ptlrpc_request *req =
2010 			list_entry(tmp, struct ptlrpc_request,
2011 				       rq_set_chain);
2012 
2013 		if (req->rq_phase != RQ_PHASE_RPC &&
2014 		    req->rq_phase != RQ_PHASE_UNREGISTERING)
2015 			continue;
2016 
2017 		ptlrpc_mark_interrupted(req);
2018 	}
2019 }
2020 EXPORT_SYMBOL(ptlrpc_interrupted_set);
2021 
2022 /**
2023  * Get the smallest timeout in the set; this does NOT set a timeout.
2024  */
2025 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2026 {
2027 	struct list_head *tmp;
2028 	time64_t now = ktime_get_real_seconds();
2029 	int timeout = 0;
2030 	struct ptlrpc_request *req;
2031 	time64_t deadline;
2032 
2033 	list_for_each(tmp, &set->set_requests) {
2034 		req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2035 
2036 		/* Request in-flight? */
2037 		if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2038 		      (req->rq_phase == RQ_PHASE_BULK) ||
2039 		      (req->rq_phase == RQ_PHASE_NEW)))
2040 			continue;
2041 
2042 		/* Already timed out. */
2043 		if (req->rq_timedout)
2044 			continue;
2045 
2046 		/* Waiting for ctx. */
2047 		if (req->rq_wait_ctx)
2048 			continue;
2049 
2050 		if (req->rq_phase == RQ_PHASE_NEW)
2051 			deadline = req->rq_sent;
2052 		else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2053 			deadline = req->rq_sent;
2054 		else
2055 			deadline = req->rq_sent + req->rq_timeout;
2056 
2057 		if (deadline <= now)    /* actually expired already */
2058 			timeout = 1;    /* ASAP */
2059 		else if (timeout == 0 || timeout > deadline - now)
2060 			timeout = deadline - now;
2061 	}
2062 	return timeout;
2063 }
2064 EXPORT_SYMBOL(ptlrpc_set_next_timeout);
2065 
2066 /**
2067  * Send all unsent requests from the set and then wait until all
2068  * requests in the set complete (either get a reply, timeout, get an
2069  * error or otherwise be interrupted).
2070  * Returns 0 on success or error code otherwise.
2071  */
2072 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
2073 {
2074 	struct list_head *tmp;
2075 	struct ptlrpc_request *req;
2076 	struct l_wait_info lwi;
2077 	int rc, timeout;
2078 
2079 	if (set->set_producer)
2080 		(void)ptlrpc_set_producer(set);
2081 	else
2082 		list_for_each(tmp, &set->set_requests) {
2083 			req = list_entry(tmp, struct ptlrpc_request,
2084 					     rq_set_chain);
2085 			if (req->rq_phase == RQ_PHASE_NEW)
2086 				(void)ptlrpc_send_new_req(req);
2087 		}
2088 
2089 	if (list_empty(&set->set_requests))
2090 		return 0;
2091 
2092 	do {
2093 		timeout = ptlrpc_set_next_timeout(set);
2094 
2095 		/*
2096 		 * wait until all complete, interrupted, or an in-flight
2097 		 * req times out
2098 		 */
2099 		CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
2100 		       set, timeout);
2101 
2102 		if (timeout == 0 && !cfs_signal_pending())
2103 			/*
2104 			 * No requests are in-flight (either timed out
2105 			 * or delayed), so we can allow interrupts.
2106 			 * We still want to block for a limited time,
2107 			 * so we allow interrupts during the timeout.
2108 			 */
2109 			lwi = LWI_TIMEOUT_INTR_ALL(cfs_time_seconds(1),
2110 						   ptlrpc_expired_set,
2111 						   ptlrpc_interrupted_set, set);
2112 		else
2113 			/*
2114 			 * At least one request is in flight, so no
2115 			 * interrupts are allowed. Wait until all
2116 			 * complete, or an in-flight req times out.
2117 			 */
2118 			lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2119 					  ptlrpc_expired_set, set);
2120 
2121 		rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
2122 
2123 		/*
2124 		 * LU-769 - if we ignored the signal because it was already
2125 		 * pending when we started, we need to handle it now or we risk
2126 		 * it being ignored forever
2127 		 */
2128 		if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
2129 		    cfs_signal_pending()) {
2130 			sigset_t blocked_sigs =
2131 					   cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2132 
2133 			/*
2134 			 * In fact we only interrupt for the "fatal" signals
2135 			 * like SIGINT or SIGKILL. We still ignore less
2136 			 * important signals since ptlrpc set is not easily
2137 			 * reentrant from userspace again
2138 			 */
2139 			if (cfs_signal_pending())
2140 				ptlrpc_interrupted_set(set);
2141 			cfs_restore_sigs(blocked_sigs);
2142 		}
2143 
2144 		LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2145 
2146 		/*
2147 		 * -EINTR => all requests have been flagged rq_intr so next
2148 		 * check completes.
2149 		 * -ETIMEDOUT => someone timed out.  When all reqs have
2150 		 * timed out, signals are enabled allowing completion with
2151 		 * EINTR.
2152 		 * I don't really care if we go once more round the loop in
2153 		 * the error cases -eeb.
2154 		 */
2155 		if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2156 			list_for_each(tmp, &set->set_requests) {
2157 				req = list_entry(tmp, struct ptlrpc_request,
2158 						     rq_set_chain);
2159 				spin_lock(&req->rq_lock);
2160 				req->rq_invalid_rqset = 1;
2161 				spin_unlock(&req->rq_lock);
2162 			}
2163 		}
2164 	} while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2165 
2166 	LASSERT(atomic_read(&set->set_remaining) == 0);
2167 
2168 	rc = set->set_rc; /* rq_status of already freed requests if any */
2169 	list_for_each(tmp, &set->set_requests) {
2170 		req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2171 
2172 		LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2173 		if (req->rq_status != 0)
2174 			rc = req->rq_status;
2175 	}
2176 
2177 	if (set->set_interpret != NULL) {
2178 		int (*interpreter)(struct ptlrpc_request_set *set, void *, int) =
2179 			set->set_interpret;
2180 		rc = interpreter(set, set->set_arg, rc);
2181 	} else {
2182 		struct ptlrpc_set_cbdata *cbdata, *n;
2183 		int err;
2184 
2185 		list_for_each_entry_safe(cbdata, n,
2186 					 &set->set_cblist, psc_item) {
2187 			list_del_init(&cbdata->psc_item);
2188 			err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
2189 			if (err && !rc)
2190 				rc = err;
2191 			kfree(cbdata);
2192 		}
2193 	}
2194 
2195 	return rc;
2196 }
2197 EXPORT_SYMBOL(ptlrpc_set_wait);
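
/*
 * Typical client-side use of the request-set API above (illustrative sketch
 * only, not compiled as part of this file; my_prep_request() is a
 * hypothetical helper that allocates and packs a request against import imp):
 *
 *	struct ptlrpc_request_set *set;
 *	struct ptlrpc_request *req;
 *	int rc;
 *
 *	set = ptlrpc_prep_set();
 *	if (set == NULL)
 *		return -ENOMEM;
 *	req = my_prep_request(imp);
 *	if (req != NULL)
 *		ptlrpc_set_add_req(set, req);
 *	rc = ptlrpc_set_wait(set);	(sends everything and blocks)
 *	ptlrpc_set_destroy(set);	(drops the set's reference on each req)
 *	return rc;
 */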
2198 
2199 /**
2200  * Helper function for request freeing.
2201  * Called when request count reached zero and request needs to be freed.
2202  * Removes request from all sorts of sending/replay lists it might be on,
2203  * frees network buffers if any are present.
2204  * If \a locked is set, the caller already holds the import imp_lock,
2205  * so we do not need to take it again (for certain list manipulations).
2206  */
2207 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2208 {
2209 	if (request == NULL)
2210 		return;
2211 	LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2212 	LASSERTF(request->rq_rqbd == NULL, "req %p\n", request);/* client-side */
2213 	LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2214 	LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2215 	LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
2216 	LASSERTF(!request->rq_replay, "req %p\n", request);
2217 
2218 	req_capsule_fini(&request->rq_pill);
2219 
2220 	/*
2221 	 * We must take it off the imp_replay_list first.  Otherwise, we'll set
2222 	 * request->rq_reqmsg to NULL while osc_close is dereferencing it.
2223 	 */
2224 	if (request->rq_import != NULL) {
2225 		if (!locked)
2226 			spin_lock(&request->rq_import->imp_lock);
2227 		list_del_init(&request->rq_replay_list);
2228 		if (!locked)
2229 			spin_unlock(&request->rq_import->imp_lock);
2230 	}
2231 	LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2232 
2233 	if (atomic_read(&request->rq_refcount) != 0) {
2234 		DEBUG_REQ(D_ERROR, request,
2235 			  "freeing request with nonzero refcount");
2236 		LBUG();
2237 	}
2238 
2239 	if (request->rq_repbuf != NULL)
2240 		sptlrpc_cli_free_repbuf(request);
2241 	if (request->rq_export != NULL) {
2242 		class_export_put(request->rq_export);
2243 		request->rq_export = NULL;
2244 	}
2245 	if (request->rq_import != NULL) {
2246 		class_import_put(request->rq_import);
2247 		request->rq_import = NULL;
2248 	}
2249 	if (request->rq_bulk != NULL)
2250 		ptlrpc_free_bulk_pin(request->rq_bulk);
2251 
2252 	if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
2253 		sptlrpc_cli_free_reqbuf(request);
2254 
2255 	if (request->rq_cli_ctx)
2256 		sptlrpc_req_put_ctx(request, !locked);
2257 
2258 	if (request->rq_pool)
2259 		__ptlrpc_free_req_to_pool(request);
2260 	else
2261 		ptlrpc_request_cache_free(request);
2262 }
2263 
2264 /**
2265  * Helper function
2266  * Drops one reference count for request \a request.
2267  * \a locked set indicates that caller holds import imp_lock.
2268  * Frees the request when reference count reaches zero.
2269  */
2270 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2271 {
2272 	if (request == NULL)
2273 		return 1;
2274 
2275 	if (request == LP_POISON ||
2276 	    request->rq_reqmsg == LP_POISON) {
2277 		CERROR("dereferencing freed request (bug 575)\n");
2278 		LBUG();
2279 		return 1;
2280 	}
2281 
2282 	DEBUG_REQ(D_INFO, request, "refcount now %u",
2283 		  atomic_read(&request->rq_refcount) - 1);
2284 
2285 	if (atomic_dec_and_test(&request->rq_refcount)) {
2286 		__ptlrpc_free_req(request, locked);
2287 		return 1;
2288 	}
2289 
2290 	return 0;
2291 }
2292 
2293 /**
2294  * Drops one reference count for a request.
2295  */
2296 void ptlrpc_req_finished(struct ptlrpc_request *request)
2297 {
2298 	__ptlrpc_req_finished(request, 0);
2299 }
2300 EXPORT_SYMBOL(ptlrpc_req_finished);
2301 
2302 /**
2303  * Returns xid of a \a request
2304  */
2305 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2306 {
2307 	return request->rq_xid;
2308 }
2309 EXPORT_SYMBOL(ptlrpc_req_xid);
2310 
2311 /**
2312  * Disengage the client's reply buffer from the network
2313  * NB does _NOT_ unregister any client-side bulk.
2314  * IDEMPOTENT, but _not_ safe against concurrent callers.
2315  * The request owner (i.e. the thread doing the I/O) must call...
2316  * Returns 1 once the reply buffer is unlinked, or 0 if an async unlink is still in progress.
2317  */
2318 int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2319 {
2320 	int rc;
2321 	wait_queue_head_t *wq;
2322 	struct l_wait_info lwi;
2323 
2324 	/* Might sleep. */
2325 	LASSERT(!in_interrupt());
2326 
2327 	/* Let's set up a deadline for reply unlink. */
2328 	if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2329 	    async && request->rq_reply_deadline == 0)
2330 		request->rq_reply_deadline = ktime_get_real_seconds() + LONG_UNLINK;
2331 
2332 	/* Nothing left to do. */
2333 	if (!ptlrpc_client_recv_or_unlink(request))
2334 		return 1;
2335 
2336 	LNetMDUnlink(request->rq_reply_md_h);
2337 
2338 	/* Let's check it once again. */
2339 	if (!ptlrpc_client_recv_or_unlink(request))
2340 		return 1;
2341 
2342 	/* Move to "Unregistering" phase as reply was not unlinked yet. */
2343 	ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
2344 
2345 	/* Do not wait for unlink to finish. */
2346 	if (async)
2347 		return 0;
2348 
2349 	/*
2350 	 * We have to l_wait_event() whatever the result, to give liblustre
2351 	 * a chance to run reply_in_callback(), and to make sure we've
2352 	 * unlinked before returning a req to the pool.
2353 	 */
2354 	if (request->rq_set != NULL)
2355 		wq = &request->rq_set->set_waitq;
2356 	else
2357 		wq = &request->rq_reply_waitq;
2358 
2359 	for (;;) {
2360 		/*
2361 		 * Network access will complete in finite time but the HUGE
2362 		 * timeout lets us CWARN for visibility of sluggish NALs
2363 		 */
2364 		lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2365 					   cfs_time_seconds(1), NULL, NULL);
2366 		rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2367 				  &lwi);
2368 		if (rc == 0) {
2369 			ptlrpc_rqphase_move(request, request->rq_next_phase);
2370 			return 1;
2371 		}
2372 
2373 		LASSERT(rc == -ETIMEDOUT);
2374 		DEBUG_REQ(D_WARNING, request,
2375 			  "Unexpectedly long timeout rvcng=%d unlnk=%d/%d",
2376 			  request->rq_receiving_reply,
2377 			  request->rq_req_unlink, request->rq_reply_unlink);
2378 	}
2379 	return 0;
2380 }
2381 EXPORT_SYMBOL(ptlrpc_unregister_reply);
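
/*
 * Usage sketch for the two unregister modes above (illustrative only):
 *
 *	Asynchronous, e.g. from the ptlrpc_check_set() loop - start the unlink
 *	and poll again later instead of blocking:
 *
 *		if (!ptlrpc_unregister_reply(req, 1))
 *			continue;	(unlink still in flight, retry later)
 *
 *	Synchronous, e.g. before handing a request back to the pool - block
 *	until LNet confirms the reply buffer is unlinked:
 *
 *		ptlrpc_unregister_reply(req, 0);
 */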
2382 
2383 static void ptlrpc_free_request(struct ptlrpc_request *req)
2384 {
2385 	spin_lock(&req->rq_lock);
2386 	req->rq_replay = 0;
2387 	spin_unlock(&req->rq_lock);
2388 
2389 	if (req->rq_commit_cb != NULL)
2390 		req->rq_commit_cb(req);
2391 	list_del_init(&req->rq_replay_list);
2392 
2393 	__ptlrpc_req_finished(req, 1);
2394 }
2395 
2396 /**
2397  * the request is committed and dropped from the replay list of its import
2398  */
2399 void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2400 {
2401 	struct obd_import	*imp = req->rq_import;
2402 
2403 	spin_lock(&imp->imp_lock);
2404 	if (list_empty(&req->rq_replay_list)) {
2405 		spin_unlock(&imp->imp_lock);
2406 		return;
2407 	}
2408 
2409 	if (force || req->rq_transno <= imp->imp_peer_committed_transno)
2410 		ptlrpc_free_request(req);
2411 
2412 	spin_unlock(&imp->imp_lock);
2413 }
2414 EXPORT_SYMBOL(ptlrpc_request_committed);
2415 
2416 /**
2417  * Iterates through the replay_list on the import and prunes
2418  * all requests that have a transno smaller than last_committed for the
2419  * import and do not have rq_replay set.
2420  * Since requests are sorted in transno order, it stops at the first
2421  * transno bigger than last_committed.
2422  * The caller must hold imp->imp_lock.
2423  */
2424 void ptlrpc_free_committed(struct obd_import *imp)
2425 {
2426 	struct ptlrpc_request *req, *saved;
2427 	struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
2428 	bool skip_committed_list = true;
2429 
2430 	LASSERT(imp != NULL);
2431 	assert_spin_locked(&imp->imp_lock);
2432 
2433 	if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2434 	    imp->imp_generation == imp->imp_last_generation_checked) {
2435 		CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2436 		       imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2437 		return;
2438 	}
2439 	CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2440 	       imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2441 	       imp->imp_generation);
2442 
2443 	if (imp->imp_generation != imp->imp_last_generation_checked)
2444 		skip_committed_list = false;
2445 
2446 	imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2447 	imp->imp_last_generation_checked = imp->imp_generation;
2448 
2449 	list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2450 				 rq_replay_list) {
2451 		/* XXX ok to remove when 1357 resolved - rread 05/29/03  */
2452 		LASSERT(req != last_req);
2453 		last_req = req;
2454 
2455 		if (req->rq_transno == 0) {
2456 			DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2457 			LBUG();
2458 		}
2459 		if (req->rq_import_generation < imp->imp_generation) {
2460 			DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2461 			goto free_req;
2462 		}
2463 
2464 		/* not yet committed */
2465 		if (req->rq_transno > imp->imp_peer_committed_transno) {
2466 			DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2467 			break;
2468 		}
2469 
2470 		if (req->rq_replay) {
2471 			DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2472 			list_move_tail(&req->rq_replay_list,
2473 				       &imp->imp_committed_list);
2474 			continue;
2475 		}
2476 
2477 		DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2478 			  imp->imp_peer_committed_transno);
2479 free_req:
2480 		ptlrpc_free_request(req);
2481 	}
2482 	if (skip_committed_list)
2483 		return;
2484 
2485 	list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2486 				 rq_replay_list) {
2487 		LASSERT(req->rq_transno != 0);
2488 		if (req->rq_import_generation < imp->imp_generation) {
2489 			DEBUG_REQ(D_RPCTRACE, req, "free stale open request");
2490 			ptlrpc_free_request(req);
2491 		}
2492 	}
2493 }
2494 
2495 /**
2496  * Schedule previously sent request for resend.
2497  * Schedule a previously sent request for resend.
2498  * For bulk requests we assign a new xid (to avoid problems with
2499  * lost replies and therefore several transfers landing into the same
2500  * buffer from different sending attempts).
2501 void ptlrpc_resend_req(struct ptlrpc_request *req)
2502 {
2503 	DEBUG_REQ(D_HA, req, "going to resend");
2504 	spin_lock(&req->rq_lock);
2505 
2506 	/*
2507 	 * The request got a reply but is still linked to the import list.
2508 	 * Let ptlrpc_check_set() process it.
2509 	 */
2510 	if (ptlrpc_client_replied(req)) {
2511 		spin_unlock(&req->rq_lock);
2512 		DEBUG_REQ(D_HA, req, "it has reply, so skip it");
2513 		return;
2514 	}
2515 
2516 	lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
2517 	req->rq_status = -EAGAIN;
2518 
2519 	req->rq_resend = 1;
2520 	req->rq_net_err = 0;
2521 	req->rq_timedout = 0;
2522 	if (req->rq_bulk) {
2523 		__u64 old_xid = req->rq_xid;
2524 
2525 		/* ensure previous bulk fails */
2526 		req->rq_xid = ptlrpc_next_xid();
2527 		CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
2528 		       old_xid, req->rq_xid);
2529 	}
2530 	ptlrpc_client_wake_req(req);
2531 	spin_unlock(&req->rq_lock);
2532 }
2533 EXPORT_SYMBOL(ptlrpc_resend_req);
2534 
2535 /**
2536  * Grab an additional reference on request \a req.
2537  */
2538 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2539 {
2540 	atomic_inc(&req->rq_refcount);
2541 	return req;
2542 }
2543 EXPORT_SYMBOL(ptlrpc_request_addref);
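
/*
 * Reference-counting sketch (illustrative only): every extra reference taken
 * with ptlrpc_request_addref() must be balanced by a ptlrpc_req_finished();
 * the request is freed when the last reference is dropped:
 *
 *	ptlrpc_request_addref(req);	(e.g. before handing req to ptlrpcd)
 *	...
 *	ptlrpc_req_finished(req);	(releases that extra reference)
 */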
2544 
2545 /**
2546  * Add a request to import replay_list.
2547  * Must be called under imp_lock
2548  */
2549 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2550 				      struct obd_import *imp)
2551 {
2552 	struct list_head *tmp;
2553 
2554 	assert_spin_locked(&imp->imp_lock);
2555 
2556 	if (req->rq_transno == 0) {
2557 		DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
2558 		LBUG();
2559 	}
2560 
2561 	/*
2562 	 * clear this for new requests that were resent as well
2563 	 * as resent replayed requests.
2564 	 */
2565 	lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2566 
2567 	/* don't re-add requests that have been replayed */
2568 	if (!list_empty(&req->rq_replay_list))
2569 		return;
2570 
2571 	lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2572 
2573 	LASSERT(imp->imp_replayable);
2574 	/* Balanced in ptlrpc_free_committed, usually. */
2575 	ptlrpc_request_addref(req);
2576 	list_for_each_prev(tmp, &imp->imp_replay_list) {
2577 		struct ptlrpc_request *iter =
2578 			list_entry(tmp, struct ptlrpc_request,
2579 				       rq_replay_list);
2580 
2581 		/*
2582 		 * We may have duplicate transnos if we create and then
2583 		 * open a file, or for closes retained to match creating
2584 		 * opens, so use req->rq_xid as a secondary key.
2585 		 * (See bugs 684, 685, and 428.)
2586 		 * XXX no longer needed, but all opens need transnos!
2587 		 */
2588 		if (iter->rq_transno > req->rq_transno)
2589 			continue;
2590 
2591 		if (iter->rq_transno == req->rq_transno) {
2592 			LASSERT(iter->rq_xid != req->rq_xid);
2593 			if (iter->rq_xid > req->rq_xid)
2594 				continue;
2595 		}
2596 
2597 		list_add(&req->rq_replay_list, &iter->rq_replay_list);
2598 		return;
2599 	}
2600 
2601 	list_add(&req->rq_replay_list, &imp->imp_replay_list);
2602 }
2603 EXPORT_SYMBOL(ptlrpc_retain_replayable_request);
2604 
2605 /**
2606  * Send request and wait until it completes.
2607  * Returns request processing status.
2608  */
2609 int ptlrpc_queue_wait(struct ptlrpc_request *req)
2610 {
2611 	struct ptlrpc_request_set *set;
2612 	int rc;
2613 
2614 	LASSERT(req->rq_set == NULL);
2615 	LASSERT(!req->rq_receiving_reply);
2616 
2617 	set = ptlrpc_prep_set();
2618 	if (set == NULL) {
2619 		CERROR("Unable to allocate ptlrpc set\n");
2620 		return -ENOMEM;
2621 	}
2622 
2623 	/* for distributed debugging */
2624 	lustre_msg_set_status(req->rq_reqmsg, current_pid());
2625 
2626 	/* add a ref for the set (see comment in ptlrpc_set_add_req) */
2627 	ptlrpc_request_addref(req);
2628 	ptlrpc_set_add_req(set, req);
2629 	rc = ptlrpc_set_wait(set);
2630 	ptlrpc_set_destroy(set);
2631 
2632 	return rc;
2633 }
2634 EXPORT_SYMBOL(ptlrpc_queue_wait);
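
/*
 * Minimal synchronous RPC sketch using ptlrpc_queue_wait() (illustrative
 * only; my_build_request() is a hypothetical helper that allocates and packs
 * a request against import imp):
 *
 *	struct ptlrpc_request *req;
 *	int rc;
 *
 *	req = my_build_request(imp);
 *	if (req == NULL)
 *		return -ENOMEM;
 *	rc = ptlrpc_queue_wait(req);	(send and block until completion)
 *	ptlrpc_req_finished(req);	(drop our reference)
 *	return rc;
 */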
2635 
2636 struct ptlrpc_replay_async_args {
2637 	int praa_old_state;
2638 	int praa_old_status;
2639 };
2640 
2641 /**
2642  * Callback used for processing replies to replayed requests.
2643  * In case of a successful reply, calls the registered request replay callback.
2644  * In case of error, restarts the replay process.
2645  */
2646 static int ptlrpc_replay_interpret(const struct lu_env *env,
2647 				   struct ptlrpc_request *req,
2648 				   void *data, int rc)
2649 {
2650 	struct ptlrpc_replay_async_args *aa = data;
2651 	struct obd_import *imp = req->rq_import;
2652 
2653 	atomic_dec(&imp->imp_replay_inflight);
2654 
2655 	if (!ptlrpc_client_replied(req)) {
2656 		CERROR("request replay timed out, restarting recovery\n");
2657 		rc = -ETIMEDOUT;
2658 		goto out;
2659 	}
2660 
2661 	if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2662 	    (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2663 	     lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) {
2664 		rc = lustre_msg_get_status(req->rq_repmsg);
2665 		goto out;
2666 	}
2667 
2668 	/** VBR: check version failure */
2669 	if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
2670 		/** replay failed due to a version mismatch */
2671 		DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
2672 		spin_lock(&imp->imp_lock);
2673 		imp->imp_vbr_failed = 1;
2674 		imp->imp_no_lock_replay = 1;
2675 		spin_unlock(&imp->imp_lock);
2676 		lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2677 	} else {
2678 		/** The transno had better not change over replay. */
2679 		LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
2680 			 lustre_msg_get_transno(req->rq_repmsg) ||
2681 			 lustre_msg_get_transno(req->rq_repmsg) == 0,
2682 			 "%#llx/%#llx\n",
2683 			 lustre_msg_get_transno(req->rq_reqmsg),
2684 			 lustre_msg_get_transno(req->rq_repmsg));
2685 	}
2686 
2687 	spin_lock(&imp->imp_lock);
2688 	/** if replaying by version, a gap occurred on the server, so do not trust locks */
2689 	if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
2690 		imp->imp_no_lock_replay = 1;
2691 	imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
2692 	spin_unlock(&imp->imp_lock);
2693 	LASSERT(imp->imp_last_replay_transno);
2694 
2695 	/* transaction number shouldn't be bigger than the latest replayed */
2696 	if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
2697 		DEBUG_REQ(D_ERROR, req,
2698 			  "Reported transno %llu is bigger than the replayed one: %llu",
2699 			  req->rq_transno,
2700 			  lustre_msg_get_transno(req->rq_reqmsg));
2701 		rc = -EINVAL;
2702 		goto out;
2703 	}
2704 
2705 	DEBUG_REQ(D_HA, req, "got rep");
2706 
2707 	/* let the callback do fixups, possibly including in the request */
2708 	if (req->rq_replay_cb)
2709 		req->rq_replay_cb(req);
2710 
2711 	if (ptlrpc_client_replied(req) &&
2712 	    lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2713 		DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2714 			  lustre_msg_get_status(req->rq_repmsg),
2715 			  aa->praa_old_status);
2716 	} else {
2717 		/* Put it back for re-replay. */
2718 		lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2719 	}
2720 
2721 	/*
2722 	 * Errors during replay can set transno to 0, but
2723 	 * imp_last_replay_transno should not be set to 0 anyway.
2724 	 */
2725 	if (req->rq_transno == 0)
2726 		CERROR("Transno is 0 during replay!\n");
2727 
2728 	/* continue with recovery */
2729 	rc = ptlrpc_import_recovery_state_machine(imp);
2730  out:
2731 	req->rq_send_state = aa->praa_old_state;
2732 
2733 	if (rc != 0)
2734 		/* this replay failed, so restart recovery */
2735 		ptlrpc_connect_import(imp);
2736 
2737 	return rc;
2738 }
2739 
2740 /**
2741  * Prepares and queues request for replay.
2742  * Adds it to ptlrpcd queue for actual sending.
2743  * Returns 0 on success.
2744  */
2745 int ptlrpc_replay_req(struct ptlrpc_request *req)
2746 {
2747 	struct ptlrpc_replay_async_args *aa;
2748 
2749 	LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2750 
2751 	LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2752 	aa = ptlrpc_req_async_args(req);
2753 	memset(aa, 0, sizeof(*aa));
2754 
2755 	/* Prepare request to be resent with ptlrpcd */
2756 	aa->praa_old_state = req->rq_send_state;
2757 	req->rq_send_state = LUSTRE_IMP_REPLAY;
2758 	req->rq_phase = RQ_PHASE_NEW;
2759 	req->rq_next_phase = RQ_PHASE_UNDEFINED;
2760 	if (req->rq_repmsg)
2761 		aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2762 	req->rq_status = 0;
2763 	req->rq_interpret_reply = ptlrpc_replay_interpret;
2764 	/* Readjust the timeout for current conditions */
2765 	ptlrpc_at_set_req_timeout(req);
2766 
2767 	/*
2768 	 * Tell server the net_latency, so the server can calculate how long
2769 	 * it should wait for next replay
2770 	 */
2771 	lustre_msg_set_service_time(req->rq_reqmsg,
2772 				    ptlrpc_at_get_net_latency(req));
2773 	DEBUG_REQ(D_HA, req, "REPLAY");
2774 
2775 	atomic_inc(&req->rq_import->imp_replay_inflight);
2776 	ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2777 
2778 	ptlrpcd_add_req(req);
2779 	return 0;
2780 }
2781 EXPORT_SYMBOL(ptlrpc_replay_req);
2782 
2783 /**
2784  * Aborts all in-flight requests on the sending and delayed lists of import \a imp.
2785  */
2786 void ptlrpc_abort_inflight(struct obd_import *imp)
2787 {
2788 	struct list_head *tmp, *n;
2789 
2790 	/*
2791 	 * Make sure that no new requests get processed for this import.
2792 	 * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2793 	 * this flag and then putting requests on sending_list or delayed_list.
2794 	 */
2795 	spin_lock(&imp->imp_lock);
2796 
2797 	/*
2798 	 * XXX locking?  Maybe we should remove each request with the list
2799 	 * locked?  Also, how do we know if the requests on the list are
2800 	 * being freed at this time?
2801 	 */
2802 	list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2803 		struct ptlrpc_request *req =
2804 			list_entry(tmp, struct ptlrpc_request, rq_list);
2805 
2806 		DEBUG_REQ(D_RPCTRACE, req, "inflight");
2807 
2808 		spin_lock(&req->rq_lock);
2809 		if (req->rq_import_generation < imp->imp_generation) {
2810 			req->rq_err = 1;
2811 			req->rq_status = -EIO;
2812 			ptlrpc_client_wake_req(req);
2813 		}
2814 		spin_unlock(&req->rq_lock);
2815 	}
2816 
2817 	list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2818 		struct ptlrpc_request *req =
2819 			list_entry(tmp, struct ptlrpc_request, rq_list);
2820 
2821 		DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2822 
2823 		spin_lock(&req->rq_lock);
2824 		if (req->rq_import_generation < imp->imp_generation) {
2825 			req->rq_err = 1;
2826 			req->rq_status = -EIO;
2827 			ptlrpc_client_wake_req(req);
2828 		}
2829 		spin_unlock(&req->rq_lock);
2830 	}
2831 
2832 	/*
2833 	 * Last chance to free reqs left on the replay list, but we
2834 	 * will still leak reqs that haven't committed.
2835 	 */
2836 	if (imp->imp_replayable)
2837 		ptlrpc_free_committed(imp);
2838 
2839 	spin_unlock(&imp->imp_lock);
2840 }
2841 EXPORT_SYMBOL(ptlrpc_abort_inflight);
2842 
2843 /**
2844  * Abort all uncompleted requests in request set \a set
2845  */
2846 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
2847 {
2848 	struct list_head *tmp, *pos;
2849 
2850 	LASSERT(set != NULL);
2851 
2852 	list_for_each_safe(pos, tmp, &set->set_requests) {
2853 		struct ptlrpc_request *req =
2854 			list_entry(pos, struct ptlrpc_request,
2855 				       rq_set_chain);
2856 
2857 		spin_lock(&req->rq_lock);
2858 		if (req->rq_phase != RQ_PHASE_RPC) {
2859 			spin_unlock(&req->rq_lock);
2860 			continue;
2861 		}
2862 
2863 		req->rq_err = 1;
2864 		req->rq_status = -EINTR;
2865 		ptlrpc_client_wake_req(req);
2866 		spin_unlock(&req->rq_lock);
2867 	}
2868 }
2869 
2870 static __u64 ptlrpc_last_xid;
2871 static spinlock_t ptlrpc_last_xid_lock;
2872 
2873 /**
2874  * Initialize the XID for the node.  This is common among all requests on
2875  * this node, and only requires the property that it is monotonically
2876  * increasing.  It does not need to be sequential.  Since this is also used
2877  * as the RDMA match bits, it is important that a single client NOT have
2878  * the same match bits for two different in-flight requests, hence we do
2879  * NOT want to have an XID per target or similar.
2880  *
2881  * To avoid an unlikely collision between match bits after a client reboot
2882  * (which would deliver old data into the wrong RDMA buffer) initialize
2883  * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
2884  * If the time is clearly incorrect, we instead use a 62-bit random number.
2885  * In the worst case the random number will overflow 1M RPCs per second in
2886  * 9133 years, or permutations thereof.
2887  */
2888 #define YEAR_2004 (1ULL << 30)
2889 void ptlrpc_init_xid(void)
2890 {
2891 	time64_t now = ktime_get_real_seconds();
2892 
2893 	spin_lock_init(&ptlrpc_last_xid_lock);
2894 	if (now < YEAR_2004) {
2895 		cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
2896 		ptlrpc_last_xid >>= 2;
2897 		ptlrpc_last_xid |= (1ULL << 61);
2898 	} else {
2899 		ptlrpc_last_xid = (__u64)now << 20;
2900 	}
2901 
2902 	/* Always need to be aligned to a power-of-two for multi-bulk BRW */
2903 	CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0);
2904 	ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
2905 }
2906 
2907 /**
2908  * Increase xid and returns resulting new value to the caller.
2909  *
2910  * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
2911  * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
2912  * itself uses the last bulk xid needed, so the server can determine
2913  * the number of bulk transfers from the RPC XID and a bitmask.  The starting
2914  * xid must align to a power-of-two value.
2915  *
2916  * This is assumed to be true due to the initial ptlrpc_last_xid
2917  * value also being initialized to a power-of-two value. LU-1431
2918  */
2919 __u64 ptlrpc_next_xid(void)
2920 {
2921 	__u64 next;
2922 
2923 	spin_lock(&ptlrpc_last_xid_lock);
2924 	next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2925 	ptlrpc_last_xid = next;
2926 	spin_unlock(&ptlrpc_last_xid_lock);
2927 
2928 	return next;
2929 }
2930 EXPORT_SYMBOL(ptlrpc_next_xid);
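
/*
 * Illustration of the multi-bulk XID layout described above (sketch only;
 * it assumes PTLRPC_BULK_OPS_MASK clears the low bits so that values stay
 * aligned to a PTLRPC_BULK_OPS_COUNT boundary, as arranged in
 * ptlrpc_init_xid()):
 *
 *	__u64 xid = ptlrpc_next_xid();
 *
 *	Successive calls differ by PTLRPC_BULK_OPS_COUNT, so bulk transfer i
 *	of an RPC may use match bits xid + i, for i in
 *	[0, PTLRPC_BULK_OPS_COUNT - 1], without colliding with other RPCs.
 */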
2931 
2932 /**
2933  * Get a glimpse at what next xid value might have been.
2934  * Returns possible next xid.
2935  */
2936 __u64 ptlrpc_sample_next_xid(void)
2937 {
2938 #if BITS_PER_LONG == 32
2939 	/* need to avoid possible word tearing on 32-bit systems */
2940 	__u64 next;
2941 
2942 	spin_lock(&ptlrpc_last_xid_lock);
2943 	next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2944 	spin_unlock(&ptlrpc_last_xid_lock);
2945 
2946 	return next;
2947 #else
2948 	/* No need to lock, since the returned value is racy anyway */
2949 	return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
2950 #endif
2951 }
2952 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2953 
2954 /**
2955  * Functions for operating ptlrpc workers.
2956  *
2957  * A ptlrpc work is a function which runs inside ptlrpc context.
2958  * The callback must not sleep, otherwise it will block the ptlrpcd thread.
2959  *
2960  * 1. after a work is created, it can be used many times, that is:
2961  *	 handler = ptlrpcd_alloc_work();
2962  *	 ptlrpcd_queue_work();
2963  *
2964  *    queue it again when necessary:
2965  *	 ptlrpcd_queue_work();
2966  *	 ptlrpcd_destroy_work();
2967  * 2. ptlrpcd_queue_work() can be called by multiple processes concurrently,
2968  *    but the work is only ever queued once at a time. Also, as its name
2969  *    implies, there may be a delay before it actually runs in a ptlrpcd thread.
2970  */
2971 struct ptlrpc_work_async_args {
2972 	int (*cb)(const struct lu_env *, void *);
2973 	void *cbdata;
2974 };
2975 
2976 static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
2977 {
2978 	/* re-initialize the req */
2979 	req->rq_timeout		= obd_timeout;
2980 	req->rq_sent		= ktime_get_real_seconds();
2981 	req->rq_deadline	= req->rq_sent + req->rq_timeout;
2982 	req->rq_reply_deadline	= req->rq_deadline;
2983 	req->rq_phase		= RQ_PHASE_INTERPRET;
2984 	req->rq_next_phase	= RQ_PHASE_COMPLETE;
2985 	req->rq_xid		= ptlrpc_next_xid();
2986 	req->rq_import_generation = req->rq_import->imp_generation;
2987 
2988 	ptlrpcd_add_req(req);
2989 }
2990 
2991 static int work_interpreter(const struct lu_env *env,
2992 			    struct ptlrpc_request *req, void *data, int rc)
2993 {
2994 	struct ptlrpc_work_async_args *arg = data;
2995 
2996 	LASSERT(ptlrpcd_check_work(req));
2997 	LASSERT(arg->cb != NULL);
2998 
2999 	rc = arg->cb(env, arg->cbdata);
3000 
3001 	list_del_init(&req->rq_set_chain);
3002 	req->rq_set = NULL;
3003 
3004 	if (atomic_dec_return(&req->rq_refcount) > 1) {
3005 		atomic_set(&req->rq_refcount, 2);
3006 		ptlrpcd_add_work_req(req);
3007 	}
3008 	return rc;
3009 }
3010 
3011 static int worker_format;
3012 
3013 static int ptlrpcd_check_work(struct ptlrpc_request *req)
3014 {
3015 	return req->rq_pill.rc_fmt == (void *)&worker_format;
3016 }
3017 
3018 /**
3019  * Create a work for ptlrpc.
3020  */
3021 void *ptlrpcd_alloc_work(struct obd_import *imp,
3022 			 int (*cb)(const struct lu_env *, void *), void *cbdata)
3023 {
3024 	struct ptlrpc_request	 *req = NULL;
3025 	struct ptlrpc_work_async_args *args;
3026 
3027 	might_sleep();
3028 
3029 	if (cb == NULL)
3030 		return ERR_PTR(-EINVAL);
3031 
3032 	/* copy some code from deprecated fakereq. */
3033 	req = ptlrpc_request_cache_alloc(GFP_NOFS);
3034 	if (req == NULL) {
3035 		CERROR("ptlrpc: out of memory!\n");
3036 		return ERR_PTR(-ENOMEM);
3037 	}
3038 
3039 	req->rq_send_state = LUSTRE_IMP_FULL;
3040 	req->rq_type = PTL_RPC_MSG_REQUEST;
3041 	req->rq_import = class_import_get(imp);
3042 	req->rq_export = NULL;
3043 	req->rq_interpret_reply = work_interpreter;
3044 	/* don't want reply */
3045 	req->rq_receiving_reply = 0;
3046 	req->rq_req_unlink = req->rq_reply_unlink = 0;
3047 	req->rq_no_delay = req->rq_no_resend = 1;
3048 	req->rq_pill.rc_fmt = (void *)&worker_format;
3049 
3050 	spin_lock_init(&req->rq_lock);
3051 	INIT_LIST_HEAD(&req->rq_list);
3052 	INIT_LIST_HEAD(&req->rq_replay_list);
3053 	INIT_LIST_HEAD(&req->rq_set_chain);
3054 	INIT_LIST_HEAD(&req->rq_history_list);
3055 	INIT_LIST_HEAD(&req->rq_exp_list);
3056 	init_waitqueue_head(&req->rq_reply_waitq);
3057 	init_waitqueue_head(&req->rq_set_waitq);
3058 	atomic_set(&req->rq_refcount, 1);
3059 
3060 	CLASSERT(sizeof(*args) <= sizeof(req->rq_async_args));
3061 	args = ptlrpc_req_async_args(req);
3062 	args->cb = cb;
3063 	args->cbdata = cbdata;
3064 
3065 	return req;
3066 }
3067 EXPORT_SYMBOL(ptlrpcd_alloc_work);
3068 
3069 void ptlrpcd_destroy_work(void *handler)
3070 {
3071 	struct ptlrpc_request *req = handler;
3072 
3073 	if (req)
3074 		ptlrpc_req_finished(req);
3075 }
3076 EXPORT_SYMBOL(ptlrpcd_destroy_work);
3077 
3078 int ptlrpcd_queue_work(void *handler)
3079 {
3080 	struct ptlrpc_request *req = handler;
3081 
3082 	/*
3083 	 * Check if the req is already being queued.
3084 	 *
3085 	 * Here comes a trick: ptlrpc lacks a reliable way of checking whether
3086 	 * a req is being processed, so the req's refcount is used for this
3087 	 * purpose. This is okay because the caller should treat this req as
3088 	 * opaque data. - Jinshan
3089 	 */
3090 	LASSERT(atomic_read(&req->rq_refcount) > 0);
3091 	if (atomic_inc_return(&req->rq_refcount) == 2)
3092 		ptlrpcd_add_work_req(req);
3093 	return 0;
3094 }
3095 EXPORT_SYMBOL(ptlrpcd_queue_work);
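
/*
 * End-to-end sketch of the ptlrpc work API above (illustrative only;
 * my_flush_cb() and its cbdata are hypothetical):
 *
 *	static int my_flush_cb(const struct lu_env *env, void *cbdata)
 *	{
 *		(must not sleep: runs in ptlrpcd context)
 *		return 0;
 *	}
 *
 *	handler = ptlrpcd_alloc_work(imp, my_flush_cb, cbdata);
 *	if (IS_ERR(handler))
 *		return PTR_ERR(handler);
 *	ptlrpcd_queue_work(handler);	(queue as often as needed; the work is
 *					 only ever queued once at a time)
 *	...
 *	ptlrpcd_destroy_work(handler);	(drops the final reference)
 */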
3096