X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_dev.c;h=cbe02017122fded1db6ce1889f3cebb95808cb2f;hb=e1bd38e27a810bad7a25813ebc1ca0535c9d7228;hp=1adc457181d0bcc501b0533e880e11aef4b62838;hpb=b915221b6d0f3457fd9dd202a9d14c5f8385bf47;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_dev.c b/lustre/mdc/mdc_dev.c index 1adc457..cbe0201 100644 --- a/lustre/mdc/mdc_dev.c +++ b/lustre/mdc/mdc_dev.c @@ -39,10 +39,14 @@ #include "mdc_internal.h" static void mdc_lock_build_policy(const struct lu_env *env, + const struct cl_lock *lock, union ldlm_policy_data *policy) { memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = MDS_INODELOCK_DOM; + if (lock) { + policy->l_inodebits.li_gid = lock->cll_descr.cld_gid; + } } int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) @@ -68,21 +72,17 @@ static void mdc_lock_lvb_update(const struct lu_env *env, struct ldlm_lock *dlmlock, struct ost_lvb *lvb); -static int mdc_set_dom_lock_data(const struct lu_env *env, - struct ldlm_lock *lock, void *data) +static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data) { - struct osc_object *obj = data; int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); lock_res_and_lock(lock); - if (lock->l_ast_data == NULL) { - lock->l_ast_data = data; - mdc_lock_lvb_update(env, obj, lock, NULL); - } + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; @@ -92,10 +92,11 @@ static int mdc_set_dom_lock_data(const struct lu_env *env, } int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, - struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, - struct lustre_handle *lockh, int unref) + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, + struct lustre_handle *lockh, + enum ldlm_match_flags match_flags) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; @@ -103,16 +104,25 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, lflags, - res_id, type, policy, mode, lockh, unref); + rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0, + res_id, type, policy, mode, lockh, match_flags); + if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!mdc_set_dom_lock_data(env, lock, data)) { + if (mdc_set_dom_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + mdc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -136,16 +146,24 @@ struct ldlm_lock *mdc_dlmlock_at_pgoff(const struct lu_env *env, struct ldlm_lock *lock = NULL; enum ldlm_mode mode; __u64 flags; + enum ldlm_match_flags match_flags = 0; ENTRY; fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, NULL, policy); + policy->l_inodebits.li_gid = LDLM_GID_ANY; flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; if (dap_flags & OSC_DAP_FL_TEST_LOCK) flags |= LDLM_FL_TEST_LOCK; + if (dap_flags & OSC_DAP_FL_AST) + match_flags |= LDLM_MATCH_AST; + + if (dap_flags & OSC_DAP_FL_CANCELING) + match_flags |= LDLM_MATCH_UNREF; + again: /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The @@ -153,8 +171,7 @@ again: * writers can share a single PW lock. */ mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS, policy, LCK_PR | LCK_PW | LCK_GROUP, &flags, - obj, &lockh, - dap_flags & OSC_DAP_FL_CANCELING); + obj, &lockh, match_flags); if (mode != 0) { lock = ldlm_handle2lock(&lockh); /* RACE: the lock is cancelled so let's try again */ @@ -168,8 +185,8 @@ again: /** * Check if page @page is covered by an extra lock or discard it. */ -static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) +static bool mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); struct osc_object *osc = cbdata; @@ -182,7 +199,7 @@ static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, /* refresh non-overlapped index */ tmp = mdc_dlmlock_at_pgoff(env, osc, index, - OSC_DAP_FL_TEST_LOCK); + OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_AST); if (tmp != NULL) { info->oti_fn_index = CL_PAGE_EOF; LDLM_LOCK_PUT(tmp); @@ -196,7 +213,7 @@ static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, } info->oti_next_index = index + 1; - return CLP_GANG_OKAY; + return true; } /** @@ -215,7 +232,6 @@ static int mdc_lock_discard_pages(const struct lu_env *env, struct osc_thread_info *info = osc_env_info(env); struct cl_io *io = &info->oti_io; osc_page_gang_cbt cb; - int res; int result; ENTRY; @@ -228,15 +244,9 @@ static int mdc_lock_discard_pages(const struct lu_env *env, cb = discard ? osc_discard_cb : mdc_check_and_discard_cb; info->oti_fn_index = info->oti_next_index = start; - do { - res = osc_page_gang_lookup(env, io, osc, info->oti_next_index, - end, cb, (void *)osc); - if (info->oti_next_index > end) - break; - if (res == CLP_GANG_RESCHED) - cond_resched(); - } while (res != CLP_GANG_OKAY); + osc_page_gang_lookup(env, io, osc, info->oti_next_index, + end, cb, (void *)osc); out: cl_io_fini(env, io); RETURN(result); @@ -261,7 +271,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj, result = 0; } - rc = mdc_lock_discard_pages(env, obj, start, end, discard); + /* Avoid lock matching with CLM_WRITE, there can be no other locks */ + rc = mdc_lock_discard_pages(env, obj, start, end, + mode == CLM_WRITE || discard); if (result == 0 && rc < 0) result = rc; @@ -292,7 +304,7 @@ void mdc_lock_lockless_cancel(const struct lu_env *env, */ static int mdc_dlm_blocking_ast0(const struct lu_env *env, struct ldlm_lock *dlmlock, - void *data, int flag) + int flag) { struct cl_object *obj = NULL; int result = 0; @@ -317,10 +329,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, if (dlmlock->l_ast_data != NULL) { obj = osc2cl(dlmlock->l_ast_data); - dlmlock->l_ast_data = NULL; cl_object_get(obj); } - ldlm_set_kms_ignore(dlmlock); unlock_res_and_lock(dlmlock); /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or @@ -336,6 +346,7 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, */ /* losing a lock, update kms */ lock_res_and_lock(dlmlock); + dlmlock->l_ast_data = NULL; cl_object_attr_lock(obj); attr->cat_kms = 0; cl_object_attr_update(env, obj, attr, CAT_KMS); @@ -384,7 +395,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, break; } - rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag); + rc = mdc_dlm_blocking_ast0(env, dlmlock, flag); cl_env_put(env, &refcheck); break; } @@ -410,6 +421,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE; + unsigned int setkms = 0; ENTRY; @@ -427,24 +439,31 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, size = lvb->lvb_size; if (size >= oinfo->loi_kms) { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " kms=%llu", lvb->lvb_size, size); valid |= CAT_KMS; attr->cat_kms = size; - } else { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " leaving kms=%llu", - lvb->lvb_size, oinfo->loi_kms); + setkms = 1; } } + + /* The size should not be less than the kms */ + if (attr->cat_size < oinfo->loi_kms) + attr->cat_size = oinfo->loi_kms; + + LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s " + "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size, + setkms ? "" : " leaving", + setkms ? attr->cat_kms : oinfo->loi_kms, + dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); EXIT; } static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, - struct lustre_handle *lockh, bool lvb_update) + struct lustre_handle *lockh) { + struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj); struct ldlm_lock *dlmlock; ENTRY; @@ -483,10 +502,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, descr->cld_end = CL_PAGE_EOF; /* no lvb update for matched lock */ - if (lvb_update) { + if (!ldlm_is_lvb_cached(dlmlock)) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); - mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), - dlmlock, NULL); + LASSERT(osc == dlmlock->l_ast_data); + mdc_lock_lvb_update(env, osc, dlmlock, NULL); + ldlm_set_lvb_cached(dlmlock); } } unlock_res_and_lock(dlmlock); @@ -527,7 +547,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode); if (rc == 0) - mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + mdc_lock_granted(env, oscl, lockh); /* Error handling, some errors are tolerable. */ if (oscl->ols_locklessable && rc == -EUSERS) { @@ -686,7 +706,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, enum ldlm_mode mode; bool glimpse = *flags & LDLM_FL_HAS_INTENT; __u64 match_flags = *flags; - int rc; + LIST_HEAD(cancels); + int rc, count; ENTRY; @@ -700,10 +721,8 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, * LVB information, e.g. canceled locks or locks of just pruned object, * such locks should be skipped. */ - mode = ldlm_lock_match_with_skip(obd->obd_namespace, match_flags, - LDLM_FL_KMS_IGNORE, res_id, - einfo->ei_type, policy, mode, - &lockh, 0); + mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, + einfo->ei_type, policy, mode, &lockh); if (mode) { struct ldlm_lock *matched; @@ -711,18 +730,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, RETURN(ELDLM_OK); matched = ldlm_handle2lock(&lockh); - /* this shouldn't happen but this check is kept to make - * related test fail if problem occurs - */ - if (unlikely(ldlm_is_kms_ignore(matched))) { - LDLM_ERROR(matched, "matched lock has KMS ignore flag"); - goto no_match; - } if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS)) ldlm_set_kms_ignore(matched); - if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) { + if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; /* We already have a lock, and it's referenced. */ @@ -732,7 +744,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); } -no_match: ldlm_lock_decref(&lockh, mode); LDLM_LOCK_PUT(matched); } @@ -744,7 +755,15 @@ no_match: if (req == NULL) RETURN(-ENOMEM); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + /* For WRITE lock cancel other locks on resource early if any */ + if (einfo->ei_mode & LCK_PW) + count = mdc_resource_get_unused_res(exp, res_id, &cancels, + einfo->ei_mode, + MDS_INODELOCK_DOM); + else + count = 0; + + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { ptlrpc_request_free(req); RETURN(rc); @@ -768,8 +787,7 @@ no_match: if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; aa->oa_type = einfo->ei_type; @@ -872,7 +890,7 @@ enqueue_base: * osc_lock. */ fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, lock, policy); LASSERT(!oscl->ols_speculative); result = mdc_enqueue_send(env, osc_export(osc), resname, &oscl->ols_flags, policy, @@ -938,6 +956,8 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj, ols->ols_flags = flags; ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + if (lock->cll_descr.cld_mode == CLM_GROUP) + ols->ols_flags |= LDLM_FL_ATOMIC_CB; if (ols->ols_flags & LDLM_FL_HAS_INTENT) { ols->ols_flags |= LDLM_FL_BLOCK_GRANTED; @@ -1110,8 +1130,8 @@ static int mdc_io_read_ahead(const struct lu_env *env, ldlm_lock_decref(&lockh, dlmlock->l_req_mode); } - ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc; - ra->cra_end = CL_PAGE_EOF; + ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc; + ra->cra_end_idx = CL_PAGE_EOF; ra->cra_release = osc_read_ahead_release; ra->cra_cbdata = dlmlock; @@ -1238,8 +1258,7 @@ static int mdc_io_data_version_start(const struct lu_env *env, ptlrpc_request_set_replen(req); req->rq_interpret_reply = mdc_data_version_interpret; - CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args)); - dva = ptlrpc_req_async_args(req); + dva = ptlrpc_req_async_args(dva, req); dva->dva_oio = oio; ptlrpcd_add_req(req); @@ -1277,13 +1296,13 @@ static void mdc_io_data_version_end(const struct lu_env *env, static struct cl_io_operations mdc_io_ops = { .op = { [CIT_READ] = { - .cio_iter_init = osc_io_iter_init, - .cio_iter_fini = osc_io_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_read_start, }, [CIT_WRITE] = { - .cio_iter_init = osc_io_write_iter_init, - .cio_iter_fini = osc_io_write_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_write_start, .cio_end = osc_io_end, }, @@ -1307,6 +1326,10 @@ static struct cl_io_operations mdc_io_ops = { .cio_start = mdc_io_fsync_start, .cio_end = osc_io_fsync_end, }, + [CIT_LSEEK] = { + .cio_start = osc_io_lseek_start, + .cio_end = osc_io_lseek_end, + }, }, .cio_read_ahead = mdc_io_read_ahead, .cio_submit = osc_io_submit, @@ -1382,11 +1405,35 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj, static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data) { + struct osc_object *osc = (struct osc_object *)data; + struct ost_lvb *lvb = &lock->l_ost_lvb; + struct lov_oinfo *oinfo; ENTRY; - if (lock->l_ast_data == data) + if (lock->l_ast_data == data) { lock->l_ast_data = NULL; - ldlm_set_kms_ignore(lock); + + LASSERT(osc != NULL); + LASSERT(osc->oo_oinfo != NULL); + LASSERT(lvb != NULL); + + /* Updates lvb in lock by the cached oinfo */ + oinfo = osc->oo_oinfo; + + LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: " + "%llu %llu %llu by oinfo size %llu blocks %llu " + "[cma]time %llu %llu %llu", lvb->lvb_size, + lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime, + lvb->lvb_atime, oinfo->loi_lvb.lvb_size, + oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime, + oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime); + LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms); + + cl_object_attr_lock(&osc->oo_cl); + memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb)); + cl_object_attr_unlock(&osc->oo_cl); + ldlm_clear_lvb_cached(lock); + } RETURN(LDLM_ITER_CONTINUE); } @@ -1403,6 +1450,17 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj) return 0; } +static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj, + struct ldlm_lock *lock) +{ + /* if lock cancel is initiated from llite then it is combined + * lock with DOM bit and it may have no l_ast_data initialized yet, + * so init it here with given osc_object. + */ + mdc_set_dom_lock_data(lock, cl2osc(obj)); + RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING)); +} + static const struct cl_object_operations mdc_ops = { .coo_page_init = osc_page_init, .coo_lock_init = mdc_lock_init, @@ -1412,6 +1470,7 @@ static const struct cl_object_operations mdc_ops = { .coo_glimpse = osc_object_glimpse, .coo_req_attr_set = mdc_req_attr_set, .coo_prune = mdc_object_prune, + .coo_object_flush = mdc_object_flush }; static const struct osc_object_operations mdc_object_ops = {