Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / llite / file.c
index 87c9012..6b37d99 100644 (file)
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 extern int ll_setattr(struct dentry *de, struct iattr *attr);
 
-int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid, gid_t gid,
-                      struct lov_stripe_md **lsmp)
+static int ll_mdc_open(struct lustre_handle *mdc_conn, struct inode *inode,
+                       struct file *file, struct lov_mds_md *lmm, int lmm_size)
 {       /* Open @file on the MDS and attach the per-open state to it.
+         *
+         * Allocates a struct ll_file_data from ll_file_data_slab, seeds
+         * fd_mdshandle with the file pointer and a random cookie, and calls
+         * mdc_open() with the inode number, mode, f_flags and @lmm/@lmm_size
+         * (passed straight through).  On success the request is saved in
+         * fd->fd_req, file->private_data is set to the new fd and
+         * O_LOV_DELAY_CREATE is cleared from file->f_flags.  If
+         * fd_mdshandle.addr is still zero or still the file pointer after a
+         * successful mdc_open(), an error is logged but the open proceeds.
+         *
+         * Returns 0 on success, -ENOMEM if the allocation fails, or
+         * -abs(rc) of mdc_open(); on failure fd is freed and
+         * file->private_data is not assigned.  file->private_data must be
+         * NULL on entry (asserted below).
+         */
+        struct ptlrpc_request *req = NULL;
+        struct ll_file_data *fd;
+        int rc;
+        ENTRY;
+
+        LASSERT(!file->private_data);
+
+        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
+        if (!fd)
+                RETURN(-ENOMEM);
+
+        memset(fd, 0, sizeof(*fd));
+        fd->fd_mdshandle.addr = (__u64)(unsigned long)file;
+        get_random_bytes(&fd->fd_mdshandle.cookie,
+                         sizeof(fd->fd_mdshandle.cookie));
+
+        rc = mdc_open(mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
+                      file->f_flags, lmm, lmm_size, &fd->fd_mdshandle, &req);
+
+        /* This is the "reply" refcount.  It is dropped before rc is checked,
+         * yet req is stored in fd->fd_req below, so another reference is
+         * presumably still held on the request -- TODO confirm.
+         * NOTE(review): the ll_file_open() code this replaces dropped a
+         * second reference when mdc_open() failed; verify that the failure
+         * path here does not leak the request.
+         */
+        ptlrpc_req_finished(req);
+
+        if (rc)
+                GOTO(out_fd, rc);
+
+        fd->fd_req = req;
+        file->private_data = fd;
+
+        if (!fd->fd_mdshandle.addr ||
+            fd->fd_mdshandle.addr == (__u64)(unsigned long)file) {
+                CERROR("hmm, mdc_open didn't assign fd_mdshandle?\n");
+                /* XXX handle this how, abort or is it non-fatal? */
+        }
+
+        file->f_flags &= ~O_LOV_DELAY_CREATE;
+        RETURN(0);
+
+out_fd: /* mdc_open() failed: poison the handle cookie, then free fd */
+        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
+        kmem_cache_free(ll_file_data_slab, fd);
+
+        return -abs(rc);
+}
+
+static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
+                        struct file *file)
+{       /* Close @file on the MDS and tear down its per-open state.
+         *
+         * Calls mdc_close() for the handle kept in file->private_data, logs
+         * a failure, and releases the close request.  The saved open
+         * request (fd->fd_req) is then either left for normal replay or
+         * dropped, depending on whether it has a transno (see below).
+         * Finally the handle cookie is poisoned, file->private_data is
+         * cleared and fd is freed; this teardown runs even when mdc_close()
+         * fails.
+         *
+         * Returns -abs(rc) of mdc_close(), i.e. 0 on success.
+         *
+         * file->private_data and its fd_req are dereferenced without a NULL
+         * check, so this may only be called after a successful open.
+         * NOTE(review): @mdc_conn is not used; the connection is re-derived
+         * from ll_i2sbi(inode)->ll_mdc_conn -- confirm this is intended.
+         */
+        struct ll_file_data *fd = file->private_data;
+        struct ptlrpc_request *req = NULL;
+        unsigned long flags;
+        struct obd_import *imp = fd->fd_req->rq_import;
+        int rc;
+
+        /* Complete the open request and remove it from replay list */
+        DEBUG_REQ(D_HA, fd->fd_req, "matched open req %p", fd->fd_req);
+        rc = mdc_close(&ll_i2sbi(inode)->ll_mdc_conn, inode->i_ino,
+                       inode->i_mode, &fd->fd_mdshandle, &req);
+
+        if (rc)
+                CERROR("inode %lu close failed: rc = %d\n", inode->i_ino, rc);
+        ptlrpc_req_finished(req);
+
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (fd->fd_req->rq_transno) {
+                /* This caused an EA to be written, need to replay as a normal
+                 * transaction now.  Our reference is now effectively owned
+                 * by the imp_replay_list, and we'll be committed just like
+                 * other transno-having requests now.
+                 */
+                fd->fd_req->rq_flags &= ~PTL_RPC_FL_REPLAY;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+        } else {
+                /* No transno means that we can just drop our ref. */
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                ptlrpc_req_finished(fd->fd_req);
+        }
+        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
+        file->private_data = NULL;
+        kmem_cache_free(ll_file_data_slab, fd);
+
+        return -abs(rc);
+}
+
+static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
+                       struct file *file, struct lov_stripe_md *lsm)
+{
+        struct ll_file_data *fd;
         struct obdo *oa;
         int rc;
         ENTRY;
@@ -43,231 +130,245 @@ int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid, gid_t gid,
         oa = obdo_alloc();
         if (!oa)
                 RETURN(-ENOMEM);
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_mode = S_IFREG;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
+                OBD_MD_FLBLOCKS;
+        rc = obd_open(conn, oa, lsm);
+        if (rc)
+                GOTO(out, rc);
 
-        oa->o_mode = S_IFREG | 0600;
-        oa->o_id = id;
-        oa->o_uid = uid;
-        oa->o_gid = gid;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                OBD_MD_FLUID | OBD_MD_FLGID;
-        rc = obd_create(ll_s2obdconn(sb), oa, lsmp);
-        obdo_free(oa);
+        obdo_to_inode(inode, oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
-        if (!rc)
-                LASSERT(*lsmp && (*lsmp)->lsm_object_id);
+        fd = file->private_data;
+        obd_oa2handle(&fd->fd_osthandle, oa);
+
+        atomic_inc(&ll_i2info(inode)->lli_open_count);
+out:
+        obdo_free(oa);
         RETURN(rc);
 }
 
-static int ll_file_open(struct inode *inode, struct file *file)
+/* Caller must hold lli_open_sem to keep lli->lli_smd from changing and to
+ * prevent duplicate objects from being created.  We only install lsm in
+ * lli_smd if the mdc open succeeded (and hence stored the stripe MD on the
+ * MDS); otherwise other nodes could try to create different objects for the
+ * same file.
+ */
+static int ll_create_open_obj(struct lustre_handle *conn, struct inode *inode,
+                              struct file *file, struct lov_stripe_md *lsm)
 {
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lustre_handle *conn = ll_i2obdconn(inode);
-        struct ptlrpc_request *req = NULL;
-        struct ll_file_data *fd;
-        struct obdo *oa;
-        struct lov_stripe_md *lsm;
         struct lov_mds_md *lmm = NULL;
         int lmm_size = 0;
-        int rc = 0;
+        struct obdo *oa;
+        int rc, err;
         ENTRY;
 
-        LASSERT(!file->private_data);
-
-        lsm = lli->lli_smd;
+        oa = obdo_alloc();
+        if (!oa)
+                RETURN(-ENOMEM);
 
-        /*  delayed create of object (intent created inode) */
-        /*  XXX object needs to be cleaned up if mdc_open fails */
-        /*  XXX error handling appropriate here? */
-        if (lsm == NULL) {
-                if (file->f_flags & O_LOV_DELAY_CREATE) {
-                        CDEBUG(D_INODE, "delaying object creation\n");
-                        RETURN(0);
-                }
-                down(&lli->lli_open_sem);
-                /* Check to see if we lost the race */
-                if (!lli->lli_smd)
-                        rc = ll_create_objects(inode->i_sb, inode->i_ino, 0, 0,
-                                               &lli->lli_smd);
-                up(&lli->lli_open_sem);
-                if (rc)
-                        RETURN(rc);
+        oa->o_mode = S_IFREG | 0600;
+        oa->o_id = inode->i_ino;
+        /* Keep these 0 for now, because chown/chgrp does not change the
+         * ownership on the OST, and we don't want to allow BA OST NFS
+         * users to access these objects by mistake.
+         */
+        oa->o_uid = 0;
+        oa->o_gid = 0;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
+                OBD_MD_FLUID | OBD_MD_FLGID;
 
-                lsm = lli->lli_smd;
+        rc = obd_create(conn, oa, &lsm);
+        if (rc) {
+                CERROR("error creating objects for inode %lu: rc = %d\n",
+                       inode->i_ino, rc);
+                GOTO(out_oa, rc);
         }
 
-        /* XXX We should only send this to MDS if we just created these
-         *     objects, except we also need to handle the user-stripe case.
-         */
-        rc = obd_packmd(conn, &lmm, lli->lli_smd);
+        LASSERT(lsm && lsm->lsm_object_id);
+        rc = obd_packmd(conn, &lmm, lsm);
         if (rc < 0)
-                GOTO(out, rc);
+                GOTO(out_destroy, rc);
 
         lmm_size = rc;
 
-        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
-        if (!fd) {
-                if (lmm)
-                        obd_free_wiremd(conn, &lmm);
-                GOTO(out, rc = -ENOMEM);
-        }
-        memset(fd, 0, sizeof(*fd));
+        rc = ll_mdc_open(&ll_i2sbi(inode)->ll_mdc_conn,inode,file,lmm,lmm_size);
 
-        fd->fd_mdshandle.addr = (__u64)(unsigned long)file;
-        get_random_bytes(&fd->fd_mdshandle.cookie,
-                         sizeof(fd->fd_mdshandle.cookie));
-        rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
-                      file->f_flags, lmm, lmm_size, &fd->fd_mdshandle, &req);
-        if (lmm)
-                obd_free_wiremd(conn, &lmm);
-        fd->fd_req = req;
+        obd_free_wiremd(conn, &lmm);
 
-        /* This is the "reply" refcount. */
-        ptlrpc_req_finished(req);
-        if (rc)
-                GOTO(out_req, -abs(rc));
-        if (!fd->fd_mdshandle.addr ||
-            fd->fd_mdshandle.addr == (__u64)(unsigned long)file) {
-                CERROR("hmm, mdc_open didn't assign fd_mdshandle?\n");
-                /* XXX handle this how, abort or is it non-fatal? */
+        /* If we couldn't complete mdc_open() and store the stripe MD on the
+         * MDS, we need to destroy the objects now or they will be leaked.
+         */
+        if (rc) {
+                CERROR("error MDS opening %lu with delayed create: rc %d\n",
+                       inode->i_ino, rc);
+                GOTO(out_destroy, rc);
         }
+        lli->lli_smd = lsm;
 
-        oa = obdo_alloc();
-        if (!oa)
-                GOTO(out_mdc, rc = -EINVAL);
+        EXIT;
+out_oa:
+        obdo_free(oa);
+        return rc;
 
+out_destroy:
+        obdo_from_inode(oa, inode, OBD_MD_FLTYPE);
         oa->o_id = lsm->lsm_object_id;
-        oa->o_mode = S_IFREG;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS;
-        rc = obd_open(ll_i2obdconn(inode), oa, lsm);
-        obdo_to_inode(inode, oa, oa->o_valid & (OBD_MD_FLSIZE|OBD_MD_FLBLOCKS));
+        oa->o_valid |= OBD_MD_FLID;
+        err = obd_destroy(conn, oa, lsm);
+        obd_free_memmd(conn, &lsm);
+        if (err)
+                CERROR("error uncreating inode %lu objects: rc %d\n",
+                       inode->i_ino, err);
+        goto out_oa;
+}
 
-        obd_oa2handle(&fd->fd_osthandle, oa);
-        obdo_free(oa);
+/* Open a file, and (for the very first open) create objects on the OSTs at
+ * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
+ * creation or open until ll_lov_setstripe() ioctl is called.  We grab
+ * lli_open_sem to ensure no other process will create objects, send the
+ * stripe MD to the MDS, or try to destroy the objects if that fails.
+ *
+ * If we already have the stripe MD locally, we pass lmm_size = 0 to
+ * mdc_open() so that it is not requested.
+ *
+ * It is up to the application to ensure no other processes open this file
+ * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
+ * used.  We might be able to avoid races of that sort by getting lli_open_sem
+ * before returning in the O_LOV_DELAY_CREATE case and dropping it here
+ * or in ll_file_release(), but I'm not sure that is desirable/necessary.
+ */
+static int ll_file_open(struct inode *inode, struct file *file)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lustre_handle *conn = ll_i2obdconn(inode);
+        struct lov_stripe_md *lsm;
+        int rc = 0;
+        ENTRY;
 
-        if (rc)
-                GOTO(out_mdc, rc = -abs(rc));
+        lsm = lli->lli_smd;
+        if (lsm == NULL) {
+                if (file->f_flags & O_LOV_DELAY_CREATE) {
+                        CDEBUG(D_INODE, "delaying object creation\n");
+                        RETURN(0);
+                }
 
-        atomic_inc(&lli->lli_open_count);
+                down(&lli->lli_open_sem);
+                if (!lli->lli_smd) {
+                        rc = ll_create_open_obj(conn, inode, file, NULL);
+                        up(&lli->lli_open_sem);
+                } else {
+                        CERROR("stripe already set on ino %lu\n", inode->i_ino);
+                        up(&lli->lli_open_sem);
+                        rc = ll_mdc_open(&sbi->ll_mdc_conn, inode, file,NULL,0);
+                }
+                lsm = lli->lli_smd;
+        } else
+                rc = ll_mdc_open(&sbi->ll_mdc_conn, inode, file, NULL, 0);
 
-        file->private_data = fd;
+        if (rc)
+                RETURN(rc);
 
+        rc = ll_osc_open(conn, inode, file, lsm);
+        if (rc)
+                GOTO(out_close, rc);
         RETURN(0);
-out_mdc:
-        mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                  S_IFREG, &fd->fd_mdshandle, &req);
-out_req:
-        ptlrpc_req_finished(req); /* once for an early "commit" */
-//out_fd:
-        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
-        kmem_cache_free(ll_file_data_slab, fd);
-out:
+out_close:
+        ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
         return rc;
 }
 
 int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
-                 int mode, struct lustre_handle **lockhs_p)
+                 int mode, struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
-        struct lustre_handle *lockhs = NULL;
-        int rc, flags = 0, stripe_count;
+        int rc, flags = 0;
         ENTRY;
 
-        if (sbi->ll_flags & LL_SBI_NOLCK) {
-                *lockhs_p = NULL;
+        /* XXX phil: can we do this?  won't it screw the file size up? */
+        if (sbi->ll_flags & LL_SBI_NOLCK)
                 RETURN(0);
-        }
-
-        stripe_count = lsm->lsm_stripe_count;
-        if (!stripe_count)
-                stripe_count = 1;
-
-        OBD_ALLOC(lockhs, stripe_count * sizeof(*lockhs));
-        if (lockhs == NULL)
-                RETURN(-ENOMEM);
 
         extent.start = start;
         extent.end = OBD_OBJECT_EOF;
 
         rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
                          sizeof(extent), mode, &flags, ll_lock_callback,
-                         inode, sizeof(*inode), lockhs);
-        if (rc != ELDLM_OK) {
-                CERROR("lock enqueue: %d\n", rc);
-                OBD_FREE(lockhs, stripe_count * sizeof(*lockhs));
-        } else
-                *lockhs_p = lockhs;
+                         inode, sizeof(*inode), lockh);
         RETURN(rc);
 }
 
 int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
-                   struct lustre_handle *lockhs)
+                   struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        int rc, stripe_count;
+        int rc;
         ENTRY;
 
+        /* XXX phil: can we do this?  won't it screw the file size up? */
         if (sbi->ll_flags & LL_SBI_NOLCK)
                 RETURN(0);
 
-        if (lockhs == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockhs);
+        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
         if (rc != ELDLM_OK) {
                 CERROR("lock cancel: %d\n", rc);
                 LBUG();
         }
 
-        stripe_count = lsm->lsm_stripe_count;
-        if (!stripe_count)
-                stripe_count = 1;
-
-        OBD_FREE(lockhs, stripe_count * sizeof(*lockhs));
         RETURN(rc);
 }
 
 int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs;
+        //struct lustre_handle lockh = { 0, 0 };
         struct obdo oa;
-        int err, rc;
+        //int err;
+        int rc;
         ENTRY;
 
         LASSERT(lsm);
         LASSERT(sbi);
 
-        rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockhs);
+        /* XXX do not yet need size lock - OST size always correct (sync write)
+        rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockh);
         if (rc != ELDLM_OK) {
                 CERROR("lock enqueue: %d\n", rc);
                 RETURN(rc);
         }
+        */
 
         memset(&oa, 0, sizeof oa);
         oa.o_id = lsm->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLID|OBD_MD_FLTYPE|OBD_MD_FLSIZE|OBD_MD_FLBLOCKS;
         rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-        if (!rc)
-                obdo_to_inode(inode, &oa,
-                              oa.o_valid & ~(OBD_MD_FLTYPE | OBD_MD_FLMODE));
-
-        err = ll_size_unlock(inode, lsm, LCK_PR, lockhs);
+        if (!rc) {
+                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+                CDEBUG(D_INODE, LPX64" size %Lu/%Lu\n",
+                       lsm->lsm_object_id, inode->i_size, inode->i_size);
+        }
+        /* XXX do not need size lock, because OST size always correct (sync write)
+        err = ll_size_unlock(inode, lsm, LCK_PR, &lockh);
         if (err != ELDLM_OK) {
                 CERROR("lock cancel: %d\n", err);
-                LBUG();
+                if (!rc)
+                        rc = err;
         }
+        */
         RETURN(rc);
 }
 
+/* While this returns an error code, the caller fput() does not, so we need
+ * to make every effort to clean up all of our state here.  Also, applications
+ * rarely check close errors and even if an error is returned they will not
+ * re-try the close call.
+ */
 static int ll_file_release(struct inode *inode, struct file *file)
 {
-        struct ptlrpc_request *req = NULL;
         struct ll_file_data *fd;
         struct obdo oa;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
@@ -278,93 +379,34 @@ static int ll_file_release(struct inode *inode, struct file *file)
         ENTRY;
 
         fd = (struct ll_file_data *)file->private_data;
-        if (!fd) {
-                LASSERT(file->f_flags & O_LOV_DELAY_CREATE);
-                GOTO(out, rc = 0);
-        }
+        if (!fd) /* no process opened the file after an mcreate */
+                RETURN(rc = 0);
 
         memset(&oa, 0, sizeof(oa));
         oa.o_id = lsm->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
         obd_handle2oa(&oa, &fd->fd_osthandle);
-        rc = obd_close(ll_i2obdconn(inode), &oa, lsm);
+        rc = obd_close(&sbi->ll_osc_conn, &oa, lsm);
         if (rc)
-                GOTO(out_mdc, rc = -abs(rc));
-
-#if 0
-#error "This should only be done on the node that already has the EOF lock"
-#error "and only in the case where the file size actually changed.  For now"
-#error "we don't care about the size on the MDS, since we never use it (the"
-#error "OST always has the authoritative size and we don't even use the MDS."
-        /* If this fails and we goto out_fd, the file size on the MDS is out of
-         * date.  Is that a big deal? */
-        if (file->f_mode & FMODE_WRITE) {
-                struct lustre_handle *lockhs;
-
-                rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockhs);
-                if (rc)
-                        GOTO(out_mdc, -abs(rc));
-
-                oa.o_id = lsm->lsm_object_id;
-                oa.o_mode = S_IFREG;
-                oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                        OBD_MD_FLBLOCKS;
-                rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-                if (!rc) {
-                        struct iattr attr;
-                        attr.ia_valid = (ATTR_MTIME | ATTR_CTIME | ATTR_ATIME |
-                                         ATTR_SIZE);
-                        attr.ia_mtime = inode->i_mtime;
-                        attr.ia_ctime = inode->i_ctime;
-                        attr.ia_atime = inode->i_atime;
-                        attr.ia_size = oa.o_size;
-
-                        inode->i_blocks = oa.o_blocks;
-
-                        /* XXX: this introduces a small race that we should
-                         * evaluate */
-                        rc = ll_inode_setattr(inode, &attr, 0);
-                }
-                rc2 = ll_size_unlock(inode, lli->lli_smd, LCK_PR, lockhs);
-                if (rc2) {
-                        CERROR("lock cancel: %d\n", rc);
-                        LBUG();
-                        if (!rc)
-                                rc = rc2;
-                }
-        }
-#endif
+                CERROR("inode %lu object close failed: rc = %d\n",
+                       inode->i_ino, rc);
 
-out_mdc:
-        rc2 = mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                        S_IFREG, &fd->fd_mdshandle, &req);
-        ptlrpc_req_finished(req);
-        if (rc2) {
-                if (!rc)
-                        rc = -abs(rc2);
-                GOTO(out_fd, rc);
-        }
-        DEBUG_REQ(D_HA, fd->fd_req, "matched open for this close: ");
-        ptlrpc_req_finished(fd->fd_req);
+        rc2 = ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
+        if (rc2 && !rc)
+                rc = rc2;
 
         if (atomic_dec_and_test(&lli->lli_open_count)) {
                 CDEBUG(D_INFO, "last close, cancelling unused locks\n");
-                rc = obd_cancel_unused(ll_i2obdconn(inode), lsm, 0);
-                if (rc)
+                rc2 = obd_cancel_unused(&sbi->ll_osc_conn, lsm, 0);
+                if (rc2 && !rc) {
+                        rc = rc2;
                         CERROR("obd_cancel_unused: %d\n", rc);
-        } else {
+                }
+        } else
                 CDEBUG(D_INFO, "not last close, not cancelling unused locks\n");
-        }
-
-        EXIT;
 
-out_fd:
-        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
-        file->private_data = NULL;
-        kmem_cache_free(ll_file_data_slab, fd);
-out:
-        return rc;
+        RETURN(rc);
 }
 
 static inline void ll_remove_suid(struct inode *inode)
@@ -401,7 +443,7 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, __u32 data_len, int flag)
 {
         struct inode *inode = data;
-        struct lustre_handle lockh;
+        struct lustre_handle lockh = { 0, 0 };
         int rc;
         ENTRY;
 
@@ -438,7 +480,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         struct ll_file_data *fd = (struct ll_file_data *)filp->private_data;
         struct inode *inode = filp->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs = NULL;
+        struct lustre_handle lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
@@ -449,17 +491,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
          * call us */
         retval = ll_file_size(inode, lsm);
         if (retval < 0) {
-                CERROR("ll_file_size: %d\n", retval);
+                CERROR("ll_file_size: "LPSZ"\n", retval);
                 RETURN(retval);
         }
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
                 struct ldlm_extent extent;
-                OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-                if (!lockhs)
-                        RETURN(-ENOMEM);
-
                 extent.start = *ppos;
                 extent.end = *ppos + count;
                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
@@ -468,15 +506,14 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PR, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  lockhs);
+                                  &lockh);
                 if (err != ELDLM_OK) {
-                        OBD_FREE(lockhs, lsm->lsm_stripe_count*sizeof(*lockhs));
                         CERROR("lock enqueue: err: %d\n", err);
                         RETURN(err);
                 }
         }
 
-        CDEBUG(D_INFO, "Reading inode %lu, %d bytes, offset %Ld\n",
+        CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
         retval = generic_file_read(filp, buf, count, ppos);
 
@@ -485,15 +522,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, lockhs);
+                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock cancel: err: %d\n", err);
                         retval = err;
                 }
         }
 
-        if (lockhs)
-                OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
         RETURN(retval);
 }
 
@@ -506,7 +541,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         struct ll_file_data *fd = (struct ll_file_data *)file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs = NULL, *eof_lockhs = NULL;
+        struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
@@ -520,7 +555,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 if (!oa)
                         RETURN(-ENOMEM);
 
-                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockhs);
+                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
                 if (err) {
                         obdo_free(oa);
                         RETURN(err);
@@ -545,9 +580,6 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
                 struct ldlm_extent extent;
-                OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-                if (!lockhs)
-                        GOTO(out_eof, retval = -ENOMEM);
                 extent.start = *ppos;
                 extent.end = *ppos + count;
                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
@@ -556,35 +588,31 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PW, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  lockhs);
+                                  &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", err);
-                        GOTO(out_free, retval = err);
+                        GOTO(out_eof, retval = err);
                 }
         }
 
-        CDEBUG(D_INFO, "Writing inode %lu, %ld bytes, offset "LPD64"\n",
-               inode->i_ino, (long)count, *ppos);
+        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
+               inode->i_ino, count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) ||
             sbi->ll_flags & LL_SBI_NOLCK) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, lockhs);
+                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock cancel: err: %d\n", err);
-                        GOTO(out_free, retval = err);
+                        GOTO(out_eof, retval = err);
                 }
         }
 
         EXIT;
- out_free:
-        if (lockhs)
-                OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-
  out_eof:
         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_unlock(inode, lsm, LCK_PW, eof_lockhs);
+                err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
                 if (err && !retval)
                         retval = err;
         }
@@ -592,121 +620,54 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         return retval;
 }
 
-/* Retrieve object striping information.
- *
- * @arg is a pointer to a user struct with one or more of the fields set to
- * indicate the application preference: lmm_stripe_count, lmm_stripe_size,
- * lmm_stripe_offset, and lmm_stripe_pattern.  lmm_magic must be LOV_MAGIC.
- */
 static int ll_lov_setstripe(struct inode *inode, struct file *file,
                             unsigned long arg)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_mds_md *lmm = NULL, *lmmu = (void *)arg;
-        struct lustre_handle *conn = ll_i2obdconn(inode);
+        struct lustre_handle *conn;
+        struct lov_stripe_md *lsm;
         int rc;
+        ENTRY;
 
-        rc = obd_alloc_wiremd(conn, &lmm);
-        if (rc < 0)
-                RETURN(rc);
-
-        rc = copy_from_user(lmm, lmmu, sizeof(*lmm));
-        if (rc)
-                GOTO(out_free, rc = -EFAULT);
+        down(&lli->lli_open_sem);
+        lsm = lli->lli_smd;
+        if (lsm) {
+                up(&lli->lli_open_sem);
+                CERROR("stripe already set for ino %lu\n", inode->i_ino);
+                /* If we haven't already done the open, do so now */
+                if (file->f_flags & O_LOV_DELAY_CREATE) {
+                        int rc2 = ll_file_open(inode, file);
+                        if (rc2)
+                                RETURN(rc2);
+                }
 
-        if (lmm->lmm_magic != LOV_MAGIC) {
-                CERROR("bad LOV magic %X\n", lmm->lmm_magic);
-                GOTO(out_free, rc = -EINVAL);
+                RETURN(-EALREADY);
         }
 
-        down(&lli->lli_open_sem);
-        if (lli->lli_smd) {
-                CERROR("striping data already set for %lu\n", inode->i_ino);
-                GOTO(out_lov_up, rc = -EPERM);
-        }
-        rc = obd_unpackmd(conn, &lli->lli_smd, lmm);
-        if (rc < 0) {
-                CERROR("error setting LOV striping on %lu: rc = %d\n",
-                       inode->i_ino, rc);
-                GOTO(out_lov_up, rc);
-        }
+        conn = ll_i2obdconn(inode);
+
+        rc = obd_iocontrol(LL_IOC_LOV_SETSTRIPE, conn, 0, &lsm, (void *)arg);
+        if (!rc)
+                rc = ll_create_open_obj(conn, inode, file, lsm);
+        up(&lli->lli_open_sem);
 
-        rc = ll_create_objects(inode->i_sb, inode->i_ino, 0, 0, &lli->lli_smd);
         if (rc) {
-                obd_free_memmd(conn, &lli->lli_smd);
-        } else {
-                file->f_flags &= ~O_LOV_DELAY_CREATE;
-                rc = ll_file_open(inode, file);
+                obd_free_memmd(conn, &lsm);
+                RETURN(rc);
         }
-out_lov_up:
-        up(&lli->lli_open_sem);
-out_free:
-        obd_free_wiremd(conn, &lmm);
-        return rc;
+        rc = ll_osc_open(conn, inode, file, lli->lli_smd);
+        RETURN(rc);
 }
 
-/* Retrieve object striping information.
- *
- * @arg is a pointer to a user struct with lmm_ost_count indicating
- * the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC.
- */
 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
 {
-        struct lov_mds_md lmm, *lmmu = (void *)arg, *lmmk = NULL;
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct lustre_handle *conn = ll_i2obdconn(inode);
-        int ost_count, rc, lmm_size;
 
         if (!lsm)
                 RETURN(-ENODATA);
 
-        rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
-        if (rc)
-                RETURN(-EFAULT);
-
-        if (lmm.lmm_magic != LOV_MAGIC)
-                RETURN(-EINVAL);
-
-        if (lsm->lsm_stripe_count == 0)
-                ost_count = 1;
-        else {
-                struct obd_device *obd = class_conn2obd(conn);
-                struct lov_obd *lov = &obd->u.lov;
-                ost_count = lov->desc.ld_tgt_count;
-        }
-
-        /* XXX we _could_ check if indices > user lmm_ost_count are zero */
-        if (lmm.lmm_ost_count < ost_count)
-                RETURN(-EOVERFLOW);
-
-        rc = obd_packmd(conn, &lmmk, lsm);
-        if (rc < 0)
-                RETURN(rc);
-
-        lmm_size = rc;
-
-        /* LOV STACKING layering violation to make LOV/OSC return same data */
-        if (lsm->lsm_stripe_count == 0) {
-                struct lov_object_id *loi;
-
-                loi = (void *)lmmu + offsetof(typeof(*lmmu), lmm_objects);
-                rc = copy_to_user(loi, &lsm->lsm_object_id, sizeof(*loi));
-                if (rc) {
-                        lmm_size = 0;
-                        rc = -EFAULT;
-                } else {
-                        lmmk->lmm_magic = LOV_MAGIC;
-                        lmmk->lmm_ost_count = lmmk->lmm_stripe_count = 1;
-                }
-        }
-
-        if (lmm_size && copy_to_user(lmmu, lmmk, lmm_size))
-                rc = -EFAULT;
-
-        obd_free_wiremd(conn, &lmmk);
-
-        RETURN(rc);
+        return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, conn, 0, lsm, (void *)arg);
 }
 
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
@@ -822,7 +783,7 @@ static int ll_inode_revalidate(struct dentry *dentry)
                 rc = mdc_getattr(&sbi->ll_mdc_conn, inode->i_ino,
                                  inode->i_mode, valid, datalen, &req);
                 if (rc) {
-                        CERROR("failure %d inode "LPX64"\n", rc, inode->i_ino);
+                        CERROR("failure %d inode %lu\n", rc, inode->i_ino);
                         ptlrpc_req_finished(req);
                         RETURN(-abs(rc));
                 }