Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / mds / mds_open.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Cluster File Systems, Inc.
5  *   Author: Peter Braam <braam@clusterfs.com>
6  *   Author: Andreas Dilger <adilger@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Mike Shaver <shaver@clusterfs.com>
9  *
10  *   This file is part of Lustre, http://www.lustre.org.
11  *
12  *   Lustre is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Lustre is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Lustre; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #define EXPORT_SYMTAB
27 #define DEBUG_SUBSYSTEM S_MDS
28
29 #include <linux/module.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/random.h>
35 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
36 # include <linux/buffer_head.h>
37 # include <linux/workqueue.h>
38 #else
39 # include <linux/locks.h>
40 #endif
41 #include <linux/obd_lov.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/lustre_fsfilt.h>
44 #include <linux/lprocfs_status.h>
45
46 #include "mds_internal.h"
47
48 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
49 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
50                        struct ptlrpc_request *req, int rc, __u32 op_data);
51 extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
52                                  struct ldlm_res_id *p1_res_id,
53                                  struct ldlm_res_id *p2_res_id,
54                                  struct ldlm_res_id *c1_res_id,
55                                  struct ldlm_res_id *c2_res_id,
56                                  struct lustre_handle *p1_lockh,
57                                  struct lustre_handle *p2_lockh,
58                                  struct lustre_handle *c1_lockh,
59                                  struct lustre_handle *c2_lockh);
60
61 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
62                                       struct vfsmount *mnt,
63                                       int flags,
64                                       struct ptlrpc_request *req)
65 {
66         struct mds_export_data *med = &req->rq_export->exp_mds_data;
67         struct inode *inode;
68         int mode;
69         struct mds_file_data *mfd;
70         int error;
71
72         mfd = mds_mfd_new();
73         if (!mfd) {
74                 CERROR("mds: out of memory\n");
75                 GOTO(cleanup_dentry, error = -ENOMEM);
76         }
77
78         mode = (flags+1) & O_ACCMODE;
79         inode = dentry->d_inode;
80
81         if (mode & FMODE_WRITE) {
82                 error = get_write_access(inode);
83                 if (error)
84                         goto cleanup_mfd;
85         }
86
87         mfd->mfd_mode = mode;
88         mfd->mfd_dentry = dentry;
89         mfd->mfd_xid = req->rq_xid;
90
91         spin_lock(&med->med_open_lock);
92         list_add(&mfd->mfd_list, &med->med_open_head);
93         spin_unlock(&med->med_open_lock);
94         mds_mfd_put(mfd);
95         return mfd;
96
97 cleanup_mfd:
98         mds_mfd_put(mfd);
99         mds_mfd_destroy(mfd);
100 cleanup_dentry:
101         dput(dentry);
102         mntput(mnt);
103         return ERR_PTR(error);
104 }
105
106 void reconstruct_open(struct mds_update_record *rec, int offset,
107                       struct ptlrpc_request *req,
108                       struct lustre_handle *child_lockh)
109 {
110         struct mds_export_data *med = &req->rq_export->exp_mds_data;
111         struct mds_client_data *mcd = med->med_mcd;
112         struct mds_obd *mds = mds_req2mds(req);
113         struct mds_file_data *mfd;
114         struct obd_device *obd = req->rq_export->exp_obd;
115         struct dentry *parent, *child;
116         struct ldlm_reply *rep;
117         struct mds_body *body;
118         int disp, rc;
119         struct list_head *t;
120         int put_child = 1;
121         ENTRY;
122
123         LASSERT(offset == 2);                  /* only called via intent */
124         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
125         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
126
127         /* copy rc, transno and disp; steal locks */
128         req->rq_transno = mcd->mcd_last_transno;
129         req->rq_status = mcd->mcd_last_result;
130         disp = rep->lock_policy_res1 = mcd->mcd_last_data;
131
132         if (req->rq_export->exp_outstanding_reply)
133                 mds_steal_ack_locks(req->rq_export, req);
134
135         /* We never care about these. */
136         disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
137         if (!disp) {
138                 EXIT;
139                 return; /* error looking up parent or child */
140         }
141
142         parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
143         LASSERT(!IS_ERR(parent));
144
145         child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
146         LASSERT(!IS_ERR(child));
147
148         if (!child->d_inode) {
149                 GOTO(out_dput, 0); /* child not present to open */
150         }
151
152         /* At this point, we know we have a child, which means that we'll send
153          * it back _unless_ it was open failed, _and_ we didn't create the file.
154          * I love you guys.  No, really.
155          */
156         if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
157             req->rq_status) {
158                 GOTO(out_dput, 0);
159         }
160
161         /* get lock (write for O_CREAT, read otherwise) */
162
163         mds_pack_inode2fid(&body->fid1, child->d_inode);
164         mds_pack_inode2body(body, child->d_inode);
165         if (S_ISREG(child->d_inode->i_mode)) {
166                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
167                                  child->d_inode);
168                 if (rc)
169                         LASSERT(rc == req->rq_status);
170         } else {
171                 /* XXX need to check this case */
172         }
173
174         /* If we're opening a file without an EA, change to a write
175            lock (unless we already have one). */
176
177         /* If we have -EEXIST as the status, and we were asked to create
178          * exclusively, we can tell we failed because the file already existed.
179          */
180         if (req->rq_status == -EEXIST &&
181             ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
182                 GOTO(out_dput, 0);
183         }
184
185         /* If we didn't get as far as trying to open, then some locking thing
186          * probably went wrong, and we'll just bail here.
187          */
188         if ((disp & IT_OPEN_OPEN) == 0)
189                 GOTO(out_dput, 0);
190
191         /* If we failed, then we must have failed opening, so don't look for
192          * file descriptor or anything, just give the client the bad news.
193          */
194         if (req->rq_status)
195                 GOTO(out_dput, 0);
196
197         mfd = NULL;
198         list_for_each(t, &med->med_open_head) {
199                 mfd = list_entry(t, struct mds_file_data, mfd_list);
200                 if (mfd->mfd_xid == req->rq_xid) 
201                         break;
202                 mfd = NULL;
203         }
204
205         if (req->rq_export->exp_outstanding_reply) {
206                 /* if we're not recovering, it had better be found */
207                 LASSERT(mfd);
208         } else if (mfd == NULL) {
209                 mntget(mds->mds_vfsmnt);
210                 CERROR("Re-opened file \n");
211                 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
212                                    rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
213                 if (!mfd) {
214                         CERROR("mds: out of memory\n");
215                         GOTO(out_dput, req->rq_status = -ENOMEM);
216                 }
217                 put_child = 0;
218         }
219
220         body->handle.cookie = mfd->mfd_handle.h_cookie;
221
222  out_dput:
223         if (put_child)
224                 l_dput(child);
225         l_dput(parent);
226         EXIT;
227 }
228
229 int mds_open(struct mds_update_record *rec, int offset,
230              struct ptlrpc_request *req, struct lustre_handle *child_lockh)
231 {
232         static const char acc_table [] = {[O_RDONLY] MAY_READ,
233                                           [O_WRONLY] MAY_WRITE,
234                                           [O_RDWR]   MAY_READ | MAY_WRITE};
235         struct mds_obd *mds = mds_req2mds(req);
236         struct obd_device *obd = req->rq_export->exp_obd;
237         struct ldlm_reply *rep;
238         struct mds_body *body;
239         struct dentry *dchild = NULL, *parent;
240         struct mds_export_data *med;
241         struct mds_file_data *mfd = NULL;
242         struct ldlm_res_id child_res_id = { .name = {0} };
243         struct lustre_handle parent_lockh;
244         int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
245         int cleanup_phase = 0;
246         void *handle = NULL;
247         int acc_mode;
248         ENTRY;
249
250         LASSERT(offset == 2);                  /* only called via intent */
251         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
252         body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
253
254         MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
255
256         med = &req->rq_export->exp_mds_data;
257         rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
258         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
259                 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
260                 req->rq_status = -ENOMEM;
261                 RETURN(-ENOMEM);
262         }
263
264         if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
265                 RETURN(-EINVAL);
266         acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
267         if ((rec->ur_flags & O_TRUNC) != 0)
268                 acc_mode |= MAY_WRITE;
269
270         /* Step 1: Find and lock the parent */
271         parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
272         parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
273                                        &parent_lockh);
274         if (IS_ERR(parent)) {
275                 rc = PTR_ERR(parent);
276                 CERROR("parent lookup error %d\n", rc);
277                 GOTO(cleanup, rc);
278         }
279         LASSERT(parent->d_inode);
280
281         cleanup_phase = 1; /* parent dentry and lock */
282
283         /* Step 2: Lookup the child */
284         dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
285         if (IS_ERR(dchild))
286                 GOTO(cleanup, rc = PTR_ERR(dchild));
287
288         cleanup_phase = 2; /* child dentry */
289
290         if (dchild->d_inode)
291                 rep->lock_policy_res1 |= IT_OPEN_POS;
292         else
293                 rep->lock_policy_res1 |= IT_OPEN_NEG;
294
295         /* Step 3: If the child was negative, and we're supposed to,
296          * create it. */
297         if (!dchild->d_inode) {
298                 if (!(rec->ur_flags & O_CREAT)) {
299                         /* It's negative and we weren't supposed to create it */
300                         GOTO(cleanup, rc = -ENOENT);
301                 }
302
303                 rep->lock_policy_res1 |= IT_OPEN_CREATE;
304                 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
305                 if (IS_ERR(handle)) {
306                         rc = PTR_ERR(handle);
307                         handle = NULL;
308                         GOTO(cleanup, rc);
309                 }
310                 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
311                 if (rc)
312                         GOTO(cleanup, rc);
313                 created = 1;
314                 child_mode = LCK_PW;
315                 acc_mode = 0;                  /* Don't check for permissions */
316         }
317
318         /* Step 4: It's positive, so lock the child */
319         child_res_id.name[0] = dchild->d_inode->i_ino;
320         child_res_id.name[1] = dchild->d_inode->i_generation;
321  reacquire:
322         lock_flags = 0;
323         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
324                               child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
325                               &lock_flags, ldlm_completion_ast,
326                               mds_blocking_ast, NULL, child_lockh);
327         if (rc != ELDLM_OK) {
328                 CERROR("ldlm_cli_enqueue: %d\n", rc);
329                 GOTO(cleanup, rc = -EIO);
330         }
331
332         cleanup_phase = 3; /* child lock */
333
334         mds_pack_inode2fid(&body->fid1, dchild->d_inode);
335         mds_pack_inode2body(body, dchild->d_inode);
336
337         if (S_ISREG(dchild->d_inode->i_mode)) {
338                 /* Check permissions etc */
339                 rc = permission(dchild->d_inode, acc_mode);
340                 if (rc != 0)
341                         GOTO(cleanup, rc);
342
343                 /* Can't write to a read-only file */
344                 if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
345                         GOTO(cleanup, rc = -EPERM);
346
347                 /* An append-only file must be opened in append mode for
348                  * writing */
349                 if (IS_APPEND(dchild->d_inode) &&
350                     (acc_mode & MAY_WRITE) != 0 &&
351                     ((rec->ur_flags & O_APPEND) == 0 ||
352                      (rec->ur_flags & O_TRUNC) != 0))
353                         GOTO (cleanup, rc = -EPERM);
354
355                 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
356                 if (rc)
357                         GOTO(cleanup, rc);
358         }
359
360         if (!created && (rec->ur_flags & O_CREAT) &&
361             (rec->ur_flags & O_EXCL)) {
362                 /* File already exists, we didn't just create it, and we
363                  * were passed O_EXCL; err-or. */
364                 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
365         }
366
367         /* If we're opening a file without an EA, the client needs a write
368          * lock. */
369         if (S_ISREG(dchild->d_inode->i_mode) &&
370             child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
371                 ldlm_lock_decref(child_lockh, child_mode);
372                 child_mode = LCK_PW;
373                 goto reacquire;
374         }
375
376         /* if we are following a symlink, don't open */
377         if (S_ISLNK(dchild->d_inode->i_mode))
378                 GOTO(cleanup, rc = 0);
379
380         if ((rec->ur_flags & O_DIRECTORY) && !S_ISDIR(dchild->d_inode->i_mode))
381                 GOTO(cleanup, rc = -ENOTDIR);
382
383         /* Step 5: mds_open it */
384         rep->lock_policy_res1 |= IT_OPEN_OPEN;
385
386         /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
387         mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
388                               rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
389         if (!mfd) {
390                 CERROR("mds: out of memory\n");
391                 dchild = NULL; /* prevent a double dput in step 2 */
392                 GOTO(cleanup, rc = -ENOMEM);
393         }
394
395         cleanup_phase = 4; /* mfd allocated */
396         body->handle.cookie = mfd->mfd_handle.h_cookie;
397         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
398                mfd->mfd_handle.h_cookie);
399         GOTO(cleanup, rc = 0); /* returns a lock to the client */
400
401  cleanup:
402         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
403                                 req, rc, rep->lock_policy_res1);
404         switch (cleanup_phase) {
405         case 4:
406                 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
407                         mds_mfd_destroy(mfd);
408         case 3:
409                 /* This is the same logic as in the IT_OPEN part of
410                  * ldlm_intent_policy: if we found the dentry, or we tried to
411                  * open it (meaning that we created, if it wasn't found), then
412                  * we return the lock to the caller and client. */
413                 if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
414                         ldlm_lock_decref(child_lockh, child_mode);
415         case 2:
416                 if (rc || S_ISLNK(dchild->d_inode->i_mode))
417                         l_dput(dchild);
418         case 1:
419                 l_dput(parent);
420                 if (rc) {
421                         ldlm_lock_decref(&parent_lockh, parent_mode);
422                 } else {
423                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
424                                sizeof(parent_lockh));
425                         req->rq_ack_locks[0].mode = parent_mode;
426                 }
427         }
428         RETURN(rc);
429 }