Whamcloud - gitweb
b=3550
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
index 6a4c74f..5f57b64 100644 (file)
@@ -277,8 +277,15 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 RETURN(rc);
 
         if (rc == 0 && reccookie) {
-                reccookie->lgc_lgl = loghandle->lgh_id;
-                reccookie->lgc_index = index;
+                if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
+                        LASSERT(EQ_LOGID(reccookie->lgc_lgl, loghandle->lgh_id));
+                        LASSERT(reccookie->lgc_index == index);        
+                } else {
+                        reccookie->lgc_lgl = loghandle->lgh_id;
+                        reccookie->lgc_index = index;
+                        llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
+                }
+
                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
@@ -491,128 +498,228 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
 static struct file *
 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
 {
-        unsigned int tmpname = ll_insecure_random_int();
-        char fidname[LL_FID_NAMELEN];
         struct file *filp;
-        struct dentry *new_child, *parent;
-        void *handle;
-        int rc = 0, err, namelen;
+        int rc = 0;
         ENTRY;
 
-        sprintf(fidname, "OBJECTS/%u", tmpname);
-        filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
-        if (IS_ERR(filp)) {
-                rc = PTR_ERR(filp);
-                if (rc == -EEXIST) {
-                        CERROR("impossible object name collision %u\n",
-                               tmpname);
-                        LBUG();
+        LASSERT(lgh_id != NULL);
+        if (lgh_id->lgl_oid) {
+                struct dentry *dchild;
+                char fidname[LL_FID_NAMELEN];
+                int fidlen = 0;
+
+                down(&ctxt->loc_objects_dir->d_inode->i_sem);
+                fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
+                dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
+                if (IS_ERR(dchild)) {
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN((struct file *)dchild);
                 }
-                CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                if (dchild->d_inode == NULL) {
+                        struct dentry_params dp;
+                        struct inode *inode;
+
+                        dchild->d_fsdata = (void *) &dp;
+                        dp.p_ptr = NULL;
+                        dp.p_inum = lgh_id->lgl_oid;
+                        rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
+                                           dchild, S_IFREG, NULL);
+                        if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
+                                dchild->d_fsdata = NULL;
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                dput(dchild);
+                                up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                                RETURN(ERR_PTR(rc));
+                        }
+                        inode = dchild->d_inode;
+                        LASSERT(inode->i_ino == lgh_id->lgl_oid);
+                        inode->i_generation = lgh_id->lgl_ogen;
+                        CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+                               inode->i_ino, inode->i_generation);
+                        mark_inode_dirty(inode);
+                }
+
+                mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
+                filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
+                                    O_RDWR | O_LARGEFILE);
+                if (IS_ERR(filp)) {
+                        dput(dchild);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(filp);
+                }
+                if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
+                        CERROR("%s is not a regular file!: mode = %o\n", fidname,
+                               filp->f_dentry->d_inode->i_mode);
+                        filp_close(filp, 0);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(ERR_PTR(-ENOENT));
+                }
+
+                up(&ctxt->loc_objects_dir->d_inode->i_sem);
                 RETURN(filp);
-        }
 
-        namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
-                             filp->f_dentry->d_inode->i_generation);
-        parent = filp->f_dentry->d_parent;
-        down(&parent->d_inode->i_sem);
-        new_child = lookup_one_len(fidname, parent, namelen);
-        if (IS_ERR(new_child)) {
-                CERROR("getting neg dentry for obj rename: %d\n", rc);
-                GOTO(out_close, rc = PTR_ERR(new_child));
-        }
-        if (new_child->d_inode != NULL) {
-                CERROR("impossible non-negative obj dentry %lu:%u!\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation);
-                LBUG();
-        }
+        } else {
+                unsigned int tmpname = ll_insecure_random_int();
+                char fidname[LL_FID_NAMELEN];
+                struct dentry *new_child, *parent;
+                void *handle;
+                int err, namelen;
 
-        handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
-        if (IS_ERR(handle))
-                GOTO(out_dput, rc = PTR_ERR(handle));
+                sprintf(fidname, "OBJECTS/%u", tmpname);
+                filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
+                if (IS_ERR(filp)) {
+                        rc = PTR_ERR(filp);
+                        if (rc == -EEXIST) {
+                                CERROR("impossible object name collision %u\n",
+                                        tmpname);
+                                LBUG();
+                        }
+                        CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                        RETURN(filp);
+                }
 
-        lock_kernel();
-        rc = vfs_rename(parent->d_inode, filp->f_dentry,
-                        parent->d_inode, new_child);
-        unlock_kernel();
-        if (rc)
-                CERROR("error renaming new object %lu:%u: rc %d\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation, rc);
+                namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
+                                     filp->f_dentry->d_inode->i_generation);
+                parent = filp->f_dentry->d_parent;
+                down(&parent->d_inode->i_sem);
+                new_child = lookup_one_len(fidname, parent, namelen);
+                if (IS_ERR(new_child)) {
+                        CERROR("getting neg dentry for obj rename: %d\n", rc);
+                        GOTO(out_close, rc = PTR_ERR(new_child));
+                }
+                if (new_child->d_inode != NULL) {
+                        CERROR("impossible non-negative obj dentry %lu:%u!\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation);
+                        LBUG();
+                }
 
-        err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
-        if (!rc)
-                rc = err;
+                handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
+                if (IS_ERR(handle))
+                        GOTO(out_dput, rc = PTR_ERR(handle));
 
-out_dput:
-        dput(new_child);
-out_close:
-        up(&parent->d_inode->i_sem);
-        if (rc) {
-                filp_close(filp, 0);
-                filp = (struct file *)rc;
-        } else {
-                /* FIXME: is this group 1 is correct? */
-                lgh_id->lgl_ogr = 1;
-                lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
-                lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+                lock_kernel();
+                rc = vfs_rename(parent->d_inode, filp->f_dentry,
+                                parent->d_inode, new_child);
+                unlock_kernel();
+                if (rc)
+                        CERROR("error renaming new object %lu:%u: rc %d\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation, rc);
+
+                err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
+                if (!rc)
+                        rc = err;
+
+        out_dput:
+                dput(new_child);
+        out_close:
+                up(&parent->d_inode->i_sem);
+                if (rc) {
+                        filp_close(filp, 0);
+                        filp = ERR_PTR(rc);
+                } else {
+                        /* FIXME: is this group 1 is correct? */
+                        lgh_id->lgl_ogr = 1;
+                        lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
+                        lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+                }
+                RETURN(filp);
         }
-
-        RETURN(filp);
 }
 
 /* creates object for generic case (obd exists) */
 static struct file *
 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
 {
-        int rc = 0;
         struct file *filp;
         struct dentry *dchild;
         struct obd_device *obd;
         struct obdo *oa = NULL;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+        int open_flags = O_RDWR | O_LARGEFILE;
+        int rc = 0;
         ENTRY;
-        
+
         obd = ctxt->loc_exp->exp_obd;
+        LASSERT(obd != NULL);
+
+        if (lgh_id->lgl_oid) {
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                             lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                if (IS_ERR(dchild) == -ENOENT) {
+                        OBD_ALLOC(oa, sizeof(*oa));
+                        if (!oa)
+                                RETURN(ERR_PTR(-ENOMEM));
+
+                        oa->o_id = lgh_id->lgl_oid;
+                        oa->o_generation = lgh_id->lgl_ogen;
+                        oa->o_gr = lgh_id->lgl_ogr;
+                        oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                        rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                GOTO(out_free_oa, rc);
+                        }
+                        CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+
+                        dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                                     lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                } else if (IS_ERR(dchild)) {
+                        CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                        RETURN((struct file *)dchild);
+                }
 
-        /* this is important to work here over obd_create() as it manages 
-           groups and we need it. Yet another reason is that mds_obd_create()
-           is fully the same as old version of this function and this helps
-           us to avoid code duplicating and layering violating. */
-        OBD_ALLOC(oa, sizeof(*oa));
-        if (!oa)
-                RETURN(ERR_PTR(-ENOMEM));
-                
-        oa->o_gr = FILTER_GROUP_LLOG;
-        oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
-        rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
-        if (rc)
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        rc = PTR_ERR(filp);
+                        CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                }
                 GOTO(out_free_oa, rc);
-
-        dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
-                                     oa->o_generation, oa->o_gr);
-        if (IS_ERR(dchild))
-                GOTO(out_free_oa, rc = PTR_ERR(dchild));
+        } else {
+                /* this is important to work here over obd_create() as it manages 
+                  groups and we need it. Yet another reason is that mds_obd_create()
+                 is fully the same as old version of this function and this helps
+                 us to avoid code duplicating and layering violating. */
+                OBD_ALLOC(oa, sizeof(*oa));
+                if (!oa)
+                        RETURN(ERR_PTR(-ENOMEM));
                 
-        filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                             open_flags);
-        if (IS_ERR(filp)) {
-                l_dput(dchild);
-                GOTO(out_free_oa, rc = PTR_ERR(filp));
+                oa->o_gr = FILTER_GROUP_LLOG;
+                oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                if (rc)
+                        GOTO(out_free_oa, rc);
+
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+                                             oa->o_generation, oa->o_gr);
+                if (IS_ERR(dchild))
+                        GOTO(out_free_oa, rc = PTR_ERR(dchild));
+
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                     open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        GOTO(out_free_oa, rc = PTR_ERR(filp));
+                }
+
+                /* group 1 is not longer valid, we use the group which is set 
+                by obd_create()->mds_obd_create(). */
+                lgh_id->lgl_ogr = oa->o_gr;
+                lgh_id->lgl_oid = oa->o_id;
+                lgh_id->lgl_ogen = oa->o_generation;
         }
 
-        /* group 1 is not longer valid, we use the group which is set 
-           by obd_create()->mds_obd_create(). */
-        lgh_id->lgl_ogr = oa->o_gr;
-        lgh_id->lgl_oid = oa->o_id;
-        lgh_id->lgl_ogen = oa->o_generation;
-        OBD_FREE(oa, sizeof(*oa));
-        RETURN(filp);
-        
 out_free_oa:
-        OBD_FREE(oa, sizeof(*oa));
-        RETURN(ERR_PTR(rc));
+        if (rc)
+                filp = ERR_PTR(rc);
+        if (oa)
+                OBD_FREE(oa, sizeof(*oa));
+        RETURN(filp);
 }
 
 static struct file *
@@ -688,24 +795,15 @@ static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
         
         if (logid != NULL) {
-                char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
-                char fidname[LL_FID_NAMELEN];
-                ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
-                strcat(logname, fidname);
-                
-                handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
+                handle->lgh_file = llog_object_create(ctxt, logid);
                 if (IS_ERR(handle->lgh_file)) {
-                        CERROR("cannot open %s file, error = %ld\n", 
-                               logname, PTR_ERR(handle->lgh_file));
+                        CERROR("cannot create/open llog object "LPX64":%x "
+                               "error = %ld", logid->lgl_oid, logid->lgl_ogen,
+                               PTR_ERR(handle->lgh_file));
                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
                 }
-                if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
-                        CERROR("%s is not a regular file!: mode = %o\n", logname,
-                               handle->lgh_file->f_dentry->d_inode->i_mode);
-                        GOTO(cleanup, rc = -ENOENT);
-                }
-                LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
                 handle->lgh_id = *logid;
+
         } else if (name) {
                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
                 if (IS_ERR(handle->lgh_file)) {
@@ -721,6 +819,7 @@ static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
                 if (rc)
                         GOTO(cleanup, rc);
+
         } else {
                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
                 if (IS_ERR(handle->lgh_file)) {