-static int lov_lock_sub_init(const struct lu_env *env,
- struct lov_lock *lck, const struct cl_io *io)
-{
- int result = 0;
- int i;
- int nr;
- obd_off start;
- obd_off end;
- obd_off file_start;
- obd_off file_end;
-
- struct lov_object *loo = cl2lov(lck->lls_cl.cls_obj);
- struct lov_layout_raid0 *r0 = lov_r0(loo);
- struct cl_lock *parent = lck->lls_cl.cls_lock;
-
- ENTRY;
-
- lck->lls_orig = parent->cll_descr;
- file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
- file_end = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
-
- for (i = 0, nr = 0; i < r0->lo_nr; i++) {
- /*
- * XXX for wide striping smarter algorithm is desirable,
- * breaking out of the loop, early.
- */
- if (lov_stripe_intersects(r0->lo_lsm, i,
- file_start, file_end, &start, &end))
- nr++;
- }
- LASSERT(nr > 0);
- OBD_ALLOC(lck->lls_sub, nr * sizeof lck->lls_sub[0]);
- if (lck->lls_sub == NULL)
- RETURN(-ENOMEM);
-
- lck->lls_nr = nr;
- /*
- * First, fill in sub-lock descriptions in
- * lck->lls_sub[].sub_descr. They are used by lov_sublock_alloc()
- * (called below in this function, and by lov_lock_enqueue()) to
- * create sub-locks. At this moment, no other thread can access
- * top-lock.
- */
- for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (lov_stripe_intersects(r0->lo_lsm, i,
- file_start, file_end, &start, &end)) {
- struct cl_lock_descr *descr;
-
- descr = &lck->lls_sub[nr].sub_descr;
-
- LASSERT(descr->cld_obj == NULL);
- descr->cld_obj = lovsub2cl(r0->lo_sub[i]);
- descr->cld_start = cl_index(descr->cld_obj, start);
- descr->cld_end = cl_index(descr->cld_obj, end);
- descr->cld_mode = parent->cll_descr.cld_mode;
- descr->cld_gid = parent->cll_descr.cld_gid;
- /* XXX has no effect */
- lck->lls_sub[nr].sub_got = *descr;
- lck->lls_sub[nr].sub_stripe = i;
- nr++;
- }
- }
- LASSERT(nr == lck->lls_nr);
- /*
- * Then, create sub-locks. Once at least one sub-lock was created,
- * top-lock can be reached by other threads.
- */
- for (i = 0; i < lck->lls_nr; ++i) {
- struct cl_lock *sublock;
- struct lov_lock_link *link;
-
- if (lck->lls_sub[i].sub_lock == NULL) {
- sublock = lov_sublock_alloc(env, io, lck, i, &link);
- if (IS_ERR(sublock)) {
- result = PTR_ERR(sublock);
- break;
- }
- cl_lock_mutex_get(env, sublock);
- cl_lock_mutex_get(env, parent);
- /*
- * recheck under mutex that sub-lock wasn't created
- * concurrently, and that top-lock is still alive.
- */
- if (lck->lls_sub[i].sub_lock == NULL &&
- parent->cll_state < CLS_FREEING) {
- lov_sublock_adopt(env, lck, sublock, i, link);
- cl_lock_mutex_put(env, parent);
- } else {
- OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
- cl_lock_mutex_put(env, parent);
- cl_lock_unhold(env, sublock,
- "lov-parent", parent);
- }
- cl_lock_mutex_put(env, sublock);
- }
- }
- /*
- * Some sub-locks can be missing at this point. This is not a problem,
- * because enqueue will create them anyway. Main duty of this function
- * is to fill in sub-lock descriptions in a race free manner.
- */
- RETURN(result);
-}
-
-static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
- int i, int deluser, int rc)
-{
- struct cl_lock *parent = lck->lls_cl.cls_lock;
-
- LASSERT(cl_lock_is_mutexed(parent));
- ENTRY;
-
- if (lck->lls_sub[i].sub_flags & LSF_HELD) {
- struct cl_lock *sublock;
- int dying;
-
- LASSERT(lck->lls_sub[i].sub_lock != NULL);
- sublock = lck->lls_sub[i].sub_lock->lss_cl.cls_lock;
- LASSERT(cl_lock_is_mutexed(sublock));
-
- lck->lls_sub[i].sub_flags &= ~LSF_HELD;
- if (deluser)
- cl_lock_user_del(env, sublock);
- /*
- * If the last hold is released, and cancellation is pending
- * for a sub-lock, release parent mutex, to avoid keeping it
- * while sub-lock is being paged out.
- */
- dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
- sublock->cll_descr.cld_mode == CLM_GROUP ||
- (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
- sublock->cll_holds == 1;
- if (dying)
- cl_lock_mutex_put(env, parent);
- cl_lock_unhold(env, sublock, "lov-parent", parent);
- if (dying) {
- cl_lock_mutex_get(env, parent);
- rc = lov_subresult(rc, CLO_REPEAT);
- }
- /*
- * From now on lck->lls_sub[i].sub_lock is a "weak" pointer,
- * not backed by a reference on a
- * sub-lock. lovsub_lock_delete() will clear
- * lck->lls_sub[i].sub_lock under semaphores, just before
- * sub-lock is destroyed.
- */
- }
- RETURN(rc);
-}
-
-static void lov_sublock_hold(const struct lu_env *env, struct lov_lock *lck,
- int i)