Whamcloud - gitweb
LU-1534 osd: Fix LBUGs when destroying IGIF objects
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index e1dfcec..f71b522 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -43,9 +41,6 @@
  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
@@ -164,10 +159,7 @@ static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
 static int osd_root_get(const struct lu_env *env,
                         struct dt_device *dev, struct lu_fid *f)
 {
-        struct inode *inode;
-
-        inode = osd_sb(osd_dt_dev(dev))->s_root->d_inode;
-        LU_IGIF_BUILD(f, inode->i_ino, inode->i_generation);
+        lu_local_obj_fid(f, OSD_FS_ROOT_OID);
         return 0;
 }
 
@@ -251,43 +243,101 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
         }
 }
 
+static int osd_get_lma(struct inode *inode, struct dentry *dentry,
+                      struct lustre_mdt_attrs *lma)
+{
+       int rc;
+
+       dentry->d_inode = inode;
+       rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)lma,
+                                  sizeof(*lma));
+       if (rc > 0) {
+               /* Check LMA compatibility */
+               if (lma->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP)) {
+                       CWARN("%.16s: unsupported incompat LMA feature(s) "
+                             "%lx/%#x\n",
+                             LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
+                             inode->i_ino, le32_to_cpu(lma->lma_incompat) &
+                                                       ~LMA_INCOMPAT_SUPP);
+                       rc = -ENOSYS;
+               } else {
+                       lustre_lma_swab(lma);
+                       rc = 0;
+               }
+       } else if (rc == 0) {
+               rc = -ENODATA;
+       }
+
+       return rc;
+}
+
 /*
  * retrieve object from backend ext fs.
  **/
-struct inode *osd_iget(struct osd_thread_info *info,
-                       struct osd_device *dev,
-                       const struct osd_inode_id *id)
-{
-        struct inode *inode = NULL;
-
-        inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
-        if (IS_ERR(inode)) {
-                CERROR("Cannot get inode, rc = %li\n", PTR_ERR(inode));
-        } else if (id->oii_gen != OSD_OII_NOGEN &&
-                   inode->i_generation != id->oii_gen) {
-                iput(inode);
-                inode = ERR_PTR(-ESTALE);
-        } else if (inode->i_nlink == 0) {
-                /* due to parallel readdir and unlink,
-                * we can have dead inode here. */
-                CWARN("stale inode\n");
-                make_bad_inode(inode);
-                iput(inode);
-                inode = ERR_PTR(-ESTALE);
-        } else if (is_bad_inode(inode)) {
-                CERROR("bad inode %lx\n",inode->i_ino);
-                iput(inode);
-                inode = ERR_PTR(-ENOENT);
-        } else {
-                /* Do not update file c/mtime in ldiskfs.
-                 * NB: we don't have any lock to protect this because we don't
-                 * have reference on osd_object now, but contention with
-                 * another lookup + attr_set can't happen in the tiny window
-                 * between if (...) and set S_NOCMTIME. */
-                if (!(inode->i_flags & S_NOCMTIME))
-                        inode->i_flags |= S_NOCMTIME;
-        }
-        return inode;
+struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
+                      struct osd_inode_id *id)
+{
+       struct inode *inode = NULL;
+
+       inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
+       if (IS_ERR(inode)) {
+               CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
+                      id->oii_ino, PTR_ERR(inode));
+       } else if (id->oii_gen != OSD_OII_NOGEN &&
+                  inode->i_generation != id->oii_gen) {
+               CDEBUG(D_INODE, "unmatched inode: ino = %u, gen0 = %u, "
+                      "gen1 = %u\n",
+                      id->oii_ino, id->oii_gen, inode->i_generation);
+               iput(inode);
+               inode = ERR_PTR(-ESTALE);
+       } else if (inode->i_nlink == 0) {
+               /* due to parallel readdir and unlink,
+               * we can have dead inode here. */
+               CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
+               make_bad_inode(inode);
+               iput(inode);
+               inode = ERR_PTR(-ESTALE);
+       } else if (is_bad_inode(inode)) {
+               CWARN("%s: bad inode: ino = %u\n",
+               dev->od_dt_dev.dd_lu_dev.ld_obd->obd_name, id->oii_ino);
+               iput(inode);
+               inode = ERR_PTR(-ENOENT);
+       } else {
+               if (id->oii_gen == OSD_OII_NOGEN)
+                       osd_id_gen(id, inode->i_ino, inode->i_generation);
+
+               /* Do not update file c/mtime in ldiskfs.
+                * NB: we don't have any lock to protect this because we don't
+                * have reference on osd_object now, but contention with
+                * another lookup + attr_set can't happen in the tiny window
+                * between if (...) and set S_NOCMTIME. */
+               if (!(inode->i_flags & S_NOCMTIME))
+                       inode->i_flags |= S_NOCMTIME;
+       }
+       return inode;
+}
+
+struct inode *osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
+                          struct osd_inode_id *id, struct lu_fid *fid)
+{
+       struct lustre_mdt_attrs *lma   = &info->oti_mdt_attrs;
+       struct inode            *inode;
+       int                      rc;
+
+       inode = osd_iget(info, dev, id);
+       if (IS_ERR(inode))
+               return inode;
+
+       rc = osd_get_lma(inode, &info->oti_obj_dentry, lma);
+       if (rc == 0) {
+               *fid = lma->lma_self_fid;
+       } else if (rc == -ENODATA) {
+               LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
+       } else {
+               iput(inode);
+               inode = ERR_PTR(rc);
+       }
+       return inode;
 }
 
 static int osd_fid_lookup(const struct lu_env *env,
@@ -302,7 +352,7 @@ static int osd_fid_lookup(const struct lu_env *env,
 
         LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
-        LASSERTF(fid_is_sane(fid) || osd_fid_is_root(fid), DFID, PFID(fid));
+        LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
         /*
          * This assertion checks that osd layer sees only local
          * fids. Unfortunately it is somewhat expensive (does a
@@ -348,18 +398,20 @@ static int osd_fid_lookup(const struct lu_env *env,
         }
 
         if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
-                goto out;
+               GOTO(out, result = 0);
+
+       LASSERT(obj->oo_hl_head == NULL);
+       obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
+       if (obj->oo_hl_head == NULL) {
+               obj->oo_inode = NULL;
+               iput(inode);
+               GOTO(out, result = -ENOMEM);
+       }
+       GOTO(out, result = 0);
 
-        LASSERT(obj->oo_hl_head == NULL);
-        obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
-        if (obj->oo_hl_head == NULL) {
-                obj->oo_inode = NULL;
-                iput(inode);
-                result = -ENOMEM;
-        }
 out:
-        LINVRNT(osd_invariant(obj));
-        RETURN(result);
+       LINVRNT(osd_invariant(obj));
+       return result;
 }
 
 /*
@@ -559,9 +611,11 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
 
         dt_txn_hook_commit(th);
 
-        /* call per-transaction callbacks if any */
-        cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
-                dcb->dcb_func(NULL, th, dcb, error);
+       /* call per-transaction callbacks if any */
+       cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
+               cfs_list_del_init(&dcb->dcb_linkage);
+               dcb->dcb_func(NULL, th, dcb, error);
+       }
 
         lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
         lu_device_put(lud);
@@ -751,7 +805,8 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
          * IMPORTANT: we have to wait till any IO submited by the thread is
          * completed otherwise iobuf may be corrupted by different request
          */
-        cfs_wait_event(iobuf->dr_wait, cfs_atomic_read(&iobuf->dr_numreqs)==0);
+        cfs_wait_event(iobuf->dr_wait,
+                       cfs_atomic_read(&iobuf->dr_numreqs) == 0);
         if (!rc)
                 rc = iobuf->dr_error;
 
@@ -825,24 +880,39 @@ static int osd_object_print(const struct lu_env *env, void *cookie,
  * Concurrency: shouldn't matter.
  */
 int osd_statfs(const struct lu_env *env, struct dt_device *d,
-               cfs_kstatfs_t *sfs)
+               struct obd_statfs *sfs)
 {
-        struct osd_device *osd = osd_dt_dev(d);
+        struct osd_device  *osd = osd_dt_dev(d);
         struct super_block *sb = osd_sb(osd);
+        struct kstatfs     *ksfs;
         int result = 0;
 
+        /* osd_lproc.c call this without env, allocate ksfs for that case */
+        if (unlikely(env == NULL)) {
+                OBD_ALLOC_PTR(ksfs);
+                if (ksfs == NULL)
+                        return -ENOMEM;
+        } else {
+                ksfs = &osd_oti_get(env)->oti_ksfs;
+        }
+
         cfs_spin_lock(&osd->od_osfs_lock);
         /* cache 1 second */
         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
-                result = ll_do_statfs(sb, &osd->od_kstatfs);
-                if (likely(result == 0)) /* N.B. statfs can't really fail */
+                result = ll_do_statfs(sb, ksfs);
+                if (likely(result == 0)) /* N.B. statfs can't really fail */
                         osd->od_osfs_age = cfs_time_current_64();
+                        statfs_pack(&osd->od_statfs, ksfs);
+                }
         }
 
         if (likely(result == 0))
-                *sfs = osd->od_kstatfs;
+                *sfs = osd->od_statfs;
         cfs_spin_unlock(&osd->od_osfs_lock);
 
+        if (unlikely(env == NULL))
+                OBD_FREE_PTR(ksfs);
+
         return result;
 }
 
@@ -860,7 +930,7 @@ static void osd_conf_get(const struct lu_env *env,
          */
         param->ddp_max_name_len = LDISKFS_NAME_LEN;
         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
-        param->ddp_block_shift  = osd_sb(osd_dt_dev(dev))->s_blocksize_bits;
+       param->ddp_block_shift  = sb->s_blocksize_bits;
         param->ddp_mntopts      = 0;
         if (test_opt(sb, XATTR_USER))
                 param->ddp_mntopts |= MNTOPT_USERXATTR;
@@ -987,7 +1057,7 @@ const int osd_dto_credits_noquota[DTO_NR] = {
         [DTO_INDEX_INSERT]  = 16,
         [DTO_INDEX_DELETE]  = 16,
         /**
-         * Unused now
+        * Used for OI scrub
          */
         [DTO_INDEX_UPDATE]  = 16,
         /**
@@ -1311,28 +1381,6 @@ static int osd_inode_setattr(const struct lu_env *env,
 
         LASSERT(!(bits & LA_TYPE)); /* Huh? You want too much. */
 
-#ifdef HAVE_QUOTA_SUPPORT
-        if ((bits & LA_UID && attr->la_uid != inode->i_uid) ||
-            (bits & LA_GID && attr->la_gid != inode->i_gid)) {
-                struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
-                struct iattr iattr;
-                int rc;
-
-                iattr.ia_valid = 0;
-                if (bits & LA_UID)
-                        iattr.ia_valid |= ATTR_UID;
-                if (bits & LA_GID)
-                        iattr.ia_valid |= ATTR_GID;
-                iattr.ia_uid = attr->la_uid;
-                iattr.ia_gid = attr->la_gid;
-                osd_push_ctxt(env, save);
-                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
-                osd_pop_ctxt(save);
-                if (rc != 0)
-                        return rc;
-        }
-#endif
-
         if (bits & LA_ATIME)
                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
         if (bits & LA_CTIME)
@@ -1377,6 +1425,7 @@ static int osd_attr_set(const struct lu_env *env,
                         struct lustre_capa *capa)
 {
         struct osd_object *obj = osd_dt_obj(dt);
+        struct inode      *inode;
         int rc;
 
         LASSERT(handle != NULL);
@@ -1388,12 +1437,35 @@ static int osd_attr_set(const struct lu_env *env,
 
         OSD_EXEC_OP(handle, attr_set);
 
+        inode = obj->oo_inode;
+#ifdef HAVE_QUOTA_SUPPORT
+        if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
+            (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
+                struct osd_ctxt *save = &osd_oti_get(env)->oti_ctxt;
+                struct iattr iattr;
+                int rc;
+
+                iattr.ia_valid = 0;
+                if (attr->la_valid & LA_UID)
+                        iattr.ia_valid |= ATTR_UID;
+                if (attr->la_valid & LA_GID)
+                        iattr.ia_valid |= ATTR_GID;
+                iattr.ia_uid = attr->la_uid;
+                iattr.ia_gid = attr->la_gid;
+                osd_push_ctxt(env, save);
+                rc = ll_vfs_dq_transfer(inode, &iattr) ? -EDQUOT : 0;
+                osd_pop_ctxt(save);
+                if (rc != 0)
+                        return rc;
+        }
+#endif
+
         cfs_spin_lock(&obj->oo_guard);
-        rc = osd_inode_setattr(env, obj->oo_inode, attr);
+        rc = osd_inode_setattr(env, inode, attr);
         cfs_spin_unlock(&obj->oo_guard);
 
         if (!rc)
-                obj->oo_inode->i_sb->s_op->dirty_inode(obj->oo_inode);
+                inode->i_sb->s_op->dirty_inode(inode);
         return rc;
 }
 
@@ -1432,7 +1504,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         int result;
         struct osd_device  *osd = osd_obj2dev(obj);
         struct osd_thandle *oth;
-        struct dt_object   *parent;
+        struct dt_object   *parent = NULL;
         struct inode       *inode;
 #ifdef HAVE_QUOTA_SUPPORT
         struct osd_ctxt    *save = &info->oti_ctxt;
@@ -1453,15 +1525,13 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
 
         if (hint && hint->dah_parent)
                 parent = hint->dah_parent;
-        else
-                parent = osd->od_obj_area;
 
 #ifdef HAVE_QUOTA_SUPPORT
         osd_push_ctxt(info->oti_env, save);
 #endif
         inode = ldiskfs_create_inode(oth->ot_handle,
-                                     parent ?  osd_dt_obj(parent)->oo_inode :
-                                               osd_sb(osd)->s_root->d_inode,
+                                     parent ? osd_dt_obj(parent)->oo_inode :
+                                              osd_sb(osd)->s_root->d_inode,
                                      mode);
 #ifdef HAVE_QUOTA_SUPPORT
         osd_pop_ctxt(save);
@@ -1471,6 +1541,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                  * NB: don't need any lock because no contention at this
                  * early stage */
                 inode->i_flags |= S_NOCMTIME;
+               inode->i_state |= I_LUSTRE_NOSCRUB;
                 obj->oo_inode = inode;
                 result = 0;
         } else {
@@ -1684,11 +1755,8 @@ static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
         LASSERT(obj->oo_inode != NULL);
         LASSERT(uc != NULL);
 
-        id->oii_ino = obj->oo_inode->i_ino;
-        id->oii_gen = obj->oo_inode->i_generation;
-
-        return osd_oi_insert(info, osd, fid, id, th,
-                             uc->mu_cap & CFS_CAP_SYS_RESOURCE_MASK);
+       osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation);
+       return osd_oi_insert(info, osd, fid, id, th);
 }
 
 static int osd_declare_object_create(const struct lu_env *env,
@@ -1807,6 +1875,9 @@ static int osd_object_destroy(const struct lu_env *env,
         LASSERT(inode);
         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
 
+       /* Parallel control for OI scrub. For most of cases, there is no
+        * lock contention. So it will not affect unlink performance. */
+       cfs_mutex_lock(&inode->i_mutex);
         if (S_ISDIR(inode->i_mode)) {
                 LASSERT(osd_inode_unlinked(inode) ||
                         inode->i_nlink == 1);
@@ -1821,6 +1892,7 @@ static int osd_object_destroy(const struct lu_env *env,
         OSD_EXEC_OP(th, destroy);
 
         result = osd_oi_delete(osd_oti_get(env), osd, fid, th);
+       cfs_mutex_unlock(&inode->i_mutex);
 
         /* XXX: add to ext3 orphan list */
         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
@@ -1846,7 +1918,6 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
-        LASSERT(osd_write_locked(env, obj));
 
         if (fl & LU_XATTR_REPLACE)
                 fs_flags |= XATTR_REPLACE;
@@ -1885,15 +1956,6 @@ static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
 }
 
 /**
- * Helper function to form igif
- */
-static inline void osd_igif_get(const struct lu_env *env, struct inode  *inode,
-                                struct lu_fid *fid)
-{
-        LU_IGIF_BUILD(fid, inode->i_ino, inode->i_generation);
-}
-
-/**
  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
  * To have compatilibility with 1.8 ldiskfs driver we need to have
@@ -1920,54 +1982,20 @@ void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
  * \retval 0 on success
  */
 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
-                          __u32 ino, struct lu_fid *fid)
+                         __u32 ino, struct lu_fid *fid)
 {
-        struct osd_thread_info  *info      = osd_oti_get(env);
-        struct lustre_mdt_attrs *mdt_attrs = &info->oti_mdt_attrs;
-        struct lu_device        *ldev   = obj->oo_dt.do_lu.lo_dev;
-        struct dentry           *dentry = &info->oti_child_dentry;
-        struct osd_inode_id     *id     = &info->oti_id;
-        struct osd_device       *dev;
-        struct inode            *inode;
-        int                      rc;
-
-        ENTRY;
-        dev  = osd_dev(ldev);
+       struct osd_thread_info  *info = osd_oti_get(env);
+       struct osd_inode_id     *id = &info->oti_id;
+       struct inode            *inode;
+       ENTRY;
 
-        id->oii_ino = ino;
-        id->oii_gen = OSD_OII_NOGEN;
+       osd_id_gen(id, ino, OSD_OII_NOGEN);
+       inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid);
+       if (IS_ERR(inode))
+               RETURN(PTR_ERR(inode));
 
-        inode = osd_iget(info, dev, id);
-        if (IS_ERR(inode)) {
-                rc = PTR_ERR(inode);
-                GOTO(out,rc);
-        }
-        dentry->d_inode = inode;
-
-        LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
-        rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
-                                   sizeof *mdt_attrs);
-
-        /* Check LMA compatibility */
-        if (rc > 0 &&
-            (mdt_attrs->lma_incompat & ~cpu_to_le32(LMA_INCOMPAT_SUPP))) {
-                CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
-                      inode->i_ino, le32_to_cpu(mdt_attrs->lma_incompat) &
-                      ~LMA_INCOMPAT_SUPP);
-                return -ENOSYS;
-        }
-
-        if (rc > 0) {
-                lustre_lma_swab(mdt_attrs);
-                memcpy(fid, &mdt_attrs->lma_self_fid, sizeof(*fid));
-                rc = 0;
-        } else if (rc == -ENODATA) {
-                osd_igif_get(env, inode, fid);
-                rc = 0;
-        }
-        iput(inode);
-out:
-        RETURN(rc);
+       iput(inode);
+       RETURN(0);
 }
 
 /**
@@ -1999,7 +2027,6 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
         OSD_EXEC_OP(th, create);
 
         result = __osd_object_create(info, obj, attr, hint, dof, th);
-
         /* objects under osd root shld have igif fid, so dont add fid EA */
         if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
                 result = osd_ea_fid_set(env, dt, fid);
@@ -2063,7 +2090,7 @@ static int osd_object_ref_add(const struct lu_env *env,
                     inode->i_nlink == 2)
                         inode->i_nlink = 1;
         }
-        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
+        LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
@@ -2902,7 +2929,7 @@ static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
         LASSERT(th != NULL);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
-                return -EACCES;
+               RETURN(-EACCES);
 
         OSD_EXEC_OP(th, insert);
 
@@ -2957,13 +2984,19 @@ static int __osd_ea_add_rec(struct osd_thread_info *info,
 
         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
 
+        /* XXX: remove fid_is_igif() check here.
+         * IGIF check is just to handle insertion of .. when it is 'ROOT',
+         * it is IGIF now but needs FID in dir entry as well for readdir
+         * to work.
+         * LU-838 should fix that and remove fid_is_igif() check */
         if (fid_is_igif((struct lu_fid *)fid) ||
             fid_is_norm((struct lu_fid *)fid)) {
                 ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
                 osd_get_ldiskfs_dirent_param(ldp, fid);
-                child->d_fsdata = (void*) ldp;
-        } else
+                child->d_fsdata = (void *)ldp;
+        } else {
                 child->d_fsdata = NULL;
+        }
         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
 
         RETURN(rc);
@@ -2988,10 +3021,10 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                               const struct dt_rec *dot_dot_fid,
                               struct thandle *th)
 {
-        struct inode            *inode  = dir->oo_inode;
+        struct inode                *inode = dir->oo_inode;
         struct ldiskfs_dentry_param *dot_ldp;
         struct ldiskfs_dentry_param *dot_dot_ldp;
-        struct osd_thandle      *oth;
+        struct osd_thandle          *oth;
         int result = 0;
 
         oth = container_of(th, struct osd_thandle, ot_super);
@@ -3012,7 +3045,7 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
 
                 if (!dir->oo_compat_dot_created)
                         return -EINVAL;
-                if (fid_seq((struct lu_fid *)dot_fid) >= FID_SEQ_NORMAL) {
+                if (!fid_is_igif((struct lu_fid *)dot_fid)) {
                         osd_get_ldiskfs_dirent_param(dot_ldp, dot_fid);
                         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
                 } else {
@@ -3158,7 +3191,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                         else
                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
                                                 "lu_object can't be located"
-                                                ""DFID"\n", PFID(fid));
+                                               DFID"\n", PFID(fid));
 
                         if (child == NULL) {
                                 lu_object_put(env, luch);
@@ -3169,6 +3202,7 @@ struct osd_object *osd_object_find(const struct lu_env *env,
                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
                                         "lu_object does not exists "DFID"\n",
                                         PFID(fid));
+                       lu_object_put(env, luch);
                         child = ERR_PTR(-ENOENT);
                 }
         } else
@@ -3437,37 +3471,44 @@ static int osd_it_iam_rec(const struct lu_env *env,
                           const struct dt_it *di,
                           struct dt_rec *dtrec, __u32 attr)
 {
-        struct osd_it_iam *it        = (struct osd_it_iam *)di;
-        struct osd_thread_info *info = osd_oti_get(env);
-        struct lu_fid     *fid       = &info->oti_fid;
-        const struct osd_fid_pack *rec;
-        struct lu_dirent *lde = (struct lu_dirent *)dtrec;
-        char *name;
-        int namelen;
-        __u64 hash;
-        int rc;
+       struct osd_it_iam      *it   = (struct osd_it_iam *)di;
+       struct osd_thread_info *info = osd_oti_get(env);
+       ENTRY;
 
-        name = (char *)iam_it_key_get(&it->oi_it);
-        if (IS_ERR(name))
-                RETURN(PTR_ERR(name));
+       if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) {
+               const struct osd_fid_pack *rec;
+               struct lu_fid             *fid = &info->oti_fid;
+               struct lu_dirent          *lde = (struct lu_dirent *)dtrec;
+               char                      *name;
+               int                        namelen;
+               __u64                      hash;
+               int                        rc;
 
-        namelen = iam_it_key_size(&it->oi_it);
+               name = (char *)iam_it_key_get(&it->oi_it);
+               if (IS_ERR(name))
+                       RETURN(PTR_ERR(name));
 
-        rec = (const struct osd_fid_pack *) iam_it_rec_get(&it->oi_it);
-        if (IS_ERR(rec))
-                RETURN(PTR_ERR(rec));
+               namelen = iam_it_key_size(&it->oi_it);
 
-        rc = osd_fid_unpack(fid, rec);
-        if (rc)
-                RETURN(rc);
+               rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it);
+               if (IS_ERR(rec))
+                       RETURN(PTR_ERR(rec));
 
-        hash = iam_it_store(&it->oi_it);
+               rc = osd_fid_unpack(fid, rec);
+               if (rc)
+                       RETURN(rc);
 
-        /* IAM does not store object type in IAM index (dir) */
-        osd_it_pack_dirent(lde, fid, hash, name, namelen,
-                           0, LUDA_FID);
+               hash = iam_it_store(&it->oi_it);
 
-        return 0;
+               /* IAM does not store object type in IAM index (dir) */
+               osd_it_pack_dirent(lde, fid, hash, name, namelen,
+                                  0, LUDA_FID);
+       } else {
+               iam_reccpy(&it->oi_it.ii_path.ip_leaf,
+                          (struct iam_rec *)dtrec);
+       }
+
+       RETURN(0);
 }
 
 /**
@@ -3959,7 +4000,7 @@ static void osd_key_exit(const struct lu_context *ctx,
 LU_TYPE_INIT_FINI(osd, &osd_key);
 
 struct lu_context_key osd_key = {
-        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD,
+        .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
         .lct_init = osd_key_init,
         .lct_fini = osd_key_fini,
         .lct_exit = osd_key_exit
@@ -3974,21 +4015,16 @@ static int osd_device_init(const struct lu_env *env, struct lu_device *d,
 
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
 {
-        struct osd_thread_info *info = osd_oti_get(env);
-        ENTRY;
-        if (o->od_obj_area != NULL) {
-                lu_object_put(env, &o->od_obj_area->do_lu);
-                o->od_obj_area = NULL;
-        }
-        if (o->od_oi_table != NULL)
-                osd_oi_fini(info, o);
+       ENTRY;
 
-        if (o->od_fsops) {
-                fsfilt_put_ops(o->od_fsops);
-                o->od_fsops = NULL;
-        }
+       osd_scrub_cleanup(env, o);
 
-        RETURN(0);
+       if (o->od_fsops) {
+               fsfilt_put_ops(o->od_fsops);
+       o->od_fsops = NULL;
+       }
+
+       RETURN(0);
 }
 
 static int osd_mount(const struct lu_env *env,
@@ -3998,6 +4034,7 @@ static int osd_mount(const struct lu_env *env,
         const char               *dev  = lustre_cfg_string(cfg, 0);
         struct lustre_disk_data  *ldd;
         struct lustre_sb_info    *lsi;
+        int                       rc = 0;
 
         ENTRY;
 
@@ -4022,18 +4059,24 @@ static int osd_mount(const struct lu_env *env,
         LASSERT(lmi != NULL);
         /* save lustre_mount_info in dt_device */
         o->od_mount = lmi;
+        o->od_mnt = lmi->lmi_mnt;
 
         lsi = s2lsi(lmi->lmi_sb);
         ldd = lsi->lsi_ldd;
 
         if (ldd->ldd_flags & LDD_F_IAM_DIR) {
                 o->od_iop_mode = 0;
-                LCONSOLE_WARN("OSD: IAM mode enabled\n");
+                LCONSOLE_WARN("%s: OSD: IAM mode enabled\n", dev);
         } else
                 o->od_iop_mode = 1;
 
-        o->od_obj_area = NULL;
-        RETURN(0);
+        if (ldd->ldd_flags & LDD_F_SV_TYPE_OST) {
+                rc = osd_compat_init(o);
+                if (rc)
+                        CERROR("%s: can't initialize compats: %d\n", dev, rc);
+        }
+
+        RETURN(rc);
 }
 
 static struct lu_device *osd_device_fini(const struct lu_env *env,
@@ -4042,6 +4085,8 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
         int rc;
         ENTRY;
 
+        osd_compat_fini(osd_dev(d));
+
         shrink_dcache_sb(osd_sb(osd_dev(d)));
         osd_sync(env, lu2dt_dev(d));
 
@@ -4076,6 +4121,7 @@ static struct lu_device *osd_device_alloc(const struct lu_env *env,
                         l->ld_ops = &osd_lu_ops;
                         o->od_dt_dev.dd_ops = &osd_dt_ops;
                         cfs_spin_lock_init(&o->od_osfs_lock);
+                       cfs_mutex_init(&o->od_otable_mutex);
                         o->od_osfs_age = cfs_time_shift_64(-1000);
                         o->od_capa_hash = init_capa_hash();
                         if (o->od_capa_hash == NULL) {
@@ -4131,48 +4177,23 @@ static int osd_recovery_complete(const struct lu_env *env,
         RETURN(0);
 }
 
-static int osd_prepare(const struct lu_env *env,
-                       struct lu_device *pdev,
+static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
                        struct lu_device *dev)
 {
-        struct osd_device *osd = osd_dev(dev);
-        struct lustre_sb_info *lsi;
-        struct lustre_disk_data *ldd;
-        struct lustre_mount_info  *lmi;
-        struct osd_thread_info *oti = osd_oti_get(env);
-        struct dt_object *d;
-        int result;
+       struct osd_device *osd = osd_dev(dev);
+       int                result;
+       ENTRY;
 
-        ENTRY;
-        /* 1. initialize oi before any file create or file open */
-        result = osd_oi_init(oti, osd);
+       /* 1. setup scrub, including OI files initialization */
+       result = osd_scrub_setup(env, osd);
         if (result < 0)
                 RETURN(result);
 
         if (!lu_device_is_md(pdev))
                 RETURN(0);
 
-        lmi = osd->od_mount;
-        lsi = s2lsi(lmi->lmi_sb);
-        ldd = lsi->lsi_ldd;
-
         /* 2. setup local objects */
         result = llo_local_objects_setup(env, lu2md_dev(pdev), lu2dt_dev(dev));
-        if (result)
-                goto out;
-
-        /* 3. open remote object dir */
-        d = dt_store_open(env, lu2dt_dev(dev), "",
-                          remote_obj_dir, &oti->oti_fid);
-        if (!IS_ERR(d)) {
-                osd->od_obj_area = d;
-                result = 0;
-        } else {
-                result = PTR_ERR(d);
-                osd->od_obj_area = NULL;
-        }
-
-out:
         RETURN(result);
 }
 
@@ -4210,7 +4231,7 @@ static struct lu_device_type osd_device_type = {
         .ldt_tags     = LU_DEVICE_DT,
         .ldt_name     = LUSTRE_OSD_NAME,
         .ldt_ops      = &osd_device_type_ops,
-        .ldt_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+        .ldt_ctx_tags = LCT_LOCAL,
 };
 
 /*
@@ -4220,19 +4241,11 @@ static struct obd_ops osd_obd_device_ops = {
         .o_owner = THIS_MODULE
 };
 
-static struct lu_local_obj_desc llod_osd_rem_obj_dir = {
-        .llod_name      = remote_obj_dir,
-        .llod_oid       = OSD_REM_OBJ_DIR_OID,
-        .llod_is_index  = 1,
-        .llod_feat      = &dt_directory_features,
-};
-
 static int __init osd_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
 
         osd_oi_mod_init();
-        llo_local_obj_register(&llod_osd_rem_obj_dir);
         lprocfs_osd_init_vars(&lvars);
         return class_register_type(&osd_obd_device_ops, NULL, lvars.module_vars,
                                    LUSTRE_OSD_NAME, &osd_device_type);
@@ -4240,7 +4253,6 @@ static int __init osd_mod_init(void)
 
 static void __exit osd_mod_exit(void)
 {
-        llo_local_obj_unregister(&llod_osd_rem_obj_dir);
         class_unregister_type(LUSTRE_OSD_NAME);
 }