1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002, 2003 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #define DEBUG_SUBSYSTEM S_LDLM
25 #include <liblustre.h>
28 #include <linux/lustre_dlm.h>
29 #include <linux/obd_class.h>
30 #include <linux/obd.h>
32 #include "ldlm_internal.h"
/* Forward declaration of the interrupt callback used by ldlm_completion_ast's
 * l_wait_event setup; its body is not visible in this listing.
 * lock_wait_data bundles the lock being waited on with the import generation
 * at wait start.  NOTE(review): lines are elided here — the struct body shown
 * is incomplete (lwd_generation is referenced later but not visible). */
34 static void interrupted_completion_wait(void *data)
38 struct lock_wait_data {
39 struct ldlm_lock *lwd_lock;
/* Timeout callback fired when a lock wait in ldlm_completion_ast exceeds the
 * timeout.  For a server-local lock (no connection export) it only logs and
 * periodically (every 300s, rate-limited via next_dump) dumps the namespace;
 * for a client lock it kicks the import into recovery via ptlrpc_fail_import.
 * NOTE(review): several original lines are elided from this listing, so
 * braces and the return path are missing from view. */
43 int ldlm_expired_completion_wait(void *data)
45 static unsigned long next_dump = 0; /* jiffies of next permitted dump */
46 struct lock_wait_data *lwd = data;
47 struct ldlm_lock *lock = lwd->lwd_lock;
48 struct obd_import *imp;
49 struct obd_device *obd;
51 if (lock->l_conn_export == NULL) {
52 LDLM_ERROR(lock, "lock timed out; not entering recovery in "
53 "server code, just going back to sleep");
54 if (time_after(jiffies, next_dump)) {
55 unsigned int debug = portal_debug;
56 next_dump = jiffies + 300 * HZ;
57 portal_debug |= D_OTHER; /* raise verbosity just for the dump */
58 ldlm_namespace_dump(lock->l_resource->lr_namespace);
64 obd = lock->l_conn_export->exp_obd;
65 imp = obd->u.cli.cl_import;
/* lwd_generation pins which import generation is failed; a newer generation
 * means recovery already happened and this call is a no-op for it. */
66 ptlrpc_fail_import(imp, lwd->lwd_generation);
67 LDLM_ERROR(lock, "lock timed out, entering recovery for %s@%s",
68 imp->imp_target_uuid.uuid,
69 imp->imp_connection->c_remote_uuid.uuid);
/* Client-side completion AST: block the caller until the lock is granted
 * (l_req_mode == l_granted_mode) or cancelled (LDLM_FL_CANCEL).  With
 * LDLM_FL_WAIT_NOREPROC the wait is skipped (early-return path elided from
 * this listing).  Timeouts are handled by ldlm_expired_completion_wait,
 * interrupts by interrupted_completion_wait.
 * NOTE(review): many original lines are elided; error-return paths and the
 * lwd.lwd_lock assignment are not visible here. */
74 int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
76 /* XXX ALLOCATE - 160 bytes */
77 struct lock_wait_data lwd;
78 unsigned long irqflags;
79 struct obd_device *obd;
80 struct obd_import *imp = NULL;
82 struct l_wait_info lwi;
85 if (flags == LDLM_FL_WAIT_NOREPROC)
/* Wake any existing waiters before (or instead of) waiting ourselves. */
89 wake_up(&lock->l_waitq);
93 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
97 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
99 ldlm_lock_dump(D_OTHER, lock, 0);
100 ldlm_reprocess_all(lock->l_resource);
104 obd = class_exp2obd(lock->l_conn_export);
106 /* if this is a local lock, then there is no import */
108 imp = obd->u.cli.cl_import;
112 lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
113 interrupted_completion_wait, &lwd);
/* Snapshot the import generation under imp_lock so the timeout handler
 * only fails this generation, not a later recovered one. */
115 spin_lock_irqsave(&imp->imp_lock, irqflags);
116 lwd.lwd_generation = imp->imp_generation;
117 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
120 /* Go to sleep until the lock is granted or cancelled. */
121 rc = l_wait_event(lock->l_waitq,
122 ((lock->l_req_mode == lock->l_granted_mode) ||
123 (lock->l_flags & LDLM_FL_CANCEL)), &lwi);
125 if (lock->l_destroyed) {
126 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
131 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
136 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
/* Enqueue a lock on a server-local (non-shadow) namespace, bypassing any RPC.
 * Creates the lock, takes a mode reference, marks it LDLM_FL_LOCAL, then runs
 * ldlm_lock_enqueue and the completion AST.  On LDLM_FL_LOCK_CHANGED the
 * (by-value) res_id copy is updated from the resource — note this cannot be
 * seen by the caller.  NOTE(review): elided lines hide the ENTRY/RETURN
 * paths, err handling after enqueue, and the out/out_nolock labels. */
140 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
141 struct lustre_handle *parent_lockh,
142 struct ldlm_res_id res_id,
144 void *cookie, int cookielen,
147 ldlm_completion_callback completion,
148 ldlm_blocking_callback blocking,
150 struct lustre_handle *lockh)
152 struct ldlm_lock *lock;
157 CERROR("Trying to enqueue local lock in a shadow namespace\n");
161 lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode,
162 blocking, completion, data);
164 GOTO(out_nolock, err = -ENOMEM);
165 LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
167 ldlm_lock_addref_internal(lock, mode);
168 ldlm_lock2handle(lock, lockh);
169 lock->l_flags |= LDLM_FL_LOCAL;
171 err = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
/* For non-PLAIN types the (possibly adjusted) policy data is copied back
 * out through the cookie buffer. */
175 if (type != LDLM_PLAIN)
176 memcpy(cookie, &lock->l_policy_data, cookielen);
177 if ((*flags) & LDLM_FL_LOCK_CHANGED)
178 memcpy(&res_id, &lock->l_resource->lr_name, sizeof(res_id));
180 LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
183 if (lock->l_completion_ast)
184 lock->l_completion_ast(lock, *flags, NULL);
186 LDLM_DEBUG(lock, "client-side local enqueue END");
/* Tear down a lock after a failed enqueue: mark it LDLM_FL_LOCAL_ONLY under
 * the namespace lock so no CANCEL RPC is sent for it (bug 407), then drop the
 * mode reference and cancel it locally. */
194 static void failed_lock_cleanup(struct ldlm_namespace *ns,
195 struct ldlm_lock *lock,
196 struct lustre_handle *lockh, int mode)
198 /* Set a flag to prevent us from sending a CANCEL (bug 407) */
199 l_lock(&ns->ns_lock);
200 lock->l_flags |= LDLM_FL_LOCAL_ONLY;
201 LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
202 l_unlock(&ns->ns_lock);
204 ldlm_lock_decref_and_cancel(lockh, mode);
/* Main client-side enqueue entry point.  Routes local namespaces to
 * ldlm_cli_enqueue_local; otherwise builds (or reuses a passed-in) LDLM_ENQUEUE
 * request, sends it, swabs the reply, fixes up mode/resource if the server
 * changed them (intent locking), and finally runs ldlm_lock_enqueue plus the
 * completion AST.  In replay (*flags & LDLM_FL_REPLAY) the existing lock is
 * looked up from lockh instead of being created.
 * NOTE(review): many original lines are elided — branch conditions, several
 * error paths, and the out/out_req/out_nolock labels are not all visible. */
207 int ldlm_cli_enqueue(struct obd_export *exp,
208 struct ptlrpc_request *req,
209 struct ldlm_namespace *ns,
210 struct lustre_handle *parent_lock_handle,
211 struct ldlm_res_id res_id,
213 void *cookie, int cookielen,
216 ldlm_completion_callback completion,
217 ldlm_blocking_callback blocking,
219 struct lustre_handle *lockh)
221 struct ldlm_lock *lock;
222 struct ldlm_request *body;
223 struct ldlm_reply *reply;
224 int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
227 is_replay = *flags & LDLM_FL_REPLAY;
228 LASSERT(exp != NULL || !is_replay); /* replay requires a live export */
/* Local (serverless) path: no RPC, handled entirely in this address space. */
231 rc = ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
232 type, cookie, cookielen, mode,
233 flags, completion, blocking, data,
238 /* If we're replaying this lock, just check some invariants.
239 * If we're creating a new lock, get everything all setup nice. */
241 lock = ldlm_handle2lock(lockh);
242 LDLM_DEBUG(lock, "client-side enqueue START");
243 LASSERT(exp == lock->l_conn_export);
245 lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
246 mode, blocking, completion, data);
248 GOTO(out_nolock, rc = -ENOMEM);
249 /* for the local lock, add the reference */
250 ldlm_lock_addref_internal(lock, mode);
251 ldlm_lock2handle(lock, lockh);
252 if (type != LDLM_PLAIN)
253 memcpy(&lock->l_policy_data, cookie, cookielen);
254 LDLM_DEBUG(lock, "client-side enqueue START");
/* No request passed in by the caller: allocate our own. */
258 req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1,
261 GOTO(out, rc = -ENOMEM);
263 } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
266 /* Dump lock data into the request buffer */
267 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
268 ldlm_lock2desc(lock, &body->lock_desc);
269 body->lock_flags = *flags;
271 memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
272 if (parent_lock_handle)
273 memcpy(&body->lock_handle2, parent_lock_handle,
274 sizeof(body->lock_handle2));
276 /* Continue as normal. */
277 if (!req_passed_in) {
278 size = sizeof(*reply);
279 req->rq_replen = lustre_msg_size(1, &size);
281 lock->l_conn_export = exp;
282 lock->l_export = NULL;
283 lock->l_blocking_ast = blocking;
285 LDLM_DEBUG(lock, "sending request");
286 rc = ptlrpc_queue_wait(req);
288 if (rc != ELDLM_OK) {
290 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
291 rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
292 failed_lock_cleanup(ns, lock, lockh, mode);
293 if (rc == ELDLM_LOCK_ABORTED) {
294 /* Before we return, swab the reply */
295 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
296 lustre_swab_ldlm_reply);
298 CERROR("Can't unpack ldlm_reply\n");
299 GOTO(out_req, rc = -EPROTO);
/* Success path: unpack the reply and adopt the server's view of the lock. */
305 reply = lustre_swab_repbuf(req, 0, sizeof(*reply),
306 lustre_swab_ldlm_reply);
308 CERROR("Can't unpack ldlm_reply\n");
309 GOTO(out_req, rc = -EPROTO);
312 memcpy(&lock->l_remote_handle, &reply->lock_handle,
313 sizeof(lock->l_remote_handle));
314 *flags = reply->lock_flags;
316 CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
317 lock, reply->lock_handle.cookie, *flags);
318 if (type == LDLM_EXTENT) {
319 CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
320 "extent "LPU64" -> "LPU64"\n",
321 body->lock_desc.l_policy_data.l_extent.start,
322 body->lock_desc.l_policy_data.l_extent.end,
323 reply->lock_policy_data.l_extent.start,
324 reply->lock_policy_data.l_extent.end);
/* The server may have granted a different extent/flock range than we
 * asked for; feed the reply policy data into ldlm_lock_enqueue below. */
326 cookie = &reply->lock_policy_data; /* FIXME bug 267 */
327 cookielen = sizeof(struct ldlm_extent);
328 } else if (type == LDLM_FLOCK) {
329 cookie = &reply->lock_policy_data;
330 cookielen = sizeof(struct ldlm_flock);
333 /* If enqueue returned a blocked lock but the completion handler has
334 * already run, then it fixed up the resource and we don't need to do it
336 if ((*flags) & LDLM_FL_LOCK_CHANGED) {
337 int newmode = reply->lock_mode;
339 if (newmode && newmode != lock->l_req_mode) {
340 LDLM_DEBUG(lock, "server returned different mode %s",
341 ldlm_lockname[newmode]);
342 lock->l_req_mode = newmode;
/* NOTE(review): only name[0] is compared here to detect a resource change;
 * presumably that is sufficient for these resource IDs — verify. */
345 if (reply->lock_resource_name.name[0] !=
346 lock->l_resource->lr_name.name[0]) {
347 CDEBUG(D_INFO, "remote intent success, locking %ld "
349 (long)reply->lock_resource_name.name[0],
350 (long)lock->l_resource->lr_name.name[0]);
352 ldlm_lock_change_resource(ns, lock,
353 reply->lock_resource_name);
354 if (lock->l_resource == NULL) {
356 GOTO(out_req, rc = -ENOMEM);
358 LDLM_DEBUG(lock, "client-side enqueue, new resource");
361 if ((*flags) & LDLM_FL_AST_SENT) {
362 l_lock(&ns->ns_lock);
363 lock->l_flags |= LDLM_FL_CBPENDING;
364 l_unlock(&ns->ns_lock);
365 LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
/* Insert the granted/blocked lock into the local queues and wait via the
 * completion AST; on AST failure the lock is torn down. */
369 rc = ldlm_lock_enqueue(ns, &lock, cookie, cookielen, flags);
370 if (lock->l_completion_ast != NULL) {
371 int err = lock->l_completion_ast(lock, *flags, NULL);
373 failed_lock_cleanup(ns, lock, lockh, mode);
379 LDLM_DEBUG(lock, "client-side enqueue END");
383 ptlrpc_req_finished(req);
/* Replay wrapper: re-enqueue an existing lock after recovery.  Converts the
 * lock to a handle and calls ldlm_cli_enqueue with LDLM_FL_REPLAY so the
 * existing lock is reused rather than created.  `junk` is deliberately
 * uninitialized filler for the unused res_id parameter, and mode -1 /
 * NULL callbacks are ignored on the replay path. */
390 int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
392 struct lustre_handle lockh;
393 struct ldlm_res_id junk;
394 int flags = LDLM_FL_REPLAY;
395 ldlm_lock2handle(lock, &lockh);
396 return ldlm_cli_enqueue(lock->l_conn_export, NULL, NULL, NULL, junk,
397 lock->l_resource->lr_type, NULL, 0, -1, &flags,
398 NULL, NULL, NULL, &lockh);
/* Convert a server-local lock's mode without any RPC, then reprocess the
 * resource so newly-compatible waiters can be granted.  Rejects client
 * (shadow) namespaces.  NOTE(review): the error-return after CERROR and the
 * final RETURN are elided from this listing. */
401 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
405 if (lock->l_resource->lr_namespace->ns_client) {
406 CERROR("Trying to cancel local lock\n");
409 LDLM_DEBUG(lock, "client-side local convert");
411 ldlm_lock_convert(lock, new_mode, flags);
412 ldlm_reprocess_all(lock->l_resource);
414 LDLM_DEBUG(lock, "client-side local convert handler END");
419 /* FIXME: one of ldlm_cli_convert or the server side should reject attempted
420 * conversion of locks which are on the waiting or converting queue */
/* Convert a lock to new_mode.  Local locks go through
 * ldlm_cli_convert_local; otherwise an LDLM_CONVERT RPC carrying the remote
 * handle is sent, the swabbed reply's flags drive the local conversion, and
 * the completion AST is run with LDLM_FL_WAIT_NOREPROC to wait for the grant.
 * NOTE(review): elided lines hide the handle2lock failure path, the rc check
 * after ptlrpc_queue_wait, and the res==NULL handling. */
421 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
423 struct ldlm_request *body;
424 struct ldlm_reply *reply;
425 struct ldlm_lock *lock;
426 struct ldlm_resource *res;
427 struct ptlrpc_request *req;
428 int rc, size = sizeof(*body);
431 lock = ldlm_handle2lock(lockh);
438 if (lock->l_conn_export == NULL)
439 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
441 LDLM_DEBUG(lock, "client-side convert");
443 req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
444 LDLM_CONVERT, 1, &size, NULL);
446 GOTO(out, rc = -ENOMEM);
/* The server identifies the lock by the remote handle, not the local one. */
448 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
449 memcpy(&body->lock_handle1, &lock->l_remote_handle,
450 sizeof(body->lock_handle1));
452 body->lock_desc.l_req_mode = new_mode;
453 body->lock_flags = *flags;
455 size = sizeof(*reply);
456 req->rq_replen = lustre_msg_size(1, &size);
458 rc = ptlrpc_queue_wait(req);
462 reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
463 lustre_swab_ldlm_reply);
465 CERROR ("Can't unpack ldlm_reply\n");
466 GOTO (out, rc = -EPROTO);
469 res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
471 ldlm_reprocess_all(res);
472 /* Go to sleep until the lock is granted. */
473 /* FIXME: or cancelled. */
474 if (lock->l_completion_ast)
475 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC, NULL);
479 ptlrpc_req_finished(req);
/* Cancel a lock by handle.  Uses LDLM_FL_CANCELING in the handle lookup so
 * concurrent cancels of the same handle are tolerated.  For a remote lock:
 * set CBPENDING (no new references), run the cancel callback, and — unless
 * the lock is LOCAL_ONLY or the import is invalid — send an LDLM_CANCEL RPC
 * on the dedicated cancel portals before cancelling locally.  For a local
 * lock: cancel and reprocess in place.
 * NOTE(review): elided lines hide the NULL-lock check, the local_only
 * declaration, the restart path after -ETIMEDOUT, and the out label. */
483 int ldlm_cli_cancel(struct lustre_handle *lockh)
485 struct ptlrpc_request *req;
486 struct ldlm_lock *lock;
487 struct ldlm_request *body;
488 int rc = 0, size = sizeof(*body);
491 /* concurrent cancels on the same handle can happen */
492 lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
496 if (lock->l_conn_export) {
498 struct obd_import *imp;
500 LDLM_DEBUG(lock, "client-side cancel");
501 /* Set this flag to prevent others from getting new references*/
502 l_lock(&lock->l_resource->lr_namespace->ns_lock);
503 lock->l_flags |= LDLM_FL_CBPENDING;
504 local_only = (lock->l_flags & LDLM_FL_LOCAL_ONLY);
505 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
506 ldlm_cancel_callback(lock);
509 CDEBUG(D_INFO, "not sending request (at caller's "
515 imp = class_exp2cliimp(lock->l_conn_export);
516 if (imp == NULL || imp->imp_invalid) {
517 CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
522 req = ptlrpc_prep_req(imp, LDLM_CANCEL, 1, &size, NULL);
524 GOTO(out, rc = -ENOMEM);
525 req->rq_no_resend = 1; /* a resent cancel could hit a reused handle */
527 /* XXX FIXME bug 249 */
528 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
529 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
531 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
532 memcpy(&body->lock_handle1, &lock->l_remote_handle,
533 sizeof(body->lock_handle1));
535 req->rq_replen = lustre_msg_size(0, NULL); /* no reply body expected */
537 rc = ptlrpc_queue_wait(req);
540 CERROR("client/server (nid "LPU64") out of sync--not "
542 req->rq_import->imp_connection->c_peer.peer_nid);
543 } else if (rc == -ETIMEDOUT) {
544 ptlrpc_req_finished(req);
546 } else if (rc != ELDLM_OK) {
547 CERROR("Got rc %d from cancel RPC: canceling "
551 ptlrpc_req_finished(req);
/* Local cancellation happens regardless of the RPC outcome. */
553 ldlm_lock_cancel(lock);
555 if (lock->l_resource->lr_namespace->ns_client) {
556 LDLM_ERROR(lock, "Trying to cancel local lock\n");
559 LDLM_DEBUG(lock, "client-side local cancel");
560 ldlm_lock_cancel(lock);
561 ldlm_reprocess_all(lock->l_resource);
562 LDLM_DEBUG(lock, "client-side local cancel handler END");
/* Shrink the namespace LRU of unused locks down to ns_max_unused.  Under
 * ns_lock, collect `count` victims into a private list (taking a reference on
 * each and marking CBPENDING so they can gain no new users); then, outside
 * the lock, cancel each via ldlm_cli_cancel and drop the references.
 * NOTE(review): elided lines hide the count/rc declarations, the early return
 * when count <= 0, the OBD_ALLOC failure branch, and the loop-exit condition
 * that stops after `count` victims. */
571 int ldlm_cancel_lru(struct ldlm_namespace *ns)
573 struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
575 struct ldlm_ast_work *w;
578 l_lock(&ns->ns_lock);
579 count = ns->ns_nr_unused - ns->ns_max_unused;
582 l_unlock(&ns->ns_lock);
586 list_for_each_safe(tmp, next, &ns->ns_unused_list) {
587 struct ldlm_lock *lock;
588 lock = list_entry(tmp, struct ldlm_lock, l_lru);
590 LASSERT(!lock->l_readers && !lock->l_writers);
592 /* Setting the CBPENDING flag is a little misleading, but
593 * prevents an important race; namely, once CBPENDING is set,
594 * the lock can accumulate no more readers/writers. Since
595 * readers and writers are already zero here, ldlm_lock_decref
596 * won't see this flag and call l_blocking_ast */
597 lock->l_flags |= LDLM_FL_CBPENDING;
599 OBD_ALLOC(w, sizeof(*w));
602 w->w_lock = LDLM_LOCK_GET(lock); /* hold ref across the unlock */
603 list_add(&w->w_list, &list);
604 ldlm_lock_remove_from_lru(lock);
609 l_unlock(&ns->ns_lock);
/* Phase 2: cancel the collected locks without holding ns_lock. */
611 list_for_each_safe(tmp, next, &list) {
612 struct lustre_handle lockh;
614 w = list_entry(tmp, struct ldlm_ast_work, w_list);
616 ldlm_lock2handle(w->w_lock, &lockh);
617 rc = ldlm_cli_cancel(&lockh);
619 CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
621 list_del(&w->w_list);
622 LDLM_LOCK_PUT(w->w_lock);
623 OBD_FREE(w, sizeof(*w));
/* Cancel all unused (no readers/writers) granted locks on one resource.
 * Two-phase like ldlm_cancel_lru: collect candidates under ns_lock (skipping
 * locks whose l_ast_data doesn't match `opaque`, and in-use locks — warned
 * about if LDLM_FL_WARN), then cancel outside the lock.  LDLM_FL_LOCAL_ONLY
 * skips the CANCEL RPC; LDLM_FL_NO_CALLBACK suppresses the cancel callback
 * by pre-setting LDLM_FL_CANCEL.  NOTE(review): elided lines hide rc/flags
 * declarations, the resource-not-found return, the OBD_ALLOC failure branch,
 * and the final RETURN. */
629 static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
630 struct ldlm_res_id res_id, int flags,
633 struct ldlm_resource *res;
634 struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
635 struct ldlm_ast_work *w;
638 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
640 /* This is not a problem. */
641 CDEBUG(D_INFO, "No resource "LPU64"\n", res_id.name[0]);
645 l_lock(&ns->ns_lock);
646 list_for_each(tmp, &res->lr_granted) {
647 struct ldlm_lock *lock;
648 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
650 if (opaque != NULL && lock->l_ast_data != opaque) {
651 LDLM_ERROR(lock, "data %p doesn't match opaque %p",
652 lock->l_ast_data, opaque);
657 if (lock->l_readers || lock->l_writers) {
658 if (flags & LDLM_FL_WARN) {
659 LDLM_ERROR(lock, "lock in use");
665 /* See CBPENDING comment in ldlm_cancel_lru */
666 lock->l_flags |= LDLM_FL_CBPENDING;
668 OBD_ALLOC(w, sizeof(*w));
671 w->w_lock = LDLM_LOCK_GET(lock);
673 /* Prevent the cancel callback from being called by setting
674 * LDLM_FL_CANCEL in the lock. Very sneaky. -p */
675 if (flags & LDLM_FL_NO_CALLBACK)
676 w->w_lock->l_flags |= LDLM_FL_CANCEL;
678 list_add(&w->w_list, &list);
680 l_unlock(&ns->ns_lock);
/* Phase 2: cancel each collected lock outside ns_lock. */
682 list_for_each_safe(tmp, next, &list) {
683 struct lustre_handle lockh;
685 w = list_entry(tmp, struct ldlm_ast_work, w_list);
687 if (flags & LDLM_FL_LOCAL_ONLY) {
688 ldlm_lock_cancel(w->w_lock);
690 ldlm_lock2handle(w->w_lock, &lockh);
691 rc = ldlm_cli_cancel(&lockh);
693 CERROR("ldlm_cli_cancel: %d\n", rc);
695 list_del(&w->w_list);
696 LDLM_LOCK_PUT(w->w_lock);
697 OBD_FREE(w, sizeof(*w));
700 ldlm_resource_putref(res);
705 /* Cancel all locks on a namespace (or a specific resource, if given)
706 * that have 0 readers/writers.
708 * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
709 * to notify the server.
710 * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback.
711 * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
/* NOTE(review): elided lines hide the ns==NULL / res_id==NULL checks, the i
 * and rc declarations, and the final RETURN.  Each resource is pinned with
 * getref across the per-resource cancel call. */
712 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
713 struct ldlm_res_id *res_id, int flags, void *opaque)
722 RETURN(ldlm_cli_cancel_unused_resource(ns, *res_id, flags,
725 l_lock(&ns->ns_lock);
726 for (i = 0; i < RES_HASH_SIZE; i++) {
727 struct list_head *tmp, *pos;
728 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
730 struct ldlm_resource *res;
731 res = list_entry(tmp, struct ldlm_resource, lr_hash);
732 ldlm_resource_getref(res);
734 rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
738 CERROR("cancel_unused_res ("LPU64"): %d\n",
739 res->lr_name.name[0], rc);
740 ldlm_resource_putref(res);
743 l_unlock(&ns->ns_lock);
748 /* Lock iterators. */
/* Apply `iter` to every lock on the resource's granted, converting, and
 * waiting lists (in that order), under the namespace lock.  Stops early and
 * returns LDLM_ITER_STOP if the callback requests it, else returns
 * LDLM_ITER_CONTINUE.  NOTE(review): the res==NULL guard before line 761 and
 * the out label/RETURN are elided from this listing. */
750 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
753 struct list_head *tmp, *next;
754 struct ldlm_lock *lock;
755 int rc = LDLM_ITER_CONTINUE;
756 struct ldlm_namespace *ns = res->lr_namespace;
761 RETURN(LDLM_ITER_CONTINUE);
763 l_lock(&ns->ns_lock);
764 list_for_each_safe(tmp, next, &res->lr_granted) {
765 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
767 if (iter(lock, closure) == LDLM_ITER_STOP)
768 GOTO(out, rc = LDLM_ITER_STOP);
771 list_for_each_safe(tmp, next, &res->lr_converting) {
772 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
774 if (iter(lock, closure) == LDLM_ITER_STOP)
775 GOTO(out, rc = LDLM_ITER_STOP);
778 list_for_each_safe(tmp, next, &res->lr_waiting) {
779 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
781 if (iter(lock, closure) == LDLM_ITER_STOP)
782 GOTO(out, rc = LDLM_ITER_STOP);
785 l_unlock(&ns->ns_lock);
/* Glue that adapts a per-lock iterator to the per-resource iteration API:
 * ldlm_namespace_foreach walks every resource via
 * ldlm_namespace_foreach_res, using ldlm_res_iter_helper to fan out to
 * ldlm_resource_foreach, which calls the user's per-lock callback through
 * ldlm_iter_helper.  NOTE(review): the `closure` member of
 * iter_helper_data is elided from this listing. */
789 struct iter_helper_data {
790 ldlm_iterator_t iter;
794 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
796 struct iter_helper_data *helper = closure;
797 return helper->iter(lock, helper->closure);
800 static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
802 return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
805 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
808 struct iter_helper_data helper = { iter: iter, closure: closure };
809 return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
/* Apply `iter` to every resource in the namespace hash table, under ns_lock.
 * Each resource is pinned with getref/putref around the callback.  Stops when
 * the callback returns LDLM_ITER_STOP.  NOTE(review): the GOTO/break after
 * the STOP check and the final RETURN(rc) are elided from this listing. */
812 int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
813 ldlm_res_iterator_t iter, void *closure)
815 int i, rc = LDLM_ITER_CONTINUE;
817 l_lock(&ns->ns_lock);
818 for (i = 0; i < RES_HASH_SIZE; i++) {
819 struct list_head *tmp, *next;
820 list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
821 struct ldlm_resource *res =
822 list_entry(tmp, struct ldlm_resource, lr_hash);
824 ldlm_resource_getref(res);
825 rc = iter(res, closure);
826 ldlm_resource_putref(res);
827 if (rc == LDLM_ITER_STOP)
832 l_unlock(&ns->ns_lock);
836 /* non-blocking function to manipulate a lock whose cb_data is being put away.*/
/* Run `iter` over every lock on the resource named by res_id (typically to
 * detach l_ast_data before the object it points at is freed).  Silently does
 * nothing if the resource doesn't exist.  NOTE(review): the error return
 * after the ns==NULL CERROR and the res==NULL check are elided. */
837 void ldlm_change_cbdata(struct ldlm_namespace *ns,
838 struct ldlm_res_id *res_id,
839 ldlm_iterator_t iter,
842 struct ldlm_resource *res;
846 CERROR("must pass in namespace");
850 res = ldlm_resource_get(ns, NULL, *res_id, 0, 0);
856 l_lock(&ns->ns_lock);
857 ldlm_resource_foreach(res, iter, data);
858 l_unlock(&ns->ns_lock);
859 ldlm_resource_putref(res);
/* Iterator callback for ldlm_replay_locks: collect each lock onto the
 * caller's list so replay can run after the namespace walk finishes. */
865 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
867 struct list_head *list = closure;
869 /* we use l_pending_chain here, because it's unused on clients. */
870 list_add(&lock->l_pending_chain, list);
871 return LDLM_ITER_CONTINUE;
/* Re-send one lock's ENQUEUE to the server during import recovery.  The
 * replay flags encode the lock's last known disposition (granted, converting,
 * waiting, or unknown) so the server can reconstruct its state.  On success
 * the remote handle from the reply replaces the stale one.
 * NOTE(review): elided lines hide the rc/size/flags declarations, the
 * prep_req failure return, the rc check after ptlrpc_queue_wait, and the
 * out label/RETURN. */
874 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
876 struct ptlrpc_request *req;
877 struct ldlm_request *body;
878 struct ldlm_reply *reply;
883 * If granted mode matches the requested mode, this lock is granted.
885 * If they differ, but we have a granted mode, then we were granted
886 * one mode and now want another: ergo, converting.
888 * If we haven't been granted anything and are on a resource list,
889 * then we're blocked/waiting.
891 * If we haven't been granted anything and we're NOT on a resource list,
892 * then we haven't got a reply yet and don't have a known disposition.
893 * This happens whenever a lock enqueue is the request that triggers
896 if (lock->l_granted_mode == lock->l_req_mode)
897 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
898 else if (lock->l_granted_mode)
899 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
900 else if (!list_empty(&lock->l_res_link))
901 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
903 flags = LDLM_FL_REPLAY;
905 size = sizeof(*body);
906 req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
910 /* We're part of recovery, so don't wait for it. */
911 req->rq_send_state = LUSTRE_IMP_REPLAY;
913 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
914 ldlm_lock2desc(lock, &body->lock_desc);
915 body->lock_flags = flags;
917 ldlm_lock2handle(lock, &body->lock_handle1);
918 size = sizeof(*reply);
919 req->rq_replen = lustre_msg_size(1, &size);
921 LDLM_DEBUG(lock, "replaying lock:");
922 rc = ptlrpc_queue_wait(req);
926 reply = lustre_swab_repbuf(req, 0, sizeof (*reply),
927 lustre_swab_ldlm_reply);
929 CERROR("Can't unpack ldlm_reply\n");
930 GOTO (out, rc = -EPROTO);
/* Adopt the server's (possibly new) handle for this lock. */
933 memcpy(&lock->l_remote_handle, &reply->lock_handle,
934 sizeof(lock->l_remote_handle));
935 LDLM_DEBUG(lock, "replayed lock:");
937 ptlrpc_req_finished(req);
941 int ldlm_replay_locks(struct obd_import *imp)
943 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
944 struct list_head list, *pos, *next;
945 struct ldlm_lock *lock;
949 INIT_LIST_HEAD(&list);
951 l_lock(&ns->ns_lock);
952 (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
954 list_for_each_safe(pos, next, &list) {
955 lock = list_entry(pos, struct ldlm_lock, l_pending_chain);
956 rc = replay_one_lock(imp, lock);
958 break; /* or try to do the rest? */
960 l_unlock(&ns->ns_lock);