Whamcloud - gitweb
- former precreate pools code is restored
author yury <yury>
Tue, 13 Dec 2005 08:35:18 +0000 (08:35 +0000)
committer yury <yury>
Tue, 13 Dec 2005 08:35:18 +0000 (08:35 +0000)
- some additions to make quota work with the former precreate code
- some small fixes to filter_setattr() path

27 files changed:
lustre/ChangeLog
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_import.h
lustre/include/linux/obd.h
lustre/include/linux/obd_support.h
lustre/lov/lov_obd.c
lustre/lvfs/fsfilt_ext3.c
lustre/mds/handler.c
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_io_24.c
lustre/obdfilter/filter_io_26.c
lustre/obdfilter/filter_lvb.c
lustre/osc/lproc_osc.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/ptlrpc/import.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/pinger.c
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/utils/wiretest.c

index 5c4f770..b6fc65e 100644 (file)
@@ -8,15 +8,6 @@
        * bug fixes
 
 Severity   : enhancement
-Bugzilla   : 8888
-Description: Introduced CReate On Write (CROW)
-Details    : CROW is improved create approach, which defers OST objects
-            creates to the time when they realy needed. This is when client 
-            wants to perform first write to file for instance. Or when object 
-            changes some of its attributes stored on OST. This should improve 
-            create rate.
-
-Severity   : enhancement
 Bugzilla   : 7981/8208
 Description: Introduced Lustre Networking (LNET)
 Details    : LNET is new networking infrastructure for Lustre, it includes
index 5804902..f939a78 100644 (file)
@@ -21,10 +21,14 @@ struct mds_export_data {
 
 struct osc_creator {
         spinlock_t              oscc_lock;
+        struct list_head        oscc_list;
         struct obd_device       *oscc_obd;
+        obd_id                  oscc_last_id;//last available pre-created object
+        obd_id                  oscc_next_id;// what object id to give out next
+        int                     oscc_grow_count;
+        struct obdo             oscc_oa;
         int                     oscc_flags;
-        obd_id                  oscc_next_id;
-        wait_queue_head_t       oscc_waitq;
+        wait_queue_head_t       oscc_waitq; /* creating procs wait on this */
 };
 
 struct ldlm_export_data {
index 5b44d17..09db217 100644 (file)
@@ -229,6 +229,8 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
 #define OBD_CONNECT_REQPORTAL   0x40ULL /* Separate portal for non-IO reqs */
 #define OBD_CONNECT_ACL         0x80ULL /* client using access control lists */
 #define OBD_CONNECT_XATTR      0x100ULL /* client using extended attributes*/
+
+
 #define OBD_CONNECT_CROW       0x200ULL /* MDS+OST do object create-on-write */
 #define OBD_CONNECT_TRUNCLOCK  0x400ULL /* server gets locks for punch b=9528 */
 #define OBD_CONNECT_TRANSNO    0x800ULL /* replay is sending initial transno */
@@ -240,7 +242,7 @@ static inline void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
                                 OBD_CONNECT_IBITS | OBD_CONNECT_JOIN)
 #define OST_CONNECT_SUPPORTED  (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
                                 OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
-                                OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_CROW)
+                                OBD_CONNECT_TRUNCLOCK)
 #define ECHO_CONNECT_SUPPORTED (0)
 
 #define OBD_OCD_VERSION(major,minor,patch,fix) (((major)<<24) + ((minor)<<16) +\
@@ -323,7 +325,8 @@ typedef uint32_t        obd_count;
 #define OBD_FL_DEBUG_CHECK   (0x00000040) /* echo client/server debug check */
 #define OBD_FL_NO_USRQUOTA   (0x00000100) /* the object's owner is over quota */
 #define OBD_FL_NO_GRPQUOTA   (0x00000200) /* the object's group is over quota */
-#define OBD_FL_CREATE_CROW   (0x00000400) /* object should be created with crow */
+#define OBD_FL_CREATE_CROW   (0x00000400) /* object swhould be created with crow */
+
 /*
  * set this to delegate DLM locking during obd_punch() to the OSTs. Only OSTs
  * that declared OBD_CONNECT_TRUNCLOCK in their connect flags support this
@@ -370,11 +373,6 @@ struct obdo {
 #define o_dropped o_misc
 #define o_cksum   o_nlink
 
-#define OBDO_URGENT_CREATE(oa)                      \
-        (!((oa)->o_valid & OBD_MD_FLFLAGS) ||       \
-         !((oa)->o_flags & OBD_FL_CREATE_CROW) ||   \
-         ((oa)->o_flags & OBD_FL_RECREATE_OBJS))
-
 extern void lustre_swab_obdo (struct obdo *o);
 
 
index b0445bb..59cf6ad 100644 (file)
@@ -96,9 +96,6 @@ struct obd_import {
         __u64                     imp_connect_flags_orig;
 };
 
-#define IMP_CROW_ABLE(imp) \
-        ((imp)->imp_connect_data.ocd_connect_flags & OBD_CONNECT_CROW)
-
 typedef void (*obd_import_callback)(struct obd_import *imp, void *closure,
                                     int event, void *event_arg, void *cb_data);
 
index a130628..be2f09a 100644 (file)
@@ -246,6 +246,7 @@ struct filter_obd {
         __u64                fo_mount_count;
 
         int                  fo_destroy_in_progress;
+        struct semaphore     fo_create_lock;
 
         struct file_operations *fo_fop;
         struct inode_operations *fo_iop;
@@ -309,11 +310,6 @@ struct filter_obd {
         struct lustre_quota_ctxt fo_quota_ctxt;
         spinlock_t               fo_quotacheck_lock;
         atomic_t                 fo_quotachecking;
-
-        /* objids black list stuff. See for detailed comment in
-         * filter_clear_orphans() */
-        struct filter_ext       *fo_blacklist;
-        spinlock_t               fo_blacklist_lock;
 };
 
 struct mds_server_data;
index 2ab3d1f..ea0c5b2 100644 (file)
@@ -39,7 +39,6 @@ extern unsigned int obd_fail_loc;
 extern unsigned int obd_dump_on_timeout;
 extern unsigned int obd_timeout;          /* seconds */
 #define PING_INTERVAL max(obd_timeout / 4, 1U)
-#define STATFS_INTERVAL max(obd_timeout / 20, 1U)
 extern unsigned int ldlm_timeout;
 extern unsigned int obd_health_check_timeout;
 extern char obd_lustre_upcall[128];
@@ -170,9 +169,6 @@ extern wait_queue_head_t obd_race_waitq;
 
 #define OBD_FAIL_MDC_REVALIDATE_PAUSE    0x800
 
-#define OBD_FAIL_OST_CROW_EIO            0x801
-#define OBD_FAIL_OST_CLEAR_ORPHANS_RACE  0x802
-
 /* preparation for a more advanced failure testbed (not functional yet) */
 #define OBD_FAIL_MASK_SYS    0x0000FF00
 #define OBD_FAIL_MASK_LOC    (0x000000FF | OBD_FAIL_MASK_SYS)
index 90a5b78..2e294eb 100644 (file)
@@ -823,8 +823,6 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                         continue;
 
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
-                tmp_oa->o_valid |= OBD_MD_FLID;
-                tmp_oa->o_id = oti->oti_objid[i];
 
                 LASSERT(lov->tgts[i].ltd_exp);
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
@@ -842,14 +840,52 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         RETURN(rc);
 }
 
+static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
+                        struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct lov_stripe_md *obj_mdp, *lsm;
+        struct lov_obd *lov = &exp->exp_obd->u.lov;
+        unsigned ost_idx;
+        int rc, i;
+        ENTRY;
+
+        LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
+                src_oa->o_flags & OBD_FL_RECREATE_OBJS);
+
+        OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
+        if (obj_mdp == NULL)
+                RETURN(-ENOMEM);
+
+        ost_idx = src_oa->o_nlink;
+        lsm = *ea;
+        if (lsm == NULL)
+                GOTO(out, rc = -EINVAL);
+        if (ost_idx >= lov->desc.ld_tgt_count)
+                GOTO(out, rc = -EINVAL);
+
+        for (i = 0; i < lsm->lsm_stripe_count; i++) {
+                if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
+                        if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
+                                GOTO(out, rc = -EINVAL);
+                        break;
+                }
+        }
+        if (i == lsm->lsm_stripe_count)
+                GOTO(out, rc = -EINVAL);
+
+        rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, &obj_mdp, oti);
+out:
+        OBD_FREE(obj_mdp, sizeof(*obj_mdp));
+        RETURN(rc);
+}
+
 /* the LOV expects oa->o_id to be set to the LOV object id */
-static int
-lov_create(struct obd_export *exp, struct obdo *src_oa,
+static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                       struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
+        struct lov_obd *lov;
         struct lov_request_set *set = NULL;
         struct list_head *pos;
-        struct lov_obd *lov;
         int rc = 0;
         ENTRY;
 
@@ -863,14 +899,17 @@ lov_create(struct obd_export *exp, struct obdo *src_oa,
                 RETURN(rc);
         }
 
-        LASSERT(ergo(src_oa->o_valid & OBD_MD_FLFLAGS,
-                     !!(src_oa->o_flags & OBD_FL_CREATE_CROW) !=
-                     !!(src_oa->o_flags & OBD_FL_RECREATE_OBJS)));
-        
         lov = &exp->exp_obd->u.lov;
         if (!lov->desc.ld_active_tgt_count)
                 RETURN(-EIO);
 
+        /* Recreate a specific object id at the given OST index */
+        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
+            (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                 rc = lov_recreate(exp, src_oa, ea, oti);
+                 RETURN(rc);
+        }
+
         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
         if (rc)
                 RETURN(rc);
index f5b8a97..4fecc71 100644 (file)
@@ -451,7 +451,6 @@ static int fsfilt_ext3_setattr(struct dentry *dentry, void *handle,
         }
 
         unlock_kernel();
-
         return rc;
 }
 
index 788d784..ec6da33 100644 (file)
@@ -2021,6 +2021,14 @@ int mds_postrecov(struct obd_device *obd)
         LASSERT(!obd->obd_recovering);
         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
 
+        /* set nextid first, so we are sure it happens */
+        rc = mds_lov_set_nextid(obd);
+        if (rc) {
+                CERROR("%s: mds_lov_set_nextid failed\n",
+                       obd->obd_name);
+                GOTO(out, rc);
+        }
+        
         /* clean PENDING dir */
         rc = mds_cleanup_pending(obd);
         if (rc < 0) {
index 4b163b8..744ef14 100644 (file)
@@ -106,37 +106,48 @@ int mds_lov_write_objids(struct obd_device *obd)
 
 int mds_lov_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
-        struct lov_stripe_md  *empty_ea = NULL;
-        struct obd_trans_info oti = { 0 };
-        struct obdo *oa;
         int rc;
+        struct obdo oa;
+        struct obd_trans_info oti = {0};
+        struct lov_stripe_md  *empty_ea = NULL;
         ENTRY;
 
         LASSERT(mds->mds_lov_objids != NULL);
 
-        oa = obdo_alloc();
-        if (oa == NULL)
-                RETURN(-ENOMEM);
-
-        oa->o_valid = OBD_MD_FLFLAGS;
-        oa->o_flags = OBD_FL_DELORPHAN;
-
+        /* This create will in fact either create or destroy:  If the OST is
+         * missing objects below this ID, they will be created.  If it finds
+         * objects above this ID, they will be removed. */
+        memset(&oa, 0, sizeof(oa));
+        oa.o_valid = OBD_MD_FLFLAGS;
+        oa.o_flags = OBD_FL_DELORPHAN;
         if (ost_uuid != NULL) {
-                memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid));
-                oa->o_valid |= OBD_MD_FLINLINE;
+                memcpy(&oa.o_inline, ost_uuid, sizeof(*ost_uuid));
+                oa.o_valid |= OBD_MD_FLINLINE;
         }
+        rc = obd_create(mds->mds_osc_exp, &oa, &empty_ea, &oti);
+
+        RETURN(rc);
+}
+
+/* update the LOV-OSC knowledge of the last used object id's */
+int mds_lov_set_nextid(struct obd_device *obd)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int rc;
+        ENTRY;
 
-        oti.oti_objid = mds->mds_lov_objids;
-        rc = obd_create(mds->mds_osc_exp, oa, &empty_ea, &oti);
+        LASSERT(!obd->obd_recovering);
 
-        obdo_free(oa);
+        LASSERT(mds->mds_lov_objids != NULL);
+
+        rc = obd_set_info(mds->mds_osc_exp, strlen("next_id"), "next_id",
+                          mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids);
         RETURN(rc);
 }
 
 /* update the LOV-OSC knowledge of the last used object id's */
 int mds_lov_connect(struct obd_device *obd, char * lov_name)
 {
-        struct obd_connect_data *data = NULL;
         struct mds_obd *mds = &obd->u.mds;
         struct lustre_handle conn = {0,};
         int valsize;
@@ -156,15 +167,8 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name)
                 RETURN(-ENOTCONN);
         }
 
-        OBD_ALLOC_PTR(data);
-        if (!data)
-                RETURN(-ENOMEM);
-        data->ocd_connect_flags = OBD_CONNECT_CROW;
-
         rc = obd_connect(&conn, mds->mds_osc_obd, &obd->obd_uuid,
-                         data);
-        OBD_FREE_PTR(data);
-        
+                         NULL /* obd_connect_data */);
         if (rc) {
                 CERROR("MDS cannot connect to LOV %s (%d)\n", lov_name, rc);
                 mds->mds_osc_obd = ERR_PTR(rc);
index d6a468a..f688174 100644 (file)
@@ -380,7 +380,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         oa->o_gid = 0;
         oa->o_mode = S_IFREG | 0600;
         oa->o_id = inode->i_ino;
-        oa->o_flags = OBD_FL_CREATE_CROW;
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
         oa->o_size = 0;
@@ -445,12 +444,6 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                 oa->o_generation = body->fid1.generation;
                 oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
 
-                /* do not set CROW flag in setattr path as it is not needed
-                 * there and only confuses setattr code in filter. */
-                oa->o_flags &= ~OBD_FL_CREATE_CROW;
-                if (!oa->o_flags)
-                        oa->o_valid &= ~OBD_MD_FLFLAGS;
-                
                 rc = obd_setattr(mds->mds_osc_exp, oa, lsm, &oti);
                 if (rc) {
                         CERROR("error setting attrs for inode %lu: rc %d\n",
index 7c7a755..d800547 100644 (file)
@@ -450,9 +450,9 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                              struct ptlrpc_request *req,
                              struct lustre_handle *lh)
 {
+        unsigned int ia_valid = rec->ur_iattr.ia_valid;
         struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
-        unsigned int ia_valid = rec->ur_iattr.ia_valid;
         struct mds_body *body;
         struct dentry *de;
         struct inode *inode = NULL;
@@ -600,10 +600,7 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         mds_pack_inode2fid(&body->fid1, inode);
         mds_pack_inode2body(body, inode);
 
-        /* don't return OST-specific attributes if we didn't just set them. Use
-         * saved ->ia_valid here, as rec->ur_iattr.ia_valid gets rewritten by
-         * fsfilt_setattr() what breaks case of truncating file with no object
-         * on OST and no lsm (test_34c from sanity.sh). --umka */
+        /* don't return OST-specific attributes if we didn't just set them. */
         if (ia_valid & ATTR_SIZE)
                 body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
         if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
index 81dad57..2f6c647 100644 (file)
@@ -558,12 +558,6 @@ static int filter_cleanup_groups(struct obd_device *obd)
         int i;
         ENTRY;
 
-        if (filter->fo_blacklist != NULL) {
-                OBD_FREE(filter->fo_blacklist,
-                         FILTER_GROUPS * sizeof(struct filter_ext));
-                filter->fo_blacklist = NULL;
-        }
-        
         if (filter->fo_dentry_O_groups != NULL) {
                 for (i = 0; i < FILTER_GROUPS; i++) {
                         dentry = filter->fo_dentry_O_groups[i];
@@ -616,11 +610,6 @@ static int filter_prep_groups(struct obd_device *obd)
         int i, rc = 0, cleanup_phase = 0;
         ENTRY;
 
-        OBD_ALLOC(filter->fo_blacklist,
-                  FILTER_GROUPS * sizeof(struct filter_ext));
-        if (!filter->fo_blacklist)
-                GOTO(cleanup, rc = -ENOMEM);
-        
         O_dentry = simple_mkdir(current->fs->pwd, "O", 0700, 1);
         CDEBUG(D_INODE, "got/created O: %p\n", O_dentry);
         if (IS_ERR(O_dentry)) {
@@ -880,35 +869,34 @@ static void filter_post(struct obd_device *obd)
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 }
 
-static void filter_set_last_id(struct filter_obd *filter, 
-                              int group, obd_id id)
+static void filter_set_last_id(struct filter_obd *filter, struct obdo *oa,
+                               obd_id id)
 {
+        obd_gr group = 0;
         LASSERT(filter->fo_fsd != NULL);
-        LASSERT(group <= FILTER_GROUPS);
 
-        spin_lock(&filter->fo_objidlock);
-        filter->fo_last_objids[group] = id;
-        spin_unlock(&filter->fo_objidlock);
-}
-
-static void filter_grow_last_id(struct filter_obd *filter, 
-                                int group, obd_id id)
-{
-        LASSERT(filter->fo_fsd != NULL);
-        LASSERT(group <= FILTER_GROUPS);
+        if (oa != NULL) {
+                LASSERT(oa->o_gr <= FILTER_GROUPS);
+                group = oa->o_gr;
+        }
 
         spin_lock(&filter->fo_objidlock);
-        if (id > filter->fo_last_objids[group])
         filter->fo_last_objids[group] = id;
         spin_unlock(&filter->fo_objidlock);
 }
 
-__u64 filter_last_id(struct filter_obd *filter, int group)
+__u64 filter_last_id(struct filter_obd *filter, struct obdo *oa)
 {
         obd_id id;
+        obd_gr group = 0;
         LASSERT(filter->fo_fsd != NULL);
-        LASSERT(group < FILTER_GROUPS);
 
+        if (oa != NULL) {
+                LASSERT(oa->o_gr <= FILTER_GROUPS);
+                group = oa->o_gr;
+        }
+
+        /* FIXME: object groups */
         spin_lock(&filter->fo_objidlock);
         id = filter->fo_last_objids[group];
         spin_unlock(&filter->fo_objidlock);
@@ -916,46 +904,12 @@ __u64 filter_last_id(struct filter_obd *filter, int group)
         return id;
 }
 
-static void filter_lock_dentry(struct obd_device *obd,
-                               struct dentry *dparent)
+static int filter_lock_dentry(struct obd_device *obd, struct dentry *dparent)
 {
         down(&dparent->d_inode->i_sem);
+        return 0;
 }
 
-static void filter_unlock_dentry(struct obd_device *obd,
-                                 struct dentry *dparent)
-{
-        up(&dparent->d_inode->i_sem);
-}
-
-static void filter_parents_access(struct obd_device *obd,
-                                  obd_gr group, int lock)
-{
-        void (*access_func) (struct obd_device *, struct dentry *);
-        struct filter_obd *filter = &obd->u.filter;
-        struct dentry *dparent;
-        int i = 0;
-
-        access_func = lock ? filter_lock_dentry :
-                filter_unlock_dentry;
-        
-        if (group > 0 || filter->fo_subdir_count == 0) {
-                dparent = filter->fo_dentry_O_groups[group];
-                access_func(obd, dparent);
-        } else {
-                for (i = 0; i < filter->fo_subdir_count; i++) {
-                        dparent = filter->fo_dentry_O_sub[i];
-                        access_func(obd, dparent);
-                }
-        }
-}
-
-#define LOCK_PARENTS(obd, group)   \
-        filter_parents_access(obd, group, 1)
-
-#define UNLOCK_PARENTS(obd, group) \
-        filter_parents_access(obd, group, 0)
-
 /* We never dget the object parent, so DON'T dput it either */
 struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 {
@@ -972,22 +926,22 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
 struct dentry *filter_parent_lock(struct obd_device *obd, obd_gr group,
                                   obd_id objid)
 {
-        struct dentry *dparent = filter_parent(obd, group, objid);
         unsigned long now = jiffies;
+        struct dentry *dparent = filter_parent(obd, group, objid);
+        int rc;
 
         if (IS_ERR(dparent))
                 return dparent;
 
-        filter_lock_dentry(obd, dparent);
+        rc = filter_lock_dentry(obd, dparent);
         fsfilt_check_slow(now, obd_timeout, "parent lock");
-        return dparent;
+        return rc ? ERR_PTR(rc) : dparent;
 }
 
-/* we never dget the object parent, so DON'T dput it either */
-static void filter_parent_unlock(struct obd_device *obd,
-                                 struct dentry *dparent)
+/* We never dget the object parent, so DON'T dput it either */
+static void filter_parent_unlock(struct dentry *dparent)
 {
-        filter_unlock_dentry(obd, dparent);
+        up(&dparent->d_inode->i_sem);
 }
 
 /* How to get files, dentries, inodes from object id's.
@@ -1029,7 +983,7 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
                dparent->d_name.len, dparent->d_name.name, name);
         dchild = /*ll_*/lookup_one_len(name, dparent, len);
         if (dir_dentry == NULL)
-                filter_parent_unlock(obd, dparent);
+                filter_parent_unlock(dparent);
         if (IS_ERR(dchild)) {
                 CERROR("%s: child lookup error %ld\n", obd->obd_name,
                        PTR_ERR(dchild));
@@ -1075,8 +1029,9 @@ static int filter_prepare_destroy(struct obd_device *obd, obd_id objid)
 
 /* Caller must hold LCK_PW on parent and push us into kernel context.
  * Caller is also required to ensure that dchild->d_inode exists. */
-static int filter_unlink(struct obd_device *obd, obd_id objid,
-                         struct dentry *dparent, struct dentry *dchild)
+static int filter_destroy_internal(struct obd_device *obd, obd_id objid,
+                                   struct dentry *dparent,
+                                   struct dentry *dchild)
 {
         struct inode *inode = dchild->d_inode;
         int rc;
@@ -1425,8 +1380,8 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
                 GOTO(err_mntput, rc);
 
         filter->fo_destroy_in_progress = 0;
-
-        spin_lock_init(&filter->fo_blacklist_lock);
+        sema_init(&filter->fo_create_lock, 1);
+        
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
         spin_lock_init(&filter->fo_stats_lock);
@@ -1685,7 +1640,6 @@ static int filter_cleanup(struct obd_device *obd)
 static int filter_connect_internal(struct obd_export *exp,
                                    struct obd_connect_data *data)
 {
-        struct filter_obd *filter = &exp->exp_obd->u.filter;
         if (data != NULL) {
                 CDEBUG(D_RPCTRACE, "%s: cli %s/%p ocd_connect_flags: "LPX64
                        " ocd_version: %x ocd_grant: %d\n",
@@ -1697,16 +1651,6 @@ static int filter_connect_internal(struct obd_export *exp,
                 exp->exp_connect_flags = data->ocd_connect_flags;
                 data->ocd_version = LUSTRE_VERSION_CODE;
 
-                if (!(filter->fo_fsd->fsd_feature_rocompat &
-                      cpu_to_le32(OBD_ROCOMPAT_CROW)) &&
-                    data->ocd_connect_flags & OBD_CONNECT_CROW) {
-                        filter->fo_fsd->fsd_feature_rocompat |=
-                                cpu_to_le32(OBD_ROCOMPAT_CROW);
-                        filter_update_server_data(exp->exp_obd,
-                                                  filter->fo_rcvd_filp,
-                                                  filter->fo_fsd, 1);
-                }
-
                 if (exp->exp_connect_flags & OBD_CONNECT_GRANT) {
                         obd_size left, want;
 
@@ -2014,6 +1958,45 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
+/* this should be enabled/disabled in condition to enabled/disabled large inodes
+ * in backing store FS. */
+int filter_update_fidea(struct obd_export *exp, struct inode *inode,
+                        void *handle, struct obdo *oa)
+{
+        struct obd_device *obd = exp->exp_obd;
+        int rc = 0;
+        ENTRY;
+        
+        if (oa->o_valid & OBD_MD_FLFID) {
+                struct filter_fid ff;
+                obd_gr group = 0;
+
+                if (oa->o_valid & OBD_MD_FLGROUP)
+                        group = oa->o_gr;
+
+                /* packing fid and converting it to LE for storing into EA. Here
+                 * ->o_stripe_idx should be filled by LOV and rest of fields -
+                 * by client. */
+                ff.ff_fid.id = cpu_to_le64(oa->o_fid);
+                ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx);
+                ff.ff_fid.generation = cpu_to_le32(oa->o_generation);
+                ff.ff_objid = cpu_to_le64(oa->o_id);
+                ff.ff_group = cpu_to_le64(group);
+
+                CDEBUG(D_INODE, "storing filter fid EA ("LPU64"/%u/%u"
+                       LPU64"/"LPU64")\n", oa->o_fid, oa->o_stripe_idx,
+                       oa->o_generation, oa->o_id, group);
+                        
+                rc = fsfilt_set_md(obd, inode, handle, &ff, sizeof(ff));
+                if (rc)
+                        CERROR("store fid in object failed! rc: %d\n", rc);
+        } else {
+                CDEBUG(D_HA, "OSS object without fid info!\n");
+        }
+
+        RETURN(rc);
+}
+
 /* this is called from filter_truncate() until we have filter_punch() */
 int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                             struct obdo *oa, struct obd_trans_info *oti)
@@ -2021,14 +2004,17 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
         unsigned int orig_ids[MAXQUOTAS] = {0, 0};
         struct llog_cookie *fcc = NULL;
         struct filter_obd *filter;
+        int rc, err, locked = 0;
+        struct inode *inode;
         struct iattr iattr;
         void *handle;
-        int rc, err;
         ENTRY;
 
         LASSERT(dentry != NULL);
         LASSERT(!IS_ERR(dentry));
-        LASSERT(dentry->d_inode != NULL);
+
+        inode = dentry->d_inode;
+        LASSERT(inode != NULL);
 
         filter = &exp->exp_obd->u.filter;
         iattr_from_obdo(&iattr, oa, oa->o_valid);
@@ -2039,16 +2025,48 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                         memcpy(fcc, obdo_logcookie(oa), sizeof(*fcc));
         }
 
-        if (iattr.ia_valid & ATTR_SIZE)
-                down(&dentry->d_inode->i_sem);
+        if (iattr.ia_valid & ATTR_SIZE || iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+                down(&inode->i_sem);
+                locked = 1;
+        }
 
+        /* If the inode still has SUID+SGID bits set (see filter_precreate())
+         * then we will accept the UID+GID sent by the client during write for
+         * initializing the ownership of this inode.  We only allow this to
+         * happen once so clear these bits in setattr. In 2.6 kernels it is
+         * possible to get ATTR_UID and ATTR_GID separately, so we only clear
+         * the flags that are actually being set. */
         if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
-                orig_ids[USRQUOTA] = dentry->d_inode->i_uid;
-                orig_ids[GRPQUOTA] = dentry->d_inode->i_gid;
-                handle = fsfilt_start_log(exp->exp_obd, dentry->d_inode,
+                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+                if ((inode->i_mode & S_ISUID) &&
+                    (iattr.ia_valid & ATTR_UID)) {
+                        if (!(iattr.ia_valid & ATTR_MODE)) {
+                                iattr.ia_mode = inode->i_mode;
+                                iattr.ia_valid |= ATTR_MODE;
+                        }
+                        iattr.ia_mode &= ~S_ISUID;
+                }
+                if ((inode->i_mode & S_ISGID) &&
+                    (iattr.ia_valid & ATTR_GID)) {
+                        if (!(iattr.ia_valid & ATTR_MODE)) {
+                                iattr.ia_mode = inode->i_mode;
+                                iattr.ia_valid |= ATTR_MODE;
+                        }
+                        iattr.ia_mode &= ~S_ISGID;
+                }
+
+                orig_ids[USRQUOTA] = inode->i_uid;
+                orig_ids[GRPQUOTA] = inode->i_gid;
+                handle = fsfilt_start_log(exp->exp_obd, inode,
                                           FSFILT_OP_SETATTR, oti, 1);
+
+                /* update inode EA only once */
+                if (inode->i_mode & S_ISUID || inode->i_mode & S_ISGID)
+                        filter_update_fidea(exp, inode, handle, oa);
         } else {
-                handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+                handle = fsfilt_start(exp->exp_obd, inode,
                                       FSFILT_OP_SETATTR, oti);
         }
 
@@ -2056,7 +2074,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                 GOTO(out_unlock, rc = PTR_ERR(handle));
 
         if (oa->o_valid & OBD_MD_FLFLAGS) {
-                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
+                rc = fsfilt_iocontrol(exp->exp_obd, inode, NULL,
                                       EXT3_IOC_SETFLAGS, (long)&oa->o_flags);
         } else {
                 rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
@@ -2068,9 +2086,14 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                                               fcc);
         }
 
+        if (locked) {
+                up(&inode->i_sem);
+                locked = 0;
+        }
+
         rc = filter_finish_transno(exp, oti, rc);
         
-        err = fsfilt_commit(exp->exp_obd, dentry->d_inode, handle, 0);
+        err = fsfilt_commit(exp->exp_obd, inode, handle, 0);
         if (err) {
                 CERROR("error on commit, err = %d\n", err);
                 if (!rc)
@@ -2078,8 +2101,8 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
         }
         EXIT;
 out_unlock:
-        if (iattr.ia_valid & ATTR_SIZE)
-                up(&dentry->d_inode->i_sem);
+        if (locked)
+                up(&inode->i_sem);
 
         /* trigger quota release */
         if (iattr.ia_valid & (ATTR_SIZE | ATTR_UID | ATTR_GID)) {
@@ -2105,16 +2128,13 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa,
         int rc;
         ENTRY;
 
-        //LASSERT(oti != NULL);
-
+        dentry = __filter_oa2dentry(exp->exp_obd, oa,
+                                    __FUNCTION__, 1);
+        if (IS_ERR(dentry))
+                RETURN(PTR_ERR(dentry));
+                        
         filter = &exp->exp_obd->u.filter;
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-    
-        /* make sure that object is allocated. */
-        dentry = filter_crow_object(exp->exp_obd, oa);
-        if (IS_ERR(dentry))
-                GOTO(out_pop, rc = PTR_ERR(dentry));
-
         lock_kernel();
 
         /* setting objects attributes (including owner/group) */
@@ -2142,7 +2162,6 @@ int filter_setattr(struct obd_export *exp, struct obdo *oa,
 out_unlock:
         unlock_kernel();
         f_dput(dentry);
-out_pop:
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         return rc;
 }
@@ -2197,6 +2216,96 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
+/* Destroy every object of @oa's group (group 0 when OBD_MD_FLGROUP is not
+ * set) whose id lies above oa->o_id, up to the id returned by
+ * filter_last_id(), then record oa->o_id as the in-memory last id of that
+ * group.  The work is done while holding fo_create_lock.  The return values
+ * of the individual filter_destroy() calls are not checked. */
+static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
+                                      struct filter_obd *filter)
+{
+        struct obdo doa; /* XXX obdo on stack */
+        __u64 last, id;
+        ENTRY;
+        LASSERT(oa);
+
+        /* build a scratch obdo carrying only the group and a regular-file
+         * mode; o_id is filled in per object in the loop below */
+        memset(&doa, 0, sizeof(doa));
+        if (oa->o_valid & OBD_MD_FLGROUP) {
+                doa.o_valid |= OBD_MD_FLGROUP;
+                doa.o_gr = oa->o_gr;
+        } else {
+                doa.o_gr = 0;
+        }
+        doa.o_mode = S_IFREG;
+
+        /* raise the flag before sleeping on fo_create_lock:
+         * filter_precreate() tests it on each iteration while holding that
+         * lock and stops creating objects when it is set */
+        filter->fo_destroy_in_progress = 1;
+        down(&filter->fo_create_lock);
+        /* the flag was cleared while we waited for the lock - presumably
+         * another destroy pass completed in the meantime (TODO confirm) */
+        if (!filter->fo_destroy_in_progress) {
+                CERROR("%s: destroy_in_progress already cleared\n",
+                        exp->exp_obd->obd_name);
+                up(&filter->fo_create_lock);
+                EXIT;
+                return;
+        }
+
+        last = filter_last_id(filter, &doa);
+        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
+               exp->exp_obd->obd_name, oa->o_id + 1, last);
+        for (id = oa->o_id + 1; id <= last; id++) {
+                doa.o_id = id;
+                filter_destroy(exp, &doa, NULL, NULL, NULL);
+        }
+
+        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
+               exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
+
+        /* only the in-memory value is updated here; the caller in this file
+         * follows up with filter_update_last_objid() */
+        spin_lock(&filter->fo_objidlock);
+        filter->fo_last_objids[doa.o_gr] = oa->o_id;
+        spin_unlock(&filter->fo_objidlock);
+
+        filter->fo_destroy_in_progress = 0;
+        up(&filter->fo_create_lock);
+
+        EXIT;
+}
+
+/* Decide how many objects filter_create() should precreate for @oa.
+ *
+ * returns a negative error or a nonnegative number of files to create:
+ *  - delete-orphans request (OBD_FL_DELORPHAN set in valid o_flags):
+ *    the distance from the current last id up to oa->o_id when that is
+ *    >= 0; otherwise the objects above oa->o_id are destroyed and 0 is
+ *    returned, or -EINVAL when more than OST_MAX_PRECREATE objects would
+ *    have to be destroyed (nothing is destroyed in that case).
+ *  - any other request: 1 when @group is nonzero or no o_id was given,
+ *    else the distance from the current last id up to oa->o_id, which is
+ *    asserted to be >= 0. */
+static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
+                                   obd_gr group)
+{
+        struct obd_device *obd = exp->exp_obd;
+        struct filter_obd *filter = &obd->u.filter;
+        int diff, rc;
+        ENTRY;
+
+        /* NOTE(review): the difference of two object ids is narrowed to
+         * int here - confirm ids cannot be more than INT_MAX apart */
+        diff = oa->o_id - filter_last_id(filter, oa);
+        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
+               filter_last_id(filter, oa), diff);
+
+        /* delete orphans request */
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_DELORPHAN)) {
+                if (diff >= 0)
+                        RETURN(diff);
+                if (-diff > OST_MAX_PRECREATE) {
+                        CERROR("%s: ignoring bogus orphan destroy request: "
+                               "obdid "LPU64" last_id "LPU64"\n", obd->obd_name,
+                               oa->o_id, filter_last_id(filter, oa));
+                        RETURN(-EINVAL);
+                }
+                filter_destroy_precreated(exp, oa, filter);
+                /* a failed on-disk update is only logged: the orphans are
+                 * already gone, so the request still succeeds */
+                rc = filter_update_last_objid(obd, group, 0);
+                if (rc)
+                        CERROR("%s: unable to write lastobjid, but orphans "
+                               "were deleted\n", obd->obd_name);
+                RETURN(0);
+        } else {
+                /* only precreate if group == 0 and o_id is specified.
+                 * Delete-orphans requests never reach this branch, and
+                 * OBD_FL_DELORPHAN is an o_flags bit that must not be
+                 * tested against o_valid, so no flag test is needed. */
+                if (group != 0 || oa->o_id == 0)
+                        RETURN(1);
+
+                LASSERTF(diff >= 0,"%s: "LPU64" - "LPU64" = %d\n",obd->obd_name,
+                         oa->o_id, filter_last_id(filter, oa), diff);
+                RETURN(diff);
+        }
+}
+
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                          unsigned long max_age)
 {
@@ -2233,185 +2342,231 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(rc);
 }
 
-struct dentry *
-filter_create_object(struct obd_device *obd, struct obdo *oa)
+/* We rely on the fact that only one thread will be creating files in a given
+ * group at a time, which is why we don't need an atomic filter_get_new_id.
+ * Even if we had that atomic function, the following race would exist:
+ *
+ * thread 1: gets id x from filter_next_id
+ * thread 2: gets id (x + 1) from filter_next_id
+ * thread 2: creates object (x + 1)
+ * thread 1: tries to create object x, gets -ENOSPC
+ *
+ * @obd:   filter device the objects are created on
+ * @oa:    request obdo; with OBD_FL_RECREATE_OBJS set in valid o_flags the
+ *         object oa->o_id itself is recreated (it must not lie above the
+ *         current last id), otherwise new objects are created after the
+ *         current last id and the last id is advanced for each of them
+ * @group: object group to create in
+ * @num:   on entry the number of objects wanted, on return the number of
+ *         objects actually created (0 when the OST is out of space)
+ *
+ * Objects are created under fo_create_lock.  The loop stops early on the
+ * first error, when a destroy is in progress, or once a third of
+ * obd_timeout has elapsed.  Returns 0 or a negative errno.
+ */
+static int filter_precreate(struct obd_device *obd, struct obdo *oa,
+                            obd_gr group, int *num)
 {
-        struct dentry *dparent = NULL;
-        struct dentry *dchild = NULL;
-        struct lvfs_ucred uc = {0,};
-        struct lvfs_run_ctxt saved;
+        struct dentry *dchild = NULL, *dparent = NULL;
         struct filter_obd *filter;
-        int cleanup_phase = 0;
-        int err = 0, rc = 0;
+        struct obd_statfs *osfs;
+        int err = 0, rc = 0, recreate_obj = 0, i;
+        unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
+        __u64 next_id;
         void *handle = NULL;
-        obd_gr group = 0;
         ENTRY;
 
         filter = &obd->u.filter;
 
-        CDEBUG(D_INFO, "create objid "LPU64"\n", oa->o_id);
-
-        if (oa->o_valid & OBD_MD_FLGROUP)
-                group = oa->o_gr;
-
-        dparent = filter_parent_lock(obd, group, oa->o_id);
-        if (IS_ERR(dparent))
-                GOTO(cleanup, dchild = dparent);
-        cleanup_phase = 1;
-
-        /* check if object is in blacklist. This should be done under parent
-         * lock. */
-        spin_lock(&filter->fo_blacklist_lock);
-        if (oa->o_id > filter->fo_blacklist[group].fe_start &&
-            oa->o_id <= filter->fo_blacklist[group].fe_end) {
-                spin_unlock(&filter->fo_blacklist_lock);
-                GOTO(cleanup, dchild = ERR_PTR(-ENOENT));
-        }
-        spin_unlock(&filter->fo_blacklist_lock);
-
-        /* check if object is already allocated */
-        dchild = filter_fid2dentry(obd, dparent, group, oa->o_id);
-        if (IS_ERR(dchild))
-                GOTO(cleanup, dchild);
-
-        /* Files that already exist should only be below or at last_id */
-        if (dchild->d_inode) {
-                __u64 last_id = filter_last_id(filter, group);
-
-                LASSERTF(oa->o_id <= last_id,
-                         "existing objid "LPU64" larger than last_id "LPU64"\n",
-                         oa->o_id, last_id);
-                GOTO(cleanup, dchild);
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                recreate_obj = 1;
+        } else {
+                /* refuse to precreate when less than 1/1024 of the blocks
+                 * is still available */
+                OBD_ALLOC(osfs, sizeof(*osfs));
+                if (osfs == NULL)
+                        RETURN(-ENOMEM);
+                rc = filter_statfs(obd, osfs, jiffies - HZ);
+                if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+                        CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+                              osfs->os_bavail<<filter->fo_obt.obt_sb->s_blocksize_bits);
+                        *num=0;
+                        rc = -ENOSPC;
+                }
+                OBD_FREE(osfs, sizeof(*osfs));
+                if (rc) {
+                        RETURN(rc);
+                }
         }
 
-        /* create new object */
-        handle = fsfilt_start_log(obd, dparent->d_inode,
-                                  FSFILT_OP_CREATE, NULL, 1);
-        if (IS_ERR(handle))
-                GOTO(cleanup, dchild = handle);
-        cleanup_phase = 2;
-
-        uc.luc_fsuid = oa->o_valid & OBD_MD_FLUID ?
-                oa->o_uid : 0;
-        uc.luc_fsgid = oa->o_valid & OBD_MD_FLGID ?
-                oa->o_gid : 0;
-        uc.luc_cap = current->cap_effective;
+        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
 
-        cap_raise(uc.luc_cap, CAP_SYS_RESOURCE);
+        down(&filter->fo_create_lock);
 
-        push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
-        rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
-        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
+        for (i = 0; i < *num && err == 0; i++) {
+                int cleanup_phase = 0;
 
-        if (rc) {
-                CERROR("create failed rc = %d\n", rc);
-                f_dput(dchild);
-                GOTO(cleanup, dchild = ERR_PTR(rc));
-        }
+                if (filter->fo_destroy_in_progress) {
+                        CWARN("%s: precreate aborted by destroy\n",
+                              obd->obd_name);
+                        break;
+                }
 
-        /* grow last created object id. */
-        filter_grow_last_id(filter, group, oa->o_id);
-        rc = filter_update_last_objid(obd, group, 0);
-        if (rc) {
-                CERROR("unable to write lastobjid, but "
-                       "object is created, err = %d\n",
-                       rc);
-                rc = 0;
-        }
+                if (recreate_obj) {
+                        __u64 last_id;
+                        next_id = oa->o_id;
+                        last_id = filter_last_id(filter, oa);
+                        if (next_id > last_id) {
+                                CERROR("Error: Trying to recreate obj greater "
+                                       "than last id "LPU64" > "LPU64"\n",
+                                       next_id, last_id);
+                                GOTO(cleanup, rc = -EINVAL);
+                        }
+                } else
+                        next_id = filter_last_id(filter, oa) + 1;
+
+                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
+
+                dparent = filter_parent_lock(obd, group, next_id);
+                if (IS_ERR(dparent))
+                        GOTO(cleanup, rc = PTR_ERR(dparent));
+                cleanup_phase = 1;
+
+                dchild = filter_fid2dentry(obd, dparent, group, next_id);
+                if (IS_ERR(dchild))
+                        GOTO(cleanup, rc = PTR_ERR(dchild));
+                cleanup_phase = 2;
+
+                if (dchild->d_inode != NULL) {
+                        /* This would only happen if lastobjid was bad on disk*/
+                        /* Could also happen if recreating missing obj but
+                         * already exists
+                         */
+                        if (recreate_obj) {
+                                CERROR("%s: recreating existing object %.*s?\n",
+                                       obd->obd_name, dchild->d_name.len,
+                                       dchild->d_name.name);
+                        } else {
+                                CERROR("%s: Serious error: objid %.*s already "
+                                       "exists; is this filesystem corrupt?\n",
+                                       obd->obd_name, dchild->d_name.len,
+                                       dchild->d_name.name);
+                                LBUG();
+                        }
+                        GOTO(cleanup, rc = -EEXIST);
+                }
 
-        /* nobody else is touching this newly created object */
-        LASSERT(dchild->d_inode);
+                handle = fsfilt_start_log(obd, dparent->d_inode,
+                                          FSFILT_OP_CREATE, NULL, 1);
+                if (IS_ERR(handle))
+                        GOTO(cleanup, rc = PTR_ERR(handle));
+                cleanup_phase = 3;
 
-        if (oa->o_valid & OBD_MD_FLFID) {
-                struct filter_fid ff;
+                rc = ll_vfs_create(dparent->d_inode, dchild,
+                                   S_IFREG |  S_ISUID | S_ISGID | 0666, NULL);
+                if (rc) {
+                        CERROR("create failed rc = %d\n", rc);
+                        GOTO(cleanup, rc);
+                }
 
-                /* packing fid and converting it to LE for storing into EA. Here
-                 * oa->o_stripe_idx should be filled by LOV and rest of fields -
-                 * by client. */
-                ff.ff_fid.id = cpu_to_le64(oa->o_fid);
-                ff.ff_fid.f_type = cpu_to_le32(oa->o_stripe_idx);
-                ff.ff_fid.generation = cpu_to_le32(oa->o_generation);
-                ff.ff_objid = cpu_to_le64(oa->o_id);
-                ff.ff_group = cpu_to_le64(group);
+                if (!recreate_obj) {
+                        filter_set_last_id(filter, oa, next_id);
+                        err = filter_update_last_objid(obd, group, 0);
+                        if (err)
+                                CERROR("unable to write lastobjid "
+                                       "but file created\n");
+                }
 
-                down(&dchild->d_inode->i_sem);
-                rc = fsfilt_set_md(obd, dchild->d_inode, handle,&ff,sizeof(ff));
-                up(&dchild->d_inode->i_sem);
-                if (rc) {
-                        CERROR("store fid in object failed! rc:%d\n", rc);
+        cleanup:
+                /* undo in reverse order of acquisition; cases fall through */
+                switch(cleanup_phase) {
+                case 3:
+                        err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
+                        if (err) {
+                                CERROR("error on commit, err = %d\n", err);
+                                if (!rc)
+                                        rc = err;
+                        }
+                case 2:
                         f_dput(dchild);
-                        GOTO(cleanup, dchild = ERR_PTR(rc));
+                case 1:
+                        filter_parent_unlock(dparent);
+                case 0:
+                        break;
                 }
-        } else {
-                CDEBUG(D_HA, "create OSS object without fid!\n");
-        }
 
-cleanup:
-        switch(cleanup_phase) {
-        case 2:
-                err = fsfilt_commit(obd, dparent->d_inode, handle, 0);
-                if (err) {
-                        CERROR("error on commit, err = %d\n", err);
-                        if (!rc) {
-                                rc = err;
-                                f_dput(dchild);
-                                dchild = ERR_PTR(rc);
-                        }
+                if (rc)
+                        break;
+                if (time_after(jiffies, enough_time)) {
+                        /* the object of this iteration was created above;
+                         * count it before leaving the loop, otherwise *num
+                         * and the message below are one short */
+                        i++;
+                        CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
+                               obd->obd_name, *num, i);
+                        break;
                 }
-        case 1:
-                filter_parent_unlock(obd, dparent);
-        case 0:
-                break;
         }
+        *num = i;
 
-        RETURN(dchild);
+        up(&filter->fo_create_lock);
+
+        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
+               obd->obd_name, group, filter->fo_last_objids[group]);
+
+        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
+               obd->obd_name, i);
+        RETURN(rc);
 }
 
-struct dentry *
-filter_crow_object(struct obd_device *obd, struct obdo *oa)
+/* Handle a create request on the filter.
+ *
+ * With OBD_FL_RECREATE_OBJS set in valid o_flags the single object oa->o_id
+ * is recreated (-EINVAL when it lies above the current last id).  Otherwise
+ * filter_should_precreate() decides how many objects are wanted; when that
+ * is more than zero they are precreated, oa->o_id is set to the resulting
+ * last id and o_valid is reduced to OBD_MD_FLID.  A negative result from
+ * filter_should_precreate() is returned to the caller.
+ *
+ * When @ea is non-NULL, *ea (allocated here if it was NULL) receives
+ * lsm_object_id = oa->o_id on success; a stripe md allocated here is freed
+ * again on failure.  @oti is not used.  Returns 0 or a negative errno. */
+static int filter_create(struct obd_export *exp, struct obdo *oa,
+                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
-        struct filter_obd *filter;
-        struct dentry *dentry;
+        struct obd_device *obd = NULL;
+        struct lvfs_run_ctxt saved;
+        struct lov_stripe_md *lsm = NULL;
         obd_gr group = 0;
+        int rc = 0, diff;
         ENTRY;
 
-        if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CROW_EIO))
-                RETURN(ERR_PTR(-EIO));
-        
-        filter = &obd->u.filter;
-
         if (oa->o_valid & OBD_MD_FLGROUP)
                 group = oa->o_gr;
 
-        /* try to create new object (if it is not yet) */
-        dentry = filter_create_object(obd, oa);
-        if (IS_ERR(dentry)) {
-                CERROR("cannot create OSS object "LPU64"/"LPU64
-                       ", err = %d\n", oa->o_id, group,
-                       (int)PTR_ERR(dentry));
-                RETURN(dentry);
+        CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
+               group, oa->o_id);
+        if (ea != NULL) {
+                lsm = *ea;
+                if (lsm == NULL) {
+                        rc = obd_alloc_memmd(exp, &lsm);
+                        if (rc < 0)
+                                RETURN(rc);
+                }
         }
 
-        RETURN(dentry);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+                if (oa->o_id > filter_last_id(&obd->u.filter, oa)) {
+                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
+                               oa->o_id, filter_last_id(&obd->u.filter, oa));
+                        rc = -EINVAL;
+                } else {
+                        diff = 1;
+                        rc = filter_precreate(obd, oa, group, &diff);
+                }
+        } else {
+                diff = filter_should_precreate(exp, oa, group);
+                if (diff > 0) {
+                        oa->o_id = filter_last_id(&obd->u.filter, oa);
+                        rc = filter_precreate(obd, oa, group, &diff);
+                        oa->o_id = filter_last_id(&obd->u.filter, oa);
+                        oa->o_valid = OBD_MD_FLID;
+                } else if (diff < 0) {
+                        /* filter_should_precreate() rejected the request;
+                         * report that instead of returning success */
+                        rc = diff;
+                }
+        }
+
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        if (rc && ea != NULL && *ea != lsm) {
+                /* lsm was allocated above and never handed to the caller */
+                obd_free_memmd(exp, &lsm);
+        } else if (rc == 0 && ea != NULL) {
+                /* XXX LOV STACKING: the lsm that is passed to us from
+                 * LOV does not have valid lsm_oinfo data structs, so
+                 * don't go touching that.  This needs to be fixed in a
+                 * big way. */
+                lsm->lsm_object_id = oa->o_id;
+                *ea = lsm;
+        }
+
+        RETURN(rc);
 }
 
-/* destroys object @oa. Takes care of locking if @lock says that parent is not
- * yet locked. Also drops parent lock before taking ldlm PW lock to avoid
- * deadlocks in lock retraction related paths.
- *
- * This function does not change locking and does not imply hiden locking
- * knowladge. After this fucntion is finished, all parents stay at the same
- * locking state.
-
- * If @lock == 1, this means that parent of @oa is not locked and should be
- * locked for destroy operation. However, after operation is finished, parent
- * will be unlocked. The same is true about opposite case, when parent is
- * already locked and filter_destroy_internal() does not need to lock it. */
-static int
-filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
-                        struct lov_stripe_md *md, struct obd_trans_info *oti,
-                        int lock)
+int filter_destroy(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md *md, struct obd_trans_info *oti,
+                   struct obd_export *md_exp)
 {
+        unsigned int qcids[MAXQUOTAS] = {0, 0};
         struct obd_device *obd;
         struct filter_obd *filter;
         struct dentry *dchild = NULL, *dparent = NULL;
@@ -2419,7 +2574,6 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
         void *handle = NULL;
         struct llog_cookie *fcc = NULL;
         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
-        unsigned int qcids[MAXQUOTAS] = {0, 0};
         obd_gr group = 0;
         ENTRY;
 
@@ -2432,9 +2586,7 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
  acquire_locks:
-        dparent = lock ?
-                filter_parent_lock(obd, group, oa->o_id):
-                filter_parent(obd, group, oa->o_id);
+        dparent = filter_parent_lock(obd, group, oa->o_id);
         if (IS_ERR(dparent))
                 GOTO(cleanup, rc = PTR_ERR(dparent));
         cleanup_phase = 1;
@@ -2457,11 +2609,11 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
         }
 
         if (!have_prepared) {
-                /* If we're really going to destroy the object, get ready by
-                 * getting the clients to discard their cached data.
+                /* If we're really going to destroy the object, get ready
+                 * by getting the clients to discard their cached data.
                  *
                  * We have to drop the parent lock, because
-                 * filter_prepare_destroy() will acquire a PW on the object, and
+                 * filter_prepare_destroy will acquire a PW on the object, and
                  * we don't want to deadlock with an incoming write to the
                  * object, which has the extent PW and then wants to get the
                  * parent dentry to do the lookup.
@@ -2470,15 +2622,9 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
                  * complication of condition the above code to skip it on the
                  * second time through. */
                 f_dput(dchild);
+                filter_parent_unlock(dparent);
 
-                filter_unlock_dentry(obd, dparent);
                 filter_prepare_destroy(obd, oa->o_id);
-
-                /* lock parent dentry again, to keep locking state the same as
-                 * before calling this function. */
-                if (!lock)
-                        filter_lock_dentry(obd, dparent);
-
                 have_prepared = 1;
                 goto acquire_locks;
         }
@@ -2497,8 +2643,9 @@ filter_destroy_internal(struct obd_export *exp, struct obdo *oa,
 
         /* Quota release need uid/gid of inode */
         obdo_from_inode(oa, dchild->d_inode, OBD_MD_FLUID|OBD_MD_FLGID);
-        rc = filter_unlink(obd, oa->o_id, dparent, dchild);
+        rc = filter_destroy_internal(obd, oa->o_id, dparent, dchild);
 
+        EXIT;
 cleanup:
         switch(cleanup_phase) {
         case 3:
@@ -2517,8 +2664,7 @@ cleanup:
         case 2:
                 f_dput(dchild);
         case 1:
-                if (lock)
-                        filter_parent_unlock(obd, dparent);
+                filter_parent_unlock(dparent);
         case 0:
                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 break;
@@ -2534,207 +2680,7 @@ cleanup:
                             FSFILT_OP_UNLINK); 
         CDEBUG(rc2 ? D_ERROR : D_QUOTA, 
                "filter adjust qunit! (rc:%d)\n", rc2);
-
-        RETURN(rc);
-}
-
-/* destroy oject with taking lock on parent first. */
-int filter_destroy(struct obd_export *exp, struct obdo *oa,
-                   struct lov_stripe_md *md, struct obd_trans_info *oti,
-                   struct obd_export *md_exp)
-{
-        int rc;
-
-        ENTRY;
-        rc = filter_destroy_internal(exp, oa, md, oti, 1);
-        RETURN(rc);
-}
-
-static int
-filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
-{
-        struct filter_obd *filter;
-        struct obd_device *obd;
-        struct obdo *doa;
-        obd_gr group = 0;
-        int rc, orphans;
-        __u64 last, id;
-        ENTRY;
-
-        LASSERT(oa);
-
-        OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE);
-
-        obd = exp->exp_obd;
-        filter = &obd->u.filter;
-
-        if (oa->o_valid & OBD_MD_FLGROUP)
-                group = oa->o_gr;
-
-        filter->fo_destroy_in_progress = 1;
-        
-        LOCK_PARENTS(obd, group);
-        if (!filter->fo_destroy_in_progress) {
-                UNLOCK_PARENTS(obd, group);
-                CDEBUG(D_HA, "cleanup orphans is already canceled\n");
-                RETURN(0);
-        }
-
-        last = filter_last_id(filter, group);
-        orphans = last - oa->o_id;
-        
-        if (orphans <= 0) {
-                filter->fo_destroy_in_progress = 0;
-                UNLOCK_PARENTS(obd, group);
-                CDEBUG(D_HA, "nothing to cleanup, MDS objid "LPU64
-                       " is not bigger than OST one "LPU64"\n",
-                       oa->o_id, last);
-                RETURN(0);
-        }
-
-        CDEBUG(D_HA, "adding orphans extent "LPU64":"LPU64"-"LPU64
-               " to blacklist\n", group, oa->o_id, last);
-
-        /* making all orphans entries in blacklist, that will deny to re-create
-         * them by CROW in filter_create_object(). This is done for case when
-         * orphans already exist on client and will be tried to write something
-         * and we want to stop them.
-         *
-         * In fact the issue is even worse, as we want to put in blacklist not
-         * only the objects which we just destroed, but also those which not yet
-         * created on OST (and OST has no idea about) but possibly existing on
-         * clients. */
-        spin_lock(&filter->fo_blacklist_lock);
-        filter->fo_blacklist[group].fe_start = oa->o_id;
-        filter->fo_blacklist[group].fe_end = last;
-        spin_unlock(&filter->fo_blacklist_lock);
-        
-       doa = obdo_alloc();
-        if (doa == NULL) {
-                filter->fo_destroy_in_progress = 0;
-                UNLOCK_PARENTS(obd, group);
-                RETURN(-ENOMEM);
-        }
-
-        doa->o_gr = group;
-        doa->o_mode = S_IFREG;
-        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-
-        CDEBUG(D_ERROR, "%s:["LPU64"] deleting orphan objects from "LPU64" to "
-              LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id, last);
-
-        for (id = last; id > oa->o_id; id--) {
-                doa->o_id = id;
-
-                /* remove object @doa. It will not lock parent as parents
-                 * already locked. */
-                filter_destroy_internal(exp, doa, NULL, NULL, 0);
-
-                /* update last id just for case when OST will down in cleanup
-                 * orphans time. */
-                filter_set_last_id(filter, group, id);
-
-                /* update last_id on disk periodicaly */
-                if ((id & 1023) == 0)
-                        filter_update_last_objid(obd, group, 0);
-        }
-
-        UNLOCK_PARENTS(obd, group);
-
-        /* return next free id to be used as a new start of sequence. As we
-         * return last id from OST, this will make sure that MDS will start new
-         * sequence from object id which is far from existing and there will not
-         * be object id sharing. */
-        oa->o_id = last + 1;
-        filter_set_last_id(filter, group, oa->o_id);
-
-        CDEBUG(D_ERROR, "%s:["LPU64"] after destroy: set last_objids = "
-               LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
-
-        rc = filter_update_last_objid(obd, group, 1);
-        filter->fo_destroy_in_progress = 0;
-
-        obdo_free(doa);
-        RETURN(rc);
-}
-
-static int filter_create(struct obd_export *exp, struct obdo *oa,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
-        struct filter_export_data *fed;
-        struct lvfs_run_ctxt saved;
-        struct filter_obd *filter;
-        obd_gr group = oa->o_gr;
-        struct obd_device *obd;
-        int rc = 0;
-        ENTRY;
-
-        obd = exp->exp_obd;
-        fed = &exp->exp_filter_data;
-        filter = &obd->u.filter;
-
-        CDEBUG(D_INFO, "filter_create(od->o_gr="LPU64",od->o_id="LPU64")\n",
-               group, oa->o_id);
-
-        if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) {
-                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-
-                rc = filter_clear_orphans(exp, oa);
-                if (rc) {
-                        CERROR("cannot clear orphans starting from "
-                               LPU64", err = %d\n", oa->o_id, rc);
-                }
-                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-                RETURN(rc);
-        }
-
-        LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS,
-                     !!(oa->o_flags & OBD_FL_CREATE_CROW) !=
-                     !!(oa->o_flags & OBD_FL_RECREATE_OBJS)));
-
-        /* all non-CROW creates should end up here */
-        if (OBDO_URGENT_CREATE(oa)) {
-                struct obd_statfs *osfs;
-                struct dentry *dentry;
-                
-                /* check space first. As this is real create and client does not
-                 * have yet file created, this is good place to check space. */
-                OBD_ALLOC_PTR(osfs);
-                if (!osfs)
-                        RETURN(-ENOMEM);
-
-                rc = filter_statfs(obd, osfs, jiffies - HZ);
-                if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
-                        CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
-                               osfs->os_bavail << filter->fo_obt.obt_sb->s_blocksize_bits);
-                        rc = -ENOSPC;
-                }
-
-                OBD_FREE_PTR(osfs);
-                if (rc)
-                        RETURN(rc);
-
-                dentry = filter_create_object(obd, oa);
-                if (!IS_ERR(dentry)) {
-                        f_dput(dentry);
-                        if (ea != NULL) {
-                                struct lov_stripe_md *lsm = *ea;
-                                if (lsm == NULL) {
-                                        rc = obd_alloc_memmd(exp, &lsm);
-                                        if (rc)
-                                                RETURN(rc);
-                                }
-                                lsm->lsm_object_id = oa->o_id;
-                                *ea = lsm;
-                                rc = 0;
-                        }
-                }
-        } else {
-                CERROR("wrong @oa flags detected 0x%lx. Not an urgent "
-                       "create and not recovery.\n",(unsigned long)oa->o_flags);
-                LBUG();
-        }
-        RETURN(rc);
+        return rc;
 }
 
 /* NB start and end are used for punch, but not truncate */
index 0b6daf5..989c990 100644 (file)
@@ -29,9 +29,9 @@
 #define FILTER_SUBDIR_COUNT      32            /* set to zero for no subdirs */
 #define FILTER_GROUPS 3 /* must be at least 3; not dynamic yet */
 
-#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
+#define FILTER_ROCOMPAT_SUPP (0)
 
-#define FILTER_ROCOMPAT_SUPP   (OBD_ROCOMPAT_CROW)
+#define FILTER_RECOVERY_TIMEOUT (obd_timeout * 5 * HZ / 2) /* *waves hands* */
 
 #define FILTER_INCOMPAT_SUPP   (OBD_INCOMPAT_GROUPS)
 
@@ -103,20 +103,20 @@ struct dentry *filter_fid2dentry(struct obd_device *, struct dentry *dir,
 struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa,
                                   const char *what, int quiet);
 #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0)
-#define filter_oa2dentry_quiet(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 1)
 
 int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc);
-__u64 filter_last_id(struct filter_obd *, int group);
+__u64 filter_next_id(struct filter_obd *, struct obdo *);
+__u64 filter_last_id(struct filter_obd *, struct obdo *);
+int filter_update_fidea(struct obd_export *exp, struct inode *inode,
+                        void *handle, struct obdo *oa);
 int filter_update_server_data(struct obd_device *, struct file *,
                               struct filter_server_data *, int force_sync);
 int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, obd_count len, void *buf,
                         void *option);
 int filter_destroy(struct obd_export *exp, struct obdo *oa,
-                   struct lov_stripe_md *md, struct obd_trans_info *, 
-                   struct obd_export *md_exp);
-struct dentry *filter_crow_object(struct obd_device *obd, struct obdo *oa);
-
+                   struct lov_stripe_md *md, struct obd_trans_info *,
+                   struct obd_export *);
 int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
                             struct obdo *oa, struct obd_trans_info *oti);
 int filter_setattr(struct obd_export *exp, struct obdo *oa,
index c2e7219..da25e3c 100644 (file)
@@ -296,20 +296,16 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         iobuf = filter_iobuf_get(&obd->u.filter, oti);
 
-        dentry = filter_oa2dentry_quiet(obd, oa);
+        dentry = filter_oa2dentry(obd, oa);
         if (IS_ERR(dentry)) {
-                if (PTR_ERR(dentry) == -ENOENT) {
-                        dentry = NULL;
-                        inode = NULL;
-                } else {
-                        dentry = NULL;
-                        GOTO(cleanup, rc = PTR_ERR(dentry));
-                }
-        } else {
-                inode = dentry->d_inode;
+                rc = PTR_ERR(dentry);
+                dentry = NULL;
+                GOTO(cleanup, rc);
         }
-
-        if (oa && inode != NULL)
+        
+        inode = dentry->d_inode;
+        
+        if (oa)
                 obdo_to_inode(inode, oa, OBD_MD_FLATIME);
 
         fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
@@ -328,10 +324,9 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                  */
                 LASSERT(lnb->page != NULL);
 
-                if (inode == NULL || inode->i_size <= rnb->offset)
-                        /* If there's no more data, or inode is not yet
-                         * allocated by CROW abort early. lnb->rc == 0, so it's
-                         * easy to detect later. */
+                if (inode->i_size <= rnb->offset)
+                        /* If there's no more data, abort early.  lnb->rc == 0,
+                         * so it's easy to detect later. */
                         break;
                 else
                         filter_alloc_dio_page(obd, inode, lnb);
@@ -348,12 +343,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(now, obd_timeout, "start_page_read");
 
-        if (inode != NULL) {
-                rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
-                                      exp, NULL, NULL, NULL);
-                if (rc)
-                        GOTO(cleanup, rc);
-        }
+        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+                              exp, NULL, NULL, NULL);
+        if (rc)
+                GOTO(cleanup, rc);
 
         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
 
@@ -521,19 +514,24 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
-        OBD_RACE(OBD_FAIL_OST_CLEAR_ORPHANS_RACE);
-
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         iobuf = filter_iobuf_get(&exp->exp_obd->u.filter, oti);
+        if (iobuf == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
         cleanup_phase = 1;
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
-        /* make sure that object is already allocated */
-        dentry = filter_crow_object(exp->exp_obd, oa);
+        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
+                                   obj->ioo_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
         cleanup_phase = 2;
 
+        if (dentry->d_inode == NULL) {
+                CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
+                       exp->exp_obd->obd_name, obj->ioo_id);
+                GOTO(cleanup, rc = -ENOENT);
+        }
+
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
@@ -552,13 +550,12 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         rc = filter_grant_check(exp, objcount, &fso, niocount, nb, res,
                                 &left, dentry->d_inode);
 
-        /* We're finishing using body->oa as an input variable, so reset
-         * o_valid here. */
+        /* do not zero out oa->o_valid as it is used in filter_commitrw_write()
+         * for setting UID/GID and fid EA in first write time. */
         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
-                oa->o_valid = OBD_MD_FLGRANT;
-        } else if (oa)
-                oa->o_valid = 0;
+                oa->o_valid |= OBD_MD_FLGRANT;
+        }
 
         spin_unlock(&exp->exp_obd->obd_osfs_lock);
 
index 1b69b0a..f0e7e27 100644 (file)
@@ -428,7 +428,34 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
 
         fsfilt_check_slow(now, obd_timeout, "brw_start");
 
-        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+        i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+        /* If the inode still has SUID+SGID bits set (see filter_precreate())
+         * then we will accept the UID+GID if sent by the client for
+         * initializing the ownership of this inode.  We only allow this to
+         * happen once (so clear these bits) and later only allow setattr. */
+        if (inode->i_mode & S_ISUID)
+                i |= OBD_MD_FLUID;
+        if (inode->i_mode & S_ISGID)
+                i |= OBD_MD_FLGID;
+
+        iattr_from_obdo(&iattr, oa, i);
+        if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+
+                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
+
+                iattr.ia_valid |= ATTR_MODE;
+                iattr.ia_mode = inode->i_mode;
+                if (iattr.ia_valid & ATTR_UID)
+                        iattr.ia_mode &= ~S_ISUID;
+                if (iattr.ia_valid & ATTR_GID)
+                        iattr.ia_mode &= ~S_ISGID;
+
+                rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
+        }
+
         /* filter_direct_io drops i_sem */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                               oti, &wait_handle);
index 7821606..e4b9721 100644 (file)
@@ -613,7 +613,34 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(now, obd_timeout, "brw_start");
 
-        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+        i = OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+        /* If the inode still has SUID+SGID bits set (see filter_precreate())
+         * then we will accept the UID+GID if sent by the client for
+         * initializing the ownership of this inode.  We only allow this to
+         * happen once (so clear these bits) and later only allow setattr. */
+        if (inode->i_mode & S_ISUID)
+                i |= OBD_MD_FLUID;
+        if (inode->i_mode & S_ISGID)
+                i |= OBD_MD_FLGID;
+
+        iattr_from_obdo(&iattr, oa, i);
+        if (iattr.ia_valid & (ATTR_UID | ATTR_GID)) {
+                CDEBUG(D_INODE, "update UID/GID to %lu/%lu\n",
+                       (unsigned long)oa->o_uid, (unsigned long)oa->o_gid);
+                
+                cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
+                
+                iattr.ia_valid |= ATTR_MODE;
+                iattr.ia_mode = inode->i_mode;
+                if (iattr.ia_valid & ATTR_UID)
+                        iattr.ia_mode &= ~S_ISUID;
+                if (iattr.ia_valid & ATTR_GID)
+                        iattr.ia_mode &= ~S_ISGID;
+
+                rc = filter_update_fidea(exp, inode, oti->oti_handle, oa);
+        }
+
         /* filter_direct_io drops i_sem */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                               oti, &wait_handle);
index 1a47b1e..5f5eeaa 100644 (file)
@@ -75,28 +75,24 @@ static int filter_lvbo_init(struct ldlm_resource *res)
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
-        if (dentry->d_inode == NULL) {
-                lvb->lvb_size = 0;
-                lvb->lvb_blocks = 0;
-
-                /* making client use MDS mtime as this one is zero, bigger one
-                 * will be taken and this does not break POSIX */
-                lvb->lvb_mtime = 0;
-        } else {
-                lvb->lvb_size = dentry->d_inode->i_size;
-                lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
-                lvb->lvb_blocks = dentry->d_inode->i_blocks;
-        }
+        if (dentry->d_inode == NULL)
+                GOTO(out_dentry, rc = -ENOENT);
+
+        lvb->lvb_size = dentry->d_inode->i_size;
+        lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+        lvb->lvb_blocks = dentry->d_inode->i_blocks;
 
         CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", "
                "mtime: "LPU64", blocks: "LPU64"\n",
                res->lr_name.name[0], lvb->lvb_size,
                lvb->lvb_mtime, lvb->lvb_blocks);
 
+        EXIT;
+out_dentry:
         f_dput(dentry);
 
         /* Don't free lvb data on lookup error */
-        RETURN(rc);
+        return rc;
 }
 
 /* This will be called in two ways:
index a4dae3a..b4ca5d9 100644 (file)
@@ -169,6 +169,65 @@ static int osc_rd_cur_grant_bytes(char *page, char **start, off_t off,
         return rc;
 }
 
+/* lprocfs read handler registered as "create_count": prints the creator's
+ * current oscc_grow_count as a decimal line into page.  Returns whatever
+ * snprintf() returns, or 0 when data carries no obd device. */
+static int osc_rd_create_count(char *page, char **start, off_t off, int count,
+                               int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct osc_creator *oscc;
+
+        if (!obd)
+                return 0;
+
+        oscc = &obd->u.cli.cl_oscc;
+        return snprintf(page, count, "%d\n", oscc->oscc_grow_count);
+}
+
+/* lprocfs write handler registered as "create_count": parses an integer from
+ * the user buffer and stores it as the creator's oscc_grow_count.
+ *
+ * Returns count on success, 0 when data carries no obd device, the error
+ * from lprocfs_write_helper() when parsing fails, or -ERANGE when the value
+ * lies outside [OST_MIN_PRECREATE, OST_MAX_PRECREATE]. */
+static int osc_wr_create_count(struct file *file, const char *buffer,
+                               unsigned long count, void *data)
+{
+        struct obd_device *obd = data;
+        int val, rc;
+
+        if (obd == NULL)
+                return 0;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        /* oscc_internal_create() asks the OST to precreate through
+         * oscc_last_id + oscc_grow_count, so a count of 0 would request no
+         * new objects at all.  Enforce the same floor that
+         * osc_interpret_create() applies when it resizes the window.
+         * NOTE(review): relies on OST_MIN_PRECREATE being >= 1 -- confirm
+         * against its definition. */
+        if (val < OST_MIN_PRECREATE || val > OST_MAX_PRECREATE)
+                return -ERANGE;
+
+        obd->u.cli.cl_oscc.oscc_grow_count = val;
+
+        return count;
+}
+
+/* lprocfs read handler registered as "prealloc_next_id": prints the
+ * creator's oscc_next_id into page using the LPU64 format.  Returns whatever
+ * snprintf() returns, or 0 when data carries no obd device. */
+static int osc_rd_prealloc_next_id(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct osc_creator *oscc;
+
+        if (!obd)
+                return 0;
+
+        oscc = &obd->u.cli.cl_oscc;
+        return snprintf(page, count, LPU64"\n", oscc->oscc_next_id);
+}
+
+/* lprocfs read handler registered as "prealloc_last_id": prints the
+ * creator's oscc_last_id into page using the LPU64 format.  Returns whatever
+ * snprintf() returns, or 0 when data carries no obd device. */
+static int osc_rd_prealloc_last_id(char *page, char **start, off_t off,
+                                   int count, int *eof, void *data)
+{
+        struct obd_device *obd = data;
+        struct osc_creator *oscc;
+
+        if (!obd)
+                return 0;
+
+        oscc = &obd->u.cli.cl_oscc;
+        return snprintf(page, count, LPU64"\n", oscc->oscc_last_id);
+}
+
 static int osc_rd_checksum(char *page, char **start, off_t off, int count,
                            int *eof, void *data)
 {
@@ -199,20 +258,6 @@ static int osc_wr_checksum(struct file *file, const char *buffer,
         return count;
 }
 
-static int osc_rd_last_id(char *page, char **start, off_t off,
-                          int count, int *eof, void *data)
-{
-        struct obd_device *obd = (struct obd_device *)data;
-        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-        int rc;
-
-        *eof = 1;
-        spin_lock(&oscc->oscc_lock);
-        rc = snprintf(page, count, LPU64"\n", oscc->oscc_next_id);
-        spin_unlock(&oscc->oscc_lock);
-        return rc;
-}
-
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "ping",            0, lprocfs_wr_ping,        0 },
@@ -232,8 +277,10 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 },
         { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 },
         { "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 },
+        { "create_count", osc_rd_create_count, osc_wr_create_count, 0 },
+        { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 },
+        { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
         { "checksums", osc_rd_checksum, osc_wr_checksum, 0 },
-        { "last_id",         osc_rd_last_id,     0, 0 },
         { 0 }
 };
 
index ceaccb5..89bb1b6 100644 (file)
 #include <linux/obd_class.h>
 #include "osc_internal.h"
 
-int oscc_recovering(struct osc_creator *oscc)
+static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
 {
-        int recov = 0;
+        struct osc_creator *oscc;
+        struct ost_body *body = NULL;
+        ENTRY;
+
+        if (req->rq_repmsg) {
+                body = lustre_swab_repbuf(req, 0, sizeof(*body),
+                                          lustre_swab_ost_body);
+                if (body == NULL && rc == 0)
+                        rc = -EPROTO;
+        }
 
+        oscc = req->rq_async_args.pointer_arg[0];
+        LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
+        
         spin_lock(&oscc->oscc_lock);
-        recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
-        spin_unlock(&oscc->oscc_lock);
+        oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+        if (rc == -ENOSPC || rc == -EROFS) {
+                oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+                if (body && rc == -ENOSPC) {
+                        oscc->oscc_grow_count = OST_MIN_PRECREATE;
+                        oscc->oscc_last_id = body->oa.o_id;
+                }
+                spin_unlock(&oscc->oscc_lock);
+                DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
+        } else if (rc != 0 && rc != -EIO) {
+                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+                oscc->oscc_grow_count = OST_MIN_PRECREATE;
+                spin_unlock(&oscc->oscc_lock);
+                DEBUG_REQ(D_ERROR, req,
+                          "unknown rc %d from async create: failing oscc", rc);
+                ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
+        } else {
+                if (rc == 0) {
+                        oscc->oscc_flags &= ~OSCC_FLAG_LOW;
+                        if (body) {
+                                int diff = body->oa.o_id - oscc->oscc_last_id;
+                                if (diff != oscc->oscc_grow_count)
+                                        oscc->oscc_grow_count =
+                                                max(diff/3, OST_MIN_PRECREATE);
+                                oscc->oscc_last_id = body->oa.o_id;
+                        }
+                }
+                spin_unlock(&oscc->oscc_lock);
+        }
 
-        return recov;
+        CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
+               oscc->oscc_last_id, oscc->oscc_next_id);
+
+        wake_up(&oscc->oscc_waitq);
+        RETURN(rc);
 }
 
-static int osc_check_state(struct obd_export *exp)
+static int oscc_internal_create(struct osc_creator *oscc)
 {
-        int rc;
+        struct ptlrpc_request *request;
+        struct ost_body *body;
+        int size = sizeof(*body);
         ENTRY;
 
-        /* ->os_state contains positive error code on remote OST. To convert it
-         * to usual errno form we have to make an sign inversion. */
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
-        rc = -exp->exp_obd->obd_osfs.os_state;
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
-        
-        RETURN(rc);
+        spin_lock(&oscc->oscc_lock);
+        if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
+            !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
+            (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
+                   (oscc->oscc_grow_count / 4 + 1)) {
+                oscc->oscc_flags |= OSCC_FLAG_LOW;
+                oscc->oscc_grow_count *= 2;
+        }
+
+        if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
+                oscc->oscc_grow_count = OST_MAX_PRECREATE / 2;
+
+        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
+            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+                spin_unlock(&oscc->oscc_lock);
+                RETURN(0);
+        }
+        oscc->oscc_flags |= OSCC_FLAG_CREATING;
+        spin_unlock(&oscc->oscc_lock);
+
+        request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
+                                  LUSTRE_OST_VERSION, OST_CREATE, 1,
+                                  &size, NULL);
+        if (request == NULL) {
+                spin_lock(&oscc->oscc_lock);
+                oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+                spin_unlock(&oscc->oscc_lock);
+                RETURN(-ENOMEM);
+        }
+
+        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
+        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
+
+        spin_lock(&oscc->oscc_lock);
+        body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
+        body->oa.o_valid |= OBD_MD_FLID;
+        spin_unlock(&oscc->oscc_lock);
+        CDEBUG(D_HA, "preallocating through id "LPU64" (last used "LPU64")\n",
+               body->oa.o_id, oscc->oscc_next_id);
+
+        request->rq_replen = lustre_msg_size(1, &size);
+
+        request->rq_async_args.pointer_arg[0] = oscc;
+        request->rq_interpret_reply = osc_interpret_create;
+        ptlrpcd_add_req(request);
+
+        RETURN(0);
+}
+
+/* Report whether at least count ids lie between oscc_next_id and
+ * oscc_last_id (sampled under oscc_lock).  When they do not, call
+ * oscc_internal_create() before returning.  Returns 1 if enough ids were
+ * available, 0 otherwise. */
+static int oscc_has_objects(struct osc_creator *oscc, int count)
+{
+        __s64 avail;
+
+        spin_lock(&oscc->oscc_lock);
+        avail = (__s64)(oscc->oscc_last_id - oscc->oscc_next_id);
+        spin_unlock(&oscc->oscc_lock);
+
+        if (avail >= count)
+                return 1;
+
+        /* running short: the return value of the create call is ignored */
+        oscc_internal_create(oscc);
+        return 0;
+}
+
+/* Condition evaluated by the l_wait_event() in oscc_precreate().  Returns
+ * non-zero when the waiter should stop sleeping: enough objects are
+ * available, OSCC_FLAG_NOSPC is set, or the import is marked invalid. */
+static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
+{
+        int done;
+
+        /* evaluated unconditionally: oscc_has_objects() may call
+         * oscc_internal_create() when too few objects are available */
+        done = oscc_has_objects(oscc, count);
+
+        spin_lock(&oscc->oscc_lock);
+        if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
+                done = 1;
+        spin_unlock(&oscc->oscc_lock);
+
+        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
+                done = 1;
+
+        return done;
 }
 
-static int osc_check_nospc(struct obd_export *exp)
+static int oscc_precreate(struct osc_creator *oscc, int wait)
 {
-        __u64 blocks, bavail;
-        __u64 inodes, iavail;
+        struct l_wait_info lwi = { 0 };
         int rc = 0;
         ENTRY;
 
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
-        blocks = exp->exp_obd->obd_osfs.os_blocks;
-        bavail = exp->exp_obd->obd_osfs.os_bavail;
-        inodes = exp->exp_obd->obd_osfs.os_files;
-        iavail = exp->exp_obd->obd_osfs.os_ffree;
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
-        
-        /* return 1 if available space smaller then (blocks >> 10) of all space
-         * on OST. The main point of this water mark is to stop create files at
-         * some point, to let all created and opened files finish possible
-         * writes. */
-        if (blocks > 0 && bavail < (blocks >> 10))
-                rc = 1;
+        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
+                RETURN(0);
+
+        if (!wait)
+                RETURN(0);
 
-        if (inodes > 0 && iavail < 128)
-                rc = 1;
+        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
+        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
+
+        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+                rc = -ENOSPC;
+
+        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
+                rc = -EIO;
 
         RETURN(rc);
 }
 
+/* Sample OSCC_FLAG_RECOVERING under oscc_lock.  Returns the masked flag
+ * value: non-zero while the flag is set, 0 otherwise. */
+int oscc_recovering(struct osc_creator *oscc)
+{
+        int flagged;
+
+        spin_lock(&oscc->oscc_lock);
+        flagged = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+        spin_unlock(&oscc->oscc_lock);
+
+        return flagged;
+}
+
 int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
+        struct lov_stripe_md *lsm;
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         int try_again = 1, rc = 0;
         ENTRY;
+        LASSERT(oa);
+        LASSERT(ea);
+
+        if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0))
+                RETURN(osc_real_create(exp, oa, ea, oti));
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            oa->o_flags == OBD_FL_RECREATE_OBJS) {
+                RETURN(osc_real_create(exp, oa, ea, oti));
+        }
 
-        LASSERT(oa != NULL);
-        LASSERT(ea != NULL);
-        
         /* this is the special case where create removes orphans */
-        if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags == OBD_FL_DELORPHAN) {
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            oa->o_flags == OBD_FL_DELORPHAN) {
                 spin_lock(&oscc->oscc_lock);
                 if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
                         spin_unlock(&oscc->oscc_lock);
@@ -136,16 +268,15 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 spin_unlock(&oscc->oscc_lock);
                 CDEBUG(D_HA, "%s: oscc recovery started\n",
                        oscc->oscc_obd->obd_name);
-                LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING);
+
+                /* delete from next_id on up */
+                oa->o_valid |= OBD_MD_FLID;
+                oa->o_id = oscc->oscc_next_id - 1;
 
                 CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n",
                        oscc->oscc_obd->obd_name, oa->o_id);
 
                 rc = osc_real_create(exp, oa, ea, NULL);
-                if (oscc->oscc_obd == NULL) {
-                        CWARN("the obd for oscc %p has been freed\n", oscc);
-                        RETURN(rc);
-                }
 
                 spin_lock(&oscc->oscc_lock);
                 oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
@@ -153,6 +284,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         if (rc == -ENOSPC)
                                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
                         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+                        oscc->oscc_last_id = oa->o_id;
                         CDEBUG(D_HA, "%s: oscc recovery finished: %d\n",
                                oscc->oscc_obd->obd_name, rc);
                         wake_up(&oscc->oscc_waitq);
@@ -161,36 +293,21 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                                oscc->oscc_obd->obd_name, rc);
                 }
                 spin_unlock(&oscc->oscc_lock);
-                RETURN(rc);
-        }
 
-        LASSERT(ergo(oa->o_valid & OBD_MD_FLFLAGS,
-                     !!(oa->o_flags & OBD_FL_CREATE_CROW) !=
-                     !!(oa->o_flags & OBD_FL_RECREATE_OBJS)));
-
-        /* perform urgent create if asked or import is not crow capable or
-         * ENOSPC case if detected. */
-        if (OBDO_URGENT_CREATE(oa) || !IMP_CROW_ABLE(class_exp2cliimp(exp)) ||
-            osc_check_nospc(exp)) {
-                CDEBUG(D_HA, "perform urgent create\n");
-                oa->o_flags &= ~OBD_FL_CREATE_CROW;
-                if (!oa->o_flags)
-                        oa->o_valid &= ~OBD_MD_FLFLAGS;
-                rc = osc_real_create(exp, oa, ea, oti);
+
                 RETURN(rc);
         }
 
-        /* check OST fs state. */
-        rc = osc_check_state(exp);
-        if (rc) { 
-                CDEBUG(D_HA,"OST is in bad shape to create objects, err %d\n",
-                       rc);
-                RETURN(rc);
+        lsm = *ea;
+        if (lsm == NULL) {
+                rc = obd_alloc_memmd(exp, &lsm);
+                if (rc < 0)
+                        RETURN(rc);
         }
-        
+
         while (try_again) {
-                /* if orphans are being recovered, then we must wait until it is
-                 * finished before we can continue with create. */
+                /* If orphans are being recovered, then we must wait until
+                   it is finished before we can continue with create. */
                 if (oscc_recovering(oscc)) {
                         struct l_wait_info lwi;
 
@@ -202,7 +319,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                                           !oscc_recovering(oscc), &lwi);
                         LASSERT(rc == 0 || rc == -ETIMEDOUT);
                         if (rc == -ETIMEDOUT) {
-                                CDEBUG(D_HA, "%p: timeout waiting on recovery\n",
+                                CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
                                        oscc);
                                 RETURN(rc);
                         }
@@ -216,22 +333,30 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         break;
                 }
 
-                if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
+                if (oscc->oscc_last_id >= oscc->oscc_next_id) {
+                        memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
+                        oa->o_id = oscc->oscc_next_id;
+                        lsm->lsm_object_id = oscc->oscc_next_id;
+                        *ea = lsm;
+                        oscc->oscc_next_id++;
+                        try_again = 0;
+                } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
                         rc = -ENOSPC;
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
-
-                oscc->oscc_next_id++;
-                oa->o_id = oscc->oscc_next_id;
-                try_again = 0;
                 spin_unlock(&oscc->oscc_lock);
+                rc = oscc_precreate(oscc, try_again);
+                if (rc)
+                        break;
+        }
 
+        if (rc == 0)
                 CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
                        oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
-                       oa->o_id);
-        }
-
+                       lsm->lsm_object_id);
+        else if (*ea == NULL)
+                obd_free_memmd(exp, &lsm);
         RETURN(rc);
 }
 
@@ -243,10 +368,17 @@ void oscc_init(struct obd_device *obd)
                 return;
 
         oscc = &obd->u.cli.cl_oscc;
-        memset(oscc, 0, sizeof(*oscc));
 
-        oscc->oscc_obd = obd;
+        memset(oscc, 0, sizeof(*oscc));
+        INIT_LIST_HEAD(&oscc->oscc_list);
+        init_waitqueue_head(&oscc->oscc_waitq);
         spin_lock_init(&oscc->oscc_lock);
+        oscc->oscc_obd = obd;
+        oscc->oscc_grow_count = OST_MIN_PRECREATE;
+
+        oscc->oscc_next_id = 2;
+        oscc->oscc_last_id = 1;
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
-        init_waitqueue_head(&oscc->oscc_waitq);
+        /* XXX the export handle should give the oscc the last object */
+        /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
index 9396dd0..c65b2ee 100644 (file)
@@ -315,7 +315,6 @@ static int osc_setattr_async(struct obd_export *exp, struct obdo *oa,
 int osc_real_create(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
-        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct ptlrpc_request *request;
         struct ost_body *body;
         struct lov_stripe_md *lsm;
@@ -361,16 +360,6 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 GOTO (out_req, rc = -EPROTO);
         }
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
-                struct obd_import *imp = class_exp2cliimp(exp);
-                /* MDS declares last known object, OSS responses
-                 * with next possible object -bzzz */
-                spin_lock(&oscc->oscc_lock);
-                oscc->oscc_next_id = body->oa.o_id;
-                spin_unlock(&oscc->oscc_lock);
-                CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n",
-                       imp->imp_target_uuid.uuid, oa->o_id);
-        }
         memcpy(oa, &body->oa, sizeof(*oa));
 
         /* This should really be sent by the OST */
@@ -3061,6 +3050,17 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
 
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
 
+        if (KEY_IS("next_id")) {
+                if (vallen != sizeof(obd_id))
+                        RETURN(-EINVAL);
+                obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
+                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
+                       exp->exp_obd->obd_name,
+                       obd->u.cli.cl_oscc.oscc_next_id);
+
+                RETURN(0);
+        }
+        
         if (KEY_IS("unlinked")) {
                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
                 spin_lock(&oscc->oscc_lock);
@@ -3069,7 +3069,6 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
-
         if (KEY_IS("initial_recov")) {
                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
                 if (vallen != sizeof(int))
index 4ab7435..5adf67b 100644 (file)
@@ -587,11 +587,6 @@ finish:
                 exp->exp_connect_flags = ocd->ocd_connect_flags;
                 class_export_put(exp);
 
-                if (IMP_CROW_ABLE(imp)) {
-                        CDEBUG(D_HA, "connected to CROW capable target: %s\n",
-                               imp->imp_target_uuid.uuid);
-                }
-
                 obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
 
                 if ((ocd->ocd_connect_flags & OBD_CONNECT_VERSION) &&
index 72ce472..b44a60d 100644 (file)
@@ -935,8 +935,8 @@ void lustre_swab_qdata(struct qunit_data *d)
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
-         * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6
-         * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */
+         * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux
+         * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */
 
 
         /* Constants... */
@@ -1748,6 +1748,18 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct mds_body, aclsize));
         LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct mds_body *)0)->aclsize));
+        LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_2));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_2));
+        LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_3));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_3));
+        LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_4));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_4));
         LASSERTF(FMODE_READ == 1, " found %lld\n",
                  (long long)FMODE_READ);
         LASSERTF(FMODE_WRITE == 2, " found %lld\n",
index 21bb6f3..733ce98 100644 (file)
@@ -65,70 +65,10 @@ int ptlrpc_ping(struct obd_import *imp)
         RETURN(rc);
 }
 
-static int ptlrpc_statfs_interpret(struct ptlrpc_request *req,
-                                   void *data, int rc)
-{
-        struct obd_statfs *msfs;
-        struct obd_device *obd;
-        ENTRY;
-
-        if (rc)
-                RETURN(rc);
-        
-        if (!req->rq_repmsg)
-                RETURN(-EPROTO);
-        
-        msfs = lustre_swab_repbuf(req, 0, sizeof(*msfs),
-                                  lustre_swab_obd_statfs);
-        if (msfs == NULL)
-                RETURN(-EPROTO);
-
-        obd = req->rq_import->imp_obd;
-        
-        spin_lock(&obd->obd_osfs_lock);
-        obd->obd_osfs = *msfs;
-        obd->obd_osfs_age = jiffies;
-        spin_unlock(&obd->obd_osfs_lock);
-        
-        RETURN(0);
-}
-
-int ptlrpc_statfs(struct obd_import *imp)
-{
-        int size = sizeof(struct obd_statfs);
-        struct ptlrpc_request *req;
-        ENTRY;
-
-        req = ptlrpc_prep_req(imp, LUSTRE_OST_VERSION, OST_STATFS, 0,
-                              NULL, NULL);
-        if (!req) {
-                CERROR("OOM trying to ping %s->%s\n",
-                       imp->imp_obd->obd_uuid.uuid,
-                       imp->imp_target_uuid.uuid);
-                RETURN(-ENOMEM);
-        }
-
-        DEBUG_REQ(D_INFO, req, "pinging %s->%s",
-                  imp->imp_obd->obd_uuid.uuid,
-                  imp->imp_target_uuid.uuid);
-
-        req->rq_interpret_reply = ptlrpc_statfs_interpret;
-        req->rq_replen = lustre_msg_size(1, &size);
-        req->rq_no_resend = req->rq_no_delay = 1;
-        ptlrpcd_add_req(req);
-
-        RETURN(0);
-}
-
 static void ptlrpc_update_next_ping(struct obd_import *imp)
 {
-        __u32 interval;
-
-        interval = IMP_CROW_ABLE(imp) ?
-                STATFS_INTERVAL : PING_INTERVAL;
-
         imp->imp_next_ping = jiffies + HZ *
-                (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : interval);
+                (imp->imp_state == LUSTRE_IMP_DISCON ? 10 : PING_INTERVAL);
 }
 
 void ptlrpc_ping_import_soon(struct obd_import *imp)
@@ -164,11 +104,9 @@ static int ptlrpc_pinger_main(void *arg)
 
         /* And now, loop forever, pinging as needed. */
         while (1) {
-                unsigned long sleep_interval = PING_INTERVAL;
-                unsigned long update_interval = 0;
                 unsigned long this_ping = jiffies;
-                struct l_wait_info lwi;
                 long time_to_next_ping;
+                struct l_wait_info lwi;
                 struct list_head *iter;
 
                 down(&pinger_sem);
@@ -179,9 +117,6 @@ static int ptlrpc_pinger_main(void *arg)
                         int force, level;
                         unsigned long flags;
 
-                        if (IMP_CROW_ABLE(imp))
-                                sleep_interval = STATFS_INTERVAL;
-                        
                         spin_lock_irqsave(&imp->imp_lock, flags);
                         level = imp->imp_state;
                         force = imp->imp_force_verify;
@@ -215,10 +150,7 @@ static int ptlrpc_pinger_main(void *arg)
                                                imp->imp_deactive,
                                                imp->imp_obd->obd_no_recov);
                                 } else if (imp->imp_pingable || force) {
-                                        if (IMP_CROW_ABLE(imp))
-                                                ptlrpc_statfs(imp);
-                                        else
-                                                ptlrpc_ping(imp);
+                                        ptlrpc_ping(imp);
                                 }
                         } else {
                                 if (!imp->imp_pingable)
@@ -229,24 +161,15 @@ static int ptlrpc_pinger_main(void *arg)
                                        imp->imp_next_ping, this_ping);
                         }
 
-                        /* using here new calculated @update_interval, as
-                         * sleep_interval holds minimal of possible intervals
-                         * over pingable imports. */
-                        update_interval = IMP_CROW_ABLE(imp) ?
-                                STATFS_INTERVAL : PING_INTERVAL;
-                        
                         /* obd_timeout might have changed */
                         if (time_after(imp->imp_next_ping,
-                                       this_ping + update_interval * HZ))
+                                       this_ping + PING_INTERVAL * HZ))
                                 ptlrpc_update_next_ping(imp);
                 }
                 up(&pinger_sem);
 
-                /* Wait until the next ping time, or until we're stopped. We
-                 * sleep here smaller interval of two possible (ping or
-                 * statfs). If one of imports is CROW capable we'll sleep
-                 * STATFS_INTERVAL and PING_INTERVAL otherwise. */
-                time_to_next_ping = this_ping + (sleep_interval * HZ) - jiffies;
+                /* Wait until the next ping time, or until we're stopped. */
+                time_to_next_ping = this_ping + (PING_INTERVAL * HZ) - jiffies;
                 
                 /* The ping sent by ptlrpc_send_rpc may get sent out
                    say .01 second after this.
@@ -255,7 +178,7 @@ static int ptlrpc_pinger_main(void *arg)
                    we will SKIP the next ping at next_ping, and the
                    ping will get sent 2 timeouts from now!  Beware. */
                 CDEBUG(D_INFO, "next ping in %lu (%lu)\n", time_to_next_ping,
-                       this_ping + sleep_interval * HZ);
+                       this_ping + PING_INTERVAL * HZ);
                 if (time_to_next_ping > 0) {
                         lwi = LWI_TIMEOUT(max_t(long, time_to_next_ping, HZ),
                                           NULL, NULL);
index b71f318..785a664 100755 (executable)
@@ -2,8 +2,8 @@
 
 set -e
 
-#         bug 6088
-ALWAYS_EXCEPT="8 $REPLAY_DUAL_EXCEPT"
+# bug 6088 9761 (CROW related)
+ALWAYS_EXCEPT="8 15a 15b 15c $REPLAY_DUAL_EXCEPT"
 
 LUSTRE=${LUSTRE:-`dirname $0`/..}
 . $LUSTRE/tests/test-framework.sh
index 99f2b5f..988e9b4 100755 (executable)
@@ -14,8 +14,8 @@ init_test_env $@
 . ${CONFIG:=$LUSTRE/tests/cfg/local.sh}
 
 # Skip these tests
-# bug number: 2766 4176
-ALWAYS_EXCEPT="0b  39 48 $REPLAY_SINGLE_EXCEPT"
+# bug number: 2766 4176 9761 (CROW related)
+ALWAYS_EXCEPT="0b 1a 39 $REPLAY_SINGLE_EXCEPT"
 
 gen_config() {
     rm -f $XMLCONFIG
index 0a56ab2..0db15f0 100644 (file)
@@ -26,8 +26,8 @@ int main()
 void lustre_assert_wire_constants(void)
 {
         /* Wire protocol assertions generated by 'wirecheck'
-         * running on Linux schatzie.adilger.int 2.6.12-1.1381_FC3 #1 Fri Oct 21 03:46:55 EDT 2005 i6
-         * with gcc version 3.3.4 20040817 (Red Hat Linux 3.3.4-2) */
+         * running on Linux beetle 2.6.12-10-686 #1 Fri Nov 18 12:09:04 UTC 2005 i686 GNU/Linux
+         * with gcc version 3.3.6 (Ubuntu 1:3.3.6-8ubuntu1) */
 
 
         /* Constants... */
@@ -839,6 +839,18 @@ void lustre_assert_wire_constants(void)
                  (long long)(int)offsetof(struct mds_body, aclsize));
         LASSERTF((int)sizeof(((struct mds_body *)0)->aclsize) == 4, " found %lld\n",
                  (long long)(int)sizeof(((struct mds_body *)0)->aclsize));
+        LASSERTF((int)offsetof(struct mds_body, padding_2) == 156, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_2));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_2) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_2));
+        LASSERTF((int)offsetof(struct mds_body, padding_3) == 160, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_3));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_3) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_3));
+        LASSERTF((int)offsetof(struct mds_body, padding_4) == 164, " found %lld\n",
+                 (long long)(int)offsetof(struct mds_body, padding_4));
+        LASSERTF((int)sizeof(((struct mds_body *)0)->padding_4) == 4, " found %lld\n",
+                 (long long)(int)sizeof(((struct mds_body *)0)->padding_4));
         LASSERTF(FMODE_READ == 1, " found %lld\n",
                  (long long)FMODE_READ);
         LASSERTF(FMODE_WRITE == 2, " found %lld\n",