X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=8eb065dd070fa97a6443efc970498a45253dc9a8;hp=24a9920174f66cb5a627e55ed6e108a73964605d;hb=c1f7cabab290f16fefee43001b62f20dc5c35e42;hpb=da54b94dbb971b7ee163928bde59640f3eaf10e3 diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 24a9920..8eb065d 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1,22 +1,25 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2001-2004 Cluster File Systems, Inc. + * Copyright (C) 2001-2003 Cluster File Systems, Inc. * - * This file is part of Lustre, http://www.sf.net/projects/lustre/ + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. * - * Lustre is free software; you can redistribute it and/or - * modify it under the terms of version 2 of the GNU General Public - * License as published by the Free Software Foundation. + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. * - * Lustre is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. * - * You should have received a copy of the GNU General Public License - * along with Lustre; if not, write to the Free Software - * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * In either case, Lustre is distributed in the hope that it will be + * useful, but WITHOUT ANY WARRANTY; without even the implied warranty + * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * license text for more details. */ #ifndef EXPORT_SYMTAB @@ -33,60 +36,95 @@ # include #endif -#include -#include -#include -#include -#include #include -#include -#include +#include +#include +#include +#include +#include +#include #include "mdc_internal.h" +static quota_interface_t *quota_interface; + #define REQUEST_MINOR 244 -static int mdc_cleanup(struct obd_device *obd, int flags); +static quota_interface_t *quota_interface; +extern quota_interface_t mdc_quota_interface; + +static int mdc_cleanup(struct obd_device *obd); + +static struct obd_capa *mdc_unpack_capa(struct ptlrpc_request *req, + unsigned int offset) +{ + struct lustre_capa *capa; + struct obd_capa *oc; + + /* swabbed already in mdc_enqueue */ + capa = lustre_msg_buf(req->rq_repmsg, offset, sizeof(*capa)); + if (capa == NULL) { + CERROR("missing capa at offset %d failed!\n", offset); + return ERR_PTR(-EFAULT); + } + + oc = alloc_capa(CAPA_SITE_CLIENT); + if (!oc) { + CERROR("alloc capa failed!\n"); + return ERR_PTR(-ENOMEM); + } + oc->c_capa = *capa; + + return oc; +} -extern int mds_queue_req(struct ptlrpc_request *); /* Helper that implements most of mdc_getstatus and signal_completed_replay. */ /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */ -static int send_getstatus(struct obd_import *imp, struct lustre_id *rootid, - int level, int msg_flags) +static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid, + struct obd_capa **pc, int level, int msg_flags) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = {0, sizeof(*body)}; + struct mdt_body *body; + int rc, size[3] = { sizeof(struct ptlrpc_body), + sizeof(*body), + sizeof(struct lustre_capa) }; ENTRY; - //size[0] = lustre_secdesc_size(); - - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, - 2, size, NULL); + req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size, + NULL); if (!req) GOTO(out, rc = -ENOMEM); - //lustre_pack_secdesc(req, size[0]); - - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body)); req->rq_send_state = level; - req->rq_replen = lustre_msg_size(1, &size[1]); + ptlrpc_req_set_repsize(req, 3, size); - req->rq_reqmsg->flags |= msg_flags; + mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, NULL, 0, 0); + lustre_msg_add_flags(req->rq_reqmsg, msg_flags); rc = ptlrpc_queue_wait(req); if (!rc) { - body = lustre_swab_repbuf (req, 0, sizeof (*body), - lustre_swab_mds_body); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); if (body == NULL) { - CERROR ("Can't extract mds_body\n"); + CERROR ("Can't extract mdt_body\n"); GOTO (out, rc = -EPROTO); } - memcpy(rootid, &body->id1, sizeof(*rootid)); + *rootfid = body->fid1; - CDEBUG(D_NET, "root ino="LPU64", last_committed="LPU64 - ", last_xid="LPU64"\n", rootid->li_stc.u.e3s.l3s_ino, - req->rq_repmsg->last_committed, req->rq_repmsg->last_xid); + if (body->valid & OBD_MD_FLMDSCAPA) { + struct obd_capa *oc; + + oc = mdc_unpack_capa(req, REPLY_REC_OFF + 1); + if (IS_ERR(oc)) + GOTO(out, rc = PTR_ERR(oc)); + *pc = oc; + } + + CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64 + ", last_xid="LPU64"\n", + PFID(rootfid), + lustre_msg_get_last_committed(req->rq_repmsg), + lustre_msg_get_last_xid(req->rq_repmsg)); } EXIT; @@ -95,682 +133,734 @@ static int send_getstatus(struct obd_import *imp, struct lustre_id *rootid, return rc; } -/* This should be mdc_get_info("rootid") */ -int mdc_getstatus(struct obd_export *exp, struct lustre_id *rootid) +/* This should be mdc_get_info("rootfid") */ +int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid, + struct obd_capa **pc) { - return send_getstatus(class_exp2cliimp(exp), rootid, + return send_getstatus(class_exp2cliimp(exp), rootfid, pc, LUSTRE_IMP_FULL, 0); } -int -mdc_interpret_getattr(struct ptlrpc_request *req, void *unused, int rc) -{ - struct mds_body *body = NULL; - struct obd_capa *ocapa; - struct lustre_capa *capa = NULL; - unsigned long expiry; - ENTRY; - - if (rc) { - DEBUG_REQ(D_ERROR, req, - "async getattr failed: rc = %d", rc); - RETURN(rc); - } - - body = lustre_swab_repbuf(req, 0, sizeof (*body), lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack mds_body\n"); - RETURN(-EPROTO); - } - - if (!(body->valid & OBD_MD_CAPA)) { - CDEBUG(D_INFO, "MDS has disabled capability\n"); - RETURN(0); - } - - capa = lustre_swab_repbuf(req, 1, sizeof(*capa), - lustre_swab_lustre_capa); - if (capa == NULL && rc != 0) { - CERROR ("Can't unpack lustre_capa\n"); - RETURN(-EPROTO); - } - - ocapa = capa_renew(capa, CLIENT_CAPA); - if (!ocapa) - RETURN(-ENOENT); - - spin_lock(&capa_lock); - expiry = expiry_to_jiffies(capa->lc_expiry - capa_pre_expiry(capa)); - if (time_before(expiry, ll_capa_timer.expires) || - !timer_pending(&ll_capa_timer)) { - mod_timer(&ll_capa_timer, expiry); - CDEBUG(D_INFO, "ll_capa_timer new timer: %lu\n", expiry); - } - spin_unlock(&capa_lock); - - RETURN(rc); -} - -int mdc_getattr_async(struct obd_export *exp, struct ptlrpc_request *req) -{ - int repsize[2] = {sizeof(struct mds_body), sizeof(struct lustre_capa)}; - ENTRY; - - req->rq_replen = lustre_msg_size(2, repsize); - req->rq_interpret_reply = mdc_interpret_getattr; - ptlrpcd_add_req(req); - - RETURN (0); -} - -int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, - struct ptlrpc_request *req) +/* + * This function now is known to always saying that it will receive 4 buffers + * from server. Even for cases when acl_size and md_size is zero, RPC header + * willcontain 4 fields and RPC itself will contain zero size fields. This is + * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed + * and thus zero, it shirinks it, making zero size. The same story about + * md_size. And this is course of problem when client waits for smaller number + * of fields. This issue will be fixed later when client gets awar of RPC + * layouts. --umka + */ +static int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, + unsigned int acl_size, int mdscapa, + struct ptlrpc_request *req) { - struct mds_body *body, *reqbody; - void *eadata; - int rc; - int repsize[2] = {sizeof(*body)}; - int bufcount = 1; + struct mdt_body *body; + void *eadata; + int size[5] = { sizeof(struct ptlrpc_body), + sizeof(*body), + ea_size, + acl_size, + sizeof(struct lustre_capa) }; + int offset, rc; ENTRY; - /* request message already built */ - - if (ea_size != 0) { - repsize[bufcount++] = ea_size; + /* Request message already built. */ + if (ea_size) CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n", ea_size); - } + if (acl_size) + CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size); - reqbody = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*reqbody)); - LASSERT(!(reqbody->valid & OBD_MD_FLACL)); + ptlrpc_req_set_repsize(req, 5, size); - if (reqbody->valid & OBD_MD_FLKEY) { - repsize[bufcount++] = 5; - repsize[bufcount++] = sizeof(struct lustre_key); - } else if (reqbody->valid & OBD_MD_CAPA) { - LASSERT(ea_size == 0); - repsize[bufcount++] = sizeof(struct lustre_capa); - } - - req->rq_replen = lustre_msg_size(bufcount, repsize); - - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc != 0) RETURN (rc); - body = lustre_swab_repbuf (req, 0, sizeof (*body), - lustre_swab_mds_body); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); if (body == NULL) { - CERROR ("Can't unpack mds_body\n"); + CERROR ("Can't unpack mdt_body\n"); RETURN (-EPROTO); } CDEBUG(D_NET, "mode: %o\n", body->mode); - LASSERT_REPSWAB (req, 1); - - /* Skip the check if getxattr/listxattr are called with no buffers */ - if ((reqbody->eadatasize != 0) && - !(reqbody->valid & (OBD_MD_FLXATTR | OBD_MD_FLXATTRLIST))) { + offset = REPLY_REC_OFF + 1; + lustre_set_rep_swabbed(req, offset); + if (body->eadatasize != 0) { /* reply indicates presence of eadata; check it's there... */ - eadata = lustre_msg_buf (req->rq_repmsg, 1, - body->eadatasize); + eadata = lustre_msg_buf(req->rq_repmsg, offset++, + body->eadatasize); if (eadata == NULL) { CERROR ("Missing/short eadata\n"); RETURN (-EPROTO); } } - RETURN (0); -} + if (body->valid & OBD_MD_FLMODEASIZE) { + struct client_obd *cli = &exp->exp_obd->u.cli; -static int mdc_cancel_unused(struct obd_export *exp, - struct lov_stripe_md *lsm, - int flags, void *opaque) -{ - struct obd_device *obd = class_exp2obd(exp); + if (cli->cl_max_mds_easize < body->max_mdsize) + cli->cl_max_mds_easize = body->max_mdsize; + if (cli->cl_max_mds_cookiesize < body->max_cookiesize) + cli->cl_max_mds_cookiesize = body->max_cookiesize; + } - ENTRY; - RETURN(ldlm_cli_cancel_unused(obd->obd_namespace, - NULL, flags, opaque)); + offset += !!body->aclsize; + + if (body->valid & OBD_MD_FLMDSCAPA) { + struct lustre_capa *capa; + + LASSERT(mdscapa); + capa = lustre_unpack_capa(req->rq_repmsg, offset++); + if (capa == NULL) { + CERROR("Missing/short client MDS capability\n"); + RETURN(-EPROTO); + } + } + + RETURN (0); } -int mdc_getattr(struct obd_export *exp, struct lustre_id *id, - __u64 valid, const char *xattr_name, - const void *xattr_data, unsigned int xattr_datalen, - unsigned int ea_size, struct obd_capa *ocapa, +int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, obd_valid valid, int ea_size, struct ptlrpc_request **request) { struct ptlrpc_request *req; - struct mds_body *body; - int xattr_namelen = xattr_name ? strlen(xattr_name) + 1 : 0; - int size[4] = {0, sizeof(*body)}; - int bufcount = 2; - int rc; + int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; + int acl_size = 0, rc; ENTRY; - size[0] = lustre_secdesc_size(); - - if (valid & OBD_MD_FLXATTR) { - size[bufcount++] = xattr_namelen; - - if (xattr_datalen > 0) { - LASSERT(xattr_data); - size[bufcount++] = xattr_datalen; - } - } else if (valid & OBD_MD_CAPA) { - LASSERT(valid == OBD_MD_CAPA); - LASSERT(ocapa); - size[bufcount++] = sizeof(*ocapa); - } else { - LASSERT(!xattr_data && !xattr_datalen); - } + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; + /* + * XXX: Do we need to make another request here? We just did a getattr + * to do the lookup in the first place. + */ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_GETATTR, bufcount, size, NULL); + MDS_GETATTR, 3, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); - lustre_pack_secdesc(req, size[0]); + mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size, + MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body)); - memcpy(&body->id1, id, sizeof(*id)); - body->valid = valid; - body->eadatasize = ea_size; - - if (valid & OBD_MD_FLXATTR) { - memcpy(lustre_msg_buf(req->rq_reqmsg, 2, xattr_namelen), - xattr_name, xattr_namelen); - if (xattr_datalen) - memcpy(lustre_msg_buf(req->rq_reqmsg, 3, xattr_datalen), - xattr_data, xattr_datalen); - } - - if (valid & OBD_MD_CAPA) { - /* renew capability */ - memcpy(&body->handle, &ocapa->c_handle, sizeof(body->handle)); - memcpy(lustre_msg_buf(req->rq_reqmsg, 2, sizeof(ocapa->c_capa)), - &ocapa->c_capa, sizeof(ocapa->c_capa)); + if (valid & OBD_MD_FLRMTPERM) + acl_size = sizeof(struct mdt_remote_perm); + + /* Currently only root inode will call us with FLACL */ + else if (valid & OBD_MD_FLACL) + acl_size = LUSTRE_POSIX_ACL_MAX_SIZE; - rc = mdc_getattr_async(exp, req); - req = NULL; /* ptlrpcd will finish request */ - } else { - rc = mdc_getattr_common(exp, ea_size, req); - if (rc != 0) { - ptlrpc_req_finished (req); - req = NULL; - } + rc = mdc_getattr_common(exp, ea_size, acl_size, + !!(valid & OBD_MD_FLMDSCAPA), req); + if (rc != 0) { + ptlrpc_req_finished (req); + req = NULL; } out: *request = req; RETURN (rc); } -int mdc_access_check(struct obd_export *exp, struct lustre_id *id, +int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, const char *filename, int namelen, + obd_valid valid, int ea_size, struct ptlrpc_request **request) - { struct ptlrpc_request *req; - struct mds_body *body; - int size[2] = {0, sizeof(*body)}; + struct mdt_body *body; + int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body), 0, namelen}; int rc; ENTRY; - size[0] = lustre_secdesc_size(); + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_ACCESS_CHECK, 2, size, NULL); + MDS_GETATTR_NAME, 4, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); - lustre_pack_secdesc(req, size[0]); - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body)); - memcpy(&body->id1, id, sizeof(*id)); + mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, ea_size, + MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); - size[0] = sizeof(*body); - size[1] = sizeof(struct mds_remote_perm); - req->rq_replen = lustre_msg_size(2, size); + if (filename) { + LASSERT(strnlen(filename, namelen) == namelen - 1); + memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, namelen), + filename, namelen); + } - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = mdc_getattr_common(exp, ea_size, 0, !!(valid & OBD_MD_FLMDSCAPA), + req); if (rc != 0) { ptlrpc_req_finished (req); req = NULL; - } else { - body = lustre_swab_repbuf (req, 0, sizeof (*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack mds_body\n"); - RETURN (-EPROTO); - } } + out: + *request = req; + RETURN(rc); +} + +static int mdc_is_subdir(struct obd_export *exp, const struct lu_fid *pfid, + const struct lu_fid *cfid, struct ptlrpc_request **request) +{ + int size[2] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body) }; + struct ptlrpc_request *req; + struct mdt_body *body; + int rc; + ENTRY; + + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, + MDS_IS_SUBDIR, 2, size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); + + mdc_is_subdir_pack(req, REQ_REC_OFF, pfid, cfid, 0); + + ptlrpc_req_set_repsize(req, 2, size); + rc = ptlrpc_queue_wait(req); + if (rc != 0 && rc != -EREMOTE) + GOTO(out, rc); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + if (body == NULL) { + CERROR ("Can't unpack mdt_body\n"); + GOTO(out, rc = -EPROTO); + } + EXIT; out: *request = req; - RETURN (rc); + return rc; } -int mdc_getattr_lock(struct obd_export *exp, struct lustre_id *id, - char *filename, int namelen, __u64 valid, - unsigned int ea_size, struct ptlrpc_request **request) +static +int mdc_xattr_common(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, + int opcode, obd_valid valid, const char *xattr_name, + const char *input, int input_size, int output_size, + int flags, struct ptlrpc_request **request) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[3] = {0, sizeof(*body), namelen}; + int size[5] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; + int bufcnt = 3, offset = REQ_REC_OFF + 2; + int rc, xattr_namelen = 0, remote_acl = 0; + void *tmp; ENTRY; - size[0] = lustre_secdesc_size(); + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; + if (xattr_name) { + xattr_namelen = strlen(xattr_name) + 1; + size[bufcnt++] = xattr_namelen; + } + if (input_size) { + LASSERT(input); + size[bufcnt++] = input_size; + } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_GETATTR_LOCK, 3, size, NULL); + opcode, bufcnt, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); - lustre_pack_secdesc(req, size[0]); + /* request data */ + mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, oc, output_size, flags); - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof (*body)); - memcpy(&body->id1, id, sizeof(*id)); - body->valid = valid; - body->eadatasize = ea_size; - if (filename != NULL) { - LASSERT (strnlen (filename, namelen) == namelen - 1); - memcpy(lustre_msg_buf(req->rq_reqmsg, 2, namelen), - filename, namelen); + if (xattr_name) { + tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen); + memcpy(tmp, xattr_name, xattr_namelen); + if (!strcmp(xattr_name, XATTR_NAME_LUSTRE_ACL)) + remote_acl = 1; + } + if (input_size) { + tmp = lustre_msg_buf(req->rq_reqmsg, offset++, input_size); + memcpy(tmp, input, input_size); + } + + /* reply buffers */ + if (opcode == MDS_GETXATTR) { + size[REPLY_REC_OFF] = sizeof(struct mdt_body); + bufcnt = 2; } else { - LASSERT(namelen == 1); + bufcnt = 1; } - rc = mdc_getattr_common(exp, ea_size, req); - if (rc != 0) { - ptlrpc_req_finished (req); - req = NULL; + /* we do this even output_size is 0, because server is doing that */ + size[bufcnt++] = output_size; + ptlrpc_req_set_repsize(req, bufcnt, size); + + /* make rpc */ + /* NB: set remote acl doesn't need hold rpc lock, because it just + * send command to MDS, and when it's executed on mountpoint on MDS, + * another mdc_xattr_common() will be invoked there. */ + if (opcode == MDS_SETXATTR && !remote_acl) + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + + rc = ptlrpc_queue_wait(req); + + if (opcode == MDS_SETXATTR && !remote_acl) + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + + if (rc != 0) + GOTO(err_out, rc); + + if (opcode == MDS_GETXATTR) { + struct mdt_body * body = lustre_swab_repbuf(req, REPLY_REC_OFF, + sizeof(*body), lustre_swab_mdt_body); + if (body == NULL) { + CERROR ("Can't unpack mdt_body\n"); + GOTO(err_out, rc = -EPROTO); + } } - out: +out: *request = req; - RETURN(rc); + RETURN (rc); +err_out: + ptlrpc_req_finished(req); + req = NULL; + goto out; } -/* This should be called with both the request and the reply still packed. */ -int mdc_store_inode_generation(struct obd_export *exp, - struct ptlrpc_request *req, - int reqoff, int repoff) +int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, obd_valid valid, const char *xattr_name, + const char *input, int input_size, int output_size, int flags, + struct ptlrpc_request **request) { - struct mds_rec_create *rec = - lustre_msg_buf(req->rq_reqmsg, reqoff, sizeof(*rec)); - struct mds_body *body = - lustre_msg_buf(req->rq_repmsg, repoff, sizeof(*body)); - - LASSERT (rec != NULL); - LASSERT (body != NULL); + return mdc_xattr_common(exp, fid, oc, MDS_SETXATTR, valid, xattr_name, + input, input_size, output_size, flags, request); +} - memcpy(&rec->cr_replayid, &body->id1, sizeof(rec->cr_replayid)); - DEBUG_REQ(D_HA, req, "storing generation for ino "DLID4, - OLID4(&rec->cr_replayid)); - return 0; +int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, obd_valid valid, const char *xattr_name, + const char *input, int input_size, int output_size, int flags, + struct ptlrpc_request **request) +{ + return mdc_xattr_common(exp, fid, oc, MDS_GETXATTR, valid, xattr_name, + input, input_size, output_size, flags, request); } +#ifdef CONFIG_FS_POSIX_ACL static -int mdc_unpack_acl(struct obd_export *exp_lmv, struct ptlrpc_request *req, - unsigned int offset, struct lustre_md *md) +int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req, + struct lustre_md *md, unsigned int offset) { + struct mdt_body *body = md->body; struct posix_acl *acl; - struct mds_remote_perm *perm; - int size, rc; - void *buf; - ENTRY; - - if (!(md->body->valid & OBD_MD_FLACL)) - RETURN(0); - - if (md->body->valid & OBD_MD_FLRMTACL) { - offset++; /* first 'size' is not used */ + void *buf; + int rc; - buf = lustre_swab_repbuf(req, offset, sizeof(*perm), - lustre_swab_remote_perm); - if (buf == NULL) { - CERROR("Can't unpack remote perm\n"); - RETURN(-EFAULT); - } + if (!body->aclsize) + return 0; - OBD_ALLOC(perm, sizeof(*perm)); - if (!perm) - RETURN(-ENOMEM); - memcpy(perm, buf, sizeof(*perm)); - md->remote_perm = perm; - } else { - size = le32_to_cpu(*(__u32 *) lustre_msg_buf( - req->rq_repmsg, offset, 4)); - buf = lustre_msg_buf(req->rq_repmsg, offset + 1, size); - - acl = posix_acl_from_xattr(buf, size); - if (IS_ERR(acl)) { - rc = PTR_ERR(acl); - CERROR("convert xattr to acl failed: %d\n", rc); - RETURN(rc); - } else if (acl) { - rc = posix_acl_valid(acl); - if (rc) { - CERROR("acl valid error: %d\n", rc); - posix_acl_release(acl); - RETURN(rc); - } - } + buf = lustre_msg_buf(req->rq_repmsg, offset, body->aclsize); + if (!buf) { + CERROR("aclsize %u, bufcount %u, bufsize %u\n", + body->aclsize, lustre_msg_bufcount(req->rq_repmsg), + (lustre_msg_bufcount(req->rq_repmsg) <= offset) ? + -1 : lustre_msg_buflen(req->rq_repmsg, offset)); + return -EPROTO; + } - md->posix_acl = acl; + acl = posix_acl_from_xattr(buf, body->aclsize); + if (IS_ERR(acl)) { + rc = PTR_ERR(acl); + CERROR("convert xattr to acl: %d\n", rc); + return rc; } - RETURN(0); -} + rc = posix_acl_valid(acl); + if (rc) { + CERROR("validate acl: %d\n", rc); + posix_acl_release(acl); + return rc; + } -static int mdc_unpack_gskey(struct obd_export *exp_lmv, struct ptlrpc_request *req, - unsigned int *offset, struct lustre_md *md) -{ - int key_off = 0, rc = 0, size = 0; - void *buf; - - key_off = *offset; - if (md->body->valid & OBD_MD_FLKEY) { - size = le32_to_cpu(*(__u32 *) lustre_msg_buf(req->rq_repmsg, - key_off++, 4)); - buf = lustre_msg_buf(req->rq_repmsg, key_off++, size); - - CDEBUG(D_INFO, "buf %p key_off %d size %d \n", - buf, key_off, size); - md->key = (struct lustre_key *)buf; - *offset = key_off; - } else { - *offset += 2; - } - RETURN(rc); + md->posix_acl = acl; + return 0; } +#else +#define mdc_unpack_acl(exp, req, md, offset) 0 +#endif -int mdc_req2lustre_md(struct obd_export *exp_lmv, struct ptlrpc_request *req, - unsigned int offset, struct obd_export *exp_lov, +int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, + int offset, struct obd_export *dt_exp, + struct obd_export *md_exp, struct lustre_md *md) { - struct lov_mds_md *lmm; - int rc = 0, reply_off; + int rc; ENTRY; - LASSERT(md != NULL); + LASSERT(md); memset(md, 0, sizeof(*md)); - md->body = lustre_msg_buf(req->rq_repmsg, offset, - sizeof(*md->body)); - if (!md->body) - RETURN(-ENOMEM); + md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body)); + LASSERT (md->body != NULL); + LASSERT(lustre_rep_swabbed(req, offset)); + offset++; - LASSERT_REPSWABBED(req, offset); + if (md->body->valid & OBD_MD_FLEASIZE) { + int lmmsize; + struct lov_mds_md *lmm; - if (!(md->body->valid & OBD_MD_FLEASIZE) && - !(md->body->valid & OBD_MD_FLDIREA)) - RETURN(0); + if (!S_ISREG(md->body->mode)) { + CERROR("OBD_MD_FLEASIZE set, should be a regular file, " + "but is not\n"); + GOTO(out, rc = -EPROTO); + } - if (S_ISREG(md->body->mode)) { if (md->body->eadatasize == 0) { - CERROR("invalid EA size (0) is detected\n"); - RETURN(-EPROTO); + CERROR("OBD_MD_FLEASIZE set, but eadatasize 0\n"); + GOTO(out, rc = -EPROTO); } + lmmsize = md->body->eadatasize; + lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize); + if (!lmm) { + CERROR ("incorrect message: lmm == 0\n"); + GOTO(out, rc = -EPROTO); + } + LASSERT(lustre_rep_swabbed(req, offset)); - lmm = lustre_msg_buf(req->rq_repmsg, offset + 1, - md->body->eadatasize); - if (!lmm) - RETURN(-EINVAL); + rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize); + if (rc < 0) + GOTO(out, rc); - LASSERT(exp_lov != NULL); - - rc = obd_unpackmd(exp_lov, &md->lsm, lmm, - md->body->eadatasize); - if (rc > 0) { - LASSERT(rc >= sizeof(*md->lsm)); - rc = 0; + if (rc < sizeof(*md->lsm)) { + CERROR ("lsm size too small: rc < sizeof (*md->lsm) " + "(%d < %d)\n", rc, sizeof(*md->lsm)); + GOTO(out, rc = -EPROTO); } - } else if (S_ISDIR(md->body->mode)) { - /* dir can be non-splitted */ - if (md->body->eadatasize == 0) - RETURN(0); - lmm = lustre_msg_buf(req->rq_repmsg, offset + 1, - md->body->eadatasize); - if (!lmm) - RETURN(-EINVAL); + offset++; + } else if (md->body->valid & OBD_MD_FLDIREA) { + int lmvsize; + struct lov_mds_md *lmv; + + if(!S_ISDIR(md->body->mode)) { + CERROR("OBD_MD_FLDIREA set, should be a directory, but " + "is not\n"); + GOTO(out, rc = -EPROTO); + } + if (md->body->eadatasize == 0) { + CERROR("OBD_MD_FLDIREA is set, but eadatasize 0\n"); + RETURN(-EPROTO); + } if (md->body->valid & OBD_MD_MEA) { - LASSERT(exp_lmv != NULL); - - rc = obd_unpackmd(exp_lmv, (void *)&md->mea, - lmm, md->body->eadatasize); - if (rc > 0) { - LASSERT(rc >= sizeof(*md->mea)); - rc = 0; + lmvsize = md->body->eadatasize; + lmv = lustre_msg_buf(req->rq_repmsg, offset, lmvsize); + if (!lmv) { + CERROR ("incorrect message: lmv == 0\n"); + GOTO(out, rc = -EPROTO); + } + + LASSERT(lustre_rep_swabbed(req, offset)); + + rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, + lmvsize); + if (rc < 0) + GOTO(out, rc); + + if (rc < sizeof(*md->mea)) { + CERROR ("size too small: rc < sizeof(*md->mea) " + "(%d < %d)\n", rc, sizeof(*md->mea)); + GOTO(out, rc = -EPROTO); } } - } else { - LASSERT(S_ISCHR(md->body->mode) || - S_ISBLK(md->body->mode) || - S_ISFIFO(md->body->mode)|| - S_ISLNK(md->body->mode) || - S_ISSOCK(md->body->mode)); + offset++; + } + rc = 0; + + /* remote permission */ + if (md->body->valid & OBD_MD_FLRMTPERM) { + md->remote_perm = lustre_msg_buf(req->rq_repmsg, offset++, + sizeof(struct mdt_remote_perm)); + if (!md->remote_perm) { + CERROR ("incorrect message: remote_perm == 0\n"); + GOTO(out, rc = -EPROTO); + } } - /* if anything wrong when unpacking md, we don't check acl - * stuff, for simplicity - */ - if (rc) - RETURN(rc); - - reply_off = (md->body->valid & OBD_MD_FLEASIZE) ? - (offset + 2) : (offset + 1); - rc = mdc_unpack_acl(exp_lmv, req, reply_off, md); - if (rc) { - CERROR("upack acl error %d \n", rc); - RETURN(rc); + /* for ACL, it's possible that FLACL is set but aclsize is zero. only + * when aclsize != 0 there's an actual segment for ACL in reply + * buffer. */ + else if (md->body->valid & OBD_MD_FLACL) { + if (md->body->aclsize) { + rc = mdc_unpack_acl(dt_exp, req, md, offset++); + if (rc) + GOTO(out, rc); +#ifdef CONFIG_FS_POSIX_ACL + } else { + md->posix_acl = NULL; +#endif + } } - reply_off += 2; - - rc = mdc_unpack_gskey(exp_lmv, req, &reply_off, md); - if (rc) - RETURN(rc); - RETURN(rc); -} + if (md->body->valid & OBD_MD_FLMDSCAPA) { + struct obd_capa *oc = mdc_unpack_capa(req, offset++); -static void mdc_commit_open(struct ptlrpc_request *req) -{ - struct mdc_open_data *mod = req->rq_cb_data; - if (mod == NULL) - return; + if (IS_ERR(oc)) + GOTO(out, rc = PTR_ERR(oc)); + md->mds_capa = oc; + } - if (mod->mod_close_req != NULL) - mod->mod_close_req->rq_cb_data = NULL; + if (md->body->valid & OBD_MD_FLOSSCAPA) { + struct obd_capa *oc = mdc_unpack_capa(req, offset++); - if (mod->mod_och != NULL) - mod->mod_och->och_mod = NULL; + if (IS_ERR(oc)) + GOTO(out, rc = PTR_ERR(oc)); + md->oss_capa = oc; + } - OBD_FREE(mod, sizeof(*mod)); - req->rq_cb_data = NULL; - LASSERT(atomic_read(&req->rq_refcount) > 1); - ptlrpc_req_finished(req); + EXIT; +out: + if (rc) { + if (md->oss_capa) + free_capa(md->oss_capa); + if (md->mds_capa) + free_capa(md->mds_capa); +#ifdef CONFIG_FS_POSIX_ACL + posix_acl_release(md->posix_acl); +#endif + if (md->lsm) + obd_free_memmd(dt_exp, &md->lsm); + } + return rc; +} + +int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) +{ + ENTRY; + RETURN(0); } static void mdc_replay_open(struct ptlrpc_request *req) { - struct mdc_open_data *mod = req->rq_cb_data; + struct md_open_data *mod = req->rq_cb_data; + struct ptlrpc_request *cur, *tmp; struct obd_client_handle *och; - struct ptlrpc_request *close_req; struct lustre_handle old; - struct mds_body *body; + struct mdt_body *body; ENTRY; - body = lustre_swab_repbuf(req, 1, sizeof(*body), lustre_swab_mds_body); - LASSERT (body != NULL); - if (mod == NULL) { DEBUG_REQ(D_ERROR, req, - "can't properly replay without open data"); + "Can't properly replay without open data."); EXIT; return; } + body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + LASSERT(body != NULL); + och = mod->mod_och; if (och != NULL) { struct lustre_handle *file_fh; + LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); + file_fh = &och->och_fh; CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n", file_fh->cookie, body->handle.cookie); - memcpy(&old, file_fh, sizeof(old)); - memcpy(file_fh, &body->handle, sizeof(*file_fh)); + old = *file_fh; + *file_fh = body->handle; } - close_req = mod->mod_close_req; - if (close_req != NULL) { - struct mds_body *close_body; - LASSERT(close_req->rq_reqmsg->opc == MDS_CLOSE); - close_body = lustre_msg_buf(close_req->rq_reqmsg, - MDS_REQ_REC_OFF, - sizeof(*close_body)); - if (och != NULL) - LASSERT(!memcmp(&old, &close_body->handle, sizeof old)); - DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); - memcpy(&close_body->handle, &body->handle, - sizeof(close_body->handle)); + list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, rq_mod_list) { + int opc = lustre_msg_get_opc(cur->rq_reqmsg); + struct mdt_epoch *epoch = NULL; + + if (opc == MDS_CLOSE || opc == MDS_DONE_WRITING) { + epoch = lustre_msg_buf(cur->rq_reqmsg, + REQ_REC_OFF, sizeof(*epoch)); + LASSERT(epoch); + DEBUG_REQ(D_HA, cur, "updating %s body with new fh", + opc == MDS_CLOSE ? "CLOSE" : "DONE_WRITING"); + } else if (opc == MDS_REINT) { + struct mdt_rec_setattr *rec; + + /* Check this is REINT_SETATTR. */ + rec = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, + sizeof (*rec)); + LASSERT(rec && rec->sa_opcode == REINT_SETATTR); + + epoch = lustre_msg_buf(cur->rq_reqmsg, + REQ_REC_OFF + 2, sizeof(*epoch)); + LASSERT(epoch); + DEBUG_REQ(D_HA, cur, "updating REINT_SETATTR body " + "with new fh"); + } + if (epoch) { + if (och != NULL) + LASSERT(!memcmp(&old, &epoch->handle, + sizeof(old))); + epoch->handle = body->handle; + } } - EXIT; } +void mdc_commit_delayed(struct ptlrpc_request *req) +{ + struct md_open_data *mod = req->rq_cb_data; + struct ptlrpc_request *cur, *tmp; + + DEBUG_REQ(D_HA, req, "req committed"); + + if (mod == NULL) + return; + + req->rq_cb_data = NULL; + req->rq_commit_cb = NULL; + list_del_init(&req->rq_mod_list); + if (req->rq_sequence) { + list_for_each_entry_safe(cur, tmp, &mod->mod_replay_list, + rq_mod_list) { + LASSERT(cur != LP_POISON); + LASSERT(cur->rq_type != LI_POISON); + DEBUG_REQ(D_HA, cur, "req balanced"); + LASSERT(cur->rq_transno != 0); + LASSERT(cur->rq_import == req->rq_import); + + /* We no longer want to preserve this for transno- + * unconditional replay. */ + spin_lock(&cur->rq_lock); + cur->rq_replay = 0; + spin_unlock(&cur->rq_lock); + } + } + + if (list_empty(&mod->mod_replay_list)) { + if (mod->mod_och != NULL) + mod->mod_och->och_mod = NULL; + + OBD_FREE_PTR(mod); + } +} + int mdc_set_open_replay_data(struct obd_export *exp, struct obd_client_handle *och, struct ptlrpc_request *open_req) { - struct mdc_open_data *mod; - struct mds_rec_create *rec; - struct mds_body *body; - - rec = lustre_msg_buf(open_req->rq_reqmsg, MDS_REQ_INTENT_REC_OFF, - sizeof(*rec)); - body = lustre_msg_buf(open_req->rq_repmsg, 1, sizeof(*body)); + struct md_open_data *mod; + struct mdt_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg, + DLM_INTENT_REC_OFF, + sizeof(*rec)); + struct mdt_body *body = lustre_msg_buf(open_req->rq_repmsg, + DLM_REPLY_REC_OFF, + sizeof(*body)); + struct obd_import *imp = open_req->rq_import; + ENTRY; LASSERT(rec != NULL); - /* outgoing messages always in my byte order */ + + /* Incoming message in my byte order (it's been swabbed). */ + LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF)); + + /* Outgoing messages always in my byte order. */ LASSERT(body != NULL); - /* incoming message in my byte order (it's been swabbed) */ - LASSERT_REPSWABBED(open_req, 1); - OBD_ALLOC(mod, sizeof(*mod)); - if (mod == NULL) { - DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data"); - return 0; - } + /*Only the import is replayable, we set replay_open data */ + if (och && imp->imp_replayable) { + OBD_ALLOC(mod, sizeof(*mod)); + if (mod == NULL) { + DEBUG_REQ(D_ERROR, open_req, + "Can't allocate md_open_data"); + RETURN(0); + } + CFS_INIT_LIST_HEAD(&mod->mod_replay_list); + + spin_lock(&open_req->rq_lock); + if (!open_req->rq_replay) { + OBD_FREE(mod, sizeof(*mod)); + spin_unlock(&open_req->rq_lock); + RETURN(0); + } - och->och_mod = mod; - mod->mod_och = och; - mod->mod_open_req = ptlrpc_request_addref(open_req); + och->och_mod = mod; + mod->mod_och = och; + open_req->rq_cb_data = mod; + list_add_tail(&open_req->rq_mod_list, &mod->mod_replay_list); + open_req->rq_commit_cb = mdc_commit_delayed; + spin_unlock(&open_req->rq_lock); + } - memcpy(&rec->cr_replayid, &body->id1, sizeof rec->cr_replayid); + rec->cr_fid2 = body->fid1; + rec->cr_ioepoch = body->ioepoch; + rec->cr_old_handle.cookie = body->handle.cookie; open_req->rq_replay_cb = mdc_replay_open; - open_req->rq_commit_cb = mdc_commit_open; - open_req->rq_cb_data = mod; - DEBUG_REQ(D_HA, open_req, "set up replay data"); - return 0; + if (!fid_is_sane(&body->fid1)) { + DEBUG_REQ(D_ERROR, open_req, "Saving replay request with " + "insane fid"); + LBUG(); + } + + DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data"); + RETURN(0); } int mdc_clear_open_replay_data(struct obd_export *exp, struct obd_client_handle *och) { - struct mdc_open_data *mod = och->och_mod; + struct md_open_data *mod = och->och_mod; + ENTRY; - /* Don't free the structure now (it happens in mdc_commit_open, after - * we're sure we won't need to fix up the close request in the future), + /* + * Don't free the structure now (it happens in mdc_commit_delayed(), + * after the last request is removed from its replay list), * but make sure that replay doesn't poke at the och, which is about to - * be freed. */ + * be freed. + */ LASSERT(mod != LP_POISON); if (mod != NULL) mod->mod_och = NULL; - och->och_mod = NULL; - return 0; -} - -static void mdc_commit_close(struct ptlrpc_request *req) -{ - struct mdc_open_data *mod = req->rq_cb_data; - struct obd_import *imp = req->rq_import; - struct ptlrpc_request *open_req; - DEBUG_REQ(D_HA, req, "close req committed"); - if (mod == NULL) - return; - - mod->mod_close_req = NULL; - req->rq_cb_data = NULL; - req->rq_commit_cb = NULL; - - open_req = mod->mod_open_req; - LASSERT(open_req != NULL); - LASSERT(open_req != LP_POISON); - LASSERT(open_req->rq_type != LI_POISON); - - DEBUG_REQ(D_HA, open_req, "open req balanced"); - if (open_req->rq_transno == 0) { - DEBUG_REQ(D_ERROR, open_req, "BUG 3892 open"); - DEBUG_REQ(D_ERROR, req, "BUG 3892 close"); - LASSERTF(open_req->rq_transno != 0, "BUG 3892\n"); - } - LASSERT(open_req->rq_import == imp); - - /* We no longer want to preserve this for transno-unconditional - * replay. */ - spin_lock(&open_req->rq_lock); - open_req->rq_replay = 0; - spin_unlock(&open_req->rq_lock); + och->och_mod = NULL; + RETURN(0); } -int mdc_close(struct obd_export *exp, struct mdc_op_data *op_data, - struct obd_client_handle *och, struct ptlrpc_request **request) +int mdc_close(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod, struct ptlrpc_request **request) { struct obd_device *obd = class_exp2obd(exp); - struct obd_import *imp = class_exp2cliimp(exp); - int reqsize[3] = {0, sizeof(struct mds_body), - obd->u.cli.cl_max_mds_cookiesize}; - int rc, repsize[3] = {sizeof(struct mds_body), - obd->u.cli.cl_max_mds_easize, - obd->u.cli.cl_max_mds_cookiesize}; + int reqsize[4] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_epoch), + sizeof(struct mdt_rec_setattr)}; + int repsize[4] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body), + obd->u.cli.cl_max_mds_easize, + obd->u.cli.cl_max_mds_cookiesize }; struct ptlrpc_request *req; - struct mdc_open_data *mod; + int rc; ENTRY; - if (imp->imp_connection == NULL) { - CERROR("request on not connected import %s\n", - imp->imp_obd->obd_name); - RETURN(-EIO); - } - + reqsize[REQ_REC_OFF + 2] = op_data->op_capa1 ? + sizeof(struct lustre_capa) : 0; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_CLOSE, 3, reqsize, NULL); + MDS_CLOSE, 4, reqsize, NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); - req->rq_request_portal = MDS_CLOSE_PORTAL; - - /* ensure that this close's handle is fixed up during replay. */ - LASSERT(och != NULL); - mod = och->och_mod; - if (likely(mod != NULL)) { - mod->mod_close_req = req; - LASSERT(mod->mod_open_req->rq_type != LI_POISON); - DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); - } else { - CDEBUG(D_HA, "couldn't find open req; " - "expecting close error\n"); - } - mdc_close_pack(req, 1, op_data, och); + /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a + * portal whose threads are not taking any DLM locks and are therefore + * always progressing */ + /* XXX FIXME bug 249 */ + req->rq_request_portal = MDS_READPAGE_PORTAL; + + /* Ensure that this close's handle is fixed up during replay. */ + if (likely(mod != NULL)) + list_add_tail(&req->rq_mod_list, &mod->mod_replay_list); + else + CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); - req->rq_replen = lustre_msg_size(3, repsize); - req->rq_commit_cb = mdc_commit_close; + mdc_close_pack(req, REQ_REC_OFF, op_data); + ptlrpc_req_set_repsize(req, 4, repsize); + req->rq_commit_cb = mdc_commit_delayed; + req->rq_replay = 1; LASSERT(req->rq_cb_data == NULL); req->rq_cb_data = mod; @@ -779,120 +869,172 @@ int mdc_close(struct obd_export *exp, struct mdc_op_data *op_data, mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); if (req->rq_repmsg == NULL) { - CDEBUG(D_HA, "request failed to send: %p, %d\n", req, + CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, req->rq_status); if (rc == 0) rc = req->rq_status ? req->rq_status : -EIO; - } else if (rc == 0) { - rc = req->rq_repmsg->status; - if (req->rq_repmsg->type == PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, " - "err = %d", rc); + } else if (rc == 0 || rc == -EAGAIN) { + rc = lustre_msg_get_status(req->rq_repmsg); + if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " + "= %d", rc); if (rc > 0) rc = -rc; - } else { - if (mod == NULL) - CERROR("Unexpected: can't find mdc_open_data, but " - "close succeeded. Please tell CFS.\n"); - if (!lustre_swab_repbuf(req, 0, sizeof(struct mds_body), - lustre_swab_mds_body)) - { - CERROR("Error unpacking mds_body\n"); - rc = -EPROTO; - } + } else if (mod == NULL) { + if (req->rq_import->imp_replayable) + CERROR("Unexpected: can't find md_open_data," + "but close succeeded with replayable imp" + "Please tell CFS.\n"); + } + if (!lustre_swab_repbuf(req, REPLY_REC_OFF, + sizeof(struct mdt_body), + lustre_swab_mdt_body)) { + CERROR("Error unpacking mdt_body\n"); + rc = -EPROTO; } } EXIT; - out: *request = req; + out: + if (rc != 0 && rc != -EAGAIN && req && req->rq_commit_cb) + req->rq_commit_cb(req); + return rc; } -int mdc_done_writing(struct obd_export *exp, struct obdo *obdo) +int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod) { + struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = {0, sizeof(*body)}; + int size[4] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_epoch), + sizeof(struct mdt_rec_setattr)}; + int repsize[2] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body)}; + int rc; + ENTRY; + + if (op_data->op_capa1) + size[REQ_REC_OFF + 2] = sizeof(struct lustre_capa); + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, + MDS_DONE_WRITING, 4, size, NULL); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_close_pack(req, REQ_REC_OFF, op_data); + + req->rq_replay = 1; + req->rq_cb_data = mod; + req->rq_commit_cb = mdc_commit_delayed; + if (likely(mod != NULL)) + list_add_tail(&req->rq_mod_list, &mod->mod_replay_list); + else + CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); + + ptlrpc_req_set_repsize(req, 2, repsize); + mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); + + /* Close the open replay sequence if an error occured or no SOM + * attribute update is needed. */ + if (rc != -EAGAIN) + ptlrpc_close_replay_seq(req); + + if (rc && rc != -EAGAIN && req->rq_commit_cb) + req->rq_commit_cb(req); + + ptlrpc_req_finished(req); + RETURN(rc); +} + +#ifdef HAVE_SPLIT_SUPPORT +int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, + const struct page *page, int offset) +{ + int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; + struct obd_import *imp = class_exp2cliimp(exp); + struct ptlrpc_bulk_desc *desc = NULL; + struct ptlrpc_request *req = NULL; ENTRY; - size[0] = lustre_secdesc_size(); + CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid)); - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_DONE_WRITING, 2, size, NULL); + req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_WRITEPAGE, 3, + size, NULL); if (req == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); - lustre_pack_secdesc(req, size[0]); + req->rq_request_portal = MDS_READPAGE_PORTAL; - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, - sizeof(*body)); - - mdc_pack_id(&body->id1, obdo->o_id, 0, obdo->o_mode, - obdo->o_mds, obdo->o_fid); - - body->size = obdo->o_size; - body->blocks = obdo->o_blocks; - body->flags = obdo->o_flags; - body->valid = obdo->o_valid; + desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); - req->rq_replen = lustre_msg_size(1, &size[1]); + /* NB req now owns desc and will free it when it gets freed. */ + ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); + mdc_readdir_pack(req, REQ_REC_OFF, 0, offset, fid, NULL); + ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - RETURN(rc); + EXIT; +out: + if (req != NULL) + ptlrpc_req_finished(req); + return rc; } +EXPORT_SYMBOL(mdc_sendpage); +#endif -int mdc_readpage(struct obd_export *exp, - struct lustre_id *id, - __u64 offset, struct page *page, +int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, __u64 offset, struct page *page, struct ptlrpc_request **request) { + int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; struct obd_import *imp = class_exp2cliimp(exp); - struct ptlrpc_request *req = NULL; struct ptlrpc_bulk_desc *desc = NULL; - struct mds_body *body; - int rc, size[2] = {0, sizeof(*body)}; + struct ptlrpc_request *req = NULL; + struct mdt_body *body; ENTRY; - CDEBUG(D_INODE, "inode: %ld\n", (long)id->li_stc.u.e3s.l3s_ino); - - size[0] = lustre_secdesc_size(); + CDEBUG(D_INODE, "object: "DFID"\n", PFID(fid)); - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE, - 2, size, NULL); + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; + req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE, 3, size, + NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); + /* XXX FIXME bug 249 */ req->rq_request_portal = MDS_READPAGE_PORTAL; - lustre_pack_secdesc(req, size[0]); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); - /* NB req now owns desc and will free it when it gets freed */ - ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE); - mdc_readdir_pack(req, 1, offset, PAGE_CACHE_SIZE, id); + /* NB req now owns desc and will free it when it gets freed */ + ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE); + mdc_readdir_pack(req, REQ_REC_OFF, offset, CFS_PAGE_SIZE, fid, oc); - req->rq_replen = lustre_msg_size(1, &size[1]); + ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); if (rc == 0) { - body = lustre_swab_repbuf(req, 0, sizeof (*body), - lustre_swab_mds_body); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); if (body == NULL) { - CERROR("Can't unpack mds_body\n"); + CERROR("Can't unpack mdt_body\n"); GOTO(out, rc = -EPROTO); } - if (req->rq_bulk->bd_nob_transferred != PAGE_CACHE_SIZE) { + if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) { CERROR ("Unexpected # bytes transferred: %d" " (%ld expected)\n", req->rq_bulk->bd_nob_transferred, - PAGE_CACHE_SIZE); - GOTO (out, rc = -EPROTO); + CFS_PAGE_SIZE); + GOTO(out, rc = -EPROTO); } } @@ -912,14 +1054,10 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int rc; ENTRY; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_INC_USE_COUNT; -#else if (!try_module_get(THIS_MODULE)) { CERROR("Can't get module. Is it alive?"); return -EINVAL; } -#endif switch (cmd) { case OBD_IOC_CLIENT_RECOVER: rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1); @@ -929,162 +1067,157 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case IOC_OSC_SET_ACTIVE: rc = ptlrpc_set_import_active(imp, data->ioc_offset); GOTO(out, rc); - case IOC_OSC_CTL_RECOVERY: - rc = ptlrpc_import_control_recovery(imp, data->ioc_offset); - GOTO(out, rc); case OBD_IOC_PARSE: { - ctxt = llog_get_context(&exp->exp_obd->obd_llogs, - LLOG_CONFIG_REPL_CTXT); - rc = class_config_process_llog(ctxt, data->ioc_inlbuf1, NULL); + ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); + rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); GOTO(out, rc); } #ifdef __KERNEL__ case OBD_IOC_LLOG_INFO: case OBD_IOC_LLOG_PRINT: { - ctxt = llog_get_context(&obd->obd_llogs, LLOG_CONFIG_REPL_CTXT); + ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); rc = llog_ioctl(ctxt, cmd, data); GOTO(out, rc); } #endif + case OBD_IOC_POLL_QUOTACHECK: + rc = lquota_poll_check(quota_interface, exp, + (struct if_quotacheck *)karg); + GOTO(out, rc); default: CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd); GOTO(out, rc = -ENOTTY); } out: -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_DEC_USE_COUNT; -#else module_put(THIS_MODULE); -#endif return rc; } -int mdc_set_info(struct obd_export *exp, obd_count keylen, - void *key, obd_count vallen, void *val) +int mdc_set_info_async(struct obd_export *exp, obd_count keylen, + void *key, obd_count vallen, void *val, + struct ptlrpc_request_set *set) { + struct obd_import *imp = class_exp2cliimp(exp); int rc = -EINVAL; - if (keylen == strlen("initial_recov") && - memcmp(key, "initial_recov", strlen("initial_recov")) == 0) { - struct obd_import *imp = exp->exp_obd->u.cli.cl_import; + if (KEY_IS(KEY_INIT_RECOV)) { if (vallen != sizeof(int)) RETURN(-EINVAL); + spin_lock(&imp->imp_lock); imp->imp_initial_recov = *(int *)val; - CDEBUG(D_HA, "%s: set imp_no_init_recov = %d\n", - exp->exp_obd->obd_name, - imp->imp_initial_recov); + spin_unlock(&imp->imp_lock); + CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n", + exp->exp_obd->obd_name, imp->imp_initial_recov); + RETURN(0); + } + /* Turn off initial_recov after we try all backup servers once */ + if (KEY_IS(KEY_INIT_RECOV_BACKUP)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + spin_lock(&imp->imp_lock); + imp->imp_initial_recov_bk = *(int *)val; + if (imp->imp_initial_recov_bk) + imp->imp_initial_recov = 1; + spin_unlock(&imp->imp_lock); + CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n", + exp->exp_obd->obd_name, imp->imp_initial_recov_bk); RETURN(0); - } else if ((keylen >= strlen("crypto_type")) && - strcmp(key, "crypto_type") == 0) { + } + if (KEY_IS(KEY_READ_ONLY)) { struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int rc, size[2] = {keylen, vallen}; + int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen }; + char *bufs[3] = { NULL, key, val }; + + if (vallen != sizeof(int)) + RETURN(-EINVAL); + + spin_lock(&imp->imp_lock); + if (*((int *)val)) { + imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags |= + OBD_CONNECT_RDONLY; + } else { + imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags &= + ~OBD_CONNECT_RDONLY; + } + spin_unlock(&imp->imp_lock); - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); + req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_SET_INFO, + 3, size, bufs); if (req == NULL) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); + ptlrpc_req_set_repsize(req, 1, NULL); + if (set) { + rc = 0; + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(set); + } else { + rc = ptlrpc_queue_wait(req); + ptlrpc_req_finished(req); + } + RETURN(rc); - } else if (keylen >= strlen("inter_mds") && strcmp(key, "inter_mds") == 0) { + } + if (KEY_IS(KEY_FLUSH_CTX)) { + sptlrpc_import_flush_my_ctx(imp); + RETURN(0); + } + if (KEY_IS(KEY_MDS_CONN)) { struct obd_import *imp = class_exp2cliimp(exp); + + /* mds-mds import */ + spin_lock(&imp->imp_lock); imp->imp_server_timeout = 1; - CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name); + spin_unlock(&imp->imp_lock); + imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; + CDEBUG(D_OTHER|D_WARNING, "%s: timeout / 2\n", exp->exp_obd->obd_name); RETURN(0); - } else if (keylen == strlen("sec") && - memcmp(key, "sec", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; + } - cli->cl_sec_flavor = ptlrpcs_name2flavor(val); - if (cli->cl_sec_flavor == PTLRPCS_FLVR_INVALID) { - CERROR("unrecognized security type %s\n", (char*) val); - RETURN(-EINVAL); - } + RETURN(rc); +} - RETURN(0); - } else if (keylen == strlen("sec_flags") && - memcmp(key, "sec_flags", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; +int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, + __u32 *vallen, void *val) +{ + int rc = -EINVAL; - cli->cl_sec_flags = *((unsigned long *) val); - RETURN(0); - } else if (keylen == strlen("flush_cred") && - memcmp(key, "flush_cred", keylen) == 0) { - struct client_obd *cli = &exp->exp_obd->u.cli; + if (KEY_IS(KEY_MAX_EASIZE)) { + int mdsize, *max_easize; - if (cli->cl_import) - ptlrpcs_import_flush_current_creds(cli->cl_import); - RETURN(0); - } else if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) { - struct client_obd *cl = &exp->exp_obd->u.cli; - if (vallen != sizeof(int)) + if (*vallen != sizeof(int)) RETURN(-EINVAL); - cl->cl_async = *(int *)val; - CDEBUG(D_HA, "%s: set async = %d\n", - exp->exp_obd->obd_name, cl->cl_async); + mdsize = *(int*)val; + if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) + exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; + max_easize = val; + *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; RETURN(0); - } else if (keylen == strlen("setext") && memcmp(key, "setext", keylen) == 0) { - struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int rc, size[2] = {keylen, vallen}; - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); - - req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - RETURN(rc); - } else if (keylen == 5 && strcmp(key, "audit") == 0) { - struct ptlrpc_request *req; - char *bufs[2] = {key, val}; - int rc, size[2] = {keylen, vallen}; - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 2, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); - - req->rq_replen = lustre_msg_size(0, NULL); - lustre_swab_reqbuf(req, 1, sizeof(struct audit_attr_msg), - lustre_swab_audit_attr); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - - RETURN(rc); - } else if (keylen == strlen("ids") && memcmp(key, "ids", keylen) == 0) { - struct ptlrpc_request *req; - struct lustre_id *ids = (struct lustre_id *)val; - char *bufs[3] = {key, (char *)ids, (char *)(ids + 1)}; - int rc, size[3] = {keylen, sizeof(struct lustre_id), - sizeof(struct lustre_id)}; + } + if (KEY_IS(KEY_CONN_DATA)) { + struct obd_import *imp = class_exp2cliimp(exp); + struct obd_connect_data *data = val; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_SET_INFO, 3, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); + if (*vallen != sizeof(*data)) + RETURN(-EINVAL); - req->rq_replen = lustre_msg_size(0, NULL); - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); - RETURN(rc); + *data = imp->imp_connect_data; + RETURN(0); } + RETURN(rc); } static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs, - unsigned long max_age) + __u64 max_age) { - struct obd_statfs *msfs; struct ptlrpc_request *req; - int rc, size = sizeof(*msfs); + struct obd_statfs *msfs; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*msfs) }; ENTRY; /* We could possibly pass max_age in the request (as an absolute @@ -1094,62 +1227,58 @@ static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs, * is not so great if request processing is slow, while absolute * timestamps are not ideal because they need time synchronization. */ req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION, - MDS_STATFS, 0, NULL, NULL); + MDS_STATFS, 1, NULL, NULL); if (!req) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(1, &size); + ptlrpc_req_set_repsize(req, 2, size); - mdc_get_rpc_lock(obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(obd->u.cli.cl_rpc_lock, NULL); if (rc) { - /* this can be LMV fake import, whcih is not connected. */ - if (!req->rq_import->imp_connection) - memset(osfs, 0, sizeof(*osfs)); + /* check connection error first */ + if (obd->u.cli.cl_import->imp_connect_error) + rc = obd->u.cli.cl_import->imp_connect_error; + GOTO(out, rc); } - msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs), + msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs), lustre_swab_obd_statfs); if (msfs == NULL) { CERROR("Can't unpack obd_statfs\n"); GOTO(out, rc = -EPROTO); } - memcpy(osfs, msfs, sizeof (*msfs)); + memcpy(osfs, msfs, sizeof(*msfs)); EXIT; out: ptlrpc_req_finished(req); + return rc; } -static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type, +static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct obd_client_handle *handle, int flag) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = {0, sizeof(*body)}; + struct mdt_body *body; + int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; - //size[0] = lustre_secdesc_size(); - + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_PIN, 2, size, NULL); + MDS_PIN, 3, size, NULL); if (req == NULL) RETURN(-ENOMEM); - //lustre_pack_secdesc(req, size[0]); - - body = lustre_msg_buf(req->rq_reqmsg, - MDS_REQ_REC_OFF, sizeof(*body)); - - /* FIXME-UMKA: here should be also mdsnum and fid. */ - mdc_pack_id(&body->id1, ino, gen, type, 0, 0); + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof (*body)); + body->fid1 = *fid; body->flags = flag; + mdc_pack_capa(req, REQ_REC_OFF + 1, oc); - req->rq_replen = lustre_msg_size(1, &size[1]); + ptlrpc_req_set_repsize(req, 2, size); mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); @@ -1160,7 +1289,8 @@ static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type, RETURN(rc); } - body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_mds_body); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); if (body == NULL) { ptlrpc_req_finished(req); RETURN(rc); @@ -1171,10 +1301,13 @@ static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type, OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod)); if (handle->och_mod == NULL) { - DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data"); + DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data"); RETURN(-ENOMEM); } - handle->och_mod->mod_open_req = req; /* will be dropped by unpin */ + + /* will be dropped by unpin */ + CFS_INIT_LIST_HEAD(&handle->och_mod->mod_replay_list); + list_add_tail(&req->rq_mod_list, &handle->och_mod->mod_replay_list); RETURN(rc); } @@ -1183,27 +1316,23 @@ static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle, int flag) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = {0, sizeof(*body)}; + struct mdt_body *body; + int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC) RETURN(0); - //size[0] = lustre_secdesc_size(); - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_CLOSE, 2, size, NULL); if (req == NULL) RETURN(-ENOMEM); - //lustre_pack_secdesc(req, size[0]); - - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, sizeof(*body)); + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); memcpy(&body->handle, &handle->och_fh, sizeof(body->handle)); body->flags = flag; - req->rq_replen = lustre_msg_size(0, NULL); + ptlrpc_req_set_repsize(req, 1, NULL); mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); @@ -1212,36 +1341,35 @@ static int mdc_unpin(struct obd_export *exp, CERROR("unpin failed: %d\n", rc); ptlrpc_req_finished(req); - ptlrpc_req_finished(handle->och_mod->mod_open_req); + + LASSERT(!list_empty(&handle->och_mod->mod_replay_list)); + req = list_entry(handle->och_mod->mod_replay_list.next, + typeof(*req), rq_mod_list); + list_del_init(&req->rq_mod_list); + ptlrpc_req_finished(req); + LASSERT(list_empty(&handle->och_mod->mod_replay_list)); + OBD_FREE(handle->och_mod, sizeof(*handle->och_mod)); RETURN(rc); } -int mdc_sync(struct obd_export *exp, struct lustre_id *id, - struct ptlrpc_request **request) +int mdc_sync(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct ptlrpc_request **request) { struct ptlrpc_request *req; - struct mds_body *body; - int size[2] = {0, sizeof(*body)}; + int size[3] = { sizeof(struct ptlrpc_body), sizeof(struct mdt_body) }; int rc; ENTRY; - //size[0] = lustre_secdesc_size(); - + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_SYNC, 2, size, NULL); + MDS_SYNC, 3, size, NULL); if (!req) RETURN(rc = -ENOMEM); - //lustre_pack_secdesc(req, size[0]); + mdc_pack_req_body(req, REQ_REC_OFF, 0, fid, oc, 0, 0); - if (id) { - body = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF, - sizeof (*body)); - memcpy(&body->id1, id, sizeof(*id)); - } - - req->rq_replen = lustre_msg_size(1, &size[1]); + ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); if (rc || request == NULL) @@ -1261,11 +1389,25 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, switch (event) { case IMP_EVENT_DISCON: { +#if 0 + /* XXX Pass event up to OBDs stack. used only for FLD now */ + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL); +#endif break; } case IMP_EVENT_INACTIVE: { - if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 0, 0); + struct client_obd *cli = &obd->u.cli; + /* + * Flush current sequence to make client obtain new one + * from server in case of disconnect/reconnect. + * If range is already empty then no need to flush it. + */ + if (cli->cl_seq != NULL && + !range_is_exhausted(&cli->cl_seq->lcs_space)) { + seq_client_flush(cli->cl_seq); + } + + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); break; } case IMP_EVENT_INVALIDATE: { @@ -1276,33 +1418,90 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, break; } case IMP_EVENT_ACTIVE: { - if (obd->obd_observer) - rc = obd_notify(obd->obd_observer, obd, 1, 0); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); break; } + case IMP_EVENT_OCD: + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); + break; + default: - CERROR("Unknown import event %d\n", event); + CERROR("Unknown import event %x\n", event); LBUG(); } RETURN(rc); } -static int mdc_attach(struct obd_device *dev, obd_count len, void *data) +static int mdc_fid_init(struct obd_export *exp) { - struct lprocfs_static_vars lvars; + struct client_obd *cli = &exp->exp_obd->u.cli; + char *prefix; + int rc; + ENTRY; - lprocfs_init_vars(mdc, &lvars); - return lprocfs_obd_attach(dev, lvars.obd_vars); + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) + GOTO(out_free_seq, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", + exp->exp_obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, + LUSTRE_SEQ_METADATA, + prefix, NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc) + GOTO(out_free_seq, rc); + + RETURN(rc); +out_free_seq: + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + return rc; +} + +static int mdc_fid_fini(struct obd_export *exp) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} + +int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, + struct md_op_data *op_data) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + ENTRY; + RETURN(seq_client_alloc_fid(seq, fid)); } -static int mdc_detach(struct obd_device *dev) +/* XXX This method is used only to clear current fid seq + * once fld/mds insert failed */ +static int mdc_fid_delete(struct obd_export *exp, const struct lu_fid *fid) { - return lprocfs_obd_detach(dev); + struct client_obd *cli = &exp->exp_obd->u.cli; + + seq_client_flush(cli->cl_seq); + return 0; } -static int mdc_setup(struct obd_device *obd, obd_count len, void *buf) +static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { struct client_obd *cli = &obd->u.cli; + struct lprocfs_static_vars lvars; int rc; ENTRY; @@ -1323,13 +1522,16 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_setattr_lock, rc = -ENOMEM); mdc_init_rpc_lock(cli->cl_close_lock); - rc = client_obd_setup(obd, len, buf); + rc = client_obd_setup(obd, cfg); if (rc) GOTO(err_close_lock, rc); + lprocfs_init_vars(mdc, &lvars); + lprocfs_obd_setup(obd, lvars.obd_vars); + ptlrpc_lprocfs_register_obd(obd); - rc = obd_llog_init(obd, &obd->obd_llogs, obd, 0, NULL); + rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL); if (rc) { - mdc_cleanup(obd, 0); + mdc_cleanup(obd); CERROR("failed to setup llogging subsystems\n"); } @@ -1345,7 +1547,12 @@ err_rpc_lock: RETURN(rc); } -static int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize) +/* Initialize the default and maximum LOV EA and cookie sizes. This allows + * us to make MDS RPCs with large enough reply buffers to hold the + * maximum-sized (= maximum striped) EA and cookie without having to + * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */ +int mdc_init_ea_size(struct obd_export *exp, int easize, + int def_easize, int cookiesize) { struct obd_device *obd = exp->exp_obd; struct client_obd *cli = &obd->u.cli; @@ -1353,23 +1560,47 @@ static int mdc_init_ea_size(struct obd_export *exp, int easize, int cookiesize) if (cli->cl_max_mds_easize < easize) cli->cl_max_mds_easize = easize; + + if (cli->cl_default_mds_easize < def_easize) + cli->cl_default_mds_easize = def_easize; + if (cli->cl_max_mds_cookiesize < cookiesize) cli->cl_max_mds_cookiesize = cookiesize; + RETURN(0); } -static int mdc_precleanup(struct obd_device *obd, int flags) +static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) { int rc = 0; - - rc = obd_llog_finish(obd, &obd->obd_llogs, 0); - if (rc != 0) - CERROR("failed to cleanup llogging subsystems\n"); + ENTRY; + switch (stage) { + case OBD_CLEANUP_EARLY: + case OBD_CLEANUP_EXPORTS: + /* If we set up but never connected, the + client import will not have been cleaned. */ + if (obd->u.cli.cl_import) { + struct obd_import *imp; + imp = obd->u.cli.cl_import; + CERROR("client import never connected\n"); + ptlrpc_invalidate_import(imp); + ptlrpc_free_rq_pool(imp->imp_rq_pool); + class_destroy_import(imp); + obd->u.cli.cl_import = NULL; + } + break; + case OBD_CLEANUP_SELF_EXP: + rc = obd_llog_finish(obd, 0); + if (rc != 0) + CERROR("failed to cleanup llogging subsystems\n"); + case OBD_CLEANUP_OBD: + break; + } RETURN(rc); } -static int mdc_cleanup(struct obd_device *obd, int flags) +static int mdc_cleanup(struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; @@ -1377,412 +1608,270 @@ static int mdc_cleanup(struct obd_device *obd, int flags) OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); ptlrpcd_decref(); - return client_obd_cleanup(obd, flags); + return client_obd_cleanup(obd); } -static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, - struct obd_device *tgt, int count, - struct llog_catid *logid) +static int mdc_llog_init(struct obd_device *obd, struct obd_llogs *llogs, + struct obd_device *tgt, + int count, struct llog_catid *logid, + struct obd_uuid *uuid) { struct llog_ctxt *ctxt; int rc; ENTRY; - rc = obd_llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); + rc = llog_setup(obd, llogs, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL, + &llog_client_ops); + if (rc == 0) { + ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); + ctxt->loc_imp = obd->u.cli.cl_import; + } + + rc = llog_setup(obd, llogs, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL, + &llog_client_ops); if (rc == 0) { - ctxt = llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT); + ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); ctxt->loc_imp = obd->u.cli.cl_import; } RETURN(rc); } -static int mdc_llog_finish(struct obd_device *obd, - struct obd_llogs *llogs, int count) +static int mdc_llog_finish(struct obd_device *obd, int count) { int rc; ENTRY; - rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_CONFIG_REPL_CTXT)); + rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT)); + if (rc) { + CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc); + } + rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT)); RETURN(rc); } -static struct obd_device *mdc_get_real_obd(struct obd_export *exp, - struct lustre_id *id) +static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) { - ENTRY; - RETURN(exp->exp_obd); + struct lustre_cfg *lcfg = buf; + struct lprocfs_static_vars lvars; + int rc = 0; + + lprocfs_init_vars(mdc, &lvars); + + rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, lcfg, obd); + return(rc); } -static int mdc_get_info(struct obd_export *exp, __u32 keylen, - void *key, __u32 *valsize, void *val) +/* get remote permission for current user on fid */ +int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct ptlrpc_request **request) { struct ptlrpc_request *req; - char *bufs[1] = {key}; - int rc = 0; + struct mdt_body *body; + struct mdt_remote_perm *perm; + int size[5] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int rc; ENTRY; - - if (!valsize || !val) - RETURN(-EFAULT); - - if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) { - struct obd_import *imp; - struct obd_connect_data *data; - - imp = class_exp2cliimp(exp); - if (!imp) { - LBUG(); - RETURN(-EINVAL); - } - - if (imp->imp_state != LUSTRE_IMP_FULL) { - CERROR("import state not full\n"); - RETURN(-EINVAL); - } - data = &imp->imp_connect_data; - if (data->ocd_connect_flags & OBD_CONNECT_REMOTE) { - *((int *)val) = 1; - RETURN(0); - } else if (data->ocd_connect_flags & OBD_CONNECT_LOCAL) { - *((int *)val) = 0; - RETURN(0); - } - CERROR("no remote flag set?\n"); - RETURN(-EINVAL); - } + size[REQ_REC_OFF + 1] = oc ? sizeof(struct lustre_capa) : 0; - if ((keylen < strlen("mdsize") || strcmp(key, "mdsize") != 0) && - (keylen < strlen("mdsnum") || strcmp(key, "mdsnum") != 0) && - (keylen < strlen("lovdesc") || strcmp(key, "lovdesc") != 0) && - (keylen < strlen("getext") || strcmp(key, "getext") != 0) && - (keylen < strlen("rootid") || strcmp(key, "rootid") != 0) && - (keylen < strlen("auditid") || strcmp(key, "auditid") != 0)) - RETURN(-EPROTO); - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_GET_INFO, 1, (int *)&keylen, bufs); - if (req == NULL) + *request = NULL; + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, + MDS_GETATTR, 3, size, NULL); + if (!req) RETURN(-ENOMEM); - req->rq_replen = lustre_msg_size(1, (int *)valsize); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out_req, rc); + mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLRMTPERM, fid, oc, 0, 0); - if ((keylen >= strlen("rootid") && !strcmp(key, "rootid")) || - (keylen >= strlen("auditid") && !strcmp(key, "auditid"))) { - struct lustre_id *reply; - - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), - lustre_swab_lustre_id); - if (reply == NULL) { - CERROR("Can't unpack %s\n", (char *)key); - GOTO(out_req, rc = -EPROTO); - } + size[REPLY_REC_OFF + 1] = sizeof(*perm); + ptlrpc_req_set_repsize(req, 5, size); + rc = ptlrpc_queue_wait(req); + if (rc) { + ptlrpc_req_finished(req); + RETURN(rc); + } - *(struct lustre_id *)val = *reply; - } else if (keylen >= strlen("lovdesc") && !strcmp(key, "lovdesc")) { - struct lov_desc *reply; - - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), - lustre_swab_lov_desc); - if (reply == NULL) { - CERROR("Can't unpack %s\n", (char *)key); - GOTO(out_req, rc = -EPROTO); - } + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + LASSERT(body); + LASSERT(body->valid & OBD_MD_FLRMTPERM); - *(struct lov_desc *)val = *reply; - RETURN(0); - } else if (keylen >= strlen("getext") && !strcmp(key, "getext")) { - struct fid_extent *reply; - - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), - lustre_swab_fid_extent); - if (reply == NULL) { - CERROR("Can't unpack %s\n", (char *)key); - GOTO(out_req, rc = -EPROTO); - } + perm = lustre_swab_repbuf(req, REPLY_REC_OFF + 1, sizeof(*perm), + lustre_swab_mdt_remote_perm); + LASSERT(perm); - *(struct fid_extent *)val = *reply; - RETURN(0); - } else { - __u32 *reply; - - reply = lustre_swab_repbuf(req, 0, sizeof(*reply), - lustre_swab_generic_32s); - if (reply == NULL) { - CERROR("Can't unpack %s\n", (char *)key); - GOTO(out_req, rc = -EPROTO); - } - *((__u32 *)val) = *reply; - } -out_req: - ptlrpc_req_finished(req); - RETURN(rc); + *request = req; + RETURN(0); } -int mdc_obj_create(struct obd_export *exp, struct obdo *oa, - void *acl, int acl_size, struct lov_stripe_md **ea, - struct obd_trans_info *oti) +static int mdc_interpret_renew_capa(struct ptlrpc_request *req, void *unused, + int status) { - struct ptlrpc_request *request; - struct ost_body *body; - char *acl_buf; - int rc, size[2] = { sizeof(*body), acl_size }; + struct obd_capa *oc = req->rq_async_args.pointer_arg[0]; + renew_capa_cb_t cb = req->rq_async_args.pointer_arg[1]; + struct mdt_body *body = NULL; + struct lustre_capa *capa; ENTRY; - LASSERT(oa); - - request = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, - OST_CREATE, 2, size, NULL); - if (!request) - GOTO(out_req, rc = -ENOMEM); - - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - memcpy(&body->oa, oa, sizeof(body->oa)); - - if (acl_size) { - acl_buf = lustre_msg_buf(request->rq_reqmsg, 1, acl_size); - memcpy(acl_buf, acl, acl_size); - } - - request->rq_replen = lustre_msg_size(1, size); - rc = ptlrpc_queue_wait(request); - if (rc) - GOTO(out_req, rc); - - body = lustre_swab_repbuf(request, 0, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR ("can't unpack ost_body\n"); - GOTO (out_req, rc = -EPROTO); - } + if (status) + GOTO(out, capa = ERR_PTR(status)); - memcpy(oa, &body->oa, sizeof(*oa)); + body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), + lustre_swab_mdt_body); + if (body == NULL) + GOTO(out, capa = ERR_PTR(-EFAULT)); - /* store ino/generation for recovery */ - body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof (*body)); - body->oa.o_id = oa->o_id; - body->oa.o_generation = oa->o_generation; - body->oa.o_fid = oa->o_fid; - body->oa.o_mds = oa->o_mds; + if ((body->valid & OBD_MD_FLOSSCAPA) == 0) + GOTO(out, capa = ERR_PTR(-ENOENT)); - CDEBUG(D_HA, "transno: "LPD64"\n", request->rq_repmsg->transno); + capa = lustre_unpack_capa(req->rq_repmsg, REPLY_REC_OFF + 1); + if (!capa) + GOTO(out, capa = ERR_PTR(-EFAULT)); EXIT; -out_req: - ptlrpc_req_finished(request); - return rc; +out: + cb(oc, capa); + return 0; } -int mdc_brw(int rw, struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *ea, obd_count oa_bufs, - struct brw_page *pgarr, struct obd_trans_info *oti) +static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, + renew_capa_cb_t cb) { - struct ptlrpc_bulk_desc *desc; - struct niobuf_remote *niobuf; struct ptlrpc_request *req; - struct obd_ioobj *ioobj; - struct ost_body *body; - int err, opc, i; - int size[3]; - - opc = ((rw & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ; - - size[0] = sizeof(*body); - size[1] = sizeof(*ioobj); - size[2] = oa_bufs * sizeof(*niobuf); - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OBD_VERSION, opc, - 3, size, NULL); - LASSERT(req != NULL); - - if (opc == OST_WRITE) - desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_GET_SOURCE, - OST_BULK_PORTAL); - else - desc = ptlrpc_prep_bulk_imp(req, oa_bufs, BULK_PUT_SINK, - OST_BULK_PORTAL); - LASSERT(desc != NULL); - - body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body)); - ioobj = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*ioobj)); - niobuf = lustre_msg_buf(req->rq_reqmsg, 2, oa_bufs * sizeof(*niobuf)); - - memcpy(&body->oa, oa, sizeof(*oa)); - obdo_to_ioobj(oa, ioobj); - ioobj->ioo_bufcnt = oa_bufs; - - for (i = 0; i < oa_bufs; i++, niobuf++) { - struct brw_page *pg = &pgarr[i]; - - LASSERT(pg->count > 0); - LASSERT((pg->disk_offset & ~PAGE_MASK) + pg->count <= PAGE_SIZE); + int size[5] = { sizeof(struct ptlrpc_body), + sizeof(struct mdt_body), + sizeof(struct lustre_capa) }; + ENTRY; - ptlrpc_prep_bulk_page(desc, pg->pg, pg->disk_offset & ~PAGE_MASK, - pg->count); + req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, + MDS_GETATTR, 3, size, NULL); + if (!req) + RETURN(-ENOMEM); - niobuf->offset = pg->disk_offset; - niobuf->len = pg->count; - niobuf->flags = pg->flag; - } + mdc_pack_req_body(req, REQ_REC_OFF, OBD_MD_FLOSSCAPA, + &oc->c_capa.lc_fid, oc, 0, 0); - /* size[0] still sizeof (*body) */ - if (opc == OST_WRITE) { - /* 1 RC per niobuf */ - size[1] = sizeof(__u32) * oa_bufs; - req->rq_replen = lustre_msg_size(2, size); - } else { - /* 1 RC for the whole I/O */ - req->rq_replen = lustre_msg_size(1, size); - } - err = ptlrpc_queue_wait(req); - LASSERT(err == 0); + ptlrpc_req_set_repsize(req, 5, size); + req->rq_async_args.pointer_arg[0] = oc; + req->rq_async_args.pointer_arg[1] = cb; + req->rq_interpret_reply = mdc_interpret_renew_capa; + ptlrpcd_add_req(req); - ptlrpc_req_finished(req); - return 0; + RETURN(0); } -static int mdc_valid_attrs(struct obd_export *exp, - struct lustre_id *id) -{ - struct ldlm_res_id res_id = { .name = {0} }; - struct obd_device *obd = exp->exp_obd; - struct lustre_handle lockh; - ldlm_policy_data_t policy; - int flags; - ENTRY; - - res_id.name[0] = id_fid(id); - res_id.name[1] = id_group(id); - policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; - - CDEBUG(D_INFO, "trying to match res "LPU64"\n", - res_id.name[0]); - - /* FIXME use LDLM_FL_TEST_LOCK instead */ - flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - if (ldlm_lock_match(obd->obd_namespace, flags, &res_id, - LDLM_IBITS, &policy, LCK_PR, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PR); - RETURN(1); - } +static int mdc_connect(const struct lu_env *env, + struct lustre_handle *dlm_handle, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data) { + struct obd_import *imp = obd->u.cli.cl_import; - if (ldlm_lock_match(obd->obd_namespace, flags, &res_id, - LDLM_IBITS, &policy, LCK_PW, &lockh)) { - ldlm_lock_decref(&lockh, LCK_PW); - RETURN(1); + /* mds-mds import features */ + if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) { + spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + spin_unlock(&imp->imp_lock); + imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; + CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n", + obd->obd_name); } - RETURN(0); -} -static int mdc_change_cbdata_name(struct obd_export *exp, - struct lustre_id *pid, - char *name, int len, - struct lustre_id *cid, - ldlm_iterator_t it, void *data) -{ - int rc; - rc = mdc_change_cbdata(exp, cid, it, data); - RETURN(rc); + return client_connect_import(env, dlm_handle, obd, cluuid, data); } struct obd_ops mdc_obd_ops = { - .o_owner = THIS_MODULE, - .o_attach = mdc_attach, - .o_detach = mdc_detach, - .o_setup = mdc_setup, - .o_precleanup = mdc_precleanup, - .o_cleanup = mdc_cleanup, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, - .o_disconnect = client_disconnect_export, - .o_iocontrol = mdc_iocontrol, - .o_packmd = mdc_packmd, - .o_unpackmd = mdc_unpackmd, - .o_statfs = mdc_statfs, - .o_pin = mdc_pin, - .o_unpin = mdc_unpin, - .o_import_event = mdc_import_event, - .o_llog_init = mdc_llog_init, - .o_llog_finish = mdc_llog_finish, - .o_create = mdc_obj_create, - .o_set_info = mdc_set_info, - .o_get_info = mdc_get_info, - .o_brw = mdc_brw, - .o_cancel_unused = mdc_cancel_unused, - .o_init_ea_size = mdc_init_ea_size, + .o_owner = THIS_MODULE, + .o_setup = mdc_setup, + .o_precleanup = mdc_precleanup, + .o_cleanup = mdc_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, + .o_connect = mdc_connect, + .o_disconnect = client_disconnect_export, + .o_iocontrol = mdc_iocontrol, + .o_set_info_async = mdc_set_info_async, + .o_statfs = mdc_statfs, + .o_pin = mdc_pin, + .o_unpin = mdc_unpin, + .o_fid_init = mdc_fid_init, + .o_fid_fini = mdc_fid_fini, + .o_fid_alloc = mdc_fid_alloc, + .o_fid_delete = mdc_fid_delete, + .o_import_event = mdc_import_event, + .o_llog_init = mdc_llog_init, + .o_llog_finish = mdc_llog_finish, + .o_get_info = mdc_get_info, + .o_process_config = mdc_process_config, }; struct md_ops mdc_md_ops = { - .m_getstatus = mdc_getstatus, - .m_getattr = mdc_getattr, - .m_close = mdc_close, - .m_create = mdc_create, - .m_done_writing = mdc_done_writing, - .m_enqueue = mdc_enqueue, - .m_getattr_lock = mdc_getattr_lock, - .m_intent_lock = mdc_intent_lock, - .m_link = mdc_link, - .m_rename = mdc_rename, - .m_setattr = mdc_setattr, - .m_sync = mdc_sync, - .m_readpage = mdc_readpage, - .m_unlink = mdc_unlink, - .m_valid_attrs = mdc_valid_attrs, - .m_req2lustre_md = mdc_req2lustre_md, - .m_set_open_replay_data = mdc_set_open_replay_data, + .m_getstatus = mdc_getstatus, + .m_change_cbdata = mdc_change_cbdata, + .m_close = mdc_close, + .m_create = mdc_create, + .m_done_writing = mdc_done_writing, + .m_enqueue = mdc_enqueue, + .m_getattr = mdc_getattr, + .m_getattr_name = mdc_getattr_name, + .m_intent_lock = mdc_intent_lock, + .m_link = mdc_link, + .m_is_subdir = mdc_is_subdir, + .m_rename = mdc_rename, + .m_setattr = mdc_setattr, + .m_setxattr = mdc_setxattr, + .m_getxattr = mdc_getxattr, + .m_sync = mdc_sync, + .m_readpage = mdc_readpage, + .m_unlink = mdc_unlink, + .m_cancel_unused = mdc_cancel_unused, + .m_init_ea_size = mdc_init_ea_size, + .m_set_lock_data = mdc_set_lock_data, + .m_lock_match = mdc_lock_match, + .m_get_lustre_md = mdc_get_lustre_md, + .m_free_lustre_md = mdc_free_lustre_md, + .m_set_open_replay_data = mdc_set_open_replay_data, .m_clear_open_replay_data = mdc_clear_open_replay_data, - .m_store_inode_generation = mdc_store_inode_generation, - .m_set_lock_data = mdc_set_lock_data, - .m_get_real_obd = mdc_get_real_obd, - .m_change_cbdata_name = mdc_change_cbdata_name, - .m_change_cbdata = mdc_change_cbdata, - .m_access_check = mdc_access_check, + .m_get_remote_perm = mdc_get_remote_perm, + .m_renew_capa = mdc_renew_capa }; +extern quota_interface_t mdc_quota_interface; + int __init mdc_init(void) { + int rc; struct lprocfs_static_vars lvars; - lprocfs_init_vars(mdc, &lvars); - return class_register_type(&mdc_obd_ops, &mdc_md_ops, - lvars.module_vars, OBD_MDC_DEVICENAME); + + request_module("lquota"); + quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface); + init_obd_quota_ops(quota_interface, &mdc_obd_ops); + + rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars, + LUSTRE_MDC_NAME, NULL); + if (rc && quota_interface) + PORTAL_SYMBOL_PUT(mdc_quota_interface); + + RETURN(rc); } #ifdef __KERNEL__ static void /*__exit*/ mdc_exit(void) { - class_unregister_type(OBD_MDC_DEVICENAME); + if (quota_interface) + PORTAL_SYMBOL_PUT(mdc_quota_interface); + + class_unregister_type(LUSTRE_MDC_NAME); } MODULE_AUTHOR("Cluster File Systems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Client"); MODULE_LICENSE("GPL"); -EXPORT_SYMBOL(mdc_req2lustre_md); -EXPORT_SYMBOL(mdc_change_cbdata); -EXPORT_SYMBOL(mdc_getstatus); -EXPORT_SYMBOL(mdc_getattr); -EXPORT_SYMBOL(mdc_getattr_lock); -EXPORT_SYMBOL(mdc_create); -EXPORT_SYMBOL(mdc_unlink); -EXPORT_SYMBOL(mdc_rename); -EXPORT_SYMBOL(mdc_link); -EXPORT_SYMBOL(mdc_readpage); -EXPORT_SYMBOL(mdc_setattr); -EXPORT_SYMBOL(mdc_close); -EXPORT_SYMBOL(mdc_done_writing); -EXPORT_SYMBOL(mdc_sync); -EXPORT_SYMBOL(mdc_set_open_replay_data); -EXPORT_SYMBOL(mdc_clear_open_replay_data); -EXPORT_SYMBOL(mdc_store_inode_generation); - module_init(mdc_init); module_exit(mdc_exit); #endif