Whamcloud - gitweb
b=17633
author jxiong <jxiong>
Fri, 19 Dec 2008 02:58:02 +0000 (02:58 +0000)
committer jxiong <jxiong>
Fri, 19 Dec 2008 02:58:02 +0000 (02:58 +0000)
r=nikita,zam

lockless io fixes for multistripe objects.

lustre/include/cl_object.h
lustre/llite/llite_mmap.c
lustre/llite/vvp_io.c
lustre/lov/lov_cl_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/obdclass/cl_io.c
lustre/osc/osc_io.c
lustre/osc/osc_lock.c
lustre/tests/sanityN.sh

index 3343139..ad4b468 100644 (file)
@@ -2098,7 +2098,24 @@ enum cl_enq_flags {
          * owner of the conflicting lock, that it can drop dirty pages
          * protected by this lock, without sending them to the server.
          */
-        CEF_DISCARD_DATA = 0x00000004
+        CEF_DISCARD_DATA = 0x00000004,
+        /**
+         * tell the sub layers that it must be a `real' lock.
+         */
+        CEF_MUST         = 0x00000008,
+        /**
+         * tell the sub layers to never request a `real' lock.
+         * currently, the CEF_MUST & CEF_NEVER are only used for mmap locks.
+         * cl_io::ci_lockreq and these two flags: ci_lockreq just describes
+         * generic information of lock requirement for this IO, especially for
+         * locks which belong to the object doing IO; however, lock itself may
+         * have precise requirements, this is described by the latter.
+         */
+        CEF_NEVER        = 0x00000010,
+        /**
+         * mask of enq_flags.
+         */
+        CEF_MASK         = 0x0000001f
 };
 
 /**
@@ -2837,7 +2854,7 @@ void  cl_io_end          (const struct lu_env *env, struct cl_io *io);
 int   cl_io_lock_add     (const struct lu_env *env, struct cl_io *io,
                           struct cl_io_lock_link *link);
 int   cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
-                           struct cl_lock_descr *descr);
+                           struct cl_lock_descr *descr, int enqflags);
 int   cl_io_read_page    (const struct lu_env *env, struct cl_io *io,
                           struct cl_page *page);
 int   cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
index 4126a83..52a8970 100644 (file)
@@ -150,6 +150,7 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address,
                 struct cl_fault_io *fio;
 
                 io = &ccc_env_info(env)->cti_io;
+                memset(io, 0, sizeof(*io));
                 io->ci_obj = ll_i2info(inode)->lli_clob;
                 LASSERT(io->ci_obj != NULL);
 
@@ -174,13 +175,22 @@ struct page *ll_nopage(struct vm_area_struct *vma, unsigned long address,
                 if (cl_io_init(env, io, CIT_FAULT, io->ci_obj) == 0) {
                         struct vvp_io *vio = vvp_env_io(env);
                         struct ccc_io *cio = ccc_env_io(env);
+                        struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
+                        struct ll_sb_info *sbi  = ll_i2sbi(inode);
 
                         LASSERT(cio->cui_cl.cis_io == io);
 
+                        /* mmap lock should be MANDATORY or NEVER. */
+                        if (fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+                            sbi->ll_flags & LL_SBI_NOLCK)
+                                io->ci_lockreq = CILR_NEVER;
+                        else
+                                io->ci_lockreq = CILR_MANDATORY;
+
                         vio->u.fault.ft_vma     = vma;
                         vio->u.fault.ft_address = address;
                         vio->u.fault.ft_type    = type;
-                        cio->cui_fd = LUSTRE_FPRIVATE(file);
+                        cio->cui_fd             = fd;
 
                         result = cl_io_loop(env, io);
                         if (result == 0) {
index aa2dcf0..890997e 100644 (file)
@@ -146,9 +146,17 @@ static int vvp_mmap_locks(const struct lu_env *env,
                 count += addr & (~CFS_PAGE_MASK);
                 addr &= CFS_PAGE_MASK;
                 while((vma = our_vma(addr, count)) != NULL) {
-                        LASSERT(vma->vm_file);
+                        struct file *file = vma->vm_file;
+                        struct ll_file_data *fd;
+
+                        LASSERT(file);
+                        fd = LUSTRE_FPRIVATE(file);
+
+                        inode = file->f_dentry->d_inode;
+                        if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK ||
+                            ll_i2sbi(inode)->ll_flags & LL_SBI_NOLCK))
+                                goto cont;
 
-                        inode = vma->vm_file->f_dentry->d_inode;
                         /*
                          * XXX: Required lock mode can be weakened: CIT_WRITE
                          * io only ever reads user level buffer, and CIT_READ
@@ -161,9 +169,11 @@ static int vvp_mmap_locks(const struct lu_env *env,
                                                     policy.l_extent.start);
                         descr->cld_end = cl_index(descr->cld_obj,
                                                   policy.l_extent.end);
-                        result = cl_io_lock_alloc_add(env, io, descr);
+                        result = cl_io_lock_alloc_add(env, io, descr, CEF_MUST);
                         if (result < 0)
                                 RETURN(result);
+
+                cont:
                         if (vma->vm_end - addr >= count)
                                 break;
                         count -= vma->vm_end - addr;
index 90f53fb..6a98fbc 100644 (file)
@@ -396,6 +396,15 @@ struct lovsub_lock {
         struct cl_lock       *lss_active;
 };
 
+/**
+ * Describe the environment settings for sublocks.
+ */
+struct lov_sublock_env {
+        const struct lu_env *lse_env;
+        struct cl_io        *lse_io;
+        struct lov_io_sub   *lse_sub;
+};
+
 struct lovsub_page {
         struct cl_page_slice lsb_cl;
 };
@@ -507,7 +516,8 @@ struct lov_io {
 };
 
 struct lov_session {
-        struct lov_io ls_io;
+        struct lov_io          ls_io;
+        struct lov_sublock_env ls_subenv;
 };
 
 /**
@@ -564,7 +574,9 @@ int   lov_io_init_empty   (const struct lu_env *env, struct cl_object *obj,
 void  lov_lock_unlink     (const struct lu_env *env, struct lov_lock_link *link,
                            struct lovsub_lock *sub);
 
-void  lov_sub_put         (struct lov_io_sub *sub);
+struct lov_io_sub *lov_sub_get(const struct lu_env *env, struct lov_io *lio,
+                               int stripe);
+void  lov_sub_put             (struct lov_io_sub *sub);
 int   lov_sublock_modify  (const struct lu_env *env, struct lov_lock *lov,
                            struct lovsub_lock *sublock,
                            const struct cl_lock_descr *d, int idx);
index 346a992..4ab7d25 100644 (file)
@@ -201,8 +201,8 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
         RETURN(result);
 }
 
-static struct lov_io_sub *lov_sub_get(const struct lu_env *env,
-                                      struct lov_io *lio, int stripe)
+struct lov_io_sub *lov_sub_get(const struct lu_env *env,
+                               struct lov_io *lio, int stripe)
 {
         int rc;
         struct lov_io_sub *sub = &lio->lis_subs[stripe];
@@ -391,6 +391,7 @@ static int lov_io_iter_init(const struct lu_env *env,
                                stripe, start, end);
                 } else
                         rc = PTR_ERR(sub);
+
                 if (!rc)
                         list_add_tail(&sub->sub_linkage, &lio->lis_active);
                 else
index 14ecd68..905b6cc 100644 (file)
@@ -53,6 +53,50 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
  *
  */
 
+static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
+                                                   struct cl_lock *parent,
+                                                   struct lov_lock_sub *lls)
+{
+        struct lov_sublock_env *subenv;
+        struct lov_io          *lio    = lov_env_io(env);
+        struct cl_io           *io     = lio->lis_cl.cis_io;
+        struct lov_io_sub      *sub;
+
+        subenv = &lov_env_session(env)->ls_subenv;
+
+        /*
+         * FIXME: We tend to use the subio's env & io to call the sublock
+         * lock operations because osc lock sometimes stores some control
+         * variables in thread's IO information (now only lockless information).
+         * However, if the lock's host(object) is different from the object
+         * for current IO, we have no way to get the subenv and subio because
+         * they are not initialized at all. As a temp fix, in this case,
+         * we still borrow the parent's env to call sublock operations.
+         */
+        if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+                subenv->lse_env = env;
+                subenv->lse_io  = io;
+                subenv->lse_sub = NULL;
+        } else {
+                LASSERT(io != NULL);
+                sub = lov_sub_get(env, lio, lls->sub_stripe);
+                if (!IS_ERR(sub)) {
+                        subenv->lse_env = sub->sub_env;
+                        subenv->lse_io  = sub->sub_io;
+                        subenv->lse_sub = sub;
+                } else {
+                        subenv = (void*)sub;
+                }
+        }
+        return subenv;
+}
+
+static void lov_sublock_env_put(struct lov_sublock_env *subenv)
+{
+        if (subenv && subenv->lse_sub)
+                lov_sub_put(subenv->lse_sub);
+}
+
 static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                               struct cl_lock *sublock, int idx,
                               struct lov_lock_link *link)
@@ -102,15 +146,30 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
 
         OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
         if (link != NULL) {
-                struct lov_lock_sub  *sub;
+                struct lov_sublock_env *subenv;
+                struct lov_lock_sub  *lls;
                 struct cl_lock_descr *descr;
 
                 parent = lck->lls_cl.cls_lock;
-                sub    = &lck->lls_sub[idx];
-                descr  = &sub->sub_descr;
+                lls    = &lck->lls_sub[idx];
+                descr  = &lls->sub_descr;
+
+                subenv = lov_sublock_env_get(env, parent, lls);
+                if (!IS_ERR(subenv)) {
+                        /* CAVEAT: Don't try to add a field in lov_lock_sub
+                         * to remember the subio. This is because lock is able
+                         * to be cached, but this is not true for IO. This
+                         * further means a sublock might be referenced in
+                         * different io context. -jay */
+
+                        sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+                                               descr, "lov-parent", parent);
+                        lov_sublock_env_put(subenv);
+                } else {
+                        /* error occurs. */
+                        sublock = (void*)subenv;
+                }
 
-                /* XXX maybe sub-io? */
-                sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
                 if (!IS_ERR(sublock))
                         *out = link;
                 else
@@ -122,28 +181,46 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
 
 static void lov_sublock_unlock(const struct lu_env *env,
                                struct lovsub_lock *lsl,
-                               struct cl_lock_closure *closure)
+                               struct cl_lock_closure *closure,
+                               struct lov_sublock_env *subenv)
 {
         ENTRY;
+        lov_sublock_env_put(subenv);
         lsl->lss_active = NULL;
         cl_lock_disclosure(env, closure);
         EXIT;
 }
 
-static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
-                            struct cl_lock_closure *closure)
+static int lov_sublock_lock(const struct lu_env *env,
+                            struct lov_lock_sub *lls,
+                            struct cl_lock_closure *closure,
+                            struct lov_sublock_env **lsep)
 {
         struct cl_lock *child;
-        int             result;
+        int             result = 0;
+        ENTRY;
 
         LASSERT(list_empty(&closure->clc_list));
 
-        ENTRY;
-        child = lsl->lss_cl.cls_lock;
+        child = lls->sub_lock->lss_cl.cls_lock;
         result = cl_lock_closure_build(env, child, closure);
         if (result == 0) {
+                struct cl_lock *parent = closure->clc_origin;
+
                 LASSERT(cl_lock_is_mutexed(child));
-                lsl->lss_active = closure->clc_origin;
+                lls->sub_lock->lss_active = parent;
+
+                if (lsep) {
+                        struct lov_sublock_env *subenv;
+                        subenv = lov_sublock_env_get(env, parent, lls);
+                        if (IS_ERR(subenv)) {
+                                lov_sublock_unlock(env, lls->sub_lock,
+                                                   closure, NULL);
+                                result = PTR_ERR(subenv);
+                        } else {
+                                *lsep = subenv;
+                        }
+                }
         }
         RETURN(result);
 }
@@ -308,7 +385,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
         ENTRY;
 
         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
-                struct cl_lock *sublock;
+                struct cl_lock    *sublock;
                 int dying;
 
                 LASSERT(lck->lls_sub[i].sub_lock != NULL);
@@ -404,8 +481,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                 struct cl_io *io, __u32 enqflags, int last)
 {
         int result;
-
         ENTRY;
+
         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
         if (sublock->cll_state == CLS_ENQUEUED)
@@ -480,8 +557,10 @@ static int lov_lock_enqueue(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct lov_lock_sub    *lls;
+                struct cl_lock         *sublock;
+                struct lov_sublock_env *subenv;
 
                 if (lock->cll_state != CLS_QUEUING) {
                         /*
@@ -493,7 +572,8 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 /*
                  * Sub-lock might have been canceled, while top-lock was
                  * cached.
@@ -505,11 +585,11 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         lov_sublock_hold(env, lck, i);
-                        rc = lov_lock_enqueue_one(env, lck, sublock, io,
-                                                  enqflags,
+                        rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
+                                                  subenv->lse_io, enqflags,
                                                   i == lck->lls_nr - 1);
                         minstate = min(minstate, sublock->cll_state);
                         /*
@@ -518,7 +598,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                          */
                         if (sublock->cll_state > CLS_HELD)
                                 rc = lov_sublock_release(env, lck, i, 1, rc);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -540,28 +620,31 @@ static int lov_lock_unuse(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 /* top-lock state cannot change concurrently, because single
                  * thread (one that released the last hold) carries unlocking
                  * to the completion. */
                 LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 if (sub == NULL)
                         continue;
 
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
-                                rc = cl_unuse_try(env, sublock);
+                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 0, rc);
                         }
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -588,19 +671,23 @@ static int lov_lock_wait(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                         if (sublock->cll_state < CLS_HELD)
                                 rc = cl_wait_try(env, sublock);
+
                         minstate = min(minstate, sublock->cll_state);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -623,8 +710,10 @@ static int lov_lock_use(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 if (slice->cls_lock->cll_state != CLS_CACHED) {
                         /* see comment in lov_lock_enqueue(). */
@@ -636,21 +725,22 @@ static int lov_lock_use(const struct lu_env *env,
                  * CLS_CACHED state, top-lock would have been moved into
                  * CLS_NEW state, so all sub-locks have to be in place.
                  */
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
                         } else
                                 rc = 0;
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -838,16 +928,18 @@ static void lov_lock_delete(const struct lu_env *env,
         ENTRY;
 
         for (i = 0; i < lck->lls_nr; ++i) {
-                struct lovsub_lock *lsl;
-                struct cl_lock *sublock;
+                struct lov_lock_sub *lls;
+                struct lovsub_lock  *lsl;
+                struct cl_lock      *sublock;
                 int rc;
 
-                lsl = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                lsl = lls->sub_lock;
                 if (lsl == NULL)
                         continue;
 
                 sublock = lsl->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lsl, closure);
+                rc = lov_sublock_lock(env, lls, closure, NULL);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
@@ -859,7 +951,7 @@ static void lov_lock_delete(const struct lu_env *env,
                                 lov_lock_unlink(env, link, lsl);
                                 LASSERT(lck->lls_sub[i].sub_lock == NULL);
                         }
-                        lov_sublock_unlock(env, lsl, closure);
+                        lov_sublock_unlock(env, lsl, closure, NULL);
                 } else if (rc == CLO_REPEAT) {
                         --i; /* repeat with this lock */
                 } else {
index 62357e7..0e2f342 100644 (file)
@@ -582,7 +582,7 @@ static void cl_free_io_lock_link(const struct lu_env *env,
  * Allocates new lock link, and uses it to add a lock to a lockset.
  */
 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
-                         struct cl_lock_descr *descr)
+                         struct cl_lock_descr *descr, int enqflags)
 {
         struct cl_io_lock_link *link;
         int result;
@@ -590,8 +590,9 @@ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
         ENTRY;
         OBD_ALLOC_PTR(link);
         if (link != NULL) {
-                link->cill_descr = *descr;
-                link->cill_fini = cl_free_io_lock_link;
+                link->cill_descr     = *descr;
+                link->cill_enq_flags = enqflags;
+                link->cill_fini      = cl_free_io_lock_link;
                 result = cl_io_lock_add(env, io, link);
                 if (result) /* lock match */
                         link->cill_fini(env, link);
index 180e4f1..fd38a1c 100644 (file)
@@ -422,19 +422,23 @@ static int osc_io_trunc_start(const struct lu_env *env,
         struct osc_punch_cbargs *cbargs = &oio->oi_punch_cbarg;
         struct obd_capa         *capa;
         loff_t                   size   = io->u.ci_truncate.tr_size;
-        int                      result;
+        int                      result = 0;
+
 
         memset(oa, 0, sizeof(*oa));
 
         osc_trunc_check(env, io, oio, size);
 
-        cl_object_attr_lock(obj);
-        result = cl_object_attr_get(env, obj, attr);
-        if (result == 0) {
-                attr->cat_size = attr->cat_kms = size;
-                result = cl_object_attr_set(env, obj, attr, CAT_SIZE|CAT_KMS);
+        if (oio->oi_lockless == 0) {
+                cl_object_attr_lock(obj);
+                result = cl_object_attr_get(env, obj, attr);
+                if (result == 0) {
+                        attr->cat_size = attr->cat_kms = size;
+                        result = cl_object_attr_set(env, obj, attr,
+                                                    CAT_SIZE|CAT_KMS);
+                }
+                cl_object_attr_unlock(obj);
         }
-        cl_object_attr_unlock(obj);
 
         if (result == 0) {
                 oa->o_id = loi->loi_id;
index a0e3190..8f2b2b5 100644 (file)
@@ -60,6 +60,8 @@
 
 static const struct cl_lock_operations osc_lock_ops;
 static const struct cl_lock_operations osc_lock_lockless_ops;
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force);
 
 int osc_lock_is_lockless(const struct osc_lock *olck)
 {
@@ -247,7 +249,7 @@ static int osc_enq2ldlm_flags(__u32 enqflags)
 {
         int result = 0;
 
-        LASSERT((enqflags & ~(CEF_NONBLOCK|CEF_ASYNC|CEF_DISCARD_DATA)) == 0);
+        LASSERT((enqflags & ~CEF_MASK) == 0);
 
         if (enqflags & CEF_NONBLOCK)
                 result |= LDLM_FL_BLOCK_NOWAIT;
@@ -303,19 +305,6 @@ static void osc_ast_data_put(const struct lu_env *env, struct osc_lock *olck)
         cl_lock_put(env, lock);
 }
 
-static void osc_lock_to_lockless(struct osc_lock *olck)
-{
-        struct cl_lock_slice *slice = &olck->ols_cl;
-        struct cl_lock  *lock       = slice->cls_lock;
-
-        /*
-         * TODO: Discover which locks we need to convert the lock
-         * to ldlmlockless.
-         */
-        LASSERT(cl_lock_is_mutexed(lock));
-        slice->cls_ops = &osc_lock_lockless_ops;
-}
-
 /**
  * Updates object attributes from a lock value block (lvb) received together
  * with the DLM lock reply from the server. Copy of osc_update_enqueue()
@@ -519,7 +508,7 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         LASSERT(slice->cls_ops == &osc_lock_ops);
 
                         /* Change this lock to ldlmlock-less lock. */
-                        osc_lock_to_lockless(olck);
+                        osc_lock_to_lockless(env, olck, 1);
                         olck->ols_state = OLS_GRANTED;
                         rc = 0;
                 } else if (olck->ols_glimpse && rc == -ENAVAIL) {
@@ -998,6 +987,60 @@ static int osc_lock_cancel_wait(const struct lu_env *env, struct cl_lock *lock,
 }
 
 /**
+ * Determine if the lock should be converted into a lockless lock.
+ *
+ * Steps to check:
+ * - if the lock has an explicit requirement for a non-lockless lock;
+ * - if the io lock request type ci_lockreq;
+ * - send the enqueue rpc to ost to make the further decision;
+ * - special treatment of the truncate lockless lock
+ *
+ *  Additional policy can be implemented here, e.g., never do lockless-io
+ *  for large extents.
+ */
+static void osc_lock_to_lockless(const struct lu_env *env,
+                                 struct osc_lock *ols, int force)
+{
+        struct cl_lock_slice *slice = &ols->ols_cl;
+        struct cl_lock *lock        = slice->cls_lock;
+
+        LASSERT(ols->ols_state == OLS_NEW ||
+                ols->ols_state == OLS_UPCALL_RECEIVED);
+
+        if (force) {
+                ols->ols_locklessable = 1;
+                LASSERT(cl_lock_is_mutexed(lock));
+                slice->cls_ops = &osc_lock_lockless_ops;
+        } else {
+                struct osc_io *oio     = osc_env_io(env);
+                struct cl_io  *io      = oio->oi_cl.cis_io;
+                struct cl_object *obj  = slice->cls_obj;
+                struct osc_object *oob = cl2osc(obj);
+                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
+                struct obd_connect_data *ocd;
+
+                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
+                        io->ci_lockreq == CILR_MAYBE ||
+                        io->ci_lockreq == CILR_NEVER);
+
+                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
+                ols->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
+                                (io->ci_lockreq == CILR_MAYBE) &&
+                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
+                if (io->ci_lockreq == CILR_NEVER ||
+                        /* lockless IO */
+                    (ols->ols_locklessable && osc_object_is_contended(oob)) ||
+                        /* lockless truncate */
+                    (io->ci_type == CIT_TRUNC &&
+                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
+                      osd->od_lockless_truncate)) {
+                        ols->ols_locklessable = 1;
+                        slice->cls_ops = &osc_lock_lockless_ops;
+                }
+        }
+}
+
+/**
  * Cancel all conflicting locks and wait for them to be destroyed.
  *
  * This function is used for two purposes:
@@ -1190,8 +1233,6 @@ static int osc_lock_enqueue(const struct lu_env *env,
         osc_lock_build_res(env, obj, resname);
         osc_lock_build_policy(env, lock, policy);
         ols->ols_flags = osc_enq2ldlm_flags(enqflags);
-        if (ols->ols_locklessable)
-                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
         if (osc_deadlock_is_possible(env, lock))
                 ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
@@ -1199,26 +1240,37 @@ static int osc_lock_enqueue(const struct lu_env *env,
 
         result = osc_lock_enqueue_wait(env, ols);
         if (result == 0) {
-                /* a reference for lock, passed as an upcall cookie */
-                cl_lock_get(lock);
-                lu_ref_add(&lock->cll_reference, "upcall", lock);
-                ols->ols_state = OLS_ENQUEUED;
+                if (!(enqflags & CEF_MUST))
+                        /* try to convert this lock to a lockless lock */
+                        osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER));
+                if (!osc_lock_is_lockless(ols)) {
+                        if (ols->ols_locklessable)
+                                ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
+
+                        /* a reference for lock, passed as an upcall cookie */
+                        cl_lock_get(lock);
+                        lu_ref_add(&lock->cll_reference, "upcall", lock);
+                        ols->ols_state = OLS_ENQUEUED;
 
-                /*
-                 * XXX: this is possible blocking point as
-                 * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
-                 * LDLM_CP_CALLBACK.
-                 */
-                result = osc_enqueue_base(osc_export(obj), resname,
+                        /*
+                         * XXX: this is possible blocking point as
+                         * ldlm_lock_match(LDLM_FL_LVB_READY) waits for
+                         * LDLM_CP_CALLBACK.
+                         */
+                        result = osc_enqueue_base(osc_export(obj), resname,
                                           &ols->ols_flags, policy,
                                           &ols->ols_lvb,
                                           obj->oo_oinfo->loi_kms_valid,
                                           osc_lock_upcall,
                                           ols, einfo, &ols->ols_handle,
                                           PTLRPCD_SET, 1);
-                if (result != 0) {
-                        lu_ref_del(&lock->cll_reference, "upcall", lock);
-                        cl_lock_put(env, lock);
+                        if (result != 0) {
+                                lu_ref_del(&lock->cll_reference,
+                                           "upcall", lock);
+                                cl_lock_put(env, lock);
+                        }
+                } else {
+                        ols->ols_state = OLS_GRANTED;
                 }
         }
 
@@ -1473,18 +1525,8 @@ static int osc_lock_lockless_enqueue(const struct lu_env *env,
                                      const struct cl_lock_slice *slice,
                                      struct cl_io *_, __u32 enqflags)
 {
-        struct osc_lock          *ols     = cl2osc_lock(slice);
-        struct cl_lock           *lock    = ols->ols_cl.cls_lock;
-        int result;
-
-        LASSERT(cl_lock_is_mutexed(lock));
-        LASSERT(lock->cll_state == CLS_QUEUING);
-        LASSERT(ols->ols_state == OLS_NEW);
-
-        result = osc_lock_enqueue_wait(env, ols);
-        if (result == 0)
-                ols->ols_state = OLS_GRANTED;
-        return result;
+        LBUG();
+        return 0;
 }
 
 static int osc_lock_lockless_unuse(const struct lu_env *env,
@@ -1537,7 +1579,11 @@ static void osc_lock_lockless_state(const struct lu_env *env,
         if (state == CLS_HELD) {
                 LASSERT(lock->ols_owner == NULL);
                 lock->ols_owner = oio;
-                oio->oi_lockless = 1;
+
+                /* set the io to be lockless if this lock is for the
+                 * io's host object */
+                if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj))
+                        oio->oi_lockless = 1;
         } else
                 lock->ols_owner = NULL;
 }
@@ -1563,56 +1609,16 @@ static const struct cl_lock_operations osc_lock_lockless_ops = {
 
 int osc_lock_init(const struct lu_env *env,
                   struct cl_object *obj, struct cl_lock *lock,
-                  const struct cl_io *io)
+                  const struct cl_io *_)
 {
-        struct osc_lock   *clk;
-        struct osc_io     *oio = osc_env_io(env);
-        struct osc_object *oob = cl2osc(obj);
+        struct osc_lock *clk;
         int result;
 
         OBD_SLAB_ALLOC_PTR(clk, osc_lock_kmem);
         if (clk != NULL) {
-                const struct cl_lock_operations *ops;
-                const struct osc_device *osd = lu2osc_dev(obj->co_lu.lo_dev);
-                struct obd_connect_data *ocd;
-
                 osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo);
                 clk->ols_state = OLS_NEW;
-
-                /*
-                 * Check if we need to do lockless IO here.
-                 * Following conditions must be satisfied:
-                 * - the current IO must be locklessable;
-                 * - the stripe is in contention;
-                 * - requested lock is not a glimpse.
-                 *
-                 * if not, we have to inherit the locklessable flag to
-                 * osc_lock, and let ost make the decision.
-                 *
-                 * Additional policy can be implemented here, e.g., never do
-                 * lockless-io for large extents.
-                 */
-                LASSERT(io->ci_lockreq == CILR_MANDATORY ||
-                        io->ci_lockreq == CILR_MAYBE ||
-                        io->ci_lockreq == CILR_NEVER);
-                ocd = &class_exp2cliimp(osc_export(oob))->imp_connect_data;
-                clk->ols_locklessable = (io->ci_type != CIT_TRUNC) &&
-                                (io->ci_lockreq == CILR_MAYBE) &&
-                                (ocd->ocd_connect_flags & OBD_CONNECT_SRVLOCK);
-                ops = &osc_lock_ops;
-                if (io->ci_lockreq == CILR_NEVER ||
-                    /* lockless IO */
-                    (clk->ols_locklessable && osc_object_is_contended(oob)) ||
-                     /* lockless truncate */
-                    (io->ci_type == CIT_TRUNC &&
-                     (ocd->ocd_connect_flags & OBD_CONNECT_TRUNCLOCK) &&
-                     osd->od_lockless_truncate)) {
-                        ops = &osc_lock_lockless_ops;
-                        oio->oi_lockless     = 1;
-                        clk->ols_locklessable = 1;
-                }
-
-                cl_lock_slice_add(lock, &clk->ols_cl, obj, ops);
+                cl_lock_slice_add(lock, &clk->ols_cl, obj, &osc_lock_ops);
                 result = 0;
         } else
                 result = -ENOMEM;
index 89cbd93..591024d 100644 (file)
@@ -636,9 +636,11 @@ test_32a() { # bug 11270
         local p="$TMP/sanityN-$TESTNAME.parameters"
         save_lustre_params $HOSTNAME osc.*.lockless_truncate > $p
         cancel_lru_locks osc
-        clear_osc_stats
         enable_lockless_truncate 1
+        rm -f $DIR1/$tfile
+        lfs setstripe -c -1 $DIR1/$tfile
         dd if=/dev/zero of=$DIR1/$tfile count=10 bs=1M > /dev/null 2>&1
+        clear_osc_stats
 
         log "checking cached lockless truncate"
         $TRUNCATE $DIR1/$tfile 8000000