From 35fde6b733c30f4ea2c6f455442fcf2bd33b5278 Mon Sep 17 00:00:00 2001 From: nikita Date: Wed, 29 Mar 2006 18:52:52 +0000 Subject: [PATCH] mdt prototype changes --- lustre/mdt/mdt.h | 77 ++- lustre/mdt/mdt_handler.c | 1252 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 1054 insertions(+), 275 deletions(-) diff --git a/lustre/mdt/mdt.h b/lustre/mdt/mdt.h index 5d28baa..1c40b73 100644 --- a/lustre/mdt/mdt.h +++ b/lustre/mdt/mdt.h @@ -7,6 +7,16 @@ #if defined(__KERNEL__) +/* + * struct ptlrpc_client + */ +#include +/* + * struct obd_connect_data + * struct lustre_handle + */ +#include + #include #define LUSTRE_MDT0_NAME "mdt0" @@ -24,13 +34,16 @@ struct ptlrpc_service_conf { int psc_num_threads; }; +struct md_object; + struct md_device { struct lu_device md_lu_dev; struct md_device_operations *md_ops; }; struct md_device_operations { - int (*mdo_root_get)(struct md_device *m, struct lfid *f); + int (*mdo_root_get)(struct md_device *m, struct ll_fid *f); + int (*mdo_mkdir)(struct md_object *o, const char *name); }; struct mdt_device { @@ -40,19 +53,12 @@ struct mdt_device { struct ptlrpc_service_conf mdt_service_conf; /* DLM name-space for meta-data locks maintained by this server */ struct ldlm_namespace *mdt_namespace; - /* DLM handle for MDS->client connections (for lock ASTs). */ - struct ldlm_client mdt_ldlm_client; + /* ptlrpc handle for MDS->client connections (for lock ASTs). */ + struct ptlrpc_client mdt_ldlm_client; /* underlying device */ - struct md_device *mdt_mdd; + struct md_device *mdt_child; }; -/* - * Meta-data stacking. 
- */ - -struct md_object; -struct md_device; - struct md_object { struct lu_object mo_lu; }; @@ -64,12 +70,16 @@ static inline struct md_object *lu2md(struct lu_object *o) static inline struct md_device *md_device_get(struct md_object *o) { - return container_of(o->mo_lu.lo_dev, struct md_device, md_lu); + return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev); } struct mdt_object { struct lu_object_header mot_header; struct md_object mot_obj; + /* + * lock handle for dlm lock. + */ + struct lustre_handle mot_lh; }; struct mdd_object { @@ -81,8 +91,49 @@ struct osd_object { struct dentry *oo_dentry; }; -int md_device_init(struct md_device *md); +int md_device_init(struct md_device *md, struct lu_device_type *t); void md_device_fini(struct md_device *md); +enum { + MDT_REP_BUF_NR_MAX = 8 +}; + +/* + * Common data shared by mdt-level handlers. This is allocated per-thread to + * reduce stack consumption. + */ +struct mdt_thread_info { + struct mdt_device *mti_mdt; + /* + * number of buffers in reply message. + */ + int mti_rep_buf_nr; + /* + * sizes of reply buffers. + */ + int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; + /* + * Body for "habeo corpus" operations. + */ + struct mds_body *mti_body; + /* + * Host object. This is released at the end of mdt_handler(). + */ + struct mdt_object *mti_object; + /* + * Additional fail id that can be set by handler. Passed to + * target_send_reply(). + */ + int mti_fail_id; + /* + * Offset of incoming buffers. 0 for top-level request processing. +ve + * for intent handling. 
+ */ + int mti_offset; +}; + +int fid_lock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t); +int fid_unlock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t); + #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c387e27..1e20c39 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1,4 +1,3 @@ -#if 0 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * @@ -37,13 +36,266 @@ #define DEBUG_SUBSYSTEM S_MDS #include + +/* + * LUSTRE_VERSION_CODE + */ +#include +/* + * OBD_{ALLOC,FREE}*() + * OBD_FAIL_CHECK + */ +#include + #include #include "mdt.h" -int mdt_num_threads; +/* + * Initialized in mdt_mod_init(). + */ +unsigned long mdt_num_threads; + +static int mdt_getstatus(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + struct md_device *mdd = info->mti_mdt->mdt_child; + struct mds_body *body; + int size = sizeof *body; + int result; + + ENTRY; + + result = lustre_pack_reply(req, 1, &size, NULL); + if (result) + CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n", + size); + else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) + result = -ENOMEM; + else { + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body); + result = mdd->md_ops->mdo_root_get(mdd, &body->fid1); + } + + /* the last_committed and last_xid fields are filled in for all + * replies already - no need to do so here also. 
+ */ + RETURN(result); +} + +/* + * struct obd_device + */ +#include +/* + * class_connect() + */ +#include +/* + * struct obd_export + */ +#include +/* + * struct mds_client_data + */ +#include <../mds/mds_internal.h> +#include +#include +#include +#include +#include +#include +#include + +static int mds_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data); +static int mds_postsetup(struct obd_device *obd); +static int mds_cleanup(struct obd_device *obd); + +/* Assumes caller has already pushed into the kernel filesystem context */ +static int mds_sendpage(struct ptlrpc_request *req, struct file *file, + loff_t offset, int count) +{ + struct ptlrpc_bulk_desc *desc; + struct l_wait_info lwi; + struct page **pages; + int rc = 0, npages, i, tmpcount, tmpsize = 0; + ENTRY; + + LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */ + + npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT; + OBD_ALLOC(pages, sizeof(*pages) * npages); + if (!pages) + GOTO(out, rc = -ENOMEM); + + desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, + MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out_free, rc = -ENOMEM); + + for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { + tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; + + pages[i] = alloc_pages(GFP_KERNEL, 0); + if (pages[i] == NULL) + GOTO(cleanup_buf, rc = -ENOMEM); + + ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize); + } + + for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { + tmpsize = tmpcount > PAGE_SIZE ? 
PAGE_SIZE : tmpcount; + CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", + tmpsize, offset, file->f_dentry->d_inode->i_ino, + file->f_dentry->d_inode->i_size); + + rc = fsfilt_readpage(req->rq_export->exp_obd, file, + kmap(pages[i]), tmpsize, &offset); + kunmap(pages[i]); + + if (rc != tmpsize) + GOTO(cleanup_buf, rc = -EIO); + } + + LASSERT(desc->bd_nob == count); + + rc = ptlrpc_start_bulk_transfer(desc); + if (rc) + GOTO(cleanup_buf, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { + CERROR("obd_fail_loc=%x, fail operation rc=%d\n", + OBD_FAIL_MDS_SENDPAGE, rc); + GOTO(abort_bulk, rc); + } + + lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL); + rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); + LASSERT (rc == 0 || rc == -ETIMEDOUT); + + if (rc == 0) { + if (desc->bd_success && + desc->bd_nob_transferred == count) + GOTO(cleanup_buf, rc); + + rc = -ETIMEDOUT; /* XXX should this be a different errno? */ + } + + DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", + (rc == -ETIMEDOUT) ? 
"timeout" : "network error", + desc->bd_nob_transferred, count, + req->rq_export->exp_client_uuid.uuid, + req->rq_export->exp_connection->c_remote_uuid.uuid); -static int mdt_connect_internal(struct obd_export *exp, + class_fail_export(req->rq_export); + + EXIT; + abort_bulk: + ptlrpc_abort_bulk (desc); + cleanup_buf: + for (i = 0; i < npages; i++) + if (pages[i]) + __free_pages(pages[i], 0); + + ptlrpc_free_bulk(desc); + out_free: + OBD_FREE(pages, sizeof(*pages) * npages); + out: + return rc; +} + +/* only valid locked dentries or errors should be returned */ +struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, + struct vfsmount **mnt, int lock_mode, + struct lustre_handle *lockh, + char *name, int namelen, __u64 lockpart) +{ + struct mds_obd *mds = &obd->u.mds; + struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; + struct ldlm_res_id res_id = { .name = {0} }; + int flags = 0, rc; + ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; + ENTRY; + + if (IS_ERR(de)) + RETURN(de); + + res_id.name[0] = de->d_inode->i_ino; + res_id.name[1] = de->d_inode->i_generation; + rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, + LDLM_IBITS, &policy, lock_mode, &flags, + ldlm_blocking_ast, ldlm_completion_ast, + NULL, NULL, NULL, 0, NULL, lockh); + if (rc != ELDLM_OK) { + l_dput(de); + retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ + } + + RETURN(retval); +} + +/* Look up an entry by inode number. 
*/ +/* this function ONLY returns valid dget'd dentries with an initialized inode + or errors */ +struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, + struct vfsmount **mnt) +{ + char fid_name[32]; + unsigned long ino = fid->id; + __u32 generation = fid->generation; + struct inode *inode; + struct dentry *result; + + if (ino == 0) + RETURN(ERR_PTR(-ESTALE)); + + snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); + + CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n", + ino, generation, mds->mds_obt.obt_sb); + + /* under ext3 this is neither supposed to return bad inodes + nor NULL inodes. */ + result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); + if (IS_ERR(result)) + RETURN(result); + + inode = result->d_inode; + if (!inode) + RETURN(ERR_PTR(-ENOENT)); + + if (inode->i_generation == 0 || inode->i_nlink == 0) { + LCONSOLE_WARN("Found inode with zero generation or link -- this" + " may indicate disk corruption (inode: %lu, link:" + " %lu, count: %d)\n", inode->i_ino, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count)); + dput(result); + RETURN(ERR_PTR(-ENOENT)); + } + + if (generation && inode->i_generation != generation) { + /* we didn't find the right inode.. 
*/ + CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " + "count: %d, generation %u/%u\n", inode->i_ino, + (unsigned long)inode->i_nlink, + atomic_read(&inode->i_count), inode->i_generation, + generation); + dput(result); + RETURN(ERR_PTR(-ENOENT)); + } + + if (mnt) { + *mnt = mds->mds_vfsmnt; + mntget(*mnt); + } + + RETURN(result); +} + +static int mds_connect_internal(struct obd_export *exp, struct obd_connect_data *data) { struct obd_device *obd = exp->exp_obd; @@ -58,18 +310,18 @@ static int mdt_connect_internal(struct obd_export *exp, data->ocd_connect_flags & OBD_CONNECT_IBITS) data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; - if (!obd->u.mds.mdt_fl_acl) + if (!obd->u.mds.mds_fl_acl) data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - if (!obd->u.mds.mdt_fl_user_xattr) + if (!obd->u.mds.mds_fl_user_xattr) data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; exp->exp_connect_flags = data->ocd_connect_flags; data->ocd_version = LUSTRE_VERSION_CODE; - exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known; + exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; } - if (obd->u.mds.mdt_fl_acl && + if (obd->u.mds.mds_fl_acl && ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { CWARN("%s: MDS requires ACL support but client does not\n", obd->obd_name); @@ -78,7 +330,7 @@ static int mdt_connect_internal(struct obd_export *exp, return 0; } -static int mdt_reconnect(struct obd_export *exp, struct obd_device *obd, +static int mds_reconnect(struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data) { @@ -88,7 +340,7 @@ static int mdt_reconnect(struct obd_export *exp, struct obd_device *obd, if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); - rc = mdt_connect_internal(exp, data); + rc = mds_connect_internal(exp, data); RETURN(rc); } @@ -99,12 +351,12 @@ static int mdt_reconnect(struct obd_export *exp, struct obd_device *obd, * about that client, like open files, the last operation number it did * on 
the server, etc. */ -static int mdt_connect(struct lustre_handle *conn, struct obd_device *obd, +static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data) { struct obd_export *exp; - struct mdt_export_data *med; - struct mdt_client_data *mcd = NULL; + struct mds_export_data *med; + struct mds_client_data *mcd = NULL; int rc, abort_recovery; ENTRY; @@ -133,26 +385,26 @@ static int mdt_connect(struct lustre_handle *conn, struct obd_device *obd, RETURN(rc); exp = class_conn2export(conn); LASSERT(exp); - med = &exp->exp_mdt_data; + med = &exp->exp_mds_data; - rc = mdt_connect_internal(exp, data); + rc = mds_connect_internal(exp, data); if (rc) GOTO(out, rc); - OBD_ALLOC_PTR(mcd); + OBD_ALLOC(mcd, sizeof(*mcd)); if (!mcd) GOTO(out, rc = -ENOMEM); memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); med->med_mcd = mcd; - rc = mdt_client_add(obd, &obd->u.mds, med, -1); + rc = mds_client_add(obd, &obd->u.mds, med, -1); GOTO(out, rc); out: if (rc) { if (mcd) { - OBD_FREE_PTR(mcd); + OBD_FREE(mcd, sizeof(*mcd)); med->med_mcd = NULL; } class_disconnect(exp); @@ -163,42 +415,41 @@ out: RETURN(rc); } -int mdt_init_export(struct obd_export *exp) +static int mds_init_export(struct obd_export *exp) { - struct mdt_export_data *med = &exp->exp_mdt_data; + struct mds_export_data *med = &exp->exp_mds_data; INIT_LIST_HEAD(&med->med_open_head); spin_lock_init(&med->med_open_lock); - exp->exp_connecting = 1; RETURN(0); } -static int mdt_destroy_export(struct obd_export *export) +static int mds_destroy_export(struct obd_export *export) { - struct mdt_export_data *med; + struct mds_export_data *med; struct obd_device *obd = export->exp_obd; struct lvfs_run_ctxt saved; int rc = 0; ENTRY; - med = &export->exp_mdt_data; + med = &export->exp_mds_data; target_destroy_export(export); if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) - RETURN(0); + GOTO(out, 0); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); /* 
Close any open files (which may also cause orphan unlinking). */ spin_lock(&med->med_open_lock); while (!list_empty(&med->med_open_head)) { struct list_head *tmp = med->med_open_head.next; - struct mdt_file_data *mfd = - list_entry(tmp, struct mdt_file_data, mfd_list); + struct mds_file_data *mfd = + list_entry(tmp, struct mds_file_data, mfd_list); struct dentry *dentry = mfd->mfd_dentry; /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ - mdt_mfd_unlink(mfd, 0); + mds_mfd_unlink(mfd, 0); spin_unlock(&med->med_open_lock); /* If you change this message, be sure to update @@ -207,9 +458,9 @@ static int mdt_destroy_export(struct obd_export *export) "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len, dentry->d_name.name, dentry->d_inode->i_ino); /* child orphan sem protects orphan_dec_test and - * is_orphan race, mdt_mfd_close drops it */ - MDT_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode); - rc = mdt_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd, + * is_orphan race, mds_mfd_close drops it */ + MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode); + rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd, !(export->exp_flags & OBD_OPT_FAILOVER)); if (rc) @@ -218,12 +469,13 @@ static int mdt_destroy_export(struct obd_export *export) } spin_unlock(&med->med_open_lock); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - mdt_client_free(export); +out: + mds_client_free(export); RETURN(rc); } -static int mdt_disconnect(struct obd_export *exp) +static int mds_disconnect(struct obd_export *exp) { unsigned long irqflags; int rc; @@ -255,34 +507,131 @@ static int mdt_disconnect(struct obd_export *exp) RETURN(rc); } -static int mdt_getstatus(struct mdt_thread_info *info, - struct ptlrpc_request *req) +int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, + int *size, int lock) { - struct md_device *mdd = info->mti_mdt->mdt_mdd; - int size = sizeof *body; - struct mds_body *body; - int result; + int rc = 0; + int lmm_size; + if (lock) + 
down(&inode->i_sem); + rc = fsfilt_get_md(obd, inode, md, *size, "lov"); + + if (rc < 0) { + CERROR("Error %d reading eadata for ino %lu\n", + rc, inode->i_ino); + } else if (rc > 0) { + lmm_size = rc; + rc = mds_convert_lov_ea(obd, inode, md, lmm_size); + + if (rc == 0) { + *size = lmm_size; + rc = lmm_size; + } else if (rc > 0) { + *size = rc; + } + } else { + *size = 0; + } + if (lock) + up(&inode->i_sem); + + RETURN (rc); +} + + +/* Call with lock=1 if you want mds_pack_md to take the i_sem. + * Call with lock=0 if the caller has already taken the i_sem. */ +int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, + struct mds_body *body, struct inode *inode, int lock) +{ + struct mds_obd *mds = &obd->u.mds; + void *lmm; + int lmm_size; + int rc; ENTRY; - result = lustre_pack_reply(req, 1, &size, NULL); - if (result) - CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n", - size); - else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) - result = -ENOMEM; - else { - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body); - result = mdd->md_ops->mdo_root_get(mdd, &body->fid1); - } + lmm = lustre_msg_buf(msg, offset, 0); + if (lmm == NULL) { + /* Some problem with getting eadata when I sized the reply + * buffer... */ + CDEBUG(D_INFO, "no space reserved for inode %lu MD\n", + inode->i_ino); + RETURN(0); + } + lmm_size = msg->buflens[offset]; - /* the last_committed and last_xid fields are filled in for all - * replies already - no need to do so here also. + /* I don't really like this, but it is a sanity check on the client + * MD request. However, if the client doesn't know how much space + * to reserve for the MD, it shouldn't be bad to have too much space. 
*/ - RETURN(result); + if (lmm_size > mds->mds_max_mdsize) { + CWARN("Reading MD for inode %lu of %d bytes > max %d\n", + inode->i_ino, lmm_size, mds->mds_max_mdsize); + // RETURN(-EINVAL); + } + + rc = mds_get_md(obd, inode, lmm, &lmm_size, lock); + if (rc > 0) { + if (S_ISDIR(inode->i_mode)) + body->valid |= OBD_MD_FLDIREA; + else + body->valid |= OBD_MD_FLEASIZE; + body->eadatasize = lmm_size; + rc = 0; + } + + RETURN(rc); } -static int mdt_getattr_internal(struct obd_device *obd, struct dentry *dentry, +#ifdef CONFIG_FS_POSIX_ACL +static +int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg, + struct mds_body *repbody, int repoff) +{ + struct dentry de = { .d_inode = inode }; + int buflen, rc; + ENTRY; + + LASSERT(repbody->aclsize == 0); + LASSERT(repmsg->bufcount > repoff); + + buflen = lustre_msg_buflen(repmsg, repoff); + if (!buflen) + GOTO(out, 0); + + if (!inode->i_op || !inode->i_op->getxattr) + GOTO(out, 0); + + lock_24kernel(); + rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, + lustre_msg_buf(repmsg, repoff, buflen), + buflen); + unlock_24kernel(); + + if (rc >= 0) + repbody->aclsize = rc; + else if (rc != -ENODATA) { + CERROR("buflen %d, get acl: %d\n", buflen, rc); + RETURN(rc); + } + EXIT; +out: + repbody->valid |= OBD_MD_FLACL; + return 0; +} +#else +#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0 +#endif + +int mds_pack_acl(struct mds_export_data *med, struct inode *inode, + struct lustre_msg *repmsg, struct mds_body *repbody, + int repoff) +{ + return mds_pack_posix_acl(inode, repmsg, repbody, repoff); +} + +static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, struct ptlrpc_request *req, struct mds_body *reqbody, int reply_off) { @@ -297,13 +646,13 @@ static int mdt_getattr_internal(struct obd_device *obd, struct dentry *dentry, body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); LASSERT(body != NULL); /* caller prepped reply */ - mdt_pack_inode2fid(&body->fid1, inode); - 
mdt_pack_inode2body(body, inode); + mds_pack_inode2fid(&body->fid1, inode); + mds_pack_inode2body(body, inode); reply_off++; if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { - rc = mdt_pack_md(obd, req->rq_repmsg, reply_off, body, + rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body, inode, 1); /* If we have LOV EA data, the OST holds size, atime, mtime */ @@ -341,22 +690,35 @@ static int mdt_getattr_internal(struct obd_device *obd, struct dentry *dentry, } if (reqbody->valid & OBD_MD_FLMODEASIZE) { - struct mdt_obd *mds = mdt_req2mds(req); - body->max_cookiesize = mds->mdt_max_cookiesize; - body->max_mdsize = mds->mdt_max_mdsize; + struct mds_obd *mds = mds_req2mds(req); + body->max_cookiesize = mds->mds_max_cookiesize; + body->max_mdsize = mds->mds_max_mdsize; body->valid |= OBD_MD_FLMODEASIZE; } if (rc) RETURN(rc); +#ifdef CONFIG_FS_POSIX_ACL + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (reqbody->valid & OBD_MD_FLACL)) { + rc = mds_pack_acl(&req->rq_export->exp_mds_data, + inode, req->rq_repmsg, + body, reply_off); + + lustre_shrink_reply(req, reply_off, body->aclsize, 0); + if (body->aclsize) + reply_off++; + } +#endif + RETURN(rc); } -static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, +static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, int offset) { - struct mdt_obd *mds = mdt_req2mds(req); + struct mds_obd *mds = mds_req2mds(req); struct mds_body *body; int rc, size[2] = {sizeof(*body)}, bufcount = 1; ENTRY; @@ -367,10 +729,10 @@ static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) || (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) { - LOCK_INODE_MUTEX(inode); + down(&inode->i_sem); rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0, "lov"); - UNLOCK_INODE_MUTEX(inode); + 
up(&inode->i_sem); CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n", rc, inode->i_ino); if (rc < 0) { @@ -380,10 +742,10 @@ static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, RETURN(rc); } size[bufcount] = 0; - } else if (rc > mds->mdt_max_mdsize) { + } else if (rc > mds->mds_max_mdsize) { size[bufcount] = 0; CERROR("MD size %d larger than maximum possible %u\n", - rc, mds->mdt_max_mdsize); + rc, mds->mds_max_mdsize); } else { size[bufcount] = rc; } @@ -398,8 +760,32 @@ static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, inode->i_size + 1, body->eadatasize); } +#ifdef CONFIG_FS_POSIX_ACL + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (body->valid & OBD_MD_FLACL)) { + struct dentry de = { .d_inode = inode }; + + size[bufcount] = 0; + if (inode->i_op && inode->i_op->getxattr) { + lock_24kernel(); + rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, + NULL, 0); + unlock_24kernel(); + + if (rc < 0) { + if (rc != -ENODATA) { + CERROR("got acl size: %d\n", rc); + RETURN(rc); + } + } else + size[bufcount] = rc; + } + bufcount++; + } +#endif + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("failed MDT_GETATTR_PACK test\n"); + CERROR("failed MDS_GETATTR_PACK test\n"); req->rq_status = -ENOMEM; RETURN(-ENOMEM); } @@ -414,11 +800,11 @@ static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, RETURN(0); } -static int mdt_getattr_name(int offset, struct ptlrpc_request *req, +static int mds_getattr_name(int offset, struct ptlrpc_request *req, int child_part, struct lustre_handle *child_lockh) { struct obd_device *obd = req->rq_export->exp_obd; - struct mdt_obd *mds = &obd->u.mds; + struct mds_obd *mds = &obd->u.mds; struct ldlm_reply *rep = NULL; struct lvfs_run_ctxt saved; struct mds_body *body; @@ -430,14 +816,14 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, char *name; ENTRY; - LASSERT(!strcmp(obd->obd_type->typ_name, 
LUSTRE_MDT_NAME)); + LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); /* Swab now, before anyone looks inside the request */ body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mdt_body); + lustre_swab_mds_body); if (body == NULL) { - CERROR("Can't swab mdt_body\n"); + CERROR("Can't swab mds_body\n"); RETURN(-EFAULT); } @@ -449,7 +835,7 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, } namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); - rc = mdt_init_ucred(&uc, req, offset); + rc = mds_init_ucred(&uc, req, offset); if (rc) GOTO(cleanup, rc); @@ -464,6 +850,29 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, cleanup_phase = 1; /* kernel context */ intent_set_disposition(rep, DISP_LOOKUP_EXECD); + /* FIXME: handle raw lookup */ +#if 0 + if (body->valid == OBD_MD_FLID) { + struct mds_body *mds_reply; + int size = sizeof(*mds_reply); + ino_t inum; + // The user requested ONLY the inode number, so do a raw lookup + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) { + CERROR("out of memory\n"); + GOTO(cleanup, rc); + } + + rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum); + + mds_reply = lustre_msg_buf(req->rq_repmsg, offset, + sizeof(*mds_reply)); + mds_reply->fid1.id = inum; + mds_reply->valid = OBD_MD_FLID; + GOTO(cleanup, rc); + } +#endif + if (lustre_handle_is_used(child_lockh)) { LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); resent_req = 1; @@ -471,7 +880,7 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, if (resent_req == 0) { if (name) { - rc = mdt_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, + rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, &parent_lockh, &dparent, LCK_CR, MDS_INODELOCK_UPDATE, @@ -480,10 +889,10 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, child_part); } else { /* For revalidate by fid we always take UPDATE lock */ - dchild = mdt_fid2locked_dentry(obd, 
&body->fid2, NULL, + dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, LCK_CR, child_lockh, NULL, 0, - MDT_INODELOCK_UPDATE); + MDS_INODELOCK_UPDATE); LASSERT(dchild); if (IS_ERR(dchild)) rc = PTR_ERR(dchild); @@ -504,7 +913,7 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, res = granted_lock->l_resource; child_fid.id = res->lr_name.name[0]; child_fid.generation = res->lr_name.name[1]; - dchild = mdt_fid2dentry(&obd->u.mds, &child_fid, NULL); + dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL); LASSERT(!IS_ERR(dchild)); LDLM_LOCK_PUT(granted_lock); } @@ -521,14 +930,14 @@ static int mdt_getattr_name(int offset, struct ptlrpc_request *req, } if (req->rq_repmsg == NULL) { - rc = mdt_getattr_pack_msg(req, dchild->d_inode, offset); + rc = mds_getattr_pack_msg(req, dchild->d_inode, offset); if (rc != 0) { - CERROR ("mdt_getattr_pack_msg: %d\n", rc); + CERROR ("mds_getattr_pack_msg: %d\n", rc); GOTO (cleanup, rc); } } - rc = mdt_getattr_internal(obd, dchild, req, body, offset); + rc = mds_getattr_internal(obd, dchild, req, body, offset); GOTO(cleanup, rc); /* returns the lock to the client */ cleanup: @@ -601,7 +1010,6 @@ out_ucred: return rc; } - static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, unsigned long max_age) { @@ -646,6 +1054,194 @@ out: return 0; } +static int mds_sync(struct ptlrpc_request *req, int offset) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct mds_obd *mds = &obd->u.mds; + struct mds_body *body; + int rc, size = sizeof(*body); + ENTRY; + + body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body); + if (body == NULL) + GOTO(out, rc = -EFAULT); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) { + CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc); + GOTO(out, rc); + } + + if (body->fid1.id == 0) { + /* a fid of zero is taken to mean "sync whole filesystem" */ + rc = fsfilt_sync(obd, obd->u.obt.obt_sb); + 
GOTO(out, rc); + } else { + struct dentry *de; + + de = mds_fid2dentry(mds, &body->fid1, NULL); + if (IS_ERR(de)) + GOTO(out, rc = PTR_ERR(de)); + + /* The file parameter isn't used for anything */ + if (de->d_inode->i_fop && de->d_inode->i_fop->fsync) + rc = de->d_inode->i_fop->fsync(NULL, de, 1); + if (rc == 0) { + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); + mds_pack_inode2fid(&body->fid1, de->d_inode); + mds_pack_inode2body(body, de->d_inode); + } + + l_dput(de); + GOTO(out, rc); + } +out: + req->rq_status = rc; + return 0; +} + +/* mds_readpage does not take a DLM lock on the inode, because the client must + * already have a PR lock. + * + * If we were to take another one here, a deadlock will result, if another + * thread is already waiting for a PW lock. */ +static int mds_readpage(struct ptlrpc_request *req, int offset) +{ + struct obd_device *obd = req->rq_export->exp_obd; + struct mds_obd *mds = &obd->u.mds; + struct vfsmount *mnt; + struct dentry *de; + struct file *file; + struct mds_body *body, *repbody; + struct lvfs_run_ctxt saved; + int rc, size = sizeof(*repbody); + struct lvfs_ucred uc = {NULL,}; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) + RETURN(-ENOMEM); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) { + CERROR("error packing readpage reply: rc %d\n", rc); + GOTO(out, rc); + } + + body = lustre_swab_reqbuf(req, offset, sizeof(*body), + lustre_swab_mds_body); + if (body == NULL) + GOTO (out, rc = -EFAULT); + + rc = mds_init_ucred(&uc, req, 0); + if (rc) + GOTO(out, rc); + + push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); + de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt); + if (IS_ERR(de)) + GOTO(out_pop, rc = PTR_ERR(de)); + + CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino); + + file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); + /* note: in case of an error, dentry_open puts dentry */ + if (IS_ERR(file)) + GOTO(out_pop, rc = PTR_ERR(file)); + + /* body->size is actually the offset -eeb */ + if 
((body->size & (de->d_inode->i_blksize - 1)) != 0) { + CERROR("offset "LPU64" not on a block boundary of %lu\n", + body->size, de->d_inode->i_blksize); + GOTO(out_file, rc = -EFAULT); + } + + /* body->nlink is actually the #bytes to read -eeb */ + if (body->nlink & (de->d_inode->i_blksize - 1)) { + CERROR("size %u is not multiple of blocksize %lu\n", + body->nlink, de->d_inode->i_blksize); + GOTO(out_file, rc = -EFAULT); + } + + repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody)); + repbody->size = file->f_dentry->d_inode->i_size; + repbody->valid = OBD_MD_FLSIZE; + + /* to make this asynchronous make sure that the handling function + doesn't send a reply when this function completes. Instead a + callback function would send the reply */ + /* body->size is actually the offset -eeb */ + rc = mds_sendpage(req, file, body->size, body->nlink); + +out_file: + filp_close(file, 0); +out_pop: + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); +out: + mds_exit_ucred(&uc, mds); + req->rq_status = rc; + RETURN(0); +} + +int mds_reint(struct ptlrpc_request *req, int offset, + struct lustre_handle *lockh) +{ + struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */ + int rc; + + OBD_ALLOC(rec, sizeof(*rec)); + if (rec == NULL) + RETURN(-ENOMEM); + + rc = mds_update_unpack(req, offset, rec); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { + CERROR("invalid record\n"); + GOTO(out, req->rq_status = -EINVAL); + } + + /* rc will be used to interrupt a for loop over multiple records */ + rc = mds_reint_rec(rec, offset, req, lockh); + out: + OBD_FREE(rec, sizeof(*rec)); + return rc; +} + +static int mds_filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) +{ + switch (req->rq_reqmsg->opc) { + case MDS_CONNECT: /* This will never get here, but for completeness. */ + case OST_CONNECT: /* This will never get here, but for completeness. 
*/ + case MDS_DISCONNECT: + case OST_DISCONNECT: + *process = 1; + RETURN(0); + + case MDS_CLOSE: + case MDS_SYNC: /* used in unmounting */ + case OBD_PING: + case MDS_REINT: + case LDLM_ENQUEUE: + *process = target_queue_recovery_request(req, obd); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = 0; + /* XXX what should we set rq_status to here? */ + req->rq_status = -EAGAIN; + RETURN(ptlrpc_error(req)); + } +} + +static char *reint_names[] = { + [REINT_SETATTR] "setattr", + [REINT_CREATE] "create", + [REINT_LINK] "link", + [REINT_UNLINK] "unlink", + [REINT_RENAME] "rename", + [REINT_OPEN] "open", +}; + static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req) { char *key; @@ -683,6 +1279,118 @@ static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req) RETURN(0); } +static int mds_handle_quotacheck(struct ptlrpc_request *req) +{ + struct obd_quotactl *oqctl; + int rc; + ENTRY; + + oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl), + lustre_swab_obd_quotactl); + if (oqctl == NULL) + RETURN(-EPROTO); + + rc = lustre_pack_reply(req, 0, NULL, NULL); + if (rc) { + CERROR("mds: out of memory while packing quotacheck reply\n"); + RETURN(rc); + } + + req->rq_status = obd_quotacheck(req->rq_export, oqctl); + RETURN(0); +} + +static int mds_handle_quotactl(struct ptlrpc_request *req) +{ + struct obd_quotactl *oqctl, *repoqc; + int rc, size = sizeof(*repoqc); + ENTRY; + + oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl), + lustre_swab_obd_quotactl); + if (oqctl == NULL) + RETURN(-EPROTO); + + rc = lustre_pack_reply(req, 1, &size, NULL); + if (rc) + RETURN(rc); + + repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc)); + + req->rq_status = obd_quotactl(req->rq_export, oqctl); + *repoqc = *oqctl; + RETURN(0); +} + +static int mds_msg_check_version(struct lustre_msg *msg) +{ + int rc; + + /* TODO: enable the below check while really introducing msg version. 
+ * it's disabled because it will break compatibility with b1_4. + */ + return (0); + + switch (msg->opc) { + case MDS_CONNECT: + case MDS_DISCONNECT: + case OBD_PING: + rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_OBD_VERSION); + break; + case MDS_GETSTATUS: + case MDS_GETATTR: + case MDS_GETATTR_NAME: + case MDS_STATFS: + case MDS_READPAGE: + case MDS_REINT: + case MDS_CLOSE: + case MDS_DONE_WRITING: + case MDS_PIN: + case MDS_SYNC: + case MDS_GETXATTR: + case MDS_SETXATTR: + case MDS_SET_INFO: + case MDS_QUOTACHECK: + case MDS_QUOTACTL: + case QUOTA_DQACQ: + case QUOTA_DQREL: + rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_MDS_VERSION); + break; + case LDLM_ENQUEUE: + case LDLM_CONVERT: + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: + rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_DLM_VERSION); + break; + case OBD_LOG_CANCEL: + case LLOG_ORIGIN_HANDLE_CREATE: + case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: + case LLOG_ORIGIN_HANDLE_PREV_BLOCK: + case LLOG_ORIGIN_HANDLE_READ_HEADER: + case LLOG_ORIGIN_HANDLE_CLOSE: + case LLOG_CATINFO: + rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_LOG_VERSION); + break; + default: + CERROR("MDS unknown opcode %d\n", msg->opc); + rc = -ENOTSUPP; + } + return rc; +} + + enum mdt_handler_flags { /* * struct mds_body is passed in the 0-th incoming buffer. 
@@ -695,38 +1403,40 @@ struct mdt_handler { int mh_fail_id; __u32 mh_opc; __u32 mh_flags; - int (*mh_act)(struct mdt_thread_info *info, struct ptlrpc_request *req); + int (*mh_act)(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset); }; -#define DEF_HNDL(prefix, base, flags, name, fn) \ -[prefix ## name - prefix ## base] = { \ - .mh_name = #name, \ +#define DEF_HNDL(prefix, base, flags, opc, fn) \ +[prefix ## _ ## opc - prefix ## _ ## base] = { \ + .mh_name = #opc, \ .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ .mh_opc = prefix ## _ ## opc, \ .mh_flags = flags, \ .mh_act = fn \ } -#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(mdt, CONNECT, flags, name, fn) +#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn) static struct mdt_handler mdt_mds_ops[] = { - DEF_MDT_HNDL(0, CONNECT, mdt_connect), - DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect), DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name), - DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr), - DEF_MDT_HNDL(0, STATFS, mdt_statfs), - DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage), - DEF_MDT_HNDL(0, REINT, mdt_reint), - DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close), - DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), - DEF_MDT_HNDL(0, PIN, mdt_pin), - DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync), - DEF_MDT_HNDL(0, 0 /*SET_INFO*/, mdt_set_info), - DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), - DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl) + + DEF_MDT_HNDL(0, CONNECT, mds_connect), + DEF_MDT_HNDL(0, DISCONNECT, mds_disconnect), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mds_getattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mds_getattr_name), + DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mds_setxattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mds_getxattr), + DEF_MDT_HNDL(0, STATFS, mds_statfs), + 
DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mds_readpage), + DEF_MDT_HNDL(0, REINT, mds_reint), + DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mds_close), + DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mds_done_writing), + DEF_MDT_HNDL(0, PIN, mds_pin), + DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mds_sync), + DEF_MDT_HNDL(0, SET_INFO, mds_set_info), + DEF_MDT_HNDL(0, QUOTACHECK, mds_handle_quotacheck), + DEF_MDT_HNDL(0, QUOTACTL, mds_handle_quotactl) }; static struct mdt_handler mdt_obd_ops[] = { @@ -765,44 +1475,6 @@ static struct mdt_opc_slice { } }; -enum { - MDT_REP_BUF_NR_MAX = 8 -}; - -/* - * Common data shared by mdt-level handlers. This is allocated per-thread to - * reduce stack consumption. - */ -struct mdt_thread_info { - struct mdt_device *mti_mdt; - /* - * number of buffers in reply message. - */ - int mti_rep_buf_nr; - /* - * sizes of reply buffers. - */ - int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; - /* - * Body for "habeo corpus" operations. - */ - struct mds_body *mti_body; - /* - * Host object. This is released at the end of mdt_handler(). - */ - struct mdt_object *mti_object; - /* - * Additional fail id that can be set by handler. Passed to - * target_send_reply(). - */ - int mti_fail_id; - /* - * Offset of incoming buffers. 0 for top-level request processing. +ve - * for intent handling. 
- */ - int mti_offset; -}; - struct mdt_handler *mdt_handler_find(__u32 opc) { int i; @@ -813,8 +1485,8 @@ struct mdt_handler *mdt_handler_find(__u32 opc) for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) { if (s->mos_opc_start <= opc && opc < s->mos_opc_end) { h = s->mos_hs + (opc - s->mos_opc_start); - if (h->mos_opc != 0) - LASSERT(h->mos_opc == opc); + if (h->mh_opc != 0) + LASSERT(h->mh_opc == opc); else h = NULL; /* unsupported opc */ break; @@ -823,13 +1495,13 @@ struct mdt_handler *mdt_handler_find(__u32 opc) return h; } -struct mdt_object *mdt_object_find(struct mdt_device *d, struct lfid *f) +struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f) { struct lu_object *o; - o = lu_object_find(&d->mdt_lu_dev.ld_site, f); + o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f); if (IS_ERR(o)) - return (struct mdd_object *)o; + return (struct mdt_object *)o; else return container_of(o, struct mdt_object, mot_obj.mo_lu); } @@ -844,6 +1516,7 @@ static int mdt_req_handle(struct mdt_thread_info *info, int shift) { int result; + int off; ENTRY; @@ -855,22 +1528,23 @@ static int mdt_req_handle(struct mdt_thread_info *info, if (h->mh_fail_id != 0) OBD_FAIL_RETURN(h->mh_fail_id, 0); - h->mh_offset = MDS_REQ_REC_OFF + shift; + off = MDS_REQ_REC_OFF + shift; + result = 0; if (h->mh_flags & HABEO_CORPUS) { - info->mti_body = lustre_swab_reqbuf(req, h->mh_offset, - sizeof *info->mti_body, + info->mti_body = lustre_swab_reqbuf(req, off, + sizeof *info->mti_body, lustre_swab_mds_body); if (info->mti_body == NULL) { CERROR("Can't unpack body\n"); result = req->rq_status = -EFAULT; } info->mti_object = mdt_object_find(info->mti_mdt, - info->mti_body.fid1); + &info->mti_body->fid1); if (IS_ERR(info->mti_object)) result = PTR_ERR(info->mti_object); } if (result == 0) - result = h->mh_act(info, h, req); + result = h->mh_act(info, req, off); /* * XXX result value is unconditionally shoved into ->rq_status * (original code sometimes placed 
error code into ->rq_status, and @@ -902,13 +1576,11 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info) } } -int mdt_handle(struct ptlrpc_request *req) +static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) { - int should_process, - int rc = 0; + int rc; struct mds_obd *mds = NULL; /* quell gcc overwarning */ struct obd_device *obd = NULL; - struct mdt_thread_info info; /* XXX on stack for now */ struct mdt_handler *h; ENTRY; @@ -963,6 +1635,8 @@ int mdt_handle(struct ptlrpc_request *req) if (abort_recovery) { target_abort_recovery(obd); } else if (recovering) { + int should_process; + rc = mds_filter_recovery_request(req, obd, &should_process); if (rc || !should_process) @@ -972,7 +1646,7 @@ int mdt_handle(struct ptlrpc_request *req) h = mdt_handler_find(req->rq_reqmsg->opc); if (h != NULL) { - rc = mdt_handle_req(&info, h, req, 0); + rc = mdt_req_handle(info, h, req, 0); } else { req->rq_status = -ENOTSUPP; rc = ptlrpc_error(req); @@ -996,95 +1670,80 @@ int mdt_handle(struct ptlrpc_request *req) if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { if (obd && obd->obd_recovering) { DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - return target_queue_final_reply(req, rc); + RETURN(target_queue_final_reply(req, rc)); } /* Lost a race with recovery; let the error path DTRT. 
*/ rc = req->rq_status = -ENOTCONN; } - target_send_reply(req, rc, info.mti_fail_id); - return 0; + target_send_reply(req, rc, info->mti_fail_id); + RETURN(0); } -static int mdt_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data) -{ - RETURN(ELDLM_LOCK_ABORTED); -} +static struct lu_device_operations mdt_lu_ops; -struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, - svc_handler_t h, char *name, - struct proc_dir_entry *proc_entry, - svcreq_printfn_t prntfn) +static int lu_device_is_mdt(struct lu_device *d) { - return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize, - c->psc_max_req_size, c->psc_max_reply_size, - c->psc_req_portal, c->psc_rep_portal, - c->psc_watchdog_timeout, - h, char name, proc_entry, - prntfn, c->psc_num_threads); + /* + * XXX for now. Tags in lu_device_type->ldt_something are needed. + */ + return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); } -int md_device_init(struct md_device *md) +static struct mdt_object *mdt_obj(struct lu_object *o) { - return lu_device_init(&md->md_lu_dev); + LASSERT(lu_device_is_mdt(o->lo_dev)); + return container_of(o, struct mdt_object, mot_obj.mo_lu); } -void md_device_fini(struct md_device *md) +static struct mdt_device *mdt_dev(struct lu_device *d) { - lu_device_fini(&md->md_lu_dev); + LASSERT(lu_device_is_mdt(d)); + return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev); } -static struct lu_device_operations mdt_lu_ops; - -static int mdt_device_init(struct mdt_device *m) +int mdt_handle(struct ptlrpc_request *req) { - md_device_init(&m->mdt_md_dev); + int result; - m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; + struct mdt_thread_info info; /* XXX on stack for now */ + mdt_thread_info_init(&info); + info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - m->mdt_service_conf.psc_nbufs = MDS_NBUFS; - m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE; - m->mdt_service_conf.psc_max_req_size = 
MDS_MAXREQSIZE; - m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE; - m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL; - m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL; - m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT; - /* - * We'd like to have a mechanism to set this on a per-device basis, - * but alas... - */ - if (mds_num_threads < 2) - mds_num_threads = MDS_DEF_THREADS; - m->mdt_service_conf.psc_num_threads = min(mds_num_threads, - MDS_MAX_THREADS); - return 0; + result = mdt_handle0(req, &info); + + mdt_thread_info_fini(&info); + return result; } -static void mdt_device_fini(struct mdt_device *m) +static int mdt_intent_policy(struct ldlm_namespace *ns, + struct ldlm_lock **lockp, void *req_cookie, + ldlm_mode_t mode, int flags, void *data) { - md_device_fini(&m->mdt_md_dev); + RETURN(ELDLM_LOCK_ABORTED); } -static int lu_device_is_mdt(struct lu_device *d) +struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, + svc_handler_t h, char *name, + struct proc_dir_entry *proc_entry, + svcreq_printfn_t prntfn) { - /* - * XXX for now. Tags in lu_device_type->ldt_something are needed. 
- */ - return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); + return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize, + c->psc_max_req_size, c->psc_max_reply_size, + c->psc_req_portal, c->psc_rep_portal, + c->psc_watchdog_timeout, + h, name, proc_entry, + prntfn, c->psc_num_threads); } -static struct mdt_device *mdt_dev(struct lu_device *d) +int md_device_init(struct md_device *md, struct lu_device_type *t) { - LASSERT(lu_device_is_mdt(d)); - return container_of(d, struct mdt_device, mdt_lu_dev); + return lu_device_init(&md->md_lu_dev, t); } -static struct mdt_object *mdt_obj(struct lu_object *o) +void md_device_fini(struct md_device *md) { - LASSERT(lu_device_is_mdt(o->lo_dev)); - return container_of(o, struct mdt_object, mot_obj.mo_lu); + lu_device_fini(&md->md_lu_dev); } static void mdt_fini(struct lu_device *d) @@ -1105,11 +1764,12 @@ static void mdt_fini(struct lu_device *d) } LASSERT(atomic_read(&d->ld_ref) == 0); + md_device_fini(&m->mdt_md_dev); } -static int mdt_init0(struct lu_device *d) +static int mdt_init0(struct mdt_device *m, + struct lu_device_type *t, struct lustre_cfg *cfg) { - struct mdt_device *m = mdt_dev(d); struct lu_site *s; char ns_name[48]; @@ -1119,38 +1779,46 @@ static int mdt_init0(struct lu_device *d) if (s == NULL) return -ENOMEM; - mdt_device_init(m); - lu_site_init(s, m); + md_device_init(&m->mdt_md_dev, t); + + m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; + + m->mdt_service_conf.psc_nbufs = MDS_NBUFS; + m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE; + m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE; + m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE; + m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL; + m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL; + m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT; + /* + * We'd like to have a mechanism to set this on a per-device basis, + * but alas... 
+ */ + m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads, + MDT_MIN_THREADS), + MDT_MAX_THREADS); + lu_site_init(s, &m->mdt_md_dev.md_lu_dev); snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m); m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); if (m->mdt_namespace == NULL) return -ENOMEM; - ldlm_register_intent(m->mst_namespace, mdt_intent_policy); + ldlm_register_intent(m->mdt_namespace, mdt_intent_policy); ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, "mdt_ldlm_client", &m->mdt_ldlm_client); - m->mdt_service = ptlrpc_init_svc_conf(&mdt->mdt_service_conf, - mdt_handle, LUSTRE_MDT0_NAME, - mdt->mdt_lu_dev.ld_proc_entry - NULL); + m->mdt_service = + ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle, + LUSTRE_MDT0_NAME, + m->mdt_md_dev.md_lu_dev.ld_proc_entry, + NULL); if (m->mdt_service == NULL) return -ENOMEM; return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME); } -static int mdt_init(struct lu_device *d) -{ - int result; - - result = mdt_init0(d); - if (result != 0) - mdt_fini(d); - return result; -} - struct lu_object *mdt_object_alloc(struct lu_device *d) { struct mdt_object *mo; @@ -1166,6 +1834,7 @@ struct lu_object *mdt_object_alloc(struct lu_device *d) lu_object_init(o, h, d); /* ->lo_depth and ->lo_flags are automatically 0 */ lu_object_add_top(h, o); + return o; } else return NULL; } @@ -1176,8 +1845,8 @@ int mdt_object_init(struct lu_object *o) struct lu_device *under; struct lu_object *below; - under = &d->mdt_mdd->md_lu_dev; - below = under->ld_ops->ldo_alloc(under); + under = &d->mdt_child->md_lu_dev; + below = under->ld_ops->ldo_object_alloc(under); if (below != NULL) { lu_object_add(o, below); return 0; @@ -1187,7 +1856,7 @@ int mdt_object_init(struct lu_object *o) void mdt_object_free(struct lu_object *o) { - struct lu_object_header; + struct lu_object_header *h; h = o->lo_header; lu_object_fini(o); @@ -1204,70 +1873,129 @@ int mdt_object_print(struct seq_file *f, 
const struct lu_object *o) } static struct lu_device_operations mdt_lu_ops = { - .ldo_init = mdt_init, - .ldo_fini = mdt_fini, .ldo_object_alloc = mdt_object_alloc, .ldo_object_init = mdt_object_init, .ldo_object_free = mdt_object_free, .ldo_object_release = mdt_object_release, .ldo_object_print = mdt_object_print +}; + +static struct ll_fid *mdt_object_fid(struct mdt_object *o) +{ + return lu_object_fid(&o->mot_obj.mo_lu); +} + +static int mdt_object_lock(struct mdt_object *o, ldlm_mode_t mode) +{ + return fid_lock(mdt_object_fid(o), &o->mot_lh, mode); } -int mdt_mkdir(struct mdt_device *d, struct lfid *pfid, const char *name) +static void mdt_object_unlock(struct mdt_object *o, ldlm_mode_t mode) +{ + fid_unlock(mdt_object_fid(o), &o->mot_lh, mode); +} + +int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name) { struct mdt_object *o; - struct lock_handle lh; int result; o = mdt_object_find(d, pfid); if (IS_ERR(o)) return PTR_ERR(o); - result = fid_lock(pfid, LCK_PW, &lh); + result = mdt_object_lock(o, LCK_PW); if (result == 0) { - result = d->mdt_dev.md_ops->mdo_mkdir(o, name); - fid_unlock(&lh); + result = d->mdt_child->md_ops->mdo_mkdir(&o->mot_obj, name); + mdt_object_unlock(o, LCK_PW); } mdt_object_put(o); return result; } -static struct obd_ops mdt_ops = { - .o_owner = THIS_MODULE, - .o_connect = mds_connect, - .o_reconnect = mds_reconnect, - .o_init_export = mds_init_export, - .o_destroy_export = mds_destroy_export, - .o_disconnect = mds_disconnect, - .o_setup = mds_setup, - .o_precleanup = mds_precleanup, - .o_cleanup = mds_cleanup, - .o_postrecov = mds_postrecov, - .o_statfs = mds_obd_statfs, - .o_iocontrol = mds_iocontrol, - .o_create = mds_obd_create, - .o_destroy = mds_obd_destroy, - .o_llog_init = mds_llog_init, - .o_llog_finish = mds_llog_finish, - .o_notify = mds_notify, - .o_health_check = mds_health_check, +static struct obd_ops mdt_obd_device_ops = { + .o_owner = THIS_MODULE +}; + +struct lu_device *mdt_device_alloc(struct 
lu_device_type *t, + struct lustre_cfg *cfg) +{ + struct lu_device *l; + struct mdt_device *m; + + OBD_ALLOC_PTR(m); + if (m != NULL) { + int result; + + l = &m->mdt_md_dev.md_lu_dev; + result = mdt_init0(m, t, cfg); + if (result != 0) { + mdt_fini(l); + m = ERR_PTR(result); + } + } else + l = ERR_PTR(-ENOMEM); + return l; +} + +void mdt_device_free(struct lu_device *m) +{ + mdt_fini(m); + OBD_FREE_PTR(m); +} + +int mdt_type_init(struct lu_device_type *t) +{ + return 0; +} + +void mdt_type_fini(struct lu_device_type *t) +{ +} + +static struct lu_device_type_operations mdt_device_type_ops = { + .ldto_init = mdt_type_init, + .ldto_fini = mdt_type_fini, + + .ldto_device_alloc = mdt_device_alloc, + .ldto_device_free = mdt_device_free +}; + +static struct lu_device_type mdt_device_type = { + .ldt_name = LUSTRE_MDT0_NAME, + .ldt_ops = &mdt_device_type_ops }; static int __init mdt_mod_init(void) { - return 0; + struct lprocfs_static_vars lvars; + struct obd_type *type; + int result; + + mdt_num_threads = MDT_NUM_THREADS; + lprocfs_init_vars(mdt, &lvars); + result = class_register_type(&mdt_obd_device_ops, + lvars.module_vars, LUSTRE_MDT0_NAME); + if (result == 0) { + type = class_get_type(LUSTRE_MDT0_NAME); + LASSERT(type != NULL); + type->typ_lu = &mdt_device_type; + result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu); + if (result != 0) + class_unregister_type(LUSTRE_MDT0_NAME); + } + return result; } static void __exit mdt_mod_exit(void) { + class_unregister_type(LUSTRE_MDT0_NAME); } MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")"); MODULE_LICENSE("GPL"); -CFS_MODULE_PARM(mdt_num_threads, "i", int, 0444, +CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444, "number of mdt service threads to start"); cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit); - -#endif /* 0 */ -- 1.8.3.1