Land b1_2_smallfix onto b1_2 (20040608_1846)
[fs/lustre-release.git] / lustre / ptlrpc / client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_pages = npages;
96         desc->bd_page_count = 0;
97         desc->bd_md_h = PTL_HANDLE_NONE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc; the desc is freed when the request itself is freed */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155 #ifdef __KERNEL__
156         ptl_kiov_t *kiov = &desc->bd_iov[desc->bd_page_count];
157 #else
158         struct iovec *iov = &desc->bd_iov[desc->bd_page_count];
159 #endif
160         LASSERT(desc->bd_page_count < desc->bd_max_pages);
161         LASSERT(page != NULL);
162         LASSERT(pageoffset >= 0);
163         LASSERT(len > 0);
164         LASSERT(pageoffset + len <= PAGE_SIZE);
165
166 #ifdef __KERNEL__
167         kiov->kiov_page   = page;
168         kiov->kiov_offset = pageoffset;
169         kiov->kiov_len    = len;
170 #else
171         iov->iov_base = page->addr + pageoffset;
172         iov->iov_len  = len;
173 #endif
174         desc->bd_page_count++;
175         desc->bd_nob += len;
176 }
177
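/*
 * A minimal sketch of the client-side bulk setup sequence above: prep a
 * descriptor against an already-prepared request, then attach each page.
 * The request, page array, page count and portal are hypothetical
 * stand-ins for whatever the caller (an OSC read path, say) has in hand;
 * this is illustrative only and not compiled into the file.
 */
#if 0
static int example_prep_read_bulk(struct ptlrpc_request *req,
                                  struct page **pages, int npages,
                                  int portal)
{
        struct ptlrpc_bulk_desc *desc;
        int i;

        /* for a read the client is the sink: the server PUTs into our pages */
        desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
        if (desc == NULL)
                return -ENOMEM;

        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);

        /* req owns desc now; it is released along with the request */
        return 0;
}
#endif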
178 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
179 {
180         ENTRY;
181
182         LASSERT(desc != NULL);
183         LASSERT(desc->bd_page_count != 0x5a5a5a5a); /* not freed already */
184         LASSERT(!desc->bd_network_rw);         /* network hands off */
185         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
186         if (desc->bd_export)
187                 class_export_put(desc->bd_export);
188         else
189                 class_import_put(desc->bd_import);
190
191         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
192                                 bd_iov[desc->bd_max_pages]));
193         EXIT;
194 }
195
196 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
197                                        int count, int *lengths, char **bufs)
198 {
199         struct ptlrpc_request *request;
200         int rc;
201         ENTRY;
202
203         LASSERT((unsigned long)imp > 0x1000);
204
205         OBD_ALLOC(request, sizeof(*request));
206         if (!request) {
207                 CERROR("request allocation out of memory\n");
208                 RETURN(NULL);
209         }
210
211         rc = lustre_pack_request(request, count, lengths, bufs);
212         if (rc) {
213                 CERROR("cannot pack request %d\n", rc);
214                 OBD_FREE(request, sizeof(*request));
215                 RETURN(NULL);
216         }
217
218         if (imp->imp_server_timeout)
219                 request->rq_timeout = obd_timeout / 2;
220         else
221                 request->rq_timeout = obd_timeout;
222         request->rq_send_state = LUSTRE_IMP_FULL;
223         request->rq_type = PTL_RPC_MSG_REQUEST;
224         request->rq_import = class_import_get(imp);
225
226         request->rq_req_cbid.cbid_fn  = request_out_callback;
227         request->rq_req_cbid.cbid_arg = request;
228
229         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
230         request->rq_reply_cbid.cbid_arg = request;
231         
232         request->rq_phase = RQ_PHASE_NEW;
233
234         /* XXX FIXME bug 249 */
235         request->rq_request_portal = imp->imp_client->cli_request_portal;
236         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
237
238         spin_lock_init(&request->rq_lock);
239         INIT_LIST_HEAD(&request->rq_list);
240         INIT_LIST_HEAD(&request->rq_replay_list);
241         INIT_LIST_HEAD(&request->rq_set_chain);
242         init_waitqueue_head(&request->rq_reply_waitq);
243         request->rq_xid = ptlrpc_next_xid();
244         atomic_set(&request->rq_refcount, 1);
245
246         request->rq_reqmsg->opc = opcode;
247         request->rq_reqmsg->flags = 0;
248
249         RETURN(request);
250 }
251
252 struct ptlrpc_request_set *ptlrpc_prep_set(void)
253 {
254         struct ptlrpc_request_set *set;
255
256         OBD_ALLOC(set, sizeof *set);
257         if (!set)
258                 RETURN(NULL);
259         INIT_LIST_HEAD(&set->set_requests);
260         init_waitqueue_head(&set->set_waitq);
261         set->set_remaining = 0;
262         spin_lock_init(&set->set_new_req_lock);
263         INIT_LIST_HEAD(&set->set_new_requests);
264
265         RETURN(set);
266 }
267
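/*
 * A sketch of the usual request-set lifecycle built from the primitives
 * in this file: allocate a set, attach one or more prepared requests,
 * wait for them all, then destroy the set (which drops the requests).
 * The import, opcode and buffer layout are hypothetical placeholders; a
 * real caller also fills the request buffers and sets rq_replen for the
 * expected reply.  Illustrative only, not compiled into the file.
 */
#if 0
static int example_set_lifecycle(struct obd_import *imp, int opcode,
                                 int count, int *lengths, char **bufs)
{
        struct ptlrpc_request_set *set;
        struct ptlrpc_request     *req;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        req = ptlrpc_prep_req(imp, opcode, count, lengths, bufs);
        if (req == NULL) {
                ptlrpc_set_destroy(set);
                return -ENOMEM;
        }

        /* the set takes over the caller's reference on req */
        ptlrpc_set_add_req(set, req);

        rc = ptlrpc_set_wait(set);

        /* finishes the requests and frees the set */
        ptlrpc_set_destroy(set);
        return rc;
}
#endif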
268 /* Finish with this set; opposite of prep_set. */
269 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
270 {
271         struct list_head *tmp;
272         struct list_head *next;
273         int               expected_phase;
274         int               n = 0;
275         ENTRY;
276
277         /* Requests on the set should either all be completed, or all be new */
278         expected_phase = (set->set_remaining == 0) ?
279                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
280         list_for_each (tmp, &set->set_requests) {
281                 struct ptlrpc_request *req =
282                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
283
284                 LASSERT(req->rq_phase == expected_phase);
285                 n++;
286         }
287
288         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
289
290         list_for_each_safe(tmp, next, &set->set_requests) {
291                 struct ptlrpc_request *req =
292                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
293                 list_del_init(&req->rq_set_chain);
294
295                 LASSERT(req->rq_phase == expected_phase);
296
297                 if (req->rq_phase == RQ_PHASE_NEW) {
298
299                         if (req->rq_interpret_reply != NULL) {
300                                 int (*interpreter)(struct ptlrpc_request *,
301                                                    void *, int) =
302                                         req->rq_interpret_reply;
303
304                                 /* higher level (i.e. LOV) failed;
305                                  * let the sub reqs clean up */
306                                 req->rq_status = -EBADR;
307                                 interpreter(req, &req->rq_async_args,
308                                             req->rq_status);
309                         }
310                         set->set_remaining--;
311                 }
312
313                 req->rq_set = NULL;
314                 ptlrpc_req_finished (req);
315         }
316
317         LASSERT(set->set_remaining == 0);
318
319         OBD_FREE(set, sizeof(*set));
320         EXIT;
321 }
322
323 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
324                         struct ptlrpc_request *req)
325 {
326         /* The set takes over the caller's request reference */
327         list_add_tail(&req->rq_set_chain, &set->set_requests);
328         req->rq_set = set;
329         set->set_remaining++;
330         atomic_inc(&req->rq_import->imp_inflight);
331 }
332
333 /* Locked so that many callers can add requests; the context that owns the
334  * set is expected to notice these and move them into the set proper. */
335 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
336                             struct ptlrpc_request *req)
337 {
338         unsigned long flags;
339         spin_lock_irqsave(&set->set_new_req_lock, flags);
340         /* The set takes over the caller's request reference */
341         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
342         req->rq_set = set;
343         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
344 }
345
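/*
 * The owning side of the staging list used by ptlrpc_set_add_new_req()
 * above: the context that owns the set (ptlrpcd, for instance) drains
 * set_new_requests into the set proper under set_new_req_lock and then
 * polls the set.  The drain loop below is an illustrative sketch of that
 * convention, not the ptlrpcd implementation, and is not compiled.
 */
#if 0
static void example_drain_new_requests(struct ptlrpc_request_set *set)
{
        struct list_head *pos, *tmp;
        unsigned long flags;

        spin_lock_irqsave(&set->set_new_req_lock, flags);
        list_for_each_safe(pos, tmp, &set->set_new_requests) {
                struct ptlrpc_request *req =
                        list_entry(pos, struct ptlrpc_request, rq_set_chain);

                list_del_init(&req->rq_set_chain);
                /* moves req onto set_requests and bumps set_remaining */
                ptlrpc_set_add_req(set, req);
        }
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}
#endif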
346 /*
347  * Based on the current state of the import, determine if the request
348  * can be sent, is an error, or should be delayed.
349  *
350  * Returns true if this request should be delayed. If it returns false
351  * and *status is non-zero, the request cannot be sent and *status is the
352  * error code.  If it returns false and *status is 0, the request can be sent.
353  *
354  * The imp->imp_lock must be held.
355  */
356 static int ptlrpc_import_delay_req(struct obd_import *imp, 
357                                    struct ptlrpc_request *req, int *status)
358 {
359         int delay = 0;
360         ENTRY;
361
362         LASSERT (status != NULL);
363         *status = 0;
364
365         if (imp->imp_state == LUSTRE_IMP_NEW) {
366                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
367                 *status = -EIO;
368                 LBUG();
369         }
370         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
371                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
372                 *status = -EIO;
373         }
374         /* allow CONNECT even if import is invalid */
375         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
376                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
377                 ;
378         }
379         /*
380          * If the import has been invalidated (such as by an OST failure), the
381          * request must fail with -EIO.  
382          */
383         else if (imp->imp_invalid) {
384                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
385                 *status = -EIO;
386         } 
387         else if (req->rq_import_generation != imp->imp_generation) {
388                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
389                 *status = -EIO;
390         } 
391         else if (req->rq_send_state != imp->imp_state) {
392                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
393                     || req->rq_no_delay) 
394                         *status = -EWOULDBLOCK;
395                 else
396                         delay = 1;
397         }
398
399         RETURN(delay);
400 }
401
402 static int ptlrpc_check_reply(struct ptlrpc_request *req)
403 {
404         unsigned long flags;
405         int rc = 0;
406         ENTRY;
407
408         /* serialise with network callback */
409         spin_lock_irqsave (&req->rq_lock, flags);
410
411         if (req->rq_replied) {
412                 DEBUG_REQ(D_NET, req, "REPLIED:");
413                 GOTO(out, rc = 1);
414         }
415         
416         if (req->rq_net_err && !req->rq_timedout) {
417                 spin_unlock_irqrestore (&req->rq_lock, flags);
418                 rc = ptlrpc_expire_one_request(req); 
419                 spin_lock_irqsave (&req->rq_lock, flags);
420                 GOTO(out, rc);
421         }
422
423         if (req->rq_err) {
424                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
425                 GOTO(out, rc = 1);
426         }
427
428         if (req->rq_resend) {
429                 DEBUG_REQ(D_ERROR, req, "RESEND:");
430                 GOTO(out, rc = 1);
431         }
432
433         if (req->rq_restart) {
434                 DEBUG_REQ(D_ERROR, req, "RESTART:");
435                 GOTO(out, rc = 1);
436         }
437         EXIT;
438  out:
439         spin_unlock_irqrestore (&req->rq_lock, flags);
440         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
441         return rc;
442 }
443
444 static int ptlrpc_check_status(struct ptlrpc_request *req)
445 {
446         int err;
447         ENTRY;
448
449         err = req->rq_repmsg->status;
450         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
451                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
452                           err);
453                 RETURN(err < 0 ? err : -EINVAL);
454         }
455
456         if (err < 0) {
457                 DEBUG_REQ(D_INFO, req, "status is %d", err);
458         } else if (err > 0) {
459                 /* XXX: translate this error from net to host */
460                 DEBUG_REQ(D_INFO, req, "status is %d", err);
461         }
462
463         RETURN(err);
464 }
465
466 static int after_reply(struct ptlrpc_request *req)
467 {
468         unsigned long flags;
469         struct obd_import *imp = req->rq_import;
470         int rc;
471         ENTRY;
472
473         LASSERT(!req->rq_receiving_reply);
474
475         /* NB Until this point, the whole of the incoming message,
476          * including buflens, status etc is in the sender's byte order. */
477
478 #if SWAB_PARANOIA
479         /* Clear reply swab mask; this is a new reply in sender's byte order */
480         req->rq_rep_swab_mask = 0;
481 #endif
482         LASSERT (req->rq_nob_received <= req->rq_replen);
483         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
484         if (rc) {
485                 CERROR("unpack_rep failed: %d\n", rc);
486                 RETURN(-EPROTO);
487         }
488
489         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
490             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
491                 CERROR("invalid packet type received (type=%u)\n",
492                        req->rq_repmsg->type);
493                 RETURN(-EPROTO);
494         }
495
496         /* Store transno in reqmsg for replay. */
497         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
498
499         rc = ptlrpc_check_status(req);
500
501         /* Either we've been evicted, or the server has failed for
502          * some reason. Try to reconnect, and if that fails, punt to the
503          * upcall. */
504         if (rc == -ENOTCONN) {
505                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
506                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
507                         RETURN(-ENOTCONN);
508                 }
509
510                 ptlrpc_request_handle_notconn(req);
511
512                 RETURN(rc);
513         }
514
515         if (req->rq_import->imp_replayable) {
516                 spin_lock_irqsave(&imp->imp_lock, flags);
517                 if (req->rq_replay || req->rq_transno != 0)
518                         ptlrpc_retain_replayable_request(req, imp);
519                 else if (req->rq_commit_cb != NULL)
520                         req->rq_commit_cb(req);
521
522                 if (req->rq_transno > imp->imp_max_transno)
523                         imp->imp_max_transno = req->rq_transno;
524
525                 /* Replay-enabled imports return commit-status information. */
526                 if (req->rq_repmsg->last_committed)
527                         imp->imp_peer_committed_transno =
528                                 req->rq_repmsg->last_committed;
529                 ptlrpc_free_committed(imp);
530                 spin_unlock_irqrestore(&imp->imp_lock, flags);
531         }
532
533         RETURN(rc);
534 }
535
536 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
537 {
538         struct obd_import     *imp;
539         unsigned long          flags;
540         int rc;
541         ENTRY;
542
543         LASSERT(req->rq_phase == RQ_PHASE_NEW);
544         req->rq_phase = RQ_PHASE_RPC;
545
546         imp = req->rq_import;
547         spin_lock_irqsave(&imp->imp_lock, flags);
548
549         req->rq_import_generation = imp->imp_generation;
550
551         if (ptlrpc_import_delay_req(imp, req, &rc)) {
552                 spin_lock (&req->rq_lock);
553                 req->rq_waiting = 1;
554                 spin_unlock (&req->rq_lock);
555
556                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
557                           "(%s != %s)",
558                           req->rq_reqmsg->status, 
559                           ptlrpc_import_state_name(req->rq_send_state),
560                           ptlrpc_import_state_name(imp->imp_state));
561                 LASSERT(list_empty (&req->rq_list));
562
563                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
564                 spin_unlock_irqrestore(&imp->imp_lock, flags);
565                 RETURN(0);
566         }
567
568         if (rc != 0) {
569                 spin_unlock_irqrestore(&imp->imp_lock, flags);
570                 req->rq_status = rc;
571                 req->rq_phase = RQ_PHASE_INTERPRET;
572                 RETURN(rc);
573         }
574
575         /* XXX this is the same as ptlrpc_queue_wait */
576         LASSERT(list_empty(&req->rq_list));
577         list_add_tail(&req->rq_list, &imp->imp_sending_list);
578         spin_unlock_irqrestore(&imp->imp_lock, flags);
579
580         req->rq_reqmsg->status = current->pid;
581         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
582                " %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
583                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
584                req->rq_xid,
585                imp->imp_connection->c_peer.peer_ni->pni_name,
586                imp->imp_connection->c_peer.peer_nid,
587                req->rq_reqmsg->opc);
588
589         rc = ptl_send_rpc(req);
590         if (rc) {
591                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
592                 req->rq_net_err = 1;
593                 RETURN(rc);
594         }
595         RETURN(0);
596 }
597
598 int ptlrpc_check_set(struct ptlrpc_request_set *set)
599 {
600         unsigned long flags;
601         struct list_head *tmp;
602         int force_timer_recalc = 0;
603         ENTRY;
604
605         if (set->set_remaining == 0)
606                 RETURN(1);
607
608         list_for_each(tmp, &set->set_requests) {
609                 struct ptlrpc_request *req =
610                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
611                 struct obd_import *imp = req->rq_import;
612                 int rc = 0;
613
614                 if (req->rq_phase == RQ_PHASE_NEW &&
615                     ptlrpc_send_new_req(req)) {
616                         force_timer_recalc = 1;
617                 }
618
619                 if (!(req->rq_phase == RQ_PHASE_RPC ||
620                       req->rq_phase == RQ_PHASE_BULK ||
621                       req->rq_phase == RQ_PHASE_INTERPRET ||
622                       req->rq_phase == RQ_PHASE_COMPLETE)) {
623                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
624                         LBUG();
625                 }
626
627                 if (req->rq_phase == RQ_PHASE_COMPLETE)
628                         continue;
629
630                 if (req->rq_phase == RQ_PHASE_INTERPRET)
631                         GOTO(interpret, req->rq_status);
632
633                 if (req->rq_net_err && !req->rq_timedout)
634                         ptlrpc_expire_one_request(req); 
635
636                 if (req->rq_err) {
637                         ptlrpc_unregister_reply(req);
638                         if (req->rq_status == 0)
639                                 req->rq_status = -EIO;
640                         req->rq_phase = RQ_PHASE_INTERPRET;
641
642                         spin_lock_irqsave(&imp->imp_lock, flags);
643                         list_del_init(&req->rq_list);
644                         spin_unlock_irqrestore(&imp->imp_lock, flags);
645
646                         GOTO(interpret, req->rq_status);
647                 }
648
649                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
650                  * will only be set after rq_timedout, but the oig waiting
651                  * path sets rq_intr irrespective of whether ptlrpcd has
652                  * seen a timeout.  our policy is to only interpret 
653                  * interrupted rpcs after they have timed out */
654                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
655                         /* NB could be on delayed list */
656                         ptlrpc_unregister_reply(req);
657                         req->rq_status = -EINTR;
658                         req->rq_phase = RQ_PHASE_INTERPRET;
659
660                         spin_lock_irqsave(&imp->imp_lock, flags);
661                         list_del_init(&req->rq_list);
662                         spin_unlock_irqrestore(&imp->imp_lock, flags);
663
664                         GOTO(interpret, req->rq_status);
665                 }
666
667                 if (req->rq_phase == RQ_PHASE_RPC) {
668                         if (req->rq_waiting || req->rq_resend) {
669                                 int status;
670
671                                 spin_lock_irqsave(&imp->imp_lock, flags);
672
673                                 if (ptlrpc_import_delay_req(imp, req, &status)){
674                                         spin_unlock_irqrestore(&imp->imp_lock,
675                                                                flags);
676                                         continue;
677                                 } 
678
679                                 list_del_init(&req->rq_list);
680                                 if (status != 0)  {
681                                         req->rq_status = status;
682                                         req->rq_phase = RQ_PHASE_INTERPRET;
683                                         spin_unlock_irqrestore(&imp->imp_lock,
684                                                                flags);
685                                         GOTO(interpret, req->rq_status);
686                                 }
687                                 if (req->rq_no_resend) {
688                                         req->rq_status = -ENOTCONN;
689                                         req->rq_phase = RQ_PHASE_INTERPRET;
690                                         spin_unlock_irqrestore(&imp->imp_lock,
691                                                                flags);
692                                         GOTO(interpret, req->rq_status);
693                                 }
694                                 list_add_tail(&req->rq_list,
695                                               &imp->imp_sending_list);
696
697                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
698
699                                 req->rq_waiting = 0;
700                                 if (req->rq_resend) {
701                                         lustre_msg_add_flags(req->rq_reqmsg,
702                                                              MSG_RESENT);
703                                         ptlrpc_unregister_reply(req);
704                                         if (req->rq_bulk) {
705                                                 __u64 old_xid = req->rq_xid;
706
707                                                 ptlrpc_unregister_bulk (req);
708
709                                                 /* ensure previous bulk fails */
710                                                 req->rq_xid = ptlrpc_next_xid();
711                                                 CDEBUG(D_HA, "resend bulk "
712                                                        "old x"LPU64
713                                                        " new x"LPU64"\n",
714                                                        old_xid, req->rq_xid);
715                                         }
716                                 }
717
718                                 rc = ptl_send_rpc(req);
719                                 if (rc) {
720                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
721                                                   rc);
722                                         force_timer_recalc = 1;
723                                         req->rq_net_err = 1;
724                                 }
725                                 /* need to reset the timeout */
726                                 force_timer_recalc = 1;
727                         }
728
729                         /* Still waiting for a reply? */
730                         if (ptlrpc_client_receiving_reply(req))
731                                 continue;
732
733                         /* Did we actually receive a reply? */
734                         if (!ptlrpc_client_replied(req))
735                                 continue;
736
737                         spin_lock_irqsave(&imp->imp_lock, flags);
738                         list_del_init(&req->rq_list);
739                         spin_unlock_irqrestore(&imp->imp_lock, flags);
740
741                         req->rq_status = after_reply(req);
742                         if (req->rq_resend) {
743                                 /* Add this req to the delayed list so
744                                    it can be errored if the import is
745                                    evicted after recovery. */
746                                 spin_lock_irqsave (&req->rq_lock, flags);
747                                 list_add_tail(&req->rq_list, 
748                                               &imp->imp_delayed_list);
749                                 spin_unlock_irqrestore(&req->rq_lock, flags);
750                                 continue;
751                         }
752
753                         /* If there is no bulk associated with this request,
754                          * then we're done and should let the interpreter
755                          * process the reply.  Similarly if the RPC returned
756                          * an error, and therefore the bulk will never arrive.
757                          */
758                         if (req->rq_bulk == NULL || req->rq_status != 0) {
759                                 req->rq_phase = RQ_PHASE_INTERPRET;
760                                 GOTO(interpret, req->rq_status);
761                         }
762
763                         req->rq_phase = RQ_PHASE_BULK;
764                 }
765
766                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
767                 if (ptlrpc_bulk_active(req->rq_bulk))
768                         continue;
769
770                 if (!req->rq_bulk->bd_success) {
771                         /* The RPC reply arrived OK, but the bulk screwed
772                          * up!  Dead weird, since the server told us the RPC
773                          * was good after getting the REPLY for its GET or
774                          * the ACK for its PUT. */
775                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
776                         LBUG();
777                 }
778
779                 req->rq_phase = RQ_PHASE_INTERPRET;
780
781         interpret:
782                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
783                 LASSERT(!req->rq_receiving_reply);
784
785                 ptlrpc_unregister_reply(req);
786                 if (req->rq_bulk != NULL)
787                         ptlrpc_unregister_bulk (req);
788
789                 req->rq_phase = RQ_PHASE_COMPLETE;
790
791                 if (req->rq_interpret_reply != NULL) {
792                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
793                                 req->rq_interpret_reply;
794                         req->rq_status = interpreter(req, &req->rq_async_args,
795                                                      req->rq_status);
796                 }
797
798                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
799                        "opc %s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
800                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
801                        req->rq_xid,
802                        imp->imp_connection->c_peer.peer_ni->pni_name,
803                        imp->imp_connection->c_peer.peer_nid,
804                        req->rq_reqmsg->opc);
805
806                 set->set_remaining--;
807
808                 atomic_dec(&imp->imp_inflight);
809                 wake_up(&imp->imp_recovery_waitq);
810         }
811
812         /* If we hit an error, we want to recover promptly. */
813         RETURN(set->set_remaining == 0 || force_timer_recalc);
814 }
815
816 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
817 {
818         unsigned long      flags;
819         struct obd_import *imp = req->rq_import;
820         ENTRY;
821
822         DEBUG_REQ(D_ERROR, req, "timeout (sent %lu)", (long)req->rq_sent);
823
824         spin_lock_irqsave (&req->rq_lock, flags);
825         req->rq_timedout = 1;
826         spin_unlock_irqrestore (&req->rq_lock, flags);
827
828         ptlrpc_unregister_reply (req);
829
830         if (req->rq_bulk != NULL)
831                 ptlrpc_unregister_bulk (req);
832
833         if (imp == NULL) {
834                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
835                 RETURN(1);
836         }
837
838         /* The DLM server doesn't want recovery run on its imports. */
839         if (imp->imp_dlm_fake)
840                 RETURN(1);
841
842         /* If this request is for recovery or other primordial tasks,
843          * then error it out here. */
844         if (req->rq_send_state != LUSTRE_IMP_FULL || 
845             imp->imp_obd->obd_no_recov) {
846                 spin_lock_irqsave (&req->rq_lock, flags);
847                 req->rq_status = -ETIMEDOUT;
848                 req->rq_err = 1;
849                 spin_unlock_irqrestore (&req->rq_lock, flags);
850                 RETURN(1);
851         }
852
853         ptlrpc_fail_import(imp, req->rq_import_generation);
854
855         RETURN(0);
856 }
857
858 int ptlrpc_expired_set(void *data)
859 {
860         struct ptlrpc_request_set *set = data;
861         struct list_head          *tmp;
862         time_t                     now = LTIME_S (CURRENT_TIME);
863         ENTRY;
864
865         LASSERT(set != NULL);
866
867         /* A timeout expired; see which reqs it applies to... */
868         list_for_each (tmp, &set->set_requests) {
869                 struct ptlrpc_request *req =
870                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
871
872                 /* request in-flight? */
873                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
874                        && !req->rq_resend) ||
875                       (req->rq_phase == RQ_PHASE_BULK)))
876                         continue;
877
878                 if (req->rq_timedout ||           /* already dealt with */
879                     req->rq_sent + req->rq_timeout > now) /* not expired */
880                         continue;
881
882                 /* deal with this guy */
883                 ptlrpc_expire_one_request (req);
884         }
885
886         /* When waiting for a whole set, we always want to break out of the
887          * sleep so we can recalculate the timeout, or enable interrupts
888          * iff everyone's timed out.
889          */
890         RETURN(1);
891 }
892
893 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
894 {
895         unsigned long flags;
896         spin_lock_irqsave(&req->rq_lock, flags);
897         req->rq_intr = 1;
898         spin_unlock_irqrestore(&req->rq_lock, flags);
899 }
900
901 void ptlrpc_interrupted_set(void *data)
902 {
903         struct ptlrpc_request_set *set = data;
904         struct list_head *tmp;
905
906         LASSERT(set != NULL);
907         CERROR("INTERRUPTED SET %p\n", set);
908
909         list_for_each(tmp, &set->set_requests) {
910                 struct ptlrpc_request *req =
911                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
912
913                 if (req->rq_phase != RQ_PHASE_RPC)
914                         continue;
915
916                 ptlrpc_mark_interrupted(req);
917         }
918 }
919
920 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
921 {
922         struct list_head      *tmp;
923         time_t                 now = LTIME_S(CURRENT_TIME);
924         time_t                 deadline;
925         int                    timeout = 0;
926         struct ptlrpc_request *req;
927         ENTRY;
928
929         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
930
931         list_for_each(tmp, &set->set_requests) {
932                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
933
934                 /* request in-flight? */
935                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
936                       (req->rq_phase == RQ_PHASE_BULK)))
937                         continue;
938
939                 if (req->rq_timedout)   /* already timed out */
940                         continue;
941
942                 deadline = req->rq_sent + req->rq_timeout;
943                 if (deadline <= now)    /* actually expired already */
944                         timeout = 1;    /* ASAP */
945                 else if (timeout == 0 || timeout > deadline - now)
946                         timeout = deadline - now;
947         }
948         RETURN(timeout);
949 }
950
951 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
952 {
953         struct list_head      *tmp;
954         struct ptlrpc_request *req;
955         struct l_wait_info     lwi;
956         int                    rc, timeout;
957         ENTRY;
958
959         LASSERT(!list_empty(&set->set_requests));
960         list_for_each(tmp, &set->set_requests) {
961                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
962                 if (req->rq_phase == RQ_PHASE_NEW)
963                         (void)ptlrpc_send_new_req(req);
964         }
965
966         do {
967                 timeout = ptlrpc_set_next_timeout(set);
968
969                 /* wait until all complete, interrupted, or an in-flight
970                  * req times out */
971                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
972                        set, timeout);
973                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
974                                        ptlrpc_expired_set, 
975                                        ptlrpc_interrupted_set, set);
976                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
977
978                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
979
980                 /* -EINTR => all requests have been flagged rq_intr so next
981                  * check completes.
982                  * -ETIMEDOUT => someone timed out.  When all reqs have
983                  * timed out, signals are enabled allowing completion with
984                  * EINTR.
985                  * I don't really care if we go once more round the loop in
986                  * the error cases -eeb. */
987         } while (rc != 0);
988
989         LASSERT(set->set_remaining == 0);
990
991         rc = 0;
992         list_for_each(tmp, &set->set_requests) {
993                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
994
995                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
996                 if (req->rq_status != 0)
997                         rc = req->rq_status;
998         }
999
1000         if (set->set_interpret != NULL) {
1001                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1002                         set->set_interpret;
1003                 rc = interpreter (set, &set->set_args, rc);
1004         }
1005
1006         RETURN(rc);
1007 }
1008
1009 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1010 {
1011         ENTRY;
1012         if (request == NULL) {
1013                 EXIT;
1014                 return;
1015         }
1016
1017         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1018         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1019         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1020         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1021
1022         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1023          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1024         if (request->rq_import != NULL) {
1025                 unsigned long flags = 0;
1026                 if (!locked)
1027                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1028                 list_del_init(&request->rq_replay_list);
1029                 if (!locked)
1030                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1031                                                flags);
1032         }
1033         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1034
1035         if (atomic_read(&request->rq_refcount) != 0) {
1036                 DEBUG_REQ(D_ERROR, request,
1037                           "freeing request with nonzero refcount");
1038                 LBUG();
1039         }
1040
1041         if (request->rq_repmsg != NULL) {
1042                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1043                 request->rq_repmsg = NULL;
1044         }
1045         if (request->rq_reqmsg != NULL) {
1046                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1047                 request->rq_reqmsg = NULL;
1048         }
1049         if (request->rq_export != NULL) {
1050                 class_export_put(request->rq_export);
1051                 request->rq_export = NULL;
1052         }
1053         if (request->rq_import != NULL) {
1054                 class_import_put(request->rq_import);
1055                 request->rq_import = NULL;
1056         }
1057         if (request->rq_bulk != NULL)
1058                 ptlrpc_free_bulk(request->rq_bulk);
1059
1060         OBD_FREE(request, sizeof(*request));
1061         EXIT;
1062 }
1063
1064 void ptlrpc_free_req(struct ptlrpc_request *request)
1065 {
1066         __ptlrpc_free_req(request, 0);
1067 }
1068
1069 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1070 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1071 {
1072         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1073         (void)__ptlrpc_req_finished(request, 1);
1074 }
1075
1076 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1077 {
1078         ENTRY;
1079         if (request == NULL)
1080                 RETURN(1);
1081
1082         if (request == LP_POISON ||
1083             request->rq_reqmsg == LP_POISON) {
1084                 CERROR("dereferencing freed request (bug 575)\n");
1085                 LBUG();
1086                 RETURN(1);
1087         }
1088
1089         DEBUG_REQ(D_INFO, request, "refcount now %u",
1090                   atomic_read(&request->rq_refcount) - 1);
1091
1092         if (atomic_dec_and_test(&request->rq_refcount)) {
1093                 __ptlrpc_free_req(request, locked);
1094                 RETURN(1);
1095         }
1096
1097         RETURN(0);
1098 }
1099
1100 void ptlrpc_req_finished(struct ptlrpc_request *request)
1101 {
1102         __ptlrpc_req_finished(request, 0);
1103 }
1104
1105 /* Disengage the client's reply buffer from the network
1106  * NB does _NOT_ unregister any client-side bulk.
1107  * IDEMPOTENT, but _not_ safe against concurrent callers.
1108  * The request owner (i.e. the thread doing the I/O) must call...
1109  */
1110 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1111 {
1112         int                rc;
1113         wait_queue_head_t *wq;
1114         struct l_wait_info lwi;
1115
1116         LASSERT(!in_interrupt ());             /* might sleep */
1117
1118         if (!ptlrpc_client_receiving_reply(request))
1119                 return;
1120
1121         rc = PtlMDUnlink (request->rq_reply_md_h);
1122         if (rc == PTL_INV_MD) {
1123                 LASSERT (!ptlrpc_client_receiving_reply(request));
1124                 return;
1125         }
1126         
1127         LASSERT (rc == PTL_OK);
1128
1129         if (request->rq_set != NULL)
1130                 wq = &request->rq_set->set_waitq;
1131         else
1132                 wq = &request->rq_reply_waitq;
1133
1134         for (;;) {
1135                 /* Network access will complete in finite time but the HUGE
1136                  * timeout lets us CWARN for visibility of sluggish NALs */
1137                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1138                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1139                 if (rc == 0)
1140                         return;
1141
1142                 LASSERT (rc == -ETIMEDOUT);
1143                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1144         }
1145 }
1146
1147 /* caller must hold imp->imp_lock */
1148 void ptlrpc_free_committed(struct obd_import *imp)
1149 {
1150         struct list_head *tmp, *saved;
1151         struct ptlrpc_request *req;
1152         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1153         ENTRY;
1154
1155         LASSERT(imp != NULL);
1156
1157         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1158
1159         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1160                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1161
1162         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1163                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1164
1165                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1166                 LASSERT(req != last_req);
1167                 last_req = req;
1168
1169                 if (req->rq_import_generation < imp->imp_generation) {
1170                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1171                         GOTO(free_req, 0);
1172                 }
1173
1174                 if (req->rq_replay) {
1175                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1176                         continue;
1177                 }
1178
1179                 /* not yet committed */
1180                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1181                         DEBUG_REQ(D_HA, req, "stopping search");
1182                         break;
1183                 }
1184
1185                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1186                           imp->imp_peer_committed_transno);
1187 free_req:
1188                 if (req->rq_commit_cb != NULL)
1189                         req->rq_commit_cb(req);
1190                 list_del_init(&req->rq_replay_list);
1191                 __ptlrpc_req_finished(req, 1);
1192         }
1193
1194         EXIT;
1195         return;
1196 }
1197
1198 void ptlrpc_cleanup_client(struct obd_import *imp)
1199 {
1200         ENTRY;
1201         EXIT;
1202         return;
1203 }
1204
1205 void ptlrpc_resend_req(struct ptlrpc_request *req)
1206 {
1207         unsigned long flags;
1208
1209         DEBUG_REQ(D_HA, req, "going to resend");
1210         req->rq_reqmsg->handle.cookie = 0;
1211         req->rq_status = -EAGAIN;
1212
1213         spin_lock_irqsave (&req->rq_lock, flags);
1214         req->rq_resend = 1;
1215         req->rq_net_err = 0;
1216         req->rq_timedout = 0;
1217         if (req->rq_bulk) {
1218                 __u64 old_xid = req->rq_xid;
1219
1220                 /* ensure previous bulk fails */
1221                 req->rq_xid = ptlrpc_next_xid();
1222                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1223                        old_xid, req->rq_xid);
1224         }
1225         ptlrpc_wake_client_req(req);
1226         spin_unlock_irqrestore (&req->rq_lock, flags);
1227 }
1228
1229 /* XXX: this function and rq_status are currently unused */
1230 void ptlrpc_restart_req(struct ptlrpc_request *req)
1231 {
1232         unsigned long flags;
1233
1234         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1235         req->rq_status = -ERESTARTSYS;
1236
1237         spin_lock_irqsave (&req->rq_lock, flags);
1238         req->rq_restart = 1;
1239         req->rq_timedout = 0;
1240         ptlrpc_wake_client_req(req);
1241         spin_unlock_irqrestore (&req->rq_lock, flags);
1242 }
1243
1244 static int expired_request(void *data)
1245 {
1246         struct ptlrpc_request *req = data;
1247         ENTRY;
1248
1249         RETURN(ptlrpc_expire_one_request(req));
1250 }
1251
1252 static void interrupted_request(void *data)
1253 {
1254         unsigned long flags;
1255
1256         struct ptlrpc_request *req = data;
1257         DEBUG_REQ(D_HA, req, "request interrupted");
1258         spin_lock_irqsave (&req->rq_lock, flags);
1259         req->rq_intr = 1;
1260         spin_unlock_irqrestore (&req->rq_lock, flags);
1261 }
1262
1263 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1264 {
1265         ENTRY;
1266         atomic_inc(&req->rq_refcount);
1267         RETURN(req);
1268 }
1269
1270 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1271                                       struct obd_import *imp)
1272 {
1273         struct list_head *tmp;
1274
1275         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1276
1277         /* don't re-add requests that have been replayed */
1278         if (!list_empty(&req->rq_replay_list))
1279                 return;
1280
1281         lustre_msg_add_flags(req->rq_reqmsg,
1282                              MSG_REPLAY);
1283
1284         LASSERT(imp->imp_replayable);
1285         /* Balanced in ptlrpc_free_committed, usually. */
1286         ptlrpc_request_addref(req);
1287         list_for_each_prev(tmp, &imp->imp_replay_list) {
1288                 struct ptlrpc_request *iter =
1289                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1290
1291                 /* We may have duplicate transnos if we create and then
1292                  * open a file, or for closes retained to match creating
1293                  * opens, so use req->rq_xid as a secondary key.
1294                  * (See bugs 684, 685, and 428.)
1295                  * XXX no longer needed, but all opens need transnos!
1296                  */
1297                 if (iter->rq_transno > req->rq_transno)
1298                         continue;
1299
1300                 if (iter->rq_transno == req->rq_transno) {
1301                         LASSERT(iter->rq_xid != req->rq_xid);
1302                         if (iter->rq_xid > req->rq_xid)
1303                                 continue;
1304                 }
1305
1306                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1307                 return;
1308         }
1309
1310         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1311 }
1312
1313 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1314 {
1315         int rc = 0;
1316         int brc;
1317         struct l_wait_info lwi;
1318         struct obd_import *imp = req->rq_import;
1319         unsigned long flags;
1320         int timeout = 0;
1321         ENTRY;
1322
1323         LASSERT(req->rq_set == NULL);
1324         LASSERT(!req->rq_receiving_reply);
1325         atomic_inc(&imp->imp_inflight);
1326
1327         /* for distributed debugging */
1328         req->rq_reqmsg->status = current->pid;
1329         LASSERT(imp->imp_obd != NULL);
1330         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1331                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1332                imp->imp_obd->obd_uuid.uuid,
1333                req->rq_reqmsg->status, req->rq_xid,
1334                imp->imp_connection->c_peer.peer_ni->pni_name,
1335                imp->imp_connection->c_peer.peer_nid,
1336                req->rq_reqmsg->opc);
1337
1338         /* Mark phase here for a little debug help */
1339         req->rq_phase = RQ_PHASE_RPC;
1340
1341         spin_lock_irqsave(&imp->imp_lock, flags);
1342         req->rq_import_generation = imp->imp_generation;
1343 restart:
1344         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1345                 list_del(&req->rq_list);
1346
1347                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1348                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1349
1350                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1351                           current->comm, 
1352                           ptlrpc_import_state_name(req->rq_send_state), 
1353                           ptlrpc_import_state_name(imp->imp_state));
1354                 lwi = LWI_INTR(interrupted_request, req);
1355                 rc = l_wait_event(req->rq_reply_waitq,
1356                                   (req->rq_send_state == imp->imp_state ||
1357                                    req->rq_err),
1358                                   &lwi);
1359                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1360                           current->comm, 
1361                           ptlrpc_import_state_name(imp->imp_state), 
1362                           ptlrpc_import_state_name(req->rq_send_state),
1363                           req->rq_err);
1364
1365                 spin_lock_irqsave(&imp->imp_lock, flags);
1366                 list_del_init(&req->rq_list);
1367
1368                 if (req->rq_err) {
1369                         rc = -EIO;
1370                 } 
1371                 else if (req->rq_intr) {
1372                         rc = -EINTR;
1373                 }
1374                 else if (req->rq_no_resend) {
1375                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1376                         GOTO(out, rc = -ETIMEDOUT);
1377                 }
1378                 else {
1379                         GOTO(restart, rc);
1380                 }
1381         } 
1382
1383         if (rc != 0) {
1384                 list_del_init(&req->rq_list);
1385                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1386                 req->rq_status = rc; // XXX this ok?
1387                 GOTO(out, rc);
1388         }
1389
1390         if (req->rq_resend) {
1391                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1392
1393                 if (req->rq_bulk != NULL)
1394                         ptlrpc_unregister_bulk (req);
1395
1396                 DEBUG_REQ(D_HA, req, "resending: ");
1397         }
1398
1399         /* XXX this is the same as ptlrpc_set_wait */
1400         LASSERT(list_empty(&req->rq_list));
1401         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1402         spin_unlock_irqrestore(&imp->imp_lock, flags);
1403
1404         rc = ptl_send_rpc(req);
1405         if (rc) {
1406                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1407                 timeout = 1;
1408         } else {
1409                 timeout = MAX(req->rq_timeout * HZ, 1);
1410                 DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
1411         }
1412         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1413                                req);
1414         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1415         DEBUG_REQ(D_NET, req, "-- done sleeping");
1416
1417         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1418                "%s:%s:%d:"LPU64":%s:"LPX64":%d\n", current->comm,
1419                imp->imp_obd->obd_uuid.uuid,
1420                req->rq_reqmsg->status, req->rq_xid,
1421                imp->imp_connection->c_peer.peer_ni->pni_name,
1422                imp->imp_connection->c_peer.peer_nid,
1423                req->rq_reqmsg->opc);
1424
1425         spin_lock_irqsave(&imp->imp_lock, flags);
1426         list_del_init(&req->rq_list);
1427         spin_unlock_irqrestore(&imp->imp_lock, flags);
1428
1429         /* If the reply was received normally, this just grabs the spinlock
1430          * (ensuring the reply callback has returned), sees that
1431          * req->rq_receiving_reply is clear and returns. */
1432         ptlrpc_unregister_reply (req);
1433
1434         if (req->rq_err)
1435                 GOTO(out, rc = -EIO);
1436
1437         /* Resend if we need to, unless we were interrupted. */
1438         if (req->rq_resend && !req->rq_intr) {
1439                 /* ...unless we were specifically told otherwise. */
1440                 if (req->rq_no_resend)
1441                         GOTO(out, rc = -ETIMEDOUT);
1442                 spin_lock_irqsave(&imp->imp_lock, flags);
1443                 goto restart;
1444         }
1445
1446         if (req->rq_intr) {
1447                 /* Should only be interrupted if we timed out. */
1448                 if (!req->rq_timedout)
1449                         DEBUG_REQ(D_ERROR, req,
1450                                   "rq_intr set but rq_timedout not");
1451                 GOTO(out, rc = -EINTR);
1452         }
1453
1454         if (req->rq_timedout) {                 /* non-recoverable timeout */
1455                 GOTO(out, rc = -ETIMEDOUT);
1456         }
1457
1458         if (!req->rq_replied) {
1459                 /* How can this be? -eeb */
1460                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1461                 LBUG();
1462                 GOTO(out, rc = req->rq_status);
1463         }
1464
1465         rc = after_reply (req);
1466         /* NB may return +ve success rc */
1467         if (req->rq_resend) {
1468                 spin_lock_irqsave(&imp->imp_lock, flags);
1469                 goto restart;
1470         }
1471
1472  out:
1473         if (req->rq_bulk != NULL) {
1474                 if (rc >= 0) {                  
1475                         /* success so far.  Note that anything going wrong
1476                          * with bulk now is EXTREMELY strange, since the
1477                          * server must have believed that the bulk
1478                          * transferred OK before she replied with success to
1479                          * me. */
1480                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1481                         brc = l_wait_event(req->rq_reply_waitq,
1482                                            !ptlrpc_bulk_active(req->rq_bulk),
1483                                            &lwi);
1484                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1485                         if (brc != 0) {
1486                                 LASSERT(brc == -ETIMEDOUT);
1487                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1488                                 rc = brc;
1489                         } else if (!req->rq_bulk->bd_success) {
1490                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1491                                 rc = -EIO;
1492                         }
1493                 }
1494                 if (rc < 0)
1495                         ptlrpc_unregister_bulk (req);
1496         }
1497
1498         LASSERT(!req->rq_receiving_reply);
1499         req->rq_phase = RQ_PHASE_INTERPRET;
1500
1501         atomic_dec(&imp->imp_inflight);
1502         wake_up(&imp->imp_recovery_waitq);
1503         RETURN(rc);
1504 }
1505
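/* Per-request state stashed in rq_async_args across a replay: the send
 * state and reply status the request had before ptlrpc_replay_req() reset
 * them, so ptlrpc_replay_interpret() can restore them afterwards. */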
1506 struct ptlrpc_replay_async_args {
1507         int praa_old_state;
1508         int praa_old_status;
1509 };
1510
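/* Completion callback for a replayed request, run in ptlrpcd context.  It
 * checks that a reply actually arrived, unpacks it, verifies that the
 * transno did not change across the replay, lets rq_replay_cb make any
 * fixups, records imp_last_replay_transno and then drives the import
 * recovery state machine forward.  Any failure restarts recovery via
 * ptlrpc_connect_import(). */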
1511 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1512                                     void * data, int rc)
1513 {
1514         struct ptlrpc_replay_async_args *aa = data;
1515         struct obd_import *imp = req->rq_import;
1516         unsigned long flags;
1517
1518         atomic_dec(&imp->imp_replay_inflight);
1519         
1520         if (!req->rq_replied) {
1521                 CERROR("request replay timed out, restarting recovery\n");
1522                 GOTO(out, rc = -ETIMEDOUT);
1523         }
1524
1525 #if SWAB_PARANOIA
1526         /* Clear reply swab mask; this is a new reply in sender's byte order */
1527         req->rq_rep_swab_mask = 0;
1528 #endif
1529         LASSERT (req->rq_nob_received <= req->rq_replen);
1530         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1531         if (rc) {
1532                 CERROR("unpack_rep failed: %d\n", rc);
1533                 GOTO(out, rc = -EPROTO);
1534         }
1535
1536         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1537             req->rq_repmsg->status == -ENOTCONN) 
1538                 GOTO(out, rc = req->rq_repmsg->status);
1539
1540         /* The transno had better not change over replay. */
1541         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1542
1543         DEBUG_REQ(D_HA, req, "got rep");
1544
1545         /* let the callback do fixups, possibly modifying the request itself */
1546         if (req->rq_replay_cb)
1547                 req->rq_replay_cb(req);
1548
1549         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1550                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1551                           req->rq_repmsg->status, aa->praa_old_status);
1552         } else {
1553                 /* Put it back for re-replay. */
1554                 req->rq_repmsg->status = aa->praa_old_status;
1555         }
1556
1557         spin_lock_irqsave(&imp->imp_lock, flags);
1558         imp->imp_last_replay_transno = req->rq_transno;
1559         spin_unlock_irqrestore(&imp->imp_lock, flags);
1560
1561         /* continue with recovery */
1562         rc = ptlrpc_import_recovery_state_machine(imp);
1563  out:
1564         req->rq_send_state = aa->praa_old_state;
1565         
1566         if (rc != 0)
1567                 /* this replay failed, so restart recovery */
1568                 ptlrpc_connect_import(imp, NULL);
1569
1570         RETURN(rc);
1571 }
1572
1573
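/* Queue a request for replay through ptlrpcd instead of sending it
 * synchronously.  The import must already be in LUSTRE_IMP_REPLAY
 * (asserted below).  The request's original send state and reply status
 * are saved in rq_async_args so ptlrpc_replay_interpret() can restore them
 * when the replay completes. */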
1574 int ptlrpc_replay_req(struct ptlrpc_request *req)
1575 {
1576         struct ptlrpc_replay_async_args *aa;
1577         ENTRY;
1578
1579         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1580
1581         /* Not handling automatic bulk replay yet (or ever?) */
1582         LASSERT(req->rq_bulk == NULL);
1583
1584         DEBUG_REQ(D_HA, req, "REPLAY");
1585
1586         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1587         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1588         memset(aa, 0, sizeof *aa);
1589
1590         /* Prepare request to be resent with ptlrpcd */
1591         aa->praa_old_state = req->rq_send_state;
1592         req->rq_send_state = LUSTRE_IMP_REPLAY;
1593         req->rq_phase = RQ_PHASE_NEW;
1594         /*
1595          * Q: "How can a req get on the replay list if it wasn't replied?"
1596          * A: "If we failed during the replay of this request, it will still
1597          *     be on the list, but rq_replied will have been reset to 0."
1598          */
1599         if (req->rq_replied) {
1600                 aa->praa_old_status = req->rq_repmsg->status;
1601                 req->rq_status = 0;
1602                 req->rq_replied = 0;
1603         }
1604
1605         req->rq_interpret_reply = ptlrpc_replay_interpret;
1606         atomic_inc(&req->rq_import->imp_replay_inflight);
1607         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1608
1609         ptlrpcd_add_req(req);
1610         RETURN(0);
1611 }
1612
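/* Fail every in-flight request that belongs to an older generation of this
 * import: each request on the sending and delayed lists gets rq_err set
 * and its waiter woken, so the thread waiting on it completes with an
 * error (-EIO in ptlrpc_queue_wait() above).  imp_lock is held throughout
 * so no new requests can be queued while we walk the lists. */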
1613 void ptlrpc_abort_inflight(struct obd_import *imp)
1614 {
1615         unsigned long flags;
1616         struct list_head *tmp, *n;
1617         ENTRY;
1618
1619         /* Make sure that no new requests get processed for this import.
1620          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1621          * this flag and then putting requests on sending_list or delayed_list.
1622          */
1623         spin_lock_irqsave(&imp->imp_lock, flags);
1624
1625         /* XXX locking?  Maybe we should remove each request with the list
1626          * locked?  Also, how do we know if the requests on the list are
1627          * being freed at this time?
1628          */
1629         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1630                 struct ptlrpc_request *req =
1631                         list_entry(tmp, struct ptlrpc_request, rq_list);
1632
1633                 DEBUG_REQ(D_HA, req, "inflight");
1634
1635                 spin_lock (&req->rq_lock);
1636                 if (req->rq_import_generation < imp->imp_generation) {
1637                         req->rq_err = 1;
1638                         ptlrpc_wake_client_req(req);
1639                 }
1640                 spin_unlock (&req->rq_lock);
1641         }
1642
1643         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1644                 struct ptlrpc_request *req =
1645                         list_entry(tmp, struct ptlrpc_request, rq_list);
1646
1647                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1648
1649                 spin_lock (&req->rq_lock);
1650                 if (req->rq_import_generation < imp->imp_generation) {
1651                         req->rq_err = 1;
1652                         ptlrpc_wake_client_req(req);
1653                 }
1654                 spin_unlock (&req->rq_lock);
1655         }
1656
1657         /* Last chance to free reqs left on the replay list, but we
1658          * will still leak reqs that haven't committed.  */
1659         if (imp->imp_replayable)
1660                 ptlrpc_free_committed(imp);
1661
1662         spin_unlock_irqrestore(&imp->imp_lock, flags);
1663
1664         EXIT;
1665 }
1666
1667 static __u64 ptlrpc_last_xid = 0;
1668 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1669
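/* Return the next request XID.  XIDs are drawn from a single global
 * counter protected by ptlrpc_last_xid_lock, so they are unique and
 * monotonically increasing for this client.  A request is presumably
 * stamped when it is first built, roughly:
 *
 *         req->rq_xid = ptlrpc_next_xid();
 */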
1670 __u64 ptlrpc_next_xid(void)
1671 {
1672         __u64 tmp;
1673         spin_lock(&ptlrpc_last_xid_lock);
1674         tmp = ++ptlrpc_last_xid;
1675         spin_unlock(&ptlrpc_last_xid_lock);
1676         return tmp;
1677 }
1678
1679