implement Group Locks for CLIO.
b=18884
r=jay
r=wangdi
*/
CLM_PHANTOM,
CLM_READ,
- CLM_WRITE
+ CLM_WRITE,
+ CLM_GROUP
};
/**
pgoff_t cld_start;
/** Index of the last page (inclusive) protected by this lock. */
pgoff_t cld_end;
+ /** Group ID, valid only for CLM_GROUP locks. */
+ __u64 cld_gid;
/** Lock mode. */
enum cl_lock_mode cld_mode;
};
*
* - glimpse. An io context to acquire glimpse lock.
*
+ * - grouplock. An io context to acquire group lock.
+ *
* CIT_MISC io is used simply as a context in which locks and pages
* are manipulated. Such io has no internal "process", that is,
* cl_io_loop() is never called for it.
struct cl_lockset ci_lockset;
/** lock requirements, this is just a help info for sublayers. */
enum cl_io_lock_dmd ci_lockreq;
+ /**
+ * Set when this io holds a group lock, to tell sublayers not to
+ * use lockless i/o.
+ */
+ int ci_no_srvlock;
union {
struct cl_rd_io {
struct cl_io_rw_common rd;
struct obd_device *watched,
enum obd_notify_event ev, void *owner);
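+/*
+ * State of a held group lock; stored in ll_file_data as fd_grouplock
+ * and released through cl_put_grouplock().
+ */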
+struct ccc_grouplock {
+ struct lu_env *cg_env;
+ struct cl_lock *cg_lock;
+ unsigned long cg_gid;
+};
+
+int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
+ struct ccc_grouplock *cg);
+void cl_put_grouplock(struct ccc_grouplock *cg);
+
#endif /*LCLIENT_H */
#include <obd_class.h>
#include <obd_support.h>
#include <obd.h>
+#include <cl_object.h>
+#include <lclient.h>
#include <lustre_lite.h>
}
RETURN(result);
}
+
+#define GROUPLOCK_SCOPE "grouplock"
+
+int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
+ struct ccc_grouplock *cg)
+{
+ struct lu_env *env;
+ struct cl_io *io;
+ struct cl_lock *lock;
+ struct cl_lock_descr *descr;
+ __u32 enqflags;
+ int refcheck;
+ int rc;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ return PTR_ERR(env);
+
+ io = &ccc_env_info(env)->cti_io;
+ io->ci_obj = obj;
+
+ rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+ if (rc) {
+ LASSERT(rc < 0);
+ cl_env_put(env, &refcheck);
+ return rc;
+ }
+
+ descr = &ccc_env_info(env)->cti_descr;
+ descr->cld_obj = obj;
+ descr->cld_start = 0;
+ descr->cld_end = CL_PAGE_EOF;
+ descr->cld_gid = gid;
+ descr->cld_mode = CLM_GROUP;
+
+ enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0);
+ lock = cl_lock_request(env, io, descr, enqflags,
+ GROUPLOCK_SCOPE, cfs_current());
+ if (IS_ERR(lock)) {
+ cl_io_fini(env, io);
+ cl_env_put(env, &refcheck);
+ return PTR_ERR(lock);
+ }
+
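+ /*
+ * Take an extra reference on the env: the group lock may be
+ * released by a different thread.
+ */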
+ cg->cg_env = cl_env_get(&refcheck);
+ cg->cg_lock = lock;
+ cg->cg_gid = gid;
+ LASSERT(cg->cg_env == env);
+
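+ /* Detach the env from this thread; cl_put_grouplock() re-attaches it. */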
+ cl_env_unplant(env, &refcheck);
+ return 0;
+}
+
+void cl_put_grouplock(struct ccc_grouplock *cg)
+{
+ struct lu_env *env = cg->cg_env;
+ struct cl_lock *lock = cg->cg_lock;
+ int refcheck;
+
+ LASSERT(cg->cg_env);
+ LASSERT(cg->cg_gid);
+
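+ /* Re-attach the env saved by cl_get_grouplock() to this thread. */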
+ cl_env_implant(env, &refcheck);
+ cl_env_put(env, &refcheck);
+
+ cl_unuse(env, lock);
+ cl_lock_release(env, lock, GROUPLOCK_SCOPE, cfs_current());
+ cl_io_fini(env, &ccc_env_info(env)->cti_io);
+ cl_env_put(env, NULL);
+}
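+
+/*
+ * Sketch of the intended calling sequence (error handling elided;
+ * "obj" and "gid" stand for the caller's cl_object and group ID):
+ *
+ *   struct ccc_grouplock cg;
+ *
+ *   if (cl_get_grouplock(obj, gid, 0, &cg) == 0) {
+ *       ... i/o covered by the group lock ...
+ *       cl_put_grouplock(&cg);
+ *   }
+ */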
+
ENTRY;
/* clear group lock, if present */
- if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-#if 0 /* XXX */
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
- rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
- &fd->fd_cwlockh);
-#endif
- }
+ if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
+ ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
/* Let's see if we have good enough OPEN lock on the file and if
we can skip talking to MDS */
io->u.ci_wr.wr_append = file->f_flags & O_APPEND;
io->ci_obj = ll_i2info(inode)->lli_clob;
io->ci_lockreq = CILR_MAYBE;
- if (fd->fd_flags & LL_FILE_IGNORE_LOCK || sbi->ll_flags & LL_SBI_NOLCK)
+ if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+ sbi->ll_flags & LL_SBI_NOLCK) {
io->ci_lockreq = CILR_NEVER;
- else if (file->f_flags & O_APPEND)
+ io->ci_no_srvlock = 1;
+ } else if (file->f_flags & O_APPEND) {
io->ci_lockreq = CILR_MANDATORY;
+ }
}
static ssize_t ll_file_io_generic(const struct lu_env *env,
(void *)arg);
}
-static int ll_get_grouplock(struct inode *inode, struct file *file,
- unsigned long arg)
+int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
- /* XXX */
- return -ENOSYS;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ccc_grouplock grouplock;
+ int rc;
+ ENTRY;
+
+ spin_lock(&lli->lli_lock);
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+ CERROR("group lock already existed with gid %lu\n",
+ fd->fd_grouplock.cg_gid);
+ spin_unlock(&lli->lli_lock);
+ RETURN(-EINVAL);
+ }
+ LASSERT(fd->fd_grouplock.cg_lock == NULL);
+ spin_unlock(&lli->lli_lock);
+
+ rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
+ arg, (file->f_flags & O_NONBLOCK), &grouplock);
+ if (rc)
+ RETURN(rc);
+
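+ /* Re-check: another thread may have raced us while we enqueued. */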
+ spin_lock(&lli->lli_lock);
+ if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+ spin_unlock(&lli->lli_lock);
+ CERROR("another thread just won the race\n");
+ cl_put_grouplock(&grouplock);
+ RETURN(-EINVAL);
+ }
+
+ fd->fd_flags |= (LL_FILE_GROUP_LOCKED | LL_FILE_IGNORE_LOCK);
+ fd->fd_grouplock = grouplock;
+ spin_unlock(&lli->lli_lock);
+
+ CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
+ RETURN(0);
}
-static int ll_put_grouplock(struct inode *inode, struct file *file,
- unsigned long arg)
+int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
{
- /* XXX */
- return -ENOSYS;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+ struct ccc_grouplock grouplock;
+ ENTRY;
+
+ spin_lock(&lli->lli_lock);
+ if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+ spin_unlock(&lli->lli_lock);
+ CERROR("no group lock held\n");
+ RETURN(-EINVAL);
+ }
+ LASSERT(fd->fd_grouplock.cg_lock != NULL);
+
+ if (fd->fd_grouplock.cg_gid != arg) {
+ CERROR("group lock %lu doesn't match current id %lu\n",
+ arg, fd->fd_grouplock.cg_gid);
+ spin_unlock(&lli->lli_lock);
+ RETURN(-EINVAL);
+ }
+
+ grouplock = fd->fd_grouplock;
+ fd->fd_grouplock.cg_env = NULL;
+ fd->fd_grouplock.cg_lock = NULL;
+ fd->fd_grouplock.cg_gid = 0;
+ fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED | LL_FILE_IGNORE_LOCK);
+ spin_unlock(&lli->lli_lock);
+
+ cl_put_grouplock(&grouplock);
+ CDEBUG(D_INFO, "group lock %lu released\n", arg);
+ RETURN(0);
}
#if LUSTRE_FIX >= 50
struct ll_file_data {
struct ll_readahead_state fd_ras;
int fd_omode;
- struct lustre_handle fd_cwlockh;
- unsigned long fd_gid;
+ struct ccc_grouplock fd_grouplock;
struct ll_file_dir fd_dir;
__u32 fd_flags;
struct file *fd_file;
int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
int num_bytes);
int ll_merge_lvb(struct inode *inode);
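+/* Group lock entry points; presumably wired to the LL_IOC_GROUP_LOCK and
+ * LL_IOC_GROUP_UNLOCK ioctls, and called from the file release path. */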
+int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg);
+int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
/* llite/dcache.c */
/* llite/namei.c */
/* mmap lock should be MANDATORY or NEVER. */
if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
- sbi->ll_flags & LL_SBI_NOLCK)
+ sbi->ll_flags & LL_SBI_NOLCK) {
io->ci_lockreq = CILR_NEVER;
- else
+ io->ci_no_srvlock = 1;
+ } else {
io->ci_lockreq = CILR_MANDATORY;
+ }
vio->u.fault.ft_vma = vma;
vio->u.fault.ft_address = address;
sub_io->ci_parent = io;
sub_io->ci_lockreq = io->ci_lockreq;
sub_io->ci_type = io->ci_type;
+ sub_io->ci_no_srvlock = io->ci_no_srvlock;
lov_sub_enter(sub);
result = cl_io_sub_init(sub->sub_env, sub_io,
descr->cld_start = cl_index(descr->cld_obj, start);
descr->cld_end = cl_index(descr->cld_obj, end);
descr->cld_mode = parent->cll_descr.cld_mode;
+ descr->cld_gid = parent->cll_descr.cld_gid;
/* XXX has no effect */
lck->lls_sub[nr].sub_got = *descr;
lck->lls_sub[nr].sub_stripe = stripe;
* while sub-lock is being paged out.
*/
dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+ sublock->cll_descr.cld_mode == CLM_GROUP ||
(sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
sublock->cll_holds == 1;
if (dying)
subd->cld_obj = NULL; /* don't need sub object at all */
subd->cld_mode = descr->cld_mode;
+ subd->cld_gid = descr->cld_gid;
result = lov_stripe_intersects(lsm, stripe, start, end,
&sub_start, &sub_end);
LASSERT(result);
ENTRY;
- if (lov->lls_nr == 1) {
+ if (need->cld_mode == CLM_GROUP)
+ /*
+ * Group locks are matched against the lock's original extent;
+ * cl_lock_ext_match() compares the gid as well.
+ */
+ result = cl_lock_ext_match(&lov->lls_orig, need);
+ else if (lov->lls_nr == 1) {
struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
result = lov_lock_stripe_is_matching(env,
cl2lov(slice->cls_obj),
pd->cld_obj = parent_descr->cld_obj;
pd->cld_mode = parent_descr->cld_mode;
+ pd->cld_gid = parent_descr->cld_gid;
lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
lov->lls_sub[idx].sub_got = *d;
/*
*/
int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
{
- LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
- LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
+ LINVRNT(need == CLM_READ || need == CLM_WRITE ||
+ need == CLM_PHANTOM || need == CLM_GROUP);
+ LINVRNT(has == CLM_READ || has == CLM_WRITE ||
+ has == CLM_PHANTOM || has == CLM_GROUP);
CLASSERT(CLM_PHANTOM < CLM_READ);
CLASSERT(CLM_READ < CLM_WRITE);
+ CLASSERT(CLM_WRITE < CLM_GROUP);
- return need <= has;
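+ /*
+ * A group lock is compatible only with another group lock; the
+ * gid is compared in cl_lock_ext_match().
+ */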
+ if (has != CLM_GROUP)
+ return need <= has;
+ else
+ return need == has;
}
EXPORT_SYMBOL(cl_lock_mode_match);
return
has->cld_start <= need->cld_start &&
has->cld_end >= need->cld_end &&
- cl_lock_mode_match(has->cld_mode, need->cld_mode);
+ cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
+ (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
}
EXPORT_SYMBOL(cl_lock_ext_match);
lu_ref_del(&lock->cll_holders, scope, source);
cl_lock_hold_mod(env, lock, -1);
if (lock->cll_holds == 0) {
- if (lock->cll_descr.cld_mode == CLM_PHANTOM)
+ if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
+ lock->cll_descr.cld_mode == CLM_GROUP)
/*
- * If lock is still phantom when user is done with
- * it---destroy the lock.
+ * If the lock is still a phantom or group lock when the
+ * user is done with it, destroy the lock.
*/
lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
if (lock->cll_flags & CLF_CANCELPEND) {
static const char *names[] = {
[CLM_PHANTOM] = "PHANTOM",
[CLM_READ] = "READ",
- [CLM_WRITE] = "WRITE"
+ [CLM_WRITE] = "WRITE",
+ [CLM_GROUP] = "GROUP"
};
if (0 <= mode && mode < ARRAY_SIZE(names))
return names[mode];
static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
{
- LASSERT(mode == CLM_READ || mode == CLM_WRITE);
- return mode == CLM_READ ? LCK_PR : LCK_PW;
+ LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
+ if (mode == CLM_READ)
+ return LCK_PR;
+ else if (mode == CLM_WRITE)
+ return LCK_PW;
+ else
+ return LCK_GROUP;
}
static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
{
- LASSERT(mode == LCK_PR || mode == LCK_PW);
- return mode == LCK_PR ? CLM_READ : CLM_WRITE;
+ LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
+ if (mode == LCK_PR)
+ return CLM_READ;
+ else if (mode == LCK_PW)
+ return CLM_WRITE;
+ else
+ return CLM_GROUP;
}
static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
return cl2osc_lock(cl_lock_at(lock, &osc_device_type));
}
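+/*
+ * Server-side (lockless) locking is used only when the io is lockless
+ * and does not forbid it via ci_no_srvlock (set while a group lock
+ * is held).
+ */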
+static inline int osc_io_srvlock(struct osc_io *oio)
+{
+ return (oio->oi_lockless && !oio->oi_cl.cis_io->ci_no_srvlock);
+}
+
/** @} osc */
#endif /* OSC_CL_INTERNAL_H */
const struct cl_lock_descr *d = &lock->cll_descr;
osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
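+ /* Propagate the group ID into the ldlm extent policy. */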
+ policy->l_extent.gid = d->cld_gid;
}
static int osc_enq2ldlm_flags(__u32 enqflags)
descr->cld_mode = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
descr->cld_start = cl_index(descr->cld_obj, ext->start);
descr->cld_end = cl_index(descr->cld_obj, ext->end);
+ descr->cld_gid = ext->gid;
/*
* tell upper layers the extent of the lock that was actually
* granted
continue;
/* overlapped and living locks. */
+
+ /* Never give up a held group lock; skip it. */
+ if (scan->cll_descr.cld_mode == CLM_GROUP) {
+ LASSERT(descr->cld_mode != CLM_GROUP ||
+ descr->cld_gid != scan->cll_descr.cld_gid);
+ continue;
+ }
+
/* A tricky case for lockless pages:
* We need to cancel the compatible locks if we're enqueuing
* a lockless lock, for example:
ENTRY;
/* Set the OBD_BRW_SRVLOCK before the page is queued. */
- brw_flags = oio->oi_lockless ? OBD_BRW_SRVLOCK : 0;
+ brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
if (!client_is_remote(osc_export(obj)) &&
cfs_capable(CFS_CAP_SYS_RESOURCE)) {
brw_flags |= OBD_BRW_NOQUOTA;
oap->oap_page_off = opg->ops_from;
oap->oap_count = opg->ops_to - opg->ops_from;
oap->oap_brw_flags |= OBD_BRW_SYNC;
- if (oio->oi_lockless)
+ if (osc_io_srvlock(oio))
oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;