#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+#include <lustre_linkea.h>
#include "tgt_internal.h"
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
if (parent_fid != NULL) {
struct lu_fid *tmp;
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
RETURN(0);
}
}
EXPORT_SYMBOL(out_index_delete_pack);
-int out_object_destroy_pack(const struct lu_env *env,
- struct object_update *update,
- size_t *max_update_size, const struct lu_fid *fid)
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
0, NULL, NULL, 0);
}
-EXPORT_SYMBOL(out_object_destroy_pack);
+EXPORT_SYMBOL(out_destroy_pack);
int out_write_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid,
struct dt_object *dt_obj = arg->object;
int rc;
- CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
+ CDEBUG(D_INFO, "write "DFID" pos %llu buf %p, len %lu\n",
PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
{
struct dt_object *dt_obj = arg->object;
int rc;
+ ENTRY;
CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags);
- if (!lu_object_exists(&dt_obj->do_lu))
- GOTO(out, rc = -ENOENT);
+ if (!lu_object_exists(&dt_obj->do_lu)) {
+ rc = -ENOENT;
+ } else {
+ struct linkea_data ldata = { 0 };
+ bool linkea;
+
+ ldata.ld_buf = &arg->u.xattr_set.buf;
+ if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) {
+ struct link_ea_header *leh;
+
+ linkea = true;
+ rc = linkea_init(&ldata);
+ if (unlikely(rc))
+ GOTO(out, rc == -ENODATA ? -EINVAL : rc);
+
+ leh = ldata.ld_leh;
+ LASSERT(leh != NULL);
+
+ /* If the new linkEA contains overflow timestamp,
+ * then two cases:
+ *
+ * 1. The old linkEA for the object has already
+ * overflowed before current setting, the new
+ * linkEA does not contains new link entry. So
+ * the linkEA overflow timestamp is unchanged.
+ *
+ * 2. There are new link entry in the new linkEA,
+ * so its overflow timestamp is differnt from
+ * the old one. Usually, the overstamp in the
+ * given linkEA is newer. But because of clock
+ * drift among MDTs, the timestamp may become
+ * older. So here, we convert the timestamp to
+ * the server local time. Then namespace LFSCK
+ * that uses local time can handle it easily. */
+ if (unlikely(leh->leh_overflow_time)) {
+ struct lu_buf tbuf = { 0 };
+ bool update = false;
+
+ lu_buf_alloc(&tbuf, MAX_LINKEA_SIZE);
+ if (tbuf.lb_buf == NULL)
+ GOTO(unlock, rc = -ENOMEM);
+
+ rc = dt_xattr_get(env, dt_obj, &tbuf,
+ XATTR_NAME_LINK);
+ if (rc > 0) {
+ struct linkea_data tdata = { 0 };
+
+ tdata.ld_buf = &tbuf;
+ rc = linkea_init(&tdata);
+ if (rc || leh->leh_overflow_time !=
+ tdata.ld_leh->leh_overflow_time)
+ update = true;
+ } else {
+ /* Update the timestamp by force if
+ * fail to load the old linkEA. */
+ update = true;
+ }
+
+ lu_buf_free(&tbuf);
+ if (update) {
+ leh->leh_overflow_time =
+ cfs_time_current_sec();
+ if (unlikely(!leh->leh_overflow_time))
+ leh->leh_overflow_time++;
+ }
+ }
+ } else {
+ linkea = false;
+ }
- dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
- rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
- arg->u.xattr_set.name, arg->u.xattr_set.flags,
- th);
- /**
- * Ignore errors if this is LINK EA
- **/
- if (unlikely(rc != 0 &&
- strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- if (rc == -ENOSPC && arg->reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
- tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+
+again:
+ rc = dt_xattr_set(env, dt_obj, ldata.ld_buf,
+ arg->u.xattr_set.name, arg->u.xattr_set.flags,
+ th);
+ if (unlikely(rc == -ENOSPC && linkea)) {
+ rc = linkea_overflow_shrink(&ldata);
+ if (likely(rc > 0)) {
+ arg->u.xattr_set.buf.lb_len = rc;
+ goto again;
+ }
}
- rc = 0;
+unlock:
+ dt_write_unlock(env, dt_obj);
}
- dt_write_unlock(env, dt_obj);
+
+ GOTO(out, rc);
out:
CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
if (rc != 0)
return rc;
- if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
- rc = tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom,
- lr, ta->ta_handle);
- if (rc != 0)
- return rc;
- }
-
arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
if (IS_ERR(arg))
return PTR_ERR(arg);