rc = llog_lvfs_pad(ctxt, file, left, loghandle->lgh_last_idx);
if (rc)
RETURN(rc);
+ /* if it's the last idx in log file, then return -ENOSPC */
+ if (loghandle->lgh_last_idx == LLOG_BITMAP_SIZE(llh) - 1)
+ RETURN(-ENOSPC);
}
loghandle->lgh_last_idx++;
RETURN(rc);
if (rc == 0 && reccookie) {
- reccookie->lgc_lgl = loghandle->lgh_id;
- reccookie->lgc_index = index;
+ if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
+ LASSERTF(EQ_LOGID(reccookie->lgc_lgl,loghandle->lgh_id),
+ "lgc_lgl.oid/gr "LPU64"/"LPU64" lgh_id.oid/gr"
+ LPU64"/"LPU64"\n",
+ reccookie->lgc_lgl.lgl_oid,
+ reccookie->lgc_lgl.lgl_ogr,
+ loghandle->lgh_id.lgl_oid,
+ loghandle->lgh_id.lgl_oid);
+ LASSERTF(reccookie->lgc_index == index,
+ "lgc_index %u != index %u\n",
+ reccookie->lgc_index, index);
+ } else {
+ reccookie->lgc_lgl = loghandle->lgh_id;
+ reccookie->lgc_index = index;
+ llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
+ }
+
if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
static struct file *
llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
{
- unsigned int tmpname = ll_insecure_random_int();
- char fidname[LL_FID_NAMELEN];
struct file *filp;
- struct dentry *new_child, *parent;
- void *handle;
- int rc = 0, err, namelen;
+ int rc = 0;
ENTRY;
- sprintf(fidname, "OBJECTS/%u", tmpname);
- filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
- if (IS_ERR(filp)) {
- rc = PTR_ERR(filp);
- if (rc == -EEXIST) {
- CERROR("impossible object name collision %u\n",
- tmpname);
- LBUG();
+ LASSERT(lgh_id != NULL);
+ if (lgh_id->lgl_oid) {
+ struct dentry *dchild;
+ char fidname[LL_FID_NAMELEN];
+ int fidlen = 0;
+
+ down(&ctxt->loc_objects_dir->d_inode->i_sem);
+ fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
+ dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
+ if (IS_ERR(dchild)) {
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN((struct file *)dchild);
}
- CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+ if (dchild->d_inode == NULL) {
+ struct dentry_params dp;
+ struct inode *inode;
+
+ dchild->d_fsdata = (void *) &dp;
+ dp.p_ptr = NULL;
+ dp.p_inum = lgh_id->lgl_oid;
+ rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
+ dchild, S_IFREG, NULL);
+ if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
+ dchild->d_fsdata = NULL;
+ if (rc) {
+ CDEBUG(D_INODE, "err during create: %d\n", rc);
+ dput(dchild);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(ERR_PTR(rc));
+ }
+ inode = dchild->d_inode;
+ LASSERT(inode->i_ino == lgh_id->lgl_oid);
+ inode->i_generation = lgh_id->lgl_ogen;
+ CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+ inode->i_ino, inode->i_generation);
+ mark_inode_dirty(inode);
+ }
+
+ mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
+ filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
+ O_RDWR | O_LARGEFILE);
+ if (IS_ERR(filp)) {
+ dput(dchild);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(filp);
+ }
+ if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
+ CERROR("%s is not a regular file!: mode = %o\n", fidname,
+ filp->f_dentry->d_inode->i_mode);
+ filp_close(filp, 0);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
RETURN(filp);
- }
- namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation);
- parent = filp->f_dentry->d_parent;
- down(&parent->d_inode->i_sem);
- new_child = lookup_one_len(fidname, parent, namelen);
- if (IS_ERR(new_child)) {
- CERROR("getting neg dentry for obj rename: %d\n", rc);
- GOTO(out_close, rc = PTR_ERR(new_child));
- }
- if (new_child->d_inode != NULL) {
- CERROR("impossible non-negative obj dentry %lu:%u!\n",
- filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation);
- LBUG();
- }
+ } else {
+ unsigned int tmpname = ll_insecure_random_int();
+ char fidname[LL_FID_NAMELEN];
+ struct dentry *new_child, *parent;
+ void *handle;
+ int err, namelen;
- handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
- if (IS_ERR(handle))
- GOTO(out_dput, rc = PTR_ERR(handle));
+ sprintf(fidname, "OBJECTS/%u", tmpname);
+ filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
+ if (IS_ERR(filp)) {
+ rc = PTR_ERR(filp);
+ if (rc == -EEXIST) {
+ CERROR("impossible object name collision %u\n",
+ tmpname);
+ LBUG();
+ }
+ CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+ RETURN(filp);
+ }
- lock_kernel();
- rc = vfs_rename(parent->d_inode, filp->f_dentry,
- parent->d_inode, new_child);
- unlock_kernel();
- if (rc)
- CERROR("error renaming new object %lu:%u: rc %d\n",
- filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation, rc);
+ namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation);
+ parent = filp->f_dentry->d_parent;
+ down(&parent->d_inode->i_sem);
+ new_child = lookup_one_len(fidname, parent, namelen);
+ if (IS_ERR(new_child)) {
+ CERROR("getting neg dentry for obj rename: %d\n", rc);
+ GOTO(out_close, rc = PTR_ERR(new_child));
+ }
+ if (new_child->d_inode != NULL) {
+ CERROR("impossible non-negative obj dentry %lu:%u!\n",
+ filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation);
+ LBUG();
+ }
- err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
- if (!rc)
- rc = err;
+ handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
+ if (IS_ERR(handle))
+ GOTO(out_dput, rc = PTR_ERR(handle));
-out_dput:
- dput(new_child);
-out_close:
- up(&parent->d_inode->i_sem);
- if (rc) {
- filp_close(filp, 0);
- filp = (struct file *)rc;
- } else {
- /* FIXME: is this group 1 is correct? */
- lgh_id->lgl_ogr = 1;
- lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
- lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+ lock_kernel();
+ rc = vfs_rename(parent->d_inode, filp->f_dentry,
+ parent->d_inode, new_child);
+ unlock_kernel();
+ if (rc)
+ CERROR("error renaming new object %lu:%u: rc %d\n",
+ filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation, rc);
+
+ err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
+ if (!rc)
+ rc = err;
+
+ out_dput:
+ dput(new_child);
+ out_close:
+ up(&parent->d_inode->i_sem);
+ if (rc) {
+ filp_close(filp, 0);
+ filp = ERR_PTR(rc);
+ } else {
+ /* FIXME: is this group 1 is correct? */
+ lgh_id->lgl_ogr = 1;
+ lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
+ lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+ }
+ RETURN(filp);
}
-
- RETURN(filp);
}
/* creates object for generic case (obd exists) */
static struct file *
llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
{
- int rc = 0;
- struct file *filp;
+ struct file *filp = NULL;
struct dentry *dchild;
struct obd_device *obd;
struct obdo *oa = NULL;
- int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+ int open_flags = O_RDWR | O_LARGEFILE;
+ int rc = 0;
ENTRY;
-
+
obd = ctxt->loc_exp->exp_obd;
+ LASSERT(obd != NULL);
+
+ if (lgh_id->lgl_oid) {
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+ lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+ if (IS_ERR(dchild) == -ENOENT) {
+ OBD_ALLOC(oa, sizeof(*oa));
+ if (!oa)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ oa->o_id = lgh_id->lgl_oid;
+ oa->o_generation = lgh_id->lgl_ogen;
+ oa->o_gr = lgh_id->lgl_ogr;
+ oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+ rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+ if (rc) {
+ CDEBUG(D_INODE, "err during create: %d\n", rc);
+ GOTO(out_free_oa, rc);
+ }
+ CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+ lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+ } else if (IS_ERR(dchild)) {
+ CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+ RETURN((struct file *)dchild);
+ }
- /* this is important to work here over obd_create() as it manages
- groups and we need it. Yet another reason is that mds_obd_create()
- is fully the same as old version of this function and this helps
- us to avoid code duplicating and layering violating. */
- OBD_ALLOC(oa, sizeof(*oa));
- if (!oa)
- RETURN(ERR_PTR(-ENOMEM));
-
- oa->o_gr = FILTER_GROUP_LLOG;
- oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
- rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
- if (rc)
+ filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+ if (IS_ERR(filp)) {
+ l_dput(dchild);
+ rc = PTR_ERR(filp);
+ CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+ }
GOTO(out_free_oa, rc);
+ } else {
+ /* this is important to work here over obd_create() as it manages
+ groups and we need it. Yet another reason is that mds_obd_create()
+ is fully the same as old version of this function and this helps
+ us to avoid code duplicating and layering violating. */
+ OBD_ALLOC(oa, sizeof(*oa));
+ if (!oa)
+ RETURN(ERR_PTR(-ENOMEM));
- dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
- oa->o_generation, oa->o_gr);
- if (IS_ERR(dchild))
- GOTO(out_free_oa, rc = PTR_ERR(dchild));
-
- filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
- open_flags);
- if (IS_ERR(filp)) {
- l_dput(dchild);
- GOTO(out_free_oa, rc = PTR_ERR(filp));
+ oa->o_gr = FILTER_GROUP_LLOG;
+ oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+ rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+ if (rc)
+ GOTO(out_free_oa, rc);
+
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+ oa->o_generation, oa->o_gr);
+ if (IS_ERR(dchild))
+ GOTO(out_free_oa, rc = PTR_ERR(dchild));
+
+ filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+ open_flags);
+ if (IS_ERR(filp)) {
+ l_dput(dchild);
+ GOTO(out_free_oa, rc = PTR_ERR(filp));
+ }
+
+ /* group 1 is not longer valid, we use the group which is set
+ by obd_create()->mds_obd_create(). */
+ lgh_id->lgl_ogr = oa->o_gr;
+ lgh_id->lgl_oid = oa->o_id;
+ lgh_id->lgl_ogen = oa->o_generation;
}
- /* group 1 is not longer valid, we use the group which is set
- by obd_create()->mds_obd_create(). */
- lgh_id->lgl_ogr = oa->o_gr;
- lgh_id->lgl_oid = oa->o_id;
- lgh_id->lgl_ogen = oa->o_generation;
- OBD_FREE(oa, sizeof(*oa));
- RETURN(filp);
-
out_free_oa:
- OBD_FREE(oa, sizeof(*oa));
- RETURN(ERR_PTR(rc));
+ if (rc)
+ filp = ERR_PTR(rc);
+ if (oa)
+ OBD_FREE(oa, sizeof(*oa));
+ RETURN(filp);
}
static struct file *
push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
if (logid != NULL) {
- char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
- char fidname[LL_FID_NAMELEN];
- ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
- strcat(logname, fidname);
-
- handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
+ handle->lgh_file = llog_object_create(ctxt, logid);
if (IS_ERR(handle->lgh_file)) {
- CERROR("cannot open %s file, error = %ld\n",
- logname, PTR_ERR(handle->lgh_file));
+ CERROR("cannot create/open llog object "LPX64":%x "
+ "error = %ld", logid->lgl_oid, logid->lgl_ogen,
+ PTR_ERR(handle->lgh_file));
GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
}
- if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", logname,
- handle->lgh_file->f_dentry->d_inode->i_mode);
- GOTO(cleanup, rc = -ENOENT);
- }
- LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
handle->lgh_id = *logid;
+
} else if (name) {
handle->lgh_file = llog_filp_open(name, open_flags, 0644);
if (IS_ERR(handle->lgh_file)) {
rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
if (rc)
GOTO(cleanup, rc);
+
} else {
handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
if (IS_ERR(handle->lgh_file)) {