X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_dev.c;h=5ef4480fa05f5ab797836942a8b4914c169da514;hb=163870abfb7c;hp=e2c680e387a377b50e09a9c626962617e3f9a3bc;hpb=f8929e6d0f3c28639763ba474f663cdfd5181268;p=fs%2Flustre-release.git diff --git a/lustre/mdc/mdc_dev.c b/lustre/mdc/mdc_dev.c index e2c680e..5ef4480 100644 --- a/lustre/mdc/mdc_dev.c +++ b/lustre/mdc/mdc_dev.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2017 Intel Corporation. + * Copyright (c) 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -34,14 +34,20 @@ #include #include +#include +#include #include "mdc_internal.h" static void mdc_lock_build_policy(const struct lu_env *env, + const struct cl_lock *lock, union ldlm_policy_data *policy) { memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = MDS_INODELOCK_DOM; + if (lock) { + policy->l_inodebits.li_gid = lock->cll_descr.cld_gid; + } } int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) @@ -67,21 +73,17 @@ static void mdc_lock_lvb_update(const struct lu_env *env, struct ldlm_lock *dlmlock, struct ost_lvb *lvb); -static int mdc_set_dom_lock_data(const struct lu_env *env, - struct ldlm_lock *lock, void *data) +static int mdc_set_dom_lock_data(struct ldlm_lock *lock, void *data) { - struct osc_object *obj = data; int set = 0; LASSERT(lock != NULL); LASSERT(lock->l_glimpse_ast == mdc_ldlm_glimpse_ast); lock_res_and_lock(lock); - if (lock->l_ast_data == NULL) { - lock->l_ast_data = data; - mdc_lock_lvb_update(env, obj, lock, NULL); - } + if (lock->l_ast_data == NULL) + lock->l_ast_data = data; if (lock->l_ast_data == data) set = 1; @@ -91,10 +93,11 @@ static int mdc_set_dom_lock_data(const struct lu_env *env, } int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, - struct ldlm_res_id *res_id, - enum ldlm_type type, union ldlm_policy_data *policy, - enum ldlm_mode mode, __u64 *flags, void *data, - struct lustre_handle *lockh, int unref) + struct ldlm_res_id *res_id, enum ldlm_type type, + union ldlm_policy_data *policy, enum ldlm_mode mode, + __u64 *flags, struct osc_object *obj, + struct lustre_handle *lockh, + enum ldlm_match_flags match_flags) { struct obd_device *obd = exp->exp_obd; __u64 lflags = *flags; @@ -102,16 +105,24 @@ int mdc_dom_lock_match(const struct lu_env *env, struct obd_export *exp, ENTRY; - rc = ldlm_lock_match(obd->obd_namespace, lflags, - res_id, type, policy, mode, lockh, unref); + rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0, + res_id, type, policy, mode, lockh, match_flags); if (rc == 0 || lflags & LDLM_FL_TEST_LOCK) RETURN(rc); - if (data != NULL) { + if (obj != NULL) { struct ldlm_lock *lock = ldlm_handle2lock(lockh); LASSERT(lock != NULL); - if (!mdc_set_dom_lock_data(env, lock, data)) { + if (mdc_set_dom_lock_data(lock, obj)) { + lock_res_and_lock(lock); + if (!ldlm_is_lvb_cached(lock)) { + LASSERT(lock->l_ast_data == obj); + mdc_lock_lvb_update(env, obj, lock, NULL); + ldlm_set_lvb_cached(lock); + } + unlock_res_and_lock(lock); + } else { ldlm_lock_decref(lockh, rc); rc = 0; } @@ -135,24 +146,32 @@ struct ldlm_lock *mdc_dlmlock_at_pgoff(const struct lu_env *env, struct ldlm_lock *lock = NULL; enum ldlm_mode mode; __u64 flags; + enum ldlm_match_flags match_flags = 0; ENTRY; fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, NULL, policy); + policy->l_inodebits.li_gid = LDLM_GID_ANY; flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; if (dap_flags & OSC_DAP_FL_TEST_LOCK) flags |= LDLM_FL_TEST_LOCK; + if (dap_flags & OSC_DAP_FL_AST) + match_flags |= LDLM_MATCH_AST; + + if (dap_flags & OSC_DAP_FL_CANCELING) + match_flags |= LDLM_MATCH_UNREF; + again: /* Next, search for already existing extent locks that will cover us */ /* If we're trying to read, we also search for an existing PW lock. The * VFS and page cache already protect us locally, so lots of readers/ * writers can share a single PW lock. */ mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS, - policy, LCK_PR | LCK_PW, &flags, obj, &lockh, - dap_flags & OSC_DAP_FL_CANCELING); + policy, LCK_PR | LCK_PW | LCK_GROUP, &flags, + obj, &lockh, match_flags); if (mode != 0) { lock = ldlm_handle2lock(&lockh); /* RACE: the lock is cancelled so let's try again */ @@ -166,35 +185,40 @@ again: /** * Check if page @page is covered by an extra lock or discard it. */ -static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, - struct osc_page *ops, void *cbdata) +static bool mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io, + void **pvec, int count, void *cbdata) { struct osc_thread_info *info = osc_env_info(env); struct osc_object *osc = cbdata; pgoff_t index; - - index = osc_index(ops); - if (index >= info->oti_fn_index) { - struct ldlm_lock *tmp; - struct cl_page *page = ops->ops_cl.cpl_page; - - /* refresh non-overlapped index */ - tmp = mdc_dlmlock_at_pgoff(env, osc, index, - OSC_DAP_FL_TEST_LOCK); - if (tmp != NULL) { - info->oti_fn_index = CL_PAGE_EOF; - LDLM_LOCK_PUT(tmp); - } else if (cl_page_own(env, io, page) == 0) { - /* discard the page */ - cl_page_discard(env, io, page); - cl_page_disown(env, io, page); - } else { - LASSERT(page->cp_state == CPS_FREEING); + int i; + + for (i = 0; i < count; i++) { + struct osc_page *ops = pvec[i]; + + index = osc_index(ops); + if (index >= info->oti_fn_index) { + struct ldlm_lock *tmp; + struct cl_page *page = ops->ops_cl.cpl_page; + + /* refresh non-overlapped index */ + tmp = mdc_dlmlock_at_pgoff(env, osc, index, + OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_AST); + if (tmp != NULL) { + info->oti_fn_index = CL_PAGE_EOF; + LDLM_LOCK_PUT(tmp); + } else if (cl_page_own(env, io, page) == 0) { + /* discard the page */ + cl_page_discard(env, io, page); + cl_page_disown(env, io, page); + } else { + LASSERT(page->cp_state == CPS_FREEING); + } } - } - info->oti_next_index = index + 1; - return CLP_GANG_OKAY; + info->oti_next_index = index + 1; + } + return true; } /** @@ -213,7 +237,6 @@ static int mdc_lock_discard_pages(const struct lu_env *env, struct osc_thread_info *info = osc_env_info(env); struct cl_io *io = &info->oti_io; osc_page_gang_cbt cb; - int res; int result; ENTRY; @@ -226,15 +249,9 @@ static int mdc_lock_discard_pages(const struct lu_env *env, cb = discard ? osc_discard_cb : mdc_check_and_discard_cb; info->oti_fn_index = info->oti_next_index = start; - do { - res = osc_page_gang_lookup(env, io, osc, info->oti_next_index, - end, cb, (void *)osc); - if (info->oti_next_index > end) - break; - if (res == CLP_GANG_RESCHED) - cond_resched(); - } while (res != CLP_GANG_OKAY); + osc_page_gang_lookup(env, io, osc, info->oti_next_index, + end, cb, (void *)osc); out: cl_io_fini(env, io); RETURN(result); @@ -259,7 +276,9 @@ static int mdc_lock_flush(const struct lu_env *env, struct osc_object *obj, result = 0; } - rc = mdc_lock_discard_pages(env, obj, start, end, discard); + /* Avoid lock matching with CLM_WRITE, there can be no other locks */ + rc = mdc_lock_discard_pages(env, obj, start, end, + mode == CLM_WRITE || discard); if (result == 0 && rc < 0) result = rc; @@ -288,9 +307,8 @@ void mdc_lock_lockless_cancel(const struct lu_env *env, * Helper for osc_dlm_blocking_ast() handling discrepancies between cl_lock * and ldlm_lock caches. */ -static int mdc_dlm_blocking_ast0(const struct lu_env *env, - struct ldlm_lock *dlmlock, - void *data, int flag) +static int mdc_dlm_canceling(const struct lu_env *env, + struct ldlm_lock *dlmlock) { struct cl_object *obj = NULL; int result = 0; @@ -299,11 +317,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, ENTRY; - LASSERT(flag == LDLM_CB_CANCELING); - LASSERT(dlmlock != NULL); - lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode != dlmlock->l_req_mode) { + if (!ldlm_is_granted(dlmlock)) { dlmlock->l_ast_data = NULL; unlock_res_and_lock(dlmlock); RETURN(0); @@ -315,10 +330,8 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, if (dlmlock->l_ast_data != NULL) { obj = osc2cl(dlmlock->l_ast_data); - dlmlock->l_ast_data = NULL; cl_object_get(obj); } - ldlm_set_kms_ignore(dlmlock); unlock_res_and_lock(dlmlock); /* if l_ast_data is NULL, the dlmlock was enqueued by AGL or @@ -334,6 +347,7 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, */ /* losing a lock, update kms */ lock_res_and_lock(dlmlock); + dlmlock->l_ast_data = NULL; cl_object_attr_lock(obj); attr->cat_kms = 0; cl_object_attr_update(env, obj, attr, CAT_KMS); @@ -345,13 +359,13 @@ static int mdc_dlm_blocking_ast0(const struct lu_env *env, } int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, - struct ldlm_lock_desc *new, void *data, int flag) + struct ldlm_lock_desc *new, void *data, int reason) { int rc = 0; ENTRY; - switch (flag) { + switch (reason) { case LDLM_CB_BLOCKING: { struct lustre_handle lockh; @@ -382,7 +396,7 @@ int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock, break; } - rc = mdc_dlm_blocking_ast0(env, dlmlock, data, flag); + rc = mdc_dlm_canceling(env, dlmlock); cl_env_put(env, &refcheck); break; } @@ -408,6 +422,7 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, struct cl_attr *attr = &osc_env_info(env)->oti_attr; unsigned valid = CAT_BLOCKS | CAT_ATIME | CAT_CTIME | CAT_MTIME | CAT_SIZE; + unsigned int setkms = 0; ENTRY; @@ -425,24 +440,32 @@ void mdc_lock_lvb_update(const struct lu_env *env, struct osc_object *osc, size = lvb->lvb_size; if (size >= oinfo->loi_kms) { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " kms=%llu", lvb->lvb_size, size); valid |= CAT_KMS; attr->cat_kms = size; - } else { - LDLM_DEBUG(dlmlock, "lock acquired, setting rss=%llu," - " leaving kms=%llu", - lvb->lvb_size, oinfo->loi_kms); + setkms = 1; } + ldlm_lock_allow_match_locked(dlmlock); } + + /* The size should not be less than the kms */ + if (attr->cat_size < oinfo->loi_kms) + attr->cat_size = oinfo->loi_kms; + + LDLM_DEBUG(dlmlock, "acquired size %llu, setting rss=%llu;%s " + "kms=%llu, end=%llu", lvb->lvb_size, attr->cat_size, + setkms ? "" : " leaving", + setkms ? attr->cat_kms : oinfo->loi_kms, + dlmlock ? dlmlock->l_policy_data.l_extent.end : -1ull); + cl_object_attr_update(env, obj, attr, valid); cl_object_attr_unlock(obj); EXIT; } static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, - struct lustre_handle *lockh, bool lvb_update) + struct lustre_handle *lockh) { + struct osc_object *osc = cl2osc(oscl->ols_cl.cls_obj); struct ldlm_lock *dlmlock; ENTRY; @@ -471,7 +494,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, /* Lock must have been granted. */ lock_res_and_lock(dlmlock); - if (dlmlock->l_granted_mode == dlmlock->l_req_mode) { + if (ldlm_is_granted(dlmlock)) { struct cl_lock_descr *descr = &oscl->ols_cl.cls_lock->cll_descr; /* extend the lock extent, otherwise it will have problem when @@ -481,10 +504,11 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, descr->cld_end = CL_PAGE_EOF; /* no lvb update for matched lock */ - if (lvb_update) { + if (!ldlm_is_lvb_cached(dlmlock)) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); - mdc_lock_lvb_update(env, cl2osc(oscl->ols_cl.cls_obj), - dlmlock, NULL); + LASSERT(osc == dlmlock->l_ast_data); + mdc_lock_lvb_update(env, osc, dlmlock, NULL); + ldlm_set_lvb_cached(dlmlock); } } unlock_res_and_lock(dlmlock); @@ -496,7 +520,7 @@ static void mdc_lock_granted(const struct lu_env *env, struct osc_lock *oscl, /** * Lock upcall function that is executed either when a reply to ENQUEUE rpc is - * received from a server, or after osc_enqueue_base() matched a local DLM + * received from a server, or after mdc_enqueue_send() matched a local DLM * lock. */ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, @@ -525,21 +549,10 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, CDEBUG(D_INODE, "rc %d, err %d\n", rc, errcode); if (rc == 0) - mdc_lock_granted(env, oscl, lockh, errcode == ELDLM_OK); + mdc_lock_granted(env, oscl, lockh); /* Error handling, some errors are tolerable. */ - if (oscl->ols_locklessable && rc == -EUSERS) { - /* This is a tolerable error, turn this lock into - * lockless lock. - */ - osc_object_set_contended(cl2osc(slice->cls_obj)); - LASSERT(slice->cls_ops != oscl->ols_lockless_ops); - - /* Change this lock to ldlmlock-less lock. */ - osc_lock_to_lockless(env, oscl, 1); - oscl->ols_state = OLS_GRANTED; - rc = 0; - } else if (oscl->ols_glimpse && rc == -ENAVAIL) { + if (oscl->ols_glimpse && rc == -ENAVAIL) { LASSERT(oscl->ols_flags & LDLM_FL_LVB_READY); mdc_lock_lvb_update(env, cl2osc(slice->cls_obj), NULL, &oscl->ols_lvb); @@ -554,53 +567,66 @@ static int mdc_lock_upcall(void *cookie, struct lustre_handle *lockh, RETURN(rc); } -int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb) +/* This is needed only for old servers (before 2.14) support */ +int mdc_fill_lvb(struct req_capsule *pill, struct ost_lvb *lvb) { struct mdt_body *body; - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + /* get LVB data from mdt_body otherwise */ + body = req_capsule_server_get(pill, &RMF_MDT_BODY); if (!body) RETURN(-EPROTO); - lvb->lvb_mtime = body->mbo_mtime; - lvb->lvb_atime = body->mbo_atime; - lvb->lvb_ctime = body->mbo_ctime; - lvb->lvb_blocks = body->mbo_dom_blocks; - lvb->lvb_size = body->mbo_dom_size; + if (!(body->mbo_valid & OBD_MD_DOM_SIZE)) + RETURN(-EPROTO); + mdc_body2lvb(body, lvb); RETURN(0); } -int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, - void *cookie, struct lustre_handle *lockh, - enum ldlm_mode mode, __u64 *flags, int errcode) +int mdc_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, + osc_enqueue_upcall_f upcall, void *cookie, + struct lustre_handle *lockh, enum ldlm_mode mode, + __u64 *flags, int errcode) { struct osc_lock *ols = cookie; - struct ldlm_lock *lock; + bool glimpse = *flags & LDLM_FL_HAS_INTENT; int rc = 0; ENTRY; - /* The request was created before ldlm_cli_enqueue call. */ - if (errcode == ELDLM_LOCK_ABORTED) { + /* needed only for glimpse from an old server (< 2.14) */ + if (glimpse && !exp_connect_dom_lvb(exp)) + rc = mdc_fill_lvb(&req->rq_pill, &ols->ols_lvb); + + if (glimpse && errcode == ELDLM_LOCK_ABORTED) { struct ldlm_reply *rep; rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - LASSERT(rep != NULL); - - rep->lock_policy_res2 = - ptlrpc_status_ntoh(rep->lock_policy_res2); - if (rep->lock_policy_res2) - errcode = rep->lock_policy_res2; - - rc = mdc_fill_lvb(req, &ols->ols_lvb); + if (likely(rep)) { + rep->lock_policy_res2 = + ptlrpc_status_ntoh(rep->lock_policy_res2); + if (rep->lock_policy_res2) + errcode = rep->lock_policy_res2; + } else { + rc = -EPROTO; + } *flags |= LDLM_FL_LVB_READY; } else if (errcode == ELDLM_OK) { + struct ldlm_lock *lock; + /* Callers have references, should be valid always */ lock = ldlm_handle2lock(lockh); - LASSERT(lock); - rc = mdc_fill_lvb(req, &lock->l_ost_lvb); + /* At this point ols_lvb must be filled with correct LVB either + * by mdc_fill_lvb() above or by ldlm_cli_enqueue_fini(). + * DoM uses l_ost_lvb to store LVB data, so copy it here from + * just updated ols_lvb. + */ + lock_res_and_lock(lock); + memcpy(&lock->l_ost_lvb, &ols->ols_lvb, + sizeof(lock->l_ost_lvb)); + unlock_res_and_lock(lock); LDLM_LOCK_PUT(lock); *flags |= LDLM_FL_LVB_READY; } @@ -618,11 +644,16 @@ int mdc_enqueue_fini(struct ptlrpc_request *req, osc_enqueue_upcall_f upcall, } int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, - struct osc_enqueue_args *aa, int rc) + void *args, int rc) { + struct osc_enqueue_args *aa = args; struct ldlm_lock *lock; struct lustre_handle *lockh = &aa->oa_lockh; enum ldlm_mode mode = aa->oa_mode; + struct ldlm_enqueue_info einfo = { + .ei_type = aa->oa_type, + .ei_mode = mode, + }; ENTRY; @@ -638,7 +669,8 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, /* Take an additional reference so that a blocking AST that * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed * to arrive after an upcall has been executed by - * osc_enqueue_fini(). */ + * mdc_enqueue_fini(). + */ ldlm_lock_addref(lockh, mode); /* Let cl_lock_state_wait fail with -ERESTARTSYS to unuse sublocks. */ @@ -648,12 +680,12 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); /* Complete obtaining the lock procedure. */ - rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_type, 1, - aa->oa_mode, aa->oa_flags, NULL, 0, - lockh, rc); + rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, &einfo, 1, aa->oa_flags, + aa->oa_lvb, aa->oa_lvb ? + sizeof(*aa->oa_lvb) : 0, lockh, rc); /* Complete mdc stuff. */ - rc = mdc_enqueue_fini(req, aa->oa_upcall, aa->oa_cookie, lockh, mode, - aa->oa_flags, rc); + rc = mdc_enqueue_fini(aa->oa_exp, req, aa->oa_upcall, aa->oa_cookie, + lockh, mode, aa->oa_flags, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -671,8 +703,7 @@ int mdc_enqueue_interpret(const struct lu_env *env, struct ptlrpc_request *req, * release locks just after they are obtained. */ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, struct ldlm_res_id *res_id, __u64 *flags, - union ldlm_policy_data *policy, - struct ost_lvb *lvb, int kms_valid, + union ldlm_policy_data *policy, struct ost_lvb *lvb, osc_enqueue_upcall_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, int async) { @@ -683,7 +714,10 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, enum ldlm_mode mode; bool glimpse = *flags & LDLM_FL_HAS_INTENT; __u64 match_flags = *flags; - int rc; + LIST_HEAD(cancels); + int rc, count; + int lvb_size; + bool compat_glimpse = glimpse && !exp_connect_dom_lvb(exp); ENTRY; @@ -691,10 +725,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - if (!glimpse) + match_flags |= LDLM_FL_LVB_READY; + if (glimpse) match_flags |= LDLM_FL_BLOCK_GRANTED; mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id, - einfo->ei_type, policy, mode, &lockh, 0); + einfo->ei_type, policy, mode, &lockh); if (mode) { struct ldlm_lock *matched; @@ -702,10 +737,11 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, RETURN(ELDLM_OK); matched = ldlm_handle2lock(&lockh); - if (ldlm_is_kms_ignore(matched)) - goto no_match; - if (mdc_set_dom_lock_data(env, matched, einfo->ei_cbdata)) { + if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GLIMPSE_DDOS)) + ldlm_set_kms_ignore(matched); + + if (mdc_set_dom_lock_data(matched, einfo->ei_cbdata)) { *flags |= LDLM_FL_LVB_READY; /* We already have a lock, and it's referenced. */ @@ -715,7 +751,6 @@ int mdc_enqueue_send(const struct lu_env *env, struct obd_export *exp, LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); } -no_match: ldlm_lock_decref(&lockh, mode); LDLM_LOCK_PUT(matched); } @@ -723,36 +758,53 @@ no_match: if (*flags & (LDLM_FL_TEST_LOCK | LDLM_FL_MATCH_LOCK)) RETURN(-ENOLCK); - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_INTENT); + /* Glimpse is intent on old server */ + req = ptlrpc_request_alloc(class_exp2cliimp(exp), compat_glimpse ? + &RQF_LDLM_INTENT : &RQF_LDLM_ENQUEUE); if (req == NULL) RETURN(-ENOMEM); - rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + /* For WRITE lock cancel other locks on resource early if any */ + if (einfo->ei_mode & LCK_PW) + count = mdc_resource_get_unused_res(exp, res_id, &cancels, + einfo->ei_mode, + MDS_INODELOCK_DOM); + else + count = 0; + + rc = ldlm_prep_enqueue_req(exp, req, &cancels, count); if (rc < 0) { ptlrpc_request_free(req); RETURN(rc); } - /* pack the intent */ - lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); - lit->opc = glimpse ? IT_GLIMPSE : IT_BRW; - - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); - req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); - ptlrpc_request_set_replen(req); + if (compat_glimpse) { + /* pack the glimpse intent */ + lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); + lit->opc = IT_GLIMPSE; + } /* users of mdc_enqueue() can pass this flag for ldlm_lock_match() */ *flags &= ~LDLM_FL_BLOCK_GRANTED; - /* All MDC IO locks are intents */ - *flags |= LDLM_FL_HAS_INTENT; - rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, NULL, - 0, LVB_T_NONE, &lockh, async); + + if (compat_glimpse) { + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); + lvb_size = 0; + } else { + lvb_size = sizeof(*lvb); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + lvb_size); + } + ptlrpc_request_set_replen(req); + + rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, + lvb_size, LVB_T_OST, &lockh, async); if (async) { if (!rc) { struct osc_enqueue_args *aa; - CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = ptlrpc_req_async_args(req); + aa = ptlrpc_req_async_args(aa, req); aa->oa_exp = exp; aa->oa_mode = einfo->ei_mode; aa->oa_type = einfo->ei_type; @@ -761,10 +813,9 @@ no_match: aa->oa_cookie = cookie; aa->oa_speculative = false; aa->oa_flags = flags; - aa->oa_lvb = lvb; + aa->oa_lvb = compat_glimpse ? NULL : lvb; - req->rq_interpret_reply = - (ptlrpc_interpterer_t)mdc_enqueue_interpret; + req->rq_interpret_reply = mdc_enqueue_interpret; ptlrpcd_add_req(req); } else { ptlrpc_req_finished(req); @@ -772,7 +823,7 @@ no_match: RETURN(rc); } - rc = mdc_enqueue_fini(req, upcall, cookie, &lockh, einfo->ei_mode, + rc = mdc_enqueue_fini(exp, req, upcall, cookie, &lockh, einfo->ei_mode, flags, rc); ptlrpc_req_finished(req); RETURN(rc); @@ -856,11 +907,10 @@ enqueue_base: * osc_lock. */ fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); - mdc_lock_build_policy(env, policy); + mdc_lock_build_policy(env, lock, policy); LASSERT(!oscl->ols_speculative); result = mdc_enqueue_send(env, osc_export(osc), resname, - &oscl->ols_flags, policy, - &oscl->ols_lvb, osc->oo_oinfo->loi_kms_valid, + &oscl->ols_flags, policy, &oscl->ols_lvb, upcall, cookie, &oscl->ols_einfo, async); if (result == 0) { if (osc_lock_is_lockless(oscl)) { @@ -922,6 +972,8 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj, ols->ols_flags = flags; ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE); + if (lock->cll_descr.cld_mode == CLM_GROUP) + ols->ols_flags |= LDLM_FL_ATOMIC_CB; if (ols->ols_flags & LDLM_FL_HAS_INTENT) { ols->ols_flags |= LDLM_FL_BLOCK_GRANTED; @@ -933,8 +985,6 @@ int mdc_lock_init(const struct lu_env *env, struct cl_object *obj, if (!(enqflags & CEF_MUST)) osc_lock_to_lockless(env, ols, (enqflags & CEF_NEVER)); - if (ols->ols_locklessable && !(enqflags & CEF_DISCARD_DATA)) - ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; if (io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io)) osc_lock_set_writer(env, io, obj, ols); @@ -959,6 +1009,33 @@ static int mdc_async_upcall(void *a, int rc) return 0; } +static int mdc_get_lock_handle(const struct lu_env *env, struct osc_object *osc, + pgoff_t index, struct lustre_handle *lh) +{ + struct ldlm_lock *lock; + + /* find DOM lock protecting object */ + lock = mdc_dlmlock_at_pgoff(env, osc, index, + OSC_DAP_FL_TEST_LOCK | + OSC_DAP_FL_CANCELING); + if (lock == NULL) { + struct ldlm_resource *res; + struct ldlm_res_id *resname; + + resname = &osc_env_info(env)->oti_resname; + fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname); + res = ldlm_resource_get(osc_export(osc)->exp_obd->obd_namespace, + NULL, resname, LDLM_IBITS, 0); + ldlm_resource_dump(D_ERROR, res); + libcfs_debug_dumpstack(NULL); + return -ENOENT; + } else { + *lh = lock->l_remote_handle; + LDLM_LOCK_PUT(lock); + } + return 0; +} + static int mdc_io_setattr_start(const struct lu_env *env, const struct cl_io_slice *slice) { @@ -970,7 +1047,8 @@ static int mdc_io_setattr_start(const struct lu_env *env, struct obdo *oa = &oio->oi_oa; struct osc_async_cbargs *cbargs = &oio->oi_cbarg; __u64 size = io->u.ci_setattr.sa_attr.lvb_size; - unsigned int ia_valid = io->u.ci_setattr.sa_valid; + unsigned int ia_avalid = io->u.ci_setattr.sa_avalid; + enum op_xvalid ia_xvalid = io->u.ci_setattr.sa_xvalid; int rc; /* silently ignore non-truncate setattr for Data-on-MDT object */ @@ -980,6 +1058,11 @@ static int mdc_io_setattr_start(const struct lu_env *env, &oio->oi_trunc); if (rc < 0) return rc; + } else if (cl_io_is_fallocate(io) && + io->u.ci_setattr.sa_falloc_mode & FALLOC_FL_PUNCH_HOLE) { + rc = osc_punch_start(env, io, obj); + if (rc < 0) + return rc; } if (oio->oi_lockless == 0) { @@ -989,19 +1072,20 @@ static int mdc_io_setattr_start(const struct lu_env *env, struct ost_lvb *lvb = &io->u.ci_setattr.sa_attr; unsigned int cl_valid = 0; - if (ia_valid & ATTR_SIZE) { - attr->cat_size = attr->cat_kms = size; + if (ia_avalid & ATTR_SIZE) { + attr->cat_size = size; + attr->cat_kms = size; cl_valid = (CAT_SIZE | CAT_KMS); } - if (ia_valid & ATTR_MTIME_SET) { + if (ia_avalid & ATTR_MTIME_SET) { attr->cat_mtime = lvb->lvb_mtime; cl_valid |= CAT_MTIME; } - if (ia_valid & ATTR_ATIME_SET) { + if (ia_avalid & ATTR_ATIME_SET) { attr->cat_atime = lvb->lvb_atime; cl_valid |= CAT_ATIME; } - if (ia_valid & ATTR_CTIME_SET) { + if (ia_xvalid & OP_XVALID_CTIME_SET) { attr->cat_ctime = lvb->lvb_ctime; cl_valid |= CAT_CTIME; } @@ -1012,7 +1096,7 @@ static int mdc_io_setattr_start(const struct lu_env *env, return rc; } - if (!(ia_valid & ATTR_SIZE)) + if (!(ia_avalid & ATTR_SIZE) && !cl_io_is_fallocate(io)) return 0; memset(oa, 0, sizeof(*oa)); @@ -1020,21 +1104,34 @@ static int mdc_io_setattr_start(const struct lu_env *env, oa->o_mtime = attr->cat_mtime; oa->o_atime = attr->cat_atime; oa->o_ctime = attr->cat_ctime; - - oa->o_size = size; - oa->o_blocks = OBD_OBJECT_EOF; oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLATIME | OBD_MD_FLCTIME | OBD_MD_FLMTIME | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; + if (oio->oi_lockless) { oa->o_flags = OBD_FL_SRVLOCK; oa->o_valid |= OBD_MD_FLFLAGS; + } else { + rc = mdc_get_lock_handle(env, cl2osc(obj), CL_PAGE_EOF, + &oa->o_handle); + if (!rc) + oa->o_valid |= OBD_MD_FLHANDLE; } init_completion(&cbargs->opc_sync); + if (cl_io_is_fallocate(io)) { + int falloc_mode = io->u.ci_setattr.sa_falloc_mode; - rc = osc_punch_send(osc_export(cl2osc(obj)), oa, - mdc_async_upcall, cbargs); + oa->o_size = io->u.ci_setattr.sa_falloc_offset; + oa->o_blocks = io->u.ci_setattr.sa_falloc_end; + rc = osc_fallocate_base(osc_export(cl2osc(obj)), oa, + mdc_async_upcall, cbargs, falloc_mode); + } else { + oa->o_size = size; + oa->o_blocks = OBD_OBJECT_EOF; + rc = osc_punch_send(osc_export(cl2osc(obj)), oa, + mdc_async_upcall, cbargs); + } cbargs->opc_rpc_sent = rc == 0; return rc; } @@ -1044,6 +1141,7 @@ static int mdc_io_read_ahead(const struct lu_env *env, pgoff_t start, struct cl_read_ahead *ra) { struct osc_object *osc = cl2osc(ios->cis_obj); + struct osc_io *oio = cl2osc_io(env, ios); struct ldlm_lock *dlmlock; ENTRY; @@ -1052,6 +1150,7 @@ static int mdc_io_read_ahead(const struct lu_env *env, if (dlmlock == NULL) RETURN(-ENODATA); + oio->oi_is_readahead = 1; if (dlmlock->l_req_mode != LCK_PR) { struct lustre_handle lockh; @@ -1060,10 +1159,11 @@ static int mdc_io_read_ahead(const struct lu_env *env, ldlm_lock_decref(&lockh, dlmlock->l_req_mode); } - ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc; - ra->cra_end = CL_PAGE_EOF; + ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc; + ra->cra_end_idx = CL_PAGE_EOF; ra->cra_release = osc_read_ahead_release; - ra->cra_cbdata = dlmlock; + ra->cra_dlmlock = dlmlock; + ra->cra_oio = oio; RETURN(0); } @@ -1106,16 +1206,133 @@ int mdc_io_fsync_start(const struct lu_env *env, RETURN(result); } -static struct cl_io_operations mdc_io_ops = { +struct mdc_data_version_args { + struct osc_io *dva_oio; +}; + +static int +mdc_data_version_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *args, int rc) +{ + struct mdc_data_version_args *dva = args; + struct osc_io *oio = dva->dva_oio; + const struct mdt_body *body; + + ENTRY; + if (rc < 0) + GOTO(out, rc); + + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + + /* Prepare OBDO from mdt_body for CLIO */ + oio->oi_oa.o_valid = body->mbo_valid; + oio->oi_oa.o_flags = body->mbo_flags; + oio->oi_oa.o_data_version = body->mbo_version; + oio->oi_oa.o_layout_version = body->mbo_layout_gen; + EXIT; +out: + oio->oi_cbarg.opc_rc = rc; + complete(&oio->oi_cbarg.opc_sync); + return 0; +} + +static int mdc_io_data_version_start(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version; + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + struct osc_object *obj = cl2osc(slice->cis_obj); + struct obd_export *exp = osc_export(obj); + struct ptlrpc_request *req; + struct mdt_body *body; + struct mdc_data_version_args *dva; + int rc; + + ENTRY; + + memset(&oio->oi_oa, 0, sizeof(oio->oi_oa)); + oio->oi_oa.o_oi.oi_fid = *lu_object_fid(osc2lu(obj)); + oio->oi_oa.o_valid = OBD_MD_FLID; + + init_completion(&cbargs->opc_sync); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); + if (req == NULL) + RETURN(-ENOMEM); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR); + if (rc < 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + body = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY); + body->mbo_fid1 = *lu_object_fid(osc2lu(obj)); + body->mbo_valid = OBD_MD_FLID; + /* Indicate that data version is needed */ + body->mbo_valid |= OBD_MD_FLDATAVERSION; + body->mbo_flags = 0; + + if (dv->dv_flags & (LL_DV_RD_FLUSH | LL_DV_WR_FLUSH)) { + body->mbo_valid |= OBD_MD_FLFLAGS; + body->mbo_flags |= OBD_FL_SRVLOCK; + if (dv->dv_flags & LL_DV_WR_FLUSH) + body->mbo_flags |= OBD_FL_FLUSH; + } + + req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, 0); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, 0); + ptlrpc_request_set_replen(req); + + req->rq_interpret_reply = mdc_data_version_interpret; + dva = ptlrpc_req_async_args(dva, req); + dva->dva_oio = oio; + + ptlrpcd_add_req(req); + + RETURN(0); +} + +static void mdc_io_data_version_end(const struct lu_env *env, + const struct cl_io_slice *slice) +{ + struct cl_data_version_io *dv = &slice->cis_io->u.ci_data_version; + struct osc_io *oio = cl2osc_io(env, slice); + struct osc_async_cbargs *cbargs = &oio->oi_cbarg; + + ENTRY; + wait_for_completion(&cbargs->opc_sync); + + if (cbargs->opc_rc != 0) { + slice->cis_io->ci_result = cbargs->opc_rc; + } else { + slice->cis_io->ci_result = 0; + if (!(oio->oi_oa.o_valid & + (OBD_MD_LAYOUT_VERSION | OBD_MD_FLDATAVERSION))) + slice->cis_io->ci_result = -ENOTSUPP; + + if (oio->oi_oa.o_valid & OBD_MD_LAYOUT_VERSION) + dv->dv_layout_version = oio->oi_oa.o_layout_version; + if (oio->oi_oa.o_valid & OBD_MD_FLDATAVERSION) + dv->dv_data_version = oio->oi_oa.o_data_version; + } + + EXIT; +} + +static const struct cl_io_operations mdc_io_ops = { .op = { [CIT_READ] = { .cio_iter_init = osc_io_iter_init, - .cio_iter_fini = osc_io_iter_fini, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_read_start, }, [CIT_WRITE] = { - .cio_iter_init = osc_io_write_iter_init, - .cio_iter_fini = osc_io_write_iter_fini, + .cio_iter_init = osc_io_iter_init, + .cio_iter_fini = osc_io_rw_iter_fini, .cio_start = osc_io_write_start, .cio_end = osc_io_end, }, @@ -1125,10 +1342,9 @@ static struct cl_io_operations mdc_io_ops = { .cio_start = mdc_io_setattr_start, .cio_end = osc_io_setattr_end, }, - /* no support for data version so far */ [CIT_DATA_VERSION] = { - .cio_start = NULL, - .cio_end = NULL, + .cio_start = mdc_io_data_version_start, + .cio_end = mdc_io_data_version_end, }, [CIT_FAULT] = { .cio_iter_init = osc_io_iter_init, @@ -1140,10 +1356,16 @@ static struct cl_io_operations mdc_io_ops = { .cio_start = mdc_io_fsync_start, .cio_end = osc_io_fsync_end, }, + [CIT_LSEEK] = { + .cio_start = osc_io_lseek_start, + .cio_end = osc_io_lseek_end, + }, }, .cio_read_ahead = mdc_io_read_ahead, + .cio_lru_reserve = osc_io_lru_reserve, .cio_submit = osc_io_submit, .cio_commit_async = osc_io_commit_async, + .cio_extent_release = osc_io_extent_release, }; int mdc_io_init(const struct lu_env *env, struct cl_object *obj, @@ -1182,35 +1404,22 @@ static void mdc_req_attr_set(const struct lu_env *env, struct cl_object *obj, attr->cra_oa->o_valid |= OBD_MD_FLID; if (flags & OBD_MD_FLHANDLE) { - struct ldlm_lock *lock; /* _some_ lock protecting @apage */ struct osc_page *opg; opg = osc_cl_page_osc(attr->cra_page, cl2osc(obj)); - lock = mdc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg), - OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_CANCELING); - if (lock == NULL && !opg->ops_srvlock) { - struct ldlm_resource *res; - struct ldlm_res_id *resname; - - CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page, - "uncovered page!\n"); - - resname = &osc_env_info(env)->oti_resname; - mdc_build_res_name(cl2osc(obj), resname); - res = ldlm_resource_get( - osc_export(cl2osc(obj))->exp_obd->obd_namespace, - NULL, resname, LDLM_IBITS, 0); - ldlm_resource_dump(D_ERROR, res); - - libcfs_debug_dumpstack(NULL); - LBUG(); - } - - /* check for lockless io. */ - if (lock != NULL) { - attr->cra_oa->o_handle = lock->l_remote_handle; - attr->cra_oa->o_valid |= OBD_MD_FLHANDLE; - LDLM_LOCK_PUT(lock); + if (!opg->ops_srvlock) { + int rc; + + rc = mdc_get_lock_handle(env, cl2osc(obj), + osc_index(opg), + &attr->cra_oa->o_handle); + if (rc) { + CL_PAGE_DEBUG(D_ERROR, env, attr->cra_page, + "uncovered page!\n"); + LBUG(); + } else { + attr->cra_oa->o_valid |= OBD_MD_FLHANDLE; + } } } } @@ -1228,12 +1437,34 @@ static int mdc_attr_get(const struct lu_env *env, struct cl_object *obj, static int mdc_object_ast_clear(struct ldlm_lock *lock, void *data) { + struct osc_object *osc = (struct osc_object *)data; + struct ost_lvb *lvb = &lock->l_ost_lvb; + struct lov_oinfo *oinfo; ENTRY; - if ((lock->l_ast_data == NULL && !ldlm_is_kms_ignore(lock)) || - (lock->l_ast_data == data)) { + if (lock->l_ast_data == data) { lock->l_ast_data = NULL; - ldlm_set_kms_ignore(lock); + + LASSERT(osc != NULL); + LASSERT(osc->oo_oinfo != NULL); + LASSERT(lvb != NULL); + + /* Updates lvb in lock by the cached oinfo */ + oinfo = osc->oo_oinfo; + + LDLM_DEBUG(lock, "update lock size %llu blocks %llu [cma]time: " + "%llu %llu %llu by oinfo size %llu blocks %llu " + "[cma]time %llu %llu %llu", lvb->lvb_size, + lvb->lvb_blocks, lvb->lvb_ctime, lvb->lvb_mtime, + lvb->lvb_atime, oinfo->loi_lvb.lvb_size, + oinfo->loi_lvb.lvb_blocks, oinfo->loi_lvb.lvb_ctime, + oinfo->loi_lvb.lvb_mtime, oinfo->loi_lvb.lvb_atime); + LASSERT(oinfo->loi_lvb.lvb_size >= oinfo->loi_kms); + + cl_object_attr_lock(&osc->oo_cl); + memcpy(lvb, &oinfo->loi_lvb, sizeof(oinfo->loi_lvb)); + cl_object_attr_unlock(&osc->oo_cl); + ldlm_clear_lvb_cached(lock); } RETURN(LDLM_ITER_CONTINUE); } @@ -1251,6 +1482,17 @@ int mdc_object_prune(const struct lu_env *env, struct cl_object *obj) return 0; } +static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj, + struct ldlm_lock *lock) +{ + /* if lock cancel is initiated from llite then it is combined + * lock with DOM bit and it may have no l_ast_data initialized yet, + * so init it here with given osc_object. + */ + mdc_set_dom_lock_data(lock, cl2osc(obj)); + RETURN(mdc_dlm_canceling(env, lock)); +} + static const struct cl_object_operations mdc_ops = { .coo_page_init = osc_page_init, .coo_lock_init = mdc_lock_init, @@ -1260,6 +1502,7 @@ static const struct cl_object_operations mdc_ops = { .coo_glimpse = osc_object_glimpse, .coo_req_attr_set = mdc_req_attr_set, .coo_prune = mdc_object_prune, + .coo_object_flush = mdc_object_flush }; static const struct osc_object_operations mdc_object_ops = { @@ -1315,15 +1558,17 @@ struct lu_object *mdc_object_alloc(const struct lu_env *env, return obj; } -static int mdc_cl_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) +static int mdc_process_config(const struct lu_env *env, struct lu_device *d, + struct lustre_cfg *cfg) { - return mdc_process_config(d->ld_obd, 0, cfg); + size_t count = class_modify_config(cfg, PARAM_MDC, + &d->ld_obd->obd_kset.kobj); + return count > 0 ? 0 : count; } const struct lu_device_operations mdc_lu_ops = { .ldo_object_alloc = mdc_object_alloc, - .ldo_process_config = mdc_cl_process_config, + .ldo_process_config = mdc_process_config, .ldo_recovery_complete = NULL, };