/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <errno.h>
#include <signal.h>
#include <liblustre.h>
#endif

#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>
#include <linux/lustre_sec.h>

#include "ptlrpc_internal.h"

void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}

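/* Look up the peer for this uuid and get a connection to it, recording
 * the remote uuid on the connection; returns NULL if no such peer. */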
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *c;
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer, uuid);
        if (c) {
                memcpy(c->c_remote_uuid.uuid,
                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
        }

        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);

        return c;
}

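/* Re-resolve the peer currently registered for this uuid and update
 * conn->c_peer accordingly. */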
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                 struct obd_uuid *uuid)
{
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof (peer));
        return;
}

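/* Allocate a bulk descriptor with room for npages iov entries and set
 * up its lock, waitq and bookkeeping fields. */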
static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
        if (!desc)
                return NULL;

        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = npages;
        desc->bd_iov_count = 0;
        desc->bd_md_h = PTL_INVALID_HANDLE;
        desc->bd_portal = portal;
        desc->bd_type = type;

        return desc;
}

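/* Prepare a client-side (import) bulk descriptor for req and hook up
 * the client bulk callback. */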
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = client_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* This makes the req own the desc, which is freed when the req
         * itself is freed */
        req->rq_bulk = desc;

        return desc;
}

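/* Prepare a server-side (export) bulk descriptor for req and hook up
 * the server bulk callback. */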
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_export *exp = req->rq_export;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);

        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_export = class_export_get(exp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = server_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* NB we don't assign rq_bulk here; server-side requests are
         * re-used, and the handler frees the bulk desc explicitly. */

        return desc;
}

void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                           struct page *page, int pageoffset, int len)
{
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_SIZE);

        desc->bd_nob += len;

        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}

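/* Release a bulk descriptor: drop its export/import reference and free
 * it.  The network must be finished with it (bd_network_rw clear). */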
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        ENTRY;

        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
        LASSERT(!desc->bd_network_rw);            /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        if (desc->bd_export)
                class_export_put(desc->bd_export);
        else
                class_import_put(desc->bd_import);

        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                bd_iov[desc->bd_max_iov]));
        EXIT;
}

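/* Allocate and initialise a request on this import: take an import
 * reference, get and refresh the security credential, pack the message
 * buffers, and set up timeouts, callbacks, portals and the xid. */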
/* FIXME: prep_req should now return an error code rather than NULL, but
 * it is called everywhere :(
 */
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        LASSERT((unsigned long)imp > 0x1000);

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        request->rq_import = class_import_get(imp);

        rc = ptlrpcs_req_get_cred(request);
        if (rc) {
                CDEBUG(D_SEC, "failed to get credential\n");
                GOTO(out_free, rc);
        }

        /* Try to refresh the cred here, so that fewer refreshes are
         * performed in ptlrpcd context (which might block ptlrpcd).
         * Fail out only if a fatal ptlrpcs error occurred.
         */
        ptlrpcs_req_refresh_cred(request);
        if (request->rq_ptlrpcs_err)
                GOTO(out_cred, rc = -EPERM);

        /* Set the default sec flavor for this req.  In the future we might
         * need to increase the security strength, e.g. AUTH -> PRIV.
         */
        request->rq_req_secflvr = imp->imp_sec->ps_flavor;

        rc = lustre_pack_request(request, count, lengths, bufs);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                GOTO(out_cred, rc);
        }
        request->rq_reqmsg->version |= version;

        if (imp->imp_server_timeout)
                request->rq_timeout = obd_timeout / 2;
        else
                request->rq_timeout = obd_timeout;

        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;

        request->rq_req_cbid.cbid_fn  = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        /* XXX FIXME bug 249 */
        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        spin_lock_init(&request->rq_lock);
        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_replay_list);
        INIT_LIST_HEAD(&request->rq_set_chain);
        init_waitqueue_head(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        atomic_set(&request->rq_refcount, 1);

        request->rq_reqmsg->opc = opcode;
        request->rq_reqmsg->flags = 0;
        RETURN(request);
out_cred:
        ptlrpcs_req_drop_cred(request);
out_free:
        class_import_put(imp);
        OBD_FREE(request, sizeof(*request));
        RETURN(NULL);
}

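/* Allocate and initialise an empty request set. */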
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;

        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        INIT_LIST_HEAD(&set->set_requests);
        init_waitqueue_head(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        INIT_LIST_HEAD(&set->set_new_requests);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int               expected_phase;
        int               n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;
        atomic_inc(&req->rq_import->imp_inflight);
}

/* Locked so that many callers can add things; the context that owns the
 * set is supposed to notice these and move them into the set proper. */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&set->set_new_req_lock, flags);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed. If false, and
 * *status is set, then the request cannot be sent and *status is the
 * error code.  If false and *status is 0, then the request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        }
        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        }
        /* allow CONNECT even if import is invalid */
        else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
                ;
        }
        /*
         * If the import has been invalidated (such as by an OST failure), the
         * request must fail with -EIO.
         */
        else if (imp->imp_invalid) {
                DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -EIO;
        }
        else if (req->rq_import_generation != imp->imp_generation) {
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        }
        else if (req->rq_send_state != imp->imp_state) {
                if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake
                    || req->rq_no_delay)
                        *status = -EWOULDBLOCK;
                else
                        delay = 1;
        }

        RETURN(delay);
}

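/* Under rq_lock, check whether this request has made progress: replied,
 * network error, aborted, resend or restart requested.  Returns non-zero
 * when a waiter should wake up. */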
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc = 0;
        ENTRY;

        /* serialise with network callback */
        spin_lock_irqsave (&req->rq_lock, flags);

        if (req->rq_replied) {
                DEBUG_REQ(D_NET, req, "REPLIED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_net_err && !req->rq_timedout) {
                spin_unlock_irqrestore (&req->rq_lock, flags);
                rc = ptlrpc_expire_one_request(req);
                spin_lock_irqsave (&req->rq_lock, flags);
                GOTO(out, rc);
        }

        if (req->rq_err) {
                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_resend) {
                DEBUG_REQ(D_ERROR, req, "RESEND:");
                GOTO(out, rc = 1);
        }

        if (req->rq_restart) {
                DEBUG_REQ(D_ERROR, req, "RESTART:");
                GOTO(out, rc = 1);
        }
        EXIT;
 out:
        spin_unlock_irqrestore (&req->rq_lock, flags);
        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
        return rc;
}

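/* Extract the status from a reply message; a PTL_RPC_MSG_ERR reply is
 * converted into a negative errno. */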
static int ptlrpc_check_status(struct ptlrpc_request *req)
{
        int err;
        ENTRY;

        err = req->rq_repmsg->status;
        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d",
                          err);
                RETURN(err < 0 ? err : -EINVAL);
        }

        if (err < 0) {
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        } else if (err > 0) {
                /* XXX: translate this error from net to host */
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        }

        RETURN(err);
}

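/* Process a freshly received reply: unwrap and verify it through the
 * security layer, unpack and sanity-check the lustre_msg, handle
 * -ENOTCONN reconnection, and update replay/commit state on the
 * import. */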
static int after_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        struct obd_import *imp = req->rq_import;
        int rc;
        ENTRY;

        LASSERT(!req->rq_receiving_reply);

        /* NB Until this point, the whole of the incoming message,
         * including buflens, status etc is in the sender's byte order. */

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
        rc = ptlrpcs_cli_unwrap_reply(req);
        if (rc) {
                CERROR("verify reply error: %d\n", rc);
                RETURN(rc);
        }
        /* unwrap_reply may request that the RPC be resent */
        if (req->rq_ptlrpcs_restart) {
                req->rq_resend = 1;
                RETURN(0);
        }

        /* unwrap_reply will set rq_replen to the actual received
         * lustre_msg length
         */
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                RETURN(-EPROTO);
        }

        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
                CERROR("invalid packet type received (type=%u)\n",
                       req->rq_repmsg->type);
                RETURN(-EPROTO);
        }

        rc = ptlrpc_check_status(req);

        /* Either we've been evicted, or the server has failed for
         * some reason. Try to reconnect, and if that fails, punt to the
         * upcall. */
        if (rc == -ENOTCONN) {
                if (req->rq_send_state != LUSTRE_IMP_FULL ||
                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                        RETURN(-ENOTCONN);
                }

                ptlrpc_request_handle_notconn(req);

                RETURN(rc);
        }

        /* Store transno in reqmsg for replay. */
        req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;

        if (req->rq_import->imp_replayable) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                if (req->rq_transno != 0)
                        ptlrpc_retain_replayable_request(req, imp);
                else if (req->rq_commit_cb != NULL) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        req->rq_commit_cb(req);
                        spin_lock_irqsave(&imp->imp_lock, flags);
                }

                if (req->rq_transno > imp->imp_max_transno)
                        imp->imp_max_transno = req->rq_transno;

                /* Replay-enabled imports return commit-status information. */
                if (req->rq_repmsg->last_committed)
                        imp->imp_peer_committed_transno =
                                req->rq_repmsg->last_committed;
                ptlrpc_free_committed(imp);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
        }

        RETURN(rc);
}

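/* Move a new request into the RPC phase and send it, unless the import
 * state says it must wait (it is then parked on the delayed list) or
 * fail immediately. */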
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
        char                   str[PTL_NALFMT_SIZE];
        struct obd_import     *imp;
        unsigned long          flags;
        int rc;
        ENTRY;

        LASSERT(req->rq_phase == RQ_PHASE_NEW);
        req->rq_phase = RQ_PHASE_RPC;

        imp = req->rq_import;
        spin_lock_irqsave(&imp->imp_lock, flags);

        req->rq_import_generation = imp->imp_generation;

        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                spin_lock (&req->rq_lock);
                req->rq_waiting = 1;
                spin_unlock (&req->rq_lock);

                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
                          "(%s != %s)",
                          req->rq_reqmsg->status,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                LASSERT(list_empty (&req->rq_list));

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(0);
        }

        if (rc != 0) {
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc;
                req->rq_phase = RQ_PHASE_INTERPRET;
                RETURN(rc);
        }

        /* XXX this is the same as ptlrpc_queue_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        req->rq_reqmsg->status = current->pid;
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
               " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
               req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
                req->rq_net_err = 1;
                RETURN(rc);
        }
        RETURN(0);
}

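/* Walk every request in the set and advance its state machine:
 * NEW -> RPC -> BULK -> INTERPRET -> COMPLETE, dealing with errors,
 * timeouts and resends along the way.  Returns non-zero when the set is
 * complete or the wait timeout needs recalculating. */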
int ptlrpc_check_set(struct ptlrpc_request_set *set)
{
        char str[PTL_NALFMT_SIZE];
        unsigned long flags;
        struct list_head *tmp;
        int force_timer_recalc = 0;
        ENTRY;

        if (set->set_remaining == 0)
                RETURN(1);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                struct obd_import *imp = req->rq_import;
                int rc = 0;

                if (req->rq_phase == RQ_PHASE_NEW &&
                    ptlrpc_send_new_req(req)) {
                        force_timer_recalc = 1;
                }

                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
                      req->rq_phase == RQ_PHASE_COMPLETE)) {
                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                        LBUG();
                }

                if (req->rq_phase == RQ_PHASE_COMPLETE)
                        continue;

                if (req->rq_phase == RQ_PHASE_INTERPRET)
                        GOTO(interpret, req->rq_status);

                if (req->rq_net_err && !req->rq_timedout)
                        ptlrpc_expire_one_request(req);

                if (req->rq_err || req->rq_ptlrpcs_err) {
                        ptlrpc_unregister_reply(req);
                        if (req->rq_status == 0)
                                req->rq_status = req->rq_err ? -EIO : -EPERM;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
                 * will only be set after rq_timedout, but the oig waiting
                 * path sets rq_intr irrespective of whether ptlrpcd has
                 * seen a timeout.  our policy is to only interpret
                 * interrupted rpcs after they have timed out */
                if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                        /* NB could be on delayed list */
                        ptlrpc_unregister_reply(req);
                        req->rq_status = -EINTR;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                if (req->rq_phase == RQ_PHASE_RPC) {
                        if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                                int status;

                                ptlrpc_unregister_reply(req);

                                spin_lock_irqsave(&imp->imp_lock, flags);

                                if (ptlrpc_import_delay_req(imp, req, &status)){
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        continue;
                                }

                                list_del_init(&req->rq_list);
                                if (status != 0)  {
                                        req->rq_status = status;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                if (req->rq_no_resend) {
                                        req->rq_status = -ENOTCONN;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                list_add_tail(&req->rq_list,
                                              &imp->imp_sending_list);

                                spin_unlock_irqrestore(&imp->imp_lock, flags);

                                req->rq_waiting = 0;
                                if (req->rq_resend) {
                                        if (!req->rq_ptlrpcs_restart)
                                                lustre_msg_add_flags(
                                                        req->rq_reqmsg,
                                                        MSG_RESENT);
                                        if (req->rq_bulk) {
                                                __u64 old_xid = req->rq_xid;

                                                ptlrpc_unregister_bulk (req);

                                                /* ensure previous bulk fails */
                                                req->rq_xid = ptlrpc_next_xid();
                                                CDEBUG(D_HA, "resend bulk "
                                                       "old x"LPU64
                                                       " new x"LPU64"\n",
                                                       old_xid, req->rq_xid);
                                        }
                                }

                                rc = ptl_send_rpc(req);
                                if (rc) {
                                        DEBUG_REQ(D_HA, req, "send failed (%d)",
                                                  rc);
                                        force_timer_recalc = 1;
                                        req->rq_net_err = 1;
                                }
                                /* need to reset the timeout */
                                force_timer_recalc = 1;
                        }

                        /* Still waiting for a reply? */
                        if (ptlrpc_client_receiving_reply(req))
                                continue;

                        /* Did we actually receive a reply? */
                        if (!ptlrpc_client_replied(req))
                                continue;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        req->rq_status = after_reply(req);
                        if (req->rq_resend) {
                                /* Add this req to the delayed list so
                                   it can be errored if the import is
                                   evicted after recovery. */
                                spin_lock_irqsave (&req->rq_lock, flags);
                                list_add_tail(&req->rq_list,
                                              &imp->imp_delayed_list);
                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                continue;
                        }

                        /* If there is no bulk associated with this request,
                         * then we're done and should let the interpreter
                         * process the reply.  Similarly if the RPC returned
                         * an error, and therefore the bulk will never arrive.
                         */
                        if (req->rq_bulk == NULL || req->rq_status != 0) {
                                req->rq_phase = RQ_PHASE_INTERPRET;
                                GOTO(interpret, req->rq_status);
                        }

                        req->rq_phase = RQ_PHASE_BULK;
                }

                LASSERT(req->rq_phase == RQ_PHASE_BULK);
                if (ptlrpc_bulk_active(req->rq_bulk))
                        continue;

                if (!req->rq_bulk->bd_success) {
                        /* The RPC reply arrived OK, but the bulk screwed
                         * up!  Dead weird, since the server told us the RPC
                         * was good after getting the REPLY for its GET or
                         * the ACK for its PUT. */
                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                        LBUG();
                }

                req->rq_phase = RQ_PHASE_INTERPRET;

        interpret:
                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
                LASSERT(!req->rq_receiving_reply);

                ptlrpc_unregister_reply(req);
                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk (req);

                req->rq_phase = RQ_PHASE_COMPLETE;

                if (req->rq_interpret_reply != NULL) {
                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                req->rq_interpret_reply;
                        req->rq_status = interpreter(req, &req->rq_async_args,
                                                     req->rq_status);
                }

                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
                       "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
                       req->rq_xid,
                       imp->imp_connection->c_peer.peer_ni->pni_name,
                       ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
                       req->rq_reqmsg->opc);

                set->set_remaining--;

                atomic_dec(&imp->imp_inflight);
                wake_up(&imp->imp_recovery_waitq);
        }

        /* If we hit an error, we want to recover promptly. */
        RETURN(set->set_remaining == 0 || force_timer_recalc);
}

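/* Handle the timeout of a single request: unless a reply has already
 * arrived, mark it timed out, unregister its reply and bulk, and either
 * error it out immediately or trigger recovery on the import. */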
int ptlrpc_expire_one_request(struct ptlrpc_request *req)
{
        unsigned long      flags;
        struct obd_import *imp = req->rq_import;
        int replied = 0;
        ENTRY;

        DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
                  (long)req->rq_sent, LTIME_S(CURRENT_TIME) - req->rq_sent);

        spin_lock_irqsave (&req->rq_lock, flags);
        replied = req->rq_replied;
        if (!replied)
                req->rq_timedout = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);

        if (replied)
                RETURN(0);

        ptlrpc_unregister_reply (req);

        if (obd_dump_on_timeout)
                portals_debug_dumplog();

        if (req->rq_bulk != NULL)
                ptlrpc_unregister_bulk (req);

        if (imp == NULL) {
                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                RETURN(1);
        }

        /* The DLM server doesn't want recovery run on its imports. */
        if (imp->imp_dlm_fake)
                RETURN(1);

        /* If this request is for recovery or other primordial tasks,
         * then error it out here. */
        if (req->rq_send_state != LUSTRE_IMP_FULL ||
            imp->imp_obd->obd_no_recov) {
                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_status = -ETIMEDOUT;
                req->rq_err = 1;
                spin_unlock_irqrestore (&req->rq_lock, flags);
                RETURN(1);
        }

        ptlrpc_fail_import(imp, req->rq_import_generation);

        RETURN(0);
}

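/* l_wait_event timeout callback for a whole set: expire every in-flight
 * request whose deadline has passed. */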
int ptlrpc_expired_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head          *tmp;
        time_t                     now = LTIME_S(CURRENT_TIME);
        ENTRY;

        LASSERT(set != NULL);

        /* A timeout expired; see which reqs it applies to... */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting
                       && !req->rq_resend) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout ||           /* already dealt with */
                    req->rq_sent + req->rq_timeout > now) /* not expired */
                        continue;

                /* deal with this guy */
                ptlrpc_expire_one_request (req);
        }

        /* When waiting for a whole set, we always want to break out of
         * the sleep so we can recalculate the timeout, or enable
         * interrupts iff everyone's timed out.
         */
        RETURN(1);
}

void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore(&req->rq_lock, flags);
}

void ptlrpc_interrupted_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head *tmp;

        LASSERT(set != NULL);
        CERROR("INTERRUPTED SET %p\n", set);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                if (req->rq_phase != RQ_PHASE_RPC)
                        continue;

                ptlrpc_mark_interrupted(req);
        }
}

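/* Compute the number of seconds until the next in-flight request in the
 * set reaches its deadline: 1 if something has already expired, 0 if
 * nothing is in flight. */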
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        time_t                 now = LTIME_S(CURRENT_TIME);
        time_t                 deadline;
        int                    timeout = 0;
        struct ptlrpc_request *req;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout)   /* already timed out */
                        continue;

                deadline = req->rq_sent + req->rq_timeout;
                if (deadline <= now)    /* actually expired already */
                        timeout = 1;    /* ASAP */
                else if (timeout == 0 || timeout > deadline - now)
                        timeout = deadline - now;
        }
        RETURN(timeout);
}

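/* Send every new request in the set, then sleep until all of them have
 * completed, expiring and resending as needed.  Returns a non-zero
 * request status if any request failed, filtered through set_interpret
 * when one is registered. */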
int ptlrpc_set_wait(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        struct ptlrpc_request *req;
        struct l_wait_info     lwi;
        int                    rc, timeout;
        ENTRY;

        LASSERT(!list_empty(&set->set_requests));
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                if (req->rq_phase == RQ_PHASE_NEW)
                        (void)ptlrpc_send_new_req(req);
        }

        do {
                timeout = ptlrpc_set_next_timeout(set);

                /* wait until all complete, interrupted, or an in-flight
                 * req times out */
                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                       set, timeout);
                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                       ptlrpc_expired_set,
                                       ptlrpc_interrupted_set, set);
                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);

                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);

                /* -EINTR => all requests have been flagged rq_intr so next
                 * check completes.
                 * -ETIMEDOUT => someone timed out.  When all reqs have
                 * timed out, signals are enabled allowing completion with
                 * EINTR.
                 * I don't really care if we go once more round the loop in
                 * the error cases -eeb. */
        } while (rc != 0 || set->set_remaining != 0);

        LASSERT(set->set_remaining == 0);

        rc = 0;
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
        }

        if (set->set_interpret != NULL) {
                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                        set->set_interpret;
                rc = interpreter (set, set->set_arg, rc);
        }

        RETURN(rc);
}

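/* Actually free a request once its refcount has dropped to zero: take
 * it off the replay list, free its security buffers, and drop the
 * import, export, bulk and credential references. */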
static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
        LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
        LASSERT(request->rq_cred);

        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
        if (request->rq_import != NULL) {
                unsigned long flags = 0;
                if (!locked)
                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
                list_del_init(&request->rq_replay_list);
                if (!locked)
                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                               flags);
        }
        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);

        if (atomic_read(&request->rq_refcount) != 0) {
                DEBUG_REQ(D_ERROR, request,
                          "freeing request with nonzero refcount");
                LBUG();
        }

        if (request->rq_repbuf != NULL)
                ptlrpcs_cli_free_repbuf(request);
        if (request->rq_reqbuf != NULL)
                ptlrpcs_cli_free_reqbuf(request);

        if (request->rq_export != NULL) {
                class_export_put(request->rq_export);
                request->rq_export = NULL;
        }
        if (request->rq_import != NULL) {
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
        if (request->rq_bulk != NULL)
                ptlrpc_free_bulk(request->rq_bulk);

        ptlrpcs_req_drop_cred(request);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        __ptlrpc_free_req(request, 0);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
{
        LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
        (void)__ptlrpc_req_finished(request, 1);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL)
                RETURN(1);

        if (request == LP_POISON ||
            request->rq_reqmsg == LP_POISON) {
                CERROR("dereferencing freed request (bug 575)\n");
                LBUG();
                RETURN(1);
        }

        DEBUG_REQ(D_INFO, request, "refcount now %u",
                  atomic_read(&request->rq_refcount) - 1);

        if (atomic_dec_and_test(&request->rq_refcount)) {
                __ptlrpc_free_req(request, locked);
                RETURN(1);
        }

        RETURN(0);
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}

/* Disengage the client's reply buffer from the network
 * NB does _NOT_ unregister any client-side bulk.
 * IDEMPOTENT, but _not_ safe against concurrent callers.
 * The request owner (i.e. the thread doing the I/O) must call...
 */
void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
        int                rc;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;

        LASSERT(!in_interrupt ());             /* might sleep */

        if (!ptlrpc_client_receiving_reply(request))
                return;

        PtlMDUnlink (request->rq_reply_md_h);

        /* We have to l_wait_event() whatever the result, to give liblustre
         * a chance to run reply_in_callback() */

        if (request->rq_set != NULL)
                wq = &request->rq_set->set_waitq;
        else
                wq = &request->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs */
                lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
                rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
                if (rc == 0)
                        return;

                LASSERT (rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
        }
}

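/* Drop replayable requests that the server has committed
 * (transno <= last_committed) or that belong to an old import
 * generation. */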
/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
        ENTRY;

        LASSERT(imp != NULL);

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
               imp->imp_obd->obd_name, imp->imp_peer_committed_transno);

        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                LASSERT(req != last_req);
                last_req = req;

                if (req->rq_import_generation < imp->imp_generation) {
                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
                        GOTO(free_req, 0);
                }

                if (req->rq_replay) {
                        /* this is a debug flag to avoid flooding of logs -bzzz */
                        if (imp->imp_debug_open_replays)
                                DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_HA, req, "stopping search");
                        break;
                }

                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
free_req:
                if (req->rq_commit_cb != NULL)
                        req->rq_commit_cb(req);
                list_del_init(&req->rq_replay_list);
                __ptlrpc_req_finished(req, 1);
        }
        imp->imp_debug_open_replays = 0;

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct obd_import *imp)
{
        ENTRY;
        EXIT;
        return;
}

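/* Flag a request for resend, giving any bulk a fresh xid so the old
 * transfer cannot match, and wake anyone waiting on the request. */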
void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "going to resend");
        req->rq_reqmsg->handle.cookie = 0;
        req->rq_status = -EAGAIN;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_resend = 1;
        req->rq_net_err = 0;
        req->rq_timedout = 0;
        if (req->rq_bulk) {
                __u64 old_xid = req->rq_xid;

                /* ensure previous bulk fails */
                req->rq_xid = ptlrpc_next_xid();
                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                       old_xid, req->rq_xid);
        }
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
        req->rq_status = -ERESTARTSYS;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_restart = 1;
        req->rq_timedout = 0;
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

static int expired_request(void *data)
{
        struct ptlrpc_request *req = data;
        ENTRY;

        /* some failure can suspend regular timeouts */
        if (ptlrpc_check_suspend())
                RETURN(1);

        RETURN(ptlrpc_expire_one_request(req));
}

static void interrupted_request(void *data)
{
        unsigned long flags;

        struct ptlrpc_request *req = data;
        DEBUG_REQ(D_HA, req, "request interrupted");
        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        ENTRY;
        atomic_inc(&req->rq_refcount);
        RETURN(req);
}

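/* Insert a request into the import's replay list, kept sorted by
 * transno (with xid as a secondary key); takes a reference that is
 * dropped in ptlrpc_free_committed(). */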
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        /* clear this for new requests that were resent as well
           as resent replayed requests. */
        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);

        /* don't re-add requests that have been replayed */
        if (!list_empty(&req->rq_replay_list))
                return;

        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);

        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
                 * opens, so use req->rq_xid as a secondary key.
                 * (See bugs 684, 685, and 428.)
                 * XXX no longer needed, but all opens need transnos!
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                return;
        }

        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
}

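/* Send a single request synchronously and wait for its reply, handling
 * recovery delays, timeouts, interrupts and resends. */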
1371 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1372 {
1373         char str[PTL_NALFMT_SIZE];
1374         int rc = 0;
1375         int brc;
1376         struct l_wait_info lwi;
1377         struct obd_import *imp = req->rq_import;
1378         unsigned long flags;
1379         int timeout = 0;
1380         ENTRY;
1381
1382         LASSERT(req->rq_set == NULL);
1383         LASSERT(!req->rq_receiving_reply);
1384         atomic_inc(&imp->imp_inflight);
1385
1386         if (imp->imp_connection == NULL) {
1387                 CERROR("request on not connected import %s\n",
1388                         imp->imp_obd->obd_name);
1389                 RETURN(-EINVAL);
1390         }
1391
1392         /* for distributed debugging */
1393         req->rq_reqmsg->status = current->pid;
1394         LASSERT(imp->imp_obd != NULL);
1395         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1396                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1397                imp->imp_obd->obd_uuid.uuid,
1398                req->rq_reqmsg->status, req->rq_xid,
1399                imp->imp_connection->c_peer.peer_ni->pni_name,
1400                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1401                req->rq_reqmsg->opc);
1402
1403         /* Mark phase here for a little debug help */
1404         req->rq_phase = RQ_PHASE_RPC;
1405
1406         spin_lock_irqsave(&imp->imp_lock, flags);
1407         req->rq_import_generation = imp->imp_generation;
1408 restart:
1409         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1410                 list_del(&req->rq_list);
1411
1412                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1413                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1414
1415                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1416                           current->comm, 
1417                           ptlrpc_import_state_name(req->rq_send_state), 
1418                           ptlrpc_import_state_name(imp->imp_state));
1419                 lwi = LWI_INTR(interrupted_request, req);
1420                 rc = l_wait_event(req->rq_reply_waitq,
1421                                   (req->rq_send_state == imp->imp_state ||
1422                                    req->rq_err),
1423                                   &lwi);
1424                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1425                           current->comm, 
1426                           ptlrpc_import_state_name(imp->imp_state), 
1427                           ptlrpc_import_state_name(req->rq_send_state),
1428                           req->rq_err);
1429
1430                 spin_lock_irqsave(&imp->imp_lock, flags);
1431                 list_del_init(&req->rq_list);
1432
                if (req->rq_err) {
                        rc = -EIO;
                } else if (req->rq_intr) {
                        rc = -EINTR;
                } else if (req->rq_no_resend) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        GOTO(out, rc = -ETIMEDOUT);
                } else {
                        GOTO(restart, rc);
                }
        }

        if (rc != 0) {
                list_del_init(&req->rq_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc; /* XXX this ok? */
                GOTO(out, rc);
        }

        if (req->rq_resend) {
                if (!req->rq_ptlrpcs_restart)
                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);

                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk(req);

                DEBUG_REQ(D_HA, req, "resending: ");
        }

        /* XXX this is the same as ptlrpc_set_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
                timeout = 1;
        } else {
                timeout = MAX(req->rq_timeout * HZ, 1);
                DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
        }
repeat:
        lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
                               req);
        rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
        if (rc == -ETIMEDOUT && ptlrpc_check_and_wait_suspend(req))
                goto repeat;
        DEBUG_REQ(D_NET, req, "-- done sleeping");

        CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
               "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid,
               req->rq_reqmsg->status, req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        spin_lock_irqsave(&imp->imp_lock, flags);
        list_del_init(&req->rq_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* If the reply was received normally, this just grabs the spinlock
         * (ensuring the reply callback has returned), sees that
         * req->rq_receiving_reply is clear and returns. */
        ptlrpc_unregister_reply(req);

        if (req->rq_err)
                GOTO(out, rc = -EIO);

        if (req->rq_ptlrpcs_err)
                GOTO(out, rc = -EPERM);

        /* Resend if we need to, unless we were interrupted. */
        if (req->rq_resend && !req->rq_intr) {
                /* ...unless we were specifically told otherwise. */
                if (req->rq_no_resend)
                        GOTO(out, rc = -ETIMEDOUT);
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

        if (req->rq_intr) {
                /* Should only be interrupted if we timed out. */
                if (!req->rq_timedout)
                        DEBUG_REQ(D_ERROR, req,
                                  "rq_intr set but rq_timedout not");
                GOTO(out, rc = -EINTR);
        }

        if (req->rq_timedout) {                 /* non-recoverable timeout */
                GOTO(out, rc = -ETIMEDOUT);
        }

        if (!req->rq_replied) {
                /* How can this be? -eeb */
                DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                LBUG();
                GOTO(out, rc = req->rq_status);
        }

        rc = after_reply(req);
        /* NB may return +ve success rc */
        if (req->rq_resend) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

 out:
        if (req->rq_bulk != NULL) {
                if (rc >= 0) {
                        /* success so far.  Note that anything going wrong
                         * with bulk now is EXTREMELY strange, since the
                         * server must have believed that the bulk
                         * transferred OK before she replied with success to
                         * me. */
                        lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                        brc = l_wait_event(req->rq_reply_waitq,
                                           !ptlrpc_bulk_active(req->rq_bulk),
                                           &lwi);
                        LASSERT(brc == 0 || brc == -ETIMEDOUT);
                        if (brc != 0) {
                                LASSERT(brc == -ETIMEDOUT);
                                DEBUG_REQ(D_ERROR, req, "bulk timed out");
                                rc = brc;
                        } else if (!req->rq_bulk->bd_success) {
                                DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                                rc = -EIO;
                        }
                }
                if (rc < 0)
                        ptlrpc_unregister_bulk(req);
        }

        LASSERT(!req->rq_receiving_reply);
        req->rq_phase = RQ_PHASE_INTERPRET;

        atomic_dec(&imp->imp_inflight);
        wake_up(&imp->imp_recovery_waitq);
        RETURN(rc);
}

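/*
 * Editor's sketch (illustrative, not part of the original source): the
 * usual shape of a synchronous caller of ptlrpc_queue_wait().  OBD_PING,
 * the zero-buffer reply layout, and the five-argument ptlrpc_prep_req()
 * signature are assumptions about helpers defined elsewhere.
 */
static int example_sync_rpc(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int rc;

        req = ptlrpc_prep_req(imp, OBD_PING, 0, NULL, NULL);
        if (req == NULL)
                return -ENOMEM;

        req->rq_replen = lustre_msg_size(0, NULL);

        rc = ptlrpc_queue_wait(req);    /* blocks until reply or failure */
        ptlrpc_req_finished(req);       /* drop our ref, freeing the req */
        return rc;
}
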
struct ptlrpc_replay_async_args {
        int praa_old_state;
        int praa_old_status;
};

static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
                                   void *data, int rc)
{
        struct ptlrpc_replay_async_args *aa = data;
        struct obd_import *imp = req->rq_import;
        unsigned long flags;

        atomic_dec(&imp->imp_replay_inflight);

        if (!req->rq_replied) {
                CERROR("request replay timed out, restarting recovery\n");
                GOTO(out, rc = -ETIMEDOUT);
        }

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc = -EPROTO);
        }

        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR &&
            req->rq_repmsg->status == -ENOTCONN)
                GOTO(out, rc = req->rq_repmsg->status);

        /* The transno had better not change over replay. */
        LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);

        DEBUG_REQ(D_HA, req, "got rep");

        /* let the callback do fixups, possibly including changes to the
         * request itself */
        if (req->rq_replay_cb)
                req->rq_replay_cb(req);

        if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
                DEBUG_REQ(D_HA, req, "status %d, old was %d",
                          req->rq_repmsg->status, aa->praa_old_status);
        } else {
                /* Put it back for re-replay. */
                req->rq_repmsg->status = aa->praa_old_status;
        }

        spin_lock_irqsave(&imp->imp_lock, flags);
        imp->imp_last_replay_transno = req->rq_transno;
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* continue with recovery */
        rc = ptlrpc_import_recovery_state_machine(imp);
 out:
        req->rq_send_state = aa->praa_old_state;

        if (rc != 0)
                /* this replay failed, so restart recovery */
                ptlrpc_connect_import(imp, NULL);

        RETURN(rc);
}

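/*
 * Editor's sketch (not part of the original source): the minimal shape of
 * an rq_interpret_reply callback like ptlrpc_replay_interpret() above.
 * ptlrpcd calls it once the request completes, passing the request's
 * rq_async_args as 'data' and the completion status as 'rc'.  The name and
 * body here are hypothetical.
 */
static int example_interpret(struct ptlrpc_request *req, void *data, int rc)
{
        /* 'data' aliases req->rq_async_args: decode per-request state here */
        if (rc != 0)
                return rc;                      /* send/receive failure */
        return req->rq_repmsg->status;          /* propagate server status */
}
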
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        struct ptlrpc_replay_async_args *aa;
        ENTRY;

        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);

        /* Not handling automatic bulk replay yet (or ever?) */
        LASSERT(req->rq_bulk == NULL);

        DEBUG_REQ(D_HA, req, "REPLAY");

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
        memset(aa, 0, sizeof(*aa));

        /* Prepare request to be resent with ptlrpcd */
        aa->praa_old_state = req->rq_send_state;
        req->rq_send_state = LUSTRE_IMP_REPLAY;
        req->rq_phase = RQ_PHASE_NEW;
        aa->praa_old_status = req->rq_repmsg->status;
        req->rq_status = 0;

        req->rq_interpret_reply = ptlrpc_replay_interpret;
        atomic_inc(&req->rq_import->imp_replay_inflight);
        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */

        ptlrpcd_add_req(req);
        RETURN(0);
}

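/*
 * Editor's sketch (not part of the original source): the rq_async_args
 * idiom used above.  Per-request interpreter state is stored inline in the
 * request itself, guarded by a size check so it can never overflow the
 * embedded buffer.  All names here are hypothetical.
 */
struct example_async_args {
        int eaa_old_state;
};

static void example_stash_args(struct ptlrpc_request *req, int state)
{
        struct example_async_args *eaa;

        LASSERT(sizeof(*eaa) <= sizeof(req->rq_async_args));
        eaa = (struct example_async_args *)&req->rq_async_args;
        eaa->eaa_old_state = state;
}
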
void ptlrpc_abort_inflight(struct obd_import *imp)
{
        unsigned long flags;
        struct list_head *tmp, *n;
        ENTRY;

        /* Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
        spin_lock_irqsave(&imp->imp_lock, flags);

        /* XXX locking?  Maybe we should remove each request with the list
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "inflight");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting waiting req");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_rawrpc_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting raw rpc");

                spin_lock(&req->rq_lock);
                req->rq_err = 1;
                ptlrpc_wake_client_req(req);
                spin_unlock(&req->rq_lock);
        }

        /* Last chance to free reqs left on the replay list, but we
         * will still leak reqs that haven't committed.  */
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);

        spin_unlock_irqrestore(&imp->imp_lock, flags);

        EXIT;
}

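/*
 * Editor's sketch (not part of the original source): the staleness test
 * applied to each list above, factored out.  A request sent under an older
 * connect generation can never complete once the import has reconnected,
 * so it is errored out and its waiter woken.  The helper name is
 * hypothetical.
 */
static inline int example_req_is_stale(struct ptlrpc_request *req,
                                       struct obd_import *imp)
{
        return req->rq_import_generation < imp->imp_generation;
}
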
static __u64 ptlrpc_last_xid = 0;
static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;

__u64 ptlrpc_next_xid(void)
{
        __u64 tmp;

        spin_lock(&ptlrpc_last_xid_lock);
        tmp = ++ptlrpc_last_xid;
        spin_unlock(&ptlrpc_last_xid_lock);
        return tmp;
}

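/*
 * Editor's sketch (not part of the original source): the spinlock above is
 * needed because "++" on a __u64 is not atomic on 32-bit kernels.  Callers
 * stamp each request exactly once; the helper name is hypothetical.
 */
static inline void example_assign_xid(struct ptlrpc_request *req)
{
        req->rq_xid = ptlrpc_next_xid();
}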