Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index bac0c32..e2a55dd 100644 (file)
@@ -107,8 +107,10 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
         fcd->fcd_last_xid = 0;
 
         off = fed->fed_lr_off;
-        fsfilt_add_journal_cb(exp->exp_obd, last_rcvd, oti->oti_handle,
-                              filter_commit_cb, NULL);
+
+        fsfilt_add_journal_cb(exp->exp_obd, filter->fo_sb, last_rcvd,
+                              oti->oti_handle, filter_commit_cb, NULL);
+
         err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, fcd,
                                   sizeof(*fcd), &off, 0);
         if (err) {
@@ -184,7 +186,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
                fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);
 
         if (new_client) {
-                struct obd_run_ctxt saved;
+                struct lvfs_run_ctxt saved;
                 loff_t off = fed->fed_lr_off;
                 int err;
                 void *handle;
@@ -192,7 +194,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
                 CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
                        fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));
 
-                push_ctxt(&saved, &obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 /* Transaction needed to fix bug 1403 */
                 handle = fsfilt_start(obd,
                                       filter->fo_rcvd_filp->f_dentry->d_inode,
@@ -209,7 +211,7 @@ static int filter_client_add(struct obd_device *obd, struct filter_obd *filter,
                                       filter->fo_rcvd_filp->f_dentry->d_inode,
                                       handle, 1);
                 }
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
                 if (err) {
                         CERROR("error writing %s client idx %u: rc %d\n",
@@ -226,7 +228,7 @@ static int filter_client_free(struct obd_export *exp, int flags)
         struct filter_obd *filter = &exp->exp_obd->u.filter;
         struct obd_device *obd = exp->exp_obd;
         struct filter_client_data zero_fcd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         int rc;
         loff_t off;
         ENTRY;
@@ -257,10 +259,10 @@ static int filter_client_free(struct obd_export *exp, int flags)
         }
 
         memset(&zero_fcd, 0, sizeof zero_fcd);
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         rc = fsfilt_write_record(obd, filter->fo_rcvd_filp, &zero_fcd,
                                  sizeof(zero_fcd), &off, 1);
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
                "zeroing disconnecting client %s at idx %u (%llu) in %s rc %d\n",
@@ -422,8 +424,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
                obd->obd_name, le16_to_cpu(fsd->fsd_subdir_count));
         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
-               last_rcvd_size <= FILTER_LR_CLIENT_START ? 0 :
-               (last_rcvd_size-FILTER_LR_CLIENT_START) /FILTER_LR_CLIENT_SIZE);
+               last_rcvd_size <= le32_to_cpu(fsd->fsd_client_start) ? 0 :
+               (last_rcvd_size - le32_to_cpu(fsd->fsd_client_start)) /
+                le16_to_cpu(fsd->fsd_client_size));
 
         if (!obd->obd_replayable) {
                 CWARN("%s: recovery support OFF\n", obd->obd_name);
@@ -496,8 +499,8 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
 
         if (obd->obd_recoverable_clients) {
                 CWARN("RECOVERY: %d recoverable clients, last_rcvd "
-                       LPU64"\n", obd->obd_recoverable_clients,
-                       le64_to_cpu(fsd->fsd_last_transno));
+                      LPU64"\n", obd->obd_recoverable_clients,
+                      le64_to_cpu(fsd->fsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
                 obd->obd_recovering = 1;
         }
@@ -559,6 +562,7 @@ static int filter_cleanup_groups(struct obd_device *obd)
         if (filter->fo_last_objid_files != NULL)
                 OBD_FREE(filter->fo_last_objid_files,
                          filter->fo_group_count * sizeof(struct file *));
+        f_dput(filter->fo_dentry_O);
         RETURN(0);
 }
 
@@ -790,13 +794,13 @@ static int filter_prep_groups(struct obd_device *obd)
                 GOTO(cleanup_O0, rc);
 
         cleanup_O0:
-                dput(O0_dentry);
+                f_dput(O0_dentry);
         cleanup_R:
-                dput(dentry);
+                f_dput(dentry);
                 if (rc)
                         GOTO(cleanup, rc);
         } else {
-                dput(dentry);
+                f_dput(dentry);
         }
 
         cleanup_phase = 2; /* groups */
@@ -825,14 +829,14 @@ static int filter_prep_groups(struct obd_device *obd)
 /* setup the object store with correct subdirectories */
 static int filter_prep(struct obd_device *obd)
 {
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct filter_obd *filter = &obd->u.filter;
         struct file *file;
         struct inode *inode;
         int rc = 0;
         ENTRY;
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         file = filp_open(LAST_RCVD, O_RDWR | O_CREAT | O_LARGEFILE, 0700);
         if (!file || IS_ERR(file)) {
                 rc = PTR_ERR(file);
@@ -865,7 +869,7 @@ static int filter_prep(struct obd_device *obd)
                 GOTO(err_server_data, rc);
 
  out:
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         return(rc);
 
@@ -882,7 +886,7 @@ static int filter_prep(struct obd_device *obd)
 /* cleanup the filter: write last used object id to status file */
 static void filter_post(struct obd_device *obd)
 {
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct filter_obd *filter = &obd->u.filter;
         int rc, i;
 
@@ -890,7 +894,7 @@ static void filter_post(struct obd_device *obd)
          * best to start a transaction with h_sync, because we removed this
          * from lastobjid */
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         rc = filter_update_server_data(obd, filter->fo_rcvd_filp,
                                        filter->fo_fsd, 0);
         if (rc)
@@ -904,15 +908,14 @@ static void filter_post(struct obd_device *obd)
                                i, rc);
         }
 
-        filp_close(filter->fo_rcvd_filp, 0);
+        rc = filp_close(filter->fo_rcvd_filp, 0);
         filter->fo_rcvd_filp = NULL;
         if (rc)
                 CERROR("error closing %s: rc = %d\n", LAST_RCVD, rc);
 
         filter_cleanup_groups(obd);
-        f_dput(filter->fo_dentry_O);
         filter_free_server_data(filter);
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 }
 
 static void filter_set_last_id(struct filter_obd *filter, int group, obd_id id)
@@ -1004,7 +1007,7 @@ struct dentry *filter_parent(struct obd_device *obd, obd_gr group, obd_id objid)
         LASSERT(group < filter->fo_group_count);
         LASSERT(group > 0);
 
-        if (/*group > 0 || */filter->fo_subdir_count == 0)
+        if (filter->fo_subdir_count == 0)
                 return filter->fo_groups[group];
 
         return filter->fo_subdirs[group].dentry[objid & (filter->fo_subdir_count - 1)];
@@ -1191,6 +1194,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         res_lvb = res->lr_lvb_data;
         LASSERT(res_lvb != NULL);
         reply_lvb->lvb_size = res_lvb->lvb_size;
+        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
         up(&res->lr_lvb_sem);
 
         list_for_each(tmp, &res->lr_granted) {
@@ -1200,8 +1204,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
                 if (tmplock->l_granted_mode == LCK_PR)
                         continue;
 
-                if (tmplock->l_policy_data.l_extent.end <=
-                    reply_lvb->lvb_size)
+                if (tmplock->l_policy_data.l_extent.end <= reply_lvb->lvb_size)
                         continue;
 
                 if (l == NULL) {
@@ -1231,6 +1234,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
 
         down(&res->lr_lvb_sem);
         reply_lvb->lvb_size = res_lvb->lvb_size;
+        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
         up(&res->lr_lvb_sem);
 
         LDLM_LOCK_PUT(l);
@@ -1238,13 +1242,67 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         RETURN(ELDLM_LOCK_ABORTED);
 }
 
+static int filter_post_fs_cleanup(struct obd_device *obd)
+{
+        int rc = 0;
+
+        rc = fsfilt_post_cleanup(obd);
+
+        RETURN(rc);
+}
+
+static int filter_group_set_kml_flags(struct obd_device *obd, int group)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        int rc = 0, i = 0;
+        ENTRY;        
+        
+        /* zero group is not longer valid. */
+        if (group== 0)
+                RETURN(rc); 
+        for (i = 0; i < filter->fo_subdir_count; i++) {
+                struct dentry *dentry;
+                dentry = (filter->fo_subdirs + group)->dentry[i];
+                rc = fsfilt_set_kml_flags(obd, dentry->d_inode);
+                if (rc)
+                        RETURN(rc);
+        }
+        RETURN(rc);
+}
+static int filter_post_fs_setup(struct obd_device *obd)
+{
+        struct filter_obd *filter = &obd->u.filter;
+        int rc = 0, j = 0;
+        struct llog_ctxt *ctxt = NULL;
+
+        rc = fsfilt_post_setup(obd);
+        if (rc)
+                RETURN(rc);
+        
+        for (j = 0; j < filter->fo_group_count; j++) {
+                rc = filter_group_set_kml_flags(obd, j);
+                if (rc)
+                        return rc;
+        } 
+
+        fsfilt_get_reint_log_ctxt(obd, filter->fo_sb, &ctxt);
+        if (ctxt) {
+                ctxt->loc_obd = obd;
+                ctxt->loc_idx = LLOG_REINT_ORIG_CTXT;
+                obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT] = ctxt;
+        }
+        fsfilt_set_ost_flags(obd, filter->fo_sb);
+        return rc;
+}
+
 /* mount the file system (secretly) */
-int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
-                        char *option)
+int filter_common_setup(struct obd_device *obd, obd_count len,
+                        void *buf, char *option)
 {
         struct lustre_cfg* lcfg = buf;
         struct filter_obd *filter = &obd->u.filter;
         struct vfsmount *mnt;
+        char name[32] = "CATLIST";
         int rc = 0;
         ENTRY;
 
@@ -1283,16 +1341,24 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         filter->fo_fstype = mnt->mnt_sb->s_type->name;
         CDEBUG(D_SUPER, "%s: mnt = %p\n", filter->fo_fstype, mnt);
 
-        OBD_SET_CTXT_MAGIC(&obd->obd_ctxt);
-        obd->obd_ctxt.pwdmnt = mnt;
-        obd->obd_ctxt.pwd = mnt->mnt_root;
-        obd->obd_ctxt.fs = get_ds();
-        obd->obd_ctxt.cb_ops = filter_lvfs_ops;
+        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
+        obd->obd_lvfs_ctxt.pwdmnt = mnt;
+        obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
+        obd->obd_lvfs_ctxt.fs = get_ds();
+        obd->obd_lvfs_ctxt.cb_ops = filter_lvfs_ops;
+
+        rc = fsfilt_setup(obd, mnt->mnt_sb);
+        if (rc)
+                GOTO(err_mntput, rc);
 
         rc = filter_prep(obd);
         if (rc)
                 GOTO(err_mntput, rc);
 
+
+        filter->fo_destroy_in_progress = 0;
+        sema_init(&filter->fo_create_lock, 1);
+
         spin_lock_init(&filter->fo_translock);
         spin_lock_init(&filter->fo_objidlock);
         INIT_LIST_HEAD(&filter->fo_export_list);
@@ -1319,12 +1385,11 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = llog_cat_initialize(obd, &obd->obd_llogs, 1);
+        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, name);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
         }
-
         RETURN(0);
 
 err_post:
@@ -1339,32 +1404,48 @@ err_ops:
         return rc;
 }
 
-static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+static int filter_attach(struct obd_device *obd, obd_count len, void *data)
 {
-        struct lustre_cfg* lcfg = buf;
-        const char *str = NULL;
-        char *option = NULL;
-        int n = 0;
+        struct lprocfs_static_vars lvars;
         int rc;
 
-        if (!strcmp(lcfg->lcfg_inlbuf2, "ext3")) {
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        /* bug 1577: implement async-delete for 2.5 */
-                str = "errors=remount-ro,asyncdel";
-#else
-                str = "errors=remount-ro";
-#endif
-                n = strlen(str) + 1;
-                OBD_ALLOC(option, n);
-                if (option == NULL)
-                        RETURN(-ENOMEM);
-                strcpy(option, str);
-        }
+        lprocfs_init_vars(filter, &lvars);
+        rc = lprocfs_obd_attach(obd, lvars.obd_vars);
+        if (rc != 0)
+                return rc;
 
-        rc = filter_common_setup(obd, len, buf, option);
-        if (option)
-                OBD_FREE(option, n);
-        return rc;
+        rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
+        if (rc != 0)
+                return rc;
+
+        /* Init obdfilter private stats here */
+        lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
+                             LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+        lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
+                             LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+
+        return lproc_filter_attach_seqstat(obd);
+}
+
+static int filter_detach(struct obd_device *dev)
+{
+        lprocfs_free_obd_stats(dev);
+        return lprocfs_obd_detach(dev);
+}
+
+static int filter_setup(struct obd_device *obd, obd_count len, void *buf)
+{
+        struct lustre_cfg* lcfg = buf;
+        int rc;
+        ENTRY;
+        /* all mount options including errors=remount-ro and asyncdel are passed
+         * using 4th lcfg param. And it is good, finally we have got rid of
+         * hardcoded fs types in the code. */
+        rc = filter_common_setup(obd, len, buf, lcfg->lcfg_inlbuf4);
+        if (rc)
+                RETURN(rc);
+        rc = filter_post_fs_setup(obd);
+        RETURN(rc);
 }
 
 static int filter_cleanup(struct obd_device *obd, int flags)
@@ -1390,6 +1471,7 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         if (filter->fo_sb == NULL)
                 RETURN(0);
 
+        filter_post_fs_cleanup(obd);
         filter_post(obd);
 
         shrink_dcache_parent(filter->fo_sb->s_root);
@@ -1412,35 +1494,6 @@ static int filter_cleanup(struct obd_device *obd, int flags)
         RETURN(0);
 }
 
-static int filter_attach(struct obd_device *obd, obd_count len, void *data)
-{
-        struct lprocfs_static_vars lvars;
-        int rc;
-
-        lprocfs_init_vars(filter, &lvars);
-        rc = lprocfs_obd_attach(obd, lvars.obd_vars);
-        if (rc != 0)
-                return rc;
-
-        rc = lprocfs_alloc_obd_stats(obd, LPROC_FILTER_LAST);
-        if (rc != 0)
-                return rc;
-
-        /* Init obdfilter private stats here */
-        lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_READ_BYTES,
-                             LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
-        lprocfs_counter_init(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
-                             LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-
-        return lproc_filter_attach_seqstat(obd);
-}
-
-static int filter_detach(struct obd_device *dev)
-{
-        lprocfs_free_obd_stats(dev);
-        return lprocfs_obd_detach(dev);
-}
-
 /* nearly identical to mds_connect */
 static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
                           struct obd_uuid *cluuid)
@@ -1526,7 +1579,7 @@ static int filter_precleanup(struct obd_device *obd, int flags)
 /* Do extra sanity checks for grant accounting.  We do this at connect,
  * disconnect, and statfs RPC time, so it shouldn't be too bad.  We can
  * always get rid of it or turn it off when we know accounting is good. */
-static void filter_grant_sanity_check(struct obd_device *obd, char *func)
+static void filter_grant_sanity_check(struct obd_device *obd, const char *func)
 {
         struct filter_export_data *fed;
         struct obd_export *exp;
@@ -1736,7 +1789,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd,
         }
 
         if (dchild->d_inode == NULL) {
-                CERROR("%s on non-existent object: "LPU64"\n", what, oa->o_id);
+                CERROR("%s: %s on non-existent object: "LPU64"\n",
+                       obd->obd_name, what, oa->o_id);
                 f_dput(dchild);
                 RETURN(ERR_PTR(-ENOENT));
         }
@@ -1775,7 +1829,7 @@ static int filter_getattr(struct obd_export *exp, struct obdo *oa,
 static int filter_setattr(struct obd_export *exp, struct obdo *oa,
                           struct lov_stripe_md *md, struct obd_trans_info *oti)
 {
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
         struct iattr iattr;
@@ -1795,7 +1849,7 @@ static int filter_setattr(struct obd_export *exp, struct obdo *oa,
 
         iattr_from_obdo(&iattr, oa, oa->o_valid);
 
-        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         lock_kernel();
 
         if (iattr.ia_valid & ATTR_SIZE)
@@ -1843,7 +1897,7 @@ out_unlock:
         if (iattr.ia_valid & ATTR_SIZE)
                 up(&dentry->d_inode->i_sem);
         unlock_kernel();
-        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
         f_dput(dentry);
         RETURN(rc);
@@ -1908,25 +1962,50 @@ static void filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
         LASSERT(oa);
 
         memset(&doa, 0, sizeof(doa));
+        if (oa->o_valid & OBD_MD_FLGROUP) {
+                doa.o_valid |= OBD_MD_FLGROUP;
+                doa.o_gr = oa->o_gr;
+        } else {
+                doa.o_gr = 0;
+        }
         doa.o_mode = S_IFREG;
         doa.o_gr = oa->o_gr;
         doa.o_valid = oa->o_valid & OBD_MD_FLGROUP;
-        last = filter_last_id(filter, oa->o_gr);
-        CWARN("deleting orphan objects from "LPU64" to "LPU64"\n",
-               oa->o_id + 1, last);
+
+        filter->fo_destroy_in_progress = 1;
+        down(&filter->fo_create_lock);
+        if (!filter->fo_destroy_in_progress) {
+                CERROR("%s: destroy_in_progress already cleared\n",
+                        exp->exp_obd->obd_name);
+                up(&filter->fo_create_lock);
+                EXIT;
+                return;
+        }
+
+        last = filter_last_id(filter, doa.o_gr);
+        CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
+               exp->exp_obd->obd_name, oa->o_id + 1, last);
         for (id = oa->o_id + 1; id <= last; id++) {
                 doa.o_id = id;
                 filter_destroy(exp, &doa, NULL, NULL);
         }
+
+        CDEBUG(D_HA, "%s: after destroy: set last_objids["LPU64"] = "LPU64"\n",
+               exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
+
         spin_lock(&filter->fo_objidlock);
         filter->fo_last_objids[doa.o_gr] = oa->o_id;
         spin_unlock(&filter->fo_objidlock);
+
+        filter->fo_destroy_in_progress = 0;
+        up(&filter->fo_create_lock);
+
         EXIT;
 }
 
 /* returns a negative error or a nonnegative number of files to create */
 static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
-                                   int group)
+                                   obd_gr group)
 {
         struct obd_device *obd = exp->exp_obd;
         struct filter_obd *filter = &obd->u.filter;
@@ -1977,12 +2056,10 @@ static int filter_should_precreate(struct obd_export *exp, struct obdo *oa,
 static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                             obd_gr group, int *num)
 {
-        struct dentry *dchild = NULL;
+        struct dentry *dchild = NULL, *dparent = NULL;
         struct filter_obd *filter;
-        struct dentry *dparent;
-        int err = 0, rc = 0, i;
+        int err = 0, rc = 0, recreate_obj = 0, i;
         __u64 next_id;
-        int recreate_obj = 0;
         void *handle = NULL;
         ENTRY;
 
@@ -1993,9 +2070,19 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                 recreate_obj = 1;
         }
 
+        CDEBUG(D_HA, "%s: precreating %d objects\n", obd->obd_name, *num);
+
+        down(&filter->fo_create_lock);
+
         for (i = 0; i < *num && err == 0; i++) {
                 int cleanup_phase = 0;
 
+                if (filter->fo_destroy_in_progress) {
+                        CWARN("%s: precreate aborted by destroy\n",
+                              obd->obd_name);
+                        break;
+                }
+
                 if (recreate_obj) {
                         __u64 last_id;
                         next_id = oa->o_id;
@@ -2004,7 +2091,7 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                                 CERROR("Error: Trying to recreate obj greater"
                                        "than last id "LPD64" > "LPD64"\n",
                                        next_id, last_id);
-                                RETURN(-EINVAL);
+                                GOTO(cleanup, rc = -EINVAL);
                         }
                 } else {
                         next_id = filter_last_id(filter, group) + 1;
@@ -2028,13 +2115,17 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
                          * already exists
                          */
                         if (recreate_obj) {
-                                CERROR("Serious error: recreating obj %*s but "
-                                       "obj already exists \n",
-                                       dchild->d_name.len, dchild->d_name.name);
+                                CERROR("%s: Serious error: recreating obj %*s "
+                                       "but obj already exists \n",
+                                       obd->obd_name, dchild->d_name.len,
+                                       dchild->d_name.name);
+                                LBUG();
                         } else {
-                                CERROR("Serious error: objid %*s already "
+                                CERROR("%s: Serious error: objid %*s already "
                                        "exists; is this filesystem corrupt?\n",
-                                       dchild->d_name.len, dchild->d_name.name);
+                                       obd->obd_name, dchild->d_name.len,
+                                       dchild->d_name.name);
+                                LBUG();
                         }
                         GOTO(cleanup, rc = -EEXIST);
                 }
@@ -2081,7 +2172,13 @@ static int filter_precreate(struct obd_device *obd, struct obdo *oa,
         }
         *num = i;
 
-        CDEBUG(D_INFO, "filter_precreate() created %d objects\n", i);
+        up(&filter->fo_create_lock);
+
+        CDEBUG(D_HA, "%s: server last_objid for group "LPU64": "LPU64"\n",
+               obd->obd_name, group, filter->fo_last_objids[group]);
+
+        CDEBUG(D_HA, "%s: filter_precreate() created %d objects\n",
+               obd->obd_name, i);
         RETURN(rc);
 }
 
@@ -2090,7 +2187,7 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = NULL;
         struct filter_obd *filter;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct lov_stripe_md *lsm = NULL;
         struct filter_export_data *fed;
         int group = oa->o_gr;
@@ -2110,7 +2207,7 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
         fed = &exp->exp_filter_data;
         filter = &obd->u.filter;
 
-        if (fed->fed_group != group) {
+        if (fed->fed_group != group && !(oa->o_valid & OBD_MD_REINT)) {
                 portals_nid2str(exp->exp_connection->c_peer.peer_ni->pni_number,
                                 exp->exp_connection->c_peer.peer_nid, str);
                 CERROR("!!! This export (nid "LPX64"/%s) used object group %d "
@@ -2132,7 +2229,8 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
                 }
         }
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        obd = exp->exp_obd;
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
             (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
@@ -2154,7 +2252,7 @@ static int filter_create(struct obd_export *exp, struct obdo *oa,
                 }
         }
 
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc && ea != NULL && *ea != lsm) {
                 obd_free_memmd(exp, &lsm);
         } else if (rc == 0 && ea != NULL) {
@@ -2176,7 +2274,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         struct obd_device *obd;
         struct filter_obd *filter;
         struct dentry *dchild = NULL, *dparent = NULL;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         void *handle = NULL;
         struct llog_cookie *fcc = NULL;
         int rc, rc2, cleanup_phase = 0, have_prepared = 0;
@@ -2187,7 +2285,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
         obd = exp->exp_obd;
         filter = &obd->u.filter;
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
 
  acquire_locks:
         dparent = filter_parent_lock(obd, oa->o_gr, oa->o_id);
@@ -2227,9 +2325,10 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                 goto acquire_locks;
         }
 
-        handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
+        handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));
+
         cleanup_phase = 3;
 
         /* Our MDC connection is established by the MDS to us */
@@ -2246,11 +2345,13 @@ cleanup:
         case 3:
                 if (fcc != NULL) {
                         if (oti != NULL)
-                                fsfilt_add_journal_cb(obd, 0, oti->oti_handle,
+                                fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
+                                                      oti->oti_handle,
                                                       filter_cancel_cookies_cb,
                                                       fcc);
                         else
-                                fsfilt_add_journal_cb(obd, 0, handle,
+                                fsfilt_add_journal_cb(obd, filter->fo_sb, 0,
+                                                      handle,
                                                       filter_cancel_cookies_cb,
                                                       fcc);
                 }
@@ -2266,7 +2367,7 @@ cleanup:
         case 1:
                 filter_parent_unlock(dparent);
         case 0:
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 break;
         default:
                 CERROR("invalid cleanup_phase %d\n", cleanup_phase);
@@ -2300,7 +2401,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
                        struct lov_stripe_md *lsm, obd_off start, obd_off end)
 {
         struct obd_device *obd = exp->exp_obd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct filter_obd *filter;
         struct dentry *dentry;
         struct llog_ctxt *ctxt;
@@ -2322,7 +2423,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         if (IS_ERR(dentry))
                 RETURN(PTR_ERR(dentry));
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
         down(&dentry->d_inode->i_sem);
         rc = filemap_fdatasync(dentry->d_inode->i_mapping);
@@ -2342,7 +2443,7 @@ static int filter_sync(struct obd_export *exp, struct obdo *oa,
         oa->o_valid = OBD_MD_FLID;
         obdo_from_inode(oa, dentry->d_inode, FILTER_VALID_FLAGS);
 
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
 
         f_dput(dentry);
         RETURN(rc);
@@ -2414,13 +2515,29 @@ static int filter_get_info(struct obd_export *exp, __u32 keylen,
                 *last_id = filter_last_id(&obd->u.filter, fed->fed_group);
                 RETURN(0);
         }
+        if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
+                /*Get log_context handle*/
+                unsigned long *llh_handle = val;
+                *vallen = sizeof(unsigned long);
+                *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
+                RETURN(0);
+        }
+        if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
+                /*Get log_context handle*/
+                unsigned long *sb = val;
+                *vallen = sizeof(unsigned long);
+                *sb = (unsigned long)obd->u.filter.fo_sb;
+                RETURN(0);
+        }
+
         CDEBUG(D_IOCTL, "invalid key\n");
         RETURN(-EINVAL);
 }
 
-struct obd_llogs * filter_grab_llog_for_group(struct obd_device *obd, int group)
+struct obd_llogs *filter_grab_llog_for_group(struct obd_device *obd, int group)
 {
         struct filter_group_llog *fglog, *nlog;
+        char name[32] = "CATLIST";
         struct filter_obd *filter;
         struct list_head *cur;
         int rc;
@@ -2456,7 +2573,7 @@ struct obd_llogs * filter_grab_llog_for_group(struct obd_device *obd, int group)
         list_add(&fglog->list, &filter->fo_llog_list);
         spin_unlock(&filter->fo_llog_list_lock);
 
-        rc = llog_cat_initialize(obd, fglog->llogs, 1);
+        rc = obd_llog_cat_initialize(obd, fglog->llogs, 1, name);
         if (rc) {
                 OBD_FREE(fglog->llogs, sizeof(*(fglog->llogs)));
                 OBD_FREE(fglog, sizeof(*fglog));
@@ -2472,7 +2589,7 @@ struct obd_llogs * filter_grab_llog_for_group(struct obd_device *obd, int group)
 static int filter_set_info(struct obd_export *exp, __u32 keylen,
                            void *key, __u32 vallen, void *val)
 {
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct filter_export_data *fed = &exp->exp_filter_data;
         struct obd_device *obd;
         struct lustre_handle conn;
@@ -2513,14 +2630,18 @@ static int filter_set_info(struct obd_export *exp, __u32 keylen,
 
         LASSERT(rc == 0);
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         rc = filter_read_groups(obd, group, 1);
-        pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+        pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         if (rc != 0) {
                 CERROR("can't read group %u\n", group);
                 RETURN(rc);
         }
-       
+        rc = filter_group_set_kml_flags(obd, group);
+         if (rc != 0) {
+                CERROR("can't set kml flags %u\n", group);
+                RETURN(rc);
+        }
         llog = filter_grab_llog_for_group(obd, group);
         LASSERT(llog != NULL);
 
@@ -2560,7 +2681,7 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
         }
 
         case OBD_IOC_CATLOGLIST: {
-                rc = llog_catlog_list(obd, 1, data);
+                rc = llog_catalog_list(obd, 1, data);
                 RETURN(rc);
         }
 
@@ -2573,9 +2694,9 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
 /*
                 struct llog_ctxt *ctxt = NULL;
 
-                push_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                push_ctxt(&saved, &ctxt->loc_ctxt, NULL);
                 rc = llog_ioctl(ctxt, cmd, data);
-                pop_ctxt(&saved, &ctxt->loc_exp->exp_obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &ctxt->loc_ctxt, NULL);
 
                 RETURN(rc);
 */
@@ -2591,13 +2712,13 @@ int filter_iocontrol(unsigned int cmd, struct obd_export *exp,
 static struct llog_operations filter_unlink_repl_logops;
 static struct llog_operations filter_size_orig_logops = {
         lop_setup: llog_obd_origin_setup,
-        lop_cleanup: llog_obd_origin_cleanup,
-        lop_add: llog_obd_origin_add
+        lop_cleanup: llog_catalog_cleanup,
+        lop_add: llog_catalog_add,
 };
 
 static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
                             struct obd_device *tgt, int count,
-                            struct llog_catid *logid)
+                            struct llog_catid *catid)
 {
         struct llog_ctxt *ctxt;
         int rc;
@@ -2608,7 +2729,7 @@ static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         filter_unlink_repl_logops.lop_connect = llog_repl_connect;
         filter_unlink_repl_logops.lop_sync = llog_obd_repl_sync;
 
-        rc = llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
+        rc = obd_llog_setup(obd, llogs, LLOG_UNLINK_REPL_CTXT, tgt, 0, NULL,
                         &filter_unlink_repl_logops);
         if (rc)
                 RETURN(rc);
@@ -2616,8 +2737,9 @@ static int filter_llog_init(struct obd_device *obd, struct obd_llogs *llogs,
         ctxt = llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT);
         ctxt->llog_proc_cb = filter_recov_log_unlink_cb;
 
-        rc = llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, NULL,
-                        &filter_size_orig_logops);
+        /* FIXME - count should be 1 to setup size log */
+        rc = obd_llog_setup(obd, llogs, LLOG_SIZE_ORIG_CTXT, tgt, 0, 
+                            &catid->lci_logid, &filter_size_orig_logops);
         RETURN(rc);
 }
 
@@ -2627,16 +2749,16 @@ static int filter_llog_finish(struct obd_device *obd,
         int rc;
         ENTRY;
 
-        rc = llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
+        rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_UNLINK_REPL_CTXT));
         if (rc)
                 RETURN(rc);
 
-        rc = llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
+        rc = obd_llog_cleanup(llog_get_context(llogs, LLOG_SIZE_ORIG_CTXT));
         RETURN(rc);
 }
 
 static int filter_llog_connect(struct obd_device *obd,
-                                struct llogd_conn_body *body) 
+                               struct llogd_conn_body *body) 
 {
         struct llog_ctxt *ctxt;
         struct obd_llogs *llog;
@@ -2669,62 +2791,64 @@ static struct lvfs_callback_ops filter_lvfs_ops = {
 };
 
 static struct obd_ops filter_obd_ops = {
-        o_owner:          THIS_MODULE,
-        o_attach:         filter_attach,
-        o_detach:         filter_detach,
-        o_get_info:       filter_get_info,
-        o_set_info:       filter_set_info,
-        o_setup:          filter_setup,
-        o_precleanup:     filter_precleanup,
-        o_cleanup:        filter_cleanup,
-        o_connect:        filter_connect,
-        o_disconnect:     filter_disconnect,
-        o_statfs:         filter_statfs,
-        o_getattr:        filter_getattr,
-        o_unpackmd:       filter_unpackmd,
-        o_create:         filter_create,
-        o_setattr:        filter_setattr,
-        o_destroy:        filter_destroy,
-        o_brw:            filter_brw,
-        o_punch:          filter_truncate,
-        o_sync:           filter_sync,
-        o_preprw:         filter_preprw,
-        o_commitrw:       filter_commitrw,
-        o_destroy_export: filter_destroy_export,
-        o_llog_init:      filter_llog_init,
-        o_llog_finish:    filter_llog_finish,
-        o_llog_connect:   filter_llog_connect,
-        o_iocontrol:      filter_iocontrol,
+        .o_owner          = THIS_MODULE,
+        .o_attach         = filter_attach,
+        .o_detach         = filter_detach,
+        .o_get_info       = filter_get_info,
+        .o_set_info       = filter_set_info,
+        .o_setup          = filter_setup,
+        .o_precleanup     = filter_precleanup,
+        .o_cleanup        = filter_cleanup,
+        .o_connect        = filter_connect,
+        .o_disconnect     = filter_disconnect,
+        .o_statfs         = filter_statfs,
+        .o_getattr        = filter_getattr,
+        .o_unpackmd       = filter_unpackmd,
+        .o_create         = filter_create,
+        .o_setattr        = filter_setattr,
+        .o_destroy        = filter_destroy,
+        .o_brw            = filter_brw,
+        .o_punch          = filter_truncate,
+        .o_sync           = filter_sync,
+        .o_preprw         = filter_preprw,
+        .o_commitrw       = filter_commitrw,
+        .o_write_extents  = filter_write_extents,
+        .o_destroy_export = filter_destroy_export,
+        .o_llog_init      = filter_llog_init,
+        .o_llog_finish    = filter_llog_finish,
+        .o_llog_connect   = filter_llog_connect,
+        .o_iocontrol      = filter_iocontrol,
 };
 
 static struct obd_ops filter_sanobd_ops = {
-        o_owner:          THIS_MODULE,
-        o_attach:         filter_attach,
-        o_detach:         filter_detach,
-        o_get_info:       filter_get_info,
-        o_set_info:       filter_set_info,
-        o_setup:          filter_san_setup,
-        o_precleanup:     filter_precleanup,
-        o_cleanup:        filter_cleanup,
-        o_connect:        filter_connect,
-        o_disconnect:     filter_disconnect,
-        o_statfs:         filter_statfs,
-        o_getattr:        filter_getattr,
-        o_unpackmd:       filter_unpackmd,
-        o_create:         filter_create,
-        o_setattr:        filter_setattr,
-        o_destroy:        filter_destroy,
-        o_brw:            filter_brw,
-        o_punch:          filter_truncate,
-        o_sync:           filter_sync,
-        o_preprw:         filter_preprw,
-        o_commitrw:       filter_commitrw,
-        o_san_preprw:     filter_san_preprw,
-        o_destroy_export: filter_destroy_export,
-        o_llog_init:      filter_llog_init,
-        o_llog_finish:    filter_llog_finish,
-        o_llog_connect:   filter_llog_connect,
-        o_iocontrol:      filter_iocontrol,
+        .o_owner          = THIS_MODULE,
+        .o_attach         = filter_attach,
+        .o_detach         = filter_detach,
+        .o_get_info       = filter_get_info,
+        .o_set_info       = filter_set_info,
+        .o_setup          = filter_san_setup,
+        .o_precleanup     = filter_precleanup,
+        .o_cleanup        = filter_cleanup,
+        .o_connect        = filter_connect,
+        .o_disconnect     = filter_disconnect,
+        .o_statfs         = filter_statfs,
+        .o_getattr        = filter_getattr,
+        .o_unpackmd       = filter_unpackmd,
+        .o_create         = filter_create,
+        .o_setattr        = filter_setattr,
+        .o_destroy        = filter_destroy,
+        .o_brw            = filter_brw,
+        .o_punch          = filter_truncate,
+        .o_sync           = filter_sync,
+        .o_preprw         = filter_preprw,
+        .o_commitrw       = filter_commitrw,
+        .o_write_extents  = filter_write_extents,
+        .o_san_preprw     = filter_san_preprw,
+        .o_destroy_export = filter_destroy_export,
+        .o_llog_init      = filter_llog_init,
+        .o_llog_finish    = filter_llog_finish,
+        .o_llog_connect   = filter_llog_connect,
+        .o_iocontrol      = filter_iocontrol,
 };
 
 static int __init obdfilter_init(void)