Whamcloud - gitweb
LU-1338 hsm: HSM flags feature
[fs/lustre-release.git] / lustre / llite / file.c
index 9855069..8d04d4e 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -81,6 +81,9 @@ void ll_pack_inode2opdata(struct inode *inode, struct md_op_data *op_data,
         if (fh)
                 op_data->op_handle = *fh;
         op_data->op_capa1 = ll_mdscapa_get(inode);
+
+       if (LLIF_DATA_MODIFIED & ll_i2info(inode)->lli_flags)
+               op_data->op_bias |= MDS_DATA_MODIFIED;
 }
 
 /**
@@ -154,6 +157,17 @@ static int ll_close_inode_openhandle(struct obd_export *md_exp,
                 CERROR("inode %lu mdc close failed: rc = %d\n",
                        inode->i_ino, rc);
         }
+
+       /* DATA_MODIFIED flag was successfully sent on close, cancel data
+        * modification flag. */
+       if (rc == 0 && (op_data->op_bias & MDS_DATA_MODIFIED)) {
+               struct ll_inode_info *lli = ll_i2info(inode);
+
+               spin_lock(&lli->lli_lock);
+               lli->lli_flags &= ~LLIF_DATA_MODIFIED;
+               spin_unlock(&lli->lli_lock);
+       }
+
         ll_finish_md_op_data(op_data);
 
         if (rc == 0) {
@@ -397,7 +411,7 @@ static int ll_intent_file_open(struct file *file, void *lmm,
                 GOTO(out, rc);
         }
 
-        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL);
+        rc = ll_prep_inode(&file->f_dentry->d_inode, req, NULL, itp);
         if (!rc && itp->d.lustre.it_lock_mode)
                 ll_set_lock_data(sbi->ll_md_exp, file->f_dentry->d_inode,
                                  itp, NULL);
@@ -1271,37 +1285,36 @@ out:
 
 static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
 {
-        struct ll_recreate_obj ucreat;
-        ENTRY;
+       struct ll_recreate_obj ucreat;
+       ENTRY;
 
-        if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                RETURN(-EPERM);
+       if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               RETURN(-EPERM);
 
-        if (cfs_copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
-                               sizeof(struct ll_recreate_obj)))
-                RETURN(-EFAULT);
+       if (copy_from_user(&ucreat, (struct ll_recreate_obj *)arg,
+                          sizeof(ucreat)))
+               RETURN(-EFAULT);
 
-        RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
-                               ucreat.lrc_ost_idx));
+       RETURN(ll_lov_recreate(inode, ucreat.lrc_id, 0,
+                              ucreat.lrc_ost_idx));
 }
 
 static int ll_lov_recreate_fid(struct inode *inode, unsigned long arg)
 {
-        struct lu_fid fid;
-        obd_id id;
-        obd_count ost_idx;
+       struct lu_fid   fid;
+       obd_id          id;
+       obd_count       ost_idx;
         ENTRY;
 
-        if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                RETURN(-EPERM);
+       if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               RETURN(-EPERM);
 
-        if (cfs_copy_from_user(&fid, (struct lu_fid *)arg,
-                               sizeof(struct lu_fid)))
-                RETURN(-EFAULT);
+       if (copy_from_user(&fid, (struct lu_fid *)arg, sizeof(fid)))
+               RETURN(-EFAULT);
 
-        id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
-        ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
-        RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
+       id = fid_oid(&fid) | ((fid_seq(&fid) & 0xffff) << 32);
+       ost_idx = (fid_seq(&fid) >> 16) & 0xffff;
+       RETURN(ll_lov_recreate(inode, id, 0, ost_idx));
 }
 
 int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
@@ -1421,58 +1434,61 @@ out:
 static int ll_lov_setea(struct inode *inode, struct file *file,
                             unsigned long arg)
 {
-        int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
-        struct lov_user_md  *lump;
-        int lum_size = sizeof(struct lov_user_md) +
-                       sizeof(struct lov_user_ost_data);
-        int rc;
-        ENTRY;
+       int                      flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
+       struct lov_user_md      *lump;
+       int                      lum_size = sizeof(struct lov_user_md) +
+                                           sizeof(struct lov_user_ost_data);
+       int                      rc;
+       ENTRY;
 
-        if (!cfs_capable(CFS_CAP_SYS_ADMIN))
-                RETURN(-EPERM);
+       if (!cfs_capable(CFS_CAP_SYS_ADMIN))
+               RETURN(-EPERM);
 
-        OBD_ALLOC_LARGE(lump, lum_size);
-        if (lump == NULL) {
+       OBD_ALLOC_LARGE(lump, lum_size);
+       if (lump == NULL)
                 RETURN(-ENOMEM);
-        }
-        if (cfs_copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
-                OBD_FREE_LARGE(lump, lum_size);
-                RETURN(-EFAULT);
-        }
 
-        rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
+       if (copy_from_user(lump, (struct lov_user_md  *)arg, lum_size)) {
+               OBD_FREE_LARGE(lump, lum_size);
+               RETURN(-EFAULT);
+       }
 
-        OBD_FREE_LARGE(lump, lum_size);
-        RETURN(rc);
+       rc = ll_lov_setstripe_ea_info(inode, file, flags, lump, lum_size);
+
+       OBD_FREE_LARGE(lump, lum_size);
+       RETURN(rc);
 }
 
 static int ll_lov_setstripe(struct inode *inode, struct file *file,
-                            unsigned long arg)
-{
-        struct lov_user_md_v3 lumv3;
-        struct lov_user_md_v1 *lumv1 = (struct lov_user_md_v1 *)&lumv3;
-        struct lov_user_md_v1 *lumv1p = (struct lov_user_md_v1 *)arg;
-        struct lov_user_md_v3 *lumv3p = (struct lov_user_md_v3 *)arg;
-        int lum_size;
-        int rc;
-        int flags = FMODE_WRITE;
-        ENTRY;
+                           unsigned long arg)
+{
+       struct lov_user_md_v3    lumv3;
+       struct lov_user_md_v1   *lumv1 = (struct lov_user_md_v1 *)&lumv3;
+       struct lov_user_md_v1   *lumv1p = (struct lov_user_md_v1 *)arg;
+       struct lov_user_md_v3   *lumv3p = (struct lov_user_md_v3 *)arg;
+       int                      lum_size, rc;
+       int                      flags = FMODE_WRITE;
+       ENTRY;
 
-        /* first try with v1 which is smaller than v3 */
-        lum_size = sizeof(struct lov_user_md_v1);
-        if (cfs_copy_from_user(lumv1, lumv1p, lum_size))
-                RETURN(-EFAULT);
+       /* first try with v1 which is smaller than v3 */
+       lum_size = sizeof(struct lov_user_md_v1);
+       if (copy_from_user(lumv1, lumv1p, lum_size))
+               RETURN(-EFAULT);
 
-        if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
-                lum_size = sizeof(struct lov_user_md_v3);
-                if (cfs_copy_from_user(&lumv3, lumv3p, lum_size))
-                        RETURN(-EFAULT);
-        }
+       if (lumv1->lmm_magic == LOV_USER_MAGIC_V3) {
+               lum_size = sizeof(struct lov_user_md_v3);
+               if (copy_from_user(&lumv3, lumv3p, lum_size))
+                       RETURN(-EFAULT);
+       }
 
-        rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
-        if (rc == 0) {
+       rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
+       if (rc == 0) {
                struct lov_stripe_md *lsm;
+               __u32 gen;
+
                put_user(0, &lumv1p->lmm_stripe_count);
+
+               ll_layout_refresh(inode, &gen);
                lsm = ccc_inode_lsm_get(inode);
                rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
                                   0, lsm, (void *)arg);
@@ -1679,43 +1695,43 @@ out:
 
 int ll_fid2path(struct inode *inode, void *arg)
 {
-       struct obd_export *exp = ll_i2mdexp(inode);
-        struct getinfo_fid2path *gfout, *gfin;
-        int outsize, rc;
-        ENTRY;
+       struct obd_export       *exp = ll_i2mdexp(inode);
+       struct getinfo_fid2path *gfout, *gfin;
+       int                      outsize, rc;
+       ENTRY;
 
        if (!cfs_capable(CFS_CAP_DAC_READ_SEARCH) &&
            !(ll_i2sbi(inode)->ll_flags & LL_SBI_USER_FID2PATH))
                RETURN(-EPERM);
 
-        /* Need to get the buflen */
-        OBD_ALLOC_PTR(gfin);
-        if (gfin == NULL)
-                RETURN(-ENOMEM);
-        if (cfs_copy_from_user(gfin, arg, sizeof(*gfin))) {
-                OBD_FREE_PTR(gfin);
-                RETURN(-EFAULT);
-        }
+       /* Need to get the buflen */
+       OBD_ALLOC_PTR(gfin);
+       if (gfin == NULL)
+               RETURN(-ENOMEM);
+       if (copy_from_user(gfin, arg, sizeof(*gfin))) {
+               OBD_FREE_PTR(gfin);
+               RETURN(-EFAULT);
+       }
 
-        outsize = sizeof(*gfout) + gfin->gf_pathlen;
-        OBD_ALLOC(gfout, outsize);
-        if (gfout == NULL) {
-                OBD_FREE_PTR(gfin);
-                RETURN(-ENOMEM);
-        }
-        memcpy(gfout, gfin, sizeof(*gfout));
-        OBD_FREE_PTR(gfin);
+       outsize = sizeof(*gfout) + gfin->gf_pathlen;
+       OBD_ALLOC(gfout, outsize);
+       if (gfout == NULL) {
+               OBD_FREE_PTR(gfin);
+               RETURN(-ENOMEM);
+       }
+       memcpy(gfout, gfin, sizeof(*gfout));
+       OBD_FREE_PTR(gfin);
 
-        /* Call mdc_iocontrol */
-        rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
-        if (rc)
-                GOTO(gf_free, rc);
-        if (cfs_copy_to_user(arg, gfout, outsize))
-                rc = -EFAULT;
+       /* Call mdc_iocontrol */
+       rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
+       if (rc)
+               GOTO(gf_free, rc);
+       if (copy_to_user(arg, gfout, outsize))
+               rc = -EFAULT;
 
 gf_free:
-        OBD_FREE(gfout, outsize);
-        RETURN(rc);
+       OBD_FREE(gfout, outsize);
+       RETURN(rc);
 }
 
 static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
@@ -1737,10 +1753,10 @@ static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
         if (fiemap_s == NULL)
                 RETURN(-ENOMEM);
 
-        /* get the fiemap value */
-        if (copy_from_user(fiemap_s,(struct ll_user_fiemap __user *)arg,
-                           sizeof(*fiemap_s)))
-                GOTO(error, rc = -EFAULT);
+       /* get the fiemap value */
+       if (copy_from_user(fiemap_s, (struct ll_user_fiemap __user *)arg,
+                          sizeof(*fiemap_s)))
+               GOTO(error, rc = -EFAULT);
 
         /* If fm_extent_count is non-zero, read the first extent since
          * it is used to calculate end_offset and device from previous
@@ -1762,8 +1778,8 @@ static int ll_ioctl_fiemap(struct inode *inode, unsigned long arg)
                 ret_bytes += (fiemap_s->fm_mapped_extents *
                                  sizeof(struct ll_fiemap_extent));
 
-        if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
-                rc = -EFAULT;
+       if (copy_to_user((void *)arg, fiemap_s, ret_bytes))
+               rc = -EFAULT;
 
 error:
         OBD_FREE_LARGE(fiemap_s, num_bytes);
@@ -1777,40 +1793,40 @@ error:
  * Version is computed using server side locking.
  *
  * @param extent_lock  Take extent lock. Not needed if a process is already
- *                     holding the OST object group locks.
+ *                    holding the OST object group locks.
  */
-static int ll_data_version(struct inode *inode, __u64 *data_version,
-                           int extent_lock)
+int ll_data_version(struct inode *inode, __u64 *data_version,
+                   int extent_lock)
 {
-       struct lov_stripe_md *lsm = NULL;
-       struct ll_sb_info    *sbi = ll_i2sbi(inode);
-       struct obdo          *obdo = NULL;
-       int                   rc;
+       struct lov_stripe_md    *lsm = NULL;
+       struct ll_sb_info       *sbi = ll_i2sbi(inode);
+       struct obdo             *obdo = NULL;
+       int                      rc;
        ENTRY;
 
        /* If no stripe, we consider version is 0. */
        lsm = ccc_inode_lsm_get(inode);
        if (lsm == NULL) {
-                *data_version = 0;
-                CDEBUG(D_INODE, "No object for inode\n");
-                RETURN(0);
-        }
+               *data_version = 0;
+               CDEBUG(D_INODE, "No object for inode\n");
+               RETURN(0);
+       }
 
-        OBD_ALLOC_PTR(obdo);
+       OBD_ALLOC_PTR(obdo);
        if (obdo == NULL) {
                ccc_inode_lsm_put(inode, lsm);
                RETURN(-ENOMEM);
        }
 
-        rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
-        if (!rc) {
-                if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
-                        rc = -EOPNOTSUPP;
-                else
-                        *data_version = obdo->o_data_version;
-        }
+       rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
+       if (!rc) {
+               if (!(obdo->o_valid & OBD_MD_FLDATAVERSION))
+                       rc = -EOPNOTSUPP;
+               else
+                       *data_version = obdo->o_data_version;
+       }
 
-        OBD_FREE_PTR(obdo);
+       OBD_FREE_PTR(obdo);
        ccc_inode_lsm_put(inode, lsm);
 
        RETURN(rc);
@@ -1889,33 +1905,32 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
         case FSFILT_IOC_SETVERSION_OLD:
         case FSFILT_IOC_SETVERSION:
         */
-        case LL_IOC_FLUSHCTX:
-                RETURN(ll_flush_ctx(inode));
-        case LL_IOC_PATH2FID: {
-                if (cfs_copy_to_user((void *)arg, ll_inode2fid(inode),
-                                     sizeof(struct lu_fid)))
-                        RETURN(-EFAULT);
+       case LL_IOC_FLUSHCTX:
+               RETURN(ll_flush_ctx(inode));
+       case LL_IOC_PATH2FID: {
+               if (copy_to_user((void *)arg, ll_inode2fid(inode),
+                                sizeof(struct lu_fid)))
+                       RETURN(-EFAULT);
 
-                RETURN(0);
-        }
-        case OBD_IOC_FID2PATH:
+               RETURN(0);
+       }
+       case OBD_IOC_FID2PATH:
                RETURN(ll_fid2path(inode, (void *)arg));
-        case LL_IOC_DATA_VERSION: {
-                struct ioc_data_version idv;
-                int rc;
+       case LL_IOC_DATA_VERSION: {
+               struct ioc_data_version idv;
+               int                     rc;
 
-                if (cfs_copy_from_user(&idv, (char *)arg, sizeof(idv)))
-                        RETURN(-EFAULT);
+               if (copy_from_user(&idv, (char *)arg, sizeof(idv)))
+                       RETURN(-EFAULT);
 
-                rc = ll_data_version(inode, &idv.idv_version,
-                                     !(idv.idv_flags & LL_DV_NOFLUSH));
+               rc = ll_data_version(inode, &idv.idv_version,
+                               !(idv.idv_flags & LL_DV_NOFLUSH));
 
-                if (rc == 0 &&
-                    cfs_copy_to_user((char *) arg, &idv, sizeof(idv)))
-                        RETURN(-EFAULT);
+               if (rc == 0 && copy_to_user((char *) arg, &idv, sizeof(idv)))
+                       RETURN(-EFAULT);
 
-                RETURN(rc);
-        }
+               RETURN(rc);
+       }
 
         case LL_IOC_GET_MDTIDX: {
                 int mdtidx;
@@ -1929,20 +1944,83 @@ long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
 
                 RETURN(0);
         }
-        case OBD_IOC_GETDTNAME:
-        case OBD_IOC_GETMDNAME:
-                RETURN(ll_get_obd_name(inode, cmd, arg));
-        default: {
-                int err;
-
-                if (LLIOC_STOP ==
-                    ll_iocontrol_call(inode, file, cmd, arg, &err))
-                        RETURN(err);
-
-                RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
-                                     (void *)arg));
-        }
-        }
+       case OBD_IOC_GETDTNAME:
+       case OBD_IOC_GETMDNAME:
+               RETURN(ll_get_obd_name(inode, cmd, arg));
+       case LL_IOC_HSM_STATE_GET: {
+               struct md_op_data       *op_data;
+               struct hsm_user_state   *hus;
+               int                      rc;
+
+               OBD_ALLOC_PTR(hus);
+               if (hus == NULL)
+                       RETURN(-ENOMEM);
+
+               op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+                                            LUSTRE_OPC_ANY, hus);
+               if (op_data == NULL) {
+                       OBD_FREE_PTR(hus);
+                       RETURN(-ENOMEM);
+               }
+
+               rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
+                                  op_data, NULL);
+
+               if (copy_to_user((void *)arg, hus, sizeof(*hus)))
+                       rc = -EFAULT;
+
+               ll_finish_md_op_data(op_data);
+               OBD_FREE_PTR(hus);
+               RETURN(rc);
+       }
+       case LL_IOC_HSM_STATE_SET: {
+               struct md_op_data       *op_data;
+               struct hsm_state_set    *hss;
+               int                      rc;
+
+               OBD_ALLOC_PTR(hss);
+               if (hss == NULL)
+                       RETURN(-ENOMEM);
+               if (copy_from_user(hss, (char *)arg, sizeof(*hss))) {
+                       OBD_FREE_PTR(hss);
+                       RETURN(-EFAULT);
+               }
+
+               /* Non-root users are forbidden to set or clear flags which are
+                * NOT defined in HSM_USER_MASK. */
+               if (((hss->hss_setmask | hss->hss_clearmask) & ~HSM_USER_MASK)
+                   && !cfs_capable(CFS_CAP_SYS_ADMIN)) {
+                       OBD_FREE_PTR(hss);
+                       RETURN(-EPERM);
+               }
+
+               op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+                                            LUSTRE_OPC_ANY, hss);
+               if (op_data == NULL) {
+                       OBD_FREE_PTR(hss);
+                       RETURN(-ENOMEM);
+               }
+
+               rc = obd_iocontrol(cmd, ll_i2mdexp(inode), sizeof(*op_data),
+                                  op_data, NULL);
+
+               ll_finish_md_op_data(op_data);
+
+               OBD_FREE_PTR(hss);
+               RETURN(rc);
+       }
+
+       default: {
+               int err;
+
+               if (LLIOC_STOP ==
+                       ll_iocontrol_call(inode, file, cmd, arg, &err))
+                       RETURN(err);
+
+               RETURN(obd_iocontrol(cmd, ll_i2dtexp(inode), 0, NULL,
+                                    (void *)arg));
+       }
+       }
 }
 
 #ifndef HAVE_FILE_LLSEEK_SIZE
@@ -2195,6 +2273,7 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         ldlm_policy_data_t flock = {{0}};
         int flags = 0;
         int rc;
+       int rc2 = 0;
         ENTRY;
 
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu file_lock=%p\n",
@@ -2290,15 +2369,22 @@ int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
         rc = md_enqueue(sbi->ll_md_exp, &einfo, NULL,
                         op_data, &lockh, &flock, 0, NULL /* req */, flags);
 
-        ll_finish_md_op_data(op_data);
-
         if ((file_lock->fl_flags & FL_FLOCK) &&
             (rc == 0 || file_lock->fl_type == F_UNLCK))
-                flock_lock_file_wait(file, file_lock);
+               rc2  = flock_lock_file_wait(file, file_lock);
         if ((file_lock->fl_flags & FL_POSIX) &&
             (rc == 0 || file_lock->fl_type == F_UNLCK) &&
             !(flags & LDLM_FL_TEST_LOCK))
-                posix_lock_file_wait(file, file_lock);
+               rc2  = posix_lock_file_wait(file, file_lock);
+
+       if (rc2 && file_lock->fl_type != F_UNLCK) {
+               einfo.ei_mode = LCK_NL;
+               md_enqueue(sbi->ll_md_exp, &einfo, NULL,
+                       op_data, &lockh, &flock, 0, NULL /* req */, flags);
+               rc = rc2;
+       }
+
+       ll_finish_md_op_data(op_data);
 
         RETURN(rc);
 }
@@ -2487,7 +2573,7 @@ int __ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it,
                         RETURN(rc);
                 }
 
-                rc = ll_prep_inode(&inode, req, NULL);
+                rc = ll_prep_inode(&inode, req, NULL, NULL);
         }
 out:
         ptlrpc_req_finished(req);
@@ -2895,10 +2981,126 @@ int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf)
 
        result = cl_conf_set(env, lli->lli_clob, conf);
        cl_env_nested_put(&nest, env);
+
+       if (conf->coc_opc == OBJECT_CONF_SET) {
+               struct ldlm_lock *lock = conf->coc_lock;
+
+               LASSERT(lock != NULL);
+               LASSERT(ldlm_has_layout(lock));
+               if (result == 0) {
+                       /* it can only be allowed to match after layout is
+                        * applied to inode otherwise false layout would be
+                        * seen. Applying layout shoud happen before dropping
+                        * the intent lock. */
+                       ldlm_lock_allow_match(lock);
+               }
+       }
        RETURN(result);
 }
 
 /**
+ * Apply the layout to the inode. Layout lock is held and will be released
+ * in this function.
+ */
+static int ll_layout_lock_set(struct lustre_handle *lockh, ldlm_mode_t mode,
+                               struct inode *inode, __u32 *gen, bool reconf)
+{
+       struct ll_inode_info *lli = ll_i2info(inode);
+       struct ll_sb_info    *sbi = ll_i2sbi(inode);
+       struct ldlm_lock *lock;
+       struct lustre_md md = { NULL };
+       struct cl_object_conf conf;
+       int rc = 0;
+       bool lvb_ready;
+       ENTRY;
+
+       LASSERT(lustre_handle_is_used(lockh));
+
+       lock = ldlm_handle2lock(lockh);
+       LASSERT(lock != NULL);
+       LASSERT(ldlm_has_layout(lock));
+
+       LDLM_DEBUG(lock, "File %p/"DFID" being reconfigured: %d.\n",
+               inode, PFID(&lli->lli_fid), reconf);
+
+       lock_res_and_lock(lock);
+       lvb_ready = !!(lock->l_flags & LDLM_FL_LVB_READY);
+       unlock_res_and_lock(lock);
+       /* checking lvb_ready is racy but this is okay. The worst case is
+        * that multi processes may configure the file on the same time. */
+       if (lvb_ready || !reconf) {
+               LDLM_LOCK_PUT(lock);
+
+               rc = -ENODATA;
+               if (lvb_ready) {
+                       /* layout_gen must be valid if layout lock is not
+                        * cancelled and stripe has already set */
+                       *gen = lli->lli_layout_gen;
+                       rc = 0;
+               }
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* for layout lock, lmm is returned in lock's lvb.
+        * lvb_data is immutable if the lock is held so it's safe to access it
+        * without res lock. See the description in ldlm_lock_decref_internal()
+        * for the condition to free lvb_data of layout lock */
+       if (lock->l_lvb_data != NULL) {
+               rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+                                 lock->l_lvb_data, lock->l_lvb_len);
+               if (rc >= 0) {
+                       if (md.lsm != NULL)
+                               *gen = md.lsm->lsm_layout_gen;
+                       rc = 0;
+               } else {
+                       CERROR("%s: file "DFID" unpackmd error: %d\n",
+                               ll_get_fsname(inode->i_sb, NULL, 0),
+                               PFID(&lli->lli_fid), rc);
+               }
+       }
+       if (rc < 0) {
+               LDLM_LOCK_PUT(lock);
+               ldlm_lock_decref(lockh, mode);
+               RETURN(rc);
+       }
+
+       /* set layout to file. Unlikely this will fail as old layout was
+        * surely eliminated */
+       memset(&conf, 0, sizeof conf);
+       conf.coc_opc = OBJECT_CONF_SET;
+       conf.coc_inode = inode;
+       conf.coc_lock = lock;
+       conf.u.coc_md = &md;
+       rc = ll_layout_conf(inode, &conf);
+       LDLM_LOCK_PUT(lock);
+
+       ldlm_lock_decref(lockh, mode);
+
+       if (md.lsm != NULL)
+               obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+
+       /* wait for IO to complete if it's still being used. */
+       if (rc == -EBUSY) {
+               CDEBUG(D_INODE, "%s: %p/"DFID" wait for layout reconf.\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0),
+                       inode, PFID(&lli->lli_fid));
+
+               memset(&conf, 0, sizeof conf);
+               conf.coc_opc = OBJECT_CONF_WAIT;
+               conf.coc_inode = inode;
+               rc = ll_layout_conf(inode, &conf);
+               if (rc == 0)
+                       rc = -EAGAIN;
+
+               CDEBUG(D_INODE, "file: "DFID" waiting layout return: %d.\n",
+                       PFID(&lli->lli_fid), rc);
+       }
+
+       RETURN(rc);
+}
+
+/**
  * This function checks if there exists a LAYOUT lock on the client side,
  * or enqueues it if it doesn't have one in cache.
  *
@@ -2915,9 +3117,9 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 {
        struct ll_inode_info  *lli = ll_i2info(inode);
        struct ll_sb_info     *sbi = ll_i2sbi(inode);
-       struct md_op_data     *op_data = NULL;
-       struct lookup_intent   it = { .it_op = IT_LAYOUT };
-       struct lustre_handle   lockh = { 0 };
+       struct md_op_data     *op_data;
+       struct lookup_intent   it;
+       struct lustre_handle   lockh;
        ldlm_mode_t            mode;
        struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
                                           .ei_mode = LCK_CR,
@@ -2927,7 +3129,7 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
        int rc;
        ENTRY;
 
-       *gen = 0;
+       *gen = LL_LAYOUT_GEN_ZERO;
        if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
                RETURN(0);
 
@@ -2937,92 +3139,67 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen)
 
        /* mostly layout lock is caching on the local side, so try to match
         * it before grabbing layout lock mutex. */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               /* lsm_layout_gen is started from 0, plus 1 here to distinguish
-                * the cases of no layout and first layout. */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, false);
+               if (rc == 0)
+                       RETURN(0);
 
-               ldlm_lock_decref(&lockh, mode);
-               RETURN(0);
+               /* better hold lli_layout_mutex to try again otherwise
+                * it will have starvation problem. */
        }
 
-       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
-                                    0, 0, LUSTRE_OPC_ANY, NULL);
-       if (IS_ERR(op_data))
-               RETURN(PTR_ERR(op_data));
-
        /* take layout lock mutex to enqueue layout lock exclusively. */
        mutex_lock(&lli->lli_layout_mutex);
 
-       /* try again inside layout mutex */
-       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
-                               LDLM_FL_LVB_READY);
+again:
+       /* try again. Maybe somebody else has done this. */
+       mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0);
        if (mode != 0) { /* hit cached lock */
-               *gen = lli->lli_layout_gen + 1;
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
 
-               ldlm_lock_decref(&lockh, mode);
                mutex_unlock(&lli->lli_layout_mutex);
-               ll_finish_md_op_data(op_data);
-               RETURN(0);
+               RETURN(rc);
+       }
+
+       op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
+                       0, 0, LUSTRE_OPC_ANY, NULL);
+       if (IS_ERR(op_data)) {
+               mutex_unlock(&lli->lli_layout_mutex);
+               RETURN(PTR_ERR(op_data));
        }
 
        /* have to enqueue one */
+       memset(&it, 0, sizeof(it));
+       it.it_op = IT_LAYOUT;
+       lockh.cookie = 0ULL;
+
+       LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file %p/"DFID".\n",
+                       ll_get_fsname(inode->i_sb, NULL, 0), inode,
+                       PFID(&lli->lli_fid));
+
        rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
                        NULL, 0, NULL, 0);
        if (it.d.lustre.it_data != NULL)
                ptlrpc_req_finished(it.d.lustre.it_data);
        it.d.lustre.it_data = NULL;
 
-       if (rc == 0) {
-               struct ldlm_lock *lock;
-               struct cl_object_conf conf;
-               struct lustre_md md = { NULL };
-               void *lmm;
-               int lmmsize;
+       ll_finish_md_op_data(op_data);
 
-               LASSERT(lustre_handle_is_used(&lockh));
+       mode = it.d.lustre.it_lock_mode;
+       it.d.lustre.it_lock_mode = 0;
+       ll_intent_drop_lock(&it);
 
+       if (rc == 0) {
                /* set lock data in case this is a new lock */
                ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
-
-               lock = ldlm_handle2lock(&lockh);
-               LASSERT(lock != NULL);
-
-               /* for IT_LAYOUT lock, lmm is returned in lock's lvb
-                * data via completion callback */
-               lmm = lock->l_lvb_data;
-               lmmsize = lock->l_lvb_len;
-               if (lmm != NULL) {
-                       rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
-                                       lmm, lmmsize);
-                       if (rc >= 0) {
-                               if (md.lsm != NULL)
-                                       *gen = md.lsm->lsm_layout_gen + 1;
-                               rc = 0;
-                       } else {
-                               CERROR("file: "DFID" unpackmd error: %d\n",
-                                       PFID(&lli->lli_fid), rc);
-                       }
-               }
-               LDLM_LOCK_PUT(lock);
-
-               /* set layout to file. This may cause lock expiration as we
-                * set layout inside layout ibits lock. */
-               memset(&conf, 0, sizeof conf);
-               conf.coc_inode = inode;
-               conf.u.coc_md = &md;
-               ll_layout_conf(inode, &conf);
-               /* is this racy? */
-               lli->lli_has_smd = md.lsm != NULL;
-               if (md.lsm != NULL)
-                       obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+               rc = ll_layout_lock_set(&lockh, mode, inode, gen, true);
+               if (rc == -EAGAIN)
+                       goto again;
        }
-       ll_intent_drop_lock(&it);
-
        mutex_unlock(&lli->lli_layout_mutex);
-       ll_finish_md_op_data(op_data);
 
        RETURN(rc);
 }