From b0bb8f62db68627794b9c4c038cd28713deae27f Mon Sep 17 00:00:00 2001 From: green Date: Tue, 28 Jun 2005 13:47:17 +0000 Subject: [PATCH] Branch: b1_4 b=6935 r=adilger Migrate grouplocks support from b_cray to b1_4 --- lustre/include/linux/lustre_dlm.h | 9 +- lustre/include/linux/lustre_idl.h | 5 +- lustre/include/lustre/lustre_user.h | 1 + lustre/ldlm/ldlm_extent.c | 166 +++++++++++++++++++++++++++++++----- lustre/ldlm/ldlm_internal.h | 4 + lustre/ldlm/ldlm_lock.c | 12 ++- lustre/ldlm/ldlm_lockd.c | 7 +- lustre/ldlm/ldlm_request.c | 12 ++- lustre/ldlm/ldlm_resource.c | 23 +++++ lustre/liblustre/file.c | 8 ++ lustre/liblustre/super.c | 4 - lustre/llite/file.c | 81 +++++++++++++++++- lustre/llite/llite_internal.h | 2 + lustre/llite/rw.c | 11 ++- lustre/lov/lov_obd.c | 6 +- lustre/lov/lov_request.c | 4 + lustre/osc/osc_request.c | 5 +- lustre/ptlrpc/pack_generic.c | 2 +- lustre/utils/wiretest.c | 2 +- 19 files changed, 315 insertions(+), 49 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index b612d93..fcc9a74 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -68,6 +68,9 @@ typedef enum { #define LDLM_FL_WARN 0x008000 /* see ldlm_cli_cancel_unused */ #define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */ +#define LDLM_FL_NO_TIMEOUT 0x020000 /* Blocked by CW lock - wait + indefinitely */ + /* file & record locking */ #define LDLM_FL_BLOCK_NOWAIT 0x040000 // server told not to wait if blocked #define LDLM_FL_TEST_LOCK 0x080000 // return blocking lock @@ -110,6 +113,7 @@ typedef enum { #define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW) #define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW) #define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX) +#define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL) static ldlm_mode_t lck_compat_array[] = { [LCK_EX] LCK_COMPAT_EX, @@ -117,12 +121,13 @@ static ldlm_mode_t lck_compat_array[] = { [LCK_PR] LCK_COMPAT_PR, [LCK_CW] LCK_COMPAT_CW, [LCK_CR] LCK_COMPAT_CR, - [LCK_NL] LCK_COMPAT_NL + [LCK_NL] LCK_COMPAT_NL, + [LCK_GROUP] LCK_COMPAT_GROUP }; static inline void lockmode_verify(ldlm_mode_t mode) { - LASSERT(mode >= LCK_EX && mode <= LCK_NL); + LASSERT(mode > LCK_MINMODE && mode < LCK_MAXMODE); } static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index d80b7e6..4467897 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -786,13 +786,15 @@ extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id); /* lock types */ typedef enum { + LCK_MINMODE = 0, LCK_EX = 1, LCK_PW = 2, LCK_PR = 4, LCK_CW = 8, LCK_CR = 16, LCK_NL = 32, -// LCK_GROUP = 64, + LCK_GROUP = 64, + LCK_MAXMODE } ldlm_mode_t; typedef enum { @@ -808,6 +810,7 @@ typedef enum { struct ldlm_extent { __u64 start; __u64 end; + __u64 gid; }; struct ldlm_flock { diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 28526dd..9c4b9e5 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -58,6 +58,7 @@ #define LL_FILE_IGNORE_LOCK 0x00000001 #define LL_FILE_GROUP_LOCKED 0x00000002 +#define LL_FILE_READAHEAD 0x00000004 #define LOV_USER_MAGIC_V1 0x0BD10BD0 #define LOV_USER_MAGIC LOV_USER_MAGIC_V1 diff --git a/lustre/ldlm/ldlm_extent.c b/lustre/ldlm/ldlm_extent.c index 1d87959..0cd1240 100644 --- a/lustre/ldlm/ldlm_extent.c +++ b/lustre/ldlm/ldlm_extent.c @@ -147,10 +147,15 @@ static void ldlm_extent_policy(struct ldlm_resource *res, /* Determine if the lock is compatible with all locks on the queue. * We stop walking the queue if we hit ourselves so we don't take - * conflicting locks enqueued after us into accound, or we'd wait forever. */ + * conflicting locks enqueued after us into accound, or we'd wait forever. + * 0 if the lock is not compatible + * 1 if the lock is compatible + * 2 if this group lock is compatible and requires no further checking + * negative error, such as EWOULDBLOCK for group locks + */ static int ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, - int send_cbs) + int send_cbs, int *flags, ldlm_error_t *err) { struct list_head *tmp; struct ldlm_lock *lock; @@ -158,6 +163,7 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, __u64 req_start = req->l_req_extent.start; __u64 req_end = req->l_req_extent.end; int compat = 1; + int scan = 0; ENTRY; lockmode_verify(req_mode); @@ -168,14 +174,111 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, if (req == lock) RETURN(compat); + if (unlikely(scan)) { + /* We only get here if we are queuing GROUP lock + and met some incompatible one. The main idea of this + code is to insert GROUP lock past compatible GROUP + lock in the waiting queue or if there is not any, + then in front of first non-GROUP lock */ + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + RETURN(0); + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, + req); + RETURN(0); + } + continue; + } + /* locks are compatible, overlap doesn't matter */ - if (lockmode_compat(lock->l_req_mode, req_mode)) + if (lockmode_compat(lock->l_req_mode, req_mode)) { + /* non-group locks are compatible, overlap doesn't + matter */ + if (likely(req_mode != LCK_GROUP)) + continue; + + /* If we are trying to get a GROUP lock and there is + another one of this kind, we need to compare gid */ + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* If existing lock with matched gid is granted, + we grant new one too. */ + if (lock->l_req_mode == lock->l_granted_mode) + RETURN(2); + + /* Otherwise we are scanning queue of waiting + * locks and it means current request would + * block along with existing lock (that is + * already blocked. + * If we are in nonblocking mode - return + * immediately */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } + /* If this group lock is compatible with another + * group lock on the waiting list, they must be + * together in the list, so they can be granted + * at the same time. Otherwise the later lock + * can get stuck behind another, incompatible, + * lock. */ + ldlm_resource_insert_lock_after(lock, req); + /* Because 'lock' is not granted, we can stop + * processing this queue and return immediately. + * There is no need to check the rest of the + * list. */ + RETURN(0); + } + } + + if (unlikely(req_mode == LCK_GROUP && + (lock->l_req_mode != lock->l_granted_mode))) { + scan = 1; + compat = 0; + if (lock->l_req_mode != LCK_GROUP) { + /* Ok, we hit non-GROUP lock, there should be no + more GROUP locks later on, queue in front of + first non-GROUP lock */ + + ldlm_resource_insert_lock_after(lock, req); + list_del_init(&lock->l_res_link); + ldlm_resource_insert_lock_after(req, lock); + RETURN(0); + } + if (req->l_policy_data.l_extent.gid == + lock->l_policy_data.l_extent.gid) { + /* found it */ + ldlm_resource_insert_lock_after(lock, req); + RETURN(0); + } continue; + } - /* if lock doesn't overlap skip it */ - if (lock->l_policy_data.l_extent.end < req_start || - lock->l_policy_data.l_extent.start > req_end) + if (unlikely(lock->l_req_mode == LCK_GROUP)) { + /* If compared lock is GROUP,then requested is PR/PW/ => + * this is not compatible; extent range does not + * matter */ + if (*flags & LDLM_FL_BLOCK_NOWAIT) { + compat = -EWOULDBLOCK; + goto destroylock; + } else { + *flags |= LDLM_FL_NO_TIMEOUT; + } + } else if (lock->l_policy_data.l_extent.end < req_start || + lock->l_policy_data.l_extent.start > req_end) { + /* if a non group lock doesn't overlap skip it */ continue; + } if (!send_cbs) RETURN(0); @@ -185,6 +288,11 @@ ldlm_extent_compat_queue(struct list_head *queue, struct ldlm_lock *req, ldlm_add_ast_work_item(lock, req, NULL, 0); } + return(compat); +destroylock: + list_del_init(&req->l_res_link); + ldlm_lock_destroy(req); + *err = compat; RETURN(compat); } @@ -202,18 +310,21 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, { struct ldlm_resource *res = lock->l_resource; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); - int rc; + int rc, rc2; ENTRY; LASSERT(list_empty(&res->lr_converting)); + *err = ELDLM_OK; if (!first_enq) { LASSERT(res->lr_tmp != NULL); - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0); - if (!rc) - RETURN(LDLM_ITER_STOP); - rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0); - if (!rc) + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 0, flags, + err); + if (rc == 1) { + rc = ldlm_extent_compat_queue(&res->lr_waiting, lock, 0, + flags, err); + } + if (rc == 0) RETURN(LDLM_ITER_STOP); ldlm_resource_unlink_lock(lock); @@ -226,12 +337,26 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, restart: LASSERT(res->lr_tmp == NULL); res->lr_tmp = &rpc_list; - rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1); - rc += ldlm_extent_compat_queue(&res->lr_waiting, lock, 1); + rc = ldlm_extent_compat_queue(&res->lr_granted, lock, 1, flags, err); + if (rc < 0) + GOTO(out, rc); /* lock was destroyed */ + if (rc == 2) { + res->lr_tmp = NULL; + goto grant; + } + + rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, 1, flags, err); + if (rc2 < 0) + GOTO(out, rc = rc2); /* lock was destroyed */ res->lr_tmp = NULL; - if (rc != 2) { - /* If either of the compat_queue()s returned 0, then we + if (rc + rc2 == 2) { + grant: + ldlm_extent_policy(res, lock, flags); + ldlm_resource_unlink_lock(lock); + ldlm_grant_lock(lock, NULL, 0, 0); + } else { + /* If either of the compat_queue()s returned failure, then we * have ASTs to send and must go onto the waiting list. * * bug 2322: we used to unlink and re-add here, which was a @@ -245,12 +370,11 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq, if (rc == -ERESTART) GOTO(restart, -ERESTART); *flags |= LDLM_FL_BLOCK_GRANTED; - } else { - ldlm_extent_policy(res, lock, flags); - ldlm_resource_unlink_lock(lock); - ldlm_grant_lock(lock, NULL, 0, 0); } - RETURN(0); + rc = 0; +out: + res->lr_tmp = NULL; + RETURN(rc); } /* When a lock is cancelled by a client, the KMS may undergo change if this diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index ccaafdc..2967ab8 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -10,6 +10,10 @@ typedef enum { int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); +/* ldlm_resource.c */ +void ldlm_resource_insert_lock_after(struct ldlm_lock *original, + struct ldlm_lock *new); + /* ldlm_lock.c */ void ldlm_grant_lock(struct ldlm_lock *lock, void *data, int datalen, int run_ast); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 8f73b2a..2cee31a 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -45,7 +45,8 @@ char *ldlm_lockname[] = { [LCK_PR] "PR", [LCK_CW] "CW", [LCK_CR] "CR", - [LCK_NL] "NL" + [LCK_NL] "NL", + [LCK_GROUP] "GROUP" }; char *ldlm_typename[] = { [LDLM_PLAIN] "PLN", @@ -437,7 +438,7 @@ void ldlm_lock_addref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); if (mode & (LCK_NL | LCK_CR | LCK_PR)) lock->l_readers++; - if (mode & (LCK_EX | LCK_CW | LCK_PW)) + if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) lock->l_writers++; lock->l_last_used = jiffies; LDLM_LOCK_GET(lock); @@ -457,7 +458,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) LASSERT(lock->l_readers > 0); lock->l_readers--; } - if (mode & (LCK_EX | LCK_CW | LCK_PW)) { + if (mode & (LCK_EX | LCK_CW | LCK_PW | LCK_GROUP)) { LASSERT(lock->l_writers > 0); lock->l_writers--; } @@ -592,6 +593,11 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, lock->l_policy_data.l_extent.end < policy->l_extent.end)) continue; + if (lock->l_resource->lr_type == LDLM_EXTENT && + mode == LCK_GROUP && + lock->l_policy_data.l_extent.gid != policy->l_extent.gid) + continue; + if (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED)) continue; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 0738c73..8effb0c 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -196,7 +196,8 @@ static void waiting_locks_callback(unsigned long unused) lock = list_entry(waiting_locks_list.next, struct ldlm_lock, l_pending_chain); - if (time_after(lock->l_callback_timeout, jiffies)) + if (time_after(lock->l_callback_timeout, jiffies) || + (lock->l_req_mode == LCK_GROUP)) break; LDLM_ERROR(lock, "lock callback timer expired: evicting client " @@ -672,8 +673,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req, GOTO(out, rc = -EFAULT); } - if (dlm_req->lock_desc.l_req_mode < LCK_EX || - dlm_req->lock_desc.l_req_mode > LCK_NL || + if (dlm_req->lock_desc.l_req_mode <= LCK_MINMODE || + dlm_req->lock_desc.l_req_mode >= LCK_MAXMODE || dlm_req->lock_desc.l_req_mode & (dlm_req->lock_desc.l_req_mode-1)) { DEBUG_REQ(D_ERROR, req, "invalid lock request mode %d\n", dlm_req->lock_desc.l_req_mode); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 83055cb..d15a258 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -110,8 +110,16 @@ noreproc: lwd.lwd_lock = lock; - lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait, - interrupted_completion_wait, &lwd); + if (unlikely(flags & LDLM_FL_NO_TIMEOUT)) { + LDLM_DEBUG(lock, "waiting indefinitely because CW lock was" + " met\n"); + lwi = LWI_INTR(interrupted_completion_wait, &lwd); + } else { + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, + ldlm_expired_completion_wait, + interrupted_completion_wait, &lwd); + } + if (imp != NULL) { spin_lock_irqsave(&imp->imp_lock, irqflags); lwd.lwd_generation = imp->imp_generation; diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index 2738587..9a5922b 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -650,6 +650,29 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head, l_unlock(&res->lr_namespace->ns_lock); } +void ldlm_resource_insert_lock_after(struct ldlm_lock *original, + struct ldlm_lock *new) +{ + struct ldlm_resource *res = original->l_resource; + + l_lock(&res->lr_namespace->ns_lock); + + ldlm_resource_dump(D_OTHER, res); + CDEBUG(D_OTHER, "About to insert this lock after %p:\n", original); + ldlm_lock_dump(D_OTHER, new, 0); + + if (new->l_destroyed) { + CDEBUG(D_OTHER, "Lock destroyed, not adding to resource\n"); + goto out; + } + + LASSERT(list_empty(&new->l_res_link)); + + list_add(&new->l_res_link, &original->l_res_link); + out: + l_unlock(&res->lr_namespace->ns_lock); +} + void ldlm_resource_unlink_lock(struct ldlm_lock *lock) { l_lock(&lock->l_resource->lr_namespace->ns_lock); diff --git a/lustre/liblustre/file.c b/lustre/liblustre/file.c index 4df3b65..b7f4b55 100644 --- a/lustre/liblustre/file.c +++ b/lustre/liblustre/file.c @@ -285,6 +285,14 @@ int llu_mdc_close(struct obd_export *mdc_exp, struct inode *inode) int rc, valid; ENTRY; + /* clear group lock, if present */ + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) { + struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd; + fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK); + rc = llu_extent_unlock(fd, inode, lsm, LCK_GROUP, + &fd->fd_cwlockh); + } + obdo.o_id = st->st_ino; obdo.o_valid = OBD_MD_FLID; valid = OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLSIZE |OBD_MD_FLBLOCKS | diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 51a35f2..5dcd3b8 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1448,7 +1448,6 @@ static int llu_iop_fcntl(struct inode *ino, int cmd, va_list ap, int *rtn) return ENOSYS; } -#if 0 static int llu_get_grouplock(struct inode *inode, unsigned long arg) { struct llu_inode_info *lli = llu_i2info(inode); @@ -1506,7 +1505,6 @@ static int llu_put_grouplock(struct inode *inode, unsigned long arg) RETURN(0); } -#endif static int llu_iop_ioctl(struct inode *ino, unsigned long int request, va_list ap) @@ -1515,7 +1513,6 @@ static int llu_iop_ioctl(struct inode *ino, unsigned long int request, liblustre_wait_event(0); switch (request) { -#if 0 unsigned long arg; case LL_IOC_GROUP_LOCK: arg = va_arg(ap, unsigned long); @@ -1523,7 +1520,6 @@ static int llu_iop_ioctl(struct inode *ino, unsigned long int request, case LL_IOC_GROUP_UNLOCK: arg = va_arg(ap, unsigned long); return llu_put_grouplock(ino, arg); -#endif } CERROR("did not support ioctl cmd %lx\n", request); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 0e72f5a..856079f 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -42,6 +42,14 @@ int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode, int rc; ENTRY; + /* clear group lock, if present */ + if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) { + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; + fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK); + rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, + &fd->fd_cwlockh); + } + obdo.o_id = inode->i_ino; obdo.o_valid = OBD_MD_FLID; obdo_from_inode(&obdo, inode, OBD_MD_FLTYPE | OBD_MD_FLMODE | @@ -1087,6 +1095,66 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg) (void *)arg); } +static int ll_get_grouplock(struct inode *inode, struct file *file, + unsigned long arg) +{ + struct ll_file_data *fd = file->private_data; + ldlm_policy_data_t policy = { .l_extent = { .start = 0, + .end = OBD_OBJECT_EOF}}; + struct lustre_handle lockh = { 0 }; + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + int flags = 0, rc; + ENTRY; + + if (fd->fd_flags & LL_FILE_GROUP_LOCKED) { + RETURN(-EINVAL); + } + + policy.l_extent.gid = arg; + if (file->f_flags & O_NONBLOCK) + flags = LDLM_FL_BLOCK_NOWAIT; + + rc = ll_extent_lock(fd, inode, lsm, LCK_GROUP, &policy, &lockh, flags); + if (rc) + RETURN(rc); + + fd->fd_flags |= LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK; + fd->fd_gid = arg; + memcpy(&fd->fd_cwlockh, &lockh, sizeof(lockh)); + + RETURN(0); +} + +static int ll_put_grouplock(struct inode *inode, struct file *file, + unsigned long arg) +{ + struct ll_file_data *fd = file->private_data; + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + int rc; + ENTRY; + + if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) { + /* Ugh, it's already unlocked. */ + RETURN(-EINVAL); + } + + if (fd->fd_gid != arg) /* Ugh? Unlocking with different gid? */ + RETURN(-EINVAL); + + fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK); + + rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP, &fd->fd_cwlockh); + if (rc) + RETURN(rc); + + fd->fd_gid = 0; + memset(&fd->fd_cwlockh, 0, sizeof(fd->fd_cwlockh)); + + RETURN(0); +} + int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, unsigned long arg) { @@ -1134,6 +1202,11 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd, case EXT3_IOC_GETVERSION_OLD: case EXT3_IOC_GETVERSION: RETURN(put_user(inode->i_generation, (int *) arg)); + case LL_IOC_GROUP_LOCK: + RETURN(ll_get_grouplock(inode, file, arg)); + case LL_IOC_GROUP_UNLOCK: + RETURN(ll_put_grouplock(inode, file, arg)); + /* We need to special case any other ioctls we want to handle, * to send them to the MDS/OST as appropriate and to properly * network encode the arg field. @@ -1164,9 +1237,13 @@ loff_t ll_file_seek(struct file *file, loff_t offset, int origin) if (origin == 2) { /* SEEK_END */ ldlm_policy_data_t policy = { .l_extent = {0, OBD_OBJECT_EOF }}; struct ll_inode_info *lli = ll_i2info(inode); - int rc; + int nonblock = 0, rc; + + if (file->f_flags & O_NONBLOCK) + nonblock = LDLM_FL_BLOCK_NOWAIT; - rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh,0); + rc = ll_extent_lock(fd, inode, lsm, LCK_PR, &policy, &lockh, + nonblock); if (rc != 0) RETURN(rc); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 829bd02..9e91555 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -163,6 +163,8 @@ struct ll_file_data { struct obd_client_handle fd_mds_och; struct ll_readahead_state fd_ras; __u32 fd_flags; + struct lustre_handle fd_cwlockh; + unsigned long fd_gid; }; struct lov_stripe_md; diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index f912850..fd8781f 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -878,7 +878,7 @@ void ll_removepage(struct page *page) EXIT; } -static int ll_page_matches(struct page *page, int readahead) +static int ll_page_matches(struct page *page, int fd_flags) { struct lustre_handle match_lockh = {0}; struct inode *inode = page->mapping->host; @@ -886,11 +886,14 @@ static int ll_page_matches(struct page *page, int readahead) int flags, matches; ENTRY; + if (unlikely(fd_flags & LL_FILE_GROUP_LOCKED)) + RETURN(1); + page_extent.l_extent.start = (__u64)page->index << PAGE_CACHE_SHIFT; page_extent.l_extent.end = page_extent.l_extent.start + PAGE_CACHE_SIZE - 1; flags = LDLM_FL_TEST_LOCK; - if (!readahead) + if (!(fd_flags&LL_FILE_READAHEAD)) flags |= LDLM_FL_CBPENDING | LDLM_FL_BLOCK_GRANTED; matches = obd_match(ll_i2sbi(inode)->ll_osc_exp, ll_i2info(inode)->lli_smd, LDLM_EXTENT, @@ -1032,7 +1035,7 @@ static int ll_readahead(struct ll_readahead_state *ras, goto next_page; /* bail when we hit the end of the lock. */ - if ((rc = ll_page_matches(page, 1)) <= 0) { + if ((rc = ll_page_matches(page, flags|LL_FILE_READAHEAD)) <= 0) { LL_CDEBUG_PAGE(D_READA | D_PAGE, page, "lock match failed: rc %d\n", rc); ll_ra_stats_inc(mapping, RA_STAT_FAILED_MATCH); @@ -1267,7 +1270,7 @@ int ll_readpage(struct file *filp, struct page *page) GOTO(out_oig, rc = 0); } - rc = ll_page_matches(page, 0); + rc = ll_page_matches(page, fd->fd_flags); if (rc < 0) { LL_CDEBUG_PAGE(D_ERROR, page, "lock match failed: rc %d\n", rc); GOTO(out, rc); diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index dd4dd30..5e28f4b 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1560,8 +1560,7 @@ static int lov_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, LASSERT(lov_lockhp); *flags = save_flags; - sub_policy.l_extent.start = req->rq_extent.start; - sub_policy.l_extent.end = req->rq_extent.end; + sub_policy.l_extent = req->rq_extent; rc = obd_enqueue(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, type, &sub_policy, mode, flags, bl_cb, @@ -1604,9 +1603,8 @@ static int lov_match(struct obd_export *exp, struct lov_stripe_md *lsm, lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; LASSERT(lov_lockhp); - sub_policy.l_extent.start = req->rq_extent.start; - sub_policy.l_extent.end = req->rq_extent.end; lov_flags = *flags; + sub_policy.l_extent = req->rq_extent; rc = obd_match(lov->tgts[req->rq_idx].ltd_exp, req->rq_md, type, &sub_policy, mode, &lov_flags, data, diff --git a/lustre/lov/lov_request.c b/lustre/lov/lov_request.c index 1ac7190..4477213 100644 --- a/lustre/lov/lov_request.c +++ b/lustre/lov/lov_request.c @@ -277,6 +277,7 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct lov_stripe_md *lsm, req->rq_extent.start = start; req->rq_extent.end = end; + req->rq_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; @@ -384,6 +385,7 @@ int lov_prep_match_set(struct obd_export *exp, struct lov_stripe_md *lsm, req->rq_extent.start = start; req->rq_extent.end = end; + req->rq_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; @@ -1203,6 +1205,7 @@ int lov_prep_punch_set(struct obd_export *exp, struct obdo *src_oa, req->rq_extent.start = rs; req->rq_extent.end = re; + req->rq_extent.gid = -1; lov_set_add_req(req, set); } @@ -1281,6 +1284,7 @@ int lov_prep_sync_set(struct obd_export *exp, struct obdo *src_oa, req->rq_extent.start = rs; req->rq_extent.end = re; + req->rq_extent.gid = -1; lov_set_add_req(req, set); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 993c2c5..7bdf0de 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2737,7 +2737,10 @@ static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md, { ENTRY; - ldlm_lock_decref(lockh, mode); + if (mode == LCK_GROUP) + ldlm_lock_decref_and_cancel(lockh, mode); + else + ldlm_lock_decref(lockh, mode); RETURN(0); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index a20932f..03bbec0 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1847,7 +1847,7 @@ void lustre_assert_wire_constants(void) (long long)(int)sizeof(((struct ldlm_res_id *)0)->name[4])); /* Checks for struct ldlm_extent */ - LASSERTF((int)sizeof(struct ldlm_extent) == 16, " found %lld\n", + LASSERTF((int)sizeof(struct ldlm_extent) == 24, " found %lld\n", (long long)(int)sizeof(struct ldlm_extent)); LASSERTF((int)offsetof(struct ldlm_extent, start) == 0, " found %lld\n", (long long)(int)offsetof(struct ldlm_extent, start)); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 454a585..f802c1a 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -1108,7 +1108,7 @@ void lustre_assert_wire_constants(void) (long long)(int)sizeof(((struct ldlm_res_id *)0)->name[4])); /* Checks for struct ldlm_extent */ - LASSERTF((int)sizeof(struct ldlm_extent) == 16, " found %lld\n", + LASSERTF((int)sizeof(struct ldlm_extent) == 24, " found %lld\n", (long long)(int)sizeof(struct ldlm_extent)); LASSERTF((int)offsetof(struct ldlm_extent, start) == 0, " found %lld\n", (long long)(int)offsetof(struct ldlm_extent, start)); -- 1.8.3.1