1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_RPC
38 #ifndef __KERNEL__
39 #include <errno.h>
40 #include <signal.h>
41 #include <liblustre.h>
42 #endif
43
44 #include <obd_support.h>
45 #include <obd_class.h>
46 #include <lustre_lib.h>
47 #include <lustre_ha.h>
48 #include <lustre_import.h>
49
50 #include "ptlrpc_internal.h"
51
52 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
53                         struct ptlrpc_client *cl)
54 {
55         cl->cli_request_portal = req_portal;
56         cl->cli_reply_portal   = rep_portal;
57         cl->cli_name           = name;
58 }
59
60 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
61 {
62         struct ptlrpc_connection *c;
63         lnet_nid_t                self;
64         lnet_process_id_t         peer;
65         int                       err;
66
67         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
68         if (err != 0) {
69                 CNETERR("cannot find peer %s!\n", uuid->uuid);
70                 return NULL;
71         }
72
73         c = ptlrpc_connection_get(peer, self, uuid);
74         if (c) {
75                 memcpy(c->c_remote_uuid.uuid,
76                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
77         }
78
79         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
80
81         return c;
82 }
83
84 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
85 {
86         struct ptlrpc_bulk_desc *desc;
87
88         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
89         if (!desc)
90                 return NULL;
91
92         spin_lock_init(&desc->bd_lock);
93         cfs_waitq_init(&desc->bd_waitq);
94         desc->bd_max_iov = npages;
95         desc->bd_iov_count = 0;
96         desc->bd_md_h = LNET_INVALID_HANDLE;
97         desc->bd_portal = portal;
98         desc->bd_type = type;
99
100         return desc;
101 }
102
103 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
104                                               int npages, int type, int portal)
105 {
106         struct obd_import *imp = req->rq_import;
107         struct ptlrpc_bulk_desc *desc;
108
109         ENTRY;
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc, and frees it when the request itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp(struct ptlrpc_request *req,
129                                               int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         ENTRY;
135         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
136
137         desc = new_bulk(npages, type, portal);
138         if (desc == NULL)
139                 RETURN(NULL);
140
141         desc->bd_export = class_export_get(exp);
142         desc->bd_req = req;
143
144         desc->bd_cbid.cbid_fn  = server_bulk_callback;
145         desc->bd_cbid.cbid_arg = desc;
146
147         /* NB we don't assign rq_bulk here; server-side requests are
148          * re-used, and the handler frees the bulk desc explicitly. */
149
150         return desc;
151 }
152
153 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
154                            cfs_page_t *page, int pageoffset, int len)
155 {
156         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
157         LASSERT(page != NULL);
158         LASSERT(pageoffset >= 0);
159         LASSERT(len > 0);
160         LASSERT(pageoffset + len <= CFS_PAGE_SIZE);
161
162         desc->bd_nob += len;
163
164         cfs_page_pin(page);
165         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
166 }
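
/*
 * A minimal usage sketch for the bulk helpers above: attach a sink
 * descriptor to a client request for a bulk read (the server PUTs the
 * data), then add whole pages to it.  The page array, its length and
 * OST_BULK_PORTAL are assumptions of the example, which is kept under
 * #if 0 because it is illustrative only.
 */
#if 0
static int example_attach_read_bulk(struct ptlrpc_request *req,
                                    cfs_page_t **pages, int npages)
{
        struct ptlrpc_bulk_desc *desc;
        int i;

        /* the client is the sink of a server PUT for a read */
        desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
                                    OST_BULK_PORTAL);
        if (desc == NULL)
                return -ENOMEM;

        /* one full page per iov slot; req now owns desc via rq_bulk */
        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);

        return 0;
}
#endif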
167
168 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
169 {
170         int i;
171         ENTRY;
172
173         LASSERT(desc != NULL);
174         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
175         LASSERT(!desc->bd_network_rw);         /* network hands off */
176         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
177         if (desc->bd_export)
178                 class_export_put(desc->bd_export);
179         else
180                 class_import_put(desc->bd_import);
181
182         for (i = 0; i < desc->bd_iov_count ; i++)
183                 cfs_page_unpin(desc->bd_iov[i].kiov_page);
184
185         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
186                                 bd_iov[desc->bd_max_iov]));
187         EXIT;
188 }
189
190 /* Set server timelimit for this req */
191 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
192 {
193         __u32 serv_est;
194         int idx;
195         struct imp_at *at;
196
197         LASSERT(req->rq_import);
198
199         if (AT_OFF) {
200                 /* non-AT settings */
201                 req->rq_timeout = req->rq_import->imp_server_timeout ?
202                                   obd_timeout / 2 : obd_timeout;
203         } else {
204                 at = &req->rq_import->imp_at;
205                 idx = import_at_get_index(req->rq_import,
206                                           req->rq_request_portal);
207                 serv_est = at_get(&at->iat_service_estimate[idx]);
208                 req->rq_timeout = at_est2timeout(serv_est);
209         }
210
211         /* We could get even fancier here, using history to predict increased
212            loading... */
213
214         /* Let the server know what this RPC timeout is by putting it in the
215            reqmsg */
216         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
217 }
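
/*
 * Worked example (values are the usual defaults and are assumptions here):
 * with adaptive timeouts off and obd_timeout at its default of 100s,
 * rq_timeout becomes 100s, or 50s if the import has imp_server_timeout set.
 * With AT on and a current service estimate of 10s for this portal,
 * rq_timeout becomes at_est2timeout(10), i.e. the estimate plus a safety
 * margin.
 */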
218
219 /* Adjust max service estimate based on server value */
220 static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
221                                   unsigned int serv_est)
222 {
223         int idx;
224         unsigned int oldse;
225         struct imp_at *at;
226
227         LASSERT(req->rq_import);
228         at = &req->rq_import->imp_at;
229
230         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
231         /* max service estimates are tracked on the server side,
232            so just keep minimal history here */
233         oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
234         if (oldse != 0)
235                 CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "
236                        "has changed from %d to %d\n",
237                        req->rq_import->imp_obd->obd_name,req->rq_request_portal,
238                        oldse, at_get(&at->iat_service_estimate[idx]));
239 }
240
241 /* Expected network latency per remote node (secs) */
242 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
243 {
244         return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
245 }
246
247 /* Adjust expected network latency */
248 static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
249                                       unsigned int service_time)
250 {
251         unsigned int nl, oldnl;
252         struct imp_at *at;
253         time_t now = cfs_time_current_sec();
254
255         LASSERT(req->rq_import);
256         at = &req->rq_import->imp_at;
257
258         /* Network latency is total time less server processing time */
259         nl = max_t(int, now - req->rq_sent - service_time, 0) + 1/*st rounding*/;
260         if (service_time > now - req->rq_sent + 3 /* bz16408 */)
261                 CWARN("Reported service time %u > total measured time %ld\n",
262                       service_time, now - req->rq_sent);
263
264         oldnl = at_measured(&at->iat_net_latency, nl);
265         if (oldnl != 0)
266                 CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "
267                        "has changed from %d to %d\n",
268                        req->rq_import->imp_obd->obd_name,
269                        obd_uuid2str(
270                                &req->rq_import->imp_connection->c_remote_uuid),
271                        oldnl, at_get(&at->iat_net_latency));
272 }
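
/*
 * Worked example: if the request was sent at rq_sent = 1000s, the reply is
 * processed at now = 1007s and the server reports service_time = 5s, then
 * nl = max(1007 - 1000 - 5, 0) + 1 = 3s (the +1 absorbs the server's
 * rounding of its service time).  A reported service_time of 11s would
 * exceed now - rq_sent + 3 = 10s and trigger the CWARN above.
 */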
273
274 static int unpack_reply_common(struct ptlrpc_request *req)
275 {
276         int rc;
277
278         req->rq_rep_swab_mask = 0;
279         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
280         if (rc < 0) {
281                 DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
282                 return(-EPROTO);
283         }
284
285         if (rc > 0)
286                 lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
287
288         return rc;
289 }
290
291 static int unpack_reply(struct ptlrpc_request *req)
292 {
293         int rc;
294
295         rc = unpack_reply_common(req);
296         if (rc < 0)
297                 return rc;
298
299         rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
300         if (rc) {
301                 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
302                 return(-EPROTO);
303         }
304         return 0;
305 }
306
307 static inline void unpack_reply_free_msg(struct lustre_msg *msg, int len)
308 {
309         OBD_FREE(msg, len);
310 }
311
312 static int unpack_reply_copy_msg(struct ptlrpc_request *req,
313                                  struct lustre_msg **msg, int *len)
314 {
315         struct lustre_msg *msgcpy;
316         __u32 csum_calc, csum_get;
317         int lencpy, rc;
318         ENTRY;
319
320         LASSERT_SPIN_LOCKED(&req->rq_lock);
321         *msg = NULL;
322         *len = 0;
323
324         /* Swabbing required when rc == 1 */
325         rc = unpack_reply_common(req);
326         if (rc < 0)
327                 RETURN(rc);
328
329         lencpy = req->rq_replen;
330         spin_unlock(&req->rq_lock);
331
332         OBD_ALLOC(msgcpy, lencpy);
333         if (!msgcpy) {
334                 spin_lock(&req->rq_lock);
335                 RETURN(-ENOMEM);
336         }
337         spin_lock(&req->rq_lock);
338
339         /* Checksum must be calculated before the message is unswabbed.  If the
340          * magic in the copy shows it needs swabbing, discard it as a checksum failure */
341         memcpy(msgcpy, req->rq_repmsg, lencpy);
342         if (lustre_msg_need_swab(msgcpy)) {
343                 DEBUG_REQ(D_NET, req, "incorrect message magic: %08x\n",
344                           msgcpy->lm_magic);
345                 GOTO(err, rc = -EINVAL);
346         }
347
348         csum_calc = lustre_msg_calc_cksum(msgcpy);
349
350         /* Unpack the copy; the original rq_repmsg is untouched */
351         rc = lustre_unpack_msg_ptlrpc_body(msgcpy, MSG_PTLRPC_BODY_OFF, rc);
352         if (rc) {
353                 DEBUG_REQ(D_ERROR, req, "unpack msg copy failed: %d", rc);
354                 GOTO(err, rc = -EPROTO);
355         }
356
357         /* For early replies the LND may update repmsg outside req->rq_lock,
358          * resulting in a checksum failure which is harmless */
359         csum_get = lustre_msg_get_cksum(msgcpy);
360         if (csum_calc != csum_get) {
361                 DEBUG_REQ(D_NET, req, "checksum mismatch: %x != %x\n",
362                           csum_calc, csum_get);
363                 GOTO(err, rc = -EINVAL);
364         }
365
366         *msg = msgcpy;
367         *len = lencpy;
368         return 0;
369 err:
370         unpack_reply_free_msg(msgcpy, lencpy);
371         return rc;
372 }
373
374 /* Handle an early reply message.  To prevent a real reply from arriving
375  * and changing req->rq_repmsg this func is called under the rq_lock */
376 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
377         struct lustre_msg *msg;
378         time_t olddl;
379         int len, rc;
380         ENTRY;
381
382         LASSERT_SPIN_LOCKED(&req->rq_lock);
383         req->rq_early = 0;
384
385         /* All early replies for this request use a single repbuf which can
386          * be updated outside the req->rq_lock.  To prevent racing we create
387          * a copy of the repmsg and verify its checksum before it is used. */
388         rc = unpack_reply_copy_msg(req, &msg, &len);
389         if (rc) {
390                 /* Let's just ignore it - same as if it never got here */
391                 CDEBUG(D_ADAPTTO, "Discarding racing early reply: %d\n", rc);
392                 RETURN(rc);
393         }
394
395         /* Expecting to increase the service time estimate here */
396         ptlrpc_at_adj_service(req, lustre_msg_get_timeout(msg));
397         ptlrpc_at_adj_net_latency(req, lustre_msg_get_service_time(msg));
398         /* Adjust the local timeout for this req */
399         ptlrpc_at_set_req_timeout(req);
400
401         olddl = req->rq_deadline;
402         /* Server assumes it now has rq_timeout from when it sent the
403            early reply, so client should give it at least that long. */
404         req->rq_deadline = cfs_time_current_sec() + req->rq_timeout +
405                            ptlrpc_at_get_net_latency(req);
406
407         DEBUG_REQ(D_ADAPTTO, req,
408                   "Early reply #%d, new deadline in %lds (%+lds)",
409                   req->rq_early_count, req->rq_deadline -
410                   cfs_time_current_sec(), req->rq_deadline - olddl);
411
412         unpack_reply_free_msg(msg, len);
413         RETURN(rc);
414 }
415
416 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
417 {
418         struct list_head *l, *tmp;
419         struct ptlrpc_request *req;
420
421         LASSERT(pool != NULL);
422
423         spin_lock(&pool->prp_lock);
424         list_for_each_safe(l, tmp, &pool->prp_req_list) {
425                 req = list_entry(l, struct ptlrpc_request, rq_list);
426                 list_del(&req->rq_list);
427                 LASSERT(req->rq_reqmsg);
428                 OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
429                 OBD_FREE(req, sizeof(*req));
430         }
431         spin_unlock(&pool->prp_lock);
432         OBD_FREE(pool, sizeof(*pool));
433 }
434
435 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
436 {
437         int i;
438         int size = 1;
439
440         while (size < pool->prp_rq_size)
441                 size <<= 1;
442
443         LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
444                  "Trying to change pool size with nonempty pool "
445                  "from %d to %d bytes\n", pool->prp_rq_size, size);
446
447         spin_lock(&pool->prp_lock);
448         pool->prp_rq_size = size;
449         for (i = 0; i < num_rq; i++) {
450                 struct ptlrpc_request *req;
451                 struct lustre_msg *msg;
452
453                 spin_unlock(&pool->prp_lock);
454                 OBD_ALLOC(req, sizeof(struct ptlrpc_request));
455                 if (!req)
456                         return;
457                 OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
458                 if (!msg) {
459                         OBD_FREE(req, sizeof(struct ptlrpc_request));
460                         return;
461                 }
462                 req->rq_reqmsg = msg;
463                 req->rq_pool = pool;
464                 spin_lock(&pool->prp_lock);
465                 list_add_tail(&req->rq_list, &pool->prp_req_list);
466         }
467         spin_unlock(&pool->prp_lock);
468         return;
469 }
470
471 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
472                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int))
473 {
474         struct ptlrpc_request_pool *pool;
475
476         OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
477         if (!pool)
478                 return NULL;
479
480         /* Request the next power of two for the allocation, because the
481            kernel would round up to it internally anyway */
482
483         spin_lock_init(&pool->prp_lock);
484         CFS_INIT_LIST_HEAD(&pool->prp_req_list);
485         pool->prp_rq_size = msgsize;
486         pool->prp_populate = populate_pool;
487
488         populate_pool(pool, num_rq);
489
490         if (list_empty(&pool->prp_req_list)) {
491                 /* have not allocated a single request for the pool */
492                 OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
493                 pool = NULL;
494         }
495         return pool;
496 }
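
/*
 * A minimal usage sketch for the pool interface above: pre-allocate a few
 * requests with a fixed message size, using ptlrpc_add_rqs_to_pool() as the
 * populate callback.  The counts and sizes are arbitrary example values;
 * kept under #if 0 because it is illustrative only.
 */
#if 0
static struct ptlrpc_request_pool *example_make_pool(void)
{
        /* 8 pre-allocated requests with 64 KiB message buffers; the pool is
         * refilled through the populate callback when it runs dry */
        return ptlrpc_init_rq_pool(8, 65536, ptlrpc_add_rqs_to_pool);
}
#endif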
497
498 static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
499 {
500         struct ptlrpc_request *request;
501         struct lustre_msg *reqmsg;
502
503         if (!pool)
504                 return NULL;
505
506         spin_lock(&pool->prp_lock);
507
508         /* See if we have anything in the pool, and bail out if not.  In the
509          * writeout path, where this matters, this is safe to do because
510          * nothing is lost in that case: when some in-flight requests
511          * complete, this code will be called again. */
512         if (unlikely(list_empty(&pool->prp_req_list))) {
513                 spin_unlock(&pool->prp_lock);
514                 return NULL;
515         }
516
517         request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
518                              rq_list);
519         list_del_init(&request->rq_list);
520         spin_unlock(&pool->prp_lock);
521
522         LASSERT(request->rq_reqmsg);
523         LASSERT(request->rq_pool);
524
525         reqmsg = request->rq_reqmsg;
526         memset(request, 0, sizeof(*request));
527         request->rq_reqmsg = reqmsg;
528         request->rq_pool = pool;
529         request->rq_reqlen = pool->prp_rq_size;
530         return request;
531 }
532
533 struct ptlrpc_request *
534 ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
535                      int count, __u32 *lengths, char **bufs,
536                      struct ptlrpc_request_pool *pool)
537 {
538         struct ptlrpc_request *request = NULL;
539         int rc;
540         ENTRY;
541
542         /* The obd disconnected */
543         if (imp == NULL)
544                 return NULL;
545
546         LASSERT(imp != LP_POISON);
547         LASSERT((unsigned long)imp->imp_client > 0x1000);
548         LASSERT(imp->imp_client != LP_POISON);
549
550         if (pool)
551                 request = ptlrpc_prep_req_from_pool(pool);
552
553         if (!request)
554                 OBD_ALLOC(request, sizeof(*request));
555
556         if (!request) {
557                 CERROR("request allocation out of memory\n");
558                 RETURN(NULL);
559         }
560
561         rc = lustre_pack_request(request, imp->imp_msg_magic, count, lengths,
562                                  bufs);
563         if (rc) {
564                 LASSERT(!request->rq_pool);
565                 OBD_FREE(request, sizeof(*request));
566                 RETURN(NULL);
567         }
568
569         lustre_msg_add_version(request->rq_reqmsg, version);
570         request->rq_send_state = LUSTRE_IMP_FULL;
571         request->rq_type = PTL_RPC_MSG_REQUEST;
572         request->rq_import = class_import_get(imp);
573         request->rq_export = NULL;
574
575         request->rq_req_cbid.cbid_fn  = request_out_callback;
576         request->rq_req_cbid.cbid_arg = request;
577
578         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
579         request->rq_reply_cbid.cbid_arg = request;
580
581         request->rq_reply_deadline = 0;
582         request->rq_phase = RQ_PHASE_NEW;
583         request->rq_next_phase = RQ_PHASE_UNDEFINED;
584
585         request->rq_request_portal = imp->imp_client->cli_request_portal;
586         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
587
588         ptlrpc_at_set_req_timeout(request);
589
590         spin_lock_init(&request->rq_lock);
591         CFS_INIT_LIST_HEAD(&request->rq_list);
592         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
593         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
594         CFS_INIT_LIST_HEAD(&request->rq_history_list);
595         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
596         cfs_waitq_init(&request->rq_reply_waitq);
597         cfs_waitq_init(&request->rq_set_waitq);
598         request->rq_xid = ptlrpc_next_xid();
599         atomic_set(&request->rq_refcount, 1);
600
601         lustre_msg_set_opc(request->rq_reqmsg, opcode);
602
603         RETURN(request);
604 }
605
606 struct ptlrpc_request *
607 ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
608                 __u32 *lengths, char **bufs)
609 {
610         return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
611                                     NULL);
612 }
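
/*
 * A minimal usage sketch for ptlrpc_prep_req(), mirroring how the pinger
 * uses it: a request with only the ptlrpc body and no extra payload.
 * LUSTRE_OBD_VERSION, OBD_PING and ptlrpc_req_set_repsize() are assumed to
 * be available from elsewhere in the tree; kept under #if 0 because it is
 * illustrative only.
 */
#if 0
static struct ptlrpc_request *example_prep_ping(struct obd_import *imp)
{
        struct ptlrpc_request *req;

        /* one message buffer (the ptlrpc body), no payload buffers */
        req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
                              1, NULL, NULL);
        if (req == NULL)
                return NULL;

        /* the reply also carries just the ptlrpc body */
        ptlrpc_req_set_repsize(req, 1, NULL);
        return req;
}
#endif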
613
614 struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
615                                            unsigned int timeout,
616                                            int (*interpreter)(struct ptlrpc_request *,
617                                                               void *, int))
618 {
619         struct ptlrpc_request *request = NULL;
620         ENTRY;
621
622         OBD_ALLOC(request, sizeof(*request));
623         if (!request) {
624                 CERROR("request allocation out of memory\n");
625                 RETURN(NULL);
626         }
627
628         request->rq_send_state = LUSTRE_IMP_FULL;
629         request->rq_type = PTL_RPC_MSG_REQUEST;
630         request->rq_import = class_import_get(imp);
631         request->rq_export = NULL;
632         request->rq_import_generation = imp->imp_generation;
633
634         request->rq_timeout = timeout;
635         request->rq_sent = cfs_time_current_sec();
636         request->rq_deadline = request->rq_sent + timeout;
637         request->rq_reply_deadline = request->rq_deadline;
638         request->rq_interpret_reply = interpreter;
639         request->rq_phase = RQ_PHASE_RPC;
640         request->rq_next_phase = RQ_PHASE_INTERPRET;
641         /* don't want reply */
642         request->rq_receiving_reply = 0;
643         request->rq_must_unlink = 0;
644         request->rq_no_delay = request->rq_no_resend = 1;
645         request->rq_fake = 1;
646
647         spin_lock_init(&request->rq_lock);
648         CFS_INIT_LIST_HEAD(&request->rq_list);
649         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
650         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
651         CFS_INIT_LIST_HEAD(&request->rq_history_list);
652         CFS_INIT_LIST_HEAD(&request->rq_exp_list);
653         cfs_waitq_init(&request->rq_reply_waitq);
654         cfs_waitq_init(&request->rq_set_waitq);
655
656         request->rq_xid = ptlrpc_next_xid();
657         atomic_set(&request->rq_refcount, 1);
658
659         RETURN(request);
660 }
661
662 void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
663 {
664         /* if we kill the request before it times out, adjust the counter */
665         if (req->rq_phase == RQ_PHASE_RPC) {
666                 struct ptlrpc_request_set *set = req->rq_set;
667
668                 if (set)
669                         atomic_dec(&set->set_remaining);
670         }
671
672         ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
673         list_del_init(&req->rq_list);
674 }
675
676
677 struct ptlrpc_request_set *ptlrpc_prep_set(void)
678 {
679         struct ptlrpc_request_set *set;
680
681         ENTRY;
682         OBD_ALLOC(set, sizeof *set);
683         if (!set)
684                 RETURN(NULL);
685         CFS_INIT_LIST_HEAD(&set->set_requests);
686         cfs_waitq_init(&set->set_waitq);
687         atomic_set(&set->set_remaining, 0);
688         spin_lock_init(&set->set_new_req_lock);
689         CFS_INIT_LIST_HEAD(&set->set_new_requests);
690         CFS_INIT_LIST_HEAD(&set->set_cblist);
691
692         RETURN(set);
693 }
694
695 /* Finish with this set; opposite of prep_set. */
696 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
697 {
698         struct list_head *tmp;
699         struct list_head *next;
700         int               expected_phase;
701         int               n = 0;
702         ENTRY;
703
704         /* Requests on the set should either all be completed, or all be new */
705         expected_phase = (atomic_read(&set->set_remaining) == 0) ?
706                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
707         list_for_each (tmp, &set->set_requests) {
708                 struct ptlrpc_request *req =
709                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
710
711                 LASSERT(req->rq_phase == expected_phase);
712                 n++;
713         }
714
715         LASSERTF(atomic_read(&set->set_remaining) == 0 || 
716                  atomic_read(&set->set_remaining) == n, "%d / %d\n",
717                  atomic_read(&set->set_remaining), n);
718
719         list_for_each_safe(tmp, next, &set->set_requests) {
720                 struct ptlrpc_request *req =
721                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
722                 list_del_init(&req->rq_set_chain);
723
724                 LASSERT(req->rq_phase == expected_phase);
725
726                 if (req->rq_phase == RQ_PHASE_NEW) {
727
728                         if (req->rq_interpret_reply != NULL) {
729                                 int (*interpreter)(struct ptlrpc_request *,
730                                                    void *, int) =
731                                         req->rq_interpret_reply;
732
733                                 /* higher level (i.e. LOV) failed;
734                                  * let the sub reqs clean up */
735                                 req->rq_status = -EBADR;
736                                 interpreter(req, &req->rq_async_args,
737                                             req->rq_status);
738                         }
739                         atomic_dec(&set->set_remaining);
740                 }
741
742                 spin_lock(&req->rq_lock);
743                 req->rq_set = NULL;
744                 req->rq_invalid_rqset = 0;
745                 spin_unlock(&req->rq_lock);
746
747                 cfs_waitq_signal(&req->rq_set_waitq);
748                 ptlrpc_req_finished (req);
749         }
750
751         LASSERT(atomic_read(&set->set_remaining) == 0);
752
753         OBD_FREE(set, sizeof(*set));
754         EXIT;
755 }
756
757 int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
758                       set_interpreter_func fn, void *data)
759 {
760         struct ptlrpc_set_cbdata *cbdata;
761
762         OBD_SLAB_ALLOC(cbdata, ptlrpc_cbdata_slab,
763                         CFS_ALLOC_STD, sizeof(*cbdata));
764         if (cbdata == NULL)
765                 RETURN(-ENOMEM);
766
767         cbdata->psc_interpret = fn;
768         cbdata->psc_data = data;
769         list_add_tail(&cbdata->psc_item, &set->set_cblist);
770
771         RETURN(0);
772 }
773
774 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
775                         struct ptlrpc_request *req)
776 {
777         /* The set takes over the caller's request reference */
778         LASSERT(list_empty(&req->rq_set_chain));
779         list_add_tail(&req->rq_set_chain, &set->set_requests);
780         req->rq_set = set;
781         atomic_inc(&set->set_remaining);
782 }
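
/*
 * A minimal usage sketch of the request-set lifecycle: create a set, hand a
 * prepared request over to it, wait for completion and tear the set down.
 * ptlrpc_set_wait() is assumed to be available (it is defined further down
 * in this file); kept under #if 0 because it is illustrative only.
 */
#if 0
static int example_send_via_set(struct ptlrpc_request *req)
{
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        /* the set takes over the caller's reference on req */
        ptlrpc_set_add_req(set, req);

        /* send everything in the set and wait until all requests complete */
        rc = ptlrpc_set_wait(set);

        /* drops the set's references on its requests */
        ptlrpc_set_destroy(set);
        return rc;
}
#endif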
783
784 /**
785  * Lock so that many callers can add things; the context that owns the set
786  * is supposed to notice these and move them into the set proper.
787  */
788 int ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
789                            struct ptlrpc_request *req)
790 {
791         struct ptlrpc_request_set *set = pc->pc_set;
792
793         /*
794          * Let the caller know that we stopped and will not handle this
795          * request; the caller must take care of the request itself.
796          */
797         if (test_bit(LIOD_STOP, &pc->pc_flags))
798                 return -EALREADY;
799
800         spin_lock(&set->set_new_req_lock);
801         /* 
802          * The set takes over the caller's request reference. 
803          */
804         LASSERT(list_empty(&req->rq_set_chain));
805         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
806         req->rq_set = set;
807         spin_unlock(&set->set_new_req_lock);
808
809         /*
810          * Let the thread know that we added something so it will wake up
811          * and process it.
812          */
813         cfs_waitq_signal(&set->set_waitq);
814         return 0;
815 }
816
817 /*
818  * Based on the current state of the import, determine if the request
819  * can be sent, is an error, or should be delayed.
820  *
821  * Returns true if this request should be delayed. If false and *status
822  * is non-zero, the request cannot be sent and *status is the error code.
823  * If false and *status is 0, the request can be sent.
824  *
825  * The imp->imp_lock must be held.
826  */
827 static int ptlrpc_import_delay_req(struct obd_import *imp,
828                                    struct ptlrpc_request *req, int *status)
829 {
830         int delay = 0;
831         ENTRY;
832
833         LASSERT (status != NULL);
834         *status = 0;
835
836         if (imp->imp_state == LUSTRE_IMP_NEW) {
837                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
838                 *status = -EIO;
839                 LBUG();
840         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
841                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
842                 *status = -EIO;
843         } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
844                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
845                 /* allow CONNECT even if import is invalid */ ;
846                 if (atomic_read(&imp->imp_inval_count) != 0) {
847                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
848                         *status = -EIO;
849                 }
850         } else if ((imp->imp_invalid && (!imp->imp_recon_bk)) ||
851                                          imp->imp_obd->obd_no_recov) {
852                 /* If the import has been invalidated (such as by an OST
853  * failure), and if the import (MGC) tried all of its connection
854                  * list (Bug 13464), the request must fail with -ESHUTDOWN.
855                  * This indicates the requests should be discarded; an -EIO
856                  * may result in a resend of the request. */
857                 if (!imp->imp_deactive)
858                         DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
859                 *status = -ESHUTDOWN; /* bz 12940 */
860         } else if (req->rq_import_generation != imp->imp_generation) {
861                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
862                 *status = -EIO;
863         } else if (req->rq_send_state != imp->imp_state) {
864                 /* invalidate in progress - any requests should be dropped */
865                 if (atomic_read(&imp->imp_inval_count) != 0) {
866                         DEBUG_REQ(D_ERROR, req, "invalidate in flight");
867                         *status = -EIO;
868                 } else if (imp->imp_dlm_fake || req->rq_no_delay) {
869                         *status = -EWOULDBLOCK;
870                 } else {
871                         delay = 1;
872                 }
873         }
874
875         RETURN(delay);
876 }
877
878 static int ptlrpc_check_reply(struct ptlrpc_request *req)
879 {
880         int rc = 0;
881         const char *what = "";
882         ENTRY;
883
884         /* serialise with network callback */
885         spin_lock(&req->rq_lock);
886
887         if (ptlrpc_client_replied(req)) {
888                 what = "REPLIED: ";
889                 GOTO(out, rc = 1);
890         }
891
892         if (req->rq_net_err && !req->rq_timedout) {
893                 what = "NETERR: ";
894                 spin_unlock(&req->rq_lock);
895                 rc = ptlrpc_expire_one_request(req, 0);
896                 spin_lock(&req->rq_lock);
897                 GOTO(out, rc);
898         }
899
900         if (req->rq_err) {
901                 what = "ABORTED: ";
902                 GOTO(out, rc = 1);
903         }
904
905         if (req->rq_resend) {
906                 what = "RESEND: ";
907                 GOTO(out, rc = 1);
908         }
909
910         if (req->rq_restart) {
911                 what = "RESTART: ";
912                 GOTO(out, rc = 1);
913         }
914
915         if (ptlrpc_client_early(req)) {
916                 what = "EARLYREP: ";
917                 ptlrpc_at_recv_early_reply(req);
918                 GOTO(out, rc = 0); /* keep waiting */
919         }
920
921         EXIT;
922  out:
923         spin_unlock(&req->rq_lock);
924         DEBUG_REQ(D_NET, req, "%src = %d for", what, rc);
925         return rc;
926 }
927
928 /* Conditionally suppress specific console messages */
929 static int ptlrpc_console_allow(struct ptlrpc_request *req)
930 {
931         __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
932         int err;
933
934         /* Suppress particular reconnect errors which are to be expected.  No
935          * errors are suppressed for the initial connection on an import */
936         if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) &&
937             (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) {
938
939                 /* Suppress timed out reconnect requests */
940                 if (req->rq_timedout)
941                         return 0;
942
943                 /* Suppress unavailable/again reconnect requests */
944                 err = lustre_msg_get_status(req->rq_repmsg);
945                 if (err == -ENODEV || err == -EAGAIN)
946                         return 0;
947         }
948
949         return 1;
950 }
951
952 static int ptlrpc_check_status(struct ptlrpc_request *req)
953 {
954         int err;
955         ENTRY;
956
957         err = lustre_msg_get_status(req->rq_repmsg);
958         if (err < 0) {
959                 DEBUG_REQ(D_INFO, req, "status is %d", err);
960         } else if (err > 0) {
961                 /* XXX: translate this error from net to host */
962                 DEBUG_REQ(D_INFO, req, "status is %d", err);
963         }
964
965         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
966                 struct obd_import *imp = req->rq_import;
967                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
968
969                 if (ptlrpc_console_allow(req))
970                         LCONSOLE_ERROR_MSG(0x011,"an error occurred while "
971                                            "communicating with %s. The %s "
972                                            "operation failed with %d\n",
973                                            libcfs_nid2str(
974                                            imp->imp_connection->c_peer.nid),
975                                            ll_opcode2str(opc), err);
976
977                 RETURN(err < 0 ? err : -EINVAL);
978         }
979
980         RETURN(err);
981 }
982
983 /* VBR: we should save pre-versions for replay */
984 static void ptlrpc_save_versions(struct ptlrpc_request *req)
985 {
986         struct lustre_msg *repmsg = req->rq_repmsg;
987         struct lustre_msg *reqmsg = req->rq_reqmsg;
988         __u64 *versions = lustre_msg_get_versions(repmsg);
989         ENTRY;
990         /* Interoperability with 1.6. This should be changed to LASSERT in HEAD */
991         if (versions == NULL)
992                 return;
993
994         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
995                 return;
996
997         lustre_msg_set_versions(reqmsg, versions);
998         CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
999                versions[0], versions[1]);
1000
1001         EXIT;
1002 }
1003
1004 static int after_reply(struct ptlrpc_request *req)
1005 {
1006         struct obd_import *imp = req->rq_import;
1007         struct obd_device *obd = req->rq_import->imp_obd;
1008         int rc;
1009         struct timeval work_start;
1010         long timediff;
1011         ENTRY;
1012
1013         /* repbuf must be unlinked */
1014         LASSERT(!req->rq_receiving_reply && !req->rq_must_unlink);
1015         LASSERT(obd);
1016
1017         /* NB Until this point, the whole of the incoming message,
1018          * including buflens, status etc is in the sender's byte order. */
1019
1020         if (req->rq_reply_truncate) {
1021                 if (req->rq_no_resend) {
1022                         DEBUG_REQ(D_ERROR, req, "reply buffer overflow,"
1023                                   " expected: %d, actual size: %d",
1024                                   req->rq_nob_received, req->rq_replen);
1025                         RETURN(-EOVERFLOW);
1026                 }
1027
1028                 OBD_FREE(req->rq_repbuf, req->rq_replen);
1029                 req->rq_repbuf = NULL;
1030                 /* Pass the required reply buffer size (including
1031                  * space for an early reply) */
1032                 req->rq_replen       = size_round(req->rq_nob_received);
1033                 req->rq_nob_received = 0;
1034
1035                 spin_lock(&req->rq_lock);
1036                 req->rq_resend       = 1;
1037                 spin_unlock(&req->rq_lock);
1038                 RETURN(0);
1039         }
1040
1041         LASSERT ((char *)req->rq_repmsg + req->rq_nob_received <=
1042                  (char *)req->rq_repbuf + req->rq_replen);
1043         rc = unpack_reply(req);
1044         if (rc)
1045                 RETURN(rc);
1046
1047         do_gettimeofday(&work_start);
1048         timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
1049         if (obd->obd_svc_stats != NULL) {
1050                 lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
1051                                     timediff);
1052                 ptlrpc_lprocfs_rpc_sent(req, timediff);
1053         }
1054
1055         if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1056                 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
1057         ptlrpc_at_adj_service(req, lustre_msg_get_timeout(req->rq_repmsg));
1058         ptlrpc_at_adj_net_latency(req, lustre_msg_get_service_time(req->rq_repmsg));
1059
1060         if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
1061             lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
1062                 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
1063                           lustre_msg_get_type(req->rq_repmsg));
1064                 RETURN(-EPROTO);
1065         }
1066
1067         rc = ptlrpc_check_status(req);
1068         if (rc) {
1069                 /* Either we've been evicted, or the server has failed for
1070                  * some reason. Try to reconnect, and if that fails, punt to
1071                  * the upcall. */
1072                 if (ll_rpc_recoverable_error(rc)) {
1073                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
1074                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
1075                                 RETURN(rc);
1076                         }
1077                         ptlrpc_request_handle_notconn(req);
1078                         RETURN(rc);
1079                 }
1080         } else {
1081                 /* Check whether the server sent an SLV.  Do it only for
1082                  * RPCs with rc == 0. */
1083                 ldlm_cli_update_pool(req);
1084         }
1085
1086         /* Store transno in reqmsg for replay. */
1087         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
1088                 req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
1089                 lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
1090         }
1091
1092         if (imp->imp_replayable) {
1093                 spin_lock(&imp->imp_lock);
1094                 /* no point in adding already-committed requests to the replay
1095                  * list; we would just remove them immediately. b=9829 */
1096                 if (req->rq_transno != 0 &&
1097                     (req->rq_transno >
1098                      lustre_msg_get_last_committed(req->rq_repmsg) ||
1099                      req->rq_replay)) {
1100                         /* version recovery */
1101                         ptlrpc_save_versions(req);
1102                         ptlrpc_retain_replayable_request(req, imp);
1103                 } else if (req->rq_commit_cb != NULL) {
1104                         spin_unlock(&imp->imp_lock);
1105                         req->rq_commit_cb(req);
1106                         spin_lock(&imp->imp_lock);
1107                 }
1108
1109                 /* Replay-enabled imports return commit-status information. */
1110                 if (lustre_msg_get_last_committed(req->rq_repmsg))
1111                         imp->imp_peer_committed_transno =
1112                                 lustre_msg_get_last_committed(req->rq_repmsg);
1113                 ptlrpc_free_committed(imp);
1114
1115                 if (req->rq_transno > imp->imp_peer_committed_transno)
1116                         ptlrpc_pinger_commit_expected(imp);
1117                 spin_unlock(&imp->imp_lock);
1118         }
1119
1120         RETURN(rc);
1121 }
1122
1123 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
1124 {
1125         struct obd_import     *imp;
1126         int rc;
1127         ENTRY;
1128
1129         LASSERT(req->rq_phase == RQ_PHASE_NEW);
1130         if (req->rq_sent && (req->rq_sent > CURRENT_SECONDS))
1131                 RETURN (0);
1132
1133         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
1134
1135         imp = req->rq_import;
1136         spin_lock(&imp->imp_lock);
1137
1138         req->rq_import_generation = imp->imp_generation;
1139
1140         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1141                 spin_lock(&req->rq_lock);
1142                 req->rq_waiting = 1;
1143                 spin_unlock(&req->rq_lock);
1144
1145                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
1146                           "(%s != %s)", lustre_msg_get_status(req->rq_reqmsg),
1147                           ptlrpc_import_state_name(req->rq_send_state),
1148                           ptlrpc_import_state_name(imp->imp_state));
1149                 LASSERT(list_empty(&req->rq_list));
1150                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1151                 atomic_inc(&req->rq_import->imp_inflight);
1152                 spin_unlock(&imp->imp_lock);
1153                 RETURN(0);
1154         }
1155
1156         if (rc != 0) {
1157                 spin_unlock(&imp->imp_lock);
1158                 req->rq_status = rc;
1159                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1160                 RETURN(rc);
1161         }
1162
1163         LASSERT(list_empty(&req->rq_list));
1164         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1165         atomic_inc(&req->rq_import->imp_inflight);
1166         spin_unlock(&imp->imp_lock);
1167
1168         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
1169         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
1170                " %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
1171                imp->imp_obd->obd_uuid.uuid,
1172                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1173                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1174                lustre_msg_get_opc(req->rq_reqmsg));
1175
1176         rc = ptl_send_rpc(req, 0);
1177         if (rc) {
1178                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
1179
1180                 spin_lock(&req->rq_lock);
1181                 req->rq_net_err = 1;
1182                 spin_unlock(&req->rq_lock);
1183                 RETURN(rc);
1184         }
1185         RETURN(0);
1186 }
1187
1188 /* this sends any unsent RPCs in @set and returns TRUE if all are sent */
1189 int ptlrpc_check_set(struct ptlrpc_request_set *set)
1190 {
1191         struct list_head *tmp;
1192         int force_timer_recalc = 0;
1193         ENTRY;
1194
1195         if (atomic_read(&set->set_remaining) == 0)
1196                 RETURN(1);
1197
1198         list_for_each(tmp, &set->set_requests) {
1199                 struct ptlrpc_request *req =
1200                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1201                 struct obd_import *imp = req->rq_import;
1202                 int unregistered = 0;
1203                 int rc = 0;
1204
1205                 if (req->rq_phase == RQ_PHASE_NEW &&
1206                     ptlrpc_send_new_req(req)) {
1207                         force_timer_recalc = 1;
1208                 }
1209
1210                 /* delayed send - skip */
1211                 if (req->rq_phase == RQ_PHASE_NEW && req->rq_sent)
1212                         continue;
1213
1214                 if (!(req->rq_phase == RQ_PHASE_RPC ||
1215                       req->rq_phase == RQ_PHASE_BULK ||
1216                       req->rq_phase == RQ_PHASE_INTERPRET ||
1217                       req->rq_phase == RQ_PHASE_UNREGISTERING ||
1218                       req->rq_phase == RQ_PHASE_COMPLETE)) {
1219                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
1220                         LBUG();
1221                 }
1222
1223                 if (req->rq_phase == RQ_PHASE_UNREGISTERING) {
1224                         LASSERT(req->rq_next_phase != req->rq_phase);
1225                         LASSERT(req->rq_next_phase != RQ_PHASE_UNDEFINED);
1226
1227                         /* Abort the bulk, if the request itself has been
1228                          * aborted, for instance, on a client eviction. */
1229                         if (req->rq_err && req->rq_status == -EINTR &&
1230                             req->rq_bulk != NULL)
1231                                 ptlrpc_abort_bulk(req->rq_bulk);
1232
1233                         /* Skip processing until reply is unlinked. We
1234                          * can't return to pool before that and we can't
1235                          * call interpret before that. We need to make
1236                          * sure that all rdma transfers finished and will
1237                          * not corrupt any data. */
1238                         if (ptlrpc_client_recv_or_unlink(req) ||
1239                             ptlrpc_client_bulk_active(req))
1240                                 continue;
1241
1242                         /* Turn repl fail_loc off to prevent it from looping
1243                          * forever. */
1244                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK)) {
1245                                 OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK |
1246                                                OBD_FAIL_ONCE);
1247                         }
1248
1249                         /* Turn off bulk fail_loc. */
1250                         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK)) {
1251                                 OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK |
1252                                                OBD_FAIL_ONCE);
1253                         }
1254
1255                         /* Move to next phase if reply was successfully 
1256                          * unlinked. */
1257                         ptlrpc_rqphase_move(req, req->rq_next_phase);
1258                 }
1259
1260                 if (req->rq_phase == RQ_PHASE_COMPLETE)
1261                         continue;
1262
1263                 if (req->rq_phase == RQ_PHASE_INTERPRET)
1264                         GOTO(interpret, req->rq_status);
1265
1266                 /* Note that this also will start async reply unlink */
1267                 if (req->rq_net_err && !req->rq_timedout) {
1268                         ptlrpc_expire_one_request(req, 1);
1269
1270                         /* Check if we still need to wait for unlink. */
1271                         if (ptlrpc_client_recv_or_unlink(req) ||
1272                             ptlrpc_client_bulk_active(req))
1273                                 continue;
1274                 }
1275
1276                 if (req->rq_err) {
1277                         if (req->rq_status == 0)
1278                                 req->rq_status = -EIO;
1279                         GOTO(interpret, req->rq_status);
1280                 }
1281
1282                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
1283                  * will only be set after rq_timedout, but the oig waiting
1284                  * path sets rq_intr irrespective of whether ptlrpcd has
1285                  * seen a timeout.  our policy is to only interpret
1286                  * interrupted rpcs after they have timed out */
1287                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
1288                         req->rq_status = -EINTR;
1289                         GOTO(interpret, req->rq_status);
1290                 }
1291
1292                 if (req->rq_phase == RQ_PHASE_RPC) {
1293                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
1294                                 int status;
1295
1296                                 if (!ptlrpc_unregister_reply(req, 1))
1297                                         continue;
1298
1299                                 spin_lock(&imp->imp_lock);
1300                                 if (ptlrpc_import_delay_req(imp, req, &status)){
1301                                         /* put on the delayed list - wait
1302                                          * for recovery to finish before sending */
1303                                         list_del_init(&req->rq_list);
1304                                         list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1305                                         spin_unlock(&imp->imp_lock);
1306                                         continue;
1307                                 }
1308
1309                                 if (status != 0)  {
1310                                         req->rq_status = status;
1311                                         spin_unlock(&imp->imp_lock);
1312                                         GOTO(interpret, req->rq_status);
1313                                 }
1314                                 if (req->rq_no_resend) {
1315                                         req->rq_status = -ENOTCONN;
1316                                         spin_unlock(&imp->imp_lock);
1317                                         GOTO(interpret, req->rq_status);
1318                                 }
1319
1320                                 list_del_init(&req->rq_list);
1321                                 list_add_tail(&req->rq_list,
1322                                               &imp->imp_sending_list);
1323
1324                                 spin_unlock(&imp->imp_lock);
1325
1326                                 spin_lock(&req->rq_lock);
1327                                 req->rq_waiting = 0;
1328                                 spin_unlock(&req->rq_lock);
1329
1330                                 if (req->rq_timedout||req->rq_resend) {
1331                                         /* This is re-sending anyway, so
1332                                          * mark the req as a resend. */
1333                                         spin_lock(&req->rq_lock);
1334                                         req->rq_resend = 1;
1335                                         spin_unlock(&req->rq_lock);
1336                                         if (req->rq_bulk) {
1337                                                 __u64 old_xid;
1338
1339                                                 if (!ptlrpc_unregister_bulk(req, 1))
1340                                                         continue;
1341
1342                                                 /* ensure previous bulk fails */
1343                                                 old_xid = req->rq_xid;
1344                                                 req->rq_xid = ptlrpc_next_xid();
1345                                                 CDEBUG(D_HA, "resend bulk "
1346                                                        "old x"LPU64
1347                                                        " new x"LPU64"\n",
1348                                                        old_xid, req->rq_xid);
1349                                         }
1350                                 }
1351
1352                                 rc = ptl_send_rpc(req, 0);
1353                                 if (rc) {
1354                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
1355                                                   rc);
1356                                         force_timer_recalc = 1;
1357
1358                                         spin_lock(&req->rq_lock);
1359                                         req->rq_net_err = 1;
1360                                         spin_unlock(&req->rq_lock);
1361                                 }
1362                                 /* need to reset the timeout */
1363                                 force_timer_recalc = 1;
1364                         }
1365
1366                         spin_lock(&req->rq_lock);
1367
1368                         if (ptlrpc_client_early(req)) {
1369                                 ptlrpc_at_recv_early_reply(req);
1370                                 spin_unlock(&req->rq_lock);
1371                                 continue;
1372                         }
1373
1374                         /* Still waiting for a reply? */
1375                         if (ptlrpc_client_recv(req)) {
1376                                 spin_unlock(&req->rq_lock);
1377                                 continue;
1378                         }
1379
1380                         /* Did we actually receive a reply? */
1381                         if (!ptlrpc_client_replied(req)) {
1382                                 spin_unlock(&req->rq_lock);
1383                                 continue;
1384                         }
1385
1386                         spin_unlock(&req->rq_lock);
1387
1388                         /* unlink from the net because we are going to
1389                          * swab the reply buffer in place */
1390                         unregistered = ptlrpc_unregister_reply(req, 1);
1391                         if (!unregistered)
1392                                 continue;
1393
1394                         req->rq_status = after_reply(req);
1395                         if (req->rq_resend)
1396                                 continue;
1397
1398                         /* If there is no bulk associated with this request,
1399                          * then we're done and should let the interpreter
1400                          * process the reply. Similarly if the RPC returned
1401                          * an error, and therefore the bulk will never arrive.
1402                          */
1403                         if (req->rq_bulk == NULL || req->rq_status != 0)
1404                                 GOTO(interpret, req->rq_status);
1405
1406                         ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
1407                 }
1408
1409                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
1410                 if (ptlrpc_client_bulk_active(req))
1411                         continue;
1412
1413                 if (!req->rq_bulk->bd_success) {
1414                         /* The RPC reply arrived OK, but the bulk screwed
1415                          * up!  Dead weird since the server told us the RPC
1416                          * was good after getting the REPLY for her GET or
1417                          * the ACK for her PUT. */
1418                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1419                         req->rq_status = -EIO;
1420                         GOTO(interpret, req->rq_status);
1421                 }
1422       interpret:
1423                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
1424
1425                 /* This moves the request to the "unregistering" phase; we
1426                  * need to wait for the reply unlink. */
1427                 if (!unregistered && !ptlrpc_unregister_reply(req, 1))
1428                         continue;
1429
1430                 if (!ptlrpc_unregister_bulk(req, 1))
1431                         continue;
1432
1433                 /* By the time interpret is called, receiving should
1434                  * already have finished. */
1435                 LASSERT(!req->rq_receiving_reply);
1436
1437                 ptlrpc_req_interpret(req, req->rq_status);
1438
1439                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
1440
1441                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
1442                        "opc %s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
1443                        imp->imp_obd->obd_uuid.uuid,
1444                        req->rq_reqmsg ? lustre_msg_get_status(req->rq_reqmsg):-1,
1445                        req->rq_xid,
1446                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1447                        req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : -1);
1448
1449                 spin_lock(&imp->imp_lock);
1450                 /* The request may no longer be on the sending or delayed
1451                  * list.  This happens when it is marked erroneous because
1452                  * ptlrpc_import_delay_req(req, status) found it impossible
1453                  * to allow sending this RPC and returned *status != 0. */
1454                 if (!list_empty(&req->rq_list)) {
1455                         list_del_init(&req->rq_list);
1456                         atomic_dec(&imp->imp_inflight);
1457                 }
1458                 spin_unlock(&imp->imp_lock);
1459
1460                 atomic_dec(&set->set_remaining);
1461                 cfs_waitq_broadcast(&imp->imp_recovery_waitq);
1462         }
1463
1464         /* If we hit an error, we want to recover promptly. */
1465         RETURN(atomic_read(&set->set_remaining) == 0 || force_timer_recalc);
1466 }
1467
1468 /* Return 1 if we should give up, else 0 */
1469 int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
1470 {
1471         struct obd_import *imp = req->rq_import;
1472         int rc = 0;
1473         ENTRY;
1474
1475         DEBUG_REQ(req->rq_fake ? D_INFO : D_WARNING, req, 
1476                   "Request x"LPU64" sent from %s to NID %s"
1477                   " %lus ago has %s (%lds prior to deadline).\n", req->rq_xid,
1478                   imp ? imp->imp_obd->obd_name : "<?>",
1479                   imp ? libcfs_nid2str(imp->imp_connection->c_peer.nid) : "<?>",
1480                   cfs_time_current_sec() - req->rq_sent,
1481                   req->rq_net_err ? "failed due to network error" : "timed out",
1482                   req->rq_deadline - req->rq_sent);
1483
1484         if (imp != NULL && obd_debug_peer_on_timeout)
1485                 LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
1486
1487         spin_lock(&req->rq_lock);
1488         req->rq_timedout = 1;
1489         spin_unlock(&req->rq_lock);
1490
1491         ptlrpc_unregister_reply(req, async_unlink);
1492         ptlrpc_unregister_bulk(req, async_unlink);
1493
1494         if (obd_dump_on_timeout)
1495                 libcfs_debug_dumplog();
1496
1497         if (imp == NULL) {
1498                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1499                 RETURN(1);
1500         }
1501
1502         if (req->rq_fake)
1503                 RETURN(1);
1504
1505         atomic_inc(&imp->imp_timeouts);
1506
1507         /* The DLM server doesn't want recovery run on its imports. */
1508         if (imp->imp_dlm_fake)
1509                 RETURN(1);
1510
1511         /* If this request is for recovery or other primordial tasks,
1512          * then error it out here. */
1513         if (req->rq_send_state != LUSTRE_IMP_FULL ||
1514             imp->imp_obd->obd_no_recov) {
1515                 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1516                           ptlrpc_import_state_name(req->rq_send_state),
1517                           ptlrpc_import_state_name(imp->imp_state));
1518                 spin_lock(&req->rq_lock);
1519                 req->rq_status = -ETIMEDOUT;
1520                 req->rq_err = 1;
1521                 spin_unlock(&req->rq_lock);
1522                 RETURN(1);
1523         }
1524
1525         /* if a request can't be resent we can't wait for an answer after
1526            the timeout */
1527         if (req->rq_no_resend) {
1528                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1529                 rc = 1;
1530         }
1531
1532         ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1533
1534         RETURN(rc);
1535 }
1536
1537 int ptlrpc_expired_set(void *data)
1538 {
1539         struct ptlrpc_request_set *set = data;
1540         struct list_head          *tmp;
1541         time_t                     now = cfs_time_current_sec();
1542         ENTRY;
1543
1544         LASSERT(set != NULL);
1545
1546         /* A timeout expired; see which reqs it applies to... */
1547         list_for_each(tmp, &set->set_requests) {
1548                 struct ptlrpc_request *req =
1549                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1550
1551                 /* Request in-flight? */
1552                 if (!((req->rq_phase == RQ_PHASE_RPC &&
1553                        !req->rq_waiting && !req->rq_resend) ||
1554                       (req->rq_phase == RQ_PHASE_BULK)))
1555                         continue;
1556
1557                 if (req->rq_timedout ||     /* already dealt with */
1558                     req->rq_deadline > now) /* not expired */
1559                         continue;
1560
1561                 /* Deal with this guy.  Do it asynchronously so as not to
1562                  * block the ptlrpcd thread. */
1563                 ptlrpc_expire_one_request(req, 1);
1564         }
1565
1566         /* When waiting for a whole set, we always want to break out of
1567          * the sleep so we can recalculate the timeout, or enable
1568          * interrupts if everyone has timed out. */
1569         RETURN(1);
1570 }
1571
1572 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
1573 {
1574         spin_lock(&req->rq_lock);
1575         req->rq_intr = 1;
1576         spin_unlock(&req->rq_lock);
1577 }
1578
1579 void ptlrpc_interrupted_set(void *data)
1580 {
1581         struct ptlrpc_request_set *set = data;
1582         struct list_head *tmp;
1583
1584         LASSERT(set != NULL);
1585         CERROR("INTERRUPTED SET %p\n", set);
1586
1587         list_for_each(tmp, &set->set_requests) {
1588                 struct ptlrpc_request *req =
1589                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1590
1591                 if (req->rq_phase != RQ_PHASE_RPC &&
1592                     req->rq_phase != RQ_PHASE_UNREGISTERING)
1593                         continue;
1594
1595                 ptlrpc_mark_interrupted(req);
1596         }
1597 }
1598
1599 /* get the smallest timeout in the set; this does NOT set a timeout. */
1600 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
1601 {
1602         struct list_head      *tmp;
1603         time_t                 now = cfs_time_current_sec();
1604         int                    timeout = 0;
1605         struct ptlrpc_request *req;
1606         int                    deadline;
1607         ENTRY;
1608
1609         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1610
1611         list_for_each(tmp, &set->set_requests) {
1612                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1613
1614                 /* request in-flight? */
1615                 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
1616                       (req->rq_phase == RQ_PHASE_BULK) ||
1617                       (req->rq_phase == RQ_PHASE_NEW)))
1618                         continue;
1619
1620                 /* Already timed out. */
1621                 if (req->rq_timedout)
1622                         continue;
1623
1624                 if (req->rq_phase == RQ_PHASE_NEW)
1625                         deadline = req->rq_sent;    /* delayed send */
1626                 else
1627                         deadline = req->rq_deadline;
1628
1629                 if (deadline <= now) {  /* actually expired already */
1630                         timeout = 1;    /* ASAP */
1631                         break;
1632                 }
1633                 if ((timeout == 0) || (timeout > (deadline - now)))
1634                         timeout = deadline - now;
1635         }
1636         RETURN(timeout);
1637 }
1638
1639 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
1640 {
1641         struct list_head      *tmp;
1642         struct ptlrpc_request *req;
1643         struct l_wait_info     lwi;
1644         int                    rc, timeout;
1645         ENTRY;
1646
1647         if (list_empty(&set->set_requests))
1648                 RETURN(0);
1649
1650         list_for_each(tmp, &set->set_requests) {
1651                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1652                 if (req->rq_phase == RQ_PHASE_NEW)
1653                         (void)ptlrpc_send_new_req(req);
1654         }
1655
1656         do {
1657                 timeout = ptlrpc_set_next_timeout(set);
1658
1659                 /* wait until all complete, interrupted, or an in-flight
1660                  * req times out */
1661                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
1662                        set, timeout);
1663                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
1664                                        ptlrpc_expired_set,
1665                                        ptlrpc_interrupted_set, set);
1666                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
1667
1668                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
1669
1670                 /* -EINTR => all requests have been flagged rq_intr so next
1671                  * check completes.
1672                  * -ETIMEDOUT => someone timed out.  When all reqs have
1673                  * timed out, signals are enabled allowing completion with
1674                  * EINTR.
1675                  * I don't really care if we go once more round the loop in
1676                  * the error cases -eeb. */
1677                 if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
1678                         list_for_each(tmp, &set->set_requests) {
1679                                 req = list_entry(tmp, struct ptlrpc_request, 
1680                                                  rq_set_chain);
1681                                 spin_lock(&req->rq_lock);
1682                                 req->rq_invalid_rqset = 1;
1683                                 spin_unlock(&req->rq_lock);
1684                         }
1685                 }
1686         } while (rc != 0 || atomic_read(&set->set_remaining) != 0);
1687
1688         LASSERT(atomic_read(&set->set_remaining) == 0);
1689
1690         rc = 0;
1691         list_for_each(tmp, &set->set_requests) {
1692                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1693
1694                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
1695                 if (req->rq_status != 0)
1696                         rc = req->rq_status;
1697         }
1698
1699         if (set->set_interpret != NULL) {
1700                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1701                         set->set_interpret;
1702                 rc = interpreter (set, set->set_arg, rc);
1703         } else {
1704                 struct ptlrpc_set_cbdata *cbdata, *n;
1705                 int err;
1706
1707                 list_for_each_entry_safe(cbdata, n,
1708                                          &set->set_cblist, psc_item) {
1709                         list_del_init(&cbdata->psc_item);
1710                         err = cbdata->psc_interpret(set, cbdata->psc_data, rc);
1711                         if (err && !rc)
1712                                 rc = err;
1713                         OBD_SLAB_FREE(cbdata, ptlrpc_cbdata_slab,
1714                                         sizeof(*cbdata));
1715                 }
1716         }
1717
1718         RETURN(rc);
1719 }
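
/*
 * Usage sketch for request sets handled by ptlrpc_set_wait() above.  This is
 * illustrative only and not part of this file's call paths; it assumes the
 * usual ptlrpc set helpers (ptlrpc_prep_set(), ptlrpc_set_add_req(),
 * ptlrpc_set_destroy()) available from the ptlrpc API, and requests that were
 * already prepared by the caller:
 *
 *      struct ptlrpc_request_set *set;
 *      int rc;
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      ptlrpc_set_add_req(set, req1);
 *      ptlrpc_set_add_req(set, req2);
 *      rc = ptlrpc_set_wait(set);      (sends NEW reqs, sleeps until all done)
 *      ptlrpc_set_destroy(set);        (drops the set's request references)
 *      return rc;
 */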
1720
1721 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
1722 {
1723         struct ptlrpc_request_pool *pool = request->rq_pool;
1724
1725         spin_lock(&pool->prp_lock);
1726         LASSERT(list_empty(&request->rq_list));
1727         LASSERT(!request->rq_receiving_reply);
1728         list_add_tail(&request->rq_list, &pool->prp_req_list);
1729         spin_unlock(&pool->prp_lock);
1730 }
1731
1732 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1733 {
1734         ENTRY;
1735         if (request == NULL) {
1736                 EXIT;
1737                 return;
1738         }
1739
1740         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1741         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1742         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1743         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1744         LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
1745         LASSERTF(!request->rq_replay, "req %p\n", request);
1746
1747         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1748          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1749         if (request->rq_import != NULL) {
1750                 if (!locked)
1751                         spin_lock(&request->rq_import->imp_lock);
1752                 list_del_init(&request->rq_replay_list);
1753                 if (!locked)
1754                         spin_unlock(&request->rq_import->imp_lock);
1755         }
1756         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1757
1758         if (atomic_read(&request->rq_refcount) != 0) {
1759                 DEBUG_REQ(D_ERROR, request,
1760                           "freeing request with nonzero refcount");
1761                 LBUG();
1762         }
1763
1764         if (request->rq_repbuf != NULL) {
1765                 OBD_FREE(request->rq_repbuf, request->rq_replen);
1766                 request->rq_repbuf = NULL;
1767                 request->rq_repmsg = NULL;
1768         }
1769         if (request->rq_export != NULL) {
1770                 class_export_put(request->rq_export);
1771                 request->rq_export = NULL;
1772         }
1773         if (request->rq_import != NULL) {
1774                 class_import_put(request->rq_import);
1775                 request->rq_import = NULL;
1776         }
1777         if (request->rq_bulk != NULL)
1778                 ptlrpc_free_bulk(request->rq_bulk);
1779
1780         if (request->rq_pool) {
1781                 __ptlrpc_free_req_to_pool(request);
1782         } else {
1783                 if (request->rq_reqmsg != NULL) {
1784                         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1785                         request->rq_reqmsg = NULL;
1786                 }
1787                 OBD_FREE(request, sizeof(*request));
1788         }
1789         EXIT;
1790 }
1791
1792 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1793 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1794 {
1795         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1796         (void)__ptlrpc_req_finished(request, 1);
1797 }
1798
1799 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1800 {
1801         ENTRY;
1802         if (request == NULL)
1803                 RETURN(1);
1804
1805         if (request == LP_POISON ||
1806             request->rq_reqmsg == LP_POISON) {
1807                 CERROR("dereferencing freed request (bug 575)\n");
1808                 LBUG();
1809                 RETURN(1);
1810         }
1811
1812         DEBUG_REQ(D_INFO, request, "refcount now %u",
1813                   atomic_read(&request->rq_refcount) - 1);
1814
1815         if (atomic_dec_and_test(&request->rq_refcount)) {
1816                 __ptlrpc_free_req(request, locked);
1817                 RETURN(1);
1818         }
1819
1820         RETURN(0);
1821 }
1822
1823 void ptlrpc_req_finished(struct ptlrpc_request *request)
1824 {
1825         __ptlrpc_req_finished(request, 0);
1826 }
1827
1828 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
1829 {
1830         return request->rq_xid;
1831 }
1832 EXPORT_SYMBOL(ptlrpc_req_xid);
1833
1834 /* Disengage the client's reply buffer from the network
1835  * NB does _NOT_ unregister any client-side bulk.
1836  * IDEMPOTENT, but _not_ safe against concurrent callers.
1837  * The request owner (i.e. the thread doing the I/O) must call...
1838  */
1839 int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
1840 {
1841         int                rc;
1842         cfs_waitq_t       *wq;
1843         struct l_wait_info lwi;
1844         ENTRY;
1845
1846         /* Might sleep. */
1847         LASSERT(!in_interrupt());
1848
1849         /* Let's set up a deadline for the reply unlink. */
1850         if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && 
1851             async && request->rq_reply_deadline == 0)
1852                 request->rq_reply_deadline = cfs_time_current_sec()+LONG_UNLINK;
1853
1854         /* Nothing left to do. */
1855         if (!ptlrpc_client_recv_or_unlink(request))
1856                 RETURN(1);
1857
1858         LNetMDUnlink(request->rq_reply_md_h);
1859
1860         /* Let's check it once again. */        
1861         if (!ptlrpc_client_recv_or_unlink(request))
1862                 RETURN(1);
1863
1864         /* Move to "Unregistering" phase as reply was not unlinked yet. */
1865         ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
1866
1867         /* Do not wait for unlink to finish. */
1868         if (async)
1869                 RETURN(0);
1870
1871         /* We have to l_wait_event() whatever the result, to give liblustre
1872          * a chance to run reply_in_callback(), and to make sure we've
1873          * unlinked before returning a req to the pool */
1874         if (request->rq_set != NULL)
1875                 wq = &request->rq_set->set_waitq;
1876         else
1877                 wq = &request->rq_reply_waitq;
1878
1879         for (;;) {
1880                 /* Network access will complete in finite time but the HUGE
1881                  * timeout lets us CWARN for visibility of sluggish NALs */
1882                 lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
1883                                            cfs_time_seconds(1), NULL, NULL);
1884                 rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
1885                                   &lwi);
1886                 if (rc == 0) {
1887                         ptlrpc_rqphase_move(request, request->rq_next_phase);
1888                         RETURN(1);
1889                 }
1890
1891                 LASSERT(rc == -ETIMEDOUT);
1892                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
1893                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
1894                           request->rq_must_unlink);
1895         }
1896         RETURN(0);
1897 }
1898
1899 /* caller must hold imp->imp_lock */
1900 void ptlrpc_free_committed(struct obd_import *imp)
1901 {
1902         struct list_head *tmp, *saved;
1903         struct ptlrpc_request *req;
1904         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1905         ENTRY;
1906
1907         LASSERT(imp != NULL);
1908
1909         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1910
1911
1912         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
1913             imp->imp_generation == imp->imp_last_generation_checked) {
1914                 CDEBUG(D_INFO, "%s: skip recheck: last_committed "LPU64"\n",
1915                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1916                 EXIT;
1917                 return;
1918         }
1919
1920         CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
1921                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
1922                imp->imp_generation);
1923         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
1924         imp->imp_last_generation_checked = imp->imp_generation;
1925
1926         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1927                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1928
1929                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1930                 LASSERT(req != last_req);
1931                 last_req = req;
1932
1933                 if (req->rq_import_generation < imp->imp_generation) {
1934                         DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
1935                         GOTO(free_req, 0);
1936                 }
1937
1938                 if (req->rq_replay) {
1939                         DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
1940                         continue;
1941                 }
1942
1943                 /* not yet committed */
1944                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1945                         DEBUG_REQ(D_RPCTRACE, req, "stopping search");
1946                         break;
1947                 }
1948
1949                 DEBUG_REQ(D_INFO, req, "commit (last_committed "LPU64")",
1950                           imp->imp_peer_committed_transno);
1951 free_req:
1952                 spin_lock(&req->rq_lock);
1953                 req->rq_replay = 0;
1954                 spin_unlock(&req->rq_lock);
1955                 if (req->rq_commit_cb != NULL)
1956                         req->rq_commit_cb(req);
1957                 list_del_init(&req->rq_replay_list);
1958                 __ptlrpc_req_finished(req, 1);
1959         }
1960
1961         EXIT;
1962         return;
1963 }
1964
1965 void ptlrpc_cleanup_client(struct obd_import *imp)
1966 {
1967         ENTRY;
1968         EXIT;
1969         return;
1970 }
1971
1972 void ptlrpc_resend_req(struct ptlrpc_request *req)
1973 {
1974         DEBUG_REQ(D_HA, req, "going to resend");
1975         lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
1976         req->rq_status = -EAGAIN;
1977
1978         spin_lock(&req->rq_lock);
1979         req->rq_resend = 1;
1980         req->rq_net_err = 0;
1981         req->rq_timedout = 0;
1982         if (req->rq_bulk) {
1983                 __u64 old_xid = req->rq_xid;
1984
1985                 /* ensure previous bulk fails */
1986                 req->rq_xid = ptlrpc_next_xid();
1987                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1988                        old_xid, req->rq_xid);
1989         }
1990         ptlrpc_client_wake_req(req);
1991         spin_unlock(&req->rq_lock);
1992 }
1993
1994 /* XXX: this function and rq_status are currently unused */
1995 void ptlrpc_restart_req(struct ptlrpc_request *req)
1996 {
1997         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1998         req->rq_status = -ERESTARTSYS;
1999
2000         spin_lock(&req->rq_lock);
2001         req->rq_restart = 1;
2002         req->rq_timedout = 0;
2003         ptlrpc_client_wake_req(req);
2004         spin_unlock(&req->rq_lock);
2005 }
2006
2007 static void interrupted_request(void *data)
2008 {
2009         struct ptlrpc_request *req = data;
2010         DEBUG_REQ(D_HA, req, "request interrupted");
2011         spin_lock(&req->rq_lock);
2012         req->rq_intr = 1;
2013         spin_unlock(&req->rq_lock);
2014 }
2015
2016 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
2017 {
2018         ENTRY;
2019         atomic_inc(&req->rq_refcount);
2020         RETURN(req);
2021 }
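
/*
 * Reference counting sketch for ptlrpc_request_addref() above (illustrative
 * only): every extra reference must be balanced by a ptlrpc_req_finished();
 * the final put frees the request via __ptlrpc_free_req():
 *
 *      ptlrpc_request_addref(req);     (e.g. before handing req to ptlrpcd)
 *      ...
 *      ptlrpc_req_finished(req);       (releases the extra reference)
 */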
2022
2023 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
2024                                       struct obd_import *imp)
2025 {
2026         struct list_head *tmp;
2027
2028         LASSERT_SPIN_LOCKED(&imp->imp_lock);
2029
2030         /* clear this for new requests that were resent as well
2031            as resent replayed requests. */
2032         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2033
2034         /* don't re-add requests that have been replayed */
2035         if (!list_empty(&req->rq_replay_list))
2036                 return;
2037
2038         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
2039
2040         LASSERT(imp->imp_replayable);
2041         /* Balanced in ptlrpc_free_committed, usually. */
2042         ptlrpc_request_addref(req);
2043         list_for_each_prev(tmp, &imp->imp_replay_list) {
2044                 struct ptlrpc_request *iter =
2045                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
2046
2047                 /* We may have duplicate transnos if we create and then
2048                  * open a file, or for closes retained to match creating
2049                  * opens, so use req->rq_xid as a secondary key.
2050                  * (See bugs 684, 685, and 428.)
2051                  * XXX no longer needed, but all opens need transnos!
2052                  */
2053                 if (iter->rq_transno > req->rq_transno)
2054                         continue;
2055
2056                 if (iter->rq_transno == req->rq_transno) {
2057                         LASSERT(iter->rq_xid != req->rq_xid);
2058                         if (iter->rq_xid > req->rq_xid)
2059                                 continue;
2060                 }
2061
2062                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
2063                 return;
2064         }
2065
2066         list_add(&req->rq_replay_list, &imp->imp_replay_list);
2067 }
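
/*
 * Replay list ordering used by ptlrpc_retain_replayable_request() above, as a
 * worked example (values are made up): requests with transnos 5, 7, 7, 9 are
 * kept sorted by rq_transno, and the two transno-7 entries (e.g. a create and
 * its matching open) are ordered by rq_xid.  Scanning imp_replay_list from
 * the tail therefore finds the insertion point near the end for the common
 * case of monotonically growing transnos.
 */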
2068
2069 int ptlrpc_queue_wait(struct ptlrpc_request *req)
2070 {
2071         int rc = 0;
2072         int brc;
2073         struct l_wait_info lwi;
2074         struct obd_import *imp = req->rq_import;
2075         cfs_duration_t timeout = CFS_TICK;
2076         long timeoutl;
2077         ENTRY;
2078
2079         LASSERT(req->rq_set == NULL);
2080         LASSERT(!req->rq_receiving_reply);
2081
2082         /* for distributed debugging */
2083         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
2084         LASSERT(imp->imp_obd != NULL);
2085         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
2086                "%s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
2087                imp->imp_obd->obd_uuid.uuid,
2088                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
2089                libcfs_nid2str(imp->imp_connection->c_peer.nid),
2090                lustre_msg_get_opc(req->rq_reqmsg));
2091
2092         /* Mark phase here for a little debug help */
2093         ptlrpc_rqphase_move(req, RQ_PHASE_RPC);
2094
2095         spin_lock(&imp->imp_lock);
2096         req->rq_import_generation = imp->imp_generation;
2097 restart:
2098         if (ptlrpc_import_delay_req(imp, req, &rc)) {
2099                 list_del_init(&req->rq_list);
2100                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
2101                 atomic_inc(&imp->imp_inflight);
2102                 spin_unlock(&imp->imp_lock);
2103
2104                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
2105                           cfs_curproc_comm(),
2106                           ptlrpc_import_state_name(req->rq_send_state),
2107                           ptlrpc_import_state_name(imp->imp_state));
2108                 lwi = LWI_INTR(interrupted_request, req);
2109                 rc = l_wait_event(req->rq_reply_waitq,
2110                                   (req->rq_send_state == imp->imp_state ||
2111                                    req->rq_err || req->rq_intr),
2112                                   &lwi);
2113                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
2114                           cfs_curproc_comm(),
2115                           ptlrpc_import_state_name(imp->imp_state),
2116                           ptlrpc_import_state_name(req->rq_send_state),
2117                           req->rq_err, req->rq_intr);
2118
2119                 spin_lock(&imp->imp_lock);
2120                 list_del_init(&req->rq_list);
2121                 atomic_dec(&imp->imp_inflight);
2122
2123                 if (req->rq_err) {
2124                         /* rq_status was set locally */
2125                         rc = req->rq_status ? req->rq_status : -EIO;
2126                 }
2127                 else if (req->rq_intr) {
2128                         rc = -EINTR;
2129                 }
2130                 else if (req->rq_no_resend) {
2131                         rc = -ETIMEDOUT;
2132                 }
2133                 else {
2134                         GOTO(restart, rc);
2135                 }
2136         }
2137
2138         if (rc != 0) {
2139                 spin_unlock(&imp->imp_lock);
2140                 req->rq_status = rc; // XXX this ok?
2141                 GOTO(out, rc);
2142         }
2143
2144         if (req->rq_resend) {
2145                 if (req->rq_bulk != NULL) {
2146                         ptlrpc_unregister_bulk(req, 0);
2147
2148                         /* bulk requests are supposed to be
2149                          * idempotent, so we are free to bump the xid
2150                          * here, which we need to do before
2151                          * registering the bulk again (bug 6371).
2152                          * print the old xid first for sanity.
2153                          */
2154                         DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
2155                         req->rq_xid = ptlrpc_next_xid();
2156                 }
2157
2158                 DEBUG_REQ(D_HA, req, "resending: ");
2159         }
2160
2161         /* XXX this is the same as ptlrpc_set_wait */
2162         LASSERT(list_empty(&req->rq_list));
2163         list_add_tail(&req->rq_list, &imp->imp_sending_list);
2164         atomic_inc(&imp->imp_inflight);
2165         spin_unlock(&imp->imp_lock);
2166
2167         rc = ptl_send_rpc(req, 0);
2168         if (rc)
2169                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
2170         do {
2171                 timeoutl = req->rq_deadline - cfs_time_current_sec();
2172                 timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
2173                         cfs_time_seconds(timeoutl);
2174                 DEBUG_REQ(D_NET, req,
2175                           "-- sleeping for "CFS_DURATION_T" ticks", timeout);
2176                 lwi = LWI_TIMEOUT_INTR(timeout, NULL, interrupted_request, req);
2177                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
2178                                   &lwi);
2179                 /* Wait again if we changed deadline */
2180         } while ((brc == -ETIMEDOUT) &&
2181                  (req->rq_deadline > cfs_time_current_sec()));
2182
2183         if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req, 0)) {
2184                 /* Wait forever for reconnect / replay or failure */
2185                 lwi = LWI_INTR(interrupted_request, req);
2186                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
2187                                    &lwi);
2188         }
2189
2190         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
2191                "%s:%s:%d:x"LPU64":%s:%d\n", cfs_curproc_comm(),
2192                imp->imp_obd->obd_uuid.uuid,
2193                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
2194                libcfs_nid2str(imp->imp_connection->c_peer.nid),
2195                lustre_msg_get_opc(req->rq_reqmsg));
2196
2197         /* If the reply was received normally, this just grabs the spinlock
2198          * (ensuring the reply callback has returned), sees that
2199          * req->rq_receiving_reply is clear and returns. */
2200         ptlrpc_unregister_reply(req, 0);
2201
2202         spin_lock(&imp->imp_lock);
2203         list_del_init(&req->rq_list);
2204         atomic_dec(&imp->imp_inflight);
2205         spin_unlock(&imp->imp_lock);
2206
2207         if (req->rq_err) {
2208                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d",
2209                           rc, req->rq_status);
2210                 rc = rc ? rc : req->rq_status;
2211                 GOTO(out, rc = rc ? rc : -EIO);
2212         }
2213
2214         if (req->rq_intr) {
2215                 /* Should only be interrupted if we timed out. */
2216                 if (!req->rq_timedout)
2217                         DEBUG_REQ(D_ERROR, req,
2218                                   "rq_intr set but rq_timedout not");
2219                 GOTO(out, rc = -EINTR);
2220         }
2221
2222         /* Resend if we need to */
2223         if (req->rq_resend || req->rq_timedout) {
2224                 /* ...unless we were specifically told otherwise. */
2225                 if (req->rq_no_resend)
2226                         GOTO(out, rc = -ETIMEDOUT);
2227                 spin_lock(&imp->imp_lock);
2228                 /* We can see rq_timedout on a DLM fake import, which does
2229                  * not support recovery, but we need to resend the request
2230                  * on this import instead of returning an error. */
2231                 spin_lock(&req->rq_lock);
2232                 req->rq_resend = 1;
2233                 spin_unlock(&req->rq_lock);
2234                 goto restart;
2235         }
2236
2237         if (!ptlrpc_client_replied(req)) {
2238                 /* How can this be? -eeb */
2239                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
2240                 LBUG();
2241                 GOTO(out, rc = req->rq_status);
2242         }
2243
2244         rc = after_reply(req);
2245         /* NB may return +ve success rc */
2246         if (req->rq_resend) {
2247                 spin_lock(&imp->imp_lock);
2248                 goto restart;
2249         }
2250
2251  out:
2252         if (req->rq_bulk != NULL) {
2253                 if (rc >= 0) {
2254                         /* Success so far.  Note that anything going wrong
2255                          * with bulk now is EXTREMELY strange, since the
2256                          * server must have believed that the bulk
2257                          * transferred OK before she replied with success to
2258                          * me. */
2259                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
2260                         brc = l_wait_event(req->rq_reply_waitq,
2261                                            !ptlrpc_client_bulk_active(req),
2262                                            &lwi);
2263                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
2264                         if (brc != 0) {
2265                                 LASSERT(brc == -ETIMEDOUT);
2266                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
2267                                 rc = brc;
2268                         } else if (!req->rq_bulk->bd_success) {
2269                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
2270                                 rc = -EIO;
2271                         }
2272                 }
2273                 if (rc < 0)
2274                         ptlrpc_unregister_bulk(req, 0);
2275         }
2276
2277         LASSERT(!req->rq_receiving_reply);
2278         ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
2279         cfs_waitq_broadcast(&imp->imp_recovery_waitq);
2280         RETURN(rc);
2281 }
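
/*
 * Synchronous send sketch for ptlrpc_queue_wait() above (illustrative only).
 * It assumes a request already built by a request-preparation helper such as
 * ptlrpc_prep_req(); do_something_with() is a hypothetical caller function:
 *
 *      rc = ptlrpc_queue_wait(req);            (send, sleep, handle resends)
 *      if (rc == 0)
 *              rc = do_something_with(req->rq_repmsg);  (hypothetical)
 *      ptlrpc_req_finished(req);               (drop the caller's reference)
 */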
2282
2283 struct ptlrpc_replay_async_args {
2284         int praa_old_state;
2285         int praa_old_status;
2286 };
2287
2288 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
2289                                     void * data, int rc)
2290 {
2291         struct ptlrpc_replay_async_args *aa = data;
2292         struct obd_import *imp = req->rq_import;
2293
2294         ENTRY;
2295         atomic_dec(&imp->imp_replay_inflight);
2296
2297         if (!ptlrpc_client_replied(req)) {
2298                 CERROR("request replay timed out, restarting recovery\n");
2299                 GOTO(out, rc = -ETIMEDOUT);
2300         }
2301
2302         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
2303             (lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN ||
2304              lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
2305                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
2306
2307         /* VBR: check version failure */
2308         if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
2309                 /* replay failed due to a version mismatch */
2310                 DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
2311                 spin_lock(&imp->imp_lock);
2312                 imp->imp_vbr_failed = 1;
2313                 spin_unlock(&imp->imp_lock);
2314         } else {
2315                 /* The transno had better not change over replay. */
2316                 if (unlikely(lustre_msg_get_transno(req->rq_reqmsg) !=
2317                              lustre_msg_get_transno(req->rq_repmsg) &&
2318                              lustre_msg_get_transno(req->rq_repmsg) != 0)) {
2319                         DEBUG_REQ(D_ERROR, req, "Transno has changed over "
2320                                   "replay ("LPU64"/"LPU64")\n",
2321                                   lustre_msg_get_transno(req->rq_reqmsg),
2322                                   lustre_msg_get_transno(req->rq_repmsg));
2323                         LBUG();
2324                 }
2325         }
2326
2327         spin_lock(&imp->imp_lock);
2328         imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
2329         spin_unlock(&imp->imp_lock);
2330         LASSERT(imp->imp_last_replay_transno);
2331
2332         DEBUG_REQ(D_HA, req, "got rep");
2333
2334         /* let the callback do fixups, possibly including changes in the request */
2335         if (req->rq_replay_cb)
2336                 req->rq_replay_cb(req);
2337
2338         if (ptlrpc_client_replied(req) &&
2339             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
2340                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
2341                           lustre_msg_get_status(req->rq_repmsg),
2342                           aa->praa_old_status);
2343         } else {
2344                 /* Put it back for re-replay. */
2345                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
2346         }
2347
2348         /* continue with recovery */
2349         rc = ptlrpc_import_recovery_state_machine(imp);
2350  out:
2351         req->rq_send_state = aa->praa_old_state;
2352
2353         if (rc != 0)
2354                 /* this replay failed, so restart recovery */
2355                 ptlrpc_connect_import(imp, NULL);
2356
2357         RETURN(rc);
2358 }
2359
2360
2361 int ptlrpc_replay_req(struct ptlrpc_request *req)
2362 {
2363         struct ptlrpc_replay_async_args *aa;
2364         ENTRY;
2365
2366         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
2367
2368         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2369         aa = ptlrpc_req_async_args(req);
2370         memset(aa, 0, sizeof *aa);
2371
2372         /* Prepare request to be resent with ptlrpcd */
2373         aa->praa_old_state = req->rq_send_state;
2374         req->rq_send_state = LUSTRE_IMP_REPLAY;
2375         req->rq_phase = RQ_PHASE_NEW;
2376         req->rq_next_phase = RQ_PHASE_UNDEFINED;
2377         if (req->rq_repmsg)
2378                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2379         req->rq_status = 0;
2380         req->rq_interpret_reply = ptlrpc_replay_interpret;
2381         /* Readjust the timeout for current conditions */
2382         ptlrpc_at_set_req_timeout(req);
2383
2384         DEBUG_REQ(D_HA, req, "REPLAY");
2385
2386         atomic_inc(&req->rq_import->imp_replay_inflight);
2387         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2388
2389         ptlrpcd_add_req(req);
2390         RETURN(0);
2391 }
2392
2393 void ptlrpc_abort_inflight(struct obd_import *imp)
2394 {
2395         struct list_head *tmp, *n;
2396         ENTRY;
2397
2398         /* Make sure that no new requests get processed for this import.
2399          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2400          * this flag and then putting requests on sending_list or delayed_list.
2401          */
2402         spin_lock(&imp->imp_lock);
2403
2404         /* XXX locking?  Maybe we should remove each request with the list
2405          * locked?  Also, how do we know if the requests on the list are
2406          * being freed at this time?
2407          */
2408         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2409                 struct ptlrpc_request *req =
2410                         list_entry(tmp, struct ptlrpc_request, rq_list);
2411
2412                 DEBUG_REQ(D_RPCTRACE, req, "inflight");
2413
2414                 spin_lock (&req->rq_lock);
2415                 if (req->rq_import_generation < imp->imp_generation) {
2416                         req->rq_err = 1;
2417                         req->rq_status = -EINTR;
2418                         ptlrpc_client_wake_req(req);
2419                 }
2420                 spin_unlock (&req->rq_lock);
2421         }
2422
2423         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2424                 struct ptlrpc_request *req =
2425                         list_entry(tmp, struct ptlrpc_request, rq_list);
2426
2427                 DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2428
2429                 spin_lock (&req->rq_lock);
2430                 if (req->rq_import_generation < imp->imp_generation) {
2431                         req->rq_err = 1;
2432                         req->rq_status = -EINTR;
2433                         ptlrpc_client_wake_req(req);
2434                 }
2435                 spin_unlock (&req->rq_lock);
2436         }
2437
2438         /* Last chance to free reqs left on the replay list, but we
2439          * will still leak reqs that haven't committed.  */
2440         if (imp->imp_replayable)
2441                 ptlrpc_free_committed(imp);
2442
2443         spin_unlock(&imp->imp_lock);
2444
2445         EXIT;
2446 }
2447
2448 void ptlrpc_abort_set(struct ptlrpc_request_set *set)
2449 {
2450         struct list_head *tmp, *pos;
2451
2452         LASSERT(set != NULL);
2453
2454         list_for_each_safe(pos, tmp, &set->set_requests) {
2455                 struct ptlrpc_request *req =
2456                         list_entry(pos, struct ptlrpc_request, rq_set_chain);
2457
2458                 spin_lock(&req->rq_lock);
2459                 if (req->rq_phase != RQ_PHASE_RPC) {
2460                         spin_unlock(&req->rq_lock);
2461                         continue;
2462                 }
2463
2464                 req->rq_err = 1;
2465                 req->rq_status = -EINTR;
2466                 ptlrpc_client_wake_req(req);
2467                 spin_unlock(&req->rq_lock);
2468         }
2469 }
2470
2471 static __u64 ptlrpc_last_xid;
2472 static spinlock_t ptlrpc_last_xid_lock;
2473
2474 /* Initialize the XID for the node.  This is common among all requests on
2475  * this node, and only requires the property that it is monotonically
2476  * increasing.  It does not need to be sequential.  Since this is also used
2477  * as the RDMA match bits, it is important that a single client NOT have
2478  * the same match bits for two different in-flight requests, hence we do
2479  * NOT want to have an XID per target or similar.
2480  *
2481  * To avoid an unlikely collision between match bits after a client reboot
2482  * (which would deliver old data into the wrong RDMA buffer) we initialize
2483  * the XID based on the current time, assuming a maximum RPC rate of 1M RPC/s.
2484  * If the time is clearly incorrect, we instead use a 62-bit random number.
2485  * In the worst case the random number will overflow 1M RPCs per second in
2486  * 9133 years, or permutations thereof.
2487  */
2488 #define YEAR_2004 (1ULL << 30)
2489 void ptlrpc_init_xid(void)
2490 {
2491         time_t now = cfs_time_current_sec();
2492
2493         spin_lock_init(&ptlrpc_last_xid_lock);
2494         if (now < YEAR_2004) {
2495                 ll_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
2496                 ptlrpc_last_xid >>= 2;
2497                 ptlrpc_last_xid |= (1ULL << 61);
2498         } else {
2499                 ptlrpc_last_xid = (__u64)now << 20;
2500         }
2501 }
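
/*
 * Worked example of the seeding above (numbers are approximate, added for
 * clarity): for now = 1300000000 (a 2011 timestamp), now << 20 yields an
 * initial XID of roughly 1.36e15.  ptlrpc_next_xid() advances by 1 per RPC,
 * so the counter only catches up with a later reboot's seed if the client
 * sustains more than 2^20 (~1M) RPCs per second.  If the clock is bogus
 * (now < 2^30), the random seed has bit 61 set, keeping it in [2^61, 2^62)
 * and therefore far above any time-based XID.
 */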
2502
2503 __u64 ptlrpc_next_xid(void)
2504 {
2505         __u64 tmp;
2506         spin_lock(&ptlrpc_last_xid_lock);
2507         tmp = ++ptlrpc_last_xid;
2508         spin_unlock(&ptlrpc_last_xid_lock);
2509         return tmp;
2510 }
2511
2512 __u64 ptlrpc_sample_next_xid(void)
2513 {
2514         if (sizeof(long) < 8) {
2515                 /* need to avoid possible word tearing on 32-bit systems */
2516                 __u64 tmp;
2517                 spin_lock(&ptlrpc_last_xid_lock);
2518                 tmp = ptlrpc_last_xid + 1;
2519                 spin_unlock(&ptlrpc_last_xid_lock);
2520                 return tmp;
2521         }
2522         /* No need to lock, since the returned value is racy anyway */
2523         return ptlrpc_last_xid + 1;
2524 }
2525 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
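
/*
 * Note on the 32-bit path in ptlrpc_sample_next_xid() (explanatory comment,
 * not from the original code): on a 32-bit kernel a __u64 read compiles to
 * two 32-bit loads, so an unlocked read could observe ptlrpc_last_xid with
 * only one half updated by a concurrent ptlrpc_next_xid().  Taking
 * ptlrpc_last_xid_lock for the sample makes the read atomic; on 64-bit the
 * single load cannot tear, so the lock is skipped.
 */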