/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released (and thus
 * split into two parts), and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 *   PR and PW for READ and WRITE locks
 *   NL to request the release of a portion of a lock
 *
 * These flock locks never time out.
 */
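
/*
 * Illustration (not part of the original file): the mapping above means a
 * client fcntl() request is expected to translate roughly as follows,
 * matching the F_GETLK conversion in ldlm_flock_completion_ast() below:
 *
 *	F_RDLCK  ->  LCK_PR	shared read lock
 *	F_WRLCK  ->  LCK_PW	exclusive write lock
 *	F_UNLCK  ->  LCK_NL	release all or part of a lock
 */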

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/list.h>
#ifdef HAVE_LINUX_FILELOCK_HEADER
#include <linux/filelock.h>
#endif
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag);
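
/* Two flock locks have the same owner when both the owner id and the
 * export (i.e. the client connection) match.
 */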
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.owner ==
		 lock->l_policy_data.l_flock.owner) &&
		(new->l_export == lock->l_export));
}
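
/* Flock extents are inclusive at both ends, so two locks overlap when
 * each one starts at or before the other's end; e.g. [0, 9] and
 * [9, 20] overlap in the single byte 9.
 */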
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.start <=
		 lock->l_policy_data.l_flock.end) &&
		(new->l_policy_data.l_flock.end >=
		 lock->l_policy_data.l_flock.start));
}

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
					    struct ldlm_lock *lock)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

	req->l_policy_data.l_flock.blocking_owner =
		lock->l_policy_data.l_flock.owner;
	req->l_policy_data.l_flock.blocking_export =
		lock->l_export;
	atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);

	cfs_hash_add(req->l_export->exp_flock_hash,
		     &req->l_policy_data.l_flock.owner,
		     &req->l_exp_flock_hash);
}

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	check_res_locked(req->l_resource);
	if (req->l_export->exp_flock_hash != NULL &&
	    !hlist_unhashed(&req->l_exp_flock_hash))
		cfs_hash_del(req->l_export->exp_flock_hash,
			     &req->l_policy_data.l_flock.owner,
			     &req->l_exp_flock_hash);
}

/** Remove cancelled lock from resource interval tree. */
void ldlm_flock_unlink_lock(struct ldlm_lock *lock)
{
	struct ldlm_resource *res = lock->l_resource;
	struct ldlm_interval *node = lock->l_tree_node;

	if (!node || !interval_is_intree(&node->li_node)) /* duplicate unlink */
		return;

	node = ldlm_interval_detach(lock);
	if (node) {
		struct interval_node **root = &res->lr_flock_node.lfn_root;

		interval_erase(&node->li_node, root);
		ldlm_interval_free(node);
	}
}

static void
ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
{
	ENTRY;

	LDLM_DEBUG(lock, "%s(mode: %d, flags: %#llx)", __func__, mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* when reaching here, it is under lock_res_and_lock(), so we
		 * need to call the nolock version of
		 * ldlm_lock_decref_internal()
		 */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_flock_unlink_lock(lock);

	ldlm_lock_destroy_nolock(lock);
	EXIT;
}

#ifdef HAVE_SERVER_SUPPORT
/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if there is a deadlock condition arising (i.e. when
 * one client holds a lock on something and wants a lock on something
 * else, while at the same time another client has the opposite
 * situation).
 */
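
/*
 * A minimal illustration of the cycle being searched for (not from the
 * original source): owner A holds a lock on file X and waits for one on
 * file Y, while owner B holds Y and waits for X.  Starting from the lock
 * that blocks the new request and following the blocking_owner /
 * blocking_export links eventually leads back to the requesting owner,
 * which is reported as a deadlock.
 */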
struct ldlm_flock_lookup_cb_data {
	__u64 *bl_owner;
	struct ldlm_lock *lock;
	struct obd_export *exp;
};

static int ldlm_flock_lookup_cb(struct obd_export *exp, void *data)
{
	struct ldlm_flock_lookup_cb_data *cb_data = data;
	struct ldlm_lock *lock;

	if (exp->exp_failed)
		return 0;

	lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
	if (lock == NULL)
		return 0;

	/* Stop on first found lock. Same process can't sleep twice */
	cb_data->lock = lock;
	cb_data->exp = class_export_get(exp);
	return 1;
}
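
/* Walk the chain of blocking owners starting from \a bl_lock; return 1
 * if the chain leads back to the owner of \a req (a deadlock), and 0
 * otherwise.
 */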
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
	struct obd_export *req_exp = req->l_export;
	struct obd_export *bl_exp = bl_lock->l_export;
	__u64 req_owner = req->l_policy_data.l_flock.owner;
	__u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

	/* For server only */
	if (req_exp == NULL)
		return 0;

	class_export_get(bl_exp);
	while (1) {
		struct ldlm_flock_lookup_cb_data cb_data = {
			.bl_owner = &bl_owner,
			.lock = NULL,
			.exp = NULL,
		};
		struct ptlrpc_connection *bl_exp_conn;
		struct obd_export *bl_exp_new;
		struct ldlm_lock *lock = NULL;
		struct ldlm_flock *flock;

		bl_exp_conn = bl_exp->exp_connection;
		if (bl_exp->exp_flock_hash != NULL) {
			int found;

			found = obd_nid_export_for_each(bl_exp->exp_obd,
							&bl_exp_conn->c_peer.nid,
							ldlm_flock_lookup_cb,
							&cb_data);
			if (found)
				lock = cb_data.lock;
		}
		if (lock == NULL)
			break;

		class_export_put(bl_exp);
		bl_exp = cb_data.exp;

		LASSERT(req != lock);
		flock = &lock->l_policy_data.l_flock;
		LASSERT(flock->owner == bl_owner);
		bl_owner = flock->blocking_owner;
		bl_exp_new = class_export_get(flock->blocking_export);
		class_export_put(bl_exp);

		cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
		bl_exp = bl_exp_new;

		if (bl_exp->exp_failed)
			break;

		if (bl_owner == req_owner &&
		    nid_same(&bl_exp_conn->c_peer.nid,
			     &req_exp->exp_connection->c_peer.nid)) {
			class_export_put(bl_exp);
			return 1;
		}
	}
	class_export_put(bl_exp);

	return 0;
}

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
					  struct list_head *work_list)
{
	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

	if ((exp_connect_flags(lock->l_export) &
	     OBD_CONNECT_FLOCK_DEAD) == 0) {
		CERROR("deadlock found, but client doesn't support flock cancellation\n");
	} else {
		LASSERT(lock->l_completion_ast);
		LASSERT(!ldlm_is_ast_sent(lock));
		lock->l_flags |= (LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
				  LDLM_FL_FLOCK_DEADLOCK);
		ldlm_flock_blocking_unlink(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_add_ast_work_item(lock, NULL, work_list);
	}
}
#endif /* HAVE_SERVER_SUPPORT */

/** Add newly granted lock into interval tree for the resource. */
static void ldlm_flock_add_lock(struct ldlm_resource *res,
				struct list_head *head,
				struct ldlm_lock *lock)
{
	struct interval_node *found, **root;
	struct ldlm_interval *node = lock->l_tree_node;
	struct ldlm_extent *extent = &lock->l_policy_data.l_extent;
	int rc;

	LASSERT(ldlm_is_granted(lock));

	LASSERT(node != NULL);
	LASSERT(!interval_is_intree(&node->li_node));

	rc = interval_set(&node->li_node, extent->start, extent->end);
	LASSERT(!rc);

	root = &res->lr_flock_node.lfn_root;
	found = interval_insert(&node->li_node, root);
	if (found) { /* The same extent found. */
		struct ldlm_interval *tmp = ldlm_interval_detach(lock);

		LASSERT(tmp != NULL);
		ldlm_interval_free(tmp);
		ldlm_interval_attach(to_ldlm_interval(found), lock);
	}

	/* Add the locks into list */
	ldlm_resource_add_lock(res, head, lock);
}
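
/* Re-position \a lock in the interval tree after its flock range has
 * been grown or shrunk by the merge/split logic below.
 */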
static void
ldlm_flock_range_update(struct ldlm_lock *lock, struct ldlm_lock *req)
{
	struct ldlm_resource *res = lock->l_resource;
	struct interval_node *found, **root = &res->lr_flock_node.lfn_root;
	struct ldlm_interval *node;
	struct ldlm_extent *extent = &lock->l_policy_data.l_extent;

	node = ldlm_interval_detach(lock);
	if (node == NULL) {
		node = ldlm_interval_detach(req);
		LASSERT(node != NULL);
	} else {
		interval_erase(&node->li_node, root);
	}
	interval_set(&node->li_node, extent->start, extent->end);

	found = interval_insert(&node->li_node, root);
	if (found) { /* The policy group found. */
		ldlm_interval_free(node);
		node = to_ldlm_interval(found);
	}
	ldlm_interval_attach(node, lock);
}

/**
 * Process a granting attempt for flock lock.
 * Must be called under ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
			enum ldlm_process_intention intention,
			enum ldlm_error *err, struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct ldlm_lock *tmp;
	struct ldlm_lock *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	enum ldlm_mode mode = req->l_req_mode;
	int local = ns_is_client(ns);
	int added = (mode == LCK_NL);
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };
#ifdef HAVE_SERVER_SUPPORT
	struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
					NULL : work_list);
#endif
	ENTRY;

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	if (local) {
		/* No blocking ASTs are sent to the clients for
		 * Posix file & record locks
		 */
		req->l_blocking_ast = NULL;
	} else {
		/* Called on the server for lock cancels. */
		req->l_blocking_ast = ldlm_flock_blocking_ast;
	}

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list.
		 */
		list_for_each_entry(lock, &res->lr_granted, l_res_link) {
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = lock;
				break;
			}
		}
	}
#ifdef HAVE_SERVER_SUPPORT
	else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request.
		 */
		list_for_each_entry(lock, &res->lr_granted, l_res_link) {
			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = lock;
				continue;
			}

			if (req->l_req_mode == LCK_PR &&
			    lock->l_granted_mode == LCK_PR &&
			    lock->l_policy_data.l_flock.start <=
				req->l_policy_data.l_flock.start &&
			    lock->l_policy_data.l_flock.end >=
				req->l_policy_data.l_flock.end) {
				/* there can't be granted WR lock */
				break;
			}
			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (intention != LDLM_PROCESS_ENQUEUE) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_blocking_link(req, lock);
				if (ldlm_flock_deadlock(req, lock)) {
					ldlm_flock_cancel_on_deadlock(
						req, grant_work);
					RETURN(LDLM_ITER_CONTINUE);
				}
				reprocess_failed = 1;
				break;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				RETURN(LDLM_ITER_STOP);
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				RETURN(LDLM_ITER_STOP);
			}

			/* add lock to blocking list before deadlock
			 * check to prevent race
			 */
			ldlm_flock_blocking_link(req, lock);

			if (ldlm_flock_deadlock(req, lock)) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EDEADLK;
				RETURN(LDLM_ITER_STOP);
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			RETURN(LDLM_ITER_STOP);
		}
		if (reprocess_failed)
			RETURN(LDLM_ITER_CONTINUE);
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		RETURN(LDLM_ITER_STOP);
	}

	/* In case we had slept on this lock request take it off of the
	 * deadlock detection hash list.
	 */
	ldlm_flock_blocking_unlink(req);
#endif /* HAVE_SERVER_SUPPORT */

	/* Scan the locks owned by this process to find the insertion point
	 * (as locks are ordered), and to handle overlaps.
	 * We may have to merge or split existing locks.
	 */
	if (ownlocks)
		lock = ownlocks;
	else
		lock = list_entry(&res->lr_granted,
				  struct ldlm_lock, l_res_link);
	list_for_each_entry_safe_from(lock, tmp, &res->lr_granted, l_res_link) {
		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow.
			 */
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				continue;

			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			continue;

		res->lr_flock_node.lfn_needs_reprocess = true;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			ldlm_flock_range_update(lock, req);
			continue;
		}

		/* split the existing lock into two locks */

		/* if this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request so
		 * it must see the original lock data in the reply.
		 */

		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock.
		 */
		if (new2 == NULL) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (IS_ERR(new2)) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = PTR_ERR(new2);
				RETURN(LDLM_ITER_STOP);
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_flock_add_lock(res, &lock->l_res_link, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 is created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	/* Add req to the granted queue before calling ldlm_reprocess_all(). */
	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert new lock before "lock", which might be the
		 * next lock for this owner, or might be the first
		 * lock for the next owner, or might not be a lock at
		 * all, but instead points at the head of the list
		 */
		ldlm_flock_add_lock(res, &lock->l_res_link, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
		if (intention == LDLM_PROCESS_ENQUEUE) {
			/* If this is an unlock, reprocess the waitq and
			 * send completions ASTs for locks that can now be
			 * granted. The only problem with doing this
			 * reprocessing here is that the completion ASTs for
			 * newly granted locks will be sent before the unlock
			 * completion is sent. It shouldn't be an issue. Also
			 * note that ldlm_process_flock_lock() will recurse,
			 * but only once because 'intention' won't be
			 * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
			 */
			struct ldlm_flock_node *fn = &res->lr_flock_node;
restart:
			if (mode == LCK_NL && fn->lfn_needs_reprocess &&
			    atomic_read(&fn->lfn_unlock_pending) == 0) {
				LIST_HEAD(rpc_list);
				int rc;

				ldlm_reprocess_queue(res, &res->lr_waiting,
						     &rpc_list,
						     LDLM_PROCESS_RESCAN, 0);
				fn->lfn_needs_reprocess = false;
				unlock_res_and_lock(req);
				rc = ldlm_run_ast_work(ns, &rpc_list,
						       LDLM_WORK_CP_AST);
				lock_res_and_lock(req);
				if (rc == -ERESTART) {
					fn->lfn_needs_reprocess = true;
					goto restart;
				}
			}
		} else {
			LASSERT(req->l_completion_ast);
			ldlm_add_ast_work_item(req, NULL, grant_work);
		}
#else /* !HAVE_SERVER_SUPPORT */
		/* The only possible case of a client-side call into the
		 * flock policy function is ldlm_flock_completion_ast(),
		 * which always carries the LDLM_FL_WAIT_NOREPROC flag.
		 */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
#endif /* HAVE_SERVER_SUPPORT */
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent.
	 */
	if (!added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	RETURN(LDLM_ITER_CONTINUE);
}
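
/*
 * Worked example of the merge/split pass above (illustrative only).
 * Suppose a process holds PW [0, 99] and sends an unlock (LCK_NL) for
 * [40, 49].  The scan finds the overlapping granted lock, splits off a
 * new lock (new2) for the low part, and shrinks the original to the
 * high part:
 *
 *	before:	PW [0, 99]
 *	after:	PW [0, 39]	(new2)
 *		PW [50, 99]	(original lock, start moved to 50)
 *
 * Conversely, granting PW [50, 99] to an owner already holding
 * PW [0, 49] merges the two adjacent extents into a single PW [0, 99].
 */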

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags [in]: flags
 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock *getlk = lock->l_ast_data;
	struct obd_device *obd;
	enum ldlm_error err;
	int rc = 0;

	ENTRY;

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAIL_LOC;
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
	}
	CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
	       flags, data, getlk);

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (flags & LDLM_FL_FAILED)
		goto granted;

	if (!(flags & LDLM_FL_BLOCKED_MASK)) {
		if (data == NULL)
			/* mds granted the lock in the reply */
			goto granted;
		/* CP AST RPC: lock get granted, wake it up */
		wake_up(&lock->l_waitq);
		RETURN(0);
	}
770 "client-side enqueue returned a blocked lock, sleeping");
771 obd = class_exp2obd(lock->l_conn_export);
773 /* Go to sleep until the lock is granted. */
774 rc = l_wait_event_abortable(lock->l_waitq,
775 is_granted_or_cancelled(lock));
777 /* take lock off the deadlock detection hash list. */
778 lock_res_and_lock(lock);
779 ldlm_flock_blocking_unlink(lock);
781 /* client side - set flag to prevent lock from being
784 ldlm_set_cbpending(lock);
785 unlock_res_and_lock(lock);
787 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",

granted:
	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
		lock_res_and_lock(lock);
		/* DEADLOCK is always set with CBPENDING */
		lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
	}
	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
		lock_res_and_lock(lock);
		/* DEADLOCK is always set with CBPENDING */
		lock->l_flags |= (LDLM_FL_FAIL_LOC |
				  LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING);
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
	}

	lock_res_and_lock(lock);

	/* Protect against race where lock could have been just destroyed
	 * due to overlap in ldlm_process_flock_lock().
	 */
	if (ldlm_is_destroyed(lock)) {
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");

		/* An error is still to be returned, to propagate it up to
		 * ldlm_cli_enqueue_fini() caller.
		 */
		RETURN(-EIO);
	}

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	ldlm_resource_unlink_lock(lock);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. No point in
	 * holding the lock even if app still believes it has it, since
	 * server already dropped it anyway. Only for granted locks too.
	 */
	/* Do the same for DEADLOCK'ed locks. */
	if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
		enum ldlm_mode mode;

		if (flags & LDLM_FL_TEST_LOCK)
			LASSERT(ldlm_is_test_lock(lock));

		if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
			mode = getlk->fl_type;
		else
			mode = lock->l_req_mode;

		if (ldlm_is_flock_deadlock(lock)) {
			LDLM_DEBUG(lock,
				   "client-side enqueue deadlock received");
			rc = -EDEADLK;
		}
		ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
		unlock_res_and_lock(lock);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);

		/* An error is still to be returned, to propagate it up to
		 * ldlm_cli_enqueue_fini() caller.
		 */
		RETURN(rc ? : -EIO);
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	if (flags & LDLM_FL_TEST_LOCK) {
		/*
		 * fcntl(F_GETLK) request
		 * The old mode was saved in getlk->fl_type so that if the mode
		 * in the lock changes we can decref the appropriate refcount.
		 */
		LASSERT(ldlm_is_test_lock(lock));
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process.
		 */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag)
{
	ENTRY;

	LASSERT(flag == LDLM_CB_CANCELING);

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);
	unlock_res_and_lock(lock);
	RETURN(0);
}
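
/* Translate flock policy data between the on-wire layout and the
 * in-memory (local) representation, field by field.
 */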
void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
				     union ldlm_policy_data *lpolicy)
{
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
				     union ldlm_wire_policy_data *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key,
		       const unsigned int bits)
{
	return cfs_hash_64(*(__u64 *)key, bits);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
	return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_GET(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_get(flock->blocking_export);
	atomic_inc(&flock->blocking_refs);
}

static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_put(flock->blocking_export);
	if (atomic_dec_and_test(&flock->blocking_refs)) {
		flock->blocking_owner = 0;
		flock->blocking_export = NULL;
	}
	LDLM_LOCK_RELEASE(lock);
}

static struct cfs_hash_ops ldlm_export_flock_ops = {
	.hs_hash	= ldlm_export_flock_hash,
	.hs_key		= ldlm_export_flock_key,
	.hs_keycmp	= ldlm_export_flock_keycmp,
	.hs_object	= ldlm_export_flock_object,
	.hs_get		= ldlm_export_flock_get,
	.hs_put		= ldlm_export_flock_put,
	.hs_put_locked	= ldlm_export_flock_put,
};

int ldlm_init_flock_export(struct obd_export *exp)
{
	if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
		RETURN(0);

	exp->exp_flock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_flock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
	if (!exp->exp_flock_hash)
		RETURN(-ENOMEM);

	RETURN(0);
}

void ldlm_destroy_flock_export(struct obd_export *exp)
{
	ENTRY;
	if (exp->exp_flock_hash) {
		cfs_hash_putref(exp->exp_flock_hash);
		exp->exp_flock_hash = NULL;
	}
	EXIT;
}