Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index cb84239..03a9e86 100644 (file)
@@ -71,6 +71,19 @@ static struct lu_name lname_dotdot = {
 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                         const struct lu_name *lname, struct lu_fid* fid,
                         int mask);
+static int mdd_links_add(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         const struct lu_fid *pfid,
+                         const struct lu_name *lname,
+                         struct thandle *handle);
+static int mdd_links_rename(const struct lu_env *env,
+                            struct mdd_object *mdd_obj,
+                            const struct lu_fid *oldpfid,
+                            const struct lu_name *oldlname,
+                            const struct lu_fid *newpfid,
+                            const struct lu_name *newlname,
+                            struct thandle *handle);
+
 static int
 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
                     const struct lu_name *lname, struct lu_fid* fid, int mask)
@@ -89,9 +102,9 @@ __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
         return rc;
 }
 
-static int mdd_lookup(const struct lu_env *env,
-                      struct md_object *pobj, const struct lu_name *lname,
-                      struct lu_fid* fid, struct md_op_spec *spec)
+int mdd_lookup(const struct lu_env *env,
+               struct md_object *pobj, const struct lu_name *lname,
+               struct lu_fid* fid, struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -99,7 +112,6 @@ static int mdd_lookup(const struct lu_env *env,
         RETURN(rc);
 }
 
-
 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
                           struct lu_fid *fid)
 {
@@ -107,10 +119,10 @@ static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
 }
 
 /*
- * For root fid use special function, whcih does not compare version component
- * of fid. Vresion component is different for root fids on all MDTs.
+ * For root fid use special function, which does not compare version component
+ * of fid. Version component is different for root fids on all MDTs.
  */
-static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
+int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
 {
         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
@@ -577,6 +589,66 @@ __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
         RETURN(rc);
 }
 
+/** Store a namespace change changelog record
+ * If this fails, we must fail the whole transaction; we don't
+ * want the change to commit without the log entry.
+ * \param target - mdd_object of change
+ * \param parent - parent dir/object
+ * \param tf - target lu_fid, overrides fid of \a target if this is non-null
+ * \param tname - target name string
+ * \param handle - transacion handle
+ */
+static int mdd_changelog_ns_store(const struct lu_env  *env,
+                                  struct mdd_device    *mdd,
+                                  enum changelog_rec_type type,
+                                  struct mdd_object    *target,
+                                  struct mdd_object    *parent,
+                                  const struct lu_fid  *tf,
+                                  const struct lu_name *tname,
+                                  struct thandle *handle)
+{
+        const struct lu_fid *tfid;
+        const struct lu_fid *tpfid = mdo2fid(parent);
+        struct llog_changelog_rec *rec;
+        struct lu_buf *buf;
+        int reclen;
+        int rc;
+        ENTRY;
+
+        if (!(mdd->mdd_cl.mc_flags & CLM_ON))
+                RETURN(0);
+
+        LASSERT(parent != NULL);
+        LASSERT(tname != NULL);
+        LASSERT(handle != NULL);
+
+        /* target */
+        reclen = llog_data_len(sizeof(*rec) + tname->ln_namelen);
+        buf = mdd_buf_alloc(env, reclen);
+        if (buf->lb_buf == NULL)
+                RETURN(-ENOMEM);
+        rec = (struct llog_changelog_rec *)buf->lb_buf;
+
+        rec->cr_flags = CLF_VERSION;
+        rec->cr_type = (__u32)type;
+        tfid = tf ? tf : mdo2fid(target);
+        rec->cr_tfid = *tfid;
+        rec->cr_pfid = *tpfid;
+        rec->cr_namelen = tname->ln_namelen;
+        memcpy(rec->cr_name, tname->ln_name, rec->cr_namelen);
+        if (likely(target))
+                target->mod_cltime = cfs_time_current_64();
+
+        rc = mdd_changelog_llog_write(mdd, rec, handle);
+        if (rc < 0) {
+                CERROR("changelog failed: rc=%d, op%d %s c"DFID" p"DFID"\n",
+                       rc, type, tname->ln_name, PFID(tfid), PFID(tpfid));
+                return -EFAULT;
+        }
+
+        return 0;
+}
+
 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
                     struct md_object *src_obj, const struct lu_name *lname,
                     struct md_attr *ma)
@@ -603,12 +675,15 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
                 rc = mdd_la_get(env, mdd_tobj, la_tmp, BYPASS_CAPA);
                 if (!rc) {
+                        void *data = NULL;
+                        mdd_data_get(env, mdd_tobj, &data);
                         quota_opc = FSFILT_OP_LINK;
                         mdd_quota_wrapper(la_tmp, qids);
                         /* get block quota for parent */
                         lquota_chkquota(mds_quota_interface_ref, obd,
                                         qids[USRQUOTA], qids[GRPQUOTA], 1,
-                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK,
+                                        data, 1);
                 }
         }
 #endif
@@ -645,11 +720,17 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
 
         la->la_valid = LA_CTIME;
         rc = mdd_attr_check_set_internal(env, mdd_sobj, la, handle, 0);
+        if (rc == 0)
+                mdd_links_add(env, mdd_sobj, mdo2fid(mdd_tobj), lname, handle);
+
         EXIT;
 out_unlock:
         mdd_write_unlock(env, mdd_sobj);
         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
 out_trans:
+        if (rc == 0)
+                rc = mdd_changelog_ns_store(env, mdd, CL_HARDLINK, mdd_sobj,
+                                            mdd_tobj, NULL, lname, handle);
         mdd_trans_stop(env, mdd, rc, handle);
 out_pending:
 #ifdef HAVE_QUOTA_SUPPORT
@@ -657,7 +738,7 @@ out_pending:
                 if (rec_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qids[USRQUOTA], qids[GRPQUOTA],
-                                              1, 1);
+                                              rec_pending, 1);
                 /* Trigger dqacq for the parent owner. If failed,
                  * the next call for lquota_chkquota will process it. */
                 lquota_adjust(mds_quota_interface_ref, obd, 0, qids, rc,
@@ -733,7 +814,8 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         unsigned int qpids[MAXQUOTAS] = { 0, 0 };
         int quota_opc = 0;
 #endif
-        int rc, is_dir;
+        int is_dir = S_ISDIR(ma->ma_attr.la_mode);
+        int rc;
         ENTRY;
 
         LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
@@ -747,13 +829,11 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-
         dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
         if (dlh == NULL)
                 GOTO(out_trans, rc = -ENOMEM);
         mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
-        is_dir = S_ISDIR(ma->ma_attr.la_mode);
         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
         if (rc)
                 GOTO(cleanup, rc);
@@ -804,11 +884,22 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
                                    sizeof(KEY_UNLINKED), KEY_UNLINKED, 0,
                                    NULL, NULL);
+        if (!is_dir)
+                /* old files may not have link ea; ignore errors */
+                mdd_links_rename(env, mdd_cobj, mdo2fid(mdd_pobj),
+                                 lname, NULL, NULL, handle);
+
         EXIT;
 cleanup:
         mdd_write_unlock(env, mdd_cobj);
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
+        if (rc == 0)
+                rc = mdd_changelog_ns_store(env, mdd,
+                                            is_dir ? CL_RMDIR : CL_UNLINK,
+                                            mdd_cobj, mdd_pobj, NULL, lname,
+                                            handle);
+
         mdd_trans_stop(env, mdd, rc, handle);
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
@@ -871,13 +962,15 @@ static int mdd_name_insert(const struct lu_env *env,
 
                         rc = mdd_la_get(env, mdd_obj, la_tmp, BYPASS_CAPA);
                         if (!rc) {
+                                void *data = NULL;
+                                mdd_data_get(env, mdd_obj, &data);
                                 quota_opc = FSFILT_OP_LINK;
                                 mdd_quota_wrapper(la_tmp, qids);
                                 /* get block quota for parent */
                                 lquota_chkquota(mds_quota_interface_ref, obd,
                                                 qids[USRQUOTA], qids[GRPQUOTA],
                                                 1, &rec_pending, NULL,
-                                                LQUOTA_FLAGS_BLK);
+                                                LQUOTA_FLAGS_BLK, data, 1);
                         }
                 } else {
                         uc->mu_cap |= CFS_CAP_SYS_RESOURCE_MASK;
@@ -926,7 +1019,8 @@ out_pending:
                         if (rec_pending)
                                 lquota_pending_commit(mds_quota_interface_ref,
                                                       obd, qids[USRQUOTA],
-                                                      qids[GRPQUOTA], 1, 1);
+                                                      qids[GRPQUOTA],
+                                                      rec_pending, 1);
                         /* Trigger dqacq for the parent owner. If failed,
                          * the next call for lquota_chkquota will process it*/
                         lquota_adjust(mds_quota_interface_ref, obd, 0, qids,
@@ -1065,6 +1159,7 @@ static int mdd_rt_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
+/* Partial rename op on slave MDD */
 static int mdd_rename_tgt(const struct lu_env *env,
                           struct md_object *pobj, struct md_object *tobj,
                           const struct lu_fid *lf, const struct lu_name *lname,
@@ -1093,12 +1188,15 @@ static int mdd_rename_tgt(const struct lu_env *env,
 
                 rc = mdd_la_get(env, mdd_tpobj, la_tmp, BYPASS_CAPA);
                 if (!rc) {
+                        void *data = NULL;
+                        mdd_data_get(env, mdd_tpobj, &data);
                         quota_opc = FSFILT_OP_LINK;
                         mdd_quota_wrapper(la_tmp, qpids);
                         /* get block quota for target parent */
                         lquota_chkquota(mds_quota_interface_ref, obd,
                                         qpids[USRQUOTA], qpids[GRPQUOTA], 1,
-                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK);
+                                        &rec_pending, NULL, LQUOTA_FLAGS_BLK,
+                                        data, 1);
                 }
         }
 #endif
@@ -1173,6 +1271,12 @@ cleanup:
                 mdd_write_unlock(env, mdd_tobj);
         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
 out_trans:
+        if (rc == 0)
+                /* Bare EXT record with no RENAME in front of it signifies
+                   a partial slave op */
+                rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
+                                            mdd_tpobj, NULL, lname, handle);
+
         mdd_trans_stop(env, mdd, rc, handle);
 out_pending:
 #ifdef HAVE_QUOTA_SUPPORT
@@ -1181,7 +1285,7 @@ out_pending:
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qpids[USRQUOTA],
                                               qpids[GRPQUOTA],
-                                              1, 1);
+                                              rec_pending, 1);
                 if (quota_opc)
                         /* Trigger dqrel/dqacq on the target owner of child and
                          * parent. If failed, the next call for lquota_chkquota
@@ -1272,6 +1376,7 @@ out_free:
         RETURN(rc);
 }
 
+/* Get fid from name and parent */
 static int
 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
              const struct lu_name *lname, struct lu_fid* fid, int mask)
@@ -1308,6 +1413,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
 
         if (likely(S_ISDIR(mdd_object_type(mdd_obj)) &&
                    dt_try_as_dir(env, dir))) {
+
                 rc = dir->do_index_ops->dio_lookup(env, dir,
                                                  (struct dt_rec *)pack, key,
                                                  mdd_object_capa(env, mdd_obj));
@@ -1322,8 +1428,9 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
 }
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
-                          struct mdd_object *child, struct md_attr *ma,
-                          struct thandle *handle, const struct md_op_spec *spec)
+                          const struct lu_name *lname, struct mdd_object *child,
+                          struct md_attr *ma, struct thandle *handle,
+                          const struct md_op_spec *spec)
 {
         int rc;
         ENTRY;
@@ -1360,6 +1467,9 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                         }
                 }
         }
+        if (rc == 0)
+                mdd_links_add(env, child, pfid, lname, handle);
+
         RETURN(rc);
 }
 
@@ -1535,7 +1645,7 @@ static int mdd_create(const struct lu_env *env,
                         /* get file quota for child */
                         lquota_chkquota(mds_quota_interface_ref, obd,
                                         qcids[USRQUOTA], qcids[GRPQUOTA], 1,
-                                        &inode_pending, NULL, 0);
+                                        &inode_pending, NULL, 0, NULL, 0);
                         switch (ma->ma_attr.la_mode & S_IFMT) {
                         case S_IFLNK:
                         case S_IFDIR:
@@ -1556,12 +1666,12 @@ static int mdd_create(const struct lu_env *env,
                                                 qcids[USRQUOTA], qcids[GRPQUOTA],
                                                 block_count,
                                                 &block_pending, NULL,
-                                                LQUOTA_FLAGS_BLK);
+                                                LQUOTA_FLAGS_BLK, NULL, 0);
                         if (!same)
                                 lquota_chkquota(mds_quota_interface_ref, obd,
                                                 qpids[USRQUOTA], qpids[GRPQUOTA], 1,
                                                 &parent_pending, NULL,
-                                                LQUOTA_FLAGS_BLK);
+                                                LQUOTA_FLAGS_BLK, NULL, 0);
                 }
         }
 #endif
@@ -1626,7 +1736,7 @@ static int mdd_create(const struct lu_env *env,
         }
 #endif
 
-        rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
+        rc = mdd_object_initialize(env, mdo2fid(mdd_pobj), lname,
                                    son, ma, handle, spec);
         mdd_write_unlock(env, son);
         if (rc)
@@ -1717,6 +1827,12 @@ cleanup:
 
         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
 out_trans:
+        if (rc == 0)
+                rc = mdd_changelog_ns_store(env, mdd,
+                            S_ISDIR(attr->la_mode) ? CL_MKDIR :
+                            S_ISREG(attr->la_mode) ? CL_CREATE :
+                            S_ISLNK(attr->la_mode) ? CL_SOFTLINK : CL_MKNOD,
+                            son, mdd_pobj, NULL, lname, handle);
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
         /* finis lov_create stuff, free all temporary data */
@@ -1727,15 +1843,15 @@ out_pending:
                 if (inode_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qcids[USRQUOTA], qcids[GRPQUOTA],
-                                              1, 0);
+                                              inode_pending, 0);
                 if (block_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qcids[USRQUOTA], qcids[GRPQUOTA],
-                                              block_count, 1);
+                                              block_pending, 1);
                 if (parent_pending)
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qpids[USRQUOTA], qpids[GRPQUOTA],
-                                              1, 1);
+                                              parent_pending, 1);
                 /* Trigger dqacq on the owner of child and parent. If failed,
                  * the next call for lquota_chkquota will process it. */
                 lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc,
@@ -1837,10 +1953,10 @@ static int mdd_rename(const struct lu_env *env,
         const char *sname = lsname->ln_name;
         const char *tname = ltname->ln_name;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
-        struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
+        struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj); /* source parent */
         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
         struct mdd_device *mdd = mdo2mdd(src_pobj);
-        struct mdd_object *mdd_sobj = NULL;
+        struct mdd_object *mdd_sobj = NULL;                  /* source object */
         struct mdd_object *mdd_tobj = NULL;
         struct dynlock_handle *sdlh, *tdlh;
         struct thandle *handle;
@@ -1875,6 +1991,8 @@ static int mdd_rename(const struct lu_env *env,
                                 rc = mdd_la_get(env, mdd_tpobj, la_tmp,
                                                 BYPASS_CAPA);
                                 if (!rc) {
+                                        void *data = NULL;
+                                        mdd_data_get(env, mdd_tpobj, &data);
                                         quota_opc = FSFILT_OP_LINK;
                                         mdd_quota_wrapper(la_tmp, qtpids);
                                         /* get block quota for target parent */
@@ -1882,7 +2000,8 @@ static int mdd_rename(const struct lu_env *env,
                                                         obd, qtpids[USRQUOTA],
                                                         qtpids[GRPQUOTA], 1,
                                                         &rec_pending, NULL,
-                                                        LQUOTA_FLAGS_BLK);
+                                                        LQUOTA_FLAGS_BLK,
+                                                        data, 1);
                                 }
                         }
                 }
@@ -1924,6 +2043,7 @@ static int mdd_rename(const struct lu_env *env,
         if (rc)
                 GOTO(cleanup, rc);
 
+        /* Remove source name from source directory */
         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
                                 mdd_object_capa(env, mdd_spobj));
         if (rc)
@@ -1943,7 +2063,7 @@ static int mdd_rename(const struct lu_env *env,
                         GOTO(cleanup, rc);
         }
 
-        /*
+        /* Remove target name from target directory
          * Here tobj can be remote one, so we do index_delete unconditionally
          * and -ENOENT is allowed.
          */
@@ -1952,6 +2072,7 @@ static int mdd_rename(const struct lu_env *env,
         if (rc != 0 && rc != -ENOENT)
                 GOTO(cleanup, rc);
 
+        /* Insert new fid with target name into target dir */
         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
                                 mdd_object_capa(env, mdd_tpobj));
         if (rc)
@@ -1969,7 +2090,7 @@ static int mdd_rename(const struct lu_env *env,
                         GOTO(cleanup, rc);
         }
 
-        /*
+        /* Remove old target object
          * For tobj is remote case cmm layer has processed
          * and set tobj to NULL then. So when tobj is NOT NULL,
          * it must be local one.
@@ -2012,6 +2133,20 @@ static int mdd_rename(const struct lu_env *env,
                                                   handle, 0);
         }
 
+        if (rc == 0 && mdd_sobj) {
+                mdd_write_lock(env, mdd_sobj, MOR_SRC_CHILD);
+                rc = mdd_links_rename(env, mdd_sobj, mdo2fid(mdd_spobj), lsname,
+                                      mdo2fid(mdd_tpobj), ltname, handle);
+                if (rc == -ENOENT)
+                        /* Old files might not have EA entry */
+                        mdd_links_add(env, mdd_sobj, mdo2fid(mdd_spobj),
+                                      lsname, handle);
+                mdd_write_unlock(env, mdd_sobj);
+                /* We don't fail the transaction if the link ea can't be
+                   updated -- fid2path will use alternate lookup method. */
+                rc = 0;
+        }
+
         EXIT;
 cleanup:
         if (likely(tdlh) && sdlh != tdlh)
@@ -2019,6 +2154,13 @@ cleanup:
         if (likely(sdlh))
                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
 cleanup_unlocked:
+        if (rc == 0)
+                rc = mdd_changelog_ns_store(env, mdd, CL_RENAME, mdd_tobj,
+                                            mdd_spobj, lf, lsname, handle);
+        if (rc == 0)
+                rc = mdd_changelog_ns_store(env, mdd, CL_EXT, mdd_tobj,
+                                            mdd_tpobj, lf, ltname, handle);
+
         mdd_trans_stop(env, mdd, rc, handle);
         if (mdd_sobj)
                 mdd_object_put(env, mdd_sobj);
@@ -2029,7 +2171,7 @@ out_pending:
                         lquota_pending_commit(mds_quota_interface_ref, obd,
                                               qtpids[USRQUOTA],
                                               qtpids[GRPQUOTA],
-                                              1, 1);
+                                              rec_pending, 1);
                 /* Trigger dqrel on the source owner of parent.
                  * If failed, the next call for lquota_chkquota will
                  * process it. */
@@ -2046,6 +2188,266 @@ out_pending:
         return rc;
 }
 
+/** enable/disable storing of hardlink info */
+int mdd_linkea_enable = 1;
+CFS_MODULE_PARM(mdd_linkea_enable, "d", int, 0644,
+                "record hardlink info in EAs");
+
+/** Read the link EA into a temp buffer.
+ * Uses the name_buf since it is generally large.
+ * \retval IS_ERR err
+ * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ */
+struct lu_buf *mdd_links_get(const struct lu_env *env,
+                             struct mdd_object *mdd_obj)
+{
+        struct lu_buf *buf;
+        struct lustre_capa *capa;
+        struct link_ea_header *leh;
+        int rc;
+
+        /* First try a small buf */
+        buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
+        if (buf->lb_buf == NULL)
+                return ERR_PTR(-ENOMEM);
+
+        capa = mdd_object_capa(env, mdd_obj);
+        rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
+        if (rc == -ERANGE) {
+                /* Buf was too small, figure out what we need. */
+                buf->lb_buf = NULL;
+                buf->lb_len = 0;
+                rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
+                if (rc < 0)
+                        return ERR_PTR(rc);
+                buf = mdd_buf_alloc(env, rc);
+                if (buf->lb_buf == NULL)
+                        return ERR_PTR(-ENOMEM);
+                rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_LINK, capa);
+        }
+        if (rc < 0)
+                return ERR_PTR(rc);
+
+        leh = buf->lb_buf;
+        if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+                leh->leh_magic = LINK_EA_MAGIC;
+                leh->leh_reccount = __swab32(leh->leh_reccount);
+                leh->leh_len = __swab64(leh->leh_len);
+                /* entries are swabbed by mdd_lee_unpack */
+        }
+        if (leh->leh_magic != LINK_EA_MAGIC)
+                return ERR_PTR(-EINVAL);
+        if (leh->leh_reccount == 0)
+                return ERR_PTR(-ENODATA);
+
+        return buf;
+}
+
+/** Pack a link_ea_entry.
+ * All elements are stored as chars to avoid alignment issues.
+ * Numbers are always big-endian
+ * \param packbuf is a temp fid buffer
+ * \retval record length
+ */
+static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
+                         const struct lu_fid *pfid, struct lu_fid* packbuf)
+{
+        char *ptr;
+        int reclen;
+
+        fid_pack(&lee->lee_parent_fid, pfid, packbuf);
+        ptr = (char *)&lee->lee_parent_fid + lee->lee_parent_fid.fp_len;
+        strncpy(ptr, lname->ln_name, lname->ln_namelen);
+        reclen = lee->lee_parent_fid.fp_len + lname->ln_namelen +
+                sizeof(lee->lee_reclen);
+        lee->lee_reclen[0] = (reclen >> 8) & 0xff;
+        lee->lee_reclen[1] = reclen & 0xff;
+        return reclen;
+}
+
+void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
+                    struct lu_name *lname, struct lu_fid *pfid)
+{
+        *reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
+        fid_unpack(&lee->lee_parent_fid, pfid);
+        lname->ln_name = (char *)&lee->lee_parent_fid +
+                lee->lee_parent_fid.fp_len;
+        lname->ln_namelen = *reclen - lee->lee_parent_fid.fp_len -
+                sizeof(lee->lee_reclen);
+}
+
+/** Add a record to the end of link ea buf */
+static int __mdd_links_add(const struct lu_env *env, struct lu_buf *buf,
+                           const struct lu_fid *pfid,
+                           const struct lu_name *lname)
+{
+        struct link_ea_header *leh;
+        struct link_ea_entry *lee;
+        int reclen;
+
+        if (lname == NULL || pfid == NULL)
+                return -EINVAL;
+
+        /* Make sure our buf is big enough for the new one */
+        leh = buf->lb_buf;
+        reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
+        if (leh->leh_len + reclen > buf->lb_len) {
+                if (mdd_buf_grow(env, leh->leh_len + reclen) < 0)
+                        return -ENOMEM;
+        }
+
+        leh = buf->lb_buf;
+        lee = buf->lb_buf + leh->leh_len;
+        reclen = mdd_lee_pack(lee, lname, pfid, &mdd_env_info(env)->mti_fid2);
+        leh->leh_len += reclen;
+        leh->leh_reccount++;
+        return 0;
+}
+
+/* For pathologic linkers, we don't want to spend lots of time scanning the
+ * link ea.  Limit ourseleves to something reasonable; links not in the EA
+ * can be looked up via (slower) parent lookup.
+ */
+#define LINKEA_MAX_COUNT 128
+
+static int mdd_links_add(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         const struct lu_fid *pfid,
+                         const struct lu_name *lname,
+                         struct thandle *handle)
+{
+        struct lu_buf *buf;
+        struct link_ea_header *leh;
+        int rc;
+        ENTRY;
+
+        if (!mdd_linkea_enable)
+                RETURN(0);
+
+        buf = mdd_links_get(env, mdd_obj);
+        if (IS_ERR(buf)) {
+                rc = PTR_ERR(buf);
+                if (rc != -ENODATA) {
+                        CERROR("link_ea read failed %d "DFID"\n", rc,
+                               PFID(mdd_object_fid(mdd_obj)));
+                        RETURN (rc);
+                }
+                /* empty EA; start one */
+                buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
+                if (buf->lb_buf == NULL)
+                        RETURN(-ENOMEM);
+                leh = buf->lb_buf;
+                leh->leh_magic = LINK_EA_MAGIC;
+                leh->leh_len = sizeof(struct link_ea_header);
+                leh->leh_reccount = 0;
+        }
+
+        leh = buf->lb_buf;
+        if (leh->leh_reccount > LINKEA_MAX_COUNT)
+                RETURN(-EOVERFLOW);
+
+        rc = __mdd_links_add(env, buf, pfid, lname);
+        if (rc)
+                RETURN(rc);
+
+        leh = buf->lb_buf;
+        rc = __mdd_xattr_set(env, mdd_obj,
+                             mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
+                             XATTR_NAME_LINK, 0, handle);
+        if (rc)
+                CERROR("link_ea add failed %d "DFID"\n", rc,
+                       PFID(mdd_object_fid(mdd_obj)));
+
+        if (buf->lb_vmalloc)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+
+        RETURN (rc);
+}
+
+static int mdd_links_rename(const struct lu_env *env,
+                            struct mdd_object *mdd_obj,
+                            const struct lu_fid *oldpfid,
+                            const struct lu_name *oldlname,
+                            const struct lu_fid *newpfid,
+                            const struct lu_name *newlname,
+                            struct thandle *handle)
+{
+        struct lu_buf  *buf;
+        struct link_ea_header *leh;
+        struct link_ea_entry  *lee;
+        struct lu_name *tmpname = &mdd_env_info(env)->mti_name;
+        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
+        int reclen = 0;
+        int count;
+        int rc, rc2 = 0;
+        ENTRY;
+
+        if (!mdd_linkea_enable)
+                RETURN(0);
+
+        if (mdd_obj->mod_flags & DEAD_OBJ)
+                /* No more links, don't bother */
+                RETURN(0);
+
+        buf = mdd_links_get(env, mdd_obj);
+        if (IS_ERR(buf)) {
+                rc = PTR_ERR(buf);
+                CERROR("link_ea read failed %d "DFID"\n",
+                       rc, PFID(mdd_object_fid(mdd_obj)));
+                RETURN(rc);
+        }
+        leh = buf->lb_buf;
+        lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
+
+        /* Find the old record */
+        for(count = 0; count <= leh->leh_reccount; count++) {
+                mdd_lee_unpack(lee, &reclen, tmpname, tmpfid);
+                if (tmpname->ln_namelen == oldlname->ln_namelen &&
+                    lu_fid_eq(tmpfid, oldpfid) &&
+                    (strncmp(tmpname->ln_name, oldlname->ln_name,
+                             tmpname->ln_namelen) == 0))
+                        break;
+                lee = (struct link_ea_entry *)((char *)lee + reclen);
+        }
+        if (count > leh->leh_reccount) {
+                CDEBUG(D_INODE, "Old link_ea name '%.*s' not found\n",
+                       oldlname->ln_namelen, oldlname->ln_name);
+                GOTO(out, rc = -ENOENT);
+        }
+
+        /* Remove the old record */
+        leh->leh_reccount--;
+        leh->leh_len -= reclen;
+        memmove(lee, (char *)lee + reclen, (char *)leh + leh->leh_len -
+                (char *)lee);
+
+        /* If renaming, add the new record */
+        if (newpfid != NULL) {
+                /* if the add fails, we still delete the out-of-date old link */
+                rc2 = __mdd_links_add(env, buf, newpfid, newlname);
+                leh = buf->lb_buf;
+        }
+
+        rc = __mdd_xattr_set(env, mdd_obj,
+                             mdd_buf_get_const(env, buf->lb_buf, leh->leh_len),
+                             XATTR_NAME_LINK, 0, handle);
+
+out:
+        if (rc == 0)
+                rc = rc2;
+        if (rc)
+                CDEBUG(D_INODE, "link_ea mv/unlink '%.*s' failed %d "DFID"\n",
+                       oldlname->ln_namelen, oldlname->ln_name, rc,
+                       PFID(mdd_object_fid(mdd_obj)));
+
+        if (buf->lb_vmalloc)
+                /* if we vmalloced a large buffer drop it */
+                mdd_buf_put(buf);
+
+        RETURN (rc);
+}
+
 const struct md_dir_operations mdd_dir_ops = {
         .mdo_is_subdir     = mdd_is_subdir,
         .mdo_lookup        = mdd_lookup,