Whamcloud - gitweb
LU-3467 target: unified transaction callbacks 30/7330/30
authorMikhail Pershin <mike.pershin@intel.com>
Wed, 14 Aug 2013 09:14:51 +0000 (13:14 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 29 Nov 2013 01:31:35 +0000 (01:31 +0000)
Use common transaction callbacks to update last_rcvd
in target

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: Ib10d0b958fd9f09753246c85f272e1f917069d01
Reviewed-on: http://review.whamcloud.com/7330
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
18 files changed:
lustre/include/lu_target.h
lustre/include/obd_target.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mdt/mdt_recovery.c
lustre/mdt/mdt_reint.c
lustre/mdt/mdt_xattr.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_trans.c
lustre/osd-ldiskfs/osd_compat.c
lustre/target/out_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/target/tgt_main.c

index 6755af8..eb155f7 100644 (file)
@@ -54,15 +54,14 @@ struct lu_target {
        /* sptlrpc rules */
        rwlock_t                 lut_sptlrpc_lock;
        struct sptlrpc_rule_set  lut_sptlrpc_rset;
        /* sptlrpc rules */
        rwlock_t                 lut_sptlrpc_lock;
        struct sptlrpc_rule_set  lut_sptlrpc_rset;
-       int                      lut_sec_level;
-
        spinlock_t               lut_flags_lock;
        spinlock_t               lut_flags_lock;
+       int                      lut_sec_level;
        unsigned int             lut_mds_capa:1,
                                 lut_oss_capa:1,
                                 lut_syncjournal:1,
        unsigned int             lut_mds_capa:1,
                                 lut_oss_capa:1,
                                 lut_syncjournal:1,
-                                lut_sync_lock_cancel:2;
-
-       /* LAST_RCVD parameters */
+                                lut_sync_lock_cancel:2,
+                                /* e.g. OST node */
+                                lut_no_reconstruct:1;
        /** last_rcvd file */
        struct dt_object        *lut_last_rcvd;
        /* transaction callbacks */
        /** last_rcvd file */
        struct dt_object        *lut_last_rcvd;
        /* transaction callbacks */
@@ -105,13 +104,19 @@ struct tgt_session_info {
 
        struct lu_fid            tsi_fid;
        struct ldlm_res_id       tsi_resid;
 
        struct lu_fid            tsi_fid;
        struct ldlm_res_id       tsi_resid;
+
+       /* object affected by VBR, for last_rcvd_update */
+       struct dt_object        *tsi_vbr_obj;
+       /* opdata for mdt_reint_open(), has the same value as
+        * ldlm_reply:lock_policy_res1.  The tgt_update_last_rcvd() stores
+        * this value onto disk for recovery when tgt_txn_stop_cb() is called.
+        */
+       __u64                    tsi_opdata;
+
        /*
         * Additional fail id that can be set by handler.
         */
        int                      tsi_reply_fail_id;
        /*
         * Additional fail id that can be set by handler.
         */
        int                      tsi_reply_fail_id;
-       int                      tsi_request_fail_id;
-
-       int                      tsi_has_trans:1; /* has txn already? */
        /* request JobID */
        char                    *tsi_jobid;
 };
        /* request JobID */
        char                    *tsi_jobid;
 };
@@ -126,6 +131,37 @@ static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env)
        return tsi;
 }
 
        return tsi;
 }
 
+static inline void tgt_vbr_obj_set(const struct lu_env *env,
+                                  struct dt_object *obj)
+{
+       struct tgt_session_info *tsi;
+
+       if (env->le_ses != NULL) {
+               tsi = tgt_ses_info(env);
+               tsi->tsi_vbr_obj = obj;
+       }
+}
+
+static inline void tgt_opdata_set(const struct lu_env *env, __u64 flags)
+{
+       struct tgt_session_info *tsi;
+
+       if (env->le_ses != NULL) {
+               tsi = tgt_ses_info(env);
+               tsi->tsi_opdata |= flags;
+       }
+}
+
+static inline void tgt_opdata_clear(const struct lu_env *env, __u64 flags)
+{
+       struct tgt_session_info *tsi;
+
+       if (env->le_ses != NULL) {
+               tsi = tgt_ses_info(env);
+               tsi->tsi_opdata &= ~flags;
+       }
+}
+
 /*
  * Generic unified target support.
  */
 /*
  * Generic unified target support.
  */
@@ -286,12 +322,6 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
                           int sync);
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
                           int sync);
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
-int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
-                        struct dt_object *obj, __u64 opdata,
-                        struct thandle *th, struct ptlrpc_request *req);
-int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
-                             struct dt_object *obj, struct thandle *th,
-                             struct obd_export *exp);
 
 enum {
        ESERIOUS = 0x0001000
 
 enum {
        ESERIOUS = 0x0001000
index d59282f..d387aa1 100644 (file)
@@ -48,7 +48,7 @@ struct obd_device_target {
        struct obd_job_stats    obt_jobstats;
 };
 
        struct obd_job_stats    obt_jobstats;
 };
 
-#define FILTER_SUBDIR_COUNT 32 /* set to zero for no subdirs */
+#define OBJ_SUBDIR_COUNT 32 /* set to zero for no subdirs */
 
 struct filter_obd {
        /* NB this field MUST be first */
 
 struct filter_obd {
        /* NB this field MUST be first */
index e2344fa..1d384e8 100644 (file)
@@ -135,21 +135,25 @@ int mdt_get_disposition(struct ldlm_reply *rep, int flag)
 }
 
 void mdt_clear_disposition(struct mdt_thread_info *info,
 }
 
 void mdt_clear_disposition(struct mdt_thread_info *info,
-                           struct ldlm_reply *rep, int flag)
+                          struct ldlm_reply *rep, int flag)
 {
 {
-        if (info)
-                info->mti_opdata &= ~flag;
-        if (rep)
-                rep->lock_policy_res1 &= ~flag;
+       if (info) {
+               info->mti_opdata &= ~flag;
+               tgt_opdata_clear(info->mti_env, flag);
+       }
+       if (rep)
+               rep->lock_policy_res1 &= ~flag;
 }
 
 void mdt_set_disposition(struct mdt_thread_info *info,
 }
 
 void mdt_set_disposition(struct mdt_thread_info *info,
-                         struct ldlm_reply *rep, int flag)
+                        struct ldlm_reply *rep, int flag)
 {
 {
-        if (info)
-                info->mti_opdata |= flag;
-        if (rep)
-                rep->lock_policy_res1 |= flag;
+       if (info) {
+               info->mti_opdata |= flag;
+               tgt_opdata_set(info->mti_env, flag);
+       }
+       if (rep)
+               rep->lock_policy_res1 |= flag;
 }
 
 void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
 }
 
 void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
@@ -2637,7 +2641,6 @@ void mdt_thread_info_init(struct ptlrpc_request *req,
                 info->mti_mdt = NULL;
        info->mti_env = req->rq_svc_thread->t_env;
        info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
                 info->mti_mdt = NULL;
        info->mti_env = req->rq_svc_thread->t_env;
        info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
-       info->mti_mos = NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
        info->mti_big_buf = LU_BUF_NULL;
 
         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
        info->mti_big_buf = LU_BUF_NULL;
index dc26857..dd36201 100644 (file)
@@ -335,60 +335,6 @@ enum mdt_reint_flag {
         MRF_OPEN_TRUNC = 1 << 0,
 };
 
         MRF_OPEN_TRUNC = 1 << 0,
 };
 
-struct mdt_thread_info;
-struct tx_arg;
-typedef int (*tx_exec_func_t)(const struct lu_env *, struct thandle *,
-                             struct tx_arg *);
-
-struct tx_arg {
-       tx_exec_func_t          exec_fn;
-       tx_exec_func_t          undo_fn;
-       struct dt_object        *object;
-       char                    *file;
-       struct update_reply     *reply;
-       int                     line;
-       int                     index;
-       union {
-               struct {
-                       const struct dt_rec     *rec;
-                       const struct dt_key     *key;
-               } insert;
-               struct {
-               } ref;
-               struct {
-                       struct lu_attr  attr;
-               } attr_set;
-               struct {
-                       struct lu_buf   buf;
-                       const char      *name;
-                       int             flags;
-                       __u32           csum;
-               } xattr_set;
-               struct {
-                       struct lu_attr                  attr;
-                       struct dt_allocation_hint       hint;
-                       struct dt_object_format         dof;
-                       struct lu_fid                   fid;
-               } create;
-               struct {
-                       struct lu_buf   buf;
-                       loff_t          pos;
-               } write;
-               struct {
-                       struct ost_body     *body;
-               } destroy;
-       } u;
-};
-
-#define TX_MAX_OPS       10
-struct thandle_exec_args {
-       struct thandle          *ta_handle;
-       struct dt_device        *ta_dev;
-       int                     ta_err;
-       struct tx_arg           ta_args[TX_MAX_OPS];
-       int                     ta_argno;   /* used args */
-};
-
 /*
  * Common data shared by mdt-level handlers. This is allocated per-thread to
  * reduce stack consumption.
 /*
  * Common data shared by mdt-level handlers. This is allocated per-thread to
  * reduce stack consumption.
@@ -414,9 +360,6 @@ struct mdt_thread_info {
 
         struct mdt_device         *mti_mdt;
         const struct lu_env       *mti_env;
 
         struct mdt_device         *mti_mdt;
         const struct lu_env       *mti_env;
-       /* XXX: temporary flag to have healthy mti during OUT calls
-        * to be removed upon moving MDT to the unified target code */
-       bool                       mti_txn_compat;
 
         /* transaction number of current request */
         __u64                      mti_transno;
 
         /* transaction number of current request */
         __u64                      mti_transno;
@@ -467,8 +410,6 @@ struct mdt_thread_info {
          */
         struct mdt_reint_record    mti_rr;
 
          */
         struct mdt_reint_record    mti_rr;
 
-        /** md objects included in operation */
-        struct mdt_object         *mti_mos;
         __u64                      mti_ver[PTLRPC_NUM_VERSIONS];
         /*
          * Operation specification (currently create and lookup)
         __u64                      mti_ver[PTLRPC_NUM_VERSIONS];
         /*
          * Operation specification (currently create and lookup)
@@ -505,14 +446,6 @@ struct mdt_thread_info {
                         struct md_attr attr;
                         struct md_som_data data;
                 } som;
                         struct md_attr attr;
                         struct md_som_data data;
                 } som;
-               struct {
-                       struct dt_object_format mti_update_dof;
-                       struct update_reply     *mti_update_reply;
-                       struct update           *mti_update;
-                       int                     mti_update_reply_index;
-                       struct obdo             mti_obdo;
-                       struct dt_object        *mti_dt_object;
-               } update;
         } mti_u;
 
         /* IO epoch related stuff. */
         } mti_u;
 
         /* IO epoch related stuff. */
@@ -533,7 +466,6 @@ struct mdt_thread_info {
        int                        mti_big_lmm_used;
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
        int                        mti_big_lmm_used;
        /* should be enough to fit lustre_mdt_attrs */
        char                       mti_xattr_buf[128];
-       struct thandle_exec_args   mti_handle;
        struct ldlm_enqueue_info   mti_einfo;
 };
 
        struct ldlm_enqueue_info   mti_einfo;
 };
 
index bc3f42d..a95cef8 100644 (file)
@@ -1721,12 +1721,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
                        GOTO(out_child, result = -EPERM);
 
                if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1))
                        GOTO(out_child, result = -EPERM);
 
-                /* save versions in reply */
-                mdt_version_get_save(info, parent, 0);
-                mdt_version_get_save(info, child, 1);
+               /* save versions in reply */
+               mdt_version_get_save(info, parent, 0);
+               mdt_version_get_save(info, child, 1);
 
 
-                /* version of child will be changed */
-                info->mti_mos = child;
+               /* version of child will be changed */
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
                 /* Not found and with MDS_OPEN_CREAT: let's create it. */
                 mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
index 1358c63..56635af 100644 (file)
@@ -77,370 +77,46 @@ void mdt_trans_stop(const struct lu_env *env,
         dt_trans_stop(env, mdt->mdt_bottom, th);
 }
 
         dt_trans_stop(env, mdt->mdt_bottom, th);
 }
 
-static int mdt_clients_data_init(const struct lu_env *env,
-                                 struct mdt_device *mdt,
-                                 unsigned long last_size)
-{
-        struct lr_server_data  *lsd = &mdt->mdt_lut.lut_lsd;
-        struct lsd_client_data *lcd;
-        struct obd_device      *obd = mdt2obd_dev(mdt);
-        loff_t off;
-        int cl_idx;
-        int rc = 0;
-        ENTRY;
-
-        OBD_ALLOC_PTR(lcd);
-        if (!lcd)
-                RETURN(-ENOMEM);
-
-        /* When we do a clean MDS shutdown, we save the last_transno into
-         * the header.  If we find clients with higher last_transno values
-         * then those clients may need recovery done. */
-        LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients) == 0);
-        for (cl_idx = 0, off = lsd->lsd_client_start;
-             off < last_size; cl_idx++) {
-                __u64 last_transno;
-                struct obd_export *exp;
-                struct mdt_thread_info *mti;
-
-                off = lsd->lsd_client_start +
-                        cl_idx * lsd->lsd_client_size;
-
-               rc = tgt_client_data_read(env, &mdt->mdt_lut, lcd, &off, cl_idx);
-                if (rc) {
-                        CERROR("error reading MDS %s idx %d, off %llu: rc %d\n",
-                               LAST_RCVD, cl_idx, off, rc);
-                        rc = 0;
-                        break; /* read error shouldn't cause startup to fail */
-                }
-
-                if (lcd->lcd_uuid[0] == '\0') {
-                        CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
-                               cl_idx);
-                        continue;
-                }
-
-                last_transno = lcd_last_transno(lcd);
-
-                /* These exports are cleaned up by mdt_obd_disconnect(), so
-                 * they need to be set up like real exports as
-                 * mdt_obd_connect() does.
-                 */
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
-                       last_transno, lsd->lsd_last_transno,
-                       lcd_last_xid(lcd));
-
-                exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
-                if (IS_ERR(exp)) {
-                        if (PTR_ERR(exp) == -EALREADY) {
-                                /* export already exists, zero out this one */
-                                CERROR("Duplicate export %s!\n", lcd->lcd_uuid);
-                                continue;
-                        }
-                        GOTO(err_client, rc = PTR_ERR(exp));
-                }
-
-                mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-                LASSERT(mti != NULL);
-                mti->mti_exp = exp;
-                /* copy on-disk lcd to the export */
-                *exp->exp_target_data.ted_lcd = *lcd;
-               rc = tgt_client_add(env, exp, cl_idx);
-                /* can't fail existing */
-                LASSERTF(rc == 0, "rc = %d\n", rc);
-                /* VBR: set export last committed version */
-                exp->exp_last_committed = last_transno;
-               spin_lock(&exp->exp_lock);
-               exp->exp_connecting = 0;
-               exp->exp_in_recovery = 0;
-               spin_unlock(&exp->exp_lock);
-               obd->obd_max_recoverable_clients++;
-               class_export_put(exp);
-
-               CDEBUG(D_OTHER, "client at idx %d has last_transno ="LPU64"\n",
-                      cl_idx, last_transno);
-               /* protect __u64 value update */
-               spin_lock(&mdt->mdt_lut.lut_translock);
-               mdt->mdt_lut.lut_last_transno = max(last_transno,
-                                               mdt->mdt_lut.lut_last_transno);
-               spin_unlock(&mdt->mdt_lut.lut_translock);
-        }
-
-err_client:
-        OBD_FREE_PTR(lcd);
-        RETURN(rc);
-}
-
-static int mdt_server_data_init(const struct lu_env *env,
-                                struct mdt_device *mdt,
-                                struct lustre_sb_info *lsi)
-{
-        struct lr_server_data  *lsd = &mdt->mdt_lut.lut_lsd;
-        struct lsd_client_data *lcd = NULL;
-        struct obd_device      *obd = mdt2obd_dev(mdt);
-        struct mdt_thread_info *mti;
-        struct dt_object       *obj;
-        struct lu_attr         *la;
-        unsigned long last_rcvd_size;
-       __u32                   index;
-        __u64 mount_count;
-        int rc;
-        ENTRY;
-
-        /* ensure padding in the struct is the correct size */
-        CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
-                sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
-        CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
-                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
-
-       rc = server_name2index(mdt_obd_name(mdt), &index, NULL);
-       if (rc < 0) {
-               CERROR("%s: Can not get index from obd_name: rc = %d\n",
-                      mdt_obd_name(mdt), rc);
-               RETURN(rc);
-       }
-
-        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-        LASSERT(mti != NULL);
-        la = &mti->mti_attr.ma_attr;
-
-        obj = mdt->mdt_lut.lut_last_rcvd;
-       rc = dt_attr_get(env, obj, la, BYPASS_CAPA);
-        if (rc)
-                RETURN(rc);
-
-       last_rcvd_size = (unsigned long)la->la_size;
-
-       if (last_rcvd_size == 0) {
-               LCONSOLE_WARN("%s: new disk, initializing\n",
-                             mdt_obd_name(mdt));
-
-               memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,
-                      sizeof(lsd->lsd_uuid));
-               lsd->lsd_last_transno = 0;
-               lsd->lsd_mount_count = 0;
-               lsd->lsd_server_size = LR_SERVER_SIZE;
-               lsd->lsd_client_start = LR_CLIENT_START;
-               lsd->lsd_client_size = LR_CLIENT_SIZE;
-               lsd->lsd_feature_compat = OBD_COMPAT_MDT;
-               lsd->lsd_feature_rocompat = OBD_ROCOMPAT_LOVOBJID;
-               lsd->lsd_feature_incompat = OBD_INCOMPAT_MDT |
-                                           OBD_INCOMPAT_COMMON_LR |
-                                           OBD_INCOMPAT_MULTI_OI;
-               lsd->lsd_osd_index = index;
-       } else {
-               LCONSOLE_WARN("%s: used disk, loading\n", mdt_obd_name(mdt));
-               rc = tgt_server_data_read(env, &mdt->mdt_lut);
-               if (rc) {
-                       CERROR("error reading MDS %s: rc %d\n", LAST_RCVD, rc);
-                       GOTO(out, rc);
-               }
-               if (strcmp(lsd->lsd_uuid, obd->obd_uuid.uuid) != 0) {
-                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s using"
-                                          "the wrong disk %s. Were the /dev/ "
-                                          "assignments rearranged?\n",
-                                          obd->obd_uuid.uuid, lsd->lsd_uuid);
-                       GOTO(out, rc = -EINVAL);
-               }
-               lsd->lsd_feature_compat |= OBD_COMPAT_MDT;
-               lsd->lsd_feature_incompat |= OBD_INCOMPAT_MDT |
-                                               OBD_INCOMPAT_COMMON_LR;
-               if (lsd->lsd_osd_index != index) {
-                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd is"
-                                          "different with the index %d in"
-                                          "config log, It might be disk"
-                                          "corruption!\n", mdt_obd_name(mdt),
-                                          lsd->lsd_osd_index, index);
-                       GOTO(out, rc = -EINVAL);
-               }
-       }
-       mount_count = lsd->lsd_mount_count;
-
-       if (lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP) {
-               CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
-                      mdt_obd_name(mdt),
-                      lsd->lsd_feature_incompat & ~MDT_INCOMPAT_SUPP);
-               GOTO(out, rc = -EINVAL);
-       }
-       if (lsd->lsd_feature_rocompat & ~MDT_ROCOMPAT_SUPP) {
-               CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
-                      mdt_obd_name(mdt),
-                      lsd->lsd_feature_rocompat & ~MDT_ROCOMPAT_SUPP);
-               /* XXX: Do something like remount filesystem read-only */
-               GOTO(out, rc = -EINVAL);
-       }
-       /** Interop: evict all clients at first boot with 1.8 last_rcvd */
-       if (!(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
-               if (last_rcvd_size > lsd->lsd_client_start) {
-                       LCONSOLE_WARN("Mounting %s at first time on 1.8 FS, "
-                                     "remove all clients for interop needs\n",
-                                     mdt_obd_name(mdt));
-                       rc = tgt_truncate_last_rcvd(env, &mdt->mdt_lut,
-                                                   lsd->lsd_client_start);
-                       if (rc)
-                               GOTO(out, rc);
-                       last_rcvd_size = lsd->lsd_client_start;
-               }
-               /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
-               lsd->lsd_feature_compat |= OBD_COMPAT_20;
-       }
-
-       lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
-
-       spin_lock(&mdt->mdt_lut.lut_translock);
-       mdt->mdt_lut.lut_last_transno = lsd->lsd_last_transno;
-       spin_unlock(&mdt->mdt_lut.lut_translock);
-
-       CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
-       CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
-              mdt_obd_name(mdt), mdt->mdt_lut.lut_last_transno);
-       CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
-              mdt_obd_name(mdt), mount_count + 1);
-       CDEBUG(D_INODE, "%s: server data size: %u\n",
-              mdt_obd_name(mdt), lsd->lsd_server_size);
-       CDEBUG(D_INODE, "%s: per-client data start: %u\n",
-              mdt_obd_name(mdt), lsd->lsd_client_start);
-       CDEBUG(D_INODE, "%s: per-client data size: %u\n",
-              mdt_obd_name(mdt), lsd->lsd_client_size);
-       CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
-              mdt_obd_name(mdt), last_rcvd_size);
-       CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", mdt_obd_name(mdt),
-              last_rcvd_size <= lsd->lsd_client_start ? 0 :
-              (last_rcvd_size - lsd->lsd_client_start) /
-               lsd->lsd_client_size);
-       CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
-
-        if (!lsd->lsd_server_size || !lsd->lsd_client_start ||
-            !lsd->lsd_client_size) {
-                CERROR("Bad last_rcvd contents!\n");
-                GOTO(out, rc = -EINVAL);
-        }
-
-        rc = mdt_clients_data_init(env, mdt, last_rcvd_size);
-        if (rc)
-                GOTO(err_client, rc);
-
-       spin_lock(&mdt->mdt_lut.lut_translock);
-       /* obd_last_committed is used for compatibility
-        * with other lustre recovery code */
-       obd->obd_last_committed = mdt->mdt_lut.lut_last_transno;
-       spin_unlock(&mdt->mdt_lut.lut_translock);
-
-        obd->u.obt.obt_mount_count = mount_count + 1;
-        obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count;
-        lsd->lsd_mount_count = obd->u.obt.obt_mount_count;
-
-        /* save it, so mount count and last_transno is current */
-       rc = tgt_server_data_update(env, &mdt->mdt_lut, 0);
-        if (rc)
-                GOTO(err_client, rc);
-
-        RETURN(0);
-
-err_client:
-        class_disconnect_exports(obd);
-out:
-        return rc;
-}
-
 /*
  * last_rcvd & last_committed update callbacks
  */
 extern struct lu_context_key mdt_thread_key;
 
 /*
  * last_rcvd & last_committed update callbacks
  */
 extern struct lu_context_key mdt_thread_key;
 
-/* add credits for last_rcvd update */
-static int mdt_txn_start_cb(const struct lu_env *env,
-                           struct thandle *th, void *cookie)
-{
-       struct lu_target        *tgt = cookie;
-       struct mdt_thread_info  *mti;
-       int                      rc;
-
-       /* if there is no session, then this transaction is not result of
-        * request processing but some local operation or echo client */
-       if (env->le_ses == NULL)
-               return 0;
-
-       mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-
-       LASSERT(tgt->lut_last_rcvd);
-       if (mti->mti_exp == NULL)
-               return 0;
-
-       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lsd_client_data),
-                                    mti->mti_exp->exp_target_data.ted_lr_off,
-                                    th);
-       if (rc)
-               return rc;
-
-       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
-                                    sizeof(struct lr_server_data), 0, th);
-       if (rc)
-               return rc;
-
-       /* we probably should not set local transno to the remote object
-        * on another storage, What about VBR on remote object? XXX */
-       if (mti->mti_mos != NULL && !mdt_object_remote(mti->mti_mos))
-               rc = dt_declare_version_set(env, mdt_obj2dt(mti->mti_mos), th);
-
-       return rc;
-}
-
-/* Update last_rcvd records with latests transaction data */
+/* This callback notifies MDT that transaction was done. This is needed by
+ * mdt_save_lock() only. It is similar to new target code and will be removed
+ * as mdt_save_lock() will be converted to use target structures */
 static int mdt_txn_stop_cb(const struct lu_env *env,
                            struct thandle *txn, void *cookie)
 {
 static int mdt_txn_stop_cb(const struct lu_env *env,
                            struct thandle *txn, void *cookie)
 {
-       struct lu_target        *tgt = cookie;
        struct mdt_thread_info  *mti;
        struct mdt_thread_info  *mti;
-       struct dt_object        *obj = NULL;
-       int                      rc;
-
-       if (env->le_ses == NULL)
-               return 0;
 
        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
        LASSERT(mti);
 
 
        mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
        LASSERT(mti);
 
-       if (mti->mti_mos != NULL &&
-           !mdt_object_remote(mti->mti_mos)) {
-               obj = mdt_obj2dt(mti->mti_mos);
-       }
-
-       rc = tgt_last_rcvd_update(env, tgt, obj, mti->mti_opdata, txn,
-                                 mdt_info_req(mti));
-
-       /* This callback should notify MDT that transaction was
-        * done for mdt_save_lock() */
        if (mti->mti_has_trans)
                CDEBUG(D_INFO, "More than one transaction\n");
        else
                mti->mti_has_trans = 1;
        if (mti->mti_has_trans)
                CDEBUG(D_INFO, "More than one transaction\n");
        else
                mti->mti_has_trans = 1;
-       return rc;
+       return 0;
 }
 
 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
 }
 
 int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt,
-                 struct obd_device *obd,
-                 struct lustre_sb_info *lsi)
+                struct obd_device *obd, struct lustre_sb_info *lsi)
 {
 {
-        int rc = 0;
-        ENTRY;
+       int rc = 0;
 
 
-        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
-                RETURN(-ENOENT);
+       ENTRY;
 
 
-        /* prepare transactions callbacks */
-        mdt->mdt_txn_cb.dtc_txn_start = mdt_txn_start_cb;
-        mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
-        mdt->mdt_txn_cb.dtc_txn_commit = NULL;
-        mdt->mdt_txn_cb.dtc_cookie = &mdt->mdt_lut;
-        mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
-        CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
+       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
+               RETURN(-ENOENT);
 
 
-       rc = mdt_server_data_init(env, mdt, lsi);
-       if (rc != 0)
-               RETURN(rc);
+       /* prepare transactions callbacks */
+       mdt->mdt_txn_cb.dtc_txn_start = NULL;
+       mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
+       mdt->mdt_txn_cb.dtc_txn_commit = NULL;
+       mdt->mdt_txn_cb.dtc_cookie = NULL;
+       mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
+       CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
 
        dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
 
        dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
 
index 617ac43..a3d8a2f 100644 (file)
@@ -349,7 +349,7 @@ static int mdt_md_create(struct mdt_thread_info *info)
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
                 /* Version of child will be updated on disk. */
                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
 
                 /* Version of child will be updated on disk. */
-                info->mti_mos = child;
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
                 rc = mdt_version_get_check_save(info, child, 2);
                 if (rc)
                         GOTO(out_put_child, rc);
                 rc = mdt_version_get_check_save(info, child, 2);
                 if (rc)
                         GOTO(out_put_child, rc);
@@ -428,7 +428,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
         /* VBR: update version if attr changed are important for recovery */
         if (do_vbr) {
                 /* update on-disk version of changed object */
         /* VBR: update version if attr changed are important for recovery */
         if (do_vbr) {
                 /* update on-disk version of changed object */
-                info->mti_mos = mo;
+               tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
                 rc = mdt_version_get_check_save(info, mo, 0);
                 if (rc)
                         GOTO(out_unlock, rc);
                 rc = mdt_version_get_check_save(info, mo, 0);
                 if (rc)
                         GOTO(out_unlock, rc);
@@ -964,7 +964,7 @@ static int mdt_reint_link(struct mdt_thread_info *info,
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);
 
-        info->mti_mos = ms;
+       tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
         rc = mdt_version_get_check_save(info, ms, 1);
         if (rc)
                 GOTO(out_unlock_child, rc);
         rc = mdt_version_get_check_save(info, ms, 1);
         if (rc)
                 GOTO(out_unlock_child, rc);
@@ -1252,7 +1252,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info,
                 GOTO(out_unlock_target, rc);
         }
 
                 GOTO(out_unlock_target, rc);
         }
 
-        info->mti_mos = mold;
+       tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
         /* save version after locking */
         mdt_version_get_save(info, mold, 2);
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
         /* save version after locking */
         mdt_version_get_save(info, mold, 2);
         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
index dfb7676..ecbf3b7 100644 (file)
@@ -455,10 +455,10 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
         if (IS_ERR(obj))
                 GOTO(out, rc =  PTR_ERR(obj));
 
-        info->mti_mos = obj;
-        rc = mdt_version_get_check_save(info, obj, 0);
-        if (rc)
-                GOTO(out_unlock, rc);
+       tgt_vbr_obj_set(env, mdt_obj2dt(obj));
+       rc = mdt_version_get_check_save(info, obj, 0);
+       if (rc)
+               GOTO(out_unlock, rc);
 
        if (unlikely(!(valid & OBD_MD_FLCTIME))) {
                /* This isn't strictly an error, but all current clients
 
        if (unlikely(!(valid & OBD_MD_FLCTIME))) {
                /* This isn't strictly an error, but all current clients
index d8196f6..2ddd9a2 100644 (file)
@@ -925,7 +925,6 @@ static int ofd_getattr_hdl(struct tgt_session_info *tsi)
        ldlm_mode_t              lock_mode = LCK_PR;
        bool                     srvlock;
        int                      rc;
        ldlm_mode_t              lock_mode = LCK_PR;
        bool                     srvlock;
        int                      rc;
-
        ENTRY;
 
        LASSERT(tsi->tsi_ost_body != NULL);
        ENTRY;
 
        LASSERT(tsi->tsi_ost_body != NULL);
@@ -1307,15 +1306,6 @@ static int ofd_destroy_hdl(struct tgt_session_info *tsi)
        else
                count = 1; /* default case - single destroy */
 
        else
                count = 1; /* default case - single destroy */
 
-       /**
-        * There can be sequence of objects to destroy. Therefore this request
-        * may have multiple transaction involved in. It is OK, we need only
-        * the highest used transno to be reported back in reply but not for
-        * replays, they must report their transno
-        */
-       if (fti->fti_transno == 0) /* not replay */
-               fti->fti_mult_trans = 1;
-
        CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
               POSTID(&body->oa.o_oi), count);
        while (count > 0) {
        CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
               POSTID(&body->oa.o_oi), count);
        while (count > 0) {
@@ -1345,22 +1335,6 @@ static int ofd_destroy_hdl(struct tgt_session_info *tsi)
                ostid_inc_id(&repbody->oa.o_oi);
        }
 
                ostid_inc_id(&repbody->oa.o_oi);
        }
 
-       /* if we have transaction then there were some deletions, we don't
-        * need to return ENOENT in that case because it will not wait
-        * for commit of these deletions. The ENOENT must be returned only
-        * if there were no transations.
-        */
-       if (rc == -ENOENT) {
-               if (fti->fti_transno != 0)
-                       rc = 0;
-       } else if (rc != 0) {
-               /*
-                * If we have at least one transaction then llog record
-                * on server will be removed upon commit, so for rc != 0
-                * we return no transno and llog record will be reprocessed.
-                */
-               fti->fti_transno = 0;
-       }
        ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
                         tsi->tsi_jobid, 1);
 out:
        ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
                         tsi->tsi_jobid, 1);
 out:
@@ -1877,11 +1851,7 @@ static void ofd_key_exit(const struct lu_context *ctx,
        info->fti_exp = NULL;
 
        info->fti_xid = 0;
        info->fti_exp = NULL;
 
        info->fti_xid = 0;
-       info->fti_transno = 0;
        info->fti_pre_version = 0;
        info->fti_pre_version = 0;
-       info->fti_obj = NULL;
-       info->fti_has_trans = 0;
-       info->fti_mult_trans = 0;
        info->fti_used = 0;
 
        memset(&info->fti_attr, 0, sizeof info->fti_attr);
        info->fti_used = 0;
 
        memset(&info->fti_attr, 0, sizeof info->fti_attr);
index e8d4cb2..ba0cd9f 100644 (file)
@@ -418,223 +418,6 @@ int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
        return rc;
 }
 
        return rc;
 }
 
-int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
-                         unsigned long fsize)
-{
-       struct obd_device               *obd = ofd_obd(ofd);
-       struct lr_server_data           *lsd = &ofd->ofd_lut.lut_lsd;
-       struct lsd_client_data          *lcd = NULL;
-       struct filter_export_data       *fed;
-       int                              cl_idx;
-       int                              rc = 0;
-       loff_t                           off = lsd->lsd_client_start;
-
-       CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
-                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
-
-       OBD_ALLOC_PTR(lcd);
-       if (lcd == NULL)
-               RETURN(-ENOMEM);
-
-       for (cl_idx = 0; off < fsize; cl_idx++) {
-               struct obd_export       *exp;
-               __u64                    last_rcvd;
-
-               /* Don't assume off is incremented properly by
-                * read_record(), in case sizeof(*lcd)
-                * isn't the same as fsd->lsd_client_size.  */
-               off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
-               rc = tgt_client_data_read(env, &ofd->ofd_lut, lcd, &off, cl_idx);
-               if (rc) {
-                       CERROR("%s: error reading FILT %s idx %d off %llu: "
-                              "rc = %d\n", ofd_name(ofd), LAST_RCVD, cl_idx,
-                              off, rc);
-                       rc = 0;
-                       break; /* read error shouldn't cause startup to fail */
-               }
-
-               if (lcd->lcd_uuid[0] == '\0') {
-                       CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
-                              cl_idx);
-                       continue;
-               }
-
-               last_rcvd = lcd->lcd_last_transno;
-
-               /* These exports are cleaned up by ofd_disconnect(), so they
-                * need to be set up like real exports as ofd_connect() does.
-                */
-               exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
-
-               CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                      " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx,
-                      last_rcvd, lsd->lsd_last_transno);
-
-               if (IS_ERR(exp)) {
-                       if (PTR_ERR(exp) == -EALREADY) {
-                               /* export already exists, zero out this one */
-                               CERROR("%s: Duplicate export %s!\n",
-                                      ofd_name(ofd), lcd->lcd_uuid);
-                               continue;
-                       }
-                       GOTO(err_out, rc = PTR_ERR(exp));
-               }
-
-               fed = &exp->exp_filter_data;
-               *fed->fed_ted.ted_lcd = *lcd;
-
-               rc = tgt_client_add(env, exp, cl_idx);
-               LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
-               /* VBR: set export last committed version */
-               exp->exp_last_committed = last_rcvd;
-               spin_lock(&exp->exp_lock);
-               exp->exp_connecting = 0;
-               exp->exp_in_recovery = 0;
-               spin_unlock(&exp->exp_lock);
-               obd->obd_max_recoverable_clients++;
-               class_export_put(exp);
-
-               /* Need to check last_rcvd even for duplicated exports. */
-               CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
-                      cl_idx, last_rcvd);
-
-               spin_lock(&ofd->ofd_lut.lut_translock);
-               if (last_rcvd > lsd->lsd_last_transno)
-                       lsd->lsd_last_transno = last_rcvd;
-               spin_unlock(&ofd->ofd_lut.lut_translock);
-       }
-
-err_out:
-       OBD_FREE_PTR(lcd);
-       RETURN(rc);
-}
-
-int ofd_server_data_init(const struct lu_env *env, struct ofd_device *ofd)
-{
-       struct ofd_thread_info  *info = ofd_info(env);
-       struct lr_server_data   *lsd = &ofd->ofd_lut.lut_lsd;
-       struct obd_device       *obd = ofd_obd(ofd);
-       unsigned long           last_rcvd_size;
-       __u32                   index;
-       int                     rc;
-
-       rc = dt_attr_get(env, ofd->ofd_lut.lut_last_rcvd, &info->fti_attr,
-                        BYPASS_CAPA);
-       if (rc)
-               RETURN(rc);
-
-       last_rcvd_size = (unsigned long)info->fti_attr.la_size;
-
-       /* ensure padding in the struct is the correct size */
-       CLASSERT (offsetof(struct lr_server_data, lsd_padding) +
-                 sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
-
-       rc = server_name2index(obd->obd_name, &index, NULL);
-       if (rc < 0) {
-               CERROR("%s: Can not get index from obd_name: rc = %d\n",
-                      obd->obd_name, rc);
-               RETURN(rc);
-       }
-
-       if (last_rcvd_size == 0) {
-               LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
-
-               memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,
-                      sizeof(lsd->lsd_uuid));
-               lsd->lsd_last_transno = 0;
-               lsd->lsd_mount_count = 0;
-               lsd->lsd_server_size = LR_SERVER_SIZE;
-               lsd->lsd_client_start = LR_CLIENT_START;
-               lsd->lsd_client_size = LR_CLIENT_SIZE;
-               lsd->lsd_subdir_count = FILTER_SUBDIR_COUNT;
-               lsd->lsd_feature_incompat = OBD_INCOMPAT_OST;
-               lsd->lsd_osd_index = index;
-       } else {
-               rc = tgt_server_data_read(env, &ofd->ofd_lut);
-               if (rc) {
-                       CDEBUG(D_INODE,"OBD ofd: error reading %s: rc %d\n",
-                              LAST_RCVD, rc);
-                       GOTO(err_fsd, rc);
-               }
-               if (strcmp((char *)lsd->lsd_uuid,
-                          (char *)obd->obd_uuid.uuid)) {
-                       LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
-                                      " disk %s. Were the /dev/ assignments "
-                                      "rearranged?\n",
-                                      obd->obd_uuid.uuid, lsd->lsd_uuid);
-                       GOTO(err_fsd, rc = -EINVAL);
-               }
-
-               if (lsd->lsd_osd_index == 0) {
-                       lsd->lsd_osd_index = index;
-               } else if (lsd->lsd_osd_index != index) {
-                       LCONSOLE_ERROR("%s: index %d in last rcvd is different"
-                                      " with the index %d in config log."
-                                      " It might be disk corruption!\n",
-                                      obd->obd_name, lsd->lsd_osd_index,
-                                      index);
-                       GOTO(err_fsd, rc = -EINVAL);
-               }
-       }
-
-       lsd->lsd_mount_count++;
-       obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
-       obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count;
-       ofd->ofd_subdir_count = lsd->lsd_subdir_count;
-
-       if (lsd->lsd_feature_incompat & ~OFD_INCOMPAT_SUPP) {
-               CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
-                      obd->obd_name,
-                      lsd->lsd_feature_incompat & ~OFD_INCOMPAT_SUPP);
-               GOTO(err_fsd, rc = -EINVAL);
-       }
-       if (lsd->lsd_feature_rocompat & ~OFD_ROCOMPAT_SUPP) {
-               CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
-                      obd->obd_name,
-                      lsd->lsd_feature_rocompat & ~OFD_ROCOMPAT_SUPP);
-               /* Do something like remount filesystem read-only */
-               GOTO(err_fsd, rc = -EINVAL);
-       }
-
-       CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
-              obd->obd_name, lsd->lsd_last_transno);
-       CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
-              obd->obd_name, lsd->lsd_mount_count);
-       CDEBUG(D_INODE, "%s: server data size: %u\n",
-              obd->obd_name, lsd->lsd_server_size);
-       CDEBUG(D_INODE, "%s: per-client data start: %u\n",
-              obd->obd_name, lsd->lsd_client_start);
-       CDEBUG(D_INODE, "%s: per-client data size: %u\n",
-              obd->obd_name, lsd->lsd_client_size);
-       CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
-              obd->obd_name, lsd->lsd_subdir_count);
-       CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
-              last_rcvd_size <= lsd->lsd_client_start ? 0 :
-              (last_rcvd_size - lsd->lsd_client_start) /
-              lsd->lsd_client_size);
-
-       if (!obd->obd_replayable)
-               CWARN("%s: recovery support OFF\n", obd->obd_name);
-
-       rc = ofd_clients_data_init(env, ofd, last_rcvd_size);
-
-       spin_lock(&ofd->ofd_lut.lut_translock);
-       obd->obd_last_committed = lsd->lsd_last_transno;
-       ofd->ofd_lut.lut_last_transno = lsd->lsd_last_transno;
-       spin_unlock(&ofd->ofd_lut.lut_translock);
-
-       /* save it, so mount count and last_transno is current */
-       rc = tgt_server_data_update(env, &ofd->ofd_lut, 0);
-       if (rc)
-               GOTO(err_fsd, rc);
-
-       RETURN(0);
-
-err_fsd:
-       class_disconnect_exports(obd);
-       RETURN(rc);
-}
-
 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
                 struct obd_device *obd)
 {
 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
                 struct obd_device *obd)
 {
@@ -647,20 +430,6 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
                RETURN (-ENOENT);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
                RETURN (-ENOENT);
 
-       /* prepare transactions callbacks */
-       ofd->ofd_txn_cb.dtc_txn_start = NULL;
-       ofd->ofd_txn_cb.dtc_txn_stop = ofd_txn_stop_cb;
-       ofd->ofd_txn_cb.dtc_txn_commit = NULL;
-       ofd->ofd_txn_cb.dtc_cookie = ofd;
-       ofd->ofd_txn_cb.dtc_tag = LCT_DT_THREAD;
-       CFS_INIT_LIST_HEAD(&ofd->ofd_txn_cb.dtc_linkage);
-
-       dt_txn_callback_add(ofd->ofd_osd, &ofd->ofd_txn_cb);
-
-       rc = ofd_server_data_init(env, ofd);
-       if (rc)
-               GOTO(out, rc);
-
        lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
        memset(&info->fti_attr, 0, sizeof(info->fti_attr));
        info->fti_attr.la_valid = LA_MODE;
        lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
        memset(&info->fti_attr, 0, sizeof(info->fti_attr));
        info->fti_attr.la_valid = LA_MODE;
@@ -682,7 +451,6 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
 out_hc:
        lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
 out:
 out_hc:
        lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
 out:
-       dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
        return rc;
 }
 
        return rc;
 }
 
@@ -700,9 +468,6 @@ void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
        if (i)
                CERROR("can't sync: %d\n", i);
 
        if (i)
                CERROR("can't sync: %d\n", i);
 
-       /* Remove transaction callback */
-       dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
-
        if (ofd->ofd_health_check_file) {
                lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
                ofd->ofd_health_check_file = NULL;
        if (ofd->ofd_health_check_file) {
                lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
                ofd->ofd_health_check_file = NULL;
index 1667d0d..d96a517 100644 (file)
@@ -49,7 +49,7 @@
 #define OFD_ROCOMPAT_SUPP (0)
 #define OFD_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS | OBD_INCOMPAT_OST | \
                           OBD_INCOMPAT_COMMON_LR)
 #define OFD_ROCOMPAT_SUPP (0)
 #define OFD_INCOMPAT_SUPP (OBD_INCOMPAT_GROUPS | OBD_INCOMPAT_OST | \
                           OBD_INCOMPAT_COMMON_LR)
-#define OFD_PRECREATE_BATCH_DEFAULT (FILTER_SUBDIR_COUNT * 4)
+#define OFD_PRECREATE_BATCH_DEFAULT (OBJ_SUBDIR_COUNT * 4)
 
 /* on small filesystems we should not precreate too many objects in
  * a single transaction, otherwise we can overflow transactions */
 
 /* on small filesystems we should not precreate too many objects in
  * a single transaction, otherwise we can overflow transactions */
@@ -124,9 +124,6 @@ struct ofd_device {
        /* DLM name-space for meta-data locks maintained by this server */
        struct ldlm_namespace   *ofd_namespace;
 
        /* DLM name-space for meta-data locks maintained by this server */
        struct ldlm_namespace   *ofd_namespace;
 
-       /* transaction callbacks */
-       struct dt_txn_callback   ofd_txn_cb;
-
        /* last_rcvd file */
        struct lu_target         ofd_lut;
        struct dt_object        *ofd_health_check_file;
        /* last_rcvd file */
        struct lu_target         ofd_lut;
        struct dt_object        *ofd_health_check_file;
@@ -290,10 +287,7 @@ struct ofd_thread_info {
 
        struct obd_export               *fti_exp;
        __u64                            fti_xid;
 
        struct obd_export               *fti_exp;
        __u64                            fti_xid;
-       __u64                            fti_transno;
        __u64                            fti_pre_version;
        __u64                            fti_pre_version;
-       __u32                            fti_has_trans:1, /* has txn already */
-                                        fti_mult_trans:1;
 
        struct lu_fid                    fti_fid;
        struct lu_attr                   fti_attr;
 
        struct lu_fid                    fti_fid;
        struct lu_attr                   fti_attr;
@@ -541,8 +535,6 @@ static inline struct ofd_thread_info *ofd_info_init(const struct lu_env *env,
        info->fti_env = env;
        info->fti_exp = exp;
        info->fti_pre_version = 0;
        info->fti_env = env;
        info->fti_exp = exp;
        info->fti_pre_version = 0;
-       info->fti_transno = 0;
-       info->fti_has_trans = 0;
        return info;
 }
 
        return info;
 }
 
@@ -558,7 +550,6 @@ static inline struct ofd_thread_info *tsi2ofd_info(struct tgt_session_info *tsi)
 
        info->fti_env = tsi->tsi_env;
        info->fti_exp = tsi->tsi_exp;
 
        info->fti_env = tsi->tsi_env;
        info->fti_exp = tsi->tsi_exp;
-       info->fti_has_trans = 0;
 
        info->fti_xid = req->rq_xid;
        /** VBR: take versions from request */
 
        info->fti_xid = req->rq_xid;
        /** VBR: take versions from request */
@@ -567,7 +558,6 @@ static inline struct ofd_thread_info *tsi2ofd_info(struct tgt_session_info *tsi)
                __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg);
 
                info->fti_pre_version = pre_version ? pre_version[0] : 0;
                __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg);
 
                info->fti_pre_version = pre_version ? pre_version[0] : 0;
-               info->fti_transno = lustre_msg_get_transno(req->rq_reqmsg);
        }
        return info;
 }
        }
        return info;
 }
@@ -576,7 +566,6 @@ static inline void ofd_oti2info(struct ofd_thread_info *info,
                                struct obd_trans_info *oti)
 {
        info->fti_xid = oti->oti_xid;
                                struct obd_trans_info *oti)
 {
        info->fti_xid = oti->oti_xid;
-       info->fti_transno = oti->oti_transno;
        info->fti_pre_version = oti->oti_pre_version;
 }
 
        info->fti_pre_version = oti->oti_pre_version;
 }
 
@@ -584,11 +573,6 @@ static inline void ofd_info2oti(struct ofd_thread_info *info,
                                 struct obd_trans_info *oti)
 {
        oti->oti_xid = info->fti_xid;
                                 struct obd_trans_info *oti)
 {
        oti->oti_xid = info->fti_xid;
-       LASSERTF(ergo(oti->oti_transno > 0,
-                     oti->oti_transno == info->fti_transno),
-                "Overwrite replay transno "LPX64" by "LPX64"\n",
-                oti->oti_transno, info->fti_transno);
-       oti->oti_transno = info->fti_transno;
        oti->oti_pre_version = info->fti_pre_version;
 }
 
        oti->oti_pre_version = info->fti_pre_version;
 }
 
index d1dffb5..edb5bf2 100644 (file)
@@ -1041,15 +1041,6 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp,
        else
                count = 1; /* default case - single destroy */
 
        else
                count = 1; /* default case - single destroy */
 
-       /**
-        * There can be sequence of objects to destroy. Therefore this request
-        * may have multiple transaction involved in. It is OK, we need only
-        * the highest used transno to be reported back in reply but not for
-        * replays, they must report their transno
-        */
-       if (info->fti_transno == 0) /* not replay */
-               info->fti_mult_trans = 1;
-
        CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
               POSTID(&oa->o_oi), count);
        while (count > 0) {
        CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
               POSTID(&oa->o_oi), count);
        while (count > 0) {
@@ -1079,22 +1070,6 @@ int ofd_destroy(const struct lu_env *env, struct obd_export *exp,
                ostid_inc_id(&oa->o_oi);
        }
 
                ostid_inc_id(&oa->o_oi);
        }
 
-       /* if we have transaction then there were some deletions, we don't
-        * need to return ENOENT in that case because it will not wait
-        * for commit of these deletions. The ENOENT must be returned only
-        * if there were no transations.
-        */
-       if (rc == -ENOENT) {
-               if (info->fti_transno != 0)
-                       rc = 0;
-       } else if (rc != 0) {
-               /*
-                * If we have at least one transaction then llog record
-                * on server will be removed upon commit, so for rc != 0
-                * we return no transno and llog record will be reprocessed.
-                */
-               info->fti_transno = 0;
-       }
        ofd_info2oti(info, oti);
 out:
        RETURN(rc);
        ofd_info2oti(info, oti);
 out:
        RETURN(rc);
index a14afe2..67cc912 100644 (file)
@@ -64,33 +64,9 @@ struct thandle *ofd_trans_create(const struct lu_env *env,
 int ofd_trans_start(const struct lu_env *env, struct ofd_device *ofd,
                    struct ofd_object *obj, struct thandle *th)
 {
 int ofd_trans_start(const struct lu_env *env, struct ofd_device *ofd,
                    struct ofd_object *obj, struct thandle *th)
 {
-       struct ofd_thread_info  *info = ofd_info(env);
-       int                      rc;
-
-       if (env->le_ses == NULL || info->fti_exp == NULL)
-               return 0;
-
-       /* declare last_rcvd update */
-       rc = dt_declare_record_write(env, ofd->ofd_lut.lut_last_rcvd,
-                                    sizeof(struct lsd_client_data),
-                                    info->fti_exp->exp_target_data.ted_lr_off,
-                                    th);
-       if (rc)
-               RETURN(rc);
-
-       /* declare last_rcvd header update */
-       rc = dt_declare_record_write(env, ofd->ofd_lut.lut_last_rcvd,
-                                    sizeof(ofd->ofd_lut.lut_lsd), 0, th);
-       if (rc)
-               RETURN(rc);
-
        /* version change is required for this object */
        /* version change is required for this object */
-       if (obj) {
-               ofd_info(env)->fti_obj = obj;
-               rc = dt_declare_version_set(env, ofd_object_child(obj), th);
-               if (rc)
-                       RETURN(rc);
-       }
+       if (obj != NULL)
+               tgt_vbr_obj_set(env, ofd_object_child(obj));
 
        return dt_trans_start(env, ofd->ofd_osd, th);
 }
 
        return dt_trans_start(env, ofd->ofd_osd, th);
 }
@@ -101,51 +77,3 @@ void ofd_trans_stop(const struct lu_env *env, struct ofd_device *ofd,
        th->th_result = rc;
        dt_trans_stop(env, ofd->ofd_osd, th);
 }
        th->th_result = rc;
        dt_trans_stop(env, ofd->ofd_osd, th);
 }
-
-/* Update last_rcvd records with the latest transaction data */
-int ofd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
-                   void *cookie)
-{
-       struct ofd_device       *ofd = cookie;
-       struct ofd_thread_info  *info = ofd_info(env);
-       struct dt_object        *obj;
-       struct tgt_session_info *tsi;
-       bool                     echo_client;
-       int                      rc;
-
-       ENTRY;
-
-       if (env->le_ses == NULL || info->fti_exp == NULL)
-               RETURN(0);
-
-       tsi = tgt_ses_info(env);
-
-       echo_client = (tgt_ses_req(tsi) == NULL);
-
-       if (info->fti_has_trans && !echo_client) {
-               if (info->fti_mult_trans == 0) {
-                       CERROR("More than one transaction "LPU64"\n",
-                              info->fti_transno);
-                       RETURN(0);
-               }
-               /* we need another transno to be assigned */
-               info->fti_transno = 0;
-       } else if (txn->th_result == 0) {
-               info->fti_has_trans = 1;
-       }
-
-       /** VBR: set new versions */
-       if (info->fti_obj != NULL)
-               obj = ofd_object_child(info->fti_obj);
-       else
-               obj = NULL;
-
-       if (unlikely(echo_client)) /* echo client special case */
-               rc = tgt_last_rcvd_update_echo(env, &ofd->ofd_lut, obj, txn,
-                                              tsi->tsi_exp);
-       else
-               rc = tgt_last_rcvd_update(env, &ofd->ofd_lut, obj, 0, txn,
-                                         tgt_ses_req(tsi));
-       RETURN(rc);
-}
-
index 2fceb69..f5f21c8 100644 (file)
@@ -131,7 +131,7 @@ int osd_last_rcvd_subdir_count(struct osd_device *osd)
         struct dentry        *dlast;
         loff_t                off;
         int                   rc = 0;
         struct dentry        *dlast;
         loff_t                off;
         int                   rc = 0;
-       int                   count = FILTER_SUBDIR_COUNT;
+       int                   count = OBJ_SUBDIR_COUNT;
 
         ENTRY;
 
 
         ENTRY;
 
index bd89e6c..a848f87 100644 (file)
@@ -1278,6 +1278,8 @@ int out_handle(struct tgt_session_info *tsi)
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
+       tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
+
        /* Walk through updates in the request to execute them synchronously */
        off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
        for (i = 0; i < count; i++) {
        /* Walk through updates in the request to execute them synchronously */
        off = cfs_size_round(offsetof(struct update_buf, ub_bufs[0]));
        for (i = 0; i < count; i++) {
index bc7809c..130560f 100644 (file)
@@ -112,6 +112,8 @@ struct tgt_thread_info {
 
        /* transno storage during last_rcvd update */
        __u64                    tti_transno;
 
        /* transno storage during last_rcvd update */
        __u64                    tti_transno;
+       __u32                    tti_has_trans:1,
+                                tti_mult_trans:1;
 
        /* Updates data for OUT target */
        struct thandle_exec_args tti_tea;
 
        /* Updates data for OUT target */
        struct thandle_exec_args tti_tea;
@@ -201,4 +203,10 @@ struct tgt_thread_big_cache {
        struct niobuf_local     local[PTLRPC_MAX_BRW_PAGES];
 };
 
        struct niobuf_local     local[PTLRPC_MAX_BRW_PAGES];
 };
 
+int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt);
+int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
+                    void *cookie);
+int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
+                   void *cookie);
+
 #endif /* _TG_INTERNAL_H */
 #endif /* _TG_INTERNAL_H */
index 01a757f..b5d2831 100644 (file)
@@ -693,7 +693,6 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                         struct thandle *th, struct ptlrpc_request *req)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
                         struct thandle *th, struct ptlrpc_request *req)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
-       struct tgt_session_info *tsi = tgt_ses_info(env);
        struct tg_export_data   *ted;
        __u64                   *transno_p;
        int                      rc = 0;
        struct tg_export_data   *ted;
        __u64                   *transno_p;
        int                      rc = 0;
@@ -701,22 +700,6 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        ENTRY;
 
 
        ENTRY;
 
-       if (tsi->tsi_has_trans) {
-               /* XXX: currently there are allowed cases, but the wrong cases
-                * are also possible, so better check is needed here */
-               CDEBUG(D_INFO, "More than one transaction "LPU64"\n",
-                      tti->tti_transno);
-               return 0;
-       }
-
-       tsi->tsi_has_trans = 1;
-       /* that can be OUT target and we need tgt_session_info */
-       if (req == NULL) {
-               req = tgt_ses_req(tsi);
-               if (req == NULL) /* echo client case */
-                       RETURN(0);
-       }
-
        ted = &req->rq_export->exp_target_data;
 
        lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
        ted = &req->rq_export->exp_target_data;
 
        lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
@@ -811,7 +794,8 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
 
        /* Update transno in slot only if non-zero number, i.e. no errors */
        if (likely(tti->tti_transno != 0)) {
 
        /* Update transno in slot only if non-zero number, i.e. no errors */
        if (likely(tti->tti_transno != 0)) {
-               if (*transno_p > tti->tti_transno) {
+               if (*transno_p > tti->tti_transno &&
+                   !tgt->lut_no_reconstruct) {
                        CERROR("%s: trying to overwrite bigger transno:"
                               "on-disk: "LPU64", new: "LPU64" replay: %d. "
                               "see LU-617.\n", tgt_name(tgt), *transno_p,
                        CERROR("%s: trying to overwrite bigger transno:"
                               "on-disk: "LPU64", new: "LPU64" replay: %d. "
                               "see LU-617.\n", tgt_name(tgt), *transno_p,
@@ -842,7 +826,6 @@ srv_update:
                rc = tgt_server_data_write(env, tgt, th);
        return rc;
 }
                rc = tgt_server_data_write(env, tgt, th);
        return rc;
 }
-EXPORT_SYMBOL(tgt_last_rcvd_update);
 
 /*
  * last_rcvd update for echo client simulation.
 
 /*
  * last_rcvd update for echo client simulation.
@@ -886,4 +869,375 @@ int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
        mutex_unlock(&ted->ted_lcd_lock);
        RETURN(rc);
 }
        mutex_unlock(&ted->ted_lcd_lock);
        RETURN(rc);
 }
-EXPORT_SYMBOL(tgt_last_rcvd_update_echo);
+
+int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
+                         unsigned long last_size)
+{
+       struct obd_device       *obd = tgt->lut_obd;
+       struct lr_server_data   *lsd = &tgt->lut_lsd;
+       struct lsd_client_data  *lcd = NULL;
+       struct tg_export_data   *ted;
+       int                      cl_idx;
+       int                      rc = 0;
+       loff_t                   off = lsd->lsd_client_start;
+
+       ENTRY;
+
+       CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
+                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+
+       OBD_ALLOC_PTR(lcd);
+       if (lcd == NULL)
+               RETURN(-ENOMEM);
+
+       for (cl_idx = 0; off < last_size; cl_idx++) {
+               struct obd_export       *exp;
+               __u64                    last_transno;
+
+               /* Don't assume off is incremented properly by
+                * read_record(), in case sizeof(*lcd)
+                * isn't the same as fsd->lsd_client_size.  */
+               off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
+               rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
+               if (rc) {
+                       CERROR("%s: error reading last_rcvd %s idx %d off "
+                              "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
+                              cl_idx, off, rc);
+                       rc = 0;
+                       break; /* read error shouldn't cause startup to fail */
+               }
+
+               if (lcd->lcd_uuid[0] == '\0') {
+                       CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                              cl_idx);
+                       continue;
+               }
+
+               last_transno = lcd_last_transno(lcd);
+
+               /* These exports are cleaned up by disconnect, so they
+                * need to be set up like real exports as connect does.
+                */
+               CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                      " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
+                      last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
+
+               exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
+               if (IS_ERR(exp)) {
+                       if (PTR_ERR(exp) == -EALREADY) {
+                               /* export already exists, zero out this one */
+                               CERROR("%s: Duplicate export %s!\n",
+                                      tgt_name(tgt), lcd->lcd_uuid);
+                               continue;
+                       }
+                       GOTO(err_out, rc = PTR_ERR(exp));
+               }
+
+               ted = &exp->exp_target_data;
+               *ted->ted_lcd = *lcd;
+
+               rc = tgt_client_add(env, exp, cl_idx);
+               LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+               /* VBR: set export last committed version */
+               exp->exp_last_committed = last_transno;
+               spin_lock(&exp->exp_lock);
+               exp->exp_connecting = 0;
+               exp->exp_in_recovery = 0;
+               spin_unlock(&exp->exp_lock);
+               obd->obd_max_recoverable_clients++;
+               class_export_put(exp);
+
+               /* Need to check last_rcvd even for duplicated exports. */
+               CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+                      cl_idx, last_transno);
+
+               spin_lock(&tgt->lut_translock);
+               tgt->lut_last_transno = max(last_transno,
+                                           tgt->lut_last_transno);
+               spin_unlock(&tgt->lut_translock);
+       }
+
+err_out:
+       OBD_FREE_PTR(lcd);
+       RETURN(rc);
+}
+
+struct server_compat_data {
+       __u32 rocompat;
+       __u32 incompat;
+       __u32 rocinit;
+       __u32 incinit;
+};
+
+static struct server_compat_data tgt_scd[] = {
+       [LDD_F_SV_TYPE_MDT] = {
+               .rocompat = OBD_ROCOMPAT_LOVOBJID,
+               .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
+                           OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
+                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
+               .rocinit = OBD_ROCOMPAT_LOVOBJID,
+               .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
+                          OBD_INCOMPAT_MULTI_OI,
+       },
+       [LDD_F_SV_TYPE_OST] = {
+               .rocompat = 0,
+               .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
+                           OBD_INCOMPAT_FID,
+               .rocinit = 0,
+               .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
+       }
+};
+
+int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
+{
+       struct tgt_thread_info          *tti = tgt_th_info(env);
+       struct lr_server_data           *lsd = &tgt->lut_lsd;
+       unsigned long                    last_rcvd_size;
+       __u32                            index;
+       int                              rc, type;
+
+       rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+
+       last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
+
+       /* ensure padding in the struct is the correct size */
+       CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
+                sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
+
+       rc = server_name2index(tgt_name(tgt), &index, NULL);
+       if (rc < 0) {
+               CERROR("%s: Can not get index from name: rc = %d\n",
+                      tgt_name(tgt), rc);
+               RETURN(rc);
+       }
+       /* server_name2index() returns type */
+       type = rc;
+       if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
+               CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
+               RETURN(-EINVAL);
+       }
+
+       /* last_rcvd on OST doesn't provide reconstruct support because there
+        * may be up to 8 in-flight write requests per single slot in
+        * last_rcvd client data
+        */
+       tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
+
+       if (last_rcvd_size == 0) {
+               LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
+
+               memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
+                      sizeof(lsd->lsd_uuid));
+               lsd->lsd_last_transno = 0;
+               lsd->lsd_mount_count = 0;
+               lsd->lsd_server_size = LR_SERVER_SIZE;
+               lsd->lsd_client_start = LR_CLIENT_START;
+               lsd->lsd_client_size = LR_CLIENT_SIZE;
+               lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
+               lsd->lsd_osd_index = index;
+               lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
+               lsd->lsd_feature_incompat = tgt_scd[type].incinit;
+       } else {
+               rc = tgt_server_data_read(env, tgt);
+               if (rc) {
+                       CERROR("%s: error reading LAST_RCVD: rc= %d\n",
+                              tgt_name(tgt), rc);
+                       RETURN(rc);
+               }
+               if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
+                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
+                                          "using the wrong disk %s. Were the"
+                                          " /dev/ assignments rearranged?\n",
+                                          tgt->lut_obd->obd_uuid.uuid,
+                                          lsd->lsd_uuid);
+                       RETURN(-EINVAL);
+               }
+
+               if (lsd->lsd_osd_index != index) {
+                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
+                                          "is different with the index %d in"
+                                          "config log, It might be disk"
+                                          "corruption!\n", tgt_name(tgt),
+                                          lsd->lsd_osd_index, index);
+                       RETURN(-EINVAL);
+               }
+       }
+
+       if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
+               CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+                      tgt_name(tgt),
+                      lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
+               RETURN(-EINVAL);
+       }
+       if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
+               CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+                      tgt_name(tgt),
+                      lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
+               RETURN(-EINVAL);
+       }
+       /** Interop: evict all clients at first boot with 1.8 last_rcvd */
+       if (type == LDD_F_SV_TYPE_MDT &&
+           !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
+               if (last_rcvd_size > lsd->lsd_client_start) {
+                       LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
+                                     "remove all clients for interop needs\n",
+                                     tgt_name(tgt));
+                       rc = tgt_truncate_last_rcvd(env, tgt,
+                                                   lsd->lsd_client_start);
+                       if (rc)
+                               RETURN(rc);
+                       last_rcvd_size = lsd->lsd_client_start;
+               }
+               /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
+               lsd->lsd_feature_compat |= OBD_COMPAT_20;
+       }
+
+       lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
+
+       spin_lock(&tgt->lut_translock);
+       tgt->lut_last_transno = lsd->lsd_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       lsd->lsd_mount_count++;
+
+       CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
+       CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+              tgt_name(tgt), tgt->lut_last_transno);
+       CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+              tgt_name(tgt), lsd->lsd_mount_count);
+       CDEBUG(D_INODE, "%s: server data size: %u\n",
+              tgt_name(tgt), lsd->lsd_server_size);
+       CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+              tgt_name(tgt), lsd->lsd_client_start);
+       CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+              tgt_name(tgt), lsd->lsd_client_size);
+       CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+              tgt_name(tgt), last_rcvd_size);
+       CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+              tgt_name(tgt), lsd->lsd_subdir_count);
+       CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
+              last_rcvd_size <= lsd->lsd_client_start ? 0 :
+              (last_rcvd_size - lsd->lsd_client_start) /
+               lsd->lsd_client_size);
+       CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
+
+       if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
+           lsd->lsd_client_size == 0) {
+               CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
+               RETURN(-EINVAL);
+       }
+
+       if (!tgt->lut_obd->obd_replayable)
+               CWARN("%s: recovery support OFF\n", tgt_name(tgt));
+
+       rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
+       if (rc < 0)
+               GOTO(err_client, rc);
+
+       spin_lock(&tgt->lut_translock);
+       /* obd_last_committed is used for compatibility
+        * with other lustre recovery code */
+       tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
+       tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
+
+       /* save it, so mount count and last_transno is current */
+       rc = tgt_server_data_update(env, tgt, 0);
+       if (rc < 0)
+               GOTO(err_client, rc);
+
+       RETURN(0);
+
+err_client:
+       class_disconnect_exports(tgt->lut_obd);
+       return rc;
+}
+
+/* add credits for last_rcvd update */
+int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
+                    void *cookie)
+{
+       struct lu_target        *tgt = cookie;
+       struct tgt_session_info *tsi;
+       int                      rc;
+
+       /* if there is no session, then this transaction is not result of
+        * request processing but some local operation */
+       if (env->le_ses == NULL)
+               return 0;
+
+       LASSERT(tgt->lut_last_rcvd);
+       tsi = tgt_ses_info(env);
+       /* OFD may start transaction without export assigned */
+       if (tsi->tsi_exp == NULL)
+               return 0;
+
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
+                                    sizeof(struct lsd_client_data),
+                                    tsi->tsi_exp->exp_target_data.ted_lr_off,
+                                    th);
+       if (rc)
+               return rc;
+
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
+                                    sizeof(struct lr_server_data), 0, th);
+       if (rc)
+               return rc;
+
+       if (tsi->tsi_vbr_obj != NULL &&
+           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
+               rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
+
+       return rc;
+}
+
+/* Update last_rcvd records with latests transaction data */
+int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
+                   void *cookie)
+{
+       struct lu_target        *tgt = cookie;
+       struct tgt_session_info *tsi;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct dt_object        *obj = NULL;
+       int                      rc;
+       bool                     echo_client;
+
+       if (env->le_ses == NULL)
+               return 0;
+
+       tsi = tgt_ses_info(env);
+       /* OFD may start transaction without export assigned */
+       if (tsi->tsi_exp == NULL)
+               return 0;
+
+       echo_client = (tgt_ses_req(tsi) == NULL);
+
+       if (tti->tti_has_trans && !echo_client) {
+               if (tti->tti_mult_trans == 0) {
+                       CDEBUG(D_HA, "More than one transaction "LPU64"\n",
+                              tti->tti_transno);
+                       RETURN(0);
+               }
+               /* we need another transno to be assigned */
+               tti->tti_transno = 0;
+       } else if (th->th_result == 0) {
+               tti->tti_has_trans = 1;
+       }
+
+       if (tsi->tsi_vbr_obj != NULL &&
+           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
+               obj = tsi->tsi_vbr_obj;
+       }
+
+       if (unlikely(echo_client)) /* echo client special case */
+               rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
+                                              tsi->tsi_exp);
+       else
+               rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
+                                         tgt_ses_req(tsi));
+       return rc;
+}
index 4f7cff5..1d2a933 100644 (file)
@@ -90,16 +90,36 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        lu_local_obj_fid(&fid, LAST_RECV_OID);
 
        o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
        lu_local_obj_fid(&fid, LAST_RECV_OID);
 
        o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
-       if (!IS_ERR(o)) {
-               lut->lut_last_rcvd = o;
-       } else {
-               OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
-               lut->lut_client_bitmap = NULL;
+       if (IS_ERR(o)) {
                rc = PTR_ERR(o);
                rc = PTR_ERR(o);
-               CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+               CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut),
+                      rc);
+               GOTO(out_bitmap, rc);
        }
 
        }
 
-       RETURN(rc);
+       lut->lut_last_rcvd = o;
+       rc = tgt_server_data_init(env, lut);
+       if (rc < 0)
+               GOTO(out_obj, rc);
+
+       /* prepare transactions callbacks */
+       lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb;
+       lut->lut_txn_cb.dtc_txn_stop = tgt_txn_stop_cb;
+       lut->lut_txn_cb.dtc_txn_commit = NULL;
+       lut->lut_txn_cb.dtc_cookie = lut;
+       lut->lut_txn_cb.dtc_tag = LCT_DT_THREAD | LCT_MD_THREAD;
+       CFS_INIT_LIST_HEAD(&lut->lut_txn_cb.dtc_linkage);
+
+       dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb);
+
+       RETURN(0);
+out_obj:
+       lu_object_put(env, &lut->lut_last_rcvd->do_lu);
+       lut->lut_last_rcvd = NULL;
+out_bitmap:
+       OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
+       lut->lut_client_bitmap = NULL;
+       return rc;
 }
 EXPORT_SYMBOL(tgt_init);
 
 }
 EXPORT_SYMBOL(tgt_init);
 
@@ -114,6 +134,7 @@ void tgt_fini(const struct lu_env *env, struct lu_target *lut)
                lut->lut_client_bitmap = NULL;
        }
        if (lut->lut_last_rcvd) {
                lut->lut_client_bitmap = NULL;
        }
        if (lut->lut_last_rcvd) {
+               dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
                lu_object_put(env, &lut->lut_last_rcvd->do_lu);
                lut->lut_last_rcvd = NULL;
        }
                lu_object_put(env, &lut->lut_last_rcvd->do_lu);
                lut->lut_last_rcvd = NULL;
        }
@@ -123,9 +144,22 @@ EXPORT_SYMBOL(tgt_fini);
 
 /* context key constructor/destructor: tg_key_init, tg_key_fini */
 LU_KEY_INIT_FINI(tgt, struct tgt_thread_info);
 
 /* context key constructor/destructor: tg_key_init, tg_key_fini */
 LU_KEY_INIT_FINI(tgt, struct tgt_thread_info);
+static void tgt_key_exit(const struct lu_context *ctx,
+                        struct lu_context_key *key, void *data)
+{
+       struct tgt_thread_info *tti = data;
+
+       tti->tti_has_trans = 0;
+       tti->tti_mult_trans = 0;
+}
 
 /* context key: tg_thread_key */
 
 /* context key: tg_thread_key */
-LU_CONTEXT_KEY_DEFINE(tgt, LCT_MD_THREAD | LCT_DT_THREAD);
+struct lu_context_key tgt_thread_key = {
+       .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
+       .lct_init = tgt_key_init,
+       .lct_fini = tgt_key_fini,
+       .lct_exit = tgt_key_exit,
+};
 EXPORT_SYMBOL(tgt_thread_key);
 
 LU_KEY_INIT_GENERIC(tgt);
 EXPORT_SYMBOL(tgt_thread_key);
 
 LU_KEY_INIT_GENERIC(tgt);