/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2011, Whamcloud, Inc.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 * and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
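
/* Two flock locks have the same owner iff their owner ids match and both
 * locks come from the same export (i.e. the same client). */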
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}
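
/* Two byte ranges overlap iff each lock starts at or before the other
 * lock ends. */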
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
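
/* Record that \a req is blocked by \a lock: remember the blocking owner
 * and export in req's flock policy data and insert req into the export's
 * flock hash, which the deadlock detector walks. The hash is created
 * lazily on first use. */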
static inline int ldlm_flock_blocking_link(struct ldlm_lock *req,
                                           struct ldlm_lock *lock)
{
        int rc = 0;

        /* For server only */
        if (req->l_export == NULL)
                return 0;

        if (unlikely(req->l_export->exp_flock_hash == NULL)) {
                rc = ldlm_init_flock_export(req->l_export);
                if (rc)
                        return rc;
        }

        LASSERT(cfs_hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);

        return 0;
}
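
/* Undo ldlm_flock_blocking_link(): remove \a req from the export's flock
 * hash if it was ever inserted there. */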
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !cfs_hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}
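
/* Remove a flock lock from its resource's lock list and destroy it.
 * On the client (LDLM_FL_WAIT_NOREPROC) the granted reference is dropped
 * here and the lock is flagged so that no CANCEL RPC is sent back to the
 * server. */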
static void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(). Thus,
                 * need call the nolock version of ldlm_lock_decref_internal */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}
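
/* Deadlock detection: starting from the lock that blocks \a req, follow
 * the chain of blocking owners recorded in the per-export flock hashes.
 * If the chain leads back to the owner of \a req, granting \a req would
 * close a wait cycle, so report a deadlock. */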
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}
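
/**
 * Process a granting attempt for flock lock \a req.
 *
 * First scans the granted queue for conflicting locks (with, on the
 * server, deadlock detection, blocking links, and LDLM_FL_BLOCK_NOWAIT /
 * LDLM_FL_TEST_LOCK handling). If nothing conflicts, merges or splits
 * the owner's existing granted locks so the granted list stays a minimal
 * set of non-overlapping extents, then grants \a req.
 *
 * Called with the resource lock held.
 */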
static int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#llx owner "LPU64" pid %u mode %u start "
               LPU64" end "LPU64"\n", *flags,
               new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this processes locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        rc = ldlm_flock_blocking_link(req, lock);
                        if (rc) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = rc;
                                RETURN(LDLM_ITER_STOP);
                        }
                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */
        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (new2 == NULL) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0);
                        lock_res_and_lock(req);
                        if (new2 == NULL) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
#ifdef HAVE_SERVER_SUPPORT
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completions ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);

restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(ns, &rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
#else /* !HAVE_SERVER_SUPPORT */
                /* The only one possible case for client-side calls flock
                 * policy function is ldlm_flock_completion_ast inside which
                 * carries LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
#endif /* HAVE_SERVER_SUPPORT */
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_ast_work_item() above so that lawi()
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};
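
/* Interrupt handler for l_wait_event(): called when a process sleeping
 * on a blocked flock request gets a signal. Unlinks the lock from the
 * deadlock detection hash and marks it so it is not put on the LRU. */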
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);

        EXIT;
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags [in]: flags
 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        cfs_flock_t *getlk = lock->l_ast_data;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        struct ldlm_flock_wait_data fwd;
        struct l_wait_info lwi;
        ldlm_error_t err;
        int rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    data == NULL)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (data == NULL)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock get granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (obd != NULL)
                imp = obd->u.cli.cl_import;

        if (imp != NULL) {
                cfs_spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                cfs_spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);

        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount.*/
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
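
/* Server blocking AST: only ever called for cancels; just take the lock
 * off the deadlock detection hash list. */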
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        RETURN(0);
}
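
/* Translate flock policy data between the wire format and the local
 * in-memory format. Two wire decoders exist: one for pre-2.1 clients
 * that only sent a pid, and one for 2.1+ clients that also send the
 * lock owner. */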
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code, old clients had no idea about owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1. */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/**
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(cfs_hash_t *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, cfs_hlist_node_t *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(cfs_hlist_node_t *hnode)
{
        return cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = cfs_hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};
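
/* Create the per-export hash that maps a lock owner to the lock it is
 * currently blocked on; used by the flock deadlock detector. */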
int ldlm_init_flock_export(struct obd_export *exp)
{
        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                return -ENOMEM;

        return 0;
}
EXPORT_SYMBOL(ldlm_init_flock_export);

void ldlm_destroy_flock_export(struct obd_export *exp)
{
        ENTRY;
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
        EXIT;
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);