* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
*/
/*
* lustre/target/out_lib.c
#define DEBUG_SUBSYSTEM S_CLASS
#include <lu_target.h>
-#include <md_object.h>
+#include <lustre_obdo.h>
#include <lustre_update.h>
+#include <md_object.h>
#include <obd.h>
#include <obd_class.h>
+#include <lustre_linkea.h>
+
#include "tgt_internal.h"
const char *update_op_str(__u16 opc)
size_t *max_update_size,
enum update_type update_op,
const struct lu_fid *fid,
- unsigned int param_count, __u16 *param_sizes)
+ unsigned int param_count,
+ __u16 *param_sizes,
+ __u32 reply_size)
{
struct object_update_param *param;
unsigned int i;
size_t update_size;
+ if (((reply_size + 7) >> 3) >= 1ULL << 16)
+ return -EINVAL;
+
/* Check whether the packing exceeding the maxima update length */
update_size = sizeof(*update);
for (i = 0; i < param_count; i++)
update->ou_fid = *fid;
update->ou_type = update_op;
update->ou_params_count = param_count;
+ update->ou_result_size = reply_size;
param = &update->ou_params[0];
for (i = 0; i < param_count; i++) {
param->oup_len = param_sizes[i];
int out_update_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int param_count,
- __u16 *param_sizes, const void **param_bufs)
+ __u16 *param_sizes, const void **param_bufs,
+ __u32 reply_size)
{
struct object_update_param *param;
unsigned int i;
ENTRY;
rc = out_update_header_pack(env, update, max_update_size, op, fid,
- param_count, param_sizes);
+ param_count, param_sizes, reply_size);
if (rc != 0)
RETURN(rc);
}
rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
- fid, buf_count, sizes);
+ fid, buf_count, sizes, 0);
if (rc != 0)
RETURN(rc);
obdo = object_update_param_get(update, 0, NULL);
- LASSERT(obdo != NULL);
+ if (IS_ERR(obdo))
+ RETURN(PTR_ERR(obdo));
+
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
if (parent_fid != NULL) {
struct lu_fid *tmp;
tmp = object_update_param_get(update, 1, NULL);
- LASSERT(tmp != NULL);
+ if (IS_ERR(tmp))
+ RETURN(PTR_ERR(tmp));
+
fid_cpu_to_le(tmp, parent_fid);
}
size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
- 0, NULL, NULL);
+ 0, NULL, NULL, 0);
}
EXPORT_SYMBOL(out_ref_del_pack);
size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
- 0, NULL, NULL);
+ 0, NULL, NULL, 0);
}
EXPORT_SYMBOL(out_ref_add_pack);
ENTRY;
rc = out_update_header_pack(env, update, max_update_size,
- OUT_ATTR_SET, fid, 1, &size);
+ OUT_ATTR_SET, fid, 1, &size, 0);
if (rc != 0)
RETURN(rc);
obdo = object_update_param_get(update, 0, NULL);
- LASSERT(obdo != NULL);
+ if (IS_ERR(obdo))
+ RETURN(PTR_ERR(obdo));
+
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
RETURN(0);
}
(char *)&flag};
return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
- fid, ARRAY_SIZE(sizes), sizes, bufs);
+ fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
}
EXPORT_SYMBOL(out_xattr_set_pack);
__u16 size = strlen(name) + 1;
return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
- fid, 1, &size, (const void **)&name);
+ fid, 1, &size, (const void **)&name, 0);
}
EXPORT_SYMBOL(out_xattr_del_pack);
fid_cpu_to_le(&rec_fid, rec1->rec_fid);
return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
- fid, ARRAY_SIZE(sizes), sizes, bufs);
+ fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
}
EXPORT_SYMBOL(out_index_insert_pack);
const void *buf = key;
return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
- fid, 1, &size, &buf);
+ fid, 1, &size, &buf, 0);
}
EXPORT_SYMBOL(out_index_delete_pack);
-int out_object_destroy_pack(const struct lu_env *env,
- struct object_update *update,
- size_t *max_update_size, const struct lu_fid *fid)
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
- 0, NULL, NULL);
+ 0, NULL, NULL, 0);
}
-EXPORT_SYMBOL(out_object_destroy_pack);
+EXPORT_SYMBOL(out_destroy_pack);
int out_write_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid,
pos = cpu_to_le64(pos);
rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
- ARRAY_SIZE(sizes), sizes, bufs);
+ ARRAY_SIZE(sizes), sizes, bufs, 0);
return rc;
}
EXPORT_SYMBOL(out_write_pack);
const void *name = key;
__u16 size = strlen((char *)name) + 1;
+ /* XXX: this shouldn't be hardcoded */
return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
- fid, 1, &size, &name);
+ fid, 1, &size, &name, 256);
}
EXPORT_SYMBOL(out_index_lookup_pack);
size_t *max_update_size, const struct lu_fid *fid)
{
return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
- fid, 0, NULL, NULL);
+ fid, 0, NULL, NULL, sizeof(struct obdo));
}
EXPORT_SYMBOL(out_attr_get_pack);
int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
size_t *max_update_size, const struct lu_fid *fid,
- const char *name)
+ const char *name, const int bufsize)
{
__u16 size;
size = strlen(name) + 1;
return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
- fid, 1, &size, (const void **)&name);
+ fid, 1, &size, (const void **)&name, bufsize);
}
EXPORT_SYMBOL(out_xattr_get_pack);
__u16 sizes[2] = {sizeof(size), sizeof(pos)};
const void *bufs[2] = {&size, &pos};
+ LASSERT(size > 0);
size = cpu_to_le64(size);
pos = cpu_to_le64(pos);
return out_update_pack(env, update, max_update_size, OUT_READ, fid,
- ARRAY_SIZE(sizes), sizes, bufs);
+ ARRAY_SIZE(sizes), sizes, bufs, size);
}
EXPORT_SYMBOL(out_read_pack);
struct dt_object *dt_obj = arg->object;
int rc;
- CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n",
+ CDEBUG(D_INFO, "write "DFID" pos %llu buf %p, len %lu\n",
PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos,
arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len);
{
struct dt_object *dt_obj = arg->object;
int rc;
+ ENTRY;
CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n",
dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf,
arg->u.xattr_set.name, arg->u.xattr_set.flags);
- if (!lu_object_exists(&dt_obj->do_lu))
- GOTO(out, rc = -ENOENT);
+ if (!lu_object_exists(&dt_obj->do_lu)) {
+ rc = -ENOENT;
+ } else {
+ struct linkea_data ldata = { 0 };
+ bool linkea;
+
+ ldata.ld_buf = &arg->u.xattr_set.buf;
+ if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) {
+ struct link_ea_header *leh;
+
+ linkea = true;
+ rc = linkea_init(&ldata);
+ if (unlikely(rc))
+ GOTO(out, rc == -ENODATA ? -EINVAL : rc);
+
+ leh = ldata.ld_leh;
+ LASSERT(leh != NULL);
+
+ /* If the new linkEA contains overflow timestamp,
+ * then two cases:
+ *
+ * 1. The old linkEA for the object has already
+ * overflowed before current setting, the new
+ * linkEA does not contains new link entry. So
+ * the linkEA overflow timestamp is unchanged.
+ *
+ * 2. There are new link entry in the new linkEA,
+ * so its overflow timestamp is differnt from
+ * the old one. Usually, the overstamp in the
+ * given linkEA is newer. But because of clock
+ * drift among MDTs, the timestamp may become
+ * older. So here, we convert the timestamp to
+ * the server local time. Then namespace LFSCK
+ * that uses local time can handle it easily. */
+ if (unlikely(leh->leh_overflow_time)) {
+ struct lu_buf tbuf = { 0 };
+ bool update = false;
+
+ lu_buf_alloc(&tbuf, MAX_LINKEA_SIZE);
+ if (tbuf.lb_buf == NULL)
+ GOTO(unlock, rc = -ENOMEM);
+
+ rc = dt_xattr_get(env, dt_obj, &tbuf,
+ XATTR_NAME_LINK);
+ if (rc > 0) {
+ struct linkea_data tdata = { 0 };
+
+ tdata.ld_buf = &tbuf;
+ rc = linkea_init(&tdata);
+ if (rc || leh->leh_overflow_time !=
+ tdata.ld_leh->leh_overflow_time)
+ update = true;
+ } else {
+ /* Update the timestamp by force if
+ * fail to load the old linkEA. */
+ update = true;
+ }
+
+ lu_buf_free(&tbuf);
+ if (update) {
+ leh->leh_overflow_time = ktime_get_real_seconds();
+ if (unlikely(!leh->leh_overflow_time))
+ leh->leh_overflow_time++;
+ }
+ }
+ } else {
+ linkea = false;
+ }
- dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
- rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf,
- arg->u.xattr_set.name, arg->u.xattr_set.flags,
- th);
- /**
- * Ignore errors if this is LINK EA
- **/
- if (unlikely(rc != 0 &&
- strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) {
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- if (rc == -ENOSPC && arg->reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE);
- tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th);
+ dt_write_lock(env, dt_obj, MOR_TGT_CHILD);
+
+again:
+ rc = dt_xattr_set(env, dt_obj, ldata.ld_buf,
+ arg->u.xattr_set.name, arg->u.xattr_set.flags,
+ th);
+ if (unlikely(rc == -ENOSPC && linkea)) {
+ rc = linkea_overflow_shrink(&ldata);
+ if (likely(rc > 0)) {
+ arg->u.xattr_set.buf.lb_len = rc;
+ goto again;
+ }
}
- rc = 0;
+unlock:
+ dt_write_unlock(env, dt_obj);
}
- dt_write_unlock(env, dt_obj);
+
+ GOTO(out, rc);
out:
CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n",
if (rc != 0)
return rc;
- if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) {
- struct lfsck_request *lr = &tgt_th_info(env)->tti_lr;
-
- /* XXX: If the linkEA is overflow, then we need to notify the
- * namespace LFSCK to skip "nlink" attribute verification
- * on this object to avoid the "nlink" to be shrinked by
- * wrong. It may be not good an interaction with LFSCK
- * like this. We will consider to replace it with other
- * mechanism in future. LU-5802. */
- lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu),
- LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE);
- rc = tgt_lfsck_in_notify(env,
- tgt_ses_info(env)->tsi_tgt->lut_bottom,
- lr, ta->ta_handle);
- if (rc != 0)
- return rc;
- }
-
arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line);
if (IS_ERR(arg))
return PTR_ERR(arg);