Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
authoradilger <adilger>
Mon, 6 Jan 2003 22:22:15 +0000 (22:22 +0000)
committeradilger <adilger>
Mon, 6 Jan 2003 22:22:15 +0000 (22:22 +0000)
Fixes a _huge_ number of bugs:
  - Fully reactivate OST imports after reconnection (512, others)
  - Make sure client sees our -ENOTCONN from mds_handle (513 - partial)
  - More graceful error handling for truncating on dead OST (515)
  - Don't error out unless we're actually accessing dead stripes (474)
  - Fix garbage sizes when stripes are missing (410)
  - LRU counters were broken, causing constant lock purge (433, 432)
  - garbage on read from stripes with failed OSTs (441)
  - mark OSCs as active before reconnecting during recovery (438)
  - lov_enqueue and lov_cancel need to handle inactive OSTs (403)
  - lfind did not preserve OST order in output (443)
  - symlinks cause hung clients, incorrect data (439)
  - stop dereferencing request after dropping refcount (457)
  - don't LASSERT(spin_is_locked) on non-SMP (455)
  - fixes for many rename() bugs
  - fstat didn't correctly synchronize attributes (399)
  - server must handle lock cancellation during blocking AST prep (487)
  - bulk descriptors were free()d too soon (511)
  - fix paths in lconf, which would load incorrect modules (451, 507)
  - fix confusing lconf 'host not found' error message (386)
  - fix lock order deadlock on OST (O/R i_sem before journal ops, 478)
  - fix race condition in mdc_blocking_ast() for inode access (526)
  - fix lov_unpackmd() unpacking wrong number of stripes (537)
  - fix lov_set_osc_active() marking wrong OSC inactive (440)
  - fix bad lstripe lov_unpackmd() assertion (fix layering too) (527)
  - fix multiple writes of stripe MD to MDS (358, maybe 519)
  - fix lstripe in several ways (kernel side) (527)
  - fix request leak in ldlm_cli_enqueue (262)
  - incorrect OSC was marked inactive after OST failure
  - call mds_fs_cleanup before unmounting filesystem (524)
  - fix races between taking ns_lock and ldlm_lock_change_resource
  - fix races updating LOV export open file list
  - fix lov_enqueue error path, avoid decref-ing bad lock handle
  - fix recovery NULL deref in ldlm_cli_cancel_unused
  - fix some DLM races by using new hash table for lock handles (419)
  - permit the client to specify desired inodes, at replay
  - duplicate requests when we queue them for replay reintegration
  - fix last_rcvd offset calculation
  - sync after each recovered transaction, so we always make progress
  - never, not always, ERESTART requests without transnos
  - store the lov_desc in the MDS, so we don't depend on getlovinfo to set it
  - skip replay if the MDS says that the client is already connected
  - don't check for a recovery-enabled export to match lctl's UUID
  - don't INC_USE_COUNT for phantom exports
  - don't crash when cleaning up phantom exports (567)
  - don't double-finish or set replay data for errored mdc_open requests
  - abort requests when they time out, so we don't get old replies
  - send/receive replies for AST messages again
  - if the client says that it doesn't have the lock, cancel it on the server
  - if we timeout during I/O, don't try to cancel an in-use lock; instead
    mark it as destroyed, it will all work out when decref is called
  - fix module use counts (22, 581)
 * protocol changes
  - ASTs now expect a reply (server cancels lock on error reply)

118 files changed:
lustre/.cvsignore
lustre/ChangeLog
lustre/Makefile.am
lustre/cobd/.cvsignore [new file with mode: 0644]
lustre/cobd/Makefile.am [new file with mode: 0644]
lustre/cobd/cache_obd.c [new file with mode: 0644]
lustre/cobd/lproc_cache.c [new file with mode: 0644]
lustre/conf/.cvsignore [new file with mode: 0644]
lustre/conf/Makefile.am [new file with mode: 0644]
lustre/conf/lustre.dtd [new file with mode: 0644]
lustre/conf/lustre2ldif.xsl [new file with mode: 0644]
lustre/conf/slapd-lustre.conf [new file with mode: 0644]
lustre/conf/top.ldif [new file with mode: 0644]
lustre/configure.in
lustre/extN/Makefile.am
lustre/extN/extN-noread.diff [new file with mode: 0644]
lustre/extN/extN-wantedi.diff [new file with mode: 0644]
lustre/include/linux/lustre_dlm.h
lustre/include/linux/lustre_export.h
lustre/include/linux/lustre_ha.h
lustre/include/linux/lustre_idl.h
lustre/include/linux/lustre_lib.h
lustre/include/linux/lustre_lite.h
lustre/include/linux/lustre_mds.h
lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_cache.h [new file with mode: 0644]
lustre/include/linux/obd_class.h
lustre/include/linux/obd_ost.h
lustre/include/linux/obd_ptlbd.h [new file with mode: 0644]
lustre/include/linux/obd_support.h
lustre/ldlm/l_lock.c
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/ldlm/ldlm_resource.c
lustre/ldlm/ldlm_test.c
lustre/lib/Makefile.am
lustre/lib/client.c
lustre/lib/obd_pack.c
lustre/lib/simple.c
lustre/lib/target.c
lustre/llite/Makefile.am
lustre/llite/commit_callback.c
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/namei.c
lustre/llite/recover.c
lustre/llite/rw.c
lustre/llite/super.c
lustre/llite/super25.c
lustre/llite/symlink.c
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/lov/lproc_lov.c
lustre/mdc/Makefile.am
lustre/mdc/mdc_request.c
lustre/mds/Makefile.am
lustre/mds/handler.c
lustre/mds/lproc_mds.c
lustre/mds/mds_fs.c
lustre/mds/mds_lov.c
lustre/mds/mds_reint.c
lustre/obdclass/Makefile.am
lustre/obdclass/class_obd.c
lustre/obdclass/fsfilt.c
lustre/obdclass/fsfilt_extN.c
lustre/obdclass/fsfilt_reiserfs.c [new file with mode: 0644]
lustre/obdclass/genops.c
lustre/obdclass/lprocfs_status.c
lustre/obdclass/statfs_pack.c [moved from lustre/lib/ll_pack.c with 88% similarity]
lustre/obdecho/echo.c
lustre/obdecho/echo_client.c
lustre/obdfilter/Makefile.am
lustre/obdfilter/filter.c
lustre/osc/Makefile.am
lustre/osc/osc_request.c
lustre/ost/Makefile.am
lustre/ost/ost_handler.c
lustre/patches/.cvsignore [deleted file]
lustre/ptlbd/.cvsignore [new file with mode: 0644]
lustre/ptlbd/Makefile.am [new file with mode: 0644]
lustre/ptlbd/blk.c [new file with mode: 0644]
lustre/ptlbd/client.c [new file with mode: 0644]
lustre/ptlbd/main.c [new file with mode: 0644]
lustre/ptlbd/rpc.c [new file with mode: 0644]
lustre/ptlbd/server.c [new file with mode: 0644]
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/recovd.c
lustre/ptlrpc/recover.c
lustre/ptlrpc/rpc.c
lustre/ptlrpc/service.c
lustre/tests/.cvsignore
lustre/tests/Makefile.am
lustre/tests/createmany.c
lustre/tests/echo.sh [new file with mode: 0755]
lustre/tests/llecho.sh
lustre/tests/llechocleanup.sh
lustre/tests/llmount.sh
lustre/tests/llmountcleanup.sh
lustre/tests/local.sh
lustre/tests/lov.sh
lustre/tests/lovstripe.c [deleted file]
lustre/tests/sanity.sh
lustre/tests/statmany.c [new file with mode: 0644]
lustre/tests/uml.sh
lustre/tests/wantedi.c [new file with mode: 0644]
lustre/utils/automatic-reconnect-sample [new file with mode: 0755]
lustre/utils/lconf.in
lustre/utils/lctl.c
lustre/utils/lfind.c
lustre/utils/lmc
lustre/utils/lstripe.c
lustre/utils/lustre.dtd [deleted file]
lustre/utils/obd.c
lustre/utils/obdctl.h

index 111b232..34373dd 100644 (file)
@@ -12,3 +12,4 @@ TAGS
 lustre*.tar.gz
 cscope.files
 cscope.out
+autom4te-2.53.cache
index fc930e2..41e712f 100644 (file)
@@ -1,4 +1,5 @@
-TBA
+2003-01-06  Andreas Dilger  <adilger@clusterfs.com>
+       * version v0_5_19
        * bug fixes
         - Fully reactivate OST imports after reconnection (512, others)
         - Make sure client sees our -ENOTCONN from mds_handle (513 - partial)
@@ -20,6 +21,41 @@ TBA
         - fix paths in lconf, which would load incorrect modules (451, 507)
         - fix confusing lconf 'host not found' error message (386)
         - fix lock order deadlock on OST (O/R i_sem before journal ops, 478)
+        - fix race condition in mdc_blocking_ast() for inode access (526)
+        - fix lov_unpackmd() unpacking wrong number of stripes (537)
+        - fix lov_set_osc_active() marking wrong OSC inactive (440)
+        - fix bad lstripe lov_unpackmd() assertion (fix layering too) (527)
+        - fix multiple writes of stripe MD to MDS (358, maybe 519)
+        - fix lstripe in several ways (kernel side) (527)
+        - fix request leak in ldlm_cli_enqueue (262)
+        - incorrect OSC was marked inactive after OST failure
+        - call mds_fs_cleanup before unmounting filesystem (524)
+        - fix races between taking ns_lock and ldlm_lock_change_resource
+        - fix races updating LOV export open file list
+        - fix lov_enqueue error path, avoid decref-ing bad lock handle
+        - fix recovery NULL deref in ldlm_cli_cancel_unused
+        - fix some DLM races by using new hash table for lock handles (419)
+        - permit the client to specify desired inodes, at replay
+        - duplicate requests when we queue them for replay reintegration
+        - fix last_rcvd offset calculation
+        - sync after each recovered transaction, so we always make progress
+        - never, not always, ERESTART requests without transnos
+        - store the lov_desc in the MDS, so we don't depend on getlovinfo to
+          set it
+        - skip replay if the MDS says that the client is already connected
+        - don't check for a recovery-enabled export to match lctl's UUID
+        - don't INC_USE_COUNT for phantom exports
+        - don't crash when cleaning up phantom exports (567)
+        - don't double-finish or set replay data for errored mdc_open requests
+        - abort requests when they time out, so we don't get old replies
+        - send/receive replies for AST messages again
+        - if the client says that it doesn't have the lock, cancel it on the
+          server
+        - if we timeout during I/O, don't try to cancel an in-use lock; instead
+          mark it as destroyed, it will all work out when decref is called
+        - fix module use counts (22, 581)
+       * protocol changes
+        - ASTs now expect a reply (server cancels lock on error reply)
 
 2002-12-02  Andreas Dilger  <adilger@clusterfs.com>
        * version v0_5_18
index 6e9281d..b0d8dd3 100644 (file)
@@ -12,8 +12,8 @@ DIRS24 = extN mds
 endif
 
 # NOTE: keep extN before mds and obdfilter
-SUBDIRS = $(DIRS24) obdclass utils ptlrpc ldlm lib obdfilter mdc osc ost llite 
-SUBDIRS+= obdecho lov tests doc scripts 
+SUBDIRS = $(DIRS24) obdclass utils ptlrpc ldlm lib obdfilter mdc osc ost llite
+SUBDIRS+= obdecho lov cobd ptlbd tests doc scripts conf
 
 DIST_SUBDIRS = $(SUBDIRS)
 EXTRA_DIST = BUGS FDL Rules include archdep.m4
@@ -28,4 +28,4 @@ dist-hook:
 include $(top_srcdir)/Rules
 
 rpms: dist Makefile
-       rpm -ta $(distdir).tar.gz
+       rpmbuild -ta $(distdir).tar.gz
diff --git a/lustre/cobd/.cvsignore b/lustre/cobd/.cvsignore
new file mode 100644 (file)
index 0000000..e995588
--- /dev/null
@@ -0,0 +1,3 @@
+.deps
+Makefile
+Makefile.in
diff --git a/lustre/cobd/Makefile.am b/lustre/cobd/Makefile.am
new file mode 100644 (file)
index 0000000..781c6ce
--- /dev/null
@@ -0,0 +1,15 @@
+# Copyright (C) 2002 Cluster File Systems, Inc.
+#
+# This code is issued under the GNU General Public License.
+# See the file COPYING in this distribution
+
+DEFS=
+
+# Build the cobd module object and install it as module data.
+MODULE = cobd
+modulefs_DATA = cobd.o
+EXTRA_PROGRAMS = cobd
+# LINX is empty here, so cobd_SOURCES is just the two C files listed.
+LINX=
+
+cobd_SOURCES = cache_obd.c lproc_cache.c $(LINX)
+
+include $(top_srcdir)/Rules
diff --git a/lustre/cobd/cache_obd.c b/lustre/cobd/cache_obd.c
new file mode 100644 (file)
index 0000000..ac921d8
--- /dev/null
@@ -0,0 +1,329 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#define DEBUG_SUBSYSTEM S_COBD
+
+#include <linux/obd_support.h>
+#include <linux/lustre_lib.h>
+#include <linux/lustre_net.h>
+#include <linux/lustre_idl.h>
+#include <linux/obd_class.h>
+#include <linux/obd_cache.h>
+
+/* lprocfs tables for this driver; both are defined in lproc_cache.c. */
+extern struct lprocfs_vars status_var_nm_1[];
+extern struct lprocfs_vars status_class_var[];
+
+/* Attach hook: register the device with lprocfs using status_var_nm_1. */
+static int cobd_attach(struct obd_device *dev, obd_count len, void *data)
+{
+        return lprocfs_reg_obd(dev, status_var_nm_1, dev);
+}
+
+/* Detach hook: undo the lprocfs registration done in cobd_attach(). */
+static int cobd_detach(struct obd_device *dev)
+{
+        return lprocfs_dereg_obd(dev);
+}
+
+/*
+ * Setup hook: look up the target and cache devices by the UUIDs passed in
+ * ioc_inlbuf1 and ioc_inlbuf2 and connect to both.  Returns -EINVAL if
+ * either UUID is missing or does not resolve to a device.  If the cache
+ * connection fails, the target connection is dropped again and the cache
+ * connect error is returned.
+ */
+static int cobd_setup(struct obd_device *dev, obd_count len, void *buf)
+{
+        struct obd_ioctl_data *ioc = (struct obd_ioctl_data *)buf;
+        struct cache_obd *cobd = &dev->u.cobd;
+        struct obd_device *target_dev;
+        struct obd_device *cache_dev;
+        int err;
+
+        if (ioc->ioc_inlbuf1 == NULL || ioc->ioc_inlbuf2 == NULL)
+                return -EINVAL;
+
+        target_dev = class_uuid2obd(ioc->ioc_inlbuf1);
+        cache_dev = class_uuid2obd(ioc->ioc_inlbuf2);
+        if (target_dev == NULL || cache_dev == NULL)
+                return -EINVAL;
+
+        /* don't bother checking attached/setup;
+         * obd_connect() should, and it can change underneath us */
+        err = obd_connect(&cobd->cobd_target, target_dev, NULL, NULL, NULL);
+        if (err != 0)
+                return err;
+
+        err = obd_connect(&cobd->cobd_cache, cache_dev, NULL, NULL, NULL);
+        if (err == 0)
+                return 0;
+
+        /* cache connect failed: release the target connection again */
+        obd_disconnect(&cobd->cobd_target);
+        return err;
+}
+
+/*
+ * Cleanup hook.  Refuses with -EBUSY while the device still has exports;
+ * otherwise disconnects from the cache and then from the target.
+ * Disconnect failures are only logged; the function still returns 0.
+ */
+static int cobd_cleanup(struct obd_device *dev)
+{
+        struct cache_obd *cobd = &dev->u.cobd;
+        int err;
+
+        if (!list_empty(&dev->obd_exports))
+                return -EBUSY;
+
+        err = obd_disconnect(&cobd->cobd_cache);
+        if (err != 0)
+                CERROR("error %d disconnecting cache\n", err);
+
+        err = obd_disconnect(&cobd->cobd_target);
+        if (err != 0)
+                CERROR("error %d disconnecting target\n", err);
+
+        return 0;
+}
+
+/*
+ * Connect hook: delegates to class_connect() and returns its result
+ * unchanged.  The recovd and recover arguments are not used.
+ */
+static int
+cobd_connect(struct lustre_handle *conn, struct obd_device *obd,
+             obd_uuid_t cluuid, struct recovd_obd *recovd,
+             ptlrpc_recovery_cb_t recover)
+{
+        int rc = class_connect(conn, obd, cluuid);
+
+        /* Only report real failures; logging at error level on every
+         * successful connect just fills the log with noise. */
+        if (rc != 0)
+                CERROR("class_connect failed: rc %d\n", rc);
+        return rc;
+}
+
+/*
+ * Disconnect hook: delegates to class_disconnect() and returns its result
+ * unchanged.
+ */
+static int
+cobd_disconnect(struct lustre_handle *conn)
+{
+        int rc = class_disconnect(conn);
+
+        /* Only report real failures; logging at error level on every
+         * successful disconnect just fills the log with noise. */
+        if (rc != 0)
+                CERROR("class_disconnect failed: rc %d\n", rc);
+        return rc;
+}
+
+/*
+ * Forward a get_info request to the target device.  Returns -EINVAL if
+ * conn does not resolve to an obd device.
+ */
+static int cobd_get_info(struct lustre_handle *conn, obd_count keylen,
+                         void *key, obd_count *vallen, void **val)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        /* intercept cache utilisation info? */
+        return obd_get_info(&obd->u.cobd.cobd_target,
+                            keylen, key, vallen, val);
+}
+
+/*
+ * Forward a statfs request to the target device.  Returns -EINVAL if
+ * conn does not resolve to an obd device.
+ */
+static int cobd_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        return obd_statfs(&obd->u.cobd.cobd_target, osfs);
+}
+
+/*
+ * Forward a getattr request to the target device.  Returns -EINVAL if
+ * conn does not resolve to an obd device.
+ */
+static int cobd_getattr(struct lustre_handle *conn, struct obdo *oa,
+                        struct lov_stripe_md *lsm)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        return obd_getattr(&obd->u.cobd.cobd_target, oa, lsm);
+}
+
+/*
+ * Forward an open request to the target device.  Returns -EINVAL if
+ * conn does not resolve to an obd device.
+ */
+static int cobd_open(struct lustre_handle *conn, struct obdo *oa,
+                     struct lov_stripe_md *lsm)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        return obd_open(&obd->u.cobd.cobd_target, oa, lsm);
+}
+
+/*
+ * Forward a close request to the target device.  Returns -EINVAL if
+ * conn does not resolve to an obd device.
+ */
+static int cobd_close(struct lustre_handle *conn, struct obdo *oa,
+                      struct lov_stripe_md *lsm)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        return obd_close(&obd->u.cobd.cobd_target, oa, lsm);
+}
+
+/*
+ * Read-only pass-through of preprw to the target device.  Returns -EINVAL
+ * if conn does not resolve to an obd device and -EOPNOTSUPP if cmd has
+ * OBD_BRW_WRITE set.
+ */
+static int cobd_preprw(int cmd, struct lustre_handle *conn,
+                       int objcount, struct obd_ioobj *obj,
+                       int niocount, struct niobuf_remote *nb,
+                       struct niobuf_local *res, void **desc_private)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        /* writes are refused */
+        if (cmd & OBD_BRW_WRITE)
+                return -EOPNOTSUPP;
+
+        return obd_preprw(cmd, &obd->u.cobd.cobd_target, objcount, obj,
+                          niocount, nb, res, desc_private);
+}
+
+/*
+ * Read-only pass-through of commitrw to the target device.  Returns
+ * -EINVAL if conn does not resolve to an obd device and -EOPNOTSUPP if
+ * cmd has OBD_BRW_WRITE set.
+ */
+static int cobd_commitrw(int cmd, struct lustre_handle *conn,
+                         int objcount, struct obd_ioobj *obj,
+                         int niocount, struct niobuf_local *local,
+                         void *desc_private)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        /* writes are refused */
+        if (cmd & OBD_BRW_WRITE)
+                return -EOPNOTSUPP;
+
+        return obd_commitrw(cmd, &obd->u.cobd.cobd_target, objcount, obj,
+                            niocount, local, desc_private);
+}
+
+/*
+ * Read-only pass-through of brw to the target device.  Returns -EINVAL
+ * if conn does not resolve to an obd device and -EOPNOTSUPP if cmd has
+ * OBD_BRW_WRITE set.
+ */
+static inline int cobd_brw(int cmd, struct lustre_handle *conn,
+                           struct lov_stripe_md *lsm, obd_count oa_bufs,
+                           struct brw_page *pga, struct obd_brw_set *set)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        /* writes are refused */
+        if (cmd & OBD_BRW_WRITE)
+                return -EOPNOTSUPP;
+
+        return obd_brw(cmd, &obd->u.cobd.cobd_target,
+                       lsm, oa_bufs, pga, set);
+}
+
+/*
+ * Forward an ioctl to the target device.  Returns -EINVAL if conn does
+ * not resolve to an obd device.
+ */
+static int cobd_iocontrol(unsigned int cmd, struct lustre_handle *conn,
+                          int len, void *karg, void *uarg)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+
+        if (obd == NULL) {
+                CERROR("invalid client "LPX64"\n", conn->addr);
+                return -EINVAL;
+        }
+
+        /* intercept? */
+        return obd_iocontrol(cmd, &obd->u.cobd.cobd_target, len, karg, uarg);
+}
+
+/* Method table passed to class_register_type() in cobd_init(). */
+static struct obd_ops cobd_ops = {
+        .o_owner      = THIS_MODULE,
+        .o_attach     = cobd_attach,
+        .o_detach     = cobd_detach,
+
+        .o_setup      = cobd_setup,
+        .o_cleanup    = cobd_cleanup,
+
+        .o_connect    = cobd_connect,
+        .o_disconnect = cobd_disconnect,
+
+        .o_get_info   = cobd_get_info,
+        .o_statfs     = cobd_statfs,
+
+        .o_getattr    = cobd_getattr,
+        .o_open       = cobd_open,
+        .o_close      = cobd_close,
+        .o_preprw     = cobd_preprw,
+        .o_commitrw   = cobd_commitrw,
+        .o_brw        = cobd_brw,
+        .o_iocontrol  = cobd_iocontrol,
+};
+
+/*
+ * Module init: print a banner and register the obd type under
+ * OBD_CACHE_DEVICENAME, returning class_register_type()'s result.
+ */
+static int __init cobd_init(void)
+{
+        printk(KERN_INFO "Lustre Caching OBD driver\n");
+
+        return class_register_type(&cobd_ops, status_class_var,
+                                   OBD_CACHE_DEVICENAME);
+}
+
+/* Module exit: unregister the obd type registered by cobd_init(). */
+static void __exit cobd_exit(void)
+{
+        class_unregister_type(OBD_CACHE_DEVICENAME);
+}
+
+/* Module metadata, and registration of cobd_init()/cobd_exit() as the
+ * module's init and exit functions. */
+MODULE_AUTHOR("Cluster Filesystems Inc. <info@clusterfs.com>");
+MODULE_DESCRIPTION("Lustre Caching OBD driver");
+MODULE_LICENSE("GPL");
+
+module_init(cobd_init);
+module_exit(cobd_exit);
+
+       
diff --git a/lustre/cobd/lproc_cache.c b/lustre/cobd/lproc_cache.c
new file mode 100644 (file)
index 0000000..5adcaf8
--- /dev/null
@@ -0,0 +1,95 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  Copyright (C) 2002 Cluster File Systems, Inc.
+ *
+ *   This file is part of Lustre, http://www.lustre.org.
+ *
+ *   Lustre is free software; you can redistribute it and/or
+ *   modify it under the terms of version 2 of the GNU General Public
+ *   License as published by the Free Software Foundation.
+ *
+ *   Lustre is distributed in the hope that it will be useful,
+ *   but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   GNU General Public License for more details.
+ *
+ *   You should have received a copy of the GNU General Public License
+ *   along with Lustre; if not, write to the Free Software
+ *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <linux/lustre_lite.h>
+#include <linux/lprocfs_status.h>
+
+/*
+ * Common STATUS namespace
+ */
+
+/* Read handler: print the UUID of the obd device passed in data. */
+static int rd_uuid(char *page, char **start, off_t off, int count,
+                   int *eof, void *data)
+{
+        struct obd_device *obd = (struct obd_device *)data;
+
+        return snprintf(page, count, "%s\n", obd->obd_uuid);
+}
+
+/*
+ * Print into page the UUID of the obd behind conn, or "not set up" when
+ * dev does not have OBD_SET_UP set.  Shared by rd_target() and rd_cache().
+ */
+static int rd_conn_uuid(char *page, int count, struct obd_device *dev,
+                        struct lustre_handle *conn)
+{
+        struct obd_export *exp;
+
+        if ((dev->obd_flags & OBD_SET_UP) == 0)
+                return snprintf(page, count, "not set up\n");
+
+        exp = class_conn2export(conn);
+        LASSERT(exp != NULL);
+        return snprintf(page, count, "%s\n", exp->exp_obd->obd_uuid);
+}
+
+/* Read handler: UUID of the device reached through cobd_target. */
+static int rd_target(char *page, char **start, off_t off, int count,
+                     int *eof, void *data)
+{
+        struct obd_device *dev = (struct obd_device *)data;
+
+        return rd_conn_uuid(page, count, dev, &dev->u.cobd.cobd_target);
+}
+
+/* Read handler: UUID of the device reached through cobd_cache. */
+static int rd_cache(char *page, char **start, off_t off, int count,
+                    int *eof, void *data)
+{
+        struct obd_device *dev = (struct obd_device *)data;
+
+        return rd_conn_uuid(page, count, dev, &dev->u.cobd.cobd_cache);
+}
+
+/*
+ * Per-device status entries, handed to lprocfs_reg_obd() by cobd_attach()
+ * in cache_obd.c.  The list ends with a zeroed entry.
+ */
+struct lprocfs_vars status_var_nm_1[] = {
+        {"status/uuid", rd_uuid, 0, 0},
+        {"status/target_uuid", rd_target, 0, 0},
+        {"status/cache_uuid", rd_cache, 0, 0},
+        {0}
+};
+
+/* Read handler: print typ_refcnt of the obd_type passed in data. */
+int rd_numrefs(char *page, char **start, off_t off, int count,
+               int *eof, void *data)
+{
+        struct obd_type *type = (struct obd_type *)data;
+
+        return snprintf(page, count, "%d\n", type->typ_refcnt);
+}
+
+/*
+ * Per-type status entries, passed to class_register_type() by cobd_init()
+ * in cache_obd.c.  The list ends with a zeroed entry.
+ */
+struct lprocfs_vars status_class_var[] = {
+        {"status/num_refs", rd_numrefs, 0, 0},
+        {0}
+};
diff --git a/lustre/conf/.cvsignore b/lustre/conf/.cvsignore
new file mode 100644 (file)
index 0000000..282522d
--- /dev/null
@@ -0,0 +1,2 @@
+Makefile
+Makefile.in
diff --git a/lustre/conf/Makefile.am b/lustre/conf/Makefile.am
new file mode 100644 (file)
index 0000000..7f98129
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright (C) 2001  Cluster File Systems, Inc.
+#
+# This code is issued under the GNU General Public License.
+# See the file COPYING in this distribution
+
+# NOTE(review): lustre.schema is listed below (EXTRA_DIST and
+# ldapschema_SCRIPTS) but is not among the files this change adds to
+# lustre/conf, while top.ldif is added by this change but is not listed in
+# EXTRA_DIST.  Confirm both are intended.
+EXTRA_DIST = lustre2ldif.xsl lustre.dtd lustre.schema slapd-lustre.conf
+ldapconfdir = $(sysconfdir)/openldap
+ldapschemadir = $(sysconfdir)/openldap/schema
+ldapconf_SCRIPTS = slapd-lustre.conf
+ldapschema_SCRIPTS = lustre.schema
+
+include $(top_srcdir)/Rules
+
diff --git a/lustre/conf/lustre.dtd b/lustre/conf/lustre.dtd
new file mode 100644 (file)
index 0000000..73f7c95
--- /dev/null
@@ -0,0 +1,111 @@
+<!-- Lustre Management DTD -->
+
+<!-- basic entities -->
+<!ENTITY % object.content "(#PCDATA)">
+<!ENTITY % object.attr "
+             name CDATA #REQUIRED
+             uuid CDATA #REQUIRED">
+
+<!ENTITY % objref.content "(#PCDATA)">
+<!ENTITY % objref.attr    "uuidref CDATA #REQUIRED">
+
+<!-- main elements -->
+<!-- NOTE(review): mdsdev is declared further down and profile refers to
+     it through mdsdev_ref, but mdsdev is not listed in this content model
+     or in any other one. Confirm whether it should be allowed here. -->
+<!ELEMENT lustre (node | profile | mountpoint | ldlm | echoclient |
+                  mds | obd | ost | lov | lovconfig)*>
+
+<!ELEMENT node (network | profile_ref)*>
+<!ATTLIST node %object.attr;
+               router CDATA #IMPLIED>
+               
+<!-- NOTE(review): this content model names route_tbl, sendmem and recvmem,
+     but the elements declared in this DTD are routetbl, send_mem and
+     recv_mem. Confirm which spellings the config tools emit and make the
+     two agree. -->
+<!ELEMENT network (nid | port | route_tbl | sendmem | recvmem)*>
+<!ATTLIST network %object.attr;
+                  nettype (tcp | elan | gm) 'tcp'>
+
+<!ELEMENT routetbl (route)*>
+<!ATTLIST routetbl %object.attr;>
+<!ELEMENT route %object.content;>
+<!ATTLIST route type (elan | tcp | gm) #REQUIRED
+                gw CDATA #REQUIRED
+                lo CDATA #REQUIRED
+                hi CDATA #IMPLIED >
+
+<!ELEMENT profile (ldlm_ref | network_ref | obd_ref | ost_ref |
+                   echoclient_ref | mdsdev_ref | lov_ref |
+                   lovconfig_ref| mountpoint_ref)*>
+<!ATTLIST profile %object.attr;>
+
+<!ELEMENT mountpoint (path | fileset | mds_ref | obd_ref)*>
+<!ATTLIST mountpoint %object.attr;>
+
+<!ELEMENT echoclient (obd_ref)>
+<!ATTLIST echoclient %object.attr;>
+
+<!ELEMENT ldlm EMPTY>
+<!ATTLIST ldlm %object.attr;>
+
+<!-- NOTE(review): active_ref is used here and in the mds content model but
+     is never declared in this DTD; activetarget is declared under basic
+     elements but is not used by any content model. Confirm the intended
+     name. -->
+<!ELEMENT obd (fstype | devpath | devsize | autoformat | active_ref)*>
+<!ATTLIST obd %object.attr; 
+              obdtype (obdfilter | obdecho) 'obdfilter'>
+
+<!ELEMENT ost (network_ref | obd_ref | failover_ref)*>
+<!ATTLIST ost %object.attr;>
+
+<!ELEMENT mds (active_ref)*>
+<!ATTLIST mds %object.attr;>
+
+<!ELEMENT mdsdev (fstype | devpath | devsize | autoformat | 
+                  mds_ref | network_ref )*>
+<!ATTLIST mdsdev %object.attr;>
+
+<!ELEMENT lov (mds_ref |(obd_ref)+)*>
+<!ATTLIST lov %object.attr;
+               stripesize    CDATA #REQUIRED
+               stripecount   CDATA #REQUIRED
+               stripeoffset  CDATA #IMPLIED
+               stripepattern CDATA #REQUIRED>
+
+<!ELEMENT lovconfig (lov_ref)>
+<!ATTLIST lovconfig %object.attr;>
+
+<!-- basic elements -->
+<!ELEMENT fstype        %object.content;>
+<!ELEMENT nid           %object.content;>
+<!ELEMENT port          %object.content;>
+<!ELEMENT send_mem      %object.content;>
+<!ELEMENT recv_mem      %object.content;>
+<!ELEMENT autoformat    %object.content;>
+<!ELEMENT activetarget  %object.content;>
+<!ELEMENT devpath       %object.content;>
+<!ELEMENT devsize       %object.content;>
+<!ELEMENT path          %object.content;>
+<!ELEMENT fileset       %object.content;>
+
+<!-- object reference tag elements -->
+<!ELEMENT network_ref     %objref.content;>
+<!ATTLIST network_ref     %objref.attr;>
+<!ELEMENT node_ref        %objref.content;>
+<!ATTLIST node_ref        %objref.attr;>
+<!ELEMENT profile_ref     %objref.content;>
+<!ATTLIST profile_ref     %objref.attr;>
+<!ELEMENT obd_ref         %objref.content;>
+<!ATTLIST obd_ref         %objref.attr;>
+<!ELEMENT mds_ref         %objref.content;>
+<!ATTLIST mds_ref         %objref.attr;>
+<!ELEMENT mdsdev_ref      %objref.content;>
+<!ATTLIST mdsdev_ref      %objref.attr;>
+<!ELEMENT ost_ref         %objref.content;>
+<!ATTLIST ost_ref         %objref.attr;>
+<!ELEMENT lov_ref         %objref.content;>
+<!ATTLIST lov_ref         %objref.attr;>
+<!ELEMENT lovconfig_ref   %objref.content;>
+<!ATTLIST lovconfig_ref   %objref.attr;>
+<!ELEMENT mountpoint_ref  %objref.content;>
+<!ATTLIST mountpoint_ref  %objref.attr;>
+<!ELEMENT echoclient_ref %objref.content;>
+<!ATTLIST echoclient_ref %objref.attr;>
+<!ELEMENT failover_ref    %objref.content;>
+<!ATTLIST failover_ref    %objref.attr;>
+<!ELEMENT ldlm_ref        %objref.content;>
+<!ATTLIST ldlm_ref        %objref.attr;>
+
+
diff --git a/lustre/conf/lustre2ldif.xsl b/lustre/conf/lustre2ldif.xsl
new file mode 100644 (file)
index 0000000..f5d8098
--- /dev/null
@@ -0,0 +1,212 @@
+<?xml version="1.0" encoding="iso-8859-1"?>
+<stylesheet version="1.0" xmlns="http://www.w3.org/1999/XSL/Transform"> 
+<output omit-xml-declaration="yes" />
+<strip-space elements="*"/>
+<param name="config">fs=lustre</param>
+<variable name="basedn">config=<value-of select="$config"/>,fs=lustre</variable>
+
+<template match="lustre">
+dn: <value-of select="$basedn"/>
+uuid: CONFIG_UUID
+objectClass: LUSTRECONFIG
+config: <value-of select="$config"/>
+<text>
+</text><apply-templates/>
+</template>
+
+<template match="node">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: NODE
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+networkRef: <value-of select="network/@uuid"/>
+<for-each select="profile_ref">
+profileRef: <value-of select="@uuidref"/>
+</for-each>
+<text>
+</text><apply-templates/>
+</template>
+
+<template match="profile">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: PROFILE
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="network">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: NETWORK
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+nettype: <value-of select="@nettype"/>
+nid: <value-of select="nid"/>
+<if test="port">
+port: <value-of select="port"/>
+</if>
+<text>
+</text>
+</template>
+
+<template match="mds">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: MDS
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="mdsdev">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: MDSDEV
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+<if test="fstype">
+fstype: <value-of select="fstype"/>
+</if>
+<if test="autoformat">
+autoformat: <value-of select="autoformat"/>
+</if>
+<if test="devpath">
+devpath: <value-of select="devpath"/>
+</if>
+<if test="devsize">
+devsize: <value-of select="devsize"/>
+</if>
+networkRef: <value-of select="network_ref/@uuidref"/>
+mdsRef: <value-of select="mds_ref/@uuidref"/>
+<text>
+</text>
+</template>
+
+<template match="lov">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: LOV
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+mdsRef: <value-of select="mds_ref/@uuidref"/>
+stripepattern: <value-of select="@stripepattern"/>
+stripesize: <value-of select="@stripesize"/>
+stripecount: <value-of select="@stripecount"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="lovconfig">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: LOVCONFIG
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="obd">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: OBD
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+activeRef: <value-of select="active_ref/@uuidref"/>
+obdtype: <value-of select="@obdtype"/>
+<if test="fstype">
+fstype: <value-of select="fstype"/>
+</if>
+<if test="autoformat">
+autoformat: <value-of select="autoformat"/>
+</if>
+<if test="devpath">
+devpath: <value-of select="devpath"/>
+</if>
+<if test="devsize">
+devsize: <value-of select="devsize"/>
+</if>
+<text>
+</text>
+</template>
+
+<template match="ost">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: OST
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="mountpoint">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: MOUNTPOINT
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="echoclient">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: ECHOCLIENT
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/><apply-templates/>
+<text>
+</text>
+</template>
+
+<template match="ldlm">
+dn: uuid=<value-of select="@uuid"/>,<value-of select="$basedn"/>
+objectClass: LDLM
+lustreName: <value-of select="@name"/>
+uuid: <value-of select="@uuid"/>
+<text>
+</text>
+</template>
+
+
+<template match="ldlm_ref">
+ldlmRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="obd_ref">
+obdRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="ost_ref">
+ostRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="network_ref">
+networkRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="mds_ref">
+mdsRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="mountpoint_ref">
+mountpointRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="echoclient_ref">
+echoclientRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="lov_ref">
+lovRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="lovconfig_ref">
+lovconfigRef: <value-of select="@uuidref"/>
+</template>
+
+<template match="path">
+path: <value-of select="."/>
+</template>
+
+<template match="active_ref">
+activeRef: <value-of select="@uuidref"/>
+</template>
+</stylesheet>
+
+
diff --git a/lustre/conf/slapd-lustre.conf b/lustre/conf/slapd-lustre.conf
new file mode 100644 (file)
index 0000000..de89c76
--- /dev/null
@@ -0,0 +1,12 @@
+#######################################################################
+# lustre ldap config database
+# $Id: slapd-lustre.conf,v 1.2 2003/01/06 22:17:53 adilger Exp $
+#######################################################################
+
+database       ldbm
+suffix         "fs=lustre"
+rootdn         "cn=Manager,fs=lustre"
+include                /etc/openldap/schema/lustre.schema
+rootpw         secret
+directory      /var/lib/ldap/lustre
+index           objectClass,uuid eq
diff --git a/lustre/conf/top.ldif b/lustre/conf/top.ldif
new file mode 100644 (file)
index 0000000..8629444
--- /dev/null
@@ -0,0 +1,4 @@
+dn: fs=lustre
+fs:lustre
+objectClass: lustre
+desc: Lustre Config
index bd378b7..c172cd2 100644 (file)
@@ -128,6 +128,7 @@ AC_SUBST(demodir)
 
 AC_OUTPUT(Makefile lib/Makefile ldlm/Makefile obdecho/Makefile ptlrpc/Makefile \
        lov/Makefile osc/Makefile mdc/Makefile mds/Makefile ost/Makefile \
+       cobd/Makefile ptlbd/Makefile conf/Makefile \
        utils/Makefile utils/lconf tests/Makefile obdfilter/Makefile \
         obdclass/Makefile llite/Makefile doc/Makefile scripts/Makefile \
        scripts/lustre.spec extN/Makefile, chmod +x utils/lconf)
index 33c6d07..5ad1642 100644 (file)
@@ -16,7 +16,8 @@ EXTRA_PROGRAMS = extN
 EXTN_FIXES = patch-2.4.18-chaos22
 #EXTN_FIXES = ext3-2.4.18-fixes.diff
 EXTNP = htree-ext3-2.4.18.diff linux-2.4.18ea-0.8.26.diff
-EXTNP+= ext3-2.4.18-ino_sb_macro.diff extN-misc-fixup.diff
+EXTNP+= ext3-2.4.18-ino_sb_macro.diff extN-misc-fixup.diff extN-noread.diff
+EXTNP+= extN-wantedi.diff
 EXTNC = balloc.c bitmap.c dir.c file.c fsync.c ialloc.c inode.c ioctl.c
 EXTNC+= namei.c super.c symlink.c
 EXTNI = extN_fs.h extN_fs_i.h extN_fs_sb.h extN_jbd.h quotaops.h
@@ -52,31 +53,27 @@ diff:
        $(RM) extN.patchT
        l='$(EXTNC)'; for f in $$l; do                                        \
           echo "$$f";                                                         \
-          (diff -u $(extN_orig)/$$f extN/$$f) >> extN.patchT;              \
-          test $$? -le 1 || exit 1;                                       
+          (diff -u $(extN_orig)/$$f extN/$$f) >> extN.patchT;                 \
+          test $$? -le 1 || exit 1;                                           \
        done
        l='$(EXTNI)'; for f in $$l; do                                        \
           echo "$$f";                                                         \
           (diff -u $(extN_include_orig)/$$f $(top_srcdir)/include/linux/$$f)>>extN.patchT;\
-          test $$? -le 1 || exit 1;                                          
+          test $$? -le 1 || exit 1;                                           \
        done
        l='$(EXTN_EXTRA)'; for f in $$l; do                                   \
           f=`echo "$$f" | sed 's%^fs/%%'`;                                    \
           echo "$$f";                                                         \
           (cd $(top_srcdir) &&                                                \
             diff -u /dev/null $$f) >> extN.patchT;                            \
-          test $$? -le 1 || exit 1;                                          
+          test $$? -le 1 || exit 1;                                           \
        done
        mv -f extN.patchT $(top_builddir)/$(subdir)/extN.patch-$(RELEASE)
        echo "Don't forget to add $(srcdir)/extN.patch-$(RELEASE) to CVS!"
 
-
-
 .PHONY: diff
 
 # Just do the SUB transformation on all our source files.
-
-
 sed-stamp:
        $(RM) $@
        rm -rf $(extN_orig) $(extN_include_orig)
diff --git a/lustre/extN/extN-noread.diff b/lustre/extN/extN-noread.diff
new file mode 100644 (file)
index 0000000..463516c
--- /dev/null
@@ -0,0 +1,225 @@
+diff -ru lustre-head/fs/extN/ialloc.c lustre/fs/extN/ialloc.c
+--- lustre-head/fs/extN/ialloc.c       Mon Dec 23 10:02:58 2002
++++ lustre/fs/extN/ialloc.c    Mon Dec 23 09:46:20 2002
+@@ -289,6 +289,37 @@
+ }
+ /*
++ * @block_group: block group of inode
++ * @offset: relative offset of inode within @block_group
++ *
++ * Return 1 if any inode other than @offset in the same inode table block is
++ * marked in use (or the inode bitmap cannot be loaded), otherwise return 0.
++ * Caller must be holding superblock lock (group/bitmap read lock in future).
++ */
++int extN_itable_block_used(struct super_block *sb, unsigned int block_group,
++                         int offset)
++{
++      int bitmap_nr = load_inode_bitmap(sb, block_group);
++      int inodes_per_block;
++      unsigned long inum, iend;
++      struct buffer_head *ibitmap;
++
++      if (bitmap_nr < 0)
++              return 1;       /* bitmap unreadable: treat as used */
++
++      inodes_per_block = sb->s_blocksize / EXTN_SB(sb)->s_inode_size;
++      inum = offset & ~(inodes_per_block - 1); /* assumes power of 2 */
++      iend = inum + inodes_per_block;
++      ibitmap = EXTN_SB(sb)->s_inode_bitmap[bitmap_nr];
++      for (; inum < iend; inum++) {
++              if (inum != offset && extN_test_bit(inum, ibitmap->b_data))
++                      return 1;
++      }
++
++      return 0;
++}
++
++/*
+  * There are two policies for allocating an inode.  If the new inode is
+  * a directory, then a forward search is made for a block group with both
+  * free space and a low directory-to-inode ratio; if that fails, then of
+@@ -312,6 +343,7 @@
+       struct extN_group_desc * gdp;
+       struct extN_group_desc * tmp;
+       struct extN_super_block * es;
++      struct extN_iloc iloc;
+       int err = 0;
+       /* Cannot create files in a deleted directory */
+@@ -505,7 +538,7 @@
+       ei->i_prealloc_count = 0;
+ #endif
+       ei->i_block_group = i;
+-      
++
+       if (ei->i_flags & EXTN_SYNC_FL)
+               inode->i_flags |= S_SYNC;
+       if (IS_SYNC(inode))
+@@ -514,9 +547,18 @@
+       inode->i_generation = sbi->s_next_generation++;
+       ei->i_state = EXTN_STATE_NEW;
+-      err = extN_mark_inode_dirty(handle, inode);
++      err = extN_get_inode_loc_new(inode, &iloc, 1);
+       if (err) goto fail;
+-      
++      BUFFER_TRACE(iloc.bh, "get_write_access");
++      err = extN_journal_get_write_access(handle, iloc.bh);
++      if (err) {
++              brelse(iloc.bh);        /* drop ref taken by get_inode_loc */
++              iloc.bh = NULL;
++              goto fail;
++      }
++      err = extN_mark_iloc_dirty(handle, inode, &iloc);
++      if (err) goto fail;
++
+       unlock_super (sb);
+       if(DQUOT_ALLOC_INODE(inode)) {
+               DQUOT_DROP(inode);
+diff -ru lustre-head/fs/extN/inode.c lustre/fs/extN/inode.c
+--- lustre-head/fs/extN/inode.c        Mon Dec 23 10:02:58 2002
++++ lustre/fs/extN/inode.c     Mon Dec 23 09:50:25 2002
+@@ -2011,23 +1994,32 @@
+       extN_journal_stop(handle, inode);
+ }
+-/* 
+- * extN_get_inode_loc returns with an extra refcount against the
+- * inode's underlying buffer_head on success. 
+- */
++extern int extN_itable_block_used(struct super_block *sb,
++                                unsigned int block_group,
++                                int offset);
++
++#define NUM_INODE_PREREAD 16
+-int extN_get_inode_loc (struct inode *inode, struct extN_iloc *iloc)
++/*
++ * extN_get_inode_loc returns with an extra refcount against the inode's
++ * underlying buffer_head on success.  If this is for a new inode allocation
++ * (new is non-zero) then we may be able to optimize away the read if there
++ * are no other in-use inodes in this inode table block.  If we need to do
++ * a read, then read in a whole chunk of blocks to avoid blocking again soon
++ * if we are doing lots of creates/updates.
++ */
++int extN_get_inode_loc_new(struct inode *inode, struct extN_iloc *iloc, int new)
+ {
+       struct super_block *sb = inode->i_sb;
+       struct extN_sb_info *sbi = EXTN_SB(sb);
+-      struct buffer_head *bh = 0;
++      struct buffer_head *bh[NUM_INODE_PREREAD];
+       unsigned long block;
+       unsigned long block_group;
+       unsigned long group_desc;
+       unsigned long desc;
+       unsigned long offset;
+       struct extN_group_desc * gdp;
+-              
++
+       if ((inode->i_ino != EXTN_ROOT_INO &&
+               inode->i_ino != EXTN_JOURNAL_INO &&
+               inode->i_ino < EXTN_FIRST_INO(sb)) ||
+@@ -2042,38 +2034,86 @@
+       }
+       group_desc = block_group >> sbi->s_desc_per_block_bits;
+       desc = block_group & (sbi->s_desc_per_block - 1);
+-      bh = sbi->s_group_desc[group_desc];
+-      if (!bh) {
++      if (!sbi->s_group_desc[group_desc]) {
+               extN_error(sb, __FUNCTION__, "Descriptor not loaded");
+               goto bad_inode;
+       }
+-      gdp = (struct extN_group_desc *) bh->b_data;
++      gdp = (struct extN_group_desc *)(sbi->s_group_desc[group_desc]->b_data);
++
+       /*
+        * Figure out the offset within the block group inode table
+        */
+-      offset = ((inode->i_ino - 1) % sbi->s_inodes_per_group) *
+-              sbi->s_inode_size;
++      offset = ((inode->i_ino - 1) % sbi->s_inodes_per_group);
++
+       block = le32_to_cpu(gdp[desc].bg_inode_table) +
+-              (offset >> EXTN_BLOCK_SIZE_BITS(sb));
+-      if (!(bh = sb_bread(sb, block))) {
+-              extN_error (sb, __FUNCTION__,
+-                          "unable to read inode block - "
+-                          "inode=%lu, block=%lu", inode->i_ino, block);
+-              goto bad_inode;
++              (offset * sbi->s_inode_size >> EXTN_BLOCK_SIZE_BITS(sb));
++
++      bh[0] = sb_getblk(sb, block);
++      if (buffer_uptodate(bh[0]))
++              goto done;
++
++      /* If we don't really need to read this block, and it isn't already
++       * in memory, then we just zero it out.  Otherwise, we keep the
++       * current block contents (deleted inode data) for posterity.
++       */
++      if (new && !extN_itable_block_used(sb, block_group, offset)) {
++              lock_buffer(bh[0]);
++              memset(bh[0]->b_data, 0, bh[0]->b_size);
++              mark_buffer_uptodate(bh[0], 1);
++              unlock_buffer(bh[0]);
++      } else {
++              unsigned long block_end, itable_end;
++              int count = 1;
++
++              itable_end = le32_to_cpu(gdp[desc].bg_inode_table) +
++                              sbi->s_itb_per_group;
++              block_end = block + NUM_INODE_PREREAD;
++              if (block_end > itable_end)
++                      block_end = itable_end;
++
++              for (++block; block < block_end; block++) { /* skip bh[0] */
++                      bh[count] = sb_getblk(sb, block);
++                      if (count && (buffer_uptodate(bh[count]) ||
++                                    buffer_locked(bh[count]))) {
++                              __brelse(bh[count]);
++                      } else
++                              count++;
++              }
++
++              ll_rw_block(READ, count, bh);
++
++              /* Release all but the block we actually need (bh[0]) */
++              while (--count > 0)
++                      __brelse(bh[count]);
++
++              wait_on_buffer(bh[0]);
++              if (!buffer_uptodate(bh[0])) {
++                      extN_error(sb, __FUNCTION__,
++                                 "unable to read inode block - "
++                                 "inode=%lu, block=%lu", inode->i_ino,
++                                 bh[0]->b_blocknr);
++                      goto bad_inode;
++              }
+       }
+-      offset &= (EXTN_BLOCK_SIZE(sb) - 1);
++ done:
++      offset = (offset * sbi->s_inode_size) & (EXTN_BLOCK_SIZE(sb) - 1);
+-      iloc->bh = bh;
+-      iloc->raw_inode = (struct extN_inode *) (bh->b_data + offset);
++      iloc->bh = bh[0];
++      iloc->raw_inode = (struct extN_inode *)(bh[0]->b_data + offset);
+       iloc->block_group = block_group;
+-      
++
+       return 0;
+-      
++
+  bad_inode:
+       return -EIO;
+ }
++int extN_get_inode_loc(struct inode *inode, struct extN_iloc *iloc)
++{
++      return extN_get_inode_loc_new(inode, iloc, 0);
++}
++
+ void extN_read_inode(struct inode * inode)
+ {
+       struct extN_iloc iloc;
diff --git a/lustre/extN/extN-wantedi.diff b/lustre/extN/extN-wantedi.diff
new file mode 100644 (file)
index 0000000..3be559f
--- /dev/null
@@ -0,0 +1,163 @@
+--- lustre/extN-clean/namei.c  2002-12-30 05:56:09.000000000 -0500
++++ lustre/extN/namei.c        2002-12-30 06:29:39.000000000 -0500
+@@ -1224,7 +1224,8 @@
+       if (IS_SYNC(dir))
+               handle->h_sync = 1;
+-      inode = extN_new_inode (handle, dir, mode);
++      inode = extN_new_inode (handle, dir, mode,
++                              (unsigned long)dentry->d_fsdata);
+       err = PTR_ERR(inode);
+       if (!IS_ERR(inode)) {
+               inode->i_op = &extN_file_inode_operations;
+@@ -1254,7 +1254,8 @@
+       if (IS_SYNC(dir))
+               handle->h_sync = 1;
+-      inode = extN_new_inode (handle, dir, mode);
++      inode = extN_new_inode (handle, dir, mode,
++                              (unsigned long)dentry->d_fsdata);
+       err = PTR_ERR(inode);
+       if (!IS_ERR(inode)) {
+               init_special_inode(inode, mode, rdev);
+@@ -1286,7 +1286,8 @@
+       if (IS_SYNC(dir))
+               handle->h_sync = 1;
+-      inode = extN_new_inode (handle, dir, S_IFDIR | mode);
++      inode = extN_new_inode (handle, dir, S_IFDIR | mode,
++                              (unsigned long)dentry->d_fsdata);
+       err = PTR_ERR(inode);
+       if (IS_ERR(inode))
+               goto out_stop;
+@@ -1680,7 +1681,8 @@
+       if (IS_SYNC(dir))
+               handle->h_sync = 1;
+-      inode = extN_new_inode (handle, dir, S_IFLNK|S_IRWXUGO);
++      inode = extN_new_inode (handle, dir, S_IFLNK|S_IRWXUGO,
++                              (unsigned long)dentry->d_fsdata);
+       err = PTR_ERR(inode);
+       if (IS_ERR(inode))
+               goto out_stop;
+--- lustre/extN-clean/ialloc.c 2002-12-28 23:56:42.000000000 -0500
++++ lustre/extN/ialloc.c       2002-12-30 06:29:39.000000000 -0500
+@@ -329,8 +329,8 @@
+  * For other inodes, search forward from the parent directory's block
+  * group to find a free inode.
+  */
+-struct inode * extN_new_inode (handle_t *handle,
+-                              const struct inode * dir, int mode)
++struct inode *extN_new_inode(handle_t *handle, const struct inode *dir,
++                           int mode, unsigned long goal)
+ {
+       struct super_block * sb;
+       struct buffer_head * bh;
+@@ -360,6 +361,38 @@
+       lock_super (sb);
+       es = sbi->s_es;
++
++      if (goal) {
++              i = (goal - 1) / EXTN_INODES_PER_GROUP(sb);
++              j = (goal - 1) % EXTN_INODES_PER_GROUP(sb);
++              gdp = extN_get_group_desc(sb, i, &bh2);
++
++              bitmap_nr = load_inode_bitmap (sb, i);
++              if (bitmap_nr < 0)
++                      goto fail;
++
++              bh = sbi->s_inode_bitmap[bitmap_nr];
++
++              BUFFER_TRACE(bh, "get_write_access");
++              err = extN_journal_get_write_access(handle, bh);
++              if (err) goto fail;
++
++              if (extN_set_bit(j, bh->b_data)) {
++                      printk(KERN_ERR "goal inode %lu unavailable\n", goal);
++                      /* Oh well, we tried. */
++                      goto repeat;
++              }
++
++              BUFFER_TRACE(bh, "call extN_journal_dirty_metadata");
++              err = extN_journal_dirty_metadata(handle, bh);
++              if (err) goto fail;
++
++              /* We've shortcircuited the allocation system successfully,
++               * now finish filling in the inode.
++               */
++              goto have_bit_and_group;
++      }
++
+ repeat:
+       gdp = NULL;
+       i = 0;
+@@ -474,6 +509,7 @@
+               }
+               goto repeat;
+       }
++have_bit_and_group:
+       j += i * sbi->s_inodes_per_group + 1;
+       if (j < sbi->s_first_ino || j > le32_to_cpu(es->s_inodes_count)) {
+               extN_error (sb, "extN_new_inode",
+--- lustre/extN-clean/ioctl.c  2002-12-28 23:56:42.000000000 -0500
++++ lustre/extN/ioctl.c        2002-12-30 06:29:39.000000000 -0500
+@@ -24,6 +24,34 @@
+       extN_debug ("cmd = %u, arg = %lu\n", cmd, arg);
+       switch (cmd) {
++      case EXTN_IOC_CREATE_INUM: {
++              char name[32];
++              struct dentry *dchild, *dparent;
++              int rc = 0;
++
++              dparent = list_entry(inode->i_dentry.next, struct dentry,
++                                   d_alias);
++              snprintf(name, sizeof name, "%lu", arg);
++              dchild = lookup_one_len(name, dparent, strlen(name));
++              /* lookup_one_len() reports failure as an ERR_PTR value */
++              if (IS_ERR(dchild))
++                      return PTR_ERR(dchild);
++              if (dchild->d_inode) {
++                      printk(KERN_ERR "%*s/%lu already exists (ino %lu)\n",
++                             dparent->d_name.len, dparent->d_name.name, arg,
++                             dchild->d_inode->i_ino);
++                      rc = -EEXIST;
++              } else {
++                      dchild->d_fsdata = (void *)arg;
++                      rc = vfs_create(inode, dchild, 0644);
++                      if (rc)
++                              printk(KERN_ERR "vfs_create: %d\n", rc);
++                      else if (dchild->d_inode->i_ino != arg)
++                              rc = -EEXIST;
++              }
++              dput(dchild);
++              return rc;
++      }
+       case EXTN_IOC_GETFLAGS:
+               flags = ei->i_flags & EXTN_FL_USER_VISIBLE;
+               return put_user(flags, (int *) arg);
+--- lustre/include/linux/extN_fs.h~    2002-12-30 06:01:43.000000000 -0500
++++ lustre/include/linux/extN_fs.h     2002-12-30 06:02:51.000000000 -0500
+@@ -200,6 +200,7 @@
+ #define       EXTN_IOC_SETFLAGS               _IOW('f', 2, long)
+ #define       EXTN_IOC_GETVERSION             _IOR('f', 3, long)
+ #define       EXTN_IOC_SETVERSION             _IOW('f', 4, long)
++/* EXTN_IOC_CREATE_INUM at bottom of file (visible to kernel and user). */
+ #define       EXTN_IOC_GETVERSION_OLD         _IOR('v', 1, long)
+ #define       EXTN_IOC_SETVERSION_OLD         _IOW('v', 2, long)
+ #ifdef CONFIG_JBD_DEBUG
+@@ -632,7 +633,8 @@
+ extern int extN_sync_file (struct file *, struct dentry *, int);
+ /* ialloc.c */
+-extern struct inode * extN_new_inode (handle_t *, const struct inode *, int);
++extern struct inode * extN_new_inode (handle_t *, const struct inode *, int,
++                                    unsigned long);
+ extern void extN_free_inode (handle_t *, struct inode *);
+ extern struct inode * extN_orphan_get (struct super_block *, ino_t);
+ extern unsigned long extN_count_free_inodes (struct super_block *);
+@@ -714,4 +716,6 @@
+ #endif        /* __KERNEL__ */
++#define EXTN_IOC_CREATE_INUM                  _IOW('f', 5, long)
++
+ #endif        /* _LINUX_EXTN_FS_H */
index c1382a9..e552dfd 100644 (file)
@@ -145,7 +145,7 @@ typedef int (*ldlm_blocking_callback)(struct ldlm_lock *lock,
 typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags);
 
 struct ldlm_lock {
-        __u64                 l_random;
+        struct portals_handle l_handle; // must be first in the structure
         atomic_t              l_refc;
         struct ldlm_resource *l_resource;
         struct ldlm_lock     *l_parent;
@@ -183,8 +183,9 @@ struct ldlm_lock {
 };
 
 typedef int (*ldlm_res_compat)(struct ldlm_lock *child, struct ldlm_lock *new);
-typedef int (*ldlm_res_policy)(struct ldlm_lock *lock, void *req_cookie,
-                               ldlm_mode_t mode, int flags, void *data);
+typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock *,
+                               void *req_cookie, ldlm_mode_t mode, int flags,
+                               void *data);
 
 #define LDLM_PLAIN       10
 #define LDLM_EXTENT      11
@@ -246,22 +247,24 @@ extern char *ldlm_it2str(int it);
 do {                                                                          \
         if (lock->l_resource == NULL) {                                       \
                 CDEBUG(D_DLMTRACE, "### " format                              \
-                       " ns: \?\? lock: %p lrc: %d/%d,%d mode: %s/%s "        \
+                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "\
                        "res: \?\? rrc=\?\? type: \?\?\? remote: "LPX64")\n"   \
-                       , ## a, lock, lock->l_refc, lock->l_readers,           \
-                       lock->l_writers,                                       \
+                       , ## a, lock, lock->l_handle.h_cookie,                 \
+                       atomic_read(&lock->l_refc),                            \
+                       lock->l_readers, lock->l_writers,                      \
                        ldlm_lockname[lock->l_granted_mode],                   \
                        ldlm_lockname[lock->l_req_mode],                       \
-                       lock->l_remote_handle.addr);                           \
+                       lock->l_remote_handle.cookie);                         \
                 break;                                                        \
         }                                                                     \
         if (lock->l_resource->lr_type == LDLM_EXTENT) {                       \
                 CDEBUG(D_DLMTRACE, "### " format                              \
-                       " ns: %s lock: %p lrc: %d/%d,%d mode: %s/%s res: "LPU64   \
-                       "/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64"] remote: "  \
-                       LPX64"\n" , ## a,                                     \
+                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
+                       "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64\
+                       "] remote: "LPX64"\n" , ## a,                          \
                        lock->l_resource->lr_namespace->ns_name, lock,         \
-                       lock->l_refc, lock->l_readers, lock->l_writers,        \
+                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),   \
+                       lock->l_readers, lock->l_writers,                      \
                        ldlm_lockname[lock->l_granted_mode],                   \
                        ldlm_lockname[lock->l_req_mode],                       \
                        lock->l_resource->lr_name[0],                          \
@@ -269,22 +272,24 @@ do {                                                                          \
                        atomic_read(&lock->l_resource->lr_refcount),           \
                        ldlm_typename[lock->l_resource->lr_type],              \
                        lock->l_extent.start, lock->l_extent.end,              \
-                       lock->l_remote_handle.addr);                           \
+                       lock->l_remote_handle.cookie);                         \
                 break;                                                        \
         }                                                                     \
         {                                                                     \
                 CDEBUG(D_DLMTRACE, "### " format                              \
-                       " ns: %s lock: %p lrc: %d/%d,%d mode: %s/%s res: "LPU64   \
-                       "/"LPU64" rrc: %d type: %s remote: "LPX64"\n" , ## a,   \
-                       lock->l_resource->lr_namespace->ns_name, lock,         \
-                       lock->l_refc, lock->l_readers, lock->l_writers,        \
+                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "  \
+                       "res: "LPU64"/"LPU64" rrc: %d type: %s remote: "LPX64  \
+                       "\n" , ## a, lock->l_resource->lr_namespace->ns_name,  \
+                       lock, lock->l_handle.h_cookie,                         \
+                       atomic_read (&lock->l_refc),                           \
+                       lock->l_readers, lock->l_writers,                      \
                        ldlm_lockname[lock->l_granted_mode],                   \
                        ldlm_lockname[lock->l_req_mode],                       \
                        lock->l_resource->lr_name[0],                          \
                        lock->l_resource->lr_name[1],                          \
                        atomic_read(&lock->l_resource->lr_refcount),           \
                        ldlm_typename[lock->l_resource->lr_type],              \
-                       lock->l_remote_handle.addr);                           \
+                       lock->l_remote_handle.cookie);                         \
         }                                                                     \
 } while (0)
 
@@ -295,22 +300,25 @@ do {                                                                          \
  * Iterators.
  */
 
-#define LDLM_ITER_CONTINUE 0 /* keep iterating */
-#define LDLM_ITER_STOP     1 /* stop iterating */
+#define LDLM_ITER_CONTINUE 1 /* keep iterating */
+#define LDLM_ITER_STOP     0 /* stop iterating */
 
 typedef int (*ldlm_iterator_t)(struct ldlm_lock *, void *);
+typedef int (*ldlm_res_iterator_t)(struct ldlm_resource *, void *);
 
 int ldlm_resource_foreach(struct ldlm_resource *res, ldlm_iterator_t iter,
                           void *closure);
 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
                            void *closure);
+int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
+                               ldlm_res_iterator_t iter, void *closure);
 
 int ldlm_replay_locks(struct obd_import *imp);
 
 /* ldlm_extent.c */
 int ldlm_extent_compat(struct ldlm_lock *, struct ldlm_lock *);
-int ldlm_extent_policy(struct ldlm_lock *, void *, ldlm_mode_t, int flags,
-                       void *);
+int ldlm_extent_policy(struct ldlm_namespace *, struct ldlm_lock *, void *,
+                       ldlm_mode_t, int flags, void *);
 
 /* ldlm_lockd.c */
 int ldlm_handle_enqueue(struct ptlrpc_request *req);
@@ -319,19 +327,17 @@ int ldlm_handle_cancel(struct ptlrpc_request *req);
 int ldlm_del_waiting_lock(struct ldlm_lock *lock);
 
 /* ldlm_lock.c */
-void ldlm_register_intent(int (*arg)(struct ldlm_lock *lock, void *req_cookie,
-                                     ldlm_mode_t mode, int flags, void *data));
+void ldlm_register_intent(ldlm_res_policy arg);
 void ldlm_unregister_intent(void);
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh);
-struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int strict,
-                                     int flags);
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *, int flags);
 void ldlm_cancel_callback(struct ldlm_lock *);
 int ldlm_lock_set_data(struct lustre_handle *, void *data, int datalen);
 void ldlm_lock_remove_from_lru(struct ldlm_lock *);
 
 static inline struct ldlm_lock *ldlm_handle2lock(struct lustre_handle *h)
 {
-        return __ldlm_handle2lock(h, 1, 0);
+        return __ldlm_handle2lock(h, 0);
 }
 
 #define LDLM_LOCK_PUT(lock)                     \
@@ -363,18 +369,19 @@ ldlm_lock_create(struct ldlm_namespace *ns,
                  struct lustre_handle *parent_lock_handle,
                  __u64 *res_id, __u32 type, ldlm_mode_t mode, void *data,
                  __u32 data_len);
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock *lock, void *cookie,
-                               int cookie_len, int *flags,
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock *,
+                               void *cookie, int cookie_len, int *flags,
                                ldlm_completion_callback completion,
                                ldlm_blocking_callback blocking);
 struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                         int *flags);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
 void ldlm_cancel_locks_for_export(struct obd_export *export);
-void ldlm_run_ast_work(struct list_head *rpc_list);
+int ldlm_run_ast_work(struct list_head *rpc_list);
 void ldlm_reprocess_all(struct ldlm_resource *res);
-void ldlm_lock_dump(struct ldlm_lock *lock);
-void ldlm_lock_dump_handle(struct lustre_handle *);
+void ldlm_reprocess_all_ns(struct ldlm_namespace *ns);
+void ldlm_lock_dump(int level, struct ldlm_lock *lock);
+void ldlm_lock_dump_handle(int level, struct lustre_handle *);
 
 /* ldlm_test.c */
 int ldlm_test(struct obd_device *device, struct lustre_handle *connh);
@@ -406,9 +413,11 @@ void ldlm_res2desc(struct ldlm_resource *res, struct ldlm_resource_desc *desc);
 void ldlm_dump_all_namespaces(void);
 void ldlm_namespace_dump(struct ldlm_namespace *);
 void ldlm_resource_dump(struct ldlm_resource *);
-int ldlm_lock_change_resource(struct ldlm_lock *, __u64 new_resid[3]);
+int ldlm_lock_change_resource(struct ldlm_namespace *, struct ldlm_lock *,
+                              __u64 new_resid[3]);
 
 /* ldlm_request.c */
+int ldlm_expired_completion_wait(void *data);
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags);
 int ldlm_cli_enqueue(struct lustre_handle *conn,
                      struct ptlrpc_request *req,
index ba9555c..342721c 100644 (file)
 #include <linux/obd_filter.h>
 
 struct lov_export_data {
+        spinlock_t       led_lock;
         struct list_head led_open_head;
 };
 
+struct ost_export_data {
+        __u8 oed_uuid[37]; /* client UUID */
+};
+
 struct obd_export {
         __u64                     exp_cookie;
         struct list_head          exp_obd_chain;
@@ -32,12 +37,14 @@ struct obd_export {
                 struct mds_export_data    eu_mds_data;
                 struct filter_export_data eu_filter_data;
                 struct lov_export_data    eu_lov_data;
+                struct ost_export_data    eu_ost_data;
         } u;
 };
 
 #define exp_mds_data    u.eu_mds_data
 #define exp_lov_data    u.eu_lov_data
 #define exp_filter_data u.eu_filter_data
+#define exp_ost_data    u.eu_ost_data
 
 extern struct obd_export *class_conn2export(struct lustre_handle *conn);
 extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
index bfac4c3..87b0bf3 100644 (file)
@@ -52,10 +52,12 @@ int recovd_setup(struct recovd_obd *mgr);
 int recovd_cleanup(struct recovd_obd *mgr);
 
 extern struct recovd_obd *ptlrpc_recovd;
+struct ptlrpc_request;
 
 int ptlrpc_run_recovery_upcall(struct ptlrpc_connection *conn);
-int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc);
-int ptlrpc_replay(struct obd_import *imp, int send_last_flag);
+int ptlrpc_reconnect_import(struct obd_import *imp, int rq_opc,
+                            struct ptlrpc_request **reqptr);
+int ptlrpc_replay(struct obd_import *imp);
 int ptlrpc_resend(struct obd_import *imp);
 void ptlrpc_free_committed(struct obd_import *imp);
 void ptlrpc_wake_delayed(struct obd_import *imp);
index 0febd11..cc194ac 100644 (file)
@@ -72,6 +72,9 @@ typedef __u8 obd_uuid_t[37];
 #define LDLM_CB_REPLY_PORTAL       16
 #define LDLM_CANCEL_REQUEST_PORTAL     17
 #define LDLM_CANCEL_REPLY_PORTAL       18
+#define PTLBD_REQUEST_PORTAL           19
+#define PTLBD_REPLY_PORTAL             20
+#define PTLBD_BULK_PORTAL              21
 
 #define SVC_KILLED               1
 #define SVC_EVENT                2
@@ -126,9 +129,12 @@ struct lustre_msg {
 #define MSG_OP_FLAG_SHIFT  16
 
 /* Flags that apply to all requests are in the bottom 16 bits */
-#define MSG_GEN_FLAG_MASK  0x0000ffff
-#define MSG_LAST_REPLAY    1
-#define MSG_RESENT         2
+#define MSG_GEN_FLAG_MASK      0x0000ffff
+#define MSG_LAST_REPLAY        1
+#define MSG_RESENT             2
+
+/* XXX horrible interim hack -- see bug 578 */
+#define MSG_REPLAY_IN_PROGRESS 4
 
 static inline int lustre_msg_get_flags(struct lustre_msg *msg)
 {
@@ -231,13 +237,11 @@ struct lov_object_id { /* per-child structure */
 
 struct lov_mds_md {
         __u32 lmm_magic;
-        __u32 lmm_unused;          /* was packed size of extended attribute */
         __u64 lmm_object_id;       /* lov object id */
-        __u32 lmm_stripe_offset;   /* starting stripe offset in lmd_objects */
-        __u32 lmm_stripe_count;    /* number of stipes in use for this object */
-        __u64 lmm_stripe_size;     /* size of the stripe */
-        __u32 lmm_ost_count;       /* how many OST idx are in this LOV md */
-        __u32 lmm_stripe_pattern;  /* per-lov object stripe pattern */
+        __u32 lmm_stripe_size;     /* size of the stripe */
+        __u32 lmm_stripe_offset;   /* starting stripe offset in lmm_objects */
+        __u16 lmm_stripe_count;    /* number of stripes in use by this object */
+        __u16 lmm_ost_count;       /* how many OST idx are in this LOV md */
         struct lov_object_id lmm_objects[0];
 };
 
@@ -334,6 +338,7 @@ struct ost_body {
 #define MDS_GETSTATUS  9
 #define MDS_STATFS     10
 #define MDS_GETLOVINFO 11
+#define MDS_GETATTR_NAME 12
 
 #define REINT_SETATTR  1
 #define REINT_CREATE   2
@@ -549,4 +554,33 @@ struct ldlm_reply {
         __u64  lock_policy_res1;
         __u64  lock_policy_res2;
 };
+
+/*
+ * ptlbd, portal block device requests
+ */
+typedef enum {
+        PTLBD_QUERY = 200,
+        PTLBD_READ = 201,
+        PTLBD_WRITE = 202,
+} ptlbd_cmd_t;
+
+struct ptlbd_op {
+        __u16 op_cmd;
+        __u16 op_lun;
+        __u16 op_niob_cnt;
+        __u16 op__padding;
+        __u32 op_block_cnt;
+};
+
+struct ptlbd_niob {
+        __u64 n_xid;
+        __u64 n_block_nr;
+        __u32 n_offset;
+        __u32 n_length;
+};
+
+struct ptlbd_rsp {
+        __u16 r_status;
+        __u16 r_error_cnt;
+};
 #endif
index aa58c49..b1f9288 100644 (file)
@@ -53,10 +53,13 @@ struct ptlrpc_request;
 struct obd_device;
 struct recovd_data;
 struct recovd_obd;
+struct obd_export;
 #include <linux/lustre_ha.h>
 
 int target_handle_connect(struct ptlrpc_request *req);
 int target_handle_disconnect(struct ptlrpc_request *req);
+int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
+                            char *cluuid);
 int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
                        obd_uuid_t cluuid, struct recovd_obd *recovd,
                        ptlrpc_recovery_cb_t recover);
@@ -138,17 +141,6 @@ static inline void ldlm_object2handle(void *object, struct lustre_handle *handle
         handle->addr = (__u64)(unsigned long)object;
 }
 
-struct obd_statfs;
-struct statfs;
-void statfs_pack(struct obd_statfs *osfs, struct statfs *sfs);
-void statfs_unpack(struct statfs *sfs, struct obd_statfs *osfs);
-void obd_statfs_pack(struct obd_statfs *tgt, struct obd_statfs *src);
-static inline void
-obd_statfs_unpack(struct obd_statfs *tgt, struct obd_statfs *src)
-{
-        obd_statfs_pack(tgt, src);
-}
-
 #include <linux/portals_lib.h>
 
 /*
@@ -408,11 +400,13 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
         }
 
         if (data->ioc_inllen2) {
-                data->ioc_inlbuf2 = &data->ioc_bulk[0] + size_round(data->ioc_inllen1);
+                data->ioc_inlbuf2 = &data->ioc_bulk[0] +
+                        size_round(data->ioc_inllen1);
         }
 
         if (data->ioc_inllen3) {
-                data->ioc_inlbuf3 = &data->ioc_bulk[0] + size_round(data->ioc_inllen1) + 
+                data->ioc_inlbuf3 = &data->ioc_bulk[0] +
+                        size_round(data->ioc_inllen1) +
                         size_round(data->ioc_inllen2);
         }
 
@@ -426,7 +420,7 @@ static inline int obd_ioctl_getdata(char **buf, int *len, void *arg)
 #define OBD_IOC_CLEANUP                _IO  ('f', 103      )
 #define OBD_IOC_DESTROY                _IOW ('f', 104, long)
 #define OBD_IOC_PREALLOCATE            _IOWR('f', 105, long)
-#define OBD_IOC_DEC_USE_COUNT          _IO  ('f', 106      )
+
 #define OBD_IOC_SETATTR                _IOW ('f', 107, long)
 #define OBD_IOC_GETATTR                _IOR ('f', 108, long)
 #define OBD_IOC_READ                   _IOWR('f', 109, long)
index a965bcb..deb9656 100644 (file)
@@ -224,7 +224,7 @@ struct ldlm_lock;
 int ll_lock_callback(struct ldlm_lock *, struct ldlm_lock_desc *, void *data,
                      __u32 data_len, int flag);
 int ll_size_lock(struct inode *, struct lov_stripe_md *, obd_off start,
-                 int mode, struct lustre_handle **);
+                 int mode, struct lustre_handle *);
 int ll_size_unlock(struct inode *, struct lov_stripe_md *, int mode,
                    struct lustre_handle *);
 int ll_file_size(struct inode *inode, struct lov_stripe_md *md);
index 558c10b..7a02dae 100644 (file)
@@ -155,7 +155,7 @@ int mds_pack_md(struct mds_obd *mds, struct ptlrpc_request *req,
 
 /* mds/mds_fs.c */
 int mds_fs_setup(struct obd_device *obddev, struct vfsmount *mnt);
-void mds_fs_cleanup(struct obd_device *obddev);
+int mds_fs_cleanup(struct obd_device *obddev);
 
 /* mdc/mdc_request.c */
 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
@@ -167,8 +167,11 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
                    struct ptlrpc_request **request);
 int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid);
 int mdc_getattr(struct lustre_handle *conn,
-                obd_id ino, int type, unsigned long valid, size_t ea_size,
+                obd_id ino, int type, unsigned long valid, unsigned int ea_size,
                 struct ptlrpc_request **request);
+int mdc_getattr_name(struct lustre_handle *conn, struct inode *parent,
+                     char *filename, int namelen, unsigned long valid,
+                     unsigned int ea_size, struct ptlrpc_request **request);
 int mdc_setattr(struct lustre_handle *conn,
                 struct inode *, struct iattr *iattr, struct ptlrpc_request **);
 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
index 142db3b..081492c 100644 (file)
 
 #define LDLM_NUM_THREADS        4
 #define LDLM_NEVENTS    1024
-#define LDLM_NBUFS      20
-#define LDLM_BUFSIZE    (32 * 1024)
+#define LDLM_NBUFS      100
+#define LDLM_BUFSIZE    (8 * 1024)
 #define LDLM_MAXREQSIZE 1024
 
 #define MDT_NUM_THREADS 8
 #define MDS_NEVENTS     1024
-#define MDS_NBUFS       20
-#define MDS_BUFSIZE     (32 * 1024)
+#define MDS_NBUFS       100
+#define MDS_BUFSIZE     (8 * 1024)
 #define MDS_MAXREQSIZE  1024
 
 #define OST_NUM_THREADS 6
 #define OST_NEVENTS     min(num_physpages / 16, 32768UL)
-#define OST_NBUFS       min(OST_NEVENTS / 128, 256UL)
-#define OST_BUFSIZE     ((OST_NEVENTS > 4096UL ? 128 : 32) * 1024)
+#define OST_NBUFS       min(OST_NEVENTS / 128, 1280UL)
+#define OST_BUFSIZE     ((OST_NEVENTS > 4096UL ? 32 : 8) * 1024)
 #define OST_MAXREQSIZE  (8 * 1024)
 
+#define PTLBD_NUM_THREADS        4
+#define PTLBD_NEVENTS    1024
+#define PTLBD_NBUFS      20
+#define PTLBD_BUFSIZE    (32 * 1024)
+#define PTLBD_MAXREQSIZE 1024
+
 #define CONN_INVALID 1
 
 struct ptlrpc_connection {
@@ -137,7 +143,6 @@ struct ptlrpc_request {
         __u64 rq_xid;
 
         int rq_level;
-        time_t rq_timeout;
         //        void * rq_reply_handle;
         wait_queue_head_t rq_wait_for_rep;
 
@@ -160,13 +165,14 @@ struct ptlrpc_request {
 #define DEBUG_REQ(level, req, fmt, args...)                                    \
 do {                                                                           \
 CDEBUG(level,                                                                  \
-       "@@@ " fmt " req x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d ref %d fl "    \
-       "%x\n" ,  ## args, req->rq_xid, req->rq_transno,                        \
+       "@@@ " fmt " req@%p x"LPD64"/t"LPD64" o%d->%s:%d lens %d/%d ref %d fl " \
+       "%x\n" ,  ## args, req, req->rq_xid, req->rq_reqmsg->transno,           \
        req->rq_reqmsg ? req->rq_reqmsg->opc : -1,                              \
        req->rq_connection ? (char *)req->rq_connection->c_remote_uuid : "<?>", \
        (req->rq_import && req->rq_import->imp_client) ?                        \
            req->rq_import->imp_client->cli_request_portal : -1,                \
-       req->rq_reqlen, req->rq_replen, req->rq_refcount, req->rq_flags);       \
+       req->rq_reqlen, req->rq_replen,                                         \
+       atomic_read (&req->rq_refcount), req->rq_flags);                        \
 } while (0)
 
 struct ptlrpc_bulk_page {
index 9612846..94ffd4f 100644 (file)
 
 struct lov_oinfo { /* per-child structure */
         __u64 loi_id;              /* object ID on the target OST */
-        struct lustre_handle *loi_handle; /* handle for object on OST */
+        struct lustre_handle *loi_handle; /* open file handle for obj on OST */
         int loi_ost_idx;           /* OST stripe index in lmd_objects array */
 };
 
 struct lov_stripe_md {
-        __u32 lsm_magic;
         __u64 lsm_object_id;       /* lov object id */
-        __u64 lsm_stripe_size;     /* size of the stripe */
-        __u32 lsm_stripe_pattern;  /* per-lov object stripe pattern */
+        __u32 lsm_magic;
+        __u32 lsm_stripe_size;     /* size of the stripe */
         int   lsm_stripe_offset;   /* offset of first stripe in lmd_objects */
         int   lsm_stripe_count;    /* how many objects are being striped on */
         struct lov_oinfo lsm_oinfo[0];
 };
 
+#define IOC_OSC_TYPE         'h'
+#define IOC_OSC_MIN_NR       20
+#define IOC_OSC_REGISTER_LOV _IOWR(IOC_OSC_TYPE, 20, struct obd_device *)
+#define IOC_OSC_MAX_NR       50
+
+#define IOC_MDC_TYPE         'i'
+#define IOC_MDC_MIN_NR       20
+#define IOC_MDC_LOOKUP       _IOWR(IOC_MDC_TYPE, 20, struct obd_device *)
+#define IOC_MDC_MAX_NR       50
+
 #ifdef __KERNEL__
 # include <linux/fs.h>
 # include <linux/list.h>
@@ -46,9 +55,9 @@ struct obd_type {
 };
 
 struct brw_page {
-        struct page *pg;
-        obd_size count;
         obd_off  off;
+        struct page *pg;
+        int count;
         obd_flag flag;
 };
 
@@ -95,6 +104,7 @@ struct filter_obd {
         struct dentry *fo_dentry_O_mode[16];
         spinlock_t fo_objidlock;        /* protects fo_lastobjid increment */
         __u64 fo_lastobjid;
+        __u64 fo_last_committed;
         struct file_operations *fo_fop;
         struct inode_operations *fo_iop;
         struct address_space_operations *fo_aops;
@@ -115,11 +125,6 @@ struct client_obd {
         struct obd_device   *cl_containing_lov;
 };
 
-#define IOC_OSC_TYPE         'h'
-#define IOC_OSC_MIN_NR       20
-#define IOC_OSC_REGISTER_LOV _IOWR('h', 20, struct obd_device *)
-#define IOC_OSC_MAX_NR       50
-
 struct mds_obd {
         struct ptlrpc_service           *mds_service;
 
@@ -146,6 +151,9 @@ struct mds_obd {
         struct list_head                 mds_delayed_reply_queue;
         spinlock_t                       mds_processing_task_lock;
         pid_t                            mds_processing_task;
+
+        int                              mds_has_lov_desc;
+        struct lov_desc                  mds_lov_desc;
 };
 
 struct ldlm_obd {
@@ -169,6 +177,19 @@ struct echo_obd {
         atomic_t eo_write;
 };
 
+/*
+ * this struct does double-duty acting as either a client or
+ * server instance .. maybe not wise.
+ */
+struct ptlbd_obd {
+        /* server's */
+        struct ptlrpc_service *ptlbd_service;
+        /* client's */
+        struct ptlrpc_client bd_client;
+        struct obd_import bd_import;
+        int refcount; /* XXX sigh */
+};
+
 struct recovd_obd {
         spinlock_t            recovd_lock;
         struct list_head      recovd_managed_items; /* items managed  */
@@ -202,6 +223,11 @@ struct echo_client_obd {
         struct lustre_handle conn;   /* the local connection to osc/lov */
 };
 
+struct cache_obd {
+        struct lustre_handle cobd_target;       /* local connection to target obd */
+        struct lustre_handle cobd_cache;        /* local connection to cache obd */
+};
+
 struct lov_tgt_desc {
         obd_uuid_t uuid;
         struct lustre_handle conn;
@@ -260,6 +286,8 @@ struct obd_device {
                 struct recovd_obd recovd;
                 struct trace_obd trace;
                 struct lov_obd lov;
+                struct cache_obd cobd;
+                struct ptlbd_obd ptlbd;
 #if 0
                 struct snap_obd snap;
 #endif
@@ -270,6 +298,7 @@ struct obd_device {
 };
 
 struct obd_ops {
+        struct module *o_owner;
         int (*o_iocontrol)(unsigned int cmd, struct lustre_handle *, int len,
                            void *karg, void *uarg);
         int (*o_get_info)(struct lustre_handle *, obd_count keylen, void *key,
diff --git a/lustre/include/linux/obd_cache.h b/lustre/include/linux/obd_cache.h
new file mode 100644 (file)
index 0000000..e75b9f4
--- /dev/null
@@ -0,0 +1,13 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ */
+
+#ifndef _OBD_CACHE_H__
+#define _OBD_CACHE_H__
+
+#ifdef __KERNEL__
+
+#define OBD_CACHE_DEVICENAME "cobd"
+
+#endif
+#endif
index 197de84..ed3eb99 100644 (file)
@@ -673,8 +673,6 @@ static inline void iattr_from_obdo(struct iattr *attr, struct obdo *oa,
 static inline void obdo_from_inode(struct obdo *dst, struct inode *src,
                                    obd_flag valid)
 {
-//        if (valid & OBD_MD_FLID)
-//                dst->o_id = src->i_ino;
         if (valid & OBD_MD_FLATIME)
                 dst->o_atime = src->i_atime;
         if (valid & OBD_MD_FLMTIME)
@@ -710,8 +708,8 @@ static inline void obdo_from_inode(struct obdo *dst, struct inode *src,
 static inline void obdo_to_inode(struct inode *dst, struct obdo *src,
                                  obd_flag valid)
 {
-//        if (valid & OBD_MD_FLID)
-//                dst->i_ino = src->o_id;
+        valid &= src->o_valid;
+
         if (valid & OBD_MD_FLATIME)
                 dst->i_atime = src->o_atime;
         if (valid & OBD_MD_FLMTIME)
@@ -847,7 +845,8 @@ int class_name2dev(char *name);
 int class_uuid2dev(char *uuid);
 struct obd_device *class_uuid2obd(char *uuid);
 struct obd_export *class_new_export(struct obd_device *obddev);
-struct obd_type *class_nm_to_type(char* name);
+struct obd_type *class_get_type(char *name);
+void class_put_type(struct obd_type *type);
 void class_destroy_export(struct obd_export *exp);
 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
                   obd_uuid_t cluuid);
@@ -866,6 +865,13 @@ static inline struct ptlrpc_connection *class_rd2conn(struct recovd_data *rd)
         return list_entry(rd, struct ptlrpc_connection, c_recovd_data);
 }
 
+struct obd_statfs;
+struct statfs;
+void statfs_pack(struct obd_statfs *osfs, struct statfs *sfs);
+void statfs_unpack(struct statfs *sfs, struct obd_statfs *osfs);
+void obd_statfs_pack(struct obd_statfs *tgt, struct obd_statfs *src);
+void obd_statfs_unpack(struct obd_statfs *tgt, struct obd_statfs *src);
+
 #endif
 
 /* sysctl.c */
index e999451..5de0a25 100644 (file)
@@ -36,7 +36,7 @@
 void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
                      __u32 xid);
 void ost_unpack_niobuf(void **tmp, struct niobuf_remote **nbp);
-void ost_pack_ioo(void **tmp, struct lov_stripe_md *oa, int bufcnt);
-void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop);
+void ost_pack_ioo(struct obd_ioobj **ioop, struct lov_stripe_md *oa,int bufcnt);
+void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop);
 
 #endif
diff --git a/lustre/include/linux/obd_ptlbd.h b/lustre/include/linux/obd_ptlbd.h
new file mode 100644 (file)
index 0000000..b4f9fe9
--- /dev/null
@@ -0,0 +1,30 @@
+#ifndef _OBD_PTLBD_H
+#define _OBD_PTLBD_H
+
+#include <linux/lustre_idl.h>
+/*
+ * Copyright (C) 2002  Cluster File Systems, Inc.
+ *
+ * This code is issued under the GNU General Public License.
+ * See the file COPYING in this distribution
+ */
+
+#define OBD_PTLBD_SV_DEVICENAME "ptlbd_server"
+#define OBD_PTLBD_CL_DEVICENAME "ptlbd_client"
+
+/* XXX maybe this isn't the best header to be dumping all this in.. */
+
+extern int ptlbd_blk_init(void);
+extern int ptlbd_cl_init(void);
+extern int ptlbd_sv_init(void);
+
+extern void ptlbd_blk_exit(void);
+extern void ptlbd_cl_exit(void);
+extern void ptlbd_sv_exit(void);
+
+extern void ptlbd_blk_register(struct ptlbd_obd *ptlbd);
+extern int ptlbd_send_req(struct ptlbd_obd *, ptlbd_cmd_t cmd, 
+               struct buffer_head *);
+extern int ptlbd_parse_req(struct ptlrpc_request *req);
+
+#endif
index f6c2770..e3e23f4 100644 (file)
@@ -31,6 +31,7 @@
 
 /* global variables */
 extern atomic_t obd_memory;
+extern int obd_memmax;
 extern unsigned long obd_fail_loc;
 extern unsigned long obd_timeout;
 extern char obd_recovery_upcall[128];
@@ -66,6 +67,7 @@ extern char obd_recovery_upcall[128];
 #define OBD_FAIL_MDS_GETSTATUS_PACK      0x11c
 #define OBD_FAIL_MDS_STATFS_PACK         0x11d
 #define OBD_FAIL_MDS_STATFS_NET          0x11e
+#define OBD_FAIL_MDS_GETATTR_NAME_NET    0x11f
 
 #define OBD_FAIL_OST                     0x200
 #define OBD_FAIL_OST_CONNECT_NET         0x201
@@ -156,13 +158,17 @@ do {                                                                    \
         int s = (size);                                                 \
         (ptr) = lptr = kmalloc(s, GFP_KERNEL);                          \
         if (lptr == NULL) {                                             \
-                CERROR("kmalloc of '" #ptr "' (%ld bytes) failed "      \
+                CERROR("kmalloc of '" #ptr "' (%d bytes) failed "       \
                        "at %s:%d\n", s, __FILE__, __LINE__);            \
         } else {                                                        \
+                int obd_curmem;                                         \
                 memset(lptr, 0, s);                                     \
                 atomic_add(s, &obd_memory);                             \
+                obd_curmem = atomic_read(&obd_memory);                  \
+                if (obd_curmem > obd_memmax)                            \
+                        obd_memmax = obd_curmem;                        \
                 CDEBUG(D_MALLOC, "kmalloced '" #ptr "': %d at %p "      \
-                       "(tot %d)\n", s, lptr, atomic_read(&obd_memory));\
+                       "(tot %d)\n", s, lptr, obd_curmem);              \
         }                                                               \
 } while (0)
 
index 680d4f0..e8ffd5b 100644 (file)
@@ -65,6 +65,10 @@ void l_lock(struct lustre_lock *lock)
                 owner = 1;
         spin_unlock(&lock->l_spin);
 
+        /* This is safe to increment outside the spinlock because we
+         * can only have 1 CPU running on the current task
+         * (i.e. l_owner == current), regardless of the number of CPUs.
+         */
         if (owner) {
                 ++lock->l_depth;
         } else {
index 468eb2b..ae1153f 100644 (file)
@@ -67,7 +67,8 @@ static void policy_internal(struct list_head *queue, struct ldlm_extent *req_ex,
 }
 
 /* apply the internal policy by walking all the lists */
-int ldlm_extent_policy(struct ldlm_lock *lock, void *req_cookie,
+int ldlm_extent_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+                       void *req_cookie,
                        ldlm_mode_t mode, int flags, void *data)
 {
         struct ldlm_resource *res = lock->l_resource;
@@ -79,11 +80,11 @@ int ldlm_extent_policy(struct ldlm_lock *lock, void *req_cookie,
         if (!res)
                 LBUG();
 
-        l_lock(&res->lr_namespace->ns_lock);
+        l_lock(&ns->ns_lock);
         policy_internal(&res->lr_granted, req_ex, &new_ex, mode);
         policy_internal(&res->lr_converting, req_ex, &new_ex, mode);
         policy_internal(&res->lr_waiting, req_ex, &new_ex, mode);
-        l_unlock(&res->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
 
         memcpy(&lock->l_extent, &new_ex, sizeof(new_ex));
 
index 81b3b5d..a1220ab 100644 (file)
@@ -25,7 +25,6 @@
 
 #include <linux/slab.h>
 #include <linux/module.h>
-#include <linux/random.h>
 #include <linux/lustre_dlm.h>
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
@@ -102,12 +101,13 @@ ldlm_res_compat ldlm_res_compat_table[] = {
 
 static ldlm_res_policy ldlm_intent_policy_func;
 
-static int ldlm_plain_policy(struct ldlm_lock *lock, void *req_cookie,
-                             ldlm_mode_t mode, int flags, void *data)
+static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+                             void *req_cookie, ldlm_mode_t mode, int flags,
+                             void *data)
 {
         if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
-                return ldlm_intent_policy_func(lock, req_cookie, mode, flags, 
-                                               data);
+                return ldlm_intent_policy_func(ns, lock, req_cookie, mode,
+                                               flags, data);
         }
 
         return ELDLM_OK;
@@ -186,6 +186,8 @@ void ldlm_lock_remove_from_lru(struct ldlm_lock *lock)
         EXIT;
 }
 
+/* Only called with strict == 0 by recovery, to mark in-use locks as
+ * should-be-destroyed */
 void ldlm_lock_destroy(struct ldlm_lock *lock)
 {
         ENTRY;
@@ -194,16 +196,16 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
         if (!list_empty(&lock->l_children)) {
                 LDLM_DEBUG(lock, "still has children (%p)!",
                            lock->l_children.next);
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_ERROR, lock);
                 LBUG();
         }
         if (lock->l_readers || lock->l_writers) {
                 LDLM_DEBUG(lock, "lock still has references");
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_OTHER, lock);
         }
 
         if (!list_empty(&lock->l_res_link)) {
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_ERROR, lock);
                 LBUG();
         }
 
@@ -217,6 +219,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock)
 
         list_del_init(&lock->l_export_chain);
         ldlm_lock_remove_from_lru(lock);
+        portals_handle_unhash(&lock->l_handle);
 
 #if 0
         /* Wake anyone waiting for this lock */
@@ -257,7 +260,6 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
         if (lock == NULL)
                 RETURN(NULL);
 
-        get_random_bytes(&lock->l_random, sizeof(__u64));
         lock->l_resource = ldlm_resource_getref(resource);
 
         atomic_set(&lock->l_refc, 2);
@@ -279,12 +281,15 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
                 l_unlock(&parent->l_resource->lr_namespace->ns_lock);
         }
 
+        INIT_LIST_HEAD(&lock->l_handle.h_link);
+        portals_handle_hash(&lock->l_handle, lock_handle_addref);
+
         RETURN(lock);
 }
 
-int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+                              __u64 new_resid[3])
 {
-        struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
         struct ldlm_resource *oldres = lock->l_resource;
         ENTRY;
 
@@ -321,66 +326,63 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
 
 void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
 {
-        lockh->addr = (__u64) (unsigned long)lock;
-        lockh->cookie = lock->l_random;
+        //lockh->addr = (__u64)(unsigned long)lock;
+        memset(&lockh->addr, 0x69, sizeof(lockh->addr));
+        lockh->cookie = lock->l_handle.h_cookie;
 }
 
-/* 
- * if flags: atomically get the lock and set the flags. 
- * Return NULL if flag already set
+/* If flags is non-zero: atomically get the lock and set the flags.
+ *           Return NULL if any of those flags is already set.
  */
 
-struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int strict,
-                                     int flags)
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
 {
         struct ldlm_lock *lock = NULL, *retval = NULL;
         ENTRY;
 
         LASSERT(handle);
 
-        if (!handle->addr)
+        lock = portals_handle2object(handle->cookie);
+        if (lock == NULL)
                 RETURN(NULL);
 
-        lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
-        if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
-                //CERROR("bogus lock %p\n", lock);
-                GOTO(out2, retval);
-        }
-
-        if (lock->l_random != handle->cookie) {
-                //CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64
-                //       "\n", lock, lock->l_random, handle->cookie);
-                GOTO(out2, NULL);
-        }
-        if (!lock->l_resource) {
-                CERROR("trying to lock bogus resource: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out2, retval);
-        }
-        if (!lock->l_resource->lr_namespace) {
-                CERROR("trying to lock bogus namespace: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out2, retval);
-        }
+        LASSERT(lock->l_resource != NULL);
+        LASSERT(lock->l_resource->lr_namespace != NULL);
 
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (strict && lock->l_destroyed) {
+
+        /* It's unlikely but possible that someone marked the lock as
+         * destroyed after we did handle2object on it */
+        if (lock->l_destroyed) {
                 CERROR("lock already destroyed: lock %p\n", lock);
-                //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
-                GOTO(out, NULL);
+                LDLM_LOCK_PUT(lock);
+                GOTO(out, retval);
         }
 
-        if (flags && (lock->l_flags & flags))
-                GOTO(out, NULL);
+        if (flags && (lock->l_flags & flags)) {
+                LDLM_LOCK_PUT(lock);
+                GOTO(out, retval);
+        }
 
         if (flags)
                 lock->l_flags |= flags;
 
-        retval = LDLM_LOCK_GET(lock);
+        retval = lock;
         EXIT;
  out:
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- out2:
+        return retval;
+}
+
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+                                      struct lustre_handle *handle)
+{
+        struct ldlm_lock *retval = NULL;
+
+        l_lock(&ns->ns_lock);
+        retval = __ldlm_handle2lock(handle, 0);
+        l_unlock(&ns->ns_lock);
+
         return retval;
 }
 
@@ -456,7 +458,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
 {
-        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0, 0);
+        struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
         struct ldlm_namespace *ns;
         ENTRY;
 
@@ -466,10 +468,13 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
         LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
         ns = lock->l_resource->lr_namespace;
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
-        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+        if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+                LASSERT(lock->l_readers > 0);
                 lock->l_readers--;
-        else
+        } else {
+                LASSERT(lock->l_writers > 0);
                 lock->l_writers--;
+        }
 
         /* If we received a blocked AST and this was the last reference,
          * run the callback. */
@@ -493,8 +498,9 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
                 ns->ns_nr_unused++;
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
                 ldlm_cancel_lru(ns);
-        } else
+        } else {
                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        }
 
         LDLM_LOCK_PUT(lock);    /* matches the ldlm_lock_get in addref */
         LDLM_LOCK_PUT(lock);    /* matches the handle2lock above */
@@ -711,8 +717,8 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         return lock;
 }
 
-/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
+                               struct ldlm_lock *lock,
                                void *cookie, int cookie_len,
                                int *flags,
                                ldlm_completion_callback completion,
@@ -734,7 +740,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
         if (!local && !(*flags & LDLM_FL_REPLAY) &&
             (policy = ldlm_res_policy_table[res->lr_type])) {
                 int rc;
-                rc = policy(lock, cookie, lock->l_req_mode, *flags, NULL);
+                rc = policy(ns, lock, cookie, lock->l_req_mode, *flags, NULL);
 
                 if (rc == ELDLM_LOCK_CHANGED) {
                         res = lock->l_resource;
@@ -745,7 +751,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                 }
         }
 
-        l_lock(&res->lr_namespace->ns_lock);
+        l_lock(&ns->ns_lock);
         if (local && lock->l_req_mode == lock->l_granted_mode) {
                 /* The server returned a blocked lock, but it was granted before
                  * we got a chance to actually enqueue it.  We don't need to do
@@ -767,7 +773,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
          * FIXME (bug 268): Detect obvious lies by checking compatibility in
          * granted/converting queues. */
         ldlm_resource_unlink_lock(lock);
-        if (local || (*flags & LDLM_FL_REPLAY)) {
+        if (local) {
                 if (*flags & LDLM_FL_BLOCK_CONV)
                         ldlm_resource_add_lock(res, res->lr_converting.prev,
                                                lock);
@@ -776,6 +782,19 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
                 else
                         ldlm_grant_lock(lock);
                 GOTO(out, ELDLM_OK);
+        } else if (*flags & LDLM_FL_REPLAY) {
+                if (*flags & LDLM_FL_BLOCK_CONV) {
+                        ldlm_resource_add_lock(res, res->lr_converting.prev,
+                                               lock);
+                        GOTO(out, ELDLM_OK);
+                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
+                        ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+                        GOTO(out, ELDLM_OK);
+                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
+                        ldlm_grant_lock(lock);
+                        GOTO(out, ELDLM_OK);
+                }
+                /* If no flags, fall through to normal enqueue path. */
         }
 
         /* FIXME: We may want to optimize by checking lr_most_restr */
@@ -798,7 +817,7 @@ ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
         ldlm_grant_lock(lock);
         EXIT;
       out:
-        l_unlock(&res->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
         /* Don't set 'completion_ast' until here so that if the lock is granted
          * immediately we don't do an unnecessary completion call. */
         lock->l_completion_ast = completion;
@@ -828,10 +847,10 @@ static int ldlm_reprocess_queue(struct ldlm_resource *res,
         RETURN(0);
 }
 
-void ldlm_run_ast_work(struct list_head *rpc_list)
+int ldlm_run_ast_work(struct list_head *rpc_list)
 {
         struct list_head *tmp, *pos;
-        int rc;
+        int rc, retval = 0;
         ENTRY;
 
         list_for_each_safe(tmp, pos, rpc_list) {
@@ -844,20 +863,34 @@ void ldlm_run_ast_work(struct list_head *rpc_list)
                                  w->w_datalen, LDLM_CB_BLOCKING);
                 else
                         rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
-                if (rc)
+                if (rc == -ERESTART)
+                        retval = rc;
+                else if (rc)
                         CERROR("Failed AST - should clean & disconnect "
                                "client\n");
                 LDLM_LOCK_PUT(w->w_lock);
                 list_del(&w->w_list);
                 OBD_FREE(w, sizeof(*w));
         }
-        EXIT;
+        RETURN(retval);
+}
+
+/* Per-resource iterator callback used by ldlm_reprocess_all_ns(): calls
+ * ldlm_reprocess_all() on @res.  @closure is unused.  Always returns
+ * LDLM_ITER_CONTINUE, so this callback never asks the walk to stop. */
+static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
+{
+        ldlm_reprocess_all(res);
+        return LDLM_ITER_CONTINUE;
+}
+
+/* Run ldlm_reprocess_all() on every resource in namespace @ns by walking it
+ * with ldlm_namespace_foreach_res().  The walk's return value is deliberately
+ * discarded.  @ns must not be NULL: the walk locks ns->ns_lock without
+ * checking it.
+ *
+ * NOTE(review): ldlm_namespace_foreach_res() invokes the callback with
+ * ns->ns_lock taken, and ldlm_reprocess_all() goes on to run AST work --
+ * confirm that running ASTs under the (presumably recursive) namespace lock
+ * is intended. */
+void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
+{
+        (void)ldlm_namespace_foreach_res(ns, reprocess_one_queue, NULL);
 }
 
 /* Must be called with resource->lr_lock not taken. */
 void ldlm_reprocess_all(struct ldlm_resource *res)
 {
         struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+        int rc;
         ENTRY;
 
         /* Local lock trees don't get reprocessed. */
@@ -866,6 +899,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 return;
         }
 
+ restart:
         l_lock(&res->lr_namespace->ns_lock);
         res->lr_tmp = &rpc_list;
 
@@ -876,7 +910,9 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
         res->lr_tmp = NULL;
         l_unlock(&res->lr_namespace->ns_lock);
 
-        ldlm_run_ast_work(&rpc_list);
+        rc = ldlm_run_ast_work(&rpc_list);
+        if (rc == -ERESTART)
+                goto restart;
         EXIT;
 }
 
@@ -905,10 +941,12 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ns = res->lr_namespace;
 
         l_lock(&ns->ns_lock);
+        /* Please do not, no matter how tempting, remove this LBUG without
+         * talking to me first. -phik */
         if (lock->l_readers || lock->l_writers) {
                 LDLM_DEBUG(lock, "lock still has references");
-                ldlm_lock_dump(lock);
-                //LBUG();
+                ldlm_lock_dump(D_OTHER, lock);
+                LBUG();
         }
 
         ldlm_cancel_callback(lock);
@@ -1001,18 +1039,18 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
         RETURN(res);
 }
 
-void ldlm_lock_dump(struct ldlm_lock *lock)
+void ldlm_lock_dump(int level, struct ldlm_lock *lock)
 {
         char ver[128];
 
-        if (!(portal_debug & D_OTHER))
+        if (!(portal_debug & level))
                 return;
 
         if (RES_VERSION_SIZE != 4)
                 LBUG();
 
         if (!lock) {
-                CDEBUG(D_OTHER, "  NULL LDLM lock\n");
+                CDEBUG(level, "  NULL LDLM lock\n");
                 return;
         }
 
@@ -1020,27 +1058,26 @@ void ldlm_lock_dump(struct ldlm_lock *lock)
                  lock->l_version[0], lock->l_version[1],
                  lock->l_version[2], lock->l_version[3]);
 
-        CDEBUG(D_OTHER, "  -- Lock dump: %p (%s)\n", lock, ver);
+        CDEBUG(level, "  -- Lock dump: %p (%s)\n", lock, ver);
         if (lock->l_export && lock->l_export->exp_connection)
-                CDEBUG(D_OTHER, "  Node: NID %x (rhandle: "LPX64")\n",
+                CDEBUG(level, "  Node: NID %x (rhandle: "LPX64")\n",
                        lock->l_export->exp_connection->c_peer.peer_nid,
-                       lock->l_remote_handle.addr);
+                       lock->l_remote_handle.cookie);
         else
-                CDEBUG(D_OTHER, "  Node: local\n");
-        CDEBUG(D_OTHER, "  Parent: %p\n", lock->l_parent);
-        CDEBUG(D_OTHER, "  Resource: %p ("LPD64")\n", lock->l_resource,
+                CDEBUG(level, "  Node: local\n");
+        CDEBUG(level, "  Parent: %p\n", lock->l_parent);
+        CDEBUG(level, "  Resource: %p ("LPD64")\n", lock->l_resource,
                lock->l_resource->lr_name[0]);
-        CDEBUG(D_OTHER, "  Requested mode: %d, granted mode: %d\n",
+        CDEBUG(level, "  Requested mode: %d, granted mode: %d\n",
                (int)lock->l_req_mode, (int)lock->l_granted_mode);
-        CDEBUG(D_OTHER, "  Readers: %u ; Writers; %u\n",
+        CDEBUG(level, "  Readers: %u ; Writers; %u\n",
                lock->l_readers, lock->l_writers);
         if (lock->l_resource->lr_type == LDLM_EXTENT)
-                CDEBUG(D_OTHER, "  Extent: %Lu -> %Lu\n",
-                       (unsigned long long)lock->l_extent.start,
-                       (unsigned long long)lock->l_extent.end);
+                CDEBUG(level, "  Extent: "LPU64" -> "LPU64"\n",
+                       lock->l_extent.start, lock->l_extent.end);
 }
 
-void ldlm_lock_dump_handle(struct lustre_handle *lockh)
+/* Dump the lock referenced by @lockh at debug level @level.
+ *
+ * Does nothing if the handle does not resolve to a lock.  The reference
+ * obtained by the handle lookup is dropped again before returning. */
+void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
 
@@ -1048,7 +1085,7 @@ void ldlm_lock_dump_handle(struct lustre_handle *lockh)
         if (lock == NULL)
                 return;
 
-        ldlm_lock_dump(lock);
+        /* Honour the caller's level (e.g. D_ERROR from the cancel handler)
+         * rather than hard-coding D_OTHER. */
+        ldlm_lock_dump(level, lock);
 
         LDLM_LOCK_PUT(lock);
 }
index b76fbcd..d826db1 100644 (file)
@@ -57,6 +57,7 @@ static void waiting_locks_callback(unsigned long unused)
                                                  l_pending_chain);
                 if (l->l_callback_timeout > jiffies)
                         break;
+                CERROR("lock timer expired, lock %p\n", l);
                 LDLM_DEBUG(l, "timer expired, recovering exp %p on conn %p",
                            l->l_export, l->l_export->exp_connection);
                 recovd_conn_fail(l->l_export->exp_connection);
@@ -162,14 +163,26 @@ static int ldlm_server_blocking_ast(struct ldlm_lock *lock,
         memcpy(&body->lock_desc, desc, sizeof(*desc));
 
         LDLM_DEBUG(lock, "server preparing blocking AST");
-        req->rq_replen = 0; /* no reply needed */
+        req->rq_replen = lustre_msg_size(0, NULL);
 
         ldlm_add_waiting_lock(lock);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        (void)ptl_send_rpc(req);
+        req->rq_level = LUSTRE_CONN_RECOVD;
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_expired_completion_wait(lock);
+        } else if (rc) {
+                CERROR("client returned %d from blocking AST for lock %p\n",
+                       req->rq_status, lock);
+                LDLM_DEBUG(lock, "client returned error %d from blocking AST",
+                           req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
 
-        /* not waiting for reply */
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -199,11 +212,22 @@ static int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags)
         ldlm_lock2desc(lock, &body->lock_desc);
 
         LDLM_DEBUG(lock, "server preparing completion AST");
-        req->rq_replen = 0; /* no reply needed */
-
-        (void)ptl_send_rpc(req);
-
-        /* not waiting for reply */
+        req->rq_replen = lustre_msg_size(0, NULL);
+
+        req->rq_level = LUSTRE_CONN_RECOVD;
+        rc = ptlrpc_queue_wait(req);
+        if (rc == -ETIMEDOUT || rc == -EINTR) {
+                ldlm_expired_completion_wait(lock);
+        } else if (rc) {
+                CERROR("client returned %d from completion AST for lock %p\n",
+                       req->rq_status, lock);
+                LDLM_DEBUG(lock, "client returned error %d from completion AST",
+                           req->rq_status);
+                ldlm_lock_cancel(lock);
+                /* Server-side AST functions are called from ldlm_reprocess_all,
+                 * which needs to be told to please restart its reprocessing. */
+                rc = -ERESTART;
+        }
         ptlrpc_req_finished(req);
 
         RETURN(rc);
@@ -265,8 +289,8 @@ int ldlm_handle_enqueue(struct ptlrpc_request *req)
                  &lock->l_export->exp_ldlm_data.led_held_locks);
         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
 
-        err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags,
-                                ldlm_server_completion_ast,
+        err = ldlm_lock_enqueue(obddev->obd_namespace, lock, cookie, cookielen,
+                                &flags, ldlm_server_completion_ast,
                                 ldlm_server_blocking_ast);
         if (err != ELDLM_OK)
                 GOTO(out, err);
@@ -384,7 +408,11 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+                                      struct lustre_handle *handle);
+
+static int ldlm_handle_bl_callback(struct ptlrpc_request *req,
+                                   struct ldlm_namespace *ns)
 {
         struct ldlm_request *dlm_req;
         struct ldlm_lock *lock;
@@ -395,11 +423,11 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("blocking callback on lock "LPX64" - lock disappeared\n",
-                       dlm_req->lock_handle1.addr);
-                RETURN(0);
+                       dlm_req->lock_handle1.cookie);
+                RETURN(-EINVAL);
         }
 
         LDLM_DEBUG(lock, "client blocking AST callback handler START");
@@ -426,7 +454,8 @@ static int ldlm_handle_bl_callback(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+                                   struct ldlm_namespace *ns)
 {
         struct list_head ast_list = LIST_HEAD_INIT(ast_list);
         struct ldlm_request *dlm_req;
@@ -437,16 +466,16 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
 
         dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
 
-        lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+        lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
         if (!lock) {
                 CERROR("completion callback on lock "LPX64" - lock "
-                       "disappeared\n", dlm_req->lock_handle1.addr);
-                RETURN(0);
+                       "disappeared\n", dlm_req->lock_handle1.cookie);
+                RETURN(-EINVAL);
         }
 
         LDLM_DEBUG(lock, "client completion callback handler START");
 
-        l_lock(&lock->l_resource->lr_namespace->ns_lock);
+        l_lock(&ns->ns_lock);
 
         /* If we receive the completion AST before the actual enqueue returned,
          * then we might need to switch lock modes, resources, or extents. */
@@ -461,14 +490,14 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
         if (memcmp(dlm_req->lock_desc.l_resource.lr_name,
                    lock->l_resource->lr_name,
                    sizeof(__u64) * RES_NAME_SIZE) != 0) {
-                ldlm_lock_change_resource(lock,
+                ldlm_lock_change_resource(ns, lock,
                                          dlm_req->lock_desc.l_resource.lr_name);
                 LDLM_DEBUG(lock, "completion AST, new resource");
         }
         lock->l_resource->lr_tmp = &ast_list;
         ldlm_grant_lock(lock);
         lock->l_resource->lr_tmp = NULL;
-        l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+        l_unlock(&ns->ns_lock);
         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
         LDLM_LOCK_PUT(lock);
 
@@ -481,12 +510,13 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req)
 
 static int ldlm_callback_handler(struct ptlrpc_request *req)
 {
+        struct ldlm_namespace *ns;
         int rc;
         ENTRY;
 
         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
         if (rc) {
-                CERROR("lustre_ldlm: Invalid request: %d\n", rc);
+                CERROR("Invalid request: %d\n", rc);
                 RETURN(rc);
         }
 
@@ -501,32 +531,44 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
                 CERROR("--> lock addr: "LPX64", cookie: "LPX64"\n",
                        dlm_req->lock_handle1.addr,dlm_req->lock_handle1.cookie);
-                CERROR("--> ignoring this error as a temporary workaround!  "
-                       "beware!\n");
-                //RETURN(-ENOTCONN);
+                RETURN(-ENOTCONN);
         }
 
+        LASSERT(req->rq_export != NULL);
+        LASSERT(req->rq_export->exp_obd != NULL);
+        ns = req->rq_export->exp_obd->obd_namespace;
+        LASSERT(ns != NULL);
+
         switch (req->rq_reqmsg->opc) {
         case LDLM_BL_CALLBACK:
                 CDEBUG(D_INODE, "blocking ast\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
-                rc = ldlm_handle_bl_callback(req);
-                RETURN(rc);
+                rc = ldlm_handle_bl_callback(req, ns);
+                break;
         case LDLM_CP_CALLBACK:
                 CDEBUG(D_INODE, "completion ast\n");
                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CP_CALLBACK, 0);
-                rc = ldlm_handle_cp_callback(req);
-                RETURN(rc);
-
+                rc = ldlm_handle_cp_callback(req, ns);
+                break;
         default:
                 CERROR("invalid opcode %d\n", req->rq_reqmsg->opc);
                 RETURN(-EINVAL);
         }
 
+        req->rq_status = rc;
+        if (rc) {
+                ptlrpc_error(req->rq_svc, req);
+        } else {
+                rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen,
+                                     &req->rq_repmsg);
+                if (rc)
+                        RETURN(rc);
+                ptlrpc_reply(req->rq_svc, req);
+        }
+
         RETURN(0);
 }
 
-
 static int ldlm_cancel_handler(struct ptlrpc_request *req)
 {
         int rc;
@@ -539,11 +581,14 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         }
 
         if (req->rq_export == NULL) {
+                struct ldlm_request *dlm_req;
                 CERROR("operation %d with bad export (ptl req %d/rep %d)\n",
                        req->rq_reqmsg->opc, req->rq_request_portal,
                        req->rq_reply_portal);
                 CERROR("--> export addr: "LPX64", cookie: "LPX64"\n",
                        req->rq_reqmsg->addr, req->rq_reqmsg->cookie);
+                dlm_req = lustre_msg_buf(req->rq_reqmsg, 0);
+                ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
                 CERROR("--> ignoring this error as a temporary workaround!  "
                        "beware!\n");
                 //RETURN(-ENOTCONN);
@@ -568,7 +613,6 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-
 static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                           void *karg, void *uarg)
 {
@@ -579,7 +623,7 @@ static int ldlm_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 
         if (_IOC_TYPE(cmd) != IOC_LDLM_TYPE || _IOC_NR(cmd) < IOC_LDLM_MIN_NR ||
             _IOC_NR(cmd) > IOC_LDLM_MAX_NR) {
-                CDEBUG(D_IOCTL, "invalid ioctl (type %ld, nr %ld, size %ld)\n",
+                CDEBUG(D_IOCTL, "invalid ioctl (type %d, nr %d, size %d)\n",
                        _IOC_TYPE(cmd), _IOC_NR(cmd), _IOC_SIZE(cmd));
                 RETURN(-EINVAL);
         }
@@ -619,11 +663,9 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (ldlm_already_setup)
                 RETURN(-EALREADY);
 
-        MOD_INC_USE_COUNT;
-
         rc = ldlm_proc_setup(obddev);
         if (rc != 0)
-                GOTO(out_dec, rc);
+                RETURN(rc);
 
         ldlm->ldlm_cb_service =
                 ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS, LDLM_BUFSIZE,
@@ -689,8 +731,6 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
  out_proc:
         ldlm_proc_cleanup(obddev);
 
- out_dec:
-        MOD_DEC_USE_COUNT;
         return rc;
 }
 
@@ -711,7 +751,6 @@ static int ldlm_cleanup(struct obd_device *obddev)
         ldlm_proc_cleanup(obddev);
 
         ldlm_already_setup = 0;
-        MOD_DEC_USE_COUNT;
         RETURN(0);
 }
 
@@ -723,6 +762,7 @@ static int ldlm_connect(struct lustre_handle *conn, struct obd_device *src,
 }
 
 struct obd_ops ldlm_obd_ops = {
+        o_owner:       THIS_MODULE,
         o_iocontrol:   ldlm_iocontrol,
         o_setup:       ldlm_setup,
         o_cleanup:     ldlm_cleanup,
@@ -798,7 +838,9 @@ EXPORT_SYMBOL(ldlm_namespace_dump);
 EXPORT_SYMBOL(ldlm_cancel_locks_for_export);
 EXPORT_SYMBOL(ldlm_replay_locks);
 EXPORT_SYMBOL(ldlm_resource_foreach);
+EXPORT_SYMBOL(ldlm_reprocess_all_ns);
 EXPORT_SYMBOL(ldlm_namespace_foreach);
+EXPORT_SYMBOL(ldlm_namespace_foreach_res);
 EXPORT_SYMBOL(l_lock);
 EXPORT_SYMBOL(l_unlock);
 
index 7a972b9..b71dd20 100644 (file)
@@ -30,7 +30,7 @@ static int interrupted_completion_wait(void *data)
         RETURN(1);
 }
 
-static int expired_completion_wait(void *data)
+int ldlm_expired_completion_wait(void *data)
 {
         struct ldlm_lock *lock = data;
         struct ptlrpc_connection *conn;
@@ -48,6 +48,7 @@ static int expired_completion_wait(void *data)
                 LDLM_DEBUG(lock, "timed out waiting for completion");
                 CERROR("lock %p timed out from %s\n", lock,
                        conn->c_remote_uuid);
+                ldlm_lock_dump(D_ERROR, lock);
                 class_signal_connection_failure(conn);
         }
         RETURN(0);
@@ -56,7 +57,7 @@ static int expired_completion_wait(void *data)
 int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
 {
         struct l_wait_info lwi =
-                LWI_TIMEOUT_INTR(obd_timeout * HZ, expired_completion_wait,
+                LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait,
                                  interrupted_completion_wait, lock);
         int rc = 0;
         ENTRY;
@@ -75,7 +76,7 @@ int ldlm_completion_ast(struct ldlm_lock *lock, int flags)
 
         LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                    "sleeping");
-        ldlm_lock_dump(lock);
+        ldlm_lock_dump(D_OTHER, lock);
         ldlm_reprocess_all(lock->l_resource);
 
  noreproc:
@@ -131,7 +132,7 @@ static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ldlm_lock2handle(lock, lockh);
         lock->l_connh = NULL;
 
-        err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion,
+        err = ldlm_lock_enqueue(ns, lock, cookie, cookielen, flags, completion,
                                 blocking);
         if (err != ELDLM_OK)
                 GOTO(out, err);
@@ -243,7 +244,7 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                 /* FIXME: if we've already received a completion AST, this will
                  * LBUG! */
                 ldlm_lock_destroy(lock);
-                GOTO(out, rc);
+                GOTO(out_req, rc);
         }
 
         reply = lustre_msg_buf(req->rq_repmsg, 0);
@@ -282,28 +283,28 @@ int ldlm_cli_enqueue(struct lustre_handle *connh,
                                (long)reply->lock_resource_name[0],
                                (long)lock->l_resource->lr_name[0]);
 
-                        ldlm_lock_change_resource(lock,
+                        ldlm_lock_change_resource(ns, lock,
                                                   reply->lock_resource_name);
                         if (lock->l_resource == NULL) {
                                 LBUG();
-                                RETURN(-ENOMEM);
+                                GOTO(out_req, rc = -ENOMEM);
                         }
                         LDLM_DEBUG(lock, "client-side enqueue, new resource");
                 }
         }
 
         if (!is_replay) {
-                rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags,
+                rc = ldlm_lock_enqueue(ns, lock, cookie, cookielen, flags,
                                        completion, blocking);
                 if (lock->l_completion_ast)
                         lock->l_completion_ast(lock, *flags);
         }
 
-        if (!req_passed_in)
-                ptlrpc_req_finished(req);
-
         LDLM_DEBUG(lock, "client-side enqueue END");
         EXIT;
+ out_req:
+        if (!req_passed_in)
+                ptlrpc_req_finished(req);
  out:
         LDLM_LOCK_PUT(lock);
  out_nolock:
@@ -437,7 +438,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         ENTRY;
 
         /* concurrent cancels on the same handle can happen */
-        lock = __ldlm_handle2lock(lockh, 0, LDLM_FL_CANCELING);
+        lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
         if (lock == NULL)
                 RETURN(0);
 
@@ -620,6 +621,9 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id,
         int i;
         ENTRY;
 
+        if (ns == NULL)
+                RETURN(ELDLM_OK);
+
         if (res_id)
                 RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags));
 
@@ -698,11 +702,22 @@ static int ldlm_iter_helper(struct ldlm_lock *lock, void *closure)
         return helper->iter(lock, helper->closure);
 }
 
+/* Adapter that lets a per-lock iterator run under the per-resource walk:
+ * forwards @res to ldlm_resource_foreach() with ldlm_iter_helper as the
+ * per-lock callback.  @closure is passed through untouched;
+ * ldlm_namespace_foreach() supplies a struct iter_helper_data here.  Returns
+ * the result of ldlm_resource_foreach(). */
+static int ldlm_res_iter_helper(struct ldlm_resource *res, void *closure)
+{
+        return ldlm_resource_foreach(res, ldlm_iter_helper, closure);
+}
+
 int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
                            void *closure)
 {
-        int i, rc = LDLM_ITER_CONTINUE;
         struct iter_helper_data helper = { iter: iter, closure: closure };
+        /* Per-lock iteration is layered on the per-resource walk: bundle the
+         * caller's @iter and @closure into 'helper' and let
+         * ldlm_res_iter_helper apply them to each resource's locks.  The
+         * walk's result is returned unchanged. */
+        return ldlm_namespace_foreach_res(ns, ldlm_res_iter_helper, &helper);
+}
+
+int ldlm_namespace_foreach_res(struct ldlm_namespace *ns,
+                               ldlm_res_iterator_t iter, void *closure)
+{
+        int i, rc = LDLM_ITER_CONTINUE;
         
         l_lock(&ns->ns_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
@@ -712,8 +727,7 @@ int ldlm_namespace_foreach(struct ldlm_namespace *ns, ldlm_iterator_t iter,
                                 list_entry(tmp, struct ldlm_resource, lr_hash);
 
                         ldlm_resource_getref(res);
-                        rc = ldlm_resource_foreach(res, ldlm_iter_helper,
-                                                   &helper);
+                        rc = iter(res, closure);
                         ldlm_resource_putref(res);
                         if (rc == LDLM_ITER_STOP)
                                 GOTO(out, rc);
@@ -735,22 +749,44 @@ static int ldlm_chain_lock_for_replay(struct ldlm_lock *lock, void *closure)
         return LDLM_ITER_CONTINUE;
 }
 
-static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock,
-                           int last)
+static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 {
         struct ptlrpc_request *req;
         struct ldlm_request *body;
         struct ldlm_reply *reply;
         int rc, size;
-        int flags = LDLM_FL_REPLAY;
-
-        flags |= lock->l_flags & 
-                (LDLM_FL_BLOCK_GRANTED|LDLM_FL_BLOCK_CONV|LDLM_FL_BLOCK_WAIT);
-
+        int flags;
+
+        /*
+         * If granted mode matches the requested mode, this lock is granted.
+         *
+         * If they differ, but we have a granted mode, then we were granted
+         * one mode and now want another: ergo, converting.
+         *
+         * If we haven't been granted anything and are on a resource list,
+         * then we're blocked/waiting.
+         *
+         * If we haven't been granted anything and we're NOT on a resource list,
+         * then we haven't got a reply yet and don't have a known disposition.
+         * This happens whenever a lock enqueue is the request that triggers
+         * recovery.
+         */
+        if (lock->l_granted_mode == lock->l_req_mode)
+                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_GRANTED;
+        else if (lock->l_granted_mode)
+                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_CONV;
+        else if (!list_empty(&lock->l_res_link))
+                flags = LDLM_FL_REPLAY | LDLM_FL_BLOCK_WAIT;
+        else
+                flags = LDLM_FL_REPLAY;
+                
         size = sizeof(*body);
         req = ptlrpc_prep_req(imp, LDLM_ENQUEUE, 1, &size, NULL);
         if (!req)
                 RETURN(-ENOMEM);
+
+        /* We're part of recovery, so don't wait for it. */
+        req->rq_level = LUSTRE_CONN_RECOVD;
         
         body = lustre_msg_buf(req->rq_reqmsg, 0);
         ldlm_lock2desc(lock, &body->lock_desc);
@@ -760,9 +796,6 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock,
         size = sizeof(*reply);
         req->rq_replen = lustre_msg_size(1, &size);
 
-        if (last)
-                req->rq_reqmsg->flags |= MSG_LAST_REPLAY;
-
         LDLM_DEBUG(lock, "replaying lock:");
         rc = ptlrpc_queue_wait(req);
         if (rc != ELDLM_OK)
@@ -792,7 +825,7 @@ int ldlm_replay_locks(struct obd_import *imp)
 
         list_for_each_safe(pos, next, &list) {
                 lock = list_entry(pos, struct ldlm_lock, l_pending_chain);
-                rc = replay_one_lock(imp, lock, (next == &list));
+                rc = replay_one_lock(imp, lock);
                 if (rc)
                         break; /* or try to do the rest? */
         }
index d1f5b61..e5960bd 100644 (file)
@@ -134,7 +134,9 @@ out_ns:
 
 extern struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
 
-/* If 'local_only' is true, don't try to tell the server, just cleanup. */
+/* If 'local_only' is true, don't try to tell the server, just cleanup.
+ * This is currently only used for recovery, and we make certain assumptions
+ * as a result--notably, that we shouldn't cancel locks with refs. -phil */
 static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                              int local_only)
 {
@@ -147,6 +149,18 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
                 LDLM_LOCK_GET(lock);
 
+                if (local_only && (lock->l_readers || lock->l_writers)) {
+                        /* This is a little bit gross, but much better than the
+                         * alternative: pretend that we got a blocking AST from
+                         * the server, so that when the lock is decref'd, it
+                         * will go away ... */
+                        lock->l_flags |= LDLM_FL_CBPENDING;
+                        /* ... without sending a CANCEL message. */
+                        lock->l_flags |= LDLM_FL_CANCELING;
+                        LDLM_LOCK_PUT(lock);
+                        continue;
+                }
+
                 /* At shutdown time, don't call the cancellation callback */
                 lock->l_flags |= LDLM_FL_CANCEL;
 
@@ -170,12 +184,18 @@ static void cleanup_resource(struct ldlm_resource *res, struct list_head *q,
                 }
                 LDLM_LOCK_PUT(lock);
         }
+        EXIT;
 }
 
 int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int local_only)
 {
         int i;
 
+        if (ns == NULL) {
+                CDEBUG(D_INFO, "NULL ns, skipping cleanup\n");
+                return ELDLM_OK;
+        }
+
         l_lock(&ns->ns_lock);
         for (i = 0; i < RES_HASH_SIZE; i++) {
                 struct list_head *tmp, *pos;
@@ -431,7 +451,8 @@ void ldlm_resource_add_lock(struct ldlm_resource *res, struct list_head *head,
         l_lock(&res->lr_namespace->ns_lock);
 
         ldlm_resource_dump(res);
-        ldlm_lock_dump(lock);
+        CDEBUG(D_OTHER, "About to grant this lock:\n");
+        ldlm_lock_dump(D_OTHER, lock);
 
         LASSERT(list_empty(&lock->l_res_link));
 
@@ -510,20 +531,20 @@ void ldlm_resource_dump(struct ldlm_resource *res)
         list_for_each(tmp, &res->lr_granted) {
                 struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_OTHER, lock);
         }
 
         CDEBUG(D_OTHER, "Converting locks:\n");
         list_for_each(tmp, &res->lr_converting) {
                 struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_OTHER, lock);
         }
 
         CDEBUG(D_OTHER, "Waiting locks:\n");
         list_for_each(tmp, &res->lr_waiting) {
                 struct ldlm_lock *lock;
                 lock = list_entry(tmp, struct ldlm_lock, l_res_link);
-                ldlm_lock_dump(lock);
+                ldlm_lock_dump(D_OTHER, lock);
         }
 }
index ce7a73d..b34c9ab 100644 (file)
@@ -172,7 +172,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         lock1 = ldlm_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_CR, NULL, 0);
         if (lock1 == NULL)
                 LBUG();
-        err = ldlm_lock_enqueue(lock1, NULL, 0, &flags,
+        err = ldlm_lock_enqueue(ns, lock1, NULL, 0, &flags,
                                 ldlm_completion_ast, ldlm_blocking_ast);
         if (err != ELDLM_OK)
                 LBUG();
@@ -180,7 +180,7 @@ int ldlm_test_basics(struct obd_device *obddev)
         lock = ldlm_lock_create(ns, NULL, res_id, LDLM_PLAIN, LCK_EX, NULL, 0);
         if (lock == NULL)
                 LBUG();
-        err = ldlm_lock_enqueue(lock, NULL, 0, &flags,
+        err = ldlm_lock_enqueue(ns, lock, NULL, 0, &flags,
                                 ldlm_completion_ast, ldlm_blocking_ast);
         if (err != ELDLM_OK)
                 LBUG();
@@ -222,7 +222,8 @@ int ldlm_test_extents(struct obd_device *obddev)
                                  0);
         if (lock1 == NULL)
                 LBUG();
-        err = ldlm_lock_enqueue(lock1, &ext1, sizeof(ext1), &flags, NULL, NULL);
+        err = ldlm_lock_enqueue(ns, lock1, &ext1, sizeof(ext1), &flags, NULL,
+                                NULL);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_LOCK_CHANGED))
@@ -231,7 +232,8 @@ int ldlm_test_extents(struct obd_device *obddev)
         flags = 0;
         lock2 = ldlm_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_PR,
                                 NULL, 0);
-        err = ldlm_lock_enqueue(lock2, &ext2, sizeof(ext2), &flags, NULL, NULL);
+        err = ldlm_lock_enqueue(ns, lock2, &ext2, sizeof(ext2), &flags, NULL,
+                                NULL);
         if (err != ELDLM_OK)
                 LBUG();
         if (!(flags & LDLM_FL_LOCK_CHANGED))
@@ -241,7 +243,7 @@ int ldlm_test_extents(struct obd_device *obddev)
         lock = ldlm_lock_create(ns, NULL, res_id, LDLM_EXTENT, LCK_EX, NULL, 0);
         if (lock == NULL)
                 LBUG();
-        err = ldlm_lock_enqueue(lock, &ext3, sizeof(ext3), &flags,
+        err = ldlm_lock_enqueue(ns, lock, &ext3, sizeof(ext3), &flags,
                                 NULL, NULL);
         if (err != ELDLM_OK)
                 LBUG();
@@ -293,7 +295,7 @@ static int ldlm_test_network(struct obd_device *obddev,
         CERROR("ldlm_cli_convert: %d\n", err);
 
         lock = ldlm_handle2lock(&lockh1);
-        ldlm_lock_dump(lock);
+        ldlm_lock_dump(D_OTHER, lock);
         ldlm_lock_put(lock);
 
         /* Need to decrement old mode. Don't bother incrementing new
@@ -432,6 +434,7 @@ static int ldlm_do_convert(void)
 static int ldlm_test_main(void *data)
 {
         struct ldlm_test_thread *thread = data;
+        unsigned long flags;
         ENTRY;
 
         lock_kernel();
@@ -440,10 +443,10 @@ static int ldlm_test_main(void *data)
         sigfillset(&current->blocked);
         recalc_sigpending();
 #else
-        spin_lock_irq(&current->sigmask_lock);
+        spin_lock_irqsave(&current->sigmask_lock, flags);
         sigfillset(&current->blocked);
         recalc_sigpending(current);
-        spin_unlock_irq(&current->sigmask_lock);
+        spin_unlock_irqrestore(&current->sigmask_lock, flags);
 #endif
 
         sprintf(current->comm, "ldlm_test");
index da31808..1bcc388 100644 (file)
@@ -1,4 +1,4 @@
-EXTRA_DIST = mds_updates.c obd_pack.c ll_pack.c simple.c
+EXTRA_DIST = mds_updates.c obd_pack.c  simple.c
 EXTRA_DIST += client.c target.c
 
 include $(top_srcdir)/Rules
index 03fa4e2..5bf0d4a 100644 (file)
@@ -44,12 +44,12 @@ struct obd_device *client_tgtuuid2obd(char *tgtuuid)
 {
         int i;
 
-        for (i=0; i < MAX_OBD_DEVICES; i++) {
+        for (i = 0; i < MAX_OBD_DEVICES; i++) {
                 struct obd_device *obd = &obd_dev[i];
                 if ((strcmp(obd->obd_type->typ_name, LUSTRE_OSC_NAME) == 0) ||
                     (strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0)) {
                         struct client_obd *cli = &obd->u.cli;
-                        if (strncmp(tgtuuid, cli->cl_target_uuid, 
+                        if (strncmp(tgtuuid, cli->cl_target_uuid,
                                     sizeof(cli->cl_target_uuid)) == 0)
                                 return obd;
                 }
@@ -107,7 +107,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         imp->imp_connection = ptlrpc_uuid_to_connection(server_uuid);
         if (!imp->imp_connection)
                 RETURN(-ENOENT);
-        
+
         INIT_LIST_HEAD(&imp->imp_replay_list);
         INIT_LIST_HEAD(&imp->imp_sending_list);
         INIT_LIST_HEAD(&imp->imp_delayed_list);
@@ -120,7 +120,6 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
 
         cli->cl_max_mds_easize = sizeof(struct lov_mds_md);
 
-        MOD_INC_USE_COUNT;
         RETURN(0);
 }
 
@@ -131,7 +130,6 @@ int client_obd_cleanup(struct obd_device * obddev)
         ptlrpc_cleanup_client(&obd->cl_import);
         ptlrpc_put_connection(obd->cl_import.imp_connection);
 
-        MOD_DEC_USE_COUNT;
         return 0;
 }
 
@@ -150,12 +148,10 @@ int client_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         ENTRY;
         down(&cli->cl_sem);
-        MOD_INC_USE_COUNT;
         rc = class_connect(conn, obd, cluuid);
-        if (rc) {
-                MOD_DEC_USE_COUNT;
+        if (rc)
                 GOTO(out_sem, rc);
-        }
+
         cli->cl_conn_count++;
         if (cli->cl_conn_count > 1)
                 GOTO(out_sem, rc);
@@ -217,7 +213,6 @@ out_ldlm:
 out_disco:
                         cli->cl_conn_count--;
                         class_disconnect(conn);
-                        MOD_DEC_USE_COUNT;
                 }
         }
 out_sem:
@@ -251,20 +246,20 @@ int client_obd_disconnect(struct lustre_handle *conn)
 
         cli->cl_conn_count--;
         if (cli->cl_conn_count)
-                GOTO(out_disco, rc = 0);
+                GOTO(out_no_disconnect, rc = 0);
 
         ldlm_namespace_free(obd->obd_namespace);
         obd->obd_namespace = NULL;
         request = ptlrpc_prep_req(&cli->cl_import, rq_opc, 0, NULL,
                                   NULL);
         if (!request)
-                GOTO(out_disco, rc = -ENOMEM);
-        
+                GOTO(out_req, rc = -ENOMEM);
+
         request->rq_replen = lustre_msg_size(0, NULL);
 
         /* Process disconnects even if we're waiting for recovery. */
         request->rq_level = LUSTRE_CONN_RECOVD;
-        
+
         rc = ptlrpc_queue_wait(request);
         if (rc)
                 GOTO(out_req, rc);
@@ -273,12 +268,11 @@ int client_obd_disconnect(struct lustre_handle *conn)
  out_req:
         if (request)
                 ptlrpc_req_finished(request);
- out_disco:
+        list_del_init(&cli->cl_import.imp_chain);
+ out_no_disconnect:
         err = class_disconnect(conn);
         if (!rc && err)
                 rc = err;
-        list_del_init(&cli->cl_import.imp_chain);
-        MOD_DEC_USE_COUNT;
  out_sem:
         up(&cli->cl_sem);
         RETURN(rc);
index 8b3c33a..a03d2bb 100644 (file)
 #include <linux/obd_ost.h>
 #include <linux/lustre_net.h>
 
-void ost_pack_ioo(void **tmp, struct lov_stripe_md *lsm, int bufcnt)
+void ost_pack_ioo(struct obd_ioobj **tmp, struct lov_stripe_md *lsm,int bufcnt)
 {
         struct obd_ioobj *ioo = *tmp;
-        char *c = *tmp;
+        void *p = *tmp;
 
         ioo->ioo_id = HTON__u64(lsm->lsm_object_id);
         ioo->ioo_gr = HTON__u64(0);
         ioo->ioo_type = HTON__u32(S_IFREG);
         ioo->ioo_bufcnt = HTON__u32(bufcnt);
-        *tmp = c + sizeof(*ioo);
+        *tmp = p + sizeof(*ioo);
 }
 
-void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop)
+void ost_unpack_ioo(struct obd_ioobj **tmp, struct obd_ioobj **ioop)
 {
-        char *c = *tmp;
+        void *p = *tmp;
         struct obd_ioobj *ioo = *tmp;
         *ioop = *tmp;
 
@@ -49,7 +49,7 @@ void ost_unpack_ioo(void **tmp, struct obd_ioobj **ioop)
         ioo->ioo_gr = NTOH__u64(ioo->ioo_gr);
         ioo->ioo_type = NTOH__u32(ioo->ioo_type);
         ioo->ioo_bufcnt = NTOH__u32(ioo->ioo_bufcnt);
-        *tmp = c + sizeof(*ioo);
+        *tmp = p + sizeof(*ioo);
 }
 
 void ost_pack_niobuf(void **tmp, __u64 offset, __u32 len, __u32 flags,
index cb4ccda..73a4383 100644 (file)
 
 #ifdef OBD_CTXT_DEBUG
 /* Debugging check only needed during development */
-#define ASSERT_CTXT_MAGIC(magic) do { if ((magic) != OBD_RUN_CTXT_MAGIC) { \
-                                CERROR("bad ctxt magic\n"); LBUG(); } } while(0)
-#define ASSERT_NOT_KERNEL_CTXT(msg) do { if (segment_eq(get_fs(), get_ds())) { \
-                                        CERROR(msg); LBUG(); } } while(0)
-#define ASSERT_KERNEL_CTXT(msg) do { if (!segment_eq(get_fs(), get_ds())) { \
-                                        CERROR(msg); LBUG(); } } while(0)
+#define ASSERT_CTXT_MAGIC(magic) LASSERT((magic) == OBD_RUN_CTXT_MAGIC)
+#define ASSERT_NOT_KERNEL_CTXT(msg) LASSERT(!segment_eq(get_fs(), get_ds()))
+#define ASSERT_KERNEL_CTXT(msg) LASSERT(segment_eq(get_fs(), get_ds()))
 #else
 #define ASSERT_CTXT_MAGIC(magic) do {} while(0)
 #define ASSERT_NOT_KERNEL_CTXT(msg) do {} while(0)
@@ -56,6 +53,8 @@ void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new_ctx,
         */
 
         save->fs = get_fs();
+        LASSERT(atomic_read(&current->fs->pwd->d_count));
+        LASSERT(atomic_read(&new_ctx->pwd->d_count));
         save->pwd = dget(current->fs->pwd);
         save->pwdmnt = mntget(current->fs->pwdmnt);
 
@@ -218,14 +217,19 @@ int lustre_fread(struct file *file, char *str, int len, loff_t *off)
  */
 int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off)
 {
+        ENTRY;
         ASSERT_KERNEL_CTXT("kernel doing write outside kernel context\n");
-        if (!file || !file->f_op || !off)
+        if (!file)
+                RETURN(-ENOENT);
+        if (!file->f_op)
                 RETURN(-ENOSYS);
+        if (!off)
+                RETURN(-EINVAL);
 
         if (!file->f_op->write)
                 RETURN(-EROFS);
 
-        return file->f_op->write(file, str, len, off);
+        RETURN(file->f_op->write(file, str, len, off));
 }
 
 /*
@@ -234,9 +238,10 @@ int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off)
  */
 int lustre_fsync(struct file *file)
 {
+        ENTRY;
         ASSERT_KERNEL_CTXT("kernel doing sync outside kernel context\n");
         if (!file || !file->f_op || !file->f_op->fsync)
                 RETURN(-ENOSYS);
 
-        return file->f_op->fsync(file, file->f_dentry, 0);
+        RETURN(file->f_op->fsync(file, file->f_dentry, 0));
 }
index 141e155..3889f1c 100644 (file)
 #include <linux/lustre_net.h>
 #include <linux/lustre_dlm.h>
 
+/*
+ * Fill in @conn for client @cluuid when an export (@exp) for it already
+ * exists on this target.
+ *
+ * Return values:
+ *    0         - @exp has no exp_connection; @conn is set to the export's
+ *                address and cookie.
+ *    EALREADY  - (positive) @exp already has a connection and @conn matched
+ *                the handle stored in the export's LDLM import; @conn is
+ *                rewritten with the export's address and cookie.
+ *   -EALREADY  - @exp already has a connection but @conn did not match the
+ *                stored handle; @conn is zeroed.
+ *
+ * NOTE(review): target_handle_connect() appears to treat a positive EALREADY
+ * as non-fatal (it only bails out on "rc && rc != EALREADY") -- confirm that
+ * this function is where that rc comes from.
+ */
+int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
+                            char *cluuid)
+{
+        /* Every exit below uses RETURN(), so pair it with ENTRY to keep the
+         * entry/exit trace balanced. */
+        ENTRY;
+
+        if (exp->exp_connection) {
+                struct lustre_handle *hdl;
+                hdl = &exp->exp_ldlm_data.led_import.imp_handle;
+                /* Might be a re-connect after a partition. */
+                if (!memcmp(conn, hdl, sizeof *conn)) {
+                        CERROR("%s reconnecting\n", cluuid);
+                        conn->addr = (__u64) (unsigned long)exp;
+                        conn->cookie = exp->exp_cookie;
+                        /* Positive value: same client, same handle. */
+                        RETURN(EALREADY);
+                } else {
+                        CERROR("%s reconnecting from %s, "
+                               "handle mismatch (ours "LPX64"/"LPX64", "
+                               "theirs "LPX64"/"LPX64")\n", cluuid,
+                               exp->exp_connection->c_remote_uuid, hdl->addr,
+                               hdl->cookie, conn->addr, conn->cookie);
+                        /* XXX disconnect them here? */
+                        memset(conn, 0, sizeof *conn);
+                        /* This is a little scary, but right now we build this
+                         * file separately into each server module, so I won't
+                         * go _immediately_ to hell.
+                         */
+                        RETURN(-EALREADY);
+                }
+        }
+
+        /* Export exists but is not connected: hand its handle to the caller. */
+        conn->addr = (__u64) (unsigned long)exp;
+        conn->cookie = exp->exp_cookie;
+        CDEBUG(D_INFO, "existing export for UUID '%s' at %p\n", cluuid, exp);
+        CDEBUG(D_IOCTL,"connect: addr %Lx cookie %Lx\n",
+               (long long)conn->addr, (long long)conn->cookie);
+        RETURN(0);
+}
+
 int target_handle_connect(struct ptlrpc_request *req)
 {
         struct obd_device *target;
@@ -73,6 +109,9 @@ int target_handle_connect(struct ptlrpc_request *req)
         if (rc && rc != EALREADY)
                 GOTO(out, rc);
 
+        /* If all else goes well, this is our RPC return code. */
+        req->rq_status = rc;
+
         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
         if (rc)
                 GOTO(out, rc);
@@ -100,10 +139,15 @@ int target_handle_connect(struct ptlrpc_request *req)
         dlmimp->imp_handle.addr = req->rq_reqmsg->addr;
         dlmimp->imp_handle.cookie = req->rq_reqmsg->cookie;
         dlmimp->imp_obd = /* LDLM! */ NULL;
+        dlmimp->imp_recover = NULL;
+        INIT_LIST_HEAD(&dlmimp->imp_replay_list);
+        INIT_LIST_HEAD(&dlmimp->imp_sending_list);
+        INIT_LIST_HEAD(&dlmimp->imp_delayed_list);
         spin_lock_init(&dlmimp->imp_lock);
         dlmimp->imp_level = LUSTRE_CONN_FULL;
 out:
-        req->rq_status = rc;
+        if (rc)
+                req->rq_status = rc;
         RETURN(rc);
 }
 
index 071c0fd..c536a0a 100644 (file)
@@ -9,13 +9,8 @@ MODULE = llite
 modulefs_DATA = llite.o
 EXTRA_PROGRAMS = llite
 
-LINX= ll_pack.c
-
 llite_SOURCES = dcache.c commit_callback.c super.c rw.c super25.c
-llite_SOURCES += file.c dir.c sysctl.c symlink.c $(LINX)
+llite_SOURCES += file.c dir.c sysctl.c symlink.c
 llite_SOURCES += recover.c namei.c lproc_llite.c
 
-ll_pack.c:
-       test -e ll_pack.c || ln -sf $(top_srcdir)/lib/ll_pack.c .
-
 include $(top_srcdir)/Rules
index e5a595a..a62716b 100644 (file)
@@ -1,10 +1,10 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  The daemon that causes completed but not committed transactions 
+ *  The daemon that causes completed but not committed transactions
  *   on the MDS to be flushed periodically when they are committed.
- *   A gratuitous getattr RPC is made to the MDS to discover the 
- *   last committed record. 
+ *   A gratuitous getattr RPC is made to the MDS to discover the
+ *   last committed record.
  *
  *  Lustre High Availability Daemon
  *
 
 static int ll_commitcbd_check_event(struct ll_sb_info *sbi)
 {
-        int rc = 0; 
+        int rc = 0;
         ENTRY;
 
-        spin_lock(&sbi->ll_commitcbd_lock); 
-        if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) { 
+        spin_lock(&sbi->ll_commitcbd_lock);
+        if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING)
                 GOTO(out, rc = 1);
-        }
 
+        EXIT;
  out:
         spin_unlock(&sbi->ll_commitcbd_lock);
-        RETURN(rc);
+        return rc;
 }
 
 static int ll_commitcbd_main(void *arg)
 {
         struct ll_sb_info *sbi = (struct ll_sb_info *)arg;
-
+        unsigned long flags;
         ENTRY;
 
         lock_kernel();
         daemonize();
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        spin_lock_irq(&current->sigmask_lock);
+        spin_lock_irqsave(&current->sigmask_lock, flags);
         sigfillset(&current->blocked);
         our_recalc_sigpending(current);
-        spin_unlock_irq(&current->sigmask_lock);
+        spin_unlock_irqrestore(&current->sigmask_lock, flags);
 #else
         sigfillset(&current->blocked);
         our_recalc_sigpending(current);
@@ -80,19 +80,19 @@ static int ll_commitcbd_main(void *arg)
 
         /* And now, loop forever on requests */
         while (1) {
-                wait_event(sbi->ll_commitcbd_waitq, 
+                wait_event(sbi->ll_commitcbd_waitq,
                            ll_commitcbd_check_event(sbi));
 
                 spin_lock(&sbi->ll_commitcbd_lock);
                 if (sbi->ll_commitcbd_flags & LL_COMMITCBD_STOPPING) {
                         spin_unlock(&sbi->ll_commitcbd_lock);
-                        CERROR("lustre_commitd quitting\n"); 
+                        CERROR("lustre_commitd quitting\n");
                         EXIT;
                         break;
                 }
 
                 schedule_timeout(sbi->ll_commitcbd_timeout);
-                CERROR("commit callback daemon woken up - FIXME\n"); 
+                CERROR("commit callback daemon woken up - FIXME\n");
                 spin_unlock(&sbi->ll_commitcbd_lock);
         }
 
@@ -116,7 +116,7 @@ int ll_commitcbd_setup(struct ll_sb_info *sbi)
                 CERROR("cannot start thread\n");
                 RETURN(rc);
         }
-        wait_event(sbi->ll_commitcbd_ctl_waitq, 
+        wait_event(sbi->ll_commitcbd_ctl_waitq,
                    sbi->ll_commitcbd_flags & LL_COMMITCBD_RUNNING);
         RETURN(0);
 }
index ab9596f..921eea2 100644 (file)
@@ -91,7 +91,7 @@ static int ll_dir_readpage(struct file *file, struct page *page)
                 unlock_page(page);
                 RETURN(rc);
         }
-        ldlm_lock_dump_handle(&lockh);
+        ldlm_lock_dump_handle(D_OTHER, &lockh);
 
         if (PageUptodate(page)) {
                 CERROR("Explain this please?\n");
@@ -745,7 +745,69 @@ not_empty:
         return 0;
 }
 
+/*
+ * ioctl handler for directories.
+ *
+ * Only IOC_MDC_LOOKUP is handled: the name carried in the ioctl data
+ * (ioc_inlbuf1 / ioc_inllen1) is passed to mdc_getattr_name() together with
+ * the directory inode, and the mode, uid and gid from the reply body are
+ * copied into ioc_obdo1 before the whole buffer is copied back to the user
+ * address @arg.
+ *
+ * Returns the rc of obd_ioctl_getdata()/mdc_getattr_name() on their failure,
+ * -EINVAL when the name length is < 1, -EFAULT when the copy back to user
+ * space fails, and -ENOTTY for any other @cmd.
+ */
+static int ll_dir_ioctl(struct inode *inode, struct file *file,
+                        unsigned int cmd, unsigned long arg)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct obd_ioctl_data *data;
+        ENTRY;
+
+        switch(cmd) {
+        case IOC_MDC_LOOKUP: {
+                struct ptlrpc_request *request = NULL;
+                char *buf = NULL;
+                char *filename;
+                int namelen, rc, err, len = 0;
+                int ea_size = 0; // obd_size_wiremd(&sbi->ll_osc_conn, NULL);
+                unsigned long valid;
+
+                /* buf/len are filled in here and released with OBD_FREE at
+                 * "out"; presumably this copies the ioctl data in from user
+                 * space -- verify against obd_ioctl_getdata(). */
+                rc = obd_ioctl_getdata(&buf, &len, (void *)arg);
+                if (rc)
+                        RETURN(rc);
+                data = (void *)buf;
+
+                filename = data->ioc_inlbuf1;
+                namelen = data->ioc_inllen1;
+
+                if (namelen < 1) {
+                        CERROR("IOC_MDC_LOOKUP missing filename\n");
+                        GOTO(out, rc = -EINVAL);
+                }
+
+                valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE;
+                rc = mdc_getattr_name(&sbi->ll_mdc_conn, inode, filename,
+                                      namelen, valid, ea_size, &request);
+                if (rc < 0) {
+                        CERROR("mdc_getattr_name: %d\n", rc);
+                        /* NOTE(review): this path skips out_req, so it relies
+                         * on mdc_getattr_name() not leaving a referenced
+                         * request behind on failure -- confirm. */
+                        GOTO(out, rc);
+                } else {
+                        struct mds_body *body;
+                        body = lustre_msg_buf(request->rq_repmsg, 0);
+                        /* surely there's a better way -phik */
+                        data->ioc_obdo1.o_mode = body->mode;
+                        data->ioc_obdo1.o_uid = body->uid;
+                        data->ioc_obdo1.o_gid = body->gid;
+                }
+
+                /* Hand the updated ioctl buffer back to the caller. */
+                err = copy_to_user((void *)arg, buf, len);
+                if (err)
+                        GOTO(out_req, rc = -EFAULT);
+
+                EXIT;
+        out_req:
+                ptlrpc_req_finished(request);
+        out:
+                OBD_FREE(buf, len);
+                return rc;
+        }
+        default:
+                CERROR("unrecognized ioctl %#x\n", cmd);
+                RETURN(-ENOTTY);
+        }
+}
+
 struct file_operations ll_dir_operations = {
         read: generic_read_dir,
-        readdir: ll_readdir
+        readdir: ll_readdir,
+        ioctl: ll_dir_ioctl
 };
index 87c9012..6b37d99 100644 (file)
 int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc);
 extern int ll_setattr(struct dentry *de, struct iattr *attr);
 
-int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid, gid_t gid,
-                      struct lov_stripe_md **lsmp)
+/*
+ * Open @inode on the MDS on behalf of @file.
+ *
+ * Allocates the per-open ll_file_data from ll_file_data_slab, sends the open
+ * through mdc_open() (passing the striping data @lmm / @lmm_size along), and
+ * on success attaches the structure to file->private_data and clears
+ * O_LOV_DELAY_CREATE from file->f_flags.
+ *
+ * Returns 0 on success, -ENOMEM if the allocation fails, otherwise -abs(rc)
+ * of the mdc_open() result (so the value is never positive).
+ */
+static int ll_mdc_open(struct lustre_handle *mdc_conn, struct inode *inode,
+                       struct file *file, struct lov_mds_md *lmm, int lmm_size)
 {
+        struct ptlrpc_request *req = NULL;
+        struct ll_file_data *fd;
+        int rc;
+        ENTRY;
+
+        LASSERT(!file->private_data);
+
+        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
+        if (!fd)
+                RETURN(-ENOMEM);
+
+        memset(fd, 0, sizeof(*fd));
+        /* Seed the handle with the file pointer and a random cookie; the
+         * check after mdc_open() treats an unchanged addr as "not assigned". */
+        fd->fd_mdshandle.addr = (__u64)(unsigned long)file;
+        get_random_bytes(&fd->fd_mdshandle.cookie,
+                         sizeof(fd->fd_mdshandle.cookie));
+
+        rc = mdc_open(mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
+                      file->f_flags, lmm, lmm_size, &fd->fd_mdshandle, &req);
+
+        /* This is the "reply" refcount. */
+        ptlrpc_req_finished(req);
+
+        if (rc)
+                GOTO(out_fd, rc);
+
+        /* The pointer is kept after the put above; presumably another
+         * reference keeps the request alive until ll_mdc_close() -- TODO
+         * confirm against mdc_open(). */
+        fd->fd_req = req;
+        file->private_data = fd;
+
+        if (!fd->fd_mdshandle.addr ||
+            fd->fd_mdshandle.addr == (__u64)(unsigned long)file) {
+                CERROR("hmm, mdc_open didn't assign fd_mdshandle?\n");
+                /* XXX handle this how, abort or is it non-fatal? */
+        }
+
+        file->f_flags &= ~O_LOV_DELAY_CREATE;
+        RETURN(0);
+
+out_fd:
+        /* Poison the cookie before the structure goes back to the slab. */
+        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
+        kmem_cache_free(ll_file_data_slab, fd);
+
+        return -abs(rc);
+}
+
+/*
+ * Close @file on the MDS and release its ll_file_data.
+ *
+ * Sends the close through mdc_close(), then disposes of the open request
+ * saved in fd->fd_req, poisons the MDS handle cookie, clears
+ * file->private_data and frees the ll_file_data back to ll_file_data_slab.
+ * The local teardown happens even when mdc_close() fails.
+ *
+ * Returns -abs(rc) of the mdc_close() result (0 on success, never positive).
+ *
+ * NOTE(review): @mdc_conn is not referenced; the connection is taken from
+ * ll_i2sbi(inode)->ll_mdc_conn instead.  file->private_data and fd->fd_req
+ * are dereferenced unconditionally, so both are assumed non-NULL here.
+ */
+static int ll_mdc_close(struct lustre_handle *mdc_conn, struct inode *inode,
+                        struct file *file)
+{
+        struct ll_file_data *fd = file->private_data;
+        struct ptlrpc_request *req = NULL;
+        unsigned long flags;
+        struct obd_import *imp = fd->fd_req->rq_import;
+        int rc;
+
+        /* Complete the open request and remove it from replay list */
+        DEBUG_REQ(D_HA, fd->fd_req, "matched open req %p", fd->fd_req);
+        rc = mdc_close(&ll_i2sbi(inode)->ll_mdc_conn, inode->i_ino,
+                       inode->i_mode, &fd->fd_mdshandle, &req);
+
+        if (rc)
+                CERROR("inode %lu close failed: rc = %d\n", inode->i_ino, rc);
+        /* Drop the close request itself; the open request is handled below. */
+        ptlrpc_req_finished(req);
+
+        spin_lock_irqsave(&imp->imp_lock, flags);
+        if (fd->fd_req->rq_transno) {
+                /* This caused an EA to be written, need to replay as a normal
+                 * transaction now.  Our reference is now effectively owned
+                 * by the imp_replay_list, and we'll be committed just like
+                 * other transno-having requests now.
+                 */
+                fd->fd_req->rq_flags &= ~PTL_RPC_FL_REPLAY;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+        } else {
+                /* No transno means that we can just drop our ref. */
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
+                ptlrpc_req_finished(fd->fd_req);
+        }
+        /* Poison the handle so a stale use of it is recognisable. */
+        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
+        file->private_data = NULL;
+        kmem_cache_free(ll_file_data_slab, fd);
+
+        return -abs(rc);
+}
+
+static int ll_osc_open(struct lustre_handle *conn, struct inode *inode,
+                       struct file *file, struct lov_stripe_md *lsm)
+{
+        struct ll_file_data *fd;
         struct obdo *oa;
         int rc;
         ENTRY;
@@ -43,231 +130,245 @@ int ll_create_objects(struct super_block *sb, obd_id id, uid_t uid, gid_t gid,
         oa = obdo_alloc();
         if (!oa)
                 RETURN(-ENOMEM);
+        oa->o_id = lsm->lsm_object_id;
+        oa->o_mode = S_IFREG;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
+                OBD_MD_FLBLOCKS;
+        rc = obd_open(conn, oa, lsm);
+        if (rc)
+                GOTO(out, rc);
 
-        oa->o_mode = S_IFREG | 0600;
-        oa->o_id = id;
-        oa->o_uid = uid;
-        oa->o_gid = gid;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
-                OBD_MD_FLUID | OBD_MD_FLGID;
-        rc = obd_create(ll_s2obdconn(sb), oa, lsmp);
-        obdo_free(oa);
+        obdo_to_inode(inode, oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
 
-        if (!rc)
-                LASSERT(*lsmp && (*lsmp)->lsm_object_id);
+        fd = file->private_data;
+        obd_oa2handle(&fd->fd_osthandle, oa);
+
+        atomic_inc(&ll_i2info(inode)->lli_open_count);
+out:
+        obdo_free(oa);
         RETURN(rc);
 }
 
-static int ll_file_open(struct inode *inode, struct file *file)
+/* Caller must hold lli_open_sem to protect lli->lli_smd from changing and
+ * duplicate objects from being created.  We only install lsm to lli_smd if
+ * the mdc open was successful (hence stored stripe MD on MDS), otherwise
+ * other nodes could try to create different objects for the same file.
+ *
+ * If the caller passes in a non-NULL lsm, the caller keeps ownership of it
+ * on failure and must free it itself.  If lsm is NULL on entry, the lsm
+ * filled in by obd_create() is freed here when the objects have to be
+ * destroyed again.
+ *
+ * Returns 0 on success (lli->lli_smd is set), or the error from
+ * obdo_alloc/obd_create/obd_packmd/ll_mdc_open.
+ */
+static int ll_create_open_obj(struct lustre_handle *conn, struct inode *inode,
+                              struct file *file, struct lov_stripe_md *lsm)
 {
-        struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lustre_handle *conn = ll_i2obdconn(inode);
-        struct ptlrpc_request *req = NULL;
-        struct ll_file_data *fd;
-        struct obdo *oa;
-        struct lov_stripe_md *lsm;
         struct lov_mds_md *lmm = NULL;
         int lmm_size = 0;
-        int rc = 0;
+        struct obdo *oa;
+        int rc, err;
+        int lsm_is_ours = (lsm == NULL);
         ENTRY;
 
-        LASSERT(!file->private_data);
-
-        lsm = lli->lli_smd;
+        oa = obdo_alloc();
+        if (!oa)
+                RETURN(-ENOMEM);
 
-        /*  delayed create of object (intent created inode) */
-        /*  XXX object needs to be cleaned up if mdc_open fails */
-        /*  XXX error handling appropriate here? */
-        if (lsm == NULL) {
-                if (file->f_flags & O_LOV_DELAY_CREATE) {
-                        CDEBUG(D_INODE, "delaying object creation\n");
-                        RETURN(0);
-                }
-                down(&lli->lli_open_sem);
-                /* Check to see if we lost the race */
-                if (!lli->lli_smd)
-                        rc = ll_create_objects(inode->i_sb, inode->i_ino, 0, 0,
-                                               &lli->lli_smd);
-                up(&lli->lli_open_sem);
-                if (rc)
-                        RETURN(rc);
+        oa->o_mode = S_IFREG | 0600;
+        oa->o_id = inode->i_ino;
+        /* Keep these 0 for now, because chown/chgrp does not change the
+         * ownership on the OST, and we don't want to allow BA OST NFS
+         * users to access these objects by mistake.
+         */
+        oa->o_uid = 0;
+        oa->o_gid = 0;
+        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLMODE |
+                OBD_MD_FLUID | OBD_MD_FLGID;
 
-                lsm = lli->lli_smd;
+        rc = obd_create(conn, oa, &lsm);
+        if (rc) {
+                CERROR("error creating objects for inode %lu: rc = %d\n",
+                       inode->i_ino, rc);
+                GOTO(out_oa, rc);
         }
 
-        /* XXX We should only send this to MDS if we just created these
-         *     objects, except we also need to handle the user-stripe case.
-         */
-        rc = obd_packmd(conn, &lmm, lli->lli_smd);
+        LASSERT(lsm && lsm->lsm_object_id);
+        rc = obd_packmd(conn, &lmm, lsm);
         if (rc < 0)
-                GOTO(out, rc);
+                GOTO(out_destroy, rc);
 
         lmm_size = rc;
 
-        fd = kmem_cache_alloc(ll_file_data_slab, SLAB_KERNEL);
-        if (!fd) {
-                if (lmm)
-                        obd_free_wiremd(conn, &lmm);
-                GOTO(out, rc = -ENOMEM);
-        }
-        memset(fd, 0, sizeof(*fd));
+        rc = ll_mdc_open(&ll_i2sbi(inode)->ll_mdc_conn,inode,file,lmm,lmm_size);
 
-        fd->fd_mdshandle.addr = (__u64)(unsigned long)file;
-        get_random_bytes(&fd->fd_mdshandle.cookie,
-                         sizeof(fd->fd_mdshandle.cookie));
-        rc = mdc_open(&sbi->ll_mdc_conn, inode->i_ino, S_IFREG | inode->i_mode,
-                      file->f_flags, lmm, lmm_size, &fd->fd_mdshandle, &req);
-        if (lmm)
-                obd_free_wiremd(conn, &lmm);
-        fd->fd_req = req;
+        obd_free_wiremd(conn, &lmm);
 
-        /* This is the "reply" refcount. */
-        ptlrpc_req_finished(req);
-        if (rc)
-                GOTO(out_req, -abs(rc));
-        if (!fd->fd_mdshandle.addr ||
-            fd->fd_mdshandle.addr == (__u64)(unsigned long)file) {
-                CERROR("hmm, mdc_open didn't assign fd_mdshandle?\n");
-                /* XXX handle this how, abort or is it non-fatal? */
+        /* If we couldn't complete mdc_open() and store the stripe MD on the
+         * MDS, we need to destroy the objects now or they will be leaked.
+         */
+        if (rc) {
+                CERROR("error MDS opening %lu with delayed create: rc %d\n",
+                       inode->i_ino, rc);
+                GOTO(out_destroy, rc);
         }
+        lli->lli_smd = lsm;
 
-        oa = obdo_alloc();
-        if (!oa)
-                GOTO(out_mdc, rc = -EINVAL);
+        EXIT;
+out_oa:
+        obdo_free(oa);
+        return rc;
 
+out_destroy:
+        obdo_from_inode(oa, inode, OBD_MD_FLTYPE);
         oa->o_id = lsm->lsm_object_id;
-        oa->o_mode = S_IFREG;
-        oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                OBD_MD_FLBLOCKS;
-        rc = obd_open(ll_i2obdconn(inode), oa, lsm);
-        obdo_to_inode(inode, oa, oa->o_valid & (OBD_MD_FLSIZE|OBD_MD_FLBLOCKS));
+        oa->o_valid |= OBD_MD_FLID;
+        err = obd_destroy(conn, oa, lsm);
+        /* A caller-supplied lsm is freed by that caller on error (see
+         * ll_lov_setstripe()); lsm is passed by value, so freeing it here
+         * as well would leave the caller with a stale pointer to free again.
+         */
+        if (lsm_is_ours)
+                obd_free_memmd(conn, &lsm);
+        if (err)
+                CERROR("error uncreating inode %lu objects: rc %d\n",
+                       inode->i_ino, err);
+        goto out_oa;
+}
 
-        obd_oa2handle(&fd->fd_osthandle, oa);
-        obdo_free(oa);
+/* Open a file, and (for the very first open) create objects on the OSTs at
+ * this time.  If opened with O_LOV_DELAY_CREATE, then we don't do the object
+ * creation or open until ll_lov_setstripe() ioctl is called.  We grab
+ * lli_open_sem to ensure no other process will create objects, send the
+ * stripe MD to the MDS, or try to destroy the objects if that fails.
+ *
+ * If we already have the stripe MD locally, we don't request it in
+ * mdc_open() by passing a lmm_size = 0.
+ *
+ * It is up to the application to ensure no other processes open this file
+ * in the O_LOV_DELAY_CREATE case, or the default striping pattern will be
+ * used.  We might be able to avoid races of that sort by getting lli_open_sem
+ * before returning in the O_LOV_DELAY_CREATE case and dropping it here
+ * or in ll_file_release(), but I'm not sure that is desirable/necessary.
+ */
+static int ll_file_open(struct inode *inode, struct file *file)
+{
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct lustre_handle *conn = ll_i2obdconn(inode);
+        struct lov_stripe_md *lsm;
+        int rc = 0;
+        ENTRY;
 
-        if (rc)
-                GOTO(out_mdc, rc = -abs(rc));
+        lsm = lli->lli_smd;
+        if (lsm == NULL) {
+                if (file->f_flags & O_LOV_DELAY_CREATE) {
+                        CDEBUG(D_INODE, "delaying object creation\n");
+                        RETURN(0);
+                }
 
-        atomic_inc(&lli->lli_open_count);
+                down(&lli->lli_open_sem);
+                if (!lli->lli_smd) {
+                        rc = ll_create_open_obj(conn, inode, file, NULL);
+                        up(&lli->lli_open_sem);
+                } else {
+                        CERROR("stripe already set on ino %lu\n", inode->i_ino);
+                        up(&lli->lli_open_sem);
+                        rc = ll_mdc_open(&sbi->ll_mdc_conn, inode, file,NULL,0);
+                }
+                lsm = lli->lli_smd;
+        } else
+                rc = ll_mdc_open(&sbi->ll_mdc_conn, inode, file, NULL, 0);
 
-        file->private_data = fd;
+        if (rc)
+                RETURN(rc);
 
+        rc = ll_osc_open(conn, inode, file, lsm);
+        if (rc)
+                GOTO(out_close, rc);
         RETURN(0);
-out_mdc:
-        mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                  S_IFREG, &fd->fd_mdshandle, &req);
-out_req:
-        ptlrpc_req_finished(req); /* once for an early "commit" */
-//out_fd:
-        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
-        kmem_cache_free(ll_file_data_slab, fd);
-out:
+out_close:
+        ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
         return rc;
 }
 
 int ll_size_lock(struct inode *inode, struct lov_stripe_md *lsm, obd_off start,
-                 int mode, struct lustre_handle **lockhs_p)
+                 int mode, struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ldlm_extent extent;
-        struct lustre_handle *lockhs = NULL;
-        int rc, flags = 0, stripe_count;
+        int rc, flags = 0;
         ENTRY;
 
-        if (sbi->ll_flags & LL_SBI_NOLCK) {
-                *lockhs_p = NULL;
+        /* XXX phil: can we do this?  won't it screw the file size up? */
+        if (sbi->ll_flags & LL_SBI_NOLCK)
                 RETURN(0);
-        }
-
-        stripe_count = lsm->lsm_stripe_count;
-        if (!stripe_count)
-                stripe_count = 1;
-
-        OBD_ALLOC(lockhs, stripe_count * sizeof(*lockhs));
-        if (lockhs == NULL)
-                RETURN(-ENOMEM);
 
         extent.start = start;
         extent.end = OBD_OBJECT_EOF;
 
         rc = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT, &extent,
                          sizeof(extent), mode, &flags, ll_lock_callback,
-                         inode, sizeof(*inode), lockhs);
-        if (rc != ELDLM_OK) {
-                CERROR("lock enqueue: %d\n", rc);
-                OBD_FREE(lockhs, stripe_count * sizeof(*lockhs));
-        } else
-                *lockhs_p = lockhs;
+                         inode, sizeof(*inode), lockh);
         RETURN(rc);
 }
 
 int ll_size_unlock(struct inode *inode, struct lov_stripe_md *lsm, int mode,
-                   struct lustre_handle *lockhs)
+                   struct lustre_handle *lockh)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        int rc, stripe_count;
+        int rc;
         ENTRY;
 
+        /* XXX phil: can we do this?  won't it screw the file size up? */
         if (sbi->ll_flags & LL_SBI_NOLCK)
                 RETURN(0);
 
-        if (lockhs == NULL) {
-                LBUG();
-                RETURN(-EINVAL);
-        }
-
-        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockhs);
+        rc = obd_cancel(&sbi->ll_osc_conn, lsm, mode, lockh);
         if (rc != ELDLM_OK) {
                 CERROR("lock cancel: %d\n", rc);
                 LBUG();
         }
 
-        stripe_count = lsm->lsm_stripe_count;
-        if (!stripe_count)
-                stripe_count = 1;
-
-        OBD_FREE(lockhs, stripe_count * sizeof(*lockhs));
         RETURN(rc);
 }
 
 int ll_file_size(struct inode *inode, struct lov_stripe_md *lsm)
 {
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs;
+        //struct lustre_handle lockh = { 0, 0 };
         struct obdo oa;
-        int err, rc;
+        //int err;
+        int rc;
         ENTRY;
 
         LASSERT(lsm);
         LASSERT(sbi);
 
-        rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockhs);
+        /* XXX do not yet need size lock - OST size always correct (sync write)
+        rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockh);
         if (rc != ELDLM_OK) {
                 CERROR("lock enqueue: %d\n", rc);
                 RETURN(rc);
         }
+        */
 
         memset(&oa, 0, sizeof oa);
         oa.o_id = lsm->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLID|OBD_MD_FLTYPE|OBD_MD_FLSIZE|OBD_MD_FLBLOCKS;
         rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-        if (!rc)
-                obdo_to_inode(inode, &oa,
-                              oa.o_valid & ~(OBD_MD_FLTYPE | OBD_MD_FLMODE));
-
-        err = ll_size_unlock(inode, lsm, LCK_PR, lockhs);
+        if (!rc) {
+                obdo_to_inode(inode, &oa, OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+                CDEBUG(D_INODE, LPX64" size %Lu/%Lu\n",
+                       lsm->lsm_object_id, inode->i_size, inode->i_size);
+        }
+        /* XXX do not need size lock, because OST size always correct (sync write)
+        err = ll_size_unlock(inode, lsm, LCK_PR, &lockh);
         if (err != ELDLM_OK) {
                 CERROR("lock cancel: %d\n", err);
-                LBUG();
+                if (!rc)
+                        rc = err;
         }
+        */
         RETURN(rc);
 }
 
+/* While this returns an error code, its caller fput() does not, so we need
+ * to make every effort to clean up all of our state here.  Also, applications
+ * rarely check close errors, and even if an error is returned they will not
+ * retry the close call.
+ */
 static int ll_file_release(struct inode *inode, struct file *file)
 {
-        struct ptlrpc_request *req = NULL;
         struct ll_file_data *fd;
         struct obdo oa;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
@@ -278,93 +379,34 @@ static int ll_file_release(struct inode *inode, struct file *file)
         ENTRY;
 
         fd = (struct ll_file_data *)file->private_data;
-        if (!fd) {
-                LASSERT(file->f_flags & O_LOV_DELAY_CREATE);
-                GOTO(out, rc = 0);
-        }
+        if (!fd) /* no process opened the file after an mcreate */
+                RETURN(rc = 0);
 
         memset(&oa, 0, sizeof(oa));
         oa.o_id = lsm->lsm_object_id;
         oa.o_mode = S_IFREG;
         oa.o_valid = OBD_MD_FLTYPE | OBD_MD_FLID;
         obd_handle2oa(&oa, &fd->fd_osthandle);
-        rc = obd_close(ll_i2obdconn(inode), &oa, lsm);
+        rc = obd_close(&sbi->ll_osc_conn, &oa, lsm);
         if (rc)
-                GOTO(out_mdc, rc = -abs(rc));
-
-#if 0
-#error "This should only be done on the node that already has the EOF lock"
-#error "and only in the case where the file size actually changed.  For now"
-#error "we don't care about the size on the MDS, since we never use it (the"
-#error "OST always has the authoritative size and we don't even use the MDS."
-        /* If this fails and we goto out_fd, the file size on the MDS is out of
-         * date.  Is that a big deal? */
-        if (file->f_mode & FMODE_WRITE) {
-                struct lustre_handle *lockhs;
-
-                rc = ll_size_lock(inode, lsm, 0, LCK_PR, &lockhs);
-                if (rc)
-                        GOTO(out_mdc, -abs(rc));
-
-                oa.o_id = lsm->lsm_object_id;
-                oa.o_mode = S_IFREG;
-                oa.o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLSIZE |
-                        OBD_MD_FLBLOCKS;
-                rc = obd_getattr(&sbi->ll_osc_conn, &oa, lsm);
-                if (!rc) {
-                        struct iattr attr;
-                        attr.ia_valid = (ATTR_MTIME | ATTR_CTIME | ATTR_ATIME |
-                                         ATTR_SIZE);
-                        attr.ia_mtime = inode->i_mtime;
-                        attr.ia_ctime = inode->i_ctime;
-                        attr.ia_atime = inode->i_atime;
-                        attr.ia_size = oa.o_size;
-
-                        inode->i_blocks = oa.o_blocks;
-
-                        /* XXX: this introduces a small race that we should
-                         * evaluate */
-                        rc = ll_inode_setattr(inode, &attr, 0);
-                }
-                rc2 = ll_size_unlock(inode, lli->lli_smd, LCK_PR, lockhs);
-                if (rc2) {
-                        CERROR("lock cancel: %d\n", rc);
-                        LBUG();
-                        if (!rc)
-                                rc = rc2;
-                }
-        }
-#endif
+                CERROR("inode %lu object close failed: rc = %d\n",
+                       inode->i_ino, rc);
 
-out_mdc:
-        rc2 = mdc_close(&sbi->ll_mdc_conn, inode->i_ino,
-                        S_IFREG, &fd->fd_mdshandle, &req);
-        ptlrpc_req_finished(req);
-        if (rc2) {
-                if (!rc)
-                        rc = -abs(rc2);
-                GOTO(out_fd, rc);
-        }
-        DEBUG_REQ(D_HA, fd->fd_req, "matched open for this close: ");
-        ptlrpc_req_finished(fd->fd_req);
+        rc2 = ll_mdc_close(&sbi->ll_mdc_conn, inode, file);
+        if (rc2 && !rc)
+                rc = rc2;
 
         if (atomic_dec_and_test(&lli->lli_open_count)) {
                 CDEBUG(D_INFO, "last close, cancelling unused locks\n");
-                rc = obd_cancel_unused(ll_i2obdconn(inode), lsm, 0);
-                if (rc)
+                rc2 = obd_cancel_unused(&sbi->ll_osc_conn, lsm, 0);
+                if (rc2 && !rc) {
+                        rc = rc2;
                         CERROR("obd_cancel_unused: %d\n", rc);
-        } else {
+                }
+        } else
                 CDEBUG(D_INFO, "not last close, not cancelling unused locks\n");
-        }
-
-        EXIT;
 
-out_fd:
-        fd->fd_mdshandle.cookie = DEAD_HANDLE_MAGIC;
-        file->private_data = NULL;
-        kmem_cache_free(ll_file_data_slab, fd);
-out:
-        return rc;
+        RETURN(rc);
 }
 
 static inline void ll_remove_suid(struct inode *inode)
@@ -401,7 +443,7 @@ int ll_lock_callback(struct ldlm_lock *lock, struct ldlm_lock_desc *new,
                      void *data, __u32 data_len, int flag)
 {
         struct inode *inode = data;
-        struct lustre_handle lockh;
+        struct lustre_handle lockh = { 0, 0 };
         int rc;
         ENTRY;
 
@@ -438,7 +480,7 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
         struct ll_file_data *fd = (struct ll_file_data *)filp->private_data;
         struct inode *inode = filp->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs = NULL;
+        struct lustre_handle lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
@@ -449,17 +491,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
          * call us */
         retval = ll_file_size(inode, lsm);
         if (retval < 0) {
-                CERROR("ll_file_size: %d\n", retval);
+                CERROR("ll_file_size: "LPSZ"\n", retval);
                 RETURN(retval);
         }
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
                 struct ldlm_extent extent;
-                OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-                if (!lockhs)
-                        RETURN(-ENOMEM);
-
                 extent.start = *ppos;
                 extent.end = *ppos + count;
                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
@@ -468,15 +506,14 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PR, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  lockhs);
+                                  &lockh);
                 if (err != ELDLM_OK) {
-                        OBD_FREE(lockhs, lsm->lsm_stripe_count*sizeof(*lockhs));
                         CERROR("lock enqueue: err: %d\n", err);
                         RETURN(err);
                 }
         }
 
-        CDEBUG(D_INFO, "Reading inode %lu, %d bytes, offset %Ld\n",
+        CDEBUG(D_INFO, "Reading inode %lu, "LPSZ" bytes, offset %Ld\n",
                inode->i_ino, count, *ppos);
         retval = generic_file_read(filp, buf, count, ppos);
 
@@ -485,15 +522,13 @@ static ssize_t ll_file_read(struct file *filp, char *buf, size_t count,
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, lockhs);
+                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PR, &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock cancel: err: %d\n", err);
                         retval = err;
                 }
         }
 
-        if (lockhs)
-                OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
         RETURN(retval);
 }
 
@@ -506,7 +541,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         struct ll_file_data *fd = (struct ll_file_data *)file->private_data;
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        struct lustre_handle *lockhs = NULL, *eof_lockhs = NULL;
+        struct lustre_handle lockh = { 0, 0 }, eof_lockh = { 0, 0 };
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         int flags = 0;
         ldlm_error_t err;
@@ -520,7 +555,7 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 if (!oa)
                         RETURN(-ENOMEM);
 
-                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockhs);
+                err = ll_size_lock(inode, lsm, 0, LCK_PW, &eof_lockh);
                 if (err) {
                         obdo_free(oa);
                         RETURN(err);
@@ -545,9 +580,6 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) &&
             !(sbi->ll_flags & LL_SBI_NOLCK)) {
                 struct ldlm_extent extent;
-                OBD_ALLOC(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-                if (!lockhs)
-                        GOTO(out_eof, retval = -ENOMEM);
                 extent.start = *ppos;
                 extent.end = *ppos + count;
                 CDEBUG(D_INFO, "Locking inode %lu, start "LPU64" end "LPU64"\n",
@@ -556,35 +588,31 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
                 err = obd_enqueue(&sbi->ll_osc_conn, lsm, NULL, LDLM_EXTENT,
                                   &extent, sizeof(extent), LCK_PW, &flags,
                                   ll_lock_callback, inode, sizeof(*inode),
-                                  lockhs);
+                                  &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock enqueue: err: %d\n", err);
-                        GOTO(out_free, retval = err);
+                        GOTO(out_eof, retval = err);
                 }
         }
 
-        CDEBUG(D_INFO, "Writing inode %lu, %ld bytes, offset "LPD64"\n",
-               inode->i_ino, (long)count, *ppos);
+        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
+               inode->i_ino, count, *ppos);
 
         retval = generic_file_write(file, buf, count, ppos);
 
         if (!(fd->fd_flags & LL_FILE_IGNORE_LOCK) ||
             sbi->ll_flags & LL_SBI_NOLCK) {
-                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, lockhs);
+                err = obd_cancel(&sbi->ll_osc_conn, lsm, LCK_PW, &lockh);
                 if (err != ELDLM_OK) {
                         CERROR("lock cancel: err: %d\n", err);
-                        GOTO(out_free, retval = err);
+                        GOTO(out_eof, retval = err);
                 }
         }
 
         EXIT;
- out_free:
-        if (lockhs)
-                OBD_FREE(lockhs, lsm->lsm_stripe_count * sizeof(*lockhs));
-
  out_eof:
         if (!S_ISBLK(inode->i_mode) && file->f_flags & O_APPEND) {
-                err = ll_size_unlock(inode, lsm, LCK_PW, eof_lockhs);
+                err = ll_size_unlock(inode, lsm, LCK_PW, &eof_lockh);
                 if (err && !retval)
                         retval = err;
         }
@@ -592,121 +620,54 @@ ll_file_write(struct file *file, const char *buf, size_t count, loff_t *ppos)
         return retval;
 }
 
-/* Retrieve object striping information.
- *
- * @arg is a pointer to a user struct with one or more of the fields set to
- * indicate the application preference: lmm_stripe_count, lmm_stripe_size,
- * lmm_stripe_offset, and lmm_stripe_pattern.  lmm_magic must be LOV_MAGIC.
- */
 static int ll_lov_setstripe(struct inode *inode, struct file *file,
                             unsigned long arg)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_mds_md *lmm = NULL, *lmmu = (void *)arg;
-        struct lustre_handle *conn = ll_i2obdconn(inode);
+        struct lustre_handle *conn;
+        struct lov_stripe_md *lsm;
         int rc;
+        ENTRY;
 
-        rc = obd_alloc_wiremd(conn, &lmm);
-        if (rc < 0)
-                RETURN(rc);
-
-        rc = copy_from_user(lmm, lmmu, sizeof(*lmm));
-        if (rc)
-                GOTO(out_free, rc = -EFAULT);
+        down(&lli->lli_open_sem);
+        lsm = lli->lli_smd;
+        if (lsm) {
+                up(&lli->lli_open_sem);
+                CERROR("stripe already set for ino %lu\n", inode->i_ino);
+                /* If we haven't already done the open, do so now */
+                if (file->f_flags & O_LOV_DELAY_CREATE) {
+                        int rc2 = ll_file_open(inode, file);
+                        if (rc2)
+                                RETURN(rc2);
+                }
 
-        if (lmm->lmm_magic != LOV_MAGIC) {
-                CERROR("bad LOV magic %X\n", lmm->lmm_magic);
-                GOTO(out_free, rc = -EINVAL);
+                RETURN(-EALREADY);
         }
 
-        down(&lli->lli_open_sem);
-        if (lli->lli_smd) {
-                CERROR("striping data already set for %lu\n", inode->i_ino);
-                GOTO(out_lov_up, rc = -EPERM);
-        }
-        rc = obd_unpackmd(conn, &lli->lli_smd, lmm);
-        if (rc < 0) {
-                CERROR("error setting LOV striping on %lu: rc = %d\n",
-                       inode->i_ino, rc);
-                GOTO(out_lov_up, rc);
-        }
+        conn = ll_i2obdconn(inode);
+
+        rc = obd_iocontrol(LL_IOC_LOV_SETSTRIPE, conn, 0, &lsm, (void *)arg);
+        if (!rc)
+                rc = ll_create_open_obj(conn, inode, file, lsm);
+        up(&lli->lli_open_sem);
 
-        rc = ll_create_objects(inode->i_sb, inode->i_ino, 0, 0, &lli->lli_smd);
         if (rc) {
-                obd_free_memmd(conn, &lli->lli_smd);
-        } else {
-                file->f_flags &= ~O_LOV_DELAY_CREATE;
-                rc = ll_file_open(inode, file);
+                obd_free_memmd(conn, &lsm);
+                RETURN(rc);
         }
-out_lov_up:
-        up(&lli->lli_open_sem);
-out_free:
-        obd_free_wiremd(conn, &lmm);
-        return rc;
+        rc = ll_osc_open(conn, inode, file, lli->lli_smd);
+        RETURN(rc);
 }
 
-/* Retrieve object striping information.
- *
- * @arg is a pointer to a user struct with lmm_ost_count indicating
- * the maximum number of OST indices which will fit in the user buffer.
- * lmm_magic must be LOV_MAGIC.
- */
 static int ll_lov_getstripe(struct inode *inode, unsigned long arg)
 {
-        struct lov_mds_md lmm, *lmmu = (void *)arg, *lmmk = NULL;
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct lustre_handle *conn = ll_i2obdconn(inode);
-        int ost_count, rc, lmm_size;
 
         if (!lsm)
                 RETURN(-ENODATA);
 
-        rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
-        if (rc)
-                RETURN(-EFAULT);
-
-        if (lmm.lmm_magic != LOV_MAGIC)
-                RETURN(-EINVAL);
-
-        if (lsm->lsm_stripe_count == 0)
-                ost_count = 1;
-        else {
-                struct obd_device *obd = class_conn2obd(conn);
-                struct lov_obd *lov = &obd->u.lov;
-                ost_count = lov->desc.ld_tgt_count;
-        }
-
-        /* XXX we _could_ check if indices > user lmm_ost_count are zero */
-        if (lmm.lmm_ost_count < ost_count)
-                RETURN(-EOVERFLOW);
-
-        rc = obd_packmd(conn, &lmmk, lsm);
-        if (rc < 0)
-                RETURN(rc);
-
-        lmm_size = rc;
-
-        /* LOV STACKING layering violation to make LOV/OSC return same data */
-        if (lsm->lsm_stripe_count == 0) {
-                struct lov_object_id *loi;
-
-                loi = (void *)lmmu + offsetof(typeof(*lmmu), lmm_objects);
-                rc = copy_to_user(loi, &lsm->lsm_object_id, sizeof(*loi));
-                if (rc) {
-                        lmm_size = 0;
-                        rc = -EFAULT;
-                } else {
-                        lmmk->lmm_magic = LOV_MAGIC;
-                        lmmk->lmm_ost_count = lmmk->lmm_stripe_count = 1;
-                }
-        }
-
-        if (lmm_size && copy_to_user(lmmu, lmmk, lmm_size))
-                rc = -EFAULT;
-
-        obd_free_wiremd(conn, &lmmk);
-
-        RETURN(rc);
+        return obd_iocontrol(LL_IOC_LOV_GETSTRIPE, conn, 0, lsm, (void *)arg);
 }
 
 int ll_file_ioctl(struct inode *inode, struct file *file, unsigned int cmd,
@@ -822,7 +783,7 @@ static int ll_inode_revalidate(struct dentry *dentry)
                 rc = mdc_getattr(&sbi->ll_mdc_conn, inode->i_ino,
                                  inode->i_mode, valid, datalen, &req);
                 if (rc) {
-                        CERROR("failure %d inode "LPX64"\n", rc, inode->i_ino);
+                        CERROR("failure %d inode %lu\n", rc, inode->i_ino);
                         ptlrpc_req_finished(req);
                         RETURN(-abs(rc));
                 }
index 54a81a4..81a5aad 100644 (file)
@@ -287,7 +287,9 @@ int ll_intent_lock(struct inode *parent, struct dentry **de,
                         GOTO(out, flag = LL_LOOKUP_POSITIVE);
                 }
 
-                /* Do a getattr now that we have the lock */
+                /* Do a getattr now that we have the lock, and fetch the
+                 * up-to-date stripe MD at the same time.
+                 */
                 valid = OBD_MD_FLNOTOBD;
                 if (it->it_op == IT_READLINK) {
                         datalen = mds_body->size;
@@ -340,7 +342,6 @@ int ll_intent_lock(struct inode *parent, struct dentry **de,
                 }
         }
 
-        EXIT;
  out:
         if (intent_finish != NULL) {
                 rc = intent_finish(flag, request, de, it, offset, ino);
@@ -485,17 +486,18 @@ static struct dentry *ll_lookup2(struct inode *parent, struct dentry *dentry,
 {
         struct dentry *save = dentry;
         int rc;
+        ENTRY;
 
         rc = ll_intent_lock(parent, &dentry, it, lookup2_finish);
         if (rc < 0) {
                 CERROR("ll_intent_lock: %d\n", rc);
-                return ERR_PTR(rc);
+                RETURN(ERR_PTR(rc));
         }
 
         if (dentry == save)
-                return NULL;
+                RETURN(NULL);
         else
-                return dentry;
+                RETURN(dentry);
 }
 
 static struct inode *ll_create_node(struct inode *dir, const char *name,
index 3310c34..4c7ad42 100644 (file)
@@ -35,9 +35,10 @@ int ll_recover(struct recovd_data *rd, int phase)
                                 list_entry(tmp, struct obd_import, imp_chain);
 
                         if (phase == PTLRPC_RECOVD_PHASE_PREPARE) {
-                                spin_lock(&imp->imp_lock);
+                                unsigned long flags;
+                                spin_lock_irqsave(&imp->imp_lock, flags);
                                 imp->imp_level = LUSTRE_CONN_RECOVD;
-                                spin_unlock(&imp->imp_lock);
+                                spin_unlock_irqrestore(&imp->imp_lock, flags);
                         }
                         imp->imp_recover(imp, phase);
                 }
index 7f486fb..e1402d1 100644 (file)
@@ -107,6 +107,16 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
         else
                 pg.count = PAGE_SIZE;
 
+        CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
+              cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
+              pg.off, pg.off);
+        if (pg.count == 0) {
+                CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
+                       LPU64"\n",
+                       inode->i_ino, inode, inode->i_size, page->mapping->host,
+                       page->mapping->host->i_size, page->index, pg.off);
+        }
+
         pg.flag = create ? OBD_BRW_CREATE : 0;
 
         set->brw_callback = ll_brw_sync_wait;
@@ -160,7 +170,7 @@ void ll_truncate(struct inode *inode)
 {
         struct obdo oa = {0};
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-        struct lustre_handle *lockhs = NULL;
+        struct lustre_handle lockh = { 0, 0 };
         int err;
         ENTRY;
 
@@ -174,10 +184,10 @@ void ll_truncate(struct inode *inode)
         oa.o_mode = inode->i_mode;
         oa.o_valid = OBD_MD_FLID | OBD_MD_FLMODE | OBD_MD_FLTYPE;
 
-        CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after "LPD64")\n",
+        CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
                oa.o_id, inode->i_size);
 
-        err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockhs);
+        err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
         if (err) {
                 CERROR("ll_size_lock failed: %d\n", err);
                 return;
@@ -191,7 +201,7 @@ void ll_truncate(struct inode *inode)
         else
                 obdo_to_inode(inode, &oa, oa.o_valid);
 
-        err = ll_size_unlock(inode, lsm, LCK_PW, lockhs);
+        err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
         if (err)
                 CERROR("ll_size_unlock failed: %d\n", err);
 
@@ -280,6 +290,7 @@ static int ll_commit_write(struct file *file, struct page *page,
 
         pg.pg = page;
         pg.count = to;
+        /* XXX make the starting offset "from" */
         pg.off = (((obd_off)page->index) << PAGE_SHIFT);
         pg.flag = create ? OBD_BRW_CREATE : 0;
 
@@ -292,7 +303,7 @@ static int ll_commit_write(struct file *file, struct page *page,
         if (!PageLocked(page))
                 LBUG();
 
-        CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count "LPD64"\n",
+        CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
                pg.off, pg.count);
 
         set->brw_callback = ll_brw_sync_wait;
index cb3ae90..73b6ea5 100644 (file)
@@ -95,8 +95,8 @@ static void ll_options(char *options, char **ost, char **mds, int *flags)
 #define log2(n) ffz(~(n))
 #endif
 
-static struct super_block * ll_read_super(struct super_block *sb,
-                                          void *data, int silent)
+static struct super_block *ll_read_super(struct super_block *sb,
+                                         void *data, int silent)
 {
         struct inode *root = 0;
         struct obd_device *obd;
@@ -112,13 +112,10 @@ static struct super_block * ll_read_super(struct super_block *sb,
         class_uuid_t uuid;
 
         ENTRY;
-        MOD_INC_USE_COUNT;
 
         OBD_ALLOC(sbi, sizeof(*sbi));
-        if (!sbi) {
-                MOD_DEC_USE_COUNT;
+        if (!sbi)
                 RETURN(NULL);
-        }
 
         INIT_LIST_HEAD(&sbi->ll_conn_chain);
         INIT_LIST_HEAD(&sbi->ll_orphan_dentry_list);
@@ -238,7 +235,6 @@ out_mdc:
 out_free:
         OBD_FREE(sbi, sizeof(*sbi));
 
-        MOD_DEC_USE_COUNT;
         goto out_dev;
 } /* ll_read_super */
 
@@ -275,7 +271,6 @@ static void ll_put_super(struct super_block *sb)
 
         OBD_FREE(sbi, sizeof(*sbi));
 
-        MOD_DEC_USE_COUNT;
         EXIT;
 } /* ll_put_super */
 
@@ -300,16 +295,16 @@ static void ll_clear_inode(struct inode *inode)
                 }
         }
 
-        if (atomic_read(&inode->i_count) == 0) {
-                char *symlink_name = lli->lli_symlink_name;
+        if (atomic_read(&inode->i_count) != 0)
+                CERROR("clearing in-use inode %lu: count = %d\n",
+                       inode->i_ino, atomic_read(&inode->i_count));
 
-                if (lli->lli_smd)
-                        obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
+        if (lli->lli_smd)
+                obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
 
-                if (symlink_name) {
-                        OBD_FREE(symlink_name, strlen(symlink_name) + 1);
-                        lli->lli_symlink_name = NULL;
-                }
+        if (lli->lli_symlink_name) {
+                OBD_FREE(lli->lli_symlink_name,strlen(lli->lli_symlink_name)+1);
+                lli->lli_symlink_name = NULL;
         }
 
         EXIT;
@@ -323,8 +318,9 @@ static void ll_delete_inode(struct inode *inode)
                 struct obdo *oa;
                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
 
+                /* mcreate with no open */
                 if (!lsm)
-                        GOTO(out, -EINVAL);
+                        GOTO(out, 0);
 
                 if (lsm->lsm_object_id == 0) {
                         CERROR("This really happens\n");
@@ -337,13 +333,13 @@ static void ll_delete_inode(struct inode *inode)
                         GOTO(out, -ENOMEM);
 
                 oa->o_id = lsm->lsm_object_id;
-                oa->o_mode = inode->i_mode;
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLEASIZE | OBD_MD_FLTYPE;
+                obdo_from_inode(oa, inode, OBD_MD_FLID | OBD_MD_FLTYPE);
 
                 err = obd_destroy(ll_i2obdconn(inode), oa, lsm);
                 obdo_free(oa);
-                CDEBUG(D_SUPER, "obd destroy of objid "LPX64" error %d\n",
-                       lsm->lsm_object_id, err);
+                if (err)
+                        CDEBUG(D_SUPER, "obd destroy objid "LPX64" error %d\n",
+                               lsm->lsm_object_id, err);
         }
 out:
         clear_inode(inode);
@@ -386,18 +382,23 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
 {
         struct ptlrpc_request *request = NULL;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        int err;
-
+        int err = 0;
         ENTRY;
 
         /* change incore inode */
         ll_attr2inode(inode, attr, do_trunc);
 
-        err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
-        if (err)
-                CERROR("mdc_setattr fails (%d)\n", err);
+        /* Don't send size changes to MDS to avoid "fast EA" problems, and
+         * also avoid a pointless RPC (we get file size from OST anyways).
+         */
+        attr->ia_valid &= ~ATTR_SIZE;
+        if (attr->ia_valid) {
+                err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
+                if (err)
+                        CERROR("mdc_setattr fails (%d)\n", err);
 
-        ptlrpc_req_finished(request);
+                ptlrpc_req_finished(request);
+        }
 
         RETURN(err);
 }
@@ -503,7 +504,6 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
         /* core attributes first */
         ll_update_inode(inode, body);
 
-        //if (body->valid & OBD_MD_FLEASIZE)
         LASSERT(!lli->lli_smd);
         if (lic && lic->lic_lmm)
                 obd_unpackmd(ll_i2obdconn(inode), &lli->lli_smd, lic->lic_lmm);
@@ -515,8 +515,7 @@ static void ll_read_inode2(struct inode *inode, void *opaque)
                 rc = ll_file_size(inode, lli->lli_smd);
                 if (rc) {
                         CERROR("ll_file_size: %d\n", rc);
-                        /* FIXME: need to somehow prevent inode creation */
-                        LBUG();
+                        ll_clear_inode(inode);
                         make_bad_inode(inode);
                 }
         }
@@ -548,8 +547,8 @@ static inline void invalidate_request_list(struct list_head *req_list)
         list_for_each_safe(tmp, n, req_list) {
                 struct ptlrpc_request *req =
                         list_entry(tmp, struct ptlrpc_request, rq_list);
-                CERROR("invalidating req xid "LPD64" op %d to %s:%d\n",
-                       (unsigned long long)req->rq_xid, req->rq_reqmsg->opc,
+                CERROR("invalidating req xid "LPU64" op %d to %s:%d\n",
+                       req->rq_xid, req->rq_reqmsg->opc,
                        req->rq_connection->c_remote_uuid,
                        req->rq_import->imp_client->cli_request_portal);
                 req->rq_flags |= PTL_RPC_FL_ERR;
@@ -591,8 +590,11 @@ struct super_operations ll_super_operations =
         umount_begin: ll_umount_begin
 };
 
-struct file_system_type lustre_lite_fs_type = {
-        "lustre_lite", 0, ll_read_super, NULL
+static struct file_system_type lustre_lite_fs_type = {
+        name:           "lustre_lite",
+        fs_flags:       0,
+        read_super:     ll_read_super,
+        owner:          THIS_MODULE,
 };
 
 static int __init init_lustre_lite(void)
index cd6544a..557d715 100644 (file)
@@ -114,13 +114,10 @@ static int ll_fill_super(struct super_block *sb, void *data, int silent)
         class_uuid_t uuid;
 
         ENTRY;
-        MOD_INC_USE_COUNT;
 
         OBD_ALLOC(sbi, sizeof(*sbi));
-        if (!sbi) {
-                MOD_DEC_USE_COUNT;
+        if (!sbi)
                 RETURN(-ENOMEM);
-        }
 
         INIT_LIST_HEAD(&sbi->ll_conn_chain);
         generate_random_uuid(uuid);
@@ -238,7 +235,6 @@ out_mdc:
 out_free:
         OBD_FREE(sbi, sizeof(*sbi));
 
-        MOD_DEC_USE_COUNT;
         goto out_dev;
 } /* ll_fill_super */
 
@@ -272,25 +268,45 @@ static void ll_put_super(struct super_block *sb)
         obd_disconnect(&sbi->ll_mdc_conn);
         OBD_FREE(sbi, sizeof(*sbi));
 
-        MOD_DEC_USE_COUNT;
         EXIT;
 } /* ll_put_super */
 
 static void ll_clear_inode(struct inode *inode)
 {
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct ll_inode_info *lli = ll_i2info(inode);
+        int rc;
         ENTRY;
 
-        if (atomic_read(&inode->i_count) == 0) {
-                struct ll_inode_info *lli = ll_i2info(inode);
-                char *symlink_name = lli->lli_symlink_name;
+#warning "Is there a reason we don't do this in 2.5, but we do in 2.4?"
+#if 0
+        rc = mdc_cancel_unused(&sbi->ll_mdc_conn, inode, LDLM_FL_NO_CALLBACK);
+        if (rc < 0) {
+                CERROR("mdc_cancel_unused: %d\n", rc);
+                /* XXX FIXME do something dramatic */
+        }
 
-                if (lli->lli_smd)
-                        obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
-                if (symlink_name) {
-                        OBD_FREE(symlink_name, strlen(symlink_name) + 1);
-                        lli->lli_symlink_name = NULL;
+        if (lli->lli_smd) {
+                rc = obd_cancel_unused(&sbi->ll_osc_conn, lli->lli_smd, 0);
+                if (rc < 0) {
+                        CERROR("obd_cancel_unused: %d\n", rc);
+                        /* XXX FIXME do something dramatic */
                 }
         }
+#endif
+
+        if (atomic_read(&inode->i_count) != 0)
+                CERROR("clearing in-use inode %lu: count = %d\n",
+                       inode->i_ino, atomic_read(&inode->i_count));
+
+        if (lli->lli_smd)
+                obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd);
+
+        if (lli->lli_symlink_name) {
+                OBD_FREE(lli->lli_symlink_name,strlen(lli->lli_symlink_name)+1);
+                lli->lli_symlink_name = NULL;
+        }
+
         EXIT;
 }
 
@@ -302,8 +318,9 @@ static void ll_delete_inode(struct inode *inode)
                 struct obdo *oa;
                 struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
 
+                /* mcreate with no open */
                 if (!lsm)
-                        GOTO(out, -EINVAL);
+                        GOTO(out, 0);
 
                 if (lsm->lsm_object_id == 0) {
                         CERROR("This really happens\n");
@@ -317,12 +334,13 @@ static void ll_delete_inode(struct inode *inode)
 
                 oa->o_id = lsm->lsm_object_id;
                 oa->o_mode = inode->i_mode;
-                oa->o_valid = OBD_MD_FLID | OBD_MD_FLEASIZE | OBD_MD_FLTYPE;
+                oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE;
 
                 err = obd_destroy(ll_i2obdconn(inode), oa, lsm);
                 obdo_free(oa);
-                CDEBUG(D_SUPER, "obd destroy of objid "LPX64" error %d\n",
-                       lsm->lsm_object_id, err);
+                if (err)
+                        CDEBUG(D_SUPER, "obd destroy objid "LPX64" error %d\n",
+                               lsm->lsm_object_id, err);
         }
 out:
         clear_inode(inode);
@@ -365,18 +383,24 @@ int ll_inode_setattr(struct inode *inode, struct iattr *attr, int do_trunc)
 {
         struct ptlrpc_request *request = NULL;
         struct ll_sb_info *sbi = ll_i2sbi(inode);
-        int err;
+        int err = 0;
 
         ENTRY;
 
         /* change incore inode */
         ll_attr2inode(inode, attr, do_trunc);
 
-        err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
-        if (err)
-                CERROR("mdc_setattr fails (%d)\n", err);
+        /* Don't send size changes to MDS to avoid "fast EA" problems, and
+         * also avoid a pointless RPC (we get file size from OST anyways).
+         */
+        attr->ia_valid &= ~ATTR_SIZE;
+        if (attr->ia_valid) {
+                err = mdc_setattr(&sbi->ll_mdc_conn, inode, attr, &request);
+                if (err)
+                        CERROR("mdc_setattr fails (%d)\n", err);
 
-        ptlrpc_req_finished(request);
+                ptlrpc_req_finished(request);
+        }
 
         RETURN(err);
 }
@@ -482,7 +506,6 @@ int ll_read_inode2(struct inode *inode, void *opaque)
         /* core attributes first */
         ll_update_inode(inode, body);
 
-        //if (body->valid & OBD_MD_FLEASIZE)
         LASSERT(!lli->lli_smd);
         if (lic && lic->lic_lmm)
                 obd_unpackmd(ll_i2obdconn(inode), &lli->lli_smd, lic->lic_lmm);
@@ -492,9 +515,9 @@ int ll_read_inode2(struct inode *inode, void *opaque)
                 rc = ll_file_size(inode, lli->lli_smd);
                 if (rc) {
                         CERROR("ll_file_size: %d\n", rc);
-                        /* FIXME: need to somehow prevent inode creation */
-                        LBUG();
+                        ll_clear_inode(inode);
                         make_bad_inode(inode);
+                        RETURN(rc);
                 }
         }
 
index ef86d58..5be4717 100644 (file)
@@ -89,7 +89,7 @@ static int ll_follow_link(struct dentry *dentry, struct nameidata *nd,
         struct inode *inode = dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct ptlrpc_request *request;
-        int op, mode, rc;
+        int op = 0, mode = 0, rc;
         char *symname;
         ENTRY;
 
index fe5aad4..7135743 100644 (file)
@@ -21,6 +21,7 @@
 #include <linux/lustre_lib.h>
 #include <linux/lustre_net.h>
 #include <linux/lustre_idl.h>
+#include <linux/lustre_lite.h> /* for LL_IOC_LOV_[GS]ETSTRIPE */
 #include <linux/lustre_mds.h>
 #include <linux/obd_class.h>
 #include <linux/obd_lov.h>
@@ -42,10 +43,19 @@ struct lov_file_handles {
         struct lustre_handle *lfh_handles;
 };
 
+struct lov_lock_handles { /* cookie plus a trailing array of handles */
+        __u64 llh_cookie;
+        struct lustre_handle llh_handles[0]; /* variable-length tail */
+};
+
 extern int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmm,
                        struct lov_stripe_md *lsm);
 extern int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsm,
                          struct lov_mds_md *lmm);
+extern int lov_setstripe(struct lustre_handle *conn,
+                         struct lov_stripe_md **lsmp, struct lov_mds_md *lmmu);
+extern int lov_getstripe(struct lustre_handle *conn, struct lov_mds_md *lmmu,
+                         struct lov_stripe_md *lsm);
 
 /* obd methods */
 int lov_attach(struct obd_device *dev, obd_count len, void *data)
@@ -72,10 +82,9 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         int rc, rc2, i;
         ENTRY;
 
-        MOD_INC_USE_COUNT;
         rc = class_connect(conn, obd, cluuid);
         if (rc)
-                GOTO(out_dec, rc);
+                RETURN(rc);
 
         /* We don't want to actually do the underlying connections more than
          * once, so keep track. */
@@ -84,6 +93,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 RETURN(0);
 
         exp = class_conn2export(conn);
+        spin_lock_init(&exp->exp_lov_data.led_lock);
         INIT_LIST_HEAD(&exp->exp_lov_data.led_open_head);
 
         /* retrieve LOV metadata from MDS */
@@ -159,7 +169,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         for (i = 0; i < desc->ld_tgt_count; i++) {
                 struct obd_device *tgt = client_tgtuuid2obd(uuidarray[i]);
-                int rc2;
 
                 if (!tgt) {
                         CERROR("Target %s not attached\n", uuidarray[i]);
@@ -174,26 +183,20 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
                 rc = obd_connect(&lov->tgts[i].conn, tgt, NULL, recovd,
                                  recover);
 
-                /* Register even if connect failed, so that we get reactivation
-                 * notices.
-                 */
-                rc2 = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
-                                    sizeof(struct obd_device *), obd, NULL);
-                if (rc2) {
-                        CERROR("Target %s REGISTER_LOV error %d\n",
-                               uuidarray[i], rc2);
-                        GOTO(out_disc, rc2);
+                if (rc) {
+                        CERROR("Target %s connect error %d\n", uuidarray[i],
+                               rc);
+                        GOTO(out_disc, rc);
                 }
-
-                /* But mark failed-connect OSCs as inactive! */
+                        
+                rc = obd_iocontrol(IOC_OSC_REGISTER_LOV, &lov->tgts[i].conn,
+                                    sizeof(struct obd_device *), obd, NULL);
                 if (rc) {
-                        CDEBUG(D_INFO, "Target %s connect error %d\n",
+                        CERROR("Target %s REGISTER_LOV error %d\n",
                                uuidarray[i], rc);
-                        LASSERT(lov->tgts[i].active == 0);
-                        rc = 0;
-                        continue;
+                        GOTO(out_disc, rc);
                 }
-                
+
                 desc->ld_active_tgt_count++;
                 lov->tgts[i].active = 1;
         }
@@ -205,6 +208,7 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         RETURN(rc);
 
  out_disc:
+        /* 'i' is the target that failed; the loop unwinds [0, i) only */
         while (i-- > 0) {
                 desc->ld_active_tgt_count--;
                 lov->tgts[i].active = 0;
@@ -216,8 +220,6 @@ static int lov_connect(struct lustre_handle *conn, struct obd_device *obd,
         OBD_FREE(lov->tgts, lov->bufsize);
  out_conn:
         class_disconnect(conn);
- out_dec:
-        MOD_DEC_USE_COUNT;
         goto out;
 }
 
@@ -256,6 +258,7 @@ static int lov_disconnect(struct lustre_handle *conn)
         lov->tgts = NULL;
 
         exp = class_conn2export(conn);
+        spin_lock(&exp->exp_lov_data.led_lock);
         list_for_each_safe(p, n, &exp->exp_lov_data.led_open_head) {
                 /* XXX close these, instead of just discarding them? */
                 struct lov_file_handles *lfh;
@@ -267,11 +270,10 @@ static int lov_disconnect(struct lustre_handle *conn)
                          lfh->lfh_count * sizeof(*lfh->lfh_handles));
                 kmem_cache_free(lov_file_cache, lfh);
         }
+        spin_unlock(&exp->exp_lov_data.led_lock);
 
  out_local:
         rc = class_disconnect(conn);
-        if (!rc)
-                MOD_DEC_USE_COUNT;
         return rc;
 }
 
@@ -286,6 +288,7 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
                               int activate)
 {
         struct obd_device *obd;
+        struct lov_tgt_desc *tgt;
         int i, rc = 0;
         ENTRY;
 
@@ -293,27 +296,31 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
                lov, uuid, activate);
 
         spin_lock(&lov->lov_lock);
-        for (i = 0; i < lov->desc.ld_tgt_count; i++)
-                if (strncmp(uuid, lov->tgts[i].uuid,
-                            sizeof(lov->tgts[i].uuid)) == 0)
+        for (i = 0, tgt = lov->tgts; i < lov->desc.ld_tgt_count; i++, tgt++) {
+                CDEBUG(D_INFO, "lov idx %d is %s conn "LPX64"\n",
+                       i, tgt->uuid, tgt->conn.addr);
+                if (strncmp(uuid, tgt->uuid, sizeof(tgt->uuid)) == 0)
                         break;
+        }
 
         if (i == lov->desc.ld_tgt_count)
                 GOTO(out, rc = -EINVAL);
 
-        obd = class_conn2obd(&lov->tgts[i].conn);
+        obd = class_conn2obd(&tgt->conn);
         if (obd == NULL) {
                 LBUG();
                 GOTO(out, rc = -ENOTCONN);
         }
 
-        CDEBUG(D_INFO, "Found OBD %p type %s\n", obd, obd->obd_type->typ_name);
+        CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LOV idx %d\n",
+               obd->obd_name, obd->obd_uuid, obd->obd_minor, obd,
+               obd->obd_type->typ_name, i);
         if (strcmp(obd->obd_type->typ_name, "osc") != 0) {
                 LBUG();
                 GOTO(out, rc = -EBADF);
         }
 
-        if (lov->tgts[i].active == activate) {
+        if (tgt->active == activate) {
                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
                        activate ? "" : "in");
                 GOTO(out, rc = -EALREADY);
@@ -321,7 +328,7 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
 
         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd, activate ? "" : "in");
 
-        lov->tgts[i].active = activate;
+        tgt->active = activate;
         if (activate) {
                 /*
                  * foreach(export)
@@ -341,6 +348,7 @@ static int lov_set_osc_active(struct lov_obd *lov, obd_uuid_t uuid,
                 lov->desc.ld_active_tgt_count--;
         }
 
+#warning "FIXME: walk open files list for objects that need opening"
         EXIT;
  out:
         spin_unlock(&lov->lov_lock);
@@ -400,7 +408,8 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         struct lov_stripe_md *lsm;
         struct lov_oinfo *loi;
         struct obdo *tmp;
-        int ost_count, ost_idx = 1;
+        int ost_count, ost_idx;
+        int first = 1, obj_alloc = 0;
         int rc = 0, i;
         ENTRY;
 
@@ -409,119 +418,111 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
         if (!export)
                 RETURN(-EINVAL);
 
-        tmp = obdo_alloc();
-        if (!tmp)
-                RETURN(-ENOMEM);
-
         lov = &export->exp_obd->u.lov;
 
         if (!lov->desc.ld_active_tgt_count)
                 RETURN(-EIO);
 
-        spin_lock(&lov->lov_lock);
-        ost_count = lov->desc.ld_tgt_count;
+        tmp = obdo_alloc();
+        if (!tmp)
+                RETURN(-ENOMEM);
 
         lsm = *ea;
 
-        /* Can't create more stripes than we have targets (incl inactive). */
-        if (lsm && lsm->lsm_stripe_count > lov->desc.ld_tgt_count)
-                GOTO(out_tmp, rc = -EINVAL);
-
-        /* Free the user lsm if it needs to be changed, to avoid memory leaks */
-        if (!lsm || (lsm &&
-                     lsm->lsm_stripe_count > lov->desc.ld_active_tgt_count)) {
-                struct lov_stripe_md *lsm_new = NULL;
-                rc = obd_alloc_memmd(conn, &lsm_new);
-                if (rc < 0) {
-                        spin_unlock(&lov->lov_lock);
-                        if (lsm)
-                                obd_free_memmd(conn, &lsm);
+        if (!lsm) {
+                rc = obd_alloc_memmd(conn, &lsm);
+                if (rc < 0)
                         GOTO(out_tmp, rc);
-                }
-                if (lsm) {
-                        LASSERT(lsm->lsm_magic == LOV_MAGIC);
-                        CERROR("replace user LOV MD: stripes %u > %u active\n",
-                               lsm->lsm_stripe_count,
-                               lov->desc.ld_active_tgt_count);
-                        lsm_new->lsm_stripe_offset = lsm->lsm_stripe_offset;
-                        lsm_new->lsm_stripe_size = lsm->lsm_stripe_size;
-                        lsm_new->lsm_stripe_pattern = lsm->lsm_stripe_pattern;
-                        obd_free_memmd(conn, &lsm);
-                }
-                lsm = lsm_new;
-                ost_idx = 0; /* if lsm->lsm_stripe_offset is set yet */
+
+                rc = 0;
                 lsm->lsm_magic = LOV_MAGIC;
         }
 
+        ost_count = lov->desc.ld_tgt_count;
+
         LASSERT(oa->o_valid & OBD_MD_FLID);
         lsm->lsm_object_id = oa->o_id;
         if (!lsm->lsm_stripe_size)
                 lsm->lsm_stripe_size = lov->desc.ld_default_stripe_size;
 
-        /* Because of 64-bit divide/mod operations only work with a 32-bit
-         * divisor in a 32-bit kernel, we cannot support a stripe width
-         * of 4GB or larger on 32-bit CPUs.
-         */
-        if (lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL) {
-                CERROR("LOV: stripe width "LPU64"x%u > %lu on 32-bit system\n",
-                       lsm->lsm_stripe_size, lsm->lsm_stripe_count, ~0UL);
-                spin_unlock(&lov->lov_lock);
-                GOTO(out_free, rc = -EINVAL);
-        }
-
-        if (!ost_idx || lsm->lsm_stripe_offset >= ost_count) {
+        if (!*ea || lsm->lsm_stripe_offset >= ost_count) {
                 int mult = lsm->lsm_object_id * lsm->lsm_stripe_count;
                 int stripe_offset = mult % ost_count;
                 int sub_offset = (mult / ost_count) % lsm->lsm_stripe_count;
 
-                lsm->lsm_stripe_offset = stripe_offset + sub_offset;
-        }
-
-        /* Start with lsm_stripe_offset on an active OSC to avoid confusion */
-        while (!lov->tgts[lsm->lsm_stripe_offset].active)
-                lsm->lsm_stripe_offset = (lsm->lsm_stripe_offset+1) % ost_count;
-
-        /* Pick the OSTs before we release the lock */
-        ost_idx = lsm->lsm_stripe_offset;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
-                CDEBUG(D_INODE, "objid "LPX64"[%d] is ost_idx %d (uuid %s)\n",
-                       lsm->lsm_object_id, i, ost_idx, lov->tgts[ost_idx].uuid);
-                loi->loi_ost_idx = ost_idx;
-                do {
-                        ost_idx = (ost_idx + 1) % ost_count;
-                } while (!lov->tgts[ost_idx].active);
-        }
-
-        spin_unlock(&lov->lov_lock);
+                ost_idx = (stripe_offset + sub_offset) % ost_count; /* wrap */
+        } else /* caller-supplied offset is already < ost_count */
+                ost_idx = lsm->lsm_stripe_offset;
 
         CDEBUG(D_INODE, "allocating %d subobjs for objid "LPX64" at idx %d\n",
-               lsm->lsm_stripe_count,lsm->lsm_object_id,lsm->lsm_stripe_offset);
+               lsm->lsm_stripe_count, lsm->lsm_object_id, ost_idx);
 
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        loi = lsm->lsm_oinfo;
+        for (i = 0; i < ost_count; i++, ost_idx = (ost_idx + 1) % ost_count) {
                 struct lov_stripe_md obj_md;
                 struct lov_stripe_md *obj_mdp = &obj_md;
+                int err;
 
-                ost_idx = loi->loi_ost_idx;
+                if (lov->tgts[ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", ost_idx);
+                        continue;
+                }
 
                 /* create data objects with "parent" OA */
                 memcpy(tmp, oa, sizeof(*tmp));
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                rc = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
-                if (rc) {
-                        CERROR("error creating objid "LPX64" sub-object on "
-                               "OST idx %d: rc = %d\n", oa->o_id, ost_idx, rc);
-                        GOTO(out_cleanup, rc);
+                err = obd_create(&lov->tgts[ost_idx].conn, tmp, &obj_mdp);
+                if (err) {
+                        if (lov->tgts[ost_idx].active) {
+                                CERROR("error creating objid "LPX64" sub-object"
+                                       " on OST idx %d: rc = %d\n",
+                                       oa->o_id, ost_idx, err);
+                                if (!rc) /* keep the first error seen */
+                                        rc = err;
+                        }
+                        continue; /* try the next OST index */
                 }
                 loi->loi_id = tmp->o_id;
+                loi->loi_ost_idx = ost_idx;
                 CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64" at idx %d\n",
                        lsm->lsm_object_id, loi->loi_id, ost_idx);
+
+                if (first) {
+                        lsm->lsm_stripe_offset = ost_idx;
+                        first = 0;
+                }
+
+                ++obj_alloc;
+                ++loi;
+
+                /* If we have allocated enough objects, we are OK */
+                if (obj_alloc == lsm->lsm_stripe_count) {
+                        rc = 0;
+                        GOTO(out_done, rc);
+                }
         }
 
+        if (*ea)
+                GOTO(out_cleanup, rc);
+        else {
+                struct lov_stripe_md *lsm_new;
+                /* XXX LOV STACKING call into osc for sizes */
+                int size = lov_stripe_md_size(obj_alloc);
+
+                OBD_ALLOC(lsm_new, size);
+                if (!lsm_new)
+                        GOTO(out_cleanup, rc = -ENOMEM);
+                memcpy(lsm_new, lsm, size);
+                /* XXX LOV STACKING call into osc for sizes */
+                OBD_FREE(lsm, lov_stripe_md_size(lsm->lsm_stripe_count));
+                lsm = lsm_new;
+        }
+ out_done:
         *ea = lsm;
 
  out_tmp:
         obdo_free(tmp);
-        RETURN(rc);
+        return rc;
 
  out_cleanup:
         while (i-- > 0) {
@@ -538,7 +539,6 @@ static int lov_create(struct lustre_handle *conn, struct obdo *oa,
                                oa->o_id, loi->loi_id, loi->loi_ost_idx,
                                err);
         }
- out_free:
         if (!*ea)
                 obd_free_memmd(conn, &lsm);
         goto out_tmp;
@@ -561,7 +561,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -576,6 +576,7 @@ static int lov_destroy(struct lustre_handle *conn, struct obdo *oa,
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         /* Orphan clean up will (someday) fix this up. */
                         continue;
                 }
@@ -667,7 +668,7 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -680,14 +681,15 @@ static int lov_getattr(struct lustre_handle *conn, struct obdo *oa,
         if (oa->o_valid & OBD_MD_FLHANDLE)
                 lfh = lov_handle2lfh(obdo_handle(oa));
 
+        CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",
+               lsm->lsm_object_id, lsm->lsm_stripe_count, lsm->lsm_stripe_size);
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
 
-                if (loi->loi_id == 0)
-                        continue;
-
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "
                        "%u\n", oa->o_id, i, loi->loi_id, loi->loi_ost_idx);
@@ -739,7 +741,7 @@ static int lov_setattr(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -803,7 +805,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -829,6 +831,7 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
 
                 if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
                 }
 
@@ -863,7 +866,9 @@ static int lov_open(struct lustre_handle *conn, struct obdo *oa,
         handle->addr = (__u64)(unsigned long)lfh;
         handle->cookie = lfh->lfh_cookie;
         oa->o_valid |= OBD_MD_FLHANDLE;
+        spin_lock(&export->exp_lov_data.led_lock);
         list_add(&lfh->lfh_list, &export->exp_lov_data.led_open_head);
+        spin_unlock(&export->exp_lov_data.led_lock);
 
 out_tmp:
         obdo_free(tmp);
@@ -914,7 +919,7 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -928,9 +933,11 @@ static int lov_close(struct lustre_handle *conn, struct obdo *oa,
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 int err;
-                
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
+
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
@@ -1029,7 +1036,7 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -1048,6 +1055,7 @@ static int lov_punch(struct lustre_handle *conn, struct obdo *oa,
 
                 if (starti == endi)
                         continue;
+
                 /* create data objects with "parent" OA */
                 memcpy(&tmp, oa, sizeof(tmp));
                 tmp.o_id = loi->loi_id;
@@ -1094,7 +1102,7 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -1159,13 +1167,49 @@ static inline int lov_brw(int cmd, struct lustre_handle *conn,
         RETURN(rc);
 }
 
+/* Allocate a lov_lock_handles container for a striped lock.
+ *
+ * The allocation is sized for the fixed header plus one llh_handles entry
+ * per stripe in @lsm, and llh_cookie is filled with random bytes.
+ *
+ * Returns the new container, or NULL if the allocation fails.  The caller
+ * must release it with OBD_FREE() using the same size expression.
+ *
+ * NOTE(review): the per-stripe handles are not initialized here; this
+ * presumably relies on OBD_ALLOC() zero-filling the memory -- confirm.
+ */
+static struct lov_lock_handles *lov_newlockh(struct lov_stripe_md *lsm)
+{
+        struct lov_lock_handles *lov_lockh;
+
+        OBD_ALLOC(lov_lockh, sizeof(*lov_lockh) +
+                  sizeof(*lov_lockh->llh_handles) * lsm->lsm_stripe_count);
+        if (!lov_lockh)
+                return NULL;
+
+        get_random_bytes(&lov_lockh->llh_cookie, sizeof(lov_lockh->llh_cookie));
+
+        return lov_lockh;
+}
+
+/* Convert a lustre_handle back into the lov_lock_handles it refers to.
+ *
+ * We are only ever passed local lock handles here, so we do not need to
+ * validate (and we can't really because these structs are variable sized
+ * and therefore alloced, and not from a private slab).
+ *
+ * We just check because we can: the cookie stored in the structure must
+ * match the cookie carried by the handle.
+ *
+ * Returns the lov_lock_handles pointer, or NULL if @handle is NULL, has a
+ * zero addr, or the cookies do not match.
+ *
+ * NOTE(review): the failure paths use RETURN() but there is no ENTRY in
+ * this function, while the success path uses a plain return -- confirm
+ * whether that mix is intended.
+ */
+static struct lov_lock_handles *lov_h2lovlockh(struct lustre_handle *handle)
+{
+        struct lov_lock_handles *lov_lockh = NULL;
+
+        if (!handle || !handle->addr)
+                RETURN(NULL);
+
+        /* addr holds the kernel pointer that was stored as a __u64 */
+        lov_lockh = (struct lov_lock_handles *)(unsigned long)(handle->addr);
+        if (lov_lockh->llh_cookie != handle->cookie)
+                RETURN(NULL);
+
+        return lov_lockh;
+}
+
 static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                        struct lustre_handle *parent_lock,
                        __u32 type, void *cookie, int cookielen, __u32 mode,
                        int *flags, void *cb, void *data, int datalen,
-                       struct lustre_handle *lockhs)
+                       struct lustre_handle *lockh)
 {
         struct obd_export *export = class_conn2export(conn);
+        struct lov_lock_handles *lov_lockh = NULL;
+        struct lustre_handle *lov_lockhp;
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         struct lov_stripe_md submd;
@@ -1178,7 +1222,7 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -1190,33 +1234,45 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
-        memset(lockhs, 0, sizeof(*lockhs) * lsm->lsm_stripe_count);
+        if (lsm->lsm_stripe_count > 1) {
+                lov_lockh = lov_newlockh(lsm);
+                if (!lov_lockh)
+                        RETURN(-ENOMEM);
+
+                lockh->addr = (__u64)(unsigned long)lov_lockh;
+                lockh->cookie = lov_lockh->llh_cookie;
+                lov_lockhp = lov_lockh->llh_handles;
+        } else
+                lov_lockhp = lockh;
 
         lov = &export->exp_obd->u.lov;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+             i++, loi++, lov_lockhp++) {
                 struct ldlm_extent *extent = (struct ldlm_extent *)cookie;
                 struct ldlm_extent sub_ext;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
+                if (lov->tgts[loi->loi_ost_idx].active == 0) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);
                         continue;
+                }
 
                 *flags = 0;
                 sub_ext.start = lov_stripe_offset(lsm, extent->start, i);
                 sub_ext.end = lov_stripe_offset(lsm, extent->end, i);
-                if (sub_ext.start == sub_ext.end)
+                if (sub_ext.start == sub_ext.end /* || !active */)
                         continue;
 
+                /* XXX LOV STACKING: submd should be from the subobj */
                 submd.lsm_object_id = loi->loi_id;
-                /* XXX submd should be that from the subobj, it should come
-                 *     opaquely from the LOV.
-                 */
                 submd.lsm_stripe_count = 0;
                 /* XXX submd is not fully initialized here */
                 *flags = 0;
                 rc = obd_enqueue(&(lov->tgts[loi->loi_ost_idx].conn), &submd,
                                  parent_lock, type, &sub_ext, sizeof(sub_ext),
-                                 mode, flags, cb, data, datalen, &(lockhs[i]));
+                                 mode, flags, cb, data, datalen, lov_lockhp);
                 // XXX add a lock debug statement here
+                if (rc)
+                        memset(lov_lockhp, 0, sizeof(*lov_lockhp));
                 if (rc && lov->tgts[loi->loi_ost_idx].active) {
                         CERROR("Error enqueue objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
@@ -1224,33 +1280,47 @@ static int lov_enqueue(struct lustre_handle *conn, struct lov_stripe_md *lsm,
                         goto out_locks;
                 }
         }
-
         RETURN(0);
 
- out_locks:
-        for (i--, loi = &lsm->lsm_oinfo[i]; i >= 0; i--, loi--) {
+out_locks:
+        while (loi--, lov_lockhp--, i-- > 0) {
+                struct lov_stripe_md submd;
                 int err;
-                
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
+
+                if (lov_lockhp->addr == 0 ||
+                    lov->tgts[loi->loi_ost_idx].active == 0)
                         continue;
 
+                /* XXX LOV STACKING: submd should be from the subobj */
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_stripe_count = 0;
                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
-                                 mode, &lockhs[i]);
+                                 mode, lov_lockhp);
                 if (err) {
-                        CERROR("Error cancelling objid "LPX64" subobj "LPX64
+                        CERROR("Error cancelling objid "LPX64
                                " on OST idx %d after enqueue error: rc = %d\n",
                                loi->loi_id, loi->loi_ost_idx, err);
                 }
         }
+
+        if (lsm->lsm_stripe_count > 1) {
+                lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
+                OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
+                          sizeof(*lov_lockh->llh_handles) *
+                          lsm->lsm_stripe_count);
+        }
+        lockh->addr = 0;
+        lockh->cookie = DEAD_HANDLE_MAGIC;
+
         RETURN(rc);
 }
 
 static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
-                      __u32 mode, struct lustre_handle *lockhs)
+                      __u32 mode, struct lustre_handle *lockh)
 {
         struct obd_export *export = class_conn2export(conn);
+        struct lov_lock_handles *lov_lockh = NULL;
+        struct lustre_handle *lov_lockhp;
         struct lov_obd *lov;
         struct lov_oinfo *loi;
         int rc = 0, i;
@@ -1262,7 +1332,7 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         }
 
         if (lsm->lsm_magic != LOV_MAGIC) {
-                CERROR("LOV striping magic bad %#lx != %#lx\n",
+                CERROR("LOV striping magic bad %#x != %#x\n",
                        lsm->lsm_magic, LOV_MAGIC);
                 RETURN(-EINVAL);
         }
@@ -1270,29 +1340,55 @@ static int lov_cancel(struct lustre_handle *conn, struct lov_stripe_md *lsm,
         if (!export || !export->exp_obd)
                 RETURN(-ENODEV);
 
+        LASSERT(lockh);
+        if (lsm->lsm_stripe_count > 1) {
+                lov_lockh = lov_h2lovlockh(lockh);
+                if (!lov_lockh) {
+                        CERROR("LOV: invalid lov lock handle %p\n", lockh);
+                        RETURN(-EINVAL);
+                }
+
+                lov_lockhp = lov_lockh->llh_handles;
+        } else
+                lov_lockhp = lockh;
+
         lov = &export->exp_obd->u.lov;
-        for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
+        for (i = 0, loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count;
+             i++, loi++, lov_lockhp++ ) {
                 struct lov_stripe_md submd;
                 int err;
 
-                if (lov->tgts[loi->loi_ost_idx].active == 0)
-                        continue;
-
-                if (lockhs[i].addr == 0)
+                if (lov_lockhp->addr == 0) {
+                        CDEBUG(D_HA, "lov idx %d no lock?\n", loi->loi_ost_idx);
                         continue;
+                }
 
+                /* XXX LOV STACKING: submd should be from the subobj */
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_stripe_count = 0;
                 err = obd_cancel(&lov->tgts[loi->loi_ost_idx].conn, &submd,
-                                mode, &lockhs[i]);
-                if (err && lov->tgts[loi->loi_ost_idx].active) {
-                        CERROR("Error cancel objid "LPX64" subobj "LPX64
-                               " on OST idx %d: rc = %d\n", lsm->lsm_object_id,
-                               loi->loi_id, loi->loi_ost_idx, err);
-                        if (!rc)
-                                rc = err;
+                                 mode, lov_lockhp);
+                if (err) {
+                        if (lov->tgts[loi->loi_ost_idx].active) {
+                                CERROR("Error cancel objid "LPX64" subobj "
+                                       LPX64" on OST idx %d: rc = %d\n",
+                                       lsm->lsm_object_id,
+                                       loi->loi_id, loi->loi_ost_idx, err);
+                                if (!rc)
+                                        rc = err;
+                        }
                 }
         }
+
+        if (lsm->lsm_stripe_count > 1) {
+                lov_lockh->llh_cookie = DEAD_HANDLE_MAGIC;
+                OBD_FREE(lov_lockh, sizeof(*lov_lockh) +
+                          sizeof(*lov_lockh->llh_handles) *
+                          lsm->lsm_stripe_count);
+        }
+        lockh->addr = 0;
+        lockh->cookie = DEAD_HANDLE_MAGIC;
+
         RETURN(rc);
 }
 
@@ -1302,7 +1398,7 @@ static int lov_cancel_unused(struct lustre_handle *conn,
         struct obd_export *export = class_conn2export(conn);
         struct lov_obd *lov;
         struct lov_oinfo *loi;
-        int rc = 0, i, err;
+        int rc = 0, i;
         ENTRY;
 
         if (!lsm) {
@@ -1316,6 +1412,7 @@ static int lov_cancel_unused(struct lustre_handle *conn,
         lov = &export->exp_obd->u.lov;
         for (i = 0,loi = lsm->lsm_oinfo; i < lsm->lsm_stripe_count; i++,loi++) {
                 struct lov_stripe_md submd;
+                int err;
 
                 submd.lsm_object_id = loi->loi_id;
                 submd.lsm_stripe_count = 0;
@@ -1352,12 +1449,14 @@ static int lov_statfs(struct lustre_handle *conn, struct obd_statfs *osfs)
         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
                 int err;
 
-                if (!lov->tgts[i].active)
+                if (!lov->tgts[i].active) {
+                        CDEBUG(D_HA, "lov idx %d inactive\n", i);
                         continue;
+                }
 
                 err = obd_statfs(&lov->tgts[i].conn, &lov_sfs);
                 if (err) {
-                        CERROR("Error statfs OSC %s idx %d: err = %d\n",
+                        CERROR("Error statfs OSC %s i %d: err = %d\n",
                                lov->tgts[i].uuid, i, err);
                         if (!rc)
                                 rc = err;
@@ -1389,7 +1488,6 @@ static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 {
         struct obd_device *obddev = class_conn2obd(conn);
         struct lov_obd *lov = &obddev->u.lov;
-        struct obd_ioctl_data *data = karg;
         int i, count = lov->desc.ld_tgt_count;
         int rc;
 
@@ -1397,10 +1495,12 @@ static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 
         switch (cmd) {
         case IOC_LOV_SET_OSC_ACTIVE: {
+                struct obd_ioctl_data *data = karg;
                 rc = lov_set_osc_active(lov,data->ioc_inlbuf1,data->ioc_offset);
                 break;
         }
         case OBD_IOC_LOV_GET_CONFIG: {
+                struct obd_ioctl_data *data = karg;
                 struct lov_tgt_desc *tgtdesc;
                 struct lov_desc *desc;
                 obd_uuid_t *uuidp;
@@ -1437,13 +1537,21 @@ static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
                 OBD_FREE(buf, len);
                 break;
         }
+        case LL_IOC_LOV_SETSTRIPE:
+                rc = lov_setstripe(conn, karg, uarg);
+                break;
+        case LL_IOC_LOV_GETSTRIPE:
+                rc = lov_getstripe(conn, karg, uarg);
+                break;
         default:
                 if (count == 0)
                         RETURN(-ENOTTY);
                 rc = 0;
                 for (i = 0; i < count; i++) {
-                        int err = obd_iocontrol(cmd, &lov->tgts[i].conn,
-                                                len, karg, uarg);
+                        int err;
+
+                        err = obd_iocontrol(cmd, &lov->tgts[i].conn,
+                                            len, karg, uarg);
                         if (err && !rc)
                                 rc = err;
                 }
@@ -1453,6 +1561,7 @@ static int lov_iocontrol(unsigned int cmd, struct lustre_handle *conn, int len,
 }
 
 struct obd_ops lov_obd_ops = {
+        o_owner:       THIS_MODULE,
         o_attach:      lov_attach,
         o_detach:      lov_detach,
         o_setup:       lov_setup,
index 247015c..3d4b4b8 100644 (file)
 #include <linux/lustre_net.h>
 #include <linux/obd.h>
 #include <linux/obd_lov.h>
+#include <linux/obd_class.h>
 #include <linux/obd_support.h>
 
 /* lov_packdesc() is in mds/mds_lov.c */
-
 void lov_unpackdesc(struct lov_desc *ld)
 {
         ld->ld_tgt_count = NTOH__u32(ld->ld_tgt_count);
@@ -39,6 +39,28 @@ void lov_unpackdesc(struct lov_desc *ld)
         ld->ld_pattern = HTON__u32(ld->ld_pattern);
 }
 
+/* Print the contents of a wire-format LOV metadata struct for debugging.
+ *
+ * @level is the debug mask passed to every CDEBUG() call.
+ * @lmm is the struct to dump: first its header fields, then one line for
+ * each of the lmm_ost_count entries of lmm_objects (including entries whose
+ * object id is zero).
+ */
+void lov_dump_lmm(int level, struct lov_mds_md *lmm)
+{
+        struct lov_object_id *loi;
+        int idx;
+
+        CDEBUG(level, "objid "LPX64", magic %#08x, ost_count %u\n",
+               lmm->lmm_object_id, lmm->lmm_magic, lmm->lmm_ost_count);
+        CDEBUG(level,"stripe_size %u, stripe_count %u, stripe_offset %u\n",
+               lmm->lmm_stripe_size, lmm->lmm_stripe_count,
+               lmm->lmm_stripe_offset);
+        for (idx = 0, loi = lmm->lmm_objects; idx < lmm->lmm_ost_count;
+             idx++, loi++)
+                CDEBUG(level, "ost idx %u subobj "LPX64"\n", idx,
+                       loi->l_object_id);
+}
+
+/* Assert @test, dumping the LOV metadata first if the assertion is going to
+ * fail.  The expansion refers to a variable named "lmm", so the macro can
+ * only be used where a struct lov_mds_md *lmm is in scope.  @test appears
+ * twice in the expansion, so it must not have side effects.
+ */
+#define LMM_ASSERT(test)                                                \
+do {                                                                    \
+        if (!(test)) lov_dump_lmm(D_ERROR, lmm);                        \
+        LASSERT(test); /* so we know what assertion failed */           \
+} while(0)
+
 /* Pack LOV object metadata for shipment to the MDS.
  *
  * XXX In the future, this will be enhanced to get the EA size from the
@@ -60,12 +82,19 @@ int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
         ENTRY;
 
         if (lsm) {
+                int i, max = 0;
                 if (lsm->lsm_magic != LOV_MAGIC) {
-                        CERROR("bad mem LOV MAGIC: %#08x != %#08x\n",
+                        CERROR("bad mem LOV MAGIC: %#010x != %#010x\n",
                                lsm->lsm_magic, LOV_MAGIC);
                         RETURN(-EINVAL);
                 }
                 stripe_count = lsm->lsm_stripe_count;
+
+                for (i = 0,loi = lsm->lsm_oinfo; i < stripe_count; i++,loi++) {
+                        if (loi->loi_ost_idx > max)
+                                max = loi->loi_ost_idx;
+                }
+                ost_count = max + 1;
         }
 
         /* XXX LOV STACKING call into osc for sizes */
@@ -93,14 +122,14 @@ int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
         lmm->lmm_stripe_count = (stripe_count);
         if (!lsm)
                 RETURN(lmm_size);
+
         /* XXX endianness */
         lmm->lmm_magic = (lsm->lsm_magic);
         lmm->lmm_object_id = (lsm->lsm_object_id);
         LASSERT(lsm->lsm_object_id);
         lmm->lmm_stripe_size = (lsm->lsm_stripe_size);
-        lmm->lmm_stripe_pattern = (lsm->lsm_stripe_pattern);
         lmm->lmm_stripe_offset = (lsm->lsm_stripe_offset);
-        lmm->lmm_ost_count = (lov->desc.ld_tgt_count);
+        lmm->lmm_ost_count = (ost_count);
 
         /* Only fill in the object ids which we are actually using.
          * Assumes lmm_objects is otherwise zero-filled. */
@@ -113,6 +142,16 @@ int lov_packmd(struct lustre_handle *conn, struct lov_mds_md **lmmp,
         RETURN(lmm_size);
 }
 
+/* Pick the stripe count to use for a new object.
+ *
+ * A @stripe_count of zero selects the LOV default (ld_default_stripe_count).
+ * If the result is still zero, or is larger than the number of active
+ * targets (ld_active_tgt_count), the active target count is used instead.
+ *
+ * Returns the resulting stripe count.
+ */
+static int lov_get_stripecnt(struct lov_obd *lov, int stripe_count)
+{
+        if (!stripe_count)
+                stripe_count = lov->desc.ld_default_stripe_count;
+        if (!stripe_count || stripe_count > lov->desc.ld_active_tgt_count)
+                stripe_count = lov->desc.ld_active_tgt_count;
+
+        return stripe_count;
+}
+
 int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                  struct lov_mds_md *lmm)
 {
@@ -120,9 +159,9 @@ int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
         struct lov_obd *lov = &obd->u.lov;
         struct lov_stripe_md *lsm;
         struct lov_oinfo *loi;
-        int ost_count = lov->desc.ld_active_tgt_count;
+        int ost_count;
         int ost_offset = 0;
-        int stripe_count = 0;
+        int stripe_count;
         int lsm_size;
         int i;
         ENTRY;
@@ -135,12 +174,9 @@ int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                         RETURN(-EINVAL);
                 }
                 stripe_count = (lmm->lmm_stripe_count);
-        }
-
-        if (!stripe_count)
-                stripe_count = lov->desc.ld_default_stripe_count;
-        if (!stripe_count || stripe_count > ost_count)
-                stripe_count = ost_count;
+                LASSERT(stripe_count);
+        } else
+                stripe_count = lov_get_stripecnt(lov, 0);
 
         /* XXX LOV STACKING call into osc for sizes */
         lsm_size = lov_stripe_md_size(stripe_count);
@@ -171,9 +207,12 @@ int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
         ost_offset = lsm->lsm_stripe_offset = (lmm->lmm_stripe_offset);
         lsm->lsm_magic = (lmm->lmm_magic);
         lsm->lsm_object_id = (lmm->lmm_object_id);
-        LASSERT(lsm->lsm_object_id);
         lsm->lsm_stripe_size = (lmm->lmm_stripe_size);
-        lsm->lsm_stripe_pattern = (lmm->lmm_stripe_pattern);
+
+        ost_count = (lmm->lmm_ost_count);
+
+        LMM_ASSERT(lsm->lsm_object_id);
+        LMM_ASSERT(ost_count);
 
         for (i = 0, loi = lsm->lsm_oinfo; i < ost_count; i++, ost_offset++) {
                 ost_offset %= ost_count;
@@ -181,13 +220,126 @@ int lov_unpackmd(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
                 if (!lmm->lmm_objects[ost_offset].l_object_id)
                         continue;
 
-                LASSERT(loi - lsm->lsm_oinfo < stripe_count);
+                LMM_ASSERT(loi - lsm->lsm_oinfo < stripe_count);
                 /* XXX LOV STACKING call down to osc_unpackmd() */
                 loi->loi_id = (lmm->lmm_objects[ost_offset].l_object_id);
                 loi->loi_ost_idx = ost_offset;
                 loi++;
         }
-        LASSERT(loi - lsm->lsm_oinfo == stripe_count);
+        LMM_ASSERT(loi - lsm->lsm_oinfo > 0);
+        LMM_ASSERT(loi - lsm->lsm_oinfo == stripe_count);
 
         RETURN(lsm_size);
 }
+
+/* Configure object striping information on a new file.
+ *
+ * @lmmu is a pointer to a user-space struct lov_mds_md describing the
+ * application preference.  The fields read here are lmm_stripe_count,
+ * lmm_stripe_size and lmm_stripe_offset; lmm_magic must be LOV_MAGIC.
+ * @lsmp is a pointer to an in-core stripe MD pointer; on success it is set
+ * to a newly allocated stripe MD holding the requested layout.
+ *
+ * Returns 0 on success, -EFAULT if the user struct cannot be copied in,
+ * -EINVAL if the magic or any requested value is out of range, or -ENOMEM
+ * if the stripe MD cannot be allocated.
+ */
+int lov_setstripe(struct lustre_handle *conn, struct lov_stripe_md **lsmp,
+                  struct lov_mds_md *lmmu)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_mds_md lmm;
+        struct lov_stripe_md *lsm;
+        int stripe_count;
+        int rc;
+        ENTRY;
+
+        /* Only the fixed-size header is needed; no object ids are read. */
+        rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
+        if (rc)
+                RETURN(-EFAULT);
+
+        if (lmm.lmm_magic != LOV_MAGIC) {
+                CERROR("bad wire LOV MAGIC: %#08x != %#08x\n",
+                       lmm.lmm_magic, LOV_MAGIC);
+                RETURN(-EINVAL);
+        }
+        if (lmm.lmm_stripe_count > lov->desc.ld_tgt_count) {
+                CERROR("stripe count %d more than OST count %d\n",
+                       (int)lmm.lmm_stripe_count, lov->desc.ld_tgt_count);
+                RETURN(-EINVAL);
+        }
+        if (lmm.lmm_stripe_offset >= lov->desc.ld_tgt_count) {
+                /* Report the offending offset, not the stripe count. */
+                CERROR("stripe offset %d more than max OST index %d\n",
+                       (int)lmm.lmm_stripe_offset, lov->desc.ld_tgt_count);
+                RETURN(-EINVAL);
+        }
+        if (lmm.lmm_stripe_size & (PAGE_SIZE - 1)) {
+                CERROR("stripe size %u not multiple of %lu\n",
+                       lmm.lmm_stripe_size, PAGE_SIZE);
+                RETURN(-EINVAL);
+        }
+        /* Widen before multiplying: a product computed in the fields' own
+         * width would wrap and could never compare greater than ~0UL. */
+        if ((__u64)lmm.lmm_stripe_size * lmm.lmm_stripe_count > ~0UL) {
+                CERROR("stripe width %ux%u > %lu on 32-bit system\n",
+                       lmm.lmm_stripe_size, (int)lmm.lmm_stripe_count, ~0UL);
+                RETURN(-EINVAL);
+        }
+
+        /* Apply the LOV default and clamp to the active target count. */
+        stripe_count = lov_get_stripecnt(lov, lmm.lmm_stripe_count);
+
+        /* XXX LOV STACKING call into osc for sizes */
+        OBD_ALLOC(lsm, lov_stripe_md_size(stripe_count));
+        if (!lsm)
+                RETURN(-ENOMEM);
+
+        lsm->lsm_magic = LOV_MAGIC;
+        /* This is all validated in lov_create() */
+        lsm->lsm_stripe_count = stripe_count;
+        lsm->lsm_stripe_offset = lmm.lmm_stripe_offset;
+        lsm->lsm_stripe_size = lmm.lmm_stripe_size;
+
+        *lsmp = lsm;
+
+        /* rc is still 0 from the successful copy_from_user() above */
+        RETURN(rc);
+}
+
+/* Retrieve object striping information.
+ *
+ * @lsm is the in-core stripe MD to report.
+ * @lmmu is a pointer to a user-space struct with lmm_ost_count indicating
+ * the maximum number of OST indices which will fit in the user buffer.
+ * lmm_magic must be LOV_MAGIC.  On success the packed metadata is copied
+ * back to this same user buffer.
+ *
+ * Returns 0 on success, -ENODATA if @lsm is NULL, -EFAULT if the user
+ * buffer cannot be read or written, -EINVAL if the magic is wrong,
+ * -EOVERFLOW if the user buffer has room for fewer entries than the LOV has
+ * targets, or the negative error returned by lov_packmd().
+ */
+int lov_getstripe(struct lustre_handle *conn, struct lov_stripe_md *lsm,
+                  struct lov_mds_md *lmmu)
+{
+        struct obd_device *obd = class_conn2obd(conn);
+        struct lov_obd *lov = &obd->u.lov;
+        struct lov_mds_md lmm, *lmmk = NULL;
+        int ost_count, rc, lmm_size;
+        ENTRY;
+
+        if (!lsm)
+                RETURN(-ENODATA);
+
+        /* Read only the header to learn the magic and the buffer capacity. */
+        rc = copy_from_user(&lmm, lmmu, sizeof(lmm));
+        if (rc)
+                RETURN(-EFAULT);
+
+        if (lmm.lmm_magic != LOV_MAGIC)
+                RETURN(-EINVAL);
+
+        ost_count = lov->desc.ld_tgt_count;
+
+        /* XXX we _could_ check if indices > user lmm_ost_count are zero */
+        if (lmm.lmm_ost_count < ost_count)
+                RETURN(-EOVERFLOW);
+
+        /* lmmk is NULL on entry; lov_packmd() hands back the packed MD in it
+         * and returns its size (or a negative error). */
+        rc = lov_packmd(conn, &lmmk, lsm);
+        if (rc < 0)
+                RETURN(rc);
+
+        lmm_size = rc;
+        rc = 0;
+
+        if (lmm_size && copy_to_user(lmmu, lmmk, lmm_size))
+                rc = -EFAULT;
+
+        /* Release the packed copy whether or not the copy-out succeeded. */
+        obd_free_wiremd(conn, &lmmk);
+
+        RETURN(rc);
+}
index a68b57e..0812e00 100644 (file)
  * Common STATUS namespace
  */
 
-int rd_uuid(char* page, char **start, off_t off, int count, int *eof, 
+int rd_uuid(char *page, char **start, off_t off, int count, int *eof,
             void *data)
 {
-        int len = 0;
         struct obd_device* dev = (struct obd_device*)data;
-        len += snprintf(page, count, "%s\n", dev->obd_uuid);
-        return len;
-        
-
+        return snprintf(page, count, "%s\n", dev->obd_uuid);
 }
-int rd_stripesize(char* page, char **start, off_t off, int count, int *eof, 
+
+int rd_stripesize(char *page, char **start, off_t off, int count, int *eof,
                   void *data)
 {
-        struct obd_device* dev = (struct obd_device*)data;
-        int len = 0; 
-        struct lov_obd* lov = &dev->u.lov;
-        len += snprintf(page, count, LPU64"\n", 
-                        (__u64)(lov->desc.ld_default_stripe_size));
-        
-        return len;
+        struct obd_device *dev = (struct obd_device*)data;
+        struct lov_desc *desc = &dev->u.lov.desc;
+
+        return snprintf(page, count, LPU64"\n", desc->ld_default_stripe_size);
 }
 
-int rd_stripeoffset(char* page, char **start, off_t off, int count, int *eof, 
+int rd_stripeoffset(char *page, char **start, off_t off, int count, int *eof,
                     void *data)
 {
         struct obd_device* dev = (struct obd_device*)data;
-        int len = 0;
         struct lov_obd* lov = &dev->u.lov;
-        len += snprintf(page, count, LPU64"\n", 
-                        lov->desc.ld_default_stripe_offset);
-        return len;
 
+        return snprintf(page, count, LPU64"\n",
+                        lov->desc.ld_default_stripe_offset);
 }
 
-int rd_stripetype(char* page, char **start, off_t off, int count, int *eof, 
+int rd_stripetype(char *page, char **start, off_t off, int count, int *eof,
                   void *data)
 {