Whamcloud - gitweb
Branch HEAD
authorfanyong <fanyong>
Tue, 8 Jul 2008 02:18:20 +0000 (02:18 +0000)
committerfanyong <fanyong>
Tue, 8 Jul 2008 02:18:20 +0000 (02:18 +0000)
b=16167
i=nikita
i=huanghua

If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
assign "i_nlink" to 1. It is a trick to break through the "i_nlink"
limitation for subdir count.

lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/osd/osd_handler.c

index bcb0eac..72bbbe7 100644 (file)
@@ -252,7 +252,11 @@ static int __mdd_may_link(const struct lu_env *env, struct mdd_object *obj)
         if (rc)
                 RETURN(rc);
 
-        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
+        /*
+         * Subdir count limitation can be broken through.
+         */ 
+        if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink &&
+            !S_ISDIR(la->la_mode))
                 RETURN(-EMLINK);
         else
                 RETURN(0);
@@ -445,6 +449,42 @@ const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
         return (const struct dt_rec *)pack;
 }
 
+/**
+ * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
+ * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
+ * (maybe too large to be represented). It is a trick to break through the
+ * "i_nlink" limitation for subdir count.
+ */
+void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle)
+{
+        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
+        struct mdd_device *m = mdd_obj2mdd_dev(obj);
+
+        if (!mdd_is_mnlink(obj)) {
+                if (S_ISDIR(mdd_object_type(obj))) {
+                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
+                                return;
+
+                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
+                                obj->mod_flags |= MNLINK_OBJ;
+                                tmp_la->la_nlink = 1;
+                                tmp_la->la_valid = LA_NLINK;
+                                mdd_attr_set_internal(env, obj, tmp_la, handle,
+                                                      0);
+                                return;
+                        }
+                }
+                mdo_ref_add(env, obj, handle);
+        }
+}
+
+void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle, int is_dot)
+{
+        if (!mdd_is_mnlink(obj) || is_dot)
+                mdo_ref_del(env, obj, handle);
+}
 
 /* insert named index, add reference if isdir */
 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
@@ -467,7 +507,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
         if (rc == 0) {
                 if (is_dir) {
                         mdd_write_lock(env, pobj);
-                        mdo_ref_add(env, pobj, handle);
+                        __mdd_ref_add(env, pobj, handle);
                         mdd_write_unlock(env, pobj);
                 }
         }
@@ -488,8 +528,12 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
                                                     (struct dt_key *)name,
                                                     handle, capa);
                 if (rc == 0 && is_dir) {
+                        int is_dot = 0;
+
+                        if (name != NULL && name[0] == '.' && name[1] == 0)
+                                is_dot = 1;
                         mdd_write_lock(env, pobj);
-                        mdo_ref_del(env, pobj, handle);
+                        __mdd_ref_del(env, pobj, handle, is_dot);
                         mdd_write_unlock(env, pobj);
                 }
         } else
@@ -552,7 +596,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        mdo_ref_add(env, mdd_sobj, handle);
+        __mdd_ref_add(env, mdd_sobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -657,10 +701,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        mdo_ref_del(env, mdd_cobj, handle);
+        __mdd_ref_del(env, mdd_cobj, handle, 0);
         if (is_dir)
                 /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+                __mdd_ref_del(env, mdd_cobj, handle, 1);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -922,11 +966,11 @@ static int mdd_rename_tgt(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                mdo_ref_del(env, mdd_tobj, handle);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -1093,7 +1137,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
-                mdo_ref_add(env, child, handle);
+                __mdd_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0) {
@@ -1103,13 +1147,11 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                         if (rc != 0) {
                                 int rc2;
 
-                                rc2 = __mdd_index_delete(env, child, dot, 0,
+                                rc2 = __mdd_index_delete(env, child, dot, 1,
                                                          handle, BYPASS_CAPA);
                                 if (rc2 != 0)
                                         CERROR("Failure to cleanup after dotdot"
                                                " creation: %d (%d)\n", rc2, rc);
-                                else
-                                        mdo_ref_del(env, child, handle);
                         }
                 }
         }
@@ -1176,12 +1218,6 @@ static int mdd_create_sanity_check(const struct lu_env *env,
         }
 
         switch (ma->ma_attr.la_mode & S_IFMT) {
-        case S_IFDIR: {
-                if (la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink)
-                        RETURN(-EMLINK);
-                else
-                        RETURN(0);
-        }
         case S_IFLNK: {
                 unsigned int symlen = strlen(spec->u.sp_symname) + 1;
 
@@ -1190,6 +1226,7 @@ static int mdd_create_sanity_check(const struct lu_env *env,
                 else
                         RETURN(0);
         }
+        case S_IFDIR:
         case S_IFREG:
         case S_IFCHR:
         case S_IFBLK:
@@ -1222,7 +1259,7 @@ static int mdd_create(const struct lu_env *env,
         struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
         struct thandle    *handle;
-        int rc, created = 0, inserted = 0, lmm_size = 0;
+        int rc, created = 0, initialized = 0, inserted = 0, lmm_size = 0;
         struct dynlock_handle *dlh;
         ENTRY;
 
@@ -1322,6 +1359,8 @@ static int mdd_create(const struct lu_env *env,
                  */
                 GOTO(cleanup, rc);
 
+        initialized = 1;
+
         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
                                 name, S_ISDIR(attr->la_mode), handle,
                                 mdd_object_capa(env, mdd_pobj));
@@ -1382,9 +1421,12 @@ cleanup:
                                 CERROR("error can not cleanup destroy %d\n",
                                        rc2);
                 }
+
                 if (rc2 == 0) {
                         mdd_write_lock(env, son);
-                        mdo_ref_del(env, son, handle);
+                        __mdd_ref_del(env, son, handle, 0);
+                        if (initialized && S_ISDIR(attr->la_mode))
+                                __mdd_ref_del(env, son, handle, 1);
                         mdd_write_unlock(env, son);
                 }
         }
@@ -1583,11 +1625,11 @@ static int mdd_rename(const struct lu_env *env,
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
                 mdd_write_lock(env, mdd_tobj);
-                mdo_ref_del(env, mdd_tobj, handle);
+                __mdd_ref_del(env, mdd_tobj, handle, 0);
 
                 /* Remove dot reference. */
                 if (is_dir)
-                        mdo_ref_del(env, mdd_tobj, handle);
+                        __mdd_ref_del(env, mdd_tobj, handle, 1);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
index e77e84f..5d577a2 100644 (file)
@@ -78,7 +78,8 @@ enum mod_flags {
         DEAD_OBJ   = 1 << 0,
         APPEND_OBJ = 1 << 1,
         IMMUTE_OBJ = 1 << 2,
-        ORPHAN_OBJ = 1 << 3
+        ORPHAN_OBJ = 1 << 3,
+        MNLINK_OBJ = 1 << 4
 };
 
 #define LUSTRE_APPEND_FL LDISKFS_APPEND_FL
@@ -199,6 +200,10 @@ void mdd_pdo_write_unlock(const struct lu_env *env, struct mdd_object *obj,
 void mdd_pdo_read_unlock(const struct lu_env *env, struct mdd_object *obj,
                          struct dynlock_handle *dlh);
 /* mdd_dir.c */
+void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle);
+void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
+                   struct thandle *handle, int is_dot);
 int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
                    struct mdd_object *cobj, int check_perm, int check_nlink);
 int mdd_may_unlink(const struct lu_env *env, struct mdd_object *pobj,
@@ -427,6 +432,11 @@ static inline int mdd_is_append(struct mdd_object *obj)
         return obj->mod_flags & APPEND_OBJ;
 }
 
+static inline int mdd_is_mnlink(struct mdd_object *obj)
+{
+        return obj->mod_flags & MNLINK_OBJ;
+}
+
 static inline int mdd_object_exists(struct mdd_object *obj)
 {
         return lu_object_exists(mdd2lu_obj(obj));
index 83e4a38..ec6f727 100644 (file)
@@ -263,8 +263,11 @@ int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
 
         ENTRY;
         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
-        if (rc == 0)
+        if (rc == 0) {
                 mdd_flags_xlate(obj, la->la_flags);
+                if (S_ISDIR(la->la_mode) && la->la_nlink == 1)
+                        obj->mod_flags |= MNLINK_OBJ;
+        }
         RETURN(rc);
 }
 
@@ -1054,11 +1057,11 @@ static int mdd_ref_del(const struct lu_env *env, struct md_object *obj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        mdo_ref_del(env, mdd_obj, handle);
+        __mdd_ref_del(env, mdd_obj, handle, 0);
 
         if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
                 /* unlink dot */
-                mdo_ref_del(env, mdd_obj, handle);
+                __mdd_ref_del(env, mdd_obj, handle, 1);
         }
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
@@ -1197,7 +1200,7 @@ static int mdd_ref_add(const struct lu_env *env, struct md_object *obj,
         mdd_write_lock(env, mdd_obj);
         rc = mdd_link_sanity_check(env, NULL, NULL, mdd_obj);
         if (rc == 0)
-                mdo_ref_add(env, mdd_obj, handle);
+                __mdd_ref_add(env, mdd_obj, handle);
         mdd_write_unlock(env, mdd_obj);
         if (rc == 0) {
                 LASSERT(ma->ma_attr.la_valid & LA_CTIME);
index fe067c3..898f597 100644 (file)
@@ -1294,15 +1294,10 @@ static void osd_object_ref_add(const struct lu_env *env,
         LASSERT(th != NULL);
 
         spin_lock(&obj->oo_guard);
-        if (inode->i_nlink < LDISKFS_LINK_MAX) {
-                inode->i_nlink ++;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        } else {
-                spin_unlock(&obj->oo_guard);
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "Overflowed nlink\n");
-        }
+        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
+        inode->i_nlink++;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
         LASSERT(osd_invariant(obj));
 }
 
@@ -1322,15 +1317,10 @@ static void osd_object_ref_del(const struct lu_env *env,
         LASSERT(th != NULL);
 
         spin_lock(&obj->oo_guard);
-        if (inode->i_nlink > 0) {
-                inode->i_nlink --;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        } else {
-                spin_unlock(&obj->oo_guard);
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "Underflowed nlink\n");
-        }
+        LASSERT(inode->i_nlink > 0);
+        inode->i_nlink--;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
         LASSERT(osd_invariant(obj));
 }