1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
6 * This file is part of Lustre, http://www.lustre.org.
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #define DEBUG_SUBSYSTEM S_LDLM
24 #include <linux/lustre_dlm.h>
25 #include <linux/obd_class.h>
26 #include <linux/obd.h>
/* Wake-condition helper handed to LWI_TIMEOUT_INTR() below as the
 * "interrupted" callback.  NOTE(review): this listing elides the function
 * body (original lines 29-32), so its return value cannot be confirmed
 * from here -- presumably it returns nonzero to let an interrupted
 * l_wait_event() terminate.  TODO: verify against the full source. */
28 static int interrupted_completion_wait(void *data)
/* Timeout callback for the completion wait in ldlm_completion_ast():
 * invoked when obd_timeout expires before the lock is granted.  Walks the
 * lock -> connh -> obd -> import -> connection chain, logging which link
 * (if any) is NULL, then reports the timeout and signals the connection
 * as failed so recovery can run.
 *
 * NOTE(review): several source lines are elided in this listing (opening
 * brace, the leading "if (!lock)" guard, the CERROR continuation line,
 * and the return) -- comments describe only the visible logic. */
33 int ldlm_expired_completion_wait(void *data)
35 struct ldlm_lock *lock = data;
36 struct ptlrpc_connection *conn;
37 struct obd_device *obd;
/* Diagnostic chain: each branch identifies the first NULL link. */
40 CERROR("NULL lock\n");
41 else if (!lock->l_connh)
42 CERROR("lock %p has NULL connh\n", lock);
43 else if (!(obd = class_conn2obd(lock->l_connh)))
44 CERROR("lock %p has NULL obd\n", lock);
45 else if (!(conn = obd->u.cli.cl_import.imp_connection))
46 CERROR("lock %p has NULL connection\n", lock);
/* All links valid: report the timeout and kick connection-failure
 * handling (conn is only assigned on the successful path above). */
48 LDLM_DEBUG(lock, "timed out waiting for completion");
49 CERROR("lock %p timed out from %s\n", lock,
51 ldlm_lock_dump(D_ERROR, lock);
52 class_signal_connection_failure(conn);
/* Client-side completion AST: block the caller until the lock is granted
 * (l_req_mode == l_granted_mode) or destroyed, with an interruptible
 * obd_timeout-bounded wait.  ldlm_expired_completion_wait() fires on
 * timeout; interrupted_completion_wait() decides interruption handling.
 *
 * NOTE(review): this listing elides lines (braces, the RETURN paths, the
 * string continuation at line 78, part of the flags mask at line 74), so
 * the exact early-exit behavior for LDLM_FL_WAIT_NOREPROC and for
 * non-blocked locks cannot be confirmed from here. */
57 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
59 struct l_wait_info lwi =
60 LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
61 interrupted_completion_wait, lock);
/* WAIT_NOREPROC appears to be special-cased (elided branch). */
65 if (flags == LDLM_FL_WAIT_NOREPROC)
69 wake_up(&lock->l_waitq);
/* If none of the BLOCK_* flags are set the lock is not actually
 * blocked (mask continuation elided). */
73 if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
77 LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
79 ldlm_lock_dump(D_OTHER, lock);
80 ldlm_reprocess_all(lock->l_resource);
83 /* Go to sleep until the lock is granted or cancelled. */
84 rc = l_wait_event(lock->l_waitq,
85 ((lock->l_req_mode == lock->l_granted_mode) ||
86 lock->l_destroyed), &lwi);
/* Destruction while waiting means the enqueue was cancelled. */
88 if (lock->l_destroyed) {
89 LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
94 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
99 LDLM_DEBUG(lock, "client-side enqueue waking up: granted");
/* Enqueue a lock purely locally (no RPC) in a server-side namespace:
 * create the lock, take a mode reference, hand back a handle, and run
 * ldlm_lock_enqueue() plus the completion AST.  Called from
 * ldlm_cli_enqueue() when there is no connection handle.
 *
 * NOTE(review): elided lines include parts of the parameter list
 * (res_id/type/mode/data/data_len/flags), the shadow-namespace guard
 * around line 120, NULL-lock check, and the out/out_nolock labels --
 * comments cover only what is visible. */
103 static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
104 struct lustre_handle *parent_lockh,
107 void *cookie, int cookielen,
110 ldlm_completion_callback completion,
111 ldlm_blocking_callback blocking,
114 struct lustre_handle *lockh)
116 struct ldlm_lock *lock;
/* Local enqueue only makes sense in a server (non-shadow) namespace. */
121 CERROR("Trying to enqueue local lock in a shadow namespace\n");
125 lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data,
128 GOTO(out_nolock, err = -ENOMEM);
129 LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");
/* Take the caller's mode reference and export a handle before
 * enqueueing, mirroring the RPC path in ldlm_cli_enqueue(). */
131 ldlm_lock_addref_internal(lock, mode);
132 ldlm_lock2handle(lock, lockh);
133 lock->l_connh = NULL;
135 err = ldlm_lock_enqueue(ns, lock, cookie, cookielen, flags, completion,
/* For extent locks, copy the (possibly server-adjusted) extent back
 * into the caller's cookie; likewise the resource name on change. */
140 if (type == LDLM_EXTENT)
141 memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent));
142 if ((*flags) & LDLM_FL_LOCK_CHANGED)
143 memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id));
145 LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
148 if (lock->l_completion_ast)
149 lock->l_completion_ast(lock, *flags);
151 LDLM_DEBUG(lock, "client-side local enqueue END");
/* Main client-side enqueue entry point.  Either replays an existing lock
 * (LDLM_FL_REPLAY set: lock is looked up from lockh) or creates a new
 * one, then sends an LDLM_ENQUEUE RPC (preparing a request unless the
 * caller passed one in) and digests the reply: remote handle, returned
 * flags, extent fix-ups, mode/resource changes, and a final local
 * ldlm_lock_enqueue() plus completion AST.  Falls back to
 * ldlm_cli_enqueue_local() when there is no connection (elided guard
 * near line 183).
 *
 * NOTE(review): this listing elides many lines (braces, RETURN/GOTO
 * paths, buffer-length error handling at line 216, the out_req/out
 * labels, parts of the parameter list), so error-path behavior is
 * described only where visible. */
159 int ldlm_cli_enqueue(struct lustre_handle *connh,
160 struct ptlrpc_request *req,
161 struct ldlm_namespace *ns,
162 struct lustre_handle *parent_lock_handle,
165 void *cookie, int cookielen,
168 ldlm_completion_callback completion,
169 ldlm_blocking_callback blocking,
172 struct lustre_handle *lockh)
174 struct ldlm_lock *lock;
175 struct ldlm_request *body;
176 struct ldlm_reply *reply;
177 int rc, size = sizeof(*body), req_passed_in = 1, is_replay;
180 is_replay = *flags & LDLM_FL_REPLAY;
/* Replay requires a connection handle. */
181 LASSERT(connh != NULL || !is_replay);
/* No connection: purely local enqueue (guard condition elided). */
184 return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id,
185 type, cookie, cookielen, mode,
186 flags, completion, blocking, data,
189 /* If we're replaying this lock, just check some invariants.
190 * If we're creating a new lock, get everything all setup nice. */
192 lock = ldlm_handle2lock(lockh);
193 LDLM_DEBUG(lock, "client-side enqueue START");
194 LASSERT(connh == lock->l_connh);
/* Non-replay path: build a fresh lock, reference it, and seed the
 * requested extent from the caller's cookie for extent locks. */
196 lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type,
197 mode, data, data_len);
199 GOTO(out_nolock, rc = -ENOMEM);
200 LDLM_DEBUG(lock, "client-side enqueue START");
201 /* for the local lock, add the reference */
202 ldlm_lock_addref_internal(lock, mode);
203 ldlm_lock2handle(lock, lockh);
204 if (type == LDLM_EXTENT)
205 memcpy(&lock->l_extent, cookie,
206 sizeof(body->lock_desc.l_extent));
/* Prepare the RPC ourselves when the caller did not pass one in
 * (req_passed_in bookkeeping partly elided). */
210 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1,
213 GOTO(out, rc = -ENOMEM);
/* A caller-supplied request must already have a body-sized buffer 0
 * (error handling for the mismatch is elided at line 216). */
215 } else if (req->rq_reqmsg->buflens[0] != sizeof(*body))
218 /* Dump lock data into the request buffer */
219 body = lustre_msg_buf(req->rq_reqmsg, 0);
220 ldlm_lock2desc(lock, &body->lock_desc);
221 body->lock_flags = *flags;
223 memcpy(&body->lock_handle1, lockh, sizeof(*lockh));
224 if (parent_lock_handle)
225 memcpy(&body->lock_handle2, parent_lock_handle,
226 sizeof(body->lock_handle2));
228 /* Continue as normal. */
229 if (!req_passed_in) {
230 size = sizeof(*reply);
231 req->rq_replen = lustre_msg_size(1, &size);
233 lock->l_connh = connh;
234 lock->l_export = NULL;
236 LDLM_DEBUG(lock, "sending request");
237 rc = ptlrpc_queue_wait(req);
/* RPC failure or server abort: drop our reference and destroy the
 * half-created lock (see FIXME about a racing completion AST). */
239 if (rc != ELDLM_OK) {
241 LDLM_DEBUG(lock, "client-side enqueue END (%s)",
242 rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
243 ldlm_lock_decref(lockh, mode);
244 /* FIXME: if we've already received a completion AST, this will
246 ldlm_lock_destroy(lock);
/* Success: record the server's handle and flags. */
250 reply = lustre_msg_buf(req->rq_repmsg, 0);
251 memcpy(&lock->l_remote_handle, &reply->lock_handle,
252 sizeof(lock->l_remote_handle));
253 *flags = reply->lock_flags;
255 CDEBUG(D_INFO, "local: %p, remote: %p, flags: %d\n", lock,
256 (void *)(unsigned long)reply->lock_handle.addr, *flags);
/* Extent locks: adopt the server-granted extent (FIXME bug 267). */
257 if (type == LDLM_EXTENT) {
258 CDEBUG(D_INFO, "requested extent: "LPU64" -> "LPU64", got "
259 "extent "LPU64" -> "LPU64"\n",
260 body->lock_desc.l_extent.start,
261 body->lock_desc.l_extent.end,
262 reply->lock_extent.start, reply->lock_extent.end);
263 cookie = &reply->lock_extent; /* FIXME bug 267 */
264 cookielen = sizeof(reply->lock_extent);
267 /* If enqueue returned a blocked lock but the completion handler has
268 * already run, then it fixed up the resource and we don't need to do it
/* Server changed the lock: adopt the new mode and/or resource name
 * (e.g. after a remote intent). */
270 if ((*flags) & LDLM_FL_LOCK_CHANGED) {
271 int newmode = reply->lock_mode;
273 if (newmode && newmode != lock->l_req_mode) {
274 LDLM_DEBUG(lock, "server returned different mode %s",
275 ldlm_lockname[newmode]);
276 lock->l_req_mode = newmode;
279 if (reply->lock_resource_name[0] !=
280 lock->l_resource->lr_name[0]) {
281 CDEBUG(D_INFO, "remote intent success, locking %ld "
283 (long)reply->lock_resource_name[0],
284 (long)lock->l_resource->lr_name[0]);
286 ldlm_lock_change_resource(ns, lock,
287 reply->lock_resource_name);
288 if (lock->l_resource == NULL) {
290 GOTO(out_req, rc = -ENOMEM);
292 LDLM_DEBUG(lock, "client-side enqueue, new resource");
/* Finally run the local half of the enqueue and the completion AST. */
297 rc = ldlm_lock_enqueue(ns, lock, cookie, cookielen, flags,
298 completion, blocking);
299 if (lock->l_completion_ast)
300 lock->l_completion_ast(lock, *flags);
303 LDLM_DEBUG(lock, "client-side enqueue END");
307 ptlrpc_req_finished(req);
/* Try to satisfy the request from an already-held compatible lock via
 * ldlm_lock_match(); only on a miss fall through to a full
 * ldlm_cli_enqueue().  NOTE(review): the branch structure between the
 * match and the enqueue (and the RETURN) is elided in this listing. */
314 int ldlm_match_or_enqueue(struct lustre_handle *connh,
315 struct ptlrpc_request *req,
316 struct ldlm_namespace *ns,
317 struct lustre_handle *parent_lock_handle,
320 void *cookie, int cookielen,
323 ldlm_completion_callback completion,
324 ldlm_blocking_callback blocking,
327 struct lustre_handle *lockh)
/* Fast path: reuse an existing granted lock if one matches. */
331 rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh);
/* Slow path: no match, perform the real enqueue. */
333 rc = ldlm_cli_enqueue(connh, req, ns,
334 parent_lock_handle, res_id, type, cookie,
335 cookielen, mode, flags, completion,
336 blocking, data, data_len, lockh);
338 CERROR("ldlm_cli_enqueue: err: %d\n", rc);
/* Re-enqueue a single lock during recovery: wraps ldlm_cli_enqueue()
 * with LDLM_FL_REPLAY so the lock is looked up from its handle rather
 * than created.  Note mode is passed as -1 and most parameters as NULL,
 * which the replay path in ldlm_cli_enqueue() ignores. */
344 int ldlm_cli_replay_enqueue(struct ldlm_lock *lock)
346 struct lustre_handle lockh;
347 int flags = LDLM_FL_REPLAY;
348 ldlm_lock2handle(lock, &lockh);
349 return ldlm_cli_enqueue(lock->l_connh, NULL, NULL, NULL, NULL,
350 lock->l_resource->lr_type, NULL, 0, -1, &flags,
351 NULL, NULL, NULL, 0, &lockh);
/* Convert a lock's mode locally (no RPC) and reprocess the resource so
 * any waiters that the conversion unblocks can be granted.  Rejected in
 * client (shadow) namespaces.  NOTE(review): the error message says
 * "cancel" where "convert" seems intended, but since surrounding lines
 * are elided this listing is not a safe place to change it. */
354 static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
/* Guard: local conversion is a server-namespace operation. */
358 if (lock->l_resource->lr_namespace->ns_client) {
359 CERROR("Trying to cancel local lock\n");
362 LDLM_DEBUG(lock, "client-side local convert");
364 ldlm_lock_convert(lock, new_mode, flags);
365 ldlm_reprocess_all(lock->l_resource);
367 LDLM_DEBUG(lock, "client-side local convert handler END");
372 /* FIXME: one of ldlm_cli_convert or the server side should reject attempted
373 * conversion of locks which are on the waiting or converting queue */
/* Convert a lock identified by handle to new_mode.  With no connection
 * handle the conversion is done locally; otherwise an LDLM_CONVERT RPC
 * is sent carrying the remote handle, requested mode and flags, and the
 * reply flags drive the local ldlm_lock_convert().  The completion AST
 * is invoked with LDLM_FL_WAIT_NOREPROC to sleep until granted.
 *
 * NOTE(review): elided lines include the NULL-lock check after
 * ldlm_handle2lock(), the RPC error path after ptlrpc_queue_wait(), and
 * the out label/RETURN -- error behavior is not fully visible here. */
374 int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags)
376 struct ldlm_request *body;
377 struct lustre_handle *connh;
378 struct ldlm_reply *reply;
379 struct ldlm_lock *lock;
380 struct ldlm_resource *res;
381 struct ptlrpc_request *req;
382 int rc, size = sizeof(*body);
385 lock = ldlm_handle2lock(lockh);
391 connh = lock->l_connh;
/* No connection: purely local conversion. */
394 RETURN(ldlm_cli_convert_local(lock, new_mode, flags));
396 LDLM_DEBUG(lock, "client-side convert");
398 req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size,
401 GOTO(out, rc = -ENOMEM);
/* Fill the request: server identifies the lock by our remote handle. */
403 body = lustre_msg_buf(req->rq_reqmsg, 0);
404 memcpy(&body->lock_handle1, &lock->l_remote_handle,
405 sizeof(body->lock_handle1));
407 body->lock_desc.l_req_mode = new_mode;
408 body->lock_flags = *flags;
410 size = sizeof(*reply);
411 req->rq_replen = lustre_msg_size(1, &size);
413 rc = ptlrpc_queue_wait(req);
/* Apply the server's answer locally and reprocess the resource. */
417 reply = lustre_msg_buf(req->rq_repmsg, 0);
418 res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
420 ldlm_reprocess_all(res);
421 /* Go to sleep until the lock is granted. */
422 /* FIXME: or cancelled. */
423 if (lock->l_completion_ast)
424 lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC);
428 ptlrpc_req_finished(req);
/* Cancel a lock by handle.  Sets CBPENDING (blocking further refs) and
 * runs the cancel callback under the namespace lock, sends an
 * LDLM_CANCEL RPC on the dedicated cancel portals, then cancels the lock
 * locally.  A trailing local-only path (visible from line 475 on) handles
 * locks with no connection.  Concurrent cancels on one handle are
 * tolerated via the LDLM_FL_CANCELING handle lookup.
 *
 * NOTE(review): elided lines include the NULL-lock early return, the
 * branch structure separating the RPC path from the local path, and the
 * out label/RETURN -- the exact control flow between line 473 and the
 * local-cancel section cannot be confirmed from this listing. */
432 int ldlm_cli_cancel(struct lustre_handle *lockh)
434 struct ptlrpc_request *req;
435 struct ldlm_lock *lock;
436 struct ldlm_request *body;
437 int rc = 0, size = sizeof(*body);
440 /* concurrent cancels on the same handle can happen */
441 lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
446 LDLM_DEBUG(lock, "client-side cancel");
447 /* Set this flag to prevent others from getting new references*/
448 l_lock(&lock->l_resource->lr_namespace->ns_lock);
449 lock->l_flags |= LDLM_FL_CBPENDING;
450 ldlm_cancel_callback(lock);
451 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
453 req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh),
454 LDLM_CANCEL, 1, &size, NULL);
456 GOTO(out, rc = -ENOMEM);
458 /* XXX FIXME bug 249 */
459 req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
460 req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
/* Server identifies the lock by the remote handle; cancel expects no
 * reply body (replen built from 0 buffers). */
462 body = lustre_msg_buf(req->rq_reqmsg, 0);
463 memcpy(&body->lock_handle1, &lock->l_remote_handle,
464 sizeof(body->lock_handle1));
466 req->rq_replen = lustre_msg_size(0, NULL);
468 rc = ptlrpc_queue_wait(req);
469 ptlrpc_req_finished(req);
473 ldlm_lock_cancel(lock);
/* Local-only cancel path (no connh); rejected in client namespaces. */
475 LDLM_DEBUG(lock, "client-side local cancel");
476 if (lock->l_resource->lr_namespace->ns_client) {
477 CERROR("Trying to cancel local lock\n");
480 ldlm_lock_cancel(lock);
481 ldlm_reprocess_all(lock->l_resource);
482 LDLM_DEBUG(lock, "client-side local cancel handler END");
485 lock->l_flags |= LDLM_FL_CANCELING;
/* Shrink the namespace's unused-lock LRU: under ns_lock, collect up to
 * (ns_nr_unused - ns_max_unused) zero-reference locks into a private
 * ldlm_ast_work list, then cancel each one via RPC outside the lock.
 * Two-phase structure avoids holding ns_lock across ptlrpc_queue_wait().
 *
 * NOTE(review): elided lines include the "count <= 0" early return, the
 * OBD_ALLOC failure branch, the count-based loop break, and the final
 * RETURN -- only the visible skeleton is documented. */
493 int ldlm_cancel_lru(struct ldlm_namespace *ns)
495 struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
497 struct ldlm_ast_work *w;
500 l_lock(&ns->ns_lock);
/* Number of locks over the cache limit; presumably returns early when
 * nothing needs cancelling (branch elided). */
501 count = ns->ns_nr_unused - ns->ns_max_unused;
504 l_unlock(&ns->ns_lock);
/* Phase 1: harvest candidates from the LRU under ns_lock. */
508 list_for_each_safe(tmp, next, &ns->ns_unused_list) {
509 struct ldlm_lock *lock;
510 lock = list_entry(tmp, struct ldlm_lock, l_lru);
512 LASSERT(!lock->l_readers && !lock->l_writers);
514 /* Setting the CBPENDING flag is a little misleading, but
515 * prevents an important race; namely, once CBPENDING is set,
516 * the lock can accumulate no more readers/writers. Since
517 * readers and writers are already zero here, ldlm_lock_decref
518 * won't see this flag and call l_blocking_ast */
519 lock->l_flags |= LDLM_FL_CBPENDING;
521 OBD_ALLOC(w, sizeof(*w));
/* Hold a reference while the lock sits on our work list. */
524 w->w_lock = LDLM_LOCK_GET(lock);
525 list_add(&w->w_list, &list);
526 ldlm_lock_remove_from_lru(lock);
531 l_unlock(&ns->ns_lock);
/* Phase 2: cancel each harvested lock without holding ns_lock. */
533 list_for_each_safe(tmp, next, &list) {
534 struct lustre_handle lockh;
536 w = list_entry(tmp, struct ldlm_ast_work, w_list);
538 ldlm_lock2handle(w->w_lock, &lockh);
539 rc = ldlm_cli_cancel(&lockh);
541 CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc);
543 list_del(&w->w_list);
544 LDLM_LOCK_PUT(w->w_lock);
545 OBD_FREE(w, sizeof(*w));
/* Cancel all unused (zero reader/writer) granted locks on one resource.
 * Same two-phase pattern as ldlm_cancel_lru(): collect under ns_lock,
 * then cancel outside it.  flags modify behavior: LDLM_FL_NO_CALLBACK
 * suppresses the cancel callback (via LDLM_FL_CANCEL); LDLM_FL_LOCAL_ONLY
 * cancels locally without sending an RPC.
 *
 * NOTE(review): elided lines include the missing-resource early return,
 * the "continue" for in-use locks, the OBD_ALLOC failure branch, and the
 * final RETURN. */
551 int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
552 __u64 *res_id, int flags)
554 struct ldlm_resource *res;
555 struct list_head *tmp, *next, list = LIST_HEAD_INIT(list);
556 struct ldlm_ast_work *w;
559 res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
561 /* This is not a problem. */
562 CDEBUG(D_INFO, "No resource "LPU64"\n", res_id[0]);
/* Phase 1: harvest unused granted locks under ns_lock. */
566 l_lock(&ns->ns_lock);
567 list_for_each(tmp, &res->lr_granted) {
568 struct ldlm_lock *lock;
569 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
/* Skip locks still in use (skip action elided). */
571 if (lock->l_readers || lock->l_writers)
574 /* See CBPENDING comment in ldlm_cancel_lru */
575 lock->l_flags |= LDLM_FL_CBPENDING;
577 OBD_ALLOC(w, sizeof(*w));
580 w->w_lock = LDLM_LOCK_GET(lock);
581 list_add(&w->w_list, &list);
583 l_unlock(&ns->ns_lock);
/* Phase 2: cancel each harvested lock outside ns_lock. */
585 list_for_each_safe(tmp, next, &list) {
586 struct lustre_handle lockh;
588 w = list_entry(tmp, struct ldlm_ast_work, w_list);
590 /* Prevent the cancel callback from being called by setting
591 * LDLM_FL_CANCEL in the lock. Very sneaky. -p */
592 if (flags & LDLM_FL_NO_CALLBACK)
593 w->w_lock->l_flags |= LDLM_FL_CANCEL;
595 if (flags & LDLM_FL_LOCAL_ONLY) {
596 ldlm_lock_cancel(w->w_lock);
598 ldlm_lock2handle(w->w_lock, &lockh);
599 rc = ldlm_cli_cancel(&lockh);
601 CERROR("ldlm_cli_cancel: %d\n", rc);
603 list_del(&w->w_list);
604 LDLM_LOCK_PUT(w->w_lock);
605 OBD_FREE(w, sizeof(*w));
/* Drop the reference taken by ldlm_resource_get(). */
608 ldlm_resource_putref(res);
613 /* Cancel all locks on a namespace (or a specific resource, if given) that have
/* (continuation of the header comment above; intervening lines elided)
 * With a res_id, delegates to ldlm_cli_cancel_unused_resource() for that
 * one resource; otherwise walks every hash chain in the namespace and
 * cancels unused locks on each resource, pinning each resource across
 * the call.  NOTE(review): the RETURN and some loop/guard lines are
 * elided in this listing. */
616 * If 'local_only' is true, throw the locks away without trying to notify the
618 int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
/* Single-resource fast path (guard condition elided). */
628 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags));
630 l_lock(&ns->ns_lock);
631 for (i = 0; i < RES_HASH_SIZE; i++) {
632 struct list_head *tmp, *pos;
633 list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) {
635 struct ldlm_resource *res;
636 res = list_entry(tmp, struct ldlm_resource, lr_hash);
/* Pin the resource so it survives the per-resource cancel. */
637 ldlm_resource_getref(res);
639 rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name,
643 CERROR("cancel_unused_res ("LPU64"): %d\n",
644 res->lr_name[0], rc);
645 ldlm_resource_putref(res);
648 l_unlock(&ns->ns_lock);
653 /* Lock iterators. */
/* Apply 'iter' to every lock on the resource's granted, converting and
 * waiting lists, under the namespace lock.  Stops early (returning
 * LDLM_ITER_STOP) as soon as the callback returns LDLM_ITER_STOP;
 * otherwise returns LDLM_ITER_CONTINUE.  An elided guard near line 665
 * appears to short-circuit (presumably for a NULL resource -- TODO
 * confirm). */
655 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
658 struct list_head *tmp, *next;
659 struct ldlm_lock *lock;
660 int rc = LDLM_ITER_CONTINUE;
661 struct ldlm_namespace *ns = res->lr_namespace;
666 RETURN(LDLM_ITER_CONTINUE);
668 l_lock(&ns->ns_lock);
/* _safe variants allow the callback to remove the current lock. */
669 list_for_each_safe(tmp, next, &res->lr_granted) {
670 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
672 if (iter(lock, closure) == LDLM_ITER_STOP)
673 GOTO(out, rc = LDLM_ITER_STOP);
676 list_for_each_safe(tmp, next, &res->lr_converting) {
677 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
679 if (iter(lock, closure) == LDLM_ITER_STOP)
680 GOTO(out, rc = LDLM_ITER_STOP);
683 list_for_each_safe(tmp, next, &res->lr_waiting) {
684 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
686 if (iter(lock, closure) == LDLM_ITER_STOP)
687 GOTO(out, rc = LDLM_ITER_STOP);
690 l_unlock(&ns->ns_lock);
/* Adapter pairing a per-lock iterator with its opaque closure so a
 * single void* can be threaded through the resource iterator.  (The
 * closure member and closing brace are elided in this listing.) */
694 struct iter_helper_data {
695 ldlm_iterator_t iter;
/* Unpack the helper and forward the lock to the user's iterator. */
699 static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
701 struct iter_helper_data *helper = closure;
702 return helper->iter(lock, helper->closure);
/* Per-resource adapter: iterate all locks of one resource through
 * ldlm_iter_helper, forwarding the packed iter/closure pair. */
705 static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
707 return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
/* Apply 'iter' to every lock in the namespace by composing the
 * per-resource walk with the per-lock adapter.  Uses GCC's old
 * "label:" designated-initializer syntax for the helper struct. */
710 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
713 struct iter_helper_data helper = { iter: iter, closure: closure };
714 return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
/* Apply 'iter' to every resource in the namespace hash table, under
 * ns_lock.  Each resource is pinned (getref/putref) across the callback;
 * iteration stops when the callback returns LDLM_ITER_STOP (the break
 * statement itself is elided from this listing). */
717 int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
718 ldlm_res_iterator_t iter, void *closure)
720 int i, rc = LDLM_ITER_CONTINUE;
722 l_lock(&ns->ns_lock);
723 for (i = 0; i < RES_HASH_SIZE; i++) {
724 struct list_head *tmp, *next;
725 list_for_each_safe(tmp, next, &(ns->ns_hash[i])) {
726 struct ldlm_resource *res =
727 list_entry(tmp, struct ldlm_resource, lr_hash);
/* Keep the resource alive while the callback runs. */
729 ldlm_resource_getref(res);
730 rc = iter(res, closure);
731 ldlm_resource_putref(res);
732 if (rc == LDLM_ITER_STOP)
737 l_unlock(&ns->ns_lock);
/* Iterator callback for ldlm_replay_locks(): threads each lock onto the
 * caller-supplied list via l_pending_chain and keeps iterating. */
743 static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
745 struct list_head *list = closure;
747 /* we use l_pending_chain here, because it's unused on clients. */
748 list_add(&lock->l_pending_chain, list);
749 return LDLM_ITER_CONTINUE;
/* Re-send one lock to the server during recovery as an LDLM_ENQUEUE at
 * LUSTRE_CONN_RECOVD level, with LDLM_FL_REPLAY plus a BLOCK_* flag
 * derived from the lock's current disposition (see the comment block
 * below).  On success, adopts the server's new remote handle.
 *
 * NOTE(review): elided lines include the prep-failure return after
 * line 784, the queue_wait error path, and the final RETURN. */
752 static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
754 struct ptlrpc_request *req;
755 struct ldlm_request *body;
756 struct ldlm_reply *reply;
761 * If granted mode matches the requested mode, this lock is granted.
763 * If they differ, but we have a granted mode, then we were granted
764 * one mode and now want another: ergo, converting.
766 * If we haven't been granted anything and are on a resource list,
767 * then we're blocked/waiting.
769 * If we haven't been granted anything and we're NOT on a resource list,
770 * then we haven't got a reply yet and don't have a known disposition.
771 * This happens whenever a lock enqueue is the request that triggers
774 if (lock->l_granted_mode == lock->l_req_mode)
775 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
776 else if (lock->l_granted_mode)
777 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
778 else if (!list_empty(&lock->l_res_link))
779 flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
781 flags = LDLM_FL_REPLAY;
783 size = sizeof(*body);
784 req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
788 /* We're part of recovery, so don't wait for it. */
789 req->rq_level = LUSTRE_CONN_RECOVD;
/* Rebuild the request body from the lock's current local state. */
791 body = lustre_msg_buf(req->rq_reqmsg, 0);
792 ldlm_lock2desc(lock, &body->lock_desc);
793 body->lock_flags = flags;
795 ldlm_lock2handle(lock, &body->lock_handle1);
796 size = sizeof(*reply);
797 req->rq_replen = lustre_msg_size(1, &size);
799 LDLM_DEBUG(lock, "replaying lock:");
800 rc = ptlrpc_queue_wait(req);
/* Success: the server may have issued a new handle for this lock. */
804 reply = lustre_msg_buf(req->rq_repmsg, 0);
805 memcpy(&lock->l_remote_handle, &reply->lock_handle,
806 sizeof(lock->l_remote_handle));
807 LDLM_DEBUG(lock, "replayed lock:");
809 ptlrpc_req_finished(req);
/* Replay every lock in the import's namespace after a reconnect: chain
 * all locks onto a local list via ldlm_chain_lock_for_replay() (under
 * ns_lock), then replay them one at a time, stopping on first failure.
 * NOTE(review): the function's tail (after l_unlock, presumably
 * RETURN(rc)) falls outside this listing -- not documented here. */
813 int ldlm_replay_locks(struct obd_import *imp)
815 struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
816 struct list_head list, *pos, *next;
817 struct ldlm_lock *lock;
821 INIT_LIST_HEAD(&list);
/* ns_lock held across the whole replay, including the RPCs below --
 * NOTE(review): holding a namespace lock across ptlrpc_queue_wait()
 * looks long-held; confirm intent against the full source. */
823 l_lock(&ns->ns_lock);
824 (void)ldlm_namespace_foreach(ns, ldlm_chain_lock_for_replay, &list);
826 list_for_each_safe(pos, next, &list) {
827 lock = list_entry(pos, struct ldlm_lock, l_pending_chain);
828 rc = replay_one_lock(imp, lock);
830 break; /* or try to do the rest? */
832 l_unlock(&ns->ns_lock);