Whamcloud - gitweb
- CROW-related fixes from b_hd_mdref
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 023bb30..80a55f7 100644 (file)
@@ -60,6 +60,7 @@
 #include <libcfs/list.h>
 
 #include <linux/lustre_smfs.h>
+#include <linux/lustre_sec.h>
 #include "filter_internal.h"
 
 /* Group 0 is no longer a legal group, to catch uninitialized IDs */
@@ -499,6 +500,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                 spin_lock_init(&fed->fed_lock);
 
                 fcd = NULL;
+                exp->exp_connected = 0;
                 exp->exp_req_replay_needed = 1;
                 exp->exp_lock_replay_needed = 1;
                 atomic_inc(&obd->obd_req_replay_clients);
@@ -1386,6 +1388,7 @@ static int filter_post_fs_cleanup(struct obd_device *obd)
 
         RETURN(rc);
 }
+
 #if 0
 static int filter_group_set_fs_flags(struct obd_device *obd, int group)
 {
@@ -1407,6 +1410,7 @@ static int filter_group_set_fs_flags(struct obd_device *obd, int group)
         RETURN(rc);
 }
 #endif
+
 static int filter_post_fs_setup(struct obd_device *obd)
 {
         struct filter_obd *filter = &obd->u.filter;
@@ -1570,13 +1574,29 @@ static int filter_detach(struct obd_device *dev)
 
 static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
 {
-        struct lustre_cfg* lcfg = buf;
+        struct filter_obd *filter = &obd->u.filter;
+        struct lustre_cfg *lcfg = buf;
+        unsigned long page;
         int rc;
         ENTRY;
+
+        spin_lock_init(&filter->fo_denylist_lock);
+        INIT_LIST_HEAD(&filter->fo_denylist);
+
+        /* 2.6.9 selinux wants a full option page for do_kern_mount (bug6471) */
+        page = get_zeroed_page(GFP_KERNEL);
+        if (!page)
+                RETURN(-ENOMEM);
+
+        memcpy((void *)page, lustre_cfg_buf(lcfg, 4),
+               LUSTRE_CFG_BUFLEN(lcfg, 4));
+        
         /* all mount options including errors=remount-ro and asyncdel are passed
          * using 4th lcfg param. And it is good, finally we have got rid of
          * hardcoded fs types in the code. */
-        rc = filter_common_setup(obd, len, buf, lustre_cfg_buf(lcfg, 4));
+        rc = filter_common_setup(obd, len, buf, (void *)page);
+        free_page(page);
+        
         if (rc)
                 RETURN(rc);
         rc = filter_post_fs_setup(obd);
@@ -1616,6 +1636,15 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         shrink_dcache_parent(filter->fo_sb->s_root);
         filter->fo_sb = 0;
 
+        spin_lock(&filter->fo_denylist_lock);
+        while (!list_empty(&filter->fo_denylist)) {
+                deny_sec_t *p_deny_sec = list_entry(filter->fo_denylist.next,
+                                                    deny_sec_t, list);
+                list_del(&p_deny_sec->list);
+                OBD_FREE(p_deny_sec, sizeof(*p_deny_sec));
+        }
+        spin_unlock(&filter->fo_denylist_lock);
+
         unlock_kernel();
         lvfs_umount_fs(filter->fo_lvfs_ctxt);
         //destroy_buffers(filter->fo_sb->s_dev);
@@ -1628,6 +1657,40 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         RETURN(0);
 }
 
+static int filter_process_config(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct lustre_cfg *lcfg = buf;
+        struct filter_obd *filter = &obd->u.filter;
+        int rc = 0;
+        ENTRY;
+
+        switch(lcfg->lcfg_command) {
+        case LCFG_SET_SECURITY: {
+                if ((LUSTRE_CFG_BUFLEN(lcfg, 1) == 0) ||
+                    (LUSTRE_CFG_BUFLEN(lcfg, 2) == 0))
+                        GOTO(out, rc = -EINVAL);
+
+                if (!strcmp(lustre_cfg_string(lcfg, 1), "deny_sec")){
+                        spin_lock(&filter->fo_denylist_lock);
+                        rc = add_deny_security(lustre_cfg_string(lcfg, 2),
+                                               &filter->fo_denylist);
+                        spin_unlock(&filter->fo_denylist_lock);
+                }else {
+                        CERROR("Unrecognized key\n");
+                        rc = -EINVAL;
+                }
+                break;
+        }
+        default: {
+                CERROR("Unknown command: %d\n", lcfg->lcfg_command);
+                GOTO(out, rc = -EINVAL);
+
+        }
+        }
+out:
+        RETURN(rc);
+}
+
 static int filter_connect_post(struct obd_export *exp, unsigned initial,
                                unsigned long connect_flags)
 {
@@ -1636,7 +1699,7 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial,
         char str[PTL_NALFMT_SIZE];
         struct obd_llogs *llog;
         struct llog_ctxt *ctxt;
-        int rc;
+        int rc = 0;
         ENTRY;
 
         fed = &exp->exp_filter_data;
@@ -1651,8 +1714,10 @@ static int filter_connect_post(struct obd_export *exp, unsigned initial,
         LASSERT(ctxt != NULL);
 
         rc = llog_receptor_accept(ctxt, exp->exp_imp_reverse);
+        
         portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
                         exp->exp_connection->c_peer.peer_id.nid, str);
+        
         CDEBUG(D_OTHER, "%s: init llog ctxt for export "LPX64"/%s, group %d\n",
                obd->obd_name, exp->exp_connection->c_peer.peer_id.nid,
                str, fed->fed_group);
@@ -1995,6 +2060,7 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
 {
         struct dentry *dchild = NULL;
         obd_gr group = 0;
+        ENTRY;
 
         if (oa->o_valid & OBD_MD_FLGROUP)
                 group = oa->o_gr;
@@ -2008,13 +2074,13 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
         }
 
         if (dchild->d_inode == NULL) {
-                CERROR("%s: %s on non-existent object: "LPU64"\n",
-                       obd->obd_name, what, oa->o_id);
+                CDEBUG(D_INFO, "%s: %s on non-existent object: "
+                       LPU64"\n", obd->obd_name, what, oa->o_id);
                 f_dput(dchild);
                 RETURN(ERR_PTR(-ENOENT));
         }
 
-        return dchild;
+        RETURN(dchild);
 }
 
 static int filter_getattr(struct obd_export *exp, struct obdo *oa,
@@ -2044,79 +2110,106 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa,
         RETURN(rc);
 }
 
-/* this is called from filter_truncate() until we have filter_punch() */
-static int filter_setattr(struct obd_export *exp, struct obdo *oa,
-                          struct lov_stripe_md *md, struct obd_trans_info *oti)
+int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry,
+                            struct obdo *oa, struct obd_trans_info *oti)
 {
-        struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
-        struct dentry *dentry;
         struct iattr iattr;
-        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
-        struct ldlm_resource *res;
         void *handle;
-        int rc, rc2;
+        int rc, err;
         ENTRY;
 
-        LASSERT(oti != NULL);
-
-        dentry = filter_oa2dentry(exp->exp_obd, oa);
-        if (IS_ERR(dentry))
-                RETURN(PTR_ERR(dentry));
-
+        LASSERT(dentry != NULL);
+        LASSERT(!IS_ERR(dentry));
+        LASSERT(dentry->d_inode != NULL);
+        
         filter = &exp->exp_obd->u.filter;
-
         iattr_from_obdo(&iattr, oa, oa->o_valid);
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        lock_kernel();
-
         if (iattr.ia_valid & ATTR_SIZE)
                 down(&dentry->d_inode->i_sem);
-        handle = fsfilt_start(exp->exp_obd, dentry->d_inode, FSFILT_OP_SETATTR,
-                              oti);
+        handle = fsfilt_start(exp->exp_obd, dentry->d_inode,
+                              FSFILT_OP_SETATTR, oti);
         if (IS_ERR(handle))
                 GOTO(out_unlock, rc = PTR_ERR(handle));
 
         /* XXX this could be a rwsem instead, if filter_preprw played along */
         if (iattr.ia_valid & ATTR_ATTR_FLAG)
-                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode, NULL,
-                                      EXT3_IOC_SETFLAGS,
+                rc = fsfilt_iocontrol(exp->exp_obd, dentry->d_inode,
+                                      NULL, EXT3_IOC_SETFLAGS,
                                       (long)&iattr.ia_attr_flags);
         else
-                rc = fsfilt_setattr(exp->exp_obd, dentry, handle, &iattr, 1);
+                rc = fsfilt_setattr(exp->exp_obd, dentry, handle,
+                                    &iattr, 1);
+        
         rc = filter_finish_transno(exp, oti, rc);
-        rc2 = fsfilt_commit(exp->exp_obd, filter->fo_sb, dentry->d_inode, 
-                            handle, exp->exp_sync);
-        if (rc2) {
-                CERROR("error on commit, err = %d\n", rc2);
+        
+        err = fsfilt_commit(exp->exp_obd, filter->fo_sb,
+                            dentry->d_inode, handle,
+                            exp->exp_sync);
+        if (err) {
+                CERROR("error on commit, err = %d\n", err);
                 if (!rc)
-                        rc = rc2;
+                        rc = err;
         }
+        EXIT;
+out_unlock:
+        if (iattr.ia_valid & ATTR_SIZE)
+                up(&dentry->d_inode->i_sem);
+        return rc;
+}
+
+/* this is called from filter_truncate() until we have filter_punch() */
+int filter_setattr(struct obd_export *exp, struct obdo *oa,
+                   struct lov_stripe_md *md, struct obd_trans_info *oti)
+{
+        struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+        struct ldlm_valblock_ops *ns_lvbo;
+        struct lvfs_run_ctxt saved;
+        struct filter_obd *filter;
+        struct ldlm_resource *res;
+        struct dentry *dentry;
+        int rc;
+        ENTRY;
+
+        LASSERT(oti != NULL);
+
+        filter = &exp->exp_obd->u.filter;
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+        /* make sure that object is allocated. */
+        dentry = filter_crow_object(exp->exp_obd,
+                                    oa->o_gr, oa->o_id);
+        if (IS_ERR(dentry))
+                GOTO(out_pop, rc = PTR_ERR(dentry));
+
+        lock_kernel();
+
+        /* setting objects attributes (including owner/group) */
+        rc = filter_setattr_internal(exp, dentry, oa, oti);
+        if (rc)
+                GOTO(out_unlock, rc);
 
         res = ldlm_resource_get(exp->exp_obd->obd_namespace, NULL,
                                 res_id, LDLM_EXTENT, 0);
+        
         if (res != NULL) {
-                if (res->lr_namespace->ns_lvbo &&
-                    res->lr_namespace->ns_lvbo->lvbo_update)
-                        rc = res->lr_namespace->ns_lvbo->lvbo_update(res, NULL,
-                                                                     0, 0);
+                ns_lvbo = res->lr_namespace->ns_lvbo;
+                if (ns_lvbo && ns_lvbo->lvbo_update)
+                        rc = ns_lvbo->lvbo_update(res, NULL, 0, 0);
                 ldlm_resource_putref(res);
-        } else if (iattr.ia_valid & ATTR_SIZE) {
-                /* called from MDS. */
         }
-
+        
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
+        EXIT;
 out_unlock:
-        if (iattr.ia_valid & ATTR_SIZE)
-                up(&dentry->d_inode->i_sem);
         unlock_kernel();
-        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-
         f_dput(dentry);
-        RETURN(rc);
+out_pop:
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        return rc;
 }
 
 /* XXX identical to osc_unpackmd */
@@ -2169,111 +2262,6 @@ static int filter_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
         RETURN(lsm_size);
 }
 
-static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
-                                      struct filter_obd *filter)
-{
-        struct obdo *doa = NULL;
-        __u64 last, id;
-        ENTRY;
-        
-        LASSERT(oa);
-        LASSERT(oa->o_gr != 0);
-        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
-
-       doa = obdo_alloc();
-        if (doa == NULL) {
-                CERROR("cannot allocate doa, error %d\n",
-                       -ENOMEM);
-                EXIT;
-                return;
-        }
-
-        doa->o_mode = S_IFREG;
-        doa->o_gr = oa->o_gr;
-        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
-
-        set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
-        down(&filter->fo_create_locks[doa->o_gr]);
-        if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
-                CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
-                       exp->exp_obd->obd_name, doa->o_gr);
-                up(&filter->fo_create_locks[doa->o_gr]);
-                GOTO(out_free_doa, 0);
-        }
-
-        last = filter_last_id(filter, doa->o_gr);
-        CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "LPU64"\n",
-              exp->exp_obd->obd_name, doa->o_gr, oa->o_id + 1, last);
-        for (id = oa->o_id + 1; id <= last; id++) {
-                doa->o_id = id;
-                filter_destroy(exp, doa, NULL, NULL);
-        }
-
-        CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "LPU64"\n",
-               exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
-
-        filter_set_last_id(filter, doa->o_gr, oa->o_id);
-
-        clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
-        up(&filter->fo_create_locks[doa->o_gr]);
-
-        EXIT;
-out_free_doa:
-        obdo_free(doa);
-}
-
-/* returns a negative error or a nonnegative number of files to create */
-static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
-                                   obd_gr group)
-{
-        struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        int diff, rc;
-        ENTRY;
-
-        diff = oa->o_id - filter_last_id(filter, oa->o_gr);
-        CDEBUG(D_INFO, "filter_last_id() = "LPU64" -> diff = %d\n",
-               filter_last_id(filter, oa->o_gr), diff);
-
-        /* delete orphans request */
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_DELORPHAN)) {
-                if (diff >= 0)
-                        RETURN(diff);
-                if (-diff > OST_MAX_PRECREATE) {
-                        CERROR("ignoring bogus orphan destroy request: obdid "
-                               LPU64" last_id "LPU64"\n",
-                               oa->o_id, filter_last_id(filter, oa->o_gr));
-                        RETURN(-EINVAL);
-                }
-                filter_destroy_precreated(exp, oa, filter);
-                rc = filter_update_last_objid(obd, group, 0);
-                if (rc)
-                        CERROR("unable to write lastobjid, but orphans"
-                               "were deleted\n");
-                RETURN(0);
-        } else {
-                /* only precreate if group == 0 and o_id is specfied */
-                if (!(oa->o_valid & OBD_FL_DELORPHAN) &&
-                    (/*group != 0 ||*/ oa->o_id == 0))
-                        RETURN(1);
-
-                LASSERTF(diff >= 0, LPU64" - "LPU64" = %d\n", oa->o_id,
-                         filter_last_id(filter, oa->o_gr), diff);
-                RETURN(diff);
-        }
-}
-static int filter_precreate_rec(struct obd_device *obd, struct dentry *dentry, 
-                                int *number, struct obdo *oa)
-{
-        int rc;
-        ENTRY;
-
-        rc = fsfilt_precreate_rec(obd, dentry, number, oa);
-
-        RETURN(rc);
-}
-
 static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
                          unsigned long max_age)
 {
@@ -2305,188 +2293,250 @@ static int filter_statfs(struct obd_device *obd, struct obd_statfs *osfs,
         RETURN(rc);
 }
 
-/* We rely on the fact that only one thread will be creating files in a given
- * group at a time, which is why we don't need an atomic filter_get_new_id.
- * Even if we had that atomic function, the following race would exist:
- *
- * thread 1: gets id x from filter_next_id
- * thread 2: gets id (x + 1) from filter_next_id
- * thread 2: creates object (x + 1)
- * thread 1: tries to create object x, gets -ENOSPC
- */
-static int filter_precreate(struct obd_device *obd, struct obdo *oa,
-                            obd_gr group, int *num)
+int filter_create_object(struct obd_device *obd, struct obdo *oa,
+                         obd_gr group)
 {
-        struct dentry *dchild = NULL, *dparent = NULL;
-        int err = 0, rc = 0, recreate_obj = 0, i;
+        struct dentry *dparent = NULL;
+        struct dentry *dchild = NULL;
         struct filter_obd *filter;
+        struct obd_statfs *osfs;
+        int cleanup_phase = 0;
+        int err = 0, rc = 0;
         void *handle = NULL;
         void *lock = NULL;
-        struct obd_statfs *osfs;
-        unsigned long enough_time = jiffies + (obd_timeout * HZ) / 3;
-        __u64 next_id;
         ENTRY;
 
         filter = &obd->u.filter;
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
-                recreate_obj = 1;
-        } else {
-                OBD_ALLOC(osfs, sizeof(*osfs));
-                if (osfs == NULL)
-                        RETURN(-ENOMEM);
-                rc = filter_statfs(obd, osfs, jiffies-HZ);
-                if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
-                        CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
-                              osfs->os_bavail<<filter->fo_sb->s_blocksize_bits);
-                        *num = 0;
-                        rc = -ENOSPC;
-                }
-                OBD_FREE(osfs, sizeof(*osfs));
-                if (rc) {
-                        RETURN(rc);
-                }
+        OBD_ALLOC(osfs, sizeof(*osfs));
+        if (osfs == NULL)
+                RETURN(-ENOMEM);
+        rc = filter_statfs(obd, osfs, jiffies - HZ);
+        if (rc == 0 && osfs->os_bavail < (osfs->os_blocks >> 10)) {
+                CDEBUG(D_HA, "OST out of space! avail "LPU64"\n",
+                       osfs->os_bavail << filter->fo_sb->s_blocksize_bits);
+                rc = -ENOSPC;
         }
-
-        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
+        OBD_FREE(osfs, sizeof(*osfs));
+        if (rc)
+                RETURN(rc);
 
         down(&filter->fo_create_locks[group]);
 
-        for (i = 0; i < *num && err == 0; i++) {
-                int cleanup_phase = 0;
+        if (test_bit(group, &filter->fo_destroys_in_progress)) {
+                CWARN("%s: precreate aborted by destroy\n",
+                      obd->obd_name);
+                GOTO(out, rc = -EALREADY);
+        }
 
-                if (test_bit(group, &filter->fo_destroys_in_progress)) {
-                        CWARN("%s: precreate aborted by destroy\n",
-                              obd->obd_name);
-                        break;
-                }
+        CDEBUG(D_INFO, "precreate objid "LPU64"\n", oa->o_id);
 
-                if (recreate_obj) {
-                        __u64 last_id;
-                        next_id = oa->o_id;
-                        last_id = filter_last_id(filter, group);
-                        if (next_id > last_id) {
-                                CERROR("Error: Trying to recreate obj greater"
-                                       "than last id "LPD64" > "LPD64"\n",
-                                       next_id, last_id);
-                                GOTO(cleanup, rc = -EINVAL);
-                        }
-                } else {
-                        next_id = filter_last_id(filter, group) + 1;
-                }
+        dparent = filter_parent_lock(obd, group, oa->o_id, &lock);
+        if (IS_ERR(dparent))
+                GOTO(cleanup, rc = PTR_ERR(dparent));
+        cleanup_phase = 1;
 
-                CDEBUG(D_INFO, "precreate objid "LPU64"\n", next_id);
-
-                dparent = filter_parent_lock(obd, group, next_id, &lock);
-                if (IS_ERR(dparent))
-                        GOTO(cleanup, rc = PTR_ERR(dparent));
-                cleanup_phase = 1;
-
-                /* precreate objects are not logged */
-                fsfilt_set_fs_flags(obd, dparent->d_inode, SM_PRECREATE);
-
-                dchild = filter_id2dentry(obd, dparent, group, next_id);
-                if (IS_ERR(dchild))
-                        GOTO(cleanup, rc = PTR_ERR(dchild));
-                cleanup_phase = 2;
-
-                if (dchild->d_inode != NULL) {
-                        /* This would only happen if lastobjid was bad on disk*/
-                        /* Could also happen if recreating missing obj but
-                         * already exists
-                         */
-                        if (recreate_obj) {
-                                CERROR("%s: recreating existing object %.*s?\n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                        } else {
-                                CERROR("%s: Serious error: objid %.*s already "
-                                       "exists; is this filesystem corrupt?\n",
-                                       obd->obd_name, dchild->d_name.len,
-                                       dchild->d_name.name);
-                                LBUG();
-                        }
-                        GOTO(cleanup, rc = -EEXIST);
-                }
+        dchild = filter_id2dentry(obd, dparent, group, oa->o_id);
+        if (IS_ERR(dchild))
+                GOTO(cleanup, rc = PTR_ERR(dchild));
+        cleanup_phase = 2;
+
+        if (dchild->d_inode != NULL)
+                GOTO(cleanup, rc = 0);
 
-                handle = fsfilt_start_log(obd, dparent->d_inode,
-                                          FSFILT_OP_CREATE, NULL, 1);
-                if (IS_ERR(handle))
-                        GOTO(cleanup, rc = PTR_ERR(handle));
-                cleanup_phase = 3;
+        handle = fsfilt_start_log(obd, dparent->d_inode,
+                                  FSFILT_OP_CREATE, NULL, 1);
+        if (IS_ERR(handle))
+                GOTO(cleanup, rc = PTR_ERR(handle));
+        cleanup_phase = 3;
+
+        rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+        if (rc) {
+                CERROR("create failed rc = %d\n", rc);
+                GOTO(cleanup, rc);
+        }
+
+        fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+
+        if (oa->o_id > filter_last_id(filter, group)) {
+                /*
+                 * saving last created object id, it will be needed in recovery
+                 * for deleting orphanes.
+                 */
+                filter_set_last_id(filter, group, oa->o_id);
 
-                rc = ll_vfs_create(dparent->d_inode, dchild, S_IFREG, NULL);
+                rc = filter_update_last_objid(obd, group, 0);
                 if (rc) {
-                        CERROR("create failed rc = %d\n", rc);
-                        GOTO(cleanup, rc);
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
+                        rc = 0;
                 }
-
-                if (!recreate_obj) {
-                        filter_set_last_id(filter, group, next_id);
-                        err = filter_update_last_objid(obd, group, 0);
-                        if (err)
-                                CERROR("unable to write lastobjid "
-                                       "but file created\n");
+        }
+cleanup:
+        switch(cleanup_phase) {
+        case 3:
+                err = fsfilt_commit(obd, filter->fo_sb,
+                                    dparent->d_inode, handle, 0);
+                if (err) {
+                        CERROR("error on commit, err = %d\n", err);
+                        if (!rc)
+                                rc = err;
                 }
-                fsfilt_set_fs_flags(obd, dparent->d_inode, SM_DO_REC);
+        case 2:
+                f_dput(dchild);
+        case 1:
+                filter_parent_unlock(dparent, lock);
+        case 0:
+                break;
+        }
+
+        if (rc)
+                GOTO(out, rc);
+
+out:
+        up(&filter->fo_create_locks[group]);
+        RETURN(rc);
+}
+
+struct dentry *filter_crow_object(struct obd_device *obd,
+                                  __u64 ogr, __u64 oid)
+{
+        struct dentry *dentry;
+        struct obdo *oa;
+        int rc = 0;
+        ENTRY;
+
+        /* check if object is already allocated */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                RETURN(dentry);
+
+        if (dentry->d_inode)
+                RETURN(dentry);
+
+        f_dput(dentry);
         
-        cleanup:
-                switch(cleanup_phase) {
-                case 3:
-                        err = fsfilt_commit(obd, filter->fo_sb,
-                                            dparent->d_inode, handle, 0);
-                        if (err) {
-                                CERROR("error on commit, err = %d\n", err);
-                                if (!rc)
-                                        rc = err;
-                        }
-                case 2:
-                        f_dput(dchild);
-                case 1:
-                        filter_parent_unlock(dparent, lock);
-                case 0:
-                        break;
-                }
+        /* allocate object as it does not exist */
+        oa = obdo_alloc();
+        if (oa == NULL)
+                RETURN(ERR_PTR(-ENOMEM));
 
-                if (rc)
-                        break;
-                if (time_after(jiffies, enough_time)) {
-                        CDEBUG(D_INODE,"%s: precreate slow - want %d got %d \n",
-                               obd->obd_name, *num, i);
-                        break;
-                }
+        oa->o_id = oid;
+        oa->o_gr = ogr;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+        CDEBUG(D_INODE, "OSS object "LPU64"/"LPU64
+               " does not exists - allocate now\n",
+               oid, ogr);
+
+        rc = filter_create_object(obd, oa, oa->o_gr);
+        if (rc) {
+                CERROR("cannot create OSS object "LPU64"/"LPU64
+                       ", err = %d\n", oa->o_id, oa->o_gr, rc);
+                GOTO(out_free_oa, dentry = ERR_PTR(rc));
         }
 
-        *num = i;
+        /* lookup for just created object and return it to caller */
+        dentry = filter_id2dentry(obd, NULL, ogr, oid);
+        if (IS_ERR(dentry))
+                GOTO(out_free_oa, dentry);
+                
+        if (dentry->d_inode == NULL) {
+                f_dput(dentry);
+                dentry = ERR_PTR(-ENOENT);
+                CERROR("cannot find just created OSS object "
+                       LPU64"/"LPU64" err = %d\n", oid,
+                       ogr, (int)PTR_ERR(dentry));
+                GOTO(out_free_oa, dentry);
+        }
 
-        /* check if we have an error after ll_vfs_create(). It is possible that
-         * there will be say -ENOSPC and we will leak it. */
-        if (rc == 0)
-                rc = filter_precreate_rec(obd, dparent, num, oa);
+        EXIT;
+out_free_oa:
+        obdo_free(oa);
+        return dentry;
+}
 
-        up(&filter->fo_create_locks[group]);
+static int
+filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
+{
+        struct obd_device *obd = NULL;
+        struct filter_obd *filter;
+        struct obdo *doa = NULL;
+        int rc = 0, orphans;
+        __u64 last, id;
+        ENTRY;
+        
+        LASSERT(oa);
+        LASSERT(oa->o_gr != 0);
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
 
-        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
-               obd->obd_name, group, filter->fo_last_objids[group]);
+        obd = exp->exp_obd;
+        filter = &obd->u.filter;
 
-        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
-               obd->obd_name, i);
+        last = filter_last_id(filter, oa->o_gr);
+        orphans = last - oa->o_id;
+        
+        if (orphans <= 0)
+                RETURN(0);
+                
+       doa = obdo_alloc();
+        if (doa == NULL)
+                RETURN(-ENOMEM);
 
-        RETURN(rc);
+        doa->o_gr = oa->o_gr;
+        doa->o_mode = S_IFREG;
+        doa->o_valid = oa->o_valid & (OBD_MD_FLGROUP | OBD_MD_FLID);
+
+        set_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        down(&filter->fo_create_locks[doa->o_gr]);
+        if (!test_bit(doa->o_gr, &filter->fo_destroys_in_progress)) {
+                CERROR("%s:["LPU64"] destroy_in_progress already cleared\n",
+                       exp->exp_obd->obd_name, doa->o_gr);
+                up(&filter->fo_create_locks[doa->o_gr]);
+                GOTO(out_free_doa, 0);
+        }
+
+        CWARN("%s:["LPU64"] deleting orphan objects from "LPU64" to "
+              LPU64"\n", exp->exp_obd->obd_name, doa->o_gr,
+              oa->o_id + 1, last);
+        
+        for (id = oa->o_id + 1; id <= last; id++) {
+                doa->o_id = id;
+                filter_destroy(exp, doa, NULL, NULL);
+        }
+
+        CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
+               LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
+
+        /* return next free id to be used as a new start of sequence -bzzz */
+        oa->o_id = last + 1;
+
+        filter_set_last_id(filter, oa->o_gr, oa->o_id);
+        clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
+        up(&filter->fo_create_locks[oa->o_gr]);
+
+        EXIT;
+out_free_doa:
+        obdo_free(doa);
+        return rc;
 }
 
-static int filter_create(struct obd_export *exp, struct obdo *oa,
-                         void *acl, int acl_size,
-                         struct lov_stripe_md **ea, struct obd_trans_info *oti)
+/*
+ * by now this function is only needed as entry point for deleting orphanes on
+ * OSS as objects are created on first write attempt. --umka
+ */
+static int
+filter_create(struct obd_export *exp, struct obdo *oa, void *acl,
+              int acl_size, struct lov_stripe_md **ea,
+              struct obd_trans_info *oti)
 {
+        struct filter_export_data *fed;
         struct obd_device *obd = NULL;
-        struct filter_obd *filter;
+        int group = oa->o_gr, rc = 0;
         struct lvfs_run_ctxt saved;
-        struct lov_stripe_md *lsm = NULL;
-        struct filter_export_data *fed;
+        struct filter_obd *filter;
         char str[PTL_NALFMT_SIZE];
-        int group = oa->o_gr, rc = 0, diff, recreate_objs = 0;
         ENTRY;
 
         LASSERT(acl == NULL && acl_size == 0);
@@ -2499,19 +2549,14 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(-EINVAL);
         }
 
-        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
-            (oa->o_flags & OBD_FL_RECREATE_OBJS))
-                recreate_objs = 1;
-
         obd = exp->exp_obd;
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
-        if (fed->fed_group != group && !recreate_objs &&
-            !(oa->o_valid & OBD_MD_REINT)) {
+        if (fed->fed_group != group) {
                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
                                 exp->exp_connection->c_peer.peer_id.nid, str);
-                CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
+                CERROR("!!! this export (nid "LPX64"/%s) used object group %d "
                        "earlier; now it's trying to use group %d!  This could "
                        "be a bug in the MDS.  Tell CFS.\n",
                        exp->exp_connection->c_peer.peer_id.nid, str,
@@ -2521,54 +2566,28 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
 
         CDEBUG(D_INFO, "filter_create(od->o_gr=%d,od->o_id="LPU64")\n",
                group, oa->o_id);
-        if (ea != NULL) {
-                lsm = *ea;
-                if (lsm == NULL) {
-                        rc = obd_alloc_memmd(exp, &lsm);
-                        if (rc < 0)
-                                RETURN(rc);
-                }
-        }
 
         obd = exp->exp_obd;
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
-        if (oa->o_valid & OBD_MD_REINT) {
-                int num = *((int*)oa->o_inline);  
-                rc = filter_precreate(obd, oa, oa->o_gr, &num);
-        } else if (recreate_objs) {
-                if (oa->o_id > filter_last_id(&obd->u.filter, group)) {
-                        CERROR("recreate objid "LPU64" > last id "LPU64"\n",
-                               oa->o_id, filter_last_id(&obd->u.filter, group));
-                        rc = -EINVAL;
-                } else {
-                        diff = 1;
-                        rc = filter_precreate(obd, oa, group, &diff);
-                }
+        LASSERT((oa->o_valid & OBD_MD_FLFLAGS) &&
+                (oa->o_flags == OBD_FL_DELORPHAN));
+                
+        rc = filter_clear_orphans(exp, oa);
+        if (rc) {
+                CERROR("cannot clear orphanes starting from "
+                       LPU64", err = %d\n", oa->o_id, rc);
         } else {
-                diff = filter_should_precreate(exp, oa, group);
-                if (diff > 0) {
-                        oa->o_id = filter_last_id(&obd->u.filter, group);
-                        rc = filter_precreate(obd, oa, group, &diff);
-                        oa->o_id = filter_last_id(&obd->u.filter, oa->o_gr);
-                        oa->o_valid = OBD_MD_FLID;
+                rc = filter_update_last_objid(obd, group, 0);
+                if (rc) {
+                        CERROR("unable to write lastobjid, but "
+                               "orphans were deleted, err = %d\n",
+                               rc);
                 }
         }
-
         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
-        if (rc && ea != NULL && *ea != lsm) {
-                obd_free_memmd(exp, &lsm);
-        } else if (rc == 0 && ea != NULL) {
-                /* XXX LOV STACKING: the lsm that is passed to us from
-                 * LOV does not have valid lsm_oinfo data structs, so
-                 * don't go touching that.  This needs to be fixed in a
-                 * big way. */
-                lsm->lsm_object_id = oa->o_id;
-                lsm->lsm_object_gr = oa->o_gr;
-                *ea = lsm;
-        }
-
-        RETURN(rc);
+        
+        RETURN(0);
 }
 
 static int filter_destroy(struct obd_export *exp, struct obdo *oa,
@@ -3040,6 +3059,7 @@ static struct obd_ops filter_obd_ops = {
         .o_setup          = filter_setup,
         .o_precleanup     = filter_precleanup,
         .o_cleanup        = filter_cleanup,
+        .o_process_config = filter_process_config,
         .o_connect        = filter_connect,
         .o_connect_post   = filter_connect_post,
         .o_disconnect     = filter_disconnect,