X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flvfs%2Fllog_lvfs.c;h=938cfc35233c446e645c88f0b0b1ddda673d47ee;hb=926c6309185a25a8ac1541cfa67910325ed8626f;hp=6b70cf18c856e7fbf676df9591fe4bd36e5596d9;hpb=2dc9c16e770415d56839e1996015fec5fab93f29;p=fs%2Flustre-release.git diff --git a/lustre/lvfs/llog_lvfs.c b/lustre/lvfs/llog_lvfs.c index 6b70cf1..938cfc3 100644 --- a/lustre/lvfs/llog_lvfs.c +++ b/lustre/lvfs/llog_lvfs.c @@ -204,7 +204,7 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, loff_t saved_offset; /* no header: only allowed to insert record 1 */ - if (idx != 1 && !file->f_dentry->d_inode->i_size) { + if (idx > 1 && !file->f_dentry->d_inode->i_size) { CERROR("idx != -1 in empty log\n"); LBUG(); } @@ -246,6 +246,9 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx); if (rc) RETURN(rc); + /* if it's the last idx in log file, then return -ENOSPC */ + if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1) + RETURN(-ENOSPC); } loghandle->lgh_last_idx++; @@ -268,15 +271,32 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, if (rc) RETURN(rc); + CDEBUG(D_HA, "adding record "LPX64": idx: %u, %u bytes off: %lld\n", + loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len), + file->f_pos); + rc = llog_lvfs_write_blob(ctxt, file, rec, buf, file->f_pos); if (rc) RETURN(rc); - CDEBUG(D_HA, "added record "LPX64": idx: %u, %u bytes\n", - loghandle->lgh_id.lgl_oid, index, le32_to_cpu(rec->lrh_len)); if (rc == 0 && reccookie) { - reccookie->lgc_lgl = loghandle->lgh_id; - reccookie->lgc_index = index; + if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) { + LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id), + "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr" + LPU64"/"LPU64"\n", + reccookie->lgc_lgl.lgl_oid, + reccookie->lgc_lgl.lgl_ogr, + loghandle->lgh_id.lgl_oid, + loghandle->lgh_id.lgl_oid); + LASSERTF(reccookie->lgc_index == index, + "lgc_index %u != index %u\n", + reccookie->lgc_index, index); + } else { + reccookie->lgc_lgl = loghandle->lgh_id; + reccookie->lgc_index = index; + llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY); + } + if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC) reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT; else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC) @@ -318,7 +338,6 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx, int len) { struct llog_ctxt *ctxt = loghandle->lgh_ctxt; - int rc; ENTRY; if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) @@ -331,6 +350,7 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx, struct llog_rec_hdr *rec; struct llog_rec_tail *tail; loff_t ppos; + int nbytes, rc; llog_skip_over(curr_offset, *curr_idx, next_idx); @@ -347,21 +367,20 @@ static int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx, RETURN(rc); } - /* put number of bytes read into rc to make code simpler */ - rc = ppos - *curr_offset; + nbytes = ppos - *curr_offset; *curr_offset = ppos; - if (rc == 0) /* end of file, nothing to do */ + if (nbytes == 0) /* end of file, nothing to do */ RETURN(0); - if (rc < sizeof(*tail)) { + if (nbytes < sizeof(*tail)) { CERROR("Invalid llog block at log id "LPU64"/%u offset " LPU64"\n", loghandle->lgh_id.lgl_oid, loghandle->lgh_id.lgl_ogen, *curr_offset); RETURN(-EINVAL); } - tail = buf + rc - sizeof(struct llog_rec_tail); + tail = buf + nbytes - sizeof(struct llog_rec_tail); *curr_idx = le32_to_cpu(tail->lrt_index); /* this shouldn't happen */ @@ -476,7 +495,8 @@ static struct file *llog_filp_open(char *name, int flags, int mode) } else { filp = l_filp_open(logname, flags, mode); if (IS_ERR(filp)) { - CERROR("logfile creation %s: %ld\n", logname, + CERROR("logfile %s(%s): %ld\n", + flags & O_CREAT ? "create" : "open", logname, PTR_ERR(filp)); } } @@ -489,128 +509,228 @@ static struct file *llog_filp_open(char *name, int flags, int mode) static struct file * llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id) { - unsigned int tmpname = ll_insecure_random_int(); - char fidname[LL_FID_NAMELEN]; struct file *filp; - struct dentry *new_child, *parent; - void *handle; - int rc = 0, err, namelen; + int rc = 0; ENTRY; - sprintf(fidname, "OBJECTS/%u", tmpname); - filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); - if (IS_ERR(filp)) { - rc = PTR_ERR(filp); - if (rc == -EEXIST) { - CERROR("impossible object name collision %u\n", - tmpname); - LBUG(); + LASSERT(lgh_id != NULL); + if (lgh_id->lgl_oid) { + struct dentry *dchild; + char fidname[LL_FID_NAMELEN]; + int fidlen = 0; + + down(&ctxt->loc_objects_dir->d_inode->i_sem); + fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen); + dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen); + if (IS_ERR(dchild)) { + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN((struct file *)dchild); + } + if (dchild->d_inode == NULL) { + struct dentry_params dp; + struct inode *inode; + + dchild->d_fsdata = (void *) &dp; + dp.p_ptr = NULL; + dp.p_inum = lgh_id->lgl_oid; + rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode, + dchild, S_IFREG, NULL); + if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid) + dchild->d_fsdata = NULL; + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + dput(dchild); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(ERR_PTR(rc)); + } + inode = dchild->d_inode; + LASSERT(inode->i_ino == lgh_id->lgl_oid); + inode->i_generation = lgh_id->lgl_ogen; + CDEBUG(D_HA, "recreated ino %lu with gen %u\n", + inode->i_ino, inode->i_generation); + mark_inode_dirty(inode); + } + + mntget(ctxt->loc_lvfs_ctxt->pwdmnt); + filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt, + O_RDWR | O_LARGEFILE); + if (IS_ERR(filp)) { + dput(dchild); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(filp); + } + if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) { + CERROR("%s is not a regular file!: mode = %o\n", fidname, + filp->f_dentry->d_inode->i_mode); + filp_close(filp, 0); + up(&ctxt->loc_objects_dir->d_inode->i_sem); + RETURN(ERR_PTR(-ENOENT)); } - CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + + up(&ctxt->loc_objects_dir->d_inode->i_sem); RETURN(filp); - } - namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation); - parent = filp->f_dentry->d_parent; - down(&parent->d_inode->i_sem); - new_child = lookup_one_len(fidname, parent, namelen); - if (IS_ERR(new_child)) { - CERROR("getting neg dentry for obj rename: %d\n", rc); - GOTO(out_close, rc = PTR_ERR(new_child)); - } - if (new_child->d_inode != NULL) { - CERROR("impossible non-negative obj dentry %lu:%u!\n", - filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation); - LBUG(); - } + } else { + unsigned int tmpname = ll_insecure_random_int(); + char fidname[LL_FID_NAMELEN]; + struct dentry *new_child, *parent; + void *handle; + int err, namelen; - handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL); - if (IS_ERR(handle)) - GOTO(out_dput, rc = PTR_ERR(handle)); + sprintf(fidname, "OBJECTS/%u", tmpname); + filp = filp_open(fidname, O_CREAT | O_EXCL, 0644); + if (IS_ERR(filp)) { + rc = PTR_ERR(filp); + if (rc == -EEXIST) { + CERROR("impossible object name collision %u\n", + tmpname); + LBUG(); + } + CERROR("error creating tmp object %u: rc %d\n", tmpname, rc); + RETURN(filp); + } - lock_kernel(); - rc = vfs_rename(parent->d_inode, filp->f_dentry, - parent->d_inode, new_child); - unlock_kernel(); - if (rc) - CERROR("error renaming new object %lu:%u: rc %d\n", - filp->f_dentry->d_inode->i_ino, - filp->f_dentry->d_inode->i_generation, rc); + namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation); + parent = filp->f_dentry->d_parent; + down(&parent->d_inode->i_sem); + new_child = lookup_one_len(fidname, parent, namelen); + if (IS_ERR(new_child)) { + CERROR("getting neg dentry for obj rename: %d\n", rc); + GOTO(out_close, rc = PTR_ERR(new_child)); + } + if (new_child->d_inode != NULL) { + CERROR("impossible non-negative obj dentry %lu:%u!\n", + filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation); + LBUG(); + } - err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0); - if (!rc) - rc = err; + handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL); + if (IS_ERR(handle)) + GOTO(out_dput, rc = PTR_ERR(handle)); -out_dput: - dput(new_child); -out_close: - up(&parent->d_inode->i_sem); - if (rc) { - filp_close(filp, 0); - filp = (struct file *)rc; - } else { - /* FIXME: is this group 1 is correct? */ - lgh_id->lgl_ogr = 1; - lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino; - lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation; + lock_kernel(); + rc = vfs_rename(parent->d_inode, filp->f_dentry, + parent->d_inode, new_child); + unlock_kernel(); + if (rc) + CERROR("error renaming new object %lu:%u: rc %d\n", + filp->f_dentry->d_inode->i_ino, + filp->f_dentry->d_inode->i_generation, rc); + + err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0); + if (!rc) + rc = err; + + out_dput: + dput(new_child); + out_close: + up(&parent->d_inode->i_sem); + if (rc) { + filp_close(filp, 0); + filp = ERR_PTR(rc); + } else { + /* FIXME: is this group 1 is correct? */ + lgh_id->lgl_ogr = 1; + lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino; + lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation; + } + RETURN(filp); } - - RETURN(filp); } /* creates object for generic case (obd exists) */ static struct file * llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id) { - int rc = 0; - struct file *filp; + struct file *filp = NULL; struct dentry *dchild; struct obd_device *obd; struct obdo *oa = NULL; - int open_flags = O_RDWR | O_CREAT | O_LARGEFILE; + int open_flags = O_RDWR | O_LARGEFILE; + int rc = 0; ENTRY; - + obd = ctxt->loc_exp->exp_obd; + LASSERT(obd != NULL); + + if (lgh_id->lgl_oid) { + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid, + lgh_id->lgl_ogen, lgh_id->lgl_ogr); + if (IS_ERR(dchild) == -ENOENT) { + OBD_ALLOC(oa, sizeof(*oa)); + if (!oa) + RETURN(ERR_PTR(-ENOMEM)); + + oa->o_id = lgh_id->lgl_oid; + oa->o_generation = lgh_id->lgl_ogen; + oa->o_gr = lgh_id->lgl_ogr; + oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; + rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); + if (rc) { + CDEBUG(D_INODE, "err during create: %d\n", rc); + GOTO(out_free_oa, rc); + } + CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr); + + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid, + lgh_id->lgl_ogen, lgh_id->lgl_ogr); + } else if (IS_ERR(dchild)) { + CERROR("error looking up logfile "LPX64":0x%x: rc %d\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, rc); + RETURN((struct file *)dchild); + } - /* this is important to work here over obd_create() as it manages - groups and we need it. Yet another reason is that mds_obd_create() - is fully the same as old version of this function and this helps - us to avoid code duplicating and layering violating. */ - OBD_ALLOC(oa, sizeof(*oa)); - if (!oa) - RETURN(ERR_PTR(-ENOMEM)); - - oa->o_gr = FILTER_GROUP_LLOG; - oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; - rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); - if (rc) + filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags); + if (IS_ERR(filp)) { + l_dput(dchild); + rc = PTR_ERR(filp); + CERROR("error opening logfile "LPX64"0x%x: rc %d\n", + lgh_id->lgl_oid, lgh_id->lgl_ogen, rc); + } GOTO(out_free_oa, rc); + } else { + /* this is important to work here over obd_create() as it manages + groups and we need it. Yet another reason is that mds_obd_create() + is fully the same as old version of this function and this helps + us to avoid code duplicating and layering violating. */ + OBD_ALLOC(oa, sizeof(*oa)); + if (!oa) + RETURN(ERR_PTR(-ENOMEM)); - dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id, - oa->o_generation, oa->o_gr); - if (IS_ERR(dchild)) - GOTO(out_free_oa, rc = PTR_ERR(dchild)); - - filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, - open_flags); - if (IS_ERR(filp)) { - l_dput(dchild); - GOTO(out_free_oa, rc = PTR_ERR(filp)); + oa->o_gr = FILTER_GROUP_LLOG; + oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP; + rc = obd_create(ctxt->loc_exp, oa, NULL, NULL); + if (rc) + GOTO(out_free_oa, rc); + + dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id, + oa->o_generation, oa->o_gr); + if (IS_ERR(dchild)) + GOTO(out_free_oa, rc = PTR_ERR(dchild)); + + filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, + open_flags); + if (IS_ERR(filp)) { + l_dput(dchild); + GOTO(out_free_oa, rc = PTR_ERR(filp)); + } + + /* group 1 is not longer valid, we use the group which is set + by obd_create()->mds_obd_create(). */ + lgh_id->lgl_ogr = oa->o_gr; + lgh_id->lgl_oid = oa->o_id; + lgh_id->lgl_ogen = oa->o_generation; } - /* group 1 is not longer valid, we use the group which is set - by obd_create()->mds_obd_create(). */ - lgh_id->lgl_ogr = oa->o_gr; - lgh_id->lgl_oid = oa->o_id; - lgh_id->lgl_ogen = oa->o_generation; - OBD_FREE(oa, sizeof(*oa)); - RETURN(filp); - out_free_oa: - OBD_FREE(oa, sizeof(*oa)); - RETURN(ERR_PTR(rc)); + if (rc) + filp = ERR_PTR(rc); + if (oa) + OBD_FREE(oa, sizeof(*oa)); + RETURN(filp); } static struct file * @@ -664,15 +784,18 @@ out: RETURN(rc); } -static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res, - struct llog_logid *logid, char *name) +static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res, + struct llog_logid *logid, char *name, int flags) { struct llog_handle *handle; struct lvfs_run_ctxt saved; int rc = 0; - int open_flags = O_RDWR | O_CREAT | O_LARGEFILE; + int open_flags = O_RDWR | O_LARGEFILE; ENTRY; - + + if (flags & OBD_LLOG_FL_CREATE) + open_flags |= O_CREAT; + handle = llog_alloc_handle(); if (handle == NULL) RETURN(-ENOMEM); @@ -683,24 +806,15 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res, push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL); if (logid != NULL) { - char logname[LL_FID_NAMELEN + 10] = "OBJECTS/"; - char fidname[LL_FID_NAMELEN]; - ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen); - strcat(logname, fidname); - - handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644); + handle->lgh_file = llog_object_create(ctxt, logid); if (IS_ERR(handle->lgh_file)) { - CERROR("cannot open %s file, error = %ld\n", - logname, PTR_ERR(handle->lgh_file)); + CERROR("cannot create/open llog object "LPX64":%x " + "error = %ld", logid->lgl_oid, logid->lgl_ogen, + PTR_ERR(handle->lgh_file)); GOTO(cleanup, rc = PTR_ERR(handle->lgh_file)); } - if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) { - CERROR("%s is not a regular file!: mode = %o\n", logname, - handle->lgh_file->f_dentry->d_inode->i_mode); - GOTO(cleanup, rc = -ENOENT); - } - LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir); handle->lgh_id = *logid; + } else if (name) { handle->lgh_file = llog_filp_open(name, open_flags, 0644); if (IS_ERR(handle->lgh_file)) { @@ -716,6 +830,7 @@ static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res, rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry); if (rc) GOTO(cleanup, rc); + } else { handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id); if (IS_ERR(handle->lgh_file)) { @@ -937,7 +1052,7 @@ int llog_put_cat_list(struct lvfs_run_ctxt *ctxt, EXPORT_SYMBOL(llog_put_cat_list); struct llog_operations llog_lvfs_ops = { - lop_create: llog_lvfs_create, + lop_open: llog_lvfs_open, lop_destroy: llog_lvfs_destroy, lop_close: llog_lvfs_close, lop_read_header: llog_lvfs_read_header, @@ -964,8 +1079,8 @@ static int llog_lvfs_write_rec(struct llog_handle *loghandle, return 0; } -static int llog_lvfs_create(struct llog_ctxt *ctxt, struct llog_handle **res, - struct llog_logid *logid, char *name) +static int llog_lvfs_open(struct llog_ctxt *ctxt, struct llog_handle **res, + struct llog_logid *logid, char *name, int flags) { LBUG(); return 0; @@ -1014,7 +1129,7 @@ int llog_lvfs_next_block(struct llog_handle *loghandle, int *curr_idx, } struct llog_operations llog_lvfs_ops = { - lop_create: llog_lvfs_create, + lop_open: llog_lvfs_open, lop_destroy: llog_lvfs_destroy, lop_close: llog_lvfs_close, lop_read_header: llog_lvfs_read_header,