* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LOV
#include "lov_cl_internal.h"
-#include <lustre_debug.h>
/** \addtogroup lov
* @{
int (*llo_print)(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o);
int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, struct page *vmpage);
+ struct cl_page *page, pgoff_t index);
int (*llo_lock_init)(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *io);
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
+{
+ struct lu_object *luobj;
+ struct lov_stripe_md *lsm = NULL;
+
+ if (clobj == NULL)
+ return NULL;
+
+ luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+ &lov_device_type);
+ if (luobj != NULL)
+ lsm = lov_lsm_addref(lu2lov(luobj));
+ return lsm;
+}
+EXPORT_SYMBOL(lov_lsm_get);
+
+void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
+{
+ if (lsm != NULL)
+ lov_free_memmd(&lsm);
+}
+EXPORT_SYMBOL(lov_lsm_put);
+
/*****************************************************************************
*
* Lov object layout operations.
}
static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
- struct cl_object *stripe,
- struct lov_layout_raid0 *r0, int idx)
+ struct cl_object *stripe, struct lov_layout_raid0 *r0,
+ int idx)
{
- struct cl_object_header *hdr;
- struct cl_object_header *subhdr;
- struct cl_object_header *parent;
- struct lov_oinfo *oinfo;
- int result;
+ struct cl_object_header *hdr;
+ struct cl_object_header *subhdr;
+ struct cl_object_header *parent;
+ struct lov_oinfo *oinfo;
+ int result;
if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
/* For sanity:test_206.
return -EIO;
}
- hdr = cl_object_header(lov2cl(lov));
- subhdr = cl_object_header(stripe);
- parent = subhdr->coh_parent;
+ hdr = cl_object_header(lov2cl(lov));
+ subhdr = cl_object_header(stripe);
oinfo = lov->lo_lsm->lsm_oinfo[idx];
CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
oinfo->loi_ost_idx, oinfo->loi_ost_gen);
- if (parent == NULL) {
- subhdr->coh_parent = hdr;
- subhdr->coh_nesting = hdr->coh_nesting + 1;
- lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
- r0->lo_sub[idx] = cl2lovsub(stripe);
- r0->lo_sub[idx]->lso_super = lov;
- r0->lo_sub[idx]->lso_index = idx;
- result = 0;
- } else {
+ /* reuse ->coh_attr_guard to protect coh_parent change */
+ spin_lock(&subhdr->coh_attr_guard);
+ parent = subhdr->coh_parent;
+ if (parent == NULL) {
+ subhdr->coh_parent = hdr;
+ spin_unlock(&subhdr->coh_attr_guard);
+ subhdr->coh_nesting = hdr->coh_nesting + 1;
+ lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
+ r0->lo_sub[idx] = cl2lovsub(stripe);
+ r0->lo_sub[idx]->lso_super = lov;
+ r0->lo_sub[idx]->lso_index = idx;
+ result = 0;
+ } else {
struct lu_object *old_obj;
struct lov_object *old_lov;
unsigned int mask = D_INODE;
+ spin_unlock(&subhdr->coh_attr_guard);
old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
LASSERT(old_obj != NULL);
old_lov = cl2lov(lu2cl(old_obj));
LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
cl_object_put(env, stripe);
- }
- return result;
+ }
+ return result;
+}
+
+static int lov_page_slice_fixup(struct lov_object *lov,
+ struct cl_object *stripe)
+{
+ struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+ struct cl_object *o;
+
+ if (stripe == NULL)
+ return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off -
+ cfs_size_round(sizeof(struct lov_page));
+
+ cl_object_for_each(o, stripe)
+ o->co_slice_off += hdr->coh_page_bufsize;
+
+ return cl_object_header(stripe)->coh_page_bufsize;
}
static int lov_init_raid0(const struct lu_env *env,
LASSERT(lov->lo_lsm == NULL);
lov->lo_lsm = lsm_addref(lsm);
r0->lo_nr = lsm->lsm_stripe_count;
- LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+ LASSERT(r0->lo_nr <= lov_targets_nr(dev));
- OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
- if (r0->lo_sub != NULL) {
- result = 0;
- subconf->coc_inode = conf->coc_inode;
+ lov->lo_layout_invalid = true;
+
+ OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+ if (r0->lo_sub != NULL) {
+ int psz = 0;
+
+ result = 0;
+ subconf->coc_inode = conf->coc_inode;
spin_lock_init(&r0->lo_sub_lock);
/*
* Create stripe cl_objects.
struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
int ost_idx = oinfo->loi_ost_idx;
- result = ostid_to_fid(ofid, &oinfo->loi_oi,
+ if (lov_oinfo_is_dummy(oinfo))
+ continue;
+
+ result = ostid_to_fid(ofid, &oinfo->loi_oi,
oinfo->loi_ost_idx);
if (result != 0)
GOTO(out, result);
- subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
- subconf->u.coc_oinfo = oinfo;
- LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+ subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+ subconf->u.coc_oinfo = oinfo;
+ LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
/* In the function below, .hs_keycmp resolves to
* lu_obj_hop_keycmp() */
/* coverity[overrun-buffer-val] */
- stripe = lov_sub_find(env, subdev, ofid, subconf);
- if (!IS_ERR(stripe)) {
- result = lov_init_sub(env, lov, stripe, r0, i);
+ stripe = lov_sub_find(env, subdev, ofid, subconf);
+ if (!IS_ERR(stripe)) {
+ result = lov_init_sub(env, lov, stripe, r0, i);
if (result == -EAGAIN) { /* try again */
--i;
result = 0;
+ continue;
}
- } else {
- result = PTR_ERR(stripe);
+ } else {
+ result = PTR_ERR(stripe);
+ }
+
+ if (result == 0) {
+ int sz = lov_page_slice_fixup(lov, stripe);
+ LASSERT(ergo(psz > 0, psz == sz));
+ psz = sz;
}
}
- } else
- result = -ENOMEM;
+ if (result == 0)
+ cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+ } else
+ result = -ENOMEM;
out:
- RETURN(result);
+ RETURN(result);
}
static int lov_init_released(const struct lu_env *env,
LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
lov_layout_wait(env, lov);
-
- cl_object_prune(env, &lov->lo_cl);
return 0;
}
struct lovsub_object *los = r0->lo_sub[i];
if (los != NULL) {
- cl_locks_prune(env, &los->lso_cl, 1);
+ cl_object_prune(env, &los->lso_cl);
/*
* If top-level object is to be evicted from
* the cache, so are its sub-objects.
*/
lov_subobject_kill(env, lov, los, i);
}
- }
- }
- cl_object_prune(env, &lov->lo_cl);
+ }
+ }
RETURN(0);
}
(*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
- lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+ lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
lsm->lsm_stripe_count, lsm->lsm_layout_gen);
for (i = 0; i < r0->lo_nr; ++i) {
struct lu_object *sub;
(*p)(env, cookie,
"released: %s, lsm{%p 0x%08X %d %u %u}:\n",
lov->lo_layout_invalid ? "invalid" : "valid", lsm,
- lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+ lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
lsm->lsm_stripe_count, lsm->lsm_layout_gen);
return 0;
}
* hit this assertion.
* Anyway, it's still okay to call attr_get w/o type guard as layout
* can't go if locks exist. */
- /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */
+ /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
if (!r0->lo_attr_valid) {
struct lov_stripe_md *lsm = lov->lo_lsm;
/**
* Return lov_layout_type associated with a given lsm
*/
-enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
+static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
{
if (lsm == NULL)
return LLT_EMPTY;
static inline void lov_conf_freeze(struct lov_object *lov)
{
- if (lov->lo_owner != cfs_current())
+ if (lov->lo_owner != current)
down_read(&lov->lo_type_guard);
}
static inline void lov_conf_thaw(struct lov_object *lov)
{
- if (lov->lo_owner != cfs_current())
+ if (lov->lo_owner != current)
up_read(&lov->lo_type_guard);
}
static void lov_conf_lock(struct lov_object *lov)
{
- LASSERT(lov->lo_owner != cfs_current());
+ LASSERT(lov->lo_owner != current);
down_write(&lov->lo_type_guard);
LASSERT(lov->lo_owner == NULL);
- lov->lo_owner = cfs_current();
+ lov->lo_owner = current;
}
static void lov_conf_unlock(struct lov_object *lov)
struct l_wait_info lwi = { 0 };
ENTRY;
- while (cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ while (atomic_read(&lov->lo_active_ios) > 0) {
CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
PFID(lu_object_fid(lov2lu(lov))),
- cfs_atomic_read(&lov->lo_active_ios));
+ atomic_read(&lov->lo_active_ios));
l_wait_event(lov->lo_waitq,
- cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi);
+ atomic_read(&lov->lo_active_ios) == 0, &lwi);
}
RETURN(0);
}
const struct lov_layout_operations *old_ops;
const struct lov_layout_operations *new_ops;
- struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
void *cookie;
struct lu_env *env;
int refcheck;
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
+ result = cl_object_prune(env, &lov->lo_cl);
+ if (result != 0)
+ GOTO(out, result);
+
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
old_ops->llo_fini(env, lov, &lov->u);
- LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
- LASSERT(hdr->coh_tree.rnode == NULL);
- LASSERT(hdr->coh_pages == 0);
+ LASSERT(atomic_read(&lov->lo_active_ios) == 0);
lov->lo_type = LLT_EMPTY;
+ /* page bufsize fixup */
+ cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
+ lov_page_slice_fixup(lov, NULL);
+
result = new_ops->llo_init(env,
lu2lov_dev(lov->lo_cl.co_lu.lo_dev),
lov, conf, state);
}
}
+out:
cl_env_put(env, &refcheck);
cl_env_reexit(cookie);
RETURN(result);
ENTRY;
init_rwsem(&lov->lo_type_guard);
- cfs_atomic_set(&lov->lo_active_ios, 0);
+ atomic_set(&lov->lo_active_ios, 0);
init_waitqueue_head(&lov->lo_waitq);
cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
if (conf->coc_opc == OBJECT_CONF_WAIT) {
if (lov->lo_layout_invalid &&
- cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ atomic_read(&lov->lo_active_ios) > 0) {
lov_conf_unlock(lov);
result = lov_layout_wait(env, lov);
lov_conf_lock(lov);
}
/* will change layout - check if there still exists active IO. */
- if (cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ if (atomic_read(&lov->lo_active_ios) > 0) {
lov->lo_layout_invalid = true;
GOTO(out, result = -EBUSY);
}
- lov->lo_layout_invalid = lov_layout_change(env, lov, conf);
+ result = lov_layout_change(env, lov, conf);
+ lov->lo_layout_invalid = result != 0;
EXIT;
out:
}
int lov_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, struct page *vmpage)
+ struct cl_page *page, pgoff_t index)
{
- return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
- llo_page_init, env, obj, page, vmpage);
+ return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
+ index);
}
/**
return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_getattr, env, obj, attr);
}
-static int lov_attr_set(const struct lu_env *env, struct cl_object *obj,
- const struct cl_attr *attr, unsigned valid)
+static int lov_attr_update(const struct lu_env *env, struct cl_object *obj,
+ const struct cl_attr *attr, unsigned valid)
{
- /*
- * No dispatch is required here, as no layout implements this.
- */
- return 0;
+ /*
+ * No dispatch is required here, as no layout implements this.
+ */
+ return 0;
}
int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
io);
}
+static int lov_object_getstripe(const struct lu_env *env, struct cl_object *obj,
+ struct lov_user_md __user *lum)
+{
+ struct lov_object *lov = cl2lov(obj);
+ struct lov_stripe_md *lsm;
+ int rc = 0;
+ ENTRY;
+
+ lsm = lov_lsm_addref(lov);
+ if (lsm == NULL)
+ RETURN(-ENODATA);
+
+ rc = lov_getstripe(cl2lov(obj), lsm, lum);
+ lov_lsm_put(obj, lsm);
+ RETURN(rc);
+}
+
static const struct cl_object_operations lov_ops = {
- .coo_page_init = lov_page_init,
- .coo_lock_init = lov_lock_init,
- .coo_io_init = lov_io_init,
- .coo_attr_get = lov_attr_get,
- .coo_attr_set = lov_attr_set,
- .coo_conf_set = lov_conf_set
+ .coo_page_init = lov_page_init,
+ .coo_lock_init = lov_lock_init,
+ .coo_io_init = lov_io_init,
+ .coo_attr_get = lov_attr_get,
+ .coo_attr_update = lov_attr_update,
+ .coo_conf_set = lov_conf_set,
+ .coo_getstripe = lov_object_getstripe
};
static const struct lu_object_operations lov_lu_obj_ops = {
};
struct lu_object *lov_object_alloc(const struct lu_env *env,
- const struct lu_object_header *unused,
- struct lu_device *dev)
+ const struct lu_object_header *unused,
+ struct lu_device *dev)
{
- struct lov_object *lov;
- struct lu_object *obj;
+ struct lov_object *lov;
+ struct lu_object *obj;
- ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, __GFP_IO);
- if (lov != NULL) {
- obj = lov2lu(lov);
- lu_object_init(obj, NULL, dev);
- lov->lo_cl.co_ops = &lov_ops;
- lov->lo_type = -1; /* invalid, to catch uninitialized type */
- /*
- * object io operation vector (cl_object::co_iop) is installed
- * later in lov_object_init(), as different vectors are used
- * for object with different layouts.
- */
- obj->lo_ops = &lov_lu_obj_ops;
- } else
- obj = NULL;
- RETURN(obj);
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
+ if (lov != NULL) {
+ obj = lov2lu(lov);
+ lu_object_init(obj, NULL, dev);
+ lov->lo_cl.co_ops = &lov_ops;
+ lov->lo_type = -1; /* invalid, to catch uninitialized type */
+ /*
+ * object io operation vector (cl_object::co_iop) is installed
+ * later in lov_object_init(), as different vectors are used
+ * for object with different layouts.
+ */
+ obj->lo_ops = &lov_lu_obj_ops;
+ } else
+ obj = NULL;
+ RETURN(obj);
}
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
if (lov->lo_lsm != NULL) {
lsm = lsm_addref(lov->lo_lsm);
CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
- lsm, cfs_atomic_read(&lsm->lsm_refc),
- lov->lo_layout_invalid, cfs_current());
+ lsm, atomic_read(&lsm->lsm_refc),
+ lov->lo_layout_invalid, current);
}
lov_conf_thaw(lov);
return lsm;
}
-void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
-{
- if (lsm == NULL)
- return;
-
- CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
- lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
-
- lov_free_memmd(&lsm);
-}
-
-struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
-{
- struct lu_object *luobj;
- struct lov_stripe_md *lsm = NULL;
-
- if (clobj == NULL)
- return NULL;
-
- luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
- &lov_device_type);
- if (luobj != NULL)
- lsm = lov_lsm_addref(lu2lov(luobj));
- return lsm;
-}
-EXPORT_SYMBOL(lov_lsm_get);
-
-void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
-{
- if (lsm != NULL)
- lov_free_memmd(&lsm);
-}
-EXPORT_SYMBOL(lov_lsm_put);
-
int lov_read_and_clear_async_rc(struct cl_object *clob)
{
struct lu_object *luobj;
LASSERT(lsm != NULL);
for (i = 0; i < lsm->lsm_stripe_count; i++) {
struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
if (loi->loi_ar.ar_rc && !rc)
rc = loi->loi_ar.ar_rc;
loi->loi_ar.ar_rc = 0;