From 640e9754c33a492244e01492243a4d07b33759f3 Mon Sep 17 00:00:00 2001 From: yury Date: Thu, 8 Sep 2005 08:34:57 +0000 Subject: [PATCH] - landed share_write_lock.patch from #7410 --- lustre/cmobd/cm_oss_reint.c | 3 +- lustre/ldlm/ldlm_extent.c | 255 ++++++++++++++++++++++---------------------- lustre/ldlm/ldlm_internal.h | 2 - lustre/ldlm/ldlm_lock.c | 39 ++++--- lustre/ldlm/ldlm_lockd.c | 3 +- lustre/ldlm/ldlm_resource.c | 20 ---- lustre/lmv/lmv_obd.c | 5 +- lustre/lov/lov_obd.c | 6 +- lustre/lov/lov_request.c | 5 + lustre/osc/osc_request.c | 6 +- 10 files changed, 162 insertions(+), 182 deletions(-) diff --git a/lustre/cmobd/cm_oss_reint.c b/lustre/cmobd/cm_oss_reint.c index 4e4e630..c460ffb 100644 --- a/lustre/cmobd/cm_oss_reint.c +++ b/lustre/cmobd/cm_oss_reint.c @@ -217,8 +217,7 @@ static int cmobd_write_extents(struct obd_device *obd, struct obdo *oa, /* XXX for debug write replay without smfs and kml */ res_id.name[0]= oa->o_id; res_id.name[1]= oa->o_gr; - policy.l_extent.start = extent->start; - policy.l_extent.end = extent->end; + policy.l_extent = *extent; /* get extent read lock on the source replay file */ rc = ldlm_cli_enqueue(NULL, NULL, cache->obd_namespace, res_id, diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 391d493..25f4ae7 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -134,6 +134,9 @@ static void ldlm_extent_policy(struct ldlm_resource *res, { struct ldlm_extent new_ex = { .start = 0, .end = ~0}; + if (lock->l_req_mode == LCK_GROUP) + return; + ldlm_extent_internal_policy(&res->lr_granted, lock, &new_ex); ldlm_extent_internal_policy(&res->lr_waiting, lock, &new_ex); @@ -156,139 +159,126 @@ static void ldlm_extent_policy(struct ldlm_resource *res, */ static int ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int *flags, ldlm_error_t *err, - struct list_head *work_list) + int *flags, struct list_head *work_list, + struct list_head **insertp) { struct list_head *tmp; struct ldlm_lock *lock; ldlm_mode_t req_mode = req->l_req_mode; - __u64 req_start = req->l_req_extent.start; - __u64 req_end = req->l_req_extent.end; int compat = 1; - int scan = 0; ENTRY; lockmode_verify(req_mode); - list_for_each(tmp, queue) { - lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (req->l_req_mode == LCK_GROUP) { + int found = 0; - if (req == lock) - RETURN(compat); - - if (scan) { - /* We only get here if we are queuing GROUP lock - and met some incompatible one. The main idea of this - code is to insert GROUP lock past compatible GROUP - lock in the waiting queue or if there is not any, - then in front of first non-GROUP lock */ - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - RETURN(0); - } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, - req); - RETURN(0); - } - continue; - } + list_for_each(tmp, queue) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (req == lock) + break; - /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) { - /* non-group locks are compatible, overlap doesn't - matter */ - if (req_mode != LCK_GROUP) + if (lock->l_req_mode != LCK_GROUP) { + /* group locks are not blocked by waiting PR|PW + * locks. also, any group locks that could have + * blocked us would have appeared before this + * PR|PW lock in the list. */ + if (lock->l_req_mode != lock->l_granted_mode) + break; + + compat = 0; + if (!work_list) + break; + + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, + work_list); continue; - - /* If we are trying to get a GROUP lock and there is - another one of this kind, we need to compare gid */ - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - if (lock->l_req_mode == lock->l_granted_mode) - RETURN(2); - - /* If we are in nonblocking mode - return - immediately */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } - /* If this group lock is compatible with another - * group lock on the waiting list, they must be - * together in the list, so they can be granted - * at the same time. Otherwise the later lock - * can get stuck behind another, incompatible, - * lock. */ - ldlm_resource_insert_lock_after(lock, req); - /* Because 'lock' is not granted, we can stop - * processing this queue and return immediately. - * There is no need to check the rest of the - * list. */ - RETURN(0); } - } - if (req_mode == LCK_GROUP && - (lock->l_req_mode != lock->l_granted_mode)) { - scan = 1; - compat = 0; - if (lock->l_req_mode != LCK_GROUP) { - /* Ok, we hit non-GROUP lock, there should be no - more GROUP locks later on, queue in front of - first non-GROUP lock */ - - ldlm_resource_insert_lock_after(lock, req); - list_del_init(&lock->l_res_link); - ldlm_resource_insert_lock_after(req, lock); - RETURN(0); + /* no blocking ASTs are sent for group locks. */ + + if (lock->l_policy_data.l_extent.gid != + req->l_policy_data.l_extent.gid) { + if (*flags & LDLM_FL_BLOCK_NOWAIT) + RETURN(-EWOULDBLOCK); + + compat = 0; + + /* there's a blocking group lock in front + * of us on the queue. It can be held + * indefinitely, so don't timeout. */ + *flags |= LDLM_FL_NO_TIMEOUT; + + /* the only reason to continue traversing the + * list at this point is to find the proper + * place to insert the lock in the waitq. */ + if (insertp && !found) + continue; + + break; } - if (req->l_policy_data.l_extent.gid == - lock->l_policy_data.l_extent.gid) { - /* found it */ - ldlm_resource_insert_lock_after(lock, req); - RETURN(0); + + /* if a group lock with this gid has already been + * granted then grant this one. */ + if (lock->l_req_mode == lock->l_granted_mode) { + compat = 2; + break; } - continue; + found = 1; } + } else { + __u64 req_start = req->l_req_extent.start; + __u64 req_end = req->l_req_extent.end; + + list_for_each(tmp, queue) { + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + if (req == lock) + break; - if (lock->l_req_mode == LCK_GROUP) { - /* If compared lock is GROUP, then requested is PR/PW/=> - * this is not compatible; extent range does not - * matter */ - if (*flags & LDLM_FL_BLOCK_NOWAIT) { - compat = -EWOULDBLOCK; - goto destroylock; - } else { + if (lock->l_req_mode == LCK_GROUP) { + if (*flags & LDLM_FL_BLOCK_NOWAIT) + RETURN(-EWOULDBLOCK); + + /* No blocking ASTs are sent for group locks. */ + compat = 0; + + /* there's a blocking group lock in front + * of us on the queue. It can be held + * indefinitely, so don't timeout. */ *flags |= LDLM_FL_NO_TIMEOUT; + + if (work_list) + continue; + else + break; } - } else if (lock->l_policy_data.l_extent.end < req_start || - lock->l_policy_data.l_extent.start > req_end) { - /* if a non grouplock doesn't overlap skip it */ - continue; - } - if (!work_list) - RETURN(0); + /* locks are compatible, overlap doesn't matter */ + if (lockmode_compat(lock->l_req_mode, req_mode)) + continue; + + if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) + continue; - compat = 0; - if (lock->l_blocking_ast) - ldlm_add_ast_work_item(lock, req, work_list); + compat = 0; + + if (!work_list) + break; + + if (lock->l_blocking_ast) + ldlm_add_ast_work_item(lock, req, work_list); + } + + /* Ensure we scan entire list if insertp is requested so that + * the new request will be appended to the end of the list. */ + LASSERT((insertp == NULL) || (tmp == queue)); } - return(compat); -destroylock: - list_del_init(&req->l_res_link); - ldlm_lock_destroy(req); - *err = compat; + if (insertp) + *insertp = tmp; + RETURN(compat); } @@ -306,6 +296,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); + struct list_head *insertp = NULL; int rc, rc2; ENTRY; @@ -313,17 +304,16 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, *err = ELDLM_OK; if (!first_enq) { - /* Careful observers will note that we don't handle -EWOULDBLOCK - * here, but it's ok for a non-obvious reason -- compat_queue - * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT). - * flags should always be zero here, and if that ever stops - * being true, we want to find out. */ + /* -EWOULDBLOCK can't occur here since (flags & BLOCK_NOWAIT) + * lock requests would either be granted or fail on their + * first_enq. flags should always be zero here, and if that + * ever changes we want to find out. */ LASSERT(*flags == 0); - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, - err, NULL); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, + flags, NULL, NULL); if (rc == 1) { rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, - flags, err, NULL); + flags, NULL, NULL); } if (rc == 0) RETURN(LDLM_ITER_STOP); @@ -335,19 +325,21 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, } restart: - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err, &rpc_list); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, &rpc_list, + NULL); if (rc < 0) - GOTO(out, rc); /* lock was destroyed */ - if (rc == 2) { + GOTO(destroylock, rc); + if (rc == 2) goto grant; - } - rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err, &rpc_list); + /* Traverse the waiting list in case there are other conflicting + * lock requests ahead of us in the queue and send blocking ASTs */ + rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, &rpc_list, + &insertp); if (rc2 < 0) - GOTO(out, rc = rc2); /* lock was destroyed */ - + GOTO(destroylock, rc); if (rc + rc2 == 2) { - grant: + grant: ldlm_extent_policy(res, lock, flags); ldlm_resource_unlink_lock(lock); ldlm_grant_lock(lock, NULL); @@ -359,7 +351,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, * terrible folly -- if we goto restart, we could get * re-ordered! Causes deadlock, because ASTs aren't sent! */ if (list_empty(&lock->l_res_link)) - ldlm_resource_add_lock(res, &res->lr_waiting, lock); + ldlm_resource_add_lock(res, insertp, lock); unlock_res(res); rc = ldlm_run_bl_ast_work(&rpc_list); lock_res(res); @@ -367,8 +359,15 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; } - rc = 0; -out: + + RETURN(0); + + destroylock: + list_del_init(&lock->l_res_link); + unlock_res(res); + ldlm_lock_destroy(lock); + lock_res(res); + *err = rc; RETURN(rc); } diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index c6ee99e..8d992b2 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -30,8 +30,6 @@ typedef enum { int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); /* ldlm_resource.c */ -void ldlm_resource_insert_lock_after(struct ldlm_lock *original, - struct ldlm_lock *new); int ldlm_resource_putref_locked(struct ldlm_resource *res); /* ldlm_lock.c */ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 606ccc8..13d3be3 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -498,11 +498,11 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) lock->l_writers--; } - if (lock->l_flags & LDLM_FL_LOCAL && + if (((lock->l_flags & LDLM_FL_LOCAL) || (mode == LCK_GROUP)) && !lock->l_readers && !lock->l_writers) { - /* If this is a local lock on a server namespace and this was - * the last reference, cancel the lock. */ - CDEBUG(D_INFO, "forcing cancel of local lock\n"); + /* If this is a local lock on a server namespace or a group + * lock and this was the last reference, cancel the lock. */ + CDEBUG(D_INFO, "forcing cancel of local or group lock\n"); lock->l_flags |= LDLM_FL_CBPENDING; } @@ -625,16 +625,19 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, if (!(lock->l_req_mode & mode)) continue; - if (lock->l_resource->lr_type == LDLM_EXTENT && - (lock->l_policy_data.l_extent.start > - policy->l_extent.start || - lock->l_policy_data.l_extent.end < policy->l_extent.end)) - continue; - - if (lock->l_resource->lr_type == LDLM_EXTENT && - mode == LCK_GROUP && - lock->l_policy_data.l_extent.gid != policy->l_extent.gid) - continue; + if (lock->l_resource->lr_type == LDLM_EXTENT) { + if (mode == LCK_GROUP) { + if (lock->l_policy_data.l_extent.gid != + policy->l_extent.gid) + continue; + } else { + if ((lock->l_policy_data.l_extent.start > + policy->l_extent.start) || + (lock->l_policy_data.l_extent.end < + policy->l_extent.end)) + continue; + } + } /* We match if we have existing lock with same or wider set of bits. */ @@ -1285,11 +1288,13 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos) ldlm_lockname[lock->l_granted_mode], atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers); if (lock->l_resource->lr_type == LDLM_EXTENT) - CDEBUG(level, " Extent: "LPU64" -> "LPU64 - " (req "LPU64"-"LPU64")\n", + CDEBUG(level, " Extent: "LPU64" -> "LPU64" gid: "LPU64 + " (req "LPU64"-"LPU64" gid: "LPU64")\n", lock->l_policy_data.l_extent.start, lock->l_policy_data.l_extent.end, - lock->l_req_extent.start, lock->l_req_extent.end); + lock->l_policy_data.l_extent.gid, + lock->l_req_extent.start, lock->l_req_extent.end, + lock->l_req_extent.gid); else if (lock->l_resource->lr_type == LDLM_FLOCK) CDEBUG(level, " Pid: "LPU64" Extent: "LPU64" -> "LPU64"\n", lock->l_policy_data.l_flock.pid, diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index e367cc1..4e32bd4 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -478,7 +478,8 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data) do_gettimeofday(&granted_time); total_enqueue_wait = timeval_sub(&granted_time, &lock->l_enqueued_time); - if (total_enqueue_wait / 1000000 > obd_timeout) + if (((flags & LDLM_FL_NO_TIMEOUT) == 0) && + ((total_enqueue_wait / 1000000) > obd_timeout)) LDLM_ERROR(lock, "enqueue wait took %ldus", total_enqueue_wait); lock_res_and_lock(lock); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 3633d87..e3c4011 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -708,26 +708,6 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, list_add_tail(&lock->l_res_link, head); } -void ldlm_resource_insert_lock_after(struct ldlm_lock *original, - struct ldlm_lock *new) -{ - struct ldlm_resource *res = original->l_resource; - - check_res_locked(res); - - ldlm_resource_dump(D_OTHER, res); - CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original); - ldlm_lock_dump(D_OTHER, new, 0); - - if (new->l_destroyed) { - CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n"); - return; - } - - LASSERT(list_empty(&new->l_res_link)); - list_add(&new->l_res_link, &original->l_res_link); -} - void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { check_res_locked(lock->l_resource); diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index b3d4d68..361a5fe 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -238,10 +238,9 @@ static int lmv_connect(struct lustre_handle *conn, struct obd_device *obd, } #endif - /* - * all real clients should perform actual connection rightaway, because + /* all real clients should perform actual connection right away, because * it is possible, that LMV will not have opportunity to connect - * targets, as MDC stuff will bit called directly, for instance while + * targets, as MDC stuff will be called directly, for instance while * reading ../mdc/../kbytesfree procfs file, etc. */ if (flags & OBD_OPT_REAL_CLIENT) diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index cbb114d..8774a94 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1686,8 +1686,7 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, LASSERT(lov_lockhp); *flags = save_flags; - sub_policy.l_extent.start = req->rq_extent.start; - sub_policy.l_extent.end = req->rq_extent.end; + sub_policy.l_extent = req->rq_extent; rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, type, &sub_policy, mode, flags, bl_cb, @@ -1731,8 +1730,7 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; LASSERT(lov_lockhp); - sub_policy.l_extent.start = req->rq_extent.start; - sub_policy.l_extent.end = req->rq_extent.end; + sub_policy.l_extent = req->rq_extent; lov_flags = *flags; rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index f99307c..d5eac9b 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -315,6 +315,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, req->rq_extent.start = start; req->rq_extent.end = end; + req->rq_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_gen = loi->loi_ost_gen; @@ -424,6 +425,7 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, req->rq_extent.start = start; req->rq_extent.end = end; + req->rq_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_gen = loi->loi_ost_gen; @@ -1324,6 +1326,7 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, req->rq_extent.start = rs; req->rq_extent.end = re; + req->rq_extent.gid = -1; lov_set_add_req(req, set); } @@ -1409,6 +1412,8 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, req->rq_oa->o_id = loi->loi_id; req->rq_extent.start = rs; req->rq_extent.end = re; + req->rq_extent.gid = -1; + lov_set_add_req(req, set); } if (!set->set_count) diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 70ce8a4..b41bbf5 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2764,11 +2764,7 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, { ENTRY; - if (mode == LCK_GROUP) - ldlm_lock_decref_and_cancel(lockh, mode); - else - ldlm_lock_decref(lockh, mode); - + ldlm_lock_decref(lockh, mode); RETURN(0); } -- 1.8.3.1