Whamcloud - gitweb
- CROW (CReate On Write) (precreation is removed)
author yury <yury>
Wed, 22 Jun 2005 11:33:48 +0000 (11:33 +0000)
committer yury <yury>
Wed, 22 Jun 2005 11:33:48 +0000 (11:33 +0000)
- setting owner/group on OSS (needed for quotas)

- some fixes:
  - deadlock on osc client lock.
  - idr_remove() issue.
  - forwarding to wrong MDS in lmv_link()
  - f_dput(dentry) before using it in filter_lvbo_update()
  - filter_sync() assert(group > 0) caused by the OBD_MD_FLGROUP flag missing from oa->o_valid in ll_fsync().
  - cleanups, comments and more debugging messages.
  - jump to wrong label in mds_create_objects() on error path.
  - a few missing or wrong ENTRY/RETURN usages

34 files changed:
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_fsfilt.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_smfs.h
lustre/include/linux/obd.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/rw.c
lustre/lmv/lmv_obd.c
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/mds/handler.c
lustre/mds/mds_internal.h
lustre/mds/mds_lmv.c
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/mds/mds_reint.c
lustre/mds/mds_unlink_open.c
lustre/obdfilter/filter.c
lustre/obdfilter/filter_internal.h
lustre/obdfilter/filter_io.c
lustre/obdfilter/filter_lvb.c
lustre/osc/lproc_osc.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c
lustre/smfs/fsfilt.c
lustre/tests/replay-single.sh
lustre/tests/sanity.sh

index 2c44a3d..74b1c52 100644 (file)
@@ -158,7 +158,9 @@ typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
 
 struct ldlm_valblock_ops {
         int (*lvbo_init)(struct ldlm_resource *res);
-        int (*lvbo_update)(struct ldlm_resource *res, struct lustre_msg *m,
+        
+        int (*lvbo_update)(struct ldlm_resource *res,
+                           struct lustre_msg *m,
                            int buf_idx, int increase);
 };
 
index 0a91241..8f9b829 100644 (file)
@@ -31,18 +31,8 @@ struct mds_export_data {
 
 struct osc_creator {
         spinlock_t              oscc_lock;
-        struct list_head        oscc_list;
-        struct obd_device       *oscc_obd;
-        obd_id                  oscc_last_id;//last available pre-created object
-        obd_id                  oscc_next_id;// what object id to give out next
-        obd_id                  oscc_gr;
-        int                     oscc_grow_count;
-        int                     oscc_max_grow_count;
-        int                     oscc_kick_barrier;
-        struct osc_created     *oscc_osccd;
-        struct obdo             oscc_oa;
+        struct obd_device      *oscc_obd;
         int                     oscc_flags;
-        wait_queue_head_t       oscc_waitq; /* creating procs wait on this */
 };
 
 struct ldlm_export_data {
index 70d027b..5c9ecd0 100644 (file)
@@ -601,6 +601,7 @@ fsfilt_clear_fs_flags(struct obd_device *obd, struct inode *inode, int flags)
                 return obd->obd_fsops->fs_clear_fs_flags(inode, flags);
         return 0;
 }
+
 static inline int 
 fsfilt_precreate_rec(struct obd_device *obd, struct dentry *dentry,
                      int *num, struct obdo *oa)
index 6a19cf2..c94afce 100644 (file)
@@ -463,10 +463,7 @@ extern void lustre_swab_obd_statfs (struct obd_statfs *os);
 #define OBD_BRW_FROM_GRANT 0x20 /* the osc manages this under llite */
 #define OBD_BRW_GRANTED    0x40 /* the ost manages this */
 
-#define OBD_OBJECT_EOF 0xffffffffffffffffULL
-
-#define OST_MIN_PRECREATE 32
-#define OST_MAX_PRECREATE 20000
+#define OBD_OBJECT_EOF     0xffffffffffffffffULL
 
 struct obd_ioobj {
         obd_id               ioo_id;
index feab216..ba6667f 100644 (file)
@@ -162,7 +162,6 @@ struct fs_extent{
 
 /* SMFS external flags and methods */
 #define SM_ALL_PLG      0x80L
-#define SM_PRECREATE    0x100L
 
 #define SM_DO_REC               0x1
 #define SM_INIT_REC             0x2
index 60a18f9..23f46e6 100644 (file)
@@ -373,11 +373,12 @@ struct mds_obd {
         struct obd_export               *mds_dt_exp;
         int                              mds_has_dt_desc;
         struct lov_desc                  mds_dt_desc;
+
+        spinlock_t                       mds_dt_lock;
         obd_id                          *mds_dt_objids;
-        int                              mds_dt_objids_valid;
-        int                              mds_dt_nextid_set;
         struct file                     *mds_dt_objid_filp;
-        spinlock_t                       mds_dt_lock;
+        int                              mds_dt_objids_valid;
+
         unsigned long                   *mds_client_bitmap;
         struct semaphore                 mds_orphan_recovery_sem;
         
@@ -407,6 +408,7 @@ struct mds_obd {
         /* which secure flavor from remote to this mds is denied */
         spinlock_t                      mds_denylist_lock;
         struct list_head                mds_denylist;
+        struct semaphore                mds_create_sem;
 };
 
 struct echo_obd {
@@ -545,13 +547,19 @@ struct niobuf_local {
         int rc;
 };
 
+#define OBD_MODE_ASYNC (1 << 0)
+#define OBD_MODE_CROW  (1 << 1)
+
 /* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */
 #define N_LOCAL_TEMP_PAGE 0x10000000
 
+typedef int (*obd_obj_alloc_func_t)(obd_id *objid);
+
 struct obd_trans_info {
         __u64                    oti_transno;
         __u64                   *oti_objid;
-        /* Only used on the server side for tracking acks. */
+
+        /* only used on the server side for tracking acks. */
         struct oti_req_ack_lock {
                 struct lustre_handle lock;
                 __u32                mode;
@@ -560,7 +568,8 @@ struct obd_trans_info {
         struct llog_cookie       oti_onecookie;
         struct llog_cookie      *oti_logcookies;
         int                      oti_numcookies;
-        int                      oti_async;
+        int                      oti_flags;
+        obd_obj_alloc_func_t     oti_obj_alloc;
 };
 
 static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
index 2b02882..6d2dae8 100644 (file)
@@ -300,7 +300,8 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         LASSERT(list_empty(&lock->l_res_link));
 
         lock->l_resource = ldlm_resource_get(ns, NULL, new_resid,
-                                             lock->l_resource->lr_type, 1);
+                                             lock->l_resource->lr_type, 
+                                            1);
         if (lock->l_resource == NULL) {
                 LBUG();
                 RETURN(-ENOMEM);
@@ -769,7 +770,8 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                         parent_res = parent_lock->l_resource;
         }
 
-        res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
+        res = ldlm_resource_get(ns, parent_res, res_id,
+                                type, 1);
         if (res == NULL)
                 RETURN(NULL);
 
index 1ed72f3..3bc90b0 100644 (file)
@@ -922,8 +922,10 @@ int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
 }
 
 /* non-blocking function to manipulate a lock whose cb_data is being put away.*/
-void ldlm_change_cbdata(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
-                        ldlm_iterator_t iter, void *data)
+void ldlm_change_cbdata(struct ldlm_namespace *ns, 
+                       struct ldlm_res_id *res_id,
+                        ldlm_iterator_t iter, 
+                       void *data)
 {
         struct ldlm_resource *res;
         ENTRY;
index ac93ac6..9a88971 100644 (file)
@@ -494,8 +494,8 @@ struct ldlm_resource *
 ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
                   struct ldlm_res_id name, __u32 type, int create)
 {
-        struct list_head *bucket, *tmp;
         struct ldlm_resource *res = NULL;
+        struct list_head *bucket, *tmp;
         ENTRY;
 
         LASSERT(ns != NULL);
@@ -536,8 +536,8 @@ ldlm_resource_get(struct ldlm_namespace *ns, struct ldlm_resource *parent,
                 rc = ns->ns_lvbo->lvbo_init(res);
                 up(&res->lr_lvb_sem);
                 if (rc)
-                        CERROR("lvbo_init failed for resource "LPU64": rc %d\n",
-                               name.name[0], rc);
+                        CERROR("lvbo_init failed for resource "
+                              LPU64": rc %d\n", name.name[0], rc);
         } else {
 out:
                 l_unlock(&ns->ns_lock);
index e2f5453..b522d89 100644 (file)
@@ -1158,62 +1158,6 @@ out:
         return retval;
 }
 
-static int ll_lov_recreate_obj(struct inode *inode, struct file *file,
-                               unsigned long arg)
-{
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct obd_export *exp = ll_i2dtexp(inode);
-        struct ll_recreate_obj ucreatp;
-        struct obd_trans_info oti = { 0 };
-        struct obdo *oa = NULL;
-        int lsm_size;
-        int rc = 0;
-        struct lov_stripe_md *lsm, *lsm2;
-        ENTRY;
-
-        if (!capable (CAP_SYS_ADMIN))
-                RETURN(-EPERM);
-
-        rc = copy_from_user(&ucreatp, (struct ll_recreate_obj *)arg,
-                            sizeof(struct ll_recreate_obj));
-        if (rc) {
-                RETURN(-EFAULT);
-        }
-        oa = obdo_alloc();
-        if (oa == NULL) 
-                RETURN(-ENOMEM);
-
-        down(&lli->lli_open_sem);
-        lsm = lli->lli_smd;
-        if (lsm == NULL)
-                GOTO(out, rc = -ENOENT);
-        lsm_size = sizeof(*lsm) + (sizeof(struct lov_oinfo) *
-                   (lsm->lsm_stripe_count));
-
-        OBD_ALLOC(lsm2, lsm_size);
-        if (lsm2 == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        oa->o_id = ucreatp.lrc_id;
-        oa->o_nlink = ucreatp.lrc_ost_idx;
-        oa->o_gr = ucreatp.lrc_group;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLFLAGS;
-        oa->o_flags |= OBD_FL_RECREATE_OBJS;
-        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
-                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
-
-        oti.oti_objid = NULL;
-        memcpy(lsm2, lsm, lsm_size);
-        rc = obd_create(exp, oa, NULL, 0, &lsm2, &oti);
-
-        OBD_FREE(lsm2, lsm_size);
-        GOTO(out, rc);
-out:
-        up(&lli->lli_open_sem);
-        obdo_free(oa);
-        return rc;
-}
-
 static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
                                     int flags, struct lov_user_md *lum,
                                     int lum_size)
@@ -1309,12 +1253,12 @@ static int ll_lov_setstripe_ea_info(struct inode *inode, struct file *file,
 }
 
 static int ll_lov_setea(struct inode *inode, struct file *file,
-                            unsigned long arg)
+                        unsigned long arg)
 {
         int flags = MDS_OPEN_HAS_OBJS | FMODE_WRITE;
         struct lov_user_md  *lump;
         int lum_size = sizeof(struct lov_user_md) +
-                       sizeof(struct lov_user_ost_data);
+                sizeof(struct lov_user_ost_data);
         int rc;
         ENTRY;
 
@@ -1497,8 +1441,6 @@ int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
         }
         case LL_IOC_LOV_GETSTRIPE:
                 RETURN(ll_lov_getstripe(inode, arg));
-        case LL_IOC_RECREATE_OBJ:
-                RETURN(ll_lov_recreate_obj(inode, file, arg));
         case EXT3_IOC_GETFLAGS:
         case EXT3_IOC_SETFLAGS:
                 RETURN( ll_iocontrol(inode, file, cmd, arg) );
@@ -1607,7 +1549,8 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data)
 
                 oa->o_id = lsm->lsm_object_id;
                 oa->o_gr = lsm->lsm_object_gr;
-                oa->o_valid = OBD_MD_FLID;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
                 obdo_from_inode(oa, inode, (OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                                             OBD_MD_FLGROUP));
index 673b763..25ae7e6 100644 (file)
@@ -489,33 +489,4 @@ ll_prepare_mdc_data(struct mdc_op_data *data, struct inode *i1,
         data->mod_time = LTIME_S(CURRENT_TIME);
 }
 
-#if 0
-/* 
- * this was needed for catching correct calling place of ll_intent_alloc() with
- * missed ll_intent_free() causing memory leak. --umka
- */
-#define ll_intent_alloc(it)                                             \
-        ({                                                              \
-                int err;                                                \
-                OBD_SLAB_ALLOC((it)->d.fs_data, ll_intent_slab, SLAB_KERNEL, \
-                               sizeof(struct lustre_intent_data));      \
-                if (!(it)->d.fs_data) {                                 \
-                        err = -ENOMEM;                                  \
-                } else {                                                \
-                        err = 0;                                        \
-                }                                                       \
-                (it)->it_op_release = ll_intent_release;                \
-                err;                                                    \
-        })
-
-#define ll_intent_free(it)                                      \
-        do {                                                    \
-                if ((it)->d.fs_data) {                                  \
-                        OBD_SLAB_FREE((it)->d.fs_data, ll_intent_slab,  \
-                                      sizeof(struct lustre_intent_data)); \
-                        (it)->d.fs_data = NULL;                         \
-                }                                                       \
-        } while (0)
-#endif
-
 #endif /* LLITE_INTERNAL_H */
index 6e0a11a..85508c7 100644 (file)
@@ -141,9 +141,8 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         struct obd_device *obd;
         struct obd_statfs osfs;
         struct lustre_md md;
-        kdev_t devno;
-        int err;
         __u32 valsize;
+        int err;
         ENTRY;
 
         obd = class_name2obd(lmv);
@@ -212,11 +211,18 @@ int lustre_common_fill_super(struct super_block *sb, char *lmv, char *lov,
         sb->s_blocksize = osfs.os_bsize;
         sb->s_blocksize_bits = log2(osfs.os_bsize);
         sb->s_maxbytes = PAGE_CACHE_MAXBYTES;
-       
-        devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, 
-                             strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid));
 
-        sb->s_dev = devno;
+        /* in 2.6.x FS is not allowed to form s_dev */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        {
+                kdev_t devno;
+                
+                devno = get_uuid2int((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid, 
+                                     strlen((char *)sbi->ll_md_exp->exp_obd->obd_uuid.uuid));
+                
+                sb->s_dev = devno;
+        }
+#endif
 
         /* after statfs, we are supposed to have connected to MDSs,
          * so it's ok to check remote flag returned.
@@ -1170,7 +1176,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                         /* from sys_utime() */
                         if (!(ia_valid & (ATTR_MTIME_SET | ATTR_ATIME_SET))) {
                                 if (current->fsuid != inode->i_uid &&
-                                    (rc=ll_permission(inode,MAY_WRITE,NULL))!=0)
+                                    (rc = ll_permission(inode, MAY_WRITE, NULL)) != 0)
                                         RETURN(rc);
                         } else {
                                 /* from inode_change_ok() */
@@ -1228,7 +1234,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                         if (!rc)
                                 rc = err;
                 }
-        } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET)) {
+        } else if (ia_valid & (ATTR_MTIME | ATTR_MTIME_SET | ATTR_UID | ATTR_GID)) {
                 struct obdo *oa = NULL;
 
                 CDEBUG(D_INODE, "set mtime on OST inode %lu to %lu\n",
@@ -1241,6 +1247,17 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 oa->o_id = lsm->lsm_object_id;
                 oa->o_gr = lsm->lsm_object_gr;
                 oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+                if (ia_valid & ATTR_UID) {
+                        oa->o_uid = inode->i_uid;
+                        oa->o_valid |= OBD_MD_FLUID;
+                }
+
+                if (ia_valid & ATTR_GID) {
+                        oa->o_gid = inode->i_gid;
+                        oa->o_valid |= OBD_MD_FLGID;
+                }
+
                 obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME);
                 rc = obd_setattr(sbi->ll_dt_exp, oa, lsm, NULL);
@@ -1248,6 +1265,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr)
                 if (rc)
                         CERROR("obd_setattr fails: rc = %d\n", rc);
         }
+
         RETURN(rc);
 }
 
index 88b1cc0..26fc979 100644 (file)
@@ -180,8 +180,8 @@ out_unlock:
         up(&lli->lli_size_sem);
 } /* ll_truncate */
 
-int ll_prepare_write(struct file *file, struct page *page, unsigned from,
-                     unsigned to)
+int ll_prepare_write(struct file *file, struct page *page,
+                     unsigned from, unsigned to)
 {
         struct inode *inode = page->mapping->host;
         struct ll_inode_info *lli = ll_i2info(inode);
@@ -209,11 +209,22 @@ int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         oa->o_id = lsm->lsm_object_id;
         oa->o_gr = lsm->lsm_object_gr;
         oa->o_mode = inode->i_mode;
+
         oa->o_valid = OBD_MD_FLID | OBD_MD_FLMODE |
                 OBD_MD_FLTYPE | OBD_MD_FLGROUP;
 
-        rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode), oa, lsm,
-                     1, &pga, NULL);
+        /*
+         * needed for quota to create OSS object on write with correct
+         * owner/group.
+         */
+        oa->o_uid = inode->i_uid;
+        oa->o_valid |= OBD_MD_FLUID;
+
+        oa->o_gid = inode->i_gid;
+        oa->o_valid |= OBD_MD_FLGID;
+        
+        rc = obd_brw(OBD_BRW_CHECK, ll_i2dtexp(inode),
+                     oa, lsm, 1, &pga, NULL);
         if (rc)
                 GOTO(out_free_oa, rc);
 
@@ -317,9 +328,13 @@ static int ll_ap_refresh_count(void *data, int cmd)
         lli = ll_i2info(page->mapping->host);
         lsm = lli->lli_smd;
 
-        down(&lli->lli_size_sem);
+        /*
+         * this callback is called with client lock taken, thus, it should not
+         * sleep or deadlock is possible. --umka
+         */
+//        down(&lli->lli_size_sem);
         kms = lov_merge_size(lsm, 1);
-        up(&lli->lli_size_sem);
+//        up(&lli->lli_size_sem);
 
         /* catch race with truncate */
         if (((__u64)page->index << PAGE_SHIFT) >= kms)
index ea1358d..ac36396 100644 (file)
@@ -1201,7 +1201,7 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
         struct obd_device *obd = exp->exp_obd;
         struct lmv_obd *lmv = &obd->u.lmv;
         struct lmv_obj *obj;
-        int rc;
+        int rc, mds;
         ENTRY;
         
         rc = lmv_check_connect(obd);
@@ -1210,25 +1210,31 @@ int lmv_link(struct obd_export *exp, struct mdc_op_data *data,
 
         if (data->namelen != 0) {
                 /* usual link request */
-                obj = lmv_grab_obj(obd, &data->id1);
+                obj = lmv_grab_obj(obd, &data->id2);
                 if (obj) {
                         rc = raw_name2idx(obj->hashtype, obj->objcount, 
                                           data->name, data->namelen);
-                        data->id1 = obj->objs[rc].id;
+                        data->id2 = obj->objs[rc].id;
                         lmv_put_obj(obj);
                 }
+
+                mds = id_group(&data->id2);
                 
                 CDEBUG(D_OTHER,"link "DLID4":%*s to "DLID4"\n",
                        OLID4(&data->id2), data->namelen, data->name,
                        OLID4(&data->id1));
         } else {
+                mds = id_group(&data->id1);
+                
                 /* request from MDS to acquire i_links for inode by id1 */
                 CDEBUG(D_OTHER, "inc i_nlinks for "DLID4"\n",
                        OLID4(&data->id1));
         }
-                        
-        rc = md_link(lmv->tgts[id_group(&data->id1)].ltd_exp, 
-                     data, request);
+
+        CDEBUG(D_OTHER, "forward to MDS #%u ("DLID4")\n",
+               mds, OLID4(&data->id1));
+        rc = md_link(lmv->tgts[mds].ltd_exp, data, request);
+        
         RETURN(rc);
 }
 
index 8b3c13a..ae74d22 100644 (file)
@@ -656,7 +656,8 @@ out:
 #define log2(n) ffz(~(n))
 #endif
 
-static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
+static int lov_clear_orphans(struct obd_export *export,
+                             struct obdo *src_oa,
                              struct lov_stripe_md **ea,
                              struct obd_trans_info *oti)
 {
@@ -682,12 +683,14 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         }
 
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
+                int err;
                 struct lov_stripe_md obj_md;
                 struct lov_stripe_md *obj_mdp = &obj_md;
-                int err;
 
-                /* if called for a specific target, we don't
-                   care if it is not active. */
+                /*
+                 * if called for a specific target, we don't care if it is not
+                 * active.
+                 */
                 if (lov->tgts[i].active == 0 && ost_uuid == NULL) {
                         CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
@@ -696,16 +699,25 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
                 if (ost_uuid && !obd_uuid_equals(ost_uuid, &lov->tgts[i].uuid))
                         continue;
 
+                /* 
+                 * setting up objid OSS objects should be destroyed starting
+                 * from it.
+                 */
                 memcpy(tmp_oa, src_oa, sizeof(*tmp_oa));
+                tmp_oa->o_valid |= OBD_MD_FLID;
+                tmp_oa->o_id = oti->oti_objid[i];
 
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
                 err = obd_create(lov->tgts[i].ltd_exp, tmp_oa, NULL, 0,
                                  &obj_mdp, oti);
-                if (err)
-                        /* This export will be disabled until it is recovered,
-                           and then orphan recovery will be completed. */
+                if (err) {
+                        /*
+                         * this export will be disabled until it is recovered,
+                         * and then orphan recovery will be completed.
+                         */
                         CERROR("error in orphan recovery on OST idx %d/%d: "
                                "rc = %d\n", i, lov->desc.ld_tgt_count, err);
+                }
 
                 if (ost_uuid)
                         break;
@@ -714,51 +726,11 @@ static int lov_clear_orphans(struct obd_export *export, struct obdo *src_oa,
         RETURN(rc);
 }
 
-static int lov_recreate(struct obd_export *exp, struct obdo *src_oa,
-                        void *acl, int acl_size,
-                        struct lov_stripe_md **ea, struct obd_trans_info *oti)
-{
-        struct lov_stripe_md *obj_mdp, *lsm;
-        struct lov_obd *lov = &exp->exp_obd->u.lov;
-        unsigned ost_idx;
-        int rc, i;
-        ENTRY;
-
-        LASSERT(src_oa->o_valid & OBD_MD_FLFLAGS &&
-                src_oa->o_flags & OBD_FL_RECREATE_OBJS);
-
-        OBD_ALLOC(obj_mdp, sizeof(*obj_mdp));
-        if (obj_mdp == NULL)
-                RETURN(-ENOMEM);
-
-        ost_idx = src_oa->o_nlink;
-        lsm = *ea;
-        if (lsm == NULL)
-                GOTO(out, rc = -EINVAL);
-        if (ost_idx >= lov->desc.ld_tgt_count)
-                GOTO(out, rc = -EINVAL);
-
-        for (i = 0; i < lsm->lsm_stripe_count; i++) {
-                if (lsm->lsm_oinfo[i].loi_ost_idx == ost_idx) {
-                        if (lsm->lsm_oinfo[i].loi_id != src_oa->o_id)
-                                GOTO(out, rc = -EINVAL);
-                        break;
-                }
-        }
-        if (i == lsm->lsm_stripe_count)
-                GOTO(out, rc = -EINVAL);
-
-        rc = obd_create(lov->tgts[ost_idx].ltd_exp, src_oa, acl, acl_size,
-                        &obj_mdp, oti);
-out:
-        OBD_FREE(obj_mdp, sizeof(*obj_mdp));
-        RETURN(rc);
-}
-
 /* the LOV expects oa->o_id to be set to the LOV object id */
-static int lov_create(struct obd_export *exp, struct obdo *src_oa,
-                      void *acl, int acl_size,
-                      struct lov_stripe_md **ea, struct obd_trans_info *oti)
+static int
+lov_create(struct obd_export *exp, struct obdo *src_oa,
+           void *acl, int acl_size, struct lov_stripe_md **ea,
+           struct obd_trans_info *oti)
 {
         struct lov_request_set *set = NULL;
         struct list_head *pos;
@@ -780,13 +752,9 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         if (!lov->desc.ld_active_tgt_count)
                 RETURN(-EIO);
 
-        /* Recreate a specific object id at the given OST index */
-        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&
-            (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                 rc = lov_recreate(exp, src_oa, acl, acl_size, ea, oti);
-                 RETURN(rc);
-        }
-
+        LASSERT(oti->oti_flags & OBD_MODE_CROW);
+                
+        /* main creation loop */
         rc = lov_prep_create_set(exp, ea, src_oa, oti, &set);
         if (rc)
                 RETURN(rc);
@@ -795,9 +763,21 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
                 struct lov_request *req = 
                         list_entry(pos, struct lov_request, rq_link);
 
-                /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                rc = obd_create(lov->tgts[req->rq_idx].ltd_exp, req->rq_oa,
-                                acl, acl_size, &req->rq_md, oti);
+                obd_id *objids = oti->oti_objid;
+
+                if (oti->oti_obj_alloc) {
+                        __u64 next_id;
+                                
+                        /* 
+                         * allocating new objid. Here it is delegated to caller,
+                         * that is MDS in CROW case.
+                         */
+                        next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
+                        req->rq_oa->o_id = next_id;
+                } else {
+                        /* and here is default "allocator" */
+                        req->rq_oa->o_id = ++objids[req->rq_idx];
+                }
                 lov_update_create_set(set, req, rc);
         }
         rc = lov_fini_create_set(set, ea);
@@ -996,12 +976,6 @@ static int lov_setattr(struct obd_export *exp, struct obdo *src_oa,
         if (!exp || !exp->exp_obd)
                 RETURN(-ENODEV);
 
-        /* for now, we only expect time updates here */
-        LASSERT(!(src_oa->o_valid & ~(OBD_MD_FLID|OBD_MD_FLTYPE | OBD_MD_FLMODE|
-                                      OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                      OBD_MD_FLCTIME | OBD_MD_FLFLAGS |
-                                      OBD_MD_FLSIZE | OBD_MD_FLGROUP)));
-
         LASSERT(!(src_oa->o_valid & OBD_MD_FLGROUP) || src_oa->o_gr > 0);
 
         lov = &exp->exp_obd->u.lov;
@@ -2081,21 +2055,6 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
 #define KEY_IS(str) \
         (keylen == strlen(str) && memcmp(key, str, keylen) == 0)
 
-        if (KEY_IS("next_id")) {
-                if (vallen != lov->desc.ld_tgt_count)
-                        RETURN(-EINVAL);
-                for (i = 0; i < lov->desc.ld_tgt_count; i++) {
-                        /* initialize all OSCs, even inactive ones */
-                        if (obd_uuid_empty(&lov->tgts[i].uuid))
-                                continue;
-                        err = obd_set_info(lov->tgts[i].ltd_exp,
-                                          keylen, key, sizeof(obd_id),
-                                          ((obd_id*)val) + i);
-                        if (!rc)
-                                rc = err;
-                }
-                RETURN(rc);
-        }
         if (KEY_IS("async")) {
                 struct lov_desc *desc = &lov->desc;
                 struct lov_tgt_desc *tgts = lov->tgts;
@@ -2131,10 +2090,7 @@ static int lov_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(rc);
         }
 
-        if (KEY_IS("growth_count")) {
-                if (vallen != sizeof(int))
-                        RETURN(-EINVAL);
-        } else if (KEY_IS("mds_conn")) {
+        if (KEY_IS("mds_conn")) {
                 if (vallen != sizeof(__u32))
                         RETURN(-EINVAL);
         } else if (KEY_IS("unlinked") || KEY_IS("unrecovery")) {
index b8ac8fe..64cfa27 100644 (file)
@@ -182,6 +182,3 @@ int qos_prep_create(struct lov_obd *lov, struct lov_request_set *set, int newea)
 out:
         RETURN(rc);
 }
-
-
-
index 9df75b6..de66527 100644 (file)
@@ -591,10 +591,10 @@ int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea)
 int lov_update_create_set(struct lov_request_set *set,
                           struct lov_request *req, int rc)
 {
+        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         struct obd_trans_info *oti = set->set_oti;
         struct lov_stripe_md *lsm = set->set_md;
         struct lov_oinfo *loi;
-        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;
         ENTRY;
 
         req->rq_stripe = set->set_success;
@@ -658,7 +658,7 @@ int lov_prep_create_set(struct obd_export *exp, struct lov_stripe_md **ea,
                 /* If the MDS file was truncated up to some size, stripe over
                  * enough OSTs to allow the file to be created at that size. */
                 if (src_oa->o_valid & OBD_MD_FLSIZE) {
-                        stripes=((src_oa->o_size+LUSTRE_STRIPE_MAXBYTES)>>12)-1;
+                        stripes = ((src_oa->o_size + LUSTRE_STRIPE_MAXBYTES) >> 12) - 1;
                         do_div(stripes, (__u32)(LUSTRE_STRIPE_MAXBYTES >> 12));
 
                         if (stripes > lov->desc.ld_active_tgt_count)
index 72ccf1c..f6ab6c1 100644 (file)
@@ -344,10 +344,19 @@ struct dentry *mds_id2dentry(struct obd_device *obd, struct lustre_id *id,
         if (inode->i_ino != id_ino(&mds->mds_rootid) && generation &&
             inode->i_generation != generation) {
                 /* we didn't find the right inode.. */
-                CERROR("bad inode %lu, link: %lu, ct: %d, generation %u/%u\n",
-                       inode->i_ino, (unsigned long)inode->i_nlink,
-                       atomic_read(&inode->i_count), inode->i_generation,
-                       generation);
+                if (id_group(id) != mds->mds_num) {
+                        CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
+                               "%u != %u, mds %u != %u, request to wrong MDS?\n",
+                               inode->i_ino, (unsigned long)inode->i_nlink,
+                               atomic_read(&inode->i_count), inode->i_generation,
+                               generation, mds->mds_num, (unsigned)id_group(id));
+                } else {
+                        CERROR("bad inode %lu found, link: %lu, ct: %d, generation "
+                               "%u != %u, inode is recreated while request handled?\n",
+                               inode->i_ino, (unsigned long)inode->i_nlink,
+                               atomic_read(&inode->i_count), inode->i_generation,
+                               generation);
+                }
                 dput(result);
                 RETURN(ERR_PTR(-ENOENT));
         }
@@ -868,16 +877,14 @@ int mds_get_md(struct obd_device *obd, struct inode *inode,
         RETURN(rc);
 }
 
-
 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
  * Call with lock=0 if the caller has already taken the i_sem. */
 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
                 struct mds_body *body, struct inode *inode, int lock, int mea)
 {
         struct mds_obd *mds = &obd->u.mds;
+        int rc, lmm_size;
         void *lmm;
-        int lmm_size;
-        int rc;
         ENTRY;
 
         lmm = lustre_msg_buf(msg, offset, 0);
@@ -902,11 +909,9 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
 
         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock, mea);
         if (rc > 0) {
-                if (S_ISDIR(inode->i_mode))
-                        body->valid |= OBD_MD_FLDIREA;
-                else
-                        body->valid |= OBD_MD_FLEASIZE;
-
+                body->valid |= S_ISDIR(inode->i_mode) ?
+                        OBD_MD_FLDIREA : OBD_MD_FLEASIZE;
+                
                 if (mea)
                         body->valid |= OBD_MD_MEA;
                 
@@ -916,6 +921,7 @@ int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
 
         RETURN(rc);
 }
+
 int mds_pack_link(struct dentry *dentry, struct ptlrpc_request *req,
                   struct mds_body *repbody, int reply_off)
 {
@@ -3264,6 +3270,7 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
         /* we have to know mdsnum before touching underlying fs -bzzz */
         atomic_set(&mds->mds_open_count, 0);
         sema_init(&mds->mds_md_sem, 1);
+        sema_init(&mds->mds_create_sem, 1);
         mds->mds_md_connected = 0;
         mds->mds_md_name = NULL;
 
@@ -3478,13 +3485,6 @@ int mds_postrecov_common(struct obd_device *obd)
         ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
         LASSERT(ctxt != NULL);
 
-        /* set nextid first, so we are sure it happens */
-        rc = mds_dt_set_nextid(obd);
-        if (rc) {
-                CERROR("%s: mds_dt_set_nextid() failed\n", obd->obd_name);
-                GOTO(out, rc);
-        }
-
         /* clean PENDING dir */
         rc = mds_cleanup_orphans(obd);
         if (rc < 0)
@@ -3493,8 +3493,8 @@ int mds_postrecov_common(struct obd_device *obd)
 
         group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
         valsize = sizeof(group);
-        rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"), "mds_conn",
-                          valsize, &group);
+        rc = obd_set_info(mds->mds_dt_exp, strlen("mds_conn"),
+                          "mds_conn", valsize, &group);
         if (rc)
                 GOTO(out, rc);
 
@@ -3507,7 +3507,7 @@ int mds_postrecov_common(struct obd_device *obd)
         }
 
         /* remove the orphaned precreated objects */
-        rc = mds_dt_clearorphans(mds, NULL /* all OSTs */);
+        rc = mds_dt_clear_orphans(mds, NULL /* all OSTs */);
         if (rc)
                 GOTO(err_llog, rc);
 
index 0e6dd0f..796b70a 100644 (file)
@@ -140,10 +140,8 @@ int mds_dt_set_info(struct obd_export *exp, obd_count keylen,
                      void *key, obd_count vallen, void *val);
 int mds_get_lovtgts(struct obd_device *, int tgt_count, struct obd_uuid *);
 int mds_dt_write_objids(struct obd_device *obd);
-void mds_dt_update_objids(struct obd_device *obd, obd_id *ids);
 int mds_dt_set_growth(struct mds_obd *mds, int count);
-int mds_dt_set_nextid(struct obd_device *obd);
-int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
+int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid);
 int mds_post_mds_lovconf(struct obd_device *obd);
 int mds_notify(struct obd_device *obd, struct obd_device *watched,
                int active, void *data);
@@ -152,10 +150,17 @@ int mds_convert_lov_ea(struct obd_device *obd, struct inode *inode,
                        struct lov_mds_md *lmm, int lmm_size);
 int mds_revalidate_lov_ea(struct obd_device *obd, struct inode *inode,
                           struct lustre_msg *msg, int offset);
+void mds_dt_update_objids(struct obd_device *obd, obd_id *ids);
+void mds_dt_save_objids(struct obd_device *obd, obd_id *ids);
 
 /* mds/mds_open.c */
-int mds_destroy_objects(struct obd_device *obd,
-                        struct inode *inode, int async);
+int
+mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
+                  int offset, struct mds_update_record *rec,
+                  struct dentry *dchild, void **handle,
+                  obd_id *ids);
+int mds_destroy_object(struct obd_device *obd,
+                       struct inode *inode, int async);
 int mds_query_write_access(struct inode *inode);
 int mds_open(struct mds_update_record *rec, int offset,
              struct ptlrpc_request *req, struct lustre_handle *);
index ce81bc0..a660e41 100644 (file)
@@ -781,10 +781,6 @@ static int filter_start_page_write(struct inode *inode,
         return 0;
 }
 
-struct dentry *filter_id2dentry(struct obd_device *obd,
-                                struct dentry *dir_dentry,
-                                obd_gr group, obd_id id);
-
 int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj,
                 int niocount, struct niobuf_remote *nb,
index 5d13d0f..ced694c 100644 (file)
@@ -55,6 +55,19 @@ void cpu_to_le_lov_desc (struct lov_desc *ld)
         ld->ld_pattern = cpu_to_le32 (ld->ld_pattern);
 }
 
+void mds_dt_save_objids(struct obd_device *obd, obd_id *ids)
+{
+        struct mds_obd *mds = &obd->u.mds;
+        int i;
+        ENTRY;
+
+        spin_lock(&mds->mds_dt_lock);
+        for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
+                ids[i] = mds->mds_dt_objids[i];
+        spin_unlock(&mds->mds_dt_lock);
+        EXIT;
+}
+
 void mds_dt_update_objids(struct obd_device *obd, obd_id *ids)
 {
         struct mds_obd *mds = &obd->u.mds;
@@ -63,8 +76,8 @@ void mds_dt_update_objids(struct obd_device *obd, obd_id *ids)
 
         spin_lock(&mds->mds_dt_lock);
         for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
-                if (ids[i] > (mds->mds_dt_objids)[i])
-                        (mds->mds_dt_objids)[i] = ids[i];
+                if (ids[i] > mds->mds_dt_objids[i])
+                        mds->mds_dt_objids[i] = ids[i];
         spin_unlock(&mds->mds_dt_lock);
         EXIT;
 }
@@ -72,14 +85,15 @@ void mds_dt_update_objids(struct obd_device *obd, obd_id *ids)
 static int mds_dt_read_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
-        obd_id *ids;
+        int i, rc, size;
         loff_t off = 0;
-        int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
+        obd_id *ids;
         ENTRY;
 
         if (mds->mds_dt_objids != NULL)
                 RETURN(0);
 
+        size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
         OBD_ALLOC(ids, size);
         if (ids == NULL)
                 RETURN(-ENOMEM);
@@ -87,17 +101,19 @@ static int mds_dt_read_objids(struct obd_device *obd)
 
         if (mds->mds_dt_objid_filp->f_dentry->d_inode->i_size == 0)
                 RETURN(0);
+        
         rc = fsfilt_read_record(obd, mds->mds_dt_objid_filp, ids, size, &off);
         if (rc < 0) {
-                CERROR("Error reading objids %d\n", rc);
+                CERROR("error reading objids %d\n", rc);
         } else {
                 mds->mds_dt_objids_valid = 1;
                 rc = 0;
         }
 
-        for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
-                CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n",
-                       mds->mds_dt_objids[i], i);
+        for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++) {
+                CDEBUG(D_INFO, "read last object "LPU64
+                       " for idx %d\n", mds->mds_dt_objids[i], i);
+        }
 
         RETURN(rc);
 }
@@ -105,25 +121,26 @@ static int mds_dt_read_objids(struct obd_device *obd)
 int mds_dt_write_objids(struct obd_device *obd)
 {
         struct mds_obd *mds = &obd->u.mds;
+        int i, rc, size;
         loff_t off = 0;
-        int i, rc, size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id);
         ENTRY;
 
         for (i = 0; i < mds->mds_dt_desc.ld_tgt_count; i++)
                 CDEBUG(D_INFO, "writing last object "LPU64" for idx %d\n",
                        mds->mds_dt_objids[i], i);
 
+        size = mds->mds_dt_desc.ld_tgt_count * sizeof(obd_id);
         rc = fsfilt_write_record(obd, mds->mds_dt_objid_filp,
                                  mds->mds_dt_objids, size, &off, 0);
         RETURN(rc);
 }
 
-int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
+int mds_dt_clear_orphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
 {
-        int rc;
+        struct lov_stripe_md *empty_ea = NULL;
+        struct obd_trans_info oti = { 0 };
         struct obdo *oa = NULL;
-        struct obd_trans_info oti = {0};
-        struct lov_stripe_md  *empty_ea = NULL;
+        int rc;
         ENTRY;
 
         LASSERT(mds->mds_dt_objids != NULL);
@@ -138,32 +155,27 @@ int mds_dt_clearorphans(struct mds_obd *mds, struct obd_uuid *ost_uuid)
                 RETURN(-ENOMEM);
         
         memset(oa, 0, sizeof(*oa));
+
         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
         oa->o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
         oa->o_flags = OBD_FL_DELORPHAN;
         
         if (ost_uuid != NULL) {
-                memcpy(&oa->o_inline, ost_uuid, sizeof(*ost_uuid));
+                memcpy(&oa->o_inline, ost_uuid,
+                       sizeof(*ost_uuid));
                 oa->o_valid |= OBD_MD_FLINLINE;
         }
-        rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &empty_ea, &oti);
-        obdo_free(oa);
-        RETURN(rc);
-}
 
-/* update the LOV-OSC knowledge of the last used object id's */
-int mds_dt_set_nextid(struct obd_device *obd)
-{
-        struct mds_obd *mds = &obd->u.mds;
-        int rc;
-        ENTRY;
-
-        LASSERT(!obd->obd_recovering);
-
-        LASSERT(mds->mds_dt_objids != NULL);
-
-        rc = obd_set_info(mds->mds_dt_exp, strlen("next_id"), "next_id",
-                          mds->mds_dt_desc.ld_tgt_count, mds->mds_dt_objids);
+        /* 
+         * pass the current objids so that the data layer knows the last objids
+         * the MDS knows about and can act appropriately. --umka
+         */
+        oti.oti_objid = mds->mds_dt_objids;
+        
+        rc = obd_create(mds->mds_dt_exp, oa,
+                        NULL, 0, &empty_ea, &oti);
+        
+        obdo_free(oa);
         RETURN(rc);
 }
 
@@ -226,8 +238,8 @@ static int mds_dt_update_desc(struct obd_device *obd, struct obd_export *lov)
 int mds_dt_connect(struct obd_device *obd, char *lov_name)
 {
         struct mds_obd *mds = &obd->u.mds;
-        struct lustre_handle conn = {0,};
-        int rc, i;
+        struct lustre_handle conn = { 0 };
+        int i, rc = 0;
         ENTRY;
 
         if (IS_ERR(mds->mds_dt_obd))
@@ -247,7 +259,8 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name)
         if (mds->mds_ost_sec) {
                 rc = obd_set_info(mds->mds_dt_obd->obd_self_export,
                                   strlen("sec"), "sec",
-                                  strlen(mds->mds_ost_sec), mds->mds_ost_sec);
+                                  strlen(mds->mds_ost_sec),
+                                  mds->mds_ost_sec);
                 if (rc) {
                         mds->mds_dt_obd = ERR_PTR(rc);
                         RETURN(rc);
@@ -290,10 +303,13 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name)
                 GOTO(err_reg, rc);
         }
 
-        /* If we're mounting this code for the first time on an existing FS,
-         * we need to populate the objids array from the real OST values */
+        /*
+         * If we're mounting this code for the first time on an existing FS, we
+         * need to populate the objids array from the real OST values.
+         */
         if (!mds->mds_dt_objids_valid) {
                 __u32 size = sizeof(obd_id) * mds->mds_dt_desc.ld_tgt_count;
+                
                 rc = obd_get_info(mds->mds_dt_exp, strlen("last_id"),
                                   "last_id", &size, mds->mds_dt_objids);
                 if (!rc) {
@@ -307,12 +323,12 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name)
                                        "writing objids file: %d\n", rc);
                 }
         }
-
-        /* I want to see a callback happen when the OBD moves to a
-         * "For General Use" state, and that's when we'll call
-         * set_nextid().  The class driver can help us here, because
-         * it can use the obd_recovering flag to determine when the
-         * the OBD is full available. */
+        /*
+         * I want to see a callback happen when the OBD moves to a "For General
+         * Use" state, and that's when we'll call set_nextid(). The class driver
+         * can help us here, because it can use the obd_recovering flag to
+         * determine when the OBD is fully available.
+         */
         if (!obd->obd_recovering) {
                 CDEBUG(D_OTHER, "call mds_postrecov_common()\n");
                 rc = mds_postrecov_common(obd);
@@ -325,8 +341,8 @@ err_reg:
         obd_register_observer(mds->mds_dt_obd, NULL);
 err_discon:
         obd_disconnect(mds->mds_dt_exp, 0);
-        mds->mds_dt_exp = NULL;
         mds->mds_dt_obd = ERR_PTR(rc);
+        mds->mds_dt_exp = NULL;
         return rc;
 }
 
@@ -665,9 +681,9 @@ int mds_dt_synchronize(void *data)
         CWARN("MDS %s: %s now active, resetting orphans\n",
               obd->obd_name, uuid->uuid);
 
-        rc = mds_dt_clearorphans(&obd->u.mds, uuid);
+        rc = mds_dt_clear_orphans(&obd->u.mds, uuid);
         if (rc != 0) {
-                CERROR("%s: failed at mds_dt_clearorphans(): %d\n", 
+                CERROR("%s: failed at mds_dt_clear_orphans(): %d\n", 
                        obd->obd_name, rc);
                 GOTO(cleanup, rc);
         }
index fac6e93..2f45a66 100644 (file)
@@ -247,35 +247,34 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_file_data *mfd;
         struct mds_body *body;
-        int error;
+        int rc = 0;
         ENTRY;
 
         mfd = mds_mfd_new();
         if (mfd == NULL) {
                 CERROR("mds: out of memory\n");
-                GOTO(cleanup_dentry, error = -ENOMEM);
+                GOTO(cleanup_dentry, rc = -ENOMEM);
         }
 
         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
 
         if (flags & FMODE_WRITE) {
                 /* FIXME: in recovery, need to pass old epoch here */
-                error = mds_get_write_access(mds, dentry->d_inode, 0);
-                if (error)
-                        GOTO(cleanup_mfd, error);
+                rc = mds_get_write_access(mds, dentry->d_inode, 0);
+                if (rc)
+                        GOTO(cleanup_mfd, rc);
 #ifdef IFILTERDATA_ACTUALLY_USED
                 body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch;
-#endif /*IFILTERDATA_ACTUALLY_USED*/
+#endif
         } else if (flags & FMODE_EXEC) {
-                error = mds_deny_write_access(mds, dentry->d_inode);
-                if (error)
-                        GOTO(cleanup_mfd, error);
+                rc = mds_deny_write_access(mds, dentry->d_inode);
+                if (rc)
+                        GOTO(cleanup_mfd, rc);
         }
 
         dget(dentry);
 
-        /* Mark the file as open to handle open-unlink. */
-
+        /* mark the file as open to handle open-unlink. */
         DOWN_WRITE_I_ALLOC_SEM(dentry->d_inode);
         mds_orphan_open_inc(dentry->d_inode);
         UP_WRITE_I_ALLOC_SEM(dentry->d_inode);
@@ -290,41 +289,50 @@ static struct mds_file_data *mds_dentry_open(struct dentry *dentry,
         mds_mfd_put(mfd);
 
         body->handle.cookie = mfd->mfd_handle.h_cookie;
-
         RETURN(mfd);
-
 cleanup_mfd:
         mds_mfd_put(mfd);
         mds_mfd_destroy(mfd);
 cleanup_dentry:
-        return ERR_PTR(error);
+        return ERR_PTR(rc);
 }
 
-static void mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
-                                struct lov_desc *desc)
+/* this is object id allocation callback */
+static int mds_obj_alloc(obd_id *objid)
+{
+        ENTRY;
+        LASSERT(objid != NULL);
+        RETURN(++(*objid));
+}
+
+static inline void
+mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
+                    struct lov_desc *desc)
 {
         int i;
+        
         for (i = 0; i < le32_to_cpu(lmm->lmm_stripe_count); i++) {
                 ids[le32_to_cpu(lmm->lmm_objects[i].l_ost_idx)] =
                         le64_to_cpu(lmm->lmm_objects[i].l_object_id);
         }
 }
 
-/* Must be called with i_sem held */
-static int mds_create_objects(struct ptlrpc_request *req, int offset,
-                              struct mds_update_record *rec,
-                              struct mds_obd *mds, struct obd_device *obd,
-                              struct dentry *dchild, void **handle, 
-                              obd_id **ids)
+/* must be called with i_sem held */
+int
+mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
+                  int offset, struct mds_update_record *rec,
+                  struct dentry *dchild, void **handle,
+                  obd_id *ids)
 {
-        struct obdo *oa = NULL;
+        struct inode *inode = dchild->d_inode;
+        struct mds_obd *mds = &obd->u.mds;
         struct obd_trans_info oti = { 0 };
-        struct mds_body *body;
         struct lov_stripe_md *lsm = NULL;
         struct lov_mds_md *lmm = NULL;
-        struct inode *inode = dchild->d_inode;
-        void *lmm_buf;
         int rc, lmm_bufsize, lmm_size;
+        struct obdo *oa = NULL;
+        struct mds_body *body;
+        void *lmm_buf;
         ENTRY;
 
         if (rec->ur_flags & MDS_OPEN_DELAY_CREATE ||
@@ -338,11 +346,8 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         if (body->valid & OBD_MD_FLEASIZE)
                 RETURN(0);
 
-        OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
-        if (*ids == NULL)
-                RETURN(-ENOMEM);
-        oti.oti_objid = *ids;
-
+        oti.oti_objid = ids;
+                
         /* replay case */
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
                 LASSERT(id_ino(rec->ur_id2));
@@ -352,19 +357,25 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                 LASSERT(lmm);
 
                 if (*handle == NULL)
-                        *handle = fsfilt_start(obd,inode,FSFILT_OP_CREATE,NULL);
+                        *handle = fsfilt_start(obd, inode, FSFILT_OP_CREATE, NULL);
                 if (IS_ERR(*handle)) {
                         rc = PTR_ERR(*handle);
                         *handle = NULL;
-                        GOTO(out_ids, rc);
+                        RETURN(rc);
                 }
 
-                mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc);
+                /* 
+                 * FIXME: this is an evil layering violation; everything related
+                 * to striping should be done by LOV.  --umka.
+                 */
+                mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc);
 
                 lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
                 lmm_bufsize = req->rq_repmsg->buflens[offset];
-                LASSERT(lmm_buf);
+                
+                LASSERT(lmm_buf != NULL);
                 LASSERT(lmm_bufsize >= lmm_size);
+
                 memcpy(lmm_buf, lmm, lmm_size);
                 rc = fsfilt_set_md(obd, inode, *handle, lmm,
                                    lmm_size, EA_LOV);
@@ -374,11 +385,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         }
 
         if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_ALLOC_OBDO))
-                GOTO(out_ids, rc = -ENOMEM);
+                RETURN(-ENOMEM);
 
         oa = obdo_alloc();
         if (oa == NULL)
-                GOTO(out_ids, rc = -ENOMEM);
+                RETURN(-ENOMEM);
         oa->o_mode = S_IFREG | 0600;
         oa->o_id = inode->i_ino;
         oa->o_gr = FILTER_GROUP_FIRST_MDS + mds->mds_num;
@@ -389,8 +400,8 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
         oa->o_size = 0;
 
-        obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|OBD_MD_FLMTIME|
-                        OBD_MD_FLCTIME);
+        obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                        OBD_MD_FLMTIME | OBD_MD_FLCTIME);
 
         if (!(rec->ur_flags & MDS_OPEN_HAS_OBJS)) {
                 /* check if things like lfs setstripe are sending us the ea */
@@ -415,15 +426,25 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
                         OBD_FREE(lmm, mds->mds_max_mdsize);
                         if (rc)
                                 GOTO(out_oa, rc);
-                } 
+                }
+
+                /* 
+                 * create with CROW flag and base ids for allocating new ids on
+                 * them.
+                 */
+                oti.oti_flags |= OBD_MODE_CROW;
+                oti.oti_obj_alloc = mds_obj_alloc;
+
                 LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
                 rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti);
+
                 if (rc) {
                         int level = D_ERROR;
                         if (rc == -ENOSPC)
                                 level = D_INODE;
-                        CDEBUG(level, "error creating objects for "
-                                      "inode %lu: rc = %d\n",
+                        CDEBUG((rc == -ENOSPC ? D_INODE : D_ERROR),
+                               "error creating objects for "
+                               "inode %lu: rc = %d\n",
                                inode->i_ino, rc);
                         if (rc > 0) {
                                 CERROR("obd_create returned invalid "
@@ -435,16 +456,17 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         } else {
                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, mds->mds_dt_exp,
                                    0, &lsm, rec->ur_eadata);
-                if (rc) {
+                if (rc)
                         GOTO(out_oa, rc);
-                }
+
                 lsm->lsm_object_id = oa->o_id;
                 lsm->lsm_object_gr = oa->o_gr;
         }
         if (inode->i_size) {
                 oa->o_size = inode->i_size;
-                obdo_from_inode(oa, inode, OBD_MD_FLTYPE|OBD_MD_FLATIME|
-                                OBD_MD_FLMTIME| OBD_MD_FLCTIME| OBD_MD_FLSIZE);
+                obdo_from_inode(oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME |
+                                OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
+                
                 rc = obd_setattr(mds->mds_dt_exp, oa, lsm, &oti);
                 if (rc) {
                         CERROR("error setting attrs for inode %lu: rc %d\n",
@@ -465,7 +487,11 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
         rc = obd_packmd(mds->mds_dt_exp, &lmm, lsm);
         if (!id_ino(rec->ur_id2))
                 obd_free_memmd(mds->mds_dt_exp, &lsm);
-        LASSERT(rc >= 0);
+        if (rc < 0) {
+                CERROR("cannot pack lsm, err = %d\n", rc);
+                GOTO(out_oa, rc);
+        }
+
         lmm_size = rc;
         body->eadatasize = rc;
 
@@ -487,22 +513,18 @@ static int mds_create_objects(struct ptlrpc_request *req, int offset,
 
         memcpy(lmm_buf, lmm, lmm_size);
         obd_free_diskmd(mds->mds_dt_exp, &lmm);
- out_oa:
+out_oa:
         oti_free_cookies(&oti);
         obdo_free(oa);
- out_ids:
-        if (rc) {
-                OBD_FREE(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
-                *ids = NULL;
-        }
-        if(lsm)
+
+        if (lsm)
                 obd_free_memmd(mds->mds_dt_exp, &lsm);
         RETURN(rc);
 }
 
 int
-mds_destroy_objects(struct obd_device *obd,
-                    struct inode *inode, int async)
+mds_destroy_object(struct obd_device *obd,
+                   struct inode *inode, int async)
 {
         struct mds_obd *mds = &obd->u.mds;
         struct lov_mds_md *lmm = NULL;
@@ -512,8 +534,8 @@ mds_destroy_objects(struct obd_device *obd,
         LASSERT(inode != NULL);
 
         if (inode->i_nlink != 0) {
-                CWARN("attempt to destroy OSS object when "
-                      "i_nlink == %d\n", (int)inode->i_nlink);
+                CDEBUG(D_INODE, "attempt to destroy OSS object when "
+                       "i_nlink == %d\n", (int)inode->i_nlink);
                 RETURN(0);
         }
         
@@ -696,10 +718,10 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                            struct mds_body *body, int flags, void **handle,
                            struct mds_update_record *rec, struct ldlm_reply *rep)
 {
-        struct mds_obd *mds = mds_req2mds(req);
         struct obd_device *obd = req->rq_export->exp_obd;
+        struct mds_obd *mds = mds_req2mds(req);
         struct mds_file_data *mfd = NULL;
-        obd_id *ids = NULL; /* object IDs created */
+        obd_id *ids = NULL;
         unsigned mode;
         int rc = 0;
         ENTRY;
@@ -707,6 +729,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
         /* atomically create objects if necessary */
         down(&dchild->d_inode->i_sem);
         mode = dchild->d_inode->i_mode;
+
         if ((S_ISREG(mode) && !(body->valid & OBD_MD_FLEASIZE)) || 
             (S_ISDIR(mode) && !(body->valid & OBD_MD_FLDIREA))) {
                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
@@ -716,6 +739,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         RETURN(rc);
                 }
         }
+
         if (rec != NULL) {
                 /* no EA: create objects */
                 if ((body->valid & OBD_MD_FLEASIZE) &&
@@ -723,16 +747,49 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         up(&dchild->d_inode->i_sem);
                         RETURN(-EEXIST);
                 }
+                
                 if (!(body->valid & OBD_MD_FLEASIZE)) {
-                        /* no EA: create objects */
-                        rc = mds_create_objects(req, 2, rec, mds, obd,
-                                                dchild, handle, &ids);
+                        int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
+                        
+                        OBD_ALLOC(ids, ids_size);
+                        if (ids == NULL) {
+                                up(&dchild->d_inode->i_sem);
+                                RETURN(-ENOMEM);
+                        }
+
+                        /* 
+                         * synchronize object creation so that other threads
+                         * do not take the same base objid values.
+                         */
+                        down(&mds->mds_create_sem);
+
+                        /* preparing base ids */
+                        mds_dt_save_objids(obd, ids);
+
+                        /* 
+                         * create objects, @ids will contain new allocated obj
+                         * ids.
+                         */
+                        rc = mds_create_object(obd, req, 2, rec,
+                                               dchild, handle, ids);
                         if (rc) {
-                                CERROR("mds_create_objects: rc = %d\n", rc);
+                                CERROR("mds_create_object: rc = %d\n", rc);
+                                up(&mds->mds_create_sem);
                                 up(&dchild->d_inode->i_sem);
+                                OBD_FREE(ids, ids_size);
                                 RETURN(rc);
                         }
+
+                        /*
+                         * update MDS objids with the new ones allocated in
+                         * mds_create_object().
+                         */
+                        mds_dt_update_objids(obd, ids);
+                        OBD_FREE(ids, ids_size);
+                        
+                        up(&mds->mds_create_sem);
                 }
+                
                 if (S_ISREG(dchild->d_inode->i_mode) &&
                     (body->valid & OBD_MD_FLEASIZE)) {
                         rc = mds_revalidate_lov_ea(obd, dchild->d_inode,
@@ -746,6 +803,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                         }
                 }
         }
+        
         rc = mds_pack_acl(obd, req->rq_repmsg, 3, body, dchild->d_inode);
         if (rc < 0) {
                 CERROR("mds_pack_acl: rc = %d\n", rc);
@@ -759,6 +817,7 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
                                 OBD_MD_FLATIME | OBD_MD_FLMTIME);
         }
+
         up(&dchild->d_inode->i_sem);
 
         intent_set_disposition(rep, DISP_OPEN_OPEN);
@@ -768,12 +827,6 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
 
         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
                mfd->mfd_handle.h_cookie);
-        if (ids != NULL) {
-                mds_dt_update_objids(obd, ids);
-                OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count);
-        }
-        //if (rc)
-        //        mds_mfd_destroy(mfd);
         RETURN(rc);
 }
 
@@ -949,8 +1002,8 @@ int mds_open(struct mds_update_record *rec, int offset,
 
                 LASSERT(id_ino(rec->ur_id2));
 
-                rc = mds_open_by_id(req, rec->ur_id2, body, rec->ur_flags,
-                                    rec, rep);
+                rc = mds_open_by_id(req, rec->ur_id2, body,
+                                    rec->ur_flags, rec, rep);
                 if (rc != -ENOENT) {
                         mds_body_do_reverse_map(med, body);
                         RETURN(rc);
@@ -1511,7 +1564,7 @@ int mds_mfd_close(struct ptlrpc_request *req, int offset,
                         reply_body->valid |= OBD_MD_FLCOOKIE;
                 }
                
-               rc = mds_destroy_objects(obd, inode, 1);
+               rc = mds_destroy_object(obd, inode, 1);
                if (rc) {
                        CERROR("cannot destroy OSS object on close, err %d\n",
                               rc);
index 88eef7b..4b76927 100644 (file)
@@ -499,35 +499,38 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
                 } else if (rec->ur_iattr.ia_valid & ATTR_EA_RM) {
                         rc = -EOPNOTSUPP;
                         if (inode->i_op && inode->i_op->removexattr) 
-                                rc = inode->i_op->removexattr(de,
-                                                    rec->ur_eadata);
-                } else if ((S_ISREG(inode->i_mode) ||
-                            S_ISDIR(inode->i_mode)) && rec->ur_eadata != NULL) {
+                                rc = inode->i_op->removexattr(de, rec->ur_eadata);
+                } else if (S_ISREG(inode->i_mode) || S_ISDIR(inode->i_mode)) {
                         struct lov_stripe_md *lsm = NULL;
                         struct lov_user_md *lum = NULL;
-                        
-                        rc = ll_permission(inode, MAY_WRITE, NULL);
-                        if (rc < 0)
-                                GOTO(cleanup, rc);
 
-                        lum = rec->ur_eadata;
-                        /* if lmm_stripe_size is -1 delete default stripe from dir */
-                        if (S_ISDIR(inode->i_mode) &&
-                            lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
-                                rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV);
-                                if (rc)
-                                        GOTO(cleanup, rc);
-                        } else {
-                                rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, mds->mds_dt_exp,
-                                                   0, &lsm, rec->ur_eadata);
-                                if (rc)
+                        if (rec->ur_eadata != NULL) {
+                                rc = ll_permission(inode, MAY_WRITE, NULL);
+                                if (rc < 0)
                                         GOTO(cleanup, rc);
+
+                                lum = rec->ur_eadata;
+                        
+                                /* if lmm_stripe_size is -1 delete default
+                                 * stripe from dir */
+                                if (S_ISDIR(inode->i_mode) &&
+                                    lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
+                                        rc = fsfilt_set_md(obd, inode, handle, NULL, 0, EA_LOV);
+                                        if (rc)
+                                                GOTO(cleanup, rc);
+                                } else {
+                                        rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
+                                                           mds->mds_dt_exp, 0,
+                                                           &lsm, rec->ur_eadata);
+                                        if (rc)
+                                                GOTO(cleanup, rc);
                                 
-                                obd_free_memmd(mds->mds_dt_exp, &lsm);
-                                rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
-                                                   rec->ur_eadatalen, EA_LOV);
-                                if (rc)
-                                        GOTO(cleanup, rc);
+                                        obd_free_memmd(mds->mds_dt_exp, &lsm);
+                                        rc = fsfilt_set_md(obd, inode, handle, rec->ur_eadata,
+                                                           rec->ur_eadatalen, EA_LOV);
+                                        if (rc)
+                                                GOTO(cleanup, rc);
+                                }
                         }
                 }    
         }
@@ -2249,7 +2252,7 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset,
                         body->valid |= OBD_MD_FLCOOKIE;
                 }
                 
-                rc = mds_destroy_objects(obd, child_inode, 1);
+                rc = mds_destroy_object(obd, child_inode, 1);
                 if (rc) {
                         CERROR("can't remove OST object, err %d\n",
                                rc);
@@ -3507,7 +3510,7 @@ static int mds_reint_rename(struct mds_update_record *rec, int offset,
                         body->valid |= OBD_MD_FLCOOKIE;
                 }
                 
-                rc = mds_destroy_objects(obd, old_inode, 1);
+                rc = mds_destroy_object(obd, old_inode, 1);
                 if (rc) {
                         CERROR("can't remove OST object, err %d\n",
                                rc);
index b1a171e..c480885 100644 (file)
@@ -83,8 +83,10 @@ mds_unlink_object(struct mds_obd *mds, struct inode *inode,
 
         CDEBUG(D_INODE, "destroy OSS object %d/%d\n",
                (int)oa->o_id, (int)oa->o_gr);
+
+        if (async)
+                oti.oti_flags |= OBD_MODE_ASYNC;
         
-        oti.oti_async = async;
         rc = obd_destroy(mds->mds_dt_exp, oa, lsm, &oti);
         obdo_free(oa);
 out_free_memmd:
index 1f9aa77..05c54f8 100644 (file)
@@ -1388,6 +1388,7 @@ static int filter_post_fs_cleanup(struct obd_device *obd)
 
         RETURN(rc);
 }
+
 #if 0
 static int filter_group_set_fs_flags(struct obd_device *obd, int group)
 {
@@ -1409,6 +1410,7 @@ static int filter_group_set_fs_flags(struct obd_device *obd, int group)
         RETURN(rc);
 }
 #endif
+
 static int filter_post_fs_setup(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
@@ -1686,7 +1688,7 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial,
         char str[PTL_NALFMT_SIZE];
         struct obd_llogs *llog;
         struct llog_ctxt *ctxt;
-        int rc;
+        int rc = 0;
         ENTRY;
 
         fed = &exp->exp_filter_data;
@@ -1701,8 +1703,10 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial,
         LASSERT(ctxt != NULL);
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        
         portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
                         exp->exp_connection->c_peer.peer_id.nid, str);
+        
         CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
                obd->obd_name, exp->exp_connection->c_peer.peer_id.nid,
                str, fed->fed_group);
@@ -2045,6 +2049,7 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
 {
         struct dentry *dchild = NULL;
         obd_gr group = 0;
+        ENTRY;
 
         if (oa->o_valid & OBD_MD_FLGROUP)
                 group = oa->o_gr;
@@ -2058,13 +2063,13 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
         }
 
         if (dchild->d_inode == NULL) {
-                CERROR("%s: %s on non-existent object: "LPU64"\n",
-                       obd->obd_name, what, oa->o_id);
+                CDEBUG(D_INFO, "%s: %s on non-existent object: "
+                       LPU64"\n", obd->obd_name, what, oa->o_id);
                 f_dput(dchild);
                 RETURN(ERR_PTR(-ENOENT));
         }
 
-        return dchild;
+        RETURN(dchild);
 }
 
 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
@@ -2094,79 +2099,106 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct obd_export *exp, struct obdo *oa,
-                          struct lov_stripe_md *md, struct obd_trans_info *oti)
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+                            struct obdo *oa, struct obd_trans_info *oti)
 {
-        struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
-        struct dentry *dentry;
         struct iattr iattr;
-        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
-        struct ldlm_resource *res;
         void *handle;
-        int rc, rc2;
+        int rc, err;
         ENTRY;
 
-        LASSERT(oti != NULL);
-
-        dentry = filter_oa2dentry(exp->exp_obd, oa);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
-
+        LASSERT(dentry != NULL);
+        LASSERT(!IS_ERR(dentry));
+        LASSERT(dentry->d_inode != NULL);
+        
         filter = &exp->exp_obd->u.filter;
-
         iattr_from_obdo(&iattr, oa, oa->o_valid);
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        lock_kernel();
-
         if (iattr.ia_valid & ATTR_SIZE)
                 down(&dentry->d_inode->i_sem);
-        handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
-                              oti);
+        handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+                              FSFILT_OP_SETATTR, oti);
         if (IS_ERR(handle))
                 GOTO(out_unlock, rc = PTR_ERR(handle));
 
         /* XXX this could be a rwsem instead, if filter_preprw played along */
         if (iattr.ia_valid & ATTR_ATTR_FLAG)
-                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
-                                      EXT3_IOC_SETFLAGS,
+                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode,
+                                      NULL, EXT3_IOC_SETFLAGS,
                                       (long)&iattr.ia_attr_flags);
         else
-                rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+                rc = fsfilt_setattr(exp->exp_obd, dentry, handle,
+                                    &iattr, 1);
+        
         rc = filter_finish_transno(exp, oti, rc);
-        rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode, 
-                            handle, exp->exp_sync);
-        if (rc2) {
-                CERROR("error on commit, err = %d\n", rc2);
+        
+        err = fsfilt_commit(exp->exp_obd, filter->fo_sb,
+                            dentry->d_inode, handle,
+                            exp->exp_sync);
+        if (err) {
+                CERROR("error on commit, err = %d\n", err);
                 if (!rc)
-                        rc = rc2;
+                        rc = err;
         }
+        EXIT;
+out_unlock:
+        if (iattr.ia_valid & ATTR_SIZE)
+                up(&dentry->d_inode->i_sem);
+        return rc;
+}
+
+/* this is called from filter_truncate() until we have filter_punch() */
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md *md, struct obd_trans_info *oti)
+{
+        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+        struct ldlm_valblock_ops *ns_lvbo;
+        struct lvfs_run_ctxt saved;
+        struct filter_obd *filter;
+        struct ldlm_resource *res;
+        struct dentry *dentry;
+        int rc;
+        ENTRY;
+
+        LASSERT(oti != NULL);
+
+        filter = &exp->exp_obd->u.filter;
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is allocated. */
+        dentry = filter_crow_object(exp->exp_obd,
+                                    oa->o_gr, oa->o_id);
+        if (IS_ERR(dentry))
+                GOTO(out_pop, rc = PTR_ERR(dentry));
+
+        lock_kernel();
+
+        /* set the object's attributes (including owner/group) */
+        rc = filter_setattr_internal(exp, dentry, oa, oti);
+        if (rc)
+                GOTO(out_unlock, rc);
 
         res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
                                 res_id, LDLM_EXTENT, 0);
+        
         if (res != NULL) {
-                if (res->lr_namespace->ns_lvbo &&
-                    res->lr_namespace->ns_lvbo->lvbo_update)
-                        rc = res->lr_namespace->ns_lvbo->lvbo_update(res, NULL,
-                                                                     0, 0);
+                ns_lvbo = res->lr_namespace->ns_lvbo;
+                if (ns_lvbo && ns_lvbo->lvbo_update)
+                        rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
                 ldlm_resource_putref(res);
-        } else if (iattr.ia_valid & ATTR_SIZE) {
-                /* called from MDS. */
         }
-
+        
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
+        EXIT;
 out_unlock:
-        if (iattr.ia_valid & ATTR_SIZE)
-                up(&dentry->d_inode->i_sem);
         unlock_kernel();
-        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
         f_dput(dentry);
-        RETURN(rc);
+out_pop:
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        return rc;
 }
 
 /* XXX identical to osc_unpackmd */
@@ -2219,111 +2251,6 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
-static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
-                                      struct filter_obd *filter)
-{
-        struct obdo *doa = NULL;
-        __u64 last, id;
-        ENTRY;
-        
-        LASSERT(oa);
-        LASSERT(oa->o_gr != 0);
-        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-
-       doa = obdo_alloc();
-        if (doa == NULL) {
-                CERROR("cannot allocate doa, error %d\n",
-                       -ENOMEM);
-                EXIT;
-                return;
-        }
-
-        doa->o_mode = S_IFREG;
-        doa->o_gr = oa->o_gr;
-        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-
-        set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
-        down(&filter->fo_create_locks[doa->o_gr]);
-        if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
-                CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
-                       exp->exp_obd->obd_name, doa->o_gr);
-                up(&filter->fo_create_locks[doa->o_gr]);
-                GOTO(out_free_doa, 0);
-        }
-
-        last = filter_last_id(filter, doa->o_gr);
-        CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "LPU64"\n",
-              exp->exp_obd->obd_name, doa->o_gr, oa->o_id + 1, last);
-        for (id = oa->o_id + 1; id <= last; id++) {
-                doa->o_id = id;
-                filter_destroy(exp, doa, NULL, NULL);
-        }
-
-        CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "LPU64"\n",
-               exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
-
-        filter_set_last_id(filter, doa->o_gr, oa->o_id);
-
-        clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
-        up(&filter->fo_create_locks[doa->o_gr]);
-
-        EXIT;
-out_free_doa:
-        obdo_free(doa);
-}
-
-/* returns a negative error or a nonnegative number of files to create */
-static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
-                                   obd_gr group)
-{
-        struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        int diff, rc;
-        ENTRY;
-
-        diff = oa->o_id - filter_last_id(filter, oa->o_gr);
-        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
-               filter_last_id(filter, oa->o_gr), diff);
-
-        /* delete orphans request */
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_DELORPHAN)) {
-                if (diff >= 0)
-                        RETURN(diff);
-                if (-diff > OST_MAX_PRECREATE) {
-                        CERROR("ignoring bogus orphan destroy request: obdid "
-                               LPU64" last_id "LPU64"\n",
-                               oa->o_id, filter_last_id(filter, oa->o_gr));
-                        RETURN(-EINVAL);
-                }
-                filter_destroy_precreated(exp, oa, filter);
-                rc = filter_update_last_objid(obd, group, 0);
-                if (rc)
-                        CERROR("unable to write lastobjid, but orphans"
-                               "were deleted\n");
-                RETURN(0);
-        } else {
-                /* only precreate if group == 0 and o_id is specfied */
-                if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
-                    (/*group != 0 ||*/ oa->o_id == 0))
-                        RETURN(1);
-
-                LASSERTF(diff >= 0, LPU64" - "LPU64" = %d\n", oa->o_id,
-                         filter_last_id(filter, oa->o_gr), diff);
-                RETURN(diff);
-        }
-}
-static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry, 
-                                int *number, struct obdo *oa)
-{
-        int rc;
-        ENTRY;
-
-        rc = fsfilt_precreate_rec(obd, dentry, number, oa);
-
-        RETURN(rc);
-}
-
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                          unsigned long max_age)
 {
@@ -2355,188 +2282,252 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(rc);
 }
 
-/* We rely on the fact that only one thread will be creating files in a given
- * group at a time, which is why we don't need an atomic filter_get_new_id.
- * Even if we had that atomic function, the following race would exist:
- *
- * thread 1: gets id x from filter_next_id
- * thread 2: gets id (x + 1) from filter_next_id
- * thread 2: creates object (x + 1)
- * thread 1: tries to create object x, gets -ENOSPC
- */
-static int filter_precreate(struct obd_device *obd, struct obdo *oa,
-                            obd_gr group, int *num)
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+                         obd_gr group)
 {
-        struct dentry *dchild = NULL, *dparent = NULL;
-        int err = 0, rc = 0, recreate_obj = 0, i;
+        struct dentry *dparent = NULL;
+        struct dentry *dchild = NULL;
         struct filter_obd *filter;
+        struct obd_statfs *osfs;
+        int cleanup_phase = 0;
+        int err = 0, rc = 0;
         void *handle = NULL;
         void *lock = NULL;
-        struct obd_statfs *osfs;
-        unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
-        __u64 next_id;
         ENTRY;
 
         filter = &obd->u.filter;
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                recreate_obj = 1;
-        } else {
-                OBD_ALLOC(osfs, sizeof(*osfs));
-                if (osfs == NULL)
-                        RETURN(-ENOMEM);
-                rc = filter_statfs(obd, osfs, jiffies-HZ);
-                if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
-                        CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
-                              osfs->os_bavail<<filter->fo_sb->s_blocksize_bits);
-                        *num = 0;
-                        rc = -ENOSPC;
-                }
-                OBD_FREE(osfs, sizeof(*osfs));
-                if (rc) {
-                        RETURN(rc);
-                }
+        OBD_ALLOC(osfs, sizeof(*osfs));
+        if (osfs == NULL)
+                RETURN(-ENOMEM);
+        rc = filter_statfs(obd, osfs, jiffies - HZ);
+        if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+                CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+                       osfs->os_bavail << filter->fo_sb->s_blocksize_bits);
+                rc = -ENOSPC;
         }
-
-        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
+        OBD_FREE(osfs, sizeof(*osfs));
+        if (rc)
+                RETURN(rc);
 
         down(&filter->fo_create_locks[group]);
 
-        for (i = 0; i < *num && err == 0; i++) {
-                int cleanup_phase = 0;
+        if (test_bit(group, &filter->fo_destroys_in_progress)) {
+                CWARN("%s: precreate aborted by destroy\n",
+                      obd->obd_name);
+                GOTO(out, rc = -EALREADY);
+        }
 
-                if (test_bit(group, &filter->fo_destroys_in_progress)) {
-                        CWARN("%s: precreate aborted by destroy\n",
-                              obd->obd_name);
-                        break;
-                }
+        CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id);
 
-                if (recreate_obj) {
-                        __u64 last_id;
-                        next_id = oa->o_id;
-                        last_id = filter_last_id(filter, group);
-                        if (next_id > last_id) {
-                                CERROR("Error: Trying to recreate obj greater"
-                                       "than last id "LPD64" > "LPD64"\n",
-                                       next_id, last_id);
-                                GOTO(cleanup, rc = -EINVAL);
-                        }
-                } else {
-                        next_id = filter_last_id(filter, group) + 1;
-                }
+        dparent = filter_parent_lock(obd, group, oa->o_id, &lock);
+        if (IS_ERR(dparent))
+                GOTO(cleanup, rc = PTR_ERR(dparent));
+        cleanup_phase = 1;
 
-                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
-
-                dparent = filter_parent_lock(obd, group, next_id, &lock);
-                if (IS_ERR(dparent))
-                        GOTO(cleanup, rc = PTR_ERR(dparent));
-                cleanup_phase = 1;
-
-                /* precreate objects are not logged */
-                fsfilt_set_fs_flags(obd, dparent->d_inode, SM_PRECREATE);
-
-                dchild = filter_id2dentry(obd, dparent, group, next_id);
-                if (IS_ERR(dchild))
-                        GOTO(cleanup, rc = PTR_ERR(dchild));
-                cleanup_phase = 2;
-
-                if (dchild->d_inode != NULL) {
-                        /* This would only happen if lastobjid was bad on disk*/
-                        /* Could also happen if recreating missing obj but
-                         * already exists
-                         */
-                        if (recreate_obj) {
-                                CERROR("%s: recreating existing object %.*s?\n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                        } else {
-                                CERROR("%s: Serious error: objid %.*s already "
-                                       "exists; is this filesystem corrupt?\n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                                LBUG();
-                        }
-                        GOTO(cleanup, rc = -EEXIST);
-                }
+        dchild = filter_id2dentry(obd, dparent, group, oa->o_id);
+        if (IS_ERR(dchild))
+                GOTO(cleanup, rc = PTR_ERR(dchild));
+        cleanup_phase = 2;
+
+        if (dchild->d_inode != NULL) {
+                CERROR("%s: serious error: objid %.*s already "
+                       "exists; is this filesystem corrupted?\n",
+                       obd->obd_name, dchild->d_name.len,
+                       dchild->d_name.name);
+                GOTO(cleanup, rc = -EEXIST);
+        }
+
+        handle = fsfilt_start_log(obd, dparent->d_inode,
+                                  FSFILT_OP_CREATE, NULL, 1);
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+        cleanup_phase = 3;
 
-                handle = fsfilt_start_log(obd, dparent->d_inode,
-                                          FSFILT_OP_CREATE, NULL, 1);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 3;
+        rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+        if (rc) {
+                CERROR("create failed rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+
+        if (oa->o_id > filter_last_id(filter, group)) {
+                /*
+                 * save the last created object id; it is needed in recovery
+                 * for deleting orphans.
+                 */
+                filter_set_last_id(filter, group, oa->o_id);
 
-                rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+                rc = filter_update_last_objid(obd, group, 0);
                 if (rc) {
-                        CERROR("create failed rc = %d\n", rc);
-                        GOTO(cleanup, rc);
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
+                        rc = 0;
                 }
-
-                if (!recreate_obj) {
-                        filter_set_last_id(filter, group, next_id);
-                        err = filter_update_last_objid(obd, group, 0);
-                        if (err)
-                                CERROR("unable to write lastobjid "
-                                       "but file created\n");
+        }
+cleanup:
+        switch(cleanup_phase) {
+        case 3:
+                err = fsfilt_commit(obd, filter->fo_sb,
+                                    dparent->d_inode, handle, 0);
+                if (err) {
+                        CERROR("error on commit, err = %d\n", err);
+                        if (!rc)
+                                rc = err;
                 }
-                fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+        case 2:
+                f_dput(dchild);
+        case 1:
+                filter_parent_unlock(dparent, lock);
+        case 0:
+                break;
+        }
+
+        if (rc)
+                GOTO(out, rc);
+
+out:
+        up(&filter->fo_create_locks[group]);
+        RETURN(rc);
+}
+
+struct dentry *filter_crow_object(struct obd_device *obd,
+                                  __u64 ogr, __u64 oid)
+{
+        struct dentry *dentry;
+        struct obdo *oa;
+        int rc = 0;
+        ENTRY;
+
+        /* check if object is already allocated */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                RETURN(dentry);
+
+        if (dentry->d_inode)
+                RETURN(dentry);
+
+        f_dput(dentry);
         
-        cleanup:
-                switch(cleanup_phase) {
-                case 3:
-                        err = fsfilt_commit(obd, filter->fo_sb,
-                                            dparent->d_inode, handle, 0);
-                        if (err) {
-                                CERROR("error on commit, err = %d\n", err);
-                                if (!rc)
-                                        rc = err;
-                        }
-                case 2:
-                        f_dput(dchild);
-                case 1:
-                        filter_parent_unlock(dparent, lock);
-                case 0:
-                        break;
-                }
+        /* allocate object as it does not exist */
+        oa = obdo_alloc();
+        if (oa == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
-                if (rc)
-                        break;
-                if (time_after(jiffies, enough_time)) {
-                        CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
-                               obd->obd_name, *num, i);
-                        break;
-                }
+        oa->o_id = oid;
+        oa->o_gr = ogr;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64
+               " does not exists - allocate now\n",
+               oid, ogr);
+
+        rc = filter_create_object(obd, oa, oa->o_gr);
+        if (rc) {
+                CERROR("cannot create OSS object "LPU64"/"LPU64
+                       ", err = %d\n", oa->o_id, oa->o_gr, rc);
+                GOTO(out_free_oa, dentry = ERR_PTR(rc));
         }
 
-        *num = i;
+        /* look up the just created object and return it to the caller */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                GOTO(out_free_oa, dentry);
+                
+        if (dentry->d_inode == NULL) {
+                f_dput(dentry);
+                dentry = ERR_PTR(-ENOENT);
+                CERROR("cannot find just created OSS object "
+                       LPU64"/"LPU64" err = %d\n", oid,
+                       ogr, (int)PTR_ERR(dentry));
+                GOTO(out_free_oa, dentry);
+        }
 
-        /* check if we have an error after ll_vfs_create(). It is possible that
-         * there will be say -ENOSPC and we will leak it. */
-        if (rc == 0)
-                rc = filter_precreate_rec(obd, dparent, num, oa);
+        EXIT;
+out_free_oa:
+        obdo_free(oa);
+        return dentry;
+}
 
-        up(&filter->fo_create_locks[group]);
+static int
+filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
+{
+        struct obd_device *obd = NULL;
+        struct filter_obd *filter;
+        struct obdo *doa = NULL;
+        int rc = 0, orphans;
+        __u64 last, id;
+        ENTRY;
+        
+        LASSERT(oa);
+        LASSERT(oa->o_gr != 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+        obd = exp->exp_obd;
+        filter = &obd->u.filter;
 
-        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
-               obd->obd_name, group, filter->fo_last_objids[group]);
+        last = filter_last_id(filter, oa->o_gr);
+        orphans = last - oa->o_id;
+        
+        if (orphans <= 0)
+                RETURN(0);
+                
+       doa = obdo_alloc();
+        if (doa == NULL)
+                RETURN(-ENOMEM);
 
-        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
-               obd->obd_name, i);
+        doa->o_gr = oa->o_gr;
+        doa->o_mode = S_IFREG;
+        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
 
-        RETURN(rc);
+        set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        down(&filter->fo_create_locks[doa->o_gr]);
+        if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
+                CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
+                       exp->exp_obd->obd_name, doa->o_gr);
+                up(&filter->fo_create_locks[doa->o_gr]);
+                GOTO(out_free_doa, 0);
+        }
+
+        CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "
+              LPU64"\n", exp->exp_obd->obd_name, doa->o_gr,
+              oa->o_id + 1, last);
+        
+        for (id = oa->o_id + 1; id <= last; id++) {
+                doa->o_id = id;
+                filter_destroy(exp, doa, NULL, NULL);
+        }
+
+        CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
+               LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
+
+        filter_set_last_id(filter, oa->o_gr, oa->o_id);
+        clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        up(&filter->fo_create_locks[oa->o_gr]);
+
+        EXIT;
+out_free_doa:
+        obdo_free(doa);
+        return rc;
 }
 
-static int filter_create(struct obd_export *exp, struct obdo *oa,
-                         void *acl, int acl_size,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/*
+ * for now this function is only needed as the entry point for deleting
+ * orphans on the OSS, as objects are created on first write attempt. --umka
+ */
+static int
+filter_create(struct obd_export *exp, struct obdo *oa, void *acl,
+              int acl_size, struct lov_stripe_md **ea,
+              struct obd_trans_info *oti)
 {
+        struct filter_export_data *fed;
         struct obd_device *obd = NULL;
-        struct filter_obd *filter;
+        int group = oa->o_gr, rc = 0;
         struct lvfs_run_ctxt saved;
-        struct lov_stripe_md *lsm = NULL;
-        struct filter_export_data *fed;
+        struct filter_obd *filter;
         char str[PTL_NALFMT_SIZE];
-        int group = oa->o_gr, rc = 0, diff, recreate_objs = 0;
         ENTRY;
 
         LASSERT(acl == NULL && acl_size == 0);
@@ -2549,19 +2540,14 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS))
-                recreate_objs = 1;
-
         obd = exp->exp_obd;
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
-        if (fed->fed_group != group && !recreate_objs &&
-            !(oa->o_valid & OBD_MD_REINT)) {
+        if (fed->fed_group != group) {
                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
                                 exp->exp_connection->c_peer.peer_id.nid, str);
-                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+                CERROR("!!! this export (nid "LPX64"/%s) used object group %d "
                        "earlier; now it's trying to use group %d!  This could "
                        "be a bug in the MDS.  Tell CFS.\n",
                        exp->exp_connection->c_peer.peer_id.nid, str,
@@ -2571,54 +2557,28 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
 
         CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
                group, oa->o_id);
-        if (ea != NULL) {
-                lsm = *ea;
-                if (lsm == NULL) {
-                        rc = obd_alloc_memmd(exp, &lsm);
-                        if (rc < 0)
-                                RETURN(rc);
-                }
-        }
 
         obd = exp->exp_obd;
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-        if (oa->o_valid & OBD_MD_REINT) {
-                int num = *((int*)oa->o_inline);  
-                rc = filter_precreate(obd, oa, oa->o_gr, &num);
-        } else if (recreate_objs) {
-                if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
-                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                               oa->o_id, filter_last_id(&obd->u.filter, group));
-                        rc = -EINVAL;
-                } else {
-                        diff = 1;
-                        rc = filter_precreate(obd, oa, group, &diff);
-                }
+        LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
+                (oa->o_flags == OBD_FL_DELORPHAN));
+                
+        rc = filter_clear_orphans(exp, oa);
+        if (rc) {
+                CERROR("cannot clear orphanes starting from "
+                       LPU64", err = %d\n", oa->o_id, rc);
         } else {
-                diff = filter_should_precreate(exp, oa, group);
-                if (diff > 0) {
-                        oa->o_id = filter_last_id(&obd->u.filter, group);
-                        rc = filter_precreate(obd, oa, group, &diff);
-                        oa->o_id = filter_last_id(&obd->u.filter, oa->o_gr);
-                        oa->o_valid = OBD_MD_FLID;
+                rc = filter_update_last_objid(obd, group, 0);
+                if (rc) {
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
                 }
         }
-
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc && ea != NULL && *ea != lsm) {
-                obd_free_memmd(exp, &lsm);
-        } else if (rc == 0 && ea != NULL) {
-                /* XXX LOV STACKING: the lsm that is passed to us from
-                 * LOV does not have valid lsm_oinfo data structs, so
-                 * don't go touching that.  This needs to be fixed in a
-                 * big way. */
-                lsm->lsm_object_id = oa->o_id;
-                lsm->lsm_object_gr = oa->o_gr;
-                *ea = lsm;
-        }
-
-        RETURN(rc);
+        
+        RETURN(0);
 }
 
 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
index 06b802a..95583ae 100644 (file)
@@ -111,6 +111,18 @@ int filter_update_last_objid(struct obd_device *, obd_gr, int force_sync);
 int filter_common_setup(struct obd_device *, obd_count len, void *buf,
                         char *option);
 
+struct dentry *filter_crow_object(struct obd_device *obd, __u64 ogr,
+                                  __u64 oid);
+
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+                            struct obdo *oa, struct obd_trans_info *oti);
+
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md *md, struct obd_trans_info *oti);
+
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+                         obd_gr group);
+
 /* filter_lvb.c */
 extern struct ldlm_valblock_ops filter_lvbo;
 
index 21d7464..1c9cd4d 100644 (file)
@@ -297,16 +297,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         if (rc)
                 GOTO(cleanup, rc);
 
-        dentry = filter_oa2dentry(obd, oa);
+        dentry = filter_id2dentry(obd, NULL, oa->o_gr, oa->o_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
 
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
-
         inode = dentry->d_inode; 
 
         fsfilt_check_slow(now, obd_timeout, "preprw_read setup");
@@ -318,13 +312,14 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                if (inode->i_size <= rnb->offset)
-                      /* If there's no more data, abort early.
-                      * lnb->page == NULL and lnb->rc == 0, so it's
-                      * easy to detect later. */
+                if ((inode && inode->i_size <= rnb->offset) || inode == NULL)
+                        /*
+                         * if there's no more data, abort early.  lnb->page ==
+                         * NULL and lnb->rc == 0, so it's easy to detect later.
+                         */
                         break;
-                else
-                        rc = filter_alloc_dio_page(obd, inode, lnb);
+                
+                rc = filter_alloc_dio_page(obd, inode, lnb);
                 if (rc) {
                         CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                              "page err %u@"LPU64" %u/%u %p: rc %d\n",
@@ -345,33 +340,37 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         fsfilt_check_slow(now, obd_timeout, "start_page_read");
 
-        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
-                              NULL, NULL, NULL);
-        if (rc)
-                GOTO(cleanup, rc);
-
-        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+        if (inode != NULL) {
+                rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf,
+                                      exp, NULL, NULL, NULL);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
 
+        lprocfs_counter_add(obd->obd_stats,
+                            LPROC_FILTER_READ_BYTES, tot_bytes);
         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
 
         EXIT;
-
 cleanup:
-        if (rc != 0) {
-                filter_free_dio_pages(objcount, obj, niocount, res);
-
-                if (dentry != NULL)
-                        f_dput(dentry);
-                else
-                        CERROR("NULL dentry in cleanup -- tell CFS\n");
+        if (rc) {
+                filter_free_dio_pages(objcount, obj,
+                                      niocount, res);
+                /*
+                 * in other cases (no errors) dentry is released in
+                 * filter_commitrw_read().
+                 */
+                f_dput(dentry);
         }
 
         if (iobuf != NULL)
                 filter_free_iobuf(iobuf);
 
         pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
         if (rc)
                 CERROR("io error %d\n", rc);
+        
         return rc;
 }
 
@@ -498,15 +497,17 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct niobuf_local *res,
                                struct obd_trans_info *oti)
 {
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        struct niobuf_local *lnb = res;
+        struct dentry *dentry = NULL;
+        unsigned long now = jiffies;
         struct lvfs_run_ctxt saved;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb = res;
         struct fsfilt_objinfo fso;
-        struct dentry *dentry = NULL;
-        void *iobuf; 
+        struct obd_device *obd;
         obd_size left;
-        unsigned long now = jiffies;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        void *iobuf; 
+        
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
@@ -518,26 +519,36 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(cleanup, rc);
         cleanup_phase = 1;
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        dentry = filter_id2dentry(exp->exp_obd, NULL, obj->ioo_gr,
-                                  obj->ioo_id);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is already allocated */
+        dentry = filter_crow_object(obd, obj->ioo_gr,
+                                    obj->ioo_id);
+
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
-        
+
         cleanup_phase = 2;
-        
-        if (dentry->d_inode == NULL) {
-                CERROR("trying to BRW to non-existent file "LPU64"\n",
-                       obj->ioo_id);
-                GOTO(cleanup, rc = -ENOENT);
-        }
 
+        /* 
+         * Set the attrs passed along with the write request (owner/group). We
+         * do it here because the object should not exist with a wrong
+         * owner/group, as this may break quotas. --umka
+         */
+        rc = filter_setattr_internal(exp, dentry, oa, NULL);
+        if (rc) {
+                CERROR("cannot set attrs on write, err %d\n",
+                       rc);
+                GOTO(cleanup, rc);
+        }
+        
         fso.fso_dentry = dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
         fsfilt_check_slow(now, obd_timeout, "preprw_write setup");
 
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        spin_lock(&obd->obd_osfs_lock);
         if (oa)
                 filter_grant_incoming(exp, oa);
         
@@ -554,7 +565,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
          * o_valid here. */
         oa->o_valid = 0;
 
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+        spin_unlock(&obd->obd_osfs_lock);
 
         if (rc) 
                 GOTO(cleanup, rc);
@@ -569,7 +580,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
+                rc = filter_alloc_dio_page(obd, dentry->d_inode,lnb);
                 if (rc) {
                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
                                lnb->len, lnb->offset,
@@ -586,8 +597,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                  * asked to read unmapped blocks -- brw_kiovec() does this. */
                 if (lnb->len != PAGE_SIZE) {
                         if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
-                                filter_iobuf_add_page(exp->exp_obd, iobuf,
-                                                      dentry->d_inode,
+                                filter_iobuf_add_page(obd, iobuf, dentry->d_inode,
                                                       lnb->page);
                         } else {
                                 memset(kmap(lnb->page) + lnb->len, 0,
@@ -604,7 +614,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         
         fsfilt_check_slow(now, obd_timeout, "start_page_write");
 
-        lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
         EXIT;
 cleanup:
@@ -613,18 +623,18 @@ cleanup:
                 if (rc)
                         filter_free_dio_pages(objcount, obj, niocount, res);
         case 3:
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
         case 2:
-                if (rc)
+                if (rc && dentry && !IS_ERR(dentry))
                         f_dput(dentry);
                 break;
         case 1:
-                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                spin_lock(&obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
-                spin_unlock(&exp->exp_obd->obd_osfs_lock);
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                spin_unlock(&obd->obd_osfs_lock);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 filter_free_iobuf(iobuf);
                 break;
         default:;
index c54b1d7..500ff07 100644 (file)
 /* Called with res->lr_lvb_sem held */
 static int filter_lvbo_init(struct ldlm_resource *res)
 {
-        int rc = 0;
-        struct obdo *oa = NULL;
         struct ost_lvb *lvb = NULL;
+        struct filter_obd *filter;
         struct obd_device *obd;
         struct dentry *dentry;
+        __u64 ogr, oid;
+        int rc = 0;
         ENTRY;
 
         LASSERT(res);
@@ -65,38 +66,34 @@ static int filter_lvbo_init(struct ldlm_resource *res)
         res->lr_lvb_len = sizeof(*lvb);
 
         obd = res->lr_namespace->ns_lvbp;
+        filter = &obd->u.filter;
         LASSERT(obd != NULL);
 
-        oa = obdo_alloc();
-        if (oa == NULL)
-                GOTO(out, rc = -ENOMEM);
-
-        oa->o_id = res->lr_name.name[0];
-        oa->o_gr = res->lr_name.name[2];
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+        oid = res->lr_name.name[0];
+        ogr = res->lr_name.name[2];
 
-        dentry = filter_oa2dentry(obd, oa);
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
         if (IS_ERR(dentry))
                 GOTO(out, rc = PTR_ERR(dentry));
 
-        /* Limit the valid bits in the return data to what we actually use */
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
-        obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
-        f_dput(dentry);
-
-        lvb->lvb_size = dentry->d_inode->i_size;
-        lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
-        lvb->lvb_blocks = dentry->d_inode->i_blocks;
+        if (dentry->d_inode == NULL) {
+                lvb->lvb_size = 0;
+                lvb->lvb_blocks = 0;
+                lvb->lvb_mtime = LTIME_S(CURRENT_TIME);
+        } else {
+                lvb->lvb_size = dentry->d_inode->i_size;
+                lvb->lvb_blocks = dentry->d_inode->i_blocks;
+                lvb->lvb_mtime = LTIME_S(dentry->d_inode->i_mtime);
+        }
 
         CDEBUG(D_DLMTRACE, "res: "LPU64" initial lvb size: "LPU64", "
-               "mtime: "LPU64", blocks: "LPU64"\n",
-               res->lr_name.name[0], lvb->lvb_size,
-               lvb->lvb_mtime, lvb->lvb_blocks);
+               "mtime: "LPU64", blocks: "LPU64"\n", res->lr_name.name[0],
+               lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_blocks);
 
- out:
-        if (oa)
-                obdo_free(oa);
-        /* Don't free lvb data on lookup error */
+        f_dput(dentry);
+        EXIT;
+out:
+        /* don't free lvb data on lookup error */
         return rc;
 }
 
@@ -110,11 +107,11 @@ static int filter_lvbo_init(struct ldlm_resource *res)
 static int filter_lvbo_update(struct ldlm_resource *res, struct lustre_msg *m,
                               int buf_idx, int increase)
 {
-        int rc = 0;
-        struct obdo *oa = NULL;
         struct ost_lvb *lvb = res->lr_lvb_data;
         struct obd_device *obd;
+        struct obdo *oa = NULL;
         struct dentry *dentry;
+        int rc = 0;
         ENTRY;
 
         LASSERT(res);
index 4f0035d..cda20af 100644 (file)
@@ -166,63 +166,6 @@ int osc_rd_cur_grant_bytes(char *page, char **start, off_t off, int count,
         return rc;
 }
 
-int osc_rd_create_count(char *page, char **start, off_t off, int count,
-                        int *eof, void *data)
-{
-        struct obd_device *obd = data;
-
-        if (obd == NULL)
-                return 0;
-
-        return snprintf(page, count, "%d\n",
-                        obd->u.cli.cl_oscc.oscc_grow_count);
-}
-
-int osc_wr_create_count(struct file *file, const char *buffer,
-                        unsigned long count, void *data)
-{
-        struct obd_device *obd = data;
-        int val, rc;
-
-        if (obd == NULL)
-                return 0;
-
-        rc = lprocfs_write_helper(buffer, count, &val);
-        if (rc)
-                return rc;
-
-        if (val < 0)
-                return -ERANGE;
-
-        obd->u.cli.cl_oscc.oscc_grow_count = val;
-
-        return count;
-}
-
-int osc_rd_prealloc_next_id(char *page, char **start, off_t off, int count,
-                            int *eof, void *data)
-{
-        struct obd_device *obd = data;
-
-        if (obd == NULL)
-                return 0;
-
-        return snprintf(page, count, LPU64"\n",
-                        obd->u.cli.cl_oscc.oscc_next_id);
-}
-
-int osc_rd_prealloc_last_id(char *page, char **start, off_t off, int count,
-                            int *eof, void *data)
-{
-        struct obd_device *obd = data;
-
-        if (obd == NULL)
-                return 0;
-
-        return snprintf(page, count, LPU64"\n",
-                        obd->u.cli.cl_oscc.oscc_last_id);
-}
-
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",            lprocfs_rd_uuid,        0, 0 },
         { "blocksize",       lprocfs_rd_blksize,     0, 0 },
@@ -241,9 +184,6 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "max_dirty_mb", osc_rd_max_dirty_mb, osc_wr_max_dirty_mb, 0 },
         { "cur_dirty_bytes", osc_rd_cur_dirty_bytes, 0, 0 },
         { "cur_grant_bytes", osc_rd_cur_grant_bytes, 0, 0 },
-        { "create_count", osc_rd_create_count, osc_wr_create_count, 0 },
-        { "prealloc_next_id", osc_rd_prealloc_next_id, 0, 0 },
-        { "prealloc_last_id", osc_rd_prealloc_last_id, 0, 0 },
         { 0 }
 };
 
index a80cd1a..722cfc7 100644 (file)
 #include <linux/obd_class.h>
 #include "osc_internal.h"
 
-static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
-{
-        struct osc_creator *oscc;
-        struct ost_body *body = NULL;
-        ENTRY;
-
-        if (req->rq_repmsg) {
-                body = lustre_swab_repbuf(req, 0, sizeof(*body),
-                                          lustre_swab_ost_body);
-                if (body == NULL && rc == 0)
-                        rc = -EPROTO;
-        }
-
-        oscc = req->rq_async_args.pointer_arg[0];
-        spin_lock(&oscc->oscc_lock);
-        oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
-        if (rc == -ENOSPC || rc == -EROFS) {
-                oscc->oscc_flags |= OSCC_FLAG_NOSPC;
-                if (body && rc == -ENOSPC) {
-                        oscc->oscc_grow_count = OST_MIN_PRECREATE;
-                        oscc->oscc_last_id = body->oa.o_id;
-                }
-                spin_unlock(&oscc->oscc_lock);
-                DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
-        } else if (rc != 0 && rc != -EIO) {
-                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
-                oscc->oscc_grow_count = OST_MIN_PRECREATE;
-                spin_unlock(&oscc->oscc_lock);
-                DEBUG_REQ(D_ERROR, req,
-                          "unknown rc %d from async create: failing oscc", rc);
-                ptlrpc_fail_import(req->rq_import, req->rq_import_generation);
-        } else {
-                if (rc == 0) {
-                        oscc->oscc_flags &= ~OSCC_FLAG_LOW;
-                        if (body) {
-                                int diff = body->oa.o_id - oscc->oscc_last_id;
-                                if (diff != oscc->oscc_grow_count)
-                                        oscc->oscc_grow_count =
-                                                max(diff/3, OST_MIN_PRECREATE);
-                                oscc->oscc_last_id = body->oa.o_id;
-                        }
-                }
-                spin_unlock(&oscc->oscc_lock);
-        }
-
-        CDEBUG(D_HA, "preallocated through id "LPU64" (last used "LPU64")\n",
-               oscc->oscc_last_id, oscc->oscc_next_id);
-
-        wake_up(&oscc->oscc_waitq);
-        RETURN(rc);
-}
-
-static int oscc_internal_create(struct osc_creator *oscc)
-{
-        struct ptlrpc_request *request;
-        struct ost_body *body;
-        int size = sizeof(*body);
-        ENTRY;
-
-        spin_lock(&oscc->oscc_lock);
-        if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
-            !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
-            (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
-                   (oscc->oscc_grow_count / 4 + 1)) {
-                oscc->oscc_flags |= OSCC_FLAG_LOW;
-                oscc->oscc_grow_count *= 2;
-        }
-
-        if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
-                oscc->oscc_grow_count = OST_MAX_PRECREATE / 2;
-
-        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
-            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(0);
-        }
-        oscc->oscc_flags |= OSCC_FLAG_CREATING;
-        spin_unlock(&oscc->oscc_lock);
-
-        request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
-                                  LUSTRE_OBD_VERSION, OST_CREATE,
-                                  1, &size, NULL);
-        if (request == NULL) {
-                spin_lock(&oscc->oscc_lock);
-                oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(-ENOMEM);
-        }
-
-        request->rq_request_portal = OST_CREATE_PORTAL; //XXX FIXME bug 249
-        body = lustre_msg_buf(request->rq_reqmsg, 0, sizeof(*body));
-
-        spin_lock(&oscc->oscc_lock);
-        body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
-        /* probably we should take frequence of request into account? -bzzz */
-        if (oscc->oscc_grow_count < oscc->oscc_max_grow_count) {
-                oscc->oscc_grow_count *= 2;
-                if (oscc->oscc_grow_count > oscc->oscc_max_grow_count)
-                        oscc->oscc_grow_count = oscc->oscc_max_grow_count;
-        }
-        body->oa.o_gr = oscc->oscc_gr;
-        LASSERT(body->oa.o_gr > 0);
-        body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
-        spin_unlock(&oscc->oscc_lock);
-        CDEBUG(D_INFO, "preallocating through id "LPU64" (last used "LPU64")\n",
-               body->oa.o_id, oscc->oscc_next_id);
-
-        request->rq_replen = lustre_msg_size(1, &size);
-
-        request->rq_async_args.pointer_arg[0] = oscc;
-        request->rq_interpret_reply = osc_interpret_create;
-        ptlrpcd_add_req(request);
-
-        RETURN(0);
-}
-
-static int oscc_has_objects(struct osc_creator *oscc, int count)
-{
-        int have_objs;
-        spin_lock(&oscc->oscc_lock);
-        have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
-        spin_unlock(&oscc->oscc_lock);
-
-        if (!have_objs)
-                oscc_internal_create(oscc);
-
-        return have_objs;
-}
-
-static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
-{
-        int have_objs;
-        int ost_full;
-        int osc_invalid;
-
-        have_objs = oscc_has_objects(oscc, count);
-
-        spin_lock(&oscc->oscc_lock);
-        ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
-        spin_unlock(&oscc->oscc_lock);
-
-        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
-
-        return have_objs || ost_full || osc_invalid;
-}
-
-static int oscc_precreate(struct osc_creator *oscc, int wait)
-{
-        struct l_wait_info lwi = { 0 };
-        int rc = 0;
-        ENTRY;
-
-        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
-                RETURN(0);
-
-        if (!wait)
-                RETURN(0);
-
-        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
-        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
-
-        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
-                rc = -ENOSPC;
-
-        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
-                rc = -EIO;
-
-        RETURN(rc);
-}
-
-int oscc_recovering(struct osc_creator *oscc)
-{
-        int recov = 0;
-
-        spin_lock(&oscc->oscc_lock);
-        recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
-        spin_unlock(&oscc->oscc_lock);
-
-        return recov;
-}
-
+/* this is now used only for deleting orphans */
 int osc_create(struct obd_export *exp, struct obdo *oa,
-               void *acl, int acl_size,
-               struct lov_stripe_md **ea, struct obd_trans_info *oti)
+               void *acl, int acl_size, struct lov_stripe_md **ea,
+               struct obd_trans_info *oti)
 {
-        struct lov_stripe_md *lsm;
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
-        int try_again = 1, rc = 0;
+        int rc = 0;
         ENTRY;
+
         LASSERT(oa);
         LASSERT(ea);
-        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
         LASSERT(oa->o_gr > 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
         LASSERT(acl == NULL && acl_size == 0);
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            oa->o_flags == OBD_FL_RECREATE_OBJS) {
-                /* Exceptional case where we are trying to repair missing
-                 * objects for various groups.  We have already validated that
-                 * this is a valid group for the file.  Don't set oscc->oscc_gr.
-                 */
-                RETURN(osc_real_create(exp, oa, ea, oti));
-        }
-
-        LASSERT(oscc->oscc_gr == 0 || oscc->oscc_gr == oa->o_gr);
-        oscc->oscc_gr = oa->o_gr;
-
         if (oa->o_gr == FILTER_GROUP_LLOG || oa->o_gr == FILTER_GROUP_ECHO)
                 RETURN(osc_real_create(exp, oa, ea, oti));
 
@@ -284,10 +92,6 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                        exp->exp_obd->obd_name);
                 LASSERT(oscc->oscc_flags & OSCC_FLAG_RECOVERING);
 
-                /* delete from next_id on up */
-                oa->o_valid |= OBD_MD_FLID;
-                oa->o_id = oscc->oscc_next_id - 1;
-
                 CDEBUG(D_HA, "%s: deleting to next_id: "LPU64"\n",
                        oscc->oscc_obd->obd_name, oa->o_id);
 
@@ -303,83 +107,18 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         if (rc == -ENOSPC)
                                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
                         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-                        oscc->oscc_last_id = oa->o_id;
-                        
                         CDEBUG(D_HA, "%s: oscc recovery finished: %d\n",
                                oscc->oscc_obd->obd_name, rc);
-                        wake_up(&oscc->oscc_waitq);
-                        
                 } else {
                         CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
                                oscc->oscc_obd->obd_name, rc);
                 }
                 spin_unlock(&oscc->oscc_lock);
-
                 RETURN(rc);
         }
 
-        lsm = *ea;
-        if (lsm == NULL) {
-                rc = obd_alloc_memmd(exp, &lsm);
-                if (rc < 0)
-                        RETURN(rc);
-        }
-
-        while (try_again) {
-                /* If orphans are being recovered, then we must wait until 
-                   it is finished before we can continue with create. */
-                if (oscc_recovering(oscc)) {
-                        struct l_wait_info lwi;
-                       
-                        CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n",
-                               oscc);
-                        lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL);
-                        rc = l_wait_event(oscc->oscc_waitq,
-                                          !oscc_recovering(oscc), &lwi);
-                        
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        if (rc == -ETIMEDOUT) {
-                                CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
-                                       oscc);
-                                RETURN(rc);
-                        }
-                        CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",
-                               exp->exp_obd->obd_name);
-                }
-                
-                spin_lock(&oscc->oscc_lock);
-                if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
-                        spin_unlock(&oscc->oscc_lock);
-                        break;
-                }
-
-                if (oscc->oscc_last_id >= oscc->oscc_next_id) {
-                        memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
-                        oa->o_id = oscc->oscc_next_id;
-                        oa->o_gr = oscc->oscc_gr;
-                        lsm->lsm_object_id = oscc->oscc_next_id;
-                        lsm->lsm_object_gr = oscc->oscc_gr;
-                        *ea = lsm;
-                        oscc->oscc_next_id++;
-                        try_again = 0;
-                } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
-                        rc = -ENOSPC;
-                        spin_unlock(&oscc->oscc_lock);
-                        break;
-                }
-                spin_unlock(&oscc->oscc_lock);
-                rc = oscc_precreate(oscc, try_again);
-                if (rc)
-                        break;
-        }
-
-        if (rc == 0)
-                CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
-                       oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
-                       lsm->lsm_object_id);
-        else if (*ea == NULL)
-                obd_free_memmd(exp, &lsm);
-        RETURN(rc);
+        LBUG();
+        RETURN(0);
 }
 
 void oscc_init(struct obd_device *obd)
@@ -390,19 +129,9 @@ void oscc_init(struct obd_device *obd)
                 return;
 
         oscc = &obd->u.cli.cl_oscc;
-
         memset(oscc, 0, sizeof(*oscc));
-        INIT_LIST_HEAD(&oscc->oscc_list);
-        init_waitqueue_head(&oscc->oscc_waitq);
-        spin_lock_init(&oscc->oscc_lock);
-        oscc->oscc_obd = obd;
-        oscc->oscc_kick_barrier = 100;
-        oscc->oscc_max_grow_count = 2000;
-        oscc->oscc_grow_count = OST_MIN_PRECREATE;
 
-        oscc->oscc_next_id = 2;
-        oscc->oscc_last_id = 1;
+        oscc->oscc_obd = obd;
+        spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
-        /* XXX the export handle should give the oscc the last object */
-        /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
index a2deb85..bd0924c 100644 (file)
@@ -268,17 +268,21 @@ static int osc_setattr(struct obd_export *exp, struct obdo *oa,
 
         request->rq_replen = lustre_msg_size(1, &size);
 
-        rc = ptlrpc_queue_wait(request);
-        if (rc)
-                GOTO(out, rc);
-
-        body = lustre_swab_repbuf(request, 0, sizeof(*body),
-                                  lustre_swab_ost_body);
-        if (body == NULL)
-                GOTO(out, rc = -EPROTO);
+        if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
+                ptlrpcd_add_req(request);
+                rc = 0;
+        } else {
+                rc = ptlrpc_queue_wait(request);
+                if (rc)
+                        GOTO(out, rc);
 
-        memcpy(oa, &body->oa, sizeof(*oa));
+                body = lustre_swab_repbuf(request, 0, sizeof(*body),
+                                          lustre_swab_ost_body);
+                if (body == NULL)
+                        GOTO(out, rc = -EPROTO);
 
+                memcpy(oa, &body->oa, sizeof(*oa));
+        }
         EXIT;
 out:
         ptlrpc_req_finished(request);
@@ -417,7 +421,8 @@ static int osc_punch(struct obd_export *exp, struct obdo *oa,
 }
 
 static int osc_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *md, obd_size start, obd_size end)
+                    struct lov_stripe_md *md, obd_size start,
+                    obd_size end)
 {
         struct ptlrpc_request *request;
         struct ost_body *body;
@@ -492,8 +497,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         memcpy(&body->oa, oa, sizeof(*oa));
         request->rq_replen = lustre_msg_size(1, &size);
 
-        if (oti != NULL && oti->oti_async) {
-                /* asynchrounous destroy */
+        if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
                 ptlrpcd_add_req(request);
                 rc = 0;
         } else {
@@ -1264,7 +1268,6 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
 
         osc_wake_cache_waiters(cli);
         osc_check_rpcs(cli);
-
         spin_unlock(&cli->cl_loi_list_lock);
 
         obdo_free(aa->aa_oa);
@@ -2867,26 +2870,6 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
         int rc = 0;
         ENTRY;
 
-        if (keylen == strlen("next_id") &&
-            memcmp(key, "next_id", strlen("next_id")) == 0) {
-                if (vallen != sizeof(obd_id))
-                        RETURN(-EINVAL);
-               obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
-                CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
-                       exp->exp_obd->obd_name,
-                       obd->u.cli.cl_oscc.oscc_next_id);
-
-                RETURN(0);
-        }
-
-        if (keylen == strlen("growth_count") &&
-            memcmp(key, "growth_count", strlen("growth_count")) == 0) {
-                if (vallen != sizeof(int))
-                        RETURN(-EINVAL);
-               obd->u.cli.cl_oscc.oscc_max_grow_count = *((int*)val);
-                RETURN(0);
-        }
-
         if (keylen == strlen("unlinked") &&
             memcmp(key, "unlinked", keylen) == 0) {
                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
@@ -2915,7 +2898,8 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(0);
         }
 
-        if (keylen == strlen("async") && memcmp(key, "async", keylen) == 0) {
+        if (keylen == strlen("async") &&
+            memcmp(key, "async", keylen) == 0) {
                 struct client_obd *cl = &obd->u.cli;
                 if (vallen != sizeof(int))
                         RETURN(-EINVAL);
@@ -2951,16 +2935,18 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
                 RETURN(-EINVAL);
         }
 
-        if (keylen < strlen("mds_conn") || memcmp(key, "mds_conn", keylen) != 0)
+        if (keylen < strlen("mds_conn") ||
+            memcmp(key, "mds_conn", keylen) != 0)
                 RETURN(-EINVAL);
 
-        ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
+        ctxt = llog_get_context(&exp->exp_obd->obd_llogs,
+                                LLOG_UNLINK_ORIG_CTXT);
         if (ctxt) {
                 if (rc == 0)
                         rc = llog_initiator_connect(ctxt);
                 else
-                        CERROR("cannot establish the connect for ctxt %p: %d\n",
-                               ctxt, rc);
+                        CERROR("cannot establish the connect for "
+                               "ctxt %p: %d\n", ctxt, rc);
         }
 
         imp->imp_server_timeout = 1;
@@ -2976,6 +2962,7 @@ static struct llog_operations osc_size_repl_logops = {
 };
 
 static struct llog_operations osc_unlink_orig_logops;
+
 static int osc_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                          struct obd_device *tgt, int count,
                          struct llog_catid *catid)
@@ -3013,7 +3000,6 @@ static int osc_llog_finish(struct obd_device *obd,
         RETURN(rc);
 }
 
-
 static int osc_connect(struct lustre_handle *exph,
                        struct obd_device *obd, struct obd_uuid *cluuid,
                        struct obd_connect_data *data,
index a8c7254..be036c8 100644 (file)
@@ -565,10 +565,6 @@ static int fsfilt_smfs_set_fs_flags(struct inode *inode, int flags)
 
         if (flags & SM_ALL_PLG) /* enable all plugins */
                 SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-        if (flags & SM_PRECREATE) /* disable logs for precreated objs */
-                SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-
-
 #if 0
         if (SMFS_DO_COW(S2SMI(inode->i_sb)) && (flags & SM_DO_COW))
                 SMFS_SET_INODE_COW(inode);
@@ -588,9 +584,6 @@ static int fsfilt_smfs_clear_fs_flags(struct inode *inode, int flags)
         */
         if(flags & SM_ALL_PLG) /* disable all plugins */
                 SMFS_CLEAR(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-        if (flags & SM_PRECREATE) /* enable log again */
-                SMFS_SET(I2SMI(inode)->smi_flags, SMFS_PLG_ALL);
-
         RETURN(rc);
 }
 
index bce9e84..ccfbf75 100755 (executable)
@@ -1057,8 +1057,11 @@ test_51b() {
     mkdir -p $DIR/$tdir-2
     multiop $DIR/$tdir-1/f O_c &
     pid=$!
+
     # give multiop a chance to open
-    sleep 1
+    # 1 second does not seem to be enough; we have already hit such cases
+    # --umka
+    sleep 5
 
     do_facet $SINGLEMDS "sysctl -w lustre.fail_loc=0x80000107"
     touch $DIR/${tdir}-2/f &
index 47e73c3..48f9c8a 100644 (file)
@@ -1921,7 +1921,7 @@ test_54a() {
        $SOCKETCLIENT $DIR/socket || error
        $MUNLINK $DIR/socket
 }
-run_test 54a "unix damain socket test =========================="
+run_test 54a "unix domain socket test =========================="
 
 test_54b() {
        f="$DIR/f54b"