Whamcloud - gitweb
merge b_devel into HEAD, which will become 0.7.3
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define EXPORT_SYMTAB
23 #define DEBUG_SUBSYSTEM S_LDLM
24
25 #ifdef __KERNEL__
26 # include <linux/module.h>
27 #else
28 # include <liblustre.h>
29 #endif
30 #include <linux/obd_ost.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_net.h>
34
35 int client_import_connect(struct lustre_handle *dlm_handle,
36                           struct obd_device *obd,
37                           struct obd_uuid *cluuid)
38 {
39         struct client_obd *cli = &obd->u.cli;
40         struct obd_import *imp = cli->cl_import;
41         struct obd_export *exp;
42         struct ptlrpc_request *request;
43         /* XXX maybe this is a good time to create a connect struct? */
44         int rc, size[] = {sizeof(imp->imp_target_uuid),
45                           sizeof(obd->obd_uuid),
46                           sizeof(*dlm_handle)};
47         char *tmp[] = {imp->imp_target_uuid.uuid,
48                        obd->obd_uuid.uuid,
49                        (char *)dlm_handle};
50         int msg_flags;
51
52         ENTRY;
53         down(&cli->cl_sem);
54         rc = class_connect(dlm_handle, obd, cluuid);
55         if (rc)
56                 GOTO(out_sem, rc);
57
58         cli->cl_conn_count++;
59         if (cli->cl_conn_count > 1)
60                 GOTO(out_sem, rc);
61
62         if (obd->obd_namespace != NULL)
63                 CERROR("already have namespace!\n");
64         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
65                                                 LDLM_NAMESPACE_CLIENT);
66         if (obd->obd_namespace == NULL)
67                 GOTO(out_disco, rc = -ENOMEM);
68
69         request = ptlrpc_prep_req(imp, imp->imp_connect_op, 3, size, tmp);
70         if (!request)
71                 GOTO(out_ldlm, rc = -ENOMEM);
72
73         request->rq_level = LUSTRE_CONN_NEW;
74         request->rq_replen = lustre_msg_size(0, NULL);
75
76         lustre_msg_add_op_flags(request->rq_reqmsg, MSG_CONNECT_PEER);
77
78         imp->imp_dlm_handle = *dlm_handle;
79
80         imp->imp_level = LUSTRE_CONN_CON;
81         rc = ptlrpc_queue_wait(request);
82         if (rc) {
83                 class_disconnect(dlm_handle, 0);
84                 GOTO(out_req, rc);
85         }
86
87         exp = class_conn2export(dlm_handle);
88         exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
89         class_export_put(exp);
90
91         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
92         if (msg_flags & MSG_CONNECT_REPLAYABLE) {
93                 imp->imp_replayable = 1;
94                 CDEBUG(D_HA, "connected to replayable target: %s\n",
95                        imp->imp_target_uuid.uuid);
96                 ptlrpc_pinger_add_import(imp);
97         }
98         imp->imp_level = LUSTRE_CONN_FULL;
99         imp->imp_remote_handle = request->rq_repmsg->handle;
100         CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
101                imp->imp_remote_handle.cookie);
102
103         EXIT;
104 out_req:
105         ptlrpc_req_finished(request);
106         if (rc) {
107 out_ldlm:
108                 ldlm_namespace_free(obd->obd_namespace);
109                 obd->obd_namespace = NULL;
110 out_disco:
111                 cli->cl_conn_count--;
112                 class_disconnect(dlm_handle, 0);
113         }
114 out_sem:
115         up(&cli->cl_sem);
116         return rc;
117 }
118
119 int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
120 {
121         struct obd_device *obd = class_conn2obd(dlm_handle);
122         struct client_obd *cli = &obd->u.cli;
123         struct obd_import *imp = cli->cl_import;
124         struct ptlrpc_request *request = NULL;
125         int rc = 0, err, rq_opc;
126         ENTRY;
127
128         if (!obd) {
129                 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
130                        dlm_handle ? dlm_handle->cookie : -1UL);
131                 RETURN(-EINVAL);
132         }
133
134         switch (imp->imp_connect_op) {
135         case OST_CONNECT: rq_opc = OST_DISCONNECT; break;
136         case MDS_CONNECT: rq_opc = MDS_DISCONNECT; break;
137         case MGMT_CONNECT:rq_opc = MGMT_DISCONNECT;break;
138         default:
139                 CERROR("don't know how to disconnect from %s (connect_op %d)\n",
140                        imp->imp_target_uuid.uuid, imp->imp_connect_op);
141                 RETURN(-EINVAL);
142         }
143
144         down(&cli->cl_sem);
145         if (!cli->cl_conn_count) {
146                 CERROR("disconnecting disconnected device (%s)\n",
147                        obd->obd_name);
148                 GOTO(out_sem, rc = -EINVAL);
149         }
150
151         cli->cl_conn_count--;
152         if (cli->cl_conn_count)
153                 GOTO(out_no_disconnect, rc = 0);
154
155         if (obd->obd_namespace != NULL) {
156                 /* obd_no_recov == local only */
157                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
158                                        obd->obd_no_recov, NULL);
159                 ldlm_namespace_free(obd->obd_namespace);
160                 obd->obd_namespace = NULL;
161         }
162
163         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
164         if (obd->obd_no_recov) {
165                 ptlrpc_set_import_active(imp, 0);
166         } else {
167                 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
168                 if (!request)
169                         GOTO(out_req, rc = -ENOMEM);
170
171                 request->rq_replen = lustre_msg_size(0, NULL);
172
173                 rc = ptlrpc_queue_wait(request);
174                 if (rc)
175                         GOTO(out_req, rc);
176         }
177         if (imp->imp_replayable)
178                 ptlrpc_pinger_del_import(imp);
179
180         EXIT;
181  out_req:
182         if (request)
183                 ptlrpc_req_finished(request);
184  out_no_disconnect:
185         err = class_disconnect(dlm_handle, 0);
186         if (!rc && err)
187                 rc = err;
188  out_sem:
189         up(&cli->cl_sem);
190         RETURN(rc);
191 }
192
193 /* --------------------------------------------------------------------------
194  * from old lib/target.c
195  * -------------------------------------------------------------------------- */
196
197 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
198                             struct obd_uuid *cluuid)
199 {
200         if (exp->exp_connection) {
201                 struct lustre_handle *hdl;
202                 hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
203                 /* Might be a re-connect after a partition. */
204                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
205                         CERROR("%s reconnecting\n", cluuid->uuid);
206                         conn->cookie = exp->exp_handle.h_cookie;
207                         RETURN(EALREADY);
208                 } else {
209                         CERROR("%s reconnecting from %s, "
210                                "handle mismatch (ours "LPX64", theirs "
211                                LPX64")\n", cluuid->uuid,
212                                exp->exp_connection->c_remote_uuid.uuid,
213                                hdl->cookie, conn->cookie);
214                         /* XXX disconnect them here? */
215                         memset(conn, 0, sizeof *conn);
216                         /* This is a little scary, but right now we build this
217                          * file separately into each server module, so I won't
218                          * go _immediately_ to hell.
219                          */
220                         RETURN(-EALREADY);
221                 }
222         }
223
224         conn->cookie = exp->exp_handle.h_cookie;
225         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
226                cluuid->uuid, exp);
227         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
228         RETURN(0);
229 }
230
231 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
232 {
233         struct obd_device *target;
234         struct obd_export *export = NULL;
235         struct obd_import *dlmimp;
236         struct lustre_handle conn;
237         struct obd_uuid tgtuuid;
238         struct obd_uuid cluuid;
239         struct obd_uuid remote_uuid;
240         struct list_head *p;
241         char *str, *tmp;
242         int rc = 0, abort_recovery;
243         ENTRY;
244
245         LASSERT_REQSWAB (req, 0);
246         str = lustre_msg_string(req->rq_reqmsg, 0, sizeof(tgtuuid) - 1);
247         if (str == NULL) {
248                 CERROR("bad target UUID for connect\n");
249                 GOTO(out, rc = -EINVAL);
250         }
251
252         obd_str2uuid (&tgtuuid, str);
253         target = class_uuid2obd(&tgtuuid);
254         if (!target || target->obd_stopping || !target->obd_set_up) {
255                 CERROR("UUID '%s' is not available for connect\n", str);
256                 GOTO(out, rc = -ENODEV);
257         }
258
259         LASSERT_REQSWAB (req, 1);
260         str = lustre_msg_string(req->rq_reqmsg, 1, sizeof(cluuid) - 1);
261         if (str == NULL) {
262                 CERROR("bad client UUID for connect\n");
263                 GOTO(out, rc = -EINVAL);
264         }
265
266         obd_str2uuid (&cluuid, str);
267
268         /* XXX extract a nettype and format accordingly */
269         snprintf(remote_uuid.uuid, sizeof remote_uuid,
270                  "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
271
272         spin_lock_bh(&target->obd_processing_task_lock);
273         abort_recovery = target->obd_abort_recovery;
274         spin_unlock_bh(&target->obd_processing_task_lock);
275         if (abort_recovery)
276                 target_abort_recovery(target);
277
278         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
279         if (tmp == NULL)
280                 GOTO(out, rc = -EPROTO);
281
282         memcpy(&conn, tmp, sizeof conn);
283
284         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
285         if (rc)
286                 GOTO(out, rc);
287
288         /* lctl gets a backstage, all-access pass. */
289         if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
290                 goto dont_check_exports;
291
292         spin_lock(&target->obd_dev_lock);
293         list_for_each(p, &target->obd_exports) {
294                 export = list_entry(p, struct obd_export, exp_obd_chain);
295                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
296                         spin_unlock(&target->obd_dev_lock);
297                         LASSERT(export->exp_obd == target);
298
299                         rc = target_handle_reconnect(&conn, export, &cluuid);
300                         break;
301                 }
302                 export = NULL;
303         }
304         /* If we found an export, we already unlocked. */
305         if (!export)
306                 spin_unlock(&target->obd_dev_lock);
307
308         /* Tell the client if we're in recovery. */
309         /* If this is the first client, start the recovery timer */
310         if (target->obd_recovering) {
311                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
312                 target_start_recovery_timer(target, handler);
313         }
314
315         /* Tell the client if we support replayable requests */
316         if (target->obd_replayable)
317                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
318
319         if (export == NULL) {
320                 if (target->obd_recovering) {
321                         CERROR("denying connection for new client %s: "
322                                "in recovery\n", cluuid.uuid);
323                         rc = -EBUSY;
324                 } else {
325  dont_check_exports:
326                         rc = obd_connect(&conn, target, &cluuid);
327                 }
328         }
329
330         /* If all else goes well, this is our RPC return code. */
331         req->rq_status = 0;
332
333         if (rc && rc != EALREADY)
334                 GOTO(out, rc);
335
336         req->rq_repmsg->handle = conn;
337
338         /* If the client and the server are the same node, we will already
339          * have an export that really points to the client's DLM export,
340          * because we have a shared handles table.
341          *
342          * XXX this will go away when shaver stops sending the "connect" handle
343          * in the real "remote handle" field of the request --phik 24 Apr 2003
344          */
345         if (req->rq_export != NULL)
346                 class_export_put(req->rq_export);
347
348         /* ownership of this export ref transfers to the request */
349         export = req->rq_export = class_conn2export(&conn);
350         LASSERT(export != NULL);
351
352         if (req->rq_connection != NULL)
353                 ptlrpc_put_connection(req->rq_connection);
354         if (export->exp_connection != NULL)
355                 ptlrpc_put_connection(export->exp_connection);
356         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
357                                                        &remote_uuid);
358         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
359
360         if (rc == EALREADY) {
361                 /* We indicate the reconnection in a flag, not an error code. */
362                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
363                 GOTO(out, rc = 0);
364         }
365
366         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
367                sizeof conn);
368
369         if (export->exp_ldlm_data.led_import != NULL)
370                 class_destroy_import(export->exp_ldlm_data.led_import);
371         dlmimp = export->exp_ldlm_data.led_import = class_new_import();
372         dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
373         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
374         dlmimp->imp_remote_handle = conn;
375         dlmimp->imp_obd = target;
376         dlmimp->imp_dlm_fake = 1;
377         dlmimp->imp_level = LUSTRE_CONN_FULL;
378         class_import_put(dlmimp);
379 out:
380         if (rc)
381                 req->rq_status = rc;
382         RETURN(rc);
383 }
384
385 int target_handle_disconnect(struct ptlrpc_request *req)
386 {
387         struct lustre_handle *conn = &req->rq_reqmsg->handle;
388         struct obd_import *dlmimp;
389         int rc;
390         ENTRY;
391
392         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
393         if (rc)
394                 RETURN(rc);
395
396         req->rq_status = obd_disconnect(conn, 0);
397
398         dlmimp = req->rq_export->exp_ldlm_data.led_import;
399         class_destroy_import(dlmimp);
400
401         class_export_put(req->rq_export);
402         req->rq_export = NULL;
403         RETURN(0);
404 }
405
406 /*
407  * Recovery functions
408  */
409
410 void target_cancel_recovery_timer(struct obd_device *obd)
411 {
412         del_timer(&obd->obd_recovery_timer);
413 }
414
415 static void abort_delayed_replies(struct obd_device *obd)
416 {
417         struct ptlrpc_request *req;
418         struct list_head *tmp, *n;
419         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
420                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
421                 DEBUG_REQ(D_ERROR, req, "aborted:");
422                 req->rq_status = -ENOTCONN;
423                 req->rq_type = PTL_RPC_MSG_ERR;
424                 ptlrpc_reply(req);
425                 list_del(&req->rq_list);
426                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
427                 OBD_FREE(req, sizeof *req);
428         }
429 }
430
431 static void abort_recovery_queue(struct obd_device *obd)
432 {
433         struct ptlrpc_request *req;
434         struct list_head *tmp, *n;
435         int rc;
436
437         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
438                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
439                 DEBUG_REQ(D_ERROR, req, "aborted:");
440                 req->rq_status = -ENOTCONN;
441                 req->rq_type = PTL_RPC_MSG_ERR;
442                 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
443                                      &req->rq_repmsg);
444                 if (rc == 0) {
445                         ptlrpc_reply(req);
446                 } else {
447                         DEBUG_REQ(D_ERROR, req,
448                                   "packing failed for abort-reply; skipping");
449                 }
450                 list_del(&req->rq_list);
451                 class_export_put(req->rq_export);
452                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
453                 OBD_FREE(req, sizeof *req);
454         }
455 }
456
457 void target_abort_recovery(void *data)
458 {
459         struct obd_device *obd = data;
460
461         CERROR("disconnecting clients and aborting recovery\n");
462         spin_lock_bh(&obd->obd_processing_task_lock);
463         if (!obd->obd_recovering) {
464                 spin_unlock_bh(&obd->obd_processing_task_lock);
465                 EXIT;
466                 return;
467         }
468
469         obd->obd_recovering = obd->obd_abort_recovery = 0;
470         obd->obd_recoverable_clients = 0;
471         wake_up(&obd->obd_next_transno_waitq);
472         target_cancel_recovery_timer(obd);
473         spin_unlock_bh(&obd->obd_processing_task_lock);
474         class_disconnect_exports(obd, 0);
475         abort_delayed_replies(obd);
476         abort_recovery_queue(obd);
477         ptlrpc_run_recovery_over_upcall(obd);
478 }
479
480 static void target_recovery_expired(unsigned long castmeharder)
481 {
482         struct obd_device *obd = (struct obd_device *)castmeharder;
483         CERROR("recovery timed out, aborting\n");
484         spin_lock_bh(&obd->obd_processing_task_lock);
485         obd->obd_abort_recovery = 1;
486         wake_up(&obd->obd_next_transno_waitq);
487         spin_unlock_bh(&obd->obd_processing_task_lock);
488 }
489
490 static void reset_recovery_timer(struct obd_device *obd)
491 {
492         int recovering;
493         spin_lock(&obd->obd_dev_lock);
494         recovering = obd->obd_recovering;
495         spin_unlock(&obd->obd_dev_lock);
496
497         if (!recovering)
498                 return;
499         CERROR("timer will expire in %ld seconds\n", OBD_RECOVERY_TIMEOUT / HZ);
500         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
501 }
502
503
504 /* Only start it the first time called */
505 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
506 {
507         spin_lock_bh(&obd->obd_processing_task_lock);
508         if (obd->obd_recovery_handler) {
509                 spin_unlock_bh(&obd->obd_processing_task_lock);
510                 return;
511         }
512         CERROR("%s: starting recovery timer\n", obd->obd_name);
513         obd->obd_recovery_handler = handler;
514         obd->obd_recovery_timer.function = target_recovery_expired;
515         obd->obd_recovery_timer.data = (unsigned long)obd;
516         init_timer(&obd->obd_recovery_timer);
517         spin_unlock_bh(&obd->obd_processing_task_lock);
518
519         reset_recovery_timer(obd);
520 }
521
522 static int check_for_next_transno(struct obd_device *obd)
523 {
524         struct ptlrpc_request *req;
525         int wake_up;
526
527         req = list_entry(obd->obd_recovery_queue.next,
528                          struct ptlrpc_request, rq_list);
529         LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
530
531         wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
532                 (obd->obd_recovering) == 0;
533         CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
534                req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
535                obd->obd_recovering, wake_up);
536         return wake_up;
537 }
538
539 static void process_recovery_queue(struct obd_device *obd)
540 {
541         struct ptlrpc_request *req;
542         int abort_recovery = 0;
543         struct l_wait_info lwi = { 0 };
544         ENTRY;
545
546         for (;;) {
547                 spin_lock_bh(&obd->obd_processing_task_lock);
548                 LASSERT(obd->obd_processing_task == current->pid);
549                 req = list_entry(obd->obd_recovery_queue.next,
550                                  struct ptlrpc_request, rq_list);
551
552                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
553                         spin_unlock_bh(&obd->obd_processing_task_lock);
554                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
555                                LPD64")\n",
556                                obd->obd_next_recovery_transno,
557                                req->rq_reqmsg->transno);
558                         l_wait_event(obd->obd_next_transno_waitq,
559                                      check_for_next_transno(obd), &lwi);
560                         spin_lock_bh(&obd->obd_processing_task_lock);
561                         abort_recovery = obd->obd_abort_recovery;
562                         spin_unlock_bh(&obd->obd_processing_task_lock);
563                         if (abort_recovery) {
564                                 target_abort_recovery(obd);
565                                 return;
566                         }
567                         continue;
568                 }
569                 list_del_init(&req->rq_list);
570                 spin_unlock_bh(&obd->obd_processing_task_lock);
571
572                 DEBUG_REQ(D_ERROR, req, "processing: ");
573                 (void)obd->obd_recovery_handler(req);
574                 reset_recovery_timer(obd);
575                 /* bug 1580: decide how to properly sync() in recovery */
576                 //mds_fsync_super(mds->mds_sb);
577                 class_export_put(req->rq_export);
578                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
579                 OBD_FREE(req, sizeof *req);
580                 spin_lock_bh(&obd->obd_processing_task_lock);
581                 obd->obd_next_recovery_transno++;
582                 if (list_empty(&obd->obd_recovery_queue)) {
583                         obd->obd_processing_task = 0;
584                         spin_unlock_bh(&obd->obd_processing_task_lock);
585                         break;
586                 }
587                 spin_unlock_bh(&obd->obd_processing_task_lock);
588         }
589         EXIT;
590 }
591
592 int target_queue_recovery_request(struct ptlrpc_request *req,
593                                   struct obd_device *obd)
594 {
595         struct list_head *tmp;
596         int inserted = 0;
597         __u64 transno = req->rq_reqmsg->transno;
598         struct ptlrpc_request *saved_req;
599         struct lustre_msg *reqmsg;
600
601         /* CAVEAT EMPTOR: The incoming request message has been swabbed
602          * (i.e. buflens etc are in my own byte order), but type-dependent
603          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
604
605         if (!transno) {
606                 INIT_LIST_HEAD(&req->rq_list);
607                 DEBUG_REQ(D_HA, req, "not queueing");
608                 return 1;
609         }
610
611         /* XXX If I were a real man, these LBUGs would be sane cleanups. */
612         /* XXX just like the request-dup code in queue_final_reply */
613         OBD_ALLOC(saved_req, sizeof *saved_req);
614         if (!saved_req)
615                 LBUG();
616         OBD_ALLOC(reqmsg, req->rq_reqlen);
617         if (!reqmsg)
618                 LBUG();
619
620         spin_lock_bh(&obd->obd_processing_task_lock);
621
622         /* If we're processing the queue, we want don't want to queue this
623          * message.
624          *
625          * Also, if this request has a transno less than the one we're waiting
626          * for, we should process it now.  It could (and currently always will)
627          * be an open request for a descriptor that was opened some time ago.
628          */
629         if (obd->obd_processing_task == current->pid ||
630             transno < obd->obd_next_recovery_transno) {
631                 /* Processing the queue right now, don't re-add. */
632                 LASSERT(list_empty(&req->rq_list));
633                 spin_unlock_bh(&obd->obd_processing_task_lock);
634                 OBD_FREE(reqmsg, req->rq_reqlen);
635                 OBD_FREE(saved_req, sizeof *saved_req);
636                 return 1;
637         }
638
639         memcpy(saved_req, req, sizeof *req);
640         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
641         req = saved_req;
642         req->rq_reqmsg = reqmsg;
643         class_export_get(req->rq_export);
644         INIT_LIST_HEAD(&req->rq_list);
645
646         /* XXX O(n^2) */
647         list_for_each(tmp, &obd->obd_recovery_queue) {
648                 struct ptlrpc_request *reqiter =
649                         list_entry(tmp, struct ptlrpc_request, rq_list);
650
651                 if (reqiter->rq_reqmsg->transno > transno) {
652                         list_add_tail(&req->rq_list, &reqiter->rq_list);
653                         inserted = 1;
654                         break;
655                 }
656         }
657
658         if (!inserted) {
659                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
660         }
661
662         if (obd->obd_processing_task != 0) {
663                 /* Someone else is processing this queue, we'll leave it to
664                  * them.
665                  */
666                 if (transno == obd->obd_next_recovery_transno)
667                         wake_up(&obd->obd_next_transno_waitq);
668                 spin_unlock_bh(&obd->obd_processing_task_lock);
669                 return 0;
670         }
671
672         /* Nobody is processing, and we know there's (at least) one to process
673          * now, so we'll do the honours.
674          */
675         obd->obd_processing_task = current->pid;
676         spin_unlock_bh(&obd->obd_processing_task_lock);
677
678         process_recovery_queue(obd);
679         return 0;
680 }
681
682 struct obd_device * target_req2obd(struct ptlrpc_request *req)
683 {
684         return req->rq_export->exp_obd;
685 }
686
687 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
688 {
689         struct obd_device *obd = target_req2obd(req);
690         struct ptlrpc_request *saved_req;
691         struct lustre_msg *reqmsg;
692         int recovery_done = 0;
693
694         if (rc) {
695                 /* Just like ptlrpc_error, but without the sending. */
696                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
697                                 &req->rq_repmsg);
698                 req->rq_type = PTL_RPC_MSG_ERR;
699         }
700
701         LASSERT(list_empty(&req->rq_list));
702         /* XXX just like the request-dup code in queue_recovery_request */
703         OBD_ALLOC(saved_req, sizeof *saved_req);
704         if (!saved_req)
705                 LBUG();
706         OBD_ALLOC(reqmsg, req->rq_reqlen);
707         if (!reqmsg)
708                 LBUG();
709         memcpy(saved_req, req, sizeof *saved_req);
710         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
711         req = saved_req;
712         req->rq_reqmsg = reqmsg;
713         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
714
715         spin_lock_bh(&obd->obd_processing_task_lock);
716         --obd->obd_recoverable_clients;
717         recovery_done = (obd->obd_recoverable_clients == 0);
718         spin_unlock_bh(&obd->obd_processing_task_lock);
719
720         if (recovery_done) {
721                 struct list_head *tmp, *n;
722                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
723                 CERROR("%s: all clients recovered, sending delayed replies\n",
724                        obd->obd_name);
725                 obd->obd_recovering = 0;
726                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
727                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
728                         DEBUG_REQ(D_ERROR, req, "delayed:");
729                         ptlrpc_reply(req);
730                         list_del(&req->rq_list);
731                         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
732                         OBD_FREE(req, sizeof *req);
733                 }
734                 target_cancel_recovery_timer(obd);
735         } else {
736                 CERROR("%s: %d recoverable clients remain\n",
737                        obd->obd_name, obd->obd_recoverable_clients);
738         }
739
740         return 1;
741 }
742
743 static void ptlrpc_abort_reply (struct ptlrpc_request *req)
744 {
745         /* On return, we must be sure that the ACK callback has either
746          * happened or will not happen.  Note that the SENT callback will
747          * happen come what may since we successfully posted the PUT. */
748         int rc;
749         struct l_wait_info lwi;
750         unsigned long flags;
751
752  again:
753         /* serialise with ACK callback */
754         spin_lock_irqsave (&req->rq_lock, flags);
755         if (!req->rq_want_ack) {
756                 spin_unlock_irqrestore (&req->rq_lock, flags);
757                 /* The ACK callback has happened already.  Although the
758                  * SENT callback might still be outstanding (yes really) we
759                  * don't care; this is just like normal completion. */
760                 return;
761         }
762         spin_unlock_irqrestore (&req->rq_lock, flags);
763
764         /* Have a bash at unlinking the MD.  This will fail until the SENT
765          * callback has happened since the MD is busy from the PUT.  If the
766          * ACK still hasn't arrived after then, a successful unlink will
767          * ensure the ACK callback never happens. */
768         rc = PtlMDUnlink (req->rq_reply_md_h);
769         switch (rc) {
770         default:
771                 LBUG ();
772         case PTL_OK:
773                 /* SENT callback happened; ACK callback preempted */
774                 LASSERT (req->rq_want_ack);
775                 spin_lock_irqsave (&req->rq_lock, flags);
776                 req->rq_want_ack = 0;
777                 spin_unlock_irqrestore (&req->rq_lock, flags);
778                 return;
779         case PTL_INV_MD:
780                 return;
781         case PTL_MD_INUSE:
782                 /* Still sending or ACK callback in progress: wait until
783                  * either callback has completed and try again.
784                  * Actually we can't wait for the SENT callback because
785                  * there's no state the SENT callback can touch that will
786                  * allow it to communicate with us!  So we just wait here
787                  * for a short time, effectively polling for the SENT
788                  * callback by calling PtlMDUnlink() again, to see if it
789                  * has finished.  Note that if the ACK does arrive, its
790                  * callback wakes us in short order. --eeb */
791                 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
792                 rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
793                                   &lwi);
794                 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
795                 /* NB go back and test rq_want_ack with locking, to ensure
796                  * if ACK callback happened, it has completed stopped
797                  * referencing this req. */
798                 goto again;
799         }
800 }
801
802 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
803 {
804         int i;
805         int netrc;
806         unsigned long flags;
807         struct ptlrpc_req_ack_lock *ack_lock;
808         struct l_wait_info lwi = { 0 };
809         wait_queue_t commit_wait;
810         struct obd_device *obd =
811                 req->rq_export ? req->rq_export->exp_obd : NULL;
812         struct obd_export *exp =
813                 (req->rq_export && req->rq_ack_locks[0].mode) ?
814                 req->rq_export : NULL;
815
816         if (exp) {
817                 exp->exp_outstanding_reply = req;
818                 spin_lock_irqsave (&req->rq_lock, flags);
819                 req->rq_want_ack = 1;
820                 spin_unlock_irqrestore (&req->rq_lock, flags);
821         }
822
823         if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
824                 if (rc) {
825                         DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
826                         netrc = ptlrpc_error(req);
827                 } else {
828                         DEBUG_REQ(D_NET, req, "sending reply");
829                         netrc = ptlrpc_reply(req);
830                 }
831         } else {
832                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
833                 DEBUG_REQ(D_ERROR, req, "dropping reply");
834                 if (!exp && req->rq_repmsg) {
835                         OBD_FREE(req->rq_repmsg, req->rq_replen);
836                         req->rq_repmsg = NULL;
837                 }
838                 init_waitqueue_head(&req->rq_wait_for_rep);
839                 netrc = 0;
840         }
841
842         /* a failed send simulates the callbacks */
843         LASSERT(netrc == 0 || req->rq_want_ack == 0);
844         if (exp == NULL) {
845                 LASSERT(req->rq_want_ack == 0);
846                 return;
847         }
848         LASSERT(obd != NULL);
849
850         init_waitqueue_entry(&commit_wait, current);
851         add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
852         rc = l_wait_event(req->rq_wait_for_rep,
853                           !req->rq_want_ack || req->rq_resent ||
854                           req->rq_transno <= obd->obd_last_committed, &lwi);
855         remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
856
857         spin_lock_irqsave (&req->rq_lock, flags);
858         /* If we got here because the ACK callback ran, this acts as a
859          * barrier to ensure the callback completed the wakeup. */
860         spin_unlock_irqrestore (&req->rq_lock, flags);
861
862         /* If we committed the transno already, then we might wake up before
863          * the ack arrives.  We need to stop waiting for the ack before we can
864          * reuse this request structure.  We are guaranteed by this point that
865          * this cannot abort the sending of the actual reply.*/
866         ptlrpc_abort_reply(req);
867
868         if (req->rq_resent) {
869                 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
870                 return;
871         }
872
873         LASSERT(rc == 0);
874         DEBUG_REQ(D_HA, req, "cancelling locks for %s",
875                   req->rq_want_ack ? "commit" : "ack");
876
877         exp->exp_outstanding_reply = NULL;
878
879         for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
880                 if (!ack_lock->mode)
881                         break;
882                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
883         }
884 }
885
886 int target_handle_ping(struct ptlrpc_request *req)
887 {
888         return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
889 }