Whamcloud - gitweb
Add flock support.
[fs/lustre-release.git] / lustre / lvfs / llog_lvfs.c
index 6b70cf1..938cfc3 100644 (file)
@@ -204,7 +204,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 loff_t saved_offset;
 
                 /* no header: only allowed to insert record 1 */
-                if (idx != 1 && !file->f_dentry->d_inode->i_size) {
+                if (idx > 1 && !file->f_dentry->d_inode->i_size) {
                         CERROR("idx != -1 in empty log\n");
                         LBUG();
                 }
@@ -246,6 +246,9 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
                 rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
                 if (rc)
                         RETURN(rc);
+                /* if it's the last idx in log file, then return -ENOSPC */
+                if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
+                        RETURN(-ENOSPC);
         }
 
         loghandle->lgh_last_idx++;
@@ -268,15 +271,32 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         if (rc)
                 RETURN(rc);
 
+        CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n",
+               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len),
+               file->f_pos);
+
         rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos);
         if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n",
-               loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len));
         if (rc == 0 && reccookie) {
-                reccookie->lgc_lgl = loghandle->lgh_id;
-                reccookie->lgc_index = index;
+                if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
+                        LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
+                                 "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
+                                 LPU64"/"LPU64"\n",
+                                 reccookie->lgc_lgl.lgl_oid,
+                                 reccookie->lgc_lgl.lgl_ogr,
+                                 loghandle->lgh_id.lgl_oid,
+                                 loghandle->lgh_id.lgl_oid);
+                        LASSERTF(reccookie->lgc_index == index,
+                                 "lgc_index %u != index %u\n",
+                                 reccookie->lgc_index, index);
+                } else {
+                        reccookie->lgc_lgl = loghandle->lgh_id;
+                        reccookie->lgc_index = index;
+                        llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
+                }
+
                 if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
                         reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
                 else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
@@ -318,7 +338,6 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                                 int len)
 {
         struct llog_ctxt *ctxt = loghandle->lgh_ctxt;
-        int rc;
         ENTRY;
 
         if (len == 0 || len & (LLOG_CHUNK_SIZE - 1))
@@ -331,6 +350,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                 struct llog_rec_hdr *rec;
                 struct llog_rec_tail *tail;
                 loff_t ppos;
+                int nbytes, rc;
 
                 llog_skip_over(curr_offset, *curr_idx, next_idx);
 
@@ -347,21 +367,20 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
                         RETURN(rc);
                 }
 
-                /* put number of bytes read into rc to make code simpler */
-                rc = ppos - *curr_offset;
+                nbytes = ppos - *curr_offset;
                 *curr_offset = ppos;
 
-                if (rc == 0) /* end of file, nothing to do */
+                if (nbytes == 0) /* end of file, nothing to do */
                         RETURN(0);
 
-                if (rc < sizeof(*tail)) {
+                if (nbytes < sizeof(*tail)) {
                         CERROR("Invalid llog block at log id "LPU64"/%u offset "
                                LPU64"\n", loghandle->lgh_id.lgl_oid,
                                loghandle->lgh_id.lgl_ogen, *curr_offset);
                         RETURN(-EINVAL);
                 }
 
-                tail = buf + rc - sizeof(struct llog_rec_tail);
+                tail = buf + nbytes - sizeof(struct llog_rec_tail);
                 *curr_idx = le32_to_cpu(tail->lrt_index);
 
                 /* this shouldn't happen */
@@ -476,7 +495,8 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
         } else {
                 filp = l_filp_open(logname, flags, mode);
                 if (IS_ERR(filp)) {
-                        CERROR("logfile creation %s: %ld\n", logname,
+                        CERROR("logfile %s(%s): %ld\n",
+                               flags & O_CREAT ? "create" : "open", logname,
                                PTR_ERR(filp));
                 }
         }
@@ -489,128 +509,228 @@ static struct file *llog_filp_open(char *name, int flags, int mode)
 static struct file *
 llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
 {
-        unsigned int tmpname = ll_insecure_random_int();
-        char fidname[LL_FID_NAMELEN];
         struct file *filp;
-        struct dentry *new_child, *parent;
-        void *handle;
-        int rc = 0, err, namelen;
+        int rc = 0;
         ENTRY;
 
-        sprintf(fidname, "OBJECTS/%u", tmpname);
-        filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
-        if (IS_ERR(filp)) {
-                rc = PTR_ERR(filp);
-                if (rc == -EEXIST) {
-                        CERROR("impossible object name collision %u\n",
-                               tmpname);
-                        LBUG();
+        LASSERT(lgh_id != NULL);
+        if (lgh_id->lgl_oid) {
+                struct dentry *dchild;
+                char fidname[LL_FID_NAMELEN];
+                int fidlen = 0;
+
+                down(&ctxt->loc_objects_dir->d_inode->i_sem);
+                fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
+                dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
+                if (IS_ERR(dchild)) {
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN((struct file *)dchild);
+                }
+                if (dchild->d_inode == NULL) {
+                        struct dentry_params dp;
+                        struct inode *inode;
+
+                        dchild->d_fsdata = (void *) &dp;
+                        dp.p_ptr = NULL;
+                        dp.p_inum = lgh_id->lgl_oid;
+                        rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
+                                           dchild, S_IFREG, NULL);
+                        if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
+                                dchild->d_fsdata = NULL;
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                dput(dchild);
+                                up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                                RETURN(ERR_PTR(rc));
+                        }
+                        inode = dchild->d_inode;
+                        LASSERT(inode->i_ino == lgh_id->lgl_oid);
+                        inode->i_generation = lgh_id->lgl_ogen;
+                        CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+                               inode->i_ino, inode->i_generation);
+                        mark_inode_dirty(inode);
+                }
+
+                mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
+                filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
+                                    O_RDWR | O_LARGEFILE);
+                if (IS_ERR(filp)) {
+                        dput(dchild);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(filp);
+                }
+                if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
+                        CERROR("%s is not a regular file!: mode = %o\n", fidname,
+                               filp->f_dentry->d_inode->i_mode);
+                        filp_close(filp, 0);
+                        up(&ctxt->loc_objects_dir->d_inode->i_sem);
+                        RETURN(ERR_PTR(-ENOENT));
                 }
-                CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+
+                up(&ctxt->loc_objects_dir->d_inode->i_sem);
                 RETURN(filp);
-        }
 
-        namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
-                             filp->f_dentry->d_inode->i_generation);
-        parent = filp->f_dentry->d_parent;
-        down(&parent->d_inode->i_sem);
-        new_child = lookup_one_len(fidname, parent, namelen);
-        if (IS_ERR(new_child)) {
-                CERROR("getting neg dentry for obj rename: %d\n", rc);
-                GOTO(out_close, rc = PTR_ERR(new_child));
-        }
-        if (new_child->d_inode != NULL) {
-                CERROR("impossible non-negative obj dentry %lu:%u!\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation);
-                LBUG();
-        }
+        } else {
+                unsigned int tmpname = ll_insecure_random_int();
+                char fidname[LL_FID_NAMELEN];
+                struct dentry *new_child, *parent;
+                void *handle;
+                int err, namelen;
 
-        handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
-        if (IS_ERR(handle))
-                GOTO(out_dput, rc = PTR_ERR(handle));
+                sprintf(fidname, "OBJECTS/%u", tmpname);
+                filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
+                if (IS_ERR(filp)) {
+                        rc = PTR_ERR(filp);
+                        if (rc == -EEXIST) {
+                                CERROR("impossible object name collision %u\n",
+                                        tmpname);
+                                LBUG();
+                        }
+                        CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+                        RETURN(filp);
+                }
 
-        lock_kernel();
-        rc = vfs_rename(parent->d_inode, filp->f_dentry,
-                        parent->d_inode, new_child);
-        unlock_kernel();
-        if (rc)
-                CERROR("error renaming new object %lu:%u: rc %d\n",
-                       filp->f_dentry->d_inode->i_ino,
-                       filp->f_dentry->d_inode->i_generation, rc);
+                namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
+                                     filp->f_dentry->d_inode->i_generation);
+                parent = filp->f_dentry->d_parent;
+                down(&parent->d_inode->i_sem);
+                new_child = lookup_one_len(fidname, parent, namelen);
+                if (IS_ERR(new_child)) {
+                        CERROR("getting neg dentry for obj rename: %d\n", rc);
+                        GOTO(out_close, rc = PTR_ERR(new_child));
+                }
+                if (new_child->d_inode != NULL) {
+                        CERROR("impossible non-negative obj dentry %lu:%u!\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation);
+                        LBUG();
+                }
 
-        err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
-        if (!rc)
-                rc = err;
+                handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
+                if (IS_ERR(handle))
+                        GOTO(out_dput, rc = PTR_ERR(handle));
 
-out_dput:
-        dput(new_child);
-out_close:
-        up(&parent->d_inode->i_sem);
-        if (rc) {
-                filp_close(filp, 0);
-                filp = (struct file *)rc;
-        } else {
-                /* FIXME: is this group 1 is correct? */
-                lgh_id->lgl_ogr = 1;
-                lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
-                lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+                lock_kernel();
+                rc = vfs_rename(parent->d_inode, filp->f_dentry,
+                                parent->d_inode, new_child);
+                unlock_kernel();
+                if (rc)
+                        CERROR("error renaming new object %lu:%u: rc %d\n",
+                                filp->f_dentry->d_inode->i_ino,
+                                filp->f_dentry->d_inode->i_generation, rc);
+
+                err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
+                if (!rc)
+                        rc = err;
+
+        out_dput:
+                dput(new_child);
+        out_close:
+                up(&parent->d_inode->i_sem);
+                if (rc) {
+                        filp_close(filp, 0);
+                        filp = ERR_PTR(rc);
+                } else {
+                        /* FIXME: is this group 1 is correct? */
+                        lgh_id->lgl_ogr = 1;
+                        lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
+                        lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+                }
+                RETURN(filp);
         }
-
-        RETURN(filp);
 }
 
 /* creates object for generic case (obd exists) */
 static struct file *
 llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
 {
-        int rc = 0;
-        struct file *filp;
+        struct file *filp = NULL;
         struct dentry *dchild;
         struct obd_device *obd;
         struct obdo *oa = NULL;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+        int open_flags = O_RDWR | O_LARGEFILE;
+        int rc = 0;
         ENTRY;
-        
+
         obd = ctxt->loc_exp->exp_obd;
+        LASSERT(obd != NULL);
+
+        if (lgh_id->lgl_oid) {
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                             lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                if (IS_ERR(dchild) == -ENOENT) {
+                        OBD_ALLOC(oa, sizeof(*oa));
+                        if (!oa)
+                                RETURN(ERR_PTR(-ENOMEM));
+
+                        oa->o_id = lgh_id->lgl_oid;
+                        oa->o_generation = lgh_id->lgl_ogen;
+                        oa->o_gr = lgh_id->lgl_ogr;
+                        oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                        rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                        if (rc) {
+                                CDEBUG(D_INODE, "err during create: %d\n", rc);
+                                GOTO(out_free_oa, rc);
+                        }
+                        CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+
+                        dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+                                                     lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+                } else if (IS_ERR(dchild)) {
+                        CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                        RETURN((struct file *)dchild);
+                }
 
-        /* this is important to work here over obd_create() as it manages 
-           groups and we need it. Yet another reason is that mds_obd_create()
-           is fully the same as old version of this function and this helps
-           us to avoid code duplicating and layering violating. */
-        OBD_ALLOC(oa, sizeof(*oa));
-        if (!oa)
-                RETURN(ERR_PTR(-ENOMEM));
-                
-        oa->o_gr = FILTER_GROUP_LLOG;
-        oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
-        rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
-        if (rc)
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        rc = PTR_ERR(filp);
+                        CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
+                               lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+                }
                 GOTO(out_free_oa, rc);
+        } else {
+                /* this is important to work here over obd_create() as it manages 
+                  groups and we need it. Yet another reason is that mds_obd_create()
+                 is fully the same as old version of this function and this helps
+                 us to avoid code duplicating and layering violating. */
+                OBD_ALLOC(oa, sizeof(*oa));
+                if (!oa)
+                        RETURN(ERR_PTR(-ENOMEM));
 
-        dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
-                                     oa->o_generation, oa->o_gr);
-        if (IS_ERR(dchild))
-                GOTO(out_free_oa, rc = PTR_ERR(dchild));
-                
-        filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
-                             open_flags);
-        if (IS_ERR(filp)) {
-                l_dput(dchild);
-                GOTO(out_free_oa, rc = PTR_ERR(filp));
+                oa->o_gr = FILTER_GROUP_LLOG;
+                oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+                rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+                if (rc)
+                        GOTO(out_free_oa, rc);
+
+                dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+                                             oa->o_generation, oa->o_gr);
+                if (IS_ERR(dchild))
+                        GOTO(out_free_oa, rc = PTR_ERR(dchild));
+
+                filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+                                     open_flags);
+                if (IS_ERR(filp)) {
+                        l_dput(dchild);
+                        GOTO(out_free_oa, rc = PTR_ERR(filp));
+                }
+
+                /* group 1 is not longer valid, we use the group which is set 
+                by obd_create()->mds_obd_create(). */
+                lgh_id->lgl_ogr = oa->o_gr;
+                lgh_id->lgl_oid = oa->o_id;
+                lgh_id->lgl_ogen = oa->o_generation;
         }
 
-        /* group 1 is not longer valid, we use the group which is set 
-           by obd_create()->mds_obd_create(). */
-        lgh_id->lgl_ogr = oa->o_gr;
-        lgh_id->lgl_oid = oa->o_id;
-        lgh_id->lgl_ogen = oa->o_generation;
-        OBD_FREE(oa, sizeof(*oa));
-        RETURN(filp);
-        
 out_free_oa:
-        OBD_FREE(oa, sizeof(*oa));
-        RETURN(ERR_PTR(rc));
+        if (rc)
+                filp = ERR_PTR(rc);
+        if (oa)
+                OBD_FREE(oa, sizeof(*oa));
+        RETURN(filp);
 }
 
 static struct file *
@@ -664,15 +784,18 @@ out:
         RETURN(rc);
 }
 
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         struct llog_handle *handle;
         struct lvfs_run_ctxt saved;
         int rc = 0;
-        int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+        int open_flags = O_RDWR | O_LARGEFILE;
         ENTRY;
-        
+
+        if (flags & OBD_LLOG_FL_CREATE)
+                open_flags |= O_CREAT;
+
         handle = llog_alloc_handle();
         if (handle == NULL)
                 RETURN(-ENOMEM);
@@ -683,24 +806,15 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                 push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
         
         if (logid != NULL) {
-                char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
-                char fidname[LL_FID_NAMELEN];
-                ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
-                strcat(logname, fidname);
-                
-                handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
+                handle->lgh_file = llog_object_create(ctxt, logid);
                 if (IS_ERR(handle->lgh_file)) {
-                        CERROR("cannot open %s file, error = %ld\n", 
-                               logname, PTR_ERR(handle->lgh_file));
+                        CERROR("cannot create/open llog object "LPX64":%x "
+                               "error = %ld", logid->lgl_oid, logid->lgl_ogen,
+                               PTR_ERR(handle->lgh_file));
                         GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
                 }
-                if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
-                        CERROR("%s is not a regular file!: mode = %o\n", logname,
-                               handle->lgh_file->f_dentry->d_inode->i_mode);
-                        GOTO(cleanup, rc = -ENOENT);
-                }
-                LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
                 handle->lgh_id = *logid;
+
         } else if (name) {
                 handle->lgh_file = llog_filp_open(name, open_flags, 0644);
                 if (IS_ERR(handle->lgh_file)) {
@@ -716,6 +830,7 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
                 rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
                 if (rc)
                         GOTO(cleanup, rc);
+
         } else {
                 handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
                 if (IS_ERR(handle->lgh_file)) {
@@ -937,7 +1052,7 @@ int llog_put_cat_list(struct lvfs_run_ctxt *ctxt,
 EXPORT_SYMBOL(llog_put_cat_list);
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,
@@ -964,8 +1079,8 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle,
         return 0;
 }
 
-static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res,
-                            struct llog_logid *logid, char *name)
+static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res,
+                          struct llog_logid *logid, char *name, int flags)
 {
         LBUG();
         return 0;
@@ -1014,7 +1129,7 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx,
 }
 
 struct llog_operations llog_lvfs_ops = {
-        lop_create:      llog_lvfs_create,
+        lop_open:        llog_lvfs_open,
         lop_destroy:     llog_lvfs_destroy,
         lop_close:       llog_lvfs_close,
         lop_read_header: llog_lvfs_read_header,