lustre/ptlrpc/client.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <errno.h>
#include <signal.h>
#include <liblustre.h>
#endif

#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>
#include <linux/lustre_sec.h>

#include "ptlrpc_internal.h"

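/* Record the request/reply portals and a human-readable name in the
 * client structure; typically done once per client type at setup. */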
void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}

struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *c;
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer, uuid);
        if (c) {
                memcpy(c->c_remote_uuid.uuid,
                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
        }

        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);

        return c;
}

void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                 struct obd_uuid *uuid)
{
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof (peer));
        return;
}

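/* new_bulk() below allocates the descriptor and its bd_iov[] array in a
 * single chunk, which is why it sizes the allocation with
 * offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]) rather than
 * sizeof(*desc). */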
static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
        if (!desc)
                return NULL;

        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = npages;
        desc->bd_iov_count = 0;
        desc->bd_md_h = PTL_INVALID_HANDLE;
        desc->bd_portal = portal;
        desc->bd_type = type;

        return desc;
}

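/* Prepare a client-side bulk descriptor for @req.  The request owns the
 * descriptor (rq_bulk) and frees it along with itself; contrast with
 * ptlrpc_prep_bulk_exp() below, where the server-side handler frees the
 * descriptor explicitly. */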
struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = client_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* This makes req own desc, which is freed when req itself is freed */
        req->rq_bulk = desc;

        return desc;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_export *exp = req->rq_export;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);

        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_export = class_export_get(exp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = server_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* NB we don't assign rq_bulk here; server-side requests are
         * re-used, and the handler frees the bulk desc explicitly. */

        return desc;
}

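/* Append one page fragment to a bulk descriptor; the descriptor must
 * still have room (bd_iov_count < bd_max_iov) and the fragment must fit
 * within the page. */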
void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                           struct page *page, int pageoffset, int len)
{
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_SIZE);

        desc->bd_nob += len;

        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}

void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        ENTRY;

        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
        LASSERT(!desc->bd_network_rw);            /* network hands off */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        if (desc->bd_export)
                class_export_put(desc->bd_export);
        else
                class_import_put(desc->bd_import);

        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                bd_iov[desc->bd_max_iov]));
        EXIT;
}

/* FIXME: prep_req should now return an error code rather than NULL, but
 * it is called everywhere :(
 */
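/* Typical synchronous usage (an illustrative sketch only; the opcode,
 * version constant, and buffer size are hypothetical, not taken from a
 * real caller):
 *
 *      int size = sizeof(struct mds_body);
 *      struct ptlrpc_request *req;
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETATTR,
 *                            1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 *      return rc;
 */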
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                       int opcode, int count, int *lengths,
                                       char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        LASSERT((unsigned long)imp > 0x1000);

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        request->rq_import = class_import_get(imp);

        rc = ptlrpcs_req_get_cred(request);
        if (rc) {
                CDEBUG(D_SEC, "failed to get credential\n");
                GOTO(out_free, rc);
        }

        /* try to refresh the credential; we proceed even if it fails */
        rc = ptlrpcs_cred_refresh(request->rq_cred);
        if (!ptlrpcs_cred_is_uptodate(request->rq_cred)) {
                CERROR("req %p: failed to refresh cred %p, rc %d, continue\n",
                       request, request->rq_cred, rc);
        }

        rc = lustre_pack_request(request, count, lengths, bufs);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                GOTO(out_cred, rc);
        }
        request->rq_reqmsg->version |= version;

        if (imp->imp_server_timeout)
                request->rq_timeout = obd_timeout / 2;
        else
                request->rq_timeout = obd_timeout;

        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;

        request->rq_req_cbid.cbid_fn  = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        /* XXX FIXME bug 249 */
        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        spin_lock_init(&request->rq_lock);
        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_replay_list);
        INIT_LIST_HEAD(&request->rq_set_chain);
        init_waitqueue_head(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        atomic_set(&request->rq_refcount, 1);

        request->rq_reqmsg->opc = opcode;
        request->rq_reqmsg->flags = 0;
        RETURN(request);
out_cred:
        ptlrpcs_req_drop_cred(request);
out_free:
        class_import_put(imp);
        OBD_FREE(request, sizeof(*request));
        RETURN(NULL);
}

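/* Request sets batch asynchronous requests so they can be sent and
 * waited on as a group.  A sketch of the usual lifecycle (req1/req2 are
 * illustrative requests obtained from ptlrpc_prep_req()):
 *
 *      struct ptlrpc_request_set *set = ptlrpc_prep_set();
 *
 *      if (set == NULL)
 *              return -ENOMEM;
 *      ptlrpc_set_add_req(set, req1);
 *      ptlrpc_set_add_req(set, req2);
 *      rc = ptlrpc_set_wait(set);      (sends all reqs and waits)
 *      ptlrpc_set_destroy(set);
 */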
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;

        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        INIT_LIST_HEAD(&set->set_requests);
        init_waitqueue_head(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        INIT_LIST_HEAD(&set->set_new_requests);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int               expected_phase;
        int               n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;
        atomic_inc(&req->rq_import->imp_inflight);
}

/* Locked so that many callers can add things; the context that owns the
 * set is expected to notice these and move them into the set proper. */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&set->set_new_req_lock, flags);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed.  If false and *status
 * is set, then the request cannot be sent and *status is the error
 * code.  If false and *status is 0, then the request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        }
        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        }
        /* allow CONNECT even if import is invalid */
        else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
                ;
        }
        /*
         * If the import has been invalidated (such as by an OST failure), the
         * request must fail with -EIO.
         */
        else if (imp->imp_invalid) {
                DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -EIO;
        }
        else if (req->rq_import_generation != imp->imp_generation) {
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        }
        else if (req->rq_send_state != imp->imp_state) {
                if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake ||
                    req->rq_no_delay)
                        *status = -EWOULDBLOCK;
                else
                        delay = 1;
        }

        RETURN(delay);
}

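/* Test whether a request's fate has been decided: replied, errored,
 * needs resend, or needs restart.  A non-zero return ends the wait;
 * used as the l_wait_event() condition in ptlrpc_queue_wait() below. */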
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc = 0;
        ENTRY;

        /* serialise with network callback */
        spin_lock_irqsave (&req->rq_lock, flags);

        if (req->rq_replied) {
                DEBUG_REQ(D_NET, req, "REPLIED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_net_err && !req->rq_timedout) {
                spin_unlock_irqrestore (&req->rq_lock, flags);
                rc = ptlrpc_expire_one_request(req);
                spin_lock_irqsave (&req->rq_lock, flags);
                GOTO(out, rc);
        }

        if (req->rq_err) {
                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_resend) {
                DEBUG_REQ(D_ERROR, req, "RESEND:");
                GOTO(out, rc = 1);
        }

        if (req->rq_restart) {
                DEBUG_REQ(D_ERROR, req, "RESTART:");
                GOTO(out, rc = 1);
        }
        EXIT;
 out:
        spin_unlock_irqrestore (&req->rq_lock, flags);
        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
        return rc;
}

static int ptlrpc_check_status(struct ptlrpc_request *req)
{
        int err;
        ENTRY;

        err = req->rq_repmsg->status;
        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d",
                          err);
                RETURN(err < 0 ? err : -EINVAL);
        }

        if (err < 0) {
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        } else if (err > 0) {
                /* XXX: translate this error from net to host */
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        }

        RETURN(err);
}

static int after_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        struct obd_import *imp = req->rq_import;
        int rc;
        ENTRY;

        LASSERT(!req->rq_receiving_reply);

        /* NB Until this point, the whole of the incoming message,
         * including buflens, status etc is in the sender's byte order. */

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT (req->rq_nob_received <= req->rq_repbuf_len);
        rc = ptlrpcs_cli_unwrap_reply(req);
        if (rc) {
                CERROR("verify reply error: %d\n", rc);
                RETURN(rc);
        }
        /* unwrap_reply may request that the rpc be resent */
        if (req->rq_ptlrpcs_restart) {
                req->rq_resend = 1;
                RETURN(0);
        }

        /* unwrap_reply will set rq_replen to the actual received
         * lustre_msg length
         */
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                RETURN(-EPROTO);
        }

        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
                CERROR("invalid packet type received (type=%u)\n",
                       req->rq_repmsg->type);
                RETURN(-EPROTO);
        }

        rc = ptlrpc_check_status(req);

        /* Either we've been evicted, or the server has failed for
         * some reason.  Try to reconnect, and if that fails, punt to
         * the upcall. */
        if (rc == -ENOTCONN) {
                if (req->rq_send_state != LUSTRE_IMP_FULL ||
                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                        RETURN(-ENOTCONN);
                }

                ptlrpc_request_handle_notconn(req);

                RETURN(rc);
        }

        /* Store transno in reqmsg for replay. */
        req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;

        if (req->rq_import->imp_replayable) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                if (req->rq_transno != 0)
                        ptlrpc_retain_replayable_request(req, imp);
                else if (req->rq_commit_cb != NULL) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        req->rq_commit_cb(req);
                        spin_lock_irqsave(&imp->imp_lock, flags);
                }

                if (req->rq_transno > imp->imp_max_transno)
                        imp->imp_max_transno = req->rq_transno;

                /* Replay-enabled imports return commit-status information. */
                if (req->rq_repmsg->last_committed)
                        imp->imp_peer_committed_transno =
                                req->rq_repmsg->last_committed;
                ptlrpc_free_committed(imp);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
        }

        RETURN(rc);
}

static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
        char                   str[PTL_NALFMT_SIZE];
        struct obd_import     *imp;
        unsigned long          flags;
        int rc;
        ENTRY;

        LASSERT(req->rq_phase == RQ_PHASE_NEW);
        req->rq_phase = RQ_PHASE_RPC;

        imp = req->rq_import;
        spin_lock_irqsave(&imp->imp_lock, flags);

        req->rq_import_generation = imp->imp_generation;

        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                spin_lock (&req->rq_lock);
                req->rq_waiting = 1;
                spin_unlock (&req->rq_lock);

                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
                          "(%s != %s)",
                          req->rq_reqmsg->status,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                LASSERT(list_empty (&req->rq_list));

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(0);
        }

        if (rc != 0) {
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc;
                req->rq_phase = RQ_PHASE_INTERPRET;
                RETURN(rc);
        }

        /* XXX this is the same as ptlrpc_queue_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        req->rq_reqmsg->status = current->pid;
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
               " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
               req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
                req->rq_net_err = 1;
                RETURN(rc);
        }
        RETURN(0);
}

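/* Advance every request in the set through its state machine:
 *
 *      NEW -> RPC -> [BULK] -> INTERPRET -> COMPLETE
 *
 * New requests get sent, timed-out or delayed ones get resent or
 * errored, arrived replies are processed via after_reply(), and
 * completed requests run their interpret callback.  Returns non-zero
 * when everything is complete or the wait timeout needs recalculating. */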
int ptlrpc_check_set(struct ptlrpc_request_set *set)
{
        char str[PTL_NALFMT_SIZE];
        unsigned long flags;
        struct list_head *tmp;
        int force_timer_recalc = 0;
        ENTRY;

        if (set->set_remaining == 0)
                RETURN(1);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                struct obd_import *imp = req->rq_import;
                int rc = 0;

                if (req->rq_phase == RQ_PHASE_NEW &&
                    ptlrpc_send_new_req(req)) {
                        force_timer_recalc = 1;
                }

                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
                      req->rq_phase == RQ_PHASE_COMPLETE)) {
                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                        LBUG();
                }

                if (req->rq_phase == RQ_PHASE_COMPLETE)
                        continue;

                if (req->rq_phase == RQ_PHASE_INTERPRET)
                        GOTO(interpret, req->rq_status);

                if (req->rq_net_err && !req->rq_timedout)
                        ptlrpc_expire_one_request(req);

                if (req->rq_err) {
                        ptlrpc_unregister_reply(req);
                        if (req->rq_status == 0)
                                req->rq_status = -EIO;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
                 * will only be set after rq_timedout, but the oig waiting
                 * path sets rq_intr irrespective of whether ptlrpcd has
                 * seen a timeout.  Our policy is to interpret interrupted
                 * rpcs only after they have timed out. */
                if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                        /* NB could be on delayed list */
                        ptlrpc_unregister_reply(req);
                        req->rq_status = -EINTR;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                if (req->rq_phase == RQ_PHASE_RPC) {
                        if (req->rq_timedout || req->rq_waiting ||
                            req->rq_resend) {
                                int status;

                                ptlrpc_unregister_reply(req);

                                spin_lock_irqsave(&imp->imp_lock, flags);

                                if (ptlrpc_import_delay_req(imp, req,
                                                            &status)) {
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        continue;
                                }

                                list_del_init(&req->rq_list);
                                if (status != 0) {
                                        req->rq_status = status;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                if (req->rq_no_resend) {
                                        req->rq_status = -ENOTCONN;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                list_add_tail(&req->rq_list,
                                              &imp->imp_sending_list);

                                spin_unlock_irqrestore(&imp->imp_lock, flags);

                                req->rq_waiting = 0;
                                if (req->rq_resend) {
                                        if (!req->rq_ptlrpcs_restart)
                                                lustre_msg_add_flags(
                                                        req->rq_reqmsg,
                                                        MSG_RESENT);
                                        if (req->rq_bulk) {
                                                __u64 old_xid = req->rq_xid;

                                                ptlrpc_unregister_bulk (req);

                                                /* ensure previous bulk fails */
                                                req->rq_xid = ptlrpc_next_xid();
                                                CDEBUG(D_HA, "resend bulk "
                                                       "old x"LPU64
                                                       " new x"LPU64"\n",
                                                       old_xid, req->rq_xid);
                                        }
                                }

                                rc = ptl_send_rpc(req);
                                if (rc) {
                                        DEBUG_REQ(D_HA, req, "send failed (%d)",
                                                  rc);
                                        force_timer_recalc = 1;
                                        req->rq_net_err = 1;
                                }
                                /* need to reset the timeout */
                                force_timer_recalc = 1;
                        }

                        /* Still waiting for a reply? */
                        if (ptlrpc_client_receiving_reply(req))
                                continue;

                        /* Did we actually receive a reply? */
                        if (!ptlrpc_client_replied(req))
                                continue;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        req->rq_status = after_reply(req);
                        if (req->rq_resend) {
                                /* Add this req to the delayed list so
                                   it can be errored if the import is
                                   evicted after recovery. */
                                spin_lock_irqsave (&req->rq_lock, flags);
                                list_add_tail(&req->rq_list,
                                              &imp->imp_delayed_list);
                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                continue;
                        }

                        /* If there is no bulk associated with this request,
                         * then we're done and should let the interpreter
                         * process the reply.  Similarly if the RPC returned
                         * an error, and therefore the bulk will never arrive.
                         */
                        if (req->rq_bulk == NULL || req->rq_status != 0) {
                                req->rq_phase = RQ_PHASE_INTERPRET;
                                GOTO(interpret, req->rq_status);
                        }

                        req->rq_phase = RQ_PHASE_BULK;
                }

                LASSERT(req->rq_phase == RQ_PHASE_BULK);
                if (ptlrpc_bulk_active(req->rq_bulk))
                        continue;

                if (!req->rq_bulk->bd_success) {
                        /* The RPC reply arrived OK, but the bulk screwed
                         * up!  Dead weird, since the server told us the RPC
                         * was good after getting the REPLY for its GET or
                         * the ACK for its PUT. */
                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                        LBUG();
                }

                req->rq_phase = RQ_PHASE_INTERPRET;

        interpret:
                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
                LASSERT(!req->rq_receiving_reply);

                ptlrpc_unregister_reply(req);
                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk (req);

                req->rq_phase = RQ_PHASE_COMPLETE;

                if (req->rq_interpret_reply != NULL) {
                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                req->rq_interpret_reply;
                        req->rq_status = interpreter(req, &req->rq_async_args,
                                                     req->rq_status);
                }

                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
                       "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
                       req->rq_xid,
                       imp->imp_connection->c_peer.peer_ni->pni_name,
                       ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
                       req->rq_reqmsg->opc);

                set->set_remaining--;

                atomic_dec(&imp->imp_inflight);
                wake_up(&imp->imp_recovery_waitq);
        }

        /* If we hit an error, we want to recover promptly. */
        RETURN(set->set_remaining == 0 || force_timer_recalc);
}

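/* A request has timed out: mark it, unhook its reply and bulk from the
 * network, and either fail it outright (returning 1) or kick import
 * recovery and let it be resent later (returning 0). */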
int ptlrpc_expire_one_request(struct ptlrpc_request *req)
{
        unsigned long      flags;
        struct obd_import *imp = req->rq_import;
        int replied = 0;
        ENTRY;

        DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
                  (long)req->rq_sent, LTIME_S(CURRENT_TIME) - req->rq_sent);

        spin_lock_irqsave (&req->rq_lock, flags);
        replied = req->rq_replied;
        if (!replied)
                req->rq_timedout = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);

        if (replied)
                RETURN(0);

        ptlrpc_unregister_reply (req);

        if (obd_dump_on_timeout)
                portals_debug_dumplog();

        if (req->rq_bulk != NULL)
                ptlrpc_unregister_bulk (req);

        if (imp == NULL) {
                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                RETURN(1);
        }

        /* The DLM server doesn't want recovery run on its imports. */
        if (imp->imp_dlm_fake)
                RETURN(1);

        /* If this request is for recovery or other primordial tasks,
         * then error it out here. */
        if (req->rq_send_state != LUSTRE_IMP_FULL ||
            imp->imp_obd->obd_no_recov) {
                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_status = -ETIMEDOUT;
                req->rq_err = 1;
                spin_unlock_irqrestore (&req->rq_lock, flags);
                RETURN(1);
        }

        ptlrpc_fail_import(imp, req->rq_import_generation);

        RETURN(0);
}

int ptlrpc_expired_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head          *tmp;
        time_t                     now = LTIME_S(CURRENT_TIME);
        ENTRY;

        LASSERT(set != NULL);

        /* A timeout expired; see which reqs it applies to... */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting &&
                       !req->rq_resend) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout ||           /* already dealt with */
                    req->rq_sent + req->rq_timeout > now) /* not expired */
                        continue;

                /* deal with this guy */
                ptlrpc_expire_one_request (req);
        }

        /* When waiting for a whole set, we always break out of the sleep
         * so we can recalculate the timeout, or enable interrupts if
         * everyone has timed out.
         */
        RETURN(1);
}

void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore(&req->rq_lock, flags);
}

void ptlrpc_interrupted_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head *tmp;

        LASSERT(set != NULL);
        CERROR("INTERRUPTED SET %p\n", set);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                if (req->rq_phase != RQ_PHASE_RPC)
                        continue;

                ptlrpc_mark_interrupted(req);
        }
}

int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        time_t                 now = LTIME_S(CURRENT_TIME);
        time_t                 deadline;
        int                    timeout = 0;
        struct ptlrpc_request *req;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout)   /* already timed out */
                        continue;

                deadline = req->rq_sent + req->rq_timeout;
                if (deadline <= now)    /* actually expired already */
                        timeout = 1;    /* ASAP */
                else if (timeout == 0 || timeout > deadline - now)
                        timeout = deadline - now;
        }
        RETURN(timeout);
}

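/* Send any still-new requests in the set, then loop in l_wait_event()
 * until every request completes, recomputing the timeout on each pass.
 * Returns 0, the first non-zero request status, or the result of the
 * set's interpret callback if one is registered. */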
int ptlrpc_set_wait(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        struct ptlrpc_request *req;
        struct l_wait_info     lwi;
        int                    rc, timeout;
        ENTRY;

        LASSERT(!list_empty(&set->set_requests));
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                if (req->rq_phase == RQ_PHASE_NEW)
                        (void)ptlrpc_send_new_req(req);
        }

        do {
                timeout = ptlrpc_set_next_timeout(set);

                /* wait until all complete, interrupted, or an in-flight
                 * req times out */
                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                       set, timeout);
                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                       ptlrpc_expired_set,
                                       ptlrpc_interrupted_set, set);
                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);

                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);

                /* -EINTR => all requests have been flagged rq_intr so next
                 * check completes.
                 * -ETIMEDOUT => someone timed out.  When all reqs have
                 * timed out, signals are enabled allowing completion with
                 * EINTR.
                 * I don't really care if we go once more round the loop in
                 * the error cases -eeb. */
        } while (rc != 0 || set->set_remaining != 0);

        LASSERT(set->set_remaining == 0);

        rc = 0;
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
        }

        if (set->set_interpret != NULL) {
                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                        set->set_interpret;
                rc = interpreter (set, set->set_arg, rc);
        }

        RETURN(rc);
}

static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
        LASSERTF(request->rq_rqbd == NULL, "req %p\n", request); /* client-side */
        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
        LASSERT(request->rq_cred);

        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
        if (request->rq_import != NULL) {
                unsigned long flags = 0;
                if (!locked)
                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
                list_del_init(&request->rq_replay_list);
                if (!locked)
                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                               flags);
        }
        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);

        if (atomic_read(&request->rq_refcount) != 0) {
                DEBUG_REQ(D_ERROR, request,
                          "freeing request with nonzero refcount");
                LBUG();
        }

        if (request->rq_repbuf != NULL)
                ptlrpcs_cli_free_repbuf(request);
        if (request->rq_reqbuf != NULL)
                ptlrpcs_cli_free_reqbuf(request);

        if (request->rq_export != NULL) {
                class_export_put(request->rq_export);
                request->rq_export = NULL;
        }
        if (request->rq_import != NULL) {
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
        if (request->rq_bulk != NULL)
                ptlrpc_free_bulk(request->rq_bulk);

        ptlrpcs_req_drop_cred(request);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        __ptlrpc_free_req(request, 0);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
{
        LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
        (void)__ptlrpc_req_finished(request, 1);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL)
                RETURN(1);

        if (request == LP_POISON ||
            request->rq_reqmsg == LP_POISON) {
                CERROR("dereferencing freed request (bug 575)\n");
                LBUG();
                RETURN(1);
        }

        DEBUG_REQ(D_INFO, request, "refcount now %u",
                  atomic_read(&request->rq_refcount) - 1);

        if (atomic_dec_and_test(&request->rq_refcount)) {
                __ptlrpc_free_req(request, locked);
                RETURN(1);
        }

        RETURN(0);
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}

/* Disengage the client's reply buffer from the network
 * NB does _NOT_ unregister any client-side bulk.
 * IDEMPOTENT, but _not_ safe against concurrent callers.
 * The request owner (i.e. the thread doing the I/O) must call...
 */
void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
        int                rc;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;

        LASSERT(!in_interrupt ());             /* might sleep */

        if (!ptlrpc_client_receiving_reply(request))
                return;

        PtlMDUnlink (request->rq_reply_md_h);

        /* We have to l_wait_event() whatever the result, to give liblustre
         * a chance to run reply_in_callback() */

        if (request->rq_set != NULL)
                wq = &request->rq_set->set_waitq;
        else
                wq = &request->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs */
                lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
                rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
                if (rc == 0)
                        return;

                LASSERT (rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
        }
}

/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
        ENTRY;

        LASSERT(imp != NULL);

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
               imp->imp_obd->obd_name, imp->imp_peer_committed_transno);

        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                LASSERT(req != last_req);
                last_req = req;

                if (req->rq_import_generation < imp->imp_generation) {
                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
                        GOTO(free_req, 0);
                }

                if (req->rq_replay) {
                        DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_HA, req, "stopping search");
                        break;
                }

                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
free_req:
                if (req->rq_commit_cb != NULL)
                        req->rq_commit_cb(req);
                list_del_init(&req->rq_replay_list);
                __ptlrpc_req_finished(req, 1);
        }

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct obd_import *imp)
{
        ENTRY;
        EXIT;
        return;
}

void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "going to resend");
        req->rq_reqmsg->handle.cookie = 0;
        req->rq_status = -EAGAIN;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_resend = 1;
        req->rq_net_err = 0;
        req->rq_timedout = 0;
        if (req->rq_bulk) {
                __u64 old_xid = req->rq_xid;

                /* ensure previous bulk fails */
                req->rq_xid = ptlrpc_next_xid();
                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                       old_xid, req->rq_xid);
        }
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
        req->rq_status = -ERESTARTSYS;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_restart = 1;
        req->rq_timedout = 0;
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

static int expired_request(void *data)
{
        struct ptlrpc_request *req = data;
        ENTRY;

        /* some failure can suspend regular timeouts */
        if (ptlrpc_check_suspend())
                RETURN(1);

        RETURN(ptlrpc_expire_one_request(req));
}

static void interrupted_request(void *data)
{
        unsigned long flags;

        struct ptlrpc_request *req = data;
        DEBUG_REQ(D_HA, req, "request interrupted");
        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        ENTRY;
        atomic_inc(&req->rq_refcount);
        RETURN(req);
}

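/* Keep a replayable request on the import's replay list, which is
 * ordered by transno (with xid as a secondary key for duplicate
 * transnos).  Takes a reference that is normally dropped in
 * ptlrpc_free_committed(). */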
void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        /* clear this for new requests that were resent, as well as for
         * resent replayed requests. */
        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);

        /* don't re-add requests that have been replayed */
        if (!list_empty(&req->rq_replay_list))
                return;

        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);

        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
                 * opens, so use req->rq_xid as a secondary key.
                 * (See bugs 684, 685, and 428.)
                 * XXX no longer needed, but all opens need transnos!
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                return;
        }

        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
}

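/* Send one request synchronously: delay it while the import recovers,
 * send it, sleep until ptlrpc_check_reply() says its fate is decided,
 * and resend via the restart label when needed.  See the usage sketch
 * above ptlrpc_prep_req(). */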
1363 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1364 {
1365         char str[PTL_NALFMT_SIZE];
1366         int rc = 0;
1367         int brc;
1368         struct l_wait_info lwi;
1369         struct obd_import *imp = req->rq_import;
1370         unsigned long flags;
1371         int timeout = 0;
1372         ENTRY;
1373
1374         LASSERT(req->rq_set == NULL);
1375         LASSERT(!req->rq_receiving_reply);
1376         atomic_inc(&imp->imp_inflight);
1377
1378         if (imp->imp_connection == NULL) {
1379                 CERROR("request on not connected import %s\n",
1380                         imp->imp_obd->obd_name);
1381                 RETURN(-EINVAL);
1382         }
1383
1384         /* for distributed debugging */
1385         req->rq_reqmsg->status = current->pid;
1386         LASSERT(imp->imp_obd != NULL);
1387         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1388                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1389                imp->imp_obd->obd_uuid.uuid,
1390                req->rq_reqmsg->status, req->rq_xid,
1391                imp->imp_connection->c_peer.peer_ni->pni_name,
1392                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1393                req->rq_reqmsg->opc);
1394
1395         /* Mark phase here for a little debug help */
1396         req->rq_phase = RQ_PHASE_RPC;
1397
        spin_lock_irqsave(&imp->imp_lock, flags);
        req->rq_import_generation = imp->imp_generation;
restart:
        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                list_del(&req->rq_list);

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);

                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
                          current->comm,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                lwi = LWI_INTR(interrupted_request, req);
                rc = l_wait_event(req->rq_reply_waitq,
                                  (req->rq_send_state == imp->imp_state ||
                                   req->rq_err),
                                  &lwi);
                DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
                          current->comm,
                          ptlrpc_import_state_name(imp->imp_state),
                          ptlrpc_import_state_name(req->rq_send_state),
                          req->rq_err);

                spin_lock_irqsave(&imp->imp_lock, flags);
                list_del_init(&req->rq_list);

                if (req->rq_err) {
                        rc = -EIO;
                } else if (req->rq_intr) {
                        rc = -EINTR;
                } else if (req->rq_no_resend) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        GOTO(out, rc = -ETIMEDOUT);
                } else {
                        GOTO(restart, rc);
                }
        }

        if (rc != 0) {
                list_del_init(&req->rq_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc; /* XXX this ok? */
                GOTO(out, rc);
        }

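        /* A resent request carries MSG_RESENT so the server can spot the
         * duplicate and answer from its reply state rather than re-executing;
         * a ptlrpcs (security-layer) restart is exempted, presumably because
         * it re-sends what is logically a fresh request. */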
        if (req->rq_resend) {
                if (!req->rq_ptlrpcs_restart)
                        lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);

                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk(req);

                DEBUG_REQ(D_HA, req, "resending: ");
        }

        /* XXX this is the same as ptlrpc_set_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
                timeout = 1;
        } else {
                timeout = MAX(req->rq_timeout * HZ, 1);
                DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
        }
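        /* Sleep until the reply arrives, the request times out, or we are
         * interrupted.  A timeout on a suspended import is not fatal: the
         * request simply goes back to sleep until the suspension lifts. */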
repeat:
        lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
                               req);
        rc = l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
        if (rc == -ETIMEDOUT && ptlrpc_check_and_wait_suspend(req))
                goto repeat;
        DEBUG_REQ(D_NET, req, "-- done sleeping");

        CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
               "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid,
               req->rq_reqmsg->status, req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        spin_lock_irqsave(&imp->imp_lock, flags);
        list_del_init(&req->rq_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* If the reply was received normally, this just grabs the spinlock
         * (ensuring the reply callback has returned), sees that
         * req->rq_receiving_reply is clear and returns. */
        ptlrpc_unregister_reply(req);

        if (req->rq_err)
                GOTO(out, rc = -EIO);

        /* Resend if we need to, unless we were interrupted. */
        if (req->rq_resend && !req->rq_intr) {
                /* ...unless we were specifically told otherwise. */
                if (req->rq_no_resend)
                        GOTO(out, rc = -ETIMEDOUT);
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

        if (req->rq_intr) {
                /* Should only be interrupted if we timed out. */
                if (!req->rq_timedout)
                        DEBUG_REQ(D_ERROR, req,
                                  "rq_intr set but rq_timedout not");
                GOTO(out, rc = -EINTR);
        }

        if (req->rq_timedout) {                 /* non-recoverable timeout */
                GOTO(out, rc = -ETIMEDOUT);
        }

        if (!req->rq_replied) {
                /* How can this be? -eeb */
                DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
                LBUG();
                GOTO(out, rc = req->rq_status);
        }

        rc = after_reply(req);
        /* NB may return +ve success rc */
        if (req->rq_resend) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto restart;
        }

 out:
        if (req->rq_bulk != NULL) {
                if (rc >= 0) {
                        /* Success so far.  Note that anything going wrong
                         * with bulk now is EXTREMELY strange, since the
                         * server must have believed that the bulk
                         * transferred OK before she replied with success to
                         * me. */
                        lwi = LWI_TIMEOUT(timeout, NULL, NULL);
                        brc = l_wait_event(req->rq_reply_waitq,
                                           !ptlrpc_bulk_active(req->rq_bulk),
                                           &lwi);
                        LASSERT(brc == 0 || brc == -ETIMEDOUT);
                        if (brc != 0) {
                                LASSERT(brc == -ETIMEDOUT);
                                DEBUG_REQ(D_ERROR, req, "bulk timed out");
                                rc = brc;
                        } else if (!req->rq_bulk->bd_success) {
                                DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                                rc = -EIO;
                        }
                }
                if (rc < 0)
                        ptlrpc_unregister_bulk(req);
        }

        LASSERT(!req->rq_receiving_reply);
        req->rq_phase = RQ_PHASE_INTERPRET;

        atomic_dec(&imp->imp_inflight);
        wake_up(&imp->imp_recovery_waitq);
        RETURN(rc);
}

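/*
 * Usage sketch (illustrative only; opcode, buffer counts and sizes below are
 * placeholders, not part of this file): a typical synchronous caller pairs
 * ptlrpc_prep_req() with ptlrpc_queue_wait() and then drops its reference:
 *
 *      req = ptlrpc_prep_req(imp, version, opcode, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      req->rq_replen = lustre_msg_size(1, &repsize);
 *      rc = ptlrpc_queue_wait(req);
 *      ptlrpc_req_finished(req);
 */

/* Replay bookkeeping stashed in req->rq_async_args for the duration of a
 * replayed RPC; ptlrpc_replay_req() asserts that it fits there. */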
struct ptlrpc_replay_async_args {
        int praa_old_state;
        int praa_old_status;
};

static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
                                   void *data, int rc)
{
        struct ptlrpc_replay_async_args *aa = data;
        struct obd_import *imp = req->rq_import;
        unsigned long flags;

        atomic_dec(&imp->imp_replay_inflight);

        if (!req->rq_replied) {
                CERROR("request replay timed out, restarting recovery\n");
                GOTO(out, rc = -ETIMEDOUT);
        }

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT(req->rq_nob_received <= req->rq_repbuf_len);
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc = -EPROTO);
        }

        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR &&
            req->rq_repmsg->status == -ENOTCONN)
                GOTO(out, rc = req->rq_repmsg->status);

        /* The transno had better not change over replay. */
        LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);

        DEBUG_REQ(D_HA, req, "got rep");

        /* let the callback do fixups, possibly including in the request */
        if (req->rq_replay_cb)
                req->rq_replay_cb(req);

        if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
                DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
                          req->rq_repmsg->status, aa->praa_old_status);
        } else {
                /* Put it back for re-replay. */
                req->rq_repmsg->status = aa->praa_old_status;
        }

        spin_lock_irqsave(&imp->imp_lock, flags);
        imp->imp_last_replay_transno = req->rq_transno;
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* continue with recovery */
        rc = ptlrpc_import_recovery_state_machine(imp);
 out:
        req->rq_send_state = aa->praa_old_state;

        if (rc != 0)
                /* this replay failed, so restart recovery */
                ptlrpc_connect_import(imp, NULL);

        RETURN(rc);
}
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        struct ptlrpc_replay_async_args *aa;
        ENTRY;

        LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);

        /* Not handling automatic bulk replay yet (or ever?) */
        LASSERT(req->rq_bulk == NULL);

        DEBUG_REQ(D_HA, req, "REPLAY");

        LASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
        aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
        memset(aa, 0, sizeof(*aa));

        /* Prepare request to be resent with ptlrpcd */
        aa->praa_old_state = req->rq_send_state;
        req->rq_send_state = LUSTRE_IMP_REPLAY;
        req->rq_phase = RQ_PHASE_NEW;
        aa->praa_old_status = req->rq_repmsg->status;
        req->rq_status = 0;

        req->rq_interpret_reply = ptlrpc_replay_interpret;
        atomic_inc(&req->rq_import->imp_replay_inflight);
        ptlrpc_request_addref(req); /* ptlrpcd needs a ref */

        ptlrpcd_add_req(req);
        RETURN(0);
}

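/*
 * Expected flow (a sketch inferred from the assertions above, not a spec):
 * the import recovery state machine walks imp_replay_list and hands each
 * uncommitted request to ptlrpc_replay_req(), which queues it on ptlrpcd;
 * ptlrpc_replay_interpret() then advances the state machine once the reply
 * comes back.
 */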
void ptlrpc_abort_inflight(struct obd_import *imp)
{
        unsigned long flags;
        struct list_head *tmp, *n;
        ENTRY;

        /* Make sure that no new requests get processed for this import.
         * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
         * this flag and then putting requests on sending_list or delayed_list.
         */
        spin_lock_irqsave(&imp->imp_lock, flags);

        /* XXX locking?  Maybe we should remove each request with the list
         * locked?  Also, how do we know if the requests on the list are
         * being freed at this time?
         */
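        /* Only requests from an older import generation are failed below;
         * requests already re-issued against the current generation are
         * left alone.  Raw RPCs are failed unconditionally. */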
        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "inflight");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting waiting req");

                spin_lock(&req->rq_lock);
                if (req->rq_import_generation < imp->imp_generation) {
                        req->rq_err = 1;
                        ptlrpc_wake_client_req(req);
                }
                spin_unlock(&req->rq_lock);
        }

        list_for_each_safe(tmp, n, &imp->imp_rawrpc_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting raw rpc");

                spin_lock(&req->rq_lock);
                req->rq_err = 1;
                ptlrpc_wake_client_req(req);
                spin_unlock(&req->rq_lock);
        }

        /* Last chance to free reqs left on the replay list, but we
         * will still leak reqs that haven't committed. */
        if (imp->imp_replayable)
                ptlrpc_free_committed(imp);

        spin_unlock_irqrestore(&imp->imp_lock, flags);

        EXIT;
}

static __u64 ptlrpc_last_xid = 0;
static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;

__u64 ptlrpc_next_xid(void)
{
        __u64 tmp;
        spin_lock(&ptlrpc_last_xid_lock);
        tmp = ++ptlrpc_last_xid;
        spin_unlock(&ptlrpc_last_xid_lock);
        return tmp;
}
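/*
 * ptlrpc_prep_req() and friends stamp each new request with
 * ptlrpc_next_xid(), so every RPC issued by this node carries a unique,
 * strictly increasing transfer id.
 */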