X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmds%2Fhandler.c;h=6bd5e0c6981853169b337489ef44e0c0079d245a;hp=11b3dda59bd4efd0140e37dbc6e1eaea254f9e7a;hb=400b0681017091fab9cef9bd00e0f536e1793dcc;hpb=cf372c0e4ab0097ab39484d4d40db21dc7e823d2 diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 11b3dda..6bd5e0c 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -32,14 +32,26 @@ #include #include #include +#include +#include +#include +#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) +#include +#endif +#include + +static kmem_cache_t *mds_file_cache; -extern int mds_get_lovtgts(struct obd_device *obd, int tgt_count, - uuid_t *uuidarray); -extern int mds_get_lovdesc(struct obd_device *obd, struct lov_desc *desc); +extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count, + obd_uuid_t *uuidarray); +extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc); extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle, struct ptlrpc_request *req); static int mds_cleanup(struct obd_device * obddev); +extern lprocfs_vars_t status_var_nm_1[]; +extern lprocfs_vars_t status_class_var[]; + inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) { return &req->rq_export->exp_obd->u.mds; @@ -48,9 +60,9 @@ inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) static int mds_bulk_timeout(void *data) { struct ptlrpc_bulk_desc *desc = data; - + ENTRY; - CERROR("(not yet) starting recovery of client %p\n", desc->b_client); + CERROR("(not yet) starting recovery of client %p\n", desc->bd_client); RETURN(1); } @@ -83,10 +95,10 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, if (rc != PAGE_SIZE) GOTO(cleanup_buf, rc = -EIO); - bulk->b_xid = req->rq_xid; - bulk->b_buf = buf; - bulk->b_buflen = PAGE_SIZE; - desc->b_portal = MDS_BULK_PORTAL; + bulk->bp_xid = req->rq_xid; + bulk->bp_buf = buf; + bulk->bp_buflen = PAGE_SIZE; + desc->bd_portal = MDS_BULK_PORTAL; rc = ptlrpc_send_bulk(desc); if (rc) @@ -100,7 +112,7 @@ static int mds_sendpage(struct ptlrpc_request *req, struct file *file, } lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc); - rc = l_wait_event(desc->b_waitq, desc->b_flags & PTL_BULK_FL_SENT, &lwi); + rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT, &lwi); if (rc) { if (rc != -ETIMEDOUT) LBUG(); @@ -132,7 +144,7 @@ struct dentry *mds_name2locked_dentry(struct obd_device *obd, int dir_lock_mode) { struct dentry *dchild; - int flags, rc; + int flags = 0, rc; __u64 res_id[3] = {0}; ENTRY; @@ -153,6 +165,7 @@ struct dentry *mds_name2locked_dentry(struct obd_device *obd, RETURN(dchild); res_id[0] = dchild->d_inode->i_ino; + res_id[1] = dchild->d_inode->i_generation; rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &flags, ldlm_completion_ast, @@ -172,7 +185,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, { struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; - int flags, rc; + int flags = 0, rc; __u64 res_id[3] = {0}; ENTRY; @@ -180,6 +193,7 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, RETURN(de); res_id[0] = de->d_inode->i_ino; + res_id[1] = de->d_inode->i_generation; rc = ldlm_match_or_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &flags, ldlm_completion_ast, @@ -192,6 +206,10 @@ struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, RETURN(retval); } +#ifndef DCACHE_DISCONNECTED +#define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED +#endif + /* Look up an entry by inode number. */ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt) @@ -231,7 +249,7 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, spin_lock(&dcache_lock); list_for_each(lp, &inode->i_dentry) { result = list_entry(lp, struct dentry, d_alias); - if (!(result->d_flags & DCACHE_NFSD_DISCONNECTED)) { + if (!(result->d_flags & DCACHE_DISCONNECTED)) { dget_locked(result); result->d_vfs_flags |= DCACHE_REFERENCED; spin_unlock(&dcache_lock); @@ -249,7 +267,7 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, } if (mnt) mntget(*mnt); - result->d_flags |= DCACHE_NFSD_DISCONNECTED; + result->d_flags |= DCACHE_DISCONNECTED; return result; } @@ -260,9 +278,11 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, * on the server, etc. */ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, - char *cluuid) + obd_uuid_t cluuid, struct recovd_obd *recovd, + ptlrpc_recovery_cb_t recover) { struct obd_export *exp; + struct mds_export_data *med; struct mds_client_data *mcd; struct list_head *p; int rc; @@ -273,37 +293,55 @@ static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, MOD_INC_USE_COUNT; + spin_lock(&obd->obd_dev_lock); list_for_each(p, &obd->obd_exports) { - exp = list_entry(p, struct obd_export, exp_chain); + exp = list_entry(p, struct obd_export, exp_obd_chain); mcd = exp->exp_mds_data.med_mcd; if (!memcmp(cluuid, mcd->mcd_uuid, sizeof(mcd->mcd_uuid))) { - CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", - cluuid, exp); LASSERT(exp->exp_obd == obd); - exp->exp_rconnh.addr = conn->addr; - exp->exp_rconnh.cookie = conn->cookie; + if (!list_empty(&exp->exp_conn_chain)) { + CERROR("existing uuid/export, list not empty!\n"); + spin_unlock(&obd->obd_dev_lock); + /* XXX should we MOD_DEC_USE_COUNT; here? */ + RETURN(-EALREADY); + } conn->addr = (__u64) (unsigned long)exp; conn->cookie = exp->exp_cookie; + spin_unlock(&obd->obd_dev_lock); + CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", + cluuid, exp); CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n", (long long)conn->addr, (long long)conn->cookie); + MOD_DEC_USE_COUNT; RETURN(0); } } + spin_unlock(&obd->obd_dev_lock); + /* XXX There is a small race between checking the list and adding a + * new connection for the same UUID, but the real threat (list + * corruption when multiple different clients connect) is solved. + */ rc = class_connect(conn, obd, cluuid); if (rc) GOTO(out_dec, rc); exp = class_conn2export(conn); LASSERT(exp); + med = &exp->exp_mds_data; OBD_ALLOC(mcd, sizeof(*mcd)); if (!mcd) { CERROR("mds: out of memory for client data\n"); GOTO(out_export, rc = -ENOMEM); } + memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); - exp->exp_mds_data.med_mcd = mcd; - rc = mds_client_add(&exp->exp_mds_data, -1); + med->med_mcd = mcd; + + INIT_LIST_HEAD(&med->med_open_head); + spin_lock_init(&med->med_open_lock); + + rc = mds_client_add(med, -1); if (rc) GOTO(out_mdc, rc); @@ -319,31 +357,73 @@ out_dec: return rc; } +/* Call with med->med_open_lock held, please. */ +inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med) +{ + struct file *file = mfd->mfd_file; + LASSERT(file->private_data == mfd); + + list_del(&mfd->mfd_list); + mfd->mfd_servercookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(mds_file_cache, mfd); + + return filp_close(file, 0); +} + static int mds_disconnect(struct lustre_handle *conn) { - struct obd_export *exp; + struct obd_export *export = class_conn2export(conn); + struct list_head *tmp, *n; + struct mds_export_data *med = &export->exp_mds_data; int rc; + ENTRY; - exp = class_conn2export(conn); - if (!exp) - RETURN(-EINVAL); + /* + * Close any open files. + */ + spin_lock(&med->med_open_lock); + list_for_each_safe(tmp, n, &med->med_open_head) { + struct mds_file_data *mfd = + list_entry(tmp, struct mds_file_data, mfd_list); + rc = mds_close_mfd(mfd, med); + if (rc) { + /* XXX better diagnostics, with file path and stuff */ + CDEBUG(D_INODE, "Error %d closing mfd %p\n", rc, mfd); + } + } + spin_unlock(&med->med_open_lock); - rc = mds_client_free(&exp->exp_mds_data); - if (rc) - CERROR("error freeing client data: rc = %d\n", rc); + ldlm_cancel_locks_for_export(export); + mds_client_free(export); rc = class_disconnect(conn); if (!rc) MOD_DEC_USE_COUNT; - return rc; + RETURN(rc); +} + +/* + * XXX This is NOT guaranteed to flush all transactions to disk (even though + * it is equivalent to calling sync()) because it only _starts_ the flush + * and does not wait for completion. It's better than nothing though. + * What we really want is a mild form of fsync_dev_lockfs(), but it is + * non-standard, or enabling do_sync_supers in ext3, just for this call. + */ +static void mds_fsync_super(struct super_block *sb) +{ + lock_kernel(); + lock_super(sb); + if (sb->s_dirt && sb->s_op && sb->s_op->write_super) + sb->s_op->write_super(sb); + unlock_super(sb); + unlock_kernel(); } static int mds_getstatus(struct ptlrpc_request *req) { struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; - struct mds_export_data *med = &req->rq_export->exp_mds_data; int rc, size = sizeof(*body); ENTRY; @@ -354,19 +434,19 @@ static int mds_getstatus(struct ptlrpc_request *req) RETURN(0); } - body = lustre_msg_buf(req->rq_reqmsg, 0); - mds_unpack_body(body); + /* Flush any outstanding transactions to disk so the client will + * get the latest last_committed value and can drop their local + * requests if they have any. This would be fsync_super() if it + * was exported. + */ + mds_fsync_super(mds->mds_sb); - /* Anything we need to do here with the client's trans no or so? */ body = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1)); - LASSERT(med->med_mcd); - - /* mcd_last_xid is is stored in little endian on the disk and - mds_pack_rep_body converts it to network order */ - req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid); - mds_pack_rep_body(req); + /* the last_committed and last_xid fields are filled in for all + * replies already - no need to do so here also. + */ RETURN(0); } @@ -392,23 +472,23 @@ static int mds_getlovinfo(struct ptlrpc_request *req) } desc = lustre_msg_buf(req->rq_repmsg, 0); - rc = mds_get_lovdesc(req->rq_obd, desc); - if (rc != 0 ) { + rc = mds_get_lovdesc(mds, desc); + if (rc) { CERROR("mds_get_lovdesc error %d", rc); req->rq_status = rc; RETURN(0); } - tgt_count = NTOH__u32(desc->ld_tgt_count); - if (tgt_count * sizeof(uuid_t) > streq->repbuf) { + tgt_count = le32_to_cpu(desc->ld_tgt_count); + if (tgt_count * sizeof(obd_uuid_t) > streq->repbuf) { CERROR("too many targets, enlarge client buffers\n"); req->rq_status = -ENOSPC; RETURN(0); } - mds->mds_max_mdsize = sizeof(struct lov_mds_md) + + mds->mds_max_mdsize = sizeof(struct lov_mds_md) + tgt_count * sizeof(struct lov_object_id); - rc = mds_get_lovtgts(req->rq_obd, tgt_count, + rc = mds_get_lovtgts(mds, tgt_count, lustre_msg_buf(req->rq_repmsg, 1)); if (rc) { CERROR("get_lovtgts error %d\n", rc); @@ -419,11 +499,17 @@ static int mds_getlovinfo(struct ptlrpc_request *req) } int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) + void *data, __u32 data_len, int flag) { int do_ast; ENTRY; + if (flag == LDLM_CB_CANCELING) { + /* Don't need to do anything here. */ + RETURN(0); + } + + /* XXX layering violation! -phil */ l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; do_ast = (!lock->l_readers && !lock->l_writers); @@ -446,9 +532,9 @@ int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, struct ptlrpc_request *req, - int request_off, int reply_off) + struct mds_body *reqbody, int reply_off) { - struct mds_body *request_body, *body; + struct mds_body *body; struct inode *inode = dentry->d_inode; int rc; ENTRY; @@ -456,30 +542,17 @@ static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, if (inode == NULL) RETURN(-ENOENT); - /* Did the client request the link name? */ - request_body = lustre_msg_buf(req->rq_reqmsg, request_off); body = lustre_msg_buf(req->rq_repmsg, reply_off); - if ((body->valid & OBD_MD_LINKNAME) && S_ISLNK(inode->i_mode)) { - char *tmp = lustre_msg_buf(req->rq_repmsg, reply_off + 1); - - rc = inode->i_op->readlink(dentry, tmp, req->rq_repmsg-> - buflens[reply_off + 1]); - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - RETURN(rc); - } - - body->valid |= OBD_MD_LINKNAME; - } mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); + if (S_ISREG(inode->i_mode)) { - struct lov_mds_md *md; + struct lov_mds_md *lmm; - md = lustre_msg_buf(req->rq_repmsg, reply_off + 1); - md->lmd_easize = mds->mds_max_mdsize; - rc = mds_fs_get_md(mds, inode, md); + lmm = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + lmm->lmm_easize = mds->mds_max_mdsize; + rc = mds_fs_get_md(mds, inode, lmm); if (rc < 0) { if (rc == -ENODATA) @@ -488,6 +561,18 @@ static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, RETURN(rc); } body->valid |= OBD_MD_FLEASIZE; + } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) { + char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + int len = req->rq_repmsg->buflens[reply_off + 1]; + + rc = inode->i_op->readlink(dentry, symname, len); + if (rc < 0) { + CERROR("readlink failed: %d\n", rc); + RETURN(rc); + } else + CDEBUG(D_INODE, "read symlink dest %s\n", symname); + + body->valid |= OBD_MD_LINKNAME; } RETURN(0); } @@ -502,7 +587,8 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) struct inode *dir; struct lustre_handle lockh; char *name; - int namelen, flags, lock_mode, rc = 0, old_offset = offset; + int namelen, flags = 0, lock_mode, rc = 0; + struct obd_ucred uc; __u64 res_id[3] = {0, 0, 0}; ENTRY; @@ -520,7 +606,9 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) if (offset) offset = 1; - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) { LBUG(); @@ -532,11 +620,12 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW; res_id[0] = dir->i_ino; + res_id[1] = dir->i_generation; rc = ldlm_lock_match(obd->obd_namespace, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &lockh); if (rc == 0) { - LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]); + LDLM_DEBUG_NOLOCK("enqueue res "LPU64, res_id[0]); rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id, LDLM_PLAIN, NULL, 0, lock_mode, &flags, ldlm_completion_ast, @@ -557,7 +646,7 @@ static int mds_getattr_name(int offset, struct ptlrpc_request *req) GOTO(out_create_dchild, rc = -ESTALE); } - rc = mds_getattr_internal(mds, dchild, req, old_offset, offset); + rc = mds_getattr_internal(mds, dchild, req, body, offset); EXIT; out_create_dchild: @@ -579,15 +668,18 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) struct dentry *de; struct inode *inode; struct mds_body *body; + struct obd_ucred uc; int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, offset); - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, NULL); if (IS_ERR(de)) { - req->rq_status = -ENOENT; - GOTO(out_pop, rc = -ENOENT); + rc = req->rq_status = -ENOENT; + GOTO(out_pop, PTR_ERR(de)); } inode = de->d_inode; @@ -596,18 +688,26 @@ static int mds_getattr(int offset, struct ptlrpc_request *req) size[1] = mds->mds_max_mdsize; } else if (body->valid & OBD_MD_LINKNAME) { bufcount = 2; - size[1] = inode->i_size; + size[1] = MIN(inode->i_size + 1, body->size); + CDEBUG(D_INODE, "symlink size: %d, reply space: %d\n", + inode->i_size + 1, body->size); + } + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + CERROR("failed GETATTR_PACK test\n"); + req->rq_status = -ENOMEM; + GOTO(out, rc = -ENOMEM); } rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { + if (rc) { CERROR("out of memory or FAIL_MDS_GETATTR_PACK\n"); req->rq_status = rc; - GOTO(out, rc = 0); + GOTO(out, rc); } - req->rq_status = mds_getattr_internal(mds, de, req, offset, 0); + req->rq_status = mds_getattr_internal(mds, de, req, body, 0); out: l_dput(de); @@ -638,50 +738,107 @@ static int mds_statfs(struct ptlrpc_request *req) } osfs = lustre_msg_buf(req->rq_repmsg, 0); memset(osfs, 0, size); - obd_statfs_pack(osfs, &sfs); + statfs_pack(osfs, &sfs); + obd_statfs_pack(osfs, osfs); out: req->rq_status = rc; RETURN(0); } +static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle) +{ + struct mds_file_data *mfd = NULL; + + if (!handle || !handle->addr) + RETURN(NULL); + + mfd = (struct mds_file_data *)(unsigned long)(handle->addr); + if (!kmem_cache_validate(mds_file_cache, mfd)) + RETURN(NULL); + + if (mfd->mfd_servercookie != handle->cookie) + RETURN(NULL); + + return mfd; +} + +static int mds_store_ea(struct mds_obd *mds, struct ptlrpc_request *req, + struct mds_body *body, struct dentry *de, + struct lov_mds_md *lmm) +{ + struct obd_run_ctxt saved; + struct obd_ucred uc; + void *handle; + int rc, rc2; + + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); + handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); + if (!handle) + GOTO(out_ea, rc = -ENOMEM); + + rc = mds_fs_set_md(mds, de->d_inode, handle, lmm); + if (!rc) + rc = mds_update_last_rcvd(mds, handle, req); + + rc2 = mds_fs_commit(mds, de->d_inode, handle); + if (rc2 && !rc) + rc = rc2; +out_ea: + pop_ctxt(&saved); + + return rc; +} + static int mds_open(struct ptlrpc_request *req) { - struct dentry *de; + struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; + struct mds_export_data *med; + struct mds_file_data *mfd; + struct dentry *de; struct file *file; struct vfsmount *mnt; - struct mds_obd *mds = mds_req2mds(req); - struct mds_export_data *med; __u32 flags; struct list_head *tmp; - struct mds_file_data *mfd; int rc, size = sizeof(*body); ENTRY; - rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { - CERROR("mds: out of memory\n"); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) { + CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n"); req->rq_status = -ENOMEM; - RETURN(0); + RETURN(-ENOMEM); + } + + rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) { + CERROR("mds: pack error: rc = %d\n", rc); + req->rq_status = rc; + RETURN(rc); } body = lustre_msg_buf(req->rq_reqmsg, 0); - /* was this animal open already? */ - /* XXX we should only check on re-open, or do a refcount... */ + /* was this animal open already and the client lost the reply? */ + /* XXX need some way to detect a reopen, to avoid locked list walks */ med = &req->rq_export->exp_mds_data; + spin_lock(&med->med_open_lock); list_for_each(tmp, &med->med_open_head) { - struct mds_file_data *fd; - fd = list_entry(tmp, struct mds_file_data, mfd_list); - if (body->extra == fd->mfd_clientfd && - body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) { - CERROR("Re opening %Ld\n", body->fid1.id); - RETURN(0); + mfd = list_entry(tmp, typeof(*mfd), mfd_list); + if (!memcmp(&mfd->mfd_clienthandle, &body->handle, + sizeof(mfd->mfd_clienthandle)) && + body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) { + de = mfd->mfd_file->f_dentry; + spin_unlock(&med->med_open_lock); + CERROR("Re opening "LPD64"\n", body->fid1.id); + GOTO(out_pack, rc = 0); } } + spin_unlock(&med->med_open_lock); - OBD_ALLOC(mfd, sizeof(*mfd)); + mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL); if (!mfd) { CERROR("mds: out of memory\n"); req->rq_status = -ENOMEM; @@ -689,102 +846,87 @@ static int mds_open(struct ptlrpc_request *req) } de = mds_fid2dentry(mds, &body->fid1, &mnt); - if (IS_ERR(de)) { - req->rq_status = -ENOENT; - RETURN(0); - } + if (IS_ERR(de)) + GOTO(out_free, rc = PTR_ERR(de)); /* check if this inode has seen a delayed object creation */ - if (req->rq_reqmsg->bufcount > 1) { - void *handle; - struct lov_mds_md *md; - struct inode *inode = de->d_inode; - int rc; - - md = lustre_msg_buf(req->rq_reqmsg, 1); - - handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR); - if (!handle) { - req->rq_status = -ENOMEM; - RETURN(0); - } + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MDS_OPEN_HAS_EA) { + struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, 1); - /* XXX error handling */ - rc = mds_fs_set_md(mds, inode, handle, md); - if (!rc) { - struct obd_run_ctxt saved; - push_ctxt(&saved, &mds->mds_ctxt); - rc = mds_update_last_rcvd(mds, handle, req); - pop_ctxt(&saved); - } else { - req->rq_status = rc; - RETURN(0); - } - /* FIXME: need to return last_rcvd, last_committed */ - - /* FIXME: keep rc intact */ - rc = mds_fs_commit(mds, de->d_inode, handle); + rc = mds_store_ea(mds, req, body, de, lmm); if (rc) { - req->rq_status = rc; - RETURN(0); + l_dput(de); + mntput(mnt); + GOTO(out_free, rc); } } flags = body->flags; + /* dentry_open does a dput(de) and mntput(mnt) on error */ file = dentry_open(de, mnt, flags & ~O_DIRECT); - if (!file || IS_ERR(file)) { - req->rq_status = -EINVAL; - OBD_FREE(mfd, sizeof(*mfd)); - RETURN(0); + if (IS_ERR(file)) { + rc = PTR_ERR(file); + GOTO(out_free, 0); } file->private_data = mfd; mfd->mfd_file = file; - mfd->mfd_clientfd = body->extra; + memcpy(&mfd->mfd_clienthandle, &body->handle, sizeof(body->handle)); + get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie)); + spin_lock(&med->med_open_lock); list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); +out_pack: body = lustre_msg_buf(req->rq_repmsg, 0); - /* FIXME: need to have cookies involved here */ - body->extra = (__u64) (unsigned long)file; + mds_pack_inode2fid(&body->fid1, de->d_inode); + mds_pack_inode2body(body, de->d_inode); + body->handle.addr = (__u64)(unsigned long)mfd; + body->handle.cookie = mfd->mfd_servercookie; + CDEBUG(D_INODE, "llite file "LPX64": addr %p, cookie "LPX64"\n", + mfd->mfd_clienthandle.addr, mfd, mfd->mfd_servercookie); + RETURN(0); + +out_free: + mfd->mfd_servercookie = DEAD_HANDLE_MAGIC; + kmem_cache_free(mds_file_cache, mfd); + req->rq_status = rc; RETURN(0); } static int mds_close(struct ptlrpc_request *req) { - struct dentry *de; + struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_body *body; - struct file *file; - struct mds_obd *mds = mds_req2mds(req); - struct vfsmount *mnt; struct mds_file_data *mfd; int rc; ENTRY; - rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { - CERROR("mds: out of memory\n"); - req->rq_status = -ENOMEM; - RETURN(0); - } - body = lustre_msg_buf(req->rq_reqmsg, 0); - de = mds_fid2dentry(mds, &body->fid1, &mnt); - if (IS_ERR(de)) { - req->rq_status = -ENOENT; - RETURN(0); + + mfd = mds_handle2mfd(&body->handle); + if (!mfd) { + CERROR("no handle for file close "LPD64 + ": addr "LPX64", cookie "LPX64"\n", + body->fid1.id, body->handle.addr, body->handle.cookie); + RETURN(-ESTALE); } - /* FIXME: need to have cookies involved here */ - file = (struct file *)(unsigned long)body->extra; - if (!file->f_dentry) - LBUG(); - mfd = (struct mds_file_data *)file->private_data; - list_del(&mfd->mfd_list); - OBD_FREE(mfd, sizeof(*mfd)); + spin_lock(&med->med_open_lock); + req->rq_status = mds_close_mfd(mfd, med); + spin_unlock(&med->med_open_lock); - req->rq_status = filp_close(file, 0); - l_dput(de); - mntput(mnt); + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) { + CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n"); + req->rq_status = -ENOMEM; + RETURN(-ENOMEM); + } + + rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg); + if (rc) { + CERROR("mds: lustre_pack_msg: rc = %d\n", rc); + req->rq_status = rc; + } RETURN(0); } @@ -795,9 +937,10 @@ static int mds_readpage(struct ptlrpc_request *req) struct vfsmount *mnt; struct dentry *de; struct file *file; - struct mds_body *body; + struct mds_body *body, *repbody; struct obd_run_ctxt saved; int rc, size = sizeof(*body); + struct obd_ucred uc; ENTRY; rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg); @@ -807,7 +950,9 @@ static int mds_readpage(struct ptlrpc_request *req) } body = lustre_msg_buf(req->rq_reqmsg, 0); - push_ctxt(&saved, &mds->mds_ctxt); + uc.ouc_fsuid = body->fsuid; + uc.ouc_fsgid = body->fsgid; + push_ctxt(&saved, &mds->mds_ctxt, &uc); de = mds_fid2dentry(mds, &body->fid1, &mnt); if (IS_ERR(de)) GOTO(out_pop, rc = PTR_ERR(de)); @@ -819,9 +964,14 @@ static int mds_readpage(struct ptlrpc_request *req) if (IS_ERR(file)) GOTO(out_pop, rc = PTR_ERR(file)); + repbody = lustre_msg_buf(req->rq_repmsg, 0); + repbody->size = file->f_dentry->d_inode->i_size; + repbody->valid = OBD_MD_FLSIZE; + /* to make this asynchronous make sure that the handling function doesn't send a reply when this function completes. Instead a callback function would send the reply */ + /* note: in case of an error, dentry_open puts dentry */ rc = mds_sendpage(req, file, body->size); filp_close(file, 0); @@ -859,17 +1009,10 @@ int mds_handle(struct ptlrpc_request *req) GOTO(out, rc); } - if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) { - CERROR("lustre_mds: wrong packet type sent %d\n", - req->rq_reqmsg->type); - GOTO(out, rc = -EINVAL); - } - if (req->rq_reqmsg->opc != MDS_CONNECT && req->rq_export == NULL) GOTO(out, rc = -ENOTCONN); - if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0) - GOTO(out, rc = -EINVAL); + LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME)); switch (req->rq_reqmsg->opc) { case MDS_CONNECT: @@ -951,7 +1094,6 @@ int mds_handle(struct ptlrpc_request *req) if (rc) break; RETURN(0); - case LDLM_CONVERT: CDEBUG(D_INODE, "convert\n"); OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0); @@ -959,14 +1101,6 @@ int mds_handle(struct ptlrpc_request *req) if (rc) break; RETURN(0); - - case LDLM_CANCEL: - CDEBUG(D_INODE, "cancel\n"); - OBD_FAIL_RETURN(OBD_FAIL_LDLM_CANCEL, 0); - rc = ldlm_handle_cancel(req); - if (rc) - break; - RETURN(0); case LDLM_BL_CALLBACK: case LDLM_CP_CALLBACK: CDEBUG(D_INODE, "callback\n"); @@ -982,23 +1116,22 @@ int mds_handle(struct ptlrpc_request *req) EXIT; if (!rc) { + struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_obd *mds = mds_req2mds(req); - req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd); + + req->rq_repmsg->last_xid = + HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid)); req->rq_repmsg->last_committed = HTON__u64(mds->mds_last_committed); - CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n", + CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n", (unsigned long long)mds->mds_last_rcvd, (unsigned long long)mds->mds_last_committed, cpu_to_le32(req->rq_xid)); } out: - /* Still not 100% sure whether we should reply with the server - * last_rcvd or that of this client. I'm not sure it even makes - * a difference on a per-client basis, because last_rcvd is global - * and we are not supposed to allow transactions while in recovery. - */ if (rc) { - CERROR("mds: processing error %d\n", rc); + CERROR("mds: processing error (opcode %d): %d\n", + req->rq_reqmsg->opc, rc); ptlrpc_error(req->rq_svc, req); } else { CDEBUG(D_NET, "sending reply\n"); @@ -1013,6 +1146,8 @@ int mds_handle(struct ptlrpc_request *req) * This will alert us that we may need to do client recovery. * * Assumes we are already in the server filesystem context. + * + * Also assumes for mds_last_rcvd that we are not modifying it (no locking). */ static int mds_update_server_data(struct mds_obd *mds) @@ -1035,7 +1170,11 @@ int mds_update_server_data(struct mds_obd *mds) RETURN(-EIO); RETURN(rc); } +#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) rc = fsync_dev(filp->f_dentry->d_inode->i_rdev); +#else + rc = file_fsync(filp, filp->f_dentry, 1); +#endif if (rc) CERROR("error flushing MDS server data: rc = %d\n", rc); @@ -1051,18 +1190,16 @@ static int mds_recover(struct obd_device *obddev) /* This happens at the end when recovery is complete */ ++mds->mds_mount_count; - push_ctxt(&saved, &mds->mds_ctxt); + push_ctxt(&saved, &mds->mds_ctxt, NULL); rc = mds_update_server_data(mds); pop_ctxt(&saved); return rc; } -#define MDS_NUM_THREADS 8 /* mount the file system (secretly) */ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) { - int i; struct obd_ioctl_data* data = buf; struct mds_obd *mds = &obddev->u.mds; struct vfsmount *mnt; @@ -1085,10 +1222,12 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_kfree, rc); } + CERROR("%s: mnt is %p\n", data->ioc_inlbuf1, mnt); mds->mds_sb = mnt->mnt_root->d_inode->i_sb; if (!mds->mds_sb) GOTO(err_put, rc = -ENODEV); + spin_lock_init(&mds->mds_last_lock); mds->mds_max_mdsize = sizeof(struct lov_mds_md); rc = mds_fs_setup(obddev, mnt); if (rc) { @@ -1096,41 +1235,23 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf) GOTO(err_put, rc); } - mds->mds_service = ptlrpc_init_svc(64 * 1024, MDS_REQUEST_PORTAL, - MDC_REPLY_PORTAL, "self",mds_handle, - "mds"); - if (!mds->mds_service) { - CERROR("failed to start service\n"); - GOTO(err_fs, rc = -EINVAL); - } - obddev->obd_namespace = ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER); if (obddev->obd_namespace == NULL) { mds_cleanup(obddev); - GOTO(err_svc, rc = -ENOMEM); + GOTO(err_fs, rc = -ENOMEM); } - for (i = 0; i < MDS_NUM_THREADS; i++) { - char name[32]; - sprintf(name, "lustre_MDS_%02d", i); - rc = ptlrpc_start_thread(obddev, mds->mds_service, name); - if (rc) { - CERROR("cannot start MDS thread #%d: rc %d\n", i, rc); - GOTO(err_thread, rc); - } - } rc = mds_recover(obddev); if (rc) - GOTO(err_thread, rc); + GOTO(err_fs, rc); + + ptlrpc_init_client(LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, + "mds_ldlm_client", &obddev->obd_ldlm_client); RETURN(0); -err_thread: - ptlrpc_stop_all_threads(mds->mds_service); -err_svc: - ptlrpc_unregister_service(mds->mds_service); err_fs: mds_fs_cleanup(obddev); err_put: @@ -1142,7 +1263,7 @@ err_kfree: kfree(mds->mds_fstype); err_dec: MOD_DEC_USE_COUNT; - return rc; + RETURN(rc); } static int mds_cleanup(struct obd_device *obddev) @@ -1152,14 +1273,11 @@ static int mds_cleanup(struct obd_device *obddev) struct obd_run_ctxt saved; ENTRY; - ptlrpc_stop_all_threads(mds->mds_service); - ptlrpc_unregister_service(mds->mds_service); - sb = mds->mds_sb; if (!mds->mds_sb) RETURN(0); - push_ctxt(&saved, &mds->mds_ctxt); + push_ctxt(&saved, &mds->mds_ctxt, NULL); mds_update_server_data(mds); if (mds->mds_rcvd_filp) { @@ -1189,7 +1307,7 @@ static int mds_cleanup(struct obd_device *obddev) } static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, - ldlm_mode_t mode, void *data) + ldlm_mode_t mode, int flags, void *data) { struct ptlrpc_request *req = req_cookie; int rc = 0; @@ -1235,10 +1353,10 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, } break; case IT_CREAT: - case IT_LINK: case IT_MKDIR: case IT_MKNOD: case IT_RENAME2: + case IT_LINK2: case IT_RMDIR: case IT_SYMLINK: case IT_UNLINK: @@ -1256,6 +1374,7 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, case IT_READDIR: case IT_READLINK: case IT_RENAME: + case IT_LINK: case IT_SETATTR: rc = mds_getattr_name(2, req); /* FIXME: we need to sit down and decide on who should @@ -1274,8 +1393,16 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, LBUG(); } - if (it->opc == IT_UNLINK || it->opc == IT_RMDIR || - it->opc == IT_RENAME || it->opc == IT_RENAME2) + /* We don't bother returning a lock to the client for a file + * or directory we are removing. + * + * As for link and rename, there is no reason for the client + * to get a lock on the target at this point. If they are + * going to modify the file/directory later they will get a + * lock at that time. + */ + if (it->opc & (IT_UNLINK | IT_RMDIR | IT_LINK | IT_LINK2 | + IT_RENAME | IT_RENAME2)) RETURN(ELDLM_LOCK_ABORTED); rep->lock_policy_res2 = req->rq_status; @@ -1284,18 +1411,23 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, /* If the client is about to open a file that doesn't have an MD * stripe record, it's going to need a write lock. */ if (it->opc & IT_OPEN) { - struct lov_mds_md *md = + struct lov_mds_md *lmm = lustre_msg_buf(req->rq_repmsg, 2); - if (md->lmd_easize == 0) { + if (lmm->lmm_easize == 0) { LDLM_DEBUG(lock, "open with no EA; returning PW" " lock"); lock->l_req_mode = LCK_PW; } } + if (flags & LDLM_FL_INTENT_ONLY) { + LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock"); + RETURN(ELDLM_LOCK_ABORTED); + } /* Give the client a lock on the child object, instead of the * parent that it requested. */ new_resid[0] = NTOH__u32(mds_rep->ino); + new_resid[1] = NTOH__u32(mds_rep->generation); if (new_resid[0] == 0) LBUG(); old_res = lock->l_resource->lr_name[0]; @@ -1320,12 +1452,82 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, RETURN(rc); } +int mds_attach(struct obd_device *dev, + obd_count len, void *data) +{ + int rc; + rc = lprocfs_reg_obd(dev, (lprocfs_vars_t*)status_var_nm_1, (void*)dev); + return rc; +} + +int mds_detach(struct obd_device *dev) +{ + int rc; + rc = lprocfs_dereg_obd(dev); + return rc; + +} + +#define MDT_NUM_THREADS 8 +static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf) +{ + int i; + // struct obd_ioctl_data* data = buf; + struct mds_obd *mds = &obddev->u.mds; + int rc = 0; + ENTRY; + + MOD_INC_USE_COUNT; + + mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS, + MDS_BUFSIZE, MDS_MAXREQSIZE, + MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, + "self", mds_handle, "mds"); + if (!mds->mds_service) { + CERROR("failed to start service\n"); + GOTO(err_dec, rc = -EINVAL); + } + + for (i = 0; i < MDT_NUM_THREADS; i++) { + char name[32]; + sprintf(name, "lustre_MDT_%02d", i); + rc = ptlrpc_start_thread(obddev, mds->mds_service, name); + if (rc) { + CERROR("cannot start MDT thread #%d: rc %d\n", i, rc); + GOTO(err_thread, rc); + } + } + + RETURN(0); + +err_thread: + ptlrpc_stop_all_threads(mds->mds_service); + ptlrpc_unregister_service(mds->mds_service); +err_dec: + MOD_DEC_USE_COUNT; + RETURN(rc); +} + + +static int mdt_cleanup(struct obd_device *obddev) +{ + struct mds_obd *mds = &obddev->u.mds; + ENTRY; + + ptlrpc_stop_all_threads(mds->mds_service); + ptlrpc_unregister_service(mds->mds_service); + + MOD_DEC_USE_COUNT; + RETURN(0); +} extern int mds_iocontrol(long cmd, struct lustre_handle *conn, int len, void *karg, void *uarg); /* use obd ops to offer management infrastructure */ static struct obd_ops mds_obd_ops = { + o_attach: mds_attach, + o_detach: mds_detach, o_connect: mds_connect, o_disconnect: mds_disconnect, o_setup: mds_setup, @@ -1333,17 +1535,41 @@ static struct obd_ops mds_obd_ops = { o_iocontrol: mds_iocontrol }; +static struct obd_ops mdt_obd_ops = { + o_setup: mdt_setup, + o_cleanup: mdt_cleanup, +}; + + static int __init mds_init(void) { - class_register_type(&mds_obd_ops, LUSTRE_MDS_NAME); + + mds_file_cache = kmem_cache_create("ll_mds_file_data", + sizeof(struct mds_file_data), + 0, 0, NULL, NULL); + if (mds_file_cache == NULL) + return -ENOMEM; + + class_register_type(&mds_obd_ops, (lprocfs_vars_t*)status_class_var, + LUSTRE_MDS_NAME); + class_register_type(&mdt_obd_ops, 0, LUSTRE_MDT_NAME); + ldlm_register_intent(ldlm_intent_policy); + return 0; + } static void __exit mds_exit(void) { + + ldlm_unregister_intent(); class_unregister_type(LUSTRE_MDS_NAME); + class_unregister_type(LUSTRE_MDT_NAME); + if (kmem_cache_destroy(mds_file_cache)) + CERROR("couldn't free MDS file cache\n"); + } MODULE_AUTHOR("Cluster File Systems ");