* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
* Author: Nikita Danilov <nikita.danilov@sun.com>
*/
-/** \addtogroup lov lov @{ */
-
#define DEBUG_SUBSYSTEM S_LOV
#include "lov_cl_internal.h"
+/** \addtogroup lov
+ * @{
+ */
+
/*****************************************************************************
*
* Layout operations.
lov->u = *state;
}
-static void oinfo_get_fid(const struct lov_oinfo *oinfo, struct lu_fid *fid)
-{
- __u64 idx = oinfo->loi_id;
-
- /* See idif definition in wiki:CMD3_interoperability_architecture */
-
- LASSERT(oinfo->loi_gr < 1ULL << 16);
- LASSERT(oinfo->loi_id < 1ULL << 49);
- ENTRY;
-
- /*
- * Now that the fid of stripe is not unique now, ost_idx have to
- * be used to make it unique. This is ok because the stripe fids
- * are just used in client side(to locate the objects). -jay
- */
- fid->f_seq = ((__u64)oinfo->loi_ost_idx) << 32 |
- oinfo->loi_gr << 16 | idx >> 32;
- fid->f_oid = idx; /* truncated to 32 bits by assignment */
- fid->f_ver = 0;
- EXIT;
-}
-
static struct cl_object *lov_sub_find(const struct lu_env *env,
struct cl_device *dev,
const struct lu_fid *fid,
parent = subhdr->coh_parent;
oinfo = r0->lo_lsm->lsm_oinfo[idx];
- CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" gr: "LPU64
+ CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
" idx: %d gen: %d\n",
PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
PFID(&hdr->coh_lu.loh_fid), hdr,
- oinfo->loi_id, oinfo->loi_gr,
+ oinfo->loi_id, oinfo->loi_seq,
oinfo->loi_ost_idx, oinfo->loi_ost_gen);
if (parent == NULL) {
r0->lo_sub[idx]->lso_index = idx;
result = 0;
} else {
- CERROR("Stripe is already owned by other file (%i).\n", idx);
+ CERROR("Stripe is already owned by other file (%d).\n", idx);
LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
"old\n");
r0->lo_lsm = conf->u.coc_md->lsm;
LASSERT(r0->lo_nr <= lov_targets_nr(dev));
- OBD_ALLOC(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+ OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
if (r0->lo_sub != NULL) {
result = 0;
subconf->coc_inode = conf->coc_inode;
+ cfs_spin_lock_init(&r0->lo_sub_lock);
/*
* Create stripe cl_objects.
*/
struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
int ost_idx = oinfo->loi_ost_idx;
- oinfo_get_fid(oinfo, ofid);
+ fid_ostid_unpack(ofid, &oinfo->loi_oi,
+ oinfo->loi_ost_idx);
subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
subconf->u.coc_oinfo = oinfo;
LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
struct cl_object *sub;
struct lov_layout_raid0 *r0;
struct lu_site *site;
+ struct lu_site_bkt_data *bkt;
cfs_waitlink_t *waiter;
r0 = &lov->u.raid0;
sub = lovsub2cl(los);
site = sub->co_lu.lo_dev->ld_site;
+ bkt = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
cl_object_kill(env, sub);
/* release a reference to the sub-object and ... */
if (r0->lo_sub[idx] == los) {
waiter = &lov_env_info(env)->lti_waiter;
cfs_waitlink_init(waiter);
- cfs_waitq_add(&site->ls_marche_funebre, waiter);
- set_current_state(CFS_TASK_UNINT);
-
- while (r0->lo_sub[idx] == los)
+ cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
+ cfs_set_current_state(CFS_TASK_UNINT);
+ while (1) {
/* this wait-queue is signaled at the end of
* lu_object_free(). */
- cfs_waitq_wait(waiter, CFS_TASK_UNINT);
- cfs_waitq_del(&site->ls_marche_funebre, waiter);
+ cfs_set_current_state(CFS_TASK_UNINT);
+ cfs_spin_lock(&r0->lo_sub_lock);
+ if (r0->lo_sub[idx] == los) {
+ cfs_spin_unlock(&r0->lo_sub_lock);
+ cfs_waitq_wait(waiter, CFS_TASK_UNINT);
+ } else {
+ cfs_spin_unlock(&r0->lo_sub_lock);
+ cfs_set_current_state(CFS_TASK_RUNNING);
+ break;
+ }
+ }
+ cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
}
LASSERT(r0->lo_sub[idx] == NULL);
}
ENTRY;
if (r0->lo_sub != NULL) {
- OBD_FREE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+ OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
r0->lo_sub = NULL;
}
EXIT;
\
__lock &= __obj->lo_owner != cfs_current(); \
if (__lock) \
- down_read(&__obj->lo_type_guard); \
+ cfs_down_read(&__obj->lo_type_guard); \
__result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__); \
if (__lock) \
- up_read(&__obj->lo_type_guard); \
+ cfs_up_read(&__obj->lo_type_guard); \
__result; \
})
enum lov_layout_type __llt; \
\
if (__obj->lo_owner != cfs_current()) \
- down_read(&__obj->lo_type_guard); \
+ cfs_down_read(&__obj->lo_type_guard); \
__llt = __obj->lo_type; \
LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch)); \
lov_dispatch[__llt].op(__VA_ARGS__); \
if (__obj->lo_owner != cfs_current()) \
- up_read(&__obj->lo_type_guard); \
+ cfs_up_read(&__obj->lo_type_guard); \
} while (0)
static int lov_layout_change(const struct lu_env *env,
cl_env_reexit(cookie);
old_ops->llo_fini(env, obj, &obj->u);
- LASSERT(list_empty(&hdr->coh_locks));
+ LASSERT(cfs_list_empty(&hdr->coh_locks));
LASSERT(hdr->coh_tree.rnode == NULL);
LASSERT(hdr->coh_pages == 0);
int result;
ENTRY;
- init_rwsem(&lov->lo_type_guard);
+ cfs_init_rwsem(&lov->lo_type_guard);
/* no locking is necessary, as object is being created */
lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
* Currently only LLT_EMPTY -> LLT_RAID0 transition is supported.
*/
LASSERT(lov->lo_owner != cfs_current());
- down_write(&lov->lo_type_guard);
+ cfs_down_write(&lov->lo_type_guard);
LASSERT(lov->lo_owner == NULL);
lov->lo_owner = cfs_current();
if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
else
result = -EOPNOTSUPP;
lov->lo_owner = NULL;
- up_write(&lov->lo_type_guard);
+ cfs_up_write(&lov->lo_type_guard);
RETURN(result);
}
};
struct lu_object *lov_object_alloc(const struct lu_env *env,
- const struct lu_object_header *_,
+ const struct lu_object_header *unused,
struct lu_device *dev)
{
struct lov_object *lov;