lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_iov = npages;
96         desc->bd_iov_count = 0;
97         desc->bd_md_h = PTL_INVALID_HANDLE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc, and free it when she frees herself */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
156         LASSERT(page != NULL);
157         LASSERT(pageoffset >= 0);
158         LASSERT(len > 0);
159         LASSERT(pageoffset + len <= PAGE_SIZE);
160
161         desc->bd_nob += len;
162
163         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
164 }
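
/* Usage sketch for client-side bulk (the request, pages, npages and portal
 * below are illustrative placeholders, not taken from this file):
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
 *      if (desc == NULL)
 *              handle allocation failure;
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * The descriptor becomes req->rq_bulk and is released by ptlrpc_free_bulk()
 * when the request itself is freed. */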
165
166 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
167 {
168         ENTRY;
169
170         LASSERT(desc != NULL);
171         LASSERT(desc->bd_iov_count != 0x5a5a5a5a); /* not freed already */
172         LASSERT(!desc->bd_network_rw);         /* network hands off or */
173         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
174         if (desc->bd_export)
175                 class_export_put(desc->bd_export);
176         else
177                 class_import_put(desc->bd_import);
178
179         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
180                                 bd_iov[desc->bd_max_iov]));
181         EXIT;
182 }
183
184 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
185                                        int count, int *lengths, char **bufs)
186 {
187         struct ptlrpc_request *request;
188         int rc;
189         ENTRY;
190
191         LASSERT((unsigned long)imp > 0x1000);
192
193         OBD_ALLOC(request, sizeof(*request));
194         if (!request) {
195                 CERROR("request allocation out of memory\n");
196                 RETURN(NULL);
197         }
198
199         rc = lustre_pack_request(request, count, lengths, bufs);
200         if (rc) {
201                 CERROR("cannot pack request %d\n", rc);
202                 OBD_FREE(request, sizeof(*request));
203                 RETURN(NULL);
204         }
205
206         if (imp->imp_server_timeout)
207                 request->rq_timeout = obd_timeout / 2;
208         else
209                 request->rq_timeout = obd_timeout;
210         request->rq_send_state = LUSTRE_IMP_FULL;
211         request->rq_type = PTL_RPC_MSG_REQUEST;
212         request->rq_import = class_import_get(imp);
213
214         request->rq_req_cbid.cbid_fn  = request_out_callback;
215         request->rq_req_cbid.cbid_arg = request;
216
217         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
218         request->rq_reply_cbid.cbid_arg = request;
219         
220         request->rq_phase = RQ_PHASE_NEW;
221
222         /* XXX FIXME bug 249 */
223         request->rq_request_portal = imp->imp_client->cli_request_portal;
224         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
225
226         spin_lock_init(&request->rq_lock);
227         INIT_LIST_HEAD(&request->rq_list);
228         INIT_LIST_HEAD(&request->rq_replay_list);
229         INIT_LIST_HEAD(&request->rq_set_chain);
230         init_waitqueue_head(&request->rq_reply_waitq);
231         request->rq_xid = ptlrpc_next_xid();
232         atomic_set(&request->rq_refcount, 1);
233
234         request->rq_reqmsg->opc = opcode;
235         request->rq_reqmsg->flags = 0;
236
237         RETURN(request);
238 }
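
/* Minimal synchronous usage sketch.  MY_OPCODE and the buffer size are
 * illustrative placeholders; setting rq_replen before sending follows the
 * usual caller convention elsewhere in the tree:
 *
 *      int size = sizeof(struct my_body);
 *      req = ptlrpc_prep_req(imp, MY_OPCODE, 1, &size, NULL);
 *      if (req == NULL)
 *              handle allocation failure;
 *      req->rq_replen = lustre_msg_size(1, &size);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 */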
239
240 struct ptlrpc_request_set *ptlrpc_prep_set(void)
241 {
242         struct ptlrpc_request_set *set;
243
244         OBD_ALLOC(set, sizeof *set);
245         if (!set)
246                 RETURN(NULL);
247         INIT_LIST_HEAD(&set->set_requests);
248         init_waitqueue_head(&set->set_waitq);
249         set->set_remaining = 0;
250         spin_lock_init(&set->set_new_req_lock);
251         INIT_LIST_HEAD(&set->set_new_requests);
252
253         RETURN(set);
254 }
255
256 /* Finish with this set; opposite of prep_set. */
257 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
258 {
259         struct list_head *tmp;
260         struct list_head *next;
261         int               expected_phase;
262         int               n = 0;
263         ENTRY;
264
265         /* Requests on the set should either all be completed, or all be new */
266         expected_phase = (set->set_remaining == 0) ?
267                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
268         list_for_each (tmp, &set->set_requests) {
269                 struct ptlrpc_request *req =
270                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
271
272                 LASSERT(req->rq_phase == expected_phase);
273                 n++;
274         }
275
276         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
277
278         list_for_each_safe(tmp, next, &set->set_requests) {
279                 struct ptlrpc_request *req =
280                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
281                 list_del_init(&req->rq_set_chain);
282
283                 LASSERT(req->rq_phase == expected_phase);
284
285                 if (req->rq_phase == RQ_PHASE_NEW) {
286
287                         if (req->rq_interpret_reply != NULL) {
288                                 int (*interpreter)(struct ptlrpc_request *,
289                                                    void *, int) =
290                                         req->rq_interpret_reply;
291
292                                 /* higher level (i.e. LOV) failed;
293                                  * let the sub reqs clean up */
294                                 req->rq_status = -EBADR;
295                                 interpreter(req, &req->rq_async_args,
296                                             req->rq_status);
297                         }
298                         set->set_remaining--;
299                 }
300
301                 req->rq_set = NULL;
302                 ptlrpc_req_finished (req);
303         }
304
305         LASSERT(set->set_remaining == 0);
306
307         OBD_FREE(set, sizeof(*set));
308         EXIT;
309 }
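
/* Request-set lifecycle, as implemented above (MY_OPCODE and size are
 * illustrative placeholders):
 *
 *      set = ptlrpc_prep_set();
 *      req = ptlrpc_prep_req(imp, MY_OPCODE, 1, &size, NULL);
 *      ptlrpc_set_add_req(set, req);   - the set takes over the reference
 *      rc = ptlrpc_set_wait(set);      - sends everything, waits for completion
 *      ptlrpc_set_destroy(set);        - drops each request's reference
 */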
310
311 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
312                         struct ptlrpc_request *req)
313 {
314         /* The set takes over the caller's request reference */
315         list_add_tail(&req->rq_set_chain, &set->set_requests);
316         req->rq_set = set;
317         set->set_remaining++;
318         atomic_inc(&req->rq_import->imp_inflight);
319 }
320
321 /* lock so many callers can add things, the context that owns the set
322  * is supposed to notice these and move them into the set proper. */
323 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
324                             struct ptlrpc_request *req)
325 {
326         unsigned long flags;
327         spin_lock_irqsave(&set->set_new_req_lock, flags);
328         /* The set takes over the caller's request reference */
329         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
330         req->rq_set = set;
331         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
332 }
333
334 /*
335  * Based on the current state of the import, determine if the request
336  * can be sent, is an error, or should be delayed.
337  *
338  * Returns true if this request should be delayed.  If it returns false
339  * and *status is non-zero, the request cannot be sent and *status is the
340  * error code.  If it returns false and *status is 0, the request can be sent.
341  *
342  * The imp->imp_lock must be held.
343  */
344 static int ptlrpc_import_delay_req(struct obd_import *imp, 
345                                    struct ptlrpc_request *req, int *status)
346 {
347         int delay = 0;
348         ENTRY;
349
350         LASSERT (status != NULL);
351         *status = 0;
352
353         if (imp->imp_state == LUSTRE_IMP_NEW) {
354                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
355                 *status = -EIO;
356                 LBUG();
357         }
358         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
359                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
360                 *status = -EIO;
361         }
362         /* allow CONNECT even if import is invalid */
363         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
364                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
365                 ;
366         }
367         /*
368          * If the import has been invalidated (such as by an OST failure), the
369          * request must fail with -EIO.  
370          */
371         else if (imp->imp_invalid) {
372                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
373                 *status = -EIO;
374         } 
375         else if (req->rq_import_generation != imp->imp_generation) {
376                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
377                 *status = -EIO;
378         } 
379         else if (req->rq_send_state != imp->imp_state) {
380                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
381                     || req->rq_no_delay) 
382                         *status = -EWOULDBLOCK;
383                 else
384                         delay = 1;
385         }
386
387         RETURN(delay);
388 }
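
/* Caller pattern, cf. ptlrpc_send_new_req() and ptlrpc_check_set() below:
 *
 *      if (ptlrpc_import_delay_req(imp, req, &rc)) {
 *              park the request on imp->imp_delayed_list;
 *      } else if (rc != 0) {
 *              fail the request with rq_status = rc;
 *      } else {
 *              move it to imp->imp_sending_list and send it;
 *      }
 */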
389
390 static int ptlrpc_check_reply(struct ptlrpc_request *req)
391 {
392         unsigned long flags;
393         int rc = 0;
394         ENTRY;
395
396         /* serialise with network callback */
397         spin_lock_irqsave (&req->rq_lock, flags);
398
399         if (req->rq_replied) {
400                 DEBUG_REQ(D_NET, req, "REPLIED:");
401                 GOTO(out, rc = 1);
402         }
403         
404         if (req->rq_net_err && !req->rq_timedout) {
405                 spin_unlock_irqrestore (&req->rq_lock, flags);
406                 rc = ptlrpc_expire_one_request(req); 
407                 spin_lock_irqsave (&req->rq_lock, flags);
408                 GOTO(out, rc);
409         }
410
411         if (req->rq_err) {
412                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
413                 GOTO(out, rc = 1);
414         }
415
416         if (req->rq_resend) {
417                 DEBUG_REQ(D_ERROR, req, "RESEND:");
418                 GOTO(out, rc = 1);
419         }
420
421         if (req->rq_restart) {
422                 DEBUG_REQ(D_ERROR, req, "RESTART:");
423                 GOTO(out, rc = 1);
424         }
425         EXIT;
426  out:
427         spin_unlock_irqrestore (&req->rq_lock, flags);
428         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
429         return rc;
430 }
431
432 static int ptlrpc_check_status(struct ptlrpc_request *req)
433 {
434         int err;
435         ENTRY;
436
437         err = req->rq_repmsg->status;
438         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
439                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
440                           err);
441                 RETURN(err < 0 ? err : -EINVAL);
442         }
443
444         if (err < 0) {
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         } else if (err > 0) {
447                 /* XXX: translate this error from net to host */
448                 DEBUG_REQ(D_INFO, req, "status is %d", err);
449         }
450
451         RETURN(err);
452 }
453
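/* Unpack and sanity-check a newly-arrived reply, record the transno for
 * replay, and, for replayable imports, retain the request or free committed
 * ones based on the server's last_committed.  Returns the reply status,
 * -EPROTO on a malformed reply, or -ENOTCONN (after triggering reconnect
 * handling where possible). */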
454 static int after_reply(struct ptlrpc_request *req)
455 {
456         unsigned long flags;
457         struct obd_import *imp = req->rq_import;
458         int rc;
459         ENTRY;
460
461         LASSERT(!req->rq_receiving_reply);
462
463         /* NB Until this point, the whole of the incoming message,
464          * including buflens, status etc is in the sender's byte order. */
465
466 #if SWAB_PARANOIA
467         /* Clear reply swab mask; this is a new reply in sender's byte order */
468         req->rq_rep_swab_mask = 0;
469 #endif
470         LASSERT (req->rq_nob_received <= req->rq_replen);
471         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
472         if (rc) {
473                 CERROR("unpack_rep failed: %d\n", rc);
474                 RETURN(-EPROTO);
475         }
476
477         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
478             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
479                 CERROR("invalid packet type received (type=%u)\n",
480                        req->rq_repmsg->type);
481                 RETURN(-EPROTO);
482         }
483
484         /* Store transno in reqmsg for replay. */
485         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
486
487         rc = ptlrpc_check_status(req);
488
489         /* Either we've been evicted, or the server has failed for
490          * some reason. Try to reconnect, and if that fails, punt to the
491          * upcall. */
492         if (rc == -ENOTCONN) {
493                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
494                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
495                         RETURN(-ENOTCONN);
496                 }
497
498                 ptlrpc_request_handle_notconn(req);
499
500                 RETURN(rc);
501         }
502
503         if (req->rq_import->imp_replayable) {
504                 spin_lock_irqsave(&imp->imp_lock, flags);
505                 if (req->rq_replay || req->rq_transno != 0)
506                         ptlrpc_retain_replayable_request(req, imp);
507                 else if (req->rq_commit_cb != NULL) {
508                         spin_unlock_irqrestore(&imp->imp_lock, flags);
509                         req->rq_commit_cb(req);
510                         spin_lock_irqsave(&imp->imp_lock, flags);
511                 }
512
513                 if (req->rq_transno > imp->imp_max_transno)
514                         imp->imp_max_transno = req->rq_transno;
515
516                 /* Replay-enabled imports return commit-status information. */
517                 if (req->rq_repmsg->last_committed)
518                         imp->imp_peer_committed_transno =
519                                 req->rq_repmsg->last_committed;
520                 ptlrpc_free_committed(imp);
521                 spin_unlock_irqrestore(&imp->imp_lock, flags);
522         }
523
524         RETURN(rc);
525 }
526
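/* First transmission of a set member: move the request from NEW to RPC,
 * then consult ptlrpc_import_delay_req() to either park it on
 * imp_delayed_list, fail it immediately, or queue it on imp_sending_list
 * and hand it to ptl_send_rpc(). */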
527 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
528 {
529         char                   str[PTL_NALFMT_SIZE];
530         struct obd_import     *imp;
531         unsigned long          flags;
532         int rc;
533         ENTRY;
534
535         LASSERT(req->rq_phase == RQ_PHASE_NEW);
536         req->rq_phase = RQ_PHASE_RPC;
537
538         imp = req->rq_import;
539         spin_lock_irqsave(&imp->imp_lock, flags);
540
541         req->rq_import_generation = imp->imp_generation;
542
543         if (ptlrpc_import_delay_req(imp, req, &rc)) {
544                 spin_lock (&req->rq_lock);
545                 req->rq_waiting = 1;
546                 spin_unlock (&req->rq_lock);
547
548                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
549                           "(%s != %s)",
550                           req->rq_reqmsg->status, 
551                           ptlrpc_import_state_name(req->rq_send_state),
552                           ptlrpc_import_state_name(imp->imp_state));
553                 LASSERT(list_empty (&req->rq_list));
554
555                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
556                 spin_unlock_irqrestore(&imp->imp_lock, flags);
557                 RETURN(0);
558         }
559
560         if (rc != 0) {
561                 spin_unlock_irqrestore(&imp->imp_lock, flags);
562                 req->rq_status = rc;
563                 req->rq_phase = RQ_PHASE_INTERPRET;
564                 RETURN(rc);
565         }
566
567         /* XXX this is the same as ptlrpc_queue_wait */
568         LASSERT(list_empty(&req->rq_list));
569         list_add_tail(&req->rq_list, &imp->imp_sending_list);
570         spin_unlock_irqrestore(&imp->imp_lock, flags);
571
572         req->rq_reqmsg->status = current->pid;
573         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
574                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
575                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
576                req->rq_xid,
577                imp->imp_connection->c_peer.peer_ni->pni_name,
578                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
579                req->rq_reqmsg->opc);
580
581         rc = ptl_send_rpc(req);
582         if (rc) {
583                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
584                 req->rq_net_err = 1;
585                 RETURN(rc);
586         }
587         RETURN(0);
588 }
589
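/* Drive every request in the set through its phases:
 * NEW -> RPC -> (BULK) -> INTERPRET -> COMPLETE.
 * Handles resend, network errors, timeouts and interrupts along the way,
 * calls rq_interpret_reply as each request completes, and returns non-zero
 * when the whole set is done or the caller should recalculate its timeout. */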
590 int ptlrpc_check_set(struct ptlrpc_request_set *set)
591 {
592         char str[PTL_NALFMT_SIZE];
593         unsigned long flags;
594         struct list_head *tmp;
595         int force_timer_recalc = 0;
596         ENTRY;
597
598         if (set->set_remaining == 0)
599                 RETURN(1);
600
601         list_for_each(tmp, &set->set_requests) {
602                 struct ptlrpc_request *req =
603                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
604                 struct obd_import *imp = req->rq_import;
605                 int rc = 0;
606
607                 if (req->rq_phase == RQ_PHASE_NEW &&
608                     ptlrpc_send_new_req(req)) {
609                         force_timer_recalc = 1;
610                 }
611
612                 if (!(req->rq_phase == RQ_PHASE_RPC ||
613                       req->rq_phase == RQ_PHASE_BULK ||
614                       req->rq_phase == RQ_PHASE_INTERPRET ||
615                       req->rq_phase == RQ_PHASE_COMPLETE)) {
616                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
617                         LBUG();
618                 }
619
620                 if (req->rq_phase == RQ_PHASE_COMPLETE)
621                         continue;
622
623                 if (req->rq_phase == RQ_PHASE_INTERPRET)
624                         GOTO(interpret, req->rq_status);
625
626                 if (req->rq_net_err && !req->rq_timedout)
627                         ptlrpc_expire_one_request(req); 
628
629                 if (req->rq_err) {
630                         ptlrpc_unregister_reply(req);
631                         if (req->rq_status == 0)
632                                 req->rq_status = -EIO;
633                         req->rq_phase = RQ_PHASE_INTERPRET;
634
635                         spin_lock_irqsave(&imp->imp_lock, flags);
636                         list_del_init(&req->rq_list);
637                         spin_unlock_irqrestore(&imp->imp_lock, flags);
638
639                         GOTO(interpret, req->rq_status);
640                 }
641
642                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
643                  * will only be set after rq_timedout, but the oig waiting
644                  * path sets rq_intr irrespective of whether ptlrpcd has
645                  * seen a timeout.  our policy is to only interpret 
646                  * interrupted rpcs after they have timed out */
647                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
648                         /* NB could be on delayed list */
649                         ptlrpc_unregister_reply(req);
650                         req->rq_status = -EINTR;
651                         req->rq_phase = RQ_PHASE_INTERPRET;
652
653                         spin_lock_irqsave(&imp->imp_lock, flags);
654                         list_del_init(&req->rq_list);
655                         spin_unlock_irqrestore(&imp->imp_lock, flags);
656
657                         GOTO(interpret, req->rq_status);
658                 }
659
660                 if (req->rq_phase == RQ_PHASE_RPC) {
661                         if (req->rq_waiting || req->rq_resend) {
662                                 int status;
663
664                                 spin_lock_irqsave(&imp->imp_lock, flags);
665
666                                 if (ptlrpc_import_delay_req(imp, req, &status)){
667                                         spin_unlock_irqrestore(&imp->imp_lock,
668                                                                flags);
669                                         continue;
670                                 } 
671
672                                 list_del_init(&req->rq_list);
673                                 if (status != 0)  {
674                                         req->rq_status = status;
675                                         req->rq_phase = RQ_PHASE_INTERPRET;
676                                         spin_unlock_irqrestore(&imp->imp_lock,
677                                                                flags);
678                                         GOTO(interpret, req->rq_status);
679                                 }
680                                 if (req->rq_no_resend) {
681                                         req->rq_status = -ENOTCONN;
682                                         req->rq_phase = RQ_PHASE_INTERPRET;
683                                         spin_unlock_irqrestore(&imp->imp_lock,
684                                                                flags);
685                                         GOTO(interpret, req->rq_status);
686                                 }
687                                 list_add_tail(&req->rq_list,
688                                               &imp->imp_sending_list);
689
690                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
691
692                                 req->rq_waiting = 0;
693                                 if (req->rq_resend) {
694                                         lustre_msg_add_flags(req->rq_reqmsg,
695                                                              MSG_RESENT);
696                                         ptlrpc_unregister_reply(req);
697                                         if (req->rq_bulk) {
698                                                 __u64 old_xid = req->rq_xid;
699
700                                                 ptlrpc_unregister_bulk (req);
701
702                                                 /* ensure previous bulk fails */
703                                                 req->rq_xid = ptlrpc_next_xid();
704                                                 CDEBUG(D_HA, "resend bulk "
705                                                        "old x"LPU64
706                                                        " new x"LPU64"\n",
707                                                        old_xid, req->rq_xid);
708                                         }
709                                 }
710
711                                 rc = ptl_send_rpc(req);
712                                 if (rc) {
713                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
714                                                   rc);
715                                         force_timer_recalc = 1;
716                                         req->rq_net_err = 1;
717                                 }
718                                 /* need to reset the timeout */
719                                 force_timer_recalc = 1;
720                         }
721
722                         /* Still waiting for a reply? */
723                         if (ptlrpc_client_receiving_reply(req))
724                                 continue;
725
726                         /* Did we actually receive a reply? */
727                         if (!ptlrpc_client_replied(req))
728                                 continue;
729
730                         spin_lock_irqsave(&imp->imp_lock, flags);
731                         list_del_init(&req->rq_list);
732                         spin_unlock_irqrestore(&imp->imp_lock, flags);
733
734                         req->rq_status = after_reply(req);
735                         if (req->rq_resend) {
736                                 /* Add this req to the delayed list so
737                                    it can be errored if the import is
738                                    evicted after recovery. */
739                                 spin_lock_irqsave (&req->rq_lock, flags);
740                                 list_add_tail(&req->rq_list, 
741                                               &imp->imp_delayed_list);
742                                 spin_unlock_irqrestore(&req->rq_lock, flags);
743                                 continue;
744                         }
745
746                         /* If there is no bulk associated with this request,
747                          * then we're done and should let the interpreter
748                          * process the reply.  Similarly if the RPC returned
749                          * an error, and therefore the bulk will never arrive.
750                          */
751                         if (req->rq_bulk == NULL || req->rq_status != 0) {
752                                 req->rq_phase = RQ_PHASE_INTERPRET;
753                                 GOTO(interpret, req->rq_status);
754                         }
755
756                         req->rq_phase = RQ_PHASE_BULK;
757                 }
758
759                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
760                 if (ptlrpc_bulk_active(req->rq_bulk))
761                         continue;
762
763                 if (!req->rq_bulk->bd_success) {
764                         /* The RPC reply arrived OK, but the bulk screwed
765                          * up!  Dead weird since the server told us the RPC
766                          * was good after getting the REPLY for her GET or
767                          * the ACK for her PUT. */
768                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
769                         LBUG();
770                 }
771
772                 req->rq_phase = RQ_PHASE_INTERPRET;
773
774         interpret:
775                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
776                 LASSERT(!req->rq_receiving_reply);
777
778                 ptlrpc_unregister_reply(req);
779                 if (req->rq_bulk != NULL)
780                         ptlrpc_unregister_bulk (req);
781
782                 req->rq_phase = RQ_PHASE_COMPLETE;
783
784                 if (req->rq_interpret_reply != NULL) {
785                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
786                                 req->rq_interpret_reply;
787                         req->rq_status = interpreter(req, &req->rq_async_args,
788                                                      req->rq_status);
789                 }
790
791                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
792                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
793                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
794                        req->rq_xid,
795                        imp->imp_connection->c_peer.peer_ni->pni_name,
796                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
797                        req->rq_reqmsg->opc);
798
799                 set->set_remaining--;
800
801                 atomic_dec(&imp->imp_inflight);
802                 wake_up(&imp->imp_recovery_waitq);
803         }
804
805         /* If we hit an error, we want to recover promptly. */
806         RETURN(set->set_remaining == 0 || force_timer_recalc);
807 }
808
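/* Time out a single request: flag rq_timedout and detach its reply and bulk
 * from the network.  Requests with no usable import (NULL or DLM "fake") or
 * that are not ordinary full-state traffic are errored out or abandoned
 * here; otherwise import recovery is started via ptlrpc_fail_import().
 * Returns non-zero when no recovery will be attempted. */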
809 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
810 {
811         unsigned long      flags;
812         struct obd_import *imp = req->rq_import;
813         ENTRY;
814
815         DEBUG_REQ(D_ERROR, req, "timeout");
816
817         spin_lock_irqsave (&req->rq_lock, flags);
818         req->rq_timedout = 1;
819         spin_unlock_irqrestore (&req->rq_lock, flags);
820
821         ptlrpc_unregister_reply (req);
822
823         if (req->rq_bulk != NULL)
824                 ptlrpc_unregister_bulk (req);
825
826         if (imp == NULL) {
827                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
828                 RETURN(1);
829         }
830
831         /* The DLM server doesn't want recovery run on its imports. */
832         if (imp->imp_dlm_fake)
833                 RETURN(1);
834
835         /* If this request is for recovery or other primordial tasks,
836          * then error it out here. */
837         if (req->rq_send_state != LUSTRE_IMP_FULL || 
838             imp->imp_obd->obd_no_recov) {
839                 spin_lock_irqsave (&req->rq_lock, flags);
840                 req->rq_status = -ETIMEDOUT;
841                 req->rq_err = 1;
842                 spin_unlock_irqrestore (&req->rq_lock, flags);
843                 RETURN(1);
844         }
845
846         ptlrpc_fail_import(imp, req->rq_import_generation);
847
848         RETURN(0);
849 }
850
851 int ptlrpc_expired_set(void *data)
852 {
853         struct ptlrpc_request_set *set = data;
854         struct list_head          *tmp;
855         time_t                     now = LTIME_S (CURRENT_TIME);
856         ENTRY;
857
858         LASSERT(set != NULL);
859
860         /* A timeout expired; see which reqs it applies to... */
861         list_for_each (tmp, &set->set_requests) {
862                 struct ptlrpc_request *req =
863                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
864
865                 /* request in-flight? */
866                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
867                        && !req->rq_resend) ||
868                       (req->rq_phase == RQ_PHASE_BULK)))
869                         continue;
870
871                 if (req->rq_timedout ||           /* already dealt with */
872                     req->rq_sent + req->rq_timeout > now) /* not expired */
873                         continue;
874
875                 /* deal with this guy */
876                 ptlrpc_expire_one_request (req);
877         }
878
879         /* When waiting for a whole set, we always break out of the
880          * sleep so we can recalculate the timeout, or enable interrupts
881          * iff everyone's timed out.
882          */
883         RETURN(1);
884 }
885
886 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
887 {
888         unsigned long flags;
889         spin_lock_irqsave(&req->rq_lock, flags);
890         req->rq_intr = 1;
891         spin_unlock_irqrestore(&req->rq_lock, flags);
892 }
893
894 void ptlrpc_interrupted_set(void *data)
895 {
896         struct ptlrpc_request_set *set = data;
897         struct list_head *tmp;
898
899         LASSERT(set != NULL);
900         CERROR("INTERRUPTED SET %p\n", set);
901
902         list_for_each(tmp, &set->set_requests) {
903                 struct ptlrpc_request *req =
904                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
905
906                 if (req->rq_phase != RQ_PHASE_RPC)
907                         continue;
908
909                 ptlrpc_mark_interrupted(req);
910         }
911 }
912
913 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
914 {
915         struct list_head      *tmp;
916         time_t                 now = LTIME_S(CURRENT_TIME);
917         time_t                 deadline;
918         int                    timeout = 0;
919         struct ptlrpc_request *req;
920         ENTRY;
921
922         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
923
924         list_for_each(tmp, &set->set_requests) {
925                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
926
927                 /* request in-flight? */
928                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
929                       (req->rq_phase == RQ_PHASE_BULK)))
930                         continue;
931
932                 if (req->rq_timedout)   /* already timed out */
933                         continue;
934
935                 deadline = req->rq_sent + req->rq_timeout;
936                 if (deadline <= now)    /* actually expired already */
937                         timeout = 1;    /* ASAP */
938                 else if (timeout == 0 || timeout > deadline - now)
939                         timeout = deadline - now;
940         }
941         RETURN(timeout);
942 }
943                 
944
945 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
946 {
947         struct list_head      *tmp;
948         struct ptlrpc_request *req;
949         struct l_wait_info     lwi;
950         int                    rc, timeout;
951         ENTRY;
952
953         LASSERT(!list_empty(&set->set_requests));
954         list_for_each(tmp, &set->set_requests) {
955                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
956                 if (req->rq_phase == RQ_PHASE_NEW)
957                         (void)ptlrpc_send_new_req(req);
958         }
959
960         do {
961                 timeout = ptlrpc_set_next_timeout(set);
962
963                 /* wait until all complete, interrupted, or an in-flight
964                  * req times out */
965                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
966                        set, timeout);
967                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
968                                        ptlrpc_expired_set, 
969                                        ptlrpc_interrupted_set, set);
970                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
971
972                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
973
974                 /* -EINTR => all requests have been flagged rq_intr so next
975                  * check completes.
976                  * -ETIMEDOUT => someone timed out.  When all reqs have
977                  * timed out, signals are enabled allowing completion with
978                  * EINTR.
979                  * I don't really care if we go once more round the loop in
980                  * the error cases -eeb. */
981         } while (rc != 0);
982
983         LASSERT(set->set_remaining == 0);
984
985         rc = 0;
986         list_for_each(tmp, &set->set_requests) {
987                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
988
989                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
990                 if (req->rq_status != 0)
991                         rc = req->rq_status;
992         }
993
994         if (set->set_interpret != NULL) {
995                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
996                         set->set_interpret;
997                 rc = interpreter (set, &set->set_args, rc);
998         }
999
1000         RETURN(rc);
1001 }
1002
1003 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1004 {
1005         ENTRY;
1006         if (request == NULL) {
1007                 EXIT;
1008                 return;
1009         }
1010
1011         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1012         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1013         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1014         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1015
1016         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1017          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1018         if (request->rq_import != NULL) {
1019                 unsigned long flags = 0;
1020                 if (!locked)
1021                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1022                 list_del_init(&request->rq_replay_list);
1023                 if (!locked)
1024                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1025                                                flags);
1026         }
1027         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1028
1029         if (atomic_read(&request->rq_refcount) != 0) {
1030                 DEBUG_REQ(D_ERROR, request,
1031                           "freeing request with nonzero refcount");
1032                 LBUG();
1033         }
1034
1035         if (request->rq_repmsg != NULL) {
1036                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1037                 request->rq_repmsg = NULL;
1038         }
1039         if (request->rq_reqmsg != NULL) {
1040                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1041                 request->rq_reqmsg = NULL;
1042         }
1043         if (request->rq_export != NULL) {
1044                 class_export_put(request->rq_export);
1045                 request->rq_export = NULL;
1046         }
1047         if (request->rq_import != NULL) {
1048                 class_import_put(request->rq_import);
1049                 request->rq_import = NULL;
1050         }
1051         if (request->rq_bulk != NULL)
1052                 ptlrpc_free_bulk(request->rq_bulk);
1053
1054         OBD_FREE(request, sizeof(*request));
1055         EXIT;
1056 }
1057
1058 void ptlrpc_free_req(struct ptlrpc_request *request)
1059 {
1060         __ptlrpc_free_req(request, 0);
1061 }
1062
1063 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1064 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1065 {
1066         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1067         (void)__ptlrpc_req_finished(request, 1);
1068 }
1069
1070 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1071 {
1072         ENTRY;
1073         if (request == NULL)
1074                 RETURN(1);
1075
1076         if (request == LP_POISON ||
1077             request->rq_reqmsg == LP_POISON) {
1078                 CERROR("dereferencing freed request (bug 575)\n");
1079                 LBUG();
1080                 RETURN(1);
1081         }
1082
1083         DEBUG_REQ(D_INFO, request, "refcount now %u",
1084                   atomic_read(&request->rq_refcount) - 1);
1085
1086         if (atomic_dec_and_test(&request->rq_refcount)) {
1087                 __ptlrpc_free_req(request, locked);
1088                 RETURN(1);
1089         }
1090
1091         RETURN(0);
1092 }
1093
1094 void ptlrpc_req_finished(struct ptlrpc_request *request)
1095 {
1096         __ptlrpc_req_finished(request, 0);
1097 }
1098
1099 /* Disengage the client's reply buffer from the network
1100  * NB does _NOT_ unregister any client-side bulk.
1101  * IDEMPOTENT, but _not_ safe against concurrent callers.
1102  * The request owner (i.e. the thread doing the I/O) must call...
1103  */
1104 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1105 {
1106         int                rc;
1107         wait_queue_head_t *wq;
1108         struct l_wait_info lwi;
1109
1110         LASSERT(!in_interrupt ());             /* might sleep */
1111
1112         if (!ptlrpc_client_receiving_reply(request))
1113                 return;
1114
1115         PtlMDUnlink (request->rq_reply_md_h);
1116
1117         /* We have to l_wait_event() whatever the result, to give liblustre
1118          * a chance to run reply_in_callback() */
1119
1120         if (request->rq_set != NULL)
1121                 wq = &request->rq_set->set_waitq;
1122         else
1123                 wq = &request->rq_reply_waitq;
1124
1125         for (;;) {
1126                 /* Network access will complete in finite time but the HUGE
1127                  * timeout lets us CWARN for visibility of sluggish NALs */
1128                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1129                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1130                 if (rc == 0)
1131                         return;
1132
1133                 LASSERT (rc == -ETIMEDOUT);
1134                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1135         }
1136 }
1137
1138 /* caller must hold imp->imp_lock */
1139 void ptlrpc_free_committed(struct obd_import *imp)
1140 {
1141         struct list_head *tmp, *saved;
1142         struct ptlrpc_request *req;
1143         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1144         ENTRY;
1145
1146         LASSERT(imp != NULL);
1147
1148         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1149
1150         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1151                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1152
1153         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1154                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1155
1156                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1157                 LASSERT(req != last_req);
1158                 last_req = req;
1159
1160                 if (req->rq_import_generation < imp->imp_generation) {
1161                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1162                         GOTO(free_req, 0);
1163                 }
1164
1165                 if (req->rq_replay) {
1166                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1167                         continue;
1168                 }
1169
1170                 /* not yet committed */
1171                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1172                         DEBUG_REQ(D_HA, req, "stopping search");
1173                         break;
1174                 }
1175
1176                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1177                           imp->imp_peer_committed_transno);
1178 free_req:
1179                 if (req->rq_commit_cb != NULL)
1180                         req->rq_commit_cb(req);
1181                 list_del_init(&req->rq_replay_list);
1182                 __ptlrpc_req_finished(req, 1);
1183         }
1184
1185         EXIT;
1186         return;
1187 }
1188
1189 void ptlrpc_cleanup_client(struct obd_import *imp)
1190 {
1191         ENTRY;
1192         EXIT;
1193         return;
1194 }
1195
1196 void ptlrpc_resend_req(struct ptlrpc_request *req)
1197 {
1198         unsigned long flags;
1199
1200         DEBUG_REQ(D_HA, req, "going to resend");
1201         req->rq_reqmsg->handle.cookie = 0;
1202         req->rq_status = -EAGAIN;
1203
1204         spin_lock_irqsave (&req->rq_lock, flags);
1205         req->rq_resend = 1;
1206         req->rq_net_err = 0;
1207         req->rq_timedout = 0;
1208         if (req->rq_bulk) {
1209                 __u64 old_xid = req->rq_xid;
1210                 
1211                 /* ensure previous bulk fails */
1212                 req->rq_xid = ptlrpc_next_xid();
1213                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1214                        old_xid, req->rq_xid);
1215         }
1216         ptlrpc_wake_client_req(req);
1217         spin_unlock_irqrestore (&req->rq_lock, flags);
1218 }
1219
1220 /* XXX: this function and rq_status are currently unused */
1221 void ptlrpc_restart_req(struct ptlrpc_request *req)
1222 {
1223         unsigned long flags;
1224
1225         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1226         req->rq_status = -ERESTARTSYS;
1227
1228         spin_lock_irqsave (&req->rq_lock, flags);
1229         req->rq_restart = 1;
1230         req->rq_timedout = 0;
1231         ptlrpc_wake_client_req(req);
1232         spin_unlock_irqrestore (&req->rq_lock, flags);
1233 }
1234
1235 static int expired_request(void *data)
1236 {
1237         struct ptlrpc_request *req = data;
1238         ENTRY;
1239
1240         RETURN(ptlrpc_expire_one_request(req));
1241 }
1242
1243 static void interrupted_request(void *data)
1244 {
1245         unsigned long flags;
1246
1247         struct ptlrpc_request *req = data;
1248         DEBUG_REQ(D_HA, req, "request interrupted");
1249         spin_lock_irqsave (&req->rq_lock, flags);
1250         req->rq_intr = 1;
1251         spin_unlock_irqrestore (&req->rq_lock, flags);
1252 }
1253
1254 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1255 {
1256         ENTRY;
1257         atomic_inc(&req->rq_refcount);
1258         RETURN(req);
1259 }
1260
1261 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1262                                       struct obd_import *imp)
1263 {
1264         struct list_head *tmp;
1265
1266         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1267
1268         /* don't re-add requests that have been replayed */
1269         if (!list_empty(&req->rq_replay_list))
1270                 return;
1271
1272         lustre_msg_add_flags(req->rq_reqmsg,
1273                              MSG_REPLAY);
1274
1275         LASSERT(imp->imp_replayable);
1276         /* Balanced in ptlrpc_free_committed, usually. */
1277         ptlrpc_request_addref(req);
1278         list_for_each_prev(tmp, &imp->imp_replay_list) {
1279                 struct ptlrpc_request *iter =
1280                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1281
1282                 /* We may have duplicate transnos if we create and then
1283                  * open a file, or for closes retained to match creating
1284                  * opens, so use req->rq_xid as a secondary key.
1285                  * (See bugs 684, 685, and 428.)
1286                  * XXX no longer needed, but all opens need transnos!
1287                  */
1288                 if (iter->rq_transno > req->rq_transno)
1289                         continue;
1290
1291                 if (iter->rq_transno == req->rq_transno) {
1292                         LASSERT(iter->rq_xid != req->rq_xid);
1293                         if (iter->rq_xid > req->rq_xid)
1294                                 continue;
1295                 }
1296
1297                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1298                 return;
1299         }
1300
1301         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1302 }
1303
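/* Synchronous send-and-wait for a request that is not part of a set: wait
 * for import recovery if necessary, (re)send, sleep until the reply (and
 * any bulk) completes, times out or is interrupted, and return the request
 * status or a negative errno.  The caller keeps its own reference and must
 * still call ptlrpc_req_finished(). */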
1304 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1305 {
1306         char str[PTL_NALFMT_SIZE];
1307         int rc = 0;
1308         int brc;
1309         struct l_wait_info lwi;
1310         struct obd_import *imp = req->rq_import;
1311         unsigned long flags;
1312         int timeout = 0;
1313         ENTRY;
1314
1315         LASSERT(req->rq_set == NULL);
1316         LASSERT(!req->rq_receiving_reply);
1317         atomic_inc(&imp->imp_inflight);
1318
1319         /* for distributed debugging */
1320         req->rq_reqmsg->status = current->pid;
1321         LASSERT(imp->imp_obd != NULL);
1322         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1323                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1324                imp->imp_obd->obd_uuid.uuid,
1325                req->rq_reqmsg->status, req->rq_xid,
1326                imp->imp_connection->c_peer.peer_ni->pni_name,
1327                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1328                req->rq_reqmsg->opc);
1329
1330         /* Mark phase here for a little debug help */
1331         req->rq_phase = RQ_PHASE_RPC;
1332
1333         spin_lock_irqsave(&imp->imp_lock, flags);
1334         req->rq_import_generation = imp->imp_generation;
1335 restart:
1336         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1337                 list_del(&req->rq_list);
1338
1339                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1340                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1341
1342                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1343                           current->comm, 
1344                           ptlrpc_import_state_name(req->rq_send_state), 
1345                           ptlrpc_import_state_name(imp->imp_state));
1346                 lwi = LWI_INTR(interrupted_request, req);
1347                 rc = l_wait_event(req->rq_reply_waitq,
1348                                   (req->rq_send_state == imp->imp_state ||
1349                                    req->rq_err),
1350                                   &lwi);
1351                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1352                           current->comm, 
1353                           ptlrpc_import_state_name(imp->imp_state), 
1354                           ptlrpc_import_state_name(req->rq_send_state),
1355                           req->rq_err);
1356
1357                 spin_lock_irqsave(&imp->imp_lock, flags);
1358                 list_del_init(&req->rq_list);
1359
1360                 if (req->rq_err) {
1361                         rc = -EIO;
1362                 } 
1363                 else if (req->rq_intr) {
1364                         rc = -EINTR;
1365                 }
1366                 else if (req->rq_no_resend) {
1367                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1368                         GOTO(out, rc = -ETIMEDOUT);
1369                 }
1370                 else {
1371                         GOTO(restart, rc);
1372                 }
1373         } 
1374
1375         if (rc != 0) {
1376                 list_del_init(&req->rq_list);
1377                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1378                 req->rq_status = rc; // XXX this ok?
1379                 GOTO(out, rc);
1380         }
1381
1382         if (req->rq_resend) {
1383                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1384
1385                 if (req->rq_bulk != NULL)
1386                         ptlrpc_unregister_bulk (req);
1387
1388                 DEBUG_REQ(D_HA, req, "resending: ");
1389         }
1390
1391         /* XXX this is the same as ptlrpc_set_wait */
1392         LASSERT(list_empty(&req->rq_list));
1393         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1394         spin_unlock_irqrestore(&imp->imp_lock, flags);
1395
1396         rc = ptl_send_rpc(req);
1397         if (rc) {
1398                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1399                 timeout = 1;
1400         } else {
1401                 timeout = MAX(req->rq_timeout * HZ, 1);
1402                 DEBUG_REQ(D_NET, req, "-- sleeping");
1403         }
1404         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1405                                req);
1406         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1407         DEBUG_REQ(D_NET, req, "-- done sleeping");
1408
1409         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1410                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1411                imp->imp_obd->obd_uuid.uuid,
1412                req->rq_reqmsg->status, req->rq_xid,
1413                imp->imp_connection->c_peer.peer_ni->pni_name,
1414                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1415                req->rq_reqmsg->opc);
1416
1417         spin_lock_irqsave(&imp->imp_lock, flags);
1418         list_del_init(&req->rq_list);
1419         spin_unlock_irqrestore(&imp->imp_lock, flags);
1420
1421         /* If the reply was received normally, this just grabs the spinlock
1422          * (ensuring the reply callback has returned), sees that
1423          * req->rq_receiving_reply is clear and returns. */
1424         ptlrpc_unregister_reply (req);
1425
1426         if (req->rq_err)
1427                 GOTO(out, rc = -EIO);
1428
1429         /* Resend if we need to, unless we were interrupted. */
1430         if (req->rq_resend && !req->rq_intr) {
1431                 /* ...unless we were specifically told otherwise. */
1432                 if (req->rq_no_resend)
1433                         GOTO(out, rc = -ETIMEDOUT);
1434                 spin_lock_irqsave(&imp->imp_lock, flags);
1435                 goto restart;
1436         }
1437
1438         if (req->rq_intr) {
1439                 /* Should only be interrupted if we timed out. */
1440                 if (!req->rq_timedout)
1441                         DEBUG_REQ(D_ERROR, req,
1442                                   "rq_intr set but rq_timedout not");
1443                 GOTO(out, rc = -EINTR);
1444         }
1445
1446         if (req->rq_timedout) {                 /* non-recoverable timeout */
1447                 GOTO(out, rc = -ETIMEDOUT);
1448         }
1449
1450         if (!req->rq_replied) {
1451                 /* How can this be? -eeb */
1452                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1453                 LBUG();
1454                 GOTO(out, rc = req->rq_status);
1455         }
1456
1457         rc = after_reply (req);
1458         /* NB may return +ve success rc */
1459         if (req->rq_resend) {
1460                 spin_lock_irqsave(&imp->imp_lock, flags);
1461                 goto restart;
1462         }
1463
1464  out:
1465         if (req->rq_bulk != NULL) {
1466                 if (rc >= 0) {                  
1467                         /* success so far.  Note that anything going wrong
1468                          * with bulk now is EXTREMELY strange, since the
1469                          * server must have believed that the bulk
1470                          * transferred OK before she replied with success to
1471                          * me. */
1472                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1473                         brc = l_wait_event(req->rq_reply_waitq,
1474                                            !ptlrpc_bulk_active(req->rq_bulk),
1475                                            &lwi);
1476                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1477                         if (brc != 0) {
1478                                 LASSERT(brc == -ETIMEDOUT);
1479                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1480                                 rc = brc;
1481                         } else if (!req->rq_bulk->bd_success) {
1482                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1483                                 rc = -EIO;
1484                         }
1485                 }
1486                 if (rc < 0)
1487                         ptlrpc_unregister_bulk (req);
1488         }
1489
1490         LASSERT(!req->rq_receiving_reply);
1491         req->rq_phase = RQ_PHASE_INTERPRET;
1492
1493         atomic_dec(&imp->imp_inflight);
1494         wake_up(&imp->imp_recovery_waitq);
1495         RETURN(rc);
1496 }
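
/*
 * Hedged caller-side sketch (not code from this file) of how the synchronous
 * path above is typically driven: prepare a request, send it and wait for
 * the reply, then drop the reference.  SOME_OPCODE and the empty buffer
 * layout are placeholders; real callers supply their own opcode and sizes.
 *
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, SOME_OPCODE, 0, NULL, NULL);
 *      if (req == NULL)
 *              RETURN(-ENOMEM);
 *      req->rq_replen = lustre_msg_size(0, NULL);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 *      RETURN(rc);
 */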
1497
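/* Scratch state saved across an asynchronous replay: ptlrpc_replay_req()
 * stashes the request's original send state and reply status here, and
 * ptlrpc_replay_interpret() restores them once the replay completes. */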
1498 struct ptlrpc_replay_async_args {
1499         int praa_old_state;
1500         int praa_old_status;
1501 };
1502
1503 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1504                                     void * data, int rc)
1505 {
1506         struct ptlrpc_replay_async_args *aa = data;
1507         struct obd_import *imp = req->rq_import;
1508         unsigned long flags;
1509
1510         atomic_dec(&imp->imp_replay_inflight);
1511         
1512         if (!req->rq_replied) {
1513                 CERROR("request replay timed out, restarting recovery\n");
1514                 GOTO(out, rc = -ETIMEDOUT);
1515         }
1516
1517 #if SWAB_PARANOIA
1518         /* Clear reply swab mask; this is a new reply in sender's byte order */
1519         req->rq_rep_swab_mask = 0;
1520 #endif
1521         LASSERT (req->rq_nob_received <= req->rq_replen);
1522         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1523         if (rc) {
1524                 CERROR("unpack_rep failed: %d\n", rc);
1525                 GOTO(out, rc = -EPROTO);
1526         }
1527
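        /* An error reply carrying -ENOTCONN means the server no longer
         * recognizes our connection; give up on this replay and let the
         * caller restart recovery from the connect step. */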
1528         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1529             req->rq_repmsg->status == -ENOTCONN) 
1530                 GOTO(out, rc = req->rq_repmsg->status);
1531
1532         /* The transno had better not change over replay. */
1533         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1534
1535         DEBUG_REQ(D_HA, req, "got rep");
1536
1537         /* let the callback do fixups, possibly including changes to the request itself */
1538         if (req->rq_replay_cb)
1539                 req->rq_replay_cb(req);
1540
1541         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1542                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1543                           req->rq_repmsg->status, aa->praa_old_status);
1544         } else {
1545                 /* Put it back for re-replay. */
1546                 req->rq_repmsg->status = aa->praa_old_status;
1547         }
1548
1549         spin_lock_irqsave(&imp->imp_lock, flags);
1550         imp->imp_last_replay_transno = req->rq_transno;
1551         spin_unlock_irqrestore(&imp->imp_lock, flags);
1552
1553         /* continue with recovery */
1554         rc = ptlrpc_import_recovery_state_machine(imp);
1555  out:
1556         req->rq_send_state = aa->praa_old_state;
1557         
1558         if (rc != 0)
1559                 /* this replay failed, so restart recovery */
1560                 ptlrpc_connect_import(imp, NULL);
1561
1562         RETURN(rc);
1563 }
1564
1565
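/* Resend a request during import recovery so the server can reconstruct
 * state it lost across a restart or failover.  The request is handed to
 * ptlrpcd; ptlrpc_replay_interpret() above runs when it completes and
 * pushes the import recovery state machine forward. */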
1566 int ptlrpc_replay_req(struct ptlrpc_request *req)
1567 {
1568         struct ptlrpc_replay_async_args *aa;
1569         ENTRY;
1570
1571         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1572
1573         /* Not handling automatic bulk replay yet (or ever?) */
1574         LASSERT(req->rq_bulk == NULL);
1575
1576         DEBUG_REQ(D_HA, req, "REPLAY");
1577
1578         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1579         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1580         memset(aa, 0, sizeof *aa);
1581
1582         /* Prepare request to be resent with ptlrpcd */
1583         aa->praa_old_state = req->rq_send_state;
1584         req->rq_send_state = LUSTRE_IMP_REPLAY;
1585         req->rq_phase = RQ_PHASE_NEW;
1586         /*
1587          * Q: "How can a req get on the replay list if it wasn't replied?"
1588          * A: "If we failed during the replay of this request, it will still
1589          *     be on the list, but rq_replied will have been reset to 0."
1590          */
1591         if (req->rq_replied) {
1592                 aa->praa_old_status = req->rq_repmsg->status;
1593                 req->rq_status = 0;
1594                 req->rq_replied = 0;
1595         }
1596
1597         req->rq_interpret_reply = ptlrpc_replay_interpret;
1598         atomic_inc(&req->rq_import->imp_replay_inflight);
1599         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1600
1601         ptlrpcd_add_req(req);
1602         RETURN(0);
1603 }
1604
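/* Wake every request queued on this import's sending and delayed lists with
 * rq_err set, so their waiters return -EIO.  Only requests from an older
 * import generation are failed; anything already issued against the current
 * generation is left alone. */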
1605 void ptlrpc_abort_inflight(struct obd_import *imp)
1606 {
1607         unsigned long flags;
1608         struct list_head *tmp, *n;
1609         ENTRY;
1610
1611         /* Make sure that no new requests get processed for this import.
1612          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1613          * this flag and then putting requests on sending_list or delayed_list.
1614          */
1615         spin_lock_irqsave(&imp->imp_lock, flags);
1616
1617         /* XXX locking?  Maybe we should remove each request with the list
1618          * locked?  Also, how do we know if the requests on the list are
1619          * being freed at this time?
1620          */
1621         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1622                 struct ptlrpc_request *req =
1623                         list_entry(tmp, struct ptlrpc_request, rq_list);
1624
1625                 DEBUG_REQ(D_HA, req, "inflight");
1626
1627                 spin_lock (&req->rq_lock);
1628                 if (req->rq_import_generation < imp->imp_generation) {
1629                         req->rq_err = 1;
1630                         ptlrpc_wake_client_req(req);
1631                 }
1632                 spin_unlock (&req->rq_lock);
1633         }
1634
1635         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1636                 struct ptlrpc_request *req =
1637                         list_entry(tmp, struct ptlrpc_request, rq_list);
1638
1639                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1640
1641                 spin_lock (&req->rq_lock);
1642                 if (req->rq_import_generation < imp->imp_generation) {
1643                         req->rq_err = 1;
1644                         ptlrpc_wake_client_req(req);
1645                 }
1646                 spin_unlock (&req->rq_lock);
1647         }
1648
1649         /* Last chance to free reqs left on the replay list, but we
1650          * will still leak reqs that haven't committed.  */
1651         if (imp->imp_replayable)
1652                 ptlrpc_free_committed(imp);
1653
1654         spin_unlock_irqrestore(&imp->imp_lock, flags);
1655
1656         EXIT;
1657 }
1658
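/* XID generation: a node-local, monotonically increasing 64-bit counter used
 * to match replies to their requests.  The spinlock keeps the increment
 * atomic on SMP. */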
1659 static __u64 ptlrpc_last_xid = 0;
1660 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1661
1662 __u64 ptlrpc_next_xid(void)
1663 {
1664         __u64 tmp;
1665         spin_lock(&ptlrpc_last_xid_lock);
1666         tmp = ++ptlrpc_last_xid;
1667         spin_unlock(&ptlrpc_last_xid_lock);
1668         return tmp;
1669 }
1670
1671