From 8f15cf15aba4f4f2c32ce7044ec548fbe7db886f Mon Sep 17 00:00:00 2001 From: adilger Date: Thu, 12 Sep 2002 00:47:45 +0000 Subject: [PATCH] Large commit which implements more "intelligent" offsets for stripe creation in the LOV. Most of it is just variable renaming. Needs some testing. --- lustre/include/linux/lustre_idl.h | 23 +- lustre/include/linux/lustre_lite.h | 16 +- lustre/include/linux/obd.h | 34 ++- lustre/include/linux/obd_class.h | 52 +++-- lustre/include/linux/obd_lov.h | 10 + lustre/lib/l_net.c | 1 + lustre/lib/lov_pack.c | 66 ++++-- lustre/lib/obd_pack.c | 4 +- lustre/llite/file.c | 83 +++---- lustre/llite/namei.c | 78 +++---- lustre/llite/rw.c | 18 +- lustre/llite/super.c | 57 ++--- lustre/lov/lov_obd.c | 467 ++++++++++++++++++++++--------------- lustre/mdc/mdc_reint.c | 13 +- lustre/mdc/mdc_request.c | 11 +- lustre/mds/handler.c | 12 +- lustre/mds/mds_extN.c | 37 +-- lustre/mds/mds_lov.c | 2 +- lustre/mds/mds_reint.c | 42 ++-- lustre/obdclass/class_obd.c | 17 +- lustre/obdfilter/filter.c | 8 +- lustre/osc/osc_request.c | 11 +- lustre/utils/obd.c | 5 + 23 files changed, 594 insertions(+), 473 deletions(-) diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 17d8c44..557f183 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -193,14 +193,15 @@ struct lov_object_id { /* per-child structure */ #define LOV_MAGIC 0x0BD00BD0 struct lov_mds_md { - __u32 lmd_magic; - __u32 lmd_easize; /* packed size of extended */ - __u64 lmd_object_id; /* lov object id */ - __u64 lmd_stripe_offset; /* offset of the stripe */ - __u64 lmd_stripe_size; /* size of the stripe */ - __u32 lmd_stripe_count; /* how many objects are being striped */ - __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */ - struct lov_object_id lmd_objects[0]; + __u32 lmm_magic; + __u32 lmm_easize; /* packed size of extended */ + __u64 lmm_object_id; /* lov object id */ + __u32 lmm_stripe_offset; /* starting stripe offset in lmd_objects */ + __u32 lmm_stripe_count; /* number of stipes in use for this object */ + __u64 lmm_stripe_size; /* size of the stripe */ + __u32 lmm_ost_count; /* how many OSTs are in this LOV */ + __u32 lmm_stripe_pattern; /* per-lov object stripe pattern */ + struct lov_object_id lmm_objects[0]; }; #define OBD_MD_FLALL (0xffffffff) @@ -405,21 +406,21 @@ struct mds_rec_rename { }; -/* +/* * LOV data structures */ #define LOV_RAID0 0 #define LOV_RAIDRR 1 -struct lov_desc { +struct lov_desc { __u32 ld_tgt_count; /* how many OBD's */ __u32 ld_default_stripe_count; /* how many objects are used */ __u64 ld_default_stripe_size; /* in bytes */ __u64 ld_default_stripe_offset; /* in bytes */ __u32 ld_pattern; /* RAID 0,1 etc */ uuid_t ld_uuid; -}; +}; /* * LDLM requests: diff --git a/lustre/include/linux/lustre_lite.h b/lustre/include/linux/lustre_lite.h index 712d410..e149a75 100644 --- a/lustre/include/linux/lustre_lite.h +++ b/lustre/include/linux/lustre_lite.h @@ -30,9 +30,9 @@ struct ll_file_data { }; -struct ll_inode_md { - struct mds_body *body; - struct lov_mds_md *md; +struct ll_read_inode2_cookie { + struct mds_body *lic_body; + struct lov_mds_md *lic_lmm; }; #define LL_IOC_GETFLAGS _IOR ('f', 151, long) @@ -121,6 +121,16 @@ static inline void ll_inode2fid(struct ll_fid *fid, struct inode *inode) inode->i_mode & S_IFMT); } +static inline int ll_mds_easize(struct super_block *sb) +{ + return sbi2mdc(ll_s2sbi(sb))->cl_max_mds_easize; +} + +static inline int ll_ost_easize(struct super_block *sb) +{ + return sbi2mdc(ll_s2sbi(sb))->cl_max_ost_easize; +} + /* namei.c */ int ll_lock(struct inode *dir, struct dentry *dentry, struct lookup_intent *it, struct lustre_handle *lockh); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index f74c2b3..268b0533 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -34,30 +34,21 @@ struct brw_page { }; struct lov_oinfo { /* per-child structure */ - __u64 loi_id; - __u64 loi_size; + __u64 loi_id; /* object ID on the target OST */ + __u64 loi_size; /* size of this object on the target OST */ + int loi_ost_idx; /* OST stripe index in lmd_objects array */ }; struct lov_stripe_md { - __u32 lmd_magic; - __u32 lmd_mds_easize; /* packed size for MDS of ea */ - __u64 lmd_object_id; /* lov object id */ - __u64 lmd_stripe_offset; /* offset of the stripe */ - __u64 lmd_stripe_size; /* size of the stripe */ - __u32 lmd_stripe_count; /* how many objects are being striped */ - __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */ - struct lov_oinfo lmd_oinfo[0]; -}; - -struct lov_stripe_md_one { - __u32 lmd_magic; - __u32 lmd_easize; /* packed size for MDS of ea */ - __u64 lmd_object_id; /* lov object id */ - __u64 lmd_stripe_offset; /* offset of the stripe */ - __u64 lmd_stripe_size; /* size of the stripe */ - __u32 lmd_stripe_count; /* how many objects are being striped */ - __u32 lmd_stripe_pattern; /* per-lov object stripe pattern */ - struct lov_oinfo lmd_oinfo[1]; + __u32 lsm_magic; + __u32 lsm_mds_easize; /* packed size for MDS of ea */ + __u64 lsm_object_id; /* lov object id */ + __u64 lsm_stripe_size; /* size of the stripe */ + __u32 lsm_stripe_pattern; /* per-lov object stripe pattern */ + int lsm_stripe_offset; /* offset of first stripe in lmd_objects */ + int lsm_stripe_count; /* how many objects are being striped on */ + int lsm_ost_count; /* how many OSTs are in this LOV */ + struct lov_oinfo lsm_oinfo[0]; }; /* Individual type definitions */ @@ -106,6 +97,7 @@ struct client_obd { int cl_conn_count; __u8 cl_target_uuid[37]; /* XXX -> lustre_name */ int cl_max_mds_easize; + int cl_max_ost_easize; }; struct mds_obd { diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index b880f8a..d1de775 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -192,7 +192,8 @@ static inline int obd_cleanup(struct obd_device *obd) RETURN(rc); } -static inline int obd_create(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md **ea) +static inline int obd_create(struct lustre_handle *conn, struct obdo *obdo, + struct lov_stripe_md **ea) { int rc; struct obd_export *export; @@ -207,7 +208,8 @@ static inline int obd_create(struct lustre_handle *conn, struct obdo *obdo, stru RETURN(rc); } -static inline int obd_destroy(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md *ea) +static inline int obd_destroy(struct lustre_handle *conn, struct obdo *obdo, + struct lov_stripe_md *ea) { int rc; struct obd_export *export; @@ -218,8 +220,7 @@ static inline int obd_destroy(struct lustre_handle *conn, struct obdo *obdo, str RETURN(rc); } -static inline int obd_getattr(struct lustre_handle *conn, - struct obdo *obdo, +static inline int obd_getattr(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md *ea) { int rc; @@ -231,7 +232,8 @@ static inline int obd_getattr(struct lustre_handle *conn, RETURN(rc); } -static inline int obd_close(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md *md) +static inline int obd_close(struct lustre_handle *conn, struct obdo *obdo, + struct lov_stripe_md *md) { int rc; struct obd_export *export; @@ -241,7 +243,7 @@ static inline int obd_close(struct lustre_handle *conn, struct obdo *obdo, struc rc = OBP(export->exp_obd, close)(conn, obdo, md); RETURN(rc); } -static inline int obd_open(struct lustre_handle *conn, struct obdo *obdo, +static inline int obd_open(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md *md) { int rc; @@ -253,8 +255,7 @@ static inline int obd_open(struct lustre_handle *conn, struct obdo *obdo, RETURN(rc); } -static inline int obd_setattr(struct lustre_handle *conn, - struct obdo *obdo, +static inline int obd_setattr(struct lustre_handle *conn, struct obdo *obdo, struct lov_stripe_md *ea) { int rc; @@ -300,7 +301,7 @@ static inline int obd_statfs(struct lustre_handle *conn,struct obd_statfs *osfs) } static inline int obd_punch(struct lustre_handle *conn, struct obdo *tgt, - struct lov_stripe_md *md, + struct lov_stripe_md *ea, obd_size count, obd_off offset) { int rc; @@ -308,14 +309,13 @@ static inline int obd_punch(struct lustre_handle *conn, struct obdo *tgt, OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,punch); - rc = OBP(export->exp_obd, punch)(conn, tgt, md, count, offset); + rc = OBP(export->exp_obd, punch)(conn, tgt, ea, count, offset); RETURN(rc); } -static inline int obd_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *md, - obd_count oa_bufs, - struct brw_page *pg, +static inline int obd_brw(int cmd, struct lustre_handle *conn, + struct lov_stripe_md *ea, obd_count oa_bufs, + struct brw_page *pg, brw_callback_t callback, void *data) { int rc; @@ -328,7 +328,8 @@ static inline int obd_brw(int cmd, struct lustre_handle *conn, LBUG(); } - rc = OBP(export->exp_obd, brw)(cmd, conn, md, oa_bufs, pg, callback, data); + rc = OBP(export->exp_obd, brw)(cmd, conn, ea, oa_bufs, pg, callback, + data); RETURN(rc); } @@ -343,7 +344,7 @@ static inline int obd_preprw(int cmd, struct lustre_handle *conn, OBD_CHECK_OP(export->exp_obd,preprw); rc = OBP(export->exp_obd, preprw)(cmd, conn, objcount, obj, niocount, - remote, local, desc_private); + remote, local, desc_private); RETURN(rc); } @@ -358,7 +359,7 @@ static inline int obd_commitrw(int cmd, struct lustre_handle *conn, OBD_CHECK_OP(export->exp_obd,commitrw); rc = OBP(export->exp_obd, commitrw)(cmd, conn, objcount, obj, niocount, - local, desc_private); + local, desc_private); RETURN(rc); } @@ -375,8 +376,8 @@ static inline int obd_iocontrol(int cmd, struct lustre_handle *conn, } static inline int obd_enqueue(struct lustre_handle *conn, - struct lov_stripe_md *md, - struct lustre_handle *parent_lock, + struct lov_stripe_md *ea, + struct lustre_handle *parent_lock, __u32 type, void *cookie, int cookielen, __u32 mode, int *flags, void *cb, void *data, int datalen, struct lustre_handle *lockh) @@ -386,13 +387,14 @@ static inline int obd_enqueue(struct lustre_handle *conn, OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,enqueue); - rc = OBP(export->exp_obd, enqueue)(conn, md, parent_lock, type, - cookie, cookielen, mode, flags, cb, - data, datalen, lockh); + rc = OBP(export->exp_obd, enqueue)(conn, ea, parent_lock, type, + cookie, cookielen, mode, flags, cb, + data, datalen, lockh); RETURN(rc); } -static inline int obd_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, __u32 mode, +static inline int obd_cancel(struct lustre_handle *conn, + struct lov_stripe_md *ea, __u32 mode, struct lustre_handle *lockh) { int rc; @@ -400,11 +402,11 @@ static inline int obd_cancel(struct lustre_handle *conn, struct lov_stripe_md *m OBD_CHECK_SETUP(conn, export); OBD_CHECK_OP(export->exp_obd,cancel); - rc = OBP(export->exp_obd, cancel)(conn, md, mode, lockh); + rc = OBP(export->exp_obd, cancel)(conn, ea, mode, lockh); RETURN(rc); } -#endif +#endif /* * ======== OBD Metadata Support =========== diff --git a/lustre/include/linux/obd_lov.h b/lustre/include/linux/obd_lov.h index 5aace10..d3b3ab1 100644 --- a/lustre/include/linux/obd_lov.h +++ b/lustre/include/linux/obd_lov.h @@ -14,5 +14,15 @@ void lov_packdesc(struct lov_desc *ld); void lov_packmd(struct lov_mds_md *mdsmd, struct lov_stripe_md *md); void lov_unpackmd(struct lov_stripe_md *md, struct lov_mds_md *mdsmd); +static inline int lov_stripe_md_size(int stripes) +{ + return sizeof(struct lov_stripe_md) + stripes*sizeof(struct lov_oinfo); +} + +static inline int lov_mds_md_size(int stripes) +{ + return sizeof(struct lov_mds_md) + stripes*sizeof(struct lov_object_id); +} + #endif #endif diff --git a/lustre/lib/l_net.c b/lustre/lib/l_net.c index 39f74c5..9a7ae96 100644 --- a/lustre/lib/l_net.c +++ b/lustre/lib/l_net.c @@ -101,6 +101,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) cli->cl_import.imp_obd = obddev; cli->cl_max_mds_easize = sizeof(struct lov_mds_md); + cli->cl_max_ost_easize = sizeof(struct lov_stripe_md); MOD_INC_USE_COUNT; RETURN(0); diff --git a/lustre/lib/lov_pack.c b/lustre/lib/lov_pack.c index 19b8301..18eae31 100644 --- a/lustre/lib/lov_pack.c +++ b/lustre/lib/lov_pack.c @@ -44,38 +44,58 @@ void lov_unpackdesc(struct lov_desc *ld) ld->ld_pattern = HTON__u32(ld->ld_pattern); } -void lov_packmd(struct lov_mds_md *mdsmd, struct lov_stripe_md *md) +void lov_packmd(struct lov_mds_md *lmm, struct lov_stripe_md *lsm) { + struct lov_oinfo *loi; int i; - mdsmd->lmd_magic = md->lmd_magic; - mdsmd->lmd_easize = md->lmd_mds_easize; - mdsmd->lmd_object_id = md->lmd_object_id; - mdsmd->lmd_stripe_offset = md->lmd_stripe_offset; - mdsmd->lmd_stripe_count = md->lmd_stripe_count; - mdsmd->lmd_stripe_size = md->lmd_stripe_size; - mdsmd->lmd_stripe_pattern = md->lmd_stripe_pattern; - for (i = 0; i < md->lmd_stripe_count; i++) - mdsmd->lmd_objects[i].l_object_id = md->lmd_oinfo[i].loi_id; + /* XXX endianness */ + lmm->lmm_magic = (lsm->lsm_magic); + lmm->lmm_easize = (lsm->lsm_mds_easize); + lmm->lmm_object_id = (lsm->lsm_object_id); + lmm->lmm_stripe_size = (lsm->lsm_stripe_size); + lmm->lmm_stripe_pattern = (lsm->lsm_stripe_pattern); + lmm->lmm_ost_count = (lsm->lsm_ost_count); + lmm->lmm_stripe_count = (lsm->lsm_stripe_count); + lmm->lmm_stripe_offset = (lsm->lsm_stripe_offset); + + /* Only fill in the object ids which we are actually using. + * Assumes lmd_objects is otherwise zero-filled. */ + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + lmm->lmm_objects[loi->loi_ost_idx].l_object_id = + (loi->loi_id); + } } -void lov_unpackmd(struct lov_stripe_md *md, struct lov_mds_md *mdsmd) +void lov_unpackmd(struct lov_stripe_md *lsm, struct lov_mds_md *lmm) { + struct lov_oinfo *loi; + int ost_count, ost_offset; int i; - md->lmd_magic = mdsmd->lmd_magic; - md->lmd_mds_easize = mdsmd->lmd_easize; - md->lmd_object_id = mdsmd->lmd_object_id; - md->lmd_stripe_offset = mdsmd->lmd_stripe_offset; - md->lmd_stripe_count = mdsmd->lmd_stripe_count; - md->lmd_stripe_size = mdsmd->lmd_stripe_size; - md->lmd_stripe_pattern = mdsmd->lmd_stripe_pattern; - for (i = 0; i < md->lmd_stripe_count; i++) { - md->lmd_oinfo[i].loi_id = mdsmd->lmd_objects[i].l_object_id; - md->lmd_oinfo[i].loi_size = 0; - } -} + /* XXX endianness */ + lsm->lsm_magic = (lmm->lmm_magic); + lsm->lsm_mds_easize = (lmm->lmm_easize); + lsm->lsm_object_id = (lmm->lmm_object_id); + lsm->lsm_stripe_size = (lmm->lmm_stripe_size); + lsm->lsm_stripe_pattern = (lmm->lmm_stripe_pattern); + lsm->lsm_ost_count = (lmm->lmm_ost_count); + lsm->lsm_stripe_count = (lmm->lmm_stripe_count); + lsm->lsm_stripe_offset = (lmm->lmm_stripe_offset); + ost_count = lsm->lsm_ost_count; + ost_offset = lsm->lsm_stripe_offset; + for (i = 0, loi = lsm->lsm_oinfo; i < ost_count; i++, ost_offset++) { + ost_offset %= ost_count; + if (!lmm->lmm_objects[ost_offset].l_object_id) + continue; + LASSERT(loi - lsm->lsm_oinfo < lsm->lsm_stripe_count); + loi->loi_id = (lmm->lmm_objects[ost_offset].l_object_id); + loi->loi_ost_idx = ost_offset; + loi->loi_size = 0; /* set by LOV later */ + loi++; + } +} diff --git a/lustre/lib/obd_pack.c b/lustre/lib/obd_pack.c index b202791..8b3c33a 100644 --- a/lustre/lib/obd_pack.c +++ b/lustre/lib/obd_pack.c @@ -27,12 +27,12 @@ #include #include -void ost_pack_ioo(void **tmp, struct lov_stripe_md *md, int bufcnt) +void ost_pack_ioo(void **tmp, struct lov_stripe_md *lsm, int bufcnt) { struct obd_ioobj *ioo = *tmp; char *c = *tmp; - ioo->ioo_id = HTON__u64(md->lmd_object_id); + ioo->ioo_id = HTON__u64(lsm->lsm_object_id); ioo->ioo_gr = HTON__u64(0); ioo->ioo_type = HTON__u32(S_IFREG); ioo->ioo_bufcnt = HTON__u32(bufcnt); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index a1aeaf7..44b5f49 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -37,7 +37,7 @@ static int ll_file_open(struct inode *inode, struct file *file) struct ptlrpc_request *req = NULL; struct ll_file_data *fd; struct obdo *oa = NULL; - struct lov_stripe_md *md = NULL; + struct lov_stripe_md *lsm = NULL; struct ll_sb_info *sbi = ll_i2sbi(inode); struct ll_inode_info *lli = ll_i2info(inode); ENTRY; @@ -45,12 +45,13 @@ static int ll_file_open(struct inode *inode, struct file *file) if (file->private_data) LBUG(); + lsm = lli->lli_smd; + /* delayed create of object (intent created inode) */ /* XXX object needs to be cleaned up if mdc_open fails */ /* XXX error handling appropriate here? */ - if (lli->lli_smd == NULL) { - struct client_obd *mdc = sbi2mdc(ll_s2sbi(inode->i_sb)); - struct inode * inode = file->f_dentry->d_inode; + if (lsm == NULL) { + struct inode *inode = file->f_dentry->d_inode; down(&lli->lli_open_sem); /* Check to see if we lost the race */ @@ -61,7 +62,7 @@ static int ll_file_open(struct inode *inode, struct file *file) RETURN(-ENOMEM); } oa->o_mode = S_IFREG | 0600; - oa->o_easize = mdc->cl_max_mds_easize; + oa->o_easize = ll_mds_easize(inode->i_sb); oa->o_id = inode->i_ino; oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLEASIZE | OBD_MD_FLID; @@ -72,11 +73,11 @@ static int ll_file_open(struct inode *inode, struct file *file) up(&lli->lli_open_sem); RETURN(rc); } - md = lli->lli_smd; } - if (lli->lli_smd->lmd_object_id == 0) + if (lli->lli_smd->lsm_object_id == 0) LBUG(); up(&lli->lli_open_sem); + lsm = lli->lli_smd; } fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL); @@ -85,7 +86,7 @@ static int ll_file_open(struct inode *inode, struct file *file) memset(fd, 0, sizeof(*fd)); rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode, - file->f_flags, md, (__u64)(unsigned long)file, + file->f_flags, lsm, (__u64)(unsigned long)file, &fd->fd_mdshandle, &req); fd->fd_req = req; ptlrpc_req_finished(req); @@ -99,10 +100,10 @@ static int ll_file_open(struct inode *inode, struct file *file) if (oa == NULL && (oa = obdo_alloc()) == NULL) GOTO(out_mdc, rc = -EINVAL); - oa->o_id = lli->lli_smd->lmd_object_id; + oa->o_id = lsm->lsm_object_id; oa->o_mode = S_IFREG | inode->i_mode; oa->o_valid = OBD_MD_FLMODE | OBD_MD_FLID | OBD_MD_FLSIZE; - rc = obd_open(ll_i2obdconn(inode), oa, lli->lli_smd); + rc = obd_open(ll_i2obdconn(inode), oa, lsm); obdo_free(oa); oa = NULL; @@ -127,7 +128,7 @@ out: return rc; } -int ll_size_lock(struct inode *inode, struct lov_stripe_md *md, __u64 start, +int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, __u64 start, int mode, struct lustre_handle **lockhs_p) { struct ll_sb_info *sbi = ll_i2sbi(inode); @@ -140,25 +141,25 @@ int ll_size_lock(struct inode *inode, struct lov_stripe_md *md, __u64 start, RETURN(0); } - OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); if (lockhs == NULL) RETURN(-ENOMEM); extent.start = start; extent.end = ~0; - rc = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT, &extent, + rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent, sizeof(extent), mode, &flags, ll_lock_callback, inode, sizeof(*inode), lockhs); if (rc != ELDLM_OK) { CERROR("lock enqueue: %d\n", rc); - OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); } else *lockhs_p = lockhs; RETURN(rc); } -int ll_size_unlock(struct inode *inode, struct lov_stripe_md *md, int mode, +int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode, struct lustre_handle *lockhs) { struct ll_sb_info *sbi = ll_i2sbi(inode); @@ -172,37 +173,37 @@ int ll_size_unlock(struct inode *inode, struct lov_stripe_md *md, int mode, RETURN(-EINVAL); } - rc = obd_cancel(&sbi->ll_osc_conn, md, mode, lockhs); + rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockhs); if (rc != ELDLM_OK) { CERROR("lock cancel: %d\n", rc); LBUG(); } - OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); RETURN(rc); } -int ll_file_size(struct inode *inode, struct lov_stripe_md *md) +int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm) { struct ll_sb_info *sbi = ll_i2sbi(inode); struct lustre_handle *lockhs; struct obdo oa; int err, rc; - rc = ll_size_lock(inode, md, 0, LCK_PR, &lockhs); + rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockhs); if (rc != ELDLM_OK) { CERROR("lock enqueue: %d\n", rc); RETURN(rc); } - oa.o_id = md->lmd_object_id; + oa.o_id = lsm->lsm_object_id; oa.o_mode = S_IFREG; oa.o_valid = OBD_MD_FLID|OBD_MD_FLMODE|OBD_MD_FLSIZE|OBD_MD_FLBLOCKS; - rc = obd_getattr(&sbi->ll_osc_conn, &oa, md); + rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm); if (!rc) obdo_to_inode(inode, &oa, oa.o_valid); - err = ll_size_unlock(inode, md, LCK_PR, lockhs); + err = ll_size_unlock(inode, lsm, LCK_PR, lockhs); if (err != ELDLM_OK) { CERROR("lock cancel: %d\n", err); LBUG(); @@ -228,7 +229,7 @@ static int ll_file_release(struct inode *inode, struct file *file) } memset(&oa, 0, sizeof(oa)); - oa.o_id = lli->lli_smd->lmd_object_id; + oa.o_id = lli->lli_smd->lsm_object_id; oa.o_mode = S_IFREG; oa.o_valid = (OBD_MD_FLMODE | OBD_MD_FLID); rc = obd_close(ll_i2obdconn(inode), &oa, lli->lli_smd); @@ -244,7 +245,7 @@ static int ll_file_release(struct inode *inode, struct file *file) if (rc) GOTO(out_fd, abs(rc)); - oa.o_id = lli->lli_smd->lmd_object_id; + oa.o_id = lli->lli_smd->lsm_object_id; oa.o_mode = S_IFREG; oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; @@ -361,7 +362,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; struct lustre_handle *lockhs = NULL; - struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; int flags = 0; ldlm_error_t err; ssize_t retval; @@ -369,7 +370,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) || sbi->ll_flags & LL_SBI_NOLCK) { - OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); if (!lockhs) RETURN(-ENOMEM); @@ -378,12 +379,12 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, CDEBUG(D_INFO, "Locking inode %ld, start "LPU64" end "LPU64"\n", inode->i_ino, extent.start, extent.end); - err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT, + err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent, sizeof(extent), LCK_PR, &flags, ll_lock_callback, inode, sizeof(*inode), lockhs); if (err != ELDLM_OK) { - OBD_FREE(lockhs, md->lmd_stripe_count *sizeof(*lockhs)); + OBD_FREE(lockhs, lsm->lsm_stripe_count*sizeof(*lockhs)); CERROR("lock enqueue: err: %d\n", err); RETURN(err); } @@ -398,7 +399,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) || sbi->ll_flags & LL_SBI_NOLCK) { - err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PR, lockhs); + err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, lockhs); if (err != ELDLM_OK) { CERROR("lock cancel: err: %d\n", err); retval = err; @@ -406,7 +407,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count, } if (lockhs) - OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); RETURN(retval); } @@ -421,7 +422,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) struct ll_sb_info *sbi = ll_i2sbi(inode); struct ldlm_extent extent; struct lustre_handle *lockhs = NULL, *eof_lockhs = NULL; - struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; int flags = 0; ldlm_error_t err; ssize_t retval; @@ -429,15 +430,15 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) { struct obdo oa; - err = ll_size_lock(inode, md, 0, LCK_PW, &eof_lockhs); + err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockhs); if (err) RETURN(err); - oa.o_id = md->lmd_object_id; + oa.o_id = lsm->lsm_object_id; oa.o_mode = inode->i_mode; oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; - retval = obd_getattr(&sbi->ll_osc_conn, &oa, md); + retval = obd_getattr(&sbi->ll_osc_conn, &oa, lsm); if (retval) GOTO(out_eof, retval); @@ -447,7 +448,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) || sbi->ll_flags & LL_SBI_NOLCK) { - OBD_ALLOC(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); if (!lockhs) GOTO(out_eof, retval = -ENOMEM); extent.start = *ppos; @@ -455,7 +456,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) CDEBUG(D_INFO, "Locking inode %ld, start "LPU64" end "LPU64"\n", inode->i_ino, extent.start, extent.end); - err = obd_enqueue(&sbi->ll_osc_conn, md, NULL, LDLM_EXTENT, + err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent, sizeof(extent), LCK_PW, &flags, ll_lock_callback, inode, sizeof(*inode), lockhs); @@ -465,14 +466,14 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) } } - CDEBUG(D_INFO, "Writing inode %ld, %ld bytes, offset %Ld\n", - inode->i_ino, (long)count, *ppos); + CDEBUG(D_INFO, "Writing inode %ld, "LPD64" bytes, offset "LPD64"\n", + inode->i_ino, count, *ppos); retval = generic_file_write(file, buf, count, ppos); if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) || sbi->ll_flags & LL_SBI_NOLCK) { - err = obd_cancel(&sbi->ll_osc_conn, md, LCK_PW, lockhs); + err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, lockhs); if (err != ELDLM_OK) { CERROR("lock cancel: err: %d\n", err); GOTO(out_free, retval = err); @@ -482,11 +483,11 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos) EXIT; out_free: if (lockhs) - OBD_FREE(lockhs, md->lmd_stripe_count * sizeof(*lockhs)); + OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs)); out_eof: if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) { - err = ll_size_unlock(inode, md, LCK_PW, eof_lockhs); + err = ll_size_unlock(inode, lsm, LCK_PW, eof_lockhs); if (err && !retval) retval = err; } diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index a25f291..c603340 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -86,9 +86,9 @@ static inline int ext2_add_nondir(struct dentry *dentry, struct inode *inode) /* methods */ static int ll_find_inode(struct inode *inode, unsigned long ino, void *opaque) { - struct ll_inode_md *md = opaque; + struct ll_read_inode2_cookie *lic = opaque; - if (inode->i_generation != md->body->generation) + if (inode->i_generation != lic->lic_body->generation) return 0; return 1; @@ -142,7 +142,7 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, struct ptlrpc_request *request = NULL; struct inode * inode = NULL; struct ll_sb_info *sbi = ll_i2sbi(dir); - struct ll_inode_md md; + struct ll_read_inode2_cookie lic; struct lustre_handle lockh; struct lookup_intent lookup_it = { IT_LOOKUP }; int err, offset; @@ -168,7 +168,7 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, request = (struct ptlrpc_request *)it->it_data; if (it->it_disposition) { - int mode, easize = 0; + int mode, symlen = 0; obd_flag valid; offset = 1; @@ -209,18 +209,18 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, } /* Do a getattr now that we have the lock */ - md.body = lustre_msg_buf(request->rq_repmsg, offset); - ino = md.body->fid1.id; - mode = md.body->mode; + lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset); + ino = lic.lic_body->fid1.id; + mode = lic.lic_body->mode; valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE; if (it->it_op == IT_READLINK) { valid |= OBD_MD_LINKNAME; - easize = md.body->size; + symlen = lic.lic_body->size; } ptlrpc_free_req(request); request = NULL; err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode, - valid, easize, &request); + valid, symlen, &request); if (err) { CERROR("failure %d inode %Ld\n", err, (long long)ino); ptlrpc_free_req(request); @@ -249,18 +249,17 @@ static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry, } iget: - md.body = lustre_msg_buf(request->rq_repmsg, offset); - if (S_ISREG(md.body->mode)) { - if (request->rq_repmsg->bufcount < offset + 1) - LBUG(); - md.md = lustre_msg_buf(request->rq_repmsg, offset + 1); - if (md.md->lmd_magic != LOV_MAGIC) - md.md = NULL; + lic.lic_body = lustre_msg_buf(request->rq_repmsg, offset); + if (S_ISREG(lic.lic_body->mode)) { + LASSERT(request->rq_repmsg->bufcount < offset + 1); + lic.lic_lmm = lustre_msg_buf(request->rq_repmsg, offset + 1); + if (lic.lic_lmm->lmm_magic != LOV_MAGIC) + lic.lic_lmm = NULL; } else - md.md = NULL; + lic.lic_lmm = NULL; /* No rpc's happen during iget4, -ENOMEM's are possible */ - inode = iget4(dir->i_sb, ino, ll_find_inode, &md); + inode = iget4(dir->i_sb, ino, ll_find_inode, &lic); LASSERT(!IS_ERR(inode)); if (!inode) { @@ -285,7 +284,7 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, int mode, __u64 extra, struct lookup_intent *it, - struct lov_stripe_md *smd) + struct lov_stripe_md *lsm) { struct inode *inode; struct ptlrpc_request *request = NULL; @@ -294,8 +293,8 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, time_t time = CURRENT_TIME; struct ll_sb_info *sbi = ll_i2sbi(dir); int gid = current->fsgid; - struct ll_inode_md md; - struct lov_mds_md *mds_md = NULL; + struct ll_read_inode2_cookie lic; + struct lov_mds_md *lmm = NULL; int mds_md_size = 0; ENTRY; @@ -309,25 +308,24 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, if (!it || !it->it_disposition) { rc = mdc_create(&sbi->ll_mdc_conn, dir, name, namelen, tgt, tgtlen, mode, current->fsuid, - gid, time, extra, smd, &request); + gid, time, extra, lsm, &request); if (rc) { inode = ERR_PTR(rc); GOTO(out, rc); } body = lustre_msg_buf(request->rq_repmsg, 0); - if (smd != NULL) { - mds_md_size = sizeof (struct lov_mds_md) + - smd->lmd_stripe_count * sizeof(struct lov_object_id); - OBD_ALLOC(mds_md, mds_md_size); - lov_packmd(mds_md, smd); - md.md = mds_md; + if (lsm != NULL) { + mds_md_size = ll_mds_easize(inode->i_sb); + OBD_ALLOC(lmm, mds_md_size); + lov_packmd(lmm, lsm); + lic.lic_lmm = lmm; } else - md.md = NULL; + lic.lic_lmm = NULL; } else { request = it->it_data; body = lustre_msg_buf(request->rq_repmsg, 1); - md.md = NULL; + lic.lic_lmm = NULL; } body->valid = OBD_MD_FLNOTOBD; @@ -338,9 +336,9 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, body->gid = gid; body->mode = mode; - md.body = body; + lic.lic_body = body; - inode = iget4(dir->i_sb, body->ino, ll_find_inode, &md); + inode = iget4(dir->i_sb, body->ino, ll_find_inode, &lic); if (IS_ERR(inode)) { rc = PTR_ERR(inode); CERROR("new_inode -fatal: rc %d\n", rc); @@ -360,8 +358,8 @@ static struct inode *ll_create_node(struct inode *dir, const char *name, EXIT; out: - if (mds_md != NULL) - OBD_FREE(mds_md, mds_md_size); + if (lmm != NULL) + OBD_FREE(lmm, mds_md_size); ptlrpc_free_req(request); return inode; } @@ -429,7 +427,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) int err, rc = 0; struct obdo oa; struct inode *inode; - struct lov_stripe_md *smd = NULL; + struct lov_stripe_md *lsm = NULL; struct ll_inode_info *lli = NULL; ENTRY; @@ -437,7 +435,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) memset(&oa, 0, sizeof(oa)); oa.o_valid = OBD_MD_FLMODE; oa.o_mode = S_IFREG | 0600; - rc = obd_create(ll_i2obdconn(dir), &oa, &smd); + rc = obd_create(ll_i2obdconn(dir), &oa, &lsm); CDEBUG(D_DENTRY, "name %s mode %o o_id %lld: rc = %d\n", dentry->d_name.name, mode, (long long)oa.o_id, rc); if (rc) @@ -445,7 +443,7 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) } inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len, - NULL, 0, mode, 0, dentry->d_it, smd); + NULL, 0, mode, 0, dentry->d_it, lsm); if (IS_ERR(inode)) { rc = PTR_ERR(inode); @@ -468,10 +466,10 @@ static int ll_create(struct inode * dir, struct dentry * dentry, int mode) RETURN(rc); out_destroy: - if (smd) { - oa.o_easize = smd->lmd_mds_easize; + if (lsm) { + oa.o_easize = ll_mds_easize(inode->i_sb); oa.o_valid |= OBD_MD_FLEASIZE; - err = obd_destroy(ll_i2obdconn(dir), &oa, smd); + err = obd_destroy(ll_i2obdconn(dir), &oa, lsm); if (err) CERROR("error destroying objid %Ld on error: err %d\n", (unsigned long long)oa.o_id, err); diff --git a/lustre/llite/rw.c b/lustre/llite/rw.c index d979b25..f389625 100644 --- a/lustre/llite/rw.c +++ b/lustre/llite/rw.c @@ -231,24 +231,24 @@ static int ll_commit_write(struct file *file, struct page *page, void ll_truncate(struct inode *inode) { struct obdo oa = {0}; - struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; struct lustre_handle *lockhs = NULL; int err; ENTRY; - if (!md) { + if (!lsm) { /* object not yet allocated */ inode->i_mtime = inode->i_ctime = CURRENT_TIME; return; } - oa.o_id = md->lmd_object_id; + oa.o_id = lsm->lsm_object_id; oa.o_size = inode->i_size; CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after "LPD64")\n", oa.o_id, oa.o_size); - err = ll_size_lock(inode, md, oa.o_size, LCK_PW, &lockhs); + err = ll_size_lock(inode, lsm, oa.o_size, LCK_PW, &lockhs); if (err) { CERROR("ll_size_lock failed: %d\n", err); /* FIXME: What to do here? It's too late to back out... */ @@ -258,14 +258,14 @@ void ll_truncate(struct inode *inode) oa.o_valid = OBD_MD_FLID; /* truncate == punch to/from start from/to end: set end to -1 for that. */ - err = obd_punch(ll_i2obdconn(inode), &oa, md, inode->i_size, + err = obd_punch(ll_i2obdconn(inode), &oa, lsm, inode->i_size, OBD_PUNCH_EOF); if (err) CERROR("obd_truncate fails (%d)\n", err); else obdo_to_inode(inode, &oa, oa.o_valid); - err = ll_size_unlock(inode, md, LCK_PW, lockhs); + err = ll_size_unlock(inode, lsm, LCK_PW, lockhs); if (err) CERROR("ll_size_unlock failed: %d\n", err); @@ -278,13 +278,13 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, { obd_count bufs_per_obdo = iobuf->nr_pages; struct ll_inode_info *lli = ll_i2info(inode); - struct lov_stripe_md *md = lli->lli_smd; + struct lov_stripe_md *lsm = lli->lli_smd; struct brw_page *pga; int i, rc = 0; struct io_cb_data *cbd; ENTRY; - if (!md || !md->lmd_object_id) + if (!lsm || !lsm->lsm_object_id) RETURN(-ENOMEM); if (blocksize != PAGE_SIZE) { @@ -313,7 +313,7 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf, } rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, - ll_i2obdconn(inode), md, bufs_per_obdo, pga, + ll_i2obdconn(inode), lsm, bufs_per_obdo, pga, ll_sync_io_cb, cbd); if (rc == 0) rc = bufs_per_obdo * PAGE_SIZE; diff --git a/lustre/llite/super.c b/lustre/llite/super.c index 172b3e4..d012327 100644 --- a/lustre/llite/super.c +++ b/lustre/llite/super.c @@ -89,17 +89,6 @@ static void ll_options(char *options, char **ost, char **mds, int *flags) #ifndef log2 #define log2(n) ffz(~(n)) #endif -inline int ll_mds_easize(struct super_block *sb) -{ - struct client_obd *mdc = sbi2mdc(ll_s2sbi(sb)); - return mdc->cl_max_mds_easize; -} - -inline int ll_lov_easize(struct lov_stripe_md *md) -{ - return sizeof(*md) + md->lmd_stripe_count * - sizeof(struct lov_oinfo); -} static struct super_block * ll_read_super(struct super_block *sb, void *data, int silent) @@ -115,7 +104,7 @@ static struct super_block * ll_read_super(struct super_block *sb, __u64 last_committed; __u64 last_xid; struct ptlrpc_request *request = NULL; - struct ll_inode_md md; + struct ll_read_inode2_cookie lic; class_uuid_t uuid; ENTRY; @@ -212,9 +201,9 @@ static struct super_block * ll_read_super(struct super_block *sb, GOTO(out_request, sb = NULL); } - md.body = lustre_msg_buf(request->rq_repmsg, 0); - md.md = NULL; - root = iget4(sb, sbi->ll_rootino, NULL, &md); + lic.lic_body = lustre_msg_buf(request->rq_repmsg, 0); + lic.lic_lmm = NULL; + root = iget4(sb, sbi->ll_rootino, NULL, &lic); if (root) { sb->s_root = d_alloc_root(root); @@ -264,12 +253,11 @@ static void ll_clear_inode(struct inode *inode) { if (atomic_read(&inode->i_count) == 0) { struct ll_inode_info *lli = ll_i2info(inode); - struct lov_stripe_md *md = lli->lli_smd; + struct lov_stripe_md *lsm = lli->lli_smd; char *symlink_name = lli->lli_symlink_name; - if (md) { - int size = ll_lov_easize(md); - OBD_FREE(md, size); + if (lsm) { + OBD_FREE(lsm, ll_ost_easize(inode->i_sb)); lli->lli_smd = NULL; } if (symlink_name) { @@ -284,12 +272,12 @@ static void ll_delete_inode(struct inode *inode) if (S_ISREG(inode->i_mode)) { int err; struct obdo *oa; - struct lov_stripe_md *md = ll_i2info(inode)->lli_smd; + struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd; - if (!md) + if (!lsm) GOTO(out, -EINVAL); - if (md->lmd_object_id == 0) { + if (lsm->lsm_object_id == 0) { CERROR("This really happens\n"); /* No obdo was ever created */ GOTO(out, 0); @@ -299,15 +287,15 @@ static void ll_delete_inode(struct inode *inode) if (oa == NULL) GOTO(out, -ENOMEM); - oa->o_id = md->lmd_object_id; - oa->o_easize = md->lmd_mds_easize; + oa->o_id = lsm->lsm_object_id; + oa->o_easize = ll_mds_easize(inode->i_sb); oa->o_mode = inode->i_mode; oa->o_valid = OBD_MD_FLID | OBD_MD_FLEASIZE | OBD_MD_FLMODE; - err = obd_destroy(ll_i2obdconn(inode), oa, md); + err = obd_destroy(ll_i2obdconn(inode), oa, lsm); obdo_free(oa); - CDEBUG(D_SUPER, "obd destroy of "LPD64" error %d\n", - md->lmd_object_id, err); + CDEBUG(D_SUPER, "obd destroy of objid "LPX64" error %d\n", + lsm->lsm_object_id, err); } out: clear_inode(inode); @@ -423,8 +411,8 @@ out: static void ll_read_inode2(struct inode *inode, void *opaque) { - struct ll_inode_md *md = opaque; - struct mds_body *body = md->body; + struct ll_read_inode2_cookie *lic = opaque; + struct mds_body *body = lic->lic_body; struct ll_inode_info *lli = ll_i2info(inode); ENTRY; @@ -457,21 +445,22 @@ static void ll_read_inode2(struct inode *inode, void *opaque) inode->i_size = body->size; //if (body->valid & OBD_MD_FLEASIZE) - if (md && md->md) { - struct lov_mds_md *smd = md->md; + if (lic && lic->lic_lmm) { + struct lov_mds_md *lmm = lic->lic_lmm; int size; - if (md->md->lmd_easize != ll_mds_easize(inode->i_sb)) { + + if (lmm->lmm_easize != ll_mds_easize(inode->i_sb)) { CERROR("Striping metadata size error %ld\n", inode->i_ino); LBUG(); } - size = ll_lov_easize(lli->lli_smd); + size = ll_ost_easize(inode->i_sb); OBD_ALLOC(lli->lli_smd, size); if (!lli->lli_smd) { CERROR("No memory for %d\n", size); LBUG(); } - lov_unpackmd(lli->lli_smd, smd); + lov_unpackmd(lli->lli_smd, lmm); } else { lli->lli_smd = NULL; } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 1e4cba6..ab8d710 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -24,6 +24,7 @@ #include #include #include +#include /* obd methods */ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, @@ -62,7 +63,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, /* sanity... */ if (req->rq_repmsg->bufcount < 2 || req->rq_repmsg->buflens[0] < sizeof(*desc)) { - CERROR("invalid descriptor returned\n"); + CERROR("LOV desc: invalid descriptor returned\n"); GOTO(out, rc = -EINVAL); } @@ -70,25 +71,40 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd, lov_unpackdesc(desc); if (req->rq_repmsg->buflens[1] < sizeof(*uuidarray)*desc->ld_tgt_count){ - CERROR("invalid uuid array returned\n"); + CERROR("LOV desc: invalid uuid array returned\n"); GOTO(out, rc = -EINVAL); } - mdc->cl_max_mds_easize = sizeof(struct lov_mds_md) + - desc->ld_tgt_count * sizeof(struct lov_object_id); + mdc->cl_max_mds_easize = lov_mds_md_size(desc->ld_tgt_count); + mdc->cl_max_ost_easize = lov_stripe_md_size(desc->ld_tgt_count); if (memcmp(obd->obd_uuid, desc->ld_uuid, sizeof(desc->ld_uuid))) { - CERROR("lov uuid %s not on mds device (%s)\n", + CERROR("LOV desc: uuid %s not on mds device (%s)\n", obd->obd_uuid, desc->ld_uuid); GOTO(out, rc = -EINVAL); } if (desc->ld_tgt_count > 1000) { - CERROR("configuration error: target count > 1000 (%d)\n", + CERROR("LOV desc: target count > 1000 (%d)\n", desc->ld_tgt_count); GOTO(out, rc = -EINVAL); } + if (desc->ld_default_stripe_count == 0) { + CERROR("LOV desc: default stripe count is zero\n"); + GOTO(out, rc = -EINVAL); + } + + /* Because of 64-bit divide/mod operations only work with a 32-bit + * divisor in a 32-bit kernel, we cannot support a stripe width + * of 4GB or larger. + */ + if (desc->ld_default_stripe_size * desc->ld_tgt_count > ~0UL) { + CERROR("LOV desc: stripe width > %lu on 32-bit system\n", + ~0UL); + GOTO(out, rc = -EINVAL); + } + lov->bufsize = sizeof(struct lov_tgt_desc) * desc->ld_tgt_count; OBD_ALLOC(lov->tgts, lov->bufsize); if (!lov->tgts) { @@ -177,7 +193,7 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) } if (data->ioc_inllen1 > 37) { - CERROR("mdc UUID must be less than 38 characters\n"); + CERROR("mdc UUID must be 36 characters or less\n"); RETURN(-EINVAL); } @@ -191,35 +207,17 @@ static int lov_setup(struct obd_device *obd, obd_count len, void *buf) } -static inline int lov_stripe_md_size(struct obd_device *obd) -{ - struct lov_obd *lov = &obd->u.lov; - int size; - - size = sizeof(struct lov_stripe_md) + - lov->desc.ld_tgt_count * sizeof(struct lov_oinfo); - return size; -} - -static inline int lov_mds_md_size(struct obd_device *obd) -{ - struct lov_obd *lov = &obd->u.lov; - int size; - - size = sizeof(struct lov_mds_md) + - lov->desc.ld_tgt_count * sizeof(struct lov_object_id); - return size; -} - -/* the LOV counts on oa->o_id to be set as the LOV object id */ +/* the LOV expects oa->o_id to be set to the LOV object id */ static int lov_create(struct lustre_handle *conn, struct obdo *oa, struct lov_stripe_md **ea) { - int rc = 0, i; - struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; - struct lov_stripe_md *md; + struct lov_stripe_md *lsm; + struct lov_oinfo *loi; + int sub_offset, stripe_offset; + int ost_count; + int rc = 0, i; ENTRY; if (!ea) { @@ -228,64 +226,90 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa, } if (!export) RETURN(-EINVAL); - lov = &export->exp_obd->u.lov; - oa->o_easize = lov_stripe_md_size(export->exp_obd); + lov = &export->exp_obd->u.lov; + ost_count = lov->desc.ld_tgt_count; + oa->o_easize = lov_stripe_md_size(ost_count); if (!*ea) { OBD_ALLOC(*ea, oa->o_easize); - if (! *ea) + if (!*ea) RETURN(-ENOMEM); } - md = *ea; - md->lmd_mds_easize = lov_mds_md_size(export->exp_obd); - md->lmd_object_id = oa->o_id; - if (!md->lmd_stripe_count) - md->lmd_stripe_count = lov->desc.ld_default_stripe_count; + lsm = *ea; + LASSERT(oa->o_valid & OBD_MD_FLID); + lsm->lsm_mds_easize = lov_mds_md_size(ost_count); + lsm->lsm_object_id = oa->o_id; + if (!lsm->lsm_stripe_count) + lsm->lsm_stripe_count = lov->desc.ld_default_stripe_count; - if (!md->lmd_stripe_size) - md->lmd_stripe_size = lov->desc.ld_default_stripe_size; + if (!lsm->lsm_stripe_size) + lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size; + lsm->lsm_ost_count = ost_count; + sub_offset = ((int)lsm->lsm_object_id / lsm->lsm_stripe_count) % + lsm->lsm_stripe_count; + stripe_offset = (((int)lsm->lsm_object_id * lsm->lsm_stripe_count) % + ost_count); + lsm->lsm_stripe_offset = stripe_offset + sub_offset; + CDEBUG(D_INODE, "allocating objects starting at OST idx %d\n", + lsm->lsm_stripe_offset); - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct lov_stripe_md obj_md; struct lov_stripe_md *obj_mdp = &obj_md; + struct obdo tmp; + int ost_idx = (((sub_offset + i) % lsm->lsm_stripe_count) + + stripe_offset) % ost_count; + /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); tmp.o_easize = sizeof(struct lov_stripe_md); - rc = obd_create(&lov->tgts[i].conn, &tmp, &obj_mdp); - if (rc) + rc = obd_create(&lov->tgts[ost_idx].conn, &tmp, &obj_mdp); + if (rc) { + CERROR("error creating objid "LPX64" sub-object on " + "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc); GOTO(out_cleanup, rc); - md->lmd_oinfo[i].loi_id = tmp.o_id; - md->lmd_oinfo[i].loi_size = tmp.o_size; + } + loi->loi_id = tmp.o_id; + loi->loi_size = tmp.o_size; + loi->loi_ost_idx = ost_idx; } out_cleanup: if (rc) { - int i2, rc2; - for (i2 = 0; i2 < i; i2++) { + while (i-- > 0) { + struct obdo tmp; + int err; + + --loi; /* destroy already created objects here */ - tmp.o_id = md->lmd_oinfo[i].loi_id; - rc2 = obd_destroy(&lov->tgts[i].conn, &tmp, NULL); - if (rc2) - CERROR("Failed to remove object from target " - "%d\n", i2); + memcpy(&tmp, oa, sizeof(tmp)); + tmp.o_id = loi->loi_id; + err = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, + &tmp, NULL); + if (err) + CERROR("Failed to remove objid "LPX64" subobj " + LPX64" on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, + err); } } return rc; } static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) + struct lov_stripe_md *lsm) { - int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea for destruction\n"); RETURN(-EINVAL); } @@ -294,29 +318,31 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; - rc = obd_destroy(&lov->tgts[i].conn, &tmp, NULL); + tmp.o_id = loi->loi_id; + rc = obd_destroy(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); if (rc) - CERROR("Error destroying object "LPD64" on %d\n", - md->lmd_oinfo[i].loi_id, i); + CERROR("Error destroying objid "LPX64" subobj "LPX64 + " on OST idx %d\n: rc = %d", + oa->o_id, loi->loi_id, loi->loi_ost_idx, rc); } RETURN(rc); } static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) + struct lov_stripe_md *lsm) { - int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; int set = 0; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea\n"); RETURN(-EINVAL); } @@ -327,20 +353,21 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, lov = &export->exp_obd->u.lov; oa->o_size = 0; oa->o_blocks = 0; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { int err; - if (md->lmd_oinfo[i].loi_id == 0) + if (loi->loi_id == 0) continue; /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; + tmp.o_id = loi->loi_id; - err = obd_getattr(&lov->tgts[i].conn, &tmp, NULL); + err = obd_getattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL); if (err) { - CERROR("Error getattr object "LPD64" on %d: err = %d\n", - md->lmd_oinfo[i].loi_id, i, err); + CERROR("Error getattr objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); if (!rc) rc = err; continue; /* XXX or break? */ @@ -349,6 +376,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, obdo_cpy_md(oa, &tmp, tmp.o_valid); set = 1; } else { +#warning FIXME: the size needs to be fixed for sparse files if (tmp.o_valid & OBD_MD_FLSIZE) oa->o_size += tmp.o_size; if (tmp.o_valid & OBD_MD_FLBLOCKS) @@ -365,15 +393,16 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa, } static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) + struct lov_stripe_md *lsm) { int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea\n"); RETURN(-EINVAL); } @@ -381,30 +410,40 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa, if (!export || !export->exp_obd) RETURN(-ENODEV); + if (oa->o_valid && OBD_MD_FLSIZE) + CERROR("setting size on an LOV object is totally broken\n"); + lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + int err; + /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; + tmp.o_id = loi->loi_id; - rc = obd_setattr(&lov->tgts[i].conn, &tmp, NULL); - if (rc) - CERROR("Error setattr object "LPD64" on %d\n", - tmp.o_id, i); + err = obd_setattr(&lov->tgts[loi->loi_ost_idx].conn, &tmp,NULL); + if (err) { + CERROR("Error setattr objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + if (!rc) + rc = err; + } } RETURN(rc); } static int lov_open(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) + struct lov_stripe_md *lsm) { - int rc = 0, rc2 = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea for opening\n"); RETURN(-EINVAL); } @@ -413,31 +452,44 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + int err; + /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; + tmp.o_id = loi->loi_id; - rc = obd_open(&lov->tgts[i].conn, &tmp, NULL); - if (rc) { - rc2 = rc; - CERROR("Error open object "LPD64" on %d\n", - md->lmd_oinfo[i].loi_id, i); + err = obd_open(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); + if (err) { + CERROR("Error open objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, lsm->lsm_oinfo[i].loi_id, + loi->loi_ost_idx, rc); + if (!rc) + rc = err; } } - RETURN(rc2); + /* FIXME: returning an error, but having opened some objects is a bad + * idea, since they will likely never be closed. We either + * need to not return an error if _some_ objects could be + * opened, and leave it to read/write to return -EIO (with + * hopefully partial error status) or close all opened objects + * and return an error. + */ + RETURN(rc); } static int lov_close(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md) + struct lov_stripe_md *lsm) { - int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea\n"); RETURN(-EINVAL); } @@ -446,15 +498,21 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + int err; + /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; + tmp.o_id = loi->loi_id; - rc = obd_close(&lov->tgts[i].conn, &tmp, NULL); - if (rc) - CERROR("Error close object "LPD64" on %d\n", - md->lmd_oinfo[i].loi_id, i); + err = obd_close(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL); + if (err) { + CERROR("Error close objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + if (!rc) + rc = err; + } } RETURN(rc); } @@ -463,34 +521,39 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa, #define log2(n) ffz(~(n)) #endif -/* compute offset in stripe i corresponding to offset "in" */ -static __u64 lov_offset(struct lov_stripe_md *md, __u64 in, int i) +#warning FIXME: merge these two functions now that they are nearly the same + +/* compute ost offset in stripe "stripeno" corresponding to offset "lov_off" */ +static __u64 lov_offset(struct lov_stripe_md *lsm, __u64 lov_off, int stripeno) { - __u32 ssz = md->lmd_stripe_size; - /* full stripes across all * stripe size */ - __u32 out = ( ((__u32)in) / (md->lmd_stripe_count * ssz)) * ssz; - __u32 off = (__u32)in % (md->lmd_stripe_count * ssz); + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off; - if ( in == 0xffffffffffffffff ) { - return 0xffffffffffffffff; - } + if (lov_off == OBD_PUNCH_EOF) + return OBD_PUNCH_EOF; - if ( (i+1) * ssz <= off ) - out += (i+1) * ssz; - else if ( i * ssz > off ) - out += 0; + /* do_div(a, b) returns a % b, and a = a / b */ + stripe_off = do_div(lov_off, swidth); + + if (stripe_off < stripeno * ssize) + stripe_off = 0; else - out += (off - (i * ssz)) % ssz; + stripe_off -= stripeno * ssize; - return (__u64) out; + return lov_off + stripe_off; } -static int lov_stripe_which(struct lov_stripe_md *md, __u64 in) +/* compute which stripe offset "lov_off" will be written into */ +static int lov_stripe_which(struct lov_stripe_md *lsm, __u64 lov_off) { - __u32 ssz = md->lmd_stripe_size; - int j; - j = (((__u32) in) / ssz) % md->lmd_stripe_count; - return j; + unsigned long ssize = lsm->lsm_stripe_size; + unsigned long swidth = ssize * lsm->lsm_stripe_count; + unsigned long stripe_off; + + stripe_off = do_div(lov_off, swidth); + + return stripe_off / ssize; } @@ -498,16 +561,17 @@ static int lov_stripe_which(struct lov_stripe_md *md, __u64 in) * we can send this 'punch' to just the authoritative node and the nodes * that the punch will affect. */ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, - struct lov_stripe_md *md, + struct lov_stripe_md *lsm, obd_off start, obd_off end) { - int rc = 0, i; struct obdo tmp; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea for desctruction\n"); RETURN(-EINVAL); } @@ -516,21 +580,26 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { - __u64 starti = lov_offset(md, start, i); - __u64 endi = lov_offset(md, end, i); + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { + __u64 starti = lov_offset(lsm, start, i); + __u64 endi = lov_offset(lsm, end, i); + int err; if (starti == endi) continue; /* create data objects with "parent" OA */ memcpy(&tmp, oa, sizeof(tmp)); - tmp.o_id = md->lmd_oinfo[i].loi_id; + tmp.o_id = loi->loi_id; - rc = obd_punch(&lov->tgts[i].conn, &tmp, NULL, + err = obd_punch(&lov->tgts[loi->loi_ost_idx].conn, &tmp, NULL, starti, endi); - if (rc) - CERROR("Error punch object "LPD64" on %d\n", - md->lmd_oinfo[i].loi_id, i); + if (err) { + CERROR("Error punch objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", + oa->o_id, loi->loi_id, loi->loi_ost_idx, err); + if (!rc) + rc = err; + } } RETURN(rc); } @@ -556,22 +625,25 @@ static int lov_osc_brw_callback(struct io_cb_data *cbd, int err, int phase) } static inline int lov_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *md, obd_count oa_bufs, + struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga, brw_callback_t callback, struct io_cb_data *cbd) { - int stripe_count = md->lmd_stripe_count; + int stripe_count = lsm->lsm_stripe_count; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; struct { int bufct; int index; int subcount; - struct lov_stripe_md md; - } *stripeinfo; + struct lov_stripe_md lsm; + int ost_idx; + } *stripeinfo, *si, *si_last; struct brw_page *ioarr; int rc, i; struct io_cb_data *our_cb; + struct lov_oinfo *loi; + int *where; ENTRY; lov = &export->exp_obd->u.lov; @@ -584,51 +656,53 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, if (!stripeinfo) GOTO(out_cbdata, rc = -ENOMEM); + OBD_ALLOC(where, sizeof(*where) * oa_bufs); + if (!where) + GOTO(out_sinfo, rc = -ENOMEM); + OBD_ALLOC(ioarr, sizeof(*ioarr) * oa_bufs); if (!ioarr) - GOTO(out_sinfo, rc = -ENOMEM); + GOTO(out_where, rc = -ENOMEM); + + /* This is the only race-free way I can think of to get the refcount + * correct. -phil */ + atomic_set(&our_cb->refcount, 0); + our_cb->cb = callback; + our_cb->data = cbd; for (i = 0; i < oa_bufs; i++) { - int which; - which = lov_stripe_which(md, pga[i].off); - stripeinfo[which].bufct++; + where[i] = lov_stripe_which(lsm, pga[i].off); + if (stripeinfo[where[i]].bufct++ == 0) + atomic_inc(&our_cb->refcount); } - for (i = 0; i < stripe_count; i++) { + for (i = 0, loi = lsm->lsm_oinfo, si = si_last = stripeinfo; + i < stripe_count; + i++, loi++, si_last = si, si++) { if (i > 0) - stripeinfo[i].index = stripeinfo[i - 1].index + - stripeinfo[i - 1].bufct; - stripeinfo[i].md.lmd_object_id = md->lmd_oinfo[i].loi_id; + si->index = si_last->index + si_last->bufct; + si->lsm.lsm_object_id = loi->loi_id; + si->ost_idx = loi->loi_ost_idx; } for (i = 0; i < oa_bufs; i++) { - int which, shift; - which = lov_stripe_which(md, pga[i].off); - - shift = stripeinfo[which].index; - LASSERT(shift + stripeinfo[which].subcount < oa_bufs); - ioarr[shift + stripeinfo[which].subcount] = pga[i]; - ioarr[shift + stripeinfo[which].subcount].off = - lov_offset(md, pga[i].off, which); + int which = where[i]; + int shift; + + shift = stripeinfo[which].index + stripeinfo[which].subcount; + LASSERT(shift < oa_bufs); + ioarr[shift] = pga[i]; + ioarr[shift].off = lov_offset(lsm, pga[i].off, which); stripeinfo[which].subcount++; } - our_cb->cb = callback; - our_cb->data = cbd; - - /* This is the only race-free way I can think of to get the refcount - * correct. -phil */ - atomic_set(&our_cb->refcount, 0); - for (i = 0; i < stripe_count; i++) - if (stripeinfo[i].bufct) - atomic_inc(&our_cb->refcount); + for (i = 0, si = stripeinfo; i < stripe_count; i++) { + int shift = si->index; - for (i = 0; i < stripe_count; i++) { - int shift = stripeinfo[i].index; - if (stripeinfo[i].bufct) { + if (si->bufct) { LASSERT(shift < oa_bufs); - obd_brw(cmd, &lov->tgts[i].conn, &stripeinfo[i].md, - stripeinfo[i].bufct, &ioarr[shift], + obd_brw(cmd, &lov->tgts[si->ost_idx].conn, + &si->lsm, si->bufct, &ioarr[shift], lov_osc_brw_callback, our_cb); } } @@ -636,6 +710,8 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, rc = callback(cbd, 0, CB_PHASE_START); OBD_FREE(ioarr, sizeof(*ioarr) * oa_bufs); + out_where: + OBD_FREE(where, sizeof(*where) * oa_bufs); out_sinfo: OBD_FREE(stripeinfo, stripe_count * sizeof(*stripeinfo)); out_cbdata: @@ -643,19 +719,19 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn, RETURN(rc); } -static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md, +static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm, struct lustre_handle *parent_lock, __u32 type, void *cookie, int cookielen, __u32 mode, int *flags, void *cb, void *data, int datalen, struct lustre_handle *lockhs) { - int rc = 0, i; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; - struct lov_stripe_md submd; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea for desctruction\n"); RETURN(-EINVAL); } @@ -664,40 +740,45 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *md, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct ldlm_extent *extent = (struct ldlm_extent *)cookie; struct ldlm_extent sub_ext; + struct lov_stripe_md submd; - sub_ext.start = lov_offset(md, extent->start, i); - sub_ext.end = lov_offset(md, extent->end, i); - if ( sub_ext.start == sub_ext.end ) + sub_ext.start = lov_offset(lsm, extent->start, i); + sub_ext.end = lov_offset(lsm, extent->end, i); + if (sub_ext.start == sub_ext.end) continue; - submd.lmd_object_id = md->lmd_oinfo[i].loi_id; - submd.lmd_mds_easize = sizeof(struct lov_mds_md); - submd.lmd_stripe_count = 0; + submd.lsm_object_id = loi->loi_id; + /* XXX submd lsm_mds_easize should be that from the subobj, + * and the subobj should get it opaquely from the LOV. + */ + submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count); + submd.lsm_stripe_count = 0; /* XXX submd is not fully initialized here */ - rc = obd_enqueue(&(lov->tgts[i].conn), &submd, parent_lock, - type, &sub_ext, sizeof(sub_ext), mode, - flags, cb, data, datalen, &(lockhs[i])); + rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd, + parent_lock, type, &sub_ext, sizeof(sub_ext), + mode, flags, cb, data, datalen, &(lockhs[i])); // XXX add a lock debug statement here if (rc) - CERROR("Error obd_enqueue object "LPD64 - " subobj "LPD64"\n", - md->lmd_object_id, md->lmd_oinfo[i].loi_id); + CERROR("Error enqueue objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", lsm->lsm_object_id, + loi->loi_id, loi->loi_ost_idx, rc); } RETURN(rc); } -static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, +static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm, __u32 mode, struct lustre_handle *lockhs) { - int rc = 0, i; struct obd_export *export = class_conn2export(conn); struct lov_obd *lov; + struct lov_oinfo *loi; + int rc = 0, i; ENTRY; - if (!md) { + if (!lsm) { CERROR("LOV requires striping ea for lock cancellation\n"); RETURN(-EINVAL); } @@ -706,19 +787,21 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *md, RETURN(-ENODEV); lov = &export->exp_obd->u.lov; - for (i = 0; i < md->lmd_stripe_count; i++) { + for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) { struct lov_stripe_md submd; - if ( lockhs[i].addr == 0 ) + if (lockhs[i].addr == 0) continue; - submd.lmd_object_id = md->lmd_oinfo[i].loi_id; - submd.lmd_mds_easize = sizeof(struct lov_mds_md); - submd.lmd_stripe_count = 0; - rc = obd_cancel(&lov->tgts[i].conn, &submd, mode, &lockhs[i]); + submd.lsm_object_id = loi->loi_id; + submd.lsm_mds_easize = lov_mds_md_size(lsm->lsm_ost_count); + submd.lsm_stripe_count = 0; + rc = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd, + mode, &lockhs[i]); if (rc) - CERROR("Error cancel object "LPD64" subobj "LPD64"\n", - md->lmd_object_id, md->lmd_oinfo[i].loi_id); + CERROR("Error cancel objid "LPX64" subobj "LPX64 + " on OST idx %d: rc = %d\n", lsm->lsm_object_id, + loi->loi_id, loi->loi_ost_idx, rc); } RETURN(rc); } @@ -744,7 +827,7 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs) err = obd_statfs(&lov->tgts[i].conn, &lov_sfs); if (err) { - CERROR("Error statfs OSC %s on %d: err = %d\n", + CERROR("Error statfs OSC %s idx %d: err = %d\n", lov->tgts[i].uuid, i, err); if (!rc) rc = err; diff --git a/lustre/mdc/mdc_reint.c b/lustre/mdc/mdc_reint.c index ee891f3..4267d3c 100644 --- a/lustre/mdc/mdc_reint.c +++ b/lustre/mdc/mdc_reint.c @@ -76,7 +76,7 @@ int mdc_setattr(struct lustre_handle *conn, int mdc_create(struct lustre_handle *conn, struct inode *dir, const char *name, int namelen, const char *tgt, int tgtlen, int mode, __u32 uid, - __u32 gid, __u64 time, __u64 rdev, struct lov_stripe_md *smd, + __u32 gid, __u64 time, __u64 rdev, struct lov_stripe_md *lsm, struct ptlrpc_request **request) { struct mds_rec_create *rec; @@ -87,12 +87,13 @@ int mdc_create(struct lustre_handle *conn, ENTRY; if (S_ISREG(mode)) { - if (!smd) { - CERROR("File create, but no md (%ld, %*s)\n", - dir->i_ino, namelen, name); + if (!lsm) { + CERROR("File create, but no stripe md (%ld, %*s)\n", + dir->i_ino, namelen, name); LBUG(); } - size[2] = smd->lmd_mds_easize; + // size[2] = mdc->cl_max_mds_easize; soon... + size[2] = lsm->lsm_mds_easize; bufcount = 3; } else if (S_ISLNK(mode)) { size[2] = tgtlen + 1; @@ -110,7 +111,7 @@ int mdc_create(struct lustre_handle *conn, name, namelen, NULL, 0); if (S_ISREG(mode)) { - lov_packmd(lustre_msg_buf(req->rq_reqmsg, 2), smd); + lov_packmd(lustre_msg_buf(req->rq_reqmsg, 2), lsm); } else if (S_ISLNK(mode)) { tmp = lustre_msg_buf(req->rq_reqmsg, 2); LOGL0(tgt, tgtlen, tmp); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 8878e28..739beb2 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -348,7 +348,7 @@ static void mdc_replay_open(struct ptlrpc_request *req, void *data) } int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, - struct lov_stripe_md *smd, __u64 cookie, __u64 *fh, + struct lov_stripe_md *lsm, __u64 cookie, __u64 *fh, struct ptlrpc_request **request) { struct mds_body *body; @@ -356,9 +356,10 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, struct ptlrpc_request *req; ENTRY; - if (smd != NULL) { + if (lsm) { bufcount = 2; - size[1] = smd->lmd_mds_easize; + // size[1] = mdc->cl_max_mds_easize; soon... + size[1] = lsm->lsm_mds_easize; } req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_OPEN, bufcount, size, @@ -373,8 +374,8 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags, body->flags = HTON__u32(flags); body->extra = cookie; - if (smd != NULL) - lov_packmd(lustre_msg_buf(req->rq_reqmsg, 1), smd); + if (lsm) + lov_packmd(lustre_msg_buf(req->rq_reqmsg, 1), lsm); req->rq_replen = lustre_msg_size(1, size); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index d8ea1e7..fec6e2b 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -457,11 +457,11 @@ static int mds_getattr_internal(struct mds_obd *mds, struct dentry *dentry, mds_pack_inode2body(body, inode); if (S_ISREG(inode->i_mode)) { - struct lov_mds_md *md; + struct lov_mds_md *lmm; - md = lustre_msg_buf(req->rq_repmsg, reply_off + 1); - md->lmd_easize = mds->mds_max_mdsize; - rc = mds_fs_get_md(mds, inode, md); + lmm = lustre_msg_buf(req->rq_repmsg, reply_off + 1); + lmm->lmm_easize = mds->mds_max_mdsize; + rc = mds_fs_get_md(mds, inode, lmm); if (rc < 0) { if (rc == -ENODATA) @@ -1279,9 +1279,9 @@ static int ldlm_intent_policy(struct ldlm_lock *lock, void *req_cookie, /* If the client is about to open a file that doesn't have an MD * stripe record, it's going to need a write lock. */ if (it->opc & IT_OPEN) { - struct lov_mds_md *md = + struct lov_mds_md *lmm = lustre_msg_buf(req->rq_repmsg, 2); - if (md->lmd_easize == 0) { + if (lmm->lmm_easize == 0) { LDLM_DEBUG(lock, "open with no EA; returning PW" " lock"); lock->l_req_mode = LCK_PW; diff --git a/lustre/mds/mds_extN.c b/lustre/mds/mds_extN.c index 2c61b47..0467fcd 100644 --- a/lustre/mds/mds_extN.c +++ b/lustre/mds/mds_extN.c @@ -112,65 +112,68 @@ static int mds_extN_setattr(struct dentry *dentry, void *handle, } static int mds_extN_set_md(struct inode *inode, void *handle, - struct lov_mds_md *md) + struct lov_mds_md *lmm) { int rc; down(&inode->i_sem); lock_kernel(); - if (md == NULL) + if (lmm == NULL) rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0); else { - md->lmd_magic = cpu_to_le32(XATTR_MDS_MO_MAGIC); + lmm->lmm_magic = cpu_to_le32(XATTR_MDS_MO_MAGIC); rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, - XATTR_LUSTRE_MDS_OBJID, md, - md->lmd_easize, XATTR_CREATE); + XATTR_LUSTRE_MDS_OBJID, lmm, + lmm->lmm_easize, XATTR_CREATE); if (rc == -EEXIST) rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE, - XATTR_LUSTRE_MDS_OBJID, md, - md->lmd_easize, 0); + XATTR_LUSTRE_MDS_OBJID, lmm, + lmm->lmm_easize, 0); } unlock_kernel(); up(&inode->i_sem); if (rc) { - CERROR("error adding objectid %Ld to inode %ld: %d\n", - (unsigned long long)md->lmd_object_id, inode->i_ino, rc); + CERROR("error adding objectid "LPX64" to inode %ld: %d\n", + lmm->lmm_object_id, inode->i_ino, rc); LBUG(); } return rc; } -static int mds_extN_get_md(struct inode *inode, struct lov_mds_md *md) +static int mds_extN_get_md(struct inode *inode, struct lov_mds_md *lmm) { int rc; - int size = md->lmd_easize; + int size = lmm->lmm_easize; down(&inode->i_sem); lock_kernel(); rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE, - XATTR_LUSTRE_MDS_OBJID, md, size); + XATTR_LUSTRE_MDS_OBJID, lmm, size); unlock_kernel(); up(&inode->i_sem); + /* This gives us the MD size */ + if (lmm == NULL) + return rc; + if (rc < 0) { CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: " "rc = %d\n", XATTR_LUSTRE_MDS_OBJID, inode->i_ino, rc); - memset(md, 0, size); - return rc; - } else if (md == NULL) + memset(lmm, 0, size); return rc; + } - if (md->lmd_magic != cpu_to_le32(XATTR_MDS_MO_MAGIC)) { + if (lmm->lmm_magic != cpu_to_le32(XATTR_MDS_MO_MAGIC)) { CERROR("MDS striping md for ino %ld has bad magic\n", inode->i_ino); rc = -EINVAL; } else { /* This field is byteswapped because it appears in the * catalogue. All others are opaque to the MDS */ - md->lmd_object_id = le64_to_cpu(md->lmd_object_id); + lmm->lmm_object_id = le64_to_cpu(lmm->lmm_object_id); } return rc; diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index a051d04..30d8482 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -20,8 +20,8 @@ #include #include #include -#include #include +#include #include int mds_configure_lov(struct obd_device *obd, struct lov_desc *desc, diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 5272f91..c4c244a 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -245,7 +245,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, if (dchild->d_inode) { struct mds_body *body; struct inode *inode = dchild->d_inode; - struct lov_mds_md *md; + struct lov_mds_md *lmm; + CDEBUG(D_INODE, "child exists (dir %ld, name %s, ino %ld)\n", dir->i_ino, rec->ur_name, dchild->d_inode->i_ino); @@ -253,11 +254,11 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); if (S_ISREG(inode->i_mode)) { - md = lustre_msg_buf(req->rq_repmsg, offset + 1); - md->lmd_easize = mds->mds_max_mdsize; + lmm = lustre_msg_buf(req->rq_repmsg, offset + 1); + lmm->lmm_easize = mds->mds_max_mdsize; - if (mds_fs_get_md(mds, inode, md) < 0) - memset(md, 0, md->lmd_easize); + if (mds_fs_get_md(mds, inode, lmm) < 0) + memset(lmm, 0, lmm->lmm_easize); } /* now a normal case for intent locking */ GOTO(out_create_dchild, rc = -EEXIST); @@ -325,9 +326,10 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino); if (!offset && type == S_IFREG) { - struct lov_mds_md *md; - md = lustre_msg_buf(req->rq_reqmsg, 2); - rc = mds_fs_set_md(mds, inode, handle, md); + struct lov_mds_md *lmm; + + lmm = lustre_msg_buf(req->rq_reqmsg, 2); + rc = mds_fs_set_md(mds, inode, handle, lmm); if (rc) { CERROR("error %d setting LOV MD for %ld\n", rc, inode->i_ino); @@ -461,14 +463,14 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, case S_IFREG: /* get OBD EA data first so client can also destroy object */ if ((inode->i_mode & S_IFMT) == S_IFREG && offset) { - struct lov_mds_md *md; + struct lov_mds_md *lmm; - md = lustre_msg_buf(req->rq_repmsg, 2); - md->lmd_easize = mds->mds_max_mdsize; - if ((rc = mds_fs_get_md(mds, inode, md)) < 0) { + lmm = lustre_msg_buf(req->rq_repmsg, 2); + lmm->lmm_easize = mds->mds_max_mdsize; + if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) { CDEBUG(D_INFO, "No md for ino %ld: rc = %d\n", inode->i_ino, rc); - memset(md, 0, md->lmd_easize); + memset(lmm, 0, lmm->lmm_easize); } } /* no break */ @@ -670,15 +672,15 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset, } else { mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); - if (S_ISREG(inode->i_mode)) { - struct lov_mds_md *md; - - md = lustre_msg_buf(req->rq_repmsg, 2); - md->lmd_easize = mds->mds_max_mdsize; - if ((rc = mds_fs_get_md(mds, inode, md)) < 0) { + if (S_ISREG(inode->i_mode)) { + struct lov_mds_md *lmm; + + lmm = lustre_msg_buf(req->rq_repmsg, 2); + lmm->lmm_easize = mds->mds_max_mdsize; + if ((rc = mds_fs_get_md(mds, inode, lmm)) < 0) { CDEBUG(D_INFO,"No md for %ld: rc %d\n", inode->i_ino, rc); - memset(md, 0, md->lmd_easize); + memset(lmm, 0, lmm->lmm_easize); } } } diff --git a/lustre/obdclass/class_obd.c b/lustre/obdclass/class_obd.c index dab194f..a66e5d4 100644 --- a/lustre/obdclass/class_obd.c +++ b/lustre/obdclass/class_obd.c @@ -792,26 +792,27 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, } case OBD_IOC_OPEN: { - struct lov_stripe_md *md; // XXX fill in md from create + struct lov_stripe_md *lsm = NULL; // XXX fill in from create obd_data2conn(&conn, data); - err = obd_open(&conn, &data->ioc_obdo1, NULL); + err = obd_open(&conn, &data->ioc_obdo1, lsm); GOTO(out, err); } case OBD_IOC_CLOSE: { - struct lov_stripe_md *md; // XXX fill in md from create + struct lov_stripe_md *lsm = NULL; // XXX fill in from create obd_data2conn(&conn, data); obd_data2conn(&conn, data); - err = obd_close(&conn, &data->ioc_obdo1, NULL); + err = obd_close(&conn, &data->ioc_obdo1, lsm); GOTO(out, err); } case OBD_IOC_BRW_WRITE: rw = OBD_BRW_WRITE; case OBD_IOC_BRW_READ: { - struct lov_stripe_md smd; + struct lov_stripe_md tmp_lsm; // XXX fill in from create + struct lov_stripe_md *lsm = &tmp_lsm; // XXX fill in from create struct io_cb_data *cbd = ll_init_cb(); obd_count pages = 0; struct brw_page *pga, *pgp; @@ -837,8 +838,8 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, GOTO(brw_free, err = -ENOMEM); } - memset(&smd, 0, sizeof(smd)); - smd.lmd_object_id = id; + memset(lsm, 0, sizeof(*lsm)); // XXX don't do this later + lsm->lsm_object_id = id; // ensure id == lsm->lsm_object_id for (j = 0, pgp = pga; j < pages; j++, off += PAGE_SIZE, pgp++){ pgp->pg = alloc_pages(gfp_mask, 0); @@ -864,7 +865,7 @@ static int obd_class_ioctl (struct inode * inode, struct file * filp, } } - err = obd_brw(rw, &conn, &smd, j, pga, ll_sync_io_cb, cbd); + err = obd_brw(rw, &conn, lsm, j, pga, ll_sync_io_cb, cbd); if (err) CERROR("test_brw: error from obd_brw: err = %d\n", err); EXIT; diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index b66fe60..0c738c3 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -711,7 +711,7 @@ static int filter_truncate(struct lustre_handle *conn, struct obdo *oa, } static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, - struct lov_stripe_md *md, obd_count oa_bufs, + struct lov_stripe_md *lsm, obd_count oa_bufs, struct brw_page *pga, brw_callback_t callback, struct io_cb_data *data) { @@ -734,7 +734,7 @@ static int filter_pgcache_brw(int cmd, struct lustre_handle *conn, push_ctxt(&saved, &obd->u.filter.fo_ctxt); pnum = 0; /* pnum indexes buf 0..num_pages */ - file = filter_obj_open(obd, md->lmd_object_id, S_IFREG); + file = filter_obj_open(obd, lsm->lsm_object_id, S_IFREG); if (IS_ERR(file)) GOTO(out, retval = PTR_ERR(file)); @@ -1421,8 +1421,8 @@ int filter_copy_data(struct lustre_handle *dst_conn, struct obdo *dst, memset(&srcmd, 0, sizeof(srcmd)); memset(&dstmd, 0, sizeof(dstmd)); - srcmd.lmd_object_id = src->o_id; - dstmd.lmd_object_id = dst->o_id; + srcmd.lsm_object_id = src->o_id; + dstmd.lsm_object_id = dst->o_id; ENTRY; CDEBUG(D_INFO, "src: ino %Ld blocks %Ld, size %Ld, dst: ino %Ld\n", diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9d8a16a..2396323 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -180,10 +180,11 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, } if (!*ea) { + // XXX check oa->o_valid & OBD_MD_FLEASIZE first... OBD_ALLOC(*ea, oa->o_easize); if (!*ea) RETURN(-ENOMEM); - (*ea)->lmd_mds_easize = oa->o_easize; + (*ea)->lsm_mds_easize = oa->o_easize; } request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CREATE, 1, &size, @@ -204,8 +205,8 @@ static int osc_create(struct lustre_handle *conn, struct obdo *oa, body = lustre_msg_buf(request->rq_repmsg, 0); memcpy(oa, &body->oa, sizeof(*oa)); - (*ea)->lmd_object_id = oa->o_id; - (*ea)->lmd_stripe_count = 0; + (*ea)->lsm_object_id = oa->o_id; + (*ea)->lsm_stripe_count = 0; EXIT; out: ptlrpc_free_req(request); @@ -607,13 +608,13 @@ static int osc_brw(int cmd, struct lustre_handle *conn, return osc_brw_read(conn, md, page_count, pga, callback, data); } -static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *md, +static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm, struct lustre_handle *parent_lock, __u32 type, void *extentp, int extent_len, __u32 mode, int *flags, void *callback, void *data, int datalen, struct lustre_handle *lockh) { - __u64 res_id[RES_NAME_SIZE] = { md->lmd_object_id }; + __u64 res_id[RES_NAME_SIZE] = { lsm->lsm_object_id }; struct obd_device *obddev = class_conn2obd(connh); struct ldlm_extent *extent = extentp; int rc; diff --git a/lustre/utils/obd.c b/lustre/utils/obd.c index b8b0181..3f04568 100644 --- a/lustre/utils/obd.c +++ b/lustre/utils/obd.c @@ -1167,6 +1167,11 @@ int jt_obd_lov_config(int argc, char **argv) desc.ld_tgt_count); return -EINVAL; } + if (desc.ld_default_stripe_count == 0) { + fprintf(stderr, "error: %s: stripe count is zero\n", + cmdname(argv[0])); + return -EINVAL; + } desc.ld_default_stripe_size = strtoul(argv[3], &end, 0); if (*end) { fprintf(stderr, "error: %s: bad default stripe size '%s'\n", -- 1.8.3.1