X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=aca18074651b33b66a77ca5121ef359995dd192b;hp=1503bf9b5114d7377035ff6eeea0d9db0cbaaa35;hb=0d8c5ccc4ecfe7c0d10a0a4f92fd291320a97190;hpb=dbb6623ef78a60c4fe71e798d52f8ec37281c12f diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 1503bf9..aca1807 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -47,6 +47,7 @@ #endif #include +#include #include #include #include @@ -95,15 +96,15 @@ static inline int mdc_queue_wait(struct ptlrpc_request *req) struct client_obd *cli = &req->rq_import->imp_obd->u.cli; int rc; - /* mdc_enter_request() ensures that this client has no more + /* obd_get_request_slot() ensures that this client has no more * than cl_max_rpcs_in_flight RPCs simultaneously inf light * against an MDT. */ - rc = mdc_enter_request(cli); + rc = obd_get_request_slot(cli); if (rc != 0) return rc; rc = ptlrpc_queue_wait(req); - mdc_exit_request(cli); + obd_put_request_slot(cli); return rc; } @@ -373,7 +374,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* Flush local XATTR locks to get rid of a possible cancel RPC */ if (opcode == MDS_REINT && fid_is_sane(fid) && exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) { - CFS_LIST_HEAD(cancels); + struct list_head cancels = LIST_HEAD_INIT(cancels); int count; /* Without that packing would fail */ @@ -405,8 +406,8 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, sizeof(struct mdt_rec_reint)); rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); rec->sx_opcode = REINT_SETXATTR; - rec->sx_fsuid = current_fsuid(); - rec->sx_fsgid = current_fsgid(); + rec->sx_fsuid = from_kuid(&init_user_ns, current_fsuid()); + rec->sx_fsgid = from_kgid(&init_user_ns, current_fsgid()); rec->sx_cap = cfs_curproc_cap_pack(); rec->sx_suppgid1 = suppgid; rec->sx_suppgid2 = -1; @@ -878,7 +879,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, req_fmt = &RQF_MDS_RELEASE_CLOSE; /* allocate a FID for volatile file */ - rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data); + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); if (rc < 0) { CERROR("%s: "DFID" failed to allocate FID: %d\n", obd->obd_name, PFID(&op_data->op_fid1), rc); @@ -926,10 +927,10 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, mdc_close_pack(req, op_data); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obd->u.cli.cl_max_mds_easize); - req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER, - obd->u.cli.cl_max_mds_cookiesize); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obd->u.cli.cl_default_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER, + obd->u.cli.cl_default_mds_cookiesize); ptlrpc_request_set_replen(req); @@ -1239,8 +1240,8 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, LASSERTF(*start <= *hash, "start = "LPX64 ",end = "LPX64",hash = "LPX64"\n", *start, *end, *hash); - CDEBUG(D_VFSTRACE, "page%lu [%llu %llu], hash"LPU64"\n", - offset, *start, *end, *hash); + CDEBUG(D_VFSTRACE, "offset %lx ["LPX64" "LPX64"]," + " hash "LPX64"\n", offset, *start, *end, *hash); if (*hash > *end) { kunmap(page); mdc_release_page(page, 0); @@ -1328,8 +1329,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) struct lu_dirpage *first = dp; struct lu_dirent *end_dirent = NULL; struct lu_dirent *ent; - __u64 hash_end = dp->ldp_hash_end; - __u32 flags = dp->ldp_flags; + __u64 hash_end = le64_to_cpu(dp->ldp_hash_end); + __u32 flags = le32_to_cpu(dp->ldp_flags); while (--lu_pgs > 0) { ent = lu_dirent_start(dp); @@ -1344,8 +1345,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) break; /* Save the hash and flags of this lu_dirpage. */ - hash_end = dp->ldp_hash_end; - flags = dp->ldp_flags; + hash_end = le64_to_cpu(dp->ldp_hash_end); + flags = le32_to_cpu(dp->ldp_flags); /* Check if lu_dirpage contains no entries. */ if (end_dirent == NULL) @@ -1502,7 +1503,7 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, { struct lookup_intent it = { .it_op = IT_READDIR }; struct page *page; - struct inode *dir = NULL; + struct inode *dir = op_data->op_data; struct address_space *mapping; struct lu_dirpage *dp; __u64 start = 0; @@ -1516,18 +1517,10 @@ static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, *ppage = NULL; - if (op_data->op_mea1 != NULL) { - __u32 index = op_data->op_stripe_offset; - - dir = op_data->op_mea1->lsm_md_oinfo[index].lmo_root; - } else { - dir = op_data->op_data; - } LASSERT(dir != NULL); - mapping = dir->i_mapping; - rc = mdc_intent_lock(exp, op_data, NULL, 0, &it, 0, &enq_req, + rc = mdc_intent_lock(exp, op_data, &it, &enq_req, cb_op->md_blocking_ast, 0); if (enq_req != NULL) ptlrpc_req_finished(enq_req); @@ -1649,10 +1642,13 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, struct lu_dirpage *dp; struct lu_dirent *ent; int rc = 0; + __u32 same_hash_count; + __u64 hash_offset = op_data->op_hash_offset; ENTRY; - CDEBUG(D_INFO, DFID "offset = "LPU64"\n", PFID(&op_data->op_fid1), - op_data->op_hash_offset); + CDEBUG(D_INFO, DFID " offset = "LPU64", flags %#x\n", + PFID(&op_data->op_fid1), op_data->op_hash_offset, + op_data->op_cli_flags); *ppage = NULL; *entp = NULL; @@ -1664,6 +1660,9 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, if (rc != 0) RETURN(rc); + /* same_hash_count means how many entries with this + * hash value has been read */ + same_hash_count = op_data->op_same_hash_offset + 1; dp = page_address(page); for (ent = lu_dirent_start(dp); ent != NULL; ent = lu_dirent_next(ent)) { @@ -1671,20 +1670,37 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, if (le16_to_cpu(ent->lde_namelen) == 0) continue; - if (le64_to_cpu(ent->lde_hash) > op_data->op_hash_offset) - break; + if (le64_to_cpu(ent->lde_hash) < + op_data->op_hash_offset) + continue; + + if (unlikely(le64_to_cpu(ent->lde_hash) == + op_data->op_hash_offset)) { + /* If it is not for next entry, which usually from + * ll_dir_entry_start, return this entry. */ + if (!(op_data->op_cli_flags & CLI_NEXT_ENTRY)) + break; + + /* Keep reading until all of entries being read are + * skipped. */ + if (same_hash_count > 0) { + same_hash_count--; + continue; + } + } + break; } /* If it can not find entry in current page, try next page. */ if (ent == NULL) { - __u64 orig_offset = op_data->op_hash_offset; - - if (dp->ldp_hash_end == MDS_DIR_END_OFF) { - mdc_release_page(page, 0); + if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) { + op_data->op_same_hash_offset = 0; + mdc_release_page(page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); RETURN(0); } - op_data->op_hash_offset = dp->ldp_hash_end; + op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end); mdc_release_page(page, le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); rc = mdc_read_page(exp, op_data, cb_op, &page); @@ -1695,13 +1711,19 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, dp = page_address(page); ent = lu_dirent_start(dp); } + } - op_data->op_hash_offset = orig_offset; + /* If the next hash is the same as the current hash, increase + * the op_same_hash_offset to resolve the same hash conflict */ + if (ent != NULL && op_data->op_cli_flags & CLI_NEXT_ENTRY) { + if (unlikely(le64_to_cpu(ent->lde_hash) == hash_offset)) + op_data->op_same_hash_offset++; + else + op_data->op_same_hash_offset = 0; } *ppage = page; *entp = ent; - RETURN(rc); } @@ -1766,12 +1788,12 @@ int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, RETURN(rc); dp = page_address(page); - if (dp->ldp_hash_end < op_data->op_hash_offset) + if (le64_to_cpu(dp->ldp_hash_end) < op_data->op_hash_offset) GOTO(out, *entp = NULL); for (ent = lu_dirent_start(dp); ent != NULL; ent = lu_dirent_next(ent)) - if (ent->lde_hash >= op_data->op_hash_offset) + if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset) break; *entp = ent; out: @@ -2281,8 +2303,9 @@ out: static int mdc_ioc_changelog_send(struct obd_device *obd, struct ioc_changelog *icc) { - struct changelog_show *cs; - int rc; + struct changelog_show *cs; + struct task_struct *task; + int rc; /* Freed in mdc_changelog_send_thread */ OBD_ALLOC_PTR(cs); @@ -2299,16 +2322,20 @@ static int mdc_ioc_changelog_send(struct obd_device *obd, * New thread because we should return to user app before * writing into our pipe */ - rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs, - "mdc_clg_send_thread")); - if (!IS_ERR_VALUE(rc)) { - CDEBUG(D_CHANGELOG, "start changelog thread\n"); - return 0; + task = kthread_run(mdc_changelog_send_thread, cs, + "mdc_clg_send_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("%s: cannot start changelog thread: rc = %d\n", + obd->obd_name, rc); + OBD_FREE_PTR(cs); + } else { + rc = 0; + CDEBUG(D_CHANGELOG, "%s: started changelog thread\n", + obd->obd_name); } - CERROR("Failed to start changelog thread: %d\n", rc); - OBD_FREE_PTR(cs); - return rc; + return rc; } static int mdc_ioc_hsm_ct_start(struct obd_export *exp, @@ -2401,7 +2428,7 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, static int mdc_ioc_swap_layouts(struct obd_export *exp, struct md_op_data *op_data) { - CFS_LIST_HEAD(cancels); + struct list_head cancels = LIST_HEAD_INIT(cancels); struct ptlrpc_request *req; int rc, count; struct mdc_swap_layouts *msl, *payload; @@ -2416,9 +2443,11 @@ static int mdc_ioc_swap_layouts(struct obd_export *exp, * with the request RPC to avoid extra RPC round trips */ count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels, - LCK_CR, MDS_INODELOCK_LAYOUT); + LCK_EX, MDS_INODELOCK_LAYOUT | + MDS_INODELOCK_XATTR); count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels, - LCK_CR, MDS_INODELOCK_LAYOUT); + LCK_EX, MDS_INODELOCK_LAYOUT | + MDS_INODELOCK_XATTR); req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_SWAP_LAYOUTS); @@ -2824,22 +2853,50 @@ int mdc_set_info_async(const struct lu_env *env, } int mdc_get_info(const struct lu_env *env, struct obd_export *exp, - __u32 keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) + __u32 keylen, void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) { - int rc = -EINVAL; + int rc = -EINVAL; - if (KEY_IS(KEY_MAX_EASIZE)) { - int mdsize, *max_easize; + if (KEY_IS(KEY_MAX_EASIZE)) { + int mdsize, *max_easize; - if (*vallen != sizeof(int)) - RETURN(-EINVAL); - mdsize = *(int*)val; - if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) - exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; - max_easize = val; - *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; - RETURN(0); + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + mdsize = *(int *)val; + if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) + exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; + max_easize = val; + *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; + RETURN(0); + } else if (KEY_IS(KEY_DEFAULT_EASIZE)) { + int *default_easize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + default_easize = val; + *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize; + RETURN(0); + } else if (KEY_IS(KEY_MAX_COOKIESIZE)) { + int mdsize, *max_cookiesize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + mdsize = *(int *)val; + if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize) + exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize; + max_cookiesize = val; + *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize; + RETURN(0); + } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) { + int *default_cookiesize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + default_cookiesize = val; + *default_cookiesize = + exp->exp_obd->u.cli.cl_default_mds_cookiesize; + RETURN(0); } else if (KEY_IS(KEY_CONN_DATA)) { struct obd_import *imp = class_exp2cliimp(exp); struct obd_connect_data *data = val; @@ -3031,13 +3088,13 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, RETURN(rc); } -int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; - ENTRY; - RETURN(seq_client_alloc_fid(NULL, seq, fid)); + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + ENTRY; + RETURN(seq_client_alloc_fid(env, seq, fid)); } struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { @@ -3050,18 +3107,18 @@ struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { * recovery, non zero value will be return if the lock can be canceled, * or zero returned for not */ -static int mdc_cancel_for_recovery(struct ldlm_lock *lock) +static int mdc_cancel_weight(struct ldlm_lock *lock) { - if (lock->l_resource->lr_type != LDLM_IBITS) - RETURN(0); + if (lock->l_resource->lr_type != LDLM_IBITS) + RETURN(0); - /* FIXME: if we ever get into a situation where there are too many - * opened files with open locks on a single node, then we really - * should replay these open locks to reget it */ - if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) - RETURN(0); + /* FIXME: if we ever get into a situation where there are too many + * opened files with open locks on a single node, then we really + * should replay these open locks to reget it */ + if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) + RETURN(0); - RETURN(1); + RETURN(1); } static int mdc_resource_inode_free(struct ldlm_resource *res) @@ -3073,7 +3130,7 @@ static int mdc_resource_inode_free(struct ldlm_resource *res) } struct ldlm_valblock_ops inode_lvbo = { - lvbo_free: mdc_resource_inode_free + .lvbo_free = mdc_resource_inode_free }; static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) @@ -3107,7 +3164,7 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) sptlrpc_lprocfs_cliobd_attach(obd); ptlrpc_lprocfs_register_obd(obd); - ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery); + ns_register_cancel(obd->obd_namespace, mdc_cancel_weight); obd->obd_namespace->ns_lvbo = &inode_lvbo; @@ -3129,26 +3186,33 @@ err_rpc_lock: } /* Initialize the default and maximum LOV EA and cookie sizes. This allows - * us to make MDS RPCs with large enough reply buffers to hold the - * maximum-sized (= maximum striped) EA and cookie without having to - * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */ + * us to make MDS RPCs with large enough reply buffers to hold a default + * sized EA and cookie without having to calculate this (via a call into the + * LOV + OSCs) each time we make an RPC. The maximum size is also tracked + * but not used to avoid wastefully vmalloc()'ing large reply buffers when + * a large number of stripes is possible. If a larger reply buffer is + * required it will be reallocated in the ptlrpc layer due to overflow. + */ static int mdc_init_ea_size(struct obd_export *exp, int easize, - int def_easize, int cookiesize) + int def_easize, int cookiesize, int def_cookiesize) { - struct obd_device *obd = exp->exp_obd; - struct client_obd *cli = &obd->u.cli; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct client_obd *cli = &obd->u.cli; + ENTRY; - if (cli->cl_max_mds_easize < easize) - cli->cl_max_mds_easize = easize; + if (cli->cl_max_mds_easize < easize) + cli->cl_max_mds_easize = easize; - if (cli->cl_default_mds_easize < def_easize) - cli->cl_default_mds_easize = def_easize; + if (cli->cl_default_mds_easize < def_easize) + cli->cl_default_mds_easize = def_easize; - if (cli->cl_max_mds_cookiesize < cookiesize) - cli->cl_max_mds_cookiesize = cookiesize; + if (cli->cl_max_mds_cookiesize < cookiesize) + cli->cl_max_mds_cookiesize = cookiesize; - RETURN(0); + if (cli->cl_default_mds_cookiesize < def_cookiesize) + cli->cl_default_mds_cookiesize = def_cookiesize; + + RETURN(0); } static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -3392,11 +3456,11 @@ struct md_ops mdc_md_ops = { int __init mdc_init(void) { - return class_register_type(&mdc_obd_ops, &mdc_md_ops, NULL, + return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, #ifndef HAVE_ONLY_PROCFS_SEQ - NULL, + NULL, #endif - LUSTRE_MDC_NAME, NULL); + LUSTRE_MDC_NAME, NULL); } #ifdef __KERNEL__