1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_pages = npages;
96         desc->bd_page_count = 0;
97         desc->bd_md_h = PTL_HANDLE_NONE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc, so it is freed when she frees herself */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155 #ifdef __KERNEL__
156         ptl_kiov_t *kiov = &desc->bd_iov[desc->bd_page_count];
157 #else
158         struct iovec *iov = &desc->bd_iov[desc->bd_page_count];
159 #endif
160         LASSERT(desc->bd_page_count < desc->bd_max_pages);
161         LASSERT(page != NULL);
162         LASSERT(pageoffset >= 0);
163         LASSERT(len > 0);
164         LASSERT(pageoffset + len <= PAGE_SIZE);
165
166 #ifdef __KERNEL__
167         kiov->kiov_page   = page;
168         kiov->kiov_offset = pageoffset;
169         kiov->kiov_len    = len;
170 #else
171         iov->iov_base = page->addr + pageoffset;
172         iov->iov_len  = len;
173 #endif
174         desc->bd_page_count++;
175         desc->bd_nob += len;
176 }
177
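/*
 * Illustrative sketch, not part of the original file: how a client-side
 * caller might attach a bulk descriptor to a request using the helpers
 * above.  The page array "pages", its length "npages" and the portal
 * "bulk_portal" are hypothetical stand-ins for whatever the real caller
 * uses.
 */
static __attribute__((unused)) struct ptlrpc_bulk_desc *
example_attach_bulk_sink(struct ptlrpc_request *req, struct page **pages,
                         int npages, int bulk_portal)
{
        struct ptlrpc_bulk_desc *desc;
        int i;

        /* ask the peer to PUT data into our pages (a read-style transfer) */
        desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, bulk_portal);
        if (desc == NULL)
                return NULL;

        /* whole pages here; partial pages would pass an offset and length */
        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);

        /* req now owns desc; it is freed along with the request */
        return desc;
}
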
178 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
179 {
180         ENTRY;
181
182         LASSERT(desc != NULL);
183         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
184         LASSERT(!desc->bd_network_rw);         /* network hands off */
185         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
186         if (desc->bd_export)
187                 class_export_put(desc->bd_export);
188         else
189                 class_import_put(desc->bd_import);
190
191         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
192                                 bd_iov[desc->bd_max_pages]));
193         EXIT;
194 }
195
196 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
197                                        int count, int *lengths, char **bufs)
198 {
199         struct ptlrpc_request *request;
200         int rc;
201         ENTRY;
202
203         LASSERT((unsigned long)imp > 0x1000);
204
205         OBD_ALLOC(request, sizeof(*request));
206         if (!request) {
207                 CERROR("request allocation out of memory\n");
208                 RETURN(NULL);
209         }
210
211         rc = lustre_pack_request(request, count, lengths, bufs);
212         if (rc) {
213                 CERROR("cannot pack request %d\n", rc);
214                 OBD_FREE(request, sizeof(*request));
215                 RETURN(NULL);
216         }
217
218         if (imp->imp_server_timeout)
219                 request->rq_timeout = obd_timeout / 2;
220         else
221                 request->rq_timeout = obd_timeout;
222         request->rq_send_state = LUSTRE_IMP_FULL;
223         request->rq_type = PTL_RPC_MSG_REQUEST;
224         request->rq_import = class_import_get(imp);
225
226         request->rq_req_cbid.cbid_fn  = request_out_callback;
227         request->rq_req_cbid.cbid_arg = request;
228
229         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
230         request->rq_reply_cbid.cbid_arg = request;
231         
232         request->rq_phase = RQ_PHASE_NEW;
233
234         /* XXX FIXME bug 249 */
235         request->rq_request_portal = imp->imp_client->cli_request_portal;
236         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
237
238         spin_lock_init(&request->rq_lock);
239         INIT_LIST_HEAD(&request->rq_list);
240         INIT_LIST_HEAD(&request->rq_replay_list);
241         INIT_LIST_HEAD(&request->rq_set_chain);
242         init_waitqueue_head(&request->rq_reply_waitq);
243         request->rq_xid = ptlrpc_next_xid();
244         atomic_set(&request->rq_refcount, 1);
245
246         request->rq_reqmsg->opc = opcode;
247         request->rq_reqmsg->flags = 0;
248
249         RETURN(request);
250 }
251
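/*
 * Illustrative sketch, not part of the original file: a minimal synchronous
 * caller built on ptlrpc_prep_req().  The opcode "my_opc" and the single
 * request buffer are hypothetical; real callers pack whatever buffers their
 * opcode needs and size the expected reply accordingly.  The prototypes for
 * ptlrpc_queue_wait() and ptlrpc_req_finished(), defined later in this file,
 * are assumed to come from the ptlrpc headers.
 */
static __attribute__((unused)) int
example_sync_rpc(struct obd_import *imp, int my_opc, char *body, int body_len)
{
        struct ptlrpc_request *req;
        int size[1];
        char *bufs[1];
        int rc;

        size[0] = body_len;
        bufs[0] = body;

        req = ptlrpc_prep_req(imp, my_opc, 1, size, bufs);
        if (req == NULL)
                return -ENOMEM;

        /* expect only an empty lustre_msg back in this sketch */
        req->rq_replen = lustre_msg_size(0, NULL);

        rc = ptlrpc_queue_wait(req);    /* send and wait for the reply */

        ptlrpc_req_finished(req);       /* drop our reference; freed at zero */
        return rc;
}
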
252 struct ptlrpc_request_set *ptlrpc_prep_set(void)
253 {
254         struct ptlrpc_request_set *set;
255
256         OBD_ALLOC(set, sizeof *set);
257         if (!set)
258                 RETURN(NULL);
259         INIT_LIST_HEAD(&set->set_requests);
260         init_waitqueue_head(&set->set_waitq);
261         set->set_remaining = 0;
262         spin_lock_init(&set->set_new_req_lock);
263         INIT_LIST_HEAD(&set->set_new_requests);
264
265         RETURN(set);
266 }
267
268 /* Finish with this set; opposite of prep_set. */
269 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
270 {
271         struct list_head *tmp;
272         struct list_head *next;
273         int               expected_phase;
274         int               n = 0;
275         ENTRY;
276
277         /* Requests on the set should either all be completed, or all be new */
278         expected_phase = (set->set_remaining == 0) ?
279                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
280         list_for_each (tmp, &set->set_requests) {
281                 struct ptlrpc_request *req =
282                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
283
284                 LASSERT(req->rq_phase == expected_phase);
285                 n++;
286         }
287
288         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
289
290         list_for_each_safe(tmp, next, &set->set_requests) {
291                 struct ptlrpc_request *req =
292                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
293                 list_del_init(&req->rq_set_chain);
294
295                 LASSERT(req->rq_phase == expected_phase);
296
297                 if (req->rq_phase == RQ_PHASE_NEW) {
298
299                         if (req->rq_interpret_reply != NULL) {
300                                 int (*interpreter)(struct ptlrpc_request *,
301                                                    void *, int) =
302                                         req->rq_interpret_reply;
303
304                                 /* higher level (i.e. LOV) failed;
305                                  * let the sub reqs clean up */
306                                 req->rq_status = -EBADR;
307                                 interpreter(req, &req->rq_async_args,
308                                             req->rq_status);
309                         }
310                         set->set_remaining--;
311                 }
312
313                 req->rq_set = NULL;
314                 ptlrpc_req_finished (req);
315         }
316
317         LASSERT(set->set_remaining == 0);
318
319         OBD_FREE(set, sizeof(*set));
320         EXIT;
321 }
322
323 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
324                         struct ptlrpc_request *req)
325 {
326         /* The set takes over the caller's request reference */
327         list_add_tail(&req->rq_set_chain, &set->set_requests);
328         req->rq_set = set;
329         set->set_remaining++;
330 }
331
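/*
 * Illustrative sketch, not part of the original file: driving several
 * requests through one set.  "reqs" and "nreqs" are hypothetical; each
 * request is assumed to have been built with ptlrpc_prep_req() by the
 * caller.  The set takes over the caller's reference on each request;
 * ptlrpc_set_wait() is defined later in this file, so its prototype is
 * assumed to come from the ptlrpc headers.
 */
static __attribute__((unused)) int
example_send_set(struct ptlrpc_request **reqs, int nreqs)
{
        struct ptlrpc_request_set *set;
        int i;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        for (i = 0; i < nreqs; i++)
                ptlrpc_set_add_req(set, reqs[i]);   /* set now owns reqs[i] */

        /* sends anything still NEW, then sleeps until all complete */
        rc = ptlrpc_set_wait(set);

        /* drops the set's reference on every request */
        ptlrpc_set_destroy(set);
        return rc;
}
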
332 /* Locked so that many callers can add things; the context that owns the set
333  * is supposed to notice these and move them into the set proper. */
334 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
335                             struct ptlrpc_request *req)
336 {
337         unsigned long flags;
338         spin_lock_irqsave(&set->set_new_req_lock, flags);
339         /* The set takes over the caller's request reference */
340         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
341         req->rq_set = set;
342         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
343 }
344
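/*
 * Illustrative sketch, not part of the original file: one way the context
 * that owns the set might notice requests queued by ptlrpc_set_add_new_req()
 * and move them into the set proper, as the comment above describes.  The
 * real daemon code that does this lives elsewhere; this only shows the
 * intended pattern using functions from this file.
 */
static __attribute__((unused)) void
example_drain_new_reqs(struct ptlrpc_request_set *set)
{
        struct list_head *pos, *next;
        unsigned long flags;

        spin_lock_irqsave(&set->set_new_req_lock, flags);
        list_for_each_safe(pos, next, &set->set_new_requests) {
                struct ptlrpc_request *req =
                        list_entry(pos, struct ptlrpc_request, rq_set_chain);

                list_del_init(&req->rq_set_chain);
                /* re-links onto set_requests and bumps set_remaining */
                ptlrpc_set_add_req(set, req);
        }
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}
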
345 /*
346  * Based on the current state of the import, determine if the request
347  * can be sent, is an error, or should be delayed.
348  *
349  * Returns true if this request should be delayed. If false and
350  * *status is non-zero, then the request cannot be sent and *status is the
351  * error code.  If false and *status is 0, then the request can be sent.
352  *
353  * The imp->imp_lock must be held.
354  */
355 static int ptlrpc_import_delay_req(struct obd_import *imp, 
356                                    struct ptlrpc_request *req, int *status)
357 {
358         int delay = 0;
359         ENTRY;
360
361         LASSERT (status != NULL);
362         *status = 0;
363
364         if (imp->imp_state == LUSTRE_IMP_NEW) {
365                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
366                 *status = -EIO;
367                 LBUG();
368         }
369         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
370                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
371                 *status = -EIO;
372         }
373         /*
374          * If the import has been invalidated (such as by an OST failure), the
375          * request must fail with -EIO.  
376          */
377         else if (imp->imp_invalid) {
378                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
379                 *status = -EIO;
380         } 
381         else if (req->rq_import_generation != imp->imp_generation) {
382                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
383                 *status = -EIO;
384         } 
385         else if (req->rq_send_state != imp->imp_state) {
386                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
387                     || req->rq_no_delay) 
388                         *status = -EWOULDBLOCK;
389                 else
390                         delay = 1;
391         }
392
393         RETURN(delay);
394 }
395
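/*
 * Illustrative sketch, not part of the original file: the calling pattern
 * for ptlrpc_import_delay_req(), mirroring its use in ptlrpc_send_new_req()
 * and ptlrpc_check_set() below.  imp_lock must be held across the call; a
 * true return means "park the request on imp_delayed_list", a non-zero
 * *status means "fail the request with that error", and otherwise the
 * request may be sent.
 */
static __attribute__((unused)) int
example_classify_req(struct obd_import *imp, struct ptlrpc_request *req)
{
        unsigned long flags;
        int status = 0;
        int delay;

        spin_lock_irqsave(&imp->imp_lock, flags);
        delay = ptlrpc_import_delay_req(imp, req, &status);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        if (delay)
                return 1;               /* wait for recovery, then resend */
        if (status != 0)
                return status;          /* error the request out (negative errno) */
        return 0;                       /* OK to send now */
}
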
396 static int ptlrpc_check_reply(struct ptlrpc_request *req)
397 {
398         unsigned long flags;
399         int rc = 0;
400         ENTRY;
401
402         /* serialise with network callback */
403         spin_lock_irqsave (&req->rq_lock, flags);
404
405         if (req->rq_replied) {
406                 DEBUG_REQ(D_NET, req, "REPLIED:");
407                 GOTO(out, rc = 1);
408         }
409
410         if (req->rq_err) {
411                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
412                 GOTO(out, rc = 1);
413         }
414
415         if (req->rq_resend) {
416                 DEBUG_REQ(D_ERROR, req, "RESEND:");
417                 GOTO(out, rc = 1);
418         }
419
420         if (req->rq_restart) {
421                 DEBUG_REQ(D_ERROR, req, "RESTART:");
422                 GOTO(out, rc = 1);
423         }
424         EXIT;
425  out:
426         spin_unlock_irqrestore (&req->rq_lock, flags);
427         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
428         return rc;
429 }
430
431 static int ptlrpc_check_status(struct ptlrpc_request *req)
432 {
433         int err;
434         ENTRY;
435
436         err = req->rq_repmsg->status;
437         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
438                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
439                           err);
440                 RETURN(err < 0 ? err : -EINVAL);
441         }
442
443         if (err < 0) {
444                 DEBUG_REQ(D_INFO, req, "status is %d", err);
445         } else if (err > 0) {
446                 /* XXX: translate this error from net to host */
447                 DEBUG_REQ(D_INFO, req, "status is %d", err);
448         }
449
450         RETURN(err);
451 }
452
453 static int after_reply(struct ptlrpc_request *req)
454 {
455         unsigned long flags;
456         struct obd_import *imp = req->rq_import;
457         int rc;
458         ENTRY;
459
460         LASSERT(!req->rq_receiving_reply);
461
462         /* NB Until this point, the whole of the incoming message,
463          * including buflens, status etc is in the sender's byte order. */
464
465 #if SWAB_PARANOIA
466         /* Clear reply swab mask; this is a new reply in sender's byte order */
467         req->rq_rep_swab_mask = 0;
468 #endif
469         LASSERT (req->rq_nob_received <= req->rq_replen);
470         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
471         if (rc) {
472                 CERROR("unpack_rep failed: %d\n", rc);
473                 RETURN(-EPROTO);
474         }
475
476         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
477             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
478                 CERROR("invalid packet type received (type=%u)\n",
479                        req->rq_repmsg->type);
480                 RETURN(-EPROTO);
481         }
482
483         /* Store transno in reqmsg for replay. */
484         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
485
486         rc = ptlrpc_check_status(req);
487
488         /* Either we've been evicted, or the server has failed for
489          * some reason. Try to reconnect, and if that fails, punt to the
490          * upcall. */
491         if (rc == -ENOTCONN) {
492                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
493                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
494                         RETURN(-ENOTCONN);
495                 }
496
497                 ptlrpc_request_handle_notconn(req);
498
499                 RETURN(rc);
500         }
501
502         if (req->rq_import->imp_replayable) {
503                 spin_lock_irqsave(&imp->imp_lock, flags);
504                 if (req->rq_replay || req->rq_transno != 0)
505                         ptlrpc_retain_replayable_request(req, imp);
506                 else if (req->rq_commit_cb != NULL)
507                         req->rq_commit_cb(req);
508
509                 if (req->rq_transno > imp->imp_max_transno)
510                         imp->imp_max_transno = req->rq_transno;
511
512                 /* Replay-enabled imports return commit-status information. */
513                 if (req->rq_repmsg->last_committed)
514                         imp->imp_peer_committed_transno =
515                                 req->rq_repmsg->last_committed;
516                 ptlrpc_free_committed(imp);
517                 spin_unlock_irqrestore(&imp->imp_lock, flags);
518         }
519
520         RETURN(rc);
521 }
522
523 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
524 {
525         struct obd_import     *imp;
526         unsigned long          flags;
527         int rc;
528         ENTRY;
529
530         LASSERT(req->rq_phase == RQ_PHASE_NEW);
531         req->rq_phase = RQ_PHASE_RPC;
532
533         imp = req->rq_import;
534         spin_lock_irqsave(&imp->imp_lock, flags);
535
536         if (imp->imp_invalid) {
537                 spin_unlock_irqrestore(&imp->imp_lock, flags);
538                 req->rq_status = -EIO;
539                 req->rq_phase = RQ_PHASE_INTERPRET;
540                 RETURN(-EIO);
541         }
542
543         req->rq_import_generation = imp->imp_generation;
544
545         if (ptlrpc_import_delay_req(imp, req, &rc)) {
546                 spin_lock (&req->rq_lock);
547                 req->rq_waiting = 1;
548                 spin_unlock (&req->rq_lock);
549
550                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
551                           "(%s != %s)",
552                           req->rq_reqmsg->status, 
553                           ptlrpc_import_state_name(req->rq_send_state),
554                           ptlrpc_import_state_name(imp->imp_state));
555                 LASSERT(list_empty (&req->rq_list));
556
557                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
558                 spin_unlock_irqrestore(&imp->imp_lock, flags);
559                 RETURN(0);
560         }
561
562         if (rc != 0) {
563                 spin_unlock_irqrestore(&imp->imp_lock, flags);
564                 req->rq_status = rc;
565                 req->rq_phase = RQ_PHASE_INTERPRET;
566                 RETURN(rc);
567         }
568
569         /* XXX this is the same as ptlrpc_queue_wait */
570         LASSERT(list_empty(&req->rq_list));
571         list_add_tail(&req->rq_list, &imp->imp_sending_list);
572         spin_unlock_irqrestore(&imp->imp_lock, flags);
573
574         req->rq_reqmsg->status = current->pid;
575         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
576                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
577                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
578                req->rq_xid,
579                imp->imp_connection->c_peer.peer_ni->pni_name,
580                imp->imp_connection->c_peer.peer_nid,
581                req->rq_reqmsg->opc);
582
583         rc = ptl_send_rpc(req);
584         if (rc) {
585                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
586                 req->rq_timeout = 1;
587                 RETURN(rc);
588         }
589         RETURN(0);
590 }
591
592 int ptlrpc_check_set(struct ptlrpc_request_set *set)
593 {
594         unsigned long flags;
595         struct list_head *tmp;
596         int force_timer_recalc = 0;
597         ENTRY;
598
599         if (set->set_remaining == 0)
600                 RETURN(1);
601
602         list_for_each(tmp, &set->set_requests) {
603                 struct ptlrpc_request *req =
604                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
605                 struct obd_import *imp = req->rq_import;
606                 int rc = 0;
607
608                 if (req->rq_phase == RQ_PHASE_NEW &&
609                     ptlrpc_send_new_req(req)) {
610                         force_timer_recalc = 1;
611                 }
612
613                 if (!(req->rq_phase == RQ_PHASE_RPC ||
614                       req->rq_phase == RQ_PHASE_BULK ||
615                       req->rq_phase == RQ_PHASE_INTERPRET ||
616                       req->rq_phase == RQ_PHASE_COMPLETE)) {
617                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
618                         LBUG();
619                 }
620
621                 if (req->rq_phase == RQ_PHASE_COMPLETE)
622                         continue;
623
624                 if (req->rq_phase == RQ_PHASE_INTERPRET)
625                         GOTO(interpret, req->rq_status);
626
627                 if (req->rq_err) {
628                         ptlrpc_unregister_reply(req);
629                         if (req->rq_status == 0)
630                                 req->rq_status = -EIO;
631                         req->rq_phase = RQ_PHASE_INTERPRET;
632
633                         spin_lock_irqsave(&imp->imp_lock, flags);
634                         list_del_init(&req->rq_list);
635                         spin_unlock_irqrestore(&imp->imp_lock, flags);
636
637                         GOTO(interpret, req->rq_status);
638                 }
639
640                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
641                  * will only be set after rq_timedout, but the oig waiting
642                  * path sets rq_intr irrespective of whether ptlrpcd has
643                  * seen a timeout.  our policy is to only interpret 
644                  * interrupted rpcs after they have timed out */
645                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
646                         /* NB could be on delayed list */
647                         ptlrpc_unregister_reply(req);
648                         req->rq_status = -EINTR;
649                         req->rq_phase = RQ_PHASE_INTERPRET;
650
651                         spin_lock_irqsave(&imp->imp_lock, flags);
652                         list_del_init(&req->rq_list);
653                         spin_unlock_irqrestore(&imp->imp_lock, flags);
654
655                         GOTO(interpret, req->rq_status);
656                 }
657
658                 if (req->rq_phase == RQ_PHASE_RPC) {
659                         if (req->rq_waiting || req->rq_resend) {
660                                 int status;
661
662                                 LASSERT (!ptlrpc_client_receiving_reply(req));
663                                 LASSERT (req->rq_bulk == NULL ||
664                                          !ptlrpc_bulk_active(req->rq_bulk));
665
666                                 spin_lock_irqsave(&imp->imp_lock, flags);
667
668                                 if (ptlrpc_import_delay_req(imp, req, &status)) {
669                                         spin_unlock_irqrestore(&imp->imp_lock,
670                                                                flags);
671                                         continue;
672                                 } 
673
674                                 list_del_init(&req->rq_list);
675                                 if (status != 0)  {
676                                         req->rq_status = status;
677                                         req->rq_phase = RQ_PHASE_INTERPRET;
678                                         spin_unlock_irqrestore(&imp->imp_lock,
679                                                                flags);
680                                         GOTO(interpret, req->rq_status);
681                                 }
682                                 if (req->rq_no_resend) {
683                                         req->rq_status = -ENOTCONN;
684                                         req->rq_phase = RQ_PHASE_INTERPRET;
685                                         spin_unlock_irqrestore(&imp->imp_lock,
686                                                                flags);
687                                         GOTO(interpret, req->rq_status);
688                                 }
689                                 list_add_tail(&req->rq_list,
690                                               &imp->imp_sending_list);
691
692                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
693
694                                 req->rq_waiting = 0;
695                                 if (req->rq_resend) {
696                                         lustre_msg_add_flags(req->rq_reqmsg,
697                                                              MSG_RESENT);
698                                         ptlrpc_unregister_reply(req);
699                                         if (req->rq_bulk) {
700                                                 __u64 old_xid = req->rq_xid;
701
702                                                 /* ensure previous bulk fails */
703                                                 req->rq_xid = ptlrpc_next_xid();
704                                                 CDEBUG(D_HA, "resend bulk "
705                                                        "old x"LPU64
706                                                        " new x"LPU64"\n",
707                                                        old_xid, req->rq_xid);
708                                         }
709                                 }
710
711                                 rc = ptl_send_rpc(req);
712                                 if (rc) {
713                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
714                                                   rc);
715                                         force_timer_recalc = 1;
716                                         req->rq_timeout = 0;
717                                 }
718                                 /* need to reset the timeout */
719                                 force_timer_recalc = 1;
720                         }
721
722                         /* Still waiting for a reply? */
723                         if (ptlrpc_client_receiving_reply(req))
724                                 continue;
725
726                         /* Did we actually receive a reply? */
727                         if (!ptlrpc_client_replied(req))
728                                 continue;
729
730                         spin_lock_irqsave(&imp->imp_lock, flags);
731                         list_del_init(&req->rq_list);
732                         spin_unlock_irqrestore(&imp->imp_lock, flags);
733
734                         req->rq_status = after_reply(req);
735                         if (req->rq_resend) {
736                                 /* Add this req to the delayed list so
737                                    it can be errored if the import is
738                                    evicted after recovery. */
739                                 spin_lock_irqsave (&req->rq_lock, flags);
740                                 list_add_tail(&req->rq_list, 
741                                               &imp->imp_delayed_list);
742                                 spin_unlock_irqrestore(&req->rq_lock, flags);
743                                 continue;
744                         }
745
746                         /* If there is no bulk associated with this request,
747                          * then we're done and should let the interpreter
748                          * process the reply.  Similarly if the RPC returned
749                          * an error, and therefore the bulk will never arrive.
750                          */
751                         if (req->rq_bulk == NULL || req->rq_status != 0) {
752                                 req->rq_phase = RQ_PHASE_INTERPRET;
753                                 GOTO(interpret, req->rq_status);
754                         }
755
756                         req->rq_phase = RQ_PHASE_BULK;
757                 }
758
759                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
760                 if (ptlrpc_bulk_active(req->rq_bulk))
761                         continue;
762
763                 if (!req->rq_bulk->bd_success) {
764                         /* The RPC reply arrived OK, but the bulk screwed
765                          * up!  Dead weird since the server told us the RPC
766                          * was good after getting the REPLY for her GET or
767                          * the ACK for her PUT. */
768                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
769                         LBUG();
770                 }
771
772                 req->rq_phase = RQ_PHASE_INTERPRET;
773
774         interpret:
775                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
776                 LASSERT(!req->rq_receiving_reply);
777
778                 ptlrpc_unregister_reply(req);
779                 if (req->rq_bulk != NULL)
780                         ptlrpc_unregister_bulk (req);
781
782                 req->rq_phase = RQ_PHASE_COMPLETE;
783
784                 if (req->rq_interpret_reply != NULL) {
785                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
786                                 req->rq_interpret_reply;
787                         req->rq_status = interpreter(req, &req->rq_async_args,
788                                                      req->rq_status);
789                 }
790
791                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
792                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
793                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
794                        req->rq_xid,
795                        imp->imp_connection->c_peer.peer_ni->pni_name,
796                        imp->imp_connection->c_peer.peer_nid,
797                        req->rq_reqmsg->opc);
798
799                 set->set_remaining--;
800         }
801
802         /* If we hit an error, we want to recover promptly. */
803         RETURN(set->set_remaining == 0 || force_timer_recalc);
804 }
805
806 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
807 {
808         unsigned long      flags;
809         struct obd_import *imp = req->rq_import;
810         ENTRY;
811
812         DEBUG_REQ(D_ERROR, req, "timeout");
813
814         spin_lock_irqsave (&req->rq_lock, flags);
815         req->rq_timedout = 1;
816         spin_unlock_irqrestore (&req->rq_lock, flags);
817
818         ptlrpc_unregister_reply (req);
819
820         if (req->rq_bulk != NULL)
821                 ptlrpc_unregister_bulk (req);
822
823         if (imp == NULL) {
824                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
825                 RETURN(1);
826         }
827
828         /* The DLM server doesn't want recovery run on its imports. */
829         if (imp->imp_dlm_fake)
830                 RETURN(1);
831
832         /* If this request is for recovery or other primordial tasks,
833          * then error it out here. */
834         if (req->rq_send_state != LUSTRE_IMP_FULL || 
835             imp->imp_obd->obd_no_recov) {
836                 spin_lock_irqsave (&req->rq_lock, flags);
837                 req->rq_status = -ETIMEDOUT;
838                 req->rq_err = 1;
839                 spin_unlock_irqrestore (&req->rq_lock, flags);
840                 RETURN(1);
841         }
842
843         ptlrpc_fail_import(imp, req->rq_import_generation);
844
845         RETURN(0);
846 }
847
848 int ptlrpc_expired_set(void *data)
849 {
850         struct ptlrpc_request_set *set = data;
851         struct list_head          *tmp;
852         time_t                     now = LTIME_S (CURRENT_TIME);
853         ENTRY;
854
855         LASSERT(set != NULL);
856
857         /* A timeout expired; see which reqs it applies to... */
858         list_for_each (tmp, &set->set_requests) {
859                 struct ptlrpc_request *req =
860                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
861
862                 /* request in-flight? */
863                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
864                        && !req->rq_resend) ||
865                       (req->rq_phase == RQ_PHASE_BULK)))
866                         continue;
867
868                 if (req->rq_timedout ||           /* already dealt with */
869                     req->rq_sent + req->rq_timeout > now) /* not expired */
870                         continue;
871
872                 /* deal with this guy */
873                 ptlrpc_expire_one_request (req);
874         }
875
876         /* When waiting for a whole set, we always want to break out of the
877          * sleep so we can recalculate the timeout, or enable interrupts
878          * iff everyone's timed out.
879          */
880         RETURN(1);
881 }
882
883 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
884 {
885         unsigned long flags;
886         spin_lock_irqsave(&req->rq_lock, flags);
887         req->rq_intr = 1;
888         spin_unlock_irqrestore(&req->rq_lock, flags);
889 }
890
891 void ptlrpc_interrupted_set(void *data)
892 {
893         struct ptlrpc_request_set *set = data;
894         struct list_head *tmp;
895
896         LASSERT(set != NULL);
897         CERROR("INTERRUPTED SET %p\n", set);
898
899         list_for_each(tmp, &set->set_requests) {
900                 struct ptlrpc_request *req =
901                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
902
903                 if (req->rq_phase != RQ_PHASE_RPC)
904                         continue;
905
906                 ptlrpc_mark_interrupted(req);
907         }
908 }
909
910 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
911 {
912         struct list_head      *tmp;
913         time_t                 now = LTIME_S(CURRENT_TIME);
914         time_t                 deadline;
915         int                    timeout = 0;
916         struct ptlrpc_request *req;
917         ENTRY;
918
919         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
920
921         list_for_each(tmp, &set->set_requests) {
922                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
923
924                 /* request in-flight? */
925                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
926                       (req->rq_phase == RQ_PHASE_BULK)))
927                         continue;
928
929                 if (req->rq_timedout)   /* already timed out */
930                         continue;
931
932                 deadline = req->rq_sent + req->rq_timeout;
933                 if (deadline <= now)    /* actually expired already */
934                         timeout = 1;    /* ASAP */
935                 else if (timeout == 0 || timeout > deadline - now)
936                         timeout = deadline - now;
937         }
938         RETURN(timeout);
939 }
940                 
941
942 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
943 {
944         struct list_head      *tmp;
945         struct ptlrpc_request *req;
946         struct l_wait_info     lwi;
947         int                    rc, timeout;
948         ENTRY;
949
950         LASSERT(!list_empty(&set->set_requests));
951         list_for_each(tmp, &set->set_requests) {
952                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
953                 if (req->rq_phase == RQ_PHASE_NEW)
954                         (void)ptlrpc_send_new_req(req);
955         }
956
957         do {
958                 timeout = ptlrpc_set_next_timeout(set);
959
960                 /* wait until all complete, interrupted, or an in-flight
961                  * req times out */
962                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
963                        set, timeout);
964                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
965                                        ptlrpc_expired_set, 
966                                        ptlrpc_interrupted_set, set);
967                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
968
969                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
970
971                 /* -EINTR => all requests have been flagged rq_intr so next
972                  * check completes.
973                  * -ETIMEDOUT => someone timed out.  When all reqs have
974                  * timed out, signals are enabled allowing completion with
975                  * EINTR.
976                  * I don't really care if we go once more round the loop in
977                  * the error cases -eeb. */
978         } while (rc != 0);
979
980         LASSERT(set->set_remaining == 0);
981
982         rc = 0;
983         list_for_each(tmp, &set->set_requests) {
984                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
985
986                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
987                 if (req->rq_status != 0)
988                         rc = req->rq_status;
989         }
990
991         if (set->set_interpret != NULL) {
992                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
993                         set->set_interpret;
994                 rc = interpreter (set, &set->set_args, rc);
995         }
996
997         RETURN(rc);
998 }
999
1000 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1001 {
1002         ENTRY;
1003         if (request == NULL) {
1004                 EXIT;
1005                 return;
1006         }
1007
1008         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1009         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1010         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1011         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1012
1013         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1014          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1015         if (request->rq_import != NULL) {
1016                 unsigned long flags = 0;
1017                 if (!locked)
1018                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1019                 list_del_init(&request->rq_replay_list);
1020                 if (!locked)
1021                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1022                                                flags);
1023         }
1024         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1025
1026         if (atomic_read(&request->rq_refcount) != 0) {
1027                 DEBUG_REQ(D_ERROR, request,
1028                           "freeing request with nonzero refcount");
1029                 LBUG();
1030         }
1031
1032         if (request->rq_repmsg != NULL) {
1033                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1034                 request->rq_repmsg = NULL;
1035         }
1036         if (request->rq_reqmsg != NULL) {
1037                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1038                 request->rq_reqmsg = NULL;
1039         }
1040         if (request->rq_export != NULL) {
1041                 class_export_put(request->rq_export);
1042                 request->rq_export = NULL;
1043         }
1044         if (request->rq_import != NULL) {
1045                 class_import_put(request->rq_import);
1046                 request->rq_import = NULL;
1047         }
1048         if (request->rq_bulk != NULL)
1049                 ptlrpc_free_bulk(request->rq_bulk);
1050
1051         OBD_FREE(request, sizeof(*request));
1052         EXIT;
1053 }
1054
1055 void ptlrpc_free_req(struct ptlrpc_request *request)
1056 {
1057         __ptlrpc_free_req(request, 0);
1058 }
1059
1060 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1061 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1062 {
1063         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1064         (void)__ptlrpc_req_finished(request, 1);
1065 }
1066
1067 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1068 {
1069         ENTRY;
1070         if (request == NULL)
1071                 RETURN(1);
1072
1073         if (request == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a) ||
1074             request->rq_reqmsg == (void *)(unsigned long)(0x5a5a5a5a5a5a5a5a)) {
1075                 CERROR("dereferencing freed request (bug 575)\n");
1076                 LBUG();
1077                 RETURN(1);
1078         }
1079
1080         DEBUG_REQ(D_INFO, request, "refcount now %u",
1081                   atomic_read(&request->rq_refcount) - 1);
1082
1083         if (atomic_dec_and_test(&request->rq_refcount)) {
1084                 __ptlrpc_free_req(request, locked);
1085                 RETURN(1);
1086         }
1087
1088         RETURN(0);
1089 }
1090
1091 void ptlrpc_req_finished(struct ptlrpc_request *request)
1092 {
1093         __ptlrpc_req_finished(request, 0);
1094 }
1095
1096 /* Disengage the client's reply buffer from the network
1097  * NB does _NOT_ unregister any client-side bulk.
1098  * IDEMPOTENT, but _not_ safe against concurrent callers.
1099  * The request owner (i.e. the thread doing the I/O) must call...
1100  */
1101 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1102 {
1103         int                rc;
1104         wait_queue_head_t *wq;
1105         struct l_wait_info lwi;
1106
1107         LASSERT(!in_interrupt ());             /* might sleep */
1108
1109         if (!ptlrpc_client_receiving_reply(request))
1110                 return;
1111
1112         rc = PtlMDUnlink (request->rq_reply_md_h);
1113         if (rc == PTL_INV_MD) {
1114                 LASSERT (!ptlrpc_client_receiving_reply(request));
1115                 return;
1116         }
1117         
1118         LASSERT (rc == PTL_OK);
1119
1120         if (request->rq_set != NULL)
1121                 wq = &request->rq_set->set_waitq;
1122         else
1123                 wq = &request->rq_reply_waitq;
1124
1125         for (;;) {
1126                 /* Network access will complete in finite time but the HUGE
1127                  * timeout lets us CWARN for visibility of sluggish NALs */
1128                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1129                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1130                 if (rc == 0)
1131                         return;
1132
1133                 LASSERT (rc == -ETIMEDOUT);
1134                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1135         }
1136 }
1137
1138 /* caller must hold imp->imp_lock */
1139 void ptlrpc_free_committed(struct obd_import *imp)
1140 {
1141         struct list_head *tmp, *saved;
1142         struct ptlrpc_request *req;
1143         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1144         ENTRY;
1145
1146         LASSERT(imp != NULL);
1147
1148         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1149
1150         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1151                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1152
1153         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1154                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1155
1156                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1157                 LASSERT(req != last_req);
1158                 last_req = req;
1159
1160                 if (req->rq_import_generation < imp->imp_generation) {
1161                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1162                         GOTO(free_req, 0);
1163                 }
1164
1165                 if (req->rq_replay) {
1166                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1167                         continue;
1168                 }
1169
1170                 /* not yet committed */
1171                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1172                         DEBUG_REQ(D_HA, req, "stopping search");
1173                         break;
1174                 }
1175
1176                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1177                           imp->imp_peer_committed_transno);
1178 free_req:
1179                 if (req->rq_commit_cb != NULL)
1180                         req->rq_commit_cb(req);
1181                 list_del_init(&req->rq_replay_list);
1182                 __ptlrpc_req_finished(req, 1);
1183         }
1184
1185         EXIT;
1186         return;
1187 }
1188
1189 void ptlrpc_cleanup_client(struct obd_import *imp)
1190 {
1191         ENTRY;
1192         EXIT;
1193         return;
1194 }
1195
1196 void ptlrpc_resend_req(struct ptlrpc_request *req)
1197 {
1198         unsigned long flags;
1199
1200         DEBUG_REQ(D_HA, req, "going to resend");
1201         req->rq_reqmsg->handle.cookie = 0;
1202         req->rq_status = -EAGAIN;
1203
1204         spin_lock_irqsave (&req->rq_lock, flags);
1205         req->rq_resend = 1;
1206         req->rq_timedout = 0;
1207         if (req->rq_bulk) {
1208                 __u64 old_xid = req->rq_xid;
1209                 
1210                 /* ensure previous bulk fails */
1211                 req->rq_xid = ptlrpc_next_xid();
1212                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1213                        old_xid, req->rq_xid);
1214         }
1215         ptlrpc_wake_client_req(req);
1216         spin_unlock_irqrestore (&req->rq_lock, flags);
1217 }
1218
1219 /* XXX: this function and rq_status are currently unused */
1220 void ptlrpc_restart_req(struct ptlrpc_request *req)
1221 {
1222         unsigned long flags;
1223
1224         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1225         req->rq_status = -ERESTARTSYS;
1226
1227         spin_lock_irqsave (&req->rq_lock, flags);
1228         req->rq_restart = 1;
1229         req->rq_timedout = 0;
1230         ptlrpc_wake_client_req(req);
1231         spin_unlock_irqrestore (&req->rq_lock, flags);
1232 }
1233
1234 static int expired_request(void *data)
1235 {
1236         struct ptlrpc_request *req = data;
1237         ENTRY;
1238
1239         RETURN(ptlrpc_expire_one_request(req));
1240 }
1241
1242 static void interrupted_request(void *data)
1243 {
1244         unsigned long flags;
1245
1246         struct ptlrpc_request *req = data;
1247         DEBUG_REQ(D_HA, req, "request interrupted");
1248         spin_lock_irqsave (&req->rq_lock, flags);
1249         req->rq_intr = 1;
1250         spin_unlock_irqrestore (&req->rq_lock, flags);
1251 }
1252
1253 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1254 {
1255         ENTRY;
1256         atomic_inc(&req->rq_refcount);
1257         RETURN(req);
1258 }
1259
1260 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1261                                       struct obd_import *imp)
1262 {
1263         struct list_head *tmp;
1264
1265         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1266
1267         /* don't re-add requests that have been replayed */
1268         if (!list_empty(&req->rq_replay_list))
1269                 return;
1270
1271         lustre_msg_add_flags(req->rq_reqmsg,
1272                              MSG_REPLAY);
1273
1274         LASSERT(imp->imp_replayable);
1275         /* Balanced in ptlrpc_free_committed, usually. */
1276         ptlrpc_request_addref(req);
1277         list_for_each_prev(tmp, &imp->imp_replay_list) {
1278                 struct ptlrpc_request *iter =
1279                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1280
1281                 /* We may have duplicate transnos if we create and then
1282                  * open a file, or for closes retained to match creating
1283                  * opens, so use req->rq_xid as a secondary key.
1284                  * (See bugs 684, 685, and 428.)
1285                  * XXX no longer needed, but all opens need transnos!
1286                  */
1287                 if (iter->rq_transno > req->rq_transno)
1288                         continue;
1289
1290                 if (iter->rq_transno == req->rq_transno) {
1291                         LASSERT(iter->rq_xid != req->rq_xid);
1292                         if (iter->rq_xid > req->rq_xid)
1293                                 continue;
1294                 }
1295
1296                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1297                 return;
1298         }
1299
1300         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1301 }
1302
1303 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1304 {
1305         int rc = 0;
1306         int brc;
1307         struct l_wait_info lwi;
1308         struct obd_import *imp = req->rq_import;
1309         unsigned long flags;
1310         int timeout = 0;
1311         ENTRY;
1312
1313         LASSERT(req->rq_set == NULL);
1314         LASSERT(!req->rq_receiving_reply);
1315
1316         /* for distributed debugging */
1317         req->rq_reqmsg->status = current->pid;
1318         LASSERT(imp->imp_obd != NULL);
1319         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1320                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1321                imp->imp_obd->obd_uuid.uuid,
1322                req->rq_reqmsg->status, req->rq_xid,
1323                imp->imp_connection->c_peer.peer_ni->pni_name,
1324                imp->imp_connection->c_peer.peer_nid,
1325                req->rq_reqmsg->opc);
1326
1327         /* Mark phase here for a little debug help */
1328         req->rq_phase = RQ_PHASE_RPC;
1329
1330         spin_lock_irqsave(&imp->imp_lock, flags);
1331         req->rq_import_generation = imp->imp_generation;
1332 restart:
1333         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1334                 list_del(&req->rq_list);
1335
1336                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1337                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1338
1339                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1340                           current->comm, 
1341                           ptlrpc_import_state_name(req->rq_send_state), 
1342                           ptlrpc_import_state_name(imp->imp_state));
1343                 lwi = LWI_INTR(interrupted_request, req);
1344                 rc = l_wait_event(req->rq_reply_waitq,
1345                                   (req->rq_send_state == imp->imp_state ||
1346                                    req->rq_err),
1347                                   &lwi);
1348                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1349                           current->comm, 
1350                           ptlrpc_import_state_name(imp->imp_state), 
1351                           ptlrpc_import_state_name(req->rq_send_state),
1352                           req->rq_err);
1353
1354                 spin_lock_irqsave(&imp->imp_lock, flags);
1355                 list_del_init(&req->rq_list);
1356
1357                 if (req->rq_err) {
1358                         rc = -EIO;
1359                 } 
1360                 else if (req->rq_intr) {
1361                         rc = -EINTR;
1362                 }
1363                 else if (req->rq_no_resend) {
1364                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1365                         GOTO(out, rc = -ETIMEDOUT);
1366                 }
1367                 else {
1368                         GOTO(restart, rc);
1369                 }
1370         } 
1371
1372         if (rc != 0) {
1373                 list_del_init(&req->rq_list);
1374                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1375                 req->rq_status = rc; // XXX this ok?
1376                 GOTO(out, rc);
1377         }
1378
1379         if (req->rq_resend) {
1380                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1381
1382                 if (req->rq_bulk != NULL)
1383                         ptlrpc_unregister_bulk (req);
1384
1385                 DEBUG_REQ(D_HA, req, "resending: ");
1386         }
1387
1388         /* XXX this is the same as ptlrpc_set_wait */
1389         LASSERT(list_empty(&req->rq_list));
1390         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1391         spin_unlock_irqrestore(&imp->imp_lock, flags);
1392
1393         rc = ptl_send_rpc(req);
1394         if (rc) {
1395                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1396                 timeout = 1;
1397         } else {
1398                 timeout = MAX(req->rq_timeout * HZ, 1);
1399                 DEBUG_REQ(D_NET, req, "-- sleeping");
1400         }
1401         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1402                                req);
1403         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1404         DEBUG_REQ(D_NET, req, "-- done sleeping");
1405
1406         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1407                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1408                imp->imp_obd->obd_uuid.uuid,
1409                req->rq_reqmsg->status, req->rq_xid,
1410                imp->imp_connection->c_peer.peer_ni->pni_name,
1411                imp->imp_connection->c_peer.peer_nid,
1412                req->rq_reqmsg->opc);
1413
1414         spin_lock_irqsave(&imp->imp_lock, flags);
1415         list_del_init(&req->rq_list);
1416         spin_unlock_irqrestore(&imp->imp_lock, flags);
1417
1418         /* If the reply was received normally, this just grabs the spinlock
1419          * (ensuring the reply callback has returned), sees that
1420          * req->rq_receiving_reply is clear and returns. */
1421         ptlrpc_unregister_reply (req);
1422
1423         if (req->rq_err)
1424                 GOTO(out, rc = -EIO);
1425
1426         /* Resend if we need to, unless we were interrupted. */
1427         if (req->rq_resend && !req->rq_intr) {
1428                 /* ...unless we were specifically told otherwise. */
1429                 if (req->rq_no_resend)
1430                         GOTO(out, rc = -ETIMEDOUT);
1431                 spin_lock_irqsave(&imp->imp_lock, flags);
1432                 goto restart;
1433         }
1434
1435         if (req->rq_intr) {
1436                 /* Should only be interrupted if we timed out. */
1437                 if (!req->rq_timedout)
1438                         DEBUG_REQ(D_ERROR, req,
1439                                   "rq_intr set but rq_timedout not");
1440                 GOTO(out, rc = -EINTR);
1441         }
1442
1443         if (req->rq_timedout) {                 /* non-recoverable timeout */
1444                 GOTO(out, rc = -ETIMEDOUT);
1445         }
1446
1447         if (!req->rq_replied) {
1448                 /* How can this be? -eeb */
1449                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1450                 LBUG();
1451                 GOTO(out, rc = req->rq_status);
1452         }
1453
1454         rc = after_reply (req);
1455         /* NB may return +ve success rc */
1456         if (req->rq_resend) {
1457                 spin_lock_irqsave(&imp->imp_lock, flags);
1458                 goto restart;
1459         }
1460
1461  out:
1462         if (req->rq_bulk != NULL) {
1463                 if (rc >= 0) {
1464                         /* Success so far.  Note that anything going wrong
1465                          * with bulk now is EXTREMELY strange, since the
1466                          * server must have believed that the bulk
1467                          * transferred OK before it replied with success
1468                          * to us. */
1469                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1470                         brc = l_wait_event(req->rq_reply_waitq,
1471                                            !ptlrpc_bulk_active(req->rq_bulk),
1472                                            &lwi);
1473                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1474                         if (brc != 0) {
1475                                 LASSERT(brc == -ETIMEDOUT);
1476                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1477                                 rc = brc;
1478                         } else if (!req->rq_bulk->bd_success) {
1479                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1480                                 rc = -EIO;
1481                         }
1482                 }
1483                 if (rc < 0)
1484                         ptlrpc_unregister_bulk (req);
1485         }
1486
1487         LASSERT(!req->rq_receiving_reply);
1488         req->rq_phase = RQ_PHASE_INTERPRET;
1489         RETURN(rc);
1490 }
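/*
 * Illustrative caller sketch (an editor's assumption, not code from this
 * file): the synchronous send path that ends above (ptlrpc_queue_wait())
 * is normally driven like the fragment below.  ptlrpc_prep_req(),
 * lustre_msg_size() and ptlrpc_req_finished() are helpers provided
 * elsewhere in this tree; check their prototypes in the headers before
 * relying on the exact argument lists shown here.
 *
 *      int size = sizeof(struct obd_statfs);
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, OST_STATFS, 0, NULL, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      req->rq_replen = lustre_msg_size(1, &size);
 *
 *      rc = ptlrpc_queue_wait(req);   (sleeps until reply, error or timeout)
 *      if (rc == 0)
 *              ... use the unpacked reply in req->rq_repmsg ...
 *      ptlrpc_req_finished(req);      (drop the caller's reference)
 */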
1491
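/* State stashed in rq_async_args for the duration of an asynchronous
 * replay: the send state and reply status the request carried before it
 * was handed to ptlrpcd, so ptlrpc_replay_interpret() can restore them
 * when the replayed reply (or a timeout) comes back. */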
1492 struct ptlrpc_replay_async_args {
1493         int praa_old_state;
1494         int praa_old_status;
1495 };
1496
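/* ptlrpcd completion callback for a replayed request: re-checks and
 * unpacks the reply, lets rq_replay_cb adjust request/reply state,
 * records the replayed transno in imp_last_replay_transno and re-enters
 * the import recovery state machine; any failure restarts recovery by
 * reconnecting the import. */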
1497 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1498                                     void * data, int rc)
1499 {
1500         struct ptlrpc_replay_async_args *aa = data;
1501         struct obd_import *imp = req->rq_import;
1502         unsigned long flags;
1503
1504         atomic_dec(&imp->imp_replay_inflight);
1505         
1506         if (!req->rq_replied) {
1507                 CERROR("request replay timed out, restarting recovery\n");
1508                 GOTO(out, rc = -ETIMEDOUT);
1509         }
1510
1511 #if SWAB_PARANOIA
1512         /* Clear reply swab mask; this is a new reply in sender's byte order */
1513         req->rq_rep_swab_mask = 0;
1514 #endif
1515         LASSERT (req->rq_nob_received <= req->rq_replen);
1516         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1517         if (rc) {
1518                 CERROR("unpack_rep failed: %d\n", rc);
1519                 GOTO(out, rc = -EPROTO);
1520         }
1521
1522         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1523             req->rq_repmsg->status == -ENOTCONN) 
1524                 GOTO(out, rc = req->rq_repmsg->status);
1525
1526         /* The transno had better not change over replay. */
1527         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1528
1529         DEBUG_REQ(D_HA, req, "got rep");
1530
1531         /* let the callback do fixups, possibly including changes to the request itself */
1532         if (req->rq_replay_cb)
1533                 req->rq_replay_cb(req);
1534
1535         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1536                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1537                           req->rq_repmsg->status, aa->praa_old_status);
1538         } else {
1539                 /* Put it back for re-replay. */
1540                 req->rq_repmsg->status = aa->praa_old_status;
1541         }
1542
1543         spin_lock_irqsave(&imp->imp_lock, flags);
1544         imp->imp_last_replay_transno = req->rq_transno;
1545         spin_unlock_irqrestore(&imp->imp_lock, flags);
1546
1547         /* continue with recovery */
1548         rc = ptlrpc_import_recovery_state_machine(imp);
1549  out:
1550         req->rq_send_state = aa->praa_old_state;
1551         
1552         if (rc != 0)
1553                 /* this replay failed, so restart recovery */
1554                 ptlrpc_connect_import(imp, NULL);
1555
1556         RETURN(rc);
1557 }
1558
1559
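/* Queue one request from the replay list for resending through ptlrpcd.
 * The request is reset to RQ_PHASE_NEW with send state LUSTRE_IMP_REPLAY,
 * its previous state is saved in rq_async_args, and completion is handled
 * asynchronously by ptlrpc_replay_interpret() above; this function itself
 * always returns 0 once the request has been queued. */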
1560 int ptlrpc_replay_req(struct ptlrpc_request *req)
1561 {
1562         struct ptlrpc_replay_async_args *aa;
1563         ENTRY;
1564
1565         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1566
1567         /* Not handling automatic bulk replay yet (or ever?) */
1568         LASSERT(req->rq_bulk == NULL);
1569
1570         DEBUG_REQ(D_HA, req, "REPLAY");
1571
1572         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1573         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1574         memset(aa, 0, sizeof *aa);
1575
1576         /* Prepare request to be resent with ptlrpcd */
1577         aa->praa_old_state = req->rq_send_state;
1578         req->rq_send_state = LUSTRE_IMP_REPLAY;
1579         req->rq_phase = RQ_PHASE_NEW;
1580         /*
1581          * Q: "How can a req get on the replay list if it wasn't replied?"
1582          * A: "If we failed during the replay of this request, it will still
1583          *     be on the list, but rq_replied will have been reset to 0."
1584          */
1585         if (req->rq_replied) {
1586                 aa->praa_old_status = req->rq_repmsg->status;
1587                 req->rq_status = 0;
1588                 req->rq_replied = 0;
1589         }
1590
1591         req->rq_interpret_reply = ptlrpc_replay_interpret;
1592         atomic_inc(&req->rq_import->imp_replay_inflight);
1593         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1594
1595         ptlrpcd_add_req(req);
1596         RETURN(0);
1597 }
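/*
 * Illustrative sketch (an editor's assumption, not code from this file):
 * the import recovery state machine is expected to walk the import's
 * replay list and feed requests back through ptlrpc_replay_req() one at
 * a time, roughly as below.  The list head name (imp_replay_list) and the
 * rq_list linkage are assumptions; imp_last_replay_transno and
 * imp_replay_inflight are the fields actually used above.
 *
 *      struct list_head *tmp;
 *      struct ptlrpc_request *req;
 *      int rc = 0;
 *
 *      list_for_each(tmp, &imp->imp_replay_list) {
 *              req = list_entry(tmp, struct ptlrpc_request, rq_list);
 *              if (req->rq_transno > imp->imp_last_replay_transno) {
 *                      rc = ptlrpc_replay_req(req);
 *                      break;   (one at a time; interpret() resumes the walk)
 *              }
 *      }
 */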
1598
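/* Fail every in-flight or delayed request belonging to an older import
 * generation: mark it with rq_err and wake its waiter so the send paths
 * above return -EIO, then give ptlrpc_free_committed() a final pass over
 * the replay list for replayable imports. */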
1599 void ptlrpc_abort_inflight(struct obd_import *imp)
1600 {
1601         unsigned long flags;
1602         struct list_head *tmp, *n;
1603         ENTRY;
1604
1605         /* Make sure that no new requests get processed for this import.
1606          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1607          * this flag and then putting requests on sending_list or delayed_list.
1608          */
1609         spin_lock_irqsave(&imp->imp_lock, flags);
1610
1611         /* XXX locking?  Maybe we should remove each request with the list
1612          * locked?  Also, how do we know if the requests on the list are
1613          * being freed at this time?
1614          */
1615         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1616                 struct ptlrpc_request *req =
1617                         list_entry(tmp, struct ptlrpc_request, rq_list);
1618
1619                 DEBUG_REQ(D_HA, req, "inflight");
1620
1621                 spin_lock (&req->rq_lock);
1622                 if (req->rq_import_generation < imp->imp_generation) {
1623                         req->rq_err = 1;
1624                         ptlrpc_wake_client_req(req);
1625                 }
1626                 spin_unlock (&req->rq_lock);
1627         }
1628
1629         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1630                 struct ptlrpc_request *req =
1631                         list_entry(tmp, struct ptlrpc_request, rq_list);
1632
1633                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1634
1635                 spin_lock (&req->rq_lock);
1636                 if (req->rq_import_generation < imp->imp_generation) {
1637                         req->rq_err = 1;
1638                         ptlrpc_wake_client_req(req);
1639                 }
1640                 spin_unlock (&req->rq_lock);
1641         }
1642
1643         /* Last chance to free reqs left on the replay list, but we
1644          * will still leak reqs that haven't committed.  */
1645         if (imp->imp_replayable)
1646                 ptlrpc_free_committed(imp);
1647
1648         spin_unlock_irqrestore(&imp->imp_lock, flags);
1649
1650         EXIT;
1651 }
1652
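/* Client-wide XID counter: every outgoing request takes the next value
 * (typically "req->rq_xid = ptlrpc_next_xid();" in the request
 * preparation path), and the spinlock keeps the sequence monotonic
 * across all imports and threads. */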
1653 static __u64 ptlrpc_last_xid = 0;
1654 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1655
1656 __u64 ptlrpc_next_xid(void)
1657 {
1658         __u64 tmp;
1659         spin_lock(&ptlrpc_last_xid_lock);
1660         tmp = ++ptlrpc_last_xid;
1661         spin_unlock(&ptlrpc_last_xid_lock);
1662         return tmp;
1663 }
1664
1665