X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=56fcc0d626d6f9e94d0d7cf5622575f3a401b363;hp=1699d29e69b5287c1bb3219801ec98d2386db532;hb=9eda825b1b449baaf2676cc80ccae79d4297cf2d;hpb=1f1d3a376d488d715dd1b0c94d5b66ea05c1e6ca diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 1699d29..56fcc0d 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,6 +41,11 @@ #include #include #include +#include +#include +#ifdef HAVE_UIDGID_HEADER +# include +#endif #include #include @@ -50,8 +55,8 @@ #include #include #include +#include #include -#include #include "mdc_internal.h" @@ -64,8 +69,9 @@ struct mdc_renew_capa_args { static int mdc_cleanup(struct obd_device *obd); -int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req, - const struct req_msg_field *field, struct obd_capa **oc) +static int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req, + const struct req_msg_field *field, + struct obd_capa **oc) { struct lustre_capa *capa; struct obd_capa *c; @@ -150,8 +156,8 @@ out: } /* This should be mdc_get_info("rootfid") */ -int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid, - struct obd_capa **pc) +static int mdc_getstatus(struct obd_export *exp, struct lu_fid *rootfid, + struct obd_capa **pc) { return send_getstatus(class_exp2cliimp(exp), rootfid, pc, LUSTRE_IMP_FULL, 0); @@ -216,8 +222,8 @@ static int mdc_getattr_common(struct obd_export *exp, RETURN(0); } -int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, - struct ptlrpc_request **request) +static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, + struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; @@ -261,8 +267,8 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); } -int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, - struct ptlrpc_request **request) +static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, + struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; @@ -308,11 +314,11 @@ int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, } static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, - const struct lu_fid *fid, - struct obd_capa *oc, int opcode, obd_valid valid, - const char *xattr_name, const char *input, - int input_size, int output_size, int flags, - __u32 suppgid, struct ptlrpc_request **request) + const struct lu_fid *fid, + struct obd_capa *oc, int opcode, u64 valid, + const char *xattr_name, const char *input, + int input_size, int output_size, int flags, + __u32 suppgid, struct ptlrpc_request **request) { struct ptlrpc_request *req; int xattr_namelen = 0; @@ -418,10 +424,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, RETURN(rc); } -int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, obd_valid valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, __u32 suppgid, struct ptlrpc_request **request) +static int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, u64 valid, + const char *xattr_name, + const char *input, int input_size, int output_size, + int flags, __u32 suppgid, + struct ptlrpc_request **request) { return mdc_xattr_common(exp, &RQF_MDS_REINT_SETXATTR, fid, oc, MDS_REINT, valid, xattr_name, @@ -429,10 +437,11 @@ int mdc_setxattr(struct obd_export *exp, const struct lu_fid *fid, suppgid, request); } -int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, obd_valid valid, const char *xattr_name, - const char *input, int input_size, int output_size, - int flags, struct ptlrpc_request **request) +static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, u64 valid, + const char *xattr_name, + const char *input, int input_size, int output_size, + int flags, struct ptlrpc_request **request) { return mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, oc, MDS_GETXATTR, valid, xattr_name, @@ -520,7 +529,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, if (rc < 0) GOTO(out, rc); - if (rc < sizeof(*md->lsm)) { + if (rc < (typeof(rc))sizeof(*md->lsm)) { CDEBUG(D_INFO, "lsm size too small: " "rc < sizeof (*md->lsm) (%d < %d)\n", rc, (int)sizeof(*md->lsm)); @@ -555,7 +564,7 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, if (rc < 0) GOTO(out, rc); - if (rc < sizeof(*md->lmv)) { + if (rc < (typeof(rc))sizeof(*md->lmv)) { CDEBUG(D_INFO, "size too small: " "rc < sizeof(*md->lmv) (%d < %d)\n", rc, (int)sizeof(*md->lmv)); @@ -631,10 +640,6 @@ int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) RETURN(0); } -/** - * Handles both OPEN and SETATTR RPCs for OPEN-CLOSE and SETATTR-DONE_WRITING - * RPC chains. - */ void mdc_replay_open(struct ptlrpc_request *req) { struct md_open_data *mod = req->rq_cb_data; @@ -671,17 +676,18 @@ void mdc_replay_open(struct ptlrpc_request *req) __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg); struct mdt_ioepoch *epoch; - LASSERT(opc == MDS_CLOSE || opc == MDS_DONE_WRITING); - epoch = req_capsule_client_get(&close_req->rq_pill, - &RMF_MDT_EPOCH); - LASSERT(epoch); + LASSERT(opc == MDS_CLOSE); + epoch = req_capsule_client_get(&close_req->rq_pill, + &RMF_MDT_EPOCH); + LASSERT(epoch); - if (och != NULL) - LASSERT(!memcmp(&old, &epoch->handle, sizeof(old))); - DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); - epoch->handle = body->mbo_handle; - } - EXIT; + if (och != NULL) + LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old))); + + DEBUG_REQ(D_HA, close_req, "updating close body with new fh"); + epoch->mio_handle = body->mbo_handle; + } + EXIT; } void mdc_commit_open(struct ptlrpc_request *req) @@ -817,24 +823,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp, RETURN(0); } -/* Prepares the request for the replay by the given reply */ -static void mdc_close_handle_reply(struct ptlrpc_request *req, - struct md_op_data *op_data, int rc) { - struct mdt_body *repbody; - struct mdt_ioepoch *epoch; - - if (req && rc == -EAGAIN) { - repbody = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH); - - epoch->flags |= MF_SOM_AU; - if (repbody->mbo_valid & OBD_MD_FLGETATTRLOCK) - op_data->op_flags |= MF_GETATTR_LOCK; - } -} - -int mdc_close(struct obd_export *exp, struct md_op_data *op_data, - struct md_open_data *mod, struct ptlrpc_request **request) +static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, + struct md_open_data *mod, struct ptlrpc_request **request) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; @@ -947,126 +937,10 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, obd_mod_put(mod); } *request = req; - mdc_close_handle_reply(req, op_data, rc); - RETURN(rc < 0 ? rc : saved_rc); -} - -int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, - struct md_open_data *mod) -{ - struct obd_device *obd = class_exp2obd(exp); - struct ptlrpc_request *req; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), - &RQF_MDS_DONE_WRITING); - if (req == NULL) - RETURN(-ENOMEM); - - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_DONE_WRITING); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - if (mod != NULL) { - LASSERTF(mod->mod_open_req != NULL && - mod->mod_open_req->rq_type != LI_POISON, - "POISONED setattr %p!\n", mod->mod_open_req); - - mod->mod_close_req = req; - DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr"); - /* We no longer want to preserve this setattr for replay even - * though the open was committed. b=3632, b=3633 */ - spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - spin_unlock(&mod->mod_open_req->rq_lock); - } - - mdc_close_pack(req, op_data); - ptlrpc_request_set_replen(req); - - mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); - rc = ptlrpc_queue_wait(req); - mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); - - if (rc == -ESTALE) { - /** - * it can be allowed error after 3633 if open or setattr were - * committed and server failed before close was sent. - * Let's check if mod exists and return no error in that case - */ - if (mod) { - LASSERT(mod->mod_open_req != NULL); - if (mod->mod_open_req->rq_committed) - rc = 0; - } - } - - if (mod) { - if (rc != 0) - mod->mod_close_req = NULL; - LASSERT(mod->mod_open_req != NULL); - mdc_free_open(mod); - /* Since now, mod is accessed through setattr req only, - * thus DW req does not keep a reference on mod anymore. */ - obd_mod_put(mod); - } - - mdc_close_handle_reply(req, op_data, rc); - ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc < 0 ? rc : saved_rc); } -#ifdef HAVE_SPLIT_SUPPORT -int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, - const struct page *page, int offset) -{ - struct ptlrpc_request *req; - struct ptlrpc_bulk_desc *desc; - int rc; - ENTRY; - - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_WRITEPAGE); - if (req == NULL) - RETURN(-ENOMEM); - - /* FIXME: capa doesn't support split yet */ - mdc_set_capa_size(req, &RMF_CAPA1, NULL); - - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_WRITEPAGE); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } - - req->rq_request_portal = MDS_READPAGE_PORTAL; - ptlrpc_at_set_req_timeout(req); - - desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); - - /* NB req now owns desc and will free it when it gets freed. */ - ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); - mdc_readdir_pack(req, 0, offset, fid, NULL); - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) - GOTO(out, rc); - - rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk); -out: - ptlrpc_req_finished(req); - return rc; -} -EXPORT_SYMBOL(mdc_sendpage); -#endif - static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, __u64 offset, struct obd_capa *oc, struct page **pages, int npages, @@ -1250,7 +1124,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, * | | * .|--------v------- -----. * |s|e|f|p|ent|ent| ... |ent| - * '--|-------------- -----' Each CFS_PAGE contains a single + * '--|-------------- -----' Each PAGE contains a single * '------. lu_dirpage. * .---------v------- -----. * |s|e|f|p|ent| 0 | ... | 0 | @@ -1260,26 +1134,26 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, * larger than LU_PAGE_SIZE, a single host page may contain multiple * lu_dirpages. After reading the lu_dirpages from the MDS, the * ldp_hash_end of the first lu_dirpage refers to the one immediately - * after it in the same CFS_PAGE (arrows simplified for brevity, but + * after it in the same PAGE (arrows simplified for brevity, but * in general e0==s1, e1==s2, etc.): * * .-------------------- -----. * |s0|e0|f0|p|ent|ent| ... |ent| * |---v---------------- -----| * |s1|e1|f1|p|ent|ent| ... |ent| - * |---v---------------- -----| Here, each CFS_PAGE contains + * |---v---------------- -----| Here, each PAGE contains * ... multiple lu_dirpages. * |---v---------------- -----| * |s'|e'|f'|p|ent|ent| ... |ent| * '---|---------------- -----' * v * .----------------------------. - * | next CFS_PAGE | + * | next PAGE | * * This structure is transformed into a single logical lu_dirpage as follows: * * - Replace e0 with e' so the request for the next lu_dirpage gets the page - * labeled 'next CFS_PAGE'. + * labeled 'next PAGE'. * * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether * a hash collision with the next page exists. @@ -1308,8 +1182,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) /* Advance dp to next lu_dirpage. */ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); - /* Check if we've reached the end of the CFS_PAGE. */ - if (!((unsigned long)dp & ~CFS_PAGE_MASK)) + /* Check if we've reached the end of the PAGE. */ + if (!((unsigned long)dp & ~PAGE_MASK)) break; /* Save the hash and flags of this lu_dirpage. */ @@ -1349,6 +1223,14 @@ struct readpage_param { struct md_callback *rp_cb; }; +#ifndef HAVE_DELETE_FROM_PAGE_CACHE +static inline void delete_from_page_cache(struct page *page) +{ + remove_from_page_cache(page); + page_cache_release(page); +} +#endif + /** * Read pages from server. * @@ -1356,7 +1238,7 @@ struct readpage_param { * a header lu_dirpage which describes the start/end hash, and whether this * page is empty (contains no dir entry) or hash collide with next page. * After client receives reply, several pages will be integrated into dir page - * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the + * in PAGE_SIZE (if PAGE_SIZE greater than LU_PAGE_SIZE), and the * lu_dirpage for this integrated page will be adjusted. **/ static int mdc_read_page_remote(void *data, struct page *page0) @@ -1398,7 +1280,10 @@ static int mdc_read_page_remote(void *data, struct page *page0) rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1, page_pool, npages, &req); - if (rc == 0) { + if (rc < 0) { + /* page0 is special, which was added into page cache early */ + delete_from_page_cache(page0); + } else { int lu_pgs; rd_pgs = (req->rq_bulk->bd_nob_transferred + @@ -1407,15 +1292,14 @@ static int mdc_read_page_remote(void *data, struct page *page0) LU_PAGE_SHIFT; LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); - CDEBUG(D_INODE, "read %d(%d)/%d pages\n", rd_pgs, lu_pgs, - op_data->op_npages); + CDEBUG(D_INODE, "read %d(%d) pages\n", rd_pgs, lu_pgs); mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs); SetPageUptodate(page0); } - unlock_page(page0); + ptlrpc_req_finished(req); CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages); for (i = 1; i < npages; i++) { @@ -1541,7 +1425,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, rp_param.rp_hash64), mdc_read_page_remote, &rp_param); if (IS_ERR(page)) { - CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n", + CDEBUG(D_INFO, "%s: read cache page: "DFID" at "LPU64": %ld\n", exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rp_param.rp_off, PTR_ERR(page)); GOTO(out_unlock, rc = PTR_ERR(page)); @@ -1689,7 +1573,7 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) /* Val is struct getinfo_fid2path result plus path */ vallen = sizeof(*gf) + gf->gf_pathlen; - rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL); + rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf); if (rc != 0 && rc != -EREMOTE) GOTO(out, rc); @@ -1732,7 +1616,10 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - rc = mdc_queue_wait(req); + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + GOTO(out, rc); out: ptlrpc_req_finished(req); @@ -1914,10 +1801,11 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - rc = mdc_queue_wait(req); - GOTO(out, rc); + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); - EXIT; + GOTO(out, rc); out: ptlrpc_req_finished(req); return rc; @@ -1973,7 +1861,10 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - rc = mdc_queue_wait(req); + mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + rc = ptlrpc_queue_wait(req); + mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL); + GOTO(out, rc); out: @@ -1981,7 +1872,7 @@ out: return rc; } -static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags) +static struct kuc_hdr *changelog_kuc_hdr(char *buf, size_t len, __u32 flags) { struct kuc_hdr *lh = (struct kuc_hdr *)buf; @@ -1995,46 +1886,50 @@ static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags) return lh; } -#define D_CHANGELOG 0 - struct changelog_show { - __u64 cs_startrec; - __u32 cs_flags; - struct file *cs_fp; - char *cs_buf; - struct obd_device *cs_obd; + __u64 cs_startrec; + enum changelog_send_flag cs_flags; + struct file *cs_fp; + char *cs_buf; + struct obd_device *cs_obd; }; +static inline char *cs_obd_name(struct changelog_show *cs) +{ + return cs->cs_obd->obd_name; +} + static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, struct llog_rec_hdr *hdr, void *data) { - struct changelog_show *cs = data; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - struct kuc_hdr *lh; - int len, rc; + struct changelog_show *cs = data; + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct kuc_hdr *lh; + size_t len; + int rc; ENTRY; if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { rc = -EINVAL; CERROR("%s: not a changelog rec %x/%d: rc = %d\n", - cs->cs_obd->obd_name, rec->cr_hdr.lrh_type, + cs_obd_name(cs), rec->cr_hdr.lrh_type, rec->cr.cr_type, rc); RETURN(rc); } if (rec->cr.cr_index < cs->cs_startrec) { /* Skip entries earlier than what we are interested in */ - CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", + CDEBUG(D_HSM, "rec="LPU64" start="LPU64"\n", rec->cr.cr_index, cs->cs_startrec); RETURN(0); } - CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID - " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, - changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, - rec->cr.cr_flags & CLF_FLAGMASK, - PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), - rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); + CDEBUG(D_HSM, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID" %.*s\n", + rec->cr.cr_index, rec->cr.cr_type, + changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, + rec->cr.cr_flags & CLF_FLAGMASK, + PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), + rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; @@ -2043,38 +1938,43 @@ static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); - CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc); + CDEBUG(D_HSM, "kucmsg fp %p len %zu rc %d\n", cs->cs_fp, len, rc); RETURN(rc); } static int mdc_changelog_send_thread(void *csdata) { - struct changelog_show *cs = csdata; - struct llog_ctxt *ctxt = NULL; - struct llog_handle *llh = NULL; - struct kuc_hdr *kuch; - int rc; + struct changelog_show *cs = csdata; + struct llog_ctxt *ctxt = NULL; + struct llog_handle *llh = NULL; + struct kuc_hdr *kuch; + enum llog_flag flags = LLOG_F_IS_CAT; + int rc; - CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n", + CDEBUG(D_HSM, "changelog to fp=%p start "LPU64"\n", cs->cs_fp, cs->cs_startrec); OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); if (cs->cs_buf == NULL) GOTO(out, rc = -ENOMEM); - /* Set up the remote catalog handle */ - ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt == NULL) - GOTO(out, rc = -ENOENT); + /* Set up the remote catalog handle */ + ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt == NULL) + GOTO(out, rc = -ENOENT); rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG, LLOG_OPEN_EXISTS); if (rc) { CERROR("%s: fail to open changelog catalog: rc = %d\n", - cs->cs_obd->obd_name, rc); + cs_obd_name(cs), rc); GOTO(out, rc); } - rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL); + + if (cs->cs_flags & CHANGELOG_FLAG_JOBID) + flags |= LLOG_F_EXT_JOBID; + + rc = llog_init_handle(NULL, llh, flags, NULL); if (rc) { CERROR("llog_init_handle failed %d\n", rc); GOTO(out, rc); @@ -2082,19 +1982,17 @@ static int mdc_changelog_send_thread(void *csdata) rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); - /* Send EOF no matter what our result */ - if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), - cs->cs_flags))) { - kuch->kuc_msgtype = CL_EOF; - libcfs_kkuc_msg_put(cs->cs_fp, kuch); - } + /* Send EOF no matter what our result */ + kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), cs->cs_flags); + kuch->kuc_msgtype = CL_EOF; + libcfs_kkuc_msg_put(cs->cs_fp, kuch); out: fput(cs->cs_fp); if (llh) llog_cat_close(NULL, llh); - if (ctxt) - llog_ctxt_put(ctxt); + if (ctxt) + llog_ctxt_put(ctxt); if (cs->cs_buf) OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); OBD_FREE_PTR(cs); @@ -2128,12 +2026,12 @@ static int mdc_ioc_changelog_send(struct obd_device *obd, if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("%s: cannot start changelog thread: rc = %d\n", - obd->obd_name, rc); + cs_obd_name(cs), rc); OBD_FREE_PTR(cs); } else { rc = 0; - CDEBUG(D_CHANGELOG, "%s: started changelog thread\n", - obd->obd_name); + CDEBUG(D_HSM, "%s: started changelog thread\n", + cs_obd_name(cs)); } return rc; @@ -2286,7 +2184,7 @@ out: } static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, - void *karg, void *uarg) + void *karg, void __user *uarg) { struct obd_device *obd = exp->exp_obd; struct obd_ioctl_data *data = karg; @@ -2420,9 +2318,9 @@ out: return rc; } -int mdc_get_info_rpc(struct obd_export *exp, - obd_count keylen, void *key, - int vallen, void *val) +static int mdc_get_info_rpc(struct obd_export *exp, + u32 keylen, void *key, + u32 vallen, void *val) { struct obd_import *imp = class_exp2cliimp(exp); struct ptlrpc_request *req; @@ -2437,7 +2335,7 @@ int mdc_get_info_rpc(struct obd_export *exp, req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY, RCL_CLIENT, keylen); req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VALLEN, - RCL_CLIENT, sizeof(__u32)); + RCL_CLIENT, sizeof(vallen)); rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GET_INFO); if (rc) { @@ -2448,7 +2346,7 @@ int mdc_get_info_rpc(struct obd_export *exp, tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY); memcpy(tmp, key, keylen); tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_VALLEN); - memcpy(tmp, &vallen, sizeof(__u32)); + memcpy(tmp, &vallen, sizeof(vallen)); req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_VAL, RCL_SERVER, vallen); @@ -2485,7 +2383,7 @@ static void lustre_swab_hai(struct hsm_action_item *h) static void lustre_swab_hal(struct hsm_action_list *h) { struct hsm_action_item *hai; - int i; + __u32 i; __swab32s(&h->hal_version); __swab32s(&h->hal_count); @@ -2534,7 +2432,7 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, * @param val KUC message (kuc_hdr + hsm_action_list) * @param len total length of message */ -static int mdc_hsm_copytool_send(int len, void *val) +static int mdc_hsm_copytool_send(size_t len, void *val) { struct kuc_hdr *lh = (struct kuc_hdr *)val; struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); @@ -2542,8 +2440,8 @@ static int mdc_hsm_copytool_send(int len, void *val) ENTRY; if (len < sizeof(*lh) + sizeof(*hal)) { - CERROR("Short HSM message %d < %d\n", len, - (int) (sizeof(*lh) + sizeof(*hal))); + CERROR("Short HSM message %zu < %zu\n", len, + sizeof(*lh) + sizeof(*hal)); RETURN(-EPROTO); } if (lh->kuc_magic == __swab16(KUC_MAGIC)) { @@ -2602,11 +2500,11 @@ static int mdc_kuc_reregister(struct obd_import *imp) (void *)imp); } -int mdc_set_info_async(const struct lu_env *env, - struct obd_export *exp, - obd_count keylen, void *key, - obd_count vallen, void *val, - struct ptlrpc_request_set *set) +static int mdc_set_info_async(const struct lu_env *env, + struct obd_export *exp, + u32 keylen, void *key, + u32 vallen, void *val, + struct ptlrpc_request_set *set) { struct obd_import *imp = class_exp2cliimp(exp); int rc; @@ -2650,29 +2548,35 @@ int mdc_set_info_async(const struct lu_env *env, RETURN(rc); } + if (KEY_IS(KEY_DEFAULT_EASIZE)) { + __u32 *default_easize = val; + + exp->exp_obd->u.cli.cl_default_mds_easize = *default_easize; + RETURN(0); + } + CERROR("Unknown key %s\n", (char *)key); RETURN(-EINVAL); } -int mdc_get_info(const struct lu_env *env, struct obd_export *exp, - __u32 keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) +static int mdc_get_info(const struct lu_env *env, struct obd_export *exp, + __u32 keylen, void *key, __u32 *vallen, void *val) { int rc = -EINVAL; if (KEY_IS(KEY_MAX_EASIZE)) { - int mdsize, *max_easize; + __u32 mdsize, *max_easize; if (*vallen != sizeof(int)) RETURN(-EINVAL); - mdsize = *(int *)val; + mdsize = *(__u32 *)val; if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; max_easize = val; *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; RETURN(0); } else if (KEY_IS(KEY_DEFAULT_EASIZE)) { - int *default_easize; + __u32 *default_easize; if (*vallen != sizeof(int)) RETURN(-EINVAL); @@ -2680,18 +2584,15 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp, *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize; RETURN(0); } else if (KEY_IS(KEY_MAX_COOKIESIZE)) { - int mdsize, *max_cookiesize; + __u32 *max_cookiesize; if (*vallen != sizeof(int)) RETURN(-EINVAL); - mdsize = *(int *)val; - if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize) - exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize; max_cookiesize = val; *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize; RETURN(0); } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) { - int *default_cookiesize; + __u32 *default_cookiesize; if (*vallen != sizeof(int)) RETURN(-EINVAL); @@ -2709,7 +2610,7 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp, *data = imp->imp_connect_data; RETURN(0); } else if (KEY_IS(KEY_TGT_COUNT)) { - *((int *)val) = 1; + *((__u32 *)val) = 1; RETURN(0); } @@ -2718,8 +2619,8 @@ int mdc_get_info(const struct lu_env *env, struct obd_export *exp, RETURN(rc); } -int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, struct ptlrpc_request **request) +static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; @@ -2812,7 +2713,8 @@ int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, RETURN(seq_client_alloc_fid(env, seq, fid)); } -struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { +static struct obd_uuid *mdc_get_uuid(struct obd_export *exp) +{ struct client_obd *cli = &exp->exp_obd->u.cli; return &cli->cl_target_uuid; } @@ -2844,10 +2746,43 @@ static int mdc_resource_inode_free(struct ldlm_resource *res) return 0; } -struct ldlm_valblock_ops inode_lvbo = { +static struct ldlm_valblock_ops inode_lvbo = { .lvbo_free = mdc_resource_inode_free }; +static int mdc_llog_init(struct obd_device *obd) +{ + struct obd_llog_group *olg = &obd->obd_olg; + struct llog_ctxt *ctxt; + int rc; + + ENTRY; + + rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, obd, + &llog_client_ops); + if (rc < 0) + RETURN(rc); + + ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); + llog_initiator_connect(ctxt); + llog_ctxt_put(ctxt); + + RETURN(0); +} + +static void mdc_llog_finish(struct obd_device *obd) +{ + struct llog_ctxt *ctxt; + + ENTRY; + + ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt != NULL) + llog_cleanup(NULL, ctxt); + + EXIT; +} + static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { struct client_obd *cli = &obd->u.cli; @@ -2871,9 +2806,9 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) rc = client_obd_setup(obd, cfg); if (rc) GOTO(err_close_lock, rc); -#ifdef LPROCFS +#ifdef CONFIG_PROC_FS obd->obd_vars = lprocfs_mdc_obd_vars; - lprocfs_seq_obd_setup(obd); + lprocfs_obd_setup(obd); lprocfs_alloc_md_stats(obd, 0); #endif sptlrpc_lprocfs_cliobd_attach(obd); @@ -2883,7 +2818,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) obd->obd_namespace->ns_lvbo = &inode_lvbo; - rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL); + rc = mdc_llog_init(obd); if (rc) { mdc_cleanup(obd); CERROR("failed to setup llogging subsystems\n"); @@ -2908,8 +2843,9 @@ err_rpc_lock: * a large number of stripes is possible. If a larger reply buffer is * required it will be reallocated in the ptlrpc layer due to overflow. */ -static int mdc_init_ea_size(struct obd_export *exp, int easize, - int def_easize, int cookiesize, int def_cookiesize) +static int mdc_init_ea_size(struct obd_export *exp, __u32 easize, + __u32 def_easize, __u32 cookiesize, + __u32 def_cookiesize) { struct obd_device *obd = exp->exp_obd; struct client_obd *cli = &obd->u.cli; @@ -2947,10 +2883,7 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); lprocfs_free_md_stats(obd); - - rc = obd_llog_finish(obd, 0); - if (rc != 0) - CERROR("failed to cleanup llogging subsystems\n"); + mdc_llog_finish(obd); break; } RETURN(rc); @@ -2968,55 +2901,18 @@ static int mdc_cleanup(struct obd_device *obd) return client_obd_cleanup(obd); } - -static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, - struct obd_device *tgt, int *index) -{ - struct llog_ctxt *ctxt; - int rc; - - ENTRY; - - LASSERT(olg == &obd->obd_olg); - - rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, - &llog_client_ops); - if (rc) - RETURN(rc); - - ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); - llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); - - RETURN(0); -} - -static int mdc_llog_finish(struct obd_device *obd, int count) -{ - struct llog_ctxt *ctxt; - - ENTRY; - - ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt) - llog_cleanup(NULL, ctxt); - - RETURN(0); -} - -static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) +static int mdc_process_config(struct obd_device *obd, size_t len, void *buf) { struct lustre_cfg *lcfg = buf; - int rc = class_process_proc_seq_param(PARAM_MDC, obd->obd_vars, - lcfg, obd); + int rc = class_process_proc_param(PARAM_MDC, obd->obd_vars, lcfg, obd); return (rc > 0 ? 0: rc); } /* get remote permission for current user on fid */ -int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, __u32 suppgid, - struct ptlrpc_request **request) +static int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, __u32 suppgid, + struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; @@ -3107,7 +3003,7 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, RETURN(0); } -struct obd_ops mdc_obd_ops = { +static struct obd_ops mdc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = mdc_setup, .o_precleanup = mdc_precleanup, @@ -3123,8 +3019,6 @@ struct obd_ops mdc_obd_ops = { .o_fid_fini = client_fid_fini, .o_fid_alloc = mdc_fid_alloc, .o_import_event = mdc_import_event, - .o_llog_init = mdc_llog_init, - .o_llog_finish = mdc_llog_finish, .o_get_info = mdc_get_info, .o_process_config = mdc_process_config, .o_get_uuid = mdc_get_uuid, @@ -3132,13 +3026,12 @@ struct obd_ops mdc_obd_ops = { .o_quotacheck = mdc_quotacheck }; -struct md_ops mdc_md_ops = { +static struct md_ops mdc_md_ops = { .m_getstatus = mdc_getstatus, .m_null_inode = mdc_null_inode, .m_find_cbdata = mdc_find_cbdata, .m_close = mdc_close, .m_create = mdc_create, - .m_done_writing = mdc_done_writing, .m_enqueue = mdc_enqueue, .m_getattr = mdc_getattr, .m_getattr_name = mdc_getattr_name, @@ -3166,12 +3059,9 @@ struct md_ops mdc_md_ops = { .m_revalidate_lock = mdc_revalidate_lock }; -int __init mdc_init(void) +static int __init mdc_init(void) { return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, -#ifndef HAVE_ONLY_PROCFS_SEQ - NULL, -#endif LUSTRE_MDC_NAME, NULL); }