X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=3abdd777f11f319f52d5460315a83861cc3ef5be;hb=421d8e98f309870475aadc05aa81a60e9acd8e65;hp=bb148d75b17660ee321c12f1773227def6290231;hpb=0df86a8af6e98d885da3516691e7ebf27804a9c5;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index bb148d7..3abdd77 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1,25 +1,40 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * GPL HEADER START * - * This file is part of the Lustre file system, http://www.lustre.org - * Lustre is a trademark of Cluster File Systems, Inc. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * You may have signed or agreed to another license before downloading - * this software. If so, you are bound by the terms and conditions - * of that agreement, and the following does not apply to you. See the - * LICENSE file included with this distribution for more information. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * If you did not agree to a different license, then this copy of Lustre - * is open source software; you can redistribute it and/or modify it - * under the terms of version 2 of the GNU General Public License as - * published by the Free Software Foundation. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * - * In either case, Lustre is distributed in the hope that it will be - * useful, but WITHOUT ANY WARRANTY; without even the implied warranty - * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * license text for more details. + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. + * Use is subject to license terms. + */ +/* + * Copyright (c) 2011 Whamcloud, Inc. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ #ifndef EXPORT_SYMTAB @@ -36,767 +51,1316 @@ # include #endif +#include #include #include +#include +#include #include #include #include "mdc_internal.h" - -static quota_interface_t *quota_interface; +#include #define REQUEST_MINOR 244 +struct mdc_renew_capa_args { + struct obd_capa *ra_oc; + renew_capa_cb_t ra_cb; +}; + static quota_interface_t *quota_interface; extern quota_interface_t mdc_quota_interface; static int mdc_cleanup(struct obd_device *obd); -extern int mds_queue_req(struct ptlrpc_request *); +int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req, + const struct req_msg_field *field, struct obd_capa **oc) +{ + struct lustre_capa *capa; + struct obd_capa *c; + ENTRY; + + /* swabbed already in mdc_enqueue */ + capa = req_capsule_server_get(&req->rq_pill, field); + if (capa == NULL) + RETURN(-EPROTO); + + c = alloc_capa(CAPA_SITE_CLIENT); + if (IS_ERR(c)) { + CDEBUG(D_INFO, "alloc capa failed!\n"); + RETURN(PTR_ERR(c)); + } else { + c->c_capa = *capa; + *oc = c; + RETURN(0); + } +} + /* Helper that implements most of mdc_getstatus and signal_completed_replay. */ /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */ -static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid, - int level, int msg_flags) +static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid, + struct obd_capa **pc, int level, int msg_flags) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct mdt_body *body; + int rc; ENTRY; - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_GETSTATUS, 2, size, - NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_GETSTATUS, + LUSTRE_MDS_VERSION, MDS_GETSTATUS); + if (req == NULL) + RETURN(-ENOMEM); + mdc_pack_body(req, NULL, NULL, 0, 0, -1, 0); + lustre_msg_add_flags(req->rq_reqmsg, msg_flags); req->rq_send_state = level; - ptlrpc_req_set_repsize(req, 2, size); - mdc_pack_req_body(req, REQ_REC_OFF, 0, NULL, 0, 0); - lustre_msg_add_flags(req->rq_reqmsg, msg_flags); - rc = ptlrpc_queue_wait(req); + ptlrpc_request_set_replen(req); - if (!rc) { - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't extract mds_body\n"); - GOTO (out, rc = -EPROTO); - } + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); - memcpy(rootfid, &body->fid1, sizeof(*rootfid)); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - CDEBUG(D_NET, "root ino="LPU64", last_committed="LPU64 - ", last_xid="LPU64"\n", - rootfid->id, - lustre_msg_get_last_committed(req->rq_repmsg), - lustre_msg_get_last_xid(req->rq_repmsg)); + if (body->valid & OBD_MD_FLMDSCAPA) { + rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, pc); + if (rc) + GOTO(out, rc); } + *rootfid = body->fid1; + CDEBUG(D_NET, + "root fid="DFID", last_committed="LPU64"\n", + PFID(rootfid), + lustre_msg_get_last_committed(req->rq_repmsg)); EXIT; - out: +out: ptlrpc_req_finished(req); return rc; } /* This should be mdc_get_info("rootfid") */ -int mdc_getstatus(struct obd_export *exp, struct ll_fid *rootfid) +int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid, + struct obd_capa **pc) { - return send_getstatus(class_exp2cliimp(exp), rootfid, LUSTRE_IMP_FULL, - 0); + return send_getstatus(class_exp2cliimp(exp), rootfid, pc, + LUSTRE_IMP_FULL, 0); } -static -int mdc_getattr_common(struct obd_export *exp, unsigned int ea_size, - unsigned int acl_size, struct ptlrpc_request *req) +/* + * This function now is known to always saying that it will receive 4 buffers + * from server. Even for cases when acl_size and md_size is zero, RPC header + * will contain 4 fields and RPC itself will contain zero size fields. This is + * because mdt_getattr*() _always_ returns 4 fields, but if acl is not needed + * and thus zero, it shrinks it, making zero size. The same story about + * md_size. And this is course of problem when client waits for smaller number + * of fields. This issue will be fixed later when client gets aware of RPC + * layouts. --umka + */ +static int mdc_getattr_common(struct obd_export *exp, + struct ptlrpc_request *req) { - struct mds_body *body; - void *eadata; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - int bufcount = 2, rc; + struct req_capsule *pill = &req->rq_pill; + struct mdt_body *body; + void *eadata; + int rc; ENTRY; - /* request message already built */ - - if (ea_size != 0) { - size[bufcount++] = ea_size; - CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n", - ea_size); - } - if (acl_size) { - size[bufcount++] = acl_size; - CDEBUG(D_INODE, "reserved %u bytes for ACL\n", acl_size); - } - - ptlrpc_req_set_repsize(req, bufcount, size); - + /* Request message already built. */ rc = ptlrpc_queue_wait(req); if (rc != 0) - RETURN (rc); + RETURN(rc); - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack mds_body\n"); - RETURN (-EPROTO); - } + /* sanity check for the reply */ + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (body == NULL) + RETURN(-EPROTO); CDEBUG(D_NET, "mode: %o\n", body->mode); - LASSERT_REPSWAB(req, REPLY_REC_OFF + 1); if (body->eadatasize != 0) { - /* reply indicates presence of eadata; check it's there... */ - eadata = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1, - body->eadatasize); - if (eadata == NULL) { - CERROR ("Missing/short eadata\n"); - RETURN (-EPROTO); - } + mdc_update_max_ea_from_body(exp, body); + + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + body->eadatasize); + if (eadata == NULL) + RETURN(-EPROTO); + } + + if (body->valid & OBD_MD_FLRMTPERM) { + struct mdt_remote_perm *perm; + + LASSERT(client_is_remote(exp)); + perm = req_capsule_server_swab_get(pill, &RMF_ACL, + lustre_swab_mdt_remote_perm); + if (perm == NULL) + RETURN(-EPROTO); } - - if (body->valid & OBD_MD_FLMODEASIZE) { - if (exp->exp_obd->u.cli.cl_max_mds_easize < body->max_mdsize) - exp->exp_obd->u.cli.cl_max_mds_easize = - body->max_mdsize; - if (exp->exp_obd->u.cli.cl_max_mds_cookiesize < - body->max_cookiesize) - exp->exp_obd->u.cli.cl_max_mds_cookiesize = - body->max_cookiesize; + + if (body->valid & OBD_MD_FLMDSCAPA) { + struct lustre_capa *capa; + capa = req_capsule_server_get(pill, &RMF_CAPA1); + if (capa == NULL) + RETURN(-EPROTO); } - RETURN (0); + RETURN(0); } -int mdc_getattr(struct obd_export *exp, struct ll_fid *fid, - obd_valid valid, unsigned int ea_size, +int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { struct ptlrpc_request *req; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; - int acl_size = 0, rc; + int rc; ENTRY; - /* XXX do we need to make another request here? We just did a getattr - * to do the lookup in the first place. - */ - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_GETATTR, 2, size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); - mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size, - MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - /* currently only root inode will call us with FLACL */ - if (valid & OBD_MD_FLACL) - acl_size = LUSTRE_POSIX_ACL_MAX_SIZE; + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + op_data->op_valid, op_data->op_mode, -1, 0); - rc = mdc_getattr_common(exp, ea_size, acl_size, req); - if (rc != 0) { - ptlrpc_req_finished (req); - req = NULL; + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + if (op_data->op_valid & OBD_MD_FLRMTPERM) { + LASSERT(client_is_remote(exp)); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); } - out: - *request = req; - RETURN (rc); + ptlrpc_request_set_replen(req); + + rc = mdc_getattr_common(exp, req); + if (rc) + ptlrpc_req_finished(req); + else + *request = req; + RETURN(rc); } -int mdc_getattr_name(struct obd_export *exp, struct ll_fid *fid, - const char *filename, int namelen, unsigned long valid, - unsigned int ea_size, struct ptlrpc_request **request) +int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, + struct ptlrpc_request **request) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), namelen}; + int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_GETATTR_NAME, 3, size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_GETATTR_NAME); + if (req == NULL) + RETURN(-ENOMEM); - mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, ea_size, - MDS_BFLAG_EXT_FLAGS/*request "new" flags(bug 9486)*/); - - LASSERT(strnlen(filename, namelen) == namelen - 1); - memcpy(lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, namelen), - filename, namelen); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); - rc = mdc_getattr_common(exp, ea_size, 0, req); - if (rc != 0) { - ptlrpc_req_finished (req); - req = NULL; + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); } - out: - *request = req; + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + op_data->op_valid, op_data->op_mode, + op_data->op_suppgids[0], 0); + + if (op_data->op_name) { + char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME); + LASSERT(strnlen(op_data->op_name, op_data->op_namelen) == + op_data->op_namelen); + memcpy(name, op_data->op_name, op_data->op_namelen); + } + + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + ptlrpc_request_set_replen(req); + + rc = mdc_getattr_common(exp, req); + if (rc) + ptlrpc_req_finished(req); + else + *request = req; RETURN(rc); } -static -int mdc_xattr_common(struct obd_export *exp, struct ll_fid *fid, - int opcode, obd_valid valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, struct ptlrpc_request **request) +static int mdc_is_subdir(struct obd_export *exp, + const struct lu_fid *pfid, + const struct lu_fid *cfid, + struct ptlrpc_request **request) +{ + struct ptlrpc_request *req; + int rc; + + ENTRY; + + *request = NULL; + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), + &RQF_MDS_IS_SUBDIR, LUSTRE_MDS_VERSION, + MDS_IS_SUBDIR); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_is_subdir_pack(req, pfid, cfid, 0); + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc && rc != -EREMOTE) + ptlrpc_req_finished(req); + else + *request = req; + RETURN(rc); +} + +static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, + const struct lu_fid *fid, + struct obd_capa *oc, int opcode, obd_valid valid, + const char *xattr_name, const char *input, + int input_size, int output_size, int flags, + __u32 suppgid, struct ptlrpc_request **request) { struct ptlrpc_request *req; - int size[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; - // int size[3] = {sizeof(struct mds_body)}, bufcnt = 1; - int rc, xattr_namelen = 0, bufcnt = 2, offset; - void *tmp; + int xattr_namelen = 0; + char *tmp; + int rc; ENTRY; + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), fmt); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, oc); if (xattr_name) { xattr_namelen = strlen(xattr_name) + 1; - size[bufcnt++] = xattr_namelen; + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + xattr_namelen); } if (input_size) { LASSERT(input); - size[bufcnt++] = input_size; + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + input_size); } - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - opcode, bufcnt, size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - /* request data */ - mdc_pack_req_body(req, REQ_REC_OFF, valid, fid, output_size, flags); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - offset = REQ_REC_OFF + 1; + if (opcode == MDS_REINT) { + struct mdt_rec_setxattr *rec; + + CLASSERT(sizeof(struct mdt_rec_setxattr) == + sizeof(struct mdt_rec_reint)); + rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); + rec->sx_opcode = REINT_SETXATTR; + /* TODO: + * cfs_curproc_fs{u,g}id() should replace + * current->fs{u,g}id for portability. + */ + rec->sx_fsuid = cfs_curproc_fsuid(); + rec->sx_fsgid = cfs_curproc_fsgid(); + rec->sx_cap = cfs_curproc_cap_pack(); + rec->sx_suppgid1 = suppgid; + rec->sx_suppgid2 = -1; + rec->sx_fid = *fid; + rec->sx_valid = valid | OBD_MD_FLCTIME; + rec->sx_time = cfs_time_current_sec(); + rec->sx_size = output_size; + rec->sx_flags = flags; + + mdc_pack_capa(req, &RMF_CAPA1, oc); + } else { + mdc_pack_body(req, fid, oc, valid, output_size, suppgid, flags); + } if (xattr_name) { - tmp = lustre_msg_buf(req->rq_reqmsg, offset++, xattr_namelen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_NAME); memcpy(tmp, xattr_name, xattr_namelen); } if (input_size) { - tmp = lustre_msg_buf(req->rq_reqmsg, offset++, input_size); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_EADATA); memcpy(tmp, input, input_size); } - /* reply buffers */ - if (opcode == MDS_GETXATTR) { - size[REPLY_REC_OFF] = sizeof(struct mds_body); - bufcnt = 2; - } else { - bufcnt = 1; - } - - /* we do this even output_size is 0, because server is doing that */ - size[bufcnt++] = output_size; - - ptlrpc_req_set_repsize(req, bufcnt, size); + if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER)) + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, + RCL_SERVER, output_size); + ptlrpc_request_set_replen(req); /* make rpc */ - if (opcode == MDS_SETXATTR) + if (opcode == MDS_REINT) mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); - if (opcode == MDS_SETXATTR) + if (opcode == MDS_REINT) mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - if (rc != 0) - GOTO(err_out, rc); - - if (opcode == MDS_GETXATTR) { - struct mds_body * body = lustre_swab_repbuf(req, REPLY_REC_OFF, - sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR ("Can't unpack mds_body\n"); - GOTO(err_out, rc = -EPROTO); - } - } -out: - *request = req; - RETURN (rc); -err_out: - ptlrpc_req_finished(req); - req = NULL; - goto out; -} - -int mdc_setxattr(struct obd_export *exp, struct ll_fid *fid, - obd_valid valid, const char *xattr_name, - const char *input, int input_size, - int output_size, int flags, - struct ptlrpc_request **request) -{ - return mdc_xattr_common(exp, fid, MDS_SETXATTR, valid, xattr_name, - input, input_size, output_size, flags, request); + if (rc) + ptlrpc_req_finished(req); + else + *request = req; + RETURN(rc); } -int mdc_getxattr(struct obd_export *exp, struct ll_fid *fid, - obd_valid valid, const char *xattr_name, - const char *input, int input_size, - int output_size, struct ptlrpc_request **request) +int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, obd_valid valid, const char *xattr_name, + const char *input, int input_size, int output_size, + int flags, __u32 suppgid, struct ptlrpc_request **request) { - return mdc_xattr_common(exp, fid, MDS_GETXATTR, valid, xattr_name, - input, input_size, output_size, 0, request); + return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR, + fid, oc, MDS_REINT, valid, xattr_name, + input, input_size, output_size, flags, + suppgid, request); } -/* This should be called with both the request and the reply still packed. */ -void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff, - int repoff) +int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, obd_valid valid, const char *xattr_name, + const char *input, int input_size, int output_size, + int flags, struct ptlrpc_request **request) { - struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff, - sizeof(*rec)); - struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff, - sizeof(*body)); - - LASSERT (rec != NULL); - LASSERT (body != NULL); - - memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid); - if (body->fid1.id == 0) { - DEBUG_REQ(D_ERROR, req, "saving replay request with id = 0 " - "gen = %u", body->fid1.generation); - LBUG(); - } - - DEBUG_REQ(D_HA, req, "storing generation %u for ino "LPU64, - rec->cr_replayfid.generation, rec->cr_replayfid.id); + return mdc_xattr_common(exp, &RQF_MDS_GETXATTR, + fid, oc, MDS_GETXATTR, valid, xattr_name, + input, input_size, output_size, flags, + -1, request); } #ifdef CONFIG_FS_POSIX_ACL -static -int mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req, - struct lustre_md *md, unsigned int offset) +static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) { - struct mds_body *body = md->body; - struct posix_acl *acl; - void *buf; - int rc; + struct req_capsule *pill = &req->rq_pill; + struct mdt_body *body = md->body; + struct posix_acl *acl; + void *buf; + int rc; + ENTRY; if (!body->aclsize) - return 0; + RETURN(0); - buf = lustre_msg_buf(req->rq_repmsg, offset, body->aclsize); - if (!buf) { - CERROR("aclsize %u, bufcount %u, bufsize %u\n", - body->aclsize, lustre_msg_bufcount(req->rq_repmsg), - (lustre_msg_bufcount(req->rq_repmsg) <= offset) ? - -1 : lustre_msg_buflen(req->rq_repmsg, offset)); - return -EPROTO; - } + buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->aclsize); + + if (!buf) + RETURN(-EPROTO); acl = posix_acl_from_xattr(buf, body->aclsize); if (IS_ERR(acl)) { rc = PTR_ERR(acl); CERROR("convert xattr to acl: %d\n", rc); - return rc; + RETURN(rc); } rc = posix_acl_valid(acl); if (rc) { CERROR("validate acl: %d\n", rc); posix_acl_release(acl); - return rc; + RETURN(rc); } md->posix_acl = acl; - return 0; + RETURN(0); } #else -#define mdc_unpack_acl(exp, req, md, offset) 0 +#define mdc_unpack_acl(req, md) 0 #endif -int mdc_req2lustre_md(struct ptlrpc_request *req, int offset, - struct obd_export *exp, +int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, + struct obd_export *dt_exp, struct obd_export *md_exp, struct lustre_md *md) { - int rc = 0; + struct req_capsule *pill = &req->rq_pill; + int rc; ENTRY; LASSERT(md); memset(md, 0, sizeof(*md)); - md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body)); - LASSERT (md->body != NULL); - LASSERT_REPSWABBED(req, offset); - offset++; + md->body = req_capsule_server_get(pill, &RMF_MDT_BODY); + LASSERT(md->body != NULL); if (md->body->valid & OBD_MD_FLEASIZE) { int lmmsize; struct lov_mds_md *lmm; - LASSERT(S_ISREG(md->body->mode)); + if (!S_ISREG(md->body->mode)) { + CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, should be a " + "regular file, but is not\n"); + GOTO(out, rc = -EPROTO); + } if (md->body->eadatasize == 0) { - CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n"); - RETURN(-EPROTO); + CDEBUG(D_INFO, "OBD_MD_FLEASIZE set, " + "but eadatasize 0\n"); + GOTO(out, rc = -EPROTO); } lmmsize = md->body->eadatasize; - lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize); - LASSERT (lmm != NULL); - LASSERT_REPSWABBED(req, offset); + lmm = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmmsize); + if (!lmm) + GOTO(out, rc = -EPROTO); - rc = obd_unpackmd(exp, &md->lsm, lmm, lmmsize); + rc = obd_unpackmd(dt_exp, &md->lsm, lmm, lmmsize); if (rc < 0) - RETURN(rc); + GOTO(out, rc); + + if (rc < sizeof(*md->lsm)) { + CDEBUG(D_INFO, "lsm size too small: " + "rc < sizeof (*md->lsm) (%d < %d)\n", + rc, (int)sizeof(*md->lsm)); + GOTO(out, rc = -EPROTO); + } - LASSERT (rc >= sizeof (*md->lsm)); - rc = 0; + } else if (md->body->valid & OBD_MD_FLDIREA) { + int lmvsize; + struct lov_mds_md *lmv; + + if(!S_ISDIR(md->body->mode)) { + CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a " + "directory, but is not\n"); + GOTO(out, rc = -EPROTO); + } - offset++; + if (md->body->eadatasize == 0) { + CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, " + "but eadatasize 0\n"); + RETURN(-EPROTO); + } + if (md->body->valid & OBD_MD_MEA) { + lmvsize = md->body->eadatasize; + lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + lmvsize); + if (!lmv) + GOTO(out, rc = -EPROTO); + + rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, + lmvsize); + if (rc < 0) + GOTO(out, rc); + + if (rc < sizeof(*md->mea)) { + CDEBUG(D_INFO, "size too small: " + "rc < sizeof(*md->mea) (%d < %d)\n", + rc, (int)sizeof(*md->mea)); + GOTO(out, rc = -EPROTO); + } + } + } + rc = 0; + + if (md->body->valid & OBD_MD_FLRMTPERM) { + /* remote permission */ + LASSERT(client_is_remote(exp)); + md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL, + lustre_swab_mdt_remote_perm); + if (!md->remote_perm) + GOTO(out, rc = -EPROTO); + } + else if (md->body->valid & OBD_MD_FLACL) { + /* for ACL, it's possible that FLACL is set but aclsize is zero. + * only when aclsize != 0 there's an actual segment for ACL + * in reply buffer. + */ + if (md->body->aclsize) { + rc = mdc_unpack_acl(req, md); + if (rc) + GOTO(out, rc); +#ifdef CONFIG_FS_POSIX_ACL + } else { + md->posix_acl = NULL; +#endif + } } + if (md->body->valid & OBD_MD_FLMDSCAPA) { + struct obd_capa *oc = NULL; - /* for ACL, it's possible that FLACL is set but aclsize is zero. - * only when aclsize != 0 there's an actual segment for ACL in - * reply buffer. - */ - if ((md->body->valid & OBD_MD_FLACL) && md->body->aclsize) { - rc = mdc_unpack_acl(exp, req, md, offset); + rc = mdc_unpack_capa(NULL, req, &RMF_CAPA1, &oc); if (rc) - GOTO(err_out, rc); - offset++; + GOTO(out, rc); + md->mds_capa = oc; } -out: - RETURN(rc); -err_out: - if (md->lsm) - obd_free_memmd(exp, &md->lsm); - goto out; -} + if (md->body->valid & OBD_MD_FLOSSCAPA) { + struct obd_capa *oc = NULL; -void mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) -{ - if (md->lsm) - obd_free_memmd(exp, &md->lsm); + rc = mdc_unpack_capa(NULL, req, &RMF_CAPA2, &oc); + if (rc) + GOTO(out, rc); + md->oss_capa = oc; + } + EXIT; +out: + if (rc) { + if (md->oss_capa) { + capa_put(md->oss_capa); + md->oss_capa = NULL; + } + if (md->mds_capa) { + capa_put(md->mds_capa); + md->mds_capa = NULL; + } #ifdef CONFIG_FS_POSIX_ACL - if (md->posix_acl) { posix_acl_release(md->posix_acl); - md->posix_acl = NULL; - } #endif + if (md->lsm) + obd_free_memmd(dt_exp, &md->lsm); + } + return rc; } -static void mdc_commit_open(struct ptlrpc_request *req) +int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) { - struct mdc_open_data *mod = req->rq_cb_data; - if (mod == NULL) - return; - - if (mod->mod_close_req != NULL) - mod->mod_close_req->rq_cb_data = NULL; - - if (mod->mod_och != NULL) - mod->mod_och->och_mod = NULL; - - OBD_FREE(mod, sizeof(*mod)); - req->rq_cb_data = NULL; + ENTRY; + RETURN(0); } -static void mdc_replay_open(struct ptlrpc_request *req) +/** + * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING + * RPC chains. + */ +void mdc_replay_open(struct ptlrpc_request *req) { - struct mdc_open_data *mod = req->rq_cb_data; - struct obd_client_handle *och; + struct md_open_data *mod = req->rq_cb_data; struct ptlrpc_request *close_req; + struct obd_client_handle *och; struct lustre_handle old; - struct mds_body *body; + struct mdt_body *body; ENTRY; - body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); - LASSERT (body != NULL); - if (mod == NULL) { DEBUG_REQ(D_ERROR, req, - "can't properly replay without open data"); + "Can't properly replay without open data."); EXIT; return; } + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + LASSERT(body != NULL); + och = mod->mod_och; if (och != NULL) { struct lustre_handle *file_fh; + LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); + file_fh = &och->och_fh; CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n", file_fh->cookie, body->handle.cookie); - memcpy(&old, file_fh, sizeof(old)); - memcpy(file_fh, &body->handle, sizeof(*file_fh)); + old = *file_fh; + *file_fh = body->handle; } - close_req = mod->mod_close_req; if (close_req != NULL) { - struct mds_body *close_body; - LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE); - close_body = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF, - sizeof(*close_body)); + __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); + struct mdt_ioepoch *epoch; + + LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING); + epoch = req_capsule_client_get(&close_req->rq_pill, + &RMF_MDT_EPOCH); + LASSERT(epoch); + if (och != NULL) - LASSERT(!memcmp(&old, &close_body->handle, sizeof old)); + LASSERT(!memcmp(&old, &epoch->handle, sizeof(old))); DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); - memcpy(&close_body->handle, &body->handle, - sizeof(close_body->handle)); + epoch->handle = body->handle; } - EXIT; } -void mdc_set_open_replay_data(struct obd_client_handle *och, - struct ptlrpc_request *open_req) +void mdc_commit_open(struct ptlrpc_request *req) +{ + struct md_open_data *mod = req->rq_cb_data; + if (mod == NULL) + return; + + /** + * No need to touch md_open_data::mod_och, it holds a reference on + * \var mod and will zero references to each other, \var mod will be + * freed after that when md_open_data::mod_och will put the reference. + */ + + /** + * Do not let open request to disappear as it still may be needed + * for close rpc to happen (it may happen on evict only, otherwise + * ptlrpc_request::rq_replay does not let mdc_commit_open() to be + * called), just mark this rpc as committed to distinguish these 2 + * cases, see mdc_close() for details. The open request reference will + * be put along with freeing \var mod. + */ + ptlrpc_request_addref(req); + cfs_spin_lock(&req->rq_lock); + req->rq_committed = 1; + cfs_spin_unlock(&req->rq_lock); + req->rq_cb_data = NULL; + obd_mod_put(mod); +} + +int mdc_set_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och, + struct ptlrpc_request *open_req) { - struct mdc_open_data *mod; - struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg, - DLM_INTENT_REC_OFF, - sizeof(*rec)); - struct mds_body *body = lustre_msg_buf(open_req->rq_repmsg, - DLM_REPLY_REC_OFF, - sizeof(*body)); - - /* incoming message in my byte order (it's been swabbed) */ + struct md_open_data *mod; + struct mdt_rec_create *rec; + struct mdt_body *body; + struct obd_import *imp = open_req->rq_import; + ENTRY; + + if (!open_req->rq_replay) + RETURN(0); + + rec = req_capsule_client_get(&open_req->rq_pill, &RMF_REC_REINT); + body = req_capsule_server_get(&open_req->rq_pill, &RMF_MDT_BODY); LASSERT(rec != NULL); - LASSERT_REPSWABBED(open_req, DLM_REPLY_REC_OFF); - /* outgoing messages always in my byte order */ + /* Incoming message in my byte order (it's been swabbed). */ + /* Outgoing messages always in my byte order. */ LASSERT(body != NULL); - if (och) { - OBD_ALLOC(mod, sizeof(*mod)); + /* Only if the import is replayable, we set replay_open data */ + if (och && imp->imp_replayable) { + mod = obd_mod_alloc(); if (mod == NULL) { - DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data"); - return; + DEBUG_REQ(D_ERROR, open_req, + "Can't allocate md_open_data"); + RETURN(0); } - spin_lock(&open_req->rq_lock); - if (!open_req->rq_replay) { - OBD_FREE(mod, sizeof(*mod)); - spin_unlock(&open_req->rq_lock); - return; - } + /** + * Take a reference on \var mod, to be freed on mdc_close(). + * It protects \var mod from being freed on eviction (commit + * callback is called despite rq_replay flag). + * Another reference for \var och. + */ + obd_mod_get(mod); + obd_mod_get(mod); + cfs_spin_lock(&open_req->rq_lock); och->och_mod = mod; mod->mod_och = och; mod->mod_open_req = open_req; open_req->rq_cb_data = mod; open_req->rq_commit_cb = mdc_commit_open; - spin_unlock(&open_req->rq_lock); + cfs_spin_unlock(&open_req->rq_lock); } - memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid); + rec->cr_fid2 = body->fid1; + rec->cr_ioepoch = body->ioepoch; + rec->cr_old_handle.cookie = body->handle.cookie; open_req->rq_replay_cb = mdc_replay_open; - if (body->fid1.id == 0) { - DEBUG_REQ(D_ERROR, open_req, "saving replay request with " - "id = 0 gen = %u", body->fid1.generation); + if (!fid_is_sane(&body->fid1)) { + DEBUG_REQ(D_ERROR, open_req, "Saving replay request with " + "insane fid"); LBUG(); } - DEBUG_REQ(D_HA, open_req, "set up replay data"); + DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data"); + RETURN(0); } -void mdc_clear_open_replay_data(struct obd_client_handle *och) +int mdc_clear_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och) { - struct mdc_open_data *mod = och->och_mod; + struct md_open_data *mod = och->och_mod; + ENTRY; + + /** + * It is possible to not have \var mod in a case of eviction between + * lookup and ll_file_open(). + **/ + if (mod == NULL) + RETURN(0); - /* Don't free the structure now (it happens in mdc_commit_open, after - * we're sure we won't need to fix up the close request in the future), - * but make sure that replay doesn't poke at the och, which is about to - * be freed. */ LASSERT(mod != LP_POISON); - if (mod != NULL) - mod->mod_och = NULL; + + mod->mod_och = NULL; och->och_mod = NULL; + obd_mod_put(mod); + + RETURN(0); } -static void mdc_commit_close(struct ptlrpc_request *req) -{ - struct mdc_open_data *mod = req->rq_cb_data; - struct ptlrpc_request *open_req; - struct obd_import *imp = req->rq_import; +/* Prepares the request for the replay by the given reply */ +static void mdc_close_handle_reply(struct ptlrpc_request *req, + struct md_op_data *op_data, int rc) { + struct mdt_body *repbody; + struct mdt_ioepoch *epoch; - DEBUG_REQ(D_HA, req, "close req committed"); - if (mod == NULL) - return; + if (req && rc == -EAGAIN) { + repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH); - mod->mod_close_req = NULL; - req->rq_cb_data = NULL; - req->rq_commit_cb = NULL; - - open_req = mod->mod_open_req; - LASSERT(open_req != NULL); - LASSERT(open_req != LP_POISON); - LASSERT(open_req->rq_type != LI_POISON); - - DEBUG_REQ(D_HA, open_req, "open req balanced"); - LASSERT(open_req->rq_transno != 0); - LASSERT(open_req->rq_import == imp); - - /* We no longer want to preserve this for transno-unconditional - * replay. */ - spin_lock(&open_req->rq_lock); - open_req->rq_replay = 0; - spin_unlock(&open_req->rq_lock); + epoch->flags |= MF_SOM_AU; + if (repbody->valid & OBD_MD_FLGETATTRLOCK) + op_data->op_flags |= MF_GETATTR_LOCK; + } } -int mdc_close(struct obd_export *exp, struct obdo *oa, - struct obd_client_handle *och, struct ptlrpc_request **request) +int mdc_close(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod, struct ptlrpc_request **request) { - struct obd_device *obd = class_exp2obd(exp); - int reqsize[2] = { sizeof(struct ptlrpc_body), - sizeof(struct mds_body) }; - int rc, repsize[4] = { sizeof(struct ptlrpc_body), - sizeof(struct mds_body), - obd->u.cli.cl_max_mds_easize, - obd->u.cli.cl_max_mds_cookiesize }; + struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; - struct mdc_open_data *mod; + int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_CLOSE, 2, reqsize, NULL); + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE); if (req == NULL) - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a * portal whose threads are not taking any DLM locks and are therefore * always progressing */ - /* XXX FIXME bug 249 */ req->rq_request_portal = MDS_READPAGE_PORTAL; + ptlrpc_at_set_req_timeout(req); /* Ensure that this close's handle is fixed up during replay. */ - LASSERT(och != NULL); - LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); - mod = och->och_mod; if (likely(mod != NULL)) { - if (mod->mod_open_req->rq_type == LI_POISON) { - CERROR("LBUG POISONED open %p!\n", mod->mod_open_req); - LBUG(); - ptlrpc_req_finished(req); - req = NULL; - GOTO(out, rc = -EIO); - } + LASSERTF(mod->mod_open_req != NULL && + mod->mod_open_req->rq_type != LI_POISON, + "POISONED open %p!\n", mod->mod_open_req); + mod->mod_close_req = req; + DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); + /* We no longer want to preserve this open for replay even + * though the open was committed. b=3632, b=3633 */ + cfs_spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + cfs_spin_unlock(&mod->mod_open_req->rq_lock); } else { - CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); + CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); } - mdc_close_pack(req, REQ_REC_OFF, oa, oa->o_valid, och); + mdc_close_pack(req, op_data); - ptlrpc_req_set_repsize(req, 4, repsize); - req->rq_commit_cb = mdc_commit_close; - LASSERT(req->rq_cb_data == NULL); - req->rq_cb_data = mod; + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obd->u.cli.cl_max_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER, + obd->u.cli.cl_max_mds_cookiesize); + + ptlrpc_request_set_replen(req); mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); rc = ptlrpc_queue_wait(req); mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); if (req->rq_repmsg == NULL) { - CDEBUG(D_HA, "request failed to send: %p, %d\n", req, + CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, req->rq_status); if (rc == 0) - rc = req->rq_status ? req->rq_status : -EIO; - } else if (rc == 0) { + rc = req->rq_status ?: -EIO; + } else if (rc == 0 || rc == -EAGAIN) { + struct mdt_body *body; + rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " "= %d", rc); if (rc > 0) rc = -rc; - } else if (mod == NULL) { - CERROR("Unexpected: can't find mdc_open_data, but the " - "close succeeded. Please tell CFS.\n"); } - if (!lustre_swab_repbuf(req, REPLY_REC_OFF, - sizeof(struct mds_body), - lustre_swab_mds_body)) { - CERROR("Error unpacking mds_body\n"); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) rc = -EPROTO; + } else if (rc == -ESTALE) { + /** + * it can be allowed error after 3633 if open was committed and + * server failed before close was sent. Let's check if mod + * exists and return no error in that case + */ + if (mod) { + DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; } } - EXIT; + if (mod) { + if (rc != 0) + mod->mod_close_req = NULL; + /* Since now, mod is accessed through open_req only, + * thus close req does not keep a reference on mod anymore. */ + obd_mod_put(mod); + } *request = req; - out: - if (rc != 0 && req && req->rq_commit_cb) - req->rq_commit_cb(req); - - return rc; + mdc_close_handle_reply(req, op_data, rc); + RETURN(rc); } -int mdc_done_writing(struct obd_export *exp, struct obdo *obdo) +int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod) { + struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_DONE_WRITING, 2, size, NULL); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_DONE_WRITING); if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - mdc_pack_fid(&body->fid1, obdo->o_id, 0, obdo->o_mode); - body->size = obdo->o_size; - body->blocks = obdo->o_blocks; - body->flags = obdo->o_flags; - body->valid = obdo->o_valid; -// memcpy(&body->handle, &och->och_fh, sizeof(body->handle)); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + if (mod != NULL) { + LASSERTF(mod->mod_open_req != NULL && + mod->mod_open_req->rq_type != LI_POISON, + "POISONED setattr %p!\n", mod->mod_open_req); - ptlrpc_req_set_repsize(req, 2, size); + mod->mod_close_req = req; + DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr"); + /* We no longer want to preserve this setattr for replay even + * though the open was committed. b=3632, b=3633 */ + cfs_spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + cfs_spin_unlock(&mod->mod_open_req->rq_lock); + } + mdc_close_pack(req, op_data); + ptlrpc_request_set_replen(req); + + mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); + + if (rc == -ESTALE) { + /** + * it can be allowed error after 3633 if open or setattr were + * committed and server failed before close was sent. + * Let's check if mod exists and return no error in that case + */ + if (mod) { + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; + } + } + + if (mod) { + if (rc != 0) + mod->mod_close_req = NULL; + /* Since now, mod is accessed through setattr req only, + * thus DW req does not keep a reference on mod anymore. */ + obd_mod_put(mod); + } + + mdc_close_handle_reply(req, op_data, rc); ptlrpc_req_finished(req); RETURN(rc); } -int mdc_readpage(struct obd_export *exp, struct ll_fid *fid, __u64 offset, - struct page *page, struct ptlrpc_request **request) +#ifdef HAVE_SPLIT_SUPPORT +int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, + const struct page *page, int offset) { - struct obd_import *imp = class_exp2cliimp(exp); - struct ptlrpc_request *req = NULL; - struct ptlrpc_bulk_desc *desc = NULL; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct ptlrpc_request *req; + struct ptlrpc_bulk_desc *desc; + int rc; ENTRY; - CDEBUG(D_INODE, "inode: "LPU64"\n", fid->id); - - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_READPAGE, 2, size, - NULL); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE); if (req == NULL) - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); + + /* FIXME: capa doesn't support split yet */ + mdc_set_capa_size(req, &RMF_CAPA1, NULL); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - /* XXX FIXME bug 249 */ req->rq_request_portal = MDS_READPAGE_PORTAL; + ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); - /* NB req now owns desc and will free it when it gets freed */ - ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE); + /* NB req now owns desc and will free it when it gets freed. */ + ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); + mdc_readdir_pack(req, 0, offset, fid, NULL); + + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + + rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk); +out: + ptlrpc_req_finished(req); + return rc; +} +EXPORT_SYMBOL(mdc_sendpage); +#endif + +int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, __u64 offset, struct page **pages, + unsigned npages, struct ptlrpc_request **request) +{ + struct ptlrpc_request *req; + struct ptlrpc_bulk_desc *desc; + int i; + cfs_waitq_t waitq; + int resends = 0; + struct l_wait_info lwi; + int rc; + ENTRY; + + *request = NULL; + cfs_waitq_init(&waitq); + +restart_bulk: + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, oc); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + req->rq_request_portal = MDS_READPAGE_PORTAL; + ptlrpc_at_set_req_timeout(req); + + desc = ptlrpc_prep_bulk_imp(req, npages, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } - mdc_readdir_pack(req, REQ_REC_OFF, offset, CFS_PAGE_SIZE, fid); + /* NB req now owns desc and will free it when it gets freed */ + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE); - ptlrpc_req_set_repsize(req, 2, size); + mdc_readdir_pack(req, offset, CFS_PAGE_SIZE * npages, fid, oc); + + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); + if (rc) { + ptlrpc_req_finished(req); + if (rc != -ETIMEDOUT) + RETURN(rc); - if (rc == 0) { - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - CERROR("Can't unpack mds_body\n"); - GOTO(out, rc = -EPROTO); + resends++; + if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); } + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); + l_wait_event(waitq, 0, &lwi); - if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) { - CERROR ("Unexpected # bytes transferred: %d" - " (%lu expected)\n", - req->rq_bulk->bd_nob_transferred, - CFS_PAGE_SIZE); - GOTO (out, rc = -EPROTO); - } + goto restart_bulk; + } + + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) { + ptlrpc_req_finished(req); + RETURN(rc); + } + + if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { + CERROR("Unexpected # bytes transferred: %d (%ld expected)\n", + req->rq_bulk->bd_nob_transferred, + CFS_PAGE_SIZE * npages); + ptlrpc_req_finished(req); + RETURN(-EPROTO); } - EXIT; - out: *request = req; + RETURN(0); +} + +static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs, + __u64 max_age, __u32 flags) +{ + struct ptlrpc_request *req; + struct obd_statfs *msfs; + struct obd_import *imp = NULL; + int rc; + ENTRY; + + /* + * Since the request might also come from lprocfs, so we need + * sync this with client_disconnect_export Bug15684 + */ + cfs_down_read(&obd->u.cli.cl_sem); + if (obd->u.cli.cl_import) + imp = class_import_get(obd->u.cli.cl_import); + cfs_up_read(&obd->u.cli.cl_sem); + if (!imp) + RETURN(-ENODEV); + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS, + LUSTRE_MDS_VERSION, MDS_STATFS); + if (req == NULL) + GOTO(output, rc = -ENOMEM); + + ptlrpc_request_set_replen(req); + + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stay in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } + + rc = ptlrpc_queue_wait(req); + if (rc) { + /* check connection error first */ + if (imp->imp_connect_error) + rc = imp->imp_connect_error; + GOTO(out, rc); + } + + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) + GOTO(out, rc = -EPROTO); + + *osfs = *msfs; + EXIT; +out: + ptlrpc_req_finished(req); +output: + class_import_put(imp); + return rc; +} + +static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) +{ + __u32 keylen, vallen; + void *key; + int rc; + + if (gf->gf_pathlen > PATH_MAX) + RETURN(-ENAMETOOLONG); + if (gf->gf_pathlen < 2) + RETURN(-EOVERFLOW); + + /* Key is KEY_FID2PATH + getinfo_fid2path description */ + keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf); + OBD_ALLOC(key, keylen); + if (key == NULL) + RETURN(-ENOMEM); + memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH)); + memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf)); + + CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n", + PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno); + + if (!fid_is_sane(&gf->gf_fid)) + GOTO(out, rc = -EINVAL); + + /* Val is struct getinfo_fid2path result plus path */ + vallen = sizeof(*gf) + gf->gf_pathlen; + + rc = obd_get_info(exp, keylen, key, &vallen, gf, NULL); + if (rc) + GOTO(out, rc); + + if (vallen <= sizeof(*gf)) + GOTO(out, rc = -EPROTO); + else if (vallen > sizeof(*gf) + gf->gf_pathlen) + GOTO(out, rc = -EOVERFLOW); + + CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n%s\n", + PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path); + +out: + OBD_FREE(key, keylen); + return rc; +} + +static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags) +{ + struct kuc_hdr *lh = (struct kuc_hdr *)buf; + + LASSERT(len <= CR_MAXSIZE); + + lh->kuc_magic = KUC_MAGIC; + lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; + lh->kuc_flags = flags; + lh->kuc_msgtype = CL_RECORD; + lh->kuc_msglen = len; + return lh; +} + +#define D_CHANGELOG 0 + +struct changelog_show { + __u64 cs_startrec; + __u32 cs_flags; + cfs_file_t *cs_fp; + char *cs_buf; + struct obd_device *cs_obd; +}; + +static int changelog_show_cb(struct llog_handle *llh, struct llog_rec_hdr *hdr, + void *data) +{ + struct changelog_show *cs = data; + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct kuc_hdr *lh; + int len, rc; + ENTRY; + + if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) || + (rec->cr.cr_type >= CL_LAST)) { + CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type, + rec->cr.cr_type); + RETURN(-EINVAL); + } + + if (rec->cr.cr_index < cs->cs_startrec) { + /* Skip entries earlier than what we are interested in */ + CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", + rec->cr.cr_index, cs->cs_startrec); + RETURN(0); + } + + CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID + " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, + changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, + rec->cr.cr_flags & CLF_FLAGMASK, + PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), + rec->cr.cr_namelen, rec->cr.cr_name); + + len = sizeof(*lh) + sizeof(rec->cr) + rec->cr.cr_namelen; + + /* Set up the message */ + lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags); + memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); + + rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); + CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc); + + RETURN(rc); +} + +static int mdc_changelog_send_thread(void *csdata) +{ + struct changelog_show *cs = csdata; + struct llog_ctxt *ctxt = NULL; + struct llog_handle *llh = NULL; + struct kuc_hdr *kuch; + int rc; + + CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n", + cs->cs_fp, cs->cs_startrec); + + /* + * It's important to daemonize here to close unused FDs. + * The write fd from pipe is already opened by the caller, + * so it's fine to clear all files here + */ + cfs_daemonize("mdc_clg_send_thread"); + + OBD_ALLOC(cs->cs_buf, CR_MAXSIZE); + if (cs->cs_buf == NULL) + GOTO(out, rc = -ENOMEM); + + /* Set up the remote catalog handle */ + ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt == NULL) + GOTO(out, rc = -ENOENT); + rc = llog_create(ctxt, &llh, NULL, CHANGELOG_CATALOG); + if (rc) { + CERROR("llog_create() failed %d\n", rc); + GOTO(out, rc); + } + rc = llog_init_handle(llh, LLOG_F_IS_CAT, NULL); + if (rc) { + CERROR("llog_init_handle failed %d\n", rc); + GOTO(out, rc); + } + + rc = llog_cat_process_flags(llh, changelog_show_cb, cs, 0, 0, 0); + + /* Send EOF no matter what our result */ + if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), + cs->cs_flags))) { + kuch->kuc_msgtype = CL_EOF; + libcfs_kkuc_msg_put(cs->cs_fp, kuch); + } + +out: + cfs_put_file(cs->cs_fp); + if (llh) + llog_cat_put(llh); + if (ctxt) + llog_ctxt_put(ctxt); + if (cs->cs_buf) + OBD_FREE(cs->cs_buf, CR_MAXSIZE); + OBD_FREE_PTR(cs); + /* detach from parent process so we get cleaned up */ + cfs_daemonize("cl_send"); + return rc; +} + +static int mdc_ioc_changelog_send(struct obd_device *obd, + struct ioc_changelog *icc) +{ + struct changelog_show *cs; + int rc; + + /* Freed in mdc_changelog_send_thread */ + OBD_ALLOC_PTR(cs); + if (!cs) + return -ENOMEM; + + cs->cs_obd = obd; + cs->cs_startrec = icc->icc_recno; + /* matching cfs_put_file in mdc_changelog_send_thread */ + cs->cs_fp = cfs_get_fd(icc->icc_id); + cs->cs_flags = icc->icc_flags; + + /* New thread because we should return to user app before + writing into our pipe */ + rc = cfs_create_thread(mdc_changelog_send_thread, cs, CFS_DAEMON_FLAGS); + if (rc >= 0) { + CDEBUG(D_CHANGELOG, "start changelog thread: %d\n", rc); + return 0; + } + + CERROR("Failed to start changelog thread: %d\n", rc); + OBD_FREE_PTR(cs); return rc; } +static int mdc_ioc_hsm_ct_start(struct obd_export *exp, + struct lustre_kernelcomm *lk); static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) @@ -808,15 +1372,30 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int rc; ENTRY; -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_INC_USE_COUNT; -#else - if (!try_module_get(THIS_MODULE)) { + if (!cfs_try_module_get(THIS_MODULE)) { CERROR("Can't get module. Is it alive?"); return -EINVAL; } -#endif switch (cmd) { + case LL_IOC_HSM_CT_START: + rc = mdc_ioc_hsm_ct_start(exp, karg); + GOTO(out, rc); + case OBD_IOC_CHANGELOG_SEND: + rc = mdc_ioc_changelog_send(obd, karg); + GOTO(out, rc); + case OBD_IOC_CHANGELOG_CLEAR: { + struct ioc_changelog *icc = karg; + struct changelog_setinfo cs = + {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id}; + rc = obd_set_info_async(exp, strlen(KEY_CHANGELOG_CLEAR), + KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, + NULL); + GOTO(out, rc); + } + case OBD_IOC_FID2PATH: { + rc = mdc_ioc_fid2path(exp, karg); + GOTO(out, rc); + } case OBD_IOC_CLIENT_RECOVER: rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1); if (rc < 0) @@ -828,6 +1407,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_PARSE: { ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); + llog_ctxt_put(ctxt); GOTO(out, rc); } #ifdef __KERNEL__ @@ -835,7 +1415,7 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_LLOG_PRINT: { ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); rc = llog_ioctl(ctxt, cmd, data); - + llog_ctxt_put(ctxt); GOTO(out, rc); } #endif @@ -843,86 +1423,256 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rc = lquota_poll_check(quota_interface, exp, (struct if_quotacheck *)karg); GOTO(out, rc); + case OBD_IOC_PING_TARGET: + rc = ptlrpc_obd_ping(obd); + GOTO(out, rc); + /* + * Normally IOC_OBD_STATFS iocontrol is handled by LMV instead of MDC. + * But when the cluster is upgraded from 1.8, there'd be no LMV layer + * thus we might be called here. Eventually this code should be removed. + * bz20731. + */ + case IOC_OBD_STATFS: { + struct obd_statfs stat_buf = {0}; + + if (*((__u32 *) data->ioc_inlbuf2) != 0) + GOTO(out, rc = -ENODEV); + + /* copy UUID */ + if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), + min((int) data->ioc_plen2, + (int) sizeof(struct obd_uuid)))) + GOTO(out, rc = -EFAULT); + + rc = mdc_statfs(obd, &stat_buf, + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + 0); + if (rc != 0) + GOTO(out, rc); + + if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf, + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) + GOTO(out, rc = -EFAULT); + + GOTO(out, rc = 0); + } + case LL_IOC_GET_CONNECT_FLAGS: { + if (cfs_copy_to_user(uarg, &exp->exp_connect_flags, + sizeof(__u64))) + GOTO(out, rc = -EFAULT); + else + GOTO(out, rc = 0); + } default: CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd); GOTO(out, rc = -ENOTTY); } out: -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) - MOD_DEC_USE_COUNT; -#else - module_put(THIS_MODULE); -#endif + cfs_module_put(THIS_MODULE); return rc; } -int mdc_set_info_async(struct obd_export *exp, obd_count keylen, - void *key, obd_count vallen, void *val, - struct ptlrpc_request_set *set) +int mdc_get_info_rpc(struct obd_export *exp, + obd_count keylen, void *key, + int vallen, void *val) { - struct obd_import *imp = class_exp2cliimp(exp); - int rc = -EINVAL; + struct obd_import *imp = class_exp2cliimp(exp); + struct ptlrpc_request *req; + char *tmp; + int rc = -EINVAL; + ENTRY; - if (KEY_IS(KEY_INIT_RECOV)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - spin_lock(&imp->imp_lock); - imp->imp_initial_recov = *(int *)val; - spin_unlock(&imp->imp_lock); + req = ptlrpc_request_alloc(imp, &RQF_MDS_GET_INFO); + if (req == NULL) + RETURN(-ENOMEM); - CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n", - exp->exp_obd->obd_name, imp->imp_initial_recov); - RETURN(0); + req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY, + RCL_CLIENT, keylen); + req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN, + RCL_CLIENT, sizeof(__u32)); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); } - /* Turn off initial_recov after we try all backup servers once */ - if (KEY_IS(KEY_INIT_RECOV_BACKUP)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - spin_lock(&imp->imp_lock); - imp->imp_initial_recov_bk = *(int *)val; - if (imp->imp_initial_recov_bk) - imp->imp_initial_recov = 1; - spin_unlock(&imp->imp_lock); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY); + memcpy(tmp, key, keylen); + tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN); + memcpy(tmp, &vallen, sizeof(__u32)); - CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n", - exp->exp_obd->obd_name, imp->imp_initial_recov_bk); - RETURN(0); + req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL, + RCL_SERVER, vallen); + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc == 0) { + tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL); + memcpy(val, tmp, vallen); + if (ptlrpc_rep_need_swab(req)) { + if (KEY_IS(KEY_FID2PATH)) { + lustre_swab_fid2path(val); + } + } + } + ptlrpc_req_finished(req); + + RETURN(rc); +} + +static void lustre_swab_hai(struct hsm_action_item *h) +{ + __swab32s(&h->hai_len); + __swab32s(&h->hai_action); + lustre_swab_lu_fid(&h->hai_fid); + __swab64s(&h->hai_cookie); + __swab64s(&h->hai_extent.offset); + __swab64s(&h->hai_extent.length); + __swab64s(&h->hai_gid); +} + +static void lustre_swab_hal(struct hsm_action_list *h) +{ + struct hsm_action_item *hai; + int i; + + __swab32s(&h->hal_version); + __swab32s(&h->hal_count); + __swab32s(&h->hal_archive_num); + hai = hai_zero(h); + for (i = 0; i < h->hal_count; i++) { + lustre_swab_hai(hai); + hai = hai_next(hai); + } +} + +static void lustre_swab_kuch(struct kuc_hdr *l) +{ + __swab16s(&l->kuc_magic); + /* __u8 l->kuc_transport */ + __swab16s(&l->kuc_msgtype); + __swab16s(&l->kuc_msglen); +} + +static int mdc_ioc_hsm_ct_start(struct obd_export *exp, + struct lustre_kernelcomm *lk) +{ + int rc = 0; + + if (lk->lk_group != KUC_GRP_HSM) { + CERROR("Bad copytool group %d\n", lk->lk_group); + return -EINVAL; + } + + CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd, + lk->lk_uid, lk->lk_group, lk->lk_flags); + + if (lk->lk_flags & LK_FLG_STOP) + rc = libcfs_kkuc_group_rem(lk->lk_uid,lk->lk_group); + else { + cfs_file_t *fp = cfs_get_fd(lk->lk_wfd); + rc = libcfs_kkuc_group_add(fp, lk->lk_uid,lk->lk_group, + lk->lk_data); + if (rc && fp) + cfs_put_file(fp); } - if (KEY_IS("read-only")) { - struct ptlrpc_request *req; - int size[3] = { sizeof(struct ptlrpc_body), keylen, vallen }; - char *bufs[3] = { NULL, key, val }; + /* lk_data is archive number mask */ + /* TODO: register archive num with mdt so coordinator can choose + correct agent. */ + + return rc; +} + +/** + * Send a message to any listening copytools + * @param val KUC message (kuc_hdr + hsm_action_list) + * @param len total length of message + */ +static int mdc_hsm_copytool_send(int len, void *val) +{ + struct kuc_hdr *lh = (struct kuc_hdr *)val; + struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); + int rc; + ENTRY; + + if (len < sizeof(*lh) + sizeof(*hal)) { + CERROR("Short HSM message %d < %d\n", len, + (int) (sizeof(*lh) + sizeof(*hal))); + RETURN(-EPROTO); + } + if (lh->kuc_magic == __swab16(KUC_MAGIC)) { + lustre_swab_kuch(lh); + lustre_swab_hal(hal); + } else if (lh->kuc_magic != KUC_MAGIC) { + CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC); + RETURN(-EPROTO); + } + + CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d\n", + lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype, + lh->kuc_msglen, hal->hal_count); + + /* Broadcast to HSM listeners */ + rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); + + RETURN(rc); +} + +int mdc_set_info_async(struct obd_export *exp, + obd_count keylen, void *key, + obd_count vallen, void *val, + struct ptlrpc_request_set *set) +{ + struct obd_import *imp = class_exp2cliimp(exp); + int rc = -EINVAL; + ENTRY; + + if (KEY_IS(KEY_READ_ONLY)) { if (vallen != sizeof(int)) RETURN(-EINVAL); + cfs_spin_lock(&imp->imp_lock); if (*((int *)val)) { imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags |= - OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY; } else { imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags &= - ~OBD_CONNECT_RDONLY; - } - - req = ptlrpc_prep_req(imp, LUSTRE_MDS_VERSION, MDS_SET_INFO, - 3, size, bufs); - if (req == NULL) - RETURN(-ENOMEM); - - ptlrpc_req_set_repsize(req, 1, NULL); - if (set) { - rc = 0; - ptlrpc_set_add_req(set, req); - ptlrpc_check_set(set); - } else { - rc = ptlrpc_queue_wait(req); - ptlrpc_req_finished(req); + imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY; } + cfs_spin_unlock(&imp->imp_lock); + rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, + keylen, key, vallen, val, set); + RETURN(rc); + } + if (KEY_IS(KEY_SPTLRPC_CONF)) { + sptlrpc_conf_client_adapt(exp->exp_obd); + RETURN(0); + } + if (KEY_IS(KEY_FLUSH_CTX)) { + sptlrpc_import_flush_my_ctx(imp); + RETURN(0); + } + if (KEY_IS(KEY_MDS_CONN)) { + /* mds-mds import */ + cfs_spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + cfs_spin_unlock(&imp->imp_lock); + imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; + CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name); + RETURN(0); + } + if (KEY_IS(KEY_CHANGELOG_CLEAR)) { + rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, + keylen, key, vallen, val, set); + RETURN(rc); + } + if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) { + rc = mdc_hsm_copytool_send(vallen, val); RETURN(rc); } @@ -930,12 +1680,11 @@ int mdc_set_info_async(struct obd_export *exp, obd_count keylen, } int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, - __u32 *vallen, void *val) + __u32 *vallen, void *val, struct lov_stripe_md *lsm) { int rc = -EINVAL; - if (keylen == strlen("max_easize") && - memcmp(key, "max_easize", strlen("max_easize")) == 0) { + if (KEY_IS(KEY_MAX_EASIZE)) { int mdsize, *max_easize; if (*vallen != sizeof(int)) @@ -947,155 +1696,138 @@ int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; RETURN(0); } - RETURN(rc); -} - -static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age) -{ - struct ptlrpc_request *req; - struct obd_statfs *msfs; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*msfs) }; - ENTRY; - - /* We could possibly pass max_age in the request (as an absolute - * timestamp or a "seconds.usec ago") so the target can avoid doing - * extra calls into the filesystem if that isn't necessary (e.g. - * during mount that would help a bit). Having relative timestamps - * is not so great if request processing is slow, while absolute - * timestamps are not ideal because they need time synchronization. */ - req = ptlrpc_prep_req(obd->u.cli.cl_import, LUSTRE_MDS_VERSION, - MDS_STATFS, 1, NULL, NULL); - if (!req) - RETURN(-ENOMEM); - - ptlrpc_req_set_repsize(req, 2, size); - - rc = ptlrpc_queue_wait(req); + if (KEY_IS(KEY_CONN_DATA)) { + struct obd_import *imp = class_exp2cliimp(exp); + struct obd_connect_data *data = val; - if (rc) - GOTO(out, rc); + if (*vallen != sizeof(*data)) + RETURN(-EINVAL); - msfs = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*msfs), - lustre_swab_obd_statfs); - if (msfs == NULL) { - CERROR("Can't unpack obd_statfs\n"); - GOTO(out, rc = -EPROTO); + *data = imp->imp_connect_data; + RETURN(0); } - memcpy(osfs, msfs, sizeof(*msfs)); - EXIT; -out: - ptlrpc_req_finished(req); + rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val); - return rc; + RETURN(rc); } -static int mdc_pin(struct obd_export *exp, obd_id ino, __u32 gen, int type, - struct obd_client_handle *handle, int flag) +static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct obd_client_handle *handle, + int flags) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct mdt_body *body; + int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_PIN, 2, size, NULL); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_PIN); if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - mdc_pack_fid(&body->fid1, ino, gen, type); - body->flags = flag; + mdc_set_capa_size(req, &RMF_CAPA1, oc); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_PIN); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, fid, oc, 0, 0, -1, flags); - ptlrpc_req_set_repsize(req, 2, size); + ptlrpc_request_set_replen(req); mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc) { - CERROR("pin failed: %d\n", rc); - ptlrpc_req_finished(req); - RETURN(rc); + CERROR("Pin failed: %d\n", rc); + GOTO(err_out, rc); } - body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), - lustre_swab_mds_body); - if (body == NULL) { - ptlrpc_req_finished(req); - RETURN(rc); - } + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(err_out, rc = -EPROTO); - memcpy(&handle->och_fh, &body->handle, sizeof(body->handle)); + handle->och_fh = body->handle; handle->och_magic = OBD_CLIENT_HANDLE_MAGIC; - OBD_ALLOC(handle->och_mod, sizeof(*handle->och_mod)); + handle->och_mod = obd_mod_alloc(); if (handle->och_mod == NULL) { - DEBUG_REQ(D_ERROR, req, "can't allocate mdc_open_data"); - RETURN(-ENOMEM); + DEBUG_REQ(D_ERROR, req, "can't allocate md_open_data"); + GOTO(err_out, rc = -ENOMEM); } handle->och_mod->mod_open_req = req; /* will be dropped by unpin */ + RETURN(0); + +err_out: + ptlrpc_req_finished(req); RETURN(rc); } -static int mdc_unpin(struct obd_export *exp, - struct obd_client_handle *handle, int flag) +static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle, + int flag) { struct ptlrpc_request *req; - struct mds_body *body; - int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; + struct mdt_body *body; + int rc; ENTRY; - if (handle->och_magic != OBD_CLIENT_HANDLE_MAGIC) - RETURN(0); - - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_CLOSE, 2, size, NULL); + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_UNPIN, + LUSTRE_MDS_VERSION, MDS_UNPIN); if (req == NULL) RETURN(-ENOMEM); - body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); - memcpy(&body->handle, &handle->och_fh, sizeof(body->handle)); + body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY); + body->handle = handle->och_fh; body->flags = flag; - ptlrpc_req_set_repsize(req, 1, NULL); + ptlrpc_request_set_replen(req); + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); rc = ptlrpc_queue_wait(req); mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); if (rc != 0) - CERROR("unpin failed: %d\n", rc); + CERROR("Unpin failed: %d\n", rc); ptlrpc_req_finished(req); ptlrpc_req_finished(handle->och_mod->mod_open_req); - OBD_FREE(handle->och_mod, sizeof(*handle->och_mod)); + + obd_mod_put(handle->och_mod); RETURN(rc); } -int mdc_sync(struct obd_export *exp, struct ll_fid *fid, - struct ptlrpc_request **request) +int mdc_sync(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct ptlrpc_request **request) { struct ptlrpc_request *req; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; - int rc; + int rc; ENTRY; - req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, - MDS_SYNC, 2, size, NULL); - if (!req) - RETURN(rc = -ENOMEM); + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SYNC); + if (req == NULL) + RETURN(-ENOMEM); - mdc_pack_req_body(req, REQ_REC_OFF, 0, fid, 0, 0); + mdc_set_capa_size(req, &RMF_CAPA1, oc); - ptlrpc_req_set_repsize(req, 2, size); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SYNC); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, fid, oc, 0, 0, -1, 0); + + ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); - if (rc || request == NULL) + if (rc) ptlrpc_req_finished(req); else *request = req; - RETURN(rc); } @@ -1108,9 +1840,21 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, switch (event) { case IMP_EVENT_DISCON: { +#if 0 + /* XXX Pass event up to OBDs stack. used only for FLD now */ + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL); +#endif break; } case IMP_EVENT_INACTIVE: { + struct client_obd *cli = &obd->u.cli; + /* + * Flush current sequence to make client obtain new one + * from server in case of disconnect/reconnect. + */ + if (cli->cl_seq != NULL) + seq_client_flush(cli->cl_seq); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); break; } @@ -1126,8 +1870,11 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, break; } case IMP_EVENT_OCD: + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); + break; + case IMP_EVENT_DEACTIVATE: + case IMP_EVENT_ACTIVATE: break; - default: CERROR("Unknown import event %x\n", event); LBUG(); @@ -1135,10 +1882,100 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, RETURN(rc); } -static int mdc_setup(struct obd_device *obd, obd_count len, void *buf) +static int mdc_fid_init(struct obd_export *exp) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + char *prefix; + int rc; + ENTRY; + + OBD_ALLOC_PTR(cli->cl_seq); + if (cli->cl_seq == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(prefix, MAX_OBD_NAME + 5); + if (prefix == NULL) + GOTO(out_free_seq, rc = -ENOMEM); + + snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", + exp->exp_obd->obd_name); + + /* Init client side sequence-manager */ + rc = seq_client_init(cli->cl_seq, exp, + LUSTRE_SEQ_METADATA, + prefix, NULL); + OBD_FREE(prefix, MAX_OBD_NAME + 5); + if (rc) + GOTO(out_free_seq, rc); + + RETURN(rc); +out_free_seq: + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + return rc; +} + +static int mdc_fid_fini(struct obd_export *exp) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + ENTRY; + + if (cli->cl_seq != NULL) { + seq_client_fini(cli->cl_seq); + OBD_FREE_PTR(cli->cl_seq); + cli->cl_seq = NULL; + } + + RETURN(0); +} + +int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, + struct md_op_data *op_data) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + ENTRY; + RETURN(seq_client_alloc_fid(seq, fid)); +} + +/* XXX This method is used only to clear current fid seq + * once fld/mds insert failed */ +static int mdc_fid_delete(struct obd_export *exp, const struct lu_fid *fid) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + + seq_client_flush(cli->cl_seq); + return 0; +} + +struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { + struct client_obd *cli = &exp->exp_obd->u.cli; + return &cli->cl_target_uuid; +} + +/** + * Determine whether the lock can be canceled before replaying it during + * recovery, non zero value will be return if the lock can be canceled, + * or zero returned for not + */ +static int mdc_cancel_for_recovery(struct ldlm_lock *lock) +{ + if (lock->l_resource->lr_type != LDLM_IBITS) + RETURN(0); + + /* FIXME: if we ever get into a situation where there are too many + * opened files with open locks on a single node, then we really + * should replay these open locks to reget it */ + if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) + RETURN(0); + + RETURN(1); +} + +static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { struct client_obd *cli = &obd->u.cli; - struct lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; int rc; ENTRY; @@ -1159,13 +1996,17 @@ static int mdc_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_setattr_lock, rc = -ENOMEM); mdc_init_rpc_lock(cli->cl_close_lock); - rc = client_obd_setup(obd, len, buf); + rc = client_obd_setup(obd, cfg); if (rc) GOTO(err_close_lock, rc); - lprocfs_init_vars(mdc, &lvars); + lprocfs_mdc_init_vars(&lvars); lprocfs_obd_setup(obd, lvars.obd_vars); + sptlrpc_lprocfs_cliobd_attach(obd); + ptlrpc_lprocfs_register_obd(obd); + + ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery); - rc = obd_llog_init(obd, obd, 0, NULL, NULL); + rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL); if (rc) { mdc_cleanup(obd); CERROR("failed to setup llogging subsystems\n"); @@ -1187,42 +2028,22 @@ err_rpc_lock: * us to make MDS RPCs with large enough reply buffers to hold the * maximum-sized (= maximum striped) EA and cookie without having to * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */ -int mdc_init_ea_size(struct obd_export *mdc_exp, struct obd_export *lov_exp) +static int mdc_init_ea_size(struct obd_export *exp, int easize, + int def_easize, int cookiesize) { - struct obd_device *obd = mdc_exp->exp_obd; + struct obd_device *obd = exp->exp_obd; struct client_obd *cli = &obd->u.cli; - struct lov_stripe_md lsm = { .lsm_magic = LOV_MAGIC }; - struct lov_desc desc; - __u32 valsize = sizeof(desc); - __u32 stripes; - int rc, size; ENTRY; - rc = obd_get_info(lov_exp, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC, - &valsize, &desc); - if (rc) - RETURN(rc); - - stripes = min(desc.ld_tgt_count, (__u32)LOV_MAX_STRIPE_COUNT); - lsm.lsm_stripe_count = stripes; - size = obd_size_diskmd(lov_exp, &lsm); - - if (cli->cl_max_mds_easize < size) - cli->cl_max_mds_easize = size; - - lsm.lsm_stripe_count = desc.ld_default_stripe_count; - size = obd_size_diskmd(lov_exp, &lsm); + if (cli->cl_max_mds_easize < easize) + cli->cl_max_mds_easize = easize; - if (cli->cl_default_mds_easize < size) - cli->cl_default_mds_easize = size; + if (cli->cl_default_mds_easize < def_easize) + cli->cl_default_mds_easize = def_easize; - size = stripes * sizeof(struct llog_cookie); - if (cli->cl_max_mds_cookiesize < size) - cli->cl_max_mds_cookiesize = size; + if (cli->cl_max_mds_cookiesize < cookiesize) + cli->cl_max_mds_cookiesize = cookiesize; - CDEBUG(D_HA, "updating max_mdsize/max_cookiesize: %d/%d\n", - cli->cl_max_mds_easize, cli->cl_max_mds_cookiesize); - RETURN(0); } @@ -1232,25 +2053,18 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) ENTRY; switch (stage) { - case OBD_CLEANUP_EARLY: - case OBD_CLEANUP_EXPORTS: - /* If we set up but never connected, the - client import will not have been cleaned. */ - if (obd->u.cli.cl_import) { - struct obd_import *imp; - imp = obd->u.cli.cl_import; - CERROR("client import never connected\n"); - ptlrpc_invalidate_import(imp); - ptlrpc_free_rq_pool(imp->imp_rq_pool); - class_destroy_import(imp); - obd->u.cli.cl_import = NULL; - } + case OBD_CLEANUP_EARLY: break; - case OBD_CLEANUP_SELF_EXP: + case OBD_CLEANUP_EXPORTS: + /* Failsafe, ok if racy */ + if (obd->obd_type->typ_refcnt <= 1) + libcfs_kkuc_group_rem(0, KUC_GRP_HSM); + + obd_cleanup_client_import(obd); + rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); - case OBD_CLEANUP_OBD: break; } RETURN(rc); @@ -1264,6 +2078,7 @@ static int mdc_cleanup(struct obd_device *obd) OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); + ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); ptlrpcd_decref(); @@ -1271,26 +2086,30 @@ static int mdc_cleanup(struct obd_device *obd) } -static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt, - int count, struct llog_catid *logid, - struct obd_uuid *uuid) +static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, + struct obd_device *tgt, int *index) { struct llog_ctxt *ctxt; int rc; ENTRY; - rc = llog_setup(obd, LLOG_CONFIG_REPL_CTXT, tgt, 0, NULL, + LASSERT(olg == &obd->obd_olg); + + rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL, &llog_client_ops); - if (rc == 0) { - ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); - ctxt->loc_imp = obd->u.cli.cl_import; - } + if (rc) + RETURN(rc); + + ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); + llog_initiator_connect(ctxt); + llog_ctxt_put(ctxt); - rc = llog_setup(obd, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); + rc = llog_setup(obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, 0, NULL, + &llog_client_ops); if (rc == 0) { - ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); - ctxt->loc_imp = obd->u.cli.cl_import; + ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); + llog_initiator_connect(ctxt); + llog_ctxt_put(ctxt); } RETURN(rc); @@ -1298,61 +2117,228 @@ static int mdc_llog_init(struct obd_device *obd, struct obd_device *tgt, static int mdc_llog_finish(struct obd_device *obd, int count) { - int rc; + struct llog_ctxt *ctxt; + int rc = 0; ENTRY; - rc = llog_cleanup(llog_get_context(obd, LLOG_LOVEA_REPL_CTXT)); - if (rc) { - CERROR("can not cleanup LLOG_CONFIG_REPL_CTXT rc %d\n", rc); - } - rc = llog_cleanup(llog_get_context(obd, LLOG_CONFIG_REPL_CTXT)); + ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); + if (ctxt) + rc = llog_cleanup(ctxt); + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt) + rc = llog_cleanup(ctxt); + RETURN(rc); } static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) { struct lustre_cfg *lcfg = buf; - struct lprocfs_static_vars lvars; + struct lprocfs_static_vars lvars = { 0 }; int rc = 0; - lprocfs_init_vars(mdc, &lvars); - - rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, lcfg, obd); + lprocfs_mdc_init_vars(&lvars); + switch (lcfg->lcfg_command) { + default: + rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, + lcfg, obd); + if (rc > 0) + rc = 0; + break; + } return(rc); } + +/* get remote permission for current user on fid */ +int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, __u32 suppgid, + struct ptlrpc_request **request) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + LASSERT(client_is_remote(exp)); + + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, oc); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, fid, oc, OBD_MD_FLRMTPERM, 0, suppgid, 0); + + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, + sizeof(struct mdt_remote_perm)); + + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc) + ptlrpc_req_finished(req); + else + *request = req; + RETURN(rc); +} + +static int mdc_interpret_renew_capa(const struct lu_env *env, + struct ptlrpc_request *req, void *args, + int status) +{ + struct mdc_renew_capa_args *ra = args; + struct mdt_body *body = NULL; + struct lustre_capa *capa; + ENTRY; + + if (status) + GOTO(out, capa = ERR_PTR(status)); + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, capa = ERR_PTR(-EFAULT)); + + if ((body->valid & OBD_MD_FLOSSCAPA) == 0) + GOTO(out, capa = ERR_PTR(-ENOENT)); + + capa = req_capsule_server_get(&req->rq_pill, &RMF_CAPA2); + if (!capa) + GOTO(out, capa = ERR_PTR(-EFAULT)); + EXIT; +out: + ra->ra_cb(ra->ra_oc, capa); + return 0; +} + +static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, + renew_capa_cb_t cb) +{ + struct ptlrpc_request *req; + struct mdc_renew_capa_args *ra; + ENTRY; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR, + LUSTRE_MDS_VERSION, MDS_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + /* NB, OBD_MD_FLOSSCAPA is set here, but it doesn't necessarily mean the + * capa to renew is oss capa. + */ + mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0); + ptlrpc_request_set_replen(req); + + CLASSERT(sizeof(*ra) <= sizeof(req->rq_async_args)); + ra = ptlrpc_req_async_args(req); + ra->ra_oc = oc; + ra->ra_cb = cb; + req->rq_interpret_reply = mdc_interpret_renew_capa; + ptlrpcd_add_req(req, PSCOPE_OTHER); + RETURN(0); +} + +static int mdc_connect(const struct lu_env *env, + struct obd_export **exp, + struct obd_device *obd, struct obd_uuid *cluuid, + struct obd_connect_data *data, + void *localdata) +{ + struct obd_import *imp = obd->u.cli.cl_import; + + /* mds-mds import features */ + if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) { + cfs_spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + cfs_spin_unlock(&imp->imp_lock); + imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; + CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n", + obd->obd_name); + } + + return client_connect_import(env, exp, obd, cluuid, data, NULL); +} + struct obd_ops mdc_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = mdc_setup, - .o_precleanup = mdc_precleanup, - .o_cleanup = mdc_cleanup, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, - .o_disconnect = client_disconnect_export, - .o_iocontrol = mdc_iocontrol, - .o_set_info_async = mdc_set_info_async, - .o_get_info = mdc_get_info, - .o_statfs = mdc_statfs, - .o_pin = mdc_pin, - .o_unpin = mdc_unpin, - .o_import_event = mdc_import_event, - .o_llog_init = mdc_llog_init, - .o_llog_finish = mdc_llog_finish, - .o_process_config = mdc_process_config, + .o_owner = THIS_MODULE, + .o_setup = mdc_setup, + .o_precleanup = mdc_precleanup, + .o_cleanup = mdc_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, + .o_connect = mdc_connect, + .o_disconnect = client_disconnect_export, + .o_iocontrol = mdc_iocontrol, + .o_set_info_async = mdc_set_info_async, + .o_statfs = mdc_statfs, + .o_pin = mdc_pin, + .o_unpin = mdc_unpin, + .o_fid_init = mdc_fid_init, + .o_fid_fini = mdc_fid_fini, + .o_fid_alloc = mdc_fid_alloc, + .o_fid_delete = mdc_fid_delete, + .o_import_event = mdc_import_event, + .o_llog_init = mdc_llog_init, + .o_llog_finish = mdc_llog_finish, + .o_get_info = mdc_get_info, + .o_process_config = mdc_process_config, + .o_get_uuid = mdc_get_uuid, +}; + +struct md_ops mdc_md_ops = { + .m_getstatus = mdc_getstatus, + .m_change_cbdata = mdc_change_cbdata, + .m_find_cbdata = mdc_find_cbdata, + .m_close = mdc_close, + .m_create = mdc_create, + .m_done_writing = mdc_done_writing, + .m_enqueue = mdc_enqueue, + .m_getattr = mdc_getattr, + .m_getattr_name = mdc_getattr_name, + .m_intent_lock = mdc_intent_lock, + .m_link = mdc_link, + .m_is_subdir = mdc_is_subdir, + .m_rename = mdc_rename, + .m_setattr = mdc_setattr, + .m_setxattr = mdc_setxattr, + .m_getxattr = mdc_getxattr, + .m_sync = mdc_sync, + .m_readpage = mdc_readpage, + .m_unlink = mdc_unlink, + .m_cancel_unused = mdc_cancel_unused, + .m_init_ea_size = mdc_init_ea_size, + .m_set_lock_data = mdc_set_lock_data, + .m_lock_match = mdc_lock_match, + .m_get_lustre_md = mdc_get_lustre_md, + .m_free_lustre_md = mdc_free_lustre_md, + .m_set_open_replay_data = mdc_set_open_replay_data, + .m_clear_open_replay_data = mdc_clear_open_replay_data, + .m_renew_capa = mdc_renew_capa, + .m_unpack_capa = mdc_unpack_capa, + .m_get_remote_perm = mdc_get_remote_perm, + .m_intent_getattr_async = mdc_intent_getattr_async, + .m_revalidate_lock = mdc_revalidate_lock }; int __init mdc_init(void) { int rc; - struct lprocfs_static_vars lvars; - lprocfs_init_vars(mdc, &lvars); - request_module("lquota"); + struct lprocfs_static_vars lvars = { 0 }; + lprocfs_mdc_init_vars(&lvars); + + cfs_request_module("lquota"); quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface); init_obd_quota_ops(quota_interface, &mdc_obd_ops); - rc = class_register_type(&mdc_obd_ops, lvars.module_vars, - LUSTRE_MDC_NAME); + rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars, + LUSTRE_MDC_NAME, NULL); if (rc && quota_interface) PORTAL_SYMBOL_PUT(mdc_quota_interface); @@ -1368,32 +2354,10 @@ static void /*__exit*/ mdc_exit(void) class_unregister_type(LUSTRE_MDC_NAME); } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Client"); MODULE_LICENSE("GPL"); -EXPORT_SYMBOL(mdc_req2lustre_md); -EXPORT_SYMBOL(mdc_free_lustre_md); -EXPORT_SYMBOL(mdc_change_cbdata); -EXPORT_SYMBOL(mdc_getstatus); -EXPORT_SYMBOL(mdc_getattr); -EXPORT_SYMBOL(mdc_getattr_name); -EXPORT_SYMBOL(mdc_create); -EXPORT_SYMBOL(mdc_unlink); -EXPORT_SYMBOL(mdc_rename); -EXPORT_SYMBOL(mdc_link); -EXPORT_SYMBOL(mdc_readpage); -EXPORT_SYMBOL(mdc_setattr); -EXPORT_SYMBOL(mdc_close); -EXPORT_SYMBOL(mdc_done_writing); -EXPORT_SYMBOL(mdc_sync); -EXPORT_SYMBOL(mdc_set_open_replay_data); -EXPORT_SYMBOL(mdc_clear_open_replay_data); -EXPORT_SYMBOL(mdc_store_inode_generation); -EXPORT_SYMBOL(mdc_init_ea_size); -EXPORT_SYMBOL(mdc_getxattr); -EXPORT_SYMBOL(mdc_setxattr); - module_init(mdc_init); module_exit(mdc_exit); #endif