union lov_layout_state *state);
int (*llo_print)(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o);
- struct cl_page *(*llo_page_init)(const struct lu_env *env,
- struct cl_object *obj,
- struct cl_page *page,
- cfs_page_t *vmpage);
+ int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
+ struct cl_page *page, cfs_page_t *vmpage);
int (*llo_lock_init)(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *io);
union lov_layout_state *state)
{
LASSERT(lov->lo_type == LLT_EMPTY);
+ if (cfs_atomic_read(&lov->lo_active_ios) > 0)
+ RETURN(-EBUSY);
+
cl_object_prune(env, &lov->lo_cl);
return 0;
}
ENTRY;
dump_lsm(D_INODE, lsm);
- if (lov->lo_lsm_invalid && cfs_atomic_read(&lsm->lsm_refc) > 1)
+ if (cfs_atomic_read(&lov->lo_active_ios) > 0)
RETURN(-EBUSY);
if (r0->lo_sub != NULL) {
}
}
}
+ cl_object_prune(env, &lov->lo_cl);
RETURN(0);
}
static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
struct cl_attr *attr)
{
- struct lov_object *lov = cl2lov(obj);
+ struct lov_object *lov = cl2lov(obj);
struct lov_layout_raid0 *r0 = lov_r0(lov);
- struct lov_stripe_md *lsm = lov->lo_lsm;
- struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
- __u64 kms;
- int result = 0;
+ struct cl_attr *lov_attr = &r0->lo_attr;
+ int result = 0;
ENTRY;
* can't go if locks exist. */
/* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */
- if (!r0->lo_attr_valid) {
- /*
- * Fill LVB with attributes already initialized by the upper
- * layer.
- */
- cl_attr2lvb(lvb, attr);
- kms = attr->cat_kms;
-
- /*
- * XXX that should be replaced with a loop over sub-objects,
- * doing cl_object_attr_get() on them. But for now, let's
- * reuse old lov code.
- */
-
- /*
- * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
- * happy. It's not needed, because new code uses
- * ->coh_attr_guard spin-lock to protect consistency of
- * sub-object attributes.
- */
- lov_stripe_lock(lsm);
- result = lov_merge_lvb_kms(lsm, lvb, &kms);
- lov_stripe_unlock(lsm);
- if (result == 0) {
- cl_lvb2attr(attr, lvb);
- attr->cat_kms = kms;
- r0->lo_attr_valid = 1;
- r0->lo_attr = *attr;
- }
- } else
- *attr = r0->lo_attr;
- RETURN(result);
+ if (!r0->lo_attr_valid) {
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+ struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
+ __u64 kms = 0;
+
+ memset(lvb, 0, sizeof(*lvb));
+ /* XXX: timestamps can be negative by sanity:test_39m,
+ * how can it be? */
+ lvb->lvb_atime = LLONG_MIN;
+ lvb->lvb_ctime = LLONG_MIN;
+ lvb->lvb_mtime = LLONG_MIN;
+
+ /*
+ * XXX that should be replaced with a loop over sub-objects,
+ * doing cl_object_attr_get() on them. But for now, let's
+ * reuse old lov code.
+ */
+
+ /*
+ * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
+ * happy. It's not needed, because new code uses
+ * ->coh_attr_guard spin-lock to protect consistency of
+ * sub-object attributes.
+ */
+ lov_stripe_lock(lsm);
+ result = lov_merge_lvb_kms(lsm, lvb, &kms);
+ lov_stripe_unlock(lsm);
+ if (result == 0) {
+ cl_lvb2attr(lov_attr, lvb);
+ lov_attr->cat_kms = kms;
+ r0->lo_attr_valid = 1;
+ }
+ }
+ if (result == 0) { /* merge results */
+ attr->cat_blocks = lov_attr->cat_blocks;
+ attr->cat_size = lov_attr->cat_size;
+ attr->cat_kms = lov_attr->cat_kms;
+ if (attr->cat_atime < lov_attr->cat_atime)
+ attr->cat_atime = lov_attr->cat_atime;
+ if (attr->cat_ctime < lov_attr->cat_ctime)
+ attr->cat_ctime = lov_attr->cat_ctime;
+ if (attr->cat_mtime < lov_attr->cat_mtime)
+ attr->cat_mtime = lov_attr->cat_mtime;
+ }
+ RETURN(result);
}
const static struct lov_layout_operations lov_dispatch[] = {
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
{
struct l_wait_info lwi = { 0 };
- struct lov_stripe_md *lsm = lov->lo_lsm;
ENTRY;
- if (!lov->lo_lsm_invalid || lsm == NULL)
+ if (!lov->lo_layout_invalid)
RETURN(0);
- LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
- while (cfs_atomic_read(&lsm->lsm_refc) > 1 && lov->lo_lsm_invalid) {
+ while (cfs_atomic_read(&lov->lo_active_ios) > 0) {
lov_conf_unlock(lov);
CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
PFID(lu_object_fid(lov2lu(lov))),
- cfs_atomic_read(&lsm->lsm_refc));
+ cfs_atomic_read(&lov->lo_active_ios));
l_wait_event(lov->lo_waitq,
- cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+ cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi);
lov_conf_lock(lov);
}
RETURN(0);
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
old_ops->llo_fini(env, lov, &lov->u);
+
+ LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
LASSERT(cfs_list_empty(&hdr->coh_locks));
LASSERT(hdr->coh_tree.rnode == NULL);
LASSERT(hdr->coh_pages == 0);
ENTRY;
init_rwsem(&lov->lo_type_guard);
+ cfs_atomic_set(&lov->lo_active_ios, 0);
cfs_waitq_init(&lov->lo_waitq);
+ cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
+
/* no locking is necessary, as object is being created */
lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
ops = &lov_dispatch[lov->lo_type];
lov_conf_lock(lov);
if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
- lov->lo_lsm_invalid = 1;
+ lov->lo_layout_invalid = true;
GOTO(out, result = 0);
}
(lsm != NULL && lov->lo_lsm != NULL &&
lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
/* same version of layout */
- lov->lo_lsm_invalid = 0;
+ lov->lo_layout_invalid = false;
GOTO(out, result = 0);
}
/* will change layout - check if there still exists active IO. */
- if (lov->lo_lsm != NULL &&
- cfs_atomic_read(&lov->lo_lsm->lsm_refc) > 1) {
- lov->lo_lsm_invalid = 1;
+ if (cfs_atomic_read(&lov->lo_active_ios) > 1) {
+ lov->lo_layout_invalid = true;
GOTO(out, result = -EBUSY);
}
default:
LBUG();
}
- lov->lo_lsm_invalid = result != 0;
+ lov->lo_layout_invalid = result != 0;
EXIT;
out:
return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
}
-struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, cfs_page_t *vmpage)
+int lov_page_init(const struct lu_env *env, struct cl_object *obj,
+ struct cl_page *page, cfs_page_t *vmpage)
{
return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
llo_page_init, env, obj, page, vmpage);
lsm = lsm_addref(lov->lo_lsm);
CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
lsm, cfs_atomic_read(&lsm->lsm_refc),
- lov->lo_lsm_invalid, cfs_current());
+ lov->lo_layout_invalid, cfs_current());
}
lov_conf_thaw(lov);
return lsm;
CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
- if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid)
- cfs_waitq_signal(&lov->lo_waitq);
+ lov_free_memmd(&lsm);
}
struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
}
EXPORT_SYMBOL(lov_lsm_get);
-void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
{
- struct lu_object *luobj;
-
- if (clobj == NULL || lsm == NULL)
- return;
-
- luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
- &lov_device_type);
- LASSERT(luobj != NULL);
-
- lov_lsm_decref(lu2lov(luobj), lsm);
+ if (lsm != NULL)
+ lov_free_memmd(&lsm);
}
EXPORT_SYMBOL(lov_lsm_put);