X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Ftarget%2Fout_lib.c;h=c486f3f00e5312a44e0fe8d26b0683475bb813c6;hb=c4ff984dc55b29d810ee996a0d30ebd817c09df4;hp=0c2c95b73007d3c1ba0877537c951f0f44ddaaa8;hpb=4f53536d002c13886210b672b657795baa067144;p=fs%2Flustre-release.git diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 0c2c95b..c486f3f 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, Intel Corporation. + * Copyright (c) 2014, 2015, Intel Corporation. */ /* * lustre/target/out_lib.c @@ -32,14 +32,14 @@ #define DEBUG_SUBSYSTEM S_CLASS #include -#include +#include #include +#include #include #include -#include "tgt_internal.h" +#include -#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 -#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ +#include "tgt_internal.h" const char *update_op_str(__u16 opc) { @@ -60,6 +60,7 @@ const char *update_op_str(__u16 opc) [OUT_XATTR_DEL] = "xattr_del", [OUT_PUNCH] = "punch", [OUT_READ] = "read", + [OUT_NOOP] = "noop", }; if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL) @@ -77,40 +78,50 @@ EXPORT_SYMBOL(update_op_str); * * \params[in] env execution environment * \params[in] update object update to be filled - * \params[in] max_update_size maximum object update size, if the current - * update length equals or exceeds the size, it - * will return -E2BIG. + * \params[in,out] max_update_size maximum object update size, if the + * current update length equals or + * exceeds the size, it will return -E2BIG. * \params[in] update_op update type * \params[in] fid object FID of the update - * \params[in] params_count the count of the update parameters - * \params[in] params_sizes the length of each parameters + * \params[in] param_count the count of the update parameters + * \params[in] param_sizes the length of each parameters * * \retval 0 if packing succeeds. * \retval -E2BIG if packing exceeds the maximum length. */ int out_update_header_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, - enum update_type update_op, const struct lu_fid *fid, - unsigned int param_count, __u16 *params_sizes) + struct object_update *update, + size_t *max_update_size, + enum update_type update_op, + const struct lu_fid *fid, + unsigned int param_count, + __u16 *param_sizes, + __u32 reply_size) { struct object_update_param *param; unsigned int i; size_t update_size; + if (((reply_size + 7) >> 3) >= 1ULL << 16) + return -EINVAL; + /* Check whether the packing exceeding the maxima update length */ update_size = sizeof(*update); for (i = 0; i < param_count; i++) - update_size += cfs_size_round(sizeof(*param) + params_sizes[i]); + update_size += cfs_size_round(sizeof(*param) + param_sizes[i]); - if (unlikely(update_size >= max_update_size)) + if (unlikely(update_size >= *max_update_size)) { + *max_update_size = update_size; return -E2BIG; + } update->ou_fid = *fid; update->ou_type = update_op; update->ou_params_count = param_count; + update->ou_result_size = reply_size; param = &update->ou_params[0]; for (i = 0; i < param_count; i++) { - param->oup_len = params_sizes[i]; + param->oup_len = param_sizes[i]; param = (struct object_update_param *)((char *)param + object_update_param_size(param)); } @@ -134,9 +145,10 @@ int out_update_header_pack(const struct lu_env *env, * \retval negative errno if updates packing fails **/ int out_update_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, enum update_type op, + size_t *max_update_size, enum update_type op, const struct lu_fid *fid, unsigned int param_count, - __u16 *param_sizes, const void **param_bufs) + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size) { struct object_update_param *param; unsigned int i; @@ -144,7 +156,7 @@ int out_update_pack(const struct lu_env *env, struct object_update *update, ENTRY; rc = out_update_header_pack(env, update, max_update_size, op, fid, - param_count, param_sizes); + param_count, param_sizes, reply_size); if (rc != 0) RETURN(rc); @@ -175,7 +187,7 @@ EXPORT_SYMBOL(out_update_pack); * \retval negative errno if insertion fails. */ int out_create_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof) { @@ -193,21 +205,24 @@ int out_create_pack(const struct lu_env *env, struct object_update *update, } rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE, - fid, buf_count, sizes); + fid, buf_count, sizes, 0); if (rc != 0) RETURN(rc); obdo = object_update_param_get(update, 0, NULL); - LASSERT(obdo != NULL); + if (IS_ERR(obdo)) + RETURN(PTR_ERR(obdo)); + obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, obdo, obdo); if (parent_fid != NULL) { struct lu_fid *tmp; tmp = object_update_param_get(update, 1, NULL); - LASSERT(tmp != NULL); + if (IS_ERR(tmp)) + RETURN(PTR_ERR(tmp)); + fid_cpu_to_le(tmp, parent_fid); } @@ -216,23 +231,23 @@ int out_create_pack(const struct lu_env *env, struct object_update *update, EXPORT_SYMBOL(out_create_pack); int out_ref_del_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid) + size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } EXPORT_SYMBOL(out_ref_del_pack); int out_ref_add_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid) + size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } EXPORT_SYMBOL(out_ref_add_pack); int out_attr_set_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct lu_attr *attr) { struct obdo *obdo; @@ -241,22 +256,23 @@ int out_attr_set_pack(const struct lu_env *env, struct object_update *update, ENTRY; rc = out_update_header_pack(env, update, max_update_size, - OUT_ATTR_SET, fid, 1, &size); + OUT_ATTR_SET, fid, 1, &size, 0); if (rc != 0) RETURN(rc); obdo = object_update_param_get(update, 0, NULL); - LASSERT(obdo != NULL); + if (IS_ERR(obdo)) + RETURN(PTR_ERR(obdo)); + obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, obdo, obdo); RETURN(0); } EXPORT_SYMBOL(out_attr_set_pack); int out_xattr_set_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, const char *name, __u32 flag) { __u16 sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; @@ -264,25 +280,24 @@ int out_xattr_set_pack(const struct lu_env *env, struct object_update *update, (char *)&flag}; return out_update_pack(env, update, max_update_size, OUT_XATTR_SET, - fid, ARRAY_SIZE(sizes), sizes, bufs); + fid, ARRAY_SIZE(sizes), sizes, bufs, 0); } EXPORT_SYMBOL(out_xattr_set_pack); int out_xattr_del_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const char *name) { __u16 size = strlen(name) + 1; return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL, - fid, 1, &size, (const void **)&name); + fid, 1, &size, (const void **)&name, 0); } EXPORT_SYMBOL(out_xattr_del_pack); - int out_index_insert_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct dt_rec *rec, const struct dt_key *key) { struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; @@ -298,34 +313,33 @@ int out_index_insert_pack(const struct lu_env *env, fid_cpu_to_le(&rec_fid, rec1->rec_fid); return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT, - fid, ARRAY_SIZE(sizes), sizes, bufs); + fid, ARRAY_SIZE(sizes), sizes, bufs, 0); } EXPORT_SYMBOL(out_index_insert_pack); int out_index_delete_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct dt_key *key) { __u16 size = strlen((char *)key) + 1; const void *buf = key; return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE, - fid, 1, &size, &buf); + fid, 1, &size, &buf, 0); } EXPORT_SYMBOL(out_index_delete_pack); -int out_object_destroy_pack(const struct lu_env *env, - struct object_update *update, - size_t max_update_size, const struct lu_fid *fid) +int out_destroy_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } -EXPORT_SYMBOL(out_object_destroy_pack); +EXPORT_SYMBOL(out_destroy_pack); int out_write_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, __u64 pos) { __u16 sizes[2] = {buf->lb_len, sizeof(pos)}; @@ -335,7 +349,7 @@ int out_write_pack(const struct lu_env *env, struct object_update *update, pos = cpu_to_le64(pos); rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + ARRAY_SIZE(sizes), sizes, bufs, 0); return rc; } EXPORT_SYMBOL(out_write_pack); @@ -356,28 +370,29 @@ EXPORT_SYMBOL(out_write_pack); **/ int out_index_lookup_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, struct dt_rec *rec, const struct dt_key *key) { const void *name = key; __u16 size = strlen((char *)name) + 1; + /* XXX: this shouldn't be hardcoded */ return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP, - fid, 1, &size, &name); + fid, 1, &size, &name, 256); } EXPORT_SYMBOL(out_index_lookup_pack); int out_attr_get_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid) + size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_ATTR_GET, - fid, 0, NULL, NULL); + fid, 0, NULL, NULL, sizeof(struct obdo)); } EXPORT_SYMBOL(out_attr_get_pack); int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, - const char *name) + size_t *max_update_size, const struct lu_fid *fid, + const char *name, const int bufsize) { __u16 size; @@ -385,22 +400,23 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, size = strlen(name) + 1; return out_update_pack(env, update, max_update_size, OUT_XATTR_GET, - fid, 1, &size, (const void **)&name); + fid, 1, &size, (const void **)&name, bufsize); } EXPORT_SYMBOL(out_xattr_get_pack); int out_read_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_length, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, size_t size, loff_t pos) { __u16 sizes[2] = {sizeof(size), sizeof(pos)}; const void *bufs[2] = {&size, &pos}; + LASSERT(size > 0); size = cpu_to_le64(size); pos = cpu_to_le64(pos); - return out_update_pack(env, update, max_update_length, OUT_READ, fid, - ARRAY_SIZE(sizes), sizes, bufs); + return out_update_pack(env, update, max_update_size, OUT_READ, fid, + ARRAY_SIZE(sizes), sizes, bufs, size); } EXPORT_SYMBOL(out_read_pack); @@ -660,17 +676,21 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th, struct dt_object *dt_obj = arg->object; int rc; - CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n", + CDEBUG(D_INFO, "write "DFID" pos %llu buf %p, len %lu\n", PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos, arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len); - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_record_write(env, dt_obj, &arg->u.write.buf, - &arg->u.write.pos, th); - dt_write_unlock(env, dt_obj); + if (OBD_FAIL_CHECK(OBD_FAIL_OUT_ENOSPC)) { + rc = -ENOSPC; + } else { + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_record_write(env, dt_obj, &arg->u.write.buf, + &arg->u.write.pos, th); + dt_write_unlock(env, dt_obj); - if (rc == 0) - rc = arg->u.write.buf.lb_len; + if (rc == 0) + rc = arg->u.write.buf.lb_len; + } if (arg->reply != NULL) object_update_result_insert(arg->reply, NULL, 0, arg->index, @@ -711,41 +731,100 @@ static int out_tx_xattr_set_exec(const struct lu_env *env, { struct dt_object *dt_obj = arg->object; int rc; + ENTRY; CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n", dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name, arg->u.xattr_set.flags); - if (!lu_object_exists(&dt_obj->do_lu)) - GOTO(out, rc = -ENOENT); + if (!lu_object_exists(&dt_obj->do_lu)) { + rc = -ENOENT; + } else { + struct linkea_data ldata = { 0 }; + bool linkea; + + ldata.ld_buf = &arg->u.xattr_set.buf; + if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) { + struct link_ea_header *leh; + + linkea = true; + rc = linkea_init(&ldata); + if (unlikely(rc)) + GOTO(out, rc == -ENODATA ? -EINVAL : rc); + + leh = ldata.ld_leh; + LASSERT(leh != NULL); + + /* If the new linkEA contains overflow timestamp, + * then two cases: + * + * 1. The old linkEA for the object has already + * overflowed before current setting, the new + * linkEA does not contains new link entry. So + * the linkEA overflow timestamp is unchanged. + * + * 2. There are new link entry in the new linkEA, + * so its overflow timestamp is differnt from + * the old one. Usually, the overstamp in the + * given linkEA is newer. But because of clock + * drift among MDTs, the timestamp may become + * older. So here, we convert the timestamp to + * the server local time. Then namespace LFSCK + * that uses local time can handle it easily. */ + if (unlikely(leh->leh_overflow_time)) { + struct lu_buf tbuf = { 0 }; + bool update = false; + + lu_buf_alloc(&tbuf, MAX_LINKEA_SIZE); + if (tbuf.lb_buf == NULL) + GOTO(unlock, rc = -ENOMEM); + + rc = dt_xattr_get(env, dt_obj, &tbuf, + XATTR_NAME_LINK); + if (rc > 0) { + struct linkea_data tdata = { 0 }; + + tdata.ld_buf = &tbuf; + rc = linkea_init(&tdata); + if (rc || leh->leh_overflow_time != + tdata.ld_leh->leh_overflow_time) + update = true; + } else { + /* Update the timestamp by force if + * fail to load the old linkEA. */ + update = true; + } + + lu_buf_free(&tbuf); + if (update) { + leh->leh_overflow_time = ktime_get_real_seconds(); + if (unlikely(!leh->leh_overflow_time)) + leh->leh_overflow_time++; + } + } + } else { + linkea = false; + } - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf, - arg->u.xattr_set.name, arg->u.xattr_set.flags, - th); - /** - * Ignore errors if this is LINK EA - **/ - if (unlikely(rc != 0 && - strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) { - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - if (rc == -ENOSPC && arg->reply != NULL) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); - tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th); + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + +again: + rc = dt_xattr_set(env, dt_obj, ldata.ld_buf, + arg->u.xattr_set.name, arg->u.xattr_set.flags, + th); + if (unlikely(rc == -ENOSPC && linkea)) { + rc = linkea_overflow_shrink(&ldata); + if (likely(rc > 0)) { + arg->u.xattr_set.buf.lb_len = rc; + goto again; + } } - rc = 0; +unlock: + dt_write_unlock(env, dt_obj); } - dt_write_unlock(env, dt_obj); + + GOTO(out, rc); out: CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n", @@ -772,24 +851,6 @@ int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, if (rc != 0) return rc; - if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); - rc = tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, - lr, ta->ta_handle); - if (rc != 0) - return rc; - } - arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line); if (IS_ERR(arg)) return PTR_ERR(arg);