lustre/ptlrpc/client.c  (fs/lustre-release.git, commit: Landing b_recovery)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
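/*
 * A minimal sketch of how this is typically invoked at client setup time.
 * The portal constants, the "ost" name and the obd_ldlm_client field are
 * illustrative assumptions, not taken from this file:
 *
 *      ptlrpc_init_client(OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
 *                         "ost", &obddev->obd_ldlm_client);
 */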
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(void)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, sizeof(*desc));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         INIT_LIST_HEAD(&desc->bd_page_list);
96         desc->bd_md_h = PTL_HANDLE_NONE;
97         desc->bd_me_h = PTL_HANDLE_NONE;
98
99         return desc;
100 }
101
102 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
103                                                int type, int portal)
104 {
105         struct obd_import *imp = req->rq_import;
106         struct ptlrpc_bulk_desc *desc;
107
108         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
109
110         desc = new_bulk();
111         if (desc == NULL)
112                 RETURN(NULL);
113
114         desc->bd_import_generation = req->rq_import_generation;
115         desc->bd_import = class_import_get(imp);
116         desc->bd_req = req;
117         desc->bd_type = type;
118         desc->bd_portal = portal;
119
120         /* This makes req own desc; desc is freed when req itself is freed */
121         req->rq_bulk = desc;
122
123         return desc;
124 }
125
126 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
127                                                int type, int portal)
128 {
129         struct obd_export *exp = req->rq_export;
130         struct ptlrpc_bulk_desc *desc;
131
132         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
133
134         desc = new_bulk();
135         if (desc == NULL)
136                 RETURN(NULL);
137
138         desc->bd_export = class_export_get(exp);
139         desc->bd_req = req;
140         desc->bd_type = type;
141         desc->bd_portal = portal;
142
143         /* NB we don't assign rq_bulk here; server-side requests are
144          * re-used, and the handler frees the bulk desc explicitly. */
145
146         return desc;
147 }
148
149 int ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
150                           struct page *page, int pageoffset, int len)
151 {
152         struct ptlrpc_bulk_page *bulk;
153
154         OBD_ALLOC(bulk, sizeof(*bulk));
155         if (bulk == NULL)
156                 return -ENOMEM;
157
158         LASSERT(page != NULL);
159         LASSERT(pageoffset >= 0);
160         LASSERT(len > 0);
161         LASSERT(pageoffset + len <= PAGE_SIZE);
162
163         bulk->bp_page = page;
164         bulk->bp_pageoffset = pageoffset;
165         bulk->bp_buflen = len;
166
167         bulk->bp_desc = desc;
168         list_add_tail(&bulk->bp_link, &desc->bd_page_list);
169         desc->bd_page_count++;
170         return 0;
171 }
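/*
 * A minimal sketch of the usual client-side sequence combining the two
 * helpers above (illustrative only; the BULK_PUT_SINK direction, the
 * OST_BULK_PORTAL constant and the pages[]/npages variables are
 * assumptions, not taken from this file):
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i, rc;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, BULK_PUT_SINK, OST_BULK_PORTAL);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *
 *      for (i = 0; i < npages; i++) {
 *              rc = ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *              if (rc != 0)
 *                      return rc;
 *      }
 *
 * Because ptlrpc_prep_bulk_imp() stores the descriptor in req->rq_bulk,
 * it is released along with the request; only ptlrpc_prep_bulk_exp()
 * callers free the descriptor explicitly (see ptlrpc_free_bulk below).
 */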
172
173 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
174 {
175         struct list_head *tmp, *next;
176         ENTRY;
177
178         LASSERT(desc != NULL);
179         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
180         LASSERT(!desc->bd_network_rw);         /* network hands off */
181
182         list_for_each_safe(tmp, next, &desc->bd_page_list) {
183                 struct ptlrpc_bulk_page *bulk;
184                 bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
185                 ptlrpc_free_bulk_page(bulk);
186         }
187
188         LASSERT(desc->bd_page_count == 0);
189         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
190
191         if (desc->bd_export)
192                 class_export_put(desc->bd_export);
193         else
194                 class_import_put(desc->bd_import);
195
196         OBD_FREE(desc, sizeof(*desc));
197         EXIT;
198 }
199
200 void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
201 {
202         LASSERT(bulk != NULL);
203
204         list_del(&bulk->bp_link);
205         bulk->bp_desc->bd_page_count--;
206         OBD_FREE(bulk, sizeof(*bulk));
207 }
208
209 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
210                                        int count, int *lengths, char **bufs)
211 {
212         struct ptlrpc_request *request;
213         int rc;
214         ENTRY;
215
216         LASSERT((unsigned long)imp > 0x1000);
217
218         OBD_ALLOC(request, sizeof(*request));
219         if (!request) {
220                 CERROR("request allocation out of memory\n");
221                 RETURN(NULL);
222         }
223
224         rc = lustre_pack_request(request, count, lengths, bufs);
225         if (rc) {
226                 CERROR("cannot pack request %d\n", rc);
227                 OBD_FREE(request, sizeof(*request));
228                 RETURN(NULL);
229         }
230
231         if (imp->imp_server_timeout)
232                 request->rq_timeout = obd_timeout / 2;
233         else
234                 request->rq_timeout = obd_timeout;
235         request->rq_send_state = LUSTRE_IMP_FULL;
236         request->rq_type = PTL_RPC_MSG_REQUEST;
237         request->rq_import = class_import_get(imp);
238         request->rq_phase = RQ_PHASE_NEW;
239
240         /* XXX FIXME bug 249 */
241         request->rq_request_portal = imp->imp_client->cli_request_portal;
242         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
243
244         spin_lock_init(&request->rq_lock);
245         INIT_LIST_HEAD(&request->rq_list);
246         INIT_LIST_HEAD(&request->rq_replay_list);
247         init_waitqueue_head(&request->rq_reply_waitq);
248         request->rq_xid = ptlrpc_next_xid();
249         atomic_set(&request->rq_refcount, 1);
250
251         request->rq_reqmsg->opc = opcode;
252         request->rq_reqmsg->flags = 0;
253
254         RETURN(request);
255 }
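/*
 * A sketch of the common synchronous calling pattern built on this
 * function (SOME_OPC and the single buffer size are placeholders, and
 * lustre_msg_size() is assumed from the generic message pack helpers):
 *
 *      int size = sizeof(struct some_body);
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, SOME_OPC, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      req->rq_replen = lustre_msg_size(1, &size);
 *
 *      rc = ptlrpc_queue_wait(req);        (defined later in this file)
 *      ptlrpc_req_finished(req);
 *      return rc;
 */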
256
257 struct ptlrpc_request_set *ptlrpc_prep_set(void)
258 {
259         struct ptlrpc_request_set *set;
260
261         OBD_ALLOC(set, sizeof *set);
262         if (!set)
263                 RETURN(NULL);
264         INIT_LIST_HEAD(&set->set_requests);
265         init_waitqueue_head(&set->set_waitq);
266         set->set_remaining = 0;
267         spin_lock_init(&set->set_new_req_lock);
268         INIT_LIST_HEAD(&set->set_new_requests);
269
270         RETURN(set);
271 }
272
273 /* Finish with this set; opposite of prep_set. */
274 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
275 {
276         struct list_head *tmp;
277         struct list_head *next;
278         int               expected_phase;
279         int               n = 0;
280         ENTRY;
281
282         /* Requests on the set should either all be completed, or all be new */
283         expected_phase = (set->set_remaining == 0) ?
284                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
285         list_for_each (tmp, &set->set_requests) {
286                 struct ptlrpc_request *req =
287                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
288
289                 LASSERT(req->rq_phase == expected_phase);
290                 n++;
291         }
292
293         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
294
295         list_for_each_safe(tmp, next, &set->set_requests) {
296                 struct ptlrpc_request *req =
297                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
298                 list_del_init(&req->rq_set_chain);
299
300                 LASSERT(req->rq_phase == expected_phase);
301
302                 if (req->rq_phase == RQ_PHASE_NEW) {
303
304                         if (req->rq_interpret_reply != NULL) {
305                                 int (*interpreter)(struct ptlrpc_request *,
306                                                    void *, int) =
307                                         req->rq_interpret_reply;
308
309                                 /* higher level (i.e. LOV) failed;
310                                  * let the sub reqs clean up */
311                                 req->rq_status = -EBADR;
312                                 interpreter(req, &req->rq_async_args,
313                                             req->rq_status);
314                         }
315                         set->set_remaining--;
316                 }
317
318                 req->rq_set = NULL;
319                 ptlrpc_req_finished (req);
320         }
321
322         LASSERT(set->set_remaining == 0);
323
324         OBD_FREE(set, sizeof(*set));
325         EXIT;
326 }
327
328 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
329                         struct ptlrpc_request *req)
330 {
331         /* The set takes over the caller's request reference */
332         list_add_tail(&req->rq_set_chain, &set->set_requests);
333         req->rq_set = set;
334         set->set_remaining++;
335 }
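/*
 * Together with ptlrpc_prep_set(), ptlrpc_set_wait() and
 * ptlrpc_set_destroy(), this gives the usual pattern for issuing several
 * RPCs concurrently (a sketch with error handling trimmed; reqs[] and
 * nreqs are placeholders):
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      int i, rc;
 *
 *      for (i = 0; i < nreqs; i++)
 *              ptlrpc_set_add_req(set, reqs[i]);   (set now owns each ref)
 *
 *      rc = ptlrpc_set_wait(set);    (sends and polls via ptlrpc_check_set)
 *      ptlrpc_set_destroy(set);      (drops the request references)
 */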
336
337 /* Locked so that many callers can add requests; the context that owns
338  * the set is supposed to notice these and move them into the set proper. */
339 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
340                             struct ptlrpc_request *req)
341 {
342         unsigned long flags;
343         spin_lock_irqsave(&set->set_new_req_lock, flags);
344         /* The set takes over the caller's request reference */
345         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
346         req->rq_set = set;
347         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
348 }
349
350 /*
351  * Based on the current state of the import, determine if the request
352  * can be sent, is an error, or should be delayed.
353  *
354  * Returns true if this request should be delayed. If false and *status
355  * is non-zero, then the request cannot be sent and *status is the error
356  * code.  If false and *status is 0, then the request can be sent.
357  *
358  * The imp->imp_lock must be held.
359  */
360 static int ptlrpc_import_delay_req(struct obd_import *imp, 
361                                    struct ptlrpc_request *req, int *status)
362 {
363         int delay = 0;
364         ENTRY;
365
366         LASSERT (status != NULL);
367         *status = 0;
368
369         if (imp->imp_state == LUSTRE_IMP_NEW) {
370                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
371                 *status = -EIO;
372                 LBUG();
373         }
374         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
375                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
376                 *status = -EIO;
377         }
378         /*
379          * If the import has been invalidated (such as by an OST failure), the
380          * request must fail with -EIO.  
381          */
382         else if (imp->imp_invalid) {
383                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
384                 *status = -EIO;
385         } 
386         else if (req->rq_import_generation != imp->imp_generation) {
387                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
388                 *status = -EIO;
389         } 
390         else if (req->rq_send_state != imp->imp_state) {
391                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) 
392                         *status = -EWOULDBLOCK;
393                 else
394                         delay = 1;
395         }
396
397         RETURN(delay);
398 }
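/*
 * Callers handle the three outcomes separately, roughly as
 * ptlrpc_send_new_req() below does (sketch only):
 *
 *      if (ptlrpc_import_delay_req(imp, req, &status)) {
 *              ... park the request on imp->imp_delayed_list and wait ...
 *      } else if (status != 0) {
 *              req->rq_status = status;    (hard failure, typically -EIO)
 *              req->rq_phase = RQ_PHASE_INTERPRET;
 *      } else {
 *              ... send the request ...
 *      }
 */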
399
400 static int ptlrpc_check_reply(struct ptlrpc_request *req)
401 {
402         unsigned long flags;
403         int rc = 0;
404         ENTRY;
405
406         /* serialise with network callback */
407         spin_lock_irqsave (&req->rq_lock, flags);
408
409         if (req->rq_replied) {
410                 DEBUG_REQ(D_NET, req, "REPLIED:");
411                 GOTO(out, rc = 1);
412         }
413
414         if (req->rq_err) {
415                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
416                 GOTO(out, rc = 1);
417         }
418
419         if (req->rq_resend) {
420                 DEBUG_REQ(D_ERROR, req, "RESEND:");
421                 GOTO(out, rc = 1);
422         }
423
424         if (req->rq_restart) {
425                 DEBUG_REQ(D_ERROR, req, "RESTART:");
426                 GOTO(out, rc = 1);
427         }
428         EXIT;
429  out:
430         spin_unlock_irqrestore (&req->rq_lock, flags);
431         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
432         return rc;
433 }
434
435 static int ptlrpc_check_status(struct ptlrpc_request *req)
436 {
437         int err;
438         ENTRY;
439
440         err = req->rq_repmsg->status;
441         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
442                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
443                           err);
444                 RETURN(err < 0 ? err : -EINVAL);
445         }
446
447         if (err < 0) {
448                 DEBUG_REQ(D_INFO, req, "status is %d", err);
449         } else if (err > 0) {
450                 /* XXX: translate this error from net to host */
451                 DEBUG_REQ(D_INFO, req, "status is %d", err);
452         }
453
454         RETURN(err);
455 }
456
457 static int after_reply(struct ptlrpc_request *req)
458 {
459         unsigned long flags;
460         struct obd_import *imp = req->rq_import;
461         int rc;
462         ENTRY;
463
464         LASSERT(!req->rq_receiving_reply);
465         LASSERT(req->rq_replied);
466
467         /* NB Until this point, the whole of the incoming message,
468          * including buflens, status etc is in the sender's byte order. */
469
470 #if SWAB_PARANOIA
471         /* Clear reply swab mask; this is a new reply in sender's byte order */
472         req->rq_rep_swab_mask = 0;
473 #endif
474         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
475         if (rc) {
476                 CERROR("unpack_rep failed: %d\n", rc);
477                 RETURN(-EPROTO);
478         }
479
480         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
481             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
482                 CERROR("invalid packet type received (type=%u)\n",
483                        req->rq_repmsg->type);
484                 RETURN(-EPROTO);
485         }
486
487         /* Store transno in reqmsg for replay. */
488         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
489
490         rc = ptlrpc_check_status(req);
491
492         /* Either we've been evicted, or the server has failed for
493          * some reason. Try to reconnect, and if that fails, punt to the
494          * upcall. */
495         if (rc == -ENOTCONN) {
496                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
497                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
498                         RETURN(-ENOTCONN);
499                 }
500
501                 ptlrpc_request_handle_notconn(req);
502
503                 RETURN(rc);
504         }
505
506         if (req->rq_import->imp_replayable) {
507                 spin_lock_irqsave(&imp->imp_lock, flags);
508                 if (req->rq_replay || req->rq_transno != 0)
509                         ptlrpc_retain_replayable_request(req, imp);
510                 else if (req->rq_commit_cb != NULL)
511                         req->rq_commit_cb(req);
512
513                 if (req->rq_transno > imp->imp_max_transno)
514                         imp->imp_max_transno = req->rq_transno;
515
516                 /* Replay-enabled imports return commit-status information. */
517                 if (req->rq_repmsg->last_committed)
518                         imp->imp_peer_committed_transno =
519                                 req->rq_repmsg->last_committed;
520                 ptlrpc_free_committed(imp);
521                 spin_unlock_irqrestore(&imp->imp_lock, flags);
522         }
523
524         RETURN(rc);
525 }
526
527 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
528 {
529         struct obd_import     *imp;
530         unsigned long          flags;
531         int rc;
532         ENTRY;
533
534         LASSERT(req->rq_phase == RQ_PHASE_NEW);
535         req->rq_phase = RQ_PHASE_RPC;
536
537         imp = req->rq_import;
538         spin_lock_irqsave(&imp->imp_lock, flags);
539
540         if (imp->imp_invalid) {
541                 spin_unlock_irqrestore(&imp->imp_lock, flags);
542                 req->rq_status = -EIO;
543                 req->rq_phase = RQ_PHASE_INTERPRET;
544                 RETURN(-EIO);
545         }
546
547         req->rq_import_generation = imp->imp_generation;
548
549         if (ptlrpc_import_delay_req(imp, req, &rc)) {
550                 spin_lock (&req->rq_lock);
551                 req->rq_waiting = 1;
552                 spin_unlock (&req->rq_lock);
553
554                 LASSERT(list_empty (&req->rq_list));
555
556                 // list_del(&req->rq_list);
557                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
558                 spin_unlock_irqrestore(&imp->imp_lock, flags);
559                 RETURN(0);
560         }
561
562         if (rc != 0) {
563                 spin_unlock_irqrestore(&imp->imp_lock, flags);
564                 req->rq_status = rc;
565                 req->rq_phase = RQ_PHASE_INTERPRET;
566                 RETURN(rc);
567         }
568
569         /* XXX this is the same as ptlrpc_queue_wait */
570         LASSERT(list_empty(&req->rq_list));
571         list_add_tail(&req->rq_list, &imp->imp_sending_list);
572         spin_unlock_irqrestore(&imp->imp_lock, flags);
573
574         req->rq_reqmsg->status = current->pid;
575         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
576                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
577                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
578                req->rq_xid,
579                imp->imp_connection->c_peer.peer_ni->pni_name,
580                imp->imp_connection->c_peer.peer_nid,
581                req->rq_reqmsg->opc);
582
583         rc = ptl_send_rpc(req);
584         if (rc) {
585                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
586                 req->rq_timeout = 1;
587                 RETURN(rc);
588         }
589         RETURN(0);
590 }
591
592 int ptlrpc_check_set(struct ptlrpc_request_set *set)
593 {
594         unsigned long flags;
595         struct list_head *tmp;
596         int force_timer_recalc = 0;
597         ENTRY;
598
599         if (set->set_remaining == 0)
600                 RETURN(1);
601
602         list_for_each(tmp, &set->set_requests) {
603                 struct ptlrpc_request *req =
604                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
605                 struct obd_import *imp = req->rq_import;
606                 int rc = 0;
607
608                 if (req->rq_phase == RQ_PHASE_NEW &&
609                     ptlrpc_send_new_req(req)) {
610                         force_timer_recalc = 1;
611                 }
612
613                 if (!(req->rq_phase == RQ_PHASE_RPC ||
614                       req->rq_phase == RQ_PHASE_BULK ||
615                       req->rq_phase == RQ_PHASE_INTERPRET ||
616                       req->rq_phase == RQ_PHASE_COMPLETE)) {
617                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
618                         LBUG();
619                 }
620
621                 if (req->rq_phase == RQ_PHASE_COMPLETE)
622                         continue;
623
624                 if (req->rq_phase == RQ_PHASE_INTERPRET)
625                         GOTO(interpret, req->rq_status);
626
627                 if (req->rq_err) {
628                         ptlrpc_unregister_reply(req);
629                         if (req->rq_status == 0)
630                                 req->rq_status = -EIO;
631                         req->rq_phase = RQ_PHASE_INTERPRET;
632
633                         spin_lock_irqsave(&imp->imp_lock, flags);
634                         list_del_init(&req->rq_list);
635                         spin_unlock_irqrestore(&imp->imp_lock, flags);
636
637                         GOTO(interpret, req->rq_status);
638                 }
639
640                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
641                  * will only be set after rq_timedout, but the osic waiting
642                  * path sets rq_intr irrespective of whether ptlrpcd has
643                  * seen a timeout.  Our policy is to only interpret
644                  * interrupted RPCs after they have timed out. */
645                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
646                         /* NB could be on delayed list */
647                         ptlrpc_unregister_reply(req);
648                         req->rq_status = -EINTR;
649                         req->rq_phase = RQ_PHASE_INTERPRET;
650
651                         spin_lock_irqsave(&imp->imp_lock, flags);
652                         list_del_init(&req->rq_list);
653                         spin_unlock_irqrestore(&imp->imp_lock, flags);
654
655                         GOTO(interpret, req->rq_status);
656                 }
657
658                 if (req->rq_phase == RQ_PHASE_RPC) {
659                         if (req->rq_waiting || req->rq_resend) {
660                                 int status;
661                                 spin_lock_irqsave(&imp->imp_lock, flags);
662
663                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
664                                         spin_unlock_irqrestore(&imp->imp_lock,
665                                                                flags);
666                                         continue;
667                                 } 
668
669                                 list_del(&req->rq_list);
670                                 list_add_tail(&req->rq_list,
671                                               &imp->imp_sending_list);
672
673                                 if (status != 0)  {
674                                         req->rq_status = status;
675                                         req->rq_phase = RQ_PHASE_INTERPRET;
676                                         spin_unlock_irqrestore(&imp->imp_lock,
677                                                                flags);
678                                         GOTO(interpret, req->rq_status);
679                                 }
680                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
681
682                                 req->rq_waiting = 0;
683                                 if (req->rq_resend) {
684                                         lustre_msg_add_flags(req->rq_reqmsg,
685                                                              MSG_RESENT);
686                                         ptlrpc_unregister_reply(req);
687                                         if (req->rq_bulk) {
688                                                 __u64 old_xid = req->rq_xid;
689                                                 ptlrpc_unregister_bulk(req);
690                                                 /* ensure previous bulk fails */
691                                                 req->rq_xid = ptlrpc_next_xid();
692                                                 CDEBUG(D_HA, "resend bulk "
693                                                        "old x"LPU64
694                                                        " new x"LPU64"\n",
695                                                        old_xid, req->rq_xid);
696                                         }
697                                 }
698
699                                 rc = ptl_send_rpc(req);
700                                 if (rc) {
701                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
702                                                   rc);
703                                         force_timer_recalc = 1;
704                                         req->rq_timeout = 0;
705                                 }
706                                 /* need to reset the timeout */
707                                 force_timer_recalc = 1;
708                         }
709
710                         /* Ensure the network callback returned */
711                         spin_lock_irqsave (&req->rq_lock, flags);
712                         if (!req->rq_replied) {
713                                 spin_unlock_irqrestore (&req->rq_lock, flags);
714                                 continue;
715                         }
716                         spin_unlock_irqrestore (&req->rq_lock, flags);
717
718                         spin_lock_irqsave(&imp->imp_lock, flags);
719                         list_del_init(&req->rq_list);
720                         spin_unlock_irqrestore(&imp->imp_lock, flags);
721
722                         req->rq_status = after_reply(req);
723                         if (req->rq_resend) {
724                                 /* Add this req to the delayed list so
725                                    it can be errored if the import is
726                                    evicted after recovery. */
727                 spin_lock_irqsave (&imp->imp_lock, flags);
728                 list_add_tail(&req->rq_list,
729                               &imp->imp_delayed_list);
730                 spin_unlock_irqrestore(&imp->imp_lock, flags);
731                                 continue;
732                         }
733
734                         /* If there is no bulk associated with this request,
735                          * then we're done and should let the interpreter
736                          * process the reply.  Similarly if the RPC returned
737                          * an error, and therefore the bulk will never arrive.
738                          */
739                         if (req->rq_bulk == NULL || req->rq_status != 0) {
740                                 req->rq_phase = RQ_PHASE_INTERPRET;
741                                 GOTO(interpret, req->rq_status);
742                         }
743
744                         req->rq_phase = RQ_PHASE_BULK;
745                 }
746
747                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
748                 if (!ptlrpc_bulk_complete (req->rq_bulk))
749                         continue;
750
751                 req->rq_phase = RQ_PHASE_INTERPRET;
752
753         interpret:
754                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
755                 LASSERT(!req->rq_receiving_reply);
756
757                 ptlrpc_unregister_reply(req);
758                 if (req->rq_bulk != NULL)
759                         ptlrpc_unregister_bulk (req);
760
761                 req->rq_phase = RQ_PHASE_COMPLETE;
762
763                 if (req->rq_interpret_reply != NULL) {
764                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
765                                 req->rq_interpret_reply;
766                         req->rq_status = interpreter(req, &req->rq_async_args,
767                                                      req->rq_status);
768                 }
769
770                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
771                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
772                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
773                        req->rq_xid,
774                        imp->imp_connection->c_peer.peer_ni->pni_name,
775                        imp->imp_connection->c_peer.peer_nid,
776                        req->rq_reqmsg->opc);
777
778                 set->set_remaining--;
779         }
780
781         /* If we hit an error, we want to recover promptly. */
782         RETURN(set->set_remaining == 0 || force_timer_recalc);
783 }
784
785 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
786 {
787         unsigned long      flags;
788         struct obd_import *imp = req->rq_import;
789         ENTRY;
790
791         DEBUG_REQ(D_ERROR, req, "timeout");
792
793         spin_lock_irqsave (&req->rq_lock, flags);
794         req->rq_timedout = 1;
795         spin_unlock_irqrestore (&req->rq_lock, flags);
796
797         ptlrpc_unregister_reply (req);
798
799         if (imp == NULL) {
800                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
801                 RETURN(1);
802         }
803
804         /* The DLM server doesn't want recovery run on its imports. */
805         if (imp->imp_dlm_fake)
806                 RETURN(1);
807
808         /* If this request is for recovery or other primordial tasks,
809          * then error it out here. */
810         if (req->rq_send_state != LUSTRE_IMP_FULL || 
811             imp->imp_obd->obd_no_recov) {
812                 spin_lock_irqsave (&req->rq_lock, flags);
813                 req->rq_status = -ETIMEDOUT;
814                 req->rq_err = 1;
815                 spin_unlock_irqrestore (&req->rq_lock, flags);
816                 RETURN(1);
817         }
818
819         ptlrpc_fail_import(imp, req->rq_import_generation);
820
821         RETURN(0);
822 }
823
824 int ptlrpc_expired_set(void *data)
825 {
826         struct ptlrpc_request_set *set = data;
827         struct list_head          *tmp;
828         time_t                     now = LTIME_S (CURRENT_TIME);
829         ENTRY;
830
831         LASSERT(set != NULL);
832
833         /* A timeout expired; see which reqs it applies to... */
834         list_for_each (tmp, &set->set_requests) {
835                 struct ptlrpc_request *req =
836                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
837
838                 /* request in-flight? */
839                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
840                        && !req->rq_resend) ||
841                       (req->rq_phase == RQ_PHASE_BULK)))
842                         continue;
843
844                 if (req->rq_timedout ||           /* already dealt with */
845                     req->rq_sent + req->rq_timeout > now) /* not expired */
846                         continue;
847
848                 /* deal with this guy */
849                 ptlrpc_expire_one_request (req);
850         }
851
852         /* When waiting for a whole set, we always want to break out of the
853          * sleep so we can recalculate the timeout, or enable interrupts
854          * iff everyone's timed out.
855          */
856         RETURN(1);
857 }
858
859 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
860 {
861         unsigned long flags;
862         spin_lock_irqsave(&req->rq_lock, flags);
863         req->rq_intr = 1;
864         spin_unlock_irqrestore(&req->rq_lock, flags);
865 }
866
867 void ptlrpc_interrupted_set(void *data)
868 {
869         struct ptlrpc_request_set *set = data;
870         struct list_head *tmp;
871
872         LASSERT(set != NULL);
873         CERROR("INTERRUPTED SET %p\n", set);
874
875         list_for_each(tmp, &set->set_requests) {
876                 struct ptlrpc_request *req =
877                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
878
879                 if (req->rq_phase != RQ_PHASE_RPC)
880                         continue;
881
882                 ptlrpc_mark_interrupted(req);
883         }
884 }
885
886 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
887 {
888         struct list_head      *tmp;
889         time_t                 now = LTIME_S(CURRENT_TIME);
890         time_t                 deadline;
891         int                    timeout = 0;
892         struct ptlrpc_request *req;
893         ENTRY;
894
895         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
896
897         list_for_each(tmp, &set->set_requests) {
898                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
899
900                 /* request in-flight? */
901                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
902                       (req->rq_phase == RQ_PHASE_BULK)))
903                         continue;
904
905                 if (req->rq_timedout)   /* already timed out */
906                         continue;
907
908                 deadline = req->rq_sent + req->rq_timeout;
909                 if (deadline <= now)    /* actually expired already */
910                         timeout = 1;    /* ASAP */
911                 else if (timeout == 0 || timeout > deadline - now)
912                         timeout = deadline - now;
913         }
914         RETURN(timeout);
915 }
916                 
917
918 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
919 {
920         struct list_head      *tmp;
921         struct ptlrpc_request *req;
922         struct l_wait_info     lwi;
923         int                    rc, timeout;
924         ENTRY;
925
926         LASSERT(!list_empty(&set->set_requests));
927         list_for_each(tmp, &set->set_requests) {
928                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
929                 (void)ptlrpc_send_new_req(req);
930         }
931
932         do {
933                 timeout = ptlrpc_set_next_timeout(set);
934
935                 /* wait until all complete, interrupted, or an in-flight
936                  * req times out */
937                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
938                        set, timeout);
939                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
940                                        ptlrpc_expired_set, 
941                                        ptlrpc_interrupted_set, set);
942                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
943
944                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
945
946                 /* -EINTR => all requests have been flagged rq_intr so next
947                  * check completes.
948                  * -ETIMEDOUT => someone timed out.  When all reqs have
949                  * timed out, signals are enabled allowing completion with
950                  * EINTR.
951                  * I don't really care if we go once more round the loop in
952                  * the error cases -eeb. */
953         } while (rc != 0);
954
955         LASSERT(set->set_remaining == 0);
956
957         rc = 0;
958         list_for_each(tmp, &set->set_requests) {
959                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
960
961                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
962                 if (req->rq_status != 0)
963                         rc = req->rq_status;
964         }
965
966         if (set->set_interpret != NULL) {
967                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
968                         set->set_interpret;
969                 rc = interpreter (set, &set->set_args, rc);
970         }
971
972         RETURN(rc);
973 }
974
975 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
976 {
977         ENTRY;
978         if (request == NULL) {
979                 EXIT;
980                 return;
981         }
982
983         LASSERT(!request->rq_receiving_reply);
984
985         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
986          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
987         if (request->rq_import != NULL) {
988                 unsigned long flags = 0;
989                 if (!locked)
990                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
991                 list_del_init(&request->rq_replay_list);
992                 if (!locked)
993                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
994                                                flags);
995         }
996
997         if (atomic_read(&request->rq_refcount) != 0) {
998                 DEBUG_REQ(D_ERROR, request,
999                           "freeing request with nonzero refcount");
1000                 LBUG();
1001         }
1002
1003         if (request->rq_repmsg != NULL) {
1004                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1005                 request->rq_repmsg = NULL;
1006         }
1007         if (request->rq_reqmsg != NULL) {
1008                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1009                 request->rq_reqmsg = NULL;
1010         }
1011         if (request->rq_export != NULL) {
1012                 class_export_put(request->rq_export);
1013                 request->rq_export = NULL;
1014         }
1015         if (request->rq_import != NULL) {
1016                 class_import_put(request->rq_import);
1017                 request->rq_import = NULL;
1018         }
1019         if (request->rq_bulk != NULL)
1020                 ptlrpc_free_bulk(request->rq_bulk);
1021
1022         OBD_FREE(request, sizeof(*request));
1023         EXIT;
1024 }
1025
1026 void ptlrpc_free_req(struct ptlrpc_request *request)
1027 {
1028         __ptlrpc_free_req(request, 0);
1029 }
1030
1031 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1032 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1033 {
1034 #ifdef CONFIG_SMP
1035         LASSERT(spin_is_locked(&request->rq_import->imp_lock));
1036 #endif
1037         (void)__ptlrpc_req_finished(request, 1);
1038 }
1039
1040 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1041 {
1042         ENTRY;
1043         if (request == NULL)
1044                 RETURN(1);
1045
1046         if (request == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a) ||
1047             request->rq_reqmsg == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a)) {
1048                 CERROR("dereferencing freed request (bug 575)\n");
1049                 LBUG();
1050                 RETURN(1);
1051         }
1052
1053         DEBUG_REQ(D_INFO, request, "refcount now %u",
1054                   atomic_read(&request->rq_refcount) - 1);
1055
1056         if (atomic_dec_and_test(&request->rq_refcount)) {
1057                 __ptlrpc_free_req(request, locked);
1058                 RETURN(1);
1059         }
1060
1061         RETURN(0);
1062 }
1063
1064 void ptlrpc_req_finished(struct ptlrpc_request *request)
1065 {
1066         __ptlrpc_req_finished(request, 0);
1067 }
1068
1069 /* Disengage the client's reply buffer from the network
1070  * NB does _NOT_ unregister any client-side bulk.
1071  * IDEMPOTENT, but _not_ safe against concurrent callers.
1072  * The request owner (i.e. the thread doing the I/O) must call...
1073  */
1074 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1075 {
1076         unsigned long flags;
1077         int           rc;
1078         ENTRY;
1079
1080         LASSERT(!in_interrupt ());             /* might sleep */
1081
1082         spin_lock_irqsave (&request->rq_lock, flags);
1083         if (!request->rq_receiving_reply) {     /* not waiting for a reply */
1084                 spin_unlock_irqrestore (&request->rq_lock, flags);
1085                 EXIT;
1086                 /* NB reply buffer not freed here */
1087                 return;
1088         }
1089
1090         LASSERT(!request->rq_replied);         /* callback hasn't completed */
1091         spin_unlock_irqrestore (&request->rq_lock, flags);
1092
1093         rc = PtlMDUnlink (request->rq_reply_md_h);
1094         switch (rc) {
1095         default:
1096                 LBUG ();
1097
1098         case PTL_OK:                            /* unlinked before completion */
1099                 LASSERT(request->rq_receiving_reply);
1100                 LASSERT(!request->rq_replied);
1101                 spin_lock_irqsave (&request->rq_lock, flags);
1102                 request->rq_receiving_reply = 0;
1103                 spin_unlock_irqrestore (&request->rq_lock, flags);
1104                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1105                 request->rq_repmsg = NULL;
1106                 EXIT;
1107                 return;
1108
1109         case PTL_MD_INUSE:                      /* callback in progress */
1110                 for (;;) {
1111                         /* Network access will complete in finite time but
1112                          * the timeout lets us CERROR for visibility */
1113                         struct l_wait_info lwi = LWI_TIMEOUT(10*HZ, NULL, NULL);
1114
1115                         rc = l_wait_event (request->rq_reply_waitq,
1116                                            request->rq_replied, &lwi);
1117                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
1118                         if (rc == 0) {
1119                                 spin_lock_irqsave (&request->rq_lock, flags);
1120                                 /* Ensure the callback has completed scheduling
1121                                  * me and taken its hands off the request */
1122                                 spin_unlock_irqrestore(&request->rq_lock,flags);
1123                                 break;
1124                         }
1125
1126                         CERROR ("Unexpectedly long timeout: req %p\n", request);
1127                 }
1128                 /* fall through */
1129
1130         case PTL_INV_MD:                        /* callback completed */
1131                 LASSERT(!request->rq_receiving_reply);
1132                 LASSERT(request->rq_replied);
1133                 EXIT;
1134                 return;
1135         }
1136         /* Not Reached */
1137 }
1138
1139 /* caller must hold imp->imp_lock */
1140 void ptlrpc_free_committed(struct obd_import *imp)
1141 {
1142         struct list_head *tmp, *saved;
1143         struct ptlrpc_request *req;
1144         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1145         ENTRY;
1146
1147         LASSERT(imp != NULL);
1148
1149 #ifdef CONFIG_SMP
1150         LASSERT(spin_is_locked(&imp->imp_lock));
1151 #endif
1152
1153         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1154                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1155
1156         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1157                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1158
1159                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1160                 LASSERT(req != last_req);
1161                 last_req = req;
1162
1163                 if (req->rq_import_generation < imp->imp_generation) {
1164                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1165                         GOTO(free_req, 0);
1166                 }
1167
1168                 if (req->rq_replay) {
1169                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1170                         continue;
1171                 }
1172
1173                 /* not yet committed */
1174                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1175                         DEBUG_REQ(D_HA, req, "stopping search");
1176                         break;
1177                 }
1178
1179                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1180                           imp->imp_peer_committed_transno);
1181 free_req:
1182                 if (req->rq_commit_cb != NULL)
1183                         req->rq_commit_cb(req);
1184                 list_del_init(&req->rq_replay_list);
1185                 __ptlrpc_req_finished(req, 1);
1186         }
1187
1188         EXIT;
1189         return;
1190 }
1191
1192 void ptlrpc_cleanup_client(struct obd_import *imp)
1193 {
1194         ENTRY;
1195         EXIT;
1196         return;
1197 }
1198
1199 void ptlrpc_resend_req(struct ptlrpc_request *req)
1200 {
1201         unsigned long flags;
1202
1203         DEBUG_REQ(D_HA, req, "going to resend");
1204         req->rq_reqmsg->handle.cookie = 0;
1205         req->rq_status = -EAGAIN;
1206
1207         spin_lock_irqsave (&req->rq_lock, flags);
1208         req->rq_resend = 1;
1209         req->rq_timedout = 0;
1210         if (req->rq_set != NULL)
1211                 wake_up (&req->rq_set->set_waitq);
1212         else
1213                 wake_up(&req->rq_reply_waitq);
1214         spin_unlock_irqrestore (&req->rq_lock, flags);
1215 }
1216
1217 /* XXX: this function and rq_status are currently unused */
1218 void ptlrpc_restart_req(struct ptlrpc_request *req)
1219 {
1220         unsigned long flags;
1221
1222         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1223         req->rq_status = -ERESTARTSYS;
1224
1225         spin_lock_irqsave (&req->rq_lock, flags);
1226         req->rq_restart = 1;
1227         req->rq_timedout = 0;
1228         if (req->rq_set != NULL)
1229                 wake_up (&req->rq_set->set_waitq);
1230         else
1231                 wake_up(&req->rq_reply_waitq);
1232         spin_unlock_irqrestore (&req->rq_lock, flags);
1233 }
1234
1235 static int expired_request(void *data)
1236 {
1237         struct ptlrpc_request *req = data;
1238         ENTRY;
1239
1240         RETURN(ptlrpc_expire_one_request(req));
1241 }
1242
1243 static void interrupted_request(void *data)
1244 {
1245         unsigned long flags;
1246
1247         struct ptlrpc_request *req = data;
1248         DEBUG_REQ(D_HA, req, "request interrupted");
1249         spin_lock_irqsave (&req->rq_lock, flags);
1250         req->rq_intr = 1;
1251         spin_unlock_irqrestore (&req->rq_lock, flags);
1252 }
1253
1254 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1255 {
1256         ENTRY;
1257         atomic_inc(&req->rq_refcount);
1258         RETURN(req);
1259 }
1260
1261 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1262                                       struct obd_import *imp)
1263 {
1264         struct list_head *tmp;
1265
1266 #ifdef CONFIG_SMP
1267         LASSERT(spin_is_locked(&imp->imp_lock));
1268 #endif
1269
1270         /* don't re-add requests that have been replayed */
1271         if (!list_empty(&req->rq_replay_list))
1272                 return;
1273
1274         LASSERT(imp->imp_replayable);
1275         /* Balanced in ptlrpc_free_committed, usually. */
1276         ptlrpc_request_addref(req);
1277         list_for_each_prev(tmp, &imp->imp_replay_list) {
1278                 struct ptlrpc_request *iter =
1279                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1280
1281                 /* We may have duplicate transnos if we create and then
1282                  * open a file, or for closes retained to match creating
1283                  * opens, so use req->rq_xid as a secondary key.
1284                  * (See bugs 684, 685, and 428.)
1285                  * XXX no longer needed, but all opens need transnos!
1286                  */
1287                 if (iter->rq_transno > req->rq_transno)
1288                         continue;
1289
1290                 if (iter->rq_transno == req->rq_transno) {
1291                         LASSERT(iter->rq_xid != req->rq_xid);
1292                         if (iter->rq_xid > req->rq_xid)
1293                                 continue;
1294                 }
1295
1296                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1297                 return;
1298         }
1299
1300         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1301 }
1302
1303 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1304 {
1305         int rc = 0;
1306         int brc;
1307         struct l_wait_info lwi;
1308         struct obd_import *imp = req->rq_import;
1309         unsigned long flags;
1310         int timeout = 0;
1311         ENTRY;
1312
1313         LASSERT(req->rq_set == NULL);
1314         LASSERT(!req->rq_receiving_reply);
1315
1316         /* for distributed debugging */
1317         req->rq_reqmsg->status = current->pid;
1318         LASSERT(imp->imp_obd != NULL);
1319         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1320                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1321                imp->imp_obd->obd_uuid.uuid,
1322                req->rq_reqmsg->status, req->rq_xid,
1323                imp->imp_connection->c_peer.peer_ni->pni_name,
1324                imp->imp_connection->c_peer.peer_nid,
1325                req->rq_reqmsg->opc);
1326
1327         /* Mark phase here for a little debug help */
1328         req->rq_phase = RQ_PHASE_RPC;
1329
1330         spin_lock_irqsave(&imp->imp_lock, flags);
1331         req->rq_import_generation = imp->imp_generation;
1332 restart:
1333         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1334                 list_del(&req->rq_list);
1335
1336                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1337                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1338
1339                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1340                           current->comm, 
1341                           ptlrpc_import_state_name(req->rq_send_state), 
1342                           ptlrpc_import_state_name(imp->imp_state));
1343                 lwi = LWI_INTR(interrupted_request, req);
1344                 rc = l_wait_event(req->rq_reply_waitq,
1345                                   (req->rq_send_state == imp->imp_state ||
1346                                    req->rq_err),
1347                                   &lwi);
1348                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1349                           current->comm, 
1350                           ptlrpc_import_state_name(imp->imp_state), 
1351                           ptlrpc_import_state_name(req->rq_send_state),
1352                           req->rq_err);
1353
1354                 spin_lock_irqsave(&imp->imp_lock, flags);
1355                 list_del_init(&req->rq_list);
1356
1357                 if (req->rq_err) {
1358                         rc = -EIO;
1359                 } 
1360                 else if (req->rq_intr) {
1361                         rc = -EINTR;
1362                 }
1363                 else {
1364                         GOTO(restart, rc);
1365                 }
1366         } 
1367
1368         if (rc != 0) {
1369                 list_del_init(&req->rq_list);
1370                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1371                 req->rq_status = rc; // XXX this ok?
1372                 GOTO(out, rc);
1373         }
1374
1375         if (req->rq_resend) {
1376                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1377
1378                 if (req->rq_bulk != NULL)
1379                         ptlrpc_unregister_bulk (req);
1380
1381                 DEBUG_REQ(D_HA, req, "resending: ");
1382         }
1383
1384         /* XXX this is the same as ptlrpc_set_wait */
1385         LASSERT(list_empty(&req->rq_list));
1386         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1387         spin_unlock_irqrestore(&imp->imp_lock, flags);
1388
1389         rc = ptl_send_rpc(req);
1390         if (rc) {
1391                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1392                 timeout = 1;
1393         } else {
1394                 timeout = MAX(req->rq_timeout * HZ, 1);
1395                 DEBUG_REQ(D_NET, req, "-- sleeping");
1396         }
1397         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1398                                req);
1399         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1400         DEBUG_REQ(D_NET, req, "-- done sleeping");
1401
1402         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1403                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1404                imp->imp_obd->obd_uuid.uuid,
1405                req->rq_reqmsg->status, req->rq_xid,
1406                imp->imp_connection->c_peer.peer_ni->pni_name,
1407                imp->imp_connection->c_peer.peer_nid,
1408                req->rq_reqmsg->opc);
1409
1410         spin_lock_irqsave(&imp->imp_lock, flags);
1411         list_del_init(&req->rq_list);
1412         spin_unlock_irqrestore(&imp->imp_lock, flags);
1413
1414         /* If the reply was received normally, this just grabs the spinlock
1415          * (ensuring the reply callback has returned), sees that
1416          * req->rq_receiving_reply is clear and returns. */
1417         ptlrpc_unregister_reply (req);
1418
1419         if (req->rq_err)
1420                 GOTO(out, rc = -EIO);
1421
1422         /* Resend if we need to, unless we were interrupted. */
1423         if (req->rq_resend && !req->rq_intr) {
1424                 /* ...unless we were specifically told otherwise. */
1425                 if (req->rq_no_resend)
1426                         GOTO(out, rc = -ETIMEDOUT);
1427                 spin_lock_irqsave(&imp->imp_lock, flags);
1428                 goto restart;
1429         }
1430
1431         if (req->rq_intr) {
1432                 /* Should only be interrupted if we timed out. */
1433                 if (!req->rq_timedout)
1434                         DEBUG_REQ(D_ERROR, req,
1435                                   "rq_intr set but rq_timedout not");
1436                 GOTO(out, rc = -EINTR);
1437         }
1438
1439         if (req->rq_timedout) {                 /* non-recoverable timeout */
1440                 GOTO(out, rc = -ETIMEDOUT);
1441         }
1442
1443         if (!req->rq_replied) {
1444                 /* How can this be? -eeb */
1445                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1446                 LBUG();
1447                 GOTO(out, rc = req->rq_status);
1448         }
1449
1450         rc = after_reply (req);
1451         /* NB may return +ve success rc */
1452         if (req->rq_resend) {
1453                 spin_lock_irqsave(&imp->imp_lock, flags);
1454                 goto restart;
1455         }
1456
1457  out:
1458         if (req->rq_bulk != NULL) {
1459                 if (rc >= 0) {                  /* success so far */
1460                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1461                         brc = l_wait_event(req->rq_reply_waitq,
1462                                            ptlrpc_bulk_complete(req->rq_bulk),
1463                                            &lwi);
1464                         if (brc != 0) {
1465                                 LASSERT(brc == -ETIMEDOUT);
1466                                 CERROR ("Timed out waiting for bulk\n");
1467                                 rc = brc;
1468                         }
1469                 }
1470                 if (rc < 0)
1471                         ptlrpc_unregister_bulk (req);
1472         }
1473
1474         LASSERT(!req->rq_receiving_reply);
1475         req->rq_phase = RQ_PHASE_INTERPRET;
1476         RETURN(rc);
1477 }
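
/*
 * Rough sketch of a typical synchronous caller of ptlrpc_queue_wait()
 * (illustrative only: SOME_OPCODE, the hypothetical body structs and the
 * buffer/reply handling are placeholders, not code from any actual caller):
 *
 *      struct ptlrpc_request *req;
 *      int size = sizeof(struct some_req_body);        (hypothetical)
 *      int repsize = sizeof(struct some_rep_body);     (hypothetical)
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, SOME_OPCODE, 1, &size, NULL);
 *      if (req == NULL)
 *              RETURN(-ENOMEM);
 *      ... fill in the request buffer ...
 *      req->rq_replen = lustre_msg_size(1, &repsize);
 *
 *      rc = ptlrpc_queue_wait(req);
 *      if (rc == 0) {
 *              ... unpack and swab the reply body ...
 *      }
 *      ptlrpc_req_finished(req);
 */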
1478
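/* Per-request replay bookkeeping, stashed in rq_async_args for the duration
 * of the replay: the original send-state and reply status are saved here so
 * ptlrpc_replay_interpret() can restore them once the replayed reply has
 * been processed. */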
1479 struct ptlrpc_replay_async_args {
1480         int praa_old_state;
1481         int praa_old_status;
1482 };
1483
1484 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1485                                     void * data, int rc)
1486 {
1487         struct ptlrpc_replay_async_args *aa = data;
1488         struct obd_import *imp = req->rq_import;
1489         unsigned long flags;
1490
1491         atomic_dec(&imp->imp_replay_inflight);
1492
1493 #if SWAB_PARANOIA
1494         /* Clear reply swab mask; this is a new reply in sender's byte order */
1495         req->rq_rep_swab_mask = 0;
1496 #endif
1497         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
1498         if (rc) {
1499                 CERROR("unpack_rep failed: %d\n", rc);
1500                 GOTO(out, rc = -EPROTO);
1501         }
1502
1503         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1504             req->rq_repmsg->status == -ENOTCONN) 
1505                 GOTO(out, rc = req->rq_repmsg->status);
1506
1507         /* The transno had better not change over replay. */
1508         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1509
1510         DEBUG_REQ(D_HA, req, "got rep");
1511
1512         /* let the callback do fixups, possibly updating fields in the request itself */
1513         if (req->rq_replay_cb)
1514                 req->rq_replay_cb(req);
1515
1516         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1517                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1518                           req->rq_repmsg->status, aa->praa_old_status);
1519         } else {
1520                 /* Put it back for re-replay. */
1521                 req->rq_repmsg->status = aa->praa_old_status;
1522         }
1523
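        /* Record how far replay has progressed, so the import recovery code
         * can continue from the right transaction number. */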
1524         spin_lock_irqsave(&imp->imp_lock, flags);
1525         imp->imp_last_replay_transno = req->rq_transno;
1526         spin_unlock_irqrestore(&imp->imp_lock, flags);
1527
1528         /* continue with recovery */
1529         rc = ptlrpc_import_recovery_state_machine(imp);
1530  out:
1531         req->rq_send_state = aa->praa_old_state;
1532         
1533         if (rc != 0)
1534                 /* this replay failed, so restart recovery */
1535                 ptlrpc_connect_import(imp, NULL);
1536
1537         RETURN(rc);
1538 }
1539
1540
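/* Queue one request for replay via ptlrpcd.  The import must already be in
 * the REPLAY state; the reply is handled asynchronously by
 * ptlrpc_replay_interpret() above, which also restarts recovery on failure. */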
1541 int ptlrpc_replay_req(struct ptlrpc_request *req)
1542 {
1543         struct ptlrpc_replay_async_args *aa;
1544         ENTRY;
1545
1546         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1547
1548         /* Not handling automatic bulk replay yet (or ever?) */
1549         LASSERT(req->rq_bulk == NULL);
1550
1551         DEBUG_REQ(D_HA, req, "REPLAY");
1552
1553         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1554         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1555         memset(aa, 0, sizeof *aa);
1556
1557         /* Prepare request to be resent with ptlrpcd */
1558         aa->praa_old_state = req->rq_send_state;
1559         req->rq_send_state = LUSTRE_IMP_REPLAY;
1560         req->rq_phase = RQ_PHASE_NEW;
1561         /*
1562          * Q: "How can a req get on the replay list if it wasn't replied?"
1563          * A: "If we failed during the replay of this request, it will still
1564          *     be on the list, but rq_replied will have been reset to 0."
1565          */
1566         if (req->rq_replied) {
1567                 aa->praa_old_status = req->rq_repmsg->status;
1568                 req->rq_status = 0;
1569                 req->rq_replied = 0;
1570         }
1571
1572         req->rq_interpret_reply = ptlrpc_replay_interpret;
1573         atomic_inc(&req->rq_import->imp_replay_inflight);
1574         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1575
1576         ptlrpcd_add_req(req);
1577         RETURN(0);
1578 }
1579
1580 void ptlrpc_abort_inflight(struct obd_import *imp)
1581 {
1582         unsigned long flags;
1583         struct list_head *tmp, *n;
1584         ENTRY;
1585
1586         /* Make sure that no new requests get processed for this import.
1587          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1588          * this flag and then putting requests on sending_list or delayed_list.
1589          */
1590         spin_lock_irqsave(&imp->imp_lock, flags);
1591
1592         /* XXX locking?  Maybe we should remove each request with the list
1593          * locked?  Also, how do we know if the requests on the list are
1594          * being freed at this time?
1595          */
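        /* Wake anyone blocked on an old-generation request: setting rq_err
         * makes the synchronous wait path above bail out with -EIO, and
         * set-based waiters notice the change on their set waitq. */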
1596         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1597                 struct ptlrpc_request *req =
1598                         list_entry(tmp, struct ptlrpc_request, rq_list);
1599
1600                 DEBUG_REQ(D_HA, req, "inflight");
1601
1602                 spin_lock (&req->rq_lock);
1603                 if (req->rq_import_generation < imp->imp_generation) {
1604                         req->rq_err = 1;
1605                         if (req->rq_set != NULL)
1606                                 wake_up(&req->rq_set->set_waitq);
1607                         else
1608                                 wake_up(&req->rq_reply_waitq);
1609                 }
1610                 spin_unlock (&req->rq_lock);
1611         }
1612
1613         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1614                 struct ptlrpc_request *req =
1615                         list_entry(tmp, struct ptlrpc_request, rq_list);
1616
1617                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1618
1619                 spin_lock (&req->rq_lock);
1620                 if (req->rq_import_generation < imp->imp_generation) {
1621                         req->rq_err = 1;
1622                         if (req->rq_set != NULL)
1623                                 wake_up(&req->rq_set->set_waitq);
1624                         else
1625                                 wake_up(&req->rq_reply_waitq);
1626                 }
1627                 spin_unlock (&req->rq_lock);
1628         }
1629
1630         /* Last chance to free reqs left on the replay list, but we
1631          * will still leak reqs that haven't committed.  */
1632         if (imp->imp_replayable)
1633                 ptlrpc_free_committed(imp);
1634
1635         spin_unlock_irqrestore(&imp->imp_lock, flags);
1636
1637         EXIT;
1638 }
1639
1640 static __u64 ptlrpc_last_xid = 0;
1641 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1642
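/* XIDs identify individual requests on the wire (replies echo them back);
 * hand them out under a spinlock so concurrent senders always see distinct,
 * monotonically increasing values. */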
1643 __u64 ptlrpc_next_xid(void)
1644 {
1645         __u64 tmp;
1646         spin_lock(&ptlrpc_last_xid_lock);
1647         tmp = ++ptlrpc_last_xid;
1648         spin_unlock(&ptlrpc_last_xid_lock);
1649         return tmp;
1650 }
1651
1652