Whamcloud - gitweb
LU-5855 lfsck: misc fixes for zfs-based backend
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index b656a79..3c09617 100644 (file)
@@ -73,6 +73,7 @@
 #include <lustre_quota.h>
 
 #include <ldiskfs/xattr.h>
+#include <lustre_linkea.h>
 
 int ldiskfs_pdo = 1;
 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
@@ -890,7 +891,7 @@ static int osd_param_is_not_sane(const struct osd_device *dev,
 {
        struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
 
-       return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
+       return oh->ot_credits > osd_transaction_size(dev);
 }
 
 /*
@@ -1029,7 +1030,8 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                      oti->oti_declare_ops_cred[OSD_OT_REF_DEL]);
 
                if (last_credits != oh->ot_credits &&
-                   time_after(jiffies, last_printed + 60 * HZ)) {
+                   time_after(jiffies, last_printed +
+                              msecs_to_jiffies(60 * MSEC_PER_SEC))) {
                        libcfs_debug_dumpstack(NULL);
                        last_credits = oh->ot_credits;
                        last_printed = jiffies;
@@ -1040,7 +1042,7 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                 *
                 *     This should be removed when we can calculate the
                 *     credits precisely. */
-               oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
+               oh->ot_credits = osd_transaction_size(dev);
        }
 
         /*
@@ -1710,17 +1712,18 @@ static void osd_inode_getattr(const struct lu_env *env,
 }
 
 static int osd_attr_get(const struct lu_env *env,
-                        struct dt_object *dt,
-                        struct lu_attr *attr,
-                        struct lustre_capa *capa)
+                       struct dt_object *dt,
+                       struct lu_attr *attr,
+                       struct lustre_capa *capa)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
+       struct osd_object *obj = osd_dt_obj(dt);
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LINVRNT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LINVRNT(osd_invariant(obj));
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
-                return -EACCES;
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
+               return -EACCES;
 
        spin_lock(&obj->oo_guard);
        osd_inode_getattr(env, obj->oo_inode, attr);
@@ -1741,7 +1744,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        qid_t                   gid;
        long long               bspace;
        int                     rc = 0;
-       bool                    allocated;
+       bool                    enforce;
        ENTRY;
 
        LASSERT(dt != NULL);
@@ -1769,18 +1772,19 @@ static int osd_declare_attr_set(const struct lu_env *env,
         * We still need to call the osd_declare_qid() to calculate the journal
         * credits for updating quota accounting files and to trigger quota
         * space adjustment once the operation is completed.*/
-       if ((attr->la_valid & LA_UID) != 0 &&
-            attr->la_uid != (uid = i_uid_read(obj->oo_inode))) {
+       if (attr->la_valid & LA_UID || attr->la_valid & LA_GID) {
+               /* USERQUOTA */
+               uid = i_uid_read(obj->oo_inode);
                qi->lqi_type = USRQUOTA;
-
+               enforce = (attr->la_valid & LA_UID) && (attr->la_uid != uid);
                /* inode accounting */
                qi->lqi_is_blk = false;
 
-               /* one more inode for the new owner ... */
+               /* one more inode for the new uid ... */
                qi->lqi_id.qid_uid = attr->la_uid;
                qi->lqi_space      = 1;
-               allocated = (attr->la_uid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               /* Reserve credits for the new uid */
+               rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1789,7 +1793,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* and one less inode for the current uid */
                qi->lqi_id.qid_uid = uid;
                qi->lqi_space      = -1;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1798,38 +1802,40 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* block accounting */
                qi->lqi_is_blk = true;
 
-               /* more blocks for the new owner ... */
+               /* more blocks for the new uid ... */
                qi->lqi_id.qid_uid = attr->la_uid;
                qi->lqi_space      = bspace;
-               allocated = (attr->la_uid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               /*
+                * Credits for the new uid has been reserved, re-use "obj"
+                * to save credit reservation.
+                */
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
 
-               /* and finally less blocks for the current owner */
+               /* and finally less blocks for the current uid */
                qi->lqi_id.qid_uid = uid;
                qi->lqi_space      = -bspace;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
-       }
 
-       if (attr->la_valid & LA_GID &&
-           attr->la_gid != (gid = i_gid_read(obj->oo_inode))) {
+               /* GROUP QUOTA */
+               gid = i_gid_read(obj->oo_inode);
                qi->lqi_type = GRPQUOTA;
+               enforce = (attr->la_valid & LA_GID) && (attr->la_gid != gid);
 
                /* inode accounting */
                qi->lqi_is_blk = false;
 
-               /* one more inode for the new group owner ... */
+               /* one more inode for the new gid ... */
                qi->lqi_id.qid_gid = attr->la_gid;
                qi->lqi_space      = 1;
-               allocated = (attr->la_gid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1838,7 +1844,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* and one less inode for the current gid */
                qi->lqi_id.qid_gid = gid;
                qi->lqi_space      = -1;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1847,20 +1853,19 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* block accounting */
                qi->lqi_is_blk = true;
 
-               /* more blocks for the new owner ... */
+               /* more blocks for the new gid ... */
                qi->lqi_id.qid_gid = attr->la_gid;
                qi->lqi_space      = bspace;
-               allocated = (attr->la_gid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
 
-               /* and finally less blocks for the current owner */
+               /* and finally less blocks for the current gid */
                qi->lqi_id.qid_gid = gid;
                qi->lqi_space      = -bspace;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1926,6 +1931,7 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
                struct iattr    iattr;
                int             rc;
 
+               ll_vfs_dq_init(inode);
                iattr.ia_valid = 0;
                if (attr->la_valid & LA_UID)
                        iattr.ia_valid |= ATTR_UID;
@@ -1946,21 +1952,22 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
 }
 
 static int osd_attr_set(const struct lu_env *env,
-                        struct dt_object *dt,
-                        const struct lu_attr *attr,
-                        struct thandle *handle,
-                        struct lustre_capa *capa)
+                       struct dt_object *dt,
+                       const struct lu_attr *attr,
+                       struct thandle *handle,
+                       struct lustre_capa *capa)
 {
-        struct osd_object *obj = osd_dt_obj(dt);
-        struct inode      *inode;
-        int rc;
+       struct osd_object *obj = osd_dt_obj(dt);
+       struct inode      *inode;
+       int rc;
 
-        LASSERT(handle != NULL);
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(osd_invariant(obj));
+       LASSERT(handle != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(osd_invariant(obj));
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
-                return -EACCES;
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
+               return -EACCES;
 
        osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
 
@@ -1990,7 +1997,6 @@ static int osd_attr_set(const struct lu_env *env,
        }
 
         inode = obj->oo_inode;
-       ll_vfs_dq_init(inode);
 
        rc = osd_quota_transfer(inode, attr);
        if (rc)
@@ -2380,7 +2386,7 @@ static int osd_declare_object_create(const struct lu_env *env,
                RETURN(0);
 
        rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
-                                  false, false, NULL, false);
+                                  osd_dt_obj(dt), false, NULL, false);
        if (rc != 0)
                RETURN(rc);
 
@@ -2388,22 +2394,21 @@ static int osd_declare_object_create(const struct lu_env *env,
 }
 
 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
-                             struct lu_attr *attr,
-                             struct dt_allocation_hint *hint,
-                             struct dt_object_format *dof,
-                             struct thandle *th)
+                            struct lu_attr *attr,
+                            struct dt_allocation_hint *hint,
+                            struct dt_object_format *dof, struct thandle *th)
 {
-        const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct osd_thread_info *info   = osd_oti_get(env);
-        int result;
-
-        ENTRY;
+       const struct lu_fid     *fid    = lu_object_fid(&dt->do_lu);
+       struct osd_object       *obj    = osd_dt_obj(dt);
+       struct osd_thread_info  *info   = osd_oti_get(env);
+       int result;
+       ENTRY;
 
-        LINVRNT(osd_invariant(obj));
-       LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(osd_write_locked(env, obj));
-        LASSERT(th != NULL);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(!dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(osd_write_locked(env, obj));
+       LASSERT(th != NULL);
 
        if (unlikely(fid_is_acct(fid)))
                /* Quota files can't be created from the kernel any more,
@@ -2451,12 +2456,12 @@ static int osd_declare_object_destroy(const struct lu_env *env,
                             osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
        /* one less inode */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  -1, oh, false, true, NULL, false);
+                                  -1, oh, obj, false, NULL, false);
        if (rc)
                RETURN(rc);
        /* data to be truncated */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  0, oh, true, true, NULL, false);
+                                  0, oh, obj, true, NULL, false);
        RETURN(rc);
 }
 
@@ -2716,22 +2721,23 @@ static int osd_delete_local_agent_inode(const struct lu_env *env,
  * \retval -ve, on error
  */
 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
-                                struct lu_attr *attr,
-                                struct dt_allocation_hint *hint,
-                                struct dt_object_format *dof,
-                                struct thandle *th)
+                               struct lu_attr *attr,
+                               struct dt_allocation_hint *hint,
+                               struct dt_object_format *dof,
+                               struct thandle *th)
 {
-        const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct osd_thread_info *info   = osd_oti_get(env);
-        int                     result;
+       const struct lu_fid     *fid    = lu_object_fid(&dt->do_lu);
+       struct osd_object       *obj    = osd_dt_obj(dt);
+       struct osd_thread_info  *info   = osd_oti_get(env);
+       int                      result;
 
-        ENTRY;
+       ENTRY;
 
-        LASSERT(osd_invariant(obj));
-       LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(osd_write_locked(env, obj));
-        LASSERT(th != NULL);
+       LASSERT(osd_invariant(obj));
+       LASSERT(!dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(osd_write_locked(env, obj));
+       LASSERT(th != NULL);
 
        if (unlikely(fid_is_acct(fid)))
                /* Quota files can't be created from the kernel any more,
@@ -2741,7 +2747,7 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
        osd_trans_exec_op(env, th, OSD_OT_CREATE);
        osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
 
-        result = __osd_object_create(info, obj, attr, hint, dof, th);
+       result = __osd_object_create(info, obj, attr, hint, dof, th);
        if (result == 0)
                result = osd_ea_fid_set(info, obj->oo_inode, fid,
                                fid_is_on_ost(info, osd_obj2dev(obj),
@@ -2779,20 +2785,21 @@ static int osd_declare_object_ref_add(const struct lu_env *env,
  * Concurrency: @dt is write locked.
  */
 static int osd_object_ref_add(const struct lu_env *env,
-                              struct dt_object *dt, struct thandle *th)
+                             struct dt_object *dt, struct thandle *th)
 {
        struct osd_object  *obj = osd_dt_obj(dt);
        struct inode       *inode = obj->oo_inode;
        struct osd_thandle *oh;
        int                 rc = 0;
 
-        LINVRNT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(osd_write_locked(env, obj));
-        LASSERT(th != NULL);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(osd_write_locked(env, obj));
+       LASSERT(th != NULL);
 
-        oh = container_of0(th, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle != NULL);
+       oh = container_of0(th, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
 
@@ -2827,16 +2834,17 @@ static int osd_object_ref_add(const struct lu_env *env,
 }
 
 static int osd_declare_object_ref_del(const struct lu_env *env,
-                                      struct dt_object *dt,
-                                      struct thandle *handle)
+                                     struct dt_object *dt,
+                                     struct thandle *handle)
 {
-        struct osd_thandle *oh;
+       struct osd_thandle *oh;
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(handle != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(handle != NULL);
 
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle == NULL);
+       oh = container_of0(handle, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
                             osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
@@ -2848,7 +2856,7 @@ static int osd_declare_object_ref_del(const struct lu_env *env,
  * Concurrency: @dt is write locked.
  */
 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
-                              struct thandle *th)
+                             struct thandle *th)
 {
        struct osd_object       *obj = osd_dt_obj(dt);
        struct inode            *inode = obj->oo_inode;
@@ -2856,12 +2864,13 @@ static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
        struct osd_thandle      *oh;
 
        LINVRNT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
        LASSERT(osd_write_locked(env, obj));
        LASSERT(th != NULL);
 
-        oh = container_of0(th, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle != NULL);
+       oh = container_of0(th, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
 
@@ -2920,16 +2929,24 @@ static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
                 /* for version we are just using xattr API but change inode
                  * field instead */
-                LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
-                osd_object_version_get(env, dt, buf->lb_buf);
-                return sizeof(dt_obj_version_t);
+               if (buf->lb_len == 0)
+                       return sizeof(dt_obj_version_t);
+
+               if (buf->lb_len < sizeof(dt_obj_version_t))
+                       return -ERANGE;
+
+               osd_object_version_get(env, dt, buf->lb_buf);
+
+               return sizeof(dt_obj_version_t);
         }
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(inode->i_op != NULL);
+       LASSERT(inode->i_op->getxattr != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
-                return -EACCES;
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
+               return -EACCES;
 
        return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
 }
@@ -2942,6 +2959,7 @@ static int osd_declare_xattr_set(const struct lu_env *env,
 {
        struct osd_thandle *oh;
        int credits;
+       struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
 
        LASSERT(handle != NULL);
 
@@ -2957,13 +2975,16 @@ static int osd_declare_xattr_set(const struct lu_env *env,
        } else if (strcmp(name, XATTR_NAME_VERSION) == 0) {
                credits = 1;
        } else {
-               struct osd_device  *osd = osd_dev(dt->do_lu.lo_dev);
-               struct super_block *sb = osd_sb(osd);
                credits = osd_dto_credits_noquota[DTO_XATTR_SET];
                if (buf && buf->lb_len > sb->s_blocksize) {
                        credits *= (buf->lb_len + sb->s_blocksize - 1) >>
                                        sb->s_blocksize_bits;
                }
+               /*
+                * xattr set may involve inode quota change, reserve credits for
+                * dquot_initialize()
+                */
+               oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
        }
 
        osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET, credits);
@@ -3016,7 +3037,7 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
-       CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
+       CDEBUG(D_INODE, DFID" set xattr '%s' with size %zu\n",
               PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
 
        osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
@@ -3042,6 +3063,10 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
+           strcmp(name, XATTR_NAME_LINK) == 0)
+               return -ENOSPC;
+
        return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
                               fs_flags);
 }
@@ -3050,38 +3075,47 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
  * Concurrency: @dt is read locked.
  */
 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
-                          struct lu_buf *buf, struct lustre_capa *capa)
+                         struct lu_buf *buf, struct lustre_capa *capa)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_obj_dentry;
+       struct osd_object      *obj    = osd_dt_obj(dt);
+       struct inode           *inode  = obj->oo_inode;
+       struct osd_thread_info *info   = osd_oti_get(env);
+       struct dentry          *dentry = &info->oti_obj_dentry;
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(inode->i_op != NULL);
+       LASSERT(inode->i_op->listxattr != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
-                return -EACCES;
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
+               return -EACCES;
 
-        dentry->d_inode = inode;
+       dentry->d_inode = inode;
        dentry->d_sb = inode->i_sb;
-        return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
+       return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
 }
 
 static int osd_declare_xattr_del(const struct lu_env *env,
-                                 struct dt_object *dt, const char *name,
-                                 struct thandle *handle)
+                                struct dt_object *dt, const char *name,
+                                struct thandle *handle)
 {
-        struct osd_thandle *oh;
+       struct osd_thandle *oh;
+       struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(handle != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(handle != NULL);
 
-        oh = container_of0(handle, struct osd_thandle, ot_super);
-        LASSERT(oh->ot_handle == NULL);
+       oh = container_of0(handle, struct osd_thandle, ot_super);
+       LASSERT(oh->ot_handle == NULL);
 
        osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
                             osd_dto_credits_noquota[DTO_XATTR_SET]);
+       /*
+        * xattr del may involve inode quota change, reserve credits for
+        * dquot_initialize()
+        */
+       oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
 
        return 0;
 }
@@ -3090,29 +3124,31 @@ static int osd_declare_xattr_del(const struct lu_env *env,
  * Concurrency: @dt is write locked.
  */
 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                         const char *name, struct thandle *handle,
-                         struct lustre_capa *capa)
+                        const char *name, struct thandle *handle,
+                        struct lustre_capa *capa)
 {
-        struct osd_object      *obj    = osd_dt_obj(dt);
-        struct inode           *inode  = obj->oo_inode;
-        struct osd_thread_info *info   = osd_oti_get(env);
-        struct dentry          *dentry = &info->oti_obj_dentry;
-        int                     rc;
+       struct osd_object      *obj    = osd_dt_obj(dt);
+       struct inode           *inode  = obj->oo_inode;
+       struct osd_thread_info *info   = osd_oti_get(env);
+       struct dentry          *dentry = &info->oti_obj_dentry;
+       int                     rc;
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
-        LASSERT(handle != NULL);
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(inode->i_op != NULL);
+       LASSERT(inode->i_op->removexattr != NULL);
+       LASSERT(handle != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
-                return -EACCES;
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
+               return -EACCES;
 
        osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
 
        ll_vfs_dq_init(inode);
-        dentry->d_inode = inode;
+       dentry->d_inode = inode;
        dentry->d_sb = inode->i_sb;
-        rc = inode->i_op->removexattr(dentry, name);
-        return rc;
+       rc = inode->i_op->removexattr(dentry, name);
+       return rc;
 }
 
 static struct obd_capa *osd_capa_get(const struct lu_env *env,
@@ -3133,7 +3169,8 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
        if (!osd->od_fl_capa)
                RETURN(ERR_PTR(-ENOENT));
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
        LINVRNT(osd_invariant(obj));
 
        /* renewal sanity check */
@@ -3483,28 +3520,27 @@ static int osd_index_declare_iam_delete(const struct lu_env *env,
  *      \retval  0  success
  *      \retval -ve   failure
  */
-
 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
-                                const struct dt_key *key,
-                                struct thandle *handle,
-                                struct lustre_capa *capa)
+                               const struct dt_key *key,
+                               struct thandle *handle,
+                               struct lustre_capa *capa)
 {
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct osd_object      *obj = osd_dt_obj(dt);
-        struct osd_thandle     *oh;
-        struct iam_path_descr  *ipd;
-        struct iam_container   *bag = &obj->oo_dir->od_container;
-        int                     rc;
-
-        ENTRY;
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_object      *obj = osd_dt_obj(dt);
+       struct osd_thandle     *oh;
+       struct iam_path_descr  *ipd;
+       struct iam_container   *bag = &obj->oo_dir->od_container;
+       int                     rc;
+       ENTRY;
 
-        LINVRNT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(bag->ic_object == obj->oo_inode);
-        LASSERT(handle != NULL);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(bag->ic_object == obj->oo_inode);
+       LASSERT(handle != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
-                RETURN(-EACCES);
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+               RETURN(-EACCES);
 
        osd_trans_exec_op(env, handle, OSD_OT_DELETE);
 
@@ -3538,7 +3574,8 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
        int                 rc;
        ENTRY;
 
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
@@ -3551,7 +3588,7 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
        LASSERT(inode);
 
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  0, oh, true, true, NULL, false);
+                                  0, oh, osd_dt_obj(dt), true, NULL, false);
        RETURN(rc);
 }
 
@@ -3602,25 +3639,25 @@ static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
  * \retval -ve, on error
  */
 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
-                               const struct dt_key *key,
-                               struct thandle *handle,
-                               struct lustre_capa *capa)
+                              const struct dt_key *key, struct thandle *handle,
+                              struct lustre_capa *capa)
 {
-        struct osd_object          *obj    = osd_dt_obj(dt);
-        struct inode               *dir    = obj->oo_inode;
-        struct dentry              *dentry;
-        struct osd_thandle         *oh;
+       struct osd_object          *obj = osd_dt_obj(dt);
+       struct inode               *dir = obj->oo_inode;
+       struct dentry              *dentry;
+       struct osd_thandle         *oh;
        struct ldiskfs_dir_entry_2 *de = NULL;
-        struct buffer_head         *bh;
-        struct htree_lock          *hlock = NULL;
+       struct buffer_head         *bh;
+       struct htree_lock          *hlock = NULL;
        struct lu_fid              *fid = &osd_oti_get(env)->oti_fid;
        struct osd_device          *osd = osd_dev(dt->do_lu.lo_dev);
        int                        rc;
-        ENTRY;
+       ENTRY;
 
-        LINVRNT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(handle != NULL);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(handle != NULL);
 
        osd_trans_exec_op(env, handle, OSD_OT_DELETE);
 
@@ -3759,22 +3796,22 @@ out:
  *      \retval -ve   failure
  */
 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
-                                struct dt_rec *rec, const struct dt_key *key,
-                                struct lustre_capa *capa)
+                               struct dt_rec *rec, const struct dt_key *key,
+                               struct lustre_capa *capa)
 {
-        struct osd_object      *obj = osd_dt_obj(dt);
-        struct iam_path_descr  *ipd;
-        struct iam_container   *bag = &obj->oo_dir->od_container;
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct iam_iterator    *it = &oti->oti_idx_it;
-        struct iam_rec         *iam_rec;
-        int                     rc;
-
-        ENTRY;
+       struct osd_object      *obj = osd_dt_obj(dt);
+       struct iam_path_descr  *ipd;
+       struct iam_container   *bag = &obj->oo_dir->od_container;
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct iam_iterator    *it = &oti->oti_idx_it;
+       struct iam_rec         *iam_rec;
+       int                     rc;
+       ENTRY;
 
-        LASSERT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(bag->ic_object == obj->oo_inode);
+       LASSERT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(bag->ic_object == obj->oo_inode);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
                 RETURN(-EACCES);
@@ -3848,26 +3885,26 @@ static int osd_index_declare_iam_insert(const struct lu_env *env,
  *      \retval -ve failure
  */
 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
-                                const struct dt_rec *rec,
-                                const struct dt_key *key, struct thandle *th,
-                                struct lustre_capa *capa, int ignore_quota)
-{
-        struct osd_object     *obj = osd_dt_obj(dt);
-        struct iam_path_descr *ipd;
-        struct osd_thandle    *oh;
-        struct iam_container  *bag = &obj->oo_dir->od_container;
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct iam_rec         *iam_rec;
-        int                     rc;
-
-        ENTRY;
+                               const struct dt_rec *rec,
+                               const struct dt_key *key, struct thandle *th,
+                               struct lustre_capa *capa, int ignore_quota)
+{
+       struct osd_object     *obj = osd_dt_obj(dt);
+       struct iam_path_descr *ipd;
+       struct osd_thandle    *oh;
+       struct iam_container  *bag = &obj->oo_dir->od_container;
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct iam_rec         *iam_rec;
+       int                     rc;
+       ENTRY;
 
-        LINVRNT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
-        LASSERT(bag->ic_object == obj->oo_inode);
-        LASSERT(th != NULL);
+       LINVRNT(osd_invariant(obj));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
+       LASSERT(bag->ic_object == obj->oo_inode);
+       LASSERT(th != NULL);
 
-        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+       if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                RETURN(-EACCES);
 
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
@@ -3933,6 +3970,29 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
        child->d_fsdata = (void *)ldp;
        ll_vfs_dq_init(pobj->oo_inode);
        rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
+       if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_TYPE)) {
+               struct ldiskfs_dir_entry_2      *de;
+               struct buffer_head              *bh;
+               int                              rc1;
+
+               bh = osd_ldiskfs_find_entry(pobj->oo_inode, &child->d_name, &de,
+                                           NULL, hlock);
+               if (bh != NULL) {
+                       rc1 = ldiskfs_journal_get_write_access(oth->ot_handle,
+                                                              bh);
+                       if (rc1 == 0) {
+                               if (S_ISDIR(cinode->i_mode))
+                                       de->file_type = LDISKFS_DIRENT_LUFID |
+                                                       LDISKFS_FT_REG_FILE;
+                               else
+                                       de->file_type = LDISKFS_DIRENT_LUFID |
+                                                       LDISKFS_FT_DIR;
+                               ldiskfs_journal_dirty_metadata(oth->ot_handle,
+                                                              bh);
+                               brelse(bh);
+                       }
+               }
+       }
 
        RETURN(rc);
 }
@@ -3981,9 +4041,19 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                                                dot_dot_fid, NULL, th);
                }
 
-               result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
-                                                    parent_dir, dot_fid,
-                                                    dot_dot_fid, oth);
+               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
+                       struct lu_fid tfid = *dot_dot_fid;
+
+                       tfid.f_oid--;
+                       result = osd_add_dot_dotdot_internal(info,
+                                       dir->oo_inode, parent_dir, dot_fid,
+                                       &tfid, oth);
+               } else {
+                       result = osd_add_dot_dotdot_internal(info,
+                                       dir->oo_inode, parent_dir, dot_fid,
+                                       dot_dot_fid, oth);
+               }
+
                if (result == 0)
                        dir->oo_compat_dotdot_created = 1;
        }
@@ -4131,6 +4201,83 @@ int osd_add_oi_cache(struct osd_thread_info *info, struct osd_device *osd,
 }
 
 /**
+ * Get parent FID from the linkEA.
+ *
+ * For a directory which parent resides on remote MDT, to satisfy the
+ * local e2fsck, we insert it into the /REMOTE_PARENT_DIR locally. On
+ * the other hand, to make the lookup(..) on the directory can return
+ * the real parent FID, we append the real parent FID after its ".."
+ * name entry in the /REMOTE_PARENT_DIR.
+ *
+ * Unfortunately, such PFID-in-dirent cannot be preserved via file-level
+ * backup. So after the restore, we cannot get the right parent FID from
+ * its ".." name entry in the /REMOTE_PARENT_DIR. Under such case, since
+ * we have stored the real parent FID in the directory object's linkEA,
+ * we can parse the linkEA for the real parent FID.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] obj      pointer to the object to be handled
+ * \param[out]fid      pointer to the buffer to hold the parent FID
+ *
+ * \retval             0 for getting the real parent FID successfully
+ * \retval             negative error number on failure
+ */
+static int osd_get_pfid_from_linkea(const struct lu_env *env,
+                                   struct osd_object *obj,
+                                   struct lu_fid *fid)
+{
+       struct osd_thread_info  *oti    = osd_oti_get(env);
+       struct lu_buf           *buf    = &oti->oti_big_buf;
+       struct dentry           *dentry = &oti->oti_obj_dentry;
+       struct inode            *inode  = obj->oo_inode;
+       struct linkea_data       ldata  = { 0 };
+       int                      rc;
+       ENTRY;
+
+       fid_zero(fid);
+       if (!S_ISDIR(inode->i_mode))
+               RETURN(-EIO);
+
+again:
+       rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LINK,
+                            buf->lb_buf, buf->lb_len);
+       if (rc == -ERANGE) {
+               rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LINK,
+                                    NULL, 0);
+               if (rc > 0) {
+                       lu_buf_realloc(buf, rc);
+                       if (buf->lb_buf == NULL)
+                               RETURN(-ENOMEM);
+
+                       goto again;
+               }
+       }
+
+       if (unlikely(rc == 0))
+               RETURN(-ENODATA);
+
+       if (rc < 0)
+               RETURN(rc);
+
+       if (unlikely(buf->lb_buf == NULL)) {
+               lu_buf_realloc(buf, rc);
+               if (buf->lb_buf == NULL)
+                       RETURN(-ENOMEM);
+
+               goto again;
+       }
+
+       ldata.ld_buf = buf;
+       rc = linkea_init(&ldata);
+       if (rc == 0) {
+               linkea_first_entry(&ldata);
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, NULL, fid);
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Calls ->lookup() to find dentry. From dentry get inode and
  * read inode's ea to get fid. This is required for  interoperability
  * mode (b11826)
@@ -4139,33 +4286,34 @@ int osd_add_oi_cache(struct osd_thread_info *info, struct osd_device *osd,
  * \retval -ve, on error
  */
 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
-                             struct dt_rec *rec, const struct dt_key *key)
-{
-        struct inode               *dir    = obj->oo_inode;
-        struct dentry              *dentry;
-        struct ldiskfs_dir_entry_2 *de;
-        struct buffer_head         *bh;
-        struct lu_fid              *fid = (struct lu_fid *) rec;
-        struct htree_lock          *hlock = NULL;
-        int                         ino;
-        int                         rc;
+                            struct dt_rec *rec, const struct dt_key *key)
+{
+       struct inode                    *dir    = obj->oo_inode;
+       struct dentry                   *dentry;
+       struct ldiskfs_dir_entry_2      *de;
+       struct buffer_head              *bh;
+       struct lu_fid                   *fid = (struct lu_fid *) rec;
+       struct htree_lock               *hlock = NULL;
+       int                             ino;
+       int                             rc;
        ENTRY;
 
-        LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
+       LASSERT(dir->i_op != NULL);
+       LASSERT(dir->i_op->lookup != NULL);
 
-        dentry = osd_child_dentry_get(env, obj,
-                                      (char *)key, strlen((char *)key));
+       dentry = osd_child_dentry_get(env, obj,
+                                     (char *)key, strlen((char *)key));
 
-        if (obj->oo_hl_head != NULL) {
-                hlock = osd_oti_get(env)->oti_hlock;
-                ldiskfs_htree_lock(hlock, obj->oo_hl_head,
-                                   dir, LDISKFS_HLOCK_LOOKUP);
-        } else {
+       if (obj->oo_hl_head != NULL) {
+               hlock = osd_oti_get(env)->oti_hlock;
+               ldiskfs_htree_lock(hlock, obj->oo_hl_head,
+                                  dir, LDISKFS_HLOCK_LOOKUP);
+       } else {
                down_read(&obj->oo_ext_idx_sem);
-        }
+       }
 
-        bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, hlock);
-        if (bh) {
+       bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, hlock);
+       if (bh) {
                struct osd_thread_info *oti = osd_oti_get(env);
                struct osd_inode_id *id = &oti->oti_id;
                struct osd_idmap_cache *oic = &oti->oti_cache;
@@ -4184,10 +4332,18 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
 
                /* done with de, release bh */
                brelse(bh);
-               if (rc != 0)
-                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
-               else
+               if (rc != 0) {
+                       if (unlikely(ino == osd_remote_parent_ino(dev)))
+                               /* If the parent is on remote MDT, and there
+                                * is no FID-in-dirent, then we have to get
+                                * the parent FID from the linkEA.  */
+                               rc = osd_get_pfid_from_linkea(env, obj, fid);
+                       else
+                               rc = osd_ea_fid_get(env, obj, ino, fid, id);
+               } else {
                        osd_id_gen(id, ino, OSD_OII_NOGEN);
+               }
+
                if (rc != 0) {
                        fid_zero(&oic->oic_fid);
                        GOTO(out, rc);
@@ -4318,8 +4474,8 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                 * calculate how many blocks will be consumed by this index
                 * insert */
                rc = osd_declare_inode_qid(env, i_uid_read(inode),
-                                          i_gid_read(inode), 0,
-                                          oh, true, true, NULL, false);
+                                          i_gid_read(inode), 0, oh,
+                                          osd_dt_obj(dt), true, NULL, false);
        }
 
        if (fid == NULL)
@@ -4370,7 +4526,8 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        ENTRY;
 
        LASSERT(osd_invariant(obj));
-       LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
+       LASSERT(dt_object_exists(dt));
+       LASSERT(!dt_object_remote(dt));
        LASSERT(th != NULL);
 
        osd_trans_exec_op(env, th, OSD_OT_INSERT);
@@ -4459,16 +4616,30 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
                 return ERR_PTR(-EACCES);
 
-        it = &oti->oti_it;
-        ipd = osd_it_ipd_get(env, bag);
-        if (likely(ipd != NULL)) {
-                it->oi_obj = obj;
-                it->oi_ipd = ipd;
-                lu_object_get(lo);
-                iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
-                return (struct dt_it *)it;
-        }
-        return ERR_PTR(-ENOMEM);
+       if (oti->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       return ERR_PTR(-ENOMEM);
+       } else {
+               it = &oti->oti_it;
+               oti->oti_it_inline = 1;
+       }
+
+       ipd = osd_it_ipd_get(env, bag);
+       if (likely(ipd != NULL)) {
+               it->oi_obj = obj;
+               it->oi_ipd = ipd;
+               lu_object_get(lo);
+               iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
+               return (struct dt_it *)it;
+       } else {
+               if (it != &oti->oti_it)
+                       OBD_FREE_PTR(it);
+               else
+                       oti->oti_it_inline = 0;
+
+               return ERR_PTR(-ENOMEM);
+       }
 }
 
 /**
@@ -4477,12 +4648,17 @@ static struct dt_it *osd_it_iam_init(const struct lu_env *env,
 
 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it_iam *it = (struct osd_it_iam *)di;
-        struct osd_object *obj = it->oi_obj;
-
-        iam_it_fini(&it->oi_it);
-        osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
-        lu_object_put(env, &obj->oo_dt.do_lu);
+       struct osd_thread_info  *oti = osd_oti_get(env);
+       struct osd_it_iam       *it  = (struct osd_it_iam *)di;
+       struct osd_object       *obj = it->oi_obj;
+
+       iam_it_fini(&it->oi_it);
+       osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
+       lu_object_put(env, &obj->oo_dt.do_lu);
+       if (it != &oti->oti_it)
+               OBD_FREE_PTR(it);
+       else
+               oti->oti_it_inline = 0;
 }
 
 /**
@@ -4720,29 +4896,40 @@ static const struct dt_index_operations osd_index_iam_ops = {
  *
  */
 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    __u32 attr,
-                                    struct lustre_capa *capa)
-{
-        struct osd_object       *obj  = osd_dt_obj(dt);
-        struct osd_thread_info  *info = osd_oti_get(env);
-        struct osd_it_ea        *it   = &info->oti_it_ea;
-       struct file             *file = &it->oie_file;
-        struct lu_object        *lo   = &dt->do_lu;
-        struct dentry           *obj_dentry = &info->oti_it_dentry;
-        ENTRY;
-        LASSERT(lu_object_exists(lo));
+                                   struct dt_object *dt,
+                                   __u32 attr,
+                                   struct lustre_capa *capa)
+{
+       struct osd_object       *obj  = osd_dt_obj(dt);
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct osd_it_ea        *it;
+       struct file             *file;
+       struct lu_object        *lo   = &dt->do_lu;
+       struct dentry           *obj_dentry = &info->oti_it_dentry;
+       ENTRY;
 
-        obj_dentry->d_inode = obj->oo_inode;
-        obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
-        obj_dentry->d_name.hash = 0;
+       LASSERT(lu_object_exists(lo));
 
-        it->oie_rd_dirent       = 0;
-        it->oie_it_dirent       = 0;
-        it->oie_dirent          = NULL;
-        it->oie_buf             = info->oti_it_ea_buf;
-        it->oie_obj             = obj;
+       if (info->oti_it_inline) {
+               OBD_ALLOC_PTR(it);
+               if (it == NULL)
+                       RETURN(ERR_PTR(-ENOMEM));
+       } else {
+               it = &info->oti_it_ea;
+               info->oti_it_inline = 1;
+       }
+
+       obj_dentry->d_inode = obj->oo_inode;
+       obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
+       obj_dentry->d_name.hash = 0;
+
+       it->oie_rd_dirent       = 0;
+       it->oie_it_dirent       = 0;
+       it->oie_dirent          = NULL;
+       it->oie_buf             = info->oti_it_ea_buf;
+       it->oie_obj             = obj;
 
+       file = &it->oie_file;
        /* Reset the "file" totally to avoid to reuse any old value from
         * former readdir handling, the "file->f_pos" should be zero. */
        memset(file, 0, sizeof(*file));
@@ -4767,14 +4954,19 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
  */
 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
 {
-        struct osd_it_ea     *it   = (struct osd_it_ea *)di;
-        struct osd_object    *obj  = it->oie_obj;
-        struct inode       *inode  = obj->oo_inode;
+        struct osd_thread_info  *info  = osd_oti_get(env);
+        struct osd_it_ea       *it     = (struct osd_it_ea *)di;
+        struct osd_object      *obj    = it->oie_obj;
+        struct inode           *inode  = obj->oo_inode;
 
         ENTRY;
         it->oie_file.f_op->release(inode, &it->oie_file);
         lu_object_put(env, &obj->oo_dt.do_lu);
-        EXIT;
+       if (it != &info->oti_it_ea)
+               OBD_FREE_PTR(it);
+       else
+               info->oti_it_inline = 0;
+       EXIT;
 }
 
 /**
@@ -5211,24 +5403,18 @@ again:
        inode = osd_iget(info, dev, id);
        if (IS_ERR(inode)) {
                rc = PTR_ERR(inode);
-               if (rc == -ENOENT || rc == -ESTALE) {
-                       *attr |= LUDA_IGNORE;
-                       rc = 0;
-               } else {
+               if (rc == -ENOENT || rc == -ESTALE)
+                       rc = 1;
+               else
                        CDEBUG(D_LFSCK, "%.16s: fail to iget for dirent "
                               "check_repair, dir = %lu/%u, name = %.*s: "
                               "rc = %d\n",
                               devname, dir->i_ino, dir->i_generation,
                               ent->oied_namelen, ent->oied_name, rc);
-               }
 
                GOTO(out_journal, rc);
        }
 
-       /* skip the REMOTE_PARENT_DIR. */
-       if (inode == dev->od_mdt_map->omm_remote_parent->d_inode)
-               GOTO(out_inode, rc = 0);
-
        rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
        if (rc == 0) {
                LASSERT(!(lma->lma_compat & LMAC_NOT_IN_OI));
@@ -5408,11 +5594,15 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        int                     rc    = 0;
        ENTRY;
 
+       LASSERT(obj->oo_inode != dev->od_mdt_map->omm_remote_parent->d_inode);
+
        if (attr & LUDA_VERIFY) {
-               attr |= LUDA_TYPE;
-               if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) {
+               if (unlikely(ino == osd_remote_parent_ino(dev))) {
                        attr |= LUDA_IGNORE;
-                       rc = 0;
+                       /* If the parent is on remote MDT, and there
+                        * is no FID-in-dirent, then we have to get
+                        * the parent FID from the linkEA.  */
+                       osd_get_pfid_from_linkea(env, obj, fid);
                } else {
                        rc = osd_dirent_check_repair(env, obj, it, fid, id,
                                                     &attr);
@@ -5426,7 +5616,13 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
                                   it->oie_dirent->oied_name[1] != '.'))
                                RETURN(-ENOENT);
 
-                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
+                       if (unlikely(ino == osd_remote_parent_ino(dev)))
+                               /* If the parent is on remote MDT, and there
+                                * is no FID-in-dirent, then we have to get
+                                * the parent FID from the linkEA.  */
+                               rc = osd_get_pfid_from_linkea(env, obj, fid);
+                       else
+                               rc = osd_ea_fid_get(env, obj, ino, fid, id);
                } else {
                        osd_id_gen(id, ino, OSD_OII_NOGEN);
                }
@@ -5444,7 +5640,7 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        if (osd_remote_fid(env, dev, fid))
                RETURN(0);
 
-       if (likely(!(attr & LUDA_IGNORE)))
+       if (likely(!(attr & LUDA_IGNORE) && rc == 0))
                rc = osd_add_oi_cache(oti, dev, id, fid);
 
        if (!(attr & LUDA_VERIFY) &&
@@ -5454,7 +5650,7 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
             ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
                osd_consistency_check(oti, dev, oic);
 
-       RETURN(rc);
+       RETURN(rc > 0 ? 0 : rc);
 }
 
 /**
@@ -5614,6 +5810,7 @@ static void osd_key_fini(const struct lu_context *ctx,
        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
        lu_buf_free(&info->oti_iobuf.dr_pg_buf);
        lu_buf_free(&info->oti_iobuf.dr_bl_buf);
+       lu_buf_free(&info->oti_big_buf);
        OBD_FREE_PTR(info);
 }
 
@@ -5915,7 +6112,8 @@ static int osd_device_init0(const struct lu_env *env,
                GOTO(out_scrub, rc);
        }
 
-       LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
+       LASSERT(l->ld_site->ls_linkage.next != NULL);
+       LASSERT(l->ld_site->ls_linkage.prev != NULL);
 
        /* initialize quota slave instance */
        o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
@@ -6008,13 +6206,12 @@ static int osd_process_config(const struct lu_env *env,
                break;
        case LCFG_PARAM:
                LASSERT(&o->od_dt_dev);
-               rc = class_process_proc_seq_param(PARAM_OSD,
-                                                 lprocfs_osd_obd_vars,
-                                                 cfg, &o->od_dt_dev);
+               rc = class_process_proc_param(PARAM_OSD, lprocfs_osd_obd_vars,
+                                             cfg, &o->od_dt_dev);
                if (rc > 0 || rc == -ENOSYS)
-                       rc = class_process_proc_seq_param(PARAM_OST,
-                                                         lprocfs_osd_obd_vars,
-                                                         cfg, &o->od_dt_dev);
+                       rc = class_process_proc_param(PARAM_OST,
+                                                     lprocfs_osd_obd_vars,
+                                                     cfg, &o->od_dt_dev);
                break;
        default:
                rc = -ENOSYS;
@@ -6177,9 +6374,6 @@ static int __init osd_mod_init(void)
 
        rc = class_register_type(&osd_obd_device_ops, NULL, true,
                                 lprocfs_osd_module_vars,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                                NULL,
-#endif
                                 LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
        if (rc)
                lu_kmem_fini(ldiskfs_caches);