From ef8c1bae68bfd8485beda6061b98b277e15d2fc4 Mon Sep 17 00:00:00 2001 From: yury Date: Tue, 13 Dec 2005 08:35:18 +0000 Subject: [PATCH] - former precreate pools stuff is returned back - some additions to make quota work with former precreate code - some small fixes to filter_setattr() path --- lustre/ChangeLog | 9 - lustre/include/linux/lustre_export.h | 8 +- lustre/include/linux/lustre_idl.h | 12 +- lustre/include/linux/lustre_import.h | 3 - lustre/include/linux/obd.h | 6 +- lustre/include/linux/obd_support.h | 4 - lustre/lov/lov_obd.c | 57 ++- lustre/lvfs/fsfilt_ext3.c | 1 - lustre/mds/handler.c | 8 + lustre/mds/mds_lov.c | 52 ++- lustre/mds/mds_open.c | 7 - lustre/mds/mds_reint.c | 7 +- lustre/obdfilter/filter.c | 880 ++++++++++++++++------------------- lustre/obdfilter/filter_internal.h | 16 +- lustre/obdfilter/filter_io.c | 63 ++- lustre/obdfilter/filter_io_24.c | 29 +- lustre/obdfilter/filter_io_26.c | 29 +- lustre/obdfilter/filter_lvb.c | 22 +- lustre/osc/lproc_osc.c | 77 ++- lustre/osc/osc_create.c | 286 +++++++++--- lustre/osc/osc_request.c | 23 +- lustre/ptlrpc/import.c | 5 - lustre/ptlrpc/pack_generic.c | 16 +- lustre/ptlrpc/pinger.c | 91 +--- lustre/tests/replay-dual.sh | 4 +- lustre/tests/replay-single.sh | 4 +- lustre/utils/wiretest.c | 16 +- 27 files changed, 935 insertions(+), 800 deletions(-) diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 5c4f770..b6fc65e 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -8,15 +8,6 @@ * bug fixes Severity : enhancement -Bugzilla : 8888 -Description: Introduced CReate On Write (CROW) -Details : CROW is improved create approach, which defers OST objects - creates to the time when they realy needed. This is when client - wants to perform first write to file for instance. Or when object - changes some of its attributes stored on OST. This should improve - create rate. 
- -Severity : enhancement Bugzilla : 7981/8208 Description: Introduced Lustre Networking (LNET) Details : LNET is new networking infrastructure for Lustre, it includes diff --git a/lustre/include/linux/lustre_export.h b/lustre/include/linux/lustre_export.h index 5804902..f939a78 100644 --- a/lustre/include/linux/lustre_export.h +++ b/lustre/include/linux/lustre_export.h @@ -21,10 +21,14 @@ struct mds_export_data { struct osc_creator { spinlock_t oscc_lock; + struct list_head oscc_list; struct obd_device *oscc_obd; + obd_id oscc_last_id;//last available pre-created object + obd_id oscc_next_id;// what object id to give out next + int oscc_grow_count; + struct obdo oscc_oa; int oscc_flags; - obd_id oscc_next_id; - wait_queue_head_t oscc_waitq; + wait_queue_head_t oscc_waitq; /* creating procs wait on this */ }; struct ldlm_export_data { diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 5b44d17..09db217 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -229,6 +229,8 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags) #define OBD_CONNECT_REQPORTAL 0x40ULL /* Separate portal for non-IO reqs */ #define OBD_CONNECT_ACL 0x80ULL /* client using access control lists */ #define OBD_CONNECT_XATTR 0x100ULL /* client using extended attributes*/ + + #define OBD_CONNECT_CROW 0x200ULL /* MDS+OST do object create-on-write */ #define OBD_CONNECT_TRUNCLOCK 0x400ULL /* server gets locks for punch b=9528 */ #define OBD_CONNECT_TRANSNO 0x800ULL /* replay is sending initial transno */ @@ -240,7 +242,7 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags) OBD_CONNECT_IBITS | OBD_CONNECT_JOIN) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ - OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_CROW) + OBD_CONNECT_TRUNCLOCK) #define ECHO_CONNECT_SUPPORTED (0) #define 
OBD_OCD_VERSION(major,minor,patch,fix) (((major)<<24) + ((minor)<<16) +\ @@ -323,7 +325,8 @@ typedef uint32_t obd_count; #define OBD_FL_DEBUG_CHECK (0x00000040) /* echo client/server debug check */ #define OBD_FL_NO_USRQUOTA (0x00000100) /* the object's owner is over quota */ #define OBD_FL_NO_GRPQUOTA (0x00000200) /* the object's group is over quota */ -#define OBD_FL_CREATE_CROW (0x00000400) /* object should be created with crow */ +#define OBD_FL_CREATE_CROW (0x00000400) /* object swhould be created with crow */ + /* * set this to delegate DLM locking during obd_punch() to the OSTs. Only OSTs * that declared OBD_CONNECT_TRUNCLOCK in their connect flags support this @@ -370,11 +373,6 @@ struct obdo { #define o_dropped o_misc #define o_cksum o_nlink -#define OBDO_URGENT_CREATE(oa) \ - (!((oa)->o_valid & OBD_MD_FLFLAGS) || \ - !((oa)->o_flags & OBD_FL_CREATE_CROW) || \ - ((oa)->o_flags & OBD_FL_RECREATE_OBJS)) - extern void lustre_swab_obdo (struct obdo *o); diff --git a/lustre/include/linux/lustre_import.h b/lustre/include/linux/lustre_import.h index b0445bb..59cf6ad 100644 --- a/lustre/include/linux/lustre_import.h +++ b/lustre/include/linux/lustre_import.h @@ -96,9 +96,6 @@ struct obd_import { __u64 imp_connect_flags_orig; }; -#define IMP_CROW_ABLE(imp) \ - ((imp)->imp_connect_data.ocd_connect_flags & OBD_CONNECT_CROW) - typedef void (*obd_import_callback)(struct obd_import *imp, void *closure, int event, void *event_arg, void *cb_data); diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index a130628..be2f09a 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -246,6 +246,7 @@ struct filter_obd { __u64 fo_mount_count; int fo_destroy_in_progress; + struct semaphore fo_create_lock; struct file_operations *fo_fop; struct inode_operations *fo_iop; @@ -309,11 +310,6 @@ struct filter_obd { struct lustre_quota_ctxt fo_quota_ctxt; spinlock_t fo_quotacheck_lock; atomic_t fo_quotachecking; - - /* objids black list stuff. 
See for detailed comment in - * filter_clear_orphans() */ - struct filter_ext *fo_blacklist; - spinlock_t fo_blacklist_lock; }; struct mds_server_data; diff --git a/lustre/include/linux/obd_support.h b/lustre/include/linux/obd_support.h index 2ab3d1f..ea0c5b2 100644 --- a/lustre/include/linux/obd_support.h +++ b/lustre/include/linux/obd_support.h @@ -39,7 +39,6 @@ extern unsigned int obd_fail_loc; extern unsigned int obd_dump_on_timeout; extern unsigned int obd_timeout; /* seconds */ #define PING_INTERVAL max(obd_timeout / 4, 1U) -#define STATFS_INTERVAL max(obd_timeout / 20, 1U) extern unsigned int ldlm_timeout; extern unsigned int obd_health_check_timeout; extern char obd_lustre_upcall[128]; @@ -170,9 +169,6 @@ extern wait_queue_head_t obd_race_waitq; #define OBD_FAIL_MDC_REVALIDATE_PAUSE 0x800 -#define OBD_FAIL_OST_CROW_EIO 0x801 -#define OBD_FAIL_OST_CLEAR_ORPHANS_RACE 0x802 - /* preparation for a more advanced failure testbed (not functional yet) */ #define OBD_FAIL_MASK_SYS 0x0000FF00 #define OBD_FAIL_MASK_LOC (0x000000FF | OBD_FAIL_MASK_SYS) diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 90a5b78..2e294eb 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -823,8 +823,6 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, continue; memcpy(tmp_oa, src_oa, sizeof(*tmp_oa)); - tmp_oa->o_valid |= OBD_MD_FLID; - tmp_oa->o_id = oti->oti_objid[i]; LASSERT(lov->tgts[i].ltd_exp); /* XXX: LOV STACKING: use real "obj_mdp" sub-data */ @@ -842,14 +840,52 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa, RETURN(rc); } +static int lov_recreate(struct obd_export *exp, struct obdo *src_oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) +{ + struct lov_stripe_md *obj_mdp, *lsm; + struct lov_obd *lov = &exp->exp_obd->u.lov; + unsigned ost_idx; + int rc, i; + ENTRY; + + LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS && + src_oa->o_flags & OBD_FL_RECREATE_OBJS); + + OBD_ALLOC(obj_mdp, 
sizeof(*obj_mdp)); + if (obj_mdp == NULL) + RETURN(-ENOMEM); + + ost_idx = src_oa->o_nlink; + lsm = *ea; + if (lsm == NULL) + GOTO(out, rc = -EINVAL); + if (ost_idx >= lov->desc.ld_tgt_count) + GOTO(out, rc = -EINVAL); + + for (i = 0; i < lsm->lsm_stripe_count; i++) { + if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) { + if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id) + GOTO(out, rc = -EINVAL); + break; + } + } + if (i == lsm->lsm_stripe_count) + GOTO(out, rc = -EINVAL); + + rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti); +out: + OBD_FREE(obj_mdp, sizeof(*obj_mdp)); + RETURN(rc); +} + /* the LOV expects oa->o_id to be set to the LOV object id */ -static int -lov_create(struct obd_export *exp, struct obdo *src_oa, +static int lov_create(struct obd_export *exp, struct obdo *src_oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { + struct lov_obd *lov; struct lov_request_set *set = NULL; struct list_head *pos; - struct lov_obd *lov; int rc = 0; ENTRY; @@ -863,14 +899,17 @@ lov_create(struct obd_export *exp, struct obdo *src_oa, RETURN(rc); } - LASSERT(ergo(src_oa->o_valid & OBD_MD_FLFLAGS, - !!(src_oa->o_flags & OBD_FL_CREATE_CROW) != - !!(src_oa->o_flags & OBD_FL_RECREATE_OBJS))); - lov = &exp->exp_obd->u.lov; if (!lov->desc.ld_active_tgt_count) RETURN(-EIO); + /* Recreate a specific object id at the given OST index */ + if ((src_oa->o_valid & OBD_MD_FLFLAGS) && + (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) { + rc = lov_recreate(exp, src_oa, ea, oti); + RETURN(rc); + } + rc = lov_prep_create_set(exp, ea, src_oa, oti, &set); if (rc) RETURN(rc); diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index f5b8a97..4fecc71 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -451,7 +451,6 @@ static int fsfilt_ext3_setattr(struct dentry *dentry, void *handle, } unlock_kernel(); - return rc; } diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 788d784..ec6da33 100644 --- a/lustre/mds/handler.c +++ 
b/lustre/mds/handler.c @@ -2021,6 +2021,14 @@ int mds_postrecov(struct obd_device *obd) LASSERT(!obd->obd_recovering); LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL); + /* set nextid first, so we are sure it happens */ + rc = mds_lov_set_nextid(obd); + if (rc) { + CERROR("%s: mds_lov_set_nextid failed\n", + obd->obd_name); + GOTO(out, rc); + } + /* clean PENDING dir */ rc = mds_cleanup_pending(obd); if (rc < 0) { diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index 4b163b8..744ef14 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -106,37 +106,48 @@ int mds_lov_write_objids(struct obd_device *obd) int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid) { - struct lov_stripe_md *empty_ea = NULL; - struct obd_trans_info oti = { 0 }; - struct obdo *oa; int rc; + struct obdo oa; + struct obd_trans_info oti = {0}; + struct lov_stripe_md *empty_ea = NULL; ENTRY; LASSERT(mds->mds_lov_objids != NULL); - oa = obdo_alloc(); - if (oa == NULL) - RETURN(-ENOMEM); - - oa->o_valid = OBD_MD_FLFLAGS; - oa->o_flags = OBD_FL_DELORPHAN; - + /* This create will in fact either create or destroy: If the OST is + * missing objects below this ID, they will be created. If it finds + * objects above this ID, they will be removed. 
*/ + memset(&oa, 0, sizeof(oa)); + oa.o_valid = OBD_MD_FLFLAGS; + oa.o_flags = OBD_FL_DELORPHAN; if (ost_uuid != NULL) { - memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid)); - oa->o_valid |= OBD_MD_FLINLINE; + memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid)); + oa.o_valid |= OBD_MD_FLINLINE; } + rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti); + + RETURN(rc); +} + +/* update the LOV-OSC knowledge of the last used object id's */ +int mds_lov_set_nextid(struct obd_device *obd) +{ + struct mds_obd *mds = &obd->u.mds; + int rc; + ENTRY; - oti.oti_objid = mds->mds_lov_objids; - rc = obd_create(mds->mds_osc_exp, oa, &empty_ea, &oti); + LASSERT(!obd->obd_recovering); - obdo_free(oa); + LASSERT(mds->mds_lov_objids != NULL); + + rc = obd_set_info(mds->mds_osc_exp, strlen("next_id"), "next_id", + mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids); RETURN(rc); } /* update the LOV-OSC knowledge of the last used object id's */ int mds_lov_connect(struct obd_device *obd, char * lov_name) { - struct obd_connect_data *data = NULL; struct mds_obd *mds = &obd->u.mds; struct lustre_handle conn = {0,}; int valsize; @@ -156,15 +167,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) RETURN(-ENOTCONN); } - OBD_ALLOC_PTR(data); - if (!data) - RETURN(-ENOMEM); - data->ocd_connect_flags = OBD_CONNECT_CROW; - rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid, - data); - OBD_FREE_PTR(data); - + NULL /* obd_connect_data */); if (rc) { CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc); mds->mds_osc_obd = ERR_PTR(rc); diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index d6a468a..f688174 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -380,7 +380,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, oa->o_gid = 0; oa->o_mode = S_IFREG | 0600; oa->o_id = inode->i_ino; - oa->o_flags = OBD_FL_CREATE_CROW; oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS | OBD_MD_FLMODE | OBD_MD_FLUID | 
OBD_MD_FLGID; oa->o_size = 0; @@ -445,12 +444,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset, oa->o_generation = body->fid1.generation; oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER; - /* do not set CROW flag in setattr path as it is not needed - * there and only confuses setattr code in filter. */ - oa->o_flags &= ~OBD_FL_CREATE_CROW; - if (!oa->o_flags) - oa->o_valid &= ~OBD_MD_FLFLAGS; - rc = obd_setattr(mds->mds_osc_exp, oa, lsm, &oti); if (rc) { CERROR("error setting attrs for inode %lu: rc %d\n", diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 7c7a755..d800547 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -450,9 +450,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *lh) { + unsigned int ia_valid = rec->ur_iattr.ia_valid; struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; - unsigned int ia_valid = rec->ur_iattr.ia_valid; struct mds_body *body; struct dentry *de; struct inode *inode = NULL; @@ -600,10 +600,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset, mds_pack_inode2fid(&body->fid1, inode); mds_pack_inode2body(body, inode); - /* don't return OST-specific attributes if we didn't just set them. Use - * saved ->ia_valid here, as rec->ur_iattr.ia_valid gets rewritten by - * fsfilt_setattr() what breaks case of truncating file with no object - * on OST and no lsm (test_34c from sanity.sh). --umka */ + /* don't return OST-specific attributes if we didn't just set them. 
*/ if (ia_valid & ATTR_SIZE) body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 81dad57..2f6c647 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -558,12 +558,6 @@ static int filter_cleanup_groups(struct obd_device *obd) int i; ENTRY; - if (filter->fo_blacklist != NULL) { - OBD_FREE(filter->fo_blacklist, - FILTER_GROUPS * sizeof(struct filter_ext)); - filter->fo_blacklist = NULL; - } - if (filter->fo_dentry_O_groups != NULL) { for (i = 0; i < FILTER_GROUPS; i++) { dentry = filter->fo_dentry_O_groups[i]; @@ -616,11 +610,6 @@ static int filter_prep_groups(struct obd_device *obd) int i, rc = 0, cleanup_phase = 0; ENTRY; - OBD_ALLOC(filter->fo_blacklist, - FILTER_GROUPS * sizeof(struct filter_ext)); - if (!filter->fo_blacklist) - GOTO(cleanup, rc = -ENOMEM); - O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1); CDEBUG(D_INODE, "got/created O: %p\n", O_dentry); if (IS_ERR(O_dentry)) { @@ -880,35 +869,34 @@ static void filter_post(struct obd_device *obd) pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); } -static void filter_set_last_id(struct filter_obd *filter, - int group, obd_id id) +static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa, + obd_id id) { + obd_gr group = 0; LASSERT(filter->fo_fsd != NULL); - LASSERT(group <= FILTER_GROUPS); - spin_lock(&filter->fo_objidlock); - filter->fo_last_objids[group] = id; - spin_unlock(&filter->fo_objidlock); -} - -static void filter_grow_last_id(struct filter_obd *filter, - int group, obd_id id) -{ - LASSERT(filter->fo_fsd != NULL); - LASSERT(group <= FILTER_GROUPS); + if (oa != NULL) { + LASSERT(oa->o_gr <= FILTER_GROUPS); + group = oa->o_gr; + } spin_lock(&filter->fo_objidlock); - if (id > filter->fo_last_objids[group]) filter->fo_last_objids[group] = id; spin_unlock(&filter->fo_objidlock); } -__u64 filter_last_id(struct filter_obd *filter, int group) +__u64 
filter_last_id(struct filter_obd *filter, struct obdo *oa) { obd_id id; + obd_gr group = 0; LASSERT(filter->fo_fsd != NULL); - LASSERT(group < FILTER_GROUPS); + if (oa != NULL) { + LASSERT(oa->o_gr <= FILTER_GROUPS); + group = oa->o_gr; + } + + /* FIXME: object groups */ spin_lock(&filter->fo_objidlock); id = filter->fo_last_objids[group]; spin_unlock(&filter->fo_objidlock); @@ -916,46 +904,12 @@ __u64 filter_last_id(struct filter_obd *filter, int group) return id; } -static void filter_lock_dentry(struct obd_device *obd, - struct dentry *dparent) +static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent) { down(&dparent->d_inode->i_sem); + return 0; } -static void filter_unlock_dentry(struct obd_device *obd, - struct dentry *dparent) -{ - up(&dparent->d_inode->i_sem); -} - -static void filter_parents_access(struct obd_device *obd, - obd_gr group, int lock) -{ - void (*access_func) (struct obd_device *, struct dentry *); - struct filter_obd *filter = &obd->u.filter; - struct dentry *dparent; - int i = 0; - - access_func = lock ? 
filter_lock_dentry : - filter_unlock_dentry; - - if (group > 0 || filter->fo_subdir_count == 0) { - dparent = filter->fo_dentry_O_groups[group]; - access_func(obd, dparent); - } else { - for (i = 0; i < filter->fo_subdir_count; i++) { - dparent = filter->fo_dentry_O_sub[i]; - access_func(obd, dparent); - } - } -} - -#define LOCK_PARENTS(obd, group) \ - filter_parents_access(obd, group, 1) - -#define UNLOCK_PARENTS(obd, group) \ - filter_parents_access(obd, group, 0) - /* We never dget the object parent, so DON'T dput it either */ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) { @@ -972,22 +926,22 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid) struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group, obd_id objid) { - struct dentry *dparent = filter_parent(obd, group, objid); unsigned long now = jiffies; + struct dentry *dparent = filter_parent(obd, group, objid); + int rc; if (IS_ERR(dparent)) return dparent; - filter_lock_dentry(obd, dparent); + rc = filter_lock_dentry(obd, dparent); fsfilt_check_slow(now, obd_timeout, "parent lock"); - return dparent; + return rc ? ERR_PTR(rc) : dparent; } -/* we never dget the object parent, so DON'T dput it either */ -static void filter_parent_unlock(struct obd_device *obd, - struct dentry *dparent) +/* We never dget the object parent, so DON'T dput it either */ +static void filter_parent_unlock(struct dentry *dparent) { - filter_unlock_dentry(obd, dparent); + up(&dparent->d_inode->i_sem); } /* How to get files, dentries, inodes from object id's. 
@@ -1029,7 +983,7 @@ struct dentry *filter_fid2dentry(struct obd_device *obd, dparent->d_name.len, dparent->d_name.name, name); dchild = /*ll_*/lookup_one_len(name, dparent, len); if (dir_dentry == NULL) - filter_parent_unlock(obd, dparent); + filter_parent_unlock(dparent); if (IS_ERR(dchild)) { CERROR("%s: child lookup error %ld\n", obd->obd_name, PTR_ERR(dchild)); @@ -1075,8 +1029,9 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid) /* Caller must hold LCK_PW on parent and push us into kernel context. * Caller is also required to ensure that dchild->d_inode exists. */ -static int filter_unlink(struct obd_device *obd, obd_id objid, - struct dentry *dparent, struct dentry *dchild) +static int filter_destroy_internal(struct obd_device *obd, obd_id objid, + struct dentry *dparent, + struct dentry *dchild) { struct inode *inode = dchild->d_inode; int rc; @@ -1425,8 +1380,8 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, GOTO(err_mntput, rc); filter->fo_destroy_in_progress = 0; - - spin_lock_init(&filter->fo_blacklist_lock); + sema_init(&filter->fo_create_lock, 1); + spin_lock_init(&filter->fo_translock); spin_lock_init(&filter->fo_objidlock); spin_lock_init(&filter->fo_stats_lock); @@ -1685,7 +1640,6 @@ static int filter_cleanup(struct obd_device *obd) static int filter_connect_internal(struct obd_export *exp, struct obd_connect_data *data) { - struct filter_obd *filter = &exp->exp_obd->u.filter; if (data != NULL) { CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64 " ocd_version: %x ocd_grant: %d\n", @@ -1697,16 +1651,6 @@ static int filter_connect_internal(struct obd_export *exp, exp->exp_connect_flags = data->ocd_connect_flags; data->ocd_version = LUSTRE_VERSION_CODE; - if (!(filter->fo_fsd->fsd_feature_rocompat & - cpu_to_le32(OBD_ROCOMPAT_CROW)) && - data->ocd_connect_flags & OBD_CONNECT_CROW) { - filter->fo_fsd->fsd_feature_rocompat |= - cpu_to_le32(OBD_ROCOMPAT_CROW); - 
filter_update_server_data(exp->exp_obd, - filter->fo_rcvd_filp, - filter->fo_fsd, 1); - } - if (exp->exp_connect_flags & OBD_CONNECT_GRANT) { obd_size left, want; @@ -2014,6 +1958,45 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa, RETURN(rc); } +/* this should be enabled/disabled in condition to enabled/disabled large inodes + * in backing store FS. */ +int filter_update_fidea(struct obd_export *exp, struct inode *inode, + void *handle, struct obdo *oa) +{ + struct obd_device *obd = exp->exp_obd; + int rc = 0; + ENTRY; + + if (oa->o_valid & OBD_MD_FLFID) { + struct filter_fid ff; + obd_gr group = 0; + + if (oa->o_valid & OBD_MD_FLGROUP) + group = oa->o_gr; + + /* packing fid and converting it to LE for storing into EA. Here + * ->o_stripe_idx should be filled by LOV and rest of fields - + * by client. */ + ff.ff_fid.id = cpu_to_le64(oa->o_fid); + ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx); + ff.ff_fid.generation = cpu_to_le32(oa->o_generation); + ff.ff_objid = cpu_to_le64(oa->o_id); + ff.ff_group = cpu_to_le64(group); + + CDEBUG(D_INODE, "storing filter fid EA ("LPU64"/%u/%u" + LPU64"/"LPU64")\n", oa->o_fid, oa->o_stripe_idx, + oa->o_generation, oa->o_id, group); + + rc = fsfilt_set_md(obd, inode, handle, &ff, sizeof(ff)); + if (rc) + CERROR("store fid in object failed! 
rc: %d\n", rc); + } else { + CDEBUG(D_HA, "OSS object without fid info!\n"); + } + + RETURN(rc); +} + /* this is called from filter_truncate() until we have filter_punch() */ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, struct obdo *oa, struct obd_trans_info *oti) @@ -2021,14 +2004,17 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, unsigned int orig_ids[MAXQUOTAS] = {0, 0}; struct llog_cookie *fcc = NULL; struct filter_obd *filter; + int rc, err, locked = 0; + struct inode *inode; struct iattr iattr; void *handle; - int rc, err; ENTRY; LASSERT(dentry != NULL); LASSERT(!IS_ERR(dentry)); - LASSERT(dentry->d_inode != NULL); + + inode = dentry->d_inode; + LASSERT(inode != NULL); filter = &exp->exp_obd->u.filter; iattr_from_obdo(&iattr, oa, oa->o_valid); @@ -2039,16 +2025,48 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc)); } - if (iattr.ia_valid & ATTR_SIZE) - down(&dentry->d_inode->i_sem); + if (iattr.ia_valid & ATTR_SIZE || iattr.ia_valid & (ATTR_UID | ATTR_GID)) { + down(&inode->i_sem); + locked = 1; + } + /* If the inode still has SUID+SGID bits set (see filter_precreate()) + * then we will accept the UID+GID sent by the client during write for + * initializing the ownership of this inode. We only allow this to + * happen once so clear these bits in setattr. In 2.6 kernels it is + * possible to get ATTR_UID and ATTR_GID separately, so we only clear + * the flags that are actually being set. 
*/ if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) { - orig_ids[USRQUOTA] = dentry->d_inode->i_uid; - orig_ids[GRPQUOTA] = dentry->d_inode->i_gid; - handle = fsfilt_start_log(exp->exp_obd, dentry->d_inode, + CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n", + (unsigned long)oa->o_uid, (unsigned long)oa->o_gid); + + if ((inode->i_mode & S_ISUID) && + (iattr.ia_valid & ATTR_UID)) { + if (!(iattr.ia_valid & ATTR_MODE)) { + iattr.ia_mode = inode->i_mode; + iattr.ia_valid |= ATTR_MODE; + } + iattr.ia_mode &= ~S_ISUID; + } + if ((inode->i_mode & S_ISGID) && + (iattr.ia_valid & ATTR_GID)) { + if (!(iattr.ia_valid & ATTR_MODE)) { + iattr.ia_mode = inode->i_mode; + iattr.ia_valid |= ATTR_MODE; + } + iattr.ia_mode &= ~S_ISGID; + } + + orig_ids[USRQUOTA] = inode->i_uid; + orig_ids[GRPQUOTA] = inode->i_gid; + handle = fsfilt_start_log(exp->exp_obd, inode, FSFILT_OP_SETATTR, oti, 1); + + /* update inode EA only once */ + if (inode->i_mode & S_ISUID || inode->i_mode & S_ISGID) + filter_update_fidea(exp, inode, handle, oa); } else { - handle = fsfilt_start(exp->exp_obd, dentry->d_inode, + handle = fsfilt_start(exp->exp_obd, inode, FSFILT_OP_SETATTR, oti); } @@ -2056,7 +2074,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, GOTO(out_unlock, rc = PTR_ERR(handle)); if (oa->o_valid & OBD_MD_FLFLAGS) { - rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL, + rc = fsfilt_iocontrol(exp->exp_obd, inode, NULL, EXT3_IOC_SETFLAGS, (long)&oa->o_flags); } else { rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1); @@ -2068,9 +2086,14 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, fcc); } + if (locked) { + up(&inode->i_sem); + locked = 0; + } + rc = filter_finish_transno(exp, oti, rc); - err = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0); + err = fsfilt_commit(exp->exp_obd, inode, handle, 0); if (err) { CERROR("error on commit, err = %d\n", err); if (!rc) @@ -2078,8 +2101,8 @@ int filter_setattr_internal(struct 
obd_export *exp, struct dentry *dentry, } EXIT; out_unlock: - if (iattr.ia_valid & ATTR_SIZE) - up(&dentry->d_inode->i_sem); + if (locked) + up(&inode->i_sem); /* trigger quota release */ if (iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) { @@ -2105,16 +2128,13 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa, int rc; ENTRY; - //LASSERT(oti != NULL); - + dentry = __filter_oa2dentry(exp->exp_obd, oa, + __FUNCTION__, 1); + if (IS_ERR(dentry)) + RETURN(PTR_ERR(dentry)); + filter = &exp->exp_obd->u.filter; push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - - /* make sure that object is allocated. */ - dentry = filter_crow_object(exp->exp_obd, oa); - if (IS_ERR(dentry)) - GOTO(out_pop, rc = PTR_ERR(dentry)); - lock_kernel(); /* setting objects attributes (including owner/group) */ @@ -2142,7 +2162,6 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa, out_unlock: unlock_kernel(); f_dput(dentry); -out_pop: pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); return rc; } @@ -2197,6 +2216,96 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp, RETURN(lsm_size); } +static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa, + struct filter_obd *filter) +{ + struct obdo doa; /* XXX obdo on stack */ + __u64 last, id; + ENTRY; + LASSERT(oa); + + memset(&doa, 0, sizeof(doa)); + if (oa->o_valid & OBD_MD_FLGROUP) { + doa.o_valid |= OBD_MD_FLGROUP; + doa.o_gr = oa->o_gr; + } else { + doa.o_gr = 0; + } + doa.o_mode = S_IFREG; + + filter->fo_destroy_in_progress = 1; + down(&filter->fo_create_lock); + if (!filter->fo_destroy_in_progress) { + CERROR("%s: destroy_in_progress already cleared\n", + exp->exp_obd->obd_name); + up(&filter->fo_create_lock); + EXIT; + return; + } + + last = filter_last_id(filter, &doa); + CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n", + exp->exp_obd->obd_name, oa->o_id + 1, last); + for (id = oa->o_id + 1; id <= last; id++) { + doa.o_id = id; + 
filter_destroy(exp, &doa, NULL, NULL, NULL); + } + + CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n", + exp->exp_obd->obd_name, doa.o_gr, oa->o_id); + + spin_lock(&filter->fo_objidlock); + filter->fo_last_objids[doa.o_gr] = oa->o_id; + spin_unlock(&filter->fo_objidlock); + + filter->fo_destroy_in_progress = 0; + up(&filter->fo_create_lock); + + EXIT; +} + +/* returns a negative error or a nonnegative number of files to create */ +static int filter_should_precreate(struct obd_export *exp, struct obdo *oa, + obd_gr group) +{ + struct obd_device *obd = exp->exp_obd; + struct filter_obd *filter = &obd->u.filter; + int diff, rc; + ENTRY; + + diff = oa->o_id - filter_last_id(filter, oa); + CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n", + filter_last_id(filter, oa), diff); + + /* delete orphans request */ + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_DELORPHAN)) { + if (diff >= 0) + RETURN(diff); + if (-diff > OST_MAX_PRECREATE) { + CERROR("%s: ignoring bogus orphan destroy request: " + "obdid "LPU64" last_id "LPU64"\n", obd->obd_name, + oa->o_id, filter_last_id(filter, oa)); + RETURN(-EINVAL); + } + filter_destroy_precreated(exp, oa, filter); + rc = filter_update_last_objid(obd, group, 0); + if (rc) + CERROR("%s: unable to write lastobjid, but orphans" + "were deleted\n", obd->obd_name); + RETURN(0); + } else { + /* only precreate if group == 0 and o_id is specfied */ + if (!(oa->o_valid & OBD_FL_DELORPHAN) && + (group != 0 || oa->o_id == 0)) + RETURN(1); + + LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name, + oa->o_id, filter_last_id(filter, oa), diff); + RETURN(diff); + } +} + static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, unsigned long max_age) { @@ -2233,185 +2342,231 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs, RETURN(rc); } -struct dentry * -filter_create_object(struct obd_device *obd, struct obdo *oa) +/* We rely on the fact that only one 
thread will be creating files in a given + * group at a time, which is why we don't need an atomic filter_get_new_id. + * Even if we had that atomic function, the following race would exist: + * + * thread 1: gets id x from filter_next_id + * thread 2: gets id (x + 1) from filter_next_id + * thread 2: creates object (x + 1) + * thread 1: tries to create object x, gets -ENOSPC + */ +static int filter_precreate(struct obd_device *obd, struct obdo *oa, + obd_gr group, int *num) { - struct dentry *dparent = NULL; - struct dentry *dchild = NULL; - struct lvfs_ucred uc = {0,}; - struct lvfs_run_ctxt saved; + struct dentry *dchild = NULL, *dparent = NULL; struct filter_obd *filter; - int cleanup_phase = 0; - int err = 0, rc = 0; + struct obd_statfs *osfs; + int err = 0, rc = 0, recreate_obj = 0, i; + unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3; + __u64 next_id; void *handle = NULL; - obd_gr group = 0; ENTRY; filter = &obd->u.filter; - CDEBUG(D_INFO, "create objid "LPU64"\n", oa->o_id); - - if (oa->o_valid & OBD_MD_FLGROUP) - group = oa->o_gr; - - dparent = filter_parent_lock(obd, group, oa->o_id); - if (IS_ERR(dparent)) - GOTO(cleanup, dchild = dparent); - cleanup_phase = 1; - - /* check if object is in blacklist. This should be done under parent - * lock. 
*/ - spin_lock(&filter->fo_blacklist_lock); - if (oa->o_id > filter->fo_blacklist[group].fe_start && - oa->o_id <= filter->fo_blacklist[group].fe_end) { - spin_unlock(&filter->fo_blacklist_lock); - GOTO(cleanup, dchild = ERR_PTR(-ENOENT)); - } - spin_unlock(&filter->fo_blacklist_lock); - - /* check if object is already allocated */ - dchild = filter_fid2dentry(obd, dparent, group, oa->o_id); - if (IS_ERR(dchild)) - GOTO(cleanup, dchild); - - /* Files that already exist should only be below or at last_id */ - if (dchild->d_inode) { - __u64 last_id = filter_last_id(filter, group); - - LASSERTF(oa->o_id <= last_id, - "existing objid "LPU64" larger than last_id "LPU64"\n", - oa->o_id, last_id); - GOTO(cleanup, dchild); + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + recreate_obj = 1; + } else { + OBD_ALLOC(osfs, sizeof(*osfs)); + if (osfs == NULL) + RETURN(-ENOMEM); + rc = filter_statfs(obd, osfs, jiffies - HZ); + if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { + CDEBUG(D_HA, "OST out of space! avail "LPU64"\n", + osfs->os_bavail<fo_obt.obt_sb->s_blocksize_bits); + *num=0; + rc = -ENOSPC; + } + OBD_FREE(osfs, sizeof(*osfs)); + if (rc) { + RETURN(rc); + } } - /* create new object */ - handle = fsfilt_start_log(obd, dparent->d_inode, - FSFILT_OP_CREATE, NULL, 1); - if (IS_ERR(handle)) - GOTO(cleanup, dchild = handle); - cleanup_phase = 2; - - uc.luc_fsuid = oa->o_valid & OBD_MD_FLUID ? - oa->o_uid : 0; - uc.luc_fsgid = oa->o_valid & OBD_MD_FLGID ? 
- oa->o_gid : 0; - uc.luc_cap = current->cap_effective; + CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num); - cap_raise(uc.luc_cap, CAP_SYS_RESOURCE); + down(&filter->fo_create_lock); - push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); - rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL); - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc); + for (i = 0; i < *num && err == 0; i++) { + int cleanup_phase = 0; - if (rc) { - CERROR("create failed rc = %d\n", rc); - f_dput(dchild); - GOTO(cleanup, dchild = ERR_PTR(rc)); - } + if (filter->fo_destroy_in_progress) { + CWARN("%s: precreate aborted by destroy\n", + obd->obd_name); + break; + } - /* grow last created object id. */ - filter_grow_last_id(filter, group, oa->o_id); - rc = filter_update_last_objid(obd, group, 0); - if (rc) { - CERROR("unable to write lastobjid, but " - "object is created, err = %d\n", - rc); - rc = 0; - } + if (recreate_obj) { + __u64 last_id; + next_id = oa->o_id; + last_id = filter_last_id(filter, oa); + if (next_id > last_id) { + CERROR("Error: Trying to recreate obj greater" + "than last id "LPD64" > "LPD64"\n", + next_id, last_id); + GOTO(cleanup, rc = -EINVAL); + } + } else + next_id = filter_last_id(filter, oa) + 1; + + CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id); + + dparent = filter_parent_lock(obd, group, next_id); + if (IS_ERR(dparent)) + GOTO(cleanup, rc = PTR_ERR(dparent)); + cleanup_phase = 1; + + dchild = filter_fid2dentry(obd, dparent, group, next_id); + if (IS_ERR(dchild)) + GOTO(cleanup, rc = PTR_ERR(dchild)); + cleanup_phase = 2; + + if (dchild->d_inode != NULL) { + /* This would only happen if lastobjid was bad on disk*/ + /* Could also happen if recreating missing obj but + * already exists + */ + if (recreate_obj) { + CERROR("%s: recreating existing object %.*s?\n", + obd->obd_name, dchild->d_name.len, + dchild->d_name.name); + } else { + CERROR("%s: Serious error: objid %.*s already " + "exists; is this filesystem corrupt?\n", + obd->obd_name, 
dchild->d_name.len, + dchild->d_name.name); + LBUG(); + } + GOTO(cleanup, rc = -EEXIST); + } - /* nobody else is touching this newly created object */ - LASSERT(dchild->d_inode); + handle = fsfilt_start_log(obd, dparent->d_inode, + FSFILT_OP_CREATE, NULL, 1); + if (IS_ERR(handle)) + GOTO(cleanup, rc = PTR_ERR(handle)); + cleanup_phase = 3; - if (oa->o_valid & OBD_MD_FLFID) { - struct filter_fid ff; + rc = ll_vfs_create(dparent->d_inode, dchild, + S_IFREG | S_ISUID | S_ISGID | 0666, NULL); + if (rc) { + CERROR("create failed rc = %d\n", rc); + GOTO(cleanup, rc); + } - /* packing fid and converting it to LE for storing into EA. Here - * oa->o_stripe_idx should be filled by LOV and rest of fields - - * by client. */ - ff.ff_fid.id = cpu_to_le64(oa->o_fid); - ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx); - ff.ff_fid.generation = cpu_to_le32(oa->o_generation); - ff.ff_objid = cpu_to_le64(oa->o_id); - ff.ff_group = cpu_to_le64(group); + if (!recreate_obj) { + filter_set_last_id(filter, oa, next_id); + err = filter_update_last_objid(obd, group, 0); + if (err) + CERROR("unable to write lastobjid " + "but file created\n"); + } - down(&dchild->d_inode->i_sem); - rc = fsfilt_set_md(obd, dchild->d_inode, handle,&ff,sizeof(ff)); - up(&dchild->d_inode->i_sem); - if (rc) { - CERROR("store fid in object failed! 
rc:%d\n", rc); + cleanup: + switch(cleanup_phase) { + case 3: + err = fsfilt_commit(obd, dparent->d_inode, handle, 0); + if (err) { + CERROR("error on commit, err = %d\n", err); + if (!rc) + rc = err; + } + case 2: f_dput(dchild); - GOTO(cleanup, dchild = ERR_PTR(rc)); + case 1: + filter_parent_unlock(dparent); + case 0: + break; } - } else { - CDEBUG(D_HA, "create OSS object without fid!\n"); - } -cleanup: - switch(cleanup_phase) { - case 2: - err = fsfilt_commit(obd, dparent->d_inode, handle, 0); - if (err) { - CERROR("error on commit, err = %d\n", err); - if (!rc) { - rc = err; - f_dput(dchild); - dchild = ERR_PTR(rc); - } + if (rc) + break; + if (time_after(jiffies, enough_time)) { + CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n", + obd->obd_name, *num, i); + break; } - case 1: - filter_parent_unlock(obd, dparent); - case 0: - break; } + *num = i; - RETURN(dchild); + up(&filter->fo_create_lock); + + CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n", + obd->obd_name, group, filter->fo_last_objids[group]); + + CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n", + obd->obd_name, i); + RETURN(rc); } -struct dentry * -filter_crow_object(struct obd_device *obd, struct obdo *oa) +static int filter_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti) { - struct filter_obd *filter; - struct dentry *dentry; + struct obd_device *obd = NULL; + struct lvfs_run_ctxt saved; + struct lov_stripe_md *lsm = NULL; obd_gr group = 0; + int rc = 0, diff; ENTRY; - if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CROW_EIO)) - RETURN(ERR_PTR(-EIO)); - - filter = &obd->u.filter; - if (oa->o_valid & OBD_MD_FLGROUP) group = oa->o_gr; - /* try to create new object (if it is not yet) */ - dentry = filter_create_object(obd, oa); - if (IS_ERR(dentry)) { - CERROR("cannot create OSS object "LPU64"/"LPU64 - ", err = %d\n", oa->o_id, group, - (int)PTR_ERR(dentry)); - RETURN(dentry); + CDEBUG(D_INFO, 
"filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n", + group, oa->o_id); + if (ea != NULL) { + lsm = *ea; + if (lsm == NULL) { + rc = obd_alloc_memmd(exp, &lsm); + if (rc < 0) + RETURN(rc); + } } - RETURN(dentry); + obd = exp->exp_obd; + push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + (oa->o_flags & OBD_FL_RECREATE_OBJS)) { + if (oa->o_id > filter_last_id(&obd->u.filter, oa)) { + CERROR("recreate objid "LPU64" > last id "LPU64"\n", + oa->o_id, filter_last_id(&obd->u.filter, oa)); + rc = -EINVAL; + } else { + diff = 1; + rc = filter_precreate(obd, oa, group, &diff); + } + } else { + diff = filter_should_precreate(exp, oa, group); + if (diff > 0) { + oa->o_id = filter_last_id(&obd->u.filter, oa); + rc = filter_precreate(obd, oa, group, &diff); + oa->o_id = filter_last_id(&obd->u.filter, oa); + oa->o_valid = OBD_MD_FLID; + } + } + + pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (rc && ea != NULL && *ea != lsm) { + obd_free_memmd(exp, &lsm); + } else if (rc == 0 && ea != NULL) { + /* XXX LOV STACKING: the lsm that is passed to us from + * LOV does not have valid lsm_oinfo data structs, so + * don't go touching that. This needs to be fixed in a + * big way. */ + lsm->lsm_object_id = oa->o_id; + *ea = lsm; + } + + RETURN(rc); } -/* destroys object @oa. Takes care of locking if @lock says that parent is not - * yet locked. Also drops parent lock before taking ldlm PW lock to avoid - * deadlocks in lock retraction related paths. - * - * This function does not change locking and does not imply hiden locking - * knowladge. After this fucntion is finished, all parents stay at the same - * locking state. - - * If @lock == 1, this means that parent of @oa is not locked and should be - * locked for destroy operation. However, after operation is finished, parent - * will be unlocked. The same is true about opposite case, when parent is - * already locked and filter_destroy_internal() does not need to lock it. 
*/ -static int -filter_destroy_internal(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *oti, - int lock) +int filter_destroy(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md *md, struct obd_trans_info *oti, + struct obd_export *md_exp) { + unsigned int qcids[MAXQUOTAS] = {0, 0}; struct obd_device *obd; struct filter_obd *filter; struct dentry *dchild = NULL, *dparent = NULL; @@ -2419,7 +2574,6 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa, void *handle = NULL; struct llog_cookie *fcc = NULL; int rc, rc2, cleanup_phase = 0, have_prepared = 0; - unsigned int qcids[MAXQUOTAS] = {0, 0}; obd_gr group = 0; ENTRY; @@ -2432,9 +2586,7 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa, push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); acquire_locks: - dparent = lock ? - filter_parent_lock(obd, group, oa->o_id): - filter_parent(obd, group, oa->o_id); + dparent = filter_parent_lock(obd, group, oa->o_id); if (IS_ERR(dparent)) GOTO(cleanup, rc = PTR_ERR(dparent)); cleanup_phase = 1; @@ -2457,11 +2609,11 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa, } if (!have_prepared) { - /* If we're really going to destroy the object, get ready by - * getting the clients to discard their cached data. + /* If we're really going to destroy the object, get ready + * by getting the clients to discard their cached data. * * We have to drop the parent lock, because - * filter_prepare_destroy() will acquire a PW on the object, and + * filter_prepare_destroy will acquire a PW on the object, and * we don't want to deadlock with an incoming write to the * object, which has the extent PW and then wants to get the * parent dentry to do the lookup. @@ -2470,15 +2622,9 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa, * complication of condition the above code to skip it on the * second time through. 
*/ f_dput(dchild); + filter_parent_unlock(dparent); - filter_unlock_dentry(obd, dparent); filter_prepare_destroy(obd, oa->o_id); - - /* lock parent dentry again, to keep locking state the same as - * before calling this function. */ - if (!lock) - filter_lock_dentry(obd, dparent); - have_prepared = 1; goto acquire_locks; } @@ -2497,8 +2643,9 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa, /* Quota release need uid/gid of inode */ obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID|OBD_MD_FLGID); - rc = filter_unlink(obd, oa->o_id, dparent, dchild); + rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild); + EXIT; cleanup: switch(cleanup_phase) { case 3: @@ -2517,8 +2664,7 @@ cleanup: case 2: f_dput(dchild); case 1: - if (lock) - filter_parent_unlock(obd, dparent); + filter_parent_unlock(dparent); case 0: pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); break; @@ -2534,207 +2680,7 @@ cleanup: FSFILT_OP_UNLINK); CDEBUG(rc2 ? D_ERROR : D_QUOTA, "filter adjust qunit! (rc:%d)\n", rc2); - - RETURN(rc); -} - -/* destroy oject with taking lock on parent first. 
*/ -int filter_destroy(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *oti, - struct obd_export *md_exp) -{ - int rc; - - ENTRY; - rc = filter_destroy_internal(exp, oa, md, oti, 1); - RETURN(rc); -} - -static int -filter_clear_orphans(struct obd_export *exp, struct obdo *oa) -{ - struct filter_obd *filter; - struct obd_device *obd; - struct obdo *doa; - obd_gr group = 0; - int rc, orphans; - __u64 last, id; - ENTRY; - - LASSERT(oa); - - OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE); - - obd = exp->exp_obd; - filter = &obd->u.filter; - - if (oa->o_valid & OBD_MD_FLGROUP) - group = oa->o_gr; - - filter->fo_destroy_in_progress = 1; - - LOCK_PARENTS(obd, group); - if (!filter->fo_destroy_in_progress) { - UNLOCK_PARENTS(obd, group); - CDEBUG(D_HA, "cleanup orphans is already canceled\n"); - RETURN(0); - } - - last = filter_last_id(filter, group); - orphans = last - oa->o_id; - - if (orphans <= 0) { - filter->fo_destroy_in_progress = 0; - UNLOCK_PARENTS(obd, group); - CDEBUG(D_HA, "nothing to cleanup, MDS objid "LPU64 - " is not bigger than OST one "LPU64"\n", - oa->o_id, last); - RETURN(0); - } - - CDEBUG(D_HA, "adding orphans extent "LPU64":"LPU64"-"LPU64 - " to blacklist\n", group, oa->o_id, last); - - /* making all orphans entries in blacklist, that will deny to re-create - * them by CROW in filter_create_object(). This is done for case when - * orphans already exist on client and will be tried to write something - * and we want to stop them. - * - * In fact the issue is even worse, as we want to put in blacklist not - * only the objects which we just destroed, but also those which not yet - * created on OST (and OST has no idea about) but possibly existing on - * clients. 
*/ - spin_lock(&filter->fo_blacklist_lock); - filter->fo_blacklist[group].fe_start = oa->o_id; - filter->fo_blacklist[group].fe_end = last; - spin_unlock(&filter->fo_blacklist_lock); - - doa = obdo_alloc(); - if (doa == NULL) { - filter->fo_destroy_in_progress = 0; - UNLOCK_PARENTS(obd, group); - RETURN(-ENOMEM); - } - - doa->o_gr = group; - doa->o_mode = S_IFREG; - doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID); - - CDEBUG(D_ERROR, "%s:["LPU64"] deleting orphan objects from "LPU64" to " - LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id, last); - - for (id = last; id > oa->o_id; id--) { - doa->o_id = id; - - /* remove object @doa. It will not lock parent as parents - * already locked. */ - filter_destroy_internal(exp, doa, NULL, NULL, 0); - - /* update last id just for case when OST will down in cleanup - * orphans time. */ - filter_set_last_id(filter, group, id); - - /* update last_id on disk periodicaly */ - if ((id & 1023) == 0) - filter_update_last_objid(obd, group, 0); - } - - UNLOCK_PARENTS(obd, group); - - /* return next free id to be used as a new start of sequence. As we - * return last id from OST, this will make sure that MDS will start new - * sequence from object id which is far from existing and there will not - * be object id sharing. 
*/ - oa->o_id = last + 1; - filter_set_last_id(filter, group, oa->o_id); - - CDEBUG(D_ERROR, "%s:["LPU64"] after destroy: set last_objids = " - LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id); - - rc = filter_update_last_objid(obd, group, 1); - filter->fo_destroy_in_progress = 0; - - obdo_free(doa); - RETURN(rc); -} - -static int filter_create(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md **ea, struct obd_trans_info *oti) -{ - struct filter_export_data *fed; - struct lvfs_run_ctxt saved; - struct filter_obd *filter; - obd_gr group = oa->o_gr; - struct obd_device *obd; - int rc = 0; - ENTRY; - - obd = exp->exp_obd; - fed = &exp->exp_filter_data; - filter = &obd->u.filter; - - CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n", - group, oa->o_id); - - if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) { - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - rc = filter_clear_orphans(exp, oa); - if (rc) { - CERROR("cannot clear orphans starting from " - LPU64", err = %d\n", oa->o_id, rc); - } - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - RETURN(rc); - } - - LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS, - !!(oa->o_flags & OBD_FL_CREATE_CROW) != - !!(oa->o_flags & OBD_FL_RECREATE_OBJS))); - - /* all non-CROW creates should end up here */ - if (OBDO_URGENT_CREATE(oa)) { - struct obd_statfs *osfs; - struct dentry *dentry; - - /* check space first. As this is real create and client does not - * have yet file created, this is good place to check space. */ - OBD_ALLOC_PTR(osfs); - if (!osfs) - RETURN(-ENOMEM); - - rc = filter_statfs(obd, osfs, jiffies - HZ); - if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) { - CDEBUG(D_HA, "OST out of space! 
avail "LPU64"\n", - osfs->os_bavail << filter->fo_obt.obt_sb->s_blocksize_bits); - rc = -ENOSPC; - } - - OBD_FREE_PTR(osfs); - if (rc) - RETURN(rc); - - dentry = filter_create_object(obd, oa); - if (!IS_ERR(dentry)) { - f_dput(dentry); - if (ea != NULL) { - struct lov_stripe_md *lsm = *ea; - if (lsm == NULL) { - rc = obd_alloc_memmd(exp, &lsm); - if (rc) - RETURN(rc); - } - lsm->lsm_object_id = oa->o_id; - *ea = lsm; - rc = 0; - } - } - } else { - CERROR("wrong @oa flags detected 0x%lx. Not an urgent " - "create and not recovery.\n",(unsigned long)oa->o_flags); - LBUG(); - } - RETURN(rc); + return rc; } /* NB start and end are used for punch, but not truncate */ diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 0b6daf5..989c990 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -29,9 +29,9 @@ #define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */ #define FILTER_GROUPS 3 /* must be at least 3; not dynamic yet */ -#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */ +#define FILTER_ROCOMPAT_SUPP (0) -#define FILTER_ROCOMPAT_SUPP (OBD_ROCOMPAT_CROW) +#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */ #define FILTER_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS) @@ -103,20 +103,20 @@ struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir, struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa, const char *what, int quiet); #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0) -#define filter_oa2dentry_quiet(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 1) int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc); -__u64 filter_last_id(struct filter_obd *, int group); +__u64 filter_next_id(struct filter_obd *, struct obdo *); +__u64 filter_last_id(struct filter_obd *, struct obdo *); +int filter_update_fidea(struct obd_export *exp, struct inode 
*inode, + void *handle, struct obdo *oa); int filter_update_server_data(struct obd_device *, struct file *, struct filter_server_data *, int force_sync); int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync); int filter_common_setup(struct obd_device *, obd_count len, void *buf, void *option); int filter_destroy(struct obd_export *exp, struct obdo *oa, - struct lov_stripe_md *md, struct obd_trans_info *, - struct obd_export *md_exp); -struct dentry *filter_crow_object(struct obd_device *obd, struct obdo *oa); - + struct lov_stripe_md *md, struct obd_trans_info *, + struct obd_export *); int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, struct obdo *oa, struct obd_trans_info *oti); int filter_setattr(struct obd_export *exp, struct obdo *oa, diff --git a/lustre/obdfilter/filter_io.c b/lustre/obdfilter/filter_io.c index c2e7219..da25e3c 100644 --- a/lustre/obdfilter/filter_io.c +++ b/lustre/obdfilter/filter_io.c @@ -296,20 +296,16 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, iobuf = filter_iobuf_get(&obd->u.filter, oti); - dentry = filter_oa2dentry_quiet(obd, oa); + dentry = filter_oa2dentry(obd, oa); if (IS_ERR(dentry)) { - if (PTR_ERR(dentry) == -ENOENT) { - dentry = NULL; - inode = NULL; - } else { - dentry = NULL; - GOTO(cleanup, rc = PTR_ERR(dentry)); - } - } else { - inode = dentry->d_inode; + rc = PTR_ERR(dentry); + dentry = NULL; + GOTO(cleanup, rc); } - - if (oa && inode != NULL) + + inode = dentry->d_inode; + + if (oa) obdo_to_inode(inode, oa, OBD_MD_FLATIME); fsfilt_check_slow(now, obd_timeout, "preprw_read setup"); @@ -328,10 +324,9 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, */ LASSERT(lnb->page != NULL); - if (inode == NULL || inode->i_size <= rnb->offset) - /* If there's no more data, or inode is not yet - * allocated by CROW abort early. lnb->rc == 0, so it's - * easy to detect later. 
*/ + if (inode->i_size <= rnb->offset) + /* If there's no more data, abort early. lnb->rc == 0, + * so it's easy to detect later. */ break; else filter_alloc_dio_page(obd, inode, lnb); @@ -348,12 +343,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa, fsfilt_check_slow(now, obd_timeout, "start_page_read"); - if (inode != NULL) { - rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, - exp, NULL, NULL, NULL); - if (rc) - GOTO(cleanup, rc); - } + rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, + exp, NULL, NULL, NULL); + if (rc) + GOTO(cleanup, rc); lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes); @@ -521,19 +514,24 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, LASSERT(objcount == 1); LASSERT(obj->ioo_bufcnt > 0); - OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE); - + push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); iobuf = filter_iobuf_get(&exp->exp_obd->u.filter, oti); + if (iobuf == NULL) + GOTO(cleanup, rc = -ENOMEM); cleanup_phase = 1; - push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); - - /* make sure that object is already allocated */ - dentry = filter_crow_object(exp->exp_obd, oa); + dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, + obj->ioo_id); if (IS_ERR(dentry)) GOTO(cleanup, rc = PTR_ERR(dentry)); cleanup_phase = 2; + if (dentry->d_inode == NULL) { + CERROR("%s: trying to BRW to non-existent file "LPU64"\n", + exp->exp_obd->obd_name, obj->ioo_id); + GOTO(cleanup, rc = -ENOENT); + } + fso.fso_dentry = dentry; fso.fso_bufcnt = obj->ioo_bufcnt; @@ -552,13 +550,12 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa, rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res, &left, dentry->d_inode); - /* We're finishing using body->oa as an input variable, so reset - * o_valid here. 
*/ + /* do not zero out oa->o_valid as it is used in filter_commitrw_write() + * for setting UID/GID and fid EA in first write time. */ if (oa && oa->o_valid & OBD_MD_FLGRANT) { oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left); - oa->o_valid = OBD_MD_FLGRANT; - } else if (oa) - oa->o_valid = 0; + oa->o_valid |= OBD_MD_FLGRANT; + } spin_unlock(&exp->exp_obd->obd_osfs_lock); diff --git a/lustre/obdfilter/filter_io_24.c b/lustre/obdfilter/filter_io_24.c index 1b69b0a..f0e7e27 100644 --- a/lustre/obdfilter/filter_io_24.c +++ b/lustre/obdfilter/filter_io_24.c @@ -428,7 +428,34 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount, fsfilt_check_slow(now, obd_timeout, "brw_start"); - iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME); + i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME; + + /* If the inode still has SUID+SGID bits set (see filter_precreate()) + * then we will accept the UID+GID if sent by the client for + * initializing the ownership of this inode. We only allow this to + * happen once (so clear these bits) and later only allow setattr. 
*/ + if (inode->i_mode & S_ISUID) + i |= OBD_MD_FLUID; + if (inode->i_mode & S_ISGID) + i |= OBD_MD_FLGID; + + iattr_from_obdo(&iattr, oa, i); + if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) { + CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n", + (unsigned long)oa->o_uid, (unsigned long)oa->o_gid); + + cap_raise(current->cap_effective, CAP_SYS_RESOURCE); + + iattr.ia_valid |= ATTR_MODE; + iattr.ia_mode = inode->i_mode; + if (iattr.ia_valid & ATTR_UID) + iattr.ia_mode &= ~S_ISUID; + if (iattr.ia_valid & ATTR_GID) + iattr.ia_mode &= ~S_ISGID; + + rc = filter_update_fidea(exp, inode, oti->oti_handle, oa); + } + /* filter_direct_io drops i_sem */ rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr, oti, &wait_handle); diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index 7821606..e4b9721 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -613,7 +613,34 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, fsfilt_check_slow(now, obd_timeout, "brw_start"); - iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME); + i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME; + + /* If the inode still has SUID+SGID bits set (see filter_precreate()) + * then we will accept the UID+GID if sent by the client for + * initializing the ownership of this inode. We only allow this to + * happen once (so clear these bits) and later only allow setattr. 
*/ + if (inode->i_mode & S_ISUID) + i |= OBD_MD_FLUID; + if (inode->i_mode & S_ISGID) + i |= OBD_MD_FLGID; + + iattr_from_obdo(&iattr, oa, i); + if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) { + CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n", + (unsigned long)oa->o_uid, (unsigned long)oa->o_gid); + + cap_raise(current->cap_effective, CAP_SYS_RESOURCE); + + iattr.ia_valid |= ATTR_MODE; + iattr.ia_mode = inode->i_mode; + if (iattr.ia_valid & ATTR_UID) + iattr.ia_mode &= ~S_ISUID; + if (iattr.ia_valid & ATTR_GID) + iattr.ia_mode &= ~S_ISGID; + + rc = filter_update_fidea(exp, inode, oti->oti_handle, oa); + } + /* filter_direct_io drops i_sem */ rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr, oti, &wait_handle); diff --git a/lustre/obdfilter/filter_lvb.c b/lustre/obdfilter/filter_lvb.c index 1a47b1e..5f5eeaa 100644 --- a/lustre/obdfilter/filter_lvb.c +++ b/lustre/obdfilter/filter_lvb.c @@ -75,28 +75,24 @@ static int filter_lvbo_init(struct ldlm_resource *res) if (IS_ERR(dentry)) RETURN(PTR_ERR(dentry)); - if (dentry->d_inode == NULL) { - lvb->lvb_size = 0; - lvb->lvb_blocks = 0; - - /* making client use MDS mtime as this one is zero, bigger one - * will be taken and this does not break POSIX */ - lvb->lvb_mtime = 0; - } else { - lvb->lvb_size = dentry->d_inode->i_size; - lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime); - lvb->lvb_blocks = dentry->d_inode->i_blocks; - } + if (dentry->d_inode == NULL) + GOTO(out_dentry, rc = -ENOENT); + + lvb->lvb_size = dentry->d_inode->i_size; + lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime); + lvb->lvb_blocks = dentry->d_inode->i_blocks; CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", " "mtime: "LPU64", blocks: "LPU64"\n", res->lr_name.name[0], lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks); + EXIT; +out_dentry: f_dput(dentry); /* Don't free lvb data on lookup error */ - RETURN(rc); + return rc; } /* This will be called in two ways: diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c 
index a4dae3a..b4ca5d9 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -169,6 +169,65 @@ static int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, return rc; } +static int osc_rd_create_count(char *page, char **start, off_t off, int count, + int *eof, void *data) +{ + struct obd_device *obd = data; + + if (obd == NULL) + return 0; + + return snprintf(page, count, "%d\n", + obd->u.cli.cl_oscc.oscc_grow_count); +} + +static int osc_wr_create_count(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + struct obd_device *obd = data; + int val, rc; + + if (obd == NULL) + return 0; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc) + return rc; + + if (val < 0) + return -ERANGE; + if (val > OST_MAX_PRECREATE) + return -ERANGE; + + obd->u.cli.cl_oscc.oscc_grow_count = val; + + return count; +} + +static int osc_rd_prealloc_next_id(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + + if (obd == NULL) + return 0; + + return snprintf(page, count, LPU64"\n", + obd->u.cli.cl_oscc.oscc_next_id); +} + +static int osc_rd_prealloc_last_id(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + struct obd_device *obd = data; + + if (obd == NULL) + return 0; + + return snprintf(page, count, LPU64"\n", + obd->u.cli.cl_oscc.oscc_last_id); +} + static int osc_rd_checksum(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -199,20 +258,6 @@ static int osc_wr_checksum(struct file *file, const char *buffer, return count; } -static int osc_rd_last_id(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - struct osc_creator *oscc = &obd->u.cli.cl_oscc; - int rc; - - *eof = 1; - spin_lock(&oscc->oscc_lock); - rc = snprintf(page, count, LPU64"\n", oscc->oscc_next_id); - spin_unlock(&oscc->oscc_lock); - return rc; -} - static struct lprocfs_vars 
lprocfs_obd_vars[] = { { "uuid", lprocfs_rd_uuid, 0, 0 }, { "ping", 0, lprocfs_wr_ping, 0 }, @@ -232,8 +277,10 @@ static struct lprocfs_vars lprocfs_obd_vars[] = { { "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 }, { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 }, { "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 }, + { "create_count", osc_rd_create_count, osc_wr_create_count, 0 }, + { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 }, + { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 }, { "checksums", osc_rd_checksum, osc_wr_checksum, 0 }, - { "last_id", osc_rd_last_id, 0, 0 }, { 0 } }; diff --git a/lustre/osc/osc_create.c b/lustre/osc/osc_create.c index ceaccb5..89bb1b6 100644 --- a/lustre/osc/osc_create.c +++ b/lustre/osc/osc_create.c @@ -59,70 +59,202 @@ #include #include "osc_internal.h" -int oscc_recovering(struct osc_creator *oscc) +static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc) { - int recov = 0; + struct osc_creator *oscc; + struct ost_body *body = NULL; + ENTRY; + + if (req->rq_repmsg) { + body = lustre_swab_repbuf(req, 0, sizeof(*body), + lustre_swab_ost_body); + if (body == NULL && rc == 0) + rc = -EPROTO; + } + oscc = req->rq_async_args.pointer_arg[0]; + LASSERT(oscc && (oscc->oscc_obd != LP_POISON)); + spin_lock(&oscc->oscc_lock); - recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING; - spin_unlock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_CREATING; + if (rc == -ENOSPC || rc == -EROFS) { + oscc->oscc_flags |= OSCC_FLAG_NOSPC; + if (body && rc == -ENOSPC) { + oscc->oscc_grow_count = OST_MIN_PRECREATE; + oscc->oscc_last_id = body->oa.o_id; + } + spin_unlock(&oscc->oscc_lock); + DEBUG_REQ(D_INODE, req, "OST out of space, flagging"); + } else if (rc != 0 && rc != -EIO) { + oscc->oscc_flags |= OSCC_FLAG_RECOVERING; + oscc->oscc_grow_count = OST_MIN_PRECREATE; + spin_unlock(&oscc->oscc_lock); + DEBUG_REQ(D_ERROR, req, + "unknown rc %d from async create: failing oscc", rc); + 
ptlrpc_fail_import(req->rq_import, req->rq_import_generation); + } else { + if (rc == 0) { + oscc->oscc_flags &= ~OSCC_FLAG_LOW; + if (body) { + int diff = body->oa.o_id - oscc->oscc_last_id; + if (diff != oscc->oscc_grow_count) + oscc->oscc_grow_count = + max(diff/3, OST_MIN_PRECREATE); + oscc->oscc_last_id = body->oa.o_id; + } + } + spin_unlock(&oscc->oscc_lock); + } - return recov; + CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n", + oscc->oscc_last_id, oscc->oscc_next_id); + + wake_up(&oscc->oscc_waitq); + RETURN(rc); } -static int osc_check_state(struct obd_export *exp) +static int oscc_internal_create(struct osc_creator *oscc) { - int rc; + struct ptlrpc_request *request; + struct ost_body *body; + int size = sizeof(*body); ENTRY; - /* ->os_state contains positive error code on remote OST. To convert it - * to usual errno form we have to make an sign inversion. */ - spin_lock(&exp->exp_obd->obd_osfs_lock); - rc = -exp->exp_obd->obd_osfs.os_state; - spin_unlock(&exp->exp_obd->obd_osfs_lock); - - RETURN(rc); + spin_lock(&oscc->oscc_lock); + if (oscc->oscc_grow_count < OST_MAX_PRECREATE && + !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) && + (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <= + (oscc->oscc_grow_count / 4 + 1)) { + oscc->oscc_flags |= OSCC_FLAG_LOW; + oscc->oscc_grow_count *= 2; + } + + if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2) + oscc->oscc_grow_count = OST_MAX_PRECREATE / 2; + + if (oscc->oscc_flags & OSCC_FLAG_CREATING || + oscc->oscc_flags & OSCC_FLAG_RECOVERING) { + spin_unlock(&oscc->oscc_lock); + RETURN(0); + } + oscc->oscc_flags |= OSCC_FLAG_CREATING; + spin_unlock(&oscc->oscc_lock); + + request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import, + LUSTRE_OST_VERSION, OST_CREATE, 1, + &size, NULL); + if (request == NULL) { + spin_lock(&oscc->oscc_lock); + oscc->oscc_flags &= ~OSCC_FLAG_CREATING; + spin_unlock(&oscc->oscc_lock); + RETURN(-ENOMEM); + } + + request->rq_request_portal = 
OST_CREATE_PORTAL; //XXX FIXME bug 249 + body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body)); + + spin_lock(&oscc->oscc_lock); + body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count; + body->oa.o_valid |= OBD_MD_FLID; + spin_unlock(&oscc->oscc_lock); + CDEBUG(D_HA, "preallocating through id "LPU64" (last used "LPU64")\n", + body->oa.o_id, oscc->oscc_next_id); + + request->rq_replen = lustre_msg_size(1, &size); + + request->rq_async_args.pointer_arg[0] = oscc; + request->rq_interpret_reply = osc_interpret_create; + ptlrpcd_add_req(request); + + RETURN(0); +} + +static int oscc_has_objects(struct osc_creator *oscc, int count) +{ + int have_objs; + spin_lock(&oscc->oscc_lock); + have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count); + spin_unlock(&oscc->oscc_lock); + + if (!have_objs) + oscc_internal_create(oscc); + + return have_objs; +} + +static int oscc_wait_for_objects(struct osc_creator *oscc, int count) +{ + int have_objs; + int ost_full; + int osc_invalid; + + have_objs = oscc_has_objects(oscc, count); + + spin_lock(&oscc->oscc_lock); + ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC); + spin_unlock(&oscc->oscc_lock); + + osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid; + + return have_objs || ost_full || osc_invalid; } -static int osc_check_nospc(struct obd_export *exp) +static int oscc_precreate(struct osc_creator *oscc, int wait) { - __u64 blocks, bavail; - __u64 inodes, iavail; + struct l_wait_info lwi = { 0 }; int rc = 0; ENTRY; - spin_lock(&exp->exp_obd->obd_osfs_lock); - blocks = exp->exp_obd->obd_osfs.os_blocks; - bavail = exp->exp_obd->obd_osfs.os_bavail; - inodes = exp->exp_obd->obd_osfs.os_files; - iavail = exp->exp_obd->obd_osfs.os_ffree; - spin_unlock(&exp->exp_obd->obd_osfs_lock); - - /* return 1 if available space smaller then (blocks >> 10) of all space - * on OST. The main point of this water mark is to stop create files at - * some point, to let all created and opened files finish possible - * writes. 
*/ - if (blocks > 0 && bavail < (blocks >> 10)) - rc = 1; + if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2)) + RETURN(0); + + if (!wait) + RETURN(0); - if (inodes > 0 && iavail < 128) - rc = 1; + /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */ + l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi); + + if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC)) + rc = -ENOSPC; + + if (oscc->oscc_obd->u.cli.cl_import->imp_invalid) + rc = -EIO; RETURN(rc); } +int oscc_recovering(struct osc_creator *oscc) +{ + int recov = 0; + + spin_lock(&oscc->oscc_lock); + recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING; + spin_unlock(&oscc->oscc_lock); + + return recov; +} + int osc_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { + struct lov_stripe_md *lsm; struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; int try_again = 1, rc = 0; ENTRY; + LASSERT(oa); + LASSERT(ea); + + if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)) + RETURN(osc_real_create(exp, oa, ea, oti)); + + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_RECREATE_OBJS) { + RETURN(osc_real_create(exp, oa, ea, oti)); + } - LASSERT(oa != NULL); - LASSERT(ea != NULL); - /* this is the special case where create removes orphans */ - if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) { + if ((oa->o_valid & OBD_MD_FLFLAGS) && + oa->o_flags == OBD_FL_DELORPHAN) { spin_lock(&oscc->oscc_lock); if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) { spin_unlock(&oscc->oscc_lock); @@ -136,16 +268,15 @@ int osc_create(struct obd_export *exp, struct obdo *oa, spin_unlock(&oscc->oscc_lock); CDEBUG(D_HA, "%s: oscc recovery started\n", oscc->oscc_obd->obd_name); - LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING); + + /* delete from next_id on up */ + oa->o_valid |= OBD_MD_FLID; + oa->o_id = oscc->oscc_next_id - 1; CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n", oscc->oscc_obd->obd_name, 
oa->o_id); rc = osc_real_create(exp, oa, ea, NULL); - if (oscc->oscc_obd == NULL) { - CWARN("the obd for oscc %p has been freed\n", oscc); - RETURN(rc); - } spin_lock(&oscc->oscc_lock); oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS; @@ -153,6 +284,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa, if (rc == -ENOSPC) oscc->oscc_flags |= OSCC_FLAG_NOSPC; oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING; + oscc->oscc_last_id = oa->o_id; CDEBUG(D_HA, "%s: oscc recovery finished: %d\n", oscc->oscc_obd->obd_name, rc); wake_up(&oscc->oscc_waitq); @@ -161,36 +293,21 @@ int osc_create(struct obd_export *exp, struct obdo *oa, oscc->oscc_obd->obd_name, rc); } spin_unlock(&oscc->oscc_lock); - RETURN(rc); - } - LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS, - !!(oa->o_flags & OBD_FL_CREATE_CROW) != - !!(oa->o_flags & OBD_FL_RECREATE_OBJS))); - - /* perform urgent create if asked or import is not crow capable or - * ENOSPC case if detected. */ - if (OBDO_URGENT_CREATE(oa) || !IMP_CROW_ABLE(class_exp2cliimp(exp)) || - osc_check_nospc(exp)) { - CDEBUG(D_HA, "perform urgent create\n"); - oa->o_flags &= ~OBD_FL_CREATE_CROW; - if (!oa->o_flags) - oa->o_valid &= ~OBD_MD_FLFLAGS; - rc = osc_real_create(exp, oa, ea, oti); + RETURN(rc); } - /* check OST fs state. */ - rc = osc_check_state(exp); - if (rc) { - CDEBUG(D_HA,"OST is in bad shape to create objects, err %d\n", - rc); - RETURN(rc); + lsm = *ea; + if (lsm == NULL) { + rc = obd_alloc_memmd(exp, &lsm); + if (rc < 0) + RETURN(rc); } - + while (try_again) { - /* if orphans are being recovered, then we must wait until it is - * finished before we can continue with create. */ + /* If orphans are being recovered, then we must wait until + it is finished before we can continue with create. 
*/ if (oscc_recovering(oscc)) { struct l_wait_info lwi; @@ -202,7 +319,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa, !oscc_recovering(oscc), &lwi); LASSERT(rc == 0 || rc == -ETIMEDOUT); if (rc == -ETIMEDOUT) { - CDEBUG(D_HA, "%p: timeout waiting on recovery\n", + CDEBUG(D_HA,"%p: timeout waiting on recovery\n", oscc); RETURN(rc); } @@ -216,22 +333,30 @@ int osc_create(struct obd_export *exp, struct obdo *oa, break; } - if (oscc->oscc_flags & OSCC_FLAG_NOSPC) { + if (oscc->oscc_last_id >= oscc->oscc_next_id) { + memcpy(oa, &oscc->oscc_oa, sizeof(*oa)); + oa->o_id = oscc->oscc_next_id; + lsm->lsm_object_id = oscc->oscc_next_id; + *ea = lsm; + oscc->oscc_next_id++; + try_again = 0; + } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) { rc = -ENOSPC; spin_unlock(&oscc->oscc_lock); break; } - - oscc->oscc_next_id++; - oa->o_id = oscc->oscc_next_id; - try_again = 0; spin_unlock(&oscc->oscc_lock); + rc = oscc_precreate(oscc, try_again); + if (rc) + break; + } + if (rc == 0) CDEBUG(D_HA, "%s: returning objid "LPU64"\n", oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid, - oa->o_id); - } - + lsm->lsm_object_id); + else if (*ea == NULL) + obd_free_memmd(exp, &lsm); RETURN(rc); } @@ -243,10 +368,17 @@ void oscc_init(struct obd_device *obd) return; oscc = &obd->u.cli.cl_oscc; - memset(oscc, 0, sizeof(*oscc)); - oscc->oscc_obd = obd; + memset(oscc, 0, sizeof(*oscc)); + INIT_LIST_HEAD(&oscc->oscc_list); + init_waitqueue_head(&oscc->oscc_waitq); spin_lock_init(&oscc->oscc_lock); + oscc->oscc_obd = obd; + oscc->oscc_grow_count = OST_MIN_PRECREATE; + + oscc->oscc_next_id = 2; + oscc->oscc_last_id = 1; oscc->oscc_flags |= OSCC_FLAG_RECOVERING; - init_waitqueue_head(&oscc->oscc_waitq); + /* XXX the export handle should give the oscc the last object */ + /* oed->oed_oscc.oscc_last_id = exph->....; */ } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9396dd0..c65b2ee 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -315,7 
+315,6 @@ static int osc_setattr_async(struct obd_export *exp, struct obdo *oa, int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti) { - struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc; struct ptlrpc_request *request; struct ost_body *body; struct lov_stripe_md *lsm; @@ -361,16 +360,6 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa, GOTO (out_req, rc = -EPROTO); } - if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) { - struct obd_import *imp = class_exp2cliimp(exp); - /* MDS declares last known object, OSS responses - * with next possible object -bzzz */ - spin_lock(&oscc->oscc_lock); - oscc->oscc_next_id = body->oa.o_id; - spin_unlock(&oscc->oscc_lock); - CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n", - imp->imp_target_uuid.uuid, oa->o_id); - } memcpy(oa, &body->oa, sizeof(*oa)); /* This should really be sent by the OST */ @@ -3061,6 +3050,17 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10); + if (KEY_IS("next_id")) { + if (vallen != sizeof(obd_id)) + RETURN(-EINVAL); + obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1; + CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n", + exp->exp_obd->obd_name, + obd->u.cli.cl_oscc.oscc_next_id); + + RETURN(0); + } + if (KEY_IS("unlinked")) { struct osc_creator *oscc = &obd->u.cli.cl_oscc; spin_lock(&oscc->oscc_lock); @@ -3069,7 +3069,6 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen, RETURN(0); } - if (KEY_IS("initial_recov")) { struct obd_import *imp = exp->exp_obd->u.cli.cl_import; if (vallen != sizeof(int)) diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index 4ab7435..5adf67b 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -587,11 +587,6 @@ finish: exp->exp_connect_flags = ocd->ocd_connect_flags; class_export_put(exp); - if (IMP_CROW_ABLE(imp)) { - CDEBUG(D_HA, "connected to CROW capable 
target: %s\n", - imp->imp_target_uuid.uuid); - } - obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD); if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) && diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 72ce472..b44a60d 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -935,8 +935,8 @@ void lustre_swab_qdata(struct qunit_data *d) void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' - * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6 - * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */ + * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux + * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */ /* Constants... */ @@ -1748,6 +1748,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct mds_body, aclsize)); LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n", (long long)(int)sizeof(((struct mds_body *)0)->aclsize)); + LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_2)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_2)); + LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_3)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_3)); + LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_4)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_4)); LASSERTF(FMODE_READ == 1, " found %lld\n", (long long)FMODE_READ); LASSERTF(FMODE_WRITE == 2, " found 
%lld\n", diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index 21bb6f3..733ce98 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -65,70 +65,10 @@ int ptlrpc_ping(struct obd_import *imp) RETURN(rc); } -static int ptlrpc_statfs_interpret(struct ptlrpc_request *req, - void *data, int rc) -{ - struct obd_statfs *msfs; - struct obd_device *obd; - ENTRY; - - if (rc) - RETURN(rc); - - if (!req->rq_repmsg) - RETURN(-EPROTO); - - msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs), - lustre_swab_obd_statfs); - if (msfs == NULL) - RETURN(-EPROTO); - - obd = req->rq_import->imp_obd; - - spin_lock(&obd->obd_osfs_lock); - obd->obd_osfs = *msfs; - obd->obd_osfs_age = jiffies; - spin_unlock(&obd->obd_osfs_lock); - - RETURN(0); -} - -int ptlrpc_statfs(struct obd_import *imp) -{ - int size = sizeof(struct obd_statfs); - struct ptlrpc_request *req; - ENTRY; - - req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_STATFS, 0, - NULL, NULL); - if (!req) { - CERROR("OOM trying to ping %s->%s\n", - imp->imp_obd->obd_uuid.uuid, - imp->imp_target_uuid.uuid); - RETURN(-ENOMEM); - } - - DEBUG_REQ(D_INFO, req, "pinging %s->%s", - imp->imp_obd->obd_uuid.uuid, - imp->imp_target_uuid.uuid); - - req->rq_interpret_reply = ptlrpc_statfs_interpret; - req->rq_replen = lustre_msg_size(1, &size); - req->rq_no_resend = req->rq_no_delay = 1; - ptlrpcd_add_req(req); - - RETURN(0); -} - static void ptlrpc_update_next_ping(struct obd_import *imp) { - __u32 interval; - - interval = IMP_CROW_ABLE(imp) ? - STATFS_INTERVAL : PING_INTERVAL; - imp->imp_next_ping = jiffies + HZ * - (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : interval); + (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : PING_INTERVAL); } void ptlrpc_ping_import_soon(struct obd_import *imp) @@ -164,11 +104,9 @@ static int ptlrpc_pinger_main(void *arg) /* And now, loop forever, pinging as needed. 
*/ while (1) { - unsigned long sleep_interval = PING_INTERVAL; - unsigned long update_interval = 0; unsigned long this_ping = jiffies; - struct l_wait_info lwi; long time_to_next_ping; + struct l_wait_info lwi; struct list_head *iter; down(&pinger_sem); @@ -179,9 +117,6 @@ static int ptlrpc_pinger_main(void *arg) int force, level; unsigned long flags; - if (IMP_CROW_ABLE(imp)) - sleep_interval = STATFS_INTERVAL; - spin_lock_irqsave(&imp->imp_lock, flags); level = imp->imp_state; force = imp->imp_force_verify; @@ -215,10 +150,7 @@ static int ptlrpc_pinger_main(void *arg) imp->imp_deactive, imp->imp_obd->obd_no_recov); } else if (imp->imp_pingable || force) { - if (IMP_CROW_ABLE(imp)) - ptlrpc_statfs(imp); - else - ptlrpc_ping(imp); + ptlrpc_ping(imp); } } else { if (!imp->imp_pingable) @@ -229,24 +161,15 @@ static int ptlrpc_pinger_main(void *arg) imp->imp_next_ping, this_ping); } - /* using here new calculated @update_interval, as - * sleep_interval holds minimal of possible intervals - * over pingable imports. */ - update_interval = IMP_CROW_ABLE(imp) ? - STATFS_INTERVAL : PING_INTERVAL; - /* obd_timeout might have changed */ if (time_after(imp->imp_next_ping, - this_ping + update_interval * HZ)) + this_ping + PING_INTERVAL * HZ)) ptlrpc_update_next_ping(imp); } up(&pinger_sem); - /* Wait until the next ping time, or until we're stopped. We - * sleep here smaller interval of two possible (ping or - * statfs). If one of imports is CROW capable we'll sleep - * STATFS_INTERVAL and PING_INTERVAL otherwise. */ - time_to_next_ping = this_ping + (sleep_interval * HZ) - jiffies; + /* Wait until the next ping time, or until we're stopped. */ + time_to_next_ping = this_ping + (PING_INTERVAL * HZ) - jiffies; /* The ping sent by ptlrpc_send_rpc may get sent out say .01 second after this. @@ -255,7 +178,7 @@ static int ptlrpc_pinger_main(void *arg) we will SKIP the next ping at next_ping, and the ping will get sent 2 timeouts from now! Beware. 
*/ CDEBUG(D_INFO, "next ping in %lu (%lu)\n", time_to_next_ping, - this_ping + sleep_interval * HZ); + this_ping + PING_INTERVAL * HZ); if (time_to_next_ping > 0) { lwi = LWI_TIMEOUT(max_t(long, time_to_next_ping, HZ), NULL, NULL); diff --git a/lustre/tests/replay-dual.sh b/lustre/tests/replay-dual.sh index b71f318..785a664 100755 --- a/lustre/tests/replay-dual.sh +++ b/lustre/tests/replay-dual.sh @@ -2,8 +2,8 @@ set -e -# bug 6088 -ALWAYS_EXCEPT="8 $REPLAY_DUAL_EXCEPT" +# bug 6088 9761 (CROW related) +ALWAYS_EXCEPT="8 15a 15b 15c $REPLAY_DUAL_EXCEPT" LUSTRE=${LUSTRE:-`dirname $0`/..} . $LUSTRE/tests/test-framework.sh diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index 99f2b5f..988e9b4 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -14,8 +14,8 @@ init_test_env $@ . ${CONFIG:=$LUSTRE/tests/cfg/local.sh} # Skip these tests -# bug number: 2766 4176 -ALWAYS_EXCEPT="0b 39 48 $REPLAY_SINGLE_EXCEPT" +# bug number: 2766 4176 9761 (CROW related) +ALWAYS_EXCEPT="0b 1a 39 $REPLAY_SINGLE_EXCEPT" gen_config() { rm -f $XMLCONFIG diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 0a56ab2..0db15f0 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -26,8 +26,8 @@ int main() void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' - * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6 - * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */ + * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux + * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */ /* Constants... 
*/ @@ -839,6 +839,18 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct mds_body, aclsize)); LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n", (long long)(int)sizeof(((struct mds_body *)0)->aclsize)); + LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_2)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_2)); + LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_3)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_3)); + LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n", + (long long)(int)offsetof(struct mds_body, padding_4)); + LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n", + (long long)(int)sizeof(((struct mds_body *)0)->padding_4)); LASSERTF(FMODE_READ == 1, " found %lld\n", (long long)FMODE_READ); LASSERTF(FMODE_WRITE == 2, " found %lld\n", -- 1.8.3.1