Whamcloud - gitweb
LU-2912 mdd: move fid2path from MDD to MDT
authorwangdi <di.wang@whamcloud.com>
Sun, 22 Dec 2013 04:07:23 +0000 (20:07 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 20 Mar 2013 18:43:10 +0000 (14:43 -0400)
1. Move some linkEA API to obdclass, so both MDD(set linkea)
and MDT(fid2path) can access these LINKEA operation.

2. Move fid2path from MDD to MDT, so it can detect remote
object, and return -EREMOTE to the client. Then the client
will try fid2path on another MDT, and finally the client
will assemble the path fragments from different MDT.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I464cd1fdd44ebbe02c94821f910294829f3d9b94
Reviewed-on: http://review.whamcloud.com/5676
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
21 files changed:
lustre/include/Makefile.am
lustre/include/lu_object.h
lustre/include/lustre_linkea.h [new file with mode: 0644]
lustre/include/md_object.h
lustre/include/obd.h
lustre/llite/file.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_compat.c
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_lfsck.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/obdclass/Makefile.in
lustre/obdclass/linkea.c [new file with mode: 0644]
lustre/obdclass/lu_object.c
lustre/osp/osp_md_object.c
lustre/tests/sanity.sh

index 313c2cc..f5d0b38 100644 (file)
@@ -51,4 +51,4 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h        \
              lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
              lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
             lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h \
-            lustre_update.h
+            lustre_update.h lustre_linkea.h
index 5f67a06..d3b2089 100644 (file)
@@ -1321,9 +1321,6 @@ struct lu_buf {
         ssize_t lb_len;
 };
 
-/** null buffer */
-extern struct lu_buf LU_BUF_NULL;
-
 #define DLUBUF "(%p %zu)"
 #define PLUBUF(buf) (buf)->lb_buf, (buf)->lb_len
 /**
@@ -1356,9 +1353,15 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
                                 struct lu_device *dev,
                                 const struct lu_object_conf *conf);
 
+/** null buffer */
+extern struct lu_buf LU_BUF_NULL;
+
 void lu_buf_free(struct lu_buf *buf);
 void lu_buf_alloc(struct lu_buf *buf, int size);
 void lu_buf_realloc(struct lu_buf *buf, int size);
 
+int lu_buf_check_and_grow(struct lu_buf *buf, int len);
+struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len);
+
 /** @} lu */
 #endif /* __LUSTRE_LU_OBJECT_H */
diff --git a/lustre/include/lustre_linkea.h b/lustre/include/lustre_linkea.h
new file mode 100644 (file)
index 0000000..5790be9
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ * Use is subject to license terms.
+ *
+ * Author: di wang <di.wang@intel.com>
+ */
+
+struct linkea_data {
+       /**
+        * Buffer to keep link EA body.
+        */
+       struct lu_buf           *ld_buf;
+       /**
+        * The matched header, entry and its lenght in the EA
+        */
+       struct link_ea_header   *ld_leh;
+       struct link_ea_entry    *ld_lee;
+       int                     ld_reclen;
+};
+
+int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf);
+int linkea_init(struct linkea_data *ldata);
+void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
+                        struct lu_name *lname, struct lu_fid *pfid);
+int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
+                  const struct lu_fid *pfid);
+void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
+int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
+                     const struct lu_fid  *pfid);
+
+#define LINKEA_NEXT_ENTRY(ldata)       \
+       (struct link_ea_entry *)((char *)ldata.ld_lee + ldata.ld_reclen)
+
+#define LINKEA_FIRST_ENTRY(ldata)      \
+       (struct link_ea_entry *)(ldata.ld_leh + 1)
index a4af8b4..7f4f4ff 100644 (file)
@@ -276,14 +276,13 @@ struct md_object_operations {
                             struct lustre_capa *, int renewal);
 
         int (*moo_object_sync)(const struct lu_env *, struct md_object *);
-        int (*moo_path)(const struct lu_env *env, struct md_object *obj,
-                        char *path, int pathlen, __u64 *recno, int *linkno);
-        int (*moo_file_lock)(const struct lu_env *env, struct md_object *obj,
-                             struct lov_mds_md *lmm, struct ldlm_extent *extent,
-                             struct lustre_handle *lockh);
-        int (*moo_file_unlock)(const struct lu_env *env, struct md_object *obj,
-                               struct lov_mds_md *lmm,
-                               struct lustre_handle *lockh);
+
+       int (*moo_file_lock)(const struct lu_env *env, struct md_object *obj,
+                            struct lov_mds_md *lmm, struct ldlm_extent *extent,
+                            struct lustre_handle *lockh);
+       int (*moo_file_unlock)(const struct lu_env *env, struct md_object *obj,
+                              struct lov_mds_md *lmm,
+                              struct lustre_handle *lockh);
        int (*moo_object_lock)(const struct lu_env *env, struct md_object *obj,
                               struct lustre_handle *lh,
                               struct ldlm_enqueue_info *einfo,
@@ -355,8 +354,8 @@ struct md_dir_operations {
 
 struct md_device_operations {
         /** meta-data device related handlers. */
-        int (*mdo_root_get)(const struct lu_env *env, struct md_device *m,
-                            struct lu_fid *f);
+       int (*mdo_root_get)(const struct lu_env *env, struct md_device *m,
+                           struct lu_fid *f);
 
         int (*mdo_maxsize_get)(const struct lu_env *env, struct md_device *m,
                                int *md_size, int *cookie_size);
@@ -685,14 +684,6 @@ static inline int mo_capa_get(const struct lu_env *env,
         return m->mo_ops->moo_capa_get(env, m, c, renewal);
 }
 
-static inline int mo_path(const struct lu_env *env, struct md_object *m,
-                          char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        if (m->mo_ops->moo_path == NULL)
-                return -ENOSYS;
-        return m->mo_ops->moo_path(env, m, path, pathlen, recno, linkno);
-}
-
 static inline int mo_object_sync(const struct lu_env *env, struct md_object *m)
 {
         LASSERT(m->mo_ops->moo_object_sync);
index d36a871..60a7e57 100644 (file)
@@ -1159,6 +1159,7 @@ enum obd_cleanup_stage {
 
 #define KEY_CACHE_SET          "cache_set"
 #define KEY_CACHE_LRU_SHRINK   "cache_lru_shrink"
+#define KEY_CHANGELOG_INDEX    "changelog_index"
 
 struct lu_context;
 
index 492152c..e79113a 100644 (file)
@@ -1748,6 +1748,7 @@ int ll_fid2path(struct inode *inode, void *arg)
        rc = obd_iocontrol(OBD_IOC_FID2PATH, exp, outsize, gfout, NULL);
        if (rc)
                GOTO(gf_free, rc);
+
        if (copy_to_user(arg, gfout, outsize))
                rc = -EFAULT;
 
index cdb3d61..c868ae8 100644 (file)
@@ -730,6 +730,88 @@ out_local:
         RETURN(rc);
 }
 
+static int lmv_fid2path(struct obd_export *exp, int len, void *karg, void *uarg)
+{
+       struct obd_device       *obddev = class_exp2obd(exp);
+       struct lmv_obd          *lmv = &obddev->u.lmv;
+       struct getinfo_fid2path *gf;
+       struct lmv_tgt_desc     *tgt;
+       struct getinfo_fid2path *remote_gf = NULL;
+       int                     remote_gf_size = 0;
+       int                     rc;
+
+       gf = (struct getinfo_fid2path *)karg;
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               RETURN(PTR_ERR(tgt));
+
+repeat_fid2path:
+       rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out_fid2path, rc);
+
+       /* If remote_gf != NULL, it means just building the
+        * path on the remote MDT, copy this path segement to gf */
+       if (remote_gf != NULL) {
+               struct getinfo_fid2path *ori_gf;
+               char *ptr;
+
+               ori_gf = (struct getinfo_fid2path *)karg;
+               if (strlen(ori_gf->gf_path) +
+                   strlen(gf->gf_path) > ori_gf->gf_pathlen)
+                       GOTO(out_fid2path, rc = -EOVERFLOW);
+
+               ptr = ori_gf->gf_path;
+
+               memmove(ptr + strlen(gf->gf_path) + 1, ptr,
+                       strlen(ori_gf->gf_path));
+
+               strncpy(ptr, gf->gf_path, strlen(gf->gf_path));
+               ptr += strlen(gf->gf_path);
+               *ptr = '/';
+       }
+
+       CDEBUG(D_INFO, "%s: get path %s "DFID" rec: "LPU64" ln: %u\n",
+              tgt->ltd_exp->exp_obd->obd_name,
+              gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
+              gf->gf_linkno);
+
+       if (rc == 0)
+               GOTO(out_fid2path, rc);
+
+       /* sigh, has to go to another MDT to do path building further */
+       if (remote_gf == NULL) {
+               remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
+               OBD_ALLOC(remote_gf, remote_gf_size);
+               if (remote_gf == NULL)
+                       GOTO(out_fid2path, rc = -ENOMEM);
+               remote_gf->gf_pathlen = PATH_MAX;
+       }
+
+       if (!fid_is_sane(&gf->gf_fid)) {
+               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                      tgt->ltd_exp->exp_obd->obd_name,
+                      PFID(&gf->gf_fid), -EINVAL);
+               GOTO(out_fid2path, rc = -EINVAL);
+       }
+
+       tgt = lmv_find_target(lmv, &gf->gf_fid);
+       if (IS_ERR(tgt))
+               GOTO(out_fid2path, rc = -EINVAL);
+
+       remote_gf->gf_fid = gf->gf_fid;
+       remote_gf->gf_recno = -1;
+       remote_gf->gf_linkno = -1;
+       memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
+       gf = remote_gf;
+       goto repeat_fid2path;
+
+out_fid2path:
+       if (remote_gf != NULL)
+               OBD_FREE(remote_gf, remote_gf_size);
+       RETURN(rc);
+}
+
 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                          int len, void *karg, void *uarg)
 {
@@ -850,14 +932,7 @@ static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
                break;
        }
        case OBD_IOC_FID2PATH: {
-               struct getinfo_fid2path *gf;
-               struct lmv_tgt_desc     *tgt;
-
-               gf = (struct getinfo_fid2path *)karg;
-               tgt = lmv_find_target(lmv, &gf->gf_fid);
-               if (IS_ERR(tgt))
-                       RETURN(PTR_ERR(tgt));
-               rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
+               rc = lmv_fid2path(exp, len, karg, uarg);
                break;
        }
        case LL_IOC_HSM_STATE_GET:
index 772502b..9fc8e51 100644 (file)
@@ -1198,9 +1198,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
-        if (rc)
-                GOTO(out, rc);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out, rc);
 
         if (vallen <= sizeof(*gf))
                 GOTO(out, rc = -EPROTO);
@@ -1960,19 +1960,20 @@ int mdc_get_info_rpc(struct obd_export *exp,
                              RCL_SERVER, vallen);
         ptlrpc_request_set_replen(req);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc == 0) {
-                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
-                memcpy(val, tmp, vallen);
-                if (ptlrpc_rep_need_swab(req)) {
-                        if (KEY_IS(KEY_FID2PATH)) {
-                                lustre_swab_fid2path(val);
-                        }
-                }
-        }
-        ptlrpc_req_finished(req);
+       rc = ptlrpc_queue_wait(req);
+       /* -EREMOTE means the get_info result is partial, and it needs to
+        * continue on another MDT, see fid2path part in lmv_iocontrol */
+       if (rc == 0 || rc == -EREMOTE) {
+               tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
+               memcpy(val, tmp, vallen);
+               if (ptlrpc_rep_need_swab(req)) {
+                       if (KEY_IS(KEY_FID2PATH))
+                               lustre_swab_fid2path(val);
+               }
+       }
+       ptlrpc_req_finished(req);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static void lustre_swab_hai(struct hsm_action_item *h)
index fe1eb49..f718025 100644 (file)
@@ -107,7 +107,7 @@ static int mdd_convert_linkea(const struct lu_env *env,
        ENTRY;
 
        th = dt_trans_create(env, mdd->mdd_child);
-       rc = mdd_declare_links_add(env, o, th);
+       rc = mdd_declare_links_add(env, o, th, NULL);
        if (rc)
                GOTO(out, rc);
        rc = dt_trans_start_local(env, mdd->mdd_child, th);
@@ -118,7 +118,7 @@ static int mdd_convert_linkea(const struct lu_env *env,
        oldfid.f_oid = MDD_ROOT_INDEX_OID;
        oldfid.f_ver = 0;
        rc = mdd_links_rename(env, o, &oldfid, name, &mdd->mdd_root_fid,
-                             name, th, 0, 1);
+                             name, th, NULL, 0, 1);
        if (rc == -ENOENT || rc == -EEXIST)
                rc = 0;
 
index 930d5ce..50b292d 100644 (file)
@@ -533,7 +533,7 @@ int mdd_changelog_write_header(const struct lu_env *env,
        }
 
        reclen = llog_data_len(sizeof(*rec) + len);
-       buf = mdd_buf_alloc(env, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
@@ -697,12 +697,6 @@ static int dot_lustre_mdd_object_sync(const struct lu_env *env,
         return -ENOSYS;
 }
 
-static int dot_lustre_mdd_path(const struct lu_env *env, struct md_object *obj,
-                           char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        return -ENOSYS;
-}
-
 static int dot_file_lock(const struct lu_env *env, struct md_object *obj,
                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
                          struct lustre_handle *lockh)
@@ -734,7 +728,6 @@ static struct md_object_operations mdd_dot_lustre_obj_ops = {
        .moo_close              = dot_lustre_mdd_close,
        .moo_capa_get           = mdd_capa_get,
        .moo_object_sync        = dot_lustre_mdd_object_sync,
-       .moo_path               = dot_lustre_mdd_path,
        .moo_file_lock          = dot_file_lock,
        .moo_file_unlock        = dot_file_unlock,
 };
@@ -942,12 +935,6 @@ static int obf_mdd_readpage(const struct lu_env *env, struct md_object *obj,
         return -EPERM;
 }
 
-static int obf_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        return -ENOSYS;
-}
-
 static struct md_object_operations mdd_obf_obj_ops = {
        .moo_attr_get    = obf_attr_get,
        .moo_attr_set    = obf_attr_set,
@@ -958,7 +945,6 @@ static struct md_object_operations mdd_obf_obj_ops = {
        .moo_open        = obf_mdd_open,
        .moo_close       = obf_mdd_close,
        .moo_readpage    = obf_mdd_readpage,
-       .moo_path        = obf_path
 };
 
 /**
@@ -1273,17 +1259,14 @@ const struct lu_device_operations mdd_lu_ops = {
         .ldo_prepare           = mdd_prepare,
 };
 
-/*
- * No permission check is needed.
- */
 static int mdd_root_get(const struct lu_env *env,
-                        struct md_device *m, struct lu_fid *f)
+                       struct md_device *m, struct lu_fid *f)
 {
-        struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
+       struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
 
-        ENTRY;
-        *f = mdd->mdd_root_fid;
-        RETURN(0);
+       ENTRY;
+       *f = mdd->mdd_root_fid;
+       RETURN(0);
 }
 
 /*
@@ -1718,12 +1701,12 @@ static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
 LU_TYPE_INIT_FINI(mdd, &mdd_thread_key, &mdd_capainfo_key);
 
 const struct md_device_operations mdd_ops = {
-        .mdo_statfs         = mdd_statfs,
-        .mdo_root_get       = mdd_root_get,
-        .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
-        .mdo_update_capa_key= mdd_update_capa_key,
-        .mdo_llog_ctxt_get  = mdd_llog_ctxt_get,
-        .mdo_iocontrol      = mdd_iocontrol,
+       .mdo_statfs         = mdd_statfs,
+       .mdo_root_get       = mdd_root_get,
+       .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
+       .mdo_update_capa_key= mdd_update_capa_key,
+       .mdo_llog_ctxt_get  = mdd_llog_ctxt_get,
+       .mdo_iocontrol      = mdd_iocontrol,
 };
 
 static struct lu_device_type_operations mdd_device_type_ops = {
@@ -1757,7 +1740,8 @@ static void mdd_key_fini(const struct lu_context *ctx,
                 OBD_FREE(info->mti_max_lmm, info->mti_max_lmm_size);
         if (info->mti_max_cookie != NULL)
                 OBD_FREE(info->mti_max_cookie, info->mti_max_cookie_size);
-        mdd_buf_put(&info->mti_big_buf);
+       lu_buf_free(&info->mti_big_buf);
+       lu_buf_free(&info->mti_link_buf);
 
         OBD_FREE_PTR(info);
 }
index 59051e1..b352b6c 100644 (file)
@@ -37,7 +37,7 @@
  *
  * Lustre Metadata Server (mdd) routines
  *
- * Author: Wang Di <wangdi@clusterfs.com>
+ * Author: Wang Di <wangdi@intel.com>
  */
 
 #define DEBUG_SUBSYSTEM S_MDS
@@ -57,19 +57,8 @@ static struct lu_name lname_dotdot = {
         sizeof(dotdot) - 1
 };
 
-static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
-                        const struct lu_name *lname, struct lu_fid* fid,
-                        int mask);
-static inline int mdd_links_add(const struct lu_env *env,
-                               struct mdd_object *mdd_obj,
-                               const struct lu_fid *pfid,
-                               const struct lu_name *lname,
-                               struct thandle *handle, int first);
-static inline int mdd_links_del(const struct lu_env *env,
-                               struct mdd_object *mdd_obj,
-                               const struct lu_fid *pfid,
-                               const struct lu_name *lname,
-                               struct thandle *handle);
+static int __mdd_lookup(const struct lu_env *, struct md_object *,
+                       const struct lu_name *, struct lu_fid*, int);
 
 static int
 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
@@ -637,7 +626,7 @@ int mdd_declare_changelog_store(const struct lu_env *env,
 
        reclen = llog_data_len(sizeof(*rec) +
                               (fname != NULL ? fname->ln_namelen : 0));
-       buf = mdd_buf_alloc(env, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                return -ENOMEM;
 
@@ -675,7 +664,7 @@ static int mdd_declare_changelog_ext_store(const struct lu_env *env,
        reclen = llog_data_len(sizeof(*rec) +
                               (tname != NULL ? tname->ln_namelen : 0) +
                               (sname != NULL ? 1 + sname->ln_namelen : 0));
-       buf = mdd_buf_alloc(env, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                return -ENOMEM;
 
@@ -800,7 +789,7 @@ int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
        LASSERT(handle != NULL);
 
        reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
-       buf = mdd_buf_alloc(env, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
@@ -868,7 +857,7 @@ static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
 
        reclen = llog_data_len(sizeof(*rec) +
                               sname != NULL ? 1 + sname->ln_namelen : 0);
-       buf = mdd_buf_alloc(env, reclen);
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);
        rec = buf->lb_buf;
@@ -904,12 +893,307 @@ static int mdd_changelog_ext_ns_store(const struct lu_env  *env,
        return 0;
 }
 
+static int __mdd_links_add(const struct lu_env *env,
+                          struct mdd_object *mdd_obj,
+                          struct linkea_data *ldata,
+                          const struct lu_name *lname,
+                          const struct lu_fid *pfid,
+                          int first, int check)
+{
+       int rc;
+
+       if (ldata->ld_leh == NULL) {
+               rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
+               if (rc) {
+                       if (rc != -ENODATA)
+                               return rc;
+                       rc = linkea_data_new(ldata,
+                                            &mdd_env_info(env)->mti_link_buf);
+                       if (rc)
+                               return rc;
+               }
+       }
+
+       if (check) {
+               rc = linkea_links_find(ldata, lname, pfid);
+               if (rc && rc != -ENOENT)
+                       return rc;
+               if (rc == 0)
+                       return -EEXIST;
+       }
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
+               struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
+
+               *tfid = *pfid;
+               tfid->f_ver = ~0;
+               linkea_add_buf(ldata, lname, tfid);
+       }
+
+       return linkea_add_buf(ldata, lname, pfid);
+}
+
+static int __mdd_links_del(const struct lu_env *env,
+                          struct mdd_object *mdd_obj,
+                          struct linkea_data *ldata,
+                          const struct lu_name *lname,
+                          const struct lu_fid *pfid)
+{
+       int rc;
+
+       if (ldata->ld_leh == NULL) {
+               rc = mdd_links_read(env, mdd_obj, ldata);
+               if (rc)
+                       return rc;
+       }
+
+       rc = linkea_links_find(ldata, lname, pfid);
+       if (rc)
+               return rc;
+
+       linkea_del_buf(ldata, lname);
+       return 0;
+}
+
+static int mdd_linkea_prepare(const struct lu_env *env,
+                             struct mdd_object *mdd_obj,
+                             const struct lu_fid *oldpfid,
+                             const struct lu_name *oldlname,
+                             const struct lu_fid *newpfid,
+                             const struct lu_name *newlname,
+                             int first, int check,
+                             struct linkea_data *ldata)
+{
+       int rc = 0;
+       int rc2 = 0;
+       ENTRY;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
+               return 0;
+
+       LASSERT(oldpfid != NULL || newpfid != NULL);
+
+       if (mdd_obj->mod_flags & DEAD_OBJ)
+               /* No more links, don't bother */
+               RETURN(0);
+
+       if (oldpfid != NULL) {
+               rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
+               if (rc) {
+                       if ((check == 0) ||
+                           (rc != -ENODATA && rc != -ENOENT))
+                               RETURN(rc);
+                       /* No changes done. */
+                       rc = 0;
+               }
+       }
+
+       /* If renaming, add the new record */
+       if (newpfid != NULL) {
+               /* even if the add fails, we still delete the out-of-date
+                * old link */
+               rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
+                                     first, check);
+               if (rc2 == -EEXIST)
+                       rc2 = 0;
+       }
+
+       rc = rc != 0 ? rc : rc2;
+
+       RETURN(rc);
+}
+
+int mdd_links_rename(const struct lu_env *env,
+                    struct mdd_object *mdd_obj,
+                    const struct lu_fid *oldpfid,
+                    const struct lu_name *oldlname,
+                    const struct lu_fid *newpfid,
+                    const struct lu_name *newlname,
+                    struct thandle *handle,
+                    struct linkea_data *ldata,
+                    int first, int check)
+{
+       int rc2 = 0;
+       int rc = 0;
+       ENTRY;
+
+       if (ldata == NULL) {
+               ldata = &mdd_env_info(env)->mti_link_data;
+               memset(ldata, 0, sizeof(*ldata));
+               rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
+                                       newpfid, newlname, first, check,
+                                       ldata);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       if (ldata->ld_lee != NULL)
+               rc = mdd_links_write(env, mdd_obj, ldata, handle);
+       EXIT;
+out:
+       if (rc == 0)
+               rc = rc2;
+       if (rc) {
+               int error = 1;
+               if (rc == -EOVERFLOW || rc == -ENOENT)
+                       error = 0;
+               if (oldpfid == NULL)
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea add '%.*s' failed %d "DFID"\n",
+                              newlname->ln_namelen, newlname->ln_name,
+                              rc, PFID(mdd_object_fid(mdd_obj)));
+               else if (newpfid == NULL)
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea del '%.*s' failed %d "DFID"\n",
+                              oldlname->ln_namelen, oldlname->ln_name,
+                              rc, PFID(mdd_object_fid(mdd_obj)));
+               else
+                       CDEBUG(error ? D_ERROR : D_OTHER,
+                              "link_ea rename '%.*s'->'%.*s' failed %d "
+                              DFID"\n",
+                              oldlname->ln_namelen, oldlname->ln_name,
+                              newlname->ln_namelen, newlname->ln_name,
+                              rc, PFID(mdd_object_fid(mdd_obj)));
+       }
+
+       if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
+               /* if we vmalloced a large buffer drop it */
+               lu_buf_free(ldata->ld_buf);
+
+       return rc;
+}
+
+static inline int mdd_links_add(const struct lu_env *env,
+                               struct mdd_object *mdd_obj,
+                               const struct lu_fid *pfid,
+                               const struct lu_name *lname,
+                               struct thandle *handle,
+                               struct linkea_data *data, int first)
+{
+       return mdd_links_rename(env, mdd_obj, NULL, NULL,
+                               pfid, lname, handle, data, first, 0);
+}
+
+static inline int mdd_links_del(const struct lu_env *env,
+                               struct mdd_object *mdd_obj,
+                               const struct lu_fid *pfid,
+                               const struct lu_name *lname,
+                               struct thandle *handle)
+{
+       return mdd_links_rename(env, mdd_obj, pfid, lname,
+                               NULL, NULL, handle, NULL, 0, 0);
+}
+
+/** Read the link EA into a temp buffer.
+ * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * A pointer to the buffer is stored in \a ldata::ld_buf.
+ *
+ * \retval 0 or error
+ */
+int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
+                  struct linkea_data *ldata)
+{
+       int rc;
+
+       /* First try a small buf */
+       LASSERT(env != NULL);
+       ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+                                              CFS_PAGE_SIZE);
+       if (ldata->ld_buf->lb_buf == NULL)
+               return -ENOMEM;
+
+       if (!mdd_object_exists(mdd_obj))
+               return -ENODATA;
+
+       rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK,
+                         BYPASS_CAPA);
+       if (rc == -ERANGE) {
+               /* Buf was too small, figure out what we need. */
+               lu_buf_free(ldata->ld_buf);
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                  XATTR_NAME_LINK, BYPASS_CAPA);
+               if (rc < 0)
+                       return rc;
+               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               if (ldata->ld_buf->lb_buf == NULL)
+                       return -ENOMEM;
+               rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+                                 XATTR_NAME_LINK, BYPASS_CAPA);
+       }
+       if (rc < 0)
+               return rc;
+
+       linkea_init(ldata);
+       return 0;
+}
+
+/** Read the link EA into a temp buffer.
+ * Uses the name_buf since it is generally large.
+ * \retval IS_ERR err
+ * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ */
+struct lu_buf *mdd_links_get(const struct lu_env *env,
+                            struct mdd_object *mdd_obj)
+{
+       struct linkea_data ldata = { 0 };
+       int rc;
+
+       rc = mdd_links_read(env, mdd_obj, &ldata);
+       return rc ? ERR_PTR(rc) : ldata.ld_buf;
+}
+
+int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
+                   struct linkea_data *ldata, struct thandle *handle)
+{
+       const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+                                                    ldata->ld_leh->leh_len);
+       return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
+                            mdd_object_capa(env, mdd_obj));
+}
+
+int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
+                         struct thandle *handle, struct linkea_data *ldata)
+{
+       int     rc;
+       int     ea_len;
+       void    *linkea;
+
+       if (ldata != NULL && ldata->ld_lee != NULL) {
+               ea_len = ldata->ld_leh->leh_len;
+               linkea = ldata->ld_buf->lb_buf;
+       } else {
+               ea_len = 4096;
+               linkea = NULL;
+       }
+
+       /* XXX: max size? */
+       rc = mdo_declare_xattr_set(env, mdd_obj,
+                                  mdd_buf_get_const(env, linkea, ea_len),
+                                  XATTR_NAME_LINK, 0, handle);
+       return rc;
+}
+
+static inline int mdd_declare_links_del(const struct lu_env *env,
+                                       struct mdd_object *c,
+                                       struct thandle *handle)
+{
+       int rc = 0;
+
+       /* For directory, the linkEA will be removed together
+        * with the object. */
+       if (!S_ISDIR(mdd_object_type(c)))
+               rc = mdd_declare_links_add(env, c, handle, NULL);
+
+       return rc;
+}
+
 static int mdd_declare_link(const struct lu_env *env,
                             struct mdd_device *mdd,
                             struct mdd_object *p,
                             struct mdd_object *c,
                             const struct lu_name *name,
-                            struct thandle *handle)
+                           struct thandle *handle,
+                           struct linkea_data *data)
 {
         int rc;
 
@@ -929,7 +1213,7 @@ static int mdd_declare_link(const struct lu_env *env,
         if (rc)
                 return rc;
 
-        rc = mdd_declare_links_add(env, c, handle);
+       rc = mdd_declare_links_add(env, c, handle, data);
         if (rc)
                 return rc;
 
@@ -949,6 +1233,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         struct mdd_device *mdd = mdo2mdd(src_obj);
         struct dynlock_handle *dlh;
         struct thandle *handle;
+       struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
         int rc;
         ENTRY;
 
@@ -956,7 +1241,10 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (IS_ERR(handle))
                 GOTO(out_pending, rc = PTR_ERR(handle));
 
-        rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle);
+       memset(ldata, 0, sizeof(*ldata));
+
+       rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
+                             ldata);
         if (rc)
                 GOTO(stop, rc);
 
@@ -996,11 +1284,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
         la->la_valid = LA_CTIME;
         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
-        if (rc == 0) {
-                mdd_links_add(env, mdd_sobj,
-                              mdo2fid(mdd_tobj), lname, handle, 0);
-        }
-
+       if (rc == 0) {
+               rc = mdd_linkea_prepare(env, mdd_sobj, NULL, NULL,
+                                       mdo2fid(mdd_tobj), lname, 0, 0,
+                                       ldata);
+               if (rc == 0)
+                       mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj),
+                                     lname, handle, ldata, 0);
+               /* The failure of links_add should not cause the link
+                * failure, reset rc here */
+               rc = 0;
+       }
         EXIT;
 out_unlock:
         mdd_write_unlock(env, mdd_sobj);
@@ -1010,7 +1304,11 @@ out_trans:
                rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, 0, mdd_sobj,
                                            mdd_tobj, lname, handle);
 stop:
-        mdd_trans_stop(env, mdd, rc, handle);
+       mdd_trans_stop(env, mdd, rc, handle);
+
+       if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
+               /* if we vmalloced a large buffer drop it */
+               lu_buf_free(ldata->ld_buf);
 out_pending:
         return rc;
 }
@@ -1080,19 +1378,6 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
-static inline int mdd_declare_links_del(const struct lu_env *env,
-                                       struct mdd_object *c,
-                                       struct thandle *handle)
-{
-       int rc = 0;
-
-       /* For directory, the linkEA will be removed together with the object. */
-       if (!S_ISDIR(mdd_object_type(c)))
-               rc = mdd_declare_links_add(env, c, handle);
-
-       return rc;
-}
-
 static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                              struct mdd_object *p, struct mdd_object *c,
                              const struct lu_name *name, struct md_attr *ma,
@@ -1450,7 +1735,8 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
                                         struct mdd_object *parent,
                                         struct mdd_object *child,
                                         struct lu_attr *attr,
-                                        struct thandle *handle)
+                                        struct thandle *handle,
+                                        struct linkea_data *ldata)
 {
         int rc;
        ENTRY;
@@ -1478,15 +1764,18 @@ static int mdd_declare_object_initialize(const struct lu_env *env,
        if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
                        fid_is_dot_lustre(mdo2fid(child)) ||
                        fid_is_root(mdo2fid(child))))
-                mdd_declare_links_add(env, child, handle);
+               mdd_declare_links_add(env, child, handle, ldata);
 
        RETURN(rc);
 }
 
-int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
-                         const struct lu_name *lname, struct mdd_object *child,
-                         const struct lu_attr *attr, struct thandle *handle,
-                         const struct md_op_spec *spec)
+static int mdd_object_initialize(const struct lu_env *env,
+                                const struct lu_fid *pfid,
+                                const struct lu_name *lname,
+                                struct mdd_object *child,
+                                struct lu_attr *attr, struct thandle *handle,
+                                const struct md_op_spec *spec,
+                                struct linkea_data *ldata)
 {
         int rc;
         ENTRY;
@@ -1517,7 +1806,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
        if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
                        fid_is_dot_lustre(mdo2fid(child)) ||
                        fid_is_root(mdo2fid(child))))
-               mdd_links_add(env, child, pfid, lname, handle, 1);
+               mdd_links_add(env, child, pfid, lname, handle, ldata, 1);
 
        RETURN(rc);
 }
@@ -1607,7 +1896,8 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                              struct lu_attr *attr,
                              int got_def_acl,
                              struct thandle *handle,
-                             const struct md_op_spec *spec)
+                             const struct md_op_spec *spec,
+                             struct linkea_data *ldata)
 {
        int rc;
 
@@ -1646,7 +1936,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                        GOTO(out, rc);
         }
 
-       rc = mdd_declare_object_initialize(env, p, c, attr, handle);
+       rc = mdd_declare_object_initialize(env, p, c, attr, handle, ldata);
        if (rc)
                GOTO(out, rc);
 
@@ -1740,6 +2030,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
        struct thandle          *handle;
        struct lu_attr          *pattr = &info->mti_pattr;
        struct lu_buf           acl_buf;
+       struct linkea_data      *ldata = &info->mti_link_data;
        struct dynlock_handle   *dlh;
        const char              *name = lname->ln_name;
        int                      rc, created = 0, initialized = 0, inserted = 0;
@@ -1808,8 +2099,11 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
         if (IS_ERR(handle))
                 GOTO(out_free, rc = PTR_ERR(handle));
 
+       memset(ldata, 0, sizeof(*ldata));
+       mdd_linkea_prepare(env, son, NULL, NULL, mdd_object_fid(mdd_pobj),
+                          lname, 1, 0, ldata);
        rc = mdd_declare_create(env, mdd, mdd_pobj, son, lname, attr,
-                               got_def_acl, handle, spec);
+                               got_def_acl, handle, spec, ldata);
         if (rc)
                 GOTO(out_stop, rc);
 
@@ -1864,7 +2158,7 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj,
 #endif
 
        rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
-                                  son, attr, handle, spec);
+                                  son, attr, handle, spec, ldata);
 
        /*
         * in case of replay we just set LOVEA provided by the client
@@ -1985,6 +2279,10 @@ out_trans:
 out_stop:
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
+       if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
+               /* if we vmalloced a large buffer drop it */
+               lu_buf_free(ldata->ld_buf);
+
         /* The child object shouldn't be cached anymore */
         if (rc)
                set_bit(LU_OBJECT_HEARD_BANSHEE,
@@ -2042,35 +2340,35 @@ static int mdd_rename_sanity_check(const struct lu_env *env,
                                   struct lu_attr *so_attr,
                                   struct lu_attr *tg_attr)
 {
-        int rc = 0;
-        ENTRY;
+       int rc = 0;
+       ENTRY;
 
-        /* XXX: when get here, sobj must NOT be NULL,
-         * the other case has been processed in cml_rename
-         * before mdd_rename and enable MDS_PERM_BYPASS. */
-        LASSERT(sobj);
+       /* XXX: when get here, sobj must NOT be NULL,
+        * the other case has been processed in cld_rename
+        * before mdd_rename and enable MDS_PERM_BYPASS. */
+       LASSERT(sobj);
 
        rc = mdd_may_delete(env, src_pobj, sobj, so_attr, NULL, 1, 0);
-        if (rc)
-                RETURN(rc);
+       if (rc)
+               RETURN(rc);
 
-        /* XXX: when get here, "tobj == NULL" means tobj must
-         * NOT exist (neither on remote MDS, such case has been
-         * processed in cml_rename before mdd_rename and enable
-         * MDS_PERM_BYPASS).
-         * So check may_create, but not check may_unlink. */
-        if (!tobj)
-                rc = mdd_may_create(env, tgt_pobj, NULL,
-                                    (src_pobj != tgt_pobj), 0);
-        else
+       /* XXX: when get here, "tobj == NULL" means tobj must
+        * NOT exist (neither on remote MDS, such case has been
+        * processed in cld_rename before mdd_rename and enable
+        * MDS_PERM_BYPASS).
+        * So check may_create, but not check may_unlink. */
+       if (!tobj)
+               rc = mdd_may_create(env, tgt_pobj, NULL,
+                                   (src_pobj != tgt_pobj), 0);
+       else
                rc = mdd_may_delete(env, tgt_pobj, tobj, tg_attr, so_attr,
-                                    (src_pobj != tgt_pobj), 1);
+                                   (src_pobj != tgt_pobj), 1);
 
-        if (!rc && !tobj && (src_pobj != tgt_pobj) &&
+       if (!rc && !tobj && (src_pobj != tgt_pobj) &&
            S_ISDIR(so_attr->la_mode))
-                rc = __mdd_may_link(env, tgt_pobj);
+               rc = __mdd_may_link(env, tgt_pobj);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static int mdd_declare_rename(const struct lu_env *env,
@@ -2127,7 +2425,7 @@ static int mdd_declare_rename(const struct lu_env *env,
         rc = mdo_declare_attr_set(env, mdd_sobj, NULL, handle);
         if (rc)
                 return rc;
-        mdd_declare_links_add(env, mdd_sobj, handle);
+       mdd_declare_links_add(env, mdd_sobj, handle, NULL);
         if (rc)
                 return rc;
 
@@ -2171,7 +2469,7 @@ static int mdd_declare_rename(const struct lu_env *env,
                 if (rc)
                         return rc;
 
-                mdd_declare_links_add(env, mdd_tobj, handle);
+               mdd_declare_links_del(env, mdd_tobj, handle);
                 if (rc)
                         return rc;
 
@@ -2414,11 +2712,12 @@ static int mdd_rename(const struct lu_env *env,
         if (rc == 0 && mdd_sobj) {
                 mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
                rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
-                                     mdo2fid(mdd_tpobj), ltname, handle, 0, 0);
+                                     mdo2fid(mdd_tpobj), ltname, handle, NULL,
+                                     0, 0);
                 if (rc == -ENOENT)
                         /* Old files might not have EA entry */
                         mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
-                                      lsname, handle, 0);
+                                     lsname, handle, NULL, 0);
                 mdd_write_unlock(env, mdd_sobj);
                 /* We don't fail the transaction if the link ea can't be
                    updated -- fid2path will use alternate lookup method. */
@@ -2495,401 +2794,6 @@ out_pending:
        return rc;
 }
 
-int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata)
-{
-       ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
-       if (ldata->ml_buf->lb_buf == NULL)
-               return -ENOMEM;
-       ldata->ml_leh = ldata->ml_buf->lb_buf;
-       ldata->ml_leh->leh_magic = LINK_EA_MAGIC;
-       ldata->ml_leh->leh_len = sizeof(struct link_ea_header);
-       ldata->ml_leh->leh_reccount = 0;
-       return 0;
-}
-
-/** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
- * A pointer to the buffer is stored in \a ldata::ml_buf.
- *
- * \retval 0 or error
- */
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct mdd_link_data *ldata)
-{
-       struct lustre_capa *capa;
-       struct link_ea_header *leh;
-       int rc;
-
-       /* First try a small buf */
-       LASSERT(env != NULL);
-       ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
-       if (ldata->ml_buf->lb_buf == NULL)
-               return -ENOMEM;
-
-       if (!mdd_object_exists(mdd_obj))
-               return -ENODATA;
-
-       capa = mdd_object_capa(env, mdd_obj);
-       rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
-                          XATTR_NAME_LINK, capa);
-       if (rc == -ERANGE) {
-               /* Buf was too small, figure out what we need. */
-               mdd_buf_put(ldata->ml_buf);
-               rc = mdo_xattr_get(env, mdd_obj, ldata->ml_buf,
-                                  XATTR_NAME_LINK, capa);
-               if (rc < 0)
-                       return rc;
-               ldata->ml_buf = mdd_buf_alloc(env, rc);
-               if (ldata->ml_buf->lb_buf == NULL)
-                       return -ENOMEM;
-               rc = mdo_xattr_get(env, mdd_obj, &LU_BUF_NULL,
-                                  XATTR_NAME_LINK, capa);
-       }
-       if (rc < 0)
-               return rc;
-
-       leh = ldata->ml_buf->lb_buf;
-       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
-               leh->leh_magic = LINK_EA_MAGIC;
-               leh->leh_reccount = __swab32(leh->leh_reccount);
-               leh->leh_len = __swab64(leh->leh_len);
-               /* entries are swabbed by mdd_lee_unpack */
-       }
-       if (leh->leh_magic != LINK_EA_MAGIC)
-               return -EINVAL;
-       if (leh->leh_reccount == 0)
-               return -ENODATA;
-
-       ldata->ml_leh = leh;
-       return 0;
-}
-
-/** Read the link EA into a temp buffer.
- * Uses the name_buf since it is generally large.
- * \retval IS_ERR err
- * \retval ptr to \a lu_buf (always \a mti_big_buf)
- */
-struct lu_buf *mdd_links_get(const struct lu_env *env,
-                            struct mdd_object *mdd_obj)
-{
-       struct mdd_link_data ldata = { 0 };
-       int rc;
-
-       rc = mdd_links_read(env, mdd_obj, &ldata);
-       return rc ? ERR_PTR(rc) : ldata.ml_buf;
-}
-
-int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
-                   struct mdd_link_data *ldata, struct thandle *handle)
-{
-       const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf,
-                                                    ldata->ml_leh->leh_len);
-       return mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle,
-                            mdd_object_capa(env, mdd_obj));
-}
-
-/** Pack a link_ea_entry.
- * All elements are stored as chars to avoid alignment issues.
- * Numbers are always big-endian
- * \retval record length
- */
-static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
-                        const struct lu_fid *pfid)
-{
-        struct lu_fid   tmpfid;
-        int             reclen;
-
-        fid_cpu_to_be(&tmpfid, pfid);
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
-               tmpfid.f_ver = ~0;
-        memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
-        memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
-        reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
-
-        lee->lee_reclen[0] = (reclen >> 8) & 0xff;
-        lee->lee_reclen[1] = reclen & 0xff;
-        return reclen;
-}
-
-void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
-                    struct lu_name *lname, struct lu_fid *pfid)
-{
-        *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
-        memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
-        fid_be_to_cpu(pfid, pfid);
-        lname->ln_name = lee->lee_name;
-        lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
-}
-
-int mdd_declare_links_add(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct thandle *handle)
-{
-        int rc;
-
-        /* XXX: max size? */
-        rc = mdo_declare_xattr_set(env, mdd_obj,
-                             mdd_buf_get_const(env, NULL, 4096),
-                             XATTR_NAME_LINK, 0, handle);
-
-        return rc;
-}
-
-/** Add a record to the end of link ea buf */
-int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
-                     const struct lu_name *lname, const struct lu_fid *pfid)
-{
-       LASSERT(ldata->ml_leh != NULL);
-
-       if (lname == NULL || pfid == NULL)
-               return -EINVAL;
-
-       ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
-       if (ldata->ml_leh->leh_len + ldata->ml_reclen >
-           ldata->ml_buf->lb_len) {
-               if (mdd_buf_grow(env, ldata->ml_leh->leh_len +
-                                ldata->ml_reclen) < 0)
-                       return -ENOMEM;
-       }
-
-       ldata->ml_leh = ldata->ml_buf->lb_buf;
-       ldata->ml_lee = ldata->ml_buf->lb_buf + ldata->ml_leh->leh_len;
-       ldata->ml_reclen = mdd_lee_pack(ldata->ml_lee, lname, pfid);
-       ldata->ml_leh->leh_len += ldata->ml_reclen;
-       ldata->ml_leh->leh_reccount++;
-       CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n",
-              lname->ln_namelen, lname->ln_name);
-       return 0;
-}
-
-/** Del the current record from the link ea buf */
-void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
-                      const struct lu_name *lname)
-{
-       LASSERT(ldata->ml_leh != NULL);
-
-       ldata->ml_leh->leh_reccount--;
-       ldata->ml_leh->leh_len -= ldata->ml_reclen;
-       memmove(ldata->ml_lee, (char *)ldata->ml_lee + ldata->ml_reclen,
-               (char *)ldata->ml_leh + ldata->ml_leh->leh_len -
-               (char *)ldata->ml_lee);
-       CDEBUG(D_INODE, "Old link_ea name '%.*s' is removed\n",
-              lname->ln_namelen, lname->ln_name);
-
-}
-
-/**
- * Check if such a link exists in linkEA.
- *
- * \param mdd_obj object being handled
- * \param pfid parent fid the link to be found for
- * \param lname name in the parent's directory entry pointing to this object
- * \param ldata link data the search to be done on
- *
- * \retval   0 success
- * \retval -ENOENT link does not exist
- * \retval -ve on error
- */
-int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct mdd_link_data *ldata, const struct lu_name *lname,
-                  const struct lu_fid  *pfid)
-{
-       struct lu_name *tmpname = &mdd_env_info(env)->mti_name2;
-       struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
-       int count;
-
-       LASSERT(ldata->ml_leh != NULL);
-
-       /* link #0 */
-       ldata->ml_lee = (struct link_ea_entry *)(ldata->ml_leh + 1);
-
-       for (count = 0; count < ldata->ml_leh->leh_reccount; count++) {
-               mdd_lee_unpack(ldata->ml_lee, &ldata->ml_reclen,
-                              tmpname, tmpfid);
-               if (tmpname->ln_namelen == lname->ln_namelen &&
-                   lu_fid_eq(tmpfid, pfid) &&
-                   (strncmp(tmpname->ln_name, lname->ln_name,
-                            tmpname->ln_namelen) == 0))
-                       break;
-               ldata->ml_lee = (struct link_ea_entry *)((char *)ldata->ml_lee +
-                                                        ldata->ml_reclen);
-       }
-
-       if (count == ldata->ml_leh->leh_reccount) {
-               CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
-                      lname->ln_namelen, lname->ln_name);
-               return -ENOENT;
-       }
-       return 0;
-}
-
-static int __mdd_links_add(const struct lu_env *env,
-                          struct mdd_object *mdd_obj,
-                          struct mdd_link_data *ldata,
-                          const struct lu_name *lname,
-                          const struct lu_fid *pfid,
-                          int first, int check)
-{
-       int rc;
-
-       if (ldata->ml_leh == NULL) {
-               rc = first ? -ENODATA : mdd_links_read(env, mdd_obj, ldata);
-               if (rc) {
-                       if (rc != -ENODATA)
-                               return rc;
-                       rc = mdd_links_new(env, ldata);
-                       if (rc)
-                               return rc;
-               }
-       }
-
-       if (check) {
-               rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
-               if (rc && rc != -ENOENT)
-                       return rc;
-               if (rc == 0)
-                       return -EEXIST;
-       }
-
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
-               struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
-
-               *tfid = *pfid;
-               tfid->f_ver = ~0;
-               mdd_links_add_buf(env, ldata, lname, tfid);
-       }
-
-       return mdd_links_add_buf(env, ldata, lname, pfid);
-}
-
-static int __mdd_links_del(const struct lu_env *env,
-                          struct mdd_object *mdd_obj,
-                          struct mdd_link_data *ldata,
-                          const struct lu_name *lname,
-                          const struct lu_fid *pfid)
-{
-       int rc;
-
-       if (ldata->ml_leh == NULL) {
-               rc = mdd_links_read(env, mdd_obj, ldata);
-               if (rc)
-                       return rc;
-       }
-
-       rc = mdd_links_find(env, mdd_obj, ldata, lname, pfid);
-       if (rc)
-               return rc;
-
-       mdd_links_del_buf(env, ldata, lname);
-       return 0;
-}
-
-int mdd_links_rename(const struct lu_env *env,
-                    struct mdd_object *mdd_obj,
-                    const struct lu_fid *oldpfid,
-                    const struct lu_name *oldlname,
-                    const struct lu_fid *newpfid,
-                    const struct lu_name *newlname,
-                    struct thandle *handle,
-                    int first, int check)
-{
-       struct mdd_link_data ldata = { 0 };
-       int updated = 0;
-       int rc2 = 0;
-       int rc = 0;
-       ENTRY;
-
-       if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
-               return 0;
-
-       LASSERT(oldpfid != NULL || newpfid != NULL);
-
-       if (mdd_obj->mod_flags & DEAD_OBJ)
-               /* No more links, don't bother */
-               RETURN(0);
-
-       if (oldpfid != NULL) {
-               rc = __mdd_links_del(env, mdd_obj, &ldata,
-                                    oldlname, oldpfid);
-               if (rc) {
-                       if ((check == 0) ||
-                           (rc != -ENODATA && rc != -ENOENT))
-                               GOTO(out, rc);
-                       /* No changes done. */
-                       rc = 0;
-               } else {
-                       updated = 1;
-               }
-       }
-
-       /* If renaming, add the new record */
-       if (newpfid != NULL) {
-               /* even if the add fails, we still delete the out-of-date
-                * old link */
-               rc2 = __mdd_links_add(env, mdd_obj, &ldata,
-                                     newlname, newpfid, first, check);
-               if (rc2 == -EEXIST)
-                       rc2 = 0;
-               else if (rc2 == 0)
-                       updated = 1;
-       }
-
-       if (updated)
-               rc = mdd_links_write(env, mdd_obj, &ldata, handle);
-       EXIT;
-out:
-       if (rc == 0)
-               rc = rc2;
-       if (rc) {
-               int error = 1;
-               if (rc == -EOVERFLOW || rc == - ENOENT)
-                       error = 0;
-               if (oldpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea add '%.*s' failed %d "DFID"\n",
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
-               else if (newpfid == NULL)
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea del '%.*s' failed %d "DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
-               else
-                       CDEBUG(error ? D_ERROR : D_OTHER,
-                              "link_ea rename '%.*s'->'%.*s' failed %d "
-                              DFID"\n",
-                              oldlname->ln_namelen, oldlname->ln_name,
-                              newlname->ln_namelen, newlname->ln_name,
-                              rc, PFID(mdd_object_fid(mdd_obj)));
-       }
-
-       if (ldata.ml_buf && ldata.ml_buf->lb_len > OBD_ALLOC_BIG)
-               /* if we vmalloced a large buffer drop it */
-               mdd_buf_put(ldata.ml_buf);
-
-       return rc;
-}
-
-static inline int mdd_links_add(const struct lu_env *env,
-                               struct mdd_object *mdd_obj,
-                               const struct lu_fid *pfid,
-                               const struct lu_name *lname,
-                               struct thandle *handle, int first)
-{
-       return mdd_links_rename(env, mdd_obj, NULL, NULL,
-                               pfid, lname, handle, first, 0);
-}
-
-static inline int mdd_links_del(const struct lu_env *env,
-                               struct mdd_object *mdd_obj,
-                               const struct lu_fid *pfid,
-                               const struct lu_name *lname,
-                               struct thandle *handle)
-{
-       return mdd_links_rename(env, mdd_obj, pfid, lname,
-                               NULL, NULL, handle, 0, 0);
-}
-
 const struct md_dir_operations mdd_dir_ops = {
         .mdo_is_subdir     = mdd_is_subdir,
         .mdo_lookup        = mdd_lookup,
index 9739ab2..b3691c2 100644 (file)
@@ -51,6 +51,7 @@
 #include <lustre_capa.h>
 #include <lprocfs_status.h>
 #include <lustre_log.h>
+#include <lustre_linkea.h>
 
 #include "mdd_lfsck.h"
 
@@ -156,6 +157,7 @@ struct mdd_thread_info {
         struct obd_trans_info     mti_oti;
         struct lu_buf             mti_buf;
         struct lu_buf             mti_big_buf; /* biggish persistent buf */
+       struct lu_buf             mti_link_buf; /* buf for link ea */
         struct lu_name            mti_name;
        struct lu_name            mti_name2;
         struct obdo               mti_oa;
@@ -167,22 +169,7 @@ struct mdd_thread_info {
         int                       mti_max_cookie_size;
         struct dt_object_format   mti_dof;
         struct obd_quotactl       mti_oqctl;
-};
-
-/**
- * The data that link search is done on.
- */
-struct mdd_link_data {
-       /**
-        * Buffer to keep link EA body.
-        */
-       struct lu_buf           *ml_buf;
-       /**
-        * The matched header, entry and its lenght in the EA
-        */
-       struct link_ea_header   *ml_leh;
-       struct link_ea_entry    *ml_lee;
-       int                      ml_reclen;
+       struct linkea_data        mti_link_data;
 };
 
 extern const char orph_index_name[];
@@ -302,10 +289,7 @@ int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
                            struct mdd_object *cobj, struct lu_attr *cattr);
 int mdd_finish_unlink(const struct lu_env *env, struct mdd_object *obj,
                       struct md_attr *ma, struct thandle *th);
-int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
-                         const struct lu_name *lname, struct mdd_object *child,
-                         const struct lu_attr *attr, struct thandle *handle,
-                         const struct md_op_spec *spec);
+
 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
                           const struct lu_name *lname, struct mdd_object *src_obj);
 int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid);
@@ -313,24 +297,13 @@ int mdd_lookup(const struct lu_env *env,
                struct md_object *pobj, const struct lu_name *lname,
                struct lu_fid* fid, struct md_op_spec *spec);
 int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct mdd_link_data *ldata);
-int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
-                  struct mdd_link_data *ldata, const struct lu_name *lname,
-                  const struct lu_fid  *pfid);
-int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata);
-int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
-                     const struct lu_name *lname, const struct lu_fid *pfid);
-void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
-                      const struct lu_name *lname);
-int mdd_declare_links_add(const struct lu_env *env,
-                         struct mdd_object *mdd_obj,
-                         struct thandle *handle);
+                  struct linkea_data *ldata);
+int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
+                         struct thandle *handle, struct linkea_data *ldata);
 int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
-                   struct mdd_link_data *ldata, struct thandle *handle);
+                   struct linkea_data *ldata, struct thandle *handle);
 struct lu_buf *mdd_links_get(const struct lu_env *env,
                              struct mdd_object *mdd_obj);
-void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
-                    struct lu_name *lname, struct lu_fid *pfid);
 int mdd_links_rename(const struct lu_env *env,
                     struct mdd_object *mdd_obj,
                     const struct lu_fid *oldpfid,
@@ -338,7 +311,10 @@ int mdd_links_rename(const struct lu_env *env,
                     const struct lu_fid *newpfid,
                     const struct lu_name *newlname,
                     struct thandle *handle,
+                    struct linkea_data *ldata,
                     int first, int check);
+int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
+                         struct thandle *handle, struct linkea_data *ldata);
 
 /* mdd_lov.c */
 int mdd_declare_unlink_log(const struct lu_env *env, struct mdd_object *obj,
@@ -393,6 +369,8 @@ struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len);
 int mdd_buf_grow(const struct lu_env *env, ssize_t len);
 void mdd_buf_put(struct lu_buf *buf);
 
+struct lu_buf *mdd_link_buf_alloc(const struct lu_env *env, ssize_t len);
+int mdd_link_buf_grow(const struct lu_env *env, ssize_t len);
 extern const struct md_dir_operations    mdd_dir_ops;
 extern const struct md_object_operations mdd_obj_ops;
 
index 152a2d4..02ea968 100644 (file)
@@ -912,7 +912,7 @@ static int mdd_lfsck_namespace_double_scan_one(const struct lu_env *env,
        struct lfsck_bookmark   *bk       = &lfsck->ml_bookmark_ram;
        struct lfsck_namespace  *ns       =
                                (struct lfsck_namespace *)com->lc_file_ram;
-       struct mdd_link_data     ldata    = { 0 };
+       struct linkea_data       ldata    = { 0 };
        struct thandle          *handle   = NULL;
        bool                     locked   = false;
        bool                     update   = false;
@@ -930,7 +930,7 @@ again:
                if (IS_ERR(handle))
                        RETURN(rc = PTR_ERR(handle));
 
-               rc = mdd_declare_links_add(env, child, handle);;
+               rc = mdd_declare_links_add(env, child, handle, NULL);
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -958,13 +958,14 @@ again:
        if (rc != 0)
                GOTO(stop, rc);
 
-       ldata.ml_lee = (struct link_ea_entry *)(ldata.ml_leh + 1);
-       count = ldata.ml_leh->leh_reccount;
+       ldata.ld_lee = LINKEA_FIRST_ENTRY(ldata);
+       count = ldata.ld_leh->leh_reccount;
        while (count-- > 0) {
                struct mdd_object *parent = NULL;
                struct dt_object *dir;
 
-               mdd_lee_unpack(ldata.ml_lee, &ldata.ml_reclen, cname, pfid);
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, cname,
+                                   pfid);
                if (!fid_is_sane(pfid))
                        goto shrink;
 
@@ -980,8 +981,7 @@ again:
                /* XXX: need more processing for remote object in the future. */
                if (mdd_object_remote(parent)) {
                        mdd_object_put(env, parent);
-                       ldata.ml_lee = (struct link_ea_entry *)
-                                      ((char *)ldata.ml_lee + ldata.ml_reclen);
+                       ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
                        continue;
                }
 
@@ -1004,15 +1004,14 @@ again:
                if (rc == 0) {
                        if (lu_fid_eq(cfid, mdo2fid(child))) {
                                mdd_object_put(env, parent);
-                               ldata.ml_lee = (struct link_ea_entry *)
-                                      ((char *)ldata.ml_lee + ldata.ml_reclen);
+                               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
                                continue;
                        }
 
                        goto shrink;
                }
 
-               if (ldata.ml_leh->leh_reccount > la->la_nlink)
+               if (ldata.ld_leh->leh_reccount > la->la_nlink)
                        goto shrink;
 
                /* XXX: For the case of there is linkea entry, but without name
@@ -1023,8 +1022,7 @@ again:
                 *      It is out of LFSCK 1.5 scope, will implement it in the
                 *      future. Keep the linkEA entry. */
                mdd_object_put(env, parent);
-               ldata.ml_lee = (struct link_ea_entry *)
-                              ((char *)ldata.ml_lee + ldata.ml_reclen);
+               ldata.ld_lee = LINKEA_NEXT_ENTRY(ldata);
                continue;
 
 shrink:
@@ -1036,7 +1034,7 @@ shrink:
                CDEBUG(D_LFSCK, "Remove linkEA: "DFID"[%.*s], "DFID"\n",
                       PFID(mdo2fid(child)), cname->ln_namelen, cname->ln_name,
                       PFID(pfid));
-               mdd_links_del_buf(env, &ldata, cname);
+               linkea_del_buf(&ldata, cname);
                update = true;
        }
 
@@ -1272,7 +1270,7 @@ static int mdd_declare_lfsck_namespace_exec_dir(const struct lu_env *env,
                return rc;
 
        /* For insert new linkEA entry. */
-       rc = mdd_declare_links_add(env, obj, handle);
+       rc = mdd_declare_links_add(env, obj, handle, NULL);
        return rc;
 }
 
@@ -1315,7 +1313,7 @@ static int mdd_lfsck_namespace_exec_dir(const struct lu_env *env,
        struct lfsck_namespace     *ns       =
                                (struct lfsck_namespace *)com->lc_file_ram;
        struct mdd_device          *mdd      = mdd_lfsck2mdd(lfsck);
-       struct mdd_link_data        ldata    = { 0 };
+       struct linkea_data          ldata    = { 0 };
        const struct lu_fid        *pfid     =
                                lu_object_fid(&lfsck->ml_obj_dir->do_lu);
        const struct lu_fid        *cfid     = mdo2fid(obj);
@@ -1373,8 +1371,8 @@ again:
 
        rc = mdd_links_read(env, obj, &ldata);
        if (rc == 0) {
-               count = ldata.ml_leh->leh_reccount;
-               rc = mdd_links_find(env, obj, &ldata, cname, pfid);
+               count = ldata.ld_leh->leh_reccount;
+               rc = linkea_links_find(&ldata, cname, pfid);
                if (rc == 0) {
                        /* For dir, if there are more than one linkea entries,
                         * then remove all the other redundant linkea entries.*/
@@ -1435,7 +1433,7 @@ unmatch:
                }
 
 nodata:
-               rc = mdd_links_new(env, &ldata);
+               rc = linkea_data_new(&ldata, &mdd_env_info(env)->mti_link_buf);
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -1443,7 +1441,7 @@ add:
                if (!com->lc_journal)
                        goto again;
 
-               rc = mdd_links_add_buf(env, &ldata, cname, pfid);
+               rc = linkea_add_buf(&ldata, cname, pfid);
                if (rc != 0)
                        GOTO(stop, rc);
 
@@ -1451,7 +1449,7 @@ add:
                if (rc != 0)
                        GOTO(stop, rc);
 
-               count = ldata.ml_leh->leh_reccount;
+               count = ldata.ld_leh->leh_reccount;
                repaired = true;
        } else {
                GOTO(stop, rc);
index fe796c2..5a43810 100644 (file)
@@ -128,14 +128,6 @@ struct lu_buf *mdd_buf_get(const struct lu_env *env, void *area, ssize_t len)
         return buf;
 }
 
-void mdd_buf_put(struct lu_buf *buf)
-{
-       if (buf == NULL || buf->lb_buf == NULL)
-               return;
-       OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-       *buf = LU_BUF_NULL;
-}
-
 const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
                                        const void *area, ssize_t len)
 {
@@ -147,49 +139,6 @@ const struct lu_buf *mdd_buf_get_const(const struct lu_env *env,
         return buf;
 }
 
-struct lu_buf *mdd_buf_alloc(const struct lu_env *env, ssize_t len)
-{
-       struct lu_buf *buf = &mdd_env_info(env)->mti_big_buf;
-
-       if ((len > buf->lb_len) && (buf->lb_buf != NULL)) {
-               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
-               *buf = LU_BUF_NULL;
-       }
-       if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
-               buf->lb_len = len;
-               OBD_ALLOC_LARGE(buf->lb_buf, buf->lb_len);
-               if (buf->lb_buf == NULL)
-                       *buf = LU_BUF_NULL;
-       }
-       return buf;
-}
-
-/** Increase the size of the \a mti_big_buf.
- * preserves old data in buffer
- * old buffer remains unchanged on error
- * \retval 0 or -ENOMEM
- */
-int mdd_buf_grow(const struct lu_env *env, ssize_t len)
-{
-        struct lu_buf *oldbuf = &mdd_env_info(env)->mti_big_buf;
-        struct lu_buf buf;
-
-        LASSERT(len >= oldbuf->lb_len);
-        OBD_ALLOC_LARGE(buf.lb_buf, len);
-
-        if (buf.lb_buf == NULL)
-                return -ENOMEM;
-
-        buf.lb_len = len;
-        memcpy(buf.lb_buf, oldbuf->lb_buf, oldbuf->lb_len);
-
-        OBD_FREE_LARGE(oldbuf->lb_buf, oldbuf->lb_len);
-
-        memcpy(oldbuf, &buf, sizeof(buf));
-
-        return 0;
-}
-
 struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d)
@@ -273,246 +222,7 @@ struct mdd_object *mdd_object_find(const struct lu_env *env,
         return md2mdd_obj(md_object_find_slice(env, &d->mdd_md_dev, f));
 }
 
-static int mdd_path2fid(const struct lu_env *env, struct mdd_device *mdd,
-                        const char *path, struct lu_fid *fid)
-{
-        struct lu_buf *buf;
-        struct lu_fid *f = &mdd_env_info(env)->mti_fid;
-        struct mdd_object *obj;
-        struct lu_name *lname = &mdd_env_info(env)->mti_name;
-        char *name;
-        int rc = 0;
-        ENTRY;
-
-        /* temp buffer for path element */
-        buf = mdd_buf_alloc(env, PATH_MAX);
-        if (buf->lb_buf == NULL)
-                RETURN(-ENOMEM);
-
-        lname->ln_name = name = buf->lb_buf;
-        lname->ln_namelen = 0;
-        *f = mdd->mdd_root_fid;
-
-        while(1) {
-                while (*path == '/')
-                        path++;
-                if (*path == '\0')
-                        break;
-                while (*path != '/' && *path != '\0') {
-                        *name = *path;
-                        path++;
-                        name++;
-                        lname->ln_namelen++;
-                }
-
-                *name = '\0';
-                /* find obj corresponding to fid */
-                obj = mdd_object_find(env, mdd, f);
-                if (obj == NULL)
-                        GOTO(out, rc = -EREMOTE);
-                if (IS_ERR(obj))
-                        GOTO(out, rc = PTR_ERR(obj));
-                /* get child fid from parent and name */
-                rc = mdd_lookup(env, &obj->mod_obj, lname, f, NULL);
-                mdd_object_put(env, obj);
-                if (rc)
-                        break;
-
-                name = buf->lb_buf;
-                lname->ln_namelen = 0;
-        }
-
-        if (!rc)
-                *fid = *f;
-out:
-        RETURN(rc);
-}
-
-/** The maximum depth that fid2path() will search.
- * This is limited only because we want to store the fids for
- * historical path lookup purposes.
- */
-#define MAX_PATH_DEPTH 100
-
-/** mdd_path() lookup structure. */
-struct path_lookup_info {
-        __u64                pli_recno;        /**< history point */
-        __u64                pli_currec;       /**< current record */
-        struct lu_fid        pli_fid;
-        struct lu_fid        pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
-        struct mdd_object   *pli_mdd_obj;
-        char                *pli_path;         /**< full path */
-        int                  pli_pathlen;
-        int                  pli_linkno;       /**< which hardlink to follow */
-        int                  pli_fidcount;     /**< number of \a pli_fids */
-};
-
-static int mdd_path_current(const struct lu_env *env,
-                            struct path_lookup_info *pli)
-{
-        struct mdd_device *mdd = mdo2mdd(&pli->pli_mdd_obj->mod_obj);
-        struct mdd_object *mdd_obj;
-        struct lu_buf     *buf = NULL;
-        struct link_ea_header *leh;
-        struct link_ea_entry  *lee;
-        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
-        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
-        char *ptr;
-        int reclen;
-        int rc;
-        ENTRY;
-
-        ptr = pli->pli_path + pli->pli_pathlen - 1;
-        *ptr = 0;
-        --ptr;
-        pli->pli_fidcount = 0;
-        pli->pli_fids[0] = *(struct lu_fid *)mdd_object_fid(pli->pli_mdd_obj);
-
-        while (!mdd_is_root(mdd, &pli->pli_fids[pli->pli_fidcount])) {
-                mdd_obj = mdd_object_find(env, mdd,
-                                          &pli->pli_fids[pli->pli_fidcount]);
-                if (mdd_obj == NULL)
-                        GOTO(out, rc = -EREMOTE);
-                if (IS_ERR(mdd_obj))
-                        GOTO(out, rc = PTR_ERR(mdd_obj));
-                rc = lu_object_exists(&mdd_obj->mod_obj.mo_lu);
-                if (rc <= 0) {
-                        mdd_object_put(env, mdd_obj);
-                        if (rc == -1)
-                                rc = -EREMOTE;
-                        else if (rc == 0)
-                                /* Do I need to error out here? */
-                                rc = -ENOENT;
-                        GOTO(out, rc);
-                }
-
-                /* Get parent fid and object name */
-                mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD);
-                buf = mdd_links_get(env, mdd_obj);
-                mdd_read_unlock(env, mdd_obj);
-                mdd_object_put(env, mdd_obj);
-                if (IS_ERR(buf))
-                        GOTO(out, rc = PTR_ERR(buf));
-
-                leh = buf->lb_buf;
-                lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
-                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
-
-                /* If set, use link #linkno for path lookup, otherwise use
-                   link #0.  Only do this for the final path element. */
-                if ((pli->pli_fidcount == 0) &&
-                    (pli->pli_linkno < leh->leh_reccount)) {
-                        int count;
-                        for (count = 0; count < pli->pli_linkno; count++) {
-                                lee = (struct link_ea_entry *)
-                                     ((char *)lee + reclen);
-                                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
-                        }
-                        if (pli->pli_linkno < leh->leh_reccount - 1)
-                                /* indicate to user there are more links */
-                                pli->pli_linkno++;
-                }
-
-                /* Pack the name in the end of the buffer */
-                ptr -= tmpname->ln_namelen;
-                if (ptr - 1 <= pli->pli_path)
-                        GOTO(out, rc = -EOVERFLOW);
-                strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
-                *(--ptr) = '/';
-
-                /* Store the parent fid for historic lookup */
-                if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
-                        GOTO(out, rc = -EOVERFLOW);
-                pli->pli_fids[pli->pli_fidcount] = *tmpfid;
-        }
-
-        /* Verify that our path hasn't changed since we started the lookup.
-           Record the current index, and verify the path resolves to the
-           same fid. If it does, then the path is correct as of this index. */
-       spin_lock(&mdd->mdd_cl.mc_lock);
-       pli->pli_currec = mdd->mdd_cl.mc_index;
-       spin_unlock(&mdd->mdd_cl.mc_lock);
-        rc = mdd_path2fid(env, mdd, ptr, &pli->pli_fid);
-        if (rc) {
-                CDEBUG(D_INFO, "mdd_path2fid(%s) failed %d\n", ptr, rc);
-                GOTO (out, rc = -EAGAIN);
-        }
-        if (!lu_fid_eq(&pli->pli_fids[0], &pli->pli_fid)) {
-                CDEBUG(D_INFO, "mdd_path2fid(%s) found another FID o="DFID
-                       " n="DFID"\n", ptr, PFID(&pli->pli_fids[0]),
-                       PFID(&pli->pli_fid));
-                GOTO(out, rc = -EAGAIN);
-        }
-        ptr++; /* skip leading / */
-        memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
-
-        EXIT;
-out:
-        if (buf && !IS_ERR(buf) && buf->lb_len > OBD_ALLOC_BIG)
-                /* if we vmalloced a large buffer drop it */
-                mdd_buf_put(buf);
-
-        return rc;
-}
-
-static int mdd_path_historic(const struct lu_env *env,
-                             struct path_lookup_info *pli)
-{
-        return 0;
-}
-
 /* Returns the full path to this fid, as of changelog record recno. */
-static int mdd_path(const struct lu_env *env, struct md_object *obj,
-                    char *path, int pathlen, __u64 *recno, int *linkno)
-{
-        struct path_lookup_info *pli;
-        int tries = 3;
-        int rc = -EAGAIN;
-        ENTRY;
-
-        if (pathlen < 3)
-                RETURN(-EOVERFLOW);
-
-        if (mdd_is_root(mdo2mdd(obj), mdd_object_fid(md2mdd_obj(obj)))) {
-                path[0] = '\0';
-                RETURN(0);
-        }
-
-        OBD_ALLOC_PTR(pli);
-        if (pli == NULL)
-                RETURN(-ENOMEM);
-
-        pli->pli_mdd_obj = md2mdd_obj(obj);
-        pli->pli_recno = *recno;
-        pli->pli_path = path;
-        pli->pli_pathlen = pathlen;
-        pli->pli_linkno = *linkno;
-
-        /* Retry multiple times in case file is being moved */
-        while (tries-- && rc == -EAGAIN)
-                rc = mdd_path_current(env, pli);
-
-        /* For historical path lookup, the current links may not have existed
-         * at "recno" time.  We must switch over to earlier links/parents
-         * by using the changelog records.  If the earlier parent doesn't
-         * exist, we must search back through the changelog to reconstruct
-         * its parents, then check if it exists, etc.
-         * We may ignore this problem for the initial implementation and
-         * state that an "original" hardlink must still exist for us to find
-         * historic path name. */
-        if (pli->pli_recno != -1) {
-                rc = mdd_path_historic(env, pli);
-        } else {
-                *recno = pli->pli_currec;
-                /* Return next link index to caller */
-                *linkno = pli->pli_linkno;
-        }
-
-        OBD_FREE_PTR(pli);
-
-        RETURN (rc);
-}
-
 int mdd_get_flags(const struct lu_env *env, struct mdd_object *obj)
 {
         struct lu_attr *la = &mdd_env_info(env)->mti_la;
@@ -993,10 +703,10 @@ static int mdd_changelog_data_store(const struct lu_env *env,
                 RETURN(0);
         }
 
-        reclen = llog_data_len(sizeof(*rec));
-        buf = mdd_buf_alloc(env, reclen);
-        if (buf->lb_buf == NULL)
-                RETURN(-ENOMEM);
+       reclen = llog_data_len(sizeof(*rec));
+       buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf, reclen);
+       if (buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
        rec = buf->lb_buf;
 
         rec->cr.cr_flags = CLF_VERSION | (CLF_FLAGMASK & flags);
@@ -1458,7 +1168,8 @@ repeat:
        if (memcmp(buf, &LU_BUF_NULL, sizeof(*buf)) == 0) {
                /* mti_big_buf was not allocated, so we have to
                 * allocate it based on the ea size */
-               buf = mdd_buf_alloc(env, sz);
+               buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_big_buf,
+                                            sz);
                if (buf->lb_buf == NULL)
                        GOTO(out, rc = -ENOMEM);
                goto repeat;
@@ -2212,6 +1923,5 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_changelog          = mdd_changelog,
        .moo_capa_get           = mdd_capa_get,
        .moo_object_sync        = mdd_object_sync,
-       .moo_path               = mdd_path,
        .moo_object_lock        = mdd_object_lock,
 };
index fb5de09..c5293ec 100644 (file)
@@ -68,6 +68,7 @@
 #include <lustre_acl.h>
 #include <lustre_param.h>
 #include <lustre_quota.h>
+#include <lustre_linkea.h>
 
 mdl_mode_t mdt_mdl_lock_modes[] = {
         [LCK_MINMODE] = MDL_MINMODE,
@@ -93,8 +94,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = {
 
 static struct mdt_device *mdt_dev(struct lu_device *d);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
-static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
-                        struct getinfo_fid2path *fp);
 
 static const struct lu_object_operations mdt_obj_ops;
 
@@ -238,12 +237,10 @@ static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o
 
 int mdt_getstatus(struct mdt_thread_info *info)
 {
-        struct mdt_device *mdt  = info->mti_mdt;
-        struct md_device  *next = mdt->mdt_child;
-        struct mdt_body   *repbody;
-        int                rc;
-
-        ENTRY;
+       struct mdt_device       *mdt  = info->mti_mdt;
+       struct mdt_body         *repbody;
+       int                     rc;
+       ENTRY;
 
         rc = mdt_check_ucred(info);
         if (rc)
@@ -252,11 +249,8 @@ int mdt_getstatus(struct mdt_thread_info *info)
         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
                 RETURN(err_serious(-ENOMEM));
 
-        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
-        rc = next->md_ops->mdo_root_get(info->mti_env, next, &repbody->fid1);
-        if (rc != 0)
-                RETURN(rc);
-
+       repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+       repbody->fid1 = mdt->mdt_md_root_fid;
         repbody->valid |= OBD_MD_FLID;
 
         if (mdt->mdt_opts.mo_mds_capa &&
@@ -3078,7 +3072,8 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_mos = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
-        info->mti_body = NULL;
+       info->mti_big_buf = LU_BUF_NULL;
+       info->mti_body = NULL;
         info->mti_object = NULL;
         info->mti_dlm_req = NULL;
         info->mti_has_trans = 0;
@@ -3093,16 +3088,20 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
 
 static void mdt_thread_info_fini(struct mdt_thread_info *info)
 {
-        int i;
+       int i;
 
-        req_capsule_fini(info->mti_pill);
-        if (info->mti_object != NULL) {
-                mdt_object_put(info->mti_env, info->mti_object);
-                info->mti_object = NULL;
-        }
-        for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
-                mdt_lock_handle_fini(&info->mti_lh[i]);
-        info->mti_env = NULL;
+       req_capsule_fini(info->mti_pill);
+       if (info->mti_object != NULL) {
+               mdt_object_put(info->mti_env, info->mti_object);
+               info->mti_object = NULL;
+       }
+
+       for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
+               mdt_lock_handle_fini(&info->mti_lh[i]);
+       info->mti_env = NULL;
+
+       if (unlikely(info->mti_big_buf.lb_buf != NULL))
+               lu_buf_free(&info->mti_big_buf);
 }
 
 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
@@ -5035,11 +5034,12 @@ static int mdt_prepare(const struct lu_env *env,
        if (rc != 0)
                CWARN("Fail to auto trigger paused LFSCK.\n");
 
-       rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
-                                                 &mdt->mdt_md_root_fid);
-       if (rc)
-               RETURN(rc);
-
+       if (mdt->mdt_seq_site.ss_node_id == 0) {
+               rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
+                                                        &mdt->mdt_md_root_fid);
+               if (rc)
+                       RETURN(rc);
+       }
        LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
        target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle);
        set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);
@@ -5480,32 +5480,202 @@ static int mdt_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
-static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
-                            void *val, int vallen)
+/** The maximum depth that fid2path() will search.
+ * This is limited only because we want to store the fids for
+ * historical path lookup purposes.
+ */
+#define MAX_PATH_DEPTH 100
+
+/** mdt_path() lookup structure. */
+struct path_lookup_info {
+       __u64                   pli_recno;      /**< history point */
+       __u64                   pli_currec;     /**< current record */
+       struct lu_fid           pli_fid;
+       struct lu_fid           pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
+       struct mdt_object       *pli_mdt_obj;
+       char                    *pli_path;      /**< full path */
+       int                     pli_pathlen;
+       int                     pli_linkno;     /**< which hardlink to follow */
+       int                     pli_fidcount;   /**< number of \a pli_fids */
+};
+
+static int mdt_links_read(struct mdt_thread_info *info,
+                         struct mdt_object *mdt_obj, struct linkea_data *ldata)
 {
-        struct mdt_device *mdt = mdt_dev(info->mti_exp->exp_obd->obd_lu_dev);
-        struct getinfo_fid2path *fpout, *fpin;
-        int rc = 0;
+       int rc;
 
-        fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
-        fpout = val;
+       LASSERT(ldata->ld_buf->lb_buf != NULL);
+
+       if (!mdt_object_exists(mdt_obj))
+               return -ENODATA;
+
+       rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+                         ldata->ld_buf, XATTR_NAME_LINK);
+       if (rc == -ERANGE) {
+               /* Buf was too small, figure out what we need. */
+               lu_buf_free(ldata->ld_buf);
+               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+                                 ldata->ld_buf, XATTR_NAME_LINK);
+               if (rc < 0)
+                       return rc;
+               ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+               if (ldata->ld_buf->lb_buf == NULL)
+                       return -ENOMEM;
+               rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+                                 ldata->ld_buf, XATTR_NAME_LINK);
+       }
+       if (rc < 0)
+               return rc;
 
-        if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
-                lustre_swab_fid2path(fpin);
+       linkea_init(ldata);
 
-        memcpy(fpout, fpin, sizeof(*fpin));
-        if (fpout->gf_pathlen != vallen - sizeof(*fpin))
-                RETURN(-EINVAL);
+       return 0;
+}
 
-        rc = mdt_fid2path(info->mti_env, mdt, fpout);
-        RETURN(rc);
+static int mdt_path_current(struct mdt_thread_info *info,
+                           struct path_lookup_info *pli)
+{
+       struct mdt_device       *mdt = info->mti_mdt;
+       struct mdt_object       *mdt_obj;
+       struct link_ea_header   *leh;
+       struct link_ea_entry    *lee;
+       struct lu_name          *tmpname = &info->mti_name;
+       struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
+       struct lu_buf           *buf = &info->mti_big_buf;
+       char                    *ptr;
+       int                     reclen;
+       struct linkea_data      ldata = { 0 };
+       int                     rc;
+       ENTRY;
+
+       /* temp buffer for path element, the buffer will be finally freed
+        * in mdt_thread_info_fini */
+       buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+       if (buf->lb_buf == NULL)
+               RETURN(-ENOMEM);
+
+       ldata.ld_buf = buf;
+       ptr = pli->pli_path + pli->pli_pathlen - 1;
+       *ptr = 0;
+       --ptr;
+       pli->pli_fidcount = 0;
+       pli->pli_fids[0] = *(struct lu_fid *)mdt_object_fid(pli->pli_mdt_obj);
+
+       /* root FID only exists on MDT0, and fid2path should also ends at MDT0,
+        * so checking root_fid can only happen on MDT0. */
+       while (!lu_fid_eq(&mdt->mdt_md_root_fid,
+                         &pli->pli_fids[pli->pli_fidcount])) {
+               mdt_obj = mdt_object_find(info->mti_env, mdt,
+                                         &pli->pli_fids[pli->pli_fidcount]);
+               if (IS_ERR(mdt_obj))
+                       GOTO(out, rc = PTR_ERR(mdt_obj));
+               if (mdt_object_remote(mdt_obj)) {
+                       mdt_object_put(info->mti_env, mdt_obj);
+                       GOTO(remote_out, rc = -EREMOTE);
+               }
+               if (!mdt_object_exists(mdt_obj)) {
+                       mdt_object_put(info->mti_env, mdt_obj);
+                       GOTO(out, rc = -ENOENT);
+               }
+
+               rc = mdt_links_read(info, mdt_obj, &ldata);
+               mdt_object_put(info->mti_env, mdt_obj);
+               if (rc != 0)
+                       GOTO(out, rc = PTR_ERR(buf));
+
+               leh = buf->lb_buf;
+               lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
+               linkea_entry_unpack(lee, &reclen, tmpname, tmpfid);
+               /* If set, use link #linkno for path lookup, otherwise use
+                  link #0.  Only do this for the final path element. */
+               if ((pli->pli_fidcount == 0) &&
+                   (pli->pli_linkno < leh->leh_reccount)) {
+                       int count;
+                       for (count = 0; count < pli->pli_linkno; count++) {
+                               lee = (struct link_ea_entry *)
+                                    ((char *)lee + reclen);
+                               linkea_entry_unpack(lee, &reclen, tmpname,
+                                                   tmpfid);
+                       }
+                       if (pli->pli_linkno < leh->leh_reccount - 1)
+                               /* indicate to user there are more links */
+                               pli->pli_linkno++;
+               }
+
+               /* Pack the name in the end of the buffer */
+               ptr -= tmpname->ln_namelen;
+               if (ptr - 1 <= pli->pli_path)
+                       GOTO(out, rc = -EOVERFLOW);
+               strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
+               *(--ptr) = '/';
+
+               /* Store the parent fid for historic lookup */
+               if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
+                       GOTO(out, rc = -EOVERFLOW);
+               pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+       }
+
+remote_out:
+       ptr++; /* skip leading / */
+       memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
+
+       EXIT;
+out:
+       return rc;
 }
 
-static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
-               struct getinfo_fid2path *fp)
+/* Returns the full path to this fid, as of changelog record recno. */
+static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj,
+                   char *path, int pathlen, __u64 *recno, int *linkno,
+                   struct lu_fid *fid)
 {
-       struct mdt_object *obj;
+       struct mdt_device       *mdt = info->mti_mdt;
+       struct path_lookup_info *pli;
+       int                     tries = 3;
+       int                     rc = -EAGAIN;
+       ENTRY;
+
+       if (pathlen < 3)
+               RETURN(-EOVERFLOW);
+
+       if (lu_fid_eq(&mdt->mdt_md_root_fid, mdt_object_fid(obj))) {
+               path[0] = '\0';
+               RETURN(0);
+       }
+
+       OBD_ALLOC_PTR(pli);
+       if (pli == NULL)
+               RETURN(-ENOMEM);
+
+       pli->pli_mdt_obj = obj;
+       pli->pli_recno = *recno;
+       pli->pli_path = path;
+       pli->pli_pathlen = pathlen;
+       pli->pli_linkno = *linkno;
+
+       /* Retry multiple times in case file is being moved */
+       while (tries-- && rc == -EAGAIN)
+               rc = mdt_path_current(info, pli);
+
+       /* return the last resolved fids to the client, so the client will
+        * build the left path on another MDT for remote object */
+       *fid = pli->pli_fids[pli->pli_fidcount];
+
+       *recno = pli->pli_currec;
+       /* Return next link index to caller */
+       *linkno = pli->pli_linkno;
+
+       OBD_FREE_PTR(pli);
+
+       RETURN(rc);
+}
+
+static int mdt_fid2path(struct mdt_thread_info *info,
+                       struct getinfo_fid2path *fp)
+{
+       struct mdt_device *mdt = info->mti_mdt;
        struct obd_device *obd = mdt2obd_dev(mdt);
+       struct mdt_object *obj;
        int    rc;
        ENTRY;
 
@@ -5522,7 +5692,7 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
                RETURN(-EINVAL);
        }
 
-       obj = mdt_object_find(env, mdt, &fp->gf_fid);
+       obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid);
        if (obj == NULL || IS_ERR(obj)) {
                CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
                        PTR_ERR(obj));
@@ -5535,16 +5705,40 @@ static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
                rc = -ENOENT;
 
        if (rc < 0) {
-               mdt_object_put(env, obj);
+               mdt_object_put(info->mti_env, obj);
                CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
                        PFID(&fp->gf_fid), rc);
                RETURN(rc);
        }
 
-       rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path,
-                       fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno);
-       mdt_object_put(env, obj);
+       rc = mdt_path(info, obj, fp->gf_path, fp->gf_pathlen, &fp->gf_recno,
+                     &fp->gf_linkno, &fp->gf_fid);
+
+       CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
+              PFID(&fp->gf_fid), fp->gf_path, fp->gf_recno, fp->gf_linkno);
+
+       mdt_object_put(info->mti_env, obj);
+
+       RETURN(rc);
+}
+
+static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
+                           void *val, int vallen)
+{
+       struct getinfo_fid2path *fpout, *fpin;
+       int rc = 0;
+
+       fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
+       fpout = val;
+
+       if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+               lustre_swab_fid2path(fpin);
+
+       memcpy(fpout, fpin, sizeof(*fpin));
+       if (fpout->gf_pathlen != vallen - sizeof(*fpin))
+               RETURN(-EINVAL);
 
+       rc = mdt_fid2path(info, fpout);
        RETURN(rc);
 }
 
index 5878ef2..446c7c9 100644 (file)
@@ -468,9 +468,10 @@ struct mdt_thread_info {
         struct mdt_ioepoch        *mti_ioepoch;
         __u64                      mti_replayepoch;
 
-        loff_t                     mti_off;
-        struct lu_buf              mti_buf;
-        struct lustre_capa_key     mti_capa_key;
+       loff_t                     mti_off;
+       struct lu_buf              mti_buf;
+       struct lu_buf              mti_big_buf;
+       struct lustre_capa_key     mti_capa_key;
 
         /* Ops object filename */
         struct lu_name             mti_name;
index 6209e41..996e2a7 100644 (file)
@@ -16,7 +16,7 @@ obdclass-all-objs += statfs_pack.o obdo.o obd_config.o obd_mount.o mea.o
 obdclass-all-objs += lu_object.o dt_object.o capa.o
 obdclass-all-objs += cl_object.o cl_page.o cl_lock.o cl_io.o lu_ref.o
 obdclass-all-objs += acl.o idmap.o
-obdclass-all-objs += md_local_object.o md_attrs.o
+obdclass-all-objs += md_local_object.o md_attrs.o linkea.o
 
 obdclass-objs := $(obdclass-linux-objs) $(obdclass-all-objs)
 
diff --git a/lustre/obdclass/linkea.c b/lustre/obdclass/linkea.c
new file mode 100644 (file)
index 0000000..a09df77
--- /dev/null
@@ -0,0 +1,195 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ * Use is subject to license terms.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+
+#include <lustre/lustre_idl.h>
+#include <obd.h>
+#include <lustre_linkea.h>
+
+int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf)
+{
+       ldata->ld_buf = lu_buf_check_and_alloc(buf, CFS_PAGE_SIZE);
+       if (ldata->ld_buf->lb_buf == NULL)
+               return -ENOMEM;
+       ldata->ld_leh = ldata->ld_buf->lb_buf;
+       ldata->ld_leh->leh_magic = LINK_EA_MAGIC;
+       ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
+       ldata->ld_leh->leh_reccount = 0;
+       return 0;
+}
+EXPORT_SYMBOL(linkea_data_new);
+
+int linkea_init(struct linkea_data *ldata)
+{
+       struct link_ea_header *leh;
+
+       LASSERT(ldata->ld_buf != NULL);
+       leh = ldata->ld_buf->lb_buf;
+       if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+               leh->leh_magic = LINK_EA_MAGIC;
+               leh->leh_reccount = __swab32(leh->leh_reccount);
+               leh->leh_len = __swab64(leh->leh_len);
+               /* entries are swabbed by linkea_entry_unpack */
+       }
+       if (leh->leh_magic != LINK_EA_MAGIC)
+               return -EINVAL;
+       if (leh->leh_reccount == 0)
+               return -ENODATA;
+
+       ldata->ld_leh = leh;
+       return 0;
+}
+EXPORT_SYMBOL(linkea_init);
+
+/**
+ * Pack a link_ea_entry.
+ * All elements are stored as chars to avoid alignment issues.
+ * Numbers are always big-endian
+ * \retval record length
+ */
+static int linkea_entry_pack(struct link_ea_entry *lee,
+                            const struct lu_name *lname,
+                            const struct lu_fid *pfid)
+{
+       struct lu_fid   tmpfid;
+       int             reclen;
+
+       fid_cpu_to_be(&tmpfid, pfid);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
+               tmpfid.f_ver = ~0;
+       memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
+       memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
+       reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
+
+       lee->lee_reclen[0] = (reclen >> 8) & 0xff;
+       lee->lee_reclen[1] = reclen & 0xff;
+       return reclen;
+}
+EXPORT_SYMBOL(linkea_entry_pack);
+
+void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
+                        struct lu_name *lname, struct lu_fid *pfid)
+{
+       *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
+       memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
+       fid_be_to_cpu(pfid, pfid);
+       lname->ln_name = lee->lee_name;
+       lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
+}
+EXPORT_SYMBOL(linkea_entry_unpack);
+
+/**
+ * Add a record to the end of link ea buf
+ **/
+int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
+                  const struct lu_fid *pfid)
+{
+       LASSERT(ldata->ld_leh != NULL);
+
+       if (lname == NULL || pfid == NULL)
+               return -EINVAL;
+
+       ldata->ld_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
+       if (ldata->ld_leh->leh_len + ldata->ld_reclen >
+           ldata->ld_buf->lb_len) {
+               if (lu_buf_check_and_grow(ldata->ld_buf,
+                                         ldata->ld_leh->leh_len +
+                                         ldata->ld_reclen) < 0)
+                       return -ENOMEM;
+       }
+
+       ldata->ld_leh = ldata->ld_buf->lb_buf;
+       ldata->ld_lee = ldata->ld_buf->lb_buf + ldata->ld_leh->leh_len;
+       ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
+       ldata->ld_leh->leh_len += ldata->ld_reclen;
+       ldata->ld_leh->leh_reccount++;
+       CDEBUG(D_INODE, "New link_ea name '%.*s' is added\n",
+              lname->ln_namelen, lname->ln_name);
+       return 0;
+}
+EXPORT_SYMBOL(linkea_add_buf);
+
+/** Del the current record from the link ea buf */
+void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
+{
+       LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
+
+       ldata->ld_leh->leh_reccount--;
+       ldata->ld_leh->leh_len -= ldata->ld_reclen;
+       memmove(ldata->ld_lee, (char *)ldata->ld_lee + ldata->ld_reclen,
+               (char *)ldata->ld_leh + ldata->ld_leh->leh_len -
+               (char *)ldata->ld_lee);
+       CDEBUG(D_INODE, "Old link_ea name '%.*s' is removed\n",
+              lname->ln_namelen, lname->ln_name);
+}
+EXPORT_SYMBOL(linkea_del_buf);
+
+/**
+ * Check if such a link exists in linkEA.
+ *
+ * \param ldata link data the search to be done on
+ * \param lname name in the parent's directory entry pointing to this object
+ * \param pfid parent fid the link to be found for
+ *
+ * \retval   0 success
+ * \retval -ENOENT link does not exist
+ * \retval -ve on error
+ */
+int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
+                     const struct lu_fid  *pfid)
+{
+       struct lu_name tmpname;
+       struct lu_fid  tmpfid;
+       int count;
+
+       LASSERT(ldata->ld_leh != NULL);
+
+       /* link #0 */
+       ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+
+       for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
+               linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+                                   &tmpname, &tmpfid);
+               if (tmpname.ln_namelen == lname->ln_namelen &&
+                   lu_fid_eq(&tmpfid, pfid) &&
+                   (strncmp(tmpname.ln_name, lname->ln_name,
+                            tmpname.ln_namelen) == 0))
+                       break;
+               ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+                                                        ldata->ld_reclen);
+       }
+
+       if (count == ldata->ld_leh->leh_reccount) {
+               CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
+                      lname->ln_namelen, lname->ln_name);
+               ldata->ld_lee = NULL;
+               return -ENOENT;
+       }
+       return 0;
+}
+EXPORT_SYMBOL(linkea_links_find);
index 41add70..38f29c0 100644 (file)
@@ -2092,12 +2092,6 @@ void lu_global_fini(void)
         lu_ref_global_fini();
 }
 
-struct lu_buf LU_BUF_NULL = {
-        .lb_buf = NULL,
-        .lb_len = 0
-};
-EXPORT_SYMBOL(LU_BUF_NULL);
-
 static __u32 ls_stats_read(struct lprocfs_stats *stats, int idx)
 {
 #ifdef LPROCFS
@@ -2228,6 +2222,12 @@ struct lu_object *lu_object_anon(const struct lu_env *env,
 }
 EXPORT_SYMBOL(lu_object_anon);
 
+struct lu_buf LU_BUF_NULL = {
+       .lb_buf = NULL,
+       .lb_len = 0
+};
+EXPORT_SYMBOL(LU_BUF_NULL);
+
 void lu_buf_free(struct lu_buf *buf)
 {
        LASSERT(buf);
@@ -2257,3 +2257,45 @@ void lu_buf_realloc(struct lu_buf *buf, int size)
        lu_buf_alloc(buf, size);
 }
 EXPORT_SYMBOL(lu_buf_realloc);
+
+struct lu_buf *lu_buf_check_and_alloc(struct lu_buf *buf, int len)
+{
+       if (buf->lb_buf == NULL && buf->lb_len == 0)
+               lu_buf_alloc(buf, len);
+
+       if ((len > buf->lb_len) && (buf->lb_buf != NULL))
+               lu_buf_realloc(buf, len);
+
+       return buf;
+}
+EXPORT_SYMBOL(lu_buf_check_and_alloc);
+
+/**
+ * Increase the size of the \a buf.
+ * preserves old data in buffer
+ * old buffer remains unchanged on error
+ * \retval 0 or -ENOMEM
+ */
+int lu_buf_check_and_grow(struct lu_buf *buf, int len)
+{
+       char *ptr;
+
+       if (len <= buf->lb_len)
+               return 0;
+
+       OBD_ALLOC_LARGE(ptr, len);
+       if (ptr == NULL)
+               return -ENOMEM;
+
+       /* Free the old buf */
+       if (buf->lb_buf != NULL) {
+               memcpy(ptr, buf->lb_buf, buf->lb_len);
+               OBD_FREE_LARGE(buf->lb_buf, buf->lb_len);
+       }
+
+       buf->lb_buf = ptr;
+       buf->lb_len = len;
+       return 0;
+}
+EXPORT_SYMBOL(lu_buf_check_and_grow);
+
index f9b8aaa..5dcec3c 100644 (file)
@@ -579,6 +579,7 @@ static int osp_md_declare_xattr_set(const struct lu_env *env,
        char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
        int                     rc;
 
+       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
        update = osp_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
@@ -661,8 +662,9 @@ static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(size > 0 && size < CFS_PAGE_SIZE);
        LASSERT(ea_buf != NULL);
 
-       buf->lb_len = size;
-       memcpy(buf->lb_buf, ea_buf, size);
+       rc = size;
+       if (buf->lb_buf != NULL)
+               memcpy(buf->lb_buf, ea_buf, size);
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
index 6c4ce0a..3d1df6d 100644 (file)
@@ -9094,7 +9094,7 @@ test_160() {
 }
 run_test 160 "changelog sanity"
 
-test_161() {
+test_161a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
     test_mkdir -p $DIR/$tdir
     cp /etc/hosts $DIR/$tdir/$tfile
@@ -9104,11 +9104,11 @@ test_161() {
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/zachary
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo1/luna
     ln $DIR/$tdir/$tfile $DIR/$tdir/foo2/thor
-    local FID=$($LFS path2fid $DIR/$tdir/$tfile | tr -d '[')
-    if [ "$($LFS fid2path $DIR $FID | wc -l)" != "5" ]; then
-       $LFS fid2path $DIR $FID
-       err17935 "bad link ea"
-    fi
+       local FID=$($LFS path2fid $DIR/$tdir/$tfile | tr -d '[]')
+       if [ "$($LFS fid2path $DIR $FID | wc -l)" != "5" ]; then
+               $LFS fid2path $DIR $FID
+               err17935 "bad link ea"
+       fi
     # middle
     rm $DIR/$tdir/foo2/zachary
     # last
@@ -9134,7 +9134,59 @@ test_161() {
     unlinkmany $DIR/$tdir/foo2/$longname 1000 || \
        error "failed to unlink many hardlinks"
 }
-run_test 161 "link ea sanity"
+run_test 161a "link ea sanity"
+
+test_161b() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $MDSCOUNT -lt 2 ] &&
+               skip "skipping remote directory test" && return
+       local MDTIDX=1
+       local remote_dir=$DIR/$tdir/remote_dir
+
+       mkdir -p $DIR/$tdir
+       $LFS mkdir -i $MDTIDX $remote_dir ||
+               error "create remote directory failed"
+
+       cp /etc/hosts $remote_dir/$tfile
+       mkdir -p $remote_dir/foo1
+       mkdir -p $remote_dir/foo2
+       ln $remote_dir/$tfile $remote_dir/foo1/sofia
+       ln $remote_dir/$tfile $remote_dir/foo2/zachary
+       ln $remote_dir/$tfile $remote_dir/foo1/luna
+       ln $remote_dir/$tfile $remote_dir/foo2/thor
+
+       local FID=$($LFS path2fid $remote_dir/$tfile | tr -d '[' |
+                    tr -d ']')
+       if [ "$($LFS fid2path $DIR $FID | wc -l)" != "5" ]; then
+               $LFS fid2path $DIR $FID
+               err17935 "bad link ea"
+       fi
+       # middle
+       rm $remote_dir/foo2/zachary
+       # last
+       rm $remote_dir/foo2/thor
+       # first
+       rm $remote_dir/$tfile
+       # rename
+       mv $remote_dir/foo1/sofia $remote_dir/foo2/maggie
+       local link_path=$($LFS fid2path $FSNAME --link 1 $FID)
+       if [ "$DIR/$link_path" != "$remote_dir/foo2/maggie" ]; then
+               $LFS fid2path $DIR $FID
+               err17935 "bad link rename"
+       fi
+       rm $remote_dir/foo2/maggie
+
+       # overflow the EA
+       local longname=filename_avg_len_is_thirty_two_
+       createmany -l$remote_dir/foo1/luna $remote_dir/foo2/$longname 1000 ||
+               error "failed to hardlink many files"
+       links=$($LFS fid2path $DIR $FID | wc -l)
+       echo -n "${links}/1000 links in link EA"
+       [ ${links} -gt 60 ] || err17935 "expected at least 60 links in link EA"
+       unlinkmany $remote_dir/foo2/$longname 1000 ||
+       error "failed to unlink many hardlinks"
+}
+run_test 161b "link ea sanity under remote directory"
 
 check_path() {
     local expected=$1
@@ -10631,7 +10683,7 @@ mcreate_path2fid () {
        echo "pass with $path and $fid"
 }
 
-test_226 () {
+test_226a () {
        rm -rf $DIR/$tdir
        mkdir -p $DIR/$tdir
 
@@ -10644,7 +10696,29 @@ test_226 () {
        mcreate_path2fid 0120666 0 0 link "symbolic link"
        mcreate_path2fid 0140666 0 0 sock "socket"
 }
-run_test 226 "call path2fid and fid2path on files of all type"
+run_test 226a "call path2fid and fid2path on files of all type"
+
+test_226b () {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       rm -rf $DIR/$tdir
+       local MDTIDX=1
+
+       mkdir -p $DIR/$tdir
+       $LFS setdirstripe -i $MDTIDX $DIR/$tdir/remote_dir ||
+               error "create remote directory failed"
+       mcreate_path2fid 0010666 0 0 "remote_dir/fifo" "FIFO"
+       mcreate_path2fid 0020666 1 3 "remote_dir/null" \
+                               "character special file (null)"
+       mcreate_path2fid 0020666 1 255 "remote_dir/none" \
+                               "character special file (no device)"
+       mcreate_path2fid 0040666 0 0 "remote_dir/dir" "directory"
+       mcreate_path2fid 0060666 7 0 "remote_dir/loop0" \
+                               "block special file (loop)"
+       mcreate_path2fid 0100666 0 0 "remote_dir/file" "regular file"
+       mcreate_path2fid 0120666 0 0 "remote_dir/link" "symbolic link"
+       mcreate_path2fid 0140666 0 0 "remote_dir/sock" "socket"
+}
+run_test 226b "call path2fid and fid2path on files of all type under remote dir"
 
 # LU-1299 Executing or running ldd on a truncated executable does not
 # cause an out-of-memory condition.