struct llog_logid lgc_lgl;
__u32 lgc_subsys;
__u32 lgc_index;
- __u32 lgc_padding;
+ __u32 lgc_flags;
} __attribute__((packed));
/* llog protocol */
struct llog_logid *logid);
int llog_cat_put(struct llog_handle *cathandle);
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, void *buf);
+ struct llog_cookie *reccookie, void *buf,
+ struct rw_semaphore **lock, int *lock_count);
int llog_cat_cancel_records(struct llog_handle *cathandle, int count,
struct llog_cookie *cookies);
int llog_cat_process(struct llog_handle *cat_llh, llog_cb_t cb, void *data);
/* llog_obd.c */
int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
- void *buf, struct llog_cookie *reccookie, int, void *data);
+ void *buf, struct llog_cookie *reccookie, int, void *data,
+ struct rw_semaphore **lock, int *lock_count);
int llog_catalog_cancel(struct llog_ctxt *ctxt, int count, struct llog_cookie *,
int flags, void *data);
int llog_catalog_setup(struct llog_ctxt **res, char *name, struct obd_export *exp,
int llog_obd_origin_setup(struct obd_device *, struct obd_llogs *, int,
struct obd_device *, int, struct llog_logid *);
-int llog_obd_origin_cleanup(struct llog_ctxt *ctxt);
-int llog_obd_origin_add(struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies);
int obd_llog_cat_initialize(struct obd_device *, struct obd_llogs *, int, char *);
int (*lop_read_header)(struct llog_handle *handle);
int (*lop_add)(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data);
+ int numcookies, void *data, struct rw_semaphore **lock,
+ int *lock_count);
int (*lop_cancel)(struct llog_ctxt *ctxt, int count,
struct llog_cookie *cookies, int flags, void *data);
int (*lop_write_rec)(struct llog_handle *loghandle,
struct obd_llogs *loc_llogs;
};
+struct llog_create_locks {
+ int lcl_count;
+ struct rw_semaphore *lcl_locks[0];
+};
+
static inline void llog_gen_init(struct llog_ctxt *ctxt)
{
struct obd_device *obd = ctxt->loc_exp->exp_obd;
#define LLOG_PROC_BREAK 0x0001
#define LLOG_DEL_RECORD 0x0002
+#define EQ_LOGID(a, b) (a.lgl_oid == b.lgl_oid && \
+ a.lgl_ogr == b.lgl_ogr && \
+ a.lgl_ogen == b.lgl_ogen)
+
+/* Flags are in the bottom 16 bit */
+#define LLOG_COOKIE_FLAG_MASK 0x0000ffff
+#define LLOG_COOKIE_REPLAY_NEW 0x00000001
+#define LLOG_COOKIE_REPLAY 0x00000002
+
+static inline int llog_cookie_get_flags(struct llog_cookie *logcookie)
+{
+ return(logcookie->lgc_flags & LLOG_COOKIE_FLAG_MASK);
+}
+
+static inline void llog_cookie_add_flags(struct llog_cookie *logcookie, int flags)
+{
+ logcookie->lgc_flags |= LLOG_COOKIE_FLAG_MASK & flags;
+}
+
+static inline void llog_cookie_set_flags(struct llog_cookie *logcookie, int flags)
+{
+ logcookie->lgc_flags &= ~LLOG_COOKIE_FLAG_MASK;
+ llog_cookie_add_flags(logcookie, flags);
+}
+
+static inline void llog_cookie_clear_flags(struct llog_cookie *logcookie, int flags)
+{
+ logcookie->lgc_flags &= ~(LLOG_COOKIE_FLAG_MASK & flags);
+}
+
static inline int llog_ctxt2ops(struct llog_ctxt *ctxt,
struct llog_operations **lop)
{
static inline int llog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct llog_operations *lop;
int rc;
if (lop->lop_add == NULL)
RETURN(-EOPNOTSUPP);
- rc = lop->lop_add(ctxt, rec, buf, logcookies, numcookies, data);
+ rc = lop->lop_add(ctxt, rec, buf, logcookies, numcookies, data,
+ lock, lock_count);
RETURN(rc);
}
RETURN(rc);
}
+static inline void llog_create_lock_free(struct llog_create_locks *lcl)
+{
+ int size, offset = offsetof(struct llog_create_locks, lcl_locks);
+ int i;
+ ENTRY;
+
+ for (i = 0; i < lcl->lcl_count; i ++) {
+ if (lcl->lcl_locks[i] != NULL) {
+#ifdef __KERNEL__
+ LASSERT(down_write_trylock(lcl->lcl_locks[i]) == 0);
+#endif
+ up_write(lcl->lcl_locks[i]);
+ }
+ }
+ size = offset + sizeof(struct rw_semaphore *) * lcl->lcl_count;
+ OBD_FREE(lcl, size);
+}
+
#endif
int rs_nlocks;
struct lustre_handle rs_locks[RS_MAX_LOCKS];
ldlm_mode_t rs_modes[RS_MAX_LOCKS];
+ struct llog_create_locks *rs_llog_locks;
+
/* last member: variable sized reply message */
struct lustre_msg rs_msg;
};
/* ptlrpc/service.c */
void ptlrpc_save_lock (struct ptlrpc_request *req,
struct lustre_handle *lock, int mode);
+void ptlrpc_save_llog_lock (struct ptlrpc_request *req,
+ struct llog_create_locks *lcl);
void ptlrpc_commit_replies (struct obd_device *obd);
void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
struct ptlrpc_service *ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size,
inode->i_ino, rc);
}
if (rc == 0) {
- rc = ll_objects_destroy(req, file->f_dentry->d_inode);
+ rc = ll_objects_destroy(req, file->f_dentry->d_inode, 1);
if (rc)
CERROR("inode %lu ll_objects destroy: rc = %d\n",
inode->i_ino, rc);
extern struct inode_operations ll_dir_inode_operations;
/* llite/namei.c */
-int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir);
+int ll_objects_destroy(struct ptlrpc_request *request,
+ struct inode *dir, int offset);
struct inode *ll_iget(struct super_block *sb, ino_t hash,
struct lustre_md *lic);
struct dentry *ll_find_alias(struct inode *, struct dentry *);
RETURN(rc);
}
-int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
+int ll_objects_destroy(struct ptlrpc_request *request,
+ struct inode *dir, int offset)
{
struct mds_body *body;
struct lov_mds_md *eadata;
oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
if (body->valid & OBD_MD_FLCOOKIE) {
+ int length = sizeof(struct llog_cookie) *
+ lsm->lsm_stripe_count;
oa->o_valid |= OBD_MD_FLCOOKIE;
oti.oti_logcookies =
- lustre_msg_buf(request->rq_repmsg, 2,
- sizeof(struct llog_cookie) *
- lsm->lsm_stripe_count);
+ lustre_msg_buf(request->rq_repmsg, 2, length);
if (oti.oti_logcookies == NULL) {
oa->o_valid &= ~OBD_MD_FLCOOKIE;
body->valid &= ~OBD_MD_FLCOOKIE;
+ } else {
+ /* copy llog cookies to request to replay unlink
+ * so that the same llog file and records as those created
+ * during fail can be re-created while doing replay
+ */
+ if (offset >= 0)
+ memcpy(lustre_msg_buf(request->rq_reqmsg, offset, 0),
+ oti.oti_logcookies, length);
}
}
if (rc)
GOTO(out, rc);
- rc = ll_objects_destroy(request, dir);
+ rc = ll_objects_destroy(request, dir, 2);
out:
ptlrpc_req_finished(request);
RETURN(rc);
err = md_rename(sbi->ll_mdc_exp, &op_data,
oldname, oldlen, newname, newlen, &request);
if (!err) {
- err = ll_objects_destroy(request, src);
+ err = ll_objects_destroy(request, src, 3);
}
ptlrpc_req_finished(request);
* Unset cookies should be all-zero (which will never occur naturally). */
static int lov_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct obd_device *obd = ctxt->loc_obd;
struct lov_obd *lov = &obd->u.lov;
lur->lur_ogen = loi->loi_gr;
LASSERT(lsm->lsm_object_gr == loi->loi_gr);
rc += llog_add(cctxt, &lur->lur_hdr, NULL, logcookies + rc,
- numcookies - rc, NULL);
+ numcookies - rc, NULL,
+ lock != NULL ? lock + rc : NULL, lock_count);
}
OBD_FREE(lur, sizeof(*lur));
*
* Assumes caller has already pushed us into the kernel context and is locking.
*/
-static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle)
+static struct llog_handle *llog_cat_new_log(struct llog_handle *cathandle,
+ struct llog_cookie *logcookie)
{
struct llog_handle *loghandle;
struct llog_log_hdr *llh;
llh->llh_tail.lrt_index = cpu_to_le32(index);
}
- rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
- OBD_LLOG_FL_CREATE);
+ if (logcookie && llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+ rc = llog_open(cathandle->lgh_ctxt, &loghandle,
+ &logcookie->lgc_lgl, NULL, OBD_LLOG_FL_CREATE);
+ else
+ rc = llog_open(cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+ OBD_LLOG_FL_CREATE);
if (rc) {
CERROR("cannot create new log, error = %d\n", rc);
RETURN(ERR_PTR(rc));
list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
out_destroy:
- if (rc < 0)
+ if (rc < 0)
llog_destroy(loghandle);
+ else if (logcookie) {
+ if (llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW)
+ LASSERT(EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl));
+ else
+ llog_cookie_set_flags(logcookie, LLOG_COOKIE_REPLAY_NEW);
+ }
RETURN(loghandle);
}
* NOTE: loghandle is write-locked upon successful return
*/
static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
- int create)
+ int create,
+ struct llog_cookie *logcookie,
+ struct rw_semaphore **lock)
{
struct llog_handle *loghandle = NULL;
ENTRY;
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 &&
+ (!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
down_write(&loghandle->lgh_lock);
up_read(&cathandle->lgh_lock);
RETURN(loghandle);
}
}
+
+ LASSERT(!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY_NEW);
+
if (!create) {
if (loghandle)
down_write(&loghandle->lgh_lock);
loghandle = cathandle->u.chd.chd_current_log;
if (loghandle) {
struct llog_log_hdr *llh = loghandle->lgh_hdr;
- if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1) {
+ if (loghandle->lgh_last_idx < (sizeof(llh->llh_bitmap)*8) - 1 &&
+ (!logcookie ||
+ !(llog_cookie_get_flags(logcookie) & LLOG_COOKIE_REPLAY) ||
+ EQ_LOGID(loghandle->lgh_id, logcookie->lgc_lgl))) {
down_write(&loghandle->lgh_lock);
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
}
CDEBUG(D_INODE, "creating new log\n");
- loghandle = llog_cat_new_log(cathandle);
- if (!IS_ERR(loghandle))
+ loghandle = llog_cat_new_log(cathandle, logcookie);
+ if (!IS_ERR(loghandle)) {
down_write(&loghandle->lgh_lock);
+ if (lock != NULL)
+ *lock = &loghandle->lgh_lock;
+ }
+
up_write(&cathandle->lgh_lock);
RETURN(loghandle);
}
* Assumes caller has already pushed us into the kernel context.
*/
int llog_cat_add_rec(struct llog_handle *cathandle, struct llog_rec_hdr *rec,
- struct llog_cookie *reccookie, void *buf)
+ struct llog_cookie *reccookie, void *buf,
+ struct rw_semaphore **lock, int *lock_count)
{
struct llog_handle *loghandle;
int rc;
ENTRY;
LASSERT(le32_to_cpu(rec->lrh_len) <= LLOG_CHUNK_SIZE);
- loghandle = llog_cat_current_log(cathandle, 1);
+ loghandle = llog_cat_current_log(cathandle, 1, reccookie, lock);
if (IS_ERR(loghandle))
RETURN(PTR_ERR(loghandle));
/* loghandle is already locked by llog_cat_current_log() for us */
rc = llog_write_rec(loghandle, rec, reccookie, 1, buf, -1);
- up_write(&loghandle->lgh_lock);
+ if (!lock || *lock == NULL) {
+ up_write(&loghandle->lgh_lock);
+ } else {
+ LASSERT(lock_count != NULL);
+ *lock_count += 1;
+ }
RETURN(rc);
}
int llog_catalog_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct llog_handle *cathandle;
int rc;
cathandle = ctxt->loc_handle;
LASSERT(cathandle != NULL);
- rc = llog_cat_add_rec(cathandle, rec, logcookies, buf);
+ rc = llog_cat_add_rec(cathandle, rec, logcookies, buf, lock, lock_count);
if (rc != 1)
CERROR("write one catalog record failed: %d\n", rc);
RETURN(rc);
int llog_catalog_cleanup(struct llog_ctxt *ctxt)
{
- struct llog_handle *cathandle, *n, *loghandle;
- struct llog_log_hdr *llh;
- int rc, index;
+ struct llog_handle *cathandle;
ENTRY;
if (!ctxt)
return 0;
cathandle = ctxt->loc_handle;
- if (cathandle) {
- list_for_each_entry_safe(loghandle, n,
- &cathandle->u.chd.chd_head,
- u.phd.phd_entry) {
- llh = loghandle->lgh_hdr;
- if ((le32_to_cpu(llh->llh_flags) &
- LLOG_F_ZAP_WHEN_EMPTY) &&
- (le32_to_cpu(llh->llh_count) == 1)) {
- rc = llog_destroy(loghandle);
- if (rc)
- CERROR("failure destroying log during "
- "cleanup: %d\n", rc);
- LASSERT(rc == 0);
- index = loghandle->u.phd.phd_cookie.lgc_index;
- llog_free_handle(loghandle);
-
- LASSERT(index);
- llog_cat_set_first_idx(cathandle, index);
- rc = llog_cancel_rec(cathandle, index);
- if (rc == 0)
- CDEBUG(D_HA, "cancel plain log at index"
- " %u of catalog "LPX64"\n",
- index,cathandle->lgh_id.lgl_oid);
- }
- }
+ if (cathandle)
llog_cat_put(ctxt->loc_handle);
- }
+
+// OBD_FREE(ctxt, sizeof(*ctxt));
return 0;
}
EXPORT_SYMBOL(llog_catalog_cleanup);
RETURN(rc);
if (rc == 0 && reccookie) {
- reccookie->lgc_lgl = loghandle->lgh_id;
- reccookie->lgc_index = index;
+ if (llog_cookie_get_flags(reccookie) & LLOG_COOKIE_REPLAY) {
+ LASSERT(EQ_LOGID(reccookie->lgc_lgl, loghandle->lgh_id));
+ LASSERT(reccookie->lgc_index == index);
+ } else {
+ reccookie->lgc_lgl = loghandle->lgh_id;
+ reccookie->lgc_index = index;
+ llog_cookie_add_flags(reccookie, LLOG_COOKIE_REPLAY);
+ }
+
if (le32_to_cpu(rec->lrh_type) == MDS_UNLINK_REC)
reccookie->lgc_subsys = LLOG_UNLINK_ORIG_CTXT;
else if (le32_to_cpu(rec->lrh_type) == OST_SZ_REC)
static struct file *
llog_object_create_alone(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
{
- unsigned int tmpname = ll_insecure_random_int();
- char fidname[LL_FID_NAMELEN];
struct file *filp;
- struct dentry *new_child, *parent;
- void *handle;
- int rc = 0, err, namelen;
+ int rc = 0;
ENTRY;
- sprintf(fidname, "OBJECTS/%u", tmpname);
- filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
- if (IS_ERR(filp)) {
- rc = PTR_ERR(filp);
- if (rc == -EEXIST) {
- CERROR("impossible object name collision %u\n",
- tmpname);
- LBUG();
+ LASSERT(lgh_id != NULL);
+ if (lgh_id->lgl_oid) {
+ struct dentry *dchild;
+ char fidname[LL_FID_NAMELEN];
+ int fidlen = 0;
+
+ down(&ctxt->loc_objects_dir->d_inode->i_sem);
+ fidlen = ll_fid2str(fidname, lgh_id->lgl_oid, lgh_id->lgl_ogen);
+ dchild = lookup_one_len(fidname, ctxt->loc_objects_dir, fidlen);
+ if (IS_ERR(dchild)) {
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN((struct file *)dchild);
}
- CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+ if (dchild->d_inode == NULL) {
+ struct dentry_params dp;
+ struct inode *inode;
+
+ dchild->d_fsdata = (void *) &dp;
+ dp.p_ptr = NULL;
+ dp.p_inum = lgh_id->lgl_oid;
+ rc = ll_vfs_create(ctxt->loc_objects_dir->d_inode,
+ dchild, S_IFREG, NULL);
+ if (dchild->d_fsdata == (void *)(unsigned long)lgh_id->lgl_oid)
+ dchild->d_fsdata = NULL;
+ if (rc) {
+ CDEBUG(D_INODE, "err during create: %d\n", rc);
+ dput(dchild);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(ERR_PTR(rc));
+ }
+ inode = dchild->d_inode;
+ LASSERT(inode->i_ino == lgh_id->lgl_oid);
+ inode->i_generation = lgh_id->lgl_ogen;
+ CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+ inode->i_ino, inode->i_generation);
+ mark_inode_dirty(inode);
+ }
+
+ mntget(ctxt->loc_lvfs_ctxt->pwdmnt);
+ filp = dentry_open(dchild, ctxt->loc_lvfs_ctxt->pwdmnt,
+ O_RDWR | O_LARGEFILE);
+ if (IS_ERR(filp)) {
+ dput(dchild);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(filp);
+ }
+ if (!S_ISREG(filp->f_dentry->d_inode->i_mode)) {
+ CERROR("%s is not a regular file!: mode = %o\n", fidname,
+ filp->f_dentry->d_inode->i_mode);
+ filp_close(filp, 0);
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
+ RETURN(ERR_PTR(-ENOENT));
+ }
+
+ up(&ctxt->loc_objects_dir->d_inode->i_sem);
RETURN(filp);
- }
- namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation);
- parent = filp->f_dentry->d_parent;
- down(&parent->d_inode->i_sem);
- new_child = lookup_one_len(fidname, parent, namelen);
- if (IS_ERR(new_child)) {
- CERROR("getting neg dentry for obj rename: %d\n", rc);
- GOTO(out_close, rc = PTR_ERR(new_child));
- }
- if (new_child->d_inode != NULL) {
- CERROR("impossible non-negative obj dentry %lu:%u!\n",
- filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation);
- LBUG();
- }
+ } else {
+ unsigned int tmpname = ll_insecure_random_int();
+ char fidname[LL_FID_NAMELEN];
+ struct dentry *new_child, *parent;
+ void *handle;
+ int err, namelen;
- handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
- if (IS_ERR(handle))
- GOTO(out_dput, rc = PTR_ERR(handle));
+ sprintf(fidname, "OBJECTS/%u", tmpname);
+ filp = filp_open(fidname, O_CREAT | O_EXCL, 0644);
+ if (IS_ERR(filp)) {
+ rc = PTR_ERR(filp);
+ if (rc == -EEXIST) {
+ CERROR("impossible object name collision %u\n",
+ tmpname);
+ LBUG();
+ }
+ CERROR("error creating tmp object %u: rc %d\n", tmpname, rc);
+ RETURN(filp);
+ }
- lock_kernel();
- rc = vfs_rename(parent->d_inode, filp->f_dentry,
- parent->d_inode, new_child);
- unlock_kernel();
- if (rc)
- CERROR("error renaming new object %lu:%u: rc %d\n",
- filp->f_dentry->d_inode->i_ino,
- filp->f_dentry->d_inode->i_generation, rc);
+ namelen = ll_fid2str(fidname, filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation);
+ parent = filp->f_dentry->d_parent;
+ down(&parent->d_inode->i_sem);
+ new_child = lookup_one_len(fidname, parent, namelen);
+ if (IS_ERR(new_child)) {
+ CERROR("getting neg dentry for obj rename: %d\n", rc);
+ GOTO(out_close, rc = PTR_ERR(new_child));
+ }
+ if (new_child->d_inode != NULL) {
+ CERROR("impossible non-negative obj dentry %lu:%u!\n",
+ filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation);
+ LBUG();
+ }
- err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
- if (!rc)
- rc = err;
+ handle = llog_fsfilt_start(ctxt, parent->d_inode, FSFILT_OP_RENAME, NULL);
+ if (IS_ERR(handle))
+ GOTO(out_dput, rc = PTR_ERR(handle));
-out_dput:
- dput(new_child);
-out_close:
- up(&parent->d_inode->i_sem);
- if (rc) {
- filp_close(filp, 0);
- filp = (struct file *)rc;
- } else {
- /* FIXME: is this group 1 is correct? */
- lgh_id->lgl_ogr = 1;
- lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
- lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+ lock_kernel();
+ rc = vfs_rename(parent->d_inode, filp->f_dentry,
+ parent->d_inode, new_child);
+ unlock_kernel();
+ if (rc)
+ CERROR("error renaming new object %lu:%u: rc %d\n",
+ filp->f_dentry->d_inode->i_ino,
+ filp->f_dentry->d_inode->i_generation, rc);
+
+ err = llog_fsfilt_commit(ctxt, parent->d_inode, handle, 0);
+ if (!rc)
+ rc = err;
+
+ out_dput:
+ dput(new_child);
+ out_close:
+ up(&parent->d_inode->i_sem);
+ if (rc) {
+ filp_close(filp, 0);
+ filp = ERR_PTR(rc);
+ } else {
+ /* FIXME: is this group 1 is correct? */
+ lgh_id->lgl_ogr = 1;
+ lgh_id->lgl_oid = filp->f_dentry->d_inode->i_ino;
+ lgh_id->lgl_ogen = filp->f_dentry->d_inode->i_generation;
+ }
+ RETURN(filp);
}
-
- RETURN(filp);
}
/* creates object for generic case (obd exists) */
static struct file *
llog_object_create_generic(struct llog_ctxt *ctxt, struct llog_logid *lgh_id)
{
- int rc = 0;
struct file *filp;
struct dentry *dchild;
struct obd_device *obd;
struct obdo *oa = NULL;
- int open_flags = O_RDWR | O_CREAT | O_LARGEFILE;
+ int open_flags = O_RDWR | O_LARGEFILE;
+ int rc = 0;
ENTRY;
-
+
obd = ctxt->loc_exp->exp_obd;
+ LASSERT(obd != NULL);
+
+ if (lgh_id->lgl_oid) {
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+ lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+ if (IS_ERR(dchild) == -ENOENT) {
+ OBD_ALLOC(oa, sizeof(*oa));
+ if (!oa)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ oa->o_id = lgh_id->lgl_oid;
+ oa->o_generation = lgh_id->lgl_ogen;
+ oa->o_gr = lgh_id->lgl_ogr;
+ oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+ rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+ if (rc) {
+ CDEBUG(D_INODE, "err during create: %d\n", rc);
+ GOTO(out_free_oa, rc);
+ }
+ CDEBUG(D_HA, "re-create log object "LPX64":0x%x:"LPX64"\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, lgh_id->lgl_oid,
+ lgh_id->lgl_ogen, lgh_id->lgl_ogr);
+ } else if (IS_ERR(dchild)) {
+ CERROR("error looking up logfile "LPX64":0x%x: rc %d\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+ RETURN((struct file *)dchild);
+ }
- /* this is important to work here over obd_create() as it manages
- groups and we need it. Yet another reason is that mds_obd_create()
- is fully the same as old version of this function and this helps
- us to avoid code duplicating and layering violating. */
- OBD_ALLOC(oa, sizeof(*oa));
- if (!oa)
- RETURN(ERR_PTR(-ENOMEM));
-
- oa->o_gr = FILTER_GROUP_LLOG;
- oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
- rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
- if (rc)
+ filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild, open_flags);
+ if (IS_ERR(filp)) {
+ l_dput(dchild);
+ rc = PTR_ERR(filp);
+ CERROR("error opening logfile "LPX64"0x%x: rc %d\n",
+ lgh_id->lgl_oid, lgh_id->lgl_ogen, rc);
+ }
GOTO(out_free_oa, rc);
-
- dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
- oa->o_generation, oa->o_gr);
- if (IS_ERR(dchild))
- GOTO(out_free_oa, rc = PTR_ERR(dchild));
+ } else {
+ /* this is important to work here over obd_create() as it manages
+ groups and we need it. Yet another reason is that mds_obd_create()
+ is fully the same as old version of this function and this helps
+ us to avoid code duplicating and layering violating. */
+ OBD_ALLOC(oa, sizeof(*oa));
+ if (!oa)
+ RETURN(ERR_PTR(-ENOMEM));
- filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
- open_flags);
- if (IS_ERR(filp)) {
- l_dput(dchild);
- GOTO(out_free_oa, rc = PTR_ERR(filp));
+ oa->o_gr = FILTER_GROUP_LLOG;
+ oa->o_valid = OBD_MD_FLGENER | OBD_MD_FLGROUP;
+ rc = obd_create(ctxt->loc_exp, oa, NULL, NULL);
+ if (rc)
+ GOTO(out_free_oa, rc);
+
+ dchild = obd_lvfs_fid2dentry(ctxt->loc_exp, oa->o_id,
+ oa->o_generation, oa->o_gr);
+ if (IS_ERR(dchild))
+ GOTO(out_free_oa, rc = PTR_ERR(dchild));
+
+ filp = l_dentry_open(&obd->obd_lvfs_ctxt, dchild,
+ open_flags);
+ if (IS_ERR(filp)) {
+ l_dput(dchild);
+ GOTO(out_free_oa, rc = PTR_ERR(filp));
+ }
+
+ /* group 1 is not longer valid, we use the group which is set
+ by obd_create()->mds_obd_create(). */
+ lgh_id->lgl_ogr = oa->o_gr;
+ lgh_id->lgl_oid = oa->o_id;
+ lgh_id->lgl_ogen = oa->o_generation;
}
- /* group 1 is not longer valid, we use the group which is set
- by obd_create()->mds_obd_create(). */
- lgh_id->lgl_ogr = oa->o_gr;
- lgh_id->lgl_oid = oa->o_id;
- lgh_id->lgl_ogen = oa->o_generation;
- OBD_FREE(oa, sizeof(*oa));
- RETURN(filp);
-
out_free_oa:
- OBD_FREE(oa, sizeof(*oa));
- RETURN(ERR_PTR(rc));
+ if (rc)
+ filp = ERR_PTR(rc);
+ if (oa)
+ OBD_FREE(oa, sizeof(*oa));
+ RETURN(filp);
}
static struct file *
push_ctxt(&saved, ctxt->loc_lvfs_ctxt, NULL);
if (logid != NULL) {
- char logname[LL_FID_NAMELEN + 10] = "OBJECTS/";
- char fidname[LL_FID_NAMELEN];
- ll_fid2str(fidname, logid->lgl_oid, logid->lgl_ogen);
- strcat(logname, fidname);
-
- handle->lgh_file = filp_open(logname, O_RDWR | O_LARGEFILE, 0644);
+ handle->lgh_file = llog_object_create(ctxt, logid);
if (IS_ERR(handle->lgh_file)) {
- CERROR("cannot open %s file, error = %ld\n",
- logname, PTR_ERR(handle->lgh_file));
+ CERROR("cannot create/open llog object "LPX64":%x "
+ "error = %ld", logid->lgl_oid, logid->lgl_ogen,
+ PTR_ERR(handle->lgh_file));
GOTO(cleanup, rc = PTR_ERR(handle->lgh_file));
}
- if (!S_ISREG(handle->lgh_file->f_dentry->d_inode->i_mode)) {
- CERROR("%s is not a regular file!: mode = %o\n", logname,
- handle->lgh_file->f_dentry->d_inode->i_mode);
- GOTO(cleanup, rc = -ENOENT);
- }
- LASSERT(handle->lgh_file->f_dentry->d_parent == ctxt->loc_objects_dir);
handle->lgh_id = *logid;
+
} else if (name) {
handle->lgh_file = llog_filp_open(name, open_flags, 0644);
if (IS_ERR(handle->lgh_file)) {
rc = llog_add_link_object(ctxt, handle->lgh_id, handle->lgh_file->f_dentry);
if (rc)
GOTO(cleanup, rc);
+
} else {
handle->lgh_file = llog_object_create(ctxt, &handle->lgh_id);
if (IS_ERR(handle->lgh_file)) {
{
struct obd_device *obddev = class_exp2obd(exp);
struct ptlrpc_request *req = *request;
- int rc, size[2] = {sizeof(struct mds_rec_unlink), data->namelen + 1};
+ int rc, size[3] = {sizeof(struct mds_rec_unlink),
+ data->namelen + 1,
+ obddev->u.cli.cl_max_mds_cookiesize};
ENTRY;
LASSERT(req == NULL);
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 2, size,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size,
NULL);
if (req == NULL)
RETURN(-ENOMEM);
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int rc, size[3] = {sizeof(struct mds_rec_rename), oldlen + 1,
- newlen + 1};
+ int rc, size[4] = {sizeof(struct mds_rec_rename), oldlen + 1,
+ newlen + 1, obd->u.cli.cl_max_mds_cookiesize};
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 3, size,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_REINT, 4, size,
NULL);
if (req == NULL)
RETURN(-ENOMEM);
size[0] = sizeof(struct mds_body);
size[1] = obd->u.cli.cl_max_mds_easize;
- req->rq_replen = lustre_msg_size(2, size);
+ size[2] = obd->u.cli.cl_max_mds_cookiesize;
+ req->rq_replen = lustre_msg_size(3, size);
rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
*request = req;
struct obd_client_handle *och, struct ptlrpc_request **request)
{
struct obd_device *obd = class_exp2obd(exp);
- int reqsize = sizeof(struct mds_body);
+ int reqsize[2] = {sizeof(struct mds_body),
+ obd->u.cli.cl_max_mds_cookiesize};
int rc, repsize[3] = {sizeof(struct mds_body),
obd->u.cli.cl_max_mds_easize,
obd->u.cli.cl_max_mds_cookiesize};
struct l_wait_info lwi;
ENTRY;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 1, &reqsize,
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), MDS_CLOSE, 2, reqsize,
NULL);
if (req == NULL)
GOTO(out, rc = -ENOMEM);
OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
- if (opc == REINT_UNLINK)
+ if (opc == REINT_UNLINK || opc == REINT_RENAME)
bufcount = 3;
- else if (opc == REINT_OPEN || opc == REINT_RENAME)
+ else if (opc == REINT_OPEN)
bufcount = 2;
else
bufcount = 1;
LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
- rc = lustre_pack_reply(req, 3, repsize, NULL);
+ rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize, NULL);
if (rc)
RETURN(req->rq_status = rc);
struct inode *parent_inode = mds->mds_objects_dir->d_inode;
unsigned int tmpname = ll_insecure_random_int();
struct file *filp;
- struct dentry *new_child;
+ struct dentry *dchild;
struct lvfs_run_ctxt saved;
char fidname[LL_FID_NAMELEN];
void *handle;
int rc = 0, err, namelen;
ENTRY;
+ if (oa->o_id) {
+ push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+
+ namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
+
+ down(&parent_inode->i_sem);
+ dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+ if (IS_ERR(dchild)) {
+ up(&parent_inode->i_sem);
+ GOTO(out_pop, rc = PTR_ERR(dchild));
+ }
+ if (dchild->d_inode == NULL) {
+ struct dentry_params dp;
+ struct inode *inode;
+
+ dchild->d_fsdata = (void *) &dp;
+ dp.p_ptr = NULL;
+ dp.p_inum = oa->o_id;
+ rc = ll_vfs_create(parent_inode, dchild, S_IFREG, NULL);
+ if (dchild->d_fsdata == (void *)(unsigned long)oa->o_id)
+ dchild->d_fsdata = NULL;
+ if (rc) {
+ CDEBUG(D_INODE, "err during create: %d\n", rc);
+ dput(dchild);
+ up(&parent_inode->i_sem);
+ GOTO(out_pop, rc);
+ }
+ inode = dchild->d_inode;
+ LASSERT(inode->i_ino == oa->o_id);
+ inode->i_generation = oa->o_generation;
+ CDEBUG(D_HA, "recreated ino %lu with gen %u\n",
+ inode->i_ino, inode->i_generation);
+ mark_inode_dirty(inode);
+ } else {
+ CWARN("it should be here!\n");
+ }
+ GOTO(out_pop, rc);
+ }
+
push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
sprintf(fidname, "OBJECTS/%u", tmpname);
namelen = ll_fid2str(fidname, oa->o_id, oa->o_generation);
down(&parent_inode->i_sem);
- new_child = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
+ dchild = lookup_one_len(fidname, mds->mds_objects_dir, namelen);
- if (IS_ERR(new_child)) {
+ if (IS_ERR(dchild)) {
CERROR("getting neg dentry for obj rename: %d\n", rc);
- GOTO(out_close, rc = PTR_ERR(new_child));
+ GOTO(out_close, rc = PTR_ERR(dchild));
}
- if (new_child->d_inode != NULL) {
+ if (dchild->d_inode != NULL) {
CERROR("impossible non-negative obj dentry " LPU64":%u!\n",
oa->o_id, oa->o_generation);
LBUG();
lock_kernel();
rc = vfs_rename(mds->mds_objects_dir->d_inode, filp->f_dentry,
- mds->mds_objects_dir->d_inode, new_child);
+ mds->mds_objects_dir->d_inode, dchild);
unlock_kernel();
if (rc)
CERROR("error renaming new object "LPU64":%u: rc %d\n",
else if (!rc)
rc = err;
out_dput:
- dput(new_child);
+ dput(dchild);
out_close:
up(&parent_inode->i_sem);
err = filp_close(filp, 0);
/* mds/mds_log.c */
int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies, int cookies_size);
+ struct llog_cookie *logcookies, int cookies_size,
+ struct llog_create_locks **res);
int mds_llog_init(struct obd_device *obd, struct obd_llogs *,
struct obd_device *tgt, int count, struct llog_catid *logid);
int mds_llog_finish(struct obd_device *obd, struct obd_llogs *, int count);
static int mds_llog_origin_add(struct llog_ctxt *ctxt, struct llog_rec_hdr *rec,
void *buf, struct llog_cookie *logcookies,
- int numcookies, void *data)
+ int numcookies, void *data,
+ struct rw_semaphore **lock, int *lock_count)
{
struct obd_device *obd = ctxt->loc_obd;
struct obd_device *lov_obd = obd->u.mds.mds_osc_obd;
ENTRY;
lctxt = llog_get_context(&lov_obd->obd_llogs, ctxt->loc_idx);
- rc = llog_add(lctxt, rec, buf, logcookies, numcookies, data);
+ rc = llog_add(lctxt, rec, buf, logcookies, numcookies, data,
+ lock, lock_count);
RETURN(rc);
}
int mds_log_op_unlink(struct obd_device *obd, struct inode *inode,
struct lov_mds_md *lmm, int lmm_size,
- struct llog_cookie *logcookies, int cookies_size)
+ struct llog_cookie *logcookies, int cookies_size,
+ struct llog_create_locks **res)
{
struct mds_obd *mds = &obd->u.mds;
struct lov_stripe_md *lsm = NULL;
struct llog_ctxt *ctxt;
- int rc;
+ struct llog_create_locks *lcl;
+ int rc, size, offset = offsetof(struct llog_create_locks, lcl_locks);
+ int lock_count = 0;
ENTRY;
if (IS_ERR(mds->mds_osc_obd))
if (rc < 0)
RETURN(rc);
+ if (res != NULL) {
+ size = offset +
+ sizeof(struct rw_semaphore *) * lsm->lsm_stripe_count;
+ OBD_ALLOC(lcl, size);
+ if (lcl == NULL)
+ RETURN(-ENOMEM);
+
+ lcl->lcl_count = lsm->lsm_stripe_count;
+ *res = lcl;
+ }
+
ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
rc = llog_add(ctxt, NULL, lsm, logcookies,
- cookies_size / sizeof(struct llog_cookie), NULL);
+ cookies_size / sizeof(struct llog_cookie), NULL,
+ res ? &lcl->lcl_locks[0] : NULL, &lock_count);
obd_free_memmd(mds->mds_osc_exp, &lsm);
+ if (res && (rc <= 0 || lock_count == 0)) {
+ OBD_FREE(lcl, size);
+ *res = NULL;
+ }
+
RETURN(rc);
}
struct mds_body *request_body = NULL, *reply_body = NULL;
struct dentry_params dp;
struct iattr iattr = { 0 };
+ struct llog_create_locks *lcl = NULL;
ENTRY;
if (req && req->rq_reqmsg != NULL)
GOTO(cleanup, rc);
}
- if (req != NULL && req->rq_repmsg != NULL &&
- (reply_body->valid & OBD_MD_FLEASIZE) &&
- mds_log_op_unlink(obd, pending_child->d_inode, lmm,
- req->rq_repmsg->buflens[1],
- lustre_msg_buf(req->rq_repmsg, 2, 0),
- req->rq_repmsg->buflens[2]) > 0) {
- reply_body->valid |= OBD_MD_FLCOOKIE;
- }
-
pending_child->d_fsdata = (void *) &dp;
dp.p_inum = 0;
dp.p_ptr = req;
if (rc)
CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
+ if (req != NULL && req->rq_repmsg != NULL &&
+ (reply_body->valid & OBD_MD_FLEASIZE) &&
+ mds_log_op_unlink(obd, pending_child->d_inode,
+ lmm, req->rq_repmsg->buflens[1],
+ lustre_msg_buf(req->rq_repmsg, 2, 0),
+ req->rq_repmsg->buflens[2], &lcl) > 0) {
+ reply_body->valid |= OBD_MD_FLCOOKIE;
+ }
+
goto out; /* Don't bother updating attrs on unlinked inode */
}
switch (cleanup_phase) {
case 2:
+ if (lcl != NULL)
+ ptlrpc_save_llog_lock(req, lcl);
dput(pending_child);
case 1:
up(&pending_dir->i_sem);
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
}
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ DEBUG_REQ(D_HA, req, "close replay\n");
+ memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0),
+ lustre_msg_buf(req->rq_reqmsg, 1, 0),
+ req->rq_repmsg->buflens[2]);
+ }
+
body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
if (body == NULL) {
CERROR("Can't unpack body\n");
struct list_head *tmp;
struct ptlrpc_reply_state *oldrep;
struct ptlrpc_service *svc;
+ struct llog_create_locks *lcl;
unsigned long flags;
char str[PTL_NALFMT_SIZE];
int i;
oldrep->rs_modes[i]);
oldrep->rs_nlocks = 0;
+ lcl = oldrep->rs_llog_locks;
+ oldrep->rs_llog_locks = NULL;
+ if (lcl != NULL)
+ ptlrpc_save_llog_lock(req, lcl);
+
DEBUG_REQ(D_HA, req, "stole locks for");
ptlrpc_schedule_difficult_reply (oldrep);
struct lustre_handle child_reuse_lockh = {0};
#endif
struct lustre_handle * slave_lockh = NULL;
+ struct llog_create_locks *lcl = NULL;
char fidname[LL_FID_NAMELEN];
void *handle = NULL;
int rc = 0, log_unlink = 0, cleanup_phase = 0;
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ DEBUG_REQ(D_HA, req, "unlink replay\n");
+ memcpy(lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
+ lustre_msg_buf(req->rq_reqmsg, 2, 0),
+ req->rq_repmsg->buflens[offset + 2]);
+ }
+
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
GOTO(cleanup, rc = -ENOENT);
lustre_msg_buf(req->rq_repmsg, offset + 1, 0),
req->rq_repmsg->buflens[offset + 1],
lustre_msg_buf(req->rq_repmsg, offset + 2, 0),
- req->rq_repmsg->buflens[offset + 2]) > 0)
+ req->rq_repmsg->buflens[offset + 2], &lcl) > 0)
body->valid |= OBD_MD_FLCOOKIE;
break;
}
if (!rc)
(void)obd_set_info(mds->mds_osc_exp, strlen("unlinked"),
"unlinked", 0, NULL);
+ if (lcl != NULL)
+ ptlrpc_save_llog_lock(req, lcl);
case 3: /* child ino-reuse lock */
#if 0
if (rc && body != NULL) {
struct mds_obd *mds = mds_req2mds(req);
struct lustre_handle dlm_handles[7] = {{0},{0},{0},{0},{0},{0},{0}};
struct mds_body *body = NULL;
- int rc = 0, lock_count = 3;
+ struct llog_create_locks *lcl = NULL;
+ int rc = 0, log_unlink = 0, lock_count = 3;
int cleanup_phase = 0;
void *handle = NULL;
ENTRY;
MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ DEBUG_REQ(D_HA, req, "rename replay\n");
+ memcpy(lustre_msg_buf(req->rq_repmsg, 2, 0),
+ lustre_msg_buf(req->rq_reqmsg, 3, 0),
+ req->rq_repmsg->buflens[2]);
+ }
+
if (rec->ur_namelen == 1) {
rc = mds_reint_rename_create_name(rec, offset, req);
RETURN(rc);
body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
OBD_MD_FLATIME | OBD_MD_FLMTIME);
} else {
- /* XXX need log unlink? */
+ log_unlink = 1;
}
}
rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
unlock_kernel();
+ if (!rc && log_unlink) {
+ if (mds_log_op_unlink(obd, de_tgtdir->d_inode,
+ lustre_msg_buf(req->rq_repmsg, 1, 0),
+ req->rq_repmsg->buflens[1],
+ lustre_msg_buf(req->rq_repmsg, 2, 0),
+ req->rq_repmsg->buflens[2], &lcl) > 0) {
+ body->valid |= OBD_MD_FLCOOKIE;
+ }
+ }
+
GOTO(cleanup, rc);
cleanup:
rc = mds_finish_transno(mds, de_tgtdir ? de_tgtdir->d_inode : NULL,
if (dlm_handles[6].cookie != 0)
ldlm_lock_decref(&(dlm_handles[6]), LCK_PW);
#endif
+ if (lcl != NULL)
+ ptlrpc_save_llog_lock(req, lcl);
+
if (rc) {
if (lock_count == 4)
ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
if (logcookies == NULL)
rc = -ENOMEM;
else if (mds_log_op_unlink(obd, inode, lmm,lmm_size,logcookies,
- mds->mds_max_cookiesize) > 0)
+ mds->mds_max_cookiesize, NULL) > 0)
log_unlink = 1;
}
err = fsfilt_commit(obd, mds->mds_sb, pending_dir, handle, 0);
{
struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
struct llog_handle *loghandle;
- struct llog_log_hdr *llh;
- int rc, index;
+ int rc;
ENTRY;
if (rec->lrh_type != LLOG_LOGID_MAGIC) {
CERROR("invalid record in catalog\n");
RETURN(-EINVAL);
}
- CWARN("processing log "LPX64":%x at index %u of catalog "LPX64"\n",
+ CDEBUG(D_HA, "processing log "LPX64":%x at index %u of catalog "LPX64"\n",
lir->lid_id.lgl_oid, lir->lid_id.lgl_ogen,
rec->lrh_index, cathandle->lgh_id.lgl_oid);
RETURN(rc);
}
- llh = loghandle->lgh_hdr;
- if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
- (llh->llh_count == 1)) {
- rc = llog_destroy(loghandle);
- if (rc)
- CERROR("failure destroying log in postsetup: %d\n", rc);
- LASSERT(rc == 0);
-
- index = loghandle->u.phd.phd_cookie.lgc_index;
- llog_free_handle(loghandle);
-
- LASSERT(index);
- llog_cat_set_first_idx(cathandle, index);
- rc = llog_cancel_rec(cathandle, index);
- if (rc == 0)
- CWARN("cancel log "LPX64":%x at index %u of catalog "
- LPX64"\n", lir->lid_id.lgl_oid,
- lir->lid_id.lgl_ogen, rec->lrh_index,
- cathandle->lgh_id.lgl_oid);
- }
+ if (cathandle->lgh_last_idx == loghandle->u.phd.phd_cookie.lgc_index)
+ cathandle->u.chd.chd_current_log = loghandle;
RETURN(rc);
}
}
EXPORT_SYMBOL(llog_obd_origin_setup);
-int llog_obd_origin_cleanup(struct llog_ctxt *ctxt)
-{
- struct llog_handle *cathandle, *n, *loghandle;
- struct llog_log_hdr *llh;
- int rc, index;
- ENTRY;
-
- if (!ctxt)
- return 0;
-
- cathandle = ctxt->loc_handle;
- if (cathandle) {
- list_for_each_entry_safe(loghandle, n,
- &cathandle->u.chd.chd_head,
- u.phd.phd_entry) {
- llh = loghandle->lgh_hdr;
- if ((llh->llh_flags &
- LLOG_F_ZAP_WHEN_EMPTY) &&
- (llh->llh_count == 1)) {
- rc = llog_destroy(loghandle);
- if (rc)
- CERROR("failure destroying log during "
- "cleanup: %d\n", rc);
- LASSERT(rc == 0);
-
- index = loghandle->u.phd.phd_cookie.lgc_index;
- llog_free_handle(loghandle);
-
- LASSERT(index);
- llog_cat_set_first_idx(cathandle, index);
- rc = llog_cancel_rec(cathandle, index);
- if (rc == 0)
- CDEBUG(D_HA, "cancel plain log at index"
- " %u of catalog "LPX64"\n",
- index,cathandle->lgh_id.lgl_oid);
- }
- }
- llog_cat_put(ctxt->loc_handle);
- }
- return 0;
-}
-EXPORT_SYMBOL(llog_obd_origin_cleanup);
-
-/* add for obdfilter/sz and mds/unlink */
-int llog_obd_origin_add(struct llog_ctxt *ctxt,
- struct llog_rec_hdr *rec, struct lov_stripe_md *lsm,
- struct llog_cookie *logcookies, int numcookies)
-{
- struct llog_handle *cathandle;
- int rc;
- ENTRY;
-
- cathandle = ctxt->loc_handle;
- LASSERT(cathandle != NULL);
- rc = llog_cat_add_rec(cathandle, rec, logcookies, NULL);
- if (rc != 1)
- CERROR("write one catalog record failed: %d\n", rc);
- RETURN(rc);
-}
-EXPORT_SYMBOL(llog_obd_origin_add);
-
int obd_llog_cat_initialize(struct obd_device *obd, struct obd_llogs *llogs,
int count, char *name)
{
cat_logid = cath->lgh_id;
CWARN("4b: write 1 record into the catalog\n");
- rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL);
+ rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, &cookie, NULL, NULL, NULL);
if (rc != 1) {
CERROR("4b: write 1 catalog record failed at: %d\n", rc);
GOTO(out, rc);
CWARN("4d: write 40,000 more log records\n");
for (i = 0; i < 40000; i++) {
- rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL);
+ rc = llog_cat_add_rec(cath, &lmr.lmr_hdr, NULL, NULL, NULL, NULL);
if (rc) {
CERROR("4d: write 40000 records failed at #%d: %d\n",
i + 1, rc);
for (i = 0; i < 5; i++) {
rec.lrh_len = buflen;
rec.lrh_type = OBD_CFG_REC;
- rc = llog_cat_add_rec(cath, &rec, NULL, buf);
+ rc = llog_cat_add_rec(cath, &rec, NULL, buf, NULL, NULL);
if (rc) {
CERROR("4e: write 5 records failed at #%d: %d\n",
i + 1, rc);
}
CWARN("5d: add 1 record to the log with many canceled empty pages\n");
- rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL);
+ rc = llog_cat_add_rec(llh, &lmr.lmr_hdr, NULL, NULL, NULL, NULL);
if (rc) {
CERROR("5d: add record to the log with many canceled empty\
pages failed\n");
lsc->lsc_fid = *mds_fid;
lsc->lsc_io_epoch = io_epoch;
- rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie, NULL);
+ rc = llog_cat_add_rec(cathandle, &lsc->lsc_hdr, logcookie,
+ NULL, NULL, NULL);
OBD_FREE(lsc, sizeof(*lsc));
if (rc > 0) {
lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
lgr->lgr_gen = ctxt->loc_gen;
- rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL);
+ rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL, NULL, NULL);
OBD_FREE(lgr, sizeof(*lgr));
if (rc != 1)
RETURN(rc);
/* service.c */
EXPORT_SYMBOL(ptlrpc_save_lock);
+EXPORT_SYMBOL(ptlrpc_save_llog_lock);
EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
EXPORT_SYMBOL(ptlrpc_commit_replies);
EXPORT_SYMBOL(ptlrpc_init_svc);
#include <linux/obd_support.h>
#include <linux/obd_class.h>
#include <linux/lustre_net.h>
+#include <linux/lustre_log.h>
#include <portals/types.h>
#include "ptlrpc_internal.h"
}
void
+ptlrpc_save_llog_lock (struct ptlrpc_request *req,
+ struct llog_create_locks *lcl)
+{
+ struct ptlrpc_reply_state *rs = req->rq_reply_state;
+ LASSERT (rs != NULL);
+ LASSERT (rs->rs_llog_locks == NULL);
+
+ rs->rs_llog_locks = lcl;
+}
+
+void
ptlrpc_save_lock (struct ptlrpc_request *req,
struct lustre_handle *lock, int mode)
{
list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
struct ptlrpc_reply_state *rs =
list_entry (tmp, struct ptlrpc_reply_state, rs_obd_list);
+ struct llog_create_locks *lcl = rs->rs_llog_locks;
+ rs->rs_llog_locks = NULL;
LASSERT (rs->rs_difficult);
if (rs->rs_transno <= obd->obd_last_committed) {
list_del_init (&rs->rs_obd_list);
ptlrpc_schedule_difficult_reply (rs);
spin_unlock (&svc->srv_lock);
+
+ if (lcl != NULL)
+ llog_create_lock_free(lcl);
}
}
unsigned long flags;
struct obd_export *exp;
struct obd_device *obd;
+ struct llog_create_locks *lcl;
int nlocks;
int been_handled;
char str[PTL_NALFMT_SIZE];
nlocks = rs->rs_nlocks; /* atomic "steal", but */
rs->rs_nlocks = 0; /* locks still on rs_locks! */
+ lcl = rs->rs_llog_locks;
+ rs->rs_llog_locks = NULL;
+
if (nlocks == 0 && !been_handled) {
/* If we see this, we should already have seen the warning
* in mds_steal_ack_locks() */
}
if ((!been_handled && rs->rs_on_net) ||
- nlocks > 0) {
+ nlocks > 0 || lcl != NULL) {
spin_unlock_irqrestore(&svc->srv_lock, flags);
if (!been_handled && rs->rs_on_net) {
ldlm_lock_decref(&rs->rs_locks[nlocks],
rs->rs_modes[nlocks]);
+ if (lcl != NULL)
+ llog_create_lock_free(lcl);
+
spin_lock_irqsave(&svc->srv_lock, flags);
}
llr->llr_pfid.generation = parent->i_generation;
llr->llr_pfid.f_type = parent->i_mode & S_IFMT;
- rc = llog_add(ctxt, &llr->llr_hdr, NULL, logcookie, 1, NULL);
+ rc = llog_add(ctxt, &llr->llr_hdr, NULL, logcookie, 1,
+ NULL, NULL, NULL);
if (rc != 1) {
CERROR("failed at llog_add: %d\n", rc);
GOTO(out, rc);
lgr->lgr_hdr.lrh_len = lgr->lgr_tail.lrt_len = sizeof(*lgr);
lgr->lgr_hdr.lrh_type = LLOG_GEN_REC;
lgr->lgr_gen = ctxt->loc_gen;
- rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1, NULL);
+ rc = llog_add(ctxt, &lgr->lgr_hdr, NULL, NULL, 1,
+ NULL, NULL, NULL);
OBD_FREE(lgr, sizeof(*lgr));
if (rc != 1)
RETURN(rc);
rec.lrh_len = size_round(data_size);
rec.lrh_type = SMFS_UPDATE_REC;
- rc = llog_add(sinfo->smsi_rec_log, &rec, data, NULL, 0, NULL);
+ rc = llog_add(sinfo->smsi_rec_log, &rec, data, NULL, 0, NULL, NULL, NULL);
if (rc != 1) {
CERROR("error adding kml rec: %d\n", rc);
RETURN(-EINVAL);
}
run_test 48 "Don't lose transno when client is evicted (2525)"
+# b=3550 - replay of unlink
+test_49() {
+ replay_barrier mds
+ createmany -o $DIR/$tfile-%d 400 || return 1
+ unlinkmany $DIR/$tfile-%d 0 400 || return 2
+ fail mds
+ $CHECKSTAT -t file $DIR/$tfile-* && return 3 || true
+}
+run_test 49 "re-write records to llog as written during fail"
+
equals_msg test complete, cleaning up
$CLEANUP