*/
struct inode *coc_inode;
/**
- * Validate object conf. If object is using an invalid conf,
- * then invalidate it and set the new layout.
- */
- bool coc_validate_only;
- /**
* Invalidate the current stripe configuration due to losing
* layout lock.
*/
*/
ci_need_restart:1,
/**
- * Ignore layout change.
- * Most of the CIT_MISC operations can ignore layout change, because
- * the purpose to create this kind of cl_io is to give an environment
- * to run clio methods, for example:
- * 1. request group lock;
- * 2. flush caching pages by osc;
- * 3. writepage
- * 4. echo client
- * So far, only direct IO and glimpse clio need restart if layout
- * change during IO time.
+ * to not refresh layout - the IO issuer knows that the layout won't
+ * change (page operations; a layout change causes all pages to be
+ * discarded), or it doesn't matter if it changes (sync).
+ */
+ ci_ignore_layout:1,
+ /**
+ * Check if layout changed after the IO finishes. Mainly for HSM
+ * requirement. If IO occurs to open files, it doesn't need to
+ * verify layout because HSM won't release open files.
+ * Right now, only two operations need to verify layout: glimpse
+ * and setattr.
*/
- ci_ignore_layout:1;
+ ci_verify_layout:1;
/**
* Number of pages owned by this IO. For invariant checking.
*/
cl_lock_release(env, lock, "glimpse", cfs_current());
} else {
CDEBUG(D_DLMTRACE, "No objects for inode\n");
+ cl_merge_lvb(inode);
}
}
result = cl_io_get(inode, &env, &io, &refcheck);
if (result > 0) {
again:
+ io->ci_verify_layout = 1;
result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
if (result > 0)
/*
RETURN(gen);
}
+/* lsm is unreliable after hsm implementation as layout can be changed at
+ * any time. This is only to support old, non-clio-ized interfaces. It will
+ * cause deadlock if clio operations are called with this extra layout refcount
+ * because in case the layout changed during the IO, ll_layout_refresh() will
+ * have to wait for the refcount to become zero to destroy the older layout.
+ *
+ * Notice that the lsm returned by this function may not be valid unless called
+ * inside layout lock - MDS_INODELOCK_LAYOUT. */
struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
{
return lov_lsm_get(cl_i2info(inode)->lli_clob);
/* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
l_wait_event(lock->l_waitq,
lock->l_flags & LDLM_FL_LVB_READY ||
- lock->l_failed,
+ lock->l_destroyed || lock->l_failed,
&lwi);
if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
if (flags & LDLM_FL_TEST_LOCK)
struct lustre_handle *handle;
if (it->it_op && it->d.lustre.it_lock_mode) {
- handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
+ struct ldlm_lock *lock;
+
+ handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle;
+ lock = ldlm_handle2lock(handle);
+ if (lock != NULL) {
+ /* it can only be allowed to match after layout is
+ * applied to inode otherwise false layout would be
+ * seen. Applying the layout should happen before dropping
+ * the intent lock. */
+ if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT)
+ ldlm_lock_allow_match(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+
CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64
" from it %p\n", handle->cookie, it);
ldlm_lock_decref(handle, it->d.lustre.it_lock_mode);
if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE))
ptlrpc_req_finished(req);
if (rc == 0) {
+ /* mdt may grant layout lock for the newly created file, so
+ * release the lock to avoid leaking */
+ ll_intent_drop_lock(it);
ll_invalidate_aliases(de->d_inode);
} else {
__u64 bits = 0;
CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
PFID(&lli->lli_fid), lvb.lvb_size);
inode->i_blocks = lvb.lvb_blocks;
-
- LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
- LTIME_S(inode->i_atime) = lvb.lvb_atime;
- LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
}
+ LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
+ LTIME_S(inode->i_atime) = lvb.lvb_atime;
+ LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
ll_inode_size_unlock(inode);
ccc_inode_lsm_put(inode, lsm);
struct ll_inode_info *lli = ll_i2info(inode);
struct ptlrpc_request *req;
struct obd_capa *oc;
- struct lov_stripe_md *lsm;
int rc, err;
ENTRY;
+
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
inode->i_generation, inode);
ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1);
if (!err)
ptlrpc_req_finished(req);
- lsm = ccc_inode_lsm_get(inode);
- if (data && lsm) {
+ if (data) {
struct ll_file_data *fd = LUSTRE_FPRIVATE(file);
err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
else
fd->fd_write_failed = false;
}
- ccc_inode_lsm_put(inode, lsm);
#ifdef HAVE_FILE_FSYNC_4ARGS
mutex_unlock(&inode->i_mutex);
}
ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
- struct lustre_handle *lockh)
+ struct lustre_handle *lockh, __u64 flags)
{
ldlm_policy_data_t policy = { .l_inodebits = {bits}};
struct lu_fid *fid;
ldlm_mode_t rc;
- __u64 flags;
ENTRY;
fid = &ll_i2info(inode)->lli_fid;
CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid));
- flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING;
- rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy,
+ rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags,
+ fid, LDLM_IBITS, &policy,
LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh);
RETURN(rc);
}
ENTRY;
rc = __ll_inode_revalidate_it(dentry, it, ibits);
-
- /* if object not yet allocated, don't validate size */
- if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
- LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
- LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
- LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
- RETURN(0);
- }
-
- /* ll_glimpse_size will prefer locally cached writes if they extend
- * the file */
-
- if (rc == 0)
- rc = ll_glimpse_size(inode);
-
+ if (rc != 0)
+ RETURN(rc);
+
+ /* if object isn't regular file, don't validate size */
+ if (!S_ISREG(inode->i_mode)) {
+ LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
+ LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
+ LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
+ } else {
+ rc = ll_glimpse_size(inode);
+ }
RETURN(rc);
}
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct md_op_data *op_data = NULL;
- struct ptlrpc_request *req = NULL;
struct lookup_intent it = { .it_op = IT_LAYOUT };
- struct lustre_handle lockh;
+ struct lustre_handle lockh = { 0 };
ldlm_mode_t mode;
- struct cl_object_conf conf = { .coc_inode = inode,
- .coc_validate_only = true };
+ struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS,
+ .ei_mode = LCK_CR,
+ .ei_cb_bl = ll_md_blocking_ast,
+ .ei_cb_cp = ldlm_completion_ast,
+ .ei_cbdata = inode };
int rc;
ENTRY;
*gen = 0;
- if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK))
+ if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK))
RETURN(0);
/* sanity checks */
/* mostly layout lock is caching on the local side, so try to match
* it before grabbing layout lock mutex. */
- mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh);
+ mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
+ LDLM_FL_LVB_READY);
if (mode != 0) { /* hit cached lock */
- struct lov_stripe_md *lsm;
+ /* lsm_layout_gen is started from 0, plus 1 here to distinguish
+ * the cases of no layout and first layout. */
+ *gen = lli->lli_layout_gen + 1;
- lsm = ccc_inode_lsm_get(inode);
- if (lsm != NULL)
- *gen = lsm->lsm_layout_gen;
- ccc_inode_lsm_put(inode, lsm);
ldlm_lock_decref(&lockh, mode);
-
RETURN(0);
}
/* take layout lock mutex to enqueue layout lock exclusively. */
cfs_mutex_lock(&lli->lli_layout_mutex);
- /* make sure the old conf goes away */
- ll_layout_conf(inode, &conf);
+ /* try again inside layout mutex */
+ mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh,
+ LDLM_FL_LVB_READY);
+ if (mode != 0) { /* hit cached lock */
+ *gen = lli->lli_layout_gen + 1;
+
+ ldlm_lock_decref(&lockh, mode);
+ cfs_mutex_unlock(&lli->lli_layout_mutex);
+ ll_finish_md_op_data(op_data);
+ RETURN(0);
+ }
+
+ /* have to enqueue one */
+ rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh,
+ NULL, 0, NULL, 0);
+ if (it.d.lustre.it_data != NULL)
+ ptlrpc_req_finished(it.d.lustre.it_data);
+ it.d.lustre.it_data = NULL;
- /* enqueue layout lock */
- rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0,
- &req, ll_md_blocking_ast, 0);
if (rc == 0) {
- /* we get a new lock, so update the lock data */
- lockh.cookie = it.d.lustre.it_lock_handle;
- md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL);
-
- /* req == NULL is when lock was found in client cache, without
- * any request to server (but lsm can be canceled just after a
- * release) */
- if (req != NULL) {
- struct ldlm_lock *lock = ldlm_handle2lock(&lockh);
- struct lustre_md md = { NULL };
- void *lmm;
- int lmmsize;
-
- /* for IT_LAYOUT lock, lmm is returned in lock's lvb
- * data via completion callback */
- LASSERT(lock != NULL);
- lmm = lock->l_lvb_data;
- lmmsize = lock->l_lvb_len;
- if (lmm != NULL)
- rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
- lmm, lmmsize);
- if (rc == 0) {
+ struct ldlm_lock *lock;
+ struct cl_object_conf conf;
+ struct lustre_md md = { NULL };
+ void *lmm;
+ int lmmsize;
+
+ LASSERT(lustre_handle_is_used(&lockh));
+
+ /* set lock data in case this is a new lock */
+ ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
+
+ lock = ldlm_handle2lock(&lockh);
+ LASSERT(lock != NULL);
+
+ /* for IT_LAYOUT lock, lmm is returned in lock's lvb
+ * data via completion callback */
+ lmm = lock->l_lvb_data;
+ lmmsize = lock->l_lvb_len;
+ if (lmm != NULL) {
+ rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm,
+ lmm, lmmsize);
+ if (rc >= 0) {
if (md.lsm != NULL)
- *gen = md.lsm->lsm_layout_gen;
-
- memset(&conf, 0, sizeof conf);
- conf.coc_inode = inode;
- conf.u.coc_md = &md;
- ll_layout_conf(inode, &conf);
- /* is this racy? */
- lli->lli_has_smd = md.lsm != NULL;
+ *gen = md.lsm->lsm_layout_gen + 1;
+ rc = 0;
+ } else {
+ CERROR("file: "DFID" unpackmd error: %d\n",
+ PFID(&lli->lli_fid), rc);
}
- if (md.lsm != NULL)
- obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
-
- LDLM_LOCK_PUT(lock);
- ptlrpc_req_finished(req);
- } else { /* hit caching lock */
- struct lov_stripe_md *lsm;
-
- lsm = ccc_inode_lsm_get(inode);
- if (lsm != NULL)
- *gen = lsm->lsm_layout_gen;
- ccc_inode_lsm_put(inode, lsm);
}
- ll_intent_drop_lock(&it);
+ LDLM_LOCK_PUT(lock);
+
+ /* set layout to file. This may cause lock expiration as we
+ * set layout inside layout ibits lock. */
+ memset(&conf, 0, sizeof conf);
+ conf.coc_inode = inode;
+ conf.u.coc_md = &md;
+ ll_layout_conf(inode, &conf);
+ /* is this racy? */
+ lli->lli_has_smd = md.lsm != NULL;
+ if (md.lsm != NULL)
+ obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
}
+ ll_intent_drop_lock(&it);
+
cfs_mutex_unlock(&lli->lli_layout_mutex);
ll_finish_md_op_data(op_data);
__u64 lli_open_fd_read_count;
__u64 lli_open_fd_write_count;
__u64 lli_open_fd_exec_count;
- /* Protects access to och pointers and their usage counters, also
- * atomicity of check-update of lli_has_smd */
+ /* Protects access to och pointers and their usage counters */
cfs_mutex_t lli_och_mutex;
struct inode lli_vfs_inode;
/* mutex to request for layout lock exclusively. */
cfs_mutex_t lli_layout_mutex;
+ /* valid only inside LAYOUT ibits lock, protected by lli_layout_mutex */
+ __u32 lli_layout_gen;
};
/*
extern int ll_have_md_lock(struct inode *inode, __u64 *bits,
ldlm_mode_t l_req_mode);
extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits,
- struct lustre_handle *lockh);
+ struct lustre_handle *lockh, __u64 flags);
int __ll_inode_revalidate_it(struct dentry *, struct lookup_intent *,
__u64 bits);
int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd);
* resides on the MDS, ie, this file has no objects. */
if (lsm != NULL)
attr->ia_valid &= ~ATTR_SIZE;
+ /* can't call ll_setattr_ost() while holding a refcount of lsm */
+ ccc_inode_lsm_put(inode, lsm);
memcpy(&op_data->op_attr, attr, sizeof(*attr));
GOTO(out, rc);
ll_ioepoch_open(lli, op_data->op_ioepoch);
- if (lsm == NULL || !S_ISREG(inode->i_mode)) {
- CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
+ if (!S_ISREG(inode->i_mode))
GOTO(out, rc = 0);
- }
if (ia_valid & ATTR_SIZE)
attr->ia_valid |= ATTR_SIZE;
rc = ll_setattr_ost(inode, attr);
EXIT;
out:
- ccc_inode_lsm_put(inode, lsm);
if (op_data) {
if (op_data->op_ioepoch) {
rc1 = ll_setattr_done_writing(inode, op_data, mod);
LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0));
if (lsm != NULL) {
LASSERT(S_ISREG(inode->i_mode));
- cfs_mutex_lock(&lli->lli_och_mutex);
CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
lsm, inode->i_ino, inode->i_generation, inode);
/* cl_file_inode_init must go before lli_has_smd or a race
* glimpse would try to use uninitialized lov */
if (cl_file_inode_init(inode, md) == 0)
lli->lli_has_smd = true;
- cfs_mutex_unlock(&lli->lli_och_mutex);
lli->lli_maxbytes = lsm->lsm_maxbytes;
if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
* lock on the client and set LLIF_MDS_SIZE_LOCK holding
* it. */
mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE,
- &lockh);
+ &lockh, LDLM_FL_CBPENDING);
if (mode) {
if (lli->lli_flags & (LLIF_DONE_WRITING |
LLIF_EPOCH_PENDING |
static inline int agl_should_run(struct ll_statahead_info *sai,
struct inode *inode)
{
- if (inode != NULL && S_ISREG(inode->i_mode) &&
- ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid)
- return 1;
- return 0;
+ return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid);
}
static inline struct ll_sa_entry *
struct cl_io *io = ios->cis_io;
struct cl_object *obj = io->ci_obj;
struct ccc_io *cio = cl2ccc_io(env, ios);
- __u32 gen;
CLOBINVRNT(env, obj, ccc_object_invariant(obj));
- /* check layout version */
- ll_layout_refresh(ccc_object_inode(obj), &gen);
- if (cio->cui_layout_gen > 0)
- io->ci_need_restart = cio->cui_layout_gen == gen;
+ CDEBUG(D_VFSTRACE, "ignore/verify layout %d/%d, layout version %d.\n",
+ io->ci_ignore_layout, io->ci_verify_layout, cio->cui_layout_gen);
+
+ if (!io->ci_ignore_layout && io->ci_verify_layout) {
+ __u32 gen = 0;
+
+ /* check layout version */
+ ll_layout_refresh(ccc_object_inode(obj), &gen);
+ io->ci_need_restart = cio->cui_layout_gen != gen;
+ if (io->ci_need_restart)
+ CDEBUG(D_VFSTRACE, "layout changed from %d to %d.\n",
+ cio->cui_layout_gen, gen);
+ }
}
static void vvp_io_fault_fini(const struct lu_env *env,
return 0;
}
+/* Layout-configuration hook for the VVP layer.
+ *
+ * Records the layout generation of the incoming stripe metadata (if any)
+ * into the inode's ll_inode_info, so later IO can compare its cached
+ * generation against the current one to detect layout changes.
+ * Always returns 0. */
+int vvp_conf_set(const struct lu_env *env, struct cl_object *obj,
+ const struct cl_object_conf *conf)
+{
+ struct ll_inode_info *lli = ll_i2info(conf->coc_inode);
+
+ if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL)
+ lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen;
+
+ return 0;
+}
+
static const struct cl_object_operations vvp_ops = {
.coo_page_init = vvp_page_init,
.coo_lock_init = vvp_lock_init,
.coo_io_init = vvp_io_init,
.coo_attr_get = vvp_attr_get,
.coo_attr_set = vvp_attr_set,
- .coo_conf_set = ccc_conf_set,
+ .coo_conf_set = vvp_conf_set,
.coo_glimpse = ccc_object_glimpse
};
int lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj,
struct cl_lock *lock, const struct cl_io *io);
+int lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io);
int lov_io_init_raid0 (const struct lu_env *env, struct cl_object *obj,
struct cl_io *io);
int lov_io_init_empty (const struct lu_env *env, struct cl_object *obj,
io->ci_result = 0;
lio->lis_object = obj;
- LASSERT(lio->lis_lsm != NULL);
+ LASSERT(obj->lo_lsm != NULL);
+ lio->lis_lsm = lsm_addref(obj->lo_lsm);
lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count;
switch (io->ci_type) {
switch (io->ci_type) {
default:
LBUG();
+ case CIT_MISC:
+ case CIT_READ:
+ result = 0;
+ break;
case CIT_FSYNC:
- case CIT_MISC:
- case CIT_READ:
- result = 0;
- break;
+ case CIT_SETATTR:
+ result = +1;
+ break;
case CIT_WRITE:
- case CIT_SETATTR:
result = -EBADF;
break;
case CIT_FAULT:
}
if (result == 0)
cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops);
- io->ci_result = result;
- RETURN(result != 0);
+ io->ci_result = result < 0 ? result : 0;
+ RETURN(result != 0);
}
/** @} lov */
RETURN(result);
}
+/* Destructor for a lock slice on an LLT_EMPTY (layout-less) object:
+ * there is no sub-lock state, so simply free the slab-allocated
+ * lov_lock wrapper. */
+static void lov_empty_lock_fini(const struct lu_env *env,
+ struct cl_lock_slice *slice)
+{
+ struct lov_lock *lck = cl2lov_lock(slice);
+ OBD_SLAB_FREE_PTR(lck, lov_lock_kmem);
+}
+
+/* Debug printer for an empty-layout lock slice; emits a fixed tag
+ * since there is no per-stripe state to describe. */
+static int lov_empty_lock_print(const struct lu_env *env, void *cookie,
+ lu_printer_t p, const struct cl_lock_slice *slice)
+{
+ (*p)(env, cookie, "empty\n");
+ return 0;
+}
+
+/* Minimal lock operations for LLT_EMPTY objects: only fini and print
+ * are needed, as no real extent locking happens without a layout. */
+static const struct cl_lock_operations lov_empty_lock_ops = {
+ .clo_fini = lov_empty_lock_fini,
+ .clo_print = lov_empty_lock_print
+};
+
+/* Initialize a cl_lock slice for an object that currently has no layout
+ * (LLT_EMPTY). Allocates a lov_lock from the slab, attaches it with the
+ * empty-lock operations and remembers the original descriptor.
+ * Returns 0 on success, -ENOMEM on allocation failure. */
+int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io)
+{
+ struct lov_lock *lck;
+ int result = -ENOMEM;
+
+ ENTRY;
+ OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
+ if (lck != NULL) {
+ cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops);
+ lck->lls_orig = lock->cll_descr;
+ result = 0;
+ }
+ RETURN(result);
+}
+
static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
struct cl_lock *parent)
{
union lov_layout_state *state)
{
LASSERT(lov->lo_type == LLT_EMPTY);
+ cl_object_prune(env, &lov->lo_cl);
return 0;
}
for (i = 0; i < r0->lo_nr; ++i) {
struct lovsub_object *los = r0->lo_sub[i];
- if (los != NULL)
+ if (los != NULL) {
+ cl_locks_prune(env, &los->lso_cl, 1);
/*
* If top-level object is to be evicted from
* the cache, so are its sub-objects.
*/
lov_subobject_kill(env, lov, los, i);
+ }
}
}
RETURN(0);
ENTRY;
/* this is called w/o holding type guard mutex, so it must be inside
- * an on going IO otherwise lsm may be replaced. */
- LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1);
+ * an on going IO otherwise lsm may be replaced.
+ * LU-2117: it turns out there exists one exception. For mmaped files,
+ * the lock of those files may be requested in the other file's IO
+ * context, and this function is called in ccc_lock_state(), it will
+ * hit this assertion.
+ * Anyway, it's still okay to call attr_get w/o type guard as layout
+ * can't go if locks exist. */
+ /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */
if (!r0->lo_attr_valid) {
/*
.llo_install = lov_install_empty,
.llo_print = lov_print_empty,
.llo_page_init = lov_page_init_empty,
- .llo_lock_init = NULL,
+ .llo_lock_init = lov_lock_init_empty,
.llo_io_init = lov_io_init_empty,
.llo_getattr = lov_attr_get_empty
},
lov_conf_thaw(__obj); \
} while (0)
+/* Take lo_type_guard for write and record the owning thread.
+ * The first LASSERT catches recursive locking by the same thread;
+ * the second verifies no stale owner was left behind. */
+static void lov_conf_lock(struct lov_object *lov)
+{
+ LASSERT(lov->lo_owner != cfs_current());
+ cfs_down_write(&lov->lo_type_guard);
+ LASSERT(lov->lo_owner == NULL);
+ lov->lo_owner = cfs_current();
+}
+
+/* Clear the recorded owner, then release lo_type_guard. The owner must
+ * be cleared while still holding the write lock so other threads never
+ * observe a locked guard with a stale owner. */
+static void lov_conf_unlock(struct lov_object *lov)
+{
+ lov->lo_owner = NULL;
+ cfs_up_write(&lov->lo_type_guard);
+}
+
static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov)
{
struct l_wait_info lwi = { 0 };
if (!lov->lo_lsm_invalid || lsm == NULL)
RETURN(0);
- l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+ LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+ while (cfs_atomic_read(&lsm->lsm_refc) > 1) {
+ lov_conf_unlock(lov);
+ l_wait_event(lov->lo_waitq,
+ cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
+ lov_conf_lock(lov);
+ }
RETURN(0);
}
-static int lov_layout_change(const struct lu_env *env,
+static int lov_layout_change(const struct lu_env *unused,
struct lov_object *lov, enum lov_layout_type llt,
const struct cl_object_conf *conf)
{
struct cl_object_header *hdr = cl_object_header(&lov->lo_cl);
void *cookie;
- struct lu_env *nested;
+ struct lu_env *env;
int refcheck;
LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch));
ENTRY;
cookie = cl_env_reenter();
- nested = cl_env_get(&refcheck);
- if (!IS_ERR(nested))
- cl_object_prune(nested, &lov->lo_cl);
- else
- result = PTR_ERR(nested);
- cl_env_put(nested, &refcheck);
- cl_env_reexit(cookie);
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env)) {
+ cl_env_reexit(cookie);
+ RETURN(PTR_ERR(env));
+ }
old_ops = &lov_dispatch[lov->lo_type];
new_ops = &lov_dispatch[llt];
/* this file becomes an EMPTY file. */
}
}
+
+ cl_env_put(env, &refcheck);
+ cl_env_reexit(cookie);
RETURN(result);
}
static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf)
{
- struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+ struct lov_stripe_md *lsm = NULL;
struct lov_object *lov = cl2lov(obj);
int result = 0;
ENTRY;
- /*
- * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
- */
- LASSERT(lov->lo_owner != cfs_current());
- cfs_down_write(&lov->lo_type_guard);
- LASSERT(lov->lo_owner == NULL);
- lov->lo_owner = cfs_current();
-
+ lov_conf_lock(lov);
if (conf->coc_invalidate) {
lov->lo_lsm_invalid = 1;
GOTO(out, result = 0);
}
- if (conf->coc_validate_only) {
- if (!lov->lo_lsm_invalid)
- GOTO(out, result = 0);
+ if (conf->u.coc_md != NULL)
+ lsm = conf->u.coc_md->lsm;
- lov_layout_wait(env, lov);
- /* fall through to set up new layout */
+ if ((lsm == NULL && lov->lo_lsm == NULL) ||
+ (lsm != NULL && lov->lo_lsm != NULL &&
+ lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) {
+ lov->lo_lsm_invalid = 0;
+ GOTO(out, result = 0);
}
+ /* will change layout */
+ lov_layout_wait(env, lov);
+
+ /*
+ * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported.
+ */
switch (lov->lo_type) {
case LLT_EMPTY:
if (lsm != NULL)
EXIT;
out:
- lov->lo_owner = NULL;
- cfs_up_write(&lov->lo_type_guard);
+ lov_conf_unlock(lov);
RETURN(result);
}
struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, cfs_page_t *vmpage)
{
- return LOV_2DISPATCH(cl2lov(obj),
- llo_page_init, env, obj, page, vmpage);
+ return LOV_2DISPATCH_NOLOCK(cl2lov(obj),
+ llo_page_init, env, obj, page, vmpage);
}
/**
int lov_io_init(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
- struct lov_io *lio = lov_env_io(env);
-
CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
-
- /* hold lsm before initializing because io relies on it */
- lio->lis_lsm = lov_lsm_addref(cl2lov(obj));
-
- /* No need to lock because we've taken one refcount of layout. */
- return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_io_init, env, obj, io);
+ return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
+ !io->ci_ignore_layout, env, obj, io);
}
/**
struct lov_stripe_md *lsm = NULL;
lov_conf_freeze(lov);
- if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) {
+ if (lov->lo_lsm != NULL) {
lsm = lsm_addref(lov->lo_lsm);
- CDEBUG(D_INODE, "lsm %p addref %d by %p.\n",
- lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current());
+ CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
+ lsm, cfs_atomic_read(&lsm->lsm_refc),
+ lov->lo_lsm_invalid, cfs_current());
}
lov_conf_thaw(lov);
return lsm;
ENTRY;
while (!cfs_list_empty(&io->ci_layers)) {
- slice = container_of(io->ci_layers.next, struct cl_io_slice,
+ slice = container_of(io->ci_layers.prev, struct cl_io_slice,
cis_linkage);
cfs_list_del_init(&slice->cis_linkage);
if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
case CIT_FSYNC:
LASSERT(!io->ci_need_restart);
break;
+ case CIT_SETATTR:
case CIT_MISC:
/* Check ignore layout change conf */
- LASSERT(ergo(io->ci_ignore_layout, !io->ci_need_restart));
- case CIT_SETATTR:
+ LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
+ !io->ci_need_restart));
break;
default:
LBUG();
cfs_spin_lock(&head->coh_lock_guard);
cfs_list_del_init(&lock->cll_linkage);
-
cfs_spin_unlock(&head->coh_lock_guard);
+
/*
* From now on, no new references to this lock can be acquired
* by cl_lock_lookup().