1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Andreas Dilger <adilger@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Mike Shaver <shaver@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_MDS
29 #include <linux/module.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/random.h>
35 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
36 # include <linux/buffer_head.h>
37 # include <linux/workqueue.h>
39 # include <linux/locks.h>
41 #include <linux/obd_lov.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/lustre_fsfilt.h>
44 #include <linux/lprocfs_status.h>
46 #include "mds_internal.h"
/* Forward declarations of helpers used by the open path below. */

/* Returns the mds_obd that a request belongs to. */
48 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
/*
 * mds_open() calls this on its cleanup path with the child inode (or NULL),
 * the fsfilt handle, the request, the result code so far and the
 * disposition bits taken from rep->lock_policy_res1.
 */
49 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
50 struct ptlrpc_request *req, int rc, __u32 op_data);
/*
 * Takes resource ids and lock handles for two parents (p1/p2) and two
 * children (c1/c2).  NOTE(review): presumably the locks are enqueued in a
 * fixed order to avoid deadlock -- confirm against the definition.
 */
51 extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
52 struct ldlm_res_id *p1_res_id,
53 struct ldlm_res_id *p2_res_id,
54 struct ldlm_res_id *c1_res_id,
55 struct ldlm_res_id *c2_res_id,
56 struct lustre_handle *p1_lockh,
57 struct lustre_handle *p2_lockh,
58 struct lustre_handle *c1_lockh,
59 struct lustre_handle *c2_lockh);
/*
 * Set up the per-open mds_file_data (mfd) for a dentry on behalf of a
 * request: the mfd records the dentry and the xid of the opening request
 * and is linked onto the open list of the request's export.
 *
 * Callers in this file pass (dentry, vfsmount, open flags, request).
 * Failures are reported as ERR_PTR(error); the out-of-memory path uses
 * -ENOMEM.
 */
61 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
64 struct ptlrpc_request *req)
66 struct mds_export_data *med = &req->rq_export->exp_mds_data;
69 struct mds_file_data *mfd;
74 CERROR("mds: out of memory\n");
75 GOTO(cleanup_dentry, error = -ENOMEM);
/*
 * Adding 1 to the access-mode bits turns O_RDONLY/O_WRONLY/O_RDWR into a
 * mask in which the write bit can be tested with FMODE_WRITE.
 * NOTE(review): standard kernel open-mode idiom -- confirm against the
 * kernel headers in use.
 */
78 mode = (flags+1) & O_ACCMODE;
79 inode = dentry->d_inode;
/* Opens that include write access must obtain write access to the inode. */
81 if (mode & FMODE_WRITE) {
82 error = get_write_access(inode);
/*
 * The xid of the opening request is kept in the mfd; reconstruct_open()
 * compares mfd_xid with rq_xid to find the mfd again.
 */
88 mfd->mfd_dentry = dentry;
89 mfd->mfd_xid = req->rq_xid;
/* The export's open list is modified under med_open_lock. */
91 spin_lock(&med->med_open_lock);
92 list_add(&mfd->mfd_list, &med->med_open_head);
93 spin_unlock(&med->med_open_lock);
103 return ERR_PTR(error);
/*
 * Rebuild the reply for an open request without executing the open again.
 * mds_open() hands this function to MDS_CHECK_RESENT (presumably for
 * requests that the client has resent -- confirm against the macro).
 *
 * The transno, status and disposition bits of the reply are copied from the
 * per-client data (mcd_last_transno, mcd_last_result, mcd_last_data).  The
 * parent and child are then looked up again so that the reply body and the
 * open file handle can be filled in.
 */
106 void reconstruct_open(struct mds_update_record *rec, int offset,
107 struct ptlrpc_request *req,
108 struct lustre_handle *child_lockh)
110 struct mds_export_data *med = &req->rq_export->exp_mds_data;
111 struct mds_client_data *mcd = med->med_mcd;
112 struct mds_obd *mds = mds_req2mds(req);
113 struct mds_file_data *mfd;
114 struct obd_device *obd = req->rq_export->exp_obd;
115 struct dentry *parent, *child;
116 struct ldlm_reply *rep;
117 struct mds_body *body;
121 LASSERT(offset == 2); /* only called via intent */
/* Reply buffer 0 holds the ldlm_reply, buffer 1 the mds_body. */
122 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
123 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
125 /* copy rc, transno and disp; steal locks */
126 req->rq_transno = mcd->mcd_last_transno;
127 req->rq_status = mcd->mcd_last_result;
128 disp = rep->lock_policy_res1 = mcd->mcd_last_data;
/*
 * When the export still has an outstanding reply, this request is passed
 * to mds_steal_ack_locks() together with the export.
 */
130 if (req->rq_export->exp_outstanding_reply)
131 mds_steal_ack_locks(req->rq_export, req);
133 /* We never care about these. */
134 disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
137 return; /* error looking up parent or child */
/* Look the parent and the child up again; both lookups are asserted to succeed. */
140 parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
141 LASSERT(!IS_ERR(parent));
143 child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
144 LASSERT(!IS_ERR(child));
146 if (!child->d_inode) {
147 GOTO(out_dput, 0); /* child not present to open */
150 /* At this point, we know we have a child, which means that we'll send
151 * it back _unless_ it was open failed, _and_ we didn't create the file.
152 * I love you guys. No, really.
154 if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
159 /* get lock (write for O_CREAT, read otherwise) */
/*
 * Fill the fid and body of the reply from the child inode; for regular
 * files mds_pack_md() is also called on the reply message with offset 2.
 */
161 mds_pack_inode2fid(&body->fid1, child->d_inode);
162 mds_pack_inode2body(body, child->d_inode);
163 if (S_ISREG(child->d_inode->i_mode)) {
164 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
167 LASSERT(rc == req->rq_status);
169 /* XXX need to check this case */
172 /* If we're opening a file without an EA, change to a write
173 lock (unless we already have one). */
175 /* If we have -EEXIST as the status, and we were asked to create
176 * exclusively, we can tell we failed because the file already existed.
178 if (req->rq_status == -EEXIST &&
179 ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
183 /* If we didn't get as far as trying to open, then some locking thing
184 * probably went wrong, and we'll just bail here.
186 if ((disp & IT_OPEN_OPEN) == 0) {
190 /* If we failed, then we must have failed opening, so don't look for
191 * file descriptor or anything, just give the client the bad news.
193 if (req->rq_status) {
197 if (req->rq_export->exp_outstanding_reply) {
200 /* XXX can we just look in the old reply to find the handle in
202 list_for_each(t, &med->med_open_head) {
203 mfd = list_entry(t, struct mds_file_data, mfd_list);
204 if (mfd->mfd_xid == req->rq_xid)
208 /* if we're not recovering, it had better be found */
/*
 * Take a reference on the MDS vfsmount and open the child; O_DIRECT and
 * O_TRUNC are masked out of the flags given to mds_dentry_open().
 */
211 mntget(mds->mds_vfsmnt);
212 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
213 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
215 CERROR("mds: out of memory\n");
216 GOTO(out_dput, req->rq_status = -ENOMEM);
/* The cookie of the mfd handle goes back to the client in the reply body. */
220 body->handle.cookie = mfd->mfd_handle.h_cookie;
/*
 * Handle an open that arrives via an intent (offset is asserted to be 2).
 *
 * The work is done in the numbered steps marked below: find and lock the
 * parent, look up the child, create the child when it is negative and
 * O_CREAT was given, lock the child, and finally open it through
 * mds_dentry_open().  Progress is recorded as IT_OPEN_* bits in
 * rep->lock_policy_res1, and cleanup_phase records how far the function
 * got so that the cleanup switch knows what has to be undone.
 */
228 int mds_open(struct mds_update_record *rec, int offset,
229 struct ptlrpc_request *req, struct lustre_handle *child_lockh)
/* Maps the O_ACCMODE part of the open flags to the MAY_* mask given to permission(). */
231 static const char acc_table [] = {[O_RDONLY] MAY_READ,
232 [O_WRONLY] MAY_WRITE,
233 [O_RDWR] MAY_READ | MAY_WRITE};
234 struct mds_obd *mds = mds_req2mds(req);
235 struct obd_device *obd = req->rq_export->exp_obd;
236 struct ldlm_reply *rep;
237 struct mds_body *body;
238 struct dentry *dchild = NULL, *parent;
239 struct mds_export_data *med;
240 struct mds_file_data *mfd = NULL;
241 struct ldlm_res_id child_res_id = { .name = {0} };
242 struct lustre_handle parent_lockh;
243 int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
244 int cleanup_phase = 0;
249 LASSERT(offset == 2); /* only called via intent */
/* Reply buffer 0 holds the ldlm_reply, buffer 1 the mds_body. */
250 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
251 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
/*
 * reconstruct_open() is supplied as the action for MDS_CHECK_RESENT.
 * NOTE(review): presumably it runs in place of the normal path when the
 * request is a resend -- confirm against the macro definition.
 */
253 MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
255 med = &req->rq_export->exp_mds_data;
256 rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
257 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
258 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
259 req->rq_status = -ENOMEM;
/* Guard against an access mode that would index past the end of acc_table. */
263 if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
265 acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
/* Truncation counts as a write for the permission check. */
266 if ((rec->ur_flags & O_TRUNC) != 0)
267 acc_mode |= MAY_WRITE;
269 /* Step 1: Find and lock the parent */
/* The parent is locked LCK_PW when O_CREAT is set, LCK_PR otherwise. */
270 parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
271 parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
273 if (IS_ERR(parent)) {
274 rc = PTR_ERR(parent);
275 CERROR("parent lookup error %d\n", rc);
278 LASSERT(parent->d_inode);
280 cleanup_phase = 1; /* parent dentry and lock */
282 /* Step 2: Lookup the child */
/* NOTE(review): ur_namelen - 1 presumably drops a trailing NUL -- confirm. */
283 dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
285 GOTO(cleanup, rc = PTR_ERR(dchild));
287 cleanup_phase = 2; /* child dentry */
290 rep->lock_policy_res1 |= IT_OPEN_POS;
292 rep->lock_policy_res1 |= IT_OPEN_NEG;
294 /* Step 3: If the child was negative, and we're supposed to,
296 if (!dchild->d_inode) {
297 if (!(rec->ur_flags & O_CREAT)) {
298 /* It's negative and we weren't supposed to create it */
299 GOTO(cleanup, rc = -ENOENT);
/* The create is recorded in the disposition bits before it is attempted. */
302 rep->lock_policy_res1 |= IT_OPEN_CREATE;
303 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
304 if (IS_ERR(handle)) {
305 rc = PTR_ERR(handle);
309 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
314 acc_mode = 0; /* Don't check for permissions */
317 /* Step 4: It's positive, so lock the child */
/* The child lock resource is named by inode number and generation. */
318 child_res_id.name[0] = dchild->d_inode->i_ino;
319 child_res_id.name[1] = dchild->d_inode->i_generation;
322 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
323 child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
324 &lock_flags, ldlm_completion_ast,
325 mds_blocking_ast, NULL, child_lockh);
/* Any enqueue failure is reported to the caller as -EIO. */
326 if (rc != ELDLM_OK) {
327 CERROR("ldlm_cli_enqueue: %d\n", rc);
328 GOTO(cleanup, rc = -EIO);
331 cleanup_phase = 3; /* child lock */
333 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
334 mds_pack_inode2body(body, dchild->d_inode);
336 if (S_ISREG(dchild->d_inode->i_mode)) {
337 /* Check permissions etc */
338 rc = permission(dchild->d_inode, acc_mode);
342 /* Can't write to a read-only file */
343 if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
344 GOTO(cleanup, rc = -EPERM);
346 /* An append-only file must be opened in append mode for
348 if (IS_APPEND(dchild->d_inode) &&
349 (acc_mode & MAY_WRITE) != 0 &&
350 ((rec->ur_flags & O_APPEND) == 0 ||
351 (rec->ur_flags & O_TRUNC) != 0))
352 GOTO (cleanup, rc = -EPERM);
354 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
359 if (!created && (rec->ur_flags & O_CREAT) &&
360 (rec->ur_flags & O_EXCL)) {
361 /* File already exists, we didn't just create it, and we
362 * were passed O_EXCL; err-or. */
363 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
366 /* If we're opening a file without an EA, the client needs a write
368 if (S_ISREG(dchild->d_inode->i_mode) &&
369 child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
370 ldlm_lock_decref(child_lockh, child_mode);
375 /* if we are following a symlink, don't open */
376 if (S_ISLNK(dchild->d_inode->i_mode))
377 GOTO(cleanup, rc = 0);
379 /* Step 5: mds_open it */
380 rep->lock_policy_res1 |= IT_OPEN_OPEN;
382 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
/* O_DIRECT and O_TRUNC are masked out of the flags given to mds_dentry_open(). */
383 mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
384 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
386 CERROR("mds: out of memory\n");
387 dchild = NULL; /* prevent a double dput in step 2 */
388 GOTO(cleanup, rc = -ENOMEM);
391 cleanup_phase = 4; /* mfd allocated */
/* The cookie of the mfd handle goes back to the client in the reply body. */
392 body->handle.cookie = mfd->mfd_handle.h_cookie;
393 CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
394 mfd->mfd_handle.h_cookie);
395 GOTO(cleanup, rc = 0); /* returns a lock to the client */
/*
 * mds_finish_transno() receives the result so far and the disposition
 * bits; the inode argument is NULL when dchild has been cleared.  Its
 * return value replaces rc before the cleanup switch runs.
 */
398 rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
399 req, rc, rep->lock_policy_res1);
400 switch (cleanup_phase) {
402 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
403 mds_mfd_destroy(mfd);
405 /* This is the same logic as in the IT_OPEN part of
406 * ldlm_intent_policy: if we found the dentry, or we tried to
407 * open it (meaning that we created, if it wasn't found), then
408 * we return the lock to the caller and client. */
409 if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
410 ldlm_lock_decref(child_lockh, child_mode);
412 if (rc || S_ISLNK(dchild->d_inode->i_mode))
417 ldlm_lock_decref(&parent_lockh, parent_mode);
/* The parent lock handle and its mode are stored in the request's first ack-lock slot. */
419 memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
420 sizeof(parent_lockh));
421 req->rq_ack_locks[0].mode = parent_mode;