lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35 #include <linux/lustre_sec.h>
36
37 #include "ptlrpc_internal.h"
38
39 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
40                         struct ptlrpc_client *cl)
41 {
42         cl->cli_request_portal = req_portal;
43         cl->cli_reply_portal   = rep_portal;
44         cl->cli_name           = name;
45 }
46
47 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
48 {
49         struct ptlrpc_connection *c;
50         struct ptlrpc_peer peer;
51         int err;
52
53         err = ptlrpc_uuid_to_peer(uuid, &peer);
54         if (err != 0) {
55                 CERROR("cannot find peer %s!\n", uuid->uuid);
56                 return NULL;
57         }
58
59         c = ptlrpc_get_connection(&peer, uuid);
60         if (c) {
61                 memcpy(c->c_remote_uuid.uuid,
62                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
63         }
64
65         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
66
67         return c;
68 }
69
70 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
71                                  struct obd_uuid *uuid)
72 {
73         struct ptlrpc_peer peer;
74         int err;
75
76         err = ptlrpc_uuid_to_peer(uuid, &peer);
77         if (err != 0) {
78                 CERROR("cannot find peer %s!\n", uuid->uuid);
79                 return;
80         }
81
82         memcpy(&conn->c_peer, &peer, sizeof (peer));
83         return;
84 }
85
86 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
87 {
88         struct ptlrpc_bulk_desc *desc;
89
90         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
91         if (!desc)
92                 return NULL;
93
94         spin_lock_init(&desc->bd_lock);
95         init_waitqueue_head(&desc->bd_waitq);
96         desc->bd_max_iov = npages;
97         desc->bd_iov_count = 0;
98         desc->bd_md_h = PTL_INVALID_HANDLE;
99         desc->bd_portal = portal;
100         desc->bd_type = type;
101         
102         return desc;
103 }
104
105 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
106                                                int npages, int type, int portal)
107 {
108         struct obd_import *imp = req->rq_import;
109         struct ptlrpc_bulk_desc *desc;
110
111         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
112         desc = new_bulk(npages, type, portal);
113         if (desc == NULL)
114                 RETURN(NULL);
115
116         desc->bd_import_generation = req->rq_import_generation;
117         desc->bd_import = class_import_get(imp);
118         desc->bd_req = req;
119
120         desc->bd_cbid.cbid_fn  = client_bulk_callback;
121         desc->bd_cbid.cbid_arg = desc;
122
123         /* This makes req own desc, and frees it when req itself is freed */
124         req->rq_bulk = desc;
125
126         return desc;
127 }
128
129 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
130                                                int npages, int type, int portal)
131 {
132         struct obd_export *exp = req->rq_export;
133         struct ptlrpc_bulk_desc *desc;
134
135         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
136
137         desc = new_bulk(npages, type, portal);
138         if (desc == NULL)
139                 RETURN(NULL);
140
141         desc->bd_export = class_export_get(exp);
142         desc->bd_req = req;
143
144         desc->bd_cbid.cbid_fn  = server_bulk_callback;
145         desc->bd_cbid.cbid_arg = desc;
146
147         /* NB we don't assign rq_bulk here; server-side requests are
148          * re-used, and the handler frees the bulk desc explicitly. */
149
150         return desc;
151 }
152
153 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
154                            struct page *page, int pageoffset, int len)
155 {
156         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
157         LASSERT(page != NULL);
158         LASSERT(pageoffset >= 0);
159         LASSERT(len > 0);
160         LASSERT(pageoffset + len <= PAGE_SIZE);
161
162         desc->bd_nob += len;
163
164         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
165 }
166
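/*
 * Illustrative sketch (not from the original file): attaching a client-side
 * bulk descriptor to a request with the helpers above.  The request, page
 * array, page count and portal are assumed to be supplied by the caller;
 * attach_read_bulk() itself is a made-up name.
 */
#if 0
static int attach_read_bulk(struct ptlrpc_request *req, struct page **pages,
                            int npages, int portal)
{
        struct ptlrpc_bulk_desc *desc;
        int i;

        /* BULK_PUT_SINK: the server PUTs the data, the client is the sink */
        desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
        if (desc == NULL)
                return -ENOMEM;

        /* one full page per iov entry; req now owns desc (see above) */
        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);

        return 0;
}
#endif
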
167 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
168 {
169         ENTRY;
170
171         LASSERT(desc != NULL);
172         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
173         LASSERT(!desc->bd_network_rw);         /* network hands off */
174         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
175         if (desc->bd_export)
176                 class_export_put(desc->bd_export);
177         else
178                 class_import_put(desc->bd_import);
179
180         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
181                                 bd_iov[desc->bd_max_iov]));
182         EXIT;
183 }
184
185 /* FIXME prep_req should now return an error code rather than NULL, but
186  * it is called everywhere :(
187  */
188 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
189                                        int opcode, int count, int *lengths,
190                                        char **bufs)
191 {
192         struct ptlrpc_request *request;
193         int rc;
194         ENTRY;
195
196         LASSERT((unsigned long)imp > 0x1000);
197
198         OBD_ALLOC(request, sizeof(*request));
199         if (!request) {
200                 CERROR("out of memory allocating request\n");
201                 RETURN(NULL);
202         }
203
204         request->rq_import = class_import_get(imp);
205
206         rc = ptlrpcs_req_get_cred(request);
207         if (rc) {
208                 CDEBUG(D_SEC, "failed to get credential\n");
209                 GOTO(out_free, rc);
210         }
211
212         /* try to refresh the credential, but proceed even if it fails */
213         rc = ptlrpcs_cred_refresh(request->rq_cred);
214         if (!ptlrpcs_cred_is_uptodate(request->rq_cred)) {
215                 CERROR("req %p: failed to refresh cred %p, rc %d, continue\n",
216                        request, request->rq_cred, rc);
217         }
218
219         rc = lustre_pack_request(request, count, lengths, bufs);
220         if (rc) {
221                 CERROR("cannot pack request %d\n", rc);
222                 GOTO(out_cred, rc);
223         }
224         request->rq_reqmsg->version |= version;
225
226         if (imp->imp_server_timeout)
227                 request->rq_timeout = obd_timeout / 2;
228         else
229                 request->rq_timeout = obd_timeout;
230
231         request->rq_send_state = LUSTRE_IMP_FULL;
232         request->rq_type = PTL_RPC_MSG_REQUEST;
233
234         request->rq_req_cbid.cbid_fn  = request_out_callback;
235         request->rq_req_cbid.cbid_arg = request;
236
237         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
238         request->rq_reply_cbid.cbid_arg = request;
239         
240         request->rq_phase = RQ_PHASE_NEW;
241
242         /* XXX FIXME bug 249 */
243         request->rq_request_portal = imp->imp_client->cli_request_portal;
244         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
245
246         spin_lock_init(&request->rq_lock);
247         INIT_LIST_HEAD(&request->rq_list);
248         INIT_LIST_HEAD(&request->rq_replay_list);
249         INIT_LIST_HEAD(&request->rq_set_chain);
250         init_waitqueue_head(&request->rq_reply_waitq);
251         request->rq_xid = ptlrpc_next_xid();
252         atomic_set(&request->rq_refcount, 1);
253
254         request->rq_reqmsg->opc = opcode;
255         request->rq_reqmsg->flags = 0;
256         RETURN(request);
257 out_cred:
258         ptlrpcs_req_drop_cred(request);
259 out_free:
260         class_import_put(imp);
261         OBD_FREE(request, sizeof(*request));
262         RETURN(NULL);
263 }
264
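/*
 * Illustrative sketch (not from the original file): the common synchronous
 * pattern built on ptlrpc_prep_req().  ptlrpc_queue_wait() and
 * ptlrpc_req_finished() are defined later in this file; the version, opcode
 * and body passed in here come from the caller, and the assumption that the
 * reply body has the same size as the request body is for illustration only.
 */
#if 0
static int send_simple_rpc(struct obd_import *imp, __u32 version, int opcode,
                           int bodylen, char *body)
{
        struct ptlrpc_request *req;
        int size[1] = { bodylen };
        char *bufs[1] = { body };
        int rc;

        req = ptlrpc_prep_req(imp, version, opcode, 1, size, bufs);
        if (req == NULL)
                return -ENOMEM;

        /* assume the reply body has the same length as the request body */
        req->rq_replen = lustre_msg_size(1, size);

        rc = ptlrpc_queue_wait(req);    /* send and wait for the reply */
        ptlrpc_req_finished(req);       /* drop the caller's reference */
        return rc;
}
#endif
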
265 struct ptlrpc_request_set *ptlrpc_prep_set(void)
266 {
267         struct ptlrpc_request_set *set;
268
269         OBD_ALLOC(set, sizeof *set);
270         if (!set)
271                 RETURN(NULL);
272         INIT_LIST_HEAD(&set->set_requests);
273         init_waitqueue_head(&set->set_waitq);
274         set->set_remaining = 0;
275         spin_lock_init(&set->set_new_req_lock);
276         INIT_LIST_HEAD(&set->set_new_requests);
277
278         RETURN(set);
279 }
280
281 /* Finish with this set; opposite of prep_set. */
282 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
283 {
284         struct list_head *tmp;
285         struct list_head *next;
286         int               expected_phase;
287         int               n = 0;
288         ENTRY;
289
290         /* Requests on the set should either all be completed, or all be new */
291         expected_phase = (set->set_remaining == 0) ?
292                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
293         list_for_each (tmp, &set->set_requests) {
294                 struct ptlrpc_request *req =
295                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
296
297                 LASSERT(req->rq_phase == expected_phase);
298                 n++;
299         }
300
301         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
302
303         list_for_each_safe(tmp, next, &set->set_requests) {
304                 struct ptlrpc_request *req =
305                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
306                 list_del_init(&req->rq_set_chain);
307
308                 LASSERT(req->rq_phase == expected_phase);
309
310                 if (req->rq_phase == RQ_PHASE_NEW) {
311
312                         if (req->rq_interpret_reply != NULL) {
313                                 int (*interpreter)(struct ptlrpc_request *,
314                                                    void *, int) =
315                                         req->rq_interpret_reply;
316
317                                 /* higher level (i.e. LOV) failed;
318                                  * let the sub reqs clean up */
319                                 req->rq_status = -EBADR;
320                                 interpreter(req, &req->rq_async_args,
321                                             req->rq_status);
322                         }
323                         set->set_remaining--;
324                 }
325
326                 req->rq_set = NULL;
327                 ptlrpc_req_finished (req);
328         }
329
330         LASSERT(set->set_remaining == 0);
331
332         OBD_FREE(set, sizeof(*set));
333         EXIT;
334 }
335
336 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
337                         struct ptlrpc_request *req)
338 {
339         /* The set takes over the caller's request reference */
340         list_add_tail(&req->rq_set_chain, &set->set_requests);
341         req->rq_set = set;
342         set->set_remaining++;
343         atomic_inc(&req->rq_import->imp_inflight);
344 }
345
346 /* Locked so that many callers can add things; the context that owns the
347  * set is supposed to notice these and move them into the set proper. */
348 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
349                             struct ptlrpc_request *req)
350 {
351         unsigned long flags;
352         spin_lock_irqsave(&set->set_new_req_lock, flags);
353         /* The set takes over the caller's request reference */
354         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
355         req->rq_set = set;
356         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
357 }
358
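/*
 * Illustrative sketch (not from the original file): issuing several requests
 * in parallel through a request set.  prepare_one_req() is a placeholder for
 * whatever builds each individual request.
 */
#if 0
static int send_parallel_rpcs(struct obd_import *imp, int count)
{
        struct ptlrpc_request_set *set;
        struct ptlrpc_request *req;
        int i, rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        for (i = 0; i < count; i++) {
                req = prepare_one_req(imp, i);  /* placeholder */
                if (req == NULL)
                        break;
                /* the set takes over this reference to req */
                ptlrpc_set_add_req(set, req);
        }

        if (i == 0) {                   /* nothing queued; ptlrpc_set_wait()
                                         * asserts on an empty set */
                ptlrpc_set_destroy(set);
                return -ENOMEM;
        }

        rc = ptlrpc_set_wait(set);      /* sends anything new, waits for all */
        ptlrpc_set_destroy(set);        /* drops the per-request references */
        return rc;
}
#endif
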
359 /*
360  * Based on the current state of the import, determine if the request
361  * can be sent, is an error, or should be delayed.
362  *
363  * Returns true if this request should be delayed. If false and *status
364  * is non-zero, then the request cannot be sent and *status is the error
365  * code.  If false and *status is 0, then the request can be sent.
366  *
367  * The imp->imp_lock must be held.
368  */
369 static int ptlrpc_import_delay_req(struct obd_import *imp, 
370                                    struct ptlrpc_request *req, int *status)
371 {
372         int delay = 0;
373         ENTRY;
374
375         LASSERT (status != NULL);
376         *status = 0;
377
378         if (imp->imp_state == LUSTRE_IMP_NEW) {
379                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
380                 *status = -EIO;
381                 LBUG();
382         }
383         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
384                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
385                 *status = -EIO;
386         }
387         /* allow CONNECT even if import is invalid */
388         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
389                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
390                 ;
391         }
392         /*
393          * If the import has been invalidated (such as by an OST failure), the
394          * request must fail with -EIO.  
395          */
396         else if (imp->imp_invalid) {
397                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
398                 *status = -EIO;
399         } 
400         else if (req->rq_import_generation != imp->imp_generation) {
401                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
402                 *status = -EIO;
403         } 
404         else if (req->rq_send_state != imp->imp_state) {
405                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
406                     || req->rq_no_delay) 
407                         *status = -EWOULDBLOCK;
408                 else
409                         delay = 1;
410         }
411
412         RETURN(delay);
413 }
414
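/*
 * Illustrative sketch (not from the original file) of the caller contract
 * described above.  The real callers in this file (ptlrpc_send_new_req(),
 * ptlrpc_check_set(), ptlrpc_queue_wait()) also move the request between
 * imp_delayed_list and imp_sending_list, which is omitted here.
 */
#if 0
static int example_send_or_delay(struct obd_import *imp,
                                 struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc = 0;

        spin_lock_irqsave(&imp->imp_lock, flags);
        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                /* delay: park the request until the import recovers */
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                return -EAGAIN;         /* placeholder "try again later" */
        }
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        if (rc != 0)                    /* hard failure, e.g. -EIO */
                return rc;

        return ptl_send_rpc(req);       /* ok to send now */
}
#endif
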
415 static int ptlrpc_check_reply(struct ptlrpc_request *req)
416 {
417         unsigned long flags;
418         int rc = 0;
419         ENTRY;
420
421         /* serialise with network callback */
422         spin_lock_irqsave (&req->rq_lock, flags);
423
424         if (req->rq_replied) {
425                 DEBUG_REQ(D_NET, req, "REPLIED:");
426                 GOTO(out, rc = 1);
427         }
428         
429         if (req->rq_net_err && !req->rq_timedout) {
430                 spin_unlock_irqrestore (&req->rq_lock, flags);
431                 rc = ptlrpc_expire_one_request(req); 
432                 spin_lock_irqsave (&req->rq_lock, flags);
433                 GOTO(out, rc);
434         }
435
436         if (req->rq_err) {
437                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
438                 GOTO(out, rc = 1);
439         }
440
441         if (req->rq_resend) {
442                 DEBUG_REQ(D_ERROR, req, "RESEND:");
443                 GOTO(out, rc = 1);
444         }
445
446         if (req->rq_restart) {
447                 DEBUG_REQ(D_ERROR, req, "RESTART:");
448                 GOTO(out, rc = 1);
449         }
450         EXIT;
451  out:
452         spin_unlock_irqrestore (&req->rq_lock, flags);
453         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
454         return rc;
455 }
456
457 static int ptlrpc_check_status(struct ptlrpc_request *req)
458 {
459         int err;
460         ENTRY;
461
462         err = req->rq_repmsg->status;
463         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
464                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
465                           err);
466                 RETURN(err < 0 ? err : -EINVAL);
467         }
468
469         if (err < 0) {
470                 DEBUG_REQ(D_INFO, req, "status is %d", err);
471         } else if (err > 0) {
472                 /* XXX: translate this error from net to host */
473                 DEBUG_REQ(D_INFO, req, "status is %d", err);
474         }
475
476         RETURN(err);
477 }
478
479 static int after_reply(struct ptlrpc_request *req)
480 {
481         unsigned long flags;
482         struct obd_import *imp = req->rq_import;
483         int rc;
484         ENTRY;
485
486         LASSERT(!req->rq_receiving_reply);
487
488         /* NB Until this point, the whole of the incoming message,
489          * including buflens, status etc is in the sender's byte order. */
490
491 #if SWAB_PARANOIA
492         /* Clear reply swab mask; this is a new reply in sender's byte order */
493         req->rq_rep_swab_mask = 0;
494 #endif
495         LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
496         rc = ptlrpcs_cli_unwrap_reply(req);
497         if (rc) {
498                 CERROR("verify reply error: %d\n", rc);
499                 RETURN(rc);
500         }
501         /* unwrap_reply may request that the rpc be resent */
502         if (req->rq_ptlrpcs_restart) {
503                 req->rq_resend = 1;
504                 RETURN(0);
505         }
506
507         /* unwrap_reply will set rq_replen to the actual received
508          * lustre_msg length
509          */
510         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
511         if (rc) {
512                 CERROR("unpack_rep failed: %d\n", rc);
513                 RETURN(-EPROTO);
514         }
515
516         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
517             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
518                 CERROR("invalid packet type received (type=%u)\n",
519                        req->rq_repmsg->type);
520                 RETURN(-EPROTO);
521         }
522
523         rc = ptlrpc_check_status(req);
524
525         /* Either we've been evicted, or the server has failed for
526          * some reason. Try to reconnect, and if that fails, punt to the
527          * upcall. */
528         if (rc == -ENOTCONN) {
529                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
530                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
531                         RETURN(-ENOTCONN);
532                 }
533
534                 ptlrpc_request_handle_notconn(req);
535
536                 RETURN(rc);
537         }
538
539         /* Store transno in reqmsg for replay. */
540         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
541
542
543         if (req->rq_import->imp_replayable) {
544                 spin_lock_irqsave(&imp->imp_lock, flags);
545                 if (req->rq_transno != 0)
546                         ptlrpc_retain_replayable_request(req, imp);
547                 else if (req->rq_commit_cb != NULL) {
548                         spin_unlock_irqrestore(&imp->imp_lock, flags);
549                         req->rq_commit_cb(req);
550                         spin_lock_irqsave(&imp->imp_lock, flags);
551                 }
552
553                 if (req->rq_transno > imp->imp_max_transno)
554                         imp->imp_max_transno = req->rq_transno;
555
556                 /* Replay-enabled imports return commit-status information. */
557                 if (req->rq_repmsg->last_committed)
558                         imp->imp_peer_committed_transno =
559                                 req->rq_repmsg->last_committed;
560                 ptlrpc_free_committed(imp);
561                 spin_unlock_irqrestore(&imp->imp_lock, flags);
562         }
563
564         RETURN(rc);
565 }
566
567 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
568 {
569         char                   str[PTL_NALFMT_SIZE];
570         struct obd_import     *imp;
571         unsigned long          flags;
572         int rc;
573         ENTRY;
574
575         LASSERT(req->rq_phase == RQ_PHASE_NEW);
576         req->rq_phase = RQ_PHASE_RPC;
577
578         imp = req->rq_import;
579         spin_lock_irqsave(&imp->imp_lock, flags);
580
581         req->rq_import_generation = imp->imp_generation;
582
583         if (ptlrpc_import_delay_req(imp, req, &rc)) {
584                 spin_lock (&req->rq_lock);
585                 req->rq_waiting = 1;
586                 spin_unlock (&req->rq_lock);
587
588                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
589                           "(%s != %s)",
590                           req->rq_reqmsg->status, 
591                           ptlrpc_import_state_name(req->rq_send_state),
592                           ptlrpc_import_state_name(imp->imp_state));
593                 LASSERT(list_empty (&req->rq_list));
594
595                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
596                 spin_unlock_irqrestore(&imp->imp_lock, flags);
597                 RETURN(0);
598         }
599
600         if (rc != 0) {
601                 spin_unlock_irqrestore(&imp->imp_lock, flags);
602                 req->rq_status = rc;
603                 req->rq_phase = RQ_PHASE_INTERPRET;
604                 RETURN(rc);
605         }
606
607         /* XXX this is the same as ptlrpc_queue_wait */
608         LASSERT(list_empty(&req->rq_list));
609         list_add_tail(&req->rq_list, &imp->imp_sending_list);
610         spin_unlock_irqrestore(&imp->imp_lock, flags);
611
612         req->rq_reqmsg->status = current->pid;
613         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
614                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
615                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
616                req->rq_xid,
617                imp->imp_connection->c_peer.peer_ni->pni_name,
618                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
619                req->rq_reqmsg->opc);
620
621         rc = ptl_send_rpc(req);
622         if (rc) {
623                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
624                 req->rq_net_err = 1;
625                 RETURN(rc);
626         }
627         RETURN(0);
628 }
629
630 int ptlrpc_check_set(struct ptlrpc_request_set *set)
631 {
632         char str[PTL_NALFMT_SIZE];
633         unsigned long flags;
634         struct list_head *tmp;
635         int force_timer_recalc = 0;
636         ENTRY;
637
638         if (set->set_remaining == 0)
639                 RETURN(1);
640
641         list_for_each(tmp, &set->set_requests) {
642                 struct ptlrpc_request *req =
643                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
644                 struct obd_import *imp = req->rq_import;
645                 int rc = 0;
646
647                 if (req->rq_phase == RQ_PHASE_NEW &&
648                     ptlrpc_send_new_req(req)) {
649                         force_timer_recalc = 1;
650                 }
651
652                 if (!(req->rq_phase == RQ_PHASE_RPC ||
653                       req->rq_phase == RQ_PHASE_BULK ||
654                       req->rq_phase == RQ_PHASE_INTERPRET ||
655                       req->rq_phase == RQ_PHASE_COMPLETE)) {
656                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
657                         LBUG();
658                 }
659
660                 if (req->rq_phase == RQ_PHASE_COMPLETE)
661                         continue;
662
663                 if (req->rq_phase == RQ_PHASE_INTERPRET)
664                         GOTO(interpret, req->rq_status);
665
666                 if (req->rq_net_err && !req->rq_timedout)
667                         ptlrpc_expire_one_request(req); 
668
669                 if (req->rq_err) {
670                         ptlrpc_unregister_reply(req);
671                         if (req->rq_status == 0)
672                                 req->rq_status = -EIO;
673                         req->rq_phase = RQ_PHASE_INTERPRET;
674
675                         spin_lock_irqsave(&imp->imp_lock, flags);
676                         list_del_init(&req->rq_list);
677                         spin_unlock_irqrestore(&imp->imp_lock, flags);
678
679                         GOTO(interpret, req->rq_status);
680                 }
681
682                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
683                  * will only be set after rq_timedout, but the oig waiting
684                  * path sets rq_intr irrespective of whether ptlrpcd has
685                  * seen a timeout.  Our policy is to only interpret
686                  * interrupted RPCs after they have timed out. */
687                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
688                         /* NB could be on delayed list */
689                         ptlrpc_unregister_reply(req);
690                         req->rq_status = -EINTR;
691                         req->rq_phase = RQ_PHASE_INTERPRET;
692
693                         spin_lock_irqsave(&imp->imp_lock, flags);
694                         list_del_init(&req->rq_list);
695                         spin_unlock_irqrestore(&imp->imp_lock, flags);
696
697                         GOTO(interpret, req->rq_status);
698                 }
699
700                 if (req->rq_phase == RQ_PHASE_RPC) {
701                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
702                                 int status;
703
704                                 ptlrpc_unregister_reply(req);
705
706                                 spin_lock_irqsave(&imp->imp_lock, flags);
707
708                                 if (ptlrpc_import_delay_req(imp, req, &status)){
709                                         spin_unlock_irqrestore(&imp->imp_lock,
710                                                                flags);
711                                         continue;
712                                 }
713
714                                 list_del_init(&req->rq_list);
715                                 if (status != 0)  {
716                                         req->rq_status = status;
717                                         req->rq_phase = RQ_PHASE_INTERPRET;
718                                         spin_unlock_irqrestore(&imp->imp_lock,
719                                                                flags);
720                                         GOTO(interpret, req->rq_status);
721                                 }
722                                 if (req->rq_no_resend) {
723                                         req->rq_status = -ENOTCONN;
724                                         req->rq_phase = RQ_PHASE_INTERPRET;
725                                         spin_unlock_irqrestore(&imp->imp_lock,
726                                                                flags);
727                                         GOTO(interpret, req->rq_status);
728                                 }
729                                 list_add_tail(&req->rq_list,
730                                               &imp->imp_sending_list);
731
732                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
733
734                                 req->rq_waiting = 0;
735                                 if (req->rq_resend) {
736                                         if (!req->rq_ptlrpcs_restart)
737                                                 lustre_msg_add_flags(
738                                                         req->rq_reqmsg,
739                                                         MSG_RESENT);
740                                         if (req->rq_bulk) {
741                                                 __u64 old_xid = req->rq_xid;
742
743                                                 ptlrpc_unregister_bulk (req);
744
745                                                 /* ensure previous bulk fails */
746                                                 req->rq_xid = ptlrpc_next_xid();
747                                                 CDEBUG(D_HA, "resend bulk "
748                                                        "old x"LPU64
749                                                        " new x"LPU64"\n",
750                                                        old_xid, req->rq_xid);
751                                         }
752                                 }
753
754                                 rc = ptl_send_rpc(req);
755                                 if (rc) {
756                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
757                                                   rc);
758                                         force_timer_recalc = 1;
759                                         req->rq_net_err = 1;
760                                 }
761                                 /* need to reset the timeout */
762                                 force_timer_recalc = 1;
763                         }
764
765                         /* Still waiting for a reply? */
766                         if (ptlrpc_client_receiving_reply(req))
767                                 continue;
768
769                         /* Did we actually receive a reply? */
770                         if (!ptlrpc_client_replied(req))
771                                 continue;
772
773                         spin_lock_irqsave(&imp->imp_lock, flags);
774                         list_del_init(&req->rq_list);
775                         spin_unlock_irqrestore(&imp->imp_lock, flags);
776
777                         req->rq_status = after_reply(req);
778                         if (req->rq_resend) {
779                                 /* Add this req to the delayed list so
780                                    it can be errored if the import is
781                                    evicted after recovery. */
782                                 spin_lock_irqsave (&req->rq_lock, flags);
783                                 list_add_tail(&req->rq_list, 
784                                               &imp->imp_delayed_list);
785                                 spin_unlock_irqrestore(&req->rq_lock, flags);
786                                 continue;
787                         }
788
789                         /* If there is no bulk associated with this request,
790                          * then we're done and should let the interpreter
791                          * process the reply.  Similarly if the RPC returned
792                          * an error, and therefore the bulk will never arrive.
793                          */
794                         if (req->rq_bulk == NULL || req->rq_status != 0) {
795                                 req->rq_phase = RQ_PHASE_INTERPRET;
796                                 GOTO(interpret, req->rq_status);
797                         }
798
799                         req->rq_phase = RQ_PHASE_BULK;
800                 }
801
802                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
803                 if (ptlrpc_bulk_active(req->rq_bulk))
804                         continue;
805
806                 if (!req->rq_bulk->bd_success) {
807                         /* The RPC reply arrived OK, but the bulk screwed
808                          * up!  Dead weird, since the server told us the RPC
809                          * was good after getting the REPLY for its GET or
810                          * the ACK for its PUT. */
811                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
812                         LBUG();
813                 }
814
815                 req->rq_phase = RQ_PHASE_INTERPRET;
816
817         interpret:
818                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
819                 LASSERT(!req->rq_receiving_reply);
820
821                 ptlrpc_unregister_reply(req);
822                 if (req->rq_bulk != NULL)
823                         ptlrpc_unregister_bulk (req);
824
825                 req->rq_phase = RQ_PHASE_COMPLETE;
826
827                 if (req->rq_interpret_reply != NULL) {
828                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
829                                 req->rq_interpret_reply;
830                         req->rq_status = interpreter(req, &req->rq_async_args,
831                                                      req->rq_status);
832                 }
833
834                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
835                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
836                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
837                        req->rq_xid,
838                        imp->imp_connection->c_peer.peer_ni->pni_name,
839                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
840                        req->rq_reqmsg->opc);
841
842                 set->set_remaining--;
843
844                 atomic_dec(&imp->imp_inflight);
845                 wake_up(&imp->imp_recovery_waitq);
846         }
847
848         /* If we hit an error, we want to recover promptly. */
849         RETURN(set->set_remaining == 0 || force_timer_recalc);
850 }
851
852 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
853 {
854         unsigned long      flags;
855         struct obd_import *imp = req->rq_import;
856         int replied = 0;
857         ENTRY;
858
859         DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
860                   (long)req->rq_sent, LTIME_S(CURRENT_TIME) - req->rq_sent);
861
862         spin_lock_irqsave (&req->rq_lock, flags);
863         replied = req->rq_replied;
864         if (!replied)
865                 req->rq_timedout = 1;
866         spin_unlock_irqrestore (&req->rq_lock, flags);
867
868         if (replied)
869                 RETURN(0);
870
871         DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu)", (long)req->rq_sent); 
872
873         ptlrpc_unregister_reply (req);
874
875         if (obd_dump_on_timeout)
876                 portals_debug_dumplog();
877
878         if (req->rq_bulk != NULL)
879                 ptlrpc_unregister_bulk (req);
880
881         if (imp == NULL) {
882                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
883                 RETURN(1);
884         }
885
886         /* The DLM server doesn't want recovery run on its imports. */
887         if (imp->imp_dlm_fake)
888                 RETURN(1);
889
890         /* If this request is for recovery or other primordial tasks,
891          * then error it out here. */
892         if (req->rq_send_state != LUSTRE_IMP_FULL ||
893             imp->imp_obd->obd_no_recov) {
894                 spin_lock_irqsave (&req->rq_lock, flags);
895                 req->rq_status = -ETIMEDOUT;
896                 req->rq_err = 1;
897                 spin_unlock_irqrestore (&req->rq_lock, flags);
898                 RETURN(1);
899         }
900
901         ptlrpc_fail_import(imp, req->rq_import_generation);
902
903         RETURN(0);
904 }
905
906 int ptlrpc_expired_set(void *data)
907 {
908         struct ptlrpc_request_set *set = data;
909         struct list_head          *tmp;
910         time_t                     now = LTIME_S(CURRENT_TIME);
911         ENTRY;
912
913         LASSERT(set != NULL);
914
915         /* A timeout expired; see which reqs it applies to... */
916         list_for_each (tmp, &set->set_requests) {
917                 struct ptlrpc_request *req =
918                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
919
920                 /* request in-flight? */
921                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
922                        && !req->rq_resend) ||
923                       (req->rq_phase == RQ_PHASE_BULK)))
924                         continue;
925
926                 if (req->rq_timedout ||           /* already dealt with */
927                     req->rq_sent + req->rq_timeout > now) /* not expired */
928                         continue;
929
930                 /* deal with this guy */
931                 ptlrpc_expire_one_request (req);
932         }
933
934         /* When waiting for a whole set, we always want to break out of the
935          * sleep so we can recalculate the timeout, or enable interrupts
936          * iff everyone has timed out.
937          */
938         RETURN(1);
939 }
940
941 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
942 {
943         unsigned long flags;
944         spin_lock_irqsave(&req->rq_lock, flags);
945         req->rq_intr = 1;
946         spin_unlock_irqrestore(&req->rq_lock, flags);
947 }
948
949 void ptlrpc_interrupted_set(void *data)
950 {
951         struct ptlrpc_request_set *set = data;
952         struct list_head *tmp;
953
954         LASSERT(set != NULL);
955         CERROR("INTERRUPTED SET %p\n", set);
956
957         list_for_each(tmp, &set->set_requests) {
958                 struct ptlrpc_request *req =
959                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
960
961                 if (req->rq_phase != RQ_PHASE_RPC)
962                         continue;
963
964                 ptlrpc_mark_interrupted(req);
965         }
966 }
967
968 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
969 {
970         struct list_head      *tmp;
971         time_t                 now = LTIME_S(CURRENT_TIME);
972         time_t                 deadline;
973         int                    timeout = 0;
974         struct ptlrpc_request *req;
975         ENTRY;
976
977         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
978
979         list_for_each(tmp, &set->set_requests) {
980                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
981
982                 /* request in-flight? */
983                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
984                       (req->rq_phase == RQ_PHASE_BULK)))
985                         continue;
986
987                 if (req->rq_timedout)   /* already timed out */
988                         continue;
989
990                 deadline = req->rq_sent + req->rq_timeout;
991                 if (deadline <= now)    /* actually expired already */
992                         timeout = 1;    /* ASAP */
993                 else if (timeout == 0 || timeout > deadline - now)
994                         timeout = deadline - now;
995         }
996         RETURN(timeout);
997 }
998                 
999
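/*
 * Worked example of the calculation above (illustrative numbers only): with
 * now = 1000, a request sent at 990 with rq_timeout 25 has deadline 1015 and
 * contributes a candidate timeout of 15 seconds; a second request sent at
 * 970 has deadline 995, already in the past, so the candidate drops to 1
 * ("expire ASAP").  With no in-flight requests at all the function returns
 * 0, meaning no timeout bound is needed.
 */
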
1000 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
1001 {
1002         struct list_head      *tmp;
1003         struct ptlrpc_request *req;
1004         struct l_wait_info     lwi;
1005         int                    rc, timeout;
1006         ENTRY;
1007
1008         LASSERT(!list_empty(&set->set_requests));
1009         list_for_each(tmp, &set->set_requests) {
1010                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1011                 if (req->rq_phase == RQ_PHASE_NEW)
1012                         (void)ptlrpc_send_new_req(req);
1013         }
1014
1015         do {
1016                 timeout = ptlrpc_set_next_timeout(set);
1017
1018                 /* wait until all complete, interrupted, or an in-flight
1019                  * req times out */
1020                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
1021                        set, timeout);
1022                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
1023                                        ptlrpc_expired_set,
1024                                        ptlrpc_interrupted_set, set);
1025                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
1026
1027                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
1028
1029                 /* -EINTR => all requests have been flagged rq_intr so next
1030                  * check completes.
1031                  * -ETIMEDOUT => someone timed out.  When all reqs have
1032                  * timed out, signals are enabled allowing completion with
1033                  * EINTR.
1034                  * I don't really care if we go once more round the loop in
1035                  * the error cases -eeb. */
1036         } while (rc != 0 || set->set_remaining != 0);
1037
1038         LASSERT(set->set_remaining == 0);
1039
1040         rc = 0;
1041         list_for_each(tmp, &set->set_requests) {
1042                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1043
1044                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
1045                 if (req->rq_status != 0)
1046                         rc = req->rq_status;
1047         }
1048
1049         if (set->set_interpret != NULL) {
1050                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1051                         set->set_interpret;
1052                 rc = interpreter (set, set->set_arg, rc);
1053         }
1054
1055         RETURN(rc);
1056 }
1057
1058 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1059 {
1060         ENTRY;
1061         if (request == NULL) {
1062                 EXIT;
1063                 return;
1064         }
1065
1066         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1067         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1068         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1069         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1070         LASSERT(request->rq_cred);
1071
1072         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1073          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1074         if (request->rq_import != NULL) {
1075                 unsigned long flags = 0;
1076                 if (!locked)
1077                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1078                 list_del_init(&request->rq_replay_list);
1079                 if (!locked)
1080                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1081                                                flags);
1082         }
1083         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1084
1085         if (atomic_read(&request->rq_refcount) != 0) {
1086                 DEBUG_REQ(D_ERROR, request,
1087                           "freeing request with nonzero refcount");
1088                 LBUG();
1089         }
1090
1091         if (request->rq_repbuf != NULL)
1092                 ptlrpcs_cli_free_repbuf(request);
1093         if (request->rq_reqbuf != NULL)
1094                 ptlrpcs_cli_free_reqbuf(request);
1095
1096         if (request->rq_export != NULL) {
1097                 class_export_put(request->rq_export);
1098                 request->rq_export = NULL;
1099         }
1100         if (request->rq_import != NULL) {
1101                 class_import_put(request->rq_import);
1102                 request->rq_import = NULL;
1103         }
1104         if (request->rq_bulk != NULL)
1105                 ptlrpc_free_bulk(request->rq_bulk);
1106
1107         ptlrpcs_req_drop_cred(request);
1108         OBD_FREE(request, sizeof(*request));
1109         EXIT;
1110 }
1111
1112 void ptlrpc_free_req(struct ptlrpc_request *request)
1113 {
1114         __ptlrpc_free_req(request, 0);
1115 }
1116
1117 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1118 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1119 {
1120         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1121         (void)__ptlrpc_req_finished(request, 1);
1122 }
1123
1124 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1125 {
1126         ENTRY;
1127         if (request == NULL)
1128                 RETURN(1);
1129
1130         if (request == LP_POISON ||
1131             request->rq_reqmsg == LP_POISON) {
1132                 CERROR("dereferencing freed request (bug 575)\n");
1133                 LBUG();
1134                 RETURN(1);
1135         }
1136
1137         DEBUG_REQ(D_INFO, request, "refcount now %u",
1138                   atomic_read(&request->rq_refcount) - 1);
1139
1140         if (atomic_dec_and_test(&request->rq_refcount)) {
1141                 __ptlrpc_free_req(request, locked);
1142                 RETURN(1);
1143         }
1144
1145         RETURN(0);
1146 }
1147
1148 void ptlrpc_req_finished(struct ptlrpc_request *request)
1149 {
1150         __ptlrpc_req_finished(request, 0);
1151 }
1152
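/*
 * Illustrative sketch (not from the original file) of the reference-counting
 * contract: every ptlrpc_request_addref() (defined further down in this
 * file) must be balanced by a ptlrpc_req_finished(); the request is freed
 * only when the last reference is dropped.
 */
#if 0
static void example_request_refcounting(struct ptlrpc_request *req)
{
        /* take an extra reference before handing the pointer to another
         * context; ptlrpc_request_addref() returns its argument */
        struct ptlrpc_request *copy = ptlrpc_request_addref(req);

        /* ... publish "copy" to an async user ... */

        /* that user drops its reference when done; the original owner's
         * reference to req is untouched by this function */
        ptlrpc_req_finished(copy);
}
#endif
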
1153 /* Disengage the client's reply buffer from the network
1154  * NB does _NOT_ unregister any client-side bulk.
1155  * IDEMPOTENT, but _not_ safe against concurrent callers.
1156  * The request owner (i.e. the thread doing the I/O) must call...
1157  */
1158 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1159 {
1160         int                rc;
1161         wait_queue_head_t *wq;
1162         struct l_wait_info lwi;
1163
1164         LASSERT(!in_interrupt ());             /* might sleep */
1165
1166         if (!ptlrpc_client_receiving_reply(request))
1167                 return;
1168
1169         PtlMDUnlink (request->rq_reply_md_h);
1170
1171         /* We have to l_wait_event() whatever the result, to give liblustre
1172          * a chance to run reply_in_callback() */
1173
1174         if (request->rq_set != NULL)
1175                 wq = &request->rq_set->set_waitq;
1176         else
1177                 wq = &request->rq_reply_waitq;
1178
1179         for (;;) {
1180                 /* Network access will complete in finite time but the HUGE
1181                  * timeout lets us CWARN for visibility of sluggish NALs */
1182                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1183                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1184                 if (rc == 0)
1185                         return;
1186
1187                 LASSERT (rc == -ETIMEDOUT);
1188                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1189         }
1190 }
1191
1192 /* caller must hold imp->imp_lock */
1193 void ptlrpc_free_committed(struct obd_import *imp)
1194 {
1195         struct list_head *tmp, *saved;
1196         struct ptlrpc_request *req;
1197         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1198         ENTRY;
1199
1200         LASSERT(imp != NULL);
1201
1202         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1203
1204         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1205                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1206
1207         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1208                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1209
1210                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1211                 LASSERT(req != last_req);
1212                 last_req = req;
1213
1214                 if (req->rq_import_generation < imp->imp_generation) {
1215                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1216                         GOTO(free_req, 0);
1217                 }
1218
1219                 if (req->rq_replay) {
1220                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1221                         continue;
1222                 }
1223
1224                 /* not yet committed */
1225                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1226                         DEBUG_REQ(D_HA, req, "stopping search");
1227                         break;
1228                 }
1229
1230                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1231                           imp->imp_peer_committed_transno);
1232 free_req:
1233                 if (req->rq_commit_cb != NULL)
1234                         req->rq_commit_cb(req);
1235                 list_del_init(&req->rq_replay_list);
1236                 __ptlrpc_req_finished(req, 1);
1237         }
1238
1239         EXIT;
1240         return;
1241 }
1242
1243 void ptlrpc_cleanup_client(struct obd_import *imp)
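/*
 * Illustrative sketch (not from the original file): honouring the locking
 * contract above, in the same way after_reply() does when it updates
 * imp_peer_committed_transno from a reply.
 */
#if 0
static void example_note_committed(struct obd_import *imp, __u64 last_committed)
{
        unsigned long flags;

        /* ptlrpc_free_committed() must be called with imp_lock held */
        spin_lock_irqsave(&imp->imp_lock, flags);
        if (last_committed > imp->imp_peer_committed_transno)
                imp->imp_peer_committed_transno = last_committed;
        ptlrpc_free_committed(imp);
        spin_unlock_irqrestore(&imp->imp_lock, flags);
}
#endif
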
1244 {
1245         ENTRY;
1246         EXIT;
1247         return;
1248 }
1249
1250 void ptlrpc_resend_req(struct ptlrpc_request *req)
1251 {
1252         unsigned long flags;
1253
1254         DEBUG_REQ(D_HA, req, "going to resend");
1255         req->rq_reqmsg->handle.cookie = 0;
1256         req->rq_status = -EAGAIN;
1257
1258         spin_lock_irqsave (&req->rq_lock, flags);
1259         req->rq_resend = 1;
1260         req->rq_net_err = 0;
1261         req->rq_timedout = 0;
1262         if (req->rq_bulk) {
1263                 __u64 old_xid = req->rq_xid;
1264                 
1265                 /* ensure previous bulk fails */
1266                 req->rq_xid = ptlrpc_next_xid();
1267                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1268                        old_xid, req->rq_xid);
1269         }
1270         ptlrpc_wake_client_req(req);
1271         spin_unlock_irqrestore (&req->rq_lock, flags);
1272 }
1273
1274 /* XXX: this function and rq_status are currently unused */
1275 void ptlrpc_restart_req(struct ptlrpc_request *req)
1276 {
1277         unsigned long flags;
1278
1279         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1280         req->rq_status = -ERESTARTSYS;
1281
1282         spin_lock_irqsave (&req->rq_lock, flags);
1283         req->rq_restart = 1;
1284         req->rq_timedout = 0;
1285         ptlrpc_wake_client_req(req);
1286         spin_unlock_irqrestore (&req->rq_lock, flags);
1287 }
1288
1289 static int expired_request(void *data)
1290 {
1291         struct ptlrpc_request *req = data;
1292         ENTRY;
1293
1294         RETURN(ptlrpc_expire_one_request(req));
1295 }
1296
1297 static void interrupted_request(void *data)
1298 {
1299         unsigned long flags;
1300
1301         struct ptlrpc_request *req = data;
1302         DEBUG_REQ(D_HA, req, "request interrupted");
1303         spin_lock_irqsave (&req->rq_lock, flags);
1304         req->rq_intr = 1;
1305         spin_unlock_irqrestore (&req->rq_lock, flags);
1306 }
1307
1308 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1309 {
1310         ENTRY;
1311         atomic_inc(&req->rq_refcount);
1312         RETURN(req);
1313 }
1314
1315 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1316                                       struct obd_import *imp)
1317 {
1318         struct list_head *tmp;
1319
1320         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1321
1322         /* clear this for new requests that were resent, as well
1323            as for resent replayed requests. */
1324         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1325
1326         /* don't re-add requests that have been replayed */
1327         if (!list_empty(&req->rq_replay_list))
1328                 return;
1329
1330         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
1331
1332         LASSERT(imp->imp_replayable);
1333         /* Balanced in ptlrpc_free_committed, usually. */
1334         ptlrpc_request_addref(req);
1335         list_for_each_prev(tmp, &imp->imp_replay_list) {
1336                 struct ptlrpc_request *iter =
1337                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1338
1339                 /* We may have duplicate transnos if we create and then
1340                  * open a file, or for closes retained to match creating
1341                  * opens, so use req->rq_xid as a secondary key.
1342                  * (See bugs 684, 685, and 428.)
1343                  * XXX no longer needed, but all opens need transnos!
1344                  */
1345                 if (iter->rq_transno > req->rq_transno)
1346                         continue;
1347
1348                 if (iter->rq_transno == req->rq_transno) {
1349                         LASSERT(iter->rq_xid != req->rq_xid);
1350                         if (iter->rq_xid > req->rq_xid)
1351                                 continue;
1352                 }
1353
1354                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1355                 return;
1356         }
1357
1358         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1359 }
1360
1361 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1362 {
1363         char str[PTL_NALFMT_SIZE];
1364         int rc = 0;
1365         int brc;
1366         struct l_wait_info lwi;
1367         struct obd_import *imp = req->rq_import;
1368         unsigned long flags;
1369         int timeout = 0;
1370         ENTRY;
1371
1372         LASSERT(req->rq_set == NULL);
1373         LASSERT(!req->rq_receiving_reply);
1374         atomic_inc(&imp->imp_inflight);
1375
1376         if (imp->imp_connection == NULL) {
1377                 CERROR("request on unconnected import %s\n",
1378                         imp->imp_obd->obd_name);
1379                 RETURN(-EINVAL);
1380         }
1381
1382         /* for distributed debugging */
1383         req->rq_reqmsg->status = current->pid;
1384         LASSERT(imp->imp_obd != NULL);
1385         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1386                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1387                imp->imp_obd->obd_uuid.uuid,
1388                req->rq_reqmsg->status, req->rq_xid,
1389                imp->imp_connection->c_peer.peer_ni->pni_name,
1390                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1391                req->rq_reqmsg->opc);
1392
1393         /* Mark phase here for a little debug help */
1394         req->rq_phase = RQ_PHASE_RPC;
1395
1396         spin_lock_irqsave(&imp->imp_lock, flags);
1397         req->rq_import_generation = imp->imp_generation;
1398 restart:
1399         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1400                 list_del(&req->rq_list);
1401
1402                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1403                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1404
1405                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1406                           current->comm, 
1407                           ptlrpc_import_state_name(req->rq_send_state), 
1408                           ptlrpc_import_state_name(imp->imp_state));
1409                 lwi = LWI_INTR(interrupted_request, req);
1410                 rc = l_wait_event(req->rq_reply_waitq,
1411                                   (req->rq_send_state == imp->imp_state ||
1412                                    req->rq_err),
1413                                   &lwi);
1414                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1415                           current->comm, 
1416                           ptlrpc_import_state_name(imp->imp_state), 
1417                           ptlrpc_import_state_name(req->rq_send_state),
1418                           req->rq_err);
1419
1420                 spin_lock_irqsave(&imp->imp_lock, flags);
1421                 list_del_init(&req->rq_list);
1422
1423                 if (req->rq_err) {
1424                         rc = -EIO;
1425                 } 
1426                 else if (req->rq_intr) {
1427                         rc = -EINTR;
1428                 }
1429                 else if (req->rq_no_resend) {
1430                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1431                         GOTO(out, rc = -ETIMEDOUT);
1432                 }
1433                 else {
1434                         GOTO(restart, rc);
1435                 }
1436         } 
1437
1438         if (rc != 0) {
1439                 list_del_init(&req->rq_list);
1440                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1441                 req->rq_status = rc; // XXX this ok?
1442                 GOTO(out, rc);
1443         }
1444
1445         if (req->rq_resend) {
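                /* Mark the resend so the server can detect the duplicate
                 * (skipped when rq_ptlrpcs_restart is set), and tear down any
                 * bulk registered for the previous attempt before sending
                 * again. */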
1446                 if (!req->rq_ptlrpcs_restart)
1447                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1448
1449                 if (req->rq_bulk != NULL)
1450                         ptlrpc_unregister_bulk (req);
1451
1452                 DEBUG_REQ(D_HA, req, "resending: ");
1453         }
1454
1455         /* XXX this is the same as ptlrpc_set_wait */
1456         LASSERT(list_empty(&req->rq_list));
1457         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1458         spin_unlock_irqrestore(&imp->imp_lock, flags);
1459
1460         rc = ptl_send_rpc(req);
1461         if (rc) {
1462                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1463                 timeout = 1;
1464         } else {
1465                 timeout = MAX(req->rq_timeout * HZ, 1);
1466                 DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
1467         }
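        /* The wait below is bounded in jiffies.  On a send failure we fall
         * back to a 1-jiffy wait so the timeout (and hence resend/recovery)
         * path runs almost immediately instead of after a full rq_timeout. */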
1468         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1469                                req);
1470         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1471         DEBUG_REQ(D_NET, req, "-- done sleeping");
1472
1473         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1474                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1475                imp->imp_obd->obd_uuid.uuid,
1476                req->rq_reqmsg->status, req->rq_xid,
1477                imp->imp_connection->c_peer.peer_ni->pni_name,
1478                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1479                req->rq_reqmsg->opc);
1480
1481         spin_lock_irqsave(&imp->imp_lock, flags);
1482         list_del_init(&req->rq_list);
1483         spin_unlock_irqrestore(&imp->imp_lock, flags);
1484
1485         /* If the reply was received normally, this just grabs the spinlock
1486          * (ensuring the reply callback has returned), sees that
1487          * req->rq_receiving_reply is clear and returns. */
1488         ptlrpc_unregister_reply (req);
1489
1490         if (req->rq_err)
1491                 GOTO(out, rc = -EIO);
1492
1493         /* Resend if we need to, unless we were interrupted. */
1494         if (req->rq_resend && !req->rq_intr) {
1495                 /* ...unless we were specifically told otherwise. */
1496                 if (req->rq_no_resend)
1497                         GOTO(out, rc = -ETIMEDOUT);
1498                 spin_lock_irqsave(&imp->imp_lock, flags);
1499                 goto restart;
1500         }
1501
1502         if (req->rq_intr) {
1503                 /* Should only be interrupted if we timed out. */
1504                 if (!req->rq_timedout)
1505                         DEBUG_REQ(D_ERROR, req,
1506                                   "rq_intr set but rq_timedout not set");
1507                 GOTO(out, rc = -EINTR);
1508         }
1509
1510         if (req->rq_timedout) {                 /* non-recoverable timeout */
1511                 GOTO(out, rc = -ETIMEDOUT);
1512         }
1513
1514         if (!req->rq_replied) {
1515                 /* How can this be? -eeb */
1516                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1517                 LBUG();
1518                 GOTO(out, rc = req->rq_status);
1519         }
1520
1521         rc = after_reply (req);
1522         /* NB may return +ve success rc */
1523         if (req->rq_resend) {
1524                 spin_lock_irqsave(&imp->imp_lock, flags);
1525                 goto restart;
1526         }
1527
1528  out:
1529         if (req->rq_bulk != NULL) {
1530                 if (rc >= 0) {
1531                         /* Success so far.  Note that anything going
1532                          * wrong with bulk now is extremely strange,
1533                          * since the server must have believed that the
1534                          * bulk transferred OK before it replied with
1535                          * success. */
1536                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1537                         brc = l_wait_event(req->rq_reply_waitq,
1538                                            !ptlrpc_bulk_active(req->rq_bulk),
1539                                            &lwi);
1540                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1541                         if (brc != 0) {
1542                                 LASSERT(brc == -ETIMEDOUT);
1543                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1544                                 rc = brc;
1545                         } else if (!req->rq_bulk->bd_success) {
1546                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1547                                 rc = -EIO;
1548                         }
1549                 }
1550                 if (rc < 0)
1551                         ptlrpc_unregister_bulk (req);
1552         }
1553
1554         LASSERT(!req->rq_receiving_reply);
1555         req->rq_phase = RQ_PHASE_INTERPRET;
1556
1557         atomic_dec(&imp->imp_inflight);
1558         wake_up(&imp->imp_recovery_waitq);
1559         RETURN(rc);
1560 }
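#if 0
/* Illustrative sketch only (compiled out): the usual synchronous caller
 * pattern around ptlrpc_queue_wait().  The opcode and body size below are
 * hypothetical placeholders, and the prep/size helpers are shown with their
 * mainline signatures of this era -- adjust if this branch differs. */
static int example_sync_rpc(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int size = 128;                          /* hypothetical body size */
        int rc;

        req = ptlrpc_prep_req(imp, 0 /* hypothetical opcode */, 1, &size, NULL);
        if (req == NULL)
                return -ENOMEM;

        req->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(req);             /* sleeps for the reply */
        ptlrpc_req_finished(req);
        return rc;
}
#endif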
1561
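/* Replay state kept across the asynchronous round trip.  This struct is
 * overlaid on req->rq_async_args; ptlrpc_replay_req() asserts that it fits. */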
1562 struct ptlrpc_replay_async_args {
1563         int praa_old_state;
1564         int praa_old_status;
1565 };
1566
1567 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1568                                     void * data, int rc)
1569 {
1570         struct ptlrpc_replay_async_args *aa = data;
1571         struct obd_import *imp = req->rq_import;
1572         unsigned long flags;
1573
1574         atomic_dec(&imp->imp_replay_inflight);
1575         
1576         if (!req->rq_replied) {
1577                 CERROR("request replay timed out, restarting recovery\n");
1578                 GOTO(out, rc = -ETIMEDOUT);
1579         }
1580
1581 #if SWAB_PARANOIA
1582         /* Clear reply swab mask; this is a new reply in sender's byte order */
1583         req->rq_rep_swab_mask = 0;
1584 #endif
1585         LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
1586         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
1587         if (rc) {
1588                 CERROR("unpack_rep failed: %d\n", rc);
1589                 GOTO(out, rc = -EPROTO);
1590         }
1591
1592         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1593             req->rq_repmsg->status == -ENOTCONN) 
1594                 GOTO(out, rc = req->rq_repmsg->status);
1595
1596         /* The transno had better not change over replay. */
1597         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1598
1599         DEBUG_REQ(D_HA, req, "got rep");
1600
1601         /* let the callback do fixups, possibly to the request itself */
1602         if (req->rq_replay_cb)
1603                 req->rq_replay_cb(req);
1604
1605         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1606                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1607                           req->rq_repmsg->status, aa->praa_old_status);
1608         } else {
1609                 /* Put it back for re-replay. */
1610                 req->rq_repmsg->status = aa->praa_old_status;
1611         }
1612
1613         spin_lock_irqsave(&imp->imp_lock, flags);
1614         imp->imp_last_replay_transno = req->rq_transno;
1615         spin_unlock_irqrestore(&imp->imp_lock, flags);
1616
1617         /* continue with recovery */
1618         rc = ptlrpc_import_recovery_state_machine(imp);
1619  out:
1620         req->rq_send_state = aa->praa_old_state;
1621         
1622         if (rc != 0)
1623                 /* this replay failed, so restart recovery */
1624                 ptlrpc_connect_import(imp, NULL);
1625
1626         RETURN(rc);
1627 }
1628
1629
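/* Queue a request for replay via ptlrpcd.  The reply is handled by
 * ptlrpc_replay_interpret() above, which either continues the import
 * recovery state machine or, on failure, restarts the connect. */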
1630 int ptlrpc_replay_req(struct ptlrpc_request *req)
1631 {
1632         struct ptlrpc_replay_async_args *aa;
1633         ENTRY;
1634
1635         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1636
1637         /* Not handling automatic bulk replay yet (or ever?) */
1638         LASSERT(req->rq_bulk == NULL);
1639
1640         DEBUG_REQ(D_HA, req, "REPLAY");
1641
1642         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1643         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1644         memset(aa, 0, sizeof *aa);
1645
1646         /* Prepare request to be resent with ptlrpcd */
1647         aa->praa_old_state = req->rq_send_state;
1648         req->rq_send_state = LUSTRE_IMP_REPLAY;
1649         req->rq_phase = RQ_PHASE_NEW;
1650         aa->praa_old_status = req->rq_repmsg->status;
1651         req->rq_status = 0;
1652
1653         req->rq_interpret_reply = ptlrpc_replay_interpret;
1654         atomic_inc(&req->rq_import->imp_replay_inflight);
1655         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1656
1657         ptlrpcd_add_req(req);
1658         RETURN(0);
1659 }
1660
1661 void ptlrpc_abort_inflight(struct obd_import *imp)
1662 {
1663         unsigned long flags;
1664         struct list_head *tmp, *n;
1665         ENTRY;
1666
1667         /* Make sure that no new requests get processed for this import.
1668          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1669          * this flag and then putting requests on sending_list or delayed_list.
1670          */
1671         spin_lock_irqsave(&imp->imp_lock, flags);
1672
1673         /* XXX locking?  Maybe we should remove each request with the list
1674          * locked?  Also, how do we know if the requests on the list are
1675          * being freed at this time?
1676          */
1677         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1678                 struct ptlrpc_request *req =
1679                         list_entry(tmp, struct ptlrpc_request, rq_list);
1680
1681                 DEBUG_REQ(D_HA, req, "inflight");
1682
1683                 spin_lock (&req->rq_lock);
1684                 if (req->rq_import_generation < imp->imp_generation) {
1685                         req->rq_err = 1;
1686                         ptlrpc_wake_client_req(req);
1687                 }
1688                 spin_unlock (&req->rq_lock);
1689         }
1690
1691         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1692                 struct ptlrpc_request *req =
1693                         list_entry(tmp, struct ptlrpc_request, rq_list);
1694
1695                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1696
1697                 spin_lock (&req->rq_lock);
1698                 if (req->rq_import_generation < imp->imp_generation) {
1699                         req->rq_err = 1;
1700                         ptlrpc_wake_client_req(req);
1701                 }
1702                 spin_unlock (&req->rq_lock);
1703         }
1704
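        /* Raw RPCs are not covered by the generation check used above;
         * abort every one of them unconditionally. */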
1705         list_for_each_safe(tmp, n, &imp->imp_rawrpc_list) {
1706                 struct ptlrpc_request *req =
1707                         list_entry(tmp, struct ptlrpc_request, rq_list);
1708
1709                 DEBUG_REQ(D_HA, req, "aborting raw rpc");
1710
1711                 spin_lock(&req->rq_lock);
1712                 req->rq_err = 1;
1713                 ptlrpc_wake_client_req(req);
1714                 spin_unlock(&req->rq_lock);
1715         }
1716
1717         /* Last chance to free reqs left on the replay list, but we
1718          * will still leak reqs that haven't committed. */
1719         if (imp->imp_replayable)
1720                 ptlrpc_free_committed(imp);
1721
1722         spin_unlock_irqrestore(&imp->imp_lock, flags);
1723
1724         EXIT;
1725 }
1726
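/* XIDs are client-wide unique, monotonically increasing request identifiers;
 * a single spinlock keeps the counter consistent across CPUs. */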
1727 static __u64 ptlrpc_last_xid = 0;
1728 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1729
1730 __u64 ptlrpc_next_xid(void)
1731 {
1732         __u64 tmp;
1733         spin_lock(&ptlrpc_last_xid_lock);
1734         tmp = ++ptlrpc_last_xid;
1735         spin_unlock(&ptlrpc_last_xid_lock);
1736         return tmp;
1737 }
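/* Typical use (illustrative): a request picks up its identifier at prep time,
 * e.g. "req->rq_xid = ptlrpc_next_xid();", and the same value is later used
 * to match the reply (and any bulk) back to this request. */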
1738
1739