From aa98ff87f80830b2f88d710c203dc0cb2f1dc4d5 Mon Sep 17 00:00:00 2001 From: brian Date: Sat, 24 Sep 2005 03:25:58 +0000 Subject: [PATCH] Group lock code to pass test 58. --- lustre/ldlm/ldlm_extent.c | 217 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 150 insertions(+), 67 deletions(-) diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 6f3d57a..8240eab 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -163,80 +163,28 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, struct list_head **insertp) { struct list_head *tmp; - struct ldlm_lock *lock = req; + struct list_head *save = NULL; + struct ldlm_lock *lock = NULL; ldlm_mode_t req_mode = req->l_req_mode; int compat = 1; + int found = 0; ENTRY; lockmode_verify(req_mode); - if (req->l_req_mode == LCK_GROUP) { - int found = 0; + /* Extent locks are only queued once. We can get back here with + * insertp != NULL if the blocking ASTs returned -ERESTART. */ + if (!list_empty(&req->l_res_link)) + insertp = NULL; - list_for_each(tmp, queue) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (req == lock) - break; - - if (lock->l_req_mode != LCK_GROUP) { - /* group locks are not blocked by waiting PR|PW - * locks. also, any group locks that could have - * blocked us would have appeared before this - * PR|PW lock in the list. */ - if (lock->l_req_mode != lock->l_granted_mode) - break; - - compat = 0; - if (!work_list) - break; - - if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, - work_list); - continue; - } - - /* no blocking ASTs are sent for group locks. */ - - if (lock->l_policy_data.l_extent.gid != - req->l_policy_data.l_extent.gid) { - if (*flags & LDLM_FL_BLOCK_NOWAIT) - RETURN(-EWOULDBLOCK); - - compat = 0; - - /* there's a blocking group lock in front - * of us on the queue. It can be held - * indefinitely, so don't timeout. */ - *flags |= LDLM_FL_NO_TIMEOUT; - - /* the only reason to continue traversing the - * list at this point is to find the proper - * place to insert the lock in the waitq. */ - if (insertp && !found) - continue; - - break; - } - - /* if a group lock with this gid has already been - * granted then grant this one. */ - if (lock->l_req_mode == lock->l_granted_mode) { - compat = 2; - break; - } - found = 1; - } - } else { + if (req->l_req_mode != LCK_GROUP) { __u64 req_start = req->l_req_extent.start; __u64 req_end = req->l_req_extent.end; list_for_each(tmp, queue) { lock = list_entry(tmp, struct ldlm_lock, l_res_link); - if (req == lock) { - tmp = tmp->next; + if (req == lock) break; - } if (lock->l_req_mode == LCK_GROUP) { if (*flags & LDLM_FL_BLOCK_NOWAIT) @@ -248,7 +196,11 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, /* there's a blocking group lock in front * of us on the queue. It can be held * indefinitely, so don't timeout. */ - *flags |= LDLM_FL_NO_TIMEOUT; + if (insertp) { + *flags |= LDLM_FL_NO_TIMEOUT; + /* lock_bitlock(req) is held here. */ + req->l_flags |= LDLM_FL_NO_TIMEOUT; + } if (work_list) continue; @@ -273,13 +225,140 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, ldlm_add_ast_work_item(lock, req, work_list); } - /* Ensure we scan entire list if insertp is requested so that - * the new request will be appended to the end of the list. */ - LASSERTF((insertp == NULL) || (tmp == queue) || (req == lock)); + if (insertp) + *insertp = queue; + + RETURN(compat); + } + + list_for_each(tmp, queue) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (req == lock) + break; + + if (lock->l_req_mode != LCK_GROUP) { + if (lock->l_req_mode != lock->l_granted_mode) { + /* we must be traversing the waitq. */ + + /* If a group lock was already found then + * req can be queued before any extent locks + * that come after the found group lock. */ + if (found) + break; + + if (!insertp) { + /* We've hit a conflicting extent lock + * on the waitq before hitting the req + * group lock. See comments below. */ + compat = 0; + break; + } + + /* Group locks are not normally blocked by + * waiting PR|PW locks. */ + + /* If NO_TIMEOUT was sent back to the client + * we can queue the group lock in front of + * this extent lock. */ + if (lock->l_flags & LDLM_FL_NO_TIMEOUT) { + if (save == NULL) + save = tmp; + continue; + } + + /* If we did NOT send NO_TIMEOUT back to the + * client for this extent lock then the client + * could possibly timeout if we queue this + * group lock before it, so don't. This is the + * only way to get a conflicting extent lock + * in front of a group lock on the waitq. */ + } + + compat = 0; + if (!work_list) { + LASSERT(save == NULL); + break; + } + + /* If we previously skipped over some extent locks + * because we thought we were going to queue the + * group lock in front of them then we need to go back + * and send blocking ASTs for the locks we skipped. */ + if (save != NULL) { + struct ldlm_lock *lck2; + + for (; save != tmp; save = save->next) { + lck2 = list_entry(save, + struct ldlm_lock, + l_res_link); + + /* If there was a group lock after save + * then we would have exited this loop + * above. */ + LASSERT(lck2->l_req_mode!=LCK_GROUP); + + if (lck2->l_blocking_ast) { + ldlm_add_ast_work_item(lck2,req, + work_list); + } + } + save = NULL; + } + + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, work_list); + continue; + } + + /* If it was safe to insert a group lock at save, + * i.e. save != NULL, then this group lock already + * on the queue would have been inserted before save. */ + LASSERT(save == NULL); + + /* Note: no blocking ASTs are sent for group locks. */ + + if (lock->l_policy_data.l_extent.gid == + req->l_policy_data.l_extent.gid) { + /* group locks with this gid already on the waitq. */ + found = 2; + + if (lock->l_req_mode == lock->l_granted_mode) { + /* if a group lock with this gid has already + * been granted then grant this one. */ + compat = 2; + break; + } + } else { + if (found == 2) + break; + + /* group locks already exist on the queue. */ + found = 1; + + if (*flags & LDLM_FL_BLOCK_NOWAIT) + RETURN(-EWOULDBLOCK); + + compat = 0; + + /* there's a blocking group lock in front + * of us on the queue. It can be held + * indefinitely, so don't timeout. */ + *flags |= LDLM_FL_NO_TIMEOUT; + + /* the only reason to continue traversing the + * list at this point is to find the proper + * place to insert the lock in the waitq. */ + if (!insertp) + break; + } } - if (insertp) - *insertp = tmp; + if (insertp != NULL) { + if (save != NULL) + *insertp = save; + else + *insertp = tmp; + } RETURN(compat); } @@ -322,6 +401,9 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, ldlm_resource_unlink_lock(lock); ldlm_extent_policy(res, lock, flags); + lock_bitlock(lock); + lock->l_flags &= ~LDLM_FL_NO_TIMEOUT; + unlock_bitlock(lock); ldlm_grant_lock(lock, work_list); RETURN(LDLM_ITER_CONTINUE); } @@ -344,6 +426,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, grant: ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); + lock->l_flags &= ~LDLM_FL_NO_TIMEOUT; ldlm_grant_lock(lock, NULL); } else { /* If either of the compat_queue()s returned failure, then we -- 1.8.3.1