From: pravins Date: Mon, 19 May 2008 14:29:49 +0000 (+0000) Subject: b=14230 X-Git-Tag: v1_9_50~480 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=69a3513021212ed1eb8823a50f80853e22e607b3 b=14230 i=alex.zhuravlev i=h.huang following patch removes 1.6 dead code from head branch and moves some function to respective layers. diffstat: b/lustre/include/lustre_mds.h | 70 - b/lustre/mdd/mdd_lov.c | 180 ++ b/lustre/mds/Makefile.in | 3 b/lustre/mds/handler.c | 2571 ------------------------------------------ b/lustre/mds/mds_fs.c | 654 ---------- b/lustre/mds/mds_internal.h | 209 --- b/lustre/mds/mds_log.c | 84 - b/lustre/mds/mds_lov.c | 295 ---- b/lustre/mdt/mdt_handler.c | 116 + b/lustre/mdt/mdt_recovery.c | 53 lustre/mds/commit_confd.c | 98 - lustre/mds/mds_join.c | 508 -------- lustre/mds/mds_lib.c | 488 ------- lustre/mds/mds_open.c | 1533 ------------------------- lustre/mds/mds_reint.c | 2419 --------------------------------------- lustre/mds/mds_unlink_open.c | 287 ---- lustre/mds/mds_xattr.c | 358 ----- 17 files changed, 369 insertions(+), 9557 deletions(-) --- diff --git a/lustre/include/lustre_mds.h b/lustre/include/lustre_mds.h index 5655d5b..7f9f24d 100644 --- a/lustre/include/lustre_mds.h +++ b/lustre/include/lustre_mds.h @@ -28,91 +28,21 @@ #error Unsupported operating system. #endif -struct ldlm_lock_desc; -struct mds_obd; -struct ptlrpc_connection; -struct ptlrpc_client; -struct obd_export; -struct ptlrpc_request; -struct obd_device; -struct ll_file_data; - -struct mds_update_record { - __u32 ur_opcode; - struct ll_fid *ur_fid1; - struct ll_fid *ur_fid2; - int ur_namelen; - char *ur_name; - int ur_tgtlen; - char *ur_tgt; - int ur_eadatalen; - void *ur_eadata; - int ur_cookielen; - struct llog_cookie *ur_logcookies; - struct iattr ur_iattr; - struct lvfs_ucred ur_uc; - __u64 ur_rdev; - __u64 ur_time; - __u32 ur_mode; - __u32 ur_flags; - struct lvfs_grp_hash_entry *ur_grp_entry; - struct ldlm_request *ur_dlm; -}; - -/* file data for open files on MDS */ -struct mds_file_data { - struct portals_handle mfd_handle; /* must be first */ - atomic_t mfd_refcount; - struct list_head mfd_list; /* protected by med_open_lock */ - __u64 mfd_xid; - int mfd_mode; - struct dentry *mfd_dentry; -}; - struct mds_group_info { struct obd_uuid *uuid; int group; }; /* mds/mds_reint.c */ -struct inode; - -int mds_reint_rec(struct mds_update_record *r, int offset, - struct ptlrpc_request *req, struct lustre_handle *); - -int mds_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, __u64 id, __u32 gen, - struct obd_capa *oc); - -int mds_log_op_unlink(struct obd_device *obd, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size); -int mds_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size); - int mds_lov_write_objids(struct obd_device *obd); void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm); -/* ioctls for trying requests */ -#define IOC_REQUEST_TYPE 'f' -#define IOC_REQUEST_MIN_NR 30 - -#define IOC_REQUEST_GETATTR _IOWR('f', 30, long) -#define IOC_REQUEST_READPAGE _IOWR('f', 31, long) -#define IOC_REQUEST_SETATTR _IOWR('f', 32, long) -#define IOC_REQUEST_CREATE _IOWR('f', 33, long) -#define IOC_REQUEST_OPEN _IOWR('f', 34, long) -#define IOC_REQUEST_CLOSE _IOWR('f', 35, long) -#define IOC_REQUEST_MAX_NR 35 #define MDS_LOV_MD_NAME "trusted.lov" #define MDS_LMV_MD_NAME "trusted.lmv" #define MDD_OBD_NAME "mdd_obd" #define MDD_OBD_UUID "mdd_obd_uuid" #define MDD_OBD_TYPE "mds" -#define MDD_OBD_PROFILE "lustre-MDT0000" static inline int md_should_create(__u32 flags) { diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index cb7a68b..19d1bcd 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -342,9 +342,36 @@ static obd_id mdd_lov_create_id(const struct lu_fid *fid) return fid_flatten(fid); } +static void mdd_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm) +{ + struct mds_obd *mds = &obd->u.mds; + int j; + ENTRY; + + /* if we create file without objects - lmm is NULL */ + if (lmm == NULL) + return; + + for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) { + int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx); + obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id); + int page = i / OBJID_PER_PAGE(); + int idx = i % OBJID_PER_PAGE(); + obd_id *data = mds->mds_lov_page_array[page]; + + CDEBUG(D_INODE,"update last object for ost %d - new %llu" + " old %llu\n", i, id, data[idx]); + if (id > data[idx]) { + data[idx] = id; + cfs_bitmap_set(mds->mds_lov_page_dirty, page); + } + } + EXIT; +} + void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm) { - mds_lov_update_objids(mdd->mdd_obd_dev, lmm); + mdd_lov_update_objids(mdd->mdd_obd_dev, lmm); } void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd, @@ -612,6 +639,43 @@ int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd, RETURN(rc); } +int mdd_log_op_unlink(struct obd_device *obd, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, int cookies_size) +{ + struct mds_obd *mds = &obd->u.mds; + struct lov_stripe_md *lsm = NULL; + struct llog_unlink_rec *lur; + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + if (IS_ERR(mds->mds_osc_obd)) + RETURN(PTR_ERR(mds->mds_osc_obd)); + + rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size); + if (rc < 0) + RETURN(rc); + rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm); + if (rc) + GOTO(out, rc); + /* first prepare unlink log record */ + OBD_ALLOC(lur, sizeof(*lur)); + if (!lur) + GOTO(out, rc = -ENOMEM); + lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur); + lur->lur_hdr.lrh_type = MDS_UNLINK_REC; + + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies, + cookies_size / sizeof(struct llog_cookie)); + llog_ctxt_put(ctxt); + + OBD_FREE(lur, sizeof(*lur)); +out: + obd_free_memmd(mds->mds_osc_exp, &lsm); + RETURN(rc); +} int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, struct mdd_object *mdd_cobj, struct md_attr *ma) @@ -621,13 +685,58 @@ int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd, LASSERT(ma->ma_valid & MA_LOV); if ((ma->ma_cookie_size > 0) && - (mds_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size, + (mdd_log_op_unlink(obd, ma->ma_lmm, ma->ma_lmm_size, ma->ma_cookie, ma->ma_cookie_size) > 0)) { ma->ma_valid |= MA_COOKIE; } return 0; } +int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, int cookies_size) +{ + struct mds_obd *mds = &obd->u.mds; + struct lov_stripe_md *lsm = NULL; + struct llog_setattr_rec *lsr; + struct llog_ctxt *ctxt; + int rc; + ENTRY; + + if (IS_ERR(mds->mds_osc_obd)) + RETURN(PTR_ERR(mds->mds_osc_obd)); + + rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size); + if (rc < 0) + RETURN(rc); + + rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm); + if (rc) + GOTO(out, rc); + + OBD_ALLOC(lsr, sizeof(*lsr)); + if (!lsr) + GOTO(out, rc = -ENOMEM); + + /* prepare setattr log record */ + lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr); + lsr->lsr_hdr.lrh_type = MDS_SETATTR_REC; + lsr->lsr_uid = uid; + lsr->lsr_gid = gid; + + /* write setattr log */ + ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); + rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies, + cookies_size / sizeof(struct llog_cookie)); + + llog_ctxt_put(ctxt); + + OBD_FREE(lsr, sizeof(*lsr)); + out: + obd_free_memmd(mds->mds_osc_exp, &lsm); + RETURN(rc); +} + int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, const struct md_attr *ma, struct lov_mds_md *lmm, int lmm_size, @@ -640,7 +749,7 @@ int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n", (unsigned long)ma->ma_attr.la_uid, (unsigned long)ma->ma_attr.la_gid); - return mds_log_op_setattr(obd, ma->ma_attr.la_uid, + return mdd_log_op_setattr(obd, ma->ma_attr.la_uid, ma->ma_attr.la_gid, lmm, lmm_size, logcookies, cookies_size); @@ -648,6 +757,68 @@ int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd, return 0; } +static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid, + struct lov_mds_md *lmm, int lmm_size, + struct llog_cookie *logcookies, __u64 id, __u32 gen, + struct obd_capa *oc) +{ + struct mds_obd *mds = &obd->u.mds; + struct obd_trans_info oti = { 0 }; + struct obd_info oinfo = { { { 0 } } }; + int rc; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR)) + RETURN(0); + + /* first get memory EA */ + OBDO_ALLOC(oinfo.oi_oa); + if (!oinfo.oi_oa) + RETURN(-ENOMEM); + + LASSERT(lmm); + + rc = obd_unpackmd(mds->mds_osc_exp, &oinfo.oi_md, lmm, lmm_size); + if (rc < 0) { + CERROR("Error unpack md %p for inode "LPU64"\n", lmm, id); + GOTO(out, rc); + } + + rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, oinfo.oi_md); + if (rc) { + CERROR("Error revalidate lsm %p \n", oinfo.oi_md); + GOTO(out, rc); + } + + /* then fill oa */ + oinfo.oi_oa->o_uid = uid; + oinfo.oi_oa->o_gid = gid; + oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id; + oinfo.oi_oa->o_gr = oinfo.oi_md->lsm_object_gr; + oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP | + OBD_MD_FLUID | OBD_MD_FLGID; + if (logcookies) { + oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE; + oti.oti_logcookies = logcookies; + } + + oinfo.oi_oa->o_fid = id; + oinfo.oi_oa->o_generation = gen; + oinfo.oi_oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER; + oinfo.oi_capa = oc; + + /* do async setattr from mds to ost not waiting for responses. */ + rc = obd_setattr_async(mds->mds_osc_exp, &oinfo, &oti, NULL); + if (rc) + CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64 + " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc); +out: + if (oinfo.oi_md) + obd_free_memmd(mds->mds_osc_exp, &oinfo.oi_md); + OBDO_FREE(oinfo.oi_oa); + RETURN(rc); +} + int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, struct lov_mds_md *lmm, int lmm_size, struct llog_cookie *logcookies) @@ -670,7 +841,7 @@ int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, if (IS_ERR(oc)) oc = NULL; - rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, + rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm, lmm_size, logcookies, fid_seq(fid), fid_oid(fid), oc); @@ -678,4 +849,3 @@ int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj, RETURN(rc); } - diff --git a/lustre/mds/Makefile.in b/lustre/mds/Makefile.in index 96d7ec7..a6400b8 100644 --- a/lustre/mds/Makefile.in +++ b/lustre/mds/Makefile.in @@ -1,5 +1,4 @@ MODULES := mds -mds-objs := mds_log.o mds_unlink_open.o mds_lov.o handler.o mds_reint.o -mds-objs += mds_fs.o lproc_mds.o mds_open.o mds_lib.o mds_xattr.o mds_join.o +mds-objs := handler.o lproc_mds.o mds_fs.o mds_log.o mds_lov.o @INCLUDE_RULES@ diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 9c33a98..9621687 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -29,9 +29,6 @@ * license text for more details. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -58,143 +55,10 @@ #include "mds_internal.h" -int mds_num_threads; -CFS_MODULE_PARM(mds_num_threads, "i", int, 0444, - "number of MDS service threads to start"); - __u32 mds_max_ost_index=0xFFFF; CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444, "maximal OST index"); -static int mds_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data); -static int mds_postsetup(struct obd_device *obd); -static int mds_cleanup(struct obd_device *obd); - -/* Assumes caller has already pushed into the kernel filesystem context */ -static int mds_sendpage(struct ptlrpc_request *req, struct file *file, - loff_t offset, int count) -{ - struct ptlrpc_bulk_desc *desc; - struct l_wait_info lwi; - struct page **pages; - int rc = 0, npages, i, tmpcount, tmpsize = 0; - ENTRY; - - LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */ - - npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; - OBD_ALLOC(pages, sizeof(*pages) * npages); - if (!pages) - GOTO(out, rc = -ENOMEM); - - desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, - MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out_free, rc = -ENOMEM); - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; - - OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD); - if (pages[i] == NULL) - GOTO(cleanup_buf, rc = -ENOMEM); - - ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize); - } - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; - CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", - tmpsize, offset, file->f_dentry->d_inode->i_ino, - i_size_read(file->f_dentry->d_inode)); - - rc = fsfilt_readpage(req->rq_export->exp_obd, file, - kmap(pages[i]), tmpsize, &offset); - kunmap(pages[i]); - - if (rc != tmpsize) - GOTO(cleanup_buf, rc = -EIO); - } - - LASSERT(desc->bd_nob == count); - - rc = ptlrpc_start_bulk_transfer(desc); - if (rc) - GOTO(cleanup_buf, rc); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { - CERROR("obd_fail_loc=%x, fail operation rc=%d\n", - OBD_FAIL_MDS_SENDPAGE, rc); - GOTO(abort_bulk, rc); - } - - lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); - LASSERT (rc == 0 || rc == -ETIMEDOUT); - - if (rc == 0) { - if (desc->bd_success && - desc->bd_nob_transferred == count) - GOTO(cleanup_buf, rc); - - rc = -ETIMEDOUT; /* XXX should this be a different errno? */ - } - - DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s", - (rc == -ETIMEDOUT) ? "timeout" : "network error", - desc->bd_nob_transferred, count, - req->rq_export->exp_client_uuid.uuid, - req->rq_export->exp_connection->c_remote_uuid.uuid); - - class_fail_export(req->rq_export); - - EXIT; - abort_bulk: - ptlrpc_abort_bulk (desc); - cleanup_buf: - for (i = 0; i < npages; i++) - if (pages[i]) - OBD_PAGE_FREE(pages[i]); - - ptlrpc_free_bulk(desc); - out_free: - OBD_FREE(pages, sizeof(*pages) * npages); - out: - return rc; -} - -/* only valid locked dentries or errors should be returned */ -struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, - struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, - __u64 lockpart) -{ - struct mds_obd *mds = &obd->u.mds; - struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; - struct ldlm_res_id res_id = { .name = {0} }; - int flags = LDLM_FL_ATOMIC_CB, rc; - ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; - ENTRY; - - if (IS_ERR(de)) - RETURN(de); - - res_id.name[0] = de->d_inode->i_ino; - res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, - LDLM_IBITS, &policy, lock_mode, &flags, - ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, lockh); - if (rc != ELDLM_OK) { - l_dput(de); - retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ - } - - RETURN(retval); -} - /* Look up an entry by inode number. */ /* this function ONLY returns valid dget'd dentries with an initialized inode or errors */ @@ -232,1692 +96,28 @@ struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, inode->i_generation,(unsigned long)inode->i_nlink, atomic_read(&inode->i_count)); dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (generation && inode->i_generation != generation) { - /* we didn't find the right inode.. */ - CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " - "count: %d, generation %u/%u\n", inode->i_ino, - (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation); - dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (mnt) { - *mnt = mds->mds_vfsmnt; - mntget(*mnt); - } - - RETURN(result); -} - -static int mds_connect_internal(struct obd_export *exp, - struct obd_connect_data *data) -{ - struct obd_device *obd = exp->exp_obd; - if (data != NULL) { - data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED; - data->ocd_ibits_known &= MDS_INODELOCK_FULL; - - /* If no known bits (which should not happen, probably, - as everybody should support LOOKUP and UPDATE bits at least) - revert to compat mode with plain locks. */ - if (!data->ocd_ibits_known && - data->ocd_connect_flags & OBD_CONNECT_IBITS) - data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; - - if (!obd->u.mds.mds_fl_acl) - data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - - if (!obd->u.mds.mds_fl_user_xattr) - data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; - - exp->exp_connect_flags = data->ocd_connect_flags; - data->ocd_version = LUSTRE_VERSION_CODE; - exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; - } - - if (obd->u.mds.mds_fl_acl && - ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { - CWARN("%s: MDS requires ACL support but client does not\n", - obd->obd_name); - return -EBADE; - } - return 0; -} - -static int mds_reconnect(const struct lu_env *env, - struct obd_export *exp, struct obd_device *obd, - struct obd_uuid *cluuid, - struct obd_connect_data *data) -{ - int rc; - ENTRY; - - if (exp == NULL || obd == NULL || cluuid == NULL) - RETURN(-EINVAL); - - rc = mds_connect_internal(exp, data); - - RETURN(rc); -} - -/* Establish a connection to the MDS. - * - * This will set up an export structure for the client to hold state data - * about that client, like open files, the last operation number it did - * on the server, etc. - */ -static int mds_connect(const struct lu_env *env, - struct lustre_handle *conn, struct obd_device *obd, - struct obd_uuid *cluuid, struct obd_connect_data *data, - void *localdata) -{ - struct obd_export *exp; - struct mds_export_data *med; - struct mds_client_data *mcd = NULL; - int rc; - ENTRY; - - if (!conn || !obd || !cluuid) - RETURN(-EINVAL); - - /* XXX There is a small race between checking the list and adding a - * new connection for the same UUID, but the real threat (list - * corruption when multiple different clients connect) is solved. - * - * There is a second race between adding the export to the list, - * and filling in the client data below. Hence skipping the case - * of NULL mcd above. We should already be controlling multiple - * connects at the client, and we can't hold the spinlock over - * memory allocations without risk of deadlocking. - */ - rc = class_connect(conn, obd, cluuid); - if (rc) - RETURN(rc); - exp = class_conn2export(conn); - LASSERT(exp); - med = &exp->exp_mds_data; - - exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL; - - rc = mds_connect_internal(exp, data); - if (rc) - GOTO(out, rc); - - OBD_ALLOC(mcd, sizeof(*mcd)); - if (!mcd) - GOTO(out, rc = -ENOMEM); - - memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); - med->med_mcd = mcd; - - rc = mds_client_add(obd, exp, -1, localdata); - GOTO(out, rc); - -out: - if (rc) { - if (mcd) { - OBD_FREE(mcd, sizeof(*mcd)); - med->med_mcd = NULL; - } - class_disconnect(exp); - } else { - class_export_put(exp); - } - - RETURN(rc); -} - -int mds_init_export(struct obd_export *exp) -{ - struct mds_export_data *med = &exp->exp_mds_data; - - CFS_INIT_LIST_HEAD(&med->med_open_head); - spin_lock_init(&med->med_open_lock); - - spin_lock(&exp->exp_lock); - exp->exp_connecting = 1; - spin_unlock(&exp->exp_lock); - - RETURN(0); -} - -static int mds_destroy_export(struct obd_export *export) -{ - struct mds_export_data *med; - struct obd_device *obd = export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct lvfs_run_ctxt saved; - struct lov_mds_md *lmm; - struct llog_cookie *logcookies; - int rc = 0; - ENTRY; - - med = &export->exp_mds_data; - target_destroy_export(export); - - if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) - RETURN(0); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - /* Close any open files (which may also cause orphan unlinking). */ - - OBD_ALLOC(lmm, mds->mds_max_mdsize); - if (lmm == NULL) { - CWARN("%s: allocation failure during cleanup; can not force " - "close file handles on this service.\n", obd->obd_name); - GOTO(out, rc = -ENOMEM); - } - - OBD_ALLOC(logcookies, mds->mds_max_cookiesize); - if (logcookies == NULL) { - CWARN("%s: allocation failure during cleanup; can not force " - "close file handles on this service.\n", obd->obd_name); - OBD_FREE(lmm, mds->mds_max_mdsize); - GOTO(out_lmm, rc = -ENOMEM); - } - - spin_lock(&med->med_open_lock); - while (!list_empty(&med->med_open_head)) { - struct list_head *tmp = med->med_open_head.next; - struct mds_file_data *mfd = - list_entry(tmp, struct mds_file_data, mfd_list); - int lmm_size = mds->mds_max_mdsize; - umode_t mode = mfd->mfd_dentry->d_inode->i_mode; - __u64 valid = 0; - - /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ - mds_mfd_unlink(mfd, 0); - spin_unlock(&med->med_open_lock); - - /* If you change this message, be sure to update - * replay_single:test_46 */ - CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for " - "%.*s (ino %lu)\n", obd->obd_name, - mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name, - mfd->mfd_dentry->d_inode->i_ino); - - rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1); - if (rc < 0) - CWARN("mds_get_md failure, rc=%d\n", rc); - else - valid |= OBD_MD_FLEASIZE; - - /* child orphan sem protects orphan_dec_test and - * is_orphan race, mds_mfd_close drops it */ - MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode); - rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd, - !(export->exp_flags & OBD_OPT_FAILOVER), - lmm, lmm_size, logcookies, - mds->mds_max_cookiesize, - &valid); - - if (rc) - CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc); - - if (valid & OBD_MD_FLCOOKIE) { - rc = mds_osc_destroy_orphan(obd, mode, lmm, - lmm_size, logcookies, 1); - if (rc < 0) { - CDEBUG(D_INODE, "%s: destroy of orphan failed," - " rc = %d\n", obd->obd_name, rc); - rc = 0; - } - valid &= ~OBD_MD_FLCOOKIE; - } - - spin_lock(&med->med_open_lock); - } - spin_unlock(&med->med_open_lock); - - OBD_FREE(logcookies, mds->mds_max_cookiesize); -out_lmm: - OBD_FREE(lmm, mds->mds_max_mdsize); -out: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - mds_client_free(export); - RETURN(rc); -} - -static int mds_disconnect(struct obd_export *exp) -{ - int rc; - ENTRY; - - LASSERT(exp); - class_export_get(exp); - - /* Disconnect early so that clients can't keep using export */ - rc = class_disconnect(exp); - if (exp->exp_obd->obd_namespace != NULL) - ldlm_cancel_locks_for_export(exp); - - /* complete all outstanding replies */ - spin_lock(&exp->exp_lock); - while (!list_empty(&exp->exp_outstanding_replies)) { - struct ptlrpc_reply_state *rs = - list_entry(exp->exp_outstanding_replies.next, - struct ptlrpc_reply_state, rs_exp_list); - struct ptlrpc_service *svc = rs->rs_service; - - spin_lock(&svc->srv_lock); - list_del_init(&rs->rs_exp_list); - ptlrpc_schedule_difficult_reply(rs); - spin_unlock(&svc->srv_lock); - } - spin_unlock(&exp->exp_lock); - - class_export_put(exp); - RETURN(rc); -} - -static int mds_getstatus(struct ptlrpc_request *req) -{ - struct mds_obd *mds = mds_req2mds(req); - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) - RETURN(req->rq_status = -ENOMEM); - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - RETURN(req->rq_status = rc); - - body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body)); - memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1)); - - /* the last_committed and last_xid fields are filled in for all - * replies already - no need to do so here also. - */ - RETURN(0); -} - -/* get the LOV EA from @inode and store it into @md. It can be at most - * @size bytes, and @size is updated with the actual EA size. - * The EA size is also returned on success, and -ve errno on failure. - * If there is no EA then 0 is returned. */ -int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, - int *size, int lock) -{ - int rc = 0; - int lmm_size; - - if (lock) - LOCK_INODE_MUTEX(inode); - rc = fsfilt_get_md(obd, inode, md, *size, "lov"); - - if (rc < 0) { - CERROR("Error %d reading eadata for ino %lu\n", - rc, inode->i_ino); - } else if (rc > 0) { - lmm_size = rc; - rc = mds_convert_lov_ea(obd, inode, md, lmm_size); - - if (rc == 0) { - *size = lmm_size; - rc = lmm_size; - } else if (rc > 0) { - *size = rc; - } - } else { - *size = 0; - } - if (lock) - UNLOCK_INODE_MUTEX(inode); - - RETURN (rc); -} - - -/* Call with lock=1 if you want mds_pack_md to take the i_mutex. - * Call with lock=0 if the caller has already taken the i_mutex. */ -int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, - struct mds_body *body, struct inode *inode, int lock) -{ - struct mds_obd *mds = &obd->u.mds; - void *lmm; - int lmm_size; - int rc; - ENTRY; - - lmm = lustre_msg_buf(msg, offset, 0); - if (lmm == NULL) { - /* Some problem with getting eadata when I sized the reply - * buffer... */ - CDEBUG(D_INFO, "no space reserved for inode %lu MD\n", - inode->i_ino); - RETURN(0); - } - lmm_size = lustre_msg_buflen(msg, offset); - - /* I don't really like this, but it is a sanity check on the client - * MD request. However, if the client doesn't know how much space - * to reserve for the MD, it shouldn't be bad to have too much space. - */ - if (lmm_size > mds->mds_max_mdsize) { - CWARN("Reading MD for inode %lu of %d bytes > max %d\n", - inode->i_ino, lmm_size, mds->mds_max_mdsize); - // RETURN(-EINVAL); - } - - rc = mds_get_md(obd, inode, lmm, &lmm_size, lock); - if (rc > 0) { - if (S_ISDIR(inode->i_mode)) - body->valid |= OBD_MD_FLDIREA; - else - body->valid |= OBD_MD_FLEASIZE; - body->eadatasize = lmm_size; - rc = 0; - } - - RETURN(rc); -} - -#ifdef CONFIG_FS_POSIX_ACL -static -int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg, - struct mds_body *repbody, int repoff) -{ - struct dentry de = { .d_inode = inode }; - int buflen, rc; - ENTRY; - - LASSERT(repbody->aclsize == 0); - LASSERT(lustre_msg_bufcount(repmsg) > repoff); - - buflen = lustre_msg_buflen(repmsg, repoff); - if (!buflen) - GOTO(out, 0); - - if (!inode->i_op || !inode->i_op->getxattr) - GOTO(out, 0); - - rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, - lustre_msg_buf(repmsg, repoff, buflen), - buflen); - - if (rc >= 0) - repbody->aclsize = rc; - else if (rc != -ENODATA) { - CERROR("buflen %d, get acl: %d\n", buflen, rc); - RETURN(rc); - } - EXIT; -out: - repbody->valid |= OBD_MD_FLACL; - return 0; -} -#else -#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0 -#endif - -int mds_pack_acl(struct mds_export_data *med, struct inode *inode, - struct lustre_msg *repmsg, struct mds_body *repbody, - int repoff) -{ - return mds_pack_posix_acl(inode, repmsg, repbody, repoff); -} - -static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, - struct ptlrpc_request *req, - struct mds_body *reqbody, int reply_off) -{ - struct mds_body *body; - struct inode *inode = dentry->d_inode; - int rc = 0; - ENTRY; - - if (inode == NULL) - RETURN(-ENOENT); - - body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); - LASSERT(body != NULL); /* caller prepped reply */ - - mds_pack_inode2fid(&body->fid1, inode); - body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */ - mds_pack_inode2body(body, inode); - reply_off++; - - if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { - rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body, - inode, 1); - - /* If we have LOV EA data, the OST holds size, atime, mtime */ - if (!(body->valid & OBD_MD_FLEASIZE) && - !(body->valid & OBD_MD_FLDIREA)) - body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME); - - lustre_shrink_reply(req, reply_off, body->eadatasize, 0); - if (body->eadatasize) - reply_off++; - } else if (S_ISLNK(inode->i_mode) && - (reqbody->valid & OBD_MD_LINKNAME) != 0) { - char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0); - int len; - - LASSERT (symname != NULL); /* caller prepped reply */ - len = lustre_msg_buflen(req->rq_repmsg, reply_off); - - rc = inode->i_op->readlink(dentry, symname, len); - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - } else if (rc != len - 1) { - CERROR ("Unexpected readlink rc %d: expecting %d\n", - rc, len - 1); - rc = -EINVAL; - } else { - CDEBUG(D_INODE, "read symlink dest %s\n", symname); - body->valid |= OBD_MD_LINKNAME; - body->eadatasize = rc + 1; - symname[rc] = 0; /* NULL terminate */ - rc = 0; - } - reply_off++; - } else if (reqbody->valid == OBD_MD_FLFLAGS && - reqbody->flags & MDS_BFLAG_EXT_FLAGS) { - int flags; - - /* We only return the full set of flags on ioctl, otherwise we - * get enough flags from the inode in mds_pack_inode2body(). */ - rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS, - (long)&flags); - if (rc == 0) - body->flags = flags | MDS_BFLAG_EXT_FLAGS; - } - - if (reqbody->valid & OBD_MD_FLMODEASIZE) { - struct mds_obd *mds = mds_req2mds(req); - body->max_cookiesize = mds->mds_max_cookiesize; - body->max_mdsize = mds->mds_max_mdsize; - body->valid |= OBD_MD_FLMODEASIZE; - } - - if (rc) - RETURN(rc); - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (reqbody->valid & OBD_MD_FLACL)) { - rc = mds_pack_acl(&req->rq_export->exp_mds_data, - inode, req->rq_repmsg, - body, reply_off); - - lustre_shrink_reply(req, reply_off, body->aclsize, 0); - if (body->aclsize) - reply_off++; - } -#endif - - RETURN(rc); -} - -static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, - int offset) -{ - struct mds_obd *mds = mds_req2mds(req); - struct mds_body *body; - int rc, bufcount = 2; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - LASSERT(offset == REQ_REC_OFF); /* non-intent */ - - body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body)); - LASSERT(body != NULL); /* checked by caller */ - LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */ - - if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) { - LOCK_INODE_MUTEX(inode); - rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0, - "lov"); - UNLOCK_INODE_MUTEX(inode); - CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n", - rc, inode->i_ino); - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("error getting inode %lu MD: rc = %d\n", - inode->i_ino, rc); - RETURN(rc); - } - size[bufcount] = 0; - } else if (rc > mds->mds_max_mdsize) { - size[bufcount] = 0; - CERROR("MD size %d larger than maximum possible %u\n", - rc, mds->mds_max_mdsize); - } else { - size[bufcount] = rc; - } - bufcount++; - } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) { - if (i_size_read(inode) + 1 != body->eadatasize) - CERROR("symlink size: %Lu, reply space: %d\n", - i_size_read(inode) + 1, body->eadatasize); - size[bufcount] = min_t(int, i_size_read(inode) + 1, - body->eadatasize); - bufcount++; - CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n", - i_size_read(inode) + 1, body->eadatasize); - } - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (body->valid & OBD_MD_FLACL)) { - struct dentry de = { .d_inode = inode }; - - size[bufcount] = 0; - if (inode->i_op && inode->i_op->getxattr) { - rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS, - NULL, 0); - - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("got acl size: %d\n", rc); - RETURN(rc); - } - } else - size[bufcount] = rc; - } - bufcount++; - } -#endif - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("failed MDS_GETATTR_PACK test\n"); - req->rq_status = -ENOMEM; - RETURN(-ENOMEM); - } - - rc = lustre_pack_reply(req, bufcount, size, NULL); - if (rc) { - req->rq_status = rc; - RETURN(rc); - } - - RETURN(0); -} - -static int mds_getattr_lock(struct ptlrpc_request *req, int offset, - int child_part, struct lustre_handle *child_lockh) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct ldlm_reply *rep = NULL; - struct lvfs_run_ctxt saved; - struct mds_body *body; - struct dentry *dparent = NULL, *dchild = NULL; - struct lvfs_ucred uc = {0,}; - struct lustre_handle parent_lockh; - int namesize; - int rc = 0, cleanup_phase = 0, resent_req = 0; - char *name; - ENTRY; - - LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); - - /* Swab now, before anyone looks inside the request */ - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't swab mds_body\n"); - RETURN(-EFAULT); - } - - lustre_set_req_swabbed(req, offset + 1); - name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0); - if (name == NULL) { - CERROR("Can't unpack name\n"); - RETURN(-EFAULT); - } - namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); - /* namesize less than 2 means we have empty name, probably came from - revalidate by cfid, so no point in having name to be set */ - if (namesize <= 1) - name = NULL; - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(cleanup, rc); - - LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF); - /* if requests were at offset 2, the getattr reply goes back at 1 */ - if (offset == DLM_INTENT_REC_OFF) { - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, - sizeof(*rep)); - offset = DLM_REPLY_REC_OFF; - } - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - cleanup_phase = 1; /* kernel context */ - intent_set_disposition(rep, DISP_LOOKUP_EXECD); - - /* FIXME: handle raw lookup */ -#if 0 - if (body->valid == OBD_MD_FLID) { - struct mds_body *mds_reply; - int size = sizeof(*mds_reply); - ino_t inum; - // The user requested ONLY the inode number, so do a raw lookup - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - CERROR("out of memory\n"); - GOTO(cleanup, rc); - } - - rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum); - - mds_reply = lustre_msg_buf(req->rq_repmsg, offset, - sizeof(*mds_reply)); - mds_reply->fid1.id = inum; - mds_reply->valid = OBD_MD_FLID; - GOTO(cleanup, rc); - } -#endif - - if (lustre_handle_is_used(child_lockh)) { - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - resent_req = 1; - } - - if (resent_req == 0) { - if (name) { - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2); - rc = mds_get_parent_child_locked(obd, &obd->u.mds, - &body->fid1, - &parent_lockh, - &dparent, LCK_CR, - MDS_INODELOCK_UPDATE, - name, namesize, - child_lockh, &dchild, - LCK_CR, child_part); - } else { - /* For revalidate by fid we always take UPDATE lock */ - dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, - LCK_CR, child_lockh, - child_part); - LASSERT(dchild); - if (IS_ERR(dchild)) - rc = PTR_ERR(dchild); - } - if (rc) - GOTO(cleanup, rc); - } else { - struct ldlm_lock *granted_lock; - struct ll_fid child_fid; - struct ldlm_resource *res; - DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks"); - granted_lock = ldlm_handle2lock(child_lockh); - LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n", - body->fid1.id, body->fid1.generation, - child_lockh->cookie); - - - res = granted_lock->l_resource; - child_fid.id = res->lr_name.name[0]; - child_fid.generation = res->lr_name.name[1]; - dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL); - LASSERT(!IS_ERR(dchild)); - LDLM_LOCK_PUT(granted_lock); - } - - cleanup_phase = 2; /* dchild, dparent, locks */ - - if (dchild->d_inode == NULL) { - intent_set_disposition(rep, DISP_LOOKUP_NEG); - /* in the intent case, the policy clears this error: - the disposition is enough */ - GOTO(cleanup, rc = -ENOENT); - } else { - intent_set_disposition(rep, DISP_LOOKUP_POS); - } - - if (req->rq_repmsg == NULL) { - rc = mds_getattr_pack_msg(req, dchild->d_inode, offset); - if (rc != 0) { - CERROR ("mds_getattr_pack_msg: %d\n", rc); - GOTO (cleanup, rc); - } - } - - rc = mds_getattr_internal(obd, dchild, req, body, offset); - GOTO(cleanup, rc); /* returns the lock to the client */ - - cleanup: - switch (cleanup_phase) { - case 2: - if (resent_req == 0) { - if (rc && dchild->d_inode) - ldlm_lock_decref(child_lockh, LCK_CR); - if (name) { - ldlm_lock_decref(&parent_lockh, LCK_CR); - l_dput(dparent); - } - } - l_dput(dchild); - case 1: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - default: - mds_exit_ucred(&uc, mds); - if (req->rq_reply_state == NULL) { - int rc2 = lustre_pack_reply(req, 1, NULL, NULL); - if (rc == 0) - rc = rc2; - req->rq_status = rc; - } - } - return rc; -} - -static int mds_getattr(struct ptlrpc_request *req, int offset) -{ - struct mds_obd *mds = mds_req2mds(req); - struct obd_device *obd = req->rq_export->exp_obd; - struct lvfs_run_ctxt saved; - struct dentry *de; - struct mds_body *body; - struct lvfs_ucred uc = {0,}; - int rc = 0; - ENTRY; - - OBD_COUNTER_INCREMENT(obd, getattr); - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - RETURN(-EFAULT); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(out_ucred, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) { - rc = req->rq_status = PTR_ERR(de); - GOTO(out_pop, rc); - } - - rc = mds_getattr_pack_msg(req, de->d_inode, offset); - if (rc != 0) { - CERROR("mds_getattr_pack_msg: %d\n", rc); - GOTO(out_pop, rc); - } - - req->rq_status = mds_getattr_internal(obd, de, req, body, - REPLY_REC_OFF); - - l_dput(de); - GOTO(out_pop, rc); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out_ucred: - if (req->rq_reply_state == NULL) { - int rc2 = lustre_pack_reply(req, 1, NULL, NULL); - if (rc == 0) - rc = rc2; - req->rq_status = rc; - } - mds_exit_ucred(&uc, mds); - return rc; -} - -static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) -{ - int rc; - - spin_lock(&obd->obd_osfs_lock); - rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age); - if (rc == 0) - memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); - spin_unlock(&obd->obd_osfs_lock); - - return rc; -} - -static int mds_statfs(struct ptlrpc_request *req) -{ - struct obd_device *obd = req->rq_export->exp_obd; - int rc, size[2] = { sizeof(struct ptlrpc_body), - sizeof(struct obd_statfs) }; - ENTRY; - - /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1); - OBD_COUNTER_INCREMENT(obd, statfs); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) - GOTO(out, rc = -ENOMEM); - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - GOTO(out, rc); - - /* We call this so that we can cache a bit - 1 jiffie worth */ - rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - size[REPLY_REC_OFF]), - cfs_time_current_64() - HZ, 0); - if (rc) { - CERROR("mds_obd_statfs failed: rc %d\n", rc); - GOTO(out, rc); - } - - EXIT; -out: - req->rq_status = rc; - return 0; -} - -static int mds_sync(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - ENTRY; - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - GOTO(out, rc = -EFAULT); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) - GOTO(out, rc = -ENOMEM); - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - GOTO(out, rc); - - rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - if (rc == 0 && body->fid1.id != 0) { - struct dentry *de; - - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) - GOTO(out, rc = PTR_ERR(de)); - - body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*body)); - mds_pack_inode2fid(&body->fid1, de->d_inode); - mds_pack_inode2body(body, de->d_inode); - - l_dput(de); - } - GOTO(out, rc); -out: - req->rq_status = rc; - return 0; -} - -/* mds_readpage does not take a DLM lock on the inode, because the client must - * already have a PR lock. - * - * If we were to take another one here, a deadlock will result, if another - * thread is already waiting for a PW lock. */ -static int mds_readpage(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct vfsmount *mnt; - struct dentry *de; - struct file *file; - struct mds_body *body, *repbody; - struct lvfs_run_ctxt saved; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) }; - struct lvfs_ucred uc = {0,}; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) - RETURN(-ENOMEM); - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - GOTO(out, rc); - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - GOTO (out, rc = -EFAULT); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(out, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt); - if (IS_ERR(de)) - GOTO(out_pop, rc = PTR_ERR(de)); - - CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino); - - file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); - /* note: in case of an error, dentry_open puts dentry */ - if (IS_ERR(file)) - GOTO(out_pop, rc = PTR_ERR(file)); - - /* body->size is actually the offset -eeb */ - if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) { - CERROR("offset "LPU64" not on a block boundary of %lu\n", - body->size, de->d_inode->i_sb->s_blocksize); - GOTO(out_file, rc = -EFAULT); - } - - /* body->nlink is actually the #bytes to read -eeb */ - if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) { - CERROR("size %u is not multiple of blocksize %lu\n", - body->nlink, de->d_inode->i_sb->s_blocksize); - GOTO(out_file, rc = -EFAULT); - } - - repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, - sizeof(*repbody)); - repbody->size = i_size_read(file->f_dentry->d_inode); - repbody->valid = OBD_MD_FLSIZE; - - /* to make this asynchronous make sure that the handling function - doesn't send a reply when this function completes. Instead a - callback function would send the reply */ - /* body->size is actually the offset -eeb */ - rc = mds_sendpage(req, file, body->size, body->nlink); - -out_file: - filp_close(file, 0); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out: - mds_exit_ucred(&uc, mds); - req->rq_status = rc; - RETURN(0); -} - -int mds_reint(struct ptlrpc_request *req, int offset, - struct lustre_handle *lockh) -{ - struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */ - int rc; - - OBD_ALLOC(rec, sizeof(*rec)); - if (rec == NULL) - RETURN(-ENOMEM); - - rc = mds_update_unpack(req, offset, rec); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { - CERROR("invalid record\n"); - GOTO(out, req->rq_status = -EINVAL); - } - - /* rc will be used to interrupt a for loop over multiple records */ - rc = mds_reint_rec(rec, offset, req, lockh); - out: - OBD_FREE(rec, sizeof(*rec)); - return rc; -} - -int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process) -{ - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case MDS_CONNECT: /* This will never get here, but for completeness. */ - case OST_CONNECT: /* This will never get here, but for completeness. */ - case MDS_DISCONNECT: - case OST_DISCONNECT: - *process = 1; - RETURN(0); - - case MDS_CLOSE: - case MDS_DONE_WRITING: - case MDS_SYNC: /* used in unmounting */ - case OBD_PING: - case MDS_REINT: - case SEQ_QUERY: - case FLD_QUERY: - case LDLM_ENQUEUE: - *process = target_queue_recovery_request(req, obd); - RETURN(0); - - default: - DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); - *process = -EAGAIN; - RETURN(0); - } -} -EXPORT_SYMBOL(mds_filter_recovery_request); - -static char *reint_names[] = { - [REINT_SETATTR] "setattr", - [REINT_CREATE] "create", - [REINT_LINK] "link", - [REINT_UNLINK] "unlink", - [REINT_RENAME] "rename", - [REINT_OPEN] "open", -}; - -static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req) -{ - void *key, *val; - int keylen, vallen, rc = 0; - ENTRY; - - key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1); - if (key == NULL) { - DEBUG_REQ(D_HA, req, "no set_info key"); - RETURN(-EFAULT); - } - keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF); - - val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0); - vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1); - - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) - RETURN(rc); - - lustre_msg_set_status(req->rq_repmsg, 0); - - if (KEY_IS("read-only")) { - if (val == NULL || vallen < sizeof(__u32)) { - DEBUG_REQ(D_HA, req, "no set_info val"); - RETURN(-EFAULT); - } - - if (*(__u32 *)val) - exp->exp_connect_flags |= OBD_CONNECT_RDONLY; - else - exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY; - } else { - RETURN(-EINVAL); - } - - RETURN(0); -} - -static int mds_handle_quotacheck(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl; - int rc; - ENTRY; - - oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 1, NULL, NULL); - if (rc) - RETURN(rc); - - req->rq_status = obd_quotacheck(req->rq_export, oqctl); - RETURN(0); -} - -static int mds_handle_quotactl(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl, *repoqc; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) }; - ENTRY; - - oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 2, size, NULL); - if (rc) - RETURN(rc); - - repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc)); - - req->rq_status = obd_quotactl(req->rq_export, oqctl); - *repoqc = *oqctl; - RETURN(0); -} - -int mds_msg_check_version(struct lustre_msg *msg) -{ - int rc; - - switch (lustre_msg_get_opc(msg)) { - case MDS_CONNECT: - case MDS_DISCONNECT: - case OBD_PING: - case SEC_CTX_INIT: - case SEC_CTX_INIT_CONT: - case SEC_CTX_FINI: - rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_OBD_VERSION); - break; - case MDS_GETSTATUS: - case MDS_GETATTR: - case MDS_GETATTR_NAME: - case MDS_STATFS: - case MDS_READPAGE: - case MDS_WRITEPAGE: - case MDS_IS_SUBDIR: - case MDS_REINT: - case MDS_CLOSE: - case MDS_DONE_WRITING: - case MDS_PIN: - case MDS_SYNC: - case MDS_GETXATTR: - case MDS_SETXATTR: - case MDS_SET_INFO: - case MDS_QUOTACHECK: - case MDS_QUOTACTL: - case QUOTA_DQACQ: - case QUOTA_DQREL: - case SEQ_QUERY: - case FLD_QUERY: - rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_MDS_VERSION); - break; - case LDLM_ENQUEUE: - case LDLM_CONVERT: - case LDLM_BL_CALLBACK: - case LDLM_CP_CALLBACK: - rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_DLM_VERSION); - break; - case OBD_LOG_CANCEL: - case LLOG_ORIGIN_HANDLE_CREATE: - case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - case LLOG_ORIGIN_HANDLE_READ_HEADER: - case LLOG_ORIGIN_HANDLE_CLOSE: - case LLOG_ORIGIN_HANDLE_DESTROY: - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: - case LLOG_CATINFO: - rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - lustre_msg_get_opc(msg), - lustre_msg_get_version(msg), - LUSTRE_LOG_VERSION); - break; - default: - CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg)); - rc = -ENOTSUPP; - } - return rc; -} -EXPORT_SYMBOL(mds_msg_check_version); - -int mds_handle(struct ptlrpc_request *req) -{ - int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; - int rc; - struct mds_obd *mds = NULL; /* quell gcc overwarning */ - struct obd_device *obd = NULL; - ENTRY; - - if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE)) - RETURN(0); - - LASSERT(current->journal_info == NULL); - - rc = mds_msg_check_version(req->rq_reqmsg); - if (rc) { - CERROR("MDS drop mal-formed request\n"); - RETURN(rc); - } - - /* XXX identical to OST */ - if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) { - struct mds_export_data *med; - int recovering; - - if (req->rq_export == NULL) { - CERROR("operation %d on unconnected MDS from %s\n", - lustre_msg_get_opc(req->rq_reqmsg), - libcfs_id2str(req->rq_peer)); - req->rq_status = -ENOTCONN; - GOTO(out, rc = -ENOTCONN); - } - - med = &req->rq_export->exp_mds_data; - obd = req->rq_export->exp_obd; - mds = mds_req2mds(req); - - /* sanity check: if the xid matches, the request must - * be marked as a resent or replayed */ - if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) || - req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid)) - if (!(lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_RESENT | MSG_REPLAY))) { - CERROR("rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", - req->rq_xid); - req->rq_status = -ENOTCONN; - GOTO(out, rc = -EFAULT); - } - /* else: note the opposite is not always true; a - * RESENT req after a failover will usually not match - * the last_xid, since it was likely never - * committed. A REPLAYed request will almost never - * match the last xid, however it could for a - * committed, but still retained, open. */ - - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - recovering = obd->obd_recovering; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (recovering) { - rc = mds_filter_recovery_request(req, obd, - &should_process); - if (rc || !should_process) - RETURN(rc); - else if (should_process < 0) { - req->rq_status = should_process; - rc = ptlrpc_error(req); - RETURN(rc); - } - } - } - - switch (lustre_msg_get_opc(req->rq_reqmsg)) { - case MDS_CONNECT: - DEBUG_REQ(D_INODE, req, "connect"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET)) - RETURN(0); - rc = target_handle_connect(req); - if (!rc) { - /* Now that we have an export, set mds. */ - /* - * XXX nikita: these assignments are useless: mds is - * never used below, and obd is only used for - * MSG_LAST_REPLAY case, which never happens for - * MDS_CONNECT. - */ - obd = req->rq_export->exp_obd; - mds = mds_req2mds(req); - } - break; - - case MDS_DISCONNECT: - DEBUG_REQ(D_INODE, req, "disconnect"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET)) - RETURN(0); - rc = target_handle_disconnect(req); - req->rq_status = rc; /* superfluous? */ - break; - - case MDS_GETSTATUS: - DEBUG_REQ(D_INODE, req, "getstatus"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET)) - RETURN(0); - rc = mds_getstatus(req); - break; - - case MDS_GETATTR: - DEBUG_REQ(D_INODE, req, "getattr"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET)) - RETURN(0); - rc = mds_getattr(req, REQ_REC_OFF); - break; - - case MDS_SETXATTR: - DEBUG_REQ(D_INODE, req, "setxattr"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET)) - RETURN(0); - rc = mds_setxattr(req); - break; - - case MDS_GETXATTR: - DEBUG_REQ(D_INODE, req, "getxattr"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET)) - RETURN(0); - rc = mds_getxattr(req); - break; - - case MDS_GETATTR_NAME: { - struct lustre_handle lockh = { 0 }; - DEBUG_REQ(D_INODE, req, "getattr_name"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET)) - RETURN(0); - - /* If this request gets a reconstructed reply, we won't be - * acquiring any new locks in mds_getattr_lock, so we don't - * want to cancel. - */ - rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE, - &lockh); - /* this non-intent call (from an ioctl) is special */ - req->rq_status = rc; - if (rc == 0 && lustre_handle_is_used(&lockh)) - ldlm_lock_decref(&lockh, LCK_CR); - break; - } - case MDS_STATFS: - DEBUG_REQ(D_INODE, req, "statfs"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET)) - RETURN(0); - rc = mds_statfs(req); - break; - - case MDS_READPAGE: - DEBUG_REQ(D_INODE, req, "readpage"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET)) - RETURN(0); - rc = mds_readpage(req, REQ_REC_OFF); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { - RETURN(0); - } - - break; - - case MDS_REINT: { - __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, - sizeof(*opcp)); - __u32 opc; - int op = 0; - int size[4] = { sizeof(struct ptlrpc_body), - sizeof(struct mds_body), - mds->mds_max_mdsize, - mds->mds_max_cookiesize }; - int bufcount; - - /* NB only peek inside req now; mds_reint() will swab it */ - if (opcp == NULL) { - CERROR ("Can't inspect opcode\n"); - rc = -EINVAL; - break; - } - opc = *opcp; - if (lustre_msg_swabbed(req->rq_reqmsg)) - __swab32s(&opc); - - DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc, - (opc < sizeof(reint_names) / sizeof(reint_names[0]) || - reint_names[opc] == NULL) ? reint_names[opc] : - "unknown opcode"); - switch (opc) { - case REINT_CREATE: - op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE; - break; - case REINT_LINK: - op = PTLRPC_LAST_CNTR + MDS_REINT_LINK; - break; - case REINT_OPEN: - op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN; - break; - case REINT_SETATTR: - op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR; - break; - case REINT_RENAME: - op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME; - break; - case REINT_UNLINK: - op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK; - break; - default: - op = 0; - break; - } - - if (op && req->rq_rqbd->rqbd_service->srv_stats) - lprocfs_counter_incr( - req->rq_rqbd->rqbd_service->srv_stats, op); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET)) - RETURN(0); - - if (opc == REINT_UNLINK || opc == REINT_RENAME) - bufcount = 4; - else if (opc == REINT_OPEN) - bufcount = 3; - else - bufcount = 2; - - rc = lustre_pack_reply(req, bufcount, size, NULL); - if (rc) - break; - - rc = mds_reint(req, REQ_REC_OFF, NULL); - fail = OBD_FAIL_MDS_REINT_NET_REP; - break; - } - - case MDS_CLOSE: - DEBUG_REQ(D_INODE, req, "close"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET)) - RETURN(0); - rc = mds_close(req, REQ_REC_OFF); - fail = OBD_FAIL_MDS_CLOSE_NET_REP; - break; - - case MDS_DONE_WRITING: - DEBUG_REQ(D_INODE, req, "done_writing"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET)) - RETURN(0); - rc = mds_done_writing(req, REQ_REC_OFF); - break; - - case MDS_PIN: - DEBUG_REQ(D_INODE, req, "pin"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET)) - RETURN(0); - rc = mds_pin(req, REQ_REC_OFF); - break; - - case MDS_SYNC: - DEBUG_REQ(D_INODE, req, "sync"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET)) - RETURN(0); - rc = mds_sync(req, REQ_REC_OFF); - break; - - case MDS_SET_INFO: - DEBUG_REQ(D_INODE, req, "set_info"); - rc = mds_set_info_rpc(req->rq_export, req); - break; - - case MDS_QUOTACHECK: - DEBUG_REQ(D_INODE, req, "quotacheck"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET)) - RETURN(0); - rc = mds_handle_quotacheck(req); - break; - - case MDS_QUOTACTL: - DEBUG_REQ(D_INODE, req, "quotactl"); - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET)) - RETURN(0); - rc = mds_handle_quotactl(req); - break; - - case OBD_PING: - DEBUG_REQ(D_INODE, req, "ping"); - rc = target_handle_ping(req); - break; - - case OBD_LOG_CANCEL: - CDEBUG(D_INODE, "log cancel\n"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET)) - RETURN(0); - rc = -ENOTSUPP; /* la la la */ - break; - - case LDLM_ENQUEUE: - DEBUG_REQ(D_INODE, req, "enqueue"); - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) - RETURN(0); - rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast, - ldlm_server_blocking_ast, NULL); - fail = OBD_FAIL_LDLM_REPLY; - break; - case LDLM_CONVERT: - DEBUG_REQ(D_INODE, req, "convert"); - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT)) - RETURN(0); - rc = ldlm_handle_convert(req); - break; - case LDLM_BL_CALLBACK: - case LDLM_CP_CALLBACK: - DEBUG_REQ(D_INODE, req, "callback"); - CERROR("callbacks should not happen on MDS\n"); - LBUG(); - if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK)) - RETURN(0); - break; - case LLOG_ORIGIN_HANDLE_CREATE: - DEBUG_REQ(D_INODE, req, "llog_init"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_create(req); - break; - case LLOG_ORIGIN_HANDLE_DESTROY: - DEBUG_REQ(D_INODE, req, "llog_init"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_destroy(req); - break; - case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - DEBUG_REQ(D_INODE, req, "llog next block"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_next_block(req); - break; - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: - DEBUG_REQ(D_INODE, req, "llog prev block"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_prev_block(req); - break; - case LLOG_ORIGIN_HANDLE_READ_HEADER: - DEBUG_REQ(D_INODE, req, "llog read header"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_read_header(req); - break; - case LLOG_ORIGIN_HANDLE_CLOSE: - DEBUG_REQ(D_INODE, req, "llog close"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_origin_handle_close(req); - break; - case LLOG_CATINFO: - DEBUG_REQ(D_INODE, req, "llog catinfo"); - if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET)) - RETURN(0); - rc = llog_catinfo(req); - break; - default: - req->rq_status = -ENOTSUPP; - rc = ptlrpc_error(req); - RETURN(rc); - } - - LASSERT(current->journal_info == NULL); - - /* If we're DISCONNECTing, the mds_export_data is already freed */ - if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) { - struct mds_export_data *med = &req->rq_export->exp_mds_data; - - /* I don't think last_xid is used for anyway, so I'm not sure - if we need to care about last_close_xid here.*/ - lustre_msg_set_last_xid(req->rq_repmsg, - le64_to_cpu(med->med_mcd->mcd_last_xid)); - - target_committed_to_req(req); - } - - EXIT; - out: - - target_send_reply(req, rc, fail); - return 0; -} - -/* Update the server data on disk. This stores the new mount_count and - * also the last_rcvd value to disk. If we don't have a clean shutdown, - * then the server last_rcvd value may be less than that of the clients. - * This will alert us that we may need to do client recovery. - * - * Also assumes for mds_last_transno that we are not modifying it (no locking). - */ -int mds_update_server_data(struct obd_device *obd, int force_sync) -{ - struct mds_obd *mds = &obd->u.mds; - struct lr_server_data *lsd = mds->mds_server_data; - struct file *filp = mds->mds_rcvd_filp; - struct lvfs_run_ctxt saved; - loff_t off = 0; - int rc; - ENTRY; + RETURN(ERR_PTR(-ENOENT)); + } - CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n", - mds->mds_mount_count, mds->mds_last_transno); + if (generation && inode->i_generation != generation) { + /* we didn't find the right inode.. */ + CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " + "count: %d, generation %u/%u\n", inode->i_ino, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation); + dput(result); + RETURN(ERR_PTR(-ENOENT)); + } - spin_lock(&mds->mds_transno_lock); - lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno); - spin_unlock(&mds->mds_transno_lock); + if (mnt) { + *mnt = mds->mds_vfsmnt; + mntget(*mnt); + } - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (rc) - CERROR("error writing MDS server data: rc = %d\n", rc); - RETURN(rc); + RETURN(result); } -static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options) -{ - char *p = options; - - if (!options) - return; - - while (*options) { - int len; - - while (*p && *p != ',') - p++; - - len = p - options; - if (len == sizeof("user_xattr") - 1 && - memcmp(options, "user_xattr", len) == 0) { - mds->mds_fl_user_xattr = 1; - LCONSOLE_INFO("Enabling user_xattr\n"); - } else if (len == sizeof("nouser_xattr") - 1 && - memcmp(options, "nouser_xattr", len) == 0) { - mds->mds_fl_user_xattr = 0; - LCONSOLE_INFO("Disabling user_xattr\n"); - } else if (len == sizeof("acl") - 1 && - memcmp(options, "acl", len) == 0) { -#ifdef CONFIG_FS_POSIX_ACL - mds->mds_fl_acl = 1; - LCONSOLE_INFO("Enabling ACL\n"); -#else - CWARN("ignoring unsupported acl mount option\n"); -#endif - } else if (len == sizeof("noacl") - 1 && - memcmp(options, "noacl", len) == 0) { -#ifdef CONFIG_FS_POSIX_ACL - mds->mds_fl_acl = 0; - LCONSOLE_INFO("Disabling ACL\n"); -#endif - } - - options = ++p; - } -} static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg) { int rc = 0; @@ -1939,191 +139,6 @@ static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg) RETURN(rc); } -/* mount the file system (secretly). lustre_cfg parameters are: - * 1 = device - * 2 = fstype - * 3 = config name - * 4 = mount options - */ -static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg) -{ - struct lprocfs_static_vars lvars; - struct mds_obd *mds = &obd->u.mds; - struct lustre_mount_info *lmi; - struct vfsmount *mnt; - struct lustre_sb_info *lsi; - struct obd_uuid uuid; - __u8 *uuid_ptr; - char *str, *label; - char ns_name[48]; - int rc = 0; - ENTRY; - - /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */ - - CLASSERT(offsetof(struct obd_device, u.obt) == - offsetof(struct obd_device, u.mds.mds_obt)); - - if (lcfg->lcfg_bufcount < 3) - RETURN(-EINVAL); - - if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0) - RETURN(-EINVAL); - - lmi = server_get_mount(obd->obd_name); - if (!lmi) { - CERROR("Not mounted in lustre_fill_super?\n"); - RETURN(-EINVAL); - } - - /* We mounted in lustre_fill_super. - lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/ - - lsi = s2lsi(lmi->lmi_sb); - fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts); - fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts); - mnt = lmi->lmi_mnt; - obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd)); - if (IS_ERR(obd->obd_fsops)) - GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); - - CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt); - - LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb))); - - sema_init(&mds->mds_epoch_sem, 1); - spin_lock_init(&mds->mds_transno_lock); - mds->mds_max_mdsize = sizeof(struct lov_mds_md); - mds->mds_max_cookiesize = sizeof(struct llog_cookie); - mds->mds_atime_diff = MAX_ATIME_DIFF; - mds->mds_evict_ost_nids = 1; - - sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, - LDLM_NAMESPACE_GREEDY); - if (obd->obd_namespace == NULL) { - mds_cleanup(obd); - GOTO(err_ops, rc = -ENOMEM); - } - ldlm_register_intent(obd->obd_namespace, mds_intent_policy); - - lprocfs_mds_init_vars(&lvars); - if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 && - lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) { - /* Init private stats here */ - mds_stats_counter_init(obd->obd_stats); - obd->obd_proc_exports_entry = lprocfs_register("exports", - obd->obd_proc_entry, - NULL, NULL); - if (IS_ERR(obd->obd_proc_exports_entry)) { - rc = PTR_ERR(obd->obd_proc_exports_entry); - CERROR("error %d setting up lprocfs for %s\n", - rc, "exports"); - obd->obd_proc_exports_entry = NULL; - } - } - - rc = mds_fs_setup(obd, mnt); - if (rc) { - CERROR("%s: MDS filesystem method init failed: rc = %d\n", - obd->obd_name, rc); - GOTO(err_ns, rc); - } - - if (obd->obd_proc_exports_entry) - lprocfs_add_simple(obd->obd_proc_exports_entry, - "clear", lprocfs_nid_stats_clear_read, - lprocfs_nid_stats_clear_write, obd); - - rc = mds_lov_presetup(mds, lcfg); - if (rc < 0) - GOTO(err_fs, rc); - - ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, - "mds_ldlm_client", &obd->obd_ldlm_client); - obd->obd_replayable = 1; - - rc = lquota_setup(mds_quota_interface_ref, obd); - if (rc) - GOTO(err_fs, rc); - -#if 0 - mds->mds_group_hash = upcall_cache_init(obd->obd_name); - if (IS_ERR(mds->mds_group_hash)) { - rc = PTR_ERR(mds->mds_group_hash); - mds->mds_group_hash = NULL; - GOTO(err_qctxt, rc); - } -#endif - - /* Don't wait for mds_postrecov trying to clear orphans */ - obd->obd_async_recov = 1; - rc = mds_postsetup(obd); - /* Bug 11557 - allow async abort_recov start - FIXME can remove most of this obd_async_recov plumbing - obd->obd_async_recov = 0; - */ - if (rc) - GOTO(err_qctxt, rc); - - uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb); - if (uuid_ptr != NULL) { - class_uuid_unparse(uuid_ptr, &uuid); - str = uuid.uuid; - } else { - str = "no UUID"; - } - - label = fsfilt_get_label(obd, obd->u.obt.obt_sb); - if (obd->obd_recovering) { - LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in " - "recovery until %d %s reconnect, or if no clients" - " reconnect for %d:%.02d; during that time new " - "clients will not be allowed to connect. " - "Recovery progress can be monitored by watching " - "/proc/fs/lustre/mds/%s/recovery_status.\n", - obd->obd_name, lustre_cfg_string(lcfg, 1), - label ?: "", label ? "/" : "", str, - obd->obd_max_recoverable_clients, - (obd->obd_max_recoverable_clients == 1) ? - "client" : "clients", - (int)(OBD_RECOVERY_TIMEOUT) / 60, - (int)(OBD_RECOVERY_TIMEOUT) % 60, - obd->obd_name); - } else { - LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery " - "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1), - label ?: "", label ? "/" : "", str, - obd->obd_replayable ? "enabled" : "disabled"); - } - - if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT) - ldlm_timeout = 6; - - RETURN(0); - -err_qctxt: - lquota_cleanup(mds_quota_interface_ref, obd); -err_fs: - /* No extra cleanup needed for llog_init_commit_thread() */ - mds_fs_cleanup(obd); -#if 0 - upcall_cache_cleanup(mds->mds_group_hash); - mds->mds_group_hash = NULL; -#endif -err_ns: - lprocfs_free_obd_stats(obd); - lprocfs_obd_cleanup(obd); - ldlm_namespace_free(obd->obd_namespace, 0); - obd->obd_namespace = NULL; -err_ops: - fsfilt_put_ops(obd->obd_fsops); -err_put: - server_put_mount(obd->obd_name, mnt); - obd->u.obt.obt_sb = NULL; - return rc; -} - static int mds_lov_clean(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; @@ -2210,11 +225,12 @@ int mds_postrecov(struct obd_device *obd) LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT)); /* clean PENDING dir */ +#if 0 if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME))) rc = mds_cleanup_pending(obd); if (rc < 0) GOTO(out, rc); - +#endif /* FIXME Does target_finish_recovery really need this to block? */ /* Notify the LOV, which will in turn call mds_notify for each tgt */ /* This means that we have to hack obd_notify to think we're obd_set_up @@ -2226,7 +242,6 @@ int mds_postrecov(struct obd_device *obd) /* quota recovery */ lquota_recovery(mds_quota_interface_ref, obd); -out: RETURN(rc); } @@ -2272,450 +287,6 @@ static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) RETURN(rc); } -static int mds_cleanup(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - lvfs_sbdev_type save_dev; - ENTRY; - - if (obd->u.obt.obt_sb == NULL) - RETURN(0); - save_dev = lvfs_sbdev(obd->u.obt.obt_sb); - - if (mds->mds_osc_exp) - /* lov export was disconnected by mds_lov_clean; - we just need to drop our ref */ - class_export_put(mds->mds_osc_exp); - - lprocfs_remove_proc_entry("clear", obd->obd_proc_exports_entry); - lprocfs_free_per_client_stats(obd); - lprocfs_free_obd_stats(obd); - lprocfs_obd_cleanup(obd); - - lquota_cleanup(mds_quota_interface_ref, obd); - - mds_update_server_data(obd, 1); - /* XXX - mds_lov_destroy_objids(obd); - */ - mds_fs_cleanup(obd); - -#if 0 - upcall_cache_cleanup(mds->mds_group_hash); - mds->mds_group_hash = NULL; -#endif - - server_put_mount(obd->obd_name, mds->mds_vfsmnt); - obd->u.obt.obt_sb = NULL; - - ldlm_namespace_free(obd->obd_namespace, obd->obd_force); - - spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovering) { - target_cancel_recovery_timer(obd); - obd->obd_recovering = 0; - } - spin_unlock_bh(&obd->obd_processing_task_lock); - - fsfilt_put_ops(obd->obd_fsops); - - LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name); - - RETURN(0); -} - -static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset, - struct ldlm_lock *new_lock, - struct ldlm_lock **old_lock, - struct lustre_handle *lockh) -{ - struct obd_export *exp = req->rq_export; - struct ldlm_request *dlmreq = - lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq)); - struct lustre_handle remote_hdl = dlmreq->lock_handle[0]; - struct list_head *iter; - - if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) - return; - - spin_lock(&exp->exp_ldlm_data.led_lock); - list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { - struct ldlm_lock *lock; - lock = list_entry(iter, struct ldlm_lock, l_export_chain); - if (lock == new_lock) - continue; - if (lock->l_remote_handle.cookie == remote_hdl.cookie) { - lockh->cookie = lock->l_handle.h_cookie; - LDLM_DEBUG(lock, "restoring lock cookie"); - DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64, - lockh->cookie); - if (old_lock) - *old_lock = LDLM_LOCK_GET(lock); - spin_unlock(&exp->exp_ldlm_data.led_lock); - return; - } - } - spin_unlock(&exp->exp_ldlm_data.led_lock); - - /* If the xid matches, then we know this is a resent request, - * and allow it. (It's probably an OPEN, for which we don't - * send a lock */ - if (req->rq_xid == - le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid)) - return; - - if (req->rq_xid == - le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid)) - return; - - /* This remote handle isn't enqueued, so we never received or - * processed this request. Clear MSG_RESENT, because it can - * be handled like any normal request now. */ - - lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); - - DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64, - remote_hdl.cookie); -} - -int intent_disposition(struct ldlm_reply *rep, int flag) -{ - if (!rep) - return 0; - return (rep->lock_policy_res1 & flag); -} - -void intent_set_disposition(struct ldlm_reply *rep, int flag) -{ - if (!rep) - return; - rep->lock_policy_res1 |= flag; -} - -static int mds_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data) -{ - struct ptlrpc_request *req = req_cookie; - struct ldlm_lock *lock = *lockp; - struct ldlm_intent *it; - struct mds_obd *mds = &req->rq_export->exp_obd->u.mds; - struct ldlm_reply *rep; - struct lustre_handle lockh = { 0 }; - struct ldlm_lock *new_lock = NULL; - int getattr_part = MDS_INODELOCK_UPDATE; - int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body), - [DLM_LOCKREPLY_OFF] = sizeof(struct ldlm_reply), - [DLM_REPLY_REC_OFF] = sizeof(struct mds_body), - [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize }; - int repbufcnt = 4, rc; - ENTRY; - - LASSERT(req != NULL); - - if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) { - /* No intent was provided */ - rc = lustre_pack_reply(req, 2, repsize, NULL); - if (rc) - RETURN(rc); - RETURN(0); - } - - it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it), - lustre_swab_ldlm_intent); - if (it == NULL) { - CERROR("Intent missing\n"); - RETURN(req->rq_status = -EFAULT); - } - - LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); - - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))) - /* we should never allow OBD_CONNECT_ACL if not configured */ - repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; - else if (it->opc & IT_UNLINK) - repsize[repbufcnt++] = mds->mds_max_cookiesize; - - rc = lustre_pack_reply(req, repbufcnt, repsize, NULL); - if (rc) - RETURN(req->rq_status = rc); - - rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep)); - intent_set_disposition(rep, DISP_IT_EXECD); - - - /* execute policy */ - switch ((long)it->opc) { - case IT_OPEN: - case IT_CREAT|IT_OPEN: - mds_counter_incr(req->rq_export, LPROC_MDS_OPEN); - fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL, - &lockh); - /* XXX swab here to assert that an mds_open reint - * packet is following */ - rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF, - &lockh); -#if 0 - /* We abort the lock if the lookup was negative and - * we did not make it to the OPEN portion */ - if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) - RETURN(ELDLM_LOCK_ABORTED); - if (intent_disposition(rep, DISP_LOOKUP_NEG) && - !intent_disposition(rep, DISP_OPEN_OPEN)) -#endif - - /* If there was an error of some sort or if we are not - * returning any locks */ - if (rep->lock_policy_res2 || - !intent_disposition(rep, DISP_OPEN_LOCK)) - RETURN(ELDLM_LOCK_ABORTED); - break; - case IT_LOOKUP: - getattr_part = MDS_INODELOCK_LOOKUP; - case IT_GETATTR: - getattr_part |= MDS_INODELOCK_LOOKUP; - OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr); - case IT_READDIR: - fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, - &new_lock, &lockh); - - /* INODEBITS_INTEROP: if this lock was converted from a - * plain lock (client does not support inodebits), then - * child lock must be taken with both lookup and update - * bits set for all operations. - */ - if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS)) - getattr_part = MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_UPDATE; - - rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF, - getattr_part, &lockh); - /* FIXME: LDLM can set req->rq_status. MDS sets - policy_res{1,2} with disposition and status. - - replay: returns 0 & req->status is old status - - otherwise: returns req->status */ - if (intent_disposition(rep, DISP_LOOKUP_NEG)) - rep->lock_policy_res2 = 0; - if (!intent_disposition(rep, DISP_LOOKUP_POS) || - rep->lock_policy_res2) - RETURN(ELDLM_LOCK_ABORTED); - if (req->rq_status != 0) { - LBUG(); - rep->lock_policy_res2 = req->rq_status; - RETURN(ELDLM_LOCK_ABORTED); - } - break; - default: - CERROR("Unhandled intent "LPD64"\n", it->opc); - RETURN(-EFAULT); - } - - /* By this point, whatever function we called above must have either - * filled in 'lockh', been an intent replay, or returned an error. We - * want to allow replayed RPCs to not get a lock, since we would just - * drop it below anyways because lock replay is done separately by the - * client afterwards. For regular RPCs we want to give the new lock to - * the client instead of whatever lock it was about to get. */ - if (new_lock == NULL) - new_lock = ldlm_handle2lock(&lockh); - if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) - RETURN(0); - - LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n", - it->opc, lockh.cookie); - - /* If we've already given this lock to a client once, then we should - * have no readers or writers. Otherwise, we should have one reader - * _or_ writer ref (which will be zeroed below) before returning the - * lock to a client. */ - if (new_lock->l_export == req->rq_export) { - LASSERT(new_lock->l_readers + new_lock->l_writers == 0); - } else { - LASSERT(new_lock->l_export == NULL); - LASSERT(new_lock->l_readers + new_lock->l_writers == 1); - } - - *lockp = new_lock; - - if (new_lock->l_export == req->rq_export) { - /* Already gave this to the client, which means that we - * reconstructed a reply. */ - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & - MSG_RESENT); - RETURN(ELDLM_LOCK_REPLACED); - } - - /* Fixup the lock to be given to the client */ - lock_res_and_lock(new_lock); - new_lock->l_readers = 0; - new_lock->l_writers = 0; - - new_lock->l_export = class_export_get(req->rq_export); - spin_lock(&req->rq_export->exp_ldlm_data.led_lock); - list_add(&new_lock->l_export_chain, - &new_lock->l_export->exp_ldlm_data.led_held_locks); - spin_unlock(&req->rq_export->exp_ldlm_data.led_lock); - - new_lock->l_blocking_ast = lock->l_blocking_ast; - new_lock->l_completion_ast = lock->l_completion_ast; - - memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, - sizeof(lock->l_remote_handle)); - - new_lock->l_flags &= ~LDLM_FL_LOCAL; - - unlock_res_and_lock(new_lock); - LDLM_LOCK_PUT(new_lock); - - RETURN(ELDLM_LOCK_REPLACED); -} - -static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg) -{ - struct mds_obd *mds = &obd->u.mds; - struct lprocfs_static_vars lvars; - int mds_min_threads; - int mds_max_threads; - int rc = 0; - ENTRY; - - lprocfs_mdt_init_vars(&lvars); - lprocfs_obd_setup(obd, lvars.obd_vars); - - sema_init(&mds->mds_health_sem, 1); - - if (mds_num_threads) { - /* If mds_num_threads is set, it is the min and the max. */ - if (mds_num_threads > MDS_THREADS_MAX) - mds_num_threads = MDS_THREADS_MAX; - if (mds_num_threads < MDS_THREADS_MIN) - mds_num_threads = MDS_THREADS_MIN; - mds_max_threads = mds_min_threads = mds_num_threads; - } else { - /* Base min threads on memory and cpus */ - mds_min_threads = num_possible_cpus() * num_physpages >> - (27 - CFS_PAGE_SHIFT); - if (mds_min_threads < MDS_THREADS_MIN) - mds_min_threads = MDS_THREADS_MIN; - /* Largest auto threads start value */ - if (mds_min_threads > 32) - mds_min_threads = 32; - mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4); - } - - mds->mds_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_REQUEST_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, LUSTRE_MDS_NAME, - obd->obd_proc_entry, NULL, - mds_min_threads, mds_max_threads, "ll_mdt", 0); - - if (!mds->mds_service) { - CERROR("failed to start service\n"); - GOTO(err_lprocfs, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(obd, mds->mds_service); - if (rc) - GOTO(err_thread, rc); - - mds->mds_setattr_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_SETATTR_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, "mds_setattr", - obd->obd_proc_entry, NULL, - mds_min_threads, mds_max_threads, - "ll_mdt_attr", 0); - if (!mds->mds_setattr_service) { - CERROR("failed to start getattr service\n"); - GOTO(err_thread, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(obd, mds->mds_setattr_service); - if (rc) - GOTO(err_thread2, rc); - - mds->mds_readpage_service = - ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE, - MDS_MAXREPSIZE, MDS_READPAGE_PORTAL, - MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT, - mds_handle, "mds_readpage", - obd->obd_proc_entry, NULL, - MDS_THREADS_MIN_READPAGE, mds_max_threads, - "ll_mdt_rdpg", 0); - if (!mds->mds_readpage_service) { - CERROR("failed to start readpage service\n"); - GOTO(err_thread2, rc = -ENOMEM); - } - - rc = ptlrpc_start_threads(obd, mds->mds_readpage_service); - - if (rc) - GOTO(err_thread3, rc); - - ping_evictor_start(); - - RETURN(0); - -err_thread3: - ptlrpc_unregister_service(mds->mds_readpage_service); - mds->mds_readpage_service = NULL; -err_thread2: - ptlrpc_unregister_service(mds->mds_setattr_service); - mds->mds_setattr_service = NULL; -err_thread: - ptlrpc_unregister_service(mds->mds_service); - mds->mds_service = NULL; -err_lprocfs: - lprocfs_obd_cleanup(obd); - return rc; -} - -static int mdt_cleanup(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - ENTRY; - - ping_evictor_stop(); - - down(&mds->mds_health_sem); - ptlrpc_unregister_service(mds->mds_readpage_service); - ptlrpc_unregister_service(mds->mds_setattr_service); - ptlrpc_unregister_service(mds->mds_service); - mds->mds_readpage_service = NULL; - mds->mds_setattr_service = NULL; - mds->mds_service = NULL; - up(&mds->mds_health_sem); - - lprocfs_obd_cleanup(obd); - - RETURN(0); -} - -static int mdt_health_check(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - int rc = 0; - - down(&mds->mds_health_sem); - rc |= ptlrpc_service_health_check(mds->mds_readpage_service); - rc |= ptlrpc_service_health_check(mds->mds_setattr_service); - rc |= ptlrpc_service_health_check(mds->mds_service); - up(&mds->mds_health_sem); - - /* - * health_check to return 0 on healthy - * and 1 on unhealthy. - */ - if(rc != 0) - rc = 1; - - return rc; -} - static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, void *data) { @@ -2726,108 +297,32 @@ static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr, return mds_fid2dentry(&obd->u.mds, &fid, NULL); } -static int mds_health_check(struct obd_device *obd) -{ - struct obd_device_target *odt = &obd->u.obt; -#ifdef USE_HEALTH_CHECK_WRITE - struct mds_obd *mds = &obd->u.mds; -#endif - int rc = 0; - - if (odt->obt_sb->s_flags & MS_RDONLY) - rc = 1; - -#ifdef USE_HEALTH_CHECK_WRITE - LASSERT(mds->mds_health_check_filp != NULL); - rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp); -#endif - return rc; -} - -static int mds_process_config(struct obd_device *obd, obd_count len, void *buf) -{ - struct lustre_cfg *lcfg = buf; - struct lprocfs_static_vars lvars; - int rc; - - lprocfs_mds_init_vars(&lvars); - - rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd); - return(rc); -} struct lvfs_callback_ops mds_lvfs_ops = { l_fid2dentry: mds_lvfs_fid2dentry, }; -/* use obd ops to offer management infrastructure */ -static struct obd_ops mds_obd_ops = { - .o_owner = THIS_MODULE, - .o_connect = mds_connect, - .o_reconnect = mds_reconnect, - .o_init_export = mds_init_export, - .o_destroy_export = mds_destroy_export, - .o_disconnect = mds_disconnect, - .o_setup = mds_setup, - .o_precleanup = mds_precleanup, - .o_cleanup = mds_cleanup, - .o_postrecov = mds_postrecov, - .o_statfs = mds_obd_statfs, - .o_iocontrol = mds_iocontrol, - .o_create = mds_obd_create, - .o_destroy = mds_obd_destroy, - .o_llog_init = mds_llog_init, - .o_llog_finish = mds_llog_finish, - .o_notify = mds_notify, - .o_health_check = mds_health_check, - .o_process_config = mds_process_config, -}; - -static struct obd_ops mdt_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = mdt_setup, - .o_cleanup = mdt_cleanup, - .o_health_check = mdt_health_check, -}; - quota_interface_t *mds_quota_interface_ref; extern quota_interface_t mds_quota_interface; -static __attribute__((unused)) int __init mds_init(void) +static void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt) { - int rc; - struct lprocfs_static_vars lvars; + struct mds_obd *mds = &obd->u.mds; - request_module("lquota"); - mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface); - rc = lquota_init(mds_quota_interface_ref); - if (rc) { - if (mds_quota_interface_ref) - PORTAL_SYMBOL_PUT(mds_quota_interface); - return rc; - } - init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops); + mds->mds_vfsmnt = mnt; + /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */ + obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb; - lprocfs_mds_init_vars(&lvars); - class_register_type(&mds_obd_ops, NULL, - lvars.module_vars, LUSTRE_MDS_NAME, NULL); - lprocfs_mds_init_vars(&lvars); - mdt_obd_ops = mdt_obd_ops; //make compiler happy -// class_register_type(&mdt_obd_ops, NULL, -// lvars.module_vars, LUSTRE_MDT_NAME, NULL); + fsfilt_setup(obd, obd->u.obt.obt_sb); - return 0; + OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); + obd->obd_lvfs_ctxt.pwdmnt = mnt; + obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; + obd->obd_lvfs_ctxt.fs = get_ds(); + obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops; + return; } -static __attribute__((unused)) void /*__exit*/ mds_exit(void) -{ - lquota_exit(mds_quota_interface_ref); - if (mds_quota_interface_ref) - PORTAL_SYMBOL_PUT(mds_quota_interface); - - class_unregister_type(LUSTRE_MDS_NAME); -// class_unregister_type(LUSTRE_MDT_NAME); -} /*mds still need lov setup here*/ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { diff --git a/lustre/mds/mds_fs.c b/lustre/mds/mds_fs.c index 5ecaf5a..db463aa 100644 --- a/lustre/mds/mds_fs.c +++ b/lustre/mds/mds_fs.c @@ -26,9 +26,6 @@ * license text for more details. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -48,657 +45,6 @@ #include "mds_internal.h" -static int mds_export_stats_init(struct obd_device *obd, - struct obd_export *exp, - void *localdata) -{ - int rc, num_stats, newnid; - - rc = lprocfs_exp_setup(exp, localdata, &newnid); - if (rc) - return rc; - - if (newnid) { - struct nid_stat *tmp = exp->exp_nid_stats; - LASSERT(tmp != NULL); - num_stats = (sizeof(*obd->obd_type->typ_dt_ops) / sizeof(void *)) + - LPROC_MDS_LAST - 1; - tmp->nid_stats = lprocfs_alloc_stats(num_stats, - LPROCFS_STATS_FLAG_NOPERCPU); - if (tmp->nid_stats == NULL) - return -ENOMEM; - - lprocfs_init_ops_stats(LPROC_MDS_LAST, tmp->nid_stats); - rc = lprocfs_register_stats(tmp->nid_proc, "stats", - tmp->nid_stats); - if (rc) - return rc; - mds_stats_counter_init(tmp->nid_stats); - } - return 0; -} - -/* Add client data to the MDS. We use a bitmap to locate a free space - * in the last_rcvd file if cl_off is -1 (i.e. a new client). - * Otherwise, we have just read the data from the last_rcvd file and - * we know its offset. - * - * It should not be possible to fail adding an existing client - otherwise - * mds_init_server_data() callsite needs to be fixed. - */ -int mds_client_add(struct obd_device *obd, struct obd_export *exp, - int cl_idx, void *localdata) -{ - struct mds_obd *mds = &obd->u.mds; - struct mds_export_data *med = &exp->exp_mds_data; - unsigned long *bitmap = mds->mds_client_bitmap; - int new_client = (cl_idx == -1); - ENTRY; - - LASSERT(bitmap != NULL); - LASSERTF(cl_idx > -2, "%d\n", cl_idx); - - /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ - if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid)) - RETURN(0); - - /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so - * there's no need for extra complication here - */ - if (new_client) { - cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS); - repeat: - if (cl_idx >= LR_MAX_CLIENTS || - OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) { - CERROR("no room for %u client - fix LR_MAX_CLIENTS\n", - cl_idx); - return -EOVERFLOW; - } - if (test_and_set_bit(cl_idx, bitmap)) { - cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS, - cl_idx); - goto repeat; - } - } else { - if (test_and_set_bit(cl_idx, bitmap)) { - CERROR("MDS client %d: bit already set in bitmap!!\n", - cl_idx); - LBUG(); - } - } - - CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n", - cl_idx, med->med_mcd->mcd_uuid); - - med->med_lr_idx = cl_idx; - med->med_lr_off = le32_to_cpu(mds->mds_server_data->lsd_client_start) + - (cl_idx * le16_to_cpu(mds->mds_server_data->lsd_client_size)); - LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off); - mds_export_stats_init(obd, exp, localdata); - - if (new_client) { - struct lvfs_run_ctxt saved; - loff_t off = med->med_lr_off; - struct file *file = mds->mds_rcvd_filp; - void *handle; - int rc; - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - handle = fsfilt_start(obd, file->f_dentry->d_inode, - FSFILT_OP_SETATTR, NULL); - if (IS_ERR(handle)) { - rc = PTR_ERR(handle); - CERROR("unable to start transaction: rc %d\n", rc); - } else { - rc = fsfilt_add_journal_cb(obd, 0, handle, - target_client_add_cb, exp); - if (rc == 0) { - spin_lock(&exp->exp_lock); - exp->exp_need_sync = 1; - spin_unlock(&exp->exp_lock); - } - rc = fsfilt_write_record(obd, file, med->med_mcd, - sizeof(*med->med_mcd), - &off, rc /* sync if no cb */); - fsfilt_commit(obd, file->f_dentry->d_inode, handle, 0); - } - - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - if (rc) - return rc; - CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n", - med->med_lr_idx, med->med_lr_off, - (unsigned int)sizeof(*med->med_mcd)); - } - return 0; -} - -int mds_client_free(struct obd_export *exp) -{ - struct mds_export_data *med = &exp->exp_mds_data; - struct mds_obd *mds = &exp->exp_obd->u.mds; - struct obd_device *obd = exp->exp_obd; - struct mds_client_data zero_mcd; - struct lvfs_run_ctxt saved; - int rc; - loff_t off; - ENTRY; - - if (!med->med_mcd) - RETURN(0); - - /* XXX if mcd_uuid were a real obd_uuid, I could use obd_uuid_equals */ - if (!strcmp(med->med_mcd->mcd_uuid, obd->obd_uuid.uuid)) - GOTO(free, 0); - - CDEBUG(D_INFO, "freeing client at idx %u, offset %lld with UUID '%s'\n", - med->med_lr_idx, med->med_lr_off, med->med_mcd->mcd_uuid); - - LASSERT(mds->mds_client_bitmap != NULL); - - lprocfs_exp_cleanup(exp); - - off = med->med_lr_off; - - /* Don't clear med_lr_idx here as it is likely also unset. At worst - * we leak a client slot that will be cleaned on the next recovery. */ - if (off <= 0) { - CERROR("%s: client idx %d has offset %lld\n", - obd->obd_name, med->med_lr_idx, off); - GOTO(free, rc = -EINVAL); - } - - /* Clear the bit _after_ zeroing out the client so we don't - race with mds_client_add and zero out new clients.*/ - if (!test_bit(med->med_lr_idx, mds->mds_client_bitmap)) { - CERROR("MDS client %u: bit already clear in bitmap!!\n", - med->med_lr_idx); - LBUG(); - } - - if (!(exp->exp_flags & OBD_OPT_FAILOVER)) { - memset(&zero_mcd, 0, sizeof zero_mcd); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = fsfilt_write_record(obd, mds->mds_rcvd_filp, &zero_mcd, - sizeof(zero_mcd), &off, - (!exp->exp_libclient || - exp->exp_need_sync)); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - CDEBUG(rc == 0 ? D_INFO : D_ERROR, - "zeroing out client %s idx %u in %s rc %d\n", - med->med_mcd->mcd_uuid, med->med_lr_idx, LAST_RCVD, rc); - } - - if (!test_and_clear_bit(med->med_lr_idx, mds->mds_client_bitmap)) { - CERROR("MDS client %u: bit already clear in bitmap!!\n", - med->med_lr_idx); - LBUG(); - } - - - /* Make sure the server's last_transno is up to date. Do this - * after the client is freed so we know all the client's - * transactions have been committed. */ - mds_update_server_data(exp->exp_obd, 0); - - EXIT; - free: - OBD_FREE(med->med_mcd, sizeof(*med->med_mcd)); - med->med_mcd = NULL; - - return 0; -} - -static int mds_server_free_data(struct mds_obd *mds) -{ - OBD_FREE(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8); - OBD_FREE(mds->mds_server_data, sizeof(*mds->mds_server_data)); - mds->mds_server_data = NULL; - - return 0; -} - -static int mds_init_server_data(struct obd_device *obd, struct file *file) -{ - struct mds_obd *mds = &obd->u.mds; - struct lr_server_data *lsd; - struct mds_client_data *mcd = NULL; - loff_t off = 0; - unsigned long last_rcvd_size = i_size_read(file->f_dentry->d_inode); - __u64 mount_count; - int cl_idx, rc = 0; - ENTRY; - - /* ensure padding in the struct is the correct size */ - LASSERT(offsetof(struct lr_server_data, lsd_padding) + - sizeof(lsd->lsd_padding) == LR_SERVER_SIZE); - LASSERT(offsetof(struct mds_client_data, mcd_padding) + - sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE); - - OBD_ALLOC_WAIT(lsd, sizeof(*lsd)); - if (!lsd) - RETURN(-ENOMEM); - - OBD_ALLOC_WAIT(mds->mds_client_bitmap, LR_MAX_CLIENTS / 8); - if (!mds->mds_client_bitmap) { - OBD_FREE(lsd, sizeof(*lsd)); - RETURN(-ENOMEM); - } - - mds->mds_server_data = lsd; - - if (last_rcvd_size == 0) { - LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name); - - memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,sizeof(lsd->lsd_uuid)); - lsd->lsd_last_transno = 0; - mount_count = lsd->lsd_mount_count = 0; - lsd->lsd_server_size = cpu_to_le32(LR_SERVER_SIZE); - lsd->lsd_client_start = cpu_to_le32(LR_CLIENT_START); - lsd->lsd_client_size = cpu_to_le16(LR_CLIENT_SIZE); - lsd->lsd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID); - lsd->lsd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT); - } else { - rc = fsfilt_read_record(obd, file, lsd, sizeof(*lsd), &off); - if (rc) { - CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc); - GOTO(err_msd, rc); - } - if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) { - LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s " - "using the wrong disk %s. Were the " - "/dev/ assignments rearranged?\n", - obd->obd_uuid.uuid, lsd->lsd_uuid); - GOTO(err_msd, rc = -EINVAL); - } - mount_count = le64_to_cpu(lsd->lsd_mount_count); - } - - if (lsd->lsd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) { - CERROR("%s: unsupported incompat filesystem feature(s) %x\n", - obd->obd_name, le32_to_cpu(lsd->lsd_feature_incompat) & - ~MDT_INCOMPAT_SUPP); - GOTO(err_msd, rc = -EINVAL); - } - if (lsd->lsd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) { - CERROR("%s: unsupported read-only filesystem feature(s) %x\n", - obd->obd_name, le32_to_cpu(lsd->lsd_feature_rocompat) & - ~MDT_ROCOMPAT_SUPP); - /* Do something like remount filesystem read-only */ - GOTO(err_msd, rc = -EINVAL); - } - - lsd->lsd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT); - - mds->mds_last_transno = le64_to_cpu(lsd->lsd_last_transno); - - CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n", - obd->obd_name, mds->mds_last_transno); - CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", - obd->obd_name, mount_count + 1); - CDEBUG(D_INODE, "%s: server data size: %u\n", - obd->obd_name, le32_to_cpu(lsd->lsd_server_size)); - CDEBUG(D_INODE, "%s: per-client data start: %u\n", - obd->obd_name, le32_to_cpu(lsd->lsd_client_start)); - CDEBUG(D_INODE, "%s: per-client data size: %u\n", - obd->obd_name, le32_to_cpu(lsd->lsd_client_size)); - CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n", - obd->obd_name, last_rcvd_size); - CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name, - last_rcvd_size <= le32_to_cpu(lsd->lsd_client_start) ? 0 : - (last_rcvd_size - le32_to_cpu(lsd->lsd_client_start)) / - le16_to_cpu(lsd->lsd_client_size)); - - if (!lsd->lsd_server_size || !lsd->lsd_client_start || - !lsd->lsd_client_size) { - CERROR("Bad last_rcvd contents!\n"); - GOTO(err_msd, rc = -EINVAL); - } - - /* When we do a clean MDS shutdown, we save the last_transno into - * the header. If we find clients with higher last_transno values - * then those clients may need recovery done. */ - for (cl_idx = 0, off = le32_to_cpu(lsd->lsd_client_start); - off < last_rcvd_size; cl_idx++) { - __u64 last_transno; - struct obd_export *exp; - struct mds_export_data *med; - - if (!mcd) { - OBD_ALLOC_WAIT(mcd, sizeof(*mcd)); - if (!mcd) - GOTO(err_client, rc = -ENOMEM); - } - - /* Don't assume off is incremented properly by - * fsfilt_read_record(), in case sizeof(*mcd) - * isn't the same as lsd->lsd_client_size. */ - off = le32_to_cpu(lsd->lsd_client_start) + - cl_idx * le16_to_cpu(lsd->lsd_client_size); - rc = fsfilt_read_record(obd, file, mcd, sizeof(*mcd), &off); - if (rc) { - CERROR("error reading MDS %s idx %d, off %llu: rc %d\n", - LAST_RCVD, cl_idx, off, rc); - break; /* read error shouldn't cause startup to fail */ - } - - if (mcd->mcd_uuid[0] == '\0') { - CDEBUG(D_INFO, "skipping zeroed client at offset %d\n", - cl_idx); - continue; - } - - last_transno = le64_to_cpu(mcd->mcd_last_transno) > - le64_to_cpu(mcd->mcd_last_close_transno) ? - le64_to_cpu(mcd->mcd_last_transno) : - le64_to_cpu(mcd->mcd_last_close_transno); - - /* These exports are cleaned up by mds_disconnect(), so they - * need to be set up like real exports as mds_connect() does. - */ - CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64 - " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx, - last_transno, le64_to_cpu(lsd->lsd_last_transno), - le64_to_cpu(mcd->mcd_last_xid)); - - exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid); - if (IS_ERR(exp)) { - if (PTR_ERR(exp) == -EALREADY) { - /* export already exists, zero out this one */ - mcd->mcd_uuid[0] = '\0'; - } else { - GOTO(err_client, rc = PTR_ERR(exp)); - } - } else { - med = &exp->exp_mds_data; - med->med_mcd = mcd; - rc = mds_client_add(obd, exp, cl_idx, NULL); - LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */ - - mcd = NULL; - spin_lock(&exp->exp_lock); - exp->exp_req_replay_needed = 1; - exp->exp_connecting = 0; - spin_unlock(&exp->exp_lock); - obd->obd_max_recoverable_clients++; - class_export_put(exp); - } - /* Need to check last_rcvd even for duplicated exports. */ - CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n", - cl_idx, last_transno); - - if (last_transno > mds->mds_last_transno) - mds->mds_last_transno = last_transno; - } - - if (mcd) - OBD_FREE(mcd, sizeof(*mcd)); - - obd->obd_last_committed = mds->mds_last_transno; - - if (obd->obd_max_recoverable_clients) { - /* shouldn't happen in b_new_cmd */ - LBUG(); - CWARN("RECOVERY: service %s, %d recoverable clients, " - "last_transno "LPU64"\n", obd->obd_name, - obd->obd_max_recoverable_clients, mds->mds_last_transno); - obd->obd_next_recovery_transno = obd->obd_last_committed + 1; - obd->obd_recovering = 1; - obd->obd_recovery_start = cfs_time_current_sec(); - /* Only used for lprocfs_status */ - obd->obd_recovery_end = obd->obd_recovery_start + - OBD_RECOVERY_TIMEOUT; - } - - mds->mds_mount_count = mount_count + 1; - lsd->lsd_mount_count = lsd->lsd_compat14 = - cpu_to_le64(mds->mds_mount_count); - - /* save it, so mount count and last_transno is current */ - rc = mds_update_server_data(obd, 1); - if (rc) - GOTO(err_client, rc); - - RETURN(0); - -err_client: - class_disconnect_exports(obd); -err_msd: - mds_server_free_data(mds); - RETURN(rc); -} - -void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt) -{ - struct mds_obd *mds = &obd->u.mds; - - mds->mds_vfsmnt = mnt; - /* why not mnt->mnt_sb instead of mnt->mnt_root->d_inode->i_sb? */ - obd->u.obt.obt_sb = mnt->mnt_root->d_inode->i_sb; - - fsfilt_setup(obd, obd->u.obt.obt_sb); - - OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt); - obd->obd_lvfs_ctxt.pwdmnt = mnt; - obd->obd_lvfs_ctxt.pwd = mnt->mnt_root; - obd->obd_lvfs_ctxt.fs = get_ds(); - obd->obd_lvfs_ctxt.cb_ops = mds_lvfs_ops; - return; -} - -int mds_fs_setup(struct obd_device *obd, struct vfsmount *mnt) -{ - struct mds_obd *mds = &obd->u.mds; - struct lvfs_run_ctxt saved; - struct dentry *dentry; - struct file *file; - int rc; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP)) - RETURN(-ENOENT); - - rc = cleanup_group_info(); - if (rc) - RETURN(rc); - - mds_init_ctxt(obd, mnt); - - /* setup the directory tree */ - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - dentry = simple_mkdir(current->fs->pwd, "ROOT", 0755, 0); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot create ROOT directory: rc = %d\n", rc); - GOTO(err_pop, rc); - } - - mds->mds_rootfid.id = dentry->d_inode->i_ino; - mds->mds_rootfid.generation = dentry->d_inode->i_generation; - mds->mds_rootfid.f_type = S_IFDIR; - - dput(dentry); - - dentry = lookup_one_len("__iopen__", current->fs->pwd, - strlen("__iopen__")); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc); - GOTO(err_pop, rc); - } - - mds->mds_fid_de = dentry; - if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) { - rc = -ENOENT; - CERROR("__iopen__ directory has no inode? rc = %d\n", rc); - GOTO(err_fid, rc); - } - - dentry = simple_mkdir(current->fs->pwd, "PENDING", 0777, 1); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot create PENDING directory: rc = %d\n", rc); - GOTO(err_fid, rc); - } - mds->mds_pending_dir = dentry; - - /* COMPAT_146 */ - dentry = simple_mkdir(current->fs->pwd, MDT_LOGS_DIR, 0777, 1); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot create %s directory: rc = %d\n", - MDT_LOGS_DIR, rc); - GOTO(err_pending, rc); - } - mds->mds_logs_dir = dentry; - /* end COMPAT_146 */ - - dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1); - if (IS_ERR(dentry)) { - rc = PTR_ERR(dentry); - CERROR("cannot create OBJECTS directory: rc = %d\n", rc); - GOTO(err_logs, rc); - } - mds->mds_objects_dir = dentry; - - /* open and test the last rcvd file */ - file = filp_open(LAST_RCVD, O_RDWR | O_CREAT, 0644); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("cannot open/create %s file: rc = %d\n", LAST_RCVD, rc); - GOTO(err_objects, rc = PTR_ERR(file)); - } - mds->mds_rcvd_filp = file; - if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", LAST_RCVD, - file->f_dentry->d_inode->i_mode); - GOTO(err_last_rcvd, rc = -ENOENT); - } - - rc = mds_init_server_data(obd, file); - if (rc) { - CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc); - GOTO(err_last_rcvd, rc); - } - - /* open and test the lov objd file */ - file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc); - GOTO(err_client, rc = PTR_ERR(file)); - } - mds->mds_lov_objid_filp = file; - if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID, - file->f_dentry->d_inode->i_mode); - GOTO(err_lov_objid, rc = -ENOENT); - } - - /* open and test the check io file junk */ - file = filp_open(HEALTH_CHECK, O_RDWR | O_CREAT, 0644); - if (IS_ERR(file)) { - rc = PTR_ERR(file); - CERROR("cannot open/create %s file: rc = %d\n", HEALTH_CHECK, - rc); - GOTO(err_lov_objid, rc = PTR_ERR(file)); - } - mds->mds_health_check_filp = file; - if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", HEALTH_CHECK, - file->f_dentry->d_inode->i_mode); - GOTO(err_health_check, rc = -ENOENT); - } - rc = lvfs_check_io_health(obd, file); - if (rc) - GOTO(err_health_check, rc); -err_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - return rc; - -err_health_check: - if (mds->mds_health_check_filp && - filp_close(mds->mds_health_check_filp, 0)) - CERROR("can't close %s after error\n", HEALTH_CHECK); -err_lov_objid: - if (mds->mds_lov_objid_filp && - filp_close((struct file *)mds->mds_lov_objid_filp, 0)) - CERROR("can't close %s after error\n", LOV_OBJID); -err_client: - class_disconnect_exports(obd); -err_last_rcvd: - if (mds->mds_rcvd_filp && filp_close(mds->mds_rcvd_filp, 0)) - CERROR("can't close %s after error\n", LAST_RCVD); -err_objects: - dput(mds->mds_objects_dir); -err_logs: - dput(mds->mds_logs_dir); -err_pending: - dput(mds->mds_pending_dir); -err_fid: - dput(mds->mds_fid_de); - goto err_pop; -} - -int mds_fs_cleanup(struct obd_device *obd) -{ - struct mds_obd *mds = &obd->u.mds; - struct lvfs_run_ctxt saved; - int rc = 0; - - if (obd->obd_fail) - LCONSOLE_WARN("%s: shutting down for failover; client state " - "will be preserved.\n", obd->obd_name); - - class_disconnect_exports(obd); /* cleans up client info too */ - mds_server_free_data(mds); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - if (mds->mds_rcvd_filp) { - rc = filp_close(mds->mds_rcvd_filp, 0); - mds->mds_rcvd_filp = NULL; - if (rc) - CERROR("%s file won't close, rc=%d\n", LAST_RCVD, rc); - } - if (mds->mds_lov_objid_filp) { - rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0); - mds->mds_lov_objid_filp = NULL; - if (rc) - CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc); - } - if (mds->mds_health_check_filp) { - rc = filp_close(mds->mds_health_check_filp, 0); - mds->mds_health_check_filp = NULL; - if (rc) - CERROR("%s file won't close, rc=%d\n", HEALTH_CHECK, - rc); - } - if (mds->mds_objects_dir != NULL) { - l_dput(mds->mds_objects_dir); - mds->mds_objects_dir = NULL; - } - if (mds->mds_logs_dir) { - l_dput(mds->mds_logs_dir); - mds->mds_logs_dir = NULL; - } - if (mds->mds_pending_dir) { - l_dput(mds->mds_pending_dir); - mds->mds_pending_dir = NULL; - } - - lquota_fs_cleanup(mds_quota_interface_ref, obd); - - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - shrink_dcache_parent(mds->mds_fid_de); - dput(mds->mds_fid_de); - LL_DQUOT_OFF(obd->u.obt.obt_sb); - - return rc; -} - /* Creates an object with the same name as its fid. Because this is not at all * performance sensitive, it is accomplished by creating a file, checking the * fid, and renaming it. */ diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index f480a75..03d4888 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -8,171 +8,6 @@ #include #include -#define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID) -#define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR) - -/* Data stored per client in the last_rcvd file. In le32 order. */ -struct mds_client_data { - __u8 mcd_uuid[40]; /* client UUID */ - __u64 mcd_last_transno; /* last completed transaction ID */ - __u64 mcd_last_xid; /* xid for the last transaction */ - __u32 mcd_last_result; /* result from last RPC */ - __u32 mcd_last_data; /* per-op data (disposition for open &c.) */ - /* for MDS_CLOSE requests */ - __u64 mcd_last_close_transno; /* last completed transaction ID */ - __u64 mcd_last_close_xid; /* xid for the last transaction */ - __u32 mcd_last_close_result; /* result from last RPC */ - __u32 mcd_last_close_data; /* per-op data (disposition for open &c.) */ - __u8 mcd_padding[LR_CLIENT_SIZE - 88]; -}; - -#define MDS_SERVICE_WATCHDOG_TIMEOUT (obd_timeout * 1000) - -#define MAX_ATIME_DIFF 60 - -struct mds_filter_data { - __u64 io_epoch; -}; - -#define MDS_FILTERDATA(inode) ((struct mds_filter_data *)(inode)->i_filterdata) - -static inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req) -{ - return &req->rq_export->exp_obd->u.mds; -} - -#ifdef __KERNEL__ -/* Open counts for files. No longer atomic, must hold inode->i_sem */ -# define mds_inode_oatomic(inode) ((inode)->i_cindex) - -#ifdef HAVE_I_ALLOC_SEM -#define MDS_UP_READ_ORPHAN_SEM(i) UP_READ_I_ALLOC_SEM(i) -#define MDS_DOWN_READ_ORPHAN_SEM(i) DOWN_READ_I_ALLOC_SEM(i) -#define LASSERT_MDS_ORPHAN_READ_LOCKED(i) LASSERT_I_ALLOC_SEM_READ_LOCKED(i) - -#define MDS_UP_WRITE_ORPHAN_SEM(i) UP_WRITE_I_ALLOC_SEM(i) -#define MDS_DOWN_WRITE_ORPHAN_SEM(i) DOWN_WRITE_I_ALLOC_SEM(i) -#define LASSERT_MDS_ORPHAN_WRITE_LOCKED(i) LASSERT_I_ALLOC_SEM_WRITE_LOCKED(i) -#define MDS_PACK_MD_LOCK 1 -#else -#define MDS_UP_READ_ORPHAN_SEM(i) do { up(&(i)->i_sem); } while (0) -#define MDS_DOWN_READ_ORPHAN_SEM(i) do { down(&(i)->i_sem); } while (0) -#define LASSERT_MDS_ORPHAN_READ_LOCKED(i) LASSERT(down_trylock(&(i)->i_sem)!=0) - -#define MDS_UP_WRITE_ORPHAN_SEM(i) do { up(&(i)->i_sem); } while (0) -#define MDS_DOWN_WRITE_ORPHAN_SEM(i) do { down(&(i)->i_sem); } while (0) -#define LASSERT_MDS_ORPHAN_WRITE_LOCKED(i) LASSERT(down_trylock(&(i)->i_sem)!=0) -#define MDS_PACK_MD_LOCK 0 -#endif - -static inline int mds_orphan_open_count(struct inode *inode) -{ - LASSERT_MDS_ORPHAN_READ_LOCKED(inode); - return mds_inode_oatomic(inode); -} - -static inline int mds_orphan_open_inc(struct inode *inode) -{ - LASSERT_MDS_ORPHAN_WRITE_LOCKED(inode); - return ++mds_inode_oatomic(inode); -} - -static inline int mds_orphan_open_dec_test(struct inode *inode) -{ - LASSERT_MDS_ORPHAN_WRITE_LOCKED(inode); - return --mds_inode_oatomic(inode) == 0; -} - -#define mds_inode_is_orphan(inode) ((inode)->i_flags & 0x4000000) - -static inline void mds_inode_set_orphan(struct inode *inode) -{ - inode->i_flags |= 0x4000000; - CDEBUG(D_VFSTRACE, "setting orphan flag on inode %p\n", inode); -} - -static inline void mds_inode_unset_orphan(struct inode *inode) -{ - inode->i_flags &= ~(0x4000000); - CDEBUG(D_VFSTRACE, "removing orphan flag from inode %p\n", inode); -} - -#endif /* __KERNEL__ */ - -#define MDS_CHECK_RESENT(req, reconstruct) \ -{ \ - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) { \ - struct mds_client_data *mcd = \ - req->rq_export->exp_mds_data.med_mcd; \ - if (le64_to_cpu(mcd->mcd_last_xid) == req->rq_xid) { \ - reconstruct; \ - RETURN(le32_to_cpu(mcd->mcd_last_result)); \ - } \ - if (le64_to_cpu(mcd->mcd_last_close_xid) == req->rq_xid) { \ - reconstruct; \ - RETURN(le32_to_cpu(mcd->mcd_last_close_result)); \ - } \ - DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",\ - mcd->mcd_last_xid); \ - } \ -} - -/* mds/mds_reint.c */ -int res_gt(const struct ldlm_res_id *res1, const struct ldlm_res_id *res2, - ldlm_policy_data_t *p1, ldlm_policy_data_t *p2); -int enqueue_ordered_locks(struct obd_device *obd, - const struct ldlm_res_id *p1_res_id, - struct lustre_handle *p1_lockh, int p1_lock_mode, - ldlm_policy_data_t *p1_policy, - const struct ldlm_res_id *p2_res_id, - struct lustre_handle *p2_lockh, int p2_lock_mode, - ldlm_policy_data_t *p2_policy); -void mds_commit_cb(struct obd_device *, __u64 last_rcvd, void *data, int error); -int mds_finish_transno(struct mds_obd *mds, struct inode *inode, void *handle, - struct ptlrpc_request *req, int rc, __u32 op_data, - int force_sync); -void mds_reconstruct_generic(struct ptlrpc_request *req); -void mds_req_from_mcd(struct ptlrpc_request *req, struct mds_client_data *mcd); -int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, - struct ll_fid *fid, - struct lustre_handle *parent_lockh, - struct dentry **dparentp, int parent_mode, - __u64 parent_lockpart, - char *name, int namelen, - struct lustre_handle *child_lockh, - struct dentry **dchildp, int child_mode, - __u64 child_lockpart); -int mds_lock_new_child(struct obd_device *obd, struct inode *inode, - struct lustre_handle *child_lockh); - -int mds_get_parents_children_locked(struct obd_device *obd, - struct mds_obd *mds, - struct ll_fid *p1_fid, - struct dentry **de_srcdirp, - struct ll_fid *p2_fid, - struct dentry **de_tgtdirp, - int parent_mode, - const char *old_name, int old_len, - struct dentry **de_oldp, - const char *new_name, int new_len, - struct dentry **de_newp, - struct lustre_handle *dlm_handles, - int child_mode); - -void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req, - struct mds_body *body, int md_off); -int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm); -/* mds/mds_lib.c */ -int mds_update_unpack(struct ptlrpc_request *, int offset, - struct mds_update_record *); -int mds_init_ucred(struct lvfs_ucred *ucred, struct ptlrpc_request *req, - int offset); -void mds_exit_ucred(struct lvfs_ucred *ucred, struct mds_obd *obd); - -/* mds/mds_unlink_open.c */ -int mds_osc_destroy_orphan(struct obd_device *obd, umode_t mode, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int log_unlink); int mds_cleanup_pending(struct obd_device *obd); @@ -185,7 +20,6 @@ int mds_llog_finish(struct obd_device *obd, int count); /* mds/mds_lov.c */ int mds_lov_connect(struct obd_device *obd, char * lov_name); int mds_lov_disconnect(struct obd_device *obd); -int mds_lov_write_objids(struct obd_device *obd); int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid); void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm); @@ -201,60 +35,17 @@ int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode, struct lov_mds_md *lmm, int lmm_size); int mds_init_lov_desc(struct obd_device *obd, struct obd_export *osc_exp); -/* mds/mds_open.c */ -int mds_query_write_access(struct inode *inode); -int mds_open(struct mds_update_record *rec, int offset, - struct ptlrpc_request *req, struct lustre_handle *); -int mds_pin(struct ptlrpc_request *req, int offset); -void mds_mfd_unlink(struct mds_file_data *mfd, int decref); -int mds_mfd_close(struct ptlrpc_request *req, int offset,struct obd_device *obd, - struct mds_file_data *mfd, int unlink_orphan, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size, - __u64 *valid); -int mds_close(struct ptlrpc_request *req, int offset); -int mds_done_writing(struct ptlrpc_request *req, int offset); - -/*mds/mds_join.c*/ -int mds_join_file(struct mds_update_record *rec, struct ptlrpc_request *req, - struct dentry *dchild, struct lustre_handle *lockh); - -/* mds/mds_fs.c */ -int mds_client_add(struct obd_device *obd, struct obd_export *exp, - int cl_off, void *localdata); -int mds_client_free(struct obd_export *exp); int mds_obd_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti); int mds_obd_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti, struct obd_export *md_exp); -void mds_init_ctxt(struct obd_device *obd, struct vfsmount *mnt); /* mds/handler.c */ extern struct lvfs_callback_ops mds_lvfs_ops; -extern int mds_iocontrol(unsigned int cmd, struct obd_export *exp, - int len, void *karg, void *uarg); -int mds_postrecov(struct obd_device *obd); -int mds_init_export(struct obd_export *exp); -#ifdef __KERNEL__ -int mds_get_md(struct obd_device *, struct inode *, void *md, int *size, - int lock); -int mds_pack_md(struct obd_device *, struct lustre_msg *, int offset, - struct mds_body *, struct inode *, int lock); -void mds_pack_inode2fid(struct ll_fid *fid, struct inode *inode); -void mds_pack_inode2body(struct mds_body *body, struct inode *inode); -#endif -int mds_pack_acl(struct mds_export_data *med, struct inode *inode, - struct lustre_msg *repmsg, struct mds_body *repbody, - int repoff); - /* quota stuff */ extern quota_interface_t *mds_quota_interface_ref; -/* mds/mds_xattr.c */ -int mds_setxattr(struct ptlrpc_request *req); -int mds_getxattr(struct ptlrpc_request *req); - /* mds/lproc_mds.c */ enum { LPROC_MDS_OPEN = 0, diff --git a/lustre/mds/mds_log.c b/lustre/mds/mds_log.c index 4f3940b..e1488d2 100644 --- a/lustre/mds/mds_log.c +++ b/lustre/mds/mds_log.c @@ -93,90 +93,6 @@ static int mds_llog_repl_cancel(struct llog_ctxt *ctxt, struct lov_stripe_md *ls RETURN(rc); } -int mds_log_op_unlink(struct obd_device *obd, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size) -{ - struct mds_obd *mds = &obd->u.mds; - struct lov_stripe_md *lsm = NULL; - struct llog_unlink_rec *lur; - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - if (IS_ERR(mds->mds_osc_obd)) - RETURN(PTR_ERR(mds->mds_osc_obd)); - - rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size); - if (rc < 0) - RETURN(rc); - rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm); - if (rc) - GOTO(out, rc); - /* first prepare unlink log record */ - OBD_ALLOC(lur, sizeof(*lur)); - if (!lur) - GOTO(out, rc = -ENOMEM); - lur->lur_hdr.lrh_len = lur->lur_tail.lrt_len = sizeof(*lur); - lur->lur_hdr.lrh_type = MDS_UNLINK_REC; - - ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - rc = llog_add(ctxt, &lur->lur_hdr, lsm, logcookies, - cookies_size / sizeof(struct llog_cookie)); - llog_ctxt_put(ctxt); - - OBD_FREE(lur, sizeof(*lur)); -out: - obd_free_memmd(mds->mds_osc_exp, &lsm); - RETURN(rc); -} -EXPORT_SYMBOL(mds_log_op_unlink); -int mds_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid, - struct lov_mds_md *lmm, int lmm_size, - struct llog_cookie *logcookies, int cookies_size) -{ - struct mds_obd *mds = &obd->u.mds; - struct lov_stripe_md *lsm = NULL; - struct llog_setattr_rec *lsr; - struct llog_ctxt *ctxt; - int rc; - ENTRY; - - if (IS_ERR(mds->mds_osc_obd)) - RETURN(PTR_ERR(mds->mds_osc_obd)); - - rc = obd_unpackmd(mds->mds_osc_exp, &lsm, lmm, lmm_size); - if (rc < 0) - RETURN(rc); - - rc = obd_checkmd(mds->mds_osc_exp, obd->obd_self_export, lsm); - if (rc) - GOTO(out, rc); - - OBD_ALLOC(lsr, sizeof(*lsr)); - if (!lsr) - GOTO(out, rc = -ENOMEM); - - /* prepare setattr log record */ - lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr); - lsr->lsr_hdr.lrh_type = MDS_SETATTR_REC; - lsr->lsr_uid = uid; - lsr->lsr_gid = gid; - - /* write setattr log */ - ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); - rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies, - cookies_size / sizeof(struct llog_cookie)); - - llog_ctxt_put(ctxt); - - OBD_FREE(lsr, sizeof(*lsr)); - out: - obd_free_memmd(mds->mds_osc_exp, &lsm); - RETURN(rc); -} -EXPORT_SYMBOL(mds_log_op_setattr); - static struct llog_operations mds_ost_orig_logops = { lop_add: mds_llog_origin_add, lop_connect: mds_llog_origin_connect, diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 91dcd33..d4ec371 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -26,9 +26,6 @@ * license text for more details. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDS #include @@ -120,7 +117,6 @@ err_free_bitmap: RETURN(rc); } -EXPORT_SYMBOL(mds_lov_init_objids); void mds_lov_destroy_objids(struct obd_device *obd) { @@ -148,35 +144,6 @@ void mds_lov_destroy_objids(struct obd_device *obd) FREE_BITMAP(mds->mds_lov_page_dirty); EXIT; } -EXPORT_SYMBOL(mds_lov_destroy_objids); - -void mds_lov_update_objids(struct obd_device *obd, struct lov_mds_md *lmm) -{ - struct mds_obd *mds = &obd->u.mds; - int j; - ENTRY; - - /* if we create file without objects - lmm is NULL */ - if (lmm == NULL) - return; - - for (j = 0; j < le32_to_cpu(lmm->lmm_stripe_count); j++) { - int i = le32_to_cpu(lmm->lmm_objects[j].l_ost_idx); - obd_id id = le64_to_cpu(lmm->lmm_objects[j].l_object_id); - int page = i / OBJID_PER_PAGE(); - int idx = i % OBJID_PER_PAGE(); - obd_id *data = mds->mds_lov_page_array[page]; - - CDEBUG(D_INODE,"update last object for ost %d - new %llu" - " old %llu\n", i, id, data[idx]); - if (id > data[idx]) { - data[idx] = id; - cfs_bitmap_set(mds->mds_lov_page_dirty, page); - } - } - EXIT; -} -EXPORT_SYMBOL(mds_lov_update_objids); static int mds_lov_read_objids(struct obd_device *obd) { @@ -615,214 +582,6 @@ int mds_lov_disconnect(struct obd_device *obd) RETURN(rc); } -int mds_iocontrol(unsigned int cmd, struct obd_export *exp, int len, - void *karg, void *uarg) -{ - static struct obd_uuid cfg_uuid = { .uuid = "config_uuid" }; - struct obd_device *obd = exp->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct obd_ioctl_data *data = karg; - struct lvfs_run_ctxt saved; - int rc = 0; - - ENTRY; - CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd); - - switch (cmd) { - case OBD_IOC_RECORD: { - char *name = data->ioc_inlbuf1; - struct llog_ctxt *ctxt; - - if (mds->mds_cfg_llh) - RETURN(-EBUSY); - - ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name); - llog_ctxt_put(ctxt); - if (rc == 0) - llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN, - &cfg_uuid); - else - mds->mds_cfg_llh = NULL; - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - RETURN(rc); - } - - case OBD_IOC_ENDRECORD: { - if (!mds->mds_cfg_llh) - RETURN(-EBADF); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = llog_close(mds->mds_cfg_llh); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - mds->mds_cfg_llh = NULL; - RETURN(rc); - } - - case OBD_IOC_CLEAR_LOG: { - char *name = data->ioc_inlbuf1; - struct llog_ctxt *ctxt; - if (mds->mds_cfg_llh) - RETURN(-EBUSY); - - ctxt = llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = llog_create(ctxt, &mds->mds_cfg_llh, NULL, name); - llog_ctxt_put(ctxt); - if (rc == 0) { - llog_init_handle(mds->mds_cfg_llh, LLOG_F_IS_PLAIN, - NULL); - - rc = llog_destroy(mds->mds_cfg_llh); - llog_free_handle(mds->mds_cfg_llh); - } - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - mds->mds_cfg_llh = NULL; - RETURN(rc); - } - - case OBD_IOC_DORECORD: { - char *cfg_buf; - struct llog_rec_hdr rec; - if (!mds->mds_cfg_llh) - RETURN(-EBADF); - - rec.lrh_len = llog_data_len(data->ioc_plen1); - - if (data->ioc_type == LUSTRE_CFG_TYPE) { - rec.lrh_type = OBD_CFG_REC; - } else { - CERROR("unknown cfg record type:%d \n", data->ioc_type); - RETURN(-EINVAL); - } - - OBD_ALLOC(cfg_buf, data->ioc_plen1); - if (cfg_buf == NULL) - RETURN(-EINVAL); - rc = copy_from_user(cfg_buf, data->ioc_pbuf1, data->ioc_plen1); - if (rc) { - OBD_FREE(cfg_buf, data->ioc_plen1); - RETURN(rc); - } - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = llog_write_rec(mds->mds_cfg_llh, &rec, NULL, 0, - cfg_buf, -1); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - OBD_FREE(cfg_buf, data->ioc_plen1); - RETURN(rc); - } - - case OBD_IOC_PARSE: { - struct llog_ctxt *ctxt = - llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - llog_ctxt_put(ctxt); - if (rc) - RETURN(rc); - - RETURN(rc); - } - - case OBD_IOC_DUMP_LOG: { - struct llog_ctxt *ctxt = - llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - rc = class_config_dump_llog(ctxt, data->ioc_inlbuf1, NULL); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - llog_ctxt_put(ctxt); - if (rc) - RETURN(rc); - - RETURN(rc); - } - - case OBD_IOC_SYNC: { - CDEBUG(D_INFO, "syncing mds %s\n", obd->obd_name); - rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - RETURN(rc); - } - - case OBD_IOC_SET_READONLY: { - void *handle; - struct inode *inode = obd->u.obt.obt_sb->s_root->d_inode; - BDEVNAME_DECLARE_STORAGE(tmp); - CERROR("*** setting device %s read-only ***\n", - ll_bdevname(obd->u.obt.obt_sb, tmp)); - - handle = fsfilt_start(obd, inode, FSFILT_OP_MKNOD, NULL); - if (!IS_ERR(handle)) - rc = fsfilt_commit(obd, inode, handle, 1); - - CDEBUG(D_HA, "syncing mds %s\n", obd->obd_name); - rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - - lvfs_set_rdonly(obd, obd->u.obt.obt_sb); - RETURN(0); - } - - case OBD_IOC_CATLOGLIST: { - int count = mds->mds_lov_desc.ld_tgt_count; - rc = llog_catalog_list(obd, count, data); - RETURN(rc); - - } - case OBD_IOC_LLOG_CHECK: - case OBD_IOC_LLOG_CANCEL: - case OBD_IOC_LLOG_REMOVE: { - struct llog_ctxt *ctxt = - llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - int rc2; - __u32 group; - - obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count); - push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL); - rc = llog_ioctl(ctxt, cmd, data); - pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL); - llog_cat_initialize(obd, &obd->obd_olg, - mds->mds_lov_desc.ld_tgt_count, NULL); - group = FILTER_GROUP_MDS0 + mds->mds_id; - llog_ctxt_put(ctxt); - rc2 = obd_set_info_async(mds->mds_osc_exp, - strlen(KEY_MDS_CONN), KEY_MDS_CONN, - sizeof(group), &group, NULL); - if (!rc) - rc = rc2; - RETURN(rc); - } - case OBD_IOC_LLOG_INFO: - case OBD_IOC_LLOG_PRINT: { - struct llog_ctxt *ctxt = - llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT); - - push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL); - rc = llog_ioctl(ctxt, cmd, data); - pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_lvfs_ctxt, NULL); - llog_ctxt_put(ctxt); - - RETURN(rc); - } - - case OBD_IOC_ABORT_RECOVERY: - CERROR("aborting recovery for device %s\n", obd->obd_name); - target_stop_recovery_thread(obd); - RETURN(0); - - default: - CDEBUG(D_INFO, "unknown command %x\n", cmd); - RETURN(-EINVAL); - } - RETURN(0); - -} - /* Collect the preconditions we need to allow client connects */ static void mds_allow_cli(struct obd_device *obd, unsigned int flag) { @@ -1091,57 +850,3 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, RETURN(rc); } -/* Convert the on-disk LOV EA structre. - * We always try to convert from an old LOV EA format to the common in-memory - * (lsm) format (obd_unpackmd() understands the old on-disk (lmm) format) and - * then convert back to the new on-disk format and save it back to disk - * (obd_packmd() only ever saves to the new on-disk format) so we don't have - * to convert it each time this inode is accessed. - * - * This function is a bit interesting in the error handling. We can safely - * ship the old lmm to the client in case of failure, since it uses the same - * obd_unpackmd() code and can do the conversion if the MDS fails for some - * reason. We will not delete the old lmm data until we have written the - * new format lmm data in fsfilt_set_md(). */ -int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode, - struct lov_mds_md *lmm, int lmm_size) -{ - struct lov_stripe_md *lsm = NULL; - void *handle; - int rc, err; - ENTRY; - - if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC || - le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_JOIN)) - RETURN(0); - - CDEBUG(D_INODE, "converting LOV EA on %lu/%u from %#08x to %#08x\n", - inode->i_ino, inode->i_generation, le32_to_cpu(lmm->lmm_magic), - LOV_MAGIC); - - rc = obd_unpackmd(obd->u.mds.mds_osc_exp, &lsm, lmm, lmm_size); - if (rc < 0) - GOTO(conv_end, rc); - - rc = obd_packmd(obd->u.mds.mds_osc_exp, &lmm, lsm); - if (rc < 0) - GOTO(conv_free, rc); - lmm_size = rc; - - handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL); - if (IS_ERR(handle)) { - rc = PTR_ERR(handle); - GOTO(conv_free, rc); - } - - rc = fsfilt_set_md(obd, inode, handle, lmm, lmm_size, "lov"); - - err = fsfilt_commit(obd, inode, handle, 0); - if (!rc) - rc = err ? err : lmm_size; - GOTO(conv_free, rc); -conv_free: - obd_free_memmd(obd->u.mds.mds_osc_exp, &lsm); -conv_end: - return rc; -} diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index ebf3ddf..850271a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2223,9 +2223,35 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info) info->mti_env = NULL; } -/* mds/handler.c */ -extern int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process); +static int mdt_filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) +{ + switch (lustre_msg_get_opc(req->rq_reqmsg)) { + case MDS_CONNECT: /* This will never get here, but for completeness. */ + case OST_CONNECT: /* This will never get here, but for completeness. */ + case MDS_DISCONNECT: + case OST_DISCONNECT: + *process = 1; + RETURN(0); + + case MDS_CLOSE: + case MDS_DONE_WRITING: + case MDS_SYNC: /* used in unmounting */ + case OBD_PING: + case MDS_REINT: + case SEQ_QUERY: + case FLD_QUERY: + case LDLM_ENQUEUE: + *process = target_queue_recovery_request(req, obd); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = -EAGAIN; + RETURN(0); + } +} + /* * Handle recovery. Return: * +1: continue request processing; @@ -2303,7 +2329,7 @@ static int mdt_recovery(struct mdt_thread_info *info) int rc; int should_process; DEBUG_REQ(D_INFO, req, "Got new replay"); - rc = mds_filter_recovery_request(req, obd, &should_process); + rc = mdt_filter_recovery_request(req, obd, &should_process); if (rc != 0 || !should_process) RETURN(rc); else if (should_process < 0) { @@ -2330,8 +2356,84 @@ static int mdt_reply(struct ptlrpc_request *req, int rc, RETURN(0); } -/* mds/handler.c */ -extern int mds_msg_check_version(struct lustre_msg *msg); +static int mdt_msg_check_version(struct lustre_msg *msg) +{ + int rc; + + switch (lustre_msg_get_opc(msg)) { + case MDS_CONNECT: + case MDS_DISCONNECT: + case OBD_PING: + case SEC_CTX_INIT: + case SEC_CTX_INIT_CONT: + case SEC_CTX_FINI: + rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_OBD_VERSION); + break; + case MDS_GETSTATUS: + case MDS_GETATTR: + case MDS_GETATTR_NAME: + case MDS_STATFS: + case MDS_READPAGE: + case MDS_WRITEPAGE: + case MDS_IS_SUBDIR: + case MDS_REINT: + case MDS_CLOSE: + case MDS_DONE_WRITING: + case MDS_PIN: + case MDS_SYNC: + case MDS_GETXATTR: + case MDS_SETXATTR: + case MDS_SET_INFO: + case MDS_QUOTACHECK: + case MDS_QUOTACTL: + case QUOTA_DQACQ: + case QUOTA_DQREL: + case SEQ_QUERY: + case FLD_QUERY: + rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_MDS_VERSION); + break; + case LDLM_ENQUEUE: + case LDLM_CONVERT: + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: + rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_DLM_VERSION); + break; + case OBD_LOG_CANCEL: + case LLOG_ORIGIN_HANDLE_CREATE: + case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: + case LLOG_ORIGIN_HANDLE_READ_HEADER: + case LLOG_ORIGIN_HANDLE_CLOSE: + case LLOG_ORIGIN_HANDLE_DESTROY: + case LLOG_ORIGIN_HANDLE_PREV_BLOCK: + case LLOG_CATINFO: + rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + lustre_msg_get_opc(msg), + lustre_msg_get_version(msg), + LUSTRE_LOG_VERSION); + break; + default: + CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg)); + rc = -ENOTSUPP; + } + return rc; +} static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info, @@ -2349,7 +2451,7 @@ static int mdt_handle0(struct ptlrpc_request *req, LASSERT(current->journal_info == NULL); msg = req->rq_reqmsg; - rc = mds_msg_check_version(msg); + rc = mdt_msg_check_version(msg); if (likely(rc == 0)) { rc = mdt_recovery(info); if (likely(rc == +1)) { diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 04376af..ce27e10 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -1000,7 +1000,56 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt) } /* reconstruction code */ -extern void mds_steal_ack_locks(struct ptlrpc_request *req); +static void mdt_steal_ack_locks(struct ptlrpc_request *req) +{ + struct obd_export *exp = req->rq_export; + struct list_head *tmp; + struct ptlrpc_reply_state *oldrep; + struct ptlrpc_service *svc; + int i; + + /* CAVEAT EMPTOR: spinlock order */ + spin_lock(&exp->exp_lock); + list_for_each (tmp, &exp->exp_outstanding_replies) { + oldrep = list_entry(tmp, struct ptlrpc_reply_state,rs_exp_list); + + if (oldrep->rs_xid != req->rq_xid) + continue; + + if (lustre_msg_get_opc(oldrep->rs_msg) != + lustre_msg_get_opc(req->rq_reqmsg)) + CERROR ("Resent req xid "LPX64" has mismatched opc: " + "new %d old %d\n", req->rq_xid, + lustre_msg_get_opc(req->rq_reqmsg), + lustre_msg_get_opc(oldrep->rs_msg)); + + svc = oldrep->rs_service; + spin_lock (&svc->srv_lock); + + list_del_init (&oldrep->rs_exp_list); + + CWARN("Stealing %d locks from rs %p x"LPD64".t"LPD64 + " o%d NID %s\n", + oldrep->rs_nlocks, oldrep, + oldrep->rs_xid, oldrep->rs_transno, + lustre_msg_get_opc(oldrep->rs_msg), + libcfs_nid2str(exp->exp_connection->c_peer.nid)); + + for (i = 0; i < oldrep->rs_nlocks; i++) + ptlrpc_save_lock(req, + &oldrep->rs_locks[i], + oldrep->rs_modes[i]); + oldrep->rs_nlocks = 0; + + DEBUG_REQ(D_HA, req, "stole locks for"); + ptlrpc_schedule_difficult_reply (oldrep); + + spin_unlock (&svc->srv_lock); + break; + } + spin_unlock(&exp->exp_lock); +} + void mdt_req_from_mcd(struct ptlrpc_request *req, struct mdt_client_data *mcd) { @@ -1019,7 +1068,7 @@ void mdt_req_from_mcd(struct ptlrpc_request *req, lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); lustre_msg_set_status(req->rq_repmsg, req->rq_status); } - mds_steal_ack_locks(req); + mdt_steal_ack_locks(req); } void mdt_reconstruct_generic(struct mdt_thread_info *mti,