this for getstatus explicitly.
Store an xid in last_xid and not last_rcvd for reply packets. Mike please
verify this. I think it will help resolve strangeness I have observed in
the cleanup of expired requests on the clients, but that still needs some
investigation.
Add a spinlock to protect 64-bit increment of mds_last_rcvd, and also
make sure we only use each transaction value a single time. This
would probably be a very rare issue for recovery, but may as well get
it now.
/* lib/mds_updates.c */
void mds_unpack_body(struct mds_body *b);
+void mds_unpack_fid(struct ll_fid *fid);
+void mds_pack_fid(struct ll_fid *fid);
void mds_pack_req_body(struct ptlrpc_request *);
void mds_pack_rep_body(struct ptlrpc_request *);
int mds_update_unpack(struct ptlrpc_request *, int offset,
static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
void *handle, struct iattr *iattr)
{
+ int rc;
/*
* NOTE: we probably don't need to take i_sem here when changing
* ATTR_SIZE because the MDS never needs to truncate a file.
* stored on the MDS are entirely sparse (no data blocks).
* If we do need to get it, we can do it here.
*/
- return mds->mds_fsops->fs_setattr(dentry, handle, iattr);
+ lock_kernel();
+ rc = mds->mds_fsops->fs_setattr(dentry, handle, iattr);
+ unlock_kernel();
+
+ return rc;
}
static inline int mds_fs_set_md(struct mds_obd *mds, struct inode *inode,
struct mds_fs_operations *mds_fsops;
int mds_max_mdsize;
struct file *mds_rcvd_filp;
+ spinlock_t mds_last_lock;
__u64 mds_last_committed;
__u64 mds_last_rcvd;
__u64 mds_mount_count;
{
struct mds_obd *mds = mds_req2mds(req);
struct mds_body *body;
- struct mds_export_data *med = &req->rq_export->exp_mds_data;
int rc, size = sizeof(*body);
ENTRY;
RETURN(0);
}
- body = lustre_msg_buf(req->rq_reqmsg, 0);
- mds_unpack_body(body);
-
/* Anything we need to do here with the client's trans no or so? */
body = lustre_msg_buf(req->rq_repmsg, 0);
memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
- LASSERT(med->med_mcd);
-
- /* mcd_last_xid is is stored in little endian on the disk and
- mds_pack_rep_body converts it to network order */
- req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
- mds_pack_rep_body(req);
+ /* the last_committed and last_xid fields are filled in for all
+ * replies already - no need to do so here also.
+ */
RETURN(0);
}
EXIT;
if (!rc) {
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct mds_obd *mds = mds_req2mds(req);
- req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd);
+
+ req->rq_repmsg->last_xid =
+ HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
req->rq_repmsg->last_committed =
HTON__u64(mds->mds_last_committed);
- CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
+ CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n",
(unsigned long long)mds->mds_last_rcvd,
(unsigned long long)mds->mds_last_committed,
cpu_to_le32(req->rq_xid));
}
out:
- /* Still not 100% sure whether we should reply with the server
- * last_rcvd or that of this client. I'm not sure it even makes
- * a difference on a per-client basis, because last_rcvd is global
- * and we are not supposed to allow transactions while in recovery.
- */
if (rc) {
CERROR("mds: processing error %d\n", rc);
ptlrpc_error(req->rq_svc, req);
* This will alert us that we may need to do client recovery.
*
* Assumes we are already in the server filesystem context.
+ *
+ * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
*/
static
int mds_update_server_data(struct mds_obd *mds)
if (!mds->mds_sb)
GOTO(err_put, rc = -ENODEV);
+ spin_lock_init(&mds->mds_last_lock);
mds->mds_max_mdsize = sizeof(struct lov_mds_md);
rc = mds_fs_setup(obddev, mnt);
if (rc) {
return journal_stop((handle_t *)handle);
}
+/* Assumes BKL is held */
static int mds_extN_setattr(struct dentry *dentry, void *handle,
struct iattr *iattr)
{
{
int rc;
- lock_kernel();
down(&inode->i_sem);
+ lock_kernel();
if (md == NULL)
rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
XATTR_LUSTRE_MDS_OBJID, md,
md->lmd_easize, XATTR_CREATE);
}
- up(&inode->i_sem);
unlock_kernel();
+ up(&inode->i_sem);
if (rc) {
CERROR("error adding objectid %Ld to inode %ld: %d\n",
int rc;
int size = md->lmd_easize;
- lock_kernel();
down(&inode->i_sem);
+ lock_kernel();
rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
XATTR_LUSTRE_MDS_OBJID, md, size);
-
- up(&inode->i_sem);
unlock_kernel();
+ up(&inode->i_sem);
if (rc < 0) {
CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: "
{
struct mds_export_data *med = &req->rq_export->exp_mds_data;
struct mds_client_data *mcd = med->med_mcd;
+ __u64 last_rcvd;
loff_t off;
int rc;
off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
- ++mds->mds_last_rcvd; /* lock this, or make it an LDLM function? */
- req->rq_repmsg->transno = HTON__u64(mds->mds_last_rcvd);
- mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
+ spin_lock(&mds->mds_last_lock);
+ last_rcvd = ++mds->mds_last_rcvd;
+ spin_unlock(&mds->mds_last_lock);
+ req->rq_repmsg->transno = HTON__u64(last_rcvd);
+ mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
mds_fs_set_last_rcvd(mds, handle);
rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
- mds->mds_last_rcvd, mcd->mcd_uuid, med->med_off, rc);
+ last_rcvd, mcd->mcd_uuid, med->med_off, rc);
if (rc == sizeof(*mcd))
rc = 0;
OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
de->d_inode->i_sb->s_dev);
- lock_kernel();
handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
if (!handle)
- GOTO(out_unlock, rc = PTR_ERR(handle));
+ GOTO(out_setattr_de, rc = PTR_ERR(handle));
+
rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
if (!rc)
}
EXIT;
-out_unlock:
- unlock_kernel();
out_setattr_de:
l_dput(de);
out_setattr:
rc = mds_update_last_rcvd(mds, handle, req);
if (rc) {
- CERROR("error on update_last_rcvd: rc = %d\n", rc);
+ CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
/* XXX should we abort here in case of error? */
}
err = vfs_unlink(dir, dchild);
if (err)
CERROR("failed unlink in error path: rc = %d\n", err);
+ break;
}
goto out_create_commit;