+ if (ldata->ld_buf && ldata->ld_buf->lb_len > OBD_ALLOC_BIG)
+ /* if we vmalloced a large buffer drop it */
+ lu_buf_free(ldata->ld_buf);
+
+ return rc;
+}
+
+static inline int mdd_links_add(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ const struct lu_fid *pfid,
+ const struct lu_name *lname,
+ struct thandle *handle,
+ struct linkea_data *ldata, int first)
+{
+ return mdd_links_rename(env, mdd_obj, NULL, NULL,
+ pfid, lname, handle, ldata, first, 0);
+}
+
+static inline int mdd_links_del(const struct lu_env *env,
+ struct mdd_object *mdd_obj,
+ const struct lu_fid *pfid,
+ const struct lu_name *lname,
+ struct thandle *handle)
+{
+ return mdd_links_rename(env, mdd_obj, pfid, lname,
+ NULL, NULL, handle, NULL, 0, 0);
+}
+
+/** Read the link EA into a temp buffer.
+ * Uses the mdd_thread_info::mti_big_buf since it is generally large.
+ * A pointer to the buffer is stored in \a ldata::ld_buf.
+ *
+ * \retval 0 or error
+ */
+int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
+ struct linkea_data *ldata)
+{
+ int rc;
+
+ if (!mdd_object_exists(mdd_obj))
+ return -ENODATA;
+
+ /* First try a small buf */
+ LASSERT(env != NULL);
+ ldata->ld_buf = lu_buf_check_and_alloc(&mdd_env_info(env)->mti_link_buf,
+ PAGE_CACHE_SIZE);
+ if (ldata->ld_buf->lb_buf == NULL)
+ return -ENOMEM;
+
+ rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf, XATTR_NAME_LINK);
+ if (rc == -ERANGE) {
+ /* Buf was too small, figure out what we need. */
+ lu_buf_free(ldata->ld_buf);
+ rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+ XATTR_NAME_LINK);
+ if (rc < 0)
+ return rc;
+ ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+ if (ldata->ld_buf->lb_buf == NULL)
+ return -ENOMEM;
+ rc = mdo_xattr_get(env, mdd_obj, ldata->ld_buf,
+ XATTR_NAME_LINK);
+ }
+ if (rc < 0) {
+ lu_buf_free(ldata->ld_buf);
+ ldata->ld_buf = NULL;
+ return rc;
+ }
+
+ return linkea_init(ldata);
+}
+
+/** Read the link EA into a temp buffer.
+ * Uses the name_buf since it is generally large.
+ * \retval IS_ERR err
+ * \retval ptr to \a lu_buf (always \a mti_big_buf)
+ */
+struct lu_buf *mdd_links_get(const struct lu_env *env,
+ struct mdd_object *mdd_obj)
+{
+ struct linkea_data ldata = { NULL };
+ int rc;
+
+ rc = mdd_links_read(env, mdd_obj, &ldata);
+ return rc ? ERR_PTR(rc) : ldata.ld_buf;
+}
+
+int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
+ struct linkea_data *ldata, struct thandle *handle)
+{
+ const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ld_buf->lb_buf,
+ ldata->ld_leh->leh_len);
+ int rc;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NO_LINKEA))
+ return 0;
+
+ rc = mdo_xattr_set(env, mdd_obj, buf, XATTR_NAME_LINK, 0, handle);
+ if (unlikely(rc == -ENOSPC) && S_ISREG(mdd_object_type(mdd_obj)) &&
+ mdd_object_remote(mdd_obj) == 0) {
+ struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+ struct thandle *sub_th;
+
+ /* XXX: If the linkEA is overflow, then we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object to avoid the "nlink" to be shrinked by
+ * wrong. It may be not good an interaction with LFSCK
+ * like this. We will consider to replace it with other
+ * mechanism in future. LU-5802. */
+ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK,
+ LFSCK_TYPE_NAMESPACE);
+
+ sub_th = thandle_get_sub_by_dt(env, handle,
+ mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
+ lfsck_in_notify(env, mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+ lr, sub_th);
+ }
+
+ return rc;
+}
+
+int mdd_declare_links_add(const struct lu_env *env, struct mdd_object *mdd_obj,
+ struct thandle *handle, struct linkea_data *ldata,
+ enum mdd_links_add_overflow overflow)
+{
+ int rc;
+ int ea_len;
+ void *linkea;
+
+ if (ldata != NULL && ldata->ld_leh != NULL) {
+ ea_len = ldata->ld_leh->leh_len;
+ linkea = ldata->ld_buf->lb_buf;
+ } else {
+ ea_len = DEFAULT_LINKEA_SIZE;
+ linkea = NULL;
+ }
+
+ /* XXX: max size? */
+ rc = mdo_declare_xattr_set(env, mdd_obj,
+ mdd_buf_get_const(env, linkea, ea_len),
+ XATTR_NAME_LINK, 0, handle);
+ if (rc != 0)
+ return rc;
+
+ if (mdd_object_remote(mdd_obj) == 0 && overflow == MLAO_CHECK) {
+ struct lfsck_request *lr = &mdd_env_info(env)->mti_lr;
+ struct thandle *sub_th;
+
+ /* XXX: If the linkEA is overflow, then we need to notify the
+ * namespace LFSCK to skip "nlink" attribute verification
+ * on this object to avoid the "nlink" to be shrinked by
+ * wrong. It may be not good an interaction with LFSCK
+ * like this. We will consider to replace it with other
+ * mechanism in future. LU-5802. */
+ lfsck_pack_rfa(lr, mdo2fid(mdd_obj), LE_SKIP_NLINK_DECLARE,
+ LFSCK_TYPE_NAMESPACE);
+
+ sub_th = thandle_get_sub_by_dt(env, handle,
+ mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom);
+ rc = lfsck_in_notify(env,
+ mdo2mdd(&mdd_obj->mod_obj)->mdd_bottom,
+ lr, sub_th);
+ }
+
+ return rc;
+}
+
+static inline int mdd_declare_links_del(const struct lu_env *env,
+ struct mdd_object *c,
+ struct thandle *handle)
+{
+ int rc = 0;
+
+ /* For directory, the linkEA will be removed together
+ * with the object. */
+ if (!S_ISDIR(mdd_object_type(c)))
+ rc = mdd_declare_links_add(env, c, handle, NULL, MLAO_IGNORE);
+
+ return rc;
+}
+
+static int mdd_declare_link(const struct lu_env *env,
+ struct mdd_device *mdd,
+ struct mdd_object *p,
+ struct mdd_object *c,
+ const struct lu_name *name,
+ struct thandle *handle,
+ struct lu_attr *la,
+ struct linkea_data *data)
+{
+ int rc;
+
+ rc = mdo_declare_index_insert(env, p, mdo2fid(c), mdd_object_type(c),
+ name->ln_name, handle);
+ if (rc != 0)
+ return rc;
+
+ rc = mdo_declare_ref_add(env, c, handle);
+ if (rc != 0)
+ return rc;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MORE_NLINK)) {
+ rc = mdo_declare_ref_add(env, c, handle);
+ if (rc != 0)
+ return rc;
+ }
+
+ la->la_valid = LA_CTIME | LA_MTIME;
+ rc = mdo_declare_attr_set(env, p, la, handle);
+ if (rc != 0)
+ return rc;
+
+ la->la_valid = LA_CTIME;
+ rc = mdo_declare_attr_set(env, c, la, handle);
+ if (rc != 0)
+ return rc;
+
+ rc = mdd_declare_links_add(env, c, handle, data,
+ S_ISREG(mdd_object_type(c)) ? MLAO_CHECK : MLAO_IGNORE);
+ if (rc != 0)
+ return rc;
+
+ rc = mdd_declare_changelog_store(env, mdd, name, NULL, handle);
+
+ return rc;
+}
+
+static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
+ struct md_object *src_obj, const struct lu_name *lname,
+ struct md_attr *ma)
+{
+ const char *name = lname->ln_name;
+ struct lu_attr *la = &mdd_env_info(env)->mti_la_for_fix;
+ struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
+ struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
+ struct lu_attr *cattr = MDD_ENV_VAR(env, cattr);
+ struct lu_attr *tattr = MDD_ENV_VAR(env, tattr);
+ struct mdd_device *mdd = mdo2mdd(src_obj);
+ struct thandle *handle;
+ struct linkea_data *ldata = &mdd_env_info(env)->mti_link_data;
+ int rc;
+ ENTRY;
+
+ rc = mdd_la_get(env, mdd_sobj, cattr);
+ if (rc != 0)
+ RETURN(rc);
+
+ rc = mdd_la_get(env, mdd_tobj, tattr);
+ if (rc != 0)
+ RETURN(rc);
+
+ handle = mdd_trans_create(env, mdd);
+ if (IS_ERR(handle))
+ GOTO(out_pending, rc = PTR_ERR(handle));
+
+ memset(ldata, 0, sizeof(*ldata));
+
+ LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+ la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+
+ rc = mdd_declare_link(env, mdd, mdd_tobj, mdd_sobj, lname, handle,
+ la, ldata);
+ if (rc)
+ GOTO(stop, rc);