lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
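/* Initialize a ptlrpc client: record the request and reply portals it will
 * use, and its name. */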
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
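/* Resolve 'uuid' to a peer and return a connection to it, recording the
 * remote UUID in the connection; returns NULL on failure. */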
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
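/* Allocate a bulk descriptor with room for 'npages' iov fragments and
 * initialize its lock, waitqueue, portal and transfer type. */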
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_iov = npages;
96         desc->bd_iov_count = 0;
97         desc->bd_md_h = PTL_INVALID_HANDLE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
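/* Prepare a client-side bulk descriptor for 'req'; the request takes
 * ownership of the descriptor (rq_bulk) and frees it along with itself. */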
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc; desc is freed when req itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
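/* Append one page fragment to a bulk descriptor. */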
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
156         LASSERT(page != NULL);
157         LASSERT(pageoffset >= 0);
158         LASSERT(len > 0);
159         LASSERT(pageoffset + len <= PAGE_SIZE);
160
161         desc->bd_nob += len;
162
163         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
164 }
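/* Illustrative sketch (not part of the original file): a client expecting the
 * server to PUT 'npages' pages of data into its buffers would typically do
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * where 'pages' is the caller's own page array; the descriptor is then owned
 * and freed together with 'req'. */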
165
166 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
167 {
168         ENTRY;
169
170         LASSERT(desc != NULL);
171         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
172         LASSERT(!desc->bd_network_rw);         /* network hands off or */
173         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
174         if (desc->bd_export)
175                 class_export_put(desc->bd_export);
176         else
177                 class_import_put(desc->bd_import);
178
179         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
180                                 bd_iov[desc->bd_max_iov]));
181         EXIT;
182 }
183
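/* Allocate a request on import 'imp', pack its message buffers and
 * initialize its timeout, callbacks, portals, lists and XID; returns NULL
 * if allocation or packing fails. */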
184 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
185                                        int count, int *lengths, char **bufs)
186 {
187         struct ptlrpc_request *request;
188         int rc;
189         ENTRY;
190
191         LASSERT((unsigned long)imp > 0x1000);
192
193         OBD_ALLOC(request, sizeof(*request));
194         if (!request) {
195                 CERROR("request allocation out of memory\n");
196                 RETURN(NULL);
197         }
198
199         rc = lustre_pack_request(request, count, lengths, bufs);
200         if (rc) {
201                 CERROR("cannot pack request %d\n", rc);
202                 OBD_FREE(request, sizeof(*request));
203                 RETURN(NULL);
204         }
205
206         if (imp->imp_server_timeout)
207                 request->rq_timeout = obd_timeout / 2;
208         else
209                 request->rq_timeout = obd_timeout;
210         request->rq_send_state = LUSTRE_IMP_FULL;
211         request->rq_type = PTL_RPC_MSG_REQUEST;
212         request->rq_import = class_import_get(imp);
213
214         request->rq_req_cbid.cbid_fn  = request_out_callback;
215         request->rq_req_cbid.cbid_arg = request;
216
217         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
218         request->rq_reply_cbid.cbid_arg = request;
219         
220         request->rq_phase = RQ_PHASE_NEW;
221
222         /* XXX FIXME bug 249 */
223         request->rq_request_portal = imp->imp_client->cli_request_portal;
224         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
225
226         spin_lock_init(&request->rq_lock);
227         INIT_LIST_HEAD(&request->rq_list);
228         INIT_LIST_HEAD(&request->rq_replay_list);
229         INIT_LIST_HEAD(&request->rq_set_chain);
230         init_waitqueue_head(&request->rq_reply_waitq);
231         request->rq_xid = ptlrpc_next_xid();
232         atomic_set(&request->rq_refcount, 1);
233
234         request->rq_reqmsg->opc = opcode;
235         request->rq_reqmsg->flags = 0;
236
237         RETURN(request);
238 }
239
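/* Allocate and initialize an empty request set. */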
240 struct ptlrpc_request_set *ptlrpc_prep_set(void)
241 {
242         struct ptlrpc_request_set *set;
243
244         OBD_ALLOC(set, sizeof *set);
245         if (!set)
246                 RETURN(NULL);
247         INIT_LIST_HEAD(&set->set_requests);
248         init_waitqueue_head(&set->set_waitq);
249         set->set_remaining = 0;
250         spin_lock_init(&set->set_new_req_lock);
251         INIT_LIST_HEAD(&set->set_new_requests);
252
253         RETURN(set);
254 }
255
256 /* Finish with this set; opposite of prep_set. */
257 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
258 {
259         struct list_head *tmp;
260         struct list_head *next;
261         int               expected_phase;
262         int               n = 0;
263         ENTRY;
264
265         /* Requests on the set should either all be completed, or all be new */
266         expected_phase = (set->set_remaining == 0) ?
267                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
268         list_for_each (tmp, &set->set_requests) {
269                 struct ptlrpc_request *req =
270                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
271
272                 LASSERT(req->rq_phase == expected_phase);
273                 n++;
274         }
275
276         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
277
278         list_for_each_safe(tmp, next, &set->set_requests) {
279                 struct ptlrpc_request *req =
280                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
281                 list_del_init(&req->rq_set_chain);
282
283                 LASSERT(req->rq_phase == expected_phase);
284
285                 if (req->rq_phase == RQ_PHASE_NEW) {
286
287                         if (req->rq_interpret_reply != NULL) {
288                                 int (*interpreter)(struct ptlrpc_request *,
289                                                    void *, int) =
290                                         req->rq_interpret_reply;
291
292                                 /* higher level (i.e. LOV) failed;
293                                  * let the sub reqs clean up */
294                                 req->rq_status = -EBADR;
295                                 interpreter(req, &req->rq_async_args,
296                                             req->rq_status);
297                         }
298                         set->set_remaining--;
299                 }
300
301                 req->rq_set = NULL;
302                 ptlrpc_req_finished (req);
303         }
304
305         LASSERT(set->set_remaining == 0);
306
307         OBD_FREE(set, sizeof(*set));
308         EXIT;
309 }
310
311 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
312                         struct ptlrpc_request *req)
313 {
314         /* The set takes over the caller's request reference */
315         list_add_tail(&req->rq_set_chain, &set->set_requests);
316         req->rq_set = set;
317         set->set_remaining++;
318         atomic_inc(&req->rq_import->imp_inflight);
319 }
320
321 /* Locked so that many callers can add things; the context that owns the set
322  * is supposed to notice these and move them into the set proper. */
323 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
324                             struct ptlrpc_request *req)
325 {
326         unsigned long flags;
327         spin_lock_irqsave(&set->set_new_req_lock, flags);
328         /* The set takes over the caller's request reference */
329         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
330         req->rq_set = set;
331         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
332 }
333
334 /*
335  * Based on the current state of the import, determine if the request
336  * can be sent, is an error, or should be delayed.
337  *
338  * Returns true if this request should be delayed.  If it returns false and
339  * *status is non-zero, the request cannot be sent and *status holds the
340  * error code.  If it returns false and *status is 0, the request can be sent.
341  *
342  * The imp->imp_lock must be held.
343  */
344 static int ptlrpc_import_delay_req(struct obd_import *imp, 
345                                    struct ptlrpc_request *req, int *status)
346 {
347         int delay = 0;
348         ENTRY;
349
350         LASSERT (status != NULL);
351         *status = 0;
352
353         if (imp->imp_state == LUSTRE_IMP_NEW) {
354                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
355                 *status = -EIO;
356                 LBUG();
357         }
358         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
359                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
360                 *status = -EIO;
361         }
362         /* allow CONNECT even if import is invalid */
363         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
364                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
365                 ;
366         }
367         /*
368          * If the import has been invalidated (such as by an OST failure), the
369          * request must fail with -EIO.  
370          */
371         else if (imp->imp_invalid) {
372                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
373                 *status = -EIO;
374         } 
375         else if (req->rq_import_generation != imp->imp_generation) {
376                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
377                 *status = -EIO;
378         } 
379         else if (req->rq_send_state != imp->imp_state) {
380                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
381                     || req->rq_no_delay) 
382                         *status = -EWOULDBLOCK;
383                 else
384                         delay = 1;
385         }
386
387         RETURN(delay);
388 }
389
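/* Wake-up condition for a synchronous waiter: returns non-zero once the
 * request has been replied to, has failed, or must be resent or restarted. */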
390 static int ptlrpc_check_reply(struct ptlrpc_request *req)
391 {
392         unsigned long flags;
393         int rc = 0;
394         ENTRY;
395
396         /* serialise with network callback */
397         spin_lock_irqsave (&req->rq_lock, flags);
398
399         if (req->rq_replied) {
400                 DEBUG_REQ(D_NET, req, "REPLIED:");
401                 GOTO(out, rc = 1);
402         }
403         
404         if (req->rq_net_err && !req->rq_timedout) {
405                 spin_unlock_irqrestore (&req->rq_lock, flags);
406                 rc = ptlrpc_expire_one_request(req); 
407                 spin_lock_irqsave (&req->rq_lock, flags);
408                 GOTO(out, rc);
409         }
410
411         if (req->rq_err) {
412                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
413                 GOTO(out, rc = 1);
414         }
415
416         if (req->rq_resend) {
417                 DEBUG_REQ(D_ERROR, req, "RESEND:");
418                 GOTO(out, rc = 1);
419         }
420
421         if (req->rq_restart) {
422                 DEBUG_REQ(D_ERROR, req, "RESTART:");
423                 GOTO(out, rc = 1);
424         }
425         EXIT;
426  out:
427         spin_unlock_irqrestore (&req->rq_lock, flags);
428         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
429         return rc;
430 }
431
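/* Extract the RPC status from the reply message, turning PTL_RPC_MSG_ERR
 * replies into an error return. */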
432 static int ptlrpc_check_status(struct ptlrpc_request *req)
433 {
434         int err;
435         ENTRY;
436
437         err = req->rq_repmsg->status;
438         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
439                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
440                           err);
441                 RETURN(err < 0 ? err : -EINVAL);
442         }
443
444         if (err < 0) {
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         } else if (err > 0) {
447                 /* XXX: translate this error from net to host */
448                 DEBUG_REQ(D_INFO, req, "status is %d", err);
449         }
450
451         RETURN(err);
452 }
453
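/* Process a newly arrived reply: unpack and sanity-check the message,
 * handle -ENOTCONN by attempting reconnection, and record transno and
 * last-committed information for replayable imports. */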
454 static int after_reply(struct ptlrpc_request *req)
455 {
456         unsigned long flags;
457         struct obd_import *imp = req->rq_import;
458         int rc;
459         ENTRY;
460
461         LASSERT(!req->rq_receiving_reply);
462
463         /* NB Until this point, the whole of the incoming message,
464          * including buflens, status etc is in the sender's byte order. */
465
466 #if SWAB_PARANOIA
467         /* Clear reply swab mask; this is a new reply in sender's byte order */
468         req->rq_rep_swab_mask = 0;
469 #endif
470         LASSERT (req->rq_nob_received <= req->rq_replen);
471         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
472         if (rc) {
473                 CERROR("unpack_rep failed: %d\n", rc);
474                 RETURN(-EPROTO);
475         }
476
477         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
478             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
479                 CERROR("invalid packet type received (type=%u)\n",
480                        req->rq_repmsg->type);
481                 RETURN(-EPROTO);
482         }
483
484         rc = ptlrpc_check_status(req);
485
486         /* Either we've been evicted, or the server has failed for
487          * some reason. Try to reconnect, and if that fails, punt to the
488          * upcall. */
489         if (rc == -ENOTCONN) {
490                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
491                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
492                         RETURN(-ENOTCONN);
493                 }
494
495                 ptlrpc_request_handle_notconn(req);
496
497                 RETURN(rc);
498         }
499
500         /* Store transno in reqmsg for replay. */
501         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
502
503         if (req->rq_import->imp_replayable) {
504                 spin_lock_irqsave(&imp->imp_lock, flags);
505                 if (req->rq_replay || req->rq_transno != 0)
506                         ptlrpc_retain_replayable_request(req, imp);
507                 else if (req->rq_commit_cb != NULL)
508                         req->rq_commit_cb(req);
509
510                 if (req->rq_transno > imp->imp_max_transno)
511                         imp->imp_max_transno = req->rq_transno;
512
513                 /* Replay-enabled imports return commit-status information. */
514                 if (req->rq_repmsg->last_committed)
515                         imp->imp_peer_committed_transno =
516                                 req->rq_repmsg->last_committed;
517                 ptlrpc_free_committed(imp);
518                 spin_unlock_irqrestore(&imp->imp_lock, flags);
519         }
520
521         RETURN(rc);
522 }
523
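/* Move a NEW request into the RPC phase and send it, or park it on the
 * import's delayed list if the import is not yet in the right state. */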
524 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
525 {
526         char                   str[PTL_NALFMT_SIZE];
527         struct obd_import     *imp;
528         unsigned long          flags;
529         int rc;
530         ENTRY;
531
532         LASSERT(req->rq_phase == RQ_PHASE_NEW);
533         req->rq_phase = RQ_PHASE_RPC;
534
535         imp = req->rq_import;
536         spin_lock_irqsave(&imp->imp_lock, flags);
537
538         req->rq_import_generation = imp->imp_generation;
539
540         if (ptlrpc_import_delay_req(imp, req, &rc)) {
541                 spin_lock (&req->rq_lock);
542                 req->rq_waiting = 1;
543                 spin_unlock (&req->rq_lock);
544
545                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
546                           "(%s != %s)",
547                           req->rq_reqmsg->status, 
548                           ptlrpc_import_state_name(req->rq_send_state),
549                           ptlrpc_import_state_name(imp->imp_state));
550                 LASSERT(list_empty (&req->rq_list));
551
552                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
553                 spin_unlock_irqrestore(&imp->imp_lock, flags);
554                 RETURN(0);
555         }
556
557         if (rc != 0) {
558                 spin_unlock_irqrestore(&imp->imp_lock, flags);
559                 req->rq_status = rc;
560                 req->rq_phase = RQ_PHASE_INTERPRET;
561                 RETURN(rc);
562         }
563
564         /* XXX this is the same as ptlrpc_queue_wait */
565         LASSERT(list_empty(&req->rq_list));
566         list_add_tail(&req->rq_list, &imp->imp_sending_list);
567         spin_unlock_irqrestore(&imp->imp_lock, flags);
568
569         req->rq_reqmsg->status = current->pid;
570         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
571                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
572                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
573                req->rq_xid,
574                imp->imp_connection->c_peer.peer_ni->pni_name,
575                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
576                req->rq_reqmsg->opc);
577
578         rc = ptl_send_rpc(req);
579         if (rc) {
580                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
581                 req->rq_net_err = 1;
582                 RETURN(rc);
583         }
584         RETURN(0);
585 }
586
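/* Advance every request in the set through its state machine: send new
 * requests, handle errors, timeouts and resends, collect replies and bulk
 * completion, and run the interpret callback on finished requests.
 * Returns non-zero when the set is complete or timeouts need recalculating. */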
587 int ptlrpc_check_set(struct ptlrpc_request_set *set)
588 {
589         char str[PTL_NALFMT_SIZE];
590         unsigned long flags;
591         struct list_head *tmp;
592         int force_timer_recalc = 0;
593         ENTRY;
594
595         if (set->set_remaining == 0)
596                 RETURN(1);
597
598         list_for_each(tmp, &set->set_requests) {
599                 struct ptlrpc_request *req =
600                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
601                 struct obd_import *imp = req->rq_import;
602                 int rc = 0;
603
604                 if (req->rq_phase == RQ_PHASE_NEW &&
605                     ptlrpc_send_new_req(req)) {
606                         force_timer_recalc = 1;
607                 }
608
609                 if (!(req->rq_phase == RQ_PHASE_RPC ||
610                       req->rq_phase == RQ_PHASE_BULK ||
611                       req->rq_phase == RQ_PHASE_INTERPRET ||
612                       req->rq_phase == RQ_PHASE_COMPLETE)) {
613                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
614                         LBUG();
615                 }
616
617                 if (req->rq_phase == RQ_PHASE_COMPLETE)
618                         continue;
619
620                 if (req->rq_phase == RQ_PHASE_INTERPRET)
621                         GOTO(interpret, req->rq_status);
622
623                 if (req->rq_net_err && !req->rq_timedout)
624                         ptlrpc_expire_one_request(req); 
625
626                 if (req->rq_err) {
627                         ptlrpc_unregister_reply(req);
628                         if (req->rq_status == 0)
629                                 req->rq_status = -EIO;
630                         req->rq_phase = RQ_PHASE_INTERPRET;
631
632                         spin_lock_irqsave(&imp->imp_lock, flags);
633                         list_del_init(&req->rq_list);
634                         spin_unlock_irqrestore(&imp->imp_lock, flags);
635
636                         GOTO(interpret, req->rq_status);
637                 }
638
639                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
640                  * will only be set after rq_timedout, but the oig waiting
641                  * path sets rq_intr irrespective of whether ptlrpcd has
642                  * seen a timeout.  Our policy is to only interpret
643                  * interrupted RPCs after they have timed out. */
644                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
645                         /* NB could be on delayed list */
646                         ptlrpc_unregister_reply(req);
647                         req->rq_status = -EINTR;
648                         req->rq_phase = RQ_PHASE_INTERPRET;
649
650                         spin_lock_irqsave(&imp->imp_lock, flags);
651                         list_del_init(&req->rq_list);
652                         spin_unlock_irqrestore(&imp->imp_lock, flags);
653
654                         GOTO(interpret, req->rq_status);
655                 }
656
657                 if (req->rq_phase == RQ_PHASE_RPC) {
658                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
659                                 int status;
660
661                                 ptlrpc_unregister_reply(req);
662
663                                 spin_lock_irqsave(&imp->imp_lock, flags);
664
665                                 if (ptlrpc_import_delay_req(imp, req, &status)){
666                                         spin_unlock_irqrestore(&imp->imp_lock,
667                                                                flags);
668                                         continue;
669                                 }
670
671                                 list_del_init(&req->rq_list);
672                                 if (status != 0)  {
673                                         req->rq_status = status;
674                                         req->rq_phase = RQ_PHASE_INTERPRET;
675                                         spin_unlock_irqrestore(&imp->imp_lock,
676                                                                flags);
677                                         GOTO(interpret, req->rq_status);
678                                 }
679                                 if (req->rq_no_resend) {
680                                         req->rq_status = -ENOTCONN;
681                                         req->rq_phase = RQ_PHASE_INTERPRET;
682                                         spin_unlock_irqrestore(&imp->imp_lock,
683                                                                flags);
684                                         GOTO(interpret, req->rq_status);
685                                 }
686                                 list_add_tail(&req->rq_list,
687                                               &imp->imp_sending_list);
688
689                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
690
691                                 req->rq_waiting = 0;
692                                 if (req->rq_resend) {
693                                         lustre_msg_add_flags(req->rq_reqmsg,
694                                                              MSG_RESENT);
695                                         if (req->rq_bulk) {
696                                                 __u64 old_xid = req->rq_xid;
697
698                                                 ptlrpc_unregister_bulk (req);
699
700                                                 /* ensure previous bulk fails */
701                                                 req->rq_xid = ptlrpc_next_xid();
702                                                 CDEBUG(D_HA, "resend bulk "
703                                                        "old x"LPU64
704                                                        " new x"LPU64"\n",
705                                                        old_xid, req->rq_xid);
706                                         }
707                                 }
708
709                                 rc = ptl_send_rpc(req);
710                                 if (rc) {
711                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
712                                                   rc);
713                                         force_timer_recalc = 1;
714                                         req->rq_net_err = 1;
715                                 }
716                                 /* need to reset the timeout */
717                                 force_timer_recalc = 1;
718                         }
719
720                         /* Still waiting for a reply? */
721                         if (ptlrpc_client_receiving_reply(req))
722                                 continue;
723
724                         /* Did we actually receive a reply? */
725                         if (!ptlrpc_client_replied(req))
726                                 continue;
727
728                         spin_lock_irqsave(&imp->imp_lock, flags);
729                         list_del_init(&req->rq_list);
730                         spin_unlock_irqrestore(&imp->imp_lock, flags);
731
732                         req->rq_status = after_reply(req);
733                         if (req->rq_resend) {
734                                 /* Add this req to the delayed list so
735                                    it can be errored if the import is
736                                    evicted after recovery. */
737                                 spin_lock_irqsave (&req->rq_lock, flags);
738                                 list_add_tail(&req->rq_list, 
739                                               &imp->imp_delayed_list);
740                                 spin_unlock_irqrestore(&req->rq_lock, flags);
741                                 continue;
742                         }
743
744                         /* If there is no bulk associated with this request,
745                          * then we're done and should let the interpreter
746                          * process the reply.  Similarly if the RPC returned
747                          * an error, and therefore the bulk will never arrive.
748                          */
749                         if (req->rq_bulk == NULL || req->rq_status != 0) {
750                                 req->rq_phase = RQ_PHASE_INTERPRET;
751                                 GOTO(interpret, req->rq_status);
752                         }
753
754                         req->rq_phase = RQ_PHASE_BULK;
755                 }
756
757                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
758                 if (ptlrpc_bulk_active(req->rq_bulk))
759                         continue;
760
761                 if (!req->rq_bulk->bd_success) {
762                         /* The RPC reply arrived OK, but the bulk screwed
763                          * up!  Dead weird since the server told us the RPC
764                          * was good after getting the REPLY for her GET or
765                          * the ACK for her PUT. */
766                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
767                         LBUG();
768                 }
769
770                 req->rq_phase = RQ_PHASE_INTERPRET;
771
772         interpret:
773                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
774                 LASSERT(!req->rq_receiving_reply);
775
776                 ptlrpc_unregister_reply(req);
777                 if (req->rq_bulk != NULL)
778                         ptlrpc_unregister_bulk (req);
779
780                 req->rq_phase = RQ_PHASE_COMPLETE;
781
782                 if (req->rq_interpret_reply != NULL) {
783                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
784                                 req->rq_interpret_reply;
785                         req->rq_status = interpreter(req, &req->rq_async_args,
786                                                      req->rq_status);
787                 }
788
789                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
790                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
791                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
792                        req->rq_xid,
793                        imp->imp_connection->c_peer.peer_ni->pni_name,
794                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
795                        req->rq_reqmsg->opc);
796
797                 set->set_remaining--;
798
799                 atomic_dec(&imp->imp_inflight);
800                 wake_up(&imp->imp_recovery_waitq);
801         }
802
803         /* If we hit an error, we want to recover promptly. */
804         RETURN(set->set_remaining == 0 || force_timer_recalc);
805 }
806
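/* Handle a single timed-out request: mark it timed out, unregister its reply
 * and bulk buffers, and fail it or start import recovery as appropriate;
 * returns 0 only when recovery has been started. */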
807 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
808 {
809         unsigned long      flags;
810         struct obd_import *imp = req->rq_import;
811         ENTRY;
812
813         DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
814                   (long)req->rq_sent, CURRENT_SECONDS - req->rq_sent);
815
816         spin_lock_irqsave (&req->rq_lock, flags);
817         req->rq_timedout = 1;
818         spin_unlock_irqrestore (&req->rq_lock, flags);
819
820         ptlrpc_unregister_reply (req);
821
822         if (obd_dump_on_timeout)
823                 portals_debug_dumplog();
824
825         if (req->rq_bulk != NULL)
826                 ptlrpc_unregister_bulk (req);
827
828         if (imp == NULL) {
829                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
830                 RETURN(1);
831         }
832
833         /* The DLM server doesn't want recovery run on its imports. */
834         if (imp->imp_dlm_fake)
835                 RETURN(1);
836
837         /* If this request is for recovery or other primordial tasks,
838          * then error it out here. */
839         if (req->rq_send_state != LUSTRE_IMP_FULL ||
840             imp->imp_obd->obd_no_recov) {
841                 spin_lock_irqsave (&req->rq_lock, flags);
842                 req->rq_status = -ETIMEDOUT;
843                 req->rq_err = 1;
844                 spin_unlock_irqrestore (&req->rq_lock, flags);
845                 RETURN(1);
846         }
847
848         ptlrpc_fail_import(imp, req->rq_import_generation);
849
850         RETURN(0);
851 }
852
853 int ptlrpc_expired_set(void *data)
854 {
855         struct ptlrpc_request_set *set = data;
856         struct list_head          *tmp;
857         time_t                     now = CURRENT_SECONDS;
858         ENTRY;
859
860         LASSERT(set != NULL);
861
862         /* A timeout expired; see which reqs it applies to... */
863         list_for_each (tmp, &set->set_requests) {
864                 struct ptlrpc_request *req =
865                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
866
867                 /* request in-flight? */
868                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
869                        && !req->rq_resend) ||
870                       (req->rq_phase == RQ_PHASE_BULK)))
871                         continue;
872
873                 if (req->rq_timedout ||           /* already dealt with */
874                     req->rq_sent + req->rq_timeout > now) /* not expired */
875                         continue;
876
877                 /* deal with this guy */
878                 ptlrpc_expire_one_request (req);
879         }
880
881         /* When waiting for a whole set, we always want to break out of the
882          * sleep so we can recalculate the timeout, or enable interrupts
883          * iff everyone's timed out.
884          */
885         RETURN(1);
886 }
887
888 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
889 {
890         unsigned long flags;
891         spin_lock_irqsave(&req->rq_lock, flags);
892         req->rq_intr = 1;
893         spin_unlock_irqrestore(&req->rq_lock, flags);
894 }
895
896 void ptlrpc_interrupted_set(void *data)
897 {
898         struct ptlrpc_request_set *set = data;
899         struct list_head *tmp;
900
901         LASSERT(set != NULL);
902         CERROR("INTERRUPTED SET %p\n", set);
903
904         list_for_each(tmp, &set->set_requests) {
905                 struct ptlrpc_request *req =
906                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
907
908                 if (req->rq_phase != RQ_PHASE_RPC)
909                         continue;
910
911                 ptlrpc_mark_interrupted(req);
912         }
913 }
914
915 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
916 {
917         struct list_head      *tmp;
918         time_t                 now = CURRENT_SECONDS;
919         time_t                 deadline;
920         int                    timeout = 0;
921         struct ptlrpc_request *req;
922         ENTRY;
923
924         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
925
926         list_for_each(tmp, &set->set_requests) {
927                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
928
929                 /* request in-flight? */
930                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
931                       (req->rq_phase == RQ_PHASE_BULK)))
932                         continue;
933
934                 if (req->rq_timedout)   /* already timed out */
935                         continue;
936
937                 deadline = req->rq_sent + req->rq_timeout;
938                 if (deadline <= now)    /* actually expired already */
939                         timeout = 1;    /* ASAP */
940                 else if (timeout == 0 || timeout > deadline - now)
941                         timeout = deadline - now;
942         }
943         RETURN(timeout);
944 }
945
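/* Send every request in the set and sleep until they have all completed,
 * timed out or been interrupted; returns 0 on success or a non-zero request
 * status (possibly transformed by the set's interpret callback). */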
946 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
947 {
948         struct list_head      *tmp;
949         struct ptlrpc_request *req;
950         struct l_wait_info     lwi;
951         int                    rc, timeout;
952         ENTRY;
953
954         LASSERT(!list_empty(&set->set_requests));
955         list_for_each(tmp, &set->set_requests) {
956                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
957                 if (req->rq_phase == RQ_PHASE_NEW)
958                         (void)ptlrpc_send_new_req(req);
959         }
960
961         do {
962                 timeout = ptlrpc_set_next_timeout(set);
963
964                 /* wait until all complete, interrupted, or an in-flight
965                  * req times out */
966                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
967                        set, timeout);
968                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
969                                        ptlrpc_expired_set,
970                                        ptlrpc_interrupted_set, set);
971                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
972
973                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
974
975                 /* -EINTR => all requests have been flagged rq_intr so next
976                  * check completes.
977                  * -ETIMEDOUT => someone timed out.  When all reqs have
978                  * timed out, signals are enabled allowing completion with
979                  * EINTR.
980                  * I don't really care if we go once more round the loop in
981                  * the error cases -eeb. */
982         } while (rc != 0 || set->set_remaining != 0);
983
984         LASSERT(set->set_remaining == 0);
985
986         rc = 0;
987         list_for_each(tmp, &set->set_requests) {
988                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
989
990                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
991                 if (req->rq_status != 0)
992                         rc = req->rq_status;
993         }
994
995         if (set->set_interpret != NULL) {
996                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
997                         set->set_interpret;
998                 rc = interpreter (set, set->set_arg, rc);
999         }
1000
1001         RETURN(rc);
1002 }
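/* Illustrative sketch (not part of the original file) of how a caller
 * typically drives the request-set interface above; error handling and the
 * actual ptlrpc_prep_req() arguments are omitted:
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      struct ptlrpc_request *req;
 *
 *      req = ptlrpc_prep_req(imp, opcode, count, lengths, bufs);
 *      ptlrpc_set_add_req(set, req);      - the set takes the req reference
 *      rc = ptlrpc_set_wait(set);         - sends everything and waits
 *      ptlrpc_set_destroy(set);           - drops the remaining references
 */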
1003
1004 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1005 {
1006         ENTRY;
1007         if (request == NULL) {
1008                 EXIT;
1009                 return;
1010         }
1011
1012         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1013         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1014         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1015         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1016
1017         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1018          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1019         if (request->rq_import != NULL) {
1020                 unsigned long flags = 0;
1021                 if (!locked)
1022                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1023                 list_del_init(&request->rq_replay_list);
1024                 if (!locked)
1025                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1026                                                flags);
1027         }
1028         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1029
1030         if (atomic_read(&request->rq_refcount) != 0) {
1031                 DEBUG_REQ(D_ERROR, request,
1032                           "freeing request with nonzero refcount");
1033                 LBUG();
1034         }
1035
1036         if (request->rq_repmsg != NULL) {
1037                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1038                 request->rq_repmsg = NULL;
1039         }
1040         if (request->rq_reqmsg != NULL) {
1041                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1042                 request->rq_reqmsg = NULL;
1043         }
1044         if (request->rq_export != NULL) {
1045                 class_export_put(request->rq_export);
1046                 request->rq_export = NULL;
1047         }
1048         if (request->rq_import != NULL) {
1049                 class_import_put(request->rq_import);
1050                 request->rq_import = NULL;
1051         }
1052         if (request->rq_bulk != NULL)
1053                 ptlrpc_free_bulk(request->rq_bulk);
1054
1055         OBD_FREE(request, sizeof(*request));
1056         EXIT;
1057 }
1058
1059 void ptlrpc_free_req(struct ptlrpc_request *request)
1060 {
1061         __ptlrpc_free_req(request, 0);
1062 }
1063
1064 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1065 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1066 {
1067         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1068         (void)__ptlrpc_req_finished(request, 1);
1069 }
1070
1071 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1072 {
1073         ENTRY;
1074         if (request == NULL)
1075                 RETURN(1);
1076
1077         if (request == LP_POISON ||
1078             request->rq_reqmsg == LP_POISON) {
1079                 CERROR("dereferencing freed request (bug 575)\n");
1080                 LBUG();
1081                 RETURN(1);
1082         }
1083
1084         DEBUG_REQ(D_INFO, request, "refcount now %u",
1085                   atomic_read(&request->rq_refcount) - 1);
1086
1087         if (atomic_dec_and_test(&request->rq_refcount)) {
1088                 __ptlrpc_free_req(request, locked);
1089                 RETURN(1);
1090         }
1091
1092         RETURN(0);
1093 }
1094
1095 void ptlrpc_req_finished(struct ptlrpc_request *request)
1096 {
1097         __ptlrpc_req_finished(request, 0);
1098 }
1099
1100 __u64 ptlrpc_req_xid(struct ptlrpc_request *request)
1101 {
1102         return request->rq_xid;
1103 }
1104 EXPORT_SYMBOL(ptlrpc_req_xid);
1105
1106 /* Disengage the client's reply buffer from the network
1107  * NB does _NOT_ unregister any client-side bulk.
1108  * IDEMPOTENT, but _not_ safe against concurrent callers.
1109  * The request owner (i.e. the thread doing the I/O) must call...
1110  */
1111 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1112 {
1113         int                rc;
1114         wait_queue_head_t *wq;
1115         struct l_wait_info lwi;
1116
1117         LASSERT(!in_interrupt ());             /* might sleep */
1118
1119         if (!ptlrpc_client_receiving_reply(request))
1120                 return;
1121
1122         PtlMDUnlink (request->rq_reply_md_h);
1123
1124         /* We have to l_wait_event() whatever the result, to give liblustre
1125          * a chance to run reply_in_callback() */
1126
1127         if (request->rq_set != NULL)
1128                 wq = &request->rq_set->set_waitq;
1129         else
1130                 wq = &request->rq_reply_waitq;
1131
1132         for (;;) {
1133                 /* Network access will complete in finite time but the HUGE
1134                  * timeout lets us CWARN for visibility of sluggish NALs */
1135                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1136                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1137                 if (rc == 0)
1138                         return;
1139
1140                 LASSERT (rc == -ETIMEDOUT);
1141                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1142         }
1143 }
1144
1145 /* caller must hold imp->imp_lock */
1146 void ptlrpc_free_committed(struct obd_import *imp)
1147 {
1148         struct list_head *tmp, *saved;
1149         struct ptlrpc_request *req;
1150         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1151         ENTRY;
1152
1153         LASSERT(imp != NULL);
1154
1155         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1156
1157         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1158                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1159
1160         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1161                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1162
1163                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1164                 LASSERT(req != last_req);
1165                 last_req = req;
1166
1167                 if (req->rq_import_generation < imp->imp_generation) {
1168                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1169                         GOTO(free_req, 0);
1170                 }
1171
1172                 if (req->rq_replay) {
1173                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1174                         continue;
1175                 }
1176
1177                 /* not yet committed */
1178                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1179                         DEBUG_REQ(D_HA, req, "stopping search");
1180                         break;
1181                 }
1182
1183                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1184                           imp->imp_peer_committed_transno);
1185 free_req:
1186                 if (req->rq_commit_cb != NULL)
1187                         req->rq_commit_cb(req);
1188                 list_del_init(&req->rq_replay_list);
1189                 __ptlrpc_req_finished(req, 1);
1190         }
1191
1192         EXIT;
1193         return;
1194 }
1195
1196 void ptlrpc_cleanup_client(struct obd_import *imp)
1197 {
1198         ENTRY;
1199         EXIT;
1200         return;
1201 }
1202
1203 void ptlrpc_resend_req(struct ptlrpc_request *req)
1204 {
1205         unsigned long flags;
1206
1207         DEBUG_REQ(D_HA, req, "going to resend");
1208         req->rq_reqmsg->handle.cookie = 0;
1209         req->rq_status = -EAGAIN;
1210
1211         spin_lock_irqsave (&req->rq_lock, flags);
1212         req->rq_resend = 1;
1213         req->rq_net_err = 0;
1214         req->rq_timedout = 0;
1215         if (req->rq_bulk) {
1216                 __u64 old_xid = req->rq_xid;
1217
1218                 /* ensure previous bulk fails */
1219                 req->rq_xid = ptlrpc_next_xid();
1220                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1221                        old_xid, req->rq_xid);
1222         }
1223         ptlrpc_wake_client_req(req);
1224         spin_unlock_irqrestore (&req->rq_lock, flags);
1225 }
1226
1227 /* XXX: this function and rq_status are currently unused */
1228 void ptlrpc_restart_req(struct ptlrpc_request *req)
1229 {
1230         unsigned long flags;
1231
1232         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1233         req->rq_status = -ERESTARTSYS;
1234
1235         spin_lock_irqsave (&req->rq_lock, flags);
1236         req->rq_restart = 1;
1237         req->rq_timedout = 0;
1238         ptlrpc_wake_client_req(req);
1239         spin_unlock_irqrestore (&req->rq_lock, flags);
1240 }
1241
1242 static int expired_request(void *data)
1243 {
1244         struct ptlrpc_request *req = data;
1245         ENTRY;
1246
1247         RETURN(ptlrpc_expire_one_request(req));
1248 }
1249
1250 static void interrupted_request(void *data)
1251 {
1252         unsigned long flags;
1253
1254         struct ptlrpc_request *req = data;
1255         DEBUG_REQ(D_HA, req, "request interrupted");
1256         spin_lock_irqsave (&req->rq_lock, flags);
1257         req->rq_intr = 1;
1258         spin_unlock_irqrestore (&req->rq_lock, flags);
1259 }
1260
1261 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1262 {
1263         ENTRY;
1264         atomic_inc(&req->rq_refcount);
1265         RETURN(req);
1266 }
1267
1268 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1269                                       struct obd_import *imp)
1270 {
1271         struct list_head *tmp;
1272
1273         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1274
1275         /* clear this for new requests that were resent as well
1276            as resent replayed requests. */
1277         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1278
1279         /* don't re-add requests that have been replayed */
1280         if (!list_empty(&req->rq_replay_list))
1281                 return;
1282
1283         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
1284
1285         LASSERT(imp->imp_replayable);
1286         /* Balanced in ptlrpc_free_committed, usually. */
1287         ptlrpc_request_addref(req);
1288         list_for_each_prev(tmp, &imp->imp_replay_list) {
1289                 struct ptlrpc_request *iter =
1290                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1291
1292                 /* We may have duplicate transnos if we create and then
1293                  * open a file, or for closes retained to match creating
1294                  * opens, so use req->rq_xid as a secondary key.
1295                  * (See bugs 684, 685, and 428.)
1296                  * XXX no longer needed, but all opens need transnos!
1297                  */
1298                 if (iter->rq_transno > req->rq_transno)
1299                         continue;
1300
1301                 if (iter->rq_transno == req->rq_transno) {
1302                         LASSERT(iter->rq_xid != req->rq_xid);
1303                         if (iter->rq_xid > req->rq_xid)
1304                                 continue;
1305                 }
1306
1307                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1308                 return;
1309         }
1310
1311         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1312 }
1313
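/* Send a single request synchronously and wait for its reply (and bulk, if
 * any), handling import delays, resends, interrupts and timeouts along the
 * way. */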
1314 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1315 {
1316         char str[PTL_NALFMT_SIZE];
1317         int rc = 0;
1318         int brc;
1319         struct l_wait_info lwi;
1320         struct obd_import *imp = req->rq_import;
1321         unsigned long flags;
1322         int timeout = 0;
1323         ENTRY;
1324
1325         LASSERT(req->rq_set == NULL);
1326         LASSERT(!req->rq_receiving_reply);
1327         atomic_inc(&imp->imp_inflight);
1328
1329         /* for distributed debugging */
1330         req->rq_reqmsg->status = current->pid;
1331         LASSERT(imp->imp_obd != NULL);
1332         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1333                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1334                imp->imp_obd->obd_uuid.uuid,
1335                req->rq_reqmsg->status, req->rq_xid,
1336                imp->imp_connection->c_peer.peer_ni->pni_name,
1337                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1338                req->rq_reqmsg->opc);
1339
1340         /* Mark phase here for a little debug help */
1341         req->rq_phase = RQ_PHASE_RPC;
1342
1343         spin_lock_irqsave(&imp->imp_lock, flags);
1344         req->rq_import_generation = imp->imp_generation;
1345 restart:
1346         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1347                 list_del(&req->rq_list);
1348
1349                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1350                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1351
1352                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1353                           current->comm, 
1354                           ptlrpc_import_state_name(req->rq_send_state), 
1355                           ptlrpc_import_state_name(imp->imp_state));
1356                 lwi = LWI_INTR(interrupted_request, req);
1357                 rc = l_wait_event(req->rq_reply_waitq,
1358                                   (req->rq_send_state == imp->imp_state ||
1359                                    req->rq_err),
1360                                   &lwi);
1361                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1362                           current->comm, 
1363                           ptlrpc_import_state_name(imp->imp_state), 
1364                           ptlrpc_import_state_name(req->rq_send_state),
1365                           req->rq_err);
1366
1367                 spin_lock_irqsave(&imp->imp_lock, flags);
1368                 list_del_init(&req->rq_list);
1369
1370                 if (req->rq_err) {
1371                         rc = -EIO;
1372                 }
1373                 else if (req->rq_intr) {
1374                         rc = -EINTR;
1375                 }
1376                 else if (req->rq_no_resend) {
1377                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1378                         GOTO(out, rc = -ETIMEDOUT);
1379                 }
1380                 else {
1381                         GOTO(restart, rc);
1382                 }
1383         }
1384
1385         if (rc != 0) {
1386                 list_del_init(&req->rq_list);
1387                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1388                 req->rq_status = rc; // XXX this ok?
1389                 GOTO(out, rc);
1390         }
1391
1392         if (req->rq_resend) {
1393                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1394
1395                 if (req->rq_bulk != NULL)
1396                         ptlrpc_unregister_bulk (req);
1397
1398                 DEBUG_REQ(D_HA, req, "resending: ");
1399         }
1400
1401         /* XXX this is the same as ptlrpc_set_wait */
1402         LASSERT(list_empty(&req->rq_list));
1403         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1404         spin_unlock_irqrestore(&imp->imp_lock, flags);
1405
1406         rc = ptl_send_rpc(req);
1407         if (rc) {
1408                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1409                 timeout = 1;
1410         } else {
1411                 timeout = MAX(req->rq_timeout * HZ, 1);
1412                 DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
1413         }
1414         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1415                                req);
1416         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1417         DEBUG_REQ(D_NET, req, "-- done sleeping");
1418
1419         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1420                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1421                imp->imp_obd->obd_uuid.uuid,
1422                req->rq_reqmsg->status, req->rq_xid,
1423                imp->imp_connection->c_peer.peer_ni->pni_name,
1424                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1425                req->rq_reqmsg->opc);
1426
1427         spin_lock_irqsave(&imp->imp_lock, flags);
1428         list_del_init(&req->rq_list);
1429         spin_unlock_irqrestore(&imp->imp_lock, flags);
1430
1431         /* If the reply was received normally, this just grabs the spinlock
1432          * (ensuring the reply callback has returned), sees that
1433          * req->rq_receiving_reply is clear and returns. */
1434         ptlrpc_unregister_reply (req);
1435
1436         if (req->rq_err)
1437                 GOTO(out, rc = -EIO);
1438
1439         /* Resend if we need to, unless we were interrupted. */
1440         if (req->rq_resend && !req->rq_intr) {
1441                 /* ...unless we were specifically told otherwise. */
1442                 if (req->rq_no_resend)
1443                         GOTO(out, rc = -ETIMEDOUT);
1444                 spin_lock_irqsave(&imp->imp_lock, flags);
1445                 goto restart;
1446         }
1447
1448         if (req->rq_intr) {
1449                 /* Should only be interrupted if we timed out. */
1450                 if (!req->rq_timedout)
1451                         DEBUG_REQ(D_ERROR, req,
1452                                   "rq_intr set but rq_timedout not");
1453                 GOTO(out, rc = -EINTR);
1454         }
1455
1456         if (req->rq_timedout) {                 /* non-recoverable timeout */
1457                 GOTO(out, rc = -ETIMEDOUT);
1458         }
1459
1460         if (!req->rq_replied) {
1461                 /* How can this be? -eeb */
1462                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1463                 LBUG();
1464                 GOTO(out, rc = req->rq_status);
1465         }
1466
1467         rc = after_reply (req);
1468         /* NB may return +ve success rc */
1469         if (req->rq_resend) {
1470                 spin_lock_irqsave(&imp->imp_lock, flags);
1471                 goto restart;
1472         }
1473
1474  out:
1475         if (req->rq_bulk != NULL) {
1476                 if (rc >= 0) {
1477                         /* Success so far.  Note that anything going wrong
1478                          * with the bulk transfer now is EXTREMELY strange,
1479                          * since the server must have believed the bulk
1480                          * transferred OK before it replied to us with
1481                          * success. */
1482                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1483                         brc = l_wait_event(req->rq_reply_waitq,
1484                                            !ptlrpc_bulk_active(req->rq_bulk),
1485                                            &lwi);
1486                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1487                         if (brc != 0) {
1488                                 LASSERT(brc == -ETIMEDOUT);
1489                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1490                                 rc = brc;
1491                         } else if (!req->rq_bulk->bd_success) {
1492                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1493                                 rc = -EIO;
1494                         }
1495                 }
1496                 if (rc < 0)
1497                         ptlrpc_unregister_bulk (req);
1498         }
1499
1500         LASSERT(!req->rq_receiving_reply);
1501         req->rq_phase = RQ_PHASE_INTERPRET;
1502
1503         atomic_dec(&imp->imp_inflight);
1504         wake_up(&imp->imp_recovery_waitq);
1505         RETURN(rc);
1506 }
1507
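/* Per-request state saved across an asynchronous replay: the original send
 * state and reply status are stashed here so ptlrpc_replay_interpret() can
 * restore them when the replayed reply arrives. */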
1508 struct ptlrpc_replay_async_args {
1509         int praa_old_state;
1510         int praa_old_status;
1511 };
1512
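/* Completion callback for a replayed request, run from ptlrpcd.  It unpacks
 * the fresh reply, checks that the transno did not change across replay,
 * restores the saved reply status for any future re-replay, records the last
 * replayed transno in the import and then drives the recovery state machine.
 * On failure it restarts recovery via ptlrpc_connect_import(). */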
1513 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1514                                     void * data, int rc)
1515 {
1516         struct ptlrpc_replay_async_args *aa = data;
1517         struct obd_import *imp = req->rq_import;
1518         unsigned long flags;
1519
1520         atomic_dec(&imp->imp_replay_inflight);
1521         
1522         if (!req->rq_replied) {
1523                 CERROR("request replay timed out, restarting recovery\n");
1524                 GOTO(out, rc = -ETIMEDOUT);
1525         }
1526
1527 #if SWAB_PARANOIA
1528         /* Clear reply swab mask; this is a new reply in sender's byte order */
1529         req->rq_rep_swab_mask = 0;
1530 #endif
1531         LASSERT (req->rq_nob_received <= req->rq_replen);
1532         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1533         if (rc) {
1534                 CERROR("unpack_rep failed: %d\n", rc);
1535                 GOTO(out, rc = -EPROTO);
1536         }
1537
1538         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1539             req->rq_repmsg->status == -ENOTCONN) 
1540                 GOTO(out, rc = req->rq_repmsg->status);
1541
1542         /* The transno had better not change over replay. */
1543         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1544
1545         DEBUG_REQ(D_HA, req, "got rep");
1546
1547         /* let the callback do fixups, possibly to the request itself */
1548         if (req->rq_replay_cb)
1549                 req->rq_replay_cb(req);
1550
1551         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1552                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1553                           req->rq_repmsg->status, aa->praa_old_status);
1554         } else {
1555                 /* Put it back for re-replay. */
1556                 req->rq_repmsg->status = aa->praa_old_status;
1557         }
1558
1559         spin_lock_irqsave(&imp->imp_lock, flags);
1560         imp->imp_last_replay_transno = req->rq_transno;
1561         spin_unlock_irqrestore(&imp->imp_lock, flags);
1562
1563         /* continue with recovery */
1564         rc = ptlrpc_import_recovery_state_machine(imp);
1565  out:
1566         req->rq_send_state = aa->praa_old_state;
1567         
1568         if (rc != 0)
1569                 /* this replay failed, so restart recovery */
1570                 ptlrpc_connect_import(imp, NULL);
1571
1572         RETURN(rc);
1573 }
1574
1575
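/* Queue a request for asynchronous replay during import recovery.  The
 * current send state and reply status are saved in rq_async_args, the
 * request is reset to the NEW phase and handed to ptlrpcd, which calls
 * ptlrpc_replay_interpret() once the reply comes back. */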
1576 int ptlrpc_replay_req(struct ptlrpc_request *req)
1577 {
1578         struct ptlrpc_replay_async_args *aa;
1579         ENTRY;
1580
1581         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1582
1583         /* Not handling automatic bulk replay yet (or ever?) */
1584         LASSERT(req->rq_bulk == NULL);
1585
1586         DEBUG_REQ(D_HA, req, "REPLAY");
1587
1588         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1589         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1590         memset(aa, 0, sizeof *aa);
1591
1592         /* Prepare request to be resent with ptlrpcd */
1593         aa->praa_old_state = req->rq_send_state;
1594         req->rq_send_state = LUSTRE_IMP_REPLAY;
1595         req->rq_phase = RQ_PHASE_NEW;
1596         aa->praa_old_status = req->rq_repmsg->status;
1597         req->rq_status = 0;
1598
1599         req->rq_interpret_reply = ptlrpc_replay_interpret;
1600         atomic_inc(&req->rq_import->imp_replay_inflight);
1601         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1602
1603         ptlrpcd_add_req(req);
1604         RETURN(0);
1605 }
1606
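/* Fail every in-flight request that belongs to an older generation of this
 * import: each request on the sending and delayed lists gets rq_err set and
 * its waiter woken, so ptlrpc_queue_wait() and friends bail out with -EIO
 * instead of blocking on a dead connection. */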
1607 void ptlrpc_abort_inflight(struct obd_import *imp)
1608 {
1609         unsigned long flags;
1610         struct list_head *tmp, *n;
1611         ENTRY;
1612
1613         /* Make sure that no new requests get processed for this import.
1614          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1615          * this flag and then putting requests on sending_list or delayed_list.
1616          */
1617         spin_lock_irqsave(&imp->imp_lock, flags);
1618
1619         /* XXX locking?  Maybe we should remove each request with the list
1620          * locked?  Also, how do we know if the requests on the list are
1621          * being freed at this time?
1622          */
1623         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1624                 struct ptlrpc_request *req =
1625                         list_entry(tmp, struct ptlrpc_request, rq_list);
1626
1627                 DEBUG_REQ(D_HA, req, "inflight");
1628
1629                 spin_lock (&req->rq_lock);
1630                 if (req->rq_import_generation < imp->imp_generation) {
1631                         req->rq_err = 1;
1632                         ptlrpc_wake_client_req(req);
1633                 }
1634                 spin_unlock (&req->rq_lock);
1635         }
1636
1637         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1638                 struct ptlrpc_request *req =
1639                         list_entry(tmp, struct ptlrpc_request, rq_list);
1640
1641                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1642
1643                 spin_lock (&req->rq_lock);
1644                 if (req->rq_import_generation < imp->imp_generation) {
1645                         req->rq_err = 1;
1646                         ptlrpc_wake_client_req(req);
1647                 }
1648                 spin_unlock (&req->rq_lock);
1649         }
1650
1651         /* Last chance to free reqs left on the replay list, but we
1652          * will still leak reqs that haven't committed.  */
1653         if (imp->imp_replayable)
1654                 ptlrpc_free_committed(imp);
1655
1656         spin_unlock_irqrestore(&imp->imp_lock, flags);
1657
1658         EXIT;
1659 }
1660
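/* XID allocation: a single global counter protected by its own spinlock.
 * ptlrpc_next_xid() hands out the next value; ptlrpc_sample_next_xid() only
 * peeks at what that value would be, without consuming it. */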
1661 static __u64 ptlrpc_last_xid = 0;
1662 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1663
1664 __u64 ptlrpc_next_xid(void)
1665 {
1666         __u64 tmp;
1667         spin_lock(&ptlrpc_last_xid_lock);
1668         tmp = ++ptlrpc_last_xid;
1669         spin_unlock(&ptlrpc_last_xid_lock);
1670         return tmp;
1671 }
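
/*
 * Usage sketch (illustrative, not a call site in this file): a freshly
 * built request is normally tagged with a unique transfer id, e.g.
 *
 *      req->rq_xid = ptlrpc_next_xid();
 */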
1672
1673 __u64 ptlrpc_sample_next_xid(void)
1674 {
1675         __u64 tmp;
1676         spin_lock(&ptlrpc_last_xid_lock);
1677         tmp = ptlrpc_last_xid + 1;
1678         spin_unlock(&ptlrpc_last_xid_lock);
1679         return tmp;
1680 }
1681 EXPORT_SYMBOL(ptlrpc_sample_next_xid);