From 7a80c675e94def12b91c14746183f7dc830d1bf3 Mon Sep 17 00:00:00 2001 From: nikita Date: Fri, 31 Mar 2006 09:45:55 +0000 Subject: [PATCH] mdt prototype updates --- lustre/mdt/mdt.h | 9 +- lustre/mdt/mdt_handler.c | 1683 ++++++++-------------------------------------- 2 files changed, 289 insertions(+), 1403 deletions(-) diff --git a/lustre/mdt/mdt.h b/lustre/mdt/mdt.h index 1c40b73..56f29a6 100644 --- a/lustre/mdt/mdt.h +++ b/lustre/mdt/mdt.h @@ -63,13 +63,20 @@ struct md_object { struct lu_object mo_lu; }; +static inline int lu_device_is_md(struct lu_device *d) +{ + return d->ld_type->ldt_tags & LU_DEVICE_MD; +} + static inline struct md_object *lu2md(struct lu_object *o) { + LASSERT(lu_device_is_md(o->lo_dev)); return container_of(o, struct md_object, mo_lu); } static inline struct md_device *md_device_get(struct md_object *o) { + LASSERT(lu_device_is_md(o->mo_lu.lo_dev)); return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev); } @@ -133,7 +140,7 @@ struct mdt_thread_info { }; int fid_lock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t); -int fid_unlock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t); +void fid_unlock(const struct ll_fid *, struct lustre_handle *, ldlm_mode_t); #endif /* __KERNEL__ */ #endif /* _MDT_H */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 1e20c39..0bc0cbd 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -37,18 +37,24 @@ #include -/* - * LUSTRE_VERSION_CODE - */ +/* LUSTRE_VERSION_CODE */ #include /* * struct OBD_{ALLOC,FREE}*() * OBD_FAIL_CHECK */ #include +/* struct ptlrpc_request */ +#include +/* struct obd_export */ +#include +/* struct obd_device */ +#include #include +/* struct mds_client_data */ +#include "../mds/mds_internal.h" #include "mdt.h" /* @@ -83,1320 +89,144 @@ static int mdt_getstatus(struct mdt_thread_info *info, RETURN(result); } -/* - * struct obd_device - */ -#include -/* - * struct class_connect() - */ -#include -/* - * struct obd_export - */ -#include -/* - * struct mds_client_data - */ -#include <../mds/mds_internal.h> -#include -#include -#include -#include -#include -#include -#include - -static int mds_intent_policy(struct ldlm_namespace *ns, - struct ldlm_lock **lockp, void *req_cookie, - ldlm_mode_t mode, int flags, void *data); -static int mds_postsetup(struct obd_device *obd); -static int mds_cleanup(struct obd_device *obd); - -/* Assumes caller has already pushed into the kernel filesystem context */ -static int mds_sendpage(struct ptlrpc_request *req, struct file *file, - loff_t offset, int count) +static int mdt_connect(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct ptlrpc_bulk_desc *desc; - struct l_wait_info lwi; - struct page **pages; - int rc = 0, npages, i, tmpcount, tmpsize = 0; - ENTRY; - - LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */ - - npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT; - OBD_ALLOC(pages, sizeof(*pages) * npages); - if (!pages) - GOTO(out, rc = -ENOMEM); - - desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, - MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out_free, rc = -ENOMEM); - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; - - pages[i] = alloc_pages(GFP_KERNEL, 0); - if (pages[i] == NULL) - GOTO(cleanup_buf, rc = -ENOMEM); - - ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize); - } - - for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { - tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount; - CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", - tmpsize, offset, file->f_dentry->d_inode->i_ino, - file->f_dentry->d_inode->i_size); - - rc = fsfilt_readpage(req->rq_export->exp_obd, file, - kmap(pages[i]), tmpsize, &offset); - kunmap(pages[i]); - - if (rc != tmpsize) - GOTO(cleanup_buf, rc = -EIO); - } - - LASSERT(desc->bd_nob == count); - - rc = ptlrpc_start_bulk_transfer(desc); - if (rc) - GOTO(cleanup_buf, rc); - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { - CERROR("obd_fail_loc=%x, fail operation rc=%d\n", - OBD_FAIL_MDS_SENDPAGE, rc); - GOTO(abort_bulk, rc); - } - - lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); - LASSERT (rc == 0 || rc == -ETIMEDOUT); - - if (rc == 0) { - if (desc->bd_success && - desc->bd_nob_transferred == count) - GOTO(cleanup_buf, rc); - - rc = -ETIMEDOUT; /* XXX should this be a different errno? */ - } - - DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", - (rc == -ETIMEDOUT) ? "timeout" : "network error", - desc->bd_nob_transferred, count, - req->rq_export->exp_client_uuid.uuid, - req->rq_export->exp_connection->c_remote_uuid.uuid); - - class_fail_export(req->rq_export); - - EXIT; - abort_bulk: - ptlrpc_abort_bulk (desc); - cleanup_buf: - for (i = 0; i < npages; i++) - if (pages[i]) - __free_pages(pages[i], 0); - - ptlrpc_free_bulk(desc); - out_free: - OBD_FREE(pages, sizeof(*pages) * npages); - out: - return rc; + return -EOPNOTSUPP; } -/* only valid locked dentries or errors should be returned */ -struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, - struct vfsmount **mnt, int lock_mode, - struct lustre_handle *lockh, - char *name, int namelen, __u64 lockpart) +static int mdt_disconnect(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct mds_obd *mds = &obd->u.mds; - struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; - struct ldlm_res_id res_id = { .name = {0} }; - int flags = 0, rc; - ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; - ENTRY; - - if (IS_ERR(de)) - RETURN(de); - - res_id.name[0] = de->d_inode->i_ino; - res_id.name[1] = de->d_inode->i_generation; - rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id, - LDLM_IBITS, &policy, lock_mode, &flags, - ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, NULL, 0, NULL, lockh); - if (rc != ELDLM_OK) { - l_dput(de); - retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ - } - - RETURN(retval); + return -EOPNOTSUPP; } -/* Look up an entry by inode number. */ -/* this function ONLY returns valid dget'd dentries with an initialized inode - or errors */ -struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, - struct vfsmount **mnt) +static int mdt_getattr(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - char fid_name[32]; - unsigned long ino = fid->id; - __u32 generation = fid->generation; - struct inode *inode; - struct dentry *result; - - if (ino == 0) - RETURN(ERR_PTR(-ESTALE)); - - snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); - - CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n", - ino, generation, mds->mds_obt.obt_sb); - - /* under ext3 this is neither supposed to return bad inodes - nor NULL inodes. */ - result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); - if (IS_ERR(result)) - RETURN(result); - - inode = result->d_inode; - if (!inode) - RETURN(ERR_PTR(-ENOENT)); - - if (inode->i_generation == 0 || inode->i_nlink == 0) { - LCONSOLE_WARN("Found inode with zero generation or link -- this" - " may indicate disk corruption (inode: %lu, link:" - " %lu, count: %d)\n", inode->i_ino, - (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count)); - dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (generation && inode->i_generation != generation) { - /* we didn't find the right inode.. */ - CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " - "count: %d, generation %u/%u\n", inode->i_ino, - (unsigned long)inode->i_nlink, - atomic_read(&inode->i_count), inode->i_generation, - generation); - dput(result); - RETURN(ERR_PTR(-ENOENT)); - } - - if (mnt) { - *mnt = mds->mds_vfsmnt; - mntget(*mnt); - } - - RETURN(result); -} - -static int mds_connect_internal(struct obd_export *exp, - struct obd_connect_data *data) -{ - struct obd_device *obd = exp->exp_obd; - if (data != NULL) { - data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED; - data->ocd_ibits_known &= MDS_INODELOCK_FULL; - - /* If no known bits (which should not happen, probably, - as everybody should support LOOKUP and UPDATE bits at least) - revert to compat mode with plain locks. */ - if (!data->ocd_ibits_known && - data->ocd_connect_flags & OBD_CONNECT_IBITS) - data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; - - if (!obd->u.mds.mds_fl_acl) - data->ocd_connect_flags &= ~OBD_CONNECT_ACL; - - if (!obd->u.mds.mds_fl_user_xattr) - data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; - - exp->exp_connect_flags = data->ocd_connect_flags; - data->ocd_version = LUSTRE_VERSION_CODE; - exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; - } - - if (obd->u.mds.mds_fl_acl && - ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { - CWARN("%s: MDS requires ACL support but client does not\n", - obd->obd_name); - return -EBADE; - } - return 0; -} - -static int mds_reconnect(struct obd_export *exp, struct obd_device *obd, - struct obd_uuid *cluuid, - struct obd_connect_data *data) -{ - int rc; - ENTRY; - - if (exp == NULL || obd == NULL || cluuid == NULL) - RETURN(-EINVAL); - - rc = mds_connect_internal(exp, data); - - RETURN(rc); + return -EOPNOTSUPP; } -/* Establish a connection to the MDS. - * - * This will set up an export structure for the client to hold state data - * about that client, like open files, the last operation number it did - * on the server, etc. - */ -static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, - struct obd_uuid *cluuid, struct obd_connect_data *data) +static int mdt_getattr_name(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct obd_export *exp; - struct mds_export_data *med; - struct mds_client_data *mcd = NULL; - int rc, abort_recovery; - ENTRY; - - if (!conn || !obd || !cluuid) - RETURN(-EINVAL); - - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) - target_abort_recovery(obd); - - /* XXX There is a small race between checking the list and adding a - * new connection for the same UUID, but the real threat (list - * corruption when multiple different clients connect) is solved. - * - * There is a second race between adding the export to the list, - * and filling in the client data below. Hence skipping the case - * of NULL mcd above. We should already be controlling multiple - * connects at the client, and we can't hold the spinlock over - * memory allocations without risk of deadlocking. - */ - rc = class_connect(conn, obd, cluuid); - if (rc) - RETURN(rc); - exp = class_conn2export(conn); - LASSERT(exp); - med = &exp->exp_mds_data; - - rc = mds_connect_internal(exp, data); - if (rc) - GOTO(out, rc); - - OBD_ALLOC(mcd, sizeof(*mcd)); - if (!mcd) - GOTO(out, rc = -ENOMEM); - - memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid)); - med->med_mcd = mcd; - - rc = mds_client_add(obd, &obd->u.mds, med, -1); - GOTO(out, rc); - -out: - if (rc) { - if (mcd) { - OBD_FREE(mcd, sizeof(*mcd)); - med->med_mcd = NULL; - } - class_disconnect(exp); - } else { - class_export_put(exp); - } - - RETURN(rc); -} - -static int mds_init_export(struct obd_export *exp) -{ - struct mds_export_data *med = &exp->exp_mds_data; - - INIT_LIST_HEAD(&med->med_open_head); - spin_lock_init(&med->med_open_lock); - RETURN(0); -} - -static int mds_destroy_export(struct obd_export *export) -{ - struct mds_export_data *med; - struct obd_device *obd = export->exp_obd; - struct lvfs_run_ctxt saved; - int rc = 0; - ENTRY; - - med = &export->exp_mds_data; - target_destroy_export(export); - - if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid)) - GOTO(out, 0); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - /* Close any open files (which may also cause orphan unlinking). */ - spin_lock(&med->med_open_lock); - while (!list_empty(&med->med_open_head)) { - struct list_head *tmp = med->med_open_head.next; - struct mds_file_data *mfd = - list_entry(tmp, struct mds_file_data, mfd_list); - struct dentry *dentry = mfd->mfd_dentry; - - /* Remove mfd handle so it can't be found again. - * We are consuming the mfd_list reference here. */ - mds_mfd_unlink(mfd, 0); - spin_unlock(&med->med_open_lock); - - /* If you change this message, be sure to update - * replay_single:test_46 */ - CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for " - "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len, - dentry->d_name.name, dentry->d_inode->i_ino); - /* child orphan sem protects orphan_dec_test and - * is_orphan race, mds_mfd_close drops it */ - MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode); - rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd, - !(export->exp_flags & OBD_OPT_FAILOVER)); - - if (rc) - CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc); - spin_lock(&med->med_open_lock); - } - spin_unlock(&med->med_open_lock); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); -out: - mds_client_free(export); - - RETURN(rc); + return -EOPNOTSUPP; } -static int mds_disconnect(struct obd_export *exp) +static int mdt_setxattr(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - unsigned long irqflags; - int rc; - ENTRY; - - LASSERT(exp); - class_export_get(exp); - - /* Disconnect early so that clients can't keep using export */ - rc = class_disconnect(exp); - ldlm_cancel_locks_for_export(exp); - - /* complete all outstanding replies */ - spin_lock_irqsave(&exp->exp_lock, irqflags); - while (!list_empty(&exp->exp_outstanding_replies)) { - struct ptlrpc_reply_state *rs = - list_entry(exp->exp_outstanding_replies.next, - struct ptlrpc_reply_state, rs_exp_list); - struct ptlrpc_service *svc = rs->rs_service; - - spin_lock(&svc->srv_lock); - list_del_init(&rs->rs_exp_list); - ptlrpc_schedule_difficult_reply(rs); - spin_unlock(&svc->srv_lock); - } - spin_unlock_irqrestore(&exp->exp_lock, irqflags); - - class_export_put(exp); - RETURN(rc); + return -EOPNOTSUPP; } -int mds_get_md(struct obd_device *obd, struct inode *inode, void *md, - int *size, int lock) +static int mdt_getxattr(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - int rc = 0; - int lmm_size; - - if (lock) - down(&inode->i_sem); - rc = fsfilt_get_md(obd, inode, md, *size, "lov"); - - if (rc < 0) { - CERROR("Error %d reading eadata for ino %lu\n", - rc, inode->i_ino); - } else if (rc > 0) { - lmm_size = rc; - rc = mds_convert_lov_ea(obd, inode, md, lmm_size); - - if (rc == 0) { - *size = lmm_size; - rc = lmm_size; - } else if (rc > 0) { - *size = rc; - } - } else { - *size = 0; - } - if (lock) - up(&inode->i_sem); - - RETURN (rc); + return -EOPNOTSUPP; } - -/* Call with lock=1 if you want mds_pack_md to take the i_sem. - * Call with lock=0 if the caller has already taken the i_sem. */ -int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset, - struct mds_body *body, struct inode *inode, int lock) +static int mdt_statfs(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct mds_obd *mds = &obd->u.mds; - void *lmm; - int lmm_size; - int rc; - ENTRY; - - lmm = lustre_msg_buf(msg, offset, 0); - if (lmm == NULL) { - /* Some problem with getting eadata when I sized the reply - * buffer... */ - CDEBUG(D_INFO, "no space reserved for inode %lu MD\n", - inode->i_ino); - RETURN(0); - } - lmm_size = msg->buflens[offset]; - - /* I don't really like this, but it is a sanity check on the client - * MD request. However, if the client doesn't know how much space - * to reserve for the MD, it shouldn't be bad to have too much space. - */ - if (lmm_size > mds->mds_max_mdsize) { - CWARN("Reading MD for inode %lu of %d bytes > max %d\n", - inode->i_ino, lmm_size, mds->mds_max_mdsize); - // RETURN(-EINVAL); - } - - rc = mds_get_md(obd, inode, lmm, &lmm_size, lock); - if (rc > 0) { - if (S_ISDIR(inode->i_mode)) - body->valid |= OBD_MD_FLDIREA; - else - body->valid |= OBD_MD_FLEASIZE; - body->eadatasize = lmm_size; - rc = 0; - } - - RETURN(rc); + return -EOPNOTSUPP; } -#ifdef CONFIG_FS_POSIX_ACL -static -int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg, - struct mds_body *repbody, int repoff) +static int mdt_readpage(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct dentry de = { .d_inode = inode }; - int buflen, rc; - ENTRY; - - LASSERT(repbody->aclsize == 0); - LASSERT(repmsg->bufcount > repoff); - - buflen = lustre_msg_buflen(repmsg, repoff); - if (!buflen) - GOTO(out, 0); - - if (!inode->i_op || !inode->i_op->getxattr) - GOTO(out, 0); - - lock_24kernel(); - rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, - lustre_msg_buf(repmsg, repoff, buflen), - buflen); - unlock_24kernel(); - - if (rc >= 0) - repbody->aclsize = rc; - else if (rc != -ENODATA) { - CERROR("buflen %d, get acl: %d\n", buflen, rc); - RETURN(rc); - } - EXIT; -out: - repbody->valid |= OBD_MD_FLACL; - return 0; + return -EOPNOTSUPP; } -#else -#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0 -#endif -int mds_pack_acl(struct mds_export_data *med, struct inode *inode, - struct lustre_msg *repmsg, struct mds_body *repbody, - int repoff) +static int mdt_reint(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - return mds_pack_posix_acl(inode, repmsg, repbody, repoff); + return -EOPNOTSUPP; } -static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry, - struct ptlrpc_request *req, - struct mds_body *reqbody, int reply_off) +static int mdt_close(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct mds_body *body; - struct inode *inode = dentry->d_inode; - int rc = 0; - ENTRY; - - if (inode == NULL) - RETURN(-ENOENT); - - body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body)); - LASSERT(body != NULL); /* caller prepped reply */ - - mds_pack_inode2fid(&body->fid1, inode); - mds_pack_inode2body(body, inode); - reply_off++; - - if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) { - rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body, - inode, 1); - - /* If we have LOV EA data, the OST holds size, atime, mtime */ - if (!(body->valid & OBD_MD_FLEASIZE) && - !(body->valid & OBD_MD_FLDIREA)) - body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | - OBD_MD_FLATIME | OBD_MD_FLMTIME); - - lustre_shrink_reply(req, reply_off, body->eadatasize, 0); - if (body->eadatasize) - reply_off++; - } else if (S_ISLNK(inode->i_mode) && - (reqbody->valid & OBD_MD_LINKNAME) != 0) { - char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0); - int len; - - LASSERT (symname != NULL); /* caller prepped reply */ - len = req->rq_repmsg->buflens[reply_off]; - - rc = inode->i_op->readlink(dentry, symname, len); - if (rc < 0) { - CERROR("readlink failed: %d\n", rc); - } else if (rc != len - 1) { - CERROR ("Unexpected readlink rc %d: expecting %d\n", - rc, len - 1); - rc = -EINVAL; - } else { - CDEBUG(D_INODE, "read symlink dest %s\n", symname); - body->valid |= OBD_MD_LINKNAME; - body->eadatasize = rc + 1; - symname[rc] = 0; /* NULL terminate */ - rc = 0; - } - reply_off++; - } - - if (reqbody->valid & OBD_MD_FLMODEASIZE) { - struct mds_obd *mds = mds_req2mds(req); - body->max_cookiesize = mds->mds_max_cookiesize; - body->max_mdsize = mds->mds_max_mdsize; - body->valid |= OBD_MD_FLMODEASIZE; - } - - if (rc) - RETURN(rc); - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (reqbody->valid & OBD_MD_FLACL)) { - rc = mds_pack_acl(&req->rq_export->exp_mds_data, - inode, req->rq_repmsg, - body, reply_off); - - lustre_shrink_reply(req, reply_off, body->aclsize, 0); - if (body->aclsize) - reply_off++; - } -#endif - - RETURN(rc); + return -EOPNOTSUPP; } -static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode, - int offset) +static int mdt_done_writing(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) { - struct mds_obd *mds = mds_req2mds(req); - struct mds_body *body; - int rc, size[2] = {sizeof(*body)}, bufcount = 1; - ENTRY; - - body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body)); - LASSERT(body != NULL); /* checked by caller */ - LASSERT_REQSWABBED(req, offset); /* swabbed by caller */ - - if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) || - (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) { - down(&inode->i_sem); - rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0, - "lov"); - up(&inode->i_sem); - CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n", - rc, inode->i_ino); - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("error getting inode %lu MD: rc = %d\n", - inode->i_ino, rc); - RETURN(rc); - } - size[bufcount] = 0; - } else if (rc > mds->mds_max_mdsize) { - size[bufcount] = 0; - CERROR("MD size %d larger than maximum possible %u\n", - rc, mds->mds_max_mdsize); - } else { - size[bufcount] = rc; - } - bufcount++; - } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) { - if (inode->i_size + 1 != body->eadatasize) - CERROR("symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize); - bufcount++; - CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n", - inode->i_size + 1, body->eadatasize); - } - -#ifdef CONFIG_FS_POSIX_ACL - if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && - (body->valid & OBD_MD_FLACL)) { - struct dentry de = { .d_inode = inode }; - - size[bufcount] = 0; - if (inode->i_op && inode->i_op->getxattr) { - lock_24kernel(); - rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS, - NULL, 0); - unlock_24kernel(); - - if (rc < 0) { - if (rc != -ENODATA) { - CERROR("got acl size: %d\n", rc); - RETURN(rc); - } - } else - size[bufcount] = rc; - } - bufcount++; - } -#endif - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) { - CERROR("failed MDS_GETATTR_PACK test\n"); - req->rq_status = -ENOMEM; - RETURN(-ENOMEM); - } - - rc = lustre_pack_reply(req, bufcount, size, NULL); - if (rc) { - CERROR("lustre_pack_reply failed: rc %d\n", rc); - req->rq_status = rc; - RETURN(rc); - } - - RETURN(0); + return -EOPNOTSUPP; } -static int mds_getattr_name(int offset, struct ptlrpc_request *req, - int child_part, struct lustre_handle *child_lockh) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct ldlm_reply *rep = NULL; - struct lvfs_run_ctxt saved; - struct mds_body *body; - struct dentry *dparent = NULL, *dchild = NULL; - struct lvfs_ucred uc = {NULL,}; - struct lustre_handle parent_lockh; - int namesize; - int rc = 0, cleanup_phase = 0, resent_req = 0; - char *name; - ENTRY; - - LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)); - - /* Swab now, before anyone looks inside the request */ - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't swab mds_body\n"); - RETURN(-EFAULT); - } - - LASSERT_REQSWAB(req, offset + 1); - name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0); - if (name == NULL) { - CERROR("Can't unpack name\n"); - RETURN(-EFAULT); - } - namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(cleanup, rc); - - LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF); - /* if requests were at offset 2, the getattr reply goes back at 1 */ - if (offset == MDS_REQ_INTENT_REC_OFF) { - rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); - offset = 1; - } - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - cleanup_phase = 1; /* kernel context */ - intent_set_disposition(rep, DISP_LOOKUP_EXECD); - - /* FIXME: handle raw lookup */ -#if 0 - if (body->valid == OBD_MD_FLID) { - struct mds_body *mds_reply; - int size = sizeof(*mds_reply); - ino_t inum; - // The user requested ONLY the inode number, so do a raw lookup - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - CERROR("out of memory\n"); - GOTO(cleanup, rc); - } - - rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum); - - mds_reply = lustre_msg_buf(req->rq_repmsg, offset, - sizeof(*mds_reply)); - mds_reply->fid1.id = inum; - mds_reply->valid = OBD_MD_FLID; - GOTO(cleanup, rc); - } -#endif - - if (lustre_handle_is_used(child_lockh)) { - LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - resent_req = 1; - } - - if (resent_req == 0) { - if (name) { - rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1, - &parent_lockh, &dparent, - LCK_CR, - MDS_INODELOCK_UPDATE, - name, namesize, - child_lockh, &dchild, LCK_CR, - child_part); - } else { - /* For revalidate by fid we always take UPDATE lock */ - dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, - LCK_CR, child_lockh, - NULL, 0, - MDS_INODELOCK_UPDATE); - LASSERT(dchild); - if (IS_ERR(dchild)) - rc = PTR_ERR(dchild); - } - if (rc) - GOTO(cleanup, rc); - } else { - struct ldlm_lock *granted_lock; - struct ll_fid child_fid; - struct ldlm_resource *res; - DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks"); - granted_lock = ldlm_handle2lock(child_lockh); - LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n", - body->fid1.id, body->fid1.generation, - child_lockh->cookie); - - - res = granted_lock->l_resource; - child_fid.id = res->lr_name.name[0]; - child_fid.generation = res->lr_name.name[1]; - dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL); - LASSERT(!IS_ERR(dchild)); - LDLM_LOCK_PUT(granted_lock); - } - - cleanup_phase = 2; /* dchild, dparent, locks */ - - if (dchild->d_inode == NULL) { - intent_set_disposition(rep, DISP_LOOKUP_NEG); - /* in the intent case, the policy clears this error: - the disposition is enough */ - GOTO(cleanup, rc = -ENOENT); - } else { - intent_set_disposition(rep, DISP_LOOKUP_POS); - } - - if (req->rq_repmsg == NULL) { - rc = mds_getattr_pack_msg(req, dchild->d_inode, offset); - if (rc != 0) { - CERROR ("mds_getattr_pack_msg: %d\n", rc); - GOTO (cleanup, rc); - } - } - - rc = mds_getattr_internal(obd, dchild, req, body, offset); - GOTO(cleanup, rc); /* returns the lock to the client */ - - cleanup: - switch (cleanup_phase) { - case 2: - if (resent_req == 0) { - if (rc && dchild->d_inode) - ldlm_lock_decref(child_lockh, LCK_CR); - ldlm_lock_decref(&parent_lockh, LCK_CR); - l_dput(dparent); - } - l_dput(dchild); - case 1: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - default: - mds_exit_ucred(&uc, mds); - if (req->rq_reply_state == NULL) { - req->rq_status = rc; - lustre_pack_reply(req, 0, NULL, NULL); - } - } - return rc; -} - -static int mds_getattr(struct ptlrpc_request *req, int offset) -{ - struct mds_obd *mds = mds_req2mds(req); - struct obd_device *obd = req->rq_export->exp_obd; - struct lvfs_run_ctxt saved; - struct dentry *de; - struct mds_body *body; - struct lvfs_ucred uc = {NULL,}; - int rc = 0; - ENTRY; - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - RETURN(-EFAULT); - - rc = mds_init_ucred(&uc, req, offset); - if (rc) - GOTO(out_ucred, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) { - rc = req->rq_status = PTR_ERR(de); - GOTO(out_pop, rc); - } - - rc = mds_getattr_pack_msg(req, de->d_inode, offset); - if (rc != 0) { - CERROR("mds_getattr_pack_msg: %d\n", rc); - GOTO(out_pop, rc); - } - - req->rq_status = mds_getattr_internal(obd, de, req, body, 0); - - l_dput(de); - GOTO(out_pop, rc); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out_ucred: - if (req->rq_reply_state == NULL) { - req->rq_status = rc; - lustre_pack_reply(req, 0, NULL, NULL); - } - mds_exit_ucred(&uc, mds); - return rc; -} - -static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs, - unsigned long max_age) -{ - int rc; - - spin_lock(&obd->obd_osfs_lock); - rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age); - if (rc == 0) - memcpy(osfs, &obd->obd_osfs, sizeof(*osfs)); - spin_unlock(&obd->obd_osfs_lock); - - return rc; -} - -static int mds_statfs(struct ptlrpc_request *req) -{ - struct obd_device *obd = req->rq_export->exp_obd; - int rc, size = sizeof(struct obd_statfs); - ENTRY; - - /* This will trigger a watchdog timeout */ - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP, - (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) { - CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc); - GOTO(out, rc); - } - - /* We call this so that we can cache a bit - 1 jiffie worth */ - rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size), - jiffies - HZ); - if (rc) { - CERROR("mds_obd_statfs failed: rc %d\n", rc); - GOTO(out, rc); - } - - EXIT; -out: - req->rq_status = rc; - return 0; -} - -static int mds_sync(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct mds_body *body; - int rc, size = sizeof(*body); - ENTRY; - - body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body); - if (body == NULL) - GOTO(out, rc = -EFAULT); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) { - CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc); - GOTO(out, rc); - } - - if (body->fid1.id == 0) { - /* a fid of zero is taken to mean "sync whole filesystem" */ - rc = fsfilt_sync(obd, obd->u.obt.obt_sb); - GOTO(out, rc); - } else { - struct dentry *de; - - de = mds_fid2dentry(mds, &body->fid1, NULL); - if (IS_ERR(de)) - GOTO(out, rc = PTR_ERR(de)); - - /* The file parameter isn't used for anything */ - if (de->d_inode->i_fop && de->d_inode->i_fop->fsync) - rc = de->d_inode->i_fop->fsync(NULL, de, 1); - if (rc == 0) { - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body)); - mds_pack_inode2fid(&body->fid1, de->d_inode); - mds_pack_inode2body(body, de->d_inode); - } - - l_dput(de); - GOTO(out, rc); - } -out: - req->rq_status = rc; - return 0; -} - -/* mds_readpage does not take a DLM lock on the inode, because the client must - * already have a PR lock. - * - * If we were to take another one here, a deadlock will result, if another - * thread is already waiting for a PW lock. */ -static int mds_readpage(struct ptlrpc_request *req, int offset) -{ - struct obd_device *obd = req->rq_export->exp_obd; - struct mds_obd *mds = &obd->u.mds; - struct vfsmount *mnt; - struct dentry *de; - struct file *file; - struct mds_body *body, *repbody; - struct lvfs_run_ctxt saved; - int rc, size = sizeof(*repbody); - struct lvfs_ucred uc = {NULL,}; - ENTRY; - - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) - RETURN(-ENOMEM); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) { - CERROR("error packing readpage reply: rc %d\n", rc); - GOTO(out, rc); - } - - body = lustre_swab_reqbuf(req, offset, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) - GOTO (out, rc = -EFAULT); - - rc = mds_init_ucred(&uc, req, 0); - if (rc) - GOTO(out, rc); - - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt); - if (IS_ERR(de)) - GOTO(out_pop, rc = PTR_ERR(de)); - - CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino); - - file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE); - /* note: in case of an error, dentry_open puts dentry */ - if (IS_ERR(file)) - GOTO(out_pop, rc = PTR_ERR(file)); - - /* body->size is actually the offset -eeb */ - if ((body->size & (de->d_inode->i_blksize - 1)) != 0) { - CERROR("offset "LPU64" not on a block boundary of %lu\n", - body->size, de->d_inode->i_blksize); - GOTO(out_file, rc = -EFAULT); - } - - /* body->nlink is actually the #bytes to read -eeb */ - if (body->nlink & (de->d_inode->i_blksize - 1)) { - CERROR("size %u is not multiple of blocksize %lu\n", - body->nlink, de->d_inode->i_blksize); - GOTO(out_file, rc = -EFAULT); - } - - repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody)); - repbody->size = file->f_dentry->d_inode->i_size; - repbody->valid = OBD_MD_FLSIZE; - - /* to make this asynchronous make sure that the handling function - doesn't send a reply when this function completes. Instead a - callback function would send the reply */ - /* body->size is actually the offset -eeb */ - rc = mds_sendpage(req, file, body->size, body->nlink); - -out_file: - filp_close(file, 0); -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); -out: - mds_exit_ucred(&uc, mds); - req->rq_status = rc; - RETURN(0); -} - -int mds_reint(struct ptlrpc_request *req, int offset, - struct lustre_handle *lockh) -{ - struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */ - int rc; - - OBD_ALLOC(rec, sizeof(*rec)); - if (rec == NULL) - RETURN(-ENOMEM); - - rc = mds_update_unpack(req, offset, rec); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { - CERROR("invalid record\n"); - GOTO(out, req->rq_status = -EINVAL); - } - - /* rc will be used to interrupt a for loop over multiple records */ - rc = mds_reint_rec(rec, offset, req, lockh); - out: - OBD_FREE(rec, sizeof(*rec)); - return rc; -} - -static int mds_filter_recovery_request(struct ptlrpc_request *req, - struct obd_device *obd, int *process) -{ - switch (req->rq_reqmsg->opc) { - case MDS_CONNECT: /* This will never get here, but for completeness. */ - case OST_CONNECT: /* This will never get here, but for completeness. */ - case MDS_DISCONNECT: - case OST_DISCONNECT: - *process = 1; - RETURN(0); - - case MDS_CLOSE: - case MDS_SYNC: /* used in unmounting */ - case OBD_PING: - case MDS_REINT: - case LDLM_ENQUEUE: - *process = target_queue_recovery_request(req, obd); - RETURN(0); - - default: - DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); - *process = 0; - /* XXX what should we set rq_status to here? */ - req->rq_status = -EAGAIN; - RETURN(ptlrpc_error(req)); - } -} - -static char *reint_names[] = { - [REINT_SETATTR] "setattr", - [REINT_CREATE] "create", - [REINT_LINK] "link", - [REINT_UNLINK] "unlink", - [REINT_RENAME] "rename", - [REINT_OPEN] "open", -}; - -static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req) -{ - char *key; - __u32 *val; - int keylen, rc = 0; - ENTRY; - - key = lustre_msg_buf(req->rq_reqmsg, 0, 1); - if (key == NULL) { - DEBUG_REQ(D_HA, req, "no set_info key"); - RETURN(-EFAULT); - } - keylen = req->rq_reqmsg->buflens[0]; - - val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val)); - if (val == NULL) { - DEBUG_REQ(D_HA, req, "no set_info val"); - RETURN(-EFAULT); - } - - rc = lustre_pack_reply(req, 0, NULL, NULL); - if (rc) - RETURN(rc); - req->rq_repmsg->status = 0; - - if (keylen < strlen("read-only") || - memcmp(key, "read-only", keylen) != 0) - RETURN(-EINVAL); - - if (*val) - exp->exp_connect_flags |= OBD_CONNECT_RDONLY; - else - exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY; - - RETURN(0); -} - -static int mds_handle_quotacheck(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl; - int rc; - ENTRY; - - oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 0, NULL, NULL); - if (rc) { - CERROR("mds: out of memory while packing quotacheck reply\n"); - RETURN(rc); - } - - req->rq_status = obd_quotacheck(req->rq_export, oqctl); - RETURN(0); -} - -static int mds_handle_quotactl(struct ptlrpc_request *req) -{ - struct obd_quotactl *oqctl, *repoqc; - int rc, size = sizeof(*repoqc); - ENTRY; - - oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl), - lustre_swab_obd_quotactl); - if (oqctl == NULL) - RETURN(-EPROTO); - - rc = lustre_pack_reply(req, 1, &size, NULL); - if (rc) - RETURN(rc); - - repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc)); - - req->rq_status = obd_quotactl(req->rq_export, oqctl); - *repoqc = *oqctl; - RETURN(0); -} - -static int mds_msg_check_version(struct lustre_msg *msg) -{ - int rc; - - /* TODO: enable the below check while really introducing msg version. - * it's disabled because it will break compatibility with b1_4. - */ - return (0); - - switch (msg->opc) { - case MDS_CONNECT: - case MDS_DISCONNECT: - case OBD_PING: - rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_OBD_VERSION); - break; - case MDS_GETSTATUS: - case MDS_GETATTR: - case MDS_GETATTR_NAME: - case MDS_STATFS: - case MDS_READPAGE: - case MDS_REINT: - case MDS_CLOSE: - case MDS_DONE_WRITING: - case MDS_PIN: - case MDS_SYNC: - case MDS_GETXATTR: - case MDS_SETXATTR: - case MDS_SET_INFO: - case MDS_QUOTACHECK: - case MDS_QUOTACTL: - case QUOTA_DQACQ: - case QUOTA_DQREL: - rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_MDS_VERSION); - break; - case LDLM_ENQUEUE: - case LDLM_CONVERT: - case LDLM_BL_CALLBACK: - case LDLM_CP_CALLBACK: - rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_DLM_VERSION); - break; - case OBD_LOG_CANCEL: - case LLOG_ORIGIN_HANDLE_CREATE: - case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: - case LLOG_ORIGIN_HANDLE_PREV_BLOCK: - case LLOG_ORIGIN_HANDLE_READ_HEADER: - case LLOG_ORIGIN_HANDLE_CLOSE: - case LLOG_CATINFO: - rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); - if (rc) - CERROR("bad opc %u version %08x, expecting %08x\n", - msg->opc, msg->version, LUSTRE_LOG_VERSION); - break; - default: - CERROR("MDS unknown opcode %d\n", msg->opc); - rc = -ENOTSUPP; - } - return rc; +static int mdt_pin(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + return -EOPNOTSUPP; } +static int mdt_sync(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + return -EOPNOTSUPP; +} -enum mdt_handler_flags { +static int mdt_set_info(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + return -EOPNOTSUPP; +} + +static int mdt_handle_quotacheck(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + return -EOPNOTSUPP; +} + +static int mdt_handle_quotactl(struct mdt_thread_info *info, + struct ptlrpc_request *req, int offset) +{ + return -EOPNOTSUPP; +} + + +int fid_lock(const struct ll_fid *f, struct lustre_handle *lh, ldlm_mode_t mode) +{ + return 0; +} + +void fid_unlock(const struct ll_fid *f, + struct lustre_handle *lh, ldlm_mode_t mode) +{ +} + +static struct lu_device_operations mdt_lu_ops; + +static int lu_device_is_mdt(struct lu_device *d) +{ /* - * struct mds_body is passed in the 0-th incoming buffer. + * XXX for now. Tags in lu_device_type->ldt_something are needed. */ - HABEO_CORPUS = (1 << 0) -}; + return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); +} + +static struct mdt_object *mdt_obj(struct lu_object *o) +{ + LASSERT(lu_device_is_mdt(o->lo_dev)); + return container_of(o, struct mdt_object, mot_obj.mo_lu); +} + +struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f) +{ + struct lu_object *o; + + o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f); + if (IS_ERR(o)) + return (struct mdt_object *)o; + else + return mdt_obj(o); +} + +void mdt_object_put(struct mdt_object *o) +{ + lu_object_put(&o->mot_obj.mo_lu); +} struct mdt_handler { const char *mh_name; @@ -1407,82 +237,28 @@ struct mdt_handler { struct ptlrpc_request *req, int offset); }; -#define DEF_HNDL(prefix, base, flags, opc, fn) \ -[prefix ## _ ## opc - prefix ## _ ## base] = { \ - .mh_name = #opc, \ - .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ - .mh_opc = prefix ## _ ## opc, \ - .mh_flags = flags, \ - .mh_act = fn \ -} - -#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn) - -static struct mdt_handler mdt_mds_ops[] = { - DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), - - DEF_MDT_HNDL(0, CONNECT, mds_connect), - DEF_MDT_HNDL(0, DISCONNECT, mds_disconnect), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mds_getattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mds_getattr_name), - DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mds_setxattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mds_getxattr), - DEF_MDT_HNDL(0, STATFS, mds_statfs), - DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mds_readpage), - DEF_MDT_HNDL(0, REINT, mds_reint), - DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mds_close), - DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mds_done_writing), - DEF_MDT_HNDL(0, PIN, mds_pin), - DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mds_sync), - DEF_MDT_HNDL(0, SET_INFO, mds_set_info), - DEF_MDT_HNDL(0, QUOTACHECK, mds_handle_quotacheck), - DEF_MDT_HNDL(0, QUOTACTL, mds_handle_quotactl) -}; - -static struct mdt_handler mdt_obd_ops[] = { -}; - -static struct mdt_handler mdt_dlm_ops[] = { +enum mdt_handler_flags { + /* + * struct mds_body is passed in the 0-th incoming buffer. + */ + HABEO_CORPUS = (1 << 0) }; -static struct mdt_handler mdt_llog_ops[] = { +struct mdt_opc_slice { + __u32 mos_opc_start; + int mos_opc_end; + struct mdt_handler *mos_hs; }; -static struct mdt_opc_slice { - __u32 mos_opc_start; - int mos_opc_end; - struct mdt_handler *mos_hs; -} mdt_handlers[] = { - { - .mos_opc_start = MDS_GETATTR, - .mos_opc_end = MDS_LAST_OPC, - .mos_hs = mdt_mds_ops - }, - { - .mos_opc_start = OBD_PING, - .mos_opc_end = OBD_LAST_OPC, - .mos_hs = mdt_obd_ops - }, - { - .mos_opc_start = LDLM_ENQUEUE, - .mos_opc_end = LDLM_LAST_OPC, - .mos_hs = mdt_dlm_ops - }, - { - .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE, - .mos_opc_end = LLOG_LAST_OPC, - .mos_hs = mdt_llog_ops - } -}; +static struct mdt_opc_slice mdt_handlers[]; struct mdt_handler *mdt_handler_find(__u32 opc) { - int i; struct mdt_opc_slice *s; - struct mdt_handler *h; + struct mdt_handler *h; h = NULL; - for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) { + for (s = mdt_handlers; s->mos_hs != NULL; s++) { if (s->mos_opc_start <= opc && opc < s->mos_opc_end) { h = s->mos_hs + (opc - s->mos_opc_start); if (h->mh_opc != 0) @@ -1495,22 +271,6 @@ struct mdt_handler *mdt_handler_find(__u32 opc) return h; } -struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f) -{ - struct lu_object *o; - - o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f); - if (IS_ERR(o)) - return (struct mdt_object *)o; - else - return container_of(o, struct mdt_object, mot_obj.mo_lu); -} - -void mdt_object_put(struct mdt_object *o) -{ - lu_object_put(&o->mot_obj.mo_lu); -} - static int mdt_req_handle(struct mdt_thread_info *info, struct mdt_handler *h, struct ptlrpc_request *req, int shift) @@ -1576,6 +336,74 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info) } } +static int mds_msg_check_version(struct lustre_msg *msg) +{ + int rc; + + /* TODO: enable the below check while really introducing msg version. + * it's disabled because it will break compatibility with b1_4. + */ + return (0); + + switch (msg->opc) { + case MDS_CONNECT: + case MDS_DISCONNECT: + case OBD_PING: + rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_OBD_VERSION); + break; + case MDS_GETSTATUS: + case MDS_GETATTR: + case MDS_GETATTR_NAME: + case MDS_STATFS: + case MDS_READPAGE: + case MDS_REINT: + case MDS_CLOSE: + case MDS_DONE_WRITING: + case MDS_PIN: + case MDS_SYNC: + case MDS_GETXATTR: + case MDS_SETXATTR: + case MDS_SET_INFO: + case MDS_QUOTACHECK: + case MDS_QUOTACTL: + case QUOTA_DQACQ: + case QUOTA_DQREL: + rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_MDS_VERSION); + break; + case LDLM_ENQUEUE: + case LDLM_CONVERT: + case LDLM_BL_CALLBACK: + case LDLM_CP_CALLBACK: + rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_DLM_VERSION); + break; + case OBD_LOG_CANCEL: + case LLOG_ORIGIN_HANDLE_CREATE: + case LLOG_ORIGIN_HANDLE_NEXT_BLOCK: + case LLOG_ORIGIN_HANDLE_PREV_BLOCK: + case LLOG_ORIGIN_HANDLE_READ_HEADER: + case LLOG_ORIGIN_HANDLE_CLOSE: + case LLOG_CATINFO: + rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION); + if (rc) + CERROR("bad opc %u version %08x, expecting %08x\n", + msg->opc, msg->version, LUSTRE_LOG_VERSION); + break; + default: + CERROR("MDS unknown opcode %d\n", msg->opc); + rc = -ENOTSUPP; + } + return rc; +} + static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) { int rc; @@ -1595,10 +423,8 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) RETURN(rc); } - /* XXX identical to OST */ if (req->rq_reqmsg->opc != MDS_CONNECT) { struct mds_export_data *med; - int recovering, abort_recovery; if (req->rq_export == NULL) { CERROR("operation %d on unconnected MDS from %s\n", @@ -1614,12 +440,11 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) /* sanity check: if the xid matches, the request must * be marked as a resent or replayed */ - if (req->rq_xid == med->med_mcd->mcd_last_xid) - LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_RESENT | MSG_REPLAY), - "rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", - req->rq_xid); + LASSERTF(ergo(req->rq_xid == med->med_mcd->mcd_last_xid, + lustre_msg_get_flags(req->rq_reqmsg) & + (MSG_RESENT | MSG_REPLAY)), + "rq_xid "LPU64" matches last_xid, " + "expected RESENT flag\n", req->rq_xid); /* else: note the opposite is not always true; a * RESENT req after a failover will usually not match * the last_xid, since it was likely never @@ -1627,21 +452,7 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) * match the last xid, however it could for a * committed, but still retained, open. */ - /* Check for aborted recovery. */ - spin_lock_bh(&obd->obd_processing_task_lock); - abort_recovery = obd->obd_abort_recovery; - recovering = obd->obd_recovering; - spin_unlock_bh(&obd->obd_processing_task_lock); - if (abort_recovery) { - target_abort_recovery(obd); - } else if (recovering) { - int should_process; - - rc = mds_filter_recovery_request(req, obd, - &should_process); - if (rc || !should_process) - RETURN(rc); - } + /* Check for aborted recovery... */ } h = mdt_handler_find(req->rq_reqmsg->opc); @@ -1680,22 +491,6 @@ static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) RETURN(0); } -static struct lu_device_operations mdt_lu_ops; - -static int lu_device_is_mdt(struct lu_device *d) -{ - /* - * XXX for now. Tags in lu_device_type->ldt_something are needed. - */ - return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); -} - -static struct mdt_object *mdt_obj(struct lu_object *o) -{ - LASSERT(lu_device_is_mdt(o->lo_dev)); - return container_of(o, struct mdt_object, mot_obj.mo_lu); -} - static struct mdt_device *mdt_dev(struct lu_device *d) { LASSERT(lu_device_is_mdt(d)); @@ -1895,6 +690,11 @@ static void mdt_object_unlock(struct mdt_object *o, ldlm_mode_t mode) fid_unlock(mdt_object_fid(o), &o->mot_lh, mode); } +struct md_object *mdt_object_child(struct mdt_object *o) +{ + return lu2md(lu_object_next(&o->mot_obj.mo_lu)); +} + int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name) { struct mdt_object *o; @@ -1905,7 +705,8 @@ int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name) return PTR_ERR(o); result = mdt_object_lock(o, LCK_PW); if (result == 0) { - result = d->mdt_child->md_ops->mdo_mkdir(&o->mot_obj, name); + result = d->mdt_child->md_ops->mdo_mkdir(mdt_object_child(o), + name); mdt_object_unlock(o, LCK_PW); } mdt_object_put(o); @@ -1913,7 +714,7 @@ int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name) } static struct obd_ops mdt_obd_device_ops = { - .o_owner = THIS_MODULE + .o_owner = THIS_MODULE }; struct lu_device *mdt_device_alloc(struct lu_device_type *t, @@ -1961,10 +762,21 @@ static struct lu_device_type_operations mdt_device_type_ops = { }; static struct lu_device_type mdt_device_type = { + .ldt_tags = LU_DEVICE_MD, .ldt_name = LUSTRE_MDT0_NAME, .ldt_ops = &mdt_device_type_ops }; +struct lprocfs_vars lprocfs_mdt_obd_vars[] = { + { 0 } +}; + +struct lprocfs_vars lprocfs_mdt_module_vars[] = { + { 0 } +}; + +LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars); + static int __init mdt_mod_init(void) { struct lprocfs_static_vars lvars; @@ -1991,6 +803,73 @@ static void __exit mdt_mod_exit(void) class_unregister_type(LUSTRE_MDT0_NAME); } + +#define DEF_HNDL(prefix, base, flags, opc, fn) \ +[prefix ## _ ## opc - prefix ## _ ## base] = { \ + .mh_name = #opc, \ + .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ + .mh_opc = prefix ## _ ## opc, \ + .mh_flags = flags, \ + .mh_act = fn \ +} + +#define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn) + +static struct mdt_handler mdt_mds_ops[] = { + DEF_MDT_HNDL(0, CONNECT, mdt_connect), + DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect), + DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name), + DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr), + DEF_MDT_HNDL(0, STATFS, mdt_statfs), + DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage), + DEF_MDT_HNDL(0, REINT, mdt_reint), + DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close), + DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), + DEF_MDT_HNDL(0, PIN, mdt_pin), + DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync), + DEF_MDT_HNDL(0, SET_INFO, mdt_set_info), + DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), + DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl), +}; + +static struct mdt_handler mdt_obd_ops[] = { +}; + +static struct mdt_handler mdt_dlm_ops[] = { +}; + +static struct mdt_handler mdt_llog_ops[] = { +}; + +static struct mdt_opc_slice mdt_handlers[] = { + { + .mos_opc_start = MDS_GETATTR, + .mos_opc_end = MDS_LAST_OPC, + .mos_hs = mdt_mds_ops + }, + { + .mos_opc_start = OBD_PING, + .mos_opc_end = OBD_LAST_OPC, + .mos_hs = mdt_obd_ops + }, + { + .mos_opc_start = LDLM_ENQUEUE, + .mos_opc_end = LDLM_LAST_OPC, + .mos_hs = mdt_dlm_ops + }, + { + .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE, + .mos_opc_end = LLOG_LAST_OPC, + .mos_hs = mdt_llog_ops + }, + { + .mos_hs = NULL + } +}; + MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")"); MODULE_LICENSE("GPL"); -- 1.8.3.1