Whamcloud - gitweb
b=19669
[fs/lustre-release.git] / lustre / osd / osd_handler.c
index 0d4b6be..c584859 100644 (file)
@@ -85,7 +85,6 @@
 /* llo_* api support */
 #include <md_object.h>
 
-static const char MDT_XATTR_NAME[] = "trusted.lma";
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 static const char remote_obj_dir[] = "REM_OBJ_DIR";
@@ -93,7 +92,6 @@ static const char remote_obj_dir[] = "REM_OBJ_DIR";
 struct osd_directory {
         struct iam_container od_container;
         struct iam_descr     od_descr;
-        struct semaphore     od_sem;
 };
 
 struct osd_object {
@@ -106,6 +104,10 @@ struct osd_object {
          * creation, or assigned by osd_object_create() under write lock).
          */
         struct inode          *oo_inode;
+        /**
+         * Protects index operations (lookup, insert, delete, iteration).
+         */
+        struct rw_semaphore    oo_ext_idx_sem;
         struct rw_semaphore    oo_sem;
         struct osd_directory  *oo_dir;
         /** protects inode attributes. */
@@ -403,6 +405,7 @@ static struct lu_object *osd_object_alloc(const struct lu_env *env,
 
                 l->lo_ops = &osd_lu_obj_ops;
                 init_rwsem(&mo->oo_sem);
+                init_rwsem(&mo->oo_ext_idx_sem);
                 spin_lock_init(&mo->oo_guard);
                 return l;
         } else
@@ -520,17 +523,22 @@ static int osd_inode_remove(const struct lu_env *env, struct osd_object *obj)
         struct osd_device      *osd = osd_obj2dev(obj);
         struct osd_thread_info *oti = osd_oti_get(env);
         struct txn_param       *prm = &oti->oti_txn;
+        struct lu_env          *env_del_obj = &oti->oti_obj_delete_tx_env;
         struct thandle         *th;
         int result;
 
+        lu_env_init(env_del_obj, LCT_DT_THREAD);
         txn_param_init(prm, OSD_TXN_OI_DELETE_CREDITS +
                             OSD_TXN_INODE_DELETE_CREDITS);
-        th = osd_trans_start(env, &osd->od_dt_dev, prm);
+        th = osd_trans_start(env_del_obj, &osd->od_dt_dev, prm);
         if (!IS_ERR(th)) {
-                result = osd_oi_delete(oti, &osd->od_oi, fid, th);
-                osd_trans_stop(env, th);
+                result = osd_oi_delete(osd_oti_get(env_del_obj),
+                                       &osd->od_oi, fid, th);
+                osd_trans_stop(env_del_obj, th);
         } else
                 result = PTR_ERR(th);
+
+        lu_env_fini(env_del_obj);
         return result;
 }
 
@@ -956,7 +964,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
         [DTO_INDEX_DELETE]  = 20,
         /**
          * Unused now.
-         */ 
+         */
         [DTO_IDNEX_UPDATE]  = 16,
         /*
          * Create a object. Same as create object in EXT3 filesystem.
@@ -972,7 +980,7 @@ static const int osd_dto_credits_quota[DTO_NR] = {
          * INDEX_EXTRA_BLOCKS(8) +
          * 3(inode bits, groups, GDT) +
          * QUOTA(?)
-         */ 
+         */
         [DTO_OBJECT_DELETE] = 27,
         /**
          * Attr set credits.
@@ -1263,7 +1271,11 @@ static int osd_inode_setattr(const struct lu_env *env,
                 struct iattr iattr;
                 int rc;
 
-                iattr.ia_valid = bits & (LA_UID | LA_GID);
+                iattr.ia_valid = 0;
+                if (bits & LA_UID)
+                        iattr.ia_valid |= ATTR_UID;
+                if (bits & LA_GID)
+                        iattr.ia_valid |= ATTR_GID;
                 iattr.ia_uid = attr->la_uid;
                 iattr.ia_gid = attr->la_gid;
                 osd_push_ctxt(env, save);
@@ -1284,14 +1296,14 @@ static int osd_inode_setattr(const struct lu_env *env,
                 LDISKFS_I(inode)->i_disksize = attr->la_size;
                 i_size_write(inode, attr->la_size);
         }
-# if 0
-        /*
-         * OSD should not change "i_blocks" which is used by quota.
+
+        /* OSD should not change "i_blocks" which is used by quota.
          * "i_blocks" should be changed by ldiskfs only.
-         * Disable this assignment until SOM to fix some EA field. */
+         * The assignment is enabled for now for the benefit of SOM, until
+         * the value is stored in a SOM EA. */
         if (bits & LA_BLOCKS)
                 inode->i_blocks = attr->la_blocks;
-#endif
+
         if (bits & LA_MODE)
                 inode->i_mode   = (inode->i_mode & S_IFMT) |
                         (attr->la_mode & ~S_IFMT);
@@ -1328,8 +1340,6 @@ static int osd_create_pre(struct osd_thread_info *info, struct osd_object *obj,
 static int osd_create_post(struct osd_thread_info *info, struct osd_object *obj,
                            struct lu_attr *attr, struct thandle *th)
 {
-        LASSERT(obj->oo_inode != NULL);
-
         osd_object_init0(obj);
         return 0;
 }
@@ -1689,12 +1699,11 @@ static int __osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         *t = inode->i_ctime;
         rc = inode->i_op->setxattr(dentry, name, buf->lb_buf,
                                    buf->lb_len, fs_flags);
-        if (likely(rc == 0)) {
-                spin_lock(&obj->oo_guard);
-                inode->i_ctime = *t;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        }
+        /* ctime should not be updated with server-side time. */
+        spin_lock(&obj->oo_guard);
+        inode->i_ctime = *t;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -1718,7 +1727,7 @@ static int osd_ea_fid_set(const struct lu_env *env, struct dt_object *dt,
 
         return __osd_xattr_set(env, dt,
                                osd_buf_get(env, mdt_attrs, sizeof *mdt_attrs),
-                               MDT_XATTR_NAME, LU_XATTR_CREATE);
+                               XATTR_NAME_LMA, LU_XATTR_CREATE);
 
 }
 
@@ -1760,9 +1769,18 @@ static int osd_ea_fid_get(const struct lu_env *env, struct dentry *dentry,
 
         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
 
-        rc = inode->i_op->getxattr(dentry, MDT_XATTR_NAME, (void *)mdt_attrs,
+        rc = inode->i_op->getxattr(dentry, XATTR_NAME_LMA, (void *)mdt_attrs,
                                    sizeof *mdt_attrs);
 
+        /* Check LMA compatibility */
+        if (rc > 0 &&
+            (mdt_attrs->lma_incompat & ~cpu_to_be32(LMA_INCOMPAT_SUPP))) {
+                CWARN("Inode %lx: Unsupported incompat LMA feature(s) %#x\n",
+                      inode->i_ino, be32_to_cpu(mdt_attrs->lma_incompat) &
+                      ~LMA_INCOMPAT_SUPP);
+                return -ENOSYS;
+        }
+
         if (rc > 0) {
                 fid_be_to_cpu(fid, &mdt_attrs->lma_self_fid);
                 rc = 0;
@@ -1959,13 +1977,11 @@ static int osd_xattr_del(const struct lu_env *env,
         dentry->d_inode = inode;
         *t = inode->i_ctime;
         rc = inode->i_op->removexattr(dentry, name);
-        if (likely(rc == 0)) {
-                /* ctime should not be updated with server-side time. */
-                spin_lock(&obj->oo_guard);
-                inode->i_ctime = *t;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        }
+        /* ctime should not be updated with server-side time. */
+        spin_lock(&obj->oo_guard);
+        inode->i_ctime = *t;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
         return rc;
 }
 
@@ -2076,6 +2092,45 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
         RETURN(rc);
 }
 
+/*
+ * Get the 64-bit version for an inode.
+ */
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+                                               struct dt_object *dt)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+               LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        return LDISKFS_I(inode)->i_fs_version;
+}
+
+/*
+ * Set the 64-bit version for an inode and mark the inode dirty.
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+                                   dt_obj_version_t new_version)
+{
+        struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+        CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+               new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+        LDISKFS_I(inode)->i_fs_version = new_version;
+        /** Version is set after all inode operations are finished,
+         *  so we should mark the inode dirty here */
+        inode->i_sb->s_op->dirty_inode(inode);
+}
+
+static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
+                        void **data)
+{
+        struct osd_object *obj = osd_dt_obj(dt);
+        ENTRY;
+
+        *data = (void *)obj->oo_inode;
+        RETURN(0);
+}
+
 static const struct dt_object_operations osd_obj_ops = {
         .do_read_lock    = osd_object_read_lock,
         .do_write_lock   = osd_object_write_lock,
@@ -2094,6 +2149,9 @@ static const struct dt_object_operations osd_obj_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
+        .do_data_get     = osd_data_get,
 };
 
 /**
@@ -2118,6 +2176,9 @@ static const struct dt_object_operations osd_obj_ea_ops = {
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
         .do_object_sync  = osd_object_sync,
+        .do_version_get  = osd_object_version_get,
+        .do_version_set  = osd_object_version_set,
+        .do_data_get     = osd_data_get,
 };
 
 /*
@@ -2290,7 +2351,6 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 
                 OBD_ALLOC_PTR(dir);
                 if (dir != NULL) {
-                        sema_init(&dir->od_sem, 1);
 
                         spin_lock(&obj->oo_guard);
                         if (obj->oo_dir == NULL)
@@ -2305,7 +2365,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                          * Now, that we have container data, serialize its
                          * initialization.
                          */
-                        down(&obj->oo_dir->od_sem);
+                        down_write(&obj->oo_ext_idx_sem);
                         /*
                          * recheck under lock.
                          */
@@ -2313,7 +2373,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                                 result = osd_iam_container_init(env, obj, dir);
                         else
                                 result = 0;
-                        up(&obj->oo_dir->od_sem);
+                        up_write(&obj->oo_ext_idx_sem);
                 } else
                         result = -ENOMEM;
         } else
@@ -2412,16 +2472,29 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
+
+        down_write(&obj->oo_ext_idx_sem);
         bh = ldiskfs_find_entry(dentry, &de);
         if (bh) {
+                struct osd_thread_info *oti = osd_oti_get(env);
+                struct timespec *ctime = &oti->oti_time;
+                struct timespec *mtime = &oti->oti_time2;
+
+                *ctime = dir->i_ctime;
+                *mtime = dir->i_mtime;
                 rc = ldiskfs_delete_entry(oh->ot_handle,
                                 dir, de, bh);
-                if (!rc)
-                        mark_inode_dirty(dir);
+                /* ctime/mtime should not be updated with server-side time. */
+                spin_lock(&obj->oo_guard);
+                dir->i_ctime = *ctime;
+                dir->i_mtime = *mtime;
+                spin_unlock(&obj->oo_guard);
+                mark_inode_dirty(dir);
                 brelse(bh);
         } else
                 rc = -ENOENT;
 
+        up_write(&obj->oo_ext_idx_sem);
         LASSERT(osd_invariant(obj));
         RETURN(rc);
 }
@@ -2659,6 +2732,8 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
 
         dentry = osd_child_dentry_get(env, obj,
                                       (char *)key, strlen((char *)key));
+
+        down_read(&obj->oo_ext_idx_sem);
         bh = ldiskfs_find_entry(dentry, &de);
         if (bh) {
                 ino = le32_to_cpu(de->inode);
@@ -2673,10 +2748,11 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                         rc = osd_ea_fid_get(env, dentry, rec);
                         iput(inode);
                 } else
-                        rc = -ENOENT;
+                        rc = PTR_ERR(inode);
         } else
                 rc = -ENOENT;
 
+        up_read(&obj->oo_ext_idx_sem);
         RETURN (rc);
 }
 
@@ -2758,7 +2834,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
         const char               *name  = (const char *)key;
         struct osd_object        *child;
 #ifdef HAVE_QUOTA_SUPPORT
-        cfs_cap_t              save = current->cap_effective;
+        cfs_cap_t                 save  = current->cap_effective;
 #endif
         int rc;
 
@@ -2776,18 +2852,32 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                 RETURN(rc);
         child = osd_object_find(env, dt, fid);
         if (!IS_ERR(child)) {
+                struct inode *inode = obj->oo_inode;
+                struct osd_thread_info *oti = osd_oti_get(env);
+                struct timespec *ctime = &oti->oti_time;
+                struct timespec *mtime = &oti->oti_time2;
+
+                *ctime = inode->i_ctime;
+                *mtime = inode->i_mtime;
 #ifdef HAVE_QUOTA_SUPPORT
                 if (ignore_quota)
                         current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
                 else
                         current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
 #endif
+                down_write(&obj->oo_ext_idx_sem);
                 rc = osd_ea_add_rec(env, obj, child, name, th);
-
+                up_write(&obj->oo_ext_idx_sem);
 #ifdef HAVE_QUOTA_SUPPORT
                 current->cap_effective = save;
 #endif
                 osd_object_put(env, child);
+                /* ctime/mtime should not be updated with server-side time. */
+                spin_lock(&obj->oo_guard);
+                inode->i_ctime = *ctime;
+                inode->i_mtime = *mtime;
+                spin_unlock(&obj->oo_guard);
+                mark_inode_dirty(inode);
         } else {
                 rc = PTR_ERR(child);
         }
@@ -2996,16 +3086,18 @@ static struct dt_it *osd_it_ea_init(const struct lu_env *env,
         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
         obj_dentry->d_name.hash = 0;
 
-        it->oie_namelen         = 0;
+        it->oie_rd_dirent       = 0;
+        it->oie_it_dirent       = 0;
         it->oie_curr_pos        = 0;
         it->oie_next_pos        = 0;
+        it->oie_dirent          = NULL;
+        it->oie_buf             = info->oti_it_ea_buf;
         it->oie_obj             = obj;
         it->oie_file.f_dentry   = obj_dentry;
         it->oie_file.f_mapping    = obj->oo_inode->i_mapping;
         it->oie_file.f_op         = obj->oo_inode->i_fop;
         it->oie_file.private_data = NULL;
         lu_object_get(lo);
-
         RETURN((struct dt_it*) it);
 }
 
@@ -3018,9 +3110,10 @@ static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
 {
         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
         struct osd_object    *obj  = it->oie_obj;
-
+        struct inode       *inode  = obj->oo_inode;
 
         ENTRY;
+        it->oie_file.f_op->release(inode, &it->oie_file);
         lu_object_put(env, &obj->oo_dt.do_lu);
         EXIT;
 }
@@ -3042,9 +3135,11 @@ static int osd_it_ea_get(const struct lu_env *env,
 
         ENTRY;
         LASSERT(((const char *)key)[0] == '\0');
-        it->oie_namelen         = 0;
         it->oie_curr_pos        = 0;
         it->oie_next_pos        = 0;
+        it->oie_rd_dirent       = 0;
+        it->oie_it_dirent       = 0;
+        it->oie_dirent          = NULL;
 
         RETURN(+1);
 }
@@ -3071,26 +3166,27 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
                                loff_t offset, ino_t ino,
                                unsigned int d_type)
 {
-        struct osd_it_ea   *it     = (struct osd_it_ea *)buf;
-        struct dirent64    *dirent = &it->oie_dirent64;
-        int                 reclen = LDISKFS_DIR_REC_LEN(namelen);
-
-
+        struct osd_it_ea        *it = (struct osd_it_ea *)buf;
+        struct osd_it_ea_dirent *ent = it->oie_dirent;
         ENTRY;
-        if (it->oie_namelen)
-                RETURN(-ENOENT);
 
-        if (namelen == 0 || namelen > LDISKFS_NAME_LEN)
+        /* this should never happen */
+        if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
+                CERROR("ldiskfs return invalid namelen %d\n", namelen);
                 RETURN(-EIO);
+        }
 
-        strncpy(dirent->d_name, name, LDISKFS_NAME_LEN);
-        dirent->d_name[namelen] = 0;
-        dirent->d_ino           = ino;
-        dirent->d_off           = offset;
-        dirent->d_reclen        = reclen;
-        it->oie_namelen         = namelen;
-        it->oie_curr_pos        = offset;
+        if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
+            OSD_IT_EA_BUFSIZE)
+                RETURN(1);
 
+        ent->oied_ino     = ino;
+        ent->oied_off     = offset;
+        ent->oied_namelen = namelen;
+        memcpy(ent->oied_name, name, namelen);
+
+        it->oie_rd_dirent++;
+        it->oie_dirent = (void *) ent + size_round(sizeof(*ent) + namelen);
         RETURN(0);
 }
 
@@ -3103,7 +3199,7 @@ static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
  * \retval   0, on success
  * \retval -ve, on error
  */
-int osd_ldiskfs_it_fill(const struct dt_it *di)
+static int osd_ldiskfs_it_fill(const struct dt_it *di)
 {
         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
         struct osd_object  *obj   = it->oie_obj;
@@ -3111,16 +3207,23 @@ int osd_ldiskfs_it_fill(const struct dt_it *di)
         int                result = 0;
 
         ENTRY;
-        it->oie_namelen    = 0;
+        it->oie_dirent = it->oie_buf;
+        it->oie_rd_dirent = 0;
         it->oie_file.f_pos = it->oie_curr_pos;
 
+        down_read(&obj->oo_ext_idx_sem);
         result = inode->i_fop->readdir(&it->oie_file, it,
                                        (filldir_t) osd_ldiskfs_filldir);
 
+        up_read(&obj->oo_ext_idx_sem);
         it->oie_next_pos = it->oie_file.f_pos;
 
-        if(!result && it->oie_namelen == 0)
+        if (it->oie_rd_dirent == 0) {
                 result = -EIO;
+        } else {
+                it->oie_dirent = it->oie_buf;
+                it->oie_it_dirent = 1;
+        }
 
         RETURN(result);
 }
@@ -3142,12 +3245,21 @@ static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
         int rc;
 
         ENTRY;
-        it->oie_curr_pos = it->oie_next_pos;
 
-        if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
-                rc = +1;
-        else
-                rc = osd_ldiskfs_it_fill(di);
+        if (it->oie_it_dirent < it->oie_rd_dirent) {
+                it->oie_dirent = (void *) it->oie_dirent +
+                                 size_round(sizeof(struct osd_it_ea_dirent) +
+                                            it->oie_dirent->oied_namelen);
+                it->oie_it_dirent++;
+                RETURN(0);
+        } else {
+                it->oie_curr_pos = it->oie_next_pos;
+
+                if (it->oie_curr_pos == LDISKFS_HTREE_EOF)
+                        rc = +1;
+                else
+                        rc = osd_ldiskfs_it_fill(di);
+        }
 
         RETURN(rc);
 }
@@ -3164,7 +3276,7 @@ static struct dt_key *osd_it_ea_key(const struct lu_env *env,
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN((struct dt_key *)it->oie_dirent64.d_name);
+        RETURN((struct dt_key *)it->oie_dirent->oied_name);
 }
 
 /**
@@ -3178,7 +3290,7 @@ static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN(it->oie_namelen);
+        RETURN(it->oie_dirent->oied_namelen);
 }
 
 /**
@@ -3205,18 +3317,19 @@ static struct dt_rec *osd_it_ea_rec(const struct lu_env *env,
 
         ENTRY;
         dev  = osd_dev(ldev);
-        id->oii_ino = it->oie_dirent64.d_ino;
+        id->oii_ino = it->oie_dirent->oied_ino;
         id->oii_gen = OSD_OII_NOGEN;
         inode = osd_iget(info, dev, id);
         if (!IS_ERR(inode)) {
                 dentry->d_inode = inode;
                 LASSERT(dentry->d_inode->i_sb == osd_sb(dev));
         } else {
-                CERROR("Error getting inode for ino =%d", id->oii_ino);
                 RETURN((struct dt_rec *) PTR_ERR(inode));
         }
 
         rc = osd_ea_fid_get(env, dentry, (struct dt_rec*) rec);
+        if (rc != 0)
+                rec = ERR_PTR(rc);
 
         iput(inode);
         RETURN((struct dt_rec *)rec);
@@ -3235,7 +3348,7 @@ static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it_ea *it = (struct osd_it_ea *)di;
         ENTRY;
-        RETURN(it->oie_curr_pos);
+        RETURN(it->oie_dirent->oied_off);
 }
 
 /**
@@ -3255,7 +3368,7 @@ static int osd_it_ea_load(const struct lu_env *env,
         int rc;
 
         ENTRY;
-        it->oie_curr_pos = it->oie_next_pos = hash;
+        it->oie_curr_pos = hash;
 
         rc =  osd_ldiskfs_it_fill(di);
         if (rc == 0)
@@ -3331,15 +3444,29 @@ static void *osd_key_init(const struct lu_context *ctx,
         struct osd_thread_info *info;
 
         OBD_ALLOC_PTR(info);
-        if (info != NULL)
-                info->oti_env = container_of(ctx, struct lu_env, le_ctx);
-        else
+        if (info != NULL) {
+                OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+                if (info->oti_it_ea_buf != NULL) {
+                        info->oti_env = container_of(ctx, struct lu_env,
+                                                     le_ctx);
+                } else {
+                        OBD_FREE_PTR(info);
+                        info = ERR_PTR(-ENOMEM);
+                }
+        } else {
                 info = ERR_PTR(-ENOMEM);
+        }
         return info;
 }
 
-/* context key destructor: osd_key_fini */
-LU_KEY_FINI(osd, struct osd_thread_info);
+static void osd_key_fini(const struct lu_context *ctx,
+                         struct lu_context_key *key, void* data)
+{
+        struct osd_thread_info *info = data;
+
+        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
+        OBD_FREE_PTR(info);
+}
 
 static void osd_key_exit(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
@@ -3575,17 +3702,22 @@ static struct inode *osd_iget(struct osd_thread_info *info,
         if (inode == NULL) {
                 CERROR("no inode\n");
                 inode = ERR_PTR(-EACCES);
-        } else if (is_bad_inode(inode)) {
-                CERROR("bad inode\n");
-                iput(inode);
-                inode = ERR_PTR(-ENOENT);
         } else if (id->oii_gen != OSD_OII_NOGEN &&
                    inode->i_generation != id->oii_gen) {
-                CERROR("stale inode\n");
                 iput(inode);
                 inode = ERR_PTR(-ESTALE);
+        } else if (inode->i_nlink == 0) {
+                /* Due to parallel readdir and unlink,
+                 * we can have a dead inode here. */
+                CWARN("stale inode\n");
+                make_bad_inode(inode);
+                iput(inode);
+                inode = ERR_PTR(-ESTALE);
+        } else if (is_bad_inode(inode)) {
+                CERROR("bad inode %lx\n",inode->i_ino);
+                iput(inode);
+                inode = ERR_PTR(-ENOENT);
         }
-
         return inode;
 
 }