Whamcloud - gitweb
branch: HEAD
authorericm <ericm>
Wed, 13 May 2009 20:10:42 +0000 (20:10 +0000)
committerericm <ericm>
Wed, 13 May 2009 20:10:42 +0000 (20:10 +0000)
implement Group Locks for CLIO.
b=18884
r=jay
r=wangdi

13 files changed:
lustre/include/cl_object.h
lustre/include/lclient.h
lustre/lclient/lcommon_misc.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_mmap.c
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lovsub_lock.c
lustre/obdclass/cl_lock.c
lustre/osc/osc_cl_internal.h
lustre/osc/osc_lock.c
lustre/osc/osc_page.c

index c6c764e..fa8c613 100644 (file)
@@ -782,7 +782,8 @@ enum cl_lock_mode {
          */
         CLM_PHANTOM,
         CLM_READ,
          */
         CLM_PHANTOM,
         CLM_READ,
-        CLM_WRITE
+        CLM_WRITE,
+        CLM_GROUP
 };
 
 /**
 };
 
 /**
@@ -1291,6 +1292,8 @@ struct cl_lock_descr {
         pgoff_t           cld_start;
         /** Index of the last page (inclusive) protected by this lock. */
         pgoff_t           cld_end;
         pgoff_t           cld_start;
         /** Index of the last page (inclusive) protected by this lock. */
         pgoff_t           cld_end;
+        /** Group ID, for group lock */
+        __u64             cld_gid;
         /** Lock mode. */
         enum cl_lock_mode cld_mode;
 };
         /** Lock mode. */
         enum cl_lock_mode cld_mode;
 };
@@ -1884,6 +1887,8 @@ enum cl_io_type {
          *
          *     - glimpse. An io context to acquire glimpse lock.
          *
          *
          *     - glimpse. An io context to acquire glimpse lock.
          *
+         *     - grouplock. An io context to acquire group lock.
+         *
          * CIT_MISC io is used simply as a context in which locks and pages
          * are manipulated. Such io has no internal "process", that is,
          * cl_io_loop() is never called for it.
          * CIT_MISC io is used simply as a context in which locks and pages
          * are manipulated. Such io has no internal "process", that is,
          * cl_io_loop() is never called for it.
@@ -2233,6 +2238,11 @@ struct cl_io {
         struct cl_lockset              ci_lockset;
         /** lock requirements, this is just a help info for sublayers. */
         enum cl_io_lock_dmd            ci_lockreq;
         struct cl_lockset              ci_lockset;
         /** lock requirements, this is just a help info for sublayers. */
         enum cl_io_lock_dmd            ci_lockreq;
+        /**
+         * This io has held grouplock, to inform sublayers that
+         * don't do lockless i/o.
+         */
+        int                            ci_no_srvlock;
         union {
                 struct cl_rd_io {
                         struct cl_io_rw_common rd;
         union {
                 struct cl_rd_io {
                         struct cl_io_rw_common rd;
index c0c6cd0..e1e18f7 100644 (file)
@@ -381,4 +381,14 @@ int cl_ocd_update(struct obd_device *host,
                   struct obd_device *watched,
                   enum obd_notify_event ev, void *owner);
 
                   struct obd_device *watched,
                   enum obd_notify_event ev, void *owner);
 
+struct ccc_grouplock {
+        struct lu_env   *cg_env;
+        struct cl_lock  *cg_lock;
+        unsigned long    cg_gid;
+};
+
+int  cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
+                      struct ccc_grouplock *cg);
+void cl_put_grouplock(struct ccc_grouplock *cg);
+
 #endif /*LCLIENT_H */
 #endif /*LCLIENT_H */
index aed40e5..62a1ca5 100644 (file)
@@ -40,6 +40,8 @@
 #include <obd_class.h>
 #include <obd_support.h>
 #include <obd.h>
 #include <obd_class.h>
 #include <obd_support.h>
 #include <obd.h>
+#include <cl_object.h>
+#include <lclient.h>
 
 #include <lustre_lite.h>
 
 
 #include <lustre_lite.h>
 
@@ -116,3 +118,74 @@ int cl_ocd_update(struct obd_device *host,
         }
         RETURN(result);
 }
         }
         RETURN(result);
 }
+
+#define GROUPLOCK_SCOPE "grouplock"
+
+int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
+                     struct ccc_grouplock *cg)
+{
+        struct lu_env          *env;
+        struct cl_io           *io;
+        struct cl_lock         *lock;
+        struct cl_lock_descr   *descr;
+        __u32                   enqflags;
+        int                     refcheck;
+        int                     rc;
+
+        env = cl_env_get(&refcheck);
+        if (IS_ERR(env))
+                return PTR_ERR(env);
+
+        io = &ccc_env_info(env)->cti_io;
+        io->ci_obj = obj;
+
+        rc = cl_io_init(env, io, CIT_MISC, io->ci_obj);
+        if (rc) {
+                LASSERT(rc < 0);
+                cl_env_put(env, &refcheck);
+                return rc;
+        }
+
+        descr = &ccc_env_info(env)->cti_descr;
+        descr->cld_obj = obj;
+        descr->cld_start = 0;
+        descr->cld_end = CL_PAGE_EOF;
+        descr->cld_gid = gid;
+        descr->cld_mode = CLM_GROUP;
+
+        enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0);
+        lock = cl_lock_request(env, io, descr, enqflags,
+                               GROUPLOCK_SCOPE, cfs_current());
+        if (IS_ERR(lock)) {
+                cl_io_fini(env, io);
+                cl_env_put(env, &refcheck);
+                return PTR_ERR(lock);
+        }
+
+        cg->cg_env = cl_env_get(&refcheck);
+        cg->cg_lock = lock;
+        cg->cg_gid = gid;
+        LASSERT(cg->cg_env == env);
+
+        cl_env_unplant(env, &refcheck);
+        return 0;
+}
+
+void cl_put_grouplock(struct ccc_grouplock *cg)
+{
+        struct lu_env          *env = cg->cg_env;
+        struct cl_lock         *lock = cg->cg_lock;
+        int                     refcheck;
+
+        LASSERT(cg->cg_env);
+        LASSERT(cg->cg_gid);
+
+        cl_env_implant(env, &refcheck);
+        cl_env_put(env, &refcheck);
+
+        cl_unuse(env, lock);
+        cl_lock_release(env, lock, GROUPLOCK_SCOPE, cfs_current());
+        cl_io_fini(env, &ccc_env_info(env)->cti_io);
+        cl_env_put(env, NULL);
+}
+
index 17990d8..beaf065 100644 (file)
@@ -232,14 +232,8 @@ int ll_md_close(struct obd_export *md_exp, struct inode *inode,
         ENTRY;
 
         /* clear group lock, if present */
         ENTRY;
 
         /* clear group lock, if present */
-        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
-#if 0 /* XXX */
-                struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-                fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED|LL_FILE_IGNORE_LOCK);
-                rc = ll_extent_unlock(fd, inode, lsm, LCK_GROUP,
-                                      &fd->fd_cwlockh);
-#endif
-        }
+        if (unlikely(fd->fd_flags & LL_FILE_GROUP_LOCKED))
+                ll_put_grouplock(inode, file, fd->fd_grouplock.cg_gid);
 
         /* Let's see if we have good enough OPEN lock on the file and if
            we can skip talking to MDS */
 
         /* Let's see if we have good enough OPEN lock on the file and if
            we can skip talking to MDS */
@@ -806,10 +800,13 @@ void ll_io_init(struct cl_io *io, const struct file *file, int write)
                 io->u.ci_wr.wr_append = file->f_flags & O_APPEND;
         io->ci_obj     = ll_i2info(inode)->lli_clob;
         io->ci_lockreq = CILR_MAYBE;
                 io->u.ci_wr.wr_append = file->f_flags & O_APPEND;
         io->ci_obj     = ll_i2info(inode)->lli_clob;
         io->ci_lockreq = CILR_MAYBE;
-        if (fd->fd_flags & LL_FILE_IGNORE_LOCK || sbi->ll_flags & LL_SBI_NOLCK)
+        if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+            sbi->ll_flags & LL_SBI_NOLCK) {
                 io->ci_lockreq = CILR_NEVER;
                 io->ci_lockreq = CILR_NEVER;
-        else if (file->f_flags & O_APPEND)
+                io->ci_no_srvlock = 1;
+        } else if (file->f_flags & O_APPEND) {
                 io->ci_lockreq = CILR_MANDATORY;
                 io->ci_lockreq = CILR_MANDATORY;
+        }
 }
 
 static ssize_t ll_file_io_generic(const struct lu_env *env,
 }
 
 static ssize_t ll_file_io_generic(const struct lu_env *env,
@@ -1421,18 +1418,77 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
                             (void *)arg);
 }
 
                             (void *)arg);
 }
 
-static int ll_get_grouplock(struct inode *inode, struct file *file,
-                            unsigned long arg)
+int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
 {
 {
-        /* XXX */
-        return -ENOSYS;
+        struct ll_inode_info   *lli = ll_i2info(inode);
+        struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
+        struct ccc_grouplock    grouplock;
+        int                     rc;
+        ENTRY;
+
+        spin_lock(&lli->lli_lock);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+                CERROR("group lock already existed with gid %lu\n",
+                       fd->fd_grouplock.cg_gid);
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EINVAL);
+        }
+        LASSERT(fd->fd_grouplock.cg_lock == NULL);
+        spin_unlock(&lli->lli_lock);
+
+        rc = cl_get_grouplock(cl_i2info(inode)->lli_clob,
+                              arg, (file->f_flags & O_NONBLOCK), &grouplock);
+        if (rc)
+                RETURN(rc);
+
+        spin_lock(&lli->lli_lock);
+        if (fd->fd_flags & LL_FILE_GROUP_LOCKED) {
+                spin_unlock(&lli->lli_lock);
+                CERROR("another thread just won the race\n");
+                cl_put_grouplock(&grouplock);
+                RETURN(-EINVAL);
+        }
+
+        fd->fd_flags |= (LL_FILE_GROUP_LOCKED | LL_FILE_IGNORE_LOCK);
+        fd->fd_grouplock = grouplock;
+        spin_unlock(&lli->lli_lock);
+
+        CDEBUG(D_INFO, "group lock %lu obtained\n", arg);
+        RETURN(0);
 }
 
 }
 
-static int ll_put_grouplock(struct inode *inode, struct file *file,
-                            unsigned long arg)
+int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg)
 {
 {
-        /* XXX */
-        return -ENOSYS;
+        struct ll_inode_info   *lli = ll_i2info(inode);
+        struct ll_file_data    *fd = LUSTRE_FPRIVATE(file);
+        struct ccc_grouplock    grouplock;
+        ENTRY;
+
+        spin_lock(&lli->lli_lock);
+        if (!(fd->fd_flags & LL_FILE_GROUP_LOCKED)) {
+                spin_unlock(&lli->lli_lock);
+                CERROR("no group lock held\n");
+                RETURN(-EINVAL);
+        }
+        LASSERT(fd->fd_grouplock.cg_lock != NULL);
+
+        if (fd->fd_grouplock.cg_gid != arg) {
+                CERROR("group lock %lu doesn't match current id %lu\n",
+                       arg, fd->fd_grouplock.cg_gid);
+                spin_unlock(&lli->lli_lock);
+                RETURN(-EINVAL);
+        }
+
+        grouplock = fd->fd_grouplock;
+        fd->fd_grouplock.cg_env = NULL;
+        fd->fd_grouplock.cg_lock = NULL;
+        fd->fd_grouplock.cg_gid = 0;
+        fd->fd_flags &= ~(LL_FILE_GROUP_LOCKED | LL_FILE_IGNORE_LOCK);
+        spin_unlock(&lli->lli_lock);
+
+        cl_put_grouplock(&grouplock);
+        CDEBUG(D_INFO, "group lock %lu released\n", arg);
+        RETURN(0);
 }
 
 #if LUSTRE_FIX >= 50
 }
 
 #if LUSTRE_FIX >= 50
index 14428ac..e2461ec 100644 (file)
@@ -504,8 +504,7 @@ struct lustre_handle;
 struct ll_file_data {
         struct ll_readahead_state fd_ras;
         int fd_omode;
 struct ll_file_data {
         struct ll_readahead_state fd_ras;
         int fd_omode;
-        struct lustre_handle fd_cwlockh;
-        unsigned long fd_gid;
+        struct ccc_grouplock fd_grouplock;
         struct ll_file_dir fd_dir;
         __u32 fd_flags;
         struct file *fd_file;
         struct ll_file_dir fd_dir;
         __u32 fd_flags;
         struct file *fd_file;
@@ -678,6 +677,8 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data);
 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
               int num_bytes);
 int ll_merge_lvb(struct inode *inode);
 int ll_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
               int num_bytes);
 int ll_merge_lvb(struct inode *inode);
+int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg);
+int ll_put_grouplock(struct inode *inode, struct file *file, unsigned long arg);
 
 /* llite/dcache.c */
 /* llite/namei.c */
 
 /* llite/dcache.c */
 /* llite/namei.c */
index 52a8970..c237fac 100644 (file)
@@ -182,10 +182,12 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address,
 
                         /* mmap lock should be MANDATORY or NEVER. */
                         if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
 
                         /* mmap lock should be MANDATORY or NEVER. */
                         if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
-                            sbi->ll_flags & LL_SBI_NOLCK)
+                            sbi->ll_flags & LL_SBI_NOLCK) {
                                 io->ci_lockreq = CILR_NEVER;
                                 io->ci_lockreq = CILR_NEVER;
-                        else
+                                io->ci_no_srvlock = 1;
+                        } else {
                                 io->ci_lockreq = CILR_MANDATORY;
                                 io->ci_lockreq = CILR_MANDATORY;
+                        }
 
                         vio->u.fault.ft_vma     = vma;
                         vio->u.fault.ft_address = address;
 
                         vio->u.fault.ft_vma     = vma;
                         vio->u.fault.ft_address = address;
index 6fada6d..f3d8893 100644 (file)
@@ -185,6 +185,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
                 sub_io->ci_parent  = io;
                 sub_io->ci_lockreq = io->ci_lockreq;
                 sub_io->ci_type    = io->ci_type;
                 sub_io->ci_parent  = io;
                 sub_io->ci_lockreq = io->ci_lockreq;
                 sub_io->ci_type    = io->ci_type;
+                sub_io->ci_no_srvlock = io->ci_no_srvlock;
 
                 lov_sub_enter(sub);
                 result = cl_io_sub_init(sub->sub_env, sub_io,
 
                 lov_sub_enter(sub);
                 result = cl_io_sub_init(sub->sub_env, sub_io,
index 67700dd..79a6942 100644 (file)
@@ -330,6 +330,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                         descr->cld_start = cl_index(descr->cld_obj, start);
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
                         descr->cld_start = cl_index(descr->cld_obj, start);
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
+                        descr->cld_gid   = parent->cll_descr.cld_gid;
                         /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
                         lck->lls_sub[nr].sub_stripe = stripe;
                         /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
                         lck->lls_sub[nr].sub_stripe = stripe;
@@ -402,6 +403,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                  * while sub-lock is being paged out.
                  */
                 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
                  * while sub-lock is being paged out.
                  */
                 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+                         sublock->cll_descr.cld_mode == CLM_GROUP ||
                          (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                         sublock->cll_holds == 1;
                 if (dying)
                          (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                         sublock->cll_holds == 1;
                 if (dying)
@@ -824,6 +826,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
 
                 subd->cld_obj  = NULL;   /* don't need sub object at all */
                 subd->cld_mode = descr->cld_mode;
 
                 subd->cld_obj  = NULL;   /* don't need sub object at all */
                 subd->cld_mode = descr->cld_mode;
+                subd->cld_gid  = descr->cld_gid;
                 result = lov_stripe_intersects(lsm, stripe, start, end,
                                                &sub_start, &sub_end);
                 LASSERT(result);
                 result = lov_stripe_intersects(lsm, stripe, start, end,
                                                &sub_start, &sub_end);
                 LASSERT(result);
@@ -857,7 +860,12 @@ static int lov_lock_fits_into(const struct lu_env *env,
 
         ENTRY;
 
 
         ENTRY;
 
-        if (lov->lls_nr == 1) {
+        if (need->cld_mode == CLM_GROUP)
+                /*
+                 * always allow to match group lock.
+                 */
+                result = cl_lock_ext_match(&lov->lls_orig, need);
+        else if (lov->lls_nr == 1) {
                 struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
                 result = lov_lock_stripe_is_matching(env,
                                                      cl2lov(slice->cls_obj),
                 struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
                 result = lov_lock_stripe_is_matching(env,
                                                      cl2lov(slice->cls_obj),
index 162033d..48d788a 100644 (file)
@@ -258,6 +258,7 @@ int lov_sublock_modify(const struct lu_env *env, struct lov_lock *lov,
 
         pd->cld_obj  = parent_descr->cld_obj;
         pd->cld_mode = parent_descr->cld_mode;
 
         pd->cld_obj  = parent_descr->cld_obj;
         pd->cld_mode = parent_descr->cld_mode;
+        pd->cld_gid  = parent_descr->cld_gid;
         lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
         lov->lls_sub[idx].sub_got = *d;
         /*
         lovsub_lock_descr_map(d, subobj->lso_super, subobj->lso_index, pd);
         lov->lls_sub[idx].sub_got = *d;
         /*
index b75685f..6f6251d 100644 (file)
@@ -194,12 +194,18 @@ EXPORT_SYMBOL(cl_lock_slice_add);
  */
 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
 {
  */
 int cl_lock_mode_match(enum cl_lock_mode has, enum cl_lock_mode need)
 {
-        LINVRNT(need == CLM_READ || need == CLM_WRITE || need == CLM_PHANTOM);
-        LINVRNT(has == CLM_READ || has == CLM_WRITE || has == CLM_PHANTOM);
+        LINVRNT(need == CLM_READ || need == CLM_WRITE ||
+                need == CLM_PHANTOM || need == CLM_GROUP);
+        LINVRNT(has == CLM_READ || has == CLM_WRITE ||
+                has == CLM_PHANTOM || has == CLM_GROUP);
         CLASSERT(CLM_PHANTOM < CLM_READ);
         CLASSERT(CLM_READ < CLM_WRITE);
         CLASSERT(CLM_PHANTOM < CLM_READ);
         CLASSERT(CLM_READ < CLM_WRITE);
+        CLASSERT(CLM_WRITE < CLM_GROUP);
 
 
-        return need <= has;
+        if (has != CLM_GROUP)
+                return need <= has;
+        else
+                return need == has;
 }
 EXPORT_SYMBOL(cl_lock_mode_match);
 
 }
 EXPORT_SYMBOL(cl_lock_mode_match);
 
@@ -212,7 +218,8 @@ int cl_lock_ext_match(const struct cl_lock_descr *has,
         return
                 has->cld_start <= need->cld_start &&
                 has->cld_end >= need->cld_end &&
         return
                 has->cld_start <= need->cld_start &&
                 has->cld_end >= need->cld_end &&
-                cl_lock_mode_match(has->cld_mode, need->cld_mode);
+                cl_lock_mode_match(has->cld_mode, need->cld_mode) &&
+                (has->cld_mode != CLM_GROUP || has->cld_gid == need->cld_gid);
 }
 EXPORT_SYMBOL(cl_lock_ext_match);
 
 }
 EXPORT_SYMBOL(cl_lock_ext_match);
 
@@ -831,10 +838,11 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
-                if (lock->cll_descr.cld_mode == CLM_PHANTOM)
+                if (lock->cll_descr.cld_mode == CLM_PHANTOM ||
+                    lock->cll_descr.cld_mode == CLM_GROUP)
                         /*
                         /*
-                         * If lock is still phantom when user is done with
-                         * it---destroy the lock.
+                         * If lock is still phantom or grouplock when user is
+                         * done with it---destroy the lock.
                          */
                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
                 if (lock->cll_flags & CLF_CANCELPEND) {
                          */
                         lock->cll_flags |= CLF_CANCELPEND|CLF_DOOMED;
                 if (lock->cll_flags & CLF_CANCELPEND) {
@@ -2077,7 +2085,8 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode)
         static const char *names[] = {
                 [CLM_PHANTOM] = "PHANTOM",
                 [CLM_READ]    = "READ",
         static const char *names[] = {
                 [CLM_PHANTOM] = "PHANTOM",
                 [CLM_READ]    = "READ",
-                [CLM_WRITE]   = "WRITE"
+                [CLM_WRITE]   = "WRITE",
+                [CLM_GROUP]   = "GROUP"
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];
index 6085101..b8cdd3e 100644 (file)
@@ -388,14 +388,24 @@ static inline struct osc_object *cl2osc(const struct cl_object *obj)
 
 static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
 {
 
 static inline ldlm_mode_t osc_cl_lock2ldlm(enum cl_lock_mode mode)
 {
-        LASSERT(mode == CLM_READ || mode == CLM_WRITE);
-        return mode == CLM_READ ? LCK_PR : LCK_PW;
+        LASSERT(mode == CLM_READ || mode == CLM_WRITE || mode == CLM_GROUP);
+        if (mode == CLM_READ)
+                return LCK_PR;
+        else if (mode == CLM_WRITE)
+                return LCK_PW;
+        else
+                return LCK_GROUP;
 }
 
 static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
 {
 }
 
 static inline enum cl_lock_mode osc_ldlm2cl_lock(ldlm_mode_t mode)
 {
-        LASSERT(mode == LCK_PR || mode == LCK_PW);
-        return mode == LCK_PR ? CLM_READ : CLM_WRITE;
+        LASSERT(mode == LCK_PR || mode == LCK_PW || mode == LCK_GROUP);
+        if (mode == LCK_PR)
+                return CLM_READ;
+        else if (mode == LCK_PW)
+                return CLM_WRITE;
+        else
+                return CLM_GROUP;
 }
 
 static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
 }
 
 static inline struct osc_page *cl2osc_page(const struct cl_page_slice *slice)
@@ -415,6 +425,11 @@ static inline struct osc_lock *osc_lock_at(const struct cl_lock *lock)
         return cl2osc_lock(cl_lock_at(lock, &osc_device_type));
 }
 
         return cl2osc_lock(cl_lock_at(lock, &osc_device_type));
 }
 
+static inline int osc_io_srvlock(struct osc_io *oio)
+{
+        return (oio->oi_lockless && !oio->oi_cl.cis_io->ci_no_srvlock);
+}
+
 /** @} osc */
 
 #endif /* OSC_CL_INTERNAL_H */
 /** @} osc */
 
 #endif /* OSC_CL_INTERNAL_H */
index f611bf6..ddf9a34 100644 (file)
@@ -242,6 +242,7 @@ static void osc_lock_build_policy(const struct lu_env *env,
         const struct cl_lock_descr *d = &lock->cll_descr;
 
         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
         const struct cl_lock_descr *d = &lock->cll_descr;
 
         osc_index2policy(policy, d->cld_obj, d->cld_start, d->cld_end);
+        policy->l_extent.gid = d->cld_gid;
 }
 
 static int osc_enq2ldlm_flags(__u32 enqflags)
 }
 
 static int osc_enq2ldlm_flags(__u32 enqflags)
@@ -405,6 +406,7 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck,
                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
                 descr->cld_mode  = osc_ldlm2cl_lock(dlmlock->l_granted_mode);
                 descr->cld_start = cl_index(descr->cld_obj, ext->start);
                 descr->cld_end   = cl_index(descr->cld_obj, ext->end);
+                descr->cld_gid   = ext->gid;
                 /*
                  * tell upper layers the extent of the lock that was actually
                  * granted
                 /*
                  * tell upper layers the extent of the lock that was actually
                  * granted
@@ -1116,6 +1118,14 @@ static int osc_lock_enqueue_wait(const struct lu_env *env,
                         continue;
 
                 /* overlapped and living locks. */
                         continue;
 
                 /* overlapped and living locks. */
+
+                /* We're not supposed to give up group lock. */
+                if (scan->cll_descr.cld_mode == CLM_GROUP) {
+                        LASSERT(descr->cld_mode != CLM_GROUP ||
+                                descr->cld_gid != scan->cll_descr.cld_gid);
+                        continue;
+                }
+
                 /* A tricky case for lockless pages:
                  * We need to cancel the compatible locks if we're enqueuing
                  * a lockless lock, for example:
                 /* A tricky case for lockless pages:
                  * We need to cancel the compatible locks if we're enqueuing
                  * a lockless lock, for example:
index ca9f7d7..5f5179c 100644 (file)
@@ -201,7 +201,7 @@ static int osc_page_cache_add(const struct lu_env *env,
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
         ENTRY;
 
         /* Set the OBD_BRW_SRVLOCK before the page is queued. */
-        brw_flags = oio->oi_lockless ? OBD_BRW_SRVLOCK : 0;
+        brw_flags = osc_io_srvlock(oio) ? OBD_BRW_SRVLOCK : 0;
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 brw_flags |= OBD_BRW_NOQUOTA;
         if (!client_is_remote(osc_export(obj)) &&
             cfs_capable(CFS_CAP_SYS_RESOURCE)) {
                 brw_flags |= OBD_BRW_NOQUOTA;
@@ -528,7 +528,7 @@ void osc_io_submit_page(const struct lu_env *env,
         oap->oap_page_off   = opg->ops_from;
         oap->oap_count      = opg->ops_to - opg->ops_from;
         oap->oap_brw_flags |= OBD_BRW_SYNC;
         oap->oap_page_off   = opg->ops_from;
         oap->oap_count      = opg->ops_to - opg->ops_from;
         oap->oap_brw_flags |= OBD_BRW_SYNC;
-        if (oio->oi_lockless)
+        if (osc_io_srvlock(oio))
                 oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
 
         oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;
                 oap->oap_brw_flags |= OBD_BRW_SRVLOCK;
 
         oap->oap_cmd = crt == CRT_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ;