Whamcloud - gitweb
Code update during for recovery in new MDS:
authortappro <tappro>
Thu, 17 Aug 2006 23:08:46 +0000 (23:08 +0000)
committertappro <tappro>
Thu, 17 Aug 2006 23:08:46 +0000 (23:08 +0000)
- add ldo_recovery_complete() to the devices
- place all recovery-related stuff into mdt_recovery.c
- reconstruction code (except reconstruct_open)
- last_rcvd read/write helpers and update
- mdd_trans_stop() takes rc as parameter and saves it in thandle

lustre/cmm/cmm_device.c
lustre/include/lustre_export.h
lustre/mdd/mdd_handler.c
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_recovery.c [new file with mode: 0644]
lustre/osd/osd_handler.c

index 5ad0878..fabc806 100644 (file)
@@ -247,9 +247,21 @@ static int cmm_process_config(const struct lu_context *ctx,
         RETURN(err);
 }
 
+static int cmm_recovery_complete(const struct lu_context *ctxt,
+                                 struct lu_device *d)
+{
+        struct cmm_device *m = lu2cmm_dev(d);
+        struct lu_device *next = md2lu_dev(m->cmm_child);
+        int rc;
+        ENTRY;
+        rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
+        RETURN(rc);
+}
+
 static struct lu_device_operations cmm_lu_ops = {
-       .ldo_object_alloc   = cmm_object_alloc,
-        .ldo_process_config = cmm_process_config,
+       .ldo_object_alloc      = cmm_object_alloc,
+        .ldo_process_config    = cmm_process_config,
+        .ldo_recovery_complete = cmm_recovery_complete
 };
 
 /* --- lu_device_type operations --- */
index 4c09e9e..9801d8a 100644 (file)
@@ -24,6 +24,7 @@ struct mds_export_data {
 struct mdt_export_data {
         struct list_head        med_open_head;
         spinlock_t              med_open_lock; /* lock med_open_head, mfd_list*/
+        struct semaphore        med_mcd_lock; 
         struct mdt_client_data *med_mcd;
         __u64                   med_ibits_known;
         loff_t                  med_lr_off;
index 7aceb56..3dba109 100644 (file)
@@ -50,7 +50,8 @@
 static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
                                        struct mdd_device *);
 static void mdd_trans_stop(const struct lu_context *ctxt,
-                           struct mdd_device *mdd, struct thandle *handle);
+                           struct mdd_device *mdd, int rc,
+                           struct thandle *handle);
 static struct dt_object* mdd_object_child(struct mdd_object *o);
 static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
                           struct thandle *handle);
@@ -525,9 +526,35 @@ out:
         RETURN(rc);
 }
 
+static int mdd_recovery_complete(const struct lu_context *ctxt,
+                                 struct lu_device *d)
+{
+        struct mdd_device *mdd = lu2mdd_dev(d);
+        struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
+        int rc;
+        ENTRY;
+/* TODO:
+        rc = mdd_lov_set_nextid(ctx, mdd);
+        if (rc) {
+                CERROR("%s: mdd_lov_set_nextid failed %d\n",
+                       obd->obd_name, rc);
+                GOTO(out, rc);
+        }
+        rc = mdd_cleanup_unlink_llog(ctx, mdd);
+      
+        obd_notify(obd->u.mds.mds_osc_obd, NULL,
+                   obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
+                   OBD_NOTIFY_SYNC, NULL);
+*/
+        /* TODO: orphans handling */
+        rc = next->ld_ops->ldo_recovery_complete(ctxt, next);
+        RETURN(rc);
+}
+
 struct lu_device_operations mdd_lu_ops = {
-       .ldo_object_alloc   = mdd_object_alloc,
-        .ldo_process_config = mdd_process_config,
+       .ldo_object_alloc      = mdd_object_alloc,
+        .ldo_process_config    = mdd_process_config,
+        .ldo_recovery_complete = mdd_recovery_complete
 };
 
 static struct lu_object_operations mdd_lu_obj_ops = {
@@ -576,8 +603,10 @@ static struct thandle* mdd_trans_start(const struct lu_context *ctxt,
 }
 
 static void mdd_trans_stop(const struct lu_context *ctxt,
-                           struct mdd_device *mdd, struct thandle *handle)
+                           struct mdd_device *mdd, int result,
+                           struct thandle *handle)
 {
+        handle->th_result = result;
         mdd_child_ops(mdd)->dt_trans_stop(ctxt, handle);
 }
 
@@ -831,7 +860,7 @@ static int mdd_attr_set(const struct lu_context *ctxt,
 
         }
 cleanup:
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         if (rc == 0 && lmm_size) {
                 /*set obd attr, if needed*/
                 rc = mdd_lov_setattr_async(ctxt, mdd_obj, lmm, lmm_size);
@@ -874,7 +903,7 @@ static int mdd_xattr_set(const struct lu_context *ctxt, struct md_object *obj,
         rc = mdd_xattr_set_txn(ctxt, md2mdd_obj(obj), buf, buf_len, name,
                                fl, handle);
 
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
 
         RETURN(rc);
 }
@@ -908,7 +937,7 @@ int mdd_xattr_del(const struct lu_context *ctxt, struct md_object *obj,
         rc = __mdd_xattr_del(ctxt, mdd, md2mdd_obj(obj), name, handle);
         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
 
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
 
         RETURN(rc);
 }
@@ -992,7 +1021,7 @@ static int mdd_link(const struct lu_context *ctxt, struct md_object *tgt_obj,
 
 out:
         mdd_unlock2(ctxt, mdd_tobj, mdd_sobj);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1126,7 +1155,7 @@ static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
 
 cleanup:
         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1316,7 +1345,7 @@ static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
 cleanup:
         mdd_rename_unlock(ctxt, mdd_spobj, mdd_tpobj);
 cleanup_unlocked:
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1426,7 +1455,7 @@ static int mdd_create_data(const struct lu_context *ctxt,
                         rc = mdd_attr_get_internal_locked(ctxt, son, ma);
         }
 
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1615,7 +1644,7 @@ cleanup:
         if (lmm)
                 OBD_FREE(lmm, lmm_size);
         mdd_unlock(ctxt, mdd_pobj, DT_WRITE_LOCK);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 /* partial operation */
@@ -1643,9 +1672,8 @@ static int mdd_object_create(const struct lu_context *ctxt,
                 GOTO(out, rc);
 
         rc = mdd_attr_get_internal(ctxt, md2mdd_obj(obj), ma);
-
-        mdd_trans_stop(ctxt, mdd, handle);
 out:
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 /* partial operation */
@@ -1669,7 +1697,7 @@ static int mdd_name_insert(const struct lu_context *ctxt,
         rc = __mdd_index_insert(ctxt, mdd_obj, fid, name, handle);
 
         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1694,7 +1722,7 @@ static int mdd_name_remove(const struct lu_context *ctxt,
 
         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
 
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1734,7 +1762,7 @@ static int mdd_rename_tgt(const struct lu_context *ctxt, struct md_object *pobj,
                 __mdd_ref_del(ctxt, mdd_tobj, handle);
 cleanup:
         mdd_unlock2(ctxt, mdd_tpobj, mdd_tobj);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
@@ -1800,7 +1828,7 @@ static int mdd_ref_add(const struct lu_context *ctxt, struct md_object *obj)
         __mdd_ref_add(ctxt, mdd_obj, handle);
         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
 
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, 0, handle);
 
         RETURN(0);
 }
@@ -1852,7 +1880,7 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
 
 cleanup:
         mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
-        mdd_trans_stop(ctxt, mdd, handle);
+        mdd_trans_stop(ctxt, mdd, rc, handle);
         RETURN(rc);
 }
 
index 46168c8..b1d1c39 100644 (file)
@@ -1,4 +1,4 @@
 MODULES := mdt
-mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_fs.o mdt_open.o
+mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o mdt_open.o
 
 @INCLUDE_RULES@
index f76593c..19954bc 100644 (file)
@@ -619,6 +619,7 @@ free_rdpg:
         return rc;
 }
 
+//extern int mdt_reconstruct(struct mdt_thread_info *);
 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
 {
         int rc;
@@ -628,6 +629,19 @@ static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
 
         rc = mdt_reint_unpack(info, op);
         if (rc == 0) {
+                struct ptlrpc_request *req = mdt_info_req(info);
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
+                        struct mdt_client_data *mcd;
+                        
+                        mcd = req->rq_export->exp_mdt_data.med_mcd;
+                        if (mcd->mcd_last_xid == req->rq_xid) {
+                                mdt_reconstruct(info);
+                                RETURN(req->rq_repmsg->status);
+                        } 
+                        DEBUG_REQ(D_HA, req,
+                                  "no reply for RESENT (xid "LPD64")",
+                                  mcd->mcd_last_xid);
+                }
                 rc = mdt_reint_rec(info);
         }
 
@@ -2233,7 +2247,6 @@ static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
 
         ENTRY;
 
-
         mdt_fs_cleanup(ctx, m);
         ping_evictor_stop();
         mdt_stop_ptlrpc_service(m);
@@ -2470,9 +2483,16 @@ static struct lu_object_operations mdt_obj_ops = {
 };
 
 /* mds_connect_internal */
-static int mdt_connect0(struct mdt_device *mdt,
-                        struct obd_export *exp, struct obd_connect_data *data)
+static int mdt_connect_internal(const struct lu_context *ctx,
+                                struct mdt_device *mdt,
+                                struct obd_export *exp,
+                                struct obd_uuid *cluuid,
+                                struct obd_connect_data *data)
 {
+        struct mdt_export_data *med = &exp->exp_mdt_data;
+        struct mdt_client_data *mcd;
+        int rc;
+
         if (data != NULL) {
                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
@@ -2501,7 +2521,18 @@ static int mdt_connect0(struct mdt_device *mdt,
                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
                 return -EBADE;
         }
-        return 0;
+
+        OBD_ALLOC_PTR(mcd);
+        if (mcd != NULL) {
+                memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
+                med->med_mcd = mcd;
+                rc = mdt_client_add(ctx, mdt, med, -1);
+                if (rc != 0)
+                        OBD_FREE_PTR(mcd);
+        } else
+                rc = -ENOMEM;
+
+        return rc;
 }
 
 /* mds_connect copy */
@@ -2512,8 +2543,6 @@ static int mdt_obd_connect(const struct lu_context *ctx,
 {
         struct obd_export      *exp;
         struct mdt_device      *mdt;
-        struct mdt_export_data *med;
-        struct mdt_client_data *mcd;
         int                     rc;
         ENTRY;
 
@@ -2529,22 +2558,8 @@ static int mdt_obd_connect(const struct lu_context *ctx,
 
         exp = class_conn2export(conn);
         LASSERT(exp != NULL);
-        med = &exp->exp_mdt_data;
 
-        rc = mdt_connect0(mdt, exp, data);
-        if (rc == 0) {
-                OBD_ALLOC_PTR(mcd);
-                if (mcd != NULL) {
-                        memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
-                        med->med_mcd = mcd;
-                        /*
-                         * rc = mdt_client_add(ctx, mdt, med, -1);
-                         */
-                        if (rc != 0)
-                                OBD_FREE_PTR(mcd);
-                } else
-                        rc = -ENOMEM;
-        }
+        rc = mdt_connect_internal(ctx, mdt, exp, cluuid, data);
         if (rc != 0)
                 class_disconnect(exp);
         else
@@ -2602,6 +2617,7 @@ static int mdt_destroy_export(struct obd_export *export)
         struct mdt_export_data *med;
         struct obd_device *obd = export->exp_obd;
         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+        struct mdt_thread_info *info;
         struct lu_context ctxt;
         struct md_attr ma;
         int rc = 0;
@@ -2618,6 +2634,9 @@ static int mdt_destroy_export(struct obd_export *export)
                 RETURN(rc);
 
         lu_context_enter(&ctxt);
+
+        info = lu_context_key_get(&ctxt, &mdt_thread_key);
+        LASSERT(info != NULL);
         /* Close any open files (which may also cause orphan unlinking). */
         spin_lock(&med->med_open_lock);
         while (!list_empty(&med->med_open_head)) {
index 1887596..7adbc19 100644 (file)
@@ -343,12 +343,10 @@ int mdt_setxattr(struct mdt_thread_info *info);
 void mdt_lock_handle_init(struct mdt_lock_handle *lh);
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh);
 
+void mdt_reconstruct(struct mdt_thread_info *);
 
-int mdt_fs_setup(const struct lu_context *ctxt,
-                 struct mdt_device *mdt);
-
-void mdt_fs_cleanup(const struct lu_context *ctxt,
-                    struct mdt_device *mdt);
+int mdt_fs_setup(const struct lu_context *, struct mdt_device *);
+void mdt_fs_cleanup(const struct lu_context *, struct mdt_device *);
 
 int mdt_client_free(const struct lu_context *ctxt,
                     struct mdt_device *mdt,
diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c
new file mode 100644 (file)
index 0000000..ab0c55a
--- /dev/null
@@ -0,0 +1,754 @@
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ *  linux/mdt/mdt_recovery.c
+ *  Lustre Metadata Target (mdt) recovery-related methods
+ *
+ *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
+ *   Author: Huang Hua <huanghua@clusterfs.com>
+ *   Author; Pershin Mike <tappro@clusterfs.com>
+ *
+ *   This file is part of the Lustre file system, http://www.lustre.org
+ *   Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ *   You may have signed or agreed to another license before downloading
+ *   this software.  If so, you are bound by the terms and conditions
+ *   of that agreement, and the following does not apply to you.  See the
+ *   LICENSE file included with this distribution for more information.
+ *
+ *   If you did not agree to a different license, then this copy of Lustre
+ *   is open source software; you can redistribute it and/or modify it
+ *   under the terms of version 2 of the GNU General Public License as
+ *   published by the Free Software Foundation.
+ *
+ *   In either case, Lustre is distributed in the hope that it will be
+ *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *   license text for more details.
+ */
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "mdt_internal.h"
+
+static int mdt_update_server_data(const struct lu_context *ctx,
+                                  struct mdt_device *mdt);
+
+/* last_rcvd handling */
+static inline int mdt_read_last_rcvd_header (const struct lu_context *ctx,
+                                             struct mdt_device *mdt,
+                                             struct mdt_server_data *msd)
+{
+        loff_t off = 0;
+        int rc;
+
+        rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx,
+                                                       mdt->mdt_last_rcvd,
+                                                       msd, sizeof(*msd),
+                                                       &off);
+        if (rc == sizeof(*msd))
+                rc = 0;
+        else if (rc >= 0)
+                rc = -EFAULT;
+        return rc;
+}
+
+static inline int mdt_write_last_rcvd_header(const struct lu_context *ctx, 
+                                             struct mdt_device *mdt,
+                                             struct mdt_server_data *msd,
+                                             struct thandle *th)
+{
+        loff_t off = 0;
+        int rc;
+
+        rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
+                                                        mdt->mdt_last_rcvd,
+                                                        msd, sizeof(*msd),
+                                                        &off, th);
+        if (rc == sizeof(*msd))
+                rc = 0;
+        else if (rc >= 0)
+                rc = -EFAULT;
+        return rc;
+}
+
+static inline int mdt_read_last_rcvd(const struct lu_context *ctx,
+                                     struct mdt_device *mdt,
+                                     struct mdt_client_data *mcd, loff_t *off)
+{ 
+        int rc;
+
+        rc = mdt->mdt_last_rcvd->do_body_ops->dbo_read(ctx, mdt->mdt_last_rcvd,
+                                                       mcd, sizeof(*mcd), off);
+        if (rc == sizeof(*mcd))
+                rc = 0;
+        else if (rc >= 0)
+                rc = -EFAULT;
+        return rc;
+}
+
+static inline int mdt_write_last_rcvd(const struct lu_context *ctx, 
+                                      struct mdt_device *mdt,
+                                      struct mdt_client_data *mcd,
+                                      loff_t *off, struct thandle *th)
+{
+        int rc;
+
+        rc = mdt->mdt_last_rcvd->do_body_ops->dbo_write(ctx,
+                                                        mdt->mdt_last_rcvd,
+                                                        mcd, sizeof(*mcd),
+                                                        off, th);
+        if (rc == sizeof(*mcd))
+                rc = 0;
+        else if (rc >= 0)
+                rc = -EFAULT;
+        return rc;
+}
+
+static int mdt_init_clients_data(const struct lu_context *ctx,
+                                struct mdt_device *mdt,
+                                unsigned long last_size)
+{
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        struct mdt_client_data *mcd;
+        struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        loff_t off = 0;
+        int cl_idx;
+        int rc = 0;
+        ENTRY;
+
+        /* When we do a clean MDS shutdown, we save the last_transno into
+         * the header.  If we find clients with higher last_transno values
+         * then those clients may need recovery done. */
+
+        OBD_ALLOC_PTR(mcd);
+        if (!mcd)
+                RETURN(rc = -ENOMEM);
+
+        for (cl_idx = 0, off = le32_to_cpu(msd->msd_client_start);
+             off < last_size; cl_idx++) {
+                __u64 last_transno;
+                struct obd_export *exp;
+                struct mdt_export_data *med;
+
+                off = le32_to_cpu(msd->msd_client_start) +
+                        cl_idx * le16_to_cpu(msd->msd_client_size);
+
+                rc = mdt_read_last_rcvd(ctx, mdt, mcd, &off);
+                if (rc) {
+                        CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
+                               LAST_RCVD, cl_idx, off, rc);
+                        break; /* read error shouldn't cause startup to fail */
+                }
+
+                if (mcd->mcd_uuid[0] == '\0') {
+                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                               cl_idx);
+                        continue;
+                }
+
+                last_transno = le64_to_cpu(mcd->mcd_last_transno);
+
+                /* These exports are cleaned up by mdt_obd_disconnect(), so
+                 * they need to be set up like real exports as
+                 * mdt_obd_connect() does.
+                 */
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                       " srv lr: "LPU64" lx: "LPU64"\n", mcd->mcd_uuid, cl_idx,
+                       last_transno, le64_to_cpu(msd->msd_last_transno),
+                       le64_to_cpu(mcd->mcd_last_xid));
+
+                exp = class_new_export(obd, (struct obd_uuid *)mcd->mcd_uuid);
+                if (IS_ERR(exp))
+                        GOTO(err_client, rc = PTR_ERR(exp));
+
+                med = &exp->exp_mdt_data;
+                med->med_mcd = mcd;
+                rc = mdt_client_add(ctx, mdt, med, cl_idx);
+                LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+
+                exp->exp_replay_needed = 1;
+                exp->exp_connecting = 0;
+                obd->obd_recoverable_clients++;
+                obd->obd_max_recoverable_clients++;
+                class_export_put(exp);
+
+                CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+                       cl_idx, last_transno);
+
+                spin_lock(&mdt->mdt_transno_lock);
+                if (last_transno > mdt->mdt_last_transno)
+                        mdt->mdt_last_transno = last_transno;
+                spin_unlock(&mdt->mdt_transno_lock);
+        }
+
+err_client:
+        OBD_FREE_PTR(mcd);
+
+        RETURN(rc);
+}
+
+static int mdt_init_server_data(const struct lu_context *ctx,
+                                struct mdt_device *mdt)
+{
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        struct mdt_client_data *mcd = NULL;
+        struct obd_device      *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+        struct mdt_thread_info *mti;
+        struct lu_attr         *la;
+        unsigned long last_rcvd_size = 0;
+        __u64 mount_count;
+        int rc = 0;
+        ENTRY;
+
+        /* ensure padding in the struct is the correct size */
+        LASSERT(offsetof(struct mdt_server_data, msd_padding) +
+                sizeof(msd->msd_padding) == LR_SERVER_SIZE);
+        LASSERT(offsetof(struct mdt_client_data, mcd_padding) +
+                sizeof(mcd->mcd_padding) == LR_CLIENT_SIZE);
+        
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+        LASSERT(mti != NULL);
+        la = &mti->mti_attr.ma_attr;
+
+        rc = mdt->mdt_last_rcvd->do_ops->do_attr_get(ctx,
+                                                     mdt->mdt_last_rcvd, la);
+        if (rc)
+                RETURN(rc);
+
+        last_rcvd_size = la->la_size;
+
+        if (last_rcvd_size == 0) {
+                LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
+
+                memcpy(msd->msd_uuid, obd->obd_uuid.uuid,
+                       sizeof(msd->msd_uuid));
+                msd->msd_last_transno = 0;
+                msd->msd_mount_count = 0;
+                msd->msd_server_size = cpu_to_le32(LR_SERVER_SIZE);
+                msd->msd_client_start = cpu_to_le32(LR_CLIENT_START);
+                msd->msd_client_size = cpu_to_le16(LR_CLIENT_SIZE);
+                msd->msd_feature_rocompat = cpu_to_le32(OBD_ROCOMPAT_LOVOBJID);
+                msd->msd_feature_incompat = cpu_to_le32(OBD_INCOMPAT_MDT |
+                                                       OBD_INCOMPAT_COMMON_LR);
+        } else {
+                rc = mdt_read_last_rcvd_header(mti->mti_ctxt, mdt, msd);
+                if (rc) {
+                        CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
+                        GOTO(out, rc);
+                }
+                if (strcmp(msd->msd_uuid, obd->obd_uuid.uuid) != 0) {
+                        LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
+                                       " disk %s. Were the /dev/ assignments "
+                                       "rearranged?\n",
+                                       obd->obd_uuid.uuid, msd->msd_uuid);
+                        GOTO(out, rc = -EINVAL);
+                }
+        }
+        mount_count = le64_to_cpu(msd->msd_mount_count);
+
+        if (msd->msd_feature_incompat & ~cpu_to_le32(MDT_INCOMPAT_SUPP)) {
+                CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+                       obd->obd_name, le32_to_cpu(msd->msd_feature_incompat) &
+                       ~MDT_INCOMPAT_SUPP);
+                GOTO(out, rc = -EINVAL);
+        }
+        if (msd->msd_feature_rocompat & ~cpu_to_le32(MDT_ROCOMPAT_SUPP)) {
+                CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+                       obd->obd_name, le32_to_cpu(msd->msd_feature_rocompat) &
+                       ~MDT_ROCOMPAT_SUPP);
+                /* Do something like remount filesystem read-only */
+                GOTO(out, rc = -EINVAL);
+        }
+        if (!(msd->msd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
+                CDEBUG(D_WARNING, "using old last_rcvd format\n");
+                msd->msd_mount_count = msd->msd_last_transno;
+                msd->msd_last_transno = msd->msd_unused;
+                /* If we update the last_rcvd, we can never go back to
+                   an old install, so leave this in the old format for now.
+                msd->msd_feature_incompat |= cpu_to_le32(LR_INCOMPAT_COMMON_LR);
+                */
+        }
+        msd->msd_feature_compat = cpu_to_le32(OBD_COMPAT_MDT);
+
+        spin_lock(&mdt->mdt_transno_lock);
+        mdt->mdt_last_transno = le64_to_cpu(msd->msd_last_transno);
+        spin_unlock(&mdt->mdt_transno_lock);
+
+        CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+               obd->obd_name, mdt->mdt_last_transno);
+        CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+               obd->obd_name, mount_count + 1);
+        CDEBUG(D_INODE, "%s: server data size: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_server_size));
+        CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_client_start));
+        CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+               obd->obd_name, le32_to_cpu(msd->msd_client_size));
+        CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+               obd->obd_name, last_rcvd_size);
+        CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
+               last_rcvd_size <= le32_to_cpu(msd->msd_client_start) ? 0 :
+               (last_rcvd_size - le32_to_cpu(msd->msd_client_start)) /
+                le16_to_cpu(msd->msd_client_size));
+
+        if (!msd->msd_server_size || !msd->msd_client_start ||
+            !msd->msd_client_size) {
+                CERROR("Bad last_rcvd contents!\n");
+                GOTO(out, rc = -EINVAL);
+        }
+
+        rc = mdt_init_clients_data(ctx, mdt, last_rcvd_size);
+        if (rc)
+                GOTO(err_client, rc);
+
+        spin_lock(&mdt->mdt_transno_lock);
+        obd->obd_last_committed = mdt->mdt_last_transno;
+        spin_unlock(&mdt->mdt_transno_lock);
+
+        if (obd->obd_recoverable_clients) {
+                CWARN("RECOVERY: service %s, %d recoverable clients, "
+                      "last_transno "LPU64"\n", obd->obd_name,
+                      obd->obd_recoverable_clients, mdt->mdt_last_transno);
+                obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
+                obd->obd_recovering = 1;
+                obd->obd_recovery_start = CURRENT_SECONDS;
+                /* Only used for lprocfs_status */
+                obd->obd_recovery_end = obd->obd_recovery_start +
+                        OBD_RECOVERY_TIMEOUT;
+        }
+
+        mdt->mdt_mount_count++;
+        msd->msd_mount_count = cpu_to_le64(mdt->mdt_mount_count);
+
+        /* save it, so mount count and last_transno is current */
+        rc = mdt_update_server_data(ctx, mdt);
+        if (rc)
+                GOTO(err_client, rc);
+
+        RETURN(0);
+
+err_client:
+        class_disconnect_exports(obd);
+out:
+        return rc;
+}
+
+static int mdt_update_server_data(const struct lu_context *ctx,
+                                  struct mdt_device *mdt)
+{
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        int rc = 0;
+        ENTRY;
+
+        CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
+                mdt->mdt_mount_count, mdt->mdt_last_transno);
+
+        spin_lock(&mdt->mdt_transno_lock);
+        msd->msd_last_transno = cpu_to_le64(mdt->mdt_last_transno);
+        spin_unlock(&mdt->mdt_transno_lock);
+
+        rc = mdt_write_last_rcvd_header(ctx, mdt, msd, NULL);
+        RETURN(rc);
+}
+
+/* Add client data to the MDS.  We use a bitmap to locate a free space
+ * in the last_rcvd file if cl_off is -1 (i.e. a new client).
+ * Otherwise, we just have to read the data from the last_rcvd file and
+ * we know its offset.
+ *
+ * It should not be possible to fail adding an existing client - otherwise
+ * mdt_init_server_data() callsite needs to be fixed.
+ */
+int mdt_client_add(const struct lu_context *ctx,
+                   struct mdt_device *mdt,
+                   struct mdt_export_data *med, int cl_idx)
+{
+        unsigned long *bitmap = mdt->mdt_client_bitmap;
+        struct mdt_client_data *mcd = med->med_mcd;
+        struct mdt_server_data *msd = &mdt->mdt_msd;
+        int new_client = (cl_idx == -1);
+        int rc = 0;
+        ENTRY;
+
+        LASSERT(bitmap != NULL);
+        LASSERTF(cl_idx > -2, "%d\n", cl_idx);
+
+        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
+         * there's no need for extra complication here
+         */
+        if (new_client) {
+                cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
+        repeat:
+                if (cl_idx >= LR_MAX_CLIENTS ||
+                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_CLIENT_ADD)) {
+                        CERROR("no room for clients - fix LR_MAX_CLIENTS\n");
+                        return -EOVERFLOW;
+                }
+                if (test_and_set_bit(cl_idx, bitmap)) {
+                        cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
+                                                    cl_idx);
+                        goto repeat;
+                }
+        } else {
+                if (test_and_set_bit(cl_idx, bitmap)) {
+                        CERROR("MDS client %d: bit already set in bitmap!!\n",
+                               cl_idx);
+                        LBUG();
+                }
+        }
+
+        CDEBUG(D_INFO, "client at idx %d with UUID '%s' added\n",
+               cl_idx, med->med_mcd->mcd_uuid);
+
+        med->med_lr_idx = cl_idx;
+        med->med_lr_off = le32_to_cpu(msd->msd_client_start) +
+                          (cl_idx * le16_to_cpu(msd->msd_client_size));
+        init_mutex(&med->med_mcd_lock);
+
+        LASSERTF(med->med_lr_off > 0, "med_lr_off = %llu\n", med->med_lr_off);
+
+        if (new_client) {
+                rc = mdt_write_last_rcvd(ctx, mdt, mcd,
+                                         &med->med_lr_off, NULL);
+                CDEBUG(D_INFO, "wrote client mcd at idx %u off %llu (len %u)\n",
+                       cl_idx, med->med_lr_off, sizeof(*mcd));
+        }
+        RETURN(rc);
+}
+
+int mdt_client_free(const struct lu_context *ctx,
+                    struct mdt_device *mdt,
+                    struct mdt_export_data *med)
+{
+        struct mdt_client_data *mcd  = med->med_mcd;
+        int rc = 0;
+        loff_t off;
+        ENTRY;
+
+        if (!mcd)
+                RETURN(0);
+
+        CDEBUG(D_INFO, "freeing client at idx %u, offset %lld\n",
+               med->med_lr_idx, med->med_lr_off);
+
+        off = med->med_lr_off;
+
+        /* Don't clear med_lr_idx here as it is likely also unset.  At worst
+         * we leak a client slot that will be cleaned on the next recovery. */
+        if (off <= 0) {
+                CERROR("client idx %d has offset %lld\n",
+                        med->med_lr_idx, off);
+                GOTO(free, rc = -EINVAL);
+        }
+
+        /* Clear the bit _after_ zeroing out the client so we don't
+           race with mdt_client_add and zero out new clients.*/
+        if (!test_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+                CERROR("MDT client %u: bit already clear in bitmap!!\n",
+                       med->med_lr_idx);
+                LBUG();
+        }
+
+        mutex_down(&med->med_mcd_lock);
+        memset(mcd, 0, sizeof *mcd);
+        rc = mdt_write_last_rcvd(ctx, mdt, mcd, &off, NULL);
+        mutex_up(&med->med_mcd_lock);
+
+        CDEBUG_EX(rc == 0 ? D_INFO : D_ERROR,
+                  "zeroing out client idx %u in %s rc %d\n",
+                  med->med_lr_idx, LAST_RCVD, rc);
+
+        if (!test_and_clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap)) {
+                CERROR("MDS client %u: bit already clear in bitmap!!\n",
+                       med->med_lr_idx);
+                LBUG();
+        }
+
+        /* Make sure the server's last_transno is up to date. Do this
+         * after the client is freed so we know all the client's
+         * transactions have been committed. */
+        mdt_update_server_data(ctx, mdt);
+
+        EXIT;
+free:
+        OBD_FREE_PTR(mcd);
+        med->med_mcd = NULL;
+        return 0;
+}
+
+/*
+ * last_rcvd & last_committed update callbacks
+ */
+static int mdt_update_last_rcvd(struct mdt_thread_info *mti,
+                                struct dt_device *dt,
+                                struct thandle *th)
+{
+        struct mdt_device *mdt = mti->mti_mdt;
+        struct ptlrpc_request *req = mdt_info_req(mti);
+        struct mdt_export_data *med;
+        struct mdt_client_data *mcd;
+        loff_t off;
+        int err;
+        __s32 rc = th->th_result;
+        
+        ENTRY;
+        LASSERT(req);
+        LASSERT(req->rq_export);
+        LASSERT(mdt);
+        med = &req->rq_export->exp_mdt_data;
+        LASSERT(med);
+        mcd = med->med_mcd;
+        /* if the export has already been failed, we have no last_rcvd slot */
+        if (req->rq_export->exp_failed) {
+                CWARN("commit transaction for disconnected client %s: rc %d\n",
+                      req->rq_export->exp_client_uuid.uuid, rc);
+                if (rc == 0)
+                        rc = -ENOTCONN;
+                RETURN(rc);
+        }
+
+        off = med->med_lr_off;
+        mutex_down(&med->med_mcd_lock);
+        mcd->mcd_last_transno = cpu_to_le64(mti->mti_transno);
+        mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
+        mcd->mcd_last_result = cpu_to_le32(rc);
+        /* XXX: how to pass op_data here? */
+        //mcd->mcd_last_data = cpu_to_le32(op_data);
+        
+        if (off <= 0) {
+                CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
+                err = -EINVAL;
+        } else {
+                err = mdt_write_last_rcvd(mti->mti_ctxt, mdt, mcd, &off, th);
+        }
+        mutex_up(&med->med_mcd_lock);
+        RETURN(err);
+}
+
+extern struct lu_context_key mdt_txn_key;
+extern struct lu_context_key mdt_thread_key;
+
+enum {
+        MDT_TXN_LAST_RCVD_CREDITS = 3
+};
+
+/* add credits for last_rcvd update */
+static int mdt_txn_start_cb(const struct lu_context *ctx,
+                            struct dt_device *dev,
+                            struct txn_param *param, void *cookie)
+{
+        param->tp_credits += MDT_TXN_LAST_RCVD_CREDITS;
+        return 0;
+}
+
+/* Update last_rcvd records with latests transaction data */
+static int mdt_txn_stop_cb(const struct lu_context *ctx,
+                           struct dt_device *dev,
+                           struct thandle *txn, void *cookie)
+{
+        struct mdt_device *mdt = cookie;
+        struct mdt_txn_info *txi;
+        struct mdt_thread_info *mti;
+
+        /* transno in two contexts - for commit_cb and for thread */
+        txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
+        mti = lu_context_key_get(ctx, &mdt_thread_key);
+
+        /*TODO: checks for recovery cases, see mds_finish_transno */
+        spin_lock(&mdt->mdt_transno_lock);
+        if (mti->mti_transno == 0) {
+                mti->mti_transno = ++ mdt->mdt_last_transno;
+        } else {
+                /* replay */
+                if (mti->mti_transno > mdt->mdt_last_transno)
+                        mdt->mdt_last_transno = mti->mti_transno;
+        }
+        spin_unlock(&mdt->mdt_transno_lock);
+        /* save transno for the commit callback */
+        txi->txi_transno = mti->mti_transno;
+
+        return 0;//mdt_update_last_rcvd(mti, dev, txn);
+}
+
+/* commit callback, need to update last_commited value */
+static int mdt_txn_commit_cb(const struct lu_context *ctx,
+                             struct dt_device *dev,
+                             struct thandle *txn, void *cookie)
+{
+        struct mdt_device *mdt = cookie;
+        struct obd_device *obd = md2lu_dev(&mdt->mdt_md_dev)->ld_obd;
+        struct mdt_txn_info *txi;
+
+        txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
+        spin_lock(&mdt->mdt_transno_lock);
+        if (txi->txi_transno > mdt->mdt_last_committed) {
+                mdt->mdt_last_committed = txi->txi_transno;
+                spin_lock(&mdt->mdt_transno_lock);
+                ptlrpc_commit_replies (obd);
+        } else 
+                spin_lock(&mdt->mdt_transno_lock);
+
+        CDEBUG(D_HA, "%s: transno "LPD64" committed\n",
+               obd->obd_name, txi->txi_transno);
+
+        return 0;
+}
+
+int mdt_fs_setup(const struct lu_context *ctx, struct mdt_device *mdt)
+{
+        struct lu_fid last_fid;
+        struct dt_object *last;
+        int rc = 0;
+        ENTRY;
+
+        /* prepare transactions callbacks */
+        mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
+        mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
+        mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
+        mdt->mdt_txn_cb.dtc_cookie = mdt;
+
+        dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
+
+        last = dt_store_open(ctx, mdt->mdt_bottom,
+                             LAST_RCVD, &last_fid);
+        if(!IS_ERR(last)) {
+                mdt->mdt_last_rcvd = last;
+                rc = mdt_init_server_data(ctx, mdt);
+                if (rc) {
+                        lu_object_put(ctx, &last->do_lu);
+                        mdt->mdt_last_rcvd = NULL;
+                }
+        } else {
+                rc = PTR_ERR(last);
+                CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+        }
+
+        RETURN (rc);
+}
+
+
+void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt)
+{
+        struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
+
+        /* remove transaction callback */
+        dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
+
+        class_disconnect_exports(obd); /* cleans up client info too */
+
+        if (mdt->mdt_last_rcvd)
+                lu_object_put(ctx, &mdt->mdt_last_rcvd->do_lu);
+        mdt->mdt_last_rcvd = NULL;
+}
+
+/* reconstruction code */
+static inline void mdt_req_from_mcd(struct ptlrpc_request *req,
+                                    struct mdt_client_data *mcd)
+{
+        DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d",
+                  mcd->mcd_last_transno, mcd->mcd_last_result);
+        req->rq_repmsg->transno = req->rq_transno = mcd->mcd_last_transno;
+        req->rq_repmsg->status = req->rq_status = mcd->mcd_last_result;
+        //mds_steal_ack_locks(req);
+}
+
+static void mdt_reconstruct_generic(struct mdt_thread_info *mti)
+{
+        struct ptlrpc_request *req = mdt_info_req(mti);
+        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+
+        return mdt_req_from_mcd(req, med->med_mcd);
+}
+
+static void mdt_reconstruct_create(struct mdt_thread_info *mti)
+{
+        struct ptlrpc_request  *req = mdt_info_req(mti);
+        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+        struct mdt_device *mdt = mti->mti_mdt;
+        struct mdt_object *child;
+        struct mdt_body *body;
+        int rc;
+
+        mdt_req_from_mcd(req, med->med_mcd);
+        if (req->rq_status)
+                return;
+
+        /* if no error, so child was created with requested fid */
+        child = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid2);
+        LASSERT(!IS_ERR(child));
+
+        body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
+        rc = mo_attr_get(mti->mti_ctxt, mdt_object_child(child),
+                         &mti->mti_attr);
+        if (rc == -EREMOTE) {
+                /* object was created on remote server */
+                req->rq_status = rc;
+                body->valid |= OBD_MD_MDS;
+        }
+        mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
+                           mti->mti_rr.rr_fid2);
+        mdt_object_put(mti->mti_ctxt, child);
+}
+
+static void mdt_reconstruct_setattr(struct mdt_thread_info *mti)
+{
+        struct ptlrpc_request  *req = mdt_info_req(mti);
+        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+        struct mdt_device *mdt = mti->mti_mdt;
+        struct mdt_object *obj;
+        struct mdt_body *body;
+        
+        mdt_req_from_mcd(req, med->med_mcd);
+        if (req->rq_status)
+                return;
+        
+        body = req_capsule_server_get(&mti->mti_pill, &RMF_MDT_BODY);
+        obj = mdt_object_find(mti->mti_ctxt, mdt, mti->mti_rr.rr_fid1);
+        LASSERT(!IS_ERR(obj));
+        mo_attr_get(mti->mti_ctxt, mdt_object_child(obj), &mti->mti_attr);
+        mdt_pack_attr2body(body, &mti->mti_attr.ma_attr,
+                           mti->mti_rr.rr_fid1);
+
+        /* Don't return OST-specific attributes if we didn't just set them */
+/*
+        if (rec->ur_iattr.ia_valid & ATTR_SIZE)
+                body->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+        if (rec->ur_iattr.ia_valid & (ATTR_MTIME | ATTR_MTIME_SET))
+                body->valid |= OBD_MD_FLMTIME;
+        if (rec->ur_iattr.ia_valid & (ATTR_ATIME | ATTR_ATIME_SET))
+                body->valid |= OBD_MD_FLATIME;
+*/
+        mdt_object_put(mti->mti_ctxt, obj);
+}
+
+static void mdt_reconstruct_open(struct mdt_thread_info *mti)
+{
+        struct ptlrpc_request *req = mdt_info_req(mti);
+        struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
+
+        /*TODO: after open recovery task will be finished */
+
+        mdt_req_from_mcd(req, med->med_mcd);
+}
+
+typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti);
+
+static mdt_reconstructor reconstructors[REINT_MAX] = {
+        [REINT_SETATTR]  = mdt_reconstruct_setattr,
+        [REINT_CREATE] = mdt_reconstruct_create,
+        [REINT_LINK] = mdt_reconstruct_generic,
+        [REINT_UNLINK] = mdt_reconstruct_generic,
+        [REINT_RENAME] = mdt_reconstruct_generic,
+        [REINT_OPEN] = mdt_reconstruct_open
+};
+
+void mdt_reconstruct(struct mdt_thread_info *mti)
+{
+        ENTRY;
+        reconstructors[mti->mti_rr.rr_opcode](mti);
+        EXIT;
+}
+
index a0c29fa..89a391f 100644 (file)
@@ -1819,6 +1819,16 @@ static int osd_process_config(const struct lu_context *ctx,
         RETURN(err);
 }
 
+static int osd_recovery_complete(const struct lu_context *ctxt,
+                                 struct lu_device *d)
+{
+        struct osd_device *o = osd_dev(d);
+        int rc = 0;
+        ENTRY;
+        /* TODO: orphans handling */
+        RETURN(rc);
+}
+
 /*
  * fid<->inode<->object functions.
  */
@@ -2054,8 +2064,9 @@ static struct lu_object_operations osd_lu_obj_ops = {
 };
 
 static struct lu_device_operations osd_lu_ops = {
-        .ldo_object_alloc   = osd_object_alloc,
-        .ldo_process_config = osd_process_config
+        .ldo_object_alloc      = osd_object_alloc,
+        .ldo_process_config    = osd_process_config,
+        .ldo_recovery_complete = osd_recovery_complete
 };
 
 static struct lu_device_type_operations osd_device_type_ops = {