From 2e485f943e66f2ee467e7ebcf2e151a319fc0639 Mon Sep 17 00:00:00 2001 From: dmilos Date: Sun, 21 Sep 2003 16:31:00 +0000 Subject: [PATCH] Fixes to pass Posix record locking tests. --- lustre/ldlm/ldlm_flock.c | 117 ++++++++++++++++++++++++++++------------------- 1 file changed, 69 insertions(+), 48 deletions(-) diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index 3f8d079..fd29f47 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -71,6 +71,9 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, int flags) { ENTRY; + LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%x)", + mode, flags); + list_del_init(&lock->l_res_link); if (flags == LDLM_FL_WAIT_NOREPROC) { /* client side - set a flag to prevent sending a CANCEL */ @@ -171,11 +174,7 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, *err = -EAGAIN; RETURN(LDLM_ITER_STOP); } - if (ldlm_flock_deadlock(req, lock)) { - ldlm_flock_destroy(req, mode, *flags); - *err = -EDEADLK; - RETURN(LDLM_ITER_STOP); - } + if (*flags & LDLM_FL_TEST_LOCK) { ldlm_flock_destroy(req, mode, *flags); req->l_req_mode = lock->l_granted_mode; @@ -189,6 +188,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, RETURN(LDLM_ITER_STOP); } + if (ldlm_flock_deadlock(req, lock)) { + ldlm_flock_destroy(req, mode, *flags); + *err = -EDEADLK; + RETURN(LDLM_ITER_STOP); + } + req->l_policy_data.l_flock.blocking_pid = lock->l_policy_data.l_flock.pid; req->l_policy_data.l_flock.blocking_export = @@ -223,29 +228,37 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, break; if (lock->l_granted_mode == mode) { - if (lock->l_policy_data.l_flock.end < - (new->l_policy_data.l_flock.start - 1)) + /* If the modes are the same then we need to process + * locks that overlap OR adjoin the new lock. The extra + * logic condition is necessary to deal with arithmetic + * overflow and underflow. */ + if ((new->l_policy_data.l_flock.start > + (lock->l_policy_data.l_flock.end + 1)) + && (lock->l_policy_data.l_flock.end != ~0)) continue; - if (lock->l_policy_data.l_flock.start > - (new->l_policy_data.l_flock.end + 1)) + if ((new->l_policy_data.l_flock.end < + (lock->l_policy_data.l_flock.start - 1)) + && (lock->l_policy_data.l_flock.start != 0)) break; - if (lock->l_policy_data.l_flock.start > - new->l_policy_data.l_flock.start) + if (new->l_policy_data.l_flock.start < + lock->l_policy_data.l_flock.start) { lock->l_policy_data.l_flock.start = new->l_policy_data.l_flock.start; - else + } else { new->l_policy_data.l_flock.start = lock->l_policy_data.l_flock.start; + } - if (lock->l_policy_data.l_flock.end < - new->l_policy_data.l_flock.end) + if (new->l_policy_data.l_flock.end > + lock->l_policy_data.l_flock.end) { lock->l_policy_data.l_flock.end = new->l_policy_data.l_flock.end; - else + } else { new->l_policy_data.l_flock.end = lock->l_policy_data.l_flock.end; + } if (added) { ldlm_flock_destroy(lock, mode, *flags); @@ -256,11 +269,12 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, continue; } - if (lock->l_policy_data.l_flock.end < - new->l_policy_data.l_flock.start) + if (new->l_policy_data.l_flock.start > + lock->l_policy_data.l_flock.end) continue; - if (lock->l_policy_data.l_flock.start > - new->l_policy_data.l_flock.end) + + if (new->l_policy_data.l_flock.end < + lock->l_policy_data.l_flock.start) break; ++overlaps; @@ -272,17 +286,8 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, lock->l_policy_data.l_flock.start = new->l_policy_data.l_flock.end + 1; break; - } else if (added) { - ldlm_flock_destroy(lock, lock->l_req_mode, - *flags); - } else { - lock->l_policy_data.l_flock.start = - new->l_policy_data.l_flock.start; - lock->l_policy_data.l_flock.end = - new->l_policy_data.l_flock.end; - new = lock; - added = 1; } + ldlm_flock_destroy(lock, lock->l_req_mode, *flags); continue; } if (new->l_policy_data.l_flock.end >= @@ -336,20 +341,19 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, break; } - if (added) { - ldlm_flock_destroy(req, mode, *flags); - } else { - /* insert new after ownlocks */ - new->l_granted_mode = new->l_req_mode; - list_del_init(&new->l_res_link); - ldlm_resource_add_lock(res, ownlocks, new); + /* Add req to the granted queue before calling ldlm_reprocess_all(). */ + if (!added) { + req->l_granted_mode = req->l_req_mode; + list_del_init(&req->l_res_link); + /* insert new lock before ownlocks in list. */ + ldlm_resource_add_lock(res, ownlocks, req); } if (*flags != LDLM_FL_WAIT_NOREPROC) { if (first_enq) { /* The only problem with doing the reprocessing here * is that the completion ASTs for newly granted locks - * will * be sent before the unlock completion is sent. + * will be sent before the unlock completion is sent. * It shouldn't be an issue. Also note that * ldlm_process_flock_lock() will recurse, but only * once because there can't be unlock requests on the @@ -362,26 +366,44 @@ ldlm_process_flock_lock(struct ldlm_lock *req, int *flags, int first_enq, } } + /* In case we're reprocessing the requested lock we can't destroy + * it until after calling ldlm_ast_work_item() above so that lawi() + * can bump the reference count on req. Otherwise req could be freed + * before the completion AST can be sent. */ + if (added) + ldlm_flock_destroy(req, mode, *flags); + ldlm_resource_dump(res); RETURN(LDLM_ITER_CONTINUE); } -static void -interrupted_flock_completion_wait(void *data) -{ -} - -struct flock_wait_data { +struct ldlm_flock_wait_data { struct ldlm_lock *fwd_lock; int fwd_generation; }; +static void +ldlm_flock_interrupted_wait(void *data) +{ + struct ldlm_lock *lock; + struct lustre_handle lockh; + int rc; + ENTRY; + + lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock; + ldlm_lock_decref_internal(lock, lock->l_req_mode); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + CDEBUG(D_DLMTRACE, "ldlm_cli_cancel: %d\n", rc); + EXIT; +} + int ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data) { struct ldlm_namespace *ns; struct file_lock *getlk = lock->l_ast_data; - struct flock_wait_data fwd; + struct ldlm_flock_wait_data fwd; unsigned long irqflags; struct obd_device *obd; struct obd_import *imp = NULL; @@ -422,22 +444,21 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data) spin_unlock_irqrestore(&imp->imp_lock, irqflags); } - lwi = LWI_TIMEOUT_INTR(0,NULL,interrupted_flock_completion_wait,&fwd); + lwi = LWI_TIMEOUT_INTR(0,NULL,ldlm_flock_interrupted_wait,&fwd); /* Go to sleep until the lock is granted. */ rc = l_wait_event(lock->l_waitq, ((lock->l_req_mode == lock->l_granted_mode) || lock->l_destroyed), &lwi); - LASSERT(!(lock->l_destroyed)); - if (rc) { LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)", rc); - /* XXX - need to cancel flock request on server */ RETURN(rc); } + LASSERT(!(lock->l_destroyed)); + granted: LDLM_DEBUG(lock, "client-side enqueue waking up"); -- 1.8.3.1