From: dmilos
Date: Wed, 17 Sep 2003 02:07:00 +0000 (+0000)
Subject: Bug# 782 - address code review comments and enable Posix locking file op.
X-Git-Tag: v1_7_100~1^248~72
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=1c946a507996a3f83ae4266503a8699eb61de0ad;p=fs%2Flustre-release.git

Bug# 782 - address code review comments and enable Posix locking file op.
---

diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c
index b12e69b..b0123a3 100644
--- a/lustre/ldlm/ldlm_flock.c
+++ b/lustre/ldlm/ldlm_flock.c
@@ -35,138 +35,121 @@
 #include "ldlm_internal.h"
 
 #define l_flock_waitq   l_lru
-#define l_flock_blocker l_parent
 
 static struct list_head ldlm_flock_waitq = LIST_HEAD_INIT(ldlm_flock_waitq);
 
+/**
+ * list_for_remaining_safe - iterate over the remaining entries in a list
+ *              and safeguard against removal of a list entry.
+ * @pos:        the &struct list_head to use as a loop counter. pos MUST
+ *              have been initialized prior to using it in this macro.
+ * @n:          another &struct list_head to use as temporary storage
+ * @head:       the head for your list.
+ */
+#define list_for_remaining_safe(pos, n, head) \
+        for (n = pos->next; pos != (head); pos = n, n = pos->next)
+
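Unlike list_for_each_safe(), this macro does not start at the list head: it resumes
wherever the caller last positioned pos (here, the first lock owned by the requesting
process, found by an earlier scan), while keeping n one step ahead so the current
entry may be deleted mid-walk. A minimal, self-contained sketch of that behavior
outside the kernel; the list shims and the item type are stand-ins, not Lustre code:

    #include <stdio.h>

    /* Minimal stand-ins for the kernel list primitives. */
    struct list_head { struct list_head *next, *prev; };

    #define LIST_HEAD_INIT(name) { &(name), &(name) }

    static void list_del(struct list_head *e)
    {
            e->prev->next = e->next;
            e->next->prev = e->prev;
    }

    /* The macro from the patch: resume at 'pos' (already positioned)
     * and keep 'n' one step ahead so 'pos' may be freely removed. */
    #define list_for_remaining_safe(pos, n, head) \
            for (n = pos->next; pos != (head); pos = n, n = pos->next)

    struct item { struct list_head link; int value; };

    int main(void)
    {
            struct list_head head = LIST_HEAD_INIT(head);
            struct item items[5];

            for (int i = 0; i < 5; i++) {   /* build list 0..4 */
                    items[i].value = i;
                    items[i].link.prev = head.prev;
                    items[i].link.next = &head;
                    head.prev->next = &items[i].link;
                    head.prev = &items[i].link;
            }

            /* Resume from the third entry, removing even values. */
            struct list_head *pos = &items[2].link, *n;
            list_for_remaining_safe(pos, n, &head) {
                    struct item *it = (struct item *)pos;
                    if (it->value % 2 == 0)
                            list_del(pos);  /* safe: 'n' already saved */
                    else
                            printf("kept %d\n", it->value);
            }
            return 0;
    }
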
 static inline int
 ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        if ((new->l_policy_data.l_flock.pid ==
-             lock->l_policy_data.l_flock.pid) &&
-            (new->l_export == lock->l_export))
-                return 1;
-        else
-                return 0;
+        return((new->l_policy_data.l_flock.pid ==
+                lock->l_policy_data.l_flock.pid) &&
+               (new->l_export == lock->l_export));
 }
 
 static inline int
 ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
 {
-        if ((new->l_policy_data.l_flock.start <=
-             lock->l_policy_data.l_flock.end) &&
-            (new->l_policy_data.l_flock.end >=
-             lock->l_policy_data.l_flock.start))
-                return 1;
-        else
-                return 0;
+        return((new->l_policy_data.l_flock.start <=
+                lock->l_policy_data.l_flock.end) &&
+               (new->l_policy_data.l_flock.end >=
+                lock->l_policy_data.l_flock.start));
 }
 
 static inline void
-ldlm_flock_destroy(struct ldlm_lock *lock, int flags)
+ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags)
 {
         ENTRY;
 
         list_del_init(&lock->l_res_link);
         if (flags == LDLM_FL_WAIT_NOREPROC) {
-                /* client side */
-                struct lustre_handle lockh;
-
-                /* Set a flag to prevent us from sending a CANCEL */
-                lock->l_flags |= LDLM_FL_LOCAL_ONLY;
-
-                ldlm_lock2handle(lock, &lockh);
-                ldlm_lock_decref_and_cancel(&lockh, lock->l_granted_mode);
+                /* client side - set a flag to prevent sending a CANCEL */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
+                ldlm_lock_decref_internal(lock, mode);
         }
 
         ldlm_lock_destroy(lock);
         EXIT;
 }
 
-#if 0
 static int
-ldlm_flock_deadlock(struct ldlm_lock *waiter, struct ldlm_lock *blocker)
+ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *blocking_lock)
 {
-        struct list_head *tmp;
-        struct ldlm_lock *lock;
-        struct obd_export *waiter_export;
-        struct obd_export *blocker_export;
-        pid_t waiter_pid;
-        pid_t blocker_pid;
-
-        waiter_export = waiter->l_export;
-        waiter_pid = waiter->l_policy_data.l_flock.pid;
-        blocker_export = blocker->l_export;
-        blocker_pid = blocker->l_policy_data.l_flock.pid;
-
-next_task:
-        if (waiter_export == blocker_export && waiter_pid == blocker_pid)
-                return 1;
-
-        list_for_each(tmp, &ldlm_flock_waitq) {
-
-                lock = list_entry(tmp, struct ldlm_lock, l_flock_waitq);
-                if ((lock->l_export == blocker_export)
-                    && (lock->l_policy_data.l_flock.pid == blocker_pid)) {
-                        lock = lock->l_flock_blocker;
-                        blocker_export = lock->l_export;
-                        blocker_pid = lock->l_policy_data.l_flock.pid;
-                        goto next_task;
-                }
-        }
-        return 0;
+        struct obd_export *req_export = req->l_export;
+        struct obd_export *blocking_export = blocking_lock->l_export;
+        pid_t req_pid = req->l_policy_data.l_flock.pid;
+        pid_t blocking_pid = blocking_lock->l_policy_data.l_flock.pid;
+        struct ldlm_lock *lock;
+
+restart:
+        list_for_each_entry(lock, &ldlm_flock_waitq, l_flock_waitq) {
+                if ((lock->l_policy_data.l_flock.pid != blocking_pid) ||
+                    (lock->l_export != blocking_export))
+                        continue;
+
+                blocking_pid = lock->l_policy_data.l_flock.blocking_pid;
+                blocking_export = lock->l_policy_data.l_flock.blocking_export;
+                if (blocking_pid == req_pid && blocking_export == req_export)
+                        return 1;
+
+                goto restart;
+        }
+
+        return 0;
 }
-#endif
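
The rewritten ldlm_flock_deadlock() above replaces the #if 0'd draft: starting from
the owner that blocks the request, it checks whether that owner is itself waiting on
the global flock wait queue, steps to the owner blocking it, and declares deadlock if
the chain comes back around to the requester. The chain terminates when it reaches an
owner that is not waiting on anything. A compilable sketch of the same chain walk over
a simplified waiter table; names are made up for illustration, and the real code keys
owners on the (pid, export) pair so equal pids from different clients stay distinct:

    #include <stdbool.h>
    #include <stdio.h>

    /* Hypothetical stand-in for a lock on the flock wait queue. */
    struct waiter {
            int pid;            /* owner of the waiting lock */
            int blocking_pid;   /* owner it is waiting behind */
    };

    /* Follow blocking links: look up the current blocker on the wait
     * queue; if found, step to whoever blocks *it* and rescan. Deadlock
     * exists if the chain reaches the requester; it ends when a blocker
     * is not itself waiting. */
    static bool would_deadlock(const struct waiter *waitq, int n,
                               int req_pid, int blocking_pid)
    {
            int i = 0;

            while (i < n) {
                    if (waitq[i].pid != blocking_pid) {
                            i++;
                            continue;
                    }
                    blocking_pid = waitq[i].blocking_pid;
                    if (blocking_pid == req_pid)
                            return true;
                    i = 0;  /* restart scan for the new blocker */
            }
            return false;
    }

    int main(void)
    {
            /* 101 waits on 102, 102 waits on 100; if 100 now asks to
             * wait on 101, the chain 101 -> 102 -> 100 closes a cycle. */
            struct waiter waitq[] = { { 101, 102 }, { 102, 100 } };

            printf("%s\n", would_deadlock(waitq, 2, 100, 101)
                           ? "deadlock" : "ok");
            return 0;
    }
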
 
 int
-ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
-                   ldlm_error_t *err)
+ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq,
+                        ldlm_error_t *err)
 {
-        struct ldlm_lock *new = req;
-        struct ldlm_lock *new2 = NULL;
-        struct ldlm_lock *lock = NULL;
         struct ldlm_resource *res = req->l_resource;
         struct ldlm_namespace *ns = res->lr_namespace;
         struct list_head *tmp;
-        struct list_head *ownlocks;
+        struct list_head *ownlocks = NULL;
+        struct ldlm_lock *lock = NULL;
+        struct ldlm_lock *new = req;
+        struct ldlm_lock *new2 = NULL;
         ldlm_mode_t mode = req->l_req_mode;
-        int added = 0;
+        int added = (mode == LCK_NL);
         int overlaps = 0;
         ENTRY;
 
-        CDEBUG(D_DLMTRACE, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu\n",
-               *flags, new->l_policy_data.l_flock.pid, mode,
+        CDEBUG(D_DLMTRACE, "flags: 0x%x pid: %d mode: %d start: %llu end: %llu"
+               "\n", *flags, new->l_policy_data.l_flock.pid, mode,
                req->l_policy_data.l_flock.start,
                req->l_policy_data.l_flock.end);
 
         *err = ELDLM_OK;
 
-        /* No blocking ASTs are sent for record locks */
+        /* No blocking ASTs are sent for Posix file & record locks */
         req->l_blocking_ast = NULL;
 
-        ownlocks = NULL;
-        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
-                CDEBUG(D_DLMTRACE, "starting loop1.\n");
+        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
+                /* This loop determines where this process's locks start
+                 * in the resource lr_granted list. */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-
-                        CDEBUG(D_DLMTRACE, "loop1 granted: %p tmp: %p\n",
-                               &res->lr_granted, tmp);
-
                         if (ldlm_same_flock_owner(lock, req)) {
                                 ownlocks = tmp;
                                 break;
                         }
                 }
-                CDEBUG(D_DLMTRACE, "loop1 end.\n");
         } else {
-                CDEBUG(D_DLMTRACE, "starting loop2.\n");
+                /* This loop determines if there are existing locks
+                 * that conflict with the new lock request. */
                 list_for_each(tmp, &res->lr_granted) {
                         lock = list_entry(tmp, struct ldlm_lock, l_res_link);
 
-                        CDEBUG(D_DLMTRACE, "loop2 granted: %p tmp: %p\n",
-                               &res->lr_granted, tmp);
-
                         if (ldlm_same_flock_owner(lock, req)) {
                                 if (!ownlocks)
                                         ownlocks = tmp;
@@ -180,98 +163,98 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
-#if 0
-                        if ((*flags & LDLM_FL_BLOCK_NOWAIT) ||
-                            (first_enq && ldlm_flock_deadlock(req, lock))) {
-#else
+                        if (!first_enq)
+                                RETURN(LDLM_ITER_CONTINUE);
+
                         if (*flags & LDLM_FL_BLOCK_NOWAIT) {
-#endif
-                                ldlm_flock_destroy(req, *flags);
-                                *err = ELDLM_LOCK_ABORTED;
+                                ldlm_flock_destroy(req, mode, *flags);
+                                *err = -EAGAIN;
+                                RETURN(LDLM_ITER_STOP);
+                        }
+
+                        if (ldlm_flock_deadlock(req, lock)) {
+                                ldlm_flock_destroy(req, mode, *flags);
+                                *err = -EDEADLK;
                                 RETURN(LDLM_ITER_STOP);
                         }
 
                         if (*flags & LDLM_FL_TEST_LOCK) {
-                                req->l_granted_mode = lock->l_granted_mode;
+                                ldlm_flock_destroy(req, mode, *flags);
+                                req->l_req_mode = lock->l_granted_mode;
                                 req->l_policy_data.l_flock.pid =
                                         lock->l_policy_data.l_flock.pid;
                                 req->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
                                 req->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
-                                ldlm_flock_destroy(req, *flags);
+                                *flags |= LDLM_FL_LOCK_CHANGED;
                                 RETURN(LDLM_ITER_STOP);
                         }
 
-                        req->l_flock_blocker = lock;
-                        list_add_tail(&ldlm_flock_waitq, &req->l_flock_waitq);
+                        req->l_policy_data.l_flock.blocking_pid =
+                                lock->l_policy_data.l_flock.pid;
+                        req->l_policy_data.l_flock.blocking_export =
+                                lock->l_export;
+
+                        LASSERT(list_empty(&req->l_flock_waitq));
+                        list_add_tail(&req->l_flock_waitq, &ldlm_flock_waitq);
+
+                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                         *flags |= LDLM_FL_BLOCK_GRANTED;
-                        RETURN(LDLM_ITER_CONTINUE);
+                        RETURN(LDLM_ITER_STOP);
                 }
-                CDEBUG(D_DLMTRACE, "loop2 end.\n");
         }
 
         if (*flags & LDLM_FL_TEST_LOCK) {
-                LASSERT(first_enq);
-                req->l_granted_mode = req->l_req_mode;
+                ldlm_flock_destroy(req, mode, *flags);
+                req->l_req_mode = LCK_NL;
+                *flags |= LDLM_FL_LOCK_CHANGED;
                 RETURN(LDLM_ITER_STOP);
         }
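
These conflict branches are where the client-visible fcntl() semantics come from:
LDLM_FL_BLOCK_NOWAIT corresponds to a non-blocking F_SETLK and now fails with -EAGAIN,
a detected wait cycle fails with -EDEADLK, and LDLM_FL_TEST_LOCK implements F_GETLK by
copying the conflicting lock's description back into the request. A userspace
illustration of the first and third branches; the mount point is hypothetical, and the
conflict branch only fires against a lock held by another process:

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            /* Hypothetical path; any file on a Lustre mount would do. */
            int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);
            struct flock fl = {
                    .l_type   = F_WRLCK,
                    .l_whence = SEEK_SET,
                    .l_start  = 0,
                    .l_len    = 100,
            };

            if (fd < 0)
                    return 1;

            /* F_SETLK never sleeps: a conflict comes back as EAGAIN (or
             * EACCES), i.e. the LDLM_FL_BLOCK_NOWAIT path above. */
            if (fcntl(fd, F_SETLK, &fl) == -1)
                    perror("F_SETLK");

            /* F_GETLK (the LDLM_FL_TEST_LOCK path) takes no lock; it
             * rewrites 'fl' to describe a conflicting lock, or sets
             * l_type = F_UNLCK when the request would be granted. */
            fl.l_type = F_WRLCK;
            if (fcntl(fd, F_GETLK, &fl) == 0 && fl.l_type != F_UNLCK)
                    printf("conflict: pid %d holds [%lld, len %lld)\n",
                           (int)fl.l_pid, (long long)fl.l_start,
                           (long long)fl.l_len);

            close(fd);
            return 0;
    }
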
 
-        added = (mode == LCK_NL);
-
-        /* Insert the new lock into the list */
+        /* Scan the locks owned by this process that overlap this request.
+         * We may have to merge or split existing locks. */
         if (!ownlocks)
                 ownlocks = &res->lr_granted;
 
-        CDEBUG(D_DLMTRACE, "granted: %p ownlocks: %p\n",
-               &res->lr_granted, ownlocks);
-
-        CDEBUG(D_DLMTRACE, "starting loop3.\n");
-        for (tmp = ownlocks->next; ownlocks != &res->lr_granted;
-             ownlocks = tmp, tmp = ownlocks->next) {
-
-                CDEBUG(D_DLMTRACE, "loop3 granted: %p ownlocks: %p\n",
-                       &res->lr_granted, ownlocks);
-
+        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                 lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);
 
                 if (!ldlm_same_flock_owner(lock, new))
                         break;
 
-                if (lock->l_granted_mode == mode) {
-                        if (lock->l_policy_data.l_flock.end <
+                if (lock->l_granted_mode == mode) {
+                        if (lock->l_policy_data.l_flock.end <
                             (new->l_policy_data.l_flock.start - 1))
-                                continue;
+                                continue;
 
-                        if (lock->l_policy_data.l_flock.start >
+                        if (lock->l_policy_data.l_flock.start >
                             (new->l_policy_data.l_flock.end + 1))
-                                break;
+                                break;
 
-                        if (lock->l_policy_data.l_flock.start >
+                        if (lock->l_policy_data.l_flock.start >
                             new->l_policy_data.l_flock.start)
-                                lock->l_policy_data.l_flock.start =
+                                lock->l_policy_data.l_flock.start =
                                         new->l_policy_data.l_flock.start;
-                        else
-                                new->l_policy_data.l_flock.start =
+                        else
+                                new->l_policy_data.l_flock.start =
                                         lock->l_policy_data.l_flock.start;
 
-                        if (lock->l_policy_data.l_flock.end <
+                        if (lock->l_policy_data.l_flock.end <
                             new->l_policy_data.l_flock.end)
-                                lock->l_policy_data.l_flock.end =
+                                lock->l_policy_data.l_flock.end =
                                         new->l_policy_data.l_flock.end;
-                        else
-                                new->l_policy_data.l_flock.end =
+                        else
+                                new->l_policy_data.l_flock.end =
                                         lock->l_policy_data.l_flock.end;
 
-                        if (added) {
-                                ldlm_flock_destroy(lock, *flags);
-                        } else {
+                        if (added) {
+                                ldlm_flock_destroy(lock, mode, *flags);
+                        } else {
                                 new = lock;
                                 added = 1;
                         }
                         continue;
-                }
+                }
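
The same-mode branch above coalesces the request with any of the owner's existing
locks that it overlaps or directly abuts, widening one lock instead of accumulating
fragments. The arithmetic reduces to a small interval merge; a standalone sketch,
ignoring the start == 0 wrap-around corner that the kernel arithmetic shares:

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    struct range { uint64_t start, end; };  /* inclusive byte range */

    /* Extend 'have' to cover 'req' when the two ranges overlap or sit
     * directly adjacent; returns false when a gap separates them.
     * As in the patch, adjacency uses +/-1 arithmetic on the bounds. */
    static bool flock_coalesce(struct range *have, const struct range *req)
    {
            if (have->end < req->start - 1)   /* ends before the gap */
                    return false;
            if (have->start > req->end + 1)   /* begins after the gap */
                    return false;
            if (have->start > req->start)
                    have->start = req->start;
            if (have->end < req->end)
                    have->end = req->end;
            return true;
    }

    int main(void)
    {
            struct range have = { 0, 49 };
            struct range req  = { 50, 99 };   /* adjacent: merges */

            if (flock_coalesce(&have, &req))
                    printf("[%llu, %llu]\n",
                           (unsigned long long)have.start,
                           (unsigned long long)have.end);
            return 0;
    }

This overlap-or-adjacency rule is why two back-to-back locks of [0, 50) and [50, 100)
end up reported by fcntl() as the single lock [0, 100).
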
 
                 if (lock->l_policy_data.l_flock.end <
                     new->l_policy_data.l_flock.start)
@@ -290,7 +273,8 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                                 new->l_policy_data.l_flock.end + 1;
                         break;
                 } else if (added) {
-                        ldlm_flock_destroy(lock, *flags);
+                        ldlm_flock_destroy(lock, lock->l_req_mode,
+                                           *flags);
                 } else {
                         lock->l_policy_data.l_flock.start =
                                 new->l_policy_data.l_flock.start;
@@ -314,18 +298,18 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                  * allocating a new lock and use the req lock passed in
                  * with the request but this would complicate the reply
                  * processing since updates to req get reflected in the
-                 * reply. The client side must see the original lock data
-                 * so that it can process the unlock properly. */
+                 * reply. The client side replays the lock request so
+                 * it must see the original lock data in the reply. */
 
-                /* XXX - if ldlm_lock_new() can sleep we have to
-                 * release the ns_lock, allocate the new lock, and
-                 * restart processing this lock. */
+                /* XXX - if ldlm_lock_new() can sleep we should
+                 * release the ns_lock, allocate the new lock,
+                 * and restart processing this lock. */
                 new2 = ldlm_lock_create(ns, NULL, res->lr_name, LDLM_FLOCK,
                                         lock->l_granted_mode, NULL, NULL);
                 if (!new2) {
-                        /* LBUG for now */
-                        LASSERT(0);
-                        RETURN(ENOMEM);
+                        ldlm_flock_destroy(req, lock->l_granted_mode, *flags);
+                        *err = -ENOLCK;
+                        RETURN(LDLM_ITER_STOP);
                 }
 
                 new2->l_granted_mode = lock->l_granted_mode;
@@ -347,42 +331,40 @@ ldlm_flock_enqueue(struct ldlm_lock *req, int *flags, int first_enq,
                 ldlm_lock_addref_internal(new2, lock->l_granted_mode);
 
                 /* insert new2 at lock */
-                list_add_tail(&new2->l_res_link, ownlocks);
+                ldlm_resource_add_lock(res, ownlocks, new2);
                 LDLM_LOCK_PUT(new2);
                 break;
         }
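
The new2 allocation above exists because an unlock (or a different-mode lock) that
lands strictly inside an existing range must split it in two, which needs one more
lock structure than the request supplied. The userspace view of that split, with a
hypothetical path again:

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);
            struct flock fl = {
                    .l_type   = F_WRLCK,
                    .l_whence = SEEK_SET,
                    .l_start  = 0,
                    .l_len    = 100,        /* write-lock bytes [0, 100) */
            };

            if (fd < 0)
                    return 1;
            if (fcntl(fd, F_SETLK, &fl) == -1)
                    perror("lock");

            /* Unlock the middle; the one lock [0, 100) must become two:
             * [0, 40) and [60, 100). Server-side this is the path that
             * allocates 'new2' to hold the second half. */
            fl.l_type  = F_UNLCK;
            fl.l_start = 40;
            fl.l_len   = 20;
            if (fcntl(fd, F_SETLK, &fl) == -1)
                    perror("unlock");

            close(fd);
            return 0;
    }
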
 
-        CDEBUG(D_DLMTRACE, "loop3 end; added: %d\n", added);
-
         if (added) {
-                ldlm_flock_destroy(req, *flags);
+                ldlm_flock_destroy(req, mode, *flags);
         } else {
-                /* insert new at ownlocks */
+                /* insert new after ownlocks */
                 new->l_granted_mode = new->l_req_mode;
                 list_del_init(&new->l_res_link);
-                list_add_tail(&new->l_res_link, ownlocks);
+                ldlm_resource_add_lock(res, ownlocks, new);
         }
 
-        if (*flags != LDLM_FL_WAIT_NOREPROC) {
+        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                 if (req->l_completion_ast)
                         ldlm_add_ast_work_item(req, NULL, NULL, 0);
 
                 /* The only problem with doing the reprocessing here is that
                  * the completion ASTs for newly granted locks will be sent
                  * before the unlock completion is sent. It shouldn't be an
-                 * issue. Also note that ldlm_flock_enqueue() will recurse,
-                 * but only once because there can't be unlock requests on
-                 * the wait queue. */
+                 * issue. Also note that ldlm_process_flock_lock() will
+                 * recurse, but only once because there can't be unlock
+                 * requests on the wait queue. */
                 if ((mode == LCK_NL) && overlaps)
                         ldlm_reprocess_queue(res, &res->lr_waiting);
         }
 
         ldlm_resource_dump(res);
-
-        RETURN(LDLM_ITER_CONTINUE);
+        RETURN(LDLM_ITER_CONTINUE);
 }
 
-static void interrupted_flock_completion_wait(void *data)
+static void
+interrupted_flock_completion_wait(void *data)
 {
 }
 
@@ -395,7 +377,7 @@ int
 ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 {
         struct ldlm_namespace *ns;
-        struct file_lock *getlk = data;
+        struct file_lock *getlk = lock->l_ast_data;
         struct flock_wait_data fwd;
         unsigned long irqflags;
         struct obd_device *obd;
@@ -405,6 +387,9 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
         struct l_wait_info lwi;
         ENTRY;
 
+        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",
+               flags, data, getlk);
+
         LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
         if (flags == 0) {
@@ -414,7 +399,7 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
 
         if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                        LDLM_FL_BLOCK_CONV)))
-                goto granted;
+                goto granted;
 
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
@@ -434,8 +419,7 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 spin_unlock_irqrestore(&imp->imp_lock, irqflags);
         }
 
-        lwi = LWI_TIMEOUT_INTR(0, NULL, interrupted_flock_completion_wait,
-                               &fwd);
+        lwi = LWI_TIMEOUT_INTR(0,NULL,interrupted_flock_completion_wait,&fwd);
 
         /* Go to sleep until the lock is granted. */
         rc = l_wait_event(lock->l_waitq,
@@ -457,39 +441,36 @@ granted:
         ns = lock->l_resource->lr_namespace;
         l_lock(&ns->ns_lock);
 
-        lock->l_flock_blocker = NULL;
+        /* take data off of deadlock detection waitq. */
         list_del_init(&lock->l_flock_waitq);
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
         list_del_init(&lock->l_res_link);
 
-        if (getlk) {
+        if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
-                if (lock->l_granted_mode == LCK_PR)
+                /* The old mode was saved in getlk->fl_type so that if the mode
+                 * in the lock changes we can decref the appropriate refcount. */
+                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
+                switch (lock->l_granted_mode) {
+                case LCK_PR:
                         getlk->fl_type = F_RDLCK;
-                else if (lock->l_granted_mode == LCK_PW)
+                        break;
+                case LCK_PW:
                         getlk->fl_type = F_WRLCK;
-                else
+                        break;
+                default:
                         getlk->fl_type = F_UNLCK;
+                }
                 getlk->fl_pid = lock->l_policy_data.l_flock.pid;
                 getlk->fl_start = lock->l_policy_data.l_flock.start;
                 getlk->fl_end = lock->l_policy_data.l_flock.end;
-                /* ldlm_flock_destroy(lock); */
         } else {
+                /* We need to reprocess the lock to do merges or splits
+                 * with existing locks owned by this process. */
                 flags = LDLM_FL_WAIT_NOREPROC;
-                /* We need to reprocess the lock to do merges or split */
-                ldlm_flock_enqueue(lock, &flags, 1, &err);
+                ldlm_process_flock_lock(lock, &flags, 1, &err);
         }
 
         l_unlock(&ns->ns_lock);
         RETURN(0);
 }
-
-/* This function is only called on the client when a lock is aborted. */
-int
-ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *ld,
-                        void *data, int flag)
-{
-        ENTRY;
-        ldlm_lock_destroy(lock);
-        RETURN(0);
-}
diff --git a/lustre/ldlm/ldlm_plain.c b/lustre/ldlm/ldlm_plain.c
index 0efc294..af55c17 100644
--- a/lustre/ldlm/ldlm_plain.c
+++ b/lustre/ldlm/ldlm_plain.c
@@ -39,7 +39,7 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 {
         struct list_head *tmp;
         struct ldlm_lock *lock;
-        ldlm_mode_t req_mode = req->l_req_mode;
+        ldlm_mode_t req_mode = req->l_req_mode;
         int compat = 1;
         ENTRY;
 
@@ -67,11 +67,11 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
 }
 
 int
-ldlm_plain_enqueue(struct ldlm_lock *lock, int *flags, int first_enq,
-                   ldlm_error_t *err)
+ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
+                        ldlm_error_t *err)
 {
         struct ldlm_resource *res = lock->l_resource;
-        int compat;
+        int compat;
         ENTRY;
 
         if (first_enq) {
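
For completeness, the blocking path that ldlm_flock_completion_ast() implements is
what a plain F_SETLKW call turns into on the client: the process sleeps in
l_wait_event() until the lock is granted, the wait is interrupted by a signal
(EINTR), or the enqueue is aborted, for instance by the -EDEADLK result set above.
A minimal caller, with the same hypothetical path:

    #include <fcntl.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            int fd = open("/mnt/lustre/testfile", O_RDWR | O_CREAT, 0644);
            struct flock fl = {
                    .l_type   = F_WRLCK,
                    .l_whence = SEEK_SET,
                    .l_start  = 0,
                    .l_len    = 0,          /* zero length: to end of file */
            };

            if (fd < 0)
                    return 1;

            /* F_SETLKW sleeps until granted; EINTR on a signal, EDEADLK
             * if the server's cycle detection aborts the wait. */
            if (fcntl(fd, F_SETLKW, &fl) == -1)
                    perror("F_SETLKW");

            close(fd);
            return 0;
    }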