Whamcloud - gitweb
LU-15171 osd-ldiskfs: xattr_sem locking is missing for dquot_transfer
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index 466da64..34f8465 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/osd/osd_handler.c
  *
@@ -39,6 +38,7 @@
 
 #define DEBUG_SUBSYSTEM S_OSD
 
+#include <linux/fs_struct.h>
 #include <linux/kallsyms.h>
 #include <linux/module.h>
 #include <linux/user_namespace.h>
@@ -75,6 +75,9 @@
 
 #include <lustre_linkea.h>
 
+/* encoding routines */
+#include <lustre_crypto.h>
+
 /* Maximum EA size is limited by LNET_MTU for remote objects */
 #define OSD_MAX_EA_SIZE 1048364
 
@@ -209,14 +212,14 @@ osd_idc_add(const struct lu_env *env, struct osd_device *osd,
                i = oti->oti_ins_cache_size * 2;
                if (i == 0)
                        i = OSD_INS_CACHE_SIZE;
-               OBD_ALLOC(idc, sizeof(*idc) * i);
+               OBD_ALLOC_PTR_ARRAY(idc, i);
                if (idc == NULL)
                        return ERR_PTR(-ENOMEM);
                if (oti->oti_ins_cache != NULL) {
                        memcpy(idc, oti->oti_ins_cache,
                               oti->oti_ins_cache_used * sizeof(*idc));
-                       OBD_FREE(oti->oti_ins_cache,
-                                oti->oti_ins_cache_used * sizeof(*idc));
+                       OBD_FREE_PTR_ARRAY(oti->oti_ins_cache,
+                                          oti->oti_ins_cache_used);
                }
                oti->oti_ins_cache = idc;
                oti->oti_ins_cache_size = i;
@@ -284,6 +287,76 @@ osd_idc_find_or_init(const struct lu_env *env, struct osd_device *osd,
        return idc;
 }
 
+static void osd_idc_dump_lma(const struct lu_env *env,
+                               struct osd_device *osd,
+                               unsigned long ino,
+                               bool check_in_oi)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+       struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
+       const struct lu_fid *fid;
+       struct osd_inode_id lid;
+       struct inode *inode;
+       int rc;
+
+       inode = osd_ldiskfs_iget(osd_sb(osd), ino);
+       if (IS_ERR(inode)) {
+               CERROR("%s: can't get inode %lu: rc = %d\n",
+                      osd->od_svname, ino, (int)PTR_ERR(inode));
+               return;
+       }
+       if (is_bad_inode(inode)) {
+               CERROR("%s: bad inode %lu\n", osd->od_svname, ino);
+               goto put;
+       }
+       rc = osd_get_lma(info, inode, &info->oti_obj_dentry, loa);
+       if (rc) {
+               CERROR("%s: can't get LMA for %lu: rc = %d\n",
+                      osd->od_svname, ino, rc);
+               goto put;
+       }
+       fid = &loa->loa_lma.lma_self_fid;
+       LCONSOLE(D_INFO, "%s: "DFID" in inode %lu/%u\n", osd->od_svname,
+                     PFID(fid), ino, (unsigned)inode->i_generation);
+       if (!check_in_oi)
+               goto put;
+       rc = osd_oi_lookup(osd_oti_get(env), osd, fid, &lid, 0);
+       if (rc) {
+               CERROR("%s: can't lookup "DFID": rc = %d\n",
+                      osd->od_svname, PFID(fid), rc);
+               goto put;
+       }
+       LCONSOLE(D_INFO, "%s: "DFID" maps to %u/%u\n", osd->od_svname,
+                     PFID(fid), lid.oii_ino, lid.oii_gen);
+put:
+       iput(inode);
+}
+
+static void osd_idc_dump_debug(const struct lu_env *env,
+                               struct osd_device *osd,
+                               const struct lu_fid *fid,
+                               unsigned long ino1,
+                               unsigned long ino2)
+{
+       struct osd_inode_id lid;
+
+       int rc;
+
+       rc = osd_oi_lookup(osd_oti_get(env), osd, fid, &lid, 0);
+       if (!rc) {
+               LCONSOLE(D_INFO, "%s: "DFID" maps to %u/%u\n",
+                       osd->od_svname, PFID(fid), lid.oii_ino, lid.oii_gen);
+               osd_idc_dump_lma(env, osd, lid.oii_ino, false);
+       } else {
+               CERROR("%s: can't lookup "DFID": rc = %d\n",
+                      osd->od_svname, PFID(fid), rc);
+       }
+       if (ino1)
+               osd_idc_dump_lma(env, osd, ino1, true);
+       if (ino2)
+               osd_idc_dump_lma(env, osd, ino2, true);
+}
+
 /*
  * lookup mapping for given FID and fill it from the given object.
  * the object is lolcal by definition.
@@ -301,7 +374,12 @@ static int osd_idc_find_and_init(const struct lu_env *env,
                if (obj->oo_inode == NULL)
                        return 0;
                if (idc->oic_lid.oii_ino != obj->oo_inode->i_ino) {
-                       LASSERT(idc->oic_lid.oii_ino == 0);
+                       if (idc->oic_lid.oii_ino) {
+                               osd_idc_dump_debug(env, osd, fid,
+                                                  idc->oic_lid.oii_ino,
+                                                  obj->oo_inode->i_ino);
+                               return -EINVAL;
+                       }
                        idc->oic_lid.oii_ino = obj->oo_inode->i_ino;
                        idc->oic_lid.oii_gen = obj->oo_inode->i_generation;
                }
@@ -391,12 +469,11 @@ int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
                lustre_loa_swab(loa, true);
                /* Check LMA compatibility */
                if (lma->lma_incompat & ~LMA_INCOMPAT_SUPP) {
-                       CWARN("%s: unsupported incompat LMA feature(s) %#x "
-                             "for fid = "DFID", ino = %lu\n",
+                       rc = -EOPNOTSUPP;
+                       CWARN("%s: unsupported incompat LMA feature(s) %#x for fid = "DFID", ino = %lu: rc = %d\n",
                              osd_ino2name(inode),
                              lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
-                             PFID(&lma->lma_self_fid), inode->i_ino);
-                       rc = -EOPNOTSUPP;
+                             PFID(&lma->lma_self_fid), inode->i_ino, rc);
                }
        } else if (rc == 0) {
                rc = -ENODATA;
@@ -441,10 +518,11 @@ struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
                iput(inode);
                inode = ERR_PTR(-ESTALE);
        } else if (is_bad_inode(inode)) {
-               CWARN("%s: bad inode: ino = %u\n",
-                     osd_dev2name(dev), id->oii_ino);
+               rc = -ENOENT;
+               CWARN("%s: bad inode: ino = %u: rc = %d\n",
+                     osd_dev2name(dev), id->oii_ino, rc);
                iput(inode);
-               inode = ERR_PTR(-ENOENT);
+               inode = ERR_PTR(rc);
        } else if ((rc = osd_attach_jinode(inode))) {
                iput(inode);
                inode = ERR_PTR(rc);
@@ -477,12 +555,13 @@ int osd_ldiskfs_add_entry(struct osd_thread_info *info, struct osd_device *osd,
                struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
                struct inode *parent = child->d_parent->d_inode;
                struct lu_fid *fid = NULL;
+               char fidstr[FID_LEN + 1] = "unknown";
 
                rc2 = osd_get_lma(info, parent, child->d_parent, loa);
                if (!rc2) {
                        fid = &loa->loa_lma.lma_self_fid;
                } else if (rc2 == -ENODATA) {
-                       if (unlikely(parent == inode->i_sb->s_root->d_inode)) {
+                       if (unlikely(is_root_inode(parent))) {
                                fid = &info->oti_fid3;
                                lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
                        } else if (!osd->od_is_ost && osd->od_index == 0) {
@@ -493,19 +572,18 @@ int osd_ldiskfs_add_entry(struct osd_thread_info *info, struct osd_device *osd,
                }
 
                if (fid != NULL)
-                       /* below message is checked in sanity.sh test_129 */
-                       CWARN("%s: directory (inode: %lu, FID: "DFID") %s maximum entry limit\n",
-                             osd_name(osd), parent->i_ino, PFID(fid),
-                             rc == -ENOSPC ? "has reached" : "is approaching");
-               else
-                       /* below message is checked in sanity.sh test_129 */
-                       CWARN("%s: directory (inode: %lu, FID: unknown) %s maximum entry limit\n",
-                             osd_name(osd), parent->i_ino,
-                             rc == -ENOSPC ? "has reached" : "is approaching");
+                       snprintf(fidstr, sizeof(fidstr), DFID, PFID(fid));
+
+               /* below message is checked in sanity.sh test_129 */
+               if (rc == -ENOSPC) {
+                       CWARN("%s: directory (inode: %lu, FID: %s) has reached max size limit\n",
+                             osd_name(osd), parent->i_ino, fidstr);
+               } else {
+                       rc = 0; /* ignore such error now */
+                       CWARN("%s: directory (inode: %lu, FID: %s) is approaching max size limit\n",
+                             osd_name(osd), parent->i_ino, fidstr);
+               }
 
-               /* ignore such error now */
-               if (rc == -ENOBUFS)
-                       rc = 0;
        }
 
        return rc;
@@ -528,7 +606,7 @@ osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
        if (!rc) {
                *fid = loa->loa_lma.lma_self_fid;
        } else if (rc == -ENODATA) {
-               if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
+               if (unlikely(is_root_inode(inode)))
                        lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
                else
                        lu_igif_build(fid, inode->i_ino, inode->i_generation);
@@ -857,7 +935,6 @@ struct osd_check_lmv_buf {
        struct dir_context ctx;
        struct osd_thread_info *oclb_info;
        struct osd_device *oclb_dev;
-       struct osd_idmap_cache *oclb_oic;
        int oclb_items;
        bool oclb_found;
 };
@@ -883,7 +960,6 @@ static int osd_stripe_dir_filldir(void *buf,
        struct lu_fid *fid = &oti->oti_fid3;
        struct osd_inode_id *id = &oti->oti_id3;
        struct osd_device *dev = oclb->oclb_dev;
-       struct osd_idmap_cache *oic = oclb->oclb_oic;
        struct inode *inode;
 
        oclb->oclb_items++;
@@ -906,10 +982,7 @@ static int osd_stripe_dir_filldir(void *buf,
 
        iput(inode);
        osd_add_oi_cache(oti, dev, id, fid);
-       oic->oic_fid = *fid;
-       oic->oic_lid = *id;
-       oic->oic_dev = dev;
-       osd_oii_insert(dev, oic, true);
+       osd_oii_insert(dev, fid, id, true);
        oclb->oclb_found = true;
 
        return 1;
@@ -952,18 +1025,16 @@ static int osd_stripe_dir_filldir(void *buf,
  *    the correct OI mapping for the slave MDT-object.
  */
 static int osd_check_lmv(struct osd_thread_info *oti, struct osd_device *dev,
-                        struct inode *inode, struct osd_idmap_cache *oic)
+                        struct inode *inode)
 {
        struct lu_buf *buf = &oti->oti_big_buf;
        struct dentry *dentry = &oti->oti_obj_dentry;
-       struct file *filp = &oti->oti_file;
-       const struct file_operations *fops;
+       struct file *filp;
        struct lmv_mds_md_v1 *lmv1;
        struct osd_check_lmv_buf oclb = {
                .ctx.actor = osd_stripe_dir_filldir,
                .oclb_info = oti,
                .oclb_dev = dev,
-               .oclb_oic = oic,
                .oclb_found = false,
        };
        int rc = 0;
@@ -1002,17 +1073,7 @@ again:
        if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
                GOTO(out, rc = 0);
 
-       fops = inode->i_fop;
-       dentry->d_inode = inode;
-       dentry->d_sb = inode->i_sb;
-       filp->f_pos = 0;
-       filp->f_path.dentry = dentry;
-       filp->f_flags |= O_NOATIME;
-       filp->f_mode = FMODE_64BITHASH | FMODE_NONOTIFY;
-       filp->f_mapping = inode->i_mapping;
-       filp->f_op = fops;
-       filp->private_data = NULL;
-       set_file_inode(filp, inode);
+       filp = osd_quasi_file(oti->oti_env, inode);
        rc = osd_security_file_alloc(filp);
        if (rc)
                goto out;
@@ -1022,14 +1083,14 @@ again:
                rc = iterate_dir(filp, &oclb.ctx);
        } while (rc >= 0 && oclb.oclb_items > 0 && !oclb.oclb_found &&
                 filp->f_pos != LDISKFS_HTREE_EOF_64BIT);
-       fops->release(inode, filp);
+       inode->i_fop->release(inode, filp);
 
 out:
        if (rc < 0)
-               CDEBUG(D_LFSCK, "%s: fail to check LMV EA, inode = %lu/%u,"
-                      DFID": rc = %d\n", osd_ino2name(inode),
-                      inode->i_ino, inode->i_generation,
-                      PFID(&oic->oic_fid), rc);
+               CDEBUG(D_LFSCK,
+                      "%s: cannot check LMV, ino = %lu/%u: rc = %d\n",
+                      osd_ino2name(inode), inode->i_ino, inode->i_generation,
+                      rc);
        else
                rc = 0;
 
@@ -1062,7 +1123,13 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
 
        LINVRNT(osd_invariant(obj));
        LASSERT(obj->oo_inode == NULL);
-       LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID"\n", PFID(fid));
+
+       if (fid_is_sane(fid) == 0) {
+               CERROR("%s: invalid FID "DFID"\n", ldev->ld_obd->obd_name,
+                      PFID(fid));
+               dump_stack();
+               RETURN(-EINVAL);
+       }
 
        dev = osd_dev(ldev);
        scrub = &dev->od_scrub.os_scrub;
@@ -1091,6 +1158,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
        }
 
        id = &info->oti_id;
+       memset(id, 0, sizeof(struct osd_inode_id));
        if (!list_empty(&scrub->os_inconsistent_items)) {
                /* Search order: 2. OI scrub pending list. */
                result = osd_oii_lookup(dev, fid, id);
@@ -1177,22 +1245,23 @@ trigger:
                }
        }
 
-       if (thread_is_running(&scrub->os_thread)) {
+       if (scrub->os_running) {
                if (scrub->os_partial_scan && !scrub->os_in_join)
                        goto join;
 
-               if (IS_ERR_OR_NULL(inode) || result)
+               if (IS_ERR_OR_NULL(inode) || result) {
+                       osd_oii_insert(dev, fid, id, result == -ENOENT);
                        GOTO(out, result = -EINPROGRESS);
+               }
 
                LASSERT(remote);
                LASSERT(obj->oo_inode == inode);
 
-               osd_add_oi_cache(info, dev, id, fid);
-               osd_oii_insert(dev, oic, true);
+               osd_oii_insert(dev, fid, id, true);
                goto found;
        }
 
-       if (dev->od_auto_scrub_interval == AS_NEVER) {
+       if (dev->od_scrub.os_scrub.os_auto_scrub_interval == AS_NEVER) {
                if (!remote)
                        GOTO(out, result = -EREMCHG);
 
@@ -1205,19 +1274,21 @@ trigger:
 
 join:
        rc1 = osd_scrub_start(env, dev, flags);
-       LCONSOLE_WARN("%s: trigger OI scrub by RPC for the " DFID" with flags "
-                     "0x%x, rc = %d\n", osd_name(dev), PFID(fid), flags, rc1);
+       CDEBUG_LIMIT(D_LFSCK | D_CONSOLE | D_WARNING,
+                    "%s: trigger OI scrub by RPC for "DFID"/%u with flags %#x: rc = %d\n",
+                    osd_name(dev), PFID(fid), id->oii_ino, flags, rc1);
        if (rc1 && rc1 != -EALREADY)
                GOTO(out, result = -EREMCHG);
 
-       if (IS_ERR_OR_NULL(inode) || result)
+       if (IS_ERR_OR_NULL(inode) || result) {
+               osd_oii_insert(dev, fid, id, result == -ENOENT);
                GOTO(out, result = -EINPROGRESS);
+       }
 
        LASSERT(remote);
        LASSERT(obj->oo_inode == inode);
 
-       osd_add_oi_cache(info, dev, id, fid);
-       osd_oii_insert(dev, oic, true);
+       osd_oii_insert(dev, fid, id, true);
        goto found;
 
 check_lma:
@@ -1314,6 +1385,8 @@ check_lma:
 
        if (saved_ino == id->oii_ino && saved_gen == id->oii_gen) {
                result = -EREMCHG;
+               osd_scrub_refresh_mapping(info, dev, fid, id, DTO_INDEX_DELETE,
+                                         true, 0, NULL);
                goto trigger;
        }
 
@@ -1348,7 +1421,7 @@ found:
 
        if (S_ISDIR(inode->i_mode) &&
            (flags & SS_AUTO_PARTIAL || sf->sf_status == SS_SCANNING))
-               osd_check_lmv(info, dev, inode, oic);
+               osd_check_lmv(info, dev, inode);
 
        result = osd_attach_jinode(inode);
        if (result)
@@ -1399,6 +1472,13 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
 
        LINVRNT(osd_invariant(obj));
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE) &&
+           cfs_fail_val == 2) {
+               struct osd_thread_info *info = osd_oti_get(env);
+               struct osd_idmap_cache *oic = &info->oti_cache;
+               /* invalidate thread cache */
+               memset(&oic->oic_fid, 0, sizeof(oic->oic_fid));
+       }
        if (fid_is_otable_it(&l->lo_header->loh_fid)) {
                obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
                l->lo_header->loh_attr |= LOHA_EXISTS;
@@ -1428,6 +1508,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
                        result = 0;
                }
        }
+       obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
 
        LINVRNT(osd_invariant(obj));
        return result;
@@ -1454,8 +1535,6 @@ static int osd_oxc_get(struct osd_object *obj, const char *name,
        size_t namelen = strlen(name);
        int rc;
 
-       ENTRY;
-
        rcu_read_lock();
        list_for_each_entry_rcu(tmp, &obj->oo_xattr_list, oxe_list) {
                if (namelen == tmp->oxe_namelen &&
@@ -1482,7 +1561,6 @@ static int osd_oxc_get(struct osd_object *obj, const char *name,
                GOTO(out, rc = -ERANGE);
 
        memcpy(buf->lb_buf, &oxe->oxe_buf[namelen + 1], rc);
-       EXIT;
 out:
        rcu_read_unlock();
 
@@ -1582,11 +1660,10 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
        dt_object_fini(&obj->oo_dt);
        if (obj->oo_hl_head != NULL)
                ldiskfs_htree_lock_head_free(obj->oo_hl_head);
+       /* obj doesn't contain an lu_object_header, so we don't need call_rcu */
        OBD_FREE_PTR(obj);
-       if (unlikely(h)) {
-               lu_object_header_fini(h);
-               OBD_FREE_PTR(h);
-       }
+       if (unlikely(h))
+               lu_object_header_free(h);
 }
 
 /*
@@ -1608,16 +1685,6 @@ static void osd_index_fini(struct osd_object *o)
        }
 }
 
-/*
- * Concurrency: no concurrent access is possible that late in object
- * life-cycle (for all existing callers, that is. New callers have to provide
- * their own locking.)
- */
-static int osd_inode_unlinked(const struct inode *inode)
-{
-       return inode->i_nlink == 0;
-}
-
 enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
@@ -1704,9 +1771,10 @@ static int osd_param_is_not_sane(const struct osd_device *dev,
 static void osd_trans_commit_cb(struct super_block *sb,
                                struct ldiskfs_journal_cb_entry *jcb, int error)
 {
-       struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
+       struct osd_thandle *oh = container_of(jcb, struct osd_thandle, ot_jcb);
        struct thandle *th = &oh->ot_super;
        struct lu_device *lud = &th->th_dev->dd_lu_dev;
+       struct osd_device *osd = osd_dev(lud);
        struct dt_txn_commit_cb *dcb, *tmp;
 
        LASSERT(oh->ot_handle == NULL);
@@ -1726,17 +1794,13 @@ static void osd_trans_commit_cb(struct super_block *sb,
        }
 
        lu_ref_del_at(&lud->ld_reference, &oh->ot_dev_link, "osd-tx", th);
-       lu_device_put(lud);
+       if (atomic_dec_and_test(&osd->od_commit_cb_in_flight))
+               wake_up(&osd->od_commit_cb_done);
        th->th_dev = NULL;
 
        OBD_FREE_PTR(oh);
 }
 
-#ifndef HAVE_SB_START_WRITE
-# define sb_start_write(sb) do {} while (0)
-# define sb_end_write(sb) do {} while (0)
-#endif
-
 static struct thandle *osd_trans_create(const struct lu_env *env,
                                        struct dt_device *d)
 {
@@ -1772,6 +1836,7 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        th->th_dev = d;
        th->th_result = 0;
        oh->ot_credits = 0;
+       oh->oh_declared_ext = 0;
        INIT_LIST_HEAD(&oh->ot_commit_dcb_list);
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_trunc_locks);
@@ -1794,7 +1859,7 @@ void osd_trans_dump_creds(const struct lu_env *env, struct thandle *th)
        struct osd_thread_info *oti = osd_oti_get(env);
        struct osd_thandle *oh;
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
 
        CWARN("  create: %u/%u/%u, destroy: %u/%u/%u\n",
@@ -1853,7 +1918,7 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
 
        LASSERT(current->journal_info == NULL);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh != NULL);
        LASSERT(oh->ot_handle == NULL);
 
@@ -1914,7 +1979,7 @@ static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                oh->ot_handle = jh;
                LASSERT(oti->oti_txns == 0);
 
-               lu_device_get(&d->dd_lu_dev);
+               atomic_inc(&dev->od_commit_cb_in_flight);
                lu_ref_add_at(&d->dd_lu_dev.ld_reference, &oh->ot_dev_link,
                              "osd-tx", th);
                oti->oti_txns++;
@@ -1982,7 +2047,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
 
        ENTRY;
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
 
        remove_agents = oh->ot_remove_agents;
 
@@ -2025,13 +2090,13 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
                if (!rc)
                        rc = rc2;
 
-               osd_process_truncates(&truncates);
+               osd_process_truncates(env, &truncates);
        } else {
                osd_trans_stop_cb(oh, th->th_result);
                OBD_FREE_PTR(oh);
        }
 
-       osd_trunc_unlock_all(&truncates);
+       osd_trunc_unlock_all(env, &truncates);
 
        /* inform the quota slave device that the transaction is stopping */
        qsd_op_end(env, qsd, qtrans);
@@ -2055,6 +2120,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        if (unlikely(remove_agents != 0))
                osd_process_scheduled_agent_removals(env, osd);
 
+       LASSERT(oti->oti_ins_cache_depth > 0);
        oti->oti_ins_cache_depth--;
        /* reset OI cache for safety */
        if (oti->oti_ins_cache_depth == 0)
@@ -2067,8 +2133,8 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
 
 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
 {
-       struct osd_thandle *oh = container_of0(th, struct osd_thandle,
-                                              ot_super);
+       struct osd_thandle *oh = container_of(th, struct osd_thandle,
+                                             ot_super);
 
        LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
        LASSERT(&dcb->dcb_func != NULL);
@@ -2107,6 +2173,9 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
        if (!inode)
                return;
 
+       if (osd_has_index(obj) &&  obj->oo_dt.do_index_ops == &osd_index_iam_ops)
+               ldiskfs_set_inode_flag(inode, LDISKFS_INODE_JOURNAL_DATA);
+
        uid = i_uid_read(inode);
        gid = i_gid_read(inode);
        projid = i_projid_read(inode);
@@ -2204,9 +2273,9 @@ int osd_statfs(const struct lu_env *env, struct dt_device *d,
 
        statfs_pack(sfs, ksfs);
        if (unlikely(sb->s_flags & SB_RDONLY))
-               sfs->os_state |= OS_STATE_READONLY;
+               sfs->os_state |= OS_STATFS_READONLY;
 
-       sfs->os_state |= osd->od_nonrotational ? OS_STATE_NONROT : 0;
+       sfs->os_state |= osd->od_nonrotational ? OS_STATFS_NONROT : 0;
 
        if (ldiskfs_has_feature_extents(sb))
                sfs->os_maxbytes = sb->s_maxbytes;
@@ -2358,16 +2427,17 @@ static void osd_conf_get(const struct lu_env *env,
                                        OBD_CKSUM_T10IP512 :
                                        OBD_CKSUM_T10IP4K;
                        } else {
-                               CERROR("%s: unsupported checksum type of "
-                                      "T10PI type '%s'",
+                               CERROR("%s: unsupported checksum type of T10PI type '%s'\n",
                                       d->od_svname, name);
                        }
 
                } else {
-                       CERROR("%s: unsupported T10PI type '%s'",
+                       CERROR("%s: unsupported T10PI type '%s'\n",
                               d->od_svname, name);
                }
        }
+
+       param->ddp_has_lseek_data_hole = true;
 }
 
 static struct super_block *osd_mnt_sb_get(const struct dt_device *d)
@@ -2422,10 +2492,6 @@ static int osd_commit_async(const struct lu_env *env,
        RETURN(rc);
 }
 
-/* Our own copy of the set readonly functions if present, or NU if not. */
-static int (*priv_dev_set_rdonly)(struct block_device *bdev);
-static int (*priv_dev_check_rdonly)(struct block_device *bdev);
-/* static int (*priv_dev_clear_rdonly)(struct block_device *bdev); */
 static int (*priv_security_file_alloc)(struct file *file);
 
 int osd_security_file_alloc(struct file *file)
@@ -2446,35 +2512,8 @@ static int osd_ro(const struct lu_env *env, struct dt_device *d)
 
        ENTRY;
 
-       if (priv_dev_set_rdonly) {
-               struct block_device *jdev = LDISKFS_SB(sb)->journal_bdev;
-
-               rc = 0;
-               CERROR("*** setting %s read-only ***\n",
-                      osd_dt_dev(d)->od_svname);
-
-               if (sb->s_op->freeze_fs) {
-                       rc = sb->s_op->freeze_fs(sb);
-                       if (rc)
-                               goto out;
-               }
-
-               if (jdev && (jdev != dev)) {
-                       CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n",
-                              (long)jdev);
-                       priv_dev_set_rdonly(jdev);
-               }
-               CDEBUG(D_IOCTL | D_HA, "set dev %lx rdonly\n", (long)dev);
-               priv_dev_set_rdonly(dev);
-
-               if (sb->s_op->unfreeze_fs)
-                       sb->s_op->unfreeze_fs(sb);
-       }
-
-out:
-       if (rc)
-               CERROR("%s: %lx CANNOT BE SET READONLY: rc = %d\n",
-                      osd_dt_dev(d)->od_svname, (long)dev, rc);
+       CERROR("%s: %lx CANNOT BE SET READONLY: rc = %d\n",
+              osd_dt_dev(d)->od_svname, (long)dev, rc);
 
        RETURN(rc);
 }
@@ -2537,18 +2576,42 @@ const int osd_dto_credits_noquota[DTO_NR] = {
        [DTO_ATTR_SET_CHOWN] = 0
 };
 
+/* reserve or free quota for some operation */
+static int osd_reserve_or_free_quota(const struct lu_env *env,
+                                    struct dt_device *dev,
+                                    enum quota_type type, __u64 uid,
+                                    __u64 gid, __s64 count, bool is_md)
+{
+       int rc;
+       struct osd_device       *osd = osd_dt_dev(dev);
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct lquota_id_info   *qi = &info->oti_qi;
+       struct qsd_instance     *qsd = NULL;
+
+       ENTRY;
+
+       if (is_md)
+               qsd = osd->od_quota_slave_md;
+       else
+               qsd = osd->od_quota_slave_dt;
+
+       rc = quota_reserve_or_free(env, qsd, qi, type, uid, gid, count, is_md);
+       RETURN(rc);
+}
+
 static const struct dt_device_operations osd_dt_ops = {
-       .dt_root_get       = osd_root_get,
-       .dt_statfs         = osd_statfs,
-       .dt_trans_create   = osd_trans_create,
-       .dt_trans_start    = osd_trans_start,
-       .dt_trans_stop     = osd_trans_stop,
-       .dt_trans_cb_add   = osd_trans_cb_add,
-       .dt_conf_get       = osd_conf_get,
-       .dt_mnt_sb_get     = osd_mnt_sb_get,
-       .dt_sync           = osd_sync,
-       .dt_ro             = osd_ro,
-       .dt_commit_async   = osd_commit_async,
+       .dt_root_get              = osd_root_get,
+       .dt_statfs                = osd_statfs,
+       .dt_trans_create          = osd_trans_create,
+       .dt_trans_start           = osd_trans_start,
+       .dt_trans_stop            = osd_trans_stop,
+       .dt_trans_cb_add          = osd_trans_cb_add,
+       .dt_conf_get              = osd_conf_get,
+       .dt_mnt_sb_get            = osd_mnt_sb_get,
+       .dt_sync                  = osd_sync,
+       .dt_ro                    = osd_ro,
+       .dt_commit_async          = osd_commit_async,
+       .dt_reserve_or_free_quota = osd_reserve_or_free_quota,
 };
 
 static void osd_read_lock(const struct lu_env *env, struct dt_object *dt,
@@ -2623,11 +2686,12 @@ static void osd_inode_getattr(const struct lu_env *env,
        attr->la_valid  |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
                           LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
                           LA_PROJID | LA_FLAGS | LA_NLINK | LA_RDEV |
-                          LA_BLKSIZE | LA_TYPE;
+                          LA_BLKSIZE | LA_TYPE | LA_BTIME;
 
        attr->la_atime = inode->i_atime.tv_sec;
        attr->la_mtime = inode->i_mtime.tv_sec;
        attr->la_ctime = inode->i_ctime.tv_sec;
+       attr->la_btime = LDISKFS_I(inode)->i_crtime.tv_sec;
        attr->la_mode    = inode->i_mode;
        attr->la_size    = i_size_read(inode);
        attr->la_blocks  = inode->i_blocks;
@@ -2648,10 +2712,71 @@ static void osd_inode_getattr(const struct lu_env *env,
                attr->la_flags |= LUSTRE_PROJINHERIT_FL;
 }
 
+static int osd_dirent_count(const struct lu_env *env, struct dt_object *dt,
+                           u64 *count)
+{
+       struct osd_object *obj = osd_dt_obj(dt);
+       const struct dt_it_ops *iops;
+       struct dt_it *it;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(S_ISDIR(obj->oo_inode->i_mode));
+       LASSERT(fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)));
+
+       if (obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
+               *count = obj->oo_dirent_count;
+               RETURN(0);
+       }
+
+       /* directory not initialized yet */
+       if (!dt->do_index_ops) {
+               *count = 0;
+               RETURN(0);
+       }
+
+       iops = &dt->do_index_ops->dio_it;
+       it = iops->init(env, dt, LUDA_64BITHASH);
+       if (IS_ERR(it))
+               RETURN(PTR_ERR(it));
+
+       rc = iops->load(env, it, 0);
+       if (rc < 0) {
+               if (rc == -ENODATA) {
+                       rc = 0;
+                       *count = 0;
+               }
+               GOTO(out, rc);
+       }
+       if (rc > 0)
+               rc = iops->next(env, it);
+
+       for (*count = 0; rc == 0 || rc == -ESTALE; rc = iops->next(env, it)) {
+               if (rc == -ESTALE)
+                       continue;
+
+               if (iops->key_size(env, it) == 0)
+                       continue;
+
+               (*count)++;
+       }
+       if (rc == 1) {
+               obj->oo_dirent_count = *count;
+               rc = 0;
+       }
+out:
+       iops->put(env, it);
+       iops->fini(env, it);
+
+       RETURN(rc);
+}
+
 static int osd_attr_get(const struct lu_env *env, struct dt_object *dt,
                        struct lu_attr *attr)
 {
        struct osd_object *obj = osd_dt_obj(dt);
+       int rc = 0;
 
        if (unlikely(!dt_object_exists(dt)))
                return -ENOENT;
@@ -2667,16 +2792,24 @@ static int osd_attr_get(const struct lu_env *env, struct dt_object *dt,
                attr->la_valid |= LA_FLAGS;
                attr->la_flags |= LUSTRE_ORPHAN_FL;
        }
+       if (obj->oo_lma_flags & LUSTRE_ENCRYPT_FL) {
+               attr->la_valid |= LA_FLAGS;
+               attr->la_flags |= LUSTRE_ENCRYPT_FL;
+       }
        spin_unlock(&obj->oo_guard);
 
-       return 0;
+       if (S_ISDIR(obj->oo_inode->i_mode) &&
+           fid_is_namespace_visible(lu_object_fid(&dt->do_lu)))
+               rc = osd_dirent_count(env, dt, &attr->la_dirent_count);
+
+       return rc;
 }
 
 static int osd_declare_attr_qid(const struct lu_env *env,
                                struct osd_object *obj,
                                struct osd_thandle *oh, long long bspace,
                                qid_t old_id, qid_t new_id, bool enforce,
-                               unsigned int type, bool ignore_edquot)
+                               unsigned int type)
 {
        int rc;
        struct osd_thread_info *info = osd_oti_get(env);
@@ -2691,7 +2824,7 @@ static int osd_declare_attr_qid(const struct lu_env *env,
        qi->lqi_space      = 1;
        /* Reserve credits for the new id */
        rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
-       if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
                rc = 0;
        if (rc)
                RETURN(rc);
@@ -2700,7 +2833,7 @@ static int osd_declare_attr_qid(const struct lu_env *env,
        qi->lqi_id.qid_uid = old_id;
        qi->lqi_space = -1;
        rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
-       if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
                rc = 0;
        if (rc)
                RETURN(rc);
@@ -2716,7 +2849,7 @@ static int osd_declare_attr_qid(const struct lu_env *env,
         * to save credit reservation.
         */
        rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
-       if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
                rc = 0;
        if (rc)
                RETURN(rc);
@@ -2725,7 +2858,7 @@ static int osd_declare_attr_qid(const struct lu_env *env,
        qi->lqi_id.qid_uid = old_id;
        qi->lqi_space      = -bspace;
        rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
-       if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
                rc = 0;
 
        RETURN(rc);
@@ -2752,7 +2885,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        obj = osd_dt_obj(dt);
        LASSERT(osd_invariant(obj));
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
@@ -2776,29 +2909,20 @@ static int osd_declare_attr_set(const struct lu_env *env,
         * space adjustment once the operation is completed.
         */
        if (attr->la_valid & LA_UID || attr->la_valid & LA_GID) {
-               bool ignore_edquot = !(attr->la_flags & LUSTRE_SET_SYNC_FL);
-
-               if (!ignore_edquot)
-                       CDEBUG(D_QUOTA, "%s: enforce quota on UID %u, GID %u"
-                              "(the quota space is %lld)\n",
-                              obj->oo_inode->i_sb->s_id, attr->la_uid,
-                              attr->la_gid, bspace);
-
                /* USERQUOTA */
                uid = i_uid_read(obj->oo_inode);
                enforce = (attr->la_valid & LA_UID) && (attr->la_uid != uid);
                rc = osd_declare_attr_qid(env, obj, oh, bspace, uid,
-                                         attr->la_uid, enforce, USRQUOTA,
-                                         true);
+                                         attr->la_uid, enforce, USRQUOTA);
                if (rc)
                        RETURN(rc);
 
                gid = i_gid_read(obj->oo_inode);
+               CDEBUG(D_QUOTA, "declare uid %d -> %d gid %d -> %d\n", uid,
+                      attr->la_uid, gid, attr->la_gid);
                enforce = (attr->la_valid & LA_GID) && (attr->la_gid != gid);
-               rc = osd_declare_attr_qid(env, obj, oh, bspace,
-                                         i_gid_read(obj->oo_inode),
-                                         attr->la_gid, enforce, GRPQUOTA,
-                                         ignore_edquot);
+               rc = osd_declare_attr_qid(env, obj, oh, bspace, gid,
+                                         attr->la_gid, enforce, GRPQUOTA);
                if (rc)
                        RETURN(rc);
 
@@ -2811,7 +2935,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                                        (attr->la_projid != projid);
                rc = osd_declare_attr_qid(env, obj, oh, bspace,
                                          (qid_t)projid, (qid_t)attr->la_projid,
-                                         enforce, PRJQUOTA, true);
+                                         enforce, PRJQUOTA);
                if (rc)
                        RETURN(rc);
        }
@@ -2866,6 +2990,13 @@ static int osd_inode_setattr(const struct lu_env *env,
                /* always keep S_NOCMTIME */
                inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
                                 S_NOCMTIME;
+#if defined(S_ENCRYPTED)
+               /* Always remove S_ENCRYPTED, because ldiskfs must not be
+                * aware of encryption status. It is just stored into LMA
+                * so that it can be forwared to client side.
+                */
+               inode->i_flags &= ~S_ENCRYPTED;
+#endif
                /*
                 * Ext4 did not transfer inherit flags from
                 * @inode->i_flags to raw inode i_flags when writing
@@ -2880,7 +3011,8 @@ static int osd_inode_setattr(const struct lu_env *env,
 }
 
 #ifdef HAVE_PROJECT_QUOTA
-static int osd_transfer_project(struct inode *inode, __u32 projid)
+static int osd_transfer_project(struct inode *inode, __u32 projid,
+                               struct thandle *handle)
 {
        struct super_block *sb = inode->i_sb;
        struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
@@ -2912,16 +3044,27 @@ static int osd_transfer_project(struct inode *inode, __u32 projid)
 
        raw_inode = ldiskfs_raw_inode(&iloc);
        if (!LDISKFS_FITS_IN_INODE(raw_inode, ei, i_projid)) {
-               err = -EOVERFLOW;
-               brelse(iloc.bh);
-               return err;
+               struct osd_thandle *oh = container_of(handle,
+                                                     struct osd_thandle,
+                                                     ot_super);
+               /**
+                * try to expand inode size automatically.
+                */
+               ldiskfs_mark_inode_dirty(oh->ot_handle, inode);
+               if (!LDISKFS_FITS_IN_INODE(raw_inode, ei, i_projid)) {
+                       err = -EOVERFLOW;
+                       brelse(iloc.bh);
+                       return err;
+               }
        }
        brelse(iloc.bh);
 
        dquot_initialize(inode);
        transfer_to[PRJQUOTA] = dqget(sb, make_kqid_projid(kprojid));
        if (transfer_to[PRJQUOTA]) {
+               lock_dquot_transfer(inode);
                err = __dquot_transfer(inode, transfer_to);
+               unlock_dquot_transfer(inode);
                dqput(transfer_to[PRJQUOTA]);
                if (err)
                        return err;
@@ -2931,7 +3074,8 @@ static int osd_transfer_project(struct inode *inode, __u32 projid)
 }
 #endif
 
-static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
+static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr,
+                             struct thandle *handle)
 {
        int rc;
 
@@ -2939,6 +3083,11 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
            (attr->la_valid & LA_GID && attr->la_gid != i_gid_read(inode))) {
                struct iattr iattr;
 
+               CDEBUG(D_QUOTA,
+                      "executing dquot_transfer inode %ld uid %d -> %d gid %d -> %d\n",
+                      inode->i_ino, i_uid_read(inode), attr->la_uid,
+                      i_gid_read(inode), attr->la_gid);
+
                dquot_initialize(inode);
                iattr.ia_valid = 0;
                if (attr->la_valid & LA_UID)
@@ -2948,11 +3097,12 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
                iattr.ia_uid = make_kuid(&init_user_ns, attr->la_uid);
                iattr.ia_gid = make_kgid(&init_user_ns, attr->la_gid);
 
+               lock_dquot_transfer(inode);
                rc = dquot_transfer(inode, &iattr);
+               unlock_dquot_transfer(inode);
                if (rc) {
-                       CERROR("%s: quota transfer failed: rc = %d. Is quota "
-                              "enforcement enabled on the ldiskfs "
-                              "filesystem?\n", inode->i_sb->s_id, rc);
+                       CERROR("%s: quota transfer failed. Is quota enforcement enabled on the ldiskfs filesystem? rc = %d\n",
+                              osd_ino2name(inode), rc);
                        return rc;
                }
        }
@@ -2960,15 +3110,16 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
        /* Handle project id transfer here properly */
        if (attr->la_valid & LA_PROJID &&
            attr->la_projid != i_projid_read(inode)) {
+               if (!projid_valid(make_kprojid(&init_user_ns, attr->la_projid)))
+                       return -EINVAL;
 #ifdef HAVE_PROJECT_QUOTA
-               rc = osd_transfer_project(inode, attr->la_projid);
+               rc = osd_transfer_project(inode, attr->la_projid, handle);
 #else
                rc = -ENOTSUPP;
 #endif
                if (rc) {
-                       CERROR("%s: quota transfer failed: rc = %d. Is project "
-                              "enforcement enabled on the ldiskfs "
-                              "filesystem?\n", inode->i_sb->s_id, rc);
+                       CERROR("%s: quota transfer failed. Is project enforcement enabled on the ldiskfs filesystem? rc = %d\n",
+                              osd_ino2name(inode), rc);
                        return rc;
                }
        }
@@ -3012,7 +3163,7 @@ static int osd_attr_set(const struct lu_env *env,
                if (unlikely(ipd == NULL))
                        RETURN(-ENOMEM);
 
-               oh = container_of0(handle, struct osd_thandle, ot_super);
+               oh = container_of(handle, struct osd_thandle, ot_super);
                rc = iam_update(oh->ot_handle, bag,
                                (const struct iam_key *)fid1,
                                (const struct iam_rec *)id, ipd);
@@ -3022,7 +3173,7 @@ static int osd_attr_set(const struct lu_env *env,
 
        inode = obj->oo_inode;
 
-       rc = osd_quota_transfer(inode, attr);
+       rc = osd_quota_transfer(inode, attr, handle);
        if (rc)
                return rc;
 
@@ -3032,7 +3183,9 @@ static int osd_attr_set(const struct lu_env *env,
        if (rc != 0)
                GOTO(out, rc);
 
-       ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+       osd_dirty_inode(inode, I_DIRTY_DATASYNC);
+
+       osd_trans_exec_check(env, handle, OSD_OT_ATTR_SET);
 
        if (!(attr->la_valid & LA_FLAGS))
                GOTO(out, rc);
@@ -3052,6 +3205,9 @@ static int osd_attr_set(const struct lu_env *env,
                lma->lma_incompat |=
                        lustre_to_lma_flags(attr->la_flags);
                lustre_lma_swab(lma);
+
+               osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
+
                rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA,
                                     lma, sizeof(*lma), XATTR_REPLACE);
                if (rc != 0) {
@@ -3067,7 +3223,6 @@ static int osd_attr_set(const struct lu_env *env,
                osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
        }
 out:
-       osd_trans_exec_check(env, handle, OSD_OT_ATTR_SET);
 
        return rc;
 }
@@ -3088,12 +3243,21 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
        struct osd_thandle *oth;
        struct dt_object *parent = NULL;
        struct inode *inode;
-       uid_t owner[2] = {0, 0};
+       struct iattr iattr = {
+               .ia_valid = ATTR_UID | ATTR_GID |
+                           ATTR_CTIME | ATTR_MTIME | ATTR_ATIME,
+               .ia_ctime.tv_sec = attr->la_ctime,
+               .ia_mtime.tv_sec = attr->la_mtime,
+               .ia_atime.tv_sec = attr->la_atime,
+               .ia_uid = GLOBAL_ROOT_UID,
+               .ia_gid = GLOBAL_ROOT_GID,
+       };
+       const struct osd_timespec omit = { .tv_nsec = UTIME_OMIT };
 
        if (attr->la_valid & LA_UID)
-               owner[0] = attr->la_uid;
+               iattr.ia_uid = make_kuid(&init_user_ns, attr->la_uid);
        if (attr->la_valid & LA_GID)
-               owner[1] = attr->la_gid;
+               iattr.ia_gid = make_kgid(&init_user_ns, attr->la_gid);
 
        LINVRNT(osd_invariant(obj));
        LASSERT(obj->oo_inode == NULL);
@@ -3113,10 +3277,18 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
            !dt_object_remote(hint->dah_parent))
                parent = hint->dah_parent;
 
+       /* if a time component is not valid set it to UTIME_OMIT */
+       if (!(attr->la_valid & LA_CTIME))
+               iattr.ia_ctime = omit;
+       if (!(attr->la_valid & LA_MTIME))
+               iattr.ia_mtime = omit;
+       if (!(attr->la_valid & LA_ATIME))
+               iattr.ia_atime = omit;
+
        inode = ldiskfs_create_inode(oth->ot_handle,
                                     parent ? osd_dt_obj(parent)->oo_inode :
                                              osd_sb(osd)->s_root->d_inode,
-                                    mode, owner);
+                                    mode, &iattr);
        if (!IS_ERR(inode)) {
                /* Do not update file c/mtime in ldiskfs. */
                inode->i_flags |= S_NOCMTIME;
@@ -3158,6 +3330,8 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
 
        oth = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oth->ot_handle->h_transaction != NULL);
+       if (fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)))
+               obj->oo_dirent_count = 0;
        result = osd_mkfile(info, obj, mode, hint, th, attr);
 
        return result;
@@ -3285,7 +3459,6 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
        return result;
 }
 
-
 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
                        struct dt_object *parent, struct dt_object *child,
                        umode_t child_mode)
@@ -3304,7 +3477,8 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 }
 
 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
-                         struct lu_attr *attr, struct dt_object_format *dof)
+                         struct lu_attr *attr, struct dt_object_format *dof,
+                         struct thandle *handle)
 {
        struct inode *inode = obj->oo_inode;
        __u64 valid = attr->la_valid;
@@ -3321,7 +3495,7 @@ static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
        if ((valid & LA_MTIME) && (attr->la_mtime == inode->i_mtime.tv_sec))
                attr->la_valid &= ~LA_MTIME;
 
-       result = osd_quota_transfer(inode, attr);
+       result = osd_quota_transfer(inode, attr, handle);
        if (result)
                return;
 
@@ -3335,7 +3509,7 @@ static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
                 * enabled on ldiskfs (lquota takes care of it).
                 */
                LASSERTF(result == 0, "%d\n", result);
-               ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+               osd_dirty_inode(inode, I_DIRTY_DATASYNC);
        }
 
        attr->la_valid = valid;
@@ -3372,7 +3546,7 @@ static int __osd_create(struct osd_thread_info *info, struct osd_object *obj,
        }
 
        if (likely(result == 0)) {
-               osd_attr_init(info, obj, attr, dof);
+               osd_attr_init(info, obj, attr, dof, th);
                osd_object_init0(obj);
        }
 
@@ -3400,7 +3574,10 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
 
        LASSERT(obj->oo_inode != NULL);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       if (CFS_FAIL_CHECK(OBD_FAIL_OSD_OI_ENOSPC))
+               return -ENOSPC;
+
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle);
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
 
@@ -3460,13 +3637,15 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
                              struct thandle *handle)
 {
        struct osd_thandle *oh;
+       struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
+       int credits;
        int rc;
 
        ENTRY;
 
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        /*
@@ -3474,10 +3653,23 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
         * vs. osd_mkreg: osd_mk_index will create 2 blocks for root_node and
         * leaf_node, could involves the block, block bitmap, groups, GDT
         * change for each block, so add 4 * 2 credits in that case.
+        *
+        * The default ACL initialization may consume an additional 16 blocks
         */
-       osd_trans_declare_op(env, oh, OSD_OT_CREATE,
-                            osd_dto_credits_noquota[DTO_OBJECT_CREATE] +
-                            (dof->dof_type == DFT_INDEX) ? 4 * 2 : 0);
+       credits = osd_dto_credits_noquota[DTO_OBJECT_CREATE] +
+                 ((dof->dof_type == DFT_INDEX) ? 4 * 2 : 0);
+
+       /**
+        * While ldiskfs_new_inode() calls ldiskfs_init_acl() we have to add
+        * credits for possible default ACL creation in new inode
+        */
+       if (hint && hint->dah_acl_len)
+               credits += osd_calc_bkmap_credits(sb, NULL, 0, -1,
+                               (hint->dah_acl_len + sb->s_blocksize - 1) >>
+                               sb->s_blocksize_bits);
+
+       osd_trans_declare_op(env, oh, OSD_OT_CREATE, credits);
+
        /*
         * Reuse idle OI block may cause additional one OI block
         * to be changed.
@@ -3524,7 +3716,7 @@ static int osd_declare_destroy(const struct lu_env *env, struct dt_object *dt,
        if (inode == NULL)
                RETURN(-ENOENT);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
@@ -3575,7 +3767,7 @@ static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
 
        ENTRY;
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle);
        LASSERT(inode);
        LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
@@ -3591,13 +3783,15 @@ static int osd_destroy(const struct lu_env *env, struct dt_object *dt,
        }
 
        if (S_ISDIR(inode->i_mode)) {
-               LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1 ||
-                       inode->i_nlink == 2);
+               if (inode->i_nlink > 2)
+                       CERROR("%s: directory "DFID" ino %lu link count is %u at unlink. run e2fsck to repair\n",
+                              osd_name(osd), PFID(fid), inode->i_ino,
+                              inode->i_nlink);
 
                spin_lock(&obj->oo_guard);
                clear_nlink(inode);
                spin_unlock(&obj->oo_guard);
-               ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+               osd_dirty_inode(inode, I_DIRTY_DATASYNC);
        }
 
        osd_trans_exec_op(env, th, OSD_OT_DESTROY);
@@ -3756,6 +3950,9 @@ static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
        __u32 saved_nlink = dir->i_nlink;
        int rc;
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSD_DOTDOT_ENOSPC))
+               return -ENOSPC;
+
        dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
        osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
 
@@ -3796,6 +3993,15 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
        struct osd_thread_info *info = osd_oti_get(env);
        struct inode *local;
        struct osd_thandle *oh;
+       struct iattr iattr = {
+               .ia_valid = ATTR_UID | ATTR_GID |
+                           ATTR_CTIME | ATTR_MTIME | ATTR_ATIME,
+               .ia_ctime.tv_nsec = UTIME_OMIT,
+               .ia_mtime.tv_nsec = UTIME_OMIT,
+               .ia_atime.tv_nsec = UTIME_OMIT,
+               .ia_uid = GLOBAL_ROOT_UID,
+               .ia_gid = GLOBAL_ROOT_GID,
+       };
        int rc;
 
        ENTRY;
@@ -3804,8 +4010,8 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
        oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
-       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, type,
-                                    NULL);
+       local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode,
+                                    type, &iattr);
        if (IS_ERR(local)) {
                CERROR("%s: create local error %d\n", osd_name(osd),
                       (int)PTR_ERR(local));
@@ -3825,8 +4031,8 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
         */
        if (S_ISLNK(type)) {
                BUILD_BUG_ON(LDISKFS_N_BLOCKS * 4 < FID_LEN + 1);
-               rc = snprintf((char *)LDISKFS_I(local)->i_data,
-                             LDISKFS_N_BLOCKS * 4, DFID, PFID(fid));
+               rc = scnprintf((char *)LDISKFS_I(local)->i_data,
+                              LDISKFS_N_BLOCKS * 4, DFID, PFID(fid));
 
                i_size_write(local, rc);
                LDISKFS_I(local)->i_disksize = rc;
@@ -3837,11 +4043,10 @@ static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
 #ifdef HAVE_PROJECT_QUOTA
        if (LDISKFS_I(pobj->oo_inode)->i_flags & LUSTRE_PROJINHERIT_FL &&
            i_projid_read(pobj->oo_inode) != 0) {
-               rc = osd_transfer_project(local, 0);
+               rc = osd_transfer_project(local, 0, th);
                if (rc) {
-                       CERROR("%s: quota transfer failed: rc = %d. Is project "
-                              "quota enforcement enabled on the ldiskfs "
-                              "filesystem?\n", local->i_sb->s_id, rc);
+                       CERROR("%s: quota transfer failed:. Is project quota enforcement enabled on the ldiskfs filesystem? rc = %d\n",
+                              osd_ino2name(local), rc);
                        RETURN(ERR_PTR(rc));
                }
        }
@@ -3908,20 +4113,16 @@ static int osd_process_scheduled_agent_removals(const struct lu_env *env,
        struct osd_thread_info *info = osd_oti_get(env);
        struct osd_obj_orphan *oor, *tmp;
        struct osd_inode_id id;
-       struct list_head list;
+       LIST_HEAD(list);
        struct inode *inode;
        struct lu_fid fid;
        handle_t *jh;
        __u32 ino;
 
-       INIT_LIST_HEAD(&list);
-
        spin_lock(&osd->od_osfs_lock);
        list_for_each_entry_safe(oor, tmp, &osd->od_orphan_list, oor_list) {
-               if (oor->oor_env == env) {
-                       list_del(&oor->oor_list);
-                       list_add(&oor->oor_list, &list);
-               }
+               if (oor->oor_env == env)
+                       list_move(&oor->oor_list, &list);
        }
        spin_unlock(&osd->od_osfs_lock);
 
@@ -4009,8 +4210,21 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt,
                        obj->oo_dt.do_body_ops = &osd_body_ops;
        }
 
-       if (!result && !CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY))
+       if (!result && !CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY)) {
+               struct inode *inode = obj->oo_inode;
+
                result = __osd_oi_insert(env, obj, fid, th);
+               if (result && inode) {
+                       spin_lock(&obj->oo_guard);
+                       clear_nlink(inode);
+                       spin_unlock(&obj->oo_guard);
+                       osd_dirty_inode(inode, I_DIRTY_DATASYNC);
+                       ldiskfs_set_inode_state(inode,
+                                               LDISKFS_STATE_LUSTRE_DESTROY);
+                       iput(inode);
+                       obj->oo_inode = NULL;
+               }
+       }
 
        /*
         * a small optimization - dt_insert() isn't usually applied
@@ -4034,19 +4248,21 @@ static int osd_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
                               struct thandle *handle)
 {
        struct osd_thandle *oh;
+       int rc;
 
        /* it's possible that object doesn't exist yet */
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_REF_ADD,
                             osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
 
-       osd_idc_find_and_init(env, osd_dev(dt->do_lu.lo_dev), osd_dt_obj(dt));
+       rc = osd_idc_find_and_init(env, osd_dev(dt->do_lu.lo_dev),
+                                  osd_dt_obj(dt));
 
-       return 0;
+       return rc;
 }
 
 /*
@@ -4068,7 +4284,7 @@ static int osd_ref_add(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_is_write_locked(env, obj));
        LASSERT(th != NULL);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
@@ -4097,7 +4313,7 @@ static int osd_ref_add(const struct lu_env *env, struct dt_object *dt,
        }
        spin_unlock(&obj->oo_guard);
 
-       ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+       osd_dirty_inode(inode, I_DIRTY_DATASYNC);
        LINVRNT(osd_invariant(obj));
 
        osd_trans_exec_check(env, th, OSD_OT_REF_ADD);
@@ -4116,7 +4332,7 @@ static int osd_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
@@ -4144,7 +4360,10 @@ static int osd_ref_del(const struct lu_env *env, struct dt_object *dt,
        LASSERT(osd_is_write_locked(env, obj));
        LASSERT(th != NULL);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       if (OBD_FAIL_CHECK(OBD_FAIL_OSD_REF_DEL))
+               return -EIO;
+
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
@@ -4170,7 +4389,7 @@ static int osd_ref_del(const struct lu_env *env, struct dt_object *dt,
        ldiskfs_dec_count(oh->ot_handle, inode);
        spin_unlock(&obj->oo_guard);
 
-       ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+       osd_dirty_inode(inode, I_DIRTY_DATASYNC);
        LINVRNT(osd_invariant(obj));
 
        osd_trans_exec_check(env, th, OSD_OT_REF_DEL);
@@ -4295,7 +4514,7 @@ static int osd_declare_xattr_set(const struct lu_env *env,
 
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        if (strcmp(name, XATTR_NAME_LMA) == 0) {
@@ -4434,10 +4653,10 @@ static int osd_xattr_set_pfid(const struct lu_env *env, struct osd_object *obj,
 
                /* Remove old PFID EA entry firstly. */
                dquot_initialize(inode);
-               rc = osd_removexattr(dentry, inode, XATTR_NAME_FID);
+               rc = ll_vfs_removexattr(dentry, inode, XATTR_NAME_FID);
                if (rc == -ENODATA) {
-                       if ((fl & LU_XATTR_REPLACE) && !(fl & LU_XATTR_CREATE))
-                               RETURN(rc);
+                       /* XATTR_NAME_FID is already absent */
+                       rc = 0;
                } else if (rc) {
                        RETURN(rc);
                }
@@ -4514,7 +4733,7 @@ static int osd_xattr_handle_linkea(const struct lu_env *env,
 
        ENTRY;
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
 
        rc = linkea_init_with_rec(&ldata);
@@ -4587,7 +4806,7 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                 * Version is set after all inode operations are finished,
                 * so we should mark it dirty here
                 */
-               ll_dirty_inode(inode, I_DIRTY_DATASYNC);
+               osd_dirty_inode(inode, I_DIRTY_DATASYNC);
 
                RETURN(0);
        }
@@ -4683,7 +4902,7 @@ static int osd_declare_xattr_del(const struct lu_env *env,
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
@@ -4740,7 +4959,7 @@ static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
                dquot_initialize(inode);
                dentry->d_inode = inode;
                dentry->d_sb = inode->i_sb;
-               rc = osd_removexattr(dentry, inode, name);
+               rc = ll_vfs_removexattr(dentry, inode, name);
        }
 
        osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
@@ -4758,20 +4977,11 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
 {
        struct osd_object *obj = osd_dt_obj(dt);
        struct inode *inode = obj->oo_inode;
-       struct osd_thread_info *info = osd_oti_get(env);
-       struct dentry *dentry = &info->oti_obj_dentry;
-       struct file *file = &info->oti_file;
+       struct file *file = osd_quasi_file(env, inode);
        int rc;
 
        ENTRY;
 
-       dentry->d_inode = inode;
-       dentry->d_sb = inode->i_sb;
-       file->f_path.dentry = dentry;
-       file->f_mapping = inode->i_mapping;
-       file->f_op = inode->i_fop;
-       set_file_inode(file, inode);
-
        rc = vfs_fsync_range(file, start, end, 0);
 
        RETURN(rc);
@@ -4782,6 +4992,10 @@ static int osd_invalidate(const struct lu_env *env, struct dt_object *dt)
        return 0;
 }
 
+static bool osd_check_stale(struct dt_object *dt)
+{
+       return false;
+}
 /*
  * Index operations.
  */
@@ -4970,6 +5184,7 @@ static const struct dt_object_operations osd_obj_ops = {
        .do_xattr_list          = osd_xattr_list,
        .do_object_sync         = osd_object_sync,
        .do_invalidate          = osd_invalidate,
+       .do_check_stale         = osd_check_stale,
 };
 
 static const struct dt_object_operations osd_obj_otable_it_ops = {
@@ -4984,7 +5199,7 @@ static int osd_index_declare_iam_delete(const struct lu_env *env,
 {
        struct osd_thandle *oh;
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        /* Recycle  may cause additional three blocks to be changed. */
@@ -5032,7 +5247,7 @@ static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(ipd == NULL))
                RETURN(-ENOMEM);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
@@ -5063,7 +5278,7 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        credits = osd_dto_credits_noquota[DTO_INDEX_DELETE];
@@ -5151,6 +5366,50 @@ static void osd_take_care_of_agent(const struct lu_env *env,
 }
 
 /**
+ * Utility function to get real name from object name
+ *
+ * \param[in] obj      pointer to the object to be handled
+ * \param[in] name     object name
+ * \param[in] len      object name len
+ * \param[out]ln       pointer to the struct lu_name to hold the real name
+ *
+ * If file is not encrypted, real name is just the object name.
+ * If file is encrypted, object name needs to be decoded. In
+ * this case a new buffer is allocated, and ln->ln_name needs to be freed by
+ * the caller.
+ *
+ * \retval   0, on success
+ * \retval -ve, on error
+ */
+static int obj_name2lu_name(struct osd_object *obj, const char *name,
+                           int len, struct lu_name *ln)
+{
+       struct lu_fid namefid;
+
+       fid_zero(&namefid);
+
+       if (name[0] == '[')
+               sscanf(name + 1, SFID, RFID(&namefid));
+
+       if (fid_is_sane(&namefid) || name_is_dot_or_dotdot(name, len) ||
+           !(obj->oo_lma_flags & LUSTRE_ENCRYPT_FL)) {
+               ln->ln_name = name;
+               ln->ln_namelen = len;
+       } else {
+               char *buf = kmalloc(len, GFP_NOFS);
+
+               if (!buf)
+                       return -ENOMEM;
+
+               len = critical_decode(name, len, buf);
+               ln->ln_name = buf;
+               ln->ln_namelen = len;
+       }
+
+       return 0;
+}
+
+/**
  * Index delete function for interoperability mode (b11826).
  * It will remove the directory entry added by osd_index_ea_insert().
  * This entry is needed to maintain name->fid mapping.
@@ -5171,6 +5430,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
        struct buffer_head *bh;
        struct htree_lock *hlock = NULL;
        struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
+       struct lu_name ln;
        int rc;
 
        ENTRY;
@@ -5182,6 +5442,10 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
+       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
+       if (rc)
+               RETURN(rc);
+
        osd_trans_exec_op(env, handle, OSD_OT_DELETE);
 
        oh = container_of(handle, struct osd_thandle, ot_super);
@@ -5189,8 +5453,7 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
        dquot_initialize(dir);
-       dentry = osd_child_dentry_get(env, obj,
-                                     (char *)key, strlen((char *)key));
+       dentry = osd_child_dentry_get(env, obj, ln.ln_name, ln.ln_namelen);
 
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
@@ -5225,15 +5488,27 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
        } else {
                rc = PTR_ERR(bh);
        }
+
+       if (!rc && fid_is_namespace_visible(lu_object_fid(&dt->do_lu)) &&
+           obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
+               /* NB, dirent count may not be accurate, because it's counted
+                * without lock.
+                */
+               if (obj->oo_dirent_count)
+                       obj->oo_dirent_count--;
+               else
+                       obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
+       }
        if (hlock != NULL)
                ldiskfs_htree_unlock(hlock);
        else
                up_write(&obj->oo_ext_idx_sem);
-
        GOTO(out, rc);
 out:
        LASSERT(osd_invariant(obj));
        osd_trans_exec_check(env, handle, OSD_OT_DELETE);
+       if (ln.ln_name != (char *)key)
+               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -5316,7 +5591,7 @@ static int osd_index_declare_iam_insert(const struct lu_env *env,
 
        LASSERT(handle != NULL);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_INSERT,
@@ -5366,7 +5641,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(ipd == NULL))
                RETURN(-ENOMEM);
 
-       oh = container_of0(th, struct osd_thandle, ot_super);
+       oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle != NULL);
        LASSERT(oh->ot_handle->h_transaction != NULL);
        if (S_ISDIR(obj->oo_inode->i_mode)) {
@@ -5408,6 +5683,7 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
        struct ldiskfs_dentry_param *ldp;
        struct dentry *child;
        struct osd_thandle *oth;
+       struct lu_name ln;
        int rc;
 
        oth = container_of(th, struct osd_thandle, ot_super);
@@ -5415,13 +5691,17 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
        LASSERT(oth->ot_handle->h_transaction != NULL);
        LASSERT(pobj->oo_inode);
 
+       rc = obj_name2lu_name(pobj, name, strlen(name), &ln);
+       if (rc)
+               RETURN(rc);
+
        ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
-       if (unlikely(pobj->oo_inode ==
-                    osd_sb(osd_obj2dev(pobj))->s_root->d_inode))
+       if (unlikely(osd_object_is_root(pobj)))
                ldp->edp_magic = 0;
        else
                osd_get_ldiskfs_dirent_param(ldp, fid);
-       child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
+       child = osd_child_dentry_get(info->oti_env, pobj,
+                                    ln.ln_name, ln.ln_namelen);
        child->d_fsdata = (void *)ldp;
        dquot_initialize(pobj->oo_inode);
        rc = osd_ldiskfs_add_entry(info, osd_obj2dev(pobj), oth->ot_handle,
@@ -5450,6 +5730,8 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
                }
        }
 
+       if (ln.ln_name != name)
+               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -5564,6 +5846,10 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
                                              hlock, th);
                }
        }
+       if (!rc && fid_is_namespace_visible(lu_object_fid(&pobj->oo_dt.do_lu))
+           && pobj->oo_dirent_count != LU_DIRENT_COUNT_UNSET)
+               pobj->oo_dirent_count++;
+
        if (hlock != NULL)
                ldiskfs_htree_unlock(hlock);
        else
@@ -5573,31 +5859,20 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
 }
 
 static int
-osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
-                     struct osd_idmap_cache *oic)
+osd_ldiskfs_consistency_check(struct osd_thread_info *oti,
+                             struct osd_device *dev,
+                             const struct lu_fid *fid,
+                             struct osd_inode_id *id)
 {
        struct lustre_scrub *scrub = &dev->od_scrub.os_scrub;
-       struct lu_fid *fid = &oic->oic_fid;
-       struct osd_inode_id *id = &oic->oic_lid;
        struct inode *inode = NULL;
        int once = 0;
        bool insert;
        int rc;
 
        ENTRY;
-
-       if (!fid_is_norm(fid) && !fid_is_igif(fid))
-               RETURN(0);
-
-       if (thread_is_running(&scrub->os_thread) &&
-           scrub->os_pos_current > id->oii_ino)
+       if (!scrub_needs_check(scrub, fid, id->oii_ino))
                RETURN(0);
-
-       if (dev->od_auto_scrub_interval == AS_NEVER ||
-           ktime_get_real_seconds() <
-           scrub->os_file.sf_time_last_complete + dev->od_auto_scrub_interval)
-               RETURN(0);
-
 again:
        rc = osd_oi_lookup(oti, dev, fid, &oti->oti_id, 0);
        if (rc == -ENOENT) {
@@ -5631,7 +5906,7 @@ again:
        insert = false;
 
 trigger:
-       if (thread_is_running(&scrub->os_thread)) {
+       if (scrub->os_running) {
                if (inode == NULL) {
                        inode = osd_iget(oti, dev, id);
                        /* The inode has been removed (by race maybe). */
@@ -5642,7 +5917,7 @@ trigger:
                        }
                }
 
-               rc = osd_oii_insert(dev, oic, insert);
+               rc = osd_oii_insert(dev, fid, id, insert);
                /*
                 * There is race condition between osd_oi_lookup and OI scrub.
                 * The OI scrub finished just after osd_oi_lookup() failure.
@@ -5655,18 +5930,18 @@ trigger:
                if (!S_ISDIR(inode->i_mode))
                        rc = 0;
                else
-                       rc = osd_check_lmv(oti, dev, inode, oic);
+                       rc = osd_check_lmv(oti, dev, inode);
 
                GOTO(out, rc);
        }
 
-       if (dev->od_auto_scrub_interval != AS_NEVER && ++once == 1) {
+       if (dev->od_scrub.os_scrub.os_auto_scrub_interval != AS_NEVER &&
+           ++once == 1) {
                rc = osd_scrub_start(oti->oti_env, dev, SS_AUTO_PARTIAL |
                                     SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT);
-               CDEBUG(D_LFSCK | D_CONSOLE | D_WARNING,
-                      "%s: trigger partial OI scrub for RPC inconsistency "
-                      "checking FID "DFID": rc = %d\n",
-                      osd_dev2name(dev), PFID(fid), rc);
+               CDEBUG_LIMIT(D_LFSCK | D_CONSOLE | D_WARNING,
+                            "%s: trigger partial OI scrub for RPC inconsistency, checking FID "DFID"/%u: rc = %d\n",
+                            osd_dev2name(dev), PFID(fid), id->oii_ino, rc);
                if (rc == 0 || rc == -EALREADY)
                        goto again;
        }
@@ -5674,18 +5949,17 @@ trigger:
        GOTO(out, rc);
 
 out:
-       if (inode)
-               iput(inode);
+       iput(inode);
 
        RETURN(rc);
 }
 
 static int osd_fail_fid_lookup(struct osd_thread_info *oti,
                               struct osd_device *dev,
-                              struct osd_idmap_cache *oic,
                               struct lu_fid *fid, __u32 ino)
 {
        struct lustre_ost_attrs *loa = &oti->oti_ost_attrs;
+       struct osd_idmap_cache *oic = &oti->oti_cache;
        struct inode *inode;
        int rc;
 
@@ -5853,6 +6127,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        struct buffer_head *bh;
        struct lu_fid *fid = (struct lu_fid *)rec;
        struct htree_lock *hlock = NULL;
+       struct lu_name ln;
        int ino;
        int rc;
 
@@ -5861,8 +6136,11 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        LASSERT(dir->i_op != NULL);
        LASSERT(dir->i_op->lookup != NULL);
 
-       dentry = osd_child_dentry_get(env, obj,
-                                     (char *)key, strlen((char *)key));
+       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
+       if (rc)
+               RETURN(rc);
+
+       dentry = osd_child_dentry_get(env, obj, ln.ln_name, ln.ln_namelen);
 
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
@@ -5876,13 +6154,12 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        if (!IS_ERR(bh)) {
                struct osd_thread_info *oti = osd_oti_get(env);
                struct osd_inode_id *id = &oti->oti_id;
-               struct osd_idmap_cache *oic = &oti->oti_cache;
                struct osd_device *dev = osd_obj2dev(obj);
 
                ino = le32_to_cpu(de->inode);
                if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) {
                        brelse(bh);
-                       rc = osd_fail_fid_lookup(oti, dev, oic, fid, ino);
+                       rc = osd_fail_fid_lookup(oti, dev, fid, ino);
                        GOTO(out, rc);
                }
 
@@ -5910,19 +6187,24 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                        osd_id_gen(id, ino, OSD_OII_NOGEN);
                }
 
-               if (rc != 0 || osd_remote_fid(env, dev, fid)) {
-                       fid_zero(&oic->oic_fid);
-
+               if (rc != 0 || osd_remote_fid(env, dev, fid))
                        GOTO(out, rc);
-               }
 
-               osd_add_oi_cache(osd_oti_get(env), osd_obj2dev(obj), id, fid);
-               rc = osd_consistency_check(oti, dev, oic);
-               if (rc == -ENOENT)
-                       fid_zero(&oic->oic_fid);
-               else
+               rc = osd_ldiskfs_consistency_check(oti, dev, fid, id);
+               if (rc != -ENOENT) {
                        /* Other error should not affect lookup result. */
                        rc = 0;
+
+                       /* Normal file mapping should be added into OI cache
+                        * after FID in LMA check, but for local files like
+                        * hsm_actions, their FIDs are not stored in OI files,
+                        * see osd_initial_OI_scrub(), and here is the only
+                        * place to load mapping into OI cache.
+                        */
+                       if (!fid_is_namespace_visible(fid))
+                               osd_add_oi_cache(osd_oti_get(env),
+                                                osd_obj2dev(obj), id, fid);
+               }
        } else {
                rc = PTR_ERR(bh);
        }
@@ -5934,7 +6216,9 @@ out:
                ldiskfs_htree_unlock(hlock);
        else
                up_read(&obj->oo_ext_idx_sem);
-       return rc;
+       if (ln.ln_name != (char *)key)
+               kfree(ln.ln_name);
+       RETURN(rc);
 }
 
 static int osd_index_declare_ea_insert(const struct lu_env *env,
@@ -5957,7 +6241,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        LASSERT(fid != NULL);
        LASSERT(rec1->rec_type != 0);
 
-       oh = container_of0(handle, struct osd_thandle, ot_super);
+       oh = container_of(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        credits = osd_dto_credits_noquota[DTO_INDEX_INSERT];
@@ -6008,7 +6292,7 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                    i_projid_read(inode) != 0)
                        rc = osd_declare_attr_qid(env, osd_dt_obj(dt), oh,
                                                  0, i_projid_read(inode),
-                                                 0, false, PRJQUOTA, true);
+                                                 0, false, PRJQUOTA);
 #endif
        }
 
@@ -6117,6 +6401,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                iput(child_inode);
        LASSERT(osd_invariant(obj));
        osd_trans_exec_check(env, th, OSD_OT_INSERT);
+
        RETURN(rc);
 }
 
@@ -6398,36 +6683,23 @@ static const struct dt_index_operations osd_index_iam_ops = {
        }
 };
 
-
-/**
- * Creates or initializes iterator context.
- *
- * \retval struct osd_it_ea, iterator structure on success
- *
- */
-static struct dt_it *osd_it_ea_init(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   __u32 attr)
+struct osd_it_ea *osd_it_dir_init(const struct lu_env *env,
+                                 struct inode *inode, __u32 attr)
 {
-       struct osd_object *obj = osd_dt_obj(dt);
        struct osd_thread_info *info = osd_oti_get(env);
        struct osd_it_ea *oie;
        struct file *file;
-       struct lu_object *lo = &dt->do_lu;
        struct dentry *obj_dentry;
 
        ENTRY;
 
-       if (!dt_object_exists(dt) || obj->oo_destroyed)
-               RETURN(ERR_PTR(-ENOENT));
-
        OBD_SLAB_ALLOC_PTR_GFP(oie, osd_itea_cachep, GFP_NOFS);
        if (oie == NULL)
                RETURN(ERR_PTR(-ENOMEM));
        obj_dentry = &oie->oie_dentry;
 
-       obj_dentry->d_inode = obj->oo_inode;
-       obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+       obj_dentry->d_inode = inode;
+       obj_dentry->d_sb = inode->i_sb;
        obj_dentry->d_name.hash = 0;
 
        oie->oie_rd_dirent       = 0;
@@ -6441,7 +6713,7 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
                if (oie->oie_buf == NULL)
                        RETURN(ERR_PTR(-ENOMEM));
        }
-       oie->oie_obj = obj;
+       oie->oie_obj = NULL;
 
        file = &oie->oie_file;
 
@@ -6451,14 +6723,56 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
        else
                file->f_mode    = FMODE_32BITHASH;
        file->f_path.dentry     = obj_dentry;
-       file->f_mapping         = obj->oo_inode->i_mapping;
-       file->f_op              = obj->oo_inode->i_fop;
-       set_file_inode(file, obj->oo_inode);
+       file->f_mapping         = inode->i_mapping;
+       file->f_op              = inode->i_fop;
+       file->f_inode = inode;
+
+       RETURN(oie);
+}
+
+/**
+ * Creates or initializes iterator context.
+ *
+ * \retval struct osd_it_ea, iterator structure on success
+ *
+ */
+static struct dt_it *osd_it_ea_init(const struct lu_env *env,
+                                   struct dt_object *dt,
+                                   __u32 attr)
+{
+       struct osd_object *obj = osd_dt_obj(dt);
+       struct lu_object *lo = &dt->do_lu;
+       struct osd_it_ea *oie;
+
+       ENTRY;
+
+       if (!dt_object_exists(dt) || obj->oo_destroyed)
+               RETURN(ERR_PTR(-ENOENT));
+
+       oie = osd_it_dir_init(env, obj->oo_inode, attr);
+       if (IS_ERR(oie))
+               RETURN((struct dt_it *)oie);
 
+       oie->oie_obj = obj;
        lu_object_get(lo);
        RETURN((struct dt_it *)oie);
 }
 
+void osd_it_dir_fini(const struct lu_env *env, struct osd_it_ea *oie,
+                    struct inode *inode)
+{
+       struct osd_thread_info *info = osd_oti_get(env);
+
+       ENTRY;
+       oie->oie_file.f_op->release(inode, &oie->oie_file);
+       if (unlikely(oie->oie_buf != info->oti_it_ea_buf))
+               OBD_FREE(oie->oie_buf, OSD_IT_EA_BUFSIZE);
+       else
+               info->oti_it_ea_buf_used = 0;
+       OBD_SLAB_FREE_PTR(oie, osd_itea_cachep);
+       EXIT;
+}
+
 /**
  * Destroy or finishes iterator context.
  *
@@ -6466,19 +6780,13 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
  */
 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
 {
-       struct osd_thread_info *info = osd_oti_get(env);
        struct osd_it_ea *oie = (struct osd_it_ea *)di;
        struct osd_object *obj = oie->oie_obj;
        struct inode *inode = obj->oo_inode;
 
        ENTRY;
-       oie->oie_file.f_op->release(inode, &oie->oie_file);
+       osd_it_dir_fini(env, (struct osd_it_ea *)di, inode);
        osd_object_put(env, obj);
-       if (unlikely(oie->oie_buf != info->oti_it_ea_buf))
-               OBD_FREE(oie->oie_buf, OSD_IT_EA_BUFSIZE);
-       else
-               info->oti_it_ea_buf_used = 0;
-       OBD_SLAB_FREE_PTR(oie, osd_itea_cachep);
        EXIT;
 }
 
@@ -6542,6 +6850,7 @@ static int osd_ldiskfs_filldir(void *buf,
        struct osd_it_ea_dirent *ent = it->oie_dirent;
        struct lu_fid *fid = &ent->oied_fid;
        struct osd_fid_pack *rec;
+       struct lu_fid namefid;
 
        ENTRY;
 
@@ -6555,9 +6864,12 @@ static int osd_ldiskfs_filldir(void *buf,
            OSD_IT_EA_BUFSIZE)
                RETURN(1);
 
+       fid_zero(&namefid);
+
        /* "." is just the object itself. */
        if (namelen == 1 && name[0] == '.') {
-               *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
+               if (obj != NULL)
+                       *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
        } else if (d_type & LDISKFS_DIRENT_LUFID) {
                rec = (struct osd_fid_pack *)(name + namelen + 1);
                if (osd_fid_unpack(fid, rec) != 0)
@@ -6568,18 +6880,34 @@ static int osd_ldiskfs_filldir(void *buf,
        d_type &= ~LDISKFS_DIRENT_LUFID;
 
        /* NOT export local root. */
-       if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
+       if (obj != NULL &&
+           unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
                ino = obj->oo_inode->i_ino;
                *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
        }
 
+       if (name[0] == '[')
+               sscanf(name + 1, SFID, RFID(&namefid));
+       if (fid_is_sane(&namefid) || name_is_dot_or_dotdot(name, namelen) ||
+           !obj || !(obj->oo_lma_flags & LUSTRE_ENCRYPT_FL)) {
+               memcpy(ent->oied_name, name, namelen);
+       } else {
+               int presented_len = critical_chars(name, namelen);
+
+               if (presented_len == namelen)
+                       memcpy(ent->oied_name, name, namelen);
+               else
+                       namelen = critical_encode(name, namelen,
+                                                 ent->oied_name);
+
+               ent->oied_name[namelen] = '\0';
+       }
+
        ent->oied_ino     = ino;
        ent->oied_off     = offset;
        ent->oied_namelen = namelen;
        ent->oied_type    = d_type;
 
-       memcpy(ent->oied_name, name, namelen);
-
        it->oie_rd_dirent++;
        it->oie_dirent = (void *)ent + cfs_size_round(sizeof(*ent) + namelen);
        RETURN(0);
@@ -6595,12 +6923,10 @@ static int osd_ldiskfs_filldir(void *buf,
  * \retval -ve on error
  * \retval +1 reach the end of entry
  */
-static int osd_ldiskfs_it_fill(const struct lu_env *env,
-                              const struct dt_it *di)
+int osd_ldiskfs_it_fill(const struct lu_env *env, const struct dt_it *di)
 {
        struct osd_it_ea *it = (struct osd_it_ea *)di;
        struct osd_object *obj = it->oie_obj;
-       struct inode *inode = obj->oo_inode;
        struct htree_lock *hlock = NULL;
        struct file *filp = &it->oie_file;
        int rc = 0;
@@ -6613,28 +6939,27 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env,
        it->oie_dirent = it->oie_buf;
        it->oie_rd_dirent = 0;
 
-       if (obj->oo_hl_head != NULL) {
-               hlock = osd_oti_get(env)->oti_hlock;
-               ldiskfs_htree_lock(hlock, obj->oo_hl_head,
-                                  inode, LDISKFS_HLOCK_READDIR);
-       } else {
-               down_read(&obj->oo_ext_idx_sem);
+       if (obj) {
+               if (obj->oo_hl_head != NULL) {
+                       hlock = osd_oti_get(env)->oti_hlock;
+                       ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                          obj->oo_inode,
+                                          LDISKFS_HLOCK_READDIR);
+               } else {
+                       down_read(&obj->oo_ext_idx_sem);
+               }
        }
 
+       filp->f_cred = current_cred();
        rc = osd_security_file_alloc(filp);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
        filp->f_flags |= O_NOATIME;
        filp->f_mode |= FMODE_NONOTIFY;
        rc = iterate_dir(filp, &buf.ctx);
        if (rc)
-               RETURN(rc);
-
-       if (hlock != NULL)
-               ldiskfs_htree_unlock(hlock);
-       else
-               up_read(&obj->oo_ext_idx_sem);
+               GOTO(unlock, rc);
 
        if (it->oie_rd_dirent == 0) {
                /*
@@ -6648,6 +6973,13 @@ static int osd_ldiskfs_it_fill(const struct lu_env *env,
                it->oie_dirent = it->oie_buf;
                it->oie_it_dirent = 1;
        }
+unlock:
+       if (obj) {
+               if (hlock != NULL)
+                       ldiskfs_htree_unlock(hlock);
+               else
+                       up_read(&obj->oo_ext_idx_sem);
+       }
 
        RETURN(rc);
 }
@@ -6716,6 +7048,11 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
        return it->oie_dirent->oied_namelen;
 }
 
+#if defined LDISKFS_DIR_ENTRY_LEN && defined LDISKFS_DIR_ENTRY_LEN_
+#undef LDISKFS_DIR_REC_LEN
+#define LDISKFS_DIR_REC_LEN(de)                LDISKFS_DIR_ENTRY_LEN_((de))
+#endif
+
 static inline bool osd_dotdot_has_space(struct ldiskfs_dir_entry_2 *de)
 {
        if (LDISKFS_DIR_REC_LEN(de) >=
@@ -6791,11 +7128,10 @@ osd_dirent_reinsert(const struct lu_env *env, struct osd_device *dev,
         * That means we lose it!
         */
        if (rc != 0)
-               CDEBUG(D_LFSCK, "%s: fail to reinsert the dirent, "
-                      "dir = %lu/%u, name = %.*s, "DFID": rc = %d\n",
-                      osd_ino2name(inode),
-                      dir->i_ino, dir->i_generation, namelen,
-                      dentry->d_name.name, PFID(fid), rc);
+               CDEBUG(D_LFSCK,
+                      "%s: fail to reinsert the dirent, dir = %lu/%u, name = %.*s, "DFID": rc = %d\n",
+                      osd_ino2name(inode), dir->i_ino, dir->i_generation,
+                      namelen, dentry->d_name.name, PFID(fid), rc);
 
        RETURN(rc);
 }
@@ -6823,6 +7159,7 @@ osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj,
        int rc;
        bool dotdot = false;
        bool dirty = false;
+       struct lu_name ln;
 
        ENTRY;
 
@@ -6857,8 +7194,11 @@ osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj,
                RETURN(rc);
        }
 
-       dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
-                                          ent->oied_namelen);
+       rc = obj_name2lu_name(obj, ent->oied_name, ent->oied_namelen, &ln);
+       if (rc)
+               RETURN(rc);
+
+       dentry = osd_child_dentry_by_inode(env, dir, ln.ln_name, ln.ln_namelen);
        rc = osd_get_lma(info, inode, dentry, &info->oti_ost_attrs);
        if (rc == -ENODATA || !fid_is_sane(&lma->lma_self_fid))
                lma = NULL;
@@ -7131,6 +7471,8 @@ out_inode:
        iput(inode);
        if (rc >= 0 && !dirty)
                dev->od_dirent_journal = 0;
+       if (ln.ln_name != ent->oied_name)
+               kfree(ln.ln_name);
 
        return rc;
 }
@@ -7206,8 +7548,6 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
 
                                rc = osd_ea_fid_get(env, obj, ino, fid, id);
                        }
-               } else {
-                       osd_id_gen(id, ino, OSD_OII_NOGEN);
                }
        }
 
@@ -7217,15 +7557,6 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
                           it->oie_dirent->oied_namelen,
                           it->oie_dirent->oied_type, attr);
 
-       if (rc < 0)
-               RETURN(rc);
-
-       if (osd_remote_fid(env, dev, fid))
-               RETURN(0);
-
-       if (likely(!(attr & (LUDA_IGNORE | LUDA_UNKNOWN)) && rc == 0))
-               osd_add_oi_cache(oti, dev, id, fid);
-
        RETURN(rc > 0 ? 0 : rc);
 }
 
@@ -7382,11 +7713,17 @@ static void osd_key_fini(const struct lu_context *ctx,
        if (info->oti_dio_pages) {
                int i;
                for (i = 0; i < PTLRPC_MAX_BRW_PAGES; i++) {
-                       if (info->oti_dio_pages[i])
-                               __free_page(info->oti_dio_pages[i]);
+                       struct page *page = info->oti_dio_pages[i];
+                       if (page) {
+                               LASSERT(PagePrivate2(page));
+                               LASSERT(PageLocked(page));
+                               ClearPagePrivate2(page);
+                               unlock_page(page);
+                               __free_page(page);
+                       }
                }
-               OBD_FREE(info->oti_dio_pages,
-                        sizeof(struct page *) * PTLRPC_MAX_BRW_PAGES);
+               OBD_FREE_PTR_ARRAY_LARGE(info->oti_dio_pages,
+                                        PTLRPC_MAX_BRW_PAGES);
        }
 
        if (info->oti_inode != NULL)
@@ -7400,7 +7737,7 @@ static void osd_key_fini(const struct lu_context *ctx,
        lu_buf_free(&info->oti_big_buf);
        if (idc != NULL) {
                LASSERT(info->oti_ins_cache_size > 0);
-               OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+               OBD_FREE_PTR_ARRAY(idc, info->oti_ins_cache_size);
                info->oti_ins_cache = NULL;
                info->oti_ins_cache_size = 0;
        }
@@ -7442,7 +7779,7 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d,
 static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
 {
        struct seq_server_site *ss = osd_seq_site(osd);
-       int rc;
+       int rc = 0;
 
        ENTRY;
 
@@ -7456,13 +7793,8 @@ static int osd_fid_init(const struct lu_env *env, struct osd_device *osd)
        if (osd->od_cl_seq == NULL)
                RETURN(-ENOMEM);
 
-       rc = seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
-                            osd->od_svname, ss->ss_server_seq);
-       if (rc != 0) {
-               OBD_FREE_PTR(osd->od_cl_seq);
-               osd->od_cl_seq = NULL;
-               RETURN(rc);
-       }
+       seq_client_init(osd->od_cl_seq, NULL, LUSTRE_SEQ_METADATA,
+                       osd->od_svname, ss->ss_server_seq);
 
        if (ss->ss_node_id == 0) {
                /*
@@ -7519,6 +7851,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
        if (o->od_mnt != NULL) {
                shrink_dcache_sb(osd_sb(o));
                osd_sync(env, &o->od_dt_dev);
+               wait_event(o->od_commit_cb_done,
+                         !atomic_read(&o->od_commit_cb_in_flight));
 
                mntput(o->od_mnt);
                o->od_mnt = NULL;
@@ -7527,21 +7861,35 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
        EXIT;
 }
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 53, 0)
+# ifndef LDISKFS_HAS_INCOMPAT_FEATURE
+/* Newer kernels provide the ldiskfs_set_feature_largedir() wrapper already,
+ * which calls ldiskfs_update_dynamic_rev() to update ancient filesystems.
+ * All ldiskfs filesystems are already v2, so it is a no-op and unnecessary.
+ * This avoids maintaining patches to export this otherwise-useless function.
+ */
+void ldiskfs_update_dynamic_rev(struct super_block *sb)
+{
+       /* do nothing */
+}
+# endif
+#endif
+
 static int osd_mount(const struct lu_env *env,
                     struct osd_device *o, struct lustre_cfg *cfg)
 {
        const char *name = lustre_cfg_string(cfg, 0);
        const char *dev = lustre_cfg_string(cfg, 1);
        const char *opts;
-       unsigned long page, s_flags, lmd_flags = 0;
+       unsigned long page, s_flags = 0, lmd_flags = 0;
        struct page *__page;
        struct file_system_type *type;
        char *options = NULL;
-       char *str;
+       const char *str;
        struct osd_thread_info *info = osd_oti_get(env);
        struct lu_fid *fid = &info->oti_fid;
        struct inode *inode;
-       int rc = 0, force_over_512tb = 0;
+       int rc = 0, force_over_1024tb = 0;
 
        ENTRY;
 
@@ -7552,11 +7900,9 @@ static int osd_mount(const struct lu_env *env,
                RETURN(-E2BIG);
        strcpy(o->od_mntdev, dev);
 
-       str = lustre_cfg_string(cfg, 2);
-       s_flags = simple_strtoul(str, NULL, 0);
-       str = strstr(str, ":");
-       if (str)
-               lmd_flags = simple_strtoul(str + 1, NULL, 0);
+       str = lustre_cfg_buf(cfg, 2);
+       sscanf(str, "%lu:%lu", &s_flags, &lmd_flags);
+
        opts = lustre_cfg_string(cfg, 3);
 #ifdef __BIG_ENDIAN
        if (opts == NULL || strstr(opts, "bigendian_extents") == NULL) {
@@ -7568,23 +7914,22 @@ static int osd_mount(const struct lu_env *env,
 #endif
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 0, 53, 0)
        if (opts != NULL && strstr(opts, "force_over_128tb") != NULL) {
-               CWARN("force_over_128tb option is deprecated. "
-                     "Filesystems less than 512TB can be created without any "
-                     "force options. Use force_over_512tb option for "
-                     "filesystems greater than 512TB.\n");
+               CWARN("force_over_128tb option is deprecated.  Filesystems smaller than 1024TB can be created without any force option. Use force_over_1024tb option for filesystems larger than 1024TB.\n");
        }
 #endif
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 1, 53, 0)
        if (opts != NULL && strstr(opts, "force_over_256tb") != NULL) {
-               CWARN("force_over_256tb option is deprecated. "
-                     "Filesystems less than 512TB can be created without any "
-                     "force options. Use force_over_512tb option for "
-                     "filesystems greater than 512TB.\n");
+               CWARN("force_over_256tb option is deprecated.  Filesystems smaller than 1024TB can be created without any force options. Use force_over_1024tb option for filesystems larger than 1024TB.\n");
+       }
+#endif
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 53, 0)
+       if (opts != NULL && strstr(opts, "force_over_512tb") != NULL) {
+               CWARN("force_over_512tb option is deprecated.  Filesystems smaller than 1024TB can be created without any force options. Use force_over_1024tb option for filesystems larger than 1024TB.\n");
        }
 #endif
 
-       if (opts != NULL && strstr(opts, "force_over_512tb") != NULL)
-               force_over_512tb = 1;
+       if (opts != NULL && strstr(opts, "force_over_1024tb") != NULL)
+               force_over_1024tb = 1;
 
        __page = alloc_page(GFP_KERNEL);
        if (__page == NULL)
@@ -7594,7 +7939,7 @@ static int osd_mount(const struct lu_env *env,
        *options = '\0';
        if (opts != NULL) {
                /* strip out the options for back compatiblity */
-               static char *sout[] = {
+               static const char * const sout[] = {
                        "mballoc",
                        "iopen",
                        "noiopen",
@@ -7606,6 +7951,8 @@ static int osd_mount(const struct lu_env *env,
                        "force_over_128tb",
                        "force_over_256tb",
                        "force_over_512tb",
+                       "force_over_1024tb",
+                       "resetoi",
                        NULL
                };
                strncat(options, opts, PAGE_SIZE);
@@ -7653,33 +8000,18 @@ static int osd_mount(const struct lu_env *env,
        }
 
        if (ldiskfs_blocks_count(LDISKFS_SB(osd_sb(o))->s_es) <<
-                                osd_sb(o)->s_blocksize_bits > 512ULL << 40 &&
-                                force_over_512tb == 0) {
-               CERROR("%s: device %s LDISKFS does not support filesystems "
-                      "greater than 512TB and can cause data corruption. "
-                      "Use \"force_over_512tb\" mount option to override.\n",
+                                osd_sb(o)->s_blocksize_bits > 1024ULL << 40 &&
+                                force_over_1024tb == 0) {
+               CERROR("%s: device %s LDISKFS has not been tested on filesystems larger than 1024TB and may cause data corruption. Use 'force_over_1024tb' mount option to override.\n",
                       name, dev);
                GOTO(out_mnt, rc = -EINVAL);
        }
 
        if (lmd_flags & LMD_FLG_DEV_RDONLY) {
-               if (priv_dev_set_rdonly) {
-                       priv_dev_set_rdonly(osd_sb(o)->s_bdev);
-                       o->od_dt_dev.dd_rdonly = 1;
-                       LCONSOLE_WARN("%s: set dev_rdonly on this device\n",
-                                     name);
-               } else {
-                       LCONSOLE_WARN("%s: not support dev_rdonly on this device",
-                                     name);
+               LCONSOLE_WARN("%s: not support dev_rdonly on this device\n",
+                             name);
 
-                       GOTO(out_mnt, rc = -EOPNOTSUPP);
-               }
-       } else if (priv_dev_check_rdonly &&
-                  priv_dev_check_rdonly(osd_sb(o)->s_bdev)) {
-               CERROR("%s: underlying device %s is marked as "
-                      "read-only. Setup failed\n", name, dev);
-
-               GOTO(out_mnt, rc = -EROFS);
+               GOTO(out_mnt, rc = -EOPNOTSUPP);
        }
 
        if (!ldiskfs_has_feature_journal(o->od_mnt->mnt_sb)) {
@@ -7687,6 +8019,7 @@ static int osd_mount(const struct lu_env *env,
                GOTO(out_mnt, rc = -EINVAL);
        }
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 53, 0)
 #ifdef LDISKFS_MOUNT_DIRDATA
        if (ldiskfs_has_feature_dirdata(o->od_mnt->mnt_sb))
                LDISKFS_SB(osd_sb(o))->s_mount_opt |= LDISKFS_MOUNT_DIRDATA;
@@ -7696,6 +8029,15 @@ static int osd_mount(const struct lu_env *env,
                      "downgrade to Lustre-1.x again, you can enable it via "
                      "'tune2fs -O dirdata device'\n", name, dev);
 #endif
+       /* enable large_dir on MDTs to avoid REMOTE_PARENT_DIR overflow,
+        * and on very large OSTs to avoid object directory overflow */
+       if (unlikely(!ldiskfs_has_feature_largedir(o->od_mnt->mnt_sb) &&
+                    !strstr(name, "MGS"))) {
+               ldiskfs_set_feature_largedir(o->od_mnt->mnt_sb);
+               LCONSOLE_INFO("%s: enabled 'large_dir' feature on device %s\n",
+                             name, dev);
+       }
+#endif
        inode = osd_sb(o)->s_root->d_inode;
        lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
        rc = osd_ea_fid_set(info, inode, fid, LMAC_NOT_IN_OI, 0);
@@ -7705,7 +8047,7 @@ static int osd_mount(const struct lu_env *env,
        }
 
        if (lmd_flags & LMD_FLG_NOSCRUB)
-               o->od_auto_scrub_interval = AS_NEVER;
+               o->od_scrub.os_scrub.os_auto_scrub_interval = AS_NEVER;
 
        if (blk_queue_nonrot(bdev_get_queue(osd_sb(o)->s_bdev))) {
                /* do not use pagecache with flash-backed storage */
@@ -7736,6 +8078,10 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
        osd_index_backup(env, o, false);
        osd_shutdown(env, o);
        osd_procfs_fini(o);
+       if (o->od_oi_table != NULL)
+               osd_oi_fini(osd_oti_get(env), o);
+       if (o->od_extent_bytes_percpu)
+               free_percpu(o->od_extent_bytes_percpu);
        osd_obj_map_fini(o);
        osd_umount(env, o);
 
@@ -7748,8 +8094,10 @@ static int osd_device_init0(const struct lu_env *env,
 {
        struct lu_device *l = osd2lu_dev(o);
        struct osd_thread_info *info;
-       int rc;
        int cplen = 0;
+       char *opts = NULL;
+       bool restored = false;
+       int rc;
 
        /* if the module was re-loaded, env can loose its keys */
        rc = lu_env_refill((struct lu_env *)env);
@@ -7769,13 +8117,16 @@ static int osd_device_init0(const struct lu_env *env,
        spin_lock_init(&o->od_lock);
        o->od_index_backup_policy = LIBP_NONE;
        o->od_t10_type = 0;
+       init_waitqueue_head(&o->od_commit_cb_done);
 
        o->od_read_cache = 1;
        o->od_writethrough_cache = 1;
        o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
        o->od_readcache_max_iosize = OSD_READCACHE_MAX_IO_MB << 20;
        o->od_writethrough_max_iosize = OSD_WRITECACHE_MAX_IO_MB << 20;
-       o->od_auto_scrub_interval = AS_DEFAULT;
+       o->od_scrub.os_scrub.os_auto_scrub_interval = AS_DEFAULT;
+       /* default fallocate to unwritten extents: LU-14326/LU-14333 */
+       o->od_fallocate_zero_blocks = 0;
 
        cplen = strlcpy(o->od_svname, lustre_cfg_string(cfg, 4),
                        sizeof(o->od_svname));
@@ -7813,10 +8164,14 @@ static int osd_device_init0(const struct lu_env *env,
        if (rc != 0)
                GOTO(out_site, rc);
 
+       opts = lustre_cfg_string(cfg, 3);
+       if (opts && strstr(opts, "resetoi"))
+               restored = true;
+
        INIT_LIST_HEAD(&o->od_ios_list);
        /* setup scrub, including OI files initialization */
        o->od_in_init = 1;
-       rc = osd_scrub_setup(env, o);
+       rc = osd_scrub_setup(env, o, restored);
        o->od_in_init = 0;
        if (rc < 0)
                GOTO(out_site, rc);
@@ -7858,6 +8213,12 @@ static int osd_device_init0(const struct lu_env *env,
                GOTO(out_procfs, rc);
        }
 
+       o->od_extent_bytes_percpu = alloc_percpu(unsigned int);
+       if (!o->od_extent_bytes_percpu) {
+               rc = -ENOMEM;
+               GOTO(out_procfs, rc);
+       }
+
        RETURN(0);
 
 out_procfs:
@@ -7913,10 +8274,8 @@ static struct lu_device *osd_device_free(const struct lu_env *env,
        /* XXX: make osd top device in order to release reference */
        d->ld_site->ls_top_dev = d;
        lu_site_purge(env, d->ld_site, -1);
-       if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
-               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
-               lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
-       }
+       lu_site_print(env, d->ld_site, &d->ld_site->ls_obj_hash.nelems,
+                     D_ERROR, lu_cdebug_printer);
        lu_site_fini(&o->od_site);
        dt_device_fini(&o->od_dt_dev);
        OBD_FREE_PTR(o);
@@ -8088,10 +8447,18 @@ static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
        RETURN(result);
 }
 
-static int osd_fid_alloc(const struct lu_env *env, struct obd_export *exp,
-                        struct lu_fid *fid, struct md_op_data *op_data)
+/**
+ * Implementation of lu_device_operations::ldo_fid_alloc() for OSD
+ *
+ * Allocate FID.
+ *
+ * see include/lu_object.h for the details.
+ */
+static int osd_fid_alloc(const struct lu_env *env, struct lu_device *d,
+                        struct lu_fid *fid, struct lu_object *parent,
+                        const struct lu_name *name)
 {
-       struct osd_device *osd = osd_dev(exp->exp_obd->obd_lu_dev);
+       struct osd_device *osd = osd_dev(d);
 
        return seq_client_alloc_fid(env, osd->od_cl_seq, fid);
 }
@@ -8110,6 +8477,7 @@ const struct lu_device_operations osd_lu_ops = {
        .ldo_process_config    = osd_process_config,
        .ldo_recovery_complete = osd_recovery_complete,
        .ldo_prepare           = osd_prepare,
+       .ldo_fid_alloc         = osd_fid_alloc,
 };
 
 static const struct lu_device_type_operations osd_device_type_ops = {
@@ -8148,7 +8516,6 @@ static const struct obd_ops osd_obd_device_ops = {
        .o_owner = THIS_MODULE,
        .o_connect      = osd_obd_connect,
        .o_disconnect   = osd_obd_disconnect,
-       .o_fid_alloc    = osd_fid_alloc,
        .o_health_check = osd_health_check,
 };
 
@@ -8196,13 +8563,10 @@ static int __init osd_init(void)
 
 #ifdef CONFIG_KALLSYMS
        priv_security_file_alloc =
-               (void *)kallsyms_lookup_name("security_file_alloc");
-       priv_dev_set_rdonly = (void *)kallsyms_lookup_name("dev_set_rdonly");
-       priv_dev_check_rdonly =
-               (void *)kallsyms_lookup_name("dev_check_rdonly");
+               (void *)cfs_kallsyms_lookup_name("security_file_alloc");
 #endif
 
-       rc = class_register_type(&osd_obd_device_ops, NULL, true, NULL,
+       rc = class_register_type(&osd_obd_device_ops, NULL, true,
                                 LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
        if (rc) {
                lu_kmem_fini(ldiskfs_caches);