Whamcloud - gitweb
LU-993 osd: code cleanup for directory nlink count
authorNiu Yawei <niu@whamcloud.com>
Mon, 16 Jan 2012 06:52:12 +0000 (22:52 -0800)
committerOleg Drokin <green@whamcloud.com>
Thu, 9 Feb 2012 07:43:48 +0000 (02:43 -0500)
- LDISKFS_LINK_MAX is ldiskfs unique, we should not handle it in the mdd
  layer. It's moved down to the osd layer in this patch.

- Remove the declared operation count assertion in OSD_EXEC_OP(), since
  the undo operations are not declared, if there some operation failed,
  the undo operation will trigger the assertion unnecessarily.

- Restore the sub-directory count to 70000 for the sanity test_51b, and
  add test_51ba for testing unlink > LDISKFS_LINK_MAX sub-directories.

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: Id02e7d2ac3a7664a55566b1de783f0a73162339b
Reviewed-on: http://review.whamcloud.com/1971
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/osd-ldiskfs/osd_handler.c
lustre/tests/sanity.sh

index a2693a3..2c1ffec 100644 (file)
@@ -494,43 +494,6 @@ int mdd_link_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-/**
- * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
- * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
- * (maybe too large to be represented). It is a trick to break through the
- * "i_nlink" limitation for subdir count.
- */
-void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle)
-{
-        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
-        struct mdd_device *m = mdd_obj2mdd_dev(obj);
-
-        if (!mdd_is_mnlink(obj)) {
-                if (S_ISDIR(mdd_object_type(obj))) {
-                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
-                                return;
-
-                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
-                                obj->mod_flags |= MNLINK_OBJ;
-                                tmp_la->la_nlink = 1;
-                                tmp_la->la_valid = LA_NLINK;
-                                mdd_attr_set_internal(env, obj, tmp_la, handle,
-                                                      0);
-                                return;
-                        }
-                }
-                mdo_ref_add(env, obj, handle);
-        }
-}
-
-void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle, int is_dot)
-{
-        if (!mdd_is_mnlink(obj) || is_dot)
-                mdo_ref_del(env, obj, handle);
-}
-
 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
                                    const char *name, struct thandle *handle,
                                    struct lustre_capa *capa)
 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
                                    const char *name, struct thandle *handle,
                                    struct lustre_capa *capa)
@@ -584,7 +547,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
         if (rc == 0 && is_dir) {
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
         if (rc == 0 && is_dir) {
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                __mdd_ref_add(env, pobj, handle);
+                mdo_ref_add(env, pobj, handle);
                 mdd_write_unlock(env, pobj);
         }
         RETURN(rc);
                 mdd_write_unlock(env, pobj);
         }
         RETURN(rc);
@@ -600,12 +563,8 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
 
         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
         if (rc == 0 && is_dir) {
 
         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
         if (rc == 0 && is_dir) {
-                int is_dot = 0;
-
-                if (name != NULL && name[0] == '.' && name[1] == 0)
-                        is_dot = 1;
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                __mdd_ref_del(env, pobj, handle, is_dot);
+                mdo_ref_del(env, pobj, handle);
                 mdd_write_unlock(env, pobj);
         }
 
                 mdd_write_unlock(env, pobj);
         }
 
@@ -834,7 +793,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
         if (rc)
                 GOTO(out_unlock, rc);
 
-        __mdd_ref_add(env, mdd_sobj, handle);
+        mdo_ref_add(env, mdd_sobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -896,6 +855,7 @@ int mdd_finish_unlink(const struct lu_env *env,
 {
         int rc;
         int reset = 1;
 {
         int rc;
         int reset = 1;
+        int is_dir = S_ISDIR(ma->ma_attr.la_mode);
         ENTRY;
 
         LASSERT(mdd_write_locked(env, obj) != 0);
         ENTRY;
 
         LASSERT(mdd_write_locked(env, obj) != 0);
@@ -903,7 +863,7 @@ int mdd_finish_unlink(const struct lu_env *env,
         /* read HSM flags, needed to set changelogs flags */
         ma->ma_need = MA_HSM | MA_INODE;
         rc = mdd_attr_get_internal(env, obj, ma);
         /* read HSM flags, needed to set changelogs flags */
         ma->ma_need = MA_HSM | MA_INODE;
         rc = mdd_attr_get_internal(env, obj, ma);
-        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
                 obj->mod_flags |= DEAD_OBJ;
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
                 obj->mod_flags |= DEAD_OBJ;
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
@@ -926,6 +886,9 @@ int mdd_finish_unlink(const struct lu_env *env,
                                 reset = 0;
                 }
 
                                 reset = 0;
                 }
 
+                /* get the i_nlink */
+                ma->ma_need = MA_INODE;
+                rc = mdd_attr_get_internal(env, obj, ma);
         }
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
         }
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
@@ -1044,10 +1007,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
         if (rc)
                 GOTO(cleanup, rc);
 
-        __mdd_ref_del(env, mdd_cobj, handle, 0);
+        mdo_ref_del(env, mdd_cobj, handle);
         if (is_dir)
                 /* unlink dot */
         if (is_dir)
                 /* unlink dot */
-                __mdd_ref_del(env, mdd_cobj, handle, 1);
+                mdo_ref_del(env, mdd_cobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -1463,11 +1426,11 @@ static int mdd_rename_tgt(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                __mdd_ref_del(env, mdd_tobj, handle, 0);
+                mdo_ref_del(env, mdd_tobj, handle);
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
-                        __mdd_ref_del(env, mdd_tobj, handle, 1);
+                        mdo_ref_del(env, mdd_tobj, handle);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -1718,7 +1681,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
-                __mdd_ref_add(env, child, handle);
+                mdo_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0)
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0)
@@ -1726,7 +1689,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                                                      dotdot, handle,
                                                      BYPASS_CAPA);
                 if (rc != 0)
                                                      dotdot, handle,
                                                      BYPASS_CAPA);
                 if (rc != 0)
-                        __mdd_ref_del(env, child, handle, 1);
+                        mdo_ref_del(env, child, handle);
         }
         if (rc == 0)
                 mdd_links_add(env, child, pfid, lname, handle, 1);
         }
         if (rc == 0)
                 mdd_links_add(env, child, pfid, lname, handle, 1);
@@ -2156,9 +2119,9 @@ cleanup:
 
                 if (rc2 == 0) {
                         mdd_write_lock(env, son, MOR_TGT_CHILD);
 
                 if (rc2 == 0) {
                         mdd_write_lock(env, son, MOR_TGT_CHILD);
-                        __mdd_ref_del(env, son, handle, 0);
+                        mdo_ref_del(env, son, handle);
                         if (initialized && S_ISDIR(attr->la_mode))
                         if (initialized && S_ISDIR(attr->la_mode))
-                                __mdd_ref_del(env, son, handle, 1);
+                                mdo_ref_del(env, son, handle);
                         mdd_write_unlock(env, son);
                 }
         }
                         mdd_write_unlock(env, son);
                 }
         }
@@ -2576,11 +2539,11 @@ static int mdd_rename(const struct lu_env *env,
                         rc = -EINVAL;
                         goto cleanup;
                 }
                         rc = -EINVAL;
                         goto cleanup;
                 }
-                __mdd_ref_del(env, mdd_tobj, handle, 0);
+                mdo_ref_del(env, mdd_tobj, handle);
 
                 /* Remove dot reference. */
                 if (is_dir)
 
                 /* Remove dot reference. */
                 if (is_dir)
-                        __mdd_ref_del(env, mdd_tobj, handle, 1);
+                        mdo_ref_del(env, mdd_tobj, handle);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
index 4bdf444..72e497c 100644 (file)
@@ -123,7 +123,6 @@ enum mod_flags {
         APPEND_OBJ = 1 << 1,
         IMMUTE_OBJ = 1 << 2,
         ORPHAN_OBJ = 1 << 3,
         APPEND_OBJ = 1 << 1,
         IMMUTE_OBJ = 1 << 2,
         ORPHAN_OBJ = 1 << 3,
-        MNLINK_OBJ = 1 << 4
 };
 
 enum mdd_object_role {
 };
 
 enum mdd_object_role {
@@ -277,10 +276,6 @@ void mdd_pdo_read_unlock(const struct lu_env *env, struct mdd_object *obj,
 /* mdd_dir.c */
 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
                   const struct lu_fid *fid, struct lu_fid *sfid);
 /* mdd_dir.c */
 int mdd_is_subdir(const struct lu_env *env, struct md_object *mo,
                   const struct lu_fid *fid, struct lu_fid *sfid);
-void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle);
-void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle, int is_dot);
 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
                    struct mdd_object *cobj, int check_perm, int check_nlink);
 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
                    struct mdd_object *cobj, int check_perm, int check_nlink);
 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
@@ -577,11 +572,6 @@ static inline int mdd_is_append(struct mdd_object *obj)
         return obj->mod_flags & APPEND_OBJ;
 }
 
         return obj->mod_flags & APPEND_OBJ;
 }
 
-static inline int mdd_is_mnlink(struct mdd_object *obj)
-{
-        return obj->mod_flags & MNLINK_OBJ;
-}
-
 static inline int mdd_object_exists(struct mdd_object *obj)
 {
         return lu_object_exists(mdd2lu_obj(obj));
 static inline int mdd_object_exists(struct mdd_object *obj)
 {
         return lu_object_exists(mdd2lu_obj(obj));
index cf17146..20c4385 100644 (file)
@@ -556,8 +556,6 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
         if (rc == 0) {
                 mdd_flags_xlate(obj, la->la_flags);
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
         if (rc == 0) {
                 mdd_flags_xlate(obj, la->la_flags);
-                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
-                        obj->mod_flags |= MNLINK_OBJ;
         }
         RETURN(rc);
 }
         }
         RETURN(rc);
 }
@@ -1965,11 +1963,11 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(cleanup, rc);
 
         if (rc)
                 GOTO(cleanup, rc);
 
-        __mdd_ref_del(env, mdd_obj, handle, 0);
+        mdo_ref_del(env, mdd_obj, handle);
 
         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                 /* unlink dot */
 
         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                 /* unlink dot */
-                __mdd_ref_del(env, mdd_obj, handle, 1);
+                mdo_ref_del(env, mdd_obj, handle);
         }
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         }
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -2181,7 +2179,7 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
-                __mdd_ref_add(env, mdd_obj, handle);
+                mdo_ref_add(env, mdd_obj, handle);
         mdd_write_unlock(env, mdd_obj);
         if (rc == 0) {
                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         mdd_write_unlock(env, mdd_obj);
         if (rc == 0) {
                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -2360,7 +2358,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle    *handle = NULL;
         int rc;
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle    *handle = NULL;
         int rc;
-        int reset = 1;
+        int is_orphan = 0, reset = 1;
 
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
 
 #ifdef HAVE_QUOTA_SUPPORT
         struct obd_device *obd = mdo2mdd(obj)->mdd_obd_dev;
@@ -2418,6 +2416,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                                "list, OSS objects to be destroyed.\n",
                                PFID(mdd_object_fid(mdd_obj)));
                         CDEBUG(D_HA, "Object "DFID" is deleted from orphan "
                                "list, OSS objects to be destroyed.\n",
                                PFID(mdd_object_fid(mdd_obj)));
+                        is_orphan = 1;
                 } else {
                         CERROR("Object "DFID" can not be deleted from orphan "
                                 "list, maybe cause OST objects can not be "
                 } else {
                         CERROR("Object "DFID" can not be deleted from orphan "
                                 "list, maybe cause OST objects can not be "
@@ -2433,7 +2432,7 @@ static int mdd_close(const struct lu_env *env, struct md_object *obj,
         rc = mdd_iattr_get(env, mdd_obj, ma);
         /* Object maybe not in orphan list originally, it is rare case for
          * mdd_finish_unlink() failure. */
         rc = mdd_iattr_get(env, mdd_obj, ma);
         /* Object maybe not in orphan list originally, it is rare case for
          * mdd_finish_unlink() failure. */
-        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_orphan)) {
 #ifdef HAVE_QUOTA_SUPPORT
                 if (mds->mds_quota) {
                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
 #ifdef HAVE_QUOTA_SUPPORT
                 if (mds->mds_quota) {
                         quota_opc = FSFILT_OP_UNLINK_PARTIAL_CHILD;
index 72bfd42..dfc5b15 100644 (file)
@@ -150,8 +150,10 @@ static const struct dt_index_operations       osd_index_ea_ops;
 #define OSD_EXEC_OP(handle, op)     {                            \
         struct osd_thandle *oh;                                  \
         oh = container_of0(handle, struct osd_thandle, ot_super);\
 #define OSD_EXEC_OP(handle, op)     {                            \
         struct osd_thandle *oh;                                  \
         oh = container_of0(handle, struct osd_thandle, ot_super);\
-        LASSERT((oh)->ot_declare_ ##op > 0);                     \
-        ((oh)->ot_declare_ ##op)--; }
+        if (((oh)->ot_declare_ ##op) > 0) {                      \
+                ((oh)->ot_declare_ ##op)--;                      \
+        }                                                        \
+        }
 #else
 #define OSD_DECLARE_OP(oh, op)
 #define OSD_EXEC_OP(oh, op)
 #else
 #define OSD_DECLARE_OP(oh, op)
 #define OSD_EXEC_OP(oh, op)
@@ -1988,7 +1990,17 @@ static int osd_object_destroy(const struct lu_env *env,
         oh = container_of0(th, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle);
         LASSERT(inode);
         oh = container_of0(th, struct osd_thandle, ot_super);
         LASSERT(oh->ot_handle);
         LASSERT(inode);
-        LASSERT(osd_inode_unlinked(inode));
+
+        if (S_ISDIR(inode->i_mode)) {
+                LASSERT(osd_inode_unlinked(inode) ||
+                        inode->i_nlink == 1);
+                cfs_spin_lock(&obj->oo_guard);
+                inode->i_nlink = 0;
+                cfs_spin_unlock(&obj->oo_guard);
+                inode->i_sb->s_op->dirty_inode(inode);
+        } else {
+                LASSERT(osd_inode_unlinked(inode));
+        }
 
         OSD_EXEC_OP(th, destroy);
 
 
         OSD_EXEC_OP(th, destroy);
 
@@ -2248,9 +2260,24 @@ static int osd_object_ref_add(const struct lu_env *env,
 
         OSD_EXEC_OP(th, ref_add);
 
 
         OSD_EXEC_OP(th, ref_add);
 
+        /*
+         * DIR_NLINK feature is set for compatibility reasons if:
+         * 1) nlinks > LDISKFS_LINK_MAX, or
+         * 2) nlinks == 2, since this indicates i_nlink was previously 1.
+         *
+         * It is easier to always set this flag (rather than check and set),
+         * since it has less overhead, and the superblock will be dirtied
+         * at some point. Both e2fsprogs and any Lustre-supported ldiskfs
+         * do not actually care whether this flag is set or not.
+         */
         cfs_spin_lock(&obj->oo_guard);
         cfs_spin_lock(&obj->oo_guard);
-        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         inode->i_nlink++;
         inode->i_nlink++;
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink > 1) {
+                if (inode->i_nlink >= LDISKFS_LINK_MAX ||
+                    inode->i_nlink == 2)
+                        inode->i_nlink = 1;
+        }
+        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
@@ -2296,6 +2323,11 @@ static int osd_object_ref_del(const struct lu_env *env,
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
         cfs_spin_lock(&obj->oo_guard);
         LASSERT(inode->i_nlink > 0);
         inode->i_nlink--;
+        /* If this is/was a many-subdir directory (nlink > LDISKFS_LINK_MAX)
+         * then the nlink count is 1. Don't let it be set to 0 or the directory
+         * inode will be deleted incorrectly. */
+        if (S_ISDIR(inode->i_mode) && inode->i_nlink == 0)
+                inode->i_nlink++;
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
         cfs_spin_unlock(&obj->oo_guard);
         inode->i_sb->s_op->dirty_inode(inode);
         LINVRNT(osd_invariant(obj));
index e79c635..cde9086 100644 (file)
@@ -2982,17 +2982,13 @@ test_51a() {    # was test_51
 }
 run_test 51a "special situations: split htree with empty entry =="
 
 }
 run_test 51a "special situations: split htree with empty entry =="
 
-#export NUMTEST=70000
-# FIXME: I select a relatively small number to do basic test.
-# large number may give panic(). debugging on this is going on.
-export NUMTEST=70
+export NUMTEST=70000
 test_51b() {
        NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'`
        [ $NUMFREE -lt 21000 ] && \
                skip "not enough free inodes ($NUMFREE)" && \
                return
 
 test_51b() {
        NUMFREE=`df -i -P $DIR | tail -n 1 | awk '{ print $4 }'`
        [ $NUMFREE -lt 21000 ] && \
                skip "not enough free inodes ($NUMFREE)" && \
                return
 
-       check_kernel_version 40 || NUMTEST=31000
        [ $NUMFREE -lt $NUMTEST ] && NUMTEST=$(($NUMFREE - 50))
 
        mkdir -p $DIR/d51b
        [ $NUMFREE -lt $NUMTEST ] && NUMTEST=$(($NUMFREE - 50))
 
        mkdir -p $DIR/d51b
@@ -3000,6 +2996,32 @@ test_51b() {
 }
 run_test 51b "mkdir .../t-0 --- .../t-$NUMTEST ===================="
 
 }
 run_test 51b "mkdir .../t-0 --- .../t-$NUMTEST ===================="
 
+test_51ba() { # LU-993
+        local BASE=$DIR/d51b
+        # unlink all but 100 subdirectories, then check it still works
+        local LEFT=100
+        local DELETE=$((NUMTEST - LEFT))
+
+        ! [ -d "${BASE}/t-$DELETE" ] && skip "test_51b() not run" && return 0
+
+        # for ldiskfs the nlink count should be 1, but this is OSD specific
+        # and so this is listed for informational purposes only
+        log "nlink before: $(stat -c %h $BASE)"
+        unlinkmany -d $BASE/t- $DELETE ||
+                error "unlink of first $DELETE subdirs failed"
+
+        log "nlink between: $(stat -c %h $BASE)"
+        local FOUND=$(ls -l ${BASE} | wc -l)
+        FOUND=$((FOUND - 1))  # trim the first line of ls output
+        [ $FOUND -ne $LEFT ] &&
+                error "can't find subdirs: found only $FOUND/$LEFT"
+
+        unlinkmany -d $BASE/t- $DELETE $LEFT ||
+                error "unlink of second $LEFT subdirs failed"
+        log "nlink after: $(stat -c %h $BASE)"
+}
+run_test 51ba "rmdir .../t-0 --- .../t-$NUMTEST"
+
 test_51bb() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
 
 test_51bb() {
        [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return