Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / mds / mds_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001, 2002 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/locks.h>
39 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
40 #include <linux/buffer_head.h>
41 #include <linux/workqueue.h>
42 #endif
43 #include <linux/obd_lov.h>
44 #include <linux/lustre_mds.h>
45 #include <linux/lustre_fsfilt.h>
46 #include <linux/lprocfs_status.h>
47
48 extern kmem_cache_t *mds_file_cache;
49 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
50 extern void mds_start_transno(struct mds_obd *mds);
51 extern int mds_finish_transno(struct mds_obd *mds, void *handle,
52                               struct ptlrpc_request *req, int rc);
53 extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
54                                  struct ldlm_res_id *p1_res_id,
55                                  struct ldlm_res_id *p2_res_id,
56                                  struct ldlm_res_id *c1_res_id,
57                                  struct ldlm_res_id *c2_res_id,
58                                  struct lustre_handle *p1_lockh,
59                                  struct lustre_handle *p2_lockh,
60                                  struct lustre_handle *c1_lockh,
61                                  struct lustre_handle *c2_lockh);
62
63 int mds_open(struct mds_update_record *rec, int offset,
64              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
65 {
66         struct mds_obd *mds = mds_req2mds(req);
67         struct obd_device *obd = req->rq_export->exp_obd;
68         struct ldlm_reply *rep = lustre_msg_buf(req->rq_repmsg, 0);
69         struct file *file;
70         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, 1);
71         struct dentry *dchild, *parent;
72         struct mds_export_data *med;
73         struct mds_file_data *mfd = NULL;
74         struct ldlm_res_id child_res_id = { .name = {0} };
75         struct lustre_handle parent_lockh;
76         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
77         ENTRY;
78
79 #warning replay of open needs to be redone
80         /* was this animal open already and the client lost the reply? */
81         /* XXX need some way to detect a reopen, to avoid locked list walks */
82         med = &req->rq_export->exp_mds_data;
83 #if 0
84         spin_lock(&med->med_open_lock);
85         list_for_each(tmp, &med->med_open_head) {
86                 mfd = list_entry(tmp, typeof(*mfd), mfd_list);
87                 if (!memcmp(&mfd->mfd_clienthandle, &body->handle,
88                             sizeof(mfd->mfd_clienthandle)) &&
89                     body->fid1.id == mfd->mfd_file->f_dentry->d_inode->i_ino) {
90                         dchild = mfd->mfd_file->f_dentry;
91                         spin_unlock(&med->med_open_lock);
92                         CERROR("Re opening "LPD64"\n", body->fid1.id);
93                         GOTO(out_pack, rc = 0);
94                 }
95         }
96         spin_unlock(&med->med_open_lock);
97 #endif
98         rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
99         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
100                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
101                 req->rq_status = -ENOMEM;
102                 RETURN(-ENOMEM);
103         }
104
105         /* Step 1: Find and lock the parent */
106         parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
107         parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
108                                        &parent_lockh);
109         if (IS_ERR(parent)) {
110                 rc = PTR_ERR(parent);
111                 CERROR("parent lookup error %d\n", rc);
112                 LBUG();
113                 RETURN(rc);
114         }
115         LASSERT(parent->d_inode);
116
117         /* Step 2: Lookup the child */
118         dchild = lookup_one_len(lustre_msg_buf(req->rq_reqmsg, 3),
119                                 parent, req->rq_reqmsg->buflens[3] - 1);
120         if (IS_ERR(dchild))
121                 GOTO(out_step_2, rc = PTR_ERR(dchild));
122
123         if (dchild->d_inode)
124                 rep->lock_policy_res1 |= IT_OPEN_POS;
125         else
126                 rep->lock_policy_res1 |= IT_OPEN_NEG;
127
128         /* Step 3: If the child was negative, and we're supposed to,
129          * create it. */
130         if ((rec->ur_flags & O_CREAT) && !dchild->d_inode) {
131                 int err;
132                 void *handle;
133                 mds_start_transno(mds);
134                 rep->lock_policy_res1 |= IT_OPEN_CREATE;
135                 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
136                 if (IS_ERR(handle)) {
137                         rc = PTR_ERR(handle);
138                         mds_finish_transno(mds, handle, req, rc);
139                         GOTO(out_step_3, rc);
140                 }
141                 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
142                 rc = mds_finish_transno(mds, handle, req, rc);
143                 err = fsfilt_commit(obd, parent->d_inode, handle);
144                 if (rc || err) {
145                         CERROR("error on commit: err = %d\n", err);
146                         if (!rc)
147                                 rc = err;
148                         GOTO(out_step_3, rc);
149                 }
150                 created = 1;
151                 child_mode = LCK_PW;
152         } else if (!dchild->d_inode) {
153                 /* It's negative and we weren't supposed to create it */
154                 GOTO(out_step_3, rc = -ENOENT);
155         }
156
157         /* Step 4: It's positive, so lock the child */
158         child_res_id.name[0] = dchild->d_inode->i_ino;
159         child_res_id.name[1] = dchild->d_inode->i_generation;
160  reacquire:
161         lock_flags = 0;
162         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
163                               child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
164                               &lock_flags, ldlm_completion_ast,
165                               mds_blocking_ast, NULL, NULL, child_lockh);
166         if (rc != ELDLM_OK) {
167                 CERROR("ldlm_cli_enqueue: %d\n", rc);
168                 GOTO(out_step_3, rc = -EIO);
169         }
170
171         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
172         mds_pack_inode2body(body, dchild->d_inode);
173         if (S_ISREG(dchild->d_inode->i_mode)) {
174                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
175                 if (rc)
176                         GOTO(out_step_4, rc);
177         } else {
178                 /* If this isn't a regular file, we can't open it. */
179                 GOTO(out_step_3, rc = 0); /* returns the lock to the client */
180         }
181
182         if (!created && (rec->ur_flags & O_CREAT) && (rec->ur_flags & O_EXCL)) {
183                 /* File already exists, we didn't just create it, and we
184                  * were passed O_EXCL; err-or. */
185                 GOTO(out_step_3, rc = -EEXIST); // returns a lock to the client
186         }
187
188         /* If we're opening a file without an EA, the client needs a write
189          * lock. */
190         if (child_mode != LCK_PW && S_ISREG(dchild->d_inode->i_mode) &&
191             !(body->valid & OBD_MD_FLEASIZE)) {
192                 ldlm_lock_decref(child_lockh, child_mode);
193                 child_mode = LCK_PW;
194                 goto reacquire;
195         }
196
197         /* Step 5: Open it */
198         rep->lock_policy_res1 |= IT_OPEN_OPEN;
199         mfd = kmem_cache_alloc(mds_file_cache, GFP_KERNEL);
200         if (!mfd) {
201                 CERROR("mds: out of memory\n");
202                 GOTO(out_step_4, req->rq_status = -ENOMEM);
203         }
204
205         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
206         mntget(mds->mds_vfsmnt);
207         file = dentry_open(dchild,mds->mds_vfsmnt,
208                            rec->ur_flags & ~(O_DIRECT | O_TRUNC));
209         if (IS_ERR(file))
210                 GOTO(out_step_5, rc = PTR_ERR(file));
211
212         file->private_data = mfd;
213         mfd->mfd_file = file;
214         get_random_bytes(&mfd->mfd_servercookie, sizeof(mfd->mfd_servercookie));
215         spin_lock(&med->med_open_lock);
216         list_add(&mfd->mfd_list, &med->med_open_head);
217         spin_unlock(&med->med_open_lock);
218
219         body->handle.addr = (__u64)(unsigned long)mfd;
220         body->handle.cookie = mfd->mfd_servercookie;
221         CDEBUG(D_INODE, "file %p: mfd %p, cookie "LPX64"\n",
222                mfd->mfd_file, mfd, mfd->mfd_servercookie);
223         GOTO(out_step_2, rc = 0); /* returns a lock to the client */
224
225  out_step_5:
226         if (mfd != NULL) {
227                 kmem_cache_free(mds_file_cache, mfd);
228                 mfd = NULL;
229         }
230  out_step_4:
231         ldlm_lock_decref(child_lockh, child_mode);
232  out_step_3:
233         l_dput(dchild);
234  out_step_2:
235         l_dput(parent);
236         ldlm_lock_decref(&parent_lockh, parent_mode);
237         RETURN(rc);
238 }