* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, Whamcloud, Inc.
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LOV
#include "lov_cl_internal.h"
-#include <lustre_debug.h>
/** \addtogroup lov
* @{
union lov_layout_state *state);
int (*llo_print)(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o);
- struct cl_page *(*llo_page_init)(const struct lu_env *env,
- struct cl_object *obj,
- struct cl_page *page,
- cfs_page_t *vmpage);
+ int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
+ struct cl_page *page, pgoff_t index);
int (*llo_lock_init)(const struct lu_env *env,
struct cl_object *obj, struct cl_lock *lock,
const struct cl_io *io);
struct cl_attr *attr);
};
+static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
+
/*****************************************************************************
*
* Lov object layout operations.
}
static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
- struct cl_object *stripe,
- struct lov_layout_raid0 *r0, int idx)
+ struct cl_object *stripe, struct lov_layout_raid0 *r0,
+ int idx)
{
- struct cl_object_header *hdr;
- struct cl_object_header *subhdr;
- struct cl_object_header *parent;
- struct lov_oinfo *oinfo;
- int result;
+ struct cl_object_header *hdr;
+ struct cl_object_header *subhdr;
+ struct cl_object_header *parent;
+ struct lov_oinfo *oinfo;
+ int result;
if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
+ /* For sanity:test_206.
+ * Do not leave the object in cache to avoid accessing
+ * freed memory. This is because osc_object is referring to
+ * lov_oinfo of lsm_stripe_data which will be freed due to
+ * this failure. */
+ cl_object_kill(env, stripe);
cl_object_put(env, stripe);
return -EIO;
}
- hdr = cl_object_header(lov2cl(lov));
- subhdr = cl_object_header(stripe);
- parent = subhdr->coh_parent;
+ hdr = cl_object_header(lov2cl(lov));
+ subhdr = cl_object_header(stripe);
oinfo = lov->lo_lsm->lsm_oinfo[idx];
- CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64
- " idx: %d gen: %d\n",
- PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
- PFID(&hdr->coh_lu.loh_fid), hdr,
- oinfo->loi_id, oinfo->loi_seq,
- oinfo->loi_ost_idx, oinfo->loi_ost_gen);
-
- if (parent == NULL) {
- subhdr->coh_parent = hdr;
- subhdr->coh_nesting = hdr->coh_nesting + 1;
- lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
- r0->lo_sub[idx] = cl2lovsub(stripe);
- r0->lo_sub[idx]->lso_super = lov;
- r0->lo_sub[idx]->lso_index = idx;
- result = 0;
- } else {
- CERROR("Stripe is already owned by other file (%d).\n", idx);
- LU_OBJECT_DEBUG(D_ERROR, env, &stripe->co_lu, "\n");
- LU_OBJECT_DEBUG(D_ERROR, env, lu_object_top(&parent->coh_lu),
- "old\n");
- LU_OBJECT_HEADER(D_ERROR, env, lov2lu(lov), "new\n");
- cl_object_put(env, stripe);
- result = -EIO;
- }
- return result;
+ CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
+ " idx: %d gen: %d\n",
+ PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
+ PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
+ oinfo->loi_ost_idx, oinfo->loi_ost_gen);
+
+ /* reuse ->coh_attr_guard to protect coh_parent change */
+ spin_lock(&subhdr->coh_attr_guard);
+ parent = subhdr->coh_parent;
+ if (parent == NULL) {
+ subhdr->coh_parent = hdr;
+ spin_unlock(&subhdr->coh_attr_guard);
+ subhdr->coh_nesting = hdr->coh_nesting + 1;
+ lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
+ r0->lo_sub[idx] = cl2lovsub(stripe);
+ r0->lo_sub[idx]->lso_super = lov;
+ r0->lo_sub[idx]->lso_index = idx;
+ result = 0;
+ } else {
+ struct lu_object *old_obj;
+ struct lov_object *old_lov;
+ unsigned int mask = D_INODE;
+
+ spin_unlock(&subhdr->coh_attr_guard);
+ old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
+ LASSERT(old_obj != NULL);
+ old_lov = cl2lov(lu2cl(old_obj));
+ if (old_lov->lo_layout_invalid) {
+ /* the object's layout has already changed but isn't
+ * refreshed */
+ lu_object_unhash(env, &stripe->co_lu);
+ result = -EAGAIN;
+ } else {
+ mask = D_ERROR;
+ result = -EIO;
+ }
+
+ LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
+ "stripe %d is already owned.\n", idx);
+ LU_OBJECT_DEBUG(mask, env, old_obj, "owned.\n");
+ LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
+ cl_object_put(env, stripe);
+ }
+ return result;
+}
+
+static int lov_page_slice_fixup(struct lov_object *lov,
+ struct cl_object *stripe)
+{
+ struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
+ struct cl_object *o;
+
+ cl_object_for_each(o, stripe)
+ o->co_slice_off += hdr->coh_page_bufsize;
+
+ return cl_object_header(stripe)->coh_page_bufsize;
}
static int lov_init_raid0(const struct lu_env *env,
LASSERT(lov->lo_lsm == NULL);
lov->lo_lsm = lsm_addref(lsm);
r0->lo_nr = lsm->lsm_stripe_count;
- LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+ LASSERT(r0->lo_nr <= lov_targets_nr(dev));
- OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
- if (r0->lo_sub != NULL) {
- result = 0;
- subconf->coc_inode = conf->coc_inode;
- cfs_spin_lock_init(&r0->lo_sub_lock);
+ lov->lo_layout_invalid = true;
+
+ OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+ if (r0->lo_sub != NULL) {
+ int psz = 0;
+
+ result = 0;
+ subconf->coc_inode = conf->coc_inode;
+ spin_lock_init(&r0->lo_sub_lock);
/*
* Create stripe cl_objects.
*/
struct lov_oinfo *oinfo = lsm->lsm_oinfo[i];
int ost_idx = oinfo->loi_ost_idx;
- fid_ostid_unpack(ofid, &oinfo->loi_oi,
- oinfo->loi_ost_idx);
- subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
- subconf->u.coc_oinfo = oinfo;
- LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
- stripe = lov_sub_find(env, subdev, ofid, subconf);
- if (!IS_ERR(stripe))
- result = lov_init_sub(env, lov, stripe, r0, i);
- else
- result = PTR_ERR(stripe);
+ result = ostid_to_fid(ofid, &oinfo->loi_oi,
+ oinfo->loi_ost_idx);
+ if (result != 0)
+ GOTO(out, result);
+
+ subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+ subconf->u.coc_oinfo = oinfo;
+ LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+ /* In the function below, .hs_keycmp resolves to
+ * lu_obj_hop_keycmp() */
+ /* coverity[overrun-buffer-val] */
+ stripe = lov_sub_find(env, subdev, ofid, subconf);
+ if (!IS_ERR(stripe)) {
+ result = lov_init_sub(env, lov, stripe, r0, i);
+ if (result == -EAGAIN) { /* try again */
+ --i;
+ result = 0;
+ continue;
+ }
+ } else {
+ result = PTR_ERR(stripe);
+ }
+
+ if (result == 0) {
+ int sz = lov_page_slice_fixup(lov, stripe);
+ LASSERT(ergo(psz > 0, psz == sz));
+ psz = sz;
+ }
}
- } else
- result = -ENOMEM;
- RETURN(result);
+ if (result == 0)
+ cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+ } else
+ result = -ENOMEM;
+out:
+ RETURN(result);
+}
+
+static int lov_init_released(const struct lu_env *env,
+ struct lov_device *dev, struct lov_object *lov,
+ const struct cl_object_conf *conf,
+ union lov_layout_state *state)
+{
+ struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+
+ LASSERT(lsm != NULL);
+ LASSERT(lsm_is_released(lsm));
+ LASSERT(lov->lo_lsm == NULL);
+
+ lov->lo_lsm = lsm_addref(lsm);
+ return 0;
}
static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
union lov_layout_state *state)
{
- LASSERT(lov->lo_type == LLT_EMPTY);
+ LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
+
+ lov_layout_wait(env, lov);
+
+ cl_locks_prune(env, &lov->lo_cl, 0);
return 0;
}
static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
- struct lovsub_object *los, int idx)
+ struct lovsub_object *los, int idx)
{
- struct cl_object *sub;
- struct lov_layout_raid0 *r0;
- struct lu_site *site;
- struct lu_site_bkt_data *bkt;
- cfs_waitlink_t *waiter;
+ struct cl_object *sub;
+ struct lov_layout_raid0 *r0;
+ struct lu_site *site;
+ struct lu_site_bkt_data *bkt;
+ wait_queue_t *waiter;
r0 = &lov->u.raid0;
LASSERT(r0->lo_sub[idx] == los);
/* ... wait until it is actually destroyed---sub-object clears its
* ->lo_sub[] slot in lovsub_object_fini() */
- if (r0->lo_sub[idx] == los) {
- waiter = &lov_env_info(env)->lti_waiter;
- cfs_waitlink_init(waiter);
- cfs_waitq_add(&bkt->lsb_marche_funebre, waiter);
- cfs_set_current_state(CFS_TASK_UNINT);
- while (1) {
- /* this wait-queue is signaled at the end of
- * lu_object_free(). */
- cfs_set_current_state(CFS_TASK_UNINT);
- cfs_spin_lock(&r0->lo_sub_lock);
- if (r0->lo_sub[idx] == los) {
- cfs_spin_unlock(&r0->lo_sub_lock);
- cfs_waitq_wait(waiter, CFS_TASK_UNINT);
- } else {
- cfs_spin_unlock(&r0->lo_sub_lock);
- cfs_set_current_state(CFS_TASK_RUNNING);
- break;
- }
- }
- cfs_waitq_del(&bkt->lsb_marche_funebre, waiter);
- }
- LASSERT(r0->lo_sub[idx] == NULL);
+ if (r0->lo_sub[idx] == los) {
+ waiter = &lov_env_info(env)->lti_waiter;
+ init_waitqueue_entry_current(waiter);
+ add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ while (1) {
+ /* this wait-queue is signaled at the end of
+ * lu_object_free(). */
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ spin_lock(&r0->lo_sub_lock);
+ if (r0->lo_sub[idx] == los) {
+ spin_unlock(&r0->lo_sub_lock);
+ waitq_wait(waiter, TASK_UNINTERRUPTIBLE);
+ } else {
+ spin_unlock(&r0->lo_sub_lock);
+ set_current_state(TASK_RUNNING);
+ break;
+ }
+ }
+ remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+ }
+ LASSERT(r0->lo_sub[idx] == NULL);
}
static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
ENTRY;
dump_lsm(D_INODE, lsm);
- if (lov->lo_lsm_invalid && cfs_atomic_read(&lsm->lsm_refc) > 1)
- RETURN(-EBUSY);
+ lov_layout_wait(env, lov);
if (r0->lo_sub != NULL) {
for (i = 0; i < r0->lo_nr; ++i) {
struct lovsub_object *los = r0->lo_sub[i];
- if (los != NULL)
+ if (los != NULL) {
+ cl_locks_prune(env, &los->lso_cl, 1);
/*
* If top-level object is to be evicted from
* the cache, so are its sub-objects.
*/
lov_subobject_kill(env, lov, los, i);
- }
- }
+ }
+ }
+ }
+ cl_locks_prune(env, &lov->lo_cl, 0);
RETURN(0);
}
static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
union lov_layout_state *state)
{
- LASSERT(lov->lo_type == LLT_EMPTY);
+ LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
}
static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
EXIT;
}
+static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
+ union lov_layout_state *state)
+{
+ ENTRY;
+ dump_lsm(D_INODE, lov->lo_lsm);
+ lov_free_memmd(&lov->lo_lsm);
+ EXIT;
+}
+
static int lov_print_empty(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
- (*p)(env, cookie, "empty\n");
+ (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
return 0;
}
static int lov_print_raid0(const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct lu_object *o)
+ lu_printer_t p, const struct lu_object *o)
{
- struct lov_object *lov = lu2lov(o);
- struct lov_layout_raid0 *r0 = lov_r0(lov);
- int i;
-
- (*p)(env, cookie, "stripes: %d:\n", r0->lo_nr);
- for (i = 0; i < r0->lo_nr; ++i) {
- struct lu_object *sub;
+ struct lov_object *lov = lu2lov(o);
+ struct lov_layout_raid0 *r0 = lov_r0(lov);
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+ int i;
+
+ (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
+ r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+ lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+ lsm->lsm_stripe_count, lsm->lsm_layout_gen);
+ for (i = 0; i < r0->lo_nr; ++i) {
+ struct lu_object *sub;
+
+ if (r0->lo_sub[i] != NULL) {
+ sub = lovsub2lu(r0->lo_sub[i]);
+ lu_object_print(env, cookie, p, sub);
+ } else {
+ (*p)(env, cookie, "sub %d absent\n", i);
+ }
+ }
+ return 0;
+}
- if (r0->lo_sub[i] != NULL) {
- sub = lovsub2lu(r0->lo_sub[i]);
- lu_object_print(env, cookie, p, sub);
- } else
- (*p)(env, cookie, "sub %d absent\n", i);
- }
- return 0;
+static int lov_print_released(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct lu_object *o)
+{
+ struct lov_object *lov = lu2lov(o);
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+
+ (*p)(env, cookie,
+ "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+ lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+ lsm->lsm_magic, cfs_atomic_read(&lsm->lsm_refc),
+ lsm->lsm_stripe_count, lsm->lsm_layout_gen);
+ return 0;
}
/**
static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
struct cl_attr *attr)
{
- struct lov_object *lov = cl2lov(obj);
+ struct lov_object *lov = cl2lov(obj);
struct lov_layout_raid0 *r0 = lov_r0(lov);
- struct lov_stripe_md *lsm = lov->lo_lsm;
- struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
- __u64 kms;
- int result = 0;
+ struct cl_attr *lov_attr = &r0->lo_attr;
+ int result = 0;
ENTRY;
/* this is called w/o holding type guard mutex, so it must be inside
- * an on going IO otherwise lsm may be replaced. */
- LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1);
-
- if (!r0->lo_attr_valid) {
- /*
- * Fill LVB with attributes already initialized by the upper
- * layer.
- */
- cl_attr2lvb(lvb, attr);
- kms = attr->cat_kms;
-
- /*
- * XXX that should be replaced with a loop over sub-objects,
- * doing cl_object_attr_get() on them. But for now, let's
- * reuse old lov code.
- */
-
- /*
- * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
- * happy. It's not needed, because new code uses
- * ->coh_attr_guard spin-lock to protect consistency of
- * sub-object attributes.
- */
- lov_stripe_lock(lsm);
- result = lov_merge_lvb_kms(lsm, lvb, &kms);
- lov_stripe_unlock(lsm);
- if (result == 0) {
- cl_lvb2attr(attr, lvb);
- attr->cat_kms = kms;
- r0->lo_attr_valid = 1;
- r0->lo_attr = *attr;
- }
- } else
- *attr = r0->lo_attr;
- RETURN(result);
+ * an on going IO otherwise lsm may be replaced.
+ * LU-2117: it turns out there exists one exception. For mmaped files,
+ * the lock of those files may be requested in the other file's IO
+ * context, and this function is called in ccc_lock_state(), it will
+ * hit this assertion.
+ * Anyway, it's still okay to call attr_get w/o type guard as layout
+ * can't go if locks exist. */
+ /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */
+
+ if (!r0->lo_attr_valid) {
+ struct lov_stripe_md *lsm = lov->lo_lsm;
+ struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
+ __u64 kms = 0;
+
+ memset(lvb, 0, sizeof(*lvb));
+ /* XXX: timestamps can be negative by sanity:test_39m,
+ * how can it be? */
+ lvb->lvb_atime = LLONG_MIN;
+ lvb->lvb_ctime = LLONG_MIN;
+ lvb->lvb_mtime = LLONG_MIN;
+
+ /*
+ * XXX that should be replaced with a loop over sub-objects,
+ * doing cl_object_attr_get() on them. But for now, let's
+ * reuse old lov code.
+ */
+
+ /*
+ * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
+ * happy. It's not needed, because new code uses
+ * ->coh_attr_guard spin-lock to protect consistency of
+ * sub-object attributes.
+ */
+ lov_stripe_lock(lsm);
+ result = lov_merge_lvb_kms(lsm, lvb, &kms);
+ lov_stripe_unlock(lsm);
+ if (result == 0) {
+ cl_lvb2attr(lov_attr, lvb);
+ lov_attr->cat_kms = kms;
+ r0->lo_attr_valid = 1;
+ }
+ }
+ if (result == 0) { /* merge results */
+ attr->cat_blocks = lov_attr->cat_blocks;
+ attr->cat_size = lov_attr->cat_size;
+ attr->cat_kms = lov_attr->cat_kms;
+ if (attr->cat_atime < lov_attr->cat_atime)
+ attr->cat_atime = lov_attr->cat_atime;
+ if (attr->cat_ctime < lov_attr->cat_ctime)
+ attr->cat_ctime = lov_attr->cat_ctime;
+ if (attr->cat_mtime < lov_attr->cat_mtime)
+ attr->cat_mtime = lov_attr->cat_mtime;
+ }
+ RETURN(result);
}
const static struct lov_layout_operations lov_dispatch[] = {
.llo_install = lov_install_empty,
.llo_print = lov_print_empty,
.llo_page_init = lov_page_init_empty,
- .llo_lock_init = NULL,
+ .llo_lock_init = lov_lock_init_empty,
.llo_io_init = lov_io_init_empty,
.llo_getattr = lov_attr_get_empty
},
.llo_lock_init = lov_lock_init_raid0,
.llo_io_init = lov_io_init_raid0,
.llo_getattr = lov_attr_get_raid0
+ },
+ [LLT_RELEASED] = {
+ .llo_init = lov_init_released,
+ .llo_delete = lov_delete_empty,
+ .llo_fini = lov_fini_released,
+ .llo_install = lov_install_empty,
+ .llo_print = lov_print_released,
+ .llo_page_init = lov_page_init_empty,
+ .llo_lock_init = lov_lock_init_empty,
+ .llo_io_init = lov_io_init_released,
+ .llo_getattr = lov_attr_get_empty
}
};
-
/**
* Performs a double-dispatch based on the layout type of an object.
*/
lov_dispatch[__llt].op(__VA_ARGS__); \
})
+/**
+ * Return lov_layout_type associated with a given lsm
+ */
+enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
+{
+ if (lsm == NULL)
+ return LLT_EMPTY;
+ if (lsm_is_released(lsm))
+ return LLT_RELEASED;
+ return LLT_RAID0;
+}
+
static inline void lov_conf_freeze(struct lov_object *lov)
{
- if (lov->lo_owner != cfs_current())
- cfs_down_read(&lov->lo_type_guard);
+ if (lov->lo_owner != current)
+ down_read(&lov->lo_type_guard);
}
static inline void lov_conf_thaw(struct lov_object *lov)
{
- if (lov->lo_owner != cfs_current())
- cfs_up_read(&lov->lo_type_guard);
+ if (lov->lo_owner != current)
+ up_read(&lov->lo_type_guard);
}
#define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...) \
lov_conf_thaw(__obj); \
} while (0)
+static void lov_conf_lock(struct lov_object *lov)
+{
+ LASSERT(lov->lo_owner != current);
+ down_write(&lov->lo_type_guard);
+ LASSERT(lov->lo_owner == NULL);
+ lov->lo_owner = current;
+}
+
+static void lov_conf_unlock(struct lov_object *lov)
+{
+ lov->lo_owner = NULL;
+ up_write(&lov->lo_type_guard);
+}
+
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
{
struct l_wait_info lwi = { 0 };
- struct lov_stripe_md *lsm = lov->lo_lsm;
ENTRY;
- if (!lov->lo_lsm_invalid || lsm == NULL)
- RETURN(0);
+ while (cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ CDEBUG(D_INODE, "file:"DFID" wait for active IO, now: %d.\n",
+ PFID(lu_object_fid(lov2lu(lov))),
+ cfs_atomic_read(&lov->lo_active_ios));
- l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+ l_wait_event(lov->lo_waitq,
+ cfs_atomic_read(&lov->lo_active_ios) == 0, &lwi);
+ }
RETURN(0);
}
-static int lov_layout_change(const struct lu_env *env,
- struct lov_object *lov, enum lov_layout_type llt,
- const struct cl_object_conf *conf)
+static int lov_layout_change(const struct lu_env *unused,
+ struct lov_object *lov,
+ const struct cl_object_conf *conf)
{
int result;
+ enum lov_layout_type llt = LLT_EMPTY;
union lov_layout_state *state = &lov->u;
const struct lov_layout_operations *old_ops;
const struct lov_layout_operations *new_ops;
- struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
void *cookie;
- struct lu_env *nested;
+ struct lu_env *env;
int refcheck;
+ ENTRY;
LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
+
+ if (conf->u.coc_md != NULL)
+ llt = lov_type(conf->u.coc_md->lsm);
LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch));
- ENTRY;
cookie = cl_env_reenter();
- nested = cl_env_get(&refcheck);
- if (!IS_ERR(nested))
- cl_object_prune(nested, &lov->lo_cl);
- else
- result = PTR_ERR(nested);
- cl_env_put(nested, &refcheck);
- cl_env_reexit(cookie);
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env)) {
+ cl_env_reexit(cookie);
+ RETURN(PTR_ERR(env));
+ }
+
+ CDEBUG(D_INODE, DFID" from %s to %s\n",
+ PFID(lu_object_fid(lov2lu(lov))),
+ llt2str(lov->lo_type), llt2str(llt));
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
+ cl_object_prune(env, &lov->lo_cl);
+
result = old_ops->llo_delete(env, lov, &lov->u);
if (result == 0) {
old_ops->llo_fini(env, lov, &lov->u);
- LASSERT(cfs_list_empty(&hdr->coh_locks));
- LASSERT(hdr->coh_tree.rnode == NULL);
- LASSERT(hdr->coh_pages == 0);
+
+ LASSERT(cfs_atomic_read(&lov->lo_active_ios) == 0);
lov->lo_type = LLT_EMPTY;
result = new_ops->llo_init(env,
/* this file becomes an EMPTY file. */
}
}
+
+ cl_env_put(env, &refcheck);
+ cl_env_reexit(cookie);
RETURN(result);
}
* Lov object operations.
*
*/
-
int lov_object_init(const struct lu_env *env, struct lu_object *obj,
const struct lu_object_conf *conf)
{
int result;
ENTRY;
- cfs_init_rwsem(&lov->lo_type_guard);
- cfs_waitq_init(&lov->lo_waitq);
+ init_rwsem(&lov->lo_type_guard);
+ cfs_atomic_set(&lov->lo_active_ios, 0);
+ init_waitqueue_head(&lov->lo_waitq);
+
+ cl_object_page_init(lu2cl(obj), sizeof(struct lov_page));
/* no locking is necessary, as object is being created */
- lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
+ lov->lo_type = lov_type(cconf->u.coc_md->lsm);
ops = &lov_dispatch[lov->lo_type];
result = ops->llo_init(env, dev, lov, cconf, set);
if (result == 0)
static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf)
{
- struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
- struct lov_object *lov = cl2lov(obj);
- int result = 0;
+ struct lov_stripe_md *lsm = NULL;
+ struct lov_object *lov = cl2lov(obj);
+ int result = 0;
ENTRY;
- /*
- * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
- */
- LASSERT(lov->lo_owner != cfs_current());
- cfs_down_write(&lov->lo_type_guard);
- LASSERT(lov->lo_owner == NULL);
- lov->lo_owner = cfs_current();
-
- if (conf->coc_invalidate) {
- lov->lo_lsm_invalid = 1;
+ lov_conf_lock(lov);
+ if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
+ lov->lo_layout_invalid = true;
GOTO(out, result = 0);
}
- if (conf->coc_validate_only) {
- if (!lov->lo_lsm_invalid)
- GOTO(out, result = 0);
+ if (conf->coc_opc == OBJECT_CONF_WAIT) {
+ if (lov->lo_layout_invalid &&
+ cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ lov_conf_unlock(lov);
+ result = lov_layout_wait(env, lov);
+ lov_conf_lock(lov);
+ }
+ GOTO(out, result);
+ }
- lov_layout_wait(env, lov);
- /* fall through to set up new layout */
+ LASSERT(conf->coc_opc == OBJECT_CONF_SET);
+
+ if (conf->u.coc_md != NULL)
+ lsm = conf->u.coc_md->lsm;
+ if ((lsm == NULL && lov->lo_lsm == NULL) ||
+ ((lsm != NULL && lov->lo_lsm != NULL) &&
+ (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
+ (lov->lo_lsm->lsm_pattern == lsm->lsm_pattern))) {
+ /* same version of layout */
+ lov->lo_layout_invalid = false;
+ GOTO(out, result = 0);
}
- switch (lov->lo_type) {
- case LLT_EMPTY:
- if (lsm != NULL)
- result = lov_layout_change(env, lov, LLT_RAID0, conf);
- break;
- case LLT_RAID0:
- if (lsm == NULL)
- result = lov_layout_change(env, lov, LLT_EMPTY, conf);
- else if (lov_stripe_md_cmp(lov->lo_lsm, lsm))
- result = -EOPNOTSUPP;
- break;
- default:
- LBUG();
+ /* will change layout - check if there still exists active IO. */
+ if (cfs_atomic_read(&lov->lo_active_ios) > 0) {
+ lov->lo_layout_invalid = true;
+ GOTO(out, result = -EBUSY);
}
- lov->lo_lsm_invalid = result != 0;
+
+ lov->lo_layout_invalid = lov_layout_change(env, lov, conf);
EXIT;
out:
- lov->lo_owner = NULL;
- cfs_up_write(&lov->lo_type_guard);
+ lov_conf_unlock(lov);
+ CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
+ PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
RETURN(result);
}
static int lov_object_print(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
- return LOV_2DISPATCH(lu2lov(o), llo_print, env, cookie, p, o);
+ return LOV_2DISPATCH_NOLOCK(lu2lov(o), llo_print, env, cookie, p, o);
}
-struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, cfs_page_t *vmpage)
+int lov_page_init(const struct lu_env *env, struct cl_object *obj,
+ struct cl_page *page, pgoff_t index)
{
- return LOV_2DISPATCH(cl2lov(obj),
- llo_page_init, env, obj, page, vmpage);
+ return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_page_init, env, obj, page,
+ index);
}
/**
int lov_io_init(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
- struct lov_io *lio = lov_env_io(env);
-
CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
-
- /* hold lsm before initializing because io relies on it */
- lio->lis_lsm = lov_lsm_addref(cl2lov(obj));
-
- /* No need to lock because we've taken one refcount of layout. */
- return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_io_init, env, obj, io);
+ return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
+ !io->ci_ignore_layout, env, obj, io);
}
/**
};
struct lu_object *lov_object_alloc(const struct lu_env *env,
- const struct lu_object_header *unused,
- struct lu_device *dev)
+ const struct lu_object_header *unused,
+ struct lu_device *dev)
{
- struct lov_object *lov;
- struct lu_object *obj;
+ struct lov_object *lov;
+ struct lu_object *obj;
- ENTRY;
- OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, CFS_ALLOC_IO);
- if (lov != NULL) {
- obj = lov2lu(lov);
- lu_object_init(obj, NULL, dev);
- lov->lo_cl.co_ops = &lov_ops;
- lov->lo_type = -1; /* invalid, to catch uninitialized type */
- /*
- * object io operation vector (cl_object::co_iop) is installed
- * later in lov_object_init(), as different vectors are used
- * for object with different layouts.
- */
- obj->lo_ops = &lov_lu_obj_ops;
- } else
- obj = NULL;
- RETURN(obj);
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lov, lov_object_kmem, GFP_NOFS);
+ if (lov != NULL) {
+ obj = lov2lu(lov);
+ lu_object_init(obj, NULL, dev);
+ lov->lo_cl.co_ops = &lov_ops;
+ lov->lo_type = -1; /* invalid, to catch uninitialized type */
+ /*
+ * object io operation vector (cl_object::co_iop) is installed
+ * later in lov_object_init(), as different vectors are used
+ * for object with different layouts.
+ */
+ obj->lo_ops = &lov_lu_obj_ops;
+ } else
+ obj = NULL;
+ RETURN(obj);
}
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
struct lov_stripe_md *lsm = NULL;
lov_conf_freeze(lov);
- if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) {
+ if (lov->lo_lsm != NULL) {
lsm = lsm_addref(lov->lo_lsm);
- CDEBUG(D_INODE, "lsm %p addref %d by %p.\n",
- lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+ CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
+ lsm, cfs_atomic_read(&lsm->lsm_refc),
+ lov->lo_layout_invalid, current);
}
lov_conf_thaw(lov);
return lsm;
}
-void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
-{
- if (lsm == NULL)
- return;
-
- CDEBUG(D_INODE, "lsm %p decref %d by %p.\n",
- lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
-
- if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid)
- cfs_waitq_signal(&lov->lo_waitq);
-}
-
struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
{
struct lu_object *luobj;
}
EXPORT_SYMBOL(lov_lsm_get);
-void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+void lov_lsm_put(struct cl_object *unused, struct lov_stripe_md *lsm)
{
- struct lu_object *luobj;
-
- if (clobj == NULL || lsm == NULL)
- return;
-
- luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
- &lov_device_type);
- LASSERT(luobj != NULL);
-
- lov_lsm_decref(lu2lov(luobj), lsm);
+ if (lsm != NULL)
+ lov_free_memmd(&lsm);
}
EXPORT_SYMBOL(lov_lsm_put);
loi->loi_ar.ar_rc = 0;
}
}
+ case LLT_RELEASED:
case LLT_EMPTY:
break;
default: