* bug fixes
Severity : enhancement
-Bugzilla : 8888
-Description: Introduced CReate On Write (CROW)
-Details : CROW is improved create approach, which defers OST objects
- creates to the time when they realy needed. This is when client
- wants to perform first write to file for instance. Or when object
- changes some of its attributes stored on OST. This should improve
- create rate.
-
-Severity : enhancement
Bugzilla : 7981/8208
Description: Introduced Lustre Networking (LNET)
Details : LNET is new networking infrastructure for Lustre, it includes
struct osc_creator {
spinlock_t oscc_lock;
+ struct list_head oscc_list;
struct obd_device *oscc_obd;
+ obd_id oscc_last_id;//last available pre-created object
+ obd_id oscc_next_id;// what object id to give out next
+ int oscc_grow_count;
+ struct obdo oscc_oa;
int oscc_flags;
- obd_id oscc_next_id;
- wait_queue_head_t oscc_waitq;
+ wait_queue_head_t oscc_waitq; /* creating procs wait on this */
};
struct ldlm_export_data {
#define OBD_CONNECT_REQPORTAL 0x40ULL /* Separate portal for non-IO reqs */
#define OBD_CONNECT_ACL 0x80ULL /* client using access control lists */
#define OBD_CONNECT_XATTR 0x100ULL /* client using extended attributes*/
+
+
#define OBD_CONNECT_CROW 0x200ULL /* MDS+OST do object create-on-write */
#define OBD_CONNECT_TRUNCLOCK 0x400ULL /* server gets locks for punch b=9528 */
#define OBD_CONNECT_TRANSNO 0x800ULL /* replay is sending initial transno */
OBD_CONNECT_IBITS | OBD_CONNECT_JOIN)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
- OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_CROW)
+ OBD_CONNECT_TRUNCLOCK)
#define ECHO_CONNECT_SUPPORTED (0)
#define OBD_OCD_VERSION(major,minor,patch,fix) (((major)<<24) + ((minor)<<16) +\
#define OBD_FL_DEBUG_CHECK (0x00000040) /* echo client/server debug check */
#define OBD_FL_NO_USRQUOTA (0x00000100) /* the object's owner is over quota */
#define OBD_FL_NO_GRPQUOTA (0x00000200) /* the object's group is over quota */
-#define OBD_FL_CREATE_CROW (0x00000400) /* object should be created with crow */
+#define OBD_FL_CREATE_CROW (0x00000400) /* object should be created with crow */
+
/*
* set this to delegate DLM locking during obd_punch() to the OSTs. Only OSTs
* that declared OBD_CONNECT_TRUNCLOCK in their connect flags support this
#define o_dropped o_misc
#define o_cksum o_nlink
-#define OBDO_URGENT_CREATE(oa) \
- (!((oa)->o_valid & OBD_MD_FLFLAGS) || \
- !((oa)->o_flags & OBD_FL_CREATE_CROW) || \
- ((oa)->o_flags & OBD_FL_RECREATE_OBJS))
-
extern void lustre_swab_obdo (struct obdo *o);
__u64 imp_connect_flags_orig;
};
-#define IMP_CROW_ABLE(imp) \
- ((imp)->imp_connect_data.ocd_connect_flags & OBD_CONNECT_CROW)
-
typedef void (*obd_import_callback)(struct obd_import *imp, void *closure,
int event, void *event_arg, void *cb_data);
__u64 fo_mount_count;
int fo_destroy_in_progress;
+ struct semaphore fo_create_lock;
struct file_operations *fo_fop;
struct inode_operations *fo_iop;
struct lustre_quota_ctxt fo_quota_ctxt;
spinlock_t fo_quotacheck_lock;
atomic_t fo_quotachecking;
-
- /* objids black list stuff. See for detailed comment in
- * filter_clear_orphans() */
- struct filter_ext *fo_blacklist;
- spinlock_t fo_blacklist_lock;
};
struct mds_server_data;
extern unsigned int obd_dump_on_timeout;
extern unsigned int obd_timeout; /* seconds */
#define PING_INTERVAL max(obd_timeout / 4, 1U)
-#define STATFS_INTERVAL max(obd_timeout / 20, 1U)
extern unsigned int ldlm_timeout;
extern unsigned int obd_health_check_timeout;
extern char obd_lustre_upcall[128];
#define OBD_FAIL_MDC_REVALIDATE_PAUSE 0x800
-#define OBD_FAIL_OST_CROW_EIO 0x801
-#define OBD_FAIL_OST_CLEAR_ORPHANS_RACE 0x802
-
/* preparation for a more advanced failure testbed (not functional yet) */
#define OBD_FAIL_MASK_SYS 0x0000FF00
#define OBD_FAIL_MASK_LOC (0x000000FF | OBD_FAIL_MASK_SYS)
continue;
memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
- tmp_oa->o_valid |= OBD_MD_FLID;
- tmp_oa->o_id = oti->oti_objid[i];
LASSERT(lov->tgts[i].ltd_exp);
/* XXX: LOV STACKING: use real "obj_mdp" sub-data */
RETURN(rc);
}
+/* Re-create one specific object on one specific OST.
+ *
+ * The caller must have set OBD_FL_RECREATE_OBJS in src_oa->o_flags; the
+ * target OST index is taken from src_oa->o_nlink and the object id from
+ * src_oa->o_id.  The request is only forwarded when the stripe of *ea that
+ * lives on that OST carries exactly this object id.
+ *
+ * Returns -ENOMEM if the scratch stripe md cannot be allocated, -EINVAL if
+ * *ea is NULL, the OST index is out of range, or no stripe matches, and
+ * otherwise whatever obd_create() on the target export returns. */
+static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
+                        struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        struct lov_stripe_md *scratch_md;
+        struct lov_stripe_md *lsm;
+        unsigned ost_idx;
+        int stripe;
+        int rc;
+        ENTRY;
+
+        LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
+                src_oa->o_flags & OBD_FL_RECREATE_OBJS);
+
+        /* scratch md handed to the OSC create; released again at "out" */
+        OBD_ALLOC(scratch_md, sizeof(*scratch_md));
+        if (scratch_md == NULL)
+                RETURN(-ENOMEM);
+
+        ost_idx = src_oa->o_nlink;
+        lsm = *ea;
+        if (lsm == NULL || ost_idx >= lov->desc.ld_tgt_count)
+                GOTO(out, rc = -EINVAL);
+
+        /* find the first stripe that lives on the requested OST */
+        stripe = 0;
+        while (stripe < lsm->lsm_stripe_count &&
+               lsm->lsm_oinfo[stripe].loi_ost_idx != ost_idx)
+                stripe++;
+
+        /* no such stripe, or it does not hold the object being recreated */
+        if (stripe == lsm->lsm_stripe_count ||
+            lsm->lsm_oinfo[stripe].loi_id != src_oa->o_id)
+                GOTO(out, rc = -EINVAL);
+
+        rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &scratch_md, oti);
+out:
+        OBD_FREE(scratch_md, sizeof(*scratch_md));
+        RETURN(rc);
+}
+
/* the LOV expects oa->o_id to be set to the LOV object id */
-static int
-lov_create(struct obd_export *exp, struct obdo *src_oa,
+static int lov_create(struct obd_export *exp, struct obdo *src_oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct lov_obd *lov;
struct lov_request_set *set = NULL;
struct list_head *pos;
- struct lov_obd *lov;
int rc = 0;
ENTRY;
RETURN(rc);
}
- LASSERT(ergo(src_oa->o_valid & OBD_MD_FLFLAGS,
- !!(src_oa->o_flags & OBD_FL_CREATE_CROW) !=
- !!(src_oa->o_flags & OBD_FL_RECREATE_OBJS)));
-
lov = &exp->exp_obd->u.lov;
if (!lov->desc.ld_active_tgt_count)
RETURN(-EIO);
+ /* Recreate a specific object id at the given OST index */
+ if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
+ (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ rc = lov_recreate(exp, src_oa, ea, oti);
+ RETURN(rc);
+ }
+
rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
if (rc)
RETURN(rc);
}
unlock_kernel();
-
return rc;
}
LASSERT(!obd->obd_recovering);
LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
+ /* set nextid first, so we are sure it happens */
+ rc = mds_lov_set_nextid(obd);
+ if (rc) {
+ CERROR("%s: mds_lov_set_nextid failed\n",
+ obd->obd_name);
+ GOTO(out, rc);
+ }
+
/* clean PENDING dir */
rc = mds_cleanup_pending(obd);
if (rc < 0) {
int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
{
- struct lov_stripe_md *empty_ea = NULL;
- struct obd_trans_info oti = { 0 };
- struct obdo *oa;
int rc;
+ struct obdo oa;
+ struct obd_trans_info oti = {0};
+ struct lov_stripe_md *empty_ea = NULL;
ENTRY;
LASSERT(mds->mds_lov_objids != NULL);
- oa = obdo_alloc();
- if (oa == NULL)
- RETURN(-ENOMEM);
-
- oa->o_valid = OBD_MD_FLFLAGS;
- oa->o_flags = OBD_FL_DELORPHAN;
-
+ /* This create will in fact either create or destroy: If the OST is
+ * missing objects below this ID, they will be created. If it finds
+ * objects above this ID, they will be removed. */
+ memset(&oa, 0, sizeof(oa));
+ oa.o_valid = OBD_MD_FLFLAGS;
+ oa.o_flags = OBD_FL_DELORPHAN;
if (ost_uuid != NULL) {
- memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid));
- oa->o_valid |= OBD_MD_FLINLINE;
+ memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
+ oa.o_valid |= OBD_MD_FLINLINE;
}
+ rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
+
+ RETURN(rc);
+}
+
+/* update the LOV-OSC knowledge of the last used object id's */
+int mds_lov_set_nextid(struct obd_device *obd)
+{
+ struct mds_obd *mds = &obd->u.mds;
+ int rc;
+ ENTRY;
- oti.oti_objid = mds->mds_lov_objids;
- rc = obd_create(mds->mds_osc_exp, oa, &empty_ea, &oti);
+ LASSERT(!obd->obd_recovering);
- obdo_free(oa);
+ LASSERT(mds->mds_lov_objids != NULL);
+
+ rc = obd_set_info(mds->mds_osc_exp, strlen("next_id"), "next_id",
+ mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids);
RETURN(rc);
}
/* update the LOV-OSC knowledge of the last used object id's */
int mds_lov_connect(struct obd_device *obd, char * lov_name)
{
- struct obd_connect_data *data = NULL;
struct mds_obd *mds = &obd->u.mds;
struct lustre_handle conn = {0,};
int valsize;
RETURN(-ENOTCONN);
}
- OBD_ALLOC_PTR(data);
- if (!data)
- RETURN(-ENOMEM);
- data->ocd_connect_flags = OBD_CONNECT_CROW;
-
rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid,
- data);
- OBD_FREE_PTR(data);
-
+ NULL /* obd_connect_data */);
if (rc) {
CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
mds->mds_osc_obd = ERR_PTR(rc);
oa->o_gid = 0;
oa->o_mode = S_IFREG | 0600;
oa->o_id = inode->i_ino;
- oa->o_flags = OBD_FL_CREATE_CROW;
oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
oa->o_size = 0;
oa->o_generation = body->fid1.generation;
oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
- /* do not set CROW flag in setattr path as it is not needed
- * there and only confuses setattr code in filter. */
- oa->o_flags &= ~OBD_FL_CREATE_CROW;
- if (!oa->o_flags)
- oa->o_valid &= ~OBD_MD_FLFLAGS;
-
rc = obd_setattr(mds->mds_osc_exp, oa, lsm, &oti);
if (rc) {
CERROR("error setting attrs for inode %lu: rc %d\n",
struct ptlrpc_request *req,
struct lustre_handle *lh)
{
+ unsigned int ia_valid = rec->ur_iattr.ia_valid;
struct mds_obd *mds = mds_req2mds(req);
struct obd_device *obd = req->rq_export->exp_obd;
- unsigned int ia_valid = rec->ur_iattr.ia_valid;
struct mds_body *body;
struct dentry *de;
struct inode *inode = NULL;
mds_pack_inode2fid(&body->fid1, inode);
mds_pack_inode2body(body, inode);
- /* don't return OST-specific attributes if we didn't just set them. Use
- * saved ->ia_valid here, as rec->ur_iattr.ia_valid gets rewritten by
- * fsfilt_setattr() what breaks case of truncating file with no object
- * on OST and no lsm (test_34c from sanity.sh). --umka */
+ /* don't return OST-specific attributes if we didn't just set them. */
if (ia_valid & ATTR_SIZE)
body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
int i;
ENTRY;
- if (filter->fo_blacklist != NULL) {
- OBD_FREE(filter->fo_blacklist,
- FILTER_GROUPS * sizeof(struct filter_ext));
- filter->fo_blacklist = NULL;
- }
-
if (filter->fo_dentry_O_groups != NULL) {
for (i = 0; i < FILTER_GROUPS; i++) {
dentry = filter->fo_dentry_O_groups[i];
int i, rc = 0, cleanup_phase = 0;
ENTRY;
- OBD_ALLOC(filter->fo_blacklist,
- FILTER_GROUPS * sizeof(struct filter_ext));
- if (!filter->fo_blacklist)
- GOTO(cleanup, rc = -ENOMEM);
-
O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
if (IS_ERR(O_dentry)) {
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
}
-static void filter_set_last_id(struct filter_obd *filter,
- int group, obd_id id)
+/* Record @id as the last object id of a group, under fo_objidlock.
+ * The group is oa->o_gr when @oa is non-NULL (read without checking
+ * OBD_MD_FLGROUP in o_valid) and group 0 otherwise.
+ * NOTE(review): the bound is asserted with '<' on the assumption that
+ * fo_last_objids has FILTER_GROUPS entries, like the other per-group
+ * arrays in this file - confirm against the allocation. */
+static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
+ obd_id id)
{
+ obd_gr group = 0;
LASSERT(filter->fo_fsd != NULL);
- LASSERT(group <= FILTER_GROUPS);
- spin_lock(&filter->fo_objidlock);
- filter->fo_last_objids[group] = id;
- spin_unlock(&filter->fo_objidlock);
-}
-
-static void filter_grow_last_id(struct filter_obd *filter,
- int group, obd_id id)
-{
- LASSERT(filter->fo_fsd != NULL);
- LASSERT(group <= FILTER_GROUPS);
+ if (oa != NULL) {
+ /* group is used as an array index below, so it must be
+ * strictly less than the number of groups */
+ LASSERT(oa->o_gr < FILTER_GROUPS);
+ group = oa->o_gr;
+ }
spin_lock(&filter->fo_objidlock);
- if (id > filter->fo_last_objids[group])
filter->fo_last_objids[group] = id;
spin_unlock(&filter->fo_objidlock);
}
-__u64 filter_last_id(struct filter_obd *filter, int group)
+/* Return the last object id recorded for a group, read under fo_objidlock.
+ * The group is oa->o_gr when @oa is non-NULL (read without checking
+ * OBD_MD_FLGROUP in o_valid) and group 0 otherwise. */
+__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
{
obd_id id;
+ obd_gr group = 0;
LASSERT(filter->fo_fsd != NULL);
- LASSERT(group < FILTER_GROUPS);
+ if (oa != NULL) {
+ /* keep the strict bound the previous code asserted: group
+ * indexes fo_last_objids[] below */
+ LASSERT(oa->o_gr < FILTER_GROUPS);
+ group = oa->o_gr;
+ }
+
+ /* FIXME: object groups */
spin_lock(&filter->fo_objidlock);
id = filter->fo_last_objids[group];
spin_unlock(&filter->fo_objidlock);
return id;
}
-static void filter_lock_dentry(struct obd_device *obd,
- struct dentry *dparent)
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
{
down(&dparent->d_inode->i_sem);
+ return 0;
}
-static void filter_unlock_dentry(struct obd_device *obd,
- struct dentry *dparent)
-{
- up(&dparent->d_inode->i_sem);
-}
-
-static void filter_parents_access(struct obd_device *obd,
- obd_gr group, int lock)
-{
- void (*access_func) (struct obd_device *, struct dentry *);
- struct filter_obd *filter = &obd->u.filter;
- struct dentry *dparent;
- int i = 0;
-
- access_func = lock ? filter_lock_dentry :
- filter_unlock_dentry;
-
- if (group > 0 || filter->fo_subdir_count == 0) {
- dparent = filter->fo_dentry_O_groups[group];
- access_func(obd, dparent);
- } else {
- for (i = 0; i < filter->fo_subdir_count; i++) {
- dparent = filter->fo_dentry_O_sub[i];
- access_func(obd, dparent);
- }
- }
-}
-
-#define LOCK_PARENTS(obd, group) \
- filter_parents_access(obd, group, 1)
-
-#define UNLOCK_PARENTS(obd, group) \
- filter_parents_access(obd, group, 0)
-
/* We never dget the object parent, so DON'T dput it either */
struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
{
struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
obd_id objid)
{
- struct dentry *dparent = filter_parent(obd, group, objid);
unsigned long now = jiffies;
+ struct dentry *dparent = filter_parent(obd, group, objid);
+ int rc;
if (IS_ERR(dparent))
return dparent;
- filter_lock_dentry(obd, dparent);
+ rc = filter_lock_dentry(obd, dparent);
fsfilt_check_slow(now, obd_timeout, "parent lock");
- return dparent;
+ return rc ? ERR_PTR(rc) : dparent;
}
-/* we never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct obd_device *obd,
- struct dentry *dparent)
+/* We never dget the object parent, so DON'T dput it either */
+static void filter_parent_unlock(struct dentry *dparent)
{
- filter_unlock_dentry(obd, dparent);
+ up(&dparent->d_inode->i_sem);
}
/* How to get files, dentries, inodes from object id's.
dparent->d_name.len, dparent->d_name.name, name);
dchild = /*ll_*/lookup_one_len(name, dparent, len);
if (dir_dentry == NULL)
- filter_parent_unlock(obd, dparent);
+ filter_parent_unlock(dparent);
if (IS_ERR(dchild)) {
CERROR("%s: child lookup error %ld\n", obd->obd_name,
PTR_ERR(dchild));
/* Caller must hold LCK_PW on parent and push us into kernel context.
* Caller is also required to ensure that dchild->d_inode exists. */
-static int filter_unlink(struct obd_device *obd, obd_id objid,
- struct dentry *dparent, struct dentry *dchild)
+static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
+ struct dentry *dparent,
+ struct dentry *dchild)
{
struct inode *inode = dchild->d_inode;
int rc;
GOTO(err_mntput, rc);
filter->fo_destroy_in_progress = 0;
-
- spin_lock_init(&filter->fo_blacklist_lock);
+ sema_init(&filter->fo_create_lock, 1);
+
spin_lock_init(&filter->fo_translock);
spin_lock_init(&filter->fo_objidlock);
spin_lock_init(&filter->fo_stats_lock);
static int filter_connect_internal(struct obd_export *exp,
struct obd_connect_data *data)
{
- struct filter_obd *filter = &exp->exp_obd->u.filter;
if (data != NULL) {
CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
" ocd_version: %x ocd_grant: %d\n",
exp->exp_connect_flags = data->ocd_connect_flags;
data->ocd_version = LUSTRE_VERSION_CODE;
- if (!(filter->fo_fsd->fsd_feature_rocompat &
- cpu_to_le32(OBD_ROCOMPAT_CROW)) &&
- data->ocd_connect_flags & OBD_CONNECT_CROW) {
- filter->fo_fsd->fsd_feature_rocompat |=
- cpu_to_le32(OBD_ROCOMPAT_CROW);
- filter_update_server_data(exp->exp_obd,
- filter->fo_rcvd_filp,
- filter->fo_fsd, 1);
- }
-
if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
obd_size left, want;
RETURN(rc);
}
+/* Store the "filter fid" EA on an OST object's inode.
+ *
+ * When @oa carries OBD_MD_FLFID, the fid, stripe index, generation, object
+ * id and group from @oa are packed little-endian into a struct filter_fid
+ * and written with fsfilt_set_md() under transaction @handle.  When the
+ * flag is absent nothing is written.
+ *
+ * Returns 0 when there is no fid to store, otherwise the result of
+ * fsfilt_set_md() (failures are also logged).
+ *
+ * This should be enabled/disabled depending on whether large inodes are
+ * enabled in the backing store FS. */
+int filter_update_fidea(struct obd_export *exp, struct inode *inode,
+                        void *handle, struct obdo *oa)
+{
+        struct obd_device *obd = exp->exp_obd;
+        int rc = 0;
+        ENTRY;
+
+        if (oa->o_valid & OBD_MD_FLFID) {
+                struct filter_fid ff;
+                obd_gr group = 0;
+
+                if (oa->o_valid & OBD_MD_FLGROUP)
+                        group = oa->o_gr;
+
+                /* packing fid and converting it to LE for storing into EA.
+                 * Here ->o_stripe_idx should be filled by LOV and rest of
+                 * fields - by client. */
+                ff.ff_fid.id = cpu_to_le64(oa->o_fid);
+                ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx);
+                ff.ff_fid.generation = cpu_to_le32(oa->o_generation);
+                ff.ff_objid = cpu_to_le64(oa->o_id);
+                ff.ff_group = cpu_to_le64(group);
+
+                /* every field is '/'-separated so that the generation and
+                 * the object id cannot run together in the log */
+                CDEBUG(D_INODE, "storing filter fid EA ("LPU64"/%u/%u/"
+                       LPU64"/"LPU64")\n", oa->o_fid, oa->o_stripe_idx,
+                       oa->o_generation, oa->o_id, group);
+
+                rc = fsfilt_set_md(obd, inode, handle, &ff, sizeof(ff));
+                if (rc)
+                        CERROR("store fid in object failed! rc: %d\n", rc);
+        } else {
+                CDEBUG(D_HA, "OSS object without fid info!\n");
+        }
+
+        RETURN(rc);
+}
+
/* this is called from filter_truncate() until we have filter_punch() */
int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
struct obdo *oa, struct obd_trans_info *oti)
unsigned int orig_ids[MAXQUOTAS] = {0, 0};
struct llog_cookie *fcc = NULL;
struct filter_obd *filter;
+ int rc, err, locked = 0;
+ struct inode *inode;
struct iattr iattr;
void *handle;
- int rc, err;
ENTRY;
LASSERT(dentry != NULL);
LASSERT(!IS_ERR(dentry));
- LASSERT(dentry->d_inode != NULL);
+
+ inode = dentry->d_inode;
+ LASSERT(inode != NULL);
filter = &exp->exp_obd->u.filter;
iattr_from_obdo(&iattr, oa, oa->o_valid);
memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
}
- if (iattr.ia_valid & ATTR_SIZE)
- down(&dentry->d_inode->i_sem);
+ if (iattr.ia_valid & ATTR_SIZE || iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+ down(&inode->i_sem);
+ locked = 1;
+ }
+ /* If the inode still has SUID+SGID bits set (see filter_precreate())
+ * then we will accept the UID+GID sent by the client during write for
+ * initializing the ownership of this inode. We only allow this to
+ * happen once so clear these bits in setattr. In 2.6 kernels it is
+ * possible to get ATTR_UID and ATTR_GID separately, so we only clear
+ * the flags that are actually being set. */
if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
- orig_ids[USRQUOTA] = dentry->d_inode->i_uid;
- orig_ids[GRPQUOTA] = dentry->d_inode->i_gid;
- handle = fsfilt_start_log(exp->exp_obd, dentry->d_inode,
+ CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+ (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+ if ((inode->i_mode & S_ISUID) &&
+ (iattr.ia_valid & ATTR_UID)) {
+ if (!(iattr.ia_valid & ATTR_MODE)) {
+ iattr.ia_mode = inode->i_mode;
+ iattr.ia_valid |= ATTR_MODE;
+ }
+ iattr.ia_mode &= ~S_ISUID;
+ }
+ if ((inode->i_mode & S_ISGID) &&
+ (iattr.ia_valid & ATTR_GID)) {
+ if (!(iattr.ia_valid & ATTR_MODE)) {
+ iattr.ia_mode = inode->i_mode;
+ iattr.ia_valid |= ATTR_MODE;
+ }
+ iattr.ia_mode &= ~S_ISGID;
+ }
+
+ orig_ids[USRQUOTA] = inode->i_uid;
+ orig_ids[GRPQUOTA] = inode->i_gid;
+ handle = fsfilt_start_log(exp->exp_obd, inode,
FSFILT_OP_SETATTR, oti, 1);
+
+ /* update inode EA only once */
+ if (inode->i_mode & S_ISUID || inode->i_mode & S_ISGID)
+ filter_update_fidea(exp, inode, handle, oa);
} else {
- handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+ handle = fsfilt_start(exp->exp_obd, inode,
FSFILT_OP_SETATTR, oti);
}
GOTO(out_unlock, rc = PTR_ERR(handle));
if (oa->o_valid & OBD_MD_FLFLAGS) {
- rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
+ rc = fsfilt_iocontrol(exp->exp_obd, inode, NULL,
EXT3_IOC_SETFLAGS, (long)&oa->o_flags);
} else {
rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
fcc);
}
+ if (locked) {
+ up(&inode->i_sem);
+ locked = 0;
+ }
+
rc = filter_finish_transno(exp, oti, rc);
- err = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
+ err = fsfilt_commit(exp->exp_obd, inode, handle, 0);
if (err) {
CERROR("error on commit, err = %d\n", err);
if (!rc)
}
EXIT;
out_unlock:
- if (iattr.ia_valid & ATTR_SIZE)
- up(&dentry->d_inode->i_sem);
+ if (locked)
+ up(&inode->i_sem);
/* trigger quota release */
if (iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
int rc;
ENTRY;
- //LASSERT(oti != NULL);
-
+ dentry = __filter_oa2dentry(exp->exp_obd, oa,
+ __FUNCTION__, 1);
+ if (IS_ERR(dentry))
+ RETURN(PTR_ERR(dentry));
+
filter = &exp->exp_obd->u.filter;
push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
- /* make sure that object is allocated. */
- dentry = filter_crow_object(exp->exp_obd, oa);
- if (IS_ERR(dentry))
- GOTO(out_pop, rc = PTR_ERR(dentry));
-
lock_kernel();
/* setting objects attributes (including owner/group) */
out_unlock:
unlock_kernel();
f_dput(dentry);
-out_pop:
pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
return rc;
}
RETURN(lsm_size);
}
+/* Destroy every object of the group with an id above oa->o_id, up to the
+ * current last id, and then reset the group's last id to oa->o_id.
+ *
+ * fo_destroy_in_progress is raised before fo_create_lock is taken and is
+ * cleared again before the lock is dropped.  If the flag is found cleared
+ * once the lock is held, nothing is destroyed and an error is logged. */
+static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
+                                      struct filter_obd *filter)
+{
+        struct obdo victim; /* XXX obdo on stack */
+        __u64 last_id, cur;
+        ENTRY;
+        LASSERT(oa);
+
+        /* template for the objects to destroy; o_gr stays 0 (from the
+         * memset) unless the request names a group */
+        memset(&victim, 0, sizeof(victim));
+        if (oa->o_valid & OBD_MD_FLGROUP) {
+                victim.o_valid |= OBD_MD_FLGROUP;
+                victim.o_gr = oa->o_gr;
+        }
+        victim.o_mode = S_IFREG;
+
+        filter->fo_destroy_in_progress = 1;
+        down(&filter->fo_create_lock);
+        if (filter->fo_destroy_in_progress) {
+                last_id = filter_last_id(filter, &victim);
+                CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
+                      exp->exp_obd->obd_name, oa->o_id + 1, last_id);
+                cur = oa->o_id + 1;
+                while (cur <= last_id) {
+                        victim.o_id = cur;
+                        filter_destroy(exp, &victim, NULL, NULL, NULL);
+                        cur++;
+                }
+
+                CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
+                       exp->exp_obd->obd_name, victim.o_gr, oa->o_id);
+
+                spin_lock(&filter->fo_objidlock);
+                filter->fo_last_objids[victim.o_gr] = oa->o_id;
+                spin_unlock(&filter->fo_objidlock);
+
+                filter->fo_destroy_in_progress = 0;
+        } else {
+                CERROR("%s: destroy_in_progress already cleared\n",
+                       exp->exp_obd->obd_name);
+        }
+        up(&filter->fo_create_lock);
+
+        EXIT;
+}
+
+/* Work out how much precreation a create request asks for.
+ *
+ * The difference between oa->o_id and filter_last_id() is computed first.
+ * For a delete-orphans request (OBD_FL_DELORPHAN in o_flags) a non-negative
+ * difference is returned as-is; a negative one means objects above
+ * oa->o_id exist, and they are destroyed here unless more than
+ * OST_MAX_PRECREATE objects would be affected (-EINVAL).  For any other
+ * request 1 is returned when group != 0 or no o_id was given, otherwise
+ * the difference itself.
+ *
+ * returns a negative error or a nonnegative number of files to create */
+static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
+                                   obd_gr group)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        int diff, rc;
+        ENTRY;
+
+        /* NOTE(review): the 64-bit id difference is stored in an int */
+        diff = oa->o_id - filter_last_id(filter, oa);
+        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
+               filter_last_id(filter, oa), diff);
+
+        /* delete orphans request */
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_DELORPHAN)) {
+                if (diff >= 0)
+                        RETURN(diff);
+                if (-diff > OST_MAX_PRECREATE) {
+                        CERROR("%s: ignoring bogus orphan destroy request: "
+                               "obdid "LPU64" last_id "LPU64"\n", obd->obd_name,
+                               oa->o_id, filter_last_id(filter, oa));
+                        RETURN(-EINVAL);
+                }
+                filter_destroy_precreated(exp, oa, filter);
+                /* failing to persist the new last id is only logged: the
+                 * orphans are already gone */
+                rc = filter_update_last_objid(obd, group, 0);
+                if (rc)
+                        CERROR("%s: unable to write lastobjid, but orphans "
+                               "were deleted\n", obd->obd_name);
+                RETURN(0);
+        } else {
+                /* only precreate if group == 0 and o_id is specified */
+                /* NOTE(review): OBD_FL_DELORPHAN is an o_flags bit but is
+                 * tested against o_valid here - confirm whether o_flags
+                 * was intended; left unchanged to preserve behaviour */
+                if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
+                    (group != 0 || oa->o_id == 0))
+                        RETURN(1);
+
+                LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name,
+                         oa->o_id, filter_last_id(filter, oa), diff);
+                RETURN(diff);
+        }
+}
+
static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age)
{
RETURN(rc);
}
-struct dentry *
-filter_create_object(struct obd_device *obd, struct obdo *oa)
+/* We rely on the fact that only one thread will be creating files in a given
+ * group at a time, which is why we don't need an atomic filter_get_new_id.
+ * Even if we had that atomic function, the following race would exist:
+ *
+ * thread 1: gets id x from filter_next_id
+ * thread 2: gets id (x + 1) from filter_next_id
+ * thread 2: creates object (x + 1)
+ * thread 1: tries to create object x, gets -ENOSPC
+ */
+static int filter_precreate(struct obd_device *obd, struct obdo *oa,
+ obd_gr group, int *num)
{
- struct dentry *dparent = NULL;
- struct dentry *dchild = NULL;
- struct lvfs_ucred uc = {0,};
- struct lvfs_run_ctxt saved;
+ struct dentry *dchild = NULL, *dparent = NULL;
struct filter_obd *filter;
- int cleanup_phase = 0;
- int err = 0, rc = 0;
+ struct obd_statfs *osfs;
+ int err = 0, rc = 0, recreate_obj = 0, i;
+ unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
+ __u64 next_id;
void *handle = NULL;
- obd_gr group = 0;
ENTRY;
filter = &obd->u.filter;
- CDEBUG(D_INFO, "create objid "LPU64"\n", oa->o_id);
-
- if (oa->o_valid & OBD_MD_FLGROUP)
- group = oa->o_gr;
-
- dparent = filter_parent_lock(obd, group, oa->o_id);
- if (IS_ERR(dparent))
- GOTO(cleanup, dchild = dparent);
- cleanup_phase = 1;
-
- /* check if object is in blacklist. This should be done under parent
- * lock. */
- spin_lock(&filter->fo_blacklist_lock);
- if (oa->o_id > filter->fo_blacklist[group].fe_start &&
- oa->o_id <= filter->fo_blacklist[group].fe_end) {
- spin_unlock(&filter->fo_blacklist_lock);
- GOTO(cleanup, dchild = ERR_PTR(-ENOENT));
- }
- spin_unlock(&filter->fo_blacklist_lock);
-
- /* check if object is already allocated */
- dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
- if (IS_ERR(dchild))
- GOTO(cleanup, dchild);
-
- /* Files that already exist should only be below or at last_id */
- if (dchild->d_inode) {
- __u64 last_id = filter_last_id(filter, group);
-
- LASSERTF(oa->o_id <= last_id,
- "existing objid "LPU64" larger than last_id "LPU64"\n",
- oa->o_id, last_id);
- GOTO(cleanup, dchild);
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ recreate_obj = 1;
+ } else {
+ OBD_ALLOC(osfs, sizeof(*osfs));
+ if (osfs == NULL)
+ RETURN(-ENOMEM);
+ rc = filter_statfs(obd, osfs, jiffies - HZ);
+ if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+ CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+ osfs->os_bavail<<filter->fo_obt.obt_sb->s_blocksize_bits);
+ *num=0;
+ rc = -ENOSPC;
+ }
+ OBD_FREE(osfs, sizeof(*osfs));
+ if (rc) {
+ RETURN(rc);
+ }
}
- /* create new object */
- handle = fsfilt_start_log(obd, dparent->d_inode,
- FSFILT_OP_CREATE, NULL, 1);
- if (IS_ERR(handle))
- GOTO(cleanup, dchild = handle);
- cleanup_phase = 2;
-
- uc.luc_fsuid = oa->o_valid & OBD_MD_FLUID ?
- oa->o_uid : 0;
- uc.luc_fsgid = oa->o_valid & OBD_MD_FLGID ?
- oa->o_gid : 0;
- uc.luc_cap = current->cap_effective;
+ CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
- cap_raise(uc.luc_cap, CAP_SYS_RESOURCE);
+ down(&filter->fo_create_lock);
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
- rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+ for (i = 0; i < *num && err == 0; i++) {
+ int cleanup_phase = 0;
- if (rc) {
- CERROR("create failed rc = %d\n", rc);
- f_dput(dchild);
- GOTO(cleanup, dchild = ERR_PTR(rc));
- }
+ if (filter->fo_destroy_in_progress) {
+ CWARN("%s: precreate aborted by destroy\n",
+ obd->obd_name);
+ break;
+ }
- /* grow last created object id. */
- filter_grow_last_id(filter, group, oa->o_id);
- rc = filter_update_last_objid(obd, group, 0);
- if (rc) {
- CERROR("unable to write lastobjid, but "
- "object is created, err = %d\n",
- rc);
- rc = 0;
- }
+ if (recreate_obj) {
+ __u64 last_id;
+ next_id = oa->o_id;
+ last_id = filter_last_id(filter, oa);
+ if (next_id > last_id) {
+ CERROR("Error: Trying to recreate obj greater"
+ "than last id "LPD64" > "LPD64"\n",
+ next_id, last_id);
+ GOTO(cleanup, rc = -EINVAL);
+ }
+ } else
+ next_id = filter_last_id(filter, oa) + 1;
+
+ CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
+
+ dparent = filter_parent_lock(obd, group, next_id);
+ if (IS_ERR(dparent))
+ GOTO(cleanup, rc = PTR_ERR(dparent));
+ cleanup_phase = 1;
+
+ dchild = filter_fid2dentry(obd, dparent, group, next_id);
+ if (IS_ERR(dchild))
+ GOTO(cleanup, rc = PTR_ERR(dchild));
+ cleanup_phase = 2;
+
+ if (dchild->d_inode != NULL) {
+ /* This would only happen if lastobjid was bad on disk*/
+ /* Could also happen if recreating missing obj but
+ * already exists
+ */
+ if (recreate_obj) {
+ CERROR("%s: recreating existing object %.*s?\n",
+ obd->obd_name, dchild->d_name.len,
+ dchild->d_name.name);
+ } else {
+ CERROR("%s: Serious error: objid %.*s already "
+ "exists; is this filesystem corrupt?\n",
+ obd->obd_name, dchild->d_name.len,
+ dchild->d_name.name);
+ LBUG();
+ }
+ GOTO(cleanup, rc = -EEXIST);
+ }
- /* nobody else is touching this newly created object */
- LASSERT(dchild->d_inode);
+ handle = fsfilt_start_log(obd, dparent->d_inode,
+ FSFILT_OP_CREATE, NULL, 1);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ cleanup_phase = 3;
- if (oa->o_valid & OBD_MD_FLFID) {
- struct filter_fid ff;
+ rc = ll_vfs_create(dparent->d_inode, dchild,
+ S_IFREG | S_ISUID | S_ISGID | 0666, NULL);
+ if (rc) {
+ CERROR("create failed rc = %d\n", rc);
+ GOTO(cleanup, rc);
+ }
- /* packing fid and converting it to LE for storing into EA. Here
- * oa->o_stripe_idx should be filled by LOV and rest of fields -
- * by client. */
- ff.ff_fid.id = cpu_to_le64(oa->o_fid);
- ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx);
- ff.ff_fid.generation = cpu_to_le32(oa->o_generation);
- ff.ff_objid = cpu_to_le64(oa->o_id);
- ff.ff_group = cpu_to_le64(group);
+ if (!recreate_obj) {
+ filter_set_last_id(filter, oa, next_id);
+ err = filter_update_last_objid(obd, group, 0);
+ if (err)
+ CERROR("unable to write lastobjid "
+ "but file created\n");
+ }
- down(&dchild->d_inode->i_sem);
- rc = fsfilt_set_md(obd, dchild->d_inode, handle,&ff,sizeof(ff));
- up(&dchild->d_inode->i_sem);
- if (rc) {
- CERROR("store fid in object failed! rc:%d\n", rc);
+ cleanup:
+ switch(cleanup_phase) {
+ case 3:
+ err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
+ if (err) {
+ CERROR("error on commit, err = %d\n", err);
+ if (!rc)
+ rc = err;
+ }
+ case 2:
f_dput(dchild);
- GOTO(cleanup, dchild = ERR_PTR(rc));
+ case 1:
+ filter_parent_unlock(dparent);
+ case 0:
+ break;
}
- } else {
- CDEBUG(D_HA, "create OSS object without fid!\n");
- }
-cleanup:
- switch(cleanup_phase) {
- case 2:
- err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
- if (err) {
- CERROR("error on commit, err = %d\n", err);
- if (!rc) {
- rc = err;
- f_dput(dchild);
- dchild = ERR_PTR(rc);
- }
+ if (rc)
+ break;
+ if (time_after(jiffies, enough_time)) {
+ CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
+ obd->obd_name, *num, i);
+ break;
}
- case 1:
- filter_parent_unlock(obd, dparent);
- case 0:
- break;
}
+ *num = i;
- RETURN(dchild);
+ up(&filter->fo_create_lock);
+
+ CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
+ obd->obd_name, group, filter->fo_last_objids[group]);
+
+ CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
+ obd->obd_name, i);
+ RETURN(rc);
}
-struct dentry *
-filter_crow_object(struct obd_device *obd, struct obdo *oa)
+/* OST create entry point: recreate one object, or precreate/clean up
+ * objects up to oa->o_id.
+ *
+ * With OBD_FL_RECREATE_OBJS set in o_flags, exactly one object with id
+ * oa->o_id is created, and only if that id is not above the last id
+ * (-EINVAL otherwise).  Otherwise filter_should_precreate() decides how
+ * many objects to create; on that path oa->o_id is rewritten to the last
+ * id after precreation and o_valid is reduced to OBD_MD_FLID.
+ *
+ * If @ea is non-NULL and *ea is NULL, a stripe md is allocated here; it is
+ * handed back through *ea on success (with lsm_object_id set) and freed on
+ * failure.
+ *
+ * Returns 0 on success or a negative errno. */
+static int filter_create(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
- struct filter_obd *filter;
- struct dentry *dentry;
+ struct obd_device *obd = NULL;
+ struct lvfs_run_ctxt saved;
+ struct lov_stripe_md *lsm = NULL;
obd_gr group = 0;
+ int rc = 0, diff;
ENTRY;
- if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CROW_EIO))
- RETURN(ERR_PTR(-EIO));
-
- filter = &obd->u.filter;
-
if (oa->o_valid & OBD_MD_FLGROUP)
group = oa->o_gr;
- /* try to create new object (if it is not yet) */
- dentry = filter_create_object(obd, oa);
- if (IS_ERR(dentry)) {
- CERROR("cannot create OSS object "LPU64"/"LPU64
- ", err = %d\n", oa->o_id, group,
- (int)PTR_ERR(dentry));
- RETURN(dentry);
+ CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
+ group, oa->o_id);
+ if (ea != NULL) {
+ lsm = *ea;
+ if (lsm == NULL) {
+ rc = obd_alloc_memmd(exp, &lsm);
+ if (rc < 0)
+ RETURN(rc);
+ /* only rc < 0 means failure above; do not let a
+ * positive success value leak out as our result
+ * when nothing below overwrites rc */
+ rc = 0;
+ }
}
- RETURN(dentry);
+ obd = exp->exp_obd;
+ push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
+ CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+ oa->o_id, filter_last_id(&obd->u.filter, oa));
+ rc = -EINVAL;
+ } else {
+ diff = 1;
+ rc = filter_precreate(obd, oa, group, &diff);
+ }
+ } else {
+ diff = filter_should_precreate(exp, oa, group);
+ if (diff > 0) {
+ oa->o_id = filter_last_id(&obd->u.filter, oa);
+ rc = filter_precreate(obd, oa, group, &diff);
+ oa->o_id = filter_last_id(&obd->u.filter, oa);
+ oa->o_valid = OBD_MD_FLID;
+ } else if (diff < 0) {
+ /* filter_should_precreate() returns a negative errno
+ * on failure; report it instead of claiming success */
+ rc = diff;
+ }
+ }
+
+ pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (rc && ea != NULL && *ea != lsm) {
+ obd_free_memmd(exp, &lsm);
+ } else if (rc == 0 && ea != NULL) {
+ /* XXX LOV STACKING: the lsm that is passed to us from
+ * LOV does not have valid lsm_oinfo data structs, so
+ * don't go touching that. This needs to be fixed in a
+ * big way. */
+ lsm->lsm_object_id = oa->o_id;
+ *ea = lsm;
+ }
+
+ RETURN(rc);
}
-/* destroys object @oa. Takes care of locking if @lock says that parent is not
- * yet locked. Also drops parent lock before taking ldlm PW lock to avoid
- * deadlocks in lock retraction related paths.
- *
- * This function does not change locking and does not imply hiden locking
- * knowladge. After this fucntion is finished, all parents stay at the same
- * locking state.
-
- * If @lock == 1, this means that parent of @oa is not locked and should be
- * locked for destroy operation. However, after operation is finished, parent
- * will be unlocked. The same is true about opposite case, when parent is
- * already locked and filter_destroy_internal() does not need to lock it. */
-static int
-filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, struct obd_trans_info *oti,
- int lock)
+int filter_destroy(struct obd_export *exp, struct obdo *oa,
+ struct lov_stripe_md *md, struct obd_trans_info *oti,
+ struct obd_export *md_exp)
{
+ unsigned int qcids[MAXQUOTAS] = {0, 0};
struct obd_device *obd;
struct filter_obd *filter;
struct dentry *dchild = NULL, *dparent = NULL;
void *handle = NULL;
struct llog_cookie *fcc = NULL;
int rc, rc2, cleanup_phase = 0, have_prepared = 0;
- unsigned int qcids[MAXQUOTAS] = {0, 0};
obd_gr group = 0;
ENTRY;
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
acquire_locks:
- dparent = lock ?
- filter_parent_lock(obd, group, oa->o_id):
- filter_parent(obd, group, oa->o_id);
+ dparent = filter_parent_lock(obd, group, oa->o_id);
if (IS_ERR(dparent))
GOTO(cleanup, rc = PTR_ERR(dparent));
cleanup_phase = 1;
}
if (!have_prepared) {
- /* If we're really going to destroy the object, get ready by
- * getting the clients to discard their cached data.
+ /* If we're really going to destroy the object, get ready
+ * by getting the clients to discard their cached data.
*
* We have to drop the parent lock, because
- * filter_prepare_destroy() will acquire a PW on the object, and
+ * filter_prepare_destroy will acquire a PW on the object, and
* we don't want to deadlock with an incoming write to the
* object, which has the extent PW and then wants to get the
* parent dentry to do the lookup.
* complication of condition the above code to skip it on the
* second time through. */
f_dput(dchild);
+ filter_parent_unlock(dparent);
- filter_unlock_dentry(obd, dparent);
filter_prepare_destroy(obd, oa->o_id);
-
- /* lock parent dentry again, to keep locking state the same as
- * before calling this function. */
- if (!lock)
- filter_lock_dentry(obd, dparent);
-
have_prepared = 1;
goto acquire_locks;
}
/* Quota release need uid/gid of inode */
obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID|OBD_MD_FLGID);
- rc = filter_unlink(obd, oa->o_id, dparent, dchild);
+ rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
+ EXIT;
cleanup:
switch(cleanup_phase) {
case 3:
case 2:
f_dput(dchild);
case 1:
- if (lock)
- filter_parent_unlock(obd, dparent);
+ filter_parent_unlock(dparent);
case 0:
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
break;
FSFILT_OP_UNLINK);
CDEBUG(rc2 ? D_ERROR : D_QUOTA,
"filter adjust qunit! (rc:%d)\n", rc2);
-
- RETURN(rc);
-}
-
-/* destroy oject with taking lock on parent first. */
-int filter_destroy(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, struct obd_trans_info *oti,
- struct obd_export *md_exp)
-{
- int rc;
-
- ENTRY;
- rc = filter_destroy_internal(exp, oa, md, oti, 1);
- RETURN(rc);
-}
-
-static int
-filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
-{
- struct filter_obd *filter;
- struct obd_device *obd;
- struct obdo *doa;
- obd_gr group = 0;
- int rc, orphans;
- __u64 last, id;
- ENTRY;
-
- LASSERT(oa);
-
- OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE);
-
- obd = exp->exp_obd;
- filter = &obd->u.filter;
-
- if (oa->o_valid & OBD_MD_FLGROUP)
- group = oa->o_gr;
-
- filter->fo_destroy_in_progress = 1;
-
- LOCK_PARENTS(obd, group);
- if (!filter->fo_destroy_in_progress) {
- UNLOCK_PARENTS(obd, group);
- CDEBUG(D_HA, "cleanup orphans is already canceled\n");
- RETURN(0);
- }
-
- last = filter_last_id(filter, group);
- orphans = last - oa->o_id;
-
- if (orphans <= 0) {
- filter->fo_destroy_in_progress = 0;
- UNLOCK_PARENTS(obd, group);
- CDEBUG(D_HA, "nothing to cleanup, MDS objid "LPU64
- " is not bigger than OST one "LPU64"\n",
- oa->o_id, last);
- RETURN(0);
- }
-
- CDEBUG(D_HA, "adding orphans extent "LPU64":"LPU64"-"LPU64
- " to blacklist\n", group, oa->o_id, last);
-
- /* making all orphans entries in blacklist, that will deny to re-create
- * them by CROW in filter_create_object(). This is done for case when
- * orphans already exist on client and will be tried to write something
- * and we want to stop them.
- *
- * In fact the issue is even worse, as we want to put in blacklist not
- * only the objects which we just destroed, but also those which not yet
- * created on OST (and OST has no idea about) but possibly existing on
- * clients. */
- spin_lock(&filter->fo_blacklist_lock);
- filter->fo_blacklist[group].fe_start = oa->o_id;
- filter->fo_blacklist[group].fe_end = last;
- spin_unlock(&filter->fo_blacklist_lock);
-
- doa = obdo_alloc();
- if (doa == NULL) {
- filter->fo_destroy_in_progress = 0;
- UNLOCK_PARENTS(obd, group);
- RETURN(-ENOMEM);
- }
-
- doa->o_gr = group;
- doa->o_mode = S_IFREG;
- doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-
- CDEBUG(D_ERROR, "%s:["LPU64"] deleting orphan objects from "LPU64" to "
- LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id, last);
-
- for (id = last; id > oa->o_id; id--) {
- doa->o_id = id;
-
- /* remove object @doa. It will not lock parent as parents
- * already locked. */
- filter_destroy_internal(exp, doa, NULL, NULL, 0);
-
- /* update last id just for case when OST will down in cleanup
- * orphans time. */
- filter_set_last_id(filter, group, id);
-
- /* update last_id on disk periodicaly */
- if ((id & 1023) == 0)
- filter_update_last_objid(obd, group, 0);
- }
-
- UNLOCK_PARENTS(obd, group);
-
- /* return next free id to be used as a new start of sequence. As we
- * return last id from OST, this will make sure that MDS will start new
- * sequence from object id which is far from existing and there will not
- * be object id sharing. */
- oa->o_id = last + 1;
- filter_set_last_id(filter, group, oa->o_id);
-
- CDEBUG(D_ERROR, "%s:["LPU64"] after destroy: set last_objids = "
- LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
-
- rc = filter_update_last_objid(obd, group, 1);
- filter->fo_destroy_in_progress = 0;
-
- obdo_free(doa);
- RETURN(rc);
-}
-
-static int filter_create(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
- struct filter_export_data *fed;
- struct lvfs_run_ctxt saved;
- struct filter_obd *filter;
- obd_gr group = oa->o_gr;
- struct obd_device *obd;
- int rc = 0;
- ENTRY;
-
- obd = exp->exp_obd;
- fed = &exp->exp_filter_data;
- filter = &obd->u.filter;
-
- CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
- group, oa->o_id);
-
- if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) {
- push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
- rc = filter_clear_orphans(exp, oa);
- if (rc) {
- CERROR("cannot clear orphans starting from "
- LPU64", err = %d\n", oa->o_id, rc);
- }
- pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
- RETURN(rc);
- }
-
- LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS,
- !!(oa->o_flags & OBD_FL_CREATE_CROW) !=
- !!(oa->o_flags & OBD_FL_RECREATE_OBJS)));
-
- /* all non-CROW creates should end up here */
- if (OBDO_URGENT_CREATE(oa)) {
- struct obd_statfs *osfs;
- struct dentry *dentry;
-
- /* check space first. As this is real create and client does not
- * have yet file created, this is good place to check space. */
- OBD_ALLOC_PTR(osfs);
- if (!osfs)
- RETURN(-ENOMEM);
-
- rc = filter_statfs(obd, osfs, jiffies - HZ);
- if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
- CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
- osfs->os_bavail << filter->fo_obt.obt_sb->s_blocksize_bits);
- rc = -ENOSPC;
- }
-
- OBD_FREE_PTR(osfs);
- if (rc)
- RETURN(rc);
-
- dentry = filter_create_object(obd, oa);
- if (!IS_ERR(dentry)) {
- f_dput(dentry);
- if (ea != NULL) {
- struct lov_stripe_md *lsm = *ea;
- if (lsm == NULL) {
- rc = obd_alloc_memmd(exp, &lsm);
- if (rc)
- RETURN(rc);
- }
- lsm->lsm_object_id = oa->o_id;
- *ea = lsm;
- rc = 0;
- }
- }
- } else {
- CERROR("wrong @oa flags detected 0x%lx. Not an urgent "
- "create and not recovery.\n",(unsigned long)oa->o_flags);
- LBUG();
- }
- RETURN(rc);
+ return rc;
}
/* NB start and end are used for punch, but not truncate */
#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
#define FILTER_GROUPS 3 /* must be at least 3; not dynamic yet */
-#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
+#define FILTER_ROCOMPAT_SUPP (0)
-#define FILTER_ROCOMPAT_SUPP (OBD_ROCOMPAT_CROW)
+#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
#define FILTER_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS)
struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
const char *what, int quiet);
#define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0)
-#define filter_oa2dentry_quiet(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 1)
int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
-__u64 filter_last_id(struct filter_obd *, int group);
+__u64 filter_next_id(struct filter_obd *, struct obdo *);
+__u64 filter_last_id(struct filter_obd *, struct obdo *);
+int filter_update_fidea(struct obd_export *exp, struct inode *inode,
+ void *handle, struct obdo *oa);
int filter_update_server_data(struct obd_device *, struct file *,
struct filter_server_data *, int force_sync);
int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
int filter_common_setup(struct obd_device *, obd_count len, void *buf,
void *option);
int filter_destroy(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, struct obd_trans_info *,
- struct obd_export *md_exp);
-struct dentry *filter_crow_object(struct obd_device *obd, struct obdo *oa);
-
+ struct lov_stripe_md *md, struct obd_trans_info *,
+ struct obd_export *);
int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
struct obdo *oa, struct obd_trans_info *oti);
int filter_setattr(struct obd_export *exp, struct obdo *oa,
iobuf = filter_iobuf_get(&obd->u.filter, oti);
- dentry = filter_oa2dentry_quiet(obd, oa);
+ dentry = filter_oa2dentry(obd, oa);
if (IS_ERR(dentry)) {
- if (PTR_ERR(dentry) == -ENOENT) {
- dentry = NULL;
- inode = NULL;
- } else {
- dentry = NULL;
- GOTO(cleanup, rc = PTR_ERR(dentry));
- }
- } else {
- inode = dentry->d_inode;
+ rc = PTR_ERR(dentry);
+ dentry = NULL;
+ GOTO(cleanup, rc);
}
-
- if (oa && inode != NULL)
+
+ inode = dentry->d_inode;
+
+ if (oa)
obdo_to_inode(inode, oa, OBD_MD_FLATIME);
fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
*/
LASSERT(lnb->page != NULL);
- if (inode == NULL || inode->i_size <= rnb->offset)
- /* If there's no more data, or inode is not yet
- * allocated by CROW abort early. lnb->rc == 0, so it's
- * easy to detect later. */
+ if (inode->i_size <= rnb->offset)
+ /* If there's no more data, abort early. lnb->rc == 0,
+ * so it's easy to detect later. */
break;
else
filter_alloc_dio_page(obd, inode, lnb);
fsfilt_check_slow(now, obd_timeout, "start_page_read");
- if (inode != NULL) {
- rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
- exp, NULL, NULL, NULL);
- if (rc)
- GOTO(cleanup, rc);
- }
+ rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+ exp, NULL, NULL, NULL);
+ if (rc)
+ GOTO(cleanup, rc);
lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
LASSERT(objcount == 1);
LASSERT(obj->ioo_bufcnt > 0);
- OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE);
-
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
iobuf = filter_iobuf_get(&exp->exp_obd->u.filter, oti);
+ if (iobuf == NULL)
+ GOTO(cleanup, rc = -ENOMEM);
cleanup_phase = 1;
- push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
- /* make sure that object is already allocated */
- dentry = filter_crow_object(exp->exp_obd, oa);
+ dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
+ obj->ioo_id);
if (IS_ERR(dentry))
GOTO(cleanup, rc = PTR_ERR(dentry));
cleanup_phase = 2;
+ if (dentry->d_inode == NULL) {
+ CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
+ exp->exp_obd->obd_name, obj->ioo_id);
+ GOTO(cleanup, rc = -ENOENT);
+ }
+
fso.fso_dentry = dentry;
fso.fso_bufcnt = obj->ioo_bufcnt;
rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
&left, dentry->d_inode);
- /* We're finishing using body->oa as an input variable, so reset
- * o_valid here. */
+ /* Do not zero out oa->o_valid: filter_commitrw_write() still needs it
+ * to set the UID/GID and the fid EA at first-write time. */
if (oa && oa->o_valid & OBD_MD_FLGRANT) {
oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
- oa->o_valid = OBD_MD_FLGRANT;
- } else if (oa)
- oa->o_valid = 0;
+ oa->o_valid |= OBD_MD_FLGRANT;
+ }
spin_unlock(&exp->exp_obd->obd_osfs_lock);
fsfilt_check_slow(now, obd_timeout, "brw_start");
- iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+ i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+ /* If the inode still has SUID+SGID bits set (see filter_precreate())
+ * then we will accept the UID+GID if sent by the client for
+ * initializing the ownership of this inode. We only allow this to
+ * happen once (so clear these bits) and later only allow setattr. */
+ if (inode->i_mode & S_ISUID)
+ i |= OBD_MD_FLUID;
+ if (inode->i_mode & S_ISGID)
+ i |= OBD_MD_FLGID;
+
+ iattr_from_obdo(&iattr, oa, i);
+ if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+ CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+ (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+ cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
+
+ iattr.ia_valid |= ATTR_MODE;
+ iattr.ia_mode = inode->i_mode;
+ if (iattr.ia_valid & ATTR_UID)
+ iattr.ia_mode &= ~S_ISUID;
+ if (iattr.ia_valid & ATTR_GID)
+ iattr.ia_mode &= ~S_ISGID;
+
+ rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
+ }
+
/* filter_direct_io drops i_sem */
rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
oti, &wait_handle);
fsfilt_check_slow(now, obd_timeout, "brw_start");
- iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+ i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+ /* If the inode still has SUID+SGID bits set (see filter_precreate())
+ * then we will accept the UID+GID if sent by the client for
+ * initializing the ownership of this inode. We only allow this to
+ * happen once (so clear these bits) and later only allow setattr. */
+ if (inode->i_mode & S_ISUID)
+ i |= OBD_MD_FLUID;
+ if (inode->i_mode & S_ISGID)
+ i |= OBD_MD_FLGID;
+
+ iattr_from_obdo(&iattr, oa, i);
+ if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+ CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+ (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+ cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
+
+ iattr.ia_valid |= ATTR_MODE;
+ iattr.ia_mode = inode->i_mode;
+ if (iattr.ia_valid & ATTR_UID)
+ iattr.ia_mode &= ~S_ISUID;
+ if (iattr.ia_valid & ATTR_GID)
+ iattr.ia_mode &= ~S_ISGID;
+
+ rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
+ }
+
/* filter_direct_io drops i_sem */
rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
oti, &wait_handle);
if (IS_ERR(dentry))
RETURN(PTR_ERR(dentry));
- if (dentry->d_inode == NULL) {
- lvb->lvb_size = 0;
- lvb->lvb_blocks = 0;
-
- /* making client use MDS mtime as this one is zero, bigger one
- * will be taken and this does not break POSIX */
- lvb->lvb_mtime = 0;
- } else {
- lvb->lvb_size = dentry->d_inode->i_size;
- lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
- lvb->lvb_blocks = dentry->d_inode->i_blocks;
- }
+ if (dentry->d_inode == NULL)
+ GOTO(out_dentry, rc = -ENOENT);
+
+ lvb->lvb_size = dentry->d_inode->i_size;
+ lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+ lvb->lvb_blocks = dentry->d_inode->i_blocks;
CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", "
"mtime: "LPU64", blocks: "LPU64"\n",
res->lr_name.name[0], lvb->lvb_size,
lvb->lvb_mtime, lvb->lvb_blocks);
+ EXIT;
+out_dentry:
f_dput(dentry);
/* Don't free lvb data on lookup error */
- RETURN(rc);
+ return rc;
}
/* This will be called in two ways:
return rc;
}
+/* procfs read handler for "create_count": formats the OSC creator's current
+ * oscc_grow_count as a decimal line into @page, bounded by @count.
+ * Returns 0 without touching @page when @data is NULL, otherwise the
+ * snprintf() result. */
+static int osc_rd_create_count(char *page, char **start, off_t off, int count,
+                               int *eof, void *data)
+{
+        struct obd_device *dev = data;
+        struct osc_creator *oscc;
+
+        if (dev == NULL)
+                return 0;
+
+        oscc = &dev->u.cli.cl_oscc;
+        return snprintf(page, count, "%d\n", oscc->oscc_grow_count);
+}
+
+/* procfs write handler for "create_count": parses an integer from @buffer
+ * and stores it as the OSC creator's oscc_grow_count.
+ *
+ * Returns @count on success, 0 when @data is NULL, the error from
+ * lprocfs_write_helper() when parsing fails, or -ERANGE when the value is
+ * outside [OST_MIN_PRECREATE, OST_MAX_PRECREATE]. */
+static int osc_wr_create_count(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        struct osc_creator *oscc;
+        int val, rc;
+
+        if (obd == NULL)
+                return 0;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        /* A grow count below the minimum (0 in particular) would make
+         * oscc_internal_create() ask for oscc_last_id + 0, i.e. no new
+         * objects, so waiters in oscc_precreate() would never progress. */
+        if (val < OST_MIN_PRECREATE || val > OST_MAX_PRECREATE)
+                return -ERANGE;
+
+        /* the create paths update oscc_grow_count under oscc_lock, so
+         * take it here as well */
+        oscc = &obd->u.cli.cl_oscc;
+        spin_lock(&oscc->oscc_lock);
+        oscc->oscc_grow_count = val;
+        spin_unlock(&oscc->oscc_lock);
+
+        return count;
+}
+
+/* procfs read handler for "prealloc_next_id": prints the OSC creator's
+ * oscc_next_id as one LPU64-formatted line into @page, bounded by @count.
+ * Returns 0 when @data is NULL, otherwise the snprintf() result. */
+static int osc_rd_prealloc_next_id(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *dev = data;
+        obd_id next;
+
+        if (dev == NULL)
+                return 0;
+
+        next = dev->u.cli.cl_oscc.oscc_next_id;
+        return snprintf(page, count, LPU64"\n", next);
+}
+
+/* procfs read handler for "prealloc_last_id": prints the OSC creator's
+ * oscc_last_id as one LPU64-formatted line into @page, bounded by @count.
+ * Returns 0 when @data is NULL, otherwise the snprintf() result. */
+static int osc_rd_prealloc_last_id(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *dev = data;
+        obd_id last;
+
+        if (dev == NULL)
+                return 0;
+
+        last = dev->u.cli.cl_oscc.oscc_last_id;
+        return snprintf(page, count, LPU64"\n", last);
+}
+
static int osc_rd_checksum(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
return count;
}
-static int osc_rd_last_id(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- struct obd_device *obd = (struct obd_device *)data;
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
- int rc;
-
- *eof = 1;
- spin_lock(&oscc->oscc_lock);
- rc = snprintf(page, count, LPU64"\n", oscc->oscc_next_id);
- spin_unlock(&oscc->oscc_lock);
- return rc;
-}
-
static struct lprocfs_vars lprocfs_obd_vars[] = {
{ "uuid", lprocfs_rd_uuid, 0, 0 },
{ "ping", 0, lprocfs_wr_ping, 0 },
{ "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 },
{ "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 },
{ "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 },
+ { "create_count", osc_rd_create_count, osc_wr_create_count, 0 },
+ { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 },
+ { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
{ "checksums", osc_rd_checksum, osc_wr_checksum, 0 },
- { "last_id", osc_rd_last_id, 0, 0 },
{ 0 }
};
#include <linux/obd_class.h>
#include "osc_internal.h"
-int oscc_recovering(struct osc_creator *oscc)
+static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
{
- int recov = 0;
+ struct osc_creator *oscc;
+ struct ost_body *body = NULL;
+ ENTRY; /* reply callback for the async OST_CREATE queued by oscc_internal_create() */
+
+ if (req->rq_repmsg) {
+ body = lustre_swab_repbuf(req, 0, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL && rc == 0)
+ rc = -EPROTO; /* RPC claimed success but no ost_body could be obtained */
+ }
+ oscc = req->rq_async_args.pointer_arg[0]; /* creator stashed by oscc_internal_create() */
+ LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
+
spin_lock(&oscc->oscc_lock);
- recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
- spin_unlock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING; /* request finished: a new precreate RPC may be sent */
+ if (rc == -ENOSPC || rc == -EROFS) {
+ oscc->oscc_flags |= OSCC_FLAG_NOSPC; /* oscc_precreate()/osc_create() turn this into -ENOSPC */
+ if (body && rc == -ENOSPC) {
+ oscc->oscc_grow_count = OST_MIN_PRECREATE; /* fall back to the smallest batch */
+ oscc->oscc_last_id = body->oa.o_id; /* take the id reported in the reply */
+ }
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
+ } else if (rc != 0 && rc != -EIO) {
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING; /* oscc_internal_create() sends nothing while this is set */
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_ERROR, req,
+ "unknown rc %d from async create: failing oscc", rc);
+ ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
+ } else {
+ if (rc == 0) {
+ oscc->oscc_flags &= ~OSCC_FLAG_LOW; /* re-arm the low-water doubling in oscc_internal_create() */
+ if (body) {
+ int diff = body->oa.o_id - oscc->oscc_last_id; /* ids gained by this reply */
+ if (diff != oscc->oscc_grow_count)
+ oscc->oscc_grow_count =
+ max(diff/3, OST_MIN_PRECREATE);
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ }
+ spin_unlock(&oscc->oscc_lock);
+ }
- return recov;
+ CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
+ oscc->oscc_last_id, oscc->oscc_next_id); /* NOTE(review): ids read after oscc_lock was dropped */
+
+ wake_up(&oscc->oscc_waitq); /* wakes waiters such as l_wait_event() in oscc_precreate() */
+ RETURN(rc);
}
-static int osc_check_state(struct obd_export *exp)
+static int oscc_internal_create(struct osc_creator *oscc)
{
- int rc;
+ struct ptlrpc_request *request;
+ struct ost_body *body;
+ int size = sizeof(*body); /* request and reply each carry a single ost_body buffer */
ENTRY;
- /* ->os_state contains positive error code on remote OST. To convert it
- * to usual errno form we have to make an sign inversion. */
- spin_lock(&exp->exp_obd->obd_osfs_lock);
- rc = -exp->exp_obd->obd_osfs.os_state;
- spin_unlock(&exp->exp_obd->obd_osfs_lock);
-
- RETURN(rc);
+ spin_lock(&oscc->oscc_lock); /* queue one async OST_CREATE asking for more precreated ids */
+ if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
+ !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
+ (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
+ (oscc->oscc_grow_count / 4 + 1)) {
+ oscc->oscc_flags |= OSCC_FLAG_LOW; /* cleared by osc_interpret_create() on success */
+ oscc->oscc_grow_count *= 2; /* few unused ids left: double the batch size */
+ }
+
+ if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
+ oscc->oscc_grow_count = OST_MAX_PRECREATE / 2; /* cap the batch at half the maximum */
+
+ if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
+ oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(0); /* a create is already in flight or recovery is pending: send nothing */
+ }
+ oscc->oscc_flags |= OSCC_FLAG_CREATING; /* cleared in osc_interpret_create() or on allocation failure below */
+ spin_unlock(&oscc->oscc_lock);
+
+ request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
+ LUSTRE_OST_VERSION, OST_CREATE, 1,
+ &size, NULL);
+ if (request == NULL) {
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(-ENOMEM);
+ }
+
+ request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
+ body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
+
+ spin_lock(&oscc->oscc_lock);
+ body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count; /* ask for ids through last_id + grow_count */
+ body->oa.o_valid |= OBD_MD_FLID;
+ spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_HA, "preallocating through id "LPU64" (last used "LPU64")\n",
+ body->oa.o_id, oscc->oscc_next_id); /* NOTE(review): oscc_next_id read without oscc_lock */
+
+ request->rq_replen = lustre_msg_size(1, &size);
+
+ request->rq_async_args.pointer_arg[0] = oscc; /* read back by osc_interpret_create() */
+ request->rq_interpret_reply = osc_interpret_create;
+ ptlrpcd_add_req(request); /* handed to ptlrpcd; this function does not wait for the reply */
+
+ RETURN(0);
+}
+
+/* Report whether at least @count precreated object ids are still unused,
+ * i.e. oscc_last_id - oscc_next_id >= @count, sampled under oscc_lock.
+ * When there are not enough, start a precreate request through
+ * oscc_internal_create(); its return value is ignored.
+ * Returns 1 when enough ids are available, 0 otherwise. */
+static int oscc_has_objects(struct osc_creator *oscc, int count)
+{
+        __s64 avail;
+
+        spin_lock(&oscc->oscc_lock);
+        avail = (__s64)(oscc->oscc_last_id - oscc->oscc_next_id);
+        spin_unlock(&oscc->oscc_lock);
+
+        if (avail >= count)
+                return 1;
+
+        oscc_internal_create(oscc);
+        return 0;
+}
+
+/* Wake-up condition used while waiting for precreated objects: true when
+ * @count ids are available, when OSCC_FLAG_NOSPC is set, or when the import
+ * is marked invalid.  All three inputs are always sampled, and
+ * oscc_has_objects() may start a precreate request as a side effect. */
+static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
+{
+        int ready = oscc_has_objects(oscc, count);
+
+        spin_lock(&oscc->oscc_lock);
+        if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
+                ready = 1;
+        spin_unlock(&oscc->oscc_lock);
+
+        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
+                ready = 1;
+
+        return ready;
}
-static int osc_check_nospc(struct obd_export *exp)
+static int oscc_precreate(struct osc_creator *oscc, int wait)
{
- __u64 blocks, bavail;
- __u64 inodes, iavail;
+ struct l_wait_info lwi = { 0 };
int rc = 0;
ENTRY;
- spin_lock(&exp->exp_obd->obd_osfs_lock);
- blocks = exp->exp_obd->obd_osfs.os_blocks;
- bavail = exp->exp_obd->obd_osfs.os_bavail;
- inodes = exp->exp_obd->obd_osfs.os_files;
- iavail = exp->exp_obd->obd_osfs.os_ffree;
- spin_unlock(&exp->exp_obd->obd_osfs_lock);
-
- /* return 1 if available space smaller then (blocks >> 10) of all space
- * on OST. The main point of this water mark is to stop create files at
- * some point, to let all created and opened files finish possible
- * writes. */
- if (blocks > 0 && bavail < (blocks >> 10))
- rc = 1;
+ if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2)) /* at least half a batch unused? otherwise a precreate request is started */
+ RETURN(0);
+
+ if (!wait)
+ RETURN(0); /* caller did not ask to block; the request above proceeds asynchronously */
- if (inodes > 0 && iavail < 128)
- rc = 1;
+ /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
+ l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi); /* until an id is free, NOSPC is flagged, or the import is invalid */
+
+ if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC)) /* NOTE(review): oscc_flags read without oscc_lock */
+ rc = -ENOSPC;
+
+ if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
+ rc = -EIO; /* an invalid import takes precedence over -ENOSPC */
RETURN(rc);
}
+/* Sample OSCC_FLAG_RECOVERING from oscc_flags while holding oscc_lock.
+ * Returns the masked flag value: non-zero while the flag is set, 0 if not. */
+int oscc_recovering(struct osc_creator *oscc)
+{
+        int in_recovery;
+
+        spin_lock(&oscc->oscc_lock);
+        in_recovery = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+        spin_unlock(&oscc->oscc_lock);
+
+        return in_recovery;
+}
+
int osc_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct lov_stripe_md *lsm;
struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
int try_again = 1, rc = 0;
ENTRY;
+ LASSERT(oa);
+ LASSERT(ea);
+
+ if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0))
+ RETURN(osc_real_create(exp, oa, ea, oti));
+
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ oa->o_flags == OBD_FL_RECREATE_OBJS) {
+ RETURN(osc_real_create(exp, oa, ea, oti));
+ }
- LASSERT(oa != NULL);
- LASSERT(ea != NULL);
-
/* this is the special case where create removes orphans */
- if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) {
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ oa->o_flags == OBD_FL_DELORPHAN) {
spin_lock(&oscc->oscc_lock);
if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
spin_unlock(&oscc->oscc_lock);
spin_unlock(&oscc->oscc_lock);
CDEBUG(D_HA, "%s: oscc recovery started\n",
oscc->oscc_obd->obd_name);
- LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING);
+
+ /* delete from next_id on up */
+ oa->o_valid |= OBD_MD_FLID;
+ oa->o_id = oscc->oscc_next_id - 1;
CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n",
oscc->oscc_obd->obd_name, oa->o_id);
rc = osc_real_create(exp, oa, ea, NULL);
- if (oscc->oscc_obd == NULL) {
- CWARN("the obd for oscc %p has been freed\n", oscc);
- RETURN(rc);
- }
spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
if (rc == -ENOSPC)
oscc->oscc_flags |= OSCC_FLAG_NOSPC;
oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+ oscc->oscc_last_id = oa->o_id;
CDEBUG(D_HA, "%s: oscc recovery finished: %d\n",
oscc->oscc_obd->obd_name, rc);
wake_up(&oscc->oscc_waitq);
oscc->oscc_obd->obd_name, rc);
}
spin_unlock(&oscc->oscc_lock);
- RETURN(rc);
- }
- LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS,
- !!(oa->o_flags & OBD_FL_CREATE_CROW) !=
- !!(oa->o_flags & OBD_FL_RECREATE_OBJS)));
-
- /* perform urgent create if asked or import is not crow capable or
- * ENOSPC case if detected. */
- if (OBDO_URGENT_CREATE(oa) || !IMP_CROW_ABLE(class_exp2cliimp(exp)) ||
- osc_check_nospc(exp)) {
- CDEBUG(D_HA, "perform urgent create\n");
- oa->o_flags &= ~OBD_FL_CREATE_CROW;
- if (!oa->o_flags)
- oa->o_valid &= ~OBD_MD_FLFLAGS;
- rc = osc_real_create(exp, oa, ea, oti);
+
RETURN(rc);
}
- /* check OST fs state. */
- rc = osc_check_state(exp);
- if (rc) {
- CDEBUG(D_HA,"OST is in bad shape to create objects, err %d\n",
- rc);
- RETURN(rc);
+ lsm = *ea;
+ if (lsm == NULL) {
+ rc = obd_alloc_memmd(exp, &lsm);
+ if (rc < 0)
+ RETURN(rc);
}
-
+
while (try_again) {
- /* if orphans are being recovered, then we must wait until it is
- * finished before we can continue with create. */
+ /* If orphans are being recovered, then we must wait until
+ it is finished before we can continue with create. */
if (oscc_recovering(oscc)) {
struct l_wait_info lwi;
!oscc_recovering(oscc), &lwi);
LASSERT(rc == 0 || rc == -ETIMEDOUT);
if (rc == -ETIMEDOUT) {
- CDEBUG(D_HA, "%p: timeout waiting on recovery\n",
+ CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
oscc);
RETURN(rc);
}
break;
}
- if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
+ if (oscc->oscc_last_id >= oscc->oscc_next_id) {
+ memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
+ oa->o_id = oscc->oscc_next_id;
+ lsm->lsm_object_id = oscc->oscc_next_id;
+ *ea = lsm;
+ oscc->oscc_next_id++;
+ try_again = 0;
+ } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
rc = -ENOSPC;
spin_unlock(&oscc->oscc_lock);
break;
}
-
- oscc->oscc_next_id++;
- oa->o_id = oscc->oscc_next_id;
- try_again = 0;
spin_unlock(&oscc->oscc_lock);
+ rc = oscc_precreate(oscc, try_again);
+ if (rc)
+ break;
+ }
+ if (rc == 0)
CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
- oa->o_id);
- }
-
+ lsm->lsm_object_id);
+ else if (*ea == NULL)
+ obd_free_memmd(exp, &lsm);
RETURN(rc);
}
return;
oscc = &obd->u.cli.cl_oscc;
- memset(oscc, 0, sizeof(*oscc));
- oscc->oscc_obd = obd;
+ memset(oscc, 0, sizeof(*oscc));
+ INIT_LIST_HEAD(&oscc->oscc_list);
+ init_waitqueue_head(&oscc->oscc_waitq);
spin_lock_init(&oscc->oscc_lock);
+ oscc->oscc_obd = obd;
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+
+ oscc->oscc_next_id = 2;
+ oscc->oscc_last_id = 1;
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- init_waitqueue_head(&oscc->oscc_waitq);
+ /* XXX the export handle should give the oscc the last object */
+ /* oed->oed_oscc.oscc_last_id = exph->....; */
}
int osc_real_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
- struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
struct ptlrpc_request *request;
struct ost_body *body;
struct lov_stripe_md *lsm;
GOTO (out_req, rc = -EPROTO);
}
- if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
- struct obd_import *imp = class_exp2cliimp(exp);
- /* MDS declares last known object, OSS responses
- * with next possible object -bzzz */
- spin_lock(&oscc->oscc_lock);
- oscc->oscc_next_id = body->oa.o_id;
- spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n",
- imp->imp_target_uuid.uuid, oa->o_id);
- }
memcpy(oa, &body->oa, sizeof(*oa));
/* This should really be sent by the OST */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
+ if (KEY_IS("next_id")) {
+ if (vallen != sizeof(obd_id))
+ RETURN(-EINVAL);
+ obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
+ CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
+ exp->exp_obd->obd_name,
+ obd->u.cli.cl_oscc.oscc_next_id);
+
+ RETURN(0);
+ }
+
if (KEY_IS("unlinked")) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
spin_lock(&oscc->oscc_lock);
RETURN(0);
}
-
if (KEY_IS("initial_recov")) {
struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
if (vallen != sizeof(int))
exp->exp_connect_flags = ocd->ocd_connect_flags;
class_export_put(exp);
- if (IMP_CROW_ABLE(imp)) {
- CDEBUG(D_HA, "connected to CROW capable target: %s\n",
- imp->imp_target_uuid.uuid);
- }
-
obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
- * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6
- * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */
+ * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux
+ * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */
/* Constants... */
(long long)(int)offsetof(struct mds_body, aclsize));
LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n",
(long long)(int)sizeof(((struct mds_body *)0)->aclsize));
+ LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_2));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_2));
+ LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_3));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_3));
+ LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_4));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_4));
LASSERTF(FMODE_READ == 1, " found %lld\n",
(long long)FMODE_READ);
LASSERTF(FMODE_WRITE == 2, " found %lld\n",
RETURN(rc);
}
-static int ptlrpc_statfs_interpret(struct ptlrpc_request *req,
- void *data, int rc)
-{
- struct obd_statfs *msfs;
- struct obd_device *obd;
- ENTRY;
-
- if (rc)
- RETURN(rc);
-
- if (!req->rq_repmsg)
- RETURN(-EPROTO);
-
- msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs),
- lustre_swab_obd_statfs);
- if (msfs == NULL)
- RETURN(-EPROTO);
-
- obd = req->rq_import->imp_obd;
-
- spin_lock(&obd->obd_osfs_lock);
- obd->obd_osfs = *msfs;
- obd->obd_osfs_age = jiffies;
- spin_unlock(&obd->obd_osfs_lock);
-
- RETURN(0);
-}
-
-int ptlrpc_statfs(struct obd_import *imp)
-{
- int size = sizeof(struct obd_statfs);
- struct ptlrpc_request *req;
- ENTRY;
-
- req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_STATFS, 0,
- NULL, NULL);
- if (!req) {
- CERROR("OOM trying to ping %s->%s\n",
- imp->imp_obd->obd_uuid.uuid,
- imp->imp_target_uuid.uuid);
- RETURN(-ENOMEM);
- }
-
- DEBUG_REQ(D_INFO, req, "pinging %s->%s",
- imp->imp_obd->obd_uuid.uuid,
- imp->imp_target_uuid.uuid);
-
- req->rq_interpret_reply = ptlrpc_statfs_interpret;
- req->rq_replen = lustre_msg_size(1, &size);
- req->rq_no_resend = req->rq_no_delay = 1;
- ptlrpcd_add_req(req);
-
- RETURN(0);
-}
-
static void ptlrpc_update_next_ping(struct obd_import *imp)
{
- __u32 interval;
-
- interval = IMP_CROW_ABLE(imp) ?
- STATFS_INTERVAL : PING_INTERVAL;
-
imp->imp_next_ping = jiffies + HZ *
- (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : interval);
+ (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : PING_INTERVAL); /* multiplier is 10 while the import is in LUSTRE_IMP_DISCON, PING_INTERVAL otherwise; the per-import STATFS_INTERVAL choice is dropped */
}
void ptlrpc_ping_import_soon(struct obd_import *imp)
/* And now, loop forever, pinging as needed. */
while (1) {
- unsigned long sleep_interval = PING_INTERVAL;
- unsigned long update_interval = 0;
unsigned long this_ping = jiffies;
- struct l_wait_info lwi;
long time_to_next_ping;
+ struct l_wait_info lwi;
struct list_head *iter;
down(&pinger_sem);
int force, level;
unsigned long flags;
- if (IMP_CROW_ABLE(imp))
- sleep_interval = STATFS_INTERVAL;
-
spin_lock_irqsave(&imp->imp_lock, flags);
level = imp->imp_state;
force = imp->imp_force_verify;
imp->imp_deactive,
imp->imp_obd->obd_no_recov);
} else if (imp->imp_pingable || force) {
- if (IMP_CROW_ABLE(imp))
- ptlrpc_statfs(imp);
- else
- ptlrpc_ping(imp);
+ ptlrpc_ping(imp);
}
} else {
if (!imp->imp_pingable)
imp->imp_next_ping, this_ping);
}
- /* using here new calculated @update_interval, as
- * sleep_interval holds minimal of possible intervals
- * over pingable imports. */
- update_interval = IMP_CROW_ABLE(imp) ?
- STATFS_INTERVAL : PING_INTERVAL;
-
/* obd_timeout might have changed */
if (time_after(imp->imp_next_ping,
- this_ping + update_interval * HZ))
+ this_ping + PING_INTERVAL * HZ))
ptlrpc_update_next_ping(imp);
}
up(&pinger_sem);
- /* Wait until the next ping time, or until we're stopped. We
- * sleep here smaller interval of two possible (ping or
- * statfs). If one of imports is CROW capable we'll sleep
- * STATFS_INTERVAL and PING_INTERVAL otherwise. */
- time_to_next_ping = this_ping + (sleep_interval * HZ) - jiffies;
+ /* Wait until the next ping time, or until we're stopped. */
+ time_to_next_ping = this_ping + (PING_INTERVAL * HZ) - jiffies;
/* The ping sent by ptlrpc_send_rpc may get sent out
say .01 second after this.
we will SKIP the next ping at next_ping, and the
ping will get sent 2 timeouts from now! Beware. */
CDEBUG(D_INFO, "next ping in %lu (%lu)\n", time_to_next_ping,
- this_ping + sleep_interval * HZ);
+ this_ping + PING_INTERVAL * HZ);
if (time_to_next_ping > 0) {
lwi = LWI_TIMEOUT(max_t(long, time_to_next_ping, HZ),
NULL, NULL);
set -e
-# bug 6088
-ALWAYS_EXCEPT="8 $REPLAY_DUAL_EXCEPT"
+# bug 6088 9761 (CROW related)
+ALWAYS_EXCEPT="8 15a 15b 15c $REPLAY_DUAL_EXCEPT"
LUSTRE=${LUSTRE:-`dirname $0`/..}
. $LUSTRE/tests/test-framework.sh
. ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
# Skip these tests
-# bug number: 2766 4176
-ALWAYS_EXCEPT="0b 39 48 $REPLAY_SINGLE_EXCEPT"
+# bug number: 2766 4176 9761 (CROW related)
+ALWAYS_EXCEPT="0b 1a 39 $REPLAY_SINGLE_EXCEPT"
gen_config() {
rm -f $XMLCONFIG
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
- * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6
- * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */
+ * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux
+ * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */
/* Constants... */
(long long)(int)offsetof(struct mds_body, aclsize));
LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n",
(long long)(int)sizeof(((struct mds_body *)0)->aclsize));
+ LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_2));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_2));
+ LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_3));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_3));
+ LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n",
+ (long long)(int)offsetof(struct mds_body, padding_4));
+ LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct mds_body *)0)->padding_4));
LASSERTF(FMODE_READ == 1, " found %lld\n",
(long long)FMODE_READ);
LASSERTF(FMODE_WRITE == 2, " found %lld\n",