LU-12559 ptlrpc: Hold imp lock for idle reconnect
lustre/ptlrpc/client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 /** Implementation of client-side PortalRPC interfaces */
34
35 #define DEBUG_SUBSYSTEM S_RPC
36
37 #include <linux/delay.h>
38 #include <linux/random.h>
39
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <lustre_lib.h>
43 #include <lustre_ha.h>
44 #include <lustre_import.h>
45 #include <lustre_req_layout.h>
46
47 #include "ptlrpc_internal.h"
48
49 static void ptlrpc_prep_bulk_page_pin(struct ptlrpc_bulk_desc *desc,
50                                       struct page *page, int pageoffset,
51                                       int len)
52 {
53         __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 1);
54 }
55
56 static void ptlrpc_prep_bulk_page_nopin(struct ptlrpc_bulk_desc *desc,
57                                         struct page *page, int pageoffset,
58                                         int len)
59 {
60         __ptlrpc_prep_bulk_page(desc, page, pageoffset, len, 0);
61 }
62
63 static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
64 {
65         int i;
66
67                 for (i = 0; i < desc->bd_iov_count; i++)
68                 put_page(BD_GET_KIOV(desc, i).kiov_page);
69 }
70
71 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
72         .add_kiov_frag  = ptlrpc_prep_bulk_page_pin,
73         .release_frags  = ptlrpc_release_bulk_page_pin,
74 };
75 EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
76
77 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
78         .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
79         .release_frags  = ptlrpc_release_bulk_noop,
80 };
81 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
82
83 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = {
84         .add_iov_frag = ptlrpc_prep_bulk_frag,
85 };
86 EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
87
88 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
89 static int ptlrpcd_check_work(struct ptlrpc_request *req);
90 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
91
92 /**
93  * Initialize the passed-in client structure \a cl.
94  */
95 void ptlrpc_init_client(int req_portal, int rep_portal, const char *name,
96                         struct ptlrpc_client *cl)
97 {
98         cl->cli_request_portal = req_portal;
99         cl->cli_reply_portal   = rep_portal;
100         cl->cli_name           = name;
101 }
102 EXPORT_SYMBOL(ptlrpc_init_client);
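
/*
 * Usage sketch (illustrative only; the portal numbers, the client name and
 * the obd_ldlm_client field follow the pattern used by client_obd_setup()
 * elsewhere in the tree and are not defined in this file):
 *
 *         ptlrpc_init_client(OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
 *                            "ost_client", &obd->obd_ldlm_client);
 */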
103
104 /**
105  * Return PortalRPC connection for remote uuid \a uuid
106  */
107 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid,
108                                                     lnet_nid_t nid4refnet)
109 {
110         struct ptlrpc_connection *c;
111         lnet_nid_t self;
112         struct lnet_process_id peer;
113         int err;
114
115         /*
116          * ptlrpc_uuid_to_peer() initializes its 2nd parameter
117          * before accessing its values.
118          */
119         /* coverity[uninit_use_in_call] */
120         peer.nid = nid4refnet;
121         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
122         if (err != 0) {
123                 CNETERR("cannot find peer %s!\n", uuid->uuid);
124                 return NULL;
125         }
126
127         c = ptlrpc_connection_get(peer, self, uuid);
128         if (c) {
129                 memcpy(c->c_remote_uuid.uuid,
130                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
131         }
132
133         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
134
135         return c;
136 }
137
138 /**
139  * Allocate and initialize new bulk descriptor on the sender.
140  * Returns pointer to the descriptor or NULL on error.
141  */
142 struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned int nfrags,
143                                          unsigned int max_brw,
144                                          enum ptlrpc_bulk_op_type type,
145                                          unsigned int portal,
146                                          const struct ptlrpc_bulk_frag_ops *ops)
147 {
148         struct ptlrpc_bulk_desc *desc;
149         int i;
150
151         /* ensure that only one of KIOV or IOVEC is set but not both */
152         LASSERT((ptlrpc_is_bulk_desc_kiov(type) &&
153                  ops->add_kiov_frag != NULL) ||
154                 (ptlrpc_is_bulk_desc_kvec(type) &&
155                  ops->add_iov_frag != NULL));
156
157         OBD_ALLOC_PTR(desc);
158         if (!desc)
159                 return NULL;
160         if (type & PTLRPC_BULK_BUF_KIOV) {
161                 OBD_ALLOC_LARGE(GET_KIOV(desc),
162                                 nfrags * sizeof(*GET_KIOV(desc)));
163                 if (!GET_KIOV(desc))
164                         goto out;
165         } else {
166                 OBD_ALLOC_LARGE(GET_KVEC(desc),
167                                 nfrags * sizeof(*GET_KVEC(desc)));
168                 if (!GET_KVEC(desc))
169                         goto out;
170         }
171
172         spin_lock_init(&desc->bd_lock);
173         init_waitqueue_head(&desc->bd_waitq);
174         desc->bd_max_iov = nfrags;
175         desc->bd_iov_count = 0;
176         desc->bd_portal = portal;
177         desc->bd_type = type;
178         desc->bd_md_count = 0;
179         desc->bd_frag_ops = ops;
180         LASSERT(max_brw > 0);
181         desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
182         /*
183          * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
184          * node. Negotiated ocd_brw_size will always be <= this number.
185          */
186         for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
187                 LNetInvalidateMDHandle(&desc->bd_mds[i]);
188
189         return desc;
190 out:
191         OBD_FREE_PTR(desc);
192         return NULL;
193 }
194
195 /**
196  * Prepare bulk descriptor for specified outgoing request \a req that
197  * can fit \a nfrags * pages. \a type is the bulk type. \a portal is where
198  * the bulk is to be sent. Used on the client side.
199  * Returns a pointer to the newly allocated and initialized bulk descriptor,
200  * or NULL on error.
201  */
202 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
203                                               unsigned int nfrags,
204                                               unsigned int max_brw,
205                                               unsigned int type,
206                                               unsigned int portal,
207                                               const struct ptlrpc_bulk_frag_ops
208                                                 *ops)
209 {
210         struct obd_import *imp = req->rq_import;
211         struct ptlrpc_bulk_desc *desc;
212
213         ENTRY;
214         LASSERT(ptlrpc_is_bulk_op_passive(type));
215
216         desc = ptlrpc_new_bulk(nfrags, max_brw, type, portal, ops);
217         if (!desc)
218                 RETURN(NULL);
219
220         desc->bd_import = class_import_get(imp);
221         desc->bd_req = req;
222
223         desc->bd_cbid.cbid_fn  = client_bulk_callback;
224         desc->bd_cbid.cbid_arg = desc;
225
226         /* This makes req own desc; desc will be freed when req itself is freed */
227         req->rq_bulk = desc;
228
229         return desc;
230 }
231 EXPORT_SYMBOL(ptlrpc_prep_bulk_imp);
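
/*
 * Rough client-side usage sketch for a bulk read, modelled on the OSC I/O
 * path (page_count, pages[] and the particular flag/portal choices are
 * illustrative assumptions, not code from this file):
 *
 *         desc = ptlrpc_prep_bulk_imp(req, page_count, 1,
 *                                     PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
 *                                     OST_BULK_PORTAL,
 *                                     &ptlrpc_bulk_kiov_pin_ops);
 *         if (desc == NULL)
 *                 return -ENOMEM;
 *         for (i = 0; i < page_count; i++)
 *                 desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0, PAGE_SIZE);
 */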
232
233 void __ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
234                              struct page *page, int pageoffset, int len,
235                              int pin)
236 {
237         lnet_kiov_t *kiov;
238
239         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
240         LASSERT(page != NULL);
241         LASSERT(pageoffset >= 0);
242         LASSERT(len > 0);
243         LASSERT(pageoffset + len <= PAGE_SIZE);
244         LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type));
245
246         kiov = &BD_GET_KIOV(desc, desc->bd_iov_count);
247
248         desc->bd_nob += len;
249
250         if (pin)
251                 get_page(page);
252
253         kiov->kiov_page = page;
254         kiov->kiov_offset = pageoffset;
255         kiov->kiov_len = len;
256
257         desc->bd_iov_count++;
258 }
259 EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
260
261 int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
262                           void *frag, int len)
263 {
264         struct kvec *iovec;
265
266         ENTRY;
267
268         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
269         LASSERT(frag != NULL);
270         LASSERT(len > 0);
271         LASSERT(ptlrpc_is_bulk_desc_kvec(desc->bd_type));
272
273         iovec = &BD_GET_KVEC(desc, desc->bd_iov_count);
274
275         desc->bd_nob += len;
276
277         iovec->iov_base = frag;
278         iovec->iov_len = len;
279
280         desc->bd_iov_count++;
281
282         RETURN(desc->bd_nob);
283 }
284 EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
285
286 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
287 {
288         ENTRY;
289
290         LASSERT(desc != NULL);
291         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
292         LASSERT(desc->bd_md_count == 0);         /* network hands off */
293         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
294         LASSERT(desc->bd_frag_ops != NULL);
295
296         if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
297                 sptlrpc_enc_pool_put_pages(desc);
298
299         if (desc->bd_export)
300                 class_export_put(desc->bd_export);
301         else
302                 class_import_put(desc->bd_import);
303
304         if (desc->bd_frag_ops->release_frags != NULL)
305                 desc->bd_frag_ops->release_frags(desc);
306
307         if (ptlrpc_is_bulk_desc_kiov(desc->bd_type))
308                 OBD_FREE_LARGE(GET_KIOV(desc),
309                                desc->bd_max_iov * sizeof(*GET_KIOV(desc)));
310         else
311                 OBD_FREE_LARGE(GET_KVEC(desc),
312                                desc->bd_max_iov * sizeof(*GET_KVEC(desc)));
313         OBD_FREE_PTR(desc);
314         EXIT;
315 }
316 EXPORT_SYMBOL(ptlrpc_free_bulk);
317
318 /**
319  * Set the server time limit for this req, i.e. how long we are willing to
320  * wait for a reply before timing out the request.
321  */
322 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
323 {
324         __u32 serv_est;
325         int idx;
326         struct imp_at *at;
327
328         LASSERT(req->rq_import);
329
330         if (AT_OFF) {
331                 /* non-AT settings */
332                 /**
333                  * \a imp_server_timeout means this is a reverse import on
334                  * which we send (currently only) ASTs to the client, so we
335                  * cannot afford to wait too long for the reply; otherwise the
336                  * other client (on whose behalf we are sending this request)
337                  * would time out waiting for us
338                  */
339                 req->rq_timeout = req->rq_import->imp_server_timeout ?
340                                   obd_timeout / 2 : obd_timeout;
341         } else {
342                 at = &req->rq_import->imp_at;
343                 idx = import_at_get_index(req->rq_import,
344                                           req->rq_request_portal);
345                 serv_est = at_get(&at->iat_service_estimate[idx]);
346                 req->rq_timeout = at_est2timeout(serv_est);
347         }
348         /*
349          * We could get even fancier here, using history to predict increased
350          * loading...
351          */
352
353         /*
354          * Let the server know what this RPC timeout is by putting it in the
355          * reqmsg
356          */
357         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
358 }
359 EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
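
/*
 * Worked example (approximate; see at_est2timeout() for the exact rounding):
 * with adaptive timeouts enabled and a tracked service estimate of 20s for
 * this portal, rq_timeout comes out to roughly 20 * 1.25 + 5 = 30s.  With AT
 * off and obd_timeout = 100s, a regular import gets rq_timeout = 100s while
 * a reverse import gets 50s.
 */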
360
361 /* Adjust max service estimate based on server value */
362 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
363                                   unsigned int serv_est)
364 {
365         int idx;
366         unsigned int oldse;
367         struct imp_at *at;
368
369         LASSERT(req->rq_import);
370         at = &req->rq_import->imp_at;
371
372         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
373         /*
374          * max service estimates are tracked on the server side,
375          * so just keep minimal history here
376          */
377         oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
378         if (oldse != 0)
379                 CDEBUG(D_ADAPTTO,
380                        "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
381                        req->rq_import->imp_obd->obd_name,
382                        req->rq_request_portal,
383                        oldse, at_get(&at->iat_service_estimate[idx]));
384 }
385
386 /* Expected network latency per remote node (secs) */
387 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
388 {
389         return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
390 }
391
392 /* Adjust expected network latency */
393 void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
394                                unsigned int service_time)
395 {
396         unsigned int nl, oldnl;
397         struct imp_at *at;
398         time64_t now = ktime_get_real_seconds();
399
400         LASSERT(req->rq_import);
401
402         if (service_time > now - req->rq_sent + 3) {
403                 /*
404                  * b=16408. However, this can also happen if an early reply
405                  * is lost and the client RPC expires and is resent: the early
406                  * reply, or the reply to the original RPC, can still fit in
407                  * the reply buffer of the resent RPC. The client then measures
408                  * time from the resend time, while the server reports the
409                  * service time of the original RPC.
410                  */
411                 CDEBUG((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ?
412                        D_ADAPTTO : D_WARNING,
413                        "Reported service time %u > total measured time %lld\n",
414                        service_time, now - req->rq_sent);
415                 return;
416         }
417
418         /* Network latency is total time less server processing time */
419         nl = max_t(int, now - req->rq_sent -
420                         service_time, 0) + 1; /* st rounding */
421         at = &req->rq_import->imp_at;
422
423         oldnl = at_measured(&at->iat_net_latency, nl);
424         if (oldnl != 0)
425                 CDEBUG(D_ADAPTTO,
426                        "The network latency for %s (nid %s) has changed from %d to %d\n",
427                        req->rq_import->imp_obd->obd_name,
428                        obd_uuid2str(&req->rq_import->imp_connection->c_remote_uuid),
429                        oldnl, at_get(&at->iat_net_latency));
430 }
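
/*
 * Worked example: if a request sent at t = 100s gets its reply at t = 107s
 * and the server reports 5s of service time, the latency sample is
 * max(107 - 100 - 5, 0) + 1 = 3s, which at_measured() then folds into
 * iat_net_latency.
 */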
431
432 static int unpack_reply(struct ptlrpc_request *req)
433 {
434         int rc;
435
436         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) {
437                 rc = ptlrpc_unpack_rep_msg(req, req->rq_replen);
438                 if (rc) {
439                         DEBUG_REQ(D_ERROR, req, "unpack_rep failed: rc = %d",
440                                   rc);
441                         return -EPROTO;
442                 }
443         }
444
445         rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
446         if (rc) {
447                 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: rc = %d",
448                           rc);
449                 return -EPROTO;
450         }
451         return 0;
452 }
453
454 /**
455  * Handle an early reply message, called with the rq_lock held.
456  * If anything goes wrong just ignore it - same as if it never happened
457  */
458 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
459 __must_hold(&req->rq_lock)
460 {
461         struct ptlrpc_request *early_req;
462         time64_t olddl;
463         int rc;
464
465         ENTRY;
466         req->rq_early = 0;
467         spin_unlock(&req->rq_lock);
468
469         rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
470         if (rc) {
471                 spin_lock(&req->rq_lock);
472                 RETURN(rc);
473         }
474
475         rc = unpack_reply(early_req);
476         if (rc != 0) {
477                 sptlrpc_cli_finish_early_reply(early_req);
478                 spin_lock(&req->rq_lock);
479                 RETURN(rc);
480         }
481
482         /*
483          * Use the new timeout value only to adjust the local value for this
484          * request; don't include it in at_history. It is not yet clear why
485          * the service time increased and whether it should be counted or
486          * skipped (e.g. it could be a recovery case or an error on the
487          * server); the real reply will add all new data if it is worth adding.
488          */
489         req->rq_timeout = lustre_msg_get_timeout(early_req->rq_repmsg);
490         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
491
492         /* Network latency can be adjusted, it is pure network delays */
493         ptlrpc_at_adj_net_latency(req,
494                                   lustre_msg_get_service_time(early_req->rq_repmsg));
495
496         sptlrpc_cli_finish_early_reply(early_req);
497
498         spin_lock(&req->rq_lock);
499         olddl = req->rq_deadline;
500         /*
501          * The server assumes it now has rq_timeout from when the request
502          * arrived, so the client should give it at least that long.
503          * Since we don't know the arrival time, we'll use the original
504          * sent time.
505          */
506         req->rq_deadline = req->rq_sent + req->rq_timeout +
507                            ptlrpc_at_get_net_latency(req);
508
509         /* The below message is checked in replay-single.sh test_65{a,b} */
510         /* The below message is checked in sanity-{gss,krb5} test_8 */
511         DEBUG_REQ(D_ADAPTTO, req,
512                   "Early reply #%d, new deadline in %llds (%llds)",
513                   req->rq_early_count,
514                   req->rq_deadline - ktime_get_real_seconds(),
515                   req->rq_deadline - olddl);
516
517         RETURN(rc);
518 }
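
/*
 * Worked example: a request sent at t = 1000s with rq_timeout = 30s and a
 * 2s network latency estimate originally had rq_deadline = 1032s.  If an
 * early reply raises rq_timeout to 45s, the code above moves the deadline
 * to 1000 + 45 + 2 = 1047s, so the client keeps waiting instead of
 * resending.
 */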
519
520 static struct kmem_cache *request_cache;
521
522 int ptlrpc_request_cache_init(void)
523 {
524         request_cache = kmem_cache_create("ptlrpc_cache",
525                                           sizeof(struct ptlrpc_request),
526                                           0, SLAB_HWCACHE_ALIGN, NULL);
527         return request_cache ? 0 : -ENOMEM;
528 }
529
530 void ptlrpc_request_cache_fini(void)
531 {
532         kmem_cache_destroy(request_cache);
533 }
534
535 struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
536 {
537         struct ptlrpc_request *req;
538
539         OBD_SLAB_ALLOC_PTR_GFP(req, request_cache, flags);
540         return req;
541 }
542
543 void ptlrpc_request_cache_free(struct ptlrpc_request *req)
544 {
545         OBD_SLAB_FREE_PTR(req, request_cache);
546 }
547
548 /**
549  * Wind down request pool \a pool.
550  * Frees all requests from the pool too
551  */
552 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
553 {
554         struct list_head *l, *tmp;
555         struct ptlrpc_request *req;
556
557         LASSERT(pool != NULL);
558
559         spin_lock(&pool->prp_lock);
560         list_for_each_safe(l, tmp, &pool->prp_req_list) {
561                 req = list_entry(l, struct ptlrpc_request, rq_list);
562                 list_del(&req->rq_list);
563                 LASSERT(req->rq_reqbuf);
564                 LASSERT(req->rq_reqbuf_len == pool->prp_rq_size);
565                 OBD_FREE_LARGE(req->rq_reqbuf, pool->prp_rq_size);
566                 ptlrpc_request_cache_free(req);
567         }
568         spin_unlock(&pool->prp_lock);
569         OBD_FREE(pool, sizeof(*pool));
570 }
571 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
572
573 /**
574  * Allocates, initializes and adds \a num_rq requests to the pool \a pool
575  */
576 int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
577 {
578         int i;
579         int size = 1;
580
581         while (size < pool->prp_rq_size)
582                 size <<= 1;
583
584         LASSERTF(list_empty(&pool->prp_req_list) ||
585                  size == pool->prp_rq_size,
586                  "Trying to change pool size with nonempty pool from %d to %d bytes\n",
587                  pool->prp_rq_size, size);
588
589         pool->prp_rq_size = size;
590         for (i = 0; i < num_rq; i++) {
591                 struct ptlrpc_request *req;
592                 struct lustre_msg *msg;
593
594                 req = ptlrpc_request_cache_alloc(GFP_NOFS);
595                 if (!req)
596                         return i;
597                 OBD_ALLOC_LARGE(msg, size);
598                 if (!msg) {
599                         ptlrpc_request_cache_free(req);
600                         return i;
601                 }
602                 req->rq_reqbuf = msg;
603                 req->rq_reqbuf_len = size;
604                 req->rq_pool = pool;
605                 spin_lock(&pool->prp_lock);
606                 list_add_tail(&req->rq_list, &pool->prp_req_list);
607                 spin_unlock(&pool->prp_lock);
608         }
609         return num_rq;
610 }
611 EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
612
613 /**
614  * Create and initialize new request pool with given attributes:
615  * \a num_rq - initial number of requests to create for the pool
616  * \a msgsize - maximum message size possible for requests in this pool
617  * \a populate_pool - function to be called when more requests need to be added
618  *                    to the pool
619  * Returns pointer to newly created pool or NULL on error.
620  */
621 struct ptlrpc_request_pool *
622 ptlrpc_init_rq_pool(int num_rq, int msgsize,
623                     int (*populate_pool)(struct ptlrpc_request_pool *, int))
624 {
625         struct ptlrpc_request_pool *pool;
626
627         OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
628         if (!pool)
629                 return NULL;
630
631         /*
632          * Request the next power of two for the allocation, because the
633          * kernel would do exactly this internally
634          */
635         spin_lock_init(&pool->prp_lock);
636         INIT_LIST_HEAD(&pool->prp_req_list);
637         pool->prp_rq_size = msgsize + SPTLRPC_MAX_PAYLOAD;
638         pool->prp_populate = populate_pool;
639
640         populate_pool(pool, num_rq);
641
642         return pool;
643 }
644 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
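
/*
 * Usage sketch (sizes and the imp_rq_pool field are taken from the way the
 * OSC writeout path pre-allocates a pool, shown here only as an
 * illustration):
 *
 *         imp->imp_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
 *                                                ptlrpc_add_rqs_to_pool);
 *         if (imp->imp_rq_pool == NULL)
 *                 rc = -ENOMEM;
 */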
645
646 /**
647  * Fetches one request from pool \a pool
648  */
649 static struct ptlrpc_request *
650 ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
651 {
652         struct ptlrpc_request *request;
653         struct lustre_msg *reqbuf;
654
655         if (!pool)
656                 return NULL;
657
658         spin_lock(&pool->prp_lock);
659
660         /*
661          * See if we have anything in the pool and bail out if it is empty.
662          * In the writeout path, where this matters, this is safe to do
663          * because nothing is lost in that case: when some in-flight requests
664          * complete, this code will be called again.
665          */
666         if (unlikely(list_empty(&pool->prp_req_list))) {
667                 spin_unlock(&pool->prp_lock);
668                 return NULL;
669         }
670
671         request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
672                              rq_list);
673         list_del_init(&request->rq_list);
674         spin_unlock(&pool->prp_lock);
675
676         LASSERT(request->rq_reqbuf);
677         LASSERT(request->rq_pool);
678
679         reqbuf = request->rq_reqbuf;
680         memset(request, 0, sizeof(*request));
681         request->rq_reqbuf = reqbuf;
682         request->rq_reqbuf_len = pool->prp_rq_size;
683         request->rq_pool = pool;
684
685         return request;
686 }
687
688 /**
689  * Returns the freed \a request to the pool.
690  */
691 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
692 {
693         struct ptlrpc_request_pool *pool = request->rq_pool;
694
695         spin_lock(&pool->prp_lock);
696         LASSERT(list_empty(&request->rq_list));
697         LASSERT(!request->rq_receiving_reply);
698         list_add_tail(&request->rq_list, &pool->prp_req_list);
699         spin_unlock(&pool->prp_lock);
700 }
701
702 void ptlrpc_add_unreplied(struct ptlrpc_request *req)
703 {
704         struct obd_import *imp = req->rq_import;
705         struct list_head *tmp;
706         struct ptlrpc_request *iter;
707
708         assert_spin_locked(&imp->imp_lock);
709         LASSERT(list_empty(&req->rq_unreplied_list));
710
711         /* unreplied list is sorted by xid in ascending order */
712         list_for_each_prev(tmp, &imp->imp_unreplied_list) {
713                 iter = list_entry(tmp, struct ptlrpc_request,
714                                   rq_unreplied_list);
715
716                 LASSERT(req->rq_xid != iter->rq_xid);
717                 if (req->rq_xid < iter->rq_xid)
718                         continue;
719                 list_add(&req->rq_unreplied_list, &iter->rq_unreplied_list);
720                 return;
721         }
722         list_add(&req->rq_unreplied_list, &imp->imp_unreplied_list);
723 }
724
725 void ptlrpc_assign_next_xid_nolock(struct ptlrpc_request *req)
726 {
727         req->rq_xid = ptlrpc_next_xid();
728         ptlrpc_add_unreplied(req);
729 }
730
731 static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
732 {
733         spin_lock(&req->rq_import->imp_lock);
734         ptlrpc_assign_next_xid_nolock(req);
735         spin_unlock(&req->rq_import->imp_lock);
736 }
737
738 static atomic64_t ptlrpc_last_xid;
739
740 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
741                              __u32 version, int opcode, char **bufs,
742                              struct ptlrpc_cli_ctx *ctx)
743 {
744         int count;
745         struct obd_import *imp;
746         __u32 *lengths;
747         int rc;
748
749         ENTRY;
750
751         count = req_capsule_filled_sizes(&request->rq_pill, RCL_CLIENT);
752         imp = request->rq_import;
753         lengths = request->rq_pill.rc_area[RCL_CLIENT];
754
755         if (ctx) {
756                 request->rq_cli_ctx = sptlrpc_cli_ctx_get(ctx);
757         } else {
758                 rc = sptlrpc_req_get_ctx(request);
759                 if (rc)
760                         GOTO(out_free, rc);
761         }
762         sptlrpc_req_set_flavor(request, opcode);
763
764         rc = lustre_pack_request(request, imp->imp_msg_magic, count,
765                                  lengths, bufs);
766         if (rc)
767                 GOTO(out_ctx, rc);
768
769         lustre_msg_add_version(request->rq_reqmsg, version);
770         request->rq_send_state = LUSTRE_IMP_FULL;
771         request->rq_type = PTL_RPC_MSG_REQUEST;
772
773         request->rq_req_cbid.cbid_fn  = request_out_callback;
774         request->rq_req_cbid.cbid_arg = request;
775
776         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
777         request->rq_reply_cbid.cbid_arg = request;
778
779         request->rq_reply_deadline = 0;
780         request->rq_bulk_deadline = 0;
781         request->rq_req_deadline = 0;
782         request->rq_phase = RQ_PHASE_NEW;
783         request->rq_next_phase = RQ_PHASE_UNDEFINED;
784
785         request->rq_request_portal = imp->imp_client->cli_request_portal;
786         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
787
788         ptlrpc_at_set_req_timeout(request);
789
790         lustre_msg_set_opc(request->rq_reqmsg, opcode);
791
792         /* Let's setup deadline for req/reply/bulk unlink for opcode. */
793         if (cfs_fail_val == opcode) {
794                 time64_t *fail_t = NULL, *fail2_t = NULL;
795
796                 if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
797                         fail_t = &request->rq_bulk_deadline;
798                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
799                         fail_t = &request->rq_reply_deadline;
800                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK)) {
801                         fail_t = &request->rq_req_deadline;
802                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BOTH_UNLINK)) {
803                         fail_t = &request->rq_reply_deadline;
804                         fail2_t = &request->rq_bulk_deadline;
805                 } else if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_ROUND_XID)) {
806                         time64_t now = ktime_get_real_seconds();
807                         u64 xid = ((u64)now >> 4) << 24;
808
809                         atomic64_set(&ptlrpc_last_xid, xid);
810                 }
811
812                 if (fail_t) {
813                         *fail_t = ktime_get_real_seconds() + LONG_UNLINK;
814
815                         if (fail2_t)
816                                 *fail2_t = ktime_get_real_seconds() +
817                                            LONG_UNLINK;
818
819                         /*
820                          * The RPC is infected; give the test time to
821                          * change the fail_loc
822                          */
823                         msleep(4 * MSEC_PER_SEC);
824                 }
825         }
826         ptlrpc_assign_next_xid(request);
827
828         RETURN(0);
829
830 out_ctx:
831         LASSERT(!request->rq_pool);
832         sptlrpc_cli_ctx_put(request->rq_cli_ctx, 1);
833 out_free:
834         class_import_put(imp);
835
836         return rc;
837 }
838 EXPORT_SYMBOL(ptlrpc_request_bufs_pack);
839
840 /**
841  * Pack request buffers for network transfer, performing any necessary
842  * encryption steps.
843  */
844 int ptlrpc_request_pack(struct ptlrpc_request *request,
845                         __u32 version, int opcode)
846 {
847         return ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
848 }
849 EXPORT_SYMBOL(ptlrpc_request_pack);
850
851 /**
852  * Helper function to allocate a new request on import \a imp,
853  * possibly using an existing request from pool \a pool if provided.
854  * Returns allocated request structure with import field filled or
855  * NULL on error.
856  */
857 static inline
858 struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
859                                               struct ptlrpc_request_pool *pool)
860 {
861         struct ptlrpc_request *request = NULL;
862
863         request = ptlrpc_request_cache_alloc(GFP_NOFS);
864
865         if (!request && pool)
866                 request = ptlrpc_prep_req_from_pool(pool);
867
868         if (request) {
869                 ptlrpc_cli_req_init(request);
870
871                 LASSERTF((unsigned long)imp > 0x1000, "%p\n", imp);
872                 LASSERT(imp != LP_POISON);
873                 LASSERTF((unsigned long)imp->imp_client > 0x1000, "%p\n",
874                          imp->imp_client);
875                 LASSERT(imp->imp_client != LP_POISON);
876
877                 request->rq_import = class_import_get(imp);
878         } else {
879                 CERROR("request allocation out of memory\n");
880         }
881
882         return request;
883 }
884
885 /**
886  * Helper function for creating a request.
887  * Calls __ptlrpc_request_alloc to allocate a new request structure and inits
888  * buffer structures according to capsule template \a format.
889  * Returns allocated request structure pointer or NULL on error.
890  */
891 static struct ptlrpc_request *
892 ptlrpc_request_alloc_internal(struct obd_import *imp,
893                               struct ptlrpc_request_pool *pool,
894                               const struct req_format *format)
895 {
896         struct ptlrpc_request *request;
897
898         request = __ptlrpc_request_alloc(imp, pool);
899         if (!request)
900                 return NULL;
901
902         /*
903          * initiate connection if needed when the import has been
904          * referenced by the new request to avoid races with disconnect
905          */
906         if (unlikely(imp->imp_state == LUSTRE_IMP_IDLE)) {
907                 int rc;
908
909                 CDEBUG_LIMIT(imp->imp_idle_debug,
910                              "%s: reconnect after %llds idle\n",
911                              imp->imp_obd->obd_name, ktime_get_real_seconds() -
912                                                      imp->imp_last_reply_time);
913                 spin_lock(&imp->imp_lock);
914                 if (imp->imp_state == LUSTRE_IMP_IDLE) {
915                         imp->imp_generation++;
916                         imp->imp_initiated_at = imp->imp_generation;
917                         imp->imp_state = LUSTRE_IMP_NEW;
918
919                         /* connect_import_locked releases imp_lock */
920                         rc = ptlrpc_connect_import_locked(imp);
921                         if (rc < 0) {
922                                 ptlrpc_request_free(request);
923                                 return NULL;
924                         }
925                         ptlrpc_pinger_add_import(imp);
926                 } else {
927                         spin_unlock(&imp->imp_lock);
928                 }
929         }
930
931         req_capsule_init(&request->rq_pill, request, RCL_CLIENT);
932         req_capsule_set(&request->rq_pill, format);
933         return request;
934 }
935
936 /**
937  * Allocate new request structure for import \a imp and initialize its
938  * buffer structure according to capsule template \a format.
939  */
940 struct ptlrpc_request *ptlrpc_request_alloc(struct obd_import *imp,
941                                             const struct req_format *format)
942 {
943         return ptlrpc_request_alloc_internal(imp, NULL, format);
944 }
945 EXPORT_SYMBOL(ptlrpc_request_alloc);
946
947 /**
948  * Allocate new request structure for import \a imp from pool \a pool and
949  * initialize its buffer structure according to capsule template \a format.
950  */
951 struct ptlrpc_request *
952 ptlrpc_request_alloc_pool(struct obd_import *imp,
953                           struct ptlrpc_request_pool *pool,
954                           const struct req_format *format)
955 {
956         return ptlrpc_request_alloc_internal(imp, pool, format);
957 }
958 EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
959
960 /**
961  * For requests not from pool, free memory of the request structure.
962  * For requests obtained from a pool earlier, return request back to pool.
963  */
964 void ptlrpc_request_free(struct ptlrpc_request *request)
965 {
966         if (request->rq_pool)
967                 __ptlrpc_free_req_to_pool(request);
968         else
969                 ptlrpc_request_cache_free(request);
970 }
971 EXPORT_SYMBOL(ptlrpc_request_free);
972
973 /**
974  * Allocate a new request for operation \a opcode and immediately pack it for
975  * network transfer.
976  * Only used for simple requests like OBD_PING where the only important
977  * part of the request is operation itself.
978  * Returns allocated request or NULL on error.
979  */
980 struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
981                                                  const struct req_format *format,
982                                                  __u32 version, int opcode)
983 {
984         struct ptlrpc_request *req = ptlrpc_request_alloc(imp, format);
985         int rc;
986
987         if (req) {
988                 rc = ptlrpc_request_pack(req, version, opcode);
989                 if (rc) {
990                         ptlrpc_request_free(req);
991                         req = NULL;
992                 }
993         }
994         return req;
995 }
996 EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
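
/*
 * Usage sketch, in the spirit of the ping code (RQF_OBD_PING,
 * LUSTRE_OBD_VERSION and ptlrpc_request_set_replen() are defined elsewhere
 * in the tree):
 *
 *         req = ptlrpc_request_alloc_pack(imp, &RQF_OBD_PING,
 *                                         LUSTRE_OBD_VERSION, OBD_PING);
 *         if (req == NULL)
 *                 return -ENOMEM;
 *         ptlrpc_request_set_replen(req);
 */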
997
998 /**
999  * Allocate and initialize new request set structure on the current CPT.
1000  * Returns a pointer to the newly allocated set structure or NULL on error.
1001  */
1002 struct ptlrpc_request_set *ptlrpc_prep_set(void)
1003 {
1004         struct ptlrpc_request_set *set;
1005         int cpt;
1006
1007         ENTRY;
1008         cpt = cfs_cpt_current(cfs_cpt_table, 0);
1009         OBD_CPT_ALLOC(set, cfs_cpt_table, cpt, sizeof(*set));
1010         if (!set)
1011                 RETURN(NULL);
1012         atomic_set(&set->set_refcount, 1);
1013         INIT_LIST_HEAD(&set->set_requests);
1014         init_waitqueue_head(&set->set_waitq);
1015         atomic_set(&set->set_new_count, 0);
1016         atomic_set(&set->set_remaining, 0);
1017         spin_lock_init(&set->set_new_req_lock);
1018         INIT_LIST_HEAD(&set->set_new_requests);
1019         set->set_max_inflight = UINT_MAX;
1020         set->set_producer     = NULL;
1021         set->set_producer_arg = NULL;
1022         set->set_rc           = 0;
1023
1024         RETURN(set);
1025 }
1026 EXPORT_SYMBOL(ptlrpc_prep_set);
1027
1028 /**
1029  * Allocate and initialize new request set structure with flow control
1030  * extension. This extension makes it possible to control the number of
1031  * in-flight requests for the whole set. A callback function to generate
1032  * requests must be provided, and the request set will cap the number of
1033  * requests sent over the wire at @max_inflight.
1034  * Returns a pointer to the newly allocated set structure or NULL on error.
1035  */
1036 struct ptlrpc_request_set *ptlrpc_prep_fcset(int max, set_producer_func func,
1037                                              void *arg)
1038
1039 {
1040         struct ptlrpc_request_set *set;
1041
1042         set = ptlrpc_prep_set();
1043         if (!set)
1044                 RETURN(NULL);
1045
1046         set->set_max_inflight  = max;
1047         set->set_producer      = func;
1048         set->set_producer_arg  = arg;
1049
1050         RETURN(set);
1051 }
1052
1053 /**
1054  * Wind down and free request set structure previously allocated with
1055  * ptlrpc_prep_set.
1056  * Ensures that all requests on the set have completed and removes
1057  * all requests from the request list in a set.
1058  * If any unsent requests happen to be on the list, pretends that they got
1059  * an error in flight and calls their completion handler.
1060  */
1061 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
1062 {
1063         struct list_head *tmp;
1064         struct list_head *next;
1065         int expected_phase;
1066         int n = 0;
1067
1068         ENTRY;
1069
1070         /* Requests on the set should either all be completed, or all be new */
1071         expected_phase = (atomic_read(&set->set_remaining) == 0) ?
1072                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
1073         list_for_each(tmp, &set->set_requests) {
1074                 struct ptlrpc_request *req =
1075                         list_entry(tmp, struct ptlrpc_request,
1076                                    rq_set_chain);
1077
1078                 LASSERT(req->rq_phase == expected_phase);
1079                 n++;
1080         }
1081
1082         LASSERTF(atomic_read(&set->set_remaining) == 0 ||
1083                  atomic_read(&set->set_remaining) == n, "%d / %d\n",
1084                  atomic_read(&set->set_remaining), n);
1085
1086         list_for_each_safe(tmp, next, &set->set_requests) {
1087                 struct ptlrpc_request *req =
1088                         list_entry(tmp, struct ptlrpc_request,
1089                                    rq_set_chain);
1090                 list_del_init(&req->rq_set_chain);
1091
1092                 LASSERT(req->rq_phase == expected_phase);
1093
1094                 if (req->rq_phase == RQ_PHASE_NEW) {
1095                         ptlrpc_req_interpret(NULL, req, -EBADR);
1096                         atomic_dec(&set->set_remaining);
1097                 }
1098
1099                 spin_lock(&req->rq_lock);
1100                 req->rq_set = NULL;
1101                 req->rq_invalid_rqset = 0;
1102                 spin_unlock(&req->rq_lock);
1103
1104                 ptlrpc_req_finished(req);
1105         }
1106
1107         LASSERT(atomic_read(&set->set_remaining) == 0);
1108
1109         ptlrpc_reqset_put(set);
1110         EXIT;
1111 }
1112 EXPORT_SYMBOL(ptlrpc_set_destroy);
1113
1114 /**
1115  * Add a new request to the general purpose request set.
1116  * Assumes request reference from the caller.
1117  */
1118 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
1119                         struct ptlrpc_request *req)
1120 {
1121         LASSERT(req->rq_import->imp_state != LUSTRE_IMP_IDLE);
1122         LASSERT(list_empty(&req->rq_set_chain));
1123
1124         if (req->rq_allow_intr)
1125                 set->set_allow_intr = 1;
1126
1127         /* The set takes over the caller's request reference */
1128         list_add_tail(&req->rq_set_chain, &set->set_requests);
1129         req->rq_set = set;
1130         atomic_inc(&set->set_remaining);
1131         req->rq_queued_time = ktime_get_seconds();
1132
1133         if (req->rq_reqmsg)
1134                 lustre_msg_set_jobid(req->rq_reqmsg, NULL);
1135
1136         if (set->set_producer)
1137                 /*
1138                  * If the request set has a producer callback, the RPC must be
1139                  * sent straight away
1140                  */
1141                 ptlrpc_send_new_req(req);
1142 }
1143 EXPORT_SYMBOL(ptlrpc_set_add_req);
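
/*
 * Typical synchronous use of a request set (error handling elided; the exact
 * ptlrpc_set_wait() signature varies between releases, and ptlrpc_queue_wait()
 * is the canonical single-request version):
 *
 *         set = ptlrpc_prep_set();
 *         if (set == NULL)
 *                 return -ENOMEM;
 *         ptlrpc_set_add_req(set, req);
 *         rc = ptlrpc_set_wait(set);
 *         ptlrpc_set_destroy(set);
 */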
1144
1145 /**
1146  * Add a request to a request set with a dedicated server thread
1147  * and wake the thread for any necessary processing.
1148  * Currently only used for ptlrpcd.
1149  */
1150 void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
1151                             struct ptlrpc_request *req)
1152 {
1153         struct ptlrpc_request_set *set = pc->pc_set;
1154         int count, i;
1155
1156         LASSERT(req->rq_set == NULL);
1157         LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
1158
1159         spin_lock(&set->set_new_req_lock);
1160         /*
1161          * The set takes over the caller's request reference.
1162          */
1163         req->rq_set = set;
1164         req->rq_queued_time = ktime_get_seconds();
1165         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
1166         count = atomic_inc_return(&set->set_new_count);
1167         spin_unlock(&set->set_new_req_lock);
1168
1169         /* Only need to call wakeup once for the first entry. */
1170         if (count == 1) {
1171                 wake_up(&set->set_waitq);
1172
1173                 /*
1174                  * XXX: It may be unnecessary to wake up all the partners, but
1175                  *      to guarantee the async RPC can be processed ASAP, we
1176                  *      have no better choice. This may be fixed in the future.
1177                  */
1178                 for (i = 0; i < pc->pc_npartners; i++)
1179                         wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
1180         }
1181 }
1182
1183 /**
1184  * Based on the current state of the import, determine if the request
1185  * can be sent, is an error, or should be delayed.
1186  *
1187  * Returns true if this request should be delayed. If false, and
1188  * *status is set, then the request cannot be sent and *status is the
1189  * error code.  If false and *status is 0, then the request can be sent.
1190  *
1191  * The imp->imp_lock must be held.
1192  */
1193 static int ptlrpc_import_delay_req(struct obd_import *imp,
1194                                    struct ptlrpc_request *req, int *status)
1195 {
1196         int delay = 0;
1197
1198         ENTRY;
1199         LASSERT(status);
1200         *status = 0;
1201
1202         if (req->rq_ctx_init || req->rq_ctx_fini) {
1203                 /* always allow ctx init/fini rpc go through */
1204         } else if (imp->imp_state == LUSTRE_IMP_NEW) {
1205                 DEBUG_REQ(D_ERROR, req, "Uninitialized import");
1206                 *status = -EIO;
1207         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
1208                 unsigned int opc = lustre_msg_get_opc(req->rq_reqmsg);
1209
1210                 /*
1211                  * pings or MDS-equivalent STATFS may safely
1212                  * race with umount
1213                  */
1214                 DEBUG_REQ((opc == OBD_PING || opc == OST_STATFS) ?
1215                           D_HA : D_ERROR, req, "IMP_CLOSED");
1216                 *status = -EIO;
1217         } else if (ptlrpc_send_limit_expired(req)) {
1218                 /* probably doesn't need to be a D_ERROR after initial testing */
1219                 DEBUG_REQ(D_HA, req, "send limit expired");
1220                 *status = -ETIMEDOUT;
1221         } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
1222                    imp->imp_state == LUSTRE_IMP_CONNECTING) {
1223                 ;/* allow CONNECT even if import is invalid */
1224                 if (atomic_read(&imp->imp_inval_count) != 0) {
1225                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1226                         *status = -EIO;
1227                 }
1228         } else if (imp->imp_invalid || imp->imp_obd->obd_no_recov) {
1229                 if (!imp->imp_deactive)
1230                         DEBUG_REQ(D_NET, req, "IMP_INVALID");
1231                 *status = -ESHUTDOWN; /* b=12940 */
1232         } else if (req->rq_import_generation != imp->imp_generation) {
1233                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
1234                 *status = -EIO;
1235         } else if (req->rq_send_state != imp->imp_state) {
1236                 /* invalidate in progress - any requests should be dropped */
1237                 if (atomic_read(&imp->imp_inval_count) != 0) {
1238                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
1239                         *status = -EIO;
1240                 } else if (req->rq_no_delay &&
1241                            imp->imp_generation != imp->imp_initiated_at) {
1242                         /* ignore nodelay for requests initiating connections */
1243                         *status = -EWOULDBLOCK;
1244                 } else if (req->rq_allow_replay &&
1245                            (imp->imp_state == LUSTRE_IMP_REPLAY ||
1246                             imp->imp_state == LUSTRE_IMP_REPLAY_LOCKS ||
1247                             imp->imp_state == LUSTRE_IMP_REPLAY_WAIT ||
1248                             imp->imp_state == LUSTRE_IMP_RECOVER)) {
1249                         DEBUG_REQ(D_HA, req, "allow during recovery");
1250                 } else {
1251                         delay = 1;
1252                 }
1253         }
1254
1255         RETURN(delay);
1256 }
1257
1258 /**
1259  * Decide if the error message should be printed to the console or not.
1260  * Makes its decision based on request type, status, and failure frequency.
1261  *
1262  * \param[in] req  request that failed and may need a console message
1263  *
1264  * \retval false if no message should be printed
1265  * \retval true  if console message should be printed
1266  */
1267 static bool ptlrpc_console_allow(struct ptlrpc_request *req, __u32 opc, int err)
1268 {
1269         LASSERT(req->rq_reqmsg != NULL);
1270
1271         /* Suppress particular reconnect errors which are to be expected. */
1272         if (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT) {
1273                 /* Suppress timed out reconnect requests */
1274                 if (lustre_handle_is_used(&req->rq_import->imp_remote_handle) ||
1275                     req->rq_timedout)
1276                         return false;
1277
1278                 /*
1279                  * Suppress most unavailable/again reconnect requests, but
1280                  * print occasionally so it is clear client is trying to
1281                  * connect to a server where no target is running.
1282                  */
1283                 if ((err == -ENODEV || err == -EAGAIN) &&
1284                     req->rq_import->imp_conn_cnt % 30 != 20)
1285                         return false;
1286         }
1287
1288         if (opc == LDLM_ENQUEUE && err == -EAGAIN)
1289                 /* -EAGAIN is normal when using POSIX flocks */
1290                 return false;
1291
1292         if (opc == OBD_PING && (err == -ENODEV || err == -ENOTCONN) &&
1293             (req->rq_xid & 0xf) != 10)
1294                 /* Suppress most ping requests, they may fail occasionally */
1295                 return false;
1296
1297         return true;
1298 }
1299
1300 /**
1301  * Check request processing status.
1302  * Returns the status.
1303  */
1304 static int ptlrpc_check_status(struct ptlrpc_request *req)
1305 {
1306         int rc;
1307
1308         ENTRY;
1309         rc = lustre_msg_get_status(req->rq_repmsg);
1310         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
1311                 struct obd_import *imp = req->rq_import;
1312                 lnet_nid_t nid = imp->imp_connection->c_peer.nid;
1313                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1314
1315                 if (ptlrpc_console_allow(req, opc, rc))
1316                         LCONSOLE_ERROR_MSG(0x11,
1317                                            "%s: operation %s to node %s failed: rc = %d\n",
1318                                            imp->imp_obd->obd_name,
1319                                            ll_opcode2str(opc),
1320                                            libcfs_nid2str(nid), rc);
1321                 RETURN(rc < 0 ? rc : -EINVAL);
1322         }
1323
1324         if (rc)
1325                 DEBUG_REQ(D_INFO, req, "check status: rc = %d", rc);
1326
1327         RETURN(rc);
1328 }
1329
1330 /**
1331  * Save pre-versions of objects into the request for replay.
1332  * Versions are obtained from the server reply.
1333  * Used for VBR (version based recovery).
1334  */
1335 static void ptlrpc_save_versions(struct ptlrpc_request *req)
1336 {
1337         struct lustre_msg *repmsg = req->rq_repmsg;
1338         struct lustre_msg *reqmsg = req->rq_reqmsg;
1339         __u64 *versions = lustre_msg_get_versions(repmsg);
1340
1341         ENTRY;
1342         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
1343                 return;
1344
1345         LASSERT(versions);
1346         lustre_msg_set_versions(reqmsg, versions);
1347         CDEBUG(D_INFO, "Client save versions [%#llx/%#llx]\n",
1348                versions[0], versions[1]);
1349
1350         EXIT;
1351 }
1352
1353 __u64 ptlrpc_known_replied_xid(struct obd_import *imp)
1354 {
1355         struct ptlrpc_request *req;
1356
1357         assert_spin_locked(&imp->imp_lock);
1358         if (list_empty(&imp->imp_unreplied_list))
1359                 return 0;
1360
1361         req = list_entry(imp->imp_unreplied_list.next, struct ptlrpc_request,
1362                          rq_unreplied_list);
1363         LASSERTF(req->rq_xid >= 1, "XID:%llu\n", req->rq_xid);
1364
1365         if (imp->imp_known_replied_xid < req->rq_xid - 1)
1366                 imp->imp_known_replied_xid = req->rq_xid - 1;
1367
1368         return req->rq_xid - 1;
1369 }
1370
1371 /**
1372  * Callback function called when client receives RPC reply for \a req.
1373  * Returns 0 on success or error code.
1374  * The return value will be assigned to req->rq_status by the caller
1375  * as request processing status.
1376  * This function also decides if the request needs to be saved for later replay.
1377  */
1378 static int after_reply(struct ptlrpc_request *req)
1379 {
1380         struct obd_import *imp = req->rq_import;
1381         struct obd_device *obd = req->rq_import->imp_obd;
1382         ktime_t work_start;
1383         u64 committed;
1384         s64 timediff;
1385         int rc;
1386
1387         ENTRY;
1388         LASSERT(obd != NULL);
1389         /* repbuf must be unlinked */
1390         LASSERT(!req->rq_receiving_reply && req->rq_reply_unlinked);
1391
1392         if (req->rq_reply_truncated) {
1393                 if (ptlrpc_no_resend(req)) {
1394                         DEBUG_REQ(D_ERROR, req,
1395                                   "reply buffer overflow, expected=%d, actual size=%d",
1396                                   req->rq_nob_received, req->rq_repbuf_len);
1397                         RETURN(-EOVERFLOW);
1398                 }
1399
1400                 sptlrpc_cli_free_repbuf(req);
1401                 /*
1402                  * Pass the required reply buffer size (include
1403                  * space for early reply).
1404                  * NB: no need to roundup because alloc_repbuf
1405                  * will roundup it
1406                  */
1407                 req->rq_replen = req->rq_nob_received;
1408                 req->rq_nob_received = 0;
1409                 spin_lock(&req->rq_lock);
1410                 req->rq_resend       = 1;
1411                 spin_unlock(&req->rq_lock);
1412                 RETURN(0);
1413         }
1414
1415         work_start = ktime_get_real();
1416         timediff = ktime_us_delta(work_start, req->rq_sent_ns);
1417
1418         /*
1419          * NB Until this point, the whole of the incoming message,
1420          * including buflens, status etc is in the sender's byte order.
1421          */
1422         rc = sptlrpc_cli_unwrap_reply(req);
1423         if (rc) {
1424                 DEBUG_REQ(D_ERROR, req, "unwrap reply failed: rc = %d", rc);
1425                 RETURN(rc);
1426         }
1427
1428         /*
1429          * Security layer unwrap might ask resend this request.
1430          */
1431         if (req->rq_resend)
1432                 RETURN(0);
1433
1434         rc = unpack_reply(req);
1435         if (rc)
1436                 RETURN(rc);
1437
1438         /* retry indefinitely on EINPROGRESS */
1439         if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
1440             ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
1441                 time64_t now = ktime_get_real_seconds();
1442
1443                 DEBUG_REQ((req->rq_nr_resend % 8 == 1 ? D_WARNING : 0) |
1444                           D_RPCTRACE, req, "resending request on EINPROGRESS");
1445                 spin_lock(&req->rq_lock);
1446                 req->rq_resend = 1;
1447                 spin_unlock(&req->rq_lock);
1448                 req->rq_nr_resend++;
1449
1450                 /* Readjust the timeout for current conditions */
1451                 ptlrpc_at_set_req_timeout(req);
1452                 /*
1453                  * delay resend to give a chance to the server to get ready.
1454                  * The delay is increased by 1s on every resend and is capped to
1455                  * the current request timeout (i.e. obd_timeout if AT is off,
1456                  * or AT service time x 125% + 5s, see at_est2timeout)
1457                  */
1458                 if (req->rq_nr_resend > req->rq_timeout)
1459                         req->rq_sent = now + req->rq_timeout;
1460                 else
1461                         req->rq_sent = now + req->rq_nr_resend;
1462
1463                 /* Resend for EINPROGRESS will use a new XID */
1464                 spin_lock(&imp->imp_lock);
1465                 list_del_init(&req->rq_unreplied_list);
1466                 spin_unlock(&imp->imp_lock);
1467
1468                 RETURN(0);
1469         }
1470
1471         if (obd->obd_svc_stats) {
1472                 lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1473                                     timediff);
1474                 ptlrpc_lprocfs_rpc_sent(req, timediff);
1475         }
1476
1477         if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1478             lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1479                 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1480                           lustre_msg_get_type(req->rq_repmsg));
1481                 RETURN(-EPROTO);
1482         }
1483
1484         if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1485                 CFS_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, cfs_fail_val);
1486         ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1487         ptlrpc_at_adj_net_latency(req,
1488                                   lustre_msg_get_service_time(req->rq_repmsg));
1489
1490         rc = ptlrpc_check_status(req);
1491
1492         if (rc) {
1493                 /*
1494                  * Either we've been evicted, or the server has failed for
1495                  * some reason. Try to reconnect, and if that fails, punt to
1496                  * the upcall.
1497                  */
1498                 if (ptlrpc_recoverable_error(rc)) {
1499                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
1500                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1501                                 RETURN(rc);
1502                         }
1503                         ptlrpc_request_handle_notconn(req);
1504                         RETURN(rc);
1505                 }
1506         } else {
1507                 /*
1508                  * Check whether the server sent an SLV (server lock
1509                  * volume). Do this only for RPCs with rc == 0.
1510                  */
1511                 ldlm_cli_update_pool(req);
1512         }
1513
1514         /*
1515          * Store transno in reqmsg for replay.
1516          */
1517         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1518                 req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1519                 lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1520         }
1521
1522         if (imp->imp_replayable) {
1523                 spin_lock(&imp->imp_lock);
1524                 /*
1525                  * No point in adding already-committed requests to the replay
1526                  * list, we will just remove them immediately. b=9829
1527                  */
1528                 if (req->rq_transno != 0 &&
1529                     (req->rq_transno >
1530                      lustre_msg_get_last_committed(req->rq_repmsg) ||
1531                      req->rq_replay)) {
1532                         /** version recovery */
1533                         ptlrpc_save_versions(req);
1534                         ptlrpc_retain_replayable_request(req, imp);
1535                 } else if (req->rq_commit_cb &&
1536                            list_empty(&req->rq_replay_list)) {
1537                         /*
1538                          * NB: don't call rq_commit_cb if it's already on
1539                          * rq_replay_list, ptlrpc_free_committed() will call
1540                          * it later, see LU-3618 for details
1541                          */
1542                         spin_unlock(&imp->imp_lock);
1543                         req->rq_commit_cb(req);
1544                         spin_lock(&imp->imp_lock);
1545                 }
1546
1547                 /*
1548                  * Replay-enabled imports return commit-status information.
1549                  */
1550                 committed = lustre_msg_get_last_committed(req->rq_repmsg);
1551                 if (likely(committed > imp->imp_peer_committed_transno))
1552                         imp->imp_peer_committed_transno = committed;
1553
1554                 ptlrpc_free_committed(imp);
1555
1556                 if (!list_empty(&imp->imp_replay_list)) {
1557                         struct ptlrpc_request *last;
1558
1559                         last = list_entry(imp->imp_replay_list.prev,
1560                                           struct ptlrpc_request,
1561                                           rq_replay_list);
1562                         /*
1563                          * Requests with rq_replay stay on the list even if no
1564                          * commit is expected.
1565                          */
1566                         if (last->rq_transno > imp->imp_peer_committed_transno)
1567                                 ptlrpc_pinger_commit_expected(imp);
1568                 }
1569
1570                 spin_unlock(&imp->imp_lock);
1571         }
1572
1573         RETURN(rc);
1574 }
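/*
 * Illustrative sketch (not compiled): the EINPROGRESS handling above delays
 * each resend by one extra second per attempt, capped at the current request
 * timeout.  The helper below is a hypothetical stand-alone restatement of
 * that arithmetic, not part of the ptlrpc API.
 */
#if 0
static time64_t einprogress_resend_time(time64_t now, time64_t nr_resend,
                                        time64_t rq_timeout)
{
        /* delay grows with nr_resend but never exceeds rq_timeout seconds */
        return now + (nr_resend > rq_timeout ? rq_timeout : nr_resend);
}
#endif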
1575
1576 /**
1577  * Helper function to send request \a req over the network for the first time.
1578  * Also adjusts the request phase.
1579  * Returns 0 on success or an error code.
1580  */
1581 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1582 {
1583         struct obd_import *imp = req->rq_import;
1584         __u64 min_xid = 0;
1585         int rc;
1586
1587         ENTRY;
1588         LASSERT(req->rq_phase == RQ_PHASE_NEW);
1589
1590         /* do not try to go further if there is not enough memory in enc_pool */
1591         if (req->rq_sent && req->rq_bulk)
1592                 if (req->rq_bulk->bd_iov_count > get_free_pages_in_pool() &&
1593                     pool_is_at_full_capacity())
1594                         RETURN(-ENOMEM);
1595
1596         if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
1597             (!req->rq_generation_set ||
1598              req->rq_import_generation == imp->imp_generation))
1599                 RETURN(0);
1600
1601         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1602
1603         spin_lock(&imp->imp_lock);
1604
1605         LASSERT(req->rq_xid != 0);
1606         LASSERT(!list_empty(&req->rq_unreplied_list));
1607
1608         if (!req->rq_generation_set)
1609                 req->rq_import_generation = imp->imp_generation;
1610
1611         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1612                 spin_lock(&req->rq_lock);
1613                 req->rq_waiting = 1;
1614                 spin_unlock(&req->rq_lock);
1615
1616                 DEBUG_REQ(D_HA, req, "req waiting for recovery: (%s != %s)",
1617                           ptlrpc_import_state_name(req->rq_send_state),
1618                           ptlrpc_import_state_name(imp->imp_state));
1619                 LASSERT(list_empty(&req->rq_list));
1620                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1621                 atomic_inc(&req->rq_import->imp_inflight);
1622                 spin_unlock(&imp->imp_lock);
1623                 RETURN(0);
1624         }
1625
1626         if (rc != 0) {
1627                 spin_unlock(&imp->imp_lock);
1628                 req->rq_status = rc;
1629                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1630                 RETURN(rc);
1631         }
1632
1633         LASSERT(list_empty(&req->rq_list));
1634         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1635         atomic_inc(&req->rq_import->imp_inflight);
1636
1637         /*
1638          * Find the known replied XID from the unreplied list. CONNECT
1639          * and DISCONNECT requests are skipped to keep the sanity check
1640          * on the server side happy; see process_req_last_xid().
1641          *
1642          * For CONNECT: because replay requests have lower XIDs, the
1643          * sanity check would break if CONNECT bumped the exp_last_xid
1644          * on the server.
1645          *
1646          * For DISCONNECT: since the client aborts inflight RPCs before
1647          * sending DISCONNECT, DISCONNECT may carry an XID higher than
1648          * those of the inflight RPCs.
1649          */
1650         if (!ptlrpc_req_is_connect(req) && !ptlrpc_req_is_disconnect(req))
1651                 min_xid = ptlrpc_known_replied_xid(imp);
1652         spin_unlock(&imp->imp_lock);
1653
1654         lustre_msg_set_last_xid(req->rq_reqmsg, min_xid);
1655
1656         lustre_msg_set_status(req->rq_reqmsg, current_pid());
1657
1658         rc = sptlrpc_req_refresh_ctx(req, -1);
1659         if (rc) {
1660                 if (req->rq_err) {
1661                         req->rq_status = rc;
1662                         RETURN(1);
1663                 } else {
1664                         spin_lock(&req->rq_lock);
1665                         req->rq_wait_ctx = 1;
1666                         spin_unlock(&req->rq_lock);
1667                         RETURN(0);
1668                 }
1669         }
1670
1671         CDEBUG(D_RPCTRACE,
1672                "Sending RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
1673                req, current_comm(),
1674                imp->imp_obd->obd_uuid.uuid,
1675                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1676                obd_import_nid2str(imp), lustre_msg_get_opc(req->rq_reqmsg),
1677                lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
1678
1679         rc = ptl_send_rpc(req, 0);
1680         if (rc == -ENOMEM) {
1681                 spin_lock(&imp->imp_lock);
1682                 if (!list_empty(&req->rq_list)) {
1683                         list_del_init(&req->rq_list);
1684                         atomic_dec(&req->rq_import->imp_inflight);
1685                 }
1686                 spin_unlock(&imp->imp_lock);
1687                 ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
1688                 RETURN(rc);
1689         }
1690         if (rc) {
1691                 DEBUG_REQ(D_HA, req, "send failed, expect timeout: rc = %d",
1692                           rc);
1693                 spin_lock(&req->rq_lock);
1694                 req->rq_net_err = 1;
1695                 spin_unlock(&req->rq_lock);
1696                 RETURN(rc);
1697         }
1698         RETURN(0);
1699 }
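/*
 * Illustrative sketch (not compiled): a prepared request normally reaches
 * ptlrpc_send_new_req() either through ptlrpc_set_wait() on a caller-owned
 * set, or by being handed to the ptlrpcd daemons, which drive it from
 * ptlrpc_check_set().  The helper name below is hypothetical and assumes the
 * caller already allocated and packed "req" and set rq_interpret_reply.
 */
#if 0
static void example_send_async(struct ptlrpc_request *req)
{
        /* ptlrpcd takes over; ptlrpc_check_set() will eventually call
         * ptlrpc_send_new_req() for this RQ_PHASE_NEW request */
        ptlrpcd_add_req(req);
}
#endif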
1700
1701 static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
1702 {
1703         int remaining, rc;
1704
1705         ENTRY;
1706         LASSERT(set->set_producer != NULL);
1707
1708         remaining = atomic_read(&set->set_remaining);
1709
1710         /*
1711          * populate the ->set_requests list with requests until we
1712          * reach the maximum number of RPCs in flight for this set
1713          */
1714         while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
1715                 rc = set->set_producer(set, set->set_producer_arg);
1716                 if (rc == -ENOENT) {
1717                         /* no more RPC to produce */
1718                         set->set_producer     = NULL;
1719                         set->set_producer_arg = NULL;
1720                         RETURN(0);
1721                 }
1722         }
1723
1724         RETURN((atomic_read(&set->set_remaining) - remaining));
1725 }
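/*
 * Illustrative sketch (not compiled): a set producer is expected to queue one
 * new request per call (typically via ptlrpc_set_add_req()) and to return
 * -ENOENT once it has nothing left to produce, which is how the loop above
 * terminates.  "struct example_state" and example_build_req() are
 * hypothetical placeholders for caller-specific code.
 */
#if 0
static int example_producer(struct ptlrpc_request_set *set, void *arg)
{
        struct example_state *state = arg;      /* hypothetical */
        struct ptlrpc_request *req;

        if (state->nr_left == 0)
                return -ENOENT;                 /* no more RPCs to produce */

        req = example_build_req(state);         /* hypothetical */
        if (IS_ERR(req))
                return PTR_ERR(req);

        ptlrpc_set_add_req(set, req);
        state->nr_left--;

        return 0;
}
#endif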
1726
1727 /**
1728  * This sends any unsent RPCs in \a set and returns 1 if all are sent
1729  * and no more replies are expected.
1730  * (It is possible to get fewer replies than requests sent, e.g. due to
1731  * timed-out requests or requests that we had trouble sending out.)
1732  *
1733  * NOTE: This function contains a potential schedule point (cond_resched()).
1734  */
1735 int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
1736 {
1737         struct list_head *tmp, *next;
1738         struct list_head  comp_reqs;
1739         int force_timer_recalc = 0;
1740
1741         ENTRY;
1742         if (atomic_read(&set->set_remaining) == 0)
1743                 RETURN(1);
1744
1745         INIT_LIST_HEAD(&comp_reqs);
1746         list_for_each_safe(tmp, next, &set->set_requests) {
1747                 struct ptlrpc_request *req =
1748                         list_entry(tmp, struct ptlrpc_request,
1749                                    rq_set_chain);
1750                 struct obd_import *imp = req->rq_import;
1751                 int unregistered = 0;
1752                 int async = 1;
1753                 int rc = 0;
1754
1755                 if (req->rq_phase == RQ_PHASE_COMPLETE) {
1756                         list_move_tail(&req->rq_set_chain, &comp_reqs);
1757                         continue;
1758                 }
1759
1760                 /*
1761                  * This schedule point is mainly for the ptlrpcd caller of this
1762                  * function.  Most ptlrpc sets are neither long-lived nor
1763                  * unbounded in length, but at least the set used by ptlrpcd is.
1764                  * Since the processing time is unbounded, we need to insert an
1765                  * explicit schedule point to make the thread well-behaved.
1766                  */
1767                 cond_resched();
1768
1769                 /*
1770                  * If the caller allows this request to be interrupted and
1771                  * it has actually been interrupted, then move the request
1772                  * to the RQ_PHASE_INTERPRET phase regardless of what the
1773                  * current phase is.
1774                  */
1775                 if (unlikely(req->rq_allow_intr && req->rq_intr)) {
1776                         req->rq_status = -EINTR;
1777                         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1778
1779                         /*
1780                          * Since it is being interpreted and we have to
1781                          * wait for the reply to be unlinked, use sync mode.
1782                          */
1783                         async = 0;
1784
1785                         GOTO(interpret, req->rq_status);
1786                 }
1787
1788                 if (req->rq_phase == RQ_PHASE_NEW && ptlrpc_send_new_req(req))
1789                         force_timer_recalc = 1;
1790
1791                 /* delayed send - skip */
1792                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1793                         continue;
1794
1795                 /* delayed resend - skip */
1796                 if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
1797                     req->rq_sent > ktime_get_real_seconds())
1798                         continue;
1799
1800                 if (!(req->rq_phase == RQ_PHASE_RPC ||
1801                       req->rq_phase == RQ_PHASE_BULK ||
1802                       req->rq_phase == RQ_PHASE_INTERPRET ||
1803                       req->rq_phase == RQ_PHASE_UNREG_RPC ||
1804                       req->rq_phase == RQ_PHASE_UNREG_BULK)) {
1805                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1806                         LBUG();
1807                 }
1808
1809                 if (req->rq_phase == RQ_PHASE_UNREG_RPC ||
1810                     req->rq_phase == RQ_PHASE_UNREG_BULK) {
1811                         LASSERT(req->rq_next_phase != req->rq_phase);
1812                         LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1813
1814                         if (req->rq_req_deadline &&
1815                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REQ_UNLINK))
1816                                 req->rq_req_deadline = 0;
1817                         if (req->rq_reply_deadline &&
1818                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK))
1819                                 req->rq_reply_deadline = 0;
1820                         if (req->rq_bulk_deadline &&
1821                             !OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK))
1822                                 req->rq_bulk_deadline = 0;
1823
1824                         /*
1825                          * Skip processing until reply is unlinked. We
1826                          * can't return to pool before that and we can't
1827                          * call interpret before that. We need to make
1828                          * sure that all RDMA transfers have finished and
1829                          * will not corrupt any data.
1830                          */
1831                         if (req->rq_phase == RQ_PHASE_UNREG_RPC &&
1832                             ptlrpc_client_recv_or_unlink(req))
1833                                 continue;
1834                         if (req->rq_phase == RQ_PHASE_UNREG_BULK &&
1835                             ptlrpc_client_bulk_active(req))
1836                                 continue;
1837
1838                         /*
1839                          * Turn fail_loc off to prevent it from looping
1840                          * forever.
1841                          */
1842                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1843                                 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK,
1844                                                      OBD_FAIL_ONCE);
1845                         }
1846                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1847                                 OBD_FAIL_CHECK_ORSET(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK,
1848                                                      OBD_FAIL_ONCE);
1849                         }
1850
1851                         /*
1852                          * Move to next phase if reply was successfully
1853                          * unlinked.
1854                          */
1855                         ptlrpc_rqphase_move(req, req->rq_next_phase);
1856                 }
1857
1858                 if (req->rq_phase == RQ_PHASE_INTERPRET)
1859                         GOTO(interpret, req->rq_status);
1860
1861                 /*
1862                  * Note that this will also start the async reply unlink.
1863                  */
1864                 if (req->rq_net_err && !req->rq_timedout) {
1865                         ptlrpc_expire_one_request(req, 1);
1866
1867                         /*
1868                          * Check if we still need to wait for unlink.
1869                          */
1870                         if (ptlrpc_client_recv_or_unlink(req) ||
1871                             ptlrpc_client_bulk_active(req))
1872                                 continue;
1873                         /* If there is no need to resend, fail it now. */
1874                         if (req->rq_no_resend) {
1875                                 if (req->rq_status == 0)
1876                                         req->rq_status = -EIO;
1877                                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1878                                 GOTO(interpret, req->rq_status);
1879                         } else {
1880                                 continue;
1881                         }
1882                 }
1883
1884                 if (req->rq_err) {
1885                         spin_lock(&req->rq_lock);
1886                         req->rq_replied = 0;
1887                         spin_unlock(&req->rq_lock);
1888                         if (req->rq_status == 0)
1889                                 req->rq_status = -EIO;
1890                         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1891                         GOTO(interpret, req->rq_status);
1892                 }
1893
1894                 /*
1895                  * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
1896                  * so it sets rq_intr regardless of individual rpc
1897                  * timeouts. The synchronous IO waiting path sets
1898                  * rq_intr irrespective of whether ptlrpcd
1899                  * has seen a timeout.  Our policy is to only interpret
1900                  * interrupted rpcs after they have timed out, so we
1901                  * need to enforce that here.
1902                  */
1903
1904                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting ||
1905                                      req->rq_wait_ctx)) {
1906                         req->rq_status = -EINTR;
1907                         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1908                         GOTO(interpret, req->rq_status);
1909                 }
1910
1911                 if (req->rq_phase == RQ_PHASE_RPC) {
1912                         if (req->rq_timedout || req->rq_resend ||
1913                             req->rq_waiting || req->rq_wait_ctx) {
1914                                 int status;
1915
1916                                 if (!ptlrpc_unregister_reply(req, 1)) {
1917                                         ptlrpc_unregister_bulk(req, 1);
1918                                         continue;
1919                                 }
1920
1921                                 spin_lock(&imp->imp_lock);
1922                                 if (ptlrpc_import_delay_req(imp, req,
1923                                                             &status)) {
1924                                         /*
1925                                          * put on the delayed list while waiting
1926                                          * for recovery to finish, before sending
1927                                          */
1928                                         list_move_tail(&req->rq_list,
1929                                                        &imp->imp_delayed_list);
1930                                         spin_unlock(&imp->imp_lock);
1931                                         continue;
1932                                 }
1933
1934                                 if (status != 0)  {
1935                                         req->rq_status = status;
1936                                         ptlrpc_rqphase_move(req,
1937                                                             RQ_PHASE_INTERPRET);
1938                                         spin_unlock(&imp->imp_lock);
1939                                         GOTO(interpret, req->rq_status);
1940                                 }
1941                                 /* ignore on just initiated connections */
1942                                 if (ptlrpc_no_resend(req) &&
1943                                     !req->rq_wait_ctx &&
1944                                     imp->imp_generation !=
1945                                     imp->imp_initiated_at) {
1946                                         req->rq_status = -ENOTCONN;
1947                                         ptlrpc_rqphase_move(req,
1948                                                             RQ_PHASE_INTERPRET);
1949                                         spin_unlock(&imp->imp_lock);
1950                                         GOTO(interpret, req->rq_status);
1951                                 }
1952
1953                                 list_move_tail(&req->rq_list,
1954                                                &imp->imp_sending_list);
1955
1956                                 spin_unlock(&imp->imp_lock);
1957
1958                                 spin_lock(&req->rq_lock);
1959                                 req->rq_waiting = 0;
1960                                 spin_unlock(&req->rq_lock);
1961
1962                                 if (req->rq_timedout || req->rq_resend) {
1963                                         /*
1964                                          * This is being re-sent anyway,
1965                                          * so mark the req for resend.
1966                                          */
1967                                         spin_lock(&req->rq_lock);
1968                                         req->rq_resend = 1;
1969                                         spin_unlock(&req->rq_lock);
1970                                 }
1971                                 /*
1972                                  * rq_wait_ctx is only touched by ptlrpcd,
1973                                  * so no lock is needed here.
1974                                  */
1975                                 status = sptlrpc_req_refresh_ctx(req, -1);
1976                                 if (status) {
1977                                         if (req->rq_err) {
1978                                                 req->rq_status = status;
1979                                                 spin_lock(&req->rq_lock);
1980                                                 req->rq_wait_ctx = 0;
1981                                                 spin_unlock(&req->rq_lock);
1982                                                 force_timer_recalc = 1;
1983                                         } else {
1984                                                 spin_lock(&req->rq_lock);
1985                                                 req->rq_wait_ctx = 1;
1986                                                 spin_unlock(&req->rq_lock);
1987                                         }
1988
1989                                         continue;
1990                                 } else {
1991                                         spin_lock(&req->rq_lock);
1992                                         req->rq_wait_ctx = 0;
1993                                         spin_unlock(&req->rq_lock);
1994                                 }
1995
1996                                 /*
1997                                  * In any case, the previous bulk should be
1998                                  * cleaned up to prepare for the new sending
1999                                  */
2000                                 if (req->rq_bulk &&
2001                                     !ptlrpc_unregister_bulk(req, 1))
2002                                         continue;
2003
2004                                 rc = ptl_send_rpc(req, 0);
2005                                 if (rc == -ENOMEM) {
2006                                         spin_lock(&imp->imp_lock);
2007                                         if (!list_empty(&req->rq_list))
2008                                                 list_del_init(&req->rq_list);
2009                                         spin_unlock(&imp->imp_lock);
2010                                         ptlrpc_rqphase_move(req, RQ_PHASE_NEW);
2011                                         continue;
2012                                 }
2013                                 if (rc) {
2014                                         DEBUG_REQ(D_HA, req,
2015                                                   "send failed: rc = %d", rc);
2016                                         force_timer_recalc = 1;
2017                                         spin_lock(&req->rq_lock);
2018                                         req->rq_net_err = 1;
2019                                         spin_unlock(&req->rq_lock);
2020                                         continue;
2021                                 }
2022                                 /* need to reset the timeout */
2023                                 force_timer_recalc = 1;
2024                         }
2025
2026                         spin_lock(&req->rq_lock);
2027
2028                         if (ptlrpc_client_early(req)) {
2029                                 ptlrpc_at_recv_early_reply(req);
2030                                 spin_unlock(&req->rq_lock);
2031                                 continue;
2032                         }
2033
2034                         /* Still waiting for a reply? */
2035                         if (ptlrpc_client_recv(req)) {
2036                                 spin_unlock(&req->rq_lock);
2037                                 continue;
2038                         }
2039
2040                         /* Did we actually receive a reply? */
2041                         if (!ptlrpc_client_replied(req)) {
2042                                 spin_unlock(&req->rq_lock);
2043                                 continue;
2044                         }
2045
2046                         spin_unlock(&req->rq_lock);
2047
2048                         /*
2049                          * unlink from net because we are going to
2050                          * swab the reply buffer in place
2051                          */
2052                         unregistered = ptlrpc_unregister_reply(req, 1);
2053                         if (!unregistered)
2054                                 continue;
2055
2056                         req->rq_status = after_reply(req);
2057                         if (req->rq_resend)
2058                                 continue;
2059
2060                         /*
2061                          * If there is no bulk associated with this request,
2062                          * then we're done and should let the interpreter
2063                          * process the reply. Similarly if the RPC returned
2064                          * an error, and therefore the bulk will never arrive.
2065                          */
2066                         if (!req->rq_bulk || req->rq_status < 0) {
2067                                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
2068                                 GOTO(interpret, req->rq_status);
2069                         }
2070
2071                         ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
2072                 }
2073
2074                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
2075                 if (ptlrpc_client_bulk_active(req))
2076                         continue;
2077
2078                 if (req->rq_bulk->bd_failure) {
2079                         /*
2080                          * The RPC reply arrived OK, but the bulk screwed
2081                          * up!  Quite weird, since the server told us the
2082                          * RPC was good after getting the REPLY for its GET
2083                          * or the ACK for its PUT.
2084                          */
2085                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
2086                         req->rq_status = -EIO;
2087                 }
2088
2089                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
2090
2091 interpret:
2092                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
2093
2094                 /*
2095                  * This moves the request to the "unregistering" phase;
2096                  * we need to wait for the reply to be unlinked.
2097                  */
2098                 if (!unregistered && !ptlrpc_unregister_reply(req, async)) {
2099                         /* start async bulk unlink too */
2100                         ptlrpc_unregister_bulk(req, 1);
2101                         continue;
2102                 }
2103
2104                 if (!ptlrpc_unregister_bulk(req, async))
2105                         continue;
2106
2107                 /*
2108                  * By the time interpret is called, receiving should
2109                  * already be finished.
2110                  */
2111                 LASSERT(!req->rq_receiving_reply);
2112
2113                 ptlrpc_req_interpret(env, req, req->rq_status);
2114
2115                 if (ptlrpcd_check_work(req)) {
2116                         atomic_dec(&set->set_remaining);
2117                         continue;
2118                 }
2119                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
2120
2121                 if (req->rq_reqmsg)
2122                         CDEBUG(D_RPCTRACE,
2123                                "Completed RPC req@%p pname:cluuid:pid:xid:nid:opc:job %s:%s:%d:%llu:%s:%d:%s\n",
2124                                req, current_comm(),
2125                                imp->imp_obd->obd_uuid.uuid,
2126                                lustre_msg_get_status(req->rq_reqmsg),
2127                                req->rq_xid,
2128                                obd_import_nid2str(imp),
2129                                lustre_msg_get_opc(req->rq_reqmsg),
2130                                lustre_msg_get_jobid(req->rq_reqmsg) ?: "");
2131
2132                 spin_lock(&imp->imp_lock);
2133                 /*
2134                  * The request may no longer be on the sending or delayed
2135                  * list. This can happen when it is marked erroneous because
2136                  * ptlrpc_import_delay_req(req, status) found it impossible
2137                  * to allow sending this RPC and returned *status != 0.
2138                  */
2139                 if (!list_empty(&req->rq_list)) {
2140                         list_del_init(&req->rq_list);
2141                         atomic_dec(&imp->imp_inflight);
2142                 }
2143                 list_del_init(&req->rq_unreplied_list);
2144                 spin_unlock(&imp->imp_lock);
2145
2146                 atomic_dec(&set->set_remaining);
2147                 wake_up_all(&imp->imp_recovery_waitq);
2148
2149                 if (set->set_producer) {
2150                         /* produce a new request if possible */
2151                         if (ptlrpc_set_producer(set) > 0)
2152                                 force_timer_recalc = 1;
2153
2154                         /*
2155                          * free the request that has just been completed
2156                          * in order not to pollute set->set_requests
2157                          */
2158                         list_del_init(&req->rq_set_chain);
2159                         spin_lock(&req->rq_lock);
2160                         req->rq_set = NULL;
2161                         req->rq_invalid_rqset = 0;
2162                         spin_unlock(&req->rq_lock);
2163
2164                         /* record rq_status to compute the final status later */
2165                         if (req->rq_status != 0)
2166                                 set->set_rc = req->rq_status;
2167                         ptlrpc_req_finished(req);
2168                 } else {
2169                         list_move_tail(&req->rq_set_chain, &comp_reqs);
2170                 }
2171         }
2172
2173         /*
2174          * move completed requests to the head of the list so it's easier
2175          * for the caller to find them
2176          */
2177         list_splice(&comp_reqs, &set->set_requests);
2178
2179         /* If we hit an error, we want to recover promptly. */
2180         RETURN(atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
2181 }
2182 EXPORT_SYMBOL(ptlrpc_check_set);
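/*
 * Illustrative sketch (not compiled): a caller that drives a set by hand
 * (instead of using ptlrpc_set_wait() below or handing the set to ptlrpcd)
 * simply polls ptlrpc_check_set() until it returns non-zero.  This is a
 * crude restatement of what the real waiters do; the helper name is
 * hypothetical.
 */
#if 0
static void example_drive_set(const struct lu_env *env,
                              struct ptlrpc_request_set *set)
{
        /* ptlrpc_check_set() returns 1 once no more replies are expected */
        while (ptlrpc_check_set(env, set) == 0)
                schedule_timeout_interruptible(cfs_time_seconds(1));
}
#endif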
2183
2184 /**
2185  * Time out request \a req. If \a async_unlink is set, do not wait
2186  * until LNet actually confirms network buffer unlinking.
2187  * Return 1 if we should give up further retry attempts, 0 otherwise.
2188  */
2189 int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
2190 {
2191         struct obd_import *imp = req->rq_import;
2192         unsigned int debug_mask = D_RPCTRACE;
2193         int rc = 0;
2194
2195         ENTRY;
2196         spin_lock(&req->rq_lock);
2197         req->rq_timedout = 1;
2198         spin_unlock(&req->rq_lock);
2199
2200         if (ptlrpc_console_allow(req, lustre_msg_get_opc(req->rq_reqmsg),
2201                                  lustre_msg_get_status(req->rq_reqmsg)))
2202                 debug_mask = D_WARNING;
2203         DEBUG_REQ(debug_mask, req, "Request sent has %s: [sent %lld/real %lld]",
2204                   req->rq_net_err ? "failed due to network error" :
2205                      ((req->rq_real_sent == 0 ||
2206                        req->rq_real_sent < req->rq_sent ||
2207                        req->rq_real_sent >= req->rq_deadline) ?
2208                       "timed out for sent delay" : "timed out for slow reply"),
2209                   (s64)req->rq_sent, (s64)req->rq_real_sent);
2210
2211         if (imp && obd_debug_peer_on_timeout)
2212                 LNetDebugPeer(imp->imp_connection->c_peer);
2213
2214         ptlrpc_unregister_reply(req, async_unlink);
2215         ptlrpc_unregister_bulk(req, async_unlink);
2216
2217         if (obd_dump_on_timeout)
2218                 libcfs_debug_dumplog();
2219
2220         if (!imp) {
2221                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
2222                 RETURN(1);
2223         }
2224
2225         atomic_inc(&imp->imp_timeouts);
2226
2227         /* The DLM server doesn't want recovery run on its imports. */
2228         if (imp->imp_dlm_fake)
2229                 RETURN(1);
2230
2231         /*
2232          * If this request is for recovery or other primordial tasks,
2233          * then error it out here.
2234          */
2235         if (req->rq_ctx_init || req->rq_ctx_fini ||
2236             req->rq_send_state != LUSTRE_IMP_FULL ||
2237             imp->imp_obd->obd_no_recov) {
2238                 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
2239                           ptlrpc_import_state_name(req->rq_send_state),
2240                           ptlrpc_import_state_name(imp->imp_state));
2241                 spin_lock(&req->rq_lock);
2242                 req->rq_status = -ETIMEDOUT;
2243                 req->rq_err = 1;
2244                 spin_unlock(&req->rq_lock);
2245                 RETURN(1);
2246         }
2247
2248         /*
2249          * if a request can't be resent we can't wait for an answer after
2250          * the timeout
2251          */
2252         if (ptlrpc_no_resend(req)) {
2253                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
2254                 rc = 1;
2255         }
2256
2257         ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
2258
2259         RETURN(rc);
2260 }
2261
2262 /**
2263  * Time out all uncompleted requests in the request set pointed to by \a data.
2264  * Callback used when waiting on sets with l_wait_event.
2265  * Always returns 1.
2266  */
2267 int ptlrpc_expired_set(void *data)
2268 {
2269         struct ptlrpc_request_set *set = data;
2270         struct list_head *tmp;
2271         time64_t now = ktime_get_real_seconds();
2272
2273         ENTRY;
2274         LASSERT(set != NULL);
2275
2276         /*
2277          * A timeout expired. See which reqs it applies to...
2278          */
2279         list_for_each(tmp, &set->set_requests) {
2280                 struct ptlrpc_request *req =
2281                         list_entry(tmp, struct ptlrpc_request,
2282                                    rq_set_chain);
2283
2284                 /* don't expire request waiting for context */
2285                 if (req->rq_wait_ctx)
2286                         continue;
2287
2288                 /* Request in-flight? */
2289                 if (!((req->rq_phase == RQ_PHASE_RPC &&
2290                        !req->rq_waiting && !req->rq_resend) ||
2291                       (req->rq_phase == RQ_PHASE_BULK)))
2292                         continue;
2293
2294                 if (req->rq_timedout ||     /* already dealt with */
2295                     req->rq_deadline > now) /* not expired */
2296                         continue;
2297
2298                 /*
2299                  * Deal with this request. Do it asynchronously so as not
2300                  * to block the ptlrpcd thread.
2301                  */
2302                 ptlrpc_expire_one_request(req, 1);
2303         }
2304
2305         /*
2306          * When waiting for a whole set, we always break out of the
2307          * sleep so we can recalculate the timeout, or enable interrupts
2308          * if everyone's timed out.
2309          */
2310         RETURN(1);
2311 }
2312
2313 /**
2314  * Sets rq_intr flag in \a req under spinlock.
2315  */
2316 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
2317 {
2318         spin_lock(&req->rq_lock);
2319         req->rq_intr = 1;
2320         spin_unlock(&req->rq_lock);
2321 }
2322 EXPORT_SYMBOL(ptlrpc_mark_interrupted);
2323
2324 /**
2325  * Interrupts (sets interrupted flag) all uncompleted requests in
2326  * a set \a data. Callback for l_wait_event for interruptible waits.
2327  */
2328 static void ptlrpc_interrupted_set(void *data)
2329 {
2330         struct ptlrpc_request_set *set = data;
2331         struct list_head *tmp;
2332
2333         LASSERT(set != NULL);
2334         CDEBUG(D_RPCTRACE, "INTERRUPTED SET %p\n", set);
2335
2336         list_for_each(tmp, &set->set_requests) {
2337                 struct ptlrpc_request *req =
2338                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2339
2340                 if (req->rq_intr)
2341                         continue;
2342
2343                 if (req->rq_phase != RQ_PHASE_RPC &&
2344                     req->rq_phase != RQ_PHASE_UNREG_RPC &&
2345                     !req->rq_allow_intr)
2346                         continue;
2347
2348                 ptlrpc_mark_interrupted(req);
2349         }
2350 }
2351
2352 /**
2353  * Get the smallest timeout in the set; this does NOT set a timeout.
2354  */
2355 time64_t ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
2356 {
2357         struct list_head *tmp;
2358         time64_t now = ktime_get_real_seconds();
2359         int timeout = 0;
2360         struct ptlrpc_request *req;
2361         time64_t deadline;
2362
2363         ENTRY;
2364         list_for_each(tmp, &set->set_requests) {
2365                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2366
2367                 /* Request in-flight? */
2368                 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
2369                       (req->rq_phase == RQ_PHASE_BULK) ||
2370                       (req->rq_phase == RQ_PHASE_NEW)))
2371                         continue;
2372
2373                 /* Already timed out. */
2374                 if (req->rq_timedout)
2375                         continue;
2376
2377                 /* Waiting for ctx. */
2378                 if (req->rq_wait_ctx)
2379                         continue;
2380
2381                 if (req->rq_phase == RQ_PHASE_NEW)
2382                         deadline = req->rq_sent;
2383                 else if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend)
2384                         deadline = req->rq_sent;
2385                 else
2386                         deadline = req->rq_sent + req->rq_timeout;
2387
2388                 if (deadline <= now)    /* actually expired already */
2389                         timeout = 1;    /* ASAP */
2390                 else if (timeout == 0 || timeout > deadline - now)
2391                         timeout = deadline - now;
2392         }
2393         RETURN(timeout);
2394 }
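/*
 * Illustrative sketch (not compiled): the per-request deadline selection used
 * above, restated as a hypothetical helper.  New requests and delayed resends
 * are due at rq_sent itself; everything else is due at rq_sent + rq_timeout.
 */
#if 0
static time64_t example_req_deadline(const struct ptlrpc_request *req)
{
        if (req->rq_phase == RQ_PHASE_NEW ||
            (req->rq_phase == RQ_PHASE_RPC && req->rq_resend))
                return req->rq_sent;

        return req->rq_sent + req->rq_timeout;
}
#endif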
2395
2396 /**
2397  * Send all unsent requests from the set and then wait until all
2398  * requests in the set complete (either get a reply, time out, get an
2399  * error, or are otherwise interrupted).
2400  * Returns 0 on success or an error code otherwise.
2401  */
2402 int ptlrpc_set_wait(const struct lu_env *env, struct ptlrpc_request_set *set)
2403 {
2404         struct list_head *tmp;
2405         struct ptlrpc_request *req;
2406         struct l_wait_info lwi;
2407         time64_t timeout;
2408         int rc;
2409
2410         ENTRY;
2411         if (set->set_producer)
2412                 (void)ptlrpc_set_producer(set);
2413         else
2414                 list_for_each(tmp, &set->set_requests) {
2415                         req = list_entry(tmp, struct ptlrpc_request,
2416                                          rq_set_chain);
2417                         if (req->rq_phase == RQ_PHASE_NEW)
2418                                 (void)ptlrpc_send_new_req(req);
2419                 }
2420
2421         if (list_empty(&set->set_requests))
2422                 RETURN(0);
2423
2424         do {
2425                 timeout = ptlrpc_set_next_timeout(set);
2426
2427                 /*
2428                  * wait until all complete, interrupted, or an in-flight
2429                  * req times out
2430                  */
2431                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %lld seconds\n",
2432                        set, timeout);
2433
2434                 if ((timeout == 0 && !signal_pending(current)) ||
2435                     set->set_allow_intr)
2436                         /*
2437                          * No requests are in-flight (either timed out
2438                          * or delayed), so we can allow interrupts.
2439                          * We still want to block for a limited time,
2440                          * so we allow interrupts during the timeout.
2441                          */
2442                         lwi = LWI_TIMEOUT_INTR_ALL(
2443                                         cfs_time_seconds(timeout ? timeout : 1),
2444                                         ptlrpc_expired_set,
2445                                         ptlrpc_interrupted_set, set);
2446                 else
2447                         /*
2448                          * At least one request is in flight, so no
2449                          * interrupts are allowed. Wait until all
2450                          * complete, or an in-flight req times out.
2451                          */
2452                         lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
2453                                           ptlrpc_expired_set, set);
2454
2455                 rc = l_wait_event(set->set_waitq,
2456                                   ptlrpc_check_set(NULL, set), &lwi);
2457
2458                 /*
2459                  * LU-769 - if we ignored the signal because it was already
2460                  * pending when we started, we need to handle it now or we risk
2461                  * it being ignored forever
2462                  */
2463                 if (rc == -ETIMEDOUT &&
2464                     (!lwi.lwi_allow_intr || set->set_allow_intr) &&
2465                     signal_pending(current)) {
2466                         sigset_t blocked_sigs =
2467                                            cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
2468
2469                         /*
2470                          * In fact we only interrupt for the "fatal" signals
2471                          * like SIGINT or SIGKILL. We still ignore less
2472                          * important signals since the ptlrpc set is not
2473                          * easily reentrant from userspace again.
2474                          */
2475                         if (signal_pending(current))
2476                                 ptlrpc_interrupted_set(set);
2477                         cfs_restore_sigs(blocked_sigs);
2478                 }
2479
2480                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
2481
2482                 /*
2483                  * -EINTR => all requests have been flagged rq_intr so next
2484                  * check completes.
2485                  * -ETIMEDOUT => someone timed out.  When all reqs have
2486                  * timed out, signals are enabled allowing completion with
2487                  * EINTR.
2488                  * I don't really care if we go once more round the loop in
2489                  * the error cases -eeb.
2490                  */
2491                 if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
2492                         list_for_each(tmp, &set->set_requests) {
2493                                 req = list_entry(tmp, struct ptlrpc_request,
2494                                                  rq_set_chain);
2495                                 spin_lock(&req->rq_lock);
2496                                 req->rq_invalid_rqset = 1;
2497                                 spin_unlock(&req->rq_lock);
2498                         }
2499                 }
2500         } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
2501
2502         LASSERT(atomic_read(&set->set_remaining) == 0);
2503
2504         rc = set->set_rc; /* rq_status of already freed requests if any */
2505         list_for_each(tmp, &set->set_requests) {
2506                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
2507
2508                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
2509                 if (req->rq_status != 0)
2510                         rc = req->rq_status;
2511         }
2512
2513         RETURN(rc);
2514 }
2515 EXPORT_SYMBOL(ptlrpc_set_wait);
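/*
 * Illustrative sketch (not compiled): the usual synchronous pattern around
 * ptlrpc_set_wait().  It assumes the caller already allocated and packed the
 * two requests; ptlrpc_prep_set(), ptlrpc_set_add_req() and
 * ptlrpc_set_destroy() are the standard set helpers, while the function and
 * parameter names here are placeholders.
 */
#if 0
static int example_sync_rpcs(const struct lu_env *env,
                             struct ptlrpc_request *req1,
                             struct ptlrpc_request *req2)
{
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        ptlrpc_set_add_req(set, req1);
        ptlrpc_set_add_req(set, req2);

        rc = ptlrpc_set_wait(env, set); /* sends both and waits for completion */
        ptlrpc_set_destroy(set);        /* drops the set's request references */

        return rc;
}
#endif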
2516
2517 /**
2518  * Helper function for request freeing.
2519  * Called when the reference count reaches zero and the request needs to be freed.
2520  * Removes the request from all the sending/replay lists it might be on and
2521  * frees network buffers if any are present.
2522  * If \a locked is set, the caller is already holding the import imp_lock,
2523  * so we do not need to reobtain it (for certain list manipulations).
2524  */
2525 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
2526 {
2527         ENTRY;
2528
2529         if (!request)
2530                 RETURN_EXIT;
2531
2532         LASSERT(!request->rq_srv_req);
2533         LASSERT(request->rq_export == NULL);
2534         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
2535         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
2536         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
2537         LASSERTF(!request->rq_replay, "req %p\n", request);
2538
2539         req_capsule_fini(&request->rq_pill);
2540
2541         /*
2542          * We must take it off the imp_replay_list first.  Otherwise, we'll set
2543          * request->rq_reqmsg to NULL while osc_close is dereferencing it.
2544          */
2545         if (request->rq_import) {
2546                 if (!locked)
2547                         spin_lock(&request->rq_import->imp_lock);
2548                 list_del_init(&request->rq_replay_list);
2549                 list_del_init(&request->rq_unreplied_list);
2550                 if (!locked)
2551                         spin_unlock(&request->rq_import->imp_lock);
2552         }
2553         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
2554
2555         if (atomic_read(&request->rq_refcount) != 0) {
2556                 DEBUG_REQ(D_ERROR, request,
2557                           "freeing request with nonzero refcount");
2558                 LBUG();
2559         }
2560
2561         if (request->rq_repbuf)
2562                 sptlrpc_cli_free_repbuf(request);
2563
2564         if (request->rq_import) {
2565                 class_import_put(request->rq_import);
2566                 request->rq_import = NULL;
2567         }
2568         if (request->rq_bulk)
2569                 ptlrpc_free_bulk(request->rq_bulk);
2570
2571         if (request->rq_reqbuf || request->rq_clrbuf)
2572                 sptlrpc_cli_free_reqbuf(request);
2573
2574         if (request->rq_cli_ctx)
2575                 sptlrpc_req_put_ctx(request, !locked);
2576
2577         if (request->rq_pool)
2578                 __ptlrpc_free_req_to_pool(request);
2579         else
2580                 ptlrpc_request_cache_free(request);
2581         EXIT;
2582 }
2583
2584 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
2585 /**
2586  * Drop one request reference. Must be called with import imp_lock held.
2587  * When reference count drops to zero, request is freed.
2588  */
2589 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
2590 {
2591         assert_spin_locked(&request->rq_import->imp_lock);
2592         (void)__ptlrpc_req_finished(request, 1);
2593 }
2594
2595 /**
2596  * Helper function
2597  * Drops one reference for request \a request.
2598  * \a locked indicates that the caller holds the import imp_lock.
2599  * Frees the request when the reference count reaches zero.
2600  *
2601  * \retval 1    the request is freed
2602  * \retval 0    some others still hold references on the request
2603  */
2604 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
2605 {
2606         int count;
2607
2608         ENTRY;
2609         if (!request)
2610                 RETURN(1);
2611
2612         LASSERT(request != LP_POISON);
2613         LASSERT(request->rq_reqmsg != LP_POISON);
2614
2615         DEBUG_REQ(D_INFO, request, "refcount now %u",
2616                   atomic_read(&request->rq_refcount) - 1);
2617
2618         spin_lock(&request->rq_lock);
2619         count = atomic_dec_return(&request->rq_refcount);
2620         LASSERTF(count >= 0, "Invalid ref count %d\n", count);
2621
2622          * For an open RPC, the client does not know the EA size (LOV, ACL,
2623          * and so on) before the reply arrives, so it has to reserve a very
2624          * large reply buffer. Such a buffer is not released until the RPC is
2625          * freed. Since the open RPC is replayable, we need to keep it on the
2626          * replay list until close. If a lot of files are opened
2627          * concurrently, the client may run out of memory.
2628          *
2629          * In fact, it is unnecessary to keep the reply buffer for open
2630          * replay; the related EAs have already been saved via
2631          * mdc_save_lovea() before coming here. So it is safe to free the
2632          * reply buffer earlier, before releasing the RPC, to avoid client
2633          * OOM. LU-9514
2634          */
2635         if (count == 1 && request->rq_early_free_repbuf && request->rq_repbuf) {
2636                 spin_lock(&request->rq_early_free_lock);
2637                 sptlrpc_cli_free_repbuf(request);
2638                 request->rq_repbuf = NULL;
2639                 request->rq_repbuf_len = 0;
2640                 request->rq_repdata = NULL;
2641                 request->rq_reqdata_len = 0;
2642                 spin_unlock(&request->rq_early_free_lock);
2643         }
2644         spin_unlock(&request->rq_lock);
2645
2646         if (!count)
2647                 __ptlrpc_free_req(request, locked);
2648
2649         RETURN(!count);
2650 }
2651
2652 /**
2653  * Drops one reference count for a request.
2654  */
2655 void ptlrpc_req_finished(struct ptlrpc_request *request)
2656 {
2657         __ptlrpc_req_finished(request, 0);
2658 }
2659 EXPORT_SYMBOL(ptlrpc_req_finished);
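/*
 * Illustrative sketch (not compiled): the reference-counting contract around
 * ptlrpc_req_finished().  A caller that needs the request to stay valid takes
 * its own reference with ptlrpc_request_addref() and releases it with
 * ptlrpc_req_finished(); the last put frees the request via
 * __ptlrpc_free_req().  The helper name is hypothetical.
 */
#if 0
static void example_pin_request(struct ptlrpc_request *req)
{
        ptlrpc_request_addref(req);     /* pin: refcount + 1 */

        /* ... safe to inspect req->rq_repmsg, req->rq_status, etc. ... */

        ptlrpc_req_finished(req);       /* unpin: may free on the last put */
}
#endif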
2660
2661 /**
2662  * Returns xid of a \a request
2663  */
2664 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
2665 {
2666         return request->rq_xid;
2667 }
2668 EXPORT_SYMBOL(ptlrpc_req_xid);
2669
2670 /**
2671  * Disengage the client's reply buffer from the network
2672  * NB does _NOT_ unregister any client-side bulk.
2673  * IDEMPOTENT, but _not_ safe against concurrent callers.
2674  * The request owner (i.e. the thread doing the I/O) must call...
2675  * Returns 1 if the reply is unlinked (or was never registered), 0 if an
2676  * asynchronous unlink is still in progress.
2676  */
2677 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
2678 {
2679         int rc;
2680         struct l_wait_info lwi;
2681
2682         /*
2683          * Might sleep.
2684          */
2685         LASSERT(!in_interrupt());
2686
2687         /* Let's set up a deadline for the reply unlink. */
2688         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
2689             async && request->rq_reply_deadline == 0 && cfs_fail_val == 0)
2690                 request->rq_reply_deadline = ktime_get_real_seconds() +
2691                                              LONG_UNLINK;
2692
2693         /*
2694          * Nothing left to do.
2695          */
2696         if (!ptlrpc_client_recv_or_unlink(request))
2697                 RETURN(1);
2698
2699         LNetMDUnlink(request->rq_reply_md_h);
2700
2701         /*
2702          * Let's check it once again.
2703          */
2704         if (!ptlrpc_client_recv_or_unlink(request))
2705                 RETURN(1);
2706
2707         /* Move to "Unregistering" phase as reply was not unlinked yet. */
2708         ptlrpc_rqphase_move(request, RQ_PHASE_UNREG_RPC);
2709
2710         /*
2711          * Do not wait for unlink to finish.
2712          */
2713         if (async)
2714                 RETURN(0);
2715
2716         /*
2717          * We have to l_wait_event() whatever the result, to give liblustre
2718          * a chance to run reply_in_callback(), and to make sure we've
2719          * unlinked before returning a req to the pool.
2720          */
2721         for (;;) {
2722                 /* The wq argument is ignored by user-space wait_event macros */
2723                 wait_queue_head_t *wq = (request->rq_set) ?
2724                                         &request->rq_set->set_waitq :
2725                                         &request->rq_reply_waitq;
2726                 /*
2727                  * Network access will complete in finite time but the HUGE
2728                  * timeout lets us CWARN for visibility of sluggish NALs
2729                  */
2730                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
2731                                            cfs_time_seconds(1), NULL, NULL);
2732                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
2733                                   &lwi);
2734                 if (rc == 0) {
2735                         ptlrpc_rqphase_move(request, request->rq_next_phase);
2736                         RETURN(1);
2737                 }
2738
2739                 LASSERT(rc == -ETIMEDOUT);
2740                 DEBUG_REQ(D_WARNING, request,
2741                           "Unexpectedly long timeout receiving_reply=%d req_ulinked=%d reply_unlinked=%d",
2742                           request->rq_receiving_reply,
2743                           request->rq_req_unlinked,
2744                           request->rq_reply_unlinked);
2745         }
2746         RETURN(0);
2747 }
2748
2749 static void ptlrpc_free_request(struct ptlrpc_request *req)
2750 {
2751         spin_lock(&req->rq_lock);
2752         req->rq_replay = 0;
2753         spin_unlock(&req->rq_lock);
2754
2755         if (req->rq_commit_cb)
2756                 req->rq_commit_cb(req);
2757         list_del_init(&req->rq_replay_list);
2758
2759         __ptlrpc_req_finished(req, 1);
2760 }
2761
2762 /**
2763  * The request is committed and dropped from the replay list of its import.
2764  */
2765 void ptlrpc_request_committed(struct ptlrpc_request *req, int force)
2766 {
2767         struct obd_import *imp = req->rq_import;
2768
2769         spin_lock(&imp->imp_lock);
2770         if (list_empty(&req->rq_replay_list)) {
2771                 spin_unlock(&imp->imp_lock);
2772                 return;
2773         }
2774
2775         if (force || req->rq_transno <= imp->imp_peer_committed_transno) {
2776                 if (imp->imp_replay_cursor == &req->rq_replay_list)
2777                         imp->imp_replay_cursor = req->rq_replay_list.next;
2778                 ptlrpc_free_request(req);
2779         }
2780
2781         spin_unlock(&imp->imp_lock);
2782 }
2783 EXPORT_SYMBOL(ptlrpc_request_committed);
2784
2785 /**
2786  * Iterates through the replay_list on the import and prunes
2787  * all requests that have a transno smaller than last_committed for
2788  * the import and do not have rq_replay set.
2789  * Since requests are sorted in transno order, stops at the first
2790  * transno bigger than last_committed.
2791  * Caller must hold imp->imp_lock.
2792  */
2793 void ptlrpc_free_committed(struct obd_import *imp)
2794 {
2795         struct ptlrpc_request *req, *saved;
2796         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
2797         bool skip_committed_list = true;
2798
2799         ENTRY;
2800         LASSERT(imp != NULL);
2801         assert_spin_locked(&imp->imp_lock);
2802
2803         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
2804             imp->imp_generation == imp->imp_last_generation_checked) {
2805                 CDEBUG(D_INFO, "%s: skip recheck: last_committed %llu\n",
2806                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
2807                 RETURN_EXIT;
2808         }
2809         CDEBUG(D_RPCTRACE, "%s: committing for last_committed %llu gen %d\n",
2810                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
2811                imp->imp_generation);
2812
2813         if (imp->imp_generation != imp->imp_last_generation_checked ||
2814             imp->imp_last_transno_checked == 0)
2815                 skip_committed_list = false;
2816
2817         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
2818         imp->imp_last_generation_checked = imp->imp_generation;
2819
2820         list_for_each_entry_safe(req, saved, &imp->imp_replay_list,
2821                                  rq_replay_list) {
2822                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
2823                 LASSERT(req != last_req);
2824                 last_req = req;
2825
2826                 if (req->rq_transno == 0) {
2827                         DEBUG_REQ(D_EMERG, req, "zero transno during replay");
2828                         LBUG();
2829                 }
2830                 if (req->rq_import_generation < imp->imp_generation) {
2831                         DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
2832                         GOTO(free_req, 0);
2833                 }
2834
2835                 /* not yet committed */
2836                 if (req->rq_transno > imp->imp_peer_committed_transno) {
2837                         DEBUG_REQ(D_RPCTRACE, req, "stopping search");
2838                         break;
2839                 }
2840
2841                 if (req->rq_replay) {
2842                         DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
2843                         list_move_tail(&req->rq_replay_list,
2844                                        &imp->imp_committed_list);
2845                         continue;
2846                 }
2847
2848                 DEBUG_REQ(D_INFO, req, "commit (last_committed %llu)",
2849                           imp->imp_peer_committed_transno);
2850 free_req:
2851                 ptlrpc_free_request(req);
2852         }
2853
2854         if (skip_committed_list)
2855                 GOTO(out, 0);
2856
2857         list_for_each_entry_safe(req, saved, &imp->imp_committed_list,
2858                                  rq_replay_list) {
2859                 LASSERT(req->rq_transno != 0);
2860                 if (req->rq_import_generation < imp->imp_generation ||
2861                     !req->rq_replay) {
2862                         DEBUG_REQ(D_RPCTRACE, req, "free %s open request",
2863                                   req->rq_import_generation <
2864                                   imp->imp_generation ? "stale" : "closed");
2865
2866                         if (imp->imp_replay_cursor == &req->rq_replay_list)
2867                                 imp->imp_replay_cursor =
2868                                         req->rq_replay_list.next;
2869
2870                         ptlrpc_free_request(req);
2871                 }
2872         }
2873 out:
2874         EXIT;
2875 }
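
/*
 * Illustrative sketch (not part of this file): how the pruning above is
 * expected to behave for a hypothetical replay_list, assuming
 * imp_peer_committed_transno == 8 and an unchanged import generation:
 *
 *	transno 5, rq_replay == 0  -> freed (committed, not held for replay)
 *	transno 7, rq_replay == 1  -> moved to imp_committed_list (FL_REPLAY)
 *	transno 9                  -> loop stops (> last_committed)
 *
 * Requests from an older import generation are freed regardless of transno.
 */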
2876
2877 void ptlrpc_cleanup_client(struct obd_import *imp)
2878 {
2879         ENTRY;
2880         EXIT;
2881 }
2882
2883 /**
2884  * Schedule previously sent request for resend.
2885  * For bulk requests we assign a new xid (to avoid problems with
2886  * lost replies and therefore several transfers landing in the same buffer
2887  * from different sending attempts).
2888  */
2889 void ptlrpc_resend_req(struct ptlrpc_request *req)
2890 {
2891         DEBUG_REQ(D_HA, req, "going to resend");
2892         spin_lock(&req->rq_lock);
2893
2894         /*
2895          * Request got reply but linked to the import list still.
2896          * Let ptlrpc_check_set() process it.
2897          */
2898         if (ptlrpc_client_replied(req)) {
2899                 spin_unlock(&req->rq_lock);
2900                 DEBUG_REQ(D_HA, req, "it has reply, so skip it");
2901                 return;
2902         }
2903
2904         req->rq_status = -EAGAIN;
2905
2906         req->rq_resend = 1;
2907         req->rq_net_err = 0;
2908         req->rq_timedout = 0;
2909
2910         ptlrpc_client_wake_req(req);
2911         spin_unlock(&req->rq_lock);
2912 }
2913
2914 /* XXX: this function and rq_status are currently unused */
2915 void ptlrpc_restart_req(struct ptlrpc_request *req)
2916 {
2917         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
2918         req->rq_status = -ERESTARTSYS;
2919
2920         spin_lock(&req->rq_lock);
2921         req->rq_restart = 1;
2922         req->rq_timedout = 0;
2923         ptlrpc_client_wake_req(req);
2924         spin_unlock(&req->rq_lock);
2925 }
2926
2927 /**
2928  * Grab an additional reference on request \a req.
2929  */
2930 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2931 {
2932         ENTRY;
2933         atomic_inc(&req->rq_refcount);
2934         RETURN(req);
2935 }
2936 EXPORT_SYMBOL(ptlrpc_request_addref);
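
/*
 * Illustrative sketch (assumption, not code from this file), mirroring the
 * pattern used by ptlrpc_queue_wait() below: a caller that hands a request
 * to a set takes an extra reference for the set and later drops its own
 * reference with ptlrpc_req_finished():
 *
 *	ptlrpc_request_addref(req);	// reference owned by the set
 *	ptlrpc_set_add_req(set, req);
 *	...
 *	ptlrpc_req_finished(req);	// drop the caller's own reference
 */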
2937
2938 /**
2939  * Add a request to import replay_list.
2940  * Must be called under imp_lock
2941  */
2942 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2943                                       struct obd_import *imp)
2944 {
2945         struct list_head *tmp;
2946
2947         assert_spin_locked(&imp->imp_lock);
2948
2949         if (req->rq_transno == 0) {
2950                 DEBUG_REQ(D_EMERG, req, "saving request with zero transno");
2951                 LBUG();
2952         }
2953
2954         /*
2955          * clear this for new requests that were resent as well
2956          * as resent replayed requests.
2957          */
2958         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2959
2960         /* don't re-add requests that have been replayed */
2961         if (!list_empty(&req->rq_replay_list))
2962                 return;
2963
2964         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2965
2966         spin_lock(&req->rq_lock);
2967         req->rq_resend = 0;
2968         spin_unlock(&req->rq_lock);
2969
2970         LASSERT(imp->imp_replayable);
2971         /* Balanced in ptlrpc_free_committed, usually. */
2972         ptlrpc_request_addref(req);
2973         list_for_each_prev(tmp, &imp->imp_replay_list) {
2974                 struct ptlrpc_request *iter = list_entry(tmp,
2975                                                          struct ptlrpc_request,
2976                                                          rq_replay_list);
2977
2978                 /*
2979                  * We may have duplicate transnos if we create and then
2980                  * open a file, or for closes retained to match creating
2981                  * opens, so use req->rq_xid as a secondary key.
2982                  * (See bugs 684, 685, and 428.)
2983                  * XXX no longer needed, but all opens need transnos!
2984                  */
2985                 if (iter->rq_transno > req->rq_transno)
2986                         continue;
2987
2988                 if (iter->rq_transno == req->rq_transno) {
2989                         LASSERT(iter->rq_xid != req->rq_xid);
2990                         if (iter->rq_xid > req->rq_xid)
2991                                 continue;
2992                 }
2993
2994                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
2995                 return;
2996         }
2997
2998         list_add(&req->rq_replay_list, &imp->imp_replay_list);
2999 }
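
/*
 * Illustrative sketch (not part of this file): the insertion above keeps
 * imp_replay_list sorted by (rq_transno, rq_xid). For example, with
 * existing entries (transno/xid) 10/3, 12/5, 12/8, a new request with
 * transno 12 and xid 6 is linked between 12/5 and 12/8, while a new
 * request with transno 11 lands between 10/3 and 12/5.
 */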
3000
3001 /**
3002  * Send request and wait until it completes.
3003  * Returns request processing status.
3004  */
3005 int ptlrpc_queue_wait(struct ptlrpc_request *req)
3006 {
3007         struct ptlrpc_request_set *set;
3008         int rc;
3009
3010         ENTRY;
3011         LASSERT(req->rq_set == NULL);
3012         LASSERT(!req->rq_receiving_reply);
3013
3014         set = ptlrpc_prep_set();
3015         if (!set) {
3016                 CERROR("cannot allocate ptlrpc set: rc = %d\n", -ENOMEM);
3017                 RETURN(-ENOMEM);
3018         }
3019
3020         /* for distributed debugging */
3021         lustre_msg_set_status(req->rq_reqmsg, current_pid());
3022
3023         /* add a ref for the set (see comment in ptlrpc_set_add_req) */
3024         ptlrpc_request_addref(req);
3025         ptlrpc_set_add_req(set, req);
3026         rc = ptlrpc_set_wait(NULL, set);
3027         ptlrpc_set_destroy(set);
3028
3029         RETURN(rc);
3030 }
3031 EXPORT_SYMBOL(ptlrpc_queue_wait);
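
/*
 * Illustrative sketch (assumption, not code from this file): a typical
 * synchronous caller with an already-packed request "req" would do:
 *
 *	rc = ptlrpc_queue_wait(req);	// send and wait for completion
 *	if (rc != 0)
 *		CERROR("request failed: rc = %d\n", rc);
 *	ptlrpc_req_finished(req);	// drop the caller's reference
 */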
3032
3033 /**
3034  * Callback used for processing replies to replayed requests.
3035  * On a successful reply, calls the registered request replay callback.
3036  * On error, restarts the replay process.
3037  */
3038 static int ptlrpc_replay_interpret(const struct lu_env *env,
3039                                    struct ptlrpc_request *req,
3040                                    void *args, int rc)
3041 {
3042         struct ptlrpc_replay_async_args *aa = args;
3043         struct obd_import *imp = req->rq_import;
3044
3045         ENTRY;
3046         atomic_dec(&imp->imp_replay_inflight);
3047
3048         /*
3049          * Note: if it is bulk replay (MDS-MDS replay), then even if
3050          * server got the request, but bulk transfer timeout, let's
3051          * replay the bulk req again
3052          */
3053         if (!ptlrpc_client_replied(req) ||
3054             (req->rq_bulk &&
3055              lustre_msg_get_status(req->rq_repmsg) == -ETIMEDOUT)) {
3056                 DEBUG_REQ(D_ERROR, req, "request replay timed out");
3057                 GOTO(out, rc = -ETIMEDOUT);
3058         }
3059
3060         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
3061             (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
3062             lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
3063                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
3064
3065         /** VBR: check version failure */
3066         if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
3067                 /** replay failed due to a version mismatch */
3068                 DEBUG_REQ(D_WARNING, req, "Version mismatch during replay");
3069                 spin_lock(&imp->imp_lock);
3070                 imp->imp_vbr_failed = 1;
3071                 spin_unlock(&imp->imp_lock);
3072                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
3073         } else {
3074                 /** The transno had better not change over replay. */
3075                 LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
3076                          lustre_msg_get_transno(req->rq_repmsg) ||
3077                          lustre_msg_get_transno(req->rq_repmsg) == 0,
3078                          "%#llx/%#llx\n",
3079                          lustre_msg_get_transno(req->rq_reqmsg),
3080                          lustre_msg_get_transno(req->rq_repmsg));
3081         }
3082
3083         spin_lock(&imp->imp_lock);
3084         imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
3085         spin_unlock(&imp->imp_lock);
3086         LASSERT(imp->imp_last_replay_transno);
3087
3088         /* transaction number shouldn't be bigger than the latest replayed */
3089         if (req->rq_transno > lustre_msg_get_transno(req->rq_reqmsg)) {
3090                 DEBUG_REQ(D_ERROR, req,
3091                           "Reported transno=%llu is bigger than replayed=%llu",
3092                           req->rq_transno,
3093                           lustre_msg_get_transno(req->rq_reqmsg));
3094                 GOTO(out, rc = -EINVAL);
3095         }
3096
3097         DEBUG_REQ(D_HA, req, "got reply");
3098
3099         /* let the callback do fixups, possibly including in the request */
3100         if (req->rq_replay_cb)
3101                 req->rq_replay_cb(req);
3102
3103         if (ptlrpc_client_replied(req) &&
3104             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
3105                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
3106                           lustre_msg_get_status(req->rq_repmsg),
3107                           aa->praa_old_status);
3108
3109                 /*
3110                  * Note: If the replay fails for MDT-MDT recovery, let's
3111                  * abort all of the following requests in the replay
3112                  * and sending list, because MDT-MDT update requests
3113                  * are dependent on each other, see LU-7039
3114                  */
3115                 if (imp->imp_connect_flags_orig & OBD_CONNECT_MDS_MDS) {
3116                         struct ptlrpc_request *free_req;
3117                         struct ptlrpc_request *tmp;
3118
3119                         spin_lock(&imp->imp_lock);
3120                         list_for_each_entry_safe(free_req, tmp,
3121                                                  &imp->imp_replay_list,
3122                                                  rq_replay_list) {
3123                                 ptlrpc_free_request(free_req);
3124                         }
3125
3126                         list_for_each_entry_safe(free_req, tmp,
3127                                                  &imp->imp_committed_list,
3128                                                  rq_replay_list) {
3129                                 ptlrpc_free_request(free_req);
3130                         }
3131
3132                         list_for_each_entry_safe(free_req, tmp,
3133                                                  &imp->imp_delayed_list,
3134                                                  rq_list) {
3135                                 spin_lock(&free_req->rq_lock);
3136                                 free_req->rq_err = 1;
3137                                 free_req->rq_status = -EIO;
3138                                 ptlrpc_client_wake_req(free_req);
3139                                 spin_unlock(&free_req->rq_lock);
3140                         }
3141
3142                         list_for_each_entry_safe(free_req, tmp,
3143                                                  &imp->imp_sending_list,
3144                                                  rq_list) {
3145                                 spin_lock(&free_req->rq_lock);
3146                                 free_req->rq_err = 1;
3147                                 free_req->rq_status = -EIO;
3148                                 ptlrpc_client_wake_req(free_req);
3149                                 spin_unlock(&free_req->rq_lock);
3150                         }
3151                         spin_unlock(&imp->imp_lock);
3152                 }
3153         } else {
3154                 /* Put it back for re-replay. */
3155                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
3156         }
3157
3158         /*
3159          * Errors during replay can set transno to 0, but
3160          * imp_last_replay_transno shouldn't be set to 0 anyway.
3161          */
3162         if (req->rq_transno == 0)
3163                 CERROR("Transno is 0 during replay!\n");
3164
3165         /* continue with recovery */
3166         rc = ptlrpc_import_recovery_state_machine(imp);
3167  out:
3168         req->rq_send_state = aa->praa_old_state;
3169
3170         if (rc != 0)
3171                 /* this replay failed, so restart recovery */
3172                 ptlrpc_connect_import(imp);
3173
3174         RETURN(rc);
3175 }
3176
3177 /**
3178  * Prepares and queues request for replay.
3179  * Adds it to ptlrpcd queue for actual sending.
3180  * Returns 0 on success.
3181  */
3182 int ptlrpc_replay_req(struct ptlrpc_request *req)
3183 {
3184         struct ptlrpc_replay_async_args *aa;
3185
3186         ENTRY;
3187
3188         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
3189
3190         aa = ptlrpc_req_async_args(aa, req);
3191         memset(aa, 0, sizeof(*aa));
3192
3193         /* Prepare request to be resent with ptlrpcd */
3194         aa->praa_old_state = req->rq_send_state;
3195         req->rq_send_state = LUSTRE_IMP_REPLAY;
3196         req->rq_phase = RQ_PHASE_NEW;
3197         req->rq_next_phase = RQ_PHASE_UNDEFINED;
3198         if (req->rq_repmsg)
3199                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
3200         req->rq_status = 0;
3201         req->rq_interpret_reply = ptlrpc_replay_interpret;
3202         /* Readjust the timeout for current conditions */
3203         ptlrpc_at_set_req_timeout(req);
3204
3205         /* Tell server net_latency to calculate how long to wait for reply. */
3206         lustre_msg_set_service_time(req->rq_reqmsg,
3207                                     ptlrpc_at_get_net_latency(req));
3208         DEBUG_REQ(D_HA, req, "REPLAY");
3209
3210         atomic_inc(&req->rq_import->imp_replay_inflight);
3211         spin_lock(&req->rq_lock);
3212         req->rq_early_free_repbuf = 0;
3213         spin_unlock(&req->rq_lock);
3214         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
3215
3216         ptlrpcd_add_req(req);
3217         RETURN(0);
3218 }
3219
3220 /**
3221  * Aborts all in-flight requests on the sending and delayed lists of import \a imp.
3222  */
3223 void ptlrpc_abort_inflight(struct obd_import *imp)
3224 {
3225         struct list_head *tmp, *n;
3226         ENTRY;
3227
3228         /*
3229          * Make sure that no new requests get processed for this import.
3230          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
3231          * this flag and then putting requests on sending_list or delayed_list.
3232          */
3233         assert_spin_locked(&imp->imp_lock);
3234
3235         /*
3236          * XXX locking?  Maybe we should remove each request with the list
3237          * locked?  Also, how do we know if the requests on the list are
3238          * being freed at this time?
3239          */
3240         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
3241                 struct ptlrpc_request *req = list_entry(tmp,
3242                                                         struct ptlrpc_request,
3243                                                         rq_list);
3244
3245                 DEBUG_REQ(D_RPCTRACE, req, "inflight");
3246
3247                 spin_lock(&req->rq_lock);
3248                 if (req->rq_import_generation < imp->imp_generation) {
3249                         req->rq_err = 1;
3250                         req->rq_status = -EIO;
3251                         ptlrpc_client_wake_req(req);
3252                 }
3253                 spin_unlock(&req->rq_lock);
3254         }
3255
3256         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
3257                 struct ptlrpc_request *req =
3258                         list_entry(tmp, struct ptlrpc_request, rq_list);
3259
3260                 DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
3261
3262                 spin_lock(&req->rq_lock);
3263                 if (req->rq_import_generation < imp->imp_generation) {
3264                         req->rq_err = 1;
3265                         req->rq_status = -EIO;
3266                         ptlrpc_client_wake_req(req);
3267                 }
3268                 spin_unlock(&req->rq_lock);
3269         }
3270
3271         /*
3272          * Last chance to free reqs left on the replay list, but we
3273          * will still leak reqs that haven't committed.
3274          */
3275         if (imp->imp_replayable)
3276                 ptlrpc_free_committed(imp);
3277
3278         EXIT;
3279 }
3280
3281 /**
3282  * Abort all uncompleted requests in request set \a set
3283  */
3284 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
3285 {
3286         struct list_head *tmp, *pos;
3287
3288         LASSERT(set != NULL);
3289
3290         list_for_each_safe(pos, tmp, &set->set_requests) {
3291                 struct ptlrpc_request *req =
3292                         list_entry(pos, struct ptlrpc_request,
3293                                    rq_set_chain);
3294
3295                 spin_lock(&req->rq_lock);
3296                 if (req->rq_phase != RQ_PHASE_RPC) {
3297                         spin_unlock(&req->rq_lock);
3298                         continue;
3299                 }
3300
3301                 req->rq_err = 1;
3302                 req->rq_status = -EINTR;
3303                 ptlrpc_client_wake_req(req);
3304                 spin_unlock(&req->rq_lock);
3305         }
3306 }
3307
3308 /**
3309  * Initialize the XID for the node.  This is common among all requests on
3310  * this node, and only requires the property that it is monotonically
3311  * increasing.  It does not need to be sequential.  Since this is also used
3312  * as the RDMA match bits, it is important that a single client NOT have
3313  * the same match bits for two different in-flight requests, hence we do
3314  * NOT want to have an XID per target or similar.
3315  *
3316  * To avoid an unlikely collision between match bits after a client reboot
3317  * (which would deliver old data into the wrong RDMA buffer) initialize
3318  * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
3319  * If the time is clearly incorrect, we instead use a 62-bit random number.
3320  * In the worst case the random number will overflow 1M RPCs per second in
3321  * 9133 years, or permutations thereof.
3322  */
3323 #define YEAR_2004 (1ULL << 30)
3324 void ptlrpc_init_xid(void)
3325 {
3326         time64_t now = ktime_get_real_seconds();
3327         u64 xid;
3328
3329         if (now < YEAR_2004) {
3330                 get_random_bytes(&xid, sizeof(xid));
3331                 xid >>= 2;
3332                 xid |= (1ULL << 61);
3333         } else {
3334                 xid = (u64)now << 20;
3335         }
3336
3337         /* Need to always be aligned to a power-of-two for multi-bulk BRW */
3338         CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
3339         xid &= PTLRPC_BULK_OPS_MASK;
3340         atomic64_set(&ptlrpc_last_xid, xid);
3341 }
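
/*
 * Illustrative arithmetic (not part of this file): shifting the current
 * time left by 20 bits reserves 2^20 (~1M) xid values per second of
 * wall-clock time, matching the 1M RPC/s assumption above, so a rebooted
 * client starts beyond any xid its previous incarnation could have used.
 */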
3342
3343 /**
3344  * Increase the xid and return the resulting new value to the caller.
3345  *
3346  * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
3347  * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
3348  * itself uses the last bulk xid needed, so the server can determine
3349  * the number of bulk transfers from the RPC XID and a bitmask.  The starting
3350  * xid must align to a power-of-two value.
3351  *
3352  * This is assumed to be true due to the initial ptlrpc_last_xid
3353  * value also being initialized to a power-of-two value. LU-1431
3354  */
3355 __u64 ptlrpc_next_xid(void)
3356 {
3357         return atomic64_add_return(PTLRPC_BULK_OPS_COUNT, &ptlrpc_last_xid);
3358 }
3359
3360 /**
3361  * If the request has a newly allocated XID (new request or EINPROGRESS
3362  * resend), use this XID as the bulk matchbits; otherwise allocate new
3363  * matchbits for the request to ensure the previous bulk fails, avoiding
3364  * problems with lost replies and therefore several transfers landing in
3365  * the same buffer from different sending attempts.
3366  */
3367 void ptlrpc_set_bulk_mbits(struct ptlrpc_request *req)
3368 {
3369         struct ptlrpc_bulk_desc *bd = req->rq_bulk;
3370
3371         LASSERT(bd != NULL);
3372
3373         /*
3374          * Generate new matchbits for all resend requests, including
3375          * resend replay.
3376          */
3377         if (req->rq_resend) {
3378                 __u64 old_mbits = req->rq_mbits;
3379
3380                 /*
3381                  * The first resend on -EINPROGRESS will generate a new xid,
3382                  * so we could actually use rq_xid as rq_mbits in that case;
3383                  * however, it is a bit hard to distinguish such a resend from
3384                  * a 'resend of the -EINPROGRESS resend'. To keep it simple,
3385                  * we generate new mbits for all resend cases.
3386                  */
3387                 if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data,
3388                                  BULK_MBITS)) {
3389                         req->rq_mbits = ptlrpc_next_xid();
3390                 } else {
3391                         /*
3392                          * Old version transfers rq_xid to peer as
3393                          * matchbits.
3394                          */
3395                         spin_lock(&req->rq_import->imp_lock);
3396                         list_del_init(&req->rq_unreplied_list);
3397                         ptlrpc_assign_next_xid_nolock(req);
3398                         spin_unlock(&req->rq_import->imp_lock);
3399                         req->rq_mbits = req->rq_xid;
3400                 }
3401                 CDEBUG(D_HA, "resend bulk old x%llu new x%llu\n",
3402                        old_mbits, req->rq_mbits);
3403         } else if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
3404                 /* Request being sent first time, use xid as matchbits. */
3405                 if (OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS)
3406                     || req->rq_mbits == 0) {
3407                         req->rq_mbits = req->rq_xid;
3408                 } else {
3409                         int total_md = (bd->bd_iov_count + LNET_MAX_IOV - 1) /
3410                                         LNET_MAX_IOV;
3411                         req->rq_mbits -= total_md - 1;
3412                 }
3413         } else {
3414                 /*
3415                  * Replay request, xid and matchbits have already been
3416                  * correctly assigned.
3417                  */
3418                 return;
3419         }
3420
3421         /*
3422          * For multi-bulk RPCs, rq_mbits is the last mbits needed for bulks so
3423          * that server can infer the number of bulks that were prepared,
3424          * see LU-1431
3425          */
3426         req->rq_mbits += ((bd->bd_iov_count + LNET_MAX_IOV - 1) /
3427                           LNET_MAX_IOV) - 1;
3428
3429         /*
3430          * Set rq_xid as rq_mbits to indicate the final bulk for the old
3431          * server which does not support OBD_CONNECT_BULK_MBITS. LU-6808.
3432          *
3433          * It's ok to directly set the rq_xid here, since this xid bump
3434          * won't affect the request position in unreplied list.
3435          */
3436         if (!OCD_HAS_FLAG(&bd->bd_import->imp_connect_data, BULK_MBITS))
3437                 req->rq_xid = req->rq_mbits;
3438 }
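
/*
 * Illustrative sketch (not part of this file): for a first-time send with
 * rq_xid == X and a bulk of 2 * LNET_MAX_IOV pages, total_md is 2, so the
 * bulk is expected to consume matchbits X and X + 1 and rq_mbits ends up
 * at X + 1, the last value; a server supporting OBD_CONNECT_BULK_MBITS can
 * then infer the number of bulks from the RPC XID and the bulk-ops mask.
 */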
3439
3440 /**
3441  * Get a glimpse at what the next xid value might be.
3442  * Returns the possible next xid.
3443  */
3444 __u64 ptlrpc_sample_next_xid(void)
3445 {
3446         return atomic64_read(&ptlrpc_last_xid) + PTLRPC_BULK_OPS_COUNT;
3447 }
3448 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
3449
3450 /**
3451  * Functions for operating ptlrpc workers.
3452  *
3453  * A ptlrpc work is a function which runs inside ptlrpc (ptlrpcd) context.
3454  * The callback shouldn't sleep, otherwise it will block that ptlrpcd thread.
3455  *
3456  * 1. After a work is created, it can be used many times, that is:
3457  *         handler = ptlrpcd_alloc_work();
3458  *         ptlrpcd_queue_work();
3459  *
3460  *    queue it again when necessary:
3461  *         ptlrpcd_queue_work();
3462  *         ptlrpcd_destroy_work();
3463  * 2. ptlrpcd_queue_work() can be called by multiple processes concurrently,
3464  *    but the work will only be queued once at any given time. Also, as its name
3465  *    implies, there may be a delay before it actually runs in the ptlrpcd thread.
3466  */
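
/*
 * Illustrative usage sketch (assumption, not code from this file); the
 * callback signature matches ptlrpc_work_async_args below, and my_work_cb
 * and my_data are hypothetical names:
 *
 *	static int my_work_cb(const struct lu_env *env, void *data)
 *	{
 *		return 0;	// must not sleep: runs in ptlrpcd context
 *	}
 *
 *	handler = ptlrpcd_alloc_work(imp, my_work_cb, my_data);
 *	if (!IS_ERR(handler)) {
 *		ptlrpcd_queue_work(handler);	// queue as often as needed
 *		...
 *		ptlrpcd_destroy_work(handler);	// drop the final reference
 *	}
 */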
3467 struct ptlrpc_work_async_args {
3468         int (*cb)(const struct lu_env *, void *);
3469         void *cbdata;
3470 };
3471
3472 static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
3473 {
3474         /* re-initialize the req */
3475         req->rq_timeout         = obd_timeout;
3476         req->rq_sent            = ktime_get_real_seconds();
3477         req->rq_deadline        = req->rq_sent + req->rq_timeout;
3478         req->rq_phase           = RQ_PHASE_INTERPRET;
3479         req->rq_next_phase      = RQ_PHASE_COMPLETE;
3480         req->rq_xid             = ptlrpc_next_xid();
3481         req->rq_import_generation = req->rq_import->imp_generation;
3482
3483         ptlrpcd_add_req(req);
3484 }
3485
3486 static int work_interpreter(const struct lu_env *env,
3487                             struct ptlrpc_request *req, void *args, int rc)
3488 {
3489         struct ptlrpc_work_async_args *arg = args;
3490
3491         LASSERT(ptlrpcd_check_work(req));
3492         LASSERT(arg->cb != NULL);
3493
3494         rc = arg->cb(env, arg->cbdata);
3495
3496         list_del_init(&req->rq_set_chain);
3497         req->rq_set = NULL;
3498
3499         if (atomic_dec_return(&req->rq_refcount) > 1) {
3500                 atomic_set(&req->rq_refcount, 2);
3501                 ptlrpcd_add_work_req(req);
3502         }
3503         return rc;
3504 }
3505
3506 static int worker_format;
3507
3508 static int ptlrpcd_check_work(struct ptlrpc_request *req)
3509 {
3510         return req->rq_pill.rc_fmt == (void *)&worker_format;
3511 }
3512
3513 /**
3514  * Create a work for ptlrpc.
3515  */
3516 void *ptlrpcd_alloc_work(struct obd_import *imp,
3517                          int (*cb)(const struct lu_env *, void *), void *cbdata)
3518 {
3519         struct ptlrpc_request *req = NULL;
3520         struct ptlrpc_work_async_args *args;
3521
3522         ENTRY;
3523         might_sleep();
3524
3525         if (!cb)
3526                 RETURN(ERR_PTR(-EINVAL));
3527
3528         /* copy some code from deprecated fakereq. */
3529         req = ptlrpc_request_cache_alloc(GFP_NOFS);
3530         if (!req) {
3531                 CERROR("ptlrpc: run out of memory!\n");
3532                 RETURN(ERR_PTR(-ENOMEM));
3533         }
3534
3535         ptlrpc_cli_req_init(req);
3536
3537         req->rq_send_state = LUSTRE_IMP_FULL;
3538         req->rq_type = PTL_RPC_MSG_REQUEST;
3539         req->rq_import = class_import_get(imp);
3540         req->rq_interpret_reply = work_interpreter;
3541         /* don't want reply */
3542         req->rq_no_delay = req->rq_no_resend = 1;
3543         req->rq_pill.rc_fmt = (void *)&worker_format;
3544
3545         args = ptlrpc_req_async_args(args, req);
3546         args->cb     = cb;
3547         args->cbdata = cbdata;
3548
3549         RETURN(req);
3550 }
3551 EXPORT_SYMBOL(ptlrpcd_alloc_work);
3552
3553 void ptlrpcd_destroy_work(void *handler)
3554 {
3555         struct ptlrpc_request *req = handler;
3556
3557         if (req)
3558                 ptlrpc_req_finished(req);
3559 }
3560 EXPORT_SYMBOL(ptlrpcd_destroy_work);
3561
3562 int ptlrpcd_queue_work(void *handler)
3563 {
3564         struct ptlrpc_request *req = handler;
3565
3566         /*
3567          * Check if the req is already being queued.
3568          *
3569          * Here comes a trick: ptlrpc lacks a reliable way of checking if
3570          * a req is being processed, so the req refcount is used for this
3571          * purpose. This is okay because the caller should treat this req
3572          * as opaque data. - Jinshan
3573          */
3574         LASSERT(atomic_read(&req->rq_refcount) > 0);
3575         if (atomic_inc_return(&req->rq_refcount) == 2)
3576                 ptlrpcd_add_work_req(req);
3577         return 0;
3578 }
3579 EXPORT_SYMBOL(ptlrpcd_queue_work);