X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=9b015ca8bbbe2b69a68c4a1673715063e035ff06;hp=d8f314f3cf95de06ae63a3cfebce6b39301dda89;hb=d750891e478804bc495ffa075d771d1816369958;hpb=113303973ec9f8484eb2355a1a6ef3c4c7fd6a56 diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d8f314f..9b015ca 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1,2091 +1,154 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/mds/handler.c - * Lustre Metadata Server (mds) request handler + * GPL HEADER START * - * Copyright (c) 2001-2005 Cluster File Systems, Inc. - * Author: Peter Braam - * Author: Andreas Dilger - * Author: Phil Schwan - * Author: Mike Shaver + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * lustre/mds/handler.c + * + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan + * Author: Mike Shaver */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include #include #include -#include #include #include -#include -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) -# include -# include -# include -# include -#else -# include -#endif +#include +#include +#include +#include +#include #include #include #include #include #include -#include -#include #include #include #include "mds_internal.h" -int mds_num_threads; -CFS_MODULE_PARM(mds_num_threads, "i", int, 0444, - "number of MDS service threads to start"); - -static int mds_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data); -static int mds_postsetup(struct obd_device *obd); -static int mds_cleanup(struct obd_device *obd); - -/* Assumes caller has already pushed into the kernel filesystem context */ -static int mds_sendpage(struct ptlrpc_request *req, struct file *file, - loff_t offset, int count) -{ - struct ptlrpc_bulk_desc *desc; - struct l_wait_info lwi; - struct page **pages; - int rc = 0, npages, i, tmpcount, tmpsize = 0; - ENTRY; - - LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */ - - npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; - OBD_ALLOC(pages, sizeof(*pages) * npages); - if (!pages) - GOTO(out, rc = -ENOMEM); - - desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, - MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out_free, rc = -ENOMEM); - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; - - pages[i] = alloc_pages(GFP_KERNEL, 0); - if (pages[i] == NULL) - GOTO(cleanup_buf, rc = -ENOMEM); - - ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize); - } - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; - CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", - tmpsize, offset, file->f_dentry->d_inode->i_ino, - file->f_dentry->d_inode->i_size); - - rc = fsfilt_readpage(req->rq_export->exp_obd, file, - kmap(pages[i]), tmpsize, &offset); - kunmap(pages[i]); - - if (rc != tmpsize) - GOTO(cleanup_buf, rc = -EIO); - } - - LASSERT(desc->bd_nob == count); - - rc = ptlrpc_start_bulk_transfer(desc); - if (rc) - GOTO(cleanup_buf, rc); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { - CERROR("obd_fail_loc=%x, fail operation rc=%d\n", - OBD_FAIL_MDS_SENDPAGE, rc); - GOTO(abort_bulk, rc); - } - - lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); - LASSERT (rc == 0 || rc == -ETIMEDOUT); - - if (rc == 0) { - if (desc->bd_success && - desc->bd_nob_transferred == count) - GOTO(cleanup_buf, rc); - - rc = -ETIMEDOUT; /* XXX should this be a different errno? */ - } - - DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", - (rc == -ETIMEDOUT) ? "timeout" : "network error", - desc->bd_nob_transferred, count, - req->rq_export->exp_client_uuid.uuid, - req->rq_export->exp_connection->c_remote_uuid.uuid); - - class_fail_export(req->rq_export); - - EXIT; - abort_bulk: - ptlrpc_abort_bulk (desc); - cleanup_buf: - for (i = 0; i < npages; i++) - if (pages[i]) - __free_pages(pages[i], 0); - - ptlrpc_free_bulk(desc); - out_free: - OBD_FREE(pages, sizeof(*pages) * npages); - out: - return rc; -} - -/* only valid locked dentries or errors should be returned */ -struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, - struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, - char *name, int namelen, __u64 lockpart) -{ - struct mds_obd *mds = &obd->u.mds; - struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; - struct ldlm_res_id res_id = { .name = {0} }; - int flags = LDLM_FL_ATOMIC_CB, rc; - ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; - ENTRY; - - if (IS_ERR(de)) - RETURN(de); - - res_id.name[0] = de->d_inode->i_ino; - res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, - LDLM_IBITS, &policy, lock_mode, &flags, - ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, lockh); - if (rc != ELDLM_OK) { - l_dput(de); - retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ - } - - RETURN(retval); -} +__u32 mds_max_ost_index=0xFFFF; +CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444, + "maximal OST index"); /* Look up an entry by inode number. */ /* this function ONLY returns valid dget'd dentries with an initialized inode or errors */ -struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, - struct vfsmount **mnt) -{ - char fid_name[32]; - unsigned long ino = fid->id; - __u32 generation = fid->generation; - struct inode *inode; - struct dentry *result; - - if (ino == 0) - RETURN(ERR_PTR(-ESTALE)); - - snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); - - CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n", - ino, generation, mds->mds_obt.obt_sb); - - /* under ext3 this is neither supposed to return bad inodes - nor NULL inodes. */ - result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); - if (IS_ERR(result)) - RETURN(result); - - inode = result->d_inode; - if (!inode) - RETURN(ERR_PTR(-ENOENT)); - - if (inode->i_generation == 0 || inode->i_nlink == 0) { - LCONSOLE_WARN("Found inode with zero generation or link -- this" - " may indicate disk corruption (inode: %lu/%u, " - "link %lu, count %d)\n", inode->i_ino, - inode->i_generation,(unsigned long)inode->i_nlink, - atomic_read(&inode->i_count)); - dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (generation && inode->i_generation != generation) { - /* we didn't find the right inode.. */ - CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " - "count: %d, generation %u/%u\n", inode->i_ino, - (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation); - dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (mnt) { - *mnt = mds->mds_vfsmnt; - mntget(*mnt); - } - - RETURN(result); -} - -static int mds_connect_internal(struct obd_export *exp, - struct obd_connect_data *data) -{ - struct obd_device *obd = exp->exp_obd; - if (data != NULL) { - data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED; - data->ocd_ibits_known &= MDS_INODELOCK_FULL; - - /* If no known bits (which should not happen, probably, - as everybody should support LOOKUP and UPDATE bits at least) - revert to compat mode with plain locks. */ - if (!data->ocd_ibits_known && - data->ocd_connect_flags & OBD_CONNECT_IBITS) - data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; - - if (!obd->u.mds.mds_fl_acl) - data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - - if (!obd->u.mds.mds_fl_user_xattr) - data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; - - exp->exp_connect_flags = data->ocd_connect_flags; - data->ocd_version = LUSTRE_VERSION_CODE; - exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; - } - - if (obd->u.mds.mds_fl_acl && - ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { - CWARN("%s: MDS requires ACL support but client does not\n", - obd->obd_name); - return -EBADE; - } - return 0; -} - -static int mds_reconnect(struct obd_export *exp, struct obd_device *obd, - struct obd_uuid *cluuid, - struct obd_connect_data *data) -{ - int rc; - ENTRY; - - if (exp == NULL || obd == NULL || cluuid == NULL) - RETURN(-EINVAL); - - rc = mds_connect_internal(exp, data); - - RETURN(rc); -} - -/* Establish a connection to the MDS. - * - * This will set up an export structure for the client to hold state data - * about that client, like open files, the last operation number it did - * on the server, etc. - */ -static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, - struct obd_uuid *cluuid, struct obd_connect_data *data) -{ - struct obd_export *exp; - struct mds_export_data *med; - struct mds_client_data *mcd = NULL; - int rc, abort_recovery; - ENTRY; - - if (!conn || !obd || !cluuid) - RETURN(-EINVAL); - - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(obd); - - /* XXX There is a small race between checking the list and adding a - * new connection for the same UUID, but the real threat (list - * corruption when multiple different clients connect) is solved. - * - * There is a second race between adding the export to the list, - * and filling in the client data below. Hence skipping the case - * of NULL mcd above. We should already be controlling multiple - * connects at the client, and we can't hold the spinlock over - * memory allocations without risk of deadlocking. - */ - rc = class_connect(conn, obd, cluuid); - if (rc) - RETURN(rc); - exp = class_conn2export(conn); - LASSERT(exp); - med = &exp->exp_mds_data; - - rc = mds_connect_internal(exp, data); - if (rc) - GOTO(out, rc); - - OBD_ALLOC(mcd, sizeof(*mcd)); - if (!mcd) - GOTO(out, rc = -ENOMEM); - - memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); - med->med_mcd = mcd; - - rc = mds_client_add(obd, exp, -1); - GOTO(out, rc); - -out: - if (rc) { - if (mcd) { - OBD_FREE(mcd, sizeof(*mcd)); - med->med_mcd = NULL; - } - class_disconnect(exp); - } else { - class_export_put(exp); - } - - RETURN(rc); -} - -int mds_init_export(struct obd_export *exp) -{ - struct mds_export_data *med = &exp->exp_mds_data; - - INIT_LIST_HEAD(&med->med_open_head); - spin_lock_init(&med->med_open_lock); - exp->exp_connecting = 1; - RETURN(0); -} - -static int mds_destroy_export(struct obd_export *export) -{ - struct mds_export_data *med; - struct obd_device *obd = export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct lvfs_run_ctxt saved; - struct lov_mds_md *lmm; - struct llog_cookie *logcookies; - int rc = 0; - ENTRY; - - med = &export->exp_mds_data; - target_destroy_export(export); - - if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) - RETURN(0); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - /* Close any open files (which may also cause orphan unlinking). */ - - OBD_ALLOC(lmm, mds->mds_max_mdsize); - if (lmm == NULL) { - CWARN("%s: allocation failure during cleanup; can not force " - "close file handles on this service.\n", obd->obd_name); - GOTO(out, rc = -ENOMEM); - } - - OBD_ALLOC(logcookies, mds->mds_max_cookiesize); - if (logcookies == NULL) { - CWARN("%s: allocation failure during cleanup; can not force " - "close file handles on this service.\n", obd->obd_name); - OBD_FREE(lmm, mds->mds_max_mdsize); - GOTO(out, rc = -ENOMEM); - } - - spin_lock(&med->med_open_lock); - while (!list_empty(&med->med_open_head)) { - struct list_head *tmp = med->med_open_head.next; - struct mds_file_data *mfd = - list_entry(tmp, struct mds_file_data, mfd_list); - int lmm_size = mds->mds_max_mdsize; - umode_t mode = mfd->mfd_dentry->d_inode->i_mode; - __u64 valid = 0; - - /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ - mds_mfd_unlink(mfd, 0); - spin_unlock(&med->med_open_lock); - - /* If you change this message, be sure to update - * replay_single:test_46 */ - CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for " - "%.*s (ino %lu)\n", obd->obd_name, - mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name, - mfd->mfd_dentry->d_inode->i_ino); - - rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1); - if (rc < 0) - CWARN("mds_get_md failure, rc=%d\n", rc); - else - valid |= OBD_MD_FLEASIZE; - - /* child orphan sem protects orphan_dec_test and - * is_orphan race, mds_mfd_close drops it */ - MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode); - - rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd, - !(export->exp_flags & OBD_OPT_FAILOVER), - lmm, lmm_size, logcookies, - mds->mds_max_cookiesize, - &valid); - - if (rc) - CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc); - - if (valid & OBD_MD_FLCOOKIE) { - rc = mds_osc_destroy_orphan(obd, mode, lmm, - lmm_size, logcookies, 1); - if (rc < 0) { - CDEBUG(D_INODE, "%s: destroy of orphan failed," - " rc = %d\n", obd->obd_name, rc); - rc = 0; - } - valid &= ~OBD_MD_FLCOOKIE; - } - - spin_lock(&med->med_open_lock); - } - - OBD_FREE(logcookies, mds->mds_max_cookiesize); - OBD_FREE(lmm, mds->mds_max_mdsize); - - spin_unlock(&med->med_open_lock); - - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - mds_client_free(export); - - out: - RETURN(rc); -} - -static int mds_disconnect(struct obd_export *exp) -{ - int rc; - ENTRY; - - LASSERT(exp); - class_export_get(exp); - - /* Disconnect early so that clients can't keep using export */ - rc = class_disconnect(exp); - if (exp->exp_obd->obd_namespace != NULL) - ldlm_cancel_locks_for_export(exp); - - /* complete all outstanding replies */ - spin_lock(&exp->exp_lock); - while (!list_empty(&exp->exp_outstanding_replies)) { - struct ptlrpc_reply_state *rs = - list_entry(exp->exp_outstanding_replies.next, - struct ptlrpc_reply_state, rs_exp_list); - struct ptlrpc_service *svc = rs->rs_service; - - spin_lock(&svc->srv_lock); - list_del_init(&rs->rs_exp_list); - ptlrpc_schedule_difficult_reply(rs); - spin_unlock(&svc->srv_lock); - } - spin_unlock(&exp->exp_lock); - - class_export_put(exp); - RETURN(rc); -} - -static int mds_getstatus(struct ptlrpc_request *req) -{ - struct mds_obd *mds = mds_req2mds(req); - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) { - CERROR("mds: out of memory for message\n"); - req->rq_status = -ENOMEM; /* superfluous? */ - RETURN(-ENOMEM); - } - - body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); - memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1)); - - /* the last_committed and last_xid fields are filled in for all - * replies already - no need to do so here also. - */ - RETURN(0); -} - -/* get the LOV EA from @inode and store it into @md. It can be at most - * @size bytes, and @size is updated with the actual EA size. - * The EA size is also returned on success, and -ve errno on failure. - * If there is no EA then 0 is returned. */ -int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, - int *size, int lock) -{ - int rc = 0; - int lmm_size; - - if (lock) - LOCK_INODE_MUTEX(inode); - rc = fsfilt_get_md(obd, inode, md, *size, "lov"); - - if (rc < 0) { - CERROR("Error %d reading eadata for ino %lu\n", - rc, inode->i_ino); - } else if (rc > 0) { - lmm_size = rc; - rc = mds_convert_lov_ea(obd, inode, md, lmm_size); - - if (rc == 0) { - *size = lmm_size; - rc = lmm_size; - } else if (rc > 0) { - *size = rc; - } - } else { - *size = 0; - } - if (lock) - UNLOCK_INODE_MUTEX(inode); - - RETURN (rc); -} - - -/* Call with lock=1 if you want mds_pack_md to take the i_mutex. - * Call with lock=0 if the caller has already taken the i_mutex. */ -int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, - struct mds_body *body, struct inode *inode, int lock) -{ - struct mds_obd *mds = &obd->u.mds; - void *lmm; - int lmm_size; - int rc; - ENTRY; - - lmm = lustre_msg_buf(msg, offset, 0); - if (lmm == NULL) { - /* Some problem with getting eadata when I sized the reply - * buffer... */ - CDEBUG(D_INFO, "no space reserved for inode %lu MD\n", - inode->i_ino); - RETURN(0); - } - lmm_size = lustre_msg_buflen(msg, offset); - - /* I don't really like this, but it is a sanity check on the client - * MD request. However, if the client doesn't know how much space - * to reserve for the MD, it shouldn't be bad to have too much space. - */ - if (lmm_size > mds->mds_max_mdsize) { - CWARN("Reading MD for inode %lu of %d bytes > max %d\n", - inode->i_ino, lmm_size, mds->mds_max_mdsize); - // RETURN(-EINVAL); - } - - rc = mds_get_md(obd, inode, lmm, &lmm_size, lock); - if (rc > 0) { - if (S_ISDIR(inode->i_mode)) - body->valid |= OBD_MD_FLDIREA; - else - body->valid |= OBD_MD_FLEASIZE; - body->eadatasize = lmm_size; - rc = 0; - } - - RETURN(rc); -} - -#ifdef CONFIG_FS_POSIX_ACL -static -int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg, - struct mds_body *repbody, int repoff) -{ - struct dentry de = { .d_inode = inode }; - int buflen, rc; - ENTRY; - - LASSERT(repbody->aclsize == 0); - LASSERT(lustre_msg_bufcount(repmsg) > repoff); - - buflen = lustre_msg_buflen(repmsg, repoff); - if (!buflen) - GOTO(out, 0); - - if (!inode->i_op || !inode->i_op->getxattr) - GOTO(out, 0); - - lock_24kernel(); - rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, - lustre_msg_buf(repmsg, repoff, buflen), - buflen); - unlock_24kernel(); - - if (rc >= 0) - repbody->aclsize = rc; - else if (rc != -ENODATA) { - CERROR("buflen %d, get acl: %d\n", buflen, rc); - RETURN(rc); - } - EXIT; -out: - repbody->valid |= OBD_MD_FLACL; - return 0; -} -#else -#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0 -#endif - -int mds_pack_acl(struct mds_export_data *med, struct inode *inode, - struct lustre_msg *repmsg, struct mds_body *repbody, - int repoff) -{ - return mds_pack_posix_acl(inode, repmsg, repbody, repoff); -} - -static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, - struct ptlrpc_request *req, - struct mds_body *reqbody, int reply_off) -{ - struct mds_body *body; - struct inode *inode = dentry->d_inode; - int rc = 0; - ENTRY; - - if (inode == NULL) - RETURN(-ENOENT); - - body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); - LASSERT(body != NULL); /* caller prepped reply */ - - mds_pack_inode2fid(&body->fid1, inode); - body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */ - mds_pack_inode2body(body, inode); - reply_off++; - - if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { - rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body, - inode, 1); - - /* If we have LOV EA data, the OST holds size, atime, mtime */ - if (!(body->valid & OBD_MD_FLEASIZE) && - !(body->valid & OBD_MD_FLDIREA)) - body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME); - - lustre_shrink_reply(req, reply_off, body->eadatasize, 0); - if (body->eadatasize) - reply_off++; - } else if (S_ISLNK(inode->i_mode) && - (reqbody->valid & OBD_MD_LINKNAME) != 0) { - char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0); - int len; - - LASSERT (symname != NULL); /* caller prepped reply */ - len = lustre_msg_buflen(req->rq_repmsg, reply_off); - - rc = inode->i_op->readlink(dentry, symname, len); - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - } else if (rc != len - 1) { - CERROR ("Unexpected readlink rc %d: expecting %d\n", - rc, len - 1); - rc = -EINVAL; - } else { - CDEBUG(D_INODE, "read symlink dest %s\n", symname); - body->valid |= OBD_MD_LINKNAME; - body->eadatasize = rc + 1; - symname[rc] = 0; /* NULL terminate */ - rc = 0; - } - reply_off++; - } else if (reqbody->valid == OBD_MD_FLFLAGS && - reqbody->flags & MDS_BFLAG_EXT_FLAGS) { - int flags; - - /* We only return the full set of flags on ioctl, otherwise we - * get enough flags from the inode in mds_pack_inode2body(). */ - rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS, - (long)&flags); - if (rc == 0) - body->flags = flags | MDS_BFLAG_EXT_FLAGS; - } - - if (reqbody->valid & OBD_MD_FLMODEASIZE) { - struct mds_obd *mds = mds_req2mds(req); - body->max_cookiesize = mds->mds_max_cookiesize; - body->max_mdsize = mds->mds_max_mdsize; - body->valid |= OBD_MD_FLMODEASIZE; - } - - if (rc) - RETURN(rc); - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (reqbody->valid & OBD_MD_FLACL)) { - rc = mds_pack_acl(&req->rq_export->exp_mds_data, - inode, req->rq_repmsg, - body, reply_off); - - lustre_shrink_reply(req, reply_off, body->aclsize, 0); - if (body->aclsize) - reply_off++; - } -#endif - - RETURN(rc); -} - -static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, - int offset) -{ - struct mds_obd *mds = mds_req2mds(req); - struct mds_body *body; - int rc, bufcount = 2; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - LASSERT(offset == REQ_REC_OFF); /* non-intent */ - - body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body)); - LASSERT(body != NULL); /* checked by caller */ - LASSERT_REQSWABBED(req, offset); /* swabbed by caller */ - - if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) { - LOCK_INODE_MUTEX(inode); - rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0, - "lov"); - UNLOCK_INODE_MUTEX(inode); - CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n", - rc, inode->i_ino); - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("error getting inode %lu MD: rc = %d\n", - inode->i_ino, rc); - RETURN(rc); - } - size[bufcount] = 0; - } else if (rc > mds->mds_max_mdsize) { - size[bufcount] = 0; - CERROR("MD size %d larger than maximum possible %u\n", - rc, mds->mds_max_mdsize); - } else { - size[bufcount] = rc; - } - bufcount++; - } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) { - if (inode->i_size + 1 != body->eadatasize) - CERROR("symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize); - bufcount++; - CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - } - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (body->valid & OBD_MD_FLACL)) { - struct dentry de = { .d_inode = inode }; - - size[bufcount] = 0; - if (inode->i_op && inode->i_op->getxattr) { - lock_24kernel(); - rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, - NULL, 0); - unlock_24kernel(); - - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("got acl size: %d\n", rc); - RETURN(rc); - } - } else - size[bufcount] = rc; - } - bufcount++; - } -#endif - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("failed MDS_GETATTR_PACK test\n"); - req->rq_status = -ENOMEM; - RETURN(-ENOMEM); - } - - rc = lustre_pack_reply(req, bufcount, size, NULL); - if (rc) { - CERROR("lustre_pack_reply failed: rc %d\n", rc); - req->rq_status = rc; - RETURN(rc); - } - - RETURN(0); -} - -static int mds_getattr_lock(struct ptlrpc_request *req, int offset, - int child_part, struct lustre_handle *child_lockh) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct ldlm_reply *rep = NULL; - struct lvfs_run_ctxt saved; - struct mds_body *body; - struct dentry *dparent = NULL, *dchild = NULL; - struct lvfs_ucred uc = {NULL,}; - struct lustre_handle parent_lockh; - int namesize; - int rc = 0, cleanup_phase = 0, resent_req = 0; - char *name; - ENTRY; - - LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); - - /* Swab now, before anyone looks inside the request */ - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't swab mds_body\n"); - RETURN(-EFAULT); - } - - LASSERT_REQSWAB(req, offset + 1); - name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0); - if (name == NULL) { - CERROR("Can't unpack name\n"); - RETURN(-EFAULT); - } - namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); - /* namesize less than 2 means we have empty name, probably came from - revalidate by cfid, so no point in having name to be set */ - if (namesize <= 1) - name = NULL; - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(cleanup, rc); - - LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF); - /* if requests were at offset 2, the getattr reply goes back at 1 */ - if (offset == DLM_INTENT_REC_OFF) { - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*rep)); - offset = DLM_REPLY_REC_OFF; - } - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - cleanup_phase = 1; /* kernel context */ - intent_set_disposition(rep, DISP_LOOKUP_EXECD); - - /* FIXME: handle raw lookup */ -#if 0 - if (body->valid == OBD_MD_FLID) { - struct mds_body *mds_reply; - int size = sizeof(*mds_reply); - ino_t inum; - // The user requested ONLY the inode number, so do a raw lookup - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - CERROR("out of memory\n"); - GOTO(cleanup, rc); - } - - rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum); - - mds_reply = lustre_msg_buf(req->rq_repmsg, offset, - sizeof(*mds_reply)); - mds_reply->fid1.id = inum; - mds_reply->valid = OBD_MD_FLID; - GOTO(cleanup, rc); - } -#endif - - if (lustre_handle_is_used(child_lockh)) { - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - resent_req = 1; - } - - if (resent_req == 0) { - if (name) { - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2); - rc = mds_get_parent_child_locked(obd, &obd->u.mds, - &body->fid1, - &parent_lockh, - &dparent, LCK_CR, - MDS_INODELOCK_UPDATE, - name, namesize, - child_lockh, &dchild, - LCK_CR, child_part); - } else { - /* For revalidate by fid we always take UPDATE lock */ - dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, - LCK_CR, child_lockh, - NULL, 0, child_part); - LASSERT(dchild); - if (IS_ERR(dchild)) - rc = PTR_ERR(dchild); - } - if (rc) - GOTO(cleanup, rc); - } else { - struct ldlm_lock *granted_lock; - struct ll_fid child_fid; - struct ldlm_resource *res; - DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks"); - granted_lock = ldlm_handle2lock(child_lockh); - LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n", - body->fid1.id, body->fid1.generation, - child_lockh->cookie); - - - res = granted_lock->l_resource; - child_fid.id = res->lr_name.name[0]; - child_fid.generation = res->lr_name.name[1]; - dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL); - LASSERT(!IS_ERR(dchild)); - LDLM_LOCK_PUT(granted_lock); - } - - cleanup_phase = 2; /* dchild, dparent, locks */ - - if (dchild->d_inode == NULL) { - intent_set_disposition(rep, DISP_LOOKUP_NEG); - /* in the intent case, the policy clears this error: - the disposition is enough */ - GOTO(cleanup, rc = -ENOENT); - } else { - intent_set_disposition(rep, DISP_LOOKUP_POS); - } - - if (req->rq_repmsg == NULL) { - rc = mds_getattr_pack_msg(req, dchild->d_inode, offset); - if (rc != 0) { - CERROR ("mds_getattr_pack_msg: %d\n", rc); - GOTO (cleanup, rc); - } - } - - rc = mds_getattr_internal(obd, dchild, req, body, offset); - GOTO(cleanup, rc); /* returns the lock to the client */ - - cleanup: - switch (cleanup_phase) { - case 2: - if (resent_req == 0) { - if (rc && dchild->d_inode) - ldlm_lock_decref(child_lockh, LCK_CR); - if (name) { - ldlm_lock_decref(&parent_lockh, LCK_CR); - l_dput(dparent); - } - } - l_dput(dchild); - case 1: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - default: - mds_exit_ucred(&uc, mds); - if (req->rq_reply_state == NULL) { - req->rq_status = rc; - lustre_pack_reply(req, 1, NULL, NULL); - } - } - return rc; -} - -static int mds_getattr(struct ptlrpc_request *req, int offset) -{ - struct mds_obd *mds = mds_req2mds(req); - struct obd_device *obd = req->rq_export->exp_obd; - struct lvfs_run_ctxt saved; - struct dentry *de; - struct mds_body *body; - struct lvfs_ucred uc = { NULL, }; - int rc = 0; - ENTRY; - - OBD_COUNTER_INCREMENT(obd, getattr); - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - RETURN(-EFAULT); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(out_ucred, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) { - rc = req->rq_status = PTR_ERR(de); - GOTO(out_pop, rc); - } - - rc = mds_getattr_pack_msg(req, de->d_inode, offset); - if (rc != 0) { - CERROR("mds_getattr_pack_msg: %d\n", rc); - GOTO(out_pop, rc); - } - - req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF); - - l_dput(de); - GOTO(out_pop, rc); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out_ucred: - if (req->rq_reply_state == NULL) { - req->rq_status = rc; - lustre_pack_reply(req, 1, NULL, NULL); - } - mds_exit_ucred(&uc, mds); - return rc; -} - -static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age) -{ - int rc; - - spin_lock(&obd->obd_osfs_lock); - rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age); - if (rc == 0) - memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); - spin_unlock(&obd->obd_osfs_lock); - - return rc; -} - -static int mds_statfs(struct ptlrpc_request *req) -{ - struct obd_device *obd = req->rq_export->exp_obd; - int rc, size[2] = { sizeof(struct ptlrpc_body), - sizeof(struct obd_statfs) }; - ENTRY; - - /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1); - OBD_COUNTER_INCREMENT(obd, statfs); - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) { - CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc); - GOTO(out, rc); - } - - /* We call this so that we can cache a bit - 1 jiffie worth */ - rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - size[REPLY_REC_OFF]), - cfs_time_current_64() - HZ); - if (rc) { - CERROR("mds_obd_statfs failed: rc %d\n", rc); - GOTO(out, rc); - } - - EXIT; -out: - req->rq_status = rc; - return 0; -} - -static int mds_sync(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - GOTO(out, rc = -EFAULT); - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) { - CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc); - GOTO(out, rc); - } - - if (body->fid1.id == 0) { - /* a fid of zero is taken to mean "sync whole filesystem" */ - rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - GOTO(out, rc); - } else { - struct dentry *de; - - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) - GOTO(out, rc = PTR_ERR(de)); - - /* The file parameter isn't used for anything */ - if (de->d_inode->i_fop && de->d_inode->i_fop->fsync) - rc = de->d_inode->i_fop->fsync(NULL, de, 1); - if (rc == 0) { - body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*body)); - mds_pack_inode2fid(&body->fid1, de->d_inode); - mds_pack_inode2body(body, de->d_inode); - } - - l_dput(de); - GOTO(out, rc); - } -out: - req->rq_status = rc; - return 0; -} - -/* mds_readpage does not take a DLM lock on the inode, because the client must - * already have a PR lock. - * - * If we were to take another one here, a deadlock will result, if another - * thread is already waiting for a PW lock. */ -static int mds_readpage(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct vfsmount *mnt; - struct dentry *de; - struct file *file; - struct mds_body *body, *repbody; - struct lvfs_run_ctxt saved; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; - struct lvfs_ucred uc = {NULL,}; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) - RETURN(-ENOMEM); - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) { - CERROR("error packing readpage reply: rc %d\n", rc); - GOTO(out, rc); - } - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - GOTO (out, rc = -EFAULT); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(out, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt); - if (IS_ERR(de)) - GOTO(out_pop, rc = PTR_ERR(de)); - - CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino); - - file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); - /* note: in case of an error, dentry_open puts dentry */ - if (IS_ERR(file)) - GOTO(out_pop, rc = PTR_ERR(file)); - - /* body->size is actually the offset -eeb */ - if ((body->size & (de->d_inode->i_blksize - 1)) != 0) { - CERROR("offset "LPU64" not on a block boundary of %lu\n", - body->size, de->d_inode->i_blksize); - GOTO(out_file, rc = -EFAULT); - } - - /* body->nlink is actually the #bytes to read -eeb */ - if (body->nlink & (de->d_inode->i_blksize - 1)) { - CERROR("size %u is not multiple of blocksize %lu\n", - body->nlink, de->d_inode->i_blksize); - GOTO(out_file, rc = -EFAULT); - } - - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); - repbody->size = file->f_dentry->d_inode->i_size; - repbody->valid = OBD_MD_FLSIZE; - - /* to make this asynchronous make sure that the handling function - doesn't send a reply when this function completes. Instead a - callback function would send the reply */ - /* body->size is actually the offset -eeb */ - rc = mds_sendpage(req, file, body->size, body->nlink); - -out_file: - filp_close(file, 0); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out: - mds_exit_ucred(&uc, mds); - req->rq_status = rc; - RETURN(0); -} - -int mds_reint(struct ptlrpc_request *req, int offset, - struct lustre_handle *lockh) -{ - struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */ - int rc; - - OBD_ALLOC(rec, sizeof(*rec)); - if (rec == NULL) - RETURN(-ENOMEM); - - rc = mds_update_unpack(req, offset, rec); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { - CERROR("invalid record\n"); - GOTO(out, req->rq_status = -EINVAL); - } - - /* rc will be used to interrupt a for loop over multiple records */ - rc = mds_reint_rec(rec, offset, req, lockh); - out: - OBD_FREE(rec, sizeof(*rec)); - return rc; -} - -static int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process) -{ - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case MDS_CONNECT: /* This will never get here, but for completeness. */ - case OST_CONNECT: /* This will never get here, but for completeness. */ - case MDS_DISCONNECT: - case OST_DISCONNECT: - *process = 1; - RETURN(0); - - case MDS_CLOSE: - case MDS_SYNC: /* used in unmounting */ - case OBD_PING: - case MDS_REINT: - case LDLM_ENQUEUE: - *process = target_queue_recovery_request(req, obd); - RETURN(0); - - default: - DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); - *process = 0; - /* XXX what should we set rq_status to here? */ - req->rq_status = -EAGAIN; - RETURN(ptlrpc_error(req)); - } -} - -static char *reint_names[] = { - [REINT_SETATTR] "setattr", - [REINT_CREATE] "create", - [REINT_LINK] "link", - [REINT_UNLINK] "unlink", - [REINT_RENAME] "rename", - [REINT_OPEN] "open", -}; - -static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req) -{ - char *key; - __u32 *val; - int keylen, rc = 0; - ENTRY; - - key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1); - if (key == NULL) { - DEBUG_REQ(D_HA, req, "no set_info key"); - RETURN(-EFAULT); - } - keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF); - - val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*val)); - if (val == NULL) { - DEBUG_REQ(D_HA, req, "no set_info val"); - RETURN(-EFAULT); - } - - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) - RETURN(rc); - lustre_msg_set_status(req->rq_repmsg, 0); - - if (keylen < strlen("read-only") || - memcmp(key, "read-only", keylen) != 0) - RETURN(-EINVAL); - - if (*val) - exp->exp_connect_flags |= OBD_CONNECT_RDONLY; - else - exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY; - - RETURN(0); -} - -static int mds_handle_quotacheck(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl; - int rc; - ENTRY; - - oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) { - CERROR("mds: out of memory while packing quotacheck reply\n"); - RETURN(rc); - } - - req->rq_status = obd_quotacheck(req->rq_export, oqctl); - RETURN(0); -} - -static int mds_handle_quotactl(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl, *repoqc; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) }; - ENTRY; - - oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - RETURN(rc); - - repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc)); - - req->rq_status = obd_quotactl(req->rq_export, oqctl); - *repoqc = *oqctl; - RETURN(0); -} - -static int mds_msg_check_version(struct lustre_msg *msg) -{ - int rc; - - switch (lustre_msg_get_opc(msg)) { - case MDS_CONNECT: - case MDS_DISCONNECT: - case OBD_PING: - rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_OBD_VERSION); - break; - case MDS_GETSTATUS: - case MDS_GETATTR: - case MDS_GETATTR_NAME: - case MDS_STATFS: - case MDS_READPAGE: - case MDS_REINT: - case MDS_CLOSE: - case MDS_DONE_WRITING: - case MDS_PIN: - case MDS_SYNC: - case MDS_GETXATTR: - case MDS_SETXATTR: - case MDS_SET_INFO: - case MDS_QUOTACHECK: - case MDS_QUOTACTL: - case QUOTA_DQACQ: - case QUOTA_DQREL: - rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_MDS_VERSION); - break; - case LDLM_ENQUEUE: - case LDLM_CONVERT: - case LDLM_BL_CALLBACK: - case LDLM_CP_CALLBACK: - rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_DLM_VERSION); - break; - case OBD_LOG_CANCEL: - case LLOG_ORIGIN_HANDLE_CREATE: - case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - case LLOG_ORIGIN_HANDLE_READ_HEADER: - case LLOG_ORIGIN_HANDLE_CLOSE: - case LLOG_ORIGIN_HANDLE_DESTROY: - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: - case LLOG_CATINFO: - rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_LOG_VERSION); - break; - default: - CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg)); - rc = -ENOTSUPP; - } - return rc; -} - -int mds_handle(struct ptlrpc_request *req) -{ - int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; - int rc = 0; - struct mds_obd *mds = NULL; /* quell gcc overwarning */ - struct obd_device *obd = NULL; - ENTRY; - - OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0); - - LASSERT(current->journal_info == NULL); - - rc = mds_msg_check_version(req->rq_reqmsg); - if (rc) { - CERROR("MDS drop mal-formed request\n"); - RETURN(rc); - } - - /* XXX identical to OST */ - if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) { - struct mds_export_data *med; - int recovering, abort_recovery; - - if (req->rq_export == NULL) { - CERROR("operation %d on unconnected MDS from %s\n", - lustre_msg_get_opc(req->rq_reqmsg), - libcfs_id2str(req->rq_peer)); - req->rq_status = -ENOTCONN; - GOTO(out, rc = -ENOTCONN); - } - - med = &req->rq_export->exp_mds_data; - obd = req->rq_export->exp_obd; - mds = &obd->u.mds; - - /* sanity check: if the xid matches, the request must - * be marked as a resent or replayed */ - if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) || - req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid)) - if (!(lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_RESENT | MSG_REPLAY))) { - CERROR("rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", - req->rq_xid); - req->rq_status = -ENOTCONN; - GOTO(out, rc = -EFAULT); - } - /* else: note the opposite is not always true; a - * RESENT req after a failover will usually not match - * the last_xid, since it was likely never - * committed. A REPLAYed request will almost never - * match the last xid, however it could for a - * committed, but still retained, open. */ - - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - recovering = obd->obd_recovering; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { - rc = mds_filter_recovery_request(req, obd, - &should_process); - if (rc || !should_process) - RETURN(rc); - } - } - - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case MDS_CONNECT: - DEBUG_REQ(D_INODE, req, "connect"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0); - rc = target_handle_connect(req, mds_handle); - if (!rc) { - /* Now that we have an export, set mds. */ - obd = req->rq_export->exp_obd; - mds = mds_req2mds(req); - } - break; - - case MDS_DISCONNECT: - DEBUG_REQ(D_INODE, req, "disconnect"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0); - rc = target_handle_disconnect(req); - req->rq_status = rc; /* superfluous? */ - break; - - case MDS_GETSTATUS: - DEBUG_REQ(D_INODE, req, "getstatus"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0); - rc = mds_getstatus(req); - break; - - case MDS_GETATTR: - DEBUG_REQ(D_INODE, req, "getattr"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0); - rc = mds_getattr(req, REQ_REC_OFF); - break; - - case MDS_SETXATTR: - DEBUG_REQ(D_INODE, req, "setxattr"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0); - rc = mds_setxattr(req); - break; - - case MDS_GETXATTR: - DEBUG_REQ(D_INODE, req, "getxattr"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0); - rc = mds_getxattr(req); - break; - - case MDS_GETATTR_NAME: { - struct lustre_handle lockh = { 0 }; - DEBUG_REQ(D_INODE, req, "getattr_name"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0); - - /* If this request gets a reconstructed reply, we won't be - * acquiring any new locks in mds_getattr_lock, so we don't - * want to cancel. - */ - rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE, - &lockh); - /* this non-intent call (from an ioctl) is special */ - req->rq_status = rc; - if (rc == 0 && lustre_handle_is_used(&lockh)) - ldlm_lock_decref(&lockh, LCK_CR); - break; - } - case MDS_STATFS: - DEBUG_REQ(D_INODE, req, "statfs"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0); - rc = mds_statfs(req); - break; - - case MDS_READPAGE: - DEBUG_REQ(D_INODE, req, "readpage"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0); - rc = mds_readpage(req, REQ_REC_OFF); - - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) { - RETURN(0); - } - - break; - - case MDS_REINT: { - __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, - sizeof(*opcp)); - __u32 opc; - int size[4] = { sizeof(struct ptlrpc_body), - sizeof(struct mds_body), - mds->mds_max_mdsize, - mds->mds_max_cookiesize }; - int bufcount; - - /* NB only peek inside req now; mds_reint() will swab it */ - if (opcp == NULL) { - CERROR ("Can't inspect opcode\n"); - rc = -EINVAL; - break; - } - opc = *opcp; - if (lustre_msg_swabbed(req->rq_reqmsg)) - __swab32s(&opc); - - DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc, - (opc < sizeof(reint_names) / sizeof(reint_names[0]) || - reint_names[opc] == NULL) ? reint_names[opc] : - "unknown opcode"); - - OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0); - - if (opc == REINT_UNLINK || opc == REINT_RENAME) - bufcount = 4; - else if (opc == REINT_OPEN) - bufcount = 3; - else - bufcount = 2; - - rc = lustre_pack_reply(req, bufcount, size, NULL); - if (rc) - break; - - rc = mds_reint(req, REQ_REC_OFF, NULL); - fail = OBD_FAIL_MDS_REINT_NET_REP; - break; - } - - case MDS_CLOSE: - DEBUG_REQ(D_INODE, req, "close"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0); - rc = mds_close(req, REQ_REC_OFF); - break; - - case MDS_DONE_WRITING: - DEBUG_REQ(D_INODE, req, "done_writing"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0); - rc = mds_done_writing(req, REQ_REC_OFF); - break; - - case MDS_PIN: - DEBUG_REQ(D_INODE, req, "pin"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0); - rc = mds_pin(req, REQ_REC_OFF); - break; - - case MDS_SYNC: - DEBUG_REQ(D_INODE, req, "sync"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0); - rc = mds_sync(req, REQ_REC_OFF); - break; - - case MDS_SET_INFO: - DEBUG_REQ(D_INODE, req, "set_info"); - rc = mds_set_info_rpc(req->rq_export, req); - break; - - case MDS_QUOTACHECK: - DEBUG_REQ(D_INODE, req, "quotacheck"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0); - rc = mds_handle_quotacheck(req); - break; - - case MDS_QUOTACTL: - DEBUG_REQ(D_INODE, req, "quotactl"); - OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0); - rc = mds_handle_quotactl(req); - break; - - case OBD_PING: - DEBUG_REQ(D_INODE, req, "ping"); - rc = target_handle_ping(req); - break; - - case OBD_LOG_CANCEL: - CDEBUG(D_INODE, "log cancel\n"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0); - rc = -ENOTSUPP; /* la la la */ - break; - - case LDLM_ENQUEUE: - DEBUG_REQ(D_INODE, req, "enqueue"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); - rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, - ldlm_server_blocking_ast, NULL); - fail = OBD_FAIL_LDLM_REPLY; - break; - case LDLM_CONVERT: - DEBUG_REQ(D_INODE, req, "convert"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); - rc = ldlm_handle_convert(req); - break; - case LDLM_BL_CALLBACK: - case LDLM_CP_CALLBACK: - DEBUG_REQ(D_INODE, req, "callback"); - CERROR("callbacks should not happen on MDS\n"); - LBUG(); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); - break; - case LLOG_ORIGIN_HANDLE_CREATE: - DEBUG_REQ(D_INODE, req, "llog_init"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_create(req); - break; - case LLOG_ORIGIN_HANDLE_DESTROY: - DEBUG_REQ(D_INODE, req, "llog_init"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_destroy(req); - break; - case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - DEBUG_REQ(D_INODE, req, "llog next block"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_next_block(req); - break; - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: - DEBUG_REQ(D_INODE, req, "llog prev block"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_prev_block(req); - break; - case LLOG_ORIGIN_HANDLE_READ_HEADER: - DEBUG_REQ(D_INODE, req, "llog read header"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_read_header(req); - break; - case LLOG_ORIGIN_HANDLE_CLOSE: - DEBUG_REQ(D_INODE, req, "llog close"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_origin_handle_close(req); - break; - case LLOG_CATINFO: - DEBUG_REQ(D_INODE, req, "llog catinfo"); - OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0); - rc = llog_catinfo(req); - break; - default: - req->rq_status = -ENOTSUPP; - rc = ptlrpc_error(req); - RETURN(rc); - } - - LASSERT(current->journal_info == NULL); - - /* If we're DISCONNECTing, the mds_export_data is already freed */ - if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) { - struct mds_export_data *med = &req->rq_export->exp_mds_data; - - /* I don't think last_xid is used for anyway, so I'm not sure - if we need to care about last_close_xid here.*/ - lustre_msg_set_last_xid(req->rq_repmsg, - le64_to_cpu(med->med_mcd->mcd_last_xid)); - - target_committed_to_req(req); - } - - EXIT; - out: - - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - return target_queue_final_reply(req, rc); - } - /* Lost a race with recovery; let the error path DTRT. */ - rc = req->rq_status = -ENOTCONN; - } - - target_send_reply(req, rc, fail); - return 0; -} - -/* Update the server data on disk. This stores the new mount_count and - * also the last_rcvd value to disk. If we don't have a clean shutdown, - * then the server last_rcvd value may be less than that of the clients. - * This will alert us that we may need to do client recovery. - * - * Also assumes for mds_last_transno that we are not modifying it (no locking). - */ -int mds_update_server_data(struct obd_device *obd, int force_sync) -{ - struct mds_obd *mds = &obd->u.mds; - struct lr_server_data *lsd = mds->mds_server_data; - struct file *filp = mds->mds_rcvd_filp; - struct lvfs_run_ctxt saved; - loff_t off = 0; - int rc; - ENTRY; - - CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", - mds->mds_mount_count, mds->mds_last_transno); - - spin_lock(&mds->mds_transno_lock); - lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno); - spin_unlock(&mds->mds_transno_lock); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (rc) - CERROR("error writing MDS server data: rc = %d\n", rc); - - RETURN(rc); -} - -static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options) -{ - char *p = options; - - if (!options) - return; - - while (*options) { - int len; - - while (*p && *p != ',') - p++; - - len = p - options; - if (len == sizeof("user_xattr") - 1 && - memcmp(options, "user_xattr", len) == 0) { - mds->mds_fl_user_xattr = 1; - LCONSOLE_INFO("Enabling user_xattr\n"); - } else if (len == sizeof("nouser_xattr") - 1 && - memcmp(options, "nouser_xattr", len) == 0) { - mds->mds_fl_user_xattr = 0; - LCONSOLE_INFO("Disabling user_xattr\n"); - } else if (len == sizeof("acl") - 1 && - memcmp(options, "acl", len) == 0) { -#ifdef CONFIG_FS_POSIX_ACL - mds->mds_fl_acl = 1; - LCONSOLE_INFO("Enabling ACL\n"); -#else - CWARN("ignoring unsupported acl mount option\n"); -#endif - } else if (len == sizeof("noacl") - 1 && - memcmp(options, "noacl", len) == 0) { -#ifdef CONFIG_FS_POSIX_ACL - mds->mds_fl_acl = 0; - LCONSOLE_INFO("Disabling ACL\n"); -#endif - } - - options = ++p; - } -} - -/* mount the file system (secretly). lustre_cfg parameters are: - * 1 = device - * 2 = fstype - * 3 = config name - * 4 = mount options - */ -static int mds_setup(struct obd_device *obd, obd_count len, void *buf) -{ - struct lprocfs_static_vars lvars; - struct lustre_cfg* lcfg = buf; - struct mds_obd *mds = &obd->u.mds; - struct lustre_mount_info *lmi; - struct vfsmount *mnt; - struct obd_uuid uuid; - __u8 *uuid_ptr; - char *options, *str, *label; - char ns_name[48]; - unsigned long page; - int rc = 0; - ENTRY; +struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt) +{ + char fid_name[32]; + unsigned long ino = fid->id; + __u32 generation = fid->generation; + struct inode *inode; + struct dentry *result; - /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */ - - CLASSERT(offsetof(struct obd_device, u.obt) == - offsetof(struct obd_device, u.mds.mds_obt)); - - if (lcfg->lcfg_bufcount < 3) - RETURN(rc = -EINVAL); - - if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0) - RETURN(rc = -EINVAL); - - lmi = server_get_mount(obd->obd_name); - if (lmi) { - /* We already mounted in lustre_fill_super. - lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/ - struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb); - fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts); - fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts); - mnt = lmi->lmi_mnt; - obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); - } else { - /* old path - used by lctl */ - CERROR("Using old MDS mount method\n"); - page = __get_free_page(GFP_KERNEL); - if (!page) - RETURN(-ENOMEM); + if (ino == 0) + RETURN(ERR_PTR(-ESTALE)); - options = (char *)page; - memset(options, 0, CFS_PAGE_SIZE); + snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); - /* here we use "iopen_nopriv" hardcoded, because it affects - * MDS utility and the rest of options are passed by mount - * options. Probably this should be moved to somewhere else - * like startup scripts or lconf. */ - strcpy(options, "iopen_nopriv"); + /* under ext3 this is neither supposed to return bad inodes + nor NULL inodes. */ + result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); + if (IS_ERR(result)) + RETURN(result); - if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) { - sprintf(options + strlen(options), ",%s", - lustre_cfg_string(lcfg, 4)); - fsoptions_to_mds_flags(mds, options); - } - - mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0, - lustre_cfg_string(lcfg, 1), - (void *)options); - free_page(page); - if (IS_ERR(mnt)) { - rc = PTR_ERR(mnt); - LCONSOLE_ERROR("Can't mount disk %s (%d)\n", - lustre_cfg_string(lcfg, 1), rc); - RETURN(rc); - } + inode = result->d_inode; + if (!inode) + RETURN(ERR_PTR(-ENOENT)); - obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2)); + if (inode->i_generation == 0 || inode->i_nlink == 0) { + LCONSOLE_WARN("Found inode with zero generation or link -- this" + " may indicate disk corruption (inode: %lu/%u, " + "link %lu, count %d)\n", inode->i_ino, + inode->i_generation,(unsigned long)inode->i_nlink, + atomic_read(&inode->i_count)); + dput(result); + RETURN(ERR_PTR(-ENOENT)); } - if (IS_ERR(obd->obd_fsops)) - GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); - - CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt); - - LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))); - sema_init(&mds->mds_epoch_sem, 1); - spin_lock_init(&mds->mds_transno_lock); - mds->mds_max_mdsize = sizeof(struct lov_mds_md); - mds->mds_max_cookiesize = sizeof(struct llog_cookie); - mds->mds_atime_diff = MAX_ATIME_DIFF; - - sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); - if (obd->obd_namespace == NULL) { - mds_cleanup(obd); - GOTO(err_ops, rc = -ENOMEM); + if (generation && inode->i_generation != generation) { + /* we didn't find the right inode.. */ + CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " + "count: %d, generation %u/%u\n", inode->i_ino, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation); + dput(result); + RETURN(ERR_PTR(-ENOENT)); } - ldlm_register_intent(obd->obd_namespace, mds_intent_policy); - rc = mds_fs_setup(obd, mnt); - if (rc) { - CERROR("%s: MDS filesystem method init failed: rc = %d\n", - obd->obd_name, rc); - GOTO(err_ns, rc); + if (mnt) { + *mnt = mds->mds_obt.obt_vfsmnt; + mntget(*mnt); } - rc = llog_start_commit_thread(); - if (rc < 0) - GOTO(err_fs, rc); + RETURN(result); +} + +static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg) +{ + int rc = 0; + ENTRY; if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) { class_uuid_t uuid; - generate_random_uuid(uuid); + ll_generate_random_uuid(uuid); class_uuid_unparse(uuid, &mds->mds_lov_uuid); OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3)); if (mds->mds_profile == NULL) - GOTO(err_fs, rc = -ENOMEM); + RETURN(-ENOMEM); strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3), LUSTRE_CFG_BUFLEN(lcfg, 3)); } - - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, - "mds_ldlm_client", &obd->obd_ldlm_client); - obd->obd_replayable = 1; - - rc = lquota_setup(mds_quota_interface_ref, obd); - if (rc) - GOTO(err_fs, rc); - - mds->mds_group_hash = upcall_cache_init(obd->obd_name); - if (IS_ERR(mds->mds_group_hash)) { - rc = PTR_ERR(mds->mds_group_hash); - mds->mds_group_hash = NULL; - GOTO(err_qctxt, rc); - } - - /* Don't wait for mds_postrecov trying to clear orphans */ - obd->obd_async_recov = 1; - rc = mds_postsetup(obd); - /* Bug 11557 - allow async abort_recov start - FIXME can remove most of this obd_async_recov plumbing - obd->obd_async_recov = 0; - */ - if (rc) - GOTO(err_qctxt, rc); - - lprocfs_init_vars(mds, &lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && - lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) { - /* Init private stats here */ - mds_stats_counter_init(obd->obd_stats); - obd->obd_proc_exports = proc_mkdir("exports", - obd->obd_proc_entry); - } - - uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb); - if (uuid_ptr != NULL) { - class_uuid_unparse(uuid_ptr, &uuid); - str = uuid.uuid; - } else { - str = "no UUID"; - } - - label = fsfilt_get_label(obd, obd->u.obt.obt_sb); - if (obd->obd_recovering) { - LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in " - "recovery until %d %s reconnect, or if no clients" - " reconnect for %d:%.02d; during that time new " - "clients will not be allowed to connect. " - "Recovery progress can be monitored by watching " - "/proc/fs/lustre/mds/%s/recovery_status.\n", - obd->obd_name, lustre_cfg_string(lcfg, 1), - label ?: "", label ? "/" : "", str, - obd->obd_recoverable_clients, - (obd->obd_recoverable_clients == 1) ? - "client" : "clients", - (int)(OBD_RECOVERY_TIMEOUT) / 60, - (int)(OBD_RECOVERY_TIMEOUT) % 60, - obd->obd_name); - } else { - LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery " - "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1), - label ?: "", label ? "/" : "", str, - obd->obd_replayable ? "enabled" : "disabled"); - } - - ldlm_timeout = 6; - - RETURN(0); - -err_qctxt: - lquota_cleanup(mds_quota_interface_ref, obd); -err_fs: - /* No extra cleanup needed for llog_init_commit_thread() */ - mds_fs_cleanup(obd); - upcall_cache_cleanup(mds->mds_group_hash); - mds->mds_group_hash = NULL; -err_ns: - ldlm_namespace_free(obd->obd_namespace, 0); - obd->obd_namespace = NULL; -err_ops: - fsfilt_put_ops(obd->obd_fsops); -err_put: - if (lmi) { - server_put_mount(obd->obd_name, mnt); - } else { - /* old method */ - unlock_kernel(); - mntput(mnt); - lock_kernel(); - } - obd->u.obt.obt_sb = NULL; - return rc; + RETURN(rc); } static int mds_lov_clean(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - struct obd_device *osc = mds->mds_osc_obd; + struct obd_device *osc = mds->mds_lov_obd; ENTRY; if (mds->mds_profile) { @@ -2107,9 +170,8 @@ static int mds_lov_clean(struct obd_device *obd) osc->obd_fail = obd->obd_fail; /* Cleanup the lov */ - obd_disconnect(mds->mds_osc_exp); + obd_disconnect(mds->mds_lov_exp); class_manual_cleanup(osc); - mds->mds_osc_exp = NULL; RETURN(0); } @@ -2117,32 +179,35 @@ static int mds_lov_clean(struct obd_device *obd) static int mds_postsetup(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; + struct llog_ctxt *ctxt; int rc = 0; ENTRY; - rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL, + rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL, &llog_lvfs_ops); if (rc) RETURN(rc); - rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL, + rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL, &llog_lvfs_ops); if (rc) - RETURN(rc); + GOTO(err_llog, rc); + + mds_changelog_llog_init(obd, obd); if (mds->mds_profile) { struct lustre_profile *lprof; - /* The profile defines which osc and mdc to connect to, for a + /* The profile defines which osc and mdc to connect to, for a client. We reuse that here to figure out the name of the - lov to use (and ignore lprof->lp_mdc). - The profile was set in the config log with + lov to use (and ignore lprof->lp_md). + The profile was set in the config log with LCFG_MOUNTOPT profilenm oscnm mdcnm */ lprof = class_get_profile(mds->mds_profile); if (lprof == NULL) { CERROR("No profile found: %s\n", mds->mds_profile); GOTO(err_cleanup, rc = -ENOENT); } - rc = mds_lov_connect(obd, lprof->lp_osc); + rc = mds_lov_connect(obd, lprof->lp_dt); if (rc) GOTO(err_cleanup, rc); } @@ -2151,48 +216,40 @@ static int mds_postsetup(struct obd_device *obd) err_cleanup: mds_lov_clean(obd); - llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT)); - llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT)); - RETURN(rc); + ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); +err_llog: + ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); + return rc; } int mds_postrecov(struct obd_device *obd) { - int rc; + int rc = 0; ENTRY; if (obd->obd_fail) RETURN(0); LASSERT(!obd->obd_recovering); - LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL); - - /* FIXME why not put this in the synchronize? */ - /* set nextid first, so we are sure it happens */ - rc = mds_lov_set_nextid(obd); - if (rc) { - CERROR("%s: mds_lov_set_nextid failed %d\n", - obd->obd_name, rc); - GOTO(out, rc); - } - /* clean PENDING dir */ - rc = mds_cleanup_pending(obd); - if (rc < 0) - GOTO(out, rc); - +#if 0 + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + rc = mds_cleanup_pending(obd); + if (rc < 0) + GOTO(out, rc); +#endif /* FIXME Does target_finish_recovery really need this to block? */ /* Notify the LOV, which will in turn call mds_notify for each tgt */ /* This means that we have to hack obd_notify to think we're obd_set_up during mds_lov_connect. */ - obd_notify(obd->u.mds.mds_osc_obd, NULL, + obd_notify(obd->u.mds.mds_lov_obd, NULL, obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK : OBD_NOTIFY_SYNC, NULL); - /* quota recovery */ - lquota_recovery(mds_quota_interface_ref, obd); - -out: RETURN(rc); } @@ -2200,7 +257,7 @@ out: static int mds_lov_early_clean(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - struct obd_device *osc = mds->mds_osc_obd; + struct obd_device *osc = mds->mds_lov_obd; if (!osc || (!obd->obd_force && !obd->obd_fail)) return(0); @@ -2211,6 +268,8 @@ static int mds_lov_early_clean(struct obd_device *obd) static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { + struct mds_obd *mds = &obd->u.mds; + struct llog_ctxt *ctxt; int rc = 0; ENTRY; @@ -2218,550 +277,214 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) case OBD_CLEANUP_EARLY: break; case OBD_CLEANUP_EXPORTS: - target_cleanup_recovery(obd); mds_lov_early_clean(obd); - break; - case OBD_CLEANUP_SELF_EXP: + cfs_down_write(&mds->mds_notify_lock); mds_lov_disconnect(obd); mds_lov_clean(obd); - llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT)); - llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT)); + ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); + ctxt = llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT); + if (ctxt) + llog_cleanup(ctxt); rc = obd_llog_finish(obd, 0); - break; - case OBD_CLEANUP_OBD: + mds->mds_lov_exp = NULL; + cfs_up_write(&mds->mds_notify_lock); break; } RETURN(rc); } -static int mds_cleanup(struct obd_device *obd) +static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, + void *data) { - struct mds_obd *mds = &obd->u.mds; - lvfs_sbdev_type save_dev; - int must_put = 0; - int must_relock = 0; - ENTRY; - - if (obd->u.obt.obt_sb == NULL) - RETURN(0); - save_dev = lvfs_sbdev(obd->u.obt.obt_sb); - - if (mds->mds_osc_exp) - /* lov export was disconnected by mds_lov_clean; - we just need to drop our ref */ - class_export_put(mds->mds_osc_exp); - - lprocfs_obd_cleanup(obd); - lprocfs_free_obd_stats(obd); - - lquota_cleanup(mds_quota_interface_ref, obd); - - mds_update_server_data(obd, 1); - if (mds->mds_lov_objids != NULL) - OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size); - mds_fs_cleanup(obd); - - upcall_cache_cleanup(mds->mds_group_hash); - mds->mds_group_hash = NULL; - - must_put = server_put_mount(obd->obd_name, mds->mds_vfsmnt); - /* must_put is for old method (l_p_m returns non-0 on err) */ - - /* We can only unlock kernel if we are in the context of sys_ioctl, - otherwise we never called lock_kernel */ - if (ll_kernel_locked()) { - unlock_kernel(); - must_relock++; - } - - if (must_put) { - /* In case we didn't mount with lustre_get_mount -- old method*/ - mntput(mds->mds_vfsmnt); - lvfs_clear_rdonly(save_dev); - } - obd->u.obt.obt_sb = NULL; - - ldlm_namespace_free(obd->obd_namespace, obd->obd_force); - - spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovering) { - target_cancel_recovery_timer(obd); - obd->obd_recovering = 0; - } - spin_unlock_bh(&obd->obd_processing_task_lock); - - if (must_relock) - lock_kernel(); - - fsfilt_put_ops(obd->obd_fsops); - - LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name); - - RETURN(0); + struct obd_device *obd = data; + struct ll_fid fid; + fid.id = id; + fid.generation = gen; + return mds_fid2dentry(&obd->u.mds, &fid, NULL); } -static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, - struct ldlm_lock *new_lock, - struct ldlm_lock **old_lock, - struct lustre_handle *lockh) -{ - struct obd_export *exp = req->rq_export; - struct ldlm_request *dlmreq = - lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); - struct lustre_handle remote_hdl = dlmreq->lock_handle1; - struct list_head *iter; - - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) - return; - - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock == new_lock) - continue; - if (lock->l_remote_handle.cookie == remote_hdl.cookie) { - lockh->cookie = lock->l_handle.h_cookie; - LDLM_DEBUG(lock, "restoring lock cookie"); - DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64, - lockh->cookie); - if (old_lock) - *old_lock = LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); - return; - } - } - spin_unlock(&exp->exp_ldlm_data.led_lock); - - /* If the xid matches, then we know this is a resent request, - * and allow it. (It's probably an OPEN, for which we don't - * send a lock */ - if (req->rq_xid == - le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid)) - return; - - if (req->rq_xid == - le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid)) - return; - /* This remote handle isn't enqueued, so we never received or - * processed this request. Clear MSG_RESENT, because it can - * be handled like any normal request now. */ - - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); - - DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64, - remote_hdl.cookie); -} +struct lvfs_callback_ops mds_lvfs_ops = { + l_fid2dentry: mds_lvfs_fid2dentry, +}; -int intent_disposition(struct ldlm_reply *rep, int flag) +static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt) { - if (!rep) - return 0; - return (rep->lock_policy_res1 & flag); -} + struct mds_obd *mds = &obd->u.mds; -void intent_set_disposition(struct ldlm_reply *rep, int flag) -{ - if (!rep) - return; - rep->lock_policy_res1 |= flag; + mds->mds_obt.obt_vfsmnt = mnt; + /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */ + obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb; + obd->u.obt.obt_magic = OBT_MAGIC; + fsfilt_setup(obd, obd->u.obt.obt_sb); + + OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); + obd->obd_lvfs_ctxt.pwdmnt = mnt; + obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; + obd->obd_lvfs_ctxt.fs = get_ds(); + obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops; + return; } -static int mds_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data) +/*mds still need lov setup here*/ +static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { - struct ptlrpc_request *req = req_cookie; - struct ldlm_lock *lock = *lockp; - struct ldlm_intent *it; - struct mds_obd *mds = &req->rq_export->exp_obd->u.mds; - struct ldlm_reply *rep; - struct lustre_handle lockh = { 0 }; - struct ldlm_lock *new_lock = NULL; - int getattr_part = MDS_INODELOCK_UPDATE; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), - [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), - [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize }; - int repbufcnt = 4, rc; + struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; + const char *dev; + struct vfsmount *mnt; + struct lustre_sb_info *lsi; + struct lustre_mount_info *lmi; + struct dentry *dentry; + int rc = 0; ENTRY; - LASSERT(req != NULL); - - if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) { - /* No intent was provided */ - rc = lustre_pack_reply(req, 2, repsize, NULL); - LASSERT(rc == 0); + CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name); + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) RETURN(0); - } - - it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it), - lustre_swab_ldlm_intent); - if (it == NULL) { - CERROR("Intent missing\n"); - RETURN(req->rq_status = -EFAULT); - } - - LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))) - /* we should never allow OBD_CONNECT_ACL if not configured */ - repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; - else if (it->opc & IT_UNLINK) - repsize[repbufcnt++] = mds->mds_max_cookiesize; - - rc = lustre_pack_reply(req, repbufcnt, repsize, NULL); - if (rc) - RETURN(req->rq_status = rc); - - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep)); - intent_set_disposition(rep, DISP_IT_EXECD); - - - /* execute policy */ - switch ((long)it->opc) { - case IT_OPEN: - case IT_CREAT|IT_OPEN: - mds_counter_incr(req->rq_export, LPROC_MDS_OPEN); - fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL, - &lockh); - /* XXX swab here to assert that an mds_open reint - * packet is following */ - rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF, - &lockh); -#if 0 - /* We abort the lock if the lookup was negative and - * we did not make it to the OPEN portion */ - if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) - RETURN(ELDLM_LOCK_ABORTED); - if (intent_disposition(rep, DISP_LOOKUP_NEG) && - !intent_disposition(rep, DISP_OPEN_OPEN)) -#endif - - /* If there was an error of some sort or if we are not - * returning any locks */ - if (rep->lock_policy_res2 || - !intent_disposition(rep, DISP_OPEN_LOCK)) - RETURN(ELDLM_LOCK_ABORTED); - break; - case IT_LOOKUP: - getattr_part = MDS_INODELOCK_LOOKUP; - case IT_GETATTR: - getattr_part |= MDS_INODELOCK_LOOKUP; - OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr); - case IT_READDIR: - fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, - &new_lock, &lockh); - - /* INODEBITS_INTEROP: if this lock was converted from a - * plain lock (client does not support inodebits), then - * child lock must be taken with both lookup and update - * bits set for all operations. - */ - if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS)) - getattr_part = MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE; - - rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF, - getattr_part, &lockh); - /* FIXME: LDLM can set req->rq_status. MDS sets - policy_res{1,2} with disposition and status. - - replay: returns 0 & req->status is old status - - otherwise: returns req->status */ - if (intent_disposition(rep, DISP_LOOKUP_NEG)) - rep->lock_policy_res2 = 0; - if (!intent_disposition(rep, DISP_LOOKUP_POS) || - rep->lock_policy_res2) - RETURN(ELDLM_LOCK_ABORTED); - if (req->rq_status != 0) { - LBUG(); - rep->lock_policy_res2 = req->rq_status; - RETURN(ELDLM_LOCK_ABORTED); - } - break; - default: - CERROR("Unhandled intent "LPD64"\n", it->opc); - RETURN(-EFAULT); + if (lcfg->lcfg_bufcount < 5) { + CERROR("invalid arg for setup %s\n", MDD_OBD_NAME); + RETURN(-EINVAL); } + dev = lustre_cfg_string(lcfg, 4); + lmi = server_get_mount(dev); + LASSERT(lmi != NULL); - /* By this point, whatever function we called above must have either - * filled in 'lockh', been an intent replay, or returned an error. We - * want to allow replayed RPCs to not get a lock, since we would just - * drop it below anyways because lock replay is done separately by the - * client afterwards. For regular RPCs we want to give the new lock to - * the client instead of whatever lock it was about to get. */ - if (new_lock == NULL) - new_lock = ldlm_handle2lock(&lockh); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) - RETURN(0); - - LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n", - it->opc, lockh.cookie); - - /* If we've already given this lock to a client once, then we should - * have no readers or writers. Otherwise, we should have one reader - * _or_ writer ref (which will be zeroed below) before returning the - * lock to a client. */ - if (new_lock->l_export == req->rq_export) { - LASSERT(new_lock->l_readers + new_lock->l_writers == 0); - } else { - LASSERT(new_lock->l_export == NULL); - LASSERT(new_lock->l_readers + new_lock->l_writers == 1); - } + lsi = s2lsi(lmi->lmi_sb); + mnt = lmi->lmi_mnt; + /* FIXME: MDD LOV initialize objects. + * we need only lmi here but not get mount + * OSD did mount already, so put mount back + */ + cfs_atomic_dec(&lsi->lsi_mounts); + mntput(mnt); + cfs_init_rwsem(&mds->mds_notify_lock); - *lockp = new_lock; + obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); + mds_init_ctxt(obd, mnt); - if (new_lock->l_export == req->rq_export) { - /* Already gave this to the client, which means that we - * reconstructed a reply. */ - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & - MSG_RESENT); - RETURN(ELDLM_LOCK_REPLACED); + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + dentry = simple_mkdir(cfs_fs_pwd(current->fs), mnt, "OBJECTS", 0777, 1); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot create OBJECTS directory: rc = %d\n", rc); + GOTO(err_putfs, rc); } + mds->mds_objects_dir = dentry; - /* Fixup the lock to be given to the client */ - lock_res_and_lock(new_lock); - new_lock->l_readers = 0; - new_lock->l_writers = 0; - - new_lock->l_export = class_export_get(req->rq_export); - spin_lock(&req->rq_export->exp_ldlm_data.led_lock); - list_add(&new_lock->l_export_chain, - &new_lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&req->rq_export->exp_ldlm_data.led_lock); - - new_lock->l_blocking_ast = lock->l_blocking_ast; - new_lock->l_completion_ast = lock->l_completion_ast; - - memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, - sizeof(lock->l_remote_handle)); - - new_lock->l_flags &= ~LDLM_FL_LOCAL; - - unlock_res_and_lock(new_lock); - LDLM_LOCK_PUT(new_lock); - - RETURN(ELDLM_LOCK_REPLACED); -} - -static int mdt_setup(struct obd_device *obd, obd_count len, void *buf) -{ - struct mds_obd *mds = &obd->u.mds; - struct lprocfs_static_vars lvars; - int mds_min_threads = MDS_THREADS_AUTO_MIN; - int mds_max_threads = MDS_THREADS_AUTO_MAX; - int rc = 0; - ENTRY; - - lprocfs_init_vars(mdt, &lvars); - lprocfs_obd_setup(obd, lvars.obd_vars); - - sema_init(&mds->mds_health_sem, 1); - - if (mds_num_threads) { - /* If mds_num_threads is set, it is the min and the max. */ - if (mds_num_threads > MDS_THREADS_MAX) - mds_num_threads = MDS_THREADS_MAX; - if (mds_num_threads < MDS_THREADS_MIN) - mds_num_threads = MDS_THREADS_MIN; - mds_max_threads = mds_min_threads = mds_num_threads; + dentry = ll_lookup_one_len("__iopen__", cfs_fs_pwd(current->fs), + strlen("__iopen__")); + if (IS_ERR(dentry)) { + rc = PTR_ERR(dentry); + CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc); + GOTO(err_objects, rc); } - mds->mds_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_REQUEST_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, LUSTRE_MDS_NAME, - obd->obd_proc_entry, NULL, - mds_min_threads, mds_max_threads, "ll_mdt"); - - if (!mds->mds_service) { - CERROR("failed to start service\n"); - GOTO(err_lprocfs, rc = -ENOMEM); + mds->mds_fid_de = dentry; + if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) { + rc = -ENOENT; + CERROR("__iopen__ directory has no inode? rc = %d\n", rc); + GOTO(err_fid, rc); } - - rc = ptlrpc_start_threads(obd, mds->mds_service); - if (rc) - GOTO(err_thread, rc); - - mds->mds_setattr_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_SETATTR_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, "mds_setattr", - obd->obd_proc_entry, NULL, - mds_min_threads, mds_max_threads, - "ll_mdt_attr"); - if (!mds->mds_setattr_service) { - CERROR("failed to start getattr service\n"); - GOTO(err_thread, rc = -ENOMEM); + rc = mds_lov_init_objids(obd); + if (rc != 0) { + CERROR("cannot init lov objid rc = %d\n", rc); + GOTO(err_fid, rc ); } - rc = ptlrpc_start_threads(obd, mds->mds_setattr_service); - if (rc) - GOTO(err_thread2, rc); - - mds->mds_readpage_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_READPAGE_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, "mds_readpage", - obd->obd_proc_entry, NULL, - MDS_THREADS_MIN_READPAGE, mds_max_threads, - "ll_mdt_rdpg"); - if (!mds->mds_readpage_service) { - CERROR("failed to start readpage service\n"); - GOTO(err_thread2, rc = -ENOMEM); - } + rc = mds_lov_presetup(mds, lcfg); + if (rc < 0) + GOTO(err_objects, rc); - rc = ptlrpc_start_threads(obd, mds->mds_readpage_service); + /* Don't wait for mds_postrecov trying to clear orphans */ + obd->obd_async_recov = 1; + rc = mds_postsetup(obd); + /* Bug 11557 - allow async abort_recov start + FIXME can remove most of this obd_async_recov plumbing + obd->obd_async_recov = 0; + */ if (rc) - GOTO(err_thread3, rc); - - ping_evictor_start(); - - RETURN(0); - -err_thread3: - ptlrpc_unregister_service(mds->mds_readpage_service); - mds->mds_readpage_service = NULL; -err_thread2: - ptlrpc_unregister_service(mds->mds_setattr_service); - mds->mds_setattr_service = NULL; -err_thread: - ptlrpc_unregister_service(mds->mds_service); - mds->mds_service = NULL; -err_lprocfs: - lprocfs_obd_cleanup(obd); - return rc; -} - -static int mdt_cleanup(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - ENTRY; - - ping_evictor_stop(); - - down(&mds->mds_health_sem); - ptlrpc_unregister_service(mds->mds_readpage_service); - ptlrpc_unregister_service(mds->mds_setattr_service); - ptlrpc_unregister_service(mds->mds_service); - mds->mds_readpage_service = NULL; - mds->mds_setattr_service = NULL; - mds->mds_service = NULL; - up(&mds->mds_health_sem); - - lprocfs_obd_cleanup(obd); + GOTO(err_objects, rc); - RETURN(0); +err_pop: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); +err_fid: + dput(mds->mds_fid_de); +err_objects: + dput(mds->mds_objects_dir); +err_putfs: + fsfilt_put_ops(obd->obd_fsops); + goto err_pop; } -static int mdt_health_check(struct obd_device *obd) +static int mds_cmd_cleanup(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; + struct lvfs_run_ctxt saved; int rc = 0; + ENTRY; - down(&mds->mds_health_sem); - rc |= ptlrpc_service_health_check(mds->mds_readpage_service); - rc |= ptlrpc_service_health_check(mds->mds_setattr_service); - rc |= ptlrpc_service_health_check(mds->mds_service); - up(&mds->mds_health_sem); + mds->mds_lov_exp = NULL; - /* - * health_check to return 0 on healthy - * and 1 on unhealthy. - */ - if(rc != 0) - rc = 1; + if (obd->obd_fail) + LCONSOLE_WARN("%s: shutting down for failover; client state " + "will be preserved.\n", obd->obd_name); - return rc; -} + if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) + RETURN(0); -static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, - void *data) -{ - struct obd_device *obd = data; - struct ll_fid fid; - fid.id = id; - fid.generation = gen; - return mds_fid2dentry(&obd->u.mds, &fid, NULL); -} + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); -static int mds_health_check(struct obd_device *obd) -{ - struct obd_device_target *odt = &obd->u.obt; - struct mds_obd *mds = &obd->u.mds; - int rc = 0; + mds_lov_destroy_objids(obd); - if (odt->obt_sb->s_flags & MS_RDONLY) - rc = 1; + if (mds->mds_objects_dir != NULL) { + l_dput(mds->mds_objects_dir); + mds->mds_objects_dir = NULL; + } - LASSERT(mds->mds_health_check_filp != NULL); - rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp); + dput(mds->mds_fid_de); + ll_vfs_dq_off(obd->u.obt.obt_sb, 0); + shrink_dcache_sb(mds->mds_obt.obt_sb); + fsfilt_put_ops(obd->obd_fsops); - return rc; + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + RETURN(rc); } -static int mds_process_config(struct obd_device *obd, obd_count len, void *buf) +#if 0 +static int mds_cmd_health_check(struct obd_device *obd) { - struct lustre_cfg *lcfg = buf; - struct lprocfs_static_vars lvars; - int rc; - - lprocfs_init_vars(mds, &lvars); - - rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd); - - return(rc); + return 0; } - -struct lvfs_callback_ops mds_lvfs_ops = { - l_fid2dentry: mds_lvfs_fid2dentry, -}; - -/* use obd ops to offer management infrastructure */ -static struct obd_ops mds_obd_ops = { +#endif +static struct obd_ops mds_cmd_obd_ops = { .o_owner = THIS_MODULE, - .o_connect = mds_connect, - .o_reconnect = mds_reconnect, - .o_init_export = mds_init_export, - .o_destroy_export = mds_destroy_export, - .o_disconnect = mds_disconnect, - .o_setup = mds_setup, + .o_setup = mds_cmd_setup, + .o_cleanup = mds_cmd_cleanup, .o_precleanup = mds_precleanup, - .o_cleanup = mds_cleanup, - .o_postrecov = mds_postrecov, - .o_statfs = mds_obd_statfs, - .o_iocontrol = mds_iocontrol, .o_create = mds_obd_create, .o_destroy = mds_obd_destroy, .o_llog_init = mds_llog_init, .o_llog_finish = mds_llog_finish, .o_notify = mds_notify, - .o_health_check = mds_health_check, - .o_process_config = mds_process_config, -}; - -static struct obd_ops mdt_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = mdt_setup, - .o_cleanup = mdt_cleanup, - .o_health_check = mdt_health_check, + .o_postrecov = mds_postrecov, + // .o_health_check = mds_cmd_health_check, }; quota_interface_t *mds_quota_interface_ref; extern quota_interface_t mds_quota_interface; -static int __init mds_init(void) +static int __init mds_cmd_init(void) { - int rc; struct lprocfs_static_vars lvars; + int rc; - request_module("lquota"); + cfs_request_module("%s", "lquota"); mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface); rc = lquota_init(mds_quota_interface_ref); if (rc) { @@ -2769,29 +492,28 @@ static int __init mds_init(void) PORTAL_SYMBOL_PUT(mds_quota_interface); return rc; } - init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops); - - lprocfs_init_vars(mds, &lvars); - class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME); - lprocfs_init_vars(mdt, &lvars); - class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME); + init_obd_quota_ops(mds_quota_interface_ref, &mds_cmd_obd_ops); + + lprocfs_mds_init_vars(&lvars); + class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars, + LUSTRE_MDS_NAME, NULL); return 0; } -static void /*__exit*/ mds_exit(void) +static void /*__exit*/ mds_cmd_exit(void) { lquota_exit(mds_quota_interface_ref); if (mds_quota_interface_ref) PORTAL_SYMBOL_PUT(mds_quota_interface); class_unregister_type(LUSTRE_MDS_NAME); - class_unregister_type(LUSTRE_MDT_NAME); } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +EXPORT_SYMBOL(mds_quota_interface_ref); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Server (MDS)"); MODULE_LICENSE("GPL"); -module_init(mds_init); -module_exit(mds_exit); +module_init(mds_cmd_init); +module_exit(mds_cmd_exit);