Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / osd / osd_handler.c
index 7324e27..98a872f 100644 (file)
@@ -1,29 +1,43 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  lustre/osd/osd_handler.c
- *  Top-level entry points into osd module
+ * GPL HEADER START
  *
- *  Copyright (c) 2006 Cluster File Systems, Inc.
- *   Author: Nikita Danilov <nikita@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   This file is part of the Lustre file system, http://www.lustre.org
- *   Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   You may have signed or agreed to another license before downloading
- *   this software.  If so, you are bound by the terms and conditions
- *   of that agreement, and the following does not apply to you.  See the
- *   LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   If you did not agree to a different license, then this copy of Lustre
- *   is open source software; you can redistribute it and/or modify it
- *   under the terms of version 2 of the GNU General Public License as
- *   published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
  *
- *   In either case, Lustre is distributed in the hope that it will be
- *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/osd/osd_handler.c
+ *
+ * Top-level entry points into osd module
+ *
+ * Author: Nikita Danilov <nikita@clusterfs.com>
  */
 
 #ifndef EXPORT_SYMTAB
 #include <obd_support.h>
 /* struct ptlrpc_thread */
 #include <lustre_net.h>
-/* LUSTRE_OSD_NAME */
-#include <obd.h>
-/* class_register_type(), class_unregister_type(), class_get_type() */
-#include <obd_class.h>
-#include <lustre_disk.h>
 
 /* fid_is_local() */
 #include <lustre_fid.h>
@@ -81,7 +90,7 @@ struct osd_directory {
 
 struct osd_object {
         struct dt_object       oo_dt;
-        /*
+        /**
          * Inode for file system object represented by this osd_object. This
          * inode is pinned for the whole duration of lu_object life.
          *
@@ -91,58 +100,16 @@ struct osd_object {
         struct inode          *oo_inode;
         struct rw_semaphore    oo_sem;
         struct osd_directory  *oo_dir;
-        /* protects inode attributes. */
+        /** protects inode attributes. */
         spinlock_t             oo_guard;
-#if OSD_COUNTERS
         const struct lu_env   *oo_owner;
+#ifdef CONFIG_LOCKDEP
+        struct lockdep_map     oo_dep_map;
 #endif
 };
 
-/*
- * osd device.
- */
-struct osd_device {
-        /* super-class */
-        struct dt_device          od_dt_dev;
-        /* information about underlying file system */
-        struct lustre_mount_info *od_mount;
-        /* object index */
-        struct osd_oi             od_oi;
-        /*
-         * XXX temporary stuff for object index: directory where every object
-         * is named by its fid.
-         */
-        struct dentry            *od_obj_area;
-
-        /* Environment for transaction commit callback.
-         * Currently, OSD is based on ext3/JBD. Transaction commit in ext3/JBD
-         * is serialized, that is there is no more than one transaction commit
-         * at a time (JBD journal_commit_transaction() is serialized).
-         * This means that it's enough to have _one_ lu_context.
-         */
-        struct lu_env             od_env_for_commit;
-
-        /*
-         * Fid Capability
-         */
-        unsigned int              od_fl_capa:1;
-        unsigned long             od_capa_timeout;
-        __u32                     od_capa_alg;
-        struct lustre_capa_key   *od_capa_keys;
-        struct hlist_head        *od_capa_hash;
-        
-        /*
-         * statfs optimization: we cache a bit.
-         */
-        cfs_time_t                od_osfs_age;
-        struct kstatfs            od_kstatfs;
-        spinlock_t                od_osfs_lock;
-};
-
 static int   osd_root_get      (const struct lu_env *env,
                                 struct dt_device *dev, struct lu_fid *f);
-static int   osd_statfs        (const struct lu_env *env,
-                                struct dt_device *dev, struct kstatfs *sfs);
 
 static int   lu_device_is_osd  (const struct lu_device *d);
 static void  osd_mod_exit      (void) __exit;
@@ -150,12 +117,13 @@ static int   osd_mod_init      (void) __init;
 static int   osd_type_init     (struct lu_device_type *t);
 static void  osd_type_fini     (struct lu_device_type *t);
 static int   osd_object_init   (const struct lu_env *env,
-                                struct lu_object *l);
+                                struct lu_object *l,
+                                const struct lu_object_conf *_);
 static void  osd_object_release(const struct lu_env *env,
                                 struct lu_object *l);
 static int   osd_object_print  (const struct lu_env *env, void *cookie,
                                 lu_printer_t p, const struct lu_object *o);
-static void  osd_device_free   (const struct lu_env *env,
+static struct lu_device *osd_device_free   (const struct lu_env *env,
                                 struct lu_device *m);
 static void *osd_key_init      (const struct lu_context *ctx,
                                 struct lu_context_key *key);
@@ -248,23 +216,24 @@ static struct thandle     *osd_trans_start  (const struct lu_env *env,
                                              struct txn_param *p);
 static journal_t          *osd_journal      (const struct osd_device *dev);
 
-static struct lu_device_type_operations osd_device_type_ops;
+static const struct lu_device_type_operations osd_device_type_ops;
 static struct lu_device_type            osd_device_type;
-static struct lu_object_operations      osd_lu_obj_ops;
+static const struct lu_object_operations      osd_lu_obj_ops;
 static struct obd_ops                   osd_obd_device_ops;
-static struct lprocfs_vars              lprocfs_osd_module_vars[];
-static struct lprocfs_vars              lprocfs_osd_obd_vars[];
-static struct lu_device_operations      osd_lu_ops;
+static const struct lu_device_operations      osd_lu_ops;
 static struct lu_context_key            osd_key;
-static struct dt_object_operations      osd_obj_ops;
-static struct dt_body_operations        osd_body_ops;
-static struct dt_index_operations       osd_index_ops;
-static struct dt_index_operations       osd_index_compat_ops;
+static const struct dt_object_operations      osd_obj_ops;
+static const struct dt_body_operations        osd_body_ops;
+static const struct dt_index_operations       osd_index_ops;
+static const struct dt_index_operations       osd_index_compat_ops;
 
 struct osd_thandle {
         struct thandle          ot_super;
         handle_t               *ot_handle;
         struct journal_callback ot_jcb;
+        /* Link to the device, for debugging. */
+        struct lu_ref_link     *ot_dev_link;
+
 };
 
 /*
@@ -298,7 +267,6 @@ static inline struct osd_thread_info *osd_oti_get(const struct lu_env *env)
         return lu_context_key_get(&env->le_ctx, &osd_key);
 }
 
-#if OSD_COUNTERS
 /*
  * Concurrency: doesn't matter
  */
@@ -316,15 +284,6 @@ static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
         return oti->oti_w_locks > 0 && o->oo_owner == env;
 }
 
-#define OSD_COUNTERS_DO(exp) exp
-#else
-
-
-#define osd_read_locked(env, o) (1)
-#define osd_write_locked(env, o) (1)
-#define OSD_COUNTERS_DO(exp) ((void)0)
-#endif
-
 /*
  * Concurrency: doesn't access mutable data
  */
@@ -382,19 +341,20 @@ static void osd_object_init0(struct osd_object *obj)
  * Concurrency: no concurrent access is possible that early in object
  * life-cycle.
  */
-static int osd_object_init(const struct lu_env *env, struct lu_object *l)
+static int osd_object_init(const struct lu_env *env, struct lu_object *l,
+                           const struct lu_object_conf *_)
 {
         struct osd_object *obj = osd_obj(l);
         int result;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         result = osd_fid_lookup(env, obj, lu_object_fid(l));
         if (result == 0) {
                 if (obj->oo_inode != NULL)
                         osd_object_init0(obj);
         }
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         return result;
 }
 
@@ -406,7 +366,7 @@ static void osd_object_free(const struct lu_env *env, struct lu_object *l)
 {
         struct osd_object *obj = osd_obj(l);
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         dt_object_fini(&obj->oo_dt);
         OBD_FREE_PTR(obj);
@@ -496,7 +456,7 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
         struct osd_object *obj   = osd_obj(l);
         struct inode      *inode = obj->oo_inode;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         /*
          * If object is unlinked remove fid->ino mapping from object index.
@@ -556,8 +516,8 @@ static int osd_object_print(const struct lu_env *env, void *cookie,
 /*
  * Concurrency: shouldn't matter.
  */
-static int osd_statfs(const struct lu_env *env,
-                      struct dt_device *d, struct kstatfs *sfs)
+int osd_statfs(const struct lu_env *env, struct dt_device *d,
+               struct kstatfs *sfs)
 {
         struct osd_device *osd = osd_dt_dev(d);
         struct super_block *sb = osd_sb(osd);
@@ -614,6 +574,7 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
         struct thandle     *th = &oh->ot_super;
         struct dt_device   *dev = th->th_dev;
+        struct lu_device   *lud = &dev->dd_lu_dev;
 
         LASSERT(dev != NULL);
         LASSERT(oh->ot_handle == NULL);
@@ -621,14 +582,18 @@ static void osd_trans_commit_cb(struct journal_callback *jcb, int error)
         if (error) {
                 CERROR("transaction @0x%p commit error: %d\n", th, error);
         } else {
+                struct lu_env *env = &osd_dt_dev(dev)->od_env_for_commit;
                 /*
                  * This od_env_for_commit is only for commit usage.  see
                  * "struct dt_device"
                  */
-                dt_txn_hook_commit(&osd_dt_dev(dev)->od_env_for_commit, th);
+                lu_context_enter(&env->le_ctx);
+                dt_txn_hook_commit(env, th);
+                lu_context_exit(&env->le_ctx);
         }
 
-        lu_device_put(&dev->dd_lu_dev);
+        lu_ref_del_at(&lud->ld_reference, oh->ot_dev_link, "osd-tx", th);
+        lu_device_put(lud);
         th->th_dev = NULL;
 
         lu_context_exit(&th->th_ctx);
@@ -658,6 +623,8 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
         if (osd_param_is_sane(dev, p)) {
                 OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
                 if (oh != NULL) {
+                        struct osd_thread_info *oti = osd_oti_get(env);
+
                         /*
                          * XXX temporary stuff. Some abstraction layer should
                          * be used.
@@ -671,22 +638,18 @@ static struct thandle *osd_trans_start(const struct lu_env *env,
                                 th->th_result = 0;
                                 jh->h_sync = p->tp_sync;
                                 lu_device_get(&d->dd_lu_dev);
+                                oh->ot_dev_link = lu_ref_add
+                                        (&d->dd_lu_dev.ld_reference,
+                                         "osd-tx", th);
                                 /* add commit callback */
                                 lu_context_init(&th->th_ctx, LCT_TX_HANDLE);
                                 lu_context_enter(&th->th_ctx);
                                 journal_callback_set(jh, osd_trans_commit_cb,
                                                      (struct journal_callback *)&oh->ot_jcb);
-#if OSD_COUNTERS
-                                {
-                                        struct osd_thread_info *oti =
-                                                osd_oti_get(env);
-
                                         LASSERT(oti->oti_txns == 0);
                                         LASSERT(oti->oti_r_locks == 0);
                                         LASSERT(oti->oti_w_locks == 0);
                                         oti->oti_txns++;
-                                }
-#endif
                         } else {
                                 OBD_FREE_PTR(oh);
                                 th = (void *)jh;
@@ -708,35 +671,25 @@ static void osd_trans_stop(const struct lu_env *env, struct thandle *th)
 {
         int result;
         struct osd_thandle *oh;
+        struct osd_thread_info *oti = osd_oti_get(env);
 
         ENTRY;
 
         oh = container_of0(th, struct osd_thandle, ot_super);
         if (oh->ot_handle != NULL) {
                 handle_t *hdl = oh->ot_handle;
-                /*
-                 * XXX temporary stuff. Some abstraction layer should be used.
-                 */
+
+                LASSERT(oti->oti_txns == 1);
+                oti->oti_txns--;
+                LASSERT(oti->oti_r_locks == 0);
+                LASSERT(oti->oti_w_locks == 0);
                 result = dt_txn_hook_stop(env, th);
                 if (result != 0)
                         CERROR("Failure in transaction hook: %d\n", result);
-
-                /**/
                 oh->ot_handle = NULL;
                 result = journal_stop(hdl);
                 if (result != 0)
                         CERROR("Failure to stop transaction: %d\n", result);
-
-#if OSD_COUNTERS
-                {
-                        struct osd_thread_info *oti = osd_oti_get(env);
-
-                        LASSERT(oti->oti_txns == 1);
-                        LASSERT(oti->oti_r_locks == 0);
-                        LASSERT(oti->oti_w_locks == 0);
-                        oti->oti_txns--;
-                }
-#endif
         }
         EXIT;
 }
@@ -750,6 +703,28 @@ static int osd_sync(const struct lu_env *env, struct dt_device *d)
         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
 }
 
+/**
+ * Start commit for OSD device.
+ *
+ * An implementation of dt_commit_async method for OSD device.
+ * Asynchronously starts underlying fs sync and thereby a transaction
+ * commit.
+ *
+ * \param env environment
+ * \param d dt device
+ *
+ * \see dt_device_operations
+ */
+static int osd_commit_async(const struct lu_env *env,
+                            struct dt_device *d)
+{
+        struct super_block *s = osd_sb(osd_dt_dev(d));
+        ENTRY;
+
+        CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_NAME);
+        RETURN(s->s_op->sync_fs(s, 0));
+}
+
 /*
  * Concurrency: shouldn't matter.
  */
@@ -825,7 +800,7 @@ static int osd_credit_get(const struct lu_env *env, struct dt_device *d,
         return osd_dto_credits[op];
 }
 
-static struct dt_device_operations osd_dt_ops = {
+static const struct dt_device_operations osd_dt_ops = {
         .dt_root_get       = osd_root_get,
         .dt_statfs         = osd_statfs,
         .dt_trans_start    = osd_trans_start,
@@ -833,63 +808,52 @@ static struct dt_device_operations osd_dt_ops = {
         .dt_conf_get       = osd_conf_get,
         .dt_sync           = osd_sync,
         .dt_ro             = osd_ro,
+        .dt_commit_async   = osd_commit_async,
         .dt_credit_get     = osd_credit_get,
         .dt_init_capa_ctxt = osd_init_capa_ctxt,
 };
 
 static void osd_object_read_lock(const struct lu_env *env,
-                                 struct dt_object *dt)
+                                 struct dt_object *dt, unsigned role)
 {
         struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_thread_info *oti = osd_oti_get(env);
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
-        OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
-        down_read(&obj->oo_sem);
-#if OSD_COUNTERS
-        {
-                struct osd_thread_info *oti = osd_oti_get(env);
+        LASSERT(obj->oo_owner != env);
+        down_read_nested(&obj->oo_sem, role);
 
                 LASSERT(obj->oo_owner == NULL);
                 oti->oti_r_locks++;
-        }
-#endif
 }
 
 static void osd_object_write_lock(const struct lu_env *env,
-                                  struct dt_object *dt)
+                                  struct dt_object *dt, unsigned role)
 {
         struct osd_object *obj = osd_dt_obj(dt);
+        struct osd_thread_info *oti = osd_oti_get(env);
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
-        OSD_COUNTERS_DO(LASSERT(obj->oo_owner != env));
-        down_write(&obj->oo_sem);
-#if OSD_COUNTERS
-        {
-                struct osd_thread_info *oti = osd_oti_get(env);
+        LASSERT(obj->oo_owner != env);
+        down_write_nested(&obj->oo_sem, role);
 
                 LASSERT(obj->oo_owner == NULL);
                 obj->oo_owner = env;
                 oti->oti_w_locks++;
-        }
-#endif
 }
 
 static void osd_object_read_unlock(const struct lu_env *env,
                                    struct dt_object *dt)
 {
         struct osd_object *obj = osd_dt_obj(dt);
-
-        LASSERT(osd_invariant(obj));
-#if OSD_COUNTERS
-        {
                 struct osd_thread_info *oti = osd_oti_get(env);
 
+        LINVRNT(osd_invariant(obj));
+
                 LASSERT(oti->oti_r_locks > 0);
                 oti->oti_r_locks--;
-        }
-#endif
         up_read(&obj->oo_sem);
 }
 
@@ -897,18 +861,14 @@ static void osd_object_write_unlock(const struct lu_env *env,
                                     struct dt_object *dt)
 {
         struct osd_object *obj = osd_dt_obj(dt);
-
-        LASSERT(osd_invariant(obj));
-#if OSD_COUNTERS
-        {
                 struct osd_thread_info *oti = osd_oti_get(env);
 
+        LINVRNT(osd_invariant(obj));
+
                 LASSERT(obj->oo_owner == env);
                 LASSERT(oti->oti_w_locks > 0);
                 oti->oti_w_locks--;
                 obj->oo_owner = NULL;
-        }
-#endif
         up_write(&obj->oo_sem);
 }
 
@@ -1006,7 +966,7 @@ static int osd_attr_get(const struct lu_env *env,
         struct osd_object *obj = osd_dt_obj(dt);
 
         LASSERT(dt_object_exists(dt));
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
                 return -EACCES;
@@ -1128,7 +1088,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
         struct inode       *parent;
         struct inode       *inode;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
         LASSERT(osd->od_obj_area != NULL);
 
@@ -1147,7 +1107,7 @@ static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
                 result = 0;
         } else
                 result = PTR_ERR(inode);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         return result;
 }
 
@@ -1215,7 +1175,7 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
         struct inode      *dir;
         umode_t mode = attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX);
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
         LASSERT(osd->od_obj_area != NULL);
         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
@@ -1229,7 +1189,7 @@ static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
                 LASSERT(obj->oo_inode != NULL);
                 init_special_inode(obj->oo_inode, mode, attr->la_rdev);
         }
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         return result;
 }
 
@@ -1293,7 +1253,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 
         ENTRY;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(!dt_object_exists(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
@@ -1321,7 +1281,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
         }
 
         LASSERT(ergo(result == 0, dt_object_exists(dt)));
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         RETURN(result);
 }
 
@@ -1335,22 +1295,17 @@ static void osd_object_ref_add(const struct lu_env *env,
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
         spin_lock(&obj->oo_guard);
-        if (inode->i_nlink < LDISKFS_LINK_MAX) {
-                inode->i_nlink ++;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        } else {
-                spin_unlock(&obj->oo_guard);
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "Overflowed nlink\n");
-        }
-        LASSERT(osd_invariant(obj));
+        LASSERT(inode->i_nlink < LDISKFS_LINK_MAX);
+        inode->i_nlink++;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
+        LINVRNT(osd_invariant(obj));
 }
 
 /*
@@ -1363,22 +1318,17 @@ static void osd_object_ref_del(const struct lu_env *env,
         struct osd_object *obj = osd_dt_obj(dt);
         struct inode *inode = obj->oo_inode;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(osd_write_locked(env, obj));
         LASSERT(th != NULL);
 
         spin_lock(&obj->oo_guard);
-        if (inode->i_nlink > 0) {
-                inode->i_nlink --;
-                spin_unlock(&obj->oo_guard);
-                mark_inode_dirty(inode);
-        } else {
-                spin_unlock(&obj->oo_guard);
-                LU_OBJECT_DEBUG(D_ERROR, env, &dt->do_lu,
-                                "Underflowed nlink\n");
-        }
-        LASSERT(osd_invariant(obj));
+        LASSERT(inode->i_nlink > 0);
+        inode->i_nlink--;
+        spin_unlock(&obj->oo_guard);
+        mark_inode_dirty(inode);
+        LINVRNT(osd_invariant(obj));
 }
 
 /*
@@ -1413,12 +1363,12 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, const char *name, int fl,
                          struct thandle *handle, struct lustre_capa *capa)
 {
-        int fs_flags;
-
         struct osd_object      *obj    = osd_dt_obj(dt);
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_dentry;
+        struct timespec        *t      = &info->oti_time;
+        int                     fs_flags = 0, rc;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->setxattr != NULL);
@@ -1428,17 +1378,24 @@ static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
                 return -EACCES;
 
-        dentry->d_inode = inode;
-
-        fs_flags = 0;
         if (fl & LU_XATTR_REPLACE)
                 fs_flags |= XATTR_REPLACE;
 
         if (fl & LU_XATTR_CREATE)
                 fs_flags |= XATTR_CREATE;
 
-        return inode->i_op->setxattr(dentry, name,
-                                     buf->lb_buf, buf->lb_len, fs_flags);
+        dentry->d_inode = inode;
+        *t = inode->i_ctime;
+        rc = inode->i_op->setxattr(dentry, name,
+                                   buf->lb_buf, buf->lb_len, fs_flags);
+        if (likely(rc == 0)) {
+                /* ctime should not be updated with server-side time. */
+                spin_lock(&obj->oo_guard);
+                inode->i_ctime = *t;
+                spin_unlock(&obj->oo_guard);
+                mark_inode_dirty(inode);
+        }
+        return rc;
 }
 
 /*
@@ -1478,6 +1435,8 @@ static int osd_xattr_del(const struct lu_env *env,
         struct inode           *inode  = obj->oo_inode;
         struct osd_thread_info *info   = osd_oti_get(env);
         struct dentry          *dentry = &info->oti_dentry;
+        struct timespec        *t      = &info->oti_time;
+        int                     rc;
 
         LASSERT(dt_object_exists(dt));
         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
@@ -1488,7 +1447,16 @@ static int osd_xattr_del(const struct lu_env *env,
                 return -EACCES;
 
         dentry->d_inode = inode;
-        return inode->i_op->removexattr(dentry, name);
+        *t = inode->i_ctime;
+        rc = inode->i_op->removexattr(dentry, name);
+        if (likely(rc == 0)) {
+                /* ctime should not be updated with server-side time. */
+                spin_lock(&obj->oo_guard);
+                inode->i_ctime = *t;
+                spin_unlock(&obj->oo_guard);
+                mark_inode_dirty(inode);
+        }
+        return rc;
 }
 
 static struct obd_capa *osd_capa_get(const struct lu_env *env,
@@ -1510,7 +1478,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
                 RETURN(ERR_PTR(-ENOENT));
 
         LASSERT(dt_object_exists(dt));
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         /* renewal sanity check */
         if (old && osd_object_auth(env, dt, old, opc))
@@ -1534,7 +1502,7 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
         spin_unlock(&capa_lock);
 
         capa->lc_keyid = key->lk_keyid;
-        capa->lc_expiry = CURRENT_SECONDS + dev->od_capa_timeout;
+        capa->lc_expiry = cfs_time_current_sec() + dev->od_capa_timeout;
 
         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
         if (rc) {
@@ -1546,7 +1514,27 @@ static struct obd_capa *osd_capa_get(const struct lu_env *env,
         RETURN(oc);
 }
 
-static struct dt_object_operations osd_obj_ops = {
+static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
+{
+        int rc;
+        struct osd_object      *obj    = osd_dt_obj(dt);
+        struct inode           *inode  = obj->oo_inode;
+        struct osd_thread_info *info   = osd_oti_get(env);
+        struct dentry          *dentry = &info->oti_dentry;
+        struct file            *file   = &info->oti_file;
+        ENTRY;
+
+        dentry->d_inode = inode;
+        file->f_dentry = dentry;
+        file->f_mapping = inode->i_mapping;
+        file->f_op = inode->i_fop;
+        LOCK_INODE_MUTEX(inode);
+        rc = file->f_op->fsync(file, dentry, 0);
+        UNLOCK_INODE_MUTEX(inode);
+        RETURN(rc);
+}
+
+static const struct dt_object_operations osd_obj_ops = {
         .do_read_lock    = osd_object_read_lock,
         .do_write_lock   = osd_object_write_lock,
         .do_read_unlock  = osd_object_read_unlock,
@@ -1563,6 +1551,7 @@ static struct dt_object_operations osd_obj_ops = {
         .do_xattr_del    = osd_xattr_del,
         .do_xattr_list   = osd_xattr_list,
         .do_capa_get     = osd_capa_get,
+        .do_object_sync  = osd_object_sync,
 };
 
 /*
@@ -1618,7 +1607,7 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
         return result;
 }
 
-static struct dt_body_operations osd_body_ops = {
+static const struct dt_body_operations osd_body_ops = {
         .dbo_read  = osd_read,
         .dbo_write = osd_write
 };
@@ -1691,7 +1680,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
         int result;
         struct osd_object *obj = osd_dt_obj(dt);
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
 
         if (osd_object_is_root(obj)) {
@@ -1735,7 +1724,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                 if (!osd_index_probe(env, obj, feat))
                         result = -ENOTDIR;
         }
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         return result;
 }
@@ -1752,7 +1741,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
 
         ENTRY;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
         LASSERT(handle != NULL);
@@ -1770,7 +1759,7 @@ static int osd_index_delete(const struct lu_env *env, struct dt_object *dt,
 
         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
         osd_ipd_put(env, bag, ipd);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         RETURN(rc);
 }
 
@@ -1785,7 +1774,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
 
         ENTRY;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
 
@@ -1799,7 +1788,7 @@ static int osd_index_lookup(const struct lu_env *env, struct dt_object *dt,
         rc = iam_lookup(bag, (const struct iam_key *)key,
                         (struct iam_rec *)rec, ipd);
         osd_ipd_put(env, bag, ipd);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
 
         RETURN(rc);
 }
@@ -1816,7 +1805,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
 
         ENTRY;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(dt_object_exists(dt));
         LASSERT(bag->ic_object == obj->oo_inode);
         LASSERT(th != NULL);
@@ -1834,7 +1823,7 @@ static int osd_index_insert(const struct lu_env *env, struct dt_object *dt,
         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
                         (struct iam_rec *)rec, ipd);
         osd_ipd_put(env, bag, ipd);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         RETURN(rc);
 }
 
@@ -1956,7 +1945,7 @@ static struct dt_rec *osd_it_rec(const struct lu_env *env,
         return (struct dt_rec *)iam_it_rec_get(&it->oi_it);
 }
 
-static __u32 osd_it_store(const struct lu_env *env, const struct dt_it *di)
+static __u64 osd_it_store(const struct lu_env *env, const struct dt_it *di)
 {
         struct osd_it *it = (struct osd_it *)di;
 
@@ -1964,14 +1953,14 @@ static __u32 osd_it_store(const struct lu_env *env, const struct dt_it *di)
 }
 
 static int osd_it_load(const struct lu_env *env,
-                       const struct dt_it *di, __u32 hash)
+                       const struct dt_it *di, __u64 hash)
 {
         struct osd_it *it = (struct osd_it *)di;
 
         return iam_it_load(&it->oi_it, hash);
 }
 
-static struct dt_index_operations osd_index_ops = {
+static const struct dt_index_operations osd_index_ops = {
         .dio_lookup = osd_index_lookup,
         .dio_insert = osd_index_insert,
         .dio_delete = osd_index_delete,
@@ -2046,7 +2035,7 @@ static int osd_index_compat_lookup(const struct lu_env *env,
         struct dentry *dentry;
         struct dentry *parent;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
         LASSERT(osd_has_index(obj));
 
@@ -2092,7 +2081,7 @@ static int osd_index_compat_lookup(const struct lu_env *env,
         } else
                 result = -ENOMEM;
         dput(parent);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         return result;
 }
 
@@ -2162,7 +2151,7 @@ static int osd_index_compat_insert(const struct lu_env *env,
         int result;
 
         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(th != NULL);
 
         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
@@ -2172,7 +2161,7 @@ static int osd_index_compat_insert(const struct lu_env *env,
         if (result != 0)
                 return result;
 
-        luch = lu_object_find(env, ludev->ld_site, fid);
+        luch = lu_object_find(env, ludev, fid, NULL);
         if (!IS_ERR(luch)) {
                 if (lu_object_exists(luch)) {
                         struct osd_object *child;
@@ -2187,8 +2176,8 @@ static int osd_index_compat_insert(const struct lu_env *env,
                                 CERROR("No osd slice.\n");
                                 result = -ENOENT;
                         }
-                        LASSERT(osd_invariant(obj));
-                        LASSERT(osd_invariant(child));
+                        LINVRNT(osd_invariant(obj));
+                        LINVRNT(osd_invariant(child));
                 } else {
                         CERROR("Sorry.\n");
                         result = -ENOENT;
@@ -2196,11 +2185,11 @@ static int osd_index_compat_insert(const struct lu_env *env,
                 lu_object_put(env, luch);
         } else
                 result = PTR_ERR(luch);
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         return result;
 }
 
-static struct dt_index_operations osd_index_compat_ops = {
+static const struct dt_index_operations osd_index_compat_ops = {
         .dio_lookup = osd_index_compat_lookup,
         .dio_insert = osd_index_compat_insert,
         .dio_delete = osd_index_compat_delete
@@ -2235,19 +2224,23 @@ LU_KEY_FINI(osd, struct osd_thread_info);
 static void osd_key_exit(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
 {
-#if OSD_COUNTERS
         struct osd_thread_info *info = data;
 
         LASSERT(info->oti_r_locks == 0);
         LASSERT(info->oti_w_locks == 0);
         LASSERT(info->oti_txns    == 0);
-#endif
 }
 
 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
                            const char *name, struct lu_device *next)
 {
-        return lu_env_init(&osd_dev(d)->od_env_for_commit, NULL, LCT_MD_THREAD);
+        int rc;
+        /* context for commit hooks */
+        rc = lu_context_init(&osd_dev(d)->od_env_for_commit.le_ctx,
+                             LCT_MD_THREAD);
+        if (rc == 0)
+                rc = osd_procfs_init(osd_dev(d), name);
+        return rc;
 }
 
 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
@@ -2293,7 +2286,8 @@ static int osd_mount(const struct lu_env *env,
         if (result == 0) {
                 struct dentry *d;
 
-                d = simple_mkdir(osd_sb(o)->s_root, "*OBJ-TEMP*", 0777, 1);
+                d = simple_mkdir(osd_sb(o)->s_root, lmi->lmi_mnt, "*OBJ-TEMP*",
+                                 0777, 1);
                 if (!IS_ERR(d)) {
                         o->od_obj_area = d;
                 } else
@@ -2307,17 +2301,24 @@ static int osd_mount(const struct lu_env *env,
 static struct lu_device *osd_device_fini(const struct lu_env *env,
                                          struct lu_device *d)
 {
+        int rc;
         ENTRY;
 
         shrink_dcache_sb(osd_sb(osd_dev(d)));
         osd_sync(env, lu2dt_dev(d));
 
+        rc = osd_procfs_fini(osd_dev(d));
+        if (rc) {
+                CERROR("proc fini error %d \n", rc);
+                RETURN (ERR_PTR(rc));
+        }
+
         if (osd_dev(d)->od_mount)
                 server_put_mount(osd_dev(d)->od_mount->lmi_name,
                                  osd_dev(d)->od_mount->lmi_mnt);
         osd_dev(d)->od_mount = NULL;
 
-        lu_env_fini(&osd_dev(d)->od_env_for_commit);
+        lu_context_fini(&osd_dev(d)->od_env_for_commit.le_ctx);
         RETURN(NULL);
 }
 
@@ -2340,22 +2341,30 @@ static struct lu_device *osd_device_alloc(const struct lu_env *env,
                         spin_lock_init(&o->od_osfs_lock);
                         o->od_osfs_age = cfs_time_shift_64(-1000);
                         o->od_capa_hash = init_capa_hash();
-                        if (o->od_capa_hash == NULL)
+                        if (o->od_capa_hash == NULL) {
+                                dt_device_fini(&o->od_dt_dev);
                                 l = ERR_PTR(-ENOMEM);
+                        }
                 } else
                         l = ERR_PTR(result);
+
+                if (IS_ERR(l))
+                        OBD_FREE_PTR(o);
         } else
                 l = ERR_PTR(-ENOMEM);
         return l;
 }
 
-static void osd_device_free(const struct lu_env *env, struct lu_device *d)
+static struct lu_device *osd_device_free(const struct lu_env *env,
+                                         struct lu_device *d)
 {
         struct osd_device *o = osd_dev(d);
+        ENTRY;
 
         cleanup_capa_hash(o->od_capa_hash);
         dt_device_fini(&o->od_dt_dev);
         OBD_FREE_PTR(o);
+        RETURN(NULL);
 }
 
 static int osd_process_config(const struct lu_env *env,
@@ -2426,7 +2435,7 @@ static int osd_fid_lookup(const struct lu_env *env,
         struct inode           *inode;
         int                     result;
 
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         LASSERT(obj->oo_inode == NULL);
         LASSERT(fid_is_sane(fid));
         /*
@@ -2464,7 +2473,7 @@ static int osd_fid_lookup(const struct lu_env *env,
                         result = PTR_ERR(inode);
         } else if (result == -ENOENT)
                 result = 0;
-        LASSERT(osd_invariant(obj));
+        LINVRNT(osd_invariant(obj));
         RETURN(result);
 }
 
@@ -2552,7 +2561,7 @@ static int osd_object_invariant(const struct lu_object *l)
         return osd_invariant(osd_obj(l));
 }
 
-static struct lu_object_operations osd_lu_obj_ops = {
+static const struct lu_object_operations osd_lu_obj_ops = {
         .loo_object_init      = osd_object_init,
         .loo_object_delete    = osd_object_delete,
         .loo_object_release   = osd_object_release,
@@ -2561,16 +2570,19 @@ static struct lu_object_operations osd_lu_obj_ops = {
         .loo_object_invariant = osd_object_invariant
 };
 
-static struct lu_device_operations osd_lu_ops = {
+static const struct lu_device_operations osd_lu_ops = {
         .ldo_object_alloc      = osd_object_alloc,
         .ldo_process_config    = osd_process_config,
         .ldo_recovery_complete = osd_recovery_complete
 };
 
-static struct lu_device_type_operations osd_device_type_ops = {
+static const struct lu_device_type_operations osd_device_type_ops = {
         .ldto_init = osd_type_init,
         .ldto_fini = osd_type_fini,
 
+        .ldto_start = osd_type_start,
+        .ldto_stop  = osd_type_stop,
+
         .ldto_device_alloc = osd_device_alloc,
         .ldto_device_free  = osd_device_free,
 
@@ -2588,25 +2600,10 @@ static struct lu_device_type osd_device_type = {
 /*
  * lprocfs legacy support.
  */
-static struct lprocfs_vars lprocfs_osd_obd_vars[] = {
-        { 0 }
-};
-
-static struct lprocfs_vars lprocfs_osd_module_vars[] = {
-        { 0 }
-};
-
 static struct obd_ops osd_obd_device_ops = {
         .o_owner = THIS_MODULE
 };
 
-static void lprocfs_osd_init_vars(struct lprocfs_static_vars *lvars)
-{
-    lvars->module_vars  = lprocfs_osd_module_vars;
-    lvars->obd_vars     = lprocfs_osd_obd_vars;
-}
-
-
 static int __init osd_mod_init(void)
 {
         struct lprocfs_static_vars lvars;
@@ -2621,7 +2618,7 @@ static void __exit osd_mod_exit(void)
         class_unregister_type(LUSTRE_OSD_NAME);
 }
 
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_NAME")");
 MODULE_LICENSE("GPL");