lustre/ptlrpc/client.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <errno.h>
#include <signal.h>
#include <liblustre.h>
#endif

#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>
#include <linux/lustre_sec.h>

#include "ptlrpc_internal.h"

void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}

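/* Resolve a UUID to a portals peer and fetch the ptlrpc connection for it,
 * recording the remote UUID on the connection. */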
struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *c;
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer, uuid);
        if (c) {
                memcpy(c->c_remote_uuid.uuid,
                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
        }

        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);

        return c;
}

void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                 struct obd_uuid *uuid)
{
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof (peer));
        return;
}

static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
        if (!desc)
                return NULL;

        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = npages;
        desc->bd_iov_count = 0;
        desc->bd_md_h = PTL_INVALID_HANDLE;
        desc->bd_portal = portal;
        desc->bd_type = type;

        return desc;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = client_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* req now owns desc and will free it when the request itself is freed */
        req->rq_bulk = desc;

        return desc;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_export *exp = req->rq_export;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);

        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_export = class_export_get(exp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = server_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* NB we don't assign rq_bulk here; server-side requests are
         * re-used, and the handler frees the bulk desc explicitly. */

        return desc;
}

void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                           struct page *page, int pageoffset, int len)
{
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_SIZE);

        desc->bd_nob += len;

        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}
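
/*
 * Illustrative client-side bulk setup (a sketch, not part of the original
 * file; npages, pages[] and portal are assumed to come from the caller):
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 */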

void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        ENTRY;

        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
        LASSERT(!desc->bd_network_rw);         /* network hands off or */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        if (desc->bd_export)
                class_export_put(desc->bd_export);
        else
                class_import_put(desc->bd_import);

        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                bd_iov[desc->bd_max_iov]));
        EXIT;
}

/* FIXME: prep_req should now return an error code rather than NULL, but
 * it is called everywhere :(
 */
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        LASSERT((unsigned long)imp > 0x1000);

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        request->rq_import = class_import_get(imp);

        rc = ptlrpcs_req_get_cred(request);
        if (rc) {
                CDEBUG(D_SEC, "failed to get credential\n");
                GOTO(out_free, rc);
        }

        /* Try to refresh the cred here so that fewer refreshes have to be
         * performed in ptlrpcd context (which might block ptlrpcd).  Fail
         * out only if a fatal ptlrpcs error occurred.
         */
        ptlrpcs_req_refresh_cred(request);
        if (request->rq_ptlrpcs_err)
                GOTO(out_cred, rc = -EPERM);

        rc = lustre_pack_request(request, count, lengths, bufs);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                GOTO(out_cred, rc);
        }
        request->rq_reqmsg->version |= version;

        if (imp->imp_server_timeout)
                request->rq_timeout = obd_timeout / 2;
        else
                request->rq_timeout = obd_timeout;

        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;

        request->rq_req_cbid.cbid_fn  = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        /* XXX FIXME bug 249 */
        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        spin_lock_init(&request->rq_lock);
        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_replay_list);
        INIT_LIST_HEAD(&request->rq_set_chain);
        init_waitqueue_head(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        atomic_set(&request->rq_refcount, 1);

        request->rq_reqmsg->opc = opcode;
        request->rq_reqmsg->flags = 0;
        RETURN(request);
out_cred:
        ptlrpcs_req_drop_cred(request);
out_free:
        class_import_put(imp);
        OBD_FREE(request, sizeof(*request));
        RETURN(NULL);
}
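
/*
 * Typical synchronous usage, as a sketch (not from the original file);
 * opcode, size and repsize are caller assumptions, and lustre_msg_size()
 * is assumed to be the usual helper for sizing the expected reply:
 *
 *      req = ptlrpc_prep_req(imp, version, opcode, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      req->rq_replen = lustre_msg_size(1, &repsize);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 */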

struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;

        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        INIT_LIST_HEAD(&set->set_requests);
        init_waitqueue_head(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        INIT_LIST_HEAD(&set->set_new_requests);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int               expected_phase;
        int               n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;
        atomic_inc(&req->rq_import->imp_inflight);
}

/* Locked so that many callers can add things; the context that owns the
 * set is expected to notice these and move them into the set proper. */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&set->set_new_req_lock, flags);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}
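
/*
 * Request sets batch several RPCs so they can be waited on together.  A
 * minimal sketch of the caller's side (requests prepared elsewhere):
 *
 *      set = ptlrpc_prep_set();
 *      ptlrpc_set_add_req(set, req);   (the set now owns the req reference)
 *      rc = ptlrpc_set_wait(set);
 *      ptlrpc_set_destroy(set);
 */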

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed. If false, and
 * *status is set, then the request cannot be sent and *status is the
 * error code.  If false and status is 0, then the request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        }
        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        }
        /* allow CONNECT even if import is invalid */
        else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
                ;
        }
        /*
         * If the import has been invalidated (such as by an OST failure), the
         * request must fail with -EIO.
         */
        else if (imp->imp_invalid) {
                DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -EIO;
        }
        else if (req->rq_import_generation != imp->imp_generation) {
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        }
        else if (req->rq_send_state != imp->imp_state) {
                if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake
                    || req->rq_no_delay)
                        *status = -EWOULDBLOCK;
                else
                        delay = 1;
        }

        RETURN(delay);
}

static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc = 0;
        ENTRY;

        /* serialise with network callback */
        spin_lock_irqsave (&req->rq_lock, flags);

        if (req->rq_replied) {
                DEBUG_REQ(D_NET, req, "REPLIED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_net_err && !req->rq_timedout) {
                spin_unlock_irqrestore (&req->rq_lock, flags);
                rc = ptlrpc_expire_one_request(req);
                spin_lock_irqsave (&req->rq_lock, flags);
                GOTO(out, rc);
        }

        if (req->rq_err) {
                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_resend) {
                DEBUG_REQ(D_ERROR, req, "RESEND:");
                GOTO(out, rc = 1);
        }

        if (req->rq_restart) {
                DEBUG_REQ(D_ERROR, req, "RESTART:");
                GOTO(out, rc = 1);
        }
        EXIT;
 out:
        spin_unlock_irqrestore (&req->rq_lock, flags);
        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
        return rc;
}

static int ptlrpc_check_status(struct ptlrpc_request *req)
{
        int err;
        ENTRY;

        err = req->rq_repmsg->status;
        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d",
                          err);
                RETURN(err < 0 ? err : -EINVAL);
        }

        if (err < 0) {
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        } else if (err > 0) {
                /* XXX: translate this error from net to host */
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        }

        RETURN(err);
}

static int after_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        struct obd_import *imp = req->rq_import;
        int rc;
        ENTRY;

        LASSERT(!req->rq_receiving_reply);

        /* NB Until this point, the whole of the incoming message,
         * including buflens, status etc is in the sender's byte order. */

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
        rc = ptlrpcs_cli_unwrap_reply(req);
        if (rc) {
                CERROR("verify reply error: %d\n", rc);
                RETURN(rc);
        }
        /* unwrap_reply may request that the RPC be resent */
        if (req->rq_ptlrpcs_restart) {
                req->rq_resend = 1;
                RETURN(0);
        }

        /* unwrap_reply sets rq_replen to the actual received
         * lustre_msg length
         */
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                RETURN(-EPROTO);
        }

        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
                CERROR("invalid packet type received (type=%u)\n",
                       req->rq_repmsg->type);
                RETURN(-EPROTO);
        }

        rc = ptlrpc_check_status(req);

        /* Either we've been evicted, or the server has failed for
         * some reason. Try to reconnect, and if that fails, punt to the
         * upcall. */
        if (rc == -ENOTCONN) {
                if (req->rq_send_state != LUSTRE_IMP_FULL ||
                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                        RETURN(-ENOTCONN);
                }

                ptlrpc_request_handle_notconn(req);

                RETURN(rc);
        }

        /* Store transno in reqmsg for replay. */
        req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;

        if (req->rq_import->imp_replayable) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                if (req->rq_transno != 0)
                        ptlrpc_retain_replayable_request(req, imp);
                else if (req->rq_commit_cb != NULL) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        req->rq_commit_cb(req);
                        spin_lock_irqsave(&imp->imp_lock, flags);
                }

                if (req->rq_transno > imp->imp_max_transno)
                        imp->imp_max_transno = req->rq_transno;

                /* Replay-enabled imports return commit-status information. */
                if (req->rq_repmsg->last_committed)
                        imp->imp_peer_committed_transno =
                                req->rq_repmsg->last_committed;
                ptlrpc_free_committed(imp);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
        }

        RETURN(rc);
}

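/* First transmission of a set member: move it from NEW to RPC phase, put
 * it on the import's sending (or delayed) list, and hand it to the net. */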
static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
        char                   str[PTL_NALFMT_SIZE];
        struct obd_import     *imp;
        unsigned long          flags;
        int rc;
        ENTRY;

        LASSERT(req->rq_phase == RQ_PHASE_NEW);
        req->rq_phase = RQ_PHASE_RPC;

        imp = req->rq_import;
        spin_lock_irqsave(&imp->imp_lock, flags);

        req->rq_import_generation = imp->imp_generation;

        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                spin_lock (&req->rq_lock);
                req->rq_waiting = 1;
                spin_unlock (&req->rq_lock);

                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
                          "(%s != %s)",
                          req->rq_reqmsg->status,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                LASSERT(list_empty (&req->rq_list));

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(0);
        }

        if (rc != 0) {
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc;
                req->rq_phase = RQ_PHASE_INTERPRET;
                RETURN(rc);
        }

        /* XXX this is the same as ptlrpc_queue_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        req->rq_reqmsg->status = current->pid;
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
               " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
               req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
                req->rq_net_err = 1;
                RETURN(rc);
        }
        RETURN(0);
}

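/*
 * Drive each request in the set through its phases:
 *
 *      NEW -> RPC -> [BULK ->] INTERPRET -> COMPLETE
 *
 * Returns nonzero when the set is done, or when a failed send means the
 * caller should recalculate its wait timeout.
 */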
int ptlrpc_check_set(struct ptlrpc_request_set *set)
{
        char str[PTL_NALFMT_SIZE];
        unsigned long flags;
        struct list_head *tmp;
        int force_timer_recalc = 0;
        ENTRY;

        if (set->set_remaining == 0)
                RETURN(1);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                struct obd_import *imp = req->rq_import;
                int rc = 0;

                if (req->rq_phase == RQ_PHASE_NEW &&
                    ptlrpc_send_new_req(req)) {
                        force_timer_recalc = 1;
                }

                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
                      req->rq_phase == RQ_PHASE_COMPLETE)) {
                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                        LBUG();
                }

                if (req->rq_phase == RQ_PHASE_COMPLETE)
                        continue;

                if (req->rq_phase == RQ_PHASE_INTERPRET)
                        GOTO(interpret, req->rq_status);

                if (req->rq_net_err && !req->rq_timedout)
                        ptlrpc_expire_one_request(req);

                if (req->rq_err || req->rq_ptlrpcs_err) {
                        ptlrpc_unregister_reply(req);
                        if (req->rq_status == 0)
                                req->rq_status = req->rq_err ? -EIO : -EPERM;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
                 * will only be set after rq_timedout, but the oig waiting
                 * path sets rq_intr irrespective of whether ptlrpcd has
                 * seen a timeout.  our policy is to only interpret
                 * interrupted rpcs after they have timed out */
                if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                        /* NB could be on delayed list */
                        ptlrpc_unregister_reply(req);
                        req->rq_status = -EINTR;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                if (req->rq_phase == RQ_PHASE_RPC) {
                        if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                                int status;

                                ptlrpc_unregister_reply(req);

                                spin_lock_irqsave(&imp->imp_lock, flags);

                                if (ptlrpc_import_delay_req(imp, req, &status)){
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        continue;
                                }

                                list_del_init(&req->rq_list);
                                if (status != 0)  {
                                        req->rq_status = status;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                if (req->rq_no_resend) {
                                        req->rq_status = -ENOTCONN;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                list_add_tail(&req->rq_list,
                                              &imp->imp_sending_list);

                                spin_unlock_irqrestore(&imp->imp_lock, flags);

                                req->rq_waiting = 0;
                                if (req->rq_resend) {
                                        if (!req->rq_ptlrpcs_restart)
                                                lustre_msg_add_flags(
                                                        req->rq_reqmsg,
                                                        MSG_RESENT);
                                        if (req->rq_bulk) {
                                                __u64 old_xid = req->rq_xid;

                                                ptlrpc_unregister_bulk (req);

                                                /* ensure previous bulk fails */
                                                req->rq_xid = ptlrpc_next_xid();
                                                CDEBUG(D_HA, "resend bulk "
                                                       "old x"LPU64
                                                       " new x"LPU64"\n",
                                                       old_xid, req->rq_xid);
                                        }
                                }

                                rc = ptl_send_rpc(req);
                                if (rc) {
                                        DEBUG_REQ(D_HA, req, "send failed (%d)",
                                                  rc);
                                        force_timer_recalc = 1;
                                        req->rq_net_err = 1;
                                }
                                /* need to reset the timeout */
                                force_timer_recalc = 1;
                        }

                        /* Still waiting for a reply? */
                        if (ptlrpc_client_receiving_reply(req))
                                continue;

                        /* Did we actually receive a reply? */
                        if (!ptlrpc_client_replied(req))
                                continue;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        req->rq_status = after_reply(req);
                        if (req->rq_resend) {
                                /* Add this req to the delayed list so
                                   it can be errored if the import is
                                   evicted after recovery. */
                                spin_lock_irqsave (&req->rq_lock, flags);
                                list_add_tail(&req->rq_list,
                                              &imp->imp_delayed_list);
                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                continue;
                        }

                        /* If there is no bulk associated with this request,
                         * then we're done and should let the interpreter
                         * process the reply.  Similarly if the RPC returned
                         * an error, and therefore the bulk will never arrive.
                         */
                        if (req->rq_bulk == NULL || req->rq_status != 0) {
                                req->rq_phase = RQ_PHASE_INTERPRET;
                                GOTO(interpret, req->rq_status);
                        }

                        req->rq_phase = RQ_PHASE_BULK;
                }

                LASSERT(req->rq_phase == RQ_PHASE_BULK);
                if (ptlrpc_bulk_active(req->rq_bulk))
                        continue;

                if (!req->rq_bulk->bd_success) {
                        /* The RPC reply arrived OK, but the bulk screwed
                         * up!  Dead weird, since the server told us the RPC
                         * was good after getting the REPLY for its GET or
                         * the ACK for its PUT. */
                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                        LBUG();
                }

                req->rq_phase = RQ_PHASE_INTERPRET;

        interpret:
                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
                LASSERT(!req->rq_receiving_reply);

                ptlrpc_unregister_reply(req);
                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk (req);

                req->rq_phase = RQ_PHASE_COMPLETE;

                if (req->rq_interpret_reply != NULL) {
                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                req->rq_interpret_reply;
                        req->rq_status = interpreter(req, &req->rq_async_args,
                                                     req->rq_status);
                }

                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
                       "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
                       req->rq_xid,
                       imp->imp_connection->c_peer.peer_ni->pni_name,
                       ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
                       req->rq_reqmsg->opc);

                set->set_remaining--;

                atomic_dec(&imp->imp_inflight);
                wake_up(&imp->imp_recovery_waitq);
        }

        /* If we hit an error, we want to recover promptly. */
        RETURN(set->set_remaining == 0 || force_timer_recalc);
}

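/* Handle the timeout of a single request: mark it timed out, unlink its
 * reply (and any bulk) from the network, then either error the request
 * out or kick import recovery, depending on the import state. */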
int ptlrpc_expire_one_request(struct ptlrpc_request *req)
{
        unsigned long      flags;
        struct obd_import *imp = req->rq_import;
        int replied = 0;
        ENTRY;

        DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
                  (long)req->rq_sent, LTIME_S(CURRENT_TIME) - req->rq_sent);

        spin_lock_irqsave (&req->rq_lock, flags);
        replied = req->rq_replied;
        if (!replied)
                req->rq_timedout = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);

        if (replied)
                RETURN(0);

        ptlrpc_unregister_reply (req);

        if (obd_dump_on_timeout)
                portals_debug_dumplog();

        if (req->rq_bulk != NULL)
                ptlrpc_unregister_bulk (req);

        if (imp == NULL) {
                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                RETURN(1);
        }

        /* The DLM server doesn't want recovery run on its imports. */
        if (imp->imp_dlm_fake)
                RETURN(1);

        /* If this request is for recovery or other primordial tasks,
         * then error it out here. */
        if (req->rq_send_state != LUSTRE_IMP_FULL ||
            imp->imp_obd->obd_no_recov) {
                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_status = -ETIMEDOUT;
                req->rq_err = 1;
                spin_unlock_irqrestore (&req->rq_lock, flags);
                RETURN(1);
        }

        ptlrpc_fail_import(imp, req->rq_import_generation);

        RETURN(0);
}

int ptlrpc_expired_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head          *tmp;
        time_t                     now = LTIME_S(CURRENT_TIME);
        ENTRY;

        LASSERT(set != NULL);

        /* A timeout expired; see which reqs it applies to... */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting
                       && !req->rq_resend) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout ||           /* already dealt with */
                    req->rq_sent + req->rq_timeout > now) /* not expired */
                        continue;

                /* deal with this guy */
                ptlrpc_expire_one_request (req);
        }

        /* When waiting for a whole set, we always want to break out of the
         * sleep so we can recalculate the timeout, or enable interrupts
         * iff everyone's timed out.
         */
        RETURN(1);
}

void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore(&req->rq_lock, flags);
}

void ptlrpc_interrupted_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head *tmp;

        LASSERT(set != NULL);
        CERROR("INTERRUPTED SET %p\n", set);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                if (req->rq_phase != RQ_PHASE_RPC)
                        continue;

                ptlrpc_mark_interrupted(req);
        }
}

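/* Compute seconds until the earliest in-flight request in the set can
 * time out: 1 means "expire now", 0 means no deadline is pending. */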
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        time_t                 now = LTIME_S(CURRENT_TIME);
        time_t                 deadline;
        int                    timeout = 0;
        struct ptlrpc_request *req;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout)   /* already timed out */
                        continue;

                deadline = req->rq_sent + req->rq_timeout;
                if (deadline <= now)    /* actually expired already */
                        timeout = 1;    /* ASAP */
                else if (timeout == 0 || timeout > deadline - now)
                        timeout = deadline - now;
        }
        RETURN(timeout);
}

int ptlrpc_set_wait(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        struct ptlrpc_request *req;
        struct l_wait_info     lwi;
        int                    rc, timeout;
        ENTRY;

        LASSERT(!list_empty(&set->set_requests));
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                if (req->rq_phase == RQ_PHASE_NEW)
                        (void)ptlrpc_send_new_req(req);
        }

        do {
                timeout = ptlrpc_set_next_timeout(set);

                /* wait until all complete, interrupted, or an in-flight
                 * req times out */
                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                       set, timeout);
                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                       ptlrpc_expired_set,
                                       ptlrpc_interrupted_set, set);
                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);

                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);

                /* -EINTR => all requests have been flagged rq_intr so next
                 * check completes.
                 * -ETIMEDOUT => someone timed out.  When all reqs have
                 * timed out, signals are enabled, allowing completion with
                 * EINTR.
                 * I don't really care if we go once more round the loop in
                 * the error cases -eeb. */
        } while (rc != 0 || set->set_remaining != 0);

        LASSERT(set->set_remaining == 0);

        rc = 0;
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
        }

        if (set->set_interpret != NULL) {
                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                        set->set_interpret;
                rc = interpreter (set, set->set_arg, rc);
        }

        RETURN(rc);
}

static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
        LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
        LASSERT(request->rq_cred);

        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
        if (request->rq_import != NULL) {
                unsigned long flags = 0;
                if (!locked)
                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
                list_del_init(&request->rq_replay_list);
                if (!locked)
                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                               flags);
        }
        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);

        if (atomic_read(&request->rq_refcount) != 0) {
                DEBUG_REQ(D_ERROR, request,
                          "freeing request with nonzero refcount");
                LBUG();
        }

        if (request->rq_repbuf != NULL)
                ptlrpcs_cli_free_repbuf(request);
        if (request->rq_reqbuf != NULL)
                ptlrpcs_cli_free_reqbuf(request);

        if (request->rq_export != NULL) {
                class_export_put(request->rq_export);
                request->rq_export = NULL;
        }
        if (request->rq_import != NULL) {
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
        if (request->rq_bulk != NULL)
                ptlrpc_free_bulk(request->rq_bulk);

        ptlrpcs_req_drop_cred(request);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        __ptlrpc_free_req(request, 0);
}

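/* Drop a reference on a request; the request is freed once its refcount
 * reaches zero.  The _with_imp_lock variant is for callers that already
 * hold imp_lock. */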
static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
{
        LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
        (void)__ptlrpc_req_finished(request, 1);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL)
                RETURN(1);

        if (request == LP_POISON ||
            request->rq_reqmsg == LP_POISON) {
                CERROR("dereferencing freed request (bug 575)\n");
                LBUG();
                RETURN(1);
        }

        DEBUG_REQ(D_INFO, request, "refcount now %u",
                  atomic_read(&request->rq_refcount) - 1);

        if (atomic_dec_and_test(&request->rq_refcount)) {
                __ptlrpc_free_req(request, locked);
                RETURN(1);
        }

        RETURN(0);
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}

/* Disengage the client's reply buffer from the network
 * NB does _NOT_ unregister any client-side bulk.
 * IDEMPOTENT, but _not_ safe against concurrent callers.
 * The request owner (i.e. the thread doing the I/O) must call...
 */
void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
        int                rc;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;

        LASSERT(!in_interrupt ());             /* might sleep */

        if (!ptlrpc_client_receiving_reply(request))
                return;

        PtlMDUnlink (request->rq_reply_md_h);

        /* We have to l_wait_event() whatever the result, to give liblustre
         * a chance to run reply_in_callback() */

        if (request->rq_set != NULL)
                wq = &request->rq_set->set_waitq;
        else
                wq = &request->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs */
                lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
                rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
                if (rc == 0)
                        return;

                LASSERT (rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
        }
}

/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
        ENTRY;

        LASSERT(imp != NULL);

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
               imp->imp_obd->obd_name, imp->imp_peer_committed_transno);

        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                LASSERT(req != last_req);
                last_req = req;

                if (req->rq_import_generation < imp->imp_generation) {
                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
                        GOTO(free_req, 0);
                }

                if (req->rq_replay) {
#warning "REMOVE THIS CRAP ONCE 6403 SOLVED -bzzz"
                        struct mdc_open_data {
                                struct obd_client_handle *mod_och;
                                struct ptlrpc_request    *mod_open_req;
                                struct ptlrpc_request    *mod_close_req;
                        };
                        struct mdc_open_data *mod = req->rq_cb_data;
                        if (mod == NULL || mod->mod_close_req == NULL) {
                                DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                                continue;
                        }
                        DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY), "
                                  "closed by x"LPD64"/t"LPD64,
                                  mod->mod_close_req->rq_xid,
                                  mod->mod_close_req->rq_repmsg ?
                                  mod->mod_close_req->rq_repmsg->transno : 0);
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_HA, req, "stopping search");
                        break;
                }

                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
free_req:
                if (req->rq_commit_cb != NULL)
                        req->rq_commit_cb(req);
                list_del_init(&req->rq_replay_list);
                __ptlrpc_req_finished(req, 1);
        }

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct obd_import *imp)
{
        ENTRY;
        EXIT;
        return;
}

void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "going to resend");
        req->rq_reqmsg->handle.cookie = 0;
        req->rq_status = -EAGAIN;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_resend = 1;
        req->rq_net_err = 0;
        req->rq_timedout = 0;
        if (req->rq_bulk) {
                __u64 old_xid = req->rq_xid;

                /* ensure previous bulk fails */
                req->rq_xid = ptlrpc_next_xid();
                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                       old_xid, req->rq_xid);
        }
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
        req->rq_status = -ERESTARTSYS;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_restart = 1;
        req->rq_timedout = 0;
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

static int expired_request(void *data)
{
        struct ptlrpc_request *req = data;
        ENTRY;

        /* some failure can suspend regular timeouts */
        if (ptlrpc_check_suspend())
                RETURN(1);

        RETURN(ptlrpc_expire_one_request(req));
}

static void interrupted_request(void *data)
{
        unsigned long flags;

        struct ptlrpc_request *req = data;
        DEBUG_REQ(D_HA, req, "request interrupted");
        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        ENTRY;
        atomic_inc(&req->rq_refcount);
        RETURN(req);
}

void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        /* Clear this for new requests that were resent, as well as for
         * resent replayed requests. */
        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);

        /* don't re-add requests that have been replayed */
        if (!list_empty(&req->rq_replay_list))
                return;

        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);

        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
                 * opens, so use req->rq_xid as a secondary key.
                 * (See bugs 684, 685, and 428.)
                 * XXX no longer needed, but all opens need transnos!
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                return;
        }

        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
}

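/* Send a single request and block until its reply arrives or the import
 * fails; the synchronous counterpart of the request-set machinery above. */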
1378 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1379 {
1380         char str[PTL_NALFMT_SIZE];
1381         int rc = 0;
1382         int brc;
1383         struct l_wait_info lwi;
1384         struct obd_import *imp = req->rq_import;
1385         unsigned long flags;
1386         int timeout = 0;
1387         ENTRY;
1388
1389         LASSERT(req->rq_set == NULL);
1390         LASSERT(!req->rq_receiving_reply);
1391         atomic_inc(&imp->imp_inflight);
1392
1393         if (imp->imp_connection == NULL) {
1394                 CERROR("request on not connected import %s\n",
1395                         imp->imp_obd->obd_name);
1396                 RETURN(-EINVAL);
1397         }
1398
1399         /* for distributed debugging */
1400         req->rq_reqmsg->status = current->pid;
1401         LASSERT(imp->imp_obd != NULL);
1402         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1403                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1404                imp->imp_obd->obd_uuid.uuid,
1405                req->rq_reqmsg->status, req->rq_xid,
1406                imp->imp_connection->c_peer.peer_ni->pni_name,
1407                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1408                req->rq_reqmsg->opc);
1409
1410         /* Mark phase here for a little debug help */
1411         req->rq_phase = RQ_PHASE_RPC;
1412
1413         spin_lock_irqsave(&imp->imp_lock, flags);
1414         req->rq_import_generation = imp->imp_generation;
1415 restart:
1416         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1417                 list_del(&req->rq_list);
1418
1419                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1420                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1421
1422                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1423                           current->comm, 
1424                           ptlrpc_import_state_name(req->rq_send_state), 
1425                           ptlrpc_import_state_name(imp->imp_state));
1426                 lwi = LWI_INTR(interrupted_request, req);
1427                 rc = l_wait_event(req->rq_reply_waitq,
1428                                   (req->rq_send_state == imp->imp_state ||
1429                                    req->rq_err),
1430                                   &lwi);
1431                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1432                           current->comm, 
1433                           ptlrpc_import_state_name(imp->imp_state), 
1434                           ptlrpc_import_state_name(req->rq_send_state),
1435                           req->rq_err);
1436
1437                 spin_lock_irqsave(&imp->imp_lock, flags);
1438                 list_del_init(&req->rq_list);
1439
1440                 if (req->rq_err) {
1441                         rc = -EIO;
1442                 } else if (req->rq_intr) {
1443                         rc = -EINTR;
1444                 } else if (req->rq_no_resend) {
1445                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1446                         GOTO(out, rc = -ETIMEDOUT);
1447                 } else {
1448                         GOTO(restart, rc);
1449                 }
1450         }
1454
1455         if (rc != 0) {
1456                 list_del_init(&req->rq_list);
1457                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1458                 req->rq_status = rc; /* XXX is this ok? */
1459                 GOTO(out, rc);
1460         }
1461
1462         if (req->rq_resend) {
1463                 if (!req->rq_ptlrpcs_restart)
1464                         lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1465
1466                 if (req->rq_bulk != NULL)
1467                         ptlrpc_unregister_bulk (req);
1468
1469                 DEBUG_REQ(D_HA, req, "resending: ");
1470         }
1471
1472         /* XXX this is the same as ptlrpc_set_wait */
1473         LASSERT(list_empty(&req->rq_list));
1474         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1475         spin_unlock_irqrestore(&imp->imp_lock, flags);
1476
1477         rc = ptl_send_rpc(req);
1478         if (rc) {
1479                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1480                 timeout = 1;
1481         } else {
1482                 timeout = MAX(req->rq_timeout * HZ, 1);
1483                 DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
1484         }
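        /* The "repeat" label below re-arms the wait when the timeout fires
         * while RPC processing is suspended (ptlrpc_check_and_wait_suspend),
         * so time spent suspended is not charged against the request's own
         * timeout. */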
1485 repeat:
1486         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1487                                req);
1488         rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1489         if (rc == -ETIMEDOUT && ptlrpc_check_and_wait_suspend(req))
1490                 goto repeat;
1491         DEBUG_REQ(D_NET, req, "-- done sleeping");
1492
1493         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1494                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1495                imp->imp_obd->obd_uuid.uuid,
1496                req->rq_reqmsg->status, req->rq_xid,
1497                imp->imp_connection->c_peer.peer_ni->pni_name,
1498                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1499                req->rq_reqmsg->opc);
1500
1501         spin_lock_irqsave(&imp->imp_lock, flags);
1502         list_del_init(&req->rq_list);
1503         spin_unlock_irqrestore(&imp->imp_lock, flags);
1504
1505         /* If the reply was received normally, this just grabs the spinlock
1506          * (ensuring the reply callback has returned), sees that
1507          * req->rq_receiving_reply is clear and returns. */
1508         ptlrpc_unregister_reply (req);
1509
1510         if (req->rq_err)
1511                 GOTO(out, rc = -EIO);
1512
1513         if (req->rq_ptlrpcs_err)
1514                 GOTO(out, rc = -EPERM);
1515
1516         /* Resend if we need to, unless we were interrupted. */
1517         if (req->rq_resend && !req->rq_intr) {
1518                 /* ...unless we were specifically told otherwise. */
1519                 if (req->rq_no_resend)
1520                         GOTO(out, rc = -ETIMEDOUT);
1521                 spin_lock_irqsave(&imp->imp_lock, flags);
1522                 goto restart;
1523         }
1524
1525         if (req->rq_intr) {
1526                 /* Should only be interrupted if we timed out. */
1527                 if (!req->rq_timedout)
1528                         DEBUG_REQ(D_ERROR, req,
1529                                   "rq_intr set but rq_timedout not set");
1530                 GOTO(out, rc = -EINTR);
1531         }
1532
1533         if (req->rq_timedout) {                 /* non-recoverable timeout */
1534                 GOTO(out, rc = -ETIMEDOUT);
1535         }
1536
1537         if (!req->rq_replied) {
1538                 /* How can this be? -eeb */
1539                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1540                 LBUG();
1541                 GOTO(out, rc = req->rq_status);
1542         }
1543
1544         rc = after_reply (req);
1545         /* NB may return +ve success rc */
1546         if (req->rq_resend) {
1547                 spin_lock_irqsave(&imp->imp_lock, flags);
1548                 goto restart;
1549         }
1550
1551  out:
1552         if (req->rq_bulk != NULL) {
1553                 if (rc >= 0) {
1554                         /* Success so far.  Note that anything going wrong
1555                          * with bulk now is EXTREMELY strange, since the
1556                          * server must have believed that the bulk
1557                          * transferred OK before it replied with success. */
1559                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1560                         brc = l_wait_event(req->rq_reply_waitq,
1561                                            !ptlrpc_bulk_active(req->rq_bulk),
1562                                            &lwi);
1563                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1564                         if (brc != 0) {
1565                                 LASSERT(brc == -ETIMEDOUT);
1566                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1567                                 rc = brc;
1568                         } else if (!req->rq_bulk->bd_success) {
1569                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1570                                 rc = -EIO;
1571                         }
1572                 }
1573                 if (rc < 0)
1574                         ptlrpc_unregister_bulk (req);
1575         }
1576
1577         LASSERT(!req->rq_receiving_reply);
1578         req->rq_phase = RQ_PHASE_INTERPRET;
1579
1580         atomic_dec(&imp->imp_inflight);
1581         wake_up(&imp->imp_recovery_waitq);
1582         RETURN(rc);
1583 }
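/* Illustrative sketch of the usual synchronous caller pattern around
 * ptlrpc_queue_wait().  Prep-helper signatures vary across branches, so
 * ptlrpc_prep_req() below stands in for whatever allocates and packs the
 * request; the rest is the standard prep/wait/finish sequence.
 */
#if 0
static int example_sync_rpc(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int size = sizeof(struct obd_statfs);
        int rc;

        req = ptlrpc_prep_req(imp, OST_STATFS, 0, NULL, NULL);
        if (req == NULL)
                return -ENOMEM;

        /* one reply buffer, big enough for the statfs result */
        req->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(req);   /* send and sleep until the reply */
        /* on success the unpacked reply is available in req->rq_repmsg */

        ptlrpc_req_finished(req);      /* drop the caller's reference */
        return rc;
}
#endif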
1584
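/* Stash for the request state that replay temporarily overwrites: the
 * pre-replay send state and the original reply status are kept in
 * rq_async_args until ptlrpc_replay_interpret() restores them. */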
1585 struct ptlrpc_replay_async_args {
1586         int praa_old_state;
1587         int praa_old_status;
1588 };
1589
1590 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1591                                     void * data, int rc)
1592 {
1593         struct ptlrpc_replay_async_args *aa = data;
1594         struct obd_import *imp = req->rq_import;
1595         unsigned long flags;
1596
1597         atomic_dec(&imp->imp_replay_inflight);
1598         
1599         if (!req->rq_replied) {
1600                 CERROR("request replay timed out, restarting recovery\n");
1601                 GOTO(out, rc = -ETIMEDOUT);
1602         }
1603
1604 #if SWAB_PARANOIA
1605         /* Clear reply swab mask; this is a new reply in sender's byte order */
1606         req->rq_rep_swab_mask = 0;
1607 #endif
1608         LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
1609         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
1610         if (rc) {
1611                 CERROR("unpack_rep failed: %d\n", rc);
1612                 GOTO(out, rc = -EPROTO);
1613         }
1614
1615         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1616             req->rq_repmsg->status == -ENOTCONN) 
1617                 GOTO(out, rc = req->rq_repmsg->status);
1618
1619         /* The transno had better not change over replay. */
1620         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1621
1622         DEBUG_REQ(D_HA, req, "got rep");
1623
1624         /* let the callback do fixups, possibly including in the request */
1625         if (req->rq_replay_cb)
1626                 req->rq_replay_cb(req);
1627
1628         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1629                 DEBUG_REQ(D_HA, req, "status %d, old was %d",
1630                           req->rq_repmsg->status, aa->praa_old_status);
1631         } else {
1632                 /* Put it back for re-replay. */
1633                 req->rq_repmsg->status = aa->praa_old_status;
1634         }
1635
1636         spin_lock_irqsave(&imp->imp_lock, flags);
1637         imp->imp_last_replay_transno = req->rq_transno;
1638         spin_unlock_irqrestore(&imp->imp_lock, flags);
1639
1640         /* continue with recovery */
1641         rc = ptlrpc_import_recovery_state_machine(imp);
1642  out:
1643         req->rq_send_state = aa->praa_old_state;
1644         
1645         if (rc != 0)
1646                 /* this replay failed, so restart recovery */
1647                 ptlrpc_connect_import(imp, NULL);
1648
1649         RETURN(rc);
1650 }
1651
1652
1653 int ptlrpc_replay_req(struct ptlrpc_request *req)
1654 {
1655         struct ptlrpc_replay_async_args *aa;
1656         ENTRY;
1657
1658         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1659
1660         /* Not handling automatic bulk replay yet (or ever?) */
1661         LASSERT(req->rq_bulk == NULL);
1662
1663         DEBUG_REQ(D_HA, req, "REPLAY");
1664
1665         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1666         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1667         memset(aa, 0, sizeof *aa);
1668
1669         /* Prepare request to be resent with ptlrpcd */
1670         aa->praa_old_state = req->rq_send_state;
1671         req->rq_send_state = LUSTRE_IMP_REPLAY;
1672         req->rq_phase = RQ_PHASE_NEW;
1673         aa->praa_old_status = req->rq_repmsg->status;
1674         req->rq_status = 0;
1675
1676         req->rq_interpret_reply = ptlrpc_replay_interpret;
1677         atomic_inc(&req->rq_import->imp_replay_inflight);
1678         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1679
1680         ptlrpcd_add_req(req);
1681         RETURN(0);
1682 }
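/* Illustrative sketch: recovery replays the sorted imp_replay_list one
 * request at a time; the real driver is the import state machine.  A
 * simplified walk, assuming imp_lock protects the list while the candidate
 * is chosen, might look like this (illustrative only):
 */
#if 0
static int example_replay_next(struct obd_import *imp)
{
        struct ptlrpc_request *req = NULL;
        struct list_head *tmp;
        unsigned long flags;
        int rc = 0;

        spin_lock_irqsave(&imp->imp_lock, flags);
        /* imp_replay_list is sorted by transno; find the first request
         * beyond the last transno already replayed */
        list_for_each(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request,
                                   rq_replay_list);
                if (iter->rq_transno > imp->imp_last_replay_transno) {
                        req = iter;
                        break;
                }
        }
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        if (req != NULL)
                rc = ptlrpc_replay_req(req); /* hands off to ptlrpcd */
        return rc;
}
#endif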
1683
1684 void ptlrpc_abort_inflight(struct obd_import *imp)
1685 {
1686         unsigned long flags;
1687         struct list_head *tmp, *n;
1688         ENTRY;
1689
1690         /* Make sure that no new requests get processed for this import.
1691          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1692          * this flag and then putting requests on sending_list or delayed_list.
1693          */
1694         spin_lock_irqsave(&imp->imp_lock, flags);
1695
1696         /* XXX locking?  Maybe we should remove each request with the list
1697          * locked?  Also, how do we know if the requests on the list are
1698          * being freed at this time?
1699          */
1700         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1701                 struct ptlrpc_request *req =
1702                         list_entry(tmp, struct ptlrpc_request, rq_list);
1703
1704                 DEBUG_REQ(D_HA, req, "inflight");
1705
1706                 spin_lock (&req->rq_lock);
1707                 if (req->rq_import_generation < imp->imp_generation) {
1708                         req->rq_err = 1;
1709                         ptlrpc_wake_client_req(req);
1710                 }
1711                 spin_unlock (&req->rq_lock);
1712         }
1713
1714         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1715                 struct ptlrpc_request *req =
1716                         list_entry(tmp, struct ptlrpc_request, rq_list);
1717
1718                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1719
1720                 spin_lock (&req->rq_lock);
1721                 if (req->rq_import_generation < imp->imp_generation) {
1722                         req->rq_err = 1;
1723                         ptlrpc_wake_client_req(req);
1724                 }
1725                 spin_unlock (&req->rq_lock);
1726         }
1727
1728         list_for_each_safe(tmp, n, &imp->imp_rawrpc_list) {
1729                 struct ptlrpc_request *req =
1730                         list_entry(tmp, struct ptlrpc_request, rq_list);
1731
1732                 DEBUG_REQ(D_HA, req, "aborting raw rpc");
1733
1734                 spin_lock(&req->rq_lock);
1735                 req->rq_err = 1;
1736                 ptlrpc_wake_client_req(req);
1737                 spin_unlock(&req->rq_lock);
1738         }
1739
1740         /* Last chance to free reqs left on the replay list, but we
1741          * will still leak reqs that haven't committed.  */
1742         if (imp->imp_replayable)
1743                 ptlrpc_free_committed(imp);
1744
1745         spin_unlock_irqrestore(&imp->imp_lock, flags);
1746
1747         EXIT;
1748 }
1749
1750 static __u64 ptlrpc_last_xid = 0;
1751 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1752
1753 __u64 ptlrpc_next_xid(void)
1754 {
1755         __u64 tmp;
1756         spin_lock(&ptlrpc_last_xid_lock);
1757         tmp = ++ptlrpc_last_xid;
1758         spin_unlock(&ptlrpc_last_xid_lock);
1759         return tmp;
1760 }
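/* Illustrative note: each new request is stamped at prep time with
 *
 *         req->rq_xid = ptlrpc_next_xid();
 *
 * so xids are unique and monotonically increasing on a given client, which
 * is what makes rq_xid safe to use as the secondary replay-ordering key
 * earlier in this file. */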
1761
1762