/** The link ea holds 1 \a link_ea_entry for each hardlink */
#define LINK_EA_MAGIC 0x11EAF1DFUL
struct link_ea_header {
- __u32 leh_magic;
- __u32 leh_reccount;
- __u64 leh_len; /* total size */
- /* future use */
- __u32 padding1;
- __u32 padding2;
+ /* LINK_EA_MAGIC once the header is in host byte order */
+ __u32 leh_magic;
+ /* number of link_ea_entry records that follow the header */
+ __u32 leh_reccount;
+ __u64 leh_len; /* total size */
+ /* Time in seconds (truncated to 32 bits) at which an entry was last
+ * dropped or refused because the linkEA was full; 0 means the linkEA
+ * has never overflowed. */
+ __u32 leh_overflow_time;
+ /* zeroed by linkea_data_new() */
+ __u32 leh_padding;
};
/** Hardlink data is name and parent fid.
* Author: di wang <di.wang@intel.com>
*/
-#define DEFAULT_LINKEA_SIZE 4096
+/* There are several reasons to restrict the linkEA size:
+ *
+ * 1. Under DNE mode, if we do not restrict the linkEA size, and if there
+ *    are too many cross-MDT hard links to the same object, then it will
+ *    cause the llog to overflow.
+ *
+ * 2. Some backends have a limited size for EAs. For example, without large
+ *    EA enabled, ldiskfs makes all EAs share one (4K) EA block.
+ *
+ * 3. Too many entries in the linkEA will seriously affect linkEA performance
+ *    because a linkEA entry can only be located by a sequential scan. */
+#define MAX_LINKEA_SIZE 4096
struct linkea_data {
/**
int linkea_data_new(struct linkea_data *ldata, struct lu_buf *buf);
int linkea_init(struct linkea_data *ldata);
+int linkea_init_with_rec(struct linkea_data *ldata);
void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
struct lu_name *lname, struct lu_fid *pfid);
int linkea_entry_pack(struct link_ea_entry *lee, const struct lu_name *lname,
int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
const struct lu_fid *pfid);
void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname);
+int linkea_links_new(struct linkea_data *ldata, struct lu_buf *buf,
+ const struct lu_name *cname, const struct lu_fid *pfid);
+int linkea_overflow_shrink(struct linkea_data *ldata);
int linkea_links_find(struct linkea_data *ldata, const struct lu_name *lname,
const struct lu_fid *pfid);
#define OBD_FAIL_LFSCK_NO_NAMEENTRY 0x1624
#define OBD_FAIL_LFSCK_MORE_NLINK 0x1625
#define OBD_FAIL_LFSCK_LESS_NLINK 0x1626
-#define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
#define OBD_FAIL_LFSCK_BAD_NAME_HASH 0x1628
#define OBD_FAIL_LFSCK_LOST_MASTER_LMV 0x1629
#define OBD_FAIL_LFSCK_LOST_SLAVE_LMV 0x162a
if (rc == 0)
/* For insert new linkEA entry. */
rc = dt_declare_xattr_set(env, obj,
- lfsck_buf_get_const(env, NULL, DEFAULT_LINKEA_SIZE),
+ lfsck_buf_get_const(env, NULL, MAX_LINKEA_SIZE),
XATTR_NAME_LINK, 0, handle);
return rc;
}
int rc;
ENTRY;
- rc = linkea_init(ldata);
+ rc = linkea_init_with_rec(ldata);
if (rc < 0)
RETURN(rc);
PFID(lu_object_fid(&dto->do_lu)), i);
sname = lod_name_get(env, stripe_name, strlen(stripe_name));
- rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
- if (rc != 0)
- GOTO(out, rc);
-
- rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+ rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+ sname, lu_object_fid(&dt->do_lu));
if (rc != 0)
GOTO(out, rc);
PFID(lu_object_fid(&dto->do_lu)), i);
sname = lod_name_get(env, stripe_name, strlen(stripe_name));
- rc = linkea_data_new(&ldata, &info->lti_linkea_buf);
- if (rc != 0)
- GOTO(out, rc);
-
- rc = linkea_add_buf(&ldata, sname, lu_object_fid(&dt->do_lu));
+ rc = linkea_links_new(&ldata, &info->lti_linkea_buf,
+ sname, lu_object_fid(&dt->do_lu));
if (rc != 0)
GOTO(out, rc);
RETURN(rc);
}
+/**
+ * Read the raw link EA of \a mdd_obj into a temp buffer.
+ *
+ * Uses mdd_thread_info::mti_link_buf: a PAGE_SIZE buffer is tried first and,
+ * if the backend answers -ERANGE, the buffer is re-allocated with the size
+ * reported by a second (size-probing) mdo_xattr_get() call.
+ * A pointer to the buffer is stored in \a ldata::ld_buf.
+ *
+ * The linkEA header is NOT parsed here. The callers in this file run
+ * linkea_init() or linkea_init_with_rec() on the result, so parsing it here
+ * as well would only repeat the same work.
+ *
+ * \param[in] env	execution environment, must not be NULL
+ * \param[in] mdd_obj	object whose XATTR_NAME_LINK is read
+ * \param[out] ldata	linkEA handle; only ld_buf is set by this function
+ *
+ * \retval 0		the EA was read into \a ldata::ld_buf
+ * \retval -ENODATA	the object does not exist
+ * \retval -ENOMEM	the buffer could not be allocated
+ * \retval negative	other error returned by mdo_xattr_get()
+ */
+static int __mdd_links_read(const struct lu_env *env,
+			    struct mdd_object *mdd_obj,
+			    struct linkea_data *ldata)
+{
+	int rc;
+
+	if (!mdd_object_exists(mdd_obj))
+		return -ENODATA;
+
+	/* First try a small buf */
+	LASSERT(env != NULL);
+	ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+					       PAGE_SIZE);
+	if (ldata->ld_buf->lb_buf == NULL)
+		return -ENOMEM;
+
+	rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
+	if (rc == -ERANGE) {
+		/* Buf was too small, figure out what we need. */
+		lu_buf_free(ldata->ld_buf);
+		rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+				   XATTR_NAME_LINK);
+		if (rc < 0)
+			return rc;
+		ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+		if (ldata->ld_buf->lb_buf == NULL)
+			return -ENOMEM;
+		rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+				   XATTR_NAME_LINK);
+	}
+	if (rc < 0) {
+		lu_buf_free(ldata->ld_buf);
+		ldata->ld_buf = NULL;
+		return rc;
+	}
+
+	/* rc is non-negative here; report plain success and leave the
+	 * header validation to the caller's linkea_init*() call. */
+	return 0;
+}
+
+/**
+ * Read the link EA of \a mdd_obj and initialize \a ldata from it with
+ * linkea_init().
+ *
+ * \retval 0 on success, otherwise the error from __mdd_links_read() or
+ *	   linkea_init()
+ */
+static int mdd_links_read(const struct lu_env *env,
+			  struct mdd_object *mdd_obj,
+			  struct linkea_data *ldata)
+{
+	int rc = __mdd_links_read(env, mdd_obj, ldata);
+
+	if (rc != 0)
+		return rc;
+
+	return linkea_init(ldata);
+}
+
+/**
+ * Read the link EA of \a mdd_obj and initialize \a ldata from it with
+ * linkea_init_with_rec().
+ *
+ * \retval 0 on success, otherwise the error from __mdd_links_read() or
+ *	   linkea_init_with_rec()
+ */
+static int mdd_links_read_with_rec(const struct lu_env *env,
+				   struct mdd_object *mdd_obj,
+				   struct linkea_data *ldata)
+{
+	int rc = __mdd_links_read(env, mdd_obj, ldata);
+
+	if (rc != 0)
+		return rc;
+
+	return linkea_init_with_rec(ldata);
+}
+
/**
* Get parent FID of the directory
*
GOTO(lookup, rc = 0);
ldata.ld_buf = buf;
- rc = mdd_links_read(env, obj, &ldata);
+ rc = mdd_links_read_with_rec(env, obj, &ldata);
if (rc != 0)
GOTO(lookup, rc);
struct linkea_data *ldata)
{
int rc = 0;
- int rc2 = 0;
ENTRY;
if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
- return 0;
+ RETURN(0);
LASSERT(oldpfid != NULL || newpfid != NULL);
- if (mdd_obj->mod_flags & DEAD_OBJ) {
- /* Prevent linkea to be updated which is NOT necessary. */
- ldata->ld_reclen = 0;
- /* No more links, don't bother */
+ if (mdd_obj->mod_flags & DEAD_OBJ)
+ /* Unnecessary to update linkEA for dead object. */
RETURN(0);
- }
if (oldpfid != NULL) {
rc = __mdd_links_del(env, mdd_obj, ldata, oldlname, oldpfid);
if (rc) {
- if ((check == 1) ||
- (rc != -ENODATA && rc != -ENOENT))
+ if ((check == 1) || (rc != -ENODATA && rc != -ENOENT))
RETURN(rc);
+
/* No changes done. */
rc = 0;
}
}
/* If renaming, add the new record */
- if (newpfid != NULL) {
- /* even if the add fails, we still delete the out-of-date
- * old link */
- rc2 = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
- first, check);
- }
-
- rc = rc != 0 ? rc : rc2;
+ if (newpfid != NULL)
+ rc = __mdd_links_add(env, mdd_obj, ldata, newlname, newpfid,
+ first, check);
RETURN(rc);
}
ldata = &mdd_env_info(env)->mti_link_data;
memset(ldata, 0, sizeof(*ldata));
rc = mdd_linkea_prepare(env, mdd_obj, oldpfid, oldlname,
- newpfid, newlname, first, check,
- ldata);
- if (rc != 0)
+ newpfid, newlname, first, check, ldata);
+ if (rc)
GOTO(out, rc);
}
- if (ldata->ld_reclen != 0)
+ if (!(mdd_obj->mod_flags & DEAD_OBJ))
rc = mdd_links_write(env, mdd_obj, ldata, handle);
- EXIT;
+
+ GOTO(out, rc);
+
out:
if (rc != 0) {
- int error = 1;
- if (rc == -EOVERFLOW || rc == -ENOSPC)
- error = 0;
if (newlname == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea add failed %d "DFID"\n",
+ CERROR("link_ea add failed %d "DFID"\n",
rc, PFID(mdd_object_fid(mdd_obj)));
else if (oldpfid == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea add '%.*s' failed %d "DFID"\n",
- newlname->ln_namelen, newlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea add '%.*s' failed %d "DFID"\n",
+ newlname->ln_namelen, newlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
else if (newpfid == NULL)
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea del '%.*s' failed %d "DFID"\n",
- oldlname->ln_namelen, oldlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea del '%.*s' failed %d "DFID"\n",
+ oldlname->ln_namelen, oldlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
else
- CDEBUG(error ? D_ERROR : D_OTHER,
- "link_ea rename '%.*s'->'%.*s' failed %d "
- DFID"\n",
- oldlname->ln_namelen, oldlname->ln_name,
- newlname->ln_namelen, newlname->ln_name,
- rc, PFID(mdd_object_fid(mdd_obj)));
+ CERROR("link_ea rename '%.*s'->'%.*s' failed %d "DFID
+ "\n", oldlname->ln_namelen, oldlname->ln_name,
+ newlname->ln_namelen, newlname->ln_name, rc,
+ PFID(mdd_object_fid(mdd_obj)));
}
if (is_vmalloc_addr(ldata->ld_buf))
}
/** Read the link EA into a temp buffer.
- * Uses the mdd_thread_info::mti_big_buf since it is generally large.
- * A pointer to the buffer is stored in \a ldata::ld_buf.
- *
- * \retval 0 or error
- */
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct linkea_data *ldata)
-{
- int rc;
-
- if (!mdd_object_exists(mdd_obj))
- return -ENODATA;
-
- /* First try a small buf */
- LASSERT(env != NULL);
- ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
- PAGE_SIZE);
- if (ldata->ld_buf->lb_buf == NULL)
- return -ENOMEM;
-
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
- if (rc == -ERANGE) {
- /* Buf was too small, figure out what we need. */
- lu_buf_free(ldata->ld_buf);
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
- XATTR_NAME_LINK);
- if (rc < 0)
- return rc;
- ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
- if (ldata->ld_buf->lb_buf == NULL)
- return -ENOMEM;
- rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
- XATTR_NAME_LINK);
- }
- if (rc < 0) {
- lu_buf_free(ldata->ld_buf);
- ldata->ld_buf = NULL;
- return rc;
- }
-
- return linkea_init(ldata);
-}
-
-/** Read the link EA into a temp buffer.
* Uses the name_buf since it is generally large.
* \retval IS_ERR err
* \retval ptr to \a lu_buf (always \a mti_big_buf)
ldata->ld_leh == NULL)
return 0;
- buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
- ldata->ld_leh->leh_len);
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
return 0;
+again:
+ buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+ ldata->ld_leh->leh_len);
rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
- if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
- mdd_object_remote(mdd_obj) == 0) {
- struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
- struct thandle *sub_th;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
- LFSCK_TYPE_NAMESPACE);
-
- sub_th = thandle_get_sub_by_dt(env, handle,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
- lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
- lr, sub_th);
+ if (unlikely(rc == -ENOSPC)) {
+ rc = linkea_overflow_shrink(ldata);
+ if (likely(rc > 0))
+ goto again;
}
return rc;
}
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct thandle *handle, struct linkea_data *ldata,
- enum mdd_links_add_overflow overflow)
+static int mdd_declare_links_add(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ struct thandle *handle,
+ struct linkea_data *ldata)
{
int rc;
int ea_len;
ea_len = ldata->ld_leh->leh_len;
linkea = ldata->ld_buf->lb_buf;
} else {
- ea_len = DEFAULT_LINKEA_SIZE;
+ ea_len = MAX_LINKEA_SIZE;
linkea = NULL;
}
- /* XXX: max size? */
rc = mdo_declare_xattr_set(env, mdd_obj,
mdd_buf_get_const(env, linkea, ea_len),
XATTR_NAME_LINK, 0, handle);
- if (rc != 0)
- return rc;
-
- if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
- struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
- struct thandle *sub_th;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
- LFSCK_TYPE_NAMESPACE);
-
- sub_th = thandle_get_sub_by_dt(env, handle,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
- rc = lfsck_in_notify(env,
- mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
- lr, sub_th);
- }
return rc;
}
/* For directory, the linkEA will be removed together
* with the object. */
if (!S_ISDIR(mdd_object_type(c)))
- rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, NULL);
return rc;
}
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, data,
- S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, data);
if (rc != 0)
return rc;
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, c, handle, ldata, MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, c, handle, ldata);
if (rc)
return rc;
if (rc)
return rc;
- rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata,
- S_ISREG(mdd_object_type(mdd_sobj)) ? MLAO_CHECK : MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, mdd_sobj, handle, ldata);
if (rc)
return rc;
GOTO(out_pending, rc = PTR_ERR(handle));
memset(ldata, 0, sizeof(*ldata));
- mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj), lsname,
- mdd_object_fid(mdd_tpobj), ltname, 1, 0, ldata);
+ rc = mdd_linkea_prepare(env, mdd_sobj, mdd_object_fid(mdd_spobj),
+ lsname, mdd_object_fid(mdd_tpobj), ltname,
+ 1, 0, ldata);
+ if (rc)
+ GOTO(stop, rc);
+
rc = mdd_declare_rename(env, mdd, mdd_spobj, mdd_tpobj, mdd_sobj,
mdd_tobj, lsname, ltname, ma, ldata, handle);
if (rc)
linkea_entry_pack(ldata.ld_lee, &lname,
mdd_object_fid(newparent));
if (declare)
- rc = mdd_declare_links_add(env, child, handle, &ldata,
- MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, child, handle, &ldata);
else
rc = mdd_links_write(env, child, &ldata, handle);
break;
ENTRY;
LASSERT(ldata->ld_buf != NULL);
+ LASSERT(ldata->ld_leh != NULL);
-again:
/* If it is mulitple links file, we need update the name entry for
* all parent */
- LASSERT(ldata->ld_leh != NULL);
ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
struct mdd_device *mdd = mdo2mdd(&mdd_sobj->mod_obj);
CWARN("%s: cannot find obj "DFID": rc = %ld\n",
mdd2obd_dev(mdd)->obd_name, PFID(&fid),
PTR_ERR(pobj));
- linkea_del_buf(ldata, &lname);
- goto again;
+ continue;
}
if (!mdd_object_exists(pobj)) {
CDEBUG(D_INFO, "%s: obj "DFID" does not exist\n",
mdd2obd_dev(mdd)->obd_name, PFID(&fid));
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
+ goto next_put;
}
if (pobj == mdd_pobj &&
CDEBUG(D_INFO, "%s: skip its own %s: "DFID"\n",
mdd2obd_dev(mdd)->obd_name, child_name->ln_name,
PFID(&fid));
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
+ goto next_put;
}
CDEBUG(D_INFO, "%s: update "DFID" with "DNAME":"DFID"\n",
/* lnamelen is too big(> NAME_MAX + 16),
* something wrong about this linkea, let's
* skip it */
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
+ CWARN("%s: the name %.*s is too long under "
+ DFID"\n", mdd2obd_dev(mdd)->obd_name,
+ lname.ln_namelen, lname.ln_name,
+ PFID(&fid));
+ goto next_put;
}
/* Note: lname might be without \0 at the end, see
* it might be packed into the RPC buffer. */
rc = mdd_lookup(env, &pobj->mod_obj, &lname,
&info->mti_fid, NULL);
- if (rc < 0 ||
- !lu_fid_eq(&info->mti_fid,
- mdd_object_fid(mdd_sobj))) {
- /* skip invalid linkea entry */
- linkea_del_buf(ldata, &lname);
- mdd_object_put(env, pobj);
- goto again;
- }
+ if (rc < 0 || !lu_fid_eq(&info->mti_fid,
+ mdd_object_fid(mdd_sobj)))
+ GOTO(next_put, rc == -ENOENT ? 0 : rc);
rc = __mdd_index_delete(env, pobj, tmp_name, 0, handle);
if (rc != 0)
xname = list_xbuf.lb_buf;
while (rem > 0) {
xlen = strnlen(xname, rem - 1) + 1;
- if (strcmp(XATTR_NAME_LINK, xname) == 0 ||
- strcmp(XATTR_NAME_LMA, xname) == 0 ||
+ if (strcmp(XATTR_NAME_LMA, xname) == 0 ||
strcmp(XATTR_NAME_LMV, xname) == 0)
goto next;
if (rc != 0)
GOTO(stop_trans, rc);
+again:
rc = mdo_xattr_set(env, mdd_tobj, &xbuf, xname, 0, handle);
if (rc == -EEXIST)
GOTO(stop_trans, rc = 0);
+ if (unlikely(rc == -ENOSPC &&
+ strcmp(xname, XATTR_NAME_LINK) == 0)) {
+ rc = linkea_overflow_shrink(
+ (struct linkea_data *)(xbuf.lb_buf));
+ if (likely(rc > 0)) {
+ xbuf.lb_len = rc;
+ goto again;
+ }
+ }
+
if (rc != 0)
GOTO(stop_trans, rc);
stop_trans:
if (rc != 0)
return rc;
} else if (S_ISDIR(la->la_mode) && ldata != NULL) {
- rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata,
- MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, mdd_tobj, handle, ldata);
if (rc != 0)
return rc;
}
spec->sp_cr_flags |= MDS_OPEN_HAS_EA;
}
} else if (S_ISDIR(la->la_mode)) {
- rc = mdd_links_read(env, mdd_sobj, ldata);
+ rc = mdd_links_read_with_rec(env, mdd_sobj, ldata);
if (rc == -ENODATA) {
/* ignore the non-linkEA error */
ldata = NULL;
if (rc != 0)
return rc;
- rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
+ rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL);
if (rc != 0)
return rc;
if (rc != 0)
GOTO(stop_trans, rc);
- linkea_add_buf(ldata, lname, mdd_object_fid(mdd_pobj));
- rc = mdd_links_add(env, mdd_tobj, mdo2fid(mdd_pobj), lname, handle,
- ldata, 1);
- if (rc != 0)
- GOTO(stop_trans, rc);
-
mdd_write_lock(env, mdd_sobj, MOR_TGT_CHILD);
mdd_sobj->mod_flags |= DEAD_OBJ;
/* If there are still links locally, then the file will not be
* migrated. */
LASSERT(ldata->ld_leh != NULL);
+
+ /* If the linkEA has overflowed, then there are name entries under
+ * parents that are not recorded in it; that prevents the migration. */
+ if (unlikely(ldata->ld_leh->leh_overflow_time))
+ RETURN(1);
+
ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
struct lu_name lname;
struct lu_seq_range mti_range;
};
-enum mdd_links_add_overflow {
- MLAO_IGNORE = false,
- MLAO_CHECK = true,
-};
-
extern const char orph_index_name[];
int mdd_la_get(const struct lu_env *env, struct mdd_object *obj,
int mdd_lookup(const struct lu_env *env,
struct md_object *pobj, const struct lu_name *lname,
struct lu_fid* fid, struct md_op_spec *spec);
-int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct linkea_data *ldata);
-int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
- struct thandle *handle, struct linkea_data *ldata,
- enum mdd_links_add_overflow overflow);
int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
struct linkea_data *ldata, struct thandle *handle);
struct lu_buf *mdd_links_get(const struct lu_env *env,
if (rc < 0)
return rc;
- return linkea_init(ldata);
+ return linkea_init_with_rec(ldata);
}
/**
return -ENOMEM;
ldata->ld_leh = ldata->ld_buf->lb_buf;
ldata->ld_leh->leh_magic = LINK_EA_MAGIC;
- ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
ldata->ld_leh->leh_reccount = 0;
+ ldata->ld_leh->leh_len = sizeof(struct link_ea_header);
+ ldata->ld_leh->leh_overflow_time = 0;
+ ldata->ld_leh->leh_padding = 0;
return 0;
}
EXPORT_SYMBOL(linkea_data_new);
leh->leh_magic = LINK_EA_MAGIC;
leh->leh_reccount = __swab32(leh->leh_reccount);
leh->leh_len = __swab64(leh->leh_len);
- /* entries are swabbed by linkea_entry_unpack */
+ leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+ leh->leh_padding = __swab32(leh->leh_padding);
+ /* individual entries are swabbed by linkea_entry_unpack() */
}
+
if (leh->leh_magic != LINK_EA_MAGIC)
return -EINVAL;
- if (leh->leh_reccount == 0)
+
+ if (leh->leh_reccount == 0 && leh->leh_overflow_time == 0)
return -ENODATA;
ldata->ld_leh = leh;
}
EXPORT_SYMBOL(linkea_init);
+/**
+ * Initialize \a ldata with linkea_init() and additionally require that the
+ * linkEA contains at least one record.
+ *
+ * \retval 0		linkea_init() succeeded and leh_reccount is non-zero
+ * \retval -ENODATA	linkea_init() succeeded but leh_reccount is zero
+ * \retval negative	error returned by linkea_init()
+ */
+int linkea_init_with_rec(struct linkea_data *ldata)
+{
+	int rc = linkea_init(ldata);
+
+	if (rc != 0)
+		return rc;
+
+	if (ldata->ld_leh->leh_reccount == 0)
+		return -ENODATA;
+
+	return 0;
+}
+EXPORT_SYMBOL(linkea_init_with_rec);
+
/**
* Pack a link_ea_entry.
* All elements are stored as chars to avoid alignment issues.
void linkea_entry_unpack(const struct link_ea_entry *lee, int *reclen,
struct lu_name *lname, struct lu_fid *pfid)
{
+ LASSERT(lee != NULL);
+
*reclen = (lee->lee_reclen[0] << 8) | lee->lee_reclen[1];
memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
fid_be_to_cpu(pfid, pfid);
int linkea_add_buf(struct linkea_data *ldata, const struct lu_name *lname,
const struct lu_fid *pfid)
{
- LASSERT(ldata->ld_leh != NULL);
+ struct link_ea_header *leh = ldata->ld_leh;
+ int reclen;
+
+ LASSERT(leh != NULL);
if (lname == NULL || pfid == NULL)
return -EINVAL;
- ldata->ld_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
- if (ldata->ld_leh->leh_len + ldata->ld_reclen >
- ldata->ld_buf->lb_len) {
+ reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
+ if (unlikely(leh->leh_len + reclen > MAX_LINKEA_SIZE)) {
+ /* Use 32-bits to save the overflow time, although it will
+ * shrink the cfs_time_current_sec() returned 64-bits value
+ * to 32-bits value, it is still quite large and can be used
+ * for about 140 years. That is enough. */
+ leh->leh_overflow_time = cfs_time_current_sec();
+ if (unlikely(leh->leh_overflow_time == 0))
+ leh->leh_overflow_time++;
+
+ CDEBUG(D_INODE, "No enough space to hold linkea entry '"
+ DFID": %.*s' at %u\n", PFID(pfid), lname->ln_namelen,
+ lname->ln_name, leh->leh_overflow_time);
+ return 0;
+ }
+
+ if (leh->leh_len + reclen > ldata->ld_buf->lb_len) {
if (lu_buf_check_and_grow(ldata->ld_buf,
- ldata->ld_leh->leh_len +
- ldata->ld_reclen) < 0)
+ leh->leh_len + reclen) < 0)
return -ENOMEM;
+
+ leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
}
- ldata->ld_leh = ldata->ld_buf->lb_buf;
- ldata->ld_lee = ldata->ld_buf->lb_buf + ldata->ld_leh->leh_len;
+ ldata->ld_lee = ldata->ld_buf->lb_buf + leh->leh_len;
ldata->ld_reclen = linkea_entry_pack(ldata->ld_lee, lname, pfid);
- ldata->ld_leh->leh_len += ldata->ld_reclen;
- ldata->ld_leh->leh_reccount++;
+ leh->leh_len += ldata->ld_reclen;
+ leh->leh_reccount++;
CDEBUG(D_INODE, "New link_ea name '"DFID":%.*s' is added\n",
PFID(pfid), lname->ln_namelen, lname->ln_name);
return 0;
void linkea_del_buf(struct linkea_data *ldata, const struct lu_name *lname)
{
LASSERT(ldata->ld_leh != NULL && ldata->ld_lee != NULL);
+ LASSERT(ldata->ld_leh->leh_reccount > 0);
ldata->ld_leh->leh_reccount--;
ldata->ld_leh->leh_len -= ldata->ld_reclen;
}
EXPORT_SYMBOL(linkea_del_buf);
+/**
+ * Set up a new linkEA in \a buf via linkea_data_new() and then add the
+ * entry (\a cname, \a pfid) to it via linkea_add_buf().
+ *
+ * \retval 0 on success, otherwise the error from linkea_data_new() or
+ *	   linkea_add_buf()
+ */
+int linkea_links_new(struct linkea_data *ldata, struct lu_buf *buf,
+		     const struct lu_name *cname, const struct lu_fid *pfid)
+{
+	int rc = linkea_data_new(ldata, buf);
+
+	if (rc != 0)
+		return rc;
+
+	return linkea_add_buf(ldata, cname, pfid);
+}
+EXPORT_SYMBOL(linkea_links_new);
+
+/**
+ * Remove the last linkEA entry and mark the linkEA as overflowed with the
+ * current timestamp.
+ *
+ * The header at the start of \a ldata::ld_buf is converted to host byte
+ * order if its magic is byte-swapped, leh_reccount is decremented and
+ * leh_len is recomputed by walking the remaining entries.
+ *
+ * \param[in,out] ldata	linkEA handle; ld_buf must hold a linkEA with at
+ *			least one entry (asserted). ld_leh is pointed at the
+ *			header. On a positive return ld_lee and ld_reclen
+ *			describe the entry that was dropped.
+ *
+ * \retval >0	the new linkEA size: header plus the remaining entries
+ * \retval 0	the only entry was dropped; leh_len is the bare header size
+ *		and leh_overflow_time is left unchanged
+ */
+int linkea_overflow_shrink(struct linkea_data *ldata)
+{
+	struct link_ea_header *leh;
+	struct lu_name tname;
+	struct lu_fid tfid;
+	int count;
+
+	leh = ldata->ld_leh = ldata->ld_buf->lb_buf;
+	if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
+		/* leh_len is not swabbed: it is recomputed below. */
+		leh->leh_magic = LINK_EA_MAGIC;
+		leh->leh_reccount = __swab32(leh->leh_reccount);
+		leh->leh_overflow_time = __swab32(leh->leh_overflow_time);
+		leh->leh_padding = __swab32(leh->leh_padding);
+	}
+
+	LASSERT(leh->leh_reccount > 0);
+
+	leh->leh_len = sizeof(struct link_ea_header);
+	leh->leh_reccount--;
+	if (unlikely(leh->leh_reccount == 0))
+		return 0;
+
+	/* A zero timestamp means "never overflowed", so avoid storing it. */
+	leh->leh_overflow_time = cfs_time_current_sec();
+	if (unlikely(leh->leh_overflow_time == 0))
+		leh->leh_overflow_time++;
+	ldata->ld_reclen = 0;
+	ldata->ld_lee = (struct link_ea_entry *)(leh + 1);
+	/* Sum up the sizes of the entries that are kept. */
+	for (count = 0; count < leh->leh_reccount; count++) {
+		linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
+				    &tname, &tfid);
+		leh->leh_len += ldata->ld_reclen;
+		ldata->ld_lee = (struct link_ea_entry *)((char *)ldata->ld_lee +
+							 ldata->ld_reclen);
+	}
+
+	/* ld_lee now points at the dropped entry; unpack it for the log. */
+	linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen, &tname, &tfid);
+	CDEBUG(D_INODE, "No enough space to hold the last linkea entry '"
+	       DFID": %.*s', shrink it, left %u linkea entries, size %llu\n",
+	       PFID(&tfid), tname.ln_namelen, tname.ln_name,
+	       leh->leh_reccount, leh->leh_len);
+
+	return (int)leh->leh_len;
+}
+EXPORT_SYMBOL(linkea_overflow_shrink);
+
/**
* Check if such a link exists in linkEA.
*
LASSERT(ldata->ld_leh != NULL);
- /* link #0 */
- ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
+ /* link #0, if leh_reccount == 0 we skip the loop and return -ENOENT */
+ if (likely(ldata->ld_leh->leh_reccount > 0))
+ ldata->ld_lee = (struct link_ea_entry *)(ldata->ld_leh + 1);
for (count = 0; count < ldata->ld_leh->leh_reccount; count++) {
linkea_entry_unpack(ldata->ld_lee, &ldata->ld_reclen,
if (strcmp(name, XATTR_NAME_LMV) == 0) {
struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
- int rc;
rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
if (rc != 0)
RETURN(rc);
}
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
- strcmp(name, XATTR_NAME_LINK) == 0)
- return -ENOSPC;
-
rc = __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
fs_flags);
osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
}
ldata.ld_buf = buf;
- rc = linkea_init(&ldata);
- if (rc == 0) {
+ rc = linkea_init_with_rec(&ldata);
+ if (!rc) {
linkea_first_entry(&ldata);
linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, NULL, fid);
}
}
ldata.ld_buf = buf;
- rc = linkea_init(&ldata);
- if (rc == 0)
+ rc = linkea_init_with_rec(&ldata);
+ if (!rc)
rc = linkea_links_find(&ldata, &cname, pfid);
RETURN(rc);
strcmp(name, XATTR_NAME_POSIX_ACL_DEFAULT) == 0))
RETURN(-EOPNOTSUPP);
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_OVERFLOW) &&
- strcmp(name, XATTR_NAME_LINK) == 0)
- RETURN(-ENOSPC);
-
oh = container_of0(handle, struct osd_thandle, ot_super);
down_write(&obj->oo_guard);
if (rc != 0)
RETURN(rc);
+ /* Do not cache linkEA that may be self-adjusted by peers
+ * under EA overflow case. */
+ if (strcmp(name, XATTR_NAME_LINK) == 0) {
+ oxe = osp_oac_xattr_find(o, name, true);
+ if (oxe != NULL)
+ osp_oac_xattr_put(oxe);
+
+ RETURN(0);
+ }
+
oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
if (oxe == NULL) {
CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
(long long)(int)offsetof(struct link_ea_header, leh_len));
LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
(long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
- LASSERTF((int)offsetof(struct link_ea_header, padding1) == 16, "found %lld\n",
- (long long)(int)offsetof(struct link_ea_header, padding1));
- LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding1) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct link_ea_header *)0)->padding1));
- LASSERTF((int)offsetof(struct link_ea_header, padding2) == 20, "found %lld\n",
- (long long)(int)offsetof(struct link_ea_header, padding2));
- LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding2) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct link_ea_header *)0)->padding2));
+ LASSERTF((int)offsetof(struct link_ea_header, leh_overflow_time) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct link_ea_header, leh_overflow_time));
+ LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_overflow_time) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct link_ea_header *)0)->leh_overflow_time));
+ LASSERTF((int)offsetof(struct link_ea_header, leh_padding) == 20, "found %lld\n",
+ (long long)(int)offsetof(struct link_ea_header, leh_padding));
+ LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_padding) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct link_ea_header *)0)->leh_padding));
CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
/* Checks for struct link_ea_entry */
#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+#include <lustre_linkea.h>
#include "tgt_internal.h"
{
struct dt_object *dt_obj = arg->object;
int rc;
+ ENTRY;
CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags);
- if (!lu_object_exists(&dt_obj->do_lu))
- GOTO(out, rc = -ENOENT);
-
- dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
- rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
- arg->u.xattr_set.name, arg->u.xattr_set.flags,
- th);
- /**
- * Ignore errors if this is LINK EA
- **/
- if (unlikely(rc != 0 &&
- strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- if (rc == -ENOSPC && arg->reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
- tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+ if (!lu_object_exists(&dt_obj->do_lu)) {
+ rc = -ENOENT;
+ } else {
+ struct linkea_data ldata = { 0 };
+ bool linkea;
+
+ ldata.ld_buf = &arg->u.xattr_set.buf;
+ if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) {
+ linkea = true;
+ rc = linkea_init(&ldata);
+ if (unlikely(rc))
+ GOTO(out, rc == -ENODATA ? -EINVAL : rc);
+ } else {
+ linkea = false;
}
- rc = 0;
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+
+again:
+ rc = dt_xattr_set(env, dt_obj, ldata.ld_buf,
+ arg->u.xattr_set.name, arg->u.xattr_set.flags,
+ th);
+ if (unlikely(rc == -ENOSPC && linkea)) {
+ rc = linkea_overflow_shrink(&ldata);
+ if (likely(rc > 0)) {
+ arg->u.xattr_set.buf.lb_len = rc;
+ goto again;
+ }
+ }
+ dt_write_unlock(env, dt_obj);
}
- dt_write_unlock(env, dt_obj);
+
+ GOTO(out, rc);
out:
CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
if (rc != 0)
return rc;
- if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
- rc = tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom,
- lr, ta->ta_handle);
- if (rc != 0)
- return rc;
- }
-
arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
if (IS_ERR(arg))
return PTR_ERR(arg);
echo "Inject failure stub on MDT0 to simulate the case that"
echo "foo's hard links exceed the object's linkEA limitation."
- #define OBD_FAIL_LFSCK_LINKEA_OVERFLOW 0x1627
- do_facet $SINGLEMDS $LCTL set_param fail_loc=0x1627
ln $DIR/$tdir/d0/foo $DIR/$tdir/d0/h2 ||
error "(4) Fail to hard link to $DIR/$tdir/d0/foo"
wait_all_targets_blocked namespace completed 8
- do_facet $SINGLEMDS $LCTL set_param fail_loc=0
local repaired=$($SHOW_NAMESPACE |
awk '/^nlinks_repaired/ { print $2 }')
[ $repaired -eq 0 ] ||
[ $count2 -eq 2 ] ||
error "(11) Repaired something unexpectedly: $count2"
}
-run_test 29c "Not verify nlink attr if hark links exceed linkEA limitation"
+# disable test_29c temporarily, it will be re-enabled in subsequent patch.
+#run_test 29c "Not verify nlink attr if hard links exceed linkEA limitation"
test_30() {
[ $(facet_fstype $SINGLEMDS) != ldiskfs ] &&
CHECK_MEMBER(link_ea_header, leh_magic);
CHECK_MEMBER(link_ea_header, leh_reccount);
CHECK_MEMBER(link_ea_header, leh_len);
- CHECK_MEMBER(link_ea_header, padding1);
- CHECK_MEMBER(link_ea_header, padding2);
+ CHECK_MEMBER(link_ea_header, leh_overflow_time);
+ CHECK_MEMBER(link_ea_header, leh_padding);
CHECK_CDEFINE(LINK_EA_MAGIC);
}
(long long)(int)offsetof(struct link_ea_header, leh_len));
LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_len) == 8, "found %lld\n",
(long long)(int)sizeof(((struct link_ea_header *)0)->leh_len));
- LASSERTF((int)offsetof(struct link_ea_header, padding1) == 16, "found %lld\n",
- (long long)(int)offsetof(struct link_ea_header, padding1));
- LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding1) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct link_ea_header *)0)->padding1));
- LASSERTF((int)offsetof(struct link_ea_header, padding2) == 20, "found %lld\n",
- (long long)(int)offsetof(struct link_ea_header, padding2));
- LASSERTF((int)sizeof(((struct link_ea_header *)0)->padding2) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct link_ea_header *)0)->padding2));
+ LASSERTF((int)offsetof(struct link_ea_header, leh_overflow_time) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct link_ea_header, leh_overflow_time));
+ LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_overflow_time) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct link_ea_header *)0)->leh_overflow_time));
+ LASSERTF((int)offsetof(struct link_ea_header, leh_padding) == 20, "found %lld\n",
+ (long long)(int)offsetof(struct link_ea_header, leh_padding));
+ LASSERTF((int)sizeof(((struct link_ea_header *)0)->leh_padding) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct link_ea_header *)0)->leh_padding));
CLASSERT(LINK_EA_MAGIC == 0x11EAF1DFUL);
/* Checks for struct link_ea_entry */