Whamcloud - gitweb
Add flock support.
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
index 80e60e5..938cfc3 100644 (file)
@@ -204,7 +204,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 loff_t saved_offset;
 
                 /* no header: only allowed to insert record 1 */
-                if (idx != 1 && !file->f_dentry->d_inode->i_size) {
+                if (idx > 1 && !file->f_dentry->d_inode->i_size) {
                         CERROR("idx != -1 in empty log\n");
                         LBUG();
                 }
@@ -246,6 +246,9 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
                 if (rc)
                         RETURN(rc);
+                /* if it's the last idx in log file, then return -ENOSPC */
+                if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
+                        RETURN(-ENOSPC);
         }
 
         loghandle->lgh_last_idx++;
@@ -268,15 +271,32 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         if (rc)
                 RETURN(rc);
 
+        CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
+               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
+               file->f_pos);
+
         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
         if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
-               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
         if (rc == 0 && reccookie) {
-                reccookie->lgc_lgl = loghandle->lgh_id;
-                reccookie->lgc_index = index;
+                if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
+                        LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
+                                 "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
+                                 LPU64"/"LPU64"\n",
+                                 reccookie->lgc_lgl.lgl_oid,
+                                 reccookie->lgc_lgl.lgl_ogr,
+                                 loghandle->lgh_id.lgl_oid,
+                                 loghandle->lgh_id.lgl_oid);
+                        LASSERTF(reccookie->lgc_index == index,
+                                 "lgc_index %u != index %u\n",
+                                 reccookie->lgc_index, index);
+                } else {
+                        reccookie->lgc_lgl = loghandle->lgh_id;
+                        reccookie->lgc_index = index;
+                        llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
+                }
+
                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
@@ -318,7 +338,6 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                                 int len)
 {
         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
-        int rc;
         ENTRY;
 
         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
@@ -331,6 +350,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                 struct llog_rec_hdr *rec;
                 struct llog_rec_tail *tail;
                 loff_t ppos;
+                int nbytes, rc;
 
                 llog_skip_over(curr_offset, *curr_idx, next_idx);
 
@@ -347,21 +367,20 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                         RETURN(rc);
                 }
 
-                /* put number of bytes read into rc to make code simpler */
-                rc = ppos - *curr_offset;
+                nbytes = ppos - *curr_offset;
                 *curr_offset = ppos;
 
-                if (rc == 0) /* end of file, nothing to do */
+                if (nbytes == 0) /* end of file, nothing to do */
                         RETURN(0);
 
-                if (rc < sizeof(*tail)) {
+                if (nbytes < sizeof(*tail)) {
                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
                                LPU64"\n", loghandle->lgh_id.lgl_oid,
                                loghandle->lgh_id.lgl_ogen, *curr_offset);
                         RETURN(-EINVAL);
                 }
 
-                tail = buf + rc - sizeof(struct llog_rec_tail);
+                tail = buf + nbytes - sizeof(struct llog_rec_tail);
                 *curr_idx = le32_to_cpu(tail->lrt_index);
 
                 /* this shouldn't happen */
@@ -476,7 +495,8 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
         } else {
                 filp = l_filp_open(logname, flags, mode);
                 if (IS_ERR(filp)) {
-                        CERROR("logfile creation %s: %ld\n", logname,
+                        CERROR("logfile %s(%s): %ld\n",
+                               flags & O_CREAT ? "create" : "open", logname,
                                PTR_ERR(filp));
                 }
         }
@@ -485,73 +505,243 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
         return filp;
 }
 
-static struct file *llog_object_create(struct llog_ctxt *ctxt)
+/* creates object for the case when we have no obd (smfs). */
+static struct file *
+llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
 {
-        unsigned int tmpname = ll_insecure_random_int();
-        char fidname[LL_FID_NAMELEN];
         struct file *filp;
-        struct dentry *new_child, *parent;
-        void *handle;
-        int rc = 0, err, namelen;
+        int rc = 0;
         ENTRY;
 
-        sprintf(fidname, "OBJECTS/%u", tmpname);
-        filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
-        if (IS_ERR(filp)) {
-                rc = PTR_ERR(filp);
-                if (rc == -EEXIST) {
-                        CERROR("impossible object name collision %u\n",
-                               tmpname);
+        LASSERT(lgh_id != NULL);
+        if (lgh_id->lgl_oid) {
+                struct dentry *dchild;
+                char fidname[LL_FID_NAMELEN];
+                int fidlen = 0;
+
+                down(&ctxt->loc_objects_dir->d_inode->i_sem);
+                fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
+                dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
+                if (IS_ERR(dchild)) {
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN((struct file *)dchild);
+                }
+                if (dchild->d_inode == NULL) {
+                        struct dentry_params dp;
+                        struct inode *inode;
+
+                        dchild->d_fsdata = (void *) &dp;
+                        dp.p_ptr = NULL;
+                        dp.p_inum = lgh_id->lgl_oid;
+                        rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
+                                           dchild, S_IFREG, NULL);
+                        if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
+                                dchild->d_fsdata = NULL;
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                dput(dchild);
+                                up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                                RETURN(ERR_PTR(rc));
+                        }
+                        inode = dchild->d_inode;
+                        LASSERT(inode->i_ino == lgh_id->lgl_oid);
+                        inode->i_generation = lgh_id->lgl_ogen;
+                        CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+                               inode->i_ino, inode->i_generation);
+                        mark_inode_dirty(inode);
+                }
+
+                mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
+                filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
+                                    O_RDWR | O_LARGEFILE);
+                if (IS_ERR(filp)) {
+                        dput(dchild);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(filp);
+                }
+                if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
+                        CERROR("%s is not a regular file!: mode = %o\n", fidname,
+                               filp->f_dentry->d_inode->i_mode);
+                        filp_close(filp, 0);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(ERR_PTR(-ENOENT));
+                }
+
+                up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                RETURN(filp);
+
+        } else {
+                unsigned int tmpname = ll_insecure_random_int();
+                char fidname[LL_FID_NAMELEN];
+                struct dentry *new_child, *parent;
+                void *handle;
+                int err, namelen;
+
+                sprintf(fidname, "OBJECTS/%u", tmpname);
+                filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
+                if (IS_ERR(filp)) {
+                        rc = PTR_ERR(filp);
+                        if (rc == -EEXIST) {
+                                CERROR("impossible object name collision %u\n",
+                                        tmpname);
+                                LBUG();
+                        }
+                        CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                        RETURN(filp);
+                }
+
+                namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
+                                     filp->f_dentry->d_inode->i_generation);
+                parent = filp->f_dentry->d_parent;
+                down(&parent->d_inode->i_sem);
+                new_child = lookup_one_len(fidname, parent, namelen);
+                if (IS_ERR(new_child)) {
+                        CERROR("getting neg dentry for obj rename: %d\n", rc);
+                        GOTO(out_close, rc = PTR_ERR(new_child));
+                }
+                if (new_child->d_inode != NULL) {
+                        CERROR("impossible non-negative obj dentry %lu:%u!\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation);
                         LBUG();
                 }
-                CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+
+                handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
+                if (IS_ERR(handle))
+                        GOTO(out_dput, rc = PTR_ERR(handle));
+
+                lock_kernel();
+                rc = vfs_rename(parent->d_inode, filp->f_dentry,
+                                parent->d_inode, new_child);
+                unlock_kernel();
+                if (rc)
+                        CERROR("error renaming new object %lu:%u: rc %d\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation, rc);
+
+                err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
+                if (!rc)
+                        rc = err;
+
+        out_dput:
+                dput(new_child);
+        out_close:
+                up(&parent->d_inode->i_sem);
+                if (rc) {
+                        filp_close(filp, 0);
+                        filp = ERR_PTR(rc);
+                } else {
+                        /* FIXME: is this group 1 is correct? */
+                        lgh_id->lgl_ogr = 1;
+                        lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
+                        lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+                }
                 RETURN(filp);
         }
+}
 
-        namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
-                             filp->f_dentry->d_inode->i_generation);
-        parent = filp->f_dentry->d_parent;
-        down(&parent->d_inode->i_sem);
-        new_child = lookup_one_len(fidname, parent, namelen);
-        if (IS_ERR(new_child)) {
-                CERROR("getting neg dentry for obj rename: %d\n", rc);
-                GOTO(out_close, rc = PTR_ERR(new_child));
-        }
-        if (new_child->d_inode != NULL) {
-                CERROR("impossible non-negative obj dentry %lu:%u!\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation);
-                LBUG();
-        }
+/* creates object for generic case (obd exists) */
+static struct file *
+llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
+{
+        struct file *filp = NULL;
+        struct dentry *dchild;
+        struct obd_device *obd;
+        struct obdo *oa = NULL;
+        int open_flags = O_RDWR | O_LARGEFILE;
+        int rc = 0;
+        ENTRY;
 
-        handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
-        if (IS_ERR(handle))
-                GOTO(out_dput, rc = PTR_ERR(handle));
+        obd = ctxt->loc_exp->exp_obd;
+        LASSERT(obd != NULL);
+
+        if (lgh_id->lgl_oid) {
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                             lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                if (IS_ERR(dchild) == -ENOENT) {
+                        OBD_ALLOC(oa, sizeof(*oa));
+                        if (!oa)
+                                RETURN(ERR_PTR(-ENOMEM));
+
+                        oa->o_id = lgh_id->lgl_oid;
+                        oa->o_generation = lgh_id->lgl_ogen;
+                        oa->o_gr = lgh_id->lgl_ogr;
+                        oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                        rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                GOTO(out_free_oa, rc);
+                        }
+                        CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+
+                        dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                                     lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                } else if (IS_ERR(dchild)) {
+                        CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                        RETURN((struct file *)dchild);
+                }
 
-        lock_kernel();
-        rc = vfs_rename(parent->d_inode, filp->f_dentry,
-                        parent->d_inode, new_child);
-        unlock_kernel();
-        if (rc)
-                CERROR("error renaming new object %lu:%u: rc %d\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation, rc);
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        rc = PTR_ERR(filp);
+                        CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                }
+                GOTO(out_free_oa, rc);
+        } else {
+                /* this is important to work here over obd_create() as it manages 
+                  groups and we need it. Yet another reason is that mds_obd_create()
+                 is fully the same as old version of this function and this helps
+                 us to avoid code duplicating and layering violating. */
+                OBD_ALLOC(oa, sizeof(*oa));
+                if (!oa)
+                        RETURN(ERR_PTR(-ENOMEM));
+
+                oa->o_gr = FILTER_GROUP_LLOG;
+                oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                if (rc)
+                        GOTO(out_free_oa, rc);
 
-        err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
-        if (!rc)
-                rc = err;
-out_dput:
-        dput(new_child);
-out_close:
-        up(&parent->d_inode->i_sem);
-        if (rc) {
-                filp_close(filp, 0);
-                filp = (struct file *)rc;
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+                                             oa->o_generation, oa->o_gr);
+                if (IS_ERR(dchild))
+                        GOTO(out_free_oa, rc = PTR_ERR(dchild));
+
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                     open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        GOTO(out_free_oa, rc = PTR_ERR(filp));
+                }
+
+                /* group 1 is not longer valid, we use the group which is set 
+                by obd_create()->mds_obd_create(). */
+                lgh_id->lgl_ogr = oa->o_gr;
+                lgh_id->lgl_oid = oa->o_id;
+                lgh_id->lgl_ogen = oa->o_generation;
         }
 
+out_free_oa:
+        if (rc)
+                filp = ERR_PTR(rc);
+        if (oa)
+                OBD_FREE(oa, sizeof(*oa));
         RETURN(filp);
 }
 
+static struct file *
+llog_object_create(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
+{
+        if (ctxt->loc_alone)
+                return llog_object_create_alone(ctxt, lgh_id);
+        else
+                return llog_object_create_generic(ctxt, lgh_id);
+}
+
 static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
                                 struct dentry *dentry)
 {
@@ -560,7 +750,7 @@ static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
         void *handle;
         int namelen, rc = 0, err;
         ENTRY;
-
+        
         namelen = ll_fid2str(fidname, logid.lgl_oid, logid.lgl_ogen);
         down(&ctxt->loc_objects_dir->d_inode->i_sem);
         new_child = lookup_one_len(fidname, ctxt->loc_objects_dir, namelen);
@@ -579,7 +769,7 @@ static int llog_add_link_object(struct llog_ctxt *ctxt, struct llog_logid logid,
                                    FSFILT_OP_LINK, NULL);
         if (IS_ERR(handle))
                 GOTO(out_dput, rc = PTR_ERR(handle));
-
+        
         lock_kernel();
         rc = vfs_link(dentry, ctxt->loc_objects_dir->d_inode, new_child);
         unlock_kernel();
@@ -594,67 +784,64 @@ out:
         RETURN(rc);
 }
 
-/* This is a callback from the llog_* functions.
- * Assumes caller has already pushed us into the kernel context. */
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         struct llog_handle *handle;
         struct lvfs_run_ctxt saved;
         int rc = 0;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+        int open_flags = O_RDWR | O_LARGEFILE;
         ENTRY;
 
+        if (flags & OBD_LLOG_FL_CREATE)
+                open_flags |= O_CREAT;
+
         handle = llog_alloc_handle();
         if (handle == NULL)
                 RETURN(-ENOMEM);
         *res = handle;
-
+        
         LASSERT(ctxt);
         if (ctxt->loc_lvfs_ctxt)
                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
-
+        
         if (logid != NULL) {
-                char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
-                char fidname[LL_FID_NAMELEN];
-                ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
-                strcat(logname, fidname);
-
-                handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
+                handle->lgh_file = llog_object_create(ctxt, logid);
                 if (IS_ERR(handle->lgh_file)) {
-                        CERROR("cannot open %s file\n", logname);
+                        CERROR("cannot create/open llog object "LPX64":%x "
+                               "error = %ld", logid->lgl_oid, logid->lgl_ogen,
+                               PTR_ERR(handle->lgh_file));
                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
                 }
-                if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
-                        CERROR("%s is not a regular file!: mode = %o\n", logname,
-                               handle->lgh_file->f_dentry->d_inode->i_mode);
-                        GOTO(cleanup, rc = -ENOENT);
-                }
-                LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
                 handle->lgh_id = *logid;
+
         } else if (name) {
                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
-                if (IS_ERR(handle->lgh_file))
+                if (IS_ERR(handle->lgh_file)) {
+                        CERROR("cannot open %s file, error = %ld\n", 
+                               name, PTR_ERR(handle->lgh_file));
                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
+                }
                 LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_logs_dir);
-
+                
+                handle->lgh_id.lgl_ogr = 1;
                 handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
                 handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
                 if (rc)
                         GOTO(cleanup, rc);
+
         } else {
-                handle->lgh_file = llog_object_create(ctxt);
-                if (IS_ERR(handle->lgh_file))
+                handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
+                if (IS_ERR(handle->lgh_file)) {
+                        CERROR("cannot create llog object, error = %ld\n", 
+                               PTR_ERR(handle->lgh_file));
                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
-                LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
-                handle->lgh_id.lgl_oid = handle->lgh_file->f_dentry->d_inode->i_ino;
-                handle->lgh_id.lgl_ogen = handle->lgh_file->f_dentry->d_inode->i_generation;
+                }
         }
 
-        handle->lgh_id.lgl_ogr = 1;
         handle->lgh_ctxt = ctxt;
- finish:
+finish:
         if (ctxt->loc_lvfs_ctxt)
                 pop_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
         RETURN(rc);
@@ -684,16 +871,16 @@ static int llog_lvfs_destroy(struct llog_handle *loghandle)
         void *handle;
         int rc = -EINVAL, err, namelen;
         ENTRY;
-
+        
         if (ctxt->loc_lvfs_ctxt)
                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
-
+        
         fdentry = loghandle->lgh_file->f_dentry;
         parent_inode = fdentry->d_parent->d_inode;
-
+        
         if (!strcmp(fdentry->d_parent->d_name.name, "LOGS")) {
                 LASSERT(parent_inode == ctxt->loc_logs_dir->d_inode);
-
+                
                 namelen = ll_fid2str(fidname, fdentry->d_inode->i_ino,
                                      fdentry->d_inode->i_generation);
                 dget(fdentry);
@@ -702,19 +889,19 @@ static int llog_lvfs_destroy(struct llog_handle *loghandle)
                         dput(fdentry);
                         GOTO(out, rc);
                 }
-
+                
                 handle = llog_fsfilt_start(ctxt, parent_inode,
                                            FSFILT_OP_UNLINK, NULL);
                 if (IS_ERR(handle)) {
                         dput(fdentry);
                         GOTO(out, rc = PTR_ERR(handle));
                 }
-
+                
                 down(&parent_inode->i_sem);
                 rc = vfs_unlink(parent_inode, fdentry);
                 up(&parent_inode->i_sem);
                 dput(fdentry);
-
+                
                 if (!rc) {
                         down(&ctxt->loc_objects_dir->d_inode->i_sem);
                         fdentry = lookup_one_len(fidname, ctxt->loc_objects_dir,
@@ -732,21 +919,41 @@ out_err:
                 err = llog_fsfilt_commit(ctxt, parent_inode, handle, 0);
                 if (err && !rc)
                         err = rc;
-
+                
                 GOTO(out, rc);
         }
-
-        if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
-                LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
-
-                dget(fdentry);
-                rc = llog_lvfs_close(loghandle);
-                if (rc == 0) {
-                        down(&parent_inode->i_sem);
-                        rc = vfs_unlink(parent_inode, fdentry);
-                        up(&parent_inode->i_sem);
+        if (ctxt->loc_alone) {
+                if (!strcmp(fdentry->d_parent->d_name.name, "OBJECTS")) {
+                        LASSERT(parent_inode == ctxt->loc_objects_dir->d_inode);
+                        
+                        dget(fdentry);
+                        rc = llog_lvfs_close(loghandle);
+                        if (rc == 0) {
+                                down(&parent_inode->i_sem);
+                                rc = vfs_unlink(parent_inode, fdentry);
+                                up(&parent_inode->i_sem);
+                        }
+                        dput(fdentry);
                 }
-                dput(fdentry);
+        } else {
+                struct obdo *oa = NULL;
+                OBD_ALLOC(oa, sizeof(*oa));
+                if (!oa)
+                        GOTO(out, rc = -ENOMEM);
+                
+                oa->o_id = loghandle->lgh_id.lgl_oid;
+                oa->o_gr = loghandle->lgh_id.lgl_ogr;
+                oa->o_generation = loghandle->lgh_id.lgl_ogen;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLGROUP | OBD_MD_FLGENER;
+                
+                rc = llog_lvfs_close(loghandle);
+                if (rc)
+                        GOTO(out_free_oa, rc);
+                
+                rc = obd_destroy(loghandle->lgh_ctxt->loc_exp, oa, NULL, NULL);
+out_free_oa:
+                OBD_FREE(oa, sizeof(*oa));
         }
 out:
         if (ctxt->loc_lvfs_ctxt)
@@ -845,7 +1052,7 @@ int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
 EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,
@@ -872,8 +1079,8 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         return 0;
 }
 
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         LBUG();
         return 0;
@@ -922,7 +1129,7 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
 }
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,