This patch adds a reference counter to the lsm structure.
Each time a lsm is used, a reference must be taken with lsm_get/addref
(lsm_addref() can be used when we already own a reference on the lsm).
This reference can be released via obd_free_memmd(). The lsm is freed
when the last reference is dropped.
This patch also moves lov_stripe_md into a private data of
lov_object.
Signed-off-by: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
Signed-off-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Change-Id: I156b4cc2dc82bb15ae8107cf9842f100048c03d4
Reviewed-on: http://review.whamcloud.com/1874
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Johann Lombardi <johann@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
#define _LUSTRE_POSIX_TYPES_H
#include <asm/types.h>
+#include <stdbool.h> /* for bool */
#ifndef HAVE_UMODE_T
typedef unsigned short cfs_umode_t;
#else
struct ccc_grouplock *cg);
void cl_put_grouplock(struct ccc_grouplock *cg);
+/**
+ * New interfaces to get and put lov_stripe_md from lov layer. This violates
+ * layering because lov_stripe_md is supposed to be a private data in lov.
+ *
+ * NB: If you find you have to use these interfaces for your new code, please
+ * think about it again. These interfaces may be removed in the future for
+ * better layering. */
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj);
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm);
+int lov_read_and_clear_async_rc(struct cl_object *clob);
+
+struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode);
+void ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm);
+
#endif /*LCLIENT_H */
}
struct lov_stripe_md {
+ cfs_atomic_t lsm_refc;
cfs_spinlock_t lsm_lock;
pid_t lsm_lock_owner; /* debugging */
result = 0;
if (!(lli->lli_flags & LLIF_MDS_SIZE_LOCK)) {
CDEBUG(D_DLMTRACE, "Glimpsing inode "DFID"\n", PFID(fid));
- if (lli->lli_smd) {
+ if (lli->lli_has_smd) {
/* NOTE: this looks like DLM lock request, but it may
* not be one. Due to CEF_ASYNC flag (translated
* to LDLM_FL_HAS_INTENT by osc), this is
ENTRY;
- if (!cl_i2info(inode)->lli_smd)
+ if (!cl_i2info(inode)->lli_has_smd)
RETURN(0);
result = cl_io_get(inode, &env, &io, &refcheck);
return 0;
}
+static void ccc_object_size_lock(struct cl_object *obj)
+{
+ struct inode *inode = ccc_object_inode(obj);
+
+ cl_isize_lock(inode);
+ cl_object_attr_lock(obj);
+}
+
+static void ccc_object_size_unlock(struct cl_object *obj)
+{
+ struct inode *inode = ccc_object_inode(obj);
+
+ cl_object_attr_unlock(obj);
+ cl_isize_unlock(inode);
+}
+
/*****************************************************************************
*
* Page operations.
* cancel the result of the truncate. Getting the
* ll_inode_size_lock() after the enqueue maintains the DLM
* -> ll_inode_size_lock() acquiring order. */
- cl_isize_lock(inode, 0);
- cl_object_attr_lock(obj);
+ ccc_object_size_lock(obj);
rc = cl_object_attr_get(env, obj, attr);
if (rc == 0) {
if (lock->cll_descr.cld_start == 0 &&
} else {
CL_LOCK_DEBUG(D_INFO, env, lock, "attr_get: %d\n", rc);
}
- cl_object_attr_unlock(obj);
- cl_isize_unlock(inode, 0);
- }
- EXIT;
+ ccc_object_size_unlock(obj);
+ }
+ EXIT;
}
/*****************************************************************************
}
}
-static void ccc_object_size_lock(struct cl_object *obj)
-{
- struct inode *inode = ccc_object_inode(obj);
-
- cl_isize_lock(inode, 0);
- cl_object_attr_lock(obj);
-}
-
-static void ccc_object_size_unlock(struct cl_object *obj)
-{
- struct inode *inode = ccc_object_inode(obj);
-
- cl_object_attr_unlock(obj);
- cl_isize_unlock(inode, 0);
-}
-
/**
* Helper function that if necessary adjusts file size (inode->i_size), when
* position at the offset \a pos is accessed. File size can be arbitrary stale
gen = (fid_flatten(fid) >> 32);
RETURN(gen);
}
+
+struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode)
+{
+ return lov_lsm_get(cl_i2info(inode)->lli_clob);
+}
+
+void inline ccc_inode_lsm_put(struct inode *inode, struct lov_stripe_md *lsm)
+{
+ lov_lsm_put(cl_i2info(inode)->lli_clob, lsm);
+}
struct intnl_stat *st = llu_i2stat(inode);
struct ptlrpc_request *request;
struct lookup_intent *it;
- struct lov_stripe_md *lsm;
int rc = 0;
ENTRY;
if (!S_ISREG(st->st_mode))
GOTO(out_release, rc = 0);
- lsm = lli->lli_smd;
- if (lsm)
+ if (lli->lli_has_smd)
flags &= ~O_LOV_DELAY_CREATE;
/*XXX: open_flags are overwritten and the previous ones are lost */
lli->lli_open_flags = flags & ~(O_CREAT | O_EXCL | O_TRUNC);
old_flags & MF_GETATTR_LOCK);
if (rc) {
oa.o_valid = 0;
- if (rc == -ENOENT)
- CDEBUG(D_INODE, "objid "LPX64" is destroyed\n",
- lli->lli_smd->lsm_object_id);
- else
+ if (rc != -ENOENT)
CERROR("inode_getattr failed (%d): unable to "
"send a Size-on-MDS attribute update "
"for inode %llu/%lu\n", rc,
struct llu_sb_info *lli_sbi;
struct lu_fid lli_fid;
- struct lov_stripe_md *lli_smd;
char *lli_symlink_name;
__u64 lli_maxbytes;
unsigned long lli_flags;
* was opened several times without close, we track an
* open_count here */
struct ll_file_data *lli_file_data;
+ bool lli_has_smd;
int lli_open_flags;
int lli_open_count;
return llu_i2info(inode)->lli_file_data;
}
-static inline void cl_isize_lock(struct inode *inode, int lsmlock)
+static inline void cl_isize_lock(struct inode *inode)
{
}
-static inline void cl_isize_unlock(struct inode *inode, int lsmlock)
+static inline void cl_isize_unlock(struct inode *inode)
{
}
if (md.lsm != NULL)
obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
RETURN(inode ? PTR_ERR(inode) : -ENOMEM);
- } else if (md.lsm != NULL &&
- llu_i2info(inode)->lli_smd != md.lsm) {
+ } else if (md.lsm != NULL) {
obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
}
/* If this is a stat, get the authoritative file size */
if (it->it_op == IT_GETATTR && S_ISREG(st->st_mode) &&
- lli->lli_smd != NULL) {
- struct lov_stripe_md *lsm = lli->lli_smd;
+ lli->lli_has_smd) {
ldlm_error_t rc;
- LASSERT(lsm->lsm_object_id != 0);
-
/* bug 2334: drop MDS lock before acquiring OST lock */
ll_intent_drop_lock(it);
size_t llap_cookie_size;
-static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
+static int llu_lock_to_stripe_offset(struct obd_export *exp,
+ struct lov_stripe_md *lsm,
+ struct ldlm_lock *lock)
{
- struct llu_inode_info *lli = llu_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
- struct obd_export *exp = llu_i2obdexp(inode);
struct {
char name[16];
struct ldlm_lock *lock;
int rc;
ENTRY;
- if (lsm->lsm_stripe_count == 1)
- RETURN(0);
+ if (lsm == NULL || lsm->lsm_stripe_count == 1)
+ RETURN(0);
/* get our offset in the lov */
rc = obd_get_info(NULL, exp, sizeof(key), &key, &vallen, &stripe, lsm);
LBUG();
}
LASSERT(stripe < lsm->lsm_stripe_count);
- RETURN(stripe);
+ RETURN(stripe);
}
int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
lli= llu_i2info(inode);
if (!lli)
goto iput;
- if (!lli->lli_smd)
- goto iput;
- lsm = lli->lli_smd;
+ if (!lli->lli_has_smd)
+ goto iput;
- stripe = llu_lock_to_stripe_offset(inode, lock);
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL)
+ goto iput;
+
+ stripe = llu_lock_to_stripe_offset(llu_i2obdexp(inode),
+ lsm, lock);
lock_res_and_lock(lock);
kms = ldlm_extent_shift_kms(lock,
lsm->lsm_oinfo[stripe]->loi_kms);
LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
lsm->lsm_oinfo[stripe]->loi_kms, kms);
loi_kms_set(lsm->lsm_oinfo[stripe], kms);
+ ccc_inode_lsm_put(inode, lsm);
iput:
I_RELE(inode);
break;
static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
- struct ptlrpc_request *req = reqp;
- struct inode *inode = llu_inode_from_lock(lock);
- struct llu_inode_info *lli;
- struct ost_lvb *lvb;
+ struct ptlrpc_request *req = reqp;
+ struct inode *inode = llu_inode_from_lock(lock);
+ struct llu_inode_info *lli;
+ struct ost_lvb *lvb;
+ struct lov_stripe_md *lsm;
int rc, stripe = 0;
ENTRY;
lli = llu_i2info(inode);
if (lli == NULL)
GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
- if (lli->lli_smd == NULL)
- GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
-
- /* First, find out which stripe index this lock corresponds to. */
- if (lli->lli_smd->lsm_stripe_count > 1)
- stripe = llu_lock_to_stripe_offset(inode, lock);
req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
GOTO(iput, rc);
}
- lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
- lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL)
+ GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
+
+ /* First, find out which stripe index this lock corresponds to. */
+ stripe = llu_lock_to_stripe_offset(llu_i2obdexp(inode), lsm, lock);
+
+ lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+ lvb->lvb_size = lsm->lsm_oinfo[stripe]->loi_kms;
+ ccc_inode_lsm_put(inode, lsm);
LDLM_DEBUG(lock, "i_size: "LPU64" -> stripe number %u -> kms "LPU64,
(__u64)llu_i2stat(inode)->st_size, stripe,lvb->lvb_size);
int llu_merge_lvb(struct inode *inode)
{
- struct llu_inode_info *lli = llu_i2info(inode);
- struct llu_sb_info *sbi = llu_i2sbi(inode);
- struct intnl_stat *st = llu_i2stat(inode);
- struct ost_lvb lvb;
- int rc;
- ENTRY;
-
- lov_stripe_lock(lli->lli_smd);
+ struct llu_inode_info *lli = llu_i2info(inode);
+ struct llu_sb_info *sbi = llu_i2sbi(inode);
+ struct intnl_stat *st = llu_i2stat(inode);
+ struct ost_lvb lvb;
+ struct lov_stripe_md *lsm;
+ int rc;
+ ENTRY;
+
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL)
+ RETURN(0);
+
+ lov_stripe_lock(lsm);
inode_init_lvb(inode, &lvb);
/* merge timestamps the most resently obtained from mds with
timestamps obtained from osts */
lvb.lvb_atime = lli->lli_lvb.lvb_atime;
lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
- rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
+ rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
st->st_size = lvb.lvb_size;
st->st_blocks = lvb.lvb_blocks;
/* handle st_blocks overflow gracefully */
st->st_mtime = lvb.lvb_mtime;
st->st_atime = lvb.lvb_atime;
st->st_ctime = lvb.lvb_ctime;
- lov_stripe_unlock(lli->lli_smd);
+ lov_stripe_unlock(lsm);
+ ccc_inode_lsm_put(inode, lsm);
- RETURN(rc);
+ RETURN(rc);
}
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
st->st_mode = (st->st_mode & ~S_IFMT)|(body->mode & S_IFMT);
if (lsm != NULL) {
- if (lli->lli_smd == NULL) {
- cl_file_inode_init(inode, md);
- lli->lli_smd = lsm;
- lli->lli_maxbytes = lsm->lsm_maxbytes;
- if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
- lli->lli_maxbytes = MAX_LFS_FILESIZE;
- } else {
- if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
- CERROR("lsm mismatch for inode %lld\n",
- (long long)st->st_ino);
- LBUG();
- }
- }
+ if (!lli->lli_has_smd) {
+ cl_file_inode_init(inode, md);
+ lli->lli_has_smd = true;
+ lli->lli_maxbytes = lsm->lsm_maxbytes;
+ if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
+ lli->lli_maxbytes = MAX_LFS_FILESIZE;
+ }
+ if (md->lsm != NULL)
+ obd_free_memmd(llu_i2obdexp(inode), &md->lsm);
}
if (body->valid & OBD_MD_FLATIME) {
lli->lli_st_flags = body->flags;
if (body->valid & OBD_MD_FLSIZE) {
if ((llu_i2sbi(inode)->ll_lco.lco_flags & OBD_CONNECT_SOM) &&
- S_ISREG(st->st_mode) && lli->lli_smd) {
+ S_ISREG(st->st_mode) && lli->lli_has_smd) {
struct lustre_handle lockh;
ldlm_mode_t mode;
int llu_inode_getattr(struct inode *inode, struct obdo *obdo,
__u64 ioepoch, int sync)
{
- struct llu_inode_info *lli = llu_i2info(inode);
- struct ptlrpc_request_set *set;
- struct lov_stripe_md *lsm = lli->lli_smd;
- struct obd_info oinfo = { { { 0 } } };
- int rc;
- ENTRY;
+ struct ptlrpc_request_set *set;
+ struct lov_stripe_md *lsm = NULL;
+ struct obd_info oinfo = { { { 0 } } };
+ int rc;
+ ENTRY;
+ lsm = ccc_inode_lsm_get(inode);
LASSERT(lsm);
oinfo.oi_md = lsm;
rc = ptlrpc_set_wait(set);
ptlrpc_set_destroy(set);
}
+ ccc_inode_lsm_put(inode, lsm);
if (rc)
RETURN(rc);
obdo_refresh_inode(inode, oinfo.oi_oa, oinfo.oi_oa->o_valid);
CDEBUG(D_INODE, "objid "LPX64" size %llu, blocks %llu, "
- "blksize %llu\n", lli->lli_smd->lsm_object_id,
- (long long unsigned)llu_i2stat(inode)->st_size,
- (long long unsigned)llu_i2stat(inode)->st_blocks,
- (long long unsigned)llu_i2stat(inode)->st_blksize);
- RETURN(0);
+ "blksize %llu\n", oinfo.oi_oa->o_id,
+ (long long unsigned)llu_i2stat(inode)->st_size,
+ (long long unsigned)llu_i2stat(inode)->st_blocks,
+ (long long unsigned)llu_i2stat(inode)->st_blksize);
+ RETURN(0);
}
static struct inode* llu_new_inode(struct filesys *fs,
/* initialize lli here */
lli->lli_sbi = llu_fs2sbi(fs);
- lli->lli_smd = NULL;
+ lli->lli_has_smd = false;
lli->lli_symlink_name = NULL;
lli->lli_flags = 0;
lli->lli_maxbytes = (__u64)(~0UL);
llu_update_inode(inode, &md);
- if (md.lsm != NULL && lli->lli_smd != md.lsm)
- obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
- ptlrpc_req_finished(req);
- }
+ if (md.lsm != NULL)
+ obd_free_memmd(sbi->ll_dt_exp, &md.lsm);
+ ptlrpc_req_finished(req);
+ }
- if (!lli->lli_smd) {
+ if (!lli->lli_has_smd) {
/* object not yet allocated, don't validate size */
st->st_atime = lli->lli_lvb.lvb_atime;
st->st_mtime = lli->lli_lvb.lvb_mtime;
void llu_clear_inode(struct inode *inode)
{
- struct llu_inode_info *lli = llu_i2info(inode);
- struct llu_sb_info *sbi = llu_i2sbi(inode);
+ struct llu_inode_info *lli = llu_i2info(inode);
+ struct llu_sb_info *sbi = llu_i2sbi(inode);
+ struct lov_stripe_md *lsm;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%llu/%lu(%p)\n",
md_change_cbdata(sbi->ll_md_exp, ll_inode2fid(inode),
null_if_equal, inode);
- if (lli->lli_smd)
- obd_change_cbdata(sbi->ll_dt_exp, lli->lli_smd,
- null_if_equal, inode);
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm != NULL)
+ obd_change_cbdata(sbi->ll_dt_exp, lsm, null_if_equal, inode);
+ ccc_inode_lsm_put(inode, lsm);
- cl_inode_fini(inode);
-
- if (lli->lli_smd) {
- obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
- lli->lli_smd = NULL;
- }
+ cl_inode_fini(inode);
+ lli->lli_has_smd = false;
if (lli->lli_symlink_name) {
OBD_FREE(lli->lli_symlink_name,
* inode_setattr() is only ever invoked with ATTR_SIZE (by
* llu_setattr_raw()) when file has no bodies. Check this.
*/
- LASSERT(ergo(ia_valid & ATTR_SIZE, llu_i2info(inode)->lli_smd == NULL));
+ LASSERT(ergo(ia_valid & ATTR_SIZE, !llu_i2info(inode)->lli_has_smd));
if (ia_valid & ATTR_SIZE)
st->st_size = attr->ia_size;
*/
int llu_setattr_raw(struct inode *inode, struct iattr *attr)
{
- struct lov_stripe_md *lsm = llu_i2info(inode)->lli_smd;
+ int has_lsm = llu_i2info(inode)->lli_has_smd;
struct intnl_stat *st = llu_i2stat(inode);
int ia_valid = attr->ia_valid;
struct md_op_data op_data = { { 0 } };
if (attr->ia_valid & (ATTR_MTIME | ATTR_CTIME))
CDEBUG(D_INODE, "setting mtime "CFS_TIME_T", ctime "CFS_TIME_T
", now = "CFS_TIME_T"\n",
- LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
- LTIME_S(CFS_CURRENT_TIME));
-
- /* NB: ATTR_SIZE will only be set after this point if the size
- * resides on the MDS, ie, this file has no objects. */
- if (lsm)
- attr->ia_valid &= ~ATTR_SIZE;
-
- /* If only OST attributes being set on objects, don't do MDS RPC.
- * In that case, we need to check permissions and update the local
- * inode ourselves so we can call obdo_from_inode() always. */
- if (ia_valid & (lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
+ LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
+ LTIME_S(CFS_CURRENT_TIME));
+
+ /* NB: ATTR_SIZE will only be set after this point if the size
+ * resides on the MDS, ie, this file has no objects. */
+ if (has_lsm)
+ attr->ia_valid &= ~ATTR_SIZE;
+
+ /* If only OST attributes being set on objects, don't do MDS RPC.
+ * In that case, we need to check permissions and update the local
+ * inode ourselves so we can call obdo_from_inode() always. */
+ if (ia_valid & (has_lsm ? ~(ATTR_FROM_OPEN | ATTR_RAW) : ~0)) {
memcpy(&op_data.op_attr, attr, sizeof(*attr));
/* Open epoch for truncate. */
RETURN(rc);
llu_ioepoch_open(llu_i2info(inode), op_data.op_ioepoch);
- if (!lsm || !S_ISREG(st->st_mode)) {
+ if (!has_lsm || !S_ISREG(st->st_mode)) {
CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
GOTO(out, rc);
}
int rc = 0;
ENTRY;
- if (lli->lli_smd) {
+ if (lli->lli_has_smd) {
CDEBUG(D_IOCTL, "stripe already exists for ino "DFID"\n",
PFID(&lli->lli_fid));
return -EEXIST;
static int llu_lov_getstripe(struct inode *ino, unsigned long arg)
{
- struct lov_stripe_md *lsm = llu_i2info(ino)->lli_smd;
-
- if (!lsm)
- RETURN(-ENODATA);
-
- return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm,
- (void *)arg);
+ struct lov_stripe_md *lsm = NULL;
+ int rc = -ENODATA;
+
+ lsm = ccc_inode_lsm_get(ino);
+ if (lsm != NULL)
+ rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, llu_i2obdexp(ino), 0, lsm,
+ (void *)arg);
+ ccc_inode_lsm_put(ino, lsm);
+ return rc;
}
static int llu_iop_ioctl(struct inode *ino, unsigned long int request,
* < 0 error */
static int find_cbdata(struct inode *inode)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct lov_stripe_md *lsm;
int rc = 0;
ENTRY;
if (rc != 0)
RETURN(rc);
- if (lli->lli_smd)
- rc = obd_find_cbdata(sbi->ll_dt_exp, lli->lli_smd,
- return_if_equal, NULL);
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL)
+ RETURN(rc);
- RETURN(rc);
+ rc = obd_find_cbdata(sbi->ll_dt_exp, lsm, return_if_equal, NULL);
+ ccc_inode_lsm_put(inode, lsm);
+
+ RETURN(rc);
}
/**
LASSERT(d_refcount(de) == 1);
#endif
+ /* Disable this piece of code temproarily because this is called
+ * inside dcache_lock so it's not appropriate to do lots of work
+ * here. */
+#if 0
/* if not ldlm lock for this inode, set i_nlink to 0 so that
* this inode can be recycled later b=20433 */
if (de->d_inode && !find_cbdata(de->d_inode))
de->d_inode->i_nlink = 0;
+#endif
if (d_lustre_invalid((struct dentry *)de))
- RETURN(1);
-
- RETURN(0);
+ RETURN(1);
+ RETURN(0);
}
static int ll_set_dd(struct dentry *de)
{
- ENTRY;
- LASSERT(de != NULL);
+ ENTRY;
+ LASSERT(de != NULL);
- CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
- de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
+ CDEBUG(D_DENTRY, "ldd on dentry %.*s (%p) parent %p inode %p refc %d\n",
+ de->d_name.len, de->d_name.name, de, de->d_parent, de->d_inode,
d_refcount(de));
if (de->d_fsdata == NULL) {
else
OBD_FREE_PTR(lld);
spin_unlock(&de->d_lock);
- } else {
- RETURN(-ENOMEM);
- }
- }
+ } else {
+ RETURN(-ENOMEM);
+ }
+ }
- RETURN(0);
+ RETURN(0);
}
int ll_dops_init(struct dentry *de, int block, int init_sa)
RETURN(rc);
}
-int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm);
-
/* While this returns an error code, fput() the caller does not, so we need
* to make every effort to clean up all of our state here. Also, applications
* rarely check close errors and even if an error is returned they will not
struct ll_file_data *fd;
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
int rc;
ENTRY;
}
if (!S_ISDIR(inode->i_mode)) {
- if (lsm)
- lov_test_and_clear_async_rc(lsm);
+ lov_read_and_clear_async_rc(lli->lli_clob);
lli->lli_async_rc = 0;
}
struct ll_inode_info *lli = ll_i2info(inode);
struct lookup_intent *it, oit = { .it_op = IT_OPEN,
.it_flags = file->f_flags };
- struct lov_stripe_md *lsm;
struct obd_client_handle **och_p = NULL;
__u64 *och_usecount = NULL;
struct ll_file_data *fd;
ll_capa_open(inode);
- lsm = lli->lli_smd;
- if (lsm == NULL) {
+ if (!lli->lli_has_smd) {
if (file->f_flags & O_LOV_DELAY_CREATE ||
!(file->f_mode & FMODE_WRITE)) {
CDEBUG(D_INODE, "object creation was delayed\n");
int ll_inode_getattr(struct inode *inode, struct obdo *obdo,
__u64 ioepoch, int sync)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct obd_capa *capa = ll_mdscapa_get(inode);
- int rc;
- ENTRY;
+ struct obd_capa *capa = ll_mdscapa_get(inode);
+ struct lov_stripe_md *lsm;
+ int rc;
+ ENTRY;
- rc = ll_lsm_getattr(lli->lli_smd, ll_i2dtexp(inode),
- capa, obdo, ioepoch, sync);
- capa_put(capa);
- if (rc == 0) {
- obdo_refresh_inode(inode, obdo, obdo->o_valid);
- CDEBUG(D_INODE,
- "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
- lli->lli_smd->lsm_object_id, i_size_read(inode),
- (unsigned long long)inode->i_blocks,
- (unsigned long)ll_inode_blksize(inode));
- }
- RETURN(rc);
+ lsm = ccc_inode_lsm_get(inode);
+ rc = ll_lsm_getattr(lsm, ll_i2dtexp(inode),
+ capa, obdo, ioepoch, sync);
+ capa_put(capa);
+ if (rc == 0) {
+ obdo_refresh_inode(inode, obdo, obdo->o_valid);
+ CDEBUG(D_INODE,
+ "objid "LPX64" size %llu, blocks %llu, blksize %lu\n",
+ lsm ? lsm->lsm_object_id : 0, i_size_read(inode),
+ (unsigned long long)inode->i_blocks,
+ (unsigned long)ll_inode_blksize(inode));
+ }
+ ccc_inode_lsm_put(inode, lsm);
+ RETURN(rc);
}
int ll_merge_lvb(struct inode *inode)
{
- struct ll_inode_info *lli = ll_i2info(inode);
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct ost_lvb lvb;
- int rc;
-
- ENTRY;
+ struct ll_inode_info *lli = ll_i2info(inode);
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct lov_stripe_md *lsm;
+ struct ost_lvb lvb;
+ int rc = 0;
- ll_inode_size_lock(inode, 1);
- inode_init_lvb(inode, &lvb);
-
- /* merge timestamps the most resently obtained from mds with
- timestamps obtained from osts */
- lvb.lvb_atime = lli->lli_lvb.lvb_atime;
- lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
- lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
- rc = obd_merge_lvb(sbi->ll_dt_exp, lli->lli_smd, &lvb, 0);
- cl_isize_write_nolock(inode, lvb.lvb_size);
-
- CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
- PFID(&lli->lli_fid), lvb.lvb_size);
- inode->i_blocks = lvb.lvb_blocks;
+ ENTRY;
- LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
- LTIME_S(inode->i_atime) = lvb.lvb_atime;
- LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
- ll_inode_size_unlock(inode, 1);
+ lsm = ccc_inode_lsm_get(inode);
+ ll_inode_size_lock(inode);
+ inode_init_lvb(inode, &lvb);
+
+ /* merge timestamps the most resently obtained from mds with
+ timestamps obtained from osts */
+ lvb.lvb_atime = lli->lli_lvb.lvb_atime;
+ lvb.lvb_mtime = lli->lli_lvb.lvb_mtime;
+ lvb.lvb_ctime = lli->lli_lvb.lvb_ctime;
+ if (lsm != NULL) {
+ rc = obd_merge_lvb(sbi->ll_dt_exp, lsm, &lvb, 0);
+ cl_isize_write_nolock(inode, lvb.lvb_size);
+
+ CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n",
+ PFID(&lli->lli_fid), lvb.lvb_size);
+ inode->i_blocks = lvb.lvb_blocks;
+
+ LTIME_S(inode->i_mtime) = lvb.lvb_mtime;
+ LTIME_S(inode->i_atime) = lvb.lvb_atime;
+ LTIME_S(inode->i_ctime) = lvb.lvb_ctime;
+ }
+ ll_inode_size_unlock(inode);
+ ccc_inode_lsm_put(inode, lsm);
- RETURN(rc);
+ RETURN(rc);
}
int ll_glimpse_ioctl(struct ll_sb_info *sbi, struct lov_stripe_md *lsm,
static int ll_lov_recreate(struct inode *inode, obd_id id, obd_seq seq,
obd_count ost_idx)
{
- struct obd_export *exp = ll_i2dtexp(inode);
- struct obd_trans_info oti = { 0 };
- struct obdo *oa = NULL;
- int lsm_size;
- int rc = 0;
- struct lov_stripe_md *lsm, *lsm2;
- ENTRY;
+ struct obd_export *exp = ll_i2dtexp(inode);
+ struct obd_trans_info oti = { 0 };
+ struct obdo *oa = NULL;
+ int lsm_size;
+ int rc = 0;
+ struct lov_stripe_md *lsm = NULL, *lsm2;
+ ENTRY;
- OBDO_ALLOC(oa);
- if (oa == NULL)
- RETURN(-ENOMEM);
+ OBDO_ALLOC(oa);
+ if (oa == NULL)
+ RETURN(-ENOMEM);
- ll_inode_size_lock(inode, 0);
- lsm = ll_i2info(inode)->lli_smd;
+ lsm = ccc_inode_lsm_get(inode);
if (lsm == NULL)
GOTO(out, rc = -ENOENT);
+
lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
(lsm->lsm_stripe_count));
OBD_MD_FLMTIME | OBD_MD_FLCTIME);
obdo_set_parent_fid(oa, &ll_i2info(inode)->lli_fid);
memcpy(lsm2, lsm, lsm_size);
- rc = obd_create(NULL, exp, oa, &lsm2, &oti);
+ ll_inode_size_lock(inode);
+ rc = obd_create(NULL, exp, oa, &lsm2, &oti);
+ ll_inode_size_unlock(inode);
- OBD_FREE_LARGE(lsm2, lsm_size);
- GOTO(out, rc);
+ OBD_FREE_LARGE(lsm2, lsm_size);
+ GOTO(out, rc);
out:
- ll_inode_size_unlock(inode, 0);
- OBDO_FREE(oa);
- return rc;
+ ccc_inode_lsm_put(inode, lsm);
+ OBDO_FREE(oa);
+ return rc;
}
static int ll_lov_recreate_obj(struct inode *inode, unsigned long arg)
int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
int flags, struct lov_user_md *lum, int lum_size)
{
- struct lov_stripe_md *lsm;
- struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
- int rc = 0;
- ENTRY;
+ struct lov_stripe_md *lsm = NULL;
+ struct lookup_intent oit = {.it_op = IT_OPEN, .it_flags = flags};
+ int rc = 0;
+ ENTRY;
- ll_inode_size_lock(inode, 0);
- lsm = ll_i2info(inode)->lli_smd;
- if (lsm) {
- ll_inode_size_unlock(inode, 0);
- CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
- inode->i_ino);
- RETURN(-EEXIST);
- }
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm != NULL) {
+ ccc_inode_lsm_put(inode, lsm);
+ CDEBUG(D_IOCTL, "stripe already exists for ino %lu\n",
+ inode->i_ino);
+ RETURN(-EEXIST);
+ }
+ ll_inode_size_lock(inode);
rc = ll_intent_file_open(file, lum, lum_size, &oit);
if (rc)
GOTO(out, rc);
ll_release_openhandle(file->f_dentry, &oit);
out:
- ll_inode_size_unlock(inode, 0);
- ll_intent_release(&oit);
- RETURN(rc);
+ ll_inode_size_unlock(inode);
+ ll_intent_release(&oit);
+ ccc_inode_lsm_put(inode, lsm);
+ RETURN(rc);
out_req_free:
- ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
- goto out;
+ ptlrpc_req_finished((struct ptlrpc_request *) oit.d.lustre.it_data);
+ goto out;
}
int ll_lov_getstripe_ea_info(struct inode *inode, const char *filename,
rc = ll_lov_setstripe_ea_info(inode, file, flags, lumv1, lum_size);
if (rc == 0) {
- put_user(0, &lumv1p->lmm_stripe_count);
- rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
- 0, ll_i2info(inode)->lli_smd,
- (void *)arg);
- }
- RETURN(rc);
+ struct lov_stripe_md *lsm;
+ put_user(0, &lumv1p->lmm_stripe_count);
+ lsm = ccc_inode_lsm_get(inode);
+ rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode),
+ 0, lsm, (void *)arg);
+ ccc_inode_lsm_put(inode, lsm);
+ }
+ RETURN(rc);
}
static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
{
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- int rc = -ENODATA;
- ENTRY;
+ struct lov_stripe_md *lsm;
+ int rc = -ENODATA;
+ ENTRY;
- if (lsm != NULL)
- rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
- lsm, (void *)arg);
- RETURN(rc);
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm != NULL)
+ rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0,
+ lsm, (void *)arg);
+ ccc_inode_lsm_put(inode, lsm);
+ RETURN(rc);
}
int ll_get_grouplock(struct inode *inode, struct file *file, unsigned long arg)
int ll_do_fiemap(struct inode *inode, struct ll_user_fiemap *fiemap,
int num_bytes)
{
- struct obd_export *exp = ll_i2dtexp(inode);
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ struct obd_export *exp = ll_i2dtexp(inode);
+ struct lov_stripe_md *lsm = NULL;
struct ll_fiemap_info_key fm_key = { .name = KEY_FIEMAP, };
int vallen = num_bytes;
int rc;
return rc;
}
- /* If the stripe_count > 1 and the application does not understand
- * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
- */
- if (lsm->lsm_stripe_count > 1 &&
- !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
- return -EOPNOTSUPP;
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL)
+ return -ENOENT;
+
+ /* If the stripe_count > 1 and the application does not understand
+ * DEVICE_ORDER flag, then it cannot interpret the extents correctly.
+ */
+ if (lsm->lsm_stripe_count > 1 &&
+ !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
+ GOTO(out, rc = -EOPNOTSUPP);
fm_key.oa.o_id = lsm->lsm_object_id;
fm_key.oa.o_seq = lsm->lsm_object_seq;
/* If filesize is 0, then there would be no objects for mapping */
if (fm_key.oa.o_size == 0) {
fiemap->fm_mapped_extents = 0;
- RETURN(0);
+ GOTO(out, rc = 0);
}
memcpy(&fm_key.fiemap, fiemap, sizeof(*fiemap));
if (rc)
CERROR("obd_get_info failed: rc = %d\n", rc);
- RETURN(rc);
+out:
+ ccc_inode_lsm_put(inode, lsm);
+ RETURN(rc);
}
int ll_fid2path(struct obd_export *exp, void *arg)
static int ll_data_version(struct inode *inode, __u64 *data_version,
int extent_lock)
{
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
- struct ll_sb_info *sbi = ll_i2sbi(inode);
- struct obdo *obdo = NULL;
- int rc;
- ENTRY;
+ struct lov_stripe_md *lsm = NULL;
+ struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct obdo *obdo = NULL;
+ int rc;
+ ENTRY;
- /* If no stripe, we consider version is 0. */
- if (!lsm) {
+ /* If no stripe, we consider version is 0. */
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL) {
*data_version = 0;
CDEBUG(D_INODE, "No object for inode\n");
RETURN(0);
}
OBD_ALLOC_PTR(obdo);
- if (obdo == NULL)
- RETURN(-ENOMEM);
+ if (obdo == NULL) {
+ ccc_inode_lsm_put(inode, lsm);
+ RETURN(-ENOMEM);
+ }
rc = ll_lsm_getattr(lsm, sbi->ll_dt_exp, NULL, obdo, 0, extent_lock);
if (!rc) {
}
OBD_FREE_PTR(obdo);
+ ccc_inode_lsm_put(inode, lsm);
- RETURN(rc);
+ RETURN(rc);
}
long ll_file_ioctl(struct file *file, unsigned int cmd, unsigned long arg)
{
struct inode *inode = file->f_dentry->d_inode;
struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
int rc, err;
LASSERT(!S_ISDIR(inode->i_mode));
* failed for pages in this mapping. */
rc = lli->lli_async_rc;
lli->lli_async_rc = 0;
- if (lsm) {
- err = lov_test_and_clear_async_rc(lsm);
- if (rc == 0)
- rc = err;
- }
+ err = lov_read_and_clear_async_rc(lli->lli_clob);
+ if (rc == 0)
+ rc = err;
- return rc ? -EIO : 0;
+ return rc ? -EIO : 0;
}
/**
{
struct inode *inode = file->f_dentry->d_inode;
struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
struct ptlrpc_request *req;
struct obd_capa *oc;
+ struct lov_stripe_md *lsm;
int rc, err;
ENTRY;
CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino,
lli->lli_async_rc = 0;
if (rc == 0)
rc = err;
- if (lsm) {
- err = lov_test_and_clear_async_rc(lsm);
- if (rc == 0)
- rc = err;
- }
+ err = lov_read_and_clear_async_rc(lli->lli_clob);
+ if (rc == 0)
+ rc = err;
}
oc = ll_mdscapa_get(inode);
if (!err)
ptlrpc_req_finished(req);
- if (data && lsm) {
+ lsm = ccc_inode_lsm_get(inode);
+ if (data && lsm) {
err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF,
- CL_FSYNC_ALL);
+ CL_FSYNC_ALL);
if (rc == 0 && err < 0)
- rc = err;
- lli->lli_write_rc = rc < 0 ? rc : 0;
- }
+ rc = err;
+ lli->lli_write_rc = rc < 0 ? rc : 0;
+ }
+ ccc_inode_lsm_put(inode, lsm);
- RETURN(rc);
+ RETURN(rc);
}
int ll_file_flock(struct file *file, int cmd, struct file_lock *file_lock)
rc = __ll_inode_revalidate_it(dentry, it, ibits);
/* if object not yet allocated, don't validate size */
- if (rc == 0 && ll_i2info(dentry->d_inode)->lli_smd == NULL) {
+ if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) {
LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime;
LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime;
LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime;
old_flags & MF_GETATTR_LOCK);
if (rc) {
oa->o_valid = 0;
- if (rc == -ENOENT)
- CDEBUG(D_INODE, "objid "LPX64" is destroyed\n",
- lli->lli_smd->lsm_object_id);
- else
+ if (rc != -ENOENT)
CERROR("inode_getattr failed (%d): unable to "
"send a Size-on-MDS attribute update "
"for inode %lu/%u\n", rc, inode->i_ino,
__u64 lli_open_fd_write_count;
__u64 lli_open_fd_exec_count;
/* Protects access to och pointers and their usage counters, also
- * atomicity of check-update of lli_smd */
+ * atomicity of check-update of lli_has_smd */
cfs_mutex_t lli_och_mutex;
struct inode lli_vfs_inode;
* In the future, if more members are added only for directory,
* some of the following members can be moved into u.f.
*/
- struct lov_stripe_md *lli_smd;
+ bool lli_has_smd;
struct cl_object *lli_clob;
};
/*
* Locking to guarantee consistency of non-atomic updates to long long i_size,
- * consistency between file size and KMS, and consistency within
- * ->lli_smd->lsm_oinfo[]'s.
+ * consistency between file size and KMS.
*
- * Implemented by ->lli_size_sem and ->lsm_sem, nested in that order.
+ * Implemented by ->lli_size_sem and ->lsm_lock, nested in that order.
*/
-void ll_inode_size_lock(struct inode *inode, int lock_lsm);
-void ll_inode_size_unlock(struct inode *inode, int unlock_lsm);
+void ll_inode_size_lock(struct inode *inode);
+void ll_inode_size_unlock(struct inode *inode);
// FIXME: replace the name of this with LL_I to conform to kernel stuff
// static inline struct ll_inode_info *LL_I(struct inode *inode)
return LUSTRE_FPRIVATE(attr->ia_file);
}
-static inline void cl_isize_lock(struct inode *inode, int lsmlock)
+static inline void cl_isize_lock(struct inode *inode)
{
- ll_inode_size_lock(inode, lsmlock);
+ ll_inode_size_lock(inode);
}
-static inline void cl_isize_unlock(struct inode *inode, int lsmlock)
+static inline void cl_isize_unlock(struct inode *inode)
{
- ll_inode_size_unlock(inode, lsmlock);
+ ll_inode_size_unlock(inode);
}
static inline void cl_isize_write_nolock(struct inode *inode, loff_t kms)
static inline void cl_isize_write(struct inode *inode, loff_t kms)
{
- ll_inode_size_lock(inode, 0);
- i_size_write(inode, kms);
- ll_inode_size_unlock(inode, 0);
+ ll_inode_size_lock(inode);
+ i_size_write(inode, kms);
+ ll_inode_size_unlock(inode);
}
#define cl_isize_read(inode) i_size_read(inode)
lli->lli_open_fd_exec_count = 0;
cfs_mutex_init(&lli->lli_och_mutex);
cfs_spin_lock_init(&lli->lli_agl_lock);
- lli->lli_smd = NULL;
+ lli->lli_has_smd = false;
lli->lli_clob = NULL;
LASSERT(lli->lli_vfs_inode.i_mode != 0);
* cl_object still uses inode lsm.
*/
cl_inode_fini(inode);
+ lli->lli_has_smd = false;
- if (lli->lli_smd) {
- obd_free_memmd(sbi->ll_dt_exp, &lli->lli_smd);
- lli->lli_smd = NULL;
- }
-
-
- EXIT;
+ EXIT;
}
int ll_md_setattr(struct dentry *dentry, struct md_op_data *op_data,
*/
int ll_setattr_raw(struct dentry *dentry, struct iattr *attr)
{
+ struct lov_stripe_md *lsm;
struct inode *inode = dentry->d_inode;
struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
struct md_op_data *op_data = NULL;
struct md_open_data *mod = NULL;
int ia_valid = attr->ia_valid;
LTIME_S(attr->ia_mtime), LTIME_S(attr->ia_ctime),
cfs_time_current_sec());
- /* NB: ATTR_SIZE will only be set after this point if the size
- * resides on the MDS, ie, this file has no objects. */
- if (lsm)
- attr->ia_valid &= ~ATTR_SIZE;
-
/* We always do an MDS RPC, even if we're only changing the size;
* only the MDS knows whether truncate() should fail with -ETXTBUSY */
DOWN_WRITE_I_ALLOC_SEM(inode);
}
+ /* We need a steady stripe configuration for setattr to avoid
+ * confusion. */
+ lsm = ccc_inode_lsm_get(inode);
+
+ /* NB: ATTR_SIZE will only be set after this point if the size
+ * resides on the MDS, ie, this file has no objects. */
+ if (lsm != NULL)
+ attr->ia_valid &= ~ATTR_SIZE;
+
memcpy(&op_data->op_attr, attr, sizeof(*attr));
/* Open epoch for truncate. */
GOTO(out, rc);
ll_ioepoch_open(lli, op_data->op_ioepoch);
- if (!lsm || !S_ISREG(inode->i_mode)) {
+ if (lsm == NULL || !S_ISREG(inode->i_mode)) {
CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n");
GOTO(out, rc = 0);
}
rc = ll_setattr_ost(inode, attr);
EXIT;
out:
+ ccc_inode_lsm_put(inode, lsm);
if (op_data) {
if (op_data->op_ioepoch) {
rc1 = ll_setattr_done_writing(inode, op_data, mod);
return 0;
}
-void ll_inode_size_lock(struct inode *inode, int lock_lsm)
+void ll_inode_size_lock(struct inode *inode)
{
struct ll_inode_info *lli;
- struct lov_stripe_md *lsm;
LASSERT(!S_ISDIR(inode->i_mode));
cfs_down(&lli->lli_size_sem);
LASSERT(lli->lli_size_sem_owner == NULL);
lli->lli_size_sem_owner = current;
- lsm = lli->lli_smd;
- LASSERTF(lsm != NULL || lock_lsm == 0, "lsm %p, lock_lsm %d\n",
- lsm, lock_lsm);
- if (lock_lsm)
- lov_stripe_lock(lsm);
}
-void ll_inode_size_unlock(struct inode *inode, int unlock_lsm)
+void ll_inode_size_unlock(struct inode *inode)
{
struct ll_inode_info *lli;
- struct lov_stripe_md *lsm;
lli = ll_i2info(inode);
- lsm = lli->lli_smd;
- LASSERTF(lsm != NULL || unlock_lsm == 0, "lsm %p, lock_lsm %d\n",
- lsm, unlock_lsm);
- if (unlock_lsm)
- lov_stripe_unlock(lsm);
LASSERT(lli->lli_size_sem_owner == current);
lli->lli_size_sem_owner = NULL;
cfs_up(&lli->lli_size_sem);
LASSERT(S_ISREG(inode->i_mode));
cfs_mutex_lock(&lli->lli_och_mutex);
- if (lli->lli_smd == NULL) {
- if (lsm->lsm_magic != LOV_MAGIC_V1 &&
- lsm->lsm_magic != LOV_MAGIC_V3) {
- dump_lsm(D_ERROR, lsm);
- LBUG();
- }
- CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
- lsm, inode->i_ino, inode->i_generation, inode);
- /* cl_file_inode_init must go before lli_smd or a race
- * is possible where client thinks the file has stripes,
- * but lov raid0 is not setup yet and parallel e.g.
- * glimpse would try to use uninitialized lov */
- cl_file_inode_init(inode, md);
- cfs_spin_lock(&lli->lli_lock);
- lli->lli_smd = lsm;
- cfs_spin_unlock(&lli->lli_lock);
- cfs_mutex_unlock(&lli->lli_och_mutex);
- lli->lli_maxbytes = lsm->lsm_maxbytes;
- if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
- lli->lli_maxbytes = MAX_LFS_FILESIZE;
- } else {
- cfs_mutex_unlock(&lli->lli_och_mutex);
- LASSERT(lli->lli_smd->lsm_magic == lsm->lsm_magic &&
- lli->lli_smd->lsm_stripe_count ==
- lsm->lsm_stripe_count);
- if (lov_stripe_md_cmp(lli->lli_smd, lsm)) {
- CERROR("lsm mismatch for inode %ld\n",
- inode->i_ino);
- CERROR("lli_smd:\n");
- dump_lsm(D_ERROR, lli->lli_smd);
- CERROR("lsm:\n");
- dump_lsm(D_ERROR, lsm);
- LBUG();
- }
- }
- if (lli->lli_smd != lsm)
- obd_free_memmd(ll_i2dtexp(inode), &lsm);
+ CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n",
+ lsm, inode->i_ino, inode->i_generation, inode);
+ /* cl_file_inode_init must go before lli_has_smd or a race
+ * is possible where client thinks the file has stripes,
+ * but lov raid0 is not setup yet and parallel e.g.
+ * glimpse would try to use uninitialized lov */
+ if (cl_file_inode_init(inode, md) == 0)
+ lli->lli_has_smd = true;
+ cfs_mutex_unlock(&lli->lli_och_mutex);
+ lli->lli_maxbytes = lsm->lsm_maxbytes;
+ if (lli->lli_maxbytes > MAX_LFS_FILESIZE)
+ lli->lli_maxbytes = MAX_LFS_FILESIZE;
+ if (md->lsm != NULL)
+ obd_free_memmd(ll_i2dtexp(inode), &md->lsm);
}
if (sbi->ll_flags & LL_SBI_RMT_CLIENT) {
if (body->valid & OBD_MD_FLSIZE) {
if (exp_connect_som(ll_i2mdexp(inode)) &&
- S_ISREG(inode->i_mode) && lli->lli_smd) {
+ S_ISREG(inode->i_mode) && lli->lli_has_smd) {
struct lustre_handle lockh;
ldlm_mode_t mode;
CDEBUG(D_VFSTRACE, "VFS Op:inode="DFID"(%p)\n",
PFID(&lli->lli_fid), inode);
- LASSERT(!lli->lli_smd);
+ LASSERT(!lli->lli_has_smd);
/* Core attributes from the MDS first. This is a new inode, and
* the VFS doesn't zero times in the core inode so we have to do
RETURN(put_user(flags, (int *)arg));
}
case FSFILT_IOC_SETFLAGS: {
- struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+ struct lov_stripe_md *lsm;
struct obd_info oinfo = { { { 0 } } };
struct md_op_data *op_data;
if (get_user(flags, (int *)arg))
RETURN(-EFAULT);
- oinfo.oi_md = lsm;
- OBDO_ALLOC(oinfo.oi_oa);
- if (!oinfo.oi_oa)
- RETURN(-ENOMEM);
-
op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
NULL, 0, NULL, 0, &req, NULL);
ll_finish_md_op_data(op_data);
ptlrpc_req_finished(req);
- if (rc) {
- OBDO_FREE(oinfo.oi_oa);
- RETURN(rc);
- }
+ if (rc)
+ RETURN(rc);
- if (lsm == NULL) {
- OBDO_FREE(oinfo.oi_oa);
- GOTO(update_cache, rc);
- }
+ OBDO_ALLOC(oinfo.oi_oa);
+ if (!oinfo.oi_oa)
+ RETURN(-ENOMEM);
+
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL) {
+ inode->i_flags = ll_ext_to_inode_flags(flags);
+ OBDO_FREE(oinfo.oi_oa);
+ RETURN(0);
+ }
+ oinfo.oi_md = lsm;
oinfo.oi_oa->o_id = lsm->lsm_object_id;
oinfo.oi_oa->o_seq = lsm->lsm_object_seq;
oinfo.oi_oa->o_flags = flags;
rc = obd_setattr_rqset(sbi->ll_dt_exp, &oinfo, NULL);
capa_put(oinfo.oi_capa);
OBDO_FREE(oinfo.oi_oa);
- if (rc) {
- if (rc != -EPERM && rc != -EACCES)
- CERROR("osc_setattr_async fails: rc = %d\n",rc);
- RETURN(rc);
- }
+ ccc_inode_lsm_put(inode, lsm);
- EXIT;
-update_cache:
- inode->i_flags = ll_ext_to_inode_flags(flags);
- return 0;
+ if (rc && rc != -EPERM && rc != -EACCES)
+ CERROR("osc_setattr_async fails: rc = %d\n", rc);
+
+ RETURN(rc);
}
default:
RETURN(-ENOSYS);
* try to use the default stripe data from parent directory for
* allocating OST objects. Try to pass the parent FID to MDS. */
if (opc == LUSTRE_OPC_CREATE && i1 == i2 && S_ISREG(i2->i_mode) &&
- ll_i2info(i2)->lli_smd == NULL) {
- struct ll_inode_info *lli = ll_i2info(i2);
+ !ll_i2info(i2)->lli_has_smd) {
+ struct ll_inode_info *lli = ll_i2info(i2);
- cfs_spin_lock(&lli->lli_lock);
- if (likely(lli->lli_smd == NULL &&
- !fid_is_zero(&lli->lli_pfid)))
+ cfs_spin_lock(&lli->lli_lock);
+ if (likely(!lli->lli_has_smd && !fid_is_zero(&lli->lli_pfid)))
op_data->op_fid1 = lli->lli_pfid;
cfs_spin_unlock(&lli->lli_lock);
/** We ignore parent's capability temporary. */
* It is an anonymous dentry without OST objects created yet.
* We have to find the parent to tell MDS how to init lov objects.
*/
- if (S_ISREG(inode->i_mode) && ll_i2info(inode)->lli_smd == NULL &&
+ if (S_ISREG(inode->i_mode) && !ll_i2info(inode)->lli_has_smd &&
parent != NULL) {
struct ll_inode_info *lli = ll_i2info(inode);
result = cl_io_read_page(env, io, page);
} else {
/* Page from a non-object file. */
- LASSERT(!ll_i2info(vmpage->mapping->host)->lli_smd);
+ LASSERT(!ll_i2info(vmpage->mapping->host)->lli_has_smd);
unlock_page(vmpage);
result = 0;
}
long count = iov_length(iov, nr_segs);
long tot_bytes = 0, result = 0;
struct ll_inode_info *lli = ll_i2info(inode);
- struct lov_stripe_md *lsm = lli->lli_smd;
unsigned long seg = 0;
long size = MAX_DIO_SIZE;
int refcheck;
ENTRY;
- if (!lli->lli_smd || !lli->lli_smd->lsm_object_id)
+ if (!lli->lli_has_smd)
RETURN(-EBADF);
/* FIXME: io smaller than PAGE_SIZE is broken on ia64 ??? */
if (tot_bytes > 0) {
if (rw == WRITE) {
- lov_stripe_lock(lsm);
- obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
- lov_stripe_unlock(lsm);
- }
- }
-
- cl_env_put(env, &refcheck);
- RETURN(tot_bytes ? : result);
+ struct lov_stripe_md *lsm;
+
+ lsm = ccc_inode_lsm_get(inode);
+ LASSERT(lsm != NULL);
+ lov_stripe_lock(lsm);
+ obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0);
+ lov_stripe_unlock(lsm);
+ ccc_inode_lsm_put(inode, lsm);
+ }
+ }
+
+ cl_env_put(env, &refcheck);
+ RETURN(tot_bytes ? : result);
}
#if defined(HAVE_KERNEL_WRITE_BEGIN_END) || defined(MS_HAS_NEW_AOPS)
static inline int agl_should_run(struct ll_statahead_info *sai,
struct inode *inode)
{
- if (inode != NULL && S_ISREG(inode->i_mode) &&
- ll_i2info(inode)->lli_smd != NULL && sai->sai_agl_valid)
- return 1;
- return 0;
+ if (inode != NULL && S_ISREG(inode->i_mode) &&
+ ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid)
+ return 1;
+ return 0;
}
static inline struct ll_sa_entry *
CDEBUG(D_VFSTRACE, "VFS Op\n");
- ll_inode_size_lock(inode, 0);
+ ll_inode_size_lock(inode);
rc = ll_readlink_internal(inode, &request, &symname);
if (rc)
GOTO(out, rc);
rc = vfs_readlink(dentry, buffer, buflen, symname);
out:
ptlrpc_req_finished(request);
- ll_inode_size_unlock(inode, 0);
- RETURN(rc);
+ ll_inode_size_unlock(inode);
+ RETURN(rc);
}
#ifdef HAVE_COOKIE_FOLLOW_LINK
} else if (THREAD_SIZE == 8192 && current->link_count >= 8) {
rc = -ELOOP;
} else {
- ll_inode_size_lock(inode, 0);
- rc = ll_readlink_internal(inode, &request, &symname);
- ll_inode_size_unlock(inode, 0);
+ ll_inode_size_lock(inode);
+ rc = ll_readlink_internal(inode, &request, &symname);
+ ll_inode_size_unlock(inode);
}
if (rc) {
cfs_path_put(nd); /* Kernel assumes that ->follow_link()
ENTRY;
/* XXX: Layer violation, we shouldn't see lsm at llite level. */
- if (lli->lli_smd != NULL) /* lsm-less file, don't need to lock */
+ if (lli->lli_has_smd) /* lsm-less file doesn't need to lock */
result = vvp_io_rw_lock(env, io, CLM_READ,
io->u.ci_rd.rd.crw_pos,
io->u.ci_rd.rd.crw_pos +
static int vvp_do_vmtruncate(struct inode *inode, size_t size)
{
- int result;
- /*
- * Only ll_inode_size_lock is taken at this level. lov_stripe_lock()
- * is grabbed by ll_truncate() only over call to obd_adjust_kms().
- */
- ll_inode_size_lock(inode, 0);
- result = vmtruncate(inode, size);
- ll_inode_size_unlock(inode, 0);
+ int result;
+ /*
+ * Only ll_inode_size_lock is taken at this level. lov_stripe_lock()
+ * is grabbed by ll_truncate() only over call to obd_adjust_kms().
+ */
+ ll_inode_size_lock(inode);
+ result = vmtruncate(inode, size);
+ ll_inode_size_unlock(inode);
- return result;
+ return result;
}
static int vvp_io_setattr_trunc(const struct lu_env *env,
size = cl_offset(obj, pg->cp_index) + to;
- ll_inode_size_lock(inode, 0);
+ ll_inode_size_lock(inode);
if (result == 0) {
if (size > i_size_read(inode)) {
cl_isize_write_nolock(inode, size);
if (size > i_size_read(inode))
cl_page_discard(env, io, pg);
}
- ll_inode_size_unlock(inode, 0);
- RETURN(result);
+ ll_inode_size_unlock(inode);
+ RETURN(result);
}
static const struct cl_io_operations vvp_io_ops = {
(strncmp(name, XATTR_LUSTRE_PREFIX,
sizeof(XATTR_LUSTRE_PREFIX) - 1) == 0 &&
strcmp(name + sizeof(XATTR_LUSTRE_PREFIX) - 1, "lov") == 0)) {
+ struct lov_stripe_md *lsm;
struct lov_user_md *lump;
struct lov_mds_md *lmm = NULL;
struct ptlrpc_request *request = NULL;
GOTO(out, rc = sizeof(struct lov_user_md));
}
- if (!ll_i2info(inode)->lli_smd) {
+ lsm = ccc_inode_lsm_get(inode);
+ if (lsm == NULL) {
if (S_ISDIR(inode->i_mode)) {
rc = ll_dir_getstripe(inode, &lmm,
&lmmsize, &request);
} else {
/* LSM is present already after lookup/getattr call.
* we need to grab layout lock once it is implemented */
- rc = obd_packmd(ll_i2dtexp(inode), &lmm,
- ll_i2info(inode)->lli_smd);
- lmmsize = rc;
- }
+ rc = obd_packmd(ll_i2dtexp(inode), &lmm, lsm);
+ lmmsize = rc;
+ }
+ ccc_inode_lsm_put(inode, lsm);
if (rc < 0)
GOTO(out, rc);
rc -= xlen;
}
}
- if (S_ISREG(inode->i_mode)) {
- if (ll_i2info(inode)->lli_smd == NULL)
+ if (S_ISREG(inode->i_mode)) {
+ if (!ll_i2info(inode)->lli_has_smd)
rc2 = -1;
} else if (S_ISDIR(inode->i_mode)) {
rc2 = ll_dir_getstripe(inode, &lmm, &lmmsize, &request);
* Type of an object. Protected by lov_object::lo_type_guard.
*/
enum lov_layout_type lo_type;
+ /**
+ * Waitq - wait for no one else is using lo_lsm
+ */
+ cfs_waitq_t lo_waitq;
union lov_layout_state {
struct lov_layout_raid0 {
* lov_io::lis_cl::cis_object.
*/
struct lov_object *lis_object;
+ /**
+ * Lov stripe - this determines how this io fans out.
+ * Hold a refcount to the lsm so it can't go away during IO.
+ */
+ struct lov_stripe_md *lis_lsm;
/**
* Original end-of-io position for this IO, set by the upper layer as
* cl_io::u::ci_rw::pos + cl_io::u::ci_rw::count. lov remembers this,
struct lov_io *lio,
const struct cl_page_slice *slice);
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm);
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
#define lov_foreach_target(lov, var) \
for (var = 0; var < lov_targets_nr(lov); ++var)
struct lov_stripe_md *lsm, struct lov_user_md *lump);
int lov_alloc_memmd(struct lov_stripe_md **lsmp, __u16 stripe_count,
int pattern, int magic);
-void lov_free_memmd(struct lov_stripe_md **lsmp);
+int lov_free_memmd(struct lov_stripe_md **lsmp);
void lov_dump_lmm_v1(int level, struct lov_mds_md_v1 *lmm);
void lov_dump_lmm_v3(int level, struct lov_mds_md_v3 *lmm);
int lov_check_index_in_pool(__u32 idx, struct pool_desc *pool);
void lov_pool_putref(struct pool_desc *pool);
+static inline struct lov_stripe_md *lsm_addref(struct lov_stripe_md *lsm)
+{
+ LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+ cfs_atomic_inc(&lsm->lsm_refc);
+ return lsm;
+}
+
#endif
* Implementation of cl_io for LOV layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOV
static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
int stripe, loff_t start, loff_t end)
{
- struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
+ struct lov_stripe_md *lsm = lio->lis_lsm;
struct cl_io *parent = lio->lis_cl.cis_io;
switch(io->ci_type) {
struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
const struct cl_page_slice *slice)
{
- struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
+ struct lov_stripe_md *lsm = lio->lis_lsm;
struct cl_page *page = slice->cpl_page;
int stripe;
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
- struct lov_object *lov = lio->lis_object;
- struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
+ struct lov_stripe_md *lsm = lio->lis_lsm;
int result;
LASSERT(lio->lis_object != NULL);
static void lov_io_slice_init(struct lov_io *lio,
struct lov_object *obj, struct cl_io *io)
{
- struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm;
+ struct lov_stripe_md *lsm = lov_lsm_addref(obj);
+ ENTRY;
- LASSERT(lsm != NULL);
- ENTRY;
+ io->ci_result = 0;
+ lio->lis_object = obj;
- io->ci_result = 0;
- lio->lis_object = obj;
+ LASSERT(lsm != NULL);
+ lio->lis_lsm = lsm; /* called inside lo_type_guard. */
lio->lis_stripe_count = lsm->lsm_stripe_count;
switch (io->ci_type) {
lio->lis_nr_subios * sizeof lio->lis_subs[0]);
lio->lis_nr_subios = 0;
}
- EXIT;
+ lov_lsm_decref(lio->lis_object, lio->lis_lsm);
+ lio->lis_lsm = NULL;
+ EXIT;
}
static obd_off lov_offset_mod(obd_off val, int delta)
static int lov_io_iter_init(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct lov_stripe_md *lsm = lov_r0(lio->lis_object)->lo_lsm;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct lov_stripe_md *lsm = lio->lis_lsm;
struct lov_io_sub *sub;
obd_off endpos;
obd_off start;
static int lov_io_rw_iter_init(const struct lu_env *env,
const struct cl_io_slice *ios)
{
- struct lov_io *lio = cl2lov_io(env, ios);
- struct cl_io *io = ios->cis_io;
- struct lov_stripe_md *lsm = lov_r0(cl2lov(ios->cis_obj))->lo_lsm;
+ struct lov_io *lio = cl2lov_io(env, ios);
+ struct cl_io *io = ios->cis_io;
+ struct lov_stripe_md *lsm = lio->lis_lsm;
loff_t start = io->u.ci_rw.crw_pos;
loff_t next;
unsigned long ssize = lsm->lsm_stripe_size;
CDEBUG(D_VFSTRACE, "stripe: "LPU64" chunk: ["LPU64", "LPU64") "
LPU64"\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
(__u64)lio->lis_io_endpos);
- }
- /*
- * XXX The following call should be optimized: we know, that
- * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
- */
- RETURN(lov_io_iter_init(env, ios));
+ }
+ /*
+ * XXX The following call should be optimized: we know, that
+ * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
+ */
+ RETURN(lov_io_iter_init(env, ios));
}
static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
if (parent->ci_result == 0)
parent->ci_result = sub->sub_io->ci_result;
- }
- RETURN(rc);
+ }
+ RETURN(rc);
}
static int lov_io_lock(const struct lu_env *env, const struct cl_io_slice *ios)
CFS_INIT_LIST_HEAD(&lio->lis_active);
lov_io_slice_init(lio, lov, io);
if (io->ci_result == 0) {
- LASSERT(lov_r0(lov)->lo_lsm != NULL);
io->ci_result = lov_io_subio_init(env, lio, io);
if (io->ci_result == 0)
cl_io_slice_add(io, &lio->lis_cl, obj, &lov_io_ops);
int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io)
{
- struct lov_io *lio = lov_env_io(env);
- int result;
+ struct lov_io *lio = lov_env_io(env);
+ int result;
+ ENTRY;
- ENTRY;
- switch (io->ci_type) {
- default:
- LBUG();
+ lio->lis_lsm = NULL;
+ switch (io->ci_type) {
+ default:
+ LBUG();
case CIT_FSYNC:
case CIT_MISC:
case CIT_READ:
int lov_merge_lvb(struct obd_export *exp,
struct lov_stripe_md *lsm, struct ost_lvb *lvb, int kms_only)
{
- int rc;
- __u64 kms;
-
- ENTRY;
- rc = lov_merge_lvb_kms(lsm, lvb, &kms);
- if (kms_only)
- lvb->lvb_size = kms;
+ int rc;
+ __u64 kms;
+
+ ENTRY;
+ lov_stripe_lock(lsm);
+ rc = lov_merge_lvb_kms(lsm, lvb, &kms);
+ lov_stripe_unlock(lsm);
+ if (kms_only)
+ lvb->lvb_size = kms;
CDEBUG(D_INODE, "merged for FID "DFID" s="LPU64" m="LPU64" a="LPU64
" c="LPU64" b="LPU64"\n",
lsm->lsm_object_seq, (__u32)lsm->lsm_object_id,
(__u32)(lsm->lsm_object_id >> 32), lvb->lvb_size, lvb->lvb_mtime,
lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks);
- RETURN(rc);
+ RETURN(rc);
}
/* Must be called under the lov_stripe_lock() */
RETURN(rc);
}
-int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
-{
- int i, rc = 0;
- ENTRY;
-
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- struct lov_oinfo *loi = lsm->lsm_oinfo[i];
- if (loi->loi_ar.ar_rc && !rc)
- rc = loi->loi_ar.ar_rc;
- loi->loi_ar.ar_rc = 0;
- }
- RETURN(rc);
-}
-EXPORT_SYMBOL(lov_test_and_clear_async_rc);
-
-
static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
int cmd, __u64 *offset)
{
* Implementation of cl_object for LOV layer.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@whamcloud.com>
*/
#define DEBUG_SUBSYSTEM S_LOV
#include "lov_cl_internal.h"
+#include <lustre_debug.h>
/** \addtogroup lov
* @{
struct lov_layout_raid0 *r0 = &state->raid0;
ENTRY;
- r0->lo_nr = conf->u.coc_md->lsm->lsm_stripe_count;
- r0->lo_lsm = conf->u.coc_md->lsm;
+
+ if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
+ dump_lsm(D_ERROR, lsm);
+ LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
+ LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+ }
+
+ r0->lo_lsm = lsm_addref(lsm);
+ r0->lo_nr = lsm->lsm_stripe_count;
LASSERT(r0->lo_nr <= lov_targets_nr(dev));
OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
union lov_layout_state *state)
{
- struct lov_layout_raid0 *r0 = &state->raid0;
- int i;
+ struct lov_layout_raid0 *r0 = &state->raid0;
+ struct lov_stripe_md *lsm = r0->lo_lsm;
+ struct l_wait_info lwi = { 0 };
+ int i;
+
+ ENTRY;
+
+ /* wait until there is no extra users. */
+ dump_lsm(D_INODE, lsm);
+ l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi);
- ENTRY;
if (r0->lo_sub != NULL) {
for (i = 0; i < r0->lo_nr; ++i) {
struct lovsub_object *los = r0->lo_sub[i];
struct lov_layout_raid0 *r0 = &state->raid0;
ENTRY;
+
if (r0->lo_sub != NULL) {
OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
r0->lo_sub = NULL;
}
- EXIT;
+
+ LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1);
+ lov_free_memmd(&r0->lo_lsm);
+
+ EXIT;
}
static int lov_print_empty(const struct lu_env *env, void *cookie,
cl_env_put(nested, &refcheck);
cl_env_reexit(cookie);
+ old_ops->llo_delete(env, obj, &obj->u);
old_ops->llo_fini(env, obj, &obj->u);
LASSERT(cfs_list_empty(&hdr->coh_locks));
LASSERT(hdr->coh_tree.rnode == NULL);
ENTRY;
cfs_init_rwsem(&lov->lo_type_guard);
+ cfs_waitq_init(&lov->lo_waitq);
/* no locking is necessary, as object is being created */
lov->lo_type = cconf->u.coc_md->lsm != NULL ? LLT_RAID0 : LLT_EMPTY;
static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
const struct cl_object_conf *conf)
{
- struct lov_object *lov = cl2lov(obj);
- int result;
+ struct lov_stripe_md *lsm = conf->u.coc_md->lsm;
+ struct lov_object *lov = cl2lov(obj);
+ int result = 0;
ENTRY;
/*
cfs_down_write(&lov->lo_type_guard);
LASSERT(lov->lo_owner == NULL);
lov->lo_owner = cfs_current();
- if (lov->lo_type == LLT_EMPTY && conf->u.coc_md->lsm != NULL)
- result = lov_layout_change(env, lov, LLT_RAID0, conf);
- else
- result = -EOPNOTSUPP;
- lov->lo_owner = NULL;
- cfs_up_write(&lov->lo_type_guard);
- RETURN(result);
+ switch (lov->lo_type) {
+ case LLT_EMPTY:
+ if (lsm != NULL)
+ result = lov_layout_change(env, lov, LLT_RAID0, conf);
+ break;
+ case LLT_RAID0:
+ if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm))
+ result = -EOPNOTSUPP;
+ break;
+ default:
+ LBUG();
+ }
+ lov->lo_owner = NULL;
+ cfs_up_write(&lov->lo_type_guard);
+ RETURN(result);
}
static void lov_object_delete(const struct lu_env *env, struct lu_object *obj)
RETURN(obj);
}
+struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
+{
+ struct lov_stripe_md *lsm = NULL;
+
+ cfs_down_read(&lov->lo_type_guard);
+ switch (lov->lo_type) {
+ case LLT_RAID0:
+ lsm = lsm_addref(lov->u.raid0.lo_lsm);
+ case LLT_EMPTY:
+ break;
+ default:
+ LBUG();
+ }
+ cfs_up_read(&lov->lo_type_guard);
+ return lsm;
+}
+
+void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm)
+{
+ if (lsm == NULL)
+ return;
+
+ lov_free_memmd(&lsm);
+ if (lov->lo_owner != NULL)
+ cfs_waitq_signal(&lov->lo_waitq);
+}
+
+struct lov_stripe_md *lov_lsm_get(struct cl_object *clobj)
+{
+ struct lu_object *luobj;
+ struct lov_stripe_md *lsm = NULL;
+
+ if (clobj == NULL)
+ return NULL;
+
+ luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+ &lov_device_type);
+ if (luobj != NULL)
+ lsm = lov_lsm_addref(lu2lov(luobj));
+ return lsm;
+}
+EXPORT_SYMBOL(lov_lsm_get);
+
+void lov_lsm_put(struct cl_object *clobj, struct lov_stripe_md *lsm)
+{
+ struct lu_object *luobj;
+
+ if (clobj == NULL || lsm == NULL)
+ return;
+
+ luobj = lu_object_locate(&cl_object_header(clobj)->coh_lu,
+ &lov_device_type);
+ LASSERT(luobj != NULL);
+
+ lov_lsm_decref(lu2lov(luobj), lsm);
+}
+EXPORT_SYMBOL(lov_lsm_put);
+
+int lov_read_and_clear_async_rc(struct cl_object *clob)
+{
+ struct lu_object *luobj;
+ int rc = 0;
+ ENTRY;
+
+ luobj = lu_object_locate(&cl_object_header(clob)->coh_lu,
+ &lov_device_type);
+ if (luobj != NULL) {
+ struct lov_object *lov = lu2lov(luobj);
+
+ cfs_down_read(&lov->lo_type_guard);
+ switch (lov->lo_type) {
+ case LLT_RAID0: {
+ struct lov_stripe_md *lsm;
+ int i;
+
+ lsm = lov->u.raid0.lo_lsm;
+ LASSERT(lsm != NULL);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ struct lov_oinfo *loi = lsm->lsm_oinfo[i];
+ if (loi->loi_ar.ar_rc && !rc)
+ rc = loi->loi_ar.ar_rc;
+ loi->loi_ar.ar_rc = 0;
+ }
+ }
+ case LLT_EMPTY:
+ break;
+ default:
+ LBUG();
+ }
+ cfs_up_read(&lov->lo_type_guard);
+ }
+ RETURN(rc);
+}
+EXPORT_SYMBOL(lov_read_and_clear_async_rc);
+
/** @} lov */
RETURN(-ENOMEM);
}
+ cfs_atomic_set(&(*lsmp)->lsm_refc, 1);
cfs_spin_lock_init(&(*lsmp)->lsm_lock);
(*lsmp)->lsm_magic = magic;
(*lsmp)->lsm_stripe_count = stripe_count;
RETURN(lsm_size);
}
-void lov_free_memmd(struct lov_stripe_md **lsmp)
+int lov_free_memmd(struct lov_stripe_md **lsmp)
{
- struct lov_stripe_md *lsm = *lsmp;
-
- LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
- lsm_op_find(lsm->lsm_magic)->lsm_free(lsm);
-
- *lsmp = NULL;
+ struct lov_stripe_md *lsm = *lsmp;
+ int refc;
+
+ *lsmp = NULL;
+ LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0);
+ if ((refc = cfs_atomic_dec_return(&lsm->lsm_refc)) == 0) {
+ LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);
+ lsm_op_find(lsm->lsm_magic)->lsm_free(lsm);
+ }
+ return refc;
}
void dump_lsm(int level, struct lov_stripe_md *lsm)
{
- CDEBUG(level, "lsm %p, objid "LPX64", maxbytes "LPX64", magic 0x%08X, "
- "stripe_size %u, stripe_count %u, "
- "layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
- lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic,
- lsm->lsm_stripe_size, lsm->lsm_stripe_count,
- lsm->lsm_layout_gen, lsm->lsm_pool_name);
+ CDEBUG(level, "lsm %p, objid "LPX64", maxbytes "LPX64", magic 0x%08X, "
+ "stripe_size %u, stripe_count %u, refc: %d, "
+ "layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
+ lsm->lsm_object_id, lsm->lsm_maxbytes, lsm->lsm_magic,
+ lsm->lsm_stripe_size, lsm->lsm_stripe_count,
+ cfs_atomic_read(&lsm->lsm_refc), lsm->lsm_layout_gen,
+ lsm->lsm_pool_name);
}
#define LPDS sizeof(__u64)
{
struct echo_object *eco = cl2echo_obj(lu2cl(obj));
struct echo_client_obd *ec = eco->eo_dev->ed_ec;
- struct lov_stripe_md *lsm = eco->eo_lsm;
ENTRY;
LASSERT(cfs_atomic_read(&eco->eo_npages) == 0);
lu_object_fini(obj);
lu_object_header_fini(obj->lo_header);
- if (lsm)
- obd_free_memmd(ec->ec_exp, &lsm);
- OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
- EXIT;
+ if (eco->eo_lsm)
+ obd_free_memmd(ec->ec_exp, &eco->eo_lsm);
+ OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
+ EXIT;
}
static int echo_object_print(const struct lu_env *env, void *cookie,