1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Andreas Dilger <adilger@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Mike Shaver <shaver@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_MDS
29 #include <linux/module.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/random.h>
35 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
36 # include <linux/buffer_head.h>
37 # include <linux/workqueue.h>
39 # include <linux/locks.h>
41 #include <linux/obd_lov.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/lustre_fsfilt.h>
44 #include <linux/lprocfs_status.h>
46 #include "mds_internal.h"
// Prototypes for helpers that are defined outside this section of the file.
//
// mds_req2mds(): used by the open code below to obtain the struct mds_obd
// that a request belongs to.
48 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
// mds_finish_transno(): called near the end of mds_open() with the child
// inode (or NULL), the handle returned by fsfilt_start(), the request, the
// result code so far and the intent disposition bits; its return value
// replaces rc there.
49 int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
50 struct ptlrpc_request *req, int rc, __u32 op_data);
// enqueue_ordered_locks(): takes one lock mode, four resource ids and four
// lock handles (two labelled p, two labelled c).
// NOTE(review): presumably p and c stand for parent and child and the locks
// are taken in a fixed order to avoid deadlock - confirm against the
// definition.
51 extern int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
52 struct ldlm_res_id *p1_res_id,
53 struct ldlm_res_id *p2_res_id,
54 struct ldlm_res_id *c1_res_id,
55 struct ldlm_res_id *c2_res_id,
56 struct lustre_handle *p1_lockh,
57 struct lustre_handle *p2_lockh,
58 struct lustre_handle *c1_lockh,
59 struct lustre_handle *c2_lockh);
// mds_dentry_open - set up the per-open record (struct mds_file_data) for a
// dentry and attach it to the export of the requesting client.
//
// dentry: the file being opened; stored in the record as mfd_dentry.
// req:    the open request; supplies the export and the xid kept in mfd_xid.
// NOTE(review): callers pass two further arguments between dentry and req
// (mds->mds_vfsmnt and the open flags). Their declarations are not among the
// lines shown here; the flags value feeds the access-mode computation below.
//
// The visible exit returns ERR_PTR(error) with a negative errno. Callers use
// the return value as the new record (they read mfd_handle.h_cookie from it),
// so presumably the record is returned on success - TODO confirm, that
// return is not among the lines shown.
61 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
64 struct ptlrpc_request *req)
66 struct mds_export_data *med = &req->rq_export->exp_mds_data;
69 struct mds_file_data *mfd;
// Failure exit through the cleanup_dentry label with -ENOMEM.
// NOTE(review): presumably guarded by a failed allocation of mfd; the
// allocation and its check are not among the lines shown.
74 CERROR("mds: out of memory\n");
75 GOTO(cleanup_dentry, error = -ENOMEM);
// Adding 1 before masking turns the open(2) access mode into FMODE-style
// bits: O_RDONLY (0) becomes 1, O_WRONLY (1) becomes 2 and O_RDWR (2) becomes
// 3, so the FMODE_WRITE test below is true for any open that can write.
78 mode = (flags+1) & O_ACCMODE;
79 inode = dentry->d_inode;
// Opens that can write must first obtain write access on the inode.
81 if (mode & FMODE_WRITE) {
82 error = get_write_access(inode);
// Record which dentry this open refers to and the xid of the request that
// created it; reconstruct_open() searches for the record by this xid.
88 mfd->mfd_dentry = dentry;
89 mfd->mfd_xid = req->rq_xid;
// Link the record onto the list of open files of this export. The list is
// modified only while med_open_lock is held.
91 spin_lock(&med->med_open_lock);
92 list_add(&mfd->mfd_list, &med->med_open_head);
93 spin_unlock(&med->med_open_lock);
// Error exit: the negative errno is handed back encoded as a pointer.
103 return ERR_PTR(error);
// reconstruct_open - rebuild the reply for an open request without running
// the open again. It is passed to MDS_CHECK_RESENT() at the top of
// mds_open().
//
// The saved transno, result and disposition bits are copied from the
// per-client data (med->med_mcd) into the request and reply, the parent and
// child dentries are looked up again, the reply body is refilled from the
// child inode, and the file record created by the original open is searched
// for on the export's open list by comparing mfd_xid with req->rq_xid. If
// the export has no outstanding reply and no record was found, the child is
// opened again through mds_dentry_open().
//
// rec:         update record; ur_fid1 names the parent, ur_name/ur_namelen
//              the child, ur_flags the open flags.
// offset:      asserted to be 2.
// req:         the request whose reply is being rebuilt.
// child_lockh: NOTE(review): not referenced in the lines shown here -
//              presumably filled in by the lock step hinted at in the
//              "get lock" comment; TODO confirm.
//
// Returns nothing; the outcome is reported through req->rq_status and the
// reply buffers.
106 void reconstruct_open(struct mds_update_record *rec, int offset,
107 struct ptlrpc_request *req,
108 struct lustre_handle *child_lockh)
110 struct mds_export_data *med = &req->rq_export->exp_mds_data;
111 struct mds_client_data *mcd = med->med_mcd;
112 struct mds_obd *mds = mds_req2mds(req);
113 struct mds_file_data *mfd;
114 struct obd_device *obd = req->rq_export->exp_obd;
115 struct dentry *parent, *child;
116 struct ldlm_reply *rep;
117 struct mds_body *body;
123 LASSERT(offset == 2); /* only called via intent */
// Reply buffer 0 holds the ldlm_reply and buffer 1 the mds_body.
124 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
125 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
127 /* copy rc, transno and disp; steal locks */
128 req->rq_transno = mcd->mcd_last_transno;
129 req->rq_status = mcd->mcd_last_result;
130 disp = rep->lock_policy_res1 = mcd->mcd_last_data;
132 if (req->rq_export->exp_outstanding_reply)
133 mds_steal_ack_locks(req->rq_export, req);
135 /* We never care about these. */
136 disp &= ~(IT_OPEN_LOOKUP | IT_OPEN_POS | IT_OPEN_NEG);
139 return; /* error looking up parent or child */
// Resolve the parent by fid and the child by name again. Both lookups are
// asserted to succeed rather than handled as errors.
// NOTE(review): the length passed is ur_namelen - 1, presumably because
// ur_namelen counts a trailing NUL - confirm against the record packing code.
142 parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
143 LASSERT(!IS_ERR(parent));
145 child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
146 LASSERT(!IS_ERR(child));
148 if (!child->d_inode) {
149 GOTO(out_dput, 0); /* child not present to open */
152 /* At this point, we know we have a child, which means that we'll send
153 * it back _unless_ it was open failed, _and_ we didn't create the file.
154 * I love you guys. No, really.
156 if (((disp & (IT_OPEN_OPEN | IT_OPEN_CREATE)) == IT_OPEN_OPEN) &&
161 /* get lock (write for O_CREAT, read otherwise) */
// Refill the reply body (fid and attributes) from the child inode; for a
// regular file mds_pack_md() is also called on reply buffer 2.
163 mds_pack_inode2fid(&body->fid1, child->d_inode);
164 mds_pack_inode2body(body, child->d_inode);
165 if (S_ISREG(child->d_inode->i_mode)) {
166 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
169 LASSERT(rc == req->rq_status);
171 /* XXX need to check this case */
174 /* If we're opening a file without an EA, change to a write
175 lock (unless we already have one). */
177 /* If we have -EEXIST as the status, and we were asked to create
178 * exclusively, we can tell we failed because the file already existed.
180 if (req->rq_status == -EEXIST &&
181 ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
185 /* If we didn't get as far as trying to open, then some locking thing
186 * probably went wrong, and we'll just bail here.
188 if ((disp & IT_OPEN_OPEN) == 0)
191 /* If we failed, then we must have failed opening, so don't look for
192 * file descriptor or anything, just give the client the bad news.
198 list_for_each(t, &med->med_open_head) {
199 mfd = list_entry(t, struct mds_file_data, mfd_list);
200 if (mfd->mfd_xid == req->rq_xid)
205 if (req->rq_export->exp_outstanding_reply) {
206 /* if we're not recovering, it had better be found */
208 } else if (mfd == NULL) {
// No record was found, so open the child again. O_DIRECT and O_TRUNC are
// masked out of the flags, as in mds_open(). A vfsmount reference is taken
// first.
// NOTE(review): presumably that reference is consumed by the open, as the
// mntput remark in mds_open() suggests - confirm.
209 mntget(mds->mds_vfsmnt);
210 CERROR("Re-opened file \n");
211 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
212 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
214 CERROR("mds: out of memory\n");
215 GOTO(out_dput, req->rq_status = -ENOMEM);
// Place the handle cookie of the found or re-created record in the reply
// body.
220 body->handle.cookie = mfd->mfd_handle.h_cookie;
229 int mds_open(struct mds_update_record *rec, int offset,
230 struct ptlrpc_request *req, struct lustre_handle *child_lockh)
232 static const char acc_table [] = {[O_RDONLY] MAY_READ,
233 [O_WRONLY] MAY_WRITE,
234 [O_RDWR] MAY_READ | MAY_WRITE};
235 struct mds_obd *mds = mds_req2mds(req);
236 struct obd_device *obd = req->rq_export->exp_obd;
237 struct ldlm_reply *rep;
238 struct mds_body *body;
239 struct dentry *dchild = NULL, *parent;
240 struct mds_export_data *med;
241 struct mds_file_data *mfd = NULL;
242 struct ldlm_res_id child_res_id = { .name = {0} };
243 struct lustre_handle parent_lockh;
244 int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
245 int cleanup_phase = 0;
250 LASSERT(offset == 2); /* only called via intent */
251 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
252 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
254 MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
256 med = &req->rq_export->exp_mds_data;
257 rep->lock_policy_res1 |= IT_OPEN_LOOKUP;
258 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
259 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
260 req->rq_status = -ENOMEM;
264 if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
266 acc_mode = acc_table [rec->ur_flags & O_ACCMODE];
267 if ((rec->ur_flags & O_TRUNC) != 0)
268 acc_mode |= MAY_WRITE;
270 /* Step 1: Find and lock the parent */
271 parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
272 parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
274 if (IS_ERR(parent)) {
275 rc = PTR_ERR(parent);
276 CERROR("parent lookup error %d\n", rc);
279 LASSERT(parent->d_inode);
281 cleanup_phase = 1; /* parent dentry and lock */
283 /* Step 2: Lookup the child */
284 dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
286 GOTO(cleanup, rc = PTR_ERR(dchild));
288 cleanup_phase = 2; /* child dentry */
291 rep->lock_policy_res1 |= IT_OPEN_POS;
293 rep->lock_policy_res1 |= IT_OPEN_NEG;
295 /* Step 3: If the child was negative, and we're supposed to,
297 if (!dchild->d_inode) {
298 if (!(rec->ur_flags & O_CREAT)) {
299 /* It's negative and we weren't supposed to create it */
300 GOTO(cleanup, rc = -ENOENT);
303 rep->lock_policy_res1 |= IT_OPEN_CREATE;
304 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE);
305 if (IS_ERR(handle)) {
306 rc = PTR_ERR(handle);
310 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
315 acc_mode = 0; /* Don't check for permissions */
318 /* Step 4: It's positive, so lock the child */
319 child_res_id.name[0] = dchild->d_inode->i_ino;
320 child_res_id.name[1] = dchild->d_inode->i_generation;
323 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
324 child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
325 &lock_flags, ldlm_completion_ast,
326 mds_blocking_ast, NULL, child_lockh);
327 if (rc != ELDLM_OK) {
328 CERROR("ldlm_cli_enqueue: %d\n", rc);
329 GOTO(cleanup, rc = -EIO);
332 cleanup_phase = 3; /* child lock */
334 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
335 mds_pack_inode2body(body, dchild->d_inode);
337 if (S_ISREG(dchild->d_inode->i_mode)) {
338 /* Check permissions etc */
339 rc = permission(dchild->d_inode, acc_mode);
343 /* Can't write to a read-only file */
344 if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
345 GOTO(cleanup, rc = -EPERM);
347 /* An append-only file must be opened in append mode for
349 if (IS_APPEND(dchild->d_inode) &&
350 (acc_mode & MAY_WRITE) != 0 &&
351 ((rec->ur_flags & O_APPEND) == 0 ||
352 (rec->ur_flags & O_TRUNC) != 0))
353 GOTO (cleanup, rc = -EPERM);
355 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
360 if (!created && (rec->ur_flags & O_CREAT) &&
361 (rec->ur_flags & O_EXCL)) {
362 /* File already exists, we didn't just create it, and we
363 * were passed O_EXCL; err-or. */
364 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
367 /* If we're opening a file without an EA, the client needs a write
369 if (S_ISREG(dchild->d_inode->i_mode) &&
370 child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
371 ldlm_lock_decref(child_lockh, child_mode);
376 /* if we are following a symlink, don't open */
377 if (S_ISLNK(dchild->d_inode->i_mode))
378 GOTO(cleanup, rc = 0);
380 if ((rec->ur_flags & O_DIRECTORY) && !S_ISDIR(dchild->d_inode->i_mode))
381 GOTO(cleanup, rc = -ENOTDIR);
383 /* Step 5: mds_open it */
384 rep->lock_policy_res1 |= IT_OPEN_OPEN;
386 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
387 mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
388 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
390 CERROR("mds: out of memory\n");
391 dchild = NULL; /* prevent a double dput in step 2 */
392 GOTO(cleanup, rc = -ENOMEM);
395 cleanup_phase = 4; /* mfd allocated */
396 body->handle.cookie = mfd->mfd_handle.h_cookie;
397 CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
398 mfd->mfd_handle.h_cookie);
399 GOTO(cleanup, rc = 0); /* returns a lock to the client */
402 rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
403 req, rc, rep->lock_policy_res1);
404 switch (cleanup_phase) {
406 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
407 mds_mfd_destroy(mfd);
409 /* This is the same logic as in the IT_OPEN part of
410 * ldlm_intent_policy: if we found the dentry, or we tried to
411 * open it (meaning that we created, if it wasn't found), then
412 * we return the lock to the caller and client. */
413 if (!(rep->lock_policy_res1 & (IT_OPEN_OPEN | IT_OPEN_POS)))
414 ldlm_lock_decref(child_lockh, child_mode);
416 if (rc || S_ISLNK(dchild->d_inode->i_mode))
421 ldlm_lock_decref(&parent_lockh, parent_mode);
423 memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
424 sizeof(parent_lockh));
425 req->rq_ack_locks[0].mode = parent_mode;