lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of the Lustre file system, http://www.lustre.org
7  *   Lustre is a trademark of Cluster File Systems, Inc.
8  *
9  *   You may have signed or agreed to another license before downloading
10  *   this software.  If so, you are bound by the terms and conditions
11  *   of that agreement, and the following does not apply to you.  See the
12  *   LICENSE file included with this distribution for more information.
13  *
14  *   If you did not agree to a different license, then this copy of Lustre
15  *   is open source software; you can redistribute it and/or modify it
16  *   under the terms of version 2 of the GNU General Public License as
17  *   published by the Free Software Foundation.
18  *
19  *   In either case, Lustre is distributed in the hope that it will be
20  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
21  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   license text for more details.
23  *
24  */
25
26 #define DEBUG_SUBSYSTEM S_RPC
27 #ifndef __KERNEL__
28 #include <errno.h>
29 #include <signal.h>
30 #include <liblustre.h>
31 #endif
32
33 #include <obd_support.h>
34 #include <obd_class.h>
35 #include <lustre_lib.h>
36 #include <lustre_ha.h>
37 #include <lustre_import.h>
38
39 #include "ptlrpc_internal.h"
40
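/* Fill in a client's request/reply portals and name (a descriptive comment
 * added here; the function below simply stores the three values). */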
41 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
42                         struct ptlrpc_client *cl)
43 {
44         cl->cli_request_portal = req_portal;
45         cl->cli_reply_portal   = rep_portal;
46         cl->cli_name           = name;
47 }
48
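/* Map a uuid to an LNET peer and return a ptlrpc connection to it, or NULL
 * if the peer cannot be found (descriptive comment added for clarity). */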
49 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
50 {
51         struct ptlrpc_connection *c;
52         lnet_nid_t                self;
53         lnet_process_id_t         peer;
54         int                       err;
55
56         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
57         if (err != 0) {
58                 CERROR("cannot find peer %s!\n", uuid->uuid);
59                 return NULL;
60         }
61
62         c = ptlrpc_get_connection(peer, self, uuid);
63         if (c) {
64                 memcpy(c->c_remote_uuid.uuid,
65                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
66         }
67
68         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
69
70         return c;
71 }
72
73 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
74                                  struct obd_uuid *uuid)
75 {
76         lnet_nid_t        self;
77         lnet_process_id_t peer;
78         int               err;
79
80         err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
81         if (err != 0) {
82                 CERROR("cannot find peer %s!\n", uuid->uuid);
83                 return;
84         }
85
86         conn->c_peer = peer;
87         conn->c_self = self;
88         return;
89 }
90
91 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
92 {
93         struct ptlrpc_bulk_desc *desc;
94
95         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
96         if (!desc)
97                 return NULL;
98
99         spin_lock_init(&desc->bd_lock);
100         cfs_waitq_init(&desc->bd_waitq);
101         desc->bd_max_iov = npages;
102         desc->bd_iov_count = 0;
103         desc->bd_md_h = LNET_INVALID_HANDLE;
104         desc->bd_portal = portal;
105         desc->bd_type = type;
106
107         return desc;
108 }
109
110 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
111                                                int npages, int type, int portal)
112 {
113         struct obd_import *imp = req->rq_import;
114         struct ptlrpc_bulk_desc *desc;
115
116         ENTRY;
117         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
118         desc = new_bulk(npages, type, portal);
119         if (desc == NULL)
120                 RETURN(NULL);
121
122         desc->bd_import_generation = req->rq_import_generation;
123         desc->bd_import = class_import_get(imp);
124         desc->bd_req = req;
125
126         desc->bd_cbid.cbid_fn  = client_bulk_callback;
127         desc->bd_cbid.cbid_arg = desc;
128
129         /* This makes req own desc, and free it when req itself is freed */
130         req->rq_bulk = desc;
131
132         return desc;
133 }
134
135 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
136                                                int npages, int type, int portal)
137 {
138         struct obd_export *exp = req->rq_export;
139         struct ptlrpc_bulk_desc *desc;
140
141         ENTRY;
142         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
143
144         desc = new_bulk(npages, type, portal);
145         if (desc == NULL)
146                 RETURN(NULL);
147
148         desc->bd_export = class_export_get(exp);
149         desc->bd_req = req;
150
151         desc->bd_cbid.cbid_fn  = server_bulk_callback;
152         desc->bd_cbid.cbid_arg = desc;
153
154         /* NB we don't assign rq_bulk here; server-side requests are
155          * re-used, and the handler frees the bulk desc explicitly. */
156
157         return desc;
158 }
159
160 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
161                            cfs_page_t *page, int pageoffset, int len)
162 {
163         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
164         LASSERT(page != NULL);
165         LASSERT(pageoffset >= 0);
166         LASSERT(len > 0);
167         LASSERT(pageoffset + len <= CFS_PAGE_SIZE);
168
169         desc->bd_nob += len;
170
171         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
172 }
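
/*
 * Illustrative sketch, not part of the original file: how a client-side
 * caller might build a bulk descriptor for a read using the helpers above.
 * The request "req", the page array "pages", the page count "npages" and
 * the use of OST_BULK_PORTAL are assumptions made for this example only.
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
 *                                  OST_BULK_PORTAL);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE);
 *
 * req now owns desc (rq_bulk), and ptlrpc_free_bulk() is called for it when
 * the request itself is freed.
 */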
173
174 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
175 {
176         ENTRY;
177
178         LASSERT(desc != NULL);
179         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
180         LASSERT(!desc->bd_network_rw);         /* network hands off or */
181         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
182         if (desc->bd_export)
183                 class_export_put(desc->bd_export);
184         else
185                 class_import_put(desc->bd_import);
186
187         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
188                                 bd_iov[desc->bd_max_iov]));
189         EXIT;
190 }
191
192 /* Set the server time limit for this req */
193 static void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
194 {
195         __u32 serv_est;
196         int idx;
197         struct imp_at *at;
198
199         LASSERT(req->rq_import);
200
201         if (AT_OFF) {
202                 /* non-AT settings */
203                 req->rq_timeout = req->rq_import->imp_server_timeout ? 
204                         obd_timeout / 2 : obd_timeout;
205                 lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
206                 return;
207         }
208
209         at = &req->rq_import->imp_at;
210         idx = import_at_get_index(req->rq_import, 
211                                   req->rq_request_portal);
212         serv_est = at_get(&at->iat_service_estimate[idx]);
213         /* add an arbitrary minimum: 125% +5 sec */
214         req->rq_timeout = serv_est + (serv_est >> 2) + 5;
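        /* e.g. a 20s service estimate gives 20 + (20 >> 2) + 5 = 30s */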
215         /* We could get even fancier here, using history to predict increased
216            loading... */
217              
218         if (at->iat_drain > req->rq_timeout) {
219                 /* If we're trying to drain the network queues, give this 
220                    req a long timeout */
221                 req->rq_timeout = at->iat_drain;
222                 CDEBUG(D_ADAPTTO, "waiting %ds to let queues drain\n",
223                        req->rq_timeout);
224         }
225
226         /* Let the server know what this RPC timeout is by putting it in the
227            reqmsg */
228         lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
229 }
230
231 /* Adjust max service estimate based on server value */
232 static void ptlrpc_at_adj_service(struct ptlrpc_request *req) 
233 {
234         int idx;
235         unsigned int serv_est, oldse;
236         struct imp_at *at = &req->rq_import->imp_at;
237
238         LASSERT(req->rq_import);
239         
240         /* service estimate is returned in the repmsg timeout field,
241            may be 0 on err */
242         serv_est = lustre_msg_get_timeout(req->rq_repmsg);
243
244         idx = import_at_get_index(req->rq_import, req->rq_request_portal);
245         /* max service estimates are tracked on the server side,
246            so just keep minimal history here */
247         oldse = at_add(&at->iat_service_estimate[idx], serv_est);
248         if (oldse != 0)
249                 CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d "
250                        "has changed from %d to %d\n", 
251                        req->rq_import->imp_obd->obd_name,req->rq_request_portal,
252                        oldse, at_get(&at->iat_service_estimate[idx]));
253 }
254
255 /* Expected network latency per remote node (secs) */
256 int ptlrpc_at_get_net_latency(struct ptlrpc_request *req)
257 {
258         return AT_OFF ? 0 : at_get(&req->rq_import->imp_at.iat_net_latency);
259 }
260
261 /* Adjust expected network latency */
262 static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req)
263 {
264         unsigned int st, nl, oldnl;
265         struct imp_at *at = &req->rq_import->imp_at;
266         time_t now = cfs_time_current_sec();
267
268         LASSERT(req->rq_import);
269         
270         st = lustre_msg_get_service_time(req->rq_repmsg);
271         
272         /* Network latency is total time less server processing time */
273         nl = max_t(int, now - req->rq_sent - st, 0) + 1/*st rounding*/;
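        /* e.g. a reply 10s after rq_sent with st = 7 gives nl = 10 - 7 + 1 = 4s */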
274         if (st > now - req->rq_sent + 1 /* rounding */) 
275                 CERROR("Reported service time %u > total measured time %ld\n",
276                        st, now - req->rq_sent);
277
278         oldnl = at_add(&at->iat_net_latency, nl);
279         if (oldnl != 0)
280                 CDEBUG(D_ADAPTTO, "The network latency for %s (nid %s) "
281                        "has changed from %d to %d\n", 
282                        req->rq_import->imp_obd->obd_name,
283                        obd_uuid2str(
284                                &req->rq_import->imp_connection->c_remote_uuid),
285                        oldnl, at_get(&at->iat_net_latency));
286 }
287
288 static int unpack_reply(struct ptlrpc_request *req)
289 {
290         int rc;
291
292         /* Clear reply swab mask; we may have already swabbed an early reply */
293         req->rq_rep_swab_mask = 0;
294
295         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
296         if (rc) {
297                 DEBUG_REQ(D_ERROR, req, "unpack_rep failed: %d", rc);
298                 return(-EPROTO);
299         }
300
301         rc = lustre_unpack_rep_ptlrpc_body(req, MSG_PTLRPC_BODY_OFF);
302         if (rc) {
303                 DEBUG_REQ(D_ERROR, req, "unpack ptlrpc body failed: %d", rc);
304                 return(-EPROTO);
305         }
306         return 0;
307 }
308
309 /* Handle an early reply message.
310    We can't risk the real reply coming in and changing rq_repmsg, 
311    so this fn must be called under the rq_lock */
312 static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) {
313         struct lustre_msg *oldmsg, *msgcpy;
314         time_t olddl;
315         int oldlen, rc;
316         ENTRY;
317
318         req->rq_early = 0;
319
320         rc = unpack_reply(req);
321         if (rc) 
322                 /* Let's just ignore it - same as if it never got here */ 
323                 RETURN(rc);
324
325         /* We've got to make sure another early reply doesn't land on
326            top of our current repbuf.  Make a copy and verify checksum. */
327         oldlen = req->rq_replen;
328         spin_unlock(&req->rq_lock);
329         OBD_ALLOC(msgcpy, oldlen);
330         if (!msgcpy) {
331                 spin_lock(&req->rq_lock);
332                 RETURN(-ENOMEM);
333         }
334         spin_lock(&req->rq_lock);
335         /* Another reply might have changed the repmsg and replen while 
336            we dropped the lock; doesn't really matter, just use the latest.
337            If it doesn't fit in oldlen, checksum will be wrong. */
338         oldmsg = req->rq_repmsg;
339         memcpy(msgcpy, oldmsg, oldlen);
340         if (lustre_msg_get_cksum(msgcpy) != 
341             lustre_msg_calc_cksum(msgcpy)) {
342                 CDEBUG(D_ADAPTTO, "Early reply checksum mismatch, "
343                        "discarding %x != %x\n", lustre_msg_get_cksum(msgcpy),
344                        lustre_msg_calc_cksum(msgcpy));
345                 GOTO(out, rc = -EINVAL); 
346         }
347
348         /* Our copied msg is valid, now we can adjust the timeouts without 
349            worrying that a new reply will land on the copy. */
350         req->rq_repmsg = msgcpy;
351
352         /* Expecting to increase the service time estimate here */
353         ptlrpc_at_adj_service(req);
354         ptlrpc_at_adj_net_latency(req);
355
356         /* Adjust the local timeout for this req */
357         ptlrpc_at_set_req_timeout(req);
358
359         olddl = req->rq_deadline;
360         /* server assumes it now has rq_timeout from when it sent the 
361            early reply, so client should give it at least that long. */
362         req->rq_deadline = cfs_time_current_sec() + req->rq_timeout + 
363                     ptlrpc_at_get_net_latency(req);
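        /* e.g. a 30s rq_timeout plus 2s of net latency -> new deadline 32s from now */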
364
365         DEBUG_REQ(D_ADAPTTO, req, 
366                   "Early reply #%d, new deadline in %lds (%+lds)", 
367                   req->rq_early_count, req->rq_deadline -
368                   cfs_time_current_sec(), req->rq_deadline - olddl);
369         
370         req->rq_repmsg = oldmsg;
371         
372 out:
373         OBD_FREE(msgcpy, oldlen);
374         RETURN(rc);
375 }
376
377 void ptlrpc_free_rq_pool(struct ptlrpc_request_pool *pool)
378 {
379         struct list_head *l, *tmp;
380         struct ptlrpc_request *req;
381
382         if (!pool)
383                 return;
384
385         list_for_each_safe(l, tmp, &pool->prp_req_list) {
386                 req = list_entry(l, struct ptlrpc_request, rq_list);
387                 list_del(&req->rq_list);
388                 LASSERT (req->rq_reqmsg);
389                 OBD_FREE(req->rq_reqmsg, pool->prp_rq_size);
390                 OBD_FREE(req, sizeof(*req));
391         }
392         OBD_FREE(pool, sizeof(*pool));
393 }
394
395 void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
396 {
397         int i;
398         int size = 1;
399
400         while (size < pool->prp_rq_size)
401                 size <<= 1;
402
403         LASSERTF(list_empty(&pool->prp_req_list) || size == pool->prp_rq_size,
404                  "Trying to change pool size with nonempty pool "
405                  "from %d to %d bytes\n", pool->prp_rq_size, size);
406
407         spin_lock(&pool->prp_lock);
408         pool->prp_rq_size = size;
409         for (i = 0; i < num_rq; i++) {
410                 struct ptlrpc_request *req;
411                 struct lustre_msg *msg;
412
413                 spin_unlock(&pool->prp_lock);
414                 OBD_ALLOC(req, sizeof(struct ptlrpc_request));
415                 if (!req)
416                         return;
417                 OBD_ALLOC_GFP(msg, size, CFS_ALLOC_STD);
418                 if (!msg) {
419                         OBD_FREE(req, sizeof(struct ptlrpc_request));
420                         return;
421                 }
422                 req->rq_reqmsg = msg;
423                 req->rq_pool = pool;
424                 spin_lock(&pool->prp_lock);
425                 list_add_tail(&req->rq_list, &pool->prp_req_list);
426         }
427         spin_unlock(&pool->prp_lock);
428         return;
429 }
430
431 struct ptlrpc_request_pool *ptlrpc_init_rq_pool(int num_rq, int msgsize,
432                                                 void (*populate_pool)(struct ptlrpc_request_pool *, int))
433 {
434         struct ptlrpc_request_pool *pool;
435
436         OBD_ALLOC(pool, sizeof (struct ptlrpc_request_pool));
437         if (!pool)
438                 return NULL;
439
440         /* Request the next power of two for the allocation, because internally
441            the kernel would do exactly this */
442
443         spin_lock_init(&pool->prp_lock);
444         CFS_INIT_LIST_HEAD(&pool->prp_req_list);
445         pool->prp_rq_size = msgsize;
446         pool->prp_populate = populate_pool;
447
448         populate_pool(pool, num_rq);
449
450         if (list_empty(&pool->prp_req_list)) {
451                 /* have not allocated a single request for the pool */
452                 OBD_FREE(pool, sizeof (struct ptlrpc_request_pool));
453                 pool = NULL;
454         }
455         return pool;
456 }
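
/*
 * Illustrative sketch, not part of the original file: creating a request
 * pool and drawing a request from it.  "imp", "version", "opcode",
 * "bufcount" and "buflens" are placeholders, not real values.
 *
 *      struct ptlrpc_request_pool *pool;
 *      struct ptlrpc_request *req;
 *
 *      pool = ptlrpc_init_rq_pool(4, 1024, ptlrpc_add_rqs_to_pool);
 *      if (pool == NULL)
 *              return -ENOMEM;
 *      req = ptlrpc_prep_req_pool(imp, version, opcode, bufcount, buflens,
 *                                 NULL, pool);
 *      ...
 *      ptlrpc_free_rq_pool(pool);
 *
 * If the pool is empty when a request is needed, ptlrpc_prep_req_pool()
 * falls back to a regular OBD_ALLOC of the request.
 */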
457
458 static struct ptlrpc_request *ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
459 {
460         struct ptlrpc_request *request;
461         struct lustre_msg *reqmsg;
462
463         if (!pool)
464                 return NULL;
465
466         spin_lock(&pool->prp_lock);
467
468         /* See if we have anything in the pool and bail out if there is
469          * nothing.  In the writeout path, where this matters, this is safe
470          * because nothing is lost in that case; when some in-flight requests
471          * complete, this code will be called again. */
472         if (unlikely(list_empty(&pool->prp_req_list))) {
473                 spin_unlock(&pool->prp_lock);
474                 return NULL;
475         }
476
477         request = list_entry(pool->prp_req_list.next, struct ptlrpc_request,
478                              rq_list);
479         list_del(&request->rq_list);
480         spin_unlock(&pool->prp_lock);
481
482         LASSERT(request->rq_reqmsg);
483         LASSERT(request->rq_pool);
484
485         reqmsg = request->rq_reqmsg;
486         memset(request, 0, sizeof(*request));
487         request->rq_reqmsg = reqmsg;
488         request->rq_pool = pool;
489         request->rq_reqlen = pool->prp_rq_size;
490         return request;
491 }
492
493 struct ptlrpc_request *
494 ptlrpc_prep_req_pool(struct obd_import *imp, __u32 version, int opcode,
495                      int count, int *lengths, char **bufs,
496                      struct ptlrpc_request_pool *pool)
497 {
498         struct ptlrpc_request *request = NULL;
499         int rc;
500         ENTRY;
501
502         /* The obd disconnected */
503         if (imp == NULL)
504                 return NULL;
505
506         LASSERT(imp != LP_POISON);
507         LASSERT((unsigned long)imp->imp_client > 0x1000);
508         LASSERT(imp->imp_client != LP_POISON);
509
510         if (pool)
511                 request = ptlrpc_prep_req_from_pool(pool);
512
513         if (!request)
514                 OBD_ALLOC(request, sizeof(*request));
515
516         if (!request) {
517                 CERROR("request allocation out of memory\n");
518                 RETURN(NULL);
519         }
520
521         rc = lustre_pack_request(request, imp->imp_msg_magic, count, lengths,
522                                  bufs);
523         if (rc) {
524                 LASSERT(!request->rq_pool);
525                 OBD_FREE(request, sizeof(*request));
526                 RETURN(NULL);
527         }
528
529         lustre_msg_add_version(request->rq_reqmsg, version);
530         request->rq_send_state = LUSTRE_IMP_FULL;
531         request->rq_type = PTL_RPC_MSG_REQUEST;
532         request->rq_import = class_import_get(imp);
533         request->rq_export = NULL;
534
535         request->rq_req_cbid.cbid_fn  = request_out_callback;
536         request->rq_req_cbid.cbid_arg = request;
537
538         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
539         request->rq_reply_cbid.cbid_arg = request;
540
541         request->rq_phase = RQ_PHASE_NEW;
542
543         /* XXX FIXME bug 249 */
544         request->rq_request_portal = imp->imp_client->cli_request_portal;
545         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
546         
547         ptlrpc_at_set_req_timeout(request);
548
549         spin_lock_init(&request->rq_lock);
550         CFS_INIT_LIST_HEAD(&request->rq_list);
551         CFS_INIT_LIST_HEAD(&request->rq_replay_list);
552         CFS_INIT_LIST_HEAD(&request->rq_set_chain);
553         cfs_waitq_init(&request->rq_reply_waitq);
554         request->rq_xid = ptlrpc_next_xid();
555         atomic_set(&request->rq_refcount, 1);
556
557         lustre_msg_set_opc(request->rq_reqmsg, opcode);
558         lustre_msg_set_flags(request->rq_reqmsg, imp->imp_msg_flags);
559
560         RETURN(request);
561 }
562
563 struct ptlrpc_request *
564 ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
565                 int *lengths, char **bufs)
566 {
567         return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
568                                     NULL);
569 }
570
571 struct ptlrpc_request_set *ptlrpc_prep_set(void)
572 {
573         struct ptlrpc_request_set *set;
574
575         ENTRY;
576         OBD_ALLOC(set, sizeof *set);
577         if (!set)
578                 RETURN(NULL);
579         CFS_INIT_LIST_HEAD(&set->set_requests);
580         cfs_waitq_init(&set->set_waitq);
581         set->set_remaining = 0;
582         spin_lock_init(&set->set_new_req_lock);
583         CFS_INIT_LIST_HEAD(&set->set_new_requests);
584
585         RETURN(set);
586 }
587
588 /* Finish with this set; opposite of prep_set. */
589 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
590 {
591         struct list_head *tmp;
592         struct list_head *next;
593         int               expected_phase;
594         int               n = 0;
595         ENTRY;
596
597         /* Requests on the set should either all be completed, or all be new */
598         expected_phase = (set->set_remaining == 0) ?
599                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
600         list_for_each (tmp, &set->set_requests) {
601                 struct ptlrpc_request *req =
602                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
603
604                 LASSERT(req->rq_phase == expected_phase);
605                 n++;
606         }
607
608         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
609
610         list_for_each_safe(tmp, next, &set->set_requests) {
611                 struct ptlrpc_request *req =
612                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
613                 list_del_init(&req->rq_set_chain);
614
615                 LASSERT(req->rq_phase == expected_phase);
616
617                 if (req->rq_phase == RQ_PHASE_NEW) {
618
619                         if (req->rq_interpret_reply != NULL) {
620                                 int (*interpreter)(struct ptlrpc_request *,
621                                                    void *, int) =
622                                         req->rq_interpret_reply;
623
624                                 /* higher level (i.e. LOV) failed;
625                                  * let the sub reqs clean up */
626                                 req->rq_status = -EBADR;
627                                 interpreter(req, &req->rq_async_args,
628                                             req->rq_status);
629                         }
630                         set->set_remaining--;
631                 }
632
633                 req->rq_set = NULL;
634                 ptlrpc_req_finished (req);
635         }
636
637         LASSERT(set->set_remaining == 0);
638
639         OBD_FREE(set, sizeof(*set));
640         EXIT;
641 }
642
643 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
644                         struct ptlrpc_request *req)
645 {
646         /* The set takes over the caller's request reference */
647         list_add_tail(&req->rq_set_chain, &set->set_requests);
648         req->rq_set = set;
649         set->set_remaining++;
650
651         atomic_inc(&req->rq_import->imp_inflight);
652 }
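
/*
 * Illustrative sketch, not part of the original file: the usual pattern for
 * issuing RPCs in parallel through a request set.  "imp", "version",
 * "opcode", "bufcount" and "buflens" are placeholders for the example.
 *
 *      struct ptlrpc_request_set *set;
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      req = ptlrpc_prep_req(imp, version, opcode, bufcount, buflens, NULL);
 *      if (req != NULL)
 *              ptlrpc_set_add_req(set, req);   (the set takes the reference)
 *      rc = ptlrpc_set_wait(set);              (defined later in this file)
 *      ptlrpc_set_destroy(set);
 */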
653
654 /* Locked so that many callers can add things; the context that owns the set
655  * is supposed to notice these and move them into the set proper. */
656 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
657                             struct ptlrpc_request *req)
658 {
659         spin_lock(&set->set_new_req_lock);
660         /* The set takes over the caller's request reference */
661         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
662         req->rq_set = set;
663         spin_unlock(&set->set_new_req_lock);
664 }
665
666 /*
667  * Based on the current state of the import, determine if the request
668  * can be sent, is an error, or should be delayed.
669  *
670  * Returns true if this request should be delayed. If false and *status
671  * is set, then the request cannot be sent and *status is the error code.
672  * If false and *status is 0, then the request can be sent.
673  *
674  * The imp->imp_lock must be held.
675  */
676 static int ptlrpc_import_delay_req(struct obd_import *imp,
677                                    struct ptlrpc_request *req, int *status)
678 {
679         int delay = 0;
680         ENTRY;
681
682         LASSERT (status != NULL);
683         *status = 0;
684
685         if (imp->imp_state == LUSTRE_IMP_NEW) {
686                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
687                 *status = -EIO;
688                 LBUG();
689         } else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
690                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
691                 *status = -EIO;
692         } else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
693                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
694                 /* allow CONNECT even if import is invalid */ ;
695         } else if (imp->imp_invalid) {
696                 /* If the import has been invalidated (such as by an OST
697                  * failure), the request must fail with -ESHUTDOWN.  This
698                  * indicates the request should be discarded; an -EIO
699                  * may result in a resend of the request. */
700                 if (!imp->imp_deactive)
701                         DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
702                 *status = -ESHUTDOWN; /* bz 12940 */
703         } else if (req->rq_import_generation != imp->imp_generation) {
704                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
705                 *status = -EIO;
706         } else if (req->rq_send_state != imp->imp_state) {
707                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake ||
708                     req->rq_no_delay)
709                         *status = -EWOULDBLOCK;
710                 else
711                         delay = 1;
712         }
713
714         RETURN(delay);
715 }
716
717 static int ptlrpc_check_reply(struct ptlrpc_request *req)
718 {
719         int rc = 0;
720         ENTRY;
721
722         /* serialise with network callback */
723         spin_lock(&req->rq_lock);
724
725         if (req->rq_replied)
726                 GOTO(out, rc = 1);
727
728         if (req->rq_net_err && !req->rq_timedout) {
729                 spin_unlock(&req->rq_lock);
730                 rc = ptlrpc_expire_one_request(req);
731                 spin_lock(&req->rq_lock);
732                 GOTO(out, rc);
733         }
734
735         if (req->rq_err)
736                 GOTO(out, rc = 1);
737
738         if (req->rq_resend)
739                 GOTO(out, rc = 1);
740
741         if (req->rq_restart)
742                 GOTO(out, rc = 1);
743
744         if (req->rq_early) {
745                 ptlrpc_at_recv_early_reply(req);
746                 GOTO(out, rc = 0); /* keep waiting */
747         }
748
749         EXIT;
750  out:
751         spin_unlock(&req->rq_lock);
752         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
753         return rc;
754 }
755
756 static int ptlrpc_check_status(struct ptlrpc_request *req)
757 {
758         int err;
759         ENTRY;
760
761         err = lustre_msg_get_status(req->rq_repmsg);
762         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
763                 struct obd_import *imp = req->rq_import;
764                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
765
766                 LCONSOLE_ERROR_MSG(0x011,"an error occurred while communicating"
767                                " with %s. The %s operation failed with %d\n",
768                                libcfs_nid2str(imp->imp_connection->c_peer.nid),
769                                ll_opcode2str(opc), err);
770                 RETURN(err < 0 ? err : -EINVAL);
771         }
772
773         if (err < 0) {
774                 DEBUG_REQ(D_INFO, req, "status is %d", err);
775         } else if (err > 0) {
776                 /* XXX: translate this error from net to host */
777                 DEBUG_REQ(D_INFO, req, "status is %d", err);
778         }
779
780         RETURN(err);
781 }
782
783 static int after_reply(struct ptlrpc_request *req)
784 {
785         struct obd_import *imp = req->rq_import;
786         int rc;
787         ENTRY;
788
789         LASSERT(!req->rq_receiving_reply);
790
791         /* NB Until this point, the whole of the incoming message,
792          * including buflens, status etc is in the sender's byte order. */
793
794         LASSERT (req->rq_nob_received <= req->rq_replen);
795         rc = unpack_reply(req);
796         if (rc) 
797                 RETURN(rc);
798
799         OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_PAUSE_REP, obd_fail_val);
800         ptlrpc_at_adj_service(req);
801         ptlrpc_at_adj_net_latency(req);
802
803         if (lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_REPLY &&
804             lustre_msg_get_type(req->rq_repmsg) != PTL_RPC_MSG_ERR) {
805                 DEBUG_REQ(D_ERROR, req, "invalid packet received (type=%u)",
806                           lustre_msg_get_type(req->rq_repmsg));
807                 RETURN(-EPROTO);
808         }
809
810         rc = ptlrpc_check_status(req);
811         if (rc) {
812                 /* Either we've been evicted, or the server has failed for
813                  * some reason. Try to reconnect, and if that fails, punt to
814                  * the upcall. */
815                 if (rc == -ENOTCONN || rc == -ENODEV) {
816                         if (req->rq_send_state != LUSTRE_IMP_FULL ||
817                             imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
818                                 RETURN(-ENOTCONN);
819                         }
820                         ptlrpc_request_handle_notconn(req);
821                         RETURN(rc);
822                 }
823         } else {
824                 /* Check whether the server sent an slv (server lock volume).
825                  * Do this only for RPCs with rc == 0. */
826                 if (imp->imp_obd->obd_namespace) {
827                         /* The disconnect RPC may be sent after the namespace
828                          * has already been destroyed; check for that so we do
829                          * not try to update the pool. */
830                         ldlm_cli_update_pool(req);
831                 }
832         }
833
834         /* Store transno in reqmsg for replay. */
835         req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
836         lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
837
838         if (req->rq_import->imp_replayable) {
839                 spin_lock(&imp->imp_lock);
840                 /* no point in adding already-committed requests to the replay
841                  * list; we would just remove them immediately. b=9829 */
842                 if (req->rq_transno != 0 && 
843                     (req->rq_transno > 
844                      lustre_msg_get_last_committed(req->rq_repmsg) ||
845                      req->rq_replay))
846                         ptlrpc_retain_replayable_request(req, imp);
847                 else if (req->rq_commit_cb != NULL) {
848                         spin_unlock(&imp->imp_lock);
849                         req->rq_commit_cb(req);
850                         spin_lock(&imp->imp_lock);
851                 }
852
853                 /* Replay-enabled imports return commit-status information. */
854                 if (lustre_msg_get_last_committed(req->rq_repmsg))
855                         imp->imp_peer_committed_transno =
856                                 lustre_msg_get_last_committed(req->rq_repmsg);
857                 ptlrpc_free_committed(imp);
858                 spin_unlock(&imp->imp_lock);
859         }
860
861         RETURN(rc);
862 }
863
864 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
865 {
866         struct obd_import     *imp;
867         int rc;
868         ENTRY;
869
870         LASSERT(req->rq_phase == RQ_PHASE_NEW);
871         req->rq_phase = RQ_PHASE_RPC;
872
873         imp = req->rq_import;
874         spin_lock(&imp->imp_lock);
875
876         req->rq_import_generation = imp->imp_generation;
877
878         if (ptlrpc_import_delay_req(imp, req, &rc)) {
879                 spin_lock (&req->rq_lock);
880                 req->rq_waiting = 1;
881                 spin_unlock (&req->rq_lock);
882
883                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
884                           "(%s != %s)",
885                           lustre_msg_get_status(req->rq_reqmsg) ,
886                           ptlrpc_import_state_name(req->rq_send_state),
887                           ptlrpc_import_state_name(imp->imp_state));
888                 LASSERT(list_empty (&req->rq_list));
889
890                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
891                 spin_unlock(&imp->imp_lock);
892                 RETURN(0);
893         }
894
895         if (rc != 0) {
896                 spin_unlock(&imp->imp_lock);
897                 req->rq_status = rc;
898                 req->rq_phase = RQ_PHASE_INTERPRET;
899                 RETURN(rc);
900         }
901
902         /* XXX this is the same as ptlrpc_queue_wait */
903         LASSERT(list_empty(&req->rq_list));
904         list_add_tail(&req->rq_list, &imp->imp_sending_list);
905         spin_unlock(&imp->imp_lock);
906
907         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
908         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc"
909                " %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
910                imp->imp_obd->obd_uuid.uuid,
911                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
912                libcfs_nid2str(imp->imp_connection->c_peer.nid),
913                lustre_msg_get_opc(req->rq_reqmsg));
914
915         rc = ptl_send_rpc(req, 0);
916         if (rc) {
917                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
918                 req->rq_net_err = 1;
919                 RETURN(rc);
920         }
921         RETURN(0);
922 }
923
924 /* this sends any unsent RPCs in @set and returns TRUE if all are sent */
925 int ptlrpc_check_set(struct ptlrpc_request_set *set)
926 {
927         struct list_head *tmp;
928         int force_timer_recalc = 0;
929         ENTRY;
930
931         if (set->set_remaining == 0)
932                 RETURN(1);
933
934         list_for_each(tmp, &set->set_requests) {
935                 struct ptlrpc_request *req =
936                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
937                 struct obd_import *imp = req->rq_import;
938                 int rc = 0;
939
940                 if (req->rq_phase == RQ_PHASE_NEW &&
941                     ptlrpc_send_new_req(req)) {
942                         force_timer_recalc = 1;
943                 }
944
945                 if (!(req->rq_phase == RQ_PHASE_RPC ||
946                       req->rq_phase == RQ_PHASE_BULK ||
947                       req->rq_phase == RQ_PHASE_INTERPRET ||
948                       req->rq_phase == RQ_PHASE_COMPLETE)) {
949                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
950                         LBUG();
951                 }
952
953                 if (req->rq_phase == RQ_PHASE_COMPLETE)
954                         continue;
955
956                 if (req->rq_phase == RQ_PHASE_INTERPRET)
957                         GOTO(interpret, req->rq_status);
958
959                 if (req->rq_net_err && !req->rq_timedout)
960                         ptlrpc_expire_one_request(req);
961
962                 if (req->rq_err) {
963                         ptlrpc_unregister_reply(req);
964                         if (req->rq_status == 0)
965                                 req->rq_status = -EIO;
966                         req->rq_phase = RQ_PHASE_INTERPRET;
967
968                         spin_lock(&imp->imp_lock);
969                         list_del_init(&req->rq_list);
970                         spin_unlock(&imp->imp_lock);
971
972                         GOTO(interpret, req->rq_status);
973                 }
974
975                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
976                  * will only be set after rq_timedout, but the oig waiting
977                  * path sets rq_intr irrespective of whether ptlrpcd has
978                  * seen a timeout.  our policy is to only interpret
979                  * interrupted rpcs after they have timed out */
980                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
981                         /* NB could be on delayed list */
982                         ptlrpc_unregister_reply(req);
983                         req->rq_status = -EINTR;
984                         req->rq_phase = RQ_PHASE_INTERPRET;
985
986                         spin_lock(&imp->imp_lock);
987                         list_del_init(&req->rq_list);
988                         spin_unlock(&imp->imp_lock);
989
990                         GOTO(interpret, req->rq_status);
991                 }
992
993                 if (req->rq_phase == RQ_PHASE_RPC) {
994                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
995                                 int status;
996
997                                 ptlrpc_unregister_reply(req);
998
999                                 spin_lock(&imp->imp_lock);
1000
1001                                 if (ptlrpc_import_delay_req(imp, req, &status)){
1002                                         spin_unlock(&imp->imp_lock);
1003                                         continue;
1004                                 }
1005
1006                                 list_del_init(&req->rq_list);
1007                                 if (status != 0)  {
1008                                         req->rq_status = status;
1009                                         req->rq_phase = RQ_PHASE_INTERPRET;
1010                                         spin_unlock(&imp->imp_lock);
1011                                         GOTO(interpret, req->rq_status);
1012                                 }
1013                                 if (req->rq_no_resend) {
1014                                         req->rq_status = -ENOTCONN;
1015                                         req->rq_phase = RQ_PHASE_INTERPRET;
1016                                         spin_unlock(&imp->imp_lock);
1017                                         GOTO(interpret, req->rq_status);
1018                                 }
1019                                 list_add_tail(&req->rq_list,
1020                                               &imp->imp_sending_list);
1021
1022                                 spin_unlock(&imp->imp_lock);
1023
1024                                 req->rq_waiting = 0;
1025                                 if (req->rq_resend) {
1026                                         lustre_msg_add_flags(req->rq_reqmsg,
1027                                                              MSG_RESENT);
1028                                         if (req->rq_bulk) {
1029                                                 __u64 old_xid = req->rq_xid;
1030
1031                                                 ptlrpc_unregister_bulk (req);
1032
1033                                                 /* ensure previous bulk fails */
1034                                                 req->rq_xid = ptlrpc_next_xid();
1035                                                 CDEBUG(D_HA, "resend bulk "
1036                                                        "old x"LPU64
1037                                                        " new x"LPU64"\n",
1038                                                        old_xid, req->rq_xid);
1039                                         }
1040                                 }
1041
1042                                 rc = ptl_send_rpc(req, 0);
1043                                 if (rc) {
1044                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
1045                                                   rc);
1046                                         force_timer_recalc = 1;
1047                                         req->rq_net_err = 1;
1048                                 }
1049                                 /* need to reset the timeout */
1050                                 force_timer_recalc = 1;
1051                         }
1052
1053                         spin_lock(&req->rq_lock);
1054
1055                         if (req->rq_early) {
1056                                 ptlrpc_at_recv_early_reply(req);
1057                                 spin_unlock(&req->rq_lock);
1058                                 continue;
1059                         }
1060
1061                         /* Still waiting for a reply? */
1062                         if (req->rq_receiving_reply) {
1063                                 spin_unlock(&req->rq_lock);
1064                                 continue;
1065                         }
1066
1067                         /* Did we actually receive a reply? */
1068                         if (!req->rq_replied) {
1069                                 spin_unlock(&req->rq_lock);
1070                                 continue;
1071                         }
1072
1073                         spin_unlock(&req->rq_lock);
1074
1075                         spin_lock(&imp->imp_lock);
1076                         list_del_init(&req->rq_list);
1077                         spin_unlock(&imp->imp_lock);
1078
1079                         req->rq_status = after_reply(req);
1080                         if (req->rq_resend) {
1081                                 /* Add this req to the delayed list so
1082                                    it can be errored if the import is
1083                                    evicted after recovery. */
1084                                 spin_lock(&imp->imp_lock);
1085                                 list_add_tail(&req->rq_list,
1086                                               &imp->imp_delayed_list);
1087                                 spin_unlock(&imp->imp_lock);
1088                                 continue;
1089                         }
1090
1091                         /* If there is no bulk associated with this request,
1092                          * then we're done and should let the interpreter
1093                          * process the reply.  Similarly if the RPC returned
1094                          * an error, and therefore the bulk will never arrive.
1095                          */
1096                         if (req->rq_bulk == NULL || req->rq_status != 0) {
1097                                 req->rq_phase = RQ_PHASE_INTERPRET;
1098                                 GOTO(interpret, req->rq_status);
1099                         }
1100
1101                         req->rq_phase = RQ_PHASE_BULK;
1102                 }
1103
1104                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
1105                 if (ptlrpc_bulk_active(req->rq_bulk))
1106                         continue;
1107
1108                 if (!req->rq_bulk->bd_success) {
1109                         /* The RPC reply arrived OK, but the bulk screwed
1110                          * up!  Dead weird, since the server told us the RPC
1111                          * was good after getting the REPLY for its GET or
1112                          * the ACK for its PUT. */
1113                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1114                         req->rq_status = -EIO;
1115                         req->rq_phase = RQ_PHASE_INTERPRET;
1116                         GOTO(interpret, req->rq_status);
1117                 }
1118
1119                 req->rq_phase = RQ_PHASE_INTERPRET;
1120
1121         interpret:
1122                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
1123                 LASSERT(!req->rq_receiving_reply);
1124
1125                 ptlrpc_unregister_reply(req);
1126                 if (req->rq_bulk != NULL)
1127                         ptlrpc_unregister_bulk (req);
1128
1129                 req->rq_phase = RQ_PHASE_COMPLETE;
1130
1131                 if (req->rq_interpret_reply != NULL) {
1132                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
1133                                 req->rq_interpret_reply;
1134                         req->rq_status = interpreter(req, &req->rq_async_args,
1135                                                      req->rq_status);
1136                 }
1137
1138                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
1139                        "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1140                        imp->imp_obd->obd_uuid.uuid,
1141                        lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1142                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
1143                        lustre_msg_get_opc(req->rq_reqmsg));
1144
1145                 set->set_remaining--;
1146
1147                 atomic_dec(&imp->imp_inflight);
1148                 cfs_waitq_signal(&imp->imp_recovery_waitq);
1149         }
1150
1151         /* If we hit an error, we want to recover promptly. */
1152         RETURN(set->set_remaining == 0 || force_timer_recalc);
1153 }
1154
1155 /* Return 1 if we should give up, else 0 */
1156 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
1157 {
1158         struct obd_import *imp = req->rq_import;
1159         int rc = 0;
1160         ENTRY;
1161
1162         DEBUG_REQ(D_NETERROR, req, "%s (sent at %lu, %lus ago)",
1163                   req->rq_net_err ? "network error" : "timeout",
1164                   (long)req->rq_sent, cfs_time_current_sec() - req->rq_sent);
1165
1166         if (imp) {
1167                 LCONSOLE_WARN("Request x"LPU64" sent from %s to NID %s %lus ago"
1168                               " has timed out (limit %lus).\n", req->rq_xid,
1169                               req->rq_import->imp_obd->obd_name,
1170                               libcfs_nid2str(imp->imp_connection->c_peer.nid),
1171                               cfs_time_current_sec() - req->rq_sent,
1172                               req->rq_deadline - req->rq_sent);
1173         }
1174
1175         if (imp != NULL && obd_debug_peer_on_timeout)
1176                 LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
1177
1178         spin_lock(&req->rq_lock);
1179         req->rq_timedout = 1;
1180         spin_unlock(&req->rq_lock);
1181
1182         ptlrpc_unregister_reply (req);
1183
1184         if (obd_dump_on_timeout)
1185                 libcfs_debug_dumplog();
1186
1187         if (req->rq_bulk != NULL)
1188                 ptlrpc_unregister_bulk (req);
1189
1190         if (imp == NULL) {
1191                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
1192                 RETURN(1);
1193         }
1194
1195         /* The DLM server doesn't want recovery run on its imports. */
1196         if (imp->imp_dlm_fake)
1197                 RETURN(1);
1198
1199         /* If this request is for recovery or other primordial tasks,
1200          * then error it out here. */
1201         if (req->rq_send_state != LUSTRE_IMP_FULL ||
1202             imp->imp_obd->obd_no_recov) {
1203                 DEBUG_REQ(D_RPCTRACE, req, "err -110, sent_state=%s (now=%s)",
1204                           ptlrpc_import_state_name(req->rq_send_state),
1205                           ptlrpc_import_state_name(imp->imp_state));
1206                 spin_lock(&req->rq_lock);
1207                 req->rq_status = -ETIMEDOUT;
1208                 req->rq_err = 1;
1209                 spin_unlock(&req->rq_lock);
1210                 RETURN(1);
1211         }
1212
1213         /* if a request can't be resent we can't wait for an answer after 
1214            the timeout */
1215         if (req->rq_no_resend) {
1216                 DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
1217                 rc = 1;
1218         }
1219
1220         ptlrpc_fail_import(imp, lustre_msg_get_conn_cnt(req->rq_reqmsg));
1221
1222         RETURN(rc);
1223 }
1224
1225 int ptlrpc_expired_set(void *data)
1226 {
1227         struct ptlrpc_request_set *set = data;
1228         struct list_head          *tmp;
1229         time_t                     now = cfs_time_current_sec();
1230         ENTRY;
1231
1232         LASSERT(set != NULL);
1233
1234         /* A timeout expired; see which reqs it applies to... */
1235         list_for_each (tmp, &set->set_requests) {
1236                 struct ptlrpc_request *req =
1237                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1238
1239                 /* request in-flight? */
1240                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
1241                        !req->rq_resend) ||
1242                       (req->rq_phase == RQ_PHASE_BULK)))
1243                         continue;
1244
1245                 if (req->rq_timedout ||           /* already dealt with */
1246                     req->rq_deadline > now) /* not expired */
1247                         continue;
1248
1249                 /* deal with this guy */
1250                 ptlrpc_expire_one_request (req);
1251         }
1252
1253         /* When waiting for a whole set, we always break out of the
1254          * sleep so we can recalculate the timeout, or enable interrupts
1255          * iff everyone's timed out.
1256          */
1257         RETURN(1);
1258 }
1259
1260 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
1261 {
1262         spin_lock(&req->rq_lock);
1263         req->rq_intr = 1;
1264         spin_unlock(&req->rq_lock);
1265 }
1266
1267 void ptlrpc_interrupted_set(void *data)
1268 {
1269         struct ptlrpc_request_set *set = data;
1270         struct list_head *tmp;
1271
1272         LASSERT(set != NULL);
1273         CERROR("INTERRUPTED SET %p\n", set);
1274
1275         list_for_each(tmp, &set->set_requests) {
1276                 struct ptlrpc_request *req =
1277                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1278
1279                 if (req->rq_phase != RQ_PHASE_RPC)
1280                         continue;
1281
1282                 ptlrpc_mark_interrupted(req);
1283         }
1284 }
1285
1286 /* get the smallest timeout in the set; this does NOT set a timeout. */
1287 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
1288 {
1289         struct list_head      *tmp;
1290         time_t                 now = cfs_time_current_sec();
1291         int                    timeout = 0;
1292         struct ptlrpc_request *req;
1293         ENTRY;
1294
1295         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
1296
1297         list_for_each(tmp, &set->set_requests) {
1298                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1299
1300                 /* request in-flight? */
1301                 if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
1302                       (req->rq_phase == RQ_PHASE_BULK)))
1303                         continue;
1304
1305                 if (req->rq_timedout)   /* already timed out */
1306                         continue;
1307
1308                 if (req->rq_deadline <= now) {  /* actually expired already */
1309                         timeout = 1;    /* ASAP */
1310                         break;
1311                 }
1312                 
1313                 if ((timeout == 0) || (timeout > (req->rq_deadline - now))) {
1314                         timeout = req->rq_deadline - now;
1315                 }
1316         }
1317         RETURN(timeout);
1318 }
1319
1320 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
1321 {
1322         struct list_head      *tmp;
1323         struct ptlrpc_request *req;
1324         struct l_wait_info     lwi;
1325         int                    rc, timeout;
1326         ENTRY;
1327
1328         if (list_empty(&set->set_requests))
1329                 RETURN(0);
1330
1331         list_for_each(tmp, &set->set_requests) {
1332                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1333                 if (req->rq_phase == RQ_PHASE_NEW)
1334                         (void)ptlrpc_send_new_req(req);
1335         }
1336
1337         do {
1338                 timeout = ptlrpc_set_next_timeout(set);
1339
1340                 /* wait until all complete, interrupted, or an in-flight
1341                  * req times out */
1342                 CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
1343                        set, timeout);
1344                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout ? timeout : 1),
1345                                        ptlrpc_expired_set,
1346                                        ptlrpc_interrupted_set, set);
1347                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
1348
1349                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
1350
1351                 /* -EINTR => all requests have been flagged rq_intr so next
1352                  * check completes.
1353                  * -ETIMEDOUT => someone timed out.  When all reqs have
1354                  * timed out, signals are enabled allowing completion with
1355                  * EINTR.
1356                  * I don't really care if we go once more round the loop in
1357                  * the error cases -eeb. */
1358         } while (rc != 0 || set->set_remaining != 0);
1359
1360         LASSERT(set->set_remaining == 0);
1361
1362         rc = 0;
1363         list_for_each(tmp, &set->set_requests) {
1364                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1365
1366                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
1367                 if (req->rq_status != 0)
1368                         rc = req->rq_status;
1369         }
1370
1371         if (set->set_interpret != NULL) {
1372                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1373                         set->set_interpret;
1374                 rc = interpreter (set, set->set_arg, rc);
1375         }
1376
1377         RETURN(rc);
1378 }
1379
1380 static void __ptlrpc_free_req_to_pool(struct ptlrpc_request *request)
1381 {
1382         struct ptlrpc_request_pool *pool = request->rq_pool;
1383
1384         spin_lock(&pool->prp_lock);
1385         list_add_tail(&request->rq_list, &pool->prp_req_list);
1386         spin_unlock(&pool->prp_lock);
1387 }
1388
1389 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1390 {
1391         ENTRY;
1392         if (request == NULL) {
1393                 EXIT;
1394                 return;
1395         }
1396
1397         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1398         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1399         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1400         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1401
1402         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1403          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1404         if (request->rq_import != NULL) {
1405                 if (!locked)
1406                         spin_lock(&request->rq_import->imp_lock);
1407                 list_del_init(&request->rq_replay_list);
1408                 if (!locked)
1409                         spin_unlock(&request->rq_import->imp_lock);
1410         }
1411         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1412
1413         if (atomic_read(&request->rq_refcount) != 0) {
1414                 DEBUG_REQ(D_ERROR, request,
1415                           "freeing request with nonzero refcount");
1416                 LBUG();
1417         }
1418
1419         if (request->rq_repbuf != NULL) {
1420                 OBD_FREE(request->rq_repbuf, request->rq_replen);
1421                 request->rq_repbuf = NULL;
1422                 request->rq_repmsg = NULL;
1423         }
1424         if (request->rq_export != NULL) {
1425                 class_export_put(request->rq_export);
1426                 request->rq_export = NULL;
1427         }
1428         if (request->rq_import != NULL) {
1429                 class_import_put(request->rq_import);
1430                 request->rq_import = NULL;
1431         }
1432         if (request->rq_bulk != NULL)
1433                 ptlrpc_free_bulk(request->rq_bulk);
1434
1435         if (request->rq_pool) {
1436                 __ptlrpc_free_req_to_pool(request);
1437         } else {
1438                 if (request->rq_reqmsg != NULL) {
1439                         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1440                         request->rq_reqmsg = NULL;
1441                 }
1442                 OBD_FREE(request, sizeof(*request));
1443         }
1444         EXIT;
1445 }
1446
1447 void ptlrpc_free_req(struct ptlrpc_request *request)
1448 {
1449         __ptlrpc_free_req(request, 0);
1450 }
1451
1452 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1453 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1454 {
1455         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1456         (void)__ptlrpc_req_finished(request, 1);
1457 }
1458
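/* Drop one reference on the request; when the count reaches zero the
 * request is torn down via __ptlrpc_free_req().  'locked' indicates the
 * caller already holds the import's imp_lock.  Returns 1 if the request
 * was freed (or was NULL), 0 otherwise. */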
1459 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1460 {
1461         ENTRY;
1462         if (request == NULL)
1463                 RETURN(1);
1464
1465         if (request == LP_POISON ||
1466             request->rq_reqmsg == LP_POISON) {
1467                 CERROR("dereferencing freed request (bug 575)\n");
1468                 LBUG();
1469                 RETURN(1);
1470         }
1471
1472         DEBUG_REQ(D_INFO, request, "refcount now %u",
1473                   atomic_read(&request->rq_refcount) - 1);
1474
1475         if (atomic_dec_and_test(&request->rq_refcount)) {
1476                 __ptlrpc_free_req(request, locked);
1477                 RETURN(1);
1478         }
1479
1480         RETURN(0);
1481 }
1482
1483 void ptlrpc_req_finished(struct ptlrpc_request *request)
1484 {
1485         __ptlrpc_req_finished(request, 0);
1486 }
1487
1488 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
1489 {
1490         return request->rq_xid;
1491 }
1492 EXPORT_SYMBOL(ptlrpc_req_xid);
1493
1494 /* Disengage the client's reply buffer from the network
1495  * NB does _NOT_ unregister any client-side bulk.
1496  * IDEMPOTENT, but _not_ safe against concurrent callers.
1497  * The request owner (i.e. the thread doing the I/O) must call...
1498  */
1499 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1500 {
1501         int                rc;
1502         cfs_waitq_t       *wq;
1503         struct l_wait_info lwi;
1504         ENTRY;
1505
1506         LASSERT(!in_interrupt ());             /* might sleep */
1507         if (!ptlrpc_client_recv_or_unlink(request))
1508                 /* Nothing left to do */
1509                 return;
1510
1511         LNetMDUnlink (request->rq_reply_md_h);
1512
1513         /* We have to l_wait_event() whatever the result, to give liblustre
1514          * a chance to run reply_in_callback(), and to make sure we've 
1515          * unlinked before returning a req to the pool */
1516
1517         if (request->rq_set != NULL)
1518                 wq = &request->rq_set->set_waitq;
1519         else
1520                 wq = &request->rq_reply_waitq;
1521
1522         for (;;) {
1523                 /* Network access will complete in finite time but the HUGE
1524                  * timeout lets us CWARN for visibility of sluggish NALs */
1525                 lwi = LWI_TIMEOUT(cfs_time_seconds(LONG_UNLINK), NULL, NULL);
1526                 rc = l_wait_event (*wq, !ptlrpc_client_recv_or_unlink(request),
1527                                    &lwi);
1528                 if (rc == 0)
1529                         return;
1530
1531                 LASSERT (rc == -ETIMEDOUT);
1532                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout "
1533                           "rvcng=%d unlnk=%d", request->rq_receiving_reply,
1534                           request->rq_must_unlink);
1535         }
1536         EXIT;
1537 }
1538
1539 /* caller must hold imp->imp_lock */
1540 void ptlrpc_free_committed(struct obd_import *imp)
1541 {
1542         struct list_head *tmp, *saved;
1543         struct ptlrpc_request *req;
1544         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1545         ENTRY;
1546
1547         LASSERT(imp != NULL);
1548
1549         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1550
1551
1552         if (imp->imp_peer_committed_transno == imp->imp_last_transno_checked &&
1553             imp->imp_generation == imp->imp_last_generation_checked) {
1554                 CDEBUG(D_RPCTRACE, "%s: skip recheck: last_committed "LPU64"\n",
1555                        imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1556                 return;
1557         }
1558
1559         CDEBUG(D_RPCTRACE, "%s: committing for last_committed "LPU64" gen %d\n",
1560                imp->imp_obd->obd_name, imp->imp_peer_committed_transno,
1561                imp->imp_generation);
1562         imp->imp_last_transno_checked = imp->imp_peer_committed_transno;
1563         imp->imp_last_generation_checked = imp->imp_generation;
1564
1565         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1566                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1567
1568                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1569                 LASSERT(req != last_req);
1570                 last_req = req;
1571
1572                 if (req->rq_import_generation < imp->imp_generation) {
1573                         DEBUG_REQ(D_RPCTRACE, req, "free request with old gen");
1574                         GOTO(free_req, 0);
1575                 }
1576
1577                 if (req->rq_replay) {
1578                         DEBUG_REQ(D_RPCTRACE, req, "keeping (FL_REPLAY)");
1579                         continue;
1580                 }
1581
1582                 /* not yet committed */
1583                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1584                         DEBUG_REQ(D_RPCTRACE, req, "stopping search");
1585                         break;
1586                 }
1587
1588                 DEBUG_REQ(D_RPCTRACE, req, "commit (last_committed "LPU64")",
1589                           imp->imp_peer_committed_transno);
1590 free_req:
1591                 spin_lock(&req->rq_lock);
1592                 req->rq_replay = 0;
1593                 spin_unlock(&req->rq_lock);
1594                 if (req->rq_commit_cb != NULL)
1595                         req->rq_commit_cb(req);
1596                 list_del_init(&req->rq_replay_list);
1597                 __ptlrpc_req_finished(req, 1);
1598         }
1599
1600         EXIT;
1601         return;
1602 }
1603
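/* Currently a no-op. */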
1604 void ptlrpc_cleanup_client(struct obd_import *imp)
1605 {
1606         ENTRY;
1607         EXIT;
1608         return;
1609 }
1610
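/* Flag a request for resend: clear its remote handle, set rq_resend and,
 * if a bulk descriptor is attached, move to a fresh xid so the previous
 * bulk transfer fails cleanly, then wake up the waiting client. */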
1611 void ptlrpc_resend_req(struct ptlrpc_request *req)
1612 {
1613         DEBUG_REQ(D_HA, req, "going to resend");
1614         lustre_msg_set_handle(req->rq_reqmsg, &(struct lustre_handle){ 0 });
1615         req->rq_status = -EAGAIN;
1616
1617         spin_lock(&req->rq_lock);
1618         req->rq_resend = 1;
1619         req->rq_net_err = 0;
1620         req->rq_timedout = 0;
1621         if (req->rq_bulk) {
1622                 __u64 old_xid = req->rq_xid;
1623
1624                 /* ensure previous bulk fails */
1625                 req->rq_xid = ptlrpc_next_xid();
1626                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1627                        old_xid, req->rq_xid);
1628         }
1629         ptlrpc_wake_client_req(req);
1630         spin_unlock(&req->rq_lock);
1631 }
1632
1633 /* XXX: this function and rq_status are currently unused */
1634 void ptlrpc_restart_req(struct ptlrpc_request *req)
1635 {
1636         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1637         req->rq_status = -ERESTARTSYS;
1638
1639         spin_lock(&req->rq_lock);
1640         req->rq_restart = 1;
1641         req->rq_timedout = 0;
1642         ptlrpc_wake_client_req(req);
1643         spin_unlock(&req->rq_lock);
1644 }
1645
1646 static void interrupted_request(void *data)
1647 {
1648         struct ptlrpc_request *req = data;
1649         DEBUG_REQ(D_HA, req, "request interrupted");
1650         spin_lock(&req->rq_lock);
1651         req->rq_intr = 1;
1652         spin_unlock(&req->rq_lock);
1653 }
1654
1655 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1656 {
1657         ENTRY;
1658         atomic_inc(&req->rq_refcount);
1659         RETURN(req);
1660 }
1661
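/* Record a replayable request on the import's replay list, keeping the
 * list ordered by transno with rq_xid as the secondary key.  Takes a
 * request reference that is normally dropped in ptlrpc_free_committed().
 * Caller must hold imp->imp_lock. */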
1662 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1663                                       struct obd_import *imp)
1664 {
1665         struct list_head *tmp;
1666
1667         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1668
1669         /* clear this for new requests that were resent as well
1670            as resent replayed requests. */
1671         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1672
1673         /* don't re-add requests that have been replayed */
1674         if (!list_empty(&req->rq_replay_list))
1675                 return;
1676
1677         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
1678
1679         LASSERT(imp->imp_replayable);
1680         /* Balanced in ptlrpc_free_committed, usually. */
1681         ptlrpc_request_addref(req);
1682         list_for_each_prev(tmp, &imp->imp_replay_list) {
1683                 struct ptlrpc_request *iter =
1684                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1685
1686                 /* We may have duplicate transnos if we create and then
1687                  * open a file, or for closes retained to match creating
1688                  * opens, so use req->rq_xid as a secondary key.
1689                  * (See bugs 684, 685, and 428.)
1690                  * XXX no longer needed, but all opens need transnos!
1691                  */
1692                 if (iter->rq_transno > req->rq_transno)
1693                         continue;
1694
1695                 if (iter->rq_transno == req->rq_transno) {
1696                         LASSERT(iter->rq_xid != req->rq_xid);
1697                         if (iter->rq_xid > req->rq_xid)
1698                                 continue;
1699                 }
1700
1701                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1702                 return;
1703         }
1704
1705         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1706 }
1707
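/* Send a single request (one not attached to a set) and wait synchronously
 * for its reply, handling import recovery delays, interruption, timeouts,
 * resends and bulk completion along the way. */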
1708 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1709 {
1710         int rc = 0;
1711         int brc;
1712         struct l_wait_info lwi;
1713         struct obd_import *imp = req->rq_import;
1714         cfs_duration_t timeout = CFS_TICK;
1715         long timeoutl;
1716         ENTRY;
1717
1718         LASSERT(req->rq_set == NULL);
1719         LASSERT(!req->rq_receiving_reply);
1720         atomic_inc(&imp->imp_inflight);
1721
1722         /* for distributed debugging */
1723         lustre_msg_set_status(req->rq_reqmsg, cfs_curproc_pid());
1724         LASSERT(imp->imp_obd != NULL);
1725         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:nid:opc "
1726                "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1727                imp->imp_obd->obd_uuid.uuid,
1728                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1729                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1730                lustre_msg_get_opc(req->rq_reqmsg));
1731
1732         /* Mark phase here for a little debug help */
1733         req->rq_phase = RQ_PHASE_RPC;
1734
1735         spin_lock(&imp->imp_lock);
1736         req->rq_import_generation = imp->imp_generation;
1737 restart:
1738         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1739                 list_del(&req->rq_list);
1740
1741                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1742                 spin_unlock(&imp->imp_lock);
1743
1744                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1745                           cfs_curproc_comm(),
1746                           ptlrpc_import_state_name(req->rq_send_state),
1747                           ptlrpc_import_state_name(imp->imp_state));
1748                 lwi = LWI_INTR(interrupted_request, req);
1749                 rc = l_wait_event(req->rq_reply_waitq,
1750                                   (req->rq_send_state == imp->imp_state ||
1751                                    req->rq_err || req->rq_intr),
1752                                   &lwi);
1753                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d/%d == 1)",
1754                           cfs_curproc_comm(),
1755                           ptlrpc_import_state_name(imp->imp_state),
1756                           ptlrpc_import_state_name(req->rq_send_state),
1757                           req->rq_err, req->rq_intr);
1758
1759                 spin_lock(&imp->imp_lock);
1760                 list_del_init(&req->rq_list);
1761
1762                 if (req->rq_err) {
1763                         /* rq_status was set locally */
1764                         rc = -EIO;
1765                 }
1766                 else if (req->rq_intr) {
1767                         rc = -EINTR;
1768                 }
1769                 else if (req->rq_no_resend) {
1770                         spin_unlock(&imp->imp_lock);
1771                         GOTO(out, rc = -ETIMEDOUT);
1772                 }
1773                 else {
1774                         GOTO(restart, rc);
1775                 }
1776         }
1777
1778         if (rc != 0) {
1779                 list_del_init(&req->rq_list);
1780                 spin_unlock(&imp->imp_lock);
1781                 req->rq_status = rc; // XXX this ok?
1782                 GOTO(out, rc);
1783         }
1784
1785         if (req->rq_resend) {
1786                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1787
1788                 if (req->rq_bulk != NULL) {
1789                         ptlrpc_unregister_bulk (req);
1790
1791                         /* bulk requests are supposed to be
1792                          * idempotent, so we are free to bump the xid
1793                          * here, which we need to do before
1794                          * registering the bulk again (bug 6371).
1795                          * print the old xid first for sanity.
1796                          */
1797                         DEBUG_REQ(D_HA, req, "bumping xid for bulk: ");
1798                         req->rq_xid = ptlrpc_next_xid();
1799                 }
1800
1801                 DEBUG_REQ(D_HA, req, "resending: ");
1802         }
1803
1804         /* XXX this is the same as ptlrpc_set_wait */
1805         LASSERT(list_empty(&req->rq_list));
1806         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1807         spin_unlock(&imp->imp_lock);
1808
1809         rc = ptl_send_rpc(req, 0);
1810         if (rc) 
1811                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1812         do {
1813                 timeoutl = req->rq_deadline - cfs_time_current_sec();
1814                 timeout = (timeoutl <= 0 || rc) ? CFS_TICK :
1815                         cfs_time_seconds(timeoutl);
1816                 DEBUG_REQ(D_NET, req, 
1817                           "-- sleeping for "CFS_DURATION_T" ticks", timeout);
1818                 lwi = LWI_TIMEOUT_INTR(timeout, NULL, interrupted_request, req);
1819                 brc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
1820                                   &lwi);
1821                 /* Wait again if we changed deadline */
1822         } while ((brc == -ETIMEDOUT) && 
1823                  (req->rq_deadline > cfs_time_current_sec()));
1824
1825         if ((brc == -ETIMEDOUT) && !ptlrpc_expire_one_request(req)) {
1826                 /* Wait forever for reconnect / replay or failure */
1827                 lwi = LWI_INTR(interrupted_request, req);
1828                 rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req),
1829                                   &lwi);
1830         }
1831
1832         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:opc "
1833                "%s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
1834                imp->imp_obd->obd_uuid.uuid,
1835                lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
1836                libcfs_nid2str(imp->imp_connection->c_peer.nid),
1837                lustre_msg_get_opc(req->rq_reqmsg));
1838
1839         spin_lock(&imp->imp_lock);
1840         list_del_init(&req->rq_list);
1841         spin_unlock(&imp->imp_lock);
1842
1843         /* If the reply was received normally, this just grabs the spinlock
1844          * (ensuring the reply callback has returned), sees that
1845          * req->rq_receiving_reply is clear and returns. */
1846         ptlrpc_unregister_reply (req);
1847
1848
1849         if (req->rq_err) {
1850                 DEBUG_REQ(D_RPCTRACE, req, "err rc=%d status=%d", 
1851                           rc, req->rq_status);
1852                 GOTO(out, rc = -EIO);
1853         }
1854
1855         if (req->rq_intr) {
1856                 /* Should only be interrupted if we timed out. */
1857                 if (!req->rq_timedout)
1858                         DEBUG_REQ(D_ERROR, req,
1859                                   "rq_intr set but rq_timedout not");
1860                 GOTO(out, rc = -EINTR);
1861         }
1862
1863         /* Resend if we need to */
1864         if (req->rq_resend) {
1865                 /* ...unless we were specifically told otherwise. */
1866                 if (req->rq_no_resend)
1867                         GOTO(out, rc = -ETIMEDOUT);
1868                 spin_lock(&imp->imp_lock);
1869                 goto restart;
1870         }
1871
1872         if (req->rq_timedout) {                 /* non-recoverable timeout */
1873                 GOTO(out, rc = -ETIMEDOUT);
1874         }
1875
1876         if (!req->rq_replied) {
1877                 /* How can this be? -eeb */
1878                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1879                 LBUG();
1880                 GOTO(out, rc = req->rq_status);
1881         }
1882
1883         rc = after_reply(req);
1884         /* NB may return +ve success rc */
1885         if (req->rq_resend) {
1886                 spin_lock(&imp->imp_lock);
1887                 goto restart;
1888         }
1889
1890  out:
1891         if (req->rq_bulk != NULL) {
1892                 if (rc >= 0) {
1893                         /* success so far.  Note that anything going wrong
1894                          * with bulk now, is EXTREMELY strange, since the
1895                          * server must have believed that the bulk
1896                          * transferred OK before she replied with success to
1897                          * me. */
1898                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1899                         brc = l_wait_event(req->rq_reply_waitq,
1900                                            !ptlrpc_bulk_active(req->rq_bulk),
1901                                            &lwi);
1902                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1903                         if (brc != 0) {
1904                                 LASSERT(brc == -ETIMEDOUT);
1905                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1906                                 rc = brc;
1907                         } else if (!req->rq_bulk->bd_success) {
1908                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1909                                 rc = -EIO;
1910                         }
1911                 }
1912                 if (rc < 0)
1913                         ptlrpc_unregister_bulk (req);
1914         }
1915
1916         LASSERT(!req->rq_receiving_reply);
1917         req->rq_phase = RQ_PHASE_INTERPRET;
1918
1919         atomic_dec(&imp->imp_inflight);
1920         cfs_waitq_signal(&imp->imp_recovery_waitq);
1921         RETURN(rc);
1922 }
1923
1924 struct ptlrpc_replay_async_args {
1925         int praa_old_state;
1926         int praa_old_status;
1927 };
1928
1929 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1930                                     void * data, int rc)
1931 {
1932         struct ptlrpc_replay_async_args *aa = data;
1933         struct obd_import *imp = req->rq_import;
1934
1935         ENTRY;
1936         atomic_dec(&imp->imp_replay_inflight);
1937
1938         if (!req->rq_replied) {
1939                 CERROR("request replay timed out, restarting recovery\n");
1940                 GOTO(out, rc = -ETIMEDOUT);
1941         }
1942
1943         if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR &&
1944             lustre_msg_get_status(req->rq_repmsg) == -ENOTCONN)
1945                 GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
1946
1947         /* The transno had better not change over replay. */
1948         LASSERT(lustre_msg_get_transno(req->rq_reqmsg) ==
1949                 lustre_msg_get_transno(req->rq_repmsg));
1950
1951         DEBUG_REQ(D_HA, req, "got rep");
1952
1953         /* let the callback do fixups, possibly to the request itself */
1954         if (req->rq_replay_cb)
1955                 req->rq_replay_cb(req);
1956
1957         if (req->rq_replied &&
1958             lustre_msg_get_status(req->rq_repmsg) != aa->praa_old_status) {
1959                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1960                           lustre_msg_get_status(req->rq_repmsg),
1961                           aa->praa_old_status);
1962         } else {
1963                 /* Put it back for re-replay. */
1964                 lustre_msg_set_status(req->rq_repmsg, aa->praa_old_status);
1965         }
1966
1967         spin_lock(&imp->imp_lock);
1968         imp->imp_last_replay_transno = req->rq_transno;
1969         spin_unlock(&imp->imp_lock);
1970
1971         /* continue with recovery */
1972         rc = ptlrpc_import_recovery_state_machine(imp);
1973  out:
1974         req->rq_send_state = aa->praa_old_state;
1975
1976         if (rc != 0)
1977                 /* this replay failed, so restart recovery */
1978                 ptlrpc_connect_import(imp, NULL);
1979
1980         RETURN(rc);
1981 }
1982
1983
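/* Queue a request for replay via ptlrpcd during recovery: the previous
 * send state and reply status are stashed in rq_async_args so that
 * ptlrpc_replay_interpret() can verify and restore them when the replayed
 * reply arrives. */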
1984 int ptlrpc_replay_req(struct ptlrpc_request *req)
1985 {
1986         struct ptlrpc_replay_async_args *aa;
1987         ENTRY;
1988
1989         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1990
1991         /* Not handling automatic bulk replay yet (or ever?) */
1992         LASSERT(req->rq_bulk == NULL);
1993
1994         DEBUG_REQ(D_HA, req, "REPLAY");
1995
1996         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1997         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1998         memset(aa, 0, sizeof *aa);
1999
2000         /* Prepare request to be resent with ptlrpcd */
2001         aa->praa_old_state = req->rq_send_state;
2002         req->rq_send_state = LUSTRE_IMP_REPLAY;
2003         req->rq_phase = RQ_PHASE_NEW;
2004         if (req->rq_repmsg)
2005                 aa->praa_old_status = lustre_msg_get_status(req->rq_repmsg);
2006         req->rq_status = 0;
2007         req->rq_interpret_reply = ptlrpc_replay_interpret;
2008
2009         atomic_inc(&req->rq_import->imp_replay_inflight);
2010         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
2011
2012         ptlrpcd_add_req(req);
2013         RETURN(0);
2014 }
2015
2016 void ptlrpc_abort_inflight(struct obd_import *imp)
2017 {
2018         struct list_head *tmp, *n;
2019         ENTRY;
2020
2021         /* Make sure that no new requests get processed for this import.
2022          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
2023          * this flag and then putting requests on sending_list or delayed_list.
2024          */
2025         spin_lock(&imp->imp_lock);
2026
2027         /* XXX locking?  Maybe we should remove each request with the list
2028          * locked?  Also, how do we know if the requests on the list are
2029          * being freed at this time?
2030          */
2031         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
2032                 struct ptlrpc_request *req =
2033                         list_entry(tmp, struct ptlrpc_request, rq_list);
2034
2035                 DEBUG_REQ(D_RPCTRACE, req, "inflight");
2036
2037                 spin_lock (&req->rq_lock);
2038                 if (req->rq_import_generation < imp->imp_generation) {
2039                         req->rq_err = 1;
2040                         req->rq_status = -EINTR;
2041                         ptlrpc_wake_client_req(req);
2042                 }
2043                 spin_unlock (&req->rq_lock);
2044         }
2045
2046         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
2047                 struct ptlrpc_request *req =
2048                         list_entry(tmp, struct ptlrpc_request, rq_list);
2049
2050                 DEBUG_REQ(D_RPCTRACE, req, "aborting waiting req");
2051
2052                 spin_lock (&req->rq_lock);
2053                 if (req->rq_import_generation < imp->imp_generation) {
2054                         req->rq_err = 1;
2055                         req->rq_status = -EINTR;
2056                         ptlrpc_wake_client_req(req);
2057                 }
2058                 spin_unlock (&req->rq_lock);
2059         }
2060
2061         /* Last chance to free reqs left on the replay list, but we
2062          * will still leak reqs that haven't committed.  */
2063         if (imp->imp_replayable)
2064                 ptlrpc_free_committed(imp);
2065
2066         spin_unlock(&imp->imp_lock);
2067
2068         EXIT;
2069 }
2070
2071 static __u64 ptlrpc_last_xid = 0;
2072 spinlock_t ptlrpc_last_xid_lock;
2073
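/* Allocate the next request xid under ptlrpc_last_xid_lock;
 * ptlrpc_sample_next_xid() below peeks at the upcoming value without
 * consuming it. */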
2074 __u64 ptlrpc_next_xid(void)
2075 {
2076         __u64 tmp;
2077         spin_lock(&ptlrpc_last_xid_lock);
2078         tmp = ++ptlrpc_last_xid;
2079         spin_unlock(&ptlrpc_last_xid_lock);
2080         return tmp;
2081 }
2082
2083 __u64 ptlrpc_sample_next_xid(void)
2084 {
2085         __u64 tmp;
2086         spin_lock(&ptlrpc_last_xid_lock);
2087         tmp = ptlrpc_last_xid + 1;
2088         spin_unlock(&ptlrpc_last_xid_lock);
2089         return tmp;
2090 }
2091 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
2092