X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_dev.c;h=75d44ba45dbe9868102a21820c244efd1672903e;hp=a59b1a8821f1e260c8bdf23b8c0acfc3d4faada9;hb=72617588ac8cb2e3e5a7b8e5ebc201cab524d938;hpb=16c156c3218b9dea5a7cd91c12278a608d278c54 diff --git a/lustre/mdc/mdc_dev.c b/lustre/mdc/mdc_dev.c index a59b1a8..75d44ba 100644 --- a/lustre/mdc/mdc_dev.c +++ b/lustre/mdc/mdc_dev.c @@ -39,10 +39,14 @@ #include "mdc_internal.h" static void mdc_lock_build_policy(const struct lu_env *env, + const struct cl_lock *lock, union ldlm_policy_data *policy) { memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = MDS_INODELOCK_DOM; + if (lock) { + policy->l_inodebits.li_gid = lock->cll_descr.cld_gid; + } } int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) @@ -68,21 +72,17 @@ static void mdc_lock_lvb_update(const struct lu_env *env, struct ldlm_lock *dlmlock, struct ost_lvb *lvb); -static int mdc_set_dom_lock_data(const struct lu_env *env, - struct ldlm_lock *lock, void *data) +static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data) { - struct osc_object *obj = data; int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); lock_res_and_lock(lock); - if (lock->l_ast_data == NULL) { - lock->l_ast_data = data; - mdc_lock_lvb_update(env, obj, lock, NULL); - } + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; @@ -92,10 +92,11 @@ static int mdc_set_dom_lock_data(const struct lu_env *env, } int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, - struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, - struct lustre_handle *lockh, int unref) + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, + struct lustre_handle *lockh, + enum ldlm_match_flags match_flags) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; @@ -103,16 +104,24 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, lflags, - res_id, type, policy, mode, lockh, unref); + rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0, + res_id, type, policy, mode, lockh, match_flags); if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!mdc_set_dom_lock_data(env, lock, data)) { + if (mdc_set_dom_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + mdc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -136,16 +145,24 @@ struct ldlm_lock *mdc_dlmlock_at_pgoff(const struct lu_env *env, struct ldlm_lock *lock = NULL; enum ldlm_mode mode; __u64 flags; + enum ldlm_match_flags match_flags = 0; ENTRY; fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, NULL, policy); + policy->l_inodebits.li_gid = LDLM_GID_ANY; flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; if (dap_flags & OSC_DAP_FL_TEST_LOCK) flags |= LDLM_FL_TEST_LOCK; + if (dap_flags & OSC_DAP_FL_AST) + match_flags |= LDLM_MATCH_AST; + + if (dap_flags & OSC_DAP_FL_CANCELING) + match_flags |= LDLM_MATCH_UNREF; + again: /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The @@ -153,8 +170,7 @@ again: * writers can share a single PW lock. */ mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS, policy, LCK_PR | LCK_PW | LCK_GROUP, &flags, - obj, &lockh, - dap_flags & OSC_DAP_FL_CANCELING); + obj, &lockh, match_flags); if (mode != 0) { lock = ldlm_handle2lock(&lockh); /* RACE: the lock is cancelled so let's try again */ @@ -168,8 +184,8 @@ again: /** * Check if page @page is covered by an extra lock or discard it. */ -static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) +static bool mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + struct osc_page *ops, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); struct osc_object *osc = cbdata; @@ -182,7 +198,7 @@ static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, /* refresh non-overlapped index */ tmp = mdc_dlmlock_at_pgoff(env, osc, index, - OSC_DAP_FL_TEST_LOCK); + OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_AST); if (tmp != NULL) { info->oti_fn_index = CL_PAGE_EOF; LDLM_LOCK_PUT(tmp); @@ -196,7 +212,7 @@ static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, } info->oti_next_index = index + 1; - return CLP_GANG_OKAY; + return true; } /** @@ -215,7 +231,6 @@ static int mdc_lock_discard_pages(const struct lu_env *env, struct osc_thread_info *info = osc_env_info(env); struct cl_io *io = &info->oti_io; osc_page_gang_cbt cb; - int res; int result; ENTRY; @@ -228,15 +243,9 @@ static int mdc_lock_discard_pages(const struct lu_env *env, cb = discard ? osc_discard_cb : mdc_check_and_discard_cb; info->oti_fn_index = info->oti_next_index = start; - do { - res = osc_page_gang_lookup(env, io, osc, info->oti_next_index, - end, cb, (void *)osc); - if (info->oti_next_index > end) - break; - if (res == CLP_GANG_RESCHED) - cond_resched(); - } while (res != CLP_GANG_OKAY); + osc_page_gang_lookup(env, io, osc, info->oti_next_index, + end, cb, (void *)osc); out: cl_io_fini(env, io); RETURN(result); @@ -261,7 +270,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj, result = 0; } - rc = mdc_lock_discard_pages(env, obj, start, end, discard); + /* Avoid lock matching with CLM_WRITE, there can be no other locks */ + rc = mdc_lock_discard_pages(env, obj, start, end, + mode == CLM_WRITE || discard); if (result == 0 && rc < 0) result = rc; @@ -290,9 +301,8 @@ void mdc_lock_lockless_cancel(const struct lu_env *env, * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock * and ldlm_lock caches. */ -static int mdc_dlm_blocking_ast0(const struct lu_env *env, - struct ldlm_lock *dlmlock, - void *data, int flag) +static int mdc_dlm_canceling(const struct lu_env *env, + struct ldlm_lock *dlmlock) { struct cl_object *obj = NULL; int result = 0; @@ -301,11 +311,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, ENTRY; - LASSERT(flag == LDLM_CB_CANCELING); - LASSERT(dlmlock != NULL); - lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode != dlmlock->l_req_mode) { + if (!ldlm_is_granted(dlmlock)) { dlmlock->l_ast_data = NULL; unlock_res_and_lock(dlmlock); RETURN(0); @@ -317,10 +324,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, if (dlmlock->l_ast_data != NULL) { obj = osc2cl(dlmlock->l_ast_data); - dlmlock->l_ast_data = NULL; cl_object_get(obj); } - ldlm_set_kms_ignore(dlmlock); unlock_res_and_lock(dlmlock); /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or @@ -336,6 +341,7 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, */ /* losing a lock, update kms */ lock_res_and_lock(dlmlock); + dlmlock->l_ast_data = NULL; cl_object_attr_lock(obj); attr->cat_kms = 0; cl_object_attr_update(env, obj, attr, CAT_KMS); @@ -347,13 +353,13 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, } int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, - struct ldlm_lock_desc *new, void *data, int flag) + struct ldlm_lock_desc *new, void *data, int reason) { int rc = 0; ENTRY; - switch (flag) { + switch (reason) { case LDLM_CB_BLOCKING: { struct lustre_handle lockh; @@ -384,7 +390,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, break; } - rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag); + rc = mdc_dlm_canceling(env, dlmlock); cl_env_put(env, &refcheck); break; } @@ -410,6 +416,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE; + unsigned int setkms = 0; ENTRY; @@ -427,24 +434,32 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, size = lvb->lvb_size; if (size >= oinfo->loi_kms) { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " kms=%llu", lvb->lvb_size, size); valid |= CAT_KMS; attr->cat_kms = size; - } else { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " leaving kms=%llu", - lvb->lvb_size, oinfo->loi_kms); + setkms = 1; } + ldlm_lock_allow_match_locked(dlmlock); } + + /* The size should not be less than the kms */ + if (attr->cat_size < oinfo->loi_kms) + attr->cat_size = oinfo->loi_kms; + + LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s " + "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size, + setkms ? "" : " leaving", + setkms ? attr->cat_kms : oinfo->loi_kms, + dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); EXIT; } static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, - struct lustre_handle *lockh, bool lvb_update) + struct lustre_handle *lockh) { + struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj); struct ldlm_lock *dlmlock; ENTRY; @@ -473,7 +488,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, /* Lock must have been granted. */ lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode == dlmlock->l_req_mode) { + if (ldlm_is_granted(dlmlock)) { struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr; /* extend the lock extent, otherwise it will have problem when @@ -483,10 +498,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, descr->cld_end = CL_PAGE_EOF; /* no lvb update for matched lock */ - if (lvb_update) { + if (!ldlm_is_lvb_cached(dlmlock)) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); - mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), - dlmlock, NULL); + LASSERT(osc == dlmlock->l_ast_data); + mdc_lock_lvb_update(env, osc, dlmlock, NULL); + ldlm_set_lvb_cached(dlmlock); } } unlock_res_and_lock(dlmlock); @@ -498,7 +514,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, /** * Lock upcall function that is executed either when a reply to ENQUEUE rpc is - * received from a server, or after osc_enqueue_base() matched a local DLM + * received from a server, or after mdc_enqueue_send() matched a local DLM * lock. */ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, @@ -527,7 +543,7 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode); if (rc == 0) - mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + mdc_lock_granted(env, oscl, lockh); /* Error handling, some errors are tolerable. */ if (oscl->ols_locklessable && rc == -EUSERS) { @@ -556,53 +572,66 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, RETURN(rc); } +/* This is needed only for old servers (before 2.14) support */ int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb) { struct mdt_body *body; + /* get LVB data from mdt_body otherwise */ body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); if (!body) RETURN(-EPROTO); - lvb->lvb_mtime = body->mbo_mtime; - lvb->lvb_atime = body->mbo_atime; - lvb->lvb_ctime = body->mbo_ctime; - lvb->lvb_blocks = body->mbo_dom_blocks; - lvb->lvb_size = body->mbo_dom_size; + if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) + RETURN(-EPROTO); + mdc_body2lvb(body, lvb); RETURN(0); } -int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, - void *cookie, struct lustre_handle *lockh, - enum ldlm_mode mode, __u64 *flags, int errcode) +int mdc_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, void *cookie, + struct lustre_handle *lockh, enum ldlm_mode mode, + __u64 *flags, int errcode) { struct osc_lock *ols = cookie; - struct ldlm_lock *lock; + bool glimpse = *flags & LDLM_FL_HAS_INTENT; int rc = 0; ENTRY; - /* The request was created before ldlm_cli_enqueue call. */ - if (errcode == ELDLM_LOCK_ABORTED) { + /* needed only for glimpse from an old server (< 2.14) */ + if (glimpse && !exp_connect_dom_lvb(exp)) + rc = mdc_fill_lvb(req, &ols->ols_lvb); + + if (glimpse && errcode == ELDLM_LOCK_ABORTED) { struct ldlm_reply *rep; rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - LASSERT(rep != NULL); - - rep->lock_policy_res2 = - ptlrpc_status_ntoh(rep->lock_policy_res2); - if (rep->lock_policy_res2) - errcode = rep->lock_policy_res2; - - rc = mdc_fill_lvb(req, &ols->ols_lvb); + if (likely(rep)) { + rep->lock_policy_res2 = + ptlrpc_status_ntoh(rep->lock_policy_res2); + if (rep->lock_policy_res2) + errcode = rep->lock_policy_res2; + } else { + rc = -EPROTO; + } *flags |= LDLM_FL_LVB_READY; } else if (errcode == ELDLM_OK) { + struct ldlm_lock *lock; + /* Callers have references, should be valid always */ lock = ldlm_handle2lock(lockh); - LASSERT(lock); - rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + /* At this point ols_lvb must be filled with correct LVB either + * by mdc_fill_lvb() above or by ldlm_cli_enqueue_fini(). + * DoM uses l_ost_lvb to store LVB data, so copy it here from + * just updated ols_lvb. + */ + lock_res_and_lock(lock); + memcpy(&lock->l_ost_lvb, &ols->ols_lvb, + sizeof(lock->l_ost_lvb)); + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); *flags |= LDLM_FL_LVB_READY; } @@ -626,6 +655,10 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; enum ldlm_mode mode = aa->oa_mode; + struct ldlm_enqueue_info einfo = { + .ei_type = aa->oa_type, + .ei_mode = mode, + }; ENTRY; @@ -641,7 +674,8 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, /* Take an additional reference so that a blocking AST that * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ + * mdc_enqueue_fini(). + */ ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ @@ -651,12 +685,12 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, - aa->oa_mode, aa->oa_flags, NULL, 0, - lockh, rc); + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags, + aa->oa_lvb, aa->oa_lvb ? + sizeof(*aa->oa_lvb) : 0, lockh, rc); /* Complete mdc stuff. */ - rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, - aa->oa_flags, rc); + rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie, + lockh, mode, aa->oa_flags, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -674,8 +708,7 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, * release locks just after they are obtained. */ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, - union ldlm_policy_data *policy, - struct ost_lvb *lvb, int kms_valid, + union ldlm_policy_data *policy, struct ost_lvb *lvb, osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, int async) { @@ -686,8 +719,10 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, enum ldlm_mode mode; bool glimpse = *flags & LDLM_FL_HAS_INTENT; __u64 match_flags = *flags; - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); int rc, count; + int lvb_size; + bool compat_glimpse = glimpse && !exp_connect_dom_lvb(exp); ENTRY; @@ -695,16 +730,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; + match_flags |= LDLM_FL_LVB_READY; if (glimpse) match_flags |= LDLM_FL_BLOCK_GRANTED; - /* DOM locking uses LDLM_FL_KMS_IGNORE to mark locks wich have no valid - * LVB information, e.g. canceled locks or locks of just pruned object, - * such locks should be skipped. - */ - mode = ldlm_lock_match_with_skip(obd->obd_namespace, match_flags, - LDLM_FL_KMS_IGNORE, res_id, - einfo->ei_type, policy, mode, - &lockh, 0); + mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, + einfo->ei_type, policy, mode, &lockh); if (mode) { struct ldlm_lock *matched; @@ -712,18 +742,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, RETURN(ELDLM_OK); matched = ldlm_handle2lock(&lockh); - /* this shouldn't happen but this check is kept to make - * related test fail if problem occurs - */ - if (unlikely(ldlm_is_kms_ignore(matched))) { - LDLM_ERROR(matched, "matched lock has KMS ignore flag"); - goto no_match; - } if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS)) ldlm_set_kms_ignore(matched); - if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) { + if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; /* We already have a lock, and it's referenced. */ @@ -733,7 +756,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); } -no_match: ldlm_lock_decref(&lockh, mode); LDLM_LOCK_PUT(matched); } @@ -741,7 +763,9 @@ no_match: if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT); + /* Glimpse is intent on old server */ + req = ptlrpc_request_alloc(class_exp2cliimp(exp), compat_glimpse ? + &RQF_LDLM_INTENT : &RQF_LDLM_ENQUEUE); if (req == NULL) RETURN(-ENOMEM); @@ -759,26 +783,33 @@ no_match: RETURN(rc); } - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = glimpse ? IT_GLIMPSE : IT_BRW; - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); - ptlrpc_request_set_replen(req); + if (compat_glimpse) { + /* pack the glimpse intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = IT_GLIMPSE; + } /* users of mdc_enqueue() can pass this flag for ldlm_lock_match() */ *flags &= ~LDLM_FL_BLOCK_GRANTED; - /* All MDC IO locks are intents */ - *flags |= LDLM_FL_HAS_INTENT; - rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL, - 0, LVB_T_NONE, &lockh, async); + + if (compat_glimpse) { + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); + lvb_size = 0; + } else { + lvb_size = sizeof(*lvb); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + lvb_size); + } + ptlrpc_request_set_replen(req); + + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, + lvb_size, LVB_T_OST, &lockh, async); if (async) { if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; aa->oa_type = einfo->ei_type; @@ -787,7 +818,7 @@ no_match: aa->oa_cookie = cookie; aa->oa_speculative = false; aa->oa_flags = flags; - aa->oa_lvb = lvb; + aa->oa_lvb = compat_glimpse ? NULL : lvb; req->rq_interpret_reply = mdc_enqueue_interpret; ptlrpcd_add_req(req); @@ -797,7 +828,7 @@ no_match: RETURN(rc); } - rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + rc = mdc_enqueue_fini(exp, req, upcall, cookie, &lockh, einfo->ei_mode, flags, rc); ptlrpc_req_finished(req); RETURN(rc); @@ -881,11 +912,10 @@ enqueue_base: * osc_lock. */ fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, lock, policy); LASSERT(!oscl->ols_speculative); result = mdc_enqueue_send(env, osc_export(osc), resname, - &oscl->ols_flags, policy, - &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid, + &oscl->ols_flags, policy, &oscl->ols_lvb, upcall, cookie, &oscl->ols_einfo, async); if (result == 0) { if (osc_lock_is_lockless(oscl)) { @@ -947,6 +977,8 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj, ols->ols_flags = flags; ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + if (lock->cll_descr.cld_mode == CLM_GROUP) + ols->ols_flags |= LDLM_FL_ATOMIC_CB; if (ols->ols_flags & LDLM_FL_HAS_INTENT) { ols->ols_flags |= LDLM_FL_BLOCK_GRANTED; @@ -1119,8 +1151,8 @@ static int mdc_io_read_ahead(const struct lu_env *env, ldlm_lock_decref(&lockh, dlmlock->l_req_mode); } - ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc; - ra->cra_end = CL_PAGE_EOF; + ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc; + ra->cra_end_idx = CL_PAGE_EOF; ra->cra_release = osc_read_ahead_release; ra->cra_cbdata = dlmlock; @@ -1247,8 +1279,7 @@ static int mdc_io_data_version_start(const struct lu_env *env, ptlrpc_request_set_replen(req); req->rq_interpret_reply = mdc_data_version_interpret; - CLASSERT(sizeof(*dva) <= sizeof(req->rq_async_args)); - dva = ptlrpc_req_async_args(req); + dva = ptlrpc_req_async_args(dva, req); dva->dva_oio = oio; ptlrpcd_add_req(req); @@ -1286,13 +1317,13 @@ static void mdc_io_data_version_end(const struct lu_env *env, static struct cl_io_operations mdc_io_ops = { .op = { [CIT_READ] = { - .cio_iter_init = osc_io_iter_init, - .cio_iter_fini = osc_io_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_read_start, }, [CIT_WRITE] = { - .cio_iter_init = osc_io_write_iter_init, - .cio_iter_fini = osc_io_write_iter_fini, + .cio_iter_init = osc_io_rw_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_write_start, .cio_end = osc_io_end, }, @@ -1316,6 +1347,10 @@ static struct cl_io_operations mdc_io_ops = { .cio_start = mdc_io_fsync_start, .cio_end = osc_io_fsync_end, }, + [CIT_LSEEK] = { + .cio_start = osc_io_lseek_start, + .cio_end = osc_io_lseek_end, + }, }, .cio_read_ahead = mdc_io_read_ahead, .cio_submit = osc_io_submit, @@ -1391,11 +1426,35 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj, static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data) { + struct osc_object *osc = (struct osc_object *)data; + struct ost_lvb *lvb = &lock->l_ost_lvb; + struct lov_oinfo *oinfo; ENTRY; - if (lock->l_ast_data == data) + if (lock->l_ast_data == data) { lock->l_ast_data = NULL; - ldlm_set_kms_ignore(lock); + + LASSERT(osc != NULL); + LASSERT(osc->oo_oinfo != NULL); + LASSERT(lvb != NULL); + + /* Updates lvb in lock by the cached oinfo */ + oinfo = osc->oo_oinfo; + + LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: " + "%llu %llu %llu by oinfo size %llu blocks %llu " + "[cma]time %llu %llu %llu", lvb->lvb_size, + lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime, + lvb->lvb_atime, oinfo->loi_lvb.lvb_size, + oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime, + oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime); + LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms); + + cl_object_attr_lock(&osc->oo_cl); + memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb)); + cl_object_attr_unlock(&osc->oo_cl); + ldlm_clear_lvb_cached(lock); + } RETURN(LDLM_ITER_CONTINUE); } @@ -1412,6 +1471,17 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj) return 0; } +static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj, + struct ldlm_lock *lock) +{ + /* if lock cancel is initiated from llite then it is combined + * lock with DOM bit and it may have no l_ast_data initialized yet, + * so init it here with given osc_object. + */ + mdc_set_dom_lock_data(lock, cl2osc(obj)); + RETURN(mdc_dlm_canceling(env, lock)); +} + static const struct cl_object_operations mdc_ops = { .coo_page_init = osc_page_init, .coo_lock_init = mdc_lock_init, @@ -1421,6 +1491,7 @@ static const struct cl_object_operations mdc_ops = { .coo_glimpse = osc_object_glimpse, .coo_req_attr_set = mdc_req_attr_set, .coo_prune = mdc_object_prune, + .coo_object_flush = mdc_object_flush }; static const struct osc_object_operations mdc_object_ops = {