[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
47 {
48         return &req->rq_connection->c_remote_uuid;
49 }
50
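/* Resolve a UUID to a peer and return a referenced connection to it, copying
 * the UUID into the connection as its remote identity.  Returns NULL if the
 * peer cannot be found. */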
51 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
52 {
53         struct ptlrpc_connection *c;
54         struct ptlrpc_peer peer;
55         int err;
56
57         err = ptlrpc_uuid_to_peer(uuid, &peer);
58         if (err != 0) {
59                 CERROR("cannot find peer %s!\n", uuid->uuid);
60                 return NULL;
61         }
62
63         c = ptlrpc_get_connection(&peer, uuid);
64         if (c) {
65                 memcpy(c->c_remote_uuid.uuid,
66                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
67         }
68
69         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
70
71         return c;
72 }
73
74 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
75                                  struct obd_uuid *uuid)
76 {
77         struct ptlrpc_peer peer;
78         int err;
79
80         err = ptlrpc_uuid_to_peer(uuid, &peer);
81         if (err != 0) {
82                 CERROR("cannot find peer %s!\n", uuid->uuid);
83                 return;
84         }
85
86         memcpy(&conn->c_peer, &peer, sizeof (peer));
87         return;
88 }
89
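/* Allocate a bulk descriptor and initialize its lock, wait queue, empty page
 * list and (as yet unlinked) MD/ME handles.  Shared by the import- and
 * export-side bulk prep functions below. */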
90 static inline struct ptlrpc_bulk_desc *new_bulk(void)
91 {
92         struct ptlrpc_bulk_desc *desc;
93
94         OBD_ALLOC(desc, sizeof(*desc));
95         if (!desc)
96                 return NULL;
97
98         spin_lock_init(&desc->bd_lock);
99         init_waitqueue_head(&desc->bd_waitq);
100         INIT_LIST_HEAD(&desc->bd_page_list);
101         desc->bd_md_h = PTL_HANDLE_NONE;
102         desc->bd_me_h = PTL_HANDLE_NONE;
103
104         return desc;
105 }
106
107 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
108                                                int type, int portal)
109 {
110         struct obd_import *imp = req->rq_import;
111         struct ptlrpc_bulk_desc *desc;
112
113         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
114
115         desc = new_bulk();
116         if (desc == NULL)
117                 RETURN(NULL);
118
119         desc->bd_import_generation = req->rq_import_generation;
120         desc->bd_import = class_import_get(imp);
121         desc->bd_req = req;
122         desc->bd_type = type;
123         desc->bd_portal = portal;
124
125         /* This makes req own desc, and frees it when the req itself is freed */
126         req->rq_bulk = desc;
127
128         return desc;
129 }
130
131 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
132                                                int type, int portal)
133 {
134         struct obd_export *exp = req->rq_export;
135         struct ptlrpc_bulk_desc *desc;
136
137         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
138
139         desc = new_bulk();
140         if (desc == NULL)
141                 RETURN(NULL);
142
143         desc->bd_export = class_export_get(exp);
144         desc->bd_req = req;
145         desc->bd_type = type;
146         desc->bd_portal = portal;
147
148         /* NB we don't assign rq_bulk here; server-side requests are
149          * re-used, and the handler frees the bulk desc explicitly. */
150
151         return desc;
152 }
153
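/* Append one page fragment (page, offset, length) to the descriptor's page
 * list; the fragment must not cross a page boundary. */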
154 int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
155                           struct page *page, int pageoffset, int len)
156 {
157         struct ptlrpc_bulk_page *bulk;
158
159         OBD_ALLOC(bulk, sizeof(*bulk));
160         if (bulk == NULL)
161                 return -ENOMEM;
162
163         LASSERT(page != NULL);
164         LASSERT(pageoffset >= 0);
165         LASSERT(len > 0);
166         LASSERT(pageoffset + len <= PAGE_SIZE);
167
168         bulk->bp_page = page;
169         bulk->bp_pageoffset = pageoffset;
170         bulk->bp_buflen = len;
171
172         bulk->bp_desc = desc;
173         list_add_tail(&bulk->bp_link, &desc->bd_page_list);
174         desc->bd_page_count++;
175         return 0;
176 }
177
178 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
179 {
180         struct list_head *tmp, *next;
181         ENTRY;
182
183         LASSERT(desc != NULL);
184         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
185         LASSERT(!desc->bd_network_rw);         /* network hands off */
186
187         list_for_each_safe(tmp, next, &desc->bd_page_list) {
188                 struct ptlrpc_bulk_page *bulk;
189                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
190                 ptlrpc_free_bulk_page(bulk);
191         }
192
193         LASSERT(desc->bd_page_count == 0);
194         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
195
196         if (desc->bd_export)
197                 class_export_put(desc->bd_export);
198         else
199                 class_import_put(desc->bd_import);
200
201         OBD_FREE(desc, sizeof(*desc));
202         EXIT;
203 }
204
205 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
206 {
207         LASSERT(bulk != NULL);
208
209         list_del(&bulk->bp_link);
210         bulk->bp_desc->bd_page_count--;
211         OBD_FREE(bulk, sizeof(*bulk));
212 }
213
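/* Allocate and initialize a request on import @imp: pack the request message
 * from @lengths/@bufs, set timeouts and portals from the import, take a
 * reference on the import and its connection, and assign a fresh xid.  The
 * caller owns the single reference returned. */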
214 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
215                                        int count, int *lengths, char **bufs)
216 {
217         struct ptlrpc_request *request;
218         int rc;
219         ENTRY;
220
221         LASSERT((unsigned long)imp > 0x1000);
222
223         OBD_ALLOC(request, sizeof(*request));
224         if (!request) {
225                 CERROR("request allocation out of memory\n");
226                 RETURN(NULL);
227         }
228
229         rc = lustre_pack_request(request, count, lengths, bufs);
230         if (rc) {
231                 CERROR("cannot pack request %d\n", rc);
232                 OBD_FREE(request, sizeof(*request));
233                 RETURN(NULL);
234         }
235
236         if (imp->imp_server_timeout)
237                 request->rq_timeout = obd_timeout / 2;
238         else
239                 request->rq_timeout = obd_timeout;
240         request->rq_send_state = LUSTRE_IMP_FULL;
241         request->rq_type = PTL_RPC_MSG_REQUEST;
242         request->rq_import = class_import_get(imp);
243         request->rq_phase = RQ_PHASE_NEW;
244
245         /* XXX FIXME bug 249 */
246         request->rq_request_portal = imp->imp_client->cli_request_portal;
247         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
248
249         request->rq_connection = ptlrpc_connection_addref(imp->imp_connection);
250
251         spin_lock_init(&request->rq_lock);
252         INIT_LIST_HEAD(&request->rq_list);
253         init_waitqueue_head(&request->rq_reply_waitq);
254         request->rq_xid = ptlrpc_next_xid();
255         atomic_set(&request->rq_refcount, 1);
256
257         request->rq_reqmsg->opc = opcode;
258         request->rq_reqmsg->flags = 0;
259
260         RETURN(request);
261 }
262
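/* Request sets group asynchronous RPCs so that a caller can wait for all of
 * them at once.  A typical caller sequence, as an illustrative sketch with
 * error handling omitted, using only functions defined in this file:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      struct ptlrpc_request *req = ptlrpc_prep_req(imp, opcode, count,
 *                                                   lengths, bufs);
 *
 *      ptlrpc_set_add_req(set, req);   - the set takes over the req reference
 *      rc = ptlrpc_set_wait(set);      - sends everything and waits
 *      ptlrpc_set_destroy(set);        - releases the requests and the set
 */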
263 struct ptlrpc_request_set *ptlrpc_prep_set(void)
264 {
265         struct ptlrpc_request_set *set;
266
267         OBD_ALLOC(set, sizeof *set);
268         if (!set)
269                 RETURN(NULL);
270         INIT_LIST_HEAD(&set->set_requests);
271         init_waitqueue_head(&set->set_waitq);
272         set->set_remaining = 0;
273         spin_lock_init(&set->set_new_req_lock);
274         INIT_LIST_HEAD(&set->set_new_requests);
275
276         RETURN(set);
277 }
278
279 /* Finish with this set; opposite of prep_set. */
280 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
281 {
282         struct list_head *tmp;
283         struct list_head *next;
284         int               expected_phase;
285         int               n = 0;
286         ENTRY;
287
288         /* Requests on the set should either all be completed, or all be new */
289         expected_phase = (set->set_remaining == 0) ?
290                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
291         list_for_each (tmp, &set->set_requests) {
292                 struct ptlrpc_request *req =
293                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
294
295                 LASSERT(req->rq_phase == expected_phase);
296                 n++;
297         }
298
299         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
300
301         list_for_each_safe(tmp, next, &set->set_requests) {
302                 struct ptlrpc_request *req =
303                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
304                 list_del_init(&req->rq_set_chain);
305
306                 LASSERT(req->rq_phase == expected_phase);
307
308                 if (req->rq_phase == RQ_PHASE_NEW) {
309
310                         if (req->rq_interpret_reply != NULL) {
311                                 int (*interpreter)(struct ptlrpc_request *,
312                                                    void *, int) =
313                                         req->rq_interpret_reply;
314
315                                 /* higher level (i.e. LOV) failed;
316                                  * let the sub reqs clean up */
317                                 req->rq_status = -EBADR;
318                                 interpreter(req, &req->rq_async_args,
319                                             req->rq_status);
320                         }
321                         set->set_remaining--;
322                 }
323
324                 req->rq_set = NULL;
325                 ptlrpc_req_finished (req);
326         }
327
328         LASSERT(set->set_remaining == 0);
329
330         OBD_FREE(set, sizeof(*set));
331         EXIT;
332 }
333
334 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
335                         struct ptlrpc_request *req)
336 {
337         /* The set takes over the caller's request reference */
338         list_add_tail(&req->rq_set_chain, &set->set_requests);
339         req->rq_set = set;
340         set->set_remaining++;
341 }
342
343 /* Locked so that many callers can add things; the context that owns the set
344  * is supposed to notice these and move them into the set proper. */
345 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
346                             struct ptlrpc_request *req)
347 {
348         unsigned long flags;
349         spin_lock_irqsave(&set->set_new_req_lock, flags);
350         /* The set takes over the caller's request reference */
351         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
352         req->rq_set = set;
353         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
354 }
355
356 /*
357  * Based on the current state of the import, determine if the request
358  * can be sent, is an error, or should be delayed.
359  *
360  * Returns true if this request should be delayed. If false and
361  * *status is set, then the request cannot be sent and *status is the
362  * error code.  If false and *status is 0, then the request can be sent.
363  *
364  * The imp->imp_lock must be held.
365  */
366 static int ptlrpc_import_delay_req(struct obd_import *imp, 
367                                    struct ptlrpc_request *req, int *status)
368 {
369         int delay = 0;
370         ENTRY;
371
372         LASSERT (status != NULL);
373         *status = 0;
374
375         /* A new import, or one that has been cleaned up.
376          */
377         if (imp->imp_state == LUSTRE_IMP_NEW) {
378                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
379                 *status = -EIO;
380         }
381         /*
382          * If the import has been invalidated (such as by an OST failure), the
383          * request must fail with -EIO.  
384          */
385         else if (imp->imp_invalid) {
386                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
387                 *status = -EIO;
388         } 
389         else if (req->rq_import_generation != imp->imp_generation) {
390                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
391                 *status = -EIO;
392         } 
393         else if (req->rq_send_state != imp->imp_state) {
394                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) 
395                         *status = -EWOULDBLOCK;
396                 else
397                         delay = 1;
398         }
399
400         RETURN(delay);
401 }
402
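/* Wait condition used with l_wait_event(): under rq_lock, report whether the
 * request has been replied to, errored, or flagged for resend/restart. */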
403 static int ptlrpc_check_reply(struct ptlrpc_request *req)
404 {
405         unsigned long flags;
406         int rc = 0;
407         ENTRY;
408
409         /* serialise with network callback */
410         spin_lock_irqsave (&req->rq_lock, flags);
411
412         if (req->rq_replied) {
413                 DEBUG_REQ(D_NET, req, "REPLIED:");
414                 GOTO(out, rc = 1);
415         }
416
417         if (req->rq_err) {
418                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
419                 GOTO(out, rc = 1);
420         }
421
422         if (req->rq_resend) {
423                 DEBUG_REQ(D_ERROR, req, "RESEND:");
424                 GOTO(out, rc = 1);
425         }
426
427         if (req->rq_restart) {
428                 DEBUG_REQ(D_ERROR, req, "RESTART:");
429                 GOTO(out, rc = 1);
430         }
431         EXIT;
432  out:
433         spin_unlock_irqrestore (&req->rq_lock, flags);
434         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
435         return rc;
436 }
437
438 static int ptlrpc_check_status(struct ptlrpc_request *req)
439 {
440         int err;
441         ENTRY;
442
443         err = req->rq_repmsg->status;
444         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
445                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR");
446                 RETURN(err < 0 ? err : -EINVAL);
447         }
448
449         if (err < 0) {
450                 DEBUG_REQ(D_INFO, req, "status is %d", err);
451         } else if (err > 0) {
452                 /* XXX: translate this error from net to host */
453                 DEBUG_REQ(D_INFO, req, "status is %d", err);
454         }
455
456         RETURN(err);
457 }
458
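/* Process a newly arrived reply: unpack (and byte-swap if needed) the reply
 * message, record the transno for replay, check the returned status, handle
 * -ENOTCONN by triggering reconnect/resend, and retain replayable requests
 * on the import.  *restartp is set when the caller should resend. */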
459 static int after_reply(struct ptlrpc_request *req, int *restartp)
460 {
461         unsigned long flags;
462         struct obd_import *imp = req->rq_import;
463         int rc;
464         ENTRY;
465
466         LASSERT(!req->rq_receiving_reply);
467         LASSERT(req->rq_replied);
468
469         if (restartp != NULL)
470                 *restartp = 0;
471
472         /* NB Until this point, the whole of the incoming message,
473          * including buflens, status etc is in the sender's byte order. */
474
475 #if SWAB_PARANOIA
476         /* Clear reply swab mask; this is a new reply in sender's byte order */
477         req->rq_rep_swab_mask = 0;
478 #endif
479         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
480         if (rc) {
481                 CERROR("unpack_rep failed: %d\n", rc);
482                 RETURN(-EPROTO);
483         }
484
485         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
486             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
487                 CERROR("invalid packet type received (type=%u)\n",
488                        req->rq_repmsg->type);
489                 RETURN(-EPROTO);
490         }
491
492         /* Store transno in reqmsg for replay. */
493         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
494
495         rc = ptlrpc_check_status(req);
496
497         /* Either we've been evicted, or the server has failed for
498          * some reason. Try to reconnect, and if that fails, punt to the
499          * upcall. */
500         if (rc == -ENOTCONN) {
501                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
502                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
503                         RETURN(-ENOTCONN);
504                 }
505
506                 ptlrpc_request_handle_notconn(req);
507
508                 if (req->rq_err)
509                         RETURN(-EIO);
510
511                 if (req->rq_no_resend)
512                         RETURN(rc); /* -ENOTCONN */
513
514                 if (req->rq_resend) {
515                         if (restartp == NULL)
516                                 LBUG(); /* async resend not supported yet */
517                         spin_lock_irqsave (&req->rq_lock, flags);
518                         req->rq_resend = 0;
519                         spin_unlock_irqrestore (&req->rq_lock, flags);
520                         *restartp = 1;
521                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
522                         DEBUG_REQ(D_HA, req, "resending: ");
523                         RETURN(0);
524                 }
525
526                 CERROR("request should be err or resend: %p\n", req);
527                 LBUG();
528         }
529
530         if (req->rq_import->imp_replayable) {
531                 spin_lock_irqsave(&imp->imp_lock, flags);
532                 if (req->rq_replay || req->rq_transno != 0)
533                         ptlrpc_retain_replayable_request(req, imp);
534                 else if (req->rq_commit_cb != NULL)
535                         req->rq_commit_cb(req);
536
537                 if (req->rq_transno > imp->imp_max_transno)
538                         imp->imp_max_transno = req->rq_transno;
539
540                 /* Replay-enabled imports return commit-status information. */
541                 if (req->rq_repmsg->last_committed)
542                         imp->imp_peer_committed_transno =
543                                 req->rq_repmsg->last_committed;
544                 ptlrpc_free_committed(imp);
545                 spin_unlock_irqrestore(&imp->imp_lock, flags);
546         }
547
548         RETURN(rc);
549 }
550
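/* Move a request from RQ_PHASE_NEW to RQ_PHASE_RPC and send it, unless the
 * import is not ready, in which case it is parked on imp_delayed_list (or
 * failed outright if the import is invalid or in error). */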
551 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
552 {
553         struct obd_import     *imp;
554         unsigned long          flags;
555         int rc;
556         ENTRY;
557
558         LASSERT(req->rq_send_state == LUSTRE_IMP_FULL);
559         LASSERT(req->rq_phase == RQ_PHASE_NEW);
560         req->rq_phase = RQ_PHASE_RPC;
561
562         imp = req->rq_import;
563         spin_lock_irqsave(&imp->imp_lock, flags);
564
565         if (imp->imp_invalid) {
566                 spin_unlock_irqrestore(&imp->imp_lock, flags);
567                 req->rq_status = -EIO;
568                 req->rq_phase = RQ_PHASE_INTERPRET;
569                 RETURN(-EIO);
570         }
571
572         req->rq_import_generation = imp->imp_generation;
573
574         if (ptlrpc_import_delay_req(imp, req, &rc)) {
575                 spin_lock (&req->rq_lock);
576                 req->rq_waiting = 1;
577                 spin_unlock (&req->rq_lock);
578
579                 LASSERT(list_empty (&req->rq_list));
580
581                 // list_del(&req->rq_list);
582                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
583                 spin_unlock_irqrestore(&imp->imp_lock, flags);
584                 RETURN(0);
585         }
586
587         if (rc != 0) {
588                 spin_unlock_irqrestore(&imp->imp_lock, flags);
589                 req->rq_status = rc;
590                 req->rq_phase = RQ_PHASE_INTERPRET;
591                 RETURN(rc);
592         }
593
594         /* XXX this is the same as ptlrpc_queue_wait */
595         LASSERT(list_empty(&req->rq_list));
596         list_add_tail(&req->rq_list, &imp->imp_sending_list);
597         spin_unlock_irqrestore(&imp->imp_lock, flags);
598
599         req->rq_reqmsg->status = current->pid;
600         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
601                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
602                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
603                req->rq_xid,
604                imp->imp_connection->c_peer.peer_ni->pni_name,
605                imp->imp_connection->c_peer.peer_nid,
606                req->rq_reqmsg->opc);
607
608         rc = ptl_send_rpc(req);
609         if (rc) {
610                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
611                 req->rq_timeout = 1;
612                 RETURN(rc);
613         }
614         RETURN(0);
615 }
616
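/* Advance every request in the set through its phases (NEW -> RPC -> BULK ->
 * INTERPRET -> COMPLETE), resending, interpreting replies and waiting for
 * bulk as required.  Returns nonzero when the whole set has completed or the
 * timeouts need to be recalculated. */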
617 int ptlrpc_check_set(struct ptlrpc_request_set *set)
618 {
619         unsigned long flags;
620         struct list_head *tmp;
621         int force_timer_recalc = 0;
622         ENTRY;
623
624         if (set->set_remaining == 0)
625                 RETURN(1);
626
627         list_for_each(tmp, &set->set_requests) {
628                 struct ptlrpc_request *req =
629                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
630                 struct obd_import *imp = req->rq_import;
631                 int rc = 0;
632
633                 if (req->rq_phase == RQ_PHASE_NEW &&
634                     ptlrpc_send_new_req(req)) {
635                         force_timer_recalc = 1;
636                 }
637
638                 if (!(req->rq_phase == RQ_PHASE_RPC ||
639                       req->rq_phase == RQ_PHASE_BULK ||
640                       req->rq_phase == RQ_PHASE_INTERPRET ||
641                       req->rq_phase == RQ_PHASE_COMPLETE)) {
642                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
643                         LBUG();
644                 }
645
646                 if (req->rq_phase == RQ_PHASE_COMPLETE)
647                         continue;
648
649                 if (req->rq_phase == RQ_PHASE_INTERPRET)
650                         GOTO(interpret, req->rq_status);
651
652                 if (req->rq_err) {
653                         ptlrpc_unregister_reply(req);
654                         if (req->rq_status == 0)
655                                 req->rq_status = -EIO;
656                         req->rq_phase = RQ_PHASE_INTERPRET;
657
658                         spin_lock_irqsave(&imp->imp_lock, flags);
659                         list_del_init(&req->rq_list);
660                         spin_unlock_irqrestore(&imp->imp_lock, flags);
661
662                         GOTO(interpret, req->rq_status);
663                 }
664
665                 if (req->rq_intr) {
666                         /* NB could be on delayed list */
667                         ptlrpc_unregister_reply(req);
668                         req->rq_status = -EINTR;
669                         req->rq_phase = RQ_PHASE_INTERPRET;
670
671                         spin_lock_irqsave(&imp->imp_lock, flags);
672                         list_del_init(&req->rq_list);
673                         spin_unlock_irqrestore(&imp->imp_lock, flags);
674
675                         GOTO(interpret, req->rq_status);
676                 }
677
678                 if (req->rq_phase == RQ_PHASE_RPC) {
679                         int do_restart = 0;
680                         if (req->rq_waiting || req->rq_resend) {
681                                 int status;
682                                 spin_lock_irqsave(&imp->imp_lock, flags);
683
684                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
685                                         spin_unlock_irqrestore(&imp->imp_lock,
686                                                                flags);
687                                         continue;
688                                 } 
689
690                                 list_del(&req->rq_list);
691                                 list_add_tail(&req->rq_list,
692                                               &imp->imp_sending_list);
693
694                                 if (status != 0)  {
695                                         req->rq_status = status;
696                                         req->rq_phase = RQ_PHASE_INTERPRET;
697                                         spin_unlock_irqrestore(&imp->imp_lock,
698                                                                flags);
699                                         GOTO(interpret, req->rq_status);
700                                 }
701                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
702
703                                 req->rq_waiting = 0;
704                                 if (req->rq_resend) {
705                                         lustre_msg_add_flags(req->rq_reqmsg,
706                                                              MSG_RESENT);
707                                         spin_lock_irqsave(&req->rq_lock, flags);
708                                         req->rq_resend = 0;
709                                         spin_unlock_irqrestore(&req->rq_lock,
710                                                                flags);
711
712                                         ptlrpc_unregister_reply(req);
713                                         if (req->rq_bulk) {
714                                                 __u64 old_xid = req->rq_xid;
715                                                 ptlrpc_unregister_bulk(req);
716                                                 /* ensure previous bulk fails */
717                                                 req->rq_xid = ptlrpc_next_xid();
718                                                 CDEBUG(D_HA, "resend bulk "
719                                                        "old x"LPU64
720                                                        " new x"LPU64"\n",
721                                                        old_xid, req->rq_xid);
722                                         }
723                                 }
724
725                                 rc = ptl_send_rpc(req);
726                                 if (rc) {
727                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
728                                                   rc);
729                                         force_timer_recalc = 1;
730                                         req->rq_timeout = 0;
731                                 }
732                                 /* need to reset the timeout */
733                                 force_timer_recalc = 1;
734                         }
735
736                         /* Ensure the network callback returned */
737                         spin_lock_irqsave (&req->rq_lock, flags);
738                         if (!req->rq_replied) {
739                                 spin_unlock_irqrestore (&req->rq_lock, flags);
740                                 continue;
741                         }
742                         spin_unlock_irqrestore (&req->rq_lock, flags);
743
744                         spin_lock_irqsave(&imp->imp_lock, flags);
745                         list_del_init(&req->rq_list);
746                         spin_unlock_irqrestore(&imp->imp_lock, flags);
747
748                         req->rq_status = after_reply(req, &do_restart);
749                         if (do_restart) {
750                                 spin_lock_irqsave (&req->rq_lock, flags);
751                                 req->rq_resend = 1; /* ugh */
752                                 spin_unlock_irqrestore (&req->rq_lock, flags);
753                                 continue;
754                         }
755
756                         /* If there is no bulk associated with this request,
757                          * then we're done and should let the interpreter
758                          * process the reply.  Similarly if the RPC returned
759                          * an error, and therefore the bulk will never arrive.
760                          */
761                         if (req->rq_bulk == NULL || req->rq_status != 0) {
762                                 req->rq_phase = RQ_PHASE_INTERPRET;
763                                 GOTO(interpret, req->rq_status);
764                         }
765
766                         req->rq_phase = RQ_PHASE_BULK;
767                 }
768
769                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
770                 if (!ptlrpc_bulk_complete (req->rq_bulk))
771                         continue;
772
773                 req->rq_phase = RQ_PHASE_INTERPRET;
774
775         interpret:
776                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
777                 LASSERT(!req->rq_receiving_reply);
778
779                 ptlrpc_unregister_reply(req);
780                 if (req->rq_bulk != NULL)
781                         ptlrpc_unregister_bulk (req);
782
783                 if (req->rq_interpret_reply != NULL) {
784                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
785                                 req->rq_interpret_reply;
786                         req->rq_status = interpreter(req, &req->rq_async_args,
787                                                      req->rq_status);
788                 }
789
790                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
791                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
792                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
793                        req->rq_xid,
794                        imp->imp_connection->c_peer.peer_ni->pni_name,
795                        imp->imp_connection->c_peer.peer_nid,
796                        req->rq_reqmsg->opc);
797
798                 req->rq_phase = RQ_PHASE_COMPLETE;
799                 set->set_remaining--;
800         }
801
802         /* If we hit an error, we want to recover promptly. */
803         RETURN(set->set_remaining == 0 || force_timer_recalc);
804 }
805
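/* Mark a request as timed out and unlink its reply buffer.  Returns 1 if the
 * caller should keep waiting without starting recovery (no import, DLM-fake
 * imports, recovery-time requests, or obd_no_recov); otherwise fails the
 * import and returns 0. */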
806 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
807 {
808         unsigned long      flags;
809         struct obd_import *imp = req->rq_import;
810         ENTRY;
811
812         DEBUG_REQ(D_ERROR, req, "timeout");
813
814         spin_lock_irqsave (&req->rq_lock, flags);
815         req->rq_timedout = 1;
816         spin_unlock_irqrestore (&req->rq_lock, flags);
817
818         ptlrpc_unregister_reply (req);
819
820         if (imp == NULL) {
821                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
822                 RETURN(1);
823         }
824
825         /* The DLM server doesn't want recovery run on its imports. */
826         if (imp->imp_dlm_fake)
827                 RETURN(1);
828
829         /* If this request is for recovery or other primordial tasks,
830          * don't go back to sleep, and don't start recovery again. */
831         if (req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov)
832                 RETURN(1);
833
834         ptlrpc_fail_import(imp, req->rq_import_generation);
835
836         RETURN(0);
837 }
838
839 int ptlrpc_expired_set(void *data)
840 {
841         struct ptlrpc_request_set *set = data;
842         struct list_head          *tmp;
843         time_t                     now = LTIME_S (CURRENT_TIME);
844         ENTRY;
845
846         LASSERT(set != NULL);
847
848         /* A timeout expired; see which reqs it applies to... */
849         list_for_each (tmp, &set->set_requests) {
850                 struct ptlrpc_request *req =
851                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
852
853                 /* request in-flight? */
854                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
855                       (req->rq_phase == RQ_PHASE_BULK)))
856                         continue;
857
858                 if (req->rq_timedout ||           /* already dealt with */
859                     req->rq_sent + req->rq_timeout > now) /* not expired */
860                         continue;
861
862                 /* deal with this guy */
863                 ptlrpc_expire_one_request (req);
864         }
865
866         /* When waiting for a whole set, we always break out of the
867          * sleep so we can recalculate the timeout, or enable interrupts
868          * iff everyone's timed out.
869          */
870         RETURN(1);
871 }
872
873 void ptlrpc_interrupted_set(void *data)
874 {
875         struct ptlrpc_request_set *set = data;
876         struct list_head *tmp;
877         unsigned long flags;
878
879         LASSERT(set != NULL);
880         CERROR("INTERRUPTED SET %p\n", set);
881
882         list_for_each(tmp, &set->set_requests) {
883                 struct ptlrpc_request *req =
884                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
885
886                 if (req->rq_phase != RQ_PHASE_RPC)
887                         continue;
888
889                 spin_lock_irqsave (&req->rq_lock, flags);
890                 req->rq_intr = 1;
891                 spin_unlock_irqrestore (&req->rq_lock, flags);
892         }
893 }
894
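/* Return the number of seconds until the next in-flight request in the set
 * is due to time out: 1 if something has already expired, 0 if nothing is
 * in flight. */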
895 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
896 {
897         struct list_head      *tmp;
898         time_t                 now = LTIME_S(CURRENT_TIME);
899         time_t                 deadline;
900         int                    timeout = 0;
901         struct ptlrpc_request *req;
902         ENTRY;
903
904         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
905
906         list_for_each(tmp, &set->set_requests) {
907                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
908
909                 /* request in-flight? */
910                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
911                       (req->rq_phase == RQ_PHASE_BULK)))
912                         continue;
913
914                 if (req->rq_timedout)   /* already timed out */
915                         continue;
916
917                 deadline = req->rq_sent + req->rq_timeout;
918                 if (deadline <= now)    /* actually expired already */
919                         timeout = 1;    /* ASAP */
920                 else if (timeout == 0 || timeout > deadline - now)
921                         timeout = deadline - now;
922         }
923         RETURN(timeout);
924 }
925                 
926
927 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
928 {
929         struct list_head      *tmp;
930         struct ptlrpc_request *req;
931         struct l_wait_info     lwi;
932         int                    rc, timeout;
933         ENTRY;
934
935         LASSERT(!list_empty(&set->set_requests));
936         list_for_each(tmp, &set->set_requests) {
937                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
938                 (void)ptlrpc_send_new_req(req);
939         }
940
941         do {
942                 timeout = ptlrpc_set_next_timeout(set);
943
944                 /* wait until all complete, interrupted, or an in-flight
945                  * req times out */
946                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
947                        set, timeout);
948                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
949                                        ptlrpc_expired_set, 
950                                        ptlrpc_interrupted_set, set);
951                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
952
953                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
954
955                 /* -EINTR => all requests have been flagged rq_intr so next
956                  * check completes.
957          * -ETIMEDOUT => someone timed out.  When all reqs have
958                  * timed out, signals are enabled allowing completion with
959                  * EINTR.
960                  * I don't really care if we go once more round the loop in
961                  * the error cases -eeb. */
962         } while (rc != 0);
963
964         LASSERT(set->set_remaining == 0);
965
966         rc = 0;
967         list_for_each(tmp, &set->set_requests) {
968                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
969
970                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
971                 if (req->rq_status != 0)
972                         rc = req->rq_status;
973         }
974
975         if (set->set_interpret != NULL) {
976                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
977                         set->set_interpret;
978                 rc = interpreter (set, &set->set_args, rc);
979         }
980
981         RETURN(rc);
982 }
983
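/* Release everything a request owns once its refcount has reached zero:
 * unlink it from its import's list (taking imp_lock unless @locked), free
 * the request and reply buffers, free any bulk descriptor, drop the
 * export/import and connection references, then free the request itself. */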
984 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
985 {
986         ENTRY;
987         if (request == NULL) {
988                 EXIT;
989                 return;
990         }
991
992         LASSERT(!request->rq_receiving_reply);
993
994         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
995          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
996         if (request->rq_import != NULL) {
997                 unsigned long flags = 0;
998                 if (!locked)
999                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1000                 list_del_init(&request->rq_list);
1001                 if (!locked)
1002                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1003                                                flags);
1004         }
1005
1006         if (atomic_read(&request->rq_refcount) != 0) {
1007                 DEBUG_REQ(D_ERROR, request,
1008                           "freeing request with nonzero refcount");
1009                 LBUG();
1010         }
1011
1012         if (request->rq_repmsg != NULL) {
1013                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1014                 request->rq_repmsg = NULL;
1015         }
1016         if (request->rq_reqmsg != NULL) {
1017                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1018                 request->rq_reqmsg = NULL;
1019         }
1020         if (request->rq_export != NULL) {
1021                 class_export_put(request->rq_export);
1022                 request->rq_export = NULL;
1023         }
1024         if (request->rq_import != NULL) {
1025                 class_import_put(request->rq_import);
1026                 request->rq_import = NULL;
1027         }
1028         if (request->rq_bulk != NULL)
1029                 ptlrpc_free_bulk(request->rq_bulk);
1030
1031         ptlrpc_put_connection(request->rq_connection);
1032         OBD_FREE(request, sizeof(*request));
1033         EXIT;
1034 }
1035
1036 void ptlrpc_free_req(struct ptlrpc_request *request)
1037 {
1038         __ptlrpc_free_req(request, 0);
1039 }
1040
1041 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1042 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1043 {
1044 #ifdef CONFIG_SMP
1045         LASSERT(spin_is_locked(&request->rq_import->imp_lock));
1046 #endif
1047         (void)__ptlrpc_req_finished(request, 1);
1048 }
1049
1050 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1051 {
1052         ENTRY;
1053         if (request == NULL)
1054                 RETURN(1);
1055
1056         if (request == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a) ||
1057             request->rq_reqmsg == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a)) {
1058                 CERROR("dereferencing freed request (bug 575)\n");
1059                 LBUG();
1060                 RETURN(1);
1061         }
1062
1063         DEBUG_REQ(D_INFO, request, "refcount now %u",
1064                   atomic_read(&request->rq_refcount) - 1);
1065
1066         if (atomic_dec_and_test(&request->rq_refcount)) {
1067                 __ptlrpc_free_req(request, locked);
1068                 RETURN(1);
1069         }
1070
1071         RETURN(0);
1072 }
1073
1074 void ptlrpc_req_finished(struct ptlrpc_request *request)
1075 {
1076         __ptlrpc_req_finished(request, 0);
1077 }
1078
1079 static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
1080 {
1081         OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1082         request->rq_reqmsg = NULL;
1083         request->rq_reqlen = 0;
1084 }
1085
1086 /* Disengage the client's reply buffer from the network
1087  * NB does _NOT_ unregister any client-side bulk.
1088  * IDEMPOTENT, but _not_ safe against concurrent callers.
1089  * The request owner (i.e. the thread doing the I/O) must call...
1090  */
1091 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1092 {
1093         unsigned long flags;
1094         int           rc;
1095         ENTRY;
1096
1097         LASSERT(!in_interrupt ());             /* might sleep */
1098
1099         spin_lock_irqsave (&request->rq_lock, flags);
1100         if (!request->rq_receiving_reply) {     /* not waiting for a reply */
1101                 spin_unlock_irqrestore (&request->rq_lock, flags);
1102                 EXIT;
1103                 /* NB reply buffer not freed here */
1104                 return;
1105         }
1106
1107         LASSERT(!request->rq_replied);         /* callback hasn't completed */
1108         spin_unlock_irqrestore (&request->rq_lock, flags);
1109
1110         rc = PtlMDUnlink (request->rq_reply_md_h);
1111         switch (rc) {
1112         default:
1113                 LBUG ();
1114
1115         case PTL_OK:                            /* unlinked before completion */
1116                 LASSERT(request->rq_receiving_reply);
1117                 LASSERT(!request->rq_replied);
1118                 spin_lock_irqsave (&request->rq_lock, flags);
1119                 request->rq_receiving_reply = 0;
1120                 spin_unlock_irqrestore (&request->rq_lock, flags);
1121                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1122                 request->rq_repmsg = NULL;
1123                 EXIT;
1124                 return;
1125
1126         case PTL_MD_INUSE:                      /* callback in progress */
1127                 for (;;) {
1128                         /* Network access will complete in finite time but
1129                          * the timeout lets us CERROR for visibility */
1130                         struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
1131
1132                         rc = l_wait_event (request->rq_reply_waitq,
1133                                            request->rq_replied, &lwi);
1134                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1135                         if (rc == 0) {
1136                                 spin_lock_irqsave (&request->rq_lock, flags);
1137                                 /* Ensure the callback has completed scheduling
1138                                  * me and taken its hands off the request */
1139                                 spin_unlock_irqrestore(&request->rq_lock,flags);
1140                                 break;
1141                         }
1142
1143                         CERROR ("Unexpectedly long timeout: req %p\n", request);
1144                 }
1145                 /* fall through */
1146
1147         case PTL_INV_MD:                        /* callback completed */
1148                 LASSERT(!request->rq_receiving_reply);
1149                 LASSERT(request->rq_replied);
1150                 EXIT;
1151                 return;
1152         }
1153         /* Not Reached */
1154 }
1155
1156 /* caller must hold imp->imp_lock */
1157 void ptlrpc_free_committed(struct obd_import *imp)
1158 {
1159         struct list_head *tmp, *saved;
1160         struct ptlrpc_request *req;
1161         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1162         ENTRY;
1163
1164         LASSERT(imp != NULL);
1165
1166 #ifdef CONFIG_SMP
1167         LASSERT(spin_is_locked(&imp->imp_lock));
1168 #endif
1169
1170         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1171                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1172
1173         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1174                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
1175
1176                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1177                 LASSERT(req != last_req);
1178                 last_req = req;
1179
1180                 if (req->rq_import_generation < imp->imp_generation) {
1181                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1182                         GOTO(free_req, 0);
1183                 }
1184
1185                 if (req->rq_replay) {
1186                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1187                         continue;
1188                 }
1189
1190                 /* not yet committed */
1191                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1192                         DEBUG_REQ(D_HA, req, "stopping search");
1193                         break;
1194                 }
1195
1196                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1197                           imp->imp_peer_committed_transno);
1198 free_req:
1199                 if (req->rq_commit_cb != NULL)
1200                         req->rq_commit_cb(req);
1201                 list_del_init(&req->rq_list);
1202                 __ptlrpc_req_finished(req, 1);
1203         }
1204
1205         EXIT;
1206         return;
1207 }
1208
1209 void ptlrpc_cleanup_client(struct obd_import *imp)
1210 {
1211         ENTRY;
1212         EXIT;
1213         return;
1214 }
1215
1216 void ptlrpc_resend_req(struct ptlrpc_request *req)
1217 {
1218         unsigned long flags;
1219
1220         DEBUG_REQ(D_HA, req, "resending");
1221         req->rq_reqmsg->handle.cookie = 0;
1222         ptlrpc_put_connection(req->rq_connection);
1223         req->rq_connection =
1224                 ptlrpc_connection_addref(req->rq_import->imp_connection);
1225         req->rq_status = -EAGAIN;
1226
1227         spin_lock_irqsave (&req->rq_lock, flags);
1228         req->rq_resend = 1;
1229         req->rq_timedout = 0;
1230         if (req->rq_set != NULL)
1231                 wake_up (&req->rq_set->set_waitq);
1232         else
1233                 wake_up(&req->rq_reply_waitq);
1234         spin_unlock_irqrestore (&req->rq_lock, flags);
1235 }
1236
1237 /* XXX: this function and rq_status are currently unused */
1238 void ptlrpc_restart_req(struct ptlrpc_request *req)
1239 {
1240         unsigned long flags;
1241
1242         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1243         req->rq_status = -ERESTARTSYS;
1244
1245         spin_lock_irqsave (&req->rq_lock, flags);
1246         req->rq_restart = 1;
1247         req->rq_timedout = 0;
1248         if (req->rq_set != NULL)
1249                 wake_up (&req->rq_set->set_waitq);
1250         else
1251                 wake_up(&req->rq_reply_waitq);
1252         spin_unlock_irqrestore (&req->rq_lock, flags);
1253 }
1254
1255 static int expired_request(void *data)
1256 {
1257         struct ptlrpc_request *req = data;
1258         ENTRY;
1259
1260         RETURN(ptlrpc_expire_one_request(req));
1261 }
1262
1263 static void interrupted_request(void *data)
1264 {
1265         unsigned long flags;
1266
1267         struct ptlrpc_request *req = data;
1268         DEBUG_REQ(D_HA, req, "request interrupted");
1269         spin_lock_irqsave (&req->rq_lock, flags);
1270         req->rq_intr = 1;
1271         spin_unlock_irqrestore (&req->rq_lock, flags);
1272 }
1273
1274 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1275 {
1276         ENTRY;
1277         atomic_inc(&req->rq_refcount);
1278         RETURN(req);
1279 }
1280
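/* Insert a replayable request into imp_replay_list, kept sorted by transno
 * with rq_xid as a secondary key; called with imp_lock held. */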
1281 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1282                                       struct obd_import *imp)
1283 {
1284         struct list_head *tmp;
1285
1286 #ifdef CONFIG_SMP
1287         LASSERT(spin_is_locked(&imp->imp_lock));
1288 #endif
1289
1290         LASSERT(imp->imp_replayable);
1291         /* Balanced in ptlrpc_free_committed, usually. */
1292         ptlrpc_request_addref(req);
1293         list_for_each_prev(tmp, &imp->imp_replay_list) {
1294                 struct ptlrpc_request *iter =
1295                         list_entry(tmp, struct ptlrpc_request, rq_list);
1296
1297                 /* We may have duplicate transnos if we create and then
1298                  * open a file, or for closes retained to match creating
1299                  * opens, so use req->rq_xid as a secondary key.
1300                  * (See bugs 684, 685, and 428.)
1301                  * XXX no longer needed, but all opens need transnos!
1302                  */
1303                 if (iter->rq_transno > req->rq_transno)
1304                         continue;
1305
1306                 if (iter->rq_transno == req->rq_transno) {
1307                         LASSERT(iter->rq_xid != req->rq_xid);
1308                         if (iter->rq_xid > req->rq_xid)
1309                                 continue;
1310                 }
1311
1312                 list_add(&req->rq_list, &iter->rq_list);
1313                 return;
1314         }
1315
1316         list_add_tail(&req->rq_list, &imp->imp_replay_list);
1317 }
1318
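/* Synchronous counterpart of the request-set machinery: send a single request
 * and sleep until it is replied to, interrupted, or times out, delaying while
 * the import recovers and resending when required. */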
1319 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1320 {
1321         int rc = 0;
1322         int brc;
1323         struct l_wait_info lwi;
1324         struct obd_import *imp = req->rq_import;
1325         unsigned long flags;
1326         int do_restart = 0;
1327         int timeout = 0;
1328         ENTRY;
1329
1330         LASSERT(req->rq_set == NULL);
1331         LASSERT(!req->rq_receiving_reply);
1332
1333         /* for distributed debugging */
1334         req->rq_reqmsg->status = current->pid;
1335         LASSERT(imp->imp_obd != NULL);
1336         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1337                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1338                imp->imp_obd->obd_uuid.uuid,
1339                req->rq_reqmsg->status, req->rq_xid,
1340                imp->imp_connection->c_peer.peer_ni->pni_name,
1341                imp->imp_connection->c_peer.peer_nid,
1342                req->rq_reqmsg->opc);
1343
1344         /* Mark phase here for a little debug help */
1345         req->rq_phase = RQ_PHASE_RPC;
1346
1347         spin_lock_irqsave(&imp->imp_lock, flags);
1348         req->rq_import_generation = imp->imp_generation;
1349 restart:
1350         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1351                 list_del(&req->rq_list);
1352
1353                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1354                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1355
1356                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
1357                           current->comm, req->rq_send_state, imp->imp_state);
1358                 lwi = LWI_INTR(interrupted_request, req);
1359                 rc = l_wait_event(req->rq_reply_waitq,
1360                                   (req->rq_send_state == imp->imp_state ||
1361                                    req->rq_err),
1362                                   &lwi);
1363                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%d > %d or %d == 1)",
1364                           current->comm, imp->imp_state, req->rq_send_state,
1365                           req->rq_err);
1366
1367                 spin_lock_irqsave(&imp->imp_lock, flags);
1368                 list_del_init(&req->rq_list);
1369
1370                 if (req->rq_err) {
1371                         rc = -EIO;
1372                 } 
1373                 else if (req->rq_intr) {
1374                         rc = -EINTR;
1375                 }
1376                 else {
1377                         GOTO(restart, rc);
1378                 }
1379         } 
1380
1381         if (rc != 0) {
1382                 list_del_init(&req->rq_list);
1383                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1384                 req->rq_status = rc; // XXX this ok?
1385                 GOTO(out, rc);
1386         }
1387
1388         /* XXX this is the same as ptlrpc_set_wait */
1389         LASSERT(list_empty(&req->rq_list));
1390         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1391         spin_unlock_irqrestore(&imp->imp_lock, flags);
1392
1393         rc = ptl_send_rpc(req);
1394         if (rc) {
1395                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1396                 timeout = 1;
1397         } else {
1398                 timeout = MAX(req->rq_timeout * HZ, 1);
1399                 DEBUG_REQ(D_NET, req, "-- sleeping");
1400         }
1401         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1402                                req);
1403         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1404         DEBUG_REQ(D_NET, req, "-- done sleeping");
1405
1406         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1407                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1408                imp->imp_obd->obd_uuid.uuid,
1409                req->rq_reqmsg->status, req->rq_xid,
1410                imp->imp_connection->c_peer.peer_ni->pni_name,
1411                imp->imp_connection->c_peer.peer_nid,
1412                req->rq_reqmsg->opc);
1413
1414         spin_lock_irqsave(&imp->imp_lock, flags);
1415         list_del_init(&req->rq_list);
1416         spin_unlock_irqrestore(&imp->imp_lock, flags);
1417
1418         /* If the reply was received normally, this just grabs the spinlock
1419          * (ensuring the reply callback has returned), sees that
1420          * req->rq_receiving_reply is clear and returns. */
1421         ptlrpc_unregister_reply (req);
1422
1423         if (req->rq_err)
1424                 GOTO(out, rc = -EIO);
1425
1426         /* Resend if we need to, unless we were interrupted. */
1427         if (req->rq_resend && !req->rq_intr) {
1428                 /* ...unless we were specifically told otherwise. */
1429                 if (req->rq_no_resend)
1430                         GOTO(out, rc = -ETIMEDOUT);
1431                 spin_lock_irqsave (&req->rq_lock, flags);
1432                 req->rq_resend = 0;
1433                 spin_unlock_irqrestore (&req->rq_lock, flags);
1434                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1435
1436                 if (req->rq_bulk != NULL)
1437                         ptlrpc_unregister_bulk (req);
1438
1439                 DEBUG_REQ(D_HA, req, "resending: ");
1440                 spin_lock_irqsave(&imp->imp_lock, flags);
1441                 goto restart;
1442         }
1443
1444         if (req->rq_intr) {
1445                 /* Should only be interrupted if we timed out. */
1446                 if (!req->rq_timedout)
1447                         DEBUG_REQ(D_ERROR, req,
1448                                   "rq_intr set but rq_timedout not");
1449                 GOTO(out, rc = -EINTR);
1450         }
1451
1452         if (req->rq_timedout) {                 /* non-recoverable timeout */
1453                 GOTO(out, rc = -ETIMEDOUT);
1454         }
1455
1456         if (!req->rq_replied) {
1457                 /* How can this be? -eeb */
1458                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1459                 LBUG();
1460                 GOTO(out, rc = req->rq_status);
1461         }
1462
1463         rc = after_reply (req, &do_restart);
1464         /* NB may return +ve success rc */
1465         if (do_restart) {
1466                 if (req->rq_bulk != NULL)
1467                         ptlrpc_unregister_bulk (req);
1468                 DEBUG_REQ(D_HA, req, "resending: ");
1469                 spin_lock_irqsave(&imp->imp_lock, flags);
1470                 goto restart;
1471         }
1472
1473  out:
1474         if (req->rq_bulk != NULL) {
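                /* The bulk transfer can still be in flight even though the
                 * reply has arrived: on success just wait for it to drain,
                 * on any failure tear the descriptor down explicitly. */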
1475                 if (rc >= 0) {                  /* success so far */
1476                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1477                         brc = l_wait_event(req->rq_reply_waitq,
1478                                            ptlrpc_bulk_complete(req->rq_bulk),
1479                                            &lwi);
1480                         if (brc != 0) {
1481                                 LASSERT(brc == -ETIMEDOUT);
1482                                 CERROR ("Timed out waiting for bulk\n");
1483                                 rc = brc;
1484                         }
1485                 }
1486                 if (rc < 0)
1487                         ptlrpc_unregister_bulk (req);
1488         }
1489
1490         LASSERT(!req->rq_receiving_reply);
1491         req->rq_phase = RQ_PHASE_INTERPRET;
1492         RETURN(rc);
1493 }
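
/* The resend logic above means ptlrpc_queue_wait() will transparently retry
 * a timed-out request unless the caller opts out via rq_no_resend.  Below is
 * a minimal usage sketch, illustrative only: EXAMPLE_OPC and the buffer
 * sizes are placeholders, and error handling is reduced to the bare minimum.
 */
#if 0
static int example_sync_rpc(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int size = 64;                  /* arbitrary request body size */
        int repsize = 64;               /* arbitrary reply buffer size */
        int rc;

        req = ptlrpc_prep_req(imp, EXAMPLE_OPC, 1, &size, NULL);
        if (req == NULL)
                return -ENOMEM;

        req->rq_replen = lustre_msg_size(1, &repsize);
        req->rq_no_resend = 1;          /* fail with -ETIMEDOUT instead of
                                         * letting the resend path retry */

        rc = ptlrpc_queue_wait(req);
        ptlrpc_req_finished(req);
        return rc;
}
#endif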
1494
1495 int ptlrpc_replay_req(struct ptlrpc_request *req)
1496 {
1497         int rc = 0, old_state, old_status = 0;
1498         // struct ptlrpc_client *cli = req->rq_import->imp_client;
1499         struct l_wait_info lwi;
1500         ENTRY;
1501
1502         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1503
1504         /* I don't touch rq_phase here, so the debug log can show what
1505          * state it was left in */
1506
1507         /* Not handling automatic bulk replay yet (or ever?) */
1508         LASSERT(req->rq_bulk == NULL);
1509
1510         DEBUG_REQ(D_NET, req, "about to replay");
1511
1512         /* Update the request's connection; we might have reconnected. */
1513         ptlrpc_put_connection(req->rq_connection);
1514         req->rq_connection =
1515                 ptlrpc_connection_addref(req->rq_import->imp_connection);
1516
1517         /* Temporarily set the request's send state to REPLAY; not strictly
1518          * necessary since ptl_send_rpc doesn't check state, but let's
1519          * be consistent. */
1520         old_state = req->rq_send_state;
1521
1522         /*
1523          * Q: "How can a req get on the replay list if it wasn't replied?"
1524          * A: "If we failed during the replay of this request, it will still
1525          *     be on the list, but rq_replied will have been reset to 0."
1526          */
1527         if (req->rq_replied)
1528                 old_status = req->rq_repmsg->status;
1529         req->rq_send_state = LUSTRE_IMP_REPLAY;
1530         rc = ptl_send_rpc(req);
1531         if (rc) {
1532                 CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
1533                 ptlrpc_cleanup_request_buf(req);
1534                 // up(&cli->cli_rpc_sem);
1535                 GOTO(out, rc = -rc);
1536         }
1537
1538         CDEBUG(D_OTHER, "-- sleeping\n");
1539         lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
1540         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1541         CDEBUG(D_OTHER, "-- done\n");
1542
1543         // up(&cli->cli_rpc_sem);
1544
1545         /* If the reply was received normally, this just grabs the spinlock
1546          * (ensuring the reply callback has returned), sees that
1547          * req->rq_receiving_reply is clear and returns. */
1548         ptlrpc_unregister_reply (req);
1549
1550         if (!req->rq_replied) {
1551                 CERROR("Unknown reason for wakeup\n");
1552                 /* XXX Phil - I end up here when I kill obdctl */
1553                 /* ...that's because signals aren't all masked in
1554                  * l_wait_event() -eeb */
1555                 GOTO(out, rc = -EINTR);
1556         }
1557
1558 #if SWAB_PARANOIA
1559         /* Clear reply swab mask; this is a new reply in sender's byte order */
1560         req->rq_rep_swab_mask = 0;
1561 #endif
1562         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
1563         if (rc) {
1564                 CERROR("unpack_rep failed: %d\n", rc);
1565                 GOTO(out, rc = -EPROTO);
1566         }
1567 #if 0
1568         /* FIXME: Enable when BlueArc makes a new release */
1569         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
1570             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
1571                 CERROR("invalid packet type received (type=%u)\n",
1572                        req->rq_repmsg->type);
1573                 GOTO(out, rc = -EPROTO);
1574         }
1575 #endif
1576
1577         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1578             req->rq_repmsg->status == -ENOTCONN) 
1579                 GOTO(out, rc = req->rq_repmsg->status);
1580
1581         /* The transno had better not change over replay. */
1582         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1583
1584         CDEBUG(D_NET, "got rep "LPU64"\n", req->rq_xid);
1585
1586         /* Let the callback do fixups, possibly in the request itself. */
1587         if (req->rq_replay_cb)
1588                 req->rq_replay_cb(req);
1589
1590         if (req->rq_replied && req->rq_repmsg->status != old_status) {
1591                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1592                           req->rq_repmsg->status, old_status);
1593         } else {
1594                 /* Put it back for re-replay. */
1595                 req->rq_status = old_status;
1596         }
1597
1598  out:
1599         req->rq_send_state = old_state;
1600         RETURN(rc);
1601 }
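
/* rq_replay_cb, invoked above once the replayed reply has been unpacked, is
 * the hook a subsystem uses to patch up state that replay may have changed.
 * The sketch below only shows the shape of such a callback; the struct and
 * helper names are hypothetical, and the signature is inferred from the
 * call site above (the callback takes the request, return value unused).
 * The owner would have set req->rq_replay_cb when the request was built.
 */
#if 0
struct example_state {                          /* hypothetical client state */
        __u64 es_handle;
};

static void example_replay_cb(struct ptlrpc_request *req)
{
        struct example_state *st = example_req_to_state(req);  /* hypothetical */

        /* Refresh the cached server handle from the replayed reply so that
         * later RPCs name the post-recovery instance of the object. */
        st->es_handle = example_handle_from_reply(req->rq_repmsg);
}
#endif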
1602
1603 void ptlrpc_abort_inflight(struct obd_import *imp)
1604 {
1605         unsigned long flags;
1606         struct list_head *tmp, *n;
1607         ENTRY;
1608
1609         /* Make sure that no new requests get processed for this import.
1610          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1611          * this flag and then putting requests on sending_list or delayed_list.
1612          */
1613         spin_lock_irqsave(&imp->imp_lock, flags);
1614
1615         /* XXX locking?  Maybe we should remove each request with the list
1616          * locked?  Also, how do we know if the requests on the list are
1617          * being freed at this time?
1618          */
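        /* Only requests stamped with an older import generation are failed
         * here: rq_err is set and their waiters are woken, so the waiter
         * (ptlrpc_queue_wait or the request set) sees the error and bails
         * out with -EIO.  Requests already at the current generation are
         * left alone. */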
1619         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1620                 struct ptlrpc_request *req =
1621                         list_entry(tmp, struct ptlrpc_request, rq_list);
1622
1623                 DEBUG_REQ(D_HA, req, "inflight");
1624
1625                 spin_lock (&req->rq_lock);
1626                 if (req->rq_import_generation < imp->imp_generation) {
1627                         req->rq_err = 1;
1628                         if (req->rq_set != NULL)
1629                                 wake_up(&req->rq_set->set_waitq);
1630                         else
1631                                 wake_up(&req->rq_reply_waitq);
1632                 }
1633                 spin_unlock (&req->rq_lock);
1634         }
1635
1636         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1637                 struct ptlrpc_request *req =
1638                         list_entry(tmp, struct ptlrpc_request, rq_list);
1639
1640                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1641
1642                 spin_lock (&req->rq_lock);
1643                 if (req->rq_import_generation < imp->imp_generation) {
1644                         req->rq_err = 1;
1645                         if (req->rq_set != NULL)
1646                                 wake_up(&req->rq_set->set_waitq);
1647                         else
1648                                 wake_up(&req->rq_reply_waitq);
1649                 }
1650                 spin_unlock (&req->rq_lock);
1651         }
1652
1653         /* Last chance to free reqs left on the replay list, but we
1654          * will still leak reqs that haven't committed.  */
1655         if (imp->imp_replayable)
1656                 ptlrpc_free_committed(imp);
1657
1658         spin_unlock_irqrestore(&imp->imp_lock, flags);
1659
1660         EXIT;
1661 }
1662
1663 static __u64 ptlrpc_last_xid = 0;
1664 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1665
1666 __u64 ptlrpc_next_xid(void)
1667 {
1668         __u64 tmp;
1669         spin_lock(&ptlrpc_last_xid_lock);
1670         tmp = ++ptlrpc_last_xid;
1671         spin_unlock(&ptlrpc_last_xid_lock);
1672         return tmp;
1673 }
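
/* Every outgoing request is expected to pick up a node-locally unique,
 * monotonically increasing transfer id (xid) from the counter above when it
 * is built; a sketch of the assignment (the surrounding prep code is
 * elided):
 */
#if 0
        req->rq_xid = ptlrpc_next_xid();
#endif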
1674
1675