1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Cluster File Systems, Inc.
5 * Author: Peter Braam <braam@clusterfs.com>
6 * Author: Andreas Dilger <adilger@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Mike Shaver <shaver@clusterfs.com>
10 * This file is part of Lustre, http://www.lustre.org.
12 * Lustre is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Lustre is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Lustre; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #define DEBUG_SUBSYSTEM S_MDS
29 #include <linux/module.h>
30 #include <linux/lustre_mds.h>
31 #include <linux/lustre_dlm.h>
32 #include <linux/init.h>
33 #include <linux/obd_class.h>
34 #include <linux/random.h>
35 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
36 # include <linux/buffer_head.h>
37 # include <linux/workqueue.h>
39 # include <linux/locks.h>
41 #include <linux/obd_lov.h>
42 #include <linux/lustre_mds.h>
43 #include <linux/lustre_fsfilt.h>
44 #include <linux/lprocfs_status.h>
46 #include "mds_internal.h"
// Create the mds_file_data (mfd) record for an open of `dentry` performed on
// behalf of `req`, and link it onto the open list kept in the export of that
// request. The visible failure path returns ERR_PTR(error) with a negative
// errno value.
// NOTE(review): part of this function is not visible in this view (the
// remaining parameters, the mfd allocation, the success return and the
// cleanup labels); the notes below describe only the statements shown.
48 struct mds_file_data *mds_dentry_open(struct dentry *dentry,
51 struct ptlrpc_request *req)
53 struct mds_export_data *med = &req->rq_export->exp_mds_data;
55 struct mds_file_data *mfd;
60 CERROR("mds: out of memory\n");
61 GOTO(cleanup_dentry, error = -ENOMEM);
// Adding 1 before masking with O_ACCMODE yields a value that is tested
// against FMODE_WRITE just below (presumably the usual O_* to FMODE_*
// conversion -- confirm against the declaration of `flags`, not shown here).
64 mode = (flags + 1) & O_ACCMODE;
65 inode = dentry->d_inode;
// Opens that include write access must obtain write access on the inode.
67 if (mode & FMODE_WRITE) {
68 error = get_write_access(inode);
// Remember which dentry this mfd refers to and the xid of the request that
// opened it; reconstruct_open() matches on mfd_xid.
74 mfd->mfd_dentry = dentry;
75 mfd->mfd_xid = req->rq_xid;
// med_open_lock protects the med_open_head list while the mfd is added.
77 spin_lock(&med->med_open_lock);
78 list_add(&mfd->mfd_list, &med->med_open_head);
79 spin_unlock(&med->med_open_lock);
89 return ERR_PTR(error);
// Rebuild the reply for an open request from the state saved in the client
// data (mcd) of the export. mds_open() passes this function to
// MDS_CHECK_RESENT, so it is presumably run for resent requests -- confirm
// against the macro definition.
//
// rec:         update record; ur_fid1 names the parent, ur_name/ur_namelen
//              the child, and ur_flags carries the open flags.
// offset:      must be 2 (asserted below), i.e. the intent path.
// req:         request whose reply message, transno and status are filled in.
// child_lockh: not referenced in the lines visible here.
//
// No value is returned; the outcome is reported through req->rq_status and
// the reply buffers.
// NOTE(review): a number of lines (conditions, labels, closing braces and
// comment terminators) are missing from this view.
92 void reconstruct_open(struct mds_update_record *rec, int offset,
93 struct ptlrpc_request *req,
94 struct lustre_handle *child_lockh)
96 struct ptlrpc_request *oldreq = req->rq_export->exp_outstanding_reply;
97 struct mds_export_data *med = &req->rq_export->exp_mds_data;
98 struct mds_client_data *mcd = med->med_mcd;
99 struct mds_obd *mds = mds_req2mds(req);
100 struct mds_file_data *mfd;
101 struct obd_device *obd = req->rq_export->exp_obd;
102 struct dentry *parent, *child;
103 struct ldlm_reply *rep;
104 struct mds_body *body;
110 LASSERT(offset == 2); /* only called via intent */
// Reply buffer 0 is read as the ldlm_reply, buffer 1 as the mds_body.
111 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
112 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
114 /* copy rc, transno and disp; steal locks */
115 req->rq_transno = mcd->mcd_last_transno;
116 req->rq_status = mcd->mcd_last_result;
117 intent_set_disposition(rep, mcd->mcd_last_data);
120 mds_steal_ack_locks(req->rq_export, req);
122 /* Only replay if create or open actually happened. */
123 if (!intent_disposition(rep, DISP_OPEN_CREATE | DISP_OPEN_OPEN) ) {
125 return; /* error looking up parent or child */
// Resolve the parent by fid and the child by name; both lookups are asserted
// to succeed. The name length passed is ur_namelen - 1 (presumably because
// ur_namelen counts a trailing NUL -- confirm).
128 parent = mds_fid2dentry(mds, rec->ur_fid1, NULL);
129 LASSERT(!IS_ERR(parent));
131 child = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
132 LASSERT(!IS_ERR(child));
134 if (!child->d_inode) {
135 GOTO(out_dput, 0); /* child not present to open */
138 /* At this point, we know we have a child. We'll send
139 * it back _unless_ it was not created and open failed.
141 if (intent_disposition(rep, DISP_OPEN_OPEN) &&
142 !intent_disposition(rep, DISP_OPEN_CREATE) &&
147 /* get lock (write for O_CREAT, read otherwise) */
// Fill the reply body from the child inode; regular files also get their
// metadata packed into reply buffer 2 by mds_pack_md().
149 mds_pack_inode2fid(&body->fid1, child->d_inode);
150 mds_pack_inode2body(body, child->d_inode);
151 if (S_ISREG(child->d_inode->i_mode)) {
152 rc = mds_pack_md(obd, req->rq_repmsg, 2, body,
156 LASSERT(rc == req->rq_status);
158 /* If we have LOV EA data, the OST holds size, mtime */
159 if (!(body->valid & OBD_MD_FLEASIZE))
160 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
161 OBD_MD_FLATIME | OBD_MD_FLMTIME);
163 /* XXX need to check this case */
166 /* If we're opening a file without an EA, change to a write
167 lock (unless we already have one). */
169 /* If we have -EEXIST as the status, and we were asked to create
170 * exclusively, we can tell we failed because the file already existed.
172 if (req->rq_status == -EEXIST &&
173 ((rec->ur_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))) {
177 /* If we didn't get as far as trying to open, then some locking thing
178 * probably went wrong, and we'll just bail here.
180 if (!intent_disposition(rep, DISP_OPEN_OPEN))
183 /* If we failed, then we must have failed opening, so don't look for
184 * file descriptor or anything, just give the client the bad news.
190 list_for_each(t, &med->med_open_head) {
191 mfd = list_entry(t, struct mds_file_data, mfd_list);
192 if (mfd->mfd_xid == req->rq_xid)
198 /* if we're not recovering, it had better be found */
200 } else if (mfd == NULL) {
// mfd == NULL here presumably means the xid search above found no open file
// for this request. Take a reference on the vfsmount and open the child
// again, with O_DIRECT and O_TRUNC masked out of the flags.
201 mntget(mds->mds_vfsmnt);
202 CERROR("Re-opened file \n");
203 mfd = mds_dentry_open(child, mds->mds_vfsmnt,
204 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
206 CERROR("mds: out of memory\n");
207 GOTO(out_dput, req->rq_status = -ENOMEM);
// Return the handle cookie of the mfd to the client in the reply body.
212 body->handle.cookie = mfd->mfd_handle.h_cookie;
/* mds_pin: locate the inode named by the fid in the request body, open it
 * through mds_dentry_open() and place the resulting mfd handle cookie in the
 * reply body.
 *
 * The fid is first looked up by name in the PENDING directory
 * (mds->mds_pending_dir) while holding that directory's i_sem; a positive
 * hit is marked as an orphan. If PENDING has no such entry, the fid is
 * resolved with mds_fid2dentry() instead.
 *
 * req: the incoming request; its reply message is packed here.
 *
 * NOTE(review): several lines of this function (labels, some conditions,
 * the switch cases and the return statement) are not visible in this view;
 * rc is presumably the return value -- confirm. */
221 int mds_pin(struct ptlrpc_request *req)
223 struct mds_obd *mds = mds_req2mds(req);
224 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
225 struct mds_file_data *mfd = NULL;
226 struct mds_body *body;
227 struct dentry *dchild;
228 struct obd_run_ctxt saved;
229 char fidname[LL_FID_NAMELEN];
230 int fidlen = 0, rc, cleanup_phase = 0, size = sizeof(*body);
233 body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*body));
/* Hold the PENDING directory's i_sem across the by-name lookup; the name is
 * the string form of the fid id and generation produced by ll_fid2str(). */
235 down(&pending_dir->i_sem);
236 fidlen = ll_fid2str(fidname, body->fid1.id, body->fid1.generation);
237 dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
238 if (IS_ERR(dchild)) {
239 up(&pending_dir->i_sem);
240 rc = PTR_ERR(dchild);
241 CERROR("error looking up %s in PENDING: rc = %d\n",
/* Positive dentry in PENDING: release the semaphore, mark the inode as an
 * orphan and fill the body from it.
 * NOTE(review): in the visible lines body still points into the request
 * message (rq_reqmsg) at this point -- confirm that packing into it is
 * intended. */
248 if (dchild->d_inode) {
249 up(&pending_dir->i_sem);
250 mds_inode_set_orphan(dchild->d_inode);
251 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
252 mds_pack_inode2body(body, dchild->d_inode);
253 GOTO(openit, rc = 0);
256 up(&pending_dir->i_sem);
258 /* We didn't find it in PENDING so it isn't an orphan. See
259 * if it's a regular inode. */
260 dchild = mds_fid2dentry(mds, &body->fid1, NULL);
261 if (!IS_ERR(dchild)) {
262 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
263 mds_pack_inode2body(body, dchild->d_inode);
264 GOTO(openit, rc = 0);
267 /* We didn't find this inode on disk, but we're trying to pin it.
268 * This should never happen. */
269 CERROR("ENOENT during mds_pin for fid "LPU64"/%u\n", body->fid1.id,
270 body->fid1.generation);
274 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
275 mfd = mds_dentry_open(dchild, mds->mds_vfsmnt, body->flags, req);
277 dchild = NULL; /* prevent a double dput in cleanup phase 2 */
278 GOTO(cleanup, rc = PTR_ERR(mfd));
/* Pack a reply with a single buffer sized for an mds_body. */
281 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
/* NOTE(review): the message below appears to end in a stray 'K'. */
283 CERROR("out of memoryK\n");
/* From here on body refers to the reply message, not the request. */
286 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
288 cleanup_phase = 4; /* mfd allocated */
289 body->handle.cookie = mfd->mfd_handle.h_cookie;
290 CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
291 mfd->mfd_handle.h_cookie);
292 GOTO(cleanup, rc = 0);
/* mds_finish_transno() is called between push_ctxt() and pop_ctxt() on the
 * MDS context; a NULL inode is passed when dchild has been cleared. Its
 * return value overwrites rc. */
295 push_ctxt(&saved, &mds->mds_ctxt, NULL);
296 rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
298 pop_ctxt(&saved, &mds->mds_ctxt, NULL);
299 /* XXX what do we do here if mds_finish_transno itself failed? */
300 switch (cleanup_phase) {
303 mds_mfd_destroy(mfd);
/* NOTE(review): dchild is set to NULL on the mds_dentry_open() failure path
 * above; confirm the dereference below cannot be reached with rc == 0 and
 * dchild == NULL. */
305 if (rc || S_ISLNK(dchild->d_inode->i_mode))
311 int mds_open(struct mds_update_record *rec, int offset,
312 struct ptlrpc_request *req, struct lustre_handle *child_lockh)
314 /* XXX ALLOCATE _something_ - 464 bytes on stack here */
315 static const char acc_table [] = {[O_RDONLY] MAY_READ,
316 [O_WRONLY] MAY_WRITE,
317 [O_RDWR] MAY_READ | MAY_WRITE};
318 struct mds_obd *mds = mds_req2mds(req);
319 struct obd_device *obd = req->rq_export->exp_obd;
320 struct ldlm_reply *rep = NULL;
321 struct mds_body *body = NULL;
322 struct dentry *dchild = NULL, *parent = NULL;
323 struct mds_export_data *med;
324 struct mds_file_data *mfd = NULL;
325 struct ldlm_res_id child_res_id = { .name = {0} };
326 struct lustre_handle parent_lockh;
327 int rc = 0, parent_mode, child_mode = LCK_PR, lock_flags, created = 0;
328 int cleanup_phase = 0, acc_mode;
332 if (offset == 2) { /* intent */
333 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
334 body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*body));
335 } else if (offset == 0) { /* non-intent reint */
336 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
342 MDS_CHECK_RESENT(req, reconstruct_open(rec, offset, req, child_lockh));
344 /* Step 0: If we are passed a fid, then we assume the client already
345 * opened this file and is only replaying the RPC, so we open the
346 * inode by fid (at some large expense in security).
348 if (rec->ur_fid2->id) {
349 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
350 char fidname[LL_FID_NAMELEN];
353 down(&pending_dir->i_sem);
354 fidlen = ll_fid2str(fidname, rec->ur_fid2->id,
355 rec->ur_fid2->generation);
356 dchild = lookup_one_len(fidname, mds->mds_pending_dir, fidlen);
357 if (IS_ERR(dchild)) {
358 up(&pending_dir->i_sem);
359 rc = PTR_ERR(dchild);
360 CERROR("error looking up %s in PENDING: rc = %d\n",
365 if (dchild->d_inode) {
366 up(&pending_dir->i_sem);
367 mds_inode_set_orphan(dchild->d_inode);
368 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
369 mds_pack_inode2body(body, dchild->d_inode);
371 GOTO(openit, rc = 0);
374 up(&pending_dir->i_sem);
376 /* We didn't find it in PENDING so it isn't an orphan. See
377 * if it was a regular inode that was previously created.
379 dchild = mds_fid2dentry(mds, rec->ur_fid2, NULL);
380 if (!IS_ERR(dchild)) {
381 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
382 mds_pack_inode2body(body, dchild->d_inode);
384 GOTO(openit, rc = 0);
387 /* We didn't find the correct inode on disk either, so we
388 * need to re-create it via a regular replay. Do that below.
390 LASSERT(rec->ur_flags & O_CREAT);
392 LASSERT(offset == 2); /* If we got here, we must be called via intent */
394 med = &req->rq_export->exp_mds_data;
395 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
396 CERROR("test case OBD_FAIL_MDS_OPEN_PACK\n");
397 req->rq_status = -ENOMEM;
401 if ((rec->ur_flags & O_ACCMODE) >= sizeof (acc_table))
403 acc_mode = acc_table[rec->ur_flags & O_ACCMODE];
404 if ((rec->ur_flags & O_TRUNC) != 0)
405 acc_mode |= MAY_WRITE;
407 /* Step 1: Find and lock the parent */
408 intent_set_disposition(rep, DISP_LOOKUP_EXECD);
409 parent_mode = (rec->ur_flags & O_CREAT) ? LCK_PW : LCK_PR;
410 parent = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, parent_mode,
412 if (IS_ERR(parent)) {
413 rc = PTR_ERR(parent);
414 CERROR("parent lookup error %d\n", rc);
417 LASSERT(parent->d_inode);
419 cleanup_phase = 1; /* parent dentry and lock */
421 /* Step 2: Lookup the child */
422 dchild = ll_lookup_one_len(rec->ur_name, parent, rec->ur_namelen - 1);
424 GOTO(cleanup, rc = PTR_ERR(dchild));
426 cleanup_phase = 2; /* child dentry */
429 intent_set_disposition(rep, DISP_LOOKUP_POS);
431 intent_set_disposition(rep, DISP_LOOKUP_NEG);
433 /* Step 3: If the child was negative, and we're supposed to,
435 if (!dchild->d_inode) {
436 unsigned long ino = rec->ur_fid2->id;
438 if (!(rec->ur_flags & O_CREAT)) {
439 /* It's negative and we weren't supposed to create it */
440 GOTO(cleanup, rc = -ENOENT);
443 intent_set_disposition(rep, DISP_OPEN_CREATE);
444 handle = fsfilt_start(obd, parent->d_inode, FSFILT_OP_CREATE,
446 if (IS_ERR(handle)) {
447 rc = PTR_ERR(handle);
452 dchild->d_fsdata = (void *)(unsigned long)ino;
454 rc = vfs_create(parent->d_inode, dchild, rec->ur_mode);
455 if (dchild->d_fsdata == (void *)(unsigned long)ino)
456 dchild->d_fsdata = NULL;
459 CDEBUG(D_INODE, "error during create: %d\n", rc);
463 struct inode *inode = dchild->d_inode;
466 LASSERT(ino == inode->i_ino);
467 /* Written as part of setattr */
468 inode->i_generation = rec->ur_fid2->generation;
469 CDEBUG(D_HA, "recreated ino %lu with gen %x\n",
470 inode->i_ino, inode->i_generation);
474 LTIME_S(iattr.ia_atime) = rec->ur_time;
475 LTIME_S(iattr.ia_ctime) = rec->ur_time;
476 LTIME_S(iattr.ia_mtime) = rec->ur_time;
478 iattr.ia_uid = rec->ur_uid;
479 if (parent->d_inode->i_mode & S_ISGID) {
480 iattr.ia_gid = parent->d_inode->i_gid;
482 iattr.ia_gid = rec->ur_gid;
484 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
485 ATTR_MTIME | ATTR_CTIME;
487 rc = fsfilt_setattr(obd, dchild, handle, &iattr, 0);
489 CERROR("error on setattr: rc = %d\n", rc);
490 /* XXX should we abort here in case of error? */
495 acc_mode = 0; /* Don't check for permissions */
498 LASSERT(!mds_inode_is_orphan(dchild->d_inode));
500 /* Step 4: It's positive, so lock the child */
501 child_res_id.name[0] = dchild->d_inode->i_ino;
502 child_res_id.name[1] = dchild->d_inode->i_generation;
505 /* For the open(O_CREAT) case, this would technically be a lock
506 * inversion (getting a VFS lock after starting a transaction),
507 * but in that case we cannot possibly block on this lock because
508 * we just created the child and also hold a write lock on the
509 * parent, so nobody could be holding the lock yet.
511 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
512 child_res_id, LDLM_PLAIN, NULL, 0, child_mode,
513 &lock_flags, ldlm_completion_ast,
514 mds_blocking_ast, NULL, child_lockh);
515 if (rc != ELDLM_OK) {
516 CERROR("ldlm_cli_enqueue: %d\n", rc);
517 GOTO(cleanup, rc = -EIO);
520 cleanup_phase = 3; /* child lock */
522 mds_pack_inode2fid(&body->fid1, dchild->d_inode);
523 mds_pack_inode2body(body, dchild->d_inode);
525 if (S_ISREG(dchild->d_inode->i_mode)) {
526 /* Check permissions etc */
527 rc = permission(dchild->d_inode, acc_mode);
531 /* Can't write to a read-only file */
532 if (IS_RDONLY(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0)
533 GOTO(cleanup, rc = -EPERM);
535 /* An append-only file must be opened in append mode for
537 if (IS_APPEND(dchild->d_inode) && (acc_mode & MAY_WRITE) != 0 &&
538 ((rec->ur_flags & O_APPEND) == 0 ||
539 (rec->ur_flags & O_TRUNC) != 0))
540 GOTO(cleanup, rc = -EPERM);
542 rc = mds_pack_md(obd, req->rq_repmsg, 2, body, dchild->d_inode);
546 /* If we have LOV EA data, the OST holds size, mtime */
547 if (!(body->valid & OBD_MD_FLEASIZE))
548 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
549 OBD_MD_FLATIME | OBD_MD_FLMTIME);
552 if (!created && (rec->ur_flags & O_CREAT) &&
553 (rec->ur_flags & O_EXCL)) {
554 /* File already exists, we didn't just create it, and we
555 * were passed O_EXCL; err-or. */
556 GOTO(cleanup, rc = -EEXIST); // returns a lock to the client
559 /* If we're opening a file without an EA for write, the client needs
561 if (S_ISREG(dchild->d_inode->i_mode) && (rec->ur_flags & O_ACCMODE) &&
562 child_mode != LCK_PW && !(body->valid & OBD_MD_FLEASIZE)) {
563 ldlm_lock_decref(child_lockh, child_mode);
568 /* if we are following a symlink, don't open */
569 if (S_ISLNK(dchild->d_inode->i_mode))
570 GOTO(cleanup, rc = 0);
572 if ((rec->ur_flags & O_DIRECTORY) && !S_ISDIR(dchild->d_inode->i_mode))
573 GOTO(cleanup, rc = -ENOTDIR);
575 /* Step 5: mds_open it */
576 intent_set_disposition(rep, DISP_OPEN_OPEN);
578 /* dentry_open does a dput(de) and mntput(mds->mds_vfsmnt) on error */
579 mfd = mds_dentry_open(dchild, mds->mds_vfsmnt,
580 rec->ur_flags & ~(O_DIRECT | O_TRUNC), req);
582 dchild = NULL; /* prevent a double dput in cleanup phase 2 */
583 GOTO(cleanup, rc = PTR_ERR(mfd));
586 cleanup_phase = 4; /* mfd allocated */
587 body->handle.cookie = mfd->mfd_handle.h_cookie;
588 CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
589 mfd->mfd_handle.h_cookie);
590 GOTO(cleanup, rc = 0); /* returns a lock to the client */
593 rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, handle,
594 req, rc, rep->lock_policy_res1);
595 /* XXX what do we do here if mds_finish_transno itself failed? */
596 switch (cleanup_phase) {
598 if (rc && !S_ISLNK(dchild->d_inode->i_mode))
599 mds_mfd_destroy(mfd);
601 /* This is the same logic as in the IT_OPEN part of
602 * ldlm_intent_policy: if we found the dentry, or we tried to
603 * open it (meaning that we created, if it wasn't found), then
604 * we return the lock to the caller and client. */
605 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
606 !intent_disposition(rep, DISP_OPEN_OPEN))
607 ldlm_lock_decref(child_lockh, child_mode);
609 if (rc || S_ISLNK(dchild->d_inode->i_mode))
615 ldlm_lock_decref(&parent_lockh, parent_mode);
617 memcpy(&req->rq_ack_locks[0].lock,&parent_lockh,
618 sizeof(parent_lockh));
619 req->rq_ack_locks[0].mode = parent_mode;