/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#define DEBUG_SUBSYSTEM S_RPC

#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_lib.h>
#include <linux/lustre_ha.h>
#include <linux/lustre_import.h>

void ptlrpc_init_client(int req_portal, int rep_portal, char *name,
                        struct ptlrpc_client *cl)
{
        cl->cli_request_portal = req_portal;
        cl->cli_reply_portal   = rep_portal;
        cl->cli_name           = name;
}

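/* A minimal usage sketch, kept out of the build with #if 0: how a subsystem
 * might register its client with ptlrpc_init_client().  The portal constants
 * and example_setup() are illustrative assumptions, not part of this file. */
#if 0
static struct ptlrpc_client example_client;

static void example_setup(void)
{
        /* The portal numbers come from the wire-protocol definition;
         * OST_REQUEST_PORTAL/OSC_REPLY_PORTAL stand in for whichever
         * portals the caller's service actually uses. */
        ptlrpc_init_client(OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
                           "example-client", &example_client);
}
#endif
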
struct obd_uuid *ptlrpc_req_to_uuid(struct ptlrpc_request *req)
{
        return &req->rq_connection->c_remote_uuid;
}

struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *c;
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid->uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return NULL;
        }

        c = ptlrpc_get_connection(&peer, uuid);
        if (c) {
                memcpy(c->c_remote_uuid.uuid,
                       uuid->uuid, sizeof(c->c_remote_uuid.uuid));
                c->c_epoch++;
        }

        CDEBUG(D_INFO, "%s -> %p\n", uuid->uuid, c);

        return c;
}

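/* Usage sketch (#if 0, assumptions flagged): resolving a peer UUID to a
 * connection and balancing the reference that ptlrpc_get_connection() took. */
#if 0
static void example_lookup(struct obd_uuid *uuid)
{
        struct ptlrpc_connection *conn;

        conn = ptlrpc_uuid_to_connection(uuid);
        if (conn == NULL)
                return;                 /* no peer configured for this uuid */

        /* ... use conn for RPCs or bulk transfers ... */

        ptlrpc_put_connection(conn);    /* drop the lookup reference */
}
#endif
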
void ptlrpc_readdress_connection(struct ptlrpc_connection *conn,
                                 struct obd_uuid *uuid)
{
        struct lustre_peer peer;
        int err;

        err = kportal_uuid_to_peer(uuid->uuid, &peer);
        if (err != 0) {
                CERROR("cannot find peer %s!\n", uuid->uuid);
                return;
        }

        memcpy(&conn->c_peer, &peer, sizeof(peer));
        return;
}

struct ptlrpc_bulk_desc *ptlrpc_prep_bulk(struct ptlrpc_connection *conn)
{
        struct ptlrpc_bulk_desc *desc;

        OBD_ALLOC(desc, sizeof(*desc));
        if (desc != NULL) {
                desc->bd_connection = ptlrpc_connection_addref(conn);
                atomic_set(&desc->bd_refcount, 1);
                init_waitqueue_head(&desc->bd_waitq);
                INIT_LIST_HEAD(&desc->bd_page_list);
                INIT_LIST_HEAD(&desc->bd_set_chain);
                ptl_set_inv_handle(&desc->bd_md_h);
                ptl_set_inv_handle(&desc->bd_me_h);
        }

        return desc;
}

int ptlrpc_bulk_error(struct ptlrpc_bulk_desc *desc)
{
        int rc = 0;
        if (desc->bd_flags & PTL_RPC_FL_TIMEOUT) {
                rc = (desc->bd_flags & PTL_RPC_FL_INTR ? -ERESTARTSYS :
                      -ETIMEDOUT);
        }
        return rc;
}

struct ptlrpc_bulk_page *ptlrpc_prep_bulk_page(struct ptlrpc_bulk_desc *desc)
{
        struct ptlrpc_bulk_page *bulk;

        OBD_ALLOC(bulk, sizeof(*bulk));
        if (bulk != NULL) {
                bulk->bp_desc = desc;
                list_add_tail(&bulk->bp_link, &desc->bd_page_list);
                desc->bd_page_count++;
        }
        return bulk;
}

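/* Sketch (#if 0): typical construction and teardown of a bulk descriptor.
 * ptlrpc_bulk_decref() lives outside this file; the assumption here is that
 * it drops the reference set in ptlrpc_prep_bulk() and frees the descriptor
 * when it reaches zero, as ll_brw_sync_wait() below does for completed sets. */
#if 0
static int example_bulk(struct ptlrpc_connection *conn)
{
        struct ptlrpc_bulk_desc *desc;
        struct ptlrpc_bulk_page *bulk;

        desc = ptlrpc_prep_bulk(conn);
        if (desc == NULL)
                return -ENOMEM;

        bulk = ptlrpc_prep_bulk_page(desc);
        if (bulk == NULL) {
                ptlrpc_bulk_decref(desc);
                return -ENOMEM;
        }
        /* ... fill in the bp_* buffer fields, register and ship ... */

        ptlrpc_bulk_decref(desc);       /* frees the pages and descriptor */
        return 0;
}
#endif
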
void ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc)
{
        struct list_head *tmp, *next;
        ENTRY;
        if (desc == NULL) {
                EXIT;
                return;
        }

        LASSERT(list_empty(&desc->bd_set_chain));

        if (atomic_read(&desc->bd_refcount) != 0)
                CERROR("freeing desc %p with refcount %d!\n", desc,
                       atomic_read(&desc->bd_refcount));

        list_for_each_safe(tmp, next, &desc->bd_page_list) {
                struct ptlrpc_bulk_page *bulk;
                bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
                ptlrpc_free_bulk_page(bulk);
        }

        ptlrpc_put_connection(desc->bd_connection);

        OBD_FREE(desc, sizeof(*desc));
        EXIT;
}

void ptlrpc_free_bulk_page(struct ptlrpc_bulk_page *bulk)
{
        ENTRY;
        if (bulk == NULL) {
                EXIT;
                return;
        }

        list_del(&bulk->bp_link);
        bulk->bp_desc->bd_page_count--;
        OBD_FREE(bulk, sizeof(*bulk));
        EXIT;
}

static int ll_sync_brw_timeout(void *data)
{
        struct obd_brw_set *set = data;
        struct list_head *tmp;
        int failed = 0;
        ENTRY;

        LASSERT(set);

        set->brw_flags |= PTL_RPC_FL_TIMEOUT;

        list_for_each(tmp, &set->brw_desc_head) {
                struct ptlrpc_bulk_desc *desc =
                        list_entry(tmp, struct ptlrpc_bulk_desc, bd_set_chain);

                /* Skip descriptors that were completed successfully. */
                if (desc->bd_flags & (PTL_BULK_FL_RCVD | PTL_BULK_FL_SENT))
                        continue;

                LASSERT(desc->bd_connection);

                /* If PtlMDUnlink succeeds, then it hasn't completed yet.  If it
                 * fails, the bulk finished _just_ in time (after the timeout
                 * fired but before we got this far) and we'll let it live.
                 */
                if (PtlMDUnlink(desc->bd_md_h) != 0) {
                        CERROR("Near-miss on OST %s -- need to adjust "
                               "obd_timeout?\n",
                               desc->bd_connection->c_remote_uuid.uuid);
                        continue;
                }

                CERROR("IO of %d pages to/from %s:%d (conn %p) timed out\n",
                       desc->bd_page_count,
                       desc->bd_connection->c_remote_uuid.uuid,
                       desc->bd_portal, desc->bd_connection);

                /* This one will "never" arrive, so don't wait for it. */
                if (atomic_dec_and_test(&set->brw_refcount))
                        wake_up(&set->brw_waitq);

                if (class_signal_connection_failure)
                        class_signal_connection_failure(desc->bd_connection);
                else
                        failed = 1;
        }

        /* 0 = we go back to sleep, until we're resumed or interrupted */
        /* 1 = we can't be recovered, just abort the syscall with -ETIMEDOUT */
        RETURN(failed);
}

static int ll_sync_brw_intr(void *data)
{
        struct obd_brw_set *set = data;

        ENTRY;
        set->brw_flags |= PTL_RPC_FL_INTR;
        RETURN(1); /* ignored, as of this writing */
}

int ll_brw_sync_wait(struct obd_brw_set *set, int phase)
{
        struct l_wait_info lwi;
        struct list_head *tmp, *next;
        int rc = 0;
        ENTRY;

        switch (phase) {
        case CB_PHASE_START:
                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ll_sync_brw_timeout,
                                       ll_sync_brw_intr, set);
                rc = l_wait_event(set->brw_waitq,
                                  atomic_read(&set->brw_refcount) == 0, &lwi);

                list_for_each_safe(tmp, next, &set->brw_desc_head) {
                        struct ptlrpc_bulk_desc *desc =
                                list_entry(tmp, struct ptlrpc_bulk_desc,
                                           bd_set_chain);
                        list_del_init(&desc->bd_set_chain);
                        ptlrpc_bulk_decref(desc);
                }
                break;
        case CB_PHASE_FINISH:
                if (atomic_dec_and_test(&set->brw_refcount))
                        wake_up(&set->brw_waitq);
                break;
        default:
                LBUG();
        }

        RETURN(rc);
}

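/* Sketch (#if 0): the two sides of the CB_PHASE protocol above.  Set
 * construction (obd_brw_set allocation, brw_refcount initialization) happens
 * outside this file; these helpers are illustrative only. */
#if 0
/* Completion side: called once per finished bulk, dropping one reference
 * and waking the waiter when the set drains. */
static void example_brw_complete(struct obd_brw_set *set)
{
        ll_brw_sync_wait(set, CB_PHASE_FINISH);
}

/* Submission side: after queueing all bulks in the set, block until
 * brw_refcount reaches zero (or the timeout/interrupt handlers fire). */
static int example_brw_wait(struct obd_brw_set *set)
{
        return ll_brw_sync_wait(set, CB_PHASE_START);
}
#endif
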
struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, int opcode,
                                       int count, int *lengths, char **bufs)
{
        struct ptlrpc_connection *conn;
        struct ptlrpc_request *request;
        int rc;
        ENTRY;

        LASSERT((unsigned long)imp > 0x1000);
        conn = imp->imp_connection;

        OBD_ALLOC(request, sizeof(*request));
        if (!request) {
                CERROR("request allocation out of memory\n");
                RETURN(NULL);
        }

        rc = lustre_pack_msg(count, lengths, bufs,
                             &request->rq_reqlen, &request->rq_reqmsg);
        if (rc) {
                CERROR("cannot pack request %d\n", rc);
                OBD_FREE(request, sizeof(*request));
                RETURN(NULL);
        }

        request->rq_level = LUSTRE_CONN_FULL;
        request->rq_type = PTL_RPC_MSG_REQUEST;
        request->rq_import = imp;

        /* XXX FIXME bug 625069, now 249 */
        request->rq_request_portal = imp->imp_client->cli_request_portal;
        request->rq_reply_portal = imp->imp_client->cli_reply_portal;

        request->rq_connection = ptlrpc_connection_addref(conn);

        INIT_LIST_HEAD(&request->rq_list);
        atomic_set(&request->rq_refcount, 1);

        request->rq_reqmsg->magic = PTLRPC_MSG_MAGIC;
        request->rq_reqmsg->version = PTLRPC_MSG_VERSION;
        request->rq_reqmsg->opc = HTON__u32(opcode);
        request->rq_reqmsg->flags = 0;

        ptlrpc_hdl2req(request, &imp->imp_handle);
        RETURN(request);
}

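/* Sketch (#if 0): packing a single-buffer request.  EXAMPLE_OPC and
 * struct example_body are hypothetical placeholders for a real opcode and
 * its wire format; lustre_msg_buf() returns a pointer into the message
 * packed by lustre_pack_msg(). */
#if 0
static struct ptlrpc_request *example_prep(struct obd_import *imp)
{
        struct example_body *body;
        int size = sizeof(*body);
        struct ptlrpc_request *req;

        req = ptlrpc_prep_req(imp, EXAMPLE_OPC, 1, &size, NULL);
        if (req == NULL)
                return NULL;

        body = lustre_msg_buf(req->rq_reqmsg, 0);
        /* ... fill in *body before sending ... */
        return req;
}
#endif
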
static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL) {
                EXIT;
                return;
        }

        if (atomic_read(&request->rq_refcount) != 0) {
                CERROR("freeing request %p (%d->%s:%d) with refcount %d\n",
                       request, request->rq_reqmsg->opc,
                       request->rq_connection->c_remote_uuid.uuid,
                       request->rq_import->imp_client->cli_request_portal,
                       atomic_read(&request->rq_refcount));
                /* LBUG(); */
        }

        if (request->rq_repmsg != NULL) {
                OBD_FREE(request->rq_repmsg, request->rq_replen);
                request->rq_repmsg = NULL;
                request->rq_reply_md.start = NULL;
        }
        if (request->rq_reqmsg != NULL) {
                OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
                request->rq_reqmsg = NULL;
        }

        if (request->rq_import) {
                unsigned long flags = 0;
                if (!locked)
                        spin_lock_irqsave(&request->rq_import->imp_lock, flags);
                list_del_init(&request->rq_list);
                if (!locked)
                        spin_unlock_irqrestore(&request->rq_import->imp_lock,
                                               flags);
        }

        ptlrpc_put_connection(request->rq_connection);
        OBD_FREE(request, sizeof(*request));
        EXIT;
}

void ptlrpc_free_req(struct ptlrpc_request *request)
{
        __ptlrpc_free_req(request, 0);
}

static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
        ENTRY;
        if (request == NULL)
                RETURN(1);

        if (request == (void *)(long)(0x5a5a5a5a5a5a5a5a)) {
                CERROR("dereferencing freed request (bug 575)\n");
                LBUG();
                RETURN(1);
        }

        DEBUG_REQ(D_INFO, request, "refcount now %u",
                  atomic_read(&request->rq_refcount) - 1);

        if (atomic_dec_and_test(&request->rq_refcount)) {
                __ptlrpc_free_req(request, locked);
                RETURN(1);
        }

        RETURN(0);
}

void ptlrpc_req_finished(struct ptlrpc_request *request)
{
        __ptlrpc_req_finished(request, 0);
}

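/* Sketch (#if 0): the request reference discipline.  ptlrpc_prep_req()
 * returns a request with rq_refcount == 1; every ptlrpc_request_addref()
 * (defined later in this file) must be balanced by a ptlrpc_req_finished(),
 * and the final put frees the request via __ptlrpc_free_req(). */
#if 0
static void example_refs(struct ptlrpc_request *req)
{
        ptlrpc_request_addref(req);     /* e.g. before stashing a pointer */
        /* ... */
        ptlrpc_req_finished(req);       /* drop the stashed reference */
        ptlrpc_req_finished(req);       /* drop the last reference: frees */
}
#endif
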
static int ptlrpc_check_reply(struct ptlrpc_request *req)
{
        int rc = 0;

        ENTRY;
        if (req->rq_repmsg != NULL) {
                req->rq_transno = NTOH__u64(req->rq_repmsg->transno);
                /* Store transno in reqmsg for replay. */
                req->rq_reqmsg->transno = req->rq_repmsg->transno;
                req->rq_flags |= PTL_RPC_FL_REPLIED;
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RESEND) {
                DEBUG_REQ(D_ERROR, req, "RESEND:");
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_ERR) {
                DEBUG_REQ(D_ERROR, req, "ABORTED:");
                GOTO(out, rc = 1);
        }

        if (req->rq_flags & PTL_RPC_FL_RESTART) {
                DEBUG_REQ(D_ERROR, req, "RESTART:");
                GOTO(out, rc = 1);
        }
        EXIT;
 out:
        DEBUG_REQ(D_NET, req, "rc = %d for", rc);
        return rc;
}

static int ptlrpc_check_status(struct ptlrpc_request *req)
{
        int err;
        ENTRY;

        err = req->rq_repmsg->status;
        if (req->rq_repmsg->type == NTOH__u32(PTL_RPC_MSG_ERR)) {
                DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR (%d)", err);
                RETURN(err ? err : -EINVAL);
        }

        if (err < 0) {
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        } else if (err > 0) {
                /* XXX: translate this error from net to host */
                DEBUG_REQ(D_INFO, req, "status is %d", err);
        }

        RETURN(err);
}

static void ptlrpc_cleanup_request_buf(struct ptlrpc_request *request)
{
        OBD_FREE(request->rq_reqmsg, request->rq_reqlen);
        request->rq_reqmsg = NULL;
        request->rq_reqlen = 0;
}

/* Abort this request and clean up any resources associated with it. */
static int ptlrpc_abort(struct ptlrpc_request *request)
{
        /* First remove the ME for the reply; in theory, this means
         * that we can tear down the buffer safely. */
        if (PtlMEUnlink(request->rq_reply_me_h) != PTL_OK)
                return 0;
        OBD_FREE(request->rq_reply_md.start, request->rq_replen);

        memset(&request->rq_reply_me_h, 0, sizeof(request->rq_reply_me_h));
        request->rq_reply_md.start = NULL;
        request->rq_repmsg = NULL;
        return 0;
}

/* caller must hold imp->imp_lock */
void ptlrpc_free_committed(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        ENTRY;

        LASSERT(imp != NULL);

#ifdef CONFIG_SMP
        LASSERT(spin_is_locked(&imp->imp_lock));
#endif

        CDEBUG(D_HA, "committing for last_committed "LPU64"\n",
               imp->imp_peer_committed_transno);

        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);

                if (req->rq_flags & PTL_RPC_FL_REPLAY) {
                        DEBUG_REQ(D_HA, req, "keeping (FL_REPLAY)");
                        continue;
                }

                /* not yet committed */
                if (req->rq_transno > imp->imp_peer_committed_transno) {
                        DEBUG_REQ(D_HA, req, "stopping search");
                        break;
                }

                DEBUG_REQ(D_HA, req, "committing (last_committed "LPU64")",
                          imp->imp_peer_committed_transno);
                list_del_init(&req->rq_list);
                __ptlrpc_req_finished(req, 1);
        }

        EXIT;
        return;
}

void ptlrpc_cleanup_client(struct obd_import *imp)
{
        struct list_head *tmp, *saved;
        struct ptlrpc_request *req;
        struct ptlrpc_connection *conn = imp->imp_connection;
        unsigned long flags;
        ENTRY;

        LASSERT(conn);

        spin_lock_irqsave(&imp->imp_lock, flags);
        list_for_each_safe(tmp, saved, &imp->imp_replay_list) {
                req = list_entry(tmp, struct ptlrpc_request, rq_list);

                /* XXX we should make sure that nobody's sleeping on these! */
                DEBUG_REQ(D_HA, req, "cleaning up from replay list");
                list_del_init(&req->rq_list);
                req->rq_import = NULL;
                __ptlrpc_req_finished(req, 0);
        }
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        EXIT;
        return;
}

void ptlrpc_continue_req(struct ptlrpc_request *req)
{
        ENTRY;
        DEBUG_REQ(D_HA, req, "continuing delayed request");
        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

void ptlrpc_resend_req(struct ptlrpc_request *req)
{
        ENTRY;
        DEBUG_REQ(D_HA, req, "resending");
        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;
        req->rq_status = -EAGAIN;
        req->rq_level = LUSTRE_CONN_RECOVD;
        req->rq_flags |= PTL_RPC_FL_RESEND;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

void ptlrpc_restart_req(struct ptlrpc_request *req)
{
        ENTRY;
        DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
        req->rq_status = -ERESTARTSYS;
        req->rq_flags |= PTL_RPC_FL_RESTART;
        req->rq_flags &= ~PTL_RPC_FL_TIMEOUT;
        wake_up(&req->rq_wait_for_rep);
        EXIT;
}

static int expired_request(void *data)
{
        struct ptlrpc_request *req = data;

        ENTRY;
        if (!req) {
                CERROR("NULL req!\n");
                LBUG();
                RETURN(0);
        }

        DEBUG_REQ(D_ERROR, req, "timeout");
        ptlrpc_abort(req);
        req->rq_flags |= PTL_RPC_FL_TIMEOUT;

        if (!req->rq_import) {
                DEBUG_REQ(D_HA, req, "NULL import; already cleaned up?");
                RETURN(1);
        }

        if (!req->rq_import->imp_connection) {
                DEBUG_REQ(D_ERROR, req, "NULL connection");
                LBUG();
                RETURN(0);
        }

        if (!req->rq_import->imp_connection->c_recovd_data.rd_recovd)
                RETURN(1);

        recovd_conn_fail(req->rq_import->imp_connection);

        /* If this request is for recovery or other primordial tasks,
         * don't go back to sleep.
         */
        if (req->rq_level < LUSTRE_CONN_FULL)
                RETURN(1);
        RETURN(0);
}

static int interrupted_request(void *data)
{
        struct ptlrpc_request *req = data;
        ENTRY;
        req->rq_flags |= PTL_RPC_FL_INTR;
        RETURN(1); /* ignored, as of this writing */
}

struct ptlrpc_request *ptlrpc_request_addref(struct ptlrpc_request *req)
{
        ENTRY;
        atomic_inc(&req->rq_refcount);
        RETURN(req);
}

void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
                                      struct obd_import *imp)
{
        struct list_head *tmp;

#ifdef CONFIG_SMP
        LASSERT(spin_is_locked(&imp->imp_lock));
#endif

        LASSERT(imp->imp_flags & IMP_REPLAYABLE);
        /* Balanced in ptlrpc_free_committed, usually. */
        ptlrpc_request_addref(req);
        list_for_each_prev(tmp, &imp->imp_replay_list) {
                struct ptlrpc_request *iter =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                /* We may have duplicate transnos if we create and then
                 * open a file, or for closes retained to match creating
                 * opens, so use req->rq_xid as a secondary key.
                 * (See bugs 684, 685, and 428.)
                 */
                if (iter->rq_transno > req->rq_transno)
                        continue;

                if (iter->rq_transno == req->rq_transno) {
                        LASSERT(iter->rq_xid != req->rq_xid);
                        if (iter->rq_xid > req->rq_xid)
                                continue;
                }

                list_add(&req->rq_list, &iter->rq_list);
                return;
        }

        list_add_tail(&req->rq_list, &imp->imp_replay_list);
}

int ptlrpc_queue_wait(struct ptlrpc_request *req)
{
        int rc = 0;
        struct l_wait_info lwi;
        struct obd_import *imp = req->rq_import;
        struct ptlrpc_connection *conn = imp->imp_connection;
        unsigned long flags;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);

        spin_lock_irqsave(&imp->imp_lock, flags);
        req->rq_xid = HTON__u32(++imp->imp_last_xid);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        /* for distributed debugging */
        req->rq_reqmsg->status = HTON__u32(current->pid);
        CDEBUG(D_RPCTRACE, "Sending RPC pid:xid:nid:opc %d:"LPU64":%x:%d\n",
               NTOH__u32(req->rq_reqmsg->status), req->rq_xid,
               conn->c_peer.peer_nid, NTOH__u32(req->rq_reqmsg->opc));

        spin_lock_irqsave(&imp->imp_lock, flags);

        /*
         * If the import has been invalidated (such as by an OST failure), the
         * request must fail with -EIO.
         */
        if (req->rq_import->imp_flags & IMP_INVALID) {
                DEBUG_REQ(D_ERROR, req, "IMP_INVALID:");
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                RETURN(-EIO);
        }

        if (req->rq_level > imp->imp_level) {
                list_del(&req->rq_list);
                list_add_tail(&req->rq_list, &imp->imp_delayed_list);
                spin_unlock_irqrestore(&imp->imp_lock, flags);

                DEBUG_REQ(D_HA, req, "\"%s\" waiting for recovery: (%d > %d)",
                          current->comm, req->rq_level, imp->imp_level);
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(req->rq_wait_for_rep,
                                  (req->rq_level <= imp->imp_level) ||
                                  (req->rq_flags & PTL_RPC_FL_ERR), &lwi);

                if (req->rq_flags & PTL_RPC_FL_ERR)
                        rc = -EIO;

                if (!req->rq_import)
                        RETURN(rc);

                spin_lock_irqsave(&imp->imp_lock, flags);
                list_del_init(&req->rq_list);

                if (rc) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        RETURN(rc);
                }

                CERROR("process %d resumed\n", current->pid);
        }
 resend:

        LASSERT(list_empty(&req->rq_list));
        list_add_tail(&req->rq_list, &imp->imp_sending_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);
        rc = ptl_send_rpc(req);
        if (rc) {
                CDEBUG(D_HA, "error %d, opcode %d, need recovery\n", rc,
                       req->rq_reqmsg->opc);
                /* sleep for a jiffy, then trigger recovery */
                lwi = LWI_TIMEOUT_INTR(1, expired_request,
                                       interrupted_request, req);
        } else {
                DEBUG_REQ(D_NET, req, "-- sleeping");
                lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_request,
                                       interrupted_request, req);
        }
        l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
        DEBUG_REQ(D_NET, req, "-- done sleeping");

        spin_lock_irqsave(&imp->imp_lock, flags);
        list_del_init(&req->rq_list);
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        if (req->rq_flags & PTL_RPC_FL_ERR) {
                ptlrpc_abort(req);
                GOTO(out, rc = -EIO);
        }

        /* Don't resend if we were interrupted. */
        if ((req->rq_flags & (PTL_RPC_FL_RESEND | PTL_RPC_FL_INTR)) ==
            PTL_RPC_FL_RESEND) {
                req->rq_flags &= ~PTL_RPC_FL_RESEND;
                lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
                DEBUG_REQ(D_HA, req, "resending: ");
                spin_lock_irqsave(&imp->imp_lock, flags);
                goto resend;
        }

        if (req->rq_flags & PTL_RPC_FL_INTR) {
                if (!(req->rq_flags & PTL_RPC_FL_TIMEOUT))
                        LBUG(); /* should only be interrupted if we timed out */
                /* Clean up the dangling reply buffers */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        if (req->rq_flags & PTL_RPC_FL_TIMEOUT)
                GOTO(out, rc = -ETIMEDOUT);

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED))
                GOTO(out, rc = req->rq_status);

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }
#if 0
        /* FIXME: Enable when BlueArc makes a new release */
        if (req->rq_repmsg->type != PTL_RPC_MSG_REPLY &&
            req->rq_repmsg->type != PTL_RPC_MSG_ERR) {
                CERROR("invalid packet type received (type=%u)\n",
                       req->rq_repmsg->type);
                LBUG();
                GOTO(out, rc = -EINVAL);
        }
#endif
        DEBUG_REQ(D_NET, req, "status %d", req->rq_repmsg->status);

        /* Our connection was rejected, so we need to invalidate and rebuild. */
        if (req->rq_repmsg->status == -ENOTCONN) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                /* If someone else is reconnecting us (CONN_RECOVD) or has
                 * already completed it (handle mismatch), then we just need
                 * to get out.
                 */
                if (imp->imp_level == LUSTRE_CONN_RECOVD ||
                    imp->imp_handle.addr != req->rq_reqmsg->addr ||
                    imp->imp_handle.cookie != req->rq_reqmsg->cookie) {
                        spin_unlock_irqrestore(&imp->imp_lock, flags);
                        GOTO(out, rc = -EIO);
                }
                imp->imp_level = LUSTRE_CONN_RECOVD;
                spin_unlock_irqrestore(&imp->imp_lock, flags);
                rc = imp->imp_recover(imp, PTLRPC_RECOVD_PHASE_NOTCONN);
                if (rc)
                        LBUG();
                GOTO(out, rc = -EIO);
        }

        rc = ptlrpc_check_status(req);

        if (req->rq_import->imp_flags & IMP_REPLAYABLE) {
                spin_lock_irqsave(&imp->imp_lock, flags);
                if ((req->rq_flags & PTL_RPC_FL_REPLAY || req->rq_transno != 0)
                    && rc >= 0) {
                        ptlrpc_retain_replayable_request(req, imp);
                }

                if (req->rq_transno > imp->imp_max_transno) {
                        imp->imp_max_transno = req->rq_transno;
                }

                /* Replay-enabled imports return commit-status information. */
                if (req->rq_repmsg->last_committed) {
                        imp->imp_peer_committed_transno =
                                req->rq_repmsg->last_committed;
                }
                ptlrpc_free_committed(imp);
                spin_unlock_irqrestore(&imp->imp_lock, flags);
        }

        EXIT;
 out:
        return rc;
}

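/* Sketch (#if 0): the usual synchronous round trip built from the pieces
 * above.  EXAMPLE_OPC and struct example_body are placeholders, and the
 * reply is assumed to use the same single-buffer layout as the request. */
#if 0
static int example_rpc(struct obd_import *imp)
{
        struct ptlrpc_request *req;
        int size = sizeof(struct example_body);
        int rc;

        req = ptlrpc_prep_req(imp, EXAMPLE_OPC, 1, &size, NULL);
        if (req == NULL)
                return -ENOMEM;

        /* Tell the reply path how large a buffer to post. */
        req->rq_replen = lustre_msg_size(1, &size);

        rc = ptlrpc_queue_wait(req);    /* send and block for the reply */

        ptlrpc_req_finished(req);       /* drop our reference in all cases */
        return rc;
}
#endif
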
int ptlrpc_replay_req(struct ptlrpc_request *req)
{
        int rc = 0, old_level, old_status = 0;
        // struct ptlrpc_client *cli = req->rq_import->imp_client;
        struct l_wait_info lwi;
        ENTRY;

        init_waitqueue_head(&req->rq_wait_for_rep);
        DEBUG_REQ(D_NET, req, "");

        req->rq_reqmsg->addr = req->rq_import->imp_handle.addr;
        req->rq_reqmsg->cookie = req->rq_import->imp_handle.cookie;

        /* temporarily set request to RECOVD level (reset at out:) */
        old_level = req->rq_level;
        if (req->rq_flags & PTL_RPC_FL_REPLIED)
                old_status = req->rq_repmsg->status;
        req->rq_level = LUSTRE_CONN_RECOVD;
        rc = ptl_send_rpc(req);
        if (rc) {
                CERROR("error %d, opcode %d\n", rc, req->rq_reqmsg->opc);
                ptlrpc_cleanup_request_buf(req);
                // up(&cli->cli_rpc_sem);
                GOTO(out, rc = -rc);
        }

        CDEBUG(D_OTHER, "-- sleeping\n");
        lwi = LWI_INTR(NULL, NULL); /* XXX needs timeout, nested recovery */
        l_wait_event(req->rq_wait_for_rep, ptlrpc_check_reply(req), &lwi);
        CDEBUG(D_OTHER, "-- done\n");

        // up(&cli->cli_rpc_sem);

        if (!(req->rq_flags & PTL_RPC_FL_REPLIED)) {
                CERROR("Unknown reason for wakeup\n");
                /* XXX Phil - I end up here when I kill obdctl */
                ptlrpc_abort(req);
                GOTO(out, rc = -EINTR);
        }

        rc = lustre_unpack_msg(req->rq_repmsg, req->rq_replen);
        if (rc) {
                CERROR("unpack_rep failed: %d\n", rc);
                GOTO(out, rc);
        }

        CDEBUG(D_NET, "got rep "LPD64"\n", req->rq_xid);

        /* let the callback do fixups, possibly including in the request */
        if (req->rq_replay_cb)
                req->rq_replay_cb(req);

        if ((req->rq_flags & PTL_RPC_FL_REPLIED) &&
            req->rq_repmsg->status != old_status) {
                DEBUG_REQ(D_HA, req, "status %d, old was %d",
                          req->rq_repmsg->status, old_status);
        }

 out:
        req->rq_level = old_level;
        RETURN(rc);
}

/* XXX this looks a lot like super.c:invalidate_request_list, doesn't it? */
void ptlrpc_abort_inflight(struct obd_import *imp, int dying_import)
{
        unsigned long flags;
        struct list_head *tmp, *n;

        /* Make sure that no new requests get processed for this import.
         * ptlrpc_queue_wait must (and does) hold imp_lock while testing this
         * flag and then putting requests on sending_list or delayed_list.
         */
        spin_lock_irqsave(&imp->imp_lock, flags);
        imp->imp_flags |= IMP_INVALID;
        spin_unlock_irqrestore(&imp->imp_lock, flags);

        list_for_each_safe(tmp, n, &imp->imp_sending_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "inflight");
                req->rq_flags |= PTL_RPC_FL_ERR;
                if (dying_import)
                        req->rq_import = NULL;
                wake_up(&req->rq_wait_for_rep);
        }

        list_for_each_safe(tmp, n, &imp->imp_delayed_list) {
                struct ptlrpc_request *req =
                        list_entry(tmp, struct ptlrpc_request, rq_list);

                DEBUG_REQ(D_HA, req, "aborting waiting req");
                req->rq_flags |= PTL_RPC_FL_ERR;
                if (dying_import)
                        req->rq_import = NULL;
                wake_up(&req->rq_wait_for_rep);
        }
}