- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] lustre/ldlm/ldlm_lib.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #define EXPORT_SYMTAB
23 #define DEBUG_SUBSYSTEM S_LDLM
24
25 #ifdef __KERNEL__
26 # include <linux/module.h>
27 #else
28 # include <liblustre.h>
29 #endif
30 #include <linux/obd_ost.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/lustre_mds.h>
33 #include <linux/lustre_net.h>
34
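/*
 * Connection and recovery helpers shared by the Lustre client OBDs
 * (client_import_connect/client_import_disconnect below) and by the
 * server-side targets (target_handle_connect, target_handle_disconnect
 * and the recovery-queue machinery further down, formerly lib/target.c).
 */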
35 int client_import_connect(struct lustre_handle *dlm_handle, 
36                           struct obd_device *obd,
37                           struct obd_uuid *cluuid)
38 {
39         struct client_obd *cli = &obd->u.cli;
40         struct obd_import *imp = cli->cl_import;
41         struct obd_export *exp;
42         struct ptlrpc_request *request;
43         /* XXX maybe this is a good time to create a connect struct? */
44         int rc, size[] = {sizeof(imp->imp_target_uuid),
45                           sizeof(obd->obd_uuid),
46                           sizeof(*dlm_handle)};
47         char *tmp[] = {imp->imp_target_uuid.uuid,
48                        obd->obd_uuid.uuid,
49                        (char *)dlm_handle};
50         int rq_opc = (obd->obd_type->typ_ops->o_brw) ? OST_CONNECT :MDS_CONNECT;
51         int msg_flags;
52
53         ENTRY;
54         down(&cli->cl_sem);
55         rc = class_connect(dlm_handle, obd, cluuid);
56         if (rc)
57                 GOTO(out_sem, rc);
58
59         cli->cl_conn_count++;
60         if (cli->cl_conn_count > 1)
61                 GOTO(out_sem, rc);
62
63         if (obd->obd_namespace != NULL)
64                 CERROR("already have namespace!\n");
65         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
66                                                 LDLM_NAMESPACE_CLIENT);
67         if (obd->obd_namespace == NULL)
68                 GOTO(out_disco, rc = -ENOMEM);
69
70         request = ptlrpc_prep_req(imp, rq_opc, 3, size, tmp);
71         if (!request)
72                 GOTO(out_ldlm, rc = -ENOMEM);
73
74         request->rq_level = LUSTRE_CONN_NEW;
75         request->rq_replen = lustre_msg_size(0, NULL);
76
77         imp->imp_dlm_handle = *dlm_handle;
78
79         imp->imp_level = LUSTRE_CONN_CON;
80         rc = ptlrpc_queue_wait(request);
81         if (rc) {
82                 class_disconnect(dlm_handle, 0);
83                 GOTO(out_req, rc);
84         }
85
86         exp = class_conn2export(dlm_handle);
87         exp->exp_connection = ptlrpc_connection_addref(request->rq_connection);
88         class_export_put(exp);
89
90         msg_flags = lustre_msg_get_op_flags(request->rq_repmsg);
91         if (rq_opc == MDS_CONNECT || (msg_flags & MSG_CONNECT_REPLAYABLE)) {
92                 imp->imp_replayable = 1;
93                 CDEBUG(D_HA, "connected to replayable target: %s\n",
94                        imp->imp_target_uuid.uuid);
95         }
96         imp->imp_level = LUSTRE_CONN_FULL;
97         imp->imp_remote_handle = request->rq_repmsg->handle;
98         CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
99                imp->imp_remote_handle.cookie);
100
101         EXIT;
102 out_req:
103         ptlrpc_req_finished(request);
104         if (rc) {
105 out_ldlm:
106                 ldlm_namespace_free(obd->obd_namespace);
107                 obd->obd_namespace = NULL;
108 out_disco:
109                 cli->cl_conn_count--;
110                 class_disconnect(dlm_handle, 0);
111         }
112 out_sem:
113         up(&cli->cl_sem);
114         return rc;
115 }
116
117 int client_import_disconnect(struct lustre_handle *dlm_handle, int failover)
118 {
119         struct obd_device *obd = class_conn2obd(dlm_handle);
120         struct client_obd *cli;
121         struct obd_import *imp;
122         struct ptlrpc_request *request = NULL;
123         int rc = 0, err, rq_opc;
124         ENTRY;
125         if (!obd) {
126                 CERROR("invalid connection for disconnect: cookie "LPX64"\n",
127                        dlm_handle ? dlm_handle->cookie : (__u64)-1);
128                 RETURN(-EINVAL);
129         }
130         cli = &obd->u.cli;
131         imp = cli->cl_import;
132         rq_opc = obd->obd_type->typ_ops->o_brw ? OST_DISCONNECT:MDS_DISCONNECT;
133         down(&cli->cl_sem);
134         if (!cli->cl_conn_count) {
135                 CERROR("disconnecting disconnected device (%s)\n",
136                        obd->obd_name);
137                 GOTO(out_sem, rc = -EINVAL);
138         }
139
140         cli->cl_conn_count--;
141         if (cli->cl_conn_count)
142                 GOTO(out_no_disconnect, rc = 0);
143
144         if (obd->obd_namespace != NULL) {
145                 /* obd_no_recov == local only */
146                 ldlm_cli_cancel_unused(obd->obd_namespace, NULL,
147                                        obd->obd_no_recov, NULL);
148                 ldlm_namespace_free(obd->obd_namespace);
149                 obd->obd_namespace = NULL;
150         }
151
152         /* Yeah, obd_no_recov also (mainly) means "forced shutdown". */
153         if (obd->obd_no_recov) {
154                 ptlrpc_abort_inflight(imp);
155         } else {
156                 request = ptlrpc_prep_req(imp, rq_opc, 0, NULL, NULL);
157                 if (!request)
158                         GOTO(out_req, rc = -ENOMEM);
159
160                 request->rq_replen = lustre_msg_size(0, NULL);
161
162                 /* Process disconnects even if we're waiting for recovery. */
163                 request->rq_level = LUSTRE_CONN_RECOVD;
164
165                 rc = ptlrpc_queue_wait(request);
166                 if (rc)
167                         GOTO(out_req, rc);
168         }
169         EXIT;
170  out_req:
171         if (request)
172                 ptlrpc_req_finished(request);
173  out_no_disconnect:
174         err = class_disconnect(dlm_handle, 0);
175         if (!rc && err)
176                 rc = err;
177  out_sem:
178         up(&cli->cl_sem);
179         RETURN(rc);
180 }
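/*
 * The two helpers above are meant to be plugged into a client OBD's
 * method table rather than called directly.  A minimal sketch, assuming
 * a client module that uses them for its connect/disconnect methods
 * (the table name below is illustrative, not taken from this file):
 *
 *      static struct obd_ops client_obd_ops = {
 *              o_connect:    client_import_connect,
 *              o_disconnect: client_import_disconnect,
 *      };
 */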
181
182 /* --------------------------------------------------------------------------
183  * from old lib/target.c
184  * -------------------------------------------------------------------------- */
185
186 int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
187                             struct obd_uuid *cluuid)
188 {
189         if (exp->exp_connection) {
190                 struct lustre_handle *hdl;
191                 hdl = &exp->exp_ldlm_data.led_import->imp_remote_handle;
192                 /* Might be a re-connect after a partition. */
193                 if (!memcmp(&conn->cookie, &hdl->cookie, sizeof conn->cookie)) {
194                         CERROR("%s reconnecting\n", cluuid->uuid);
195                         conn->cookie = exp->exp_handle.h_cookie;
196                         RETURN(EALREADY);
197                 } else {
198                         CERROR("%s reconnecting from %s, "
199                                "handle mismatch (ours "LPX64", theirs "
200                                LPX64")\n", cluuid->uuid,
201                                exp->exp_connection->c_remote_uuid.uuid,
202                                hdl->cookie, conn->cookie);
203                         /* XXX disconnect them here? */
204                         memset(conn, 0, sizeof *conn);
205                         /* This is a little scary, but right now we build this
206                          * file separately into each server module, so I won't
207                          * go _immediately_ to hell.
208                          */
209                         RETURN(-EALREADY);
210                 }
211         }
212
213         conn->cookie = exp->exp_handle.h_cookie;
214         CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n",
215                cluuid->uuid, exp);
216         CDEBUG(D_IOCTL,"connect: cookie "LPX64"\n", conn->cookie);
217         RETURN(0);
218 }
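/*
 * Return convention for target_handle_reconnect(), relied upon by
 * target_handle_connect() below: 0 means there was no prior connection
 * and the caller should go on to obd_connect(); EALREADY (positive)
 * means the client presented the handle we already gave it, so this is
 * a clean reconnect; -EALREADY means the handles do not match and the
 * connection attempt is refused.
 */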
219
220 int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
221 {
222         struct obd_device *target;
223         struct obd_export *export = NULL;
224         struct obd_import *dlmimp;
225         struct lustre_handle conn;
226         struct obd_uuid tgtuuid;
227         struct obd_uuid cluuid;
228         struct obd_uuid remote_uuid;
229         struct list_head *p;
230         char *str, *tmp;
231         int rc, i, abort_recovery;
232         ENTRY;
233
234         LASSERT_REQSWAB (req, 0);
235         str = lustre_msg_string (req->rq_reqmsg, 0, sizeof (tgtuuid.uuid) - 1);
236         if (str == NULL) {
237                 CERROR("bad target UUID for connect\n");
238                 GOTO(out, rc = -EINVAL);
239         }
240         obd_str2uuid (&tgtuuid, str);
241
242         LASSERT_REQSWAB (req, 1);
243         str = lustre_msg_string (req->rq_reqmsg, 1, sizeof (cluuid.uuid) - 1);
244         if (str == NULL) {
245                 CERROR("bad client UUID for connect\n");
246                 GOTO(out, rc = -EINVAL);
247         }
248         obd_str2uuid (&cluuid, str);
249
250         i = class_uuid2dev(&tgtuuid);
251         if (i == -1) {
252                 CERROR("UUID '%s' not found for connect\n", tgtuuid.uuid);
253                 GOTO(out, rc = -ENODEV);
254         }
255
256         target = &obd_dev[i];
257         if (!target || target->obd_stopping || !target->obd_set_up) {
258                 CERROR("UUID '%s' is not available for connect\n", tgtuuid.uuid);
259                 GOTO(out, rc = -ENODEV);
260         }
261
262         /* XXX extract a nettype and format accordingly */
263         snprintf(remote_uuid.uuid, sizeof remote_uuid.uuid,
264                  "NET_"LPX64"_UUID", req->rq_peer.peer_nid);
265
266         spin_lock_bh(&target->obd_processing_task_lock);
267         abort_recovery = target->obd_abort_recovery;
268         spin_unlock_bh(&target->obd_processing_task_lock);
269         if (abort_recovery)
270                 target_abort_recovery(target);
271
272         tmp = lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn);
273         if (tmp == NULL)
274                 GOTO(out, rc = -EPROTO);
275
276         memcpy(&conn, tmp, sizeof conn);
277
278         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
279         if (rc)
280                 GOTO(out, rc);
281
282         /* lctl gets a backstage, all-access pass. */
283         if (obd_uuid_equals(&cluuid, &lctl_fake_uuid))
284                 goto dont_check_exports;
285
286         spin_lock(&target->obd_dev_lock);
287         list_for_each(p, &target->obd_exports) {
288                 export = list_entry(p, struct obd_export, exp_obd_chain);
289                 if (obd_uuid_equals(&cluuid, &export->exp_client_uuid)) {
290                         spin_unlock(&target->obd_dev_lock);
291                         LASSERT(export->exp_obd == target);
292
293                         rc = target_handle_reconnect(&conn, export, &cluuid);
294                         break;
295                 }
296                 export = NULL;
297         }
298         /* If we found an export, we already unlocked. */
299         if (!export)
300                 spin_unlock(&target->obd_dev_lock);
301
302         /* Tell the client if we're in recovery. */
303         /* If this is the first client, start the recovery timer */
304         if (target->obd_recovering) {
305                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECOVERING);
306                 target_start_recovery_timer(target, handler);
307         }
308
309         /* Tell the client if we support replayable requests */
310         if (target->obd_replayable)
311                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_REPLAYABLE);
312
313         if (export == NULL) {
314                 if (target->obd_recovering) {
315                         CERROR("denying connection for new client %s: "
316                                "in recovery\n", cluuid.uuid);
317                         rc = -EBUSY;
318                 } else {
319  dont_check_exports:
320                         rc = obd_connect(&conn, target, &cluuid);
321                 }
322         }
323
324         /* If all else goes well, this is our RPC return code. */
325         req->rq_status = 0;
326
327         if (rc && rc != EALREADY)
328                 GOTO(out, rc);
329
330         req->rq_repmsg->handle = conn;
331
332         /* If the client and the server are the same node, we will already
333          * have an export that really points to the client's DLM export,
334          * because we have a shared handles table.
335          *
336          * XXX this will go away when shaver stops sending the "connect" handle
337          * in the real "remote handle" field of the request --phik 24 Apr 2003
338          */
339         if (req->rq_export != NULL)
340                 class_export_put(req->rq_export);
341
342         /* ownership of this export ref transfers to the request */
343         export = req->rq_export = class_conn2export(&conn);
344         LASSERT(export != NULL);
345
346         if (req->rq_connection != NULL)
347                 ptlrpc_put_connection(req->rq_connection);
348         if (export->exp_connection != NULL)
349                 ptlrpc_put_connection(export->exp_connection);
350         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
351                                                        &remote_uuid);
352         req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
353
354         if (rc == EALREADY) {
355                 /* We indicate the reconnection in a flag, not an error code. */
356                 lustre_msg_add_op_flags(req->rq_repmsg, MSG_CONNECT_RECONNECT);
357                 GOTO(out, rc = 0);
358         }
359
360         memcpy(&conn, lustre_msg_buf(req->rq_reqmsg, 2, sizeof conn),
361                sizeof conn);
362
363         if (export->exp_ldlm_data.led_import != NULL)
364                 class_destroy_import(export->exp_ldlm_data.led_import);
365         dlmimp = export->exp_ldlm_data.led_import = class_new_import();
366         dlmimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
367         dlmimp->imp_client = &export->exp_obd->obd_ldlm_client;
368         dlmimp->imp_remote_handle = conn;
369         dlmimp->imp_obd = target;
370         dlmimp->imp_dlm_fake = 1;
371         dlmimp->imp_level = LUSTRE_CONN_FULL;
372         class_import_put(dlmimp);
373 out:
374         if (rc)
375                 req->rq_status = rc;
376         RETURN(rc);
377 }
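/*
 * Both ends of the connect handshake above assume the same message
 * layout: request buffers { target UUID string, client UUID string,
 * struct lustre_handle }, exactly what client_import_connect() packs.
 * The reply returns the server's export handle in rq_repmsg->handle and
 * advertises state through the MSG_CONNECT_RECOVERING, _REPLAYABLE and
 * _RECONNECT op flags.
 */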
378
379 int target_handle_disconnect(struct ptlrpc_request *req)
380 {
381         struct lustre_handle *conn = &req->rq_reqmsg->handle;
382         struct obd_import *dlmimp;
383         int rc;
384         ENTRY;
385
386         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
387         if (rc)
388                 RETURN(rc);
389
390         req->rq_status = obd_disconnect(conn, 0);
391
392         dlmimp = req->rq_export->exp_ldlm_data.led_import;
393         class_destroy_import(dlmimp);
394
395         class_export_put(req->rq_export);
396         req->rq_export = NULL;
397         RETURN(0);
398 }
399
400 /*
401  * Recovery functions
402  */
403
404 void target_cancel_recovery_timer(struct obd_device *obd)
405 {
406         del_timer(&obd->obd_recovery_timer);
407 }
408
409 static void abort_delayed_replies(struct obd_device *obd)
410 {
411         struct ptlrpc_request *req;
412         struct list_head *tmp, *n;
413         list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
414                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
415                 DEBUG_REQ(D_ERROR, req, "aborted:");
416                 req->rq_status = -ENOTCONN;
417                 req->rq_type = PTL_RPC_MSG_ERR;
418                 ptlrpc_reply(req);
419                 list_del(&req->rq_list);
420                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
421                 OBD_FREE(req, sizeof *req);
422         }
423 }
424
425 static void abort_recovery_queue(struct obd_device *obd)
426 {
427         struct ptlrpc_request *req;
428         struct list_head *tmp, *n;
429         int rc;
430
431         list_for_each_safe(tmp, n, &obd->obd_recovery_queue) {
432                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
433                 DEBUG_REQ(D_ERROR, req, "aborted:");
434                 req->rq_status = -ENOTCONN;
435                 req->rq_type = PTL_RPC_MSG_ERR;
436                 rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
437                                      &req->rq_repmsg);
438                 if (rc == 0) {
439                         ptlrpc_reply(req);
440                 } else {
441                         DEBUG_REQ(D_ERROR, req,
442                                   "packing failed for abort-reply; skipping");
443                 }
444                 list_del(&req->rq_list);
445                 class_export_put(req->rq_export);
446                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
447                 OBD_FREE(req, sizeof *req);
448         }
449 }
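/*
 * Both abort helpers above complete every parked request with -ENOTCONN:
 * abort_delayed_replies() flushes the final replies held back for
 * recovering clients, while abort_recovery_queue() drops the replay
 * requests still waiting for their transno.  In both cases the duplicated
 * request and message buffers allocated when the request was queued are
 * freed here.
 */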
450
451 void target_abort_recovery(void *data)
452 {
453         struct obd_device *obd = data;
454
455         CERROR("disconnecting clients and aborting recovery\n");
456         spin_lock_bh(&obd->obd_processing_task_lock);
457         if (!obd->obd_recovering) {
458                 spin_unlock_bh(&obd->obd_processing_task_lock);
459                 EXIT;
460                 return;
461         }
462
463         obd->obd_recovering = obd->obd_abort_recovery = 0;
464         obd->obd_recoverable_clients = 0;
465         wake_up(&obd->obd_next_transno_waitq);
466         target_cancel_recovery_timer(obd);
467         spin_unlock_bh(&obd->obd_processing_task_lock);
468         class_disconnect_exports(obd, 0);
469         abort_delayed_replies(obd);
470         abort_recovery_queue(obd);
471 }
472
473 static void target_recovery_expired(unsigned long castmeharder)
474 {
475         struct obd_device *obd = (struct obd_device *)castmeharder;
476         CERROR("recovery timed out, aborting\n");
477         spin_lock_bh(&obd->obd_processing_task_lock);
478         obd->obd_abort_recovery = 1;
479         wake_up(&obd->obd_next_transno_waitq);
480         spin_unlock_bh(&obd->obd_processing_task_lock);
481 }
482
483 static void reset_recovery_timer(struct obd_device *obd)
484 {
485         int recovering;
486         spin_lock(&obd->obd_dev_lock);
487         recovering = obd->obd_recovering;
488         spin_unlock(&obd->obd_dev_lock);
489
490         if (!recovering)
491                 return;
492         CDEBUG(D_ERROR, "timer will expire in %ld seconds\n",
493                OBD_RECOVERY_TIMEOUT / HZ);
494         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
495 }
496
497
498 /* Only start it the first time called */
499 void target_start_recovery_timer(struct obd_device *obd, svc_handler_t handler)
500 {
501         spin_lock_bh(&obd->obd_processing_task_lock);
502         if (obd->obd_recovery_handler) {
503                 spin_unlock_bh(&obd->obd_processing_task_lock);
504                 return;
505         }
506         CERROR("%s: starting recovery timer\n", obd->obd_name);
507         obd->obd_recovery_handler = handler;
508         init_timer(&obd->obd_recovery_timer);
509         obd->obd_recovery_timer.function = target_recovery_expired;
510         obd->obd_recovery_timer.data = (unsigned long)obd;
511         spin_unlock_bh(&obd->obd_processing_task_lock);
512
513         reset_recovery_timer(obd);
514 }
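/*
 * Recovery timer lifecycle: armed once, by the first connect that finds
 * the target in recovery (target_handle_connect -> this function),
 * pushed forward by reset_recovery_timer() each time a queued request is
 * replayed, and cancelled when the last recoverable client finishes or
 * recovery is aborted.  On expiry it only sets obd_abort_recovery and
 * wakes the waiters; the actual teardown happens later in
 * target_abort_recovery().
 */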
515
516 static int check_for_next_transno(struct obd_device *obd)
517 {
518         struct ptlrpc_request *req;
519         int wake_up;
520
521         req = list_entry(obd->obd_recovery_queue.next,
522                          struct ptlrpc_request, rq_list);
523         LASSERT(req->rq_reqmsg->transno >= obd->obd_next_recovery_transno);
524
525         wake_up = req->rq_reqmsg->transno == obd->obd_next_recovery_transno ||
526                 (obd->obd_recovering) == 0;
527         CDEBUG(D_HA, "check_for_next_transno: "LPD64" vs "LPD64", %d == %d\n",
528                req->rq_reqmsg->transno, obd->obd_next_recovery_transno,
529                obd->obd_recovering, wake_up);
530         return wake_up;
531 }
532
533 static void process_recovery_queue(struct obd_device *obd)
534 {
535         struct ptlrpc_request *req;
536         int abort_recovery = 0;
537         struct l_wait_info lwi = { 0 };
538         ENTRY;
539
540         for (;;) {
541                 spin_lock_bh(&obd->obd_processing_task_lock);
542                 LASSERT(obd->obd_processing_task == current->pid);
543                 req = list_entry(obd->obd_recovery_queue.next,
544                                  struct ptlrpc_request, rq_list);
545
546                 if (req->rq_reqmsg->transno != obd->obd_next_recovery_transno) {
547                         spin_unlock_bh(&obd->obd_processing_task_lock);
548                         CDEBUG(D_HA, "Waiting for transno "LPD64" (1st is "
549                                LPD64")\n",
550                                obd->obd_next_recovery_transno,
551                                req->rq_reqmsg->transno);
552                         l_wait_event(obd->obd_next_transno_waitq,
553                                      check_for_next_transno(obd), &lwi);
554                         spin_lock_bh(&obd->obd_processing_task_lock);
555                         abort_recovery = obd->obd_abort_recovery;
556                         spin_unlock_bh(&obd->obd_processing_task_lock);
557                         if (abort_recovery) {
558                                 target_abort_recovery(obd);
559                                 return;
560                         }
561                         continue;
562                 }
563                 list_del_init(&req->rq_list);
564                 spin_unlock_bh(&obd->obd_processing_task_lock);
565
566                 DEBUG_REQ(D_ERROR, req, "processing: ");
567                 (void)obd->obd_recovery_handler(req);
568                 reset_recovery_timer(obd);
569 #warning FIXME: mds_fsync_super(mds->mds_sb);
570                 class_export_put(req->rq_export);
571                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
572                 OBD_FREE(req, sizeof *req);
573                 spin_lock_bh(&obd->obd_processing_task_lock);
574                 obd->obd_next_recovery_transno++;
575                 if (list_empty(&obd->obd_recovery_queue)) {
576                         obd->obd_processing_task = 0;
577                         spin_unlock_bh(&obd->obd_processing_task_lock);
578                         break;
579                 }
580                 spin_unlock_bh(&obd->obd_processing_task_lock);
581         }
582         EXIT;
583 }
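/*
 * Replay ordering: requests are processed strictly in transno order.
 * The thread owning obd_processing_task pulls the head of
 * obd_recovery_queue, sleeps in check_for_next_transno() until the head
 * matches obd_next_recovery_transno (or recovery is aborted), replays it
 * through the registered recovery handler, then bumps the expected
 * transno by one.
 */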
584
585 int target_queue_recovery_request(struct ptlrpc_request *req,
586                                   struct obd_device *obd)
587 {
588         struct list_head *tmp;
589         int inserted = 0;
590         __u64 transno = req->rq_reqmsg->transno;
591         struct ptlrpc_request *saved_req;
592         struct lustre_msg *reqmsg;
593
594         /* CAVEAT EMPTOR: The incoming request message has been swabbed
595          * (i.e. buflens etc are in my own byte order), but type-dependent
596          * buffers (eg mds_body, ost_body etc) have NOT been swabbed. */
597
598         if (!transno) {
599                 INIT_LIST_HEAD(&req->rq_list);
600                 DEBUG_REQ(D_HA, req, "not queueing");
601                 return 1;
602         }
603
604         /* XXX If I were a real man, these LBUGs would be sane cleanups. */
605         /* XXX just like the request-dup code in queue_final_reply */
606         OBD_ALLOC(saved_req, sizeof *saved_req);
607         if (!saved_req)
608                 LBUG();
609         OBD_ALLOC(reqmsg, req->rq_reqlen);
610         if (!reqmsg)
611                 LBUG();
612
613         spin_lock_bh(&obd->obd_processing_task_lock);
614
615         /* If we're processing the queue, we don't want to queue this
616          * message.
617          *
618          * Also, if this request has a transno less than the one we're waiting
619          * for, we should process it now.  It could (and currently always will)
620          * be an open request for a descriptor that was opened some time ago.
621          */
622         if (obd->obd_processing_task == current->pid ||
623             transno < obd->obd_next_recovery_transno) {
624                 /* Processing the queue right now, don't re-add. */
625                 LASSERT(list_empty(&req->rq_list));
626                 spin_unlock_bh(&obd->obd_processing_task_lock);
627                 OBD_FREE(reqmsg, req->rq_reqlen);
628                 OBD_FREE(saved_req, sizeof *saved_req);
629                 return 1;
630         }
631
632         memcpy(saved_req, req, sizeof *req);
633         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
634         req = saved_req;
635         req->rq_reqmsg = reqmsg;
636         class_export_get(req->rq_export);
637         INIT_LIST_HEAD(&req->rq_list);
638
639         /* XXX O(n^2) */
640         list_for_each(tmp, &obd->obd_recovery_queue) {
641                 struct ptlrpc_request *reqiter =
642                         list_entry(tmp, struct ptlrpc_request, rq_list);
643
644                 if (reqiter->rq_reqmsg->transno > transno) {
645                         list_add_tail(&req->rq_list, &reqiter->rq_list);
646                         inserted = 1;
647                         break;
648                 }
649         }
650
651         if (!inserted) {
652                 list_add_tail(&req->rq_list, &obd->obd_recovery_queue);
653         }
654
655         if (obd->obd_processing_task != 0) {
656                 /* Someone else is processing this queue, we'll leave it to
657                  * them.
658                  */
659                 if (transno == obd->obd_next_recovery_transno)
660                         wake_up(&obd->obd_next_transno_waitq);
661                 spin_unlock_bh(&obd->obd_processing_task_lock);
662                 return 0;
663         }
664
665         /* Nobody is processing, and we know there's (at least) one to process
666          * now, so we'll do the honours.
667          */
668         obd->obd_processing_task = current->pid;
669         spin_unlock_bh(&obd->obd_processing_task_lock);
670
671         process_recovery_queue(obd);
672         return 0;
673 }
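/*
 * target_queue_recovery_request() returns 1 when the caller should
 * handle the request itself (no transno, or we are already the thread
 * draining the queue, or the transno is older than the one recovery is
 * waiting for) and 0 when a private copy of the request has been queued
 * in transno order, possibly after taking over queue processing.
 */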
674
675 struct obd_device * target_req2obd(struct ptlrpc_request *req)
676 {
677         return req->rq_export->exp_obd;
678 }
679
680 int target_queue_final_reply(struct ptlrpc_request *req, int rc)
681 {
682         struct obd_device *obd = target_req2obd(req);
683         struct ptlrpc_request *saved_req;
684         struct lustre_msg *reqmsg;
685         int recovery_done = 0;
686
687         if (rc) {
688                 /* Just like ptlrpc_error, but without the sending. */
689                 lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
690                                 &req->rq_repmsg);
691                 req->rq_type = PTL_RPC_MSG_ERR;
692         }
693
694         LASSERT(list_empty(&req->rq_list));
695         /* XXX just like the request-dup code in queue_recovery_request */
696         OBD_ALLOC(saved_req, sizeof *saved_req);
697         if (!saved_req)
698                 LBUG();
699         OBD_ALLOC(reqmsg, req->rq_reqlen);
700         if (!reqmsg)
701                 LBUG();
702         memcpy(saved_req, req, sizeof *saved_req);
703         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
704         req = saved_req;
705         req->rq_reqmsg = reqmsg;
706         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
707
708         spin_lock_bh(&obd->obd_processing_task_lock);
709         --obd->obd_recoverable_clients;
710         recovery_done = (obd->obd_recoverable_clients == 0);
711         spin_unlock_bh(&obd->obd_processing_task_lock);
712
713         if (recovery_done) {
714                 struct list_head *tmp, *n;
715                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
716                 CDEBUG(D_ERROR,
717                        "%s: all clients recovered, sending delayed replies\n",
718                        obd->obd_name);
719                 obd->obd_recovering = 0;
720                 list_for_each_safe(tmp, n, &obd->obd_delayed_reply_queue) {
721                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
722                         DEBUG_REQ(D_ERROR, req, "delayed:");
723                         ptlrpc_reply(req);
724                         list_del(&req->rq_list);
725                         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
726                         OBD_FREE(req, sizeof *req);
727                 }
728                 target_cancel_recovery_timer(obd);
729         } else {
730                 CERROR("%s: %d recoverable clients remain\n",
731                        obd->obd_name, obd->obd_recoverable_clients);
732         }
733
734         return 1;
735 }
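/*
 * target_queue_final_reply() parks a copy of each recovering client's
 * last reply on obd_delayed_reply_queue instead of sending it.  Once
 * obd_recoverable_clients drops to zero, the namespace is reprocessed,
 * all delayed replies go out in one batch and the recovery timer is
 * cancelled; until then only the per-client countdown is reported.
 */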
736
737 static void ptlrpc_abort_reply (struct ptlrpc_request *req)
738 {
739         /* On return, we must be sure that the ACK callback has either
740          * happened or will not happen.  Note that the SENT callback will
741          * happen come what may since we successfully posted the PUT. */
742         int rc;
743         struct l_wait_info lwi;
744         unsigned long flags;
745
746  again:
747         /* serialise with ACK callback */
748         spin_lock_irqsave (&req->rq_lock, flags);
749         if (!req->rq_want_ack) {
750                 spin_unlock_irqrestore (&req->rq_lock, flags);
751                 /* The ACK callback has happened already.  Although the
752                  * SENT callback might still be outstanding (yes really) we
753                  * don't care; this is just like normal completion. */
754                 return;
755         }
756         spin_unlock_irqrestore (&req->rq_lock, flags);
757
758         /* Have a bash at unlinking the MD.  This will fail until the SENT
759          * callback has happened since the MD is busy from the PUT.  If the
760          * ACK still hasn't arrived after then, a successful unlink will
761          * ensure the ACK callback never happens. */
762         rc = PtlMDUnlink (req->rq_reply_md_h);
763         switch (rc) {
764         default:
765                 LBUG ();
766         case PTL_OK:
767                 /* SENT callback happened; ACK callback preempted */
768                 LASSERT (req->rq_want_ack);
769                 spin_lock_irqsave (&req->rq_lock, flags);
770                 req->rq_want_ack = 0;
771                 spin_unlock_irqrestore (&req->rq_lock, flags);
772                 return;
773         case PTL_INV_MD:
774                 return;
775         case PTL_MD_INUSE:
776                 /* Still sending or ACK callback in progress: wait until
777                  * either callback has completed and try again.
778                  * Actually we can't wait for the SENT callback because
779                  * there's no state the SENT callback can touch that will
780                  * allow it to communicate with us!  So we just wait here
781                  * for a short time, effectively polling for the SENT
782                  * callback by calling PtlMDUnlink() again, to see if it
783                  * has finished.  Note that if the ACK does arrive, its
784                  * callback wakes us in short order. --eeb */
785                 lwi = LWI_TIMEOUT (HZ/4, NULL, NULL);
786                 rc = l_wait_event(req->rq_wait_for_rep, !req->rq_want_ack,
787                                   &lwi);
788                 CDEBUG (D_HA, "Retrying req %p: %d\n", req, rc);
789                 /* NB go back and test rq_want_ack with locking, to ensure
790                  * that if the ACK callback happened, it has completed and
791                  * stopped referencing this req. */
792                 goto again;
793         }
794 }
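/*
 * ptlrpc_abort_reply() interprets the PtlMDUnlink() result as follows:
 * PTL_OK means the unlink won and the ACK callback can no longer run;
 * PTL_INV_MD means the MD is already gone, i.e. the callbacks completed;
 * PTL_MD_INUSE means the PUT or the ACK is still in flight, so we poll
 * again after a short (HZ/4) wait.
 */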
795
796 void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id)
797 {
798         int i;
799         int netrc;
800         unsigned long flags;
801         struct ptlrpc_req_ack_lock *ack_lock;
802         struct l_wait_info lwi = { 0 };
803         wait_queue_t commit_wait;
804         struct obd_device *obd =
805                 req->rq_export ? req->rq_export->exp_obd : NULL;
806         struct obd_export *exp =
807                 (req->rq_export && req->rq_ack_locks[0].mode) ?
808                 req->rq_export : NULL;
809
810         if (exp) {
811                 exp->exp_outstanding_reply = req;
812                 spin_lock_irqsave (&req->rq_lock, flags);
813                 req->rq_want_ack = 1;
814                 spin_unlock_irqrestore (&req->rq_lock, flags);
815         }
816
817         if (!OBD_FAIL_CHECK(fail_id | OBD_FAIL_ONCE)) {
818                 if (rc) {
819                         DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
820                         netrc = ptlrpc_error(req);
821                 } else {
822                         DEBUG_REQ(D_NET, req, "sending reply");
823                         netrc = ptlrpc_reply(req);
824                 }
825         } else {
826                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
827                 DEBUG_REQ(D_ERROR, req, "dropping reply");
828                 if (!exp && req->rq_repmsg) {
829                         OBD_FREE(req->rq_repmsg, req->rq_replen);
830                         req->rq_repmsg = NULL;
831                 }
832                 init_waitqueue_head(&req->rq_wait_for_rep);
833                 netrc = 0;
834         }
835
836         /* a failed send simulates the callbacks */
837         LASSERT(netrc == 0 || req->rq_want_ack == 0);
838         if (exp == NULL) {
839                 LASSERT(req->rq_want_ack == 0);
840                 return;
841         }
842         LASSERT(obd != NULL);
843
844         init_waitqueue_entry(&commit_wait, current);
845         add_wait_queue(&obd->obd_commit_waitq, &commit_wait);
846         rc = l_wait_event(req->rq_wait_for_rep,
847                           !req->rq_want_ack || req->rq_resent ||
848                           req->rq_transno <= obd->obd_last_committed, &lwi);
849         remove_wait_queue(&obd->obd_commit_waitq, &commit_wait);
850
851         spin_lock_irqsave (&req->rq_lock, flags);
852         /* If we got here because the ACK callback ran, this acts as a
853          * barrier to ensure the callback completed the wakeup. */
854         spin_unlock_irqrestore (&req->rq_lock, flags);
855
856         /* If we committed the transno already, then we might wake up before
857          * the ack arrives.  We need to stop waiting for the ack before we can
858          * reuse this request structure.  We are guaranteed by this point that
859          * this cannot abort the sending of the actual reply.*/
860         ptlrpc_abort_reply(req);
861
862         if (req->rq_resent) {
863                 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
864                 return;
865         }
866
867         LASSERT(rc == 0);
868         DEBUG_REQ(D_HA, req, "cancelling locks for %s",
869                   req->rq_want_ack ? "commit" : "ack");
870
871         exp->exp_outstanding_reply = NULL;
872
873         for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
874                 if (!ack_lock->mode)
875                         break;
876                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
877         }
878 }
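/*
 * Reply pinning in target_send_reply(): while rq_want_ack is set the
 * export keeps the request in exp_outstanding_reply and the server holds
 * the (up to four) DLM locks recorded in rq_ack_locks.  The locks are
 * only dropped once the client acks the reply or the transaction commits
 * (rq_transno <= obd_last_committed); a resent request keeps them.
 */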
879
880 int target_handle_ping(struct ptlrpc_request *req)
881 {
882         return lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
883 }