1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_iov = npages;
96         desc->bd_iov_count = 0;
97         desc->bd_md_h = PTL_INVALID_HANDLE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc; desc is freed when the request itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
156         LASSERT(page != NULL);
157         LASSERT(pageoffset >= 0);
158         LASSERT(len > 0);
159         LASSERT(pageoffset + len <= PAGE_SIZE);
160
161         desc->bd_nob += len;
162
163         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
164 }
165
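/*
 * A hedged usage sketch (not part of the original file): how a client-side
 * caller typically builds a bulk descriptor with the two helpers above.
 * "req", "pages", "npages" and the portal OST_BULK_PORTAL are assumed to be
 * supplied by the caller; the point is only the order of the calls.
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_GET_SOURCE,
 *                                  OST_BULK_PORTAL);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * The descriptor is then owned by req (req->rq_bulk) and is freed together
 * with the request; server-side callers use ptlrpc_prep_bulk_exp() and free
 * the descriptor explicitly.
 */
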
166 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
167 {
168         ENTRY;
169
170         LASSERT(desc != NULL);
171         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
172         LASSERT(!desc->bd_network_rw);         /* network hands off */
173         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
174         if (desc->bd_export)
175                 class_export_put(desc->bd_export);
176         else
177                 class_import_put(desc->bd_import);
178
179         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
180                                 bd_iov[desc->bd_max_iov]));
181         EXIT;
182 }
183
184 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
185                                        int count, int *lengths, char **bufs)
186 {
187         struct ptlrpc_request *request;
188         int rc;
189         ENTRY;
190
191         LASSERT((unsigned long)imp > 0x1000);
192
193         OBD_ALLOC(request, sizeof(*request));
194         if (!request) {
195                 CERROR("request allocation out of memory\n");
196                 RETURN(NULL);
197         }
198
199         rc = lustre_pack_request(request, count, lengths, bufs);
200         if (rc) {
201                 CERROR("cannot pack request %d\n", rc);
202                 OBD_FREE(request, sizeof(*request));
203                 RETURN(NULL);
204         }
205
206         if (imp->imp_server_timeout)
207                 request->rq_timeout = obd_timeout / 2;
208         else
209                 request->rq_timeout = obd_timeout;
210         request->rq_send_state = LUSTRE_IMP_FULL;
211         request->rq_type = PTL_RPC_MSG_REQUEST;
212         request->rq_import = class_import_get(imp);
213
214         request->rq_req_cbid.cbid_fn  = request_out_callback;
215         request->rq_req_cbid.cbid_arg = request;
216
217         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
218         request->rq_reply_cbid.cbid_arg = request;
219         
220         request->rq_phase = RQ_PHASE_NEW;
221
222         /* XXX FIXME bug 249 */
223         request->rq_request_portal = imp->imp_client->cli_request_portal;
224         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
225
226         spin_lock_init(&request->rq_lock);
227         INIT_LIST_HEAD(&request->rq_list);
228         INIT_LIST_HEAD(&request->rq_replay_list);
229         INIT_LIST_HEAD(&request->rq_set_chain);
230         init_waitqueue_head(&request->rq_reply_waitq);
231         request->rq_xid = ptlrpc_next_xid();
232         atomic_set(&request->rq_refcount, 1);
233
234         request->rq_reqmsg->opc = opcode;
235         request->rq_reqmsg->flags = 0;
236
237         RETURN(request);
238 }
239
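/*
 * A hedged sketch of calling ptlrpc_prep_req(): "count", "lengths" and
 * "bufs" describe the request buffers that lustre_pack_request() lays out.
 * The opcode OST_GETATTR and struct ost_body are placeholders; a real
 * caller uses its own opcode, buffer sizes and reply length.
 *
 *      int size = sizeof(struct ost_body);
 *      struct ptlrpc_request *req;
 *
 *      req = ptlrpc_prep_req(imp, OST_GETATTR, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *
 * The caller then fills in the request body via lustre_msg_buf() on
 * req->rq_reqmsg, sets req->rq_replen for the expected reply, and sends the
 * request either synchronously (ptlrpc_queue_wait) or via a request set.
 */
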
240 struct ptlrpc_request_set *ptlrpc_prep_set(void)
241 {
242         struct ptlrpc_request_set *set;
243
244         OBD_ALLOC(set, sizeof *set);
245         if (!set)
246                 RETURN(NULL);
247         INIT_LIST_HEAD(&set->set_requests);
248         init_waitqueue_head(&set->set_waitq);
249         set->set_remaining = 0;
250         spin_lock_init(&set->set_new_req_lock);
251         INIT_LIST_HEAD(&set->set_new_requests);
252
253         RETURN(set);
254 }
255
256 /* Finish with this set; opposite of prep_set. */
257 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
258 {
259         struct list_head *tmp;
260         struct list_head *next;
261         int               expected_phase;
262         int               n = 0;
263         ENTRY;
264
265         /* Requests on the set should either all be completed, or all be new */
266         expected_phase = (set->set_remaining == 0) ?
267                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
268         list_for_each (tmp, &set->set_requests) {
269                 struct ptlrpc_request *req =
270                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
271
272                 LASSERT(req->rq_phase == expected_phase);
273                 n++;
274         }
275
276         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
277
278         list_for_each_safe(tmp, next, &set->set_requests) {
279                 struct ptlrpc_request *req =
280                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
281                 list_del_init(&req->rq_set_chain);
282
283                 LASSERT(req->rq_phase == expected_phase);
284
285                 if (req->rq_phase == RQ_PHASE_NEW) {
286
287                         if (req->rq_interpret_reply != NULL) {
288                                 int (*interpreter)(struct ptlrpc_request *,
289                                                    void *, int) =
290                                         req->rq_interpret_reply;
291
292                                 /* higher level (i.e. LOV) failed;
293                                  * let the sub reqs clean up */
294                                 req->rq_status = -EBADR;
295                                 interpreter(req, &req->rq_async_args,
296                                             req->rq_status);
297                         }
298                         set->set_remaining--;
299                 }
300
301                 req->rq_set = NULL;
302                 ptlrpc_req_finished (req);
303         }
304
305         LASSERT(set->set_remaining == 0);
306
307         OBD_FREE(set, sizeof(*set));
308         EXIT;
309 }
310
311 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
312                         struct ptlrpc_request *req)
313 {
314         /* The set takes over the caller's request reference */
315         list_add_tail(&req->rq_set_chain, &set->set_requests);
316         req->rq_set = set;
317         set->set_remaining++;
318         atomic_inc(&req->rq_import->imp_inflight);
319 }
320
321 /* Locked so that many callers can add things; the context that owns the set
322  * is supposed to notice these and move them into the set proper. */
323 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
324                             struct ptlrpc_request *req)
325 {
326         unsigned long flags;
327         spin_lock_irqsave(&set->set_new_req_lock, flags);
328         /* The set takes over the caller's request reference */
329         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
330         req->rq_set = set;
331         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
332 }
333
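/*
 * A hedged sketch of the asynchronous request-set flow built from the
 * helpers above together with ptlrpc_set_wait() and ptlrpc_set_destroy()
 * below.  "reqs" and "nreqs" are assumed to be prepared by the caller;
 * note that ptlrpc_set_add_req() takes over the caller's request reference.
 *
 *      struct ptlrpc_request_set *set;
 *      int i, rc;
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *
 *      for (i = 0; i < nreqs; i++)
 *              ptlrpc_set_add_req(set, reqs[i]);
 *
 *      rc = ptlrpc_set_wait(set);      sends everything and waits
 *      ptlrpc_set_destroy(set);        drops the set's request references
 *      return rc;
 */
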
334 /*
335  * Based on the current state of the import, determine if the request
336  * can be sent, is an error, or should be delayed.
337  *
338  * Returns true if this request should be delayed. If false and
339  * *status is non-zero, then the request cannot be sent and *status is the
340  * error code.  If false and *status is 0, then the request can be sent.
341  *
342  * The imp->imp_lock must be held.
343  */
344 static int ptlrpc_import_delay_req(struct obd_import *imp, 
345                                    struct ptlrpc_request *req, int *status)
346 {
347         int delay = 0;
348         ENTRY;
349
350         LASSERT (status != NULL);
351         *status = 0;
352
353         if (imp->imp_state == LUSTRE_IMP_NEW) {
354                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
355                 *status = -EIO;
356                 LBUG();
357         }
358         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
359                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
360                 *status = -EIO;
361         }
362         /* allow CONNECT even if import is invalid */
363         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
364                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
365                 ;
366         }
367         /*
368          * If the import has been invalidated (such as by an OST failure), the
369          * request must fail with -EIO.  
370          */
371         else if (imp->imp_invalid) {
372                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
373                 *status = -EIO;
374         } 
375         else if (req->rq_import_generation != imp->imp_generation) {
376                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
377                 *status = -EIO;
378         } 
379         else if (req->rq_send_state != imp->imp_state) {
380                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
381                     || req->rq_no_delay) 
382                         *status = -EWOULDBLOCK;
383                 else
384                         delay = 1;
385         }
386
387         RETURN(delay);
388 }
389
390 static int ptlrpc_check_reply(struct ptlrpc_request *req)
391 {
392         unsigned long flags;
393         int rc = 0;
394         ENTRY;
395
396         /* serialise with network callback */
397         spin_lock_irqsave (&req->rq_lock, flags);
398
399         if (req->rq_replied) {
400                 DEBUG_REQ(D_NET, req, "REPLIED:");
401                 GOTO(out, rc = 1);
402         }
403         
404         if (req->rq_net_err && !req->rq_timedout) {
405                 spin_unlock_irqrestore (&req->rq_lock, flags);
406                 rc = ptlrpc_expire_one_request(req); 
407                 spin_lock_irqsave (&req->rq_lock, flags);
408                 GOTO(out, rc);
409         }
410
411         if (req->rq_err) {
412                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
413                 GOTO(out, rc = 1);
414         }
415
416         if (req->rq_resend) {
417                 DEBUG_REQ(D_ERROR, req, "RESEND:");
418                 GOTO(out, rc = 1);
419         }
420
421         if (req->rq_restart) {
422                 DEBUG_REQ(D_ERROR, req, "RESTART:");
423                 GOTO(out, rc = 1);
424         }
425         EXIT;
426  out:
427         spin_unlock_irqrestore (&req->rq_lock, flags);
428         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
429         return rc;
430 }
431
432 static int ptlrpc_check_status(struct ptlrpc_request *req)
433 {
434         int err;
435         ENTRY;
436
437         err = req->rq_repmsg->status;
438         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
439                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
440                           err);
441                 RETURN(err < 0 ? err : -EINVAL);
442         }
443
444         if (err < 0) {
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         } else if (err > 0) {
447                 /* XXX: translate this error from net to host */
448                 DEBUG_REQ(D_INFO, req, "status is %d", err);
449         }
450
451         RETURN(err);
452 }
453
454 static int after_reply(struct ptlrpc_request *req)
455 {
456         unsigned long flags;
457         struct obd_import *imp = req->rq_import;
458         int rc;
459         ENTRY;
460
461         LASSERT(!req->rq_receiving_reply);
462
463         /* NB Until this point, the whole of the incoming message,
464          * including buflens, status, etc. is in the sender's byte order. */
465
466 #if SWAB_PARANOIA
467         /* Clear reply swab mask; this is a new reply in sender's byte order */
468         req->rq_rep_swab_mask = 0;
469 #endif
470         LASSERT (req->rq_nob_received <= req->rq_replen);
471         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
472         if (rc) {
473                 CERROR("unpack_rep failed: %d\n", rc);
474                 RETURN(-EPROTO);
475         }
476
477         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
478             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
479                 CERROR("invalid packet type received (type=%u)\n",
480                        req->rq_repmsg->type);
481                 RETURN(-EPROTO);
482         }
483
484         /* Store transno in reqmsg for replay. */
485         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
486
487         rc = ptlrpc_check_status(req);
488
489         /* Either we've been evicted, or the server has failed for
490          * some reason. Try to reconnect, and if that fails, punt to the
491          * upcall. */
492         if (rc == -ENOTCONN) {
493                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
494                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
495                         RETURN(-ENOTCONN);
496                 }
497
498                 ptlrpc_request_handle_notconn(req);
499
500                 RETURN(rc);
501         }
502
503         if (req->rq_import->imp_replayable) {
504                 spin_lock_irqsave(&imp->imp_lock, flags);
505                 if (req->rq_replay || req->rq_transno != 0)
506                         ptlrpc_retain_replayable_request(req, imp);
507                 else if (req->rq_commit_cb != NULL) {
508                         spin_unlock_irqrestore(&imp->imp_lock, flags);
509                         req->rq_commit_cb(req);
510                         spin_lock_irqsave(&imp->imp_lock, flags);
511                 }
512
513                 if (req->rq_transno > imp->imp_max_transno)
514                         imp->imp_max_transno = req->rq_transno;
515
516                 /* Replay-enabled imports return commit-status information. */
517                 if (req->rq_repmsg->last_committed)
518                         imp->imp_peer_committed_transno =
519                                 req->rq_repmsg->last_committed;
520                 ptlrpc_free_committed(imp);
521                 spin_unlock_irqrestore(&imp->imp_lock, flags);
522         }
523
524         RETURN(rc);
525 }
526
527 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
528 {
529         char                   str[PTL_NALFMT_SIZE];
530         struct obd_import     *imp;
531         unsigned long          flags;
532         int rc;
533         ENTRY;
534
535         LASSERT(req->rq_phase == RQ_PHASE_NEW);
536         req->rq_phase = RQ_PHASE_RPC;
537
538         imp = req->rq_import;
539         spin_lock_irqsave(&imp->imp_lock, flags);
540
541         req->rq_import_generation = imp->imp_generation;
542
543         if (ptlrpc_import_delay_req(imp, req, &rc)) {
544                 spin_lock (&req->rq_lock);
545                 req->rq_waiting = 1;
546                 spin_unlock (&req->rq_lock);
547
548                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
549                           "(%s != %s)",
550                           req->rq_reqmsg->status, 
551                           ptlrpc_import_state_name(req->rq_send_state),
552                           ptlrpc_import_state_name(imp->imp_state));
553                 LASSERT(list_empty (&req->rq_list));
554
555                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
556                 spin_unlock_irqrestore(&imp->imp_lock, flags);
557                 RETURN(0);
558         }
559
560         if (rc != 0) {
561                 spin_unlock_irqrestore(&imp->imp_lock, flags);
562                 req->rq_status = rc;
563                 req->rq_phase = RQ_PHASE_INTERPRET;
564                 RETURN(rc);
565         }
566
567         /* XXX this is the same as ptlrpc_queue_wait */
568         LASSERT(list_empty(&req->rq_list));
569         list_add_tail(&req->rq_list, &imp->imp_sending_list);
570         spin_unlock_irqrestore(&imp->imp_lock, flags);
571
572         req->rq_reqmsg->status = current->pid;
573         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
574                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
575                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
576                req->rq_xid,
577                imp->imp_connection->c_peer.peer_ni->pni_name,
578                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
579                req->rq_reqmsg->opc);
580
581         rc = ptl_send_rpc(req);
582         if (rc) {
583                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
584                 req->rq_net_err = 1;
585                 RETURN(rc);
586         }
587         RETURN(0);
588 }
589
590 int ptlrpc_check_set(struct ptlrpc_request_set *set)
591 {
592         char str[PTL_NALFMT_SIZE];
593         unsigned long flags;
594         struct list_head *tmp;
595         int force_timer_recalc = 0;
596         ENTRY;
597
598         if (set->set_remaining == 0)
599                 RETURN(1);
600
601         list_for_each(tmp, &set->set_requests) {
602                 struct ptlrpc_request *req =
603                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
604                 struct obd_import *imp = req->rq_import;
605                 int rc = 0;
606
607                 if (req->rq_phase == RQ_PHASE_NEW &&
608                     ptlrpc_send_new_req(req)) {
609                         force_timer_recalc = 1;
610                 }
611
612                 if (!(req->rq_phase == RQ_PHASE_RPC ||
613                       req->rq_phase == RQ_PHASE_BULK ||
614                       req->rq_phase == RQ_PHASE_INTERPRET ||
615                       req->rq_phase == RQ_PHASE_COMPLETE)) {
616                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
617                         LBUG();
618                 }
619
620                 if (req->rq_phase == RQ_PHASE_COMPLETE)
621                         continue;
622
623                 if (req->rq_phase == RQ_PHASE_INTERPRET)
624                         GOTO(interpret, req->rq_status);
625
626                 if (req->rq_net_err && !req->rq_timedout)
627                         ptlrpc_expire_one_request(req); 
628
629                 if (req->rq_err) {
630                         ptlrpc_unregister_reply(req);
631                         if (req->rq_status == 0)
632                                 req->rq_status = -EIO;
633                         req->rq_phase = RQ_PHASE_INTERPRET;
634
635                         spin_lock_irqsave(&imp->imp_lock, flags);
636                         list_del_init(&req->rq_list);
637                         spin_unlock_irqrestore(&imp->imp_lock, flags);
638
639                         GOTO(interpret, req->rq_status);
640                 }
641
642                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
643                  * will only be set after rq_timedout, but the oig waiting
644                  * path sets rq_intr irrespective of whether ptlrpcd has
645                  * seen a timeout.  Our policy is to only interpret
646                  * interrupted rpcs after they have timed out */
647                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
648                         /* NB could be on delayed list */
649                         ptlrpc_unregister_reply(req);
650                         req->rq_status = -EINTR;
651                         req->rq_phase = RQ_PHASE_INTERPRET;
652
653                         spin_lock_irqsave(&imp->imp_lock, flags);
654                         list_del_init(&req->rq_list);
655                         spin_unlock_irqrestore(&imp->imp_lock, flags);
656
657                         GOTO(interpret, req->rq_status);
658                 }
659
660                 if (req->rq_phase == RQ_PHASE_RPC) {
661                         if (req->rq_waiting || req->rq_resend) {
662                                 int status;
663
664                                 ptlrpc_unregister_reply(req);
665
666                                 spin_lock_irqsave(&imp->imp_lock, flags);
667
668                                 if (ptlrpc_import_delay_req(imp, req, &status)){
669                                         spin_unlock_irqrestore(&imp->imp_lock,
670                                                                flags);
671                                         continue;
672                                 } 
673
674                                 list_del_init(&req->rq_list);
675                                 if (status != 0)  {
676                                         req->rq_status = status;
677                                         req->rq_phase = RQ_PHASE_INTERPRET;
678                                         spin_unlock_irqrestore(&imp->imp_lock,
679                                                                flags);
680                                         GOTO(interpret, req->rq_status);
681                                 }
682                                 if (req->rq_no_resend) {
683                                         req->rq_status = -ENOTCONN;
684                                         req->rq_phase = RQ_PHASE_INTERPRET;
685                                         spin_unlock_irqrestore(&imp->imp_lock,
686                                                                flags);
687                                         GOTO(interpret, req->rq_status);
688                                 }
689                                 list_add_tail(&req->rq_list,
690                                               &imp->imp_sending_list);
691
692                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
693
694                                 req->rq_waiting = 0;
695                                 if (req->rq_resend) {
696                                         lustre_msg_add_flags(req->rq_reqmsg,
697                                                              MSG_RESENT);
698                                         if (req->rq_bulk) {
699                                                 __u64 old_xid = req->rq_xid;
700
701                                                 ptlrpc_unregister_bulk (req);
702
703                                                 /* ensure previous bulk fails */
704                                                 req->rq_xid = ptlrpc_next_xid();
705                                                 CDEBUG(D_HA, "resend bulk "
706                                                        "old x"LPU64
707                                                        " new x"LPU64"\n",
708                                                        old_xid, req->rq_xid);
709                                         }
710                                 }
711
712                                 rc = ptl_send_rpc(req);
713                                 if (rc) {
714                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
715                                                   rc);
716                                         force_timer_recalc = 1;
717                                         req->rq_net_err = 1;
718                                 }
719                                 /* need to reset the timeout */
720                                 force_timer_recalc = 1;
721                         }
722
723                         /* Still waiting for a reply? */
724                         if (ptlrpc_client_receiving_reply(req))
725                                 continue;
726
727                         /* Did we actually receive a reply? */
728                         if (!ptlrpc_client_replied(req))
729                                 continue;
730
731                         spin_lock_irqsave(&imp->imp_lock, flags);
732                         list_del_init(&req->rq_list);
733                         spin_unlock_irqrestore(&imp->imp_lock, flags);
734
735                         req->rq_status = after_reply(req);
736                         if (req->rq_resend) {
737                                 /* Add this req to the delayed list so
738                                    it can be errored if the import is
739                                    evicted after recovery. */
740                                 spin_lock_irqsave (&req->rq_lock, flags);
741                                 list_add_tail(&req->rq_list, 
742                                               &imp->imp_delayed_list);
743                                 spin_unlock_irqrestore(&req->rq_lock, flags);
744                                 continue;
745                         }
746
747                         /* If there is no bulk associated with this request,
748                          * then we're done and should let the interpreter
749                          * process the reply.  Similarly if the RPC returned
750                          * an error, and therefore the bulk will never arrive.
751                          */
752                         if (req->rq_bulk == NULL || req->rq_status != 0) {
753                                 req->rq_phase = RQ_PHASE_INTERPRET;
754                                 GOTO(interpret, req->rq_status);
755                         }
756
757                         req->rq_phase = RQ_PHASE_BULK;
758                 }
759
760                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
761                 if (ptlrpc_bulk_active(req->rq_bulk))
762                         continue;
763
764                 if (!req->rq_bulk->bd_success) {
765                         /* The RPC reply arrived OK, but the bulk screwed
766                          * up!  Dead weird, since the server told us the RPC
767                          * was good after getting the REPLY for its GET or
768                          * the ACK for its PUT. */
769                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
770                         LBUG();
771                 }
772
773                 req->rq_phase = RQ_PHASE_INTERPRET;
774
775         interpret:
776                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
777                 LASSERT(!req->rq_receiving_reply);
778
779                 ptlrpc_unregister_reply(req);
780                 if (req->rq_bulk != NULL)
781                         ptlrpc_unregister_bulk (req);
782
783                 req->rq_phase = RQ_PHASE_COMPLETE;
784
785                 if (req->rq_interpret_reply != NULL) {
786                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
787                                 req->rq_interpret_reply;
788                         req->rq_status = interpreter(req, &req->rq_async_args,
789                                                      req->rq_status);
790                 }
791
792                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
793                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
794                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
795                        req->rq_xid,
796                        imp->imp_connection->c_peer.peer_ni->pni_name,
797                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
798                        req->rq_reqmsg->opc);
799
800                 set->set_remaining--;
801
802                 atomic_dec(&imp->imp_inflight);
803                 wake_up(&imp->imp_recovery_waitq);
804         }
805
806         /* If we hit an error, we want to recover promptly. */
807         RETURN(set->set_remaining == 0 || force_timer_recalc);
808 }
809
810 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
811 {
812         unsigned long      flags;
813         struct obd_import *imp = req->rq_import;
814         ENTRY;
815
816         DEBUG_REQ(D_ERROR, req, "timeout");
817
818         spin_lock_irqsave (&req->rq_lock, flags);
819         req->rq_timedout = 1;
820         spin_unlock_irqrestore (&req->rq_lock, flags);
821
822         ptlrpc_unregister_reply (req);
823
824         if (req->rq_bulk != NULL)
825                 ptlrpc_unregister_bulk (req);
826
827         if (imp == NULL) {
828                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
829                 RETURN(1);
830         }
831
832         /* The DLM server doesn't want recovery run on its imports. */
833         if (imp->imp_dlm_fake)
834                 RETURN(1);
835
836         /* If this request is for recovery or other primordial tasks,
837          * then error it out here. */
838         if (req->rq_send_state != LUSTRE_IMP_FULL || 
839             imp->imp_obd->obd_no_recov) {
840                 spin_lock_irqsave (&req->rq_lock, flags);
841                 req->rq_status = -ETIMEDOUT;
842                 req->rq_err = 1;
843                 spin_unlock_irqrestore (&req->rq_lock, flags);
844                 RETURN(1);
845         }
846
847         ptlrpc_fail_import(imp, req->rq_import_generation);
848
849         RETURN(0);
850 }
851
852 int ptlrpc_expired_set(void *data)
853 {
854         struct ptlrpc_request_set *set = data;
855         struct list_head          *tmp;
856         time_t                     now = LTIME_S (CURRENT_TIME);
857         ENTRY;
858
859         LASSERT(set != NULL);
860
861         /* A timeout expired; see which reqs it applies to... */
862         list_for_each (tmp, &set->set_requests) {
863                 struct ptlrpc_request *req =
864                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
865
866                 /* request in-flight? */
867                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
868                        && !req->rq_resend) ||
869                       (req->rq_phase == RQ_PHASE_BULK)))
870                         continue;
871
872                 if (req->rq_timedout ||           /* already dealt with */
873                     req->rq_sent + req->rq_timeout > now) /* not expired */
874                         continue;
875
876                 /* deal with this guy */
877                 ptlrpc_expire_one_request (req);
878         }
879
880         /* When waiting for a whole set, we always want to break out of the
881          * sleep so we can recalculate the timeout, or enable interrupts
882          * iff everyone's timed out.
883          */
884         RETURN(1);
885 }
886
887 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
888 {
889         unsigned long flags;
890         spin_lock_irqsave(&req->rq_lock, flags);
891         req->rq_intr = 1;
892         spin_unlock_irqrestore(&req->rq_lock, flags);
893 }
894
895 void ptlrpc_interrupted_set(void *data)
896 {
897         struct ptlrpc_request_set *set = data;
898         struct list_head *tmp;
899
900         LASSERT(set != NULL);
901         CERROR("INTERRUPTED SET %p\n", set);
902
903         list_for_each(tmp, &set->set_requests) {
904                 struct ptlrpc_request *req =
905                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
906
907                 if (req->rq_phase != RQ_PHASE_RPC)
908                         continue;
909
910                 ptlrpc_mark_interrupted(req);
911         }
912 }
913
914 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
915 {
916         struct list_head      *tmp;
917         time_t                 now = LTIME_S(CURRENT_TIME);
918         time_t                 deadline;
919         int                    timeout = 0;
920         struct ptlrpc_request *req;
921         ENTRY;
922
923         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
924
925         list_for_each(tmp, &set->set_requests) {
926                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
927
928                 /* request in-flight? */
929                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
930                       (req->rq_phase == RQ_PHASE_BULK)))
931                         continue;
932
933                 if (req->rq_timedout)   /* already timed out */
934                         continue;
935
936                 deadline = req->rq_sent + req->rq_timeout;
937                 if (deadline <= now)    /* actually expired already */
938                         timeout = 1;    /* ASAP */
939                 else if (timeout == 0 || timeout > deadline - now)
940                         timeout = deadline - now;
941         }
942         RETURN(timeout);
943 }
944                 
945
946 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
947 {
948         struct list_head      *tmp;
949         struct ptlrpc_request *req;
950         struct l_wait_info     lwi;
951         int                    rc, timeout;
952         ENTRY;
953
954         LASSERT(!list_empty(&set->set_requests));
955         list_for_each(tmp, &set->set_requests) {
956                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
957                 if (req->rq_phase == RQ_PHASE_NEW)
958                         (void)ptlrpc_send_new_req(req);
959         }
960
961         do {
962                 timeout = ptlrpc_set_next_timeout(set);
963
964                 /* wait until all complete, interrupted, or an in-flight
965                  * req times out */
966                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
967                        set, timeout);
968                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
969                                        ptlrpc_expired_set, 
970                                        ptlrpc_interrupted_set, set);
971                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
972
973                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
974
975                 /* -EINTR => all requests have been flagged rq_intr so next
976                  * check completes.
977                  * -ETIMEDOUT => someone timed out.  When all reqs have
978                  * timed out, signals are enabled allowing completion with
979                  * EINTR.
980                  * I don't really care if we go once more round the loop in
981                  * the error cases -eeb. */
982         } while (rc != 0);
983
984         LASSERT(set->set_remaining == 0);
985
986         rc = 0;
987         list_for_each(tmp, &set->set_requests) {
988                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
989
990                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
991                 if (req->rq_status != 0)
992                         rc = req->rq_status;
993         }
994
995         if (set->set_interpret != NULL) {
996                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
997                         set->set_interpret;
998                 rc = interpreter (set, &set->set_args, rc);
999         }
1000
1001         RETURN(rc);
1002 }
1003
1004 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1005 {
1006         ENTRY;
1007         if (request == NULL) {
1008                 EXIT;
1009                 return;
1010         }
1011
1012         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1013         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1014         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1015         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1016
1017         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1018          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1019         if (request->rq_import != NULL) {
1020                 unsigned long flags = 0;
1021                 if (!locked)
1022                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1023                 list_del_init(&request->rq_replay_list);
1024                 if (!locked)
1025                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1026                                                flags);
1027         }
1028         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1029
1030         if (atomic_read(&request->rq_refcount) != 0) {
1031                 DEBUG_REQ(D_ERROR, request,
1032                           "freeing request with nonzero refcount");
1033                 LBUG();
1034         }
1035
1036         if (request->rq_repmsg != NULL) {
1037                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1038                 request->rq_repmsg = NULL;
1039         }
1040         if (request->rq_reqmsg != NULL) {
1041                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1042                 request->rq_reqmsg = NULL;
1043         }
1044         if (request->rq_export != NULL) {
1045                 class_export_put(request->rq_export);
1046                 request->rq_export = NULL;
1047         }
1048         if (request->rq_import != NULL) {
1049                 class_import_put(request->rq_import);
1050                 request->rq_import = NULL;
1051         }
1052         if (request->rq_bulk != NULL)
1053                 ptlrpc_free_bulk(request->rq_bulk);
1054
1055         OBD_FREE(request, sizeof(*request));
1056         EXIT;
1057 }
1058
1059 void ptlrpc_free_req(struct ptlrpc_request *request)
1060 {
1061         __ptlrpc_free_req(request, 0);
1062 }
1063
1064 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1065 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1066 {
1067         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1068         (void)__ptlrpc_req_finished(request, 1);
1069 }
1070
1071 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1072 {
1073         ENTRY;
1074         if (request == NULL)
1075                 RETURN(1);
1076
1077         if (request == LP_POISON ||
1078             request->rq_reqmsg == LP_POISON) {
1079                 CERROR("dereferencing freed request (bug 575)\n");
1080                 LBUG();
1081                 RETURN(1);
1082         }
1083
1084         DEBUG_REQ(D_INFO, request, "refcount now %u",
1085                   atomic_read(&request->rq_refcount) - 1);
1086
1087         if (atomic_dec_and_test(&request->rq_refcount)) {
1088                 __ptlrpc_free_req(request, locked);
1089                 RETURN(1);
1090         }
1091
1092         RETURN(0);
1093 }
1094
1095 void ptlrpc_req_finished(struct ptlrpc_request *request)
1096 {
1097         __ptlrpc_req_finished(request, 0);
1098 }
1099
1100 /* Disengage the client's reply buffer from the network.
1101  * NB this does _NOT_ unregister any client-side bulk.
1102  * IDEMPOTENT, but _not_ safe against concurrent callers.
1103  * The request owner (i.e. the thread doing the I/O) must call...
1104  */
1105 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1106 {
1107         int                rc;
1108         wait_queue_head_t *wq;
1109         struct l_wait_info lwi;
1110
1111         LASSERT(!in_interrupt ());             /* might sleep */
1112
1113         if (!ptlrpc_client_receiving_reply(request))
1114                 return;
1115
1116         PtlMDUnlink (request->rq_reply_md_h);
1117
1118         /* We have to l_wait_event() whatever the result, to give liblustre
1119          * a chance to run reply_in_callback() */
1120
1121         if (request->rq_set != NULL)
1122                 wq = &request->rq_set->set_waitq;
1123         else
1124                 wq = &request->rq_reply_waitq;
1125
1126         for (;;) {
1127                 /* Network access will complete in finite time but the HUGE
1128                  * timeout lets us CWARN for visibility of sluggish NALs */
1129                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1130                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1131                 if (rc == 0)
1132                         return;
1133
1134                 LASSERT (rc == -ETIMEDOUT);
1135                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1136         }
1137 }
1138
1139 /* caller must hold imp->imp_lock */
1140 void ptlrpc_free_committed(struct obd_import *imp)
1141 {
1142         struct list_head *tmp, *saved;
1143         struct ptlrpc_request *req;
1144         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1145         ENTRY;
1146
1147         LASSERT(imp != NULL);
1148
1149         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1150
1151         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1152                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1153
1154         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1155                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1156
1157                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1158                 LASSERT(req != last_req);
1159                 last_req = req;
1160
1161                 if (req->rq_import_generation < imp->imp_generation) {
1162                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1163                         GOTO(free_req, 0);
1164                 }
1165
1166                 if (req->rq_replay) {
1167                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1168                         continue;
1169                 }
1170
1171                 /* not yet committed */
1172                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1173                         DEBUG_REQ(D_HA, req, "stopping search");
1174                         break;
1175                 }
1176
1177                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1178                           imp->imp_peer_committed_transno);
1179 free_req:
1180                 if (req->rq_commit_cb != NULL)
1181                         req->rq_commit_cb(req);
1182                 list_del_init(&req->rq_replay_list);
1183                 __ptlrpc_req_finished(req, 1);
1184         }
1185
1186         EXIT;
1187         return;
1188 }
1189
1190 void ptlrpc_cleanup_client(struct obd_import *imp)
1191 {
1192         ENTRY;
1193         EXIT;
1194         return;
1195 }
1196
1197 void ptlrpc_resend_req(struct ptlrpc_request *req)
1198 {
1199         unsigned long flags;
1200
1201         DEBUG_REQ(D_HA, req, "going to resend");
1202         req->rq_reqmsg->handle.cookie = 0;
1203         req->rq_status = -EAGAIN;
1204
1205         spin_lock_irqsave (&req->rq_lock, flags);
1206         req->rq_resend = 1;
1207         req->rq_net_err = 0;
1208         req->rq_timedout = 0;
1209         if (req->rq_bulk) {
1210                 __u64 old_xid = req->rq_xid;
1211                 
1212                 /* ensure previous bulk fails */
1213                 req->rq_xid = ptlrpc_next_xid();
1214                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1215                        old_xid, req->rq_xid);
1216         }
1217         ptlrpc_wake_client_req(req);
1218         spin_unlock_irqrestore (&req->rq_lock, flags);
1219 }
1220
1221 /* XXX: this function and rq_status are currently unused */
1222 void ptlrpc_restart_req(struct ptlrpc_request *req)
1223 {
1224         unsigned long flags;
1225
1226         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1227         req->rq_status = -ERESTARTSYS;
1228
1229         spin_lock_irqsave (&req->rq_lock, flags);
1230         req->rq_restart = 1;
1231         req->rq_timedout = 0;
1232         ptlrpc_wake_client_req(req);
1233         spin_unlock_irqrestore (&req->rq_lock, flags);
1234 }
1235
1236 static int expired_request(void *data)
1237 {
1238         struct ptlrpc_request *req = data;
1239         ENTRY;
1240
1241         RETURN(ptlrpc_expire_one_request(req));
1242 }
1243
1244 static void interrupted_request(void *data)
1245 {
1246         unsigned long flags;
1247
1248         struct ptlrpc_request *req = data;
1249         DEBUG_REQ(D_HA, req, "request interrupted");
1250         spin_lock_irqsave (&req->rq_lock, flags);
1251         req->rq_intr = 1;
1252         spin_unlock_irqrestore (&req->rq_lock, flags);
1253 }
1254
1255 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1256 {
1257         ENTRY;
1258         atomic_inc(&req->rq_refcount);
1259         RETURN(req);
1260 }
1261
1262 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1263                                       struct obd_import *imp)
1264 {
1265         struct list_head *tmp;
1266
1267         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1268
1269         /* Clear this for new requests that were resent, as well
1270          * as for resent replayed requests. */
1271         lustre_msg_clear_flags(req->rq_reqmsg,
1272                              MSG_RESENT);
1273
1274         /* don't re-add requests that have been replayed */
1275         if (!list_empty(&req->rq_replay_list))
1276                 return;
1277
1278         lustre_msg_add_flags(req->rq_reqmsg,
1279                              MSG_REPLAY);
1280
1281         LASSERT(imp->imp_replayable);
1282         /* Balanced in ptlrpc_free_committed, usually. */
1283         ptlrpc_request_addref(req);
1284         list_for_each_prev(tmp, &imp->imp_replay_list) {
1285                 struct ptlrpc_request *iter =
1286                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1287
1288                 /* We may have duplicate transnos if we create and then
1289                  * open a file, or for closes retained to match creating
1290                  * opens, so use req->rq_xid as a secondary key.
1291                  * (See bugs 684, 685, and 428.)
1292                  * XXX no longer needed, but all opens need transnos!
1293                  */
1294                 if (iter->rq_transno > req->rq_transno)
1295                         continue;
1296
1297                 if (iter->rq_transno == req->rq_transno) {
1298                         LASSERT(iter->rq_xid != req->rq_xid);
1299                         if (iter->rq_xid > req->rq_xid)
1300                                 continue;
1301                 }
1302
1303                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1304                 return;
1305         }
1306
1307         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1308 }
1309
1310 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1311 {
1312         char str[PTL_NALFMT_SIZE];
1313         int rc = 0;
1314         int brc;
1315         struct l_wait_info lwi;
1316         struct obd_import *imp = req->rq_import;
1317         unsigned long flags;
1318         int timeout = 0;
1319         ENTRY;
1320
1321         LASSERT(req->rq_set == NULL);
1322         LASSERT(!req->rq_receiving_reply);
1323         atomic_inc(&imp->imp_inflight);
1324
1325         /* for distributed debugging */
1326         req->rq_reqmsg->status = current->pid;
1327         LASSERT(imp->imp_obd != NULL);
1328         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1329                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1330                imp->imp_obd->obd_uuid.uuid,
1331                req->rq_reqmsg->status, req->rq_xid,
1332                imp->imp_connection->c_peer.peer_ni->pni_name,
1333                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1334                req->rq_reqmsg->opc);
1335
1336         /* Mark phase here for a little debug help */
1337         req->rq_phase = RQ_PHASE_RPC;
1338
1339         spin_lock_irqsave(&imp->imp_lock, flags);
1340         req->rq_import_generation = imp->imp_generation;
1341 restart:
1342         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1343                 list_del(&req->rq_list);
1344
1345                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1346                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1347
1348                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1349                           current->comm, 
1350                           ptlrpc_import_state_name(req->rq_send_state), 
1351                           ptlrpc_import_state_name(imp->imp_state));
1352                 lwi = LWI_INTR(interrupted_request, req);
1353                 rc = l_wait_event(req->rq_reply_waitq,
1354                                   (req->rq_send_state == imp->imp_state ||
1355                                    req->rq_err),
1356                                   &lwi);
1357                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1358                           current->comm, 
1359                           ptlrpc_import_state_name(imp->imp_state), 
1360                           ptlrpc_import_state_name(req->rq_send_state),
1361                           req->rq_err);
1362
1363                 spin_lock_irqsave(&imp->imp_lock, flags);
1364                 list_del_init(&req->rq_list);
1365
1366                 if (req->rq_err) {
1367                         rc = -EIO;
1368                 } 
1369                 else if (req->rq_intr) {
1370                         rc = -EINTR;
1371                 }
1372                 else if (req->rq_no_resend) {
1373                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1374                         GOTO(out, rc = -ETIMEDOUT);
1375                 }
1376                 else {
1377                         GOTO(restart, rc);
1378                 }
1379         } 
1380
1381         if (rc != 0) {
1382                 list_del_init(&req->rq_list);
1383                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1384                 req->rq_status = rc; // XXX this ok?
1385                 GOTO(out, rc);
1386         }
1387
1388         if (req->rq_resend) {
1389                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1390
1391                 if (req->rq_bulk != NULL)
1392                         ptlrpc_unregister_bulk (req);
1393
1394                 DEBUG_REQ(D_HA, req, "resending: ");
1395         }
1396
1397         /* XXX this is the same as ptlrpc_set_wait */
1398         LASSERT(list_empty(&req->rq_list));
1399         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1400         spin_unlock_irqrestore(&imp->imp_lock, flags);
1401
1402         rc = ptl_send_rpc(req);
1403         if (rc) {
1404                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1405                 timeout = 1;
1406         } else {
1407                 timeout = MAX(req->rq_timeout * HZ, 1);
1408                 DEBUG_REQ(D_NET, req, "-- sleeping");
1409         }
1410         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1411                                req);
1412         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1413         DEBUG_REQ(D_NET, req, "-- done sleeping");
1414
1415         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1416                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1417                imp->imp_obd->obd_uuid.uuid,
1418                req->rq_reqmsg->status, req->rq_xid,
1419                imp->imp_connection->c_peer.peer_ni->pni_name,
1420                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1421                req->rq_reqmsg->opc);
1422
1423         spin_lock_irqsave(&imp->imp_lock, flags);
1424         list_del_init(&req->rq_list);
1425         spin_unlock_irqrestore(&imp->imp_lock, flags);
1426
1427         /* If the reply was received normally, this just grabs the spinlock
1428          * (ensuring the reply callback has returned), sees that
1429          * req->rq_receiving_reply is clear and returns. */
1430         ptlrpc_unregister_reply (req);
1431
1432         if (req->rq_err)
1433                 GOTO(out, rc = -EIO);
1434
1435         /* Resend if we need to, unless we were interrupted. */
1436         if (req->rq_resend && !req->rq_intr) {
1437                 /* ...unless we were specifically told otherwise. */
1438                 if (req->rq_no_resend)
1439                         GOTO(out, rc = -ETIMEDOUT);
1440                 spin_lock_irqsave(&imp->imp_lock, flags);
1441                 goto restart;
1442         }
1443
1444         if (req->rq_intr) {
1445                 /* Should only be interrupted if we timed out. */
1446                 if (!req->rq_timedout)
1447                         DEBUG_REQ(D_ERROR, req,
1448                                   "rq_intr set but rq_timedout not");
1449                 GOTO(out, rc = -EINTR);
1450         }
1451
1452         if (req->rq_timedout) {                 /* non-recoverable timeout */
1453                 GOTO(out, rc = -ETIMEDOUT);
1454         }
1455
1456         if (!req->rq_replied) {
1457                 /* How can this be? -eeb */
1458                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1459                 LBUG();
1460                 GOTO(out, rc = req->rq_status);
1461         }
1462
1463         rc = after_reply (req);
1464         /* NB may return +ve success rc */
1465         if (req->rq_resend) {
1466                 spin_lock_irqsave(&imp->imp_lock, flags);
1467                 goto restart;
1468         }
1469
1470  out:
1471         if (req->rq_bulk != NULL) {
1472                 if (rc >= 0) {
1473                         /* success so far.  Note that anything going wrong
1474                          * with bulk now is EXTREMELY strange, since the
1475                          * server must have believed that the bulk
1476                          * transferred OK before she replied with success to
1477                          * me. */
1478                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1479                         brc = l_wait_event(req->rq_reply_waitq,
1480                                            !ptlrpc_bulk_active(req->rq_bulk),
1481                                            &lwi);
1482                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1483                         if (brc != 0) {
1484                                 LASSERT(brc == -ETIMEDOUT);
1485                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1486                                 rc = brc;
1487                         } else if (!req->rq_bulk->bd_success) {
1488                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1489                                 rc = -EIO;
1490                         }
1491                 }
1492                 if (rc < 0)
1493                         ptlrpc_unregister_bulk (req);
1494         }
1495
1496         LASSERT(!req->rq_receiving_reply);
1497         req->rq_phase = RQ_PHASE_INTERPRET;
1498
1499         atomic_dec(&imp->imp_inflight);
1500         wake_up(&imp->imp_recovery_waitq);
1501         RETURN(rc);
1502 }
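/*
 * Caller-side sketch of the synchronous send path that ends above
 * (ptlrpc_queue_wait() in this file).  Illustrative only: the import
 * pointer, opcode and body size below are stand-ins, but the helpers
 * (ptlrpc_prep_req, lustre_msg_size, ptlrpc_queue_wait,
 * ptlrpc_req_finished) are the ones callers of this path normally use.
 */
#if 0   /* not built; body of a hypothetical caller */
        struct ptlrpc_request *req;
        int size = sizeof(struct hypothetical_body);    /* stand-in */
        int rc;

        req = ptlrpc_prep_req(imp, HYPOTHETICAL_OPC, 1, &size, NULL);
        if (req == NULL)
                RETURN(-ENOMEM);

        req->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(req);    /* the synchronous path above */

        ptlrpc_req_finished(req);
        RETURN(rc);
#endif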
1503
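/*
 * Replay bookkeeping stashed in req->rq_async_args for the duration of a
 * replay round trip: the send state and reply status the request had
 * before it was handed to ptlrpcd, so ptlrpc_replay_interpret() can
 * restore them afterwards.
 */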
1504 struct ptlrpc_replay_async_args {
1505         int praa_old_state;
1506         int praa_old_status;
1507 };
1508
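/*
 * Reply-interpretation callback for a replayed request (installed as
 * rq_interpret_reply by ptlrpc_replay_req() below).  It re-unpacks and
 * sanity-checks the reply, restores the pre-replay status saved in
 * praa_old_status, records the replayed transno on the import, and then
 * advances the recovery state machine; any failure restarts recovery by
 * reconnecting the import.
 */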
1509 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1510                                     void * data, int rc)
1511 {
1512         struct ptlrpc_replay_async_args *aa = data;
1513         struct obd_import *imp = req->rq_import;
1514         unsigned long flags;
1515
1516         atomic_dec(&imp->imp_replay_inflight);
1517         
1518         if (!req->rq_replied) {
1519                 CERROR("request replay timed out, restarting recovery\n");
1520                 GOTO(out, rc = -ETIMEDOUT);
1521         }
1522
1523 #if SWAB_PARANOIA
1524         /* Clear reply swab mask; this is a new reply in sender's byte order */
1525         req->rq_rep_swab_mask = 0;
1526 #endif
1527         LASSERT (req->rq_nob_received <= req->rq_replen);
1528         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1529         if (rc) {
1530                 CERROR("unpack_rep failed: %d\n", rc);
1531                 GOTO(out, rc = -EPROTO);
1532         }
1533
1534         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1535             req->rq_repmsg->status == -ENOTCONN) 
1536                 GOTO(out, rc = req->rq_repmsg->status);
1537
1538         /* The transno had better not change over replay. */
1539         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1540
1541         DEBUG_REQ(D_HA, req, "got rep");
1542
1543         /* let the callback do fixups, possibly including changes to the request */
1544         if (req->rq_replay_cb)
1545                 req->rq_replay_cb(req);
1546
1547         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1548                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1549                           req->rq_repmsg->status, aa->praa_old_status);
1550         } else {
1551                 /* Put it back for re-replay. */
1552                 req->rq_repmsg->status = aa->praa_old_status;
1553         }
1554
1555         spin_lock_irqsave(&imp->imp_lock, flags);
1556         imp->imp_last_replay_transno = req->rq_transno;
1557         spin_unlock_irqrestore(&imp->imp_lock, flags);
1558
1559         /* continue with recovery */
1560         rc = ptlrpc_import_recovery_state_machine(imp);
1561  out:
1562         req->rq_send_state = aa->praa_old_state;
1563         
1564         if (rc != 0)
1565                 /* this replay failed, so restart recovery */
1566                 ptlrpc_connect_import(imp, NULL);
1567
1568         RETURN(rc);
1569 }
1570
1571
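/*
 * Queue @req for replay via ptlrpcd: the current send state and reply
 * status are saved in rq_async_args, the request is reset to look like a
 * new request in LUSTRE_IMP_REPLAY state, and an extra reference is taken
 * for the ptlrpcd thread.  ptlrpc_replay_interpret() above completes the
 * cycle when the reply (or a timeout) comes back.
 */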
1572 int ptlrpc_replay_req(struct ptlrpc_request *req)
1573 {
1574         struct ptlrpc_replay_async_args *aa;
1575         ENTRY;
1576
1577         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1578
1579         /* Not handling automatic bulk replay yet (or ever?) */
1580         LASSERT(req->rq_bulk == NULL);
1581
1582         DEBUG_REQ(D_HA, req, "REPLAY");
1583
1584         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1585         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1586         memset(aa, 0, sizeof *aa);
1587
1588         /* Prepare request to be resent with ptlrpcd */
1589         aa->praa_old_state = req->rq_send_state;
1590         req->rq_send_state = LUSTRE_IMP_REPLAY;
1591         req->rq_phase = RQ_PHASE_NEW;
1592         aa->praa_old_status = req->rq_repmsg->status;
1593         req->rq_status = 0;
1594
1595         req->rq_interpret_reply = ptlrpc_replay_interpret;
1596         atomic_inc(&req->rq_import->imp_replay_inflight);
1597         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1598
1599         ptlrpcd_add_req(req);
1600         RETURN(0);
1601 }
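/*
 * Illustrative sketch only: a recovery driver (the real one lives in the
 * import recovery code, not here) would pull requests off the import's
 * replay list one at a time and feed them through ptlrpc_replay_req(),
 * letting imp_replay_inflight drain between steps.  The helper name
 * below is hypothetical.
 */
#if 0   /* not built */
        struct ptlrpc_request *req;
        int rc;

        req = pick_next_replay_req(imp);        /* hypothetical helper */
        if (req != NULL) {
                rc = ptlrpc_replay_req(req);
                if (rc != 0)
                        CERROR("replay of xid "LPU64" failed: %d\n",
                               req->rq_xid, rc);
        }
#endif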
1602
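/*
 * Fail every request this import still has on its sending and delayed
 * lists whose import generation predates the current one: each is marked
 * with rq_err and its waiter woken, so the sleeping sender unwinds with
 * -EIO.  Committed requests left on the replay list are freed as a last
 * chance; uncommitted ones are knowingly leaked (see below).
 */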
1603 void ptlrpc_abort_inflight(struct obd_import *imp)
1604 {
1605         unsigned long flags;
1606         struct list_head *tmp, *n;
1607         ENTRY;
1608
1609         /* Make sure that no new requests get processed for this import.
1610          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1611          * this flag and then putting requests on sending_list or delayed_list.
1612          */
1613         spin_lock_irqsave(&imp->imp_lock, flags);
1614
1615         /* XXX locking?  Maybe we should remove each request with the list
1616          * locked?  Also, how do we know if the requests on the list are
1617          * being freed at this time?
1618          */
1619         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1620                 struct ptlrpc_request *req =
1621                         list_entry(tmp, struct ptlrpc_request, rq_list);
1622
1623                 DEBUG_REQ(D_HA, req, "inflight");
1624
1625                 spin_lock (&req->rq_lock);
1626                 if (req->rq_import_generation < imp->imp_generation) {
1627                         req->rq_err = 1;
1628                         ptlrpc_wake_client_req(req);
1629                 }
1630                 spin_unlock (&req->rq_lock);
1631         }
1632
1633         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1634                 struct ptlrpc_request *req =
1635                         list_entry(tmp, struct ptlrpc_request, rq_list);
1636
1637                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1638
1639                 spin_lock (&req->rq_lock);
1640                 if (req->rq_import_generation < imp->imp_generation) {
1641                         req->rq_err = 1;
1642                         ptlrpc_wake_client_req(req);
1643                 }
1644                 spin_unlock (&req->rq_lock);
1645         }
1646
1647         /* Last chance to free reqs left on the replay list, but we
1648          * will still leak reqs that haven't committed.  */
1649         if (imp->imp_replayable)
1650                 ptlrpc_free_committed(imp);
1651
1652         spin_unlock_irqrestore(&imp->imp_lock, flags);
1653
1654         EXIT;
1655 }
1656
1657 static __u64 ptlrpc_last_xid = 0;
1658 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1659
1660 __u64 ptlrpc_next_xid(void)
1661 {
1662         __u64 tmp;
1663         spin_lock(&ptlrpc_last_xid_lock);
1664         tmp = ++ptlrpc_last_xid;
1665         spin_unlock(&ptlrpc_last_xid_lock);
1666         return tmp;
1667 }
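/*
 * Sketch of the expected use: each newly built request is stamped with a
 * fresh XID at allocation time, e.g.
 *
 *      req->rq_xid = ptlrpc_next_xid();
 *
 * so that replies (and bulk transfers keyed off the XID) can be matched
 * back to the request.
 */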
1668
1669