X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=daf1e2192c5f5e91e368644f8f7f2e28dd825cf5;hp=f378eb8393a1727181d85238075e745f81dc5ef9;hb=4031b885c1dd6b9f8af724b1b4ddf9f638b2b834;hpb=353ef58b1d2394c4721a340e2463b07d4069d99c diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index f378eb8..daf1e21 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -15,11 +15,7 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ @@ -27,7 +23,7 @@ * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2014, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -36,27 +32,31 @@ #define DEBUG_SUBSYSTEM S_MDC -#include -#include -#include #include -#include #include +#include +#include #include +#include #ifdef HAVE_UIDGID_HEADER # include #endif +#include + +#include +#include +#include #include -#include -#include -#include #include -#include -#include -#include +#include #include -#include +#include +#include +#include +#include +#include +#include #include "mdc_internal.h" @@ -82,7 +82,22 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req) return rc; } -static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid) +/* + * Send MDS_GET_ROOT RPC to fetch root FID. + * + * If \a fileset is not NULL it should contain a subdirectory off + * the ROOT/ directory to be mounted on the client. Return the FID + * of the subdirectory to the client to mount onto its mountpoint. + * + * \param[in] imp MDC import + * \param[in] fileset fileset name, which could be NULL + * \param[out] rootfid root FID of this mountpoint + * \param[out] pc root capa will be unpacked and saved in this pointer + * + * \retval 0 on success, negative errno on failure + */ +static int mdc_get_root(struct obd_export *exp, const char *fileset, + struct lu_fid *rootfid) { struct ptlrpc_request *req; struct mdt_body *body; @@ -90,13 +105,29 @@ static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid) ENTRY; - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), - &RQF_MDS_GETSTATUS, - LUSTRE_MDS_VERSION, MDS_GETSTATUS); + if (fileset && !(exp_connect_flags(exp) & OBD_CONNECT_SUBTREE)) + RETURN(-ENOTSUPP); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_GET_ROOT); if (req == NULL) RETURN(-ENOMEM); + if (fileset != NULL) + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + strlen(fileset) + 1); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_ROOT); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } mdc_pack_body(req, NULL, 0, 0, -1, 0); + if (fileset != NULL) { + char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME); + + memcpy(name, fileset, strlen(fileset)); + } + lustre_msg_add_flags(req->rq_reqmsg, LUSTRE_IMP_FULL); req->rq_send_state = LUSTRE_IMP_FULL; ptlrpc_request_set_replen(req); @@ -110,7 +141,7 @@ static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid) GOTO(out, rc = -EPROTO); *rootfid = body->mbo_fid1; - CDEBUG(D_NET, "root fid="DFID", last_committed="LPU64"\n", + CDEBUG(D_NET, "root fid="DFID", last_committed=%llu\n", PFID(rootfid), lustre_msg_get_last_committed(req->rq_repmsg)); EXIT; out: @@ -158,33 +189,37 @@ static int mdc_getattr_common(struct obd_export *exp, RETURN(-EPROTO); } - if (body->mbo_valid & OBD_MD_FLRMTPERM) { - struct mdt_remote_perm *perm; - - LASSERT(client_is_remote(exp)); - perm = req_capsule_server_swab_get(pill, &RMF_ACL, - lustre_swab_mdt_remote_perm); - if (perm == NULL) - RETURN(-EPROTO); - } - RETURN(0); } +static void mdc_reset_acl_req(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_early_free_lock); + sptlrpc_cli_free_repbuf(req); + req->rq_repbuf = NULL; + req->rq_repbuf_len = 0; + req->rq_repdata = NULL; + req->rq_reqdata_len = 0; + spin_unlock(&req->rq_early_free_lock); +} + static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct ptlrpc_request *req; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + int rc; + ENTRY; /* Single MDS without an LMV case */ if (op_data->op_flags & MF_GET_MDT_IDX) { op_data->op_mds = 0; RETURN(0); } - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); + + *request = NULL; + req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR); if (req == NULL) RETURN(-ENOMEM); @@ -194,36 +229,42 @@ static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } +again: mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, op_data->op_mode, -1, 0); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + ptlrpc_request_set_replen(req); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - op_data->op_mode); - if (op_data->op_valid & OBD_MD_FLRMTPERM) { - LASSERT(client_is_remote(exp)); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); - } - ptlrpc_request_set_replen(req); + rc = mdc_getattr_common(exp, req); + if (rc) { + if (rc == -ERANGE && + acl_bufsize != imp->imp_connect_data.ocd_max_easize) { + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + mdc_reset_acl_req(req); + goto again; + } - rc = mdc_getattr_common(exp, req); - if (rc) - ptlrpc_req_finished(req); - else - *request = req; - RETURN(rc); + ptlrpc_req_finished(req); + } else { + *request = req; + } + + RETURN(rc); } static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct ptlrpc_request *req; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + int rc; + ENTRY; - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_MDS_GETATTR_NAME); + *request = NULL; + req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR_NAME); if (req == NULL) RETURN(-ENOMEM); @@ -236,9 +277,6 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } - mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, - op_data->op_mode, op_data->op_suppgids[0], 0); - if (op_data->op_name) { char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME); LASSERT(strnlen(op_data->op_name, op_data->op_namelen) == @@ -246,16 +284,29 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, memcpy(name, op_data->op_name, op_data->op_namelen); } - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - op_data->op_mode); - ptlrpc_request_set_replen(req); +again: + mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, + op_data->op_mode, op_data->op_suppgids[0], 0); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + ptlrpc_request_set_replen(req); - rc = mdc_getattr_common(exp, req); - if (rc) - ptlrpc_req_finished(req); - else - *request = req; - RETURN(rc); + rc = mdc_getattr_common(exp, req); + if (rc) { + if (rc == -ERANGE && + acl_bufsize != imp->imp_connect_data.ocd_max_easize) { + acl_bufsize = imp->imp_connect_data.ocd_max_easize; + mdc_reset_acl_req(req); + goto again; + } + + ptlrpc_req_finished(req); + } else { + *request = req; + } + + RETURN(rc); } static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, @@ -314,11 +365,11 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, } } - if (opcode == MDS_REINT) { - struct mdt_rec_setxattr *rec; + if (opcode == MDS_REINT) { + struct mdt_rec_setxattr *rec; - CLASSERT(sizeof(struct mdt_rec_setxattr) == - sizeof(struct mdt_rec_reint)); + CLASSERT(sizeof(struct mdt_rec_setxattr) == + sizeof(struct mdt_rec_reint)); rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); rec->sx_opcode = REINT_SETXATTR; rec->sx_fsuid = from_kuid(&init_user_ns, current_fsuid()); @@ -328,7 +379,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, rec->sx_suppgid2 = -1; rec->sx_fid = *fid; rec->sx_valid = valid | OBD_MD_FLCTIME; - rec->sx_time = cfs_time_current_sec(); + rec->sx_time = ktime_get_real_seconds(); rec->sx_size = output_size; rec->sx_flags = flags; } else { @@ -351,12 +402,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - if (opcode == MDS_REINT) - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + if (opcode == MDS_REINT) + mdc_put_mod_rpc_slot(req, NULL); if (rc) ptlrpc_req_finished(req); @@ -366,26 +417,30 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, } static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, - u64 valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, __u32 suppgid, - struct ptlrpc_request **request) + u64 obd_md_valid, const char *name, + const void *value, size_t value_size, + unsigned int xattr_flags, u32 suppgid, + struct ptlrpc_request **req) { + LASSERT(obd_md_valid == OBD_MD_FLXATTR || + obd_md_valid == OBD_MD_FLXATTRRM); + return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR, - fid, MDS_REINT, valid, xattr_name, - input, input_size, output_size, flags, - suppgid, request); + fid, MDS_REINT, obd_md_valid, name, + value, value_size, 0, xattr_flags, suppgid, + req); } static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, - u64 valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, struct ptlrpc_request **request) + u64 obd_md_valid, const char *name, size_t buf_size, + struct ptlrpc_request **req) { - return mdc_xattr_common(exp, &RQF_MDS_GETXATTR, - fid, MDS_GETXATTR, valid, xattr_name, - input, input_size, output_size, flags, - -1, request); + LASSERT(obd_md_valid == OBD_MD_FLXATTR || + obd_md_valid == OBD_MD_FLXATTRLS); + + return mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, MDS_GETXATTR, + obd_md_valid, name, NULL, 0, buf_size, 0, -1, + req); } #ifdef CONFIG_FS_POSIX_ACL @@ -415,7 +470,7 @@ static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) RETURN(rc); } - rc = posix_acl_valid(acl); + rc = posix_acl_valid(&init_user_ns, acl); if (rc) { CERROR("validate acl: %d\n", rc); posix_acl_release(acl); @@ -463,8 +518,8 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, if (md->layout.lb_buf == NULL) GOTO(out, rc = -EPROTO); } else if (md->body->mbo_valid & OBD_MD_FLDIREA) { - int lmvsize; - struct lov_mds_md *lmv; + const union lmv_mds_md *lmv; + size_t lmv_size; if (!S_ISDIR(md->body->mbo_mode)) { CDEBUG(D_INFO, "OBD_MD_FLDIREA set, should be a " @@ -472,21 +527,20 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, GOTO(out, rc = -EPROTO); } - if (md->body->mbo_eadatasize == 0) { + lmv_size = md->body->mbo_eadatasize; + if (lmv_size == 0) { CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, " "but eadatasize 0\n"); RETURN(-EPROTO); } if (md->body->mbo_valid & OBD_MD_MEA) { - lmvsize = md->body->mbo_eadatasize; lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, - lmvsize); - if (!lmv) + lmv_size); + if (lmv == NULL) GOTO(out, rc = -EPROTO); - rc = obd_unpackmd(md_exp, (void *)&md->lmv, lmv, - lmvsize); + rc = md_unpackmd(md_exp, &md->lmv, lmv, lmv_size); if (rc < 0) GOTO(out, rc); @@ -500,14 +554,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, } rc = 0; - if (md->body->mbo_valid & OBD_MD_FLRMTPERM) { - /* remote permission */ - LASSERT(client_is_remote(exp)); - md->remote_perm = req_capsule_server_swab_get(pill, &RMF_ACL, - lustre_swab_mdt_remote_perm); - if (!md->remote_perm) - GOTO(out, rc = -EPROTO); - } else if (md->body->mbo_valid & OBD_MD_FLACL) { + if (md->body->mbo_valid & OBD_MD_FLACL) { /* for ACL, it's possible that FLACL is set but aclsize is zero. * only when aclsize != 0 there's an actual segment for ACL * in reply buffer. @@ -558,29 +605,37 @@ void mdc_replay_open(struct ptlrpc_request *req) body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); - och = mod->mod_och; - if (och != NULL) { - struct lustre_handle *file_fh; + spin_lock(&req->rq_lock); + och = mod->mod_och; + if (och && och->och_fh.cookie) + req->rq_early_free_repbuf = 1; + else + req->rq_early_free_repbuf = 0; + spin_unlock(&req->rq_lock); + + if (req->rq_early_free_repbuf) { + struct lustre_handle *file_fh; - LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); + LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); - file_fh = &och->och_fh; - CDEBUG(D_HA, "updating handle from "LPX64" to "LPX64"\n", + file_fh = &och->och_fh; + CDEBUG(D_HA, "updating handle from %#llx to %#llx\n", file_fh->cookie, body->mbo_handle.cookie); old = *file_fh; *file_fh = body->mbo_handle; - } - close_req = mod->mod_close_req; - if (close_req != NULL) { - __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); - struct mdt_ioepoch *epoch; + } + + close_req = mod->mod_close_req; + if (close_req) { + __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); + struct mdt_ioepoch *epoch; LASSERT(opc == MDS_CLOSE); epoch = req_capsule_client_get(&close_req->rq_pill, &RMF_MDT_EPOCH); LASSERT(epoch); - if (och != NULL) + if (req->rq_early_free_repbuf) LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old))); DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); @@ -624,7 +679,7 @@ int mdc_set_open_replay_data(struct obd_export *exp, struct md_open_data *mod; struct mdt_rec_create *rec; struct mdt_body *body; - struct ptlrpc_request *open_req = it->d.lustre.it_data; + struct ptlrpc_request *open_req = it->it_request; struct obd_import *imp = open_req->rq_import; ENTRY; @@ -664,11 +719,11 @@ int mdc_set_open_replay_data(struct obd_export *exp, mod->mod_open_req = open_req; open_req->rq_cb_data = mod; open_req->rq_commit_cb = mdc_commit_open; + open_req->rq_early_free_repbuf = 1; spin_unlock(&open_req->rq_lock); } rec->cr_fid2 = body->mbo_fid1; - rec->cr_ioepoch = body->mbo_ioepoch; rec->cr_old_handle.cookie = body->mbo_handle.cookie; open_req->rq_replay_cb = mdc_replay_open; if (!fid_is_sane(&body->mbo_fid1)) { @@ -689,9 +744,15 @@ static void mdc_free_open(struct md_open_data *mod) imp_connect_disp_stripe(mod->mod_open_req->rq_import)) committed = 1; - LASSERT(mod->mod_open_req->rq_replay == 0); + /** + * No reason to asssert here if the open request has + * rq_replay == 1. It means that mdc_close failed, and + * close request wasn`t sent. It is not fatal to client. + * The worst thing is eviction if the client gets open lock + **/ - DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request\n"); + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request rq_replay" + "= %d\n", mod->mod_open_req->rq_replay); ptlrpc_request_committed(mod->mod_open_req, committed); if (mod->mod_close_req) @@ -711,8 +772,14 @@ int mdc_clear_open_replay_data(struct obd_export *exp, if (mod == NULL) RETURN(0); - LASSERT(mod != LP_POISON); + LASSERT(mod != LP_POISON); LASSERT(mod->mod_open_req != NULL); + + spin_lock(&mod->mod_open_req->rq_lock); + if (mod->mod_och) + mod->mod_och->och_fh.cookie = 0; + mod->mod_open_req->rq_early_free_repbuf = 0; + spin_unlock(&mod->mod_open_req->rq_lock); mdc_free_open(mod); mod->mod_och = NULL; @@ -728,37 +795,82 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; struct req_format *req_fmt; + size_t u32_count = 0; int rc; int saved_rc = 0; ENTRY; - if (op_data->op_bias & MDS_HSM_RELEASE) { - req_fmt = &RQF_MDS_INTENT_CLOSE; + CDEBUG(D_INODE, "%s: "DFID" file closed with intent: %x\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + op_data->op_bias); + + if (op_data->op_bias & MDS_CLOSE_INTENT) { + req_fmt = &RQF_MDS_CLOSE_INTENT; + if (op_data->op_bias & MDS_HSM_RELEASE) { + /* allocate a FID for volatile file */ + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, + op_data); + if (rc < 0) { + CERROR("%s: "DFID" allocating FID: rc = %d\n", + obd->obd_name, PFID(&op_data->op_fid1), + rc); + /* save the errcode and proceed to close */ + saved_rc = rc; + } + } + if (op_data->op_bias & MDS_CLOSE_RESYNC_DONE) { + size_t count = op_data->op_data_size / sizeof(__u32); - /* allocate a FID for volatile file */ - rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); - if (rc < 0) { - CERROR("%s: "DFID" failed to allocate FID: %d\n", - obd->obd_name, PFID(&op_data->op_fid1), rc); - /* save the errcode and proceed to close */ - saved_rc = rc; + if (count > INLINE_RESYNC_ARRAY_SIZE) + u32_count = count; } - } else if (op_data->op_bias & MDS_CLOSE_LAYOUT_SWAP) { - req_fmt = &RQF_MDS_INTENT_CLOSE; } else { req_fmt = &RQF_MDS_CLOSE; } *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt); - if (req == NULL) - RETURN(-ENOMEM); + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_CLOSE)) + req = NULL; + else + req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt); + + /* Ensure that this close's handle is fixed up during replay. */ + if (likely(mod != NULL)) { + LASSERTF(mod->mod_open_req != NULL && + mod->mod_open_req->rq_type != LI_POISON, + "POISONED open %p!\n", mod->mod_open_req); + + mod->mod_close_req = req; + + DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); + /* We no longer want to preserve this open for replay even + * though the open was committed. b=3632, b=3633 */ + spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + spin_unlock(&mod->mod_open_req->rq_lock); + } else { + CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); + } + if (req == NULL) { + /** + * TODO: repeat close after errors + */ + CWARN("%s: close of FID "DFID" failed, file reference will be " + "dropped when this client unmounts or is evicted\n", + obd->obd_name, PFID(&op_data->op_fid1)); + GOTO(out, rc = -ENOMEM); + } - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + if (u32_count > 0) + req_capsule_set_size(&req->rq_pill, &RMF_U32, RCL_CLIENT, + u32_count * sizeof(__u32)); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE); + if (rc) { + ptlrpc_request_free(req); + req = NULL; + GOTO(out, rc); + } /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a * portal whose threads are not taking any DLM locks and are therefore @@ -766,23 +878,6 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - /* Ensure that this close's handle is fixed up during replay. */ - if (likely(mod != NULL)) { - LASSERTF(mod->mod_open_req != NULL && - mod->mod_open_req->rq_type != LI_POISON, - "POISONED open %p!\n", mod->mod_open_req); - - mod->mod_close_req = req; - - DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); - /* We no longer want to preserve this open for replay even - * though the open was committed. b=3632, b=3633 */ - spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - spin_unlock(&mod->mod_open_req->rq_lock); - } else { - CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); - } mdc_close_pack(req, op_data); @@ -791,9 +886,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_mod_rpc_slot(req, NULL); if (req->rq_repmsg == NULL) { CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, @@ -827,6 +922,7 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, } } +out: if (mod) { if (rc != 0) mod->mod_close_req = NULL; @@ -874,16 +970,16 @@ restart_bulk: MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) { - ptlrpc_request_free(req); + ptlrpc_req_finished(req); RETURN(-ENOMEM); } /* NB req now owns desc and will free it when it gets freed */ for (i = 0; i < npages; i++) desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0, - PAGE_CACHE_SIZE); + PAGE_SIZE); - mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid); + mdc_readdir_pack(req, offset, PAGE_SIZE * npages, fid); ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); @@ -915,7 +1011,7 @@ restart_bulk: if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { CERROR("%s: unexpected bytes transferred: %d (%ld expected)\n", exp->exp_obd->obd_name, req->rq_bulk->bd_nob_transferred, - PAGE_CACHE_SIZE * npages); + PAGE_SIZE * npages); ptlrpc_req_finished(req); RETURN(-EPROTO); } @@ -932,7 +1028,7 @@ static void mdc_release_page(struct page *page, int remove) truncate_complete_page(page->mapping, page); unlock_page(page); } - page_cache_release(page); + put_page(page); } static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, @@ -953,7 +1049,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, if (found > 0 && !radix_tree_exceptional_entry(page)) { struct lu_dirpage *dp; - page_cache_get(page); + get_page(page); spin_unlock_irq(&mapping->tree_lock); /* * In contrast to find_lock_page() we are sure that directory @@ -977,11 +1073,11 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, if (unlikely(*start == 1 && *hash == 0)) *hash = *start; else - LASSERTF(*start <= *hash, "start = "LPX64 - ",end = "LPX64",hash = "LPX64"\n", + LASSERTF(*start <= *hash, "start = %#llx" + ",end = %#llx,hash = %#llx\n", *start, *end, *hash); - CDEBUG(D_VFSTRACE, "offset %lx ["LPX64" "LPX64"]," - " hash "LPX64"\n", offset, *start, *end, *hash); + CDEBUG(D_VFSTRACE, "offset %lx [%#llx %#llx]," + " hash %#llx\n", offset, *start, *end, *hash); if (*hash > *end) { kunmap(page); mdc_release_page(page, 0); @@ -999,7 +1095,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, page = NULL; } } else { - page_cache_release(page); + put_page(page); page = ERR_PTR(-EIO); } } else { @@ -1028,7 +1124,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, * |s|e|f|p|ent| 0 | ... | 0 | * '----------------- -----' * - * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is + * However, on hosts where the native VM page size (PAGE_SIZE) is * larger than LU_PAGE_SIZE, a single host page may contain multiple * lu_dirpages. After reading the lu_dirpages from the MDS, the * ldp_hash_end of the first lu_dirpage refers to the one immediately @@ -1059,7 +1155,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span * to the first entry of the next lu_dirpage. */ -#if PAGE_CACHE_SIZE > LU_PAGE_SIZE +#if PAGE_SIZE > LU_PAGE_SIZE static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) { int i; @@ -1110,7 +1206,7 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) } #else #define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0) -#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ +#endif /* PAGE_SIZE > LU_PAGE_SIZE */ /* parameters for readdir page */ struct readpage_param { @@ -1125,7 +1221,7 @@ struct readpage_param { static inline void delete_from_page_cache(struct page *page) { remove_from_page_cache(page); - page_cache_release(page); + put_page(page); } #endif @@ -1141,22 +1237,22 @@ static inline void delete_from_page_cache(struct page *page) **/ static int mdc_read_page_remote(void *data, struct page *page0) { - struct readpage_param *rp = data; - struct page **page_pool; - struct page *page; - struct lu_dirpage *dp; - int rd_pgs = 0; /* number of pages read actually */ - int npages; - struct md_op_data *op_data = rp->rp_mod; - struct ptlrpc_request *req; - int max_pages = op_data->op_max_pages; - struct inode *inode; - struct lu_fid *fid; - int i; - int rc; + struct readpage_param *rp = data; + struct page **page_pool; + struct page *page; + struct lu_dirpage *dp; + struct md_op_data *op_data = rp->rp_mod; + struct ptlrpc_request *req; + int max_pages; + struct inode *inode; + struct lu_fid *fid; + int rd_pgs = 0; /* number of pages actually read */ + int npages; + int i; + int rc; ENTRY; - LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES); + max_pages = rp->rp_exp->exp_obd->u.cli.cl_max_pages_per_rpc; inode = op_data->op_data; fid = &op_data->op_fid1; LASSERT(inode != NULL); @@ -1170,7 +1266,8 @@ static int mdc_read_page_remote(void *data, struct page *page0) } for (npages = 1; npages < max_pages; npages++) { - page = page_cache_alloc_cold(inode->i_mapping); + page = __page_cache_alloc(mapping_gfp_mask(inode->i_mapping) + | __GFP_COLD); if (page == NULL) break; page_pool[npages] = page; @@ -1183,10 +1280,9 @@ static int mdc_read_page_remote(void *data, struct page *page0) } else { int lu_pgs; - rd_pgs = (req->rq_bulk->bd_nob_transferred + - PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - lu_pgs = req->rq_bulk->bd_nob_transferred >> - LU_PAGE_SHIFT; + rd_pgs = (req->rq_bulk->bd_nob_transferred + PAGE_SIZE - 1) >> + PAGE_SHIFT; + lu_pgs = req->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs); @@ -1207,7 +1303,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) page = page_pool[i]; if (rc < 0 || i >= rd_pgs) { - page_cache_release(page); + put_page(page); continue; } @@ -1227,7 +1323,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) else CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:" " rc = %d\n", offset, ret); - page_cache_release(page); + put_page(page); } if (page_pool != &page0) @@ -1286,14 +1382,15 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, } rc = 0; - mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL); + lockh.cookie = it.it_lock_handle; + mdc_set_lock_data(exp, &lockh, dir, NULL); rp_param.rp_off = hash_offset; rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64; page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end, rp_param.rp_hash64); if (IS_ERR(page)) { - CERROR("%s: dir page locate: "DFID" at "LPU64": rc %ld\n", + CERROR("%s: dir page locate: "DFID" at %llu: rc %ld\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, PTR_ERR(page)); GOTO(out_unlock, rc = PTR_ERR(page)); @@ -1322,7 +1419,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, rp_param.rp_hash64), mdc_read_page_remote, &rp_param); if (IS_ERR(page)) { - CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n", + CDEBUG(D_INFO, "%s: read cache page: "DFID" at %llu: %ld\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, PTR_ERR(page)); GOTO(out_unlock, rc = PTR_ERR(page)); @@ -1331,7 +1428,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, wait_on_page_locked(page); (void)kmap(page); if (!PageUptodate(page)) { - CERROR("%s: page not updated: "DFID" at "LPU64": rc %d\n", + CERROR("%s: page not updated: "DFID" at %llu: rc %d\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, -5); goto fail; @@ -1339,7 +1436,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, if (!PageChecked(page)) SetPageChecked(page); if (PageError(page)) { - CERROR("%s: page error: "DFID" at "LPU64": rc %d\n", + CERROR("%s: page error: "DFID" at %llu: rc %d\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, -5); goto fail; @@ -1360,8 +1457,8 @@ hash_collision: LASSERT(start == rp_param.rp_off); CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end); #if BITS_PER_LONG == 32 - CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with " - "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start), + CWARN("Real page-wide hash collision at [%llu %llu] with " + "hash %llu\n", le64_to_cpu(dp->ldp_hash_start), le64_to_cpu(dp->ldp_hash_end), hash_offset); #endif @@ -1374,9 +1471,7 @@ hash_collision: } *ppage = page; out_unlock: - lockh.cookie = it.d.lustre.it_lock_handle; - ldlm_lock_decref(&lockh, it.d.lustre.it_lock_mode); - it.d.lustre.it_lock_handle = 0; + ldlm_lock_decref(&lockh, it.it_lock_mode); return rc; fail: kunmap(page); @@ -1385,10 +1480,9 @@ fail: goto out_unlock; } - static int mdc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) + time64_t max_age, __u32 flags) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; @@ -1444,28 +1538,30 @@ output: static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) { - __u32 keylen, vallen; - void *key; - int rc; - - if (gf->gf_pathlen > PATH_MAX) - RETURN(-ENAMETOOLONG); - if (gf->gf_pathlen < 2) - RETURN(-EOVERFLOW); + __u32 keylen, vallen; + void *key; + int rc; - /* Key is KEY_FID2PATH + getinfo_fid2path description */ - keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf); - OBD_ALLOC(key, keylen); - if (key == NULL) - RETURN(-ENOMEM); - memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH)); - memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf)); + if (gf->gf_pathlen > PATH_MAX) + RETURN(-ENAMETOOLONG); + if (gf->gf_pathlen < 2) + RETURN(-EOVERFLOW); - CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n", - PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno); + /* Key is KEY_FID2PATH + getinfo_fid2path description */ + keylen = cfs_size_round(sizeof(KEY_FID2PATH) + sizeof(*gf) + + sizeof(struct lu_fid)); + OBD_ALLOC(key, keylen); + if (key == NULL) + RETURN(-ENOMEM); + memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH)); + memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf)); + memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf), + gf->gf_u.gf_root_fid, sizeof(struct lu_fid)); + CDEBUG(D_IOCTL, "path get "DFID" from %llu #%d\n", + PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno); - if (!fid_is_sane(&gf->gf_fid)) - GOTO(out, rc = -EINVAL); + if (!fid_is_sane(&gf->gf_fid)) + GOTO(out, rc = -EINVAL); /* Val is struct getinfo_fid2path result plus path */ vallen = sizeof(*gf) + gf->gf_pathlen; @@ -1474,17 +1570,20 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) if (rc != 0 && rc != -EREMOTE) GOTO(out, rc); - if (vallen <= sizeof(*gf)) - GOTO(out, rc = -EPROTO); - else if (vallen > sizeof(*gf) + gf->gf_pathlen) - GOTO(out, rc = -EOVERFLOW); + if (vallen <= sizeof(*gf)) + GOTO(out, rc = -EPROTO); + if (vallen > sizeof(*gf) + gf->gf_pathlen) + GOTO(out, rc = -EOVERFLOW); - CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n%s\n", - PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, gf->gf_path); + CDEBUG(D_IOCTL, "path got "DFID" from %llu #%d: %s\n", + PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno, + gf->gf_pathlen < 512 ? gf->gf_u.gf_path : + /* only log the last 512 characters of the path */ + gf->gf_u.gf_path + gf->gf_pathlen - 512); out: - OBD_FREE(key, keylen); - return rc; + OBD_FREE(key, keylen); + return rc; } static int mdc_ioc_hsm_progress(struct obd_export *exp, @@ -1501,7 +1600,7 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, if (req == NULL) GOTO(out, rc = -ENOMEM); - mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + mdc_pack_body(req, NULL, 0, 0, -1, 0); /* Copy hsm_progress struct */ req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS); @@ -1513,9 +1612,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); GOTO(out, rc); out: @@ -1536,7 +1635,7 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives) if (req == NULL) GOTO(out, rc = -ENOMEM); - mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + mdc_pack_body(req, NULL, 0, 0, -1, 0); /* Copy hsm_progress struct */ archive_mask = req_capsule_client_get(&req->rq_pill, @@ -1575,7 +1674,7 @@ static int mdc_ioc_hsm_current_action(struct obd_export *exp, RETURN(rc); } - mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0, + mdc_pack_body(req, &op_data->op_fid1, 0, 0, op_data->op_suppgids[0], 0); ptlrpc_request_set_replen(req); @@ -1609,7 +1708,7 @@ static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp) if (req == NULL) GOTO(out, rc = -ENOMEM); - mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + mdc_pack_body(req, NULL, 0, 0, -1, 0); ptlrpc_request_set_replen(req); @@ -1640,7 +1739,7 @@ static int mdc_ioc_hsm_state_get(struct obd_export *exp, RETURN(rc); } - mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0, + mdc_pack_body(req, &op_data->op_fid1, 0, 0, op_data->op_suppgids[0], 0); ptlrpc_request_set_replen(req); @@ -1681,7 +1780,7 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, RETURN(rc); } - mdc_pack_body(req, &op_data->op_fid1, OBD_MD_FLRMTPERM, 0, + mdc_pack_body(req, &op_data->op_fid1, 0, 0, op_data->op_suppgids[0], 0); /* Copy states */ @@ -1692,9 +1791,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); GOTO(out, rc); out: @@ -1729,7 +1828,7 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, RETURN(rc); } - mdc_pack_body(req, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + mdc_pack_body(req, NULL, 0, 0, -1, 0); /* Copy hsm_request struct */ req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST); @@ -1752,9 +1851,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_get_mod_rpc_slot(req, NULL); rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + mdc_put_mod_rpc_slot(req, NULL); GOTO(out, rc); @@ -1763,209 +1862,43 @@ out: return rc; } -static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags) -{ - struct kuc_hdr *lh = (struct kuc_hdr *)buf; - - LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE); - - lh->kuc_magic = KUC_MAGIC; - lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; - lh->kuc_flags = flags; - lh->kuc_msgtype = CL_RECORD; - lh->kuc_msglen = len; - return lh; -} - -struct changelog_show { - __u64 cs_startrec; - enum changelog_send_flag cs_flags; - struct file *cs_fp; - char *cs_buf; - struct obd_device *cs_obd; -}; - -static inline char *cs_obd_name(struct changelog_show *cs) -{ - return cs->cs_obd->obd_name; -} - -static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, - struct llog_rec_hdr *hdr, void *data) -{ - struct changelog_show *cs = data; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - struct kuc_hdr *lh; - size_t len; - int rc; - ENTRY; - - if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { - rc = -EINVAL; - CERROR("%s: not a changelog rec %x/%d: rc = %d\n", - cs_obd_name(cs), rec->cr_hdr.lrh_type, - rec->cr.cr_type, rc); - RETURN(rc); - } - - if (rec->cr.cr_index < cs->cs_startrec) { - /* Skip entries earlier than what we are interested in */ - CDEBUG(D_HSM, "rec="LPU64" start="LPU64"\n", - rec->cr.cr_index, cs->cs_startrec); - RETURN(0); - } - - CDEBUG(D_HSM, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID" %.*s\n", - rec->cr.cr_index, rec->cr.cr_type, - changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, - rec->cr.cr_flags & CLF_FLAGMASK, - PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), - rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); - - len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; - - /* Set up the message */ - lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags); - memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); - - rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); - CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc); - - RETURN(rc); -} - -static int mdc_changelog_send_thread(void *csdata) -{ - struct changelog_show *cs = csdata; - struct llog_ctxt *ctxt = NULL; - struct llog_handle *llh = NULL; - struct kuc_hdr *kuch; - enum llog_flag flags = LLOG_F_IS_CAT; - int rc; - - CDEBUG(D_HSM, "changelog to fp=%p start "LPU64"\n", - cs->cs_fp, cs->cs_startrec); - - OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - if (cs->cs_buf == NULL) - GOTO(out, rc = -ENOMEM); - - /* Set up the remote catalog handle */ - ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt == NULL) - GOTO(out, rc = -ENOENT); - rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG, - LLOG_OPEN_EXISTS); - if (rc) { - CERROR("%s: fail to open changelog catalog: rc = %d\n", - cs_obd_name(cs), rc); - GOTO(out, rc); - } - - if (cs->cs_flags & CHANGELOG_FLAG_JOBID) - flags |= LLOG_F_EXT_JOBID; - - rc = llog_init_handle(NULL, llh, flags, NULL); - if (rc) { - CERROR("llog_init_handle failed %d\n", rc); - GOTO(out, rc); - } - - rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); - - /* Send EOF no matter what our result */ - kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags); - kuch->kuc_msgtype = CL_EOF; - libcfs_kkuc_msg_put(cs->cs_fp, kuch); - -out: - fput(cs->cs_fp); - if (llh) - llog_cat_close(NULL, llh); - if (ctxt) - llog_ctxt_put(ctxt); - if (cs->cs_buf) - OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - OBD_FREE_PTR(cs); - return rc; -} - -static int mdc_ioc_changelog_send(struct obd_device *obd, - struct ioc_changelog *icc) -{ - struct changelog_show *cs; - struct task_struct *task; - int rc; - - /* Freed in mdc_changelog_send_thread */ - OBD_ALLOC_PTR(cs); - if (!cs) - return -ENOMEM; - - cs->cs_obd = obd; - cs->cs_startrec = icc->icc_recno; - /* matching fput in mdc_changelog_send_thread */ - cs->cs_fp = fget(icc->icc_id); - cs->cs_flags = icc->icc_flags; - - /* - * New thread because we should return to user app before - * writing into our pipe - */ - task = kthread_run(mdc_changelog_send_thread, cs, - "mdc_clg_send_thread"); - if (IS_ERR(task)) { - rc = PTR_ERR(task); - CERROR("%s: cannot start changelog thread: rc = %d\n", - cs_obd_name(cs), rc); - OBD_FREE_PTR(cs); - } else { - rc = 0; - CDEBUG(D_HSM, "%s: started changelog thread\n", - cs_obd_name(cs)); - } - - return rc; -} - static int mdc_ioc_hsm_ct_start(struct obd_export *exp, struct lustre_kernelcomm *lk); static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, struct obd_quotactl *oqctl) { - struct ptlrpc_request *req; - struct obd_quotactl *oqc; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_quotactl *oqc; + int rc; + ENTRY; - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), - &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION, - MDS_QUOTACTL); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), + &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION, + MDS_QUOTACTL); + if (req == NULL) + RETURN(-ENOMEM); - oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); - *oqc = *oqctl; + oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + *oqc = *oqctl; - ptlrpc_request_set_replen(req); - ptlrpc_at_set_req_timeout(req); - req->rq_no_resend = 1; + ptlrpc_request_set_replen(req); + ptlrpc_at_set_req_timeout(req); - rc = ptlrpc_queue_wait(req); - if (rc) - CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); - - if (req->rq_repmsg && - (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { - *oqctl = *oqc; - } else if (!rc) { - CERROR ("Can't unpack obd_quotactl\n"); - rc = -EPROTO; - } - ptlrpc_req_finished(req); + rc = ptlrpc_queue_wait(req); + if (rc) + CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); + + if (req->rq_repmsg && + (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { + *oqctl = *oqc; + } else if (!rc) { + CERROR ("Can't unpack obd_quotactl\n"); + rc = -EPROTO; + } + ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc); } static int mdc_ioc_swap_layouts(struct obd_export *exp, @@ -2027,30 +1960,18 @@ out: static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - struct obd_import *imp = obd->u.cli.cl_import; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + struct obd_import *imp = obd->u.cli.cl_import; + int rc; + ENTRY; if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); return -EINVAL; } - switch (cmd) { - case OBD_IOC_CHANGELOG_SEND: - rc = mdc_ioc_changelog_send(obd, karg); - GOTO(out, rc); - case OBD_IOC_CHANGELOG_CLEAR: { - struct ioc_changelog *icc = karg; - struct changelog_setinfo cs = - {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id}; - rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR), - KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, - NULL); - GOTO(out, rc); - } + switch (cmd) { case OBD_IOC_FID2PATH: rc = mdc_ioc_fid2path(exp, karg); GOTO(out, rc); @@ -2075,49 +1996,49 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case LL_IOC_HSM_REQUEST: rc = mdc_ioc_hsm_request(exp, karg); GOTO(out, rc); - case OBD_IOC_CLIENT_RECOVER: - rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); - if (rc < 0) - GOTO(out, rc); - GOTO(out, rc = 0); - case IOC_OSC_SET_ACTIVE: - rc = ptlrpc_set_import_active(imp, data->ioc_offset); - GOTO(out, rc); - case OBD_IOC_PING_TARGET: - rc = ptlrpc_obd_ping(obd); - GOTO(out, rc); - /* - * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by - * LMV instead of MDC. But when the cluster is upgraded from 1.8, - * there'd be no LMV layer thus we might be called here. Eventually - * this code should be removed. - * bz20731, LU-592. - */ - case IOC_OBD_STATFS: { - struct obd_statfs stat_buf = {0}; + case OBD_IOC_CLIENT_RECOVER: + rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); + if (rc < 0) + GOTO(out, rc); + GOTO(out, rc = 0); + case IOC_OSC_SET_ACTIVE: + rc = ptlrpc_set_import_active(imp, data->ioc_offset); + GOTO(out, rc); + case OBD_IOC_PING_TARGET: + rc = ptlrpc_obd_ping(obd); + GOTO(out, rc); + /* + * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by + * LMV instead of MDC. But when the cluster is upgraded from 1.8, + * there'd be no LMV layer thus we might be called here. Eventually + * this code should be removed. + * bz20731, LU-592. + */ + case IOC_OBD_STATFS: { + struct obd_statfs stat_buf = {0}; - if (*((__u32 *) data->ioc_inlbuf2) != 0) - GOTO(out, rc = -ENODEV); + if (*((__u32 *) data->ioc_inlbuf2) != 0) + GOTO(out, rc = -ENODEV); - /* copy UUID */ + /* copy UUID */ if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), min((int)data->ioc_plen2, (int)sizeof(struct obd_uuid)))) GOTO(out, rc = -EFAULT); rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf, - cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, 0); if (rc != 0) GOTO(out, rc); if (copy_to_user(data->ioc_pbuf1, &stat_buf, - min((int) data->ioc_plen1, - (int) sizeof(stat_buf)))) - GOTO(out, rc = -EFAULT); + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) + GOTO(out, rc = -EFAULT); - GOTO(out, rc = 0); - } + GOTO(out, rc = 0); + } case OBD_IOC_QUOTACTL: { struct if_quotactl *qctl = karg; struct obd_quotactl *oqctl; @@ -2270,7 +2191,8 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, * @param val KUC message (kuc_hdr + hsm_action_list) * @param len total length of message */ -static int mdc_hsm_copytool_send(size_t len, void *val) +static int mdc_hsm_copytool_send(const struct obd_uuid *uuid, + size_t len, void *val) { struct kuc_hdr *lh = (struct kuc_hdr *)val; struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); @@ -2296,7 +2218,7 @@ static int mdc_hsm_copytool_send(size_t len, void *val) lh->kuc_msglen, hal->hal_count, hal->hal_fsname); /* Broadcast to HSM listeners */ - rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); + rc = libcfs_kkuc_group_put(uuid, KUC_GRP_HSM, lh); RETURN(rc); } @@ -2316,9 +2238,6 @@ static int mdc_hsm_ct_reregister(void *data, void *cb_arg) if (kcd == NULL || kcd->kcd_magic != KKUC_CT_DATA_MAGIC) return -EPROTO; - if (!obd_uuid_equals(&kcd->kcd_uuid, &imp->imp_obd->obd_uuid)) - return 0; - CDEBUG(D_HA, "%s: recover copytool registration to MDT (archive=%#x)\n", imp->imp_obd->obd_name, kcd->kcd_archive); rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_archive); @@ -2334,8 +2253,8 @@ static int mdc_hsm_ct_reregister(void *data, void *cb_arg) static int mdc_kuc_reregister(struct obd_import *imp) { /* re-register HSM agents */ - return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister, - (void *)imp); + return libcfs_kkuc_group_foreach(&imp->imp_obd->obd_uuid, KUC_GRP_HSM, + mdc_hsm_ct_reregister, imp); } static int mdc_set_info_async(const struct lu_env *env, @@ -2368,21 +2287,14 @@ static int mdc_set_info_async(const struct lu_env *env, keylen, key, vallen, val, set); RETURN(rc); } - if (KEY_IS(KEY_SPTLRPC_CONF)) { - sptlrpc_conf_client_adapt(exp->exp_obd); - RETURN(0); - } - if (KEY_IS(KEY_FLUSH_CTX)) { - sptlrpc_import_flush_my_ctx(imp); - RETURN(0); - } if (KEY_IS(KEY_CHANGELOG_CLEAR)) { rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, keylen, key, vallen, val, set); RETURN(rc); } if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) { - rc = mdc_hsm_copytool_send(vallen, val); + rc = mdc_hsm_copytool_send(&imp->imp_obd->obd_uuid, vallen, + val); RETURN(rc); } @@ -2393,8 +2305,8 @@ static int mdc_set_info_async(const struct lu_env *env, RETURN(0); } - CERROR("Unknown key %s\n", (char *)key); - RETURN(-EINVAL); + rc = osc_set_info_async(env, exp, keylen, key, vallen, val, set); + RETURN(rc); } static int mdc_get_info(const struct lu_env *env, struct obd_export *exp, @@ -2471,65 +2383,95 @@ static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, } static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, - enum obd_import_event event) + enum obd_import_event event) { - int rc = 0; - - LASSERT(imp->imp_obd == obd); - - switch (event) { - case IMP_EVENT_DISCON: { -#if 0 - /* XXX Pass event up to OBDs stack. used only for FLD now */ - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DISCON, NULL); -#endif - break; - } - case IMP_EVENT_INACTIVE: { - struct client_obd *cli = &obd->u.cli; - /* - * Flush current sequence to make client obtain new one - * from server in case of disconnect/reconnect. - */ - if (cli->cl_seq != NULL) - seq_client_flush(cli->cl_seq); + struct client_obd *cli = &obd->u.cli; + int rc = 0; - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); - break; - } - case IMP_EVENT_INVALIDATE: { - struct ldlm_namespace *ns = obd->obd_namespace; + LASSERT(imp->imp_obd == obd); - ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + switch (event) { + case IMP_EVENT_DISCON: + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); + break; + case IMP_EVENT_INACTIVE: + /* + * Flush current sequence to make client obtain new one + * from server in case of disconnect/reconnect. + */ + down_read(&cli->cl_seq_rwsem); + if (cli->cl_seq) + seq_client_flush(cli->cl_seq); + up_read(&cli->cl_seq_rwsem); - break; - } + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE); + break; + case IMP_EVENT_INVALIDATE: { + struct ldlm_namespace *ns = obd->obd_namespace; + struct lu_env *env; + __u16 refcheck; + + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + + env = cl_env_get(&refcheck); + if (!IS_ERR(env)) { + /* Reset grants. All pages go to failing rpcs due to + * the invalid import. + */ + osc_io_unplug(env, cli, NULL); + + cfs_hash_for_each_nolock(ns->ns_rs_hash, + osc_ldlm_resource_invalidate, + env, 0); + cl_env_put(env, &refcheck); + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + } else { + rc = PTR_ERR(env); + } + break; + } case IMP_EVENT_ACTIVE: - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE); /* redo the kuc registration after reconnecting */ if (rc == 0) rc = mdc_kuc_reregister(imp); break; - case IMP_EVENT_OCD: - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); - break; - case IMP_EVENT_DEACTIVATE: - case IMP_EVENT_ACTIVATE: - break; - default: - CERROR("Unknown import event %x\n", event); - LBUG(); - } - RETURN(rc); + case IMP_EVENT_OCD: { + struct obd_connect_data *ocd = &imp->imp_connect_data; + + if (OCD_HAS_FLAG(ocd, GRANT)) + osc_init_grant(cli, ocd); + + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD); + break; + } + case IMP_EVENT_DEACTIVATE: + case IMP_EVENT_ACTIVATE: + break; + default: + CERROR("Unknown import event %x\n", event); + LBUG(); + } + RETURN(rc); } int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data) { struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; + int rc = -EIO; + ENTRY; - RETURN(seq_client_alloc_fid(env, seq, fid)); + + down_read(&cli->cl_seq_rwsem); + if (cli->cl_seq) + rc = seq_client_alloc_fid(env, cli->cl_seq, fid); + up_read(&cli->cl_seq_rwsem); + + RETURN(rc); } static struct obd_uuid *mdc_get_uuid(struct obd_export *exp) @@ -2602,36 +2544,19 @@ static void mdc_llog_finish(struct obd_device *obd) EXIT; } -static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) +int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { - struct client_obd *cli = &obd->u.cli; - int rc; - ENTRY; + int rc; - OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); - if (!cli->cl_rpc_lock) - RETURN(-ENOMEM); - mdc_init_rpc_lock(cli->cl_rpc_lock); + ENTRY; - rc = ptlrpcd_addref(); + rc = osc_setup_common(obd, cfg); if (rc < 0) - GOTO(err_rpc_lock, rc); - - OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); - if (!cli->cl_close_lock) - GOTO(err_ptlrpcd_decref, rc = -ENOMEM); - mdc_init_rpc_lock(cli->cl_close_lock); + RETURN(rc); - rc = client_obd_setup(obd, cfg); - if (rc) - GOTO(err_close_lock, rc); -#ifdef CONFIG_PROC_FS - obd->obd_vars = lprocfs_mdc_obd_vars; - lprocfs_obd_setup(obd); - lprocfs_alloc_md_stats(obd, 0); -#endif - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); + rc = mdc_tunables_init(obd); + if (rc) + GOTO(err_osc_cleanup, rc); ns_register_cancel(obd->obd_namespace, mdc_cancel_weight); @@ -2639,23 +2564,28 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) rc = mdc_llog_init(obd); if (rc) { - mdc_cleanup(obd); - CERROR("failed to setup llogging subsystems\n"); - RETURN(rc); + CERROR("%s: failed to setup llogging subsystems: rc = %d\n", + obd->obd_name, rc); + GOTO(err_llog_cleanup, rc); } - spin_lock_init(&cli->cl_mod_rpcs_lock); - cli->cl_max_mod_rpcs_in_flight = OBD_MAX_RIF_DEFAULT - 1; + rc = mdc_changelog_cdev_init(obd); + if (rc) { + CERROR("%s: failed to setup changelog char device: rc = %d\n", + obd->obd_name, rc); + GOTO(err_changelog_cleanup, rc); + } - RETURN(rc); + RETURN(rc); -err_close_lock: - OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); -err_ptlrpcd_decref: - ptlrpcd_decref(); -err_rpc_lock: - OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); - RETURN(rc); +err_changelog_cleanup: + mdc_llog_finish(obd); +err_llog_cleanup: + lprocfs_free_md_stats(obd); + ptlrpc_lprocfs_unregister_obd(obd); +err_osc_cleanup: + osc_cleanup_common(obd); + return rc; } /* Initialize the default and maximum LOV EA sizes. This allows @@ -2682,83 +2612,32 @@ static int mdc_init_ea_size(struct obd_export *exp, __u32 easize, RETURN(0); } -static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) -{ - int rc = 0; - ENTRY; - - switch (stage) { - case OBD_CLEANUP_EARLY: - break; - case OBD_CLEANUP_EXPORTS: - /* Failsafe, ok if racy */ - if (obd->obd_type->typ_refcnt <= 1) - libcfs_kkuc_group_rem(0, KUC_GRP_HSM, NULL); - - obd_cleanup_client_import(obd); - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); - lprocfs_free_md_stats(obd); - mdc_llog_finish(obd); - break; - } - RETURN(rc); -} - -static int mdc_cleanup(struct obd_device *obd) +static int mdc_precleanup(struct obd_device *obd) { - struct client_obd *cli = &obd->u.cli; - - OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); - OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); + ENTRY; - ptlrpcd_decref(); + osc_precleanup_common(obd); + mdc_changelog_cdev_finish(obd); - return client_obd_cleanup(obd); + obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_free_md_stats(obd); + mdc_llog_finish(obd); + RETURN(0); } -static int mdc_process_config(struct obd_device *obd, size_t len, void *buf) +static int mdc_cleanup(struct obd_device *obd) { - struct lustre_cfg *lcfg = buf; - int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd); - return (rc > 0 ? 0: rc); + return osc_cleanup_common(obd); } - -/* get remote permission for current user on fid */ -static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, - u32 suppgid, struct ptlrpc_request **request) +int mdc_process_config(struct obd_device *obd, size_t len, void *buf) { - struct ptlrpc_request *req; - int rc; - ENTRY; - - LASSERT(client_is_remote(exp)); - - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); - if (req == NULL) - RETURN(-ENOMEM); - - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - mdc_pack_body(req, fid, OBD_MD_FLRMTPERM, 0, suppgid, 0); - - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, - sizeof(struct mdt_remote_perm)); + struct lustre_cfg *lcfg = buf; + size_t count = class_modify_config(lcfg, PARAM_MDC, + &obd->obd_kset.kobj); - ptlrpc_request_set_replen(req); - - rc = ptlrpc_queue_wait(req); - if (rc) - ptlrpc_req_finished(req); - else - *request = req; - RETURN(rc); + return count > 0 ? 0 : count; } static struct obd_ops mdc_obd_ops = { @@ -2769,7 +2648,8 @@ static struct obd_ops mdc_obd_ops = { .o_add_conn = client_import_add_conn, .o_del_conn = client_import_del_conn, .o_connect = client_connect_import, - .o_disconnect = client_disconnect_export, + .o_reconnect = osc_reconnect, + .o_disconnect = osc_disconnect, .o_iocontrol = mdc_iocontrol, .o_set_info_async = mdc_set_info_async, .o_statfs = mdc_statfs, @@ -2784,9 +2664,8 @@ static struct obd_ops mdc_obd_ops = { }; static struct md_ops mdc_md_ops = { - .m_getstatus = mdc_getstatus, + .m_get_root = mdc_get_root, .m_null_inode = mdc_null_inode, - .m_find_cbdata = mdc_find_cbdata, .m_close = mdc_close, .m_create = mdc_create, .m_enqueue = mdc_enqueue, @@ -2799,6 +2678,7 @@ static struct md_ops mdc_md_ops = { .m_setxattr = mdc_setxattr, .m_getxattr = mdc_getxattr, .m_fsync = mdc_fsync, + .m_file_resync = mdc_file_resync, .m_read_page = mdc_read_page, .m_unlink = mdc_unlink, .m_cancel_unused = mdc_cancel_unused, @@ -2809,7 +2689,6 @@ static struct md_ops mdc_md_ops = { .m_free_lustre_md = mdc_free_lustre_md, .m_set_open_replay_data = mdc_set_open_replay_data, .m_clear_open_replay_data = mdc_clear_open_replay_data, - .m_get_remote_perm = mdc_get_remote_perm, .m_intent_getattr_async = mdc_intent_getattr_async, .m_revalidate_lock = mdc_revalidate_lock }; @@ -2817,16 +2696,17 @@ static struct md_ops mdc_md_ops = { static int __init mdc_init(void) { return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, - LUSTRE_MDC_NAME, NULL); + LUSTRE_MDC_NAME, &mdc_device_type); } -static void /*__exit*/ mdc_exit(void) +static void __exit mdc_exit(void) { class_unregister_type(LUSTRE_MDC_NAME); } -MODULE_AUTHOR("Sun Microsystems, Inc. "); +MODULE_AUTHOR("OpenSFS, Inc. "); MODULE_DESCRIPTION("Lustre Metadata Client"); +MODULE_VERSION(LUSTRE_VERSION_STRING); MODULE_LICENSE("GPL"); module_init(mdc_init);