lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_iov = npages;
96         desc->bd_iov_count = 0;
97         desc->bd_md_h = PTL_INVALID_HANDLE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc, and frees it when req itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
156         LASSERT(page != NULL);
157         LASSERT(pageoffset >= 0);
158         LASSERT(len > 0);
159         LASSERT(pageoffset + len <= PAGE_SIZE);
160
161         desc->bd_nob += len;
162
163         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
164 }
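
/*
 * Editor's note: a minimal client-side usage sketch, not part of the
 * original file.  It assumes "req" was built with ptlrpc_prep_req() and
 * that "pages" is a hypothetical array of npages pages; the GET/PUT type
 * and the portal value are illustrative only.
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_GET_SOURCE, portal);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * The request now owns the descriptor (req->rq_bulk), so it is released
 * when the request itself is finished.
 */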
165
166 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
167 {
168         ENTRY;
169
170         LASSERT(desc != NULL);
171         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
172         LASSERT(!desc->bd_network_rw);         /* network hands off or */
173         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
174         if (desc->bd_export)
175                 class_export_put(desc->bd_export);
176         else
177                 class_import_put(desc->bd_import);
178
179         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
180                                 bd_iov[desc->bd_max_iov]));
181         EXIT;
182 }
183
184 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
185                                        int count, int *lengths, char **bufs)
186 {
187         struct ptlrpc_request *request;
188         int rc;
189         ENTRY;
190
191         LASSERT((unsigned long)imp > 0x1000);
192
193         OBD_ALLOC(request, sizeof(*request));
194         if (!request) {
195                 CERROR("request allocation out of memory\n");
196                 RETURN(NULL);
197         }
198
199         rc = lustre_pack_request(request, count, lengths, bufs);
200         if (rc) {
201                 CERROR("cannot pack request %d\n", rc);
202                 OBD_FREE(request, sizeof(*request));
203                 RETURN(NULL);
204         }
205
206         if (imp->imp_server_timeout)
207                 request->rq_timeout = obd_timeout / 2;
208         else
209                 request->rq_timeout = obd_timeout;
210         request->rq_send_state = LUSTRE_IMP_FULL;
211         request->rq_type = PTL_RPC_MSG_REQUEST;
212         request->rq_import = class_import_get(imp);
213
214         request->rq_req_cbid.cbid_fn  = request_out_callback;
215         request->rq_req_cbid.cbid_arg = request;
216
217         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
218         request->rq_reply_cbid.cbid_arg = request;
219         
220         request->rq_phase = RQ_PHASE_NEW;
221
222         /* XXX FIXME bug 249 */
223         request->rq_request_portal = imp->imp_client->cli_request_portal;
224         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
225
226         spin_lock_init(&request->rq_lock);
227         INIT_LIST_HEAD(&request->rq_list);
228         INIT_LIST_HEAD(&request->rq_replay_list);
229         INIT_LIST_HEAD(&request->rq_set_chain);
230         init_waitqueue_head(&request->rq_reply_waitq);
231         request->rq_xid = ptlrpc_next_xid();
232         atomic_set(&request->rq_refcount, 1);
233
234         request->rq_reqmsg->opc = opcode;
235         request->rq_reqmsg->flags = 0;
236
237         RETURN(request);
238 }
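
/*
 * Editor's note: a hedged usage sketch, not part of the original file,
 * showing the synchronous path built from ptlrpc_prep_req() together with
 * ptlrpc_queue_wait() and ptlrpc_req_finished() defined later in this
 * file.  MY_OPC and "struct my_body" are placeholders, not real Lustre
 * message layouts.
 *
 *      int size = sizeof(struct my_body);
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, MY_OPC, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *
 *      req->rq_replen = lustre_msg_size(1, &size);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 *      return rc;
 */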
239
240 struct ptlrpc_request_set *ptlrpc_prep_set(void)
241 {
242         struct ptlrpc_request_set *set;
243
244         OBD_ALLOC(set, sizeof *set);
245         if (!set)
246                 RETURN(NULL);
247         INIT_LIST_HEAD(&set->set_requests);
248         init_waitqueue_head(&set->set_waitq);
249         set->set_remaining = 0;
250         spin_lock_init(&set->set_new_req_lock);
251         INIT_LIST_HEAD(&set->set_new_requests);
252
253         RETURN(set);
254 }
255
256 /* Finish with this set; opposite of prep_set. */
257 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
258 {
259         struct list_head *tmp;
260         struct list_head *next;
261         int               expected_phase;
262         int               n = 0;
263         ENTRY;
264
265         /* Requests on the set should either all be completed, or all be new */
266         expected_phase = (set->set_remaining == 0) ?
267                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
268         list_for_each (tmp, &set->set_requests) {
269                 struct ptlrpc_request *req =
270                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
271
272                 LASSERT(req->rq_phase == expected_phase);
273                 n++;
274         }
275
276         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
277
278         list_for_each_safe(tmp, next, &set->set_requests) {
279                 struct ptlrpc_request *req =
280                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
281                 list_del_init(&req->rq_set_chain);
282
283                 LASSERT(req->rq_phase == expected_phase);
284
285                 if (req->rq_phase == RQ_PHASE_NEW) {
286
287                         if (req->rq_interpret_reply != NULL) {
288                                 int (*interpreter)(struct ptlrpc_request *,
289                                                    void *, int) =
290                                         req->rq_interpret_reply;
291
292                                 /* higher level (i.e. LOV) failed;
293                                  * let the sub reqs clean up */
294                                 req->rq_status = -EBADR;
295                                 interpreter(req, &req->rq_async_args,
296                                             req->rq_status);
297                         }
298                         set->set_remaining--;
299                 }
300
301                 req->rq_set = NULL;
302                 ptlrpc_req_finished (req);
303         }
304
305         LASSERT(set->set_remaining == 0);
306
307         OBD_FREE(set, sizeof(*set));
308         EXIT;
309 }
310
311 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
312                         struct ptlrpc_request *req)
313 {
314         /* The set takes over the caller's request reference */
315         list_add_tail(&req->rq_set_chain, &set->set_requests);
316         req->rq_set = set;
317         set->set_remaining++;
318         atomic_inc(&req->rq_import->imp_inflight);
319 }
320
321 /* Locked so that many callers can add things; the context that owns the set
322  * is supposed to notice these and move them into the set proper. */
323 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
324                             struct ptlrpc_request *req)
325 {
326         unsigned long flags;
327         spin_lock_irqsave(&set->set_new_req_lock, flags);
328         /* The set takes over the caller's request reference */
329         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
330         req->rq_set = set;
331         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
332 }
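
/*
 * Editor's note: a minimal sketch, not part of the original file, of the
 * request-set lifecycle these helpers implement: allocate a set, hand it
 * already-prepared requests (the set takes over each reference), wait for
 * everything to complete, then destroy the set.  "reqs" and "n" are
 * hypothetical.
 *
 *      struct ptlrpc_request_set *set;
 *      int i, rc;
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *
 *      for (i = 0; i < n; i++)
 *              ptlrpc_set_add_req(set, reqs[i]);
 *
 *      rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 *      return rc;
 */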
333
334 /*
335  * Based on the current state of the import, determine if the request
336  * can be sent, is an error, or should be delayed.
337  *
338  * Returns true if this request should be delayed. If false and *status
339  * is non-zero, then the request cannot be sent and *status is the
340  * error code.  If false and *status is 0, then the request can be sent.
341  *
342  * The imp->imp_lock must be held.
343  */
344 static int ptlrpc_import_delay_req(struct obd_import *imp, 
345                                    struct ptlrpc_request *req, int *status)
346 {
347         int delay = 0;
348         ENTRY;
349
350         LASSERT (status != NULL);
351         *status = 0;
352
353         if (imp->imp_state == LUSTRE_IMP_NEW) {
354                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
355                 *status = -EIO;
356                 LBUG();
357         }
358         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
359                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
360                 *status = -EIO;
361         }
362         /* allow CONNECT even if import is invalid */
363         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
364                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
365                 ;
366         }
367         /*
368          * If the import has been invalidated (such as by an OST failure), the
369          * request must fail with -EIO.  
370          */
371         else if (imp->imp_invalid) {
372                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
373                 *status = -EIO;
374         } 
375         else if (req->rq_import_generation != imp->imp_generation) {
376                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
377                 *status = -EIO;
378         } 
379         else if (req->rq_send_state != imp->imp_state) {
380                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
381                     || req->rq_no_delay) 
382                         *status = -EWOULDBLOCK;
383                 else
384                         delay = 1;
385         }
386
387         RETURN(delay);
388 }
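
/*
 * Editor's note: a hedged sketch, not part of the original file, of how a
 * caller consumes the (delay, *status) contract above while holding
 * imp->imp_lock; ptlrpc_send_new_req() and ptlrpc_check_set() below follow
 * this same pattern.
 *
 *      int status;
 *
 *      spin_lock_irqsave(&imp->imp_lock, flags);
 *      if (ptlrpc_import_delay_req(imp, req, &status)) {
 *              list_add_tail(&req->rq_list, &imp->imp_delayed_list);
 *      } else if (status != 0) {
 *              req->rq_status = status;
 *              req->rq_phase = RQ_PHASE_INTERPRET;
 *      } else {
 *              list_add_tail(&req->rq_list, &imp->imp_sending_list);
 *      }
 *      spin_unlock_irqrestore(&imp->imp_lock, flags);
 */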
389
390 static int ptlrpc_check_reply(struct ptlrpc_request *req)
391 {
392         unsigned long flags;
393         int rc = 0;
394         ENTRY;
395
396         /* serialise with network callback */
397         spin_lock_irqsave (&req->rq_lock, flags);
398
399         if (req->rq_replied) {
400                 DEBUG_REQ(D_NET, req, "REPLIED:");
401                 GOTO(out, rc = 1);
402         }
403         
404         if (req->rq_net_err && !req->rq_timedout) {
405                 spin_unlock_irqrestore (&req->rq_lock, flags);
406                 rc = ptlrpc_expire_one_request(req); 
407                 spin_lock_irqsave (&req->rq_lock, flags);
408                 GOTO(out, rc);
409         }
410
411         if (req->rq_err) {
412                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
413                 GOTO(out, rc = 1);
414         }
415
416         if (req->rq_resend) {
417                 DEBUG_REQ(D_ERROR, req, "RESEND:");
418                 GOTO(out, rc = 1);
419         }
420
421         if (req->rq_restart) {
422                 DEBUG_REQ(D_ERROR, req, "RESTART:");
423                 GOTO(out, rc = 1);
424         }
425         EXIT;
426  out:
427         spin_unlock_irqrestore (&req->rq_lock, flags);
428         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
429         return rc;
430 }
431
432 static int ptlrpc_check_status(struct ptlrpc_request *req)
433 {
434         int err;
435         ENTRY;
436
437         err = req->rq_repmsg->status;
438         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
439                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
440                           err);
441                 RETURN(err < 0 ? err : -EINVAL);
442         }
443
444         if (err < 0) {
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         } else if (err > 0) {
447                 /* XXX: translate this error from net to host */
448                 DEBUG_REQ(D_INFO, req, "status is %d", err);
449         }
450
451         RETURN(err);
452 }
453
454 static int after_reply(struct ptlrpc_request *req)
455 {
456         unsigned long flags;
457         struct obd_import *imp = req->rq_import;
458         int rc;
459         ENTRY;
460
461         LASSERT(!req->rq_receiving_reply);
462
463         /* NB Until this point, the whole of the incoming message,
464          * including buflens, status etc is in the sender's byte order. */
465
466 #if SWAB_PARANOIA
467         /* Clear reply swab mask; this is a new reply in sender's byte order */
468         req->rq_rep_swab_mask = 0;
469 #endif
470         LASSERT (req->rq_nob_received <= req->rq_replen);
471         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
472         if (rc) {
473                 CERROR("unpack_rep failed: %d\n", rc);
474                 RETURN(-EPROTO);
475         }
476
477         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
478             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
479                 CERROR("invalid packet type received (type=%u)\n",
480                        req->rq_repmsg->type);
481                 RETURN(-EPROTO);
482         }
483
484         /* Store transno in reqmsg for replay. */
485         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
486
487         rc = ptlrpc_check_status(req);
488
489         /* Either we've been evicted, or the server has failed for
490          * some reason. Try to reconnect, and if that fails, punt to the
491          * upcall. */
492         if (rc == -ENOTCONN) {
493                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
494                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
495                         RETURN(-ENOTCONN);
496                 }
497
498                 ptlrpc_request_handle_notconn(req);
499
500                 RETURN(rc);
501         }
502
503         if (req->rq_import->imp_replayable) {
504                 spin_lock_irqsave(&imp->imp_lock, flags);
505                 if (req->rq_replay || req->rq_transno != 0)
506                         ptlrpc_retain_replayable_request(req, imp);
507                 else if (req->rq_commit_cb != NULL) {
508                         spin_unlock_irqrestore(&imp->imp_lock, flags);
509                         req->rq_commit_cb(req);
510                         spin_lock_irqsave(&imp->imp_lock, flags);
511                 }
512
513                 if (req->rq_transno > imp->imp_max_transno)
514                         imp->imp_max_transno = req->rq_transno;
515
516                 /* Replay-enabled imports return commit-status information. */
517                 if (req->rq_repmsg->last_committed)
518                         imp->imp_peer_committed_transno =
519                                 req->rq_repmsg->last_committed;
520                 ptlrpc_free_committed(imp);
521                 spin_unlock_irqrestore(&imp->imp_lock, flags);
522         }
523
524         RETURN(rc);
525 }
526
527 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
528 {
529         char                   str[PTL_NALFMT_SIZE];
530         struct obd_import     *imp;
531         unsigned long          flags;
532         int rc;
533         ENTRY;
534
535         LASSERT(req->rq_phase == RQ_PHASE_NEW);
536         req->rq_phase = RQ_PHASE_RPC;
537
538         imp = req->rq_import;
539         spin_lock_irqsave(&imp->imp_lock, flags);
540
541         req->rq_import_generation = imp->imp_generation;
542
543         if (ptlrpc_import_delay_req(imp, req, &rc)) {
544                 spin_lock (&req->rq_lock);
545                 req->rq_waiting = 1;
546                 spin_unlock (&req->rq_lock);
547
548                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
549                           "(%s != %s)",
550                           req->rq_reqmsg->status, 
551                           ptlrpc_import_state_name(req->rq_send_state),
552                           ptlrpc_import_state_name(imp->imp_state));
553                 LASSERT(list_empty (&req->rq_list));
554
555                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
556                 spin_unlock_irqrestore(&imp->imp_lock, flags);
557                 RETURN(0);
558         }
559
560         if (rc != 0) {
561                 spin_unlock_irqrestore(&imp->imp_lock, flags);
562                 req->rq_status = rc;
563                 req->rq_phase = RQ_PHASE_INTERPRET;
564                 RETURN(rc);
565         }
566
567         /* XXX this is the same as ptlrpc_queue_wait */
568         LASSERT(list_empty(&req->rq_list));
569         list_add_tail(&req->rq_list, &imp->imp_sending_list);
570         spin_unlock_irqrestore(&imp->imp_lock, flags);
571
572         req->rq_reqmsg->status = current->pid;
573         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
574                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
575                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
576                req->rq_xid,
577                imp->imp_connection->c_peer.peer_ni->pni_name,
578                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
579                req->rq_reqmsg->opc);
580
581         rc = ptl_send_rpc(req);
582         if (rc) {
583                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
584                 req->rq_net_err = 1;
585                 RETURN(rc);
586         }
587         RETURN(0);
588 }
589
590 int ptlrpc_check_set(struct ptlrpc_request_set *set)
591 {
592         char str[PTL_NALFMT_SIZE];
593         unsigned long flags;
594         struct list_head *tmp;
595         int force_timer_recalc = 0;
596         ENTRY;
597
598         if (set->set_remaining == 0)
599                 RETURN(1);
600
601         list_for_each(tmp, &set->set_requests) {
602                 struct ptlrpc_request *req =
603                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
604                 struct obd_import *imp = req->rq_import;
605                 int rc = 0;
606
607                 if (req->rq_phase == RQ_PHASE_NEW &&
608                     ptlrpc_send_new_req(req)) {
609                         force_timer_recalc = 1;
610                 }
611
612                 if (!(req->rq_phase == RQ_PHASE_RPC ||
613                       req->rq_phase == RQ_PHASE_BULK ||
614                       req->rq_phase == RQ_PHASE_INTERPRET ||
615                       req->rq_phase == RQ_PHASE_COMPLETE)) {
616                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
617                         LBUG();
618                 }
619
620                 if (req->rq_phase == RQ_PHASE_COMPLETE)
621                         continue;
622
623                 if (req->rq_phase == RQ_PHASE_INTERPRET)
624                         GOTO(interpret, req->rq_status);
625
626                 if (req->rq_net_err && !req->rq_timedout)
627                         ptlrpc_expire_one_request(req); 
628
629                 if (req->rq_err) {
630                         ptlrpc_unregister_reply(req);
631                         if (req->rq_status == 0)
632                                 req->rq_status = -EIO;
633                         req->rq_phase = RQ_PHASE_INTERPRET;
634
635                         spin_lock_irqsave(&imp->imp_lock, flags);
636                         list_del_init(&req->rq_list);
637                         spin_unlock_irqrestore(&imp->imp_lock, flags);
638
639                         GOTO(interpret, req->rq_status);
640                 }
641
642                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
643                  * will only be set after rq_timedout, but the oig waiting
644                  * path sets rq_intr irrespective of whether ptlrpcd has
645                  * seen a timeout.  our policy is to only interpret 
646                  * interrupted rpcs after they have timed out */
647                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
648                         /* NB could be on delayed list */
649                         ptlrpc_unregister_reply(req);
650                         req->rq_status = -EINTR;
651                         req->rq_phase = RQ_PHASE_INTERPRET;
652
653                         spin_lock_irqsave(&imp->imp_lock, flags);
654                         list_del_init(&req->rq_list);
655                         spin_unlock_irqrestore(&imp->imp_lock, flags);
656
657                         GOTO(interpret, req->rq_status);
658                 }
659
660                 if (req->rq_phase == RQ_PHASE_RPC) {
661                         if (req->rq_waiting || req->rq_resend) {
662                                 int status;
663
664                                 ptlrpc_unregister_reply(req);
665
666                                 spin_lock_irqsave(&imp->imp_lock, flags);
667
668                                 if (ptlrpc_import_delay_req(imp, req, &status)){
669                                         spin_unlock_irqrestore(&imp->imp_lock,
670                                                                flags);
671                                         continue;
672                                 } 
673
674                                 list_del_init(&req->rq_list);
675                                 if (status != 0)  {
676                                         req->rq_status = status;
677                                         req->rq_phase = RQ_PHASE_INTERPRET;
678                                         spin_unlock_irqrestore(&imp->imp_lock,
679                                                                flags);
680                                         GOTO(interpret, req->rq_status);
681                                 }
682                                 if (req->rq_no_resend) {
683                                         req->rq_status = -ENOTCONN;
684                                         req->rq_phase = RQ_PHASE_INTERPRET;
685                                         spin_unlock_irqrestore(&imp->imp_lock,
686                                                                flags);
687                                         GOTO(interpret, req->rq_status);
688                                 }
689                                 list_add_tail(&req->rq_list,
690                                               &imp->imp_sending_list);
691
692                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
693
694                                 req->rq_waiting = 0;
695                                 if (req->rq_resend) {
696                                         lustre_msg_add_flags(req->rq_reqmsg,
697                                                              MSG_RESENT);
698                                         if (req->rq_bulk) {
699                                                 __u64 old_xid = req->rq_xid;
700
701                                                 ptlrpc_unregister_bulk (req);
702
703                                                 /* ensure previous bulk fails */
704                                                 req->rq_xid = ptlrpc_next_xid();
705                                                 CDEBUG(D_HA, "resend bulk "
706                                                        "old x"LPU64
707                                                        " new x"LPU64"\n",
708                                                        old_xid, req->rq_xid);
709                                         }
710                                 }
711
712                                 rc = ptl_send_rpc(req);
713                                 if (rc) {
714                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
715                                                   rc);
716                                         force_timer_recalc = 1;
717                                         req->rq_net_err = 1;
718                                 }
719                                 /* need to reset the timeout */
720                                 force_timer_recalc = 1;
721                         }
722
723                         /* Still waiting for a reply? */
724                         if (ptlrpc_client_receiving_reply(req))
725                                 continue;
726
727                         /* Did we actually receive a reply? */
728                         if (!ptlrpc_client_replied(req))
729                                 continue;
730
731                         spin_lock_irqsave(&imp->imp_lock, flags);
732                         list_del_init(&req->rq_list);
733                         spin_unlock_irqrestore(&imp->imp_lock, flags);
734
735                         req->rq_status = after_reply(req);
736                         if (req->rq_resend) {
737                                 /* Add this req to the delayed list so
738                                    it can be errored if the import is
739                                    evicted after recovery. */
740                                 spin_lock_irqsave (&req->rq_lock, flags);
741                                 list_add_tail(&req->rq_list, 
742                                               &imp->imp_delayed_list);
743                                 spin_unlock_irqrestore(&req->rq_lock, flags);
744                                 continue;
745                         }
746
747                         /* If there is no bulk associated with this request,
748                          * then we're done and should let the interpreter
749                          * process the reply.  Similarly if the RPC returned
750                          * an error, and therefore the bulk will never arrive.
751                          */
752                         if (req->rq_bulk == NULL || req->rq_status != 0) {
753                                 req->rq_phase = RQ_PHASE_INTERPRET;
754                                 GOTO(interpret, req->rq_status);
755                         }
756
757                         req->rq_phase = RQ_PHASE_BULK;
758                 }
759
760                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
761                 if (ptlrpc_bulk_active(req->rq_bulk))
762                         continue;
763
764                 if (!req->rq_bulk->bd_success) {
765                         /* The RPC reply arrived OK, but the bulk screwed
766                          * up!  Dead weird since the server told us the RPC
767                          * was good after getting the REPLY for her GET or
768                          * the ACK for her PUT. */
769                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
770                         LBUG();
771                 }
772
773                 req->rq_phase = RQ_PHASE_INTERPRET;
774
775         interpret:
776                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
777                 LASSERT(!req->rq_receiving_reply);
778
779                 ptlrpc_unregister_reply(req);
780                 if (req->rq_bulk != NULL)
781                         ptlrpc_unregister_bulk (req);
782
783                 req->rq_phase = RQ_PHASE_COMPLETE;
784
785                 if (req->rq_interpret_reply != NULL) {
786                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
787                                 req->rq_interpret_reply;
788                         req->rq_status = interpreter(req, &req->rq_async_args,
789                                                      req->rq_status);
790                 }
791
792                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
793                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
794                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
795                        req->rq_xid,
796                        imp->imp_connection->c_peer.peer_ni->pni_name,
797                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
798                        req->rq_reqmsg->opc);
799
800                 set->set_remaining--;
801
802                 atomic_dec(&imp->imp_inflight);
803                 wake_up(&imp->imp_recovery_waitq);
804         }
805
806         /* If we hit an error, we want to recover promptly. */
807         RETURN(set->set_remaining == 0 || force_timer_recalc);
808 }
809
810 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
811 {
812         unsigned long      flags;
813         struct obd_import *imp = req->rq_import;
814         int replied = 0;
815         ENTRY;
816
817         spin_lock_irqsave (&req->rq_lock, flags);
818         replied = req->rq_replied;
819         if (!replied)
820                 req->rq_timedout = 1;
821         spin_unlock_irqrestore (&req->rq_lock, flags);
822
823         if (replied)
824                 RETURN(0);
825
826         DEBUG_REQ(D_ERROR, req, "timeout");
827
828         ptlrpc_unregister_reply (req);
829
830         if (req->rq_bulk != NULL)
831                 ptlrpc_unregister_bulk (req);
832
833         if (imp == NULL) {
834                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
835                 RETURN(1);
836         }
837
838         /* The DLM server doesn't want recovery run on its imports. */
839         if (imp->imp_dlm_fake)
840                 RETURN(1);
841
842         /* If this request is for recovery or other primordial tasks,
843          * then error it out here. */
844         if (req->rq_send_state != LUSTRE_IMP_FULL || 
845             imp->imp_obd->obd_no_recov) {
846                 spin_lock_irqsave (&req->rq_lock, flags);
847                 req->rq_status = -ETIMEDOUT;
848                 req->rq_err = 1;
849                 spin_unlock_irqrestore (&req->rq_lock, flags);
850                 RETURN(1);
851         }
852
853         ptlrpc_fail_import(imp, req->rq_import_generation);
854
855         RETURN(0);
856 }
857
858 int ptlrpc_expired_set(void *data)
859 {
860         struct ptlrpc_request_set *set = data;
861         struct list_head          *tmp;
862         time_t                     now = LTIME_S (CURRENT_TIME);
863         ENTRY;
864
865         LASSERT(set != NULL);
866
867         /* A timeout expired; see which reqs it applies to... */
868         list_for_each (tmp, &set->set_requests) {
869                 struct ptlrpc_request *req =
870                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
871
872                 /* request in-flight? */
873                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
874                        && !req->rq_resend) ||
875                       (req->rq_phase == RQ_PHASE_BULK)))
876                         continue;
877
878                 if (req->rq_timedout ||           /* already dealt with */
879                     req->rq_sent + req->rq_timeout > now) /* not expired */
880                         continue;
881
882                 /* deal with this guy */
883                 ptlrpc_expire_one_request (req);
884         }
885
886         /* When waiting for a whole set, we always want to break out of the
887          * sleep so we can recalculate the timeout, or enable interrupts
888          * iff everyone's timed out.
889          */
890         RETURN(1);
891 }
892
893 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
894 {
895         unsigned long flags;
896         spin_lock_irqsave(&req->rq_lock, flags);
897         req->rq_intr = 1;
898         spin_unlock_irqrestore(&req->rq_lock, flags);
899 }
900
901 void ptlrpc_interrupted_set(void *data)
902 {
903         struct ptlrpc_request_set *set = data;
904         struct list_head *tmp;
905
906         LASSERT(set != NULL);
907         CERROR("INTERRUPTED SET %p\n", set);
908
909         list_for_each(tmp, &set->set_requests) {
910                 struct ptlrpc_request *req =
911                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
912
913                 if (req->rq_phase != RQ_PHASE_RPC)
914                         continue;
915
916                 ptlrpc_mark_interrupted(req);
917         }
918 }
919
920 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
921 {
922         struct list_head      *tmp;
923         time_t                 now = LTIME_S(CURRENT_TIME);
924         time_t                 deadline;
925         int                    timeout = 0;
926         struct ptlrpc_request *req;
927         ENTRY;
928
929         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
930
931         list_for_each(tmp, &set->set_requests) {
932                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
933
934                 /* request in-flight? */
935                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
936                       (req->rq_phase == RQ_PHASE_BULK)))
937                         continue;
938
939                 if (req->rq_timedout)   /* already timed out */
940                         continue;
941
942                 deadline = req->rq_sent + req->rq_timeout;
943                 if (deadline <= now)    /* actually expired already */
944                         timeout = 1;    /* ASAP */
945                 else if (timeout == 0 || timeout > deadline - now)
946                         timeout = deadline - now;
947         }
948         RETURN(timeout);
949 }
950                 
951
952 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
953 {
954         struct list_head      *tmp;
955         struct ptlrpc_request *req;
956         struct l_wait_info     lwi;
957         int                    rc, timeout;
958         ENTRY;
959
960         LASSERT(!list_empty(&set->set_requests));
961         list_for_each(tmp, &set->set_requests) {
962                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
963                 if (req->rq_phase == RQ_PHASE_NEW)
964                         (void)ptlrpc_send_new_req(req);
965         }
966
967         do {
968                 timeout = ptlrpc_set_next_timeout(set);
969
970                 /* wait until all complete, interrupted, or an in-flight
971                  * req times out */
972                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
973                        set, timeout);
974                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
975                                        ptlrpc_expired_set, 
976                                        ptlrpc_interrupted_set, set);
977                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
978
979                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
980
981                 /* -EINTR => all requests have been flagged rq_intr so next
982                  * check completes.
983                  * -ETIMEDOUT => someone timed out.  When all reqs have
984                  * timed out, signals are enabled allowing completion with
985                  * EINTR.
986                  * I don't really care if we go once more round the loop in
987                  * the error cases -eeb. */
988         } while (rc != 0);
989
990         LASSERT(set->set_remaining == 0);
991
992         rc = 0;
993         list_for_each(tmp, &set->set_requests) {
994                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
995
996                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
997                 if (req->rq_status != 0)
998                         rc = req->rq_status;
999         }
1000
1001         if (set->set_interpret != NULL) {
1002                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1003                         set->set_interpret;
1004                 rc = interpreter (set, &set->set_args, rc);
1005         }
1006
1007         RETURN(rc);
1008 }
1009
1010 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1011 {
1012         ENTRY;
1013         if (request == NULL) {
1014                 EXIT;
1015                 return;
1016         }
1017
1018         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1019         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1020         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1021         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1022
1023         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1024          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1025         if (request->rq_import != NULL) {
1026                 unsigned long flags = 0;
1027                 if (!locked)
1028                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1029                 list_del_init(&request->rq_replay_list);
1030                 if (!locked)
1031                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1032                                                flags);
1033         }
1034         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1035
1036         if (atomic_read(&request->rq_refcount) != 0) {
1037                 DEBUG_REQ(D_ERROR, request,
1038                           "freeing request with nonzero refcount");
1039                 LBUG();
1040         }
1041
1042         if (request->rq_repmsg != NULL) {
1043                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1044                 request->rq_repmsg = NULL;
1045         }
1046         if (request->rq_reqmsg != NULL) {
1047                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1048                 request->rq_reqmsg = NULL;
1049         }
1050         if (request->rq_export != NULL) {
1051                 class_export_put(request->rq_export);
1052                 request->rq_export = NULL;
1053         }
1054         if (request->rq_import != NULL) {
1055                 class_import_put(request->rq_import);
1056                 request->rq_import = NULL;
1057         }
1058         if (request->rq_bulk != NULL)
1059                 ptlrpc_free_bulk(request->rq_bulk);
1060
1061         OBD_FREE(request, sizeof(*request));
1062         EXIT;
1063 }
1064
1065 void ptlrpc_free_req(struct ptlrpc_request *request)
1066 {
1067         __ptlrpc_free_req(request, 0);
1068 }
1069
1070 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1071 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1072 {
1073         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1074         (void)__ptlrpc_req_finished(request, 1);
1075 }
1076
1077 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1078 {
1079         ENTRY;
1080         if (request == NULL)
1081                 RETURN(1);
1082
1083         if (request == LP_POISON ||
1084             request->rq_reqmsg == LP_POISON) {
1085                 CERROR("dereferencing freed request (bug 575)\n");
1086                 LBUG();
1087                 RETURN(1);
1088         }
1089
1090         DEBUG_REQ(D_INFO, request, "refcount now %u",
1091                   atomic_read(&request->rq_refcount) - 1);
1092
1093         if (atomic_dec_and_test(&request->rq_refcount)) {
1094                 __ptlrpc_free_req(request, locked);
1095                 RETURN(1);
1096         }
1097
1098         RETURN(0);
1099 }
1100
1101 void ptlrpc_req_finished(struct ptlrpc_request *request)
1102 {
1103         __ptlrpc_req_finished(request, 0);
1104 }
1105
1106 /* Disengage the client's reply buffer from the network
1107  * NB does _NOT_ unregister any client-side bulk.
1108  * IDEMPOTENT, but _not_ safe against concurrent callers.
1109  * The request owner (i.e. the thread doing the I/O) must call...
1110  */
1111 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1112 {
1113         int                rc;
1114         wait_queue_head_t *wq;
1115         struct l_wait_info lwi;
1116
1117         LASSERT(!in_interrupt ());             /* might sleep */
1118
1119         if (!ptlrpc_client_receiving_reply(request))
1120                 return;
1121
1122         PtlMDUnlink (request->rq_reply_md_h);
1123
1124         /* We have to l_wait_event() whatever the result, to give liblustre
1125          * a chance to run reply_in_callback() */
1126
1127         if (request->rq_set != NULL)
1128                 wq = &request->rq_set->set_waitq;
1129         else
1130                 wq = &request->rq_reply_waitq;
1131
1132         for (;;) {
1133                 /* Network access will complete in finite time but the HUGE
1134                  * timeout lets us CWARN for visibility of sluggish NALs */
1135                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1136                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1137                 if (rc == 0)
1138                         return;
1139
1140                 LASSERT (rc == -ETIMEDOUT);
1141                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1142         }
1143 }
1144
1145 /* caller must hold imp->imp_lock */
1146 void ptlrpc_free_committed(struct obd_import *imp)
1147 {
1148         struct list_head *tmp, *saved;
1149         struct ptlrpc_request *req;
1150         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1151         ENTRY;
1152
1153         LASSERT(imp != NULL);
1154
1155         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1156
1157         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1158                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1159
1160         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1161                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1162
1163                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1164                 LASSERT(req != last_req);
1165                 last_req = req;
1166
1167                 if (req->rq_import_generation < imp->imp_generation) {
1168                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1169                         GOTO(free_req, 0);
1170                 }
1171
1172                 if (req->rq_replay) {
1173                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1174                         continue;
1175                 }
1176
1177                 /* not yet committed */
1178                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1179                         DEBUG_REQ(D_HA, req, "stopping search");
1180                         break;
1181                 }
1182
1183                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1184                           imp->imp_peer_committed_transno);
1185 free_req:
1186                 if (req->rq_commit_cb != NULL)
1187                         req->rq_commit_cb(req);
1188                 list_del_init(&req->rq_replay_list);
1189                 __ptlrpc_req_finished(req, 1);
1190         }
1191
1192         EXIT;
1193         return;
1194 }
1195
1196 void ptlrpc_cleanup_client(struct obd_import *imp)
1197 {
1198         ENTRY;
1199         EXIT;
1200         return;
1201 }
1202
1203 void ptlrpc_resend_req(struct ptlrpc_request *req)
1204 {
1205         unsigned long flags;
1206
1207         DEBUG_REQ(D_HA, req, "going to resend");
1208         req->rq_reqmsg->handle.cookie = 0;
1209         req->rq_status = -EAGAIN;
1210
1211         spin_lock_irqsave (&req->rq_lock, flags);
1212         req->rq_resend = 1;
1213         req->rq_net_err = 0;
1214         req->rq_timedout = 0;
1215         if (req->rq_bulk) {
1216                 __u64 old_xid = req->rq_xid;
1217                 
1218                 /* ensure previous bulk fails */
1219                 req->rq_xid = ptlrpc_next_xid();
1220                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1221                        old_xid, req->rq_xid);
1222         }
1223         ptlrpc_wake_client_req(req);
1224         spin_unlock_irqrestore (&req->rq_lock, flags);
1225 }
1226
1227 /* XXX: this function and rq_status are currently unused */
1228 void ptlrpc_restart_req(struct ptlrpc_request *req)
1229 {
1230         unsigned long flags;
1231
1232         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1233         req->rq_status = -ERESTARTSYS;
1234
1235         spin_lock_irqsave (&req->rq_lock, flags);
1236         req->rq_restart = 1;
1237         req->rq_timedout = 0;
1238         ptlrpc_wake_client_req(req);
1239         spin_unlock_irqrestore (&req->rq_lock, flags);
1240 }
1241
1242 static int expired_request(void *data)
1243 {
1244         struct ptlrpc_request *req = data;
1245         ENTRY;
1246
1247         RETURN(ptlrpc_expire_one_request(req));
1248 }
1249
1250 static void interrupted_request(void *data)
1251 {
1252         unsigned long flags;
1253
1254         struct ptlrpc_request *req = data;
1255         DEBUG_REQ(D_HA, req, "request interrupted");
1256         spin_lock_irqsave (&req->rq_lock, flags);
1257         req->rq_intr = 1;
1258         spin_unlock_irqrestore (&req->rq_lock, flags);
1259 }
1260
1261 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1262 {
1263         ENTRY;
1264         atomic_inc(&req->rq_refcount);
1265         RETURN(req);
1266 }
1267
1268 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1269                                       struct obd_import *imp)
1270 {
1271         struct list_head *tmp;
1272
1273         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1274
1275         /* clear this  for new requests that were resent as well
1276            as resent replayed requests. */
1277         lustre_msg_clear_flags(req->rq_reqmsg,
1278                              MSG_RESENT);
1279
1280         /* don't re-add requests that have been replayed */
1281         if (!list_empty(&req->rq_replay_list))
1282                 return;
1283
1284         lustre_msg_add_flags(req->rq_reqmsg,
1285                              MSG_REPLAY);
1286
1287         LASSERT(imp->imp_replayable);
1288         /* Balanced in ptlrpc_free_committed, usually. */
1289         ptlrpc_request_addref(req);
1290         list_for_each_prev(tmp, &imp->imp_replay_list) {
1291                 struct ptlrpc_request *iter =
1292                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1293
1294                 /* We may have duplicate transnos if we create and then
1295                  * open a file, or for closes retained to match creating
1296                  * opens, so use req->rq_xid as a secondary key.
1297                  * (See bugs 684, 685, and 428.)
1298                  * XXX no longer needed, but all opens need transnos!
1299                  */
1300                 if (iter->rq_transno > req->rq_transno)
1301                         continue;
1302
1303                 if (iter->rq_transno == req->rq_transno) {
1304                         LASSERT(iter->rq_xid != req->rq_xid);
1305                         if (iter->rq_xid > req->rq_xid)
1306                                 continue;
1307                 }
1308
1309                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1310                 return;
1311         }
1312
1313         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1314 }
1315
1316 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1317 {
1318         char str[PTL_NALFMT_SIZE];
1319         int rc = 0;
1320         int brc;
1321         struct l_wait_info lwi;
1322         struct obd_import *imp = req->rq_import;
1323         unsigned long flags;
1324         int timeout = 0;
1325         ENTRY;
1326
1327         LASSERT(req->rq_set == NULL);
1328         LASSERT(!req->rq_receiving_reply);
1329         atomic_inc(&imp->imp_inflight);
1330
1331         /* for distributed debugging */
1332         req->rq_reqmsg->status = current->pid;
1333         LASSERT(imp->imp_obd != NULL);
1334         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1335                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1336                imp->imp_obd->obd_uuid.uuid,
1337                req->rq_reqmsg->status, req->rq_xid,
1338                imp->imp_connection->c_peer.peer_ni->pni_name,
1339                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1340                req->rq_reqmsg->opc);
1341
1342         /* Mark phase here for a little debug help */
1343         req->rq_phase = RQ_PHASE_RPC;
1344
1345         spin_lock_irqsave(&imp->imp_lock, flags);
1346         req->rq_import_generation = imp->imp_generation;
1347 restart:
1348         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1349                 list_del(&req->rq_list);
1350
1351                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1352                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1353
1354                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1355                           current->comm, 
1356                           ptlrpc_import_state_name(req->rq_send_state), 
1357                           ptlrpc_import_state_name(imp->imp_state));
1358                 lwi = LWI_INTR(interrupted_request, req);
1359                 rc = l_wait_event(req->rq_reply_waitq,
1360                                   (req->rq_send_state == imp->imp_state ||
1361                                    req->rq_err),
1362                                   &lwi);
1363                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1364                           current->comm, 
1365                           ptlrpc_import_state_name(imp->imp_state), 
1366                           ptlrpc_import_state_name(req->rq_send_state),
1367                           req->rq_err);
1368
1369                 spin_lock_irqsave(&imp->imp_lock, flags);
1370                 list_del_init(&req->rq_list);
1371
1372                 if (req->rq_err) {
1373                         rc = -EIO;
1374                 } 
1375                 else if (req->rq_intr) {
1376                         rc = -EINTR;
1377                 }
1378                 else if (req->rq_no_resend) {
1379                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1380                         GOTO(out, rc = -ETIMEDOUT);
1381                 }
1382                 else {
1383                         GOTO(restart, rc);
1384                 }
1385         } 
1386
1387         if (rc != 0) {
1388                 list_del_init(&req->rq_list);
1389                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1390                 req->rq_status = rc; // XXX this ok?
1391                 GOTO(out, rc);
1392         }
1393
1394         if (req->rq_resend) {
1395                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1396
1397                 if (req->rq_bulk != NULL)
1398                         ptlrpc_unregister_bulk (req);
1399
1400                 DEBUG_REQ(D_HA, req, "resending: ");
1401         }
1402
1403         /* XXX this is the same as ptlrpc_set_wait */
1404         LASSERT(list_empty(&req->rq_list));
1405         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1406         spin_unlock_irqrestore(&imp->imp_lock, flags);
1407
1408         rc = ptl_send_rpc(req);
1409         if (rc) {
1410                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1411                 timeout = 1;
1412         } else {
1413                 timeout = MAX(req->rq_timeout * HZ, 1);
1414                 DEBUG_REQ(D_NET, req, "-- sleeping");
1415         }
1416         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1417                                req);
1418         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1419         DEBUG_REQ(D_NET, req, "-- done sleeping");
1420
1421         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1422                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1423                imp->imp_obd->obd_uuid.uuid,
1424                req->rq_reqmsg->status, req->rq_xid,
1425                imp->imp_connection->c_peer.peer_ni->pni_name,
1426                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1427                req->rq_reqmsg->opc);
1428
1429         spin_lock_irqsave(&imp->imp_lock, flags);
1430         list_del_init(&req->rq_list);
1431         spin_unlock_irqrestore(&imp->imp_lock, flags);
1432
1433         /* If the reply was received normally, this just grabs the spinlock
1434          * (ensuring the reply callback has returned), sees that
1435          * req->rq_receiving_reply is clear and returns. */
1436         ptlrpc_unregister_reply (req);
1437
1438         if (req->rq_err)
1439                 GOTO(out, rc = -EIO);
1440
1441         /* Resend if we need to, unless we were interrupted. */
1442         if (req->rq_resend && !req->rq_intr) {
1443                 /* ...unless we were specifically told otherwise. */
1444                 if (req->rq_no_resend)
1445                         GOTO(out, rc = -ETIMEDOUT);
1446                 spin_lock_irqsave(&imp->imp_lock, flags);
1447                 goto restart;
1448         }
1449
1450         if (req->rq_intr) {
1451                 /* Should only be interrupted if we timed out. */
1452                 if (!req->rq_timedout)
1453                         DEBUG_REQ(D_ERROR, req,
1454                                   "rq_intr set but rq_timedout not");
1455                 GOTO(out, rc = -EINTR);
1456         }
1457
1458         if (req->rq_timedout) {                 /* non-recoverable timeout */
1459                 GOTO(out, rc = -ETIMEDOUT);
1460         }
1461
1462         if (!req->rq_replied) {
1463                 /* How can this be? -eeb */
1464                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1465                 LBUG();
1466                 GOTO(out, rc = req->rq_status);
1467         }
1468
1469         rc = after_reply (req);
1470         /* NB may return +ve success rc */
1471         if (req->rq_resend) {
1472                 spin_lock_irqsave(&imp->imp_lock, flags);
1473                 goto restart;
1474         }
1475
1476  out:
1477         if (req->rq_bulk != NULL) {
1478                 if (rc >= 0) {                  
1479                         /* success so far.  Note that anything going wrong
1480                          * with bulk now is EXTREMELY strange, since the
1481                          * server must have believed that the bulk
1482                          * transferred OK before she replied with success to
1483                          * me. */
1484                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1485                         brc = l_wait_event(req->rq_reply_waitq,
1486                                            !ptlrpc_bulk_active(req->rq_bulk),
1487                                            &lwi);
1488                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1489                         if (brc != 0) {
1490                                 LASSERT(brc == -ETIMEDOUT);
1491                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1492                                 rc = brc;
1493                         } else if (!req->rq_bulk->bd_success) {
1494                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1495                                 rc = -EIO;
1496                         }
1497                 }
1498                 if (rc < 0)
1499                         ptlrpc_unregister_bulk (req);
1500         }
1501
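        /* Network activity for this request is finished: move it to the
         * interpret phase and let import recovery know one less request is
         * in flight. */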
1502         LASSERT(!req->rq_receiving_reply);
1503         req->rq_phase = RQ_PHASE_INTERPRET;
1504
1505         atomic_dec(&imp->imp_inflight);
1506         wake_up(&imp->imp_recovery_waitq);
1507         RETURN(rc);
1508 }
1509
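/* Scratch state kept in rq_async_args while a request is being replayed:
 * the send state and reply status the request had before the replay, so
 * ptlrpc_replay_interpret() can restore them afterwards. */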
1510 struct ptlrpc_replay_async_args {
1511         int praa_old_state;
1512         int praa_old_status;
1513 };
1514
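/* Reply callback for a replayed request (run from ptlrpcd): unpack and
 * sanity-check the reply, give rq_replay_cb a chance to fix up the request,
 * record the replayed transno, then continue driving import recovery.  Any
 * failure here triggers a reconnect so recovery starts over. */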
1515 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1516                                     void * data, int rc)
1517 {
1518         struct ptlrpc_replay_async_args *aa = data;
1519         struct obd_import *imp = req->rq_import;
1520         unsigned long flags;
1521
1522         atomic_dec(&imp->imp_replay_inflight);
1523         
1524         if (!req->rq_replied) {
1525                 CERROR("request replay timed out, restarting recovery\n");
1526                 GOTO(out, rc = -ETIMEDOUT);
1527         }
1528
1529 #if SWAB_PARANOIA
1530         /* Clear reply swab mask; this is a new reply in sender's byte order */
1531         req->rq_rep_swab_mask = 0;
1532 #endif
1533         LASSERT (req->rq_nob_received <= req->rq_replen);
1534         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1535         if (rc) {
1536                 CERROR("unpack_rep failed: %d\n", rc);
1537                 GOTO(out, rc = -EPROTO);
1538         }
1539
1540         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1541             req->rq_repmsg->status == -ENOTCONN) 
1542                 GOTO(out, rc = req->rq_repmsg->status);
1543
1544         /* The transno had better not change over replay. */
1545         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1546
1547         DEBUG_REQ(D_HA, req, "got rep");
1548
1549         /* let the callback do fixups, possibly including changes to the request itself */
1550         if (req->rq_replay_cb)
1551                 req->rq_replay_cb(req);
1552
1553         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1554                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1555                           req->rq_repmsg->status, aa->praa_old_status);
1556         } else {
1557                 /* Put it back for re-replay. */
1558                 req->rq_repmsg->status = aa->praa_old_status;
1559         }
1560
1561         spin_lock_irqsave(&imp->imp_lock, flags);
1562         imp->imp_last_replay_transno = req->rq_transno;
1563         spin_unlock_irqrestore(&imp->imp_lock, flags);
1564
1565         /* continue with recovery */
1566         rc = ptlrpc_import_recovery_state_machine(imp);
1567  out:
1568         req->rq_send_state = aa->praa_old_state;
1569         
1570         if (rc != 0)
1571                 /* this replay failed, so restart recovery */
1572                 ptlrpc_connect_import(imp, NULL);
1573
1574         RETURN(rc);
1575 }
1576
1577
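/* Queue a request for replay via ptlrpcd: save its current send state and
 * reply status in rq_async_args, mark it as a LUSTRE_IMP_REPLAY send, and
 * hand it to ptlrpcd (which holds the extra reference taken here). */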
1578 int ptlrpc_replay_req(struct ptlrpc_request *req)
1579 {
1580         struct ptlrpc_replay_async_args *aa;
1581         ENTRY;
1582
1583         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1584
1585         /* Not handling automatic bulk replay yet (or ever?) */
1586         LASSERT(req->rq_bulk == NULL);
1587
1588         DEBUG_REQ(D_HA, req, "REPLAY");
1589
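        /* rq_async_args is a small per-request scratch area; the assertion
         * below makes sure the replay bookkeeping still fits inside it. */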
1590         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1591         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1592         memset(aa, 0, sizeof *aa);
1593
1594         /* Prepare request to be resent with ptlrpcd */
1595         aa->praa_old_state = req->rq_send_state;
1596         req->rq_send_state = LUSTRE_IMP_REPLAY;
1597         req->rq_phase = RQ_PHASE_NEW;
1598         aa->praa_old_status = req->rq_repmsg->status;
1599         req->rq_status = 0;
1600
1601         req->rq_interpret_reply = ptlrpc_replay_interpret;
1602         atomic_inc(&req->rq_import->imp_replay_inflight);
1603         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1604
1605         ptlrpcd_add_req(req);
1606         RETURN(0);
1607 }
1608
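/* Abort every request of an older import generation still on the sending or
 * delayed lists: set rq_err and wake the waiter so it sees the error and
 * gives up.  Committed requests left on the replay list are freed, but
 * uncommitted ones are knowingly leaked (see the comment below). */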
1609 void ptlrpc_abort_inflight(struct obd_import *imp)
1610 {
1611         unsigned long flags;
1612         struct list_head *tmp, *n;
1613         ENTRY;
1614
1615         /* Make sure that no new requests get processed for this import.
1616          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1617          * this flag and then putting requests on sending_list or delayed_list.
1618          */
1619         spin_lock_irqsave(&imp->imp_lock, flags);
1620
1621         /* XXX locking?  Maybe we should remove each request with the list
1622          * locked?  Also, how do we know if the requests on the list are
1623          * being freed at this time?
1624          */
1625         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1626                 struct ptlrpc_request *req =
1627                         list_entry(tmp, struct ptlrpc_request, rq_list);
1628
1629                 DEBUG_REQ(D_HA, req, "inflight");
1630
1631                 spin_lock (&req->rq_lock);
1632                 if (req->rq_import_generation < imp->imp_generation) {
1633                         req->rq_err = 1;
1634                         ptlrpc_wake_client_req(req);
1635                 }
1636                 spin_unlock (&req->rq_lock);
1637         }
1638
1639         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1640                 struct ptlrpc_request *req =
1641                         list_entry(tmp, struct ptlrpc_request, rq_list);
1642
1643                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1644
1645                 spin_lock (&req->rq_lock);
1646                 if (req->rq_import_generation < imp->imp_generation) {
1647                         req->rq_err = 1;
1648                         ptlrpc_wake_client_req(req);
1649                 }
1650                 spin_unlock (&req->rq_lock);
1651         }
1652
1653         /* Last chance to free reqs left on the replay list, but we
1654          * will still leak reqs that haven't committed.  */
1655         if (imp->imp_replayable)
1656                 ptlrpc_free_committed(imp);
1657
1658         spin_unlock_irqrestore(&imp->imp_lock, flags);
1659
1660         EXIT;
1661 }
1662
1663 static __u64 ptlrpc_last_xid = 0;
1664 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1665
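/* Return the next RPC transaction id (XID).  The global lock makes the
 * counter increment atomic, so concurrent callers never see a duplicate. */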
1666 __u64 ptlrpc_next_xid(void)
1667 {
1668         __u64 tmp;
1669         spin_lock(&ptlrpc_last_xid_lock);
1670         tmp = ++ptlrpc_last_xid;
1671         spin_unlock(&ptlrpc_last_xid_lock);
1672         return tmp;
1673 }
1674
1675