X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ftarget%2Fout_lib.c;h=cf0cb2f281210b1d7eaba84d5405523a66f1f3d4;hp=cdddbbca30c562d022f64035bc325dd44d971fa1;hb=f762acebfcc6a88c3f4ba6296cbd6f1696bff530;hpb=d059b3c01a9784b8da61ea99fb746e949d681b7e diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index cdddbbc..cf0cb2f 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, 2015, Intel Corporation. + * Copyright (c) 2014, 2017, Intel Corporation. */ /* * lustre/target/out_lib.c @@ -32,10 +32,13 @@ #define DEBUG_SUBSYSTEM S_CLASS #include -#include +#include #include +#include #include #include +#include + #include "tgt_internal.h" const char *update_op_str(__u16 opc) @@ -50,6 +53,7 @@ const char *update_op_str(__u16 opc) [OUT_ATTR_GET] = "attr_get", [OUT_XATTR_SET] = "xattr_set", [OUT_XATTR_GET] = "xattr_get", + [OUT_XATTR_LIST] = "xattr_list", [OUT_INDEX_LOOKUP] = "lookup", [OUT_INDEX_INSERT] = "insert", [OUT_INDEX_DELETE] = "delete", @@ -99,7 +103,7 @@ int out_update_header_pack(const struct lu_env *env, unsigned int i; size_t update_size; - if (((reply_size + 7) >> 3) >= 1ULL << 16) + if (reply_size >= LNET_MTU) return -EINVAL; /* Check whether the packing exceeding the maxima update length */ @@ -212,7 +216,6 @@ int out_create_pack(const struct lu_env *env, struct object_update *update, obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, obdo, obdo); if (parent_fid != NULL) { struct lu_fid *tmp; @@ -264,7 +267,6 @@ int out_attr_set_pack(const struct lu_env *env, struct object_update *update, obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, obdo, obdo); RETURN(0); } @@ -329,14 +331,13 @@ int out_index_delete_pack(const struct lu_env *env, } EXPORT_SYMBOL(out_index_delete_pack); -int out_object_destroy_pack(const struct lu_env *env, - struct object_update *update, - size_t *max_update_size, const struct lu_fid *fid) +int out_destroy_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid, 0, NULL, NULL, 0); } -EXPORT_SYMBOL(out_object_destroy_pack); +EXPORT_SYMBOL(out_destroy_pack); int out_write_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, @@ -404,6 +405,15 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, } EXPORT_SYMBOL(out_xattr_get_pack); +int out_xattr_list_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const int bufsize) +{ + return out_update_pack(env, update, max_update_size, OUT_XATTR_LIST, + fid, 0, NULL, NULL, bufsize); +} +EXPORT_SYMBOL(out_xattr_list_pack); + int out_read_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, size_t size, loff_t pos) @@ -429,7 +439,7 @@ static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta) if (ta->ta_alloc_args >= new_alloc_ta) return 0; - OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta); + OBD_ALLOC_PTR_ARRAY(new_ta, new_alloc_ta); if (new_ta == NULL) return -ENOMEM; @@ -446,8 +456,7 @@ static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta) /* free the old args */ if (ta->ta_args != NULL) - OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) * - ta->ta_alloc_args); + OBD_FREE_PTR_ARRAY(ta->ta_args, ta->ta_alloc_args); ta->ta_args = new_ta; ta->ta_alloc_args = new_alloc_ta; @@ -457,7 +466,7 @@ out: if (new_ta[i] != NULL) OBD_FREE_PTR(new_ta[i]); } - OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta); + OBD_FREE_PTR_ARRAY(new_ta, new_alloc_ta); } return rc; } @@ -499,7 +508,7 @@ static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj, CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu))); - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_destroy(env, dt_obj, th); dt_write_unlock(env, dt_obj); @@ -537,7 +546,7 @@ int out_tx_create_exec(const struct lu_env *env, struct thandle *th, arg->u.create.dof.dof_type, arg->u.create.attr.la_mode & S_IFMT); - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_create(env, dt_obj, &arg->u.create.attr, &arg->u.create.hint, &arg->u.create.dof, th); @@ -588,6 +597,10 @@ int out_create_add_exec(const struct lu_env *env, struct dt_object *obj, struct tx_arg *arg; int rc; + /* LU-13653: ignore quota for DNE directory creation */ + if (dof->dof_type == DFT_DIR) + th->th_ignore_quota = 1; + rc = dt_declare_create(env, obj, attr, NULL, dof, th); if (rc != 0) return rc; @@ -630,7 +643,7 @@ static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th, CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu))); - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th); dt_write_unlock(env, dt_obj); @@ -657,6 +670,10 @@ int out_attr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, if (rc != 0) return rc; + if (attr->la_valid & LA_FLAGS && + attr->la_flags & LUSTRE_SET_SYNC_FL) + th->th_sync |= 1; + arg = tx_add_exec(ta, out_tx_attr_set_exec, out_tx_attr_set_undo, file, line); if (IS_ERR(arg)) @@ -676,14 +693,14 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th, struct dt_object *dt_obj = arg->object; int rc; - CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n", + CDEBUG(D_INFO, "write "DFID" pos %llu buf %p, len %lu\n", PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos, arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len); if (OBD_FAIL_CHECK(OBD_FAIL_OUT_ENOSPC)) { rc = -ENOSPC; } else { - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_record_write(env, dt_obj, &arg->u.write.buf, &arg->u.write.pos, th); dt_write_unlock(env, dt_obj); @@ -694,7 +711,7 @@ static int out_tx_write_exec(const struct lu_env *env, struct thandle *th, if (arg->reply != NULL) object_update_result_insert(arg->reply, NULL, 0, arg->index, - rc); + rc < 0 ? rc : 0); return rc > 0 ? 0 : rc; } @@ -731,41 +748,100 @@ static int out_tx_xattr_set_exec(const struct lu_env *env, { struct dt_object *dt_obj = arg->object; int rc; + ENTRY; CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n", dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf, arg->u.xattr_set.name, arg->u.xattr_set.flags); - if (!lu_object_exists(&dt_obj->do_lu)) - GOTO(out, rc = -ENOENT); + if (!lu_object_exists(&dt_obj->do_lu)) { + rc = -ENOENT; + } else { + struct linkea_data ldata = { 0 }; + bool linkea; + + ldata.ld_buf = &arg->u.xattr_set.buf; + if (strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0) { + struct link_ea_header *leh; + + linkea = true; + rc = linkea_init(&ldata); + if (unlikely(rc)) + GOTO(out, rc == -ENODATA ? -EINVAL : rc); + + leh = ldata.ld_leh; + LASSERT(leh != NULL); + + /* If the new linkEA contains overflow timestamp, + * then two cases: + * + * 1. The old linkEA for the object has already + * overflowed before current setting, the new + * linkEA does not contains new link entry. So + * the linkEA overflow timestamp is unchanged. + * + * 2. There are new link entry in the new linkEA, + * so its overflow timestamp is differnt from + * the old one. Usually, the overstamp in the + * given linkEA is newer. But because of clock + * drift among MDTs, the timestamp may become + * older. So here, we convert the timestamp to + * the server local time. Then namespace LFSCK + * that uses local time can handle it easily. */ + if (unlikely(leh->leh_overflow_time)) { + struct lu_buf tbuf = { 0 }; + bool update = false; + + lu_buf_alloc(&tbuf, MAX_LINKEA_SIZE); + if (tbuf.lb_buf == NULL) + GOTO(unlock, rc = -ENOMEM); + + rc = dt_xattr_get(env, dt_obj, &tbuf, + XATTR_NAME_LINK); + if (rc > 0) { + struct linkea_data tdata = { 0 }; + + tdata.ld_buf = &tbuf; + rc = linkea_init(&tdata); + if (rc || leh->leh_overflow_time != + tdata.ld_leh->leh_overflow_time) + update = true; + } else { + /* Update the timestamp by force if + * fail to load the old linkEA. */ + update = true; + } + + lu_buf_free(&tbuf); + if (update) { + leh->leh_overflow_time = ktime_get_real_seconds(); + if (unlikely(!leh->leh_overflow_time)) + leh->leh_overflow_time++; + } + } + } else { + linkea = false; + } - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf, - arg->u.xattr_set.name, arg->u.xattr_set.flags, - th); - /** - * Ignore errors if this is LINK EA - **/ - if (unlikely(rc != 0 && - strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) { - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - if (rc == -ENOSPC && arg->reply != NULL) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); - tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); + +again: + rc = dt_xattr_set(env, dt_obj, ldata.ld_buf, + arg->u.xattr_set.name, arg->u.xattr_set.flags, + th); + if (unlikely(rc == -ENOSPC && linkea)) { + rc = linkea_overflow_shrink(&ldata); + if (likely(rc > 0)) { + arg->u.xattr_set.buf.lb_len = rc; + goto again; + } } - rc = 0; +unlock: + dt_write_unlock(env, dt_obj); } - dt_write_unlock(env, dt_obj); + + GOTO(out, rc); out: CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n", @@ -792,24 +868,6 @@ int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, if (rc != 0) return rc; - if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); - rc = tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, - lr, ta->ta_handle); - if (rc != 0) - return rc; - } - arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line); if (IS_ERR(arg)) return PTR_ERR(arg); @@ -838,7 +896,7 @@ static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th, if (!lu_object_exists(&dt_obj->do_lu)) GOTO(out, rc = -ENOENT); - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name, th); dt_write_unlock(env, dt_obj); @@ -884,7 +942,7 @@ static int out_obj_ref_add(const struct lu_env *env, { int rc; - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_ref_add(env, dt_obj, th); dt_write_unlock(env, dt_obj); @@ -897,7 +955,7 @@ static int out_obj_ref_del(const struct lu_env *env, { int rc; - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_ref_del(env, dt_obj, th); dt_write_unlock(env, dt_obj); @@ -1017,8 +1075,8 @@ static int out_obj_index_insert(const struct lu_env *env, if (dt_try_as_dir(env, dt_obj) == 0) return -ENOTDIR; - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_insert(env, dt_obj, rec, key, th, 0); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); + rc = dt_insert(env, dt_obj, rec, key, th); dt_write_unlock(env, dt_obj); return rc; @@ -1038,7 +1096,7 @@ static int out_obj_index_delete(const struct lu_env *env, if (dt_try_as_dir(env, dt_obj) == 0) return -ENOTDIR; - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + dt_write_lock(env, dt_obj, DT_TGT_CHILD); rc = dt_delete(env, dt_obj, key, th); dt_write_unlock(env, dt_obj);