/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released and thus
 * split into two parts, and that two adjacent locks from the same
 * process may be merged into a single wider lock.
 *
 * Lock modes are mapped like this:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of the lock
 *
 * These flock locks never time out.
 */
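
/*
 * Worked example (illustrative, not part of the original header):
 * a process holding a PW lock on extent [0, 99] that unlocks bytes
 * 10..59 sends an NL request for [10, 59]; the MDS splits the granted
 * lock into PW locks on [0, 9] and [60, 99].  Conversely, a PW request
 * for [100, 199] from the owner of an adjacent PW lock on [0, 99] is
 * merged into a single PW lock covering [0, 199].
 */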

#define DEBUG_SUBSYSTEM S_LDLM

#include <linux/list.h>
#ifdef HAVE_LINUX_FILELOCK_HEADER
#include <linux/filelock.h>
#endif
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag);

static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.owner ==
		 lock->l_policy_data.l_flock.owner) &&
		(new->l_export == lock->l_export));
}

static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
	return ((new->l_policy_data.l_flock.start <=
		 lock->l_policy_data.l_flock.end) &&
		(new->l_policy_data.l_flock.end >=
		 lock->l_policy_data.l_flock.start));
}
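
/*
 * Illustrative note: extents [0, 5] and [5, 10] overlap, while [0, 4]
 * and [5, 10] do not -- the latter pair is merely adjacent, which only
 * matters for the same-mode merge logic in ldlm_process_flock_lock().
 */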

static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
					    struct ldlm_lock *lock)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

	req->l_policy_data.l_flock.blocking_owner =
		lock->l_policy_data.l_flock.owner;
	req->l_policy_data.l_flock.blocking_export =
		lock->l_export;
	atomic_set(&req->l_policy_data.l_flock.blocking_refs, 0);

	cfs_hash_add(req->l_export->exp_flock_hash,
		     &req->l_policy_data.l_flock.owner,
		     &req->l_exp_flock_hash);
}
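
/*
 * Illustrative note: if request "req" from owner A blocks on a lock
 * held by owner B, the helper above records the wait-for edge
 * "A waits on B" -- B's owner and export are saved in req's l_flock
 * policy data and req is hashed into A's export under A's owner id --
 * which is exactly the graph that ldlm_flock_deadlock() walks when
 * looking for a cycle.
 */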

static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
	/* For server only */
	if (req->l_export == NULL)
		return;

	check_res_locked(req->l_resource);
	if (req->l_export->exp_flock_hash != NULL &&
	    !hlist_unhashed(&req->l_exp_flock_hash))
		cfs_hash_del(req->l_export->exp_flock_hash,
			     &req->l_policy_data.l_flock.owner,
			     &req->l_exp_flock_hash);
}

static void
ldlm_flock_destroy(struct ldlm_lock *lock, enum ldlm_mode mode, __u64 flags)
{
	ENTRY;

	LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: %#llx)",
		   mode, flags);

	/* Safe to not lock here, since it should be empty anyway */
	LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

	list_del_init(&lock->l_res_link);
	if (flags == LDLM_FL_WAIT_NOREPROC) {
		/* client side - set a flag to prevent sending a CANCEL */
		lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

		/* when reaching here, it is under lock_res_and_lock(). Thus,
		 * we need to call the nolock version of
		 * ldlm_lock_decref_internal()
		 */
		ldlm_lock_decref_internal_nolock(lock, mode);
	}

	ldlm_lock_destroy_nolock(lock);
	EXIT;
}

#ifdef HAVE_SERVER_SUPPORT
/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if there is a deadlock condition arising. (i.e. when
 * one client holds a lock on something and wants a lock on something
 * else and at the same time another client has the opposite situation).
 */
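
/*
 * Illustrative cycle (example only): client A holds a POSIX lock on
 * extent X and blocks waiting for extent Y, while client B holds Y
 * and blocks waiting for X.  Walking the blocking_owner/blocking_export
 * chain from the new request (A waits on B, B waits on A) leads back
 * to the requestor's owner and NID, which is the condition
 * ldlm_flock_deadlock() below reports as a deadlock.
 */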

struct ldlm_flock_lookup_cb_data {
	__u64 *bl_owner;
	struct ldlm_lock *lock;
	struct obd_export *exp;
};

static int ldlm_flock_lookup_cb(struct obd_export *exp, void *data)
{
	struct ldlm_flock_lookup_cb_data *cb_data = data;
	struct ldlm_lock *lock;

	lock = cfs_hash_lookup(exp->exp_flock_hash, cb_data->bl_owner);
	if (lock == NULL)
		return 0;

	/* Stop on first found lock. Same process can't sleep twice */
	cb_data->lock = lock;
	cb_data->exp = class_export_get(exp);

	return 1;
}

static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
	struct obd_export *req_exp = req->l_export;
	struct obd_export *bl_exp = bl_lock->l_export;
	__u64 req_owner = req->l_policy_data.l_flock.owner;
	__u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

	/* For server only */
	if (req_exp == NULL)
		return 0;

	class_export_get(bl_exp);
	while (1) {
		struct ldlm_flock_lookup_cb_data cb_data = {
			.bl_owner = &bl_owner,
			.lock = NULL,
			.exp = NULL,
		};
		struct ptlrpc_connection *bl_exp_conn;
		struct obd_export *bl_exp_new;
		struct ldlm_lock *lock = NULL;
		struct ldlm_flock *flock;

		bl_exp_conn = bl_exp->exp_connection;
		if (bl_exp->exp_flock_hash != NULL) {
			int found;

			found = obd_nid_export_for_each(bl_exp->exp_obd,
							&bl_exp_conn->c_peer.nid,
							ldlm_flock_lookup_cb,
							&cb_data);
			if (found)
				lock = cb_data.lock;
		}
		if (lock == NULL)
			break;

		class_export_put(bl_exp);
		bl_exp = cb_data.exp;

		LASSERT(req != lock);
		flock = &lock->l_policy_data.l_flock;
		LASSERT(flock->owner == bl_owner);
		bl_owner = flock->blocking_owner;
		bl_exp_new = class_export_get(flock->blocking_export);
		class_export_put(bl_exp);

		cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
		bl_exp = bl_exp_new;

		if (bl_exp->exp_failed)
			break;

		if (bl_owner == req_owner &&
		    nid_same(&bl_exp_conn->c_peer.nid,
			     &req_exp->exp_connection->c_peer.nid)) {
			class_export_put(bl_exp);
			return 1;
		}
	}
	class_export_put(bl_exp);
	return 0;
}

static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
					  struct list_head *work_list)
{
	CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

	if ((exp_connect_flags(lock->l_export) &
	     OBD_CONNECT_FLOCK_DEAD) == 0) {
		CERROR("deadlock found, but client doesn't support flock cancellation\n");
	} else {
		LASSERT(lock->l_completion_ast);
		LASSERT(!ldlm_is_ast_sent(lock));
		lock->l_flags |= (LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
				  LDLM_FL_FLOCK_DEADLOCK);
		ldlm_flock_blocking_unlink(lock);
		ldlm_resource_unlink_lock(lock);
		ldlm_add_ast_work_item(lock, NULL, work_list);
	}
}
#endif /* HAVE_SERVER_SUPPORT */

/**
 * Process a granting attempt for flock lock.
 * Must be called under ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
			enum ldlm_process_intention intention,
			enum ldlm_error *err, struct list_head *work_list)
{
	struct ldlm_resource *res = req->l_resource;
	struct ldlm_namespace *ns = ldlm_res_to_ns(res);
	struct ldlm_lock *tmp;
	struct ldlm_lock *ownlocks = NULL;
	struct ldlm_lock *lock = NULL;
	struct ldlm_lock *new = req;
	struct ldlm_lock *new2 = NULL;
	enum ldlm_mode mode = req->l_req_mode;
	int local = ns_is_client(ns);
	int added = (mode == LCK_NL);
	int splitted = 0;
	const struct ldlm_callback_suite null_cbs = { NULL };
#ifdef HAVE_SERVER_SUPPORT
	struct list_head *grant_work = (intention == LDLM_PROCESS_ENQUEUE ?
					NULL : work_list);
#endif
	ENTRY;

	CDEBUG(D_DLMTRACE,
	       "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
	       *flags, new->l_policy_data.l_flock.owner,
	       new->l_policy_data.l_flock.pid, mode,
	       req->l_policy_data.l_flock.start,
	       req->l_policy_data.l_flock.end);

	*err = ELDLM_OK;

	if (local) {
		/* No blocking ASTs are sent to the clients for
		 * Posix file & record locks
		 */
		req->l_blocking_ast = NULL;
	} else {
		/* Called on the server for lock cancels. */
		req->l_blocking_ast = ldlm_flock_blocking_ast;
	}

reprocess:
	if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
		/* This loop determines where this process's locks start
		 * in the resource lr_granted list.
		 */
		list_for_each_entry(lock, &res->lr_granted, l_res_link) {
			if (ldlm_same_flock_owner(lock, req)) {
				ownlocks = lock;
				break;
			}
		}
	}
#ifdef HAVE_SERVER_SUPPORT
	else {
		int reprocess_failed = 0;

		lockmode_verify(mode);

		/* This loop determines if there are existing locks
		 * that conflict with the new lock request.
		 */
		list_for_each_entry(lock, &res->lr_granted, l_res_link) {
			if (ldlm_same_flock_owner(lock, req)) {
				if (!ownlocks)
					ownlocks = lock;
				continue;
			}

			if (req->l_req_mode == LCK_PR &&
			    lock->l_granted_mode == LCK_PR &&
			    lock->l_policy_data.l_flock.start <=
				req->l_policy_data.l_flock.start &&
			    lock->l_policy_data.l_flock.end >=
				req->l_policy_data.l_flock.end) {
				/* there can't be a granted write lock */
				break;
			}

			/* locks are compatible, overlap doesn't matter */
			if (lockmode_compat(lock->l_granted_mode, mode))
				continue;

			if (!ldlm_flocks_overlap(lock, req))
				continue;

			if (intention != LDLM_PROCESS_ENQUEUE) {
				if (ldlm_flock_deadlock(req, lock)) {
					ldlm_flock_cancel_on_deadlock(
						req, grant_work);
					RETURN(LDLM_ITER_CONTINUE);
				}
				reprocess_failed = 1;
				break;
			}

			if (*flags & LDLM_FL_BLOCK_NOWAIT) {
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EAGAIN;
				RETURN(LDLM_ITER_STOP);
			}

			if (*flags & LDLM_FL_TEST_LOCK) {
				ldlm_flock_destroy(req, mode, *flags);
				req->l_req_mode = lock->l_granted_mode;
				req->l_policy_data.l_flock.pid =
					lock->l_policy_data.l_flock.pid;
				req->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
				req->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
				*flags |= LDLM_FL_LOCK_CHANGED;
				RETURN(LDLM_ITER_STOP);
			}

			/* add lock to blocking list before deadlock
			 * check to prevent race
			 */
			ldlm_flock_blocking_link(req, lock);

			if (ldlm_flock_deadlock(req, lock)) {
				ldlm_flock_blocking_unlink(req);
				ldlm_flock_destroy(req, mode, *flags);
				*err = -EDEADLK;
				RETURN(LDLM_ITER_STOP);
			}

			ldlm_resource_add_lock(res, &res->lr_waiting, req);
			*flags |= LDLM_FL_BLOCK_GRANTED;
			RETURN(LDLM_ITER_STOP);
		}
		if (reprocess_failed)
			RETURN(LDLM_ITER_CONTINUE);
	}

	if (*flags & LDLM_FL_TEST_LOCK) {
		ldlm_flock_destroy(req, mode, *flags);
		req->l_req_mode = LCK_NL;
		*flags |= LDLM_FL_LOCK_CHANGED;
		RETURN(LDLM_ITER_STOP);
	}

	/* In case we had slept on this lock request take it off of the
	 * deadlock detection hash list.
	 */
	ldlm_flock_blocking_unlink(req);
#endif /* HAVE_SERVER_SUPPORT */

	/* Scan the locks owned by this process to find the insertion point
	 * (as locks are ordered), and to handle overlaps.
	 * We may have to merge or split existing locks.
	 */
	if (ownlocks)
		lock = ownlocks;
	else
		lock = list_entry(&res->lr_granted,
				  struct ldlm_lock, l_res_link);
	list_for_each_entry_safe_from(lock, tmp, &res->lr_granted, l_res_link) {
		if (!ldlm_same_flock_owner(lock, new))
			break;

		if (lock->l_granted_mode == mode) {
			/* If the modes are the same then we need to process
			 * locks that overlap OR adjoin the new lock. The extra
			 * logic condition is necessary to deal with arithmetic
			 * overflow and underflow.
			 */
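			/*
			 * Example (illustrative): extents are __u64, so a
			 * lock ending at OBD_OBJECT_EOF (~0ULL) would wrap
			 * to 0 on "end + 1", and one starting at 0 would
			 * wrap on "start - 1"; the extra != tests below make
			 * such boundary locks always count as adjoining.
			 */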
			if ((new->l_policy_data.l_flock.start >
			     (lock->l_policy_data.l_flock.end + 1))
			    && (lock->l_policy_data.l_flock.end !=
				OBD_OBJECT_EOF))
				continue;

			if ((new->l_policy_data.l_flock.end <
			     (lock->l_policy_data.l_flock.start - 1))
			    && (lock->l_policy_data.l_flock.start != 0))
				break;
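
			/*
			 * Merge example (illustrative): a granted [0, 9] lock
			 * and a same-mode request for [10, 19] are combined
			 * below by widening both extents to the union
			 * [0, 19]; the surviving lock is then picked by the
			 * "added" logic that follows.
			 */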
			if (new->l_policy_data.l_flock.start <
			    lock->l_policy_data.l_flock.start) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.start;
			} else {
				new->l_policy_data.l_flock.start =
					lock->l_policy_data.l_flock.start;
			}

			if (new->l_policy_data.l_flock.end >
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.end =
					new->l_policy_data.l_flock.end;
			} else {
				new->l_policy_data.l_flock.end =
					lock->l_policy_data.l_flock.end;
			}

			if (added) {
				ldlm_flock_destroy(lock, mode, *flags);
			} else {
				new = lock;
				added = 1;
			}
			continue;
		}

		if (new->l_policy_data.l_flock.start >
		    lock->l_policy_data.l_flock.end)
			continue;

		if (new->l_policy_data.l_flock.end <
		    lock->l_policy_data.l_flock.start)
			break;

		res->lr_flock_node.lfn_needs_reprocess = true;

		if (new->l_policy_data.l_flock.start <=
		    lock->l_policy_data.l_flock.start) {
			if (new->l_policy_data.l_flock.end <
			    lock->l_policy_data.l_flock.end) {
				lock->l_policy_data.l_flock.start =
					new->l_policy_data.l_flock.end + 1;
				break;
			}
			ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
			continue;
		}
		if (new->l_policy_data.l_flock.end >=
		    lock->l_policy_data.l_flock.end) {
			lock->l_policy_data.l_flock.end =
				new->l_policy_data.l_flock.start - 1;
			continue;
		}

		/* split the existing lock into two locks */

		/* if this is an F_UNLCK operation then we could avoid
		 * allocating a new lock and use the req lock passed in
		 * with the request but this would complicate the reply
		 * processing since updates to req get reflected in the
		 * reply. The client side replays the lock request so
		 * it must see the original lock data in the reply.
		 */
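
		/*
		 * Split example (illustrative): unlocking [10, 59] out of a
		 * granted [0, 99] lock makes new2 cover the front piece
		 * [0, 9], while the existing lock is trimmed below to the
		 * tail piece [60, 99].
		 */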
		/* XXX - if ldlm_lock_new() can sleep we should
		 * release the lr_lock, allocate the new lock,
		 * and restart processing this lock.
		 */
		if (new2 == NULL) {
			unlock_res_and_lock(req);
			new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
						lock->l_granted_mode, &null_cbs,
						NULL, 0, LVB_T_NONE);
			lock_res_and_lock(req);
			if (IS_ERR(new2)) {
				ldlm_flock_destroy(req, lock->l_granted_mode,
						   *flags);
				*err = PTR_ERR(new2);
				RETURN(LDLM_ITER_STOP);
			}
			goto reprocess;
		}

		splitted = 1;

		new2->l_granted_mode = lock->l_granted_mode;
		new2->l_policy_data.l_flock.pid =
			new->l_policy_data.l_flock.pid;
		new2->l_policy_data.l_flock.owner =
			new->l_policy_data.l_flock.owner;
		new2->l_policy_data.l_flock.start =
			lock->l_policy_data.l_flock.start;
		new2->l_policy_data.l_flock.end =
			new->l_policy_data.l_flock.start - 1;
		lock->l_policy_data.l_flock.start =
			new->l_policy_data.l_flock.end + 1;
		new2->l_conn_export = lock->l_conn_export;
		if (lock->l_export != NULL) {
			new2->l_export = class_export_lock_get(lock->l_export,
							       new2);
			if (new2->l_export->exp_lock_hash &&
			    hlist_unhashed(&new2->l_exp_hash))
				cfs_hash_add(new2->l_export->exp_lock_hash,
					     &new2->l_remote_handle,
					     &new2->l_exp_hash);
		}
		if (*flags == LDLM_FL_WAIT_NOREPROC)
			ldlm_lock_addref_internal_nolock(new2,
							 lock->l_granted_mode);

		/* insert new2 at lock */
		ldlm_resource_add_lock(res, &lock->l_res_link, new2);
		LDLM_LOCK_RELEASE(new2);
		break;
	}

	/* if new2 is created but never used, destroy it */
	if (splitted == 0 && new2 != NULL)
		ldlm_lock_destroy_nolock(new2);

	/* At this point we're granting the lock request. */
	req->l_granted_mode = req->l_req_mode;

	/* Add req to the granted queue before calling ldlm_reprocess_all(). */
	if (!added) {
		list_del_init(&req->l_res_link);
		/* insert new lock before "lock", which might be the
		 * next lock for this owner, or might be the first
		 * lock for the next owner, or might not be a lock at
		 * all, but instead points at the head of the list
		 */
		ldlm_resource_add_lock(res, &lock->l_res_link, req);
	}

	if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
		if (intention == LDLM_PROCESS_ENQUEUE) {
			/* If this is an unlock, reprocess the waitq and
			 * send completion ASTs for locks that can now be
			 * granted. The only problem with doing this
			 * reprocessing here is that the completion ASTs for
			 * newly granted locks will be sent before the unlock
			 * completion is sent. It shouldn't be an issue. Also
			 * note that ldlm_process_flock_lock() will recurse,
			 * but only once because 'intention' won't be
			 * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue.
			 */
			struct ldlm_flock_node *fn = &res->lr_flock_node;

			if (mode == LCK_NL && fn->lfn_needs_reprocess &&
			    atomic_read(&fn->lfn_unlock_pending) == 0) {
				LIST_HEAD(rpc_list);
				int rc;

				ldlm_reprocess_queue(res, &res->lr_waiting,
						     &rpc_list,
						     LDLM_PROCESS_RESCAN, 0);
				fn->lfn_needs_reprocess = false;
				unlock_res_and_lock(req);
				rc = ldlm_run_ast_work(ns, &rpc_list,
						       LDLM_WORK_CP_AST);
				lock_res_and_lock(req);
				if (rc == -ERESTART) {
					fn->lfn_needs_reprocess = true;
				}
			}
		} else {
			LASSERT(req->l_completion_ast);
			ldlm_add_ast_work_item(req, NULL, grant_work);
		}
#else /* !HAVE_SERVER_SUPPORT */
		/* The only possible case of a client-side call into this
		 * flock policy function is ldlm_flock_completion_ast(),
		 * which always carries the LDLM_FL_WAIT_NOREPROC flag.
		 */
		CERROR("Illegal parameter for client-side-only module.\n");
		LBUG();
#endif /* HAVE_SERVER_SUPPORT */
	}

	/* In case we're reprocessing the requested lock we can't destroy
	 * it until after calling ldlm_add_ast_work_item() above so that laawi()
	 * can bump the reference count on \a req. Otherwise \a req
	 * could be freed before the completion AST can be sent.
	 */
	if (added)
		ldlm_flock_destroy(req, mode, *flags);

	ldlm_resource_dump(D_INFO, res);
	RETURN(LDLM_ITER_CONTINUE);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags [in]: flags
 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0 : success
 * \retval <0 : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
	struct file_lock *getlk = lock->l_ast_data;
	struct obd_device *obd;
	enum ldlm_error err;
	int rc = 0;

	ENTRY;

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
		lock_res_and_lock(lock);
		lock->l_flags |= LDLM_FL_FAIL_LOC;
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
	}

	CDEBUG(D_DLMTRACE, "flags: %#llx data: %p getlk: %p\n",
	       flags, data, getlk);

	LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

	if (flags & LDLM_FL_FAILED)
		goto granted;

	if (!(flags & LDLM_FL_BLOCKED_MASK)) {
		if (data == NULL)
			/* mds granted the lock in the reply */
			goto granted;
		/* CP AST RPC: lock get granted, wake it up */
		wake_up(&lock->l_waitq);
		RETURN(0);
	}
690 "client-side enqueue returned a blocked lock, sleeping");
691 obd = class_exp2obd(lock->l_conn_export);
693 /* Go to sleep until the lock is granted. */
694 rc = l_wait_event_abortable(lock->l_waitq,
695 is_granted_or_cancelled(lock));
697 /* take lock off the deadlock detection hash list. */
698 lock_res_and_lock(lock);
699 ldlm_flock_blocking_unlink(lock);
701 /* client side - set flag to prevent lock from being
704 ldlm_set_cbpending(lock);
705 unlock_res_and_lock(lock);
707 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",

	CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
		lock_res_and_lock(lock);
		/* DEADLOCK is always set with CBPENDING */
		lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
	}
	if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
		lock_res_and_lock(lock);
		/* DEADLOCK is always set with CBPENDING */
		lock->l_flags |= (LDLM_FL_FAIL_LOC |
				  LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING);
		unlock_res_and_lock(lock);
		CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
	}

granted:
	lock_res_and_lock(lock);

	/* Protect against race where lock could have been just destroyed
	 * due to overlap in ldlm_process_flock_lock().
	 */
	if (ldlm_is_destroyed(lock)) {
		unlock_res_and_lock(lock);
		LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");

		/* An error is still to be returned, to propagate it up to
		 * ldlm_cli_enqueue_fini() caller.
		 */
		RETURN(-EIO);
	}

	/* ldlm_lock_enqueue() has already placed lock on the granted list. */
	ldlm_resource_unlink_lock(lock);

	/* Import invalidation. We need to actually release the lock
	 * references being held, so that it can go away. No point in
	 * holding the lock even if app still believes it has it, since
	 * server already dropped it anyway. Only for granted locks too.
	 */
	/* Do the same for DEADLOCK'ed locks. */
	if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
		enum ldlm_mode mode;

		if (flags & LDLM_FL_TEST_LOCK)
			LASSERT(ldlm_is_test_lock(lock));

		if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
			mode = getlk->fl_type;
		else
			mode = lock->l_req_mode;

		if (ldlm_is_flock_deadlock(lock)) {
			LDLM_DEBUG(lock, "client-side enqueue deadlock received");
			rc = -EDEADLK;
		}
		ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
		unlock_res_and_lock(lock);

		/* Need to wake up the waiter if we were evicted */
		wake_up(&lock->l_waitq);

		/* An error is still to be returned, to propagate it up to
		 * ldlm_cli_enqueue_fini() caller.
		 */
		RETURN(rc ? : -EIO);
	}

	LDLM_DEBUG(lock, "client-side enqueue granted");

	if (flags & LDLM_FL_TEST_LOCK) {
		/*
		 * fcntl(F_GETLK) request
		 * The old mode was saved in getlk->fl_type so that if the
		 * mode in the lock changes we can decref the appropriate
		 * refcount.
		 */
		LASSERT(ldlm_is_test_lock(lock));
		ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
		switch (lock->l_granted_mode) {
		case LCK_PR:
			getlk->fl_type = F_RDLCK;
			break;
		case LCK_PW:
			getlk->fl_type = F_WRLCK;
			break;
		default:
			getlk->fl_type = F_UNLCK;
		}
		getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
		getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
		getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
	} else {
		__u64 noreproc = LDLM_FL_WAIT_NOREPROC;

		/* We need to reprocess the lock to do merges or splits
		 * with existing locks owned by this process.
		 */
		ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
	}
	unlock_res_and_lock(lock);
	RETURN(rc);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
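
/*
 * Illustrative summary (not upstream documentation): a client byte-range
 * lock request is enqueued on the MDS; on conflict the enqueuing thread
 * sleeps above in ldlm_flock_completion_ast() until the lock is granted
 * by a completion AST, cancelled on deadlock, or failed by eviction.
 * Once granted, the lock is reprocessed locally to merge or split
 * against other locks of the same owner.
 */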

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
			    void *data, int flag)
{
	ENTRY;

	LASSERT(lock);
	LASSERT(flag == LDLM_CB_CANCELING);

	/* take lock off the deadlock detection hash list. */
	lock_res_and_lock(lock);
	ldlm_flock_blocking_unlink(lock);
	unlock_res_and_lock(lock);
	RETURN(0);
}

void ldlm_flock_policy_wire_to_local(const union ldlm_wire_policy_data *wpolicy,
				     union ldlm_policy_data *lpolicy)
{
	lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
	lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
	lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
	lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const union ldlm_policy_data *lpolicy,
				     union ldlm_wire_policy_data *wpolicy)
{
	memset(wpolicy, 0, sizeof(*wpolicy));
	wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
	wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
	wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
	wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}
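
/*
 * Descriptive note: the memset() above zeroes the entire wire union
 * before the flock fields are copied in, so padding and fields unused
 * by the flock policy are transmitted as zeroes rather than as
 * uninitialized stack bytes.
 */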

/*
 * Export handle<->flock hash operations.
 */
static unsigned int
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key,
		       const unsigned int bits)
{
	return cfs_hash_64(*(__u64 *)key, bits);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
	struct ldlm_lock *lock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
	return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
	return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
	LDLM_LOCK_GET(lock);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_get(flock->blocking_export);
	atomic_inc(&flock->blocking_refs);
}

static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
	struct ldlm_lock *lock;
	struct ldlm_flock *flock;

	lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);

	flock = &lock->l_policy_data.l_flock;
	LASSERT(flock->blocking_export != NULL);
	class_export_put(flock->blocking_export);
	if (atomic_dec_and_test(&flock->blocking_refs)) {
		flock->blocking_owner = 0;
		flock->blocking_export = NULL;
	}
	LDLM_LOCK_RELEASE(lock);
}

static struct cfs_hash_ops ldlm_export_flock_ops = {
	.hs_hash = ldlm_export_flock_hash,
	.hs_key = ldlm_export_flock_key,
	.hs_keycmp = ldlm_export_flock_keycmp,
	.hs_object = ldlm_export_flock_object,
	.hs_get = ldlm_export_flock_get,
	.hs_put = ldlm_export_flock_put,
	.hs_put_locked = ldlm_export_flock_put,
};
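
/*
 * Descriptive note: cfs_hash invokes .hs_get/.hs_put as entries are
 * referenced and released, so while a lock sits in the deadlock
 * detection hash it pins both the lock itself (LDLM_LOCK_GET /
 * LDLM_LOCK_RELEASE) and its blocking export, keeping them safe to
 * dereference from ldlm_flock_deadlock().
 */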

int ldlm_init_flock_export(struct obd_export *exp)
{
	if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
		RETURN(0);

	exp->exp_flock_hash =
		cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
				HASH_EXP_LOCK_CUR_BITS,
				HASH_EXP_LOCK_MAX_BITS,
				HASH_EXP_LOCK_BKT_BITS, 0,
				CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
				&ldlm_export_flock_ops,
				CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
	if (!exp->exp_flock_hash)
		RETURN(-ENOMEM);

	RETURN(0);
}

void ldlm_destroy_flock_export(struct obd_export *exp)
{
	ENTRY;
	if (exp->exp_flock_hash) {
		cfs_hash_putref(exp->exp_flock_hash);
		exp->exp_flock_hash = NULL;
	}
	EXIT;
}