__u64 try_bits; /* optional bits to try */
__u64 cancel_bits; /* for lock convert */
};
+ __u64 li_gid;
};
struct ldlm_flock_wire {
*/
static int
ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
- struct list_head *work_list)
+ __u64 *ldlm_flags, struct list_head *work_list)
{
+ enum ldlm_mode req_mode = req->l_req_mode;
struct list_head *tmp;
struct ldlm_lock *lock;
__u64 req_bits = req->l_policy_data.l_inodebits.bits;
ENTRY;
+ lockmode_verify(req_mode);
+
/* There is no sense in lock with no bits set. Also such a lock
* would be compatible with any other bit lock.
* Meanwhile that can be true if there were just try_bits and all
continue;
}
- /* locks' mode are compatible, bits don't matter */
- if (lockmode_compat(lock->l_req_mode, req->l_req_mode)) {
- /* jump to last lock in mode group */
- tmp = mode_tail;
+ if (lockmode_compat(lock->l_req_mode, req_mode)) {
+ /* non group locks are compatible, bits don't matter */
+ if (likely(req_mode != LCK_GROUP)) {
+ /* jump to last lock in mode group */
+ tmp = mode_tail;
+ continue;
+ }
+
+ if (req->l_policy_data.l_inodebits.li_gid ==
+ lock->l_policy_data.l_inodebits.li_gid) {
+ if (ldlm_is_granted(lock))
+ RETURN(2);
+
+ if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+ RETURN(-EWOULDBLOCK);
+
+ /* Place the same group together */
+ ldlm_resource_insert_lock_after(lock, req);
+ RETURN(0);
+ }
+ }
+
+ /* GROUP locks are placed to a head of the waiting list, but
+ * grouped by gid. */
+ if (unlikely(req_mode == LCK_GROUP && !ldlm_is_granted(lock))) {
+ compat = 0;
+ if (lock->l_req_mode != LCK_GROUP) {
+ /* Already not a GROUP lock, insert before. */
+ ldlm_resource_insert_lock_before(lock, req);
+ break;
+ }
+ /* Still GROUP but a different gid(the same gid would
+ * be handled above). Keep searching for the same gid */
+ LASSERT(req->l_policy_data.l_inodebits.li_gid !=
+ lock->l_policy_data.l_inodebits.li_gid);
continue;
}
!ldlm_is_cos_incompat(req) &&
ldlm_is_cos_enabled(req) &&
lock->l_client_cookie == req->l_client_cookie)
- goto not_conflicting;
+ goto skip_work_list;
+
+ compat = 0;
+
+ if (unlikely(lock->l_req_mode == LCK_GROUP)) {
+ LASSERT(ldlm_has_dom(lock));
+
+ if (*ldlm_flags & LDLM_FL_BLOCK_NOWAIT)
+ RETURN(-EWOULDBLOCK);
+
+ goto skip_work_list;
+ }
/* Found a conflicting policy group. */
if (!work_list)
RETURN(0);
- compat = 0;
-
/* Add locks of the policy group to @work_list
* as blocking locks for @req */
if (lock->l_blocking_ast)
ldlm_add_ast_work_item(lock,
req, work_list);
}
-not_conflicting:
+skip_work_list:
if (tmp == mode_tail)
break;
* waiting queues. The lock is granted if no conflicts are found in
* either queue.
*/
-int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
+int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *ldlm_flags,
enum ldlm_process_intention intention,
enum ldlm_error *err,
struct list_head *work_list)
struct ldlm_resource *res = lock->l_resource;
struct list_head *grant_work = intention == LDLM_PROCESS_ENQUEUE ?
NULL : work_list;
- int rc;
+ int rc, rc2 = 0;
ENTRY;
*err = ELDLM_LOCK_ABORTED;
if (intention == LDLM_PROCESS_RESCAN) {
struct list_head *bl_list =
- *flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
+ *ldlm_flags & LDLM_FL_BLOCK_NOWAIT ? NULL : work_list;
LASSERT(lock->l_policy_data.l_inodebits.bits != 0);
* any blocked locks from granted queue during every reprocess
* and bl_ast will be sent if needed.
*/
+ *ldlm_flags = 0;
rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
- bl_list);
+ ldlm_flags, bl_list);
if (!rc)
RETURN(LDLM_ITER_STOP);
- rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock, NULL);
+ rc = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+ ldlm_flags, NULL);
if (!rc)
RETURN(LDLM_ITER_STOP);
lock->l_policy_data.l_inodebits.bits |=
lock->l_policy_data.l_inodebits.try_bits;
lock->l_policy_data.l_inodebits.try_bits = 0;
- *flags |= LDLM_FL_LOCK_CHANGED;
+ *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
}
ldlm_resource_unlink_lock(lock);
ldlm_grant_lock(lock, grant_work);
RETURN(LDLM_ITER_CONTINUE);
}
- rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, work_list);
- rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, work_list);
+ rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock,
+ ldlm_flags, work_list);
+ if (rc < 0)
+ GOTO(out, *err = rc);
if (rc != 2) {
+ rc2 = ldlm_inodebits_compat_queue(&res->lr_waiting, lock,
+ ldlm_flags, work_list);
+ if (rc2 < 0)
+ GOTO(out, *err = rc = rc2);
+ }
+
+ if (rc + rc2 != 2) {
/* if there were only bits to try and all are conflicting */
if ((lock->l_policy_data.l_inodebits.bits |
lock->l_policy_data.l_inodebits.try_bits)) {
- /* There is no sense to set LDLM_FL_NO_TIMEOUT to @flags
- * for DOM lock while they are enqueued through intents,
- * i.e. @lock here is local which does not timeout. */
+ /* There is no sense to set LDLM_FL_NO_TIMEOUT to
+ * @ldlm_flags for DOM lock while they are enqueued
+ * through intents, i.e. @lock here is local which does
+ * not timeout. */
*err = ELDLM_OK;
}
} else {
lock->l_policy_data.l_inodebits.bits |=
lock->l_policy_data.l_inodebits.try_bits;
lock->l_policy_data.l_inodebits.try_bits = 0;
- *flags |= LDLM_FL_LOCK_CHANGED;
+ *ldlm_flags |= LDLM_FL_LOCK_CHANGED;
}
LASSERT(lock->l_policy_data.l_inodebits.bits);
ldlm_resource_unlink_lock(lock);
}
RETURN(LDLM_ITER_CONTINUE);
+out:
+ return rc;
}
#endif /* HAVE_SERVER_SUPPORT */
union ldlm_policy_data *lpolicy)
{
lpolicy->l_inodebits.bits = wpolicy->l_inodebits.bits;
+ lpolicy->l_inodebits.li_gid = wpolicy->l_inodebits.li_gid;
/**
* try_bits are to be handled outside of generic write_to_local due
* to different behavior on a server and client.
memset(wpolicy, 0, sizeof(*wpolicy));
wpolicy->l_inodebits.bits = lpolicy->l_inodebits.bits;
wpolicy->l_inodebits.try_bits = lpolicy->l_inodebits.try_bits;
+ wpolicy->l_inodebits.li_gid = lpolicy->l_inodebits.li_gid;
}
/**
}
+/**
+ * Link @lock into the per-bit IBITS queues according to the list @head it is
+ * being added to.
+ *
+ * - @head is the resource waiting list: for every bit set in @lock the lock
+ * is linked into the matching liq_waiting[] queue, at the tail when @tail
+ * is true and at the head otherwise.
+ * - @head is the resource granted list and @lock still has an ibits node:
+ * the node is asserted to be unlinked from every per-bit queue and freed.
+ * - any other @head is treated as the l_res_link of a lock already on a
+ * list: @lock is linked right after that lock in each per-bit queue. Both
+ * locks are asserted to carry exactly the same bits and @tail is asserted
+ * to be false.
+ */
void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock, bool tail)
{
int i;
if (head == &res->lr_waiting) {
for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
- if (lock->l_policy_data.l_inodebits.bits & BIT(i))
+ if (!(lock->l_policy_data.l_inodebits.bits & BIT(i)))
+ continue;
+ if (tail)
list_add_tail(&lock->l_ibits_node->lin_link[i],
- &res->lr_ibits_queues->liq_waiting[i]);
+ &res->lr_ibits_queues->liq_waiting[i]);
+ else
+ list_add(&lock->l_ibits_node->lin_link[i],
+ &res->lr_ibits_queues->liq_waiting[i]);
}
} else if (head == &res->lr_granted && lock->l_ibits_node != NULL) {
for (i = 0; i < MDS_INODELOCK_NUMBITS; i++)
LASSERT(list_empty(&lock->l_ibits_node->lin_link[i]));
OBD_SLAB_FREE_PTR(lock->l_ibits_node, ldlm_inodebits_slab);
lock->l_ibits_node = NULL;
+ } else if (head != &res->lr_granted) {
+ /* we are inserting in the middle of a list, after @head */
+ struct ldlm_lock *orig = list_entry(head, struct ldlm_lock,
+ l_res_link);
+ LASSERT(orig->l_policy_data.l_inodebits.bits ==
+ lock->l_policy_data.l_inodebits.bits);
+ /* There is no use case for inserting before a lock with an
+ * exactly matching set of bits */
+ LASSERT(tail == false);
+
+ for (i = 0; i < MDS_INODELOCK_NUMBITS; i++) {
+ /* use BIT() as in the waiting-list branch above, so the
+ * mask is not built from a signed int shift */
+ if (!(lock->l_policy_data.l_inodebits.bits & BIT(i)))
+ continue;
+ list_add(&lock->l_ibits_node->lin_link[i],
+ &orig->l_ibits_node->lin_link[i]);
+ }
}
}
int ldlm_inodebits_alloc_lock(struct ldlm_lock *lock);
void ldlm_inodebits_add_lock(struct ldlm_resource *res, struct list_head *head,
- struct ldlm_lock *lock);
+ struct ldlm_lock *lock, bool tail);
void ldlm_inodebits_unlink_lock(struct ldlm_lock *lock);
/* ldlm_flock.c */
data->lmd_policy->l_inodebits.bits) !=
data->lmd_policy->l_inodebits.bits)
return INTERVAL_ITER_CONT;
+
+ if (unlikely(match == LCK_GROUP) &&
+ data->lmd_policy->l_inodebits.li_gid != LDLM_GID_ANY &&
+ lpol->l_inodebits.li_gid !=
+ data->lmd_policy->l_inodebits.li_gid)
+ return INTERVAL_ITER_CONT;
break;
default:
;
switch (resource->lr_type) {
case LDLM_EXTENT:
libcfs_debug_msg(msgdata,
- "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+ "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
&vaf,
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie,
lock->l_policy_data.l_extent.start,
lock->l_policy_data.l_extent.end,
lock->l_req_extent.start, lock->l_req_extent.end,
+ lock->l_req_extent.gid,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
case LDLM_IBITS:
libcfs_debug_msg(msgdata,
- "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+ "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
&vaf,
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
lock->l_policy_data.l_inodebits.try_bits,
atomic_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
+ lock->l_policy_data.l_inodebits.li_gid,
lock->l_flags, nid,
lock->l_remote_handle.cookie,
exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
list_add(&lock->l_res_link, head);
if (res->lr_type == LDLM_IBITS)
- ldlm_inodebits_add_lock(res, head, lock);
+ ldlm_inodebits_add_lock(res, head, lock, tail);
ldlm_resource_dump(D_INFO, res);
}
/**
* Insert a lock into resource before the specified lock.
+ *
+ * IBITS waiting locks must also be inserted into the per-bit ibits lists, and
+ * only the insert-after operation is supported for them, because the set of
+ * bits of the previous and the new locks must match. Therefore, take the
+ * previous lock and insert after it.
*/
void ldlm_resource_insert_lock_before(struct ldlm_lock *original,
struct ldlm_lock *new)
#include "mdc_internal.h"
+/**
+ * Fill @policy for a DoM inodebits lock.
+ *
+ * @policy is zeroed, its bits are set to MDS_INODELOCK_DOM and, when @lock is
+ * not NULL, its group id is taken from the cl_lock descriptor. With a NULL
+ * @lock the group id stays zero. @env is not used by this function.
+ */
static void mdc_lock_build_policy(const struct lu_env *env,
+ const struct cl_lock *lock,
union ldlm_policy_data *policy)
{
memset(policy, 0, sizeof *policy);
policy->l_inodebits.bits = MDS_INODELOCK_DOM;
+ if (lock)
+ policy->l_inodebits.li_gid = lock->cll_descr.cld_gid;
}
int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data)
ENTRY;
fid_build_reg_res_name(lu_object_fid(osc2lu(obj)), resname);
- mdc_lock_build_policy(env, policy);
+ mdc_lock_build_policy(env, NULL, policy);
+ policy->l_inodebits.li_gid = LDLM_GID_ANY;
flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
if (dap_flags & OSC_DAP_FL_TEST_LOCK)
* osc_lock.
*/
fid_build_reg_res_name(lu_object_fid(osc2lu(osc)), resname);
- mdc_lock_build_policy(env, policy);
+ mdc_lock_build_policy(env, lock, policy);
LASSERT(!oscl->ols_speculative);
result = mdc_enqueue_send(env, osc_export(osc), resname,
&oscl->ols_flags, policy,
ols->ols_flags = flags;
ols->ols_speculative = !!(enqflags & CEF_SPECULATIVE);
+ if (lock->cll_descr.cld_mode == CLM_GROUP)
+ ols->ols_flags |= LDLM_FL_ATOMIC_CB;
if (ols->ols_flags & LDLM_FL_HAS_INTENT) {
ols->ols_flags |= LDLM_FL_BLOCK_GRANTED;
lh->mlh_type = MDT_REG_LOCK;
}
+/**
+ * Initialize the regular-lock part of @lh from an existing LDLM lock.
+ *
+ * The requested mode of @lock is handed to mdt_lock_reg_init(). For a
+ * LCK_GROUP request the group id kept in the lock's inodebits policy is
+ * additionally copied into @lh->mlh_gid; for any other mode mlh_gid is left
+ * untouched by this function.
+ */
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
+{
+ enum ldlm_mode req_mode = lock->l_req_mode;
+
+ mdt_lock_reg_init(lh, req_mode);
+ if (req_mode != LCK_GROUP)
+ return;
+
+ lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
+}
+
void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
const struct lu_name *lname)
{
policy->l_inodebits.bits = *ibits;
policy->l_inodebits.try_bits = trybits;
+ policy->l_inodebits.li_gid = lh->mlh_gid;
/*
* Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
m->mdt_skip_lfsck = 1;
}
- /* DoM files get IO lock at open optionally by default */
- m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+ /* Just try to get a DoM lock by default. Otherwise, having a group
+ * lock granted, it may get blocked for a long time. */
+ m->mdt_opts.mo_dom_lock = TRYLOCK_DOM_ON_OPEN;
/* DoM files are read at open and data is packed in the reply */
m->mdt_opts.mo_dom_read_open = 1;
/* Regular lock */
struct lustre_handle mlh_reg_lh;
enum ldlm_mode mlh_reg_mode;
+ __u64 mlh_gid;
/* Pdirops lock */
struct lustre_handle mlh_pdo_lh;
const struct lu_name *lname);
void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm);
+void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock);
int mdt_lock_setup(struct mdt_thread_info *info, struct mdt_object *mo,
struct mdt_lock_handle *lh);
/* resent case */
if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
mdt_lock_handle_init(lhc);
- mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
+ mdt_lh_reg_init(lhc, *lockp);
+
/* This will block MDT thread but it should be fine until
* client caches small amount of data for DoM, which should be
* smaller than one BRW RPC and should be able to be
(long long)(int)sizeof(((struct ldlm_extent *)0)->gid));
/* Checks for struct ldlm_inodebits */
- LASSERTF((int)sizeof(struct ldlm_inodebits) == 16, "found %lld\n",
+ LASSERTF((int)sizeof(struct ldlm_inodebits) == 24, "found %lld\n",
(long long)(int)sizeof(struct ldlm_inodebits));
LASSERTF((int)offsetof(struct ldlm_inodebits, bits) == 0, "found %lld\n",
(long long)(int)offsetof(struct ldlm_inodebits, bits));
(long long)(int)offsetof(struct ldlm_inodebits, try_bits));
LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->try_bits) == 8, "found %lld\n",
(long long)(int)sizeof(((struct ldlm_inodebits *)0)->try_bits));
+ LASSERTF((int)offsetof(struct ldlm_inodebits, li_gid) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct ldlm_inodebits, li_gid));
+
+ LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->li_gid) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_inodebits *)0)->li_gid));
/* Checks for struct ldlm_flock_wire */
LASSERTF((int)sizeof(struct ldlm_flock_wire) == 32, "found %lld\n",
test_sanityn()
{
SANITYN_ONLY=${SANITYN_ONLY:-"1 2 4 5 6 7 8 9 10 11 12 14 17 19 20 \
- 23 27 39 51a 51c 51d"}
+ 23 27 39 51a 51c 51d 107"}
SANITYN_REPEAT=${SANITYN_REPEAT:-1}
# XXX: to fix 60
ONLY=$SANITYN_ONLY ONLY_REPEAT=$SANITYN_REPEAT OSC="mdc" DOM="yes" \
}
run_test 106c "Verify statx attributes mask"
-test_107() { # LU-1031
+test_107a() { # LU-1031
dd if=/dev/zero of=$DIR1/$tfile bs=1M count=10
local gid1=14091995
local gid2=16022000
local MULTIPID2=$!
kill -USR1 $MULTIPID2
sleep 2
- if [[ `ps h -o comm -p $MULTIPID2` == "" ]]; then
+ if [[ $(ps h -o comm -p $MULTIPID2) == "" ]]; then
error "First grouplock does not block second one"
else
echo "First grouplock blocks second one"
wait $MULTIPID1
wait $MULTIPID2
}
-run_test 107 "Basic grouplock conflict"
+run_test 107a "Basic grouplock conflict"
+
+# Check that a waiting group lock is queued ahead of other waiting IO:
+# a reader blocked by the first group lock must still be blocked after the
+# first group lock is dropped, while the second group lock holder stays alive.
+test_107b() {
+ dd if=/dev/zero of=$DIR1/$tfile bs=1M count=10
+ local gid1=14091995
+ local gid2=16022000
+
+ $LFS getstripe $DIR1/$tfile
+
+ # First holder: presumably opens the file, takes group lock gid1 and
+ # pauses until it receives SIGUSR1 -- verify against multiop usage.
+ multiop_bg_pause $DIR1/$tfile OG${gid1}_g${gid1}c || return 1
+ local MULTIPID1=$!
+ # Plain IO from the second mount point, started in the background.
+ multiop $DIR2/$tfile Or10c &
+ local MULTIPID2=$!
+ sleep 2
+
+ # Empty ps output means the IO process has already exited, i.e. it was
+ # not blocked by the group lock.
+ if [[ $(ps h -o comm -p $MULTIPID2) == "" ]]; then
+ error "Grouplock does not block IO"
+ else
+ echo "Grouplock blocks IO"
+ fi
+
+ # Second group lock request with a different gid, issued after the IO.
+ multiop $DIR2/$tfile OG${gid2}_g${gid2}c &
+ local MULTIPID3=$!
+ sleep 2
+ if [[ $(ps h -o comm -p $MULTIPID3) == "" ]]; then
+ error "First grouplock does not block second one"
+ else
+ echo "First grouplock blocks second one"
+ fi
+
+ # Let the first holder continue (presumably releasing gid1).
+ kill -USR1 $MULTIPID1
+ sleep 2
+
+ # The second group lock process must still be running at this point.
+ if [[ $(ps h -o comm -p $MULTIPID3) == "" ]]; then
+ error "Second grouplock thread disappeared"
+ fi
+
+ # The IO must still be blocked, now behind the second group lock.
+ if [[ $(ps h -o comm -p $MULTIPID2) == "" ]]; then
+ error "Second grouplock does not block IO"
+ else
+ echo "Second grouplock blocks IO"
+ fi
+
+ # Let the second holder continue, then reap all background processes.
+ kill -USR1 $MULTIPID3
+ wait $MULTIPID1
+ wait $MULTIPID2
+ wait $MULTIPID3
+}
+run_test 107b "Grouplock is added to the head of waiting list"
log "cleanup: ======================================================"
CHECK_STRUCT(ldlm_inodebits);
CHECK_MEMBER(ldlm_inodebits, bits);
CHECK_MEMBER(ldlm_inodebits, try_bits);
+ CHECK_MEMBER(ldlm_inodebits, li_gid);
}
static void
(long long)(int)sizeof(((struct ldlm_extent *)0)->gid));
/* Checks for struct ldlm_inodebits */
- LASSERTF((int)sizeof(struct ldlm_inodebits) == 16, "found %lld\n",
+ LASSERTF((int)sizeof(struct ldlm_inodebits) == 24, "found %lld\n",
(long long)(int)sizeof(struct ldlm_inodebits));
LASSERTF((int)offsetof(struct ldlm_inodebits, bits) == 0, "found %lld\n",
(long long)(int)offsetof(struct ldlm_inodebits, bits));
(long long)(int)offsetof(struct ldlm_inodebits, try_bits));
LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->try_bits) == 8, "found %lld\n",
(long long)(int)sizeof(((struct ldlm_inodebits *)0)->try_bits));
+ LASSERTF((int)offsetof(struct ldlm_inodebits, li_gid) == 16, "found %lld\n",
+ (long long)(int)offsetof(struct ldlm_inodebits, li_gid));
+ LASSERTF((int)sizeof(((struct ldlm_inodebits *)0)->li_gid) == 8, "found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_inodebits *)0)->li_gid));
+
/* Checks for struct ldlm_flock_wire */
LASSERTF((int)sizeof(struct ldlm_flock_wire) == 32, "found %lld\n",