X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=2a5b2c7537def5f378f13c15cf041963960c7406;hp=b6a55b7173c495ad25c3a6e45db7ddf742269b47;hb=3e76334402a71e37aed4e9a5166d0141e30af375;hpb=7da1d93e2320bd7e6db8062350a90a651a00f84b diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index b6a55b7..2a5b2c7 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -23,7 +23,7 @@ * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2016, Intel Corporation. + * Copyright (c) 2011, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -34,29 +34,30 @@ #include #include -#include #include #include #include #include -#ifdef HAVE_UIDGID_HEADER -# include -#endif +#include +#include +#include +#include -#include +#include #include #include #include #include +#include #include -#include +#include #include #include #include -#include #include #include +#include #include "mdc_internal.h" @@ -192,20 +193,34 @@ static int mdc_getattr_common(struct obd_export *exp, RETURN(0); } +static void mdc_reset_acl_req(struct ptlrpc_request *req) +{ + spin_lock(&req->rq_early_free_lock); + sptlrpc_cli_free_repbuf(req); + req->rq_repbuf = NULL; + req->rq_repbuf_len = 0; + req->rq_repdata = NULL; + req->rq_reqdata_len = 0; + spin_unlock(&req->rq_early_free_lock); +} + static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct ptlrpc_request *req; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + int rc; + ENTRY; /* Single MDS without an LMV case */ if (op_data->op_flags & MF_GET_MDT_IDX) { op_data->op_mds = 0; RETURN(0); } - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); + + *request = NULL; + req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR); if (req == NULL) RETURN(-ENOMEM); @@ -215,31 +230,43 @@ static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } +again: mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, op_data->op_mode, -1, 0); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + ptlrpc_request_set_replen(req); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - op_data->op_mode); - ptlrpc_request_set_replen(req); + rc = mdc_getattr_common(exp, req); + if (rc) { + if (rc == -ERANGE) { + acl_bufsize = min_t(__u32, + imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + mdc_reset_acl_req(req); + goto again; + } - rc = mdc_getattr_common(exp, req); - if (rc) - ptlrpc_req_finished(req); - else - *request = req; - RETURN(rc); + ptlrpc_req_finished(req); + } else { + *request = req; + } + + RETURN(rc); } static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { - struct ptlrpc_request *req; - int rc; - ENTRY; + struct ptlrpc_request *req; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD; + int rc; + ENTRY; - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_MDS_GETATTR_NAME); + *request = NULL; + req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR_NAME); if (req == NULL) RETURN(-ENOMEM); @@ -252,9 +279,6 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } - mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, - op_data->op_mode, op_data->op_suppgids[0], 0); - if (op_data->op_name) { char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME); LASSERT(strnlen(op_data->op_name, op_data->op_namelen) == @@ -262,16 +286,30 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, memcpy(name, op_data->op_name, op_data->op_namelen); } - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - op_data->op_mode); - ptlrpc_request_set_replen(req); +again: + mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, + op_data->op_mode, op_data->op_suppgids[0], 0); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize); + ptlrpc_request_set_replen(req); - rc = mdc_getattr_common(exp, req); - if (rc) - ptlrpc_req_finished(req); - else - *request = req; - RETURN(rc); + rc = mdc_getattr_common(exp, req); + if (rc) { + if (rc == -ERANGE) { + acl_bufsize = min_t(__u32, + imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); + mdc_reset_acl_req(req); + goto again; + } + + ptlrpc_req_finished(req); + } else { + *request = req; + } + + RETURN(rc); } static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, @@ -291,21 +329,30 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, if (req == NULL) RETURN(-ENOMEM); - if (xattr_name) { - xattr_namelen = strlen(xattr_name) + 1; - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, - xattr_namelen); - } - if (input_size) { - LASSERT(input); - req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, - input_size); - } + if (xattr_name) { + xattr_namelen = strlen(xattr_name) + 1; + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + xattr_namelen); + } + if (input_size) + LASSERT(input); + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, + input_size); + + /* get SELinux policy info if any */ + rc = sptlrpc_get_sepol(req); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + req_capsule_set_size(&req->rq_pill, &RMF_SELINUX_POL, RCL_CLIENT, + strlen(req->rq_sepol) ? + strlen(req->rq_sepol) + 1 : 0); /* Flush local XATTR locks to get rid of a possible cancel RPC */ if (opcode == MDS_REINT && fid_is_sane(fid) && exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) { - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); int count; /* Without that packing would fail */ @@ -330,11 +377,11 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, } } - if (opcode == MDS_REINT) { - struct mdt_rec_setxattr *rec; + if (opcode == MDS_REINT) { + struct mdt_rec_setxattr *rec; - CLASSERT(sizeof(struct mdt_rec_setxattr) == - sizeof(struct mdt_rec_reint)); + BUILD_BUG_ON(sizeof(struct mdt_rec_setxattr) != + sizeof(struct mdt_rec_reint)); rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); rec->sx_opcode = REINT_SETXATTR; rec->sx_fsuid = from_kuid(&init_user_ns, current_fsuid()); @@ -360,6 +407,8 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, memcpy(tmp, input, input_size); } + mdc_file_sepol_pack(req); + if (req_capsule_has_field(&req->rq_pill, &RMF_EADATA, RCL_SERVER)) req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, output_size); @@ -367,12 +416,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); if (opcode == MDS_REINT) - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (rc) ptlrpc_req_finished(req); @@ -382,72 +431,86 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, } static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, - u64 valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, __u32 suppgid, - struct ptlrpc_request **request) + u64 obd_md_valid, const char *name, + const void *value, size_t value_size, + unsigned int xattr_flags, u32 suppgid, + struct ptlrpc_request **req) { + LASSERT(obd_md_valid == OBD_MD_FLXATTR || + obd_md_valid == OBD_MD_FLXATTRRM); + return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR, - fid, MDS_REINT, valid, xattr_name, - input, input_size, output_size, flags, - suppgid, request); + fid, MDS_REINT, obd_md_valid, name, + value, value_size, 0, xattr_flags, suppgid, + req); } static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, - u64 valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, struct ptlrpc_request **request) + u64 obd_md_valid, const char *name, size_t buf_size, + struct ptlrpc_request **req) { - return mdc_xattr_common(exp, &RQF_MDS_GETXATTR, - fid, MDS_GETXATTR, valid, xattr_name, - input, input_size, output_size, flags, - -1, request); -} + struct mdt_body *body; + int rc; -#ifdef CONFIG_FS_POSIX_ACL -static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) -{ - struct req_capsule *pill = &req->rq_pill; - struct mdt_body *body = md->body; - struct posix_acl *acl; - void *buf; - int rc; - ENTRY; + LASSERT(obd_md_valid == OBD_MD_FLXATTR || + obd_md_valid == OBD_MD_FLXATTRLS); - if (!body->mbo_aclsize) - RETURN(0); + /* Message below is checked in sanity-selinux test_20d + * and sanity-sec test_49 + */ + CDEBUG(D_INFO, "%s: get xattr '%s' for "DFID"\n", + exp->exp_obd->obd_name, name, PFID(fid)); + rc = mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, MDS_GETXATTR, + obd_md_valid, name, NULL, 0, buf_size, 0, -1, + req); + if (rc < 0) + GOTO(out, rc); - buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->mbo_aclsize); + body = req_capsule_server_get(&(*req)->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); - if (!buf) - RETURN(-EPROTO); + /* only detect the xattr size */ + if (buf_size == 0) { + /* LU-11109: Older MDTs do not distinguish + * between nonexistent xattrs and zero length + * values in this case. Newer MDTs will return + * -ENODATA or set OBD_MD_FLXATTR. */ + GOTO(out, rc = body->mbo_eadatasize); + } - acl = posix_acl_from_xattr(&init_user_ns, buf, body->mbo_aclsize); - if (acl == NULL) - RETURN(0); - if (IS_ERR(acl)) { - rc = PTR_ERR(acl); - CERROR("convert xattr to acl: %d\n", rc); - RETURN(rc); - } + if (body->mbo_eadatasize == 0) { + /* LU-11109: Newer MDTs set OBD_MD_FLXATTR on + * success so that we can distinguish between + * zero length value and nonexistent xattr. + * + * If OBD_MD_FLXATTR is not set then we keep + * the old behavior and return -ENODATA for + * getxattr() when mbo_eadatasize is 0. But + * -ENODATA only makes sense for getxattr() + * and not for listxattr(). */ + if (body->mbo_valid & OBD_MD_FLXATTR) + GOTO(out, rc = 0); + else if (obd_md_valid == OBD_MD_FLXATTR) + GOTO(out, rc = -ENODATA); + else + GOTO(out, rc = 0); + } - rc = posix_acl_valid(&init_user_ns, acl); - if (rc) { - CERROR("validate acl: %d\n", rc); - posix_acl_release(acl); - RETURN(rc); - } + GOTO(out, rc = body->mbo_eadatasize); +out: + if (rc < 0) { + ptlrpc_req_finished(*req); + *req = NULL; + } - md->posix_acl = acl; - RETURN(0); + return rc; } -#else -#define mdc_unpack_acl(req, md) 0 -#endif -int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, - struct obd_export *dt_exp, struct obd_export *md_exp, - struct lustre_md *md) +static int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, + struct obd_export *dt_exp, + struct obd_export *md_exp, + struct lustre_md *md) { struct req_capsule *pill = &req->rq_pill; int rc; @@ -488,14 +551,14 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, GOTO(out, rc = -EPROTO); } - lmv_size = md->body->mbo_eadatasize; - if (lmv_size == 0) { - CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, " - "but eadatasize 0\n"); - RETURN(-EPROTO); - } - if (md->body->mbo_valid & OBD_MD_MEA) { + lmv_size = md->body->mbo_eadatasize; + if (lmv_size == 0) { + CDEBUG(D_INFO, "OBD_MD_FLDIREA is set, " + "but eadatasize 0\n"); + RETURN(-EPROTO); + } + lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, lmv_size); if (lmv == NULL) @@ -505,94 +568,130 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, if (rc < 0) GOTO(out, rc); - if (rc < (typeof(rc))sizeof(*md->lmv)) { - CDEBUG(D_INFO, "size too small: " - "rc < sizeof(*md->lmv) (%d < %d)\n", - rc, (int)sizeof(*md->lmv)); + if (rc < (int)sizeof(*md->lmv)) { + struct lmv_foreign_md *lfm = md->lfm; + + /* short (< sizeof(struct lmv_stripe_md)) + * foreign LMV case + */ + if (lfm->lfm_magic != LMV_MAGIC_FOREIGN) { + CDEBUG(D_INFO, + "lmv size too small: %d < %d\n", + rc, (int)sizeof(*md->lmv)); + GOTO(out, rc = -EPROTO); + } + } + } + + /* since 2.12.58 intent_getattr fetches default LMV */ + if (md->body->mbo_valid & OBD_MD_DEFAULT_MEA) { + lmv_size = sizeof(struct lmv_user_md); + lmv = req_capsule_server_sized_get(pill, + &RMF_DEFAULT_MDT_MD, + lmv_size); + if (!lmv) + GOTO(out, rc = -EPROTO); + + rc = md_unpackmd(md_exp, &md->default_lmv, lmv, + lmv_size); + if (rc < 0) + GOTO(out, rc); + + if (rc < (int)sizeof(*md->default_lmv)) { + CDEBUG(D_INFO, + "default lmv size too small: %d < %d\n", + rc, (int)sizeof(*md->default_lmv)); GOTO(out, rc = -EPROTO); } } - } - rc = 0; + } + rc = 0; if (md->body->mbo_valid & OBD_MD_FLACL) { /* for ACL, it's possible that FLACL is set but aclsize is zero. * only when aclsize != 0 there's an actual segment for ACL * in reply buffer. */ - if (md->body->mbo_aclsize) { - rc = mdc_unpack_acl(req, md); - if (rc) - GOTO(out, rc); -#ifdef CONFIG_FS_POSIX_ACL - } else { - md->posix_acl = NULL; -#endif - } - } + rc = mdc_unpack_acl(req, md); + if (rc) + GOTO(out, rc); + } - EXIT; + EXIT; out: - if (rc) { -#ifdef CONFIG_FS_POSIX_ACL - posix_acl_release(md->posix_acl); -#endif - } - return rc; + if (rc) + lmd_clear_acl(md); + + return rc; } -int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) +static int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) { - ENTRY; - RETURN(0); + ENTRY; + RETURN(0); } void mdc_replay_open(struct ptlrpc_request *req) { - struct md_open_data *mod = req->rq_cb_data; - struct ptlrpc_request *close_req; - struct obd_client_handle *och; - struct lustre_handle old; - struct mdt_body *body; - ENTRY; + struct md_open_data *mod = req->rq_cb_data; + struct ptlrpc_request *close_req; + struct obd_client_handle *och; + struct lustre_handle old_open_handle = { }; + struct mdt_body *body; + struct ldlm_reply *rep; + ENTRY; - if (mod == NULL) { - DEBUG_REQ(D_ERROR, req, - "Can't properly replay without open data."); - EXIT; - return; - } + if (mod == NULL) { + DEBUG_REQ(D_ERROR, req, + "cannot properly replay without open data"); + EXIT; + return; + } - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - LASSERT(body != NULL); + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + LASSERT(body != NULL); + + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + if (rep != NULL && rep->lock_policy_res2 != 0) + DEBUG_REQ(D_ERROR, req, "Open request replay failed with %ld ", + (long int)rep->lock_policy_res2); + + spin_lock(&req->rq_lock); + och = mod->mod_och; + if (och && och->och_open_handle.cookie) + req->rq_early_free_repbuf = 1; + else + req->rq_early_free_repbuf = 0; + spin_unlock(&req->rq_lock); - och = mod->mod_och; - if (och != NULL) { - struct lustre_handle *file_fh; + if (req->rq_early_free_repbuf) { + struct lustre_handle *file_open_handle; - LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); + LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); - file_fh = &och->och_fh; + file_open_handle = &och->och_open_handle; CDEBUG(D_HA, "updating handle from %#llx to %#llx\n", - file_fh->cookie, body->mbo_handle.cookie); - old = *file_fh; - *file_fh = body->mbo_handle; - } - close_req = mod->mod_close_req; - if (close_req != NULL) { - __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); - struct mdt_ioepoch *epoch; + file_open_handle->cookie, body->mbo_open_handle.cookie); + old_open_handle = *file_open_handle; + *file_open_handle = body->mbo_open_handle; + } + + close_req = mod->mod_close_req; + if (close_req) { + __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); + struct mdt_ioepoch *epoch; LASSERT(opc == MDS_CLOSE); epoch = req_capsule_client_get(&close_req->rq_pill, &RMF_MDT_EPOCH); LASSERT(epoch); - if (och != NULL) - LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old))); + if (req->rq_early_free_repbuf) + LASSERT(old_open_handle.cookie == + epoch->mio_open_handle.cookie); DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); - epoch->mio_handle = body->mbo_handle; + epoch->mio_open_handle = body->mbo_open_handle; } EXIT; } @@ -646,14 +745,14 @@ int mdc_set_open_replay_data(struct obd_export *exp, /* Outgoing messages always in my byte order. */ LASSERT(body != NULL); - /* Only if the import is replayable, we set replay_open data */ - if (och && imp->imp_replayable) { - mod = obd_mod_alloc(); - if (mod == NULL) { - DEBUG_REQ(D_ERROR, open_req, - "Can't allocate md_open_data"); - RETURN(0); - } + /* Only if the import is replayable, we set replay_open data */ + if (och && imp->imp_replayable) { + mod = obd_mod_alloc(); + if (mod == NULL) { + DEBUG_REQ(D_ERROR, open_req, + "cannot allocate md_open_data"); + RETURN(0); + } /** * Take a reference on \var mod, to be freed on mdc_close(). @@ -672,21 +771,22 @@ int mdc_set_open_replay_data(struct obd_export *exp, mod->mod_open_req = open_req; open_req->rq_cb_data = mod; open_req->rq_commit_cb = mdc_commit_open; + open_req->rq_early_free_repbuf = 1; spin_unlock(&open_req->rq_lock); - } + } rec->cr_fid2 = body->mbo_fid1; - rec->cr_ioepoch = body->mbo_ioepoch; - rec->cr_old_handle.cookie = body->mbo_handle.cookie; + rec->cr_open_handle_old = body->mbo_open_handle; open_req->rq_replay_cb = mdc_replay_open; if (!fid_is_sane(&body->mbo_fid1)) { - DEBUG_REQ(D_ERROR, open_req, "Saving replay request with " - "insane fid"); - LBUG(); - } + DEBUG_REQ(D_ERROR, open_req, + "saving replay request with insane FID " DFID, + PFID(&body->mbo_fid1)); + LBUG(); + } - DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data"); - RETURN(0); + DEBUG_REQ(D_RPCTRACE, open_req, "Set up open replay data"); + RETURN(0); } static void mdc_free_open(struct md_open_data *mod) @@ -704,36 +804,43 @@ static void mdc_free_open(struct md_open_data *mod) * The worst thing is eviction if the client gets open lock **/ - DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request rq_replay" - "= %d\n", mod->mod_open_req->rq_replay); + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, + "free open request, rq_replay=%d", + mod->mod_open_req->rq_replay); ptlrpc_request_committed(mod->mod_open_req, committed); if (mod->mod_close_req) ptlrpc_request_committed(mod->mod_close_req, committed); } -int mdc_clear_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och) +static int mdc_clear_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och) { - struct md_open_data *mod = och->och_mod; - ENTRY; + struct md_open_data *mod = och->och_mod; + ENTRY; - /** - * It is possible to not have \var mod in a case of eviction between - * lookup and ll_file_open(). - **/ - if (mod == NULL) - RETURN(0); + /** + * It is possible to not have \var mod in a case of eviction between + * lookup and ll_file_open(). + **/ + if (mod == NULL) + RETURN(0); - LASSERT(mod != LP_POISON); + LASSERT(mod != LP_POISON); LASSERT(mod->mod_open_req != NULL); + + spin_lock(&mod->mod_open_req->rq_lock); + if (mod->mod_och) + mod->mod_och->och_open_handle.cookie = 0; + mod->mod_open_req->rq_early_free_repbuf = 0; + spin_unlock(&mod->mod_open_req->rq_lock); mdc_free_open(mod); - mod->mod_och = NULL; - och->och_mod = NULL; - obd_mod_put(mod); + mod->mod_och = NULL; + och->och_mod = NULL; + obd_mod_put(mod); - RETURN(0); + RETURN(0); } static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, @@ -742,23 +849,35 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; struct req_format *req_fmt; + size_t u32_count = 0; int rc; int saved_rc = 0; ENTRY; - if (op_data->op_bias & MDS_HSM_RELEASE) { - req_fmt = &RQF_MDS_INTENT_CLOSE; + CDEBUG(D_INODE, "%s: "DFID" file closed with intent: %x\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + op_data->op_bias); + + if (op_data->op_bias & MDS_CLOSE_INTENT) { + req_fmt = &RQF_MDS_CLOSE_INTENT; + if (op_data->op_bias & MDS_HSM_RELEASE) { + /* allocate a FID for volatile file */ + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, + op_data); + if (rc < 0) { + CERROR("%s: "DFID" allocating FID: rc = %d\n", + obd->obd_name, PFID(&op_data->op_fid1), + rc); + /* save the errcode and proceed to close */ + saved_rc = rc; + } + } + if (op_data->op_bias & MDS_CLOSE_RESYNC_DONE) { + size_t count = op_data->op_data_size / sizeof(__u32); - /* allocate a FID for volatile file */ - rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); - if (rc < 0) { - CERROR("%s: "DFID" failed to allocate FID: %d\n", - obd->obd_name, PFID(&op_data->op_fid1), rc); - /* save the errcode and proceed to close */ - saved_rc = rc; + if (count > INLINE_RESYNC_ARRAY_SIZE) + u32_count = count; } - } else if (op_data->op_bias & MDS_CLOSE_LAYOUT_SWAP) { - req_fmt = &RQF_MDS_INTENT_CLOSE; } else { req_fmt = &RQF_MDS_CLOSE; } @@ -777,7 +896,7 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, mod->mod_close_req = req; - DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open"); /* We no longer want to preserve this open for replay even * though the open was committed. b=3632, b=3633 */ spin_lock(&mod->mod_open_req->rq_lock); @@ -796,6 +915,10 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, GOTO(out, rc = -ENOMEM); } + if (u32_count > 0) + req_capsule_set_size(&req->rq_pill, &RMF_U32, RCL_CLIENT, + u32_count * sizeof(__u32)); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_CLOSE); if (rc) { ptlrpc_request_free(req); @@ -809,6 +932,9 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); + if (!(exp_connect_flags2(exp) & OBD_CONNECT2_LSOM)) + op_data->op_xvalid &= ~(OP_XVALID_LAZYSIZE | + OP_XVALID_LAZYBLOCKS); mdc_close_pack(req, op_data); @@ -817,41 +943,41 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); - - if (req->rq_repmsg == NULL) { - CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, - req->rq_status); - if (rc == 0) - rc = req->rq_status ?: -EIO; - } else if (rc == 0 || rc == -EAGAIN) { - struct mdt_body *body; - - rc = lustre_msg_get_status(req->rq_repmsg); - if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " - "= %d", rc); - if (rc > 0) - rc = -rc; - } - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (body == NULL) - rc = -EPROTO; - } else if (rc == -ESTALE) { - /** - * it can be allowed error after 3633 if open was committed and - * server failed before close was sent. Let's check if mod - * exists and return no error in that case - */ - if (mod) { - DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); - LASSERT(mod->mod_open_req != NULL); - if (mod->mod_open_req->rq_committed) - rc = 0; - } - } + ptlrpc_put_mod_rpc_slot(req); + + if (req->rq_repmsg == NULL) { + CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req, + req->rq_status); + if (rc == 0) + rc = req->rq_status ?: -EIO; + } else if (rc == 0 || rc == -EAGAIN) { + struct mdt_body *body; + + rc = lustre_msg_get_status(req->rq_repmsg); + if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, + "type = PTL_RPC_MSG_ERR: rc = %d", rc); + if (rc > 0) + rc = -rc; + } + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + rc = -EPROTO; + } else if (rc == -ESTALE) { + /** + * it can be allowed error after 3633 if open was committed and + * server failed before close was sent. Let's check if mod + * exists and return no error in that case + */ + if (mod) { + DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; + } + } out: if (mod) { @@ -873,14 +999,11 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; int i; - wait_queue_head_t waitq; int resends = 0; - struct l_wait_info lwi; int rc; ENTRY; *request = NULL; - init_waitqueue_head(&waitq); restart_bulk: req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); @@ -897,11 +1020,11 @@ restart_bulk: ptlrpc_at_set_req_timeout(req); desc = ptlrpc_prep_bulk_imp(req, npages, 1, - PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV, + PTLRPC_BULK_PUT_SINK, MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) { - ptlrpc_request_free(req); + ptlrpc_req_finished(req); RETURN(-ENOMEM); } @@ -925,9 +1048,12 @@ restart_bulk: exp->exp_obd->obd_name, -EIO); RETURN(-EIO); } - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, - NULL); - l_wait_event(waitq, 0, &lwi); + + /* If a signal interrupts then the timeout returned will + * not be zero. In that case return -EINTR + */ + if (msleep_interruptible(resends * 1000)) + RETURN(-EINTR); goto restart_bulk; } @@ -956,7 +1082,7 @@ static void mdc_release_page(struct page *page, int remove) if (remove) { lock_page(page); if (likely(page->mapping != NULL)) - truncate_complete_page(page->mapping, page); + delete_from_page_cache(page); unlock_page(page); } put_page(page); @@ -972,16 +1098,17 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, */ unsigned long offset = hash_x_index(*hash, hash64); struct page *page; + unsigned long flags; int found; - spin_lock_irq(&mapping->tree_lock); + ll_xa_lock_irqsave(&mapping->i_pages, flags); found = radix_tree_gang_lookup(&mapping->page_tree, (void **)&page, offset, 1); - if (found > 0 && !radix_tree_exceptional_entry(page)) { + if (found > 0 && !ll_xa_is_value(page)) { struct lu_dirpage *dp; get_page(page); - spin_unlock_irq(&mapping->tree_lock); + ll_xa_unlock_irqrestore(&mapping->i_pages, flags); /* * In contrast to find_lock_page() we are sure that directory * page cannot be truncated (while DLM lock is held) and, @@ -1030,7 +1157,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, page = ERR_PTR(-EIO); } } else { - spin_unlock_irq(&mapping->tree_lock); + ll_xa_unlock_irqrestore(&mapping->i_pages, flags); page = NULL; } return page; @@ -1092,12 +1219,12 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) int i; for (i = 0; i < cfs_pgs; i++) { - struct lu_dirpage *dp = kmap(pages[i]); - struct lu_dirpage *first = dp; - struct lu_dirent *end_dirent = NULL; - struct lu_dirent *ent; - __u64 hash_end = le64_to_cpu(dp->ldp_hash_end); - __u32 flags = le32_to_cpu(dp->ldp_flags); + struct lu_dirpage *dp = kmap(pages[i]); + struct lu_dirpage *first = dp; + struct lu_dirent *end_dirent = NULL; + struct lu_dirent *ent; + __u64 hash_end = dp->ldp_hash_end; + __u32 flags = dp->ldp_flags; while (--lu_pgs > 0) { ent = lu_dirent_start(dp); @@ -1112,8 +1239,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) break; /* Save the hash and flags of this lu_dirpage. */ - hash_end = le64_to_cpu(dp->ldp_hash_end); - flags = le32_to_cpu(dp->ldp_flags); + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; /* Check if lu_dirpage contains no entries. */ if (end_dirent == NULL) @@ -1148,14 +1275,6 @@ struct readpage_param { struct md_callback *rp_cb; }; -#ifndef HAVE_DELETE_FROM_PAGE_CACHE -static inline void delete_from_page_cache(struct page *page) -{ - remove_from_page_cache(page); - put_page(page); -} -#endif - /** * Read pages from server. * @@ -1168,27 +1287,27 @@ static inline void delete_from_page_cache(struct page *page) **/ static int mdc_read_page_remote(void *data, struct page *page0) { - struct readpage_param *rp = data; - struct page **page_pool; - struct page *page; - struct lu_dirpage *dp; - int rd_pgs = 0; /* number of pages read actually */ - int npages; - struct md_op_data *op_data = rp->rp_mod; - struct ptlrpc_request *req; - int max_pages = op_data->op_max_pages; - struct inode *inode; - struct lu_fid *fid; - int i; - int rc; + struct readpage_param *rp = data; + struct page **page_pool; + struct page *page; + struct lu_dirpage *dp; + struct md_op_data *op_data = rp->rp_mod; + struct ptlrpc_request *req; + int max_pages; + struct inode *inode; + struct lu_fid *fid; + int rd_pgs = 0; /* number of pages actually read */ + int npages; + int i; + int rc; ENTRY; - LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES); + max_pages = rp->rp_exp->exp_obd->u.cli.cl_max_pages_per_rpc; inode = op_data->op_data; fid = &op_data->op_fid1; LASSERT(inode != NULL); - OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages); + OBD_ALLOC_PTR_ARRAY(page_pool, max_pages); if (page_pool != NULL) { page_pool[0] = page0; } else { @@ -1197,7 +1316,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) } for (npages = 1; npages < max_pages; npages++) { - page = page_cache_alloc_cold(inode->i_mapping); + page = page_cache_alloc(inode->i_mapping); if (page == NULL) break; page_pool[npages] = page; @@ -1210,10 +1329,9 @@ static int mdc_read_page_remote(void *data, struct page *page0) } else { int lu_pgs; - rd_pgs = (req->rq_bulk->bd_nob_transferred + - PAGE_SIZE - 1) >> PAGE_SHIFT; - lu_pgs = req->rq_bulk->bd_nob_transferred >> - LU_PAGE_SHIFT; + rd_pgs = (req->rq_bulk->bd_nob_transferred + PAGE_SIZE - 1) >> + PAGE_SHIFT; + lu_pgs = req->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT; LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs); @@ -1258,7 +1376,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) } if (page_pool != &page0) - OBD_FREE(page_pool, sizeof(page_pool[0]) * max_pages); + OBD_FREE_PTR_ARRAY(page_pool, max_pages); RETURN(rc); } @@ -1411,33 +1529,95 @@ fail: goto out_unlock; } +static int mdc_statfs_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *args, int rc) +{ + struct obd_info *oinfo = args; + struct obd_statfs *osfs; + + if (!rc) { + osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (!osfs) + return -EPROTO; + + oinfo->oi_osfs = osfs; + + CDEBUG(D_CACHE, "blocks=%llu free=%llu avail=%llu " + "objects=%llu free=%llu state=%x\n", + osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, + osfs->os_files, osfs->os_ffree, osfs->os_state); + } + + oinfo->oi_cb_up(oinfo, rc); + + return rc; +} + +static int mdc_statfs_async(struct obd_export *exp, + struct obd_info *oinfo, time64_t max_age, + struct ptlrpc_request_set *unused) +{ + struct ptlrpc_request *req; + struct obd_info *aa; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_STATFS, + LUSTRE_MDS_VERSION, MDS_STATFS); + if (req == NULL) + return -ENOMEM; + + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = mdc_statfs_interpret; + + aa = ptlrpc_req_async_args(aa, req); + *aa = *oinfo; + + ptlrpcd_add_req(req); + + return 0; +} static int mdc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) + time64_t max_age, __u32 flags) { - struct obd_device *obd = class_exp2obd(exp); - struct ptlrpc_request *req; - struct obd_statfs *msfs; - struct obd_import *imp = NULL; - int rc; - ENTRY; + struct obd_device *obd = class_exp2obd(exp); + struct req_format *fmt; + struct ptlrpc_request *req; + struct obd_statfs *msfs; + struct obd_import *imp = NULL; + int rc; + ENTRY; /* * Since the request might also come from lprocfs, so we need * sync this with client_disconnect_export Bug15684 */ down_read(&obd->u.cli.cl_sem); - if (obd->u.cli.cl_import) - imp = class_import_get(obd->u.cli.cl_import); + if (obd->u.cli.cl_import) + imp = class_import_get(obd->u.cli.cl_import); up_read(&obd->u.cli.cl_sem); - if (!imp) - RETURN(-ENODEV); + if (!imp) + RETURN(-ENODEV); + + fmt = &RQF_MDS_STATFS; + if ((exp_connect_flags2(exp) & OBD_CONNECT2_SUM_STATFS) && + (flags & OBD_STATFS_SUM)) + fmt = &RQF_MDS_STATFS_NEW; + req = ptlrpc_request_alloc_pack(imp, fmt, LUSTRE_MDS_VERSION, + MDS_STATFS); + if (req == NULL) + GOTO(output, rc = -ENOMEM); - req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS, - LUSTRE_MDS_VERSION, MDS_STATFS); - if (req == NULL) - GOTO(output, rc = -ENOMEM); + if ((flags & OBD_STATFS_SUM) && + (exp_connect_flags2(exp) & OBD_CONNECT2_SUM_STATFS)) { + /* request aggregated states */ + struct mdt_body *body; + + body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + body->mbo_valid = OBD_MD_FLAGSTATFS; + } ptlrpc_request_set_replen(req); @@ -1544,40 +1724,65 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: ptlrpc_req_finished(req); return rc; } - -static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives) +/** + * Send hsm_ct_register to MDS + * + * \param[in] imp import + * \param[in] archive_count if in bitmap format, it is the bitmap, + * else it is the count of archive_ids + * \param[in] archives if in bitmap format, it is NULL, + * else it is archive_id lists + */ +static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archive_count, + __u32 *archives) { - __u32 *archive_mask; - struct ptlrpc_request *req; - int rc; + struct ptlrpc_request *req; + __u32 *archive_array; + size_t archives_size; + int rc; ENTRY; - req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER, - LUSTRE_MDS_VERSION, - MDS_HSM_CT_REGISTER); + req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_CT_REGISTER); if (req == NULL) - GOTO(out, rc = -ENOMEM); + RETURN(-ENOMEM); + + if (archives != NULL) + archives_size = sizeof(*archive_array) * archive_count; + else + archives_size = sizeof(archive_count); + + req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_ARCHIVE, + RCL_CLIENT, archives_size); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_CT_REGISTER); + if (rc) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } mdc_pack_body(req, NULL, 0, 0, -1, 0); - /* Copy hsm_progress struct */ - archive_mask = req_capsule_client_get(&req->rq_pill, - &RMF_MDS_HSM_ARCHIVE); - if (archive_mask == NULL) + archive_array = req_capsule_client_get(&req->rq_pill, + &RMF_MDS_HSM_ARCHIVE); + if (archive_array == NULL) GOTO(out, rc = -EPROTO); - *archive_mask = archives; + if (archives != NULL) + memcpy(archive_array, archives, archives_size); + else + *archive_array = archive_count; ptlrpc_request_set_replen(req); + req->rq_no_resend = 1; rc = mdc_queue_wait(req); GOTO(out, rc); @@ -1723,9 +1928,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: @@ -1783,9 +1988,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); @@ -1794,215 +1999,66 @@ out: return rc; } -static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags) -{ - struct kuc_hdr *lh = (struct kuc_hdr *)buf; - - LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE); - - lh->kuc_magic = KUC_MAGIC; - lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; - lh->kuc_flags = flags; - lh->kuc_msgtype = CL_RECORD; - lh->kuc_msglen = len; - return lh; -} - -struct changelog_show { - __u64 cs_startrec; - enum changelog_send_flag cs_flags; - struct file *cs_fp; - char *cs_buf; - struct obd_device *cs_obd; -}; - -static inline char *cs_obd_name(struct changelog_show *cs) -{ - return cs->cs_obd->obd_name; -} +static int mdc_ioc_hsm_ct_start(struct obd_export *exp, + struct lustre_kernelcomm *lk); -static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, - struct llog_rec_hdr *hdr, void *data) +static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) { - struct changelog_show *cs = data; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - struct kuc_hdr *lh; - size_t len; - int rc; + struct ptlrpc_request *req; + struct obd_quotactl *oqc; + int rc; ENTRY; - if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { - rc = -EINVAL; - CERROR("%s: not a changelog rec %x/%d: rc = %d\n", - cs_obd_name(cs), rec->cr_hdr.lrh_type, - rec->cr.cr_type, rc); - RETURN(rc); - } - - if (rec->cr.cr_index < cs->cs_startrec) { - /* Skip entries earlier than what we are interested in */ - CDEBUG(D_HSM, "rec=%llu start=%llu\n", - rec->cr.cr_index, cs->cs_startrec); - RETURN(0); - } - - CDEBUG(D_HSM, "%llu %02d%-5s %llu 0x%x t="DFID" p="DFID" %.*s\n", - rec->cr.cr_index, rec->cr.cr_type, - changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, - rec->cr.cr_flags & CLF_FLAGMASK, - PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), - rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); - - len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; - - /* Set up the message */ - lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags); - memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); - - rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); - CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc); - - RETURN(rc); -} - -static int mdc_changelog_send_thread(void *csdata) -{ - struct changelog_show *cs = csdata; - struct llog_ctxt *ctxt = NULL; - struct llog_handle *llh = NULL; - struct kuc_hdr *kuch; - enum llog_flag flags = LLOG_F_IS_CAT; - int rc; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_QUOTACTL); + if (req == NULL) + RETURN(-ENOMEM); - CDEBUG(D_HSM, "changelog to fp=%p start %llu\n", - cs->cs_fp, cs->cs_startrec); - OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - if (cs->cs_buf == NULL) - GOTO(out, rc = -ENOMEM); + if (LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd)) + req_capsule_set_size(&req->rq_pill, + &RMF_OBD_QUOTACTL, + RCL_CLIENT, + sizeof(*oqc) + LOV_MAXPOOLNAME + 1); - /* Set up the remote catalog handle */ - ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt == NULL) - GOTO(out, rc = -ENOENT); - rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG, - LLOG_OPEN_EXISTS); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, + MDS_QUOTACTL); if (rc) { - CERROR("%s: fail to open changelog catalog: rc = %d\n", - cs_obd_name(cs), rc); - GOTO(out, rc); + ptlrpc_request_free(req); + RETURN(rc); } - if (cs->cs_flags & CHANGELOG_FLAG_JOBID) - flags |= LLOG_F_EXT_JOBID; + oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + QCTL_COPY(oqc, oqctl); - rc = llog_init_handle(NULL, llh, flags, NULL); + ptlrpc_request_set_replen(req); + ptlrpc_at_set_req_timeout(req); + + rc = ptlrpc_queue_wait(req); if (rc) { - CERROR("llog_init_handle failed %d\n", rc); + CERROR("%s: ptlrpc_queue_wait failed: rc = %d\n", + exp->exp_obd->obd_name, rc); GOTO(out, rc); } - rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); - - /* Send EOF no matter what our result */ - kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags); - kuch->kuc_msgtype = CL_EOF; - libcfs_kkuc_msg_put(cs->cs_fp, kuch); - -out: - fput(cs->cs_fp); - if (llh) - llog_cat_close(NULL, llh); - if (ctxt) - llog_ctxt_put(ctxt); - if (cs->cs_buf) - OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); - OBD_FREE_PTR(cs); - return rc; -} - -static int mdc_ioc_changelog_send(struct obd_device *obd, - struct ioc_changelog *icc) -{ - struct changelog_show *cs; - struct task_struct *task; - int rc; - - /* Freed in mdc_changelog_send_thread */ - OBD_ALLOC_PTR(cs); - if (!cs) - return -ENOMEM; - - cs->cs_obd = obd; - cs->cs_startrec = icc->icc_recno; - /* matching fput in mdc_changelog_send_thread */ - cs->cs_fp = fget(icc->icc_id); - cs->cs_flags = icc->icc_flags; - - /* - * New thread because we should return to user app before - * writing into our pipe - */ - task = kthread_run(mdc_changelog_send_thread, cs, - "mdc_clg_send_thread"); - if (IS_ERR(task)) { - rc = PTR_ERR(task); - CERROR("%s: cannot start changelog thread: rc = %d\n", - cs_obd_name(cs), rc); - OBD_FREE_PTR(cs); - } else { - rc = 0; - CDEBUG(D_HSM, "%s: started changelog thread\n", - cs_obd_name(cs)); + if (req->rq_repmsg && + (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { + QCTL_COPY(oqctl, oqc); + } else if (!rc) { + rc = -EPROTO; + CERROR("%s: cannot unpack obd_quotactl: rc = %d\n", + exp->exp_obd->obd_name, rc); } +out: + ptlrpc_req_finished(req); - return rc; -} - -static int mdc_ioc_hsm_ct_start(struct obd_export *exp, - struct lustre_kernelcomm *lk); - -static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, - struct obd_quotactl *oqctl) -{ - struct ptlrpc_request *req; - struct obd_quotactl *oqc; - int rc; - ENTRY; - - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), - &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION, - MDS_QUOTACTL); - if (req == NULL) - RETURN(-ENOMEM); - - oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); - *oqc = *oqctl; - - ptlrpc_request_set_replen(req); - ptlrpc_at_set_req_timeout(req); - req->rq_no_resend = 1; - - rc = ptlrpc_queue_wait(req); - if (rc) - CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); - - if (req->rq_repmsg && - (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { - *oqctl = *oqc; - } else if (!rc) { - CERROR ("Can't unpack obd_quotactl\n"); - rc = -EPROTO; - } - ptlrpc_req_finished(req); - - RETURN(rc); + RETURN(rc); } static int mdc_ioc_swap_layouts(struct obd_export *exp, struct md_op_data *op_data) { - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); struct ptlrpc_request *req; int rc, count; struct mdc_swap_layouts *msl, *payload; @@ -2058,30 +2114,18 @@ out: static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void __user *uarg) { - struct obd_device *obd = exp->exp_obd; - struct obd_ioctl_data *data = karg; - struct obd_import *imp = obd->u.cli.cl_import; - int rc; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct obd_ioctl_data *data = karg; + struct obd_import *imp = obd->u.cli.cl_import; + int rc; + ENTRY; if (!try_module_get(THIS_MODULE)) { CERROR("%s: cannot get module '%s'\n", obd->obd_name, module_name(THIS_MODULE)); return -EINVAL; } - switch (cmd) { - case OBD_IOC_CHANGELOG_SEND: - rc = mdc_ioc_changelog_send(obd, karg); - GOTO(out, rc); - case OBD_IOC_CHANGELOG_CLEAR: { - struct ioc_changelog *icc = karg; - struct changelog_setinfo cs = - {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id}; - rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR), - KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, - NULL); - GOTO(out, rc); - } + switch (cmd) { case OBD_IOC_FID2PATH: rc = mdc_ioc_fid2path(exp, karg); GOTO(out, rc); @@ -2106,49 +2150,46 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case LL_IOC_HSM_REQUEST: rc = mdc_ioc_hsm_request(exp, karg); GOTO(out, rc); - case OBD_IOC_CLIENT_RECOVER: - rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); - if (rc < 0) - GOTO(out, rc); - GOTO(out, rc = 0); - case IOC_OSC_SET_ACTIVE: - rc = ptlrpc_set_import_active(imp, data->ioc_offset); - GOTO(out, rc); - case OBD_IOC_PING_TARGET: - rc = ptlrpc_obd_ping(obd); - GOTO(out, rc); - /* - * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by - * LMV instead of MDC. But when the cluster is upgraded from 1.8, - * there'd be no LMV layer thus we might be called here. Eventually - * this code should be removed. - * bz20731, LU-592. - */ - case IOC_OBD_STATFS: { - struct obd_statfs stat_buf = {0}; + case OBD_IOC_CLIENT_RECOVER: + rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); + if (rc < 0) + GOTO(out, rc); + GOTO(out, rc = 0); + case IOC_OSC_SET_ACTIVE: + rc = ptlrpc_set_import_active(imp, data->ioc_offset); + GOTO(out, rc); + /* + * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by + * LMV instead of MDC. But when the cluster is upgraded from 1.8, + * there'd be no LMV layer thus we might be called here. Eventually + * this code should be removed. + * bz20731, LU-592. + */ + case IOC_OBD_STATFS: { + struct obd_statfs stat_buf = {0}; - if (*((__u32 *) data->ioc_inlbuf2) != 0) - GOTO(out, rc = -ENODEV); + if (*((__u32 *) data->ioc_inlbuf2) != 0) + GOTO(out, rc = -ENODEV); - /* copy UUID */ + /* copy UUID */ if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), min((int)data->ioc_plen2, (int)sizeof(struct obd_uuid)))) GOTO(out, rc = -EFAULT); rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf, - cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + ktime_get_seconds() - OBD_STATFS_CACHE_SECONDS, 0); if (rc != 0) GOTO(out, rc); if (copy_to_user(data->ioc_pbuf1, &stat_buf, - min((int) data->ioc_plen1, - (int) sizeof(stat_buf)))) - GOTO(out, rc = -EFAULT); + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) + GOTO(out, rc = -EFAULT); - GOTO(out, rc = 0); - } + GOTO(out, rc = 0); + } case OBD_IOC_QUOTACTL: { struct if_quotactl *qctl = karg; struct obd_quotactl *oqctl; @@ -2274,9 +2315,8 @@ static void lustre_swab_kuch(struct kuc_hdr *l) static int mdc_ioc_hsm_ct_start(struct obd_export *exp, struct lustre_kernelcomm *lk) { - struct obd_import *imp = class_exp2cliimp(exp); - __u32 archive = lk->lk_data; - int rc = 0; + struct obd_import *imp = class_exp2cliimp(exp); + int rc = 0; if (lk->lk_group != KUC_GRP_HSM) { CERROR("Bad copytool group %d\n", lk->lk_group); @@ -2290,7 +2330,12 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, /* Unregister with the coordinator */ rc = mdc_ioc_hsm_ct_unregister(imp); } else { - rc = mdc_ioc_hsm_ct_register(imp, archive); + __u32 *archives = NULL; + + if ((lk->lk_flags & LK_FLG_DATANR) && lk->lk_data_count > 0) + archives = lk->lk_data; + + rc = mdc_ioc_hsm_ct_register(imp, lk->lk_data_count, archives); } return rc; @@ -2301,7 +2346,8 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, * @param val KUC message (kuc_hdr + hsm_action_list) * @param len total length of message */ -static int mdc_hsm_copytool_send(size_t len, void *val) +static int mdc_hsm_copytool_send(const struct obd_uuid *uuid, + size_t len, void *val) { struct kuc_hdr *lh = (struct kuc_hdr *)val; struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); @@ -2327,7 +2373,7 @@ static int mdc_hsm_copytool_send(size_t len, void *val) lh->kuc_msglen, hal->hal_count, hal->hal_fsname); /* Broadcast to HSM listeners */ - rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); + rc = libcfs_kkuc_group_put(uuid, KUC_GRP_HSM, lh); RETURN(rc); } @@ -2340,20 +2386,29 @@ static int mdc_hsm_copytool_send(size_t len, void *val) */ static int mdc_hsm_ct_reregister(void *data, void *cb_arg) { - struct kkuc_ct_data *kcd = data; - struct obd_import *imp = (struct obd_import *)cb_arg; - int rc; + struct obd_import *imp = (struct obd_import *)cb_arg; + struct kkuc_ct_data *kcd = data; + __u32 *archives = NULL; + int rc; - if (kcd == NULL || kcd->kcd_magic != KKUC_CT_DATA_MAGIC) + if (kcd == NULL || + (kcd->kcd_magic != KKUC_CT_DATA_ARRAY_MAGIC && + kcd->kcd_magic != KKUC_CT_DATA_BITMAP_MAGIC)) return -EPROTO; - if (!obd_uuid_equals(&kcd->kcd_uuid, &imp->imp_obd->obd_uuid)) - return 0; - - CDEBUG(D_HA, "%s: recover copytool registration to MDT (archive=%#x)\n", - imp->imp_obd->obd_name, kcd->kcd_archive); - rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_archive); + if (kcd->kcd_magic == KKUC_CT_DATA_BITMAP_MAGIC) { + CDEBUG(D_HA, "%s: recover copytool registration to MDT " + "(archive=%#x)\n", imp->imp_obd->obd_name, + kcd->kcd_nr_archives); + } else { + CDEBUG(D_HA, "%s: recover copytool registration to MDT " + "(archive nr = %u)\n", + imp->imp_obd->obd_name, kcd->kcd_nr_archives); + if (kcd->kcd_nr_archives != 0) + archives = kcd->kcd_archives; + } + rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_nr_archives, archives); /* ignore error if the copytool is already registered */ return (rc == -EEXIST) ? 0 : rc; } @@ -2365,8 +2420,8 @@ static int mdc_hsm_ct_reregister(void *data, void *cb_arg) static int mdc_kuc_reregister(struct obd_import *imp) { /* re-register HSM agents */ - return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister, - (void *)imp); + return libcfs_kkuc_group_foreach(&imp->imp_obd->obd_uuid, KUC_GRP_HSM, + mdc_hsm_ct_reregister, imp); } static int mdc_set_info_async(const struct lu_env *env, @@ -2399,21 +2454,14 @@ static int mdc_set_info_async(const struct lu_env *env, keylen, key, vallen, val, set); RETURN(rc); } - if (KEY_IS(KEY_SPTLRPC_CONF)) { - sptlrpc_conf_client_adapt(exp->exp_obd); - RETURN(0); - } - if (KEY_IS(KEY_FLUSH_CTX)) { - sptlrpc_import_flush_my_ctx(imp); - RETURN(0); - } if (KEY_IS(KEY_CHANGELOG_CLEAR)) { rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, keylen, key, vallen, val, set); RETURN(rc); } if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) { - rc = mdc_hsm_copytool_send(vallen, val); + rc = mdc_hsm_copytool_send(&imp->imp_obd->obd_uuid, vallen, + val); RETURN(rc); } @@ -2424,8 +2472,8 @@ static int mdc_set_info_async(const struct lu_env *env, RETURN(0); } - CERROR("Unknown key %s\n", (char *)key); - RETURN(-EINVAL); + rc = osc_set_info_async(env, exp, keylen, key, vallen, val, set); + RETURN(rc); } static int mdc_get_info(const struct lu_env *env, struct obd_export *exp, @@ -2501,17 +2549,97 @@ static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } +struct mdc_rmfid_args { + int *mra_rcs; + int mra_nr; +}; + +int mdc_rmfid_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *args, int rc) +{ + struct mdc_rmfid_args *aa; + int *rcs, size; + ENTRY; + + if (!rc) { + aa = ptlrpc_req_async_args(aa, req); + + size = req_capsule_get_size(&req->rq_pill, &RMF_RCS, + RCL_SERVER); + LASSERT(size == sizeof(int) * aa->mra_nr); + rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS); + LASSERT(rcs); + LASSERT(aa->mra_rcs); + LASSERT(aa->mra_nr); + memcpy(aa->mra_rcs, rcs, size); + } + + RETURN(rc); +} + +static int mdc_rmfid(struct obd_export *exp, struct fid_array *fa, + int *rcs, struct ptlrpc_request_set *set) +{ + struct ptlrpc_request *req; + struct mdc_rmfid_args *aa; + struct mdt_body *b; + struct lu_fid *tmp; + int rc, flen; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_RMFID); + if (req == NULL) + RETURN(-ENOMEM); + + flen = fa->fa_nr * sizeof(struct lu_fid); + req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY, + RCL_CLIENT, flen); + req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY, + RCL_SERVER, flen); + req_capsule_set_size(&req->rq_pill, &RMF_RCS, + RCL_SERVER, fa->fa_nr * sizeof(__u32)); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_RMFID); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + tmp = req_capsule_client_get(&req->rq_pill, &RMF_FID_ARRAY); + memcpy(tmp, fa->fa_fids, flen); + + mdc_pack_body(req, NULL, 0, 0, -1, 0); + b = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY); + b->mbo_ctime = ktime_get_real_seconds(); + + ptlrpc_request_set_replen(req); + + LASSERT(rcs); + aa = ptlrpc_req_async_args(aa, req); + aa->mra_rcs = rcs; + aa->mra_nr = fa->fa_nr; + req->rq_interpret_reply = mdc_rmfid_interpret; + + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); + + RETURN(rc); +} + static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, enum obd_import_event event) { + struct client_obd *cli = &obd->u.cli; int rc = 0; LASSERT(imp->imp_obd == obd); switch (event) { - - case IMP_EVENT_INACTIVE: { - struct client_obd *cli = &obd->u.cli; + case IMP_EVENT_DISCON: + spin_lock(&cli->cl_loi_list_lock); + cli->cl_avail_grant = 0; + cli->cl_lost_grant = 0; + spin_unlock(&cli->cl_loi_list_lock); + break; + case IMP_EVENT_INACTIVE: /* * Flush current sequence to make client obtain new one * from server in case of disconnect/reconnect. @@ -2523,12 +2651,28 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE); break; - } case IMP_EVENT_INVALIDATE: { struct ldlm_namespace *ns = obd->obd_namespace; + struct lu_env *env; + __u16 refcheck; ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + env = cl_env_get(&refcheck); + if (!IS_ERR(env)) { + /* Reset grants. All pages go to failing rpcs due to + * the invalid import. + */ + osc_io_unplug(env, cli, NULL); + + cfs_hash_for_each_nolock(ns->ns_rs_hash, + osc_ldlm_resource_invalidate, + env, 0); + cl_env_put(env, &refcheck); + ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY); + } else { + rc = PTR_ERR(env); + } break; } case IMP_EVENT_ACTIVE: @@ -2537,10 +2681,15 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, if (rc == 0) rc = mdc_kuc_reregister(imp); break; - case IMP_EVENT_OCD: + case IMP_EVENT_OCD: { + struct obd_connect_data *ocd = &imp->imp_connect_data; + + if (OCD_HAS_FLAG(ocd, GRANT)) + osc_init_grant(cli, ocd); + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD); break; - case IMP_EVENT_DISCON: + } case IMP_EVENT_DEACTIVATE: case IMP_EVENT_ACTIVATE: break; @@ -2589,6 +2738,12 @@ static int mdc_cancel_weight(struct ldlm_lock *lock) if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) RETURN(0); + /* Special case for DoM locks, cancel only unused and granted locks */ + if (ldlm_has_dom(lock) && + (lock->l_granted_mode != lock->l_req_mode || + osc_ldlm_weigh_ast(lock) != 0)) + RETURN(0); + RETURN(1); } @@ -2637,25 +2792,21 @@ static void mdc_llog_finish(struct obd_device *obd) EXIT; } -static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) +int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { - int rc; + int rc; + ENTRY; - rc = ptlrpcd_addref(); + rc = osc_setup_common(obd, cfg); if (rc < 0) RETURN(rc); - rc = client_obd_setup(obd, cfg); - if (rc) - GOTO(err_ptlrpcd_decref, rc); -#ifdef CONFIG_PROC_FS - obd->obd_vars = lprocfs_mdc_obd_vars; - lprocfs_obd_setup(obd); - lprocfs_alloc_md_stats(obd, 0); -#endif - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); + rc = mdc_tunables_init(obd); + if (rc) + GOTO(err_osc_cleanup, rc); + + obd->u.cli.cl_dom_min_inline_repsize = MDC_DOM_DEF_INLINE_REPSIZE; ns_register_cancel(obd->obd_namespace, mdc_cancel_weight); @@ -2663,16 +2814,28 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) rc = mdc_llog_init(obd); if (rc) { - mdc_cleanup(obd); - CERROR("failed to setup llogging subsystems\n"); - RETURN(rc); + CERROR("%s: failed to setup llogging subsystems: rc = %d\n", + obd->obd_name, rc); + GOTO(err_llog_cleanup, rc); } - RETURN(rc); + rc = mdc_changelog_cdev_init(obd); + if (rc) { + CERROR("%s: failed to setup changelog char device: rc = %d\n", + obd->obd_name, rc); + GOTO(err_changelog_cleanup, rc); + } -err_ptlrpcd_decref: - ptlrpcd_decref(); - RETURN(rc); + RETURN(rc); + +err_changelog_cleanup: + mdc_llog_finish(obd); +err_llog_cleanup: + lprocfs_free_md_stats(obd); + ptlrpc_lprocfs_unregister_obd(obd); +err_osc_cleanup: + osc_cleanup_common(obd); + return rc; } /* Initialize the default and maximum LOV EA sizes. This allows @@ -2703,13 +2866,11 @@ static int mdc_precleanup(struct obd_device *obd) { ENTRY; - /* Failsafe, ok if racy */ - if (obd->obd_type->typ_refcnt <= 1) - libcfs_kkuc_group_rem(0, KUC_GRP_HSM); + osc_precleanup_common(obd); + mdc_changelog_cdev_finish(obd); obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); lprocfs_free_md_stats(obd); mdc_llog_finish(obd); RETURN(0); @@ -2717,78 +2878,100 @@ static int mdc_precleanup(struct obd_device *obd) static int mdc_cleanup(struct obd_device *obd) { - ptlrpcd_decref(); - - return client_obd_cleanup(obd); -} - -static int mdc_process_config(struct obd_device *obd, size_t len, void *buf) -{ - struct lustre_cfg *lcfg = buf; - int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd); - return (rc > 0 ? 0: rc); -} - -static struct obd_ops mdc_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = mdc_setup, - .o_precleanup = mdc_precleanup, - .o_cleanup = mdc_cleanup, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, - .o_disconnect = client_disconnect_export, - .o_iocontrol = mdc_iocontrol, - .o_set_info_async = mdc_set_info_async, - .o_statfs = mdc_statfs, + return osc_cleanup_common(obd); +} + +static const struct obd_ops mdc_obd_ops = { + .o_owner = THIS_MODULE, + .o_setup = mdc_setup, + .o_precleanup = mdc_precleanup, + .o_cleanup = mdc_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, + .o_connect = client_connect_import, + .o_reconnect = osc_reconnect, + .o_disconnect = osc_disconnect, + .o_iocontrol = mdc_iocontrol, + .o_set_info_async = mdc_set_info_async, + .o_statfs = mdc_statfs, + .o_statfs_async = mdc_statfs_async, .o_fid_init = client_fid_init, .o_fid_fini = client_fid_fini, - .o_fid_alloc = mdc_fid_alloc, - .o_import_event = mdc_import_event, - .o_get_info = mdc_get_info, - .o_process_config = mdc_process_config, - .o_get_uuid = mdc_get_uuid, - .o_quotactl = mdc_quotactl, + .o_fid_alloc = mdc_fid_alloc, + .o_import_event = mdc_import_event, + .o_get_info = mdc_get_info, + .o_get_uuid = mdc_get_uuid, + .o_quotactl = mdc_quotactl, }; -static struct md_ops mdc_md_ops = { +static const struct md_ops mdc_md_ops = { .m_get_root = mdc_get_root, - .m_null_inode = mdc_null_inode, - .m_close = mdc_close, - .m_create = mdc_create, - .m_enqueue = mdc_enqueue, - .m_getattr = mdc_getattr, - .m_getattr_name = mdc_getattr_name, - .m_intent_lock = mdc_intent_lock, - .m_link = mdc_link, - .m_rename = mdc_rename, - .m_setattr = mdc_setattr, - .m_setxattr = mdc_setxattr, - .m_getxattr = mdc_getxattr, + .m_null_inode = mdc_null_inode, + .m_close = mdc_close, + .m_create = mdc_create, + .m_enqueue = mdc_enqueue, + .m_getattr = mdc_getattr, + .m_getattr_name = mdc_getattr_name, + .m_intent_lock = mdc_intent_lock, + .m_link = mdc_link, + .m_rename = mdc_rename, + .m_setattr = mdc_setattr, + .m_setxattr = mdc_setxattr, + .m_getxattr = mdc_getxattr, .m_fsync = mdc_fsync, + .m_file_resync = mdc_file_resync, .m_read_page = mdc_read_page, - .m_unlink = mdc_unlink, - .m_cancel_unused = mdc_cancel_unused, - .m_init_ea_size = mdc_init_ea_size, - .m_set_lock_data = mdc_set_lock_data, - .m_lock_match = mdc_lock_match, - .m_get_lustre_md = mdc_get_lustre_md, - .m_free_lustre_md = mdc_free_lustre_md, - .m_set_open_replay_data = mdc_set_open_replay_data, - .m_clear_open_replay_data = mdc_clear_open_replay_data, - .m_intent_getattr_async = mdc_intent_getattr_async, - .m_revalidate_lock = mdc_revalidate_lock + .m_unlink = mdc_unlink, + .m_cancel_unused = mdc_cancel_unused, + .m_init_ea_size = mdc_init_ea_size, + .m_set_lock_data = mdc_set_lock_data, + .m_lock_match = mdc_lock_match, + .m_get_lustre_md = mdc_get_lustre_md, + .m_free_lustre_md = mdc_free_lustre_md, + .m_set_open_replay_data = mdc_set_open_replay_data, + .m_clear_open_replay_data = mdc_clear_open_replay_data, + .m_intent_getattr_async = mdc_intent_getattr_async, + .m_revalidate_lock = mdc_revalidate_lock, + .m_rmfid = mdc_rmfid, }; +dev_t mdc_changelog_dev; +struct class *mdc_changelog_class; static int __init mdc_init(void) { - return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, - LUSTRE_MDC_NAME, NULL); + int rc = 0; + rc = alloc_chrdev_region(&mdc_changelog_dev, 0, + MDC_CHANGELOG_DEV_COUNT, + MDC_CHANGELOG_DEV_NAME); + if (rc) + return rc; + + mdc_changelog_class = class_create(THIS_MODULE, MDC_CHANGELOG_DEV_NAME); + if (IS_ERR(mdc_changelog_class)) { + rc = PTR_ERR(mdc_changelog_class); + goto out_dev; + } + + rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, true, + LUSTRE_MDC_NAME, &mdc_device_type); + if (rc) + goto out_class; + + return 0; + +out_class: + class_destroy(mdc_changelog_class); +out_dev: + unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT); + return rc; } static void __exit mdc_exit(void) { - class_unregister_type(LUSTRE_MDC_NAME); + class_unregister_type(LUSTRE_MDC_NAME); + class_destroy(mdc_changelog_class); + unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT); + idr_destroy(&mdc_changelog_minor_idr); } MODULE_AUTHOR("OpenSFS, Inc. ");