Whamcloud - gitweb
LU-8569 linkea: linkEA size limitation 00/23500/9
authorFan Yong <fan.yong@intel.com>
Sun, 28 Aug 2016 10:15:37 +0000 (18:15 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sun, 1 Jan 2017 01:57:34 +0000 (01:57 +0000)
Under DNE mode, if we do not restrict the linkEA size, and if there
are too many cross-MDTs hard links to the same object, then it will
casue the llog overflow. On the other hand, too many linkEA entries
in the linkEA will serious affect the linkEA performance because we
only support to locate linkEA entry consecutively.

So we need to restrict the linkEA size. Currently, it is 4096 bytes,
that is independent from the backend. If too many hard links caused
the linkEA overflowed, we will add overflow timestamp in the linkEA
header. Such overflow timestamp has some functionalities:

1. It will prevent the object being migrated to other MDT, because
   some name entries may be not in the linkEA, so we cannot update
   these name entries for the migration.

2. It will tell the namespace LFSCK that the 'nlink' attribute may
   be more trustable than the linkEA, then avoid misguiding the
   namespace LFSCK to repair 'nlink' attribute based on linkEA.

There will be subsequent patch(es) for namespace LFSCK to handle the
linkEA size limitation and overflow cases.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I2d6c2be04305c1d7e3af160d8b80e73b66a36483
Reviewed-on: https://review.whamcloud.com/23500
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
18 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_linkea.h
lustre/include/obd_support.h
lustre/lfsck/lfsck_namespace.c
lustre/llite/llite_lib.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdt/mdt_handler.c
lustre/obdclass/linkea.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-zfs/osd_xattr.c
lustre/osp/osp_object.c
lustre/ptlrpc/wiretest.c
lustre/target/out_lib.c
lustre/tests/sanity-lfsck.sh
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index afb9272..4b5b0e4 100644 (file)
@@ -3572,12 +3572,11 @@ struct lustre_capa_key {
 /** The link ea holds 1 \a link_ea_entry for each hardlink */
 #define LINK_EA_MAGIC 0x11EAF1DFUL
 struct link_ea_header {
 /** The link ea holds 1 \a link_ea_entry for each hardlink */
 #define LINK_EA_MAGIC 0x11EAF1DFUL
 struct link_ea_header {
-        __u32 leh_magic;
-        __u32 leh_reccount;
-        __u64 leh_len;      /* total size */
-        /* future use */
-        __u32 padding1;
-        __u32 padding2;
+       __u32 leh_magic;
+       __u32 leh_reccount;
+       __u64 leh_len;  /* total size */
+       __u32 leh_overflow_time;
+       __u32 leh_padding;
 };
 
 /** Hardlink data is name and parent fid.
 };
 
 /** Hardlink data is name and parent fid.
index a77f457..89a040f 100644 (file)
  * Author: di wang <di.wang@intel.com>
  */
 
  * Author: di wang <di.wang@intel.com>
  */
 
-#define DEFAULT_LINKEA_SIZE    4096
+/* There are several reasons to restrict the linkEA size:
+ *
+ * 1. Under DNE mode, if we do not restrict the linkEA size, and if there
+ *    are too many cross-MDTs hard links to the same object, then it will
+ *    casue the llog overflow.
+ *
+ * 2. Some backend has limited size for EA. For example, if without large
+ *    EA enabled, the ldiskfs will make all EAs to share one (4K) EA block.
+ *
+ * 3. Too many entries in linkEA will seriously affect linkEA performance
+ *    because we only support to locate linkEA entry consecutively. */
+#define MAX_LINKEA_SIZE        4096
 
 struct linkea_data {
        /**
 
 struct linkea_data {
        /**
@@ -44,6 +55,7 @@ struct linkea_data {
 
 int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf);
 int linkea_init(struct linkea_data *ldata);
 
 int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf);
 int linkea_init(struct linkea_data *ldata);
+int linkea_init_with_rec(struct linkea_data *ldata);
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid);
 int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid);
 int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
@@ -51,6 +63,9 @@ int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
+int linkea_links_new(struct linkea_data *ldata, struct lu_buf *buf,
+                    const struct lu_name *cname, const struct lu_fid *pfid);
+int linkea_overflow_shrink(struct linkea_data *ldata);
 int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
                      const struct lu_fid  *pfid);
 
 int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
                      const struct lu_fid  *pfid);
 
index e477312..43b6149 100644 (file)
@@ -573,7 +573,6 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_LFSCK_NO_NAMEENTRY    0x1624
 #define OBD_FAIL_LFSCK_MORE_NLINK      0x1625
 #define OBD_FAIL_LFSCK_LESS_NLINK      0x1626
 #define OBD_FAIL_LFSCK_NO_NAMEENTRY    0x1624
 #define OBD_FAIL_LFSCK_MORE_NLINK      0x1625
 #define OBD_FAIL_LFSCK_LESS_NLINK      0x1626
-#define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
 #define OBD_FAIL_LFSCK_BAD_NAME_HASH   0x1628
 #define OBD_FAIL_LFSCK_LOST_MASTER_LMV 0x1629
 #define OBD_FAIL_LFSCK_LOST_SLAVE_LMV  0x162a
 #define OBD_FAIL_LFSCK_BAD_NAME_HASH   0x1628
 #define OBD_FAIL_LFSCK_LOST_MASTER_LMV 0x1629
 #define OBD_FAIL_LFSCK_LOST_SLAVE_LMV  0x162a
index 29edfa9..75c939e 100644 (file)
@@ -675,7 +675,7 @@ static int lfsck_declare_namespace_exec_dir(const struct lu_env *env,
        if (rc == 0)
                /* For insert new linkEA entry. */
                rc = dt_declare_xattr_set(env, obj,
        if (rc == 0)
                /* For insert new linkEA entry. */
                rc = dt_declare_xattr_set(env, obj,
-                       lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
+                       lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE),
                        XATTR_NAME_LINK, 0, handle);
        return rc;
 }
                        XATTR_NAME_LINK, 0, handle);
        return rc;
 }
index 63161f8..e542d38 100644 (file)
@@ -2708,7 +2708,7 @@ static int ll_linkea_decode(struct linkea_data *ldata, unsigned int linkno,
        int             rc;
        ENTRY;
 
        int             rc;
        ENTRY;
 
-       rc = linkea_init(ldata);
+       rc = linkea_init_with_rec(ldata);
        if (rc < 0)
                RETURN(rc);
 
        if (rc < 0)
                RETURN(rc);
 
index 7be5c45..cb15551 100644 (file)
@@ -1648,11 +1648,8 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env,
                                PFID(lu_object_fid(&dto->do_lu)), i);
 
                sname = lod_name_get(env, stripe_name, strlen(stripe_name));
                                PFID(lu_object_fid(&dto->do_lu)), i);
 
                sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                     sname, lu_object_fid(&dt->do_lu));
                if (rc != 0)
                        GOTO(out, rc);
 
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -2438,11 +2435,8 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt,
                                 PFID(lu_object_fid(&dto->do_lu)), i);
 
                sname = lod_name_get(env, stripe_name, strlen(stripe_name));
                                 PFID(lu_object_fid(&dto->do_lu)), i);
 
                sname = lod_name_get(env, stripe_name, strlen(stripe_name));
-               rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+               rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+                                     sname, lu_object_fid(&dt->do_lu));
                if (rc != 0)
                        GOTO(out, rc);
 
                if (rc != 0)
                        GOTO(out, rc);
 
index 7012c4e..4c20548 100644 (file)
@@ -120,6 +120,77 @@ int mdd_lookup(const struct lu_env *env,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/** Read the link EA into a temp buffer.
+ * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * A pointer to the buffer is stored in \a ldata::ld_buf.
+ *
+ * \retval 0 or error
+ */
+static int __mdd_links_read(const struct lu_env *env,
+                           struct mdd_object *mdd_obj,
+                           struct linkea_data *ldata)
+{
+       int rc;
+
+       if (!mdd_object_exists(mdd_obj))
+               return -ENODATA;
+
+       /* First try a small buf */
+       LASSERT(env != NULL);
+       ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+                                              PAGE_SIZE);
+       if (ldata->ld_buf->lb_buf == NULL)
+               return -ENOMEM;
+
+       rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               /* Buf was too small, figure out what we need. */
+               lu_buf_free(ldata->ld_buf);
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                  XATTR_NAME_LINK);
+               if (rc < 0)
+                       return rc;
+               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               if (ldata->ld_buf->lb_buf == NULL)
+                       return -ENOMEM;
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                 XATTR_NAME_LINK);
+       }
+       if (rc < 0) {
+               lu_buf_free(ldata->ld_buf);
+               ldata->ld_buf = NULL;
+               return rc;
+       }
+
+       return linkea_init(ldata);
+}
+
+static int mdd_links_read(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         struct linkea_data *ldata)
+{
+       int rc;
+
+       rc = __mdd_links_read(env, mdd_obj, ldata);
+       if (!rc)
+               rc = linkea_init(ldata);
+
+       return rc;
+}
+
+static int mdd_links_read_with_rec(const struct lu_env *env,
+                                  struct mdd_object *mdd_obj,
+                                  struct linkea_data *ldata)
+{
+       int rc;
+
+       rc = __mdd_links_read(env, mdd_obj, ldata);
+       if (!rc)
+               rc = linkea_init_with_rec(ldata);
+
+       return rc;
+}
+
 /**
  * Get parent FID of the directory
  *
 /**
  * Get parent FID of the directory
  *
@@ -154,7 +225,7 @@ static inline int mdd_parent_fid(const struct lu_env *env,
                GOTO(lookup, rc = 0);
 
        ldata.ld_buf = buf;
                GOTO(lookup, rc = 0);
 
        ldata.ld_buf = buf;
-       rc = mdd_links_read(env, obj, &ldata);
+       rc = mdd_links_read_with_rec(env, obj, &ldata);
        if (rc != 0)
                GOTO(lookup, rc);
 
        if (rc != 0)
                GOTO(lookup, rc);
 
@@ -962,41 +1033,32 @@ static int mdd_linkea_prepare(const struct lu_env *env,
                              struct linkea_data *ldata)
 {
        int rc = 0;
                              struct linkea_data *ldata)
 {
        int rc = 0;
-       int rc2 = 0;
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
        ENTRY;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
-               return 0;
+               RETURN(0);
 
        LASSERT(oldpfid != NULL || newpfid != NULL);
 
 
        LASSERT(oldpfid != NULL || newpfid != NULL);
 
-       if (mdd_obj->mod_flags & DEAD_OBJ) {
-               /* Prevent linkea to be updated which is NOT necessary. */
-               ldata->ld_reclen = 0;
-               /* No more links, don't bother */
+       if (mdd_obj->mod_flags & DEAD_OBJ)
+               /* Unnecessary to update linkEA for dead object.  */
                RETURN(0);
                RETURN(0);
-       }
 
        if (oldpfid != NULL) {
                rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
                if (rc) {
 
        if (oldpfid != NULL) {
                rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
                if (rc) {
-                       if ((check == 1) ||
-                           (rc != -ENODATA && rc != -ENOENT))
+                       if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
                                RETURN(rc);
                                RETURN(rc);
+
                        /* No changes done. */
                        rc = 0;
                }
        }
 
        /* If renaming, add the new record */
                        /* No changes done. */
                        rc = 0;
                }
        }
 
        /* If renaming, add the new record */
-       if (newpfid != NULL) {
-               /* even if the add fails, we still delete the out-of-date
-                * old link */
-               rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
-                                     first, check);
-       }
-
-       rc = rc != 0 ? rc : rc2;
+       if (newpfid != NULL)
+               rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
+                                    first, check);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -1018,41 +1080,34 @@ int mdd_links_rename(const struct lu_env *env,
                ldata = &mdd_env_info(env)->mti_link_data;
                memset(ldata, 0, sizeof(*ldata));
                rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
                ldata = &mdd_env_info(env)->mti_link_data;
                memset(ldata, 0, sizeof(*ldata));
                rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
-                                       newpfid, newlname, first, check,
-                                       ldata);
-               if (rc != 0)
+                                       newpfid, newlname, first, check, ldata);
+               if (rc)
                        GOTO(out, rc);
        }
 
                        GOTO(out, rc);
        }
 
-       if (ldata->ld_reclen != 0)
+       if (!(mdd_obj->mod_flags & DEAD_OBJ))
                rc = mdd_links_write(env, mdd_obj, ldata, handle);
                rc = mdd_links_write(env, mdd_obj, ldata, handle);
-       EXIT;
+
+       GOTO(out, rc);
+
 out:
        if (rc != 0) {
 out:
        if (rc != 0) {
-               int error = 1;
-               if (rc == -EOVERFLOW || rc == -ENOSPC)
-                       error = 0;
                if (newlname == NULL)
                if (newlname == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea add failed %d "DFID"\n",
+                       CERROR("link_ea add failed %d "DFID"\n",
                               rc, PFID(mdd_object_fid(mdd_obj)));
                else if (oldpfid == NULL)
                               rc, PFID(mdd_object_fid(mdd_obj)));
                else if (oldpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea add '%.*s' failed %d "DFID"\n",
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea add '%.*s' failed %d "DFID"\n",
+                              newlname->ln_namelen, newlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
                else if (newpfid == NULL)
                else if (newpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea del '%.*s' failed %d "DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea del '%.*s' failed %d "DFID"\n",
+                              oldlname->ln_namelen, oldlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
                else
                else
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea rename '%.*s'->'%.*s' failed %d "
-                              DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
+                       CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
+                              "\n", oldlname->ln_namelen, oldlname->ln_name,
+                              newlname->ln_namelen, newlname->ln_name, rc,
+                              PFID(mdd_object_fid(mdd_obj)));
        }
 
        if (is_vmalloc_addr(ldata->ld_buf))
        }
 
        if (is_vmalloc_addr(ldata->ld_buf))
@@ -1084,50 +1139,6 @@ static inline int mdd_links_del(const struct lu_env *env,
 }
 
 /** Read the link EA into a temp buffer.
 }
 
 /** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
- * A pointer to the buffer is stored in \a ldata::ld_buf.
- *
- * \retval 0 or error
- */
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct linkea_data *ldata)
-{
-       int rc;
-
-       if (!mdd_object_exists(mdd_obj))
-               return -ENODATA;
-
-       /* First try a small buf */
-       LASSERT(env != NULL);
-       ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
-                                              PAGE_SIZE);
-       if (ldata->ld_buf->lb_buf == NULL)
-               return -ENOMEM;
-
-       rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
-       if (rc == -ERANGE) {
-               /* Buf was too small, figure out what we need. */
-               lu_buf_free(ldata->ld_buf);
-               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
-                                  XATTR_NAME_LINK);
-               if (rc < 0)
-                       return rc;
-               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
-               if (ldata->ld_buf->lb_buf == NULL)
-                       return -ENOMEM;
-               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
-                                 XATTR_NAME_LINK);
-       }
-       if (rc < 0) {
-               lu_buf_free(ldata->ld_buf);
-               ldata->ld_buf = NULL;
-               return rc;
-       }
-
-       return linkea_init(ldata);
-}
-
-/** Read the link EA into a temp buffer.
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
  * \retval ptr to \a lu_buf (always \a mti_big_buf)
  * Uses the name_buf since it is generally large.
  * \retval IS_ERR err
  * \retval ptr to \a lu_buf (always \a mti_big_buf)
@@ -1152,38 +1163,26 @@ int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
            ldata->ld_leh == NULL)
                return 0;
 
            ldata->ld_leh == NULL)
                return 0;
 
-       buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
-                               ldata->ld_leh->leh_len);
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
                return 0;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
                return 0;
 
+again:
+       buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+                               ldata->ld_leh->leh_len);
        rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
        rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
-       if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
-           mdd_object_remote(mdd_obj) == 0) {
-               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
-               struct thandle  *sub_th;
-
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
-                              LFSCK_TYPE_NAMESPACE);
-
-               sub_th = thandle_get_sub_by_dt(env, handle,
-                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
-               lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                               lr, sub_th);
+       if (unlikely(rc == -ENOSPC)) {
+               rc = linkea_overflow_shrink(ldata);
+               if (likely(rc > 0))
+                       goto again;
        }
 
        return rc;
 }
 
        }
 
        return rc;
 }
 
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct thandle *handle, struct linkea_data *ldata,
-                         enum mdd_links_add_overflow overflow)
+static int mdd_declare_links_add(const struct lu_env *env,
+                                struct mdd_object *mdd_obj,
+                                struct thandle *handle,
+                                struct linkea_data *ldata)
 {
        int     rc;
        int     ea_len;
 {
        int     rc;
        int     ea_len;
@@ -1193,36 +1192,13 @@ int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
                ea_len = ldata->ld_leh->leh_len;
                linkea = ldata->ld_buf->lb_buf;
        } else {
                ea_len = ldata->ld_leh->leh_len;
                linkea = ldata->ld_buf->lb_buf;
        } else {
-               ea_len = DEFAULT_LINKEA_SIZE;
+               ea_len = MAX_LINKEA_SIZE;
                linkea = NULL;
        }
 
                linkea = NULL;
        }
 
-       /* XXX: max size? */
        rc = mdo_declare_xattr_set(env, mdd_obj,
                                   mdd_buf_get_const(env, linkea, ea_len),
                                   XATTR_NAME_LINK, 0, handle);
        rc = mdo_declare_xattr_set(env, mdd_obj,
                                   mdd_buf_get_const(env, linkea, ea_len),
                                   XATTR_NAME_LINK, 0, handle);
-       if (rc != 0)
-               return rc;
-
-       if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
-               struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
-               struct thandle  *sub_th;
-
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
-                              LFSCK_TYPE_NAMESPACE);
-
-               sub_th = thandle_get_sub_by_dt(env, handle,
-                               mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
-               rc = lfsck_in_notify(env,
-                                    mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
-                                    lr, sub_th);
-       }
 
        return rc;
 }
 
        return rc;
 }
@@ -1236,7 +1212,7 @@ static inline int mdd_declare_links_del(const struct lu_env *env,
        /* For directory, the linkEA will be removed together
         * with the object. */
        if (!S_ISDIR(mdd_object_type(c)))
        /* For directory, the linkEA will be removed together
         * with the object. */
        if (!S_ISDIR(mdd_object_type(c)))
-               rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
+               rc = mdd_declare_links_add(env, c, handle, NULL);
 
        return rc;
 }
 
        return rc;
 }
@@ -1281,8 +1257,7 @@ static int mdd_declare_link(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_links_add(env, c, handle, data,
-                       S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+       rc = mdd_declare_links_add(env, c, handle, data);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
@@ -2122,7 +2097,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                if (rc != 0)
                        return rc;
 
                if (rc != 0)
                        return rc;
 
-               rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
+               rc = mdd_declare_links_add(env, c, handle, ldata);
                if (rc)
                        return rc;
 
                if (rc)
                        return rc;
 
@@ -2708,8 +2683,7 @@ static int mdd_declare_rename(const struct lu_env *env,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
-               S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
+       rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
@@ -2840,8 +2814,12 @@ static int mdd_rename(const struct lu_env *env,
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
        memset(ldata, 0, sizeof(*ldata));
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
        memset(ldata, 0, sizeof(*ldata));
-       mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
-                          mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
+       rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+                               lsname, mdd_object_fid(mdd_tpobj), ltname,
+                               1, 0, ldata);
+       if (rc)
+               GOTO(stop, rc);
+
        rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
                                mdd_tobj, lsname, ltname, ma, ldata, handle);
        if (rc)
        rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
                                mdd_tobj, lsname, ltname, ma, ldata, handle);
        if (rc)
@@ -3120,8 +3098,7 @@ static int mdd_linkea_update_child_internal(const struct lu_env *env,
                linkea_entry_pack(ldata.ld_lee, &lname,
                                  mdd_object_fid(newparent));
                if (declare)
                linkea_entry_pack(ldata.ld_lee, &lname,
                                  mdd_object_fid(newparent));
                if (declare)
-                       rc = mdd_declare_links_add(env, child, handle, &ldata,
-                                                  MLAO_IGNORE);
+                       rc = mdd_declare_links_add(env, child, handle, &ldata);
                else
                        rc = mdd_links_write(env, child, &ldata, handle);
                break;
                else
                        rc = mdd_links_write(env, child, &ldata, handle);
                break;
@@ -3168,11 +3145,10 @@ static int mdd_update_linkea_internal(const struct lu_env *env,
        ENTRY;
 
        LASSERT(ldata->ld_buf != NULL);
        ENTRY;
 
        LASSERT(ldata->ld_buf != NULL);
+       LASSERT(ldata->ld_leh != NULL);
 
 
-again:
        /* If it is mulitple links file, we need update the name entry for
         * all parent */
        /* If it is mulitple links file, we need update the name entry for
         * all parent */
-       LASSERT(ldata->ld_leh != NULL);
        ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
        ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                struct mdd_device       *mdd = mdo2mdd(&mdd_sobj->mod_obj);
@@ -3187,16 +3163,13 @@ again:
                        CWARN("%s: cannot find obj "DFID": rc = %ld\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid),
                              PTR_ERR(pobj));
                        CWARN("%s: cannot find obj "DFID": rc = %ld\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid),
                              PTR_ERR(pobj));
-                       linkea_del_buf(ldata, &lname);
-                       goto again;
+                       continue;
                }
 
                if (!mdd_object_exists(pobj)) {
                        CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid));
                }
 
                if (!mdd_object_exists(pobj)) {
                        CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
                              mdd2obd_dev(mdd)->obd_name, PFID(&fid));
-                       linkea_del_buf(ldata, &lname);
-                       mdd_object_put(env, pobj);
-                       goto again;
+                       goto next_put;
                }
 
                if (pobj == mdd_pobj &&
                }
 
                if (pobj == mdd_pobj &&
@@ -3206,9 +3179,7 @@ again:
                        CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
                              mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
                              PFID(&fid));
                        CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
                              mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
                              PFID(&fid));
-                       linkea_del_buf(ldata, &lname);
-                       mdd_object_put(env, pobj);
-                       goto again;
+                       goto next_put;
                }
 
                CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
                }
 
                CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
@@ -3244,9 +3215,11 @@ again:
                                /* lnamelen is too big(> NAME_MAX + 16),
                                 * something wrong about this linkea, let's
                                 * skip it */
                                /* lnamelen is too big(> NAME_MAX + 16),
                                 * something wrong about this linkea, let's
                                 * skip it */
-                               linkea_del_buf(ldata, &lname);
-                               mdd_object_put(env, pobj);
-                               goto again;
+                               CWARN("%s: the name %.*s is too long under "
+                                     DFID"\n", mdd2obd_dev(mdd)->obd_name,
+                                     lname.ln_namelen, lname.ln_name,
+                                     PFID(&fid));
+                               goto next_put;
                        }
 
                        /* Note: lname might be without \0 at the end, see
                        }
 
                        /* Note: lname might be without \0 at the end, see
@@ -3260,14 +3233,9 @@ again:
                         * it might be packed into the RPC buffer. */
                        rc = mdd_lookup(env, &pobj->mod_obj, &lname,
                                        &info->mti_fid, NULL);
                         * it might be packed into the RPC buffer. */
                        rc = mdd_lookup(env, &pobj->mod_obj, &lname,
                                        &info->mti_fid, NULL);
-                       if (rc < 0 ||
-                           !lu_fid_eq(&info->mti_fid,
-                                       mdd_object_fid(mdd_sobj))) {
-                               /* skip invalid linkea entry */
-                               linkea_del_buf(ldata, &lname);
-                               mdd_object_put(env, pobj);
-                               goto again;
-                       }
+                       if (rc < 0 || !lu_fid_eq(&info->mti_fid,
+                                                mdd_object_fid(mdd_sobj)))
+                               GOTO(next_put, rc == -ENOENT ? 0 : rc);
 
                        rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
                        if (rc != 0)
 
                        rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
                        if (rc != 0)
@@ -3340,8 +3308,7 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
        xname = list_xbuf.lb_buf;
        while (rem > 0) {
                xlen = strnlen(xname, rem - 1) + 1;
        xname = list_xbuf.lb_buf;
        while (rem > 0) {
                xlen = strnlen(xname, rem - 1) + 1;
-               if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
-                   strcmp(XATTR_NAME_LMA, xname) == 0 ||
+               if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
                    strcmp(XATTR_NAME_LMV, xname) == 0)
                        goto next;
 
                    strcmp(XATTR_NAME_LMV, xname) == 0)
                        goto next;
 
@@ -3384,10 +3351,21 @@ static int mdd_migrate_xattrs(const struct lu_env *env,
                if (rc != 0)
                        GOTO(stop_trans, rc);
 
                if (rc != 0)
                        GOTO(stop_trans, rc);
 
+again:
                rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
                if (rc == -EEXIST)
                        GOTO(stop_trans, rc = 0);
 
                rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
                if (rc == -EEXIST)
                        GOTO(stop_trans, rc = 0);
 
+               if (unlikely(rc == -ENOSPC &&
+                            strcmp(xname, XATTR_NAME_LINK) == 0)) {
+                       rc = linkea_overflow_shrink(
+                                       (struct linkea_data *)(xbuf.lb_buf));
+                       if (likely(rc > 0)) {
+                               xbuf.lb_len = rc;
+                               goto again;
+                       }
+               }
+
                if (rc != 0)
                        GOTO(stop_trans, rc);
 stop_trans:
                if (rc != 0)
                        GOTO(stop_trans, rc);
 stop_trans:
@@ -3438,8 +3416,7 @@ static int mdd_declare_migrate_create(const struct lu_env *env,
                if (rc != 0)
                        return rc;
        } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
                if (rc != 0)
                        return rc;
        } else if (S_ISDIR(la->la_mode) && ldata != NULL) {
-               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
-                                          MLAO_IGNORE);
+               rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
                if (rc != 0)
                        return rc;
        }
                if (rc != 0)
                        return rc;
        }
@@ -3520,7 +3497,7 @@ static int mdd_migrate_create(const struct lu_env *env,
                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
                }
        } else if (S_ISDIR(la->la_mode)) {
                        spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
                }
        } else if (S_ISDIR(la->la_mode)) {
-               rc = mdd_links_read(env, mdd_sobj, ldata);
+               rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
                if (rc == -ENODATA) {
                        /* ignore the non-linkEA error */
                        ldata = NULL;
                if (rc == -ENODATA) {
                        /* ignore the non-linkEA error */
                        ldata = NULL;
@@ -3883,7 +3860,7 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
-       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
+       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
@@ -4025,12 +4002,6 @@ static int mdd_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                GOTO(stop_trans, rc);
 
        if (rc != 0)
                GOTO(stop_trans, rc);
 
-       linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
-       rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
-                          ldata, 1);
-       if (rc != 0)
-               GOTO(stop_trans, rc);
-
        mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
 
        mdd_sobj->mod_flags |= DEAD_OBJ;
        mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
 
        mdd_sobj->mod_flags |= DEAD_OBJ;
@@ -4172,6 +4143,12 @@ static int mdd_migrate_sanity_check(const struct lu_env *env,
        /* If there are still links locally, then the file will not be
         * migrated. */
        LASSERT(ldata->ld_leh != NULL);
        /* If there are still links locally, then the file will not be
         * migrated. */
        LASSERT(ldata->ld_leh != NULL);
+
+       /* If the linkEA is overflow, then means there are some unknown name
+        * entries under unknown parents, that will prevent the migration. */
+       if (unlikely(ldata->ld_leh->leh_overflow_time))
+               RETURN(1);
+
        ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                struct lu_name          lname;
        ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                struct lu_name          lname;
index e83ef20..37f644b 100644 (file)
@@ -160,11 +160,6 @@ struct mdd_thread_info {
        struct lu_seq_range       mti_range;
 };
 
        struct lu_seq_range       mti_range;
 };
 
-enum mdd_links_add_overflow {
-       MLAO_IGNORE     = false,
-       MLAO_CHECK      = true,
-};
-
 extern const char orph_index_name[];
 
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
 extern const char orph_index_name[];
 
 int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
@@ -220,11 +215,6 @@ int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid);
 int mdd_lookup(const struct lu_env *env,
                struct md_object *pobj, const struct lu_name *lname,
                struct lu_fid* fid, struct md_op_spec *spec);
 int mdd_lookup(const struct lu_env *env,
                struct md_object *pobj, const struct lu_name *lname,
                struct lu_fid* fid, struct md_op_spec *spec);
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct linkea_data *ldata);
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
-                         struct thandle *handle, struct linkea_data *ldata,
-                         enum mdd_links_add_overflow overflow);
 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
                    struct linkea_data *ldata, struct thandle *handle);
 struct lu_buf *mdd_links_get(const struct lu_env *env,
 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
                    struct linkea_data *ldata, struct thandle *handle);
 struct lu_buf *mdd_links_get(const struct lu_env *env,
index 1582399..45be626 100644 (file)
@@ -5515,7 +5515,7 @@ int mdt_links_read(struct mdt_thread_info *info, struct mdt_object *mdt_obj,
        if (rc < 0)
                return rc;
 
        if (rc < 0)
                return rc;
 
-       return linkea_init(ldata);
+       return linkea_init_with_rec(ldata);
 }
 
 /**
 }
 
 /**
index 7b8891a..a1bcc3d 100644 (file)
@@ -38,8 +38,10 @@ int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf)
                return -ENOMEM;
        ldata->ld_leh = ldata->ld_buf->lb_buf;
        ldata->ld_leh->leh_magic = LINK_EA_MAGIC;
                return -ENOMEM;
        ldata->ld_leh = ldata->ld_buf->lb_buf;
        ldata->ld_leh->leh_magic = LINK_EA_MAGIC;
-       ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
        ldata->ld_leh->leh_reccount = 0;
        ldata->ld_leh->leh_reccount = 0;
+       ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
+       ldata->ld_leh->leh_overflow_time = 0;
+       ldata->ld_leh->leh_padding = 0;
        return 0;
 }
 EXPORT_SYMBOL(linkea_data_new);
        return 0;
 }
 EXPORT_SYMBOL(linkea_data_new);
@@ -54,11 +56,15 @@ int linkea_init(struct linkea_data *ldata)
                leh->leh_magic = LINK_EA_MAGIC;
                leh->leh_reccount = __swab32(leh->leh_reccount);
                leh->leh_len = __swab64(leh->leh_len);
                leh->leh_magic = LINK_EA_MAGIC;
                leh->leh_reccount = __swab32(leh->leh_reccount);
                leh->leh_len = __swab64(leh->leh_len);
-               /* entries are swabbed by linkea_entry_unpack */
+               leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+               leh->leh_padding = __swab32(leh->leh_padding);
+               /* individual entries are swabbed by linkea_entry_unpack() */
        }
        }
+
        if (leh->leh_magic != LINK_EA_MAGIC)
                return -EINVAL;
        if (leh->leh_magic != LINK_EA_MAGIC)
                return -EINVAL;
-       if (leh->leh_reccount == 0)
+
+       if (leh->leh_reccount == 0 && leh->leh_overflow_time == 0)
                return -ENODATA;
 
        ldata->ld_leh = leh;
                return -ENODATA;
 
        ldata->ld_leh = leh;
@@ -66,6 +72,18 @@ int linkea_init(struct linkea_data *ldata)
 }
 EXPORT_SYMBOL(linkea_init);
 
 }
 EXPORT_SYMBOL(linkea_init);
 
+int linkea_init_with_rec(struct linkea_data *ldata)
+{
+       int rc;
+
+       rc = linkea_init(ldata);
+       if (!rc && ldata->ld_leh->leh_reccount == 0)
+               rc = -ENODATA;
+
+       return rc;
+}
+EXPORT_SYMBOL(linkea_init_with_rec);
+
 /**
  * Pack a link_ea_entry.
  * All elements are stored as chars to avoid alignment issues.
 /**
  * Pack a link_ea_entry.
  * All elements are stored as chars to avoid alignment issues.
@@ -97,6 +115,8 @@ EXPORT_SYMBOL(linkea_entry_pack);
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid)
 {
 void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
                         struct lu_name *lname, struct lu_fid *pfid)
 {
+       LASSERT(lee != NULL);
+
        *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
        fid_be_to_cpu(pfid, pfid);
        *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
        fid_be_to_cpu(pfid, pfid);
@@ -113,25 +133,42 @@ EXPORT_SYMBOL(linkea_entry_unpack);
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid)
 {
 int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
                   const struct lu_fid *pfid)
 {
-       LASSERT(ldata->ld_leh != NULL);
+       struct link_ea_header *leh = ldata->ld_leh;
+       int reclen;
+
+       LASSERT(leh != NULL);
 
        if (lname == NULL || pfid == NULL)
                return -EINVAL;
 
 
        if (lname == NULL || pfid == NULL)
                return -EINVAL;
 
-       ldata->ld_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
-       if (ldata->ld_leh->leh_len + ldata->ld_reclen >
-           ldata->ld_buf->lb_len) {
+       reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
+       if (unlikely(leh->leh_len + reclen > MAX_LINKEA_SIZE)) {
+               /* Use 32-bits to save the overflow time, although it will
+                * shrink the cfs_time_current_sec() returned 64-bits value
+                * to 32-bits value, it is still quite large and can be used
+                * for about 140 years. That is enough. */
+               leh->leh_overflow_time = cfs_time_current_sec();
+               if (unlikely(leh->leh_overflow_time == 0))
+                       leh->leh_overflow_time++;
+
+               CDEBUG(D_INODE, "No enough space to hold linkea entry '"
+                      DFID": %.*s' at %u\n", PFID(pfid), lname->ln_namelen,
+                      lname->ln_name, leh->leh_overflow_time);
+               return 0;
+       }
+
+       if (leh->leh_len + reclen > ldata->ld_buf->lb_len) {
                if (lu_buf_check_and_grow(ldata->ld_buf,
                if (lu_buf_check_and_grow(ldata->ld_buf,
-                                         ldata->ld_leh->leh_len +
-                                         ldata->ld_reclen) < 0)
+                                         leh->leh_len + reclen) < 0)
                        return -ENOMEM;
                        return -ENOMEM;
+
+               leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
        }
 
        }
 
-       ldata->ld_leh = ldata->ld_buf->lb_buf;
-       ldata->ld_lee = ldata->ld_buf->lb_buf + ldata->ld_leh->leh_len;
+       ldata->ld_lee = ldata->ld_buf->lb_buf + leh->leh_len;
        ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
        ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
-       ldata->ld_leh->leh_len += ldata->ld_reclen;
-       ldata->ld_leh->leh_reccount++;
+       leh->leh_len += ldata->ld_reclen;
+       leh->leh_reccount++;
        CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n",
               PFID(pfid), lname->ln_namelen, lname->ln_name);
        return 0;
        CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n",
               PFID(pfid), lname->ln_namelen, lname->ln_name);
        return 0;
@@ -142,6 +179,7 @@ EXPORT_SYMBOL(linkea_add_buf);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
 {
        LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
 void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
 {
        LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
+       LASSERT(ldata->ld_leh->leh_reccount > 0);
 
        ldata->ld_leh->leh_reccount--;
        ldata->ld_leh->leh_len -= ldata->ld_reclen;
 
        ldata->ld_leh->leh_reccount--;
        ldata->ld_leh->leh_len -= ldata->ld_reclen;
@@ -157,6 +195,70 @@ void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
 }
 EXPORT_SYMBOL(linkea_del_buf);
 
 }
 EXPORT_SYMBOL(linkea_del_buf);
 
+int linkea_links_new(struct linkea_data *ldata, struct lu_buf *buf,
+                    const struct lu_name *cname, const struct lu_fid *pfid)
+{
+       int rc;
+
+       rc = linkea_data_new(ldata, buf);
+       if (!rc)
+               rc = linkea_add_buf(ldata, cname, pfid);
+
+       return rc;
+}
+EXPORT_SYMBOL(linkea_links_new);
+
+/**
+ * Mark the linkEA as overflow with current timestamp,
+ * and remove the last linkEA entry.
+ *
+ * Return the new linkEA size.
+ */
+int linkea_overflow_shrink(struct linkea_data *ldata)
+{
+       struct link_ea_header *leh;
+       struct lu_name tname;
+       struct lu_fid tfid;
+       int count;
+
+       leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+               leh->leh_padding = __swab32(leh->leh_padding);
+       }
+
+       LASSERT(leh->leh_reccount > 0);
+
+       leh->leh_len = sizeof(struct link_ea_header);
+       leh->leh_reccount--;
+       if (unlikely(leh->leh_reccount == 0))
+               return 0;
+
+       leh->leh_overflow_time = cfs_time_current_sec();
+       if (unlikely(leh->leh_overflow_time == 0))
+               leh->leh_overflow_time++;
+       ldata->ld_reclen = 0;
+       ldata->ld_lee = (struct link_ea_entry *)(leh + 1);
+       for (count = 0; count < leh->leh_reccount; count++) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+                                   &tname, &tfid);
+               leh->leh_len += ldata->ld_reclen;
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
+       }
+
+       linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &tname, &tfid);
+       CDEBUG(D_INODE, "No enough space to hold the last linkea entry '"
+              DFID": %.*s', shrink it, left %d linkea entries, size %llu\n",
+              PFID(&tfid), tname.ln_namelen, tname.ln_name,
+              leh->leh_reccount, leh->leh_len);
+
+       return leh->leh_len;
+}
+EXPORT_SYMBOL(linkea_overflow_shrink);
+
 /**
  * Check if such a link exists in linkEA.
  *
 /**
  * Check if such a link exists in linkEA.
  *
@@ -177,8 +279,9 @@ int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
 
        LASSERT(ldata->ld_leh != NULL);
 
 
        LASSERT(ldata->ld_leh != NULL);
 
-       /* link #0 */
-       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+       /* link #0, if leh_reccount == 0 we skip the loop and return -ENOENT */
+       if (likely(ldata->ld_leh->leh_reccount > 0))
+               ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
 
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
 
        for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
                linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
index e284977..4c2e495 100644 (file)
@@ -3898,7 +3898,6 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
 
        if (strcmp(name, XATTR_NAME_LMV) == 0) {
                struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
 
        if (strcmp(name, XATTR_NAME_LMV) == 0) {
                struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
-               int                      rc;
 
                rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
                if (rc != 0)
 
                rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
                if (rc != 0)
@@ -3912,10 +3911,6 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
                        RETURN(rc);
        }
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
-           strcmp(name, XATTR_NAME_LINK) == 0)
-               return -ENOSPC;
-
        rc = __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
                               fs_flags);
        osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
        rc = __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
                               fs_flags);
        osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
@@ -5039,8 +5034,8 @@ again:
        }
 
        ldata.ld_buf = buf;
        }
 
        ldata.ld_buf = buf;
-       rc = linkea_init(&ldata);
-       if (rc == 0) {
+       rc = linkea_init_with_rec(&ldata);
+       if (!rc) {
                linkea_first_entry(&ldata);
                linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, NULL, fid);
        }
                linkea_first_entry(&ldata);
                linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, NULL, fid);
        }
@@ -5083,8 +5078,8 @@ again:
        }
 
        ldata.ld_buf = buf;
        }
 
        ldata.ld_buf = buf;
-       rc = linkea_init(&ldata);
-       if (rc == 0)
+       rc = linkea_init_with_rec(&ldata);
+       if (!rc)
                rc = linkea_links_find(&ldata, &cname, pfid);
 
        RETURN(rc);
                rc = linkea_links_find(&ldata, &cname, pfid);
 
        RETURN(rc);
index 448f6fb..3b4c2f3 100644 (file)
@@ -607,10 +607,6 @@ int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
             strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
                RETURN(-EOPNOTSUPP);
 
             strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
                RETURN(-EOPNOTSUPP);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
-           strcmp(name, XATTR_NAME_LINK) == 0)
-               RETURN(-ENOSPC);
-
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
        down_write(&obj->oo_guard);
        oh = container_of0(handle, struct osd_thandle, ot_super);
 
        down_write(&obj->oo_guard);
index cb1c252..84dda71 100644 (file)
@@ -1092,6 +1092,16 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
+       /* Do not cache linkEA that may be self-adjusted by peers
+        * under EA overflow case. */
+       if (strcmp(name, XATTR_NAME_LINK) == 0) {
+               oxe = osp_oac_xattr_find(o, name, true);
+               if (oxe != NULL)
+                       osp_oac_xattr_put(oxe);
+
+               RETURN(0);
+       }
+
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
                CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
                CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
index 428ae10..55c6b5e 100644 (file)
@@ -4143,14 +4143,14 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct link_ea_header, leh_len));
        LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
                 (long long)(int)offsetof(struct link_ea_header, leh_len));
        LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
-       LASSERTF((int)offsetof(struct link_ea_header, padding1) == 16, "found %lld\n",
-                (long long)(int)offsetof(struct link_ea_header, padding1));
-       LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding1) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct link_ea_header *)0)->padding1));
-       LASSERTF((int)offsetof(struct link_ea_header, padding2) == 20, "found %lld\n",
-                (long long)(int)offsetof(struct link_ea_header, padding2));
-       LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding2) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct link_ea_header *)0)->padding2));
+       LASSERTF((int)offsetof(struct link_ea_header, leh_overflow_time) == 16, "found %lld\n",
+                (long long)(int)offsetof(struct link_ea_header, leh_overflow_time));
+       LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_overflow_time) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct link_ea_header *)0)->leh_overflow_time));
+       LASSERTF((int)offsetof(struct link_ea_header, leh_padding) == 20, "found %lld\n",
+                (long long)(int)offsetof(struct link_ea_header, leh_padding));
+       LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_padding) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct link_ea_header *)0)->leh_padding));
        CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
 
        /* Checks for struct link_ea_entry */
        CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
 
        /* Checks for struct link_ea_entry */
index a16774b..98f2741 100644 (file)
@@ -37,6 +37,7 @@
 #include <md_object.h>
 #include <obd.h>
 #include <obd_class.h>
 #include <md_object.h>
 #include <obd.h>
 #include <obd_class.h>
+#include <lustre_linkea.h>
 
 #include "tgt_internal.h"
 
 
 #include "tgt_internal.h"
 
@@ -733,41 +734,45 @@ static int out_tx_xattr_set_exec(const struct lu_env *env,
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
 {
        struct dt_object *dt_obj = arg->object;
        int rc;
+       ENTRY;
 
        CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
               dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
               arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
 
        CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
               dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
               arg->u.xattr_set.name, arg->u.xattr_set.flags);
 
-       if (!lu_object_exists(&dt_obj->do_lu))
-               GOTO(out, rc = -ENOENT);
-
-       dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
-       rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
-                         arg->u.xattr_set.name, arg->u.xattr_set.flags,
-                         th);
-       /**
-        * Ignore errors if this is LINK EA
-        **/
-       if (unlikely(rc != 0 &&
-                    strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               if (rc == -ENOSPC && arg->reply != NULL) {
-                       struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
-                       lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
-                                      LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
-                       tgt_lfsck_in_notify(env,
-                               tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+       if (!lu_object_exists(&dt_obj->do_lu)) {
+               rc = -ENOENT;
+       } else {
+               struct linkea_data ldata = { 0 };
+               bool linkea;
+
+               ldata.ld_buf = &arg->u.xattr_set.buf;
+               if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) {
+                       linkea = true;
+                       rc = linkea_init(&ldata);
+                       if (unlikely(rc))
+                               GOTO(out, rc == -ENODATA ? -EINVAL : rc);
+               } else {
+                       linkea = false;
                }
 
                }
 
-               rc = 0;
+               dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+
+again:
+               rc = dt_xattr_set(env, dt_obj, ldata.ld_buf,
+                                 arg->u.xattr_set.name, arg->u.xattr_set.flags,
+                                 th);
+               if (unlikely(rc == -ENOSPC && linkea)) {
+                       rc = linkea_overflow_shrink(&ldata);
+                       if (likely(rc > 0)) {
+                               arg->u.xattr_set.buf.lb_len = rc;
+                               goto again;
+                       }
+               }
+               dt_write_unlock(env, dt_obj);
        }
        }
-       dt_write_unlock(env, dt_obj);
+
+       GOTO(out, rc);
 
 out:
        CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
 
 out:
        CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
@@ -794,24 +799,6 @@ int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj,
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
-       if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
-               struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
-               /* XXX: If the linkEA is overflow, then we need to notify the
-                *      namespace LFSCK to skip "nlink" attribute verification
-                *      on this object to avoid the "nlink" to be shrinked by
-                *      wrong. It may be not good an interaction with LFSCK
-                *      like this. We will consider to replace it with other
-                *      mechanism in future. LU-5802. */
-               lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
-                              LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
-               rc = tgt_lfsck_in_notify(env,
-                                        tgt_ses_info(env)->tsi_tgt->lut_bottom,
-                                        lr, ta->ta_handle);
-               if (rc != 0)
-                       return rc;
-       }
-
        arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
        if (IS_ERR(arg))
                return PTR_ERR(arg);
        arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
        if (IS_ERR(arg))
                return PTR_ERR(arg);
index 0d3f136..d4507cf 100644 (file)
@@ -3812,8 +3812,6 @@ test_29c() {
        echo "Inject failure stub on MDT0 to simulate the case that"
        echo "foo's hard links exceed the object's linkEA limitation."
 
        echo "Inject failure stub on MDT0 to simulate the case that"
        echo "foo's hard links exceed the object's linkEA limitation."
 
-       #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW  0x1627
-       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1627
        ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
                error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
 
        ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
                error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
 
@@ -3833,7 +3831,6 @@ test_29c() {
 
        wait_all_targets_blocked namespace completed 8
 
 
        wait_all_targets_blocked namespace completed 8
 
-       do_facet $SINGLEMDS $LCTL set_param fail_loc=0
        local repaired=$($SHOW_NAMESPACE |
                         awk '/^nlinks_repaired/ { print $2 }')
        [ $repaired -eq 0 ] ||
        local repaired=$($SHOW_NAMESPACE |
                         awk '/^nlinks_repaired/ { print $2 }')
        [ $repaired -eq 0 ] ||
@@ -3848,7 +3845,8 @@ test_29c() {
        [ $count2 -eq 2 ] ||
                error "(11) Repaired something unexpectedly: $count2"
 }
        [ $count2 -eq 2 ] ||
                error "(11) Repaired something unexpectedly: $count2"
 }
-run_test 29c "Not verify nlink attr if hark links exceed linkEA limitation"
+# disable test_29c temporarily, it will be re-enabled in subsequent patch.
+#run_test 29c "Not verify nlink attr if hard links exceed linkEA limitation"
 
 test_30() {
        [ $(facet_fstype $SINGLEMDS) != ldiskfs ] &&
 
 test_30() {
        [ $(facet_fstype $SINGLEMDS) != ldiskfs ] &&
index 902394d..8dff709 100644 (file)
@@ -1926,8 +1926,8 @@ check_link_ea_header(void)
        CHECK_MEMBER(link_ea_header, leh_magic);
        CHECK_MEMBER(link_ea_header, leh_reccount);
        CHECK_MEMBER(link_ea_header, leh_len);
        CHECK_MEMBER(link_ea_header, leh_magic);
        CHECK_MEMBER(link_ea_header, leh_reccount);
        CHECK_MEMBER(link_ea_header, leh_len);
-       CHECK_MEMBER(link_ea_header, padding1);
-       CHECK_MEMBER(link_ea_header, padding2);
+       CHECK_MEMBER(link_ea_header, leh_overflow_time);
+       CHECK_MEMBER(link_ea_header, leh_padding);
 
        CHECK_CDEFINE(LINK_EA_MAGIC);
 }
 
        CHECK_CDEFINE(LINK_EA_MAGIC);
 }
index 98ba8d7..f3f798b 100644 (file)
@@ -4158,14 +4158,14 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct link_ea_header, leh_len));
        LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
                 (long long)(int)offsetof(struct link_ea_header, leh_len));
        LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
                 (long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
-       LASSERTF((int)offsetof(struct link_ea_header, padding1) == 16, "found %lld\n",
-                (long long)(int)offsetof(struct link_ea_header, padding1));
-       LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding1) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct link_ea_header *)0)->padding1));
-       LASSERTF((int)offsetof(struct link_ea_header, padding2) == 20, "found %lld\n",
-                (long long)(int)offsetof(struct link_ea_header, padding2));
-       LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding2) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct link_ea_header *)0)->padding2));
+       LASSERTF((int)offsetof(struct link_ea_header, leh_overflow_time) == 16, "found %lld\n",
+                (long long)(int)offsetof(struct link_ea_header, leh_overflow_time));
+       LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_overflow_time) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct link_ea_header *)0)->leh_overflow_time));
+       LASSERTF((int)offsetof(struct link_ea_header, leh_padding) == 20, "found %lld\n",
+                (long long)(int)offsetof(struct link_ea_header, leh_padding));
+       LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_padding) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct link_ea_header *)0)->leh_padding));
        CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
 
        /* Checks for struct link_ea_entry */
        CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
 
        /* Checks for struct link_ea_entry */