Whamcloud - gitweb
We already set last_committed and last_xid for all packets, no need to do
authoradilger <adilger>
Wed, 28 Aug 2002 23:50:06 +0000 (23:50 +0000)
committeradilger <adilger>
Wed, 28 Aug 2002 23:50:06 +0000 (23:50 +0000)
this for getstatus explicitly.

Store an xid in last_xid and not last_rcvd for reply packets.  Mike please
verify this.  I think it will help resolve strangeness I have observed in
the cleanup of expired requests on the clients, but that still needs some
investigation.

Add a spinlock to protect 64-bit increment of mds_last_rcvd, and also
make sure we only use each transaction value a single time.  This
would probably be a very rare issue for recovery, but may as well get
it now.

lustre/include/linux/lustre_mds.h
lustre/include/linux/obd.h
lustre/mds/handler.c
lustre/mds/mds_extN.c
lustre/mds/mds_reint.c

index 96ddd4f..16074a7 100644 (file)
@@ -100,6 +100,8 @@ int mds_reint_rec(struct mds_update_record *r, int offset,
 
 /* lib/mds_updates.c */
 void mds_unpack_body(struct mds_body *b);
+void mds_unpack_fid(struct ll_fid *fid);
+void mds_pack_fid(struct ll_fid *fid);
 void mds_pack_req_body(struct ptlrpc_request *);
 void mds_pack_rep_body(struct ptlrpc_request *);
 int mds_update_unpack(struct ptlrpc_request *, int offset,
@@ -225,6 +227,7 @@ static inline int mds_fs_commit(struct mds_obd *mds, struct inode *inode,
 static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
                                  void *handle, struct iattr *iattr)
 {
+        int rc;
         /*
          * NOTE: we probably don't need to take i_sem here when changing
          *       ATTR_SIZE because the MDS never needs to truncate a file.
@@ -232,7 +235,11 @@ static inline int mds_fs_setattr(struct mds_obd *mds, struct dentry *dentry,
          *       stored on the MDS are entirely sparse (no data blocks).
          *       If we do need to get it, we can do it here.
          */
-        return mds->mds_fsops->fs_setattr(dentry, handle, iattr);
+        lock_kernel();
+        rc = mds->mds_fsops->fs_setattr(dentry, handle, iattr);
+        unlock_kernel();
+
+        return rc;
 }
 
 static inline int mds_fs_set_md(struct mds_obd *mds, struct inode *inode,
index eccf74f..eaea044 100644 (file)
@@ -123,6 +123,7 @@ struct mds_obd {
         struct mds_fs_operations *mds_fsops;
         int mds_max_mdsize;
         struct file *mds_rcvd_filp;
+        spinlock_t mds_last_lock;
         __u64 mds_last_committed;
         __u64 mds_last_rcvd;
         __u64 mds_mount_count;
index f5a0193..a6ed213 100644 (file)
@@ -340,7 +340,6 @@ static int mds_getstatus(struct ptlrpc_request *req)
 {
         struct mds_obd *mds = mds_req2mds(req);
         struct mds_body *body;
-        struct mds_export_data *med = &req->rq_export->exp_mds_data;
         int rc, size = sizeof(*body);
         ENTRY;
 
@@ -351,19 +350,13 @@ static int mds_getstatus(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
-        body = lustre_msg_buf(req->rq_reqmsg, 0);
-        mds_unpack_body(body);
-
         /* Anything we need to do here with the client's trans no or so? */
         body = lustre_msg_buf(req->rq_repmsg, 0);
         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
 
-        LASSERT(med->med_mcd);
-
-        /* mcd_last_xid is is stored in little endian on the disk and
-           mds_pack_rep_body converts it to network order */
-        req->rq_repmsg->last_xid = le32_to_cpu(med->med_mcd->mcd_last_xid);
-        mds_pack_rep_body(req);
+        /* the last_committed and last_xid fields are filled in for all
+         * replies already - no need to do so here also.
+         */
         RETURN(0);
 }
 
@@ -978,21 +971,19 @@ int mds_handle(struct ptlrpc_request *req)
         EXIT;
 
         if (!rc) {
+                struct mds_export_data *med = &req->rq_export->exp_mds_data;
                 struct mds_obd *mds = mds_req2mds(req);
-                req->rq_repmsg->last_xid = HTON__u64(mds->mds_last_rcvd);
+
+                req->rq_repmsg->last_xid =
+                        HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
                 req->rq_repmsg->last_committed =
                         HTON__u64(mds->mds_last_committed);
-                CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
+                CDEBUG(D_INFO, "last_rcvd ~%Lu, last_committed %Lu, xid %d\n",
                        (unsigned long long)mds->mds_last_rcvd,
                        (unsigned long long)mds->mds_last_committed,
                        cpu_to_le32(req->rq_xid));
         }
  out:
-        /* Still not 100% sure whether we should reply with the server
-         * last_rcvd or that of this client.  I'm not sure it even makes
-         * a difference on a per-client basis, because last_rcvd is global
-         * and we are not supposed to allow transactions while in recovery.
-         */
         if (rc) {
                 CERROR("mds: processing error %d\n", rc);
                 ptlrpc_error(req->rq_svc, req);
@@ -1009,6 +1000,8 @@ int mds_handle(struct ptlrpc_request *req)
  * This will alert us that we may need to do client recovery.
  *
  * Assumes we are already in the server filesystem context.
+ *
+ * Also assumes for mds_last_rcvd that we are not modifying it (no locking).
  */
 static
 int mds_update_server_data(struct mds_obd *mds)
@@ -1085,6 +1078,7 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (!mds->mds_sb)
                 GOTO(err_put, rc = -ENODEV);
 
+        spin_lock_init(&mds->mds_last_lock);
         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
         rc = mds_fs_setup(obddev, mnt);
         if (rc) {
index b3a6888..757f69b 100644 (file)
@@ -96,6 +96,7 @@ static int mds_extN_commit(struct inode *inode, void *handle)
         return journal_stop((handle_t *)handle);
 }
 
+/* Assumes BKL is held */
 static int mds_extN_setattr(struct dentry *dentry, void *handle,
                             struct iattr *iattr)
 {
@@ -112,8 +113,8 @@ static int mds_extN_set_md(struct inode *inode, void *handle,
 {
         int rc;
 
-        lock_kernel();
         down(&inode->i_sem);
+        lock_kernel();
         if (md == NULL)
                 rc = extN_xattr_set(handle, inode, EXTN_XATTR_INDEX_LUSTRE,
                                     XATTR_LUSTRE_MDS_OBJID, NULL, 0, 0);
@@ -123,8 +124,8 @@ static int mds_extN_set_md(struct inode *inode, void *handle,
                                     XATTR_LUSTRE_MDS_OBJID, md,
                                     md->lmd_easize, XATTR_CREATE);
         }
-        up(&inode->i_sem);
         unlock_kernel();
+        up(&inode->i_sem);
 
         if (rc) {
                 CERROR("error adding objectid %Ld to inode %ld: %d\n",
@@ -139,13 +140,12 @@ static int mds_extN_get_md(struct inode *inode, struct lov_mds_md *md)
         int rc;
         int size = md->lmd_easize;
 
-        lock_kernel();
         down(&inode->i_sem);
+        lock_kernel();
         rc = extN_xattr_get(inode, EXTN_XATTR_INDEX_LUSTRE,
                             XATTR_LUSTRE_MDS_OBJID, md, size);
-
-        up(&inode->i_sem);
         unlock_kernel();
+        up(&inode->i_sem);
 
         if (rc < 0) {
                 CDEBUG(D_INFO, "error getting EA %s from MDS inode %ld: "
index dc47de2..74b2e0b 100644 (file)
@@ -35,21 +35,24 @@ int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
 {
         struct mds_export_data *med = &req->rq_export->exp_mds_data;
         struct mds_client_data *mcd = med->med_mcd;
+        __u64 last_rcvd;
         loff_t off;
         int rc;
 
         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
 
-        ++mds->mds_last_rcvd;   /* lock this, or make it an LDLM function? */
-        req->rq_repmsg->transno = HTON__u64(mds->mds_last_rcvd);
-        mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
+        spin_lock(&mds->mds_last_lock);
+        last_rcvd = ++mds->mds_last_rcvd;
+        spin_unlock(&mds->mds_last_lock);
+        req->rq_repmsg->transno = HTON__u64(last_rcvd);
+        mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
 
         mds_fs_set_last_rcvd(mds, handle);
         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd), &off);
         CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
-               mds->mds_last_rcvd, mcd->mcd_uuid, med->med_off, rc);
+               last_rcvd, mcd->mcd_uuid, med->med_off, rc);
 
         if (rc == sizeof(*mcd))
                 rc = 0;
@@ -110,10 +113,10 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
                        de->d_inode->i_sb->s_dev);
 
-        lock_kernel();
         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
         if (!handle)
-                GOTO(out_unlock, rc = PTR_ERR(handle));
+                GOTO(out_setattr_de, rc = PTR_ERR(handle));
+
         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
 
         if (!rc)
@@ -127,8 +130,6 @@ static int mds_reint_setattr(struct mds_update_record *rec, int offset,
         }
 
         EXIT;
-out_unlock:
-        unlock_kernel();
 out_setattr_de:
         l_dput(de);
 out_setattr:
@@ -340,7 +341,7 @@ static int mds_reint_create(struct mds_update_record *rec, int offset,
 
                 rc = mds_update_last_rcvd(mds, handle, req);
                 if (rc) {
-                        CERROR("error on update_last_rcvd: rc = %d\n", rc);
+                        CERROR("error on mds_update_last_rcvd: rc = %d\n", rc);
                         /* XXX should we abort here in case of error? */
                 }
 
@@ -382,6 +383,7 @@ out_create_unlink:
                 err = vfs_unlink(dir, dchild);
                 if (err)
                         CERROR("failed unlink in error path: rc = %d\n", err);
+                break;
         }
 
         goto out_create_commit;