Whamcloud - gitweb
LU-6557 llite: remove unused ll_max_rw_chunk
[fs/lustre-release.git] / lustre / llite / llite_lib.c
index 91993d7..381fdc4 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 #include <linux/types.h>
 #include <linux/version.h>
 #include <linux/mm.h>
+#include <linux/user_namespace.h>
+#ifdef HAVE_UIDGID_HEADER
+# include <linux/uidgid.h>
+#endif
 
 #include <lustre_ioctl.h>
 #include <lustre_ha.h>
@@ -91,16 +95,11 @@ static struct ll_sb_info *ll_init_sbi(void)
        lru_page_max = pages / 2;
 
        /* initialize ll_cache data */
-       atomic_set(&sbi->ll_cache.ccc_users, 0);
-       sbi->ll_cache.ccc_lru_max = lru_page_max;
-       atomic_long_set(&sbi->ll_cache.ccc_lru_left, lru_page_max);
-       spin_lock_init(&sbi->ll_cache.ccc_lru_lock);
-       INIT_LIST_HEAD(&sbi->ll_cache.ccc_lru);
-
-       /* turn unstable check off by default as it impacts performance */
-       sbi->ll_cache.ccc_unstable_check = 0;
-       atomic_long_set(&sbi->ll_cache.ccc_unstable_nr, 0);
-       init_waitqueue_head(&sbi->ll_cache.ccc_unstable_waitq);
+       sbi->ll_cache = cl_cache_init(lru_page_max);
+       if (sbi->ll_cache == NULL) {
+               OBD_FREE(sbi, sizeof(*sbi));
+               RETURN(NULL);
+       }
 
        sbi->ll_ra_info.ra_max_pages_per_file = min(pages / 32,
                                           SBI_DEFAULT_READAHEAD_MAX);
@@ -162,6 +161,10 @@ static void ll_free_sbi(struct super_block *sb)
                spin_unlock(&ll_sb_lock);
                if (!list_empty(&sbi->ll_squash.rsi_nosquash_nids))
                        cfs_free_nidlist(&sbi->ll_squash.rsi_nosquash_nids);
+               if (sbi->ll_cache != NULL) {
+                       cl_cache_decref(sbi->ll_cache);
+                       sbi->ll_cache = NULL;
+               }
                OBD_FREE(sbi, sizeof(*sbi));
        }
        EXIT;
@@ -170,7 +173,7 @@ static void ll_free_sbi(struct super_block *sb)
 static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                     struct vfsmount *mnt)
 {
-        struct inode *root = 0;
+       struct inode *root = NULL;
         struct ll_sb_info *sbi = ll_s2sbi(sb);
         struct obd_device *obd;
         struct obd_capa *oc = NULL;
@@ -180,7 +183,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         struct obd_uuid *uuid;
         struct md_op_data *op_data;
         struct lustre_md lmd;
-        obd_valid valid;
+       u64 valid;
         int size, err, checksum;
         ENTRY;
 
@@ -218,9 +221,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_OPEN_BY_FID |
                                  OBD_CONNECT_DIR_STRIPE;
 
-        if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
-                data->ocd_connect_flags |= OBD_CONNECT_SOM;
-
 #ifdef HAVE_LRU_RESIZE_SUPPORT
         if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
                 data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
@@ -321,7 +321,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
 
        size = sizeof(*data);
        err = obd_get_info(NULL, sbi->ll_md_exp, sizeof(KEY_CONN_DATA),
-                          KEY_CONN_DATA,  &size, data, NULL);
+                          KEY_CONN_DATA,  &size, data);
        if (err) {
                CERROR("%s: Get connect data failed: rc = %d\n",
                       sbi->ll_md_exp->exp_obd->obd_name, err);
@@ -334,7 +334,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         sb->s_magic = LL_SUPER_MAGIC;
         sb->s_maxbytes = MAX_LFS_FILESIZE;
         sbi->ll_namelen = osfs->os_namelen;
-        sbi->ll_max_rw_chunk = LL_DEFAULT_MAX_RW_CHUNK;
 
         if ((sbi->ll_flags & LL_SBI_USER_XATTR) &&
             !(data->ocd_connect_flags & OBD_CONNECT_XATTR)) {
@@ -419,9 +418,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_LAYOUTLOCK |
                                  OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK;
 
-        if (sbi->ll_flags & LL_SBI_SOM_PREVIEW)
-                data->ocd_connect_flags |= OBD_CONNECT_SOM;
-
         if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
                 /* OBD_CONNECT_CKSUM should always be set, even if checksums are
                  * disabled by default, because it can still be enabled on the
@@ -557,12 +553,6 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                 GOTO(out_root, err);
         }
 
-        err = ll_close_thread_start(&sbi->ll_lcq);
-        if (err) {
-                CERROR("cannot start close thread: rc %d\n", err);
-                GOTO(out_root, err);
-        }
-
 #ifdef CONFIG_FS_POSIX_ACL
         if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
                 rct_init(&sbi->ll_rct);
@@ -577,8 +567,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         cl_sb_init(sb);
 
        err = obd_set_info_async(NULL, sbi->ll_dt_exp, sizeof(KEY_CACHE_SET),
-                                KEY_CACHE_SET, sizeof(sbi->ll_cache),
-                                &sbi->ll_cache, NULL);
+                                KEY_CACHE_SET, sizeof(*sbi->ll_cache),
+                                sbi->ll_cache, NULL);
 
        sb->s_root = d_make_root(root);
        if (sb->s_root == NULL) {
@@ -624,8 +614,6 @@ out_lock_cn_cb:
 out_dt:
         obd_disconnect(sbi->ll_dt_exp);
         sbi->ll_dt_exp = NULL;
-       /* Make sure all OScs are gone, since cl_cache is accessing sbi. */
-       obd_zombie_barrier();
 out_md_fid:
        obd_fid_fini(sbi->ll_md_exp->exp_obd);
 out_md:
@@ -646,48 +634,59 @@ int ll_get_max_mdsize(struct ll_sb_info *sbi, int *lmmsize)
        *lmmsize = obd_size_diskmd(sbi->ll_dt_exp, NULL);
        size = sizeof(int);
        rc = obd_get_info(NULL, sbi->ll_md_exp, sizeof(KEY_MAX_EASIZE),
-                         KEY_MAX_EASIZE, &size, lmmsize, NULL);
+                         KEY_MAX_EASIZE, &size, lmmsize);
        if (rc)
                CERROR("Get max mdsize error rc %d\n", rc);
 
        RETURN(rc);
 }
 
+/**
+ * Get the value of the default_easize parameter.
+ *
+ * \see client_obd::cl_default_mds_easize
+ *
+ * \param[in] sbi      superblock info for this filesystem
+ * \param[out] lmmsize pointer to storage location for value
+ *
+ * \retval 0           on success
+ * \retval negative    negated errno on failure
+ */
 int ll_get_default_mdsize(struct ll_sb_info *sbi, int *lmmsize)
 {
        int size, rc;
 
        size = sizeof(int);
        rc = obd_get_info(NULL, sbi->ll_md_exp, sizeof(KEY_DEFAULT_EASIZE),
-                        KEY_DEFAULT_EASIZE, &size, lmmsize, NULL);
+                        KEY_DEFAULT_EASIZE, &size, lmmsize);
        if (rc)
                CERROR("Get default mdsize error rc %d\n", rc);
 
        RETURN(rc);
 }
 
-int ll_get_max_cookiesize(struct ll_sb_info *sbi, int *lmmsize)
+/**
+ * Set the default_easize parameter to the given value.
+ *
+ * \see client_obd::cl_default_mds_easize
+ *
+ * \param[in] sbi      superblock info for this filesystem
+ * \param[in] lmmsize  the size to set
+ *
+ * \retval 0           on success
+ * \retval negative    negated errno on failure
+ */
+int ll_set_default_mdsize(struct ll_sb_info *sbi, int lmmsize)
 {
-       int size, rc;
-
-       size = sizeof(int);
-       rc = obd_get_info(NULL, sbi->ll_md_exp, sizeof(KEY_MAX_COOKIESIZE),
-                         KEY_MAX_COOKIESIZE, &size, lmmsize, NULL);
-       if (rc)
-               CERROR("Get max cookiesize error rc %d\n", rc);
-
-       RETURN(rc);
-}
+       int rc;
 
-int ll_get_default_cookiesize(struct ll_sb_info *sbi, int *lmmsize)
-{
-       int size, rc;
+       if (lmmsize < sizeof(struct lov_mds_md) ||
+           lmmsize > OBD_MAX_DEFAULT_EA_SIZE)
+               return -EINVAL;
 
-       size = sizeof(int);
-       rc = obd_get_info(NULL, sbi->ll_md_exp, sizeof(KEY_DEFAULT_COOKIESIZE),
-                         KEY_DEFAULT_COOKIESIZE, &size, lmmsize, NULL);
-       if (rc)
-               CERROR("Get default cookiesize error rc %d\n", rc);
+       rc = obd_set_info_async(NULL, sbi->ll_md_exp,
+                               sizeof(KEY_DEFAULT_EASIZE), KEY_DEFAULT_EASIZE,
+                               sizeof(int), &lmmsize, NULL);
 
        RETURN(rc);
 }
@@ -732,7 +731,7 @@ void lustre_dump_dentry(struct dentry *dentry, int recur)
                 return;
 
        list_for_each(tmp, &dentry->d_subdirs) {
-               struct dentry *d = list_entry(tmp, struct dentry, d_u.d_child);
+               struct dentry *d = list_entry(tmp, struct dentry, d_child);
                lustre_dump_dentry(d, recur - 1);
        }
 }
@@ -749,8 +748,6 @@ static void client_common_put_super(struct super_block *sb)
         }
 #endif
 
-        ll_close_thread_shutdown(sbi->ll_lcq);
-
         cl_sb_fini(sb);
 
        list_del(&sbi->ll_conn_chain);
@@ -758,9 +755,6 @@ static void client_common_put_super(struct super_block *sb)
        obd_fid_fini(sbi->ll_dt_exp->exp_obd);
         obd_disconnect(sbi->ll_dt_exp);
         sbi->ll_dt_exp = NULL;
-       /* wait till all OSCs are gone, since cl_cache is accessing sbi.
-        * see LU-2543. */
-       obd_zombie_barrier();
 
         lprocfs_unregister_mountpoint(sbi);
 
@@ -789,9 +783,10 @@ void ll_kill_super(struct super_block *sb)
                sbi->ll_umounting = 1;
 
                /* wait running statahead threads to quit */
-               while (atomic_read(&sbi->ll_sa_running) > 0)
-                       schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
-                               msecs_to_jiffies(MSEC_PER_SEC >> 3));
+               while (atomic_read(&sbi->ll_sa_running) > 0) {
+                       set_current_state(TASK_UNINTERRUPTIBLE);
+                       schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC >> 3));
+               }
        }
 
        EXIT;
@@ -895,11 +890,6 @@ static int ll_options(char *options, int *flags)
                         *flags &= ~tmp;
                         goto next;
                 }
-                tmp = ll_set_opt("som_preview", s1, LL_SBI_SOM_PREVIEW);
-                if (tmp) {
-                        *flags |= tmp;
-                        goto next;
-                }
                 tmp = ll_set_opt("32bitapi", s1, LL_SBI_32BIT_API);
                 if (tmp) {
                         *flags |= tmp;
@@ -933,20 +923,16 @@ void ll_lli_init(struct ll_inode_info *lli)
 {
        lli->lli_inode_magic = LLI_INODE_MAGIC;
        lli->lli_flags = 0;
-       lli->lli_ioepoch = 0;
-       lli->lli_maxbytes = MAX_LFS_FILESIZE;
        spin_lock_init(&lli->lli_lock);
        lli->lli_posix_acl = NULL;
        lli->lli_remote_perms = NULL;
        mutex_init(&lli->lli_rmtperm_mutex);
        /* Do not set lli_fid, it has been initialized already. */
        fid_zero(&lli->lli_pfid);
-       INIT_LIST_HEAD(&lli->lli_close_list);
        INIT_LIST_HEAD(&lli->lli_oss_capas);
        atomic_set(&lli->lli_open_count, 0);
        lli->lli_mds_capa = NULL;
        lli->lli_rmtperm_time = 0;
-       lli->lli_pending_och = NULL;
        lli->lli_mds_read_och = NULL;
         lli->lli_mds_write_och = NULL;
         lli->lli_mds_exec_och = NULL;
@@ -955,9 +941,8 @@ void ll_lli_init(struct ll_inode_info *lli)
         lli->lli_open_fd_exec_count = 0;
        mutex_init(&lli->lli_och_mutex);
        spin_lock_init(&lli->lli_agl_lock);
-       lli->lli_has_smd = false;
        spin_lock_init(&lli->lli_layout_lock);
-       ll_layout_version_set(lli, LL_LAYOUT_GEN_NONE);
+       ll_layout_version_set(lli, CL_LAYOUT_GEN_NONE);
        lli->lli_clob = NULL;
 
        init_rwsem(&lli->lli_xattrs_list_rwsem);
@@ -971,6 +956,7 @@ void ll_lli_init(struct ll_inode_info *lli)
                spin_lock_init(&lli->lli_sa_lock);
                lli->lli_opendir_pid = 0;
                lli->lli_sa_enabled = 0;
+               lli->lli_def_stripe_offset = -1;
        } else {
                mutex_init(&lli->lli_size_mutex);
                lli->lli_symlink_name = NULL;
@@ -1076,6 +1062,10 @@ int ll_fill_super(struct super_block *sb, struct vfsmount *mnt)
 
         /* connections, registrations, sb setup */
         err = client_common_fill_super(sb, md, dt, mnt);
+       if (err < 0)
+               GOTO(out_free, err);
+
+       sbi->ll_client_common_fill_super_succeeded = 1;
 
 out_free:
         if (md)
@@ -1121,12 +1111,12 @@ void ll_put_super(struct super_block *sb)
        /* Wait for unstable pages to be committed to stable storage */
        if (force == 0) {
                struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-               rc = l_wait_event(sbi->ll_cache.ccc_unstable_waitq,
-                       atomic_long_read(&sbi->ll_cache.ccc_unstable_nr) == 0,
+               rc = l_wait_event(sbi->ll_cache->ccc_unstable_waitq,
+                       atomic_long_read(&sbi->ll_cache->ccc_unstable_nr) == 0,
                        &lwi);
        }
 
-       ccc_count = atomic_long_read(&sbi->ll_cache.ccc_unstable_nr);
+       ccc_count = atomic_long_read(&sbi->ll_cache->ccc_unstable_nr);
        if (force == 0 && rc != -EINTR)
                LASSERTF(ccc_count == 0, "count: %li\n", ccc_count);
 
@@ -1141,10 +1131,10 @@ void ll_put_super(struct super_block *sb)
                 }
         }
 
-        if (sbi->ll_lcq) {
-                /* Only if client_common_fill_super succeeded */
-                client_common_put_super(sb);
-        }
+       if (sbi->ll_client_common_fill_super_succeeded) {
+               /* Only if client_common_fill_super succeeded */
+               client_common_put_super(sb);
+       }
 
         next = 0;
         while ((obd = class_devices_in_group(&sbi->ll_sb_uuid, &next)) !=NULL) {
@@ -1295,10 +1285,7 @@ static int ll_init_lsm_md(struct inode *inode, struct lustre_md *md)
                }
        }
 
-       /* Here is where the lsm is being initialized(fill lmo_info) after
-        * client retrieve MD stripe information from MDT. */
-       return md_update_lsm_md(ll_i2mdexp(inode), lsm, md->body,
-                               ll_md_blocking_ast);
+       return 0;
 }
 
 static inline int lli_lsm_md_eq(const struct lmv_stripe_md *lsm_md1,
@@ -1398,10 +1385,7 @@ static int ll_update_lsm_md(struct inode *inode, struct lustre_md *md)
                RETURN(-EIO);
        }
 
-       rc = md_update_lsm_md(ll_i2mdexp(inode), ll_i2info(inode)->lli_lsm_md,
-                             md->body, ll_md_blocking_ast);
-
-       RETURN(rc);
+       RETURN(0);
 }
 
 void ll_clear_inode(struct inode *inode)
@@ -1420,9 +1404,6 @@ void ll_clear_inode(struct inode *inode)
                 LASSERT(lli->lli_opendir_pid == 0);
         }
 
-       spin_lock(&lli->lli_lock);
-        ll_i2info(inode)->lli_flags &= ~LLIF_MDS_SIZE_LOCK;
-       spin_unlock(&lli->lli_lock);
        md_null_inode(sbi->ll_md_exp, ll_inode2fid(inode));
 
         LASSERT(!lli->lli_open_fd_write_count);
@@ -1472,13 +1453,11 @@ void ll_clear_inode(struct inode *inode)
         * cl_object still uses inode lsm.
         */
        cl_inode_fini(inode);
-       lli->lli_has_smd = false;
 
        EXIT;
 }
 
-static int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data,
-                        struct md_open_data **mod)
+static int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data)
 {
         struct lustre_md md;
         struct inode *inode = dentry->d_inode;
@@ -1492,8 +1471,7 @@ static int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data,
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-        rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0,
-                        &request, mod);
+       rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, &request);
        if (rc) {
                ptlrpc_req_finished(request);
                if (rc == -ENOENT) {
@@ -1527,49 +1505,12 @@ static int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data,
        rc = simple_setattr(dentry, &op_data->op_attr);
        op_data->op_attr.ia_valid = ia_valid;
 
-        /* Extract epoch data if obtained. */
-       op_data->op_handle = md.body->mbo_handle;
-       op_data->op_ioepoch = md.body->mbo_ioepoch;
-
        rc = ll_update_inode(inode, &md);
        ptlrpc_req_finished(request);
 
        RETURN(rc);
 }
 
-/* Close IO epoch and send Size-on-MDS attribute update. */
-static int ll_setattr_done_writing(struct inode *inode,
-                                   struct md_op_data *op_data,
-                                   struct md_open_data *mod)
-{
-        struct ll_inode_info *lli = ll_i2info(inode);
-        int rc = 0;
-        ENTRY;
-
-        LASSERT(op_data != NULL);
-        if (!S_ISREG(inode->i_mode))
-                RETURN(0);
-
-        CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID" for truncate\n",
-               op_data->op_ioepoch, PFID(&lli->lli_fid));
-
-        op_data->op_flags = MF_EPOCH_CLOSE;
-        ll_done_writing_attr(inode, op_data);
-        ll_pack_inode2opdata(inode, op_data, NULL);
-
-        rc = md_done_writing(ll_i2sbi(inode)->ll_md_exp, op_data, mod);
-        if (rc == -EAGAIN) {
-                /* MDS has instructed us to obtain Size-on-MDS attribute
-                 * from OSTs and send setattr to back to MDS. */
-                rc = ll_som_update(inode, op_data);
-        } else if (rc) {
-               CERROR("%s: inode "DFID" mdc truncate failed: rc = %d\n",
-                      ll_i2sbi(inode)->ll_md_exp->exp_obd->obd_name,
-                      PFID(ll_inode2fid(inode)), rc);
-        }
-        RETURN(rc);
-}
-
 static int ll_setattr_ost(struct inode *inode, struct iattr *attr)
 {
         struct obd_capa *capa;
@@ -1580,7 +1521,7 @@ static int ll_setattr_ost(struct inode *inode, struct iattr *attr)
         else
                 capa = ll_mdscapa_get(inode);
 
-        rc = cl_setattr_ost(inode, attr, capa);
+       rc = cl_setattr_ost(ll_i2info(inode)->lli_clob, attr, 0, capa);
 
         if (attr->ia_valid & ATTR_SIZE)
                 ll_truncate_free_capa(capa);
@@ -1610,9 +1551,8 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
         struct inode *inode = dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct md_op_data *op_data = NULL;
-        struct md_open_data *mod = NULL;
        bool file_is_released = false;
-       int rc = 0, rc1 = 0;
+       int rc = 0;
        ENTRY;
 
        CDEBUG(D_VFSTRACE, "%s: setattr inode "DFID"(%p) from %llu to %llu, "
@@ -1686,14 +1626,29 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
         * but other attributes must be set
         */
        if (S_ISREG(inode->i_mode)) {
-               struct lov_stripe_md *lsm;
+               struct cl_layout cl = {
+                       .cl_is_released = false,
+               };
+               struct lu_env *env;
+               int refcheck;
                __u32 gen;
 
-               ll_layout_refresh(inode, &gen);
-               lsm = ccc_inode_lsm_get(inode);
-               if (lsm && lsm->lsm_pattern & LOV_PATTERN_F_RELEASED)
-                       file_is_released = true;
-               ccc_inode_lsm_put(inode, lsm);
+               rc = ll_layout_refresh(inode, &gen);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               /* XXX: the only place we need to know the layout type,
+                * this will be removed by a later patch. -Jinshan */
+               env = cl_env_get(&refcheck);
+               if (IS_ERR(env))
+                       GOTO(out, rc = PTR_ERR(env));
+
+               rc = cl_object_layout_get(env, lli->lli_clob, &cl);
+               cl_env_put(env, &refcheck);
+               if (rc < 0)
+                       GOTO(out, rc);
+
+               file_is_released = cl.cl_is_released;
 
                if (!hsm_import && attr->ia_valid & ATTR_SIZE) {
                        if (file_is_released) {
@@ -1717,12 +1672,7 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
 
        memcpy(&op_data->op_attr, attr, sizeof(*attr));
 
-       /* Open epoch for truncate. */
-       if (exp_connect_som(ll_i2mdexp(inode)) && !hsm_import &&
-           (attr->ia_valid & (ATTR_SIZE | ATTR_MTIME | ATTR_MTIME_SET)))
-               op_data->op_flags = MF_EPOCH_OPEN;
-
-       rc = ll_md_setattr(dentry, op_data, &mod);
+       rc = ll_md_setattr(dentry, op_data);
        if (rc)
                GOTO(out, rc);
 
@@ -1733,7 +1683,6 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
                spin_unlock(&lli->lli_lock);
        }
 
-       ll_ioepoch_open(lli, op_data->op_ioepoch);
        if (!S_ISREG(inode->i_mode) || file_is_released)
                GOTO(out, rc = 0);
 
@@ -1754,14 +1703,9 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr, bool hsm_import)
        }
        EXIT;
 out:
-       if (op_data) {
-               if (op_data->op_ioepoch) {
-                       rc1 = ll_setattr_done_writing(inode, op_data, mod);
-                       if (!rc)
-                               rc = rc1;
-               }
+       if (op_data != NULL)
                ll_finish_md_op_data(op_data);
-       }
+
        if (!S_ISDIR(inode->i_mode)) {
                mutex_lock(&inode->i_mutex);
                if ((attr->ia_valid & ATTR_SIZE) && !hsm_import)
@@ -1920,15 +1864,8 @@ int ll_update_inode(struct inode *inode, struct lustre_md *md)
        struct ll_sb_info *sbi = ll_i2sbi(inode);
 
        LASSERT((lsm != NULL) == ((body->mbo_valid & OBD_MD_FLEASIZE) != 0));
-       if (lsm != NULL) {
-               if (!lli->lli_has_smd &&
-                   !(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
-                       cl_file_inode_init(inode, md);
-
-               lli->lli_maxbytes = lsm->lsm_maxbytes;
-               if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
-                       lli->lli_maxbytes = MAX_LFS_FILESIZE;
-       }
+       if (lsm != NULL)
+               cl_file_inode_init(inode, md);
 
        if (S_ISDIR(inode->i_mode)) {
                int     rc;
@@ -1958,7 +1895,7 @@ int ll_update_inode(struct inode *inode, struct lustre_md *md)
        if (body->mbo_valid & OBD_MD_FLATIME) {
                if (body->mbo_atime > LTIME_S(inode->i_atime))
                        LTIME_S(inode->i_atime) = body->mbo_atime;
-               lli->lli_lvb.lvb_atime = body->mbo_atime;
+               lli->lli_atime = body->mbo_atime;
        }
 
        if (body->mbo_valid & OBD_MD_FLMTIME) {
@@ -1968,13 +1905,13 @@ int ll_update_inode(struct inode *inode, struct lustre_md *md)
                               LTIME_S(inode->i_mtime), body->mbo_mtime);
                        LTIME_S(inode->i_mtime) = body->mbo_mtime;
                }
-               lli->lli_lvb.lvb_mtime = body->mbo_mtime;
+               lli->lli_mtime = body->mbo_mtime;
        }
 
        if (body->mbo_valid & OBD_MD_FLCTIME) {
                if (body->mbo_ctime > LTIME_S(inode->i_ctime))
                        LTIME_S(inode->i_ctime) = body->mbo_ctime;
-               lli->lli_lvb.lvb_ctime = body->mbo_ctime;
+               lli->lli_ctime = body->mbo_ctime;
        }
 
        if (body->mbo_valid & OBD_MD_FLMODE)
@@ -2019,49 +1956,11 @@ int ll_update_inode(struct inode *inode, struct lustre_md *md)
        LASSERT(fid_seq(&lli->lli_fid) != 0);
 
        if (body->mbo_valid & OBD_MD_FLSIZE) {
-                if (exp_connect_som(ll_i2mdexp(inode)) &&
-                   S_ISREG(inode->i_mode)) {
-                        struct lustre_handle lockh;
-                        ldlm_mode_t mode;
-
-                        /* As it is possible a blocking ast has been processed
-                         * by this time, we need to check there is an UPDATE
-                         * lock on the client and set LLIF_MDS_SIZE_LOCK holding
-                         * it. */
-                        mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE,
-                                              &lockh, LDLM_FL_CBPENDING,
-                                              LCK_CR | LCK_CW |
-                                              LCK_PR | LCK_PW);
-                        if (mode) {
-                                if (lli->lli_flags & (LLIF_DONE_WRITING |
-                                                      LLIF_EPOCH_PENDING |
-                                                      LLIF_SOM_DIRTY)) {
-                                       CERROR("%s: inode "DFID" flags %u still"
-                                              " has size authority! do not "
-                                              "trust the size from MDS\n",
-                                              sbi->ll_md_exp->exp_obd->obd_name,
-                                              PFID(ll_inode2fid(inode)),
-                                              lli->lli_flags);
-                                } else {
-                                        /* Use old size assignment to avoid
-                                         * deadlock bz14138 & bz14326 */
-                                       i_size_write(inode, body->mbo_size);
-                                       spin_lock(&lli->lli_lock);
-                                        lli->lli_flags |= LLIF_MDS_SIZE_LOCK;
-                                       spin_unlock(&lli->lli_lock);
-                                }
-                                ldlm_lock_decref(&lockh, mode);
-                        }
-                } else {
-                        /* Use old size assignment to avoid
-                         * deadlock bz14138 & bz14326 */
-                       i_size_write(inode, body->mbo_size);
-
-                       CDEBUG(D_VFSTRACE,
-                              "inode="DFID", updating i_size %llu\n",
-                              PFID(ll_inode2fid(inode)),
-                              (unsigned long long)body->mbo_size);
-               }
+               i_size_write(inode, body->mbo_size);
+
+               CDEBUG(D_VFSTRACE, "inode="DFID", updating i_size %llu\n",
+                      PFID(ll_inode2fid(inode)),
+                      (unsigned long long)body->mbo_size);
 
                if (body->mbo_valid & OBD_MD_FLBLOCKS)
                        inode->i_blocks = body->mbo_blocks;
@@ -2095,8 +1994,6 @@ int ll_read_inode2(struct inode *inode, void *opaque)
         CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
                PFID(&lli->lli_fid), inode);
 
-       LASSERT(!lli->lli_has_smd);
-
         /* Core attributes from the MDS first.  This is a new inode, and
          * the VFS doesn't zero times in the core inode so we have to do
          * it ourselves.  They will be overwritten by either MDS or OST
@@ -2142,7 +2039,7 @@ int ll_read_inode2(struct inode *inode, void *opaque)
 
 void ll_delete_inode(struct inode *inode)
 {
-       struct cl_inode_info *lli = cl_i2info(inode);
+       struct ll_inode_info *lli = ll_i2info(inode);
        ENTRY;
 
        if (S_ISREG(inode->i_mode) && lli->lli_clob != NULL)
@@ -2202,9 +2099,10 @@ int ll_iocontrol(struct inode *inode, struct file *file,
                RETURN(put_user(flags, (int __user *)arg));
         }
         case FSFILT_IOC_SETFLAGS: {
-               struct lov_stripe_md *lsm;
-                struct obd_info oinfo = { { { 0 } } };
-                struct md_op_data *op_data;
+               struct iattr *attr;
+               struct md_op_data *op_data;
+               struct cl_object *obj;
+               struct obd_capa *capa;
 
                if (get_user(flags, (int __user *)arg))
                        RETURN(-EFAULT);
@@ -2216,8 +2114,7 @@ int ll_iocontrol(struct inode *inode, struct file *file,
 
                op_data->op_attr_flags = flags;
                 op_data->op_attr.ia_valid |= ATTR_ATTR_FLAG;
-                rc = md_setattr(sbi->ll_md_exp, op_data,
-                                NULL, 0, NULL, 0, &req, NULL);
+               rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, &req);
                 ll_finish_md_op_data(op_data);
                 ptlrpc_req_finished(req);
                if (rc)
@@ -2225,32 +2122,21 @@ int ll_iocontrol(struct inode *inode, struct file *file,
 
                inode->i_flags = ll_ext_to_inode_flags(flags);
 
-               lsm = ccc_inode_lsm_get(inode);
-               if (!lsm_has_objects(lsm)) {
-                       ccc_inode_lsm_put(inode, lsm);
+               obj = ll_i2info(inode)->lli_clob;
+               if (obj == NULL)
                        RETURN(0);
-               }
 
-               OBDO_ALLOC(oinfo.oi_oa);
-               if (!oinfo.oi_oa) {
-                       ccc_inode_lsm_put(inode, lsm);
+               OBD_ALLOC_PTR(attr);
+               if (attr == NULL)
                        RETURN(-ENOMEM);
-               }
-               oinfo.oi_md = lsm;
-               oinfo.oi_oa->o_oi = lsm->lsm_oi;
-                oinfo.oi_oa->o_flags = flags;
-                oinfo.oi_oa->o_valid = OBD_MD_FLID | OBD_MD_FLFLAGS |
-                                       OBD_MD_FLGROUP;
-                oinfo.oi_capa = ll_mdscapa_get(inode);
-                obdo_set_parent_fid(oinfo.oi_oa, &ll_i2info(inode)->lli_fid);
-                rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
-                capa_put(oinfo.oi_capa);
-                OBDO_FREE(oinfo.oi_oa);
-               ccc_inode_lsm_put(inode, lsm);
-
-               if (rc && rc != -EPERM && rc != -EACCES)
-                       CERROR("osc_setattr_async fails: rc = %d\n", rc);
 
+               attr->ia_valid = ATTR_ATTR_FLAG;
+
+               capa = ll_mdscapa_get(inode);
+               rc = cl_setattr_ost(obj, attr, flags, capa);
+               capa_put(capa);
+
+               OBD_FREE_PTR(attr);
                RETURN(rc);
         }
         default:
@@ -2356,20 +2242,63 @@ int ll_remount_fs(struct super_block *sb, int *flags, char *data)
         return 0;
 }
 
+/**
+ * Cleanup the open handle that is cached on MDT-side.
+ *
+ * For open case, the client side open handling thread may hit error
+ * after the MDT grant the open. Under such case, the client should
+ * send close RPC to the MDT as cleanup; otherwise, the open handle
+ * on the MDT will be leaked there until the client umount or evicted.
+ *
+ * In further, if someone unlinked the file, because the open handle
+ * holds the reference on such file/object, then it will block the
+ * subsequent threads that want to locate such object via FID.
+ *
+ * \param[in] sb       super block for this file-system
+ * \param[in] open_req pointer to the original open request
+ */
+void ll_open_cleanup(struct super_block *sb, struct ptlrpc_request *open_req)
+{
+       struct mdt_body                 *body;
+       struct md_op_data               *op_data;
+       struct ptlrpc_request           *close_req = NULL;
+       struct obd_export               *exp       = ll_s2sbi(sb)->ll_md_exp;
+       ENTRY;
+
+       body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY);
+       OBD_ALLOC_PTR(op_data);
+       if (op_data == NULL) {
+               CWARN("%s: cannot allocate op_data to release open handle for "
+                     DFID"\n",
+                     ll_get_fsname(sb, NULL, 0), PFID(&body->mbo_fid1));
+
+               RETURN_EXIT;
+       }
+
+       op_data->op_fid1 = body->mbo_fid1;
+       op_data->op_handle = body->mbo_handle;
+       op_data->op_mod_time = cfs_time_current_sec();
+       md_close(exp, op_data, NULL, &close_req);
+       ptlrpc_req_finished(close_req);
+       ll_finish_md_op_data(op_data);
+
+       EXIT;
+}
+
 int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
                  struct super_block *sb, struct lookup_intent *it)
 {
        struct ll_sb_info *sbi = NULL;
-       struct lustre_md md = { 0 };
+       struct lustre_md md = { NULL };
        int rc;
        ENTRY;
 
-        LASSERT(*inode || sb);
-        sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
-        rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp,
-                              sbi->ll_md_exp, &md);
-        if (rc)
-                RETURN(rc);
+       LASSERT(*inode || sb);
+       sbi = sb ? ll_s2sbi(sb) : ll_i2sbi(*inode);
+       rc = md_get_lustre_md(sbi->ll_md_exp, req, sbi->ll_dt_exp,
+                             sbi->ll_md_exp, &md);
+       if (rc != 0)
+               GOTO(cleanup, rc);
 
        if (*inode) {
                rc = ll_update_inode(*inode, &md);
@@ -2429,11 +2358,18 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req,
                LDLM_LOCK_PUT(lock);
        }
 
+       GOTO(out, rc = 0);
+
 out:
        if (md.lsm != NULL)
                obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
        md_free_lustre_md(sbi->ll_md_exp, &md);
-       RETURN(rc);
+
+cleanup:
+       if (rc != 0 && it != NULL && it->it_op & IT_OPEN)
+               ll_open_cleanup(sb != NULL ? sb : (*inode)->i_sb, req);
+
+       return rc;
 }
 
 int ll_obd_statfs(struct inode *inode, void __user *arg)
@@ -2537,8 +2473,13 @@ struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
        ll_i2gids(op_data->op_suppgids, i1, i2);
        op_data->op_fid1 = *ll_inode2fid(i1);
        op_data->op_capa1 = ll_mdscapa_get(i1);
-       if (S_ISDIR(i1->i_mode))
+       op_data->op_default_stripe_offset = -1;
+       if (S_ISDIR(i1->i_mode)) {
                op_data->op_mea1 = ll_i2info(i1)->lli_lsm_md;
+               if (opc == LUSTRE_OPC_MKDIR)
+                       op_data->op_default_stripe_offset =
+                                  ll_i2info(i1)->lli_def_stripe_offset;
+       }
 
        if (i2) {
                op_data->op_fid2 = *ll_inode2fid(i2);
@@ -2701,7 +2642,7 @@ void ll_dirty_page_discard_warn(struct page *page, int ioret)
 {
        char *buf, *path = NULL;
        struct dentry *dentry = NULL;
-       struct ccc_object *obj = cl_inode2ccc(page->mapping->host);
+       struct inode *inode = page->mapping->host;
 
        /* this can be called inside spin lock so use GFP_ATOMIC. */
        buf = (char *)__get_free_page(GFP_ATOMIC);
@@ -2715,7 +2656,7 @@ void ll_dirty_page_discard_warn(struct page *page, int ioret)
               "%s: dirty page discard: %s/fid: "DFID"/%s may get corrupted "
               "(rc %d)\n", ll_get_fsname(page->mapping->host->i_sb, NULL, 0),
               s2lsi(page->mapping->host->i_sb)->lsi_lmd->lmd_dev,
-              PFID(&obj->cob_header.coh_lu.loh_fid),
+              PFID(ll_inode2fid(inode)),
               (path && !IS_ERR(path)) ? path : "", ioret);
 
        if (dentry != NULL)
@@ -2790,20 +2731,18 @@ void ll_compute_rootsquash_state(struct ll_sb_info *sbi)
 /**
  * Parse linkea content to extract information about a given hardlink
  *
- * \param[in]  ldata  - Initialized linkea data
- * \param[in]  linkno - Link identifier
- * \param[out] gpout  - Destination structure to fill with linkno,
- *                      parent FID and entry name
- * \param[in]  size   - Size of the gp_name buffer in gpout
+ * \param[in]   ldata      - Initialized linkea data
+ * \param[in]   linkno     - Link identifier
+ * \param[out]  parent_fid - The entry's parent FID
+ * \param[out]  ln         - Entry name destination buffer
  *
  * \retval 0 on success
  * \retval Appropriate negative error code on failure
  */
 static int ll_linkea_decode(struct linkea_data *ldata, unsigned int linkno,
-                           struct getparent *gpout, size_t name_size)
+                           struct lu_fid *parent_fid, struct lu_name *ln)
 {
        unsigned int    idx;
-       struct lu_name  ln;
        int             rc;
        ENTRY;
 
@@ -2816,25 +2755,18 @@ static int ll_linkea_decode(struct linkea_data *ldata, unsigned int linkno,
                RETURN(-ENODATA);
 
        linkea_first_entry(ldata);
-       idx = 0;
-       while (ldata->ld_lee != NULL) {
-               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &ln,
-                                   &gpout->gp_fid);
+       for (idx = 0; ldata->ld_lee != NULL; idx++) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, ln,
+                                   parent_fid);
                if (idx == linkno)
                        break;
 
                linkea_next_entry(ldata);
-               idx++;
        }
 
        if (idx < linkno)
                RETURN(-ENODATA);
 
-       if (ln.ln_namelen >= name_size)
-               RETURN(-EOVERFLOW);
-
-       gpout->gp_linkno = linkno;
-       strlcpy(gpout->gp_name, ln.ln_name, name_size);
        RETURN(0);
 }
 
@@ -2857,10 +2789,10 @@ int ll_getparent(struct file *file, struct getparent __user *arg)
        struct inode            *inode = file->f_dentry->d_inode;
        struct linkea_data      *ldata;
        struct lu_buf            buf = LU_BUF_NULL;
-       struct getparent        *gpout;
+       struct lu_name           ln;
+       struct lu_fid            parent_fid;
        __u32                    linkno;
        __u32                    name_size;
-       size_t                   out_size;
        int                      rc;
 
        ENTRY;
@@ -2886,27 +2818,26 @@ int ll_getparent(struct file *file, struct getparent __user *arg)
        if (rc < 0)
                GOTO(ldata_free, rc);
 
-       out_size = sizeof(*gpout) + name_size;
-       OBD_ALLOC(gpout, out_size);
-       if (gpout == NULL)
-               GOTO(lb_free, rc = -ENOMEM);
-
-       if (copy_from_user(gpout, arg, sizeof(*gpout)))
-               GOTO(gp_free, rc = -EFAULT);
-
        rc = ll_getxattr(dentry, XATTR_NAME_LINK, buf.lb_buf, buf.lb_len);
        if (rc < 0)
-               GOTO(gp_free, rc);
+               GOTO(lb_free, rc);
 
-       rc = ll_linkea_decode(ldata, linkno, gpout, name_size);
+       rc = ll_linkea_decode(ldata, linkno, &parent_fid, &ln);
        if (rc < 0)
-               GOTO(gp_free, rc);
+               GOTO(lb_free, rc);
+
+       if (ln.ln_namelen >= name_size)
+               GOTO(lb_free, rc = -EOVERFLOW);
+
+       if (copy_to_user(&arg->gp_fid, &parent_fid, sizeof(arg->gp_fid)))
+               GOTO(lb_free, rc = -EFAULT);
+
+       if (copy_to_user(&arg->gp_name, ln.ln_name, ln.ln_namelen))
+               GOTO(lb_free, rc = -EFAULT);
 
-       if (copy_to_user(arg, gpout, out_size))
-               GOTO(gp_free, rc = -EFAULT);
+       if (put_user('\0', arg->gp_name + ln.ln_namelen))
+               GOTO(lb_free, rc = -EFAULT);
 
-gp_free:
-       OBD_FREE(gpout, out_size);
 lb_free:
        lu_buf_free(&buf);
 ldata_free: