Whamcloud - gitweb
b=3643
[fs/lustre-release.git] / lustre / llite / namei.c
index 9d6f227..61cf15b 100644 (file)
@@ -1,17 +1,24 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * This code is issued under the GNU General Public License.
- * See the file COPYING in this distribution
+ *  Copyright (c) 2002, 2003 Cluster File Systems, Inc.
  *
- * Copyright (C) 1992, 1993, 1994, 1995
- * Remy Card (card@masi.ibp.fr)
- * Laboratoire MASI - Institut Blaise Pascal
- * Universite Pierre et Marie Curie (Paris VI)
+ *   This file is part of Lustre, http://www.lustre.org.
  *
- *  from
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
  *
- *  linux/fs/ext2/namei.c
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ *  derived in small part from linux/fs/ext2/namei.c
  *
  *  Copyright (C) 1991, 1992  Linus Torvalds
  *
  *        David S. Miller (davem@caip.rutgers.edu), 1995
  *  Directory entry file type support and forward compatibility hooks
  *      for B-tree directories by Theodore Ts'o (tytso@mit.edu), 1998
- *
- *  Changes for use in OBDFS
- *  Copyright (c) 1999, Seagate Technology Inc.
- *  Copyright (C) 2001, Cluster File Systems, Inc.
- *                       Rewritten based on recent ext2 page cache use.
- *
  */
 
 #include <linux/fs.h>
-#include <linux/locks.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/smp_lock.h>
 #include <linux/quotaops.h>
+#include <linux/highmem.h>
+#include <linux/pagemap.h>
 
 #define DEBUG_SUBSYSTEM S_LLITE
 
 #include <linux/obd_support.h>
 #include <linux/lustre_lite.h>
 #include <linux/lustre_dlm.h>
-#include <linux/obd_lov.h>
-
-extern struct address_space_operations ll_aops;
-
-/* from super.c */
-extern void ll_change_inode(struct inode *inode);
-extern int ll_setattr(struct dentry *de, struct iattr *attr);
-
-/* from dir.c */
-extern int ll_add_link (struct dentry *dentry, struct inode *inode);
-obd_id ll_inode_by_name(struct inode * dir, struct dentry *dentry, int *typ);
-int ext2_make_empty(struct inode *inode, struct inode *parent);
-struct ext2_dir_entry_2 * ext2_find_entry (struct inode * dir,
-                   struct dentry *dentry, struct page ** res_page);
-int ext2_delete_entry (struct ext2_dir_entry_2 * dir, struct page * page );
-int ext2_empty_dir (struct inode * inode);
-struct ext2_dir_entry_2 * ext2_dotdot (struct inode *dir, struct page **p);
-void ext2_set_link(struct inode *dir, struct ext2_dir_entry_2 *de,
-                   struct page *page, struct inode *inode);
+#include <linux/lustre_version.h>
+#include "llite_internal.h"
 
-/*
- * Couple of helper functions - make the code slightly cleaner.
- */
-static inline void ext2_inc_count(struct inode *inode)
-{
-        inode->i_nlink++;
-}
+/* methods */
 
-/* postpone the disk update until the inode really goes away */
-static inline void ext2_dec_count(struct inode *inode)
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+static int ll_test_inode(struct inode *inode, unsigned long ino, void *opaque)
+#else
+static int ll_test_inode(struct inode *inode, void *opaque)
+#endif
 {
-        inode->i_nlink--;
-}
+        static int last_ino, last_gen, last_count;
+        struct lustre_md *md = opaque;
 
-static inline int ext2_add_nondir(struct dentry *dentry, struct inode *inode)
-{
-        int err;
-        err = ll_add_link(dentry, inode);
-        if (!err) {
-                d_instantiate(dentry, inode);
+        if (!(md->body->valid & (OBD_MD_FLGENER | OBD_MD_FLID))) {
+                CERROR("MDS body missing inum or generation\n");
                 return 0;
         }
-        ext2_dec_count(inode);
-        iput(inode);
-        return err;
-}
 
-/* methods */
-static int ll_find_inode(struct inode *inode, unsigned long ino, void *opaque)
-{
-        struct ll_inode_md *md = opaque;
+        if (last_ino == md->body->ino && last_gen == md->body->generation &&
+            last_count < 500) {
+                last_count++;
+        } else {
+                if (last_count > 1)
+                        CDEBUG(D_VFSTRACE, "compared %u/%u %u times\n",
+                               last_ino, last_gen, last_count);
+                last_count = 0;
+                last_ino = md->body->ino;
+                last_gen = md->body->generation;
+                CDEBUG(D_VFSTRACE,
+                       "comparing inode %p ino %lu/%u/%u to body %u/%u/%u\n",
+                       inode, inode->i_ino, inode->i_generation,
+                       ll_i2info(inode)->lli_mds,
+                       md->body->ino, md->body->generation,
+                       md->body->mds);
+        }
 
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+        if (inode->i_ino != md->body->ino)
+                return 0;
+#endif
         if (inode->i_generation != md->body->generation)
                 return 0;
 
+        if (ll_i2info(inode)->lli_mds != md->body->mds)
+                return 0;
+
+        /* Matched: apply the attributes in 'opaque' to this inode */
+        ll_update_inode(inode, md);
         return 1;
 }
 
 extern struct dentry_operations ll_d_ops;
 
-int ll_lock(struct inode *dir, struct dentry *dentry,
-            struct lookup_intent *it, struct lustre_handle *lockh)
-{
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        char *tgt = NULL;
-        int tgtlen = 0;
-        int err, lock_mode;
-
-        if ((it->it_op & (IT_CREAT | IT_MKDIR | IT_SETATTR | IT_MKNOD)))
-                lock_mode = LCK_PW;
-        else if (it->it_op & (IT_READDIR | IT_GETATTR | IT_OPEN | IT_UNLINK |
-                              IT_RMDIR | IT_RENAME | IT_RENAME2 | IT_READLINK))
-                lock_mode = LCK_PR;
-        else if (it->it_op & IT_SYMLINK) {
-                lock_mode = LCK_PW;
-                tgt = it->it_data;
-                tgtlen = strlen(tgt);
-                it->it_data = NULL;
-        } else if (it->it_op & IT_LOOKUP)
-                lock_mode = LCK_CR;
-        else {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        err = mdc_enqueue(&sbi->ll_mdc_conn, LDLM_MDSINTENT, it, lock_mode, dir,
-                          dentry, lockh, tgt, tgtlen, dir, sizeof(*dir));
-
-        RETURN(err);
-}
-
 int ll_unlock(__u32 mode, struct lustre_handle *lockh)
 {
         ENTRY;
@@ -136,632 +104,726 @@ int ll_unlock(__u32 mode, struct lustre_handle *lockh)
         RETURN(0);
 }
 
-static struct dentry *ll_lookup2(struct inode *dir, struct dentry *dentry,
-                                 struct lookup_intent *it)
+/* Get an inode by inode number (already instantiated by the intent lookup).
+ * Returns inode or NULL
+ */
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
+int ll_set_inode(struct inode *inode, void *opaque)
 {
-        struct ptlrpc_request *request = NULL;
-        struct inode * inode = NULL;
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        struct ll_inode_md md;
-        struct lustre_handle lockh;
-        struct lookup_intent lookup_it = { IT_LOOKUP };
-        int err, offset;
-        obd_id ino = 0;
+        ll_read_inode2(inode, opaque);
+        return 0;
+}
 
-        ENTRY;
+struct inode *ll_iget(struct super_block *sb, ino_t hash,
+                      struct lustre_md *md)
+{
+        struct inode *inode;
 
-        if (it == NULL) {
-                it = &lookup_it;
-                dentry->d_it = it;
+        LASSERT(hash != 0);
+        inode = iget5_locked(sb, hash, ll_test_inode, ll_set_inode, md);
+
+        if (inode) {
+                if (inode->i_state & I_NEW)
+                        unlock_new_inode(inode);
+                CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
+                       inode->i_generation, inode);
         }
 
-        CDEBUG(D_INFO, "name: %*s, intent op: %d\n", dentry->d_name.len,
-               dentry->d_name.name, it->it_op);
+        return inode;
+}
+#else
+struct inode *ll_iget(struct super_block *sb, ino_t hash,
+                      struct lustre_md *md)
+{
+        struct inode *inode;
+        LASSERT(hash != 0);
+        inode = iget4(sb, hash, ll_test_inode, md);
+        if (inode)
+                CDEBUG(D_VFSTRACE, "inode: %lu/%u(%p)\n", inode->i_ino,
+                       inode->i_generation, inode);
+        return inode;
+}
+#endif
 
-        if (dentry->d_name.len > EXT2_NAME_LEN)
-                RETURN(ERR_PTR(-ENAMETOOLONG));
+int ll_mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                        void *data, int flag)
+{
+        int rc;
+        struct lustre_handle lockh;
+        ENTRY;
 
-        err = ll_lock(dir, dentry, it, &lockh);
-        if (err < 0)
-                RETURN(ERR_PTR(err));
-        memcpy(it->it_lock_handle, &lockh, sizeof(lockh));
-
-        request = (struct ptlrpc_request *)it->it_data;
-        if (it->it_disposition) {
-                int mode, easize = 0;
-                obd_flag valid;
-
-                offset = 1;
-                if (it->it_op & (IT_CREAT | IT_MKDIR | IT_SYMLINK | IT_MKNOD)) {
-                        /* For create ops, we want the lookup to be negative */
-                        if (!it->it_status)
-                                GOTO(negative, NULL);
-                } else if (it->it_op & (IT_GETATTR | IT_SETATTR | IT_LOOKUP)) {
-                        /* For check ops, we want the lookup to succeed */
-                        it->it_data = NULL;
-                        if (it->it_status)
-                                GOTO(neg_req, NULL);
-                } else if (it->it_op & IT_RENAME) {
-                        /* For rename, we want the lookup to succeed */
-                        if (it->it_status) {
-                                it->it_data = NULL;
-                                GOTO(neg_req, NULL);
-                        }
-                        it->it_data = dentry;
-                } else if (it->it_op & (IT_UNLINK | IT_RMDIR)) {
-                        /* For remove ops, we want the lookup to succeed */
-                        it->it_data = NULL;
-                        if (it->it_status)
-                                GOTO(neg_req, NULL);
-                        goto iget;
-                } else if (it->it_op == IT_OPEN) {
-                        it->it_data = NULL;
-                        if (it->it_status && it->it_status != -EEXIST)
-                                GOTO(neg_req, NULL);
-                } else if (it->it_op == IT_RENAME2) {
-                        struct mds_body *body =
-                                lustre_msg_buf(request->rq_repmsg, offset);
-                        it->it_data = NULL;
-                        /* For rename2, this means the lookup is negative */
-                        if (body->valid == 0)
-                                GOTO(neg_req, NULL);
-                        goto iget; /* XXX not sure about this */
+        switch (flag) {
+        case LDLM_CB_BLOCKING:
+                ldlm_lock2handle(lock, &lockh);
+                rc = ldlm_cli_cancel(&lockh);
+                if (rc < 0) {
+                        CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
+                        RETURN(rc);
                 }
+                break;
+        case LDLM_CB_CANCELING: {
+                struct inode *inode = ll_inode_from_lock(lock);
+                __u64 bits = lock->l_policy_data.l_inodebits.bits;
 
-                /* Do a getattr now that we have the lock */
-                md.body = lustre_msg_buf(request->rq_repmsg, offset);
-                ino = md.body->fid1.id;
-                mode = md.body->mode;
-                valid = OBD_MD_FLNOTOBD | OBD_MD_FLEASIZE;
-                if (it->it_op == IT_READLINK) {
-                        valid |= OBD_MD_LINKNAME;
-                        easize = md.body->size;
-                }
-                ptlrpc_free_req(request);
-                request = NULL;
-                err = mdc_getattr(&sbi->ll_mdc_conn, ino, mode,
-                                  valid, easize, &request);
-                if (err) {
-                        CERROR("failure %d inode %Ld\n", err, (long long)ino);
-                        ptlrpc_free_req(request);
-#warning FIXME: must release lock here
-                        RETURN(ERR_PTR(-abs(err)));
+                /* For lookup locks: Invalidate all dentries associated with
+                   this inode, for UPDATE locks - invalidate directory pages */
+                if (inode == NULL)
+                        break;
+
+                if (bits & MDS_INODELOCK_UPDATE)
+                        clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK,
+                                  &(ll_i2info(inode)->lli_flags));
+
+
+                if (lock->l_resource->lr_name.name[0] != inode->i_ino ||
+                    lock->l_resource->lr_name.name[1] != inode->i_generation) {
+                        LDLM_ERROR(lock, "data mismatch with ino %lu/%u",
+                                   inode->i_ino, inode->i_generation);
                 }
-                offset = 0;
-        } else {
-                struct ll_inode_info *lli = ll_i2info(dir);
-                int type;
-
-                memcpy(&lli->lli_intent_lock_handle, &lockh, sizeof(lockh));
-                offset = 0;
-
-                ino = ll_inode_by_name(dir, dentry, &type);
-#warning FIXME: handle negative inode case (see old ll_lookup)
-
-                err = mdc_getattr(&sbi->ll_mdc_conn, ino, type,
-                                  OBD_MD_FLNOTOBD|OBD_MD_FLEASIZE, 0, &request);
-                if (err) {
-                        CERROR("failure %d inode %Ld\n", err, (long long)ino);
-                        ptlrpc_free_req(request);
-#warning FIXME: must release lock here
-                        RETURN(ERR_PTR(-abs(err)));
+
+                /* If lookup lock is cancelled, we just drop the dentry and
+                   this will cause us to reget data from MDS when we'd want to
+                   access this dentry/inode again. If the cancelled lock covers
+                   other parts of the inode, we do not need to do much (but
+                   need to discard data from readdir, if any), since the
+                   absence of the lock will cause ll_revalidate_it (called from
+                   stat() and similar functions) to renew the data anyway */
+                if (S_ISDIR(inode->i_mode) &&
+                    (bits & MDS_INODELOCK_UPDATE)) {
+                        CDEBUG(D_INODE, "invalidating inode %lu\n",
+                               inode->i_ino);
+
+                        truncate_inode_pages(inode->i_mapping, 0);
                 }
+
+                if (inode->i_sb->s_root &&
+                    inode != inode->i_sb->s_root->d_inode &&
+                    (bits & MDS_INODELOCK_LOOKUP))
+                        ll_unhash_aliases(inode);
+                iput(inode);
+                break;
         }
+        default:
+                LBUG();
+        }
+
+        RETURN(0);
+}
+
+int ll_mdc_cancel_unused(struct lustre_handle *conn, struct inode *inode,
+                         int flags, void *opaque)
+{
+        struct ldlm_res_id res_id =
+                { .name = {inode->i_ino, inode->i_generation} };
+        struct obd_device *obddev = class_conn2obd(conn);
+        ENTRY;
+        /* Resource name is {ino, generation}; pass to ldlm_cli_cancel_unused */
+        RETURN(ldlm_cli_cancel_unused(obddev->obd_namespace, &res_id, flags,
+                                      opaque));
+}
+
+/* Search "inode"'s alias list for a dentry with the same name and parent as de.
+ * If found: rehash it, grab a ref, iput(inode), return it.  Else return de. */
+struct dentry *ll_find_alias(struct inode *inode, struct dentry *de)
+{
+        struct list_head *tmp;
+
+        spin_lock(&dcache_lock);
+        list_for_each(tmp, &inode->i_dentry) {
+                struct dentry *dentry = list_entry(tmp, struct dentry, d_alias);
+
+                /* We are called here with 'de' already on the aliases list. */
+                if (dentry == de) {
+                        CERROR("whoops\n");
+                        continue;
+                }
+
+                if (dentry->d_parent != de->d_parent)
+                        continue;
+
+                if (dentry->d_name.len != de->d_name.len)
+                        continue;
+
+                if (memcmp(dentry->d_name.name, de->d_name.name,
+                           de->d_name.len) != 0)
+                        continue;
 
- iget:
-        md.body = lustre_msg_buf(request->rq_repmsg, offset);
-        if (S_ISREG(md.body->mode)) {
-                if (request->rq_repmsg->bufcount < offset + 1)
-                        LBUG();
-                md.md = lustre_msg_buf(request->rq_repmsg, offset + 1);
-        } else
-                md.md = NULL;
-
-        /* No rpc's happen during iget4, -ENOMEM's are possible */
-        inode = iget4(dir->i_sb, ino, ll_find_inode, &md);
-
-        LASSERT(!IS_ERR(inode));
-        if (!inode) {
-                ptlrpc_free_req(request);
-                ll_intent_release(dentry);
-                RETURN(ERR_PTR(-ENOMEM));
+                if (!list_empty(&dentry->d_lru))
+                        list_del_init(&dentry->d_lru);
+
+                hlist_del_init(&dentry->d_hash);
+                __d_rehash(dentry, 0); /* avoid taking dcache_lock inside */
+                spin_unlock(&dcache_lock);
+                atomic_inc(&dentry->d_count);
+                iput(inode);
+                dentry->d_flags &= ~DCACHE_LUSTRE_INVALID;
+                CDEBUG(D_DENTRY, "alias dentry %.*s (%p) parent %p inode %p "
+                       "refc %d\n", de->d_name.len, de->d_name.name, de,
+                       de->d_parent, de->d_inode, atomic_read(&de->d_count));
+                return dentry;
         }
 
-        EXIT;
- neg_req:
-        ptlrpc_req_finished(request);
- negative:
-        dentry->d_op = &ll_d_ops;
-        d_add(dentry, inode);
-        if (it->it_op == IT_LOOKUP)
-                ll_intent_release(dentry);
+        spin_unlock(&dcache_lock);
 
-        return NULL;
+        return de;
 }
 
-static struct inode *ll_create_node(struct inode *dir, const char *name,
-                                    int namelen, const char *tgt, int tgtlen,
-                                    int mode, __u64 extra,
-                                    struct lookup_intent *it,
-                                    struct lov_stripe_md *smd)
+static int lookup_it_finish(struct ptlrpc_request *request, int offset,
+                            struct lookup_intent *it, void *data)
 {
-        struct inode *inode;
-        struct ptlrpc_request *request = NULL;
-        struct mds_body *body;
+        struct it_cb_data *icbd = data;
+        struct dentry **de = icbd->icbd_childp;
+        struct inode *parent = icbd->icbd_parent;
+        struct ll_sb_info *sbi = ll_i2sbi(parent);
+        struct dentry *dentry = *de, *saved = *de;
+        struct inode *inode = NULL;
         int rc;
-        time_t time = CURRENT_TIME;
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        int gid = current->fsgid;
-        struct ll_inode_md md;
-        struct lov_mds_md *mds_md = NULL;
-        int mds_md_size = 0;
 
-        ENTRY;
+        /* NB 1 request reference will be taken away by ll_intent_lock()
+         * when I return */
+        if (!it_disposition(it, DISP_LOOKUP_NEG)) {
+                ENTRY;
 
-        if (dir->i_mode & S_ISGID) {
-                gid = dir->i_gid;
-                if (S_ISDIR(mode))
-                        mode |= S_ISGID;
-        }
+                rc = ll_prep_inode(sbi->ll_osc_exp, sbi->ll_mdc_exp,
+                                   &inode, request, offset, dentry->d_sb);
+                if (rc)
+                        RETURN(rc);
 
-        if (!it || !it->it_disposition) {
-                rc = mdc_create(&sbi->ll_mdc_conn, dir, name, namelen, tgt,
-                                 tgtlen, mode, current->fsuid,
-                                 gid, time, extra, smd, &request);
-                if (rc) {
-                        inode = ERR_PTR(rc);
-                        GOTO(out, rc);
-                }
-                body = lustre_msg_buf(request->rq_repmsg, 0);
-                if (smd != NULL) {
-                        mds_md_size = sizeof (struct lov_mds_md) + 
-                                smd->lmd_stripe_count * sizeof(struct lov_object_id);
-                        OBD_ALLOC(mds_md, mds_md_size);
-                        lov_packmd(mds_md, smd);
-                        md.md = mds_md;
-                } else
-                        md.md = NULL;
+                CDEBUG(D_DLMTRACE, "setting l_data to inode %p (%lu/%u)\n",
+                       inode, inode->i_ino, inode->i_generation);
+                mdc_set_lock_data(NULL, &it->d.lustre.it_lock_handle, inode);
 
-        } else {
-                request = it->it_data;
-                body = lustre_msg_buf(request->rq_repmsg, 1);
-                md.md = NULL;
-        }
+                /* If this is a stat, get the authoritative file size */
+                if (it->it_op == IT_GETATTR && S_ISREG(inode->i_mode) &&
+                    ll_i2info(inode)->lli_smd != NULL) {
+                        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
+                        struct ost_lvb lvb;
+                        ldlm_error_t rc;
 
-        body->valid = OBD_MD_FLNOTOBD;
+                        LASSERT(lsm->lsm_object_id != 0);
 
-        body->nlink = 1;
-        body->atime = body->ctime = body->mtime = time;
-        body->uid = current->fsuid;
-        body->gid = gid;
-        body->mode = mode;
+                        /* bug 2334: drop MDS lock before acquiring OST lock */
+                        ll_intent_drop_lock(it);
 
-        md.body = body;
+                        rc = ll_glimpse_size(inode, &lvb);
+                        if (rc) {
+                                iput(inode);
+                                RETURN(rc);
+                        }
+                        inode->i_size = lvb.lvb_size;
+                }
 
-        inode = iget4(dir->i_sb, body->ino, ll_find_inode, &md);
-        if (IS_ERR(inode)) {
-                rc = PTR_ERR(inode);
-                CERROR("new_inode -fatal: rc %d\n", rc);
-                LBUG();
-                GOTO(out, rc);
+                dentry = *de = ll_find_alias(inode, dentry);
+        } else {
+                ENTRY;
         }
 
-        if (!list_empty(&inode->i_dentry)) {
-                CERROR("new_inode -fatal: inode %d, ct %d lnk %d\n",
-                       body->ino, atomic_read(&inode->i_count),
-                       inode->i_nlink);
-                iput(inode);
-                LBUG();
-                inode = ERR_PTR(-EIO);
-                GOTO(out, -EIO);
+        dentry->d_op = &ll_d_ops;
+        ll_set_dd(dentry);
+
+        if (dentry == saved) {
+                d_add(dentry, inode);
         }
 
-        EXIT;
- out:
-        if (mds_md != NULL) 
-                OBD_FREE(mds_md, mds_md_size);
-        ptlrpc_free_req(request);
-        return inode;
+        RETURN(0);
 }
 
-static int ll_mdc_unlink(struct inode *dir, struct inode *child, __u32 mode,
-                         const char *name, int len)
-{
-        struct ptlrpc_request *request = NULL;
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-        int err;
 
+static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
+                                   struct nameidata *nd,
+                                   struct lookup_intent *it, int flags)
+{
+        struct dentry *save = dentry, *retval;
+        struct ll_fid pfid;
+        struct ll_uctxt ctxt;
+        struct it_cb_data icbd;
+        struct ptlrpc_request *req = NULL;
+        struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
+        int rc;
         ENTRY;
 
-        err = mdc_unlink(&sbi->ll_mdc_conn, dir, child, mode, name, len,
-                         &request);
-        ptlrpc_free_req(request);
+        if (dentry->d_name.len > EXT3_NAME_LEN)
+                RETURN(ERR_PTR(-ENAMETOOLONG));
 
-        RETURN(err);
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
+               dentry->d_name.name, parent->i_ino, parent->i_generation,
+               parent, LL_IT2STR(it));
+
+        if (d_mountpoint(dentry))
+                CERROR("Tell Peter, lookup on mtpt, it %s\n", LL_IT2STR(it));
+
+        if (nd != NULL)
+                nd->mnt->mnt_last_used = jiffies;
+
+        ll_frob_intent(&it, &lookup_it);
+
+        icbd.icbd_childp = &dentry;
+        icbd.icbd_parent = parent;
+        ll_inode2fid(&pfid, parent);
+        ll_i2uctxt(&ctxt, parent, NULL);
+
+        rc = md_intent_lock(ll_i2mdcexp(parent), &ctxt, &pfid,
+                            dentry->d_name.name, dentry->d_name.len, NULL, 0,
+                            NULL, it, flags, &req, ll_mdc_blocking_ast);
+        if (rc < 0)
+                GOTO(out, retval = ERR_PTR(rc));
+
+        rc = lookup_it_finish(req, 1, it, &icbd);
+        if (rc != 0) {
+                ll_intent_release(it);
+                GOTO(out, retval = ERR_PTR(rc));
+        }
+
+        ll_lookup_finish_locks(it, dentry);
+
+        if (nd &&
+            dentry->d_inode != NULL && dentry->d_inode->i_mode & S_ISUID &&
+            S_ISDIR(dentry->d_inode->i_mode) &&
+            (flags & LOOKUP_CONTINUE || (it->it_op & (IT_CHDIR | IT_OPEN))))
+                ll_dir_process_mount_object(dentry, nd->mnt);
+
+        if (dentry == save)
+                GOTO(out, retval = NULL);
+        else
+                GOTO(out, retval = dentry);
+ out:
+        if (req)
+                ptlrpc_req_finished(req);
+        if (dentry->d_inode)
+                CDEBUG(D_INODE, "lookup 0x%p in %lu/%lu: %.*s -> %lu/%lu\n",
+                                dentry,
+                                (unsigned long) parent->i_ino,
+                                (unsigned long) parent->i_generation,
+                                dentry->d_name.len, dentry->d_name.name,
+                                (unsigned long) dentry->d_inode->i_ino,
+                                (unsigned long) dentry->d_inode->i_generation);
+        else
+                CDEBUG(D_INODE, "lookup 0x%p in %lu/%lu: %.*s -> ??\n",
+                                dentry,
+                                (unsigned long) parent->i_ino,
+                                (unsigned long) parent->i_generation,
+                                dentry->d_name.len, dentry->d_name.name);
+        return retval;
 }
 
-int ll_mdc_link(struct dentry *src, struct inode *dir,
-                const char *name, int len)
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+static struct dentry *ll_lookup_nd(struct inode *parent, struct dentry *dentry,
+                                   struct nameidata *nd)
 {
-        struct ptlrpc_request *request = NULL;
-        int err;
-        struct ll_sb_info *sbi = ll_i2sbi(dir);
-
+        struct dentry *de;
         ENTRY;
 
-        err = mdc_link(&sbi->ll_mdc_conn, src, dir, name,
-                       len, &request);
-        ptlrpc_free_req(request);
+        if (nd && nd->flags & LOOKUP_LAST && !(nd->flags & LOOKUP_LINK_NOTLAST))
+                de = ll_lookup_it(parent, dentry, nd, &nd->intent, nd->flags);
+        else
+                de = ll_lookup_it(parent, dentry, nd, NULL, 0);
 
-        RETURN(err);
+        RETURN(de);
 }
+#endif
 
-int ll_mdc_rename(struct inode *src, struct inode *tgt,
-                  struct dentry *old, struct dentry *new)
+/* We depend on "mode" being set with the proper file type/umask by now */
+static struct inode *ll_create_node(struct inode *dir, const char *name,
+                                    int namelen, const void *data, int datalen,
+                                    int mode, __u64 extra,
+                                    struct lookup_intent *it)
 {
+        struct inode *inode = NULL;
         struct ptlrpc_request *request = NULL;
-        struct ll_sb_info *sbi = ll_i2sbi(src);
-        int err;
-
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
+        int rc;
         ENTRY;
 
-        err = mdc_rename(&sbi->ll_mdc_conn, src, tgt,
-                         old->d_name.name, old->d_name.len,
-                         new->d_name.name, new->d_name.len, &request);
-        ptlrpc_free_req(request);
+        LASSERT(it && it->d.lustre.it_disposition);
 
-        RETURN(err);
+        request = it->d.lustre.it_data;
+        rc = ll_prep_inode(sbi->ll_osc_exp, sbi->ll_mdc_exp,
+                           &inode, request, 1, dir->i_sb);
+        if (rc)
+                GOTO(out, inode = ERR_PTR(rc));
+
+        LASSERT(list_empty(&inode->i_dentry));
+
+        /* We asked for a lock on the directory, but were granted a
+         * lock on the inode.  Since we finally have an inode pointer,
+         * stuff it in the lock. */
+        CDEBUG(D_DLMTRACE, "setting l_ast_data to inode %p (%lu/%u)\n",
+               inode, inode->i_ino, inode->i_generation);
+        mdc_set_lock_data(NULL, &it->d.lustre.it_lock_handle, inode);
+        EXIT;
+ out:
+        ptlrpc_req_finished(request);
+        return inode;
 }
 
 /*
- * By the time this is called, we already have created
- * the directory cache entry for the new file, but it
- * is so far negative - it has no inode.
+ * By the time this is called, we already have created the directory cache
+ * entry for the new file, but it is so far negative - it has no inode.
+ *
+ * We defer creating the OBD object(s) until open, to keep the intent and
+ * non-intent code paths similar, and also because we do not have the MDS
+ * inode number before calling ll_create_node() (which is needed for LOV),
+ * so we would need to do yet another RPC to the MDS to store the LOV EA
+ * data on the MDS.  If needed, we would pass the PACKED lmm as data and
+ * lmm_size in datalen (the MDS still has code which will handle that).
  *
  * If the create succeeds, we fill in the inode information
  * with d_instantiate().
  */
-
-static int ll_create(struct inode * dir, struct dentry * dentry, int mode)
+static int ll_create_it(struct inode *dir, struct dentry *dentry, int mode,
+                        struct lookup_intent *it)
 {
-        int err, rc = 0;
-        struct obdo oa;
         struct inode *inode;
-        struct lov_stripe_md *smd = NULL;
-        struct ll_inode_info *lli = NULL;
+        /* The request is taken from the intent (it->d.lustre.it_data). */
+        struct ptlrpc_request *request = it->d.lustre.it_data;
+        struct obd_export *mdc_exp = ll_i2mdcexp(dir); 
+        int rc = 0;
         ENTRY;
 
-        if (dentry->d_it->it_disposition == 0) {
-                memset(&oa, 0, sizeof(oa));
-                oa.o_valid = OBD_MD_FLMODE;
-                oa.o_mode = S_IFREG | 0600;
-                rc = obd_create(ll_i2obdconn(dir), &oa, &smd);
-                CDEBUG(D_DENTRY, "name %s mode %o o_id %lld: rc = %d\n",
-                       dentry->d_name.name, mode, (long long)oa.o_id, rc);
-                if (rc)
-                        RETURN(rc);
-        }
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),intent=%s\n",
+               dentry->d_name.name, dir->i_ino, dir->i_generation, dir,
+               LL_IT2STR(it));
 
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, mode, 0, dentry->d_it, smd);
+        /* Return early with whatever error it_open_error() reports for the
+         * DISP_OPEN_CREATE disposition of this intent. */
+        rc = it_open_error(DISP_OPEN_CREATE, it);
+        if (rc)
+                RETURN(rc);
 
+        /* NOTE(review): the meaning of the literal 2 and 1 arguments is not
+         * visible here (presumably request/reply buffer offsets) - confirm
+         * against mdc_store_inode_generation(). */
+        mdc_store_inode_generation(mdc_exp, request, 2, 1);
+        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
+                               NULL, 0, mode, 0, it);
         if (IS_ERR(inode)) {
-                rc = PTR_ERR(inode);
-                CERROR("error creating MDS object for id %Ld: rc = %d\n",
-                       (unsigned long long)oa.o_id, rc);
-                GOTO(out_destroy, rc);
-        }
-
-        if (dentry->d_it->it_disposition) {
-                lli = ll_i2info(inode);
-                memcpy(&lli->lli_intent_lock_handle,
-                       dentry->d_it->it_lock_handle,
-                       sizeof(struct lustre_handle));
-                d_instantiate(dentry, inode);
-        } else {
-                /* no directory data updates when intents rule */
-                rc = ext2_add_nondir(dentry, inode);
-        }
-
-        RETURN(rc);
-
-out_destroy:
-        if (smd) {
-                oa.o_easize = smd->lmd_easize;
-                oa.o_valid |= OBD_MD_FLEASIZE;
-                err = obd_destroy(ll_i2obdconn(dir), &oa, smd);
-                if (err)
-                        CERROR("error destroying objid %Ld on error: err %d\n",
-                       (unsigned long long)oa.o_id, err);
+                RETURN(PTR_ERR(inode));
         }
 
-        return rc;
+        /* Attach the new inode to the dentry passed in by the caller. */
+        d_instantiate(dentry, inode);
+        RETURN(0);
 }
 
-static int ll_mknod(struct inode *dir, struct dentry *dentry, int mode,
-                    int rdev)
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+/*
+ * Create entry point for kernels newer than 2.5.0: the intent is embedded
+ * in the nameidata, so simply forward &nd->intent to ll_create_it() and
+ * return its result.
+ */
+static int ll_create_nd(struct inode *dir, struct dentry *dentry, int mode, struct nameidata *nd)
 {
-        struct inode *inode;
-        int err = 0;
-
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, mode, rdev, dentry->d_it, NULL);
-
-        if (IS_ERR(inode))
-                RETURN(PTR_ERR(inode));
-
-        /* no directory data updates when intents rule */
-        if (dentry->d_it && dentry->d_it->it_disposition)
-                d_instantiate(dentry, inode);
-        else
-                err = ext2_add_nondir(dentry, inode);
-
-        return err;
+        return ll_create_it(dir, dentry, mode, &nd->intent);
 }
+#endif
 
-static int ll_symlink(struct inode *dir, struct dentry *dentry,
-                      const char *symname)
+/*
+ * Create a node in the directory nd->dentry->d_inode using the name held in
+ * nd->last, by issuing md_create() directly.  No dentry is instantiated and
+ * the reply is released immediately.
+ *
+ * Returns -EMLINK if the directory link count has reached EXT3_LINK_MAX,
+ * -EPERM for S_IFDIR, -EINVAL for an unrecognised file type, otherwise the
+ * result of md_create().
+ */
+static int ll_mknod_raw(struct nameidata *nd, int mode, dev_t rdev)
 {
-        unsigned l = strlen(symname);
-        struct inode *inode;
-        struct ll_inode_info *lli;
-        int err = 0;
+        struct ptlrpc_request *request = NULL;
+        struct inode *dir = nd->dentry->d_inode;
+        const char *name = nd->last.name;
+        int len = nd->last.len;
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
+        struct mdc_op_data op_data;
+        int err = -EMLINK;
         ENTRY;
 
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               symname, l, S_IFLNK | S_IRWXUGO, 0,
-                               dentry->d_it, NULL);
-        if (IS_ERR(inode))
-                RETURN(PTR_ERR(inode));
-
-        lli = ll_i2info(inode);
-
-        OBD_ALLOC(lli->lli_symlink_name, l + 1);
-        /* this _could_ be a non-fatal error, since the symlink is already
-         * stored on the MDS by this point, and we can re-get it in readlink.
-         */
-        if (!lli->lli_symlink_name)
-                RETURN(-ENOMEM);
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
+               name, dir->i_ino, dir->i_generation, dir);
 
-        memcpy(lli->lli_symlink_name, symname, l + 1);
-        inode->i_size = l;
-
-        /* no directory data updates when intents rule */
-        if (dentry->d_it && dentry->d_it->it_disposition)
-                d_instantiate(dentry, inode);
-        else
-                err = ext2_add_nondir(dentry, inode);
+        /* err still holds -EMLINK at this point */
+        if (dir->i_nlink >= EXT3_LINK_MAX)
+                RETURN(err);
 
+        /* clear the permission bits masked out by the caller's umask */
+        mode &= ~current->fs->umask;
+
+        switch (mode & S_IFMT) {
+        case 0:
+        case S_IFREG:
+                mode |= S_IFREG; /* for mode = 0 case, fallthrough */
+        case S_IFCHR:
+        case S_IFBLK:
+        case S_IFIFO:
+        case S_IFSOCK:
+                ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
+                err = md_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode,
+                                current->fsuid, current->fsgid, rdev, &request);
+                /* nothing in the reply is used here; drop it regardless of
+                 * whether md_create() succeeded */
+                ptlrpc_req_finished(request);
+                break;
+        case S_IFDIR:
+                err = -EPERM;
+                break;
+        default:
+                err = -EINVAL;
+        }
         RETURN(err);
 }
 
-static int ll_link(struct dentry *old_dentry, struct inode * dir,
-                   struct dentry *dentry)
+/*
+ * Create a regular or special file named by @child in @dir via md_create(),
+ * build an inode from the reply with ll_prep_inode() and attach it to @child.
+ *
+ * Returns -EMLINK if the directory link count has reached EXT3_LINK_MAX,
+ * -EPERM for S_IFDIR, -EINVAL for an unrecognised file type, the error from
+ * md_create() or ll_prep_inode() if either fails, and 0 on success.
+ */
+static int ll_mknod(struct inode *dir, struct dentry *child, int mode,
+                    ll_dev_t rdev)
 {
-        int err;
-        struct inode *inode = old_dentry->d_inode;
-
-#warning FIXME: still needs intent support
-        if (S_ISDIR(inode->i_mode))
-                return -EPERM;
+        struct ptlrpc_request *request = NULL;
+        struct inode *inode = NULL;
+        const char *name = child->d_name.name;
+        int len = child->d_name.len;
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
+        struct mdc_op_data op_data;
+        int err = -EMLINK;
+        ENTRY;
 
-        if (inode->i_nlink >= EXT2_LINK_MAX)
-                return -EMLINK;
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
+               name, dir->i_ino, dir->i_generation, dir);
 
-        err = ll_mdc_link(old_dentry, dir,
-                          dentry->d_name.name, dentry->d_name.len);
-        if (err)
+        /* err still holds -EMLINK at this point */
+        if (dir->i_nlink >= EXT3_LINK_MAX)
                 RETURN(err);
 
-        inode->i_ctime = CURRENT_TIME;
-        ext2_inc_count(inode);
-        atomic_inc(&inode->i_count);
+        /* clear the permission bits masked out by the caller's umask */
+        mode &= ~current->fs->umask;
+
+        switch (mode & S_IFMT) {
+        case 0:
+        case S_IFREG:
+                mode |= S_IFREG; /* for mode = 0 case, fallthrough */
+        case S_IFCHR:
+        case S_IFBLK:
+        case S_IFIFO:
+        case S_IFSOCK:
+                ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
+                err = md_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode,
+                                current->fsuid, current->fsgid, rdev, &request);
+                /* Do not parse the reply of a failed create: the request
+                 * may be NULL, and the create error must not be replaced
+                 * by the result of ll_prep_inode(). */
+                if (err)
+                        GOTO(out_err, err);
+
+                err = ll_prep_inode(sbi->ll_osc_exp, sbi->ll_mdc_exp,
+                                    &inode, request, 0, child->d_sb);
+                if (err)
+                        GOTO(out_err, err);
+                break;
+        case S_IFDIR:
+                RETURN(-EPERM);
+        default:
+                RETURN(-EINVAL);
+        }
 
-        return ext2_add_nondir(dentry, inode);
+        d_instantiate(child, inode);
+ out_err:
+        /* the success path falls through here too, with err == 0 */
+        ptlrpc_req_finished(request);
+        RETURN(err);
 }
 
-static int ll_mkdir(struct inode * dir, struct dentry * dentry, int mode)
+/*
+ * Create a symbolic link with target string @tgt in the directory
+ * nd->dentry->d_inode, using the name held in nd->last, by issuing
+ * md_create() directly.  No dentry is instantiated.
+ *
+ * Returns -EMLINK if the directory link count has reached EXT3_LINK_MAX,
+ * otherwise the result of md_create().
+ */
+static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
 {
-        struct inode * inode;
+        struct inode *dir = nd->dentry->d_inode;
+        const char *name = nd->last.name;
+        int len = nd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
+        struct mdc_op_data op_data;
         int err = -EMLINK;
         ENTRY;
 
-        if (dir->i_nlink >= EXT2_LINK_MAX)
-                goto out;
-
-        ext2_inc_count(dir);
-
-        inode = ll_create_node(dir, dentry->d_name.name, dentry->d_name.len,
-                               NULL, 0, S_IFDIR | mode, 0, dentry->d_it, NULL);
-        err = PTR_ERR(inode);
-        if (IS_ERR(inode))
-                goto out_dir;
-
-        ext2_inc_count(inode);
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p),target=%s\n",
+               name, dir->i_ino, dir->i_generation, dir, tgt);
 
-        err = ext2_make_empty(inode, dir);
-        if (err)
-                goto out_fail;
-
-        /* no directory data updates when intents rule */
-        if (dentry->d_it->it_disposition == 0) {
-                err = ll_add_link(dentry, inode);
-                if (err)
-                        goto out_fail;
-        }
-
-        d_instantiate(dentry, inode);
-out:
-        EXIT;
-        return err;
+        /* err still holds -EMLINK at this point */
+        if (dir->i_nlink >= EXT3_LINK_MAX)
+                RETURN(err);
 
-out_fail:
-        ext2_dec_count(inode);
-        ext2_dec_count(inode);
-        iput(inode);
-        EXIT;
-out_dir:
-        ext2_dec_count(dir);
-        EXIT;
-        goto out;
+        ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
+        /* the target is passed as the create data; the length includes the
+         * terminating NUL (strlen(tgt) + 1) */
+        err = md_create(sbi->ll_mdc_exp, &op_data,
+                        tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
+                        current->fsuid, current->fsgid, 0, &request);
+        ptlrpc_req_finished(request);
+        RETURN(err);
 }
 
-static int ll_common_unlink(struct inode *dir, struct dentry *dentry,
-                            __u32 mode)
+/*
+ * Create a hard link to the inode srcnd->dentry->d_inode in the directory
+ * tgtnd->dentry->d_inode, under the name held in tgtnd->last, by issuing
+ * md_link() directly.  Returns the result of md_link().
+ */
+static int ll_link_raw(struct nameidata *srcnd, struct nameidata *tgtnd)
 {
-        struct inode * inode = dentry->d_inode;
-        struct ext2_dir_entry_2 * de;
-        struct page * page;
-        int err = -ENOENT;
-
-        if (dentry->d_it && dentry->d_it->it_disposition) {
-                err = dentry->d_it->it_status;
-                GOTO(out, err);
-        }
-
-        de = ext2_find_entry(dir, dentry, &page);
-        if (!de)
-                goto out;
+        struct inode *src = srcnd->dentry->d_inode;
+        struct inode *dir = tgtnd->dentry->d_inode;
+        const char *name = tgtnd->last.name;
+        int len = tgtnd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct mdc_op_data op_data;
+        int err;
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
 
-        err = ll_mdc_unlink(dir, dentry->d_inode, mode,
-                            dentry->d_name.name, dentry->d_name.len);
-        if (err)
-                goto out;
+        ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),dir=%lu/%u(%p),target=%s\n",
+               src->i_ino, src->i_generation, src,
+               dir->i_ino, dir->i_generation, dir, name);
 
-        err = ext2_delete_entry(de, page);
-        if (err)
-                goto out;
+        ll_prepare_mdc_op_data(&op_data, src, dir, name, len, 0);
+        err = md_link(sbi->ll_mdc_exp, &op_data, &request);
+        /* nothing in the reply is used here; drop it on success and failure */
+        ptlrpc_req_finished(request);
 
-        inode->i_ctime = dir->i_ctime;
-out:
-        ext2_dec_count(inode);
-        return err;
+        RETURN(err);
 }
 
-static int ll_unlink(struct inode *dir, struct dentry *dentry)
+
+/*
+ * Create a directory in nd->dentry->d_inode using the name held in nd->last,
+ * by issuing md_create() directly.  Returns the result of md_create().
+ */
+static int ll_mkdir_raw(struct nameidata *nd, int mode)
 {
-        return ll_common_unlink(dir, dentry, S_IFREG);
+        struct inode *dir = nd->dentry->d_inode;
+        const char *name = nd->last.name;
+        int len = nd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct ll_sb_info *sbi = ll_i2sbi(dir);
+        struct mdc_op_data op_data;
+        /* NOTE(review): unlike ll_mknod_raw()/ll_symlink_raw() there is no
+         * i_nlink check here, so this -EMLINK initialiser is always
+         * overwritten by md_create() below. */
+        int err = -EMLINK;
+        ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
+               name, dir->i_ino, dir->i_generation, dir);
+
+        /* keep only rwx and sticky bits, apply the umask, force S_IFDIR */
+        mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
+        ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
+        err = md_create(sbi->ll_mdc_exp, &op_data, NULL, 0, mode,
+                        current->fsuid, current->fsgid, 0, &request);
+        ptlrpc_req_finished(request);
+        RETURN(err);
 }
 
-static int ll_rmdir(struct inode *dir, struct dentry *dentry)
+/*
+ * Remove the directory named by nd->last from nd->dentry->d_inode by issuing
+ * md_unlink() directly.  Returns the result of md_unlink().
+ */
+static int ll_rmdir_raw(struct nameidata *nd)
 {
-        struct inode * inode = dentry->d_inode;
-        int err = 0;
+        struct inode *dir = nd->dentry->d_inode;
+        const char *name = nd->last.name;
+        int len = nd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct mdc_op_data op_data;
+        int rc;
         ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
+               name, dir->i_ino, dir->i_generation, dir);
 
-        if (!dentry->d_it || dentry->d_it->it_disposition == 0) {
-                if (!ext2_empty_dir(inode))
-                        LBUG();
-
-                err = ll_common_unlink(dir, dentry, S_IFDIR);
-        } else
-                err = dentry->d_it->it_status;
-        if (err)
-                RETURN(err);
-        inode->i_size = 0;
-        ext2_dec_count(inode);
-        ext2_dec_count(dir);
-        RETURN(err);
+        /* S_IFDIR is the last argument here, where ll_unlink_raw() passes 0 */
+        ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, S_IFDIR);
+        rc = md_unlink(ll_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
+        ptlrpc_req_finished(request);
+        RETURN(rc);
 }
 
-static int ll_rename(struct inode * old_dir, struct dentry * old_dentry,
-                     struct inode * new_dir, struct dentry * new_dentry)
+/*
+ * Destroy the OST objects described by the EA carried in an MDS reply.
+ *
+ * @request: completed MDS request whose reply is inspected
+ * @dir:     inode used only to find the OBD export via ll_i2obdexp()
+ *
+ * Returns 0 if the reply body does not have OBD_MD_FLEASIZE set (nothing to
+ * destroy), -EPROTO if the EA is announced but empty or cannot be fetched
+ * from the reply, the negative result of obd_unpackmd() on failure, -ENOMEM
+ * if no obdo can be allocated, otherwise the result of obd_destroy().
+ */
+int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir)
 {
-        struct inode * old_inode = old_dentry->d_inode;
-        struct inode * new_inode = new_dentry->d_inode;
-        struct page * dir_page = NULL;
-        struct ext2_dir_entry_2 * dir_de = NULL;
-        struct ext2_dir_entry_2 * old_de;
-        struct page * old_page;
-        int err = -ENOENT;
-
-        if (new_dentry->d_it && new_dentry->d_it->it_disposition) { 
-               if (new_inode) {
-                       new_inode->i_ctime = CURRENT_TIME;
-                       new_inode->i_nlink--;
-               }
-                GOTO(out, err = new_dentry->d_it->it_status);
-        }
+        struct mds_body *body;
+        struct lov_mds_md *eadata;
+        struct lov_stripe_md *lsm = NULL;
+        struct obd_trans_info oti = { 0 };
+        struct obdo *oa;
+        int rc;
+        ENTRY;
 
-        err = ll_mdc_rename(old_dir, new_dir, old_dentry, new_dentry);
-        if (err)
-                goto out;
+        /* req is swabbed so this is safe */
+        body = lustre_msg_buf(request->rq_repmsg, 0, sizeof(*body));
 
-        old_de = ext2_find_entry (old_dir, old_dentry, &old_page);
-        if (!old_de)
-                goto out;
+        if (!(body->valid & OBD_MD_FLEASIZE))
+                RETURN(0);
 
-        if (S_ISDIR(old_inode->i_mode)) {
-                err = -EIO;
-                dir_de = ext2_dotdot(old_inode, &dir_page);
-                if (!dir_de)
-                        goto out_old;
+        if (body->eadatasize == 0) {
+                CERROR("OBD_MD_FLEASIZE set but eadatasize zero\n");
+                GOTO(out, rc = -EPROTO);
         }
 
-        if (new_inode) {
-                struct page *new_page;
-                struct ext2_dir_entry_2 *new_de;
-
-                err = -ENOTEMPTY;
-                if (dir_de && !ext2_empty_dir (new_inode))
-                        goto out_dir;
-
-                err = -ENOENT;
-                new_de = ext2_find_entry (new_dir, new_dentry, &new_page);
-                if (!new_de)
-                        goto out_dir;
-                ext2_inc_count(old_inode);
-                ext2_set_link(new_dir, new_de, new_page, old_inode);
-                new_inode->i_ctime = CURRENT_TIME;
-                if (dir_de)
-                        new_inode->i_nlink--;
-                ext2_dec_count(new_inode);
-        } else {
-                if (dir_de) {
-                        err = -EMLINK;
-                        if (new_dir->i_nlink >= EXT2_LINK_MAX)
-                                goto out_dir;
-                }
-                ext2_inc_count(old_inode);
-                err = ll_add_link(new_dentry, old_inode);
-                if (err) {
-                        ext2_dec_count(old_inode);
-                        goto out_dir;
+        /* The MDS sent back the EA because we unlinked the last reference
+         * to this file. Use this EA to unlink the objects on the OST.
+         * It's opaque so we don't swab here; we leave it to obd_unpackmd() to
+         * check it is complete and sensible. */
+        eadata = lustre_swab_repbuf(request, 1, body->eadatasize, NULL);
+        /* A missing EA buffer is reported as a protocol error rather than
+         * asserted on, so that a bad reply cannot take the client down. */
+        if (eadata == NULL) {
+                CERROR("Can't unpack MDS EA data\n");
+                GOTO(out, rc = -EPROTO);
+        }
+
+        rc = obd_unpackmd(ll_i2obdexp(dir), &lsm, eadata, body->eadatasize);
+        if (rc < 0) {
+                CERROR("obd_unpackmd: %d\n", rc);
+                GOTO(out, rc);
+        }
+        LASSERT(rc >= sizeof(*lsm));
+
+        oa = obdo_alloc();
+        if (oa == NULL)
+                GOTO(out_free_memmd, rc = -ENOMEM);
+
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_gr = lsm->lsm_object_gr;
+        oa->o_mode = body->mode & S_IFMT;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
+
+        if (body->valid & OBD_MD_FLCOOKIE) {
+                oa->o_valid |= OBD_MD_FLCOOKIE;
+                oti.oti_logcookies =
+                        lustre_msg_buf(request->rq_repmsg, 2,
+                                       sizeof(struct llog_cookie) *
+                                       lsm->lsm_stripe_count);
+                /* no cookie buffer in the reply: carry on without cookies */
+                if (oti.oti_logcookies == NULL) {
+                        oa->o_valid &= ~OBD_MD_FLCOOKIE;
+                        body->valid &= ~OBD_MD_FLCOOKIE;
                 }
-                if (dir_de)
-                        ext2_inc_count(new_dir);
         }
 
-        ext2_delete_entry (old_de, old_page);
-        ext2_dec_count(old_inode);
+        rc = obd_destroy(ll_i2obdexp(dir), oa, lsm, &oti);
+        obdo_free(oa);
+        if (rc)
+                CERROR("obd destroy objid "LPX64" error %d\n",
+                       lsm->lsm_object_id, rc);
+ out_free_memmd:
+        obd_free_memmd(ll_i2obdexp(dir), &lsm);
+ out:
+        /* pair the ENTRY above with RETURN, like the other exits */
+        RETURN(rc);
+}
 
-        if (dir_de) {
-                ext2_set_link(old_inode, dir_de, dir_page, new_dir);
-                ext2_dec_count(old_dir);
-        }
-        return 0;
+/*
+ * Unlink the name held in nd->last from the directory nd->dentry->d_inode by
+ * issuing md_unlink() directly, then hand the reply to ll_objects_destroy().
+ * Returns the md_unlink() error if it fails, otherwise the result of
+ * ll_objects_destroy().
+ */
+static int ll_unlink_raw(struct nameidata *nd)
+{
+        struct inode *dir = nd->dentry->d_inode;
+        const char *name = nd->last.name;
+        int len = nd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct mdc_op_data op_data;
+        int rc;
+        ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:name=%s,dir=%lu/%u(%p)\n",
+               name, dir->i_ino, dir->i_generation, dir);
+
+        ll_prepare_mdc_op_data(&op_data, dir, NULL, name, len, 0);
+        rc = md_unlink(ll_i2sbi(dir)->ll_mdc_exp, &op_data, &request);
+        if (rc)
+                GOTO(out, rc);
 
-out_dir:
-        if (dir_de) {
-                kunmap(dir_page);
-                page_cache_release(dir_page);
+        /* the reply is only inspected when the unlink itself succeeded */
+        rc = ll_objects_destroy(request, dir);
+ out:
+        ptlrpc_req_finished(request);
+        RETURN(rc);
+}
+
+/*
+ * Rename the name held in oldnd->last (in the inode oldnd->dentry->d_inode)
+ * to the name held in newnd->last (in newnd->dentry->d_inode) by issuing
+ * md_rename() directly.  On success the reply is handed to
+ * ll_objects_destroy().  Returns the md_rename() error if it fails,
+ * otherwise the result of ll_objects_destroy().
+ */
+static int ll_rename_raw(struct nameidata *oldnd, struct nameidata *newnd)
+{
+        /* src and tgt are logged below as src_dir and tgt_dir */
+        struct inode *src = oldnd->dentry->d_inode;
+        struct inode *tgt = newnd->dentry->d_inode;
+        const char *oldname = oldnd->last.name;
+        int oldlen  = oldnd->last.len;
+        const char *newname = newnd->last.name;
+        int newlen  = newnd->last.len;
+        struct ptlrpc_request *request = NULL;
+        struct ll_sb_info *sbi = ll_i2sbi(src);
+        struct mdc_op_data op_data;
+        int err;
+        ENTRY;
+        CDEBUG(D_VFSTRACE, "VFS Op:oldname=%s,src_dir=%lu/%u(%p),newname=%s,"
+               "tgt_dir=%lu/%u(%p)\n", oldname, src->i_ino, src->i_generation,
+               src, newname, tgt->i_ino, tgt->i_generation, tgt);
+
+        /* the names are passed to md_rename() itself, not through op_data */
+        ll_prepare_mdc_op_data(&op_data, src, tgt, NULL, 0, 0);
+        err = md_rename(sbi->ll_mdc_exp, &op_data,
+                        oldname, oldlen, newname, newlen, &request);
+        if (!err) {
+                err = ll_objects_destroy(request, src);
         }
-out_old:
-        kunmap(old_page);
-        page_cache_release(old_page);
-out:
-        return err;
+
+        ptlrpc_req_finished(request);
+
+        RETURN(err);
 }
 
+/*
+ * Inode operations table built from the handlers defined in this file.  The
+ * *_raw methods take nameidata arguments rather than an instantiated dentry;
+ * the trailing #if selects the intent-based entry points for kernels older
+ * than 2.5.0 and the nameidata-based ones otherwise.
+ */
 struct inode_operations ll_dir_inode_operations = {
-        create:         ll_create,
-        lookup2:        ll_lookup2,
-        link:           ll_link,
-        unlink:         ll_unlink,
-        symlink:        ll_symlink,
-        mkdir:          ll_mkdir,
-        rmdir:          ll_rmdir,
-        mknod:          ll_mknod,
-        rename:         ll_rename,
-        setattr:        ll_setattr
+        .link_raw           = ll_link_raw,
+        .unlink_raw         = ll_unlink_raw,
+        .symlink_raw        = ll_symlink_raw,
+        .mkdir_raw          = ll_mkdir_raw,
+        .rmdir_raw          = ll_rmdir_raw,
+        .mknod_raw          = ll_mknod_raw,
+        .mknod              = ll_mknod,
+        .rename_raw         = ll_rename_raw,
+        .setattr            = ll_setattr,
+        .setattr_raw        = ll_setattr_raw,
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+        .create_it          = ll_create_it,
+        .lookup_it          = ll_lookup_it,
+        .revalidate_it      = ll_inode_revalidate_it,
+#else
+        .lookup             = ll_lookup_nd,
+        .create             = ll_create_nd,
+        .getattr_it         = ll_getattr,
+#endif
 };