X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=inline;f=lustre%2Fmds%2Fhandler.c;h=9c33a9832ee14ccbb08f7b0a53965dacb8f11bbe;hb=c1f6b32958c799412c830f35f8d16ed7275407ea;hp=63ba97277d603509a330865031072901633f3119;hpb=11e0902cb38306ccb570ae2aab6348f64bdb9825;p=fs%2Flustre-release.git diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 63ba972..9c33a98 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -4,26 +4,29 @@ * lustre/mds/handler.c * Lustre Metadata Server (mds) request handler * - * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * Copyright (c) 2001-2005 Cluster File Systems, Inc. * Author: Peter Braam * Author: Andreas Dilger * Author: Phil Schwan * Author: Mike Shaver * - * This file is part of Lustre, http://www.lustre.org. + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. */ #ifndef EXPORT_SYMTAB @@ -31,40 +34,43 @@ #endif #define DEBUG_SUBSYSTEM S_MDS +#include #include -#include -#include #include -#include #include #include #include -#include -#include -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) -# include -# include -# include -# include -#else -# include -#endif -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include + #include +#include +#include +#include +#include +#include +#include +#include +#include +#include + #include "mds_internal.h" -#include + +int mds_num_threads; +CFS_MODULE_PARM(mds_num_threads, "i", int, 0444, + "number of MDS service threads to start"); + +__u32 mds_max_ost_index=0xFFFF; +CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444, + "maximal OST index"); static int mds_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data); static int mds_postsetup(struct obd_device *obd); -static int mds_cleanup(struct obd_device *obd, int flags); - +static int mds_cleanup(struct obd_device *obd); /* Assumes caller has already pushed into the kernel filesystem context */ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, @@ -76,9 +82,9 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, int rc = 0, npages, i, tmpcount, tmpsize = 0; ENTRY; - LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */ + LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */ - npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT; + npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; OBD_ALLOC(pages, sizeof(*pages) * npages); if (!pages) GOTO(out, rc = -ENOMEM); @@ -89,9 +95,9 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, GOTO(out_free, rc = -ENOMEM); for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; + tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; - pages[i] = alloc_pages(GFP_KERNEL, 0); + OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD); if (pages[i] == NULL) GOTO(cleanup_buf, rc = -ENOMEM); @@ -99,10 +105,10 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; + tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", tmpsize, offset, file->f_dentry->d_inode->i_ino, - file->f_dentry->d_inode->i_size); + i_size_read(file->f_dentry->d_inode)); rc = fsfilt_readpage(req->rq_export->exp_obd, file, kmap(pages[i]), tmpsize, &offset); @@ -120,7 +126,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { CERROR("obd_fail_loc=%x, fail operation rc=%d\n", - OBD_FAIL_MDS_SENDPAGE, rc = -EIO); + OBD_FAIL_MDS_SENDPAGE, rc); GOTO(abort_bulk, rc); } @@ -136,13 +142,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, rc = -ETIMEDOUT; /* XXX should this be a different errno? */ } - DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", + DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s", (rc == -ETIMEDOUT) ? "timeout" : "network error", desc->bd_nob_transferred, count, req->rq_export->exp_client_uuid.uuid, req->rq_export->exp_connection->c_remote_uuid.uuid); - ptlrpc_fail_export(req->rq_export); + class_fail_export(req->rq_export); EXIT; abort_bulk: @@ -150,7 +156,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, cleanup_buf: for (i = 0; i < npages; i++) if (pages[i]) - __free_pages(pages[i], 0); + OBD_PAGE_FREE(pages[i]); ptlrpc_free_bulk(desc); out_free: @@ -159,186 +165,59 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, return rc; } -extern char *ldlm_lockname[]; - -int mds_lock_mode_for_dir(struct obd_device *obd, - struct dentry *dentry, int mode) -{ - int ret_mode = 0, split; - - /* any dir access needs couple locks: - * 1) on part of dir we gonna lookup/modify in - * 2) on a whole dir to protect it from concurrent splitting - * and to flush client's cache for readdir() - * so, for a given mode and dentry this routine decides what - * lock mode to use for lock #2: - * 1) if caller's gonna lookup in dir then we need to protect - * dir from being splitted only - LCK_CR - * 2) if caller's gonna modify dir then we need to protect - * dir from being splitted and to flush cache - LCK_CW - * 3) if caller's gonna modify dir and that dir seems ready - * for splitting then we need to protect it from any - * type of access (lookup/modify/split) - LCK_EX -bzzz */ - - split = mds_splitting_expected(obd, dentry); - - /* - * it is important to check here only for MDS_NO_SPLITTABLE. The reason - * is that MDS_NO_SPLITTABLE means dir is not splittable in principle - * and another thread will not split it on the quiet. But if we have - * MDS_NO_SPLIT_EXPECTED, this means, that dir may be splitted anytime, - * but not now (for current thread) and we should consider that it can - * happen soon and go that branch which can yield LCK_EX to protect from - * possible splitting. - */ - if (split == MDS_NO_SPLITTABLE) { - /* - * this inode won't be splitted. so we need not to protect from - * just flush client's cache on modification. - */ - if (mode == LCK_PW) - ret_mode = LCK_CW; - else - ret_mode = 0; - } else { - if (mode == LCK_EX) { - ret_mode = LCK_EX; - } else if (mode == LCK_PR) { - ret_mode = LCK_CR; - } else if (mode == LCK_PW) { - /* - * caller gonna modify directory. We use concurrent - * write lock here to retract client's cache for - * readdir. - */ - if (split == MDS_EXPECT_SPLIT) { - /* - * splitting possible. serialize any access the - * idea is that first one seen dir is splittable - * is given exclusive lock and split - * directory. caller passes lock mode to - * mds_try_to_split_dir() and splitting would be - * done with exclusive lock only -bzzz. - */ - CDEBUG(D_OTHER, "%s: gonna split %lu/%lu\n", - obd->obd_name, - (unsigned long)dentry->d_inode->i_ino, - (unsigned long)dentry->d_inode->i_generation); - ret_mode = LCK_EX; - } else { - ret_mode = LCK_CW; - } - } - } - - return ret_mode; -} - /* only valid locked dentries or errors should be returned */ -struct dentry *mds_id2locked_dentry(struct obd_device *obd, struct lustre_id *id, - struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, int *mode, - char *name, int namelen, __u64 lockpart) +struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, + struct vfsmount **mnt, int lock_mode, + struct lustre_handle *lockh, + __u64 lockpart) { - struct dentry *de = mds_id2dentry(obd, id, mnt), *retval = de; - ldlm_policy_data_t policy = { .l_inodebits = { lockpart } }; + struct mds_obd *mds = &obd->u.mds; + struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; int flags = LDLM_FL_ATOMIC_CB, rc; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; ENTRY; if (IS_ERR(de)) RETURN(de); - lockh[1].cookie = 0; - res_id.name[0] = id_fid(id); - res_id.name[1] = id_group(id); - -#ifdef S_PDIROPS - if (name && IS_PDIROPS(de->d_inode)) { - ldlm_policy_data_t cpolicy = - { .l_inodebits = { MDS_INODELOCK_UPDATE } }; - LASSERT(mode != NULL); - *mode = mds_lock_mode_for_dir(obd, de, lock_mode); - if (*mode) { - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, - res_id, LDLM_IBITS, - &cpolicy, *mode, &flags, - mds_blocking_ast, - ldlm_completion_ast, NULL, NULL, - NULL, 0, NULL, lockh + 1); - if (rc != ELDLM_OK) { - l_dput(de); - RETURN(ERR_PTR(-ENOLCK)); - } - } - flags = LDLM_FL_ATOMIC_CB; - - res_id.name[2] = full_name_hash((unsigned char *)name, namelen); - - CDEBUG(D_INFO, "take lock on "DLID4":"LPX64"\n", - OLID4(id), res_id.name[2]); - } -#else -#warning "No PDIROPS support in the kernel" -#endif - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, - LDLM_IBITS, &policy, lock_mode, &flags, - mds_blocking_ast, ldlm_completion_ast, - NULL, NULL, NULL, 0, NULL, lockh); + res_id.name[0] = de->d_inode->i_ino; + res_id.name[1] = de->d_inode->i_generation; + rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, + LDLM_IBITS, &policy, lock_mode, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { l_dput(de); retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ -#ifdef S_PDIROPS - if (lockh[1].cookie) - ldlm_lock_decref(lockh + 1, *mode); -#endif - } else if (de->d_inode && de->d_inode->i_nlink == 0) { - /* as sometimes we lookup inode by ino/generation through - iopen mechanism, it's possible to find already unlinked - inode with nlink == 0. let's interpretate the case as - ENOENT -bzzz */ - CWARN("found already unlinked inode %lu/%u\n", - de->d_inode->i_ino, de->d_inode->i_generation); - l_dput(de); - retval = ERR_PTR(-ENOENT); - ldlm_lock_decref(lockh, lock_mode); -#ifdef S_PDIROPS - if (lockh[1].cookie) - ldlm_lock_decref(lockh + 1, *mode); -#endif } RETURN(retval); } -#ifndef DCACHE_DISCONNECTED -#define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED -#endif - -/* Look up an entry by inode number. This function ONLY returns valid dget'd - * dentries with an initialized inode or errors */ -struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id, - struct vfsmount **mnt) +/* Look up an entry by inode number. */ +/* this function ONLY returns valid dget'd dentries with an initialized inode + or errors */ +struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt) { - unsigned long ino = (unsigned long)id_ino(id); - __u32 generation = (__u32)id_gen(id); - struct mds_obd *mds = &obd->u.mds; - struct dentry *result; + char fid_name[32]; + unsigned long ino = fid->id; + __u32 generation = fid->generation; struct inode *inode; - char idname[32]; + struct dentry *result; if (ino == 0) RETURN(ERR_PTR(-ESTALE)); - snprintf(idname, sizeof(idname), "0x%lx", ino); + snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); - CDEBUG(D_DENTRY, "--> mds_id2dentry: ino/gen %lu/%u, sb %p\n", - ino, generation, mds->mds_sb); + CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n", + ino, generation, mds->mds_obt.obt_sb); - /* under ext3 this is neither supposed to return bad inodes nor NULL - inodes. */ - result = ll_lookup_one_len(idname, mds->mds_id_de, - strlen(idname)); + /* under ext3 this is neither supposed to return bad inodes + nor NULL inodes. */ + result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); if (IS_ERR(result)) RETURN(result); @@ -346,31 +225,23 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id, if (!inode) RETURN(ERR_PTR(-ENOENT)); - if (is_bad_inode(inode)) { - CERROR("bad inode returned %lu/%u\n", - inode->i_ino, inode->i_generation); + if (inode->i_generation == 0 || inode->i_nlink == 0) { + LCONSOLE_WARN("Found inode with zero generation or link -- this" + " may indicate disk corruption (inode: %lu/%u, " + "link %lu, count %d)\n", inode->i_ino, + inode->i_generation,(unsigned long)inode->i_nlink, + atomic_read(&inode->i_count)); dput(result); RETURN(ERR_PTR(-ENOENT)); } - /* here we disabled generation check, as root inode i_generation - * of cache mds and real mds are different. */ - if (inode->i_ino != id_ino(&mds->mds_rootid) && generation && - inode->i_generation != generation) { + if (generation && inode->i_generation != generation) { /* we didn't find the right inode.. */ - if (id_group(id) != mds->mds_num) { - CERROR("bad inode %lu found, link: %lu, ct: %d, generation " - "%u != %u, mds %u != %u, request to wrong MDS?\n", - inode->i_ino, (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation, mds->mds_num, (unsigned)id_group(id)); - } else { - CERROR("bad inode %lu found, link: %lu, ct: %d, generation " - "%u != %u, inode is recreated while request handled?\n", - inode->i_ino, (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation); - } + CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " + "count: %d, generation %u/%u\n", inode->i_ino, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation); dput(result); RETURN(ERR_PTR(-ENOENT)); } @@ -383,385 +254,265 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id, RETURN(result); } -static -int mds_req_add_idmapping(struct ptlrpc_request *req, - struct mds_export_data *med) +static int mds_connect_internal(struct obd_export *exp, + struct obd_connect_data *data) { - struct mds_req_sec_desc *rsd; - struct lustre_sec_desc *lsd; - int rc; - - if (!med->med_remote) - return 0; - - /* maybe we should do it more completely: invalidate the gss ctxt? */ - if (req->rq_mapped_uid == MDS_IDMAP_NOTFOUND) { - CWARN("didn't find mapped uid\n"); - return -EPERM; - } - - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - return -EPROTO; - } - - lsd = mds_get_lsd(req->rq_mapped_uid); - if (!lsd) { - CERROR("can't get LSD(%u), no mapping added\n", - req->rq_mapped_uid); - return -EPERM; - } + struct obd_device *obd = exp->exp_obd; + if (data != NULL) { + data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; + data->ocd_ibits_known &= MDS_INODELOCK_FULL; - rc = mds_idmap_add(med->med_idmap, rsd->rsd_uid, lsd->lsd_uid, - rsd->rsd_gid, lsd->lsd_gid); - mds_put_lsd(lsd); - return rc; -} + /* If no known bits (which should not happen, probably, + as everybody should support LOOKUP and UPDATE bits at least) + revert to compat mode with plain locks. */ + if (!data->ocd_ibits_known && + data->ocd_connect_flags & OBD_CONNECT_IBITS) + data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; -static -int mds_req_del_idmapping(struct ptlrpc_request *req, - struct mds_export_data *med) -{ - struct mds_req_sec_desc *rsd; - struct lustre_sec_desc *lsd; - int rc; + if (!obd->u.mds.mds_fl_acl) + data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - if (!med->med_remote) - return 0; + if (!obd->u.mds.mds_fl_user_xattr) + data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - return -EPROTO; + exp->exp_connect_flags = data->ocd_connect_flags; + data->ocd_version = LUSTRE_VERSION_CODE; + exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; } - LASSERT(req->rq_mapped_uid != -1); - lsd = mds_get_lsd(req->rq_mapped_uid); - if (!lsd) { - CERROR("can't get LSD(%u), no idmapping deleted\n", - req->rq_mapped_uid); - return -EPERM; + if (obd->u.mds.mds_fl_acl && + ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { + CWARN("%s: MDS requires ACL support but client does not\n", + obd->obd_name); + return -EBADE; } - - rc = mds_idmap_del(med->med_idmap, rsd->rsd_uid, lsd->lsd_uid, - rsd->rsd_gid, lsd->lsd_gid); - mds_put_lsd(lsd); - return rc; + return 0; } -static int mds_init_export_data(struct ptlrpc_request *req, - struct mds_export_data *med) +static int mds_reconnect(const struct lu_env *env, + struct obd_export *exp, struct obd_device *obd, + struct obd_uuid *cluuid, + struct obd_connect_data *data) { - struct obd_connect_data *data, *reply; - int ask_remote, ask_local; + int rc; ENTRY; - data = lustre_msg_buf(req->rq_reqmsg, 5, sizeof(*data)); - reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*data)); - LASSERT(data && reply); - - if (med->med_initialized) { - CDEBUG(D_SEC, "med already initialized, reconnect?\n"); - goto reply; - } - - ask_remote = data->ocd_connect_flags & OBD_CONNECT_REMOTE; - ask_local = data->ocd_connect_flags & OBD_CONNECT_LOCAL; - - /* currently the policy is simple: satisfy client as possible - * as we can. - */ - if (req->rq_auth_uid == -1) { - if (ask_remote) - CWARN("null sec is used, force to be local\n"); - med->med_remote = 0; - } else { - if (ask_remote) { - if (!req->rq_remote_realm) - CWARN("local realm asked to be remote\n"); - med->med_remote = 1; - } else if (ask_local) { - if (req->rq_remote_realm) - CWARN("remote realm asked to be local\n"); - med->med_remote = 0; - } else - med->med_remote = (req->rq_remote_realm != 0); - } - - med->med_nllu = data->ocd_nllu[0]; - med->med_nllg = data->ocd_nllu[1]; - - med->med_initialized = 1; -reply: - reply->ocd_connect_flags &= ~(OBD_CONNECT_REMOTE | OBD_CONNECT_LOCAL); - if (med->med_remote) { - if (!med->med_idmap) - med->med_idmap = mds_idmap_alloc(); - - if (!med->med_idmap) - CERROR("Failed to alloc idmap, following request from " - "this client will be refused\n"); - - reply->ocd_connect_flags |= OBD_CONNECT_REMOTE; - CDEBUG(D_SEC, "set client as remote\n"); - } else { - reply->ocd_connect_flags |= OBD_CONNECT_LOCAL; - CDEBUG(D_SEC, "set client as local\n"); - } - - RETURN(0); -} + if (exp == NULL || obd == NULL || cluuid == NULL) + RETURN(-EINVAL); -static void mds_free_export_data(struct mds_export_data *med) -{ - if (!med->med_idmap) - return; + rc = mds_connect_internal(exp, data); - LASSERT(med->med_remote); - mds_idmap_free(med->med_idmap); - med->med_idmap = NULL; + RETURN(rc); } /* Establish a connection to the MDS. * - * This will set up an export structure for the client to hold state data about - * that client, like open files, the last operation number it did on the server, - * etc. + * This will set up an export structure for the client to hold state data + * about that client, like open files, the last operation number it did + * on the server, etc. */ -static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, +static int mds_connect(const struct lu_env *env, + struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, - unsigned long flags) + void *localdata) { - struct mds_export_data *med; - struct mds_client_data *mcd; struct obd_export *exp; + struct mds_export_data *med; + struct mds_client_data *mcd = NULL; int rc; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); - /* XXX There is a small race between checking the list and adding a new - * connection for the same UUID, but the real threat (list corruption - * when multiple different clients connect) is solved. + /* XXX There is a small race between checking the list and adding a + * new connection for the same UUID, but the real threat (list + * corruption when multiple different clients connect) is solved. * - * There is a second race between adding the export to the list, and - * filling in the client data below. Hence skipping the case of NULL - * mcd above. We should already be controlling multiple connects at the - * client, and we can't hold the spinlock over memory allocations - * without risk of deadlocking. + * There is a second race between adding the export to the list, + * and filling in the client data below. Hence skipping the case + * of NULL mcd above. We should already be controlling multiple + * connects at the client, and we can't hold the spinlock over + * memory allocations without risk of deadlocking. */ rc = class_connect(conn, obd, cluuid); if (rc) RETURN(rc); exp = class_conn2export(conn); - - LASSERT(exp != NULL); + LASSERT(exp); med = &exp->exp_mds_data; + exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL; + + rc = mds_connect_internal(exp, data); + if (rc) + GOTO(out, rc); + OBD_ALLOC(mcd, sizeof(*mcd)); - if (!mcd) { - CERROR("%s: out of memory for client data.\n", - obd->obd_name); + if (!mcd) GOTO(out, rc = -ENOMEM); - } memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); med->med_mcd = mcd; - rc = mds_client_add(obd, &obd->u.mds, med, -1); - if (rc) - GOTO(out, rc); - - EXIT; + rc = mds_client_add(obd, exp, -1, localdata); + GOTO(out, rc); + out: if (rc) { - if (mcd) + if (mcd) { OBD_FREE(mcd, sizeof(*mcd)); - class_disconnect(exp, 0); + med->med_mcd = NULL; + } + class_disconnect(exp); } else { class_export_put(exp); } - return rc; -} - -static int mds_connect_post(struct obd_export *exp, unsigned initial, - unsigned long flags) -{ - struct obd_device *obd = exp->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct mds_export_data *med; - struct mds_client_data *mcd; - int rc = 0; - ENTRY; - - med = &exp->exp_mds_data; - mcd = med->med_mcd; - - if (initial) { - /* some one reconnect initially, we have to reset - * data existing export can have. bug 6102 */ - if (mcd->mcd_last_xid != 0) - CDEBUG(D_HA, "initial reconnect to existing export\n"); - mcd->mcd_last_transno = 0; - mcd->mcd_last_xid = 0; - mcd->mcd_last_close_xid = 0; - mcd->mcd_last_result = 0; - mcd->mcd_last_data = 0; - } - if (!(flags & OBD_OPT_MDS_CONNECTION)) { - if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) { - atomic_inc(&mds->mds_real_clients); - CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n", - obd->obd_name, exp->exp_client_uuid.uuid, - atomic_read(&mds->mds_real_clients)); - exp->exp_flags |= OBD_OPT_REAL_CLIENT; - } - if (mds->mds_md_name) - rc = mds_md_connect(obd, mds->mds_md_name); - } RETURN(rc); } -static int mds_init_export(struct obd_export *exp) +int mds_init_export(struct obd_export *exp) { struct mds_export_data *med = &exp->exp_mds_data; - INIT_LIST_HEAD(&med->med_open_head); + CFS_INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); - return 0; + + spin_lock(&exp->exp_lock); + exp->exp_connecting = 1; + spin_unlock(&exp->exp_lock); + + RETURN(0); } static int mds_destroy_export(struct obd_export *export) { + struct mds_export_data *med; struct obd_device *obd = export->exp_obd; - struct mds_export_data *med = &export->exp_mds_data; + struct mds_obd *mds = &obd->u.mds; struct lvfs_run_ctxt saved; + struct lov_mds_md *lmm; + struct llog_cookie *logcookies; int rc = 0; ENTRY; - mds_free_export_data(med); + med = &export->exp_mds_data; target_destroy_export(export); if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) - GOTO(out, 0); + RETURN(0); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - /* Close any open files (which may also cause orphan unlinking). */ + + OBD_ALLOC(lmm, mds->mds_max_mdsize); + if (lmm == NULL) { + CWARN("%s: allocation failure during cleanup; can not force " + "close file handles on this service.\n", obd->obd_name); + GOTO(out, rc = -ENOMEM); + } + + OBD_ALLOC(logcookies, mds->mds_max_cookiesize); + if (logcookies == NULL) { + CWARN("%s: allocation failure during cleanup; can not force " + "close file handles on this service.\n", obd->obd_name); + OBD_FREE(lmm, mds->mds_max_mdsize); + GOTO(out_lmm, rc = -ENOMEM); + } + spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; struct mds_file_data *mfd = list_entry(tmp, struct mds_file_data, mfd_list); - struct lustre_id sid; - - BDEVNAME_DECLARE_STORAGE(btmp); - - /* bug 1579: fix force-closing for 2.5 */ - struct dentry *dentry = mfd->mfd_dentry; + int lmm_size = mds->mds_max_mdsize; + umode_t mode = mfd->mfd_dentry->d_inode->i_mode; + __u64 valid = 0; - list_del(&mfd->mfd_list); + /* Remove mfd handle so it can't be found again. + * We are consuming the mfd_list reference here. */ + mds_mfd_unlink(mfd, 0); spin_unlock(&med->med_open_lock); - down(&dentry->d_inode->i_sem); - rc = mds_read_inode_sid(obd, dentry->d_inode, &sid); - up(&dentry->d_inode->i_sem); - if (rc) { - CERROR("Can't read inode self id, inode %lu, " - "rc %d\n", dentry->d_inode->i_ino, rc); - memset(&sid, 0, sizeof(sid)); - } - /* If you change this message, be sure to update * replay_single:test_46 */ - CERROR("force closing client file handle for %.*s (%s:" - DLID4")\n", dentry->d_name.len, dentry->d_name.name, - ll_bdevname(dentry->d_inode->i_sb, btmp), - OLID4(&sid)); - - /* child inode->i_alloc_sem protects orphan_dec_test and + CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for " + "%.*s (ino %lu)\n", obd->obd_name, + mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name, + mfd->mfd_dentry->d_inode->i_ino); + + rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1); + if (rc < 0) + CWARN("mds_get_md failure, rc=%d\n", rc); + else + valid |= OBD_MD_FLEASIZE; + + /* child orphan sem protects orphan_dec_test and * is_orphan race, mds_mfd_close drops it */ - DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode); - rc = mds_mfd_close(NULL, 0, obd, mfd, - !(export->exp_flags & OBD_OPT_FAILOVER)); + MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode); + rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd, + !(export->exp_flags & OBD_OPT_FAILOVER), + lmm, lmm_size, logcookies, + mds->mds_max_cookiesize, + &valid); + if (rc) - CDEBUG(D_INODE, "Error closing file: %d\n", rc); + CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc); + + if (valid & OBD_MD_FLCOOKIE) { + rc = mds_osc_destroy_orphan(obd, mode, lmm, + lmm_size, logcookies, 1); + if (rc < 0) { + CDEBUG(D_INODE, "%s: destroy of orphan failed," + " rc = %d\n", obd->obd_name, rc); + rc = 0; + } + valid &= ~OBD_MD_FLCOOKIE; + } + spin_lock(&med->med_open_lock); } spin_unlock(&med->med_open_lock); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - EXIT; + OBD_FREE(logcookies, mds->mds_max_cookiesize); +out_lmm: + OBD_FREE(lmm, mds->mds_max_mdsize); out: - mds_client_free(export, !(export->exp_flags & OBD_OPT_FAILOVER)); - return rc; + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + mds_client_free(export); + RETURN(rc); } -static int mds_disconnect(struct obd_export *exp, unsigned long flags) +static int mds_disconnect(struct obd_export *exp) { - unsigned long irqflags; - struct obd_device *obd; - struct mds_obd *mds; int rc; ENTRY; - LASSERT(exp != NULL); - obd = class_exp2obd(exp); - if (obd == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", - exp->exp_handle.h_cookie); - RETURN(-EINVAL); - } - mds = &obd->u.mds; - - /* - * suppress any inter-mds requests durring disconnecting lmv if this is - * detected --force mode. This is needed to avoid endless recovery. - */ - if (atomic_read(&mds->mds_real_clients) > 0 && - !(exp->exp_flags & OBD_OPT_REAL_CLIENT)) - flags |= OBD_OPT_FORCE; - - if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT) - && !atomic_read(&mds->mds_real_clients)) { - /* there was no client at all */ - mds_md_disconnect(obd, flags); - } - - if ((exp->exp_flags & OBD_OPT_REAL_CLIENT) - && atomic_dec_and_test(&mds->mds_real_clients)) { - /* time to drop LMV connections */ - CDEBUG(D_OTHER, "%s: last real client %s disconnected. " - "Disconnnect from LMV now\n", - obd->obd_name, exp->exp_client_uuid.uuid); - mds_md_disconnect(obd, flags); - } - - spin_lock_irqsave(&exp->exp_lock, irqflags); - exp->exp_flags = flags; - spin_unlock_irqrestore(&exp->exp_lock, irqflags); + LASSERT(exp); + class_export_get(exp); - /* disconnect early so that clients can't keep using export */ - rc = class_disconnect(exp, flags); - ldlm_cancel_locks_for_export(exp); + /* Disconnect early so that clients can't keep using export */ + rc = class_disconnect(exp); + if (exp->exp_obd->obd_namespace != NULL) + ldlm_cancel_locks_for_export(exp); /* complete all outstanding replies */ - spin_lock_irqsave(&exp->exp_lock, irqflags); + spin_lock(&exp->exp_lock); while (!list_empty(&exp->exp_outstanding_replies)) { struct ptlrpc_reply_state *rs = list_entry(exp->exp_outstanding_replies.next, struct ptlrpc_reply_state, rs_exp_list); - struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service; + struct ptlrpc_service *svc = rs->rs_service; spin_lock(&svc->srv_lock); list_del_init(&rs->rs_exp_list); ptlrpc_schedule_difficult_reply(rs); spin_unlock(&svc->srv_lock); } - spin_unlock_irqrestore(&exp->exp_lock, irqflags); + spin_unlock(&exp->exp_lock); + + class_export_put(exp); RETURN(rc); } @@ -769,137 +520,70 @@ static int mds_getstatus(struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; - int rc, size; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; - size = sizeof(*body); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) { - CERROR("mds: out of memory for message: size=%d\n", size); - req->rq_status = -ENOMEM; /* superfluous? */ - RETURN(-ENOMEM); - } - - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); - body->valid |= OBD_MD_FID; - - memcpy(&body->id1, &mds->mds_rootid, sizeof(body->id1)); - - /* - * the last_committed and last_xid fields are filled in for all replies - * already - no need to do so here also. - */ - RETURN(0); -} - -int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, int flag) -{ - int do_ast; - ENTRY; + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) + RETURN(req->rq_status = -ENOMEM); + rc = lustre_pack_reply(req, 2, size, NULL); + if (rc) + RETURN(req->rq_status = rc); - if (flag == LDLM_CB_CANCELING) { - /* Don't need to do anything here. */ - RETURN(0); - } + body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); + memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1)); - /* XXX layering violation! -phil */ - lock_res_and_lock(lock); - - /* - * get this: if mds_blocking_ast is racing with mds_intent_policy, such - * that mds_blocking_ast is called just before l_i_p takes the ns_lock, - * then by the time we get the lock, we might not be the correct - * blocking function anymore. So check, and return early, if so. + /* the last_committed and last_xid fields are filled in for all + * replies already - no need to do so here also. */ - if (lock->l_blocking_ast != mds_blocking_ast) { - unlock_res_and_lock(lock); - RETURN(0); - } - - lock->l_flags |= LDLM_FL_CBPENDING; - do_ast = (!lock->l_readers && !lock->l_writers); - unlock_res_and_lock(lock); - - if (do_ast) { - struct lustre_handle lockh; - int rc; - - LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); - ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); - if (rc < 0) - CERROR("ldlm_cli_cancel: %d\n", rc); - } else { - LDLM_DEBUG(lock, "Lock still has references, will be " - "cancelled later"); - } RETURN(0); } -static int mds_convert_md(struct obd_device *obd, struct inode *inode, - void *md, int size, int mea) -{ - int rc = size; - - if (S_ISREG(inode->i_mode)) { - rc = mds_convert_lov_ea(obd, inode, md, size); - } else if (S_ISDIR(inode->i_mode)) { - if (mea) { - rc = mds_convert_mea_ea(obd, inode, md, size); - } else { - rc = mds_convert_lov_ea(obd, inode, md, size); - } - if (rc == -EINVAL) { - CERROR("Invalid EA format (nor LOV or MEA) " - "is detected. Inode %lu/%u\n", - inode->i_ino, inode->i_generation); - } - } - return rc; -} - -int mds_get_md(struct obd_device *obd, struct inode *inode, - void *md, int *size, int lock, int mea) +/* get the LOV EA from @inode and store it into @md. It can be at most + * @size bytes, and @size is updated with the actual EA size. + * The EA size is also returned on success, and -ve errno on failure. + * If there is no EA then 0 is returned. */ +int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, + int *size, int lock) { - int lmm_size; int rc = 0; - ENTRY; + int lmm_size; if (lock) - down(&inode->i_sem); + LOCK_INODE_MUTEX(inode); + rc = fsfilt_get_md(obd, inode, md, *size, "lov"); - rc = fsfilt_get_md(obd, inode, md, *size, - (mea ? EA_MEA : EA_LOV)); if (rc < 0) { CERROR("Error %d reading eadata for ino %lu\n", rc, inode->i_ino); } else if (rc > 0) { lmm_size = rc; - rc = mds_convert_md(obd, inode, md, - lmm_size, mea); + rc = mds_convert_lov_ea(obd, inode, md, lmm_size); + if (rc == 0) { *size = lmm_size; rc = lmm_size; } else if (rc > 0) { *size = rc; } + } else { + *size = 0; } if (lock) - up(&inode->i_sem); + UNLOCK_INODE_MUTEX(inode); - RETURN(rc); + RETURN (rc); } -/* Call with lock=1 if you want mds_pack_md to take the i_sem. - * Call with lock=0 if the caller has already taken the i_sem. */ + +/* Call with lock=1 if you want mds_pack_md to take the i_mutex. + * Call with lock=0 if the caller has already taken the i_mutex. */ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, - struct mds_body *body, struct inode *inode, int lock, int mea) + struct mds_body *body, struct inode *inode, int lock) { struct mds_obd *mds = &obd->u.mds; - int rc, lmm_size; void *lmm; + int lmm_size; + int rc; ENTRY; lmm = lustre_msg_buf(msg, offset, 0); @@ -910,7 +594,7 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, inode->i_ino); RETURN(0); } - lmm_size = msg->buflens[offset]; + lmm_size = lustre_msg_buflen(msg, offset); /* I don't really like this, but it is a sanity check on the client * MD request. However, if the client doesn't know how much space @@ -922,14 +606,12 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, // RETURN(-EINVAL); } - rc = mds_get_md(obd, inode, lmm, &lmm_size, lock, mea); + rc = mds_get_md(obd, inode, lmm, &lmm_size, lock); if (rc > 0) { - body->valid |= S_ISDIR(inode->i_mode) ? - OBD_MD_FLDIREA : OBD_MD_FLEASIZE; - - if (mea) - body->valid |= OBD_MD_MEA; - + if (S_ISDIR(inode->i_mode)) + body->valid |= OBD_MD_FLDIREA; + else + body->valid |= OBD_MD_FLEASIZE; body->eadatasize = lmm_size; rc = 0; } @@ -937,306 +619,175 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, RETURN(rc); } -int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req, - struct mds_body *repbody, int reply_off) +#ifdef CONFIG_FS_POSIX_ACL +static +int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg, + struct mds_body *repbody, int repoff) { - struct inode *inode = dentry->d_inode; - char *symname; - int len, rc; + struct dentry de = { .d_inode = inode }; + int buflen, rc; ENTRY; - symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0); - LASSERT(symname != NULL); - len = req->rq_repmsg->buflens[reply_off + 1]; - - rc = inode->i_op->readlink(dentry, symname, len); - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - } else if (rc != len - 1) { - CERROR ("Unexpected readlink rc %d: expecting %d\n", - rc, len - 1); - rc = -EINVAL; - } else { - CDEBUG(D_INODE, "read symlink dest %s\n", symname); - repbody->valid |= OBD_MD_LINKNAME; - repbody->eadatasize = rc + 1; - symname[rc] = 0; /* NULL terminate */ - rc = 0; - } - - RETURN(rc); -} + LASSERT(repbody->aclsize == 0); + LASSERT(lustre_msg_bufcount(repmsg) > repoff); -int mds_pack_ea(struct dentry *dentry, struct ptlrpc_request *req, - struct mds_body *repbody, int req_off, int reply_off) -{ - struct inode *inode = dentry->d_inode; - char *ea_name; - void *value = NULL; - int len, rc; - ENTRY; + buflen = lustre_msg_buflen(repmsg, repoff); + if (!buflen) + GOTO(out, 0); - ea_name = lustre_msg_string(req->rq_reqmsg, req_off + 1, 0); - len = req->rq_repmsg->buflens[reply_off + 1]; - if (len != 0) - value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len); + if (!inode->i_op || !inode->i_op->getxattr) + GOTO(out, 0); - rc = -EOPNOTSUPP; - if (inode->i_op && inode->i_op->getxattr) - rc = inode->i_op->getxattr(dentry, ea_name, value, len); + rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, + lustre_msg_buf(repmsg, repoff, buflen), + buflen); - if (rc < 0) { - if (rc != -ENODATA && rc != -EOPNOTSUPP) - CERROR("getxattr failed: %d", rc); - } else { - repbody->valid |= OBD_MD_FLEA; - repbody->eadatasize = rc; - rc = 0; + if (rc >= 0) + repbody->aclsize = rc; + else if (rc != -ENODATA) { + CERROR("buflen %d, get acl: %d\n", buflen, rc); + RETURN(rc); } + EXIT; +out: + repbody->valid |= OBD_MD_FLACL; + return 0; +} +#else +#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0 +#endif - RETURN(rc); +int mds_pack_acl(struct mds_export_data *med, struct inode *inode, + struct lustre_msg *repmsg, struct mds_body *repbody, + int repoff) +{ + return mds_pack_posix_acl(inode, repmsg, repbody, repoff); } -int mds_pack_ealist(struct dentry *dentry, struct ptlrpc_request *req, - struct mds_body *repbody, int reply_off) +static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, + struct ptlrpc_request *req, + struct mds_body *reqbody, int reply_off) { - struct inode *inode = dentry->d_inode; - void *value = NULL; - int len, rc; + struct mds_body *body; + struct inode *inode = dentry->d_inode; + int rc = 0; ENTRY; - len = req->rq_repmsg->buflens[reply_off + 1]; - if (len != 0) - value = lustre_msg_buf(req->rq_repmsg, reply_off + 1, len); + if (inode == NULL) + RETURN(-ENOENT); - rc = -EOPNOTSUPP; - if (inode->i_op && inode->i_op->getxattr) - rc = inode->i_op->listxattr(dentry, value, len); + body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); + LASSERT(body != NULL); /* caller prepped reply */ - if (rc < 0) { - CERROR("listxattr failed: %d", rc); - } else { - repbody->valid |= OBD_MD_FLEALIST; - repbody->eadatasize = rc; - rc = 0; - } - RETURN(rc); -} + mds_pack_inode2fid(&body->fid1, inode); + body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */ + mds_pack_inode2body(body, inode); + reply_off++; -static -int mds_pack_posix_acl(struct lustre_msg *repmsg, int offset, - struct mds_body *body, struct inode *inode) -{ - struct dentry de = { .d_inode = inode }; - __u32 buflen, *sizep; - void *buf; - int size; - ENTRY; + if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || + (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { + rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body, + inode, 1); - if (!inode->i_op->getxattr) - RETURN(0); + /* If we have LOV EA data, the OST holds size, atime, mtime */ + if (!(body->valid & OBD_MD_FLEASIZE) && + !(body->valid & OBD_MD_FLDIREA)) + body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | + OBD_MD_FLATIME | OBD_MD_FLMTIME); - buflen = repmsg->buflens[offset + 1]; - buf = lustre_msg_buf(repmsg, offset + 1, buflen); + lustre_shrink_reply(req, reply_off, body->eadatasize, 0); + if (body->eadatasize) + reply_off++; + } else if (S_ISLNK(inode->i_mode) && + (reqbody->valid & OBD_MD_LINKNAME) != 0) { + char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0); + int len; - size = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, buf, buflen); - if (size == -ENODATA || size == -EOPNOTSUPP) - RETURN(0); - if (size < 0) - RETURN(size); - LASSERT(size); + LASSERT (symname != NULL); /* caller prepped reply */ + len = lustre_msg_buflen(req->rq_repmsg, reply_off); - sizep = lustre_msg_buf(repmsg, offset, 4); - if (!sizep) { - CERROR("can't locate returned acl size buf\n"); - RETURN(-EPROTO); + rc = inode->i_op->readlink(dentry, symname, len); + if (rc < 0) { + CERROR("readlink failed: %d\n", rc); + } else if (rc != len - 1) { + CERROR ("Unexpected readlink rc %d: expecting %d\n", + rc, len - 1); + rc = -EINVAL; + } else { + CDEBUG(D_INODE, "read symlink dest %s\n", symname); + body->valid |= OBD_MD_LINKNAME; + body->eadatasize = rc + 1; + symname[rc] = 0; /* NULL terminate */ + rc = 0; + } + reply_off++; + } else if (reqbody->valid == OBD_MD_FLFLAGS && + reqbody->flags & MDS_BFLAG_EXT_FLAGS) { + int flags; + + /* We only return the full set of flags on ioctl, otherwise we + * get enough flags from the inode in mds_pack_inode2body(). */ + rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS, + (long)&flags); + if (rc == 0) + body->flags = flags | MDS_BFLAG_EXT_FLAGS; } - *sizep = cpu_to_le32(size); - body->valid |= OBD_MD_FLACL_ACCESS; - - RETURN(0); -} + if (reqbody->valid & OBD_MD_FLMODEASIZE) { + struct mds_obd *mds = mds_req2mds(req); + body->max_cookiesize = mds->mds_max_cookiesize; + body->max_mdsize = mds->mds_max_mdsize; + body->valid |= OBD_MD_FLMODEASIZE; + } -static -int mds_pack_remote_perm(struct ptlrpc_request *req, int reply_off, - struct mds_body *body, struct inode *inode) -{ - struct lustre_sec_desc *lsd; - struct mds_remote_perm *perm; - __u32 lsd_perms; - - LASSERT(inode->i_op); - LASSERT(inode->i_op->permission); - LASSERT(req->rq_export->exp_mds_data.med_remote); - - perm = (struct mds_remote_perm *) - lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(perm)); - if (!perm) - return -EINVAL; - - memset(perm, 0, sizeof(*perm)); - - /* obtain authenticated uid/gid and LSD permissions, which - * might be different from current process context, from LSD - */ - lsd = mds_get_lsd(current->uid); - if (!lsd) { - CWARN("can't LSD of uid %u\n", current->uid); - RETURN(-EPERM); - } - - perm->mrp_auth_uid = lsd->lsd_uid; - perm->mrp_auth_gid = lsd->lsd_gid; - - lsd_perms = mds_lsd_get_perms(lsd, 1, 0, req->rq_peer.peer_id.nid); - if (lsd_perms & LSD_PERM_SETUID) - perm->mrp_allow_setuid = 1; - if (lsd_perms & LSD_PERM_SETGID) - perm->mrp_allow_setgid = 1; - - mds_put_lsd(lsd); - - /* permission bits of current user - * XXX this is low efficient, could we do it in one blow? - */ - if (inode->i_op->permission(inode, MAY_EXEC, NULL) == 0) - perm->mrp_perm |= MAY_EXEC; - if (inode->i_op->permission(inode, MAY_WRITE, NULL) == 0) - perm->mrp_perm |= MAY_WRITE; - if (inode->i_op->permission(inode, MAY_READ, NULL) == 0) - perm->mrp_perm |= MAY_READ; - - body->valid |= (OBD_MD_FLACL_ACCESS | OBD_MD_RACL); - - RETURN(0); -} - -int mds_pack_acl(struct ptlrpc_request *req, int reply_off, - struct mds_body *body, struct inode *inode) -{ - int rc; - - if (!req->rq_export->exp_mds_data.med_remote) - rc = mds_pack_posix_acl(req->rq_repmsg, reply_off, body, inode); - else - rc = mds_pack_remote_perm(req, reply_off, body, inode); - - return rc; -} - -static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, - struct ptlrpc_request *req, int req_off, - struct mds_body *reqbody, int reply_off, - struct mds_req_sec_desc *rsd) -{ - struct mds_export_data *med = &req->rq_export->u.eu_mds_data; - struct inode *inode = dentry->d_inode; - struct mds_body *body; - int rc = 0; - ENTRY; - - if (inode == NULL && !(dentry->d_flags & DCACHE_CROSS_REF)) - RETURN(-ENOENT); - - body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); - LASSERT(body != NULL); /* caller prepped reply */ + if (rc) + RETURN(rc); - if (dentry->d_flags & DCACHE_CROSS_REF) { - mds_pack_dentry2body(obd, body, dentry, - (reqbody->valid & OBD_MD_FID) ? 1 : 0); - CDEBUG(D_OTHER, "cross reference: "DLID4"\n", - OLID4(&body->id1)); - RETURN(0); - } - - mds_pack_inode2body(obd, body, inode, - (reqbody->valid & OBD_MD_FID) ? 1 : 0); +#ifdef CONFIG_FS_POSIX_ACL + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (reqbody->valid & OBD_MD_FLACL)) { + rc = mds_pack_acl(&req->rq_export->exp_mds_data, + inode, req->rq_repmsg, + body, reply_off); - if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { - - /* guessing what kind og attribute do we need. */ - int is_mea = (S_ISDIR(inode->i_mode) && - (reqbody->valid & OBD_MD_MEA) != 0); - - rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, - body, inode, 1, is_mea); - - /* if we have LOV EA data, the OST holds size, atime, mtime. */ - if (!(body->valid & OBD_MD_FLEASIZE) && - !(body->valid & OBD_MD_FLDIREA)) - body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME); - } else if (S_ISLNK(inode->i_mode) && - (reqbody->valid & OBD_MD_LINKNAME) != 0) { - rc = mds_pack_link(dentry, req, body, reply_off); - } else if (reqbody->valid & OBD_MD_FLEA) { - rc = mds_pack_ea(dentry, req, body, req_off, reply_off); - } else if (reqbody->valid & OBD_MD_FLEALIST) { - rc = mds_pack_ealist(dentry, req, body, reply_off); + lustre_shrink_reply(req, reply_off, body->aclsize, 0); + if (body->aclsize) + reply_off++; } - - if (reqbody->valid & OBD_MD_FLACL_ACCESS) { - int inc = (reqbody->valid & OBD_MD_FLEASIZE) ? 2 : 1; - rc = mds_pack_acl(req, reply_off + inc, body, inode); - } - - if (rc == 0) - mds_body_do_reverse_map(med, body); +#endif RETURN(rc); } -static int mds_getattr_pack_msg_cf(struct ptlrpc_request *req, - struct dentry *dentry, - int offset) -{ - int rc = 0, size[1] = {sizeof(struct mds_body)}; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("failed MDS_GETATTR_PACK test\n"); - req->rq_status = -ENOMEM; - RETURN(-ENOMEM); - } - - rc = lustre_pack_reply(req, 1, size, NULL); - if (rc) { - CERROR("lustre_pack_reply failed: rc %d\n", rc); - GOTO(out, req->rq_status = rc); - } - - EXIT; -out: - return rc; -} - -static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct dentry *de, +static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, int offset) { - struct inode *inode = de->d_inode; struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; - int rc = 0, size[4] = {sizeof(*body)}, bufcount = 1; + int rc, bufcount = 2; + int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; + LASSERT(offset == REQ_REC_OFF); /* non-intent */ + body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body)); - LASSERT(body != NULL); /* checked by caller */ - LASSERT_REQSWABBED(req, offset); /* swabbed by caller */ + LASSERT(body != NULL); /* checked by caller */ + LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */ if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) || (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) { - int rc; - - down(&inode->i_sem); + LOCK_INODE_MUTEX(inode); rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0, - ((body->valid & OBD_MD_MEA) ? EA_MEA : EA_LOV)); - up(&inode->i_sem); + "lov"); + UNLOCK_INODE_MUTEX(inode); + CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n", + rc, inode->i_ino); if (rc < 0) { - if (rc != -ENODATA && rc != -EOPNOTSUPP) + if (rc != -ENODATA) { CERROR("error getting inode %lu MD: rc = %d\n", inode->i_ino, rc); + RETURN(rc); + } size[bufcount] = 0; } else if (rc > mds->mds_max_mdsize) { size[bufcount] = 0; @@ -1247,520 +798,313 @@ static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct dentry *de, } bufcount++; } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) { - if (inode->i_size + 1 != body->eadatasize) + if (i_size_read(inode) + 1 != body->eadatasize) CERROR("symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize); + i_size_read(inode) + 1, body->eadatasize); + size[bufcount] = min_t(int, i_size_read(inode) + 1, + body->eadatasize); bufcount++; CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - } else if ((body->valid & OBD_MD_FLEA)) { - char *ea_name = lustre_msg_string(req->rq_reqmsg, - offset + 1, 0); - rc = -EOPNOTSUPP; - if (inode->i_op && inode->i_op->getxattr) - rc = inode->i_op->getxattr(de, ea_name, NULL, 0); - - if (rc < 0) { - if (rc != -ENODATA && rc != -EOPNOTSUPP) - CERROR("error getting inode %lu EA: rc = %d\n", - inode->i_ino, rc); - size[bufcount] = 0; - } else { - size[bufcount] = min_t(int, body->eadatasize, rc); - } - bufcount++; - } else if (body->valid & OBD_MD_FLEALIST) { - rc = -EOPNOTSUPP; - if (inode->i_op && inode->i_op->getxattr) - rc = inode->i_op->listxattr(de, NULL, 0); - - if (rc < 0) { - if (rc != -ENODATA && rc != -EOPNOTSUPP) - CERROR("error getting inode %lu EA: rc = %d\n", - inode->i_ino, rc); - size[bufcount] = 0; - } else { - size[bufcount] = min_t(int, body->eadatasize, rc); + i_size_read(inode) + 1, body->eadatasize); + } + +#ifdef CONFIG_FS_POSIX_ACL + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (body->valid & OBD_MD_FLACL)) { + struct dentry de = { .d_inode = inode }; + + size[bufcount] = 0; + if (inode->i_op && inode->i_op->getxattr) { + rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, + NULL, 0); + + if (rc < 0) { + if (rc != -ENODATA) { + CERROR("got acl size: %d\n", rc); + RETURN(rc); + } + } else + size[bufcount] = rc; } bufcount++; } - - /* may co-exist with OBD_MD_FLEASIZE */ - if (body->valid & OBD_MD_FLACL_ACCESS) { - if (req->rq_export->exp_mds_data.med_remote) { - size[bufcount++] = sizeof(struct mds_remote_perm); - } else { - size[bufcount++] = 4; - size[bufcount++] = xattr_acl_size(LL_ACL_MAX_ENTRIES); - } - } +#endif if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { CERROR("failed MDS_GETATTR_PACK test\n"); req->rq_status = -ENOMEM; - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); } rc = lustre_pack_reply(req, bufcount, size, NULL); if (rc) { - CERROR("out of memory\n"); - GOTO(out, req->rq_status = rc); - } - - EXIT; - out: - return rc; -} - -int mds_check_mds_num(struct obd_device *obd, struct inode *inode, - char *name, int namelen) -{ - struct mea *mea = NULL; - int mea_size, rc = 0; - ENTRY; - - rc = mds_md_get_attr(obd, inode, &mea, &mea_size); - if (rc) + req->rq_status = rc; RETURN(rc); - if (mea != NULL) { - /* - * dir is already splitted, check if requested filename should - * live at this MDS or at another one. - */ - int i = mea_name2idx(mea, name, namelen - 1); - if (mea->mea_master != id_group(&mea->mea_ids[i])) { - CDEBUG(D_OTHER, - "inapropriate MDS(%d) for %s. should be " - "%lu(%d)\n", mea->mea_master, name, - (unsigned long)id_group(&mea->mea_ids[i]), i); - rc = -ERESTART; - } - } - - if (mea) - OBD_FREE(mea, mea_size); - RETURN(rc); -} - -int mds_getattr_size(struct obd_device *obd, struct dentry *dentry, - struct ptlrpc_request *req, struct mds_body *body) -{ - struct inode *inode = dentry->d_inode; - ENTRY; - - LASSERT(body != NULL); - - if (dentry->d_inode == NULL || !S_ISREG(inode->i_mode)) - RETURN(0); - - if (obd->obd_recovering) { - CDEBUG(D_ERROR, "size for "DLID4" is unknown yet (recovering)\n", - OLID4(&body->id1)); - RETURN(0); } - if (atomic_read(&inode->i_writecount)) { - /* some one has opened the file for write. - * mds doesn't know actual size */ - CDEBUG(D_OTHER, "MDS doesn't know actual size for "DLID4"\n", - OLID4(&body->id1)); - RETURN(0); - } - CDEBUG(D_OTHER, "MDS returns "LPD64"/"LPD64" for"DLID4"\n", - body->size, body->blocks, OLID4(&body->id1)); - body->valid |= OBD_MD_FLSIZE; RETURN(0); } static int mds_getattr_lock(struct ptlrpc_request *req, int offset, - struct lustre_handle *child_lockh, int child_part) + int child_part, struct lustre_handle *child_lockh) { struct obd_device *obd = req->rq_export->exp_obd; struct mds_obd *mds = &obd->u.mds; struct ldlm_reply *rep = NULL; struct lvfs_run_ctxt saved; - struct mds_req_sec_desc *rsd; struct mds_body *body; struct dentry *dparent = NULL, *dchild = NULL; - struct lvfs_ucred uc = {NULL, NULL,}; - struct lustre_handle parent_lockh[2] = {{0}, {0}}; - unsigned int namesize; - int rc = 0, cleanup_phase = 0, resent_req = 0, update_mode, reply_offset; - char *name = NULL; + struct lvfs_ucred uc = {0,}; + struct lustre_handle parent_lockh; + int namesize; + int rc = 0, cleanup_phase = 0, resent_req = 0; + char *name; ENTRY; - LASSERT(!strcmp(obd->obd_type->typ_name, OBD_MDS_DEVICENAME)); - MD_COUNTER_INCREMENT(obd, getattr_lock); - - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - RETURN(-EFAULT); - } + LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); - /* swab now, before anyone looks inside the request. */ + /* Swab now, before anyone looks inside the request */ body = lustre_swab_reqbuf(req, offset, sizeof(*body), lustre_swab_mds_body); if (body == NULL) { CERROR("Can't swab mds_body\n"); - GOTO(cleanup, rc = -EFAULT); + RETURN(-EFAULT); } - LASSERT_REQSWAB(req, offset + 1); + lustre_set_req_swabbed(req, offset + 1); name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0); if (name == NULL) { CERROR("Can't unpack name\n"); - GOTO(cleanup, rc = -EFAULT); + RETURN(-EFAULT); } - namesize = req->rq_reqmsg->buflens[offset + 1]; - + namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); /* namesize less than 2 means we have empty name, probably came from revalidate by cfid, so no point in having name to be set */ if (namesize <= 1) name = NULL; - LASSERT (offset == 1 || offset == 3); - if (offset == 3) { - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); - reply_offset = 1; - } else { - reply_offset = 0; - } - - rc = mds_init_ucred(&uc, req, rsd); - if (rc) { + rc = mds_init_ucred(&uc, req, offset); + if (rc) GOTO(cleanup, rc); + + LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF); + /* if requests were at offset 2, the getattr reply goes back at 1 */ + if (offset == DLM_INTENT_REC_OFF) { + rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, + sizeof(*rep)); + offset = DLM_REPLY_REC_OFF; } push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); cleanup_phase = 1; /* kernel context */ intent_set_disposition(rep, DISP_LOOKUP_EXECD); - LASSERT(namesize > 0); - if (child_lockh->cookie != 0) { - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - resent_req = 1; - } -#if HAVE_LOOKUP_RAW + /* FIXME: handle raw lookup */ +#if 0 if (body->valid == OBD_MD_FLID) { struct mds_body *mds_reply; int size = sizeof(*mds_reply); - struct inode *dir; ino_t inum; - - dparent = mds_id2dentry(obd, &body->id1, NULL); - if (IS_ERR(dparent)) { - rc = PTR_ERR(dparent); - GOTO(cleanup, rc); - } - /* - * the user requested ONLY the inode number, so do a raw lookup. - */ + // The user requested ONLY the inode number, so do a raw lookup rc = lustre_pack_reply(req, 1, &size, NULL); if (rc) { CERROR("out of memory\n"); - l_dput(dparent); GOTO(cleanup, rc); } - dir = dparent->d_inode; - LASSERT(dir->i_op->lookup_raw != NULL); + rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum); - l_dput(dparent); - mds_reply = lustre_msg_buf(req->rq_repmsg, 0, - sizeof(*mds_reply)); - id_ino(&mds_reply->id1) = inum; + mds_reply = lustre_msg_buf(req->rq_repmsg, offset, + sizeof(*mds_reply)); + mds_reply->fid1.id = inum; mds_reply->valid = OBD_MD_FLID; GOTO(cleanup, rc); } #endif + + if (lustre_handle_is_used(child_lockh)) { + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); + resent_req = 1; + } + if (resent_req == 0) { - LASSERT(id_fid(&body->id1) != 0); if (name) { - rc = mds_get_parent_child_locked(obd, mds, &body->id1, - parent_lockh, &dparent, - LCK_PR, + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); + rc = mds_get_parent_child_locked(obd, &obd->u.mds, + &body->fid1, + &parent_lockh, + &dparent, LCK_CR, MDS_INODELOCK_UPDATE, - &update_mode, name, namesize, - child_lockh, &dchild, - LCK_PR, child_part); - if (rc) - GOTO(cleanup, rc); - - cleanup_phase = 2; /* dchild, dparent, locks */ - - /* - * let's make sure this name should leave on this mds - * node. - */ - rc = mds_check_mds_num(obd, dparent->d_inode, name, namesize); - if (rc) - GOTO(cleanup, rc); + child_lockh, &dchild, + LCK_CR, child_part); } else { - /* we have no dentry here, drop LOOKUP bit */ - /* FIXME: we need MDS_INODELOCK_LOOKUP or not. */ - child_part &= ~MDS_INODELOCK_LOOKUP; - CDEBUG(D_OTHER, "%s: retrieve attrs for "DLID4"\n", - obd->obd_name, OLID4(&body->id1)); - - dchild = mds_id2locked_dentry(obd, &body->id1, NULL, - LCK_PR, parent_lockh, - &update_mode, - NULL, 0, - MDS_INODELOCK_UPDATE); - if (IS_ERR(dchild)) { - CERROR("can't find inode with id "DLID4", err = %d\n", - OLID4(&body->id1), (int)PTR_ERR(dchild)); - GOTO(cleanup, rc = PTR_ERR(dchild)); - } - memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0])); + /* For revalidate by fid we always take UPDATE lock */ + dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, + LCK_CR, child_lockh, + child_part); + LASSERT(dchild); + if (IS_ERR(dchild)) + rc = PTR_ERR(dchild); } + if (rc) + GOTO(cleanup, rc); } else { struct ldlm_lock *granted_lock; - + struct ll_fid child_fid; + struct ldlm_resource *res; DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks"); granted_lock = ldlm_handle2lock(child_lockh); - - LASSERTF(granted_lock != NULL, LPU64"/%lu lockh "LPX64"\n", - id_fid(&body->id1), (unsigned long)id_group(&body->id1), + LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n", + body->fid1.id, body->fid1.generation, child_lockh->cookie); - if (name) { - /* usual named request */ - dparent = mds_id2dentry(obd, &body->id1, NULL); - LASSERT(!IS_ERR(dparent)); - dchild = ll_lookup_one_len(name, dparent, namesize - 1); - LASSERT(!IS_ERR(dchild)); - } else { - /* client wants to get attr. by id */ - dchild = mds_id2dentry(obd, &body->id1, NULL); - LASSERT(!IS_ERR(dchild)); - } + + res = granted_lock->l_resource; + child_fid.id = res->lr_name.name[0]; + child_fid.generation = res->lr_name.name[1]; + dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL); + LASSERT(!IS_ERR(dchild)); LDLM_LOCK_PUT(granted_lock); } cleanup_phase = 2; /* dchild, dparent, locks */ - if (!DENTRY_VALID(dchild)) { + if (dchild->d_inode == NULL) { intent_set_disposition(rep, DISP_LOOKUP_NEG); - /* - * in the intent case, the policy clears this error: the - * disposition is enough. - */ - rc = -ENOENT; - GOTO(cleanup, rc); + /* in the intent case, the policy clears this error: + the disposition is enough */ + GOTO(cleanup, rc = -ENOENT); } else { intent_set_disposition(rep, DISP_LOOKUP_POS); } if (req->rq_repmsg == NULL) { - if (dchild->d_flags & DCACHE_CROSS_REF) - rc = mds_getattr_pack_msg_cf(req, dchild, offset); - else - rc = mds_getattr_pack_msg(req, dchild, offset); + rc = mds_getattr_pack_msg(req, dchild->d_inode, offset); if (rc != 0) { CERROR ("mds_getattr_pack_msg: %d\n", rc); GOTO (cleanup, rc); } } - rc = mds_getattr_internal(obd, dchild, req, offset, body, - reply_offset, rsd); - if (rc) - GOTO(cleanup, rc); /* returns the lock to the client */ - - /* probably MDS knows actual size? */ - body = lustre_msg_buf(req->rq_repmsg, reply_offset, sizeof(*body)); - LASSERT(body != NULL); - mds_getattr_size(obd, dchild, req, body); - - GOTO(cleanup, rc); + rc = mds_getattr_internal(obd, dchild, req, body, offset); + GOTO(cleanup, rc); /* returns the lock to the client */ cleanup: switch (cleanup_phase) { case 2: if (resent_req == 0) { - if (rc && DENTRY_VALID(dchild)) - ldlm_lock_decref(child_lockh, LCK_PR); - if (name) - ldlm_lock_decref(parent_lockh, LCK_PR); -#ifdef S_PDIROPS - if (parent_lockh[1].cookie != 0) - ldlm_lock_decref(parent_lockh + 1, update_mode); -#endif - if (dparent) + if (rc && dchild->d_inode) + ldlm_lock_decref(child_lockh, LCK_CR); + if (name) { + ldlm_lock_decref(&parent_lockh, LCK_CR); l_dput(dparent); + } } l_dput(dchild); case 1: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); default: - mds_exit_ucred(&uc); + mds_exit_ucred(&uc, mds); + if (req->rq_reply_state == NULL) { + int rc2 = lustre_pack_reply(req, 1, NULL, NULL); + if (rc == 0) + rc = rc2; + req->rq_status = rc; + } } return rc; } static int mds_getattr(struct ptlrpc_request *req, int offset) { + struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; struct lvfs_run_ctxt saved; struct dentry *de; - struct mds_req_sec_desc *rsd; struct mds_body *body; - struct lvfs_ucred uc = {NULL, NULL,}; + struct lvfs_ucred uc = {0,}; int rc = 0; ENTRY; - MD_COUNTER_INCREMENT(obd, getattr); - - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - RETURN(-EFAULT); - } + OBD_COUNTER_INCREMENT(obd, getattr); body = lustre_swab_reqbuf(req, offset, sizeof(*body), lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack body\n"); - RETURN (-EFAULT); - } + if (body == NULL) + RETURN(-EFAULT); - rc = mds_init_ucred(&uc, req, rsd); - if (rc) { - mds_exit_ucred(&uc); - RETURN(rc); - } + rc = mds_init_ucred(&uc, req, offset); + if (rc) + GOTO(out_ucred, rc); push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_id2dentry(obd, &body->id1, NULL); + de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) { rc = req->rq_status = PTR_ERR(de); GOTO(out_pop, rc); } - rc = mds_getattr_pack_msg(req, de, offset); + rc = mds_getattr_pack_msg(req, de->d_inode, offset); if (rc != 0) { CERROR("mds_getattr_pack_msg: %d\n", rc); GOTO(out_pop, rc); } - req->rq_status = mds_getattr_internal(obd, de, req, offset, body, - 0, rsd); - l_dput(de); - - EXIT; -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - mds_exit_ucred(&uc); - return rc; -} - -static int mds_access_check(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct lvfs_run_ctxt saved; - struct dentry *de; - struct mds_req_sec_desc *rsd; - struct mds_body *body; - struct lvfs_ucred uc; - int rep_size[2] = {sizeof(*body), - sizeof(struct mds_remote_perm)}; - int rc = 0; - ENTRY; - - if (!req->rq_export->exp_mds_data.med_remote) { - CERROR("from local client "LPU64"\n", req->rq_peer.peer_id.nid); - RETURN(-EINVAL); - } - - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - RETURN(-EFAULT); - } - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack body\n"); - RETURN (-EFAULT); - } - - MD_COUNTER_INCREMENT(obd, access_check); - - rc = mds_init_ucred(&uc, req, rsd); - if (rc) { - CERROR("init ucred error: %d\n", rc); - RETURN(rc); - } - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - - de = mds_id2dentry(obd, &body->id1, NULL); - if (IS_ERR(de)) { - CERROR("grab ino "LPU64": err %ld\n", - body->id1.li_stc.u.e3s.l3s_ino, PTR_ERR(de)); - GOTO(out_pop, rc = PTR_ERR(de)); - } - - rc = lustre_pack_reply(req, 2, rep_size, NULL); - if (rc) { - CERROR("pack reply error: %d\n", rc); - GOTO(out_dput, rc = -EINVAL); - } - - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); - LASSERT(body); - - rc = mds_pack_remote_perm(req, 1, body, de->d_inode); - EXIT; + req->rq_status = mds_getattr_internal(obd, de, req, body, + REPLY_REC_OFF); -out_dput: l_dput(de); + GOTO(out_pop, rc); out_pop: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - mds_exit_ucred(&uc); +out_ucred: + if (req->rq_reply_state == NULL) { + int rc2 = lustre_pack_reply(req, 1, NULL, NULL); + if (rc == 0) + rc = rc2; + req->rq_status = rc; + } + mds_exit_ucred(&uc, mds); return rc; } static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, - unsigned long max_age) + __u64 max_age, __u32 flags) { int rc; - ENTRY; spin_lock(&obd->obd_osfs_lock); - rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age); + rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age); if (rc == 0) memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); spin_unlock(&obd->obd_osfs_lock); - RETURN(rc); + return rc; } static int mds_statfs(struct ptlrpc_request *req) { struct obd_device *obd = req->rq_export->exp_obd; - int rc, size = sizeof(struct obd_statfs); + int rc, size[2] = { sizeof(struct ptlrpc_body), + sizeof(struct obd_statfs) }; ENTRY; /* This will trigger a watchdog timeout */ OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1); + OBD_COUNTER_INCREMENT(obd, statfs); - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) { - CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) + GOTO(out, rc = -ENOMEM); + rc = lustre_pack_reply(req, 2, size, NULL); + if (rc) GOTO(out, rc); - } - - OBD_COUNTER_INCREMENT(obd, statfs); /* We call this so that we can cache a bit - 1 jiffie worth */ - rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size), - jiffies - HZ); + rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, + size[REPLY_REC_OFF]), + cfs_time_current_64() - HZ, 0); if (rc) { CERROR("mds_obd_statfs failed: rc %d\n", rc); GOTO(out, rc); @@ -1769,7 +1113,7 @@ static int mds_statfs(struct ptlrpc_request *req) EXIT; out: req->rq_status = rc; - return rc; + return 0; } static int mds_sync(struct ptlrpc_request *req, int offset) @@ -1777,49 +1121,39 @@ static int mds_sync(struct ptlrpc_request *req, int offset) struct obd_device *obd = req->rq_export->exp_obd; struct mds_obd *mds = &obd->u.mds; struct mds_body *body; - int rc, size = sizeof(*body); + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; body = lustre_swab_reqbuf(req, offset, sizeof(*body), lustre_swab_mds_body); if (body == NULL) - GOTO(out, rc = -EPROTO); + GOTO(out, rc = -EFAULT); - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) { - CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) + GOTO(out, rc = -ENOMEM); + rc = lustre_pack_reply(req, 2, size, NULL); + if (rc) GOTO(out, rc); - } - if (id_ino(&body->id1) == 0) { - /* an id of zero is taken to mean "sync whole filesystem" */ - rc = fsfilt_sync(obd, mds->mds_sb); - if (rc) - GOTO(out, rc); - } else { - /* just any file to grab fsync method - "file" arg unused */ - struct file *file = mds->mds_rcvd_filp; - struct mds_body *rep_body; + rc = fsfilt_sync(obd, obd->u.obt.obt_sb); + if (rc == 0 && body->fid1.id != 0) { struct dentry *de; - de = mds_id2dentry(obd, &body->id1, NULL); + de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) GOTO(out, rc = PTR_ERR(de)); - rc = file->f_op->fsync(NULL, de, 1); - if (rc) - GOTO(out, rc); + body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, + sizeof(*body)); + mds_pack_inode2fid(&body->fid1, de->d_inode); + mds_pack_inode2body(body, de->d_inode); - rep_body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep_body)); - mds_pack_inode2body(obd, rep_body, de->d_inode, - (body->valid & OBD_MD_FID) ? 1 : 0); l_dput(de); } - - EXIT; + GOTO(out, rc); out: req->rq_status = rc; - return rc; + return 0; } /* mds_readpage does not take a DLM lock on the inode, because the client must @@ -1830,42 +1164,33 @@ out: static int mds_readpage(struct ptlrpc_request *req, int offset) { struct obd_device *obd = req->rq_export->exp_obd; + struct mds_obd *mds = &obd->u.mds; struct vfsmount *mnt; struct dentry *de; struct file *file; - struct mds_req_sec_desc *rsd; struct mds_body *body, *repbody; struct lvfs_run_ctxt saved; - int rc, size = sizeof(*repbody); - struct lvfs_ucred uc = {NULL, NULL,}; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; + struct lvfs_ucred uc = {0,}; ENTRY; - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) { - CERROR("mds: out of memory\n"); - GOTO(out, rc = -ENOMEM); - } - - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - GOTO (out, rc = -EFAULT); - } + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) + RETURN(-ENOMEM); + rc = lustre_pack_reply(req, 2, size, NULL); + if (rc) + GOTO(out, rc); body = lustre_swab_reqbuf(req, offset, sizeof(*body), lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't unpack body\n"); + if (body == NULL) GOTO (out, rc = -EFAULT); - } - rc = mds_init_ucred(&uc, req, rsd); - if (rc) { + rc = mds_init_ucred(&uc, req, offset); + if (rc) GOTO(out, rc); - } push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_id2dentry(obd, &body->id1, &mnt); + de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt); if (IS_ERR(de)) GOTO(out_pop, rc = PTR_ERR(de)); @@ -1877,21 +1202,22 @@ static int mds_readpage(struct ptlrpc_request *req, int offset) GOTO(out_pop, rc = PTR_ERR(file)); /* body->size is actually the offset -eeb */ - if ((body->size & (de->d_inode->i_blksize - 1)) != 0) { + if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) { CERROR("offset "LPU64" not on a block boundary of %lu\n", - body->size, de->d_inode->i_blksize); + body->size, de->d_inode->i_sb->s_blocksize); GOTO(out_file, rc = -EFAULT); } /* body->nlink is actually the #bytes to read -eeb */ - if (body->nlink & (de->d_inode->i_blksize - 1)) { + if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) { CERROR("size %u is not multiple of blocksize %lu\n", - body->nlink, de->d_inode->i_blksize); + body->nlink, de->d_inode->i_sb->s_blocksize); GOTO(out_file, rc = -EFAULT); } - repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody)); - repbody->size = file->f_dentry->d_inode->i_size; + repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, + sizeof(*repbody)); + repbody->size = i_size_read(file->f_dentry->d_inode); repbody->valid = OBD_MD_FLSIZE; /* to make this asynchronous make sure that the handling function @@ -1900,194 +1226,43 @@ static int mds_readpage(struct ptlrpc_request *req, int offset) /* body->size is actually the offset -eeb */ rc = mds_sendpage(req, file, body->size, body->nlink); - EXIT; out_file: filp_close(file, 0); out_pop: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); out: - mds_exit_ucred(&uc); + mds_exit_ucred(&uc, mds); req->rq_status = rc; - return 0; -} - -/* update master MDS ID, which is stored in local inode EA. */ -int mds_update_mid(struct obd_device *obd, struct lustre_id *id, - void *data, int data_len) -{ - struct mds_obd *mds = &obd->u.mds; - struct dentry *dentry; - void *handle; - int rc = 0; - ENTRY; - - LASSERT(id); - LASSERT(obd); - - dentry = mds_id2dentry(obd, id, NULL); - if (IS_ERR(dentry)) - GOTO(out, rc = PTR_ERR(dentry)); - - if (!dentry->d_inode) { - CERROR("Can't find object "DLID4".\n", - OLID4(id)); - GOTO(out_dentry, rc = -EINVAL); - } - - handle = fsfilt_start(obd, dentry->d_inode, - FSFILT_OP_SETATTR, NULL); - if (IS_ERR(handle)) - GOTO(out_dentry, rc = PTR_ERR(handle)); - - rc = mds_update_inode_mid(obd, dentry->d_inode, handle, - (struct lustre_id *)data); - if (rc) { - CERROR("Can't update inode "DLID4" master id, " - "error = %d.\n", OLID4(id), rc); - GOTO(out_commit, rc); - } - - EXIT; -out_commit: - fsfilt_commit(obd, mds->mds_sb, dentry->d_inode, - handle, 0); -out_dentry: - l_dput(dentry); -out: - return rc; -} -EXPORT_SYMBOL(mds_update_mid); - -/* read master MDS ID, which is stored in local inode EA. */ -int mds_read_mid(struct obd_device *obd, struct lustre_id *id, - void *data, int data_len) -{ - struct dentry *dentry; - int rc = 0; - ENTRY; - - LASSERT(id); - LASSERT(obd); - - dentry = mds_id2dentry(obd, id, NULL); - if (IS_ERR(dentry)) - GOTO(out, rc = PTR_ERR(dentry)); - - if (!dentry->d_inode) { - CERROR("Can't find object "DLID4".\n", - OLID4(id)); - GOTO(out_dentry, rc = -EINVAL); - } - - down(&dentry->d_inode->i_sem); - rc = mds_read_inode_mid(obd, dentry->d_inode, - (struct lustre_id *)data); - up(&dentry->d_inode->i_sem); - if (rc) { - CERROR("Can't read inode "DLID4" master id, " - "error = %d.\n", OLID4(id), rc); - GOTO(out_dentry, rc); - } - - EXIT; -out_dentry: - l_dput(dentry); -out: - return rc; -} -EXPORT_SYMBOL(mds_read_mid); - -int mds_read_md(struct obd_device *obd, struct lustre_id *id, - char **data, int *datalen) -{ - struct dentry *dentry; - struct mds_obd *mds = &obd->u.mds; - int rc = 0, mea = 0; - char *ea; - ENTRY; - - LASSERT(id); - LASSERT(obd); - - dentry = mds_id2dentry(obd, id, NULL); - if (IS_ERR(dentry)) - GOTO(out, rc = PTR_ERR(dentry)); - - if (!dentry->d_inode) { - CERROR("Can't find object "DLID4".\n", - OLID4(id)); - GOTO(out_dentry, rc = -EINVAL); - } - if (S_ISDIR(dentry->d_inode->i_mode)) { - *datalen = obd_packmd(mds->mds_md_exp, NULL, NULL); - mea = 1; - } else { - *datalen = obd_packmd(mds->mds_dt_exp, NULL, NULL); - mea = 0; - } - OBD_ALLOC(ea, *datalen); - if (!ea) { - *datalen = 0; - GOTO(out_dentry, rc = PTR_ERR(dentry)); - } - *data = ea; - down(&dentry->d_inode->i_sem); - rc = fsfilt_get_md(obd, dentry->d_inode, *data, *datalen, - (mea ? EA_MEA : EA_LOV)); - up(&dentry->d_inode->i_sem); - - if (rc < 0) - CERROR("Error %d reading eadata for ino %lu\n", - rc, dentry->d_inode->i_ino); -out_dentry: - l_dput(dentry); -out: - RETURN(rc); + RETURN(0); } -EXPORT_SYMBOL(mds_read_md); int mds_reint(struct ptlrpc_request *req, int offset, struct lustre_handle *lockh) { - struct mds_update_record *rec; - struct mds_req_sec_desc *rsd; + struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */ int rc; - ENTRY; OBD_ALLOC(rec, sizeof(*rec)); if (rec == NULL) RETURN(-ENOMEM); - rsd = lustre_swab_mds_secdesc(req, MDS_REQ_SECDESC_OFF); - if (!rsd) { - CERROR("Can't unpack security desc\n"); - GOTO(out, rc = -EFAULT); - } - rc = mds_update_unpack(req, offset, rec); if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { CERROR("invalid record\n"); GOTO(out, req->rq_status = -EINVAL); } - rc = mds_init_ucred(&rec->ur_uc, req, rsd); - if (rc) { - GOTO(out, rc); - } - /* rc will be used to interrupt a for loop over multiple records */ rc = mds_reint_rec(rec, offset, req, lockh); - out: - mds_exit_ucred(&rec->ur_uc); OBD_FREE(rec, sizeof(*rec)); - RETURN(rc); + return rc; } -static int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process) +int mds_filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) { - switch (req->rq_reqmsg->opc) { + switch (lustre_msg_get_opc(req->rq_reqmsg)) { case MDS_CONNECT: /* This will never get here, but for completeness. */ case OST_CONNECT: /* This will never get here, but for completeness. */ case MDS_DISCONNECT: @@ -2096,22 +1271,23 @@ static int mds_filter_recovery_request(struct ptlrpc_request *req, RETURN(0); case MDS_CLOSE: + case MDS_DONE_WRITING: case MDS_SYNC: /* used in unmounting */ case OBD_PING: case MDS_REINT: + case SEQ_QUERY: + case FLD_QUERY: case LDLM_ENQUEUE: - case OST_CREATE: *process = target_queue_recovery_request(req, obd); RETURN(0); default: DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); - *process = 0; - /* XXX what should we set rq_status to here? */ - req->rq_status = -EAGAIN; - RETURN(ptlrpc_error(req)); + *process = -EAGAIN; + RETURN(0); } } +EXPORT_SYMBOL(mds_filter_recovery_request); static char *reint_names[] = { [REINT_SETATTR] "setattr", @@ -2122,643 +1298,131 @@ static char *reint_names[] = { [REINT_OPEN] "open", }; -#define FILTER_VALID_FLAGS (OBD_MD_FLTYPE | OBD_MD_FLMODE | OBD_MD_FLGENER | \ - OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLBLKSZ| \ - OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME| \ - OBD_MD_FLID) - -static void reconstruct_create(struct ptlrpc_request *req) -{ - struct mds_export_data *med = &req->rq_export->exp_mds_data; - struct mds_client_data *mcd = med->med_mcd; - struct dentry *dentry; - struct ost_body *body; - struct lustre_id id; - int rc; - ENTRY; - - /* copy rc, transno and disp; steal locks */ - mds_req_from_mcd(req, mcd); - if (req->rq_status) { - EXIT; - return; - } - - id_gen(&id) = 0; - id_group(&id) = 0; - - id_ino(&id) = mcd->mcd_last_data; - LASSERT(id_ino(&id) != 0); - - dentry = mds_id2dentry(req2obd(req), &id, NULL); - if (IS_ERR(dentry)) { - CERROR("can't find inode "LPU64"\n", id_ino(&id)); - req->rq_status = PTR_ERR(dentry); - EXIT; - return; - } - - CWARN("reconstruct reply for x"LPU64" (remote ino) "LPU64" -> %lu/%u\n", - req->rq_xid, id_ino(&id), dentry->d_inode->i_ino, - dentry->d_inode->i_generation); - - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body)); - obdo_from_inode(&body->oa, dentry->d_inode, FILTER_VALID_FLAGS); - body->oa.o_id = dentry->d_inode->i_ino; - body->oa.o_generation = dentry->d_inode->i_generation; - body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER; - - down(&dentry->d_inode->i_sem); - rc = mds_read_inode_sid(req2obd(req), dentry->d_inode, &id); - up(&dentry->d_inode->i_sem); - if (rc) { - CERROR("Can't read inode self id, inode %lu, " - "rc %d\n", dentry->d_inode->i_ino, rc); - id_fid(&id) = 0; - } - - body->oa.o_fid = id_fid(&id); - body->oa.o_mds = id_group(&id); - l_dput(dentry); - - EXIT; -} - -static int mds_inode_init_acl(struct obd_device *obd, void *handle, - struct dentry *de, void *xattr, int xattr_size) -{ - struct inode *inode = de->d_inode; - struct posix_acl *acl; - mode_t mode; - int rc = 0; - - LASSERT(handle); - LASSERT(inode); - LASSERT(xattr); - LASSERT(xattr_size > 0); - - if (!inode->i_op->getxattr || !inode->i_op->setxattr) { - CERROR("backend fs dosen't support xattr\n"); - return -EOPNOTSUPP; - } - - /* set default acl */ - if (S_ISDIR(inode->i_mode)) { - rc = inode->i_op->setxattr(de, XATTR_NAME_ACL_DEFAULT, - xattr, xattr_size, 0); - if (rc) { - CERROR("set default acl err: %d\n", rc); - return rc; - } - } - - /* set access acl */ - acl = posix_acl_from_xattr(xattr, xattr_size); - if (acl == NULL || IS_ERR(acl)) { - CERROR("insane attr data\n"); - return PTR_ERR(acl); - } - - if (posix_acl_valid(acl)) { - CERROR("default acl not valid: %d\n", rc); - rc = -EFAULT; - goto out; - } - - mode = inode->i_mode; - rc = posix_acl_create_masq(acl, &mode); - if (rc < 0) { - CERROR("create masq err %d\n", rc); - goto out; - } - - if (inode->i_mode != mode) { - struct iattr iattr = { .ia_valid = ATTR_MODE, - .ia_mode = mode }; - int rc2; - - rc2 = fsfilt_setattr(obd, de, handle, &iattr, 0); - if (rc2) { - CERROR("setattr mode err: %d\n", rc2); - rc = rc2; - goto out; - } - } - - if (rc > 0) { - /* we didn't change acl except mode bits of some - * entries, so should be fit into original size. - */ - rc = posix_acl_to_xattr(acl, xattr, xattr_size); - LASSERT(rc > 0); - - rc = inode->i_op->setxattr(de, XATTR_NAME_ACL_ACCESS, - xattr, xattr_size, 0); - if (rc) - CERROR("set access acl err: %d\n", rc); - } -out: - posix_acl_release(acl); - return rc; -} - -static int mdt_obj_create(struct ptlrpc_request *req) +static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req) { - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct ost_body *body, *repbody; - void *acl = NULL; - int acl_size; - char idname[LL_ID_NAMELEN]; - int size = sizeof(*repbody); - struct inode *parent_inode; - struct lvfs_run_ctxt saved; - int rc, cleanup_phase = 0; - struct dentry *new = NULL; - struct dentry_params dp; - int mealen, flags = 0; - struct lvfs_ucred uc; - struct lustre_id id; - struct mea *mea; - void *handle = NULL; - unsigned long cr_inum = 0; - __u64 fid = 0; + void *key, *val; + int keylen, vallen, rc = 0; ENTRY; - - DEBUG_REQ(D_HA, req, "create remote object"); - parent_inode = mds->mds_unnamed_dir->d_inode; - body = lustre_swab_reqbuf(req, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) + key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1); + if (key == NULL) { + DEBUG_REQ(D_HA, req, "no set_info key"); RETURN(-EFAULT); - - /* acl data is packed transparently, no swab here */ - LASSERT(req->rq_reqmsg->bufcount >= 2); - acl_size = req->rq_reqmsg->buflens[1]; - if (acl_size) { - acl = lustre_msg_buf(req->rq_reqmsg, 1, acl_size); - if (!acl) { - CERROR("No default acl buf?\n"); - RETURN(-EFAULT); - } } + keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF); - rc = lustre_pack_reply(req, 1, &size, NULL); + val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0); + vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1); + + rc = lustre_pack_reply(req, 1, NULL, NULL); if (rc) RETURN(rc); - MDS_CHECK_RESENT(req, reconstruct_create(req)); - - uc.luc_lsd = NULL; - uc.luc_ginfo = NULL; - uc.luc_uid = body->oa.o_uid; - uc.luc_gid = body->oa.o_gid; - uc.luc_fsuid = body->oa.o_uid; - uc.luc_fsgid = body->oa.o_gid; - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody)); - - /* in REPLAY case inum should be given (client or other MDS fills it) */ - if (body->oa.o_id && ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) || - (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY))) { - /* - * this is re-create request from MDS holding directory name. - * we have to lookup given ino/gen first. if it exists (good - * case) then there is nothing to do. if it does not then we - * have to recreate it. - */ - id_ino(&id) = body->oa.o_id; - id_gen(&id) = body->oa.o_generation; - - new = mds_id2dentry(obd, &id, NULL); - if (!IS_ERR(new) && new->d_inode) { - struct lustre_id sid; - - CDEBUG(D_OTHER, "mkdir repairing %lu/%lu\n", - (unsigned long)id_ino(&id), - (unsigned long)id_gen(&id)); - - obdo_from_inode(&repbody->oa, new->d_inode, - FILTER_VALID_FLAGS); - - repbody->oa.o_id = new->d_inode->i_ino; - repbody->oa.o_generation = new->d_inode->i_generation; - repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER; - cleanup_phase = 1; - - down(&new->d_inode->i_sem); - rc = mds_read_inode_sid(obd, new->d_inode, &sid); - up(&new->d_inode->i_sem); - if (rc) { - CERROR("Can't read inode self id " - "inode %lu, rc %d.\n", - new->d_inode->i_ino, rc); - GOTO(cleanup, rc); - } - - repbody->oa.o_fid = id_fid(&sid); - repbody->oa.o_mds = id_group(&sid); - LASSERT(id_fid(&sid) != 0); - - /* - * here we could use fid passed in body->oa.o_fid and - * thus avoid mds_read_inode_sid(). - */ - cr_inum = new->d_inode->i_ino; - GOTO(cleanup, rc = 0); - } - } - - down(&parent_inode->i_sem); - handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL); - if (IS_ERR(handle)) { - up(&parent_inode->i_sem); - CERROR("fsfilt_start() failed, rc = %d\n", - (int)PTR_ERR(handle)); - GOTO(cleanup, rc = PTR_ERR(handle)); - } - cleanup_phase = 1; /* transaction */ - -repeat: - rc = sprintf(idname, "%u.%u", ll_insecure_random_int(), current->pid); - new = lookup_one_len(idname, mds->mds_unnamed_dir, rc); - if (IS_ERR(new)) { - CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n", - obd->obd_name, idname, (int) PTR_ERR(new)); - fsfilt_commit(obd, mds->mds_sb, new->d_inode, handle, 0); - up(&parent_inode->i_sem); - RETURN(PTR_ERR(new)); - } else if (new->d_inode) { - CERROR("%s: name exists. repeat\n", obd->obd_name); - goto repeat; - } - if ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) || - lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - fid = body->oa.o_fid; - } else { - fid = mds_alloc_fid(obd); - } - new->d_fsdata = (void *)&dp; - dp.p_inum = 0; - dp.p_ptr = req; - dp.p_fid = fid; - dp.p_group = mds->mds_num; - - if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) || - (body->oa.o_flags & OBD_FL_RECREATE_OBJS)) { - LASSERT(body->oa.o_id != 0); - dp.p_inum = body->oa.o_id; - DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu", - (unsigned long)body->oa.o_id, - (unsigned long)body->oa.o_generation); - } + lustre_msg_set_status(req->rq_repmsg, 0); - rc = vfs_mkdir(parent_inode, new, body->oa.o_mode); - if (rc == 0) { - if (acl) { - rc = mds_inode_init_acl(obd, handle, new, - acl, acl_size); - if (rc) { - up(&parent_inode->i_sem); - GOTO(cleanup, rc); - } - } - if ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) || - lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - new->d_inode->i_generation = body->oa.o_generation; - mark_inode_dirty(new->d_inode); - - /* - * avoiding asserts in cache flush case, as - * @body->oa.o_id should be zero. - */ - if (body->oa.o_id) { - LASSERTF(body->oa.o_id == new->d_inode->i_ino, - "BUG 3550: failed to recreate obj " - LPU64" -> %lu\n", body->oa.o_id, - new->d_inode->i_ino); - - LASSERTF(body->oa.o_generation == - new->d_inode->i_generation, - "BUG 3550: failed to recreate obj/gen " - LPU64"/%u -> %lu/%u\n", body->oa.o_id, - body->oa.o_generation, - new->d_inode->i_ino, - new->d_inode->i_generation); - } - } - - obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS); - repbody->oa.o_id = new->d_inode->i_ino; - repbody->oa.o_generation = new->d_inode->i_generation; - repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER | OBD_MD_FID; - - if ((body->oa.o_flags & OBD_FL_RECREATE_OBJS) || - lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { - id_group(&id) = mds->mds_num; - - LASSERT(body->oa.o_fid != 0); - id_fid(&id) = body->oa.o_fid; - - LASSERT(body->oa.o_id != 0); - id_ino(&id) = repbody->oa.o_id; - id_gen(&id) = repbody->oa.o_generation; - - down(&new->d_inode->i_sem); - rc = mds_update_inode_sid(obd, new->d_inode, handle, &id); - up(&new->d_inode->i_sem); - - /* - * make sure, that fid is up-to-date. - */ - mds_set_last_fid(obd, id_fid(&id)); - } else { - /* - * allocate new sid, as object is created from scratch - * and this is not replay. - */ - down(&new->d_inode->i_sem); - rc = mds_set_inode_sid(obd, new->d_inode, handle, &id, fid); - up(&new->d_inode->i_sem); - } - if (rc) { - CERROR("Can't update lustre ID for inode %lu, " - "error = %d\n", new->d_inode->i_ino, rc); - GOTO(cleanup, rc); + if (KEY_IS("read-only")) { + if (val == NULL || vallen < sizeof(__u32)) { + DEBUG_REQ(D_HA, req, "no set_info val"); + RETURN(-EFAULT); } - /* initializing o_fid after it is allocated. */ - repbody->oa.o_fid = id_fid(&id); - repbody->oa.o_mds = id_group(&id); - - rc = fsfilt_del_dir_entry(obd, new); - up(&parent_inode->i_sem); - if (rc) { - CERROR("can't remove name for object: %d\n", rc); - GOTO(cleanup, rc); - } - - cleanup_phase = 2; /* created directory object */ - - CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n", - (unsigned long)new->d_inode->i_ino, - (unsigned long)new->d_inode->i_generation, - (unsigned)new->d_inode->i_mode); - cr_inum = new->d_inode->i_ino; + if (*(__u32 *)val) + exp->exp_connect_flags |= OBD_CONNECT_RDONLY; + else + exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY; } else { - up(&parent_inode->i_sem); - CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc); - GOTO(cleanup, rc); - } - - if (body->oa.o_valid & OBD_MD_FLID) { - /* this is new object for splitted dir. We have to prevent - * recursive splitting on it -bzzz */ - mealen = obd_size_diskmd(mds->mds_md_exp, NULL); - - OBD_ALLOC(mea, mealen); - if (mea == NULL) - GOTO(cleanup, rc = -ENOMEM); - - mea->mea_magic = MEA_MAGIC_ALL_CHARS; - mea->mea_master = 0; - mea->mea_count = 0; - - down(&new->d_inode->i_sem); - rc = fsfilt_set_md(obd, new->d_inode, handle, - mea, mealen, EA_MEA); - up(&new->d_inode->i_sem); - if (rc) - CERROR("fsfilt_set_md() failed, " - "rc = %d\n", rc); - - OBD_FREE(mea, mealen); - - CDEBUG(D_OTHER, "%s: mark non-splittable %lu/%u - %d\n", - obd->obd_name, new->d_inode->i_ino, - new->d_inode->i_generation, flags); - } else if (body->oa.o_easize) { - /* we pass LCK_EX to split routine to signal that we have - * exclusive access to the directory. simple because nobody - * knows it already exists -bzzz */ - rc = mds_try_to_split_dir(obd, new, NULL, - body->oa.o_easize, LCK_EX); - if (rc < 0) { - CERROR("Can't split directory %lu, error = %d.\n", - new->d_inode->i_ino, rc); - } else { - rc = 0; - } - } - - EXIT; -cleanup: - switch (cleanup_phase) { - case 2: /* object has been created, but we'll may want to replay it later */ - if (rc == 0) - ptlrpc_require_repack(req); - case 1: /* transaction */ - rc = mds_finish_transno(mds, parent_inode, handle, - req, rc, cr_inum); + RETURN(-EINVAL); } - l_dput(new); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - return rc; + RETURN(0); } -static int mdt_get_info(struct ptlrpc_request *req) +static int mds_handle_quotacheck(struct ptlrpc_request *req) { - struct obd_export *exp = req->rq_export; - int keylen, rc = 0; - char *key; + struct obd_quotactl *oqctl; + int rc; ENTRY; - key = lustre_msg_buf(req->rq_reqmsg, 0, 1); - if (key == NULL) { - DEBUG_REQ(D_HA, req, "no get_info key"); - RETURN(-EFAULT); - } - keylen = req->rq_reqmsg->buflens[0]; - - if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) && - (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) && - (keylen < strlen("rootid") || strcmp(key, "rootid") != 0)) + oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), + lustre_swab_obd_quotactl); + if (oqctl == NULL) RETURN(-EPROTO); - if (keylen >= strlen("rootid") && !strcmp(key, "rootid")) { - struct lustre_id *reply; - int size = sizeof(*reply); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) - RETURN(rc); - - reply = lustre_msg_buf(req->rq_repmsg, 0, size); - rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply); - } else { - obd_id *reply; - int size = sizeof(*reply); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) - RETURN(rc); - - reply = lustre_msg_buf(req->rq_repmsg, 0, size); - rc = obd_get_info(exp, keylen, key, (__u32 *)&size, reply); - } - - req->rq_repmsg->status = 0; - RETURN(rc); -} - -static int mds_set_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 vallen, void *val) -{ - struct obd_device *obd; - struct mds_obd *mds; - int rc = 0; - ENTRY; - - obd = class_exp2obd(exp); - if (obd == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", - exp->exp_handle.h_cookie); - RETURN(-EINVAL); - } - - mds = &obd->u.mds; - if (keylen >= strlen("mds_type") && - memcmp(key, "mds_type", keylen) == 0) { - int valsize; - __u32 group; - - CDEBUG(D_IOCTL, "set mds type to %x\n", *(int*)val); - - mds->mds_obd_type = *(int*)val; - group = FILTER_GROUP_FIRST_MDS + mds->mds_obd_type; - valsize = sizeof(group); - - /* mds number has been changed, so the corresponding obdfilter - * exp need to be changed too. */ - rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), - "mds_conn", valsize, &group); + rc = lustre_pack_reply(req, 1, NULL, NULL); + if (rc) RETURN(rc); - } - CDEBUG(D_IOCTL, "invalid key\n"); - RETURN(-EINVAL); -} - -static int mdt_set_info(struct ptlrpc_request *req) -{ - char *key, *val; - struct obd_export *exp = req->rq_export; - int keylen, rc = 0, vallen; - ENTRY; - - key = lustre_msg_buf(req->rq_reqmsg, 0, 1); - if (key == NULL) { - DEBUG_REQ(D_HA, req, "no set_info key"); - RETURN(-EFAULT); - } - keylen = req->rq_reqmsg->buflens[0]; - - if (keylen == strlen("mds_type") && - memcmp(key, "mds_type", keylen) == 0) { - rc = lustre_pack_reply(req, 0, NULL, NULL); - if (rc) - RETURN(rc); - - val = lustre_msg_buf(req->rq_reqmsg, 1, 0); - vallen = req->rq_reqmsg->buflens[1]; - rc = obd_set_info(exp, keylen, key, vallen, val); - req->rq_repmsg->status = 0; - RETURN(rc); - } - CDEBUG(D_IOCTL, "invalid key\n"); - RETURN(-EINVAL); + req->rq_status = obd_quotacheck(req->rq_export, oqctl); + RETURN(0); } -static void mds_revoke_export_locks(struct obd_export *exp) +static int mds_handle_quotactl(struct ptlrpc_request *req) { - struct list_head *locklist = &exp->exp_ldlm_data.led_held_locks; - struct list_head work; - struct ldlm_lock *lock, *next; - struct ldlm_lock_desc desc; - - if (!exp->u.eu_mds_data.med_remote) - return; - + struct obd_quotactl *oqctl, *repoqc; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) }; ENTRY; - CERROR("implement right locking here! -bzzz\n"); - INIT_LIST_HEAD(&work); - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each_entry_safe(lock, next, locklist, l_export_chain) { - - lock_res_and_lock(lock); - if (lock->l_req_mode != lock->l_granted_mode) { - unlock_res_and_lock(lock); - continue; - } - - LASSERT(lock->l_resource); - if (lock->l_resource->lr_type != LDLM_IBITS && - lock->l_resource->lr_type != LDLM_PLAIN) { - unlock_res_and_lock(lock); - continue; - } - - if (lock->l_flags & LDLM_FL_AST_SENT) { - unlock_res_and_lock(lock); - continue; - } - lock->l_flags |= LDLM_FL_AST_SENT; - unlock_res_and_lock(lock); + oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), + lustre_swab_obd_quotactl); + if (oqctl == NULL) + RETURN(-EPROTO); - /* the desc just pretend to exclusive */ - ldlm_lock2desc(lock, &desc); - desc.l_req_mode = LCK_EX; - desc.l_granted_mode = 0; + rc = lustre_pack_reply(req, 2, size, NULL); + if (rc) + RETURN(rc); - lock->l_blocking_ast(lock, &desc, NULL, LDLM_CB_BLOCKING); - } - spin_unlock(&exp->exp_ldlm_data.led_lock); + repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc)); - EXIT; + req->rq_status = obd_quotactl(req->rq_export, oqctl); + *repoqc = *oqctl; + RETURN(0); } -static int mds_msg_check_version(struct lustre_msg *msg) +int mds_msg_check_version(struct lustre_msg *msg) { int rc; - switch (msg->opc) { + switch (lustre_msg_get_opc(msg)) { case MDS_CONNECT: case MDS_DISCONNECT: case OBD_PING: + case SEC_CTX_INIT: + case SEC_CTX_INIT_CONT: + case SEC_CTX_FINI: rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_OBD_VERSION); + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_OBD_VERSION); break; - case MDS_STATFS: case MDS_GETSTATUS: case MDS_GETATTR: - case MDS_GETATTR_LOCK: - case MDS_ACCESS_CHECK: + case MDS_GETATTR_NAME: + case MDS_STATFS: case MDS_READPAGE: + case MDS_WRITEPAGE: + case MDS_IS_SUBDIR: case MDS_REINT: case MDS_CLOSE: case MDS_DONE_WRITING: case MDS_PIN: case MDS_SYNC: + case MDS_GETXATTR: + case MDS_SETXATTR: + case MDS_SET_INFO: + case MDS_QUOTACHECK: + case MDS_QUOTACTL: + case QUOTA_DQACQ: + case QUOTA_DQREL: + case SEQ_QUERY: + case FLD_QUERY: rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_MDS_VERSION); + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_MDS_VERSION); break; case LDLM_ENQUEUE: case LDLM_CONVERT: @@ -2767,52 +1431,45 @@ static int mds_msg_check_version(struct lustre_msg *msg) rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_DLM_VERSION); + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_DLM_VERSION); break; case OBD_LOG_CANCEL: - case LLOG_ORIGIN_HANDLE_OPEN: + case LLOG_ORIGIN_HANDLE_CREATE: case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: case LLOG_ORIGIN_HANDLE_READ_HEADER: case LLOG_ORIGIN_HANDLE_CLOSE: + case LLOG_ORIGIN_HANDLE_DESTROY: + case LLOG_ORIGIN_HANDLE_PREV_BLOCK: case LLOG_CATINFO: rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); if (rc) CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_LOG_VERSION); - break; - case OST_CREATE: - case OST_WRITE: - case OST_GET_INFO: - case OST_SET_INFO: - rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_OBD_VERSION); - break; - case SEC_INIT: - case SEC_INIT_CONTINUE: - case SEC_FINI: - rc = 0; + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_LOG_VERSION); break; default: - CERROR("MDS unknown opcode %d\n", msg->opc); + CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg)); rc = -ENOTSUPP; - break; } - return rc; } +EXPORT_SYMBOL(mds_msg_check_version); int mds_handle(struct ptlrpc_request *req) { int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; - struct obd_device *obd = NULL; + int rc; struct mds_obd *mds = NULL; /* quell gcc overwarning */ - int rc = 0; + struct obd_device *obd = NULL; ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0); + if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE)) + RETURN(0); + + LASSERT(current->journal_info == NULL); rc = mds_msg_check_version(req->rq_reqmsg); if (rc) { @@ -2820,52 +1477,35 @@ int mds_handle(struct ptlrpc_request *req) RETURN(rc); } - /* Security opc should NOT trigger any recovery events */ - if (req->rq_reqmsg->opc == SEC_INIT || - req->rq_reqmsg->opc == SEC_INIT_CONTINUE) { - if (req->rq_export) { - mds_req_add_idmapping(req, - &req->rq_export->exp_mds_data); - mds_revoke_export_locks(req->rq_export); - } - GOTO(out, rc = 0); - } else if (req->rq_reqmsg->opc == SEC_FINI) { - if (req->rq_export) { - mds_req_del_idmapping(req, - &req->rq_export->exp_mds_data); - mds_revoke_export_locks(req->rq_export); - } - GOTO(out, rc = 0); - } - - LASSERT(current->journal_info == NULL); /* XXX identical to OST */ - if (req->rq_reqmsg->opc != MDS_CONNECT) { + if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) { struct mds_export_data *med; int recovering; if (req->rq_export == NULL) { CERROR("operation %d on unconnected MDS from %s\n", - req->rq_reqmsg->opc, - req->rq_peerstr); + lustre_msg_get_opc(req->rq_reqmsg), + libcfs_id2str(req->rq_peer)); req->rq_status = -ENOTCONN; GOTO(out, rc = -ENOTCONN); } med = &req->rq_export->exp_mds_data; obd = req->rq_export->exp_obd; - mds = &obd->u.mds; + mds = mds_req2mds(req); /* sanity check: if the xid matches, the request must * be marked as a resent or replayed */ if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) || - req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid)) { - LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_RESENT | MSG_REPLAY), - "rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", - req->rq_xid); - } + req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid)) + if (!(lustre_msg_get_flags(req->rq_reqmsg) & + (MSG_RESENT | MSG_REPLAY))) { + CERROR("rq_xid "LPU64" matches last_xid, " + "expected RESENT flag\n", + req->rq_xid); + req->rq_status = -ENOTCONN; + GOTO(out, rc = -EFAULT); + } /* else: note the opposite is not always true; a * RESENT req after a failover will usually not match * the last_xid, since it was likely never @@ -2873,15 +1513,16 @@ int mds_handle(struct ptlrpc_request *req) * match the last xid, however it could for a * committed, but still retained, open. */ + /* Check for aborted recovery. */ spin_lock_bh(&obd->obd_processing_task_lock); recovering = obd->obd_recovering; spin_unlock_bh(&obd->obd_processing_task_lock); if (recovering) { rc = mds_filter_recovery_request(req, obd, &should_process); - if (rc || should_process == 0) { + if (rc || !should_process) RETURN(rc); - } else if (should_process < 0) { + else if (should_process < 0) { req->rq_status = should_process; rc = ptlrpc_error(req); RETURN(rc); @@ -2889,20 +1530,20 @@ int mds_handle(struct ptlrpc_request *req) } } - switch (req->rq_reqmsg->opc) { + switch (lustre_msg_get_opc(req->rq_reqmsg)) { case MDS_CONNECT: DEBUG_REQ(D_INODE, req, "connect"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET)) + RETURN(0); rc = target_handle_connect(req); if (!rc) { - struct mds_export_data *med; - - LASSERT(req->rq_export); - med = &req->rq_export->u.eu_mds_data; - mds_init_export_data(req, med); - mds_req_add_idmapping(req, med); - /* Now that we have an export, set mds. */ + /* + * XXX nikita: these assignments are useless: mds is + * never used below, and obd is only used for + * MSG_LAST_REPLAY case, which never happens for + * MDS_CONNECT. + */ obd = req->rq_export->exp_obd; mds = mds_req2mds(req); } @@ -2910,73 +1551,86 @@ int mds_handle(struct ptlrpc_request *req) case MDS_DISCONNECT: DEBUG_REQ(D_INODE, req, "disconnect"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET)) + RETURN(0); rc = target_handle_disconnect(req); req->rq_status = rc; /* superfluous? */ break; case MDS_GETSTATUS: DEBUG_REQ(D_INODE, req, "getstatus"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET)) + RETURN(0); rc = mds_getstatus(req); break; case MDS_GETATTR: DEBUG_REQ(D_INODE, req, "getattr"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); - rc = mds_getattr(req, MDS_REQ_REC_OFF); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET)) + RETURN(0); + rc = mds_getattr(req, REQ_REC_OFF); + break; + + case MDS_SETXATTR: + DEBUG_REQ(D_INODE, req, "setxattr"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET)) + RETURN(0); + rc = mds_setxattr(req); break; - case MDS_ACCESS_CHECK: - DEBUG_REQ(D_INODE, req, "access_check"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_ACCESS_CHECK_NET, 0); - rc = mds_access_check(req, MDS_REQ_REC_OFF); + case MDS_GETXATTR: + DEBUG_REQ(D_INODE, req, "getxattr"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET)) + RETURN(0); + rc = mds_getxattr(req); break; - case MDS_GETATTR_LOCK: { - struct lustre_handle lockh; - DEBUG_REQ(D_INODE, req, "getattr_lock"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_LOCK_NET, 0); + case MDS_GETATTR_NAME: { + struct lustre_handle lockh = { 0 }; + DEBUG_REQ(D_INODE, req, "getattr_name"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET)) + RETURN(0); /* If this request gets a reconstructed reply, we won't be * acquiring any new locks in mds_getattr_lock, so we don't * want to cancel. */ - lockh.cookie = 0; - rc = mds_getattr_lock(req, MDS_REQ_REC_OFF, &lockh, - MDS_INODELOCK_UPDATE); + rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE, + &lockh); /* this non-intent call (from an ioctl) is special */ req->rq_status = rc; - if (rc == 0 && lockh.cookie) - ldlm_lock_decref(&lockh, LCK_PR); + if (rc == 0 && lustre_handle_is_used(&lockh)) + ldlm_lock_decref(&lockh, LCK_CR); break; } case MDS_STATFS: DEBUG_REQ(D_INODE, req, "statfs"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET)) + RETURN(0); rc = mds_statfs(req); break; case MDS_READPAGE: DEBUG_REQ(D_INODE, req, "readpage"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); - rc = mds_readpage(req, MDS_REQ_REC_OFF); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET)) + RETURN(0); + rc = mds_readpage(req, REQ_REC_OFF); - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) { - if (req->rq_reply_state) { - lustre_free_reply_state (req->rq_reply_state); - req->rq_reply_state = NULL; - } + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { RETURN(0); } break; + case MDS_REINT: { - __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, - sizeof (*opcp)); + __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, + sizeof(*opcp)); __u32 opc; - int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize, - mds->mds_max_cookiesize}; + int op = 0; + int size[4] = { sizeof(struct ptlrpc_body), + sizeof(struct mds_body), + mds->mds_max_mdsize, + mds->mds_max_cookiesize }; int bufcount; /* NB only peek inside req now; mds_reint() will swab it */ @@ -2986,54 +1640,106 @@ int mds_handle(struct ptlrpc_request *req) break; } opc = *opcp; - if (lustre_msg_swabbed (req->rq_reqmsg)) + if (lustre_msg_swabbed(req->rq_reqmsg)) __swab32s(&opc); DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc, (opc < sizeof(reint_names) / sizeof(reint_names[0]) || reint_names[opc] == NULL) ? reint_names[opc] : "unknown opcode"); + switch (opc) { + case REINT_CREATE: + op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE; + break; + case REINT_LINK: + op = PTLRPC_LAST_CNTR + MDS_REINT_LINK; + break; + case REINT_OPEN: + op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN; + break; + case REINT_SETATTR: + op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR; + break; + case REINT_RENAME: + op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME; + break; + case REINT_UNLINK: + op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK; + break; + default: + op = 0; + break; + } + + if (op && req->rq_rqbd->rqbd_service->srv_stats) + lprocfs_counter_incr( + req->rq_rqbd->rqbd_service->srv_stats, op); - OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET)) + RETURN(0); if (opc == REINT_UNLINK || opc == REINT_RENAME) - bufcount = 3; + bufcount = 4; else if (opc == REINT_OPEN) - bufcount = 2; + bufcount = 3; else - bufcount = 1; + bufcount = 2; rc = lustre_pack_reply(req, bufcount, size, NULL); if (rc) break; - rc = mds_reint(req, MDS_REQ_REC_OFF, NULL); + rc = mds_reint(req, REQ_REC_OFF, NULL); fail = OBD_FAIL_MDS_REINT_NET_REP; break; } case MDS_CLOSE: DEBUG_REQ(D_INODE, req, "close"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); - rc = mds_close(req, MDS_REQ_REC_OFF); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET)) + RETURN(0); + rc = mds_close(req, REQ_REC_OFF); + fail = OBD_FAIL_MDS_CLOSE_NET_REP; + break; + + case MDS_DONE_WRITING: + DEBUG_REQ(D_INODE, req, "done_writing"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET)) + RETURN(0); + rc = mds_done_writing(req, REQ_REC_OFF); + break; + + case MDS_PIN: + DEBUG_REQ(D_INODE, req, "pin"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET)) + RETURN(0); + rc = mds_pin(req, REQ_REC_OFF); + break; + + case MDS_SYNC: + DEBUG_REQ(D_INODE, req, "sync"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET)) + RETURN(0); + rc = mds_sync(req, REQ_REC_OFF); break; - case MDS_DONE_WRITING: - DEBUG_REQ(D_INODE, req, "done_writing"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0); - rc = mds_done_writing(req, MDS_REQ_REC_OFF); + case MDS_SET_INFO: + DEBUG_REQ(D_INODE, req, "set_info"); + rc = mds_set_info_rpc(req->rq_export, req); break; - case MDS_PIN: - DEBUG_REQ(D_INODE, req, "pin"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0); - rc = mds_pin(req, MDS_REQ_REC_OFF); + case MDS_QUOTACHECK: + DEBUG_REQ(D_INODE, req, "quotacheck"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET)) + RETURN(0); + rc = mds_handle_quotacheck(req); break; - case MDS_SYNC: - DEBUG_REQ(D_INODE, req, "sync"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0); - rc = mds_sync(req, MDS_REQ_REC_OFF); + case MDS_QUOTACTL: + DEBUG_REQ(D_INODE, req, "quotactl"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET)) + RETURN(0); + rc = mds_handle_quotactl(req); break; case OBD_PING: @@ -3043,20 +1749,23 @@ int mds_handle(struct ptlrpc_request *req) case OBD_LOG_CANCEL: CDEBUG(D_INODE, "log cancel\n"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET)) + RETURN(0); rc = -ENOTSUPP; /* la la la */ break; case LDLM_ENQUEUE: DEBUG_REQ(D_INODE, req, "enqueue"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) + RETURN(0); rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, ldlm_server_blocking_ast, NULL); fail = OBD_FAIL_LDLM_REPLY; break; case LDLM_CONVERT: DEBUG_REQ(D_INODE, req, "convert"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT)) + RETURN(0); rc = ldlm_handle_convert(req); break; case LDLM_BL_CALLBACK: @@ -3064,56 +1773,49 @@ int mds_handle(struct ptlrpc_request *req) DEBUG_REQ(D_INODE, req, "callback"); CERROR("callbacks should not happen on MDS\n"); LBUG(); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK)) + RETURN(0); + break; + case LLOG_ORIGIN_HANDLE_CREATE: + DEBUG_REQ(D_INODE, req, "llog_init"); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); + rc = llog_origin_handle_create(req); break; - case LLOG_ORIGIN_HANDLE_OPEN: + case LLOG_ORIGIN_HANDLE_DESTROY: DEBUG_REQ(D_INODE, req, "llog_init"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_open(req); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); + rc = llog_origin_handle_destroy(req); break; case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: DEBUG_REQ(D_INODE, req, "llog next block"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); rc = llog_origin_handle_next_block(req); break; case LLOG_ORIGIN_HANDLE_PREV_BLOCK: DEBUG_REQ(D_INODE, req, "llog prev block"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); rc = llog_origin_handle_prev_block(req); break; case LLOG_ORIGIN_HANDLE_READ_HEADER: DEBUG_REQ(D_INODE, req, "llog read header"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); rc = llog_origin_handle_read_header(req); break; case LLOG_ORIGIN_HANDLE_CLOSE: DEBUG_REQ(D_INODE, req, "llog close"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); rc = llog_origin_handle_close(req); break; - case OST_CREATE: - DEBUG_REQ(D_INODE, req, "ost_create"); - rc = mdt_obj_create(req); - break; - case OST_GET_INFO: - DEBUG_REQ(D_INODE, req, "get_info"); - rc = mdt_get_info(req); - break; - case OST_SET_INFO: - DEBUG_REQ(D_INODE, req, "set_info"); - rc = mdt_set_info(req); - break; - case OST_WRITE: - CDEBUG(D_INODE, "write\n"); - OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0); - rc = ost_brw_write(req, NULL); - LASSERT(current->journal_info == NULL); - /* mdt_brw sends its own replies */ - RETURN(rc); - break; case LLOG_CATINFO: DEBUG_REQ(D_INODE, req, "llog catinfo"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); + if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) + RETURN(0); rc = llog_catinfo(req); break; default: @@ -3124,410 +1826,203 @@ int mds_handle(struct ptlrpc_request *req) LASSERT(current->journal_info == NULL); - EXIT; - /* If we're DISCONNECTing, the mds_export_data is already freed */ - if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) { + if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) { struct mds_export_data *med = &req->rq_export->exp_mds_data; - struct obd_device *obd = list_entry(mds, struct obd_device, - u.mds); - req->rq_repmsg->last_xid = - le64_to_cpu(med->med_mcd->mcd_last_xid); - - if (!obd->obd_no_transno) { - req->rq_repmsg->last_committed = - obd->obd_last_committed; - } else { - DEBUG_REQ(D_IOCTL, req, - "not sending last_committed update"); - } - CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64 - ", xid "LPU64"\n", - mds->mds_last_transno, obd->obd_last_committed, - req->rq_xid); + + /* I don't think last_xid is used for anyway, so I'm not sure + if we need to care about last_close_xid here.*/ + lustre_msg_set_last_xid(req->rq_repmsg, + le64_to_cpu(med->med_mcd->mcd_last_xid)); + + target_committed_to_req(req); } - out: + EXIT; + out: target_send_reply(req, rc, fail); return 0; } -/* Update the server data on disk. This stores the new mount_count and also the - * last_rcvd value to disk. If we don't have a clean shutdown, then the server - * last_rcvd value may be less than that of the clients. This will alert us - * that we may need to do client recovery. +/* Update the server data on disk. This stores the new mount_count and + * also the last_rcvd value to disk. If we don't have a clean shutdown, + * then the server last_rcvd value may be less than that of the clients. + * This will alert us that we may need to do client recovery. * * Also assumes for mds_last_transno that we are not modifying it (no locking). */ int mds_update_server_data(struct obd_device *obd, int force_sync) { struct mds_obd *mds = &obd->u.mds; - struct mds_server_data *msd = mds->mds_server_data; + struct lr_server_data *lsd = mds->mds_server_data; struct file *filp = mds->mds_rcvd_filp; struct lvfs_run_ctxt saved; loff_t off = 0; int rc; ENTRY; - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno); - CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", mds->mds_mount_count, mds->mds_last_transno); - rc = fsfilt_write_record(obd, filp, msd, sizeof(*msd), &off, force_sync); - if (rc) - CERROR("error writing MDS server data: rc = %d\n", rc); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - RETURN(rc); -} - -/* saves last allocated fid counter to file. */ -int mds_update_last_fid(struct obd_device *obd, void *handle, - int force_sync) -{ - struct mds_obd *mds = &obd->u.mds; - struct file *filp = mds->mds_fid_filp; - struct lvfs_run_ctxt saved; - loff_t off = 0; - __u64 last_fid; - int rc = 0; - ENTRY; - - spin_lock(&mds->mds_last_fid_lock); - last_fid = mds->mds_last_fid; - spin_unlock(&mds->mds_last_fid_lock); - CDEBUG(D_SUPER, "MDS last_fid is #"LPU64"\n", - last_fid); + spin_lock(&mds->mds_transno_lock); + lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno); + spin_unlock(&mds->mds_transno_lock); - if (handle) { - fsfilt_add_journal_cb(obd, mds->mds_sb, last_fid, - handle, mds_commit_last_fid_cb, - NULL); - } - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_write_record(obd, filp, &last_fid, sizeof(last_fid), - &off, force_sync); + rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - if (rc) { - CERROR("error writing MDS last_fid #"LPU64 - ", err = %d\n", last_fid, rc); - RETURN(rc); - } - - CDEBUG(D_SUPER, "wrote fid #"LPU64" at idx " - "%llu: err = %d\n", last_fid, off, rc); - + if (rc) + CERROR("error writing MDS server data: rc = %d\n", rc); RETURN(rc); } -void mds_set_last_fid(struct obd_device *obd, __u64 fid) -{ - struct mds_obd *mds = &obd->u.mds; - - spin_lock(&mds->mds_last_fid_lock); - if (fid > mds->mds_last_fid) - mds->mds_last_fid = fid; - spin_unlock(&mds->mds_last_fid_lock); -} - -void mds_commit_last_transno_cb(struct obd_device *obd, - __u64 transno, void *data, - int error) +static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options) { - obd_transno_commit_cb(obd, transno, error); -} + char *p = options; -void mds_commit_last_fid_cb(struct obd_device *obd, - __u64 fid, void *data, - int error) -{ - if (error) { - CERROR("%s: fid "LPD64" commit error: %d\n", - obd->obd_name, fid, error); + if (!options) return; - } - - CDEBUG(D_HA, "%s: fid "LPD64" committed\n", - obd->obd_name, fid); -} - -__u64 mds_alloc_fid(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - __u64 fid; - - spin_lock(&mds->mds_last_fid_lock); - fid = ++mds->mds_last_fid; - spin_unlock(&mds->mds_last_fid_lock); - - return fid; -} -/* - * update new lustre_id on passed @inode and saves it to inode EA. - */ -int mds_set_inode_sid(struct obd_device *obd, struct inode *inode, - void *handle, struct lustre_id *id, __u64 fid) -{ - struct mds_obd *mds = &obd->u.mds; - int alloc = 0, rc = 0; - ENTRY; - LASSERT(obd != NULL); - LASSERT(inode != NULL); - - if (id == NULL) { - OBD_ALLOC(id, sizeof(*id)); - if (id == NULL) - RETURN(-ENOMEM); - alloc = 1; - } - id_group(id) = mds->mds_num; - id_fid(id) = fid; - id_ino(id) = inode->i_ino; - id_gen(id) = inode->i_generation; - id_type(id) = (S_IFMT & inode->i_mode); - - rc = mds_update_inode_sid(obd, inode, handle, id); - if (rc) { - CERROR("Can't update inode FID EA, " - "rc = %d\n", rc); - } - - if (alloc) - OBD_FREE(id, sizeof(*id)); - RETURN(rc); -} - -/* - * reads inode self id from inode EA. Probably later this should be replaced by - * caching inode self id to avoid raeding it every time it is needed. - */ -int mds_read_inode_sid(struct obd_device *obd, struct inode *inode, - struct lustre_id *id) -{ - int rc; - ENTRY; - - LASSERT(id != NULL); - LASSERT(obd != NULL); - LASSERT(inode != NULL); + while (*options) { + int len; + + while (*p && *p != ',') + p++; + + len = p - options; + if (len == sizeof("user_xattr") - 1 && + memcmp(options, "user_xattr", len) == 0) { + mds->mds_fl_user_xattr = 1; + LCONSOLE_INFO("Enabling user_xattr\n"); + } else if (len == sizeof("nouser_xattr") - 1 && + memcmp(options, "nouser_xattr", len) == 0) { + mds->mds_fl_user_xattr = 0; + LCONSOLE_INFO("Disabling user_xattr\n"); + } else if (len == sizeof("acl") - 1 && + memcmp(options, "acl", len) == 0) { +#ifdef CONFIG_FS_POSIX_ACL + mds->mds_fl_acl = 1; + LCONSOLE_INFO("Enabling ACL\n"); +#else + CWARN("ignoring unsupported acl mount option\n"); +#endif + } else if (len == sizeof("noacl") - 1 && + memcmp(options, "noacl", len) == 0) { +#ifdef CONFIG_FS_POSIX_ACL + mds->mds_fl_acl = 0; + LCONSOLE_INFO("Disabling ACL\n"); +#endif + } - rc = fsfilt_get_md(obd, inode, &id->li_fid, - sizeof(id->li_fid), EA_SID); - if (rc < 0) { - CERROR("fsfilt_get_md() failed, " - "rc = %d\n", rc); - RETURN(rc); - } else if (!rc) { - rc = -ENODATA; - RETURN(rc); - } else { - rc = 0; + options = ++p; } - - RETURN(rc); } - -/* updates inode self id in EA. */ -int mds_update_inode_sid(struct obd_device *obd, struct inode *inode, - void *handle, struct lustre_id *id) +static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg) { int rc = 0; ENTRY; - LASSERT(id != NULL); - LASSERT(obd != NULL); - LASSERT(inode != NULL); - - rc = fsfilt_set_md(obd, inode, handle, &id->li_fid, - sizeof(id->li_fid), EA_SID); - if (rc) { - CERROR("fsfilt_set_md() failed, rc = %d\n", rc); - RETURN(rc); - } - - RETURN(rc); -} + if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { + class_uuid_t uuid; -/* - * reads inode id on master MDS. This is usualy done by CMOBD to update requests - * to master MDS by correct store cookie, needed to find inode on master MDS - * quickly. - */ -int mds_read_inode_mid(struct obd_device *obd, struct inode *inode, - struct lustre_id *id) -{ - int rc; - ENTRY; + ll_generate_random_uuid(uuid); + class_uuid_unparse(uuid, &mds->mds_lov_uuid); - LASSERT(id != NULL); - LASSERT(obd != NULL); - LASSERT(inode != NULL); + OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3)); + if (mds->mds_profile == NULL) + RETURN(-ENOMEM); - rc = fsfilt_get_md(obd, inode, id, sizeof(*id), EA_MID); - if (rc < 0) { - CERROR("fsfilt_get_md() failed, rc = %d\n", rc); - RETURN(rc); - } else if (!rc) { - rc = -ENODATA; - RETURN(rc); - } else { - rc = 0; + strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3), + LUSTRE_CFG_BUFLEN(lcfg, 3)); } - RETURN(rc); } -/* - * updates master inode id. Usualy this is done by CMOBD after an inode is - * created and relationship between cache MDS and master one should be - * established. +/* mount the file system (secretly). lustre_cfg parameters are: + * 1 = device + * 2 = fstype + * 3 = config name + * 4 = mount options */ -int mds_update_inode_mid(struct obd_device *obd, struct inode *inode, - void *handle, struct lustre_id *id) -{ - int rc = 0; - ENTRY; - - LASSERT(id != NULL); - LASSERT(obd != NULL); - LASSERT(inode != NULL); - - rc = fsfilt_set_md(obd, inode, handle, id, - sizeof(*id), EA_MID); - if (rc) { - CERROR("fsfilt_set_md() failed, " - "rc = %d\n", rc); - RETURN(rc); - } - - RETURN(rc); -} - -/* mount the file system (secretly) */ -static int mds_setup(struct obd_device *obd, obd_count len, void *buf) +static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg) { - struct lustre_cfg* lcfg = buf; + struct lprocfs_static_vars lvars; struct mds_obd *mds = &obd->u.mds; - struct lvfs_obd_ctxt *lvfs_ctxt = NULL; - char *options = NULL; + struct lustre_mount_info *lmi; struct vfsmount *mnt; + struct lustre_sb_info *lsi; + struct obd_uuid uuid; + __u8 *uuid_ptr; + char *str, *label; char ns_name[48]; - unsigned long page; int rc = 0; ENTRY; - if (lcfg->lcfg_bufcount < 3) - RETURN(rc = -EINVAL); - - if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0) - RETURN(rc = -EINVAL); - - obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2)); - if (IS_ERR(obd->obd_fsops)) - RETURN(rc = PTR_ERR(obd->obd_fsops)); - - mds->mds_max_mdsize = sizeof(struct lov_mds_md); - - page = get_zeroed_page(GFP_KERNEL); - if (!page) - RETURN(-ENOMEM); - - options = (char *)page; - - /* - * here we use "iopen_nopriv" hardcoded, because it affects MDS utility - * and the rest of options are passed by mount options. Probably this - * should be moved to somewhere else like startup scripts or lconf. */ - sprintf(options, "iopen_nopriv"); - - if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) - sprintf(options + strlen(options), ",%s", - lustre_cfg_string(lcfg, 4)); - - /* we have to know mdsnum before touching underlying fs -bzzz */ - atomic_set(&mds->mds_open_count, 0); - sema_init(&mds->mds_md_sem, 1); - sema_init(&mds->mds_create_sem, 1); - mds->mds_md_connected = 0; - mds->mds_md_name = NULL; - - if (LUSTRE_CFG_BUFLEN(lcfg, 5) > 0 && lustre_cfg_buf(lcfg, 5) && - strncmp(lustre_cfg_string(lcfg, 5), "dumb", LUSTRE_CFG_BUFLEN(lcfg, 5))) { - class_uuid_t uuid; + /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */ - generate_random_uuid(uuid); - class_uuid_unparse(uuid, &mds->mds_md_uuid); + CLASSERT(offsetof(struct obd_device, u.obt) == + offsetof(struct obd_device, u.mds.mds_obt)); - OBD_ALLOC(mds->mds_md_name, LUSTRE_CFG_BUFLEN(lcfg, 5)); - if (mds->mds_md_name == NULL) - RETURN(rc = -ENOMEM); - - memcpy(mds->mds_md_name, lustre_cfg_buf(lcfg, 5), - LUSTRE_CFG_BUFLEN(lcfg, 5)); - - CDEBUG(D_OTHER, "MDS: %s is master for %s\n", - obd->obd_name, mds->mds_md_name); + if (lcfg->lcfg_bufcount < 3) + RETURN(-EINVAL); - rc = mds_md_connect(obd, mds->mds_md_name); - if (rc) { - OBD_FREE(mds->mds_md_name, LUSTRE_CFG_BUFLEN(lcfg, 5)); - GOTO(err_ops, rc); - } - } + if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0) + RETURN(-EINVAL); - mds->mds_obd_type = MDS_MASTER_OBD; - - if (LUSTRE_CFG_BUFLEN(lcfg, 6) > 0 && lustre_cfg_buf(lcfg, 6) && - strncmp(lustre_cfg_string(lcfg, 6), "dumb", - LUSTRE_CFG_BUFLEN(lcfg, 6))) { - if (!memcmp(lustre_cfg_string(lcfg, 6), "master", - strlen("master"))) { - mds->mds_obd_type = MDS_MASTER_OBD; - } else if (!memcmp(lustre_cfg_string(lcfg, 6), "cache", - strlen("cache"))) { - mds->mds_obd_type = MDS_CACHE_OBD; - } + lmi = server_get_mount(obd->obd_name); + if (!lmi) { + CERROR("Not mounted in lustre_fill_super?\n"); + RETURN(-EINVAL); } - rc = lvfs_mount_fs(lustre_cfg_string(lcfg, 1), - lustre_cfg_string(lcfg, 2), - options, 0, &lvfs_ctxt); - - free_page(page); + /* We mounted in lustre_fill_super. + lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/ - if (rc || !lvfs_ctxt) { - CERROR("lvfs_mount_fs failed: rc = %d\n", rc); - GOTO(err_ops, rc); - } - - mnt = lvfs_ctxt->loc_mnt; - mds->mds_lvfs_ctxt = lvfs_ctxt; - ll_clear_rdonly(ll_sbdev(mnt->mnt_sb)); + lsi = s2lsi(lmi->lmi_sb); + fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts); + fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts); + mnt = lmi->lmi_mnt; + obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); + if (IS_ERR(obd->obd_fsops)) + GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt); + LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))); + sema_init(&mds->mds_epoch_sem, 1); - atomic_set(&mds->mds_real_clients, 0); spin_lock_init(&mds->mds_transno_lock); - spin_lock_init(&mds->mds_last_fid_lock); - sema_init(&mds->mds_orphan_recovery_sem, 1); + mds->mds_max_mdsize = sizeof(struct lov_mds_md); mds->mds_max_cookiesize = sizeof(struct llog_cookie); + mds->mds_atime_diff = MAX_ATIME_DIFF; + mds->mds_evict_ost_nids = 1; sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); - + obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) { - mds_cleanup(obd, 0); - GOTO(err_put, rc = -ENOMEM); + mds_cleanup(obd); + GOTO(err_ops, rc = -ENOMEM); } ldlm_register_intent(obd->obd_namespace, mds_intent_policy); + lprocfs_mds_init_vars(&lvars); + if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && + lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) { + /* Init private stats here */ + mds_stats_counter_init(obd->obd_stats); + obd->obd_proc_exports_entry = lprocfs_register("exports", + obd->obd_proc_entry, + NULL, NULL); + if (IS_ERR(obd->obd_proc_exports_entry)) { + rc = PTR_ERR(obd->obd_proc_exports_entry); + CERROR("error %d setting up lprocfs for %s\n", + rc, "exports"); + obd->obd_proc_exports_entry = NULL; + } + } + rc = mds_fs_setup(obd, mnt); if (rc) { CERROR("%s: MDS filesystem method init failed: rc = %d\n", @@ -3535,89 +2030,130 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_ns, rc); } - rc = llog_start_commit_thread(); - if (rc < 0) + if (obd->obd_proc_exports_entry) + lprocfs_add_simple(obd->obd_proc_exports_entry, + "clear", lprocfs_nid_stats_clear_read, + lprocfs_nid_stats_clear_write, obd); + rc = mds_lov_presetup(mds, lcfg); + if (rc < 0) GOTO(err_fs, rc); + ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, + "mds_ldlm_client", &obd->obd_ldlm_client); + obd->obd_replayable = 1; - if (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && lustre_cfg_buf(lcfg, 3) && - strncmp(lustre_cfg_string(lcfg, 3), "dumb", - LUSTRE_CFG_BUFLEN(lcfg, 3))) { - class_uuid_t uuid; - - generate_random_uuid(uuid); - class_uuid_unparse(uuid, &mds->mds_dt_uuid); - - OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3)); - if (mds->mds_profile == NULL) - GOTO(err_fs, rc = -ENOMEM); + rc = lquota_setup(mds_quota_interface_ref, obd); + if (rc) + GOTO(err_fs, rc); - strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3), - LUSTRE_CFG_BUFLEN(lcfg, 3)); +#if 0 + mds->mds_group_hash = upcall_cache_init(obd->obd_name); + if (IS_ERR(mds->mds_group_hash)) { + rc = PTR_ERR(mds->mds_group_hash); + mds->mds_group_hash = NULL; + GOTO(err_qctxt, rc); } +#endif - /* - * setup root dir and files ID dir if lmv already connected, or there is - * not lmv at all. - */ - if (mds->mds_md_exp || (LUSTRE_CFG_BUFLEN(lcfg, 3) > 0 && - lustre_cfg_buf(lcfg, 3) && - strncmp(lustre_cfg_string(lcfg, 3), "dumb", - LUSTRE_CFG_BUFLEN(lcfg, 3)))) { - rc = mds_fs_setup_rootid(obd); - if (rc) - GOTO(err_fs, rc); + /* Don't wait for mds_postrecov trying to clear orphans */ + obd->obd_async_recov = 1; + rc = mds_postsetup(obd); + /* Bug 11557 - allow async abort_recov start + FIXME can remove most of this obd_async_recov plumbing + obd->obd_async_recov = 0; + */ + if (rc) + GOTO(err_qctxt, rc); - rc = mds_fs_setup_virtid(obd); - if (rc) - GOTO(err_fs, rc); + uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb); + if (uuid_ptr != NULL) { + class_uuid_unparse(uuid_ptr, &uuid); + str = uuid.uuid; + } else { + str = "no UUID"; } - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, - "mds_ldlm_client", &obd->obd_ldlm_client); - obd->obd_replayable = 1; + label = fsfilt_get_label(obd, obd->u.obt.obt_sb); + if (obd->obd_recovering) { + LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in " + "recovery until %d %s reconnect, or if no clients" + " reconnect for %d:%.02d; during that time new " + "clients will not be allowed to connect. " + "Recovery progress can be monitored by watching " + "/proc/fs/lustre/mds/%s/recovery_status.\n", + obd->obd_name, lustre_cfg_string(lcfg, 1), + label ?: "", label ? "/" : "", str, + obd->obd_max_recoverable_clients, + (obd->obd_max_recoverable_clients == 1) ? + "client" : "clients", + (int)(OBD_RECOVERY_TIMEOUT) / 60, + (int)(OBD_RECOVERY_TIMEOUT) % 60, + obd->obd_name); + } else { + LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery " + "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1), + label ?: "", label ? "/" : "", str, + obd->obd_replayable ? "enabled" : "disabled"); + } - rc = mds_postsetup(obd); - if (rc) - GOTO(err_fs, rc); + if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) + ldlm_timeout = 6; RETURN(0); +err_qctxt: + lquota_cleanup(mds_quota_interface_ref, obd); err_fs: /* No extra cleanup needed for llog_init_commit_thread() */ - mds_fs_cleanup(obd, 0); + mds_fs_cleanup(obd); +#if 0 + upcall_cache_cleanup(mds->mds_group_hash); + mds->mds_group_hash = NULL; +#endif err_ns: + lprocfs_free_obd_stats(obd); + lprocfs_obd_cleanup(obd); ldlm_namespace_free(obd->obd_namespace, 0); obd->obd_namespace = NULL; -err_put: - unlock_kernel(); - lvfs_umount_fs(mds->mds_lvfs_ctxt); - mds->mds_sb = 0; - lock_kernel(); err_ops: fsfilt_put_ops(obd->obd_fsops); +err_put: + server_put_mount(obd->obd_name, mnt); + obd->u.obt.obt_sb = NULL; return rc; } -static int mds_fs_post_setup(struct obd_device *obd) +static int mds_lov_clean(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - struct dentry *dentry; - int rc = 0; + struct obd_device *osc = mds->mds_osc_obd; ENTRY; - - dentry = mds_id2dentry(obd, &mds->mds_rootid, NULL); - if (IS_ERR(dentry)) { - CERROR("Can't find ROOT, err = %d\n", - (int)PTR_ERR(dentry)); - RETURN(PTR_ERR(dentry)); + + if (mds->mds_profile) { + class_del_profile(mds->mds_profile); + OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1); + mds->mds_profile = NULL; } - - rc = fsfilt_post_setup(obd, dentry); - l_dput(dentry); - RETURN(rc); + /* There better be a lov */ + if (!osc) + RETURN(0); + if (IS_ERR(osc)) + RETURN(PTR_ERR(osc)); + + obd_register_observer(osc, NULL); + + /* Give lov our same shutdown flags */ + osc->obd_force = obd->obd_force; + osc->obd_fail = obd->obd_fail; + + /* Cleanup the lov */ + obd_disconnect(mds->mds_osc_exp); + class_manual_cleanup(osc); + mds->mds_osc_exp = NULL; + + RETURN(0); } static int mds_postsetup(struct obd_device *obd) @@ -3626,202 +2162,153 @@ static int mds_postsetup(struct obd_device *obd) int rc = 0; ENTRY; - rc = obd_llog_setup(obd, &obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT, - obd, 0, NULL, &llog_lvfs_ops); + rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL, + &llog_lvfs_ops); + if (rc) + RETURN(rc); + + rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL, + &llog_lvfs_ops); if (rc) RETURN(rc); if (mds->mds_profile) { - struct llog_ctxt *lgctxt; - struct lvfs_run_ctxt saved; struct lustre_profile *lprof; - struct config_llog_instance cfg; - - cfg.cfg_instance = NULL; - cfg.cfg_uuid = mds->mds_dt_uuid; - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - lgctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT); - if (!lgctxt) { - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - GOTO(err_llog, rc = -EINVAL); - } - - rc = class_config_process_llog(lgctxt, mds->mds_profile, &cfg); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - if (rc) - GOTO(err_llog, rc); - + /* The profile defines which osc and mdc to connect to, for a + client. We reuse that here to figure out the name of the + lov to use (and ignore lprof->lp_md). + The profile was set in the config log with + LCFG_MOUNTOPT profilenm oscnm mdcnm */ lprof = class_get_profile(mds->mds_profile); if (lprof == NULL) { CERROR("No profile found: %s\n", mds->mds_profile); GOTO(err_cleanup, rc = -ENOENT); } - rc = mds_dt_connect(obd, lprof->lp_lov); - if (rc) - GOTO(err_cleanup, rc); - - rc = mds_md_postsetup(obd); + rc = mds_lov_connect(obd, lprof->lp_dt); if (rc) GOTO(err_cleanup, rc); - } - rc = mds_fs_post_setup(obd); - if (rc) - CERROR("can not post setup fsfilt\n"); - RETURN(rc); -err_cleanup: - mds_dt_clean(obd); -err_llog: - obd_llog_cleanup(llog_get_context(&obd->obd_llogs, - LLOG_CONFIG_ORIG_CTXT)); - return rc; -} - -int mds_postrecov_common(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - struct llog_ctxt *ctxt; - int rc, item = 0, valsize; - __u32 group; - ENTRY; - - LASSERT(!obd->obd_recovering); - ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT); - LASSERT(ctxt != NULL); - - /* clean PENDING dir */ - rc = mds_cleanup_orphans(obd); - if (rc < 0) - GOTO(out, rc); - item = rc; - - group = FILTER_GROUP_FIRST_MDS + mds->mds_num; - valsize = sizeof(group); - rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), - "mds_conn", valsize, &group); - if (rc) - GOTO(out, rc); - - rc = llog_connect(ctxt, obd->u.mds.mds_dt_desc.ld_tgt_count, - NULL, NULL, NULL); - if (rc) { - CERROR("%s: failed at llog_origin_connect: %d\n", - obd->obd_name, rc); - GOTO(out, rc); - } - - /* remove the orphaned precreated objects */ - rc = mds_dt_clear_orphans(mds, NULL /* all OSTs */); - if (rc) - GOTO(err_llog, rc); + } -out: - RETURN(rc < 0 ? rc : item); + RETURN(rc); -err_llog: - /* cleanup all llogging subsystems */ - rc = obd_llog_finish(obd, &obd->obd_llogs, - mds->mds_dt_desc.ld_tgt_count); - if (rc) - CERROR("%s: failed to cleanup llogging subsystems\n", - obd->obd_name); - goto out; +err_cleanup: + mds_lov_clean(obd); + llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT)); + llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT)); + RETURN(rc); } int mds_postrecov(struct obd_device *obd) { - int rc; + int rc = 0; ENTRY; - rc = mds_postrecov_common(obd); - if (rc == 0) - rc = mds_md_reconnect(obd); - RETURN(rc); -} -int mds_dt_clean(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - ENTRY; + if (obd->obd_fail) + RETURN(0); - if (mds->mds_profile) { - char * cln_prof; - struct llog_ctxt *llctx; - struct lvfs_run_ctxt saved; - struct config_llog_instance cfg; - int len = strlen(mds->mds_profile) + sizeof("-clean") + 1; + LASSERT(!obd->obd_recovering); + LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT)); - OBD_ALLOC(cln_prof, len); - sprintf(cln_prof, "%s-clean", mds->mds_profile); + /* clean PENDING dir */ + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + rc = mds_cleanup_pending(obd); + if (rc < 0) + GOTO(out, rc); - cfg.cfg_instance = NULL; - cfg.cfg_uuid = mds->mds_dt_uuid; + /* FIXME Does target_finish_recovery really need this to block? */ + /* Notify the LOV, which will in turn call mds_notify for each tgt */ + /* This means that we have to hack obd_notify to think we're obd_set_up + during mds_lov_connect. */ + obd_notify(obd->u.mds.mds_osc_obd, NULL, + obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK : + OBD_NOTIFY_SYNC, NULL); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - llctx = llog_get_context(&obd->obd_llogs, - LLOG_CONFIG_ORIG_CTXT); - class_config_process_llog(llctx, cln_prof, &cfg); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + /* quota recovery */ + lquota_recovery(mds_quota_interface_ref, obd); - OBD_FREE(cln_prof, len); - OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1); - mds->mds_profile = NULL; - } - RETURN(0); +out: + RETURN(rc); } -int mds_md_clean(struct obd_device *obd) +/* We need to be able to stop an mds_lov_synchronize */ +static int mds_lov_early_clean(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - ENTRY; + struct obd_device *osc = mds->mds_osc_obd; - if (mds->mds_md_name) { - OBD_FREE(mds->mds_md_name, strlen(mds->mds_md_name) + 1); - mds->mds_md_name = NULL; - } - RETURN(0); + if (!osc || (!obd->obd_force && !obd->obd_fail)) + return(0); + + CDEBUG(D_HA, "abort inflight\n"); + return (obd_precleanup(osc, OBD_CLEANUP_EARLY)); } -static int mds_precleanup(struct obd_device *obd, int flags) +static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { int rc = 0; ENTRY; - mds_md_clean(obd); - mds_dt_disconnect(obd, flags); - mds_dt_clean(obd); - obd_llog_cleanup(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT)); + switch (stage) { + case OBD_CLEANUP_EARLY: + break; + case OBD_CLEANUP_EXPORTS: + /*XXX Use this for mdd mds cleanup, so comment out + *this target_cleanup_recovery for this tmp MDD MDS + *Wangdi*/ + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + target_cleanup_recovery(obd); + mds_lov_early_clean(obd); + break; + case OBD_CLEANUP_SELF_EXP: + mds_lov_disconnect(obd); + mds_lov_clean(obd); + llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT)); + llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT)); + rc = obd_llog_finish(obd, 0); + break; + case OBD_CLEANUP_OBD: + break; + } RETURN(rc); } -extern void lgss_svc_cache_purge_all(void); -static int mds_cleanup(struct obd_device *obd, int flags) +static int mds_cleanup(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; + lvfs_sbdev_type save_dev; ENTRY; - if (mds->mds_sb == NULL) + if (obd->u.obt.obt_sb == NULL) RETURN(0); + save_dev = lvfs_sbdev(obd->u.obt.obt_sb); - mds_update_server_data(obd, 1); - mds_update_last_fid(obd, NULL, 1); - - if (mds->mds_dt_objids != NULL) { - int size = mds->mds_dt_desc.ld_tgt_count * - sizeof(obd_id); - OBD_FREE(mds->mds_dt_objids, size); - } - mds_fs_cleanup(obd, flags); + if (mds->mds_osc_exp) + /* lov export was disconnected by mds_lov_clean; + we just need to drop our ref */ + class_export_put(mds->mds_osc_exp); + + lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); + lprocfs_free_per_client_stats(obd); + lprocfs_free_obd_stats(obd); + lprocfs_obd_cleanup(obd); - unlock_kernel(); + lquota_cleanup(mds_quota_interface_ref, obd); - /* 2 seems normal on mds, (may_umount() also expects 2 - fwiw), but we only see 1 at this point in obdfilter. */ - lvfs_umount_fs(mds->mds_lvfs_ctxt); + mds_update_server_data(obd, 1); + /* XXX + mds_lov_destroy_objids(obd); + */ + mds_fs_cleanup(obd); + +#if 0 + upcall_cache_cleanup(mds->mds_group_hash); + mds->mds_group_hash = NULL; +#endif - mds->mds_sb = 0; + server_put_mount(obd->obd_name, mds->mds_vfsmnt); + obd->u.obt.obt_sb = NULL; - ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE); + ldlm_namespace_free(obd->obd_namespace, obd->obd_force); spin_lock_bh(&obd->obd_processing_task_lock); if (obd->obd_recovering) { @@ -3830,97 +2317,28 @@ static int mds_cleanup(struct obd_device *obd, int flags) } spin_unlock_bh(&obd->obd_processing_task_lock); - lock_kernel(); fsfilt_put_ops(obd->obd_fsops); -#ifdef ENABLE_GSS - /* XXX */ - lgss_svc_cache_purge_all(); -#endif - - spin_lock(&mds->mds_denylist_lock); - while (!list_empty( &mds->mds_denylist ) ) { - deny_sec_t *p_deny_sec = list_entry(mds->mds_denylist.next, - deny_sec_t, list); - list_del(&p_deny_sec->list); - OBD_FREE(p_deny_sec, sizeof(*p_deny_sec)); - } - spin_unlock(&mds->mds_denylist_lock); + LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name); RETURN(0); } -static int set_security(const char *value, char **sec) -{ - if (!strcmp(value, "null")) - *sec = "null"; - else if (!strcmp(value, "krb5i")) - *sec = "krb5i"; - else if (!strcmp(value, "krb5p")) - *sec = "krb5p"; - else { - CERROR("Unrecognized security flavor %s\n", value); - return -EINVAL; - } - - return 0; -} - -static int mds_process_config(struct obd_device *obd, obd_count len, void *buf) -{ - struct lustre_cfg *lcfg = buf; - struct mds_obd *mds = &obd->u.mds; - int rc = 0; - ENTRY; - - switch(lcfg->lcfg_command) { - case LCFG_SET_SECURITY: { - if ((LUSTRE_CFG_BUFLEN(lcfg, 1) == 0) || - (LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)) - GOTO(out, rc = -EINVAL); - - if (!strcmp(lustre_cfg_string(lcfg, 1), "mds_sec")) - rc = set_security(lustre_cfg_string(lcfg, 2), - &mds->mds_mds_sec); - else if (!strcmp(lustre_cfg_string(lcfg, 1), "oss_sec")) - rc = set_security(lustre_cfg_string(lcfg, 2), - &mds->mds_ost_sec); - else if (!strcmp(lustre_cfg_string(lcfg, 1), "deny_sec")){ - spin_lock(&mds->mds_denylist_lock); - rc = add_deny_security(lustre_cfg_string(lcfg, 2), - &mds->mds_denylist); - spin_unlock(&mds->mds_denylist_lock); - } else { - CERROR("Unrecognized key\n"); - rc = -EINVAL; - } - break; - } - default: - CERROR("Unknown command: %d\n", lcfg->lcfg_command); - GOTO(out, rc = -EINVAL); - } -out: - RETURN(rc); -} - -static void fixup_handle_for_resent_req(struct ptlrpc_request *req, - int offset, +static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, struct ldlm_lock *new_lock, struct ldlm_lock **old_lock, struct lustre_handle *lockh) { struct obd_export *exp = req->rq_export; - struct obd_device *obd = exp->exp_obd; struct ldlm_request *dlmreq = - lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq)); - struct lustre_handle remote_hdl = dlmreq->lock_handle1; + lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); + struct lustre_handle remote_hdl = dlmreq->lock_handle[0]; struct list_head *iter; if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) return; - spin_lock(&obd->obd_namespace->ns_hash_lock); + spin_lock(&exp->exp_ldlm_data.led_lock); list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { struct ldlm_lock *lock; lock = list_entry(iter, struct ldlm_lock, l_export_chain); @@ -3929,24 +2347,24 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, if (lock->l_remote_handle.cookie == remote_hdl.cookie) { lockh->cookie = lock->l_handle.h_cookie; LDLM_DEBUG(lock, "restoring lock cookie"); - DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64, + DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64, lockh->cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); - spin_unlock(&obd->obd_namespace->ns_hash_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); return; } } - spin_unlock(&obd->obd_namespace->ns_hash_lock); + spin_unlock(&exp->exp_ldlm_data.led_lock); /* If the xid matches, then we know this is a resent request, * and allow it. (It's probably an OPEN, for which we don't * send a lock */ - if (req->rq_xid == + if (req->rq_xid == le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid)) return; - if (req->rq_xid == + if (req->rq_xid == le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid)) return; @@ -3956,7 +2374,7 @@ static void fixup_handle_for_resent_req(struct ptlrpc_request *req, lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); - DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64, + DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64, remote_hdl.cookie); } @@ -3983,29 +2401,27 @@ static int mds_intent_policy(struct ldlm_namespace *ns, struct ldlm_intent *it; struct mds_obd *mds = &req->rq_export->exp_obd->u.mds; struct ldlm_reply *rep; - struct lustre_handle lockh[2] = {{0}, {0}}; + struct lustre_handle lockh = { 0 }; struct ldlm_lock *new_lock = NULL; int getattr_part = MDS_INODELOCK_UPDATE; - int rc, reply_buffers; - int repsize[5] = {sizeof(struct ldlm_reply), - sizeof(struct mds_body), - mds->mds_max_mdsize}; - - int offset = MDS_REQ_INTENT_REC_OFF; + int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), + [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), + [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), + [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize }; + int repbufcnt = 4, rc; ENTRY; LASSERT(req != NULL); - MD_COUNTER_INCREMENT(req->rq_export->exp_obd, intent_lock); - if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) { + if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) { /* No intent was provided */ - int size = sizeof(struct ldlm_reply); - rc = lustre_pack_reply(req, 1, &size, NULL); - LASSERT(rc == 0); + rc = lustre_pack_reply(req, 2, repsize, NULL); + if (rc) + RETURN(rc); RETURN(0); } - it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it), + it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it), lustre_swab_ldlm_intent); if (it == NULL) { CERROR("Intent missing\n"); @@ -4014,68 +2430,67 @@ static int mds_intent_policy(struct ldlm_namespace *ns, LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - reply_buffers = 3; - if (it->opc & ( IT_OPEN | IT_GETATTR | IT_LOOKUP | IT_CHDIR )) { - if (req->rq_export->exp_mds_data.med_remote) { - reply_buffers = 4; - repsize[3] = sizeof(struct mds_remote_perm); - } else { - reply_buffers = 5; - repsize[3] = 4; - repsize[4] = xattr_acl_size(LL_ACL_MAX_ENTRIES); - } - } + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))) + /* we should never allow OBD_CONNECT_ACL if not configured */ + repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + else if (it->opc & IT_UNLINK) + repsize[repbufcnt++] = mds->mds_max_cookiesize; - rc = lustre_pack_reply(req, reply_buffers, repsize, NULL); + rc = lustre_pack_reply(req, repbufcnt, repsize, NULL); if (rc) RETURN(req->rq_status = rc); - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep)); - LASSERT(rep != NULL); - + rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep)); intent_set_disposition(rep, DISP_IT_EXECD); + /* execute policy */ switch ((long)it->opc) { case IT_OPEN: case IT_CREAT|IT_OPEN: - fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, - lock, NULL, lockh); + mds_counter_incr(req->rq_export, LPROC_MDS_OPEN); + fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL, + &lockh); /* XXX swab here to assert that an mds_open reint * packet is following */ - fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, - lock, NULL, lockh); - rep->lock_policy_res2 = mds_reint(req, offset, lockh); - - if (rep->lock_policy_res2) { - /* - * mds_open() returns ENOLCK where it should return - * zero, but it has no lock to return. - */ - if (rep->lock_policy_res2 == ENOLCK) - rep->lock_policy_res2 = 0; - + rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF, + &lockh); +#if 0 + /* We abort the lock if the lookup was negative and + * we did not make it to the OPEN portion */ + if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) RETURN(ELDLM_LOCK_ABORTED); - } - - /* - * IT_OPEN may return lock on cross-node dentry that we want to - * hold during attr retrival -bzzz - */ - if (lockh[0].cookie == 0) + if (intent_disposition(rep, DISP_LOOKUP_NEG) && + !intent_disposition(rep, DISP_OPEN_OPEN)) +#endif + + /* If there was an error of some sort or if we are not + * returning any locks */ + if (rep->lock_policy_res2 || + !intent_disposition(rep, DISP_OPEN_LOCK)) RETURN(ELDLM_LOCK_ABORTED); - break; case IT_LOOKUP: - getattr_part = MDS_INODELOCK_LOOKUP; - case IT_CHDIR: + getattr_part = MDS_INODELOCK_LOOKUP; case IT_GETATTR: - getattr_part |= MDS_INODELOCK_LOOKUP; + getattr_part |= MDS_INODELOCK_LOOKUP; + OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr); case IT_READDIR: - fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF, - lock, &new_lock, lockh); - rep->lock_policy_res2 = mds_getattr_lock(req, offset, lockh, - getattr_part); + fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, + &new_lock, &lockh); + + /* INODEBITS_INTEROP: if this lock was converted from a + * plain lock (client does not support inodebits), then + * child lock must be taken with both lookup and update + * bits set for all operations. + */ + if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS)) + getattr_part = MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE; + + rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF, + getattr_part, &lockh); /* FIXME: LDLM can set req->rq_status. MDS sets policy_res{1,2} with disposition and status. - replay: returns 0 & req->status is old status @@ -4091,17 +2506,9 @@ static int mds_intent_policy(struct ldlm_namespace *ns, RETURN(ELDLM_LOCK_ABORTED); } break; - case IT_UNLINK: - rc = mds_lock_and_check_slave(offset, req, lockh); - if ((rep->lock_policy_res2 = rc)) { - if (rc == ENOLCK) - rep->lock_policy_res2 = 0; - RETURN(ELDLM_LOCK_ABORTED); - } - break; default: CERROR("Unhandled intent "LPD64"\n", it->opc); - LBUG(); + RETURN(-EFAULT); } /* By this point, whatever function we called above must have either @@ -4111,12 +2518,12 @@ static int mds_intent_policy(struct ldlm_namespace *ns, * client afterwards. For regular RPCs we want to give the new lock to * the client instead of whatever lock it was about to get. */ if (new_lock == NULL) - new_lock = ldlm_handle2lock(&lockh[0]); + new_lock = ldlm_handle2lock(&lockh); if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) RETURN(0); LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n", - it->opc, lockh[0].cookie); + it->opc, lockh.cookie); /* If we've already given this lock to a client once, then we should * have no readers or writers. Otherwise, we should have one reader @@ -4145,11 +2552,10 @@ static int mds_intent_policy(struct ldlm_namespace *ns, new_lock->l_writers = 0; new_lock->l_export = class_export_get(req->rq_export); - - spin_lock(&new_lock->l_export->exp_ldlm_data.led_lock); + spin_lock(&req->rq_export->exp_ldlm_data.led_lock); list_add(&new_lock->l_export_chain, &new_lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&new_lock->l_export->exp_ldlm_data.led_lock); + spin_unlock(&req->rq_export->exp_ldlm_data.led_lock); new_lock->l_blocking_ast = lock->l_blocking_ast; new_lock->l_completion_ast = lock->l_completion_ast; @@ -4165,229 +2571,206 @@ static int mds_intent_policy(struct ldlm_namespace *ns, RETURN(ELDLM_LOCK_REPLACED); } -int mds_attach(struct obd_device *dev, obd_count len, void *data) +static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { + struct mds_obd *mds = &obd->u.mds; struct lprocfs_static_vars lvars; + int mds_min_threads; + int mds_max_threads; int rc = 0; - struct mds_obd *mds = &dev->u.mds; - - spin_lock_init(&mds->mds_denylist_lock); - INIT_LIST_HEAD(&mds->mds_denylist); - - lprocfs_init_multi_vars(0, &lvars); - - rc = lprocfs_obd_attach(dev, lvars.obd_vars); - if (rc) - return rc; - - return lprocfs_alloc_md_stats(dev, 0); -} - -int mds_detach(struct obd_device *dev) -{ - lprocfs_free_md_stats(dev); - return lprocfs_obd_detach(dev); -} - -int mdt_attach(struct obd_device *dev, obd_count len, void *data) -{ - struct lprocfs_static_vars lvars; + ENTRY; - lprocfs_init_multi_vars(1, &lvars); - return lprocfs_obd_attach(dev, lvars.obd_vars); -} + lprocfs_mdt_init_vars(&lvars); + lprocfs_obd_setup(obd, lvars.obd_vars); -int mdt_detach(struct obd_device *dev) -{ - return lprocfs_obd_detach(dev); -} + sema_init(&mds->mds_health_sem, 1); -static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) -{ - struct mds_obd *mds = &obd->u.mds; - int rc = 0; - ENTRY; + if (mds_num_threads) { + /* If mds_num_threads is set, it is the min and the max. */ + if (mds_num_threads > MDS_THREADS_MAX) + mds_num_threads = MDS_THREADS_MAX; + if (mds_num_threads < MDS_THREADS_MIN) + mds_num_threads = MDS_THREADS_MIN; + mds_max_threads = mds_min_threads = mds_num_threads; + } else { + /* Base min threads on memory and cpus */ + mds_min_threads = num_possible_cpus() * num_physpages >> + (27 - CFS_PAGE_SHIFT); + if (mds_min_threads < MDS_THREADS_MIN) + mds_min_threads = MDS_THREADS_MIN; + /* Largest auto threads start value */ + if (mds_min_threads > 32) + mds_min_threads = 32; + mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4); + } mds->mds_service = ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, - MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, "mds", obd->obd_proc_entry); + MDS_MAXREPSIZE, MDS_REQUEST_PORTAL, + MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, + mds_handle, LUSTRE_MDS_NAME, + obd->obd_proc_entry, NULL, + mds_min_threads, mds_max_threads, "ll_mdt", 0); if (!mds->mds_service) { CERROR("failed to start service\n"); - RETURN(-ENOMEM); + GOTO(err_lprocfs, rc = -ENOMEM); } - rc = ptlrpc_start_n_threads(obd, mds->mds_service, MDT_NUM_THREADS, - "ll_mdt"); + rc = ptlrpc_start_threads(obd, mds->mds_service); if (rc) GOTO(err_thread, rc); mds->mds_setattr_service = ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL, - MDS_SERVICE_WATCHDOG_TIMEOUT, + MDS_MAXREPSIZE, MDS_SETATTR_PORTAL, + MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, mds_handle, "mds_setattr", - obd->obd_proc_entry); + obd->obd_proc_entry, NULL, + mds_min_threads, mds_max_threads, + "ll_mdt_attr", 0); if (!mds->mds_setattr_service) { CERROR("failed to start getattr service\n"); GOTO(err_thread, rc = -ENOMEM); } - rc = ptlrpc_start_n_threads(obd, mds->mds_setattr_service, - MDT_NUM_THREADS, "ll_mdt_attr"); + rc = ptlrpc_start_threads(obd, mds->mds_setattr_service); if (rc) GOTO(err_thread2, rc); mds->mds_readpage_service = ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL, - MDS_SERVICE_WATCHDOG_TIMEOUT, + MDS_MAXREPSIZE, MDS_READPAGE_PORTAL, + MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, mds_handle, "mds_readpage", - obd->obd_proc_entry); + obd->obd_proc_entry, NULL, + MDS_THREADS_MIN_READPAGE, mds_max_threads, + "ll_mdt_rdpg", 0); if (!mds->mds_readpage_service) { CERROR("failed to start readpage service\n"); GOTO(err_thread2, rc = -ENOMEM); } - rc = ptlrpc_start_n_threads(obd, mds->mds_readpage_service, - MDT_NUM_THREADS, "ll_mdt_rdpg"); + rc = ptlrpc_start_threads(obd, mds->mds_readpage_service); if (rc) GOTO(err_thread3, rc); + ping_evictor_start(); + RETURN(0); err_thread3: ptlrpc_unregister_service(mds->mds_readpage_service); + mds->mds_readpage_service = NULL; err_thread2: ptlrpc_unregister_service(mds->mds_setattr_service); + mds->mds_setattr_service = NULL; err_thread: ptlrpc_unregister_service(mds->mds_service); + mds->mds_service = NULL; +err_lprocfs: + lprocfs_obd_cleanup(obd); return rc; } -static int mdt_cleanup(struct obd_device *obd, int flags) +static int mdt_cleanup(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; ENTRY; - ptlrpc_stop_all_threads(mds->mds_readpage_service); - ptlrpc_unregister_service(mds->mds_readpage_service); + ping_evictor_stop(); - ptlrpc_stop_all_threads(mds->mds_setattr_service); + down(&mds->mds_health_sem); + ptlrpc_unregister_service(mds->mds_readpage_service); ptlrpc_unregister_service(mds->mds_setattr_service); - - ptlrpc_stop_all_threads(mds->mds_service); ptlrpc_unregister_service(mds->mds_service); + mds->mds_readpage_service = NULL; + mds->mds_setattr_service = NULL; + mds->mds_service = NULL; + up(&mds->mds_health_sem); + + lprocfs_obd_cleanup(obd); RETURN(0); } -static struct dentry *mds_lvfs_id2dentry(__u64 ino, __u32 gen, - __u64 gr, void *data) +static int mdt_health_check(struct obd_device *obd) { - struct lustre_id id; - struct obd_device *obd = data; - - id_ino(&id) = ino; - id_gen(&id) = gen; - return mds_id2dentry(obd, &id, NULL); -} + struct mds_obd *mds = &obd->u.mds; + int rc = 0; -static int mds_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *valsize, void *val) -{ - struct obd_device *obd; - struct mds_obd *mds; - ENTRY; + down(&mds->mds_health_sem); + rc |= ptlrpc_service_health_check(mds->mds_readpage_service); + rc |= ptlrpc_service_health_check(mds->mds_setattr_service); + rc |= ptlrpc_service_health_check(mds->mds_service); + up(&mds->mds_health_sem); - obd = class_exp2obd(exp); - mds = &obd->u.mds; - - if (obd == NULL) { - CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n", - exp->exp_handle.h_cookie); - RETURN(-EINVAL); - } + /* + * health_check to return 0 on healthy + * and 1 on unhealthy. + */ + if(rc != 0) + rc = 1; - if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) { - /* get log_context handle. */ - unsigned long *llh_handle = val; - *valsize = sizeof(unsigned long); - *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT]; - RETURN(0); - } - if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) { - /* get log_context handle. */ - unsigned long *sb = val; - *valsize = sizeof(unsigned long); - *sb = (unsigned long)obd->u.mds.mds_sb; - RETURN(0); - } + return rc; +} - if (keylen >= strlen("mdsize") && memcmp(key, "mdsize", keylen) == 0) { - __u32 *mdsize = val; - *valsize = sizeof(*mdsize); - *mdsize = mds->mds_max_mdsize; - RETURN(0); - } +static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, + void *data) +{ + struct obd_device *obd = data; + struct ll_fid fid; + fid.id = id; + fid.generation = gen; + return mds_fid2dentry(&obd->u.mds, &fid, NULL); +} - if (keylen >= strlen("mdsnum") && strcmp(key, "mdsnum") == 0) { - __u32 *mdsnum = val; - *valsize = sizeof(*mdsnum); - *mdsnum = mds->mds_num; - RETURN(0); - } +static int mds_health_check(struct obd_device *obd) +{ + struct obd_device_target *odt = &obd->u.obt; +#ifdef USE_HEALTH_CHECK_WRITE + struct mds_obd *mds = &obd->u.mds; +#endif + int rc = 0; - if (keylen >= strlen("rootid") && strcmp(key, "rootid") == 0) { - struct lustre_id *rootid = val; - *valsize = sizeof(*rootid); - *rootid = mds->mds_rootid; - RETURN(0); - } + if (odt->obt_sb->s_flags & MS_RDONLY) + rc = 1; - if (keylen >= strlen("lovdesc") && strcmp(key, "lovdesc") == 0) { - struct lov_desc *desc = val; - *valsize = sizeof(*desc); - *desc = mds->mds_dt_desc; - RETURN(0); - } +#ifdef USE_HEALTH_CHECK_WRITE + LASSERT(mds->mds_health_check_filp != NULL); + rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp); +#endif + return rc; +} + +static int mds_process_config(struct obd_device *obd, obd_count len, void *buf) +{ + struct lustre_cfg *lcfg = buf; + struct lprocfs_static_vars lvars; + int rc; - CDEBUG(D_IOCTL, "invalid key\n"); - RETURN(-EINVAL); + lprocfs_mds_init_vars(&lvars); + rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd); + return(rc); } + struct lvfs_callback_ops mds_lvfs_ops = { - l_id2dentry: mds_lvfs_id2dentry, + l_fid2dentry: mds_lvfs_fid2dentry, }; -int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_remote *nb, - struct niobuf_local *res, - struct obd_trans_info *oti); - -int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, - int objcount, struct obd_ioobj *obj, int niocount, - struct niobuf_local *res, struct obd_trans_info *oti, - int rc); - /* use obd ops to offer management infrastructure */ static struct obd_ops mds_obd_ops = { .o_owner = THIS_MODULE, - .o_attach = mds_attach, - .o_detach = mds_detach, .o_connect = mds_connect, - .o_connect_post = mds_connect_post, + .o_reconnect = mds_reconnect, .o_init_export = mds_init_export, .o_destroy_export = mds_destroy_export, .o_disconnect = mds_disconnect, .o_setup = mds_setup, .o_precleanup = mds_precleanup, .o_cleanup = mds_cleanup, - .o_process_config = mds_process_config, .o_postrecov = mds_postrecov, .o_statfs = mds_obd_statfs, .o_iocontrol = mds_iocontrol, @@ -4396,47 +2779,219 @@ static struct obd_ops mds_obd_ops = { .o_llog_init = mds_llog_init, .o_llog_finish = mds_llog_finish, .o_notify = mds_notify, - .o_get_info = mds_get_info, - .o_set_info = mds_set_info, - .o_preprw = mds_preprw, - .o_commitrw = mds_commitrw, + .o_health_check = mds_health_check, + .o_process_config = mds_process_config, }; static struct obd_ops mdt_obd_ops = { .o_owner = THIS_MODULE, - .o_attach = mdt_attach, - .o_detach = mdt_detach, .o_setup = mdt_setup, .o_cleanup = mdt_cleanup, + .o_health_check = mdt_health_check, }; -static int __init mds_init(void) +quota_interface_t *mds_quota_interface_ref; +extern quota_interface_t mds_quota_interface; + +static __attribute__((unused)) int __init mds_init(void) { + int rc; struct lprocfs_static_vars lvars; - mds_init_lsd_cache(); + request_module("lquota"); + mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface); + rc = lquota_init(mds_quota_interface_ref); + if (rc) { + if (mds_quota_interface_ref) + PORTAL_SYMBOL_PUT(mds_quota_interface); + return rc; + } + init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops); + + lprocfs_mds_init_vars(&lvars); + class_register_type(&mds_obd_ops, NULL, + lvars.module_vars, LUSTRE_MDS_NAME, NULL); + lprocfs_mds_init_vars(&lvars); + mdt_obd_ops = mdt_obd_ops; //make compiler happy +// class_register_type(&mdt_obd_ops, NULL, +// lvars.module_vars, LUSTRE_MDT_NAME, NULL); + + return 0; +} + +static __attribute__((unused)) void /*__exit*/ mds_exit(void) +{ + lquota_exit(mds_quota_interface_ref); + if (mds_quota_interface_ref) + PORTAL_SYMBOL_PUT(mds_quota_interface); + + class_unregister_type(LUSTRE_MDS_NAME); +// class_unregister_type(LUSTRE_MDT_NAME); +} +/*mds still need lov setup here*/ +static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg) +{ + struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; + const char *dev; + struct vfsmount *mnt; + struct lustre_sb_info *lsi; + struct lustre_mount_info *lmi; + struct dentry *dentry; + int rc = 0; + ENTRY; + + CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name); + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + RETURN(0); + + if (lcfg->lcfg_bufcount < 5) { + CERROR("invalid arg for setup %s\n", MDD_OBD_NAME); + RETURN(-EINVAL); + } + dev = lustre_cfg_string(lcfg, 4); + lmi = server_get_mount(dev); + LASSERT(lmi != NULL); + + lsi = s2lsi(lmi->lmi_sb); + mnt = lmi->lmi_mnt; + /* FIXME: MDD LOV initialize objects. + * we need only lmi here but not get mount + * OSD did mount already, so put mount back + */ + atomic_dec(&lsi->lsi_mounts); + mntput(mnt); + + obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); + mds_init_ctxt(obd, mnt); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create OBJECTS directory: rc = %d\n", rc); + GOTO(err_putfs, rc); + } + mds->mds_objects_dir = dentry; + + dentry = lookup_one_len("__iopen__", current->fs->pwd, + strlen("__iopen__")); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc); + GOTO(err_objects, rc); + } + + mds->mds_fid_de = dentry; + if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) { + rc = -ENOENT; + CERROR("__iopen__ directory has no inode? rc = %d\n", rc); + GOTO(err_fid, rc); + } + rc = mds_lov_init_objids(obd); + if (rc != 0) { + CERROR("cannot init lov objid rc = %d\n", rc); + GOTO(err_fid, rc ); + } + + rc = mds_lov_presetup(mds, lcfg); + if (rc < 0) + GOTO(err_objects, rc); + + /* Don't wait for mds_postrecov trying to clear orphans */ + obd->obd_async_recov = 1; + rc = mds_postsetup(obd); + /* Bug 11557 - allow async abort_recov start + FIXME can remove most of this obd_async_recov plumbing + obd->obd_async_recov = 0; + */ + + if (rc) + GOTO(err_objects, rc); + + mds->mds_max_mdsize = sizeof(struct lov_mds_md); + mds->mds_max_cookiesize = sizeof(struct llog_cookie); + +err_pop: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +err_fid: + dput(mds->mds_fid_de); +err_objects: + dput(mds->mds_objects_dir); +err_putfs: + fsfilt_put_ops(obd->obd_fsops); + goto err_pop; +} + +static int mds_cmd_cleanup(struct obd_device *obd) +{ + struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; + int rc = 0; + ENTRY; + + if (obd->obd_fail) + LCONSOLE_WARN("%s: shutting down for failover; client state " + "will be preserved.\n", obd->obd_name); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + mds_lov_destroy_objids(obd); - lprocfs_init_multi_vars(0, &lvars); - class_register_type(&mds_obd_ops, NULL, lvars.module_vars, - OBD_MDS_DEVICENAME); - lprocfs_init_multi_vars(1, &lvars); - class_register_type(&mdt_obd_ops, NULL, lvars.module_vars, - OBD_MDT_DEVICENAME); + if (mds->mds_objects_dir != NULL) { + l_dput(mds->mds_objects_dir); + mds->mds_objects_dir = NULL; + } + + shrink_dcache_parent(mds->mds_fid_de); + dput(mds->mds_fid_de); + LL_DQUOT_OFF(obd->u.obt.obt_sb); + fsfilt_put_ops(obd->obd_fsops); + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +} +#if 0 +static int mds_cmd_health_check(struct obd_device *obd) +{ return 0; } +#endif +static struct obd_ops mds_cmd_obd_ops = { + .o_owner = THIS_MODULE, + .o_setup = mds_cmd_setup, + .o_cleanup = mds_cmd_cleanup, + .o_precleanup = mds_precleanup, + .o_create = mds_obd_create, + .o_destroy = mds_obd_destroy, + .o_llog_init = mds_llog_init, + .o_llog_finish = mds_llog_finish, + .o_notify = mds_notify, + .o_postrecov = mds_postrecov, + // .o_health_check = mds_cmd_health_check, +}; -static void /*__exit*/ mds_exit(void) +static int __init mds_cmd_init(void) { - mds_cleanup_lsd_cache(); + struct lprocfs_static_vars lvars; + + lprocfs_mds_init_vars(&lvars); + class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars, + LUSTRE_MDS_NAME, NULL); - class_unregister_type(OBD_MDS_DEVICENAME); - class_unregister_type(OBD_MDT_DEVICENAME); + return 0; +} + +static void /*__exit*/ mds_cmd_exit(void) +{ + class_unregister_type(LUSTRE_MDS_NAME); } MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS)"); MODULE_LICENSE("GPL"); -module_init(mds_init); -module_exit(mds_exit); +module_init(mds_cmd_init); +module_exit(mds_cmd_exit);