X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=6bd5e0c6981853169b337489ef44e0c0079d245a;hp=c09a6188c7e1469258728ef0e66b4d94c5181867;hb=400b0681017091fab9cef9bd00e0f536e1793dcc;hpb=d8b0dffb5c369fc94fccdeeb689e6405836d0cd2 diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index c09a6188..6bd5e0c 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1,20 +1,28 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * linux/mds/handler.c - * + * lustre/mds/handler.c * Lustre Metadata Server (mds) request handler * - * Copyright (C) 2001, 2002 Cluster File Systems, Inc. + * Copyright (c) 2001, 2002 Cluster File Systems, Inc. + * Author: Peter Braam + * Author: Andreas Dilger + * Author: Phil Schwan * - * This code is issued under the GNU General Public License. - * See the file COPYING in this distribution + * This file is part of Lustre, http://www.lustre.org. * - * by Peter Braam & - * Andreas Dilger + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. * - * This server is single threaded at present (but can easily be multi threaded) + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ #define EXPORT_SYMTAB @@ -23,26 +31,50 @@ #include #include #include -extern int mds_get_lovtgts(struct obd_device *obd, int tgt_count, - uuid_t *uuidarray); -extern int mds_get_lovdesc(struct obd_device *obd, struct lov_desc *desc); +#include +#include +#include +#include +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) +#include +#endif +#include + +static kmem_cache_t *mds_file_cache; + +extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count, + obd_uuid_t *uuidarray); +extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc); extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle, struct ptlrpc_request *req); static int mds_cleanup(struct obd_device * obddev); +extern lprocfs_vars_t status_var_nm_1[]; +extern lprocfs_vars_t status_class_var[]; + inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) { return &req->rq_export->exp_obd->u.mds; } +static int mds_bulk_timeout(void *data) +{ + struct ptlrpc_bulk_desc *desc = data; + + ENTRY; + CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); + RETURN(1); +} + /* Assumes caller has already pushed into the kernel filesystem context */ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, __u64 offset) { int rc = 0; - struct mds_obd *mds = mds_req2mds(req); + struct mds_obd *mds = mds_req2mds(req); struct ptlrpc_bulk_desc *desc; struct ptlrpc_bulk_page *bulk; + struct l_wait_info lwi; char *buf; ENTRY; @@ -63,10 +95,10 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, if (rc != PAGE_SIZE) GOTO(cleanup_buf, rc = -EIO); - bulk->b_xid = req->rq_xid; - bulk->b_buf = buf; - bulk->b_buflen = PAGE_SIZE; - desc->b_portal = MDS_BULK_PORTAL; + bulk->bp_xid = req->rq_xid; + bulk->bp_buf = buf; + bulk->bp_buflen = PAGE_SIZE; + desc->bd_portal = MDS_BULK_PORTAL; rc = ptlrpc_send_bulk(desc); if (rc) @@ -79,9 +111,13 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, GOTO(cleanup_buf, rc); } - wait_event(desc->b_waitq, ptlrpc_check_bulk_sent(desc)); - if (desc->b_flags & PTL_RPC_FL_INTR) - GOTO(cleanup_buf, rc = -EINTR); + lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi); + if (rc) { + if (rc != -ETIMEDOUT) + LBUG(); + GOTO(cleanup_buf, rc); + } EXIT; cleanup_buf: @@ -92,15 +128,23 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, return rc; } -/* 'dir' is a inode for which a lock has already been taken */ -struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, - struct vfsmount **mnt, char *name, - int namelen, int lock_mode, - struct lustre_handle *lockh, +/* + * Look up a named entry in a directory, and get an LDLM lock on it. + * 'dir' is a inode for which an LDLM lock has already been taken. + * + * If we do not need an exclusive or write lock on this entry (e.g. + * a read lock for attribute lookup only) then we do not hold the + * directory on return. It is up to the caller to know what type + * of lock it is getting, and clean up appropriately. + */ +struct dentry *mds_name2locked_dentry(struct obd_device *obd, + struct dentry *dir, struct vfsmount **mnt, + char *name, int namelen, int lock_mode, + struct lustre_handle *lockh, int dir_lock_mode) { struct dentry *dchild; - int flags, rc; + int flags = 0, rc; __u64 res_id[3] = {0}; ENTRY; @@ -110,36 +154,38 @@ struct dentry *mds_name2locked_dentry(struct mds_obd *mds, struct dentry *dir, CERROR("child lookup error %ld\n", PTR_ERR(dchild)); up(&dir->d_inode->i_sem); LBUG(); + RETURN(dchild); } - if (dir_lock_mode != LCK_EX && dir_lock_mode != LCK_PW) { + if (dir_lock_mode != LCK_EX && dir_lock_mode != LCK_PW) { up(&dir->d_inode->i_sem); - ldlm_lock_decref(lockh, dir_lock_mode); + ldlm_lock_decref(lockh, dir_lock_mode); } if (lock_mode == 0 || !dchild->d_inode) RETURN(dchild); res_id[0] = dchild->d_inode->i_ino; - rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + res_id[1] = dchild->d_inode->i_generation; + rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, - 0, lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(dchild); - RETURN(NULL); + up(&dir->d_inode->i_sem); + RETURN(ERR_PTR(-ENOLCK)); /* XXX translate ldlm code */ } RETURN(dchild); } -struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, +struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh) { + struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; - int flags, rc; + int flags = 0, rc; __u64 res_id[3] = {0}; ENTRY; @@ -147,20 +193,24 @@ struct dentry *mds_fid2locked_dentry(struct mds_obd *mds, struct ll_fid *fid, RETURN(de); res_id[0] = de->d_inode->i_ino; - rc = ldlm_match_or_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + res_id[1] = de->d_inode->i_generation; + rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, NULL, - 0, lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, lockh); if (rc != ELDLM_OK) { l_dput(de); - retval = NULL; + retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */ } RETURN(retval); } +#ifndef DCACHE_DISCONNECTED +#define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED +#endif + +/* Look up an entry by inode number. */ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) { @@ -193,15 +243,13 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, RETURN(ERR_PTR(-ESTALE)); } - /* now to find a dentry. - * If possible, get a well-connected one - */ + /* now to find a dentry. If possible, get a well-connected one */ if (mnt) *mnt = mds->mds_vfsmnt; spin_lock(&dcache_lock); - for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) { - result = list_entry(lp,struct dentry, d_alias); - if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) { + list_for_each(lp, &inode->i_dentry) { + result = list_entry(lp, struct dentry, d_alias); + if (!(result->d_flags & DCACHE_DISCONNECTED)) { dget_locked(result); result->d_vfs_flags |= DCACHE_REFERENCED; spin_unlock(&dcache_lock); @@ -219,41 +267,163 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, } if (mnt) mntget(*mnt); - result->d_flags |= DCACHE_NFSD_DISCONNECTED; + result->d_flags |= DCACHE_DISCONNECTED; return result; } -static int mds_connect(struct lustre_handle *conn, struct obd_device *obd) +/* Establish a connection to the MDS. + * + * This will set up an export structure for the client to hold state data + * about that client, like open files, the last operation number it did + * on the server, etc. + */ +static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, + obd_uuid_t cluuid, struct recovd_obd *recovd, + ptlrpc_recovery_cb_t recover) { + struct obd_export *exp; + struct mds_export_data *med; + struct mds_client_data *mcd; + struct list_head *p; int rc; + ENTRY; + + if (!conn || !obd || !cluuid) + RETURN(-EINVAL); MOD_INC_USE_COUNT; - rc = class_connect(conn, obd); + spin_lock(&obd->obd_dev_lock); + list_for_each(p, &obd->obd_exports) { + exp = list_entry(p, struct obd_export, exp_obd_chain); + mcd = exp->exp_mds_data.med_mcd; + if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) { + LASSERT(exp->exp_obd == obd); + + if (!list_empty(&exp->exp_conn_chain)) { + CERROR("existing uuid/export, list not empty!\n"); + spin_unlock(&obd->obd_dev_lock); + /* XXX should we MOD_DEC_USE_COUNT; here? */ + RETURN(-EALREADY); + } + conn->addr = (__u64) (unsigned long)exp; + conn->cookie = exp->exp_cookie; + spin_unlock(&obd->obd_dev_lock); + CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", + cluuid, exp); + CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n", + (long long)conn->addr, (long long)conn->cookie); + MOD_DEC_USE_COUNT; + RETURN(0); + } + } + spin_unlock(&obd->obd_dev_lock); + /* XXX There is a small race between checking the list and adding a + * new connection for the same UUID, but the real threat (list + * corruption when multiple different clients connect) is solved. + */ + rc = class_connect(conn, obd, cluuid); if (rc) - MOD_DEC_USE_COUNT; + GOTO(out_dec, rc); + exp = class_conn2export(conn); + LASSERT(exp); + med = &exp->exp_mds_data; + + OBD_ALLOC(mcd, sizeof(*mcd)); + if (!mcd) { + CERROR("mds: out of memory for client data\n"); + GOTO(out_export, rc = -ENOMEM); + } + + memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); + med->med_mcd = mcd; + + INIT_LIST_HEAD(&med->med_open_head); + spin_lock_init(&med->med_open_lock); + + rc = mds_client_add(med, -1); + if (rc) + GOTO(out_mdc, rc); + + RETURN(0); + +out_mdc: + OBD_FREE(mcd, sizeof(*mcd)); +out_export: + class_disconnect(conn); +out_dec: + MOD_DEC_USE_COUNT; return rc; } +/* Call with med->med_open_lock held, please. */ +inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med) +{ + struct file *file = mfd->mfd_file; + LASSERT(file->private_data == mfd); + + list_del(&mfd->mfd_list); + mfd->mfd_servercookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(mds_file_cache, mfd); + + return filp_close(file, 0); +} + static int mds_disconnect(struct lustre_handle *conn) { + struct obd_export *export = class_conn2export(conn); + struct list_head *tmp, *n; + struct mds_export_data *med = &export->exp_mds_data; int rc; + ENTRY; + + /* + * Close any open files. + */ + spin_lock(&med->med_open_lock); + list_for_each_safe(tmp, n, &med->med_open_head) { + struct mds_file_data *mfd = + list_entry(tmp, struct mds_file_data, mfd_list); + rc = mds_close_mfd(mfd, med); + if (rc) { + /* XXX better diagnostics, with file path and stuff */ + CDEBUG(D_INODE, "Error %d closing mfd %p\n", rc, mfd); + } + } + spin_unlock(&med->med_open_lock); + + ldlm_cancel_locks_for_export(export); + mds_client_free(export); rc = class_disconnect(conn); if (!rc) MOD_DEC_USE_COUNT; - return rc; + RETURN(rc); +} + +/* + * XXX This is NOT guaranteed to flush all transactions to disk (even though + * it is equivalent to calling sync()) because it only _starts_ the flush + * and does not wait for completion. It's better than nothing though. + * What we really want is a mild form of fsync_dev_lockfs(), but it is + * non-standard, or enabling do_sync_supers in ext3, just for this call. + */ +static void mds_fsync_super(struct super_block *sb) +{ + lock_kernel(); + lock_super(sb); + if (sb->s_dirt && sb->s_op && sb->s_op->write_super) + sb->s_op->write_super(sb); + unlock_super(sb); + unlock_kernel(); } -/* FIXME: the error cases need fixing to avoid leaks */ static int mds_getstatus(struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; - struct mds_client_info *mci; - struct mds_client_data *mcd; int rc, size = sizeof(*body); ENTRY; @@ -264,44 +434,19 @@ static int mds_getstatus(struct ptlrpc_request *req) RETURN(0); } - body = lustre_msg_buf(req->rq_reqmsg, 0); - mds_unpack_body(body); + /* Flush any outstanding transactions to disk so the client will + * get the latest last_committed value and can drop their local + * requests if they have any. This would be fsync_super() if it + * was exported. + */ + mds_fsync_super(mds->mds_sb); - /* Anything we need to do here with the client's trans no or so? */ body = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1)); - mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req)); - if (!mci) { - /* We don't have any old connection data for this client */ - int rc; - - CDEBUG(D_INFO, "allocating new client data for UUID '%s'", - ptlrpc_req_to_uuid(req)); - - OBD_ALLOC(mcd, sizeof(*mcd)); - if (!mcd) { - CERROR("mds: out of memory for client data\n"); - req->rq_status = -ENOMEM; - RETURN(0); - } - memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req), - sizeof(mcd->mcd_uuid)); - rc = mds_client_add(mds, mcd, -1); - if (rc) { - req->rq_status = rc; - RETURN(0); - } - } else { - /* We have old connection data for this client... */ - mcd = mci->mci_mcd; - CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n", - mcd->mcd_uuid, mci->mci_off); - } - /* mcd_last_xid is is stored in little endian on the disk and - mds_pack_rep_body converts it to network order */ - body->last_xid = le32_to_cpu(mcd->mcd_last_xid); - mds_pack_rep_body(req); + /* the last_committed and last_xid fields are filled in for all + * replies already - no need to do so here also. + */ RETURN(0); } @@ -309,40 +454,41 @@ static int mds_getlovinfo(struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); struct mds_status_req *streq; - struct lov_desc *desc; + struct lov_desc *desc; int tgt_count; int rc, size[2] = {sizeof(*desc)}; ENTRY; - streq = lustre_msg_buf(req->rq_reqmsg, 0); - streq->flags = NTOH__u32(streq->flags); - streq->repbuf = NTOH__u32(streq->repbuf); + streq = lustre_msg_buf(req->rq_reqmsg, 0); + streq->flags = NTOH__u32(streq->flags); + streq->repbuf = NTOH__u32(streq->repbuf); size[1] = streq->repbuf; rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc) { + if (rc) { CERROR("mds: out of memory for message: size=%d\n", size[1]); req->rq_status = -ENOMEM; RETURN(0); } - desc = lustre_msg_buf(req->rq_repmsg, 0); - rc = mds_get_lovdesc(req->rq_obd, desc); - if (rc != 0 ) { + desc = lustre_msg_buf(req->rq_repmsg, 0); + rc = mds_get_lovdesc(mds, desc); + if (rc) { CERROR("mds_get_lovdesc error %d", rc); req->rq_status = rc; RETURN(0); } - tgt_count = NTOH__u32(desc->ld_tgt_count); - if (tgt_count * sizeof(uuid_t) > streq->repbuf) { + tgt_count = le32_to_cpu(desc->ld_tgt_count); + if (tgt_count * sizeof(obd_uuid_t) > streq->repbuf) { CERROR("too many targets, enlarge client buffers\n"); req->rq_status = -ENOSPC; RETURN(0); } - mds->mds_max_mdsize = sizeof(desc) + tgt_count * sizeof(uuid_t); - rc = mds_get_lovtgts(req->rq_obd, tgt_count, + mds->mds_max_mdsize = sizeof(struct lov_mds_md) + + tgt_count * sizeof(struct lov_object_id); + rc = mds_get_lovtgts(mds, tgt_count, lustre_msg_buf(req->rq_repmsg, 1)); if (rc) { CERROR("get_lovtgts error %d\n", rc); @@ -352,36 +498,101 @@ static int mds_getlovinfo(struct ptlrpc_request *req) RETURN(0); } -int mds_lock_callback(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, int data_len, struct ptlrpc_request **reqp) +int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, + void *data, __u32 data_len, int flag) { + int do_ast; ENTRY; - if (desc == NULL) { - /* Completion AST. Do nothing */ + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ RETURN(0); } - if (ldlm_cli_cancel(lockh) < 0) - LBUG(); + /* XXX layering violation! -phil */ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_flags |= LDLM_FL_CBPENDING; + do_ast = (!lock->l_readers && !lock->l_writers); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); + + if (do_ast) { + struct lustre_handle lockh; + int rc; + + LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); + ldlm_lock2handle(lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc < 0) + CERROR("ldlm_cli_cancel: %d\n", rc); + } else + LDLM_DEBUG(lock, "Lock still has references, will be" + "cancelled later"); + RETURN(0); +} + +static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, + struct ptlrpc_request *req, + struct mds_body *reqbody, int reply_off) +{ + struct mds_body *body; + struct inode *inode = dentry->d_inode; + int rc; + ENTRY; + + if (inode == NULL) + RETURN(-ENOENT); + + body = lustre_msg_buf(req->rq_repmsg, reply_off); + + mds_pack_inode2fid(&body->fid1, inode); + mds_pack_inode2body(body, inode); + + if (S_ISREG(inode->i_mode)) { + struct lov_mds_md *lmm; + + lmm = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + lmm->lmm_easize = mds->mds_max_mdsize; + rc = mds_fs_get_md(mds, inode, lmm); + + if (rc < 0) { + if (rc == -ENODATA) + RETURN(0); + CERROR("mds_fs_get_md failed: %d\n", rc); + RETURN(rc); + } + body->valid |= OBD_MD_FLEASIZE; + } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) { + char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + int len = req->rq_repmsg->buflens[reply_off + 1]; + + rc = inode->i_op->readlink(dentry, symname, len); + if (rc < 0) { + CERROR("readlink failed: %d\n", rc); + RETURN(rc); + } else + CDEBUG(D_INODE, "read symlink dest %s\n", symname); + + body->valid |= OBD_MD_LINKNAME; + } RETURN(0); } static int mds_getattr_name(int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); + struct obd_device *obd = req->rq_export->exp_obd; struct obd_run_ctxt saved; struct mds_body *body; struct dentry *de = NULL, *dchild = NULL; struct inode *dir; struct lustre_handle lockh; char *name; - int namelen, flags, lock_mode, rc = 0; + int namelen, flags = 0, lock_mode, rc = 0; + struct obd_ucred uc; __u64 res_id[3] = {0, 0, 0}; ENTRY; - if (strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds") != 0) - LBUG(); + LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds")); if (req->rq_reqmsg->bufcount <= offset + 1) { LBUG(); @@ -395,7 +606,9 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) if (offset) offset = 1; - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) { LBUG(); @@ -403,21 +616,20 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) } dir = de->d_inode; - CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino); + CDEBUG(D_INODE, "parent ino %ld, name %*s\n", dir->i_ino,namelen,name); lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = dir->i_ino; + res_id[1] = dir->i_generation; - rc = ldlm_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN, + rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { - LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); - rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn, - (struct lustre_handle *)&mds->mds_connh, - NULL, mds->mds_local_namespace, NULL, + LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]); + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, - &flags, (void *)mds_lock_callback, - NULL, 0, &lockh); + &flags, ldlm_completion_ast, + mds_blocking_ast, NULL, 0, &lockh); if (rc != ELDLM_OK) { CERROR("lock enqueue: err: %d\n", rc); GOTO(out_create_de, rc = -EIO); @@ -434,25 +646,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) GOTO(out_create_dchild, rc = -ESTALE); } - if (dchild->d_inode) { - struct mds_body *body; - struct inode *inode = dchild->d_inode; - CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n", - dir->i_ino, name, dchild->d_inode->i_ino); - - body = lustre_msg_buf(req->rq_repmsg, offset); - mds_pack_inode2fid(&body->fid1, inode); - mds_pack_inode2body(body, inode); - if (S_ISREG(inode->i_mode)) { - struct lov_stripe_md *md; - md = lustre_msg_buf(req->rq_repmsg, offset + 1); - md->lmd_size = mds->mds_max_mdsize; - mds_fs_get_md(mds, inode, md); - } - /* now a normal case for intent locking */ - rc = 0; - } else - rc = -ENOENT; + rc = mds_getattr_internal(mds, dchild, req, body, offset); EXIT; out_create_dchild: @@ -467,7 +661,6 @@ out_pre_de: return 0; } - static int mds_getattr(int offset, struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); @@ -475,14 +668,18 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) struct dentry *de; struct inode *inode; struct mds_body *body; - int rc, size[2] = {sizeof(*body)}, bufcount = 1; + struct obd_ucred uc; + int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, offset); - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) { - GOTO(out_pop, rc = -ENOENT); + rc = req->rq_status = -ENOENT; + GOTO(out_pop, PTR_ERR(de)); } inode = de->d_inode; @@ -491,54 +688,32 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) size[1] = mds->mds_max_mdsize; } else if (body->valid & OBD_MD_LINKNAME) { bufcount = 2; - size[1] = inode->i_size; + size[1] = MIN(inode->i_size + 1, body->size); + CDEBUG(D_INODE, "symlink size: %d, reply space: %d\n", + inode->i_size + 1, body->size); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + CERROR("failed GETATTR_PACK test\n"); + req->rq_status = -ENOMEM; + GOTO(out, rc = -ENOMEM); } rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("mds: out of memory\n"); + if (rc) { + CERROR("out of memory or FAIL_MDS_GETATTR_PACK\n"); + req->rq_status = rc; GOTO(out, rc); } - if (body->valid & OBD_MD_LINKNAME) { - char *tmp = lustre_msg_buf(req->rq_repmsg, 1); - - rc = inode->i_op->readlink(de, tmp, size[1]); - - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - GOTO(out, rc); - } - } - - body = lustre_msg_buf(req->rq_repmsg, 0); - body->ino = inode->i_ino; - body->generation = inode->i_generation; - body->atime = inode->i_atime; - body->ctime = inode->i_ctime; - body->mtime = inode->i_mtime; - body->uid = inode->i_uid; - body->gid = inode->i_gid; - body->size = inode->i_size; - body->mode = inode->i_mode; - body->nlink = inode->i_nlink; - body->valid = ~0; /* FIXME: should be more selective */ + req->rq_status = mds_getattr_internal(mds, de, req, body, 0); - if (S_ISREG(inode->i_mode)) { - rc = mds_fs_get_md(mds, inode, - lustre_msg_buf(req->rq_repmsg, 1)); - if (rc < 0) { - CERROR("mds_fs_get_md failed: %d\n", rc); - GOTO(out, rc); - } - } out: l_dput(de); out_pop: pop_ctxt(&saved); - req->rq_status = rc; - RETURN(0); + RETURN(rc); } static int mds_statfs(struct ptlrpc_request *req) @@ -563,56 +738,107 @@ static int mds_statfs(struct ptlrpc_request *req) } osfs = lustre_msg_buf(req->rq_repmsg, 0); memset(osfs, 0, size); - obd_statfs_pack(osfs, &sfs); + statfs_pack(osfs, &sfs); + obd_statfs_pack(osfs, osfs); out: req->rq_status = rc; RETURN(0); } +static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle) +{ + struct mds_file_data *mfd = NULL; + + if (!handle || !handle->addr) + RETURN(NULL); + + mfd = (struct mds_file_data *)(unsigned long)(handle->addr); + if (!kmem_cache_validate(mds_file_cache, mfd)) + RETURN(NULL); + + if (mfd->mfd_servercookie != handle->cookie) + RETURN(NULL); + + return mfd; +} + +static int mds_store_ea(struct mds_obd *mds, struct ptlrpc_request *req, + struct mds_body *body, struct dentry *de, + struct lov_mds_md *lmm) +{ + struct obd_run_ctxt saved; + struct obd_ucred uc; + void *handle; + int rc, rc2; + + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); + handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); + if (!handle) + GOTO(out_ea, rc = -ENOMEM); + + rc = mds_fs_set_md(mds, de->d_inode, handle, lmm); + if (!rc) + rc = mds_update_last_rcvd(mds, handle, req); + + rc2 = mds_fs_commit(mds, de->d_inode, handle); + if (rc2 && !rc) + rc = rc2; +out_ea: + pop_ctxt(&saved); + + return rc; +} + static int mds_open(struct ptlrpc_request *req) { - struct dentry *de; + struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; + struct mds_export_data *med; + struct mds_file_data *mfd; + struct dentry *de; struct file *file; struct vfsmount *mnt; - struct mds_obd *mds = mds_req2mds(req); - struct mds_client_info *mci; __u32 flags; struct list_head *tmp; - struct mds_file_data *mfd; int rc, size = sizeof(*body); ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { - CERROR("mds: out of memory\n"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { + CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n"); req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); } - mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req)); - if (!mci) { - CERROR("mds: no mci!\n"); - req->rq_status = -ENOTCONN; - RETURN(0); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) { + CERROR("mds: pack error: rc = %d\n", rc); + req->rq_status = rc; + RETURN(rc); } body = lustre_msg_buf(req->rq_reqmsg, 0); - /* was this animal open already? */ - /* XXX we should only check on re-open, or do a refcount... */ - list_for_each(tmp, &mci->mci_open_head) { - struct mds_file_data *fd; - fd = list_entry(tmp, struct mds_file_data, mfd_list); - if (body->extra == fd->mfd_clientfd && - body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { - CERROR("Re opening %Ld\n", body->fid1.id); - RETURN(0); + /* was this animal open already and the client lost the reply? */ + /* XXX need some way to detect a reopen, to avoid locked list walks */ + med = &req->rq_export->exp_mds_data; + spin_lock(&med->med_open_lock); + list_for_each(tmp, &med->med_open_head) { + mfd = list_entry(tmp, typeof(*mfd), mfd_list); + if (!memcmp(&mfd->mfd_clienthandle, &body->handle, + sizeof(mfd->mfd_clienthandle)) && + body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) { + de = mfd->mfd_file->f_dentry; + spin_unlock(&med->med_open_lock); + CERROR("Re opening "LPD64"\n", body->fid1.id); + GOTO(out_pack, rc = 0); } } + spin_unlock(&med->med_open_lock); - OBD_ALLOC(mfd, sizeof(*mfd)); + mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); if (!mfd) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -620,103 +846,87 @@ static int mds_open(struct ptlrpc_request *req) } de = mds_fid2dentry(mds, &body->fid1, &mnt); - if (IS_ERR(de)) { - req->rq_status = -ENOENT; - RETURN(0); - } + if (IS_ERR(de)) + GOTO(out_free, rc = PTR_ERR(de)); /* check if this inode has seen a delayed object creation */ - if (req->rq_reqmsg->bufcount > 1) { - void *handle; - struct lov_stripe_md *md; - struct inode *inode = de->d_inode; - //struct iattr iattr; - int rc; - - md = lustre_msg_buf(req->rq_reqmsg, 1); - //iattr.ia_mode = inode->i_mode; - - handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); - if (!handle) { - req->rq_status = -ENOMEM; - RETURN(0); - } + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA) { + struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, 1); - /* XXX error handling */ - rc = mds_fs_set_md(mds, inode, handle, md); - // rc = mds_fs_setattr(mds, de, handle, &iattr); - if (!rc) { - struct obd_run_ctxt saved; - push_ctxt(&saved, &mds->mds_ctxt); - rc = mds_update_last_rcvd(mds, handle, req); - pop_ctxt(&saved); - } else { - req->rq_status = rc; - RETURN(0); - } - /* FIXME: need to return last_rcvd, last_committed */ - - /* FIXME: keep rc intact */ - rc = mds_fs_commit(mds, de->d_inode, handle); + rc = mds_store_ea(mds, req, body, de, lmm); if (rc) { - req->rq_status = rc; - RETURN(0); + l_dput(de); + mntput(mnt); + GOTO(out_free, rc); } } flags = body->flags; + /* dentry_open does a dput(de) and mntput(mnt) on error */ file = dentry_open(de, mnt, flags & ~O_DIRECT); - if (!file || IS_ERR(file)) { - req->rq_status = -EINVAL; - OBD_FREE(mfd, sizeof(*mfd)); - RETURN(0); + if (IS_ERR(file)) { + rc = PTR_ERR(file); + GOTO(out_free, 0); } file->private_data = mfd; mfd->mfd_file = file; - mfd->mfd_clientfd = body->extra; - list_add(&mfd->mfd_list, &mci->mci_open_head); + memcpy(&mfd->mfd_clienthandle, &body->handle, sizeof(body->handle)); + get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie)); + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); +out_pack: body = lustre_msg_buf(req->rq_repmsg, 0); - body->extra = (__u64) (unsigned long)file; + mds_pack_inode2fid(&body->fid1, de->d_inode); + mds_pack_inode2body(body, de->d_inode); + body->handle.addr = (__u64)(unsigned long)mfd; + body->handle.cookie = mfd->mfd_servercookie; + CDEBUG(D_INODE, "llite file "LPX64": addr %p, cookie "LPX64"\n", + mfd->mfd_clienthandle.addr, mfd, mfd->mfd_servercookie); + RETURN(0); + +out_free: + mfd->mfd_servercookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(mds_file_cache, mfd); + req->rq_status = rc; RETURN(0); } static int mds_close(struct ptlrpc_request *req) { - struct dentry *de; + struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_body *body; - struct file *file; - struct mds_obd *mds = mds_req2mds(req); - struct vfsmount *mnt; struct mds_file_data *mfd; int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { - CERROR("mds: out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); - } - body = lustre_msg_buf(req->rq_reqmsg, 0); - de = mds_fid2dentry(mds, &body->fid1, &mnt); - if (IS_ERR(de)) { - req->rq_status = -ENOENT; - RETURN(0); + + mfd = mds_handle2mfd(&body->handle); + if (!mfd) { + CERROR("no handle for file close "LPD64 + ": addr "LPX64", cookie "LPX64"\n", + body->fid1.id, body->handle.addr, body->handle.cookie); + RETURN(-ESTALE); } - file = (struct file *)(unsigned long)body->extra; - if (!file->f_dentry) - LBUG(); - mfd = (struct mds_file_data *)file->private_data; - list_del(&mfd->mfd_list); - OBD_FREE(mfd, sizeof(*mfd)); + spin_lock(&med->med_open_lock); + req->rq_status = mds_close_mfd(mfd, med); + spin_unlock(&med->med_open_lock); - req->rq_status = filp_close(file, 0); - l_dput(de); - mntput(mnt); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { + CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n"); + req->rq_status = -ENOMEM; + RETURN(-ENOMEM); + } + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) { + CERROR("mds: lustre_pack_msg: rc = %d\n", rc); + req->rq_status = rc; + } RETURN(0); } @@ -727,9 +937,10 @@ static int mds_readpage(struct ptlrpc_request *req) struct vfsmount *mnt; struct dentry *de; struct file *file; - struct mds_body *body; + struct mds_body *body, *repbody; struct obd_run_ctxt saved; int rc, size = sizeof(*body); + struct obd_ucred uc; ENTRY; rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); @@ -739,7 +950,9 @@ static int mds_readpage(struct ptlrpc_request *req) } body = lustre_msg_buf(req->rq_reqmsg, 0); - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, &mnt); if (IS_ERR(de)) GOTO(out_pop, rc = PTR_ERR(de)); @@ -751,9 +964,14 @@ static int mds_readpage(struct ptlrpc_request *req) if (IS_ERR(file)) GOTO(out_pop, rc = PTR_ERR(file)); + repbody = lustre_msg_buf(req->rq_repmsg, 0); + repbody->size = file->f_dentry->d_inode->i_size; + repbody->valid = OBD_MD_FLSIZE; + /* to make this asynchronous make sure that the handling function doesn't send a reply when this function completes. Instead a callback function would send the reply */ + /* note: in case of an error, dentry_open puts dentry */ rc = mds_sendpage(req, file, body->size); filp_close(file, 0); @@ -791,18 +1009,10 @@ int mds_handle(struct ptlrpc_request *req) GOTO(out, rc); } - if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) { - CERROR("lustre_mds: wrong packet type sent %d\n", - req->rq_reqmsg->type); - GOTO(out, rc = -EINVAL); - } - - if (req->rq_reqmsg->opc != MDS_CONNECT && - req->rq_export == NULL) + if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL) GOTO(out, rc = -ENOTCONN); - if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0) - GOTO(out, rc = -EINVAL); + LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME)); switch (req->rq_reqmsg->opc) { case MDS_CONNECT: @@ -857,13 +1067,13 @@ int mds_handle(struct ptlrpc_request *req) rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); if (rc) { - rc = req->rq_status = -ENOMEM; + req->rq_status = rc; break; } rc = mds_reint(0, req); OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0); break; - } + } case MDS_OPEN: CDEBUG(D_INODE, "open\n"); @@ -877,6 +1087,27 @@ int mds_handle(struct ptlrpc_request *req) rc = mds_close(req); break; + case LDLM_ENQUEUE: + CDEBUG(D_INODE, "enqueue\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0); + rc = ldlm_handle_enqueue(req); + if (rc) + break; + RETURN(0); + case LDLM_CONVERT: + CDEBUG(D_INODE, "convert\n"); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); + rc = ldlm_handle_convert(req); + if (rc) + break; + RETURN(0); + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: + CDEBUG(D_INODE, "callback\n"); + CERROR("callbacks should not happen on MDS\n"); + LBUG(); + OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0); + break; default: rc = ptlrpc_error(req->rq_svc, req); RETURN(rc); @@ -884,24 +1115,23 @@ int mds_handle(struct ptlrpc_request *req) EXIT; - if (!rc) { + if (!rc) { + struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_obd *mds = mds_req2mds(req); - req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd); + + req->rq_repmsg->last_xid = + HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid)); req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed); - CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n", + CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n", (unsigned long long)mds->mds_last_rcvd, (unsigned long long)mds->mds_last_committed, cpu_to_le32(req->rq_xid)); } out: - /* Still not 100% sure whether we should reply with the server - * last_rcvd or that of this client. I'm not sure it even makes - * a difference on a per-client basis, because last_rcvd is global - * and we are not supposed to allow transactions while in recovery. - */ if (rc) { - CERROR("mds: processing error %d\n", rc); + CERROR("mds: processing error (opcode %d): %d\n", + req->rq_reqmsg->opc, rc); ptlrpc_error(req->rq_svc, req); } else { CDEBUG(D_NET, "sending reply\n"); @@ -914,11 +1144,14 @@ int mds_handle(struct ptlrpc_request *req) * also the last_rcvd value to disk. If we don't have a clean shutdown, * then the server last_rcvd value may be less than that of the clients. * This will alert us that we may need to do client recovery. + * + * Assumes we are already in the server filesystem context. + * + * Also assumes for mds_last_rcvd that we are not modifying it (no locking). */ static int mds_update_server_data(struct mds_obd *mds) { - struct obd_run_ctxt saved; struct mds_server_data *msd = mds->mds_server_data; struct file *filp = mds->mds_rcvd_filp; loff_t off = 0; @@ -930,7 +1163,6 @@ int mds_update_server_data(struct mds_obd *mds) CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n", (unsigned long long)mds->mds_mount_count, (unsigned long long)mds->mds_last_rcvd); - push_ctxt(&saved, &mds->mds_ctxt); rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off); if (rc != sizeof(*msd)) { CERROR("error writing MDS server data: rc = %d\n", rc); @@ -938,8 +1170,11 @@ int mds_update_server_data(struct mds_obd *mds) RETURN(-EIO); RETURN(rc); } +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) rc = fsync_dev(filp->f_dentry->d_inode->i_rdev); - pop_ctxt(&saved); +#else + rc = file_fsync(filp, filp->f_dentry, 1); +#endif if (rc) CERROR("error flushing MDS server data: rc = %d\n", rc); @@ -950,21 +1185,22 @@ int mds_update_server_data(struct mds_obd *mds) static int mds_recover(struct obd_device *obddev) { struct mds_obd *mds = &obddev->u.mds; + struct obd_run_ctxt saved; int rc; /* This happens at the end when recovery is complete */ ++mds->mds_mount_count; + push_ctxt(&saved, &mds->mds_ctxt, NULL); rc = mds_update_server_data(mds); + pop_ctxt(&saved); return rc; } - /* mount the file system (secretly) */ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) { struct obd_ioctl_data* data = buf; - struct obd_export *export; struct mds_obd *mds = &obddev->u.mds; struct vfsmount *mnt; int rc = 0; @@ -986,86 +1222,38 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_kfree, rc); } + CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, mnt); mds->mds_sb = mnt->mnt_root->d_inode->i_sb; if (!mds->mds_sb) GOTO(err_put, rc = -ENODEV); - mds->mds_max_mdsize = sizeof(struct lov_stripe_md); - rc = mds_fs_setup(mds, mnt); + spin_lock_init(&mds->mds_last_lock); + mds->mds_max_mdsize = sizeof(struct lov_mds_md); + rc = mds_fs_setup(obddev, mnt); if (rc) { CERROR("MDS filesystem method init failed: rc = %d\n", rc); GOTO(err_put, rc); } - mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL, - MDC_REPLY_PORTAL, "self",mds_handle); - if (!mds->mds_service) { - CERROR("failed to start service\n"); - GOTO(err_fs, rc = -EINVAL); - } - - rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds"); - if (rc) { - CERROR("cannot start thread: rc = %d\n", rc); - GOTO(err_svc, rc); - } - - rc = -ENOENT; - mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self"); - if (!mds->mds_ldlm_conn) { - mds_cleanup(obddev); - GOTO(err_thread, rc); - } - obddev->obd_namespace = ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER); if (obddev->obd_namespace == NULL) { - LBUG(); mds_cleanup(obddev); - GOTO(err_thread, rc); + GOTO(err_fs, rc = -ENOMEM); } - mds->mds_local_namespace = - ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT); - if (mds->mds_local_namespace == NULL) { - LBUG(); - mds_cleanup(obddev); - GOTO(err_thread, rc); - } - - OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client)); - if (mds->mds_ldlm_client == NULL) { - LBUG(); - mds_cleanup(obddev); - GOTO(err_thread, rc); - } - ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, - mds->mds_ldlm_client); - mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor; - mds->mds_ldlm_client->cli_name = "mds ldlm"; rc = mds_recover(obddev); if (rc) - GOTO(err_thread, rc); + GOTO(err_fs, rc); - rc = class_connect(&mds->mds_connh, obddev); - if (rc) - GOTO(err_thread, rc); - export = class_conn2export(&mds->mds_connh); - if (!export) - LBUG(); - export->exp_connection = mds->mds_ldlm_conn; + ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + "mds_ldlm_client", &obddev->obd_ldlm_client); RETURN(0); - - -err_thread: - ptlrpc_stop_all_threads(mds->mds_service); -err_svc: - ptlrpc_unregister_service(mds->mds_service); err_fs: - mds_fs_cleanup(mds); + mds_fs_cleanup(obddev); err_put: unlock_kernel(); mntput(mds->mds_vfsmnt); @@ -1075,30 +1263,21 @@ err_kfree: kfree(mds->mds_fstype); err_dec: MOD_DEC_USE_COUNT; - return rc; + RETURN(rc); } -static int mds_cleanup(struct obd_device * obddev) +static int mds_cleanup(struct obd_device *obddev) { struct super_block *sb; struct mds_obd *mds = &obddev->u.mds; - + struct obd_run_ctxt saved; ENTRY; - class_disconnect(&mds->mds_connh); - - - if ( !list_empty(&obddev->obd_exports) ) { - CERROR("still has exports!\n"); - RETURN(-EBUSY); - } - - ptlrpc_stop_all_threads(mds->mds_service); - ptlrpc_unregister_service(mds->mds_service); sb = mds->mds_sb; if (!mds->mds_sb) RETURN(0); + push_ctxt(&saved, &mds->mds_ctxt, NULL); mds_update_server_data(mds); if (mds->mds_rcvd_filp) { @@ -1108,35 +1287,247 @@ static int mds_cleanup(struct obd_device * obddev) if (rc) CERROR("last_rcvd file won't close, rc=%d\n", rc); } + pop_ctxt(&saved); unlock_kernel(); mntput(mds->mds_vfsmnt); mds->mds_sb = 0; kfree(mds->mds_fstype); - ldlm_namespace_free(mds->mds_local_namespace); ldlm_namespace_free(obddev->obd_namespace); - if (mds->mds_ldlm_conn != NULL) - ptlrpc_put_connection(mds->mds_ldlm_conn); - - OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client)); - lock_kernel(); #ifdef CONFIG_DEV_RDONLY dev_clear_rdonly(2); #endif - mds_fs_cleanup(mds); + mds_fs_cleanup(obddev); + + MOD_DEC_USE_COUNT; + RETURN(0); +} + +static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) +{ + struct ptlrpc_request *req = req_cookie; + int rc = 0; + ENTRY; + + if (!req_cookie) + RETURN(0); + + if (req->rq_reqmsg->bufcount > 1) { + /* an intent needs to be considered */ + struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1); + struct mds_obd *mds= &req->rq_export->exp_obd->u.mds; + struct mds_body *mds_rep; + struct ldlm_reply *rep; + __u64 new_resid[3] = {0, 0, 0}, old_res; + int rc, size[3] = {sizeof(struct ldlm_reply), + sizeof(struct mds_body), + mds->mds_max_mdsize}; + + it->opc = NTOH__u64(it->opc); + + LDLM_DEBUG(lock, "intent policy, opc: %s", + ldlm_it2str(it->opc)); + + rc = lustre_pack_msg(3, size, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) { + rc = req->rq_status = -ENOMEM; + RETURN(rc); + } + + rep = lustre_msg_buf(req->rq_repmsg, 0); + rep->lock_policy_res1 = 1; + + /* execute policy */ + switch ((long)it->opc) { + case IT_CREAT|IT_OPEN: + rc = mds_reint(2, req); + if (rc || (req->rq_status != 0 && + req->rq_status != -EEXIST)) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_CREAT: + case IT_MKDIR: + case IT_MKNOD: + case IT_RENAME2: + case IT_LINK2: + case IT_RMDIR: + case IT_SYMLINK: + case IT_UNLINK: + rc = mds_reint(2, req); + if (rc || (req->rq_status != 0 && + req->rq_status != -EISDIR && + req->rq_status != -ENOTDIR)) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_GETATTR: + case IT_LOOKUP: + case IT_OPEN: + case IT_READDIR: + case IT_READLINK: + case IT_RENAME: + case IT_LINK: + case IT_SETATTR: + rc = mds_getattr_name(2, req); + /* FIXME: we need to sit down and decide on who should + * set req->rq_status, who should return negative and + * positive return values, and what they all mean. */ + if (rc || req->rq_status != 0) { + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } + break; + case IT_READDIR|IT_OPEN: + LBUG(); + break; + default: + CERROR("Unhandled intent\n"); + LBUG(); + } + + /* We don't bother returning a lock to the client for a file + * or directory we are removing. + * + * As for link and rename, there is no reason for the client + * to get a lock on the target at this point. If they are + * going to modify the file/directory later they will get a + * lock at that time. + */ + if (it->opc & (IT_UNLINK | IT_RMDIR | IT_LINK | IT_LINK2 | + IT_RENAME | IT_RENAME2)) + RETURN(ELDLM_LOCK_ABORTED); + + rep->lock_policy_res2 = req->rq_status; + mds_rep = lustre_msg_buf(req->rq_repmsg, 1); + + /* If the client is about to open a file that doesn't have an MD + * stripe record, it's going to need a write lock. */ + if (it->opc & IT_OPEN) { + struct lov_mds_md *lmm = + lustre_msg_buf(req->rq_repmsg, 2); + if (lmm->lmm_easize == 0) { + LDLM_DEBUG(lock, "open with no EA; returning PW" + " lock"); + lock->l_req_mode = LCK_PW; + } + } + + if (flags & LDLM_FL_INTENT_ONLY) { + LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock"); + RETURN(ELDLM_LOCK_ABORTED); + } + /* Give the client a lock on the child object, instead of the + * parent that it requested. */ + new_resid[0] = NTOH__u32(mds_rep->ino); + new_resid[1] = NTOH__u32(mds_rep->generation); + if (new_resid[0] == 0) + LBUG(); + old_res = lock->l_resource->lr_name[0]; + + ldlm_lock_change_resource(lock, new_resid); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + LDLM_DEBUG(lock, "intent policy, old res %ld", + (long)old_res); + RETURN(ELDLM_LOCK_CHANGED); + } else { + int size = sizeof(struct ldlm_reply); + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, + &req->rq_repmsg); + if (rc) { + LBUG(); + RETURN(-ENOMEM); + } + } + RETURN(rc); +} + +int mds_attach(struct obd_device *dev, + obd_count len, void *data) +{ + int rc; + rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev); + return rc; +} + +int mds_detach(struct obd_device *dev) +{ + int rc; + rc = lprocfs_dereg_obd(dev); + return rc; + +} + +#define MDT_NUM_THREADS 8 +static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf) +{ + int i; + // struct obd_ioctl_data* data = buf; + struct mds_obd *mds = &obddev->u.mds; + int rc = 0; + ENTRY; + + MOD_INC_USE_COUNT; + + mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS, + MDS_BUFSIZE, MDS_MAXREQSIZE, + MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, + "self", mds_handle, "mds"); + if (!mds->mds_service) { + CERROR("failed to start service\n"); + GOTO(err_dec, rc = -EINVAL); + } + + for (i = 0; i < MDT_NUM_THREADS; i++) { + char name[32]; + sprintf(name, "lustre_MDT_%02d", i); + rc = ptlrpc_start_thread(obddev, mds->mds_service, name); + if (rc) { + CERROR("cannot start MDT thread #%d: rc %d\n", i, rc); + GOTO(err_thread, rc); + } + } + + RETURN(0); + +err_thread: + ptlrpc_stop_all_threads(mds->mds_service); + ptlrpc_unregister_service(mds->mds_service); +err_dec: + MOD_DEC_USE_COUNT; + RETURN(rc); +} + + +static int mdt_cleanup(struct obd_device *obddev) +{ + struct mds_obd *mds = &obddev->u.mds; + ENTRY; + + ptlrpc_stop_all_threads(mds->mds_service); + ptlrpc_unregister_service(mds->mds_service); MOD_DEC_USE_COUNT; RETURN(0); } -extern int mds_iocontrol(long cmd, struct lustre_handle *conn, - int len, void *karg, void *uarg); +extern int mds_iocontrol(long cmd, struct lustre_handle *conn, + int len, void *karg, void *uarg); /* use obd ops to offer management infrastructure */ static struct obd_ops mds_obd_ops = { + o_attach: mds_attach, + o_detach: mds_detach, o_connect: mds_connect, o_disconnect: mds_disconnect, o_setup: mds_setup, @@ -1144,20 +1535,41 @@ static struct obd_ops mds_obd_ops = { o_iocontrol: mds_iocontrol }; +static struct obd_ops mdt_obd_ops = { + o_setup: mdt_setup, + o_cleanup: mdt_cleanup, +}; + + static int __init mds_init(void) { - inter_module_register("mds_reint", THIS_MODULE, &mds_reint); - inter_module_register("mds_getattr_name", THIS_MODULE, - &mds_getattr_name); - class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME); + + mds_file_cache = kmem_cache_create("ll_mds_file_data", + sizeof(struct mds_file_data), + 0, 0, NULL, NULL); + if (mds_file_cache == NULL) + return -ENOMEM; + + class_register_type(&mds_obd_ops, (lprocfs_vars_t*)status_class_var, + LUSTRE_MDS_NAME); + class_register_type(&mdt_obd_ops, 0, LUSTRE_MDT_NAME); + + ldlm_register_intent(ldlm_intent_policy); + return 0; + } static void __exit mds_exit(void) { - inter_module_unregister("mds_reint"); - inter_module_unregister("mds_getattr_name"); + + + ldlm_unregister_intent(); class_unregister_type(LUSTRE_MDS_NAME); + class_unregister_type(LUSTRE_MDT_NAME); + if (kmem_cache_destroy(mds_file_cache)) + CERROR("couldn't free MDS file cache\n"); + } MODULE_AUTHOR("Cluster File Systems ");