+
+/* Close a "file descriptor" and possibly unlink an orphan from the
+ * PENDING directory.
+ *
+ * If we are being called from mds_disconnect() because the client has
+ * disappeared, then req == NULL and we do not update last_rcvd because
+ * there is nothing that could be recovered by the client at this stage
+ * (it will not even _have_ an entry in last_rcvd anymore).
+ *
+ * Returns EAGAIN if the client needs to get more data and re-close. */
+int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
+ struct mds_file_data *mfd, int unlink_orphan)
+{
+ struct inode *inode = mfd->mfd_dentry->d_inode;
+ char fidname[LL_FID_NAMELEN];
+ int last_orphan, fidlen, rc = 0, cleanup_phase = 0;
+ struct dentry *pending_child = NULL;
+ struct mds_obd *mds = &obd->u.mds;
+ struct inode *pending_dir = mds->mds_pending_dir->d_inode;
+ void *handle = NULL;
+ struct mds_body *request_body, *reply_body;
+ struct dentry_params dp;
+ ENTRY;
+
+ if (req != NULL) {
+ request_body = lustre_msg_buf(req->rq_reqmsg, 0,
+ sizeof(*request_body));
+ reply_body = lustre_msg_buf(req->rq_repmsg, 0,
+ sizeof(*reply_body));
+ }
+
+ fidlen = ll_fid2str(fidname, inode->i_ino, inode->i_generation);
+
+ last_orphan = mds_open_orphan_dec_test(inode) &&
+ mds_inode_is_orphan(inode);
+
+ /* this is the actual "close" */
+ if (mfd->mfd_mode & FMODE_WRITE) {
+ mds_put_write_access(inode);
+ } else if (mfd->mfd_mode & FMODE_EXEC) {
+ mds_allow_write_access(inode);
+ }
+
+ if (last_orphan && unlink_orphan) {
+ LASSERT(mds_query_write_access(inode) == 0);
+ if (mfd->mfd_mode & FMODE_WRITE)
+ rc = mds_close_io_epoch(mds, inode, request_body, 1);
+
+ CWARN("destroying orphan object %s\n", fidname);
+
+ /* Sadly, there is no easy way to save pending_child from
+ * mds_reint_unlink() into mfd, so we need to re-lookup,
+ * but normally it will still be in the dcache. */
+ pending_dir = mds->mds_pending_dir->d_inode;
+ down(&pending_dir->i_sem);
+ cleanup_phase = 1; /* up(i_sem) when finished */
+ pending_child = lookup_one_len(fidname, mds->mds_pending_dir,
+ fidlen);
+ if (IS_ERR(pending_child))
+ GOTO(cleanup, rc = PTR_ERR(pending_child));
+ LASSERT(pending_child->d_inode != NULL);
+
+ cleanup_phase = 2; /* dput(pending_child) when finished */
+ handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK_LOG,
+ NULL);
+ if (IS_ERR(handle)) {
+ rc = PTR_ERR(handle);
+ handle = NULL;
+ GOTO(cleanup, rc);
+ }
+
+#ifdef ENABLE_ORPHANS
+ if (req != NULL &&
+ (reply_body->valid & OBD_MD_FLEASIZE) &&
+ mds_log_op_unlink(obd, pending_child->d_inode,
+ req->rq_repmsg, 1) > 0) {
+ reply_body->valid |= OBD_MD_FLCOOKIE;
+ }
+#endif
+ pending_child->d_fsdata = (void *) &dp;
+ dp.p_inum = 0;
+ dp.p_ptr = req;
+ if (S_ISDIR(pending_child->d_inode->i_mode))
+ rc = vfs_rmdir(pending_dir, pending_child);
+ else
+ rc = vfs_unlink(pending_dir, pending_child);
+ if (rc)
+ CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
+ } else if (mfd->mfd_mode & FMODE_WRITE &&
+ mds_query_write_access(inode) == 0) {
+ // XXX this should probably be abstracted with mds_reint_setattr
+ struct iattr iattr;
+
+ rc = mds_close_io_epoch(mds, inode, request_body, 0);
+ if (rc == EAGAIN)
+ goto dput;
+
+#if 0
+ /* XXX can't set block count with fsfilt_setattr (!) */
+ iattr.ia_valid = ATTR_CTIME | ATTR_ATIME |
+ ATTR_MTIME | ATTR_SIZE;
+ iattr.ia_atime = request_body->atime;
+ iattr.ia_ctime = request_body->ctime;
+ iattr.ia_mtime = request_body->mtime;
+ iattr.ia_size = request_body->size;
+ /* iattr.ia_blocks = request_body->blocks */
+
+ handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR, NULL);
+ if (IS_ERR(handle))
+ GOTO(cleanup, rc = PTR_ERR(handle));
+ rc = fsfilt_setattr(obd, mfd->mfd_dentry, handle, &iattr, 0);
+ if (rc)
+ CERROR("error in setattr(%s): rc %d\n", fidname, rc);
+#endif
+ }
+ dput:
+ l_dput(mfd->mfd_dentry);
+ mds_mfd_destroy(mfd);
+
+ cleanup:
+ if (req) {
+ rc = mds_finish_transno(mds, pending_dir, handle, req, rc, 0);
+ } else if (handle) {
+ int err = fsfilt_commit(obd, pending_dir, handle, 0);
+ if (err) {
+ CERROR("error committing close: %d\n", err);
+ if (!rc)
+ rc = err;
+ }
+ }
+
+ switch (cleanup_phase) {
+ case 2:
+ dput(pending_child);
+ case 1:
+ up(&pending_dir->i_sem);
+ }
+ RETURN(rc);
+}
+
+int mds_close(struct ptlrpc_request *req)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct obd_device *obd = req->rq_export->exp_obd;
+ struct mds_body *body;
+ struct mds_file_data *mfd;
+ struct obd_run_ctxt saved;
+ struct inode *inode;
+ int rc, repsize[3] = {sizeof(struct mds_body),
+ obd->u.mds.mds_max_mdsize,
+ obd->u.mds.mds_max_cookiesize};
+ ENTRY;
+
+ MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+ if (body == NULL) {
+ CERROR("Can't unpack body\n");
+ req->rq_status = -EFAULT;
+ RETURN(-EFAULT);
+ }
+
+ if (body->flags & MDS_BFLAG_UNCOMMITTED_WRITES)
+ /* do some stuff */ ;
+
+ mfd = mds_handle2mfd(&body->handle);
+ if (mfd == NULL) {
+ DEBUG_REQ(D_ERROR, req, "no handle for file close ino "LPD64
+ ": cookie "LPX64, body->fid1.id, body->handle.cookie);
+ req->rq_status = -ESTALE;
+ RETURN(-ESTALE);
+ }
+
+ rc = lustre_pack_reply(req, 3, repsize, NULL);
+ if (rc) {
+ CERROR("lustre_pack_reply: rc = %d\n", rc);
+ req->rq_status = rc;
+ }
+
+ inode = mfd->mfd_dentry->d_inode;
+ if (mds_inode_is_orphan(inode) && mds_open_orphan_count(inode) == 1) {
+ body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
+ LASSERT(body != NULL);
+
+ mds_pack_inode2fid(&body->fid1, inode);
+ mds_pack_inode2body(body, inode);
+ mds_pack_md(obd, req->rq_repmsg, 1, body, inode, 1);
+ }
+ spin_lock(&med->med_open_lock);
+ list_del(&mfd->mfd_list);
+ spin_unlock(&med->med_open_lock);
+
+ push_ctxt(&saved, &obd->obd_ctxt, NULL);
+ req->rq_status = mds_mfd_close(rc ? NULL : req, obd, mfd, 1);
+ pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
+ CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
+ req->rq_status = -ENOMEM;
+ mds_mfd_put(mfd);
+ RETURN(-ENOMEM);
+ }
+
+ mds_mfd_put(mfd);
+ RETURN(0);
+}
+
+int mds_done_writing(struct ptlrpc_request *req)
+{
+ struct mds_body *body;
+ int rc, size = sizeof(struct mds_body);
+ ENTRY;
+
+ MDS_CHECK_RESENT(req, mds_reconstruct_generic(req));
+
+ body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
+ if (body == NULL) {
+ CERROR("Can't unpack body\n");
+ req->rq_status = -EFAULT;
+ RETURN(-EFAULT);
+ }
+
+ rc = lustre_pack_reply(req, 1, &size, NULL);
+ if (rc) {
+ CERROR("lustre_pack_reply: rc = %d\n", rc);
+ req->rq_status = rc;
+ }
+
+ RETURN(0);
+}