X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_lov.c;h=69df68e806fb4d3db1314c10b15b054c62e5fd85;hb=01138321c7ce393c189a7ed11559c0938ce9f17e;hp=cb7a68bbc6de89d65ed8440c76787e9a70a79234;hpb=d387061f1d9a4c169b67567d51b6ac42a82de490;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index cb7a68b..69df68e 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -1,31 +1,49 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/mdd/mdd_lov.c - * Lustre Metadata Server (mds) handling of striped file data + * GPL HEADER START * - * Copyright (C) 2001-2006 Cluster File Systems, Inc. - * Author: Peter Braam - * wangdi + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/mdd/mdd_lov.c + * + * Lustre Metadata Server (mds) handling of striped file data + * + * Author: Peter Braam + * Author: wangdi + */ + #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif @@ -45,7 +63,7 @@ #include "mdd_internal.h" static int mdd_notify(struct obd_device *host, struct obd_device *watched, - enum obd_notify_event ev, void *owner) + enum obd_notify_event ev, void *owner, void *data) { struct mdd_device *mdd = owner; int rc = 0; @@ -57,11 +75,19 @@ static int mdd_notify(struct obd_device *host, struct obd_device *watched, case OBD_NOTIFY_ACTIVE: case OBD_NOTIFY_SYNC: case OBD_NOTIFY_SYNC_NONBLOCK: - rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_SYNC); + rc = md_do_upcall(NULL, &mdd->mdd_md_dev, + MD_LOV_SYNC, data); break; case OBD_NOTIFY_CONFIG: - rc = md_do_upcall(NULL, &mdd->mdd_md_dev, MD_LOV_CONFIG); + rc = md_do_upcall(NULL, &mdd->mdd_md_dev, + MD_LOV_CONFIG, data); break; +#ifdef HAVE_QUOTA_SUPPORT + case OBD_NOTIFY_QUOTA: + rc = md_do_upcall(NULL, &mdd->mdd_md_dev, + MD_LOV_QUOTA, data); + break; +#endif default: CDEBUG(D_INFO, "Unhandled notification %#x\n", ev); } @@ -82,7 +108,7 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd, struct obd_device *obd; ENTRY; - mds_id = mdd2lu_dev(mdd)->ld_site->ls_node_id; + mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id; name_size = strlen(MDD_OBD_NAME) + 35; uuid_size = strlen(MDD_OBD_UUID) + 35; @@ -95,11 +121,11 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd, if (!bufs) GOTO(cleanup_mem, rc = -ENOMEM); - snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s-%d", - MDD_OBD_NAME, dev, mds_id); + snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s", + MDD_OBD_NAME, dev); - snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s-%d", - MDD_OBD_UUID, dev, mds_id); + snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s", + MDD_OBD_UUID, dev); lustre_cfg_bufs_reset(bufs, name); lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE); @@ -122,15 +148,20 @@ int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd, LBUG(); } + cfs_spin_lock(&obd->obd_dev_lock); obd->obd_recovering = 1; + cfs_spin_unlock(&obd->obd_dev_lock); obd->u.mds.mds_id = mds_id; + obd->u.obt.obt_osd_properties.osd_max_ea_size = + mdd->mdd_dt_conf.ddp_max_ea_size; + rc = class_setup(obd, lcfg); if (rc) GOTO(class_detach, rc); /* * Add here for obd notify mechanism, when adding a new ost, the mds - * will notify this mdd. + * will notify this mdd. The mds will be used for quota also. */ obd->obd_upcall.onu_upcall = mdd_notify; obd->obd_upcall.onu_owner = mdd; @@ -170,7 +201,7 @@ int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd, if (rc) GOTO(lcfg_cleanup, rc); mdd->mdd_obd_dev = NULL; - + EXIT; lcfg_cleanup: return rc; @@ -192,7 +223,7 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, *md_size = 0; rc = 0; } else if (rc < 0) { - CERROR("Error %d reading eadata \n", rc); + CERROR("Error %d reading eadata - %d\n", rc, *md_size); } else { /* XXX: Convert lov EA but fixed after verification test. */ *md_size = rc; @@ -205,7 +236,7 @@ int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj, void *md, int *md_size, const char *name) { int rc = 0; - mdd_read_lock(env, obj); + mdd_read_lock(env, obj, MOR_TGT_CHILD); rc = mdd_get_md(env, obj, md, md_size, name); mdd_read_unlock(env, obj); return rc; @@ -217,7 +248,7 @@ static int mdd_lov_set_stripe_md(const struct lu_env *env, { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; + struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; struct lov_stripe_md *lsm = NULL; int rc; ENTRY; @@ -229,7 +260,7 @@ static int mdd_lov_set_stripe_md(const struct lu_env *env, RETURN(rc); obd_free_memmd(lov_exp, &lsm); - rc = mdd_xattr_set_txn(env, obj, buf, MDS_LOV_MD_NAME, 0, handle); + rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle); CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc); RETURN(rc); @@ -250,14 +281,13 @@ static int mdd_lov_set_dir_md(const struct lu_env *env, LASSERT(S_ISDIR(mdd_object_type(obj))); lum = (struct lov_user_md*)buf->lb_buf; - /* if { size, offset, count } = { 0, -1, 0 } (i.e. all default + /* if { size, offset, count } = { 0, -1, 0 } and no pool (i.e. all default * values specified) then delete default striping from dir. */ - if ((lum->lmm_stripe_size == 0 && lum->lmm_stripe_count == 0 && - lum->lmm_stripe_offset == (typeof(lum->lmm_stripe_offset))(-1)) || - /* lmm_stripe_size == -1 is deprecated in 1.4.6 */ - lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){ + if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, + lum->lmm_stripe_offset) && + lum->lmm_magic != LOV_USER_MAGIC_V3) { rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL, - MDS_LOV_MD_NAME, 0, handle); + XATTR_NAME_LOV, 0, handle); if (rc == -ENODATA) rc = 0; CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n", @@ -279,9 +309,10 @@ int mdd_lsm_sanity_check(const struct lu_env *env, struct mdd_object *obj) if (rc) RETURN(rc); - if ((uc->mu_fsuid != tmp_la->la_uid) && !mdd_capable(uc, CAP_FOWNER)) + if ((uc->mu_fsuid != tmp_la->la_uid) && + !mdd_capable(uc, CFS_CAP_FOWNER)) rc = mdd_permission_internal_locked(env, obj, tmp_la, - MAY_WRITE); + MAY_WRITE, MOR_TGT_CHILD); RETURN(rc); } @@ -291,7 +322,7 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj, int lmm_size, struct thandle *handle, int set_stripe) { struct lu_buf *buf; - umode_t mode; + cfs_umode_t mode; int rc = 0; ENTRY; @@ -302,22 +333,22 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj, rc = mdd_lov_set_stripe_md(env, child, buf, handle); } else { rc = mdd_xattr_set_txn(env, child, buf, - MDS_LOV_MD_NAME, 0, handle); + XATTR_NAME_LOV, 0, handle); } } else if (S_ISDIR(mode)) { if (lmmp == NULL && lmm_size == 0) { struct mdd_device *mdd = mdd_obj2mdd_dev(child); struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd); - int size = sizeof(*lmm); + int size = sizeof(struct lov_mds_md_v3); /* Get parent dir stripe and set */ if (pobj != NULL) rc = mdd_get_md_locked(env, pobj, lmm, &size, - MDS_LOV_MD_NAME); + XATTR_NAME_LOV); if (rc > 0) { buf = mdd_buf_get(env, lmm, size); rc = mdd_xattr_set_txn(env, child, buf, - MDS_LOV_MD_NAME, 0, handle); + XATTR_NAME_LOV, 0, handle); if (rc) CERROR("error on copy stripe info: rc " "= %d\n", rc); @@ -332,18 +363,15 @@ int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj, RETURN(rc); } -/* - * XXX: this is for create lsm object id, which should identify the lsm object - * unique in the whole mds, as I see. But it seems, we still not need it - * now. Right? So just borrow the ll_fid_build_ino(). - */ -static obd_id mdd_lov_create_id(const struct lu_fid *fid) +int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm) { - return fid_flatten(fid); + /* copy mds_lov code is using wrong layer */ + return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm); } void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm) { + /* copy mds_lov code is using wrong layer */ mds_lov_update_objids(mdd->mdd_obd_dev, lmm); } @@ -351,8 +379,8 @@ void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd, struct lov_mds_md *lmm, int lmm_size, const struct md_op_spec *spec) { - if (lmm && !spec->u.sp_ea.no_lov_create) - OBD_FREE(lmm, lmm_size); + if (lmm && !spec->no_create) + OBD_FREE_LARGE(lmm, lmm_size); } int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, @@ -361,25 +389,33 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, const struct md_op_spec *spec, struct lu_attr *la) { struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; + struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; + struct lu_site *site = mdd2lu_dev(mdd)->ld_site; struct obdo *oa; struct lov_stripe_md *lsm = NULL; const void *eadata = spec->u.sp_ea.eadata; - __u32 create_flags = spec->sp_cr_flags; + __u64 create_flags = spec->sp_cr_flags; struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti; int rc = 0; ENTRY; - if (!md_should_create(create_flags)) + if (!md_should_create(create_flags)) { + *lmm_size = 0; RETURN(0); - + } oti_init(oti, NULL); /* replay case, has objects already, only get lov from eadata */ - if (spec->u.sp_ea.no_lov_create != 0) { + if (spec->no_create != 0) { *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata; *lmm_size = spec->u.sp_ea.eadatalen; - RETURN(0); + if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count, + (*lmm)->lmm_magic)) { + RETURN(0); + } else { + CERROR("incorrect lsm received during recovery\n"); + RETURN(-EPROTO); + } } if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) @@ -390,9 +426,9 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, oa->o_uid = 0; /* must have 0 uid / gid on OST */ oa->o_gid = 0; - oa->o_gr = FILTER_GROUP_MDS0 + mdd2lu_dev(mdd)->ld_site->ls_node_id; + oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id); oa->o_mode = S_IFREG | 0600; - oa->o_id = mdd_lov_create_id(mdd_object_fid(child)); + oa->o_id = fid_ver_oid(mdd_object_fid(child)); oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS | OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP; oa->o_size = 0; @@ -404,8 +440,6 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, 0, &lsm, (void*)eadata); if (rc) GOTO(out_oti, rc); - lsm->lsm_object_id = oa->o_id; - lsm->lsm_object_gr = oa->o_gr; } else if (parent != NULL) { /* get lov ea from parent and set to lov */ struct lov_mds_md *_lmm; @@ -419,14 +453,17 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, rc = mdd_get_md_locked(env, parent, _lmm, &_lmm_size, - MDS_LOV_MD_NAME); + XATTR_NAME_LOV); if (rc > 0) rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, - lov_exp, 0, &lsm, _lmm); + lov_exp, *lmm_size, + &lsm, _lmm); + if (rc) GOTO(out_oti, rc); } + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10); rc = obd_create(lov_exp, oa, &lsm, oti); if (rc) { if (rc > 0) { @@ -436,17 +473,18 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, } GOTO(out_oti, rc); } - LASSERT(lsm->lsm_object_gr >= FILTER_GROUP_MDS0); + LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq); } else { LASSERT(eadata != NULL); rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm, (void*)eadata); if (rc) GOTO(out_oti, rc); - lsm->lsm_object_id = oa->o_id; - lsm->lsm_object_gr = oa->o_gr; + } + lsm->lsm_object_id = fid_ver_oid(mdd_object_fid(child)); + lsm->lsm_object_seq = fid_seq(mdd_object_fid(child)); /* * Sometimes, we may truncate some object(without lsm) then open it * (with write flags), so creating lsm above. The Nonzero(truncated) @@ -467,21 +505,15 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, * filter_fid, but can not see what is the usages. So just pack * o_seq o_ver here, maybe fix it after this cycle. */ - oa->o_fid = fid_seq(mdd_object_fid(child)); - oa->o_generation = fid_oid(mdd_object_fid(child)); - oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER; + obdo_from_inode(oa, NULL, + (struct lu_fid *)mdd_object_fid(child), 0); oinfo->oi_oa = oa; oinfo->oi_md = lsm; - oinfo->oi_capa = mdo_capa_get(env, child, NULL, - CAPA_OPC_MDS_DEFAULT); + oinfo->oi_capa = NULL; oinfo->oi_policy.l_extent.start = la->la_size; oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF; - if (IS_ERR(oinfo->oi_capa)) - oinfo->oi_capa = NULL; - rc = obd_punch_rqset(lov_exp, oinfo, oti); - capa_put(oinfo->oi_capa); if (rc) { CERROR("Error setting attrs for "DFID": rc %d\n", PFID(mdo2fid(child)), rc); @@ -493,7 +525,6 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, GOTO(out_oti, rc); } } - /* blksize should be changed after create data object */ la->la_valid |= LA_BLKSIZE; la->la_blksize = oa->o_blksize; @@ -503,6 +534,12 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, CERROR("Cannot pack lsm, err = %d\n", rc); GOTO(out_oti, rc); } + if (mdd_lov_objid_prepare(mdd, *lmm) != 0) { + CERROR("Not have memory for update objid\n"); + OBD_FREE(*lmm, rc); + *lmm = NULL; + GOTO(out_oti, rc = -ENOMEM); + } *lmm_size = rc; rc = 0; EXIT; @@ -527,10 +564,11 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd, int log_unlink) { struct obd_device *obd = mdd2obd_dev(mdd); - struct obd_export *lov_exp = obd->u.mds.mds_osc_exp; + struct obd_export *lov_exp = obd->u.mds.mds_lov_exp; struct lov_stripe_md *lsm = NULL; struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti; struct obdo *oa = &mdd_env_info(env)->mti_oa; + struct lu_site *site = mdd2lu_dev(mdd)->ld_site; int rc; ENTRY; @@ -547,7 +585,7 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd, } oa->o_id = lsm->lsm_object_id; - oa->o_gr = FILTER_GROUP_MDS0 + mdd2lu_dev(mdd)->ld_site->ls_node_id; + oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id); oa->o_mode = la->la_mode & S_IFMT; oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP; @@ -557,18 +595,17 @@ int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd, oti->oti_logcookies = logcookies; } - CDEBUG(D_INFO, "destroying OSS object %d/%d\n", - (int)oa->o_id, (int)oa->o_gr); + CDEBUG(D_INFO, "destroying OSS object "LPU64":"LPU64"\n", oa->o_seq, + oa->o_id); - rc = obd_destroy(lov_exp, oa, lsm, oti, NULL); + rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, NULL); obd_free_memmd(lov_exp, &lsm); RETURN(rc); } - /* - * called with obj not locked. + * called with obj locked. */ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *obj, struct lu_attr *la) @@ -577,6 +614,11 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, int rc; ENTRY; + LASSERT(mdd_write_locked(env, obj) != 0); + + if (unlikely(!S_ISREG(mdd_object_type(obj)))) + RETURN(0); + if (unlikely(la->la_nlink != 0)) { CWARN("Attempt to destroy OSS object when nlink == %d\n", la->la_nlink); @@ -591,43 +633,90 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, RETURN(rc = -ENOMEM); /* get lov ea */ - rc = mdd_get_md_locked(env, obj, ma->ma_lmm, &ma->ma_lmm_size, - MDS_LOV_MD_NAME); - if (rc) { - CWARN("Get lov ea failed for "DFID"\n", PFID(mdo2fid(obj))); + + rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size, + XATTR_NAME_LOV); + + if (rc <= 0) { + CWARN("Get lov ea failed for "DFID" rc = %d\n", + PFID(mdo2fid(obj)), rc); + if (rc == 0) + rc = -ENOENT; RETURN(rc); } + ma->ma_valid = MA_LOV; - + rc = mdd_unlink_log(env, mdd, obj, ma); if (rc) { CWARN("mds unlink log for "DFID" failed: %d\n", PFID(mdo2fid(obj)), rc); RETURN(rc); } - if (ma->ma_valid | MA_COOKIE) - rc = mdd_lovobj_unlink(env, mdd, obj, la, + + if (ma->ma_valid & MA_COOKIE) + rc = mdd_lovobj_unlink(env, mdd, obj, la, ma->ma_lmm, ma->ma_lmm_size, ma->ma_cookie, 1); RETURN(rc); } - int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *mdd_cobj, struct md_attr *ma) { - struct obd_device *obd = mdd2obd_dev(mdd); - LASSERT(ma->ma_valid & MA_LOV); if ((ma->ma_cookie_size > 0) && - (mds_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size, + (mds_log_op_unlink(mdd2obd_dev(mdd), ma->ma_lmm, ma->ma_lmm_size, ma->ma_cookie, ma->ma_cookie_size) > 0)) { + CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n", + PFID(mdd_object_fid(mdd_cobj))); ma->ma_valid |= MA_COOKIE; } return 0; } +int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, int cookies_size) +{ + struct mds_obd *mds = &obd->u.mds; + struct lov_stripe_md *lsm = NULL; + struct llog_setattr64_rec *lsr; + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + if (IS_ERR(mds->mds_lov_obd)) + RETURN(PTR_ERR(mds->mds_lov_obd)); + + rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size); + if (rc < 0) + RETURN(rc); + + OBD_ALLOC(lsr, sizeof(*lsr)); + if (!lsr) + GOTO(out, rc = -ENOMEM); + + /* prepare setattr log record */ + lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr); + lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC; + lsr->lsr_uid = uid; + lsr->lsr_gid = gid; + + /* write setattr log */ + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies, + cookies_size / sizeof(struct llog_cookie)); + + llog_ctxt_put(ctxt); + + OBD_FREE(lsr, sizeof(*lsr)); + out: + obd_free_memmd(mds->mds_lov_exp, &lsm); + RETURN(rc); +} + int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, const struct md_attr *ma, struct lov_mds_md *lmm, int lmm_size, @@ -638,44 +727,253 @@ int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, /* journal chown/chgrp in llog, just like unlink */ if (lmm_size > 0) { CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n", - (unsigned long)ma->ma_attr.la_uid, + (unsigned long)ma->ma_attr.la_uid, (unsigned long)ma->ma_attr.la_gid); - return mds_log_op_setattr(obd, ma->ma_attr.la_uid, - ma->ma_attr.la_gid, lmm, + return mdd_log_op_setattr(obd, ma->ma_attr.la_uid, + ma->ma_attr.la_gid, lmm, lmm_size, logcookies, cookies_size); } else return 0; } +static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, const struct lu_fid *parent, + struct obd_capa *oc) +{ + struct mds_obd *mds = &obd->u.mds; + struct obd_trans_info oti = { 0 }; + struct obd_info oinfo = { { { 0 } } }; + int rc; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR)) + RETURN(0); + + /* first get memory EA */ + OBDO_ALLOC(oinfo.oi_oa); + if (!oinfo.oi_oa) + RETURN(-ENOMEM); + + LASSERT(lmm); + + rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size); + if (rc < 0) { + CERROR("Error unpack md %p for obj "DFID"\n", lmm, + PFID(parent)); + GOTO(out, rc); + } + + /* then fill oa */ + oinfo.oi_oa->o_uid = uid; + oinfo.oi_oa->o_gid = gid; + oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id; + oinfo.oi_oa->o_seq = oinfo.oi_md->lsm_object_seq; + oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | + OBD_MD_FLUID | OBD_MD_FLGID; + if (logcookies) { + oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE; + oti.oti_logcookies = logcookies; + } + + obdo_from_inode(oinfo.oi_oa, NULL, (struct lu_fid *)parent, 0); + oinfo.oi_capa = oc; + + /* do async setattr from mds to ost not waiting for responses. */ + rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL); + if (rc) + CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64 + " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc); +out: + if (oinfo.oi_md) + obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md); + OBDO_FREE(oinfo.oi_oa); + RETURN(rc); +} + int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, - struct lov_mds_md *lmm, int lmm_size, + struct lov_mds_md *lmm, int lmm_size, struct llog_cookie *logcookies) { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); struct obd_device *obd = mdd2obd_dev(mdd); struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la; const struct lu_fid *fid = mdd_object_fid(obj); - struct obd_capa *oc; int rc = 0; ENTRY; - mdd_read_lock(env, obj); + mdd_read_lock(env, obj, MOR_TGT_CHILD); rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj)); mdd_read_unlock(env, obj); if (rc) RETURN(rc); - oc = mdo_capa_get(env, obj, NULL, CAPA_OPC_MDS_DEFAULT); - if (IS_ERR(oc)) - oc = NULL; + rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, + lmm_size, logcookies, fid, NULL); + RETURN(rc); +} - rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, - lmm_size, logcookies, fid_seq(fid), - fid_oid(fid), oc); +static int grouplock_blocking_ast(struct ldlm_lock *lock, + struct ldlm_lock_desc *desc, + void *data, int flag) +{ + struct md_attr *ma = data; + struct lustre_handle lockh; + int rc = 0; + ENTRY; + + switch (flag) + { + case LDLM_CB_BLOCKING : + /* lock is canceled */ + CDEBUG(D_DLMTRACE, "Lock %p is canceled\n", lock); - capa_put(oc); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + break; + case LDLM_CB_CANCELING : + CDEBUG(D_DLMTRACE, + "Lock %p has been canceled, do cleaning\n", + lock); + + if (ma && ma->ma_som) + OBD_FREE_PTR(ma->ma_som); + if (ma) + OBD_FREE_PTR(ma); + break; + default: + LBUG(); + } RETURN(rc); } +static int grouplock_glimpse_ast(struct ldlm_lock *lock, void *data) +{ + struct ptlrpc_request *req = data; + struct ost_lvb *lvb; + int rc; + struct md_attr *ma; + ENTRY; + + ma = lock->l_ast_data; + + req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(*lvb)); + rc = req_capsule_server_pack(&req->rq_pill); + if (rc) { + CERROR("failed pack reply: %d\n", rc); + GOTO(out, rc); + } + + lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); + + if ((ma) && (ma->ma_valid & MA_SOM)) { + lvb->lvb_size = ma->ma_som->msd_size; + lvb->lvb_blocks = ma->ma_som->msd_blocks; + } else if ((ma) && (ma->ma_valid & MA_INODE)) { + lvb->lvb_size = ma->ma_attr.la_size; + lvb->lvb_blocks = ma->ma_attr.la_blocks; + } else { + lvb->lvb_size = 0; + rc = -ELDLM_NO_LOCK_DATA; + } + + EXIT; +out: + if (rc == -ELDLM_NO_LOCK_DATA) + lustre_pack_reply(req, 1, NULL, NULL); + + req->rq_status = rc; + return rc; +} + +int mdd_file_lock(const struct lu_env *env, struct md_object *obj, + struct lov_mds_md *lmm, struct ldlm_extent *extent, + struct lustre_handle *lockh) +{ + struct ldlm_enqueue_info einfo = { 0 }; + struct obd_info oinfo = { { { 0 } } }; + struct obd_device *obd; + struct obd_export *lov_exp; + struct lov_stripe_md *lsm = NULL; + struct md_attr *ma = NULL; + int rc; + ENTRY; + + obd = mdo2mdd(obj)->mdd_obd_dev; + lov_exp = obd->u.mds.mds_lov_exp; + + obd_unpackmd(lov_exp, &lsm, lmm, + lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic)); + + OBD_ALLOC_PTR(ma); + if (ma == NULL) + GOTO(out, rc = -ENOMEM); + + OBD_ALLOC_PTR(ma->ma_som); + if (ma->ma_som == NULL) + GOTO(out, rc = -ENOMEM); + + ma->ma_need = MA_SOM | MA_INODE; + mo_attr_get(env, obj, ma); + + einfo.ei_type = LDLM_EXTENT; + einfo.ei_mode = LCK_GROUP; + einfo.ei_cb_bl = grouplock_blocking_ast; + einfo.ei_cb_cp = ldlm_completion_ast; + einfo.ei_cb_gl = grouplock_glimpse_ast; + + if (ma->ma_valid & (MA_SOM | MA_INODE)) + einfo.ei_cbdata = ma; + else + einfo.ei_cbdata = NULL; + + memset(&oinfo.oi_policy, 0, sizeof(oinfo.oi_policy)); + oinfo.oi_policy.l_extent = *extent; + oinfo.oi_lockh = lockh; + oinfo.oi_md = lsm; + oinfo.oi_flags = 0; + + rc = obd_enqueue(lov_exp, &oinfo, &einfo, NULL); + /* ei_cbdata is used as a free flag at exit */ + if (rc) + einfo.ei_cbdata = NULL; + + obd_unpackmd(lov_exp, &lsm, NULL, 0); + +out: + /* ma is freed if not used as callback data */ + if ((einfo.ei_cbdata == NULL) && ma && ma->ma_som) + OBD_FREE_PTR(ma->ma_som); + if ((einfo.ei_cbdata == NULL) && ma) + OBD_FREE_PTR(ma); + + RETURN(rc); +} + +int mdd_file_unlock(const struct lu_env *env, struct md_object *obj, + struct lov_mds_md *lmm, struct lustre_handle *lockh) +{ + struct obd_device *obd; + struct obd_export *lov_exp; + struct lov_stripe_md *lsm = NULL; + int rc; + ENTRY; + + LASSERT(lustre_handle_is_used(lockh)); + + obd = mdo2mdd(obj)->mdd_obd_dev; + lov_exp = obd->u.mds.mds_lov_exp; + + obd_unpackmd(lov_exp, &lsm, lmm, + lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic)); + + rc = obd_cancel(lov_exp, lsm, LCK_GROUP, lockh); + + obd_unpackmd(lov_exp, &lsm, NULL, 0); + + RETURN(rc); +}