Whamcloud - gitweb
Open/create the last_rcvd file for MDS at filesystem mount time and close it
[fs/lustre-release.git] / lustre / mds / handler.c
index 6aa6ab5..a2cb3b6 100644 (file)
@@ -38,70 +38,57 @@ int mds_sendpage(struct ptlrpc_request *req, struct file *file,
 {
         int rc = 0;
         mm_segment_t oldfs = get_fs();
+        struct ptlrpc_bulk_desc *bulk;
+        char *buf;
 
-        OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, -EIO);
-
-        if (req->rq_peer.peer_nid == 0) {
-                /* dst->addr is a user address, but in a different task! */
-                char *buf = (char *)(long)dst->addr;
-
-                set_fs(KERNEL_DS);
-                rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
-                                     &offset);
-                set_fs(oldfs);
-
-                if (rc != PAGE_SIZE) {
-                        rc = -EIO;
-                        GOTO(out, rc);
-                }
-                EXIT;
-        } else {
-                struct ptlrpc_bulk_desc *bulk;
-                char *buf;
-
-                bulk = ptlrpc_prep_bulk(&req->rq_peer);
-                if (bulk == NULL) {
-                        rc = -ENOMEM;
-                        GOTO(out, rc);
-                }
-
-                bulk->b_xid = req->rq_xid;
+        bulk = ptlrpc_prep_bulk(req->rq_connection);
+        if (bulk == NULL) {
+                rc = -ENOMEM;
+                GOTO(out, rc);
+        }
 
-                OBD_ALLOC(buf, PAGE_SIZE);
-                if (!buf) {
-                        rc = -ENOMEM;
-                        GOTO(cleanup_bulk, rc);
-                }
+        bulk->b_xid = req->rq_reqmsg->xid;
 
-                set_fs(KERNEL_DS);
-                rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
-                                     &offset);
-                set_fs(oldfs);
+        OBD_ALLOC(buf, PAGE_SIZE);
+        if (!buf) {
+                rc = -ENOMEM;
+                GOTO(cleanup_bulk, rc);
+        }
 
-                if (rc != PAGE_SIZE) {
-                        rc = -EIO;
-                        GOTO(cleanup_buf, rc);
-                }
+        set_fs(KERNEL_DS);
+        rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
+                             &offset);
+        set_fs(oldfs);
 
-                bulk->b_buf = buf;
-                bulk->b_buflen = PAGE_SIZE;
+        if (rc != PAGE_SIZE) {
+                rc = -EIO;
+                GOTO(cleanup_buf, rc);
+        }
 
-                rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
-                wait_event_interruptible(bulk->b_waitq,
-                                         ptlrpc_check_bulk_sent(bulk));
+        bulk->b_buf = buf;
+        bulk->b_buflen = PAGE_SIZE;
 
-                if (bulk->b_flags == PTL_RPC_INTR) {
-                        rc = -EINTR;
-                        GOTO(cleanup_buf, rc);
-                }
+        rc = ptlrpc_send_bulk(bulk, MDS_BULK_PORTAL);
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
+                CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
+                       OBD_FAIL_MDS_SENDPAGE, rc);
+                PtlMDUnlink(bulk->b_md_h);
+                GOTO(cleanup_buf, rc);
+        }
+        wait_event_interruptible(bulk->b_waitq,
+                                 ptlrpc_check_bulk_sent(bulk));
 
-                EXIT;
-        cleanup_buf:
-                OBD_FREE(buf, PAGE_SIZE);
-        cleanup_bulk:
-                OBD_FREE(bulk, sizeof(*bulk));
+        if (bulk->b_flags == PTL_RPC_INTR) {
+                rc = -EINTR;
+                GOTO(cleanup_buf, rc);
         }
-out:
+
+        EXIT;
+ cleanup_buf:
+        OBD_FREE(buf, PAGE_SIZE);
+ cleanup_bulk:
+        OBD_FREE(bulk, sizeof(*bulk));
+ out:
         return rc;
 }
 
@@ -172,113 +159,108 @@ int mds_getattr(struct ptlrpc_request *req)
 {
         struct dentry *de;
         struct inode *inode;
-        struct mds_rep *rep;
+        struct mds_body *body;
         struct mds_obd *mds = &req->rq_obd->u.mds;
-        int rc;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                          &req->rq_replen, &req->rq_repbuf);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
                 CERROR("mds: out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
 
-        req->rq_rephdr->xid = req->rq_reqhdr->xid;
-        rep = req->rq_rep.mds;
-
-        de = mds_fid2dentry(mds, &req->rq_req.mds->fid1, NULL);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(mds, &body->fid1, NULL);
         if (IS_ERR(de)) {
-                req->rq_rephdr->status = -ENOENT;
+                req->rq_status = -ENOENT;
                 RETURN(0);
         }
 
+        body = lustre_msg_buf(req->rq_repmsg, 0);
         inode = de->d_inode;
-        rep->ino = inode->i_ino;
-        rep->generation = inode->i_generation;
-        rep->atime = inode->i_atime;
-        rep->ctime = inode->i_ctime;
-        rep->mtime = inode->i_mtime;
-        rep->uid = inode->i_uid;
-        rep->gid = inode->i_gid;
-        rep->size = inode->i_size;
-        rep->mode = inode->i_mode;
-        rep->nlink = inode->i_nlink;
-        rep->valid = ~0;
-        mds_fs_get_objid(mds, inode, &rep->objid);
-        dput(de);
-        return 0;
+        body->ino = inode->i_ino;
+        body->generation = inode->i_generation;
+        body->atime = inode->i_atime;
+        body->ctime = inode->i_ctime;
+        body->mtime = inode->i_mtime;
+        body->uid = inode->i_uid;
+        body->gid = inode->i_gid;
+        body->size = inode->i_size;
+        body->mode = inode->i_mode;
+        body->nlink = inode->i_nlink;
+        body->valid = ~0;
+        mds_fs_get_objid(mds, inode, &body->objid);
+        l_dput(de);
+        RETURN(0);
 }
 
 int mds_open(struct ptlrpc_request *req)
 {
         struct dentry *de;
-        struct mds_rep *rep;
+        struct mds_body *body;
         struct file *file;
         struct vfsmount *mnt;
         __u32 flags;
-        int rc;
+        int rc, size = sizeof(*body);
+        ENTRY;
 
-        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                          &req->rq_replen, &req->rq_repbuf);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
                 CERROR("mds: out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
 
-        req->rq_rephdr->xid = req->rq_reqhdr->xid;
-        rep = req->rq_rep.mds;
-
-        de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
         if (IS_ERR(de)) {
-                req->rq_rephdr->status = -ENOENT;
+                req->rq_status = -ENOENT;
                 RETURN(0);
         }
-        flags = req->rq_req.mds->flags;
+        flags = body->flags;
         file = dentry_open(de, mnt, flags);
         if (!file || IS_ERR(file)) {
-                req->rq_rephdr->status = -EINVAL;
+                req->rq_status = -EINVAL;
                 RETURN(0);
         }
 
-        rep->objid = (__u64) (unsigned long)file;
-        return 0;
+        body = lustre_msg_buf(req->rq_repmsg, 0);
+        body->objid = (__u64) (unsigned long)file;
+        RETURN(0);
 }
 
 int mds_close(struct ptlrpc_request *req)
 {
         struct dentry *de;
-        struct mds_rep *rep;
+        struct mds_body *body;
         struct file *file;
         struct vfsmount *mnt;
         int rc;
+        ENTRY;
 
-        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                          &req->rq_replen, &req->rq_repbuf);
+        rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
                 CERROR("mds: out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
 
-        req->rq_rephdr->xid = req->rq_reqhdr->xid;
-        rep = req->rq_rep.mds;
-
-        de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
         if (IS_ERR(de)) {
-                req->rq_rephdr->status = -ENOENT;
+                req->rq_status = -ENOENT;
                 RETURN(0);
         }
 
-        file = (struct file *)(unsigned long) req->rq_req.mds->objid;
-
-        req->rq_rephdr->status = filp_close(file, 0);
-        dput(de);
+        file = (struct file *)(unsigned long)body->objid;
+        req->rq_status = filp_close(file, 0);
+        l_dput(de);
         mntput(mnt);
-        return 0;
-}
 
+        RETURN(0);
+}
 
 int mds_readpage(struct ptlrpc_request *req)
 {
@@ -286,25 +268,21 @@ int mds_readpage(struct ptlrpc_request *req)
         struct dentry *de;
         struct file *file;
         struct niobuf *niobuf;
-        struct mds_rep *rep;
-        int rc;
-
+        struct mds_body *body;
+        int rc, size = sizeof(*body);
         ENTRY;
 
-        rc = mds_pack_rep(NULL, 0, NULL, 0, &req->rq_rephdr, &req->rq_rep,
-                          &req->rq_replen, &req->rq_repbuf);
+        rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
                 CERROR("mds: out of memory\n");
                 req->rq_status = -ENOMEM;
                 RETURN(0);
         }
 
-        req->rq_rephdr->xid = req->rq_reqhdr->xid;
-        rep = req->rq_rep.mds;
-
-        de = mds_fid2dentry(&req->rq_obd->u.mds, &req->rq_req.mds->fid1, &mnt);
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        de = mds_fid2dentry(&req->rq_obd->u.mds, &body->fid1, &mnt);
         if (IS_ERR(de)) {
-                req->rq_rephdr->status = PTR_ERR(de);
+                req->rq_status = PTR_ERR(de);
                 RETURN(0);
         }
 
@@ -313,32 +291,33 @@ int mds_readpage(struct ptlrpc_request *req)
         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
         /* note: in case of an error, dentry_open puts dentry */
         if (IS_ERR(file)) {
-                req->rq_rephdr->status = PTR_ERR(file);
+                req->rq_status = PTR_ERR(file);
                 RETURN(0);
         }
 
-        niobuf = mds_req_tgt(req->rq_req.mds);
+        niobuf = lustre_msg_buf(req->rq_reqmsg, 1);
+        if (!niobuf) {
+                req->rq_status = -EINVAL;
+                LBUG();
+                RETURN(0);
+        }
 
         /* to make this asynchronous make sure that the handling function
            doesn't send a reply when this function completes. Instead a
            callback function would send the reply */
-        rc = mds_sendpage(req, file, req->rq_req.mds->size, niobuf);
+        rc = mds_sendpage(req, file, body->size, niobuf);
 
         filp_close(file, 0);
-        req->rq_rephdr->status = rc;
+        req->rq_status = rc;
         RETURN(0);
 }
 
 int mds_reint(struct ptlrpc_request *req)
 {
-        char *buf;
-        int rc, len;
+        int rc;
         struct mds_update_record rec;
 
-        buf = mds_req_tgt(req->rq_req.mds);
-        len = req->rq_req.mds->tgtlen;
-
-        rc = mds_update_unpack(buf, len, &rec);
+        rc = mds_update_unpack(req, &rec);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
                 CERROR("invalid record\n");
                 req->rq_status = -EINVAL;
@@ -353,28 +332,21 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                struct ptlrpc_request *req)
 {
         int rc;
-        struct ptlreq_hdr *hdr;
-
         ENTRY;
 
-        hdr = (struct ptlreq_hdr *)req->rq_reqbuf;
-
-        if (NTOH__u32(hdr->type) != PTL_RPC_REQUEST) {
-                CERROR("lustre_mds: wrong packet type sent %d\n",
-                       NTOH__u32(hdr->type));
-                rc = -EINVAL;
-                GOTO(out, rc);
-        }
-
-        rc = mds_unpack_req(req->rq_reqbuf, req->rq_reqlen,
-                            &req->rq_reqhdr, &req->rq_req);
+        rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
                 CERROR("lustre_mds: Invalid request\n");
                 GOTO(out, rc);
         }
 
-        switch (req->rq_reqhdr->opc) {
+        if (req->rq_reqmsg->type != PTL_RPC_REQUEST) {
+                CERROR("lustre_mds: wrong packet type sent %d\n",
+                       req->rq_reqmsg->type);
+                GOTO(out, rc = -EINVAL);
+        }
 
+        switch (req->rq_reqmsg->opc) {
         case MDS_GETATTR:
                 CDEBUG(D_INODE, "getattr\n");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
@@ -385,6 +357,9 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 CDEBUG(D_INODE, "readpage\n");
                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
                 rc = mds_readpage(req);
+
+                if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
+                        return 0;
                 break;
 
         case MDS_REINT:
@@ -406,74 +381,37 @@ int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
                 break;
 
         default:
-                rc = ptlrpc_error(dev, svc, req);
+                rc = ptlrpc_error(svc, req);
                 RETURN(rc);
         }
 
         EXIT;
 out:
         if (rc) {
-                CERROR("no header\n");
-                LBUG();
-                return 0;
-        }
-
-        if( req->rq_status) {
-                ptlrpc_error(dev, svc, req);
+                ptlrpc_error(svc, req);
         } else {
                 CDEBUG(D_NET, "sending reply\n");
-                ptlrpc_reply(dev, svc, req);
+                ptlrpc_reply(svc, req);
         }
 
         return 0;
 }
 
-
-/* mount the file system (secretly) */
-static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int mds_prep(struct obd_device *obddev)
 {
-        struct obd_ioctl_data* data = buf;
+        struct obd_run_ctxt saved;
         struct mds_obd *mds = &obddev->u.mds;
         struct super_operations *s_ops;
-        struct super_block *sb;
-        struct vfsmount *mnt;
+        struct file *f;
         int err;
-        ENTRY;
-
-        MOD_INC_USE_COUNT;
-        mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
-        err = PTR_ERR(mnt);
-        if (IS_ERR(mnt)) {
-                CERROR("do_kern_mount failed: %d\n", err);
-                GOTO(err_dec, err);
-        }
-
-        sb = mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
-        if (!sb)
-                GOTO(err_put, (err = -ENODEV));
-
-        mds->mds_vfsmnt = mnt;
-        mds->mds_fstype = strdup(data->ioc_inlbuf2);
-
-        if (!strcmp(mds->mds_fstype, "ext3"))
-                mds->mds_fsops = &mds_ext3_fs_ops;
-        else if (!strcmp(mds->mds_fstype, "ext2"))
-                mds->mds_fsops = &mds_ext2_fs_ops;
-        else {
-                CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
-                GOTO(err_kfree, (err = -EPERM));
-        }
-
-        mds->mds_ctxt.pwdmnt = mnt;
-        mds->mds_ctxt.pwd = mnt->mnt_root;
-        mds->mds_ctxt.fs = KERNEL_DS;
 
         mds->mds_service = ptlrpc_init_svc(128 * 1024,
                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
                                            "self", mds_handle);
+
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
-                GOTO(err_kfree, (err = -EINVAL));
+                RETURN(-EINVAL);
         }
 
         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
@@ -482,6 +420,25 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_svc, err);
         }
 
+        push_ctxt(&saved, &mds->mds_ctxt);
+        err = simple_mkdir(current->fs->pwd, "ROOT", 0700);
+        if (err && err != -EEXIST) {
+                CERROR("cannot create ROOT directory\n");
+                GOTO(err_svc, err);
+        }
+        err = simple_mkdir(current->fs->pwd, "FH", 0700);
+        if (err && err != -EEXIST) {
+                CERROR("cannot create FH directory\n");
+                GOTO(err_svc, err);
+        }
+        f = filp_open("last_rcvd", O_RDWR | O_CREAT, 0644);
+        if (IS_ERR(f)) {
+                CERROR("cannot open/create last_rcvd file\n");
+                GOTO(err_svc, err = PTR_ERR(f));
+        }
+        mds->last_rcvd = f;
+        pop_ctxt(&saved);
+
         /*
          * Replace the client filesystem delete_inode method with our own,
          * so that we can clear the object ID before the inode is deleted.
@@ -494,25 +451,74 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
          * type, as we don't have access to the mds struct in * delete_inode.
          */
         OBD_ALLOC(s_ops, sizeof(*s_ops));
-        memcpy(s_ops, sb->s_op, sizeof(*s_ops));
+        memcpy(s_ops, mds->mds_sb->s_op, sizeof(*s_ops));
         mds->mds_fsops->cl_delete_inode = s_ops->delete_inode;
         s_ops->delete_inode = mds->mds_fsops->fs_delete_inode;
-        sb->s_op = s_ops;
+        mds->mds_sb->s_op = s_ops;
 
         RETURN(0);
 
 err_svc:
         rpc_unregister_service(mds->mds_service);
         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
-err_kfree:
-        kfree(mds->mds_fstype);
+
+        return(err);
+}
+
+/* mount the file system (secretly) */
+static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
+{
+        struct obd_ioctl_data* data = buf;
+        struct mds_obd *mds = &obddev->u.mds;
+        struct vfsmount *mnt;
+        int err = 0;
+        ENTRY;
+
+#ifdef CONFIG_DEV_RDONLY
+        dev_clear_rdonly(2);
+#endif
+        mds->mds_fstype = strdup(data->ioc_inlbuf2);
+
+        if (!strcmp(mds->mds_fstype, "ext3"))
+                mds->mds_fsops = &mds_ext3_fs_ops;
+        else if (!strcmp(mds->mds_fstype, "ext2"))
+                mds->mds_fsops = &mds_ext2_fs_ops;
+        else {
+                CERROR("unsupported MDS filesystem type %s\n", mds->mds_fstype);
+                GOTO(err_kfree, (err = -EPERM));
+        }
+
+        MOD_INC_USE_COUNT;
+        mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
+        if (IS_ERR(mnt)) {
+                CERROR("do_kern_mount failed: %d\n", err);
+                GOTO(err_dec, err = PTR_ERR(mnt));
+        }
+
+        mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
+        if (!mds->mds_sb)
+                GOTO(err_put, (err = -ENODEV));
+
+        mds->mds_vfsmnt = mnt;
+        mds->mds_ctxt.pwdmnt = mnt;
+        mds->mds_ctxt.pwd = mnt->mnt_root;
+        mds->mds_ctxt.fs = KERNEL_DS;
+
+        err = mds_prep(obddev);
+        if (err)
+                GOTO(err_put, err);
+
+        RETURN(0);
+
 err_put:
-        unlock_kernel(); // XXX do we want/need this?
+        unlock_kernel();
         mntput(mds->mds_vfsmnt);
         mds->mds_sb = 0;
-        lock_kernel();   // XXX do we want/need this?
+        lock_kernel();
 err_dec:
         MOD_DEC_USE_COUNT;
+err_kfree:
+        kfree(mds->mds_fstype);
         return err;
 }
 
@@ -530,19 +536,24 @@ static int mds_cleanup(struct obd_device * obddev)
         }
 
         ptlrpc_stop_thread(mds->mds_service);
-
+        rpc_unregister_service(mds->mds_service);
         if (!list_empty(&mds->mds_service->srv_reqs)) {
                 // XXX reply with errors and clean up
                 CERROR("Request list not empty!\n");
         }
-
-        rpc_unregister_service(mds->mds_service);
         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
 
         sb = mds->mds_sb;
         if (!mds->mds_sb)
                 RETURN(0);
 
+        if (mds->last_rcvd) {
+                int rc = filp_close(mds->last_rcvd, 0);
+                mds->last_rcvd = NULL;
+
+                if (rc)
+                        CERROR("last_rcvd file won't close, rc=%d\n", rc);
+        }
         s_ops = sb->s_op;
 
         unlock_kernel();
@@ -550,7 +561,9 @@ static int mds_cleanup(struct obd_device * obddev)
         mds->mds_sb = 0;
         kfree(mds->mds_fstype);
         lock_kernel();
-
+#ifdef CONFIG_DEV_RDONLY
+        dev_clear_rdonly(2);
+#endif
         OBD_FREE(s_ops, sizeof(*s_ops));
 
         MOD_DEC_USE_COUNT;