#include "mdc_internal.h"
static void mdc_lock_build_policy(const struct lu_env *env,
+ const struct cl_lock *lock,
union ldlm_policy_data *policy)
{
memset(policy, 0, sizeof *policy);
policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+ if (lock) {
+ /* Propagate the cl_lock group ID into the IBITS policy so
+ * that group-lock (CLM_GROUP) matching can honour the owner;
+ * NULL @lock keeps li_gid at 0 (non-group case). */
+ policy->l_inodebits.li_gid = lock->cll_descr.cld_gid;
+ }
}
int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
struct ldlm_res_id *res_id, enum ldlm_type type,
union ldlm_policy_data *policy, enum ldlm_mode mode,
__u64 *flags, struct osc_object *obj,
- struct lustre_handle *lockh, int unref)
+ struct lustre_handle *lockh,
+ enum ldlm_match_flags match_flags)
{
struct obd_device *obd = exp->exp_obd;
__u64 lflags = *flags;
ENTRY;
- rc = ldlm_lock_match(obd->obd_namespace, lflags,
- res_id, type, policy, mode, lockh, unref);
+ rc = ldlm_lock_match_with_skip(obd->obd_namespace, lflags, 0,
+ res_id, type, policy, mode, lockh, match_flags);
+
if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
RETURN(rc);
struct ldlm_lock *lock = NULL;
enum ldlm_mode mode;
__u64 flags;
+ enum ldlm_match_flags match_flags = 0;
ENTRY;
fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname);
- mdc_lock_build_policy(env, policy);
+ mdc_lock_build_policy(env, NULL, policy);
+ policy->l_inodebits.li_gid = LDLM_GID_ANY;
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (dap_flags & OSC_DAP_FL_TEST_LOCK)
flags |= LDLM_FL_TEST_LOCK;
+ if (dap_flags & OSC_DAP_FL_AST)
+ match_flags |= LDLM_MATCH_AST;
+
+ if (dap_flags & OSC_DAP_FL_CANCELING)
+ match_flags |= LDLM_MATCH_UNREF;
+
again:
/* Next, search for already existing extent locks that will cover us */
/* If we're trying to read, we also search for an existing PW lock. The
* writers can share a single PW lock. */
mode = mdc_dom_lock_match(env, osc_export(obj), resname, LDLM_IBITS,
policy, LCK_PR | LCK_PW | LCK_GROUP, &flags,
- obj, &lockh,
- dap_flags & OSC_DAP_FL_CANCELING);
+ obj, &lockh, match_flags);
if (mode != 0) {
lock = ldlm_handle2lock(&lockh);
/* RACE: the lock is cancelled so let's try again */
/**
 * Check if page @page is covered by an extra lock or discard it.
 */
-static int mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
- struct osc_page *ops, void *cbdata)
+static bool mdc_check_and_discard_cb(const struct lu_env *env, struct cl_io *io,
+ struct osc_page *ops, void *cbdata)
{
struct osc_thread_info *info = osc_env_info(env);
struct osc_object *osc = cbdata;
/* refresh non-overlapped index */
tmp = mdc_dlmlock_at_pgoff(env, osc, index,
- OSC_DAP_FL_TEST_LOCK);
+ /* NOTE(review): OSC_DAP_FL_AST presumably lets the match see
+ * locks with a blocking AST in flight — confirm against
+ * osc_dlmlock_at_pgoff() semantics. */
+ OSC_DAP_FL_TEST_LOCK | OSC_DAP_FL_AST);
if (tmp != NULL) {
info->oti_fn_index = CL_PAGE_EOF;
LDLM_LOCK_PUT(tmp);
}
info->oti_next_index = index + 1;
- return CLP_GANG_OKAY;
+ /* gang-lookup callbacks now return bool instead of CLP_GANG_*:
+ * true means "keep iterating over pages". */
+ return true;
}
/**
struct osc_thread_info *info = osc_env_info(env);
struct cl_io *io = &info->oti_io;
osc_page_gang_cbt cb;
- int res;
int result;
ENTRY;
cb = discard ? osc_discard_cb : mdc_check_and_discard_cb;
info->oti_fn_index = info->oti_next_index = start;
- do {
- res = osc_page_gang_lookup(env, io, osc, info->oti_next_index,
- end, cb, (void *)osc);
- if (info->oti_next_index > end)
- break;
- if (res == CLP_GANG_RESCHED)
- cond_resched();
- } while (res != CLP_GANG_OKAY);
+ osc_page_gang_lookup(env, io, osc, info->oti_next_index,
+ end, cb, (void *)osc);
out:
cl_io_fini(env, io);
RETURN(result);
if (dlmlock->l_ast_data != NULL) {
obj = osc2cl(dlmlock->l_ast_data);
- dlmlock->l_ast_data = NULL;
cl_object_get(obj);
}
unlock_res_and_lock(dlmlock);
*/
/* losing a lock, update kms */
lock_res_and_lock(dlmlock);
+ /* NOTE(review): clearing l_ast_data is deferred from the object
+ * lookup above to this second locked section, so the lock stays
+ * associated with the object until the KMS update — confirm no
+ * path relies on the earlier clearing. */
+ dlmlock->l_ast_data = NULL;
cl_object_attr_lock(obj);
attr->cat_kms = 0;
cl_object_attr_update(env, obj, attr, CAT_KMS);
* such locks should be skipped.
*/
mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
- einfo->ei_type, policy, mode, &lockh, 0);
+ einfo->ei_type, policy, mode, &lockh);
if (mode) {
struct ldlm_lock *matched;
* osc_lock.
*/
fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
- mdc_lock_build_policy(env, policy);
+ mdc_lock_build_policy(env, lock, policy);
LASSERT(!oscl->ols_speculative);
result = mdc_enqueue_send(env, osc_export(osc), resname,
&oscl->ols_flags, policy,
ols->ols_flags = flags;
ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+ if (lock->cll_descr.cld_mode == CLM_GROUP)
+ ols->ols_flags |= LDLM_FL_ATOMIC_CB;
if (ols->ols_flags & LDLM_FL_HAS_INTENT) {
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
ldlm_lock_decref(&lockh, dlmlock->l_req_mode);
}
- ra->cra_rpc_size = osc_cli(osc)->cl_max_pages_per_rpc;
- ra->cra_end = CL_PAGE_EOF;
+ ra->cra_rpc_pages = osc_cli(osc)->cl_max_pages_per_rpc;
+ ra->cra_end_idx = CL_PAGE_EOF;
ra->cra_release = osc_read_ahead_release;
ra->cra_cbdata = dlmlock;
.cio_start = mdc_io_fsync_start,
.cio_end = osc_io_fsync_end,
},
+ [CIT_LSEEK] = {
+ .cio_start = osc_io_lseek_start,
+ .cio_end = osc_io_lseek_end,
+ },
},
.cio_read_ahead = mdc_io_read_ahead,
.cio_submit = osc_io_submit,
static int mdc_object_flush(const struct lu_env *env, struct cl_object *obj,
struct ldlm_lock *lock)
{
+ /* If the lock cancel is initiated from llite, this is a combined
+ * lock with the DOM bit and it may have no l_ast_data initialized
+ * yet, so initialize it here with the given osc_object before the
+ * blocking-AST path dereferences it.
+ */
+ mdc_set_dom_lock_data(lock, cl2osc(obj));
RETURN(mdc_dlm_blocking_ast0(env, lock, LDLM_CB_CANCELING));
}