X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_dev.c;h=6ab9bca44e70f37156f943ab9e0a75a26f31d7d3;hp=0ebc4efd745fd8ddc5a9144f29edd5d073b51979;hb=7c99f67d9d39e8a037e830cf08a9df305e6d8da2;hpb=1e7fc14bbf48f7e89876cbaa609972981e343944 diff --git a/lustre/mdc/mdc_dev.c b/lustre/mdc/mdc_dev.c index 0ebc4ef..6ab9bca 100644 --- a/lustre/mdc/mdc_dev.c +++ b/lustre/mdc/mdc_dev.c @@ -34,6 +34,7 @@ #include #include +#include #include "mdc_internal.h" @@ -67,21 +68,17 @@ static void mdc_lock_lvb_update(const struct lu_env *env, struct ldlm_lock *dlmlock, struct ost_lvb *lvb); -static int mdc_set_dom_lock_data(const struct lu_env *env, - struct ldlm_lock *lock, void *data) +static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data) { - struct osc_object *obj = data; int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); lock_res_and_lock(lock); - if (lock->l_ast_data == NULL) { - lock->l_ast_data = data; - mdc_lock_lvb_update(env, obj, lock, NULL); - } + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; @@ -91,9 +88,9 @@ static int mdc_set_dom_lock_data(const struct lu_env *env, } int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, - struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, struct lustre_handle *lockh, int unref) { struct obd_device *obd = exp->exp_obd; @@ -107,11 +104,19 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!mdc_set_dom_lock_data(env, lock, data)) { + if (mdc_set_dom_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + mdc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -151,7 +156,8 @@ again: * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. */ mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS, - policy, LCK_PR | LCK_PW, &flags, obj, &lockh, + policy, LCK_PR | LCK_PW | LCK_GROUP, &flags, + obj, &lockh, dap_flags & OSC_DAP_FL_CANCELING); if (mode != 0) { lock = ldlm_handle2lock(&lockh); @@ -259,7 +265,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj, result = 0; } - rc = mdc_lock_discard_pages(env, obj, start, end, discard); + /* Avoid lock matching with CLM_WRITE, there can be no other locks */ + rc = mdc_lock_discard_pages(env, obj, start, end, + mode == CLM_WRITE || discard); if (result == 0 && rc < 0) result = rc; @@ -290,7 +298,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env, */ static int mdc_dlm_blocking_ast0(const struct lu_env *env, struct ldlm_lock *dlmlock, - void *data, int flag) + int flag) { struct cl_object *obj = NULL; int result = 0; @@ -315,10 +323,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, if (dlmlock->l_ast_data != NULL) { obj = osc2cl(dlmlock->l_ast_data); - dlmlock->l_ast_data = NULL; cl_object_get(obj); } - ldlm_set_kms_ignore(dlmlock); unlock_res_and_lock(dlmlock); /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or @@ -334,6 +340,7 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, */ /* losing a lock, update kms */ lock_res_and_lock(dlmlock); + dlmlock->l_ast_data = NULL; cl_object_attr_lock(obj); attr->cat_kms = 0; cl_object_attr_update(env, obj, attr, CAT_KMS); @@ -382,7 +389,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, break; } - rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag); + rc = mdc_dlm_blocking_ast0(env, dlmlock, flag); cl_env_put(env, &refcheck); break; } @@ -408,6 +415,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE; + unsigned int setkms = 0; ENTRY; @@ -425,24 +433,31 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, size = lvb->lvb_size; if (size >= oinfo->loi_kms) { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " kms=%llu", lvb->lvb_size, size); valid |= CAT_KMS; attr->cat_kms = size; - } else { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " leaving kms=%llu", - lvb->lvb_size, oinfo->loi_kms); + setkms = 1; } } + + /* The size should not be less than the kms */ + if (attr->cat_size < oinfo->loi_kms) + attr->cat_size = oinfo->loi_kms; + + LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s " + "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size, + setkms ? "" : " leaving", + setkms ? attr->cat_kms : oinfo->loi_kms, + dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); EXIT; } static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, - struct lustre_handle *lockh, bool lvb_update) + struct lustre_handle *lockh) { + struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj); struct ldlm_lock *dlmlock; ENTRY; @@ -481,10 +496,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, descr->cld_end = CL_PAGE_EOF; /* no lvb update for matched lock */ - if (lvb_update) { + if (!ldlm_is_lvb_cached(dlmlock)) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); - mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), - dlmlock, NULL); + LASSERT(osc == dlmlock->l_ast_data); + mdc_lock_lvb_update(env, osc, dlmlock, NULL); + ldlm_set_lvb_cached(dlmlock); } } unlock_res_and_lock(dlmlock); @@ -525,7 +541,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode); if (rc == 0) - mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + mdc_lock_granted(env, oscl, lockh); /* Error handling, some errors are tolerable. */ if (oscl->ols_locklessable && rc == -EUSERS) { @@ -618,8 +634,9 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, } int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) + void *args, int rc) { + struct osc_enqueue_args *aa = args; struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; enum ldlm_mode mode = aa->oa_mode; @@ -683,7 +700,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, enum ldlm_mode mode; bool glimpse = *flags & LDLM_FL_HAS_INTENT; __u64 match_flags = *flags; - int rc; + LIST_HEAD(cancels); + int rc, count; ENTRY; @@ -691,8 +709,12 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - if (!glimpse) + if (glimpse) match_flags |= LDLM_FL_BLOCK_GRANTED; + /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid + * LVB information, e.g. canceled locks or locks of just pruned object, + * such locks should be skipped. + */ mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, einfo->ei_type, policy, mode, &lockh, 0); if (mode) { @@ -702,10 +724,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, RETURN(ELDLM_OK); matched = ldlm_handle2lock(&lockh); - if (ldlm_is_kms_ignore(matched)) - goto no_match; - if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS)) + ldlm_set_kms_ignore(matched); + + if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; /* We already have a lock, and it's referenced. */ @@ -715,7 +738,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); } -no_match: ldlm_lock_decref(&lockh, mode); LDLM_LOCK_PUT(matched); } @@ -727,7 +749,15 @@ no_match: if (req == NULL) RETURN(-ENOMEM); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + /* For WRITE lock cancel other locks on resource early if any */ + if (einfo->ei_mode & LCK_PW) + count = mdc_resource_get_unused_res(exp, res_id, &cancels, + einfo->ei_mode, + MDS_INODELOCK_DOM); + else + count = 0; + + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { ptlrpc_request_free(req); RETURN(rc); @@ -751,8 +781,7 @@ no_match: if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; aa->oa_type = einfo->ei_type; @@ -763,8 +792,7 @@ no_match: aa->oa_flags = flags; aa->oa_lvb = lvb; - req->rq_interpret_reply = - (ptlrpc_interpterer_t)mdc_enqueue_interpret; + req->rq_interpret_reply = mdc_enqueue_interpret; ptlrpcd_add_req(req); } else { ptlrpc_req_finished(req); @@ -997,7 +1025,8 @@ static int mdc_io_setattr_start(const struct lu_env *env, struct obdo *oa = &oio->oi_oa; struct osc_async_cbargs *cbargs = &oio->oi_cbarg; __u64 size = io->u.ci_setattr.sa_attr.lvb_size; - unsigned int ia_valid = io->u.ci_setattr.sa_valid; + unsigned int ia_avalid = io->u.ci_setattr.sa_avalid; + enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid; int rc; /* silently ignore non-truncate setattr for Data-on-MDT object */ @@ -1016,19 +1045,20 @@ static int mdc_io_setattr_start(const struct lu_env *env, struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr; unsigned int cl_valid = 0; - if (ia_valid & ATTR_SIZE) { - attr->cat_size = attr->cat_kms = size; + if (ia_avalid & ATTR_SIZE) { + attr->cat_size = size; + attr->cat_kms = size; cl_valid = (CAT_SIZE | CAT_KMS); } - if (ia_valid & ATTR_MTIME_SET) { + if (ia_avalid & ATTR_MTIME_SET) { attr->cat_mtime = lvb->lvb_mtime; cl_valid |= CAT_MTIME; } - if (ia_valid & ATTR_ATIME_SET) { + if (ia_avalid & ATTR_ATIME_SET) { attr->cat_atime = lvb->lvb_atime; cl_valid |= CAT_ATIME; } - if (ia_valid & ATTR_CTIME_SET) { + if (ia_xvalid & OP_XVALID_CTIME_SET) { attr->cat_ctime = lvb->lvb_ctime; cl_valid |= CAT_CTIME; } @@ -1039,7 +1069,7 @@ static int mdc_io_setattr_start(const struct lu_env *env, return rc; } - if (!(ia_valid & ATTR_SIZE)) + if (!(ia_avalid & ATTR_SIZE)) return 0; memset(oa, 0, sizeof(*oa)); @@ -1092,8 +1122,8 @@ static int mdc_io_read_ahead(const struct lu_env *env, ldlm_lock_decref(&lockh, dlmlock->l_req_mode); } - ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc; - ra->cra_end = CL_PAGE_EOF; + ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc; + ra->cra_end_idx = CL_PAGE_EOF; ra->cra_release = osc_read_ahead_release; ra->cra_cbdata = dlmlock; @@ -1144,9 +1174,9 @@ struct mdc_data_version_args { static int mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req, - void *arg, int rc) + void *args, int rc) { - struct mdc_data_version_args *dva = arg; + struct mdc_data_version_args *dva = args; struct osc_io *oio = dva->dva_oio; const struct mdt_body *body; @@ -1220,8 +1250,7 @@ static int mdc_io_data_version_start(const struct lu_env *env, ptlrpc_request_set_replen(req); req->rq_interpret_reply = mdc_data_version_interpret; - CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args)); - dva = ptlrpc_req_async_args(req); + dva = ptlrpc_req_async_args(dva, req); dva->dva_oio = oio; ptlrpcd_add_req(req); @@ -1259,13 +1288,13 @@ static void mdc_io_data_version_end(const struct lu_env *env, static struct cl_io_operations mdc_io_ops = { .op = { [CIT_READ] = { - .cio_iter_init = osc_io_iter_init, - .cio_iter_fini = osc_io_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_read_start, }, [CIT_WRITE] = { - .cio_iter_init = osc_io_write_iter_init, - .cio_iter_fini = osc_io_write_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_write_start, .cio_end = osc_io_end, }, @@ -1364,12 +1393,34 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj, static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data) { + struct osc_object *osc = (struct osc_object *)data; + struct ost_lvb *lvb = &lock->l_ost_lvb; + struct lov_oinfo *oinfo; ENTRY; - if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) || - (lock->l_ast_data == data)) { + if (lock->l_ast_data == data) { lock->l_ast_data = NULL; - ldlm_set_kms_ignore(lock); + + LASSERT(osc != NULL); + LASSERT(osc->oo_oinfo != NULL); + LASSERT(lvb != NULL); + + /* Updates lvb in lock by the cached oinfo */ + oinfo = osc->oo_oinfo; + + LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: " + "%llu %llu %llu by oinfo size %llu blocks %llu " + "[cma]time %llu %llu %llu", lvb->lvb_size, + lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime, + lvb->lvb_atime, oinfo->loi_lvb.lvb_size, + oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime, + oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime); + LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms); + + cl_object_attr_lock(&osc->oo_cl); + memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb)); + cl_object_attr_unlock(&osc->oo_cl); + ldlm_clear_lvb_cached(lock); } RETURN(LDLM_ITER_CONTINUE); } @@ -1387,6 +1438,12 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj) return 0; } +static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj, + struct ldlm_lock *lock) +{ + RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING)); +} + static const struct cl_object_operations mdc_ops = { .coo_page_init = osc_page_init, .coo_lock_init = mdc_lock_init, @@ -1396,6 +1453,7 @@ static const struct cl_object_operations mdc_ops = { .coo_glimpse = osc_object_glimpse, .coo_req_attr_set = mdc_req_attr_set, .coo_prune = mdc_object_prune, + .coo_object_flush = mdc_object_flush }; static const struct osc_object_operations mdc_object_ops = { @@ -1451,15 +1509,17 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env, return obj; } -static int mdc_cl_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) +static int mdc_process_config(const struct lu_env *env, struct lu_device *d, + struct lustre_cfg *cfg) { - return mdc_process_config(d->ld_obd, 0, cfg); + size_t count = class_modify_config(cfg, PARAM_MDC, + &d->ld_obd->obd_kset.kobj); + return count > 0 ? 0 : count; } const struct lu_device_operations mdc_lu_ops = { .ldo_object_alloc = mdc_object_alloc, - .ldo_process_config = mdc_cl_process_config, + .ldo_process_config = mdc_process_config, .ldo_recovery_complete = NULL, };