lustre/ptlrpc/client.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
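/* Initialise a ptlrpc client: record the portals used to send requests and
 * receive replies, and a name used for debugging. */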
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_pages = npages;
96         desc->bd_page_count = 0;
97         desc->bd_md_h = PTL_HANDLE_NONE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc; the desc is freed when req itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155 #ifdef __KERNEL__
156         ptl_kiov_t *kiov = &desc->bd_iov[desc->bd_page_count];
157 #else
158         struct iovec *iov = &desc->bd_iov[desc->bd_page_count];
159 #endif
160         LASSERT(desc->bd_page_count < desc->bd_max_pages);
161         LASSERT(page != NULL);
162         LASSERT(pageoffset >= 0);
163         LASSERT(len > 0);
164         LASSERT(pageoffset + len <= PAGE_SIZE);
165
166 #ifdef __KERNEL__
167         kiov->kiov_page   = page;
168         kiov->kiov_offset = pageoffset;
169         kiov->kiov_len    = len;
170 #else
171         iov->iov_base = page->addr + pageoffset;
172         iov->iov_len  = len;
173 #endif
174         desc->bd_page_count++;
175         desc->bd_nob += len;
176 }
177
178 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
179 {
180         ENTRY;
181
182         LASSERT(desc != NULL);
183         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
184         LASSERT(!desc->bd_network_rw);         /* network hands off */
185         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
186         if (desc->bd_export)
187                 class_export_put(desc->bd_export);
188         else
189                 class_import_put(desc->bd_import);
190
191         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
192                                 bd_iov[desc->bd_max_pages]));
193         EXIT;
194 }
195
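/* Allocate a new request on the given import and pack the request message
 * from the supplied buffer lengths and contents. */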
196 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
197                                        int count, int *lengths, char **bufs)
198 {
199         struct ptlrpc_request *request;
200         int rc;
201         ENTRY;
202
203         LASSERT((unsigned long)imp > 0x1000);
204
205         OBD_ALLOC(request, sizeof(*request));
206         if (!request) {
207                 CERROR("request allocation out of memory\n");
208                 RETURN(NULL);
209         }
210
211         rc = lustre_pack_request(request, count, lengths, bufs);
212         if (rc) {
213                 CERROR("cannot pack request %d\n", rc);
214                 OBD_FREE(request, sizeof(*request));
215                 RETURN(NULL);
216         }
217
218         if (imp->imp_server_timeout)
219                 request->rq_timeout = obd_timeout / 2;
220         else
221                 request->rq_timeout = obd_timeout;
222         request->rq_send_state = LUSTRE_IMP_FULL;
223         request->rq_type = PTL_RPC_MSG_REQUEST;
224         request->rq_import = class_import_get(imp);
225
226         request->rq_req_cbid.cbid_fn  = request_out_callback;
227         request->rq_req_cbid.cbid_arg = request;
228
229         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
230         request->rq_reply_cbid.cbid_arg = request;
231         
232         request->rq_phase = RQ_PHASE_NEW;
233
234         /* XXX FIXME bug 249 */
235         request->rq_request_portal = imp->imp_client->cli_request_portal;
236         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
237
238         spin_lock_init(&request->rq_lock);
239         INIT_LIST_HEAD(&request->rq_list);
240         INIT_LIST_HEAD(&request->rq_replay_list);
241         init_waitqueue_head(&request->rq_reply_waitq);
242         request->rq_xid = ptlrpc_next_xid();
243         atomic_set(&request->rq_refcount, 1);
244
245         request->rq_reqmsg->opc = opcode;
246         request->rq_reqmsg->flags = 0;
247
248         RETURN(request);
249 }
250
251 struct ptlrpc_request_set *ptlrpc_prep_set(void)
252 {
253         struct ptlrpc_request_set *set;
254
255         OBD_ALLOC(set, sizeof *set);
256         if (!set)
257                 RETURN(NULL);
258         INIT_LIST_HEAD(&set->set_requests);
259         init_waitqueue_head(&set->set_waitq);
260         set->set_remaining = 0;
261         spin_lock_init(&set->set_new_req_lock);
262         INIT_LIST_HEAD(&set->set_new_requests);
263
264         RETURN(set);
265 }
266
267 /* Finish with this set; opposite of prep_set. */
268 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
269 {
270         struct list_head *tmp;
271         struct list_head *next;
272         int               expected_phase;
273         int               n = 0;
274         ENTRY;
275
276         /* Requests on the set should either all be completed, or all be new */
277         expected_phase = (set->set_remaining == 0) ?
278                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
279         list_for_each (tmp, &set->set_requests) {
280                 struct ptlrpc_request *req =
281                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
282
283                 LASSERT(req->rq_phase == expected_phase);
284                 n++;
285         }
286
287         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
288
289         list_for_each_safe(tmp, next, &set->set_requests) {
290                 struct ptlrpc_request *req =
291                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
292                 list_del_init(&req->rq_set_chain);
293
294                 LASSERT(req->rq_phase == expected_phase);
295
296                 if (req->rq_phase == RQ_PHASE_NEW) {
297
298                         if (req->rq_interpret_reply != NULL) {
299                                 int (*interpreter)(struct ptlrpc_request *,
300                                                    void *, int) =
301                                         req->rq_interpret_reply;
302
303                                 /* higher level (i.e. LOV) failed;
304                                  * let the sub reqs clean up */
305                                 req->rq_status = -EBADR;
306                                 interpreter(req, &req->rq_async_args,
307                                             req->rq_status);
308                         }
309                         set->set_remaining--;
310                 }
311
312                 req->rq_set = NULL;
313                 ptlrpc_req_finished (req);
314         }
315
316         LASSERT(set->set_remaining == 0);
317
318         OBD_FREE(set, sizeof(*set));
319         EXIT;
320 }
321
322 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
323                         struct ptlrpc_request *req)
324 {
325         /* The set takes over the caller's request reference */
326         list_add_tail(&req->rq_set_chain, &set->set_requests);
327         req->rq_set = set;
328         set->set_remaining++;
329 }
330
331 /* Lock so that many callers can add things; the context that owns the set
332  * is supposed to notice these and move them into the set proper. */
333 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
334                             struct ptlrpc_request *req)
335 {
336         unsigned long flags;
337         spin_lock_irqsave(&set->set_new_req_lock, flags);
338         /* The set takes over the caller's request reference */
339         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
340         req->rq_set = set;
341         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
342 }
343
344 /*
345  * Based on the current state of the import, determine if the request
346  * can be sent, is an error, or should be delayed.
347  *
348  * Returns true if this request should be delayed. If false, and
349  * *status is set, then the request cannot be sent and *status is the
350  * error code.  If false and *status is 0, then the request can be sent.
351  *
352  * The imp->imp_lock must be held.
353  */
354 static int ptlrpc_import_delay_req(struct obd_import *imp, 
355                                    struct ptlrpc_request *req, int *status)
356 {
357         int delay = 0;
358         ENTRY;
359
360         LASSERT (status != NULL);
361         *status = 0;
362
363         if (imp->imp_state == LUSTRE_IMP_NEW) {
364                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
365                 *status = -EIO;
366                 LBUG();
367         }
368         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
369                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
370                 *status = -EIO;
371         }
372         /*
373          * If the import has been invalidated (such as by an OST failure), the
374          * request must fail with -EIO.  
375          */
376         else if (imp->imp_invalid) {
377                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
378                 *status = -EIO;
379         } 
380         else if (req->rq_import_generation != imp->imp_generation) {
381                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
382                 *status = -EIO;
383         } 
384         else if (req->rq_send_state != imp->imp_state) {
385                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) 
386                         *status = -EWOULDBLOCK;
387                 else
388                         delay = 1;
389         }
390
391         RETURN(delay);
392 }
393
394 static int ptlrpc_check_reply(struct ptlrpc_request *req)
395 {
396         unsigned long flags;
397         int rc = 0;
398         ENTRY;
399
400         /* serialise with network callback */
401         spin_lock_irqsave (&req->rq_lock, flags);
402
403         if (req->rq_replied) {
404                 DEBUG_REQ(D_NET, req, "REPLIED:");
405                 GOTO(out, rc = 1);
406         }
407
408         if (req->rq_err) {
409                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
410                 GOTO(out, rc = 1);
411         }
412
413         if (req->rq_resend) {
414                 DEBUG_REQ(D_ERROR, req, "RESEND:");
415                 GOTO(out, rc = 1);
416         }
417
418         if (req->rq_restart) {
419                 DEBUG_REQ(D_ERROR, req, "RESTART:");
420                 GOTO(out, rc = 1);
421         }
422         EXIT;
423  out:
424         spin_unlock_irqrestore (&req->rq_lock, flags);
425         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
426         return rc;
427 }
428
429 static int ptlrpc_check_status(struct ptlrpc_request *req)
430 {
431         int err;
432         ENTRY;
433
434         err = req->rq_repmsg->status;
435         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
436                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
437                           err);
438                 RETURN(err < 0 ? err : -EINVAL);
439         }
440
441         if (err < 0) {
442                 DEBUG_REQ(D_INFO, req, "status is %d", err);
443         } else if (err > 0) {
444                 /* XXX: translate this error from net to host */
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         }
447
448         RETURN(err);
449 }
450
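/* Handle a newly-arrived reply: unpack it (it arrives in the sender's byte
 * order), check its type and status, record the transno for replay, and
 * update the import's last-committed bookkeeping. */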
451 static int after_reply(struct ptlrpc_request *req)
452 {
453         unsigned long flags;
454         struct obd_import *imp = req->rq_import;
455         int rc;
456         ENTRY;
457
458         LASSERT(!req->rq_receiving_reply);
459
460         /* NB Until this point, the whole of the incoming message,
461          * including buflens, status etc. is in the sender's byte order. */
462
463 #if SWAB_PARANOIA
464         /* Clear reply swab mask; this is a new reply in sender's byte order */
465         req->rq_rep_swab_mask = 0;
466 #endif
467         LASSERT (req->rq_nob_received <= req->rq_replen);
468         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
469         if (rc) {
470                 CERROR("unpack_rep failed: %d\n", rc);
471                 RETURN(-EPROTO);
472         }
473
474         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
475             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
476                 CERROR("invalid packet type received (type=%u)\n",
477                        req->rq_repmsg->type);
478                 RETURN(-EPROTO);
479         }
480
481         /* Store transno in reqmsg for replay. */
482         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
483
484         rc = ptlrpc_check_status(req);
485
486         /* Either we've been evicted, or the server has failed for
487          * some reason. Try to reconnect, and if that fails, punt to the
488          * upcall. */
489         if (rc == -ENOTCONN) {
490                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
491                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
492                         RETURN(-ENOTCONN);
493                 }
494
495                 ptlrpc_request_handle_notconn(req);
496
497                 RETURN(rc);
498         }
499
500         if (req->rq_import->imp_replayable) {
501                 spin_lock_irqsave(&imp->imp_lock, flags);
502                 if (req->rq_replay || req->rq_transno != 0)
503                         ptlrpc_retain_replayable_request(req, imp);
504                 else if (req->rq_commit_cb != NULL)
505                         req->rq_commit_cb(req);
506
507                 if (req->rq_transno > imp->imp_max_transno)
508                         imp->imp_max_transno = req->rq_transno;
509
510                 /* Replay-enabled imports return commit-status information. */
511                 if (req->rq_repmsg->last_committed)
512                         imp->imp_peer_committed_transno =
513                                 req->rq_repmsg->last_committed;
514                 ptlrpc_free_committed(imp);
515                 spin_unlock_irqrestore(&imp->imp_lock, flags);
516         }
517
518         RETURN(rc);
519 }
520
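/* Take a request in the NEW phase and send it, or park it on the import's
 * delayed list if the import cannot accept it yet. */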
521 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
522 {
523         struct obd_import     *imp;
524         unsigned long          flags;
525         int rc;
526         ENTRY;
527
528         LASSERT(req->rq_phase == RQ_PHASE_NEW);
529         req->rq_phase = RQ_PHASE_RPC;
530
531         imp = req->rq_import;
532         spin_lock_irqsave(&imp->imp_lock, flags);
533
534         if (imp->imp_invalid) {
535                 spin_unlock_irqrestore(&imp->imp_lock, flags);
536                 req->rq_status = -EIO;
537                 req->rq_phase = RQ_PHASE_INTERPRET;
538                 RETURN(-EIO);
539         }
540
541         req->rq_import_generation = imp->imp_generation;
542
543         if (ptlrpc_import_delay_req(imp, req, &rc)) {
544                 spin_lock (&req->rq_lock);
545                 req->rq_waiting = 1;
546                 spin_unlock (&req->rq_lock);
547
548                 LASSERT(list_empty (&req->rq_list));
549
550                 // list_del(&req->rq_list);
551                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
552                 spin_unlock_irqrestore(&imp->imp_lock, flags);
553                 RETURN(0);
554         }
555
556         if (rc != 0) {
557                 spin_unlock_irqrestore(&imp->imp_lock, flags);
558                 req->rq_status = rc;
559                 req->rq_phase = RQ_PHASE_INTERPRET;
560                 RETURN(rc);
561         }
562
563         /* XXX this is the same as ptlrpc_queue_wait */
564         LASSERT(list_empty(&req->rq_list));
565         list_add_tail(&req->rq_list, &imp->imp_sending_list);
566         spin_unlock_irqrestore(&imp->imp_lock, flags);
567
568         req->rq_reqmsg->status = current->pid;
569         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
570                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
571                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
572                req->rq_xid,
573                imp->imp_connection->c_peer.peer_ni->pni_name,
574                imp->imp_connection->c_peer.peer_nid,
575                req->rq_reqmsg->opc);
576
577         rc = ptl_send_rpc(req);
578         if (rc) {
579                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
580                 req->rq_timeout = 1;
581                 RETURN(rc);
582         }
583         RETURN(0);
584 }
585
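/* Drive every request in the set through its phases (RPC, BULK, INTERPRET).
 * Returns non-zero when the set has completed or the wait timeout needs to
 * be recalculated. */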
586 int ptlrpc_check_set(struct ptlrpc_request_set *set)
587 {
588         unsigned long flags;
589         struct list_head *tmp;
590         int force_timer_recalc = 0;
591         ENTRY;
592
593         if (set->set_remaining == 0)
594                 RETURN(1);
595
596         list_for_each(tmp, &set->set_requests) {
597                 struct ptlrpc_request *req =
598                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
599                 struct obd_import *imp = req->rq_import;
600                 int rc = 0;
601
602                 if (req->rq_phase == RQ_PHASE_NEW &&
603                     ptlrpc_send_new_req(req)) {
604                         force_timer_recalc = 1;
605                 }
606
607                 if (!(req->rq_phase == RQ_PHASE_RPC ||
608                       req->rq_phase == RQ_PHASE_BULK ||
609                       req->rq_phase == RQ_PHASE_INTERPRET ||
610                       req->rq_phase == RQ_PHASE_COMPLETE)) {
611                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
612                         LBUG();
613                 }
614
615                 if (req->rq_phase == RQ_PHASE_COMPLETE)
616                         continue;
617
618                 if (req->rq_phase == RQ_PHASE_INTERPRET)
619                         GOTO(interpret, req->rq_status);
620
621                 if (req->rq_err) {
622                         ptlrpc_unregister_reply(req);
623                         if (req->rq_status == 0)
624                                 req->rq_status = -EIO;
625                         req->rq_phase = RQ_PHASE_INTERPRET;
626
627                         spin_lock_irqsave(&imp->imp_lock, flags);
628                         list_del_init(&req->rq_list);
629                         spin_unlock_irqrestore(&imp->imp_lock, flags);
630
631                         GOTO(interpret, req->rq_status);
632                 }
633
634                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
635                  * will only be set after rq_timedout, but the osic waiting
636                  * path sets rq_intr irrespective of whether ptlrpcd has
637                  * seen a timeout.  Our policy is to only interpret
638                  * interrupted RPCs after they have timed out. */
639                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
640                         /* NB could be on delayed list */
641                         ptlrpc_unregister_reply(req);
642                         req->rq_status = -EINTR;
643                         req->rq_phase = RQ_PHASE_INTERPRET;
644
645                         spin_lock_irqsave(&imp->imp_lock, flags);
646                         list_del_init(&req->rq_list);
647                         spin_unlock_irqrestore(&imp->imp_lock, flags);
648
649                         GOTO(interpret, req->rq_status);
650                 }
651
652                 if (req->rq_phase == RQ_PHASE_RPC) {
653                         if (req->rq_waiting || req->rq_resend) {
654                                 int status;
655
656                                 LASSERT (!ptlrpc_client_receiving_reply(req));
657                                 LASSERT (req->rq_bulk == NULL ||
658                                          !ptlrpc_bulk_active(req->rq_bulk));
659
660                                 spin_lock_irqsave(&imp->imp_lock, flags);
661
662                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
663                                         spin_unlock_irqrestore(&imp->imp_lock,
664                                                                flags);
665                                         continue;
666                                 } 
667
668                                 list_del(&req->rq_list);
669                                 list_add_tail(&req->rq_list,
670                                               &imp->imp_sending_list);
671
672                                 if (status != 0)  {
673                                         req->rq_status = status;
674                                         req->rq_phase = RQ_PHASE_INTERPRET;
675                                         spin_unlock_irqrestore(&imp->imp_lock,
676                                                                flags);
677                                         GOTO(interpret, req->rq_status);
678                                 }
679                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
680
681                                 req->rq_waiting = 0;
682                                 if (req->rq_resend) {
683                                         lustre_msg_add_flags(req->rq_reqmsg,
684                                                              MSG_RESENT);
685                                         ptlrpc_unregister_reply(req);
686                                         if (req->rq_bulk) {
687                                                 __u64 old_xid = req->rq_xid;
688
689                                                 /* ensure previous bulk fails */
690                                                 req->rq_xid = ptlrpc_next_xid();
691                                                 CDEBUG(D_HA, "resend bulk "
692                                                        "old x"LPU64
693                                                        " new x"LPU64"\n",
694                                                        old_xid, req->rq_xid);
695                                         }
696                                 }
697
698                                 rc = ptl_send_rpc(req);
699                                 if (rc) {
700                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
701                                                   rc);
702                                         force_timer_recalc = 1;
703                                         req->rq_timeout = 0;
704                                 }
705                                 /* need to reset the timeout */
706                                 force_timer_recalc = 1;
707                         }
708
709                         /* Still waiting for a reply? */
710                         if (ptlrpc_client_receiving_reply(req))
711                                 continue;
712
713                         /* Did we actually receive a reply? */
714                         if (!ptlrpc_client_replied(req))
715                                 continue;
716
717                         spin_lock_irqsave(&imp->imp_lock, flags);
718                         list_del_init(&req->rq_list);
719                         spin_unlock_irqrestore(&imp->imp_lock, flags);
720
721                         req->rq_status = after_reply(req);
722                         if (req->rq_resend) {
723                                 /* Add this req to the delayed list so
724                                    it can be errored if the import is
725                                    evicted after recovery. */
726                                 spin_lock_irqsave (&req->rq_lock, flags);
727                                 list_add_tail(&req->rq_list, 
728                                               &imp->imp_delayed_list);
729                                 spin_unlock_irqrestore(&req->rq_lock, flags);
730                                 continue;
731                         }
732
733                         /* If there is no bulk associated with this request,
734                          * then we're done and should let the interpreter
735                          * process the reply.  Similarly if the RPC returned
736                          * an error, and therefore the bulk will never arrive.
737                          */
738                         if (req->rq_bulk == NULL || req->rq_status != 0) {
739                                 req->rq_phase = RQ_PHASE_INTERPRET;
740                                 GOTO(interpret, req->rq_status);
741                         }
742
743                         req->rq_phase = RQ_PHASE_BULK;
744                 }
745
746                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
747                 if (ptlrpc_bulk_active(req->rq_bulk))
748                         continue;
749
750                 if (!req->rq_bulk->bd_success) {
751                         /* The RPC reply arrived OK, but the bulk screwed
752                          * up!  Dead weird, since the server told us the RPC
753                          * was good after getting the REPLY for its GET or
754                          * the ACK for its PUT. */
755                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
756                         LBUG();
757                 }
758
759                 req->rq_phase = RQ_PHASE_INTERPRET;
760
761         interpret:
762                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
763                 LASSERT(!req->rq_receiving_reply);
764
765                 ptlrpc_unregister_reply(req);
766                 if (req->rq_bulk != NULL)
767                         ptlrpc_unregister_bulk (req);
768
769                 req->rq_phase = RQ_PHASE_COMPLETE;
770
771                 if (req->rq_interpret_reply != NULL) {
772                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
773                                 req->rq_interpret_reply;
774                         req->rq_status = interpreter(req, &req->rq_async_args,
775                                                      req->rq_status);
776                 }
777
778                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
779                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
780                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
781                        req->rq_xid,
782                        imp->imp_connection->c_peer.peer_ni->pni_name,
783                        imp->imp_connection->c_peer.peer_nid,
784                        req->rq_reqmsg->opc);
785
786                 set->set_remaining--;
787         }
788
789         /* If we hit an error, we want to recover promptly. */
790         RETURN(set->set_remaining == 0 || force_timer_recalc);
791 }
792
793 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
794 {
795         unsigned long      flags;
796         struct obd_import *imp = req->rq_import;
797         ENTRY;
798
799         DEBUG_REQ(D_ERROR, req, "timeout");
800
801         spin_lock_irqsave (&req->rq_lock, flags);
802         req->rq_timedout = 1;
803         spin_unlock_irqrestore (&req->rq_lock, flags);
804
805         ptlrpc_unregister_reply (req);
806
807         if (req->rq_bulk != NULL)
808                 ptlrpc_unregister_bulk (req);
809
810         if (imp == NULL) {
811                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
812                 RETURN(1);
813         }
814
815         /* The DLM server doesn't want recovery run on its imports. */
816         if (imp->imp_dlm_fake)
817                 RETURN(1);
818
819         /* If this request is for recovery or other primordial tasks,
820          * then error it out here. */
821         if (req->rq_send_state != LUSTRE_IMP_FULL || 
822             imp->imp_obd->obd_no_recov) {
823                 spin_lock_irqsave (&req->rq_lock, flags);
824                 req->rq_status = -ETIMEDOUT;
825                 req->rq_err = 1;
826                 spin_unlock_irqrestore (&req->rq_lock, flags);
827                 RETURN(1);
828         }
829
830         ptlrpc_fail_import(imp, req->rq_import_generation);
831
832         RETURN(0);
833 }
834
835 int ptlrpc_expired_set(void *data)
836 {
837         struct ptlrpc_request_set *set = data;
838         struct list_head          *tmp;
839         time_t                     now = LTIME_S (CURRENT_TIME);
840         ENTRY;
841
842         LASSERT(set != NULL);
843
844         /* A timeout expired; see which reqs it applies to... */
845         list_for_each (tmp, &set->set_requests) {
846                 struct ptlrpc_request *req =
847                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
848
849                 /* request in-flight? */
850                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
851                        && !req->rq_resend) ||
852                       (req->rq_phase == RQ_PHASE_BULK)))
853                         continue;
854
855                 if (req->rq_timedout ||           /* already dealt with */
856                     req->rq_sent + req->rq_timeout > now) /* not expired */
857                         continue;
858
859                 /* deal with this guy */
860                 ptlrpc_expire_one_request (req);
861         }
862
863         /* When waiting for a whole set, we always want to break out of the
864          * sleep so we can recalculate the timeout, or enable interrupts
865          * iff everyone's timed out.
866          */
867         RETURN(1);
868 }
869
870 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
871 {
872         unsigned long flags;
873         spin_lock_irqsave(&req->rq_lock, flags);
874         req->rq_intr = 1;
875         spin_unlock_irqrestore(&req->rq_lock, flags);
876 }
877
878 void ptlrpc_interrupted_set(void *data)
879 {
880         struct ptlrpc_request_set *set = data;
881         struct list_head *tmp;
882
883         LASSERT(set != NULL);
884         CERROR("INTERRUPTED SET %p\n", set);
885
886         list_for_each(tmp, &set->set_requests) {
887                 struct ptlrpc_request *req =
888                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
889
890                 if (req->rq_phase != RQ_PHASE_RPC)
891                         continue;
892
893                 ptlrpc_mark_interrupted(req);
894         }
895 }
896
897 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
898 {
899         struct list_head      *tmp;
900         time_t                 now = LTIME_S(CURRENT_TIME);
901         time_t                 deadline;
902         int                    timeout = 0;
903         struct ptlrpc_request *req;
904         ENTRY;
905
906         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
907
908         list_for_each(tmp, &set->set_requests) {
909                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
910
911                 /* request in-flight? */
912                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
913                       (req->rq_phase == RQ_PHASE_BULK)))
914                         continue;
915
916                 if (req->rq_timedout)   /* already timed out */
917                         continue;
918
919                 deadline = req->rq_sent + req->rq_timeout;
920                 if (deadline <= now)    /* actually expired already */
921                         timeout = 1;    /* ASAP */
922                 else if (timeout == 0 || timeout > deadline - now)
923                         timeout = deadline - now;
924         }
925         RETURN(timeout);
926 }
927                 
928
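/* Send any still-new requests in the set and wait until every request has
 * completed, been interrupted, or timed out.  Returns 0 on success, an error
 * from a failed request, or the set interpreter's result if one is
 * registered. */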
929 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
930 {
931         struct list_head      *tmp;
932         struct ptlrpc_request *req;
933         struct l_wait_info     lwi;
934         int                    rc, timeout;
935         ENTRY;
936
937         LASSERT(!list_empty(&set->set_requests));
938         list_for_each(tmp, &set->set_requests) {
939                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
940                 if (req->rq_phase == RQ_PHASE_NEW)
941                         (void)ptlrpc_send_new_req(req);
942         }
943
944         do {
945                 timeout = ptlrpc_set_next_timeout(set);
946
947                 /* wait until all complete, interrupted, or an in-flight
948                  * req times out */
949                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
950                        set, timeout);
951                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
952                                        ptlrpc_expired_set, 
953                                        ptlrpc_interrupted_set, set);
954                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
955
956                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
957
958                 /* -EINTR => all requests have been flagged rq_intr so next
959                  * check completes.
960                  * -ETIMEDOUT => someone timed out.  When all reqs have
961                  * timed out, signals are enabled allowing completion with
962                  * EINTR.
963                  * I don't really care if we go once more round the loop in
964                  * the error cases -eeb. */
965         } while (rc != 0);
966
967         LASSERT(set->set_remaining == 0);
968
969         rc = 0;
970         list_for_each(tmp, &set->set_requests) {
971                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
972
973                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
974                 if (req->rq_status != 0)
975                         rc = req->rq_status;
976         }
977
978         if (set->set_interpret != NULL) {
979                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
980                         set->set_interpret;
981                 rc = interpreter (set, &set->set_args, rc);
982         }
983
984         RETURN(rc);
985 }
986
987 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
988 {
989         ENTRY;
990         if (request == NULL) {
991                 EXIT;
992                 return;
993         }
994
995         LASSERT(!request->rq_receiving_reply);
996         LASSERT(request->rq_rqbd == NULL);    /* client-side */
997
998         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
999          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1000         if (request->rq_import != NULL) {
1001                 unsigned long flags = 0;
1002                 if (!locked)
1003                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1004                 list_del_init(&request->rq_replay_list);
1005                 if (!locked)
1006                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1007                                                flags);
1008         }
1009
1010         if (atomic_read(&request->rq_refcount) != 0) {
1011                 DEBUG_REQ(D_ERROR, request,
1012                           "freeing request with nonzero refcount");
1013                 LBUG();
1014         }
1015
1016         if (request->rq_repmsg != NULL) {
1017                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1018                 request->rq_repmsg = NULL;
1019         }
1020         if (request->rq_reqmsg != NULL) {
1021                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1022                 request->rq_reqmsg = NULL;
1023         }
1024         if (request->rq_export != NULL) {
1025                 class_export_put(request->rq_export);
1026                 request->rq_export = NULL;
1027         }
1028         if (request->rq_import != NULL) {
1029                 class_import_put(request->rq_import);
1030                 request->rq_import = NULL;
1031         }
1032         if (request->rq_bulk != NULL)
1033                 ptlrpc_free_bulk(request->rq_bulk);
1034
1035         OBD_FREE(request, sizeof(*request));
1036         EXIT;
1037 }
1038
1039 void ptlrpc_free_req(struct ptlrpc_request *request)
1040 {
1041         __ptlrpc_free_req(request, 0);
1042 }
1043
1044 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1045 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1046 {
1047 #ifdef CONFIG_SMP
1048         LASSERT(spin_is_locked(&request->rq_import->imp_lock));
1049 #endif
1050         (void)__ptlrpc_req_finished(request, 1);
1051 }
1052
1053 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1054 {
1055         ENTRY;
1056         if (request == NULL)
1057                 RETURN(1);
1058
1059         if (request == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a) ||
1060             request->rq_reqmsg == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a)) {
1061                 CERROR("dereferencing freed request (bug 575)\n");
1062                 LBUG();
1063                 RETURN(1);
1064         }
1065
1066         DEBUG_REQ(D_INFO, request, "refcount now %u",
1067                   atomic_read(&request->rq_refcount) - 1);
1068
1069         if (atomic_dec_and_test(&request->rq_refcount)) {
1070                 __ptlrpc_free_req(request, locked);
1071                 RETURN(1);
1072         }
1073
1074         RETURN(0);
1075 }
1076
1077 void ptlrpc_req_finished(struct ptlrpc_request *request)
1078 {
1079         __ptlrpc_req_finished(request, 0);
1080 }
1081
1082 /* Disengage the client's reply buffer from the network
1083  * NB does _NOT_ unregister any client-side bulk.
1084  * IDEMPOTENT, but _not_ safe against concurrent callers.
1085  * The request owner (i.e. the thread doing the I/O) must call...
1086  */
1087 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1088 {
1089         int                rc;
1090         wait_queue_head_t *wq;
1091         struct l_wait_info lwi;
1092
1093         LASSERT(!in_interrupt ());             /* might sleep */
1094
1095         if (!ptlrpc_client_receiving_reply(request))
1096                 return;
1097
1098         rc = PtlMDUnlink (request->rq_reply_md_h);
1099         if (rc == PTL_INV_MD) {
1100                 LASSERT (!ptlrpc_client_receiving_reply(request));
1101                 return;
1102         }
1103         
1104         LASSERT (rc == PTL_OK);
1105
1106         if (request->rq_set != NULL)
1107                 wq = &request->rq_set->set_waitq;
1108         else
1109                 wq = &request->rq_reply_waitq;
1110
1111         for (;;) {
1112                 /* Network access will complete in finite time but the HUGE
1113                  * timeout lets us CWARN for visibility of sluggish NALs */
1114                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1115                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1116                 if (rc == 0)
1117                         return;
1118
1119                 LASSERT (rc == -ETIMEDOUT);
1120                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1121         }
1122 }
1123
1124 /* caller must hold imp->imp_lock */
1125 void ptlrpc_free_committed(struct obd_import *imp)
1126 {
1127         struct list_head *tmp, *saved;
1128         struct ptlrpc_request *req;
1129         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1130         ENTRY;
1131
1132         LASSERT(imp != NULL);
1133
1134 #ifdef CONFIG_SMP
1135         LASSERT(spin_is_locked(&imp->imp_lock));
1136 #endif
1137
1138         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1139                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1140
1141         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1142                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1143
1144                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1145                 LASSERT(req != last_req);
1146                 last_req = req;
1147
1148                 if (req->rq_import_generation < imp->imp_generation) {
1149                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1150                         GOTO(free_req, 0);
1151                 }
1152
1153                 if (req->rq_replay) {
1154                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1155                         continue;
1156                 }
1157
1158                 /* not yet committed */
1159                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1160                         DEBUG_REQ(D_HA, req, "stopping search");
1161                         break;
1162                 }
1163
1164                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1165                           imp->imp_peer_committed_transno);
1166 free_req:
1167                 if (req->rq_commit_cb != NULL)
1168                         req->rq_commit_cb(req);
1169                 list_del_init(&req->rq_replay_list);
1170                 __ptlrpc_req_finished(req, 1);
1171         }
1172
1173         EXIT;
1174         return;
1175 }
1176
1177 void ptlrpc_cleanup_client(struct obd_import *imp)
1178 {
1179         ENTRY;
1180         EXIT;
1181         return;
1182 }
1183
1184 void ptlrpc_resend_req(struct ptlrpc_request *req)
1185 {
1186         unsigned long flags;
1187
1188         DEBUG_REQ(D_HA, req, "going to resend");
1189         req->rq_reqmsg->handle.cookie = 0;
1190         req->rq_status = -EAGAIN;
1191
1192         spin_lock_irqsave (&req->rq_lock, flags);
1193         req->rq_resend = 1;
1194         req->rq_timedout = 0;
1195         if (req->rq_bulk) {
1196                 __u64 old_xid = req->rq_xid;
1197                 
1198                 /* ensure previous bulk fails */
1199                 req->rq_xid = ptlrpc_next_xid();
1200                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1201                        old_xid, req->rq_xid);
1202         }
1203         ptlrpc_wake_client_req(req);
1204         spin_unlock_irqrestore (&req->rq_lock, flags);
1205
1206 }
1207
1208 /* XXX: this function and rq_status are currently unused */
1209 void ptlrpc_restart_req(struct ptlrpc_request *req)
1210 {
1211         unsigned long flags;
1212
1213         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1214         req->rq_status = -ERESTARTSYS;
1215
1216         spin_lock_irqsave (&req->rq_lock, flags);
1217         req->rq_restart = 1;
1218         req->rq_timedout = 0;
1219         ptlrpc_wake_client_req(req);
1220         spin_unlock_irqrestore (&req->rq_lock, flags);
1221 }
1222
1223 static int expired_request(void *data)
1224 {
1225         struct ptlrpc_request *req = data;
1226         ENTRY;
1227
1228         RETURN(ptlrpc_expire_one_request(req));
1229 }
1230
1231 static void interrupted_request(void *data)
1232 {
1233         unsigned long flags;
1234
1235         struct ptlrpc_request *req = data;
1236         DEBUG_REQ(D_HA, req, "request interrupted");
1237         spin_lock_irqsave (&req->rq_lock, flags);
1238         req->rq_intr = 1;
1239         spin_unlock_irqrestore (&req->rq_lock, flags);
1240 }
1241
1242 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1243 {
1244         ENTRY;
1245         atomic_inc(&req->rq_refcount);
1246         RETURN(req);
1247 }
1248
1249 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1250                                       struct obd_import *imp)
1251 {
1252         struct list_head *tmp;
1253
1254 #ifdef CONFIG_SMP
1255         LASSERT(spin_is_locked(&imp->imp_lock));
1256 #endif
1257
1258         /* don't re-add requests that have been replayed */
1259         if (!list_empty(&req->rq_replay_list))
1260                 return;
1261
1262         LASSERT(imp->imp_replayable);
1263         /* Balanced in ptlrpc_free_committed, usually. */
1264         ptlrpc_request_addref(req);
1265         list_for_each_prev(tmp, &imp->imp_replay_list) {
1266                 struct ptlrpc_request *iter =
1267                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1268
1269                 /* We may have duplicate transnos if we create and then
1270                  * open a file, or for closes retained to match creating
1271                  * opens, so use req->rq_xid as a secondary key.
1272                  * (See bugs 684, 685, and 428.)
1273                  * XXX no longer needed, but all opens need transnos!
1274                  */
1275                 if (iter->rq_transno > req->rq_transno)
1276                         continue;
1277
1278                 if (iter->rq_transno == req->rq_transno) {
1279                         LASSERT(iter->rq_xid != req->rq_xid);
1280                         if (iter->rq_xid > req->rq_xid)
1281                                 continue;
1282                 }
1283
1284                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1285                 return;
1286         }
1287
1288         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1289 }
1290
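/* Send a single request synchronously and block until its reply (and any
 * bulk transfer) completes, resending across recovery as necessary.  Used by
 * callers that do not go through a request set. */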
1291 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1292 {
1293         int rc = 0;
1294         int brc;
1295         struct l_wait_info lwi;
1296         struct obd_import *imp = req->rq_import;
1297         unsigned long flags;
1298         int timeout = 0;
1299         ENTRY;
1300
1301         LASSERT(req->rq_set == NULL);
1302         LASSERT(!req->rq_receiving_reply);
1303
1304         /* for distributed debugging */
1305         req->rq_reqmsg->status = current->pid;
1306         LASSERT(imp->imp_obd != NULL);
1307         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1308                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1309                imp->imp_obd->obd_uuid.uuid,
1310                req->rq_reqmsg->status, req->rq_xid,
1311                imp->imp_connection->c_peer.peer_ni->pni_name,
1312                imp->imp_connection->c_peer.peer_nid,
1313                req->rq_reqmsg->opc);
1314
1315         /* Mark phase here for a little debug help */
1316         req->rq_phase = RQ_PHASE_RPC;
1317
1318         spin_lock_irqsave(&imp->imp_lock, flags);
1319         req->rq_import_generation = imp->imp_generation;
1320 restart:
1321         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1322                 list_del(&req->rq_list);
1323
1324                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1325                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1326
1327                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1328                           current->comm, 
1329                           ptlrpc_import_state_name(req->rq_send_state), 
1330                           ptlrpc_import_state_name(imp->imp_state));
1331                 lwi = LWI_INTR(interrupted_request, req);
1332                 rc = l_wait_event(req->rq_reply_waitq,
1333                                   (req->rq_send_state == imp->imp_state ||
1334                                    req->rq_err),
1335                                   &lwi);
1336                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1337                           current->comm, 
1338                           ptlrpc_import_state_name(imp->imp_state), 
1339                           ptlrpc_import_state_name(req->rq_send_state),
1340                           req->rq_err);
1341
1342                 spin_lock_irqsave(&imp->imp_lock, flags);
1343                 list_del_init(&req->rq_list);
1344
1345                 if (req->rq_err) {
1346                         rc = -EIO;
1347                 } 
1348                 else if (req->rq_intr) {
1349                         rc = -EINTR;
1350                 }
1351                 else {
1352                         GOTO(restart, rc);
1353                 }
1354         } 
1355
1356         if (rc != 0) {
1357                 list_del_init(&req->rq_list);
1358                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1359                 req->rq_status = rc; // XXX this ok?
1360                 GOTO(out, rc);
1361         }
1362
1363         if (req->rq_resend) {
1364                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1365
1366                 if (req->rq_bulk != NULL)
1367                         ptlrpc_unregister_bulk (req);
1368
1369                 DEBUG_REQ(D_HA, req, "resending: ");
1370         }
1371
1372         /* XXX this is the same as ptlrpc_set_wait */
1373         LASSERT(list_empty(&req->rq_list));
1374         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1375         spin_unlock_irqrestore(&imp->imp_lock, flags);
1376
1377         rc = ptl_send_rpc(req);
1378         if (rc) {
1379                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1380                 timeout = 1;
1381         } else {
1382                 timeout = MAX(req->rq_timeout * HZ, 1);
1383                 DEBUG_REQ(D_NET, req, "-- sleeping");
1384         }
1385         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1386                                req);
1387         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1388         DEBUG_REQ(D_NET, req, "-- done sleeping");
1389
1390         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1391                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1392                imp->imp_obd->obd_uuid.uuid,
1393                req->rq_reqmsg->status, req->rq_xid,
1394                imp->imp_connection->c_peer.peer_ni->pni_name,
1395                imp->imp_connection->c_peer.peer_nid,
1396                req->rq_reqmsg->opc);
1397
1398         spin_lock_irqsave(&imp->imp_lock, flags);
1399         list_del_init(&req->rq_list);
1400         spin_unlock_irqrestore(&imp->imp_lock, flags);
1401
1402         /* If the reply was received normally, this just grabs the spinlock
1403          * (ensuring the reply callback has returned), sees that
1404          * req->rq_receiving_reply is clear and returns. */
1405         ptlrpc_unregister_reply (req);
1406
1407         if (req->rq_err)
1408                 GOTO(out, rc = -EIO);
1409
1410         /* Resend if we need to, unless we were interrupted. */
1411         if (req->rq_resend && !req->rq_intr) {
1412                 /* ...unless we were specifically told otherwise. */
1413                 if (req->rq_no_resend)
1414                         GOTO(out, rc = -ETIMEDOUT);
1415                 spin_lock_irqsave(&imp->imp_lock, flags);
1416                 goto restart;
1417         }
1418
1419         if (req->rq_intr) {
1420                 /* Should only be interrupted if we timed out. */
1421                 if (!req->rq_timedout)
1422                         DEBUG_REQ(D_ERROR, req,
1423                                   "rq_intr set but rq_timedout not");
1424                 GOTO(out, rc = -EINTR);
1425         }
1426
1427         if (req->rq_timedout) {                 /* non-recoverable timeout */
1428                 GOTO(out, rc = -ETIMEDOUT);
1429         }
1430
1431         if (!req->rq_replied) {
1432                 /* How can this be? -eeb */
1433                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1434                 LBUG();
1435                 GOTO(out, rc = req->rq_status);
1436         }
1437
1438         rc = after_reply (req);
1439         /* NB may return +ve success rc */
1440         if (req->rq_resend) {
1441                 spin_lock_irqsave(&imp->imp_lock, flags);
1442                 goto restart;
1443         }
1444
1445  out:
1446         if (req->rq_bulk != NULL) {
1447                 if (rc >= 0) {                  
1448                         /* success so far.  Note that anything going wrong
1449                          * with bulk now, is EXTREMELY strange, since the
1450                          * server must have believed that the bulk
1451                          * transferred OK before it replied with success to
1452                          * me. */
1453                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1454                         brc = l_wait_event(req->rq_reply_waitq,
1455                                            !ptlrpc_bulk_active(req->rq_bulk),
1456                                            &lwi);
1457                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1458                         if (brc != 0) {
1459                                 LASSERT(brc == -ETIMEDOUT);
1460                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1461                                 rc = brc;
1462                         } else if (!req->rq_bulk->bd_success) {
1463                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1464                                 rc = -EIO;
1465                         }
1466                 }
1467                 if (rc < 0)
1468                         ptlrpc_unregister_bulk (req);
1469         }
1470
1471         LASSERT(!req->rq_receiving_reply);
1472         req->rq_phase = RQ_PHASE_INTERPRET;
1473         RETURN(rc);
1474 }
1475
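/* State stashed in rq_async_args for the duration of a replay, so the
 * interpret callback below can restore the original send state and the
 * saved reply status. */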
1476 struct ptlrpc_replay_async_args {
1477         int praa_old_state;
1478         int praa_old_status;
1479 };
1480
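/* Reply callback for a replayed request, invoked (in ptlrpcd context) once
 * the resend completes.  It sanity-checks the reply, lets any rq_replay_cb
 * do its fixups, restores the saved reply status for possible re-replay,
 * records the replayed transno in the import and then advances
 * ptlrpc_import_recovery_state_machine(); any failure restarts recovery
 * through ptlrpc_connect_import(). */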
1481 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1482                                     void * data, int rc)
1483 {
1484         struct ptlrpc_replay_async_args *aa = data;
1485         struct obd_import *imp = req->rq_import;
1486         unsigned long flags;
1487
1488         atomic_dec(&imp->imp_replay_inflight);
1489
1490         if (!req->rq_replied) {
1491                 CERROR("request replay timed out, restarting recovery\n");
1492                 GOTO(out, rc = -ETIMEDOUT);
1493         }
1494
1495 #if SWAB_PARANOIA
1496         /* Clear reply swab mask; this is a new reply in sender's byte order */
1497         req->rq_rep_swab_mask = 0;
1498 #endif
1499         LASSERT (req->rq_nob_received <= req->rq_replen);
1500         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1501         if (rc) {
1502                 CERROR("unpack_rep failed: %d\n", rc);
1503                 GOTO(out, rc = -EPROTO);
1504         }
1505
1506         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1507             req->rq_repmsg->status == -ENOTCONN) 
1508                 GOTO(out, rc = req->rq_repmsg->status);
1509
1510         /* The transno had better not change over replay. */
1511         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1512
1513         DEBUG_REQ(D_HA, req, "got rep");
1514
1515         /* let the callback do fixups, possibly including in the request */
1516         if (req->rq_replay_cb)
1517                 req->rq_replay_cb(req);
1518
1519         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1520                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1521                           req->rq_repmsg->status, aa->praa_old_status);
1522         } else {
1523                 /* Put it back for re-replay. */
1524                 req->rq_repmsg->status = aa->praa_old_status;
1525         }
1526
1527         spin_lock_irqsave(&imp->imp_lock, flags);
1528         imp->imp_last_replay_transno = req->rq_transno;
1529         spin_unlock_irqrestore(&imp->imp_lock, flags);
1530
1531         /* continue with recovery */
1532         rc = ptlrpc_import_recovery_state_machine(imp);
1533  out:
1534         req->rq_send_state = aa->praa_old_state;
1535
1536         if (rc != 0)
1537                 /* this replay failed, so restart recovery */
1538                 ptlrpc_connect_import(imp, NULL);
1539
1540         RETURN(rc);
1541 }
1542
1543
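/* Queue @req to be replayed through ptlrpcd.  The import must already be
 * in LUSTRE_IMP_REPLAY; the current send state and reply status are saved
 * in rq_async_args, an extra reference is taken for ptlrpcd, and
 * completion is handled asynchronously by ptlrpc_replay_interpret(). */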
1544 int ptlrpc_replay_req(struct ptlrpc_request *req)
1545 {
1546         struct ptlrpc_replay_async_args *aa;
1547         ENTRY;
1548
1549         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1550
1551         /* Not handling automatic bulk replay yet (or ever?) */
1552         LASSERT(req->rq_bulk == NULL);
1553
1554         DEBUG_REQ(D_HA, req, "REPLAY");
1555
1556         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1557         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1558         memset(aa, 0, sizeof *aa);
1559
1560         /* Prepare request to be resent with ptlrpcd */
1561         aa->praa_old_state = req->rq_send_state;
1562         req->rq_send_state = LUSTRE_IMP_REPLAY;
1563         req->rq_phase = RQ_PHASE_NEW;
1564         /*
1565          * Q: "How can a req get on the replay list if it wasn't replied?"
1566          * A: "If we failed during the replay of this request, it will still
1567          *     be on the list, but rq_replied will have been reset to 0."
1568          */
1569         if (req->rq_replied) {
1570                 aa->praa_old_status = req->rq_repmsg->status;
1571                 req->rq_status = 0;
1572                 req->rq_replied = 0;
1573         }
1574
1575         req->rq_interpret_reply = ptlrpc_replay_interpret;
1576         atomic_inc(&req->rq_import->imp_replay_inflight);
1577         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1578
1579         ptlrpcd_add_req(req);
1580         RETURN(0);
1581 }
1582
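/* Fail every request from an older generation of @imp that is still on the
 * sending or delayed list: each gets rq_err set and its waiter woken, so
 * ptlrpc_queue_wait() returns -EIO.  As a last chance, committed requests
 * on a replayable import are freed here as well. */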
1583 void ptlrpc_abort_inflight(struct obd_import *imp)
1584 {
1585         unsigned long flags;
1586         struct list_head *tmp, *n;
1587         ENTRY;
1588
1589         /* Make sure that no new requests get processed for this import.
1590          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1591          * this flag and then putting requests on sending_list or delayed_list.
1592          */
1593         spin_lock_irqsave(&imp->imp_lock, flags);
1594
1595         /* XXX locking?  Maybe we should remove each request with the list
1596          * locked?  Also, how do we know if the requests on the list are
1597          * being freed at this time?
1598          */
1599         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1600                 struct ptlrpc_request *req =
1601                         list_entry(tmp, struct ptlrpc_request, rq_list);
1602
1603                 DEBUG_REQ(D_HA, req, "inflight");
1604
1605                 spin_lock (&req->rq_lock);
1606                 if (req->rq_import_generation < imp->imp_generation) {
1607                         req->rq_err = 1;
1608                         ptlrpc_wake_client_req(req);
1609                 }
1610                 spin_unlock (&req->rq_lock);
1611         }
1612
1613         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1614                 struct ptlrpc_request *req =
1615                         list_entry(tmp, struct ptlrpc_request, rq_list);
1616
1617                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1618
1619                 spin_lock (&req->rq_lock);
1620                 if (req->rq_import_generation < imp->imp_generation) {
1621                         req->rq_err = 1;
1622                         ptlrpc_wake_client_req(req);
1623                 }
1624                 spin_unlock (&req->rq_lock);
1625         }
1626
1627         /* Last chance to free reqs left on the replay list, but we
1628          * will still leak reqs that haven't committed.  */
1629         if (imp->imp_replayable)
1630                 ptlrpc_free_committed(imp);
1631
1632         spin_unlock_irqrestore(&imp->imp_lock, flags);
1633
1634         EXIT;
1635 }
1636
1637 static __u64 ptlrpc_last_xid = 0;
1638 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1639
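/* Hand out the next RPC transfer id.  A single spinlock-protected counter
 * keeps xids unique and increasing across all imports on this node; new
 * requests are typically stamped with this value in rq_xid at allocation
 * time. */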
1640 __u64 ptlrpc_next_xid(void)
1641 {
1642         __u64 tmp;
1643         spin_lock(&ptlrpc_last_xid_lock);
1644         tmp = ++ptlrpc_last_xid;
1645         spin_unlock(&ptlrpc_last_xid_lock);
1646         return tmp;
1647 }
1648
1649