/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */
#define DEBUG_SUBSYSTEM S_LDLM

#ifdef __KERNEL__
#include <lustre_dlm.h>
#include <obd_support.h>
#include <obd_class.h>
#include <lustre_lib.h>
#include <libcfs/list.h>
#else
#include <liblustre.h>
#include <obd_class.h>
#endif

#include "ldlm_internal.h"
#define l_flock_waitq   l_lru
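/* Client-side flock locks set LDLM_FL_CBPENDING, which keeps them off the
 * lock LRU (see ldlm_flock_interrupted_wait() below), so ldlm_lock::l_lru
 * is free for reuse here as the deadlock detection waitq linkage. */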
/**
 * Wait queue for Posix lock deadlock detection, added with
 * ldlm_lock::l_flock_waitq.
 */
static CFS_LIST_HEAD(ldlm_flock_waitq);
/**
 * Lock protecting access to ldlm_flock_waitq.
 */
cfs_spinlock_t ldlm_flock_waitq_lock = CFS_SPIN_LOCK_UNLOCKED;

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);
/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 * and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *              have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)
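/*
 * Unlike cfs_list_for_each_safe(), iteration begins at the entry "pos"
 * already points to rather than at the list head, so a scan can resume
 * from a previously located position.  A minimal usage sketch (the
 * starting entry "start_entry" is illustrative only):
 *
 *      cfs_list_t *pos = start_entry;  // e.g. first lock owned by caller
 *      cfs_list_t *n;
 *
 *      list_for_remaining_safe(pos, n, &res->lr_granted) {
 *              // the entry at "pos" may safely be removed here;
 *              // "n" keeps the iteration valid
 *      }
 *
 * ldlm_process_flock_lock() below uses exactly this pattern, with
 * "ownlocks" as the resume position.
 */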
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.pid ==
                lock->l_policy_data.l_flock.pid) &&
               (new->l_export == lock->l_export));
}
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}
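/*
 * Two byte ranges overlap when each one starts at or before the other
 * ends.  For example, with lock covering [5, 10]: a new range [10, 20]
 * overlaps (10 <= 10 and 20 >= 5; they share byte 10), while [12, 20]
 * does not (12 <= 10 fails the first test).
 */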
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
{
        ENTRY;

        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(cfs_list_empty(&lock->l_flock_waitq));

        cfs_list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * must call the nolock version of ldlm_lock_decref_internal */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
        EXIT;
}
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
{
        struct obd_export *req_export = req->l_export;
        struct obd_export *blocking_export = blocking_lock->l_export;
        pid_t req_pid = req->l_policy_data.l_flock.pid;
        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
        struct ldlm_lock *lock;

        cfs_spin_lock(&ldlm_flock_waitq_lock);
restart:
        cfs_list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
                    (lock->l_export != blocking_export))
                        continue;

                /* the owner we are blocked on is itself waiting; follow the
                 * chain one hop further and rescan the waitq. */
                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
                blocking_export = (struct obd_export *)(long)
                        lock->l_policy_data.l_flock.blocking_export;
                if (blocking_pid == req_pid && blocking_export == req_export) {
                        cfs_spin_unlock(&ldlm_flock_waitq_lock);
                        return 1;
                }

                goto restart;
        }
        cfs_spin_unlock(&ldlm_flock_waitq_lock);

        return 0;
}
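/*
 * Illustration: process P1 requests a lock that conflicts with a lock held
 * by P2, so the scan starts from (P2, P2's export).  If the waitq contains
 * a waiting lock owned by P2 whose recorded blocker is P1, the chain leads
 * back to (req_pid, req_export) and 1 is returned to report a deadlock.
 * Longer cycles (P1 -> P2 -> P3 -> P1) are caught the same way, since each
 * hop restarts the scan with the next blocker in the chain.
 */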
int
ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
                        ldlm_error_t *err, cfs_list_t *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = res->lr_namespace;
        cfs_list_t *tmp;
        cfs_list_t *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags %#x pid %u mode %u start "LPU64" end "LPU64
               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }
reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                cfs_list_for_each(tmp, &res->lr_granted) {
                        lock = cfs_list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq)
                                RETURN(LDLM_ITER_CONTINUE);
                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                RETURN(LDLM_ITER_STOP);
                        }

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                RETURN(LDLM_ITER_STOP);
                        }

                        /* record who we are blocked on for the deadlock
                         * detector, then wait on the blocked queue. */
                        req->l_policy_data.l_flock.blocking_pid =
                                lock->l_policy_data.l_flock.pid;
                        req->l_policy_data.l_flock.blocking_export =
                                (long)(void *)lock->l_export;

                        LASSERT(cfs_list_empty(&req->l_flock_waitq));
                        cfs_spin_lock(&ldlm_flock_waitq_lock);
                        cfs_list_add_tail(&req->l_flock_waitq,
                                          &ldlm_flock_waitq);
                        cfs_spin_unlock(&ldlm_flock_waitq_lock);

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        RETURN(LDLM_ITER_STOP);
                }
        }
        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                RETURN(LDLM_ITER_STOP);
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection waitq. */
        cfs_spin_lock(&ldlm_flock_waitq_lock);
        cfs_list_del_init(&req->l_flock_waitq);
        cfs_spin_unlock(&ldlm_flock_waitq_lock);
        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */
        if (!ownlocks)
                ownlocks = &res->lr_granted;
        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = cfs_list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;
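
                        /* Concretely: if the lock ends at OBD_OBJECT_EOF
                         * (all ones), "end + 1" wraps to 0 and the first
                         * test would spuriously skip an adjoining lock;
                         * if the lock starts at 0, "start - 1" underflows
                         * to the maximum value and the second test would
                         * spuriously stop the scan.  The != checks above
                         * mask both cases. */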
                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }
                /* split the existing lock into two locks */
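                /* For example, with a granted lock covering [0, 100] and a
                 * request of a different mode over [30, 50]: new2 below
                 * keeps [0, 29], the existing lock is shrunk to [51, 100],
                 * and the request covers [30, 50] in between. */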
                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_new() can sleep we should
                 * release the ns_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                RETURN(LDLM_ITER_STOP);
                        }
                        goto reprocess;
                }

                splitted = 1;
                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export,
                                                               new2);
                        if (new2->l_export->exp_lock_hash &&
                            cfs_hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }
        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                cfs_list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }
        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                if (first_enq) {
                        /* If this is an unlock, reprocess the waitq and
                         * send completion ASTs for locks that can now be
                         * granted. The only problem with doing this
                         * reprocessing here is that the completion ASTs for
                         * newly granted locks will be sent before the unlock
                         * completion is sent. It shouldn't be an issue. Also
                         * note that ldlm_process_flock_lock() will recurse,
                         * but only once because first_enq will be false from
                         * ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                CFS_LIST_HEAD(rpc_list);
                                int rc;
restart:
                                ldlm_reprocess_queue(res, &res->lr_waiting,
                                                     &rpc_list);

                                unlock_res_and_lock(req);
                                rc = ldlm_run_ast_work(&rpc_list,
                                                       LDLM_WORK_CP_AST);
                                lock_res_and_lock(req);
                                if (rc == -ERESTART)
                                        GOTO(restart, -ERESTART);
                        }
                } else {
                        LASSERT(req->l_completion_ast);
                        ldlm_add_ast_work_item(req, NULL, work_list);
                }
        }
        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that it
         * can bump the reference count on req. Otherwise req could be freed
         * before the completion AST can be sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        RETURN(LDLM_ITER_CONTINUE);
}
struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;
        ENTRY;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection waitq. */
        cfs_spin_lock(&ldlm_flock_waitq_lock);
        cfs_list_del_init(&lock->l_flock_waitq);
        cfs_spin_unlock(&ldlm_flock_waitq_lock);

        /* client side - set flag to prevent lock from being put on lru list */
        lock->l_flags |= LDLM_FL_CBPENDING;

        EXIT;
}
/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags [in]: flags
 * \param *data [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        cfs_flock_t                    *getlk = lock->l_ast_data;
        struct obd_device              *obd;
        struct obd_import              *imp = NULL;
        struct ldlm_flock_wait_data     fwd;
        struct l_wait_info              lwi;
        ldlm_error_t                    err;
        int                             rc = 0;
        ENTRY;

        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
               flags, data, getlk);
        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }
        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock get granted, wake it up */
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }
        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                cfs_spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                cfs_spin_unlock(&imp->imp_lock);
        }
        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
        if (lock->l_destroyed) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(0);
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");
        /* take lock off the deadlock detection waitq. */
        cfs_spin_lock(&ldlm_flock_waitq_lock);
        cfs_list_del_init(&lock->l_flock_waitq);
        cfs_spin_unlock(&ldlm_flock_waitq_lock);

        lock_res_and_lock(lock);
        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        cfs_list_del_init(&lock->l_res_link);
        if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the
                 * mode in the lock changes we can decref the appropriate
                 * refcount. */
                ldlm_flock_destroy(lock, cfs_flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        cfs_flock_set_type(getlk, F_RDLCK);
                        break;
                case LCK_PW:
                        cfs_flock_set_type(getlk, F_WRLCK);
                        break;
                default:
                        cfs_flock_set_type(getlk, F_UNLCK);
                }
                cfs_flock_set_pid(getlk,
                                  (pid_t)lock->l_policy_data.l_flock.pid);
                cfs_flock_set_start(getlk,
                                    (loff_t)lock->l_policy_data.l_flock.start);
                cfs_flock_set_end(getlk,
                                  (loff_t)lock->l_policy_data.l_flock.end);
        } else {
                int noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        RETURN(0);
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        struct ldlm_namespace *ns;
        ENTRY;

        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        ns = lock->l_resource->lr_namespace;

        /* take lock off the deadlock detection waitq. */
        cfs_spin_lock(&ldlm_flock_waitq_lock);
        cfs_list_del_init(&lock->l_flock_waitq);
        cfs_spin_unlock(&ldlm_flock_waitq_lock);
        RETURN(0);
}