/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * lustre/cmm/mdc_object.c * Lustre Cluster Metadata Manager (cmm) * * Copyright (c) 2006 Cluster File Systems, Inc. * Author: Mike Pershin * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. */ #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif #define DEBUG_SUBSYSTEM S_MDS #include #include #include #include #include "cmm_internal.h" #include "mdc_internal.h" static struct md_object_operations mdc_mo_ops; static struct md_dir_operations mdc_dir_ops; static struct lu_object_operations mdc_obj_ops; extern struct lu_context_key mdc_thread_key; struct lu_object *mdc_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *ld) { struct mdc_object *mco; ENTRY; OBD_ALLOC_PTR(mco); if (mco != NULL) { struct lu_object *lo; lo = &mco->mco_obj.mo_lu; lu_object_init(lo, NULL, ld); mco->mco_obj.mo_ops = &mdc_mo_ops; mco->mco_obj.mo_dir_ops = &mdc_dir_ops; lo->lo_ops = &mdc_obj_ops; RETURN(lo); } else RETURN(NULL); } static void mdc_object_free(const struct lu_env *env, struct lu_object *lo) { struct mdc_object *mco = lu2mdc_obj(lo); lu_object_fini(lo); OBD_FREE_PTR(mco); } static int mdc_object_init(const struct lu_env *env, struct lu_object *lo) { ENTRY; lo->lo_header->loh_attr |= LOHA_REMOTE; RETURN(0); } static int mdc_object_print(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *lo) { return (*p)(env, cookie, LUSTRE_CMM_MDC_NAME"-object@%p", lo); } static struct lu_object_operations mdc_obj_ops = { .loo_object_init = mdc_object_init, .loo_object_free = mdc_object_free, .loo_object_print = mdc_object_print, }; /* md_object_operations */ static struct mdc_thread_info *mdc_info_get(const struct lu_env *env) { struct mdc_thread_info *mci; mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); LASSERT(mci); return mci; } static struct mdc_thread_info *mdc_info_init(const struct lu_env *env) { struct mdc_thread_info *mci = mdc_info_get(env); memset(mci, 0, sizeof(*mci)); return mci; } static void mdc_body2attr(struct mdt_body *body, struct md_attr *ma) { struct lu_attr *la = &ma->ma_attr; /* update time */ if (body->valid & OBD_MD_FLCTIME && body->ctime >= la->la_ctime) { la->la_ctime = body->ctime; if (body->valid & OBD_MD_FLMTIME) la->la_mtime = body->mtime; } if (body->valid & OBD_MD_FLMODE) la->la_mode = body->mode; if (body->valid & OBD_MD_FLSIZE) la->la_size = body->size; if (body->valid & OBD_MD_FLBLOCKS) la->la_blocks = body->blocks; if (body->valid & OBD_MD_FLUID) la->la_uid = body->uid; if (body->valid & OBD_MD_FLGID) la->la_gid = body->gid; if (body->valid & OBD_MD_FLFLAGS) la->la_flags = body->flags; if (body->valid & OBD_MD_FLNLINK) la->la_nlink = body->nlink; if (body->valid & OBD_MD_FLRDEV) la->la_rdev = body->rdev; la->la_valid = body->valid; ma->ma_valid = MA_INODE; } static int mdc_req2attr_update(const struct lu_env *env, struct md_attr *ma) { struct mdc_thread_info *mci; struct ptlrpc_request *req; struct mdt_body *body; struct lov_mds_md *lov; struct llog_cookie *cookie; ENTRY; mci = mdc_info_get(env); req = mci->mci_req; LASSERT(req); body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); LASSERT(body); mdc_body2attr(body, ma); if (body->valid & OBD_MD_FLMDSCAPA) { struct lustre_capa *capa; /* create for cross-ref will fetch mds capa from remote obj */ capa = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, sizeof(*capa)); LASSERT(capa != NULL); LASSERT(ma->ma_capa != NULL); *ma->ma_capa = *capa; } if (!(body->valid & OBD_MD_FLEASIZE)) RETURN(0); if (body->eadatasize == 0) { CERROR("OBD_MD_FLEASIZE is set but eadatasize is zero\n"); RETURN(-EPROTO); } lov = lustre_swab_repbuf(req, REPLY_REC_OFF + 1, body->eadatasize, NULL); if (lov == NULL) { CERROR("Can't unpack MDS EA data\n"); RETURN(-EPROTO); } LASSERT(ma->ma_lmm != NULL); LASSERT(ma->ma_lmm_size >= body->eadatasize); ma->ma_lmm_size = body->eadatasize; memcpy(ma->ma_lmm, lov, ma->ma_lmm_size); ma->ma_valid |= MA_LOV; if (!(body->valid & OBD_MD_FLCOOKIE)) RETURN(0); if (body->aclsize == 0) { CERROR("OBD_MD_FLCOOKIE is set but cookie size is zero\n"); RETURN(-EPROTO); } cookie = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 2, body->aclsize); if (cookie == NULL) { CERROR("Can't unpack unlink cookie data\n"); RETURN(-EPROTO); } LASSERT(ma->ma_cookie != NULL); LASSERT(ma->ma_cookie_size == body->aclsize); memcpy(ma->ma_cookie, cookie, ma->ma_cookie_size); ma->ma_valid |= MA_COOKIE; RETURN(0); } static int mdc_attr_get(const struct lu_env *env, struct md_object *mo, struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); struct mdc_thread_info *mci; int rc; ENTRY; mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); LASSERT(mci); memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); rc = md_getattr(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), NULL, OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLCROSSREF, 0, &mci->mci_req); if (rc == 0) { /* get attr from request */ rc = mdc_req2attr_update(env, ma); } ptlrpc_req_finished(mci->mci_req); RETURN(rc); } static inline struct timespec *mdc_attr_time(struct timespec *t, __u64 seconds) { t->tv_sec = seconds; t->tv_nsec = 0; return t; } /* * XXX: It is only used for set ctime when rename's source on remote MDS. */ static int mdc_attr_set(const struct lu_env *env, struct md_object *mo, const struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); const struct lu_attr *la = &ma->ma_attr; struct mdc_thread_info *mci; struct md_ucred *uc = md_ucred(env); int rc; ENTRY; LASSERT(ma->ma_attr.la_valid & LA_CTIME); mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); LASSERT(mci); memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); mdc_attr_time(&mci->mci_opdata.op_attr.ia_ctime, la->la_ctime); mci->mci_opdata.op_attr.ia_mode = la->la_mode; mci->mci_opdata.op_attr.ia_valid = ATTR_CTIME_SET; if (uc && ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { mci->mci_opdata.op_fsuid = uc->mu_fsuid; mci->mci_opdata.op_fsgid = uc->mu_fsgid; mci->mci_opdata.op_cap = uc->mu_cap; if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; } else { mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } } else { mci->mci_opdata.op_fsuid = la->la_uid; mci->mci_opdata.op_fsgid = la->la_gid; mci->mci_opdata.op_cap = current->cap_effective; mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } rc = md_setattr(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0, NULL, 0, &mci->mci_req, NULL); ptlrpc_req_finished(mci->mci_req); RETURN(rc); } static int mdc_object_create(const struct lu_env *env, struct md_object *mo, const struct md_op_spec *spec, struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); struct lu_attr *la = &ma->ma_attr; struct mdc_thread_info *mci; const void *symname; struct md_ucred *uc = md_ucred(env); int rc, symlen; uid_t uid; gid_t gid; __u32 cap; ENTRY; LASSERT(S_ISDIR(la->la_mode)); LASSERT(spec->u.sp_pfid != NULL); mci = mdc_info_init(env); mci->mci_opdata.op_bias = MDS_CROSS_REF; mci->mci_opdata.op_fid2 = *lu_object_fid(&mo->mo_lu); /* Parent fid is needed to create dotdot on the remote node. */ mci->mci_opdata.op_fid1 = *(spec->u.sp_pfid); mci->mci_opdata.op_mod_time = la->la_ctime; if (uc && ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { uid = uc->mu_fsuid; if (la->la_mode & S_ISGID) gid = la->la_gid; else gid = uc->mu_fsgid; cap = uc->mu_cap; if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; else mci->mci_opdata.op_suppgids[0] = -1; } else { uid = la->la_uid; gid = la->la_gid; cap = 0; mci->mci_opdata.op_suppgids[0] = -1; } /* get data from spec */ if (spec->sp_cr_flags & MDS_CREATE_SLAVE_OBJ) { symname = spec->u.sp_ea.eadata; symlen = spec->u.sp_ea.eadatalen; mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid); mci->mci_opdata.op_flags |= MDS_CREATE_SLAVE_OBJ; #ifdef CONFIG_FS_POSIX_ACL } else if (spec->sp_cr_flags & MDS_CREATE_RMT_ACL) { symname = spec->u.sp_ea.eadata; symlen = spec->u.sp_ea.eadatalen; mci->mci_opdata.op_fid1 = *(spec->u.sp_ea.fid); mci->mci_opdata.op_flags |= MDS_CREATE_RMT_ACL; #endif } else { symname = spec->u.sp_symname; symlen = symname ? strlen(symname) + 1 : 0; } rc = md_create(mc->mc_desc.cl_exp, &mci->mci_opdata, symname, symlen, la->la_mode, uid, gid, cap, la->la_rdev, &mci->mci_req); if (rc == 0) { /* get attr from request */ rc = mdc_req2attr_update(env, ma); } ptlrpc_req_finished(mci->mci_req); RETURN(rc); } static int mdc_ref_add(const struct lu_env *env, struct md_object *mo, const struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); const struct lu_attr *la = &ma->ma_attr; struct mdc_thread_info *mci; struct md_ucred *uc = md_ucred(env); int rc; ENTRY; mci = lu_context_key_get(&env->le_ctx, &mdc_thread_key); LASSERT(mci); memset(&mci->mci_opdata, 0, sizeof(mci->mci_opdata)); mci->mci_opdata.op_bias = MDS_CROSS_REF; mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); mci->mci_opdata.op_mod_time = la->la_ctime; if (uc && ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { mci->mci_opdata.op_fsuid = uc->mu_fsuid; mci->mci_opdata.op_fsgid = uc->mu_fsgid; mci->mci_opdata.op_cap = uc->mu_cap; if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; } else { mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } } else { mci->mci_opdata.op_fsuid = la->la_uid; mci->mci_opdata.op_fsgid = la->la_gid; mci->mci_opdata.op_cap = current->cap_effective; mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } rc = md_link(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req); ptlrpc_req_finished(mci->mci_req); RETURN(rc); } static int mdc_ref_del(const struct lu_env *env, struct md_object *mo, struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); struct lu_attr *la = &ma->ma_attr; struct mdc_thread_info *mci; struct md_ucred *uc = md_ucred(env); int rc; ENTRY; mci = mdc_info_init(env); mci->mci_opdata.op_bias = MDS_CROSS_REF; if (ma->ma_attr_flags & MDS_VTX_BYPASS) mci->mci_opdata.op_bias |= MDS_VTX_BYPASS; else mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS; mci->mci_opdata.op_fid1 = *lu_object_fid(&mo->mo_lu); mci->mci_opdata.op_mode = la->la_mode; mci->mci_opdata.op_mod_time = la->la_ctime; if (uc && ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { mci->mci_opdata.op_fsuid = uc->mu_fsuid; mci->mci_opdata.op_fsgid = uc->mu_fsgid; mci->mci_opdata.op_cap = uc->mu_cap; if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; else mci->mci_opdata.op_suppgids[0] = -1; } else { mci->mci_opdata.op_fsuid = la->la_uid; mci->mci_opdata.op_fsgid = la->la_gid; mci->mci_opdata.op_cap = current->cap_effective; mci->mci_opdata.op_suppgids[0] = -1; } rc = md_unlink(mc->mc_desc.cl_exp, &mci->mci_opdata, &mci->mci_req); if (rc == 0) { /* get attr from request */ rc = mdc_req2attr_update(env, ma); } ptlrpc_req_finished(mci->mci_req); RETURN(rc); } #ifdef HAVE_SPLIT_SUPPORT int mdc_send_page(struct cmm_device *cm, const struct lu_env *env, struct md_object *mo, struct page *page, __u32 offset) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); int rc; ENTRY; rc = mdc_sendpage(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), page, offset); CDEBUG(rc ? D_ERROR : D_INFO, "send page %p offset %d fid "DFID " rc %d \n", page, offset, PFID(lu_object_fid(&mo->mo_lu)), rc); RETURN(rc); } #endif static struct md_object_operations mdc_mo_ops = { .moo_attr_get = mdc_attr_get, .moo_attr_set = mdc_attr_set, .moo_object_create = mdc_object_create, .moo_ref_add = mdc_ref_add, .moo_ref_del = mdc_ref_del, }; /* md_dir_operations */ static int mdc_rename_tgt(const struct lu_env *env, struct md_object *mo_p, struct md_object *mo_t, const struct lu_fid *lf, const struct lu_name *lname, struct md_attr *ma) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo_p)); struct lu_attr *la = &ma->ma_attr; struct mdc_thread_info *mci; struct md_ucred *uc = md_ucred(env); int rc; ENTRY; mci = mdc_info_init(env); mci->mci_opdata.op_bias = MDS_CROSS_REF; if (ma->ma_attr_flags & MDS_VTX_BYPASS) mci->mci_opdata.op_bias |= MDS_VTX_BYPASS; else mci->mci_opdata.op_bias &= ~MDS_VTX_BYPASS; mci->mci_opdata.op_fid1 = *lu_object_fid(&mo_p->mo_lu); mci->mci_opdata.op_fid2 = *lf; mci->mci_opdata.op_mode = la->la_mode; mci->mci_opdata.op_mod_time = la->la_ctime; if (uc && ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) { mci->mci_opdata.op_fsuid = uc->mu_fsuid; mci->mci_opdata.op_fsgid = uc->mu_fsgid; mci->mci_opdata.op_cap = uc->mu_cap; if (uc->mu_ginfo || (uc->mu_valid == UCRED_OLD)) { mci->mci_opdata.op_suppgids[0] = uc->mu_suppgids[0]; mci->mci_opdata.op_suppgids[1] = uc->mu_suppgids[1]; } else { mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } } else { mci->mci_opdata.op_fsuid = la->la_uid; mci->mci_opdata.op_fsgid = la->la_gid; mci->mci_opdata.op_cap = current->cap_effective; mci->mci_opdata.op_suppgids[0] = mci->mci_opdata.op_suppgids[1] = -1; } rc = md_rename(mc->mc_desc.cl_exp, &mci->mci_opdata, NULL, 0, lname->ln_name, lname->ln_namelen, &mci->mci_req); if (rc == 0) { /* get attr from request */ mdc_req2attr_update(env, ma); } ptlrpc_req_finished(mci->mci_req); RETURN(rc); } /* * Return resulting fid in sfid * 0: fids are not relatives * fid: fid at which search stopped */ static int mdc_is_subdir(const struct lu_env *env, struct md_object *mo, const struct lu_fid *fid, struct lu_fid *sfid) { struct mdc_device *mc = md2mdc_dev(md_obj2dev(mo)); struct mdc_thread_info *mci; struct mdt_body *body; int rc; ENTRY; mci = mdc_info_init(env); rc = md_is_subdir(mc->mc_desc.cl_exp, lu_object_fid(&mo->mo_lu), fid, &mci->mci_req); if (rc == 0 || rc == -EREMOTE) { body = lustre_msg_buf(mci->mci_req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); LASSERT(body->valid & OBD_MD_FLID); CDEBUG(D_INFO, "Remote mdo_is_subdir(), new src "DFID"\n", PFID(&body->fid1)); *sfid = body->fid1; } ptlrpc_req_finished(mci->mci_req); RETURN(rc); } static struct md_dir_operations mdc_dir_ops = { .mdo_is_subdir = mdc_is_subdir, .mdo_rename_tgt = mdc_rename_tgt };