+void reconstruct_open(struct mds_update_record *rec, struct ptlrpc_request *req,
+ struct lustre_handle *child_lockh)
+{
+ struct mds_export_data *med = &req->rq_export->exp_mds_data;
+ struct mds_client_data *mcd = med->med_mcd;
+ struct mds_obd *mds = mds_req2mds(req);
+ struct mds_file_data *mfd;
+ struct obd_device *obd = req->rq_export->exp_obd;
+ struct dentry *parent, *child;
+ struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
+ struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
+ int disp, rc;
+ ENTRY;
+
+ ENTRY;
+
+ /* copy rc, transno and disp; steal locks */
+ req->rq_transno = mcd->mcd_last_transno;
+ req->rq_status = mcd->mcd_last_result;
+ disp = rep->lock_policy_res1 = mcd->mcd_last_data;
+
+ if (med->med_outstanding_reply)
+ mds_steal_ack_locks(med, req);
+
+ /* We never care about these. */
+ disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
+ if (!disp) {
+ EXIT;
+ return; /* error looking up parent or child */
+ }
+
+ parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
+ LASSERT(!IS_ERR(parent));
+
+ child = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
+ parent, req->rq_reqmsg->buflens[3] - 1);
+ LASSERT(!IS_ERR(child));
+
+ if (!child->d_inode) {
+ GOTO(out_dput, 0); /* child not present to open */
+ }
+
+ /* At this point, we know we have a child, which means that we'll send
+ * it back _unless_ it was open failed, _and_ we didn't create the file.
+ * I love you guys. No, really.
+ */
+ if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
+ req->rq_status) {
+ GOTO(out_dput, 0);
+ }
+
+ if (!med->med_outstanding_reply) {
+ LBUG(); /* XXX need to get enqueue client lock */
+ }
+
+ /* get lock (write for O_CREAT, read otherwise) */
+
+ mds_pack_inode2fid(&body->fid1, child->d_inode);
+ mds_pack_inode2body(body, child->d_inode);
+ if (S_ISREG(child->d_inode->i_mode)) {
+ rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
+ child->d_inode);
+ if (rc)
+ LASSERT(rc == req->rq_status);
+ } else {
+ /* XXX need to check this case */
+ }
+
+ /* If we're opening a file without an EA, change to a write
+ lock (unless we already have one). */
+
+ /* If we have -EEXIST as the status, and we were asked to create
+ * exclusively, we can tell we failed because the file already existed.
+ */
+ if (req->rq_status == -EEXIST &&
+ ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
+ GOTO(out_dput, 0);
+ }
+
+ /* If we didn't get as far as trying to open, then some locking thing
+ * probably went wrong, and we'll just bail here.
+ */
+ if ((disp & IT_OPEN_OPEN) == 0) {
+ GOTO(out_dput, 0);
+ }
+
+ /* If we failed, then we must have failed opening, so don't look for
+ * file descriptor or anything, just give the client the bad news.
+ */
+ if (req->rq_status) {
+ GOTO(out_dput, 0);
+ }
+
+ if (med->med_outstanding_reply) {
+ struct list_head *t;
+ mfd = NULL;
+ /* XXX can we just look in the old reply to find the handle in
+ * XXX O(1) here? */
+ list_for_each(t, &med->med_open_head) {
+ mfd = list_entry(t, struct mds_file_data, mfd_list);
+ if (mfd->mfd_xid == req->rq_xid)
+ break;
+ mfd = NULL;
+ }
+ /* if we're not recovering, it had better be found */
+ LASSERT(mfd);
+ } else {
+ struct file *file;
+ mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
+ if (!mfd) {
+ CERROR("mds: out of memory\n");
+ GOTO(out_dput, req->rq_status = -ENOMEM);
+ }
+ mntget(mds->mds_vfsmnt);
+ file = dentry_open(child, mds->mds_vfsmnt,
+ rec->ur_flags & ~(O_DIRECT | O_TRUNC));
+ LASSERT(!IS_ERR(file)); /* XXX -ENOMEM? */
+ file->private_data = mfd;
+ mfd->mfd_file = file;
+ mfd->mfd_xid = req->rq_xid;
+ get_random_bytes(&mfd->mfd_servercookie,
+ sizeof(mfd->mfd_servercookie));
+ spin_lock(&med->med_open_lock);
+ list_add(&mfd->mfd_list, &med->med_open_head);
+ spin_unlock(&med->med_open_lock);
+ }
+
+ body->handle.addr = (__u64)(unsigned long)mfd;
+ body->handle.cookie = mfd->mfd_servercookie;
+
+ out_dput:
+ l_dput(child);
+ l_dput(parent);
+ EXIT;
+}
+