lustre/ptlrpc/client.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  *
21  */
22
23 #define DEBUG_SUBSYSTEM S_RPC
24 #ifndef __KERNEL__
25 #include <errno.h>
26 #include <signal.h>
27 #include <liblustre.h>
28 #endif
29
30 #include <linux/obd_support.h>
31 #include <linux/obd_class.h>
32 #include <linux/lustre_lib.h>
33 #include <linux/lustre_ha.h>
34 #include <linux/lustre_import.h>
35 #include <linux/lustre_sec.h>
36
37 #include "ptlrpc_internal.h"
38
39 void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
40                         struct ptlrpc_client *cl)
41 {
42         cl->cli_request_portal = req_portal;
43         cl->cli_reply_portal   = rep_portal;
44         cl->cli_name           = name;
45 }
46
47 struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
48 {
49         struct ptlrpc_connection *c;
50         struct ptlrpc_peer peer;
51         int err;
52
53         err = ptlrpc_uuid_to_peer(uuid, &peer);
54         if (err != 0) {
55                 CERROR("cannot find peer %s!\n", uuid->uuid);
56                 return NULL;
57         }
58
59         c = ptlrpc_get_connection(&peer, uuid);
60         if (c) {
61                 memcpy(c->c_remote_uuid.uuid,
62                        uuid->uuid, sizeof(c->c_remote_uuid.uuid));
63         }
64
65         CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);
66
67         return c;
68 }
69
70 void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
71                                  struct obd_uuid *uuid)
72 {
73         struct ptlrpc_peer peer;
74         int err;
75
76         err = ptlrpc_uuid_to_peer(uuid, &peer);
77         if (err != 0) {
78                 CERROR("cannot find peer %s!\n", uuid->uuid);
79                 return;
80         }
81
82         memcpy(&conn->c_peer, &peer, sizeof (peer));
83         return;
84 }
85
86 static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
87 {
88         struct ptlrpc_bulk_desc *desc;
89
90         OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
91         if (!desc)
92                 return NULL;
93
94         spin_lock_init(&desc->bd_lock);
95         init_waitqueue_head(&desc->bd_waitq);
96         desc->bd_max_iov = npages;
97         desc->bd_iov_count = 0;
98         desc->bd_md_h = PTL_INVALID_HANDLE;
99         desc->bd_portal = portal;
100         desc->bd_type = type;
101         
102         return desc;
103 }
104
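/* Client-side bulk descriptor setup.  A minimal sketch of how a caller is
 * expected to use it (illustrative only; real callers add their own error
 * handling): allocate the descriptor first, then attach each data page:
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * The descriptor is owned by the request and freed along with it. */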
105 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
106                                                int npages, int type, int portal)
107 {
108         struct obd_import *imp = req->rq_import;
109         struct ptlrpc_bulk_desc *desc;
110
111         LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
112         desc = new_bulk(npages, type, portal);
113         if (desc == NULL)
114                 RETURN(NULL);
115
116         desc->bd_import_generation = req->rq_import_generation;
117         desc->bd_import = class_import_get(imp);
118         desc->bd_req = req;
119
120         desc->bd_cbid.cbid_fn  = client_bulk_callback;
121         desc->bd_cbid.cbid_arg = desc;
122
123         /* This makes req own desc, and frees it when req itself is freed */
124         req->rq_bulk = desc;
125
126         return desc;
127 }
128
129 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
130                                                int npages, int type, int portal)
131 {
132         struct obd_export *exp = req->rq_export;
133         struct ptlrpc_bulk_desc *desc;
134
135         LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);
136
137         desc = new_bulk(npages, type, portal);
138         if (desc == NULL)
139                 RETURN(NULL);
140
141         desc->bd_export = class_export_get(exp);
142         desc->bd_req = req;
143
144         desc->bd_cbid.cbid_fn  = server_bulk_callback;
145         desc->bd_cbid.cbid_arg = desc;
146
147         /* NB we don't assign rq_bulk here; server-side requests are
148          * re-used, and the handler frees the bulk desc explicitly. */
149
150         return desc;
151 }
152
153 void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
154                            struct page *page, int pageoffset, int len)
155 {
156         LASSERT(desc->bd_iov_count < desc->bd_max_iov);
157         LASSERT(page != NULL);
158         LASSERT(pageoffset >= 0);
159         LASSERT(len > 0);
160         LASSERT(pageoffset + len <= PAGE_SIZE);
161
162         desc->bd_nob += len;
163
164         ptlrpc_add_bulk_page(desc, page, pageoffset, len);
165 }
166
167 void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
168 {
169         ENTRY;
170
171         LASSERT(desc != NULL);
172         LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
173         LASSERT(!desc->bd_network_rw);         /* network hands off */
174         LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
175         if (desc->bd_export)
176                 class_export_put(desc->bd_export);
177         else
178                 class_import_put(desc->bd_import);
179
180         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, 
181                                 bd_iov[desc->bd_max_iov]));
182         EXIT;
183 }
184
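/* Allocate and initialize a client request on import 'imp': take an import
 * reference, attach a security credential, pack the request buffers for
 * 'count' message buffers of the given 'lengths', and assign a fresh xid.
 * On failure the reference and credential are released and NULL returned. */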
185 /* FIXME: prep_req should now return an error code rather than NULL, but
186  * it is called everywhere :(
187  */
188 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
189                                        int opcode, int count, int *lengths,
190                                        char **bufs)
191 {
192         struct ptlrpc_request *request;
193         int rc;
194         ENTRY;
195
196         LASSERT((unsigned long)imp > 0x1000);
197
198         OBD_ALLOC(request, sizeof(*request));
199         if (!request) {
200                 CERROR("request allocation out of memory\n");
201                 RETURN(NULL);
202         }
203
204         request->rq_import = class_import_get(imp);
205
206         rc = ptlrpcs_req_get_cred(request);
207         if (rc) {
208                 CDEBUG(D_SEC, "failed to get credential\n");
209                 GOTO(out_free, rc);
210         }
211
212         /* Try to refresh the cred. We do this here so that fewer
213          * refreshes are performed in ptlrpcd context (which might block
214          * ptlrpcd). Fail out only if a fatal ptlrpcs error occurred.
215          */
216         ptlrpcs_req_refresh_cred(request);
217         if (request->rq_ptlrpcs_err)
218                 GOTO(out_cred, rc = -EPERM);
219
220         rc = lustre_pack_request(request, count, lengths, bufs);
221         if (rc) {
222                 CERROR("cannot pack request %d\n", rc);
223                 GOTO(out_cred, rc);
224         }
225         request->rq_reqmsg->version |= version;
226
227         if (imp->imp_server_timeout)
228                 request->rq_timeout = obd_timeout / 2;
229         else
230                 request->rq_timeout = obd_timeout;
231
232         request->rq_send_state = LUSTRE_IMP_FULL;
233         request->rq_type = PTL_RPC_MSG_REQUEST;
234
235         request->rq_req_cbid.cbid_fn  = request_out_callback;
236         request->rq_req_cbid.cbid_arg = request;
237
238         request->rq_reply_cbid.cbid_fn  = reply_in_callback;
239         request->rq_reply_cbid.cbid_arg = request;
240         
241         request->rq_phase = RQ_PHASE_NEW;
242
243         /* XXX FIXME bug 249 */
244         request->rq_request_portal = imp->imp_client->cli_request_portal;
245         request->rq_reply_portal = imp->imp_client->cli_reply_portal;
246
247         spin_lock_init(&request->rq_lock);
248         INIT_LIST_HEAD(&request->rq_list);
249         INIT_LIST_HEAD(&request->rq_replay_list);
250         INIT_LIST_HEAD(&request->rq_set_chain);
251         init_waitqueue_head(&request->rq_reply_waitq);
252         request->rq_xid = ptlrpc_next_xid();
253         atomic_set(&request->rq_refcount, 1);
254
255         request->rq_reqmsg->opc = opcode;
256         request->rq_reqmsg->flags = 0;
257         RETURN(request);
258 out_cred:
259         ptlrpcs_req_drop_cred(request);
260 out_free:
261         class_import_put(imp);
262         OBD_FREE(request, sizeof(*request));
263         RETURN(NULL);
264 }
265
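/* Request sets let a caller issue many RPCs concurrently and wait for them
 * as a group.  A minimal sketch of the usage (error handling elided; 'req'
 * preparation is as for any other request):
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *      ptlrpc_set_add_req(set, req);        // set owns the req ref now
 *      rc = ptlrpc_set_wait(set);           // send all, wait for all
 *      ptlrpc_set_destroy(set);
 */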
266 struct ptlrpc_request_set *ptlrpc_prep_set(void)
267 {
268         struct ptlrpc_request_set *set;
269
270         OBD_ALLOC(set, sizeof *set);
271         if (!set)
272                 RETURN(NULL);
273         INIT_LIST_HEAD(&set->set_requests);
274         init_waitqueue_head(&set->set_waitq);
275         set->set_remaining = 0;
276         spin_lock_init(&set->set_new_req_lock);
277         INIT_LIST_HEAD(&set->set_new_requests);
278
279         RETURN(set);
280 }
281
282 /* Finish with this set; opposite of prep_set. */
283 void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
284 {
285         struct list_head *tmp;
286         struct list_head *next;
287         int               expected_phase;
288         int               n = 0;
289         ENTRY;
290
291         /* Requests on the set should either all be completed, or all be new */
292         expected_phase = (set->set_remaining == 0) ?
293                          RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
294         list_for_each (tmp, &set->set_requests) {
295                 struct ptlrpc_request *req =
296                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
297
298                 LASSERT(req->rq_phase == expected_phase);
299                 n++;
300         }
301
302         LASSERT(set->set_remaining == 0 || set->set_remaining == n);
303
304         list_for_each_safe(tmp, next, &set->set_requests) {
305                 struct ptlrpc_request *req =
306                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
307                 list_del_init(&req->rq_set_chain);
308
309                 LASSERT(req->rq_phase == expected_phase);
310
311                 if (req->rq_phase == RQ_PHASE_NEW) {
312
313                         if (req->rq_interpret_reply != NULL) {
314                                 int (*interpreter)(struct ptlrpc_request *,
315                                                    void *, int) =
316                                         req->rq_interpret_reply;
317
318                                 /* higher level (i.e. LOV) failed;
319                                  * let the sub reqs clean up */
320                                 req->rq_status = -EBADR;
321                                 interpreter(req, &req->rq_async_args,
322                                             req->rq_status);
323                         }
324                         set->set_remaining--;
325                 }
326
327                 req->rq_set = NULL;
328                 ptlrpc_req_finished (req);
329         }
330
331         LASSERT(set->set_remaining == 0);
332
333         OBD_FREE(set, sizeof(*set));
334         EXIT;
335 }
336
337 void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
338                         struct ptlrpc_request *req)
339 {
340         /* The set takes over the caller's request reference */
341         list_add_tail(&req->rq_set_chain, &set->set_requests);
342         req->rq_set = set;
343         set->set_remaining++;
344         atomic_inc(&req->rq_import->imp_inflight);
345 }
346
347 /* Locked so that many callers can add things; the context that owns the
348  * set is supposed to notice these and move them into the set proper. */
349 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
350                             struct ptlrpc_request *req)
351 {
352         unsigned long flags;
353         spin_lock_irqsave(&set->set_new_req_lock, flags);
354         /* The set takes over the caller's request reference */
355         list_add_tail(&req->rq_set_chain, &set->set_new_requests);
356         req->rq_set = set;
357         spin_unlock_irqrestore(&set->set_new_req_lock, flags);
358 }
359
360 /*
361  * Based on the current state of the import, determine if the request
362  * can be sent, is an error, or should be delayed.
363  *
364  * Returns true if this request should be delayed. If false and
365  * *status is set, then the request cannot be sent and *status is the
366  * error code.  If false and *status is 0, then the request can be sent.
367  *
368  * The imp->imp_lock must be held.
369  */
370 static int ptlrpc_import_delay_req(struct obd_import *imp, 
371                                    struct ptlrpc_request *req, int *status)
372 {
373         int delay = 0;
374         ENTRY;
375
376         LASSERT (status != NULL);
377         *status = 0;
378
379         if (imp->imp_state == LUSTRE_IMP_NEW) {
380                 DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
381                 *status = -EIO;
382                 LBUG();
383         }
384         else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
385                 DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
386                 *status = -EIO;
387         }
388         /* allow CONNECT even if import is invalid */
389         else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
390                  imp->imp_state == LUSTRE_IMP_CONNECTING) {
391                 ;
392         }
393         /*
394          * If the import has been invalidated (such as by an OST failure), the
395          * request must fail with -EIO.  
396          */
397         else if (imp->imp_invalid) {
398                 DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
399                 *status = -EIO;
400         } 
401         else if (req->rq_import_generation != imp->imp_generation) {
402                 DEBUG_REQ(D_ERROR, req, "req wrong generation:");
403                 *status = -EIO;
404         } 
405         else if (req->rq_send_state != imp->imp_state) {
406                 if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake 
407                     || req->rq_no_delay) 
408                         *status = -EWOULDBLOCK;
409                 else
410                         delay = 1;
411         }
412
413         RETURN(delay);
414 }
415
416 static int ptlrpc_check_reply(struct ptlrpc_request *req)
417 {
418         unsigned long flags;
419         int rc = 0;
420         ENTRY;
421
422         /* serialise with network callback */
423         spin_lock_irqsave (&req->rq_lock, flags);
424
425         if (req->rq_replied) {
426                 DEBUG_REQ(D_NET, req, "REPLIED:");
427                 GOTO(out, rc = 1);
428         }
429         
430         if (req->rq_net_err && !req->rq_timedout) {
431                 spin_unlock_irqrestore (&req->rq_lock, flags);
432                 rc = ptlrpc_expire_one_request(req); 
433                 spin_lock_irqsave (&req->rq_lock, flags);
434                 GOTO(out, rc);
435         }
436
437         if (req->rq_err) {
438                 DEBUG_REQ(D_ERROR, req, "ABORTED:");
439                 GOTO(out, rc = 1);
440         }
441
442         if (req->rq_resend) {
443                 DEBUG_REQ(D_ERROR, req, "RESEND:");
444                 GOTO(out, rc = 1);
445         }
446
447         if (req->rq_restart) {
448                 DEBUG_REQ(D_ERROR, req, "RESTART:");
449                 GOTO(out, rc = 1);
450         }
451         EXIT;
452  out:
453         spin_unlock_irqrestore (&req->rq_lock, flags);
454         DEBUG_REQ(D_NET, req, "rc = %d for", rc);
455         return rc;
456 }
457
458 static int ptlrpc_check_status(struct ptlrpc_request *req)
459 {
460         int err;
461         ENTRY;
462
463         err = req->rq_repmsg->status;
464         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
465                 DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d", 
466                           err);
467                 RETURN(err < 0 ? err : -EINVAL);
468         }
469
470         if (err < 0) {
471                 DEBUG_REQ(D_INFO, req, "status is %d", err);
472         } else if (err > 0) {
473                 /* XXX: translate this error from net to host */
474                 DEBUG_REQ(D_INFO, req, "status is %d", err);
475         }
476
477         RETURN(err);
478 }
479
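/* Post-processing once a reply has fully arrived: unwrap the security
 * layer (which may ask for a resend), unpack and byte-swap the message,
 * check the returned status, and, for replayable imports, update the
 * replay list and the peer-committed transno bookkeeping. */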
480 static int after_reply(struct ptlrpc_request *req)
481 {
482         unsigned long flags;
483         struct obd_import *imp = req->rq_import;
484         int rc;
485         ENTRY;
486
487         LASSERT(!req->rq_receiving_reply);
488
489         /* NB Until this point, the whole of the incoming message,
490          * including buflens, status etc is in the sender's byte order. */
491
492 #if SWAB_PARANOIA
493         /* Clear reply swab mask; this is a new reply in sender's byte order */
494         req->rq_rep_swab_mask = 0;
495 #endif
496         LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
497         rc = ptlrpcs_cli_unwrap_reply(req);
498         if (rc) {
499                 CERROR("verify reply error: %d\n", rc);
500                 RETURN(rc);
501         }
502         /* unwrap_reply may request that the rpc be resent */
503         if (req->rq_ptlrpcs_restart) {
504                 req->rq_resend = 1;
505                 RETURN(0);
506         }
507
508         /* unwrap_reply will set rq_replen to the actual received
509          * lustre_msg length
510          */
511         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
512         if (rc) {
513                 CERROR("unpack_rep failed: %d\n", rc);
514                 RETURN(-EPROTO);
515         }
516
517         if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
518             req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
519                 CERROR("invalid packet type received (type=%u)\n",
520                        req->rq_repmsg->type);
521                 RETURN(-EPROTO);
522         }
523
524         rc = ptlrpc_check_status(req);
525
526         /* Either we've been evicted, or the server has failed for
527          * some reason. Try to reconnect, and if that fails, punt to the
528          * upcall. */
529         if (rc == -ENOTCONN) {
530                 if (req->rq_send_state != LUSTRE_IMP_FULL ||
531                     imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
532                         RETURN(-ENOTCONN);
533                 }
534
535                 ptlrpc_request_handle_notconn(req);
536
537                 RETURN(rc);
538         }
539
540         /* Store transno in reqmsg for replay. */
541         req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;
542
543
544         if (req->rq_import->imp_replayable) {
545                 spin_lock_irqsave(&imp->imp_lock, flags);
546                 if (req->rq_transno != 0)
547                         ptlrpc_retain_replayable_request(req, imp);
548                 else if (req->rq_commit_cb != NULL) {
549                         spin_unlock_irqrestore(&imp->imp_lock, flags);
550                         req->rq_commit_cb(req);
551                         spin_lock_irqsave(&imp->imp_lock, flags);
552                 }
553
554                 if (req->rq_transno > imp->imp_max_transno)
555                         imp->imp_max_transno = req->rq_transno;
556
557                 /* Replay-enabled imports return commit-status information. */
558                 if (req->rq_repmsg->last_committed)
559                         imp->imp_peer_committed_transno =
560                                 req->rq_repmsg->last_committed;
561                 ptlrpc_free_committed(imp);
562                 spin_unlock_irqrestore(&imp->imp_lock, flags);
563         }
564
565         RETURN(rc);
566 }
567
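/* First send of a request from a set: move it from RQ_PHASE_NEW to
 * RQ_PHASE_RPC, parking it on the import's delayed list if the import
 * state does not yet allow sending. */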
568 static int ptlrpc_send_new_req(struct ptlrpc_request *req)
569 {
570         char                   str[PTL_NALFMT_SIZE];
571         struct obd_import     *imp;
572         unsigned long          flags;
573         int rc;
574         ENTRY;
575
576         LASSERT(req->rq_phase == RQ_PHASE_NEW);
577         req->rq_phase = RQ_PHASE_RPC;
578
579         imp = req->rq_import;
580         spin_lock_irqsave(&imp->imp_lock, flags);
581
582         req->rq_import_generation = imp->imp_generation;
583
584         if (ptlrpc_import_delay_req(imp, req, &rc)) {
585                 spin_lock (&req->rq_lock);
586                 req->rq_waiting = 1;
587                 spin_unlock (&req->rq_lock);
588
589                 DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
590                           "(%s != %s)",
591                           req->rq_reqmsg->status, 
592                           ptlrpc_import_state_name(req->rq_send_state),
593                           ptlrpc_import_state_name(imp->imp_state));
594                 LASSERT(list_empty (&req->rq_list));
595
596                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
597                 spin_unlock_irqrestore(&imp->imp_lock, flags);
598                 RETURN(0);
599         }
600
601         if (rc != 0) {
602                 spin_unlock_irqrestore(&imp->imp_lock, flags);
603                 req->rq_status = rc;
604                 req->rq_phase = RQ_PHASE_INTERPRET;
605                 RETURN(rc);
606         }
607
608         /* XXX this is the same as ptlrpc_queue_wait */
609         LASSERT(list_empty(&req->rq_list));
610         list_add_tail(&req->rq_list, &imp->imp_sending_list);
611         spin_unlock_irqrestore(&imp->imp_lock, flags);
612
613         req->rq_reqmsg->status = current->pid;
614         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
615                " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
616                imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
617                req->rq_xid,
618                imp->imp_connection->c_peer.peer_ni->pni_name,
619                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
620                req->rq_reqmsg->opc);
621
622         rc = ptl_send_rpc(req);
623         if (rc) {
624                 DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
625                 req->rq_net_err = 1;
626                 RETURN(rc);
627         }
628         RETURN(0);
629 }
630
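/* Drive every request in the set through its state machine:
 * NEW -> RPC -> (BULK ->) INTERPRET -> COMPLETE, handling resends,
 * timeouts and import recovery along the way.  Returns nonzero when the
 * whole set is complete, or when a send failed and the wait timeout
 * should be recalculated. */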
631 int ptlrpc_check_set(struct ptlrpc_request_set *set)
632 {
633         char str[PTL_NALFMT_SIZE];
634         unsigned long flags;
635         struct list_head *tmp;
636         int force_timer_recalc = 0;
637         ENTRY;
638
639         if (set->set_remaining == 0)
640                 RETURN(1);
641
642         list_for_each(tmp, &set->set_requests) {
643                 struct ptlrpc_request *req =
644                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
645                 struct obd_import *imp = req->rq_import;
646                 int rc = 0;
647
648                 if (req->rq_phase == RQ_PHASE_NEW &&
649                     ptlrpc_send_new_req(req)) {
650                         force_timer_recalc = 1;
651                 }
652
653                 if (!(req->rq_phase == RQ_PHASE_RPC ||
654                       req->rq_phase == RQ_PHASE_BULK ||
655                       req->rq_phase == RQ_PHASE_INTERPRET ||
656                       req->rq_phase == RQ_PHASE_COMPLETE)) {
657                         DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
658                         LBUG();
659                 }
660
661                 if (req->rq_phase == RQ_PHASE_COMPLETE)
662                         continue;
663
664                 if (req->rq_phase == RQ_PHASE_INTERPRET)
665                         GOTO(interpret, req->rq_status);
666
667                 if (req->rq_net_err && !req->rq_timedout)
668                         ptlrpc_expire_one_request(req); 
669
670                 if (req->rq_err || req->rq_ptlrpcs_err) {
671                         ptlrpc_unregister_reply(req);
672                         if (req->rq_status == 0)
673                                 req->rq_status = req->rq_err ? -EIO : -EPERM;
674                         req->rq_phase = RQ_PHASE_INTERPRET;
675
676                         spin_lock_irqsave(&imp->imp_lock, flags);
677                         list_del_init(&req->rq_list);
678                         spin_unlock_irqrestore(&imp->imp_lock, flags);
679
680                         GOTO(interpret, req->rq_status);
681                 }
682
683                 /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
684                  * will only be set after rq_timedout, but the oig waiting
685                  * path sets rq_intr irrespective of whether ptlrpcd has
686                  * seen a timeout.  Our policy is to only interpret
687                  * interrupted rpcs after they have timed out */
688                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
689                         /* NB could be on delayed list */
690                         ptlrpc_unregister_reply(req);
691                         req->rq_status = -EINTR;
692                         req->rq_phase = RQ_PHASE_INTERPRET;
693
694                         spin_lock_irqsave(&imp->imp_lock, flags);
695                         list_del_init(&req->rq_list);
696                         spin_unlock_irqrestore(&imp->imp_lock, flags);
697
698                         GOTO(interpret, req->rq_status);
699                 }
700
701                 if (req->rq_phase == RQ_PHASE_RPC) {
702                         if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
703                                 int status;
704
705                                 ptlrpc_unregister_reply(req);
706
707                                 spin_lock_irqsave(&imp->imp_lock, flags);
708
709                                 if (ptlrpc_import_delay_req(imp, req, &status)){
710                                         spin_unlock_irqrestore(&imp->imp_lock,
711                                                                flags);
712                                         continue;
713                                 }
714
715                                 list_del_init(&req->rq_list);
716                                 if (status != 0)  {
717                                         req->rq_status = status;
718                                         req->rq_phase = RQ_PHASE_INTERPRET;
719                                         spin_unlock_irqrestore(&imp->imp_lock,
720                                                                flags);
721                                         GOTO(interpret, req->rq_status);
722                                 }
723                                 if (req->rq_no_resend) {
724                                         req->rq_status = -ENOTCONN;
725                                         req->rq_phase = RQ_PHASE_INTERPRET;
726                                         spin_unlock_irqrestore(&imp->imp_lock,
727                                                                flags);
728                                         GOTO(interpret, req->rq_status);
729                                 }
730                                 list_add_tail(&req->rq_list,
731                                               &imp->imp_sending_list);
732
733                                 spin_unlock_irqrestore(&imp->imp_lock, flags);
734
735                                 req->rq_waiting = 0;
736                                 if (req->rq_resend) {
737                                         if (!req->rq_ptlrpcs_restart)
738                                                 lustre_msg_add_flags(
739                                                         req->rq_reqmsg,
740                                                         MSG_RESENT);
741                                         if (req->rq_bulk) {
742                                                 __u64 old_xid = req->rq_xid;
743
744                                                 ptlrpc_unregister_bulk (req);
745
746                                                 /* ensure previous bulk fails */
747                                                 req->rq_xid = ptlrpc_next_xid();
748                                                 CDEBUG(D_HA, "resend bulk "
749                                                        "old x"LPU64
750                                                        " new x"LPU64"\n",
751                                                        old_xid, req->rq_xid);
752                                         }
753                                 }
754
755                                 rc = ptl_send_rpc(req);
756                                 if (rc) {
757                                         DEBUG_REQ(D_HA, req, "send failed (%d)",
758                                                   rc);
759                                         force_timer_recalc = 1;
760                                         req->rq_net_err = 1;
761                                 }
762                                 /* need to reset the timeout */
763                                 force_timer_recalc = 1;
764                         }
765
766                         /* Still waiting for a reply? */
767                         if (ptlrpc_client_receiving_reply(req))
768                                 continue;
769
770                         /* Did we actually receive a reply? */
771                         if (!ptlrpc_client_replied(req))
772                                 continue;
773
774                         spin_lock_irqsave(&imp->imp_lock, flags);
775                         list_del_init(&req->rq_list);
776                         spin_unlock_irqrestore(&imp->imp_lock, flags);
777
778                         req->rq_status = after_reply(req);
779                         if (req->rq_resend) {
780                                 /* Add this req to the delayed list so
781                                    it can be errored if the import is
782                                    evicted after recovery. */
783                                 spin_lock_irqsave (&req->rq_lock, flags);
784                                 list_add_tail(&req->rq_list, 
785                                               &imp->imp_delayed_list);
786                                 spin_unlock_irqrestore(&req->rq_lock, flags);
787                                 continue;
788                         }
789
790                         /* If there is no bulk associated with this request,
791                          * then we're done and should let the interpreter
792                          * process the reply.  Similarly if the RPC returned
793                          * an error, and therefore the bulk will never arrive.
794                          */
795                         if (req->rq_bulk == NULL || req->rq_status != 0) {
796                                 req->rq_phase = RQ_PHASE_INTERPRET;
797                                 GOTO(interpret, req->rq_status);
798                         }
799
800                         req->rq_phase = RQ_PHASE_BULK;
801                 }
802
803                 LASSERT(req->rq_phase == RQ_PHASE_BULK);
804                 if (ptlrpc_bulk_active(req->rq_bulk))
805                         continue;
806
807                 if (!req->rq_bulk->bd_success) {
808                         /* The RPC reply arrived OK, but the bulk screwed
809                          * up!  Dead weird, since the server told us the RPC
810                          * was good after getting the REPLY for her GET or
811                          * the ACK for her PUT. */
812                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
813                         LBUG();
814                 }
815
816                 req->rq_phase = RQ_PHASE_INTERPRET;
817
818         interpret:
819                 LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
820                 LASSERT(!req->rq_receiving_reply);
821
822                 ptlrpc_unregister_reply(req);
823                 if (req->rq_bulk != NULL)
824                         ptlrpc_unregister_bulk (req);
825
826                 req->rq_phase = RQ_PHASE_COMPLETE;
827
828                 if (req->rq_interpret_reply != NULL) {
829                         int (*interpreter)(struct ptlrpc_request *,void *,int) =
830                                 req->rq_interpret_reply;
831                         req->rq_status = interpreter(req, &req->rq_async_args,
832                                                      req->rq_status);
833                 }
834
835                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
836                        "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
837                        imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
838                        req->rq_xid,
839                        imp->imp_connection->c_peer.peer_ni->pni_name,
840                        ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
841                        req->rq_reqmsg->opc);
842
843                 set->set_remaining--;
844
845                 atomic_dec(&imp->imp_inflight);
846                 wake_up(&imp->imp_recovery_waitq);
847         }
848
849         /* If we hit an error, we want to recover promptly. */
850         RETURN(set->set_remaining == 0 || force_timer_recalc);
851 }
852
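/* Handle one timed-out request.  Returns 0 if a reply actually arrived
 * (nothing to do) or after kicking off import recovery; returns 1 when
 * the request has been dealt with terminally (no import, DLM-fake
 * import, or it was errored out with -ETIMEDOUT). */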
853 int ptlrpc_expire_one_request(struct ptlrpc_request *req)
854 {
855         unsigned long      flags;
856         struct obd_import *imp = req->rq_import;
857         int replied = 0;
858         ENTRY;
859
860         DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
861                   (long)req->rq_sent, LTIME_S(CURRENT_TIME) - req->rq_sent);
862
863         spin_lock_irqsave (&req->rq_lock, flags);
864         replied = req->rq_replied;
865         if (!replied)
866                 req->rq_timedout = 1;
867         spin_unlock_irqrestore (&req->rq_lock, flags);
868
869         if (replied)
870                 RETURN(0);
871
872         ptlrpc_unregister_reply (req);
873
874         if (obd_dump_on_timeout)
875                 portals_debug_dumplog();
876
877         if (req->rq_bulk != NULL)
878                 ptlrpc_unregister_bulk (req);
879
880         if (imp == NULL) {
881                 DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
882                 RETURN(1);
883         }
884
885         /* The DLM server doesn't want recovery run on its imports. */
886         if (imp->imp_dlm_fake)
887                 RETURN(1);
888
889         /* If this request is for recovery or other primordial tasks,
890          * then error it out here. */
891         if (req->rq_send_state != LUSTRE_IMP_FULL ||
892             imp->imp_obd->obd_no_recov) {
893                 spin_lock_irqsave (&req->rq_lock, flags);
894                 req->rq_status = -ETIMEDOUT;
895                 req->rq_err = 1;
896                 spin_unlock_irqrestore (&req->rq_lock, flags);
897                 RETURN(1);
898         }
899
900         ptlrpc_fail_import(imp, req->rq_import_generation);
901
902         RETURN(0);
903 }
904
905 int ptlrpc_expired_set(void *data)
906 {
907         struct ptlrpc_request_set *set = data;
908         struct list_head          *tmp;
909         time_t                     now = LTIME_S(CURRENT_TIME);
910         ENTRY;
911
912         LASSERT(set != NULL);
913
914         /* A timeout expired; see which reqs it applies to... */
915         list_for_each (tmp, &set->set_requests) {
916                 struct ptlrpc_request *req =
917                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
918
919                 /* request in-flight? */
920                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting 
921                        && !req->rq_resend) ||
922                       (req->rq_phase == RQ_PHASE_BULK)))
923                         continue;
924
925                 if (req->rq_timedout ||           /* already dealt with */
926                     req->rq_sent + req->rq_timeout > now) /* not expired */
927                         continue;
928
929                 /* deal with this guy */
930                 ptlrpc_expire_one_request (req);
931         }
932
933         /* When waiting for a whole set, we always want to break out of the
934          * sleep so we can recalculate the timeout, or enable interrupts
935          * iff everyone's timed out.
936          */
937         RETURN(1);
938 }
939
940 void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
941 {
942         unsigned long flags;
943         spin_lock_irqsave(&req->rq_lock, flags);
944         req->rq_intr = 1;
945         spin_unlock_irqrestore(&req->rq_lock, flags);
946 }
947
948 void ptlrpc_interrupted_set(void *data)
949 {
950         struct ptlrpc_request_set *set = data;
951         struct list_head *tmp;
952
953         LASSERT(set != NULL);
954         CERROR("INTERRUPTED SET %p\n", set);
955
956         list_for_each(tmp, &set->set_requests) {
957                 struct ptlrpc_request *req =
958                         list_entry(tmp, struct ptlrpc_request, rq_set_chain);
959
960                 if (req->rq_phase != RQ_PHASE_RPC)
961                         continue;
962
963                 ptlrpc_mark_interrupted(req);
964         }
965 }
966
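/* Compute how long (in seconds) the set should sleep before the next
 * in-flight request hits its deadline: 0 means no in-flight request has
 * a pending deadline; 1 means some deadline has already passed. */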
967 int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
968 {
969         struct list_head      *tmp;
970         time_t                 now = LTIME_S(CURRENT_TIME);
971         time_t                 deadline;
972         int                    timeout = 0;
973         struct ptlrpc_request *req;
974         ENTRY;
975
976         SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */
977
978         list_for_each(tmp, &set->set_requests) {
979                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
980
981                 /* request in-flight? */
982                 if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
983                       (req->rq_phase == RQ_PHASE_BULK)))
984                         continue;
985
986                 if (req->rq_timedout)   /* already timed out */
987                         continue;
988
989                 deadline = req->rq_sent + req->rq_timeout;
990                 if (deadline <= now)    /* actually expired already */
991                         timeout = 1;    /* ASAP */
992                 else if (timeout == 0 || timeout > deadline - now)
993                         timeout = deadline - now;
994         }
995         RETURN(timeout);
996 }
997                 
998
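/* Send every new request in the set and sleep until they have all
 * completed, rechecking after each timeout or interruption.  The return
 * value is a nonzero rq_status if any request failed, possibly
 * overridden by the set's interpret callback. */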
999 int ptlrpc_set_wait(struct ptlrpc_request_set *set)
1000 {
1001         struct list_head      *tmp;
1002         struct ptlrpc_request *req;
1003         struct l_wait_info     lwi;
1004         int                    rc, timeout;
1005         ENTRY;
1006
1007         LASSERT(!list_empty(&set->set_requests));
1008         list_for_each(tmp, &set->set_requests) {
1009                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1010                 if (req->rq_phase == RQ_PHASE_NEW)
1011                         (void)ptlrpc_send_new_req(req);
1012         }
1013
1014         do {
1015                 timeout = ptlrpc_set_next_timeout(set);
1016
1017                 /* wait until all complete, interrupted, or an in-flight
1018                  * req times out */
1019                 CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
1020                        set, timeout);
1021                 lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
1022                                        ptlrpc_expired_set,
1023                                        ptlrpc_interrupted_set, set);
1024                 rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);
1025
1026                 LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
1027
1028                 /* -EINTR => all requests have been flagged rq_intr so next
1029                  * check completes.
1030                  * -ETIMEDOUT => someone timed out.  When all reqs have
1031                  * timed out, signals are enabled allowing completion with
1032                  * EINTR.
1033                  * I don't really care if we go once more round the loop in
1034                  * the error cases -eeb. */
1035         } while (rc != 0 || set->set_remaining != 0);
1036
1037         LASSERT(set->set_remaining == 0);
1038
1039         rc = 0;
1040         list_for_each(tmp, &set->set_requests) {
1041                 req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
1042
1043                 LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
1044                 if (req->rq_status != 0)
1045                         rc = req->rq_status;
1046         }
1047
1048         if (set->set_interpret != NULL) {
1049                 int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
1050                         set->set_interpret;
1051                 rc = interpreter (set, set->set_arg, rc);
1052         }
1053
1054         RETURN(rc);
1055 }
1056
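/* Final teardown of a request once its refcount has dropped to zero:
 * unlink it from the replay list (taking imp_lock unless 'locked' says
 * the caller already holds it), free the security buffers and credential,
 * and drop the import/export and bulk references. */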
1057 static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
1058 {
1059         ENTRY;
1060         if (request == NULL) {
1061                 EXIT;
1062                 return;
1063         }
1064
1065         LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
1066         LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
1067         LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
1068         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
1069         LASSERT(request->rq_cred);
1070
1071         /* We must take it off the imp_replay_list first.  Otherwise, we'll set
1072          * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
1073         if (request->rq_import != NULL) {
1074                 unsigned long flags = 0;
1075                 if (!locked)
1076                         spin_lock_irqsave(&request->rq_import->imp_lock, flags);
1077                 list_del_init(&request->rq_replay_list);
1078                 if (!locked)
1079                         spin_unlock_irqrestore(&request->rq_import->imp_lock,
1080                                                flags);
1081         }
1082         LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);
1083
1084         if (atomic_read(&request->rq_refcount) != 0) {
1085                 DEBUG_REQ(D_ERROR, request,
1086                           "freeing request with nonzero refcount");
1087                 LBUG();
1088         }
1089
1090         if (request->rq_repbuf != NULL)
1091                 ptlrpcs_cli_free_repbuf(request);
1092         if (request->rq_reqbuf != NULL)
1093                 ptlrpcs_cli_free_reqbuf(request);
1094
1095         if (request->rq_export != NULL) {
1096                 class_export_put(request->rq_export);
1097                 request->rq_export = NULL;
1098         }
1099         if (request->rq_import != NULL) {
1100                 class_import_put(request->rq_import);
1101                 request->rq_import = NULL;
1102         }
1103         if (request->rq_bulk != NULL)
1104                 ptlrpc_free_bulk(request->rq_bulk);
1105
1106         ptlrpcs_req_drop_cred(request);
1107         OBD_FREE(request, sizeof(*request));
1108         EXIT;
1109 }
1110
1111 void ptlrpc_free_req(struct ptlrpc_request *request)
1112 {
1113         __ptlrpc_free_req(request, 0);
1114 }
1115
1116 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
1117 void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
1118 {
1119         LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
1120         (void)__ptlrpc_req_finished(request, 1);
1121 }
1122
1123 static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
1124 {
1125         ENTRY;
1126         if (request == NULL)
1127                 RETURN(1);
1128
1129         if (request == LP_POISON ||
1130             request->rq_reqmsg == LP_POISON) {
1131                 CERROR("dereferencing freed request (bug 575)\n");
1132                 LBUG();
1133                 RETURN(1);
1134         }
1135
1136         DEBUG_REQ(D_INFO, request, "refcount now %u",
1137                   atomic_read(&request->rq_refcount) - 1);
1138
1139         if (atomic_dec_and_test(&request->rq_refcount)) {
1140                 __ptlrpc_free_req(request, locked);
1141                 RETURN(1);
1142         }
1143
1144         RETURN(0);
1145 }
1146
1147 void ptlrpc_req_finished(struct ptlrpc_request *request)
1148 {
1149         __ptlrpc_req_finished(request, 0);
1150 }
1151
1152 /* Disengage the client's reply buffer from the network.
1153  * NB: does _NOT_ unregister any client-side bulk.
1154  * IDEMPOTENT, but _not_ safe against concurrent callers.
1155  * The request owner (i.e. the thread doing the I/O) must call...
1156  */
1157 void ptlrpc_unregister_reply (struct ptlrpc_request *request)
1158 {
1159         int                rc;
1160         wait_queue_head_t *wq;
1161         struct l_wait_info lwi;
1162
1163         LASSERT(!in_interrupt ());             /* might sleep */
1164
1165         if (!ptlrpc_client_receiving_reply(request))
1166                 return;
1167
1168         PtlMDUnlink (request->rq_reply_md_h);
1169
1170         /* We have to l_wait_event() whatever the result, to give liblustre
1171          * a chance to run reply_in_callback() */
1172
1173         if (request->rq_set != NULL)
1174                 wq = &request->rq_set->set_waitq;
1175         else
1176                 wq = &request->rq_reply_waitq;
1177
1178         for (;;) {
1179                 /* Network access will complete in finite time but the HUGE
1180                  * timeout lets us CWARN for visibility of sluggish NALs */
1181                 lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
1182                 rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
1183                 if (rc == 0)
1184                         return;
1185
1186                 LASSERT (rc == -ETIMEDOUT);
1187                 DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
1188         }
1189 }
1190
1191 /* caller must hold imp->imp_lock */
1192 void ptlrpc_free_committed(struct obd_import *imp)
1193 {
1194         struct list_head *tmp, *saved;
1195         struct ptlrpc_request *req;
1196         struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
1197         ENTRY;
1198
1199         LASSERT(imp != NULL);
1200
1201         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1202
1203         CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
1204                imp->imp_obd->obd_name, imp->imp_peer_committed_transno);
1205
1206         list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
1207                 req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1208
1209                 /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
1210                 LASSERT(req != last_req);
1211                 last_req = req;
1212
1213                 if (req->rq_import_generation < imp->imp_generation) {
1214                         DEBUG_REQ(D_HA, req, "freeing request with old gen");
1215                         GOTO(free_req, 0);
1216                 }
1217
1218                 if (req->rq_replay) {
1219 #warning "REMOVE THIS CRAP ONCE 6403 SOLVED -bzzz"
1220                         struct mdc_open_data {
1221                                 struct obd_client_handle *mod_och;
1222                                 struct ptlrpc_request    *mod_open_req;
1223                                 struct ptlrpc_request    *mod_close_req;
1224                         };
1225                         struct mdc_open_data *mod = req->rq_cb_data;
1226                         if (mod == NULL || mod->mod_close_req == NULL) {
1227                                 DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
1228                                 continue;
1229                         }
1230                         DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY), "
1231                                   "closed by x"LPD64"/t"LPD64,
1232                                   mod->mod_close_req->rq_xid,
1233                                   mod->mod_close_req->rq_repmsg->transno);
1234                         continue;
1235                 }
1236
1237                 /* not yet committed */
1238                 if (req->rq_transno > imp->imp_peer_committed_transno) {
1239                         DEBUG_REQ(D_HA, req, "stopping search");
1240                         break;
1241                 }
1242
1243                 DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
1244                           imp->imp_peer_committed_transno);
1245 free_req:
1246                 if (req->rq_commit_cb != NULL)
1247                         req->rq_commit_cb(req);
1248                 list_del_init(&req->rq_replay_list);
1249                 __ptlrpc_req_finished(req, 1);
1250         }
1251
1252         EXIT;
1253         return;
1254 }
1255
1256 void ptlrpc_cleanup_client(struct obd_import *imp)
1257 {
1258         ENTRY;
1259         EXIT;
1260         return;
1261 }
1262
1263 void ptlrpc_resend_req(struct ptlrpc_request *req)
1264 {
1265         unsigned long flags;
1266
1267         DEBUG_REQ(D_HA, req, "going to resend");
1268         req->rq_reqmsg->handle.cookie = 0;
1269         req->rq_status = -EAGAIN;
1270
1271         spin_lock_irqsave (&req->rq_lock, flags);
1272         req->rq_resend = 1;
1273         req->rq_net_err = 0;
1274         req->rq_timedout = 0;
1275         if (req->rq_bulk) {
1276                 __u64 old_xid = req->rq_xid;
1277                 
1278                 /* ensure previous bulk fails */
1279                 req->rq_xid = ptlrpc_next_xid();
1280                 CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
1281                        old_xid, req->rq_xid);
1282         }
1283         ptlrpc_wake_client_req(req);
1284         spin_unlock_irqrestore (&req->rq_lock, flags);
1285 }
1286
1287 /* XXX: this function and rq_status are currently unused */
1288 void ptlrpc_restart_req(struct ptlrpc_request *req)
1289 {
1290         unsigned long flags;
1291
1292         DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
1293         req->rq_status = -ERESTARTSYS;
1294
1295         spin_lock_irqsave (&req->rq_lock, flags);
1296         req->rq_restart = 1;
1297         req->rq_timedout = 0;
1298         ptlrpc_wake_client_req(req);
1299         spin_unlock_irqrestore (&req->rq_lock, flags);
1300 }
1301
1302 static int expired_request(void *data)
1303 {
1304         struct ptlrpc_request *req = data;
1305         ENTRY;
1306
1307         /* some failure can suspend regular timeouts */
1308         if (ptlrpc_check_suspend())
1309                 RETURN(1);
1310
1311         RETURN(ptlrpc_expire_one_request(req));
1312 }
1313
1314 static void interrupted_request(void *data)
1315 {
1316         unsigned long flags;
1317
1318         struct ptlrpc_request *req = data;
1319         DEBUG_REQ(D_HA, req, "request interrupted");
1320         spin_lock_irqsave (&req->rq_lock, flags);
1321         req->rq_intr = 1;
1322         spin_unlock_irqrestore (&req->rq_lock, flags);
1323 }
1324
1325 struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
1326 {
1327         ENTRY;
1328         atomic_inc(&req->rq_refcount);
1329         RETURN(req);
1330 }
1331
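/* Insert 'req' into the import's replay list, which is kept sorted by
 * transno with rq_xid as the tie-breaker, so that replay after recovery
 * happens in the original execution order. */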
1332 void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
1333                                       struct obd_import *imp)
1334 {
1335         struct list_head *tmp;
1336
1337         LASSERT_SPIN_LOCKED(&imp->imp_lock);
1338
1339         /* Clear this for new requests that were resent, as well
1340            as for resent replayed requests. */
1341         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1342
1343         /* don't re-add requests that have been replayed */
1344         if (!list_empty(&req->rq_replay_list))
1345                 return;
1346
1347         lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);
1348
1349         LASSERT(imp->imp_replayable);
1350         /* Balanced in ptlrpc_free_committed, usually. */
1351         ptlrpc_request_addref(req);
1352         list_for_each_prev(tmp, &imp->imp_replay_list) {
1353                 struct ptlrpc_request *iter =
1354                         list_entry(tmp, struct ptlrpc_request, rq_replay_list);
1355
1356                 /* We may have duplicate transnos if we create and then
1357                  * open a file, or for closes retained to match creating
1358                  * opens, so use req->rq_xid as a secondary key.
1359                  * (See bugs 684, 685, and 428.)
1360                  * XXX no longer needed, but all opens need transnos!
1361                  */
1362                 if (iter->rq_transno > req->rq_transno)
1363                         continue;
1364
1365                 if (iter->rq_transno == req->rq_transno) {
1366                         LASSERT(iter->rq_xid != req->rq_xid);
1367                         if (iter->rq_xid > req->rq_xid)
1368                                 continue;
1369                 }
1370
1371                 list_add(&req->rq_replay_list, &iter->rq_replay_list);
1372                 return;
1373         }
1374
1375         list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
1376 }
1377
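/* Synchronous send: transmit one request and sleep until its reply is
 * processed (or it fails/times out), honoring import recovery by parking
 * on the delayed list and restarting.  A minimal sketch of the classic
 * caller pattern (names illustrative):
 *
 *      req = ptlrpc_prep_req(imp, version, opcode, count, lengths, bufs);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 */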
1378 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1379 {
1380         char str[PTL_NALFMT_SIZE];
1381         int rc = 0;
1382         int brc;
1383         struct l_wait_info lwi;
1384         struct obd_import *imp = req->rq_import;
1385         unsigned long flags;
1386         int timeout = 0;
1387         ENTRY;
1388
1389         LASSERT(req->rq_set == NULL);
1390         LASSERT(!req->rq_receiving_reply);
1391         atomic_inc(&imp->imp_inflight);
1392
1393         if (imp->imp_connection == NULL) {
1394                 CERROR("request on not connected import %s\n",
1395                         imp->imp_obd->obd_name);
1396                 RETURN(-EINVAL);
1397         }
1398
1399         /* for distributed debugging */
1400         req->rq_reqmsg->status = current->pid;
1401         LASSERT(imp->imp_obd != NULL);
1402         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1403                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1404                imp->imp_obd->obd_uuid.uuid,
1405                req->rq_reqmsg->status, req->rq_xid,
1406                imp->imp_connection->c_peer.peer_ni->pni_name,
1407                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1408                req->rq_reqmsg->opc);
1409
1410         /* Mark phase here for a little debug help */
1411         req->rq_phase = RQ_PHASE_RPC;
1412
1413         spin_lock_irqsave(&imp->imp_lock, flags);
1414         req->rq_import_generation = imp->imp_generation;
restart:
        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                list_del(&req->rq_list);

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);

                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
                          current->comm,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                lwi = LWI_INTR(interrupted_request, req);
                rc = l_wait_event(req->rq_reply_waitq,
                                  (req->rq_send_state == imp->imp_state ||
                                   req->rq_err),
                                  &lwi);
                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
                          current->comm,
                          ptlrpc_import_state_name(imp->imp_state),
                          ptlrpc_import_state_name(req->rq_send_state),
                          req->rq_err);

                spin_lock_irqsave(&imp->imp_lock, flags);
                list_del_init(&req->rq_list);

                if (req->rq_err) {
                        rc = -EIO;
                } else if (req->rq_intr) {
                        rc = -EINTR;
                } else if (req->rq_no_resend) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        GOTO(out, rc = -ETIMEDOUT);
                } else {
                        GOTO(restart, rc);
                }
        }

        if (rc != 0) {
                list_del_init(&req->rq_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc; /* XXX is this ok? */
                GOTO(out, rc);
        }

        if (req->rq_resend) {
                if (!req->rq_ptlrpcs_restart)
                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);

                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk(req);

                DEBUG_REQ(D_HA, req, "resending: ");
        }

        /* XXX this is the same as ptlrpc_set_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
                timeout = 1;
        } else {
                timeout = MAX(req->rq_timeout * HZ, 1);
                DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
        }
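        /* Wait here until the reply callback fires, the request times out,
         * or we are interrupted; a timeout while the target is suspended
         * simply re-arms the wait. */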
repeat:
        lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
                               req);
        rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
        if (rc == -ETIMEDOUT && ptlrpc_check_and_wait_suspend(req))
                goto repeat;
        DEBUG_REQ(D_NET, req, "-- done sleeping");

        CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
               "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid,
               req->rq_reqmsg->status, req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        spin_lock_irqsave(&imp->imp_lock, flags);
        list_del_init(&req->rq_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);
        /* If the reply was received normally, this just grabs the spinlock
         * (ensuring the reply callback has returned), sees that
         * req->rq_receiving_reply is clear and returns. */
        ptlrpc_unregister_reply(req);

        if (req->rq_err)
                GOTO(out, rc = -EIO);

        if (req->rq_ptlrpcs_err)
                GOTO(out, rc = -EPERM);

        /* Resend if we need to, unless we were interrupted. */
        if (req->rq_resend && !req->rq_intr) {
                /* ...unless we were specifically told otherwise. */
                if (req->rq_no_resend)
                        GOTO(out, rc = -ETIMEDOUT);
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

        if (req->rq_intr) {
                /* Should only be interrupted if we timed out. */
                if (!req->rq_timedout)
                        DEBUG_REQ(D_ERROR, req,
                                  "rq_intr set but rq_timedout not");
                GOTO(out, rc = -EINTR);
        }

        if (req->rq_timedout) {                 /* non-recoverable timeout */
                GOTO(out, rc = -ETIMEDOUT);
        }

        if (!req->rq_replied) {
                /* How can this be? -eeb */
                DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                LBUG();
                GOTO(out, rc = req->rq_status);
        }

        rc = after_reply(req);
        /* NB may return +ve success rc */
        if (req->rq_resend) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

 out:
        if (req->rq_bulk != NULL) {
                if (rc >= 0) {
                        /* Success so far.  Note that anything going wrong
                         * with bulk now is EXTREMELY strange, since the
                         * server must have believed the bulk transferred
                         * OK before it replied with success. */
                        lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                        brc = l_wait_event(req->rq_reply_waitq,
                                           !ptlrpc_bulk_active(req->rq_bulk),
                                           &lwi);
                        LASSERT(brc == 0 || brc == -ETIMEDOUT);
                        if (brc != 0) {
                                LASSERT(brc == -ETIMEDOUT);
                                DEBUG_REQ(D_ERROR, req, "bulk timed out");
                                rc = brc;
                        } else if (!req->rq_bulk->bd_success) {
                                DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                                rc = -EIO;
                        }
                }
                if (rc < 0)
                        ptlrpc_unregister_bulk(req);
        }

        LASSERT(!req->rq_receiving_reply);
        req->rq_phase = RQ_PHASE_INTERPRET;

        atomic_dec(&imp->imp_inflight);
        wake_up(&imp->imp_recovery_waitq);
        RETURN(rc);
}

struct ptlrpc_replay_async_args {
        int praa_old_state;
        int praa_old_status;
};

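/* Reply interpreter for a replayed request: sanity-check and unpack the
 * reply, restore the saved pre-replay send state and status, record the
 * last replayed transno, and kick the import recovery state machine.
 * On any failure the import is reconnected to restart recovery. */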
static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
                                   void *data, int rc)
{
        struct ptlrpc_replay_async_args *aa = data;
        struct obd_import *imp = req->rq_import;
        unsigned long flags;

        atomic_dec(&imp->imp_replay_inflight);

        if (!req->rq_replied) {
                CERROR("request replay timed out, restarting recovery\n");
                GOTO(out, rc = -ETIMEDOUT);
        }

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc = -EPROTO);
        }

        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR &&
            req->rq_repmsg->status == -ENOTCONN)
                GOTO(out, rc = req->rq_repmsg->status);

        /* The transno had better not change over replay. */
        LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);

        DEBUG_REQ(D_HA, req, "got rep");

        /* let the callback do fixups, possibly including changes to
         * the request itself */
        if (req->rq_replay_cb)
                req->rq_replay_cb(req);

        if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
                DEBUG_REQ(D_HA, req, "status %d, old was %d",
                          req->rq_repmsg->status, aa->praa_old_status);
        } else {
                /* Put it back for re-replay. */
                req->rq_repmsg->status = aa->praa_old_status;
        }

        spin_lock_irqsave(&imp->imp_lock, flags);
        imp->imp_last_replay_transno = req->rq_transno;
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* continue with recovery */
        rc = ptlrpc_import_recovery_state_machine(imp);
 out:
        req->rq_send_state = aa->praa_old_state;

        if (rc != 0)
                /* this replay failed, so restart recovery */
                ptlrpc_connect_import(imp, NULL);

        RETURN(rc);
}

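/* Queue @req for asynchronous replay via ptlrpcd, saving enough state in
 * rq_async_args for ptlrpc_replay_interpret() to restore it afterwards. */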
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        struct ptlrpc_replay_async_args *aa;
        ENTRY;

        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);

        /* Not handling automatic bulk replay yet (or ever?) */
        LASSERT(req->rq_bulk == NULL);

        DEBUG_REQ(D_HA, req, "REPLAY");

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
        memset(aa, 0, sizeof(*aa));

        /* Prepare request to be resent with ptlrpcd */
        aa->praa_old_state = req->rq_send_state;
        req->rq_send_state = LUSTRE_IMP_REPLAY;
        req->rq_phase = RQ_PHASE_NEW;
        aa->praa_old_status = req->rq_repmsg->status;
        req->rq_status = 0;

        req->rq_interpret_reply = ptlrpc_replay_interpret;
        atomic_inc(&req->rq_import->imp_replay_inflight);
        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */

        ptlrpcd_add_req(req);
        RETURN(0);
}

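/* Abort every request in flight on @imp that belongs to an older import
 * generation: mark it with rq_err and wake its waiter so the error is
 * noticed.  Raw RPCs are aborted unconditionally. */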
void ptlrpc_abort_inflight(struct obd_import *imp)
{
        unsigned long flags;
        struct list_head *tmp, *n;
        ENTRY;

        /* Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
        spin_lock_irqsave(&imp->imp_lock, flags);

        /* XXX locking?  Maybe we should remove each request with the list
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "inflight");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting waiting req");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_rawrpc_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting raw rpc");

                spin_lock(&req->rq_lock);
                req->rq_err = 1;
                ptlrpc_wake_client_req(req);
                spin_unlock(&req->rq_lock);
        }

        /* Last chance to free reqs left on the replay list, but we
         * will still leak reqs that haven't committed.  */
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);

        spin_unlock_irqrestore(&imp->imp_lock, flags);

        EXIT;
}

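/* XIDs are drawn from a single monotonically increasing 64-bit counter,
 * protected by its own spinlock so concurrent senders always get
 * distinct values. */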
static __u64 ptlrpc_last_xid = 0;
static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;

__u64 ptlrpc_next_xid(void)
{
        __u64 tmp;

        spin_lock(&ptlrpc_last_xid_lock);
        tmp = ++ptlrpc_last_xid;
        spin_unlock(&ptlrpc_last_xid_lock);
        return tmp;
}
