/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC
#ifndef __KERNEL__
#include <errno.h>
#include <signal.h>
#include <liblustre.h>
#endif

#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>

#include "ptlrpc_internal.h"

void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}

struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *c;
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer, uuid);
        if (c) {
                memcpy(c->c_remote_uuid.uuid,
                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
        }

        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);

        return c;
}

void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                 struct obd_uuid *uuid)
{
        struct ptlrpc_peer peer;
        int err;

        err = ptlrpc_uuid_to_peer(uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof (peer));
        return;
}

static inline struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
        if (!desc)
                return NULL;

        spin_lock_init(&desc->bd_lock);
        init_waitqueue_head(&desc->bd_waitq);
        desc->bd_max_iov = npages;
        desc->bd_iov_count = 0;
        desc->bd_md_h = PTL_INVALID_HANDLE;
        desc->bd_portal = portal;
        desc->bd_type = type;

        return desc;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_import_generation = req->rq_import_generation;
        desc->bd_import = class_import_get(imp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = client_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* This makes req own desc, and free it when she frees herself */
        req->rq_bulk = desc;

        return desc;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_exp (struct ptlrpc_request *req,
                                               int npages, int type, int portal)
{
        struct obd_export *exp = req->rq_export;
        struct ptlrpc_bulk_desc *desc;

        LASSERT(type == BULK_PUT_SOURCE || type == BULK_GET_SINK);

        desc = new_bulk(npages, type, portal);
        if (desc == NULL)
                RETURN(NULL);

        desc->bd_export = class_export_get(exp);
        desc->bd_req = req;

        desc->bd_cbid.cbid_fn  = server_bulk_callback;
        desc->bd_cbid.cbid_arg = desc;

        /* NB we don't assign rq_bulk here; server-side requests are
         * re-used, and the handler frees the bulk desc explicitly. */

        return desc;
}

void ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc,
                           struct page *page, int pageoffset, int len)
{
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(page != NULL);
        LASSERT(pageoffset >= 0);
        LASSERT(len > 0);
        LASSERT(pageoffset + len <= PAGE_SIZE);

        desc->bd_nob += len;

        ptlrpc_add_bulk_page(desc, page, pageoffset, len);
}
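
/*
 * Illustrative usage sketch: a client-side caller that already has a
 * prepared request "req" and an array of "npages" pages would typically
 * wire up its bulk descriptor with the helpers above.  The portal and the
 * per-page layout below are placeholders chosen by the caller, not values
 * mandated here.
 *
 *      struct ptlrpc_bulk_desc *desc;
 *      int i;
 *
 *      desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, portal);
 *      if (desc == NULL)
 *              return -ENOMEM;
 *      for (i = 0; i < npages; i++)
 *              ptlrpc_prep_bulk_page(desc, pages[i], 0, PAGE_SIZE);
 *
 * req now owns desc (req->rq_bulk), so it is released along with the request.
 */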

void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        ENTRY;

        LASSERT(desc != NULL);
        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
        LASSERT(!desc->bd_network_rw);         /* network hands off or */
        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
        if (desc->bd_export)
                class_export_put(desc->bd_export);
        else
                class_import_put(desc->bd_import);

        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
                                bd_iov[desc->bd_max_iov]));
        EXIT;
}

struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                       int count, int *lengths, char **bufs)
{
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        LASSERT((unsigned long)imp > 0x1000);

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_request(request, count, lengths, bufs);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        if (imp->imp_server_timeout)
                request->rq_timeout = obd_timeout / 2;
        else
                request->rq_timeout = obd_timeout;
        request->rq_send_state = LUSTRE_IMP_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;
        request->rq_import = class_import_get(imp);
        request->rq_export = NULL;

        request->rq_req_cbid.cbid_fn  = request_out_callback;
        request->rq_req_cbid.cbid_arg = request;

        request->rq_reply_cbid.cbid_fn  = reply_in_callback;
        request->rq_reply_cbid.cbid_arg = request;

        request->rq_phase = RQ_PHASE_NEW;

        /* XXX FIXME bug 249 */
        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        spin_lock_init(&request->rq_lock);
        INIT_LIST_HEAD(&request->rq_list);
        INIT_LIST_HEAD(&request->rq_replay_list);
        INIT_LIST_HEAD(&request->rq_set_chain);
        init_waitqueue_head(&request->rq_reply_waitq);
        request->rq_xid = ptlrpc_next_xid();
        atomic_set(&request->rq_refcount, 1);

        request->rq_reqmsg->opc = opcode;
        request->rq_reqmsg->flags = 0;

        RETURN(request);
}
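
/*
 * Illustrative usage sketch: the usual life cycle of a synchronous request
 * built with ptlrpc_prep_req(), assuming a connected import "imp" and a
 * caller-chosen opcode and buffer layout (both placeholders here).
 *
 *      struct ptlrpc_request *req;
 *      int size = sizeof(struct my_body);      // hypothetical body layout
 *      int rc;
 *
 *      req = ptlrpc_prep_req(imp, MY_OPCODE, 1, &size, NULL);
 *      if (req == NULL)
 *              return -ENOMEM;
 *      // fill in the request buffer(s) and the expected reply length here
 *      rc = ptlrpc_queue_wait(req);            // send and wait, see below
 *      ptlrpc_req_finished(req);               // drop the caller's reference
 *      return rc;
 */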

struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
        struct ptlrpc_request_set *set;

        OBD_ALLOC(set, sizeof *set);
        if (!set)
                RETURN(NULL);
        INIT_LIST_HEAD(&set->set_requests);
        init_waitqueue_head(&set->set_waitq);
        set->set_remaining = 0;
        spin_lock_init(&set->set_new_req_lock);
        INIT_LIST_HEAD(&set->set_new_requests);

        RETURN(set);
}

/* Finish with this set; opposite of prep_set. */
void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
{
        struct list_head *tmp;
        struct list_head *next;
        int               expected_phase;
        int               n = 0;
        ENTRY;

        /* Requests on the set should either all be completed, or all be new */
        expected_phase = (set->set_remaining == 0) ?
                         RQ_PHASE_COMPLETE : RQ_PHASE_NEW;
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);
                n++;
        }

        LASSERT(set->set_remaining == 0 || set->set_remaining == n);

        list_for_each_safe(tmp, next, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                list_del_init(&req->rq_set_chain);

                LASSERT(req->rq_phase == expected_phase);

                if (req->rq_phase == RQ_PHASE_NEW) {

                        if (req->rq_interpret_reply != NULL) {
                                int (*interpreter)(struct ptlrpc_request *,
                                                   void *, int) =
                                        req->rq_interpret_reply;

                                /* higher level (i.e. LOV) failed;
                                 * let the sub reqs clean up */
                                req->rq_status = -EBADR;
                                interpreter(req, &req->rq_async_args,
                                            req->rq_status);
                        }
                        set->set_remaining--;
                }

                req->rq_set = NULL;
                ptlrpc_req_finished (req);
        }

        LASSERT(set->set_remaining == 0);

        OBD_FREE(set, sizeof(*set));
        EXIT;
}

void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
                        struct ptlrpc_request *req)
{
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_requests);
        req->rq_set = set;
        set->set_remaining++;
        atomic_inc(&req->rq_import->imp_inflight);
}

/* Locked so that many callers can add things; the context that owns the
 * set is supposed to notice these and move them into the set proper. */
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *set,
                            struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&set->set_new_req_lock, flags);
        /* The set takes over the caller's request reference */
        list_add_tail(&req->rq_set_chain, &set->set_new_requests);
        req->rq_set = set;
        spin_unlock_irqrestore(&set->set_new_req_lock, flags);
}
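
/*
 * Illustrative usage sketch: driving several RPCs concurrently through a
 * request set with the helpers in this file.  build_one_req() stands in for
 * whatever the caller uses to construct each request; error handling is
 * trimmed.
 *
 *      struct ptlrpc_request_set *set;
 *      int i, rc;
 *
 *      set = ptlrpc_prep_set();
 *      if (set == NULL)
 *              return -ENOMEM;
 *      for (i = 0; i < nreqs; i++)
 *              ptlrpc_set_add_req(set, build_one_req(i));
 *      rc = ptlrpc_set_wait(set);      // sends, waits and runs interpreters
 *      ptlrpc_set_destroy(set);        // drops the set's request references
 *      return rc;
 */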

/*
 * Based on the current state of the import, determine if the request
 * can be sent, is an error, or should be delayed.
 *
 * Returns true if this request should be delayed.  If it returns false and
 * *status is non-zero, then the request cannot be sent and *status is the
 * error code.  If it returns false and *status is 0, the request can be sent.
 *
 * The imp->imp_lock must be held.
 */
static int ptlrpc_import_delay_req(struct obd_import *imp,
                                   struct ptlrpc_request *req, int *status)
{
        int delay = 0;
        ENTRY;

        LASSERT (status != NULL);
        *status = 0;

        if (imp->imp_state == LUSTRE_IMP_NEW) {
                DEBUG_REQ(D_ERROR, req, "Uninitialized import.");
                *status = -EIO;
                LBUG();
        }
        else if (imp->imp_state == LUSTRE_IMP_CLOSED) {
                DEBUG_REQ(D_ERROR, req, "IMP_CLOSED ");
                *status = -EIO;
        }
        /* allow CONNECT even if import is invalid */
        else if (req->rq_send_state == LUSTRE_IMP_CONNECTING &&
                 imp->imp_state == LUSTRE_IMP_CONNECTING) {
                ;
        }
        /*
         * If the import has been invalidated (such as by an OST failure), the
         * request must fail with -EIO.
         */
        else if (imp->imp_invalid) {
                DEBUG_REQ(D_ERROR, req, "IMP_INVALID");
                *status = -EIO;
        }
        else if (req->rq_import_generation != imp->imp_generation) {
                DEBUG_REQ(D_ERROR, req, "req wrong generation:");
                *status = -EIO;
        }
        else if (req->rq_send_state != imp->imp_state) {
                if (imp->imp_obd->obd_no_recov || imp->imp_dlm_fake
                    || req->rq_no_delay)
                        *status = -EWOULDBLOCK;
                else
                        delay = 1;
        }

        RETURN(delay);
}
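
/* A typical caller of ptlrpc_import_delay_req() (see ptlrpc_send_new_req()
 * and ptlrpc_check_set() below) follows roughly this sketch:
 *
 *      spin_lock_irqsave(&imp->imp_lock, flags);
 *      if (ptlrpc_import_delay_req(imp, req, &rc)) {
 *              ... park the request on imp->imp_delayed_list ...
 *      } else if (rc != 0) {
 *              ... fail the request with rc ...
 *      } else {
 *              ... move it to imp->imp_sending_list and send it ...
 *      }
 *      spin_unlock_irqrestore(&imp->imp_lock, flags);
 */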

static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        int rc = 0;
        ENTRY;

        /* serialise with network callback */
        spin_lock_irqsave (&req->rq_lock, flags);

        if (req->rq_replied) {
                DEBUG_REQ(D_NET, req, "REPLIED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_net_err && !req->rq_timedout) {
                spin_unlock_irqrestore (&req->rq_lock, flags);
                rc = ptlrpc_expire_one_request(req);
                spin_lock_irqsave (&req->rq_lock, flags);
                GOTO(out, rc);
        }

        if (req->rq_err) {
                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_resend) {
                DEBUG_REQ(D_ERROR, req, "RESEND:");
                GOTO(out, rc = 1);
        }

        if (req->rq_restart) {
                DEBUG_REQ(D_ERROR, req, "RESTART:");
                GOTO(out, rc = 1);
        }
        EXIT;
 out:
        spin_unlock_irqrestore (&req->rq_lock, flags);
        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
        return rc;
}

static int ptlrpc_check_status(struct ptlrpc_request *req)
{
        int err;
        ENTRY;

        err = req->rq_repmsg->status;
        if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) {
                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err == %d",
                          err);
                RETURN(err < 0 ? err : -EINVAL);
        }

        if (err < 0) {
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        } else if (err > 0) {
                /* XXX: translate this error from net to host */
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        }

        RETURN(err);
}

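/* Process a newly-arrived reply: unpack and sanity-check the message (still
 * in the sender's byte order until lustre_unpack_msg() runs), convert the
 * reply status, hand -ENOTCONN/-ENODEV to the reconnect path, and record the
 * transaction number so replayable imports can retain the request for
 * recovery. */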
static int after_reply(struct ptlrpc_request *req)
{
        unsigned long flags;
        struct obd_import *imp = req->rq_import;
        int rc;
        ENTRY;

        LASSERT(!req->rq_receiving_reply);

        /* NB Until this point, the whole of the incoming message,
         * including buflens, status etc is in the sender's byte order. */

#if SWAB_PARANOIA
        /* Clear reply swab mask; this is a new reply in sender's byte order */
        req->rq_rep_swab_mask = 0;
#endif
        LASSERT (req->rq_nob_received <= req->rq_replen);
        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                RETURN(-EPROTO);
        }

        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
                CERROR("invalid packet type received (type=%u)\n",
                       req->rq_repmsg->type);
                RETURN(-EPROTO);
        }

        rc = ptlrpc_check_status(req);

        /* Either we've been evicted, or the server has failed for
         * some reason. Try to reconnect, and if that fails, punt to the
         * upcall. */
        if ((rc == -ENOTCONN) || (rc == -ENODEV)) {
                if (req->rq_send_state != LUSTRE_IMP_FULL ||
                    imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
                        RETURN(-ENOTCONN);
                }

                ptlrpc_request_handle_notconn(req);

                RETURN(rc);
        }

        /* Store transno in reqmsg for replay. */
        req->rq_reqmsg->transno = req->rq_transno = req->rq_repmsg->transno;

        if (req->rq_import->imp_replayable) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                if (req->rq_transno != 0)
                        ptlrpc_retain_replayable_request(req, imp);
                else if (req->rq_commit_cb != NULL)
                        req->rq_commit_cb(req);

                if (req->rq_transno > imp->imp_max_transno)
                        imp->imp_max_transno = req->rq_transno;

                /* Replay-enabled imports return commit-status information. */
                if (req->rq_repmsg->last_committed)
                        imp->imp_peer_committed_transno =
                                req->rq_repmsg->last_committed;
                ptlrpc_free_committed(imp);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
        }

        RETURN(rc);
}

static int ptlrpc_send_new_req(struct ptlrpc_request *req)
{
        char                   str[PTL_NALFMT_SIZE];
        struct obd_import     *imp;
        unsigned long          flags;
        int rc;
        ENTRY;

        LASSERT(req->rq_phase == RQ_PHASE_NEW);
        req->rq_phase = RQ_PHASE_RPC;

        imp = req->rq_import;
        spin_lock_irqsave(&imp->imp_lock, flags);

        req->rq_import_generation = imp->imp_generation;

        if (ptlrpc_import_delay_req(imp, req, &rc)) {
                spin_lock (&req->rq_lock);
                req->rq_waiting = 1;
                spin_unlock (&req->rq_lock);

                DEBUG_REQ(D_HA, req, "req from PID %d waiting for recovery: "
                          "(%s != %s)",
                          req->rq_reqmsg->status,
                          ptlrpc_import_state_name(req->rq_send_state),
                          ptlrpc_import_state_name(imp->imp_state));
                LASSERT(list_empty (&req->rq_list));

                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(0);
        }

        if (rc != 0) {
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                req->rq_status = rc;
                req->rq_phase = RQ_PHASE_INTERPRET;
                RETURN(rc);
        }

        /* XXX this is the same as ptlrpc_queue_wait */
        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        req->rq_reqmsg->status = current->pid;
        CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc"
               " %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
               imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
               req->rq_xid,
               imp->imp_connection->c_peer.peer_ni->pni_name,
               ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
               req->rq_reqmsg->opc);

        rc = ptl_send_rpc(req);
        if (rc) {
                DEBUG_REQ(D_HA, req, "send failed (%d); expect timeout", rc);
                req->rq_net_err = 1;
                RETURN(rc);
        }
        RETURN(0);
}

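/* Advance every request in the set through the phases
 * NEW -> RPC -> BULK -> INTERPRET -> COMPLETE, handling resends, timeouts,
 * interrupts and import recovery along the way.  Returns non-zero when the
 * whole set has completed or when the caller's timers need recalculating. */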
int ptlrpc_check_set(struct ptlrpc_request_set *set)
{
        char str[PTL_NALFMT_SIZE];
        unsigned long flags;
        struct list_head *tmp;
        int force_timer_recalc = 0;
        ENTRY;

        if (set->set_remaining == 0)
                RETURN(1);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                struct obd_import *imp = req->rq_import;
                int rc = 0;

                if (req->rq_phase == RQ_PHASE_NEW &&
                    ptlrpc_send_new_req(req)) {
                        force_timer_recalc = 1;
                }

                if (!(req->rq_phase == RQ_PHASE_RPC ||
                      req->rq_phase == RQ_PHASE_BULK ||
                      req->rq_phase == RQ_PHASE_INTERPRET ||
                      req->rq_phase == RQ_PHASE_COMPLETE)) {
                        DEBUG_REQ(D_ERROR, req, "bad phase %x", req->rq_phase);
                        LBUG();
                }

                if (req->rq_phase == RQ_PHASE_COMPLETE)
                        continue;

                if (req->rq_phase == RQ_PHASE_INTERPRET)
                        GOTO(interpret, req->rq_status);

                if (req->rq_net_err && !req->rq_timedout)
                        ptlrpc_expire_one_request(req);

                if (req->rq_err) {
                        ptlrpc_unregister_reply(req);
                        if (req->rq_status == 0)
                                req->rq_status = -EIO;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
                 * will only be set after rq_timedout, but the oig waiting
                 * path sets rq_intr irrespective of whether ptlrpcd has
                 * seen a timeout.  our policy is to only interpret
                 * interrupted rpcs after they have timed out */
                if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                        /* NB could be on delayed list */
                        ptlrpc_unregister_reply(req);
                        req->rq_status = -EINTR;
                        req->rq_phase = RQ_PHASE_INTERPRET;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        GOTO(interpret, req->rq_status);
                }

                if (req->rq_phase == RQ_PHASE_RPC) {
                        if (req->rq_timedout||req->rq_waiting||req->rq_resend) {
                                int status;

                                ptlrpc_unregister_reply(req);

                                spin_lock_irqsave(&imp->imp_lock, flags);

                                if (ptlrpc_import_delay_req(imp, req, &status)){
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        continue;
                                }

                                list_del_init(&req->rq_list);
                                if (status != 0)  {
                                        req->rq_status = status;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                if (req->rq_no_resend) {
                                        req->rq_status = -ENOTCONN;
                                        req->rq_phase = RQ_PHASE_INTERPRET;
                                        spin_unlock_irqrestore(&imp->imp_lock,
                                                               flags);
                                        GOTO(interpret, req->rq_status);
                                }
                                list_add_tail(&req->rq_list,
                                              &imp->imp_sending_list);

                                spin_unlock_irqrestore(&imp->imp_lock, flags);

                                req->rq_waiting = 0;
                                if (req->rq_resend) {
                                        lustre_msg_add_flags(req->rq_reqmsg,
                                                             MSG_RESENT);
                                        if (req->rq_bulk) {
                                                __u64 old_xid = req->rq_xid;

                                                ptlrpc_unregister_bulk (req);

                                                /* ensure previous bulk fails */
                                                req->rq_xid = ptlrpc_next_xid();
                                                CDEBUG(D_HA, "resend bulk "
                                                       "old x"LPU64
                                                       " new x"LPU64"\n",
                                                       old_xid, req->rq_xid);
                                        }
                                }

                                rc = ptl_send_rpc(req);
                                if (rc) {
                                        DEBUG_REQ(D_HA, req, "send failed (%d)",
                                                  rc);
                                        force_timer_recalc = 1;
                                        req->rq_net_err = 1;
                                }
                                /* need to reset the timeout */
                                force_timer_recalc = 1;
                        }

                        /* Still waiting for a reply? */
                        if (ptlrpc_client_receiving_reply(req))
                                continue;

                        /* Did we actually receive a reply? */
                        if (!ptlrpc_client_replied(req))
                                continue;

                        spin_lock_irqsave(&imp->imp_lock, flags);
                        list_del_init(&req->rq_list);
                        spin_unlock_irqrestore(&imp->imp_lock, flags);

                        req->rq_status = after_reply(req);
                        if (req->rq_resend) {
                                /* Add this req to the delayed list so
                                   it can be errored if the import is
                                   evicted after recovery. */
                                spin_lock_irqsave (&req->rq_lock, flags);
                                list_add_tail(&req->rq_list,
                                              &imp->imp_delayed_list);
                                spin_unlock_irqrestore(&req->rq_lock, flags);
                                continue;
                        }

                        /* If there is no bulk associated with this request,
                         * then we're done and should let the interpreter
                         * process the reply.  Similarly if the RPC returned
                         * an error, and therefore the bulk will never arrive.
                         */
                        if (req->rq_bulk == NULL || req->rq_status != 0) {
                                req->rq_phase = RQ_PHASE_INTERPRET;
                                GOTO(interpret, req->rq_status);
                        }

                        req->rq_phase = RQ_PHASE_BULK;
                }

                LASSERT(req->rq_phase == RQ_PHASE_BULK);
                if (ptlrpc_bulk_active(req->rq_bulk))
                        continue;

                if (!req->rq_bulk->bd_success) {
                        /* The RPC reply arrived OK, but the bulk screwed
                         * up!  Dead weird since the server told us the RPC
                         * was good after getting the REPLY for her GET or
                         * the ACK for her PUT. */
                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                        LBUG();
                }

                req->rq_phase = RQ_PHASE_INTERPRET;

        interpret:
                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
                LASSERT(!req->rq_receiving_reply);

                ptlrpc_unregister_reply(req);
                if (req->rq_bulk != NULL)
                        ptlrpc_unregister_bulk (req);

                req->rq_phase = RQ_PHASE_COMPLETE;

                if (req->rq_interpret_reply != NULL) {
                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
                                req->rq_interpret_reply;
                        req->rq_status = interpreter(req, &req->rq_async_args,
                                                     req->rq_status);
                }

                CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:"
                       "opc %s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
                       imp->imp_obd->obd_uuid.uuid, req->rq_reqmsg->status,
                       req->rq_xid,
                       imp->imp_connection->c_peer.peer_ni->pni_name,
                       ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
                       req->rq_reqmsg->opc);

                set->set_remaining--;

                atomic_dec(&imp->imp_inflight);
                wake_up(&imp->imp_recovery_waitq);
        }

        /* If we hit an error, we want to recover promptly. */
        RETURN(set->set_remaining == 0 || force_timer_recalc);
}

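/* Mark one request as timed out and unregister its reply and bulk buffers.
 * Returns 1 when the request has been dealt with locally (no import, a DLM
 * "fake" import, or the error was pushed straight into the request); returns
 * 0 after kicking off import recovery via ptlrpc_fail_import(). */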
int ptlrpc_expire_one_request(struct ptlrpc_request *req)
{
        unsigned long      flags;
        struct obd_import *imp = req->rq_import;
        ENTRY;

        DEBUG_REQ(D_ERROR, req, "timeout (sent at %lu, %lus ago)",
                  (long)req->rq_sent, CURRENT_SECONDS - req->rq_sent);

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_timedout = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);

        ptlrpc_unregister_reply (req);

        if (obd_dump_on_timeout)
                portals_debug_dumplog();

        if (req->rq_bulk != NULL)
                ptlrpc_unregister_bulk (req);

        if (imp == NULL) {
                DEBUG_REQ(D_HA, req, "NULL import: already cleaned up?");
                RETURN(1);
        }

        /* The DLM server doesn't want recovery run on its imports. */
        if (imp->imp_dlm_fake)
                RETURN(1);

        /* If this request is for recovery or other primordial tasks,
         * then error it out here. */
        if (req->rq_send_state != LUSTRE_IMP_FULL ||
            imp->imp_obd->obd_no_recov) {
                spin_lock_irqsave (&req->rq_lock, flags);
                req->rq_status = -ETIMEDOUT;
                req->rq_err = 1;
                spin_unlock_irqrestore (&req->rq_lock, flags);
                RETURN(1);
        }

        ptlrpc_fail_import(imp, req->rq_import_generation);

        RETURN(0);
}

int ptlrpc_expired_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head          *tmp;
        time_t                     now = CURRENT_SECONDS;
        ENTRY;

        LASSERT(set != NULL);

        /* A timeout expired; see which reqs it applies to... */
        list_for_each (tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting
                       && !req->rq_resend) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout ||           /* already dealt with */
                    req->rq_sent + req->rq_timeout > now) /* not expired */
                        continue;

                /* deal with this guy */
                ptlrpc_expire_one_request (req);
        }

        /* When waiting for a whole set, we always want to break out of the
         * sleep so we can recalculate the timeout, or enable interrupts
         * iff everyone's timed out.
         */
        RETURN(1);
}

void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
{
        unsigned long flags;
        spin_lock_irqsave(&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore(&req->rq_lock, flags);
}

void ptlrpc_interrupted_set(void *data)
{
        struct ptlrpc_request_set *set = data;
        struct list_head *tmp;

        LASSERT(set != NULL);
        CERROR("INTERRUPTED SET %p\n", set);

        list_for_each(tmp, &set->set_requests) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                if (req->rq_phase != RQ_PHASE_RPC)
                        continue;

                ptlrpc_mark_interrupted(req);
        }
}

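/* Work out how long the caller may sleep before the next in-flight request
 * in the set hits its deadline: 0 if nothing is in flight, 1 if something
 * has already expired, otherwise the number of seconds until the earliest
 * deadline. */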
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        time_t                 now = CURRENT_SECONDS;
        time_t                 deadline;
        int                    timeout = 0;
        struct ptlrpc_request *req;
        ENTRY;

        SIGNAL_MASK_ASSERT(); /* XXX BUG 1511 */

        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                /* request in-flight? */
                if (!((req->rq_phase == RQ_PHASE_RPC && !req->rq_waiting) ||
                      (req->rq_phase == RQ_PHASE_BULK)))
                        continue;

                if (req->rq_timedout)   /* already timed out */
                        continue;

                deadline = req->rq_sent + req->rq_timeout;
                if (deadline <= now)    /* actually expired already */
                        timeout = 1;    /* ASAP */
                else if (timeout == 0 || timeout > deadline - now)
                        timeout = deadline - now;
        }
        RETURN(timeout);
}

int ptlrpc_set_wait(struct ptlrpc_request_set *set)
{
        struct list_head      *tmp;
        struct ptlrpc_request *req;
        struct l_wait_info     lwi;
        int                    rc, timeout;
        ENTRY;

        LASSERT(!list_empty(&set->set_requests));
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
                if (req->rq_phase == RQ_PHASE_NEW)
                        (void)ptlrpc_send_new_req(req);
        }

        do {
                timeout = ptlrpc_set_next_timeout(set);

                /* wait until all complete, interrupted, or an in-flight
                 * req times out */
                CDEBUG(D_HA, "set %p going to sleep for %d seconds\n",
                       set, timeout);
                lwi = LWI_TIMEOUT_INTR((timeout ? timeout : 1) * HZ,
                                       ptlrpc_expired_set,
                                       ptlrpc_interrupted_set, set);
                rc = l_wait_event(set->set_waitq, ptlrpc_check_set(set), &lwi);

                LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);

                /* -EINTR => all requests have been flagged rq_intr so next
                 * check completes.
                 * -ETIMEDOUT => someone timed out.  When all reqs have
                 * timed out, signals are enabled allowing completion with
                 * EINTR.
                 * I don't really care if we go once more round the loop in
                 * the error cases -eeb. */
        } while (rc != 0 || set->set_remaining != 0);

        LASSERT(set->set_remaining == 0);

        rc = 0;
        list_for_each(tmp, &set->set_requests) {
                req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);

                LASSERT(req->rq_phase == RQ_PHASE_COMPLETE);
                if (req->rq_status != 0)
                        rc = req->rq_status;
        }

        if (set->set_interpret != NULL) {
                int (*interpreter)(struct ptlrpc_request_set *set,void *,int) =
                        set->set_interpret;
                rc = interpreter (set, set->set_arg, rc);
        }

        RETURN(rc);
}

static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        LASSERTF(!request->rq_receiving_reply, "req %p\n", request);
        LASSERTF(request->rq_rqbd == NULL, "req %p\n",request);/* client-side */
        LASSERTF(list_empty(&request->rq_list), "req %p\n", request);
        LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);

        /* We must take it off the imp_replay_list first.  Otherwise, we'll set
         * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
        if (request->rq_import != NULL) {
                unsigned long flags = 0;
                if (!locked)
                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
                list_del_init(&request->rq_replay_list);
                if (!locked)
                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                               flags);
        }
        LASSERTF(list_empty(&request->rq_replay_list), "req %p\n", request);

        if (atomic_read(&request->rq_refcount) != 0) {
                DEBUG_REQ(D_ERROR, request,
                          "freeing request with nonzero refcount");
                LBUG();
        }

        if (request->rq_repmsg != NULL) {
                OBD_FREE(request->rq_repmsg, request->rq_replen);
                request->rq_repmsg = NULL;
        }
        if (request->rq_reqmsg != NULL) {
                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
                request->rq_reqmsg = NULL;
        }
        if (request->rq_export != NULL) {
                class_export_put(request->rq_export);
                request->rq_export = NULL;
        }
        if (request->rq_import != NULL) {
                class_import_put(request->rq_import);
                request->rq_import = NULL;
        }
        if (request->rq_bulk != NULL)
                ptlrpc_free_bulk(request->rq_bulk);

        OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        __ptlrpc_free_req(request, 0);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
{
        LASSERT_SPIN_LOCKED(&request->rq_import->imp_lock);
        (void)__ptlrpc_req_finished(request, 1);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL)
                RETURN(1);

        if (request == LP_POISON ||
            request->rq_reqmsg == LP_POISON) {
                CERROR("dereferencing freed request (bug 575)\n");
                LBUG();
                RETURN(1);
        }

        DEBUG_REQ(D_INFO, request, "refcount now %u",
                  atomic_read(&request->rq_refcount) - 1);

        if (atomic_dec_and_test(&request->rq_refcount)) {
                __ptlrpc_free_req(request, locked);
                RETURN(1);
        }

        RETURN(0);
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}

__u64 ptlrpc_req_xid(struct ptlrpc_request *request)
{
        return request->rq_xid;
}
EXPORT_SYMBOL(ptlrpc_req_xid);

/* Disengage the client's reply buffer from the network
 * NB does _NOT_ unregister any client-side bulk.
 * IDEMPOTENT, but _not_ safe against concurrent callers.
 * The request owner (i.e. the thread doing the I/O) must call...
 */
void ptlrpc_unregister_reply (struct ptlrpc_request *request)
{
        int                rc;
        wait_queue_head_t *wq;
        struct l_wait_info lwi;

        LASSERT(!in_interrupt ());             /* might sleep */

        if (!ptlrpc_client_receiving_reply(request))
                return;

        PtlMDUnlink (request->rq_reply_md_h);

        /* We have to l_wait_event() whatever the result, to give liblustre
         * a chance to run reply_in_callback() */

        if (request->rq_set != NULL)
                wq = &request->rq_set->set_waitq;
        else
                wq = &request->rq_reply_waitq;

        for (;;) {
                /* Network access will complete in finite time but the HUGE
                 * timeout lets us CWARN for visibility of sluggish NALs */
                lwi = LWI_TIMEOUT(300 * HZ, NULL, NULL);
                rc = l_wait_event (*wq, !ptlrpc_client_receiving_reply(request), &lwi);
                if (rc == 0)
                        return;

                LASSERT (rc == -ETIMEDOUT);
                DEBUG_REQ(D_WARNING, request, "Unexpectedly long timeout");
        }
}

/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        struct ptlrpc_request *last_req = NULL; /* temporary fire escape */
        ENTRY;

        LASSERT(imp != NULL);

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        CDEBUG(D_HA, "%s: committing for last_committed "LPU64"\n",
               imp->imp_obd->obd_name, imp->imp_peer_committed_transno);

        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_replay_list);

                /* XXX ok to remove when 1357 resolved - rread 05/29/03  */
                LASSERT(req != last_req);
                last_req = req;

                if (req->rq_import_generation < imp->imp_generation) {
                        DEBUG_REQ(D_HA, req, "freeing request with old gen");
                        GOTO(free_req, 0);
                }

                if (req->rq_replay) {
                        DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_HA, req, "stopping search");
                        break;
                }

                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
free_req:
                if (req->rq_commit_cb != NULL)
                        req->rq_commit_cb(req);
                list_del_init(&req->rq_replay_list);
                __ptlrpc_req_finished(req, 1);
        }

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct obd_import *imp)
{
        ENTRY;
        EXIT;
        return;
}

void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "going to resend");
        req->rq_reqmsg->handle.cookie = 0;
        req->rq_status = -EAGAIN;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_resend = 1;
        req->rq_net_err = 0;
        req->rq_timedout = 0;
        if (req->rq_bulk) {
                __u64 old_xid = req->rq_xid;

                /* ensure previous bulk fails */
                req->rq_xid = ptlrpc_next_xid();
                CDEBUG(D_HA, "resend bulk old x"LPU64" new x"LPU64"\n",
                       old_xid, req->rq_xid);
        }
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

/* XXX: this function and rq_status are currently unused */
void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        unsigned long flags;

        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
        req->rq_status = -ERESTARTSYS;

        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_restart = 1;
        req->rq_timedout = 0;
        ptlrpc_wake_client_req(req);
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

static int expired_request(void *data)
{
        struct ptlrpc_request *req = data;
        ENTRY;

        RETURN(ptlrpc_expire_one_request(req));
}

static void interrupted_request(void *data)
{
        unsigned long flags;

        struct ptlrpc_request *req = data;
        DEBUG_REQ(D_HA, req, "request interrupted");
        spin_lock_irqsave (&req->rq_lock, flags);
        req->rq_intr = 1;
        spin_unlock_irqrestore (&req->rq_lock, flags);
}

struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        ENTRY;
        atomic_inc(&req->rq_refcount);
        RETURN(req);
}

void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

        LASSERT_SPIN_LOCKED(&imp->imp_lock);

        /* clear this for new requests that were resent as well
           as resent replayed requests. */
        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);

        /* don't re-add requests that have been replayed */
        if (!list_empty(&req->rq_replay_list))
                return;

        lustre_msg_add_flags(req->rq_reqmsg, MSG_REPLAY);

        LASSERT(imp->imp_replayable);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_replay_list);
                /* We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
                 * opens, so use req->rq_xid as a secondary key.
                 * (See bugs 684, 685, and 428.)
                 * XXX no longer needed, but all opens need transnos!
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_replay_list, &iter->rq_replay_list);
                return;
        }

        list_add_tail(&req->rq_replay_list, &imp->imp_replay_list);
}

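/* Synchronous send path for a single request (no request set): wait for
 * import recovery if the import isn't ready, send the request, sleep until
 * the reply arrives or the request times out or is interrupted, resend when
 * asked to, and finally wait for any associated bulk transfer before
 * returning the request status. */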
1315 int ptlrpc_queue_wait(struct ptlrpc_request *req)
1316 {
1317         char str[PTL_NALFMT_SIZE];
1318         int rc = 0;
1319         int brc;
1320         struct l_wait_info lwi;
1321         struct obd_import *imp = req->rq_import;
1322         unsigned long flags;
1323         int timeout = 0;
1324         ENTRY;
1325
1326         LASSERT(req->rq_set == NULL);
1327         LASSERT(!req->rq_receiving_reply);
1328         atomic_inc(&imp->imp_inflight);
1329
1330         /* for distributed debugging */
1331         req->rq_reqmsg->status = current->pid;
1332         LASSERT(imp->imp_obd != NULL);
1333         CDEBUG(D_RPCTRACE, "Sending RPC pname:cluuid:pid:xid:ni:nid:opc "
1334                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1335                imp->imp_obd->obd_uuid.uuid,
1336                req->rq_reqmsg->status, req->rq_xid,
1337                imp->imp_connection->c_peer.peer_ni->pni_name,
1338                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1339                req->rq_reqmsg->opc);
1340
1341         /* Mark phase here for a little debug help */
1342         req->rq_phase = RQ_PHASE_RPC;
1343
1344         spin_lock_irqsave(&imp->imp_lock, flags);
1345         req->rq_import_generation = imp->imp_generation;
1346 restart:
1347         if (ptlrpc_import_delay_req(imp, req, &rc)) {
1348                 list_del(&req->rq_list);
1349
1350                 list_add_tail(&req->rq_list, &imp->imp_delayed_list);
1351                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1352
1353                 DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%s != %s)",
1354                           current->comm, 
1355                           ptlrpc_import_state_name(req->rq_send_state), 
1356                           ptlrpc_import_state_name(imp->imp_state));
1357                 lwi = LWI_INTR(interrupted_request, req);
1358                 rc = l_wait_event(req->rq_reply_waitq,
1359                                   (req->rq_send_state == imp->imp_state ||
1360                                    req->rq_err),
1361                                   &lwi);
1362                 DEBUG_REQ(D_HA, req, "\"%s\" awake: (%s == %s or %d == 1)",
1363                           current->comm, 
1364                           ptlrpc_import_state_name(imp->imp_state), 
1365                           ptlrpc_import_state_name(req->rq_send_state),
1366                           req->rq_err);
1367
1368                 spin_lock_irqsave(&imp->imp_lock, flags);
1369                 list_del_init(&req->rq_list);
1370
1371                 if (req->rq_err) {
1372                         rc = -EIO;
1373                 }
1374                 else if (req->rq_intr) {
1375                         rc = -EINTR;
1376                 }
1377                 else if (req->rq_no_resend) {
1378                         spin_unlock_irqrestore(&imp->imp_lock, flags);
1379                         GOTO(out, rc = -ETIMEDOUT);
1380                 }
1381                 else {
1382                         GOTO(restart, rc);
1383                 }
1384         }
1385
1386         if (rc != 0) {
1387                 list_del_init(&req->rq_list);
1388                 spin_unlock_irqrestore(&imp->imp_lock, flags);
1389                 req->rq_status = rc; // XXX this ok?
1390                 GOTO(out, rc);
1391         }
1392
1393         if (req->rq_resend) {
1394                 lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
1395
1396                 if (req->rq_bulk != NULL)
1397                         ptlrpc_unregister_bulk (req);
1398
1399                 DEBUG_REQ(D_HA, req, "resending: ");
1400         }
1401
1402         /* XXX this is the same as ptlrpc_set_wait */
1403         LASSERT(list_empty(&req->rq_list));
1404         list_add_tail(&req->rq_list, &imp->imp_sending_list);
1405         spin_unlock_irqrestore(&imp->imp_lock, flags);
1406
1407         rc = ptl_send_rpc(req);
1408         if (rc) {
1409                 DEBUG_REQ(D_HA, req, "send failed (%d); recovering", rc);
1410                 timeout = 1;
1411         } else {
1412                 timeout = MAX(req->rq_timeout * HZ, 1);
1413                 DEBUG_REQ(D_NET, req, "-- sleeping for %d jiffies", timeout);
1414         }
1415         lwi = LWI_TIMEOUT_INTR(timeout, expired_request, interrupted_request,
1416                                req);
1417         l_wait_event(req->rq_reply_waitq, ptlrpc_check_reply(req), &lwi);
1418         DEBUG_REQ(D_NET, req, "-- done sleeping");
1419
1420         CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:ni:nid:opc "
1421                "%s:%s:%d:"LPU64":%s:%s:%d\n", current->comm,
1422                imp->imp_obd->obd_uuid.uuid,
1423                req->rq_reqmsg->status, req->rq_xid,
1424                imp->imp_connection->c_peer.peer_ni->pni_name,
1425                ptlrpc_peernid2str(&imp->imp_connection->c_peer, str),
1426                req->rq_reqmsg->opc);
1427
1428         spin_lock_irqsave(&imp->imp_lock, flags);
1429         list_del_init(&req->rq_list);
1430         spin_unlock_irqrestore(&imp->imp_lock, flags);
1431
1432         /* If the reply was received normally, this just grabs the spinlock
1433          * (ensuring the reply callback has returned), sees that
1434          * req->rq_receiving_reply is clear and returns. */
1435         ptlrpc_unregister_reply (req);
1436
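             /* The reply (if any) is in; decide the outcome.  A hard error
              * wins, then a needed resend (unless we were interrupted or
              * resends are forbidden), then interruption, then timeout,
              * before the reply itself is examined. */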
1437         if (req->rq_err)
1438                 GOTO(out, rc = -EIO);
1439
1440         /* Resend if we need to, unless we were interrupted. */
1441         if (req->rq_resend && !req->rq_intr) {
1442                 /* ...unless we were specifically told otherwise. */
1443                 if (req->rq_no_resend)
1444                         GOTO(out, rc = -ETIMEDOUT);
1445                 spin_lock_irqsave(&imp->imp_lock, flags);
1446                 goto restart;
1447         }
1448
1449         if (req->rq_intr) {
1450                 /* Should only be interrupted if we timed out. */
1451                 if (!req->rq_timedout)
1452                         DEBUG_REQ(D_ERROR, req,
1453                                   "rq_intr set but rq_timedout not");
1454                 GOTO(out, rc = -EINTR);
1455         }
1456
1457         if (req->rq_timedout) {                 /* non-recoverable timeout */
1458                 GOTO(out, rc = -ETIMEDOUT);
1459         }
1460
1461         if (!req->rq_replied) {
1462                 /* How can this be? -eeb */
1463                 DEBUG_REQ(D_ERROR, req, "!rq_replied: ");
1464                 LBUG();
1465                 GOTO(out, rc = req->rq_status);
1466         }
1467
1468         rc = after_reply (req);
1469         /* NB may return +ve success rc */
1470         if (req->rq_resend) {
1471                 spin_lock_irqsave(&imp->imp_lock, flags);
1472                 goto restart;
1473         }
1474
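             /* Common exit path: if a bulk descriptor is attached and the RPC
              * itself succeeded, give the bulk transfer up to the same timeout
              * to complete; on any failure the bulk is unregistered before the
              * request moves to RQ_PHASE_INTERPRET. */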
1475  out:
1476         if (req->rq_bulk != NULL) {
1477                 if (rc >= 0) {                  
1478                         /* success so far.  Note that anything going wrong
1479                          * with bulk now is EXTREMELY strange, since the
1480                          * server must have believed that the bulk
1481                          * transferred OK before she replied with success to
1482                          * me. */
1483                         lwi = LWI_TIMEOUT(timeout, NULL, NULL);
1484                         brc = l_wait_event(req->rq_reply_waitq,
1485                                            !ptlrpc_bulk_active(req->rq_bulk),
1486                                            &lwi);
1487                         LASSERT(brc == 0 || brc == -ETIMEDOUT);
1488                         if (brc != 0) {
1489                                 LASSERT(brc == -ETIMEDOUT);
1490                                 DEBUG_REQ(D_ERROR, req, "bulk timed out");
1491                                 rc = brc;
1492                         } else if (!req->rq_bulk->bd_success) {
1493                                 DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
1494                                 rc = -EIO;
1495                         }
1496                 }
1497                 if (rc < 0)
1498                         ptlrpc_unregister_bulk (req);
1499         }
1500
1501         LASSERT(!req->rq_receiving_reply);
1502         req->rq_phase = RQ_PHASE_INTERPRET;
1503
1504         atomic_dec(&imp->imp_inflight);
1505         wake_up(&imp->imp_recovery_waitq);
1506         RETURN(rc);
1507 }
1508
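     /* State stashed in rq_async_args across a replay handled by ptlrpcd: the
      * send state the request was originally issued under, and the reply
      * status from its first execution so the new reply can be checked (and
      * reset) against it. */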
1509 struct ptlrpc_replay_async_args {
1510         int praa_old_state;
1511         int praa_old_status;
1512 };
1513
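     /* Reply callback for a replayed request, run in ptlrpcd context: unpack
      * the new reply, verify the transno did not change over replay, let the
      * caller's rq_replay_cb do any fixups, record the last replayed transno
      * on the import and kick the recovery state machine.  Any failure here
      * restarts recovery by reconnecting the import. */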
1514 static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
1515                                     void * data, int rc)
1516 {
1517         struct ptlrpc_replay_async_args *aa = data;
1518         struct obd_import *imp = req->rq_import;
1519         unsigned long flags;
1520
1521         atomic_dec(&imp->imp_replay_inflight);
1522         
1523         if (!req->rq_replied) {
1524                 CERROR("request replay timed out, restarting recovery\n");
1525                 GOTO(out, rc = -ETIMEDOUT);
1526         }
1527
1528 #if SWAB_PARANOIA
1529         /* Clear reply swab mask; this is a new reply in sender's byte order */
1530         req->rq_rep_swab_mask = 0;
1531 #endif
1532         LASSERT (req->rq_nob_received <= req->rq_replen);
1533         rc = lustre_unpack_msg(req->rq_repmsg, req->rq_nob_received);
1534         if (rc) {
1535                 CERROR("unpack_rep failed: %d\n", rc);
1536                 GOTO(out, rc = -EPROTO);
1537         }
1538
1539         if (req->rq_repmsg->type == PTL_RPC_MSG_ERR && 
1540             req->rq_repmsg->status == -ENOTCONN) 
1541                 GOTO(out, rc = req->rq_repmsg->status);
1542
1543         /* The transno had better not change over replay. */
1544         LASSERT(req->rq_reqmsg->transno == req->rq_repmsg->transno);
1545
1546         DEBUG_REQ(D_HA, req, "got rep");
1547
1548         /* let the callback do fixups, which may modify the request itself */
1549         if (req->rq_replay_cb)
1550                 req->rq_replay_cb(req);
1551
1552         if (req->rq_replied && req->rq_repmsg->status != aa->praa_old_status) {
1553                 DEBUG_REQ(D_ERROR, req, "status %d, old was %d",
1554                           req->rq_repmsg->status, aa->praa_old_status);
1555         } else {
1556                 /* Put it back for re-replay. */
1557                 req->rq_repmsg->status = aa->praa_old_status;
1558         }
1559
1560         spin_lock_irqsave(&imp->imp_lock, flags);
1561         imp->imp_last_replay_transno = req->rq_transno;
1562         spin_unlock_irqrestore(&imp->imp_lock, flags);
1563
1564         /* continue with recovery */
1565         rc = ptlrpc_import_recovery_state_machine(imp);
1566  out:
1567         req->rq_send_state = aa->praa_old_state;
1568         
1569         if (rc != 0)
1570                 /* this replay failed, so restart recovery */
1571                 ptlrpc_connect_import(imp, NULL);
1572
1573         RETURN(rc);
1574 }
1575
1576
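     /* Queue a request for replay during import recovery.  The request is
      * handed to ptlrpcd with ptlrpc_replay_interpret() as its interpret
      * callback; the old send state and reply status are saved in
      * rq_async_args so they can be restored when the replay reply arrives. */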
1577 int ptlrpc_replay_req(struct ptlrpc_request *req)
1578 {
1579         struct ptlrpc_replay_async_args *aa;
1580         ENTRY;
1581
1582         LASSERT(req->rq_import->imp_state == LUSTRE_IMP_REPLAY);
1583
1584         /* Not handling automatic bulk replay yet (or ever?) */
1585         LASSERT(req->rq_bulk == NULL);
1586
1587         DEBUG_REQ(D_HA, req, "REPLAY");
1588
1589         LASSERT (sizeof (*aa) <= sizeof (req->rq_async_args));
1590         aa = (struct ptlrpc_replay_async_args *)&req->rq_async_args;
1591         memset(aa, 0, sizeof *aa);
1592
1593         /* Prepare request to be resent with ptlrpcd */
1594         aa->praa_old_state = req->rq_send_state;
1595         req->rq_send_state = LUSTRE_IMP_REPLAY;
1596         req->rq_phase = RQ_PHASE_NEW;
1597         aa->praa_old_status = req->rq_repmsg->status;
1598         req->rq_status = 0;
1599
1600         req->rq_interpret_reply = ptlrpc_replay_interpret;
1601         atomic_inc(&req->rq_import->imp_replay_inflight);
1602         ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
1603
1604         ptlrpcd_add_req(req);
1605         RETURN(0);
1606 }
1607
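     /* Fail all requests still in flight on this import: any request on the
      * sending or delayed list whose import generation is older than the
      * current one gets rq_err set and its waiter woken.  imp_lock is held
      * across the whole walk so no new requests can be queued meanwhile. */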
1608 void ptlrpc_abort_inflight(struct obd_import *imp)
1609 {
1610         unsigned long flags;
1611         struct list_head *tmp, *n;
1612         ENTRY;
1613
1614         /* Make sure that no new requests get processed for this import.
1615          * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
1616          * this flag and then putting requests on sending_list or delayed_list.
1617          */
1618         spin_lock_irqsave(&imp->imp_lock, flags);
1619
1620         /* XXX locking?  Maybe we should remove each request with the list
1621          * locked?  Also, how do we know if the requests on the list are
1622          * being freed at this time?
1623          */
1624         list_for_each_safe(tmp, n, &imp->imp_sending_list) {
1625                 struct ptlrpc_request *req =
1626                         list_entry(tmp, struct ptlrpc_request, rq_list);
1627
1628                 DEBUG_REQ(D_HA, req, "inflight");
1629
1630                 spin_lock (&req->rq_lock);
1631                 if (req->rq_import_generation < imp->imp_generation) {
1632                         req->rq_err = 1;
1633                         ptlrpc_wake_client_req(req);
1634                 }
1635                 spin_unlock (&req->rq_lock);
1636         }
1637
1638         list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
1639                 struct ptlrpc_request *req =
1640                         list_entry(tmp, struct ptlrpc_request, rq_list);
1641
1642                 DEBUG_REQ(D_HA, req, "aborting waiting req");
1643
1644                 spin_lock (&req->rq_lock);
1645                 if (req->rq_import_generation < imp->imp_generation) {
1646                         req->rq_err = 1;
1647                         ptlrpc_wake_client_req(req);
1648                 }
1649                 spin_unlock (&req->rq_lock);
1650         }
1651
1652         /* Last chance to free reqs left on the replay list, but we
1653          * will still leak reqs that haven't committed.  */
1654         if (imp->imp_replayable)
1655                 ptlrpc_free_committed(imp);
1656
1657         spin_unlock_irqrestore(&imp->imp_lock, flags);
1658
1659         EXIT;
1660 }
1661
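     /* XID allocation: a single 64-bit counter shared by all clients on this
      * node, protected by its own spinlock so concurrent senders always draw
      * unique, monotonically increasing transfer IDs. */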
1662 static __u64 ptlrpc_last_xid = 0;
1663 static spinlock_t ptlrpc_last_xid_lock = SPIN_LOCK_UNLOCKED;
1664
1665 __u64 ptlrpc_next_xid(void)
1666 {
1667         __u64 tmp;
1668         spin_lock(&ptlrpc_last_xid_lock);
1669         tmp = ++ptlrpc_last_xid;
1670         spin_unlock(&ptlrpc_last_xid_lock);
1671         return tmp;
1672 }
1673
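     /* Peek at the XID the next ptlrpc_next_xid() call would return without
      * consuming it; the value is only a hint, since another thread may claim
      * it first. */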
1674 __u64 ptlrpc_sample_next_xid(void)
1675 {
1676         __u64 tmp;
1677         spin_lock(&ptlrpc_last_xid_lock);
1678         tmp = ptlrpc_last_xid + 1;
1679         spin_unlock(&ptlrpc_last_xid_lock);
1680         return tmp;
1681 }
1682 EXPORT_SYMBOL(ptlrpc_sample_next_xid);