land v0.9.1 on HEAD, in preparation for a 1.0.x branch
fs/lustre-release.git: lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
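/* Fill in a client handle: the portals it sends requests to and receives
 * replies on, plus a human-readable name used in logging. */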
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
47 {
48         return &req->rq_connection->c_remote_uuid;
49 }
50
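/* Resolve a peer UUID to a ptlrpc_connection: look up the peer, obtain a
 * connection to it, and record the remote UUID in the connection.
 * Returns NULL if the peer cannot be resolved. */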
51 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
52 {
53         struct ptlrpc_connection *c;
54         struct ptlrpc_peer peer;
55         int err;
56
57         err = ptlrpc_uuid_to_peer(uuid, &peer);
58         if (err != 0) {
59                 CERROR("cannot find peer %s!\n", uuid->uuid);
60                 return NULL;
61         }
62
63         c = ptlrpc_get_connection(&peer, uuid);
64         if (c) {
65                 memcpy(c->c_remote_uuid.uuid,
66                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
67         }
68
69         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
70
71         return c;
72 }
73
74 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
75                                  struct obd_uuid *uuid)
76 {
77         struct ptlrpc_peer peer;
78         int err;
79
80         err = ptlrpc_uuid_to_peer(uuid, &peer);
81         if (err != 0) {
82                 CERROR("cannot find peer %s!\n", uuid->uuid);
83                 return;
84         }
85
86         memcpy(&conn->c_peer, &peer, sizeof (peer));
87         return;
88 }
89
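/* Allocate a bulk descriptor and initialise its lock, waitqueue, page
 * list, and (still unset) Portals MD/ME handles. */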
90 static inline struct ptlrpc_bulk_desc *new_bulk(void)
91 {
92         struct ptlrpc_bulk_desc *desc;
93
94         OBD_ALLOC(desc, sizeof(*desc));
95         if (!desc)
96                 return NULL;
97
98         spin_lock_init(&desc->bd_lock);
99         init_waitqueue_head(&desc->bd_waitq);
100         INIT_LIST_HEAD(&desc->bd_page_list);
101         desc->bd_md_h = PTL_HANDLE_NONE;
102         desc->bd_me_h = PTL_HANDLE_NONE;
103
104         return desc;
105 }
106
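/* Prepare a client-side (import) bulk descriptor for req.  The request
 * takes ownership via rq_bulk, so the descriptor is freed together with
 * the request. */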
107 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
108                                                int type, int portal)
109 {
110         struct obd_import *imp = req->rq_import;
111         struct ptlrpc_bulk_desc *desc;
112
113         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
114
115         desc = new_bulk();
116         if (desc == NULL)
117                 RETURN(NULL);
118
119         desc->bd_import_generation = req->rq_import_generation;
120         desc->bd_import = class_import_get(imp);
121         desc->bd_req = req;
122         desc->bd_type = type;
123         desc->bd_portal = portal;
124
125         /* This makes req own desc, which is freed when req itself is freed */
126         req->rq_bulk = desc;
127
128         return desc;
129 }
130
131 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
132                                                int type, int portal)
133 {
134         struct obd_export *exp = req->rq_export;
135         struct ptlrpc_bulk_desc *desc;
136
137         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
138
139         desc = new_bulk();
140         if (desc == NULL)
141                 RETURN(NULL);
142
143         desc->bd_export = class_export_get(exp);
144         desc->bd_req = req;
145         desc->bd_type = type;
146         desc->bd_portal = portal;
147
148         /* NB we don't assign rq_bulk here; server-side requests are
149          * re-used, and the handler frees the bulk desc explicitly. */
150
151         return desc;
152 }
153
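/* Append one page fragment (page, offset, length) to a bulk descriptor;
 * the fragment must not cross a page boundary. */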
154 int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
155                           struct page *page, int pageoffset, int len)
156 {
157         struct ptlrpc_bulk_page *bulk;
158
159         OBD_ALLOC(bulk, sizeof(*bulk));
160         if (bulk == NULL)
161                 return -ENOMEM;
162
163         LASSERT(page != NULL);
164         LASSERT(pageoffset >= 0);
165         LASSERT(len > 0);
166         LASSERT(pageoffset + len <= PAGE_SIZE);
167
168         bulk->bp_page = page;
169         bulk->bp_pageoffset = pageoffset;
170         bulk->bp_buflen = len;
171
172         bulk->bp_desc = desc;
173         list_add_tail(&bulk->bp_link, &desc->bd_page_list);
174         desc->bd_page_count++;
175         return 0;
176 }
177
178 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
179 {
180         struct list_head *tmp, *next;
181         ENTRY;
182
183         LASSERT(desc != NULL);
184         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
185         LASSERT(!desc->bd_network_rw);         /* network hands off */
186
187         list_for_each_safe(tmp, next, &desc->bd_page_list) {
188                 struct ptlrpc_bulk_page *bulk;
189                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
190                 ptlrpc_free_bulk_page(bulk);
191         }
192
193         LASSERT(desc->bd_page_count == 0);
194         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
195
196         if (desc->bd_export)
197                 class_export_put(desc->bd_export);
198         else
199                 class_import_put(desc->bd_import);
200
201         OBD_FREE(desc, sizeof(*desc));
202         EXIT;
203 }
204
205 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
206 {
207         LASSERT(bulk != NULL);
208
209         list_del(&bulk->bp_link);
210         bulk->bp_desc->bd_page_count--;
211         OBD_FREE(bulk, sizeof(*bulk));
212 }
213
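/* Allocate and initialise a request on import 'imp': pack the request
 * message from 'lengths'/'bufs', choose timeout and portals from the
 * import, take an import and connection reference, and assign a fresh
 * XID. */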
214 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
215                                        int count, int *lengths, char **bufs)
216 {
217         struct ptlrpc_request *request;
218         int rc;
219         ENTRY;
220
221         LASSERT((unsigned long)imp > 0x1000);
222
223         OBD_ALLOC(request, sizeof(*request));
224         if (!request) {
225                 CERROR("request allocation out of memory\n");
226                 RETURN(NULL);
227         }
228
229         rc = lustre_pack_request(request, count, lengths, bufs);
230         if (rc) {
231                 CERROR("cannot pack request %d\n", rc);
232                 OBD_FREE(request, sizeof(*request));
233                 RETURN(NULL);
234         }
235
236         if (imp->imp_server_timeout)
237                 request->rq_timeout = obd_timeout / 2;
238         else
239                 request->rq_timeout = obd_timeout;
240         request->rq_send_state = LUSTRE_IMP_FULL;
241         request->rq_type = PTL_RPC_MSG_REQUEST;
242         request->rq_import = class_import_get(imp);
243         request->rq_phase = RQ_PHASE_NEW;
244
245         /* XXX FIXME bug 249 */
246         request->rq_request_portal = imp->imp_client->cli_request_portal;
247         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
248
249         request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
250
251         spin_lock_init(&request->rq_lock);
252         INIT_LIST_HEAD(&request->rq_list);
253         init_waitqueue_head(&request->rq_wait_for_rep);
254         request->rq_xid = ptlrpc_next_xid();
255         atomic_set(&request->rq_refcount, 1);
256
257         request->rq_reqmsg->opc = opcode;
258         request->rq_reqmsg->flags = 0;
259
260         RETURN(request);
261 }
262
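/* Allocate an empty request set for issuing several RPCs concurrently.
 * An illustrative (sketch-only) caller looks roughly like:
 *
 *      set = ptlrpc_prep_set();
 *      req = ptlrpc_prep_req(imp, opcode, count, lengths, bufs);
 *      ptlrpc_set_add_req(set, req);
 *      rc = ptlrpc_set_wait(set);      (sends and waits for all requests)
 *      ptlrpc_set_destroy(set);
 *
 * Error handling and reply-buffer setup are omitted above. */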
263 struct ptlrpc_request_set *ptlrpc_prep_set(void)
264 {
265         struct ptlrpc_request_set *set;
266
267         OBD_ALLOC(set, sizeof *set);
268         if (!set)
269                 RETURN(NULL);
270         INIT_LIST_HEAD(&set->set_requests);
271         init_waitqueue_head(&set->set_waitq);
272         set->set_remaining = 0;
273         spin_lock_init(&set->set_new_req_lock);
274         INIT_LIST_HEAD(&set->set_new_requests);
275
276         RETURN(set);
277 }
278
279 /* Finish with this set; opposite of prep_set. */
280 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
281 {
282         struct list_head *tmp;
283         struct list_head *next;
284         int               expected_phase;
285         int               n = 0;
286         ENTRY;
287
288         /* Requests on the set should either all be completed, or all be new */
289         expected_phase = (set->set_remaining == 0) ?
290                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
291         list_for_each (tmp, &set->set_requests) {
292                 struct ptlrpc_request *req =
293                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
294
295                 LASSERT(req->rq_phase == expected_phase);
296                 n++;
297         }
298
299         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
300
301         list_for_each_safe(tmp, next, &set->set_requests) {
302                 struct ptlrpc_request *req =
303                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
304                 list_del_init(&req->rq_set_chain);
305
306                 LASSERT(req->rq_phase == expected_phase);
307
308                 if (req->rq_phase == RQ_PHASE_NEW) {
309
310                         if (req->rq_interpret_reply != NULL) {
311                                 int (*interpreter)(struct ptlrpc_request *,
312                                                    void *, int) =
313                                         req->rq_interpret_reply;
314
315                                 /* higher level (i.e. LOV) failed;
316                                  * let the sub reqs clean up */
317                                 req->rq_status = -EBADR;
318                                 interpreter(req, &req->rq_async_args,
319                                             req->rq_status);
320                         }
321                         set->set_remaining--;
322                 }
323
324                 req->rq_set = NULL;
325                 ptlrpc_req_finished (req);
326         }
327
328         LASSERT(set->set_remaining == 0);
329
330         OBD_FREE(set, sizeof(*set));
331         EXIT;
332 }
333
334 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
335                         struct ptlrpc_request *req)
336 {
337         /* The set takes over the caller's request reference */
338         list_add_tail(&req->rq_set_chain, &set->set_requests);
339         req->rq_set = set;
340         set->set_remaining++;
341 }
342
343 /* Locked so that many callers can add requests; the context that owns
344  * the set is supposed to notice them and move them into the set proper. */
345 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
346                             struct ptlrpc_request *req)
347 {
348         unsigned long flags;
349         spin_lock_irqsave(&set->set_new_req_lock, flags);
350         /* The set takes over the caller's request reference */
351         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
352         req->rq_set = set;
353         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
354 }
355
356 /*
357  * Based on the current state of the import, determine if the request
358  * can be sent, is an error, or should be delayed.
359  *
360  * Returns true if this request should be delayed. If false and
361  * *status is non-zero, then the request cannot be sent and *status is
362  * the error code.  If false and *status is 0, the request can be sent.
363  *
364  * The imp->imp_lock must be held.
365  */
366 static int ptlrpc_import_delay_req(struct obd_import *imp, 
367                                    struct ptlrpc_request *req, int *status)
368 {
369         int delay = 0;
370         ENTRY;
371
372         LASSERT (status != NULL);
373         *status = 0;
374
375         /* A new import, or one that has been cleaned up.
376          */
377         if (imp->imp_state == LUSTRE_IMP_NEW) {
378                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
379                 *status = -EIO;
380         }
381         /*
382          * If the import has been invalidated (such as by an OST failure), the
383          * request must fail with -EIO.  
384          */
385         else if (imp->imp_invalid) {
386                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
387                 *status = -EIO;
388         } 
389         else if (req->rq_import_generation != imp->imp_generation) {
390                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
391                 *status = -EIO;
392         } 
393         else if (req->rq_send_state != imp->imp_state) {
394                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) 
395                         *status = -EWOULDBLOCK;
396                 else
397                         delay = 1;
398         }
399
400         RETURN(delay);
401 }
402
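/* Check, under rq_lock, whether a request needs attention: returns
 * non-zero once it has been replied to, errored, or flagged for
 * resend/restart. */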
403 static int ptlrpc_check_reply(struct ptlrpc_request *req)
404 {
405         unsigned long flags;
406         int rc = 0;
407         ENTRY;
408
409         /* serialise with network callback */
410         spin_lock_irqsave (&req->rq_lock, flags);
411
412         if (req->rq_replied) {
413                 DEBUG_REQ(D_NET, req, "REPLIED:");
414                 GOTO(out, rc = 1);
415         }
416
417         if (req->rq_err) {
418                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
419                 GOTO(out, rc = 1);
420         }
421
422         if (req->rq_resend) {
423                 DEBUG_REQ(D_ERROR, req, "RESEND:");
424                 GOTO(out, rc = 1);
425         }
426
427         if (req->rq_restart) {
428                 DEBUG_REQ(D_ERROR, req, "RESTART:");
429                 GOTO(out, rc = 1);
430         }
431         EXIT;
432  out:
433         spin_unlock_irqrestore (&req->rq_lock, flags);
434         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
435         return rc;
436 }
437
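/* Return the status carried in the reply message; a reply of type
 * PTL_RPC_MSG_ERR is always mapped to a negative error. */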
438 static int ptlrpc_check_status(struct ptlrpc_request *req)
439 {
440         int err;
441         ENTRY;
442
443         err = req->rq_repmsg->status;
444         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
445                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
446                 RETURN(err < 0 ? err : -EINVAL);
447         }
448
449         if (err < 0) {
450                 DEBUG_REQ(D_INFO, req, "status is %d", err);
451         } else if (err > 0) {
452                 /* XXX: translate this error from net to host */
453                 DEBUG_REQ(D_INFO, req, "status is %d", err);
454         }
455
456         RETURN(err);
457 }
458
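/* Post-process a received reply: unpack (and byte-swap) the message,
 * record the transno for replay, handle -ENOTCONN by arranging a
 * reconnect/resend, and update the import's committed-transno state. */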
459 static int after_reply(struct ptlrpc_request *req, int *restartp)
460 {
461         unsigned long flags;
462         struct obd_import *imp = req->rq_import;
463         int rc;
464         ENTRY;
465
466         LASSERT(!req->rq_receiving_reply);
467         LASSERT(req->rq_replied);
468
469         if (restartp != NULL)
470                 *restartp = 0;
471
472         /* NB Until this point, the whole of the incoming message,
473          * including buflens, status etc is in the sender's byte order. */
474
475 #if SWAB_PARANOIA
476         /* Clear reply swab mask; this is a new reply in sender's byte order */
477         req->rq_rep_swab_mask = 0;
478 #endif
479         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
480         if (rc) {
481                 CERROR("unpack_rep failed: %d\n", rc);
482                 RETURN(-EPROTO);
483         }
484
485         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
486             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
487                 CERROR("invalid packet type received (type=%u)\n",
488                        req->rq_repmsg->type);
489                 RETURN(-EPROTO);
490         }
491
492         /* Store transno in reqmsg for replay. */
493         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
494
495         rc = ptlrpc_check_status(req);
496
497         /* Either we've been evicted, or the server has failed for
498          * some reason. Try to reconnect, and if that fails, punt to the
499          * upcall. */
500         if (rc == -ENOTCONN) {
501                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
502                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
503                         RETURN(-ENOTCONN);
504                 }
505
506                 ptlrpc_request_handle_notconn(req);
507
508                 if (req->rq_err)
509                         RETURN(-EIO);
510
511                 if (req->rq_no_resend)
512                         RETURN(rc); /* -ENOTCONN */
513
514                 if (req->rq_resend) {
515                         if (restartp == NULL)
516                                 LBUG(); /* async resend not supported yet */
517                         spin_lock_irqsave (&req->rq_lock, flags);
518                         req->rq_resend = 0;
519                         spin_unlock_irqrestore (&req->rq_lock, flags);
520                         *restartp = 1;
521                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
522                         DEBUG_REQ(D_HA, req, "resending: ");
523                         RETURN(0);
524                 }
525
526                 CERROR("request should be err or resend: %p\n", req);
527                 LBUG();
528         }
529
530         if (req->rq_import->imp_replayable) {
531                 spin_lock_irqsave(&imp->imp_lock, flags);
532                 if (req->rq_replay || req->rq_transno != 0)
533                         ptlrpc_retain_replayable_request(req, imp);
534                 else if (req->rq_commit_cb != NULL)
535                         req->rq_commit_cb(req);
536
537                 if (req->rq_transno > imp->imp_max_transno)
538                         imp->imp_max_transno = req->rq_transno;
539
540                 /* Replay-enabled imports return commit-status information. */
541                 if (req->rq_repmsg->last_committed)
542                         imp->imp_peer_committed_transno =
543                                 req->rq_repmsg->last_committed;
544                 ptlrpc_free_committed(imp);
545                 spin_unlock_irqrestore(&imp->imp_lock, flags);
546         }
547
548         RETURN(rc);
549 }
550
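/* Move a new request into the RPC phase and send it, unless the import
 * is invalid or ptlrpc_import_delay_req() says it must wait, in which
 * case it is parked on the import's delayed list instead. */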
551 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
552 {
553         struct obd_import     *imp;
554         unsigned long          flags;
555         int rc;
556         ENTRY;
557
558         LASSERT(req->rq_send_state == LUSTRE_IMP_FULL);
559         LASSERT(req->rq_phase == RQ_PHASE_NEW);
560         req->rq_phase = RQ_PHASE_RPC;
561
562         imp = req->rq_import;
563         spin_lock_irqsave(&imp->imp_lock, flags);
564
565         if (imp->imp_invalid) {
566                 spin_unlock_irqrestore(&imp->imp_lock, flags);
567                 req->rq_status = -EIO;
568                 req->rq_phase = RQ_PHASE_INTERPRET;
569                 RETURN(-EIO);
570         }
571
572         req->rq_import_generation = imp->imp_generation;
573
574         if (ptlrpc_import_delay_req(imp, req, &rc)) {
575                 spin_lock (&req->rq_lock);
576                 req->rq_waiting = 1;
577                 spin_unlock (&req->rq_lock);
578
579                 LASSERT(list_empty (&req->rq_list));
580
581                 // list_del(&req->rq_list);
582                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
583                 spin_unlock_irqrestore(&imp->imp_lock, flags);
584                 RETURN(0);
585         }
586
587         if (rc != 0) {
588                 spin_unlock_irqrestore(&imp->imp_lock, flags);
589                 req->rq_status = rc;
590                 req->rq_phase = RQ_PHASE_INTERPRET;
591                 RETURN(rc);
592         }
593
594         /* XXX this is the same as ptlrpc_queue_wait */
595         LASSERT(list_empty(&req->rq_list));
596         list_add_tail(&req->rq_list, &imp->imp_sending_list);
597         spin_unlock_irqrestore(&imp->imp_lock, flags);
598
599         req->rq_reqmsg->status = current->pid;
600         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
601                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
602                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
603                req->rq_xid,
604                imp->imp_connection->c_peer.peer_ni->pni_name,
605                imp->imp_connection->c_peer.peer_nid,
606                req->rq_reqmsg->opc);
607
608         rc = ptl_send_rpc(req);
609         if (rc) {
610                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
611                 req->rq_timeout = 1;
612                 RETURN(rc);
613         }
614         RETURN(0);
615 }
616
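/* Walk every request in the set and push it through its phases
 * (NEW -> RPC -> BULK -> INTERPRET -> COMPLETE), resending or failing
 * requests as needed.  Returns non-zero once the set has completed (or a
 * send failed), which is the condition ptlrpc_set_wait() sleeps on. */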
617 int ptlrpc_check_set(struct ptlrpc_request_set *set)
618 {
619         unsigned long flags;
620         struct list_head *tmp;
621         int sending_error = 0;
622         ENTRY;
623
624         if (set->set_remaining == 0)
625                 RETURN(1);
626
627         list_for_each(tmp, &set->set_requests) {
628                 struct ptlrpc_request *req =
629                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
630                 struct obd_import *imp = req->rq_import;
631                 int rc = 0;
632
633                 if (req->rq_phase == RQ_PHASE_NEW &&
634                     ptlrpc_send_new_req(req)) {
635                         sending_error = 1;
636                 }
637
638                 if (!(req->rq_phase == RQ_PHASE_RPC ||
639                       req->rq_phase == RQ_PHASE_BULK ||
640                       req->rq_phase == RQ_PHASE_INTERPRET ||
641                       req->rq_phase == RQ_PHASE_COMPLETE)) {
642                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
643                         LBUG();
644                 }
645
646                 if (req->rq_phase == RQ_PHASE_COMPLETE)
647                         continue;
648
649                 if (req->rq_phase == RQ_PHASE_INTERPRET)
650                         GOTO(interpret, req->rq_status);
651
652                 if (req->rq_err) {
653                         ptlrpc_unregister_reply(req);
654                         if (req->rq_status == 0)
655                                 req->rq_status = -EIO;
656                         req->rq_phase = RQ_PHASE_INTERPRET;
657
658                         spin_lock_irqsave(&imp->imp_lock, flags);
659                         list_del_init(&req->rq_list);
660                         spin_unlock_irqrestore(&imp->imp_lock, flags);
661
662                         GOTO(interpret, req->rq_status);
663                 }
664
665                 if (req->rq_intr) {
666                         /* NB could be on delayed list */
667                         ptlrpc_unregister_reply(req);
668                         req->rq_status = -EINTR;
669                         req->rq_phase = RQ_PHASE_INTERPRET;
670
671                         spin_lock_irqsave(&imp->imp_lock, flags);
672                         list_del_init(&req->rq_list);
673                         spin_unlock_irqrestore(&imp->imp_lock, flags);
674
675                         GOTO(interpret, req->rq_status);
676                 }
677
678                 if (req->rq_phase == RQ_PHASE_RPC) {
679                         int do_restart = 0;
680                         if (req->rq_waiting || req->rq_resend) {
681                                 int status;
682                                 spin_lock_irqsave(&imp->imp_lock, flags);
683
684                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
685                                         spin_unlock_irqrestore(&imp->imp_lock,
686                                                                flags);
687                                         continue;
688                                 } 
689
690                                 list_del(&req->rq_list);
691                                 list_add_tail(&req->rq_list,
692                                               &imp->imp_sending_list);
693
694                                 if (status != 0)  {
695                                         req->rq_status = status;
696                                         req->rq_phase = RQ_PHASE_INTERPRET;
697                                         spin_unlock_irqrestore(&imp->imp_lock,
698                                                                flags);
699                                         GOTO(interpret, req->rq_status);
700                                 }
701                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
702
703                                 req->rq_waiting = 0;
704                                 if (req->rq_resend) {
705                                         lustre_msg_add_flags(req->rq_reqmsg,
706                                                              MSG_RESENT);
707                                         spin_lock_irqsave(&req->rq_lock, flags);
708                                         req->rq_resend = 0;
709                                         spin_unlock_irqrestore(&req->rq_lock,
710                                                                flags);
711
712                                         ptlrpc_unregister_reply(req);
713                                         if (req->rq_bulk) {
714                                                 __u64 old_xid = req->rq_xid;
715                                                 ptlrpc_unregister_bulk(req);
716                                                 /* ensure previous bulk fails */
717                                                 req->rq_xid = ptlrpc_next_xid();
718                                                 CDEBUG(D_HA, "resend bulk "
719                                                        "old x"LPU64
720                                                        " new x"LPU64"\n",
721                                                        old_xid, req->rq_xid);
722                                         }
723                                 }
724
725                                 rc = ptl_send_rpc(req);
726                                 if (rc) {
727                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
728                                                   rc);
729                                         sending_error = 1;
730                                         req->rq_timeout = 0;
731                                 }
732                         }
733
734                         /* Ensure the network callback returned */
735                         spin_lock_irqsave (&req->rq_lock, flags);
736                         if (!req->rq_replied) {
737                                 spin_unlock_irqrestore (&req->rq_lock, flags);
738                                 continue;
739                         }
740                         spin_unlock_irqrestore (&req->rq_lock, flags);
741
742                         spin_lock_irqsave(&imp->imp_lock, flags);
743                         list_del_init(&req->rq_list);
744                         spin_unlock_irqrestore(&imp->imp_lock, flags);
745
746                         req->rq_status = after_reply(req, &do_restart);
747                         if (do_restart) {
748                                 spin_lock_irqsave (&req->rq_lock, flags);
749                                 req->rq_resend = 1; /* ugh */
750                                 spin_unlock_irqrestore (&req->rq_lock, flags);
751                                 continue;
752                         }
753
754                         /* If there is no bulk associated with this request,
755                          * then we're done and should let the interpreter
756                          * process the reply.  Similarly if the RPC returned
757                          * an error, and therefore the bulk will never arrive.
758                          */
759                         if (req->rq_bulk == NULL || req->rq_status != 0) {
760                                 req->rq_phase = RQ_PHASE_INTERPRET;
761                                 GOTO(interpret, req->rq_status);
762                         }
763
764                         req->rq_phase = RQ_PHASE_BULK;
765                 }
766
767                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
768                 if (!ptlrpc_bulk_complete (req->rq_bulk))
769                         continue;
770
771                 req->rq_phase = RQ_PHASE_INTERPRET;
772
773         interpret:
774                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
775                 LASSERT(!req->rq_receiving_reply);
776
777                 ptlrpc_unregister_reply(req);
778                 if (req->rq_bulk != NULL)
779                         ptlrpc_unregister_bulk (req);
780
781                 if (req->rq_interpret_reply != NULL) {
782                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
783                                 req->rq_interpret_reply;
784                         req->rq_status = interpreter(req, &req->rq_async_args,
785                                                      req->rq_status);
786                 }
787
788                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
789                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
790                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
791                        req->rq_xid,
792                        imp->imp_connection->c_peer.peer_ni->pni_name,
793                        imp->imp_connection->c_peer.peer_nid,
794                        req->rq_reqmsg->opc);
795
796                 req->rq_phase = RQ_PHASE_COMPLETE;
797                 set->set_remaining--;
798         }
799
800         /* If we hit an error, we want to recover promptly. */
801         RETURN(set->set_remaining == 0 || sending_error);
802 }
803
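/* Handle a timed-out request: mark it timed out, unregister its reply
 * buffer, and start import recovery unless that is unwanted (no import,
 * DLM "fake" import, obd_no_recov, or a non-FULL send state).  Returns 1
 * when recovery is not started, 0 after triggering it. */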
804 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
805 {
806         unsigned long      flags;
807         struct obd_import *imp = req->rq_import;
808         ENTRY;
809
810         DEBUG_REQ(D_ERROR, req, "timeout");
811
812         spin_lock_irqsave (&req->rq_lock, flags);
813         req->rq_timedout = 1;
814         spin_unlock_irqrestore (&req->rq_lock, flags);
815
816         ptlrpc_unregister_reply (req);
817
818         if (imp == NULL) {
819                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
820                 RETURN(1);
821         }
822
823         /* The DLM server doesn't want recovery run on its imports. */
824         if (imp->imp_dlm_fake)
825                 RETURN(1);
826
827         /* If this request is for recovery or other primordial tasks,
828          * don't go back to sleep, and don't start recovery again. */
829         if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov)
830                 RETURN(1);
831
832         ptlrpc_fail_import(imp, req->rq_import_generation);
833
834         RETURN(0);
835 }
836
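/* Set-level timeout handler: expire every in-flight request whose
 * deadline has passed, then return 1 so the waiter wakes and
 * recalculates its timeout. */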
837 int ptlrpc_expired_set(void *data)
838 {
839         struct ptlrpc_request_set *set = data;
840         struct list_head          *tmp;
841         time_t                     now = LTIME_S (CURRENT_TIME);
842         ENTRY;
843
844         LASSERT(set != NULL);
845
846         /* A timeout expired; see which reqs it applies to... */
847         list_for_each (tmp, &set->set_requests) {
848                 struct ptlrpc_request *req =
849                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
850
851                 /* request in-flight? */
852                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
853                       (req->rq_phase == RQ_PHASE_BULK)))
854                         continue;
855
856                 if (req->rq_timedout ||           /* already dealt with */
857                     req->rq_sent + req->rq_timeout > now) /* not expired */
858                         continue;
859
860                 /* deal with this guy */
861                 ptlrpc_expire_one_request (req);
862         }
863
864         /* When waiting for a whole set, we always want to break out of the
865          * sleep so we can recalculate the timeout, or enable interrupts
866          * iff everyone's timed out.
867          */
868         RETURN(1);
869 }
870
871 void ptlrpc_interrupted_set(void *data)
872 {
873         struct ptlrpc_request_set *set = data;
874         struct list_head *tmp;
875         unsigned long flags;
876
877         LASSERT(set != NULL);
878         CERROR("INTERRUPTED SET %p\n", set);
879
880         list_for_each(tmp, &set->set_requests) {
881                 struct ptlrpc_request *req =
882                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
883
884                 if (req->rq_phase != RQ_PHASE_RPC)
885                         continue;
886
887                 spin_lock_irqsave (&req->rq_lock, flags);
888                 req->rq_intr = 1;
889                 spin_unlock_irqrestore (&req->rq_lock, flags);
890         }
891 }
892
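/* Return the number of seconds until the next in-flight, not-yet-expired
 * request in the set reaches its deadline: 1 if one has already expired,
 * 0 if nothing is in flight. */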
893 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
894 {
895         struct list_head      *tmp;
896         time_t                 now = LTIME_S(CURRENT_TIME);
897         time_t                 deadline;
898         int                    timeout = 0;
899         struct ptlrpc_request *req;
900         ENTRY;
901
902         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
903
904         list_for_each(tmp, &set->set_requests) {
905                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
906
907                 /* request in-flight? */
908                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
909                       (req->rq_phase == RQ_PHASE_BULK)))
910                         continue;
911
912                 if (req->rq_timedout)   /* already timed out */
913                         continue;
914
915                 deadline = req->rq_sent + req->rq_timeout;
916                 if (deadline <= now)    /* actually expired already */
917                         timeout = 1;    /* ASAP */
918                 else if (timeout == 0 || timeout > deadline - now)
919                         timeout = deadline - now;
920         }
921         RETURN(timeout);
922 }
923
924
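/* Send every request in the set and sleep until all have completed, been
 * interrupted, or timed out; then fold the individual statuses (and the
 * optional set interpreter) into a single return code. */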
925 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
926 {
927         struct list_head      *tmp;
928         struct ptlrpc_request *req;
929         struct l_wait_info     lwi;
930         int                    rc, timeout;
931         ENTRY;
932
933         LASSERT(!list_empty(&set->set_requests));
934         list_for_each(tmp, &set->set_requests) {
935                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
936                 (void)ptlrpc_send_new_req(req);
937         }
938
939         do {
940                 timeout = ptlrpc_set_next_timeout(set);
941
942                 /* wait until all complete, interrupted, or an in-flight
943                  * req times out */
944                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
945                        set, timeout);
946                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
947                                        ptlrpc_expired_set, 
948                                        ptlrpc_interrupted_set, set);
949                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
950
951                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
952
953                 /* -EINTR => all requests have been flagged rq_intr so next
954                  * check completes.
955                  * -ETIMEDOUT => someone timed out.  When all reqs have
956                  * timed out, signals are enabled allowing completion with
957                  * EINTR.
958                  * I don't really care if we go once more round the loop in
959                  * the error cases -eeb. */
960         } while (rc != 0);
961
962         LASSERT(set->set_remaining == 0);
963
964         rc = 0;
965         list_for_each(tmp, &set->set_requests) {
966                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
967
968                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
969                 if (req->rq_status != 0)
970                         rc = req->rq_status;
971         }
972
973         if (set->set_interpret != NULL) {
974                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
975                         set->set_interpret;
976                 rc = interpreter (set, &set->set_args, rc);
977         }
978
979         RETURN(rc);
980 }
981
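/* Release everything a request owns: unlink it from its import's lists,
 * free the request and reply buffers, drop the export/import/connection
 * references and any bulk descriptor, then free the request itself.
 * Only legal once the refcount has reached zero. */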
982 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
983 {
984         ENTRY;
985         if (request == NULL) {
986                 EXIT;
987                 return;
988         }
989
990         LASSERT(!request->rq_receiving_reply);
991
992         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
993          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
994         if (request->rq_import != NULL) {
995                 unsigned long flags = 0;
996                 if (!locked)
997                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
998                 list_del_init(&request->rq_list);
999                 if (!locked)
1000                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1001                                                flags);
1002         }
1003
1004         if (atomic_read(&request->rq_refcount) != 0) {
1005                 DEBUG_REQ(D_ERROR, request,
1006                           "freeing request with nonzero refcount");
1007                 LBUG();
1008         }
1009
1010         if (request->rq_repmsg != NULL) {
1011                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1012                 request->rq_repmsg = NULL;
1013         }
1014         if (request->rq_reqmsg != NULL) {
1015                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1016                 request->rq_reqmsg = NULL;
1017         }
1018         if (request->rq_export != NULL) {
1019                 class_export_put(request->rq_export);
1020                 request->rq_export = NULL;
1021         }
1022         if (request->rq_import != NULL) {
1023                 class_import_put(request->rq_import);
1024                 request->rq_import = NULL;
1025         }
1026         if (request->rq_bulk != NULL)
1027                 ptlrpc_free_bulk(request->rq_bulk);
1028
1029         ptlrpc_put_connection(request->rq_connection);
1030         OBD_FREE(request, sizeof(*request));
1031         EXIT;
1032 }
1033
1034 void ptlrpc_free_req(struct ptlrpc_request *request)
1035 {
1036         __ptlrpc_free_req(request, 0);
1037 }
1038
1039 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1040 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1041 {
1042 #ifdef CONFIG_SMP
1043         LASSERT(spin_is_locked(&request->rq_import->imp_lock));
1044 #endif
1045         (void)__ptlrpc_req_finished(request, 1);
1046 }
1047
1048 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1049 {
1050         ENTRY;
1051         if (request == NULL)
1052                 RETURN(1);
1053
1054         if (request == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a) ||
1055             request->rq_reqmsg == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a)) {
1056                 CERROR("dereferencing freed request (bug 575)\n");
1057                 LBUG();
1058                 RETURN(1);
1059         }
1060
1061         DEBUG_REQ(D_INFO, request, "refcount now %u",
1062                   atomic_read(&request->rq_refcount) - 1);
1063
1064         if (atomic_dec_and_test(&request->rq_refcount)) {
1065                 __ptlrpc_free_req(request, locked);
1066                 RETURN(1);
1067         }
1068
1069         RETURN(0);
1070 }
1071
1072 void ptlrpc_req_finished(struct ptlrpc_request *request)
1073 {
1074         __ptlrpc_req_finished(request, 0);
1075 }
1076
1077 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
1078 {
1079         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1080         request->rq_reqmsg = NULL;
1081         request->rq_reqlen = 0;
1082 }
1083
1084 /* Disengage the client's reply buffer from the network
1085  * NB does _NOT_ unregister any client-side bulk.
1086  * IDEMPOTENT, but _not_ safe against concurrent callers.
1087  * The request owner (i.e. the thread doing the I/O) must call...
1088  */
1089 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1090 {
1091         unsigned long flags;
1092         int           rc;
1093         ENTRY;
1094
1095         LASSERT(!in_interrupt ());             /* might sleep */
1096
1097         spin_lock_irqsave (&request->rq_lock, flags);
1098         if (!request->rq_receiving_reply) {     /* not waiting for a reply */
1099                 spin_unlock_irqrestore (&request->rq_lock, flags);
1100                 EXIT;
1101                 /* NB reply buffer not freed here */
1102                 return;
1103         }
1104
1105         LASSERT(!request->rq_replied);         /* callback hasn't completed */
1106         spin_unlock_irqrestore (&request->rq_lock, flags);
1107
1108         rc = PtlMDUnlink (request->rq_reply_md_h);
1109         switch (rc) {
1110         default:
1111                 LBUG ();
1112
1113         case PTL_OK:                            /* unlinked before completion */
1114                 LASSERT(request->rq_receiving_reply);
1115                 LASSERT(!request->rq_replied);
1116                 spin_lock_irqsave (&request->rq_lock, flags);
1117                 request->rq_receiving_reply = 0;
1118                 spin_unlock_irqrestore (&request->rq_lock, flags);
1119                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1120                 request->rq_repmsg = NULL;
1121                 EXIT;
1122                 return;
1123
1124         case PTL_MD_INUSE:                      /* callback in progress */
1125                 for (;;) {
1126                         /* Network access will complete in finite time but
1127                          * the timeout lets us CERROR for visibility */
1128                         struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
1129
1130                         rc = l_wait_event (request->rq_wait_for_rep,
1131                                            request->rq_replied, &lwi);
1132                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1133                         if (rc == 0) {
1134                                 spin_lock_irqsave (&request->rq_lock, flags);
1135                                 /* Ensure the callback has completed scheduling
1136                                  * me and taken its hands off the request */
1137                                 spin_unlock_irqrestore(&request->rq_lock,flags);
1138                                 break;
1139                         }
1140
1141                         CERROR ("Unexpectedly long timeout: req %p\n", request);
1142                 }
1143                 /* fall through */
1144
1145         case PTL_INV_MD:                        /* callback completed */
1146                 LASSERT(!request->rq_receiving_reply);
1147                 LASSERT(request->rq_replied);
1148                 EXIT;
1149                 return;
1150         }
1151         /* Not Reached */
1152 }
1153
1154 /* caller must hold imp->imp_lock */
1155 void ptlrpc_free_committed(struct obd_import *imp)
1156 {
1157         struct list_head *tmp, *saved;
1158         struct ptlrpc_request *req;
1159         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1160         ENTRY;
1161
1162         LASSERT(imp != NULL);
1163
1164 #ifdef CONFIG_SMP
1165         LASSERT(spin_is_locked(&imp->imp_lock));
1166 #endif
1167
1168         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1169                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1170
1171         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1172                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1173
1174                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1175                 LASSERT(req != last_req);
1176                 last_req = req;
1177
1178                 if (req->rq_import_generation < imp->imp_generation) {
1179                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1180                         GOTO(free_req, 0);
1181                 }
1182
1183                 if (req->rq_replay) {
1184                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1185                         continue;
1186                 }
1187
1188                 /* not yet committed */
1189                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1190                         DEBUG_REQ(D_HA, req, "stopping search");
1191                         break;
1192                 }
1193
1194                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1195                           imp->imp_peer_committed_transno);
1196 free_req:
1197                 if (req->rq_commit_cb != NULL)
1198                         req->rq_commit_cb(req);
1199                 list_del_init(&req->rq_list);
1200                 __ptlrpc_req_finished(req, 1);
1201         }
1202
1203         EXIT;
1204         return;
1205 }
1206
1207 void ptlrpc_cleanup_client(struct obd_import *imp)
1208 {
1209         ENTRY;
1210         EXIT;
1211         return;
1212 }
1213
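/* Flag a request for resend on the import's current connection and wake
 * whoever is waiting on it (its set, or the thread in
 * ptlrpc_queue_wait()). */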
1214 void ptlrpc_resend_req(struct ptlrpc_request *req)
1215 {
1216         unsigned long flags;
1217
1218         DEBUG_REQ(D_HA, req, "resending");
1219         req->rq_reqmsg->handle.cookie = 0;
1220         ptlrpc_put_connection(req->rq_connection);
1221         req->rq_connection =
1222                 ptlrpc_connection_addref(req->rq_import->imp_connection);
1223         req->rq_status = -EAGAIN;
1224
1225         spin_lock_irqsave (&req->rq_lock, flags);
1226         req->rq_resend = 1;
1227         req->rq_timedout = 0;
1228         if (req->rq_set != NULL)
1229                 wake_up (&req->rq_set->set_waitq);
1230         else
1231                 wake_up(&req->rq_wait_for_rep);
1232         spin_unlock_irqrestore (&req->rq_lock, flags);
1233 }
1234
1235 /* XXX: this function and rq_status are currently unused */
1236 void ptlrpc_restart_req(struct ptlrpc_request *req)
1237 {
1238         unsigned long flags;
1239
1240         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1241         req->rq_status = -ERESTARTSYS;
1242
1243         spin_lock_irqsave (&req->rq_lock, flags);
1244         req->rq_restart = 1;
1245         req->rq_timedout = 0;
1246         if (req->rq_set != NULL)
1247                 wake_up (&req->rq_set->set_waitq);
1248         else
1249                 wake_up(&req->rq_wait_for_rep);
1250         spin_unlock_irqrestore (&req->rq_lock, flags);
1251 }
1252
1253 static int expired_request(void *data)
1254 {
1255         struct ptlrpc_request *req = data;
1256         ENTRY;
1257
1258         RETURN(ptlrpc_expire_one_request(req));
1259 }
1260
1261 static void interrupted_request(void *data)
1262 {
1263         unsigned long flags;
1264
1265         struct ptlrpc_request *req = data;
1266         DEBUG_REQ(D_HA, req, "request interrupted");
1267         spin_lock_irqsave (&req->rq_lock, flags);
1268         req->rq_intr = 1;
1269         spin_unlock_irqrestore (&req->rq_lock, flags);
1270 }
1271
1272 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1273 {
1274         ENTRY;
1275         atomic_inc(&req->rq_refcount);
1276         RETURN(req);
1277 }
1278
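/* Insert req into the import's replay list in transno order (XID breaks
 * ties), taking an extra reference that is dropped once the request is
 * known to be committed. */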
1279 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1280                                       struct obd_import *imp)
1281 {
1282         struct list_head *tmp;
1283
1284 #ifdef CONFIG_SMP
1285         LASSERT(spin_is_locked(&imp->imp_lock));
1286 #endif
1287
1288         LASSERT(imp->imp_replayable);
1289         /* Balanced in ptlrpc_free_committed, usually. */
1290         ptlrpc_request_addref(req);
1291         list_for_each_prev(tmp, &imp->imp_replay_list) {
1292                 struct ptlrpc_request *iter =
1293                         list_entry(tmp, struct ptlrpc_request, rq_list);
1294
1295                 /* We may have duplicate transnos if we create and then
1296                  * open a file, or for closes retained to match creating
1297                  * opens, so use req->rq_xid as a secondary key.
1298                  * (See bugs 684, 685, and 428.)
1299                  * XXX no longer needed, but all opens need transnos!
1300                  */
1301                 if (iter->rq_transno > req->rq_transno)
1302                         continue;
1303
1304                 if (iter->rq_transno == req->rq_transno) {
1305                         LASSERT(iter->rq_xid != req->rq_xid);
1306                         if (iter->rq_xid > req->rq_xid)
1307                                 continue;
1308                 }
1309
1310                 list_add(&req->rq_list, &iter->rq_list);
1311                 return;
1312         }
1313
1314         list_add_tail(&req->rq_list, &imp->imp_replay_list);
1315 }
1316
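/* Send a single request synchronously: wait (or park on the delayed
 * list) until the import will accept it, send it, sleep until the reply
 * arrives, and handle resend, interruption, timeout and any associated
 * bulk before returning the RPC status. */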
1317 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1318 {
1319         int rc = 0;
1320         int brc;
1321         struct l_wait_info lwi;
1322         struct obd_import *imp = req->rq_import;
1323         unsigned long flags;
1324         int do_restart = 0;
1325         int timeout = 0;
1326         ENTRY;
1327
1328         LASSERT(req->rq_set == NULL);
1329         LASSERT(!req->rq_receiving_reply);
1330
1331         /* for distributed debugging */
1332         req->rq_reqmsg->status = current->pid;
1333         LASSERT(imp->imp_obd != NULL);
1334         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1335                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1336                imp->imp_obd->obd_uuid.uuid,
1337                req->rq_reqmsg->status, req->rq_xid,
1338                imp->imp_connection->c_peer.peer_ni->pni_name,
1339                imp->imp_connection->c_peer.peer_nid,
1340                req->rq_reqmsg->opc);
1341
1342         /* Mark phase here for a little debug help */
1343         req->rq_phase = RQ_PHASE_RPC;
1344
1345         spin_lock_irqsave(&imp->imp_lock, flags);
1346         req->rq_import_generation = imp->imp_generation;
1347 restart:
1348         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1349                 list_del(&req->rq_list);
1350
1351                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1352                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1353
1354                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
1355                           current->comm, req->rq_send_state, imp->imp_state);
1356                 lwi = LWI_INTR(interrupted_request, req);
1357                 rc = l_wait_event(req->rq_wait_for_rep,
1358                                   (req->rq_send_state == imp->imp_state ||
1359                                    req->rq_err),
1360                                   &lwi);
1361                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d or %d == 1)",
1362                           current->comm, imp->imp_state, req->rq_send_state,
1363                           req->rq_err);
1364
1365                 spin_lock_irqsave(&imp->imp_lock, flags);
1366                 list_del_init(&req->rq_list);
1367
1368                 if (req->rq_err) {
1369                         rc = -EIO;
1370                 } 
1371                 else if (req->rq_intr) {
1372                         rc = -EINTR;
1373                 }
1374                 else {
1375                         GOTO(restart, rc);
1376                 }
1377         } 
1378
1379         if (rc != 0) {
1380                 list_del_init(&req->rq_list);
1381                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1382                 req->rq_status = rc; // XXX this ok?
1383                 GOTO(out, rc);
1384         }
1385
1386         /* XXX this is the same as ptlrpc_set_wait */
1387         LASSERT(list_empty(&req->rq_list));
1388         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1389         spin_unlock_irqrestore(&imp->imp_lock, flags);
1390
1391         rc = ptl_send_rpc(req);
1392         if (rc) {
1393                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1394                 timeout = 1;
1395         } else {
1396                 timeout = MAX(req->rq_timeout * HZ, 1);
1397                 DEBUG_REQ(D_NET, req, "-- sleeping");
1398         }
1399         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1400                                req);
1401         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
1402         DEBUG_REQ(D_NET, req, "-- done sleeping");
1403
1404         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1405                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1406                imp->imp_obd->obd_uuid.uuid,
1407                req->rq_reqmsg->status, req->rq_xid,
1408                imp->imp_connection->c_peer.peer_ni->pni_name,
1409                imp->imp_connection->c_peer.peer_nid,
1410                req->rq_reqmsg->opc);
1411
1412         spin_lock_irqsave(&imp->imp_lock, flags);
1413         list_del_init(&req->rq_list);
1414         spin_unlock_irqrestore(&imp->imp_lock, flags);
1415
1416         /* If the reply was received normally, this just grabs the spinlock
1417          * (ensuring the reply callback has returned), sees that
1418          * req->rq_receiving_reply is clear and returns. */
1419         ptlrpc_unregister_reply (req);
1420
1421         if (req->rq_err)
1422                 GOTO(out, rc = -EIO);
1423
1424         /* Resend if we need to, unless we were interrupted. */
1425         if (req->rq_resend && !req->rq_intr) {
1426                 /* ...unless we were specifically told otherwise. */
1427                 if (req->rq_no_resend)
1428                         GOTO(out, rc = -ETIMEDOUT);
1429                 spin_lock_irqsave (&req->rq_lock, flags);
1430                 req->rq_resend = 0;
1431                 spin_unlock_irqrestore (&req->rq_lock, flags);
1432                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1433
1434                 if (req->rq_bulk != NULL)
1435                         ptlrpc_unregister_bulk (req);
1436
1437                 DEBUG_REQ(D_HA, req, "resending: ");
1438                 spin_lock_irqsave(&imp->imp_lock, flags);
1439                 goto restart;
1440         }
1441
1442         if (req->rq_intr) {
1443                 /* Should only be interrupted if we timed out. */
1444                 if (!req->rq_timedout)
1445                         DEBUG_REQ(D_ERROR, req,
1446                                   "rq_intr set but rq_timedout not");
1447                 GOTO(out, rc = -EINTR);
1448         }
1449
1450         if (req->rq_timedout) {                 /* non-recoverable timeout */
1451                 GOTO(out, rc = -ETIMEDOUT);
1452         }
1453
1454         if (!req->rq_replied) {
1455                 /* How can this be? -eeb */
1456                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1457                 LBUG();
1458                 GOTO(out, rc = req->rq_status);
1459         }
1460
1461         rc = after_reply (req, &do_restart);
1462         /* NB may return +ve success rc */
1463         if (do_restart) {
1464                 if (req->rq_bulk != NULL)
1465                         ptlrpc_unregister_bulk (req);
1466                 DEBUG_REQ(D_HA, req, "resending: ");
1467                 spin_lock_irqsave(&imp->imp_lock, flags);
1468                 goto restart;
1469         }
1470
1471  out:
1472         if (req->rq_bulk != NULL) {
1473                 if (rc >= 0) {                  /* success so far */
1474                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1475                         brc = l_wait_event(req->rq_wait_for_rep,
1476                                            ptlrpc_bulk_complete(req->rq_bulk),
1477                                            &lwi);
1478                         if (brc != 0) {
1479                                 LASSERT(brc == -ETIMEDOUT);
1480                                 CERROR ("Timed out waiting for bulk\n");
1481                                 rc = brc;
1482                         }
1483                 }
1484                 if (rc < 0)
1485                         ptlrpc_unregister_bulk (req);
1486         }
1487
1488         LASSERT(!req->rq_receiving_reply);
1489         req->rq_phase = RQ_PHASE_INTERPRET;
1490         RETURN(rc);
1491 }
1492
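/* Illustrative sketch only, not part of this file's interface: a typical
 * synchronous caller of the queue-and-wait path that ends above
 * (ptlrpc_queue_wait) allocates a request, sends it, and drops its
 * reference when done.  The opcode argument and empty buffer layout below
 * are placeholders, not real Lustre values.
 */
#if 0
static int example_sync_rpc(struct obd_import *imp, int opcode)
{
        struct ptlrpc_request *req;
        int rc;

        req = ptlrpc_prep_req(imp, opcode, 0, NULL, NULL);
        if (req == NULL)
                return -ENOMEM;

        /* no reply buffers in this placeholder, so the reply is header-only */
        req->rq_replen = lustre_msg_size(0, NULL);

        /* blocks until the reply arrives, the request times out, or the
         * caller is interrupted; resends are handled internally */
        rc = ptlrpc_queue_wait(req);

        ptlrpc_req_finished(req);
        return rc;
}
#endif
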
1493 int ptlrpc_replay_req(struct ptlrpc_request *req)
1494 {
1495         int rc = 0, old_state, old_status = 0;
1496         // struct ptlrpc_client *cli = req->rq_import->imp_client;
1497         struct l_wait_info lwi;
1498         ENTRY;
1499
1500         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1501
1502         /* I don't touch rq_phase here, so the debug log can show what
1503          * state it was left in */
1504
1505         /* Not handling automatic bulk replay yet (or ever?) */
1506         LASSERT(req->rq_bulk == NULL);
1507
1508         DEBUG_REQ(D_NET, req, "about to replay");
1509
1510         /* Update request's state, since we might have a new connection. */
1511         ptlrpc_put_connection(req->rq_connection);
1512         req->rq_connection =
1513                 ptlrpc_connection_addref(req->rq_import->imp_connection);
1514
1515         /* Temporarily set the request to REPLAY level; not strictly
1516          * necessary, since ptl_send_rpc doesn't check state, but let's
1517          * be consistent. */
1518         old_state = req->rq_send_state;
1519
1520         /*
1521          * Q: "How can a req get on the replay list if it wasn't replied?"
1522          * A: "If we failed during the replay of this request, it will still
1523          *     be on the list, but rq_replied will have been reset to 0."
1524          */
1525         if (req->rq_replied)
1526                 old_status = req->rq_repmsg->status;
1527         req->rq_send_state = LUSTRE_IMP_REPLAY;
1528         rc = ptl_send_rpc(req);
1529         if (rc) {
1530                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
1531                 ptlrpc_cleanup_request_buf(req);
1532                 // up(&cli->cli_rpc_sem);
1533                 GOTO(out, rc = -rc);
1534         }
1535
1536         CDEBUG(D_OTHER, "-- sleeping\n");
1537         lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
1538         l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
1539         CDEBUG(D_OTHER, "-- done\n");
1540
1541         // up(&cli->cli_rpc_sem);
1542
1543         /* If the reply was received normally, this just grabs the spinlock
1544          * (ensuring the reply callback has returned), sees that
1545          * req->rq_receiving_reply is clear and returns. */
1546         ptlrpc_unregister_reply (req);
1547
1548         if (!req->rq_replied) {
1549                 CERROR("Unknown reason for wakeup\n");
1550                 /* XXX Phil - I end up here when I kill obdctl */
1551                 /* ...that's because signals aren't all masked in
1552                  * l_wait_event() -eeb */
1553                 GOTO(out, rc = -EINTR);
1554         }
1555
1556 #if SWAB_PARANOIA
1557         /* Clear reply swab mask; this is a new reply in sender's byte order */
1558         req->rq_rep_swab_mask = 0;
1559 #endif
1560         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
1561         if (rc) {
1562                 CERROR("unpack_rep failed: %d\n", rc);
1563                 GOTO(out, rc = -EPROTO);
1564         }
1565 #if 0
1566         /* FIXME: Enable when BlueArc makes new release */
1567         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
1568             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
1569                 CERROR("invalid packet type received (type=%u)\n",
1570                        req->rq_repmsg->type);
1571                 GOTO(out, rc = -EPROTO);
1572         }
1573 #endif
1574
1575         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR &&
1576             req->rq_repmsg->status == -ENOTCONN)
1577                 GOTO(out, rc = req->rq_repmsg->status);
1578
1579         /* The transno had better not change over replay. */
1580         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1581
1582         CDEBUG(D_NET, "got rep "LPU64"\n", req->rq_xid);
1583
1584         /* let the callback do fixups, possibly including in the request */
1585         if (req->rq_replay_cb)
1586                 req->rq_replay_cb(req);
1587
1588         if (req->rq_replied && req->rq_repmsg->status != old_status) {
1589                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1590                           req->rq_repmsg->status, old_status);
1591         } else {
1592                 /* Put it back for re-replay. */
1593                 req->rq_status = old_status;
1594         }
1595
1596  out:
1597         req->rq_send_state = old_state;
1598         RETURN(rc);
1599 }
1600
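/* Illustrative sketch only: rq_replay_cb, invoked above once the replayed
 * reply has been unpacked, is where a client fixes up state after the
 * server re-executes a request; a caller installs it by assigning the
 * function pointer before the request lands on the replay list.
 * example_fixup_from_reply() is a placeholder, not a real Lustre function.
 */
#if 0
static void example_replay_cb(struct ptlrpc_request *req)
{
        /* may modify rq_reqmsg so that later resends/replays carry
         * server-assigned values (handles, transnos, ...) from the reply */
        example_fixup_from_reply(req->rq_reqmsg, req->rq_repmsg); /* placeholder */
}
#endif
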
1601 void ptlrpc_abort_inflight(struct obd_import *imp)
1602 {
1603         unsigned long flags;
1604         struct list_head *tmp, *n;
1605         ENTRY;
1606
1607         /* Make sure that no new requests get processed for this import.
1608          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1609          * this flag and then putting requests on sending_list or delayed_list.
1610          */
1611         spin_lock_irqsave(&imp->imp_lock, flags);
1612
1613         /* XXX locking?  Maybe we should remove each request with the list
1614          * locked?  Also, how do we know if the requests on the list are
1615          * being freed at this time?
1616          */
1617         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1618                 struct ptlrpc_request *req =
1619                         list_entry(tmp, struct ptlrpc_request, rq_list);
1620
1621                 DEBUG_REQ(D_HA, req, "inflight");
1622
1623                 spin_lock (&req->rq_lock);
1624                 if (req->rq_import_generation < imp->imp_generation) {
1625                         req->rq_err = 1;
1626                         if (req->rq_set != NULL)
1627                                 wake_up(&req->rq_set->set_waitq);
1628                         else
1629                                 wake_up(&req->rq_wait_for_rep);
1630                 }
1631                 spin_unlock (&req->rq_lock);
1632         }
1633
1634         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1635                 struct ptlrpc_request *req =
1636                         list_entry(tmp, struct ptlrpc_request, rq_list);
1637
1638                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1639
1640                 spin_lock (&req->rq_lock);
1641                 if (req->rq_import_generation < imp->imp_generation) {
1642                         req->rq_err = 1;
1643                         if (req->rq_set != NULL)
1644                                 wake_up(&req->rq_set->set_waitq);
1645                         else
1646                                 wake_up(&req->rq_wait_for_rep);
1647                 }
1648                 spin_unlock (&req->rq_lock);
1649         }
1650
1651         /* Last chance to free reqs left on the replay list, but we
1652          * will still leak reqs that haven't committed. */
1653         if (imp->imp_replayable)
1654                 ptlrpc_free_committed(imp);
1655
1656         spin_unlock_irqrestore(&imp->imp_lock, flags);
1657
1658         EXIT;
1659 }
1660
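/* Illustrative sketch only: ptlrpc_abort_inflight() only errors out requests
 * whose rq_import_generation is older than imp_generation, so a caller that
 * wants to flush everything is expected to bump the generation first.  The
 * helper below is a placeholder, not a real caller in this tree.
 */
#if 0
static void example_invalidate_import(struct obd_import *imp)
{
        unsigned long flags;

        spin_lock_irqsave(&imp->imp_lock, flags);
        imp->imp_generation++;          /* everything in flight is now from
                                         * an older generation */
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* set rq_err on, and wake, every waiter from the old generation */
        ptlrpc_abort_inflight(imp);
}
#endif
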
1661 static __u64 ptlrpc_last_xid = 0;
1662 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1663
1664 __u64 ptlrpc_next_xid(void)
1665 {
1666         __u64 tmp;
1667         spin_lock(&ptlrpc_last_xid_lock);
1668         tmp = ++ptlrpc_last_xid;
1669         spin_unlock(&ptlrpc_last_xid_lock);
1670         return tmp;
1671 }
1672
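/* Illustrative sketch only: callers stamp each newly allocated request with
 * a fresh xid, roughly as below.  The spinlock in ptlrpc_next_xid() keeps
 * the 64-bit counter consistent on 32-bit SMP, where a bare increment of a
 * __u64 is not atomic.
 */
#if 0
static void example_stamp_request(struct ptlrpc_request *req)
{
        req->rq_xid = ptlrpc_next_xid();
}
#endif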
1673