lustre/ptlrpc/client.c (fs/lustre-release.git)
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35
36 #include "ptlrpc_internal.h"
37
38 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
39                         struct ptlrpc_client *cl)
40 {
41         cl->cli_request_portal = req_portal;
42         cl->cli_reply_portal   = rep_portal;
43         cl->cli_name           = name;
44 }
45
46 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
47 {
48         struct ptlrpc_connection *c;
49         struct ptlrpc_peer peer;
50         int err;
51
52         err = ptlrpc_uuid_to_peer(uuid, &peer);
53         if (err != 0) {
54                 CERROR("cannot find peer %s!\n", uuid->uuid);
55                 return NULL;
56         }
57
58         c = ptlrpc_get_connection(&peer, uuid);
59         if (c) {
60                 memcpy(c->c_remote_uuid.uuid,
61                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
62         }
63
64         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
65
66         return c;
67 }
68
69 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
70                                  struct obd_uuid *uuid)
71 {
72         struct ptlrpc_peer peer;
73         int err;
74
75         err = ptlrpc_uuid_to_peer(uuid, &peer);
76         if (err != 0) {
77                 CERROR("cannot find peer %s!\n", uuid->uuid);
78                 return;
79         }
80
81         memcpy(&conn->c_peer, &peer, sizeof (peer));
82         return;
83 }
84
85 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
86 {
87         struct ptlrpc_bulk_desc *desc;
88
89         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
90         if (!desc)
91                 return NULL;
92
93         spin_lock_init(&desc->bd_lock);
94         init_waitqueue_head(&desc->bd_waitq);
95         desc->bd_max_iov = npages;
96         desc->bd_iov_count = 0;
97         desc->bd_md_h = PTL_INVALID_HANDLE;
98         desc->bd_portal = portal;
99         desc->bd_type = type;
100         
101         return desc;
102 }
103
104 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
105                                                int npages, int type, int portal)
106 {
107         struct obd_import *imp = req->rq_import;
108         struct ptlrpc_bulk_desc *desc;
109
110         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
111         desc = new_bulk(npages, type, portal);
112         if (desc == NULL)
113                 RETURN(NULL);
114
115         desc->bd_import_generation = req->rq_import_generation;
116         desc->bd_import = class_import_get(imp);
117         desc->bd_req = req;
118
119         desc->bd_cbid.cbid_fn  = client_bulk_callback;
120         desc->bd_cbid.cbid_arg = desc;
121
122         /* This makes req own desc, and free it when she frees herself */
123         req->rq_bulk = desc;
124
125         return desc;
126 }
127
128 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
129                                                int npages, int type, int portal)
130 {
131         struct obd_export *exp = req->rq_export;
132         struct ptlrpc_bulk_desc *desc;
133
134         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
135
136         desc = new_bulk(npages, type, portal);
137         if (desc == NULL)
138                 RETURN(NULL);
139
140         desc->bd_export = class_export_get(exp);
141         desc->bd_req = req;
142
143         desc->bd_cbid.cbid_fn  = server_bulk_callback;
144         desc->bd_cbid.cbid_arg = desc;
145
146         /* NB we don't assign rq_bulk here; server-side requests are
147          * re-used, and the handler frees the bulk desc explicitly. */
148
149         return desc;
150 }
151
152 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
153                            struct page *page, int pageoffset, int len)
154 {
155         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
156         LASSERT(page != NULL);
157         LASSERT(pageoffset >= 0);
158         LASSERT(len > 0);
159         LASSERT(pageoffset + len <= PAGE_SIZE);
160
161         desc->bd_nob += len;
162
163         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
164 }
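/*
 * Illustrative client-side sketch (not part of the original file): how a
 * caller might attach a bulk descriptor to a prepared request.  The portal
 * and page array are placeholders; BULK_PUT_SINK matches a read-style
 * transfer where the server PUTs data into the client's pages.
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK,
 *                                  SOME_BULK_PORTAL);   // portal is hypothetical
 *      if (desc == NULL)
 *              return -ENOMEM;                          // req owns no bulk yet
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *      // desc is now owned by req (rq_bulk) and is freed along with it
 */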
165
166 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
167 {
168         ENTRY;
169
170         LASSERT(desc != NULL);
171         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
172         LASSERT(!desc->bd_network_rw);         /* network hands off or */
173         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
174         if (desc->bd_export)
175                 class_export_put(desc->bd_export);
176         else
177                 class_import_put(desc->bd_import);
178
179         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
180                                 bd_iov[desc->bd_max_iov]));
181         EXIT;
182 }
183
184 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
185                                        int count, int *lengths, char **bufs)
186 {
187         struct ptlrpc_request *request;
188         int rc;
189         ENTRY;
190
191         LASSERT((unsigned long)imp > 0x1000);
192
193         OBD_ALLOC(request, sizeof(*request));
194         if (!request) {
195                 CERROR("request allocation out of memory\n");
196                 RETURN(NULL);
197         }
198
199         rc = lustre_pack_request(request, count, lengths, bufs);
200         if (rc) {
201                 CERROR("cannot pack request %d\n", rc);
202                 OBD_FREE(request, sizeof(*request));
203                 RETURN(NULL);
204         }
205
206         request->rq_timeout = obd_timeout;
207         request->rq_send_state = LUSTRE_IMP_FULL;
208         request->rq_type = PTL_RPC_MSG_REQUEST;
209         request->rq_import = class_import_get(imp);
210
211         request->rq_req_cbid.cbid_fn  = request_out_callback;
212         request->rq_req_cbid.cbid_arg = request;
213
214         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
215         request->rq_reply_cbid.cbid_arg = request;
216         
217         request->rq_phase = RQ_PHASE_NEW;
218
219         /* XXX FIXME bug 249 */
220         request->rq_request_portal = imp->imp_client->cli_request_portal;
221         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
222
223         spin_lock_init(&request->rq_lock);
224         INIT_LIST_HEAD(&request->rq_list);
225         INIT_LIST_HEAD(&request->rq_replay_list);
226         INIT_LIST_HEAD(&request->rq_set_chain);
227         init_waitqueue_head(&request->rq_reply_waitq);
228         request->rq_xid = ptlrpc_next_xid();
229         atomic_set(&request->rq_refcount, 1);
230
231         request->rq_reqmsg->opc = opcode;
232         request->rq_reqmsg->flags = 0;
233
234         RETURN(request);
235 }
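/*
 * Minimal usage sketch (illustration only; the opcode and buffer length are
 * hypothetical): the common synchronous pattern is prep, queue_wait, then
 * drop the caller's reference.
 *
 *      struct ptlrpc_request *req;
 *      int size = sizeof(struct my_body);           // hypothetical body size
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, MY_OPCODE, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      // callers typically also set req->rq_replen to the expected reply size
 *      rc = ptlrpc_queue_wait(req);                  // send and wait for the reply
 *      ptlrpc_req_finished(req);                     // drop the caller's reference
 *      return rc;
 */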
236
237 struct ptlrpc_request_set *ptlrpc_prep_set(void)
238 {
239         struct ptlrpc_request_set *set;
240
241         OBD_ALLOC(set, sizeof *set);
242         if (!set)
243                 RETURN(NULL);
244         INIT_LIST_HEAD(&set->set_requests);
245         init_waitqueue_head(&set->set_waitq);
246         set->set_remaining = 0;
247         spin_lock_init(&set->set_new_req_lock);
248         INIT_LIST_HEAD(&set->set_new_requests);
249
250         RETURN(set);
251 }
252
253 /* Finish with this set; opposite of prep_set. */
254 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
255 {
256         struct list_head *tmp;
257         struct list_head *next;
258         int               expected_phase;
259         int               n = 0;
260         ENTRY;
261
262         /* Requests on the set should either all be completed, or all be new */
263         expected_phase = (set->set_remaining == 0) ?
264                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
265         list_for_each (tmp, &set->set_requests) {
266                 struct ptlrpc_request *req =
267                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
268
269                 LASSERT(req->rq_phase == expected_phase);
270                 n++;
271         }
272
273         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
274
275         list_for_each_safe(tmp, next, &set->set_requests) {
276                 struct ptlrpc_request *req =
277                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
278                 list_del_init(&req->rq_set_chain);
279
280                 LASSERT(req->rq_phase == expected_phase);
281
282                 if (req->rq_phase == RQ_PHASE_NEW) {
283
284                         if (req->rq_interpret_reply != NULL) {
285                                 int (*interpreter)(struct ptlrpc_request *,
286                                                    void *, int) =
287                                         req->rq_interpret_reply;
288
289                                 /* higher level (i.e. LOV) failed;
290                                  * let the sub reqs clean up */
291                                 req->rq_status = -EBADR;
292                                 interpreter(req, &req->rq_async_args,
293                                             req->rq_status);
294                         }
295                         set->set_remaining--;
296                 }
297
298                 req->rq_set = NULL;
299                 ptlrpc_req_finished (req);
300         }
301
302         LASSERT(set->set_remaining == 0);
303
304         OBD_FREE(set, sizeof(*set));
305         EXIT;
306 }
307
308 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
309                         struct ptlrpc_request *req)
310 {
311         /* The set takes over the caller's request reference */
312         list_add_tail(&req->rq_set_chain, &set->set_requests);
313         req->rq_set = set;
314         set->set_remaining++;
315         atomic_inc(&req->rq_import->imp_inflight);
316 }
317
318 /* Locked so that many callers can add things; the context that owns the set
319  * is supposed to notice these and move them into the set proper. */
320 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
321                             struct ptlrpc_request *req)
322 {
323         unsigned long flags;
324         spin_lock_irqsave(&set->set_new_req_lock, flags);
325         /* The set takes over the caller's request reference */
326         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
327         req->rq_set = set;
328         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
329 }
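/*
 * Producer-side sketch (an assumption about typical use, not taken from this
 * file): a thread that does not own the set queues a request here and then
 * wakes whatever thread owns the set, so it can splice set_new_requests into
 * the set proper.
 *
 *      ptlrpc_set_add_new_req(set, req);
 *      wake_up(&set->set_waitq);        // assumed wake-up of the set owner
 */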
330
331 /*
332  * Based on the current state of the import, determine if the request
333  * can be sent, is an error, or should be delayed.
334  *
335  * Returns true if this request should be delayed. If false and
336  * *status is non-zero, then the request cannot be sent and *status is
337  * the error code.  If false and *status is 0, then the request can be sent.
338  *
339  * The imp->imp_lock must be held.
340  */
341 static int ptlrpc_import_delay_req(struct obd_import *imp, 
342                                    struct ptlrpc_request *req, int *status)
343 {
344         int delay = 0;
345         ENTRY;
346
347         LASSERT (status != NULL);
348         *status = 0;
349
350         if (imp->imp_state == LUSTRE_IMP_NEW) {
351                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
352                 *status = -EIO;
353                 LBUG();
354         }
355         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
356                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
357                 *status = -EIO;
358         }
359         /* allow CONNECT even if import is invalid */
360         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
361                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
362                 ;
363         }
364         /*
365          * If the import has been invalidated (such as by an OST failure), the
366          * request must fail with -EIO.  
367          */
368         else if (imp->imp_invalid) {
369                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
370                 *status = -EIO;
371         } 
372         else if (req->rq_import_generation != imp->imp_generation) {
373                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
374                 *status = -EIO;
375         } 
376         else if (req->rq_send_state != imp->imp_state) {
377                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
378                     || req->rq_no_delay) 
379                         *status = -EWOULDBLOCK;
380                 else
381                         delay = 1;
382         }
383
384         RETURN(delay);
385 }
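/*
 * Caller pattern, mirroring ptlrpc_send_new_req() and ptlrpc_check_set()
 * below (shown only for illustration):
 *
 *      spin_lock_irqsave(&imp->imp_lock, flags);
 *      if (ptlrpc_import_delay_req(imp, req, &status)) {
 *              // park req on imp->imp_delayed_list until recovery completes
 *      } else if (status != 0) {
 *              // fail req: set rq_status = status and interpret it
 *      } else {
 *              // safe to send: move req to imp->imp_sending_list
 *      }
 *      spin_unlock_irqrestore(&imp->imp_lock, flags);
 */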
386
387 static int ptlrpc_check_reply(struct ptlrpc_request *req)
388 {
389         unsigned long flags;
390         int rc = 0;
391         ENTRY;
392
393         /* serialise with network callback */
394         spin_lock_irqsave (&req->rq_lock, flags);
395
396         if (req->rq_replied) {
397                 DEBUG_REQ(D_NET, req, "REPLIED:");
398                 GOTO(out, rc = 1);
399         }
400         
401         if (req->rq_net_err && !req->rq_timedout) {
402                 spin_unlock_irqrestore (&req->rq_lock, flags);
403                 rc = ptlrpc_expire_one_request(req); 
404                 spin_lock_irqsave (&req->rq_lock, flags);
405                 GOTO(out, rc);
406         }
407
408         if (req->rq_err) {
409                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
410                 GOTO(out, rc = 1);
411         }
412
413         if (req->rq_resend) {
414                 DEBUG_REQ(D_ERROR, req, "RESEND:");
415                 GOTO(out, rc = 1);
416         }
417
418         if (req->rq_restart) {
419                 DEBUG_REQ(D_ERROR, req, "RESTART:");
420                 GOTO(out, rc = 1);
421         }
422         EXIT;
423  out:
424         spin_unlock_irqrestore (&req->rq_lock, flags);
425         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
426         return rc;
427 }
428
429 static int ptlrpc_check_status(struct ptlrpc_request *req)
430 {
431         int err;
432         ENTRY;
433
434         err = req->rq_repmsg->status;
435         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
436                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
437                           err);
438                 RETURN(err < 0 ? err : -EINVAL);
439         }
440
441         if (err < 0) {
442                 DEBUG_REQ(D_INFO, req, "status is %d", err);
443         } else if (err > 0) {
444                 /* XXX: translate this error from net to host */
445                 DEBUG_REQ(D_INFO, req, "status is %d", err);
446         }
447
448         RETURN(err);
449 }
450
451 static int after_reply(struct ptlrpc_request *req)
452 {
453         unsigned long flags;
454         struct obd_import *imp = req->rq_import;
455         int rc;
456         ENTRY;
457
458         LASSERT(!req->rq_receiving_reply);
459
460         /* NB Until this point, the whole of the incoming message,
461          * including buflens, status etc is in the sender's byte order. */
462
463 #if SWAB_PARANOIA
464         /* Clear reply swab mask; this is a new reply in sender's byte order */
465         req->rq_rep_swab_mask = 0;
466 #endif
467         LASSERT (req->rq_nob_received <= req->rq_replen);
468         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
469         if (rc) {
470                 CERROR("unpack_rep failed: %d\n", rc);
471                 RETURN(-EPROTO);
472         }
473
474         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
475             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
476                 CERROR("invalid packet type received (type=%u)\n",
477                        req->rq_repmsg->type);
478                 RETURN(-EPROTO);
479         }
480
481         /* Store transno in reqmsg for replay. */
482         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
483
484         rc = ptlrpc_check_status(req);
485
486         /* Either we've been evicted, or the server has failed for
487          * some reason. Try to reconnect, and if that fails, punt to the
488          * upcall. */
489         if (rc == -ENOTCONN) {
490                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
491                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
492                         RETURN(-ENOTCONN);
493                 }
494
495                 ptlrpc_request_handle_notconn(req);
496
497                 RETURN(rc);
498         }
499
500         if (req->rq_import->imp_replayable) {
501                 spin_lock_irqsave(&imp->imp_lock, flags);
502                 if (req->rq_replay || req->rq_transno != 0)
503                         ptlrpc_retain_replayable_request(req, imp);
504                 else if (req->rq_commit_cb != NULL) {
505                         spin_unlock_irqrestore(&imp->imp_lock, flags);
506                         req->rq_commit_cb(req);
507                         spin_lock_irqsave(&imp->imp_lock, flags);
508                 }
509
510                 if (req->rq_transno > imp->imp_max_transno)
511                         imp->imp_max_transno = req->rq_transno;
512
513                 /* Replay-enabled imports return commit-status information. */
514                 if (req->rq_repmsg->last_committed)
515                         imp->imp_peer_committed_transno =
516                                 req->rq_repmsg->last_committed;
517                 ptlrpc_free_committed(imp);
518                 spin_unlock_irqrestore(&imp->imp_lock, flags);
519         }
520
521         RETURN(rc);
522 }
523
524 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
525 {
526         char                   str[PTL_NALFMT_SIZE];
527         struct obd_import     *imp;
528         unsigned long          flags;
529         int rc;
530         ENTRY;
531
532         LASSERT(req->rq_phase == RQ_PHASE_NEW);
533         req->rq_phase = RQ_PHASE_RPC;
534
535         imp = req->rq_import;
536         spin_lock_irqsave(&imp->imp_lock, flags);
537
538         req->rq_import_generation = imp->imp_generation;
539
540         if (ptlrpc_import_delay_req(imp, req, &rc)) {
541                 spin_lock (&req->rq_lock);
542                 req->rq_waiting = 1;
543                 spin_unlock (&req->rq_lock);
544
545                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
546                           "(%s != %s)",
547                           req->rq_reqmsg->status, 
548                           ptlrpc_import_state_name(req->rq_send_state),
549                           ptlrpc_import_state_name(imp->imp_state));
550                 LASSERT(list_empty (&req->rq_list));
551
552                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
553                 spin_unlock_irqrestore(&imp->imp_lock, flags);
554                 RETURN(0);
555         }
556
557         if (rc != 0) {
558                 spin_unlock_irqrestore(&imp->imp_lock, flags);
559                 req->rq_status = rc;
560                 req->rq_phase = RQ_PHASE_INTERPRET;
561                 RETURN(rc);
562         }
563
564         /* XXX this is the same as ptlrpc_queue_wait */
565         LASSERT(list_empty(&req->rq_list));
566         list_add_tail(&req->rq_list, &imp->imp_sending_list);
567         spin_unlock_irqrestore(&imp->imp_lock, flags);
568
569         req->rq_reqmsg->status = current->pid;
570         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
571                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
572                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
573                req->rq_xid,
574                imp->imp_connection->c_peer.peer_ni->pni_name,
575                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
576                req->rq_reqmsg->opc);
577
578         rc = ptl_send_rpc(req);
579         if (rc) {
580                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
581                 req->rq_net_err = 1;
582                 RETURN(rc);
583         }
584         RETURN(0);
585 }
586
587 int ptlrpc_check_set(struct ptlrpc_request_set *set)
588 {
589         char str[PTL_NALFMT_SIZE];
590         unsigned long flags;
591         struct list_head *tmp;
592         int force_timer_recalc = 0;
593         ENTRY;
594
595         if (set->set_remaining == 0)
596                 RETURN(1);
597
598         list_for_each(tmp, &set->set_requests) {
599                 struct ptlrpc_request *req =
600                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
601                 struct obd_import *imp = req->rq_import;
602                 int rc = 0;
603
604                 if (req->rq_phase == RQ_PHASE_NEW &&
605                     ptlrpc_send_new_req(req)) {
606                         force_timer_recalc = 1;
607                 }
608
609                 if (!(req->rq_phase == RQ_PHASE_RPC ||
610                       req->rq_phase == RQ_PHASE_BULK ||
611                       req->rq_phase == RQ_PHASE_INTERPRET ||
612                       req->rq_phase == RQ_PHASE_COMPLETE)) {
613                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
614                         LBUG();
615                 }
616
617                 if (req->rq_phase == RQ_PHASE_COMPLETE)
618                         continue;
619
620                 if (req->rq_phase == RQ_PHASE_INTERPRET)
621                         GOTO(interpret, req->rq_status);
622
623                 if (req->rq_net_err && !req->rq_timedout)
624                         ptlrpc_expire_one_request(req); 
625
626                 if (req->rq_err) {
627                         ptlrpc_unregister_reply(req);
628                         if (req->rq_status == 0)
629                                 req->rq_status = -EIO;
630                         req->rq_phase = RQ_PHASE_INTERPRET;
631
632                         spin_lock_irqsave(&imp->imp_lock, flags);
633                         list_del_init(&req->rq_list);
634                         spin_unlock_irqrestore(&imp->imp_lock, flags);
635
636                         GOTO(interpret, req->rq_status);
637                 }
638
639                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
640                  * will only be set after rq_timedout, but the oig waiting
641                  * path sets rq_intr irrespective of whether ptlrpcd has
642                  * seen a timeout.  Our policy is to only interpret
643                  * interrupted RPCs after they have timed out. */
644                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
645                         /* NB could be on delayed list */
646                         ptlrpc_unregister_reply(req);
647                         req->rq_status = -EINTR;
648                         req->rq_phase = RQ_PHASE_INTERPRET;
649
650                         spin_lock_irqsave(&imp->imp_lock, flags);
651                         list_del_init(&req->rq_list);
652                         spin_unlock_irqrestore(&imp->imp_lock, flags);
653
654                         GOTO(interpret, req->rq_status);
655                 }
656
657                 if (req->rq_phase == RQ_PHASE_RPC) {
658                         if (req->rq_waiting || req->rq_resend) {
659                                 int status;
660
661                                 ptlrpc_unregister_reply(req);
662
663                                 spin_lock_irqsave(&imp->imp_lock, flags);
664
665                                 if (ptlrpc_import_delay_req(imp, req, &status)){
666                                         spin_unlock_irqrestore(&imp->imp_lock,
667                                                                flags);
668                                         continue;
669                                 } 
670
671                                 list_del_init(&req->rq_list);
672                                 if (status != 0)  {
673                                         req->rq_status = status;
674                                         req->rq_phase = RQ_PHASE_INTERPRET;
675                                         spin_unlock_irqrestore(&imp->imp_lock,
676                                                                flags);
677                                         GOTO(interpret, req->rq_status);
678                                 }
679                                 if (req->rq_no_resend) {
680                                         req->rq_status = -ENOTCONN;
681                                         req->rq_phase = RQ_PHASE_INTERPRET;
682                                         spin_unlock_irqrestore(&imp->imp_lock,
683                                                                flags);
684                                         GOTO(interpret, req->rq_status);
685                                 }
686                                 list_add_tail(&req->rq_list,
687                                               &imp->imp_sending_list);
688
689                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
690
691                                 req->rq_waiting = 0;
692                                 if (req->rq_resend) {
693                                         lustre_msg_add_flags(req->rq_reqmsg,
694                                                              MSG_RESENT);
695                                         if (req->rq_bulk) {
696                                                 __u64 old_xid = req->rq_xid;
697
698                                                 ptlrpc_unregister_bulk (req);
699
700                                                 /* ensure previous bulk fails */
701                                                 req->rq_xid = ptlrpc_next_xid();
702                                                 CDEBUG(D_HA, "resend bulk "
703                                                        "old x"LPU64
704                                                        " new x"LPU64"\n",
705                                                        old_xid, req->rq_xid);
706                                         }
707                                 }
708
709                                 rc = ptl_send_rpc(req);
710                                 if (rc) {
711                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
712                                                   rc);
713                                         force_timer_recalc = 1;
714                                         req->rq_net_err = 1;
715                                 }
716                                 /* need to reset the timeout */
717                                 force_timer_recalc = 1;
718                         }
719
720                         /* Still waiting for a reply? */
721                         if (ptlrpc_client_receiving_reply(req))
722                                 continue;
723
724                         /* Did we actually receive a reply? */
725                         if (!ptlrpc_client_replied(req))
726                                 continue;
727
728                         spin_lock_irqsave(&imp->imp_lock, flags);
729                         list_del_init(&req->rq_list);
730                         spin_unlock_irqrestore(&imp->imp_lock, flags);
731
732                         req->rq_status = after_reply(req);
733                         if (req->rq_resend) {
734                                 /* Add this req to the delayed list so
735                                    it can be errored if the import is
736                                    evicted after recovery. */
737                                 spin_lock_irqsave (&req->rq_lock, flags);
738                                 list_add_tail(&req->rq_list, 
739                                               &imp->imp_delayed_list);
740                                 spin_unlock_irqrestore(&req->rq_lock, flags);
741                                 continue;
742                         }
743
744                         /* If there is no bulk associated with this request,
745                          * then we're done and should let the interpreter
746                          * process the reply.  Similarly if the RPC returned
747                          * an error, and therefore the bulk will never arrive.
748                          */
749                         if (req->rq_bulk == NULL || req->rq_status != 0) {
750                                 req->rq_phase = RQ_PHASE_INTERPRET;
751                                 GOTO(interpret, req->rq_status);
752                         }
753
754                         req->rq_phase = RQ_PHASE_BULK;
755                 }
756
757                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
758                 if (ptlrpc_bulk_active(req->rq_bulk))
759                         continue;
760
761                 if (!req->rq_bulk->bd_success) {
762                         /* The RPC reply arrived OK, but the bulk screwed
763                          * up!  Dead weird since the server told us the RPC
764                          * was good after getting the REPLY for her GET or
765                          * the ACK for her PUT. */
766                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
767                         LBUG();
768                 }
769
770                 req->rq_phase = RQ_PHASE_INTERPRET;
771
772         interpret:
773                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
774                 LASSERT(!req->rq_receiving_reply);
775
776                 ptlrpc_unregister_reply(req);
777                 if (req->rq_bulk != NULL)
778                         ptlrpc_unregister_bulk (req);
779
780                 req->rq_phase = RQ_PHASE_COMPLETE;
781
782                 if (req->rq_interpret_reply != NULL) {
783                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
784                                 req->rq_interpret_reply;
785                         req->rq_status = interpreter(req, &req->rq_async_args,
786                                                      req->rq_status);
787                 }
788
789                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
790                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
791                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
792                        req->rq_xid,
793                        imp->imp_connection->c_peer.peer_ni->pni_name,
794                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
795                        req->rq_reqmsg->opc);
796
797                 set->set_remaining--;
798
799                 atomic_dec(&imp->imp_inflight);
800                 wake_up(&imp->imp_recovery_waitq);
801         }
802
803         /* If we hit an error, we want to recover promptly. */
804         RETURN(set->set_remaining == 0 || force_timer_recalc);
805 }
806
807 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
808 {
809         unsigned long      flags;
810         struct obd_import *imp = req->rq_import;
811         ENTRY;
812
813         DEBUG_REQ(D_ERROR, req, "timeout");
814
815         spin_lock_irqsave (&req->rq_lock, flags);
816         req->rq_timedout = 1;
817         spin_unlock_irqrestore (&req->rq_lock, flags);
818
819         ptlrpc_unregister_reply (req);
820
821         if (req->rq_bulk != NULL)
822                 ptlrpc_unregister_bulk (req);
823
824         if (imp == NULL) {
825                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
826                 RETURN(1);
827         }
828
829         /* The DLM server doesn't want recovery run on its imports. */
830         if (imp->imp_dlm_fake)
831                 RETURN(1);
832
833         /* If this request is for recovery or other primordial tasks,
834          * then error it out here. */
835         if (req->rq_send_state != LUSTRE_IMP_FULL || 
836             imp->imp_obd->obd_no_recov) {
837                 spin_lock_irqsave (&req->rq_lock, flags);
838                 req->rq_status = -ETIMEDOUT;
839                 req->rq_err = 1;
840                 spin_unlock_irqrestore (&req->rq_lock, flags);
841                 RETURN(1);
842         }
843
844         ptlrpc_fail_import(imp, req->rq_import_generation);
845
846         RETURN(0);
847 }
848
849 int ptlrpc_expired_set(void *data)
850 {
851         struct ptlrpc_request_set *set = data;
852         struct list_head          *tmp;
853         time_t                     now = LTIME_S (CURRENT_TIME);
854         ENTRY;
855
856         LASSERT(set != NULL);
857
858         /* A timeout expired; see which reqs it applies to... */
859         list_for_each (tmp, &set->set_requests) {
860                 struct ptlrpc_request *req =
861                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
862
863                 /* request in-flight? */
864                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
865                        && !req->rq_resend) ||
866                       (req->rq_phase == RQ_PHASE_BULK)))
867                         continue;
868
869                 if (req->rq_timedout ||           /* already dealt with */
870                     req->rq_sent + req->rq_timeout > now) /* not expired */
871                         continue;
872
873                 /* deal with this guy */
874                 ptlrpc_expire_one_request (req);
875         }
876
877         /* When waiting for a whole set, we always want to break out of the
878          * sleep so we can recalculate the timeout, or enable interrupts
879          * iff everyone's timed out.
880          */
881         RETURN(1);
882 }
883
884 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
885 {
886         unsigned long flags;
887         spin_lock_irqsave(&req->rq_lock, flags);
888         req->rq_intr = 1;
889         spin_unlock_irqrestore(&req->rq_lock, flags);
890 }
891
892 void ptlrpc_interrupted_set(void *data)
893 {
894         struct ptlrpc_request_set *set = data;
895         struct list_head *tmp;
896
897         LASSERT(set != NULL);
898         CERROR("INTERRUPTED SET %p\n", set);
899
900         list_for_each(tmp, &set->set_requests) {
901                 struct ptlrpc_request *req =
902                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
903
904                 if (req->rq_phase != RQ_PHASE_RPC)
905                         continue;
906
907                 ptlrpc_mark_interrupted(req);
908         }
909 }
910
911 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
912 {
913         struct list_head      *tmp;
914         time_t                 now = LTIME_S(CURRENT_TIME);
915         time_t                 deadline;
916         int                    timeout = 0;
917         struct ptlrpc_request *req;
918         ENTRY;
919
920         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
921
922         list_for_each(tmp, &set->set_requests) {
923                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
924
925                 /* request in-flight? */
926                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
927                       (req->rq_phase == RQ_PHASE_BULK)))
928                         continue;
929
930                 if (req->rq_timedout)   /* already timed out */
931                         continue;
932
933                 deadline = req->rq_sent + req->rq_timeout;
934                 if (deadline <= now)    /* actually expired already */
935                         timeout = 1;    /* ASAP */
936                 else if (timeout == 0 || timeout > deadline - now)
937                         timeout = deadline - now;
938         }
939         RETURN(timeout);
940 }
941                 
942
943 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
944 {
945         struct list_head      *tmp;
946         struct ptlrpc_request *req;
947         struct l_wait_info     lwi;
948         int                    rc, timeout;
949         ENTRY;
950
951         LASSERT(!list_empty(&set->set_requests));
952         list_for_each(tmp, &set->set_requests) {
953                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
954                 if (req->rq_phase == RQ_PHASE_NEW)
955                         (void)ptlrpc_send_new_req(req);
956         }
957
958         do {
959                 timeout = ptlrpc_set_next_timeout(set);
960
961                 /* wait until all complete, interrupted, or an in-flight
962                  * req times out */
963                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
964                        set, timeout);
965                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
966                                        ptlrpc_expired_set, 
967                                        ptlrpc_interrupted_set, set);
968                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
969
970                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
971
972                 /* -EINTR => all requests have been flagged rq_intr so next
973                  * check completes.
974                  * -ETIMEDOUT => someone timed out.  When all reqs have
975                  * timed out, signals are enabled allowing completion with
976                  * EINTR.
977                  * I don't really care if we go once more round the loop in
978                  * the error cases -eeb. */
979         } while (rc != 0);
980
981         LASSERT(set->set_remaining == 0);
982
983         rc = 0;
984         list_for_each(tmp, &set->set_requests) {
985                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
986
987                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
988                 if (req->rq_status != 0)
989                         rc = req->rq_status;
990         }
991
992         if (set->set_interpret != NULL) {
993                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
994                         set->set_interpret;
995                 rc = interpreter (set, &set->set_args, rc);
996         }
997
998         RETURN(rc);
999 }
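/*
 * End-to-end request-set sketch (illustrative; the opcode and sizes are
 * placeholders): build a set, add requests, wait for all of them, then
 * destroy the set, which drops each request's reference.
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      for (i = 0; i < nreqs; i++) {
 *              req = ptlrpc_prep_req(imp, MY_OPCODE, 1, &size, NULL);
 *              if (req != NULL)
 *                      ptlrpc_set_add_req(set, req);   // set takes the reference
 *      }
 *      rc = ptlrpc_set_wait(set);       // sends NEW requests and waits for all
 *      ptlrpc_set_destroy(set);
 */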
1000
1001 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1002 {
1003         ENTRY;
1004         if (request == NULL) {
1005                 EXIT;
1006                 return;
1007         }
1008
1009         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1010         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1011         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1012         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1013
1014         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1015          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1016         if (request->rq_import != NULL) {
1017                 unsigned long flags = 0;
1018                 if (!locked)
1019                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1020                 list_del_init(&request->rq_replay_list);
1021                 if (!locked)
1022                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1023                                                flags);
1024         }
1025         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1026
1027         if (atomic_read(&request->rq_refcount) != 0) {
1028                 DEBUG_REQ(D_ERROR, request,
1029                           "freeing request with nonzero refcount");
1030                 LBUG();
1031         }
1032
1033         if (request->rq_repmsg != NULL) {
1034                 OBD_FREE(request->rq_repmsg, request->rq_replen);
1035                 request->rq_repmsg = NULL;
1036         }
1037         if (request->rq_reqmsg != NULL) {
1038                 OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
1039                 request->rq_reqmsg = NULL;
1040         }
1041         if (request->rq_export != NULL) {
1042                 class_export_put(request->rq_export);
1043                 request->rq_export = NULL;
1044         }
1045         if (request->rq_import != NULL) {
1046                 class_import_put(request->rq_import);
1047                 request->rq_import = NULL;
1048         }
1049         if (request->rq_bulk != NULL)
1050                 ptlrpc_free_bulk(request->rq_bulk);
1051
1052         OBD_FREE(request, sizeof(*request));
1053         EXIT;
1054 }
1055
1056 void ptlrpc_free_req(struct ptlrpc_request *request)
1057 {
1058         __ptlrpc_free_req(request, 0);
1059 }
1060
1061 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1062 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1063 {
1064         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1065         (void)__ptlrpc_req_finished(request, 1);
1066 }
1067
1068 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1069 {
1070         ENTRY;
1071         if (request == NULL)
1072                 RETURN(1);
1073
1074         if (request == LP_POISON ||
1075             request->rq_reqmsg == LP_POISON) {
1076                 CERROR("dereferencing freed request (bug 575)\n");
1077                 LBUG();
1078                 RETURN(1);
1079         }
1080
1081         DEBUG_REQ(D_INFO, request, "refcount now %u",
1082                   atomic_read(&request->rq_refcount) - 1);
1083
1084         if (atomic_dec_and_test(&request->rq_refcount)) {
1085                 __ptlrpc_free_req(request, locked);
1086                 RETURN(1);
1087         }
1088
1089         RETURN(0);
1090 }
1091
1092 void ptlrpc_req_finished(struct ptlrpc_request *request)
1093 {
1094         __ptlrpc_req_finished(request, 0);
1095 }
1096
1097 /* Disengage the client's reply buffer from the network
1098  * NB does _NOT_ unregister any client-side bulk.
1099  * IDEMPOTENT, but _not_ safe against concurrent callers.
1100  * The request owner (i.e. the thread doing the I/O) must call...
1101  */
1102 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1103 {
1104         int                rc;
1105         wait_queue_head_t *wq;
1106         struct l_wait_info lwi;
1107
1108         LASSERT(!in_interrupt ());             /* might sleep */
1109
1110         if (!ptlrpc_client_receiving_reply(request))
1111                 return;
1112
1113         PtlMDUnlink (request->rq_reply_md_h);
1114
1115         /* We have to l_wait_event() whatever the result, to give liblustre
1116          * a chance to run reply_in_callback() */
1117
1118         if (request->rq_set != NULL)
1119                 wq = &request->rq_set->set_waitq;
1120         else
1121                 wq = &request->rq_reply_waitq;
1122
1123         for (;;) {
1124                 /* Network access will complete in finite time but the HUGE
1125                  * timeout lets us CWARN for visibility of sluggish NALs */
1126                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1127                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1128                 if (rc == 0)
1129                         return;
1130
1131                 LASSERT (rc == -ETIMEDOUT);
1132                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1133         }
1134 }
1135
1136 /* caller must hold imp->imp_lock */
1137 void ptlrpc_free_committed(struct obd_import *imp)
1138 {
1139         struct list_head *tmp, *saved;
1140         struct ptlrpc_request *req;
1141         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1142         ENTRY;
1143
1144         LASSERT(imp != NULL);
1145
1146         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1147
1148         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1149                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1150
1151         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1152                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1153
1154                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1155                 LASSERT(req != last_req);
1156                 last_req = req;
1157
1158                 if (req->rq_import_generation < imp->imp_generation) {
1159                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1160                         GOTO(free_req, 0);
1161                 }
1162
1163                 if (req->rq_replay) {
1164                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1165                         continue;
1166                 }
1167
1168                 /* not yet committed */
1169                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1170                         DEBUG_REQ(D_HA, req, "stopping search");
1171                         break;
1172                 }
1173
1174                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1175                           imp->imp_peer_committed_transno);
1176 free_req:
1177                 if (req->rq_commit_cb != NULL)
1178                         req->rq_commit_cb(req);
1179                 list_del_init(&req->rq_replay_list);
1180                 __ptlrpc_req_finished(req, 1);
1181         }
1182
1183         EXIT;
1184         return;
1185 }
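/*
 * Usage sketch (illustrative): callers hold imp_lock across the call, as
 * after_reply() does above when it learns a new last_committed transno.
 *
 *      spin_lock_irqsave(&imp->imp_lock, flags);
 *      imp->imp_peer_committed_transno = last_committed;   // from a reply
 *      ptlrpc_free_committed(imp);
 *      spin_unlock_irqrestore(&imp->imp_lock, flags);
 */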
1186
1187 void ptlrpc_cleanup_client(struct obd_import *imp)
1188 {
1189         ENTRY;
1190         EXIT;
1191         return;
1192 }
1193
1194 void ptlrpc_resend_req(struct ptlrpc_request *req)
1195 {
1196         unsigned long flags;
1197
1198         DEBUG_REQ(D_HA, req, "going to resend");
1199         req->rq_reqmsg->handle.cookie = 0;
1200         req->rq_status = -EAGAIN;
1201
1202         spin_lock_irqsave (&req->rq_lock, flags);
1203         req->rq_resend = 1;
1204         req->rq_net_err = 0;
1205         req->rq_timedout = 0;
1206         if (req->rq_bulk) {
1207                 __u64 old_xid = req->rq_xid;
1208                 
1209                 /* ensure previous bulk fails */
1210                 req->rq_xid = ptlrpc_next_xid();
1211                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1212                        old_xid, req->rq_xid);
1213         }
1214         ptlrpc_wake_client_req(req);
1215         spin_unlock_irqrestore (&req->rq_lock, flags);
1216 }
1217
1218 /* XXX: this function and rq_status are currently unused */
1219 void ptlrpc_restart_req(struct ptlrpc_request *req)
1220 {
1221         unsigned long flags;
1222
1223         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1224         req->rq_status = -ERESTARTSYS;
1225
1226         spin_lock_irqsave (&req->rq_lock, flags);
1227         req->rq_restart = 1;
1228         req->rq_timedout = 0;
1229         ptlrpc_wake_client_req(req);
1230         spin_unlock_irqrestore (&req->rq_lock, flags);
1231 }
1232
1233 static int expired_request(void *data)
1234 {
1235         struct ptlrpc_request *req = data;
1236         ENTRY;
1237
1238         RETURN(ptlrpc_expire_one_request(req));
1239 }
1240
1241 static void interrupted_request(void *data)
1242 {
1243         unsigned long flags;
1244
1245         struct ptlrpc_request *req = data;
1246         DEBUG_REQ(D_HA, req, "request interrupted");
1247         spin_lock_irqsave (&req->rq_lock, flags);
1248         req->rq_intr = 1;
1249         spin_unlock_irqrestore (&req->rq_lock, flags);
1250 }
1251
1252 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1253 {
1254         ENTRY;
1255         atomic_inc(&req->rq_refcount);
1256         RETURN(req);
1257 }
1258
1259 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1260                                       struct obd_import *imp)
1261 {
1262         struct list_head *tmp;
1263
1264         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1265
1266         /* clear this for new requests that were resent as well
1267            as for resent replayed requests. */
1268         lustre_msg_clear_flags(req->rq_reqmsg,
1269                              MSG_RESENT);
1270
1271         /* don't re-add requests that have been replayed */
1272         if (!list_empty(&req->rq_replay_list))
1273                 return;
1274
1275         lustre_msg_add_flags(req->rq_reqmsg,
1276                              MSG_REPLAY);
1277
1278         LASSERT(imp->imp_replayable);
1279         /* Balanced in ptlrpc_free_committed, usually. */
1280         ptlrpc_request_addref(req);
1281         list_for_each_prev(tmp, &imp->imp_replay_list) {
1282                 struct ptlrpc_request *iter =
1283                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1284
1285                 /* We may have duplicate transnos if we create and then
1286                  * open a file, or for closes retained to match creating
1287                  * opens, so use req->rq_xid as a secondary key.
1288                  * (See bugs 684, 685, and 428.)
1289                  * XXX no longer needed, but all opens need transnos!
1290                  */
1291                 if (iter->rq_transno > req->rq_transno)
1292                         continue;
1293
1294                 if (iter->rq_transno == req->rq_transno) {
1295                         LASSERT(iter->rq_xid != req->rq_xid);
1296                         if (iter->rq_xid > req->rq_xid)
1297                                 continue;
1298                 }
1299
1300                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1301                 return;
1302         }
1303
1304         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1305 }
1306
1307 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1308 {
1309         char str[PTL_NALFMT_SIZE];
1310         int rc = 0;
1311         int brc;
1312         struct l_wait_info lwi;
1313         struct obd_import *imp = req->rq_import;
1314         unsigned long flags;
1315         int timeout = 0;
1316         ENTRY;
1317
1318         LASSERT(req->rq_set == NULL);
1319         LASSERT(!req->rq_receiving_reply);
1320         atomic_inc(&imp->imp_inflight);
1321
1322         /* for distributed debugging */
1323         req->rq_reqmsg->status = current->pid;
1324         LASSERT(imp->imp_obd != NULL);
1325         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1326                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1327                imp->imp_obd->obd_uuid.uuid,
1328                req->rq_reqmsg->status, req->rq_xid,
1329                imp->imp_connection->c_peer.peer_ni->pni_name,
1330                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1331                req->rq_reqmsg->opc);
1332
1333         /* Mark phase here for a little debug help */
1334         req->rq_phase = RQ_PHASE_RPC;
1335
1336         spin_lock_irqsave(&imp->imp_lock, flags);
1337         req->rq_import_generation = imp->imp_generation;
1338 restart:
1339         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1340                 list_del(&req->rq_list);
1341
1342                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1343                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1344
1345                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1346                           current->comm, 
1347                           ptlrpc_import_state_name(req->rq_send_state), 
1348                           ptlrpc_import_state_name(imp->imp_state));
1349                 lwi = LWI_INTR(interrupted_request, req);
1350                 rc = l_wait_event(req->rq_reply_waitq,
1351                                   (req->rq_send_state == imp->imp_state ||
1352                                    req->rq_err),
1353                                   &lwi);
1354                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1355                           current->comm, 
1356                           ptlrpc_import_state_name(imp->imp_state), 
1357                           ptlrpc_import_state_name(req->rq_send_state),
1358                           req->rq_err);
1359
1360                 spin_lock_irqsave(&imp->imp_lock, flags);
1361                 list_del_init(&req->rq_list);
1362
1363                 if (req->rq_err) {
1364                         rc = -EIO;
1365                 } 
1366                 else if (req->rq_intr) {
1367                         rc = -EINTR;
1368                 }
1369                 else if (req->rq_no_resend) {
1370                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1371                         GOTO(out, rc = -ETIMEDOUT);
1372                 }
1373                 else {
1374                         GOTO(restart, rc);
1375                 }
1376         } 
1377
1378         if (rc != 0) {
1379                 list_del_init(&req->rq_list);
1380                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1381                 req->rq_status = rc; // XXX this ok?
1382                 GOTO(out, rc);
1383         }
1384
1385         if (req->rq_resend) {
1386                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1387
1388                 if (req->rq_bulk != NULL)
1389                         ptlrpc_unregister_bulk (req);
1390
1391                 DEBUG_REQ(D_HA, req, "resending: ");
1392         }
1393
1394         /* XXX this is the same as ptlrpc_set_wait */
1395         LASSERT(list_empty(&req->rq_list));
1396         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1397         spin_unlock_irqrestore(&imp->imp_lock, flags);
1398
1399         rc = ptl_send_rpc(req);
1400         if (rc) {
1401                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1402                 timeout = 1;
1403         } else {
1404                 timeout = MAX(req->rq_timeout * HZ, 1);
1405                 DEBUG_REQ(D_NET, req, "-- sleeping");
1406         }
1407         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1408                                req);
1409         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1410         DEBUG_REQ(D_NET, req, "-- done sleeping");
1411
1412         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1413                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1414                imp->imp_obd->obd_uuid.uuid,
1415                req->rq_reqmsg->status, req->rq_xid,
1416                imp->imp_connection->c_peer.peer_ni->pni_name,
1417                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1418                req->rq_reqmsg->opc);
1419
1420         spin_lock_irqsave(&imp->imp_lock, flags);
1421         list_del_init(&req->rq_list);
1422         spin_unlock_irqrestore(&imp->imp_lock, flags);
1423
1424         /* If the reply was received normally, this just grabs the spinlock
1425          * (ensuring the reply callback has returned), sees that
1426          * req->rq_receiving_reply is clear and returns. */
1427         ptlrpc_unregister_reply (req);
1428
1429         if (req->rq_err)
1430                 GOTO(out, rc = -EIO);
1431
1432         /* Resend if we need to, unless we were interrupted. */
1433         if (req->rq_resend && !req->rq_intr) {
1434                 /* ...unless we were specifically told otherwise. */
1435                 if (req->rq_no_resend)
1436                         GOTO(out, rc = -ETIMEDOUT);
1437                 spin_lock_irqsave(&imp->imp_lock, flags);
1438                 goto restart;
1439         }
1440
1441         if (req->rq_intr) {
1442                 /* Should only be interrupted if we timed out. */
1443                 if (!req->rq_timedout)
1444                         DEBUG_REQ(D_ERROR, req,
1445                                   "rq_intr set but rq_timedout not");
1446                 GOTO(out, rc = -EINTR);
1447         }
1448
1449         if (req->rq_timedout) {                 /* non-recoverable timeout */
1450                 GOTO(out, rc = -ETIMEDOUT);
1451         }
1452
1453         if (!req->rq_replied) {
1454                 /* How can this be? -eeb */
1455                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1456                 LBUG();
1457                 GOTO(out, rc = req->rq_status);
1458         }
1459
1460         rc = after_reply (req);
1461         /* NB may return +ve success rc */
1462         if (req->rq_resend) {
1463                 spin_lock_irqsave(&imp->imp_lock, flags);
1464                 goto restart;
1465         }
1466
1467  out:
1468         if (req->rq_bulk != NULL) {
1469                 if (rc >= 0) {                  
1470                         /* success so far.  Note that anything going wrong
1471                          * with bulk now is EXTREMELY strange, since the
1472                          * server must have believed that the bulk
1473                          * transferred OK before she replied with success to
1474                          * me. */
1475                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1476                         brc = l_wait_event(req->rq_reply_waitq,
1477                                            !ptlrpc_bulk_active(req->rq_bulk),
1478                                            &lwi);
1479                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1480                         if (brc != 0) {
1481                                 LASSERT(brc == -ETIMEDOUT);
1482                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1483                                 rc = brc;
1484                         } else if (!req->rq_bulk->bd_success) {
1485                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1486                                 rc = -EIO;
1487                         }
1488                 }
1489                 if (rc < 0)
1490                         ptlrpc_unregister_bulk (req);
1491         }
1492
1493         LASSERT(!req->rq_receiving_reply);
1494         req->rq_phase = RQ_PHASE_INTERPRET;
1495
1496         atomic_dec(&imp->imp_inflight);
1497         wake_up(&imp->imp_recovery_waitq);
1498         RETURN(rc);
1499 }
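/*
 * Hedged usage sketch (not part of the original file): the synchronous send
 * path ending above (presumably ptlrpc_queue_wait()) is typically driven by
 * a caller that preps a request, waits for the reply, then drops its
 * reference.  The opcode, body type and sizes below are placeholders; only
 * the ptlrpc_*/lustre_msg_* helpers are assumed from this module.
 *
 *      struct ptlrpc_request *req;
 *      int size = sizeof(struct some_body);    (hypothetical body type)
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, SOME_OPCODE, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      req->rq_replen = lustre_msg_size(1, &size);
 *      rc = ptlrpc_queue_wait(req);            (synchronous wait, as above)
 *      ptlrpc_req_finished(req);
 *      return rc;
 */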
1500
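/* Per-request replay state, stashed in rq_async_args for the duration of a
 * replay: the pre-replay send state and reply status are saved here so that
 * ptlrpc_replay_interpret() can restore them once the replayed reply (or a
 * timeout) comes back.  See ptlrpc_replay_req() below. */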
1501 struct ptlrpc_replay_async_args {
1502         int praa_old_state;
1503         int praa_old_status;
1504 };
1505
1506 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1507                                     void * data, int rc)
1508 {
1509         struct ptlrpc_replay_async_args *aa = data;
1510         struct obd_import *imp = req->rq_import;
1511         unsigned long flags;
1512
1513         atomic_dec(&imp->imp_replay_inflight);
1514         
1515         if (!req->rq_replied) {
1516                 CERROR("request replay timed out, restarting recovery\n");
1517                 GOTO(out, rc = -ETIMEDOUT);
1518         }
1519
1520 #if SWAB_PARANOIA
1521         /* Clear reply swab mask; this is a new reply in sender's byte order */
1522         req->rq_rep_swab_mask = 0;
1523 #endif
1524         LASSERT (req->rq_nob_received <= req->rq_replen);
1525         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1526         if (rc) {
1527                 CERROR("unpack_rep failed: %d\n", rc);
1528                 GOTO(out, rc = -EPROTO);
1529         }
1530
1531         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1532             req->rq_repmsg->status == -ENOTCONN) 
1533                 GOTO(out, rc = req->rq_repmsg->status);
1534
1535         /* The transno had better not change over replay. */
1536         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1537
1538         DEBUG_REQ(D_HA, req, "got rep");
1539
1540         /* let the callback do fixups, possibly including changes to the request itself */
1541         if (req->rq_replay_cb)
1542                 req->rq_replay_cb(req);
1543
1544         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1545                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1546                           req->rq_repmsg->status, aa->praa_old_status);
1547         } else {
1548                 /* Put it back for re-replay. */
1549                 req->rq_repmsg->status = aa->praa_old_status;
1550         }
1551
1552         spin_lock_irqsave(&imp->imp_lock, flags);
1553         imp->imp_last_replay_transno = req->rq_transno;
1554         spin_unlock_irqrestore(&imp->imp_lock, flags);
1555
1556         /* continue with recovery */
1557         rc = ptlrpc_import_recovery_state_machine(imp);
1558  out:
1559         req->rq_send_state = aa->praa_old_state;
1560         
1561         if (rc != 0)
1562                 /* this replay failed, so restart recovery */
1563                 ptlrpc_connect_import(imp, NULL);
1564
1565         RETURN(rc);
1566 }
1567
1568
1569 int ptlrpc_replay_req(struct ptlrpc_request *req)
1570 {
1571         struct ptlrpc_replay_async_args *aa;
1572         ENTRY;
1573
1574         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1575
1576         /* Not handling automatic bulk replay yet (or ever?) */
1577         LASSERT(req->rq_bulk == NULL);
1578
1579         DEBUG_REQ(D_HA, req, "REPLAY");
1580
1581         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1582         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1583         memset(aa, 0, sizeof *aa);
1584
1585         /* Prepare request to be resent with ptlrpcd */
1586         aa->praa_old_state = req->rq_send_state;
1587         req->rq_send_state = LUSTRE_IMP_REPLAY;
1588         req->rq_phase = RQ_PHASE_NEW;
1589         aa->praa_old_status = req->rq_repmsg->status;
1590         req->rq_status = 0;
1591
1592         req->rq_interpret_reply = ptlrpc_replay_interpret;
1593         atomic_inc(&req->rq_import->imp_replay_inflight);
1594         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1595
1596         ptlrpcd_add_req(req);
1597         RETURN(0);
1598 }
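/*
 * Hedged sketch of the expected caller: the import recovery code feeds
 * uncommitted requests through ptlrpc_replay_req() one at a time, gated by
 * imp_replay_inflight.  The helper name below is hypothetical and only
 * illustrates the flow; the real driver is the import recovery state machine.
 *
 *      struct ptlrpc_request *req;
 *      int rc = 0;
 *
 *      req = pick_next_replay_req(imp);        (hypothetical helper)
 *      if (req != NULL)
 *              rc = ptlrpc_replay_req(req);    (queued to ptlrpcd, returns 0)
 *
 * ptlrpc_replay_interpret() later advances imp_last_replay_transno and
 * re-enters ptlrpc_import_recovery_state_machine(), which picks the next
 * request, so replays proceed one at a time.
 */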
1599
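/* Fail every in-flight and delayed request belonging to an older import
 * generation, waking their waiters with rq_err set (the synchronous path
 * above turns that into -EIO).  Requests already re-targeted at the current
 * generation are left alone.  Presumably called when an import is being
 * invalidated after its connection is torn down. */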
1600 void ptlrpc_abort_inflight(struct obd_import *imp)
1601 {
1602         unsigned long flags;
1603         struct list_head *tmp, *n;
1604         ENTRY;
1605
1606         /* Make sure that no new requests get processed for this import.
1607          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1608          * this flag and then putting requests on sending_list or delayed_list.
1609          */
1610         spin_lock_irqsave(&imp->imp_lock, flags);
1611
1612         /* XXX locking?  Maybe we should remove each request with the list
1613          * locked?  Also, how do we know if the requests on the list are
1614          * being freed at this time?
1615          */
1616         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1617                 struct ptlrpc_request *req =
1618                         list_entry(tmp, struct ptlrpc_request, rq_list);
1619
1620                 DEBUG_REQ(D_HA, req, "inflight");
1621
1622                 spin_lock (&req->rq_lock);
1623                 if (req->rq_import_generation < imp->imp_generation) {
1624                         req->rq_err = 1;
1625                         ptlrpc_wake_client_req(req);
1626                 }
1627                 spin_unlock (&req->rq_lock);
1628         }
1629
1630         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1631                 struct ptlrpc_request *req =
1632                         list_entry(tmp, struct ptlrpc_request, rq_list);
1633
1634                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1635
1636                 spin_lock (&req->rq_lock);
1637                 if (req->rq_import_generation < imp->imp_generation) {
1638                         req->rq_err = 1;
1639                         ptlrpc_wake_client_req(req);
1640                 }
1641                 spin_unlock (&req->rq_lock);
1642         }
1643
1644         /* Last chance to free reqs left on the replay list, but we
1645          * will still leak reqs that haven't committed.  */
1646         if (imp->imp_replayable)
1647                 ptlrpc_free_committed(imp);
1648
1649         spin_unlock_irqrestore(&imp->imp_lock, flags);
1650
1651         EXIT;
1652 }
1653
1654 static __u64 ptlrpc_last_xid = 0;
1655 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1656
1657 __u64 ptlrpc_next_xid(void)
1658 {
1659         __u64 tmp;
1660         spin_lock(&ptlrpc_last_xid_lock);
1661         tmp = ++ptlrpc_last_xid;
1662         spin_unlock(&ptlrpc_last_xid_lock);
1663         return tmp;
1664 }
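/*
 * Hedged usage note: request allocation elsewhere in this file is expected
 * to stamp every new request with a fresh transfer id, e.g.
 *
 *      request->rq_xid = ptlrpc_next_xid();
 *
 * The spinlock keeps the 64-bit increment atomic on SMP.  Since
 * ptlrpc_last_xid starts at 0, xids begin again at 1 for each new client
 * instance; they are only unique within one instance's lifetime.
 */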
1665
1666