Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define EXPORT_SYMTAB
23 #define DEBUG_SUBSYSTEM S_LDLM
24
25 #ifdef __KERNEL__
26 # include <linux/module.h>
27 #else
28 # include <liblustre.h>
29 #endif
30 #include <linux/obd_ost.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_net.h>
34
35 int client_import_connect(struct lustre_handle *dlm_handle, 
36                           struct obd_device *obd,
37                           struct obd_uuid *cluuid)
38 {
39         struct client_obd *cli = &obd->u.cli;
40         struct obd_import *imp = cli->cl_import;
41         struct obd_export *exp;
42         struct ptlrpc_request *request;
43         /* XXX maybe this is a good time to create a connect struct? */
44         int rc, size[] = {sizeof(imp->imp_target_uuid),
45                           sizeof(obd->obd_uuid),
46                           sizeof(*dlm_handle)};
47         char *tmp[] = {imp->imp_target_uuid.uuid,
48                        obd->obd_uuid.uuid,
49                        (char *)dlm_handle};
50         int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
51         int msg_flags;
52
53         ENTRY;
54         down(&cli->cl_sem);
55         rc = class_connect(dlm_handle, obd, cluuid);
56         if (rc)
57                 GOTO(out_sem, rc);
58
59         cli->cl_conn_count++;
60         if (cli->cl_conn_count > 1)
61                 GOTO(out_sem, rc);
62
63         if (obd->obd_namespace != NULL)
64                 CERROR("already have namespace!\n");
65         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
66                                                 LDLM_NAMESPACE_CLIENT);
67         if (obd->obd_namespace == NULL)
68                 GOTO(out_disco, rc = -ENOMEM);
69
70         request = ptlrpc_prep_req(imp, rq_opc, 3, size, tmp);
71         if (!request)
72                 GOTO(out_ldlm, rc = -ENOMEM);
73
74         request->rq_level = LUSTRE_CONN_NEW;
75         request->rq_replen = lustre_msg_size(0, NULL);
76
77         imp->imp_dlm_handle = *dlm_handle;
78
79         imp->imp_level = LUSTRE_CONN_CON;
80         rc = ptlrpc_queue_wait(request);
81         if (rc) {
82                 class_disconnect(dlm_handle, 0);
83                 GOTO(out_req, rc);
84         }
85
86         exp = class_conn2export(dlm_handle);
87         exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
88         class_export_put(exp);
89
90         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
91         if (rq_opc == MDS_CONNECT || msg_flags & MSG_CONNECT_REPLAYABLE) {
92                 imp->imp_replayable = 1;
93                 CDEBUG(D_HA, "connected to replayable target: %s\n",
94                        imp->imp_target_uuid.uuid);
95                 ptlrpc_pinger_add_import(imp);
96         }
97         imp->imp_level = LUSTRE_CONN_FULL;
98         imp->imp_remote_handle = request->rq_repmsg->handle;
99         CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
100                imp->imp_remote_handle.cookie);
101
102         EXIT;
103 out_req:
104         ptlrpc_req_finished(request);
105         if (rc) {
106 out_ldlm:
107                 ldlm_namespace_free(obd->obd_namespace);
108                 obd->obd_namespace = NULL;
109 out_disco:
110                 cli->cl_conn_count--;
111                 class_disconnect(dlm_handle, 0);
112         }
113 out_sem:
114         up(&cli->cl_sem);
115         return rc;
116 }
117
118 int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
119 {
120         struct obd_device *obd = class_conn2obd(dlm_handle);
121         struct client_obd *cli = &obd->u.cli;
122         struct obd_import *imp = cli->cl_import;
123         struct ptlrpc_request *request = NULL;
124         int rc = 0, err, rq_opc;
125         ENTRY;
126
127         if (!obd) {
128                 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
129                        dlm_handle ? dlm_handle->cookie : -1UL);
130                 RETURN(-EINVAL);
131         }
132
133         rq_opc = obd->obd_type->typ_ops->o_brw ? OST_DISCONNECT:MDS_DISCONNECT;
134         down(&cli->cl_sem);
135         if (!cli->cl_conn_count) {
136                 CERROR("disconnecting disconnected device (%s)\n",
137                        obd->obd_name);
138                 GOTO(out_sem, rc = -EINVAL);
139         }
140
141         cli->cl_conn_count--;
142         if (cli->cl_conn_count)
143                 GOTO(out_no_disconnect, rc = 0);
144
145         if (obd->obd_namespace != NULL) {
146                 /* obd_no_recov == local only */
147                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
148                                        obd->obd_no_recov, NULL);
149                 ldlm_namespace_free(obd->obd_namespace);
150                 obd->obd_namespace = NULL;
151         }
152
153         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
154         if (obd->obd_no_recov) {
155                 ptlrpc_set_import_active(imp, 0);
156         } else {
157                 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
158                 if (!request)
159                         GOTO(out_req, rc = -ENOMEM);
160
161                 request->rq_replen = lustre_msg_size(0, NULL);
162
163                 rc = ptlrpc_queue_wait(request);
164                 if (rc)
165                         GOTO(out_req, rc);
166         }
167         if (imp->imp_replayable)
168                 ptlrpc_pinger_del_import(imp);
169
170         EXIT;
171  out_req:
172         if (request)
173                 ptlrpc_req_finished(request);
174  out_no_disconnect:
175         err = class_disconnect(dlm_handle, 0);
176         if (!rc && err)
177                 rc = err;
178  out_sem:
179         up(&cli->cl_sem);
180         RETURN(rc);
181 }
182
183 /* --------------------------------------------------------------------------
184  * from old lib/target.c
185  * -------------------------------------------------------------------------- */
186
187 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
188                             struct obd_uuid *cluuid)
189 {
190         if (exp->exp_connection) {
191                 struct lustre_handle *hdl;
192                 hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
193                 /* Might be a re-connect after a partition. */
194                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
195                         CERROR("%s reconnecting\n", cluuid->uuid);
196                         conn->cookie = exp->exp_handle.h_cookie;
197                         RETURN(EALREADY);
198                 } else {
199                         CERROR("%s reconnecting from %s, "
200                                "handle mismatch (ours "LPX64", theirs "
201                                LPX64")\n", cluuid->uuid,
202                                exp->exp_connection->c_remote_uuid.uuid,
203                                hdl->cookie, conn->cookie);
204                         /* XXX disconnect them here? */
205                         memset(conn, 0, sizeof *conn);
206                         /* This is a little scary, but right now we build this
207                          * file separately into each server module, so I won't
208                          * go _immediately_ to hell.
209                          */
210                         RETURN(-EALREADY);
211                 }
212         }
213
214         conn->cookie = exp->exp_handle.h_cookie;
215         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
216                cluuid->uuid, exp);
217         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
218         RETURN(0);
219 }
220
221 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
222 {
223         struct obd_device *target;
224         struct obd_export *export = NULL;
225         struct obd_import *dlmimp;
226         struct lustre_handle conn;
227         struct obd_uuid tgtuuid;
228         struct obd_uuid cluuid;
229         struct obd_uuid remote_uuid;
230         struct list_head *p;
231         char *str, *tmp;
232         int rc, i, abort_recovery;
233         ENTRY;
234
235         LASSERT_REQSWAB (req, 0);
236         str = lustre_msg_string (req->rq_reqmsg, 0, sizeof (tgtuuid.uuid) - 1);
237         if (str == NULL) {
238                 CERROR("bad target UUID for connect\n");
239                 GOTO(out, rc = -EINVAL);
240         }
241         obd_str2uuid (&tgtuuid, str);
242
243         LASSERT_REQSWAB (req, 1);
244         str = lustre_msg_string (req->rq_reqmsg, 1, sizeof (cluuid.uuid) - 1);
245         if (str == NULL) {
246                 CERROR("bad client UUID for connect\n");
247                 GOTO(out, rc = -EINVAL);
248         }
249         obd_str2uuid (&cluuid, str);
250
251         i = class_uuid2dev(&tgtuuid);
252         if (i == -1) {
253                 CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
254                 GOTO(out, rc = -ENODEV);
255         }
256
257         target = &obd_dev[i];
258         if (!target || target->obd_stopping || !target->obd_set_up) {
259                 CERROR("UUID '%s' is not available for connect\n", str);
260                 GOTO(out, rc = -ENODEV);
261         }
262
263         /* XXX extract a nettype and format accordingly */
264         snprintf(remote_uuid.uuid, sizeof remote_uuid,
265                  "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
266
267         spin_lock_bh(&target->obd_processing_task_lock);
268         abort_recovery = target->obd_abort_recovery;
269         spin_unlock_bh(&target->obd_processing_task_lock);
270         if (abort_recovery)
271                 target_abort_recovery(target);
272
273         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
274         if (tmp == NULL)
275                 GOTO(out, rc = -EPROTO);
276
277         memcpy(&conn, tmp, sizeof conn);
278
279         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
280         if (rc)
281                 GOTO(out, rc);
282
283         /* lctl gets a backstage, all-access pass. */
284         if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
285                 goto dont_check_exports;
286
287         spin_lock(&target->obd_dev_lock);
288         list_for_each(p, &target->obd_exports) {
289                 export = list_entry(p, struct obd_export, exp_obd_chain);
290                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
291                         spin_unlock(&target->obd_dev_lock);
292                         LASSERT(export->exp_obd == target);
293
294                         rc = target_handle_reconnect(&conn, export, &cluuid);
295                         break;
296                 }
297                 export = NULL;
298         }
299         /* If we found an export, we already unlocked. */
300         if (!export)
301                 spin_unlock(&target->obd_dev_lock);
302
303         /* Tell the client if we're in recovery. */
304         /* If this is the first client, start the recovery timer */
305         if (target->obd_recovering) {
306                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
307                 target_start_recovery_timer(target, handler);
308         }
309
310         /* Tell the client if we support replayable requests */
311         if (target->obd_replayable)
312                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
313
314         if (export == NULL) {
315                 if (target->obd_recovering) {
316                         CERROR("denying connection for new client %s: "
317                                "in recovery\n", cluuid.uuid);
318                         rc = -EBUSY;
319                 } else {
320  dont_check_exports:
321                         rc = obd_connect(&conn, target, &cluuid);
322                 }
323         }
324
325         /* If all else goes well, this is our RPC return code. */
326         req->rq_status = 0;
327
328         if (rc && rc != EALREADY)
329                 GOTO(out, rc);
330
331         req->rq_repmsg->handle = conn;
332
333         /* If the client and the server are the same node, we will already
334          * have an export that really points to the client's DLM export,
335          * because we have a shared handles table.
336          *
337          * XXX this will go away when shaver stops sending the "connect" handle
338          * in the real "remote handle" field of the request --phik 24 Apr 2003
339          */
340         if (req->rq_export != NULL)
341                 class_export_put(req->rq_export);
342
343         /* ownership of this export ref transfers to the request */
344         export = req->rq_export = class_conn2export(&conn);
345         LASSERT(export != NULL);
346
347         if (req->rq_connection != NULL)
348                 ptlrpc_put_connection(req->rq_connection);
349         if (export->exp_connection != NULL)
350                 ptlrpc_put_connection(export->exp_connection);
351         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
352                                                        &remote_uuid);
353         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
354
355         if (rc == EALREADY) {
356                 /* We indicate the reconnection in a flag, not an error code. */
357                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
358                 GOTO(out, rc = 0);
359         }
360
361         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
362                sizeof conn);
363
364         if (export->exp_ldlm_data.led_import != NULL)
365                 class_destroy_import(export->exp_ldlm_data.led_import);
366         dlmimp = export->exp_ldlm_data.led_import = class_new_import();
367         dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
368         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
369         dlmimp->imp_remote_handle = conn;
370         dlmimp->imp_obd = target;
371         dlmimp->imp_dlm_fake = 1;
372         dlmimp->imp_level = LUSTRE_CONN_FULL;
373         class_import_put(dlmimp);
374 out:
375         if (rc)
376                 req->rq_status = rc;
377         RETURN(rc);
378 }
379
380 int target_handle_disconnect(struct ptlrpc_request *req)
381 {
382         struct lustre_handle *conn = &req->rq_reqmsg->handle;
383         struct obd_import *dlmimp;
384         int rc;
385         ENTRY;
386
387         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
388         if (rc)
389                 RETURN(rc);
390
391         req->rq_status = obd_disconnect(conn, 0);
392
393         dlmimp = req->rq_export->exp_ldlm_data.led_import;
394         class_destroy_import(dlmimp);
395
396         class_export_put(req->rq_export);
397         req->rq_export = NULL;
398         RETURN(0);
399 }
400
401 /*
402  * Recovery functions
403  */
404
405 void target_cancel_recovery_timer(struct obd_device *obd)
406 {
407         del_timer(&obd->obd_recovery_timer);
408 }
409
410 static void abort_delayed_replies(struct obd_device *obd)
411 {
412         struct ptlrpc_request *req;
413         struct list_head *tmp, *n;
414         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
415                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
416                 DEBUG_REQ(D_ERROR, req, "aborted:");
417                 req->rq_status = -ENOTCONN;
418                 req->rq_type = PTL_RPC_MSG_ERR;
419                 ptlrpc_reply(req);
420                 list_del(&req->rq_list);
421                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
422                 OBD_FREE(req, sizeof *req);
423         }
424 }
425
426 static void abort_recovery_queue(struct obd_device *obd)
427 {
428         struct ptlrpc_request *req;
429         struct list_head *tmp, *n;
430         int rc;
431
432         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
433                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
434                 DEBUG_REQ(D_ERROR, req, "aborted:");
435                 req->rq_status = -ENOTCONN;
436                 req->rq_type = PTL_RPC_MSG_ERR;
437                 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
438                                      &req->rq_repmsg);
439                 if (rc == 0) {
440                         ptlrpc_reply(req);
441                 } else {
442                         DEBUG_REQ(D_ERROR, req,
443                                   "packing failed for abort-reply; skipping");
444                 }
445                 list_del(&req->rq_list);
446                 class_export_put(req->rq_export);
447                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
448                 OBD_FREE(req, sizeof *req);
449         }
450 }
451
452 void target_abort_recovery(void *data)
453 {
454         struct obd_device *obd = data;
455
456         CERROR("disconnecting clients and aborting recovery\n");
457         spin_lock_bh(&obd->obd_processing_task_lock);
458         if (!obd->obd_recovering) {
459                 spin_unlock_bh(&obd->obd_processing_task_lock);
460                 EXIT;
461                 return;
462         }
463
464         obd->obd_recovering = obd->obd_abort_recovery = 0;
465         obd->obd_recoverable_clients = 0;
466         wake_up(&obd->obd_next_transno_waitq);
467         target_cancel_recovery_timer(obd);
468         spin_unlock_bh(&obd->obd_processing_task_lock);
469         class_disconnect_exports(obd, 0);
470         abort_delayed_replies(obd);
471         abort_recovery_queue(obd);
472         ptlrpc_run_recovery_over_upcall(obd);
473 }
474
475 static void target_recovery_expired(unsigned long castmeharder)
476 {
477         struct obd_device *obd = (struct obd_device *)castmeharder;
478         CERROR("recovery timed out, aborting\n");
479         spin_lock_bh(&obd->obd_processing_task_lock);
480         obd->obd_abort_recovery = 1;
481         wake_up(&obd->obd_next_transno_waitq);
482         spin_unlock_bh(&obd->obd_processing_task_lock);
483 }
484
485 static void reset_recovery_timer(struct obd_device *obd)
486 {
487         int recovering;
488         spin_lock(&obd->obd_dev_lock);
489         recovering = obd->obd_recovering;
490         spin_unlock(&obd->obd_dev_lock);
491
492         if (!recovering)
493                 return;
494         CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
495                OBD_RECOVERY_TIMEOUT / HZ);
496         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
497 }
498
499
500 /* Only start it the first time called */
501 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
502 {
503         spin_lock_bh(&obd->obd_processing_task_lock);
504         if (obd->obd_recovery_handler) {
505                 spin_unlock_bh(&obd->obd_processing_task_lock);
506                 return;
507         }
508         CERROR("%s: starting recovery timer\n", obd->obd_name);
509         obd->obd_recovery_handler = handler;
510         obd->obd_recovery_timer.function = target_recovery_expired;
511         obd->obd_recovery_timer.data = (unsigned long)obd;
512         init_timer(&obd->obd_recovery_timer);
513         spin_unlock_bh(&obd->obd_processing_task_lock);
514
515         reset_recovery_timer(obd);
516 }
517
518 static int check_for_next_transno(struct obd_device *obd)
519 {
520         struct ptlrpc_request *req;
521         int wake_up;
522
523         req = list_entry(obd->obd_recovery_queue.next,
524                          struct ptlrpc_request, rq_list);
525         LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
526
527         wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
528                 (obd->obd_recovering) == 0;
529         CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
530                req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
531                obd->obd_recovering, wake_up);
532         return wake_up;
533 }
534
535 static void process_recovery_queue(struct obd_device *obd)
536 {
537         struct ptlrpc_request *req;
538         int abort_recovery = 0;
539         struct l_wait_info lwi = { 0 };
540         ENTRY;
541
542         for (;;) {
543                 spin_lock_bh(&obd->obd_processing_task_lock);
544                 LASSERT(obd->obd_processing_task == current->pid);
545                 req = list_entry(obd->obd_recovery_queue.next,
546                                  struct ptlrpc_request, rq_list);
547
548                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
549                         spin_unlock_bh(&obd->obd_processing_task_lock);
550                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
551                                LPD64")\n",
552                                obd->obd_next_recovery_transno,
553                                req->rq_reqmsg->transno);
554                         l_wait_event(obd->obd_next_transno_waitq,
555                                      check_for_next_transno(obd), &lwi);
556                         spin_lock_bh(&obd->obd_processing_task_lock);
557                         abort_recovery = obd->obd_abort_recovery;
558                         spin_unlock_bh(&obd->obd_processing_task_lock);
559                         if (abort_recovery) {
560                                 target_abort_recovery(obd);
561                                 return;
562                         }
563                         continue;
564                 }
565                 list_del_init(&req->rq_list);
566                 spin_unlock_bh(&obd->obd_processing_task_lock);
567
568                 DEBUG_REQ(D_ERROR, req, "processing: ");
569                 (void)obd->obd_recovery_handler(req);
570                 reset_recovery_timer(obd);
571 #warning FIXME: mds_fsync_super(mds->mds_sb);
572                 class_export_put(req->rq_export);
573                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
574                 OBD_FREE(req, sizeof *req);
575                 spin_lock_bh(&obd->obd_processing_task_lock);
576                 obd->obd_next_recovery_transno++;
577                 if (list_empty(&obd->obd_recovery_queue)) {
578                         obd->obd_processing_task = 0;
579                         spin_unlock_bh(&obd->obd_processing_task_lock);
580                         break;
581                 }
582                 spin_unlock_bh(&obd->obd_processing_task_lock);
583         }
584         EXIT;
585 }
586
587 int target_queue_recovery_request(struct ptlrpc_request *req,
588                                   struct obd_device *obd)
589 {
590         struct list_head *tmp;
591         int inserted = 0;
592         __u64 transno = req->rq_reqmsg->transno;
593         struct ptlrpc_request *saved_req;
594         struct lustre_msg *reqmsg;
595
596         /* CAVEAT EMPTOR: The incoming request message has been swabbed
597          * (i.e. buflens etc are in my own byte order), but type-dependent
598          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
599
600         if (!transno) {
601                 INIT_LIST_HEAD(&req->rq_list);
602                 DEBUG_REQ(D_HA, req, "not queueing");
603                 return 1;
604         }
605
606         /* XXX If I were a real man, these LBUGs would be sane cleanups. */
607         /* XXX just like the request-dup code in queue_final_reply */
608         OBD_ALLOC(saved_req, sizeof *saved_req);
609         if (!saved_req)
610                 LBUG();
611         OBD_ALLOC(reqmsg, req->rq_reqlen);
612         if (!reqmsg)
613                 LBUG();
614
615         spin_lock_bh(&obd->obd_processing_task_lock);
616
617         /* If we're processing the queue, we want don't want to queue this
618          * message.
619          *
620          * Also, if this request has a transno less than the one we're waiting
621          * for, we should process it now.  It could (and currently always will)
622          * be an open request for a descriptor that was opened some time ago.
623          */
624         if (obd->obd_processing_task == current->pid ||
625             transno < obd->obd_next_recovery_transno) {
626                 /* Processing the queue right now, don't re-add. */
627                 LASSERT(list_empty(&req->rq_list));
628                 spin_unlock_bh(&obd->obd_processing_task_lock);
629                 OBD_FREE(reqmsg, req->rq_reqlen);
630                 OBD_FREE(saved_req, sizeof *saved_req);
631                 return 1;
632         }
633
634         memcpy(saved_req, req, sizeof *req);
635         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
636         req = saved_req;
637         req->rq_reqmsg = reqmsg;
638         class_export_get(req->rq_export);
639         INIT_LIST_HEAD(&req->rq_list);
640
641         /* XXX O(n^2) */
642         list_for_each(tmp, &obd->obd_recovery_queue) {
643                 struct ptlrpc_request *reqiter =
644                         list_entry(tmp, struct ptlrpc_request, rq_list);
645
646                 if (reqiter->rq_reqmsg->transno > transno) {
647                         list_add_tail(&req->rq_list, &reqiter->rq_list);
648                         inserted = 1;
649                         break;
650                 }
651         }
652
653         if (!inserted) {
654                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
655         }
656
657         if (obd->obd_processing_task != 0) {
658                 /* Someone else is processing this queue, we'll leave it to
659                  * them.
660                  */
661                 if (transno == obd->obd_next_recovery_transno)
662                         wake_up(&obd->obd_next_transno_waitq);
663                 spin_unlock_bh(&obd->obd_processing_task_lock);
664                 return 0;
665         }
666
667         /* Nobody is processing, and we know there's (at least) one to process
668          * now, so we'll do the honours.
669          */
670         obd->obd_processing_task = current->pid;
671         spin_unlock_bh(&obd->obd_processing_task_lock);
672
673         process_recovery_queue(obd);
674         return 0;
675 }
676
677 struct obd_device * target_req2obd(struct ptlrpc_request *req)
678 {
679         return req->rq_export->exp_obd;
680 }
681
682 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
683 {
684         struct obd_device *obd = target_req2obd(req);
685         struct ptlrpc_request *saved_req;
686         struct lustre_msg *reqmsg;
687         int recovery_done = 0;
688
689         if (rc) {
690                 /* Just like ptlrpc_error, but without the sending. */
691                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
692                                 &req->rq_repmsg);
693                 req->rq_type = PTL_RPC_MSG_ERR;
694         }
695
696         LASSERT(list_empty(&req->rq_list));
697         /* XXX just like the request-dup code in queue_recovery_request */
698         OBD_ALLOC(saved_req, sizeof *saved_req);
699         if (!saved_req)
700                 LBUG();
701         OBD_ALLOC(reqmsg, req->rq_reqlen);
702         if (!reqmsg)
703                 LBUG();
704         memcpy(saved_req, req, sizeof *saved_req);
705         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
706         req = saved_req;
707         req->rq_reqmsg = reqmsg;
708         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
709
710         spin_lock_bh(&obd->obd_processing_task_lock);
711         --obd->obd_recoverable_clients;
712         recovery_done = (obd->obd_recoverable_clients == 0);
713         spin_unlock_bh(&obd->obd_processing_task_lock);
714
715         if (recovery_done) {
716                 struct list_head *tmp, *n;
717                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
718                 CDEBUG(D_ERROR,
719                        "%s: all clients recovered, sending delayed replies\n",
720                        obd->obd_name);
721                 obd->obd_recovering = 0;
722                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
723                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
724                         DEBUG_REQ(D_ERROR, req, "delayed:");
725                         ptlrpc_reply(req);
726                         list_del(&req->rq_list);
727                         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
728                         OBD_FREE(req, sizeof *req);
729                 }
730                 target_cancel_recovery_timer(obd);
731         } else {
732                 CERROR("%s: %d recoverable clients remain\n",
733                        obd->obd_name, obd->obd_recoverable_clients);
734         }
735
736         return 1;
737 }
738
739 static void ptlrpc_abort_reply (struct ptlrpc_request *req)
740 {
741         /* On return, we must be sure that the ACK callback has either
742          * happened or will not happen.  Note that the SENT callback will
743          * happen come what may since we successfully posted the PUT. */
744         int rc;
745         struct l_wait_info lwi;
746         unsigned long flags;
747
748  again:
749         /* serialise with ACK callback */
750         spin_lock_irqsave (&req->rq_lock, flags);
751         if (!req->rq_want_ack) {
752                 spin_unlock_irqrestore (&req->rq_lock, flags);
753                 /* The ACK callback has happened already.  Although the
754                  * SENT callback might still be outstanding (yes really) we
755                  * don't care; this is just like normal completion. */
756                 return;
757         }
758         spin_unlock_irqrestore (&req->rq_lock, flags);
759
760         /* Have a bash at unlinking the MD.  This will fail until the SENT
761          * callback has happened since the MD is busy from the PUT.  If the
762          * ACK still hasn't arrived after then, a successful unlink will
763          * ensure the ACK callback never happens. */
764         rc = PtlMDUnlink (req->rq_reply_md_h);
765         switch (rc) {
766         default:
767                 LBUG ();
768         case PTL_OK:
769                 /* SENT callback happened; ACK callback preempted */
770                 LASSERT (req->rq_want_ack);
771                 spin_lock_irqsave (&req->rq_lock, flags);
772                 req->rq_want_ack = 0;
773                 spin_unlock_irqrestore (&req->rq_lock, flags);
774                 return;
775         case PTL_INV_MD:
776                 return;
777         case PTL_MD_INUSE:
778                 /* Still sending or ACK callback in progress: wait until
779                  * either callback has completed and try again.
780                  * Actually we can't wait for the SENT callback because
781                  * there's no state the SENT callback can touch that will
782                  * allow it to communicate with us!  So we just wait here
783                  * for a short time, effectively polling for the SENT
784                  * callback by calling PtlMDUnlink() again, to see if it
785                  * has finished.  Note that if the ACK does arrive, its
786                  * callback wakes us in short order. --eeb */
787                 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
788                 rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
789                                   &lwi);
790                 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
791                 /* NB go back and test rq_want_ack with locking, to ensure
792                  * if ACK callback happened, it has completed stopped
793                  * referencing this req. */
794                 goto again;
795         }
796 }
797
798 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
799 {
800         int i;
801         int netrc;
802         unsigned long flags;
803         struct ptlrpc_req_ack_lock *ack_lock;
804         struct l_wait_info lwi = { 0 };
805         wait_queue_t commit_wait;
806         struct obd_device *obd =
807                 req->rq_export ? req->rq_export->exp_obd : NULL;
808         struct obd_export *exp =
809                 (req->rq_export && req->rq_ack_locks[0].mode) ?
810                 req->rq_export : NULL;
811
812         if (exp) {
813                 exp->exp_outstanding_reply = req;
814                 spin_lock_irqsave (&req->rq_lock, flags);
815                 req->rq_want_ack = 1;
816                 spin_unlock_irqrestore (&req->rq_lock, flags);
817         }
818
819         if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
820                 if (rc) {
821                         DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
822                         netrc = ptlrpc_error(req);
823                 } else {
824                         DEBUG_REQ(D_NET, req, "sending reply");
825                         netrc = ptlrpc_reply(req);
826                 }
827         } else {
828                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
829                 DEBUG_REQ(D_ERROR, req, "dropping reply");
830                 if (!exp && req->rq_repmsg) {
831                         OBD_FREE(req->rq_repmsg, req->rq_replen);
832                         req->rq_repmsg = NULL;
833                 }
834                 init_waitqueue_head(&req->rq_wait_for_rep);
835                 netrc = 0;
836         }
837
838         /* a failed send simulates the callbacks */
839         LASSERT(netrc == 0 || req->rq_want_ack == 0);
840         if (exp == NULL) {
841                 LASSERT(req->rq_want_ack == 0);
842                 return;
843         }
844         LASSERT(obd != NULL);
845
846         init_waitqueue_entry(&commit_wait, current);
847         add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
848         rc = l_wait_event(req->rq_wait_for_rep,
849                           !req->rq_want_ack || req->rq_resent ||
850                           req->rq_transno <= obd->obd_last_committed, &lwi);
851         remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
852
853         spin_lock_irqsave (&req->rq_lock, flags);
854         /* If we got here because the ACK callback ran, this acts as a
855          * barrier to ensure the callback completed the wakeup. */
856         spin_unlock_irqrestore (&req->rq_lock, flags);
857
858         /* If we committed the transno already, then we might wake up before
859          * the ack arrives.  We need to stop waiting for the ack before we can
860          * reuse this request structure.  We are guaranteed by this point that
861          * this cannot abort the sending of the actual reply.*/
862         ptlrpc_abort_reply(req);
863
864         if (req->rq_resent) {
865                 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
866                 return;
867         }
868
869         LASSERT(rc == 0);
870         DEBUG_REQ(D_HA, req, "cancelling locks for %s",
871                   req->rq_want_ack ? "commit" : "ack");
872
873         exp->exp_outstanding_reply = NULL;
874
875         for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
876                 if (!ack_lock->mode)
877                         break;
878                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
879         }
880 }
881
882 int target_handle_ping(struct ptlrpc_request *req)
883 {
884         return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
885 }